~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/syncdaemon/event_queue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2010-12-14 14:57:03 UTC
  • mto: This revision was merged to the branch mainline in revision 61.
  • Revision ID: james.westby@ubuntu.com-20101214145703-6rqksi59jbeb4xpy
Tags: upstream-1.5.1
ImportĀ upstreamĀ versionĀ 1.5.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
19
19
 
20
20
import functools
21
21
import logging
22
 
import os
23
 
import re
24
 
 
25
 
import pyinotify
26
 
from twisted.internet import abstract, reactor, error, defer
27
22
 
28
23
from Queue import Queue, Empty
29
24
 
 
25
from ubuntuone.platform import FilesystemMonitor
 
26
 
30
27
class InvalidEventError(Exception):
31
28
    """Received an Event that is not in the allowed list."""
32
29
 
67
64
    'AQ_UPLOAD_ERROR': ('share_id', 'node_id', 'error', 'hash'),
68
65
    'AQ_SHARES_LIST': ('shares_list',),
69
66
    'AQ_LIST_SHARES_ERROR': ('error',),
 
67
    'AQ_SHARE_INVITATION_SENT': ('marker',),
70
68
    'AQ_CREATE_SHARE_OK': ('share_id', 'marker'),
71
69
    'AQ_CREATE_SHARE_ERROR': ('marker', 'error'),
72
70
    'AQ_DELETE_SHARE_OK': ('share_id',),
103
101
    'SV_VOLUME_CREATED': ('volume',),
104
102
    'SV_VOLUME_DELETED': ('volume_id',),
105
103
    'SV_VOLUME_NEW_GENERATION': ('volume_id', 'generation'),
 
104
    'SV_FILE_NEW': ('volume_id', 'node_id', 'parent_id', 'name'),
 
105
    'SV_DIR_NEW': ('volume_id', 'node_id', 'parent_id', 'name'),
 
106
    'SV_FILE_DELETED': ('volume_id', 'node_id', 'is_dir'),
106
107
 
107
108
    'HQ_HASH_NEW': ('path', 'hash', 'crc32', 'size', 'stat'),
108
109
    'HQ_HASH_ERROR': ('mdid',),
140
141
    'SYS_QUOTA_EXCEEDED': ('volume_id', 'free_bytes'),
141
142
    'SYS_BROKEN_NODE': ('volume_id', 'node_id', 'path', 'mdid'),
142
143
 
 
144
    'FSM_FILE_CONFLICT': ('old_name', 'new_name'),
 
145
    'FSM_DIR_CONFLICT': ('old_name', 'new_name'),
 
146
 
143
147
    'VM_UDF_SUBSCRIBED': ('udf',),
144
148
    'VM_UDF_SUBSCRIBE_ERROR': ('udf_id', 'error'),
145
149
    'VM_UDF_UNSUBSCRIBED': ('udf',),
147
151
    'VM_UDF_CREATED': ('udf',),
148
152
    'VM_UDF_CREATE_ERROR': ('path', 'error'),
149
153
    'VM_SHARE_CREATED': ('share_id',),
150
 
    'VM_SHARE_DELETED': ('udf',),
151
 
    'VM_SHARE_DELETE_ERROR': ('path', 'error'),
 
154
    'VM_SHARE_DELETED': ('share',),
 
155
    'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),
152
156
    'VM_VOLUME_DELETED': ('volume',),
153
157
    'VM_VOLUME_DELETE_ERROR': ('volume_id', 'error'),
154
158
    'VM_SHARE_CHANGED': ('share_id',),
155
159
 
156
160
}
157
161
 
158
 
if 'IN_CREATE' in vars(pyinotify.EventsCodes):
159
 
    # < 0.8; event codes in EventsCodes; events have is_dir attribute
160
 
    evtcodes = pyinotify.EventsCodes
161
 
    event_is_dir = lambda event: event.is_dir
162
 
else:
163
 
    # >= 0.8; event codes in pyinotify itself; events have dir attribute
164
 
    evtcodes = pyinotify
165
 
    event_is_dir = lambda event: event.dir
166
 
 
167
 
# translates quickly the event and it's is_dir state to our standard events
168
 
NAME_TRANSLATIONS = {
169
 
    evtcodes.IN_OPEN: 'FS_FILE_OPEN',
170
 
    evtcodes.IN_CLOSE_NOWRITE: 'FS_FILE_CLOSE_NOWRITE',
171
 
    evtcodes.IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
172
 
    evtcodes.IN_CREATE: 'FS_FILE_CREATE',
173
 
    evtcodes.IN_CREATE | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
174
 
    evtcodes.IN_DELETE: 'FS_FILE_DELETE',
175
 
    evtcodes.IN_DELETE | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
176
 
    evtcodes.IN_MOVED_FROM: 'FS_FILE_DELETE',
177
 
    evtcodes.IN_MOVED_FROM | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
178
 
    evtcodes.IN_MOVED_TO: 'FS_FILE_CREATE',
179
 
    evtcodes.IN_MOVED_TO | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
180
 
}
181
 
 
182
 
# these are the events that will listen from inotify
183
 
INOTIFY_EVENTS_GENERAL = (
184
 
    evtcodes.IN_OPEN |
185
 
    evtcodes.IN_CLOSE_NOWRITE |
186
 
    evtcodes.IN_CLOSE_WRITE |
187
 
    evtcodes.IN_CREATE |
188
 
    evtcodes.IN_DELETE |
189
 
    evtcodes.IN_MOVED_FROM |
190
 
    evtcodes.IN_MOVED_TO |
191
 
    evtcodes.IN_MOVE_SELF
192
 
)
193
 
INOTIFY_EVENTS_ANCESTORS = (
194
 
    evtcodes.IN_DELETE |
195
 
    evtcodes.IN_MOVED_FROM |
196
 
    evtcodes.IN_MOVED_TO |
197
 
    evtcodes.IN_MOVE_SELF
198
 
)
199
 
 
200
162
DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
201
163
 
202
164
 
203
 
def validate_filename(real_func):
204
 
    """Decorator that validates the filename."""
205
 
    def func(self, event):
206
 
        """If valid, executes original function."""
207
 
        try:
208
 
            # validate UTF-8
209
 
            event.name.decode("utf8")
210
 
        except UnicodeDecodeError:
211
 
            dirname = event.path.decode("utf8")
212
 
            self.invnames_log.info("%s in %r: path %r", event.maskname,
213
 
                                   dirname, event.name)
214
 
            self.eq.push('FS_INVALID_NAME', dirname, event.name)
215
 
        else:
216
 
            real_func(self, event)
217
 
    return func
218
 
 
219
 
 
220
 
class MuteFilter(object):
221
 
    """Stores what needs to be muted."""
222
 
    def __init__(self):
223
 
        self._cnt = {}
224
 
        self.log = logging.getLogger('ubuntuone.SyncDaemon.MuteFilter')
225
 
 
226
 
    def add(self, element):
227
 
        """Add and element to the filter."""
228
 
        self.log.debug("Adding: %s", element)
229
 
        self._cnt[element] = self._cnt.get(element, 0) + 1
230
 
 
231
 
    def rm(self, element):
232
 
        """Remove an element from the filter."""
233
 
        self.log.debug("Removing: %s", element)
234
 
        new_val = self._cnt[element] - 1
235
 
        if new_val == 0:
236
 
            del self._cnt[element]
237
 
        else:
238
 
            self._cnt[element] = new_val
239
 
 
240
 
    def pop(self, element):
241
 
        """Pops an element from the filter, if there, and returns if it was."""
242
 
        if element not in self._cnt:
243
 
            return False
244
 
 
245
 
        self._cnt[element] = self._cnt.get(element, 0) - 1
246
 
        if not self._cnt[element]:
247
 
            # reached zero
248
 
            del self._cnt[element]
249
 
 
250
 
        # log what happened and how many items we have left
251
 
        q = sum(self._cnt.itervalues())
252
 
        self.log.debug("Blocking %s (%d left)", element, q)
253
 
 
254
 
        return True
255
 
 
256
 
 
257
 
class _AncestorsINotifyProcessor(pyinotify.ProcessEvent):
258
 
    """inotify's processor when an event happens on an UDFs ancestor."""
259
 
    def __init__(self, eq):
260
 
        self.log = logging.getLogger('ubuntuone.SyncDaemon.AncestorsINotProc')
261
 
        self.eq = eq
262
 
 
263
 
    def _get_udfs(self, path):
264
 
        """Yield all the subscribed udfs under a specific path."""
265
 
        pathsep = path + os.path.sep
266
 
        for udf in self.eq.fs.vm.udfs.itervalues():
267
 
            udfpath = udf.path + os.path.sep
268
 
            if udfpath.startswith(pathsep) and udf.subscribed:
269
 
                yield udf
270
 
 
271
 
    def process_IN_MOVE_SELF(self, event):
272
 
        """Don't do anything here.
273
 
 
274
 
        We just turned this event on because pyinotify does some
275
 
        path-fixing in its internal processing when this happens.
276
 
        """
277
 
    process_IN_MOVED_TO = process_IN_MOVE_SELF
278
 
 
279
 
    def process_IN_MOVED_FROM(self, event):
280
 
        """Getting it out or renaming means unsuscribe."""
281
 
        if event.mask & evtcodes.IN_ISDIR:
282
 
            unsubscribed_udfs = set()
283
 
            for udf in self._get_udfs(event.pathname):
284
 
                self.log.info("Got MOVED_FROM on path %r, unsubscribing "
285
 
                               "udf %s", event.pathname, udf)
286
 
                self.eq.fs.vm.unsubscribe_udf(udf.volume_id)
287
 
                unsubscribed_udfs.add(udf)
288
 
            self._unwatch_ancestors(unsubscribed_udfs)
289
 
 
290
 
    def process_IN_DELETE(self, event):
291
 
        """Check to see if the UDF was deleted."""
292
 
        if event.mask & evtcodes.IN_ISDIR:
293
 
            deleted_udfs = set()
294
 
            for udf in self._get_udfs(event.pathname):
295
 
                self.log.info("Got DELETE on path %r, deleting udf %s",
296
 
                               event.pathname, udf)
297
 
                self.eq.fs.vm.delete_volume(udf.volume_id)
298
 
                deleted_udfs.add(udf)
299
 
            self._unwatch_ancestors(deleted_udfs)
300
 
 
301
 
    def _unwatch_ancestors(self, udfs):
302
 
        """Unwatch the ancestors of the recevied udfs only."""
303
 
        # collect all the ancestors of the received udfs
304
 
        ancestors_to_unwatch = set()
305
 
        for udf in udfs:
306
 
            ancestors_to_unwatch.update(set(udf.ancestors))
307
 
 
308
 
        # collect the ancestors of all the still subscribed UDFs except
309
 
        # the received ones
310
 
        sub_udfs = (u for u in self.eq.fs.vm.udfs.itervalues() if u.subscribed)
311
 
        udf_remain = set(sub_udfs) - udfs
312
 
        ancestors_to_keep = set()
313
 
        for udf in udf_remain:
314
 
            ancestors_to_keep.update(set(udf.ancestors))
315
 
 
316
 
        # unwatch only the ancestors of the received udfs
317
 
        only_these = ancestors_to_unwatch - ancestors_to_keep
318
 
        for ancestor in only_these:
319
 
            self.eq.inotify_rm_watch(ancestor)
320
 
 
321
 
 
322
 
class _GeneralINotifyProcessor(pyinotify.ProcessEvent):
323
 
    """inotify's processor when a general event happens.
324
 
 
325
 
    This class also catchs the MOVEs events, and synthetises a new
326
 
    FS_(DIR|FILE)_MOVE event when possible.
327
 
    """
328
 
    def __init__(self, eq, ignore_config=None):
329
 
        self.log = logging.getLogger('ubuntuone.SyncDaemon.GeneralINotProc')
330
 
        self.invnames_log = logging.getLogger(
331
 
                                        'ubuntuone.SyncDaemon.InvalidNames')
332
 
        self.eq = eq
333
 
        self.held_event = None
334
 
        self.timer = None
335
 
        self.frozen_path = None
336
 
        self.frozen_evts = False
337
 
        self._to_mute = MuteFilter()
338
 
        self.conflict_RE = re.compile(r"\.u1conflict(?:\.\d+)?$")
339
 
 
340
 
        if ignore_config is not None:
341
 
            self.log.info("Ignoring files: %s", ignore_config)
342
 
            # thanks Chipaca for the following "regex composing"
343
 
            complex = '|'.join('(?:' + r + ')' for r in ignore_config)
344
 
            self.ignore_RE = re.compile(complex)
345
 
        else:
346
 
            self.ignore_RE = None
347
 
 
348
 
    def _mute_filter(self, action, event, *paths):
349
 
        """Really touches the mute filter."""
350
 
        # all events have one path except the MOVEs
351
 
        if event in ("FS_FILE_MOVE", "FS_DIR_MOVE"):
352
 
            f_path, t_path = paths
353
 
            is_from_forreal = not self.is_ignored(f_path)
354
 
            is_to_forreal = not self.is_ignored(t_path)
355
 
            if is_from_forreal and is_to_forreal:
356
 
                action((event, f_path, t_path))
357
 
            elif is_to_forreal:
358
 
                action(('FS_FILE_CREATE', t_path))
359
 
                action(('FS_FILE_CLOSE_WRITE', t_path))
360
 
        else:
361
 
            path = paths[0]
362
 
            if not self.is_ignored(path):
363
 
                action((event, path))
364
 
 
365
 
    def rm_from_mute_filter(self, event, *paths):
366
 
        """Remove an event and path(s) from the mute filter."""
367
 
        self._mute_filter(self._to_mute.rm, event, *paths)
368
 
 
369
 
    def add_to_mute_filter(self, event, *paths):
370
 
        """Add an event and path(s) to the mute filter."""
371
 
        self._mute_filter(self._to_mute.add, event, *paths)
372
 
 
373
 
    def on_timeout(self):
374
 
        """Called on timeout."""
375
 
        if self.held_event is not None:
376
 
            self.release_held_event(True)
377
 
 
378
 
    def release_held_event(self, timed_out=False):
379
 
        """Release the event on hold to fulfill its destiny."""
380
 
        if not timed_out:
381
 
            try:
382
 
                self.timer.cancel()
383
 
            except error.AlreadyCalled:
384
 
                # self.timeout() was *just* called, do nothing here
385
 
                return
386
 
        self.push_event(self.held_event)
387
 
        self.held_event = None
388
 
 
389
 
    @validate_filename
390
 
    def process_IN_OPEN(self, event):
391
 
        """Filter IN_OPEN to make it happen only in files."""
392
 
        if not (event.mask & evtcodes.IN_ISDIR):
393
 
            self.push_event(event)
394
 
 
395
 
    @validate_filename
396
 
    def process_IN_CLOSE_NOWRITE(self, event):
397
 
        """Filter IN_CLOSE_NOWRITE to make it happen only in files."""
398
 
        if not (event.mask & evtcodes.IN_ISDIR):
399
 
            self.push_event(event)
400
 
 
401
 
    def process_IN_MOVE_SELF(self, event):
402
 
        """Don't do anything here.
403
 
 
404
 
        We just turned this event on because pyinotify does some
405
 
        path-fixing in its internal processing when this happens.
406
 
 
407
 
        """
408
 
 
409
 
    @validate_filename
410
 
    def process_IN_MOVED_FROM(self, event):
411
 
        """Capture the MOVED_FROM to maybe syntethize FILE_MOVED."""
412
 
        if self.held_event is not None:
413
 
            self.release_held_event()
414
 
 
415
 
        self.held_event = event
416
 
        self.timer = reactor.callLater(1, self.on_timeout)
417
 
 
418
 
    def is_ignored(self, path):
419
 
        """should we ignore this path?"""
420
 
        # don't support symlinks yet
421
 
        if os.path.islink(path):
422
 
            return True
423
 
 
424
 
        # check if we are can read
425
 
        if os.path.exists(path) and not os.access(path, os.R_OK):
426
 
            self.log.warning("Ignoring path as we don't have enough "
427
 
                             "permissions to track it: %r", path)
428
 
            return True
429
 
 
430
 
        is_conflict = self.conflict_RE.search
431
 
        dirname, filename = os.path.split(path)
432
 
        # ignore conflicts
433
 
        if is_conflict(filename):
434
 
            return True
435
 
        # ignore partial downloads
436
 
        if filename == '.u1partial' or filename.startswith('.u1partial.'):
437
 
            return True
438
 
 
439
 
        # and ignore paths that are inside conflicts (why are we even
440
 
        # getting the event?)
441
 
        if any(part.endswith('.u1partial') or is_conflict(part)
442
 
               for part in dirname.split(os.path.sep)):
443
 
            return True
444
 
 
445
 
        if self.ignore_RE is not None and self.ignore_RE.match(filename):
446
 
            return True
447
 
 
448
 
        return False
449
 
 
450
 
    @validate_filename
451
 
    def process_IN_MOVED_TO(self, event):
452
 
        """Capture the MOVED_TO to maybe syntethize FILE_MOVED."""
453
 
        if self.held_event is not None:
454
 
            if event.cookie == self.held_event.cookie:
455
 
                try:
456
 
                    self.timer.cancel()
457
 
                except error.AlreadyCalled:
458
 
                    # self.timeout() was *just* called, do nothing here
459
 
                    pass
460
 
                else:
461
 
                    f_path_dir = self.held_event.path
462
 
                    f_path = os.path.join(f_path_dir, self.held_event.name)
463
 
                    t_path_dir = event.path
464
 
                    t_path = os.path.join(t_path_dir, event.name)
465
 
 
466
 
                    is_from_forreal = not self.is_ignored(f_path)
467
 
                    is_to_forreal = not self.is_ignored(t_path)
468
 
                    if is_from_forreal and is_to_forreal:
469
 
                        f_share_id = self.eq.fs.get_by_path(f_path_dir).share_id
470
 
                        t_share_id = self.eq.fs.get_by_path(t_path_dir).share_id
471
 
                        this_is_a_dir = event_is_dir(event)
472
 
                        if this_is_a_dir:
473
 
                            evtname = "FS_DIR_"
474
 
                        else:
475
 
                            evtname = "FS_FILE_"
476
 
                        if f_share_id != t_share_id:
477
 
                            # if the share_id are != push a delete/create
478
 
                            m = "Delete because of different shares: %r"
479
 
                            self.log.info(m, f_path)
480
 
                            self.eq_push(evtname+"DELETE", f_path)
481
 
                            self.eq_push(evtname+"CREATE", t_path)
482
 
                            if not this_is_a_dir:
483
 
                                self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
484
 
                        else:
485
 
                            self.eq.inotify_watch_fix(f_path, t_path)
486
 
                            self.eq_push(evtname+"MOVE", f_path, t_path)
487
 
                    elif is_to_forreal:
488
 
                        # this is the case of a MOVE from something ignored
489
 
                        # to a valid filename
490
 
                        this_is_a_dir = event_is_dir(event)
491
 
                        if this_is_a_dir:
492
 
                            evtname = "FS_DIR_"
493
 
                        else:
494
 
                            evtname = "FS_FILE_"
495
 
                        self.eq_push(evtname+"CREATE", t_path)
496
 
                        if not this_is_a_dir:
497
 
                            self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
498
 
 
499
 
                    self.held_event = None
500
 
                return
501
 
            else:
502
 
                self.release_held_event()
503
 
                self.push_event(event)
504
 
        else:
505
 
            # we don't have a held_event so this is a move from outside.
506
 
            # if it's a file move it's atomic on POSIX, so we aren't going to
507
 
            # receive a IN_CLOSE_WRITE, so let's fake it for files
508
 
            self.push_event(event)
509
 
            is_dir = event_is_dir(event)
510
 
            if not is_dir:
511
 
                t_path = os.path.join(event.path, event.name)
512
 
                self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
513
 
 
514
 
    def eq_push(self, *event_data):
515
 
        """Sends to EQ the event data, maybe filtering it."""
516
 
        if not self._to_mute.pop(event_data):
517
 
            self.eq.push(*event_data)
518
 
 
519
 
    @validate_filename
520
 
    def process_default(self, event):
521
 
        """Push the event into the EventQueue."""
522
 
        if self.held_event is not None:
523
 
            self.release_held_event()
524
 
        self.push_event(event)
525
 
 
526
 
    def push_event(self, event):
527
 
        """Push the event to the EQ."""
528
 
        # ignore this trash
529
 
        if event.mask == evtcodes.IN_IGNORED:
530
 
            return
531
 
 
532
 
        # change the pattern IN_CREATE to FS_FILE_CREATE or FS_DIR_CREATE
533
 
        try:
534
 
            evt_name = NAME_TRANSLATIONS[event.mask]
535
 
        except:
536
 
            raise KeyError("Unhandled Event in INotify: %s" % event)
537
 
 
538
 
        # push the event
539
 
        fullpath = os.path.join(event.path, event.name)
540
 
 
541
 
        # check if the path is not frozen
542
 
        if self.frozen_path is not None:
543
 
            if event.path == self.frozen_path:
544
 
                # this will at least store the last one, for debug
545
 
                # purposses
546
 
                self.frozen_evts = (evt_name, fullpath)
547
 
                return
548
 
 
549
 
        if not self.is_ignored(fullpath):
550
 
            if evt_name == 'FS_DIR_DELETE':
551
 
                self.handle_dir_delete(fullpath)
552
 
            self.eq_push(evt_name, fullpath)
553
 
 
554
 
    def freeze_begin(self, path):
555
 
        """Puts in hold all the events for this path."""
556
 
        self.log.debug("Freeze begin: %r", path)
557
 
        self.frozen_path = path
558
 
        self.frozen_evts = False
559
 
 
560
 
    def freeze_rollback(self):
561
 
        """Unfreezes the frozen path, reseting to idle state."""
562
 
        self.log.debug("Freeze rollback: %r", self.frozen_path)
563
 
        self.frozen_path = None
564
 
        self.frozen_evts = False
565
 
 
566
 
    def freeze_commit(self, events):
567
 
        """Unfreezes the frozen path, sending received events if not dirty.
568
 
 
569
 
        If events for that path happened:
570
 
            - return True
571
 
        else:
572
 
            - push the here received events, return False
573
 
        """
574
 
        self.log.debug("Freeze commit: %r (%d events)",
575
 
                                                self.frozen_path, len(events))
576
 
        if self.frozen_evts:
577
 
            # ouch! we're dirty!
578
 
            self.log.debug("Dirty by %s", self.frozen_evts)
579
 
            self.frozen_evts = False
580
 
            return True
581
 
 
582
 
        # push the received events
583
 
        for evt_name, path in events:
584
 
            if not self.is_ignored(path):
585
 
                self.eq_push(evt_name, path)
586
 
 
587
 
        self.frozen_path = None
588
 
        self.frozen_evts = False
589
 
        return False
590
 
 
591
 
    def handle_dir_delete(self, fullpath):
592
 
        """Some special work when a directory is deleted."""
593
 
        # remove the watch on that dir from our structures
594
 
        self.eq.inotify_rm_watch(fullpath)
595
 
 
596
 
        # handle the case of move a dir to a non-watched directory
597
 
        paths = self.eq.fs.get_paths_starting_with(fullpath,
598
 
                                                   include_base=False)
599
 
        paths.sort(reverse=True)
600
 
        for path, is_dir in paths:
601
 
            m = "Pushing deletion because of parent dir move: (is_dir=%s) %r"
602
 
            self.log.info(m, is_dir, path)
603
 
            if is_dir:
604
 
                self.eq.inotify_rm_watch(path)
605
 
                self.eq_push('FS_DIR_DELETE', path)
606
 
            else:
607
 
                self.eq_push('FS_FILE_DELETE', path)
608
 
 
609
 
 
610
165
class EventQueue(object):
611
166
    """Manages the events from different sources and distributes them."""
612
167
 
616
171
        self.log = logging.getLogger('ubuntuone.SyncDaemon.EQ')
617
172
        self.fs = fs
618
173
 
619
 
        # general inotify
620
 
        self._inotify_general_wm = wm = pyinotify.WatchManager()
621
 
        self._processor = _GeneralINotifyProcessor(self, ignore_config)
622
 
        self._inotify_notifier_gral = pyinotify.Notifier(wm, self._processor)
623
 
        self._inotify_reader_gral = self._hook_inotify_to_twisted(
624
 
                                            wm, self._inotify_notifier_gral)
625
 
        self._general_watchs = {}
626
 
 
627
 
        # ancestors inotify
628
 
        self._inotify_ancestors_wm = wm = pyinotify.WatchManager()
629
 
        antr_processor = _AncestorsINotifyProcessor(self)
630
 
        self._inotify_notifier_antr = pyinotify.Notifier(wm, antr_processor)
631
 
        self._inotify_reader_antr = self._hook_inotify_to_twisted(
632
 
                                            wm, self._inotify_notifier_antr)
633
 
        self._ancestors_watchs = {}
 
174
        self.monitor = FilesystemMonitor(self, fs, ignore_config)
634
175
 
635
176
        self.dispatching = False
636
177
        self.dispatch_queue = Queue()
638
179
 
639
180
    def add_to_mute_filter(self, *info):
640
181
        """Add info to mute filter in the processor."""
641
 
        self._processor.add_to_mute_filter(*info)
 
182
        self.monitor.add_to_mute_filter(*info)
642
183
 
643
184
    def rm_from_mute_filter(self, *info):
644
185
        """Remove info to mute filter in the processor."""
645
 
        self._processor.rm_from_mute_filter(*info)
 
186
        self.monitor.rm_from_mute_filter(*info)
646
187
 
647
188
    def add_empty_event_queue_callback(self, callback):
648
 
        """add a callback for when the even queue has no more events."""
 
189
        """Add a callback for when the even queue has no more events."""
649
190
        self.empty_event_queue_callbacks.add(callback)
650
191
        if not self.dispatching and self.dispatch_queue.empty():
651
192
            if callable(callback):
652
193
                callback()
653
194
 
654
195
    def remove_empty_event_queue_callback(self, callback):
655
 
        """remove the callback"""
 
196
        """Remove the callback."""
656
197
        self.empty_event_queue_callbacks.remove(callback)
657
198
 
658
 
    def _hook_inotify_to_twisted(self, wm, notifier):
659
 
        """This will hook inotify to twisted."""
660
 
 
661
 
        class MyReader(abstract.FileDescriptor):
662
 
            """Chain between inotify and twisted."""
663
 
            # will never pass a fd to write, pylint: disable-msg=W0223
664
 
 
665
 
            def fileno(self):
666
 
                """Returns the fileno to select()."""
667
 
                # pylint: disable-msg=W0212
668
 
                return wm._fd
669
 
 
670
 
            def doRead(self):
671
 
                """Called when twisted says there's something to read."""
672
 
                notifier.read_events()
673
 
                notifier.process_events()
674
 
 
675
 
        reader = MyReader()
676
 
        reactor.addReader(reader)
677
 
        return reader
678
 
 
679
199
    def shutdown(self):
680
 
        """Prepares the EQ to be closed."""
681
 
        self._inotify_notifier_gral.stop()
682
 
        self._inotify_notifier_antr.stop()
683
 
        reactor.removeReader(self._inotify_reader_gral)
684
 
        reactor.removeReader(self._inotify_reader_antr)
 
200
        """Make the monitor shutdown."""
 
201
        self.monitor.shutdown()
685
202
 
686
 
    def inotify_rm_watch(self, dirpath):
 
203
    def rm_watch(self, dirpath):
687
204
        """Remove watch from a dir."""
688
 
        if dirpath in self._general_watchs:
689
 
            w_dict = self._general_watchs
690
 
            w_manager = self._inotify_general_wm
691
 
        elif dirpath in self._ancestors_watchs:
692
 
            w_dict = self._ancestors_watchs
693
 
            w_manager = self._inotify_ancestors_wm
694
 
        else:
695
 
            self.log.warning("Tried to remove a nonexistent watch on %r",
696
 
                             dirpath)
697
 
            return
698
 
 
699
 
        wd = w_dict.pop(dirpath)
700
 
        w_manager.rm_watch(wd)
701
 
 
702
 
    def inotify_add_watch(self, dirpath):
 
205
        self.monitor.rm_watch(dirpath)
 
206
 
 
207
    def add_watch(self, dirpath):
703
208
        """Add watch to a dir."""
704
 
        # see where to add it
705
 
        if self._is_udf_ancestor(dirpath):
706
 
            w_type = "ancestors"
707
 
            w_manager = self._inotify_ancestors_wm
708
 
            w_dict = self._ancestors_watchs
709
 
            events = INOTIFY_EVENTS_ANCESTORS
710
 
        else:
711
 
            w_type = "general"
712
 
            w_manager = self._inotify_general_wm
713
 
            w_dict = self._general_watchs
714
 
            events = INOTIFY_EVENTS_GENERAL
715
 
 
716
 
        # add the watch!
717
 
        self.log.debug("Adding %s inotify watch to %r", w_type, dirpath)
718
 
        result = w_manager.add_watch(dirpath, events)
719
 
        w_dict[dirpath] = result[dirpath]
720
 
 
721
 
    def inotify_has_watch(self, dirpath):
 
209
        self.monitor.add_watch(dirpath)
 
210
 
 
211
    def has_watch(self, dirpath):
722
212
        """Check if a dirpath is watched."""
723
 
        return (dirpath in self._general_watchs or
724
 
                dirpath in self._ancestors_watchs)
725
 
 
726
 
    def inotify_watch_fix(self, pathfrom, pathto):
727
 
        """Fix the path in inotify structures."""
728
 
        if pathfrom in self._general_watchs:
729
 
            wdict = self._general_watchs
730
 
        elif pathfrom in self._ancestors_watchs:
731
 
            wdict = self._ancestors_watchs
732
 
        else:
733
 
            m = "Tried to fix nonexistent path %r in watches (to %r)"
734
 
            self.log.warning(m, pathfrom, pathto)
735
 
            return
736
 
 
737
 
        # fix
738
 
        wdict[pathto] = wdict.pop(pathfrom)
739
 
 
740
 
    def _is_udf_ancestor(self, path):
741
 
        """Decide if path is an UDF ancestor or not."""
742
 
        for udf in self.fs.vm.udfs.itervalues():
743
 
            parent = os.path.dirname(udf.path) + os.path.sep
744
 
            if parent.startswith(path + os.path.sep):
745
 
                return True
746
 
        return False
 
213
        return self.monitor.has_watch(dirpath)
747
214
 
748
215
    def unsubscribe(self, obj):
749
216
        """Remove the callback object from the listener queue.
846
313
 
847
314
    def is_frozen(self):
848
315
        """Checks if there's something frozen."""
849
 
        return self._processor.frozen_path is not None
 
316
        return self.monitor.is_frozen()
850
317
 
851
318
    def freeze_begin(self, path):
852
319
        """Puts in hold all the events for this path."""
853
 
        if self._processor.frozen_path is not None:
854
 
            raise ValueError("There's something already frozen!")
855
 
        self._processor.freeze_begin(path)
 
320
        self.monitor.freeze_begin(path)
856
321
 
857
322
    def freeze_rollback(self):
858
323
        """Unfreezes the frozen path, reseting to idle state."""
859
 
        if self._processor.frozen_path is None:
860
 
            raise ValueError("Rolling back with nothing frozen!")
861
 
        self._processor.freeze_rollback()
 
324
        self.monitor.freeze_rollback()
862
325
 
863
326
    def freeze_commit(self, events):
864
327
        """Unfreezes the frozen path, sending received events if not dirty.
868
331
        else:
869
332
            - push the here received events, return False
870
333
        """
871
 
        if self._processor.frozen_path is None:
872
 
            raise ValueError("Commiting with nothing frozen!")
873
 
 
874
 
        d = defer.execute(self._processor.freeze_commit, events)
875
 
        return d
 
334
        return self.monitor.freeze_commit(events)