~alecu/ubuntuone-client/show-main-exceptions

« back to all changes in this revision

Viewing changes to ubuntuone/platform/filesystem_notifications/common.py

  • Committer: Tarmac
  • Author(s): Manuel de la Pena, Diego Sarmentero, manuel.delapena at canonical
  • Date: 2012-07-11 17:06:31 UTC
  • mfrom: (1272.1.26 clean-fsevents)
  • Revision ID: tarmac-20120711170631-ttg2pugml5fr61mx
Refactor that simplifies the way the diff implementations per platform are done so that in future branches is simpler to add the daemon code.

Show diffs side-by-side

added added

removed removed

Lines of Context:
30
30
 
31
31
import logging
32
32
import os
 
33
import sys
33
34
 
34
35
from twisted.internet import defer
35
36
 
44
45
    IN_IGNORED,
45
46
    IN_ISDIR,
46
47
    IN_DELETE,
47
 
    IN_MODIFY as in_modify,
48
48
    IN_MOVED_FROM,
49
 
    IN_MOVED_TO)
 
49
    IN_MOVED_TO,
 
50
)
50
51
 
51
52
from ubuntuone import logger
52
53
 
56
57
    os_path,
57
58
)
58
59
 
 
60
if sys.platform == 'darwin':
 
61
    from ubuntuone.platform.filesystem_notifications import darwin
 
62
    source = darwin
 
63
elif sys.platform == 'win32':
 
64
    from ubuntuone.platform.filesystem_notifications import windows
 
65
    source = windows
 
66
else:
 
67
    raise ImportError('Not supported platform')
 
68
 
59
69
# a map between the few events that we have on common platforms and those
60
70
# found in pyinotify
61
 
COMMON_ACTIONS = {}
 
71
ACTIONS = source.ACTIONS
62
72
 
63
73
# a map of the actions to names so that we have better logs.
64
 
COMMON_ACTIONS_NAMES = {}
65
 
 
66
 
# We should have this here, because we use if from other modules that
67
 
# share this, but we need to declare it this way yo avoid flakes issues.
68
 
IN_MODIFY = in_modify
 
74
ACTIONS_NAMES = source.ACTIONS_NAMES
 
75
 
 
76
# ignore paths in the platform, mainly links atm
 
77
path_is_ignored = source.path_is_ignored
 
78
 
 
79
# the base class to be use for a platform
 
80
PlatformWatch = source.Watch
 
81
PlatformWatchManager = source.WatchManager
69
82
 
70
83
# translates quickly the event and it's is_dir state to our standard events
71
84
NAME_TRANSLATIONS = {
89
102
class Watch(object):
90
103
    """Implement the same functions as pyinotify.Watch."""
91
104
 
92
 
    def __init__(self, watch_descriptor, path, mask, auto_add, processor,
93
 
        buf_size=8192):
94
 
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.' +
95
 
            'filesystem_notifications.Watch')
96
 
        self.log.setLevel(TRACE)
 
105
    def __init__(self, watch_descriptor, path, processor):
 
106
        """Create a new watch."""
 
107
 
 
108
        # do ensure that we provide a os.path.sep
 
109
        if not path.endswith(os.path.sep):
 
110
            path += os.path.sep
 
111
        self.path = path
 
112
        self.ignore_paths = []
97
113
        self._processor = processor
98
 
        self._buf_size = buf_size
99
 
        self._watching = False
100
114
        self._descriptor = watch_descriptor
101
 
        self._auto_add = auto_add
102
 
        self._ignore_paths = []
103
115
        self._cookie = None
104
116
        self._source_pathname = None
105
 
        self._process_thread = None
106
 
        self._watch_handle = None
107
117
        # remember the subdirs we have so that when we have a delete we can
108
118
        # check if it was a remove
109
119
        self._subdirs = set()
110
 
        # ensure that we work with an abspath and that we can deal with
111
 
        # long paths over 260 chars.
112
 
        if not path.endswith(os.path.sep):
113
 
            path += os.path.sep
114
 
        self._path = os.path.abspath(path)
115
 
        self._mask = mask
116
 
 
117
 
    def _process_events_from_filesystem(self, action, file_name, cookie,
118
 
        syncdaemon_path):
 
120
 
 
121
        # platform watch used to deal with the platform details
 
122
        self.platform_watch = PlatformWatch(self.path, self.process_events)
 
123
 
 
124
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.' +
 
125
            'filesystem_notifications.Watch')
 
126
        self.log.setLevel(TRACE)
 
127
 
 
128
    def process_events(self, action, file_name, cookie, syncdaemon_path):
119
129
        """Process the events from the queue."""
 
130
        # do not process events when the watch was stopped
 
131
        if not self.platform_watch.watching:
 
132
            return
 
133
 
 
134
        # do not process those events that should be ignored
 
135
        if any([file_name.startswith(path)
 
136
                for path in self.ignore_paths]):
 
137
            return
 
138
 
120
139
        # map the filesystem events to the pyinotify ones, tis is dirty but
121
140
        # makes the multiplatform better, linux was first :P
122
 
        full_dir_path = os.path.join(self._path, file_name)
 
141
        full_dir_path = os.path.join(self.path, file_name)
123
142
        is_dir = self._path_is_dir(full_dir_path)
 
143
 
124
144
        if is_dir:
125
145
            # we need to update the list of subdirs that we have
126
146
            self._update_subdirs(full_dir_path, action)
127
 
        mask = COMMON_ACTIONS[action]
 
147
 
 
148
        mask = ACTIONS[action]
128
149
        head, tail = os.path.split(file_name)
129
150
        if is_dir:
130
151
            mask |= IN_ISDIR
134
155
            'mask': mask,
135
156
            'name': tail,
136
157
            'path': '.'}
137
 
        # by the way in which the win api fires the events we know for
 
158
        # by the way in which the api fires the events we know for
138
159
        # sure that no move events will be added in the wrong order, this
139
160
        # is kind of hacky, I dont like it too much
140
 
        if COMMON_ACTIONS[action] == IN_MOVED_FROM:
 
161
        if ACTIONS[action] == IN_MOVED_FROM:
141
162
            self._cookie = cookie
142
163
            self._source_pathname = tail
143
164
            event_raw_data['cookie'] = self._cookie
144
 
        if COMMON_ACTIONS[action] == IN_MOVED_TO:
 
165
        if ACTIONS[action] == IN_MOVED_TO:
145
166
            event_raw_data['src_pathname'] = self._source_pathname
146
167
            event_raw_data['cookie'] = self._cookie
147
168
        event = Event(event_raw_data)
161
182
        The given path is considered to be a path and therefore this
162
183
        will not be checked.
163
184
        """
164
 
        if COMMON_ACTIONS[event] == IN_CREATE:
 
185
        if ACTIONS[event] == IN_CREATE:
165
186
            self._subdirs.add(path)
166
 
        elif COMMON_ACTIONS[event] == IN_DELETE and path in self._subdirs:
 
187
        elif ACTIONS[event] == IN_DELETE and path in self._subdirs:
167
188
            self._subdirs.remove(path)
168
189
 
169
190
    @is_valid_os_path(path_indexes=[1])
187
208
        """Add the path of the events to ignore."""
188
209
        if not path.endswith(os.path.sep):
189
210
            path += os.path.sep
190
 
        if path.startswith(self._path):
191
 
            path = path[len(self._path):]
192
 
            self._ignore_paths.append(path)
 
211
        if path.startswith(self.path):
 
212
            path = path[len(self.path):]
 
213
            self.ignore_paths.append(path)
193
214
 
194
215
    @is_valid_os_path(path_indexes=[1])
195
216
    def remove_ignored_path(self, path):
196
217
        """Reaccept path."""
197
218
        if not path.endswith(os.path.sep):
198
219
            path += os.path.sep
199
 
        if path.startswith(self._path):
200
 
            path = path[len(self._path):]
201
 
            if path in self._ignore_paths:
202
 
                self._ignore_paths.remove(path)
 
220
        if path.startswith(self.path):
 
221
            path = path[len(self.path):]
 
222
            if path in self.ignore_paths:
 
223
                self.ignore_paths.remove(path)
203
224
 
 
225
    @defer.inlineCallbacks
204
226
    def start_watching(self):
205
227
        """Tell the watch to start processing events."""
206
 
        for current_child in os.listdir(self._path):
207
 
            full_child_path = os.path.join(self._path, current_child)
 
228
        for current_child in os.listdir(self.path):
 
229
            full_child_path = os.path.join(self.path, current_child)
208
230
            if os.path.isdir(full_child_path):
209
231
                self._subdirs.add(full_child_path)
210
232
        # start to diff threads, one to watch the path, the other to
211
233
        # process the events.
212
234
        self.log.debug('Start watching path.')
213
 
        self._watching = True
 
235
        yield self.platform_watch.start_watching()
214
236
 
215
237
    def stop_watching(self):
216
238
        """Tell the watch to stop processing events."""
217
 
        self.log.info('Stop watching %s', self._path)
218
 
        self._watching = False
 
239
        self.log.info('Stop watching %s', self.path)
 
240
        self.platform_watch.watching = False
219
241
        self._subdirs = set()
220
 
 
221
 
    def update(self, mask, auto_add=False):
222
 
        """Update the info used by the watcher."""
223
 
        self.log.debug('update(%s, %s)', mask, auto_add)
224
 
        self._mask = mask
225
 
        self._auto_add = auto_add
226
 
 
227
 
    @property
228
 
    def path(self):
229
 
        """Return the patch watched."""
230
 
        return self._path
231
 
 
232
 
    @property
233
 
    def auto_add(self):
234
 
        return self._auto_add
 
242
        return self.platform_watch.stop_watching()
 
243
 
 
244
    @property
 
245
    def watching(self):
 
246
        """Return if we are watching."""
 
247
        return self.platform_watch.watching
 
248
 
 
249
    @property
 
250
    def started(self):
 
251
        """A deferred that will be called when the watch is running."""
 
252
        return self.platform_watch.started
 
253
 
 
254
    @property
 
255
    def stopped(self):
 
256
        """A deferred fired when the watch thread has finished."""
 
257
        return self.platform_watch.stopped
235
258
 
236
259
 
237
260
class WatchManager(object):
243
266
 
244
267
    def __init__(self, processor):
245
268
        """Init the manager to keep trak of the different watches."""
246
 
        self._processor = processor
247
269
        self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.'
248
270
            + 'filesystem_notifications.WatchManager')
249
271
        self.log.setLevel(TRACE)
 
272
        self._processor = processor
 
273
        # use the platform manager to perform the actual actions
 
274
        self.manager = PlatformWatchManager(self.log)
250
275
        self._wdm = {}
251
 
        self._wd_count = 0
252
276
        self._ignored_paths = []
253
277
 
254
 
    def add_watch(self, path, mask, auto_add=False, quiet=True):
255
 
        """Add a new path to be watch.
 
278
    @defer.inlineCallbacks
 
279
    def _add_single_watch(self, path, mask, quiet=True):
 
280
        """A just one watch."""
 
281
        if path in self._ignored_paths:
 
282
            # simply removed it from the filter
 
283
            self._ignored_paths.remove(path)
 
284
            return
 
285
 
 
286
        # we need to add a new watch
 
287
        self.log.debug('add_single_watch(%s, %s, %s)', path, mask, quiet)
 
288
 
 
289
        # common code that will ensure that we keep track of the watches
 
290
        watch = Watch(len(self._wdm), path, self._processor)
 
291
        self._wdm[len(self._wdm)] = watch
 
292
        yield watch.start_watching()
 
293
 
 
294
        # trust that the platform watch manager to do the rest of the start
 
295
        # operations
 
296
        defer.returnValue(self.manager.add_watch(watch))
 
297
 
 
298
    @is_valid_os_path(path_indexes=[1])
 
299
    def add_watch(self, path, mask, quiet=True):
 
300
        """Add a new path to be watched.
256
301
 
257
302
        The method will ensure that the path is not already present.
258
303
        """
259
 
        raise NotImplementedError("Not implemented on this platform.")
260
 
 
261
 
    def stop(self):
262
 
        """Close the manager and stop all watches."""
263
 
        # Should be implemented for each platform
264
 
        raise NotImplementedError("Not implemented on this platform.")
 
304
        wd = self.get_wd(path)
 
305
        if wd is None:
 
306
            self.log.debug('Adding single watch on %r', path)
 
307
            return self._add_single_watch(path, mask, quiet)
 
308
        else:
 
309
            self.log.debug('Watch already exists on %r', path)
 
310
            return self._wdm[wd].started
265
311
 
266
312
    def get_watch(self, wd):
267
313
        """Return the watch with the given descriptor."""
274
320
            watch = self._wdm[wd]
275
321
            yield watch.stop_watching()
276
322
            del self._wdm[wd]
 
323
            # trust that the platform watch manager will do the rest of the
 
324
            # operations needed to delete a watch
 
325
            self.manager.del_watch(watch)
277
326
            self.log.debug('Watch %s removed.', wd)
278
327
        except KeyError, e:
279
328
            logging.error(str(e))
280
329
 
281
 
    def _add_single_watch(self, path, mask, auto_add=False, quiet=True):
282
 
        if path in self._ignored_paths:
283
 
            # simply removed it from the filter
284
 
            self._ignored_paths.remove(path)
285
 
            return
286
 
        # we need to add a new watch
287
 
        self.log.debug('add_single_watch(%s, %s, %s, %s)', path, mask,
288
 
            auto_add, quiet)
289
 
 
290
 
        return self._adding_watch(path, mask, auto_add)
291
 
 
292
 
    def _adding_watch(self, path, mask, auto_add):
293
 
        """Add the watch to the dict and start it."""
294
 
        # This should be implemented for each OS
295
 
        raise NotImplementedError("Not implemented on this platform.")
296
 
 
297
 
    def update_watch(self, wd, mask=None, rec=False,
298
 
                     auto_add=False, quiet=True):
299
 
        raise NotImplementedError("Not implemented on this platform.")
300
 
 
301
330
    @is_valid_os_path(path_indexes=[1])
302
331
    def get_wd(self, path):
303
332
        """Return the watcher that is used to watch the given path."""
305
334
            path = path + os.path.sep
306
335
        for current_wd in self._wdm:
307
336
            watch_path = self._wdm[current_wd].path
308
 
            if ((watch_path == path or (
309
 
                    watch_path in path and self._wdm[current_wd].auto_add))
 
337
            if ((watch_path == path or watch_path in path)
310
338
                    and path not in self._ignored_paths):
311
339
                return current_wd
312
340
 
323
351
            watch = self._wdm[wd]
324
352
            yield watch.stop_watching()
325
353
            del self._wdm[wd]
 
354
            # trust that the platform watch manager will do the rest of the
 
355
            # operations needed to delete a watch
 
356
            self.manager.rm_watch(watch)
326
357
        except KeyError, err:
327
358
            self.log.error(str(err))
328
359
            if not quiet:
336
367
            self.log.debug('Adding exclude filter for %r', path)
337
368
            self._wdm[wd].ignore_path(path)
338
369
 
 
370
    @defer.inlineCallbacks
 
371
    def stop(self):
 
372
        """Close the manager and stop all watches."""
 
373
        self.log.debug('Stopping watches.')
 
374
        for current_wd in self._wdm:
 
375
            watch = self._wdm[current_wd]
 
376
            yield self.manager.stop_watch(watch)
 
377
            self.log.debug('Stopping Watch on %r.', watch.path)
 
378
        yield self.manager.stop()
 
379
 
339
380
 
340
381
class NotifyProcessor(ProcessEvent):
341
382
    """Processor that takes care of dealing with the events.
350
391
            GeneralINotifyProcessor)
351
392
        self.general_processor = GeneralINotifyProcessor(monitor,
352
393
            self.handle_dir_delete, NAME_TRANSLATIONS,
353
 
            self.platform_is_ignored, IN_IGNORED, ignore_config=ignore_config)
 
394
            path_is_ignored, IN_IGNORED, ignore_config=ignore_config)
354
395
        self.held_event = None
355
396
 
356
397
    def rm_from_mute_filter(self, event, paths):
361
402
        """Add an event and path(s) to the mute filter."""
362
403
        self.general_processor.add_to_mute_filter(event, paths)
363
404
 
364
 
    def platform_is_ignored(self, path):
365
 
        """Should we ignore this path in the current platform.?"""
366
 
        # This should be implemented in each platform
367
 
        raise NotImplementedError("Not implemented on this platform.")
368
 
 
369
405
    @is_valid_syncdaemon_path(path_indexes=[1])
370
406
    def is_ignored(self, path):
371
407
        """Should we ignore this path?"""
552
588
        self.log.setLevel(TRACE)
553
589
        self.fs = fs
554
590
        self.eq = eq
555
 
        # You will need to create the NotifyProcessor and WatchManager
556
 
        # in each OS-specific implementation
 
591
        self._processor = NotifyProcessor(self, ignore_config)
 
592
        self._watch_manager = WatchManager(self._processor)
557
593
 
558
594
    def add_to_mute_filter(self, event, **info):
559
595
        """Add info to mute filter in the processor."""
579
615
        # the logic to check if the watch is already set
580
616
        # is all in WatchManager.add_watch
581
617
        return self._watch_manager.add_watch(dirpath,
582
 
                             self.filesystem_monitor_mask, auto_add=True)
 
618
            self.filesystem_monitor_mask)
583
619
 
584
620
    def add_watches_to_udf_ancestors(self, volume):
585
621
        """Add a inotify watch to volume's ancestors if it's an UDF."""
586
 
        # Should be implemented in each OS if necessary
587
 
        raise NotImplementedError("Not implemented on this platform.")
 
622
        return defer.succeed(True)
588
623
 
589
624
    def is_frozen(self):
590
625
        """Checks if there's something frozen."""