~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/event_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-02-23 18:34:09 UTC
  • mfrom: (1.1.45 upstream)
  • Revision ID: james.westby@ubuntu.com-20110223183409-535o7yo165wbjmca
Tags: 1.5.5-0ubuntu1
* New upstream release.
  - Subscribing to a RO share will not download content (LP: #712528)
  - Can't synchronize "~/Ubuntu One Music" (LP: #714976)
  - Syncdaemon needs to show progress in Unity launcher (LP: #702116)
  - Notifications say "your cloud" (LP: #715887)
  - No longer requires python-libproxy
  - Recommend unity and indicator libs by default

Show diffs side-by-side

added added

removed removed

Lines of Context:
2
2
#
3
3
# Author: Facundo Batista <facundo@canonical.com>
4
4
#
5
 
# Copyright 2009 Canonical Ltd.
 
5
# Copyright 2009-2011 Canonical Ltd.
6
6
#
7
7
# This program is free software: you can redistribute it and/or modify it
8
8
# under the terms of the GNU General Public License version 3, as published
17
17
# with this program.  If not, see <http://www.gnu.org/licenses/>.
18
18
"""Module that implements the Event Queue machinery."""
19
19
 
 
20
import collections
20
21
import functools
21
22
import logging
22
23
 
23
 
from Queue import Queue, Empty
24
 
 
25
24
from ubuntuone.platform import FilesystemMonitor
26
25
 
27
 
class InvalidEventError(Exception):
28
 
    """Received an Event that is not in the allowed list."""
29
 
 
30
26
# these are our internal events, what is inserted into the whole system
31
27
EVENTS = {
32
28
    'FS_FILE_OPEN': ('path',),
171
167
    """Manages the events from different sources and distributes them."""
172
168
 
173
169
    def __init__(self, fs, ignore_config=None):
174
 
        self._listeners = []
 
170
        self.listener_map = {}
175
171
 
176
172
        self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
177
173
        self.fs = fs
179
175
        self.monitor = FilesystemMonitor(self, fs, ignore_config)
180
176
 
181
177
        self.dispatching = False
182
 
        self.dispatch_queue = Queue()
 
178
        self.dispatch_queue = collections.deque()
 
179
        self._have_empty_eq_cback = False
183
180
        self.empty_event_queue_callbacks = set()
184
181
 
185
182
    def add_to_mute_filter(self, event, **info):
192
189
 
193
190
    def add_empty_event_queue_callback(self, callback):
194
191
        """Add a callback for when the even queue has no more events."""
 
192
        self._have_empty_eq_cback = True
195
193
        self.empty_event_queue_callbacks.add(callback)
196
 
        if not self.dispatching and self.dispatch_queue.empty():
 
194
        if not self.dispatching and not self.dispatch_queue:
197
195
            if callable(callback):
198
196
                callback()
199
197
 
200
198
    def remove_empty_event_queue_callback(self, callback):
201
199
        """Remove the callback."""
202
200
        self.empty_event_queue_callbacks.remove(callback)
 
201
        if not self.empty_event_queue_callbacks:
 
202
            self._have_empty_eq_cback = False
203
203
 
204
204
    def shutdown(self):
205
205
        """Make the monitor shutdown."""
222
222
 
223
223
        @param obj: the callback object to remove from the queue.
224
224
        """
225
 
        self._listeners.remove(obj)
 
225
        for k, v in self.listener_map.items():
 
226
            v.pop(obj, None)
 
227
            if not v:
 
228
                del self.listener_map[k]
226
229
 
227
230
    def subscribe(self, obj):
228
231
        """Store the callback object to whom push the events when received.
232
235
        These objects should provide a 'handle_FOO' to receive the FOO
233
236
        events (replace FOO with the desired event).
234
237
        """
235
 
        if obj not in self._listeners:
236
 
            self._listeners.append(obj)
 
238
        for event_name in EVENTS.keys():
 
239
            meth_name = "handle_" + event_name
 
240
            method = self._get_listener_method(obj, meth_name, event_name)
 
241
            if method is not None:
 
242
                self.listener_map.setdefault(event_name, {})[obj] = method
237
243
 
238
244
    def push(self, event_name, **kwargs):
239
245
        """Receives a push for all events.
250
256
        else:
251
257
            self.log.debug(log_msg, event_name, kwargs)
252
258
 
253
 
        # get the event parameters
254
 
        try:
255
 
            event_params = EVENTS[event_name]
256
 
        except KeyError:
257
 
            msg = "The received event_name (%r) is not valid!" % event_name
258
 
            self.log.error(msg)
259
 
            raise InvalidEventError(msg)
260
 
 
261
 
        # validate that the received arguments are ok
262
 
        s_eventparms = set(event_params)
263
 
        s_kwargs = set(kwargs.keys())
264
 
        if s_eventparms != s_kwargs:
265
 
            msg = "Wrong arguments for event %s (should receive %s, got %s)" \
266
 
                  % (event_name, event_params, kwargs.keys())
267
 
            self.log.error(msg)
268
 
            raise TypeError(msg)
269
 
 
270
259
        # check if we are currently dispatching an event
271
 
        self.dispatch_queue.put((event_name, kwargs))
 
260
        self.dispatch_queue.append((event_name, kwargs))
272
261
        if not self.dispatching:
273
262
            self.dispatching = True
274
263
            while True:
275
264
                try:
276
 
                    event_name, kwargs = self.dispatch_queue.get(block=False)
 
265
                    event_name, kwargs = self.dispatch_queue.popleft()
277
266
                    self._dispatch(event_name, **kwargs)
278
 
                except Empty:
 
267
                except IndexError:
279
268
                    self.dispatching = False
280
 
                    for callable in self.empty_event_queue_callbacks.copy():
281
 
                        callable()
 
269
                    if self._have_empty_eq_cback:
 
270
                        for cback in self.empty_event_queue_callbacks.copy():
 
271
                            cback()
282
272
                    break
283
273
 
284
274
    def _dispatch(self, event_name, **kwargs):
285
275
        """ push the event to all listeners. """
 
276
        try:
 
277
            listeners = self.listener_map[event_name]
 
278
        except KeyError:
 
279
            # no listener for this
 
280
            return
 
281
 
286
282
        # check listeners to see if have the proper method, and call it
287
 
        meth_name = "handle_" + event_name
288
 
        for listener in self._listeners:
289
 
            # don't use hasattr because is expensive and
290
 
            # catch too many errors
291
 
            # we need to catch all here, pylint: disable-msg=W0703
292
 
            method = self._get_listener_method(listener, meth_name, event_name)
293
 
            if method is not None:
294
 
                try:
295
 
                    method(**kwargs)
296
 
                except Exception:
297
 
                    self.log.exception("Error encountered while handling: %s"
298
 
                                     " in %s", event_name, listener)
 
283
        for listener, method in listeners.items():
 
284
            try:
 
285
                method(**kwargs)
 
286
            except Exception:
 
287
                self.log.exception("Error encountered while handling: %s "
 
288
                                   "in %s", event_name, listener)
299
289
 
300
290
    def _get_listener_method(self, listener, method_name, event_name):
301
291
        """ returns the method named method_name or hanlde_default from the