1
# vim: tabstop=4 shiftwidth=4 softtabstop=4
3
# Copyright 2012 Grid Dynamics
6
# Licensed under the Apache License, Version 2.0 (the "License"); you may
7
# not use this file except in compliance with the License. You may obtain
8
# a copy of the License at
10
# http://www.apache.org/licenses/LICENSE-2.0
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15
# License for the specific language governing permissions and limitations
21
from nova.openstack.common import cfg
23
from nova.tests import fake_libvirt_utils
24
from nova.virt.libvirt import imagebackend
29
class _ImageTestCase(object):
30
INSTANCES_PATH = '/fake'
32
def mock_create_image(self, image):
33
def create_image(fn, base, size, *args, **kwargs):
34
fn(target=base, *args, **kwargs)
35
image.create_image = create_image
38
super(_ImageTestCase, self).setUp()
39
self.flags(disable_process_locking=True,
40
instances_path=self.INSTANCES_PATH)
41
self.INSTANCE = 'instance'
43
self.TEMPLATE = 'template'
45
self.PATH = os.path.join(CONF.instances_path, self.INSTANCE,
47
self.TEMPLATE_DIR = os.path.join(CONF.instances_path,
49
self.TEMPLATE_PATH = os.path.join(self.TEMPLATE_DIR, 'template')
51
self.useFixture(fixtures.MonkeyPatch(
52
'nova.virt.libvirt.imagebackend.libvirt_utils',
56
self.mox.StubOutWithMock(os.path, 'exists')
57
os.path.exists(self.PATH).AndReturn(False)
58
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
59
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
60
fn = self.mox.CreateMockAnything()
61
fn(target=self.TEMPLATE_PATH)
62
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
63
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
66
image = self.image_class(self.INSTANCE, self.NAME)
67
self.mock_create_image(image)
68
image.cache(fn, self.TEMPLATE)
72
def test_cache_image_exists(self):
73
self.mox.StubOutWithMock(os.path, 'exists')
74
os.path.exists(self.PATH).AndReturn(True)
77
image = self.image_class(self.INSTANCE, self.NAME)
78
image.cache(None, self.TEMPLATE)
82
def test_cache_base_dir_exists(self):
83
self.mox.StubOutWithMock(os.path, 'exists')
84
os.path.exists(self.PATH).AndReturn(False)
85
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
86
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
87
fn = self.mox.CreateMockAnything()
88
fn(target=self.TEMPLATE_PATH)
89
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
92
image = self.image_class(self.INSTANCE, self.NAME)
93
self.mock_create_image(image)
94
image.cache(fn, self.TEMPLATE)
98
def test_cache_template_exists(self):
99
self.mox.StubOutWithMock(os.path, 'exists')
100
os.path.exists(self.PATH).AndReturn(False)
101
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
102
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
103
fn = self.mox.CreateMockAnything()
106
image = self.image_class(self.INSTANCE, self.NAME)
107
self.mock_create_image(image)
108
image.cache(fn, self.TEMPLATE)
113
class RawTestCase(_ImageTestCase, test.TestCase):
118
self.image_class = imagebackend.Raw
119
super(RawTestCase, self).setUp()
121
def prepare_mocks(self):
122
fn = self.mox.CreateMockAnything()
123
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
125
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
126
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
129
def test_create_image(self):
130
fn = self.prepare_mocks()
131
fn(target=self.TEMPLATE_PATH, image_id=None)
132
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
135
image = self.image_class(self.INSTANCE, self.NAME)
136
image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None)
140
def test_create_image_generated(self):
141
fn = self.prepare_mocks()
145
image = self.image_class(self.INSTANCE, self.NAME)
146
image.create_image(fn, self.TEMPLATE_PATH, None)
150
def test_create_image_extend(self):
151
fn = self.prepare_mocks()
152
fn(target=self.TEMPLATE_PATH, image_id=None)
153
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
154
imagebackend.disk.extend(self.PATH, self.SIZE)
157
image = self.image_class(self.INSTANCE, self.NAME)
158
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None)
163
class Qcow2TestCase(_ImageTestCase, test.TestCase):
164
SIZE = 1024 * 1024 * 1024
167
self.image_class = imagebackend.Qcow2
168
super(Qcow2TestCase, self).setUp()
169
self.QCOW2_BASE = (self.TEMPLATE_PATH +
170
'_%d' % (self.SIZE / (1024 * 1024 * 1024)))
172
def prepare_mocks(self):
173
fn = self.mox.CreateMockAnything()
174
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
176
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
178
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
179
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
182
def test_create_image(self):
183
fn = self.prepare_mocks()
184
fn(target=self.TEMPLATE_PATH)
185
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
189
image = self.image_class(self.INSTANCE, self.NAME)
190
image.create_image(fn, self.TEMPLATE_PATH, None)
194
def test_create_image_with_size(self):
195
fn = self.prepare_mocks()
196
fn(target=self.TEMPLATE_PATH)
197
self.mox.StubOutWithMock(os.path, 'exists')
198
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
200
imagebackend.disk.extend(self.PATH, self.SIZE)
203
image = self.image_class(self.INSTANCE, self.NAME)
204
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
209
class LvmTestCase(_ImageTestCase, test.TestCase):
215
self.image_class = imagebackend.Lvm
216
super(LvmTestCase, self).setUp()
217
self.flags(libvirt_images_volume_group=self.VG)
218
self.LV = '%s_%s' % (self.INSTANCE, self.NAME)
219
self.PATH = os.path.join('/dev', self.VG, self.LV)
221
self.disk = imagebackend.disk
222
self.utils = imagebackend.utils
223
self.libvirt_utils = imagebackend.libvirt_utils
225
def prepare_mocks(self):
226
fn = self.mox.CreateMockAnything()
227
self.mox.StubOutWithMock(self.disk, 'resize2fs')
228
self.mox.StubOutWithMock(self.libvirt_utils, 'create_lvm_image')
229
self.mox.StubOutWithMock(self.disk, 'get_disk_size')
230
self.mox.StubOutWithMock(self.utils, 'execute')
233
def _create_image(self, sparse):
234
fn = self.prepare_mocks()
235
fn(target=self.TEMPLATE_PATH)
236
self.libvirt_utils.create_lvm_image(self.VG,
240
self.disk.get_disk_size(self.TEMPLATE_PATH
241
).AndReturn(self.TEMPLATE_SIZE)
242
cmd = ('dd', 'if=%s' % self.TEMPLATE_PATH,
243
'of=%s' % self.PATH, 'bs=4M')
244
self.utils.execute(*cmd, run_as_root=True)
247
image = self.image_class(self.INSTANCE, self.NAME)
248
image.create_image(fn, self.TEMPLATE_PATH, None)
252
def _create_image_generated(self, sparse):
253
fn = self.prepare_mocks()
254
self.libvirt_utils.create_lvm_image(self.VG, self.LV,
255
self.SIZE, sparse=sparse)
256
fn(target=self.PATH, ephemeral_size=None)
259
image = self.image_class(self.INSTANCE, self.NAME)
260
image.create_image(fn, self.TEMPLATE_PATH,
261
self.SIZE, ephemeral_size=None)
265
def _create_image_resize(self, sparse):
266
fn = self.prepare_mocks()
267
fn(target=self.TEMPLATE_PATH)
268
self.libvirt_utils.create_lvm_image(self.VG, self.LV,
269
self.SIZE, sparse=sparse)
270
self.disk.get_disk_size(self.TEMPLATE_PATH
271
).AndReturn(self.TEMPLATE_SIZE)
272
cmd = ('dd', 'if=%s' % self.TEMPLATE_PATH,
273
'of=%s' % self.PATH, 'bs=4M')
274
self.utils.execute(*cmd, run_as_root=True)
275
self.disk.resize2fs(self.PATH)
278
image = self.image_class(self.INSTANCE, self.NAME)
279
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
283
def test_create_image(self):
284
self._create_image(False)
286
def test_create_image_sparsed(self):
287
self.flags(libvirt_sparse_logical_volumes=True)
288
self._create_image(True)
290
def test_create_image_generated(self):
291
self._create_image_generated(False)
293
def test_create_image_generated_sparsed(self):
294
self.flags(libvirt_sparse_logical_volumes=True)
295
self._create_image_generated(True)
297
def test_create_image_resize(self):
298
self._create_image_resize(False)
300
def test_create_image_resize_sparsed(self):
301
self.flags(libvirt_sparse_logical_volumes=True)
302
self._create_image_resize(True)
304
def test_create_image_negative(self):
305
fn = self.prepare_mocks()
306
fn(target=self.TEMPLATE_PATH)
307
self.libvirt_utils.create_lvm_image(self.VG,
311
).AndRaise(RuntimeError())
312
self.disk.get_disk_size(self.TEMPLATE_PATH
313
).AndReturn(self.TEMPLATE_SIZE)
314
self.mox.StubOutWithMock(self.libvirt_utils, 'remove_logical_volumes')
315
self.libvirt_utils.remove_logical_volumes(self.PATH)
318
image = self.image_class(self.INSTANCE, self.NAME)
320
self.assertRaises(RuntimeError, image.create_image, fn,
321
self.TEMPLATE_PATH, self.SIZE)
324
def test_create_image_generated_negative(self):
325
fn = self.prepare_mocks()
327
ephemeral_size=None).AndRaise(RuntimeError())
328
self.libvirt_utils.create_lvm_image(self.VG,
332
self.mox.StubOutWithMock(self.libvirt_utils, 'remove_logical_volumes')
333
self.libvirt_utils.remove_logical_volumes(self.PATH)
336
image = self.image_class(self.INSTANCE, self.NAME)
338
self.assertRaises(RuntimeError, image.create_image, fn,
339
self.TEMPLATE_PATH, self.SIZE,
344
class BackendTestCase(test.TestCase):
345
INSTANCE = 'fake-instance'
346
NAME = 'fake-name.suffix'
348
def get_image(self, use_cow, image_type):
349
return imagebackend.Backend(use_cow).image(self.INSTANCE,
353
def _test_image(self, image_type, image_not_cow, image_cow):
354
image1 = self.get_image(False, image_type)
355
image2 = self.get_image(True, image_type)
357
def assertIsInstance(instance, class_object):
358
failure = ('Expected %s,' +
359
' but got %s.') % (class_object.__name__,
360
instance.__class__.__name__)
361
self.assertTrue(isinstance(instance, class_object), failure)
363
assertIsInstance(image1, image_not_cow)
364
assertIsInstance(image2, image_cow)
366
def test_image_raw(self):
367
self._test_image('raw', imagebackend.Raw, imagebackend.Raw)
369
def test_image_qcow2(self):
370
self._test_image('qcow2', imagebackend.Qcow2, imagebackend.Qcow2)
372
def test_image_lvm(self):
373
self.flags(libvirt_images_volume_group='FakeVG')
374
self._test_image('lvm', imagebackend.Lvm, imagebackend.Lvm)
376
def test_image_default(self):
377
self._test_image('default', imagebackend.Raw, imagebackend.Qcow2)