~hazmat/pyjuju/proposed-support

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
import logging

from twisted.internet.defer import (
    Deferred, inlineCallbacks, fail, returnValue, succeed)

from juju.errors import ProviderInteractionError
from juju.lib.mocker import MATCH
from juju.machine.tests.test_constraints import series_constraints
from juju.providers.dummy import DummyMachine, MachineProvider
from juju.state.errors import StopWatcher
from juju.state.firewall import FirewallManager
from juju.state.machine import MachineStateManager
from juju.state.service import ServiceStateManager
from juju.state.tests.test_service import ServiceStateManagerTestBase


def _is_dummy_machine(candidate):
    """Return True when `candidate` is a `DummyMachine` instance."""
    return isinstance(candidate, DummyMachine)


# Mocker argument matcher used in the provider mocks below: it accepts
# any `DummyMachine` instance in the machine argument position.
MATCH_MACHINE = MATCH(_is_dummy_machine)


class FirewallTestBase(ServiceStateManagerTestBase):
    """Common fixture for tests that exercise `FirewallManager`.

    The firewall manager is normally driven by the provisioning agent;
    this base class supplies the minimal stand-ins for that agent
    (`is_running`, `start`, `stop`, `provide_machine`) plus helpers that
    return deferreds fired once the manager has notified its observers
    about an expected set of units or machines.
    """

    @inlineCallbacks
    def setUp(self):
        """Build the provider, firewall manager and log capture."""
        yield super(FirewallTestBase, self).setUp()
        self._running = True

        environment = self.config.get_default()
        self.environment = environment
        provider = environment.get_machine_provider()
        self.provider = provider

        # The manager polls `is_running` to find out whether the
        # (simulated) agent is still alive.
        self.firewall_manager = FirewallManager(
            self.client, self.is_running, provider)
        self.service_state_manager = ServiceStateManager(self.client)
        self.output = self.capture_logging(level=logging.DEBUG)

    # Scaffolding standing in for the provisioning agent, which is what
    # normally runs the FirewallManager.

    def is_running(self):
        """Report whether the simulated agent is still running."""
        return self._running

    def start(self):
        """Hook the firewall manager up to service state changes."""
        on_change = self.firewall_manager.watch_service_changes
        self.service_state_manager.watch_service_states(on_change)

    def stop(self):
        """Mark the simulated agent as stopped."""
        self._running = False

    @inlineCallbacks
    def provide_machine(self, machine_state):
        """Start a provider machine and record its instance id.

        The instance id of the first machine returned by the provider is
        stored on `machine_state`.
        """
        request = {"machine-id": machine_state.id}
        started = yield self.provider.start_machine(request)
        yield machine_state.set_instance_id(started[0].instance_id)

    def wait_on_expected_units(self, expected):
        """Return a deferred fired once all `expected` unit names are seen.

        `expected` is a set of unit names; each is recorded when the
        firewall manager notifies its open/close-ports observers about
        the corresponding unit state.
        """
        done = Deferred()
        observed = set()

        def observer(unit_state):
            observed.add(unit_state.unit_name)
            # Background activity can satisfy the condition repeatedly,
            # but a deferred may only be fired once.
            if expected <= observed and not done.called:
                done.callback(True)
            return succeed(True)

        self.firewall_manager.add_open_close_ports_observer(observer)
        return done

    def wait_on_expected_machines(self, expected):
        """Return a deferred fired once all `expected` machine IDs are seen.

        `expected` is a set of machine ids; each is recorded when the
        firewall manager notifies its per-machine open/close-ports
        observers.
        """
        done = Deferred()
        observed = set()

        def observer(machine_id):
            observed.add(machine_id)
            # Background activity can satisfy the condition repeatedly,
            # but a deferred may only be fired once.
            if expected <= observed and not done.called:
                done.callback(True)
            return succeed(True)

        register = self.firewall_manager.add_open_close_ports_on_machine_observer
        register(observer)
        return done


class FirewallServiceTest(FirewallTestBase):
    """Tests that service-level changes notify the firewall manager.

    Each test registers an observer through `wait_on_expected_units`
    and then changes service state (exposed flag, units, ports),
    checking which unit names the firewall manager reports.
    """

    @inlineCallbacks
    def test_service_exposed_flag_changes(self):
        """Verify that a service unit is checked whenever a change
        occurs such that ports may need to be opened and/or closed
        for the machine corresponding to a given service unit.
        """
        self.start()
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0"]))
        wordpress = yield self.add_service("wordpress")
        yield wordpress.add_unit_state()
        yield wordpress.set_exposed_flag()
        self.assertTrue((yield expected_units))

        # Then clear the flag, see that it triggers on the expected units
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0"]))
        yield wordpress.clear_exposed_flag()
        self.assertTrue((yield expected_units))

        # Re-expose wordpress: set the flag again, verify that it
        # triggers on the expected units
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0"]))
        yield wordpress.set_exposed_flag()
        self.assertTrue((yield expected_units))
        self.stop()

    @inlineCallbacks
    def test_add_remove_service_units_for_exposed_service(self):
        """Verify that adding/removing service units for an exposed
        service triggers the appropriate firewall management of
        opening/closing ports on the machines for the corresponding
        service units.
        """
        self.start()
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()

        # Adding service units to this exposed service will trigger
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1"]))
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress.add_unit_state()
        self.assertTrue((yield expected_units))

        # Removing service units will also trigger
        expected_units = self.wait_on_expected_units(
            set(["wordpress/2"]))
        yield wordpress.remove_unit_state(wordpress_0)
        yield wordpress.add_unit_state()
        self.assertTrue((yield expected_units))
        self.stop()

    @inlineCallbacks
    def test_open_close_ports(self):
        """Verify that opening/closing ports triggers the appropriate
        firewall management for the corresponding service units.
        """
        self.start()
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0"]))
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()
        wordpress_0 = yield wordpress.add_unit_state()
        wordpress_1 = yield wordpress.add_unit_state()
        yield wordpress.add_unit_state()
        yield wordpress_0.open_port(443, "tcp")
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_0.close_port(443, "tcp")
        self.assertTrue((yield expected_units))

        expected_units = self.wait_on_expected_units(
            set(["wordpress/1", "wordpress/3"]))
        wordpress_3 = yield wordpress.add_unit_state()
        yield wordpress_1.open_port(53, "udp")
        yield wordpress_3.open_port(80, "tcp")
        self.assertTrue((yield expected_units))

        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1", "wordpress/3"]))
        yield wordpress.clear_exposed_flag()
        self.assertTrue((yield expected_units))
        self.stop()

    @inlineCallbacks
    def test_remove_service_state(self):
        """Verify that firewall mgmt for corresponding service units
        is triggered upon the service's removal.
        """
        self.start()
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1"]))
        wordpress = yield self.add_service("wordpress")
        yield wordpress.add_unit_state()
        yield wordpress.add_unit_state()
        yield wordpress.set_exposed_flag()
        self.assertTrue((yield expected_units))

        # Do not clear the exposed flag prior to removal, triggering
        # should still occur as expected
        yield self.service_state_manager.remove_service_state(wordpress)
        self.stop()

    @inlineCallbacks
    def test_port_mgmt_for_unexposed_service_is_a_nop(self):
        """Verify that activity on an unexposed service does NOT
        trigger firewall mgmt for the corresponding service unit."""
        self.start()
        # The unit name is a placeholder: the deferred is only used to
        # detect whether the observer fired at all.
        expected_units = self.wait_on_expected_units(
            set(["not-called"]))
        wordpress = yield self.add_service("wordpress")
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress_0.open_port(53, "tcp")
        # The observer should not be called in this case
        self.assertFalse(expected_units.called)
        self.stop()

    @inlineCallbacks
    def test_provisioning_agent_restart(self):
        """Verify that firewall management is correct if the agent restarts.

        In particular, this test verifies that all state relevant for
        firewall management is stored in ZK and not in the agent
        itself.
        """
        # Store into ZK relevant state, this might have been observed
        # in a scenario in which the agent has previously been
        # running.
        wordpress = yield self.add_service("wordpress")
        wordpress_0 = yield wordpress.add_unit_state()
        wordpress_1 = yield wordpress.add_unit_state()
        yield wordpress_1.open_port(443, "tcp")
        yield wordpress_1.open_port(80, "tcp")
        yield wordpress.set_exposed_flag()

        # Now simulate agent start
        self.start()

        # Verify the expected service units are observed as needing
        # firewall mgmt
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1"]))
        yield wordpress_0.open_port(53, "udp")
        yield wordpress_1.close_port(443, "tcp")
        self.assertTrue((yield expected_units))

        # Also verify that opening/closing ports work as expected
        expected_units = self.wait_on_expected_units(
            set(["wordpress/1"]))
        yield wordpress_1.close_port(80, "tcp")
        # Wait on the deferred before re-arming below; otherwise this
        # step registers an observer but never checks that it fired.
        self.assertTrue((yield expected_units))

        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1"]))
        yield wordpress.clear_exposed_flag()
        self.assertTrue((yield expected_units))
        self.stop()


class FirewallMachineTest(FirewallTestBase):
    """Tests of per-machine port management against the provider.

    These tests assign exposed (or unexposed) service units to machines
    and compare the ports reported by the machine provider with the
    ports the units have opened.
    """

    def add_machine_state(self):
        """Add a new machine state using the shared series constraints."""
        manager = MachineStateManager(self.client)
        return manager.add_machine_state(series_constraints)

    @inlineCallbacks
    def get_provider_ports(self, machine):
        """Return the ports the provider reports as opened for `machine`.

        Looks up the provider machine via the instance id stored on the
        machine state, so the machine must already have been provided.
        """
        instance_id = yield machine.get_instance_id()
        machine_provider = yield self.provider.get_machine(instance_id)
        provider_ports = yield self.provider.get_opened_ports(
            machine_provider, machine.id)
        returnValue(provider_ports)

    # Without @inlineCallbacks this generator function is never
    # iterated, so none of the assertions below would be executed.
    @inlineCallbacks
    def test_open_close_ports_on_machine(self):
        """Verify opening/closing ports on a machine works properly.

        In particular this is done without watch support."""
        machine = yield self.add_machine_state()
        # The machine has to be provisioned before ports can be opened
        # on it (see test_open_close_ports_on_machine_not_yet_provided).
        yield self.provide_machine(machine)
        yield self.firewall_manager.process_machine(machine)

        # Expose a service
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_0.open_port(443, "tcp")
        yield wordpress_0.assign_to_machine(machine)
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set([(80, "tcp"), (443, "tcp")]))
        self.assertIn("Opened 80/tcp on provider machine 0",
                      self.output.getvalue())
        self.assertIn("Opened 443/tcp on provider machine 0",
                      self.output.getvalue())

        # Now change port setup
        yield wordpress_0.open_port(8080, "tcp")
        yield wordpress_0.close_port(443, "tcp")
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set([(80, "tcp"), (8080, "tcp")]))
        self.assertIn("Opened 8080/tcp on provider machine 0",
                      self.output.getvalue())
        self.assertIn("Closed 443/tcp on provider machine 0",
                      self.output.getvalue())

    @inlineCallbacks
    def test_open_close_ports_on_unassigned_machine(self):
        """Verify corner case that nothing happens on an unassigned machine."""
        machine = yield self.add_machine_state()
        yield self.provide_machine(machine)
        yield self.firewall_manager.process_machine(machine)
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set())

    @inlineCallbacks
    def test_open_close_ports_on_machine_unexposed_service(self):
        """Verify ports are only opened once the service is exposed.

        Ports opened by a unit of an unexposed service must not show up
        on the provider; after exposing the service they must. In
        particular this is done without watch support."""
        machine = yield self.add_machine_state()
        yield self.provide_machine(machine)
        wordpress = yield self.add_service("wordpress")
        wordpress_0 = yield wordpress.add_unit_state()

        # Port activity, but service is not exposed
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_0.open_port(443, "tcp")
        yield wordpress_0.assign_to_machine(machine)
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set())

        # Now expose it
        yield wordpress.set_exposed_flag()
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set([(80, "tcp"), (443, "tcp")]))

    @inlineCallbacks
    def test_open_close_ports_on_machine_not_yet_provided(self):
        """Verify that opening/closing ports will eventually succeed
        once a machine is provided.
        """
        machine = yield self.add_machine_state()
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_0.open_port(443, "tcp")
        yield wordpress_0.assign_to_machine(machine)

        # First attempt to open ports quietly fails (except for
        # logging) because the machine has not yet been provisioned
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertIn("No provisioned machine for machine 0",
                      self.output.getvalue())

        yield self.provide_machine(machine)
        # Machine is now provisioned (normally visible in the
        # provisioning agent through periodic rescan and corresponding
        # watches)
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set([(80, "tcp"), (443, "tcp")]))

    @inlineCallbacks
    def test_open_close_ports_in_stopped_agent_stops_watch(self):
        """Verify code called by watches properly stops when agent stops."""
        self.stop()
        yield self.assertFailure(
            self.firewall_manager.open_close_ports_on_machine(0),
            StopWatcher)

    @inlineCallbacks
    def test_watches_trigger_port_mgmt(self):
        """Verify that watches properly trigger firewall management
        for the corresponding service units on the corresponding
        machines.
        """
        self.start()

        # Immediately expose
        drupal = yield self.add_service("drupal")
        wordpress = yield self.add_service("wordpress")
        yield drupal.set_exposed_flag()
        yield wordpress.set_exposed_flag()

        # Then add these units
        drupal_0 = yield drupal.add_unit_state()
        wordpress_0 = yield wordpress.add_unit_state()
        wordpress_1 = yield wordpress.add_unit_state()
        wordpress_2 = yield wordpress.add_unit_state()

        # Assign some machines; in particular verify that multiple
        # service units on one machine works properly with opening
        # firewall
        machine_0 = yield self.add_machine_state()
        machine_1 = yield self.add_machine_state()
        machine_2 = yield self.add_machine_state()
        yield self.provide_machine(machine_0)
        yield self.provide_machine(machine_1)
        yield self.provide_machine(machine_2)

        yield drupal_0.assign_to_machine(machine_0)
        yield wordpress_0.assign_to_machine(machine_0)
        yield wordpress_1.assign_to_machine(machine_1)
        yield wordpress_2.assign_to_machine(machine_2)

        # Simulate service units opening ports
        expected_machines = self.wait_on_expected_machines(set([0, 1]))
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1", "drupal/0"]))
        yield drupal_0.open_port(8080, "tcp")
        yield drupal_0.open_port(443, "tcp")
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_1.open_port(80, "tcp")
        self.assertTrue((yield expected_units))
        self.assertTrue((yield expected_machines))
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))
        self.assertEqual((yield self.get_provider_ports(machine_1)),
                         set([(80, "tcp")]))

        # Simulate service units close port
        expected_machines = self.wait_on_expected_machines(set([1, 2]))
        yield wordpress_1.close_port(80, "tcp")
        yield wordpress_2.open_port(80, "tcp")
        self.assertTrue((yield expected_machines))
        self.assertEqual((yield self.get_provider_ports(machine_1)), set())

        # Simulate service units open port
        expected_machines = self.wait_on_expected_machines(set([0]))
        yield wordpress_0.open_port(53, "udp")
        self.assertTrue((yield expected_machines))
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(53, "udp"), (80, "tcp"),
                              (443, "tcp"), (8080, "tcp")]))
        self.stop()

    @inlineCallbacks
    def test_late_expose_properly_triggers(self):
        """Verify that an expose flag properly cascades the
        corresponding watches to perform the desired firewall mgmt.
        """
        self.start()
        drupal = yield self.add_service("drupal")
        wordpress = yield self.add_service("wordpress")

        # Then add these units
        drupal_0 = yield drupal.add_unit_state()
        wordpress_0 = yield wordpress.add_unit_state()
        wordpress_1 = yield wordpress.add_unit_state()

        machine_0 = yield self.add_machine_state()
        machine_1 = yield self.add_machine_state()
        yield self.provide_machine(machine_0)
        yield self.provide_machine(machine_1)

        yield drupal_0.assign_to_machine(machine_0)
        yield wordpress_0.assign_to_machine(machine_0)
        yield wordpress_1.assign_to_machine(machine_1)

        # Simulate service units opening ports
        expected_machines = self.wait_on_expected_machines(set([0, 1]))
        expected_units = self.wait_on_expected_units(
            set(["wordpress/0", "wordpress/1"]))
        yield drupal_0.open_port(8080, "tcp")
        yield drupal_0.open_port(443, "tcp")
        yield wordpress_0.open_port(80, "tcp")
        yield wordpress_1.open_port(80, "tcp")
        yield wordpress.set_exposed_flag()
        self.assertTrue((yield expected_units))
        self.assertTrue((yield expected_machines))
        # Only wordpress is exposed so far, so drupal's ports on
        # machine 0 must not be opened yet.
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(80, "tcp")]))
        self.assertEqual((yield self.get_provider_ports(machine_1)),
                         set([(80, "tcp")]))

        # Expose drupal service, verify ports are opened on provider
        expected_machines = self.wait_on_expected_machines(set([0]))
        expected_units = self.wait_on_expected_units(set(["drupal/0"]))
        yield drupal.set_exposed_flag()
        self.assertTrue((yield expected_machines))
        self.assertTrue((yield expected_units))
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))

        # Unexpose drupal service, verify only wordpress ports are now opened
        expected_machines = self.wait_on_expected_machines(set([0]))
        expected_units = self.wait_on_expected_units(set(["drupal/0"]))
        yield drupal.clear_exposed_flag()
        self.assertTrue((yield expected_machines))
        self.assertTrue((yield expected_units))
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(80, "tcp")]))

        # Re-expose drupal service, verify ports are once again opened
        expected_machines = self.wait_on_expected_machines(set([0]))
        expected_units = self.wait_on_expected_units(set(["drupal/0"]))
        yield drupal.set_exposed_flag()
        self.assertTrue((yield expected_machines))
        self.assertTrue((yield expected_units))
        self.assertEqual((yield self.get_provider_ports(machine_0)),
                         set([(80, "tcp"), (443, "tcp"), (8080, "tcp")]))
        self.stop()

    @inlineCallbacks
    def test_open_close_ports_on_machine_will_retry(self):
        """Verify port mgmt for a machine will retry if there's a failure."""
        # Script three consecutive open_port calls on the provider: two
        # simulated failures, then a passthrough to the real method.
        mock_provider = self.mocker.patch(MachineProvider)
        mock_provider.open_port(MATCH_MACHINE, 0, 80, "tcp")
        self.mocker.result(fail(
                TypeError("'NoneType' object is not iterable")))
        mock_provider.open_port(MATCH_MACHINE, 0, 80, "tcp")
        self.mocker.result(fail(
                ProviderInteractionError("Some sort of EC2 problem")))
        mock_provider.open_port(MATCH_MACHINE, 0, 80, "tcp")
        self.mocker.passthrough()
        self.mocker.replay()

        machine = yield self.add_machine_state()
        yield self.provide_machine(machine)

        # Expose a service and attempt to open/close ports. The first
        # attempt will see the simulated failure.
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress_0.assign_to_machine(machine)
        yield self.firewall_manager.process_machine(machine)

        yield wordpress_0.open_port(80, "tcp")
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set())
        self.assertIn(
            "Got exception in opening/closing ports, will retry",
            self.output.getvalue())
        self.assertIn("TypeError: 'NoneType' object is not iterable",
                      self.output.getvalue())

        # Retries will now happen in the periodic recheck. First one
        # still fails due to simulated error.
        yield self.firewall_manager.process_machine(machine)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set())
        self.assertIn("ProviderInteractionError: Some sort of EC2 problem",
                      self.output.getvalue())

        # Third time is the charm in the mock setup, the recheck succeeds
        yield self.firewall_manager.process_machine(machine)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set([(80, "tcp")]))
        self.assertIn("Opened 80/tcp on provider machine 0",
                      self.output.getvalue())

    @inlineCallbacks
    def test_process_machine_ignores_stop_watcher(self):
        """Verify that process machine catches `StopWatcher`.

        `process_machine` calls `open_close_ports_on_machine`, which
        as verified in an earlier test, raises a `StopWatcher`
        exception to shutdown watches that use it in the event of
        agent shutdown. Verify this dual usage does not cause issues
        while the agent is being stopped for this usage.
        """
        mock_provider = self.mocker.patch(MachineProvider)
        mock_provider.open_port(MATCH_MACHINE, 0, 80, "tcp")
        self.mocker.result(fail(
                TypeError("'NoneType' object is not iterable")))
        self.mocker.replay()

        machine = yield self.add_machine_state()
        yield self.provide_machine(machine)

        # Expose a service and attempt to open/close ports. The first
        # attempt will see the simulated failure.
        wordpress = yield self.add_service("wordpress")
        yield wordpress.set_exposed_flag()
        wordpress_0 = yield wordpress.add_unit_state()
        yield wordpress_0.assign_to_machine(machine)
        yield self.firewall_manager.process_machine(machine)

        yield wordpress_0.open_port(80, "tcp")
        yield self.firewall_manager.open_close_ports_on_machine(machine.id)
        self.assertEqual((yield self.get_provider_ports(machine)),
                         set())
        self.assertIn(
            "Got exception in opening/closing ports, will retry",
            self.output.getvalue())
        self.assertIn("TypeError: 'NoneType' object is not iterable",
                      self.output.getvalue())

        # Stop the provisioning agent
        self.stop()

        # Retries can potentially still happen anyway; just make
        # certain nothing bad happens.
        yield self.firewall_manager.process_machine(machine)


class Observer(object):
    """Callable that records each invocation in a shared call log.

    `calls` is a one-element list holding a set. The set is looked up
    on every call rather than captured at construction time, so the
    owner can reset the log by assigning a new set to `calls[0]`.
    """

    def __init__(self, calls, name):
        self.calls = calls
        self.name = name

    def __call__(self, obj):
        """Add a `(name, obj)` pair to the current call log."""
        log = self.calls[0]
        entry = (self.name, obj)
        log.add(entry)


class FirewallObserversTest(FirewallTestBase):
    """Tests that registered observers are notified by the manager."""

    @inlineCallbacks
    def test_observe_open_close_ports(self):
        """Verify one or more observers can be established on action."""
        wordpress = yield self.add_service("wordpress")
        unit = yield wordpress.add_unit_state()
        yield wordpress.set_exposed_flag()
        manager = self.firewall_manager

        # With a single observer registered, only "a" is recorded.
        calls = [set()]
        first = Observer(calls, "a")
        manager.add_open_close_ports_observer(first)
        yield manager.open_close_ports(unit)
        self.assertEqual(calls[0], set([("a", unit)]))

        # Start from an empty record and register a second observer;
        # both must now be notified.
        calls[0] = set()
        second = Observer(calls, "b")
        manager.add_open_close_ports_observer(second)
        yield manager.open_close_ports(unit)
        both = set([("a", unit), ("b", unit)])
        self.assertEqual(calls[0], both)

    @inlineCallbacks
    def test_observe_open_close_ports_on_machine(self):
        """Verify one or more observers can be established on action."""
        machine = yield self.add_machine_state()
        manager = self.firewall_manager

        # With a single observer registered, only "a" is recorded.
        calls = [set()]
        first = Observer(calls, "a")
        manager.add_open_close_ports_on_machine_observer(first)
        yield manager.open_close_ports_on_machine(machine.id)
        self.assertEqual(calls[0], set([("a", machine.id)]))

        # Start from an empty record and register a second observer;
        # both must now be notified.
        calls[0] = set()
        second = Observer(calls, "b")
        manager.add_open_close_ports_on_machine_observer(second)
        yield manager.open_close_ports_on_machine(machine.id)
        both = set([("a", machine.id), ("b", machine.id)])
        self.assertEqual(calls[0], both)