~blake-rouse/maas/osystem-preseed-cleanup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
# Copyright 2012-2014 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Tests for the tftppath module."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

str = None

__metaclass__ = type
__all__ = []

import errno
import os.path

from maastesting.factory import factory
from maastesting.testcase import MAASTestCase
from mock import Mock
from provisioningserver.boot import tftppath
from provisioningserver.boot.tftppath import (
    compose_image_path,
    drill_down,
    extend_path,
    is_visible_subdir,
    list_boot_images,
    list_subdirs,
    locate_tftp_path,
    )
from provisioningserver.driver import (
    OperatingSystem,
    OperatingSystemRegistry,
    )
from provisioningserver.testing.boot_images import (
    make_boot_image_storage_params,
    )
from provisioningserver.testing.config import set_tftp_root
from testtools.matchers import (
    Not,
    StartsWith,
    )
from testtools.testcase import ExpectedException


class FakeOS(OperatingSystem):

    name = ""
    title = ""

    def __init__(self, name, purpose, releases=None):
        self.name = name
        self.title = name
        self.purpose = purpose
        if releases is None:
            self.fake_list = [
                factory.getRandomString()
                for _ in range(3)
                ]
        else:
            self.fake_list = releases

    def get_boot_image_purposes(self, *args):
        return self.purpose

    def get_supported_releases(self):
        return self.fake_list

    def get_default_release(self):
        return self.fake_list[0]

    def format_release_choices(self, releases):
        return [
            (release, release)
            for release in releases
            if release in self.fake_list
            ]


def make_image(params, purpose):
    """Describe an image as a dict similar to what `list_boot_images` returns.

    The `params` are as returned from `make_boot_image_storage_params`.
    """
    image = params.copy()
    image['purpose'] = purpose
    return image


def make_osystem(testcase, osystem, purpose):
    """Makes the operating system class and registers it."""
    if osystem not in OperatingSystemRegistry:
        fake = FakeOS(osystem, purpose)
        OperatingSystemRegistry.register_item(fake.name, fake)
        testcase.addCleanup(
            OperatingSystemRegistry.unregister_item, osystem)
        return fake

    else:

        obj = OperatingSystemRegistry[osystem]
        old_func = obj.get_boot_image_purposes
        testcase.patch(obj, 'get_boot_image_purposes').return_value = purpose

        def reset_func(obj, old_func):
            obj.get_boot_image_purposes = old_func

        testcase.addCleanup(reset_func, obj, old_func)

        return obj


class TestTFTPPath(MAASTestCase):

    def setUp(self):
        super(TestTFTPPath, self).setUp()
        self.tftproot = self.make_dir()
        self.useFixture(set_tftp_root(self.tftproot))

    def make_image_dir(self, image_params, tftproot):
        """Fake a boot image matching `image_params` under `tftproot`."""
        image_dir = locate_tftp_path(
            compose_image_path(
                osystem=image_params['osystem'],
                arch=image_params['architecture'],
                subarch=image_params['subarchitecture'],
                release=image_params['release'],
                label=image_params['label']),
            tftproot)
        os.makedirs(image_dir)
        factory.make_file(image_dir, 'linux')
        factory.make_file(image_dir, 'initrd.gz')

    def test_compose_image_path_follows_storage_directory_layout(self):
        osystem = factory.make_name('osystem')
        arch = factory.make_name('arch')
        subarch = factory.make_name('subarch')
        release = factory.make_name('release')
        label = factory.make_name('label')
        self.assertEqual(
            '%s/%s/%s/%s/%s' % (osystem, arch, subarch, release, label),
            compose_image_path(osystem, arch, subarch, release, label))

    def test_compose_image_path_does_not_include_tftp_root(self):
        osystem = factory.make_name('osystem')
        arch = factory.make_name('arch')
        subarch = factory.make_name('subarch')
        release = factory.make_name('release')
        label = factory.make_name('label')
        self.assertThat(
            compose_image_path(osystem, arch, subarch, release, label),
            Not(StartsWith(self.tftproot)))

    def test_locate_tftp_path_prefixes_tftp_root(self):
        pxefile = factory.make_name('pxefile')
        self.assertEqual(
            os.path.join(self.tftproot, pxefile),
            locate_tftp_path(pxefile, tftproot=self.tftproot))

    def test_locate_tftp_path_returns_root_when_path_is_None(self):
        self.assertEqual(
            self.tftproot, locate_tftp_path(None, tftproot=self.tftproot))

    def test_list_boot_images_copes_with_missing_directory(self):
        self.assertEqual([], list_boot_images(factory.getRandomString()))

    def test_list_boot_images_passes_on_other_exceptions(self):
        error = OSError(errno.EACCES, "Deliberate error for testing.")
        self.patch(tftppath, 'list_subdirs', Mock(side_effect=error))
        with ExpectedException(OSError):
            list_boot_images(factory.getRandomString())

    def test_list_boot_images_copes_with_empty_directory(self):
        self.assertEqual([], list_boot_images(self.tftproot))

    def test_list_boot_images_copes_with_unexpected_files(self):
        os.makedirs(os.path.join(self.tftproot, factory.make_name('empty')))
        factory.make_file(self.tftproot)
        self.assertEqual([], list_boot_images(self.tftproot))

    def test_list_boot_images_finds_boot_image(self):
        params = make_boot_image_storage_params()
        self.make_image_dir(params, self.tftproot)
        purposes = ['install', 'commissioning', 'xinstall']
        make_osystem(self, params['osystem'], purposes)
        self.assertItemsEqual(
            [make_image(params, purpose) for purpose in purposes],
            list_boot_images(self.tftproot))

    def test_list_boot_images_enumerates_boot_images(self):
        purposes = ['install', 'commissioning', 'xinstall']
        params = [make_boot_image_storage_params() for counter in range(3)]
        for param in params:
            self.make_image_dir(param, self.tftproot)
            make_osystem(self, param['osystem'], purposes)
        self.assertItemsEqual(
            [
                make_image(param, purpose)
                for param in params
                for purpose in purposes
            ],
            list_boot_images(self.tftproot))

    def test_list_boot_images_empty_on_missing_osystems(self):
        params = [make_boot_image_storage_params() for counter in range(3)]
        for param in params:
            self.make_image_dir(param, self.tftproot)
        self.assertItemsEqual([], list_boot_images(self.tftproot))

    def test_is_visible_subdir_ignores_regular_files(self):
        plain_file = self.make_file()
        self.assertFalse(
            is_visible_subdir(
                os.path.dirname(plain_file), os.path.basename(plain_file)))

    def test_is_visible_subdir_ignores_hidden_directories(self):
        base_dir = self.make_dir()
        hidden_dir = factory.make_name('.')
        os.makedirs(os.path.join(base_dir, hidden_dir))
        self.assertFalse(is_visible_subdir(base_dir, hidden_dir))

    def test_is_visible_subdir_recognizes_subdirectory(self):
        base_dir = self.make_dir()
        subdir = factory.make_name('subdir')
        os.makedirs(os.path.join(base_dir, subdir))
        self.assertTrue(is_visible_subdir(base_dir, subdir))

    def test_list_subdirs_lists_empty_directory(self):
        self.assertItemsEqual([], list_subdirs(self.make_dir()))

    def test_list_subdirs_lists_subdirs(self):
        base_dir = self.make_dir()
        factory.make_file(base_dir, factory.make_name('plain-file'))
        subdir = factory.make_name('subdir')
        os.makedirs(os.path.join(base_dir, subdir))
        self.assertItemsEqual([subdir], list_subdirs(base_dir))

    def test_extend_path_finds_path_extensions(self):
        base_dir = self.make_dir()
        subdirs = [
            factory.make_name('subdir-%d' % counter)
            for counter in range(3)]
        for subdir in subdirs:
            os.makedirs(os.path.join(base_dir, subdir))
        self.assertItemsEqual(
            [[os.path.basename(base_dir), subdir] for subdir in subdirs],
            extend_path(
                os.path.dirname(base_dir), [os.path.basename(base_dir)]))

    def test_extend_path_builds_on_given_paths(self):
        base_dir = self.make_dir()
        lower_dir = factory.make_name('lower')
        subdir = factory.make_name('sub')
        os.makedirs(os.path.join(base_dir, lower_dir, subdir))
        self.assertEqual(
            [[lower_dir, subdir]],
            extend_path(base_dir, [lower_dir]))

    def test_extend_path_stops_if_no_subdirs_found(self):
        self.assertItemsEqual([], extend_path(self.make_dir(), []))

    def test_drill_down_follows_directory_tree(self):
        base_dir = self.make_dir()
        lower_dir = factory.make_name('lower')
        os.makedirs(os.path.join(base_dir, lower_dir))
        subdirs = [
            factory.make_name('subdir-%d' % counter)
            for counter in range(3)]
        for subdir in subdirs:
            os.makedirs(os.path.join(base_dir, lower_dir, subdir))
        self.assertItemsEqual(
            [[lower_dir, subdir] for subdir in subdirs],
            drill_down(base_dir, [[lower_dir]]))

    def test_drill_down_ignores_subdir_not_in_path(self):
        base_dir = self.make_dir()
        irrelevant_dir = factory.make_name('irrelevant')
        irrelevant_subdir = factory.make_name('subdir')
        relevant_dir = factory.make_name('relevant')
        relevant_subdir = factory.make_name('subdir')
        os.makedirs(os.path.join(base_dir, irrelevant_dir, irrelevant_subdir))
        os.makedirs(os.path.join(base_dir, relevant_dir, relevant_subdir))
        self.assertEqual(
            [[relevant_dir, relevant_subdir]],
            drill_down(base_dir, [[relevant_dir]]))

    def test_drill_down_drops_paths_that_do_not_go_deep_enough(self):
        base_dir = self.make_dir()
        shallow_dir = factory.make_name('shallow')
        os.makedirs(os.path.join(base_dir, shallow_dir))
        deep_dir = factory.make_name('deep')
        subdir = factory.make_name('sub')
        os.makedirs(os.path.join(base_dir, deep_dir, subdir))
        self.assertEqual(
            [[deep_dir, subdir]],
            drill_down(base_dir, [[shallow_dir], [deep_dir]]))