~hazmat/pyjuju/security-policy-with-topology

Viewing changes to juju/agents/provision.py

  • Committer: kapil.thangavelu at canonical
  • Date: 2012-03-31 03:34:10 UTC
  • mfrom: (316.1.11 states-with-principals)
  • Revision ID: kapil.thangavelu@canonical.com-20120331033410-mj2znr90wnm0j88b
merge trunk

@@ -1,34 +1,25 @@
 import logging

-from twisted.internet.defer import fail, inlineCallbacks, returnValue, succeed
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
 from zookeeper import NoNodeException

-from ensemble.environment.config import EnvironmentsConfig
-from ensemble.errors import MachinesNotFound, ProviderInteractionError
-from ensemble.lib.twistutils import concurrent_execution_guard
-from ensemble.state.errors import (
-    MachineStateNotFound, ServiceStateNotFound, ServiceUnitStateNotFound,
-    StateChanged, StopWatcher)
-from ensemble.state.machine import MachineStateManager
-from ensemble.state.service import ServiceStateManager
+from juju.environment.config import EnvironmentsConfig
+from juju.errors import ProviderError
+from juju.lib.twistutils import concurrent_execution_guard
+from juju.state.errors import MachineStateNotFound, StateChanged, StopWatcher
+from juju.state.firewall import FirewallManager
+from juju.state.machine import MachineStateManager
+from juju.state.service import ServiceStateManager

 from .base import BaseAgent


-log = logging.getLogger("ensemble.agents.provision")
-
-
-NotExposed = object()
-
-
-# XXX All the port exposing/unexposing logic here should really live
-#     in its own class rather than poluting the provisioning agent
-#     itself. Please don't expand/copy this functionality without
-#     first moving it out.
+log = logging.getLogger("juju.agents.provision")
+

 class ProvisioningAgent(BaseAgent):

-    name = "ensemble-provisoning-agent"
+    name = "juju-provisoning-agent"

     _current_machines = ()

@@ -40,27 +31,20 @@

     @inlineCallbacks
     def start(self):
+        self._running = True
+
         self.environment = yield self.configure_environment()
         self.provider = self.environment.get_machine_provider()
         self.machine_state_manager = MachineStateManager(self.client)
         self.service_state_manager = ServiceStateManager(self.client)
-        self._open_close_ports_observer = None
-        self._open_close_ports_on_machine_observer = None
-        self._running = True
-
-        # Map service name to either NotExposed or set of exposed unit names.
-        # If a service name is present in the dictionary, it means its
-        # respective expose node is being watched.
-        self._watched_services = {}
-
-        # Track all currently watched machines, using machine ID.
-        self._watched_machines = set()
+        self.firewall_manager = FirewallManager(
+            self.client, self.is_running, self.provider)

         if self.get_watch_enabled():
             self.machine_state_manager.watch_machine_states(
                 self.watch_machine_changes)
             self.service_state_manager.watch_service_states(
-                self.watch_service_changes)
+                self.firewall_manager.watch_service_changes)
             from twisted.internet import reactor
             reactor.callLater(
                 self.machine_check_period, self.periodic_machine_check)
@@ -73,6 +57,10 @@
         self._running = False
         return succeed(True)

+    def is_running(self):
+        """Whether this agent is running or not."""
+        return self._running
+
     @inlineCallbacks
     def configure_environment(self):
         """The provisioning agent configure its environment on start or change.
@@ -135,13 +123,14 @@
                 self.machine_check_period, self.periodic_machine_check))
         return d

+    @inlineCallbacks
     def watch_machine_changes(self, old_machines, new_machines):
         """Watches and processes machine state changes.

         This function is used to subscribe to topology changes, and
         specifically changes to machines within the topology. It performs
         work against the machine provider to ensure that the currently
-        running state of the ensemble cluster corresponds to the topology
+        running state of the juju cluster corresponds to the topology
         via creation and deletion of machines.

         The subscription utilized is a permanent one, meaning that this
@@ -155,10 +144,16 @@
         @param new_machines machine ids as exist in the current topology.
         """
         if not self._running:
-            return fail(StopWatcher())
+            raise StopWatcher()
         log.debug("Machines changed old:%s new:%s", old_machines, new_machines)
         self._current_machines = new_machines
-        return self.process_machines(self._current_machines)
+        try:
+            yield self.process_machines(self._current_machines)
+        except Exception:
+            # Log and effectively retry later in periodic_machine_check
+            log.exception(
+                "Got unexpected exception in processing machines,"
+                " will retry")

     @concurrent_execution_guard("_processing_machines")
     @inlineCallbacks
@@ -178,7 +173,7 @@
         # map of instance_id -> machine
         try:
             provider_machines = yield self.provider.get_machines()
-        except ProviderInteractionError:
+        except ProviderError:
             log.exception("Cannot get machine list")
             return

@@ -192,19 +187,19 @@
                     machine_state_id, provider_machines)
             except (StateChanged,
                     MachineStateNotFound,
-                    ProviderInteractionError):
+                    ProviderError):
                 log.exception("Cannot process machine %s", machine_state_id)
                 continue
             instance_ids.append(instance_id)

-        # Terminate all unused ensemble machines running within the cluster.
+        # Terminate all unused juju machines running within the cluster.
         unused = set(provider_machines.keys()) - set(instance_ids)
         for instance_id in unused:
             log.info("Shutting down machine id:%s ...", instance_id)
             machine = provider_machines[instance_id]
             try:
                 yield self.provider.shutdown_machine(machine)
-            except ProviderInteractionError:
+            except ProviderError:
                 log.exception("Cannot shutdown machine %s", instance_id)
                 continue

@@ -235,239 +230,10 @@
             instance_id = machines[0].instance_id
             yield machine_state.set_instance_id(instance_id)

-        def cb_watch_assigned_units(old_units, new_units):
-            """Watch assigned units for changes possibly require port mgmt.
-
-            This watch handles the scenario where a service or service
-            unit is removed from the topology. Because the service
-            unit is no longer in the topology, the corresponding watch
-            terminates and is unsable to open_close_ports in response
-            to the change. However, the specific machine watch will be
-            called, and that suffices to determine that its port
-            policy should be checked.
-            """
-            log.debug("Assigned units for machine %r: old=%r, new=%r",
-                      machine_state.id, old_units, new_units)
-            return self.open_close_ports_on_machine(machine_state.id)
-
-        if self.get_watch_enabled() and \
-                machine_state.id not in self._watched_machines:
-            self._watched_machines.add(machine_state.id)
-            yield machine_state.watch_assigned_units(cb_watch_assigned_units)
-
+        # The firewall manager also needs to be checked for any
+        # outstanding retries on this machine
+        yield self.firewall_manager.process_machine(machine_state)
         returnValue(instance_id)

-    @inlineCallbacks
-    def watch_service_changes(self, old_services, new_services):
-        """Manage watching service exposed status.
-
-        This method is called upon every change to the set of services
-        currently deployed. All services are then watched for changes
-        to their exposed flag setting.
-
-        `old_services` is the set of services before this change;
-        `new_services` is the current set.
-        """
-        removed_services = old_services - new_services
-        for service_name in removed_services:
-            self._watched_services.pop(service_name, None)
-        for service_name in new_services:
-            yield self._setup_new_service_watch(service_name)
-
-    @inlineCallbacks
-    def _setup_new_service_watch(self, service_name):
-        """Sets up the watching of the exposed flag for a new service.
-
-        If `service_name` is not watched (as known by
-        `self._watched_services`), adds the watch and a corresponding
-        entry in self._watched_services.
-
-        (This dict is necessary because there is currently no way to
-        introspect a service for whether it is watched or not.)
-        """
-        if service_name in self._watched_services:
-            return  # already watched
-        self._watched_services[service_name] = NotExposed
-        try:
-            service_state = yield self.service_state_manager.get_service_state(
-                service_name)
-        except ServiceStateNotFound:
-            log.debug("Cannot setup watch, since service %r no longer exists",
-                      service_name)
-            self._watched_services.pop(service_name, None)
-            return
-
-        @inlineCallbacks
-        def cb_watch_service_exposed_flag(exposed):
-            if not self._running:
-                raise StopWatcher()
-
-            if exposed:
-                log.debug("Service %r is exposed", service_name)
-            else:
-                log.debug("Service %r is unexposed", service_name)
-
-            try:
-                unit_states = yield service_state.get_all_unit_states()
-            except StateChanged:
-                log.debug("Stopping watch on %r, no longer in topology",
-                          service_name)
-                raise StopWatcher()
-            for unit_state in unit_states:
-                yield self.open_close_ports(unit_state)
-
-            if not exposed:
-                log.debug("Service %r is unexposed", service_name)
-                self._watched_services[service_name] = NotExposed
-            else:
-                log.debug("Service %r is exposed", service_name)
-                self._watched_services[service_name] = set()
-                yield self._setup_service_unit_watch(service_state)
-
-        yield service_state.watch_exposed_flag(cb_watch_service_exposed_flag)
-        log.debug("Started watch of %r on changes to being exposed",
-                  service_name)
-
-    @inlineCallbacks
-    def _setup_service_unit_watch(self, service_state):
-        """Setup watches on service units of newly exposed `service_name`."""
-        @inlineCallbacks
-        def cb_check_service_units(old_service_units, new_service_units):
-            watched_units = self._watched_services.get(
-                service_state.service_name, NotExposed)
-            if not self._running or watched_units is NotExposed:
-                raise StopWatcher()
-
-            removed_service_units = old_service_units - new_service_units
-            for unit_name in removed_service_units:
-                watched_units.discard(unit_name)
-                if not self._running:
-                    raise StopWatcher()
-                try:
-                    unit_state = yield service_state.get_unit_state(unit_name)
-                except (ServiceUnitStateNotFound, StateChanged):
-                    log.debug("Not setting up watch on %r, not in topology",
-                              unit_name)
-                    continue
-                yield self.open_close_ports(unit_state)
-
-            for unit_name in new_service_units:
-                if unit_name not in watched_units:
-                    watched_units.add(unit_name)
-                    yield self._setup_watch_ports(service_state, unit_name)
-
-        yield service_state.watch_service_unit_states(cb_check_service_units)
-        log.debug("Started watch of service units for exposed service %r",
-                 service_state.service_name)
-
-    @inlineCallbacks
-    def _setup_watch_ports(self, service_state, unit_name):
-        """Setup the watching of ports for `unit_name`."""
-        try:
-            unit_state = yield service_state.get_unit_state(unit_name)
-        except (ServiceUnitStateNotFound, StateChanged):
-            log.debug("Cannot setup watch on %r (no longer exists), ignoring",
-                      unit_name)
-            return
-
-        @inlineCallbacks
-        def cb_watch_ports(value):
-            """Permanently watch ports until service is no longer exposed."""
-            watched_units = self._watched_services.get(
-                service_state.service_name, NotExposed)
-            if (not self._running or watched_units is NotExposed or
-                unit_name not in watched_units):
-                log.debug("Stopping ports watch for %r", unit_name)
-                raise StopWatcher()
-            yield self.open_close_ports(unit_state)
-
-        yield unit_state.watch_ports(cb_watch_ports)
-        log.debug("Started watch of %r on changes to open ports", unit_name)
-
-    def set_open_close_ports_observer(self, observer):
-        """Set `observer` for calls to `open_close_ports`.
-
-        The `observer` callback receives the service name and unit
-        name for each such call.
-        """
-        self._open_close_ports_observer = observer
-
-    @inlineCallbacks
-    def open_close_ports(self, unit_state):
-        """Called upon changes that *may* open/close ports for a service unit.
-        """
-        if not self._running:
-            raise StopWatcher()
-        try:
-            try:
-                machine_id = yield unit_state.get_assigned_machine_id()
-            except StateChanged:
-                log.debug("Stopping watch, machine %r no longer in topology",
-                          unit_state.unit_name)
-                raise StopWatcher()
-            if machine_id is not None:
-                yield self.open_close_ports_on_machine(machine_id)
-        finally:
-            # Ensure that the observation runs after the corresponding
-            # action completes.  In particular, tests that use
-            # observation depend on this ordering to ensure that has
-            # happened before they can proceed.
-            if self._open_close_ports_observer is not None:
-                yield self._open_close_ports_observer(unit_state)
-
-    def set_open_close_ports_on_machine_observer(self, observer):
-        """Set `observer` for calls to `open_close_ports`.
-
-        The `observer` callback receives the service name and unit
-        name for each such call.
-        """
-        self._open_close_ports_on_machine_observer = observer
-
-    @inlineCallbacks
-    def open_close_ports_on_machine(self, machine_id):
-        """Called upon changes that *may* open/close ports for a machine.
-
-        This code supports multiple service units being assigned to a
-        machine; all service units are checked each time this is
-        called to determine the active set of ports to be opened.
-        """
-        if not self._running:
-            raise StopWatcher()
-        try:
-            machine_state = yield self.machine_state_manager.get_machine_state(
-                machine_id)
-            instance_id = yield machine_state.get_instance_id()
-            machine = yield self.provider.get_machine(instance_id)
-            unit_states = yield machine_state.get_all_service_unit_states()
-            policy_ports = set()
-            for unit_state in unit_states:
-                service_state = yield self.service_state_manager.\
-                    get_service_state(unit_state.service_name)
-                exposed = yield service_state.get_exposed_flag()
-                if exposed:
-                    ports = yield unit_state.get_open_ports()
-                    for port in ports:
-                        policy_ports.add(
-                            (port["port"], port["proto"]))
-            current_ports = yield self.provider.get_opened_ports(
-                machine, machine_id)
-            to_open = policy_ports - current_ports
-            to_close = current_ports - policy_ports
-            for port, proto in to_open:
-                yield self.provider.open_port(machine, machine_id, port, proto)
-            for port, proto in to_close:
-                yield self.provider.close_port(
-                    machine, machine_id, port, proto)
-        except MachinesNotFound:
-            log.info("No provisioned machine for machine %r", machine_id)
-        finally:
-            # Ensure that the observation runs after the corresponding
-            # action completes.  In particular, tests that use
-            # observation depend on this ordering to ensure that has
-            # happened before they can proceed.
-            if self._open_close_ports_on_machine_observer is not None:
-                yield self._open_close_ports_on_machine_observer(machine_id)
-
-
 if __name__ == '__main__':
     ProvisioningAgent().run()
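
The net effect of this revision is that the provisioning agent no longer tracks exposed services, watched units, or open ports itself: it constructs a FirewallManager in start() and hands it the service-state watch and the per-machine retry processing. The sketch below is only an interface summary inferred from the calls visible in this diff (the constructor arguments and the two entry points); it is not the juju.state.firewall implementation, whose internals are outside this change.

# Interface sketch only: inferred from the calls made in this diff, not copied
# from juju.state.firewall. The real FirewallManager may differ.
class FirewallManagerSketch(object):

    def __init__(self, client, is_running, provider):
        # client: the ZooKeeper client shared with the agent.
        # is_running: callable reporting whether the agent is still running
        #             (the agent passes its new is_running method).
        # provider: machine provider used to open and close instance ports.
        self.client = client
        self.is_running = is_running
        self.provider = provider

    def watch_service_changes(self, old_services, new_services):
        """Callback registered with ServiceStateManager.watch_service_states.

        Replaces the agent's former watch_service_changes: tracks each
        service's exposed flag and the open ports of its units.
        """

    def process_machine(self, machine_state):
        """Re-check port policy and outstanding retries for one machine.

        Called by the agent after it processes a machine state (see the
        added lines around new line 233 in the diff above).
        """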