147
151
'VM_UDF_CREATED': ('udf',),
148
152
'VM_UDF_CREATE_ERROR': ('path', 'error'),
149
153
'VM_SHARE_CREATED': ('share_id',),
150
'VM_SHARE_DELETED': ('udf',),
151
'VM_SHARE_DELETE_ERROR': ('path', 'error'),
154
'VM_SHARE_DELETED': ('share',),
155
'VM_SHARE_DELETE_ERROR': ('share_id', 'error'),
152
156
'VM_VOLUME_DELETED': ('volume',),
153
157
'VM_VOLUME_DELETE_ERROR': ('volume_id', 'error'),
154
158
'VM_SHARE_CHANGED': ('share_id',),
158
if 'IN_CREATE' in vars(pyinotify.EventsCodes):
159
# < 0.8; event codes in EventsCodes; events have is_dir attribute
160
evtcodes = pyinotify.EventsCodes
161
event_is_dir = lambda event: event.is_dir
163
# >= 0.8; event codes in pyinotify itself; events have dir attribute
165
event_is_dir = lambda event: event.dir
167
# translates quickly the event and it's is_dir state to our standard events
168
NAME_TRANSLATIONS = {
169
evtcodes.IN_OPEN: 'FS_FILE_OPEN',
170
evtcodes.IN_CLOSE_NOWRITE: 'FS_FILE_CLOSE_NOWRITE',
171
evtcodes.IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
172
evtcodes.IN_CREATE: 'FS_FILE_CREATE',
173
evtcodes.IN_CREATE | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
174
evtcodes.IN_DELETE: 'FS_FILE_DELETE',
175
evtcodes.IN_DELETE | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
176
evtcodes.IN_MOVED_FROM: 'FS_FILE_DELETE',
177
evtcodes.IN_MOVED_FROM | evtcodes.IN_ISDIR: 'FS_DIR_DELETE',
178
evtcodes.IN_MOVED_TO: 'FS_FILE_CREATE',
179
evtcodes.IN_MOVED_TO | evtcodes.IN_ISDIR: 'FS_DIR_CREATE',
182
# these are the events that will listen from inotify
183
INOTIFY_EVENTS_GENERAL = (
185
evtcodes.IN_CLOSE_NOWRITE |
186
evtcodes.IN_CLOSE_WRITE |
189
evtcodes.IN_MOVED_FROM |
190
evtcodes.IN_MOVED_TO |
191
evtcodes.IN_MOVE_SELF
193
INOTIFY_EVENTS_ANCESTORS = (
195
evtcodes.IN_MOVED_FROM |
196
evtcodes.IN_MOVED_TO |
197
evtcodes.IN_MOVE_SELF
200
162
DEFAULT_HANDLER = "handle_default" # receives (event_name, *args, **kwargs)
203
def validate_filename(real_func):
204
"""Decorator that validates the filename."""
205
def func(self, event):
206
"""If valid, executes original function."""
209
event.name.decode("utf8")
210
except UnicodeDecodeError:
211
dirname = event.path.decode("utf8")
212
self.invnames_log.info("%s in %r: path %r", event.maskname,
214
self.eq.push('FS_INVALID_NAME', dirname, event.name)
216
real_func(self, event)
220
class MuteFilter(object):
221
"""Stores what needs to be muted."""
224
self.log = logging.getLogger('ubuntuone.SyncDaemon.MuteFilter')
226
def add(self, element):
227
"""Add and element to the filter."""
228
self.log.debug("Adding: %s", element)
229
self._cnt[element] = self._cnt.get(element, 0) + 1
231
def rm(self, element):
232
"""Remove an element from the filter."""
233
self.log.debug("Removing: %s", element)
234
new_val = self._cnt[element] - 1
236
del self._cnt[element]
238
self._cnt[element] = new_val
240
def pop(self, element):
241
"""Pops an element from the filter, if there, and returns if it was."""
242
if element not in self._cnt:
245
self._cnt[element] = self._cnt.get(element, 0) - 1
246
if not self._cnt[element]:
248
del self._cnt[element]
250
# log what happened and how many items we have left
251
q = sum(self._cnt.itervalues())
252
self.log.debug("Blocking %s (%d left)", element, q)
257
class _AncestorsINotifyProcessor(pyinotify.ProcessEvent):
258
"""inotify's processor when an event happens on an UDFs ancestor."""
259
def __init__(self, eq):
260
self.log = logging.getLogger('ubuntuone.SyncDaemon.AncestorsINotProc')
263
def _get_udfs(self, path):
264
"""Yield all the subscribed udfs under a specific path."""
265
pathsep = path + os.path.sep
266
for udf in self.eq.fs.vm.udfs.itervalues():
267
udfpath = udf.path + os.path.sep
268
if udfpath.startswith(pathsep) and udf.subscribed:
271
def process_IN_MOVE_SELF(self, event):
272
"""Don't do anything here.
274
We just turned this event on because pyinotify does some
275
path-fixing in its internal processing when this happens.
277
process_IN_MOVED_TO = process_IN_MOVE_SELF
279
def process_IN_MOVED_FROM(self, event):
280
"""Getting it out or renaming means unsuscribe."""
281
if event.mask & evtcodes.IN_ISDIR:
282
unsubscribed_udfs = set()
283
for udf in self._get_udfs(event.pathname):
284
self.log.info("Got MOVED_FROM on path %r, unsubscribing "
285
"udf %s", event.pathname, udf)
286
self.eq.fs.vm.unsubscribe_udf(udf.volume_id)
287
unsubscribed_udfs.add(udf)
288
self._unwatch_ancestors(unsubscribed_udfs)
290
def process_IN_DELETE(self, event):
291
"""Check to see if the UDF was deleted."""
292
if event.mask & evtcodes.IN_ISDIR:
294
for udf in self._get_udfs(event.pathname):
295
self.log.info("Got DELETE on path %r, deleting udf %s",
297
self.eq.fs.vm.delete_volume(udf.volume_id)
298
deleted_udfs.add(udf)
299
self._unwatch_ancestors(deleted_udfs)
301
def _unwatch_ancestors(self, udfs):
302
"""Unwatch the ancestors of the recevied udfs only."""
303
# collect all the ancestors of the received udfs
304
ancestors_to_unwatch = set()
306
ancestors_to_unwatch.update(set(udf.ancestors))
308
# collect the ancestors of all the still subscribed UDFs except
310
sub_udfs = (u for u in self.eq.fs.vm.udfs.itervalues() if u.subscribed)
311
udf_remain = set(sub_udfs) - udfs
312
ancestors_to_keep = set()
313
for udf in udf_remain:
314
ancestors_to_keep.update(set(udf.ancestors))
316
# unwatch only the ancestors of the received udfs
317
only_these = ancestors_to_unwatch - ancestors_to_keep
318
for ancestor in only_these:
319
self.eq.inotify_rm_watch(ancestor)
322
class _GeneralINotifyProcessor(pyinotify.ProcessEvent):
323
"""inotify's processor when a general event happens.
325
This class also catchs the MOVEs events, and synthetises a new
326
FS_(DIR|FILE)_MOVE event when possible.
328
def __init__(self, eq, ignore_config=None):
329
self.log = logging.getLogger('ubuntuone.SyncDaemon.GeneralINotProc')
330
self.invnames_log = logging.getLogger(
331
'ubuntuone.SyncDaemon.InvalidNames')
333
self.held_event = None
335
self.frozen_path = None
336
self.frozen_evts = False
337
self._to_mute = MuteFilter()
338
self.conflict_RE = re.compile(r"\.u1conflict(?:\.\d+)?$")
340
if ignore_config is not None:
341
self.log.info("Ignoring files: %s", ignore_config)
342
# thanks Chipaca for the following "regex composing"
343
complex = '|'.join('(?:' + r + ')' for r in ignore_config)
344
self.ignore_RE = re.compile(complex)
346
self.ignore_RE = None
348
def _mute_filter(self, action, event, *paths):
349
"""Really touches the mute filter."""
350
# all events have one path except the MOVEs
351
if event in ("FS_FILE_MOVE", "FS_DIR_MOVE"):
352
f_path, t_path = paths
353
is_from_forreal = not self.is_ignored(f_path)
354
is_to_forreal = not self.is_ignored(t_path)
355
if is_from_forreal and is_to_forreal:
356
action((event, f_path, t_path))
358
action(('FS_FILE_CREATE', t_path))
359
action(('FS_FILE_CLOSE_WRITE', t_path))
362
if not self.is_ignored(path):
363
action((event, path))
365
def rm_from_mute_filter(self, event, *paths):
366
"""Remove an event and path(s) from the mute filter."""
367
self._mute_filter(self._to_mute.rm, event, *paths)
369
def add_to_mute_filter(self, event, *paths):
370
"""Add an event and path(s) to the mute filter."""
371
self._mute_filter(self._to_mute.add, event, *paths)
373
def on_timeout(self):
374
"""Called on timeout."""
375
if self.held_event is not None:
376
self.release_held_event(True)
378
def release_held_event(self, timed_out=False):
379
"""Release the event on hold to fulfill its destiny."""
383
except error.AlreadyCalled:
384
# self.timeout() was *just* called, do nothing here
386
self.push_event(self.held_event)
387
self.held_event = None
390
def process_IN_OPEN(self, event):
391
"""Filter IN_OPEN to make it happen only in files."""
392
if not (event.mask & evtcodes.IN_ISDIR):
393
self.push_event(event)
396
def process_IN_CLOSE_NOWRITE(self, event):
397
"""Filter IN_CLOSE_NOWRITE to make it happen only in files."""
398
if not (event.mask & evtcodes.IN_ISDIR):
399
self.push_event(event)
401
def process_IN_MOVE_SELF(self, event):
402
"""Don't do anything here.
404
We just turned this event on because pyinotify does some
405
path-fixing in its internal processing when this happens.
410
def process_IN_MOVED_FROM(self, event):
411
"""Capture the MOVED_FROM to maybe syntethize FILE_MOVED."""
412
if self.held_event is not None:
413
self.release_held_event()
415
self.held_event = event
416
self.timer = reactor.callLater(1, self.on_timeout)
418
def is_ignored(self, path):
419
"""should we ignore this path?"""
420
# don't support symlinks yet
421
if os.path.islink(path):
424
# check if we are can read
425
if os.path.exists(path) and not os.access(path, os.R_OK):
426
self.log.warning("Ignoring path as we don't have enough "
427
"permissions to track it: %r", path)
430
is_conflict = self.conflict_RE.search
431
dirname, filename = os.path.split(path)
433
if is_conflict(filename):
435
# ignore partial downloads
436
if filename == '.u1partial' or filename.startswith('.u1partial.'):
439
# and ignore paths that are inside conflicts (why are we even
440
# getting the event?)
441
if any(part.endswith('.u1partial') or is_conflict(part)
442
for part in dirname.split(os.path.sep)):
445
if self.ignore_RE is not None and self.ignore_RE.match(filename):
451
def process_IN_MOVED_TO(self, event):
452
"""Capture the MOVED_TO to maybe syntethize FILE_MOVED."""
453
if self.held_event is not None:
454
if event.cookie == self.held_event.cookie:
457
except error.AlreadyCalled:
458
# self.timeout() was *just* called, do nothing here
461
f_path_dir = self.held_event.path
462
f_path = os.path.join(f_path_dir, self.held_event.name)
463
t_path_dir = event.path
464
t_path = os.path.join(t_path_dir, event.name)
466
is_from_forreal = not self.is_ignored(f_path)
467
is_to_forreal = not self.is_ignored(t_path)
468
if is_from_forreal and is_to_forreal:
469
f_share_id = self.eq.fs.get_by_path(f_path_dir).share_id
470
t_share_id = self.eq.fs.get_by_path(t_path_dir).share_id
471
this_is_a_dir = event_is_dir(event)
476
if f_share_id != t_share_id:
477
# if the share_id are != push a delete/create
478
m = "Delete because of different shares: %r"
479
self.log.info(m, f_path)
480
self.eq_push(evtname+"DELETE", f_path)
481
self.eq_push(evtname+"CREATE", t_path)
482
if not this_is_a_dir:
483
self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
485
self.eq.inotify_watch_fix(f_path, t_path)
486
self.eq_push(evtname+"MOVE", f_path, t_path)
488
# this is the case of a MOVE from something ignored
489
# to a valid filename
490
this_is_a_dir = event_is_dir(event)
495
self.eq_push(evtname+"CREATE", t_path)
496
if not this_is_a_dir:
497
self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
499
self.held_event = None
502
self.release_held_event()
503
self.push_event(event)
505
# we don't have a held_event so this is a move from outside.
506
# if it's a file move it's atomic on POSIX, so we aren't going to
507
# receive a IN_CLOSE_WRITE, so let's fake it for files
508
self.push_event(event)
509
is_dir = event_is_dir(event)
511
t_path = os.path.join(event.path, event.name)
512
self.eq_push('FS_FILE_CLOSE_WRITE', t_path)
514
def eq_push(self, *event_data):
515
"""Sends to EQ the event data, maybe filtering it."""
516
if not self._to_mute.pop(event_data):
517
self.eq.push(*event_data)
520
def process_default(self, event):
521
"""Push the event into the EventQueue."""
522
if self.held_event is not None:
523
self.release_held_event()
524
self.push_event(event)
526
def push_event(self, event):
527
"""Push the event to the EQ."""
529
if event.mask == evtcodes.IN_IGNORED:
532
# change the pattern IN_CREATE to FS_FILE_CREATE or FS_DIR_CREATE
534
evt_name = NAME_TRANSLATIONS[event.mask]
536
raise KeyError("Unhandled Event in INotify: %s" % event)
539
fullpath = os.path.join(event.path, event.name)
541
# check if the path is not frozen
542
if self.frozen_path is not None:
543
if event.path == self.frozen_path:
544
# this will at least store the last one, for debug
546
self.frozen_evts = (evt_name, fullpath)
549
if not self.is_ignored(fullpath):
550
if evt_name == 'FS_DIR_DELETE':
551
self.handle_dir_delete(fullpath)
552
self.eq_push(evt_name, fullpath)
554
def freeze_begin(self, path):
555
"""Puts in hold all the events for this path."""
556
self.log.debug("Freeze begin: %r", path)
557
self.frozen_path = path
558
self.frozen_evts = False
560
def freeze_rollback(self):
561
"""Unfreezes the frozen path, reseting to idle state."""
562
self.log.debug("Freeze rollback: %r", self.frozen_path)
563
self.frozen_path = None
564
self.frozen_evts = False
566
def freeze_commit(self, events):
567
"""Unfreezes the frozen path, sending received events if not dirty.
569
If events for that path happened:
572
- push the here received events, return False
574
self.log.debug("Freeze commit: %r (%d events)",
575
self.frozen_path, len(events))
578
self.log.debug("Dirty by %s", self.frozen_evts)
579
self.frozen_evts = False
582
# push the received events
583
for evt_name, path in events:
584
if not self.is_ignored(path):
585
self.eq_push(evt_name, path)
587
self.frozen_path = None
588
self.frozen_evts = False
591
def handle_dir_delete(self, fullpath):
592
"""Some special work when a directory is deleted."""
593
# remove the watch on that dir from our structures
594
self.eq.inotify_rm_watch(fullpath)
596
# handle the case of move a dir to a non-watched directory
597
paths = self.eq.fs.get_paths_starting_with(fullpath,
599
paths.sort(reverse=True)
600
for path, is_dir in paths:
601
m = "Pushing deletion because of parent dir move: (is_dir=%s) %r"
602
self.log.info(m, is_dir, path)
604
self.eq.inotify_rm_watch(path)
605
self.eq_push('FS_DIR_DELETE', path)
607
self.eq_push('FS_FILE_DELETE', path)
610
165
class EventQueue(object):
611
166
"""Manages the events from different sources and distributes them."""
639
180
def add_to_mute_filter(self, *info):
640
181
"""Add info to mute filter in the processor."""
641
self._processor.add_to_mute_filter(*info)
182
self.monitor.add_to_mute_filter(*info)
643
184
def rm_from_mute_filter(self, *info):
644
185
"""Remove info to mute filter in the processor."""
645
self._processor.rm_from_mute_filter(*info)
186
self.monitor.rm_from_mute_filter(*info)
647
188
def add_empty_event_queue_callback(self, callback):
648
"""add a callback for when the even queue has no more events."""
189
"""Add a callback for when the even queue has no more events."""
649
190
self.empty_event_queue_callbacks.add(callback)
650
191
if not self.dispatching and self.dispatch_queue.empty():
651
192
if callable(callback):
654
195
def remove_empty_event_queue_callback(self, callback):
655
"""remove the callback"""
196
"""Remove the callback."""
656
197
self.empty_event_queue_callbacks.remove(callback)
658
def _hook_inotify_to_twisted(self, wm, notifier):
659
"""This will hook inotify to twisted."""
661
class MyReader(abstract.FileDescriptor):
662
"""Chain between inotify and twisted."""
663
# will never pass a fd to write, pylint: disable-msg=W0223
666
"""Returns the fileno to select()."""
667
# pylint: disable-msg=W0212
671
"""Called when twisted says there's something to read."""
672
notifier.read_events()
673
notifier.process_events()
676
reactor.addReader(reader)
679
199
def shutdown(self):
680
"""Prepares the EQ to be closed."""
681
self._inotify_notifier_gral.stop()
682
self._inotify_notifier_antr.stop()
683
reactor.removeReader(self._inotify_reader_gral)
684
reactor.removeReader(self._inotify_reader_antr)
200
"""Make the monitor shutdown."""
201
self.monitor.shutdown()
686
def inotify_rm_watch(self, dirpath):
203
def rm_watch(self, dirpath):
687
204
"""Remove watch from a dir."""
688
if dirpath in self._general_watchs:
689
w_dict = self._general_watchs
690
w_manager = self._inotify_general_wm
691
elif dirpath in self._ancestors_watchs:
692
w_dict = self._ancestors_watchs
693
w_manager = self._inotify_ancestors_wm
695
self.log.warning("Tried to remove a nonexistent watch on %r",
699
wd = w_dict.pop(dirpath)
700
w_manager.rm_watch(wd)
702
def inotify_add_watch(self, dirpath):
205
self.monitor.rm_watch(dirpath)
207
def add_watch(self, dirpath):
703
208
"""Add watch to a dir."""
704
# see where to add it
705
if self._is_udf_ancestor(dirpath):
707
w_manager = self._inotify_ancestors_wm
708
w_dict = self._ancestors_watchs
709
events = INOTIFY_EVENTS_ANCESTORS
712
w_manager = self._inotify_general_wm
713
w_dict = self._general_watchs
714
events = INOTIFY_EVENTS_GENERAL
717
self.log.debug("Adding %s inotify watch to %r", w_type, dirpath)
718
result = w_manager.add_watch(dirpath, events)
719
w_dict[dirpath] = result[dirpath]
721
def inotify_has_watch(self, dirpath):
209
self.monitor.add_watch(dirpath)
211
def has_watch(self, dirpath):
722
212
"""Check if a dirpath is watched."""
723
return (dirpath in self._general_watchs or
724
dirpath in self._ancestors_watchs)
726
def inotify_watch_fix(self, pathfrom, pathto):
727
"""Fix the path in inotify structures."""
728
if pathfrom in self._general_watchs:
729
wdict = self._general_watchs
730
elif pathfrom in self._ancestors_watchs:
731
wdict = self._ancestors_watchs
733
m = "Tried to fix nonexistent path %r in watches (to %r)"
734
self.log.warning(m, pathfrom, pathto)
738
wdict[pathto] = wdict.pop(pathfrom)
740
def _is_udf_ancestor(self, path):
741
"""Decide if path is an UDF ancestor or not."""
742
for udf in self.fs.vm.udfs.itervalues():
743
parent = os.path.dirname(udf.path) + os.path.sep
744
if parent.startswith(path + os.path.sep):
213
return self.monitor.has_watch(dirpath)
748
215
def unsubscribe(self, obj):
749
216
"""Remove the callback object from the listener queue.