1
# Copyright 2013 Canonical Ltd. This software is licensed under the
2
# GNU Affero General Public License version 3 (see the file LICENSE).
4
"""Tests for `ephemerals_script`."""
6
from __future__ import (
17
from argparse import ArgumentParser
18
from copy import deepcopy
24
from pipes import quote
26
from textwrap import dedent
28
from fixtures import EnvironmentVariableFixture
29
from maastesting.factory import factory
30
from provisioningserver.config import Config
31
from provisioningserver.import_images import (
32
config as config_module,
35
from provisioningserver.import_images.ephemerals_script import (
37
create_symlinked_image_dir,
38
extract_image_tarball,
39
install_image_from_simplestreams,
43
from provisioningserver.pxe.tftppath import (
47
from provisioningserver.testing.config import ConfigFixture
48
from provisioningserver.testing.testcase import PservTestCase
49
from provisioningserver.utils import (
53
from testtools.matchers import (
62
"""Return directory and filename component of a file path."""
63
return os.path.dirname(path), os.path.basename(path)
66
class TestHelpers(PservTestCase):
67
def make_target(self):
    """Provide a fresh directory together with a newly generated name.

    The directory comes from `self.make_dir()`; the name comes from
    `factory.make_name()` and no file is created under it here.
    """
    directory = self.make_dir()
    filename = factory.make_name()
    return directory, filename
71
def test_move_file_by_glob_moves_file(self):
# Creates a file with random content plus a target location, then
# expects the target path to hold that content (FileContains).
# NOTE(review): the statement that opens before the pattern arguments
# (`source_name[:3] + '*'`) and the wrappers around the two path
# expectations are not visible here -- presumably `move_file_by_glob(`
# and `self.assertThat(`; confirm against the full file.
72
content = factory.getRandomString()
73
source_dir, source_name = split_path(self.make_file(contents=content))
74
target_dir, target_name = self.make_target()
77
source_dir, source_name[:3] + '*',
78
target_dir, target_name)
81
os.path.join(source_dir, source_name),
84
os.path.join(target_dir, target_name),
85
FileContains(content))
87
def test_move_file_by_glob_returns_target_path(self):
    # The value handed back by move_file_by_glob must be the full path of
    # the target: target directory joined with target name.
    src_dir, src_name = split_path(self.make_file())
    dest_dir, dest_name = self.make_target()
    expected_path = os.path.join(dest_dir, dest_name)

    returned_path = move_file_by_glob(src_dir, src_name, dest_dir, dest_name)

    self.assertEqual(expected_path, returned_path)
96
def test_move_file_by_glob_ignores_nonmatching_files(self):
# Puts two files in the same source directory and passes the exact name
# of one of them as the pattern.  The visible assertions check that the
# other file keeps its content and is the only entry left in the source
# directory, and that the target directory lists just the target name.
# NOTE(review): the opener of the expectation on the target file's
# content is not visible here -- presumably `self.assertThat(`; confirm.
97
content = factory.getRandomString()
98
source_dir, source_name = split_path(self.make_file(contents=content))
99
other_content = factory.getRandomString()
100
other_file = factory.make_file(source_dir, contents=other_content)
101
target_dir, target_name = self.make_target()
103
move_file_by_glob(source_dir, source_name, target_dir, target_name)
105
self.assertThat(other_file, FileContains(other_content))
107
os.path.join(target_dir, target_name),
108
FileContains(content))
109
self.assertItemsEqual(
110
[os.path.basename(other_file)],
111
os.listdir(source_dir))
112
self.assertItemsEqual([target_name], os.listdir(target_dir))
114
def test_move_file_by_glob_fails_if_no_files_match(self):
# NOTE(review): only an argument list is visible here: a fresh source
# directory, a random name with '*' appended as the pattern, a fresh
# target directory and a random target name.  The call these arguments
# belong to is not shown -- going by the test name, presumably an
# `assertRaises` around `move_file_by_glob`; confirm against the full file.
118
self.make_dir(), factory.make_name() + '*',
119
self.make_dir(), factory.make_name())
121
def test_move_file_by_glob_fails_if_multiple_files_match(self):
# Creates two files in one source directory; the final argument list
# uses '*' as the pattern together with a fresh target directory and a
# random target name.
# NOTE(review): the call that the final argument list belongs to is not
# visible here -- going by the test name, presumably an `assertRaises`
# around `move_file_by_glob`; confirm against the full file.
122
source_dir = self.make_dir()
123
factory.make_file(source_dir)
124
factory.make_file(source_dir)
129
source_dir, '*', self.make_dir(), factory.make_name())
131
def test_compose_filter_returns_single_literal(self):
# Pairs the string '<key>~(<literal>)' with the result of
# `compose_filter(key, [literal])`.
# NOTE(review): the opener of the comparison is not visible here --
# presumably `self.assertEqual(`; confirm.
132
key = factory.make_name('key')
133
literal = factory.getRandomString()
135
'%s~(%s)' % (key, literal),
136
compose_filter(key, [literal]))
138
def test_compose_filter_combines_literals(self):
# Pairs the string '<key>~(<first>|<second>)' with the result of
# `compose_filter(key, values)` for a 2-tuple of random strings.
# NOTE(review): the opener of the comparison is not visible here --
# presumably `self.assertEqual(`; confirm.
139
key = factory.make_name('key')
140
values = (factory.getRandomString(), factory.getRandomString())
142
'%s~(%s|%s)' % (key, values[0], values[1]),
143
compose_filter(key, values))
145
def test_compose_filter_escapes_literals_for_regex_use(self):
# Pairs `compose_filter(key, ['x.y*'])` with a string in which the '.'
# and '*' of the literal are each preceded by a backslash.
# NOTE(review): the opener of the comparison is not visible here --
# presumably `self.assertEqual(`; confirm.
146
key = factory.make_name('key')
148
'%s~(x\\.y\\*)' % key,
149
compose_filter(key, ['x.y*']))
152
class TestExtractImageTarball(PservTestCase):
153
"""Tests for `extract_image_tarball`."""
155
def test_copies_boot_image_files_from_tarball(self):
# Builds a tarball with entries '<prefix>-vmlinuz.gz',
# '<prefix>-initrd.gz' and '<prefix>.img', patches
# `ephemerals_script.call_uec2roottar`, and runs `extract_image_tarball`
# into a fresh target directory.  The visible expectations name 'linux',
# 'initrd.gz' and 'disk.img' in the target directory, each paired with
# the content of the corresponding tarball entry.
# NOTE(review): the close of the tarball dict, the second argument of
# `assertItemsEqual` and the openers of the three content checks are not
# visible here; confirm against the full file.
156
prefix = factory.make_name()
157
kernel_content = factory.getRandomString()
158
initrd_content = factory.getRandomString()
159
img_content = factory.getRandomString()
160
tarball = factory.make_tarball(self.make_dir(), {
161
'%s-vmlinuz.gz' % prefix: kernel_content,
162
'%s-initrd.gz' % prefix: initrd_content,
163
'%s.img' % prefix: img_content,
165
target_dir = self.make_dir()
166
self.patch(ephemerals_script, 'call_uec2roottar')
168
extract_image_tarball(tarball, target_dir)
170
self.assertItemsEqual(
171
['linux', 'initrd.gz', 'disk.img'],
174
os.path.join(target_dir, 'linux'),
175
FileContains(kernel_content))
177
os.path.join(target_dir, 'initrd.gz'),
178
FileContains(initrd_content))
180
os.path.join(target_dir, 'disk.img'),
181
FileContains(img_content))
183
def test_ignores_extraneous_files_in_tarball(self):
# Builds a tarball whose visible entries are '<prefix>-vmlinuz.gz',
# '<prefix>-initrd.gz' and '<prefix>.img' (contents given as None),
# patches `ephemerals_script.call_uec2roottar`, extracts, and expects the
# names 'linux', 'initrd.gz' and 'disk.img'.
# NOTE(review): the tail of the tarball dict (presumably the extraneous
# entry the test name refers to, plus the closing brackets) and the second
# argument of `assertItemsEqual` are not visible here; confirm.
184
prefix = factory.make_name()
185
tarball = factory.make_tarball(self.make_dir(), {
186
'%s-vmlinuz.gz' % prefix: None,
187
'%s-initrd.gz' % prefix: None,
188
'%s.img' % prefix: None,
191
target_dir = self.make_dir()
192
self.patch(ephemerals_script, 'call_uec2roottar')
194
extract_image_tarball(tarball, target_dir)
196
self.assertItemsEqual(
197
['linux', 'initrd.gz', 'disk.img'],
200
def test_runs_uec2roottar(self):
# Patches `subprocess.check_call` and `ephemerals_script.move_file_by_glob`,
# runs `extract_image_tarball` on a made-up tarball name, and inspects the
# argument list that `check_call` was last called with; the one visible
# item is the path of 'dist-root.tar.gz' inside the target directory.
# NOTE(review): the value assigned to the patched `move_file_by_glob`'s
# `return_value` and the other items of the expected argument list are
# not visible here; confirm against the full file.
201
check_call = self.patch(subprocess, 'check_call')
202
fake_image = factory.make_name('image')
203
self.patch(ephemerals_script, 'move_file_by_glob').return_value = (
205
tarball = factory.make_name('tarball') + '.tar.gz'
206
target_dir = self.make_dir()
208
extract_image_tarball(tarball, target_dir)
210
check_call.assert_called_with([
213
os.path.join(target_dir, 'dist-root.tar.gz'),
216
def test_cleans_up_temp_location(self):
# With `subprocess.check_call` and `ephemerals_script.move_file_by_glob`
# patched, runs `extract_image_tarball` with an explicit temp location and
# expects that directory to be empty afterwards.
# NOTE(review): the value assigned to the patched `move_file_by_glob`'s
# `return_value` is not visible here; confirm against the full file.
217
self.patch(subprocess, 'check_call')
218
fake_image = factory.make_name('image')
219
self.patch(ephemerals_script, 'move_file_by_glob').return_value = (
221
tarball = factory.make_name('tarball') + '.tar.gz'
222
target_dir = self.make_dir()
223
temp_location = self.make_dir()
225
extract_image_tarball(tarball, target_dir, temp_location)
227
self.assertItemsEqual([], listdir(temp_location))
229
def test_cleans_up_after_failure(self):
# Makes the patched `subprocess.check_call` raise ExternalProcessError,
# patches `ephemerals_script.move_file_by_glob`, and expects the temp
# location to be empty after `extract_image_tarball` has been invoked
# with it.
# NOTE(review): the value assigned to the patched `move_file_by_glob`'s
# `return_value` and the opener of the statement listing
# `ExternalProcessError, extract_image_tarball, ...` are not visible here
# -- the latter is presumably `self.assertRaises(`; confirm.
230
self.patch(subprocess, 'check_call').side_effect = (
231
ExternalProcessError(-1, "some_command"))
232
fake_image = factory.make_name('image')
233
self.patch(ephemerals_script, 'move_file_by_glob').return_value = (
235
tarball = factory.make_name('tarball') + '.tar.gz'
236
target_dir = self.make_dir()
237
temp_location = self.make_dir()
240
ExternalProcessError,
241
extract_image_tarball, tarball, target_dir, temp_location)
243
self.assertItemsEqual([], listdir(temp_location))
246
class TestCreateSymlinkedImageDir(PservTestCase):
247
"""Tests for `create_symlinked_image_dir`."""
249
def make_original_dir(self):
# Creates a fresh directory and adds files named 'linux', 'initrd.gz'
# and 'dist-root.tar.gz' to it via `factory.make_file`.
# NOTE(review): callers in this class assign the result of this helper,
# but no return statement is visible here -- presumably it returns
# `original_dir`; confirm against the full file.
250
"""Create a directory with the kernel, initrd and root tarball."""
251
original_dir = self.make_dir()
252
factory.make_file(original_dir, 'linux')
253
factory.make_file(original_dir, 'initrd.gz')
254
factory.make_file(original_dir, 'dist-root.tar.gz')
257
def test_symlinks_files(self):
# Calls `create_symlinked_image_dir` and checks that the returned
# directory differs from both inputs and starts with the temp location
# followed by '/'.  The visible expectations then pair each original
# file with the link target read from the image directory; note that
# 'dist-root.tar.gz' in the original is paired with 'root.tar.gz' in
# the image directory.
# NOTE(review): the second argument of `assertItemsEqual` and the openers
# of the three link comparisons are not visible here -- presumably
# `listdir(image_dir))` and `self.assertEqual(`; confirm.
258
original_dir = self.make_original_dir()
259
temp_location = self.make_dir()
261
image_dir = create_symlinked_image_dir(original_dir, temp_location)
263
self.assertNotEqual(original_dir, image_dir)
264
self.assertNotEqual(temp_location, image_dir)
265
self.assertThat(image_dir, StartsWith(temp_location + '/'))
266
self.assertItemsEqual(
267
['linux', 'initrd.gz', 'root.tar.gz'],
270
os.path.join(original_dir, 'linux'),
271
readlink(os.path.join(image_dir, 'linux')))
273
os.path.join(original_dir, 'initrd.gz'),
274
readlink(os.path.join(image_dir, 'initrd.gz')))
276
os.path.join(original_dir, 'dist-root.tar.gz'),
277
readlink(os.path.join(image_dir, 'root.tar.gz')))
279
def test_cleans_up_temp_location(self):
    # After create_symlinked_image_dir has run, the only entry remaining
    # in the temp location must be the directory it returned.
    source_dir = self.make_original_dir()
    scratch_dir = self.make_dir()

    result_dir = create_symlinked_image_dir(source_dir, scratch_dir)

    expected_entries = [os.path.basename(result_dir)]
    self.assertItemsEqual(expected_entries, listdir(scratch_dir))
290
def test_cleans_up_after_failure(self):
# Makes the patched `ephemerals_script.symlink` raise a locally defined
# RuntimeError subclass, then expects the temp location to be empty after
# `create_symlinked_image_dir` has been invoked with it.
# NOTE(review): the body of `DeliberateFailure` and the opener of the
# statement ending in `create_symlinked_image_dir, original_dir,
# temp_location)` are not visible here -- the latter is presumably
# `self.assertRaises(DeliberateFailure, ...`; confirm.
291
class DeliberateFailure(RuntimeError):
294
self.patch(ephemerals_script, 'symlink').side_effect = (
295
DeliberateFailure("Symlinking intentionally broken"))
296
original_dir = self.make_dir()
297
temp_location = self.make_dir()
301
create_symlinked_image_dir, original_dir, temp_location)
303
self.assertItemsEqual([], listdir(temp_location))
306
class TestInstallImageFromSimplestreams(PservTestCase):
307
"""Tests for `install_image_from_simplestreams`."""
309
def prepare_storage_dir(self):
# Creates a fresh directory and adds files named 'linux', 'initrd.gz'
# and 'dist-root.tar.gz' to it via `factory.make_file`.
# NOTE(review): callers in this class assign the result of this helper,
# but no return statement is visible here -- presumably it returns
# `storage`; confirm against the full file.
310
"""Set up a storage directory with kernel, initrd, and root tarball."""
311
storage = self.make_dir()
312
factory.make_file(storage, 'linux')
313
factory.make_file(storage, 'initrd.gz')
314
factory.make_file(storage, 'dist-root.tar.gz')
317
def patch_config(self, tftp_root):
    """Install a fake config whose TFTP root is `tftp_root`.

    The config is applied through a `ConfigFixture` registered with
    `self.useFixture`.
    """
    fake_settings = {'tftp': {'root': tftp_root}}
    config_fixture = ConfigFixture(fake_settings)
    self.useFixture(config_fixture)
321
def test_installs_image(self):
# Points the config at a fresh TFTP root, runs
# `install_image_from_simplestreams` on a prepared storage directory, and
# looks up the install directory from the image path composed for
# (arch, 'generic', release, 'commissioning').  The visible checks expect
# 'linux', 'initrd.gz' and 'root.tar.gz' there, and pair the installed
# 'linux' with the text of the storage directory's 'linux'.
# NOTE(review): the remaining argument(s) and close of the
# `locate_tftp_path(` call and the opener of the final content check are
# not visible here; confirm against the full file.
322
tftp_root = self.make_dir()
323
self.patch_config(tftp_root)
324
storage_dir = self.prepare_storage_dir()
325
release = factory.make_name('release')
326
arch = factory.make_name('arch')
328
install_image_from_simplestreams(
329
storage_dir, release=release, arch=arch)
331
install_dir = locate_tftp_path(
332
compose_image_path(arch, 'generic', release, 'commissioning'),
334
self.assertItemsEqual(
335
['linux', 'initrd.gz', 'root.tar.gz'],
336
listdir(install_dir))
338
os.path.join(install_dir, 'linux'),
339
FileContains(read_text_file(os.path.join(storage_dir, 'linux'))))
341
def test_cleans_up_temp_location(self):
    # With install_image patched out, a run of
    # install_image_from_simplestreams must leave the temp location empty.
    self.patch(ephemerals_script, 'install_image')
    scratch_dir = self.make_dir()
    images_dir = self.prepare_storage_dir()
    release_name = factory.make_name('release')
    arch_name = factory.make_name('arch')

    install_image_from_simplestreams(
        images_dir,
        release=release_name,
        arch=arch_name,
        temp_location=scratch_dir)

    leftovers = listdir(scratch_dir)
    self.assertItemsEqual([], leftovers)
352
def test_cleans_up_after_failure(self):
# Gives the patched `ephemerals_script.install_image` a side effect, then
# expects the temp location to be empty after
# `install_image_from_simplestreams` has been invoked with it.
# NOTE(review): the body of `DeliberateFailure`, the value assigned to
# `side_effect`, and the opener of the statement listing
# `install_image_from_simplestreams, storage_dir, ...` are not visible
# here -- presumably a `DeliberateFailure(...)` instance and
# `self.assertRaises(DeliberateFailure, ...`; confirm.
353
class DeliberateFailure(RuntimeError):
356
self.patch(ephemerals_script, 'install_image').side_effect = (
358
temp_location = self.make_dir()
359
storage_dir = self.prepare_storage_dir()
363
install_image_from_simplestreams,
364
storage_dir, release=factory.make_name('release'),
365
arch=factory.make_name('arch'), temp_location=temp_location)
367
self.assertItemsEqual([], listdir(temp_location))
370
def make_legacy_config(data_dir=None, arches=None, releases=None):
# The visible assignments generate a random data-dir name and two random
# names each for arches and releases; the arches and releases are later
# space-joined and passed through `quote`.
# NOTE(review): the conditions guarding the three assignments (presumably
# `if <param> is None:`), the text the quoted values are substituted into
# and the return statement are not visible here; confirm against the
# full file.
371
"""Create contents for a legacy, shell-script config file."""
373
data_dir = factory.make_name('datadir')
375
arches = [factory.make_name('arch') for counter in range(2)]
377
releases = [factory.make_name('release') for counter in range(2)]
384
quote(' '.join(arches)),
385
quote(' '.join(releases)),
389
def install_legacy_config(testcase, contents):
# Writes `contents` to a new file via `testcase.make_file` and patches
# `config_module.EPHEMERALS_LEGACY_CONFIG` to that file's path.
# NOTE(review): the docstring's closing quotes and the return statement
# its text promises are not visible here -- presumably
# `return legacy_file`; confirm against the full file.
390
"""Set up a legacy config file with the given contents.
392
Returns the config file's path.
394
legacy_file = testcase.make_file(contents=contents)
395
testcase.patch(config_module, 'EPHEMERALS_LEGACY_CONFIG', legacy_file)
399
class TestMakeArgParser(PservTestCase):
401
def test_creates_parser(self):
    # make_arg_parser must hand back an ArgumentParser whose description
    # is the documentation text it was given.
    empty_ephemeral_config = {'boot': {'ephemeral': {}}}
    self.useFixture(ConfigFixture(empty_ephemeral_config))
    description_text = factory.getRandomString()

    arg_parser = make_arg_parser(description_text)

    self.assertIsInstance(arg_parser, ArgumentParser)
    self.assertEqual(description_text, arg_parser.description)
410
def test_defaults_to_config(self):
# Installs a config fixture carrying an images directory, two
# architectures and two releases, then parses an empty command line.
# The parsed `output` must equal the configured images directory; the
# visible items of the final comparison are the filters composed for
# 'arch' and 'release' from the configured lists.
# NOTE(review): the nesting keys and closers of the fixture dict, and
# the brackets and remaining argument of the final `assertItemsEqual`
# (presumably `args.filters`) are not visible here; confirm.
411
images_directory = self.make_dir()
412
arches = [factory.make_name('arch1'), factory.make_name('arch2')]
413
releases = [factory.make_name('rel1'), factory.make_name('rel2')]
414
self.useFixture(ConfigFixture({
416
'architectures': arches,
418
'images_directory': images_directory,
419
'releases': releases,
424
parser = make_arg_parser(factory.getRandomString())
426
args = parser.parse_args('')
427
self.assertEqual(images_directory, args.output)
428
self.assertItemsEqual(
430
compose_filter('arch', arches),
431
compose_filter('release', releases),
435
def test_does_not_require_config(self):
# Builds a path to a '.yaml' file that is never created and passes it to
# an EnvironmentVariableFixture for 'MAAS_PROVISIONING_SETTINGS', then
# parses an empty command line.  The visible checks involve the default
# `images_directory` from `Config.get_defaults()` and expect
# `args.filters` to be empty.
# NOTE(review): the opener around the EnvironmentVariableFixture
# (presumably `self.useFixture(`) and the opener and second argument of
# the comparison against the default images directory are not visible
# here; confirm against the full file.
436
defaults = Config.get_defaults()
437
no_file = os.path.join(self.make_dir(), factory.make_name() + '.yaml')
439
EnvironmentVariableFixture('MAAS_PROVISIONING_SETTINGS', no_file))
441
parser = make_arg_parser(factory.getRandomString())
443
args = parser.parse_args('')
445
defaults['boot']['ephemeral']['images_directory'],
447
self.assertItemsEqual([], args.filters)
449
def test_does_not_modify_config(self):
# Installs a config fixture, snapshots the cached config's 'boot' section
# with `deepcopy`, installs a legacy config file, and calls
# `make_arg_parser`.  The visible tail pairs the snapshot with a fresh
# read of the cached 'boot' section.
# NOTE(review): the nesting keys and closers of the fixture dict and the
# opener of the final comparison (presumably `self.assertEqual(`) are
# not visible here; confirm against the full file.
450
self.useFixture(ConfigFixture({
452
'architectures': [factory.make_name('arch')],
454
'images_directory': self.make_dir(),
455
'releases': [factory.make_name('release')],
459
original_boot_config = deepcopy(Config.load_from_cache()['boot'])
460
install_legacy_config(self, make_legacy_config())
462
make_arg_parser(factory.getRandomString())
465
original_boot_config,
466
Config.load_from_cache()['boot'])
468
def test_uses_legacy_config(self):
# With an empty config fixture in place, installs a legacy config built
# for a fresh `data_dir` and parses an empty command line; the parsed
# `output` must equal that `data_dir`.
469
data_dir = self.make_dir()
470
self.useFixture(ConfigFixture({}))
471
install_legacy_config(self, make_legacy_config(data_dir=data_dir))
473
parser = make_arg_parser(factory.getRandomString())
475
args = parser.parse_args('')
476
self.assertEqual(data_dir, args.output)