~didrocks/ubuntuone-client/dont-suffer-zg-crash

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/event_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-06-30 12:00:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090630120000-by806ovmw3193qe8
Tags: upstream-0.90.3
ImportĀ upstreamĀ versionĀ 0.90.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# ubuntuone.syncdaemon.event_queue - Event queuing
 
2
#
 
3
# Author: Facundo Batista <facundo@canonical.com>
 
4
#
 
5
# Copyright 2009 Canonical Ltd.
 
6
#
 
7
# This program is free software: you can redistribute it and/or modify it
 
8
# under the terms of the GNU General Public License version 3, as published
 
9
# by the Free Software Foundation.
 
10
#
 
11
# This program is distributed in the hope that it will be useful, but
 
12
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
13
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
14
# PURPOSE.  See the GNU General Public License for more details.
 
15
#
 
16
# You should have received a copy of the GNU General Public License along
 
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
'''Module that implements the Event Queue machinery.'''
 
19
 
 
20
import functools
 
21
import logging
 
22
import os
 
23
import re
 
24
 
 
25
import pyinotify
 
26
from twisted.internet import abstract, reactor, error, defer
 
27
 
 
28
from Queue import Queue, Empty
 
29
 
 
30
class InvalidEventError(Exception):
 
31
    '''Received an Event that is not in the allowed list.'''
 
32
 
 
33
# these are our internal events, what is inserted into the whole system
 
34
EVENTS = {
 
35
    'FS_FILE_CLOSE_WRITE': ('path',),
 
36
    'FS_FILE_CREATE': ('path',),
 
37
    'FS_DIR_CREATE': ('path',),
 
38
    'FS_FILE_DELETE': ('path',),
 
39
    'FS_DIR_DELETE': ('path',),
 
40
    'FS_FILE_MOVE': ('path_from', 'path_to',),
 
41
    'FS_DIR_MOVE': ('path_from', 'path_to',),
 
42
 
 
43
    'AQ_FILE_NEW_OK': ('marker', 'new_id'),
 
44
    'AQ_FILE_NEW_ERROR': ('marker', 'error'),
 
45
    'AQ_DIR_NEW_OK': ('marker', 'new_id'),
 
46
    'AQ_DIR_NEW_ERROR': ('marker', 'error'),
 
47
    'AQ_MOVE_OK': ('share_id', 'node_id'),
 
48
    'AQ_MOVE_ERROR': ('share_id', 'node_id',
 
49
                      'old_parent_id', 'new_parent_id', 'new_name', 'error'),
 
50
    'AQ_UNLINK_OK': ('share_id', 'parent_id', 'node_id'),
 
51
    'AQ_UNLINK_ERROR': ('share_id', 'parent_id', 'node_id', 'error'),
 
52
    'AQ_DOWNLOAD_STARTED': ('share_id', 'node_id', 'server_hash'),
 
53
    'AQ_DOWNLOAD_FINISHED': ('share_id', 'node_id', 'server_hash'),
 
54
    'AQ_DOWNLOAD_ERROR': ('share_id', 'node_id', 'server_hash', 'error'),
 
55
    'AQ_UPLOAD_STARTED' : ('share_id', 'node_id', 'hash'),
 
56
    'AQ_UPLOAD_FINISHED': ('share_id', 'node_id', 'hash'),
 
57
    'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'),
 
58
    'AQ_SHARES_LIST': ('shares_list',),
 
59
    'AQ_LIST_SHARES_ERROR': ('error',),
 
60
    'AQ_CREATE_SHARE_OK': ('share_id', 'marker'),
 
61
    'AQ_CREATE_SHARE_ERROR': ('marker', 'error'),
 
62
    'AQ_QUERY_ERROR': ('item', 'error'),
 
63
 
 
64
    'SV_SHARE_CHANGED': ('message', 'info'),
 
65
    'SV_SHARE_ANSWERED': ('share_id', 'answer'),
 
66
    'SV_HASH_NEW': ('share_id', 'node_id', 'hash'),
 
67
    'SV_FILE_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
 
68
    'SV_DIR_NEW': ('share_id', 'node_id', 'parent_id', 'name'),
 
69
    'SV_FILE_DELETED': ('share_id', 'node_id'),
 
70
    'SV_MOVED': ('share_id', 'node_id', 'new_share_id', 'new_parent_id',
 
71
                 'new_name'),
 
72
    'HQ_HASH_NEW': ('path', 'hash', 'crc32', 'size', 'stat'),
 
73
    'SYS_CONNECT': ('access_token',),
 
74
    'SYS_DISCONNECT': (),
 
75
    'SYS_STATE_CHANGED': ('state',),
 
76
    'SYS_NET_CONNECTED': (),
 
77
    'SYS_NET_DISCONNECTED': (),
 
78
    'SYS_WAIT_FOR_LOCAL_RESCAN': (),
 
79
    'SYS_LOCAL_RESCAN_DONE': (),
 
80
    'SYS_CONNECTION_MADE': (),
 
81
    'SYS_CONNECTION_LOST': (),
 
82
    'SYS_PROTOCOL_VERSION_ERROR': ('error',),
 
83
    'SYS_PROTOCOL_VERSION_OK': (),
 
84
    'SYS_OAUTH_ERROR': ('error',),
 
85
    'SYS_OAUTH_OK': (),
 
86
    'SYS_SERVER_RESCAN_STARTING': (),
 
87
    'SYS_SERVER_RESCAN_DONE': (),
 
88
    'SYS_META_QUEUE_WAITING': (),
 
89
    'SYS_META_QUEUE_DONE': (),
 
90
    'SYS_CONTENT_QUEUE_WAITING': (),
 
91
    'SYS_CONTENT_QUEUE_DONE': (),
 
92
    'SYS_CLEANUP_STARTED': (),
 
93
    'SYS_CLEANUP_FINISHED': (),
 
94
    'SYS_UNKNOWN_ERROR': (),
 
95
}
 
96
 
 
97
if 'IN_CREATE' in vars(pyinotify.EventsCodes):
 
98
    # < 0.8; event codes in EventsCodes; events have is_dir attribute
 
99
    evtcodes = pyinotify.EventsCodes
 
100
    event_is_dir = lambda event: event.is_dir
 
101
else:
 
102
    # >= 0.8; event codes in pyinotify itself; events have dir attribute
 
103
    evtcodes = pyinotify
 
104
    event_is_dir = lambda event: event.dir
 
105
 
 
106
# translates quickly the event and it's is_dir state to our standard events
 
107
NAME_TRANSLATIONS = {
 
108
    evtcodes.IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
 
109
    evtcodes.IN_CREATE: 'FS_FILE_CREATE',
 
110
    evtcodes.IN_CREATE | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
 
111
    evtcodes.IN_DELETE: 'FS_FILE_DELETE',
 
112
    evtcodes.IN_DELETE | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
 
113
    evtcodes.IN_MOVED_FROM: 'FS_FILE_DELETE',
 
114
    evtcodes.IN_MOVED_FROM | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
 
115
    evtcodes.IN_MOVED_TO: 'FS_FILE_CREATE',
 
116
    evtcodes.IN_MOVED_TO | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
 
117
}
 
118
 
 
119
# these are the events that will listen from inotify
 
120
INOTIFY_EVENTS = (
 
121
    evtcodes.IN_CLOSE_WRITE |
 
122
    evtcodes.IN_CREATE |
 
123
    evtcodes.IN_DELETE |
 
124
    evtcodes.IN_MOVED_FROM |
 
125
    evtcodes.IN_MOVED_TO |
 
126
    evtcodes.IN_MOVE_SELF
 
127
)
 
128
 
 
129
DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
 
130
 
 
131
class _INotifyProcessor(pyinotify.ProcessEvent):
 
132
    '''Helper class that is called from inpotify when an event happens.
 
133
 
 
134
    This class also catchs the MOVEs events, and synthetises a new
 
135
    FS_(DIR|FILE)_MOVE event when possible.
 
136
    '''
 
137
    def __init__(self, eq):
 
138
        self.log = logging.getLogger('ubuntuone.SyncDaemon.INotifyProcessor')
 
139
        self.eq = eq
 
140
        self.held_event = None
 
141
        self.timer = None
 
142
        self.frozen_path = None
 
143
        self.frozen_evts = False
 
144
 
 
145
    def on_timeout(self):
 
146
        '''Called on timeout.'''
 
147
        if self.held_event is not None:
 
148
            self.release_held_event(True)
 
149
 
 
150
    def release_held_event(self, timed_out=False):
 
151
        '''Release the event on hold to fulfill its destiny.'''
 
152
        if not timed_out:
 
153
            try:
 
154
                self.timer.cancel()
 
155
            except error.AlreadyCalled:
 
156
                # self.timeout() was *just* called, do noting here
 
157
                return
 
158
        self.push_event(self.held_event)
 
159
        self.held_event = None
 
160
 
 
161
    def process_IN_MOVE_SELF(self, event):
 
162
        '''Don't do anything here.
 
163
 
 
164
        We just turned this event on because pyinotify does some
 
165
        path-fixing in its internal processing when this happens
 
166
        '''
 
167
 
 
168
    def process_IN_MOVED_FROM(self, event):
 
169
        '''Capture the MOVED_FROM to maybe syntethize FILE_MOVED.'''
 
170
        if self.held_event is not None:
 
171
            self.release_held_event()
 
172
 
 
173
        self.held_event = event
 
174
        self.timer = reactor.callLater(1, self.on_timeout)
 
175
 
 
176
    def is_ignored(self, path):
 
177
        """should we ignore this path?"""
 
178
        for part in path.split(os.path.sep):
 
179
            if path.endswith(".partial") or \
 
180
                    re.search(r"\.conflict(?:\.\d+)?$", path):
 
181
                return True
 
182
 
 
183
        # don't support symlinks yet
 
184
        if os.path.islink(path):
 
185
            return True
 
186
        return False
 
187
 
 
188
    def process_IN_MOVED_TO(self, event):
 
189
        '''Capture the MOVED_TO to maybe syntethize FILE_MOVED.'''
 
190
        if self.held_event is not None:
 
191
            if event.cookie == self.held_event.cookie:
 
192
                try:
 
193
                    self.timer.cancel()
 
194
                except error.AlreadyCalled:
 
195
                    # self.timeout() was *just* called, do noting here
 
196
                    pass
 
197
                else:
 
198
                    f_path = os.path.join(self.held_event.path,
 
199
                                                        self.held_event.name)
 
200
                    t_path = os.path.join(event.path, event.name)
 
201
 
 
202
                    if not self.is_ignored(f_path) and \
 
203
                            not self.is_ignored(t_path):
 
204
                        f_share_id = self.eq.fs.get_by_path(\
 
205
                                             os.path.dirname(f_path)).share_id
 
206
                        t_share_id = self.eq.fs.get_by_path(\
 
207
                                             os.path.dirname(t_path)).share_id
 
208
                        evtname = "FS_DIR_" if event_is_dir(event) else "FS_FILE_"
 
209
                        if f_share_id != t_share_id:
 
210
                            # if the share_id are != push a delete/create
 
211
                            self.eq.push(evtname+"DELETE", f_path)
 
212
                            self.eq.push(evtname+"CREATE", t_path)
 
213
                        else:
 
214
                            self.eq.push(evtname+"MOVE", f_path, t_path)
 
215
                    self.held_event = None
 
216
                return
 
217
            else:
 
218
                self.release_held_event()
 
219
                self.push_event(event)
 
220
        else:
 
221
            # we don't have a held_event so this is a move from outside.
 
222
            # if it's a file move it's atomic on POSIX, so we aren't going to
 
223
            # receive a IN_CLOSE_WRITE, so let's fake it for files
 
224
            self.push_event(event)
 
225
            t_path = os.path.join(event.path, event.name)
 
226
            if not os.path.isdir(t_path):
 
227
                self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
 
228
 
 
229
    def process_default(self, event):
 
230
        '''Push the event into the EventQueue.'''
 
231
        if self.held_event is not None:
 
232
            self.release_held_event()
 
233
        self.push_event(event)
 
234
 
 
235
    def push_event(self, event):
 
236
        '''Push the event to the EQ.'''
 
237
        # ignore this trash
 
238
        if event.mask == evtcodes.IN_IGNORED:
 
239
            return
 
240
 
 
241
        # change the pattern IN_CREATE to FS_FILE_CREATE or FS_DIR_CREATE
 
242
        try:
 
243
            evt_name = NAME_TRANSLATIONS[event.mask]
 
244
        except:
 
245
            raise KeyError("Unhandled Event in INotify: %s" % event)
 
246
 
 
247
        # push the event
 
248
        fullpath = os.path.join(event.path, event.name)
 
249
 
 
250
        # check if the path is not frozen
 
251
        if self.frozen_path is not None:
 
252
            if os.path.dirname(fullpath) == self.frozen_path:
 
253
                # this will at least store the last one, for debug
 
254
                # purposses
 
255
                self.frozen_evts = (evt_name, fullpath)
 
256
                return
 
257
 
 
258
        if not self.is_ignored(fullpath):
 
259
            if evt_name == 'FS_DIR_DELETE':
 
260
                self.handle_dir_delete(fullpath)
 
261
            self.eq.push(evt_name, fullpath)
 
262
 
 
263
    def freeze_begin(self, path):
 
264
        """Puts in hold all the events for this path."""
 
265
        self.log.debug("Freeze begin: %r", path)
 
266
        self.frozen_path = path
 
267
        self.frozen_evts = False
 
268
 
 
269
    def freeze_rollback(self):
 
270
        """Unfreezes the frozen path, reseting to idle state."""
 
271
        self.log.debug("Freeze rollback: %r", self.frozen_path)
 
272
        self.frozen_path = None
 
273
        self.frozen_evts = False
 
274
 
 
275
    def freeze_commit(self, events):
 
276
        """Unfreezes the frozen path, sending received events if not dirty.
 
277
 
 
278
        If events for that path happened:
 
279
            - return True
 
280
        else:
 
281
            - push the here received events, return False
 
282
        """
 
283
        self.log.debug("Freeze commit: %r (%d events)",
 
284
                                                self.frozen_path, len(events))
 
285
        if self.frozen_evts:
 
286
            # ouch! we're dirty!
 
287
            self.log.debug("Dirty by %s", self.frozen_evts)
 
288
            self.frozen_evts = False
 
289
            return True
 
290
 
 
291
        # push the received events
 
292
        for evt_name, path in events:
 
293
            if not self.is_ignored(path):
 
294
                self.eq.push(evt_name, path)
 
295
 
 
296
        self.frozen_path = None
 
297
        self.frozen_evts = False
 
298
        return False
 
299
 
 
300
    def handle_dir_delete(self, fullpath):
 
301
        """ handle the case of move a dir to a non-watched directory """
 
302
        paths = self.eq.fs.get_paths_starting_with(fullpath)
 
303
        paths.sort(reverse=True)
 
304
        for path, is_dir in paths:
 
305
            if path == fullpath:
 
306
                continue
 
307
            if is_dir:
 
308
                self.eq.push('FS_DIR_DELETE', path)
 
309
            else:
 
310
                self.eq.push('FS_FILE_DELETE', path)
 
311
 
 
312
 
 
313
class EventQueue(object):
 
314
    '''Manages the events from different sources and distributes them.'''
 
315
 
 
316
    def __init__(self, fs):
 
317
        self._listeners = []
 
318
 
 
319
        self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
 
320
        self.fs = fs
 
321
        # hook inotify
 
322
        self._inotify_reader = None
 
323
        self._inotify_wm = wm = pyinotify.WatchManager()
 
324
        self._processor = _INotifyProcessor(self)
 
325
        self._inotify_notifier = pyinotify.Notifier(wm, self._processor)
 
326
        self._hook_inotify_to_twisted(wm, self._inotify_notifier)
 
327
        self._watchs = {}
 
328
        self.dispatching = False
 
329
        self.dispatch_queue = Queue()
 
330
        self.empty_event_queue_callbacks = set()
 
331
 
 
332
    def add_empty_event_queue_callback(self, callback):
 
333
        """add a callback for when the even queue has no more events."""
 
334
        self.empty_event_queue_callbacks.add(callback)
 
335
        if not self.dispatching and self.dispatch_queue.empty():
 
336
            if callable(callback):
 
337
                callback()
 
338
 
 
339
    def remove_empty_event_queue_callback(self, callback):
 
340
        """remove the callback"""
 
341
        self.empty_event_queue_callbacks.remove(callback)
 
342
 
 
343
    def _hook_inotify_to_twisted(self, wm, notifier):
 
344
        '''This will hook inotify to twisted.'''
 
345
 
 
346
        class MyReader(abstract.FileDescriptor):
 
347
            '''Chain between inotify and twisted.'''
 
348
            # will never pass a fd to write, pylint: disable-msg=W0223
 
349
 
 
350
            def fileno(self):
 
351
                '''Returns the fileno to select().'''
 
352
                return wm._fd
 
353
 
 
354
            def doRead(self):
 
355
                '''Called when twisted says there's something to read.'''
 
356
                notifier.read_events()
 
357
                notifier.process_events()
 
358
 
 
359
        reader = MyReader()
 
360
        self._inotify_reader = reader
 
361
        reactor.addReader(reader)
 
362
 
 
363
    def shutdown(self):
 
364
        '''Prepares the EQ to be closed.'''
 
365
        self._inotify_notifier.stop()
 
366
        reactor.removeReader(self._inotify_reader)
 
367
 
 
368
    def inotify_rm_watch(self, dirpath):
 
369
        '''Remove watch from a dir.'''
 
370
        try:
 
371
            wd = self._watchs[dirpath]
 
372
        except KeyError:
 
373
            raise ValueError("The path %r is not watched right now!" % dirpath)
 
374
        result = self._inotify_wm.rm_watch(wd)
 
375
        if not result[wd]:
 
376
            raise RuntimeError("The path %r couldn't be removed!" % dirpath)
 
377
        del self._watchs[dirpath]
 
378
 
 
379
    def inotify_add_watch(self, dirpath):
 
380
        '''Add watch to a dir.'''
 
381
        self.log.debug("Adding inotify watch to %r", dirpath)
 
382
        result = self._inotify_wm.add_watch(dirpath, INOTIFY_EVENTS)
 
383
        self._watchs[dirpath] = result[dirpath]
 
384
 
 
385
    def unsubscribe(self, obj):
 
386
        '''Removes the callback object from the listener queue.
 
387
 
 
388
        @param obj: the callback object to remove from the queue.
 
389
        '''
 
390
        self._listeners.remove(obj)
 
391
 
 
392
    def subscribe(self, obj):
 
393
        '''Stores the callback object to whom push the events when received.
 
394
 
 
395
        @param obj: the callback object to add to the listener queue.
 
396
 
 
397
        These objects should provide a 'handle_FOO' to receive the FOO
 
398
        events (replace FOO with the desired event).
 
399
        '''
 
400
        if obj not in self._listeners:
 
401
            self._listeners.append(obj)
 
402
 
 
403
    def push(self, event_name, *args, **kwargs):
 
404
        '''Receives a push for all events.
 
405
 
 
406
        The signature for each event is forced on each method, not in this
 
407
        'push' arguments.
 
408
        '''
 
409
        self.log.debug("push_event: %s, args:%s, kw:%s",
 
410
                     event_name, args, kwargs)
 
411
        # get the event parameters
 
412
        try:
 
413
            event_params = EVENTS[event_name]
 
414
        except KeyError:
 
415
            msg = "The received event_name (%r) is not valid!" % event_name
 
416
            self.log.error(msg)
 
417
            raise InvalidEventError(msg)
 
418
 
 
419
        # validate that the received arguments are ok
 
420
        if args:
 
421
            if len(args) > len(event_params):
 
422
                msg = "Too many arguments! (should receive %s)" % event_params
 
423
                self.log.error(msg)
 
424
                raise TypeError(msg)
 
425
            event_params = event_params[len(args):]
 
426
 
 
427
        s_eventparms = set(event_params)
 
428
        s_kwargs = set(kwargs.keys())
 
429
        if s_eventparms != s_kwargs:
 
430
            msg = "Wrong arguments for event %s (should receive %s, got %s)" \
 
431
                  % (event_name, event_params, kwargs.keys())
 
432
            self.log.error(msg)
 
433
            raise TypeError(msg)
 
434
 
 
435
        # check if we are currently dispatching an event
 
436
        self.dispatch_queue.put((event_name, args, kwargs))
 
437
        if not self.dispatching:
 
438
            self.dispatching = True
 
439
            while True:
 
440
                try:
 
441
                    event_name, args, kwargs = \
 
442
                            self.dispatch_queue.get(block=False)
 
443
                    self._dispatch(event_name, *args, **kwargs)
 
444
                except Empty:
 
445
                    self.dispatching = False
 
446
                    for callable in self.empty_event_queue_callbacks.copy():
 
447
                        callable()
 
448
                    break
 
449
 
 
450
    def _dispatch(self, event_name, *args, **kwargs):
 
451
        """ push the event to all listeners. """
 
452
        # check listeners to see if have the proper method, and call it
 
453
        meth_name = "handle_" + event_name
 
454
        for listener in self._listeners:
 
455
            # don't use hasattr because is expensive and
 
456
            # catch too many errors
 
457
            # we need to catch all here, pylint: disable-msg=W0703
 
458
            method = self._get_listener_method(listener, meth_name, event_name)
 
459
            if method is not None:
 
460
                try:
 
461
                    method(*args, **kwargs)
 
462
                except Exception:
 
463
                    self.log.exception("Error encountered while handling: %s"
 
464
                                     " in %s", event_name, listener)
 
465
 
 
466
    def _get_listener_method(self, listener, method_name, event_name):
 
467
        """ returns the method named method_name or hanlde_default from the
 
468
        listener. Or None if the methods are not defined in the listener.
 
469
        """
 
470
        method = getattr(listener, method_name, None)
 
471
        if method is None:
 
472
            method = getattr(listener, DEFAULT_HANDLER, None)
 
473
            if method is not None:
 
474
                method = functools.partial(method, event_name)
 
475
        return method
 
476
 
 
477
    def freeze_begin(self, path):
 
478
        """Puts in hold all the events for this path."""
 
479
        if self._processor.frozen_path is not None:
 
480
            raise ValueError("There's something already frozen!")
 
481
        self._processor.freeze_begin(path)
 
482
 
 
483
    def freeze_rollback(self):
 
484
        """Unfreezes the frozen path, reseting to idle state."""
 
485
        if self._processor.frozen_path is None:
 
486
            raise ValueError("Rolling back with nothing frozen!")
 
487
        self._processor.freeze_rollback()
 
488
 
 
489
    def freeze_commit(self, events):
 
490
        """Unfreezes the frozen path, sending received events if not dirty.
 
491
 
 
492
        If events for that path happened:
 
493
            - return True
 
494
        else:
 
495
            - push the here received events, return False
 
496
        """
 
497
        if self._processor.frozen_path is None:
 
498
            raise ValueError("Commiting with nothing frozen!")
 
499
 
 
500
        d = defer.execute(self._processor.freeze_commit, events)
 
501
        return d