~ubuntu-branches/ubuntu/lucid/ubuntuone-client/lucid-updates

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/event_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-10-16 13:35:00 UTC
  • mfrom: (1.1.12 upstream)
  • Revision ID: james.westby@ubuntu.com-20091016133500-t2zacp5ozfj1hpug
Tags: 1.0.2-0ubuntu1
* New upstream release.
  - Avoid conflicts due to bouncing FS events (LP: #449605)
  - Don't try to authorize if connect on start is never (LP: #451154)

Show diffs side-by-side

added added

removed removed

Lines of Context:
149
149
 
150
150
DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
151
151
 
 
152
 
 
153
class MuteFilter(object):
 
154
    '''Stores what needs to be muted.'''
 
155
    def __init__(self):
 
156
        self._cnt = {}
 
157
        self.log = logging.getLogger('ubuntuone.SyncDaemon.MuteFilter')
 
158
 
 
159
    def add(self, element):
 
160
        '''Adds an element to the filter.'''
 
161
        self.log.debug("Adding: %s", element)
 
162
        self._cnt[element] = self._cnt.get(element, 0) + 1
 
163
 
 
164
    def pop(self, element):
 
165
        '''Pops an element from the filter, if present, and returns whether it was there.'''
 
166
        if element not in self._cnt:
 
167
            return False
 
168
 
 
169
        self._cnt[element] = self._cnt.get(element, 0) - 1
 
170
        if not self._cnt[element]:
 
171
            # reached zero
 
172
            del self._cnt[element]
 
173
 
 
174
        # log what happened and how many items we have left
 
175
        q = sum(self._cnt.itervalues())
 
176
        self.log.debug("Blocking %s (%d left)", element, q)
 
177
 
 
178
        return True
 
179
 
 
180
 
152
181
class _INotifyProcessor(pyinotify.ProcessEvent):
153
182
    '''Helper class that is called from inotify when an event happens.
154
183
 
162
191
        self.timer = None
163
192
        self.frozen_path = None
164
193
        self.frozen_evts = False
 
194
        self._to_mute = MuteFilter()
 
195
 
 
196
    def add_to_mute_filter(self, event, *paths):
 
197
        '''Add an event and path(s) to the mute filter.'''
 
198
        # all events have one path except the MOVEs
 
199
        if event in ("FS_FILE_MOVE", "FS_DIR_MOVE"):
 
200
            f_path, t_path = paths
 
201
            is_from_forreal = not self.is_ignored(f_path)
 
202
            is_to_forreal = not self.is_ignored(t_path)
 
203
            if is_from_forreal and is_to_forreal:
 
204
                self._to_mute.add((event, f_path, t_path))
 
205
            elif is_to_forreal:
 
206
                self._to_mute.add(('FS_FILE_CREATE', t_path))
 
207
                self._to_mute.add(('FS_FILE_CLOSE_WRITE', t_path))
 
208
        else:
 
209
            path = paths[0]
 
210
            if not self.is_ignored(path):
 
211
                self._to_mute.add((event, path))
165
212
 
166
213
    def on_timeout(self):
167
214
        '''Called on timeout.'''
174
221
            try:
175
222
                self.timer.cancel()
176
223
            except error.AlreadyCalled:
177
 
                # self.timeout() was *just* called, do noting here
 
224
                # self.timeout() was *just* called, do nothing here
178
225
                return
179
226
        self.push_event(self.held_event)
180
227
        self.held_event = None
234
281
                try:
235
282
                    self.timer.cancel()
236
283
                except error.AlreadyCalled:
237
 
                    # self.timeout() was *just* called, do noting here
 
284
                    # self.timeout() was *just* called, do nothing here
238
285
                    pass
239
286
                else:
240
287
                    f_path = os.path.join(self.held_event.path,
255
302
                            evtname = "FS_FILE_"
256
303
                        if f_share_id != t_share_id:
257
304
                            # if the share_id are != push a delete/create
258
 
                            self.eq.push(evtname+"DELETE", f_path)
259
 
                            self.eq.push(evtname+"CREATE", t_path)
 
305
                            self.eq_push(evtname+"DELETE", f_path)
 
306
                            self.eq_push(evtname+"CREATE", t_path)
260
307
                            if not this_is_a_dir:
261
 
                                self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
 
308
                                self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
262
309
                        else:
263
 
                            self.eq.push(evtname+"MOVE", f_path, t_path)
 
310
                            self.eq_push(evtname+"MOVE", f_path, t_path)
264
311
                    elif is_to_forreal:
265
312
                        # this is the case of a MOVE from something ignored
266
313
                        # to a valid filename
267
 
                        self.eq.push('FS_FILE_CREATE', t_path)
268
 
                        self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
 
314
                        self.eq_push('FS_FILE_CREATE', t_path)
 
315
                        self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
269
316
 
270
317
                    self.held_event = None
271
318
                return
279
326
            self.push_event(event)
280
327
            t_path = os.path.join(event.path, event.name)
281
328
            if not os.path.isdir(t_path):
282
 
                self.eq.push('FS_FILE_CLOSE_WRITE', t_path)
 
329
                self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
 
330
 
 
331
    def eq_push(self, *event_data):
 
332
        '''Sends to EQ the event data, maybe filtering it.'''
 
333
        if not self._to_mute.pop(event_data):
 
334
            self.eq.push(*event_data)
283
335
 
284
336
    def process_default(self, event):
285
337
        '''Push the event into the EventQueue.'''
313
365
        if not self.is_ignored(fullpath):
314
366
            if evt_name == 'FS_DIR_DELETE':
315
367
                self.handle_dir_delete(fullpath)
316
 
            self.eq.push(evt_name, fullpath)
 
368
            self.eq_push(evt_name, fullpath)
317
369
 
318
370
    def freeze_begin(self, path):
319
371
        """Puts in hold all the events for this path."""
346
398
        # push the received events
347
399
        for evt_name, path in events:
348
400
            if not self.is_ignored(path):
349
 
                self.eq.push(evt_name, path)
 
401
                self.eq_push(evt_name, path)
350
402
 
351
403
        self.frozen_path = None
352
404
        self.frozen_evts = False
360
412
            if path == fullpath:
361
413
                continue
362
414
            if is_dir:
363
 
                self.eq.push('FS_DIR_DELETE', path)
 
415
                self.eq_push('FS_DIR_DELETE', path)
364
416
            else:
365
 
                self.eq.push('FS_FILE_DELETE', path)
 
417
                self.eq_push('FS_FILE_DELETE', path)
366
418
 
367
419
 
368
420
class EventQueue(object):
384
436
        self.dispatch_queue = Queue()
385
437
        self.empty_event_queue_callbacks = set()
386
438
 
 
439
    def add_to_mute_filter(self, *info):
 
440
        '''Adds info to mute filter in the processor.'''
 
441
        self._processor.add_to_mute_filter(*info)
 
442
 
387
443
    def add_empty_event_queue_callback(self, callback):
388
444
        """add a callback for when the event queue has no more events."""
389
445
        self.empty_event_queue_callbacks.add(callback)