~lutostag/ubuntu/trusty/maas/1.5.4+keystone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
# Copyright 2005-2013 Canonical Ltd.  This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).

"""Tests for the maastftp Twisted plugin."""

from __future__ import (
    absolute_import,
    print_function,
    unicode_literals,
    )

str = None

__metaclass__ = type
__all__ = []

from functools import partial
import json
from os import path
from urllib import urlencode
from urlparse import (
    parse_qsl,
    urlparse,
    )

from maastesting.factory import factory
from maastesting.testcase import MAASTestCase
from provisioningserver import tftp as tftp_module
from provisioningserver.pxe.tftppath import compose_config_path
from provisioningserver.tests.test_kernel_opts import make_kernel_parameters
from provisioningserver.tftp import (
    BytesReader,
    TFTPBackend,
    TFTPService,
    )
from testtools.deferredruntest import AsynchronousDeferredRunTest
from testtools.matchers import (
    AfterPreprocessing,
    AllMatch,
    Equals,
    IsInstance,
    MatchesAll,
    MatchesStructure,
    )
from tftp.backend import IReader
from tftp.protocol import TFTP
from twisted.application import internet
from twisted.application.service import MultiService
from twisted.internet import reactor
from twisted.internet.defer import (
    inlineCallbacks,
    succeed,
    )
from twisted.python import context
from zope.interface.verify import verifyObject


class TestBytesReader(MAASTestCase):
    """Tests for `provisioningserver.tftp.BytesReader`."""

    def test_interfaces(self):
        # A BytesReader instance must satisfy the IReader interface.
        empty_reader = BytesReader(b"")
        self.addCleanup(empty_reader.finish)
        verifyObject(IReader, empty_reader)

    def test_read(self):
        # Ten bytes consumed seven at a time: one full chunk, then the
        # three-byte remainder, then the empty string.
        payload = factory.getRandomString(size=10).encode("ascii")
        payload_reader = BytesReader(payload)
        self.addCleanup(payload_reader.finish)
        for expected_chunk in (payload[:7], payload[7:], b""):
            self.assertEqual(expected_chunk, payload_reader.read(7))

    def test_finish(self):
        # Reading from a reader that has been finished raises ValueError.
        finished_reader = BytesReader(b"1234")
        finished_reader.finish()
        self.assertRaises(ValueError, finished_reader.read, 1)


class TestTFTPBackendRegex(MAASTestCase):
    """Tests for `provisioningserver.tftp.TFTPBackend.re_config_file`."""

    @staticmethod
    def get_example_path_and_components():
        """Build a sample config path plus the groups it should yield.

        The path is produced by `compose_config_path` from a random MAC
        address.  The accompanying dict holds the named groups that a
        `re_config_file` match against that path is expected to report.
        """
        mac = factory.getRandomMACAddress("-")
        expected_groups = {"mac": mac, "arch": None, "subarch": None}
        return compose_config_path(mac), expected_groups

    def test_re_config_file_is_compatible_with_config_path_generator(self):
        # Whatever the PXE config path generator emits, the regular
        # expression must be able to take apart again.
        pattern = TFTPBackend.re_config_file
        for _ in range(10):
            sample_path, expected = self.get_example_path_and_components()
            found = pattern.match(sample_path)
            self.assertIsNotNone(found, sample_path)
            self.assertEqual(expected, found.groupdict())

    def test_re_config_file_with_leading_slash(self):
        # A leading forward slash is tolerated by the regular expression,
        # in the same way the TFTP server tolerates it.
        sample_path, expected = self.get_example_path_and_components()
        # Force exactly one leading slash onto the path.
        slashed_path = "/" + sample_path.lstrip("/")
        found = TFTPBackend.re_config_file.match(slashed_path)
        self.assertIsNotNone(found, slashed_path)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_without_leading_slash(self):
        # The absence of a leading forward slash is tolerated by the
        # regular expression, in the same way the TFTP server tolerates it.
        sample_path, expected = self.get_example_path_and_components()
        # Strip any leading slash from the path.
        bare_path = sample_path.lstrip("/")
        found = TFTPBackend.re_config_file.match(bare_path)
        self.assertIsNotNone(found, bare_path)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_matches_classic_pxelinux_cfg(self):
        # The classic location is "pxelinux.cfg/..." with no leading
        # slash; the regex must accept it.
        mac = 'aa-bb-cc-dd-ee-ff'
        expected = {'mac': mac, 'arch': None, 'subarch': None}
        config_path = 'pxelinux.cfg/01-%s' % mac
        found = TFTPBackend.re_config_file.match(config_path)
        self.assertIsNotNone(found)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_matches_pxelinux_cfg_with_leading_slash(self):
        # Same as the classic case, but with a leading slash on the path.
        mac = 'aa-bb-cc-dd-ee-ff'
        expected = {'mac': mac, 'arch': None, 'subarch': None}
        config_path = '/pxelinux.cfg/01-%s' % mac
        found = TFTPBackend.re_config_file.match(config_path)
        self.assertIsNotNone(found)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_does_not_match_non_config_file(self):
        # A file under pxelinux.cfg that is not a config name is rejected.
        found = TFTPBackend.re_config_file.match('pxelinux.cfg/kernel')
        self.assertIsNone(found)

    def test_re_config_file_does_not_match_file_in_root(self):
        # A config-like name outside of any directory is rejected.
        found = TFTPBackend.re_config_file.match('01-aa-bb-cc-dd-ee-ff')
        self.assertIsNone(found)

    def test_re_config_file_does_not_match_file_not_in_pxelinux_cfg(self):
        # A config-like name in some other directory is rejected.
        found = TFTPBackend.re_config_file.match('foo/01-aa-bb-cc-dd-ee-ff')
        self.assertIsNone(found)

    def test_re_config_file_with_default(self):
        # "default" matches, with every named group left unset.
        expected = {'mac': None, 'arch': None, 'subarch': None}
        found = TFTPBackend.re_config_file.match('pxelinux.cfg/default')
        self.assertIsNotNone(found)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_with_default_arch(self):
        # "default.<arch>" matches and reports the architecture.
        arch = factory.make_name('arch', sep='')
        expected = {'mac': None, 'arch': arch, 'subarch': None}
        config_path = 'pxelinux.cfg/default.%s' % arch
        found = TFTPBackend.re_config_file.match(config_path)
        self.assertIsNotNone(found)
        self.assertEqual(expected, found.groupdict())

    def test_re_config_file_with_default_arch_and_subarch(self):
        # "default.<arch>-<subarch>" matches and reports both parts.
        arch = factory.make_name('arch', sep='')
        subarch = factory.make_name('subarch', sep='')
        expected = {'mac': None, 'arch': arch, 'subarch': subarch}
        config_path = 'pxelinux.cfg/default.%s-%s' % (arch, subarch)
        found = TFTPBackend.re_config_file.match(config_path)
        self.assertIsNotNone(found)
        self.assertEqual(expected, found.groupdict())


class TestTFTPBackend(MAASTestCase):
    """Tests for `provisioningserver.tftp.TFTPBackend`."""

    # Several tests below are `inlineCallbacks` generators, so they need
    # the Deferred-aware test runner.
    run_tests_with = AsynchronousDeferredRunTest.make_factory(timeout=5)

    def test_init(self):
        # A freshly constructed backend is read-only and remembers both the
        # base directory and the generator URL it was given.
        temp_dir = self.make_dir()
        generator_url = "http://%s.example.com/%s" % (
            factory.make_name("domain"), factory.make_name("path"))
        backend = TFTPBackend(temp_dir, generator_url)
        self.assertEqual((True, False), (backend.can_read, backend.can_write))
        self.assertEqual(temp_dir, backend.base.path)
        self.assertEqual(generator_url, backend.generator_url.geturl())

    def test_get_generator_url(self):
        # get_generator_url() merges the parameters obtained from the
        # request (here just the MAC address) into the query string of the
        # configured generator URL, keeping the parameters already there.
        mac = factory.getRandomMACAddress("-")
        dummy = factory.make_name("dummy").encode("ascii")
        backend_url = b"http://example.com/?" + urlencode({b"dummy": dummy})
        backend = TFTPBackend(self.make_dir(), backend_url)
        # params is an example of the parameters obtained from a request.
        params = {"mac": mac}
        generator_url = urlparse(backend.get_generator_url(params))
        self.assertEqual("example.com", generator_url.hostname)
        query = parse_qsl(generator_url.query)
        query_expected = [
            ("dummy", dummy),
            ("mac", mac),
            ]
        self.assertItemsEqual(query_expected, query)

    @inlineCallbacks
    def test_get_reader_regular_file(self):
        # TFTPBackend.get_reader() returns a reader over the file on disk
        # for paths not matching re_config_file.
        data = factory.getRandomString().encode("ascii")
        temp_file = self.make_file(name="example", contents=data)
        temp_dir = path.dirname(temp_file)
        backend = TFTPBackend(temp_dir, "http://nowhere.example.com/")
        reader = yield backend.get_reader("example")
        self.addCleanup(reader.finish)
        self.assertEqual(len(data), reader.size)
        self.assertEqual(data, reader.read(len(data)))
        self.assertEqual(b"", reader.read(1))

    @inlineCallbacks
    def test_get_reader_config_file(self):
        # For paths matching re_config_file, TFTPBackend.get_reader() returns
        # a Deferred that will yield whatever get_config_reader() produces.
        cluster_uuid = factory.getRandomUUID()
        self.patch(tftp_module, 'get_cluster_uuid').return_value = (
            cluster_uuid)
        mac = factory.getRandomMACAddress("-")
        config_path = compose_config_path(mac)
        backend = TFTPBackend(self.make_dir(), b"http://example.com/")
        # python-tx-tftp sets up call context so that backends can discover
        # more about the environment in which they're running.
        call_context = {
            "local": (
                factory.getRandomIPAddress(),
                factory.getRandomPort()),
            "remote": (
                factory.getRandomIPAddress(),
                factory.getRandomPort()),
            }

        # Replace get_config_reader() with a stub that echoes the
        # parameters it receives back as JSON, so they can be inspected.
        @partial(self.patch, backend, "get_config_reader")
        def get_config_reader(params):
            params_json = json.dumps(params)
            params_json_reader = BytesReader(params_json)
            return succeed(params_json_reader)

        reader = yield context.call(
            call_context, backend.get_reader, config_path)
        # Release the reader when the test ends, as the other tests do.
        self.addCleanup(reader.finish)
        output = reader.read(10000)
        # Of each (address, port) pair in the call context only the address
        # is expected among the parameters; the port is dropped.
        expected_params = {
            "mac": mac,
            "local": call_context["local"][0],  # address only.
            "remote": call_context["remote"][0],  # address only.
            "cluster_uuid": cluster_uuid,
            }
        observed_params = json.loads(output)
        self.assertEqual(expected_params, observed_params)

    @inlineCallbacks
    def test_get_config_reader_returns_rendered_params(self):
        # get_config_reader() takes a dict() of parameters and returns an
        # `IReader` of a PXE configuration, rendered by `render_pxe_config`.
        backend = TFTPBackend(self.make_dir(), b"http://example.com/")
        # Fake configuration parameters, as discovered from the file path.
        fake_params = {"mac": factory.getRandomMACAddress("-")}
        # Fake kernel configuration parameters, as returned from the API call.
        fake_kernel_params = make_kernel_parameters()

        # Stub get_page to return the fake API configuration parameters.
        fake_get_page_result = json.dumps(fake_kernel_params._asdict())
        get_page_patch = self.patch(backend, "get_page")
        get_page_patch.return_value = succeed(fake_get_page_result)

        # Stub render_pxe_config to return a recognisable canned value.
        fake_render_result = factory.make_name("render")
        render_patch = self.patch(backend, "render_pxe_config")
        render_patch.return_value = fake_render_result

        # Get the rendered configuration, which will actually be the canned
        # value returned by the render_pxe_config stub.
        reader = yield backend.get_config_reader(fake_params)
        self.addCleanup(reader.finish)
        self.assertIsInstance(reader, BytesReader)
        output = reader.read(10000)

        # The kernel parameters were fetched using `backend.get_page`,
        # exactly once.  The call count is checked explicitly because on
        # `mock` releases that predate `Mock.assert_called_once` that name
        # is merely an auto-created child mock, so calling it never fails.
        self.assertEqual(1, backend.get_page.call_count)

        # The result has been rendered by `backend.render_pxe_config`.
        self.assertEqual(fake_render_result.encode("utf-8"), output)
        backend.render_pxe_config.assert_called_once_with(
            kernel_params=fake_kernel_params, **fake_params)


class TestTFTPService(MAASTestCase):
    """Tests for `provisioningserver.tftp.TFTPService`."""

    def test_tftp_service(self):
        # The service is a MultiService holding one UDP server per address
        # reported by get_all_interface_addresses(), plus a refresher.
        addresses = [factory.getRandomIPAddress() for _ in range(2)]
        self.patch(
            tftp_module, "get_all_interface_addresses", lambda: addresses)
        root_dir = self.make_dir()
        generator = "http://example.com/generator"
        port = factory.getRandomPort()
        service = TFTPService(root=root_dir, generator=generator, port=port)
        service.updateServers()

        self.assertIsInstance(service, MultiService)

        # The refresher re-runs updateServers() with a step of 45.
        refresher_matcher = MatchesStructure.byEquality(
            step=45, parent=service, name="refresher",
            call=(service.updateServers, (), {}),
            )
        self.assertThat(service.refresher, refresher_matcher)

        # Every server wraps a TFTP protocol whose backend was built from
        # the root and generator given to the service.
        backend_matcher = MatchesAll(
            IsInstance(TFTPBackend),
            AfterPreprocessing(
                lambda candidate: candidate.base.path,
                Equals(root_dir)),
            AfterPreprocessing(
                lambda candidate: candidate.generator_url.geturl(),
                Equals(generator)))
        protocol_matcher = MatchesAll(
            IsInstance(TFTP),
            AfterPreprocessing(
                lambda candidate: candidate.backend,
                backend_matcher))
        server_matcher = MatchesAll(
            IsInstance(internet.UDPServer),
            AfterPreprocessing(
                lambda candidate: len(candidate.args),
                Equals(2)),
            AfterPreprocessing(
                lambda candidate: candidate.args[0],  # port
                Equals(port)),
            AfterPreprocessing(
                lambda candidate: candidate.args[1],  # protocol
                protocol_matcher))
        self.assertThat(service.getServers(), AllMatch(server_matcher))

        # The servers differ from one another only in the interface
        # keyword argument.
        observed_kwargs = [
            server.kwargs for server in service.getServers()]
        expected_kwargs = [
            {"interface": address} for address in addresses]
        self.assertItemsEqual(observed_kwargs, expected_kwargs)

    def test_tftp_service_rebinds_on_HUP(self):
        # The set of addresses the service starts out with.  The patched
        # function hands back this very object, so later mutations of it
        # are visible to the service.
        addresses = {"1.1.1.1", "2.2.2.2"}
        self.patch(
            tftp_module, "get_all_interface_addresses", lambda: addresses)

        root_dir = self.make_dir()
        port = factory.getRandomPort()
        service = TFTPService(
            root=root_dir, generator="http://mighty/wind", port=port)
        service.updateServers()

        # Each child service carries the name of the interface address it
        # was created for.
        names_before = {server.name for server in service.getServers()}
        self.assertEqual(addresses, names_before)

        # Swap one address out for a new one.
        addresses.remove("1.1.1.1")
        addresses.add("3.3.3.3")

        # Have the service reconcile its servers with the new set.
        service.updateServers()

        # This test already runs in the reactor thread, so the reactor is
        # stepped forward by hand.
        reactor.runUntilCurrent()

        # The servers now present correspond to the updated set.
        names_after = {server.name for server in service.getServers()}
        self.assertEqual(addresses, names_after)