1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
|
import logging
from twisted.internet.defer import fail, inlineCallbacks, returnValue, succeed
from zookeeper import NoNodeException
from ensemble.environment.config import EnvironmentsConfig
from ensemble.errors import ProviderInteractionError
from ensemble.lib.twistutils import concurrent_execution_guard
from ensemble.state.errors import (
MachineStateNotFound, ServiceStateNotFound, ServiceUnitStateNotFound,
StateChanged, StopWatcher)
from ensemble.state.machine import MachineStateManager
from ensemble.state.service import ServiceStateManager
from .base import BaseAgent
log = logging.getLogger("ensemble.agents.provision")
NotExposed = object()
# XXX All the port exposing/unexposing logic here should really live
# in its own class rather than poluting the provisioning agent
# itself. Please don't expand/copy this functionality without
# first moving it out.
class ProvisioningAgent(BaseAgent):
name = "ensemble-provisoning-agent"
_current_machines = ()
# time in seconds
machine_check_period = 60
def get_agent_name(self):
return "provision:%s" % (self.environment.type)
@inlineCallbacks
def start(self):
self.environment = yield self.configure_environment()
self.provider = self.environment.get_machine_provider()
self.machine_state_manager = MachineStateManager(self.client)
self.service_state_manager = ServiceStateManager(self.client)
self._open_close_ports_observer = None
self._open_close_ports_on_machine_observer = None
self._running = True
# Map service name to either NotExposed or set of exposed unit names.
# If a service name is present in the dictionary, it means its
# respective expose node is being watched.
self._watched_services = {}
# Track all currently watched machines, using machine ID.
self._watched_machines = set()
if self.get_watch_enabled():
self.machine_state_manager.watch_machine_states(
self.watch_machine_changes)
self.service_state_manager.watch_service_states(
self.watch_service_changes)
from twisted.internet import reactor
reactor.callLater(
self.machine_check_period, self.periodic_machine_check)
log.info("Started provisioning agent")
else:
log.info("Started provisioning agent without watches enabled")
def stop(self):
log.info("Stopping provisioning agent")
self._running = False
return succeed(True)
@inlineCallbacks
def configure_environment(self):
"""The provisioning agent configure its environment on start or change.
The environment contains the configuration th agent needs to interact
with its machine provider, in order to do its work. This configuration
data is deployed lazily over an encrypted connection upon first usage.
The agent waits for this data to exist before completing its startup.
"""
try:
get_d, watch_d = self.client.get_and_watch("/environment")
environment_data, stat = yield get_d
watch_d.addCallback(self._on_environment_changed)
except NoNodeException:
# Wait till the environment node appears. play twisted gymnastics
exists_d, watch_d = self.client.exists_and_watch("/environment")
stat = yield exists_d
if stat:
environment = yield self.configure_environment()
else:
watch_d.addCallback(
lambda result: self.configure_environment())
if not stat:
environment = yield watch_d
returnValue(environment)
config = EnvironmentsConfig()
config.parse(environment_data)
returnValue(config.get_default())
@inlineCallbacks
def _on_environment_changed(self, event):
"""Reload the environment if its data changes."""
if event.type_name == "deleted":
return
self.environment = yield self.configure_environment()
self.provider = self.environment.get_machine_provider()
def periodic_machine_check(self):
"""A periodic checking of machine states and provider machines.
In addition to the on demand changes to zookeeper states that are
monitored by L{watch_machine_changes}, the periodic machine check
performs non zookeeper state related verification by periodically
checking the last current provider machine states against the
last known zookeeper state.
Primarily this helps in recovering from transient error conditions
which may have prevent processing of an individual machine state, as
well as verifying the current state of the provider's running machines
against the zk state, thus pruning unused resources.
"""
from twisted.internet import reactor
d = self.process_machines(self._current_machines)
d.addBoth(
lambda result: reactor.callLater(
self.machine_check_period, self.periodic_machine_check))
return d
def watch_machine_changes(self, old_machines, new_machines):
"""Watches and processes machine state changes.
This function is used to subscribe to topology changes, and
specifically changes to machines within the topology. It performs
work against the machine provider to ensure that the currently
running state of the ensemble cluster corresponds to the topology
via creation and deletion of machines.
The subscription utilized is a permanent one, meaning that this
function will automatically be rescheduled to run whenever a topology
state change happens that involves machines.
This functional also caches the current set of machines as an agent
instance attribute.
@param old_machines machine ids as existed in the previous topology.
@param new_machines machine ids as exist in the current topology.
"""
if not self._running:
return fail(StopWatcher())
log.debug("Machines changed old:%s new:%s", old_machines, new_machines)
self._current_machines = new_machines
return self.process_machines(self._current_machines)
@concurrent_execution_guard("_processing_machines")
@inlineCallbacks
def process_machines(self, current_machines):
"""Ensure the currently running machines correspond to state.
At the end of each process_machines execution, verify that all
running machines within the provider correspond to machine_ids within
the topology. If they don't then shut them down.
Utilizes concurrent execution guard, to ensure that this is only being
executed at most once per process.
"""
# XXX this is obviously broken, but the margins of 80 columns prevent
# me from describing. hint think concurrent agents, and use a lock.
# map of instance_id -> machine
try:
provider_machines = yield self.provider.list_machines()
except ProviderInteractionError:
log.exception("Cannot get machine list")
return
provider_machines = dict(
[(m.instance_id, m) for m in provider_machines])
instance_ids = []
for machine_state_id in current_machines:
try:
instance_id = yield self.process_machine(
machine_state_id, provider_machines)
except (StateChanged,
MachineStateNotFound,
ProviderInteractionError):
log.exception("Cannot process machine %s", machine_state_id)
continue
instance_ids.append(instance_id)
# Terminate all unused ensemble machines running within the cluster.
unused = set(provider_machines.keys()) - set(instance_ids)
for instance_id in unused:
log.info("Shutting down machine id:%s ...", instance_id)
machine = provider_machines[instance_id]
try:
yield self.provider.shutdown_machine(machine)
except ProviderInteractionError:
log.exception("Cannot shutdown machine %s", instance_id)
continue
@inlineCallbacks
def process_machine(self, machine_state_id, provider_machine_map):
"""Ensure a provider machine for a machine state id.
For each machine_id in new machines which represents the current state
of the topology:
* Check to ensure its state reflects that it has been
launched. If it hasn't then create the machine and update
the state.
* Watch the machine's assigned services so that changes can
be applied to the firewall for service exposing support.
"""
# fetch the machine state
machine_state = yield self.machine_state_manager.get_machine_state(
machine_state_id)
instance_id = yield machine_state.get_instance_id()
# Verify a machine id has state and is running, else launch it.
if instance_id is None or not instance_id in provider_machine_map:
log.info("Starting machine id:%s ...", machine_state.id)
machines = yield self.provider.start_machine(
{"machine-id": machine_state.id})
instance_id = machines[0].instance_id
yield machine_state.set_instance_id(instance_id)
def cb_watch_assigned_units(old_units, new_units):
"""Watch assigned units for changes possibly require port mgmt.
This watch handles the scenario where a service or service
unit is removed from the topology. Because the service
unit is no longer in the topology, the corresponding watch
terminates and is unsable to open_close_ports in response
to the change. However, the specific machine watch will be
called, and that suffices to determine that its port
policy should be checked.
"""
log.debug("Assigned units for machine %r: old=%r, new=%r",
machine_state.id, old_units, new_units)
return self.open_close_ports_on_machine(machine_state.id)
if self.get_watch_enabled() and \
machine_state.id not in self._watched_machines:
self._watched_machines.add(machine_state.id)
yield machine_state.watch_assigned_units(cb_watch_assigned_units)
returnValue(instance_id)
@inlineCallbacks
def watch_service_changes(self, old_services, new_services):
"""Manage watching service exposed status.
This method is called upon every change to the set of services
currently deployed. All services are then watched for changes
to their exposed flag setting.
`old_services` is the set of services before this change;
`new_services` is the current set.
"""
removed_services = old_services - new_services
for service_name in removed_services:
self._watched_services.pop(service_name, None)
for service_name in new_services:
yield self._setup_new_service_watch(service_name)
@inlineCallbacks
def _setup_new_service_watch(self, service_name):
"""Sets up the watching of the exposed flag for a new service.
If `service_name` is not watched (as known by
`self._watched_services`), adds the watch and a corresponding
entry in self._watched_services.
(This dict is necessary because there is currently no way to
introspect a service for whether it is watched or not.)
"""
if service_name in self._watched_services:
return # already watched
self._watched_services[service_name] = NotExposed
try:
service_state = yield self.service_state_manager.get_service_state(
service_name)
except ServiceStateNotFound:
log.debug("Cannot setup watch, since service %r no longer exists",
service_name)
self._watched_services.pop(service_name, None)
return
@inlineCallbacks
def cb_watch_service_exposed_flag(exposed):
if not self._running:
raise StopWatcher()
if exposed:
log.debug("Service %r is exposed", service_name)
else:
log.debug("Service %r is unexposed", service_name)
try:
unit_states = yield service_state.get_all_unit_states()
except StateChanged:
log.debug("Stopping watch on %r, no longer in topology",
service_name)
raise StopWatcher()
for unit_state in unit_states:
yield self.open_close_ports(unit_state)
if not exposed:
self._watched_services[service_name] = NotExposed
else:
self._watched_services[service_name] = set()
yield self._setup_service_unit_watch(service_state)
yield service_state.watch_exposed_flag(cb_watch_service_exposed_flag)
log.debug("Started watch of %r on changes to being exposed",
service_name)
@inlineCallbacks
def _setup_service_unit_watch(self, service_state):
"""Setup watches on service units of newly exposed `service_name`."""
@inlineCallbacks
def cb_check_service_units(old_service_units, new_service_units):
watched_units = self._watched_services.get(
service_state.service_name)
if not self._running or watched_units is NotExposed:
raise StopWatcher()
removed_service_units = old_service_units - new_service_units
for unit_name in removed_service_units:
watched_units.discard(unit_name)
if not self._running:
raise StopWatcher()
try:
unit_state = yield service_state.get_unit_state(unit_name)
except (ServiceUnitStateNotFound, StateChanged):
log.debug("Not setting up watch on %r, not in topology",
unit_name)
continue
yield self.open_close_ports(unit_state)
for unit_name in new_service_units:
if unit_name not in watched_units:
watched_units.add(unit_name)
yield self._setup_watch_ports(service_state, unit_name)
yield service_state.watch_service_unit_states(cb_check_service_units)
log.debug("Started watch of service units for exposed service %r",
service_state.service_name)
@inlineCallbacks
def _setup_watch_ports(self, service_state, unit_name):
"""Setup the watching of ports for `unit_name`."""
try:
unit_state = yield service_state.get_unit_state(unit_name)
except (ServiceUnitStateNotFound, StateChanged):
log.debug("Cannot setup watch on %r (no longer exists), ignoring",
unit_name)
return
@inlineCallbacks
def cb_watch_ports(value):
"""Permanently watch ports until service is no longer exposed."""
watched_units = self._watched_services.get(
service_state.service_name, NotExposed)
if (not self._running or watched_units is NotExposed or
unit_name not in watched_units):
log.debug("Stopping ports watch for %r", unit_name)
raise StopWatcher()
yield self.open_close_ports(unit_state)
yield unit_state.watch_ports(cb_watch_ports)
log.debug("Started watch of %r on changes to open ports", unit_name)
def set_open_close_ports_observer(self, observer):
"""Set `observer` for calls to `open_close_ports`.
The `observer` callback receives the service name and unit
name for each such call.
"""
self._open_close_ports_observer = observer
@inlineCallbacks
def open_close_ports(self, unit_state):
"""Called upon changes that *may* open/close ports for a service unit.
"""
if not self._running:
raise StopWatcher()
try:
try:
machine_id = yield unit_state.get_assigned_machine_id()
except StateChanged:
log.debug("Stopping watch, machine %r no longer in topology",
unit_state.unit_name)
raise StopWatcher()
if machine_id is not None:
yield self.open_close_ports_on_machine(machine_id)
finally:
# Ensure that the observation runs after the corresponding
# action completes. In particular, tests that use
# observation depend on this ordering to ensure that has
# happened before they can proceed.
if self._open_close_ports_observer is not None:
yield self._open_close_ports_observer(unit_state)
def set_open_close_ports_on_machine_observer(self, observer):
"""Set `observer` for calls to `open_close_ports`.
The `observer` callback receives the service name and unit
name for each such call.
"""
self._open_close_ports_on_machine_observer = observer
@inlineCallbacks
def open_close_ports_on_machine(self, machine_id):
"""Called upon changes that *may* open/close ports for a machine.
This code supports multiple service units being assigned to a
machine; all service units are checked each time this is
called to determine the active set of ports to be opened.
"""
if not self._running:
raise StopWatcher()
try:
machine_state = yield self.machine_state_manager.get_machine_state(
machine_id)
instance_id = yield machine_state.get_instance_id()
machine = yield self.provider.get_machine(instance_id)
unit_states = yield machine_state.get_all_service_unit_states()
policy_ports = set()
for unit_state in unit_states:
service_state = yield self.service_state_manager.\
get_service_state(unit_state.service_name)
exposed = yield service_state.get_exposed_flag()
if exposed:
ports = yield unit_state.get_open_ports()
for port in ports:
policy_ports.add(
(port["port"], port["proto"]))
current_ports = yield self.provider.get_opened_ports(machine)
to_open = policy_ports - current_ports
to_close = current_ports - policy_ports
for port, proto in to_open:
yield self.provider.open_port(machine, port, proto)
for port, proto in to_close:
yield self.provider.close_port(machine, port, proto)
except ProviderInteractionError:
log.info("No provisioned machine for machine %r", machine_id)
finally:
# Ensure that the observation runs after the corresponding
# action completes. In particular, tests that use
# observation depend on this ordering to ensure that has
# happened before they can proceed.
if self._open_close_ports_on_machine_observer is not None:
yield self._open_close_ports_on_machine_observer(machine_id)
if __name__ == '__main__':
ProvisioningAgent().run()
|