1
# ubuntuone.syncdaemon.event_queue - Event queuing
3
# Author: Facundo Batista <facundo@canonical.com>
5
# Copyright 2009 Canonical Ltd.
7
# This program is free software: you can redistribute it and/or modify it
8
# under the terms of the GNU General Public License version 3, as published
9
# by the Free Software Foundation.
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
14
# PURPOSE. See the GNU General Public License for more details.
16
# You should have received a copy of the GNU General Public License along
17
# with this program. If not, see <http://www.gnu.org/licenses/>.
18
'''Module that implements the Event Queue machinery.'''
26
from twisted.internet import abstract, reactor, error, defer
28
from Queue import Queue, Empty
30
class InvalidEventError(Exception):
    """Raised when an event name is not in the list of allowed events."""
33
# these are our internal events, the ones inserted into the whole system
35
'FS_FILE_CLOSE_WRITE': ('path',),
36
'FS_FILE_CREATE': ('path',),
37
'FS_DIR_CREATE': ('path',),
38
'FS_FILE_DELETE': ('path',),
39
'FS_DIR_DELETE': ('path',),
40
'FS_FILE_MOVE': ('path_from', 'path_to',),
41
'FS_DIR_MOVE': ('path_from', 'path_to',),
43
'AQ_FILE_NEW_OK': ('marker', 'new_id'),
44
'AQ_FILE_NEW_ERROR': ('marker', 'error'),
45
'AQ_DIR_NEW_OK': ('marker', 'new_id'),
46
'AQ_DIR_NEW_ERROR': ('marker', 'error'),
47
'AQ_MOVE_OK': ('share_id', 'node_id'),
48
'AQ_MOVE_ERROR': ('share_id', 'node_id',
49
'old_parent_id', 'new_parent_id', 'new_name', 'error'),
50
'AQ_UNLINK_OK': ('share_id', 'parent_id', 'node_id'),
51
'AQ_UNLINK_ERROR': ('share_id', 'parent_id', 'node_id', 'error'),
52
'AQ_DOWNLOAD_STARTED': ('share_id', 'node_id', 'server_hash'),
53
'AQ_DOWNLOAD_FINISHED': ('share_id', 'node_id', 'server_hash'),
54
'AQ_DOWNLOAD_ERROR': ('share_id', 'node_id', 'server_hash', 'error'),
55
'AQ_UPLOAD_STARTED' : ('share_id', 'node_id', 'hash'),
56
'AQ_UPLOAD_FINISHED': ('share_id', 'node_id', 'hash'),
57
'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'),
58
'AQ_SHARES_LIST': ('shares_list',),
59
'AQ_LIST_SHARES_ERROR': ('error',),
60
'AQ_CREATE_SHARE_OK': ('share_id', 'marker'),
61
'AQ_CREATE_SHARE_ERROR': ('marker', 'error'),
62
'AQ_QUERY_ERROR': ('item', 'error'),
64
'SV_SHARE_CHANGED': ('message', 'info'),
65
'SV_SHARE_ANSWERED': ('share_id', 'answer'),
66
'SV_HASH_NEW': ('share_id', 'node_id', 'hash'),
67
'SV_FILE_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
68
'SV_DIR_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
69
'SV_FILE_DELETED': ('share_id', 'node_id'),
70
'SV_MOVED': ('share_id', 'node_id', 'new_share_id', 'new_parent_id',
72
'HQ_HASH_NEW': ('path', 'hash', 'crc32', 'size', 'stat'),
73
'SYS_CONNECT': ('access_token',),
75
'SYS_STATE_CHANGED': ('state',),
76
'SYS_NET_CONNECTED': (),
77
'SYS_NET_DISCONNECTED': (),
78
'SYS_WAIT_FOR_LOCAL_RESCAN': (),
79
'SYS_LOCAL_RESCAN_DONE': (),
80
'SYS_CONNECTION_MADE': (),
81
'SYS_CONNECTION_LOST': (),
82
'SYS_PROTOCOL_VERSION_ERROR': ('error',),
83
'SYS_PROTOCOL_VERSION_OK': (),
84
'SYS_OAUTH_ERROR': ('error',),
86
'SYS_SERVER_RESCAN_STARTING': (),
87
'SYS_SERVER_RESCAN_DONE': (),
88
'SYS_META_QUEUE_WAITING': (),
89
'SYS_META_QUEUE_DONE': (),
90
'SYS_CONTENT_QUEUE_WAITING': (),
91
'SYS_CONTENT_QUEUE_DONE': (),
92
'SYS_CLEANUP_STARTED': (),
93
'SYS_CLEANUP_FINISHED': (),
94
'SYS_UNKNOWN_ERROR': (),
97
if 'IN_CREATE' in vars(pyinotify.EventsCodes):
98
# < 0.8; event codes in EventsCodes; events have is_dir attribute
99
evtcodes = pyinotify.EventsCodes
100
event_is_dir = lambda event: event.is_dir
102
# >= 0.8; event codes in pyinotify itself; events have dir attribute
104
event_is_dir = lambda event: event.dir
106
# quickly translates the event and its is_dir state to our standard events
107
NAME_TRANSLATIONS = {
108
evtcodes.IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
109
evtcodes.IN_CREATE: 'FS_FILE_CREATE',
110
evtcodes.IN_CREATE | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
111
evtcodes.IN_DELETE: 'FS_FILE_DELETE',
112
evtcodes.IN_DELETE | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
113
evtcodes.IN_MOVED_FROM: 'FS_FILE_DELETE',
114
evtcodes.IN_MOVED_FROM | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
115
evtcodes.IN_MOVED_TO: 'FS_FILE_CREATE',
116
evtcodes.IN_MOVED_TO | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
119
# these are the events that we will listen for from inotify
121
evtcodes.IN_CLOSE_WRITE |
124
evtcodes.IN_MOVED_FROM |
125
evtcodes.IN_MOVED_TO |
126
evtcodes.IN_MOVE_SELF
129
# name of the fallback handler; it receives (event_name, *args, **kwargs)
DEFAULT_HANDLER = "handle_default"
131
class _INotifyProcessor(pyinotify.ProcessEvent):
132
'''Helper class that is called from inotify when an event happens.
134
This class also catches the MOVE events, and synthesizes a new
135
FS_(DIR|FILE)_MOVE event when possible.
137
def __init__(self, eq):
138
self.log = logging.getLogger('ubuntuone.SyncDaemon.INotifyProcessor')
140
self.held_event = None
142
self.frozen_path = None
143
self.frozen_evts = False
145
def on_timeout(self):
    '''Timer callback: release the event on hold, if there is still one.'''
    if self.held_event is None:
        return
    # True flags that the release is triggered by the timeout itself
    self.release_held_event(True)
150
def release_held_event(self, timed_out=False):
151
'''Release the event on hold to fulfill its destiny.'''
155
except error.AlreadyCalled:
156
# self.on_timeout() was *just* called, do nothing here
158
self.push_event(self.held_event)
159
self.held_event = None
161
def process_IN_MOVE_SELF(self, event):
    '''Deliberately ignore IN_MOVE_SELF events.

    The event is only turned on because pyinotify does some
    path-fixing in its internal processing when this happens;
    nothing is done with it here.
    '''
168
def process_IN_MOVED_FROM(self, event):
    '''Hold a MOVED_FROM event, to maybe synthesize a MOVE later.

    An event that is already on hold is released first.  The received
    event is then stored in self.held_event and a one second timer is
    scheduled that will call on_timeout.
    '''
    previously_held = self.held_event
    if previously_held is not None:
        self.release_held_event()

    self.held_event = event
    # keep the delayed call around so it can be cancelled later
    self.timer = reactor.callLater(1, self.on_timeout)
176
def is_ignored(self, path):
177
"""should we ignore this path?"""
178
for part in path.split(os.path.sep):
179
if path.endswith(".partial") or \
180
re.search(r"\.conflict(?:\.\d+)?$", path):
183
# don't support symlinks yet
184
if os.path.islink(path):
188
def process_IN_MOVED_TO(self, event):
189
'''Capture the MOVED_TO to maybe synthesize FILE_MOVED.'''
190
if self.held_event is not None:
191
if event.cookie == self.held_event.cookie:
194
except error.AlreadyCalled:
195
# self.on_timeout() was *just* called, do nothing here
198
f_path = os.path.join(self.held_event.path,
199
self.held_event.name)
200
t_path = os.path.join(event.path, event.name)
202
if not self.is_ignored(f_path) and \
203
not self.is_ignored(t_path):
204
f_share_id = self.eq.fs.get_by_path(\
205
os.path.dirname(f_path)).share_id
206
t_share_id = self.eq.fs.get_by_path(\
207
os.path.dirname(t_path)).share_id
208
evtname = "FS_DIR_" if event_is_dir(event) else "FS_FILE_"
209
if f_share_id != t_share_id:
210
# if the share_id are != push a delete/create
211
self.eq.push(evtname+"DELETE", f_path)
212
self.eq.push(evtname+"CREATE", t_path)
214
self.eq.push(evtname+"MOVE", f_path, t_path)
215
self.held_event = None
218
self.release_held_event()
219
self.push_event(event)
221
# we don't have a held_event so this is a move from outside.
222
# if it's a file move it's atomic on POSIX, so we aren't going to
223
# receive a IN_CLOSE_WRITE, so let's fake it for files
224
self.push_event(event)
225
t_path = os.path.join(event.path, event.name)
226
if not os.path.isdir(t_path):
227
self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
229
def process_default(self, event):
    '''Send the received event to the EventQueue.

    If an event is on hold, it is released before the new one is
    pushed.
    '''
    pending = self.held_event
    if pending is not None:
        self.release_held_event()
    self.push_event(event)
235
def push_event(self, event):
236
'''Push the event to the EQ.'''
238
if event.mask == evtcodes.IN_IGNORED:
241
# change the pattern IN_CREATE to FS_FILE_CREATE or FS_DIR_CREATE
243
evt_name = NAME_TRANSLATIONS[event.mask]
245
raise KeyError("Unhandled Event in INotify: %s" % event)
248
fullpath = os.path.join(event.path, event.name)
250
# check if the path is not frozen
251
if self.frozen_path is not None:
252
if os.path.dirname(fullpath) == self.frozen_path:
253
# this will at least store the last one, for debug
255
self.frozen_evts = (evt_name, fullpath)
258
if not self.is_ignored(fullpath):
259
if evt_name == 'FS_DIR_DELETE':
260
self.handle_dir_delete(fullpath)
261
self.eq.push(evt_name, fullpath)
263
def freeze_begin(self, path):
    """Start holding the events that happen in this path.

    The record of events seen while frozen is reset as well.
    """
    self.log.debug("Freeze begin: %r", path)
    self.frozen_evts = False
    self.frozen_path = path
269
def freeze_rollback(self):
    """Discard the current freeze and go back to the idle state."""
    # log first: the message needs the path that is about to be cleared
    self.log.debug("Freeze rollback: %r", self.frozen_path)
    self.frozen_evts = False
    self.frozen_path = None
275
def freeze_commit(self, events):
276
"""Unfreezes the frozen path, sending received events if not dirty.
278
If events for that path happened:
281
- push the here received events, return False
283
self.log.debug("Freeze commit: %r (%d events)",
284
self.frozen_path, len(events))
287
self.log.debug("Dirty by %s", self.frozen_evts)
288
self.frozen_evts = False
291
# push the received events
292
for evt_name, path in events:
293
if not self.is_ignored(path):
294
self.eq.push(evt_name, path)
296
self.frozen_path = None
297
self.frozen_evts = False
300
def handle_dir_delete(self, fullpath):
301
""" handle the case of move a dir to a non-watched directory """
302
paths = self.eq.fs.get_paths_starting_with(fullpath)
303
paths.sort(reverse=True)
304
for path, is_dir in paths:
308
self.eq.push('FS_DIR_DELETE', path)
310
self.eq.push('FS_FILE_DELETE', path)
313
class EventQueue(object):
314
'''Manages the events from different sources and distributes them.'''
316
def __init__(self, fs):
319
self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
322
self._inotify_reader = None
323
self._inotify_wm = wm = pyinotify.WatchManager()
324
self._processor = _INotifyProcessor(self)
325
self._inotify_notifier = pyinotify.Notifier(wm, self._processor)
326
self._hook_inotify_to_twisted(wm, self._inotify_notifier)
328
self.dispatching = False
329
self.dispatch_queue = Queue()
330
self.empty_event_queue_callbacks = set()
332
def add_empty_event_queue_callback(self, callback):
333
"""add a callback for when the even queue has no more events."""
334
self.empty_event_queue_callbacks.add(callback)
335
if not self.dispatching and self.dispatch_queue.empty():
336
if callable(callback):
339
def remove_empty_event_queue_callback(self, callback):
340
"""remove the callback"""
341
self.empty_event_queue_callbacks.remove(callback)
343
def _hook_inotify_to_twisted(self, wm, notifier):
344
'''This will hook inotify to twisted.'''
346
class MyReader(abstract.FileDescriptor):
347
'''Chain between inotify and twisted.'''
348
# will never pass a fd to write, pylint: disable-msg=W0223
351
'''Returns the fileno to select().'''
355
'''Called when twisted says there's something to read.'''
356
notifier.read_events()
357
notifier.process_events()
360
self._inotify_reader = reader
361
reactor.addReader(reader)
364
'''Prepares the EQ to be closed.'''
365
self._inotify_notifier.stop()
366
reactor.removeReader(self._inotify_reader)
368
def inotify_rm_watch(self, dirpath):
369
'''Remove watch from a dir.'''
371
wd = self._watchs[dirpath]
373
raise ValueError("The path %r is not watched right now!" % dirpath)
374
result = self._inotify_wm.rm_watch(wd)
376
raise RuntimeError("The path %r couldn't be removed!" % dirpath)
377
del self._watchs[dirpath]
379
def inotify_add_watch(self, dirpath):
    '''Start watching a directory with inotify.

    The value that the watch manager returns for the path is stored
    in self._watchs, keyed by the path itself.
    '''
    self.log.debug("Adding inotify watch to %r", dirpath)
    added = self._inotify_wm.add_watch(dirpath, INOTIFY_EVENTS)
    self._watchs[dirpath] = added[dirpath]
385
def unsubscribe(self, obj):
386
'''Removes the callback object from the listener queue.
388
@param obj: the callback object to remove from the queue.
390
self._listeners.remove(obj)
392
def subscribe(self, obj):
393
'''Stores the callback object to whom push the events when received.
395
@param obj: the callback object to add to the listener queue.
397
These objects should provide a 'handle_FOO' to receive the FOO
398
events (replace FOO with the desired event).
400
if obj not in self._listeners:
401
self._listeners.append(obj)
403
def push(self, event_name, *args, **kwargs):
404
'''Receives a push for all events.
406
The signature for each event is forced on each method, not in this
409
self.log.debug("push_event: %s, args:%s, kw:%s",
410
event_name, args, kwargs)
411
# get the event parameters
413
event_params = EVENTS[event_name]
415
msg = "The received event_name (%r) is not valid!" % event_name
417
raise InvalidEventError(msg)
419
# validate that the received arguments are ok
421
if len(args) > len(event_params):
422
msg = "Too many arguments! (should receive %s)" % event_params
425
event_params = event_params[len(args):]
427
s_eventparms = set(event_params)
428
s_kwargs = set(kwargs.keys())
429
if s_eventparms != s_kwargs:
430
msg = "Wrong arguments for event %s (should receive %s, got %s)" \
431
% (event_name, event_params, kwargs.keys())
435
# check if we are currently dispatching an event
436
self.dispatch_queue.put((event_name, args, kwargs))
437
if not self.dispatching:
438
self.dispatching = True
441
event_name, args, kwargs = \
442
self.dispatch_queue.get(block=False)
443
self._dispatch(event_name, *args, **kwargs)
445
self.dispatching = False
446
for callable in self.empty_event_queue_callbacks.copy():
450
def _dispatch(self, event_name, *args, **kwargs):
451
""" push the event to all listeners. """
452
# check listeners to see if have the proper method, and call it
453
meth_name = "handle_" + event_name
454
for listener in self._listeners:
455
# don't use hasattr because is expensive and
456
# catch too many errors
457
# we need to catch all here, pylint: disable-msg=W0703
458
method = self._get_listener_method(listener, meth_name, event_name)
459
if method is not None:
461
method(*args, **kwargs)
463
self.log.exception("Error encountered while handling: %s"
464
" in %s", event_name, listener)
466
def _get_listener_method(self, listener, method_name, event_name):
467
""" returns the method named method_name or hanlde_default from the
468
listener. Or None if the methods are not defined in the listener.
470
method = getattr(listener, method_name, None)
472
method = getattr(listener, DEFAULT_HANDLER, None)
473
if method is not None:
474
method = functools.partial(method, event_name)
477
def freeze_begin(self, path):
478
"""Puts in hold all the events for this path."""
479
if self._processor.frozen_path is not None:
480
raise ValueError("There's something already frozen!")
481
self._processor.freeze_begin(path)
483
def freeze_rollback(self):
484
"""Unfreezes the frozen path, reseting to idle state."""
485
if self._processor.frozen_path is None:
486
raise ValueError("Rolling back with nothing frozen!")
487
self._processor.freeze_rollback()
489
def freeze_commit(self, events):
490
"""Unfreezes the frozen path, sending received events if not dirty.
492
If events for that path happened:
495
- push the here received events, return False
497
if self._processor.frozen_path is None:
498
raise ValueError("Commiting with nothing frozen!")
500
d = defer.execute(self._processor.freeze_commit, events)