269
269
instance_data = dict(self.test_instance)
270
270
self._check_xml_and_container(instance_data)
272
def test_snapshot(self):
    """Snapshot a test instance and check the resulting image metadata.

    An image record is first created through the fake image service with
    status 'creating'. ``conn.snapshot`` is then called for that image id,
    and the image is read back and expected to have status 'active' and
    image_state 'available'.
    """
    # Nothing to exercise when the lazily loaded library is unavailable.
    if not self.lazy_load_library_exists():
        return

    self.flags(image_service='nova.image.fake.FakeImageService')
    image_service = utils.import_object(FLAGS.image_service)

    # Assuming that base image already exists in image_service
    instance_ref = db.instance_create(self.context, self.test_instance)
    properties = {'instance_id': instance_ref['id'],
                  'user_id': str(self.context.user_id)}
    snapshot_name = 'test-snap'
    sent_meta = {'name': snapshot_name, 'is_public': False,
                 'status': 'creating', 'properties': properties}

    # Create the image up front; the snapshot method updates it, so the
    # very same image_service object must be used to create and show it.
    # Use the test's request context, like every other call in this test.
    recv_meta = image_service.create(self.context, sent_meta)

    # Replace the connection's lookupByName and utils.execute with the
    # fakes provided by this test case.
    self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
    connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
    self.mox.StubOutWithMock(connection.utils, 'execute')
    connection.utils.execute = self.fake_execute

    conn = connection.LibvirtConnection(False)
    conn.snapshot(self.context, instance_ref, recv_meta['id'])

    snapshot = image_service.show(self.context, recv_meta['id'])
    self.assertEquals(snapshot['properties']['image_state'], 'available')
    self.assertEquals(snapshot['status'], 'active')
272
def test_snapshot_in_raw_format(self):
    """Snapshot a test instance and expect a 'raw' disk_format image.

    Only the image_service flag is overridden here. An image record is
    created with status 'creating', ``conn.snapshot`` is called for it, and
    the image read back is expected to be 'active' / 'available', to report
    disk_format 'raw', and to keep the name it was created with.
    """
    # Nothing to exercise when the lazily loaded library is unavailable.
    if not self.lazy_load_library_exists():
        return

    self.flags(image_service='nova.image.fake.FakeImageService')
    image_service = utils.import_object(FLAGS.image_service)

    # Assuming that base image already exists in image_service
    instance_ref = db.instance_create(self.context, self.test_instance)
    properties = {'instance_id': instance_ref['id'],
                  'user_id': str(self.context.user_id)}
    snapshot_name = 'test-snap'
    sent_meta = {'name': snapshot_name, 'is_public': False,
                 'status': 'creating', 'properties': properties}

    # Create the image up front; the snapshot method updates it, so the
    # very same image_service object must be used to create and show it.
    # Use the test's request context, like every other call in this test.
    recv_meta = image_service.create(self.context, sent_meta)

    # Replace the connection's lookupByName and utils.execute with the
    # fakes provided by this test case.
    self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
    connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
    self.mox.StubOutWithMock(connection.utils, 'execute')
    connection.utils.execute = self.fake_execute

    conn = connection.LibvirtConnection(False)
    conn.snapshot(self.context, instance_ref, recv_meta['id'])

    snapshot = image_service.show(self.context, recv_meta['id'])
    self.assertEquals(snapshot['properties']['image_state'], 'available')
    self.assertEquals(snapshot['status'], 'active')
    self.assertEquals(snapshot['disk_format'], 'raw')
    self.assertEquals(snapshot['name'], snapshot_name)
308
def test_snapshot_in_qcow2_format(self):
    """Snapshot a test instance with snapshot_image_format set to 'qcow2'.

    An image record is created with status 'creating', ``conn.snapshot`` is
    called for it, and the image read back is expected to be 'active' /
    'available', to report disk_format 'qcow2', and to keep the name it was
    created with.
    """
    # Nothing to exercise when the lazily loaded library is unavailable.
    if not self.lazy_load_library_exists():
        return

    self.flags(image_service='nova.image.fake.FakeImageService')
    # Ask for qcow2 output; the disk_format assertion below checks it.
    self.flags(snapshot_image_format='qcow2')
    image_service = utils.import_object(FLAGS.image_service)

    # Assuming that base image already exists in image_service
    instance_ref = db.instance_create(self.context, self.test_instance)
    properties = {'instance_id': instance_ref['id'],
                  'user_id': str(self.context.user_id)}
    snapshot_name = 'test-snap'
    sent_meta = {'name': snapshot_name, 'is_public': False,
                 'status': 'creating', 'properties': properties}

    # Create the image up front; the snapshot method updates it, so the
    # very same image_service object must be used to create and show it.
    # Use the test's request context, like every other call in this test.
    recv_meta = image_service.create(self.context, sent_meta)

    # Replace the connection's lookupByName and utils.execute with the
    # fakes provided by this test case.
    self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
    connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
    self.mox.StubOutWithMock(connection.utils, 'execute')
    connection.utils.execute = self.fake_execute

    conn = connection.LibvirtConnection(False)
    conn.snapshot(self.context, instance_ref, recv_meta['id'])

    snapshot = image_service.show(self.context, recv_meta['id'])
    self.assertEquals(snapshot['properties']['image_state'], 'available')
    self.assertEquals(snapshot['status'], 'active')
    self.assertEquals(snapshot['disk_format'], 'qcow2')
    self.assertEquals(snapshot['name'], snapshot_name)
307
345
def test_snapshot_no_image_architecture(self):