~lutostag/ubuntu/trusty/maas/1.5.4+keystone

« back to all changes in this revision

Viewing changes to src/provisioningserver/import_images/tests/test_ephemerals_script.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez, Andres Rodriguez, Julian Edwards, Dustin Kirkland
  • Date: 2014-03-28 10:43:53 UTC
  • mfrom: (1.2.26)
  • Revision ID: package-import@ubuntu.com-20140328104353-9hj74f1nvl7xis5z
Tags: 1.5+bzr2204-0ubuntu1
* New upstream release (LP: #1281881)

[ Andres Rodriguez ]
* debian/maas-region-controller-min.templates: Set installation note to false
  by default.
* Check rabbitmqctl is present before running commands:
  - debian/maas-region-controller-min.maas-region-celery.upstart.
  - debian/maas-region-controller-min.maas-txlongpoll.upstart.
* Make sure the maas_longpoll rabbitmq user is created with the correct
  password on a package reconfigure.
* debian/maas-dns.postinst: Fix upgrade setup of named.conf.options.
* debian/maas-cluster-controller.install: Install UEFI templates (LP: #1299143)

[ Julian Edwards ]
* debian/extras/maas: Echo warning to stderr so JSON stdout is not polluted
* debian/maas-cluster-controller.postinst: Run upgrade-cluster on each
  upgrade
* debian/maas-dns.postinst: Call edit_named_options to add a line in
  /etc/bind/named.conf.options that includes the
  /etc/named/maas/named.conf.options.inside.maas file.
* debian/control:
  - maas-dns depends on python-iscpy
  - maas-cluster-controller depends on python-seamicroclient
* debian/maas-cluster-controller.install: Install bootresources.yaml

[ Dustin Kirkland ]
* debian/control: LP: #1297097
  - clean up package descriptions, modernize, and more clearly/simply
    explain what each package does
  - drop "Ubuntu" in front of MAAS, clean up command line/API description

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2013 Canonical Ltd.  This software is licensed under the
2
 
# GNU Affero General Public License version 3 (see the file LICENSE).
3
 
 
4
 
"""Tests for `ephemerals_script`."""
5
 
 
6
 
from __future__ import (
7
 
    absolute_import,
8
 
    print_function,
9
 
    unicode_literals,
10
 
    )
11
 
 
12
 
str = None
13
 
 
14
 
__metaclass__ = type
15
 
__all__ = []
16
 
 
17
 
from argparse import ArgumentParser
18
 
from copy import deepcopy
19
 
from os import (
20
 
    listdir,
21
 
    readlink,
22
 
    )
23
 
import os.path
24
 
from pipes import quote
25
 
import subprocess
26
 
from textwrap import dedent
27
 
 
28
 
from fixtures import EnvironmentVariableFixture
29
 
from maastesting.factory import factory
30
 
from provisioningserver.config import Config
31
 
from provisioningserver.import_images import (
32
 
    config as config_module,
33
 
    ephemerals_script,
34
 
    )
35
 
from provisioningserver.import_images.ephemerals_script import (
36
 
    compose_filter,
37
 
    create_symlinked_image_dir,
38
 
    extract_image_tarball,
39
 
    install_image_from_simplestreams,
40
 
    make_arg_parser,
41
 
    move_file_by_glob,
42
 
    )
43
 
from provisioningserver.pxe.tftppath import (
44
 
    compose_image_path,
45
 
    locate_tftp_path,
46
 
    )
47
 
from provisioningserver.testing.config import ConfigFixture
48
 
from provisioningserver.testing.testcase import PservTestCase
49
 
from provisioningserver.utils import (
50
 
    ExternalProcessError,
51
 
    read_text_file,
52
 
    )
53
 
from testtools.matchers import (
54
 
    FileContains,
55
 
    FileExists,
56
 
    Not,
57
 
    StartsWith,
58
 
    )
59
 
 
60
 
 
61
 
def split_path(path):
    """Split `path` into its directory part and its final component.

    :return: A `(directory, filename)` tuple.
    """
    directory = os.path.dirname(path)
    filename = os.path.basename(path)
    return directory, filename
64
 
 
65
 
 
66
 
class TestHelpers(PservTestCase):
    """Tests for the `move_file_by_glob` and `compose_filter` helpers."""

    def make_target(self):
        """Return a pair of (existing directory, unused file name)."""
        directory = self.make_dir()
        name = factory.make_name()
        return directory, name

    def test_move_file_by_glob_moves_file(self):
        text = factory.getRandomString()
        src_dir, src_name = split_path(self.make_file(contents=text))
        dest_dir, dest_name = self.make_target()
        # Match on the first three characters of the source file's name.
        pattern = src_name[:3] + '*'

        move_file_by_glob(src_dir, pattern, dest_dir, dest_name)

        # The file is gone from its old place and intact at the new one.
        old_path = os.path.join(src_dir, src_name)
        new_path = os.path.join(dest_dir, dest_name)
        self.assertThat(old_path, Not(FileExists()))
        self.assertThat(new_path, FileContains(text))

    def test_move_file_by_glob_returns_target_path(self):
        src_dir, src_name = split_path(self.make_file())
        dest_dir, dest_name = self.make_target()

        result = move_file_by_glob(src_dir, src_name, dest_dir, dest_name)

        expected = os.path.join(dest_dir, dest_name)
        self.assertEqual(expected, result)

    def test_move_file_by_glob_ignores_nonmatching_files(self):
        text = factory.getRandomString()
        src_dir, src_name = split_path(self.make_file(contents=text))
        # A second file in the same directory that the glob will not match.
        bystander_text = factory.getRandomString()
        bystander = factory.make_file(src_dir, contents=bystander_text)
        dest_dir, dest_name = self.make_target()

        move_file_by_glob(src_dir, src_name, dest_dir, dest_name)

        moved_path = os.path.join(dest_dir, dest_name)
        self.assertThat(bystander, FileContains(bystander_text))
        self.assertThat(moved_path, FileContains(text))
        # Only the bystander remains behind; only the moved file arrived.
        remaining = os.listdir(source_dir) if False else os.listdir(src_dir)
        self.assertItemsEqual([os.path.basename(bystander)], remaining)
        self.assertItemsEqual([dest_name], os.listdir(dest_dir))

    def test_move_file_by_glob_fails_if_no_files_match(self):
        empty_source_dir = self.make_dir()
        pattern = factory.make_name() + '*'
        dest_dir = self.make_dir()
        dest_name = factory.make_name()

        self.assertRaises(
            AssertionError,
            move_file_by_glob,
            empty_source_dir, pattern, dest_dir, dest_name)

    def test_move_file_by_glob_fails_if_multiple_files_match(self):
        src_dir = self.make_dir()
        # Two files, both of which match the catch-all glob below.
        for _ in range(2):
            factory.make_file(src_dir)

        self.assertRaises(
            AssertionError,
            move_file_by_glob,
            src_dir, '*', self.make_dir(), factory.make_name())

    def test_compose_filter_returns_single_literal(self):
        key = factory.make_name('key')
        literal = factory.getRandomString()
        expected = '%s~(%s)' % (key, literal)

        self.assertEqual(expected, compose_filter(key, [literal]))

    def test_compose_filter_combines_literals(self):
        key = factory.make_name('key')
        first = factory.getRandomString()
        second = factory.getRandomString()
        expected = '%s~(%s|%s)' % (key, first, second)

        self.assertEqual(expected, compose_filter(key, (first, second)))

    def test_compose_filter_escapes_literals_for_regex_use(self):
        key = factory.make_name('key')
        # The "." and "*" regex metacharacters come back backslash-escaped.
        expected = '%s~(x\\.y\\*)' % key

        self.assertEqual(expected, compose_filter(key, ['x.y*']))
150
 
 
151
 
 
152
 
class TestExtractImageTarball(PservTestCase):
    """Tests for `extract_image_tarball`."""

    def test_copies_boot_image_files_from_tarball(self):
        prefix = factory.make_name()
        kernel_content = factory.getRandomString()
        initrd_content = factory.getRandomString()
        img_content = factory.getRandomString()
        tarball_dir = self.make_dir()
        tarball_files = {
            '%s-vmlinuz.gz' % prefix: kernel_content,
            '%s-initrd.gz' % prefix: initrd_content,
            '%s.img' % prefix: img_content,
        }
        tarball = factory.make_tarball(tarball_dir, tarball_files)
        target_dir = self.make_dir()
        # Replace the uec2roottar call with a test double.
        self.patch(ephemerals_script, 'call_uec2roottar')

        extract_image_tarball(tarball, target_dir)

        # Each tarball member lands under its fixed name, content intact.
        expected_files = [
            ('linux', kernel_content),
            ('initrd.gz', initrd_content),
            ('disk.img', img_content),
        ]
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'disk.img'],
            listdir(target_dir))
        for filename, content in expected_files:
            self.assertThat(
                os.path.join(target_dir, filename),
                FileContains(content))

    def test_ignores_extraneous_files_in_tarball(self):
        prefix = factory.make_name()
        tarball_dir = self.make_dir()
        tarball_files = {
            '%s-vmlinuz.gz' % prefix: None,
            '%s-initrd.gz' % prefix: None,
            '%s.img' % prefix: None,
            # Not one of the image files; must not show up in the output.
            'HELLO.TXT': None,
        }
        tarball = factory.make_tarball(tarball_dir, tarball_files)
        target_dir = self.make_dir()
        self.patch(ephemerals_script, 'call_uec2roottar')

        extract_image_tarball(tarball, target_dir)

        extracted = listdir(target_dir)
        self.assertItemsEqual(['linux', 'initrd.gz', 'disk.img'], extracted)

    def test_runs_uec2roottar(self):
        check_call = self.patch(subprocess, 'check_call')
        fake_image = factory.make_name('image')
        fake_move = self.patch(ephemerals_script, 'move_file_by_glob')
        fake_move.return_value = fake_image
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()

        extract_image_tarball(tarball, target_dir)

        expected_command = [
            'uec2roottar',
            fake_image,
            os.path.join(target_dir, 'dist-root.tar.gz'),
        ]
        check_call.assert_called_with(expected_command)

    def test_cleans_up_temp_location(self):
        self.patch(subprocess, 'check_call')
        fake_image = factory.make_name('image')
        fake_move = self.patch(ephemerals_script, 'move_file_by_glob')
        fake_move.return_value = fake_image
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()
        temp_location = self.make_dir()

        extract_image_tarball(tarball, target_dir, temp_location)

        self.assertItemsEqual([], listdir(temp_location))

    def test_cleans_up_after_failure(self):
        # Make every subprocess.check_call invocation blow up.
        failure = ExternalProcessError(-1, "some_command")
        self.patch(subprocess, 'check_call').side_effect = failure
        fake_image = factory.make_name('image')
        fake_move = self.patch(ephemerals_script, 'move_file_by_glob')
        fake_move.return_value = fake_image
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()
        temp_location = self.make_dir()

        self.assertRaises(
            ExternalProcessError,
            extract_image_tarball, tarball, target_dir, temp_location)

        # Nothing may be left behind even though extraction failed.
        self.assertItemsEqual([], listdir(temp_location))
244
 
 
245
 
 
246
 
class TestCreateSymlinkedImageDir(PservTestCase):
    """Tests for `create_symlinked_image_dir`."""

    def make_original_dir(self):
        """Return a new directory holding kernel, initrd and root tarball."""
        directory = self.make_dir()
        for filename in ('linux', 'initrd.gz', 'dist-root.tar.gz'):
            factory.make_file(directory, filename)
        return directory

    def test_symlinks_files(self):
        original_dir = self.make_original_dir()
        temp_location = self.make_dir()

        image_dir = create_symlinked_image_dir(original_dir, temp_location)

        # The result is a fresh directory somewhere below temp_location.
        self.assertNotEqual(original_dir, image_dir)
        self.assertNotEqual(temp_location, image_dir)
        self.assertThat(image_dir, StartsWith(temp_location + '/'))
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'root.tar.gz'],
            listdir(image_dir))
        # (link name, name of the original file it must point at); note
        # that the root tarball is linked under a different name.
        links = [
            ('linux', 'linux'),
            ('initrd.gz', 'initrd.gz'),
            ('root.tar.gz', 'dist-root.tar.gz'),
        ]
        for link_name, original_name in links:
            self.assertEqual(
                os.path.join(original_dir, original_name),
                readlink(os.path.join(image_dir, link_name)))

    def test_cleans_up_temp_location(self):
        original_dir = self.make_original_dir()
        temp_location = self.make_dir()

        image_dir = create_symlinked_image_dir(original_dir, temp_location)

        # The resulting directory is the only entry left in temp_location.
        leftovers = listdir(temp_location)
        self.assertItemsEqual([os.path.basename(image_dir)], leftovers)

    def test_cleans_up_after_failure(self):
        class DeliberateFailure(RuntimeError):
            pass

        failure = DeliberateFailure("Symlinking intentionally broken")
        self.patch(ephemerals_script, 'symlink').side_effect = failure
        original_dir = self.make_dir()
        temp_location = self.make_dir()

        self.assertRaises(
            DeliberateFailure,
            create_symlinked_image_dir, original_dir, temp_location)

        self.assertItemsEqual([], listdir(temp_location))
304
 
 
305
 
 
306
 
class TestInstallImageFromSimplestreams(PservTestCase):
    """Tests for `install_image_from_simplestreams`."""

    def prepare_storage_dir(self):
        """Return a new directory holding kernel, initrd and root tarball."""
        storage = self.make_dir()
        for filename in ('linux', 'initrd.gz', 'dist-root.tar.gz'):
            factory.make_file(storage, filename)
        return storage

    def patch_config(self, tftp_root):
        """Install a fake config whose TFTP root is `tftp_root`."""
        fake_config = {'tftp': {'root': tftp_root}}
        self.useFixture(ConfigFixture(fake_config))

    def test_installs_image(self):
        tftp_root = self.make_dir()
        self.patch_config(tftp_root)
        storage_dir = self.prepare_storage_dir()
        release = factory.make_name('release')
        arch = factory.make_name('arch')

        install_image_from_simplestreams(
            storage_dir, release=release, arch=arch)

        image_path = compose_image_path(
            arch, 'generic', release, 'commissioning')
        install_dir = locate_tftp_path(image_path, tftproot=tftp_root)
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'root.tar.gz'],
            listdir(install_dir))
        # The installed kernel has the same content as the stored one.
        stored_kernel = read_text_file(os.path.join(storage_dir, 'linux'))
        self.assertThat(
            os.path.join(install_dir, 'linux'),
            FileContains(stored_kernel))

    def test_cleans_up_temp_location(self):
        self.patch(ephemerals_script, 'install_image')
        temp_location = self.make_dir()
        storage_dir = self.prepare_storage_dir()
        release = factory.make_name('release')
        arch = factory.make_name('arch')

        install_image_from_simplestreams(
            storage_dir, release=release, arch=arch,
            temp_location=temp_location)

        self.assertItemsEqual([], listdir(temp_location))

    def test_cleans_up_after_failure(self):
        class DeliberateFailure(RuntimeError):
            pass

        failure = DeliberateFailure()
        self.patch(ephemerals_script, 'install_image').side_effect = failure
        temp_location = self.make_dir()
        storage_dir = self.prepare_storage_dir()
        release = factory.make_name('release')
        arch = factory.make_name('arch')

        self.assertRaises(
            DeliberateFailure,
            install_image_from_simplestreams,
            storage_dir, release=release, arch=arch,
            temp_location=temp_location)

        # The temporary location is empty again despite the failure.
        self.assertItemsEqual([], listdir(temp_location))
368
 
 
369
 
 
370
 
def make_legacy_config(data_dir=None, arches=None, releases=None):
    """Render the text of a legacy, shell-script style config file.

    Any argument left as `None` is filled in with made-up names from the
    test factory: one name for `data_dir`, two each for `arches` and
    `releases`.

    :return: Text with one assignment line each for `DATA_DIR`, `ARCHES`
        and `RELEASES`.  Each value is shell-quoted; the list values are
        joined with single spaces before quoting.
    """
    if data_dir is None:
        data_dir = factory.make_name('datadir')
    if arches is None:
        arches = [factory.make_name('arch') for _ in range(2)]
    if releases is None:
        releases = [factory.make_name('release') for _ in range(2)]
    template = dedent("""\
    DATA_DIR=%s
    ARCHES=%s
    RELEASES=%s
    """)
    values = (data_dir, ' '.join(arches), ' '.join(releases))
    return template % tuple(quote(value) for value in values)
387
 
 
388
 
 
389
 
def install_legacy_config(testcase, contents):
    """Write `contents` to a new file and make it the legacy config.

    The file is created through `testcase.make_file`, and
    `config_module.EPHEMERALS_LEGACY_CONFIG` is patched through
    `testcase.patch` to point at it.

    :return: Path of the newly written legacy config file.
    """
    path = testcase.make_file(contents=contents)
    testcase.patch(config_module, 'EPHEMERALS_LEGACY_CONFIG', path)
    return path
397
 
 
398
 
 
399
 
class TestMakeArgParser(PservTestCase):
400
 
 
401
 
    def test_creates_parser(self):
402
 
        self.useFixture(ConfigFixture({'boot': {'ephemeral': {}}}))
403
 
        documentation = factory.getRandomString()
404
 
 
405
 
        parser = make_arg_parser(documentation)
406
 
 
407
 
        self.assertIsInstance(parser, ArgumentParser)
408
 
        self.assertEqual(documentation, parser.description)
409
 
 
410
 
    def test_defaults_to_config(self):
411
 
        images_directory = self.make_dir()
412
 
        arches = [factory.make_name('arch1'), factory.make_name('arch2')]
413
 
        releases = [factory.make_name('rel1'), factory.make_name('rel2')]
414
 
        self.useFixture(ConfigFixture({
415
 
            'boot': {
416
 
                'architectures': arches,
417
 
                'ephemeral': {
418
 
                    'images_directory': images_directory,
419
 
                    'releases': releases,
420
 
                },
421
 
            },
422
 
        }))
423
 
 
424
 
        parser = make_arg_parser(factory.getRandomString())
425
 
 
426
 
        args = parser.parse_args('')
427
 
        self.assertEqual(images_directory, args.output)
428
 
        self.assertItemsEqual(
429
 
            [
430
 
                compose_filter('arch', arches),
431
 
                compose_filter('release', releases),
432
 
            ],
433
 
            args.filters)
434
 
 
435
 
    def test_does_not_require_config(self):
436
 
        defaults = Config.get_defaults()
437
 
        no_file = os.path.join(self.make_dir(), factory.make_name() + '.yaml')
438
 
        self.useFixture(
439
 
            EnvironmentVariableFixture('MAAS_PROVISIONING_SETTINGS', no_file))
440
 
 
441
 
        parser = make_arg_parser(factory.getRandomString())
442
 
 
443
 
        args = parser.parse_args('')
444
 
        self.assertEqual(
445
 
            defaults['boot']['ephemeral']['images_directory'],
446
 
            args.output)
447
 
        self.assertItemsEqual([], args.filters)
448
 
 
449
 
    def test_does_not_modify_config(self):
450
 
        self.useFixture(ConfigFixture({
451
 
            'boot': {
452
 
                'architectures': [factory.make_name('arch')],
453
 
                'ephemeral': {
454
 
                    'images_directory': self.make_dir(),
455
 
                    'releases': [factory.make_name('release')],
456
 
                },
457
 
            },
458
 
        }))
459
 
        original_boot_config = deepcopy(Config.load_from_cache()['boot'])
460
 
        install_legacy_config(self, make_legacy_config())
461
 
 
462
 
        make_arg_parser(factory.getRandomString())
463
 
 
464
 
        self.assertEqual(
465
 
            original_boot_config,
466
 
            Config.load_from_cache()['boot'])
467
 
 
468
 
    def test_uses_legacy_config(self):
469
 
        data_dir = self.make_dir()
470
 
        self.useFixture(ConfigFixture({}))
471
 
        install_legacy_config(self, make_legacy_config(data_dir=data_dir))
472
 
 
473
 
        parser = make_arg_parser(factory.getRandomString())
474
 
 
475
 
        args = parser.parse_args('')
476
 
        self.assertEqual(data_dir, args.output)