import logging

from twisted.internet.defer import fail, inlineCallbacks, returnValue, succeed
from zookeeper import NoNodeException

from ensemble.environment.config import EnvironmentsConfig
from ensemble.errors import ProviderInteractionError
from ensemble.lib.twistutils import concurrent_execution_guard
from ensemble.state.errors import (
    MachineStateNotFound, ServiceStateNotFound, ServiceUnitStateNotFound,
    StateChanged, StopWatcher)
from ensemble.state.machine import MachineStateManager
from ensemble.state.service import ServiceStateManager

from .base import BaseAgent


log = logging.getLogger("ensemble.agents.provision")


NotExposed = object()
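# NotExposed is a sentinel stored in _watched_services for a service whose
# expose node is being watched but which is not currently exposed.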


# XXX All the port exposing/unexposing logic here should really live
#     in its own class rather than polluting the provisioning agent
#     itself. Please don't expand/copy this functionality without
#     first moving it out.

class ProvisioningAgent(BaseAgent):

    name = "ensemble-provisoning-agent"

    _current_machines = ()

    # Interval in seconds between periodic machine checks.
    machine_check_period = 60

    def get_agent_name(self):
        return "provision:%s" % (self.environment.type)

    @inlineCallbacks
    def start(self):
        self.environment = yield self.configure_environment()
        self.provider = self.environment.get_machine_provider()
        self.machine_state_manager = MachineStateManager(self.client)
        self.service_state_manager = ServiceStateManager(self.client)
        self._open_close_ports_observer = None
        self._open_close_ports_on_machine_observer = None
        self._running = True

        # Map service name to either NotExposed or set of exposed unit names.
        # If a service name is present in the dictionary, it means its
        # respective expose node is being watched.
        self._watched_services = {}
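        # For example (service and unit names are illustrative):
        #   {"mysql": NotExposed, "wordpress": set(["wordpress/0"])}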

        # Track all currently watched machines, using machine ID.
        self._watched_machines = set()

        if self.get_watch_enabled():
            self.machine_state_manager.watch_machine_states(
                self.watch_machine_changes)
            self.service_state_manager.watch_service_states(
                self.watch_service_changes)
            from twisted.internet import reactor
            reactor.callLater(
                self.machine_check_period, self.periodic_machine_check)
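            # periodic_machine_check reschedules itself after each run,
            # so a single callLater here is enough to start the cycle.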
            log.info("Started provisioning agent")
        else:
            log.info("Started provisioning agent without watches enabled")

    def stop(self):
        log.info("Stopping provisioning agent")
        self._running = False
        return succeed(True)

    @inlineCallbacks
    def configure_environment(self):
        """The provisioning agent configure its environment on start or change.

        The environment contains the configuration th agent needs to interact
        with its machine provider, in order to do its work. This configuration
        data is deployed lazily over an encrypted connection upon first usage.

        The agent waits for this data to exist before completing its startup.
        """
        try:
            get_d, watch_d = self.client.get_and_watch("/environment")
            environment_data, stat = yield get_d
            watch_d.addCallback(self._on_environment_changed)
        except NoNodeException:
            # Wait until the environment node appears; this requires
            # a little Twisted gymnastics.
            exists_d, watch_d = self.client.exists_and_watch("/environment")
            stat = yield exists_d
            if stat:
                environment = yield self.configure_environment()
            else:
                watch_d.addCallback(
                    lambda result: self.configure_environment())
            if not stat:
                environment = yield watch_d
            returnValue(environment)

        config = EnvironmentsConfig()
        config.parse(environment_data)
        returnValue(config.get_default())

    @inlineCallbacks
    def _on_environment_changed(self, event):
        """Reload the environment if its data changes."""

        if event.type_name == "deleted":
            return

        self.environment = yield self.configure_environment()
        self.provider = self.environment.get_machine_provider()

    def periodic_machine_check(self):
        """A periodic checking of machine states and provider machines.

        In addition to the on demand changes to zookeeper states that are
        monitored by L{watch_machine_changes}, the periodic machine check
        performs non zookeeper state related verification by periodically
        checking the last current provider machine states against the
        last known zookeeper state.

        Primarily this helps in recovering from transient error conditions
        which may have prevent processing of an individual machine state, as
        well as verifying the current state of the provider's running machines
        against the zk state, thus pruning unused resources.
        """
        from twisted.internet import reactor
        d = self.process_machines(self._current_machines)
        d.addBoth(
            lambda result: reactor.callLater(
                self.machine_check_period, self.periodic_machine_check))
        return d

    def watch_machine_changes(self, old_machines, new_machines):
        """Watches and processes machine state changes.

        This function is used to subscribe to topology changes, and
        specifically changes to machines within the topology. It performs
        work against the machine provider to ensure that the currently
        running state of the ensemble cluster corresponds to the topology
        via creation and deletion of machines.

        The subscription utilized is a permanent one, meaning that this
        function will automatically be rescheduled to run whenever a topology
        state change happens that involves machines.

        This function also caches the current set of machines as an agent
        instance attribute.

        @param old_machines: machine ids as they existed in the previous
            topology.
        @param new_machines: machine ids as they exist in the current
            topology.
        """
        if not self._running:
            return fail(StopWatcher())
        log.debug("Machines changed old:%s new:%s", old_machines, new_machines)
        self._current_machines = new_machines
        return self.process_machines(self._current_machines)

    @concurrent_execution_guard("_processing_machines")
    @inlineCallbacks
    def process_machines(self, current_machines):
        """Ensure the currently running machines correspond to state.

        At the end of each process_machines execution, verify that all
        running machines within the provider correspond to machine_ids within
        the topology. If they don't then shut them down.

        Uses a concurrent execution guard to ensure that at most one
        invocation runs at a time within this process.
        """
        # XXX This is broken in the face of multiple concurrent
        #     provisioning agents: they need to coordinate via a lock.
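        # Note: judging by its name and use here, concurrent_execution_guard
        # tracks in-flight execution via the "_processing_machines" attribute
        # so that overlapping calls return without re-entering this method.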

        # map of instance_id -> machine
        try:
            provider_machines = yield self.provider.list_machines()
        except ProviderInteractionError:
            log.exception("Cannot get machine list")
            return

        provider_machines = dict(
            [(m.instance_id, m) for m in provider_machines])
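        # provider_machines is now e.g. {"i-0001": <machine>}; instance
        # ids here are illustrative and provider-specific.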

        instance_ids = []
        for machine_state_id in current_machines:
            try:
                instance_id = yield self.process_machine(
                    machine_state_id, provider_machines)
            except (StateChanged,
                    MachineStateNotFound,
                    ProviderInteractionError):
                log.exception("Cannot process machine %s", machine_state_id)
                continue
            instance_ids.append(instance_id)

        # Terminate all unused ensemble machines running within the cluster.
        unused = set(provider_machines.keys()) - set(instance_ids)
        for instance_id in unused:
            log.info("Shutting down machine id:%s ...", instance_id)
            machine = provider_machines[instance_id]
            try:
                yield self.provider.shutdown_machine(machine)
            except ProviderInteractionError:
                log.exception("Cannot shutdown machine %s", instance_id)
                continue

    @inlineCallbacks
    def process_machine(self, machine_state_id, provider_machine_map):
        """Ensure a provider machine for a machine state id.

        For each machine_id in new machines which represents the current state
        of the topology:

          * Check to ensure its state reflects that it has been
            launched. If it hasn't then create the machine and update
            the state.

          * Watch the machine's assigned services so that changes can
            be applied to the firewall for service exposing support.
        """
        # fetch the machine state
        machine_state = yield self.machine_state_manager.get_machine_state(
            machine_state_id)
        instance_id = yield machine_state.get_instance_id()

        # Verify a machine id has state and is running, else launch it.
        if instance_id is None or instance_id not in provider_machine_map:
            log.info("Starting machine id:%s ...", machine_state.id)
            machines = yield self.provider.start_machine(
                {"machine-id": machine_state.id})
            instance_id = machines[0].instance_id
            yield machine_state.set_instance_id(instance_id)

        def cb_watch_assigned_units(old_units, new_units):
            """Watch assigned units for changes possibly require port mgmt.

            This watch handles the scenario where a service or service
            unit is removed from the topology. Because the service
            unit is no longer in the topology, the corresponding watch
            terminates and is unsable to open_close_ports in response
            to the change. However, the specific machine watch will be
            called, and that suffices to determine that its port
            policy should be checked.
            """
            log.debug("Assigned units for machine %r: old=%r, new=%r",
                      machine_state.id, old_units, new_units)
            return self.open_close_ports_on_machine(machine_state.id)

        if self.get_watch_enabled() and \
                machine_state.id not in self._watched_machines:
            self._watched_machines.add(machine_state.id)
            yield machine_state.watch_assigned_units(cb_watch_assigned_units)

        returnValue(instance_id)

    @inlineCallbacks
    def watch_service_changes(self, old_services, new_services):
        """Manage watching service exposed status.

        This method is called upon every change to the set of services
        currently deployed. All services are then watched for changes
        to their exposed flag setting.

        `old_services` is the set of services before this change;
        `new_services` is the current set.
        """
        removed_services = old_services - new_services
        for service_name in removed_services:
            self._watched_services.pop(service_name, None)
        for service_name in new_services:
            yield self._setup_new_service_watch(service_name)

    @inlineCallbacks
    def _setup_new_service_watch(self, service_name):
        """Sets up the watching of the exposed flag for a new service.

        If `service_name` is not watched (as known by
        `self._watched_services`), adds the watch and a corresponding
        entry in self._watched_services.

        (This dict is necessary because there is currently no way to
        introspect a service for whether it is watched or not.)
        """
        if service_name in self._watched_services:
            return  # already watched
        self._watched_services[service_name] = NotExposed
        try:
            service_state = yield self.service_state_manager.get_service_state(
                service_name)
        except ServiceStateNotFound:
            log.debug("Cannot setup watch, since service %r no longer exists",
                      service_name)
            self._watched_services.pop(service_name, None)
            return

        @inlineCallbacks
        def cb_watch_service_exposed_flag(exposed):
            if not self._running:
                raise StopWatcher()

            if exposed:
                log.debug("Service %r is exposed", service_name)
            else:
                log.debug("Service %r is unexposed", service_name)

            try:
                unit_states = yield service_state.get_all_unit_states()
            except StateChanged:
                log.debug("Stopping watch on %r, no longer in topology",
                          service_name)
                raise StopWatcher()
            for unit_state in unit_states:
                yield self.open_close_ports(unit_state)

            if not exposed:
                self._watched_services[service_name] = NotExposed
            else:
                self._watched_services[service_name] = set()
                yield self._setup_service_unit_watch(service_state)

        yield service_state.watch_exposed_flag(cb_watch_service_exposed_flag)
        log.debug("Started watch of %r on changes to being exposed",
                  service_name)

    @inlineCallbacks
    def _setup_service_unit_watch(self, service_state):
        """Setup watches on service units of newly exposed `service_name`."""
        @inlineCallbacks
        def cb_check_service_units(old_service_units, new_service_units):
            watched_units = self._watched_services.get(
                service_state.service_name)
            if not self._running or watched_units is NotExposed:
                raise StopWatcher()

            removed_service_units = old_service_units - new_service_units
            for unit_name in removed_service_units:
                watched_units.discard(unit_name)
                if not self._running:
                    raise StopWatcher()
                try:
                    unit_state = yield service_state.get_unit_state(unit_name)
                except (ServiceUnitStateNotFound, StateChanged):
                    log.debug("Not setting up watch on %r, not in topology",
                              unit_name)
                    continue
                yield self.open_close_ports(unit_state)

            for unit_name in new_service_units:
                if unit_name not in watched_units:
                    watched_units.add(unit_name)
                    yield self._setup_watch_ports(service_state, unit_name)

        yield service_state.watch_service_unit_states(cb_check_service_units)
        log.debug("Started watch of service units for exposed service %r",
                  service_state.service_name)

    @inlineCallbacks
    def _setup_watch_ports(self, service_state, unit_name):
        """Setup the watching of ports for `unit_name`."""
        try:
            unit_state = yield service_state.get_unit_state(unit_name)
        except (ServiceUnitStateNotFound, StateChanged):
            log.debug("Cannot setup watch on %r (no longer exists), ignoring",
                      unit_name)
            return

        @inlineCallbacks
        def cb_watch_ports(value):
            """Permanently watch ports until service is no longer exposed."""
            watched_units = self._watched_services.get(
                service_state.service_name, NotExposed)
            if (not self._running or watched_units is NotExposed or
                unit_name not in watched_units):
                log.debug("Stopping ports watch for %r", unit_name)
                raise StopWatcher()
            yield self.open_close_ports(unit_state)

        yield unit_state.watch_ports(cb_watch_ports)
        log.debug("Started watch of %r on changes to open ports", unit_name)

    def set_open_close_ports_observer(self, observer):
        """Set `observer` for calls to `open_close_ports`.

        The `observer` callback receives the service name and unit
        name for each such call.
        """
        self._open_close_ports_observer = observer
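
    # Usage sketch (names here are illustrative, not part of the agent):
    #
    #     calls = []
    #     agent.set_open_close_ports_observer(calls.append)
    #
    # after which every open_close_ports invocation appends its unit_state
    # to the list, letting a test synchronize on port changes.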

    @inlineCallbacks
    def open_close_ports(self, unit_state):
        """Called upon changes that *may* open/close ports for a service unit.
        """
        if not self._running:
            raise StopWatcher()
        try:
            try:
                machine_id = yield unit_state.get_assigned_machine_id()
            except StateChanged:
                log.debug("Stopping watch, machine %r no longer in topology",
                          unit_state.unit_name)
                raise StopWatcher()
            if machine_id is not None:
                yield self.open_close_ports_on_machine(machine_id)
        finally:
            # Ensure that the observation runs after the corresponding
            # action completes. In particular, tests that use
            # observation depend on this ordering to ensure the action
            # has happened before they can proceed.
            if self._open_close_ports_observer is not None:
                yield self._open_close_ports_observer(unit_state)

    def set_open_close_ports_on_machine_observer(self, observer):
        """Set `observer` for calls to `open_close_ports`.

        The `observer` callback receives the service name and unit
        name for each such call.
        """
        self._open_close_ports_on_machine_observer = observer

    @inlineCallbacks
    def open_close_ports_on_machine(self, machine_id):
        """Called upon changes that *may* open/close ports for a machine.

        This code supports multiple service units being assigned to a
        machine; all service units are checked each time this is
        called to determine the active set of ports to be opened.
        """
        if not self._running:
            raise StopWatcher()
        try:
            machine_state = yield self.machine_state_manager.get_machine_state(
                machine_id)
            instance_id = yield machine_state.get_instance_id()
            machine = yield self.provider.get_machine(instance_id)
            unit_states = yield machine_state.get_all_service_unit_states()
            policy_ports = set()
            for unit_state in unit_states:
                service_state = yield (
                    self.service_state_manager.get_service_state(
                        unit_state.service_name))
                exposed = yield service_state.get_exposed_flag()
                if exposed:
                    ports = yield unit_state.get_open_ports()
                    for port in ports:
                        policy_ports.add(
                            (port["port"], port["proto"]))

            current_ports = yield self.provider.get_opened_ports(machine)
            to_open = policy_ports - current_ports
            to_close = current_ports - policy_ports
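            # Example with illustrative values: if policy_ports is
            # {(80, "tcp"), (443, "tcp")} and current_ports is
            # {(80, "tcp"), (8080, "tcp")}, then to_open is {(443, "tcp")}
            # and to_close is {(8080, "tcp")}.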
            for port, proto in to_open:
                yield self.provider.open_port(machine, port, proto)
            for port, proto in to_close:
                yield self.provider.close_port(machine, port, proto)
        except ProviderInteractionError:
            log.info("No provisioned machine for machine %r", machine_id)
        finally:
            # Ensure that the observation runs after the corresponding
            # action completes. In particular, tests that use
            # observation depend on this ordering to ensure the action
            # has happened before they can proceed.
            if self._open_close_ports_on_machine_observer is not None:
                yield self._open_close_ports_on_machine_observer(machine_id)


if __name__ == '__main__':
    ProvisioningAgent().run()