3
from twisted.internet.defer import fail, inlineCallbacks, returnValue, succeed
3
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
4
4
from zookeeper import NoNodeException
6
from ensemble.environment.config import EnvironmentsConfig
7
from ensemble.errors import MachinesNotFound, ProviderInteractionError
8
from ensemble.lib.twistutils import concurrent_execution_guard
9
from ensemble.state.errors import (
10
MachineStateNotFound, ServiceStateNotFound, ServiceUnitStateNotFound,
11
StateChanged, StopWatcher)
12
from ensemble.state.machine import MachineStateManager
13
from ensemble.state.service import ServiceStateManager
6
from juju.environment.config import EnvironmentsConfig
7
from juju.errors import ProviderError
8
from juju.lib.twistutils import concurrent_execution_guard
9
from juju.state.errors import MachineStateNotFound, StateChanged, StopWatcher
10
from juju.state.firewall import FirewallManager
11
from juju.state.machine import MachineStateManager
12
from juju.state.service import ServiceStateManager
15
14
from .base import BaseAgent
18
log = logging.getLogger("ensemble.agents.provision")
24
# XXX All the port exposing/unexposing logic here should really live
25
# in its own class rather than polluting the provisioning agent
26
# itself. Please don't expand/copy this functionality without
27
# first moving it out.
17
log = logging.getLogger("juju.agents.provision")
29
20
class ProvisioningAgent(BaseAgent):
31
name = "ensemble-provisoning-agent"
22
name = "juju-provisoning-agent"
33
24
_current_machines = ()
235
230
instance_id = machines[0].instance_id
236
231
yield machine_state.set_instance_id(instance_id)
238
def cb_watch_assigned_units(old_units, new_units):
239
"""Watch assigned units for changes possibly require port mgmt.
241
This watch handles the scenario where a service or service
242
unit is removed from the topology. Because the service
243
unit is no longer in the topology, the corresponding watch
244
terminates and is unsable to open_close_ports in response
245
to the change. However, the specific machine watch will be
246
called, and that suffices to determine that its port
247
policy should be checked.
249
log.debug("Assigned units for machine %r: old=%r, new=%r",
250
machine_state.id, old_units, new_units)
251
return self.open_close_ports_on_machine(machine_state.id)
253
if self.get_watch_enabled() and \
254
machine_state.id not in self._watched_machines:
255
self._watched_machines.add(machine_state.id)
256
yield machine_state.watch_assigned_units(cb_watch_assigned_units)
233
# The firewall manager also needs to be checked for any
234
# outstanding retries on this machine
235
yield self.firewall_manager.process_machine(machine_state)
258
236
returnValue(instance_id)
261
def watch_service_changes(self, old_services, new_services):
    """Manage watching service exposed status.

    This method is called upon every change to the set of services
    currently deployed. All services are then watched for changes
    to their exposed flag setting.

    `old_services` is the set of services before this change;
    `new_services` is the current set.
    """
    # NOTE(review): the body yields Deferreds, so in the complete file this
    # generator is presumably wrapped with @inlineCallbacks (imported at the
    # top of the file) — confirm against the unmangled original.
    # Drop watch bookkeeping for services removed from the topology.
    removed_services = old_services - new_services
    for service_name in removed_services:
        self._watched_services.pop(service_name, None)
    # Ensure every currently deployed service has an exposed-flag watch.
    for service_name in new_services:
        yield self._setup_new_service_watch(service_name)
278
def _setup_new_service_watch(self, service_name):
279
"""Sets up the watching of the exposed flag for a new service.
281
If `service_name` is not watched (as known by
282
`self._watched_services`), adds the watch and a corresponding
283
entry in self._watched_services.
285
(This dict is necessary because there is currently no way to
286
introspect a service for whether it is watched or not.)
288
if service_name in self._watched_services:
289
return # already watched
290
self._watched_services[service_name] = NotExposed
292
service_state = yield self.service_state_manager.get_service_state(
294
except ServiceStateNotFound:
295
log.debug("Cannot setup watch, since service %r no longer exists",
297
self._watched_services.pop(service_name, None)
301
def cb_watch_service_exposed_flag(exposed):
302
if not self._running:
306
log.debug("Service %r is exposed", service_name)
308
log.debug("Service %r is unexposed", service_name)
311
unit_states = yield service_state.get_all_unit_states()
313
log.debug("Stopping watch on %r, no longer in topology",
316
for unit_state in unit_states:
317
yield self.open_close_ports(unit_state)
320
log.debug("Service %r is unexposed", service_name)
321
self._watched_services[service_name] = NotExposed
323
log.debug("Service %r is exposed", service_name)
324
self._watched_services[service_name] = set()
325
yield self._setup_service_unit_watch(service_state)
327
yield service_state.watch_exposed_flag(cb_watch_service_exposed_flag)
328
log.debug("Started watch of %r on changes to being exposed",
332
def _setup_service_unit_watch(self, service_state):
333
"""Setup watches on service units of newly exposed `service_name`."""
335
def cb_check_service_units(old_service_units, new_service_units):
336
watched_units = self._watched_services.get(
337
service_state.service_name, NotExposed)
338
if not self._running or watched_units is NotExposed:
341
removed_service_units = old_service_units - new_service_units
342
for unit_name in removed_service_units:
343
watched_units.discard(unit_name)
344
if not self._running:
347
unit_state = yield service_state.get_unit_state(unit_name)
348
except (ServiceUnitStateNotFound, StateChanged):
349
log.debug("Not setting up watch on %r, not in topology",
352
yield self.open_close_ports(unit_state)
354
for unit_name in new_service_units:
355
if unit_name not in watched_units:
356
watched_units.add(unit_name)
357
yield self._setup_watch_ports(service_state, unit_name)
359
yield service_state.watch_service_unit_states(cb_check_service_units)
360
log.debug("Started watch of service units for exposed service %r",
361
service_state.service_name)
364
def _setup_watch_ports(self, service_state, unit_name):
365
"""Setup the watching of ports for `unit_name`."""
367
unit_state = yield service_state.get_unit_state(unit_name)
368
except (ServiceUnitStateNotFound, StateChanged):
369
log.debug("Cannot setup watch on %r (no longer exists), ignoring",
374
def cb_watch_ports(value):
375
"""Permanently watch ports until service is no longer exposed."""
376
watched_units = self._watched_services.get(
377
service_state.service_name, NotExposed)
378
if (not self._running or watched_units is NotExposed or
379
unit_name not in watched_units):
380
log.debug("Stopping ports watch for %r", unit_name)
382
yield self.open_close_ports(unit_state)
384
yield unit_state.watch_ports(cb_watch_ports)
385
log.debug("Started watch of %r on changes to open ports", unit_name)
387
def set_open_close_ports_observer(self, observer):
    """Set `observer` for calls to `open_close_ports`.

    The `observer` callable is invoked with the `unit_state` after each
    call to `open_close_ports` completes (it may return a Deferred,
    which is waited on).
    """
    self._open_close_ports_observer = observer
396
def open_close_ports(self, unit_state):
397
"""Called upon changes that *may* open/close ports for a service unit.
399
if not self._running:
403
machine_id = yield unit_state.get_assigned_machine_id()
405
log.debug("Stopping watch, machine %r no longer in topology",
406
unit_state.unit_name)
408
if machine_id is not None:
409
yield self.open_close_ports_on_machine(machine_id)
411
# Ensure that the observation runs after the corresponding
412
# action completes. In particular, tests that use
413
# observation depend on this ordering to ensure that has
414
# happened before they can proceed.
415
if self._open_close_ports_observer is not None:
416
yield self._open_close_ports_observer(unit_state)
418
def set_open_close_ports_on_machine_observer(self, observer):
    """Set `observer` for calls to `open_close_ports_on_machine`.

    The `observer` callable is invoked with the `machine_id` after each
    call to `open_close_ports_on_machine` completes (it may return a
    Deferred, which is waited on).
    """
    self._open_close_ports_on_machine_observer = observer
427
def open_close_ports_on_machine(self, machine_id):
428
"""Called upon changes that *may* open/close ports for a machine.
430
This code supports multiple service units being assigned to a
431
machine; all service units are checked each time this is
432
called to determine the active set of ports to be opened.
434
if not self._running:
437
machine_state = yield self.machine_state_manager.get_machine_state(
439
instance_id = yield machine_state.get_instance_id()
440
machine = yield self.provider.get_machine(instance_id)
441
unit_states = yield machine_state.get_all_service_unit_states()
443
for unit_state in unit_states:
444
service_state = yield self.service_state_manager.\
445
get_service_state(unit_state.service_name)
446
exposed = yield service_state.get_exposed_flag()
448
ports = yield unit_state.get_open_ports()
451
(port["port"], port["proto"]))
452
current_ports = yield self.provider.get_opened_ports(
454
to_open = policy_ports - current_ports
455
to_close = current_ports - policy_ports
456
for port, proto in to_open:
457
yield self.provider.open_port(machine, machine_id, port, proto)
458
for port, proto in to_close:
459
yield self.provider.close_port(
460
machine, machine_id, port, proto)
461
except MachinesNotFound:
462
log.info("No provisioned machine for machine %r", machine_id)
464
# Ensure that the observation runs after the corresponding
465
# action completes. In particular, tests that use
466
# observation depend on this ordering to ensure that has
467
# happened before they can proceed.
468
if self._open_close_ports_on_machine_observer is not None:
469
yield self._open_close_ports_on_machine_observer(machine_id)
472
238
# Script entry point: run the provisioning agent when executed directly.
if __name__ == '__main__':
    ProvisioningAgent().run()