60
if sys.platform == 'darwin':
61
from ubuntuone.platform.filesystem_notifications import darwin
63
elif sys.platform == 'win32':
64
from ubuntuone.platform.filesystem_notifications import windows
67
raise ImportError('Not supported platform')
59
69
# a map between the few events that we have on common platforms and those
60
70
# found in pyinotify
71
ACTIONS = source.ACTIONS
63
73
# a map of the actions to names so that we have better logs.
64
COMMON_ACTIONS_NAMES = {}
66
# We should have this here, because we use it from other modules that
67
# share this, but we need to declare it this way to avoid flakes issues.
74
ACTIONS_NAMES = source.ACTIONS_NAMES
76
# ignore paths in the platform, mainly links atm
77
path_is_ignored = source.path_is_ignored
79
# the base class to be used for a platform
80
PlatformWatch = source.Watch
81
PlatformWatchManager = source.WatchManager
70
83
# translates quickly the event and its is_dir state to our standard events
71
84
NAME_TRANSLATIONS = {
89
102
class Watch(object):
90
103
"""Implement the same functions as pyinotify.Watch."""
92
def __init__(self, watch_descriptor, path, mask, auto_add, processor,
94
self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.' +
95
'filesystem_notifications.Watch')
96
self.log.setLevel(TRACE)
105
def __init__(self, watch_descriptor, path, processor):
106
"""Create a new watch."""
108
# do ensure that we provide a os.path.sep
109
if not path.endswith(os.path.sep):
112
self.ignore_paths = []
97
113
self._processor = processor
98
self._buf_size = buf_size
99
self._watching = False
100
114
self._descriptor = watch_descriptor
101
self._auto_add = auto_add
102
self._ignore_paths = []
103
115
self._cookie = None
104
116
self._source_pathname = None
105
self._process_thread = None
106
self._watch_handle = None
107
117
# remember the subdirs we have so that when we have a delete we can
108
118
# check if it was a remove
109
119
self._subdirs = set()
110
# ensure that we work with an abspath and that we can deal with
111
# long paths over 260 chars.
112
if not path.endswith(os.path.sep):
114
self._path = os.path.abspath(path)
117
def _process_events_from_filesystem(self, action, file_name, cookie,
121
# platform watch used to deal with the platform details
122
self.platform_watch = PlatformWatch(self.path, self.process_events)
124
self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.' +
125
'filesystem_notifications.Watch')
126
self.log.setLevel(TRACE)
128
def process_events(self, action, file_name, cookie, syncdaemon_path):
119
129
"""Process the events from the queue."""
130
# do not process events when the watch was stopped
131
if not self.platform_watch.watching:
134
# do not process those events that should be ignored
135
if any([file_name.startswith(path)
136
for path in self.ignore_paths]):
120
139
# map the filesystem events to the pyinotify ones, this is dirty but
121
140
# makes the multiplatform better, linux was first :P
122
full_dir_path = os.path.join(self._path, file_name)
141
full_dir_path = os.path.join(self.path, file_name)
123
142
is_dir = self._path_is_dir(full_dir_path)
125
145
# we need to update the list of subdirs that we have
126
146
self._update_subdirs(full_dir_path, action)
127
mask = COMMON_ACTIONS[action]
148
mask = ACTIONS[action]
128
149
head, tail = os.path.split(file_name)
137
# by the way in which the win api fires the events we know for
158
# by the way in which the api fires the events we know for
138
159
# sure that no move events will be added in the wrong order, this
139
160
# is kind of hacky, I don't like it too much
140
if COMMON_ACTIONS[action] == IN_MOVED_FROM:
161
if ACTIONS[action] == IN_MOVED_FROM:
141
162
self._cookie = cookie
142
163
self._source_pathname = tail
143
164
event_raw_data['cookie'] = self._cookie
144
if COMMON_ACTIONS[action] == IN_MOVED_TO:
165
if ACTIONS[action] == IN_MOVED_TO:
145
166
event_raw_data['src_pathname'] = self._source_pathname
146
167
event_raw_data['cookie'] = self._cookie
147
168
event = Event(event_raw_data)
187
208
"""Add the path of the events to ignore."""
188
209
if not path.endswith(os.path.sep):
189
210
path += os.path.sep
190
if path.startswith(self._path):
191
path = path[len(self._path):]
192
self._ignore_paths.append(path)
211
if path.startswith(self.path):
212
path = path[len(self.path):]
213
self.ignore_paths.append(path)
194
215
@is_valid_os_path(path_indexes=[1])
195
216
def remove_ignored_path(self, path):
196
217
"""Reaccept path."""
197
218
if not path.endswith(os.path.sep):
198
219
path += os.path.sep
199
if path.startswith(self._path):
200
path = path[len(self._path):]
201
if path in self._ignore_paths:
202
self._ignore_paths.remove(path)
220
if path.startswith(self.path):
221
path = path[len(self.path):]
222
if path in self.ignore_paths:
223
self.ignore_paths.remove(path)
225
@defer.inlineCallbacks
204
226
def start_watching(self):
205
227
"""Tell the watch to start processing events."""
206
for current_child in os.listdir(self._path):
207
full_child_path = os.path.join(self._path, current_child)
228
for current_child in os.listdir(self.path):
229
full_child_path = os.path.join(self.path, current_child)
208
230
if os.path.isdir(full_child_path):
209
231
self._subdirs.add(full_child_path)
210
232
# start two different threads, one to watch the path, the other to
211
233
# process the events.
212
234
self.log.debug('Start watching path.')
213
self._watching = True
235
yield self.platform_watch.start_watching()
215
237
def stop_watching(self):
216
238
"""Tell the watch to stop processing events."""
217
self.log.info('Stop watching %s', self._path)
218
self._watching = False
239
self.log.info('Stop watching %s', self.path)
240
self.platform_watch.watching = False
219
241
self._subdirs = set()
221
def update(self, mask, auto_add=False):
222
"""Update the info used by the watcher."""
223
self.log.debug('update(%s, %s)', mask, auto_add)
225
self._auto_add = auto_add
229
"""Return the patch watched."""
234
return self._auto_add
242
return self.platform_watch.stop_watching()
246
"""Return if we are watching."""
247
return self.platform_watch.watching
251
"""A deferred that will be called when the watch is running."""
252
return self.platform_watch.started
256
"""A deferred fired when the watch thread has finished."""
257
return self.platform_watch.stopped
237
260
class WatchManager(object):
244
267
def __init__(self, processor):
245
268
"""Init the manager to keep trak of the different watches."""
246
self._processor = processor
247
269
self.log = logging.getLogger('ubuntuone.SyncDaemon.platform.common.'
248
270
+ 'filesystem_notifications.WatchManager')
249
271
self.log.setLevel(TRACE)
272
self._processor = processor
273
# use the platform manager to perform the actual actions
274
self.manager = PlatformWatchManager(self.log)
252
276
self._ignored_paths = []
254
def add_watch(self, path, mask, auto_add=False, quiet=True):
255
"""Add a new path to be watch.
278
@defer.inlineCallbacks
279
def _add_single_watch(self, path, mask, quiet=True):
280
"""A just one watch."""
281
if path in self._ignored_paths:
282
# simply remove it from the filter
283
self._ignored_paths.remove(path)
286
# we need to add a new watch
287
self.log.debug('add_single_watch(%s, %s, %s)', path, mask, quiet)
289
# common code that will ensure that we keep track of the watches
290
watch = Watch(len(self._wdm), path, self._processor)
291
self._wdm[len(self._wdm)] = watch
292
yield watch.start_watching()
294
# trust the platform watch manager to do the rest of the start
296
defer.returnValue(self.manager.add_watch(watch))
298
@is_valid_os_path(path_indexes=[1])
299
def add_watch(self, path, mask, quiet=True):
300
"""Add a new path to be watched.
257
302
The method will ensure that the path is not already present.
259
raise NotImplementedError("Not implemented on this platform.")
262
"""Close the manager and stop all watches."""
263
# Should be implemented for each platform
264
raise NotImplementedError("Not implemented on this platform.")
304
wd = self.get_wd(path)
306
self.log.debug('Adding single watch on %r', path)
307
return self._add_single_watch(path, mask, quiet)
309
self.log.debug('Watch already exists on %r', path)
310
return self._wdm[wd].started
266
312
def get_watch(self, wd):
267
313
"""Return the watch with the given descriptor."""
274
320
watch = self._wdm[wd]
275
321
yield watch.stop_watching()
276
322
del self._wdm[wd]
323
# trust that the platform watch manager will do the rest of the
324
# operations needed to delete a watch
325
self.manager.del_watch(watch)
277
326
self.log.debug('Watch %s removed.', wd)
278
327
except KeyError, e:
279
328
logging.error(str(e))
281
def _add_single_watch(self, path, mask, auto_add=False, quiet=True):
282
if path in self._ignored_paths:
283
# simply remove it from the filter
284
self._ignored_paths.remove(path)
286
# we need to add a new watch
287
self.log.debug('add_single_watch(%s, %s, %s, %s)', path, mask,
290
return self._adding_watch(path, mask, auto_add)
292
def _adding_watch(self, path, mask, auto_add):
293
"""Add the watch to the dict and start it."""
294
# This should be implemented for each OS
295
raise NotImplementedError("Not implemented on this platform.")
297
def update_watch(self, wd, mask=None, rec=False,
298
auto_add=False, quiet=True):
299
raise NotImplementedError("Not implemented on this platform.")
301
330
@is_valid_os_path(path_indexes=[1])
302
331
def get_wd(self, path):
303
332
"""Return the watcher that is used to watch the given path."""
336
367
self.log.debug('Adding exclude filter for %r', path)
337
368
self._wdm[wd].ignore_path(path)
370
@defer.inlineCallbacks
372
"""Close the manager and stop all watches."""
373
self.log.debug('Stopping watches.')
374
for current_wd in self._wdm:
375
watch = self._wdm[current_wd]
376
yield self.manager.stop_watch(watch)
377
self.log.debug('Stopping Watch on %r.', watch.path)
378
yield self.manager.stop()
340
381
class NotifyProcessor(ProcessEvent):
341
382
"""Processor that takes care of dealing with the events.
350
391
GeneralINotifyProcessor)
351
392
self.general_processor = GeneralINotifyProcessor(monitor,
352
393
self.handle_dir_delete, NAME_TRANSLATIONS,
353
self.platform_is_ignored, IN_IGNORED, ignore_config=ignore_config)
394
path_is_ignored, IN_IGNORED, ignore_config=ignore_config)
354
395
self.held_event = None
356
397
def rm_from_mute_filter(self, event, paths):
361
402
"""Add an event and path(s) to the mute filter."""
362
403
self.general_processor.add_to_mute_filter(event, paths)
364
def platform_is_ignored(self, path):
365
"""Should we ignore this path in the current platform.?"""
366
# This should be implemented in each platform
367
raise NotImplementedError("Not implemented on this platform.")
369
405
@is_valid_syncdaemon_path(path_indexes=[1])
370
406
def is_ignored(self, path):
371
407
"""Should we ignore this path?"""
552
588
self.log.setLevel(TRACE)
555
# You will need to create the NotifyProcessor and WatchManager
556
# in each OS-specific implementation
591
self._processor = NotifyProcessor(self, ignore_config)
592
self._watch_manager = WatchManager(self._processor)
558
594
def add_to_mute_filter(self, event, **info):
559
595
"""Add info to mute filter in the processor."""
579
615
# the logic to check if the watch is already set
580
616
# is all in WatchManager.add_watch
581
617
return self._watch_manager.add_watch(dirpath,
582
self.filesystem_monitor_mask, auto_add=True)
618
self.filesystem_monitor_mask)
584
620
def add_watches_to_udf_ancestors(self, volume):
585
621
"""Add a inotify watch to volume's ancestors if it's an UDF."""
586
# Should be implemented in each OS if necessary
587
raise NotImplementedError("Not implemented on this platform.")
622
return defer.succeed(True)
589
624
def is_frozen(self):
590
625
"""Checks if there's something frozen."""