~ubuntu-branches/ubuntu/oneiric/ubuntuone-client/oneiric

« back to all changes in this revision

Viewing changes to ubuntuone/platform/windows/filesystem_notifications.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-02-23 18:34:09 UTC
  • mfrom: (1.1.45 upstream)
  • Revision ID: james.westby@ubuntu.com-20110223183409-535o7yo165wbjmca
Tags: 1.5.5-0ubuntu1
* New upstream release.
  - Subscribing to a RO share will not download content (LP: #712528)
  - Can't synchronize "~/Ubuntu One Music" (LP: #714976)
  - Syncdaemon needs to show progress in Unity launcher (LP: #702116)
  - Notifications say "your cloud" (LP: #715887)
  - No longer requires python-libproxy
  - Recommend unity and indicator libs by default

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
#
14
14
# You should have received a copy of the GNU General Public License along
15
15
# with this program.  If not, see <http://www.gnu.org/licenses/>.
16
 
 
 
16
"""File notifications on windows."""
 
17
 
 
18
import logging
 
19
import os
 
20
 
 
21
from Queue import Queue, Empty
 
22
from threading import Thread
 
23
from uuid import uuid4
 
24
from twisted.internet import defer, task
 
25
from win32con import (
 
26
    FILE_SHARE_READ,
 
27
    FILE_SHARE_WRITE,
 
28
    FILE_FLAG_BACKUP_SEMANTICS,
 
29
    FILE_NOTIFY_CHANGE_FILE_NAME,
 
30
    FILE_NOTIFY_CHANGE_DIR_NAME,
 
31
    FILE_NOTIFY_CHANGE_ATTRIBUTES,
 
32
    FILE_NOTIFY_CHANGE_SIZE,
 
33
    FILE_NOTIFY_CHANGE_LAST_WRITE,
 
34
    FILE_NOTIFY_CHANGE_SECURITY,
 
35
    OPEN_EXISTING
 
36
)
 
37
from win32file import CreateFile, ReadDirectoryChangesW
 
38
from ubuntuone.platform.windows.pyinotify import (
 
39
    Event,
 
40
    WatchManagerError,
 
41
    PrintAllEvents,
 
42
    ProcessEvent,
 
43
    IN_OPEN,
 
44
    IN_CLOSE_NOWRITE,
 
45
    IN_CLOSE_WRITE,
 
46
    IN_CREATE,
 
47
    IN_IGNORED,
 
48
    IN_ISDIR,
 
49
    IN_DELETE,
 
50
    IN_MOVED_FROM,
 
51
    IN_MOVED_TO,
 
52
    IN_MODIFY,
 
53
)
 
54
from ubuntuone.syncdaemon.filesystem_notifications import (
 
55
    GeneralINotifyProcessor
 
56
)
 
57
from ubuntuone.platform.windows.os_helper import (
 
58
    LONG_PATH_PREFIX,
 
59
    listdir
 
60
)
 
61
 
 
62
# constant found in the msdn documentation:
 
63
# http://msdn.microsoft.com/en-us/library/ff538834(v=vs.85).aspx
 
64
FILE_LIST_DIRECTORY = 0x0001
 
65
FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020
 
66
FILE_NOTIFY_CHANGE_CREATION = 0x00000040
 
67
 
 
68
# a map between the few events that we have on windows and those
 
69
# found in pyinotify
 
70
WINDOWS_ACTIONS = {
 
71
  1: IN_CREATE,
 
72
  2: IN_DELETE,
 
73
  3: IN_MODIFY,
 
74
  4: IN_MOVED_FROM,
 
75
  5: IN_MOVED_TO
 
76
}
 
77
 
 
78
# translates quickly the event and it's is_dir state to our standard events
 
79
NAME_TRANSLATIONS = {
 
80
    IN_OPEN: 'FS_FILE_OPEN',
 
81
    IN_CLOSE_NOWRITE: 'FS_FILE_CLOSE_NOWRITE',
 
82
    IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
 
83
    IN_CREATE: 'FS_FILE_CREATE',
 
84
    IN_CREATE | IN_ISDIR: 'FS_DIR_CREATE',
 
85
    IN_DELETE: 'FS_FILE_DELETE',
 
86
    IN_DELETE | IN_ISDIR: 'FS_DIR_DELETE',
 
87
    IN_MOVED_FROM: 'FS_FILE_DELETE',
 
88
    IN_MOVED_FROM | IN_ISDIR: 'FS_DIR_DELETE',
 
89
    IN_MOVED_TO: 'FS_FILE_CREATE',
 
90
    IN_MOVED_TO | IN_ISDIR: 'FS_DIR_CREATE',
 
91
}
 
92
 
 
93
# the default mask to be used in the watches added by the FilesystemMonitor
 
94
# class
 
95
FILESYSTEM_MONITOR_MASK = FILE_NOTIFY_CHANGE_FILE_NAME | \
 
96
    FILE_NOTIFY_CHANGE_DIR_NAME | \
 
97
    FILE_NOTIFY_CHANGE_ATTRIBUTES | \
 
98
    FILE_NOTIFY_CHANGE_SIZE | \
 
99
    FILE_NOTIFY_CHANGE_LAST_WRITE | \
 
100
    FILE_NOTIFY_CHANGE_SECURITY | \
 
101
    FILE_NOTIFY_CHANGE_LAST_ACCESS
 
102
 
 
103
 
 
104
# The implementation of the code that is provided as the pyinotify
 
105
# substitute
 
106
class Watch(object):
 
107
    """Implement the same functions as pyinotify.Watch."""
 
108
 
 
109
    def __init__(self, watch_descriptor, path, mask, auto_add,
 
110
        events_queue=None, exclude_filter=None, proc_fun=None):
 
111
        super(Watch, self).__init__()
 
112
        self.log = logging.getLogger('ubuntuone.platform.windows.' +
 
113
            'filesystem_notifications.Watch')
 
114
        self._watching = False
 
115
        self._descriptor = watch_descriptor
 
116
        self._auto_add = auto_add
 
117
        self.exclude_filter = exclude_filter 
 
118
        self._proc_fun = proc_fun
 
119
        self._cookie = None
 
120
        self._source_pathname = None
 
121
        # remember the subdirs we have so that when we have a delete we can
 
122
        # check if it was a remove
 
123
        self._subdirs = []
 
124
        # ensure that we work with an abspath and that we can deal with
 
125
        # long paths over 260 chars.
 
126
        self._path = os.path.abspath(path)
 
127
        if not self._path.startswith(LONG_PATH_PREFIX):
 
128
            self._path = LONG_PATH_PREFIX + self._path
 
129
        self._mask = mask
 
130
        # lets make the q as big as possible
 
131
        self._raw_events_queue = Queue()
 
132
        if not events_queue:
 
133
            events_queue = Queue()
 
134
        self.events_queue = events_queue
 
135
 
 
136
    def _path_is_dir(self, path):
 
137
        """"Check if the path is a dir and update the local subdir list."""
 
138
        self.log.debug('Testing if path "%s" is a dir', path)
 
139
        is_dir = False
 
140
        if os.path.exists(path):
 
141
            is_dir = os.path.isdir(path)
 
142
        else:
 
143
            self.log.debug('Path "%s" was deleted subdirs are %s.',
 
144
                path, self._subdirs)
 
145
            # we removed the path, we look in the internal list
 
146
            if path in self._subdirs:
 
147
                is_dir = True
 
148
                self._subdirs.remove(path)
 
149
        if is_dir:
 
150
            self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
 
151
            self._subdirs.append(path)
 
152
        return is_dir
 
153
 
 
154
    def _process_events(self):
 
155
        """Process the events form the queue."""
 
156
        # we transform the events to be the same as the one in pyinotify
 
157
        # and then use the proc_fun
 
158
        while self._watching or not self._raw_events_queue.empty():
 
159
            file_name, action = self._raw_events_queue.get()
 
160
            # map the windows events to the pyinotify ones, tis is dirty but
 
161
            # makes the multiplatform better, linux was first :P
 
162
            is_dir = self._path_is_dir(file_name)
 
163
            if os.path.exists(file_name):
 
164
                is_dir = os.path.isdir(file_name)
 
165
            else:
 
166
                # we removed the path, we look in the internal list
 
167
                if file_name in self._subdirs:
 
168
                    is_dir = True
 
169
                    self._subdirs.remove(file_name)
 
170
            if is_dir:
 
171
                self._subdirs.append(file_name)
 
172
            mask = WINDOWS_ACTIONS[action]
 
173
            head, tail = os.path.split(file_name)
 
174
            if is_dir:
 
175
                mask |= IN_ISDIR
 
176
            event_raw_data = {
 
177
                'wd': self._descriptor,
 
178
                'dir': is_dir,
 
179
                'mask': mask,
 
180
                'name': tail,
 
181
                'path': head.replace(self.path, '.')
 
182
            }
 
183
            # by the way in which the win api fires the events we know for
 
184
            # sure that no move events will be added in the wrong order, this
 
185
            # is kind of hacky, I dont like it too much
 
186
            if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
 
187
                self._cookie = str(uuid4())
 
188
                self._source_pathname = tail
 
189
                event_raw_data['cookie'] = self._cookie
 
190
            if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
 
191
                event_raw_data['src_pathname'] = self._source_pathname
 
192
                event_raw_data['cookie'] = self._cookie
 
193
            event = Event(event_raw_data)
 
194
            # FIXME: event deduces the pathname wrong and we need to manually
 
195
            # set it
 
196
            event.pathname = file_name
 
197
            # add the event only if we do not have an exclude filter or
 
198
            # the exclude filter returns False, that is, the event will not
 
199
            # be excluded
 
200
            if not self.exclude_filter or not self.exclude_filter(event):
 
201
                self.log.debug('Addding event %s to queue.', event)
 
202
                self.events_queue.put(event)
 
203
 
 
204
    def _watch(self):
 
205
        """Watch a path that is a directory."""
 
206
        # we are going to be using the ReadDirectoryChangesW whihc requires
 
207
        # a directory handle and the mask to be used.
 
208
        handle = CreateFile(
 
209
            self._path,
 
210
            FILE_LIST_DIRECTORY,
 
211
            FILE_SHARE_READ | FILE_SHARE_WRITE,
 
212
            None,
 
213
            OPEN_EXISTING,
 
214
            FILE_FLAG_BACKUP_SEMANTICS,
 
215
            None
 
216
        )
 
217
        self.log.debug('Watchng path %s.', self._path)
 
218
        while self._watching:
 
219
            # important information to know about the parameters:
 
220
            # param 1: the handle to the dir
 
221
            # param 2: the size to be used in the kernel to store events
 
222
            # that might be lost while the call is being performed. This
 
223
            # is complicated to fine tune since if you make lots of watcher
 
224
            # you migh used too much memory and make your OS to BSOD
 
225
            results = ReadDirectoryChangesW(
 
226
                handle,
 
227
                1024,
 
228
                self._auto_add,
 
229
                self._mask,
 
230
                None,
 
231
                None
 
232
            )
 
233
            # add the diff events to the q so that the can be processed no
 
234
            # matter the speed.
 
235
            for action, file in results:
 
236
                full_filename = os.path.join(self._path, file)
 
237
                self._raw_events_queue.put((full_filename, action))
 
238
                self.log.debug('Added %s to raw events queue.',
 
239
                    (full_filename, action))
 
240
 
 
241
    def start_watching(self):
 
242
        """Tell the watch to start processing events."""
 
243
        # get the diff dirs in the path
 
244
        for current_child in listdir(self._path):
 
245
            full_child_path = os.path.join(self._path, current_child)
 
246
            if os.path.isdir(full_child_path):
 
247
                self._subdirs.append(full_child_path)
 
248
        # start to diff threads, one to watch the path, the other to
 
249
        # process the events.
 
250
        self.log.debug('Sart watching path.')
 
251
        self._watching = True
 
252
        watch_thread = Thread(target=self._watch,
 
253
            name='Watch(%s)' % self._path)
 
254
        process_thread = Thread(target=self._process_events,
 
255
            name='Process(%s)' % self._path)
 
256
        process_thread.start()
 
257
        watch_thread.start()
 
258
 
 
259
    def stop_watching(self):
 
260
        """Tell the watch to stop processing events."""
 
261
        self._watching = False
 
262
        self._subdirs = []
 
263
 
 
264
    def update(self, mask, proc_fun=None, auto_add=False):
 
265
        """Update the info used by the watcher."""
 
266
        self.log.debug('update(%s, %s, %s)', mask, proc_fun, auto_add)
 
267
        self._mask = mask
 
268
        self._proc_fun = proc_fun
 
269
        self._auto_add = auto_add
 
270
 
 
271
    @property
 
272
    def path(self):
 
273
        """Return the patch watched."""
 
274
        return self._path
 
275
 
 
276
    @property
 
277
    def auto_add(self):
 
278
        return self._auto_add
 
279
 
 
280
    @property
 
281
    def proc_fun(self):
 
282
        return self._proc_fun
 
283
 
 
284
 
 
285
class WatchManager(object):
 
286
    """Implement the same functions as pyinotify.WatchManager."""
 
287
 
 
288
    def __init__(self, exclude_filter=lambda path: False, watch_factory=Watch):
 
289
        """Init the manager to keep trak of the different watches."""
 
290
        super(WatchManager, self).__init__()
 
291
        self.log = logging.getLogger('ubuntuone.platform.windows.'
 
292
            + 'filesystem_notifications.WatchManager')
 
293
        self._wdm = {}
 
294
        self._wd_count = 0
 
295
        self._exclude_filter = exclude_filter
 
296
        self._events_queue = Queue()
 
297
        self._ignored_paths = []
 
298
        self._watch_factory = watch_factory
 
299
 
 
300
    def stop(self):
 
301
        """Close the manager and stop all watches."""
 
302
        self.log.debug('Stopping watches.')
 
303
        for current_wd in self._wdm:
 
304
            self._wdm[current_wd].stop_watching()
 
305
            self.log.debug('Watch for %s stopped.', self._wdm[current_wd].path)
 
306
 
 
307
    def get_watch(self, wd):
 
308
        """Return the watch with the given descriptor."""
 
309
        return self._wdm[wd]
 
310
 
 
311
    def del_watch(self, wd):
 
312
        """Delete the watch with the given descriptor."""
 
313
        try:
 
314
            watch = self._wdm[wd]
 
315
            watch.stop_watching()
 
316
            del self._wdm[wd]
 
317
            self.log.debug('Watch %s removed.', wd)
 
318
        except KeyError, e:
 
319
            logging.error(str(e))
 
320
 
 
321
    def _add_single_watch(self, path, mask, proc_fun=None, auto_add=False,
 
322
        quiet=True, exclude_filter=None):
 
323
        self.log.debug('add_single_watch(%s, %s, %s, %s, %s, %s)', path, mask,
 
324
            proc_fun, auto_add, quiet, exclude_filter)
 
325
        self._wdm[self._wd_count] = self._watch_factory(self._wd_count, path, mask,
 
326
            auto_add, events_queue=self._events_queue,
 
327
            exclude_filter=exclude_filter, proc_fun=proc_fun)
 
328
        self._wdm[self._wd_count].start_watching()
 
329
        self._wd_count += 1
 
330
        self.log.debug('Watch count increased to %s', self._wd_count)
 
331
 
 
332
    def add_watch(self, path, mask, proc_fun=None, auto_add=False,
 
333
        quiet=True, exclude_filter=None):
 
334
        if hasattr(path, '__iter__'):
 
335
            self.log.debug('Added collection of watches.')
 
336
            # we are dealing with a collection of paths
 
337
            for current_path in path:
 
338
                if not self.get_wd(current_path):
 
339
                    self._add_single_watch(current_path, mask, proc_fun,
 
340
                        auto_add, quiet, exclude_filter)
 
341
        elif not self.get_wd(path):
 
342
            self.log.debug('Adding single watch.')
 
343
            self._add_single_watch(path, mask, proc_fun, auto_add,
 
344
                quiet, exclude_filter)
 
345
 
 
346
    def update_watch(self, wd, mask=None, proc_fun=None, rec=False,
 
347
                     auto_add=False, quiet=True):
 
348
        try:
 
349
            watch = self._wdm[wd]
 
350
            watch.stop_watching()
 
351
            self.log.debug('Stopped watch on %s for update.', watch.path)
 
352
            # update the data and restart watching
 
353
            auto_add = auto_add or rec
 
354
            watch.update(mask, proc_fun=proc_fun, auto_add=auto_add)
 
355
            # only start the watcher again if the mask was given, otherwhise
 
356
            # we are not watchng and therefore do not care
 
357
            if mask:
 
358
                watch.start_watching()
 
359
        except KeyError, e:
 
360
            self.log.error(str(e))
 
361
            if not quiet:
 
362
                raise WatchManagerError('Watch %s was not found' % wd, {})
 
363
 
 
364
    def get_wd(self, path):
 
365
        """Return the watcher that is used to watch the given path."""
 
366
        for current_wd in self._wdm:
 
367
            watch_path = self._wdm[current_wd].path 
 
368
            if watch_path == path or (
 
369
                    watch_path in path and self._wdm[current_wd].auto_add):
 
370
                return current_wd
 
371
 
 
372
    def get_path(self, wd):
 
373
        """Return the path watched by the wath with the given wd."""
 
374
        watch = self._wdm.get(wd)
 
375
        if watch:
 
376
            return watch.path
 
377
 
 
378
    def rm_watch(self, wd, rec=False, quiet=True):
 
379
        """Remove the the watch with the given wd."""
 
380
        try:
 
381
            watch = self._wdm[wd]
 
382
            watch.stop_watching()
 
383
            del self._wdm[wd]
 
384
        except KeyError, err:
 
385
            self.log.error(str(err))
 
386
            if not quiet:
 
387
                raise WatchManagerError('Watch %s was not found' % wd, {})
 
388
 
 
389
    def rm_path(self, path):
 
390
        """Remove a watch to the given path."""
 
391
        wd = self.get_wd(path)
 
392
        if wd:
 
393
            if self._wdm[wd].path == path:
 
394
                self.log.debug('Removing watch for path "%s"', path)
 
395
                self.rm_watch(wd)
 
396
            else:
 
397
                self.log.debug('Adding exclude filter for "%s"', path)
 
398
                # we have a watch that cotains the path as a child path
 
399
                if not path in self._ignored_paths:
 
400
                    self._ignored_paths.append(path)
 
401
                # it would be very tricky to remove a subpath from a watcher that is
 
402
                # looking at changes in ther kids. To make it simpler and less error
 
403
                # prone (and even better performant since we use less threads) we will
 
404
                # add a filter to the events in the watcher so that the events from
 
405
                # that child are not received :)
 
406
                def ignore_path(event):
 
407
                    """Ignore an event if it has a given path."""
 
408
                    for ignored_path in self._ignored_paths:
 
409
                        if ignore_path in event.pathname:
 
410
                            return True
 
411
                    return False
 
412
 
 
413
                # FIXME: This assumes that we do not have other function
 
414
                # which in our usecase is correct, but what is we move this
 
415
                # to other project?!? Maybe using the manager
 
416
                # exclude_filter is better
 
417
                self._wdm[wd].exclude_filter = ignore_path
 
418
 
 
419
    @property
 
420
    def watches(self):
 
421
        """Return a reference to the dictionary that contains the watches."""
 
422
        return self._wdm
 
423
 
 
424
    @property
 
425
    def events_queue(self):
 
426
        """Return the queue with the events that the manager contains."""
 
427
        return self._events_queue
 
428
 
 
429
 
 
430
class Notifier(object):
 
431
    """
 
432
    Read notifications, process events. Inspired by the pyinotify.Notifier
 
433
    """
 
434
 
 
435
    def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
 
436
                 threshold=10, timeout=-1):
 
437
        """Init to process event according to the given timeout & threshold."""
 
438
        super(Notifier, self).__init__()
 
439
        self.log = logging.getLogger('ubuntuone.platform.windows.'
 
440
            + 'filesystem_notifications.Notifier')
 
441
        # Watch Manager instance
 
442
        self._watch_manager = watch_manager
 
443
        # Default processing method
 
444
        self._default_proc_fun = default_proc_fun
 
445
        if default_proc_fun is None:
 
446
            self._default_proc_fun = PrintAllEvents()
 
447
        # Loop parameters
 
448
        self._read_freq = read_freq
 
449
        self._threshold = threshold
 
450
        self._timeout = timeout
 
451
 
 
452
    def proc_fun(self):
 
453
        return self._default_proc_fun
 
454
 
 
455
    def process_events(self):
 
456
        """
 
457
        Process the event given the threshold and the timeout.
 
458
        """
 
459
        self.log.debug('Processing events with threashold: %s and timeout: %s',
 
460
            self._threshold, self._timeout)
 
461
        # we will process an amount of events equal to the threshold of
 
462
        # the notifier and will block for the amount given by the timeout
 
463
        processed_events = 0
 
464
        while processed_events < self._threshold:
 
465
            try:
 
466
                raw_event = None
 
467
                if not self._timeout or self._timeout < 0:
 
468
                    raw_event = self._watch_manager.events_queue.get(
 
469
                        block=False)
 
470
                else:
 
471
                    raw_event = self._watch_manager.events_queue.get(
 
472
                        timeout=self._timeout)
 
473
                watch = self._watch_manager.get_watch(raw_event.wd)
 
474
                if watch is None:
 
475
                    # Not really sure how we ended up here, nor how we should
 
476
                    # handle these types of events and if it is appropriate to
 
477
                    # completly skip them (like we are doing here).
 
478
                    self.log.warning('Unable to retrieve Watch object '
 
479
                        + 'associated to %s', raw_event)
 
480
                    processed_events += 1
 
481
                    continue
 
482
                if watch and watch.proc_fun:
 
483
                    self.log.debug('Executing proc_fun from watch.')
 
484
                    watch.proc_fun(raw_event)  # user processings
 
485
                else:
 
486
                    self.log.debug('Executing default_proc_fun')
 
487
                    self._default_proc_fun(raw_event)
 
488
                processed_events += 1
 
489
            except Empty:
 
490
                # increase the number of processed events, and continue
 
491
                processed_events += 1
 
492
                continue
 
493
 
 
494
    def stop(self):
 
495
        """Stop processing events and the watch manager."""
 
496
        self._watch_manager.stop()
 
497
 
 
498
 
 
499
class NotifyProcessor(ProcessEvent):
 
500
    """Processor that takes care of dealing with the events."""
 
501
 
 
502
    def __init__(self, monitor, ignore_config=None):
 
503
        self.general_processor = GeneralINotifyProcessor(monitor,
 
504
            self.handle_dir_delete, NAME_TRANSLATIONS,
 
505
            self.platform_is_ignored, IN_IGNORED, ignore_config=ignore_config)
 
506
        self.held_event = None
 
507
 
 
508
    def rm_from_mute_filter(self, event, paths):
 
509
        """Remove event from the mute filter."""
 
510
        self.general_processor.rm_from_mute_filter(event, paths)
 
511
 
 
512
    def add_to_mute_filter(self, event, paths):
 
513
        """Add an event and path(s) to the mute filter."""
 
514
        self.general_processor.add_to_mute_filter(event, paths)
 
515
 
 
516
    def platform_is_ignored(self, path):
 
517
        """Should we ignore this path in the current platform.?"""
 
518
        # don't support links yet
 
519
        if path.endswith('.lnk'):
 
520
            return True
 
521
        return False
 
522
 
 
523
    def is_ignored(self, path):
 
524
        """Should we ignore this path?"""
 
525
        return self.general_processor.is_ignored(path)
 
526
 
 
527
    def release_held_event(self, timed_out=False):
 
528
        """Release the event on hold to fulfill its destiny."""
 
529
        self.general_processor.push_event(self.held_event)
 
530
        self.held_event = None
 
531
 
 
532
    def process_IN_MODIFY(self, event):
 
533
        """Capture a modify event and fake an open ^ close write events."""
 
534
        # on windows we just get IN_MODIFY, lets always fake
 
535
        # an OPEN & CLOSE_WRITE couple
 
536
        raw_open = raw_close = {
 
537
           'wd': event.wd,
 
538
           'dir': event.dir,
 
539
           'name': event.name,
 
540
           'path': event.path
 
541
        }
 
542
        # caculate the open mask
 
543
        if event.dir:
 
544
            raw_open['mask'] = IN_OPEN | IN_ISDIR
 
545
        else:
 
546
            raw_open['mask'] = IN_OPEN
 
547
        # create the event using the raw data, then fix the pathname param
 
548
        open_event = Event(raw_open)
 
549
        open_event.pathname = event.pathname
 
550
        # push the open
 
551
        self.general_processor.push_event(open_event)
 
552
        # calculate the close mask
 
553
        if event.dir:
 
554
            raw_close['mask'] = IN_CLOSE_WRITE | IN_ISDIR
 
555
        else:
 
556
            raw_close['mask'] = IN_CLOSE_WRITE
 
557
        close_event = Event(raw_close)
 
558
        close_event.pathname = event.pathname
 
559
        # push the close event
 
560
        self.general_processor.push_event(close_event)
 
561
 
 
562
    def process_IN_MOVED_FROM(self, event):
 
563
        """Capture the MOVED_FROM to maybe syntethize FILE_MOVED."""
 
564
        if self.held_event is not None:
 
565
            self.general_processor.log.warn('Lost pair event of %s', self.held_event)
 
566
        self.held_event = event
 
567
 
 
568
    def process_IN_MOVED_TO(self, event):
 
569
        """Capture the MOVED_TO to maybe syntethize FILE_MOVED."""
 
570
        if self.held_event is not None:
 
571
            if event.cookie == self.held_event.cookie:
 
572
                f_path_dir = os.path.split(self.held_event.pathname)[0]
 
573
                t_path_dir = os.path.split(event.pathname)[0]
 
574
 
 
575
                is_from_forreal = not self.is_ignored(self.held_event.pathname)
 
576
                is_to_forreal = not self.is_ignored(event.pathname)
 
577
                if is_from_forreal and is_to_forreal:
 
578
                    f_share_id = self.general_processor.get_path_share_id(
 
579
                        f_path_dir)
 
580
                    t_share_id = self.general_processor.get_path_share_id(
 
581
                        t_path_dir)
 
582
                    if event.dir:
 
583
                        evtname = "FS_DIR_"
 
584
                    else:
 
585
                        evtname = "FS_FILE_"
 
586
                    if f_share_id != t_share_id:
 
587
                        # if the share_id are != push a delete/create
 
588
                        m = "Delete because of different shares: %r"
 
589
                        self.log.info(m, self.held_event.pathname)
 
590
                        self.general_processor.eq_push(
 
591
                            evtname + "DELETE", path=self.held_event.pathname)
 
592
                        self.general_processor.eq_push(
 
593
                            evtname + "CREATE", path=event.pathname)
 
594
                        if not event.dir:
 
595
                            self.general_processor.eq_push(
 
596
                                'FS_FILE_CLOSE_WRITE', path=event.pathname)
 
597
                    else:
 
598
                        self.general_processor.eq_push(evtname + "MOVE",
 
599
                            path_from=self.held_event.pathname,
 
600
                            path_to=event.pathname)
 
601
                elif is_to_forreal:
 
602
                    # this is the case of a MOVE from something ignored
 
603
                    # to a valid filename
 
604
                    if event.dir:
 
605
                        evtname = "FS_DIR_"
 
606
                    else:
 
607
                        evtname = "FS_FILE_"
 
608
                    self.general_processor.eq_push(evtname + "CREATE",
 
609
                        path=event.pathname)
 
610
                    if not event.dir:
 
611
                        self.general_processor.eq_push('FS_FILE_CLOSE_WRITE',
 
612
                            path=event.pathname)
 
613
 
 
614
                self.held_event = None
 
615
                return
 
616
            else:
 
617
                self.release_held_event()
 
618
                self.general_processor.push_event(event)
 
619
        else:
 
620
            # We should never get here on windows, I really do not know how we
 
621
            # got here
 
622
            self.general_processor.log.warn('Cookie does not match the previoues held event!')
 
623
            self.general_processor.log.warn('Ignoring %s', event)
 
624
 
 
625
    def process_default(self, event):
 
626
        """Push the event into the EventQueue."""
 
627
        if self.held_event is not None:
 
628
            self.release_held_event()
 
629
        self.general_processor.push_event(event)
 
630
 
 
631
    def handle_dir_delete(self, fullpath):
 
632
        """Some special work when a directory is deleted."""
 
633
        # remove the watch on that dir from our structures
 
634
        self.general_processor.rm_watch(fullpath)
 
635
 
 
636
        # handle the case of move a dir to a non-watched directory
 
637
        paths = self.general_processor.get_paths_starting_with(fullpath,
 
638
            include_base=False)
 
639
 
 
640
        paths.sort(reverse=True)
 
641
        for path, is_dir in paths:
 
642
            m = "Pushing deletion because of parent dir move: (is_dir=%s) %r"
 
643
            self.general_processor.log.info(m, is_dir, path)
 
644
            if is_dir:
 
645
                self.general_processor.rm_watch(path)
 
646
                self.general_processor.eq_push('FS_DIR_DELETE', path=path)
 
647
            else:
 
648
                self.general_processor.eq_push('FS_FILE_DELETE', path=path)
 
649
 
 
650
    def freeze_begin(self, path):
 
651
        """Puts in hold all the events for this path."""
 
652
        self.general_processor.freeze_begin(path)
 
653
 
 
654
    def freeze_rollback(self):
 
655
        """Unfreezes the frozen path, reseting to idle state."""
 
656
        self.general_processor.freeze_rollback()
 
657
 
 
658
    def freeze_commit(self, events):
 
659
        """Unfreezes the frozen path, sending received events if not dirty.
 
660
 
 
661
        If events for that path happened:
 
662
            - return True
 
663
        else:
 
664
            - push the here received events, return False
 
665
        """
 
666
        return self.general_processor.freeze_commit(events)
 
667
 
 
668
    @property
 
669
    def mute_filter(self):
 
670
        """Return the mute filter used by the processor."""
 
671
        return self.general_processor.filter
 
672
 
 
673
    @property
 
674
    def frozen_path(self):
 
675
        """Return the frozen path."""
 
676
        return self.general_processor.frozen_path
 
677
 
 
678
    @property
 
679
    def log(self):
 
680
        """Return the logger of the instance."""
 
681
        return self.general_processor.log
 
682
 
17
683
 
18
684
class FilesystemMonitor(object):
19
685
    """Manages the signals from filesystem."""
20
 
    # TODO: Implement this on windows!
21
 
    pass
 
686
 
 
687
    def __init__(self, eq, fs, ignore_config=None, timeout=0.1):
 
688
        self.log = logging.getLogger('ubuntuone.SyncDaemon.FSMonitor')
 
689
        self.fs = fs
 
690
        self.eq = eq
 
691
        # XXX: We need to find a decent time for the time out, is 0.2 seconds
 
692
        # too little?
 
693
        self.timeout = timeout
 
694
        # general inotify
 
695
        self._watch_manager = WatchManager()
 
696
        self._processor = NotifyProcessor(self, ignore_config)
 
697
        self._notifier = Notifier(self._watch_manager, self._processor)
 
698
        self._process_task = self._hook_inotify_to_twisted()
 
699
 
 
700
    def _fix_mute_filter_info(self, info):
 
701
        # we always make sure that the long path prefix is present so
 
702
        # that we can deal with the paths
 
703
        if 'path' in info and not info['path'].startswith(LONG_PATH_PREFIX):
 
704
            info['path'] = LONG_PATH_PREFIX + info['path']
 
705
        if 'path_to' in info and \
 
706
            not info['path_to'].startswith(LONG_PATH_PREFIX):
 
707
            info['path_to'] = LONG_PATH_PREFIX + info['path_to']
 
708
        if 'path_from' in info and \
 
709
            not info['path_from'].startswith(LONG_PATH_PREFIX):
 
710
            info['path_from'] = LONG_PATH_PREFIX + info['path_from']
 
711
 
 
712
    def add_to_mute_filter(self, event, **info):
 
713
        """Add info to mute filter in the processor."""
 
714
        self._fix_mute_filter_info(info)
 
715
        self._processor.add_to_mute_filter(event, info)
 
716
 
 
717
    def rm_from_mute_filter(self, event, **info):
 
718
        """Remove info to mute filter in the processor."""
 
719
        self._fix_mute_filter_info(info)
 
720
        self._processor.rm_from_mute_filter(event, info)
 
721
 
 
722
    def _hook_inotify_to_twisted(self):
 
723
        """This will hook inotify to twisted."""
 
724
 
 
725
        # since the select does not work on windows besides sockets
 
726
        # we have to use a much uglier techinique which is to perform
 
727
        # a pool our selfs which might not have events in the Queue of the
 
728
        # notifier. To make this as less painfull as possible we did set the
 
729
        # notifier to not have a timeout
 
730
        def process_events():
 
731
            self._notifier.process_events()
 
732
 
 
733
        process_task = task.LoopingCall(process_events)
 
734
        process_task.start(self.timeout)
 
735
        return process_task
 
736
 
 
737
    def shutdown(self):
 
738
        """Prepares the EQ to be closed."""
 
739
        self._notifier.stop()
 
740
        self._process_task.stop()
 
741
 
 
742
    def rm_watch(self, dirpath):
 
743
        """Remove watch from a dir."""
 
744
        # trust the implementation of the manager
 
745
        self._watch_manager.rm_path(dirpath)
 
746
 
 
747
    def add_watch(self, dirpath):
 
748
        """Add watch to a dir."""
 
749
        # we try to get a watch descriptor, it it exists it means that
 
750
        # the path is watch either by an specific watch of by a watch that
 
751
        # is also watching it kids
 
752
        if not self._watch_manager.get_wd(dirpath):
 
753
            # we need to add a watch which will also watch its kids
 
754
            self._watch_manager.add_watch(dirpath, FILESYSTEM_MONITOR_MASK,
 
755
                auto_add=True)
 
756
 
 
757
    def has_watch(self, dirpath):
 
758
        """Check if a dirpath is watched."""
 
759
        return self._watch_manager.get_wd(dirpath) is not None
 
760
 
 
761
    def is_frozen(self):
 
762
        """Checks if there's something frozen."""
 
763
        return self._processor.frozen_path is not None
 
764
 
 
765
    def freeze_begin(self, path):
 
766
        """Puts in hold all the events for this path."""
 
767
        if self._processor.frozen_path is not None:
 
768
            raise ValueError("There's something already frozen!")
 
769
        self._processor.freeze_begin(path)
 
770
 
 
771
    def freeze_rollback(self):
 
772
        """Unfreezes the frozen path, reseting to idle state."""
 
773
        if self._processor.frozen_path is None:
 
774
            raise ValueError("Rolling back with nothing frozen!")
 
775
        self._processor.freeze_rollback()
 
776
 
 
777
    def freeze_commit(self, events):
 
778
        """Unfreezes the frozen path, sending received events if not dirty.
 
779
 
 
780
        If events for that path happened:
 
781
            - return True
 
782
        else:
 
783
            - push the here received events, return False
 
784
        """
 
785
        if self._processor.frozen_path is None:
 
786
            raise ValueError("Commiting with nothing frozen!")
 
787
 
 
788
        d = defer.execute(self._processor.freeze_commit, events)
 
789
        return d