24
24
import sys; sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'QMP'))
27
28
__all__ = ['imgfmt', 'imgproto', 'test_dir' 'qemu_img', 'qemu_io',
28
29
'VM', 'QMPTestCase', 'notrun', 'main']
51
52
args = qemu_io_args + list(args)
52
53
return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0]
55
def compare_images(img1, img2):
56
'''Return True if two image files are identical'''
57
return qemu_img('compare', '-f', imgfmt,
58
'-F', imgfmt, img1, img2) == 0
60
def create_image(name, size):
61
'''Create a fully-allocated raw image with sector markers'''
62
file = open(name, 'w')
65
sector = struct.pack('>l504xl', i / 512, i / 512)
79
95
self._num_drives += 1
98
def hmp_qemu_io(self, drive, cmd):
99
'''Write to a given drive using an HMP command'''
100
return self.qmp('human-monitor-command',
101
command_line='qemu-io %s "%s"' % (drive, cmd))
82
103
def add_fd(self, fd, fdset, opaque, opts=''):
83
104
'''Pass a file descriptor to the VM'''
84
105
options = ['fd=%d' % fd,
170
191
result = self.dictpath(d, path)
171
192
self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
194
def assert_no_active_block_jobs(self):
195
result = self.vm.qmp('query-block-jobs')
196
self.assert_qmp(result, 'return', [])
198
def cancel_and_wait(self, drive='drive0', force=False):
199
'''Cancel a block job and wait for it to finish, returning the event'''
200
result = self.vm.qmp('block-job-cancel', device=drive, force=force)
201
self.assert_qmp(result, 'return', {})
206
for event in self.vm.get_qmp_events(wait=True):
207
if event['event'] == 'BLOCK_JOB_COMPLETED' or \
208
event['event'] == 'BLOCK_JOB_CANCELLED':
209
self.assert_qmp(event, 'data/device', drive)
213
self.assert_no_active_block_jobs()
216
def wait_until_completed(self, drive='drive0'):
217
'''Wait for a block job to finish, returning the event'''
220
for event in self.vm.get_qmp_events(wait=True):
221
if event['event'] == 'BLOCK_JOB_COMPLETED':
222
self.assert_qmp(event, 'data/device', drive)
223
self.assert_qmp_absent(event, 'data/error')
224
self.assert_qmp(event, 'data/offset', self.image_len)
225
self.assert_qmp(event, 'data/len', self.image_len)
228
self.assert_no_active_block_jobs()
173
231
def notrun(reason):
174
232
'''Skip this test suite'''
175
233
# Each test in qemu-iotests has a number ("seq")