~ubuntu-branches/ubuntu/utopic/maas/utopic-security

« back to all changes in this revision

Viewing changes to src/provisioningserver/tests/test_tasks.py

  • Committer: Package Import Robot
  • Author(s): Andres Rodriguez, Jeroen Vermeulen, Andres Rodriguez, Jason Hobbs, Raphaël Badin, Louis Bouchard, Gavin Panella
  • Date: 2014-08-21 19:36:30 UTC
  • mfrom: (1.3.1)
  • Revision ID: package-import@ubuntu.com-20140821193630-kertpu5hd8yyss8h
Tags: 1.7.0~beta7+bzr3266-0ubuntu1
* New Upstream Snapshot, Beta 7 bzr3266

[ Jeroen Vermeulen ]
* debian/extras/99-maas-sudoers
  debian/maas-dhcp.postinst
  debian/rules
  - Add second DHCP server instance for IPv6.
* debian/maas-region-controller-min.install
  debian/maas-region-controller-min.lintian-overrides
  - Install deployment user-data: maas_configure_interfaces.py script.
* debian/maas-cluster-controller.links
  debian/maas-cluster-controller.install
  debian/maas-cluster-controller.postinst
  - Reflect Celery removal changes made in trunk r3067.
  - Don't install celeryconfig_cluster.py any longer. 
  - Don't install maas_local_celeryconfig_cluster.py any longer.
  - Don't symlink maas_local_celeryconfig_cluster.py from /etc to /usr.
  - Don't insert UUID into maas_local_celeryconfig_cluster.py.

[ Andres Rodriguez ]
* debian/maas-region-controller-min.postrm: Cleanup lefover files.
* debian/maas-dhcp.postrm: Clean leftover configs.
* Provide new maas-proxy package that replaces the usage of
  squid-deb-proxy:
  - debian/control: New maas-proxy package that replaces the usage
    of squid-deb-proxy; Drop depends on squid-deb-proxy.
  - Add upstrart job.
  - Ensure squid3 is stopped as maas-proxy uses a caching proxy.
* Remove Celery references to cluster controller:
  - Rename upstart job from maas-pserv to maas-cluster; rename
    maas-cluster-celery to maas-cluster-register. Ensure services
    are stopped on upgrade.
  - debian/maintscript: Cleanup config files.
  - Remove all references to the MAAS celery daemon and config
    files as we don't use it like that anymore
* Move some entries in debian/maintscript to
  debian/maas-cluster-controller.maintscript
* Remove usage of txlongpoll and rabbitmq-server. Handle upgrades
  to ensure these are removed correctly.

[ Jason Hobbs ]
* debian/maas-region-controller-min.install: Install
  maas-generate-winrm-cert script.

[ Raphaël Badin ]
* debian/extras/maas-region-admin: Bypass django-admin as it prints
  spurious messages to stdout (LP: #1365130).

[Louis Bouchard]
* debian/maas-cluster-controller.postinst:
  - Exclude /var/log/maas/rsyslog when changing ownership
    (LP: #1346703)

[Gavin Panella]
* debian/maas-cluster-controller.maas-clusterd.upstart:
  - Don't start-up the cluster controller unless a shared-secret has
    been installed.
* debian/maas-cluster-controller.maas-cluster-register.upstart: Drop.

Show diffs side-by-side

added added

removed removed

Lines of Context:
14
14
__metaclass__ = type
15
15
__all__ = []
16
16
 
17
 
from datetime import datetime
18
 
import json
19
17
import os
20
18
import random
21
 
from subprocess import (
22
 
    CalledProcessError,
23
 
    PIPE,
24
 
    )
 
19
from subprocess import CalledProcessError
25
20
 
26
 
from apiclient.creds import convert_tuple_to_string
27
 
from apiclient.maas_client import MAASClient
28
 
from apiclient.testing.credentials import make_api_credentials
29
21
import celery
30
22
from celery import states
31
 
from celery.app import app_or_default
32
 
from celery.task import Task
33
23
from maastesting.celery import CeleryFixture
34
24
from maastesting.factory import factory
35
25
from maastesting.fakemethod import (
36
26
    FakeMethod,
37
27
    MultiFakeMethod,
38
28
    )
39
 
from maastesting.matchers import (
40
 
    MockAnyCall,
41
 
    MockCalledOnceWith,
42
 
    )
43
 
from mock import Mock
44
29
from netaddr import IPNetwork
45
 
from provisioningserver import (
46
 
    auth,
47
 
    boot_images,
48
 
    cache,
49
 
    tags,
50
 
    tasks,
51
 
    )
52
 
from provisioningserver.boot import tftppath
53
 
from provisioningserver.dhcp import (
54
 
    config,
55
 
    leases,
56
 
    )
 
30
from provisioningserver import tasks
57
31
from provisioningserver.dns.config import (
58
 
    celery_conf,
59
32
    MAAS_NAMED_CONF_NAME,
60
33
    MAAS_NAMED_CONF_OPTIONS_INSIDE_NAME,
61
34
    MAAS_NAMED_RNDC_CONF_NAME,
62
35
    MAAS_RNDC_CONF_NAME,
63
36
    )
 
37
from provisioningserver.dns.testing import patch_dns_config_path
64
38
from provisioningserver.dns.zoneconfig import (
65
39
    DNSForwardZoneConfig,
66
40
    DNSReverseZoneConfig,
67
41
    )
68
 
from provisioningserver.import_images import boot_resources
69
 
from provisioningserver.power.poweraction import PowerActionFail
70
 
from provisioningserver.tags import MissingCredentials
71
42
from provisioningserver.tasks import (
72
 
    add_new_dhcp_host_map,
73
 
    ALREADY_STOPPED_MESSAGE,
74
 
    ALREADY_STOPPED_RETURNCODE,
75
 
    enlist_nodes_from_mscm,
76
 
    enlist_nodes_from_ucsm,
77
 
    import_boot_images,
78
 
    Omshell,
79
 
    power_off,
80
 
    power_on,
81
 
    refresh_secrets,
82
 
    remove_dhcp_host_map,
83
 
    report_boot_images,
84
 
    restart_dhcp_server,
85
43
    rndc_command,
86
44
    RNDC_COMMAND_MAX_RETRY,
87
45
    setup_rndc_configuration,
88
 
    stop_dhcp_server,
89
 
    update_node_tags,
90
 
    UPDATE_NODE_TAGS_MAX_RETRY,
91
 
    write_dhcp_config,
92
46
    write_dns_config,
93
47
    write_dns_zone_config,
94
48
    write_full_dns_config,
95
49
    )
96
 
from provisioningserver.testing.boot_images import make_boot_image_params
97
 
from provisioningserver.testing.config import (
98
 
    BootSourcesFixture,
99
 
    set_tftp_root,
100
 
    )
101
50
from provisioningserver.testing.testcase import PservTestCase
102
 
from provisioningserver.utils import filter_dict
103
 
import provisioningserver.utils.fs as fs_module
104
51
from provisioningserver.utils.shell import ExternalProcessError
105
 
from testresources import FixtureResource
106
52
from testtools.matchers import (
107
 
    ContainsAll,
108
53
    Equals,
109
54
    FileExists,
110
55
    MatchesListwise,
111
56
    )
112
57
 
113
 
# An arbitrary MAC address.  Not using a properly random one here since
114
 
# we might accidentally affect real machines on the network.
115
 
arbitrary_mac = "AA:BB:CC:DD:EE:FF"
116
 
 
117
 
 
118
 
celery_config = app_or_default().conf
119
 
 
120
 
 
121
 
class TestRefreshSecrets(PservTestCase):
122
 
    """Tests for the `refresh_secrets` task."""
123
 
 
124
 
    resources = (
125
 
        ("celery", FixtureResource(CeleryFixture())),
126
 
        )
127
 
 
128
 
    def test_does_not_require_arguments(self):
129
 
        refresh_secrets()
130
 
        # Nothing is refreshed, but there is no error either.
131
 
        pass
132
 
 
133
 
    def test_breaks_on_unknown_item(self):
134
 
        self.assertRaises(AssertionError, refresh_secrets, not_an_item=None)
135
 
 
136
 
    def test_works_as_a_task(self):
137
 
        self.assertTrue(refresh_secrets.delay().successful())
138
 
 
139
 
    def test_updates_api_credentials(self):
140
 
        credentials = make_api_credentials()
141
 
        refresh_secrets(
142
 
            api_credentials=convert_tuple_to_string(credentials))
143
 
        self.assertEqual(credentials, auth.get_recorded_api_credentials())
144
 
 
145
 
    def test_updates_nodegroup_uuid(self):
146
 
        nodegroup_uuid = factory.make_name('nodegroupuuid')
147
 
        refresh_secrets(nodegroup_uuid=nodegroup_uuid)
148
 
        self.assertEqual(nodegroup_uuid, cache.cache.get('nodegroup_uuid'))
149
 
 
150
 
 
151
 
class TestPowerTasks(PservTestCase):
152
 
 
153
 
    resources = (
154
 
        ("celery", FixtureResource(CeleryFixture())),
155
 
        )
156
 
 
157
 
    def test_ether_wake_power_on_with_not_enough_template_args(self):
158
 
        # In eager test mode the assertion is raised immediately rather
159
 
        # than being stored in the AsyncResult, so we need to test for
160
 
        # that instead of using result.get().
161
 
        self.assertRaises(
162
 
            PowerActionFail, power_on.delay, "ether_wake")
163
 
 
164
 
    def test_ether_wake_power_on(self):
165
 
        result = power_on.delay(
166
 
            "ether_wake", mac_address=arbitrary_mac)
167
 
        self.assertTrue(result.successful())
168
 
 
169
 
    def test_ether_wake_does_not_support_power_off(self):
170
 
        self.assertRaises(
171
 
            PowerActionFail, power_off.delay,
172
 
            "ether_wake", mac=arbitrary_mac)
173
 
 
174
 
 
175
 
class TestDHCPTasks(PservTestCase):
176
 
 
177
 
    resources = (
178
 
        ("celery", FixtureResource(CeleryFixture())),
179
 
        )
180
 
 
181
 
    def assertRecordedStdin(self, recorder, *args):
182
 
        # Helper to check that the function recorder "recorder" has all
183
 
        # of the items mentioned in "args" which are extracted from
184
 
        # stdin.  We can just check that all the parameters that were
185
 
        # passed are being used.
186
 
        self.assertThat(
187
 
            recorder.extract_args()[0][0],
188
 
            ContainsAll(args))
189
 
 
190
 
    def make_dhcp_config_params(self):
191
 
        """Fake up a dict of dhcp configuration parameters."""
192
 
        param_names = [
193
 
            'interface',
194
 
            'subnet',
195
 
            'subnet_mask',
196
 
            'broadcast_ip',
197
 
            'dns_servers',
198
 
            'domain_name',
199
 
            'router_ip',
200
 
            'ip_range_low',
201
 
            'ip_range_high',
202
 
        ]
203
 
        return {
204
 
            'dhcp_subnets': [
205
 
                {param: factory.make_string() for param in param_names}
206
 
            ],
207
 
            'omapi_key': factory.make_string(),
208
 
        }
209
 
 
210
 
    def test_upload_dhcp_leases(self):
211
 
        self.patch(
212
 
            leases, 'parse_leases_file',
213
 
            Mock(return_value=(datetime.utcnow(), {})))
214
 
        self.patch(leases, 'process_leases', Mock())
215
 
        tasks.upload_dhcp_leases.delay()
216
 
        self.assertEqual(1, leases.process_leases.call_count)
217
 
 
218
 
    def test_add_new_dhcp_host_map(self):
219
 
        # We don't want to actually run omshell in the task, so we stub
220
 
        # out the wrapper class's _run method and record what it would
221
 
        # do.
222
 
        mac = factory.getRandomMACAddress()
223
 
        ip = factory.getRandomIPAddress()
224
 
        server_address = factory.make_string()
225
 
        key = factory.make_string()
226
 
        recorder = FakeMethod(result=(0, "hardware-type"))
227
 
        self.patch(Omshell, '_run', recorder)
228
 
        add_new_dhcp_host_map.delay({ip: mac}, server_address, key)
229
 
 
230
 
        self.assertRecordedStdin(recorder, ip, mac, server_address, key)
231
 
 
232
 
    def test_add_new_dhcp_host_map_failure(self):
233
 
        # Check that task failures are caught.  Nothing much happens in
234
 
        # the Task code right now though.
235
 
        mac = factory.getRandomMACAddress()
236
 
        ip = factory.getRandomIPAddress()
237
 
        server_address = factory.make_string()
238
 
        key = factory.make_string()
239
 
        self.patch(Omshell, '_run', FakeMethod(result=(0, "this_will_fail")))
240
 
        self.assertRaises(
241
 
            CalledProcessError, add_new_dhcp_host_map.delay,
242
 
            {mac: ip}, server_address, key)
243
 
 
244
 
    def test_remove_dhcp_host_map(self):
245
 
        # We don't want to actually run omshell in the task, so we stub
246
 
        # out the wrapper class's _run method and record what it would
247
 
        # do.
248
 
        ip = factory.getRandomIPAddress()
249
 
        server_address = factory.make_string()
250
 
        key = factory.make_string()
251
 
        recorder = FakeMethod(result=(0, "obj: <null>"))
252
 
        self.patch(Omshell, '_run', recorder)
253
 
        remove_dhcp_host_map.delay(ip, server_address, key)
254
 
 
255
 
        self.assertRecordedStdin(recorder, ip, server_address, key)
256
 
 
257
 
    def test_remove_dhcp_host_map_failure(self):
258
 
        # Check that task failures are caught.  Nothing much happens in
259
 
        # the Task code right now though.
260
 
        ip = factory.getRandomIPAddress()
261
 
        server_address = factory.make_string()
262
 
        key = factory.make_string()
263
 
        self.patch(Omshell, '_run', FakeMethod(result=(0, "this_will_fail")))
264
 
        self.assertRaises(
265
 
            CalledProcessError, remove_dhcp_host_map.delay,
266
 
            ip, server_address, key)
267
 
 
268
 
    def test_write_dhcp_config_invokes_script_correctly(self):
269
 
        mocked_proc = Mock()
270
 
        mocked_proc.returncode = 0
271
 
        mocked_proc.communicate = Mock(return_value=('output', 'error output'))
272
 
        mocked_popen = self.patch(
273
 
            fs_module, "Popen", Mock(return_value=mocked_proc))
274
 
 
275
 
        config_params = self.make_dhcp_config_params()
276
 
        write_dhcp_config(**config_params)
277
 
 
278
 
        # It should construct Popen with the right parameters.
279
 
        self.assertThat(mocked_popen, MockAnyCall(
280
 
            ["sudo", "-n", "maas-provision", "atomic-write", "--filename",
281
 
             celery_config.DHCP_CONFIG_FILE, "--mode", "0644"], stdin=PIPE))
282
 
 
283
 
        # It should then pass the content to communicate().
284
 
        content = config.get_config(**config_params).encode("ascii")
285
 
        self.assertThat(mocked_proc.communicate, MockAnyCall(content))
286
 
 
287
 
        # Similarly, it also writes the DHCPD interfaces to
288
 
        # /var/lib/maas/dhcpd-interfaces.
289
 
        self.assertThat(mocked_popen, MockAnyCall(
290
 
            [
291
 
                "sudo", "-n", "maas-provision", "atomic-write", "--filename",
292
 
                celery_config.DHCP_INTERFACES_FILE, "--mode", "0644",
293
 
            ],
294
 
            stdin=PIPE))
295
 
 
296
 
    def test_restart_dhcp_server_sends_command(self):
297
 
        self.patch(tasks, 'call_and_check')
298
 
        restart_dhcp_server()
299
 
        self.assertThat(tasks.call_and_check, MockCalledOnceWith(
300
 
            ['sudo', '-n', 'service', 'maas-dhcp-server', 'restart']))
301
 
 
302
 
    def test_stop_dhcp_server_sends_command_and_writes_empty_config(self):
303
 
        self.patch(tasks, 'call_and_check')
304
 
        self.patch(tasks, 'sudo_write_file')
305
 
        stop_dhcp_server()
306
 
        self.assertThat(tasks.call_and_check, MockCalledOnceWith(
307
 
            ['sudo', '-n', 'service', 'maas-dhcp-server', 'stop'],
308
 
            env={'LC_ALL': 'C'}))
309
 
        self.assertThat(tasks.sudo_write_file, MockCalledOnceWith(
310
 
            celery_config.DHCP_CONFIG_FILE, tasks.DISABLED_DHCP_SERVER))
311
 
 
312
 
    def test_stop_dhcp_server_ignores_already_stopped_error(self):
313
 
        # Add whitespaces around the error message to make sure they
314
 
        # are stipped off.
315
 
        output = ' ' + ALREADY_STOPPED_MESSAGE + '\n'
316
 
        exception = ExternalProcessError(
317
 
            ALREADY_STOPPED_RETURNCODE, [], output=output)
318
 
        self.patch(tasks, 'call_and_check', Mock(side_effect=exception))
319
 
        self.patch(tasks, 'sudo_write_file')
320
 
        self.assertIsNone(stop_dhcp_server())
321
 
 
322
 
    def test_stop_dhcp_server_raises_other_returncodes(self):
323
 
        # Use a returncode that is *not* ALREADY_STOPPED_RETURNCODE.
324
 
        returncode = ALREADY_STOPPED_RETURNCODE + 1
325
 
        exception = ExternalProcessError(
326
 
            returncode, [], output=ALREADY_STOPPED_MESSAGE)
327
 
        self.patch(tasks, 'call_and_check', Mock(side_effect=exception))
328
 
        self.patch(tasks, 'sudo_write_file')
329
 
        self.assertRaises(ExternalProcessError, stop_dhcp_server)
330
 
 
331
 
    def test_stop_dhcp_server_raises_other_error_outputs(self):
332
 
        # Use an error output that is *not* ALREADY_STOPPED_MESSAGE.
333
 
        output = factory.make_string()
334
 
        exception = ExternalProcessError(
335
 
            ALREADY_STOPPED_RETURNCODE, [], output=output)
336
 
        self.patch(tasks, 'call_and_check', Mock(side_effect=exception))
337
 
        self.patch(tasks, 'sudo_write_file')
338
 
        self.assertRaises(ExternalProcessError, stop_dhcp_server)
339
 
 
340
58
 
341
59
def assertTaskRetried(runner, result, nb_retries, task_name):
342
60
    # In celery version 2.5 (in Saucy) a retried tasks that eventually
362
80
        # Patch DNS_CONFIG_DIR so that the configuration files will be
363
81
        # written in a temporary directory.
364
82
        self.dns_conf_dir = self.make_dir()
365
 
        self.patch(celery_conf, 'DNS_CONFIG_DIR', self.dns_conf_dir)
 
83
        patch_dns_config_path(self, self.dns_conf_dir)
366
84
        # Record the calls to 'execute_rndc_command' (instead of
367
85
        # executing real rndc commands).
368
86
        self.rndc_recorder = FakeMethod()
390
108
                )),
391
109
            result)
392
110
 
393
 
    def test_write_dns_config_attached_to_dns_worker_queue(self):
394
 
        self.assertEqual(
395
 
            write_dns_config.queue,
396
 
            celery_config.WORKER_QUEUE_DNS)
397
 
 
398
111
    def test_write_dns_zone_config_writes_file(self):
399
112
        command = factory.make_string()
400
113
        domain = factory.make_string()
429
142
                )),
430
143
            result)
431
144
 
432
 
    def test_write_dns_zone_config_attached_to_dns_worker_queue(self):
433
 
        self.assertEqual(
434
 
            write_dns_zone_config.queue,
435
 
            celery_config.WORKER_QUEUE_DNS)
436
 
 
437
145
    def test_setup_rndc_configuration_writes_files(self):
438
146
        command = factory.make_string()
439
147
        result = setup_rndc_configuration.delay(
456
164
                )),
457
165
            result)
458
166
 
459
 
    def test_setup_rndc_configuration_attached_to_dns_worker_queue(self):
460
 
        self.assertEqual(
461
 
            setup_rndc_configuration.queue,
462
 
            celery_config.WORKER_QUEUE_DNS)
463
 
 
464
167
    def test_rndc_command_execute_command(self):
465
168
        command = factory.make_string()
466
169
        result = rndc_command.delay(command)
504
207
            ExternalProcessError, rndc_command.delay,
505
208
            command, retry=True)
506
209
 
507
 
    def test_rndc_command_attached_to_dns_worker_queue(self):
508
 
        self.assertEqual(rndc_command.queue, celery_config.WORKER_QUEUE_DNS)
509
 
 
510
210
    def test_write_full_dns_config_sets_up_config(self):
511
211
        # write_full_dns_config writes the config file, writes
512
212
        # the zone files, and reloads the dns service.
527
227
        result = write_full_dns_config.delay(
528
228
            zones=zones,
529
229
            callback=rndc_command.subtask(args=[command]),
530
 
            upstream_dns=factory.getRandomIPAddress())
 
230
            upstream_dns=factory.make_ipv4_address())
531
231
 
532
232
        forward_file_name = 'zone.%s' % domain
533
233
        reverse_file_name = 'zone.0.168.192.in-addr.arpa'
550
250
                    FileExists(),
551
251
                    FileExists(),
552
252
                )))
553
 
 
554
 
    def test_write_full_dns_attached_to_dns_worker_queue(self):
555
 
        self.assertEqual(
556
 
            write_full_dns_config.queue,
557
 
            celery_config.WORKER_QUEUE_DNS)
558
 
 
559
 
 
560
 
class TestBootImagesTasks(PservTestCase):
561
 
 
562
 
    resources = (
563
 
        ("celery", FixtureResource(CeleryFixture())),
564
 
        )
565
 
 
566
 
    def test_sends_boot_images_to_server(self):
567
 
        self.useFixture(set_tftp_root(self.make_dir()))
568
 
        self.set_maas_url()
569
 
        auth.record_api_credentials(':'.join(make_api_credentials()))
570
 
        image = make_boot_image_params()
571
 
        self.patch(tftppath, 'list_boot_images', Mock(return_value=[image]))
572
 
        self.patch(boot_images, "get_cluster_uuid")
573
 
        self.patch(MAASClient, 'post')
574
 
 
575
 
        report_boot_images.delay()
576
 
 
577
 
        args, kwargs = MAASClient.post.call_args
578
 
        self.assertItemsEqual([image], json.loads(kwargs['images']))
579
 
 
580
 
 
581
 
class TestTagTasks(PservTestCase):
582
 
 
583
 
    def setUp(self):
584
 
        super(TestTagTasks, self).setUp()
585
 
        self.celery = self.useFixture(CeleryFixture())
586
 
 
587
 
    def test_update_node_tags_can_be_retried(self):
588
 
        self.set_secrets()
589
 
        # The update_node_tags task can be retried.
590
 
        # Simulate a temporary failure.
591
 
        number_of_failures = UPDATE_NODE_TAGS_MAX_RETRY
592
 
        raised_exception = MissingCredentials(
593
 
            factory.make_name('exception'), random.randint(100, 200))
594
 
        simulate_failures = MultiFakeMethod(
595
 
            [FakeMethod(failure=raised_exception)] * number_of_failures +
596
 
            [FakeMethod()])
597
 
        self.patch(tags, 'process_node_tags', simulate_failures)
598
 
        tag = factory.make_string()
599
 
        result = update_node_tags.delay(
600
 
            tag, '//node', tag_nsmap=None, retry=True)
601
 
        assertTaskRetried(
602
 
            self, result, UPDATE_NODE_TAGS_MAX_RETRY + 1,
603
 
            'provisioningserver.tasks.update_node_tags')
604
 
 
605
 
    def test_update_node_tags_is_retried_a_limited_number_of_times(self):
606
 
        self.set_secrets()
607
 
        # If we simulate UPDATE_NODE_TAGS_MAX_RETRY + 1 failures, the
608
 
        # task fails.
609
 
        number_of_failures = UPDATE_NODE_TAGS_MAX_RETRY + 1
610
 
        raised_exception = MissingCredentials(
611
 
            factory.make_name('exception'), random.randint(100, 200))
612
 
        simulate_failures = MultiFakeMethod(
613
 
            [FakeMethod(failure=raised_exception)] * number_of_failures +
614
 
            [FakeMethod()])
615
 
        self.patch(tags, 'process_node_tags', simulate_failures)
616
 
        tag = factory.make_string()
617
 
        self.assertRaises(
618
 
            MissingCredentials, update_node_tags.delay, tag,
619
 
            '//node', tag_nsmap=None, retry=True)
620
 
 
621
 
 
622
 
class TestImportBootImages(PservTestCase):
623
 
 
624
 
    def make_archive_url(self, name=None):
625
 
        if name is None:
626
 
            name = factory.make_name('archive')
627
 
        return 'http://%s.example.com/%s' % (name, factory.make_name('path'))
628
 
 
629
 
    def patch_boot_resources_function(self):
630
 
        """Patch out `boot_resources.import_images`.
631
 
 
632
 
        Returns the installed fake.  After the fake has been called, but not
633
 
        before, its `env` attribute will have a copy of the environment dict.
634
 
        """
635
 
 
636
 
        class CaptureEnv:
637
 
            """Fake function; records a copy of the environment."""
638
 
 
639
 
            def __call__(self, *args, **kwargs):
640
 
                self.args = args
641
 
                self.env = os.environ.copy()
642
 
 
643
 
        return self.patch(boot_resources, 'import_images', CaptureEnv())
644
 
 
645
 
    def test_import_boot_images_integrates_with_boot_resources_function(self):
646
 
        # If the config specifies no sources, nothing will be imported.  But
647
 
        # the task succeeds without errors.
648
 
        fixture = self.useFixture(BootSourcesFixture([]))
649
 
        self.patch(boot_resources, 'logger')
650
 
        self.patch(boot_resources, 'locate_config').return_value = (
651
 
            fixture.filename)
652
 
        import_boot_images(sources=[])
653
 
        self.assertIsInstance(import_boot_images, Task)
654
 
 
655
 
    def test_import_boot_images_sets_GPGHOME(self):
656
 
        home = factory.make_name('home')
657
 
        self.patch(tasks, 'MAAS_USER_GPGHOME', home)
658
 
        fake = self.patch_boot_resources_function()
659
 
        import_boot_images(sources=[])
660
 
        self.assertEqual(home, fake.env['GNUPGHOME'])
661
 
 
662
 
    def test_import_boot_images_sets_proxy_if_given(self):
663
 
        proxy = 'http://%s.example.com' % factory.make_name('proxy')
664
 
        proxy_vars = ['http_proxy', 'https_proxy']
665
 
        fake = self.patch_boot_resources_function()
666
 
        import_boot_images(sources=[], http_proxy=proxy)
667
 
        self.assertEqual(
668
 
            {
669
 
                var: proxy
670
 
                for var in proxy_vars
671
 
            },
672
 
            filter_dict(fake.env, proxy_vars))
673
 
 
674
 
    def test_import_boot_images_leaves_proxy_unchanged_if_not_given(self):
675
 
        proxy_vars = ['http_proxy', 'https_proxy']
676
 
        fake = self.patch_boot_resources_function()
677
 
        import_boot_images(sources=[])
678
 
        self.assertEqual({}, filter_dict(fake.env, proxy_vars))
679
 
 
680
 
    def test_import_boot_images_calls_callback(self):
681
 
        self.patch_boot_resources_function()
682
 
        mock_callback = Mock()
683
 
        import_boot_images(sources=[], callback=mock_callback)
684
 
        self.assertThat(mock_callback.delay, MockCalledOnceWith())
685
 
 
686
 
    def test_import_boot_images_accepts_sources_parameter(self):
687
 
        fake = self.patch(boot_resources, 'import_images')
688
 
        sources = [
689
 
            {
690
 
                'path': "http://example.com",
691
 
                'selections': [
692
 
                    {
693
 
                        'release': "trusty",
694
 
                        'arches': ["amd64"],
695
 
                        'subarches': ["generic"],
696
 
                        'labels': ["release"]
697
 
                    },
698
 
                ],
699
 
            },
700
 
        ]
701
 
        import_boot_images(sources=sources)
702
 
        self.assertThat(fake, MockCalledOnceWith(sources))
703
 
 
704
 
 
705
 
class TestAddUCSM(PservTestCase):
706
 
 
707
 
    def test_enlist_nodes_from_ucsm(self):
708
 
        url = 'url'
709
 
        username = 'username'
710
 
        password = 'password'
711
 
        mock = self.patch(tasks, 'probe_and_enlist_ucsm')
712
 
        enlist_nodes_from_ucsm(url, username, password)
713
 
        self.assertThat(mock, MockCalledOnceWith(url, username, password))
714
 
 
715
 
 
716
 
class TestAddMSCM(PservTestCase):
717
 
 
718
 
    def test_enlist_nodes_from_mscm(self):
719
 
        host = 'host'
720
 
        username = 'username'
721
 
        password = 'password'
722
 
        mock = self.patch(tasks, 'probe_and_enlist_mscm')
723
 
        enlist_nodes_from_mscm(host, username, password)
724
 
        self.assertThat(mock, MockCalledOnceWith(host, username, password))