~ubuntuone-pqm-team/celery/stable

« back to all changes in this revision

Viewing changes to celery/worker/autoreload.py

  • Committer: Ricardo Kirkner
  • Date: 2013-10-25 11:44:55 UTC
  • Revision ID: ricardo.kirkner@canonical.com-20131025114455-3atssn554emdwy4y
Tags: v2.5.0
imported celery 2.5.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
"""
 
3
    celery.worker.autoreload
 
4
    ~~~~~~~~~~~~~~~~~~~~~~~~
 
5
 
 
6
    This module implements automatic module reloading
 
7
"""
 
8
from __future__ import absolute_import
 
9
from __future__ import with_statement
 
10
 
 
11
import errno
 
12
import hashlib
 
13
import os
 
14
import select
 
15
import sys
 
16
import time
 
17
 
 
18
from collections import defaultdict
 
19
 
 
20
from ..abstract import StartStopComponent
 
21
from ..utils.threads import bgThread, Event
 
22
 
 
23
try:
 
24
    import pyinotify
 
25
    _ProcessEvent = pyinotify.ProcessEvent
 
26
except ImportError:
 
27
    pyinotify = None        # noqa
 
28
    _ProcessEvent = object  # noqa
 
29
 
 
30
 
 
31
class WorkerComponent(StartStopComponent):
 
32
    name = "worker.autoreloader"
 
33
    requires = ("pool", )
 
34
 
 
35
    def __init__(self, w, autoreload=None, **kwargs):
 
36
        self.enabled = w.autoreload = autoreload
 
37
        w.autoreloader = None
 
38
 
 
39
    def create(self, w):
 
40
        w.autoreloader = self.instantiate(w.autoreloader_cls,
 
41
                                          controller=w,
 
42
                                          logger=w.logger)
 
43
        return w.autoreloader
 
44
 
 
45
 
 
46
def file_hash(filename, algorithm="md5"):
 
47
    hobj = hashlib.new(algorithm)
 
48
    with open(filename, "rb") as f:
 
49
        for chunk in iter(lambda: f.read(2 ** 20), ''):
 
50
            hobj.update(chunk)
 
51
    return hobj.digest()
 
52
 
 
53
 
 
54
class BaseMonitor(object):
 
55
 
 
56
    def __init__(self, files, on_change=None, shutdown_event=None,
 
57
            interval=0.5):
 
58
        self.files = files
 
59
        self.interval = interval
 
60
        self._on_change = on_change
 
61
        self.modify_times = defaultdict(int)
 
62
        self.shutdown_event = shutdown_event or Event()
 
63
 
 
64
    def start(self):
 
65
        raise NotImplementedError("Subclass responsibility")
 
66
 
 
67
    def stop(self):
 
68
        pass
 
69
 
 
70
    def on_change(self, modified):
 
71
        if self._on_change:
 
72
            return self._on_change(modified)
 
73
 
 
74
 
 
75
class StatMonitor(BaseMonitor):
 
76
    """File change monitor based on the ``stat`` system call."""
 
77
 
 
78
    def _mtimes(self):
 
79
        return ((f, self._mtime(f)) for f in self.files)
 
80
 
 
81
    def _maybe_modified(self, f, mt):
 
82
        return mt is not None and self.modify_times[f] != mt
 
83
 
 
84
    def start(self):
 
85
        while not self.shutdown_event.is_set():
 
86
            modified = dict((f, mt) for f, mt in self._mtimes()
 
87
                                if self._maybe_modified(f, mt))
 
88
            if modified:
 
89
                self.on_change(modified.keys())
 
90
                self.modify_times.update(modified)
 
91
            time.sleep(self.interval)
 
92
 
 
93
    @staticmethod
 
94
    def _mtime(path):
 
95
        try:
 
96
            return os.stat(path).st_mtime
 
97
        except Exception:
 
98
            pass
 
99
 
 
100
 
 
101
class KQueueMonitor(BaseMonitor):
 
102
    """File change monitor based on BSD kernel event notifications"""
 
103
 
 
104
    def __init__(self, *args, **kwargs):
 
105
        assert hasattr(select, "kqueue")
 
106
        super(KQueueMonitor, self).__init__(*args, **kwargs)
 
107
        self.filemap = dict((f, None) for f in self.files)
 
108
 
 
109
    def start(self):
 
110
        self._kq = select.kqueue()
 
111
        kevents = []
 
112
        for f in self.filemap:
 
113
            self.filemap[f] = fd = os.open(f, os.O_RDONLY)
 
114
 
 
115
            ev = select.kevent(fd,
 
116
                    filter=select.KQ_FILTER_VNODE,
 
117
                    flags=select.KQ_EV_ADD |
 
118
                            select.KQ_EV_ENABLE |
 
119
                            select.KQ_EV_CLEAR,
 
120
                    fflags=select.KQ_NOTE_WRITE |
 
121
                            select.KQ_NOTE_EXTEND)
 
122
            kevents.append(ev)
 
123
 
 
124
        events = self._kq.control(kevents, 0)
 
125
        while not self.shutdown_event.is_set():
 
126
            events = self._kq.control(kevents, 1)
 
127
            fds = [e.ident for e in events]
 
128
            modified = [k for k, v in self.filemap.iteritems()
 
129
                                        if v in fds]
 
130
            self.on_change(modified)
 
131
 
 
132
    def stop(self):
 
133
        self._kq.close()
 
134
        for fd in filter(None, self.filemap.values()):
 
135
            try:
 
136
                os.close(fd)
 
137
            except OSError, exc:
 
138
                if exc != errno.EBADF:
 
139
                    raise
 
140
            self.filemap[fd] = None
 
141
        self.filemap.clear()
 
142
 
 
143
 
 
144
class InotifyMonitor(_ProcessEvent):
 
145
    """File change monitor based on Linux kernel `inotify` subsystem"""
 
146
 
 
147
    def __init__(self, modules, on_change=None, **kwargs):
 
148
        assert pyinotify
 
149
        self._modules = modules
 
150
        self._on_change = on_change
 
151
 
 
152
    def start(self):
 
153
        try:
 
154
            self._wm = pyinotify.WatchManager()
 
155
            self._notifier = pyinotify.Notifier(self._wm)
 
156
            for m in self._modules:
 
157
                self._wm.add_watch(m, pyinotify.IN_MODIFY)
 
158
            self._notifier.loop()
 
159
        finally:
 
160
            self.close()
 
161
 
 
162
    def close(self):
 
163
        self._notifier.stop()
 
164
        self._wm.close()
 
165
 
 
166
    def process_IN_MODIFY(self, event):
 
167
        self.on_change(event.pathname)
 
168
 
 
169
    def on_change(self, modified):
 
170
        if self._on_change:
 
171
            return self._on_change(modified)
 
172
 
 
173
 
 
174
def default_implementation():
 
175
    # kqueue monitor not working properly at this time.
 
176
    if hasattr(select, "kqueue"):
 
177
        return "kqueue"
 
178
    if sys.platform.startswith("linux") and pyinotify:
 
179
        return "inotify"
 
180
    else:
 
181
        return "stat"
 
182
 
 
183
implementations = {"kqueue": KQueueMonitor,
 
184
                   "inotify": InotifyMonitor,
 
185
                   "stat": StatMonitor}
 
186
Monitor = implementations[
 
187
            os.environ.get("CELERYD_FSNOTIFY") or default_implementation()]
 
188
 
 
189
 
 
190
class Autoreloader(bgThread):
 
191
    """Tracks changes in modules and fires reload commands"""
 
192
    Monitor = Monitor
 
193
 
 
194
    def __init__(self, controller, modules=None, monitor_cls=None,
 
195
            logger=None, **options):
 
196
        super(Autoreloader, self).__init__()
 
197
        self.controller = controller
 
198
        app = self.controller.app
 
199
        self.modules = app.loader.task_modules if modules is None else modules
 
200
        self.logger = logger
 
201
        self.options = options
 
202
        self.Monitor = monitor_cls or self.Monitor
 
203
        self._monitor = None
 
204
        self._hashes = None
 
205
 
 
206
    def body(self):
 
207
        files = [sys.modules[m].__file__ for m in self.modules]
 
208
        self._monitor = self.Monitor(files, self.on_change,
 
209
                shutdown_event=self._is_shutdown, **self.options)
 
210
        self._hashes = dict([(f, file_hash(f)) for f in files])
 
211
        try:
 
212
            self._monitor.start()
 
213
        except OSError, exc:
 
214
            if exc.errno not in (errno.EINTR, errno.EAGAIN):
 
215
                raise
 
216
 
 
217
    def _maybe_modified(self, f):
 
218
        digest = file_hash(f)
 
219
        if digest != self._hashes[f]:
 
220
            self._hashes[f] = digest
 
221
            return True
 
222
        return False
 
223
 
 
224
    def on_change(self, files):
 
225
        modified = [f for f in files if self._maybe_modified(f)]
 
226
        if modified:
 
227
            self.logger.info("Detected modified modules: %s" % (
 
228
                    map(self._module_name, modified), ))
 
229
            self._reload(map(self._module_name, modified))
 
230
 
 
231
    def _reload(self, modules):
 
232
        self.controller.reload(modules, reload=True)
 
233
 
 
234
    def stop(self):
 
235
        if self._monitor:
 
236
            self._monitor.stop()
 
237
 
 
238
    @staticmethod
 
239
    def _module_name(path):
 
240
        return os.path.splitext(os.path.basename(path))[0]