~hazmat/pyjuju/proposed-support

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import os
import sys

from twisted.internet import protocol
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
from twisted.python.failure import Failure

from juju import errors
from juju.state.errors import RelationStateNotFound
from juju.state.hook import RelationHookContext


class HookProtocol(protocol.ProcessProtocol):
    """Protocol used to communicate between the unit agent and hook process.

    This class manages events associated with the hook process,
    including its exit and status of file descriptors.
    """
    def __init__(self, hook_name, context, log=None):
        self._context = context
        self._hook_name = hook_name
        self._log = log

        # The process has exited. The value of this Deferred is
        # the exit code, and only if it is 0. Otherwise a
        # `CharmInvocationError` is communicated through this
        # Deferred.
        self.exited = Deferred()

        # The process has ended, that is, its file descriptors
        # are closed. Output can now be fully read. The deferred
        # semantics are the same as `exited` above.
        self.ended = Deferred()

    def outReceived(self, data):
        """Log `data` from stduout until the child process has ended."""
        self._log.info(data)

    def errReceived(self, data):
        """Log `data` from stderr until the child process has ended."""
        self._log.error(data)

    def _process_reason(self, reason, deferred):
        """Common code for working with both processEnded and processExited.

        The semantics of `exited` and `ended` are the same with
        respect to how they process the status code; the difference is
        when these events occur. For more, see :class:`Invoker`.
        """
        exit_code = reason.value.exitCode
        if exit_code == 0:
            return deferred.callback(exit_code)
        elif exit_code == None and reason.value.signal:
            error = errors.CharmInvocationError(
                self._hook_name, exit_code, signal=reason.value.signal)
        else:
            error = errors.CharmInvocationError(self._hook_name, exit_code)
        deferred.errback(error)

    def processExited(self, reason):
        """Called when the process has exited."""
        self._process_reason(reason, self.exited)

    def processEnded(self, reason):
        """Called when file descriptors for the process are closed."""
        self._process_reason(reason, self.ended)


class FormatSettingChanges(object):
    """Wrapper to delay executing __str_ of changes until written, if at all.

    :param list changes: Each change is a pair (`relation_ident`,
       `item`), where `item` may be an `AddedItem`, `DeletedItem`, or
       `ModifiedItem`. If `relation_ident` is None, this implies that
       it is a setting on the implied (or parent) context; it is
       sorted first and the relation_ident for the implied context is
       not logged.
    """
    def __init__(self, changes):
        self.changes = changes

    def __str__(self):
        changes = sorted(
            self.changes,
            key=lambda (relation_ident, item): (relation_ident, item.key))
        lines = []
        for relation_ident, item in changes:
            if relation_ident is None:
                lines.append("    %s" % str(item))
            else:
                lines.append("    %s on %r" % (str(item), relation_ident))
        return "\n".join(lines)


class Invoker(object):
    """Responsible for the execution and management of hook invocation.

    In a nutshell, *how* hooks are invoked, not *when* or *why*.

    Responsible for the following:

      * Manages socket connection with the unit agent.

      * Connects the child process stdout/stderr file descriptors to
        logging.

      * Handles the exit of the hook process, including reporting its
        exit code.

      * Cleans up resources of the hook process upon its exit.

    It's important to understand the difference between a process
    exiting and the process ending (using the terminology established
    by Twisted). Process exit is simple - this is the first event and
    occurs by the process returning its status code through the exit
    process. Normally process ending occurs very shortly thereafter,
    however, it may be briefly delayed because of pending writes to
    its file descriptors.

    In certain cases, however, hook scripts may invoke poorly written
    commands that fork child processes in the background that will
    wait around indefinitely, but do not close their file
    descriptors. In this case, it is the responsibility of the Invoker
    to wait briefly (for now hardcoded to 5 seconds), then reap such
    processes.
    """
    def __init__(self, context, change, client_id, socket_path,
                 unit_path, logger):
        """Takes the following arguments:

        `context`: an `juju.state.hook.HookContext`

        `change`: an `juju.state.hook.RelationChange`

        `client_id`: a string uniquely identifying a client session

        `socket_path`: the path to the UNIX Domain socket used by
           clients to communicate with the Unit Agent

        `logger`: instance of a `logging.Logger` object used to capture
           hook output
        """
        self.environment = {}
        self._context = context
        self._relation_contexts = {}
        self._change = change
        self._client_id = client_id
        self._socket_path = socket_path
        self._unit_path = unit_path
        self._log = logger

        # The twisted.internet.process.Process instance.
        self._process = None

        # The hook executable path
        self._process_executable = None

        # Deferred tracking whether the process HookProtocol is ended
        self._ended = None

        # When set, a delayed call that ensures the process is
        # properly terminated with loseConnection
        self._reaper = None

        # Add the initial context to the relation contexts if it's in
        # fact such
        if isinstance(context, RelationHookContext):
            self._relation_contexts[context.relation_ident] = context

    @inlineCallbacks
    def start(self):
        """Cache relation hook contexts for all relation idents."""
        # Get all relation idents (None means "all")
        relation_idents = set((yield self.get_relation_idents(None)))
        if isinstance(self._context, RelationHookContext):
            # Exclude the parent context for being looked up as a child
            relation_idents.discard(self._context.relation_ident)
            display_parent_relation_ident = " on %r" % \
                self._context.relation_ident
        else:
            display_parent_relation_ident = ""
        for relation_ident in relation_idents:
            child = yield self._context.get_relation_hook_context(
                relation_ident)
            self._relation_contexts[relation_ident] = child
        self._log.debug("Cached relation hook contexts%s: %r" % (
                display_parent_relation_ident,
                sorted(relation_idents)))

    @property
    def ended(self):
        return self._ended

    @property
    def unit_path(self):
        return self._unit_path

    def get_environment(self):
        """
        Returns the environment used to run the hook as a dict.
        Defaults are provided based on information passed to __init__.
        By setting keys inside Invoker.environment you can override
        the defaults or provide additional variables.
        """
        base = dict(JUJU_AGENT_SOCKET=self._socket_path,
                    JUJU_CLIENT_ID=self._client_id,
                    CHARM_DIR=os.path.join(self._unit_path, "charm"),
                    JUJU_UNIT_NAME=os.environ["JUJU_UNIT_NAME"],
                    DEBIAN_FRONTEND="noninteractive",
                    APT_LISTCHANGES_FRONTEND="none",
                    PATH=os.environ["PATH"],
                    JUJU_PYTHONPATH=":".join(sys.path))
        base.update(self.environment)

        return self.get_environment_from_change(base, self._change)

    def get_environment_from_change(self, env, change):
        """Supplement the default environment with dict with variables
        originating from the `change` argument to __init__.
        """
        return env

    def get_context(self):
        """Returns the hook context for the invocation."""
        return self._context

    def get_relation_hook_context(self, relation_ident):
        """Returns a hook context corresponding to `relation_ident`"""
        try:
            return self._relation_contexts[relation_ident]
        except KeyError:
            raise RelationStateNotFound()

    def get_relation_idents(self, relation_name):
        return self._context.get_relation_idents(relation_name)

    def validate_hook(self, hook_filename):
        """Verify that the hook_filename exists and is executable. """
        if not os.path.exists(hook_filename):
            raise errors.FileNotFound(hook_filename)

        if not os.access(hook_filename, os.X_OK):
            raise errors.CharmError(hook_filename,
                                      "hook is not executable")

    def send_signal(self, signal_id):
        """Send a signal of the given signal_id.

        `signal_id`: limited value interpretation, numeric signals
        ids are used as given, some values for symbolic string
        interpretation are available see
        ``twisted.internet.process._BaseProcess.signalProcess`` for
        additional details.

        Raises a `ValueError` if the process doesn't exist or
        `ProcessExitedAlready` if the process has already ended.
        """

        if not self._process:
            raise ValueError("No Process")
        return self._process.signalProcess(signal_id)

    def _ensure_process_termination(self, ignored):
        """Cancels any scheduled reaper and terminates hook process, if around.

        Canceling the reaper itself is necessary to ensure that
        deferreds like this are not left in the reactor. This would
        otherwise be the case for test that are awaiting the log, by
        using the `Invoker.end` deferred.
        """
        if self._reaper:
            if not self._reaper.called:
                self._reaper.cancel()
        self._process.loseConnection()

    @inlineCallbacks
    def _cleanup_process(self, hook, result):
        """Performs process cleanup:

           * Flushes any changes (eg relation settings maded by the
             hook)

           * Ensures that the result will be the exit code of the
             process (if 0), or the `CharmInvocationError` from the
             underlying `HookProtocol`, with cleaned up traceback.

           * Also schedules a reaper to be called later that ensures
             process termination.
        """
        message = result
        if isinstance(message, Failure):
            message = message.getTraceback(elideFrameworkCode=True)
        self._log.debug("hook %s exited, exit code %s." % (
                os.path.basename(hook), message))

        # Ensure that the process is terminated (via loseConnection)
        # no more than 5 seconds (arbitrary) after it exits, unless it
        # normally ends. If ended, the reaper is cancelled to ensure
        # it is not left in the reactor.
        #
        # The 5 seconds was chosen to make it vanishly small that
        # there would be any lost output (as might be *occasionally*
        # seen with a 50ms threshold in actual testing).
        from twisted.internet import reactor
        self._reaper = reactor.callLater(5, self._process.loseConnection)

        # Flush context changes back to zookeeper if hook was successful.
        if result == 0 and self._context:
            relation_setting_changes = []
            for context in self._relation_contexts.itervalues():
                changes = yield context.flush()
                if changes:
                    for change in changes:
                        if context is self._context:
                            relation_setting_changes.append((None, change))
                        else:
                            # Only log relation idents for relation settings
                            # on child relation hook contexts
                            relation_setting_changes.append(
                                (context.relation_ident, change))
            if relation_setting_changes:
                if hasattr(self._context, "relation_ident"):
                    display_parent_relation_ident = " on %r" % \
                        self._context.relation_ident
                else:
                    display_parent_relation_ident = ""
                self._log.debug(
                    "Flushed values for hook %r%s\n%s",
                    os.path.basename(hook),
                    display_parent_relation_ident,
                    FormatSettingChanges(relation_setting_changes))

        returnValue(result)

    def __call__(self, hook):
        """Execute `hook` in a runtime context and returns status code.

        The `hook` parameter should be a complete path to the desired
        executable. The returned value is a `Deferred` that is called
        when the hook exits.
        """
        # Sanity check the hook.
        self.validate_hook(hook)

        # Setup for actual invocation
        env = self.get_environment()
        hook_proto = HookProtocol(hook, self._context, self._log)
        exited = hook_proto.exited
        self._ended = ended = hook_proto.ended

        from twisted.internet import reactor
        self._process = reactor.spawnProcess(
            hook_proto, hook, [hook], env,
            os.path.join(self._unit_path, "charm"))

        # Manage cleanup after hook exits
        def cb_cleanup_process(result):
            return self._cleanup_process(hook, result)

        exited.addBoth(cb_cleanup_process)
        ended.addBoth(self._ensure_process_termination)
        return exited