~ubuntu-branches/ubuntu/saucy/maas/saucy-updates

« back to all changes in this revision

Viewing changes to src/provisioningserver/import_images/tests/test_ephemerals_script.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez
  • Date: 2013-08-28 11:17:44 UTC
  • mfrom: (1.2.14)
  • Revision ID: package-import@ubuntu.com-20130828111744-a8hqmsmvvc1wnanc
Tags: 1.4+bzr1655+dfsg-0ubuntu1
* New Upstream release. (LP: #1218526)
* debian/control:
  - Depends on python-djorm-ext-pgarray, python-curtin,
    python-simplestreams, ubuntu-cloud-keyring.
  - Depends on maas-dns, maas-dhcp to get them seeded and
    into main (LP: #1227353)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2013 Canonical Ltd.  This software is licensed under the
 
2
# GNU Affero General Public License version 3 (see the file LICENSE).
 
3
 
 
4
"""Tests for `ephemerals_script`."""
 
5
 
 
6
from __future__ import (
 
7
    absolute_import,
 
8
    print_function,
 
9
    unicode_literals,
 
10
    )
 
11
 
 
12
__metaclass__ = type
 
13
__all__ = []
 
14
 
 
15
from os import (
 
16
    listdir,
 
17
    readlink,
 
18
    )
 
19
import os.path
 
20
import subprocess
 
21
 
 
22
from maastesting.factory import factory
 
23
from provisioningserver.import_images import ephemerals_script
 
24
from provisioningserver.import_images.ephemerals_script import (
 
25
    copy_file_by_glob,
 
26
    create_symlinked_image_dir,
 
27
    extract_image_tarball,
 
28
    install_image_from_simplestreams,
 
29
    )
 
30
from provisioningserver.pxe.tftppath import (
 
31
    compose_image_path,
 
32
    locate_tftp_path,
 
33
    )
 
34
from provisioningserver.testing.config import ConfigFixture
 
35
from provisioningserver.testing.testcase import PservTestCase
 
36
from testtools.matchers import (
 
37
    FileContains,
 
38
    StartsWith,
 
39
    )
 
40
from provisioningserver.utils import read_text_file
 
41
 
 
42
 
 
43
def split_path(path):
    """Return directory and filename component of a file path.

    :param path: A file path, as accepted by `os.path.dirname` and
        `os.path.basename`.
    :return: A tuple `(directory, filename)`.
    """
    directory = os.path.dirname(path)
    filename = os.path.basename(path)
    return directory, filename
 
46
 
 
47
 
 
48
class TestHelpers(PservTestCase):
    """Tests for `copy_file_by_glob`."""

    def make_target(self):
        """Return an existing directory, and nonexistent filename."""
        return self.make_dir(), factory.make_name()

    def test_copy_file_by_glob_copies_file(self):
        content = factory.getRandomString()
        source_dir, source_name = split_path(self.make_file(contents=content))
        target_dir, target_name = self.make_target()

        # Build the glob from the first three characters of the source name.
        copy_file_by_glob(
            source_dir, source_name[:3] + '*',
            target_dir, target_name)

        # The source file still holds its content...
        self.assertThat(
            os.path.join(source_dir, source_name),
            FileContains(content))
        # ...and the target file holds the same content.
        self.assertThat(
            os.path.join(target_dir, target_name),
            FileContains(content))

    def test_copy_by_file_returns_target_path(self):
        source_dir, source_name = split_path(self.make_file())
        target_dir, target_name = self.make_target()

        target = copy_file_by_glob(
            source_dir, source_name, target_dir, target_name)

        self.assertEqual(os.path.join(target_dir, target_name), target)

    def test_copy_file_by_glob_ignores_nonmatching_files(self):
        content = factory.getRandomString()
        source_dir, source_name = split_path(self.make_file(contents=content))
        # A second file in the same directory, which the glob does not name.
        other_content = factory.getRandomString()
        other_file = factory.make_file(source_dir, contents=other_content)
        target_dir, target_name = self.make_target()

        copy_file_by_glob(source_dir, source_name, target_dir, target_name)

        self.assertThat(other_file, FileContains(other_content))
        self.assertThat(
            os.path.join(target_dir, target_name),
            FileContains(content))
        # Both source files are still in place; only one file was written
        # to the target directory.
        self.assertItemsEqual(
            [source_name, os.path.basename(other_file)],
            os.listdir(source_dir))
        self.assertItemsEqual([target_name], os.listdir(target_dir))

    def test_copy_file_by_glob_fails_if_no_files_match(self):
        # The source directory is empty, so the glob matches nothing.
        self.assertRaises(
            AssertionError,
            copy_file_by_glob,
            self.make_dir(), factory.make_name() + '*',
            self.make_dir(), factory.make_name())

    def test_copy_file_by_glob_fails_if_multiple_files_match(self):
        source_dir = self.make_dir()
        factory.make_file(source_dir)
        factory.make_file(source_dir)

        # '*' matches both files in source_dir.
        self.assertRaises(
            AssertionError,
            copy_file_by_glob,
            source_dir, '*', self.make_dir(), factory.make_name())
 
112
 
 
113
 
 
114
class TestExtractImageTarball(PservTestCase):
    """Tests for `extract_image_tarball`."""

    def test_copies_boot_image_files_from_tarball(self):
        prefix = factory.make_name()
        kernel_content = factory.getRandomString()
        initrd_content = factory.getRandomString()
        img_content = factory.getRandomString()
        tarball = factory.make_tarball(self.make_dir(), {
            '%s-vmlinuz.gz' % prefix: kernel_content,
            '%s-initrd.gz' % prefix: initrd_content,
            '%s.img' % prefix: img_content,
            })
        target_dir = self.make_dir()
        # Patch out call_uec2roottar for the duration of this test.
        self.patch(ephemerals_script, 'call_uec2roottar')

        extract_image_tarball(tarball, target_dir)

        # The prefixed tarball members end up under fixed names.
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'disk.img'],
            listdir(target_dir))
        self.assertThat(
            os.path.join(target_dir, 'linux'),
            FileContains(kernel_content))
        self.assertThat(
            os.path.join(target_dir, 'initrd.gz'),
            FileContains(initrd_content))
        self.assertThat(
            os.path.join(target_dir, 'disk.img'),
            FileContains(img_content))

    def test_ignores_extraneous_files_in_tarball(self):
        prefix = factory.make_name()
        tarball = factory.make_tarball(self.make_dir(), {
            '%s-vmlinuz.gz' % prefix: None,
            '%s-initrd.gz' % prefix: None,
            '%s.img' % prefix: None,
            'HELLO.TXT': None,
            })
        target_dir = self.make_dir()
        # Patch out call_uec2roottar for the duration of this test.
        self.patch(ephemerals_script, 'call_uec2roottar')

        extract_image_tarball(tarball, target_dir)

        # HELLO.TXT must not show up in the target directory.
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'disk.img'],
            listdir(target_dir))

    def test_runs_uec2roottar(self):
        check_call = self.patch(subprocess, 'check_call')
        fake_image = factory.make_name('image')
        # With copy_file_by_glob patched to return fake_image, the tarball
        # named below does not need to exist on disk.
        self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
            fake_image)
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()

        extract_image_tarball(tarball, target_dir)

        check_call.assert_called_with([
            'uec2roottar',
            fake_image,
            os.path.join(target_dir, 'dist-root.tar.gz'),
            ])

    def test_cleans_up_temp_location(self):
        self.patch(subprocess, 'check_call')
        fake_image = factory.make_name('image')
        self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
            fake_image)
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()
        temp_location = self.make_dir()

        extract_image_tarball(tarball, target_dir, temp_location)

        # Nothing is left behind in temp_location.
        self.assertItemsEqual([], listdir(temp_location))

    def test_cleans_up_after_failure(self):
        class DeliberateFailure(RuntimeError):
            pass

        # Make the patched subprocess.check_call raise.
        self.patch(subprocess, 'check_call').side_effect = DeliberateFailure()
        fake_image = factory.make_name('image')
        self.patch(ephemerals_script, 'copy_file_by_glob').return_value = (
            fake_image)
        tarball = factory.make_name('tarball') + '.tar.gz'
        target_dir = self.make_dir()
        temp_location = self.make_dir()

        self.assertRaises(
            DeliberateFailure,
            extract_image_tarball, tarball, target_dir, temp_location)

        # Even after the failure, temp_location is empty.
        self.assertItemsEqual([], listdir(temp_location))
 
208
 
 
209
 
 
210
class TestCreateSymlinkedImageDir(PservTestCase):
    """Tests for `create_symlinked_image_dir`."""

    def make_original_dir(self):
        """Create a directory with the kernel, initrd and root tarball."""
        original_dir = self.make_dir()
        factory.make_file(original_dir, 'linux')
        factory.make_file(original_dir, 'initrd.gz')
        factory.make_file(original_dir, 'dist-root.tar.gz')
        return original_dir

    def test_symlinks_files(self):
        original_dir = self.make_original_dir()
        temp_location = self.make_dir()

        image_dir = create_symlinked_image_dir(original_dir, temp_location)

        # The result is a new directory located inside temp_location.
        self.assertNotEqual(original_dir, image_dir)
        self.assertNotEqual(temp_location, image_dir)
        self.assertThat(image_dir, StartsWith(temp_location + '/'))
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'root.tar.gz'],
            listdir(image_dir))
        # Each entry is a symlink back into original_dir.
        self.assertEqual(
            os.path.join(original_dir, 'linux'),
            readlink(os.path.join(image_dir, 'linux')))
        self.assertEqual(
            os.path.join(original_dir, 'initrd.gz'),
            readlink(os.path.join(image_dir, 'initrd.gz')))
        # The 'root.tar.gz' link points at 'dist-root.tar.gz'.
        self.assertEqual(
            os.path.join(original_dir, 'dist-root.tar.gz'),
            readlink(os.path.join(image_dir, 'root.tar.gz')))

    def test_cleans_up_temp_location(self):
        original_dir = self.make_original_dir()
        temp_location = self.make_dir()

        image_dir = create_symlinked_image_dir(original_dir, temp_location)

        # Nothing is left in temp_location except the result.
        self.assertItemsEqual(
            [os.path.basename(image_dir)],
            listdir(temp_location))

    def test_cleans_up_after_failure(self):
        class DeliberateFailure(RuntimeError):
            pass

        # Make the patched ephemerals_script.symlink raise.
        self.patch(ephemerals_script, 'symlink').side_effect = (
            DeliberateFailure("Symlinking intentionally broken"))
        original_dir = self.make_dir()
        temp_location = self.make_dir()

        self.assertRaises(
            DeliberateFailure,
            create_symlinked_image_dir, original_dir, temp_location)

        # After the failure, temp_location is empty.
        self.assertItemsEqual([], listdir(temp_location))
 
268
 
 
269
 
 
270
class TestInstallImageFromSimplestreams(PservTestCase):
    """Tests for `install_image_from_simplestreams`."""

    def prepare_storage_dir(self):
        """Set up a storage directory with kernel, initrd, and root tarball."""
        storage = self.make_dir()
        factory.make_file(storage, 'linux')
        factory.make_file(storage, 'initrd.gz')
        factory.make_file(storage, 'dist-root.tar.gz')
        return storage

    def patch_config(self, tftp_root):
        """Set up a fake config, pointing to the given TFTP root directory."""
        self.useFixture(ConfigFixture({'tftp': {'root': tftp_root}}))

    def test_installs_image(self):
        tftp_root = self.make_dir()
        self.patch_config(tftp_root)
        storage_dir = self.prepare_storage_dir()
        release = factory.make_name('release')
        arch = factory.make_name('arch')

        install_image_from_simplestreams(
            storage_dir, release=release, arch=arch)

        # Look for the image under the 'generic' / 'commissioning' path
        # for this arch and release.
        install_dir = locate_tftp_path(
            compose_image_path(arch, 'generic', release, 'commissioning'),
            tftproot=tftp_root)
        self.assertItemsEqual(
            ['linux', 'initrd.gz', 'root.tar.gz'],
            listdir(install_dir))
        # The installed kernel has the same content as the one in storage.
        self.assertThat(
            os.path.join(install_dir, 'linux'),
            FileContains(read_text_file(os.path.join(storage_dir, 'linux'))))

    def test_cleans_up_temp_location(self):
        # Patch out install_image for the duration of this test.
        self.patch(ephemerals_script, 'install_image')
        temp_location = self.make_dir()
        storage_dir = self.prepare_storage_dir()

        install_image_from_simplestreams(
            storage_dir, release=factory.make_name('release'),
            arch=factory.make_name('arch'), temp_location=temp_location)

        # Nothing is left behind in temp_location.
        self.assertItemsEqual([], listdir(temp_location))

    def test_cleans_up_after_failure(self):
        class DeliberateFailure(RuntimeError):
            pass

        # Make the patched ephemerals_script.install_image raise.
        self.patch(ephemerals_script, 'install_image').side_effect = (
            DeliberateFailure())
        temp_location = self.make_dir()
        storage_dir = self.prepare_storage_dir()

        self.assertRaises(
            DeliberateFailure,
            install_image_from_simplestreams,
            storage_dir, release=factory.make_name('release'),
            arch=factory.make_name('arch'), temp_location=temp_location)

        # After the failure, temp_location is empty.
        self.assertItemsEqual([], listdir(temp_location))