# Authors:
#   Jason Gerard DeRose <jderose@novacut.com>
#
# dmedia: distributed media library
# Copyright (C) 2010 Jason Gerard DeRose <jderose@novacut.com>
#
# This file is part of `dmedia`.
#
# `dmedia` is free software: you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# `dmedia` is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with `dmedia`.  If not, see <http://www.gnu.org/licenses/>.

"""
Multi-process workers.

This module implements the `Manager` and `Worker` classes that add some
conveniences on top of the Python multiprocessing module.

Any heavy lifting dmedia does (importing, downloading, uploading, verifying,
etc.) is done in a subprocess created with multiprocessing.Process().  This
allows us to fully utilize multicore processors, plus makes dmedia more robust
as things can go horribly wrong in a Worker without crashing the main process.
It also keeps the memory footprint of the main process smaller and more stable
over time, which is important as dmedia is a long running process (whether as a
DBus service or a pure server).

To start a new `Worker`, a `Manager` launches a new process and passes that
process a description of the job, plus a queue that the `Worker` uses to send
signals to the `Manager`.  The communication is one-way: once the `Worker` is
started, the `Manager` has no way to signal the `Worker`; the only thing it can
do is kill the `Worker`.

Workers must be atomic: they must be able to be killed at any time without
leaving the job in an undefined state.  Some workers (like `DownloadWorker`)
will intelligently resume a job where they left off (resume the download).
Other workers (like `ImportWorker`) will simply start over from the beginning.
Either way, the status of the job itself must be atomic: it's finished, or it's
not, with no gray area.

A `Worker` sends signals to the `Manager` over the queue.  These signals are
largely used to provide UI status updates (stuff like a progress bar for a
specific file being downloaded).  Typically these signals will be emitted over
DBus, which is how apps built on dmedia will show progress, status, etc. to
their users.  But as the core dmedia bits also need to run on headless servers,
the `Manager` is kept away from the details; instead, it is created with an
optional callback that passes each signal on to "higher level code", whatever
that happens to be.
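
For example, higher-level code might create a `Manager` with a callback that
forwards each signal, roughly like this (a sketch; here the callback just
prints, but in practice it might emit a DBus signal):

>>> def callback(signal, args):
...     print(signal, args)
...
>>> manager = Manager({}, callback)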
"""

import multiprocessing
from multiprocessing import current_process
from threading import Thread, Lock
from queue import Empty
import logging

from microfiber import Database

from .constants import TYPE_ERROR


log = logging.getLogger()
_workers = {}


def isregistered(worker):
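    """
    Return ``True`` if the `Worker` subclass *worker* has been registered.
    """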
    if not (isinstance(worker, type) and issubclass(worker, Worker)):
        raise TypeError(
            'worker: must be subclass of %r; got %r' % (Worker, worker)
        )
    name = worker.__name__
    return (name in _workers)


def register(worker):
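    """
    Register the `Worker` subclass *worker* so that `dispatch()` can create
    instances of it by name.
    """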
    if not (isinstance(worker, type) and issubclass(worker, Worker)):
        raise TypeError(
            'worker: must be subclass of %r; got %r' % (Worker, worker)
        )
    name = worker.__name__
    if name in _workers:
        raise ValueError(
            'cannot register %r, worker with name %r already registered' % (
                worker, name
            )
        )
    _workers[name] = worker


def exception_name(exception):
    """
    Return name of ``Exception`` subclass or instance *exception*.

    Works with ``Exception`` instances:

    >>> exception_name(ValueError('bad value!'))
    'ValueError'

    And with ``Exception`` subclasses:

    >>> exception_name(ValueError)
    'ValueError'

    """
    if isinstance(exception, Exception):
        return exception.__class__.__name__
    return exception.__name__


def dispatch(worker, env, q, key, args):
    """
    Dispatch a worker in this process.

    This function will create an instance of the appropriate `Worker` subclass
    and call its run() method.

    If this function catches an exception, it will be logged and then forwarded
    to the `Manager` through the queue via the "error" signal.

    Unless something goes spectacularly wrong, this function will always send
    a "terminate" signal to the `Manager`, even if the worker crashes or a
    `Worker` subclass named *worker* isn't registered.
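
    For example, if the worker raises an exception, a message roughly like this
    is put into the queue (the values shown here are illustrative):

        {
            'signal': 'error',
            'args': (key, 'ValueError', 'bad value!'),
            'worker': 'ImportWorker',
            'pid': 12345,
        }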

    :param worker: name of the worker class, e.g. ``'ImportWorker'``
    :param env: a ``dict`` containing run-time information like the CouchDB URL
    :param q: a ``multiprocessing.Queue`` or similar
    :param key: a key to uniquely identify this worker among active workers
        controlled by the `Manager` that launched this worker
    :param args: arguments to be passed to `Worker.run()`
    """
    pid = current_process().pid
    log.debug('** dispatch in process %d: worker=%r, key=%r, args=%r',
        pid, worker, key, args
    )
    try:
        klass = _workers[worker]
        inst = klass(env, q, key, args)
        inst.run()
    except Exception as e:
        log.exception('exception in process %d, worker=%r', pid, worker)
        q.put(dict(
            signal='error',
            args=(key, exception_name(e), str(e)),
            worker=worker,
            pid=pid,
        ))
    finally:
        q.put(dict(
            signal='terminate',
            args=(key,),
            worker=worker,
            pid=pid,
        ))


class Worker(object):
    """
    Just a workin' class class.

    To create a worker, just subclass like this and override the
    `Worker.execute()` method:

    >>> class MyWorker(Worker):
    ...     def execute(self, junk):
    ...         self.emit('my_signal', 'doing stuff with junk')
    ...

    You must explicitly register your worker with `register()` for `dispatch()`
    to be able to create instances of your worker in a subprocess.  It's best
    to first check whether your worker is already registered, like this:

    >>> if not isregistered(MyWorker):
    ...     register(MyWorker)
    ...

    Ideally, the corresponding `Manager` subclass should ensure all its workers
    are registered, typically in its __init__() method, like this:

    >>> class MyManager(Manager):
    ...     def __init__(self, env, callback=None):
    ...         super(MyManager, self).__init__(env, callback)
    ...         for klass in [MyWorker]:
    ...             if not isregistered(klass):
    ...                 register(klass)
    ...
    >>> manager = MyManager({})

    :param env: a ``dict`` containing run-time information like the CouchDB URL
    :param q: a ``multiprocessing.Queue`` or similar
    :param key: a key to uniquely identify this worker among active workers
        controlled by the `Manager` that launched this worker
    :param args: arguments to be passed to `Worker.run()`
    """
    def __init__(self, env, q, key, args):
        self.env = env
        self.q = q
        self.key = key
        self.args = args
        self.pid = current_process().pid
        self.name = self.__class__.__name__

    def emit(self, signal, *args):
        """
        Put *signal* into the message queue, optionally with *args*.

        To aid debugging and logging, the worker class name and worker process
        ID are included in the message.

        The message is a ``dict`` with the following keys:

            *worker* - the worker class name
            *pid* - the process ID
            *signal* - the signal name
            *args* - the signal arguments, with the worker *key* prepended
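
        For example, ``self.emit('progress', 17, 18)`` from a worker whose key
        is ``'mykey'`` puts a message like this into the queue (the worker name
        and pid shown here are illustrative):

            {
                'worker': 'ImportWorker',
                'pid': 12345,
                'signal': 'progress',
                'args': ('mykey', 17, 18),
            }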
        """
        self.q.put(dict(
            worker=self.name,
            pid=self.pid,
            signal=signal,
            args=(self.key,) + args,
        ))

    def run(self):
        self.execute(*self.args)

    def execute(self, *args):
        raise NotImplementedError(
            '%s.execute()' % self.name
        )


class CouchWorker(Worker):
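    """
    A `Worker` with a CouchDB connection.

    In addition to the standard `Worker` setup, the constructor creates a
    microfiber ``Database`` for the 'dmedia' database (available as
    ``self.db``) and ensures that it exists.
    """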
    def __init__(self, env, q, key, args):
        super(CouchWorker, self).__init__(env, q, key, args)
        self.db = Database('dmedia', self.env)
        self.db.ensure()


class Manager(object):
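    """
    Launches `Worker` subprocesses and relays their signals.

    Jobs are started with `Manager.start_job()`, each in its own
    ``multiprocessing.Process``.  A background thread reads messages from the
    shared queue; each message is handled by an ``on_<signal>`` method if one
    exists, otherwise it is re-emitted through the optional *callback*.

    :param env: a ``dict`` containing run-time information like the CouchDB URL
    :param callback: an optional callable invoked as ``callback(signal, args)``
    """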
    def __init__(self, env, callback=None):
        if not (callback is None or callable(callback)):
            raise TypeError(
                'callback must be callable; got %r' % callback
            )
        self.env = env
        self._callback = callback
        self._q = multiprocessing.Queue()
        self._lock = Lock()
        self._workers = {}
        self._running = False
        self._thread = None
        self.name = self.__class__.__name__

    def _start_signal_thread(self):
        assert self._running is False
        assert len(self._workers) > 0
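        # Wait for any previous signal thread to exit before starting a new one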
        if self._thread is not None:
            self._thread.join()
        self._running = True
        self._thread = Thread(target=self._signal_thread)
        self._thread.daemon = True
        self._thread.start()

    def _kill_signal_thread(self):
        self._running = False

    def _signal_thread(self):
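        # Poll the queue with a short timeout so the loop notices promptly when
        # self._running is set to False.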
        while self._running:
            try:
                self._process_message(self._q.get(timeout=0.5))
            except Empty:
                pass

    def _process_message(self, msg):
        log.info('[from %(worker)s %(pid)d] %(signal)s %(args)r', msg)
        with self._lock:
            signal = msg['signal']
            args = msg['args']
            handler = getattr(self, 'on_' + signal, None)
            if callable(handler):
                handler(*args)
            else:
                self.emit(signal, *args)

    def first_worker_starting(self):
        """
        Called before starting the first worker.

        This method can be overridden by subclasses.  It is called when the
        manager goes from a state of having no active workers to a state of
        having at least one active worker, just prior to starting the new
        worker.

        For example, `ImportManager` overrides this method to fire the
        "BatchStarted" signal.

        Also see `Manager.last_worker_finished()`.
        """

    def last_worker_finished(self):
        """
        Called after last worker has finished.

        This method can be overridden by subclasses.  It is called when the
        manager goes from a state of having at least one active worker to a
        state of having no active workers, just after the worker's "terminate"
        signal has been handled and the worker removed from the ``_workers``
        dictionary.

        For example, `ImportManager` overrides this method to fire the
        "BatchFinished" signal.

        Also see `Manager.first_worker_starting()`.
        """

    def on_terminate(self, key):
        p = self._workers.pop(key)
        p.join()
        if len(self._workers) == 0:
            self._kill_signal_thread()
            self.last_worker_finished()

    def on_error(self, key, exception, message):
        log.error('%s %s: %s: %s', self.name, key, exception, message)

    def kill(self):
        if not self._running:
            return False
        log.info('Killing %s', self.name)
        self._running = False
        self._thread.join()  # Cleanly shutdown _signal_thread
        with self._lock:
            for p in self._workers.values():
                p.terminate()
                p.join()
            self._workers.clear()
            return True

    def get_worker_env(self, worker, key, args):
        return dict(self.env)

    def start_job(self, worker, key, *args):
        """
        Start a worker process identified by *key*, using the worker class
        named by *worker*.

        If the job identified by *key* is already running, ``False`` is
        returned, without any further action.

        Otherwise the job is dispatched to a new worker process, and ``True`` is
        returned.

        Note that this method is asynchronous and will return immediately.
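
        For example, an import job might be started roughly like this (the key
        and extra argument shown here are illustrative):

            manager.start_job('ImportWorker', '/media/EOS_DIGITAL', extra_info)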

        :param worker: name of the worker class, e.g. ``'ImportWorker'``
        :param key: a key to uniquely identify new `Worker` among active workers
            controlled by this `Manager`
        :param args: arguments to be passed to `Worker.run()`
        """
        with self._lock:
            if key in self._workers:
                return False
            if len(self._workers) == 0:
                self.first_worker_starting()
            env = self.get_worker_env(worker, key, args)
            p = multiprocessing.Process(
                target=dispatch,
                args=(worker, env, self._q, key, args),
            )
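            # Daemonize the worker so it is terminated if the main process
            # exits.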
            p.daemon = True
            self._workers[key] = p
            p.start()
            if len(self._workers) == 1:
                self._start_signal_thread()
            return True

    def kill_job(self, key):
        with self._lock:
            if key not in self._workers:
                return False
            p = self._workers.pop(key)
            p.terminate()
            p.join()
            return True

    def list_jobs(self):
        return sorted(self._workers)

    def emit(self, signal, *args):
        """
        Emit *signal* to higher-level code by calling the optional *callback*
        as ``callback(signal, args)``; this is a no-op if no callback was given.
        """
        if self._callback is None:
            return
        self._callback(signal, args)


class CouchManager(Manager):
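    """
    A `Manager` with a CouchDB connection.

    Like `CouchWorker`, the constructor creates a microfiber ``Database`` for
    the 'dmedia' database (available as ``self.db``) and ensures that it
    exists.
    """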
    def __init__(self, env, callback=None):
        super(CouchManager, self).__init__(env, callback)
        self.db = Database('dmedia', self.env)
        self.db.ensure()