1
# -*- coding: utf-8 -*-
3
celery.worker.autoreload
4
~~~~~~~~~~~~~~~~~~~~~~~~
6
This module implements automatic module reloading
8
from __future__ import absolute_import
9
from __future__ import with_statement
18
from collections import defaultdict
20
from ..abstract import StartStopComponent
21
from ..utils.threads import bgThread, Event
25
_ProcessEvent = pyinotify.ProcessEvent
27
pyinotify = None # noqa
28
_ProcessEvent = object # noqa
31
class WorkerComponent(StartStopComponent):
32
name = "worker.autoreloader"
35
def __init__(self, w, autoreload=None, **kwargs):
36
self.enabled = w.autoreload = autoreload
40
w.autoreloader = self.instantiate(w.autoreloader_cls,
46
def file_hash(filename, algorithm="md5"):
47
hobj = hashlib.new(algorithm)
48
with open(filename, "rb") as f:
49
for chunk in iter(lambda: f.read(2 ** 20), ''):
54
class BaseMonitor(object):
56
def __init__(self, files, on_change=None, shutdown_event=None,
59
self.interval = interval
60
self._on_change = on_change
61
self.modify_times = defaultdict(int)
62
self.shutdown_event = shutdown_event or Event()
65
raise NotImplementedError("Subclass responsibility")
70
def on_change(self, modified):
72
return self._on_change(modified)
75
class StatMonitor(BaseMonitor):
76
"""File change monitor based on the ``stat`` system call."""
79
return ((f, self._mtime(f)) for f in self.files)
81
def _maybe_modified(self, f, mt):
    """Tell whether file ``f`` should be reported as changed.

    :param f: file name, used as the key into ``self.modify_times``.
    :param mt: modification time just sampled for ``f``.  ``None`` is
        never reported as a change.
    :returns: ``True`` when ``mt`` is not ``None`` and differs from the
        value stored for ``f`` in ``self.modify_times``, else ``False``.

    """
    if mt is None:
        # No usable timestamp: report "unchanged" without touching
        # ``modify_times``.
        return False
    # Indexed (not ``.get``) on purpose: ``modify_times`` is created as a
    # ``defaultdict(int)``, so a file that has never been recorded
    # compares against 0.
    return self.modify_times[f] != mt
85
while not self.shutdown_event.is_set():
86
modified = dict((f, mt) for f, mt in self._mtimes()
87
if self._maybe_modified(f, mt))
89
self.on_change(modified.keys())
90
self.modify_times.update(modified)
91
time.sleep(self.interval)
96
return os.stat(path).st_mtime
101
class KQueueMonitor(BaseMonitor):
102
"""File change monitor based on BSD kernel event notifications"""
104
def __init__(self, *args, **kwargs):
105
assert hasattr(select, "kqueue")
106
super(KQueueMonitor, self).__init__(*args, **kwargs)
107
self.filemap = dict((f, None) for f in self.files)
110
self._kq = select.kqueue()
112
for f in self.filemap:
113
self.filemap[f] = fd = os.open(f, os.O_RDONLY)
115
ev = select.kevent(fd,
116
filter=select.KQ_FILTER_VNODE,
117
flags=select.KQ_EV_ADD |
118
select.KQ_EV_ENABLE |
120
fflags=select.KQ_NOTE_WRITE |
121
select.KQ_NOTE_EXTEND)
124
events = self._kq.control(kevents, 0)
125
while not self.shutdown_event.is_set():
126
events = self._kq.control(kevents, 1)
127
fds = [e.ident for e in events]
128
modified = [k for k, v in self.filemap.iteritems()
130
self.on_change(modified)
134
for fd in filter(None, self.filemap.values()):
138
if exc != errno.EBADF:
140
self.filemap[fd] = None
144
class InotifyMonitor(_ProcessEvent):
145
"""File change monitor based on Linux kernel `inotify` subsystem"""
147
def __init__(self, modules, on_change=None, **kwargs):
149
self._modules = modules
150
self._on_change = on_change
154
self._wm = pyinotify.WatchManager()
155
self._notifier = pyinotify.Notifier(self._wm)
156
for m in self._modules:
157
self._wm.add_watch(m, pyinotify.IN_MODIFY)
158
self._notifier.loop()
163
self._notifier.stop()
166
def process_IN_MODIFY(self, event):
    """Forward an ``IN_MODIFY`` notification to :meth:`on_change`.

    :param event: notification object; only its ``pathname`` attribute
        is read.

    The path is wrapped in a one-element list.  The other monitors in
    this module hand ``on_change`` a collection of file names, and the
    autoreloader's ``on_change`` iterates over what it receives, so a
    bare string would be processed one character at a time.

    """
    self.on_change([event.pathname])
169
def on_change(self, modified):
171
return self._on_change(modified)
174
def default_implementation():
175
# kqueue monitor not working properly at this time.
176
if hasattr(select, "kqueue"):
178
if sys.platform.startswith("linux") and pyinotify:
183
implementations = {"kqueue": KQueueMonitor,
184
"inotify": InotifyMonitor,
186
Monitor = implementations[
187
os.environ.get("CELERYD_FSNOTIFY") or default_implementation()]
190
class Autoreloader(bgThread):
191
"""Tracks changes in modules and fires reload commands"""
194
def __init__(self, controller, modules=None, monitor_cls=None,
195
logger=None, **options):
196
super(Autoreloader, self).__init__()
197
self.controller = controller
198
app = self.controller.app
199
self.modules = app.loader.task_modules if modules is None else modules
201
self.options = options
202
self.Monitor = monitor_cls or self.Monitor
207
files = [sys.modules[m].__file__ for m in self.modules]
208
self._monitor = self.Monitor(files, self.on_change,
209
shutdown_event=self._is_shutdown, **self.options)
210
self._hashes = dict([(f, file_hash(f)) for f in files])
212
self._monitor.start()
214
if exc.errno not in (errno.EINTR, errno.EAGAIN):
217
def _maybe_modified(self, f):
218
digest = file_hash(f)
219
if digest != self._hashes[f]:
220
self._hashes[f] = digest
224
def on_change(self, files):
225
modified = [f for f in files if self._maybe_modified(f)]
227
self.logger.info("Detected modified modules: %s" % (
228
map(self._module_name, modified), ))
229
self._reload(map(self._module_name, modified))
231
def _reload(self, modules):
    """Ask the controller to reload ``modules``.

    :param modules: module names, passed through unchanged as the first
        argument of ``self.controller.reload``.

    The call is always made with ``reload=True``; whatever the
    controller returns is discarded.

    """
    controller = self.controller
    controller.reload(modules, reload=True)
239
def _module_name(path):
    """Derive a module name from the file at ``path``.

    :param path: path of a module's file.
    :returns: the last path component with its final extension removed,
        e.g. ``"/srv/app/tasks.py"`` gives ``"tasks"``.

    Only the final component is used, so directory information is
    dropped: ``"pkg/tasks.py"`` also gives ``"tasks"``.

    """
    # NOTE(review): this takes no ``self`` yet is used as
    # ``self._module_name`` elsewhere in the file; presumably a
    # ``@staticmethod`` decorator sits on a line outside this view --
    # confirm.
    # NOTE(review): the result is fed back to the controller as a module
    # name; for modules living inside packages the dotted prefix is lost
    # here -- verify that the reload side copes with that.
    filename = os.path.basename(path)
    stem, _extension = os.path.splitext(filename)
    return stem