@ -80,9 +80,11 @@ luks_default_key_secret_opt = 'key-secret=keysec0'
def qemu_img ( * args ) :
''' Run qemu-img and return the exit code '''
devnull = open ( ' /dev/null ' , ' r+ ' )
exitcode = subprocess . call ( qemu_img_args + list ( args ) , stdin = devnull , stdout = devnull )
exitcode = subprocess . call ( qemu_img_args + list ( args ) ,
stdin = devnull , stdout = devnull )
if exitcode < 0 :
sys . stderr . write ( ' qemu-img received signal %i : %s \n ' % ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
sys . stderr . write ( ' qemu-img received signal %i : %s \n '
% ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
return exitcode
def ordered_qmp ( qmsg , conv_keys = True ) :
@ -121,7 +123,8 @@ def qemu_img_verbose(*args):
''' Run qemu-img without suppressing its output and return the exit code '''
exitcode = subprocess . call ( qemu_img_args + list ( args ) )
if exitcode < 0 :
sys . stderr . write ( ' qemu-img received signal %i : %s \n ' % ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
sys . stderr . write ( ' qemu-img received signal %i : %s \n '
% ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
return exitcode
def qemu_img_pipe ( * args ) :
@ -132,7 +135,8 @@ def qemu_img_pipe(*args):
universal_newlines = True )
exitcode = subp . wait ( )
if exitcode < 0 :
sys . stderr . write ( ' qemu-img received signal %i : %s \n ' % ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
sys . stderr . write ( ' qemu-img received signal %i : %s \n '
% ( - exitcode , ' ' . join ( qemu_img_args + list ( args ) ) ) )
return subp . communicate ( ) [ 0 ]
def qemu_img_log ( * args ) :
@ -162,7 +166,8 @@ def qemu_io(*args):
universal_newlines = True )
exitcode = subp . wait ( )
if exitcode < 0 :
sys . stderr . write ( ' qemu-io received signal %i : %s \n ' % ( - exitcode , ' ' . join ( args ) ) )
sys . stderr . write ( ' qemu-io received signal %i : %s \n '
% ( - exitcode , ' ' . join ( args ) ) )
return subp . communicate ( ) [ 0 ]
def qemu_io_log ( * args ) :
@ -284,10 +289,13 @@ win32_re = re.compile(r"\r")
def filter_win32 ( msg ) :
return win32_re . sub ( " " , msg )
qemu_io_re = re . compile ( r " [0-9]* ops; [0-9 \ /:. sec]* \ ([0-9 \ /.inf]* [EPTGMKiBbytes]* \ /sec and [0-9 \ /.inf]* ops \ /sec \ ) " )
qemu_io_re = re . compile ( r " [0-9]* ops; [0-9 \ /:. sec]* "
r " \ ([0-9 \ /.inf]* [EPTGMKiBbytes]* \ /sec "
r " and [0-9 \ /.inf]* ops \ /sec \ ) " )
def filter_qemu_io ( msg ) :
msg = filter_win32 ( msg )
return qemu_io_re . sub ( " X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec) " , msg )
return qemu_io_re . sub ( " X ops; XX:XX:XX.X "
" (XXX YYY/sec and XXX ops/sec) " , msg )
chown_re = re . compile ( r " chown [0-9]+:[0-9]+ " )
def filter_chown ( msg ) :
@ -340,7 +348,9 @@ def filter_img_info(output, filename):
line = filter_testfiles ( line )
line = line . replace ( imgfmt , ' IMGFMT ' )
line = re . sub ( ' iters: [0-9]+ ' , ' iters: XXX ' , line )
line = re . sub ( ' uuid: [-a-f0-9]+ ' , ' uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX ' , line )
line = re . sub ( ' uuid: [-a-f0-9]+ ' ,
' uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX ' ,
line )
line = re . sub ( ' cid: [0-9]+ ' , ' cid: XXXXXXXXXX ' , line )
lines . append ( line )
return ' \n ' . join ( lines )
@ -538,11 +548,13 @@ class VM(qtest.QEMUQtestMachine):
self . pause_drive ( drive , " write_aio " )
return
self . qmp ( ' human-monitor-command ' ,
command_line = ' qemu-io %s " break %s bp_ %s " ' % ( drive , event , drive ) )
command_line = ' qemu-io %s " break %s bp_ %s " '
% ( drive , event , drive ) )
def resume_drive ( self , drive ) :
self . qmp ( ' human-monitor-command ' ,
command_line = ' qemu-io %s " remove_break bp_ %s " ' % ( drive , drive ) )
command_line = ' qemu-io %s " remove_break bp_ %s " '
% ( drive , drive ) )
def hmp_qemu_io ( self , drive , cmd ) :
''' Write to a given drive using an HMP command '''
@ -802,16 +814,18 @@ class QMPTestCase(unittest.TestCase):
idx = int ( idx )
if not isinstance ( d , dict ) or component not in d :
self . fail ( ' failed path traversal for " %s " in " %s " ' % ( path , str ( d ) ) )
self . fail ( f ' failed path traversal for " { path } " in " { d } " ' )
d = d [ component ]
if m :
if not isinstance ( d , list ) :
self . fail ( ' path component " %s " in " %s " is not a list in " %s " ' % ( component , path , str ( d ) ) )
self . fail ( f ' path component " { component } " in " { path } " '
f ' is not a list in " { d } " ' )
try :
d = d [ idx ]
except IndexError :
self . fail ( ' invalid index " %s " in path " %s " in " %s " ' % ( idx , path , str ( d ) ) )
self . fail ( f ' invalid index " { idx } " in path " { path } " '
f ' in " { d } " ' )
return d
def assert_qmp_absent ( self , d , path ) :
@ -862,10 +876,13 @@ class QMPTestCase(unittest.TestCase):
''' Asserts that the given filename is a json: filename and that its
content is equal to the given reference object '''
self . assertEqual ( json_filename [ : 5 ] , ' json: ' )
self . assertEqual ( self . vm . flatten_qmp_object ( json . loads ( json_filename [ 5 : ] ) ) ,
self . vm . flatten_qmp_object ( reference ) )
self . assertEqual (
self . vm . flatten_qmp_object ( json . loads ( json_filename [ 5 : ] ) ) ,
self . vm . flatten_qmp_object ( reference )
)
def cancel_and_wait ( self , drive = ' drive0 ' , force = False , resume = False , wait = 60.0 ) :
def cancel_and_wait ( self , drive = ' drive0 ' , force = False ,
resume = False , wait = 60.0 ) :
''' Cancel a block job and wait for it to finish, returning the event '''
result = self . vm . qmp ( ' block-job-cancel ' , device = drive , force = force )
self . assert_qmp ( result , ' return ' , { } )
@ -889,8 +906,8 @@ class QMPTestCase(unittest.TestCase):
self . assert_no_active_block_jobs ( )
return result
def wait_until_completed ( self , drive = ' drive0 ' , check_offset = True , wait = 60.0 ,
error = None ) :
def wait_until_completed ( self , drive = ' drive0 ' , check_offset = True ,
wait = 60.0 , error = None ) :
''' Wait for a block job to finish, returning the event '''
while True :
for event in self . vm . get_qmp_events ( wait = wait ) :
@ -1029,8 +1046,11 @@ def verify_quorum():
notrun ( ' quorum support missing ' )
def qemu_pipe ( * args ) :
''' Run qemu with an option to print something and exit (e.g. a help option),
and return its output '''
"""
Run qemu with an option to print something and exit ( e . g . a help option ) .
: return : QEMU ' s stdout output.
"""
args = [ qemu_prog ] + qemu_opts + list ( args )
subp = subprocess . Popen ( args , stdout = subprocess . PIPE ,
stderr = subprocess . STDOUT ,
@ -1068,8 +1088,8 @@ def skip_if_unsupported(required_formats=(), read_only=False):
usf_list = list ( set ( fmts ) - set ( supported_formats ( read_only ) ) )
if usf_list :
test_case . case_skip ( ' {} : formats {} are not whitelisted ' . format (
test_case , usf_list ) )
msg = f ' { test_case } : formats { usf_list } are not whitelisted '
test_case . case_skip ( msg )
return None
else :
return func ( test_case , * args , * * kwargs )