14
14
# You should have received a copy of the GNU General Public License along
15
15
# with this program. If not, see <http://www.gnu.org/licenses/>.
16
"""File notifications on windows."""
21
from Queue import Queue, Empty
22
from threading import Thread
23
from uuid import uuid4
24
from twisted.internet import defer, task
25
from win32con import (
28
FILE_FLAG_BACKUP_SEMANTICS,
29
FILE_NOTIFY_CHANGE_FILE_NAME,
30
FILE_NOTIFY_CHANGE_DIR_NAME,
31
FILE_NOTIFY_CHANGE_ATTRIBUTES,
32
FILE_NOTIFY_CHANGE_SIZE,
33
FILE_NOTIFY_CHANGE_LAST_WRITE,
34
FILE_NOTIFY_CHANGE_SECURITY,
37
from win32file import CreateFile, ReadDirectoryChangesW
38
from ubuntuone.platform.windows.pyinotify import (
54
from ubuntuone.syncdaemon.filesystem_notifications import (
55
GeneralINotifyProcessor
57
from ubuntuone.platform.windows.os_helper import (
62
# Constants taken from the MSDN documentation; they are defined by hand in
# this module rather than imported:
# http://msdn.microsoft.com/en-us/library/ff538834(v=vs.85).aspx
FILE_LIST_DIRECTORY = 0x0001
# Extra change-notification filter bits (LAST_ACCESS is part of the default
# monitor mask defined in this module).
FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020
FILE_NOTIFY_CHANGE_CREATION = 0x00000040
68
# a map between the few events that we have on windows and those
78
# quickly translates the event and its is_dir state to our standard events
80
IN_OPEN: 'FS_FILE_OPEN',
81
IN_CLOSE_NOWRITE: 'FS_FILE_CLOSE_NOWRITE',
82
IN_CLOSE_WRITE: 'FS_FILE_CLOSE_WRITE',
83
IN_CREATE: 'FS_FILE_CREATE',
84
IN_CREATE | IN_ISDIR: 'FS_DIR_CREATE',
85
IN_DELETE: 'FS_FILE_DELETE',
86
IN_DELETE | IN_ISDIR: 'FS_DIR_DELETE',
87
IN_MOVED_FROM: 'FS_FILE_DELETE',
88
IN_MOVED_FROM | IN_ISDIR: 'FS_DIR_DELETE',
89
IN_MOVED_TO: 'FS_FILE_CREATE',
90
IN_MOVED_TO | IN_ISDIR: 'FS_DIR_CREATE',
93
# the default mask to be used in the watches added by the FilesystemMonitor
95
# Bitwise OR of every change-notification flag the monitor subscribes to.
FILESYSTEM_MONITOR_MASK = (
    FILE_NOTIFY_CHANGE_FILE_NAME
    | FILE_NOTIFY_CHANGE_DIR_NAME
    | FILE_NOTIFY_CHANGE_ATTRIBUTES
    | FILE_NOTIFY_CHANGE_SIZE
    | FILE_NOTIFY_CHANGE_LAST_WRITE
    | FILE_NOTIFY_CHANGE_SECURITY
    | FILE_NOTIFY_CHANGE_LAST_ACCESS
)
104
# The implementation of the code that is provided as the pyinotify
107
"""Implement the same functions as pyinotify.Watch."""
109
def __init__(self, watch_descriptor, path, mask, auto_add,
110
events_queue=None, exclude_filter=None, proc_fun=None):
111
super(Watch, self).__init__()
112
self.log = logging.getLogger('ubuntuone.platform.windows.' +
113
'filesystem_notifications.Watch')
114
self._watching = False
115
self._descriptor = watch_descriptor
116
self._auto_add = auto_add
117
self.exclude_filter = exclude_filter
118
self._proc_fun = proc_fun
120
self._source_pathname = None
121
# remember the subdirs we have so that when we have a delete we can
122
# check if it was a remove
124
# ensure that we work with an abspath and that we can deal with
125
# long paths over 260 chars.
126
self._path = os.path.abspath(path)
127
if not self._path.startswith(LONG_PATH_PREFIX):
128
self._path = LONG_PATH_PREFIX + self._path
130
# lets make the q as big as possible
131
self._raw_events_queue = Queue()
133
events_queue = Queue()
134
self.events_queue = events_queue
136
def _path_is_dir(self, path):
137
""""Check if the path is a dir and update the local subdir list."""
138
self.log.debug('Testing if path "%s" is a dir', path)
140
if os.path.exists(path):
141
is_dir = os.path.isdir(path)
143
self.log.debug('Path "%s" was deleted subdirs are %s.',
145
# we removed the path, we look in the internal list
146
if path in self._subdirs:
148
self._subdirs.remove(path)
150
self.log.debug('Adding %s to subdirs %s', path, self._subdirs)
151
self._subdirs.append(path)
154
def _process_events(self):
155
"""Process the events form the queue."""
156
# we transform the events to be the same as the one in pyinotify
157
# and then use the proc_fun
158
while self._watching or not self._raw_events_queue.empty():
159
file_name, action = self._raw_events_queue.get()
160
# map the windows events to the pyinotify ones, tis is dirty but
161
# makes the multiplatform better, linux was first :P
162
is_dir = self._path_is_dir(file_name)
163
if os.path.exists(file_name):
164
is_dir = os.path.isdir(file_name)
166
# we removed the path, we look in the internal list
167
if file_name in self._subdirs:
169
self._subdirs.remove(file_name)
171
self._subdirs.append(file_name)
172
mask = WINDOWS_ACTIONS[action]
173
head, tail = os.path.split(file_name)
177
'wd': self._descriptor,
181
'path': head.replace(self.path, '.')
183
# by the way in which the win api fires the events we know for
184
# sure that no move events will be added in the wrong order, this
185
# is kind of hacky, I dont like it too much
186
if WINDOWS_ACTIONS[action] == IN_MOVED_FROM:
187
self._cookie = str(uuid4())
188
self._source_pathname = tail
189
event_raw_data['cookie'] = self._cookie
190
if WINDOWS_ACTIONS[action] == IN_MOVED_TO:
191
event_raw_data['src_pathname'] = self._source_pathname
192
event_raw_data['cookie'] = self._cookie
193
event = Event(event_raw_data)
194
# FIXME: event deduces the pathname wrong and we need to manually
196
event.pathname = file_name
197
# add the event only if we do not have an exclude filter or
198
# the exclude filter returns False, that is, the event will not
200
if not self.exclude_filter or not self.exclude_filter(event):
201
self.log.debug('Addding event %s to queue.', event)
202
self.events_queue.put(event)
205
"""Watch a path that is a directory."""
206
# we are going to be using the ReadDirectoryChangesW whihc requires
207
# a directory handle and the mask to be used.
211
FILE_SHARE_READ | FILE_SHARE_WRITE,
214
FILE_FLAG_BACKUP_SEMANTICS,
217
self.log.debug('Watchng path %s.', self._path)
218
while self._watching:
219
# important information to know about the parameters:
220
# param 1: the handle to the dir
221
# param 2: the size to be used in the kernel to store events
222
# that might be lost while the call is being performed. This
223
# is complicated to fine tune since if you make lots of watcher
224
# you migh used too much memory and make your OS to BSOD
225
results = ReadDirectoryChangesW(
233
# add the diff events to the q so that the can be processed no
235
for action, file in results:
236
full_filename = os.path.join(self._path, file)
237
self._raw_events_queue.put((full_filename, action))
238
self.log.debug('Added %s to raw events queue.',
239
(full_filename, action))
241
def start_watching(self):
242
"""Tell the watch to start processing events."""
243
# get the diff dirs in the path
244
for current_child in listdir(self._path):
245
full_child_path = os.path.join(self._path, current_child)
246
if os.path.isdir(full_child_path):
247
self._subdirs.append(full_child_path)
248
# start to diff threads, one to watch the path, the other to
249
# process the events.
250
self.log.debug('Sart watching path.')
251
self._watching = True
252
watch_thread = Thread(target=self._watch,
253
name='Watch(%s)' % self._path)
254
process_thread = Thread(target=self._process_events,
255
name='Process(%s)' % self._path)
256
process_thread.start()
259
def stop_watching(self):
    """Ask the watch to stop by clearing its ``_watching`` flag."""
    # the loops in this class that are conditioned on ``_watching`` will
    # observe the cleared flag the next time they test it
    self._watching = False
264
def update(self, mask, proc_fun=None, auto_add=False):
265
"""Update the info used by the watcher."""
266
self.log.debug('update(%s, %s, %s)', mask, proc_fun, auto_add)
268
self._proc_fun = proc_fun
269
self._auto_add = auto_add
273
"""Return the patch watched."""
278
return self._auto_add
282
return self._proc_fun
285
class WatchManager(object):
286
"""Implement the same functions as pyinotify.WatchManager."""
288
def __init__(self, exclude_filter=lambda path: False, watch_factory=Watch):
289
"""Init the manager to keep trak of the different watches."""
290
super(WatchManager, self).__init__()
291
self.log = logging.getLogger('ubuntuone.platform.windows.'
292
+ 'filesystem_notifications.WatchManager')
295
self._exclude_filter = exclude_filter
296
self._events_queue = Queue()
297
self._ignored_paths = []
298
self._watch_factory = watch_factory
301
"""Close the manager and stop all watches."""
302
self.log.debug('Stopping watches.')
303
for current_wd in self._wdm:
304
self._wdm[current_wd].stop_watching()
305
self.log.debug('Watch for %s stopped.', self._wdm[current_wd].path)
307
def get_watch(self, wd):
308
"""Return the watch with the given descriptor."""
311
def del_watch(self, wd):
312
"""Delete the watch with the given descriptor."""
314
watch = self._wdm[wd]
315
watch.stop_watching()
317
self.log.debug('Watch %s removed.', wd)
319
logging.error(str(e))
321
def _add_single_watch(self, path, mask, proc_fun=None, auto_add=False,
322
quiet=True, exclude_filter=None):
323
self.log.debug('add_single_watch(%s, %s, %s, %s, %s, %s)', path, mask,
324
proc_fun, auto_add, quiet, exclude_filter)
325
self._wdm[self._wd_count] = self._watch_factory(self._wd_count, path, mask,
326
auto_add, events_queue=self._events_queue,
327
exclude_filter=exclude_filter, proc_fun=proc_fun)
328
self._wdm[self._wd_count].start_watching()
330
self.log.debug('Watch count increased to %s', self._wd_count)
332
def add_watch(self, path, mask, proc_fun=None, auto_add=False,
              quiet=True, exclude_filter=None):
    """Add a watch for a path, or for every path of a collection.

    ``path`` may be a single path (a string) or an iterable of paths. A
    watch is only created for paths for which ``get_wd`` reports no
    descriptor; the remaining arguments are handed over untouched to
    ``_add_single_watch``.
    """
    # A string is always a single path, even on interpreters where strings
    # expose ``__iter__`` (otherwise it would be split into characters).
    string_types = (str, bytes, type(u''))
    if isinstance(path, string_types) or not hasattr(path, '__iter__'):
        # compare against None: a descriptor of 0 is a valid watch and
        # must not be mistaken for "not watched"
        if self.get_wd(path) is None:
            self.log.debug('Adding single watch.')
            self._add_single_watch(path, mask, proc_fun, auto_add,
                                   quiet, exclude_filter)
        return

    self.log.debug('Added collection of watches.')
    # we are dealing with a collection of paths
    for current_path in path:
        if self.get_wd(current_path) is None:
            self._add_single_watch(current_path, mask, proc_fun,
                                   auto_add, quiet, exclude_filter)
346
def update_watch(self, wd, mask=None, proc_fun=None, rec=False,
347
auto_add=False, quiet=True):
349
watch = self._wdm[wd]
350
watch.stop_watching()
351
self.log.debug('Stopped watch on %s for update.', watch.path)
352
# update the data and restart watching
353
auto_add = auto_add or rec
354
watch.update(mask, proc_fun=proc_fun, auto_add=auto_add)
355
# only start the watcher again if the mask was given, otherwhise
356
# we are not watchng and therefore do not care
358
watch.start_watching()
360
self.log.error(str(e))
362
raise WatchManagerError('Watch %s was not found' % wd, {})
364
def get_wd(self, path):
365
"""Return the watcher that is used to watch the given path."""
366
for current_wd in self._wdm:
367
watch_path = self._wdm[current_wd].path
368
if watch_path == path or (
369
watch_path in path and self._wdm[current_wd].auto_add):
372
def get_path(self, wd):
373
"""Return the path watched by the wath with the given wd."""
374
watch = self._wdm.get(wd)
378
def rm_watch(self, wd, rec=False, quiet=True):
379
"""Remove the the watch with the given wd."""
381
watch = self._wdm[wd]
382
watch.stop_watching()
384
except KeyError, err:
385
self.log.error(str(err))
387
raise WatchManagerError('Watch %s was not found' % wd, {})
389
def rm_path(self, path):
390
"""Remove a watch to the given path."""
391
wd = self.get_wd(path)
393
if self._wdm[wd].path == path:
394
self.log.debug('Removing watch for path "%s"', path)
397
self.log.debug('Adding exclude filter for "%s"', path)
398
# we have a watch that cotains the path as a child path
399
if not path in self._ignored_paths:
400
self._ignored_paths.append(path)
401
# it would be very tricky to remove a subpath from a watcher that is
402
# looking at changes in ther kids. To make it simpler and less error
403
# prone (and even better performant since we use less threads) we will
404
# add a filter to the events in the watcher so that the events from
405
# that child are not received :)
406
def ignore_path(event):
407
"""Ignore an event if it has a given path."""
408
for ignored_path in self._ignored_paths:
409
if ignore_path in event.pathname:
413
# FIXME: This assumes that we do not have other function
414
# which in our usecase is correct, but what is we move this
415
# to other project?!? Maybe using the manager
416
# exclude_filter is better
417
self._wdm[wd].exclude_filter = ignore_path
421
"""Return a reference to the dictionary that contains the watches."""
425
def events_queue(self):
426
"""Return the queue with the events that the manager contains."""
427
return self._events_queue
430
class Notifier(object):
432
Read notifications, process events. Inspired by the pyinotify.Notifier
435
def __init__(self, watch_manager, default_proc_fun=None, read_freq=0,
436
threshold=10, timeout=-1):
437
"""Init to process event according to the given timeout & threshold."""
438
super(Notifier, self).__init__()
439
self.log = logging.getLogger('ubuntuone.platform.windows.'
440
+ 'filesystem_notifications.Notifier')
441
# Watch Manager instance
442
self._watch_manager = watch_manager
443
# Default processing method
444
self._default_proc_fun = default_proc_fun
445
if default_proc_fun is None:
446
self._default_proc_fun = PrintAllEvents()
448
self._read_freq = read_freq
449
self._threshold = threshold
450
self._timeout = timeout
453
return self._default_proc_fun
455
def process_events(self):
457
Process the event given the threshold and the timeout.
459
self.log.debug('Processing events with threashold: %s and timeout: %s',
460
self._threshold, self._timeout)
461
# we will process an amount of events equal to the threshold of
462
# the notifier and will block for the amount given by the timeout
464
while processed_events < self._threshold:
467
if not self._timeout or self._timeout < 0:
468
raw_event = self._watch_manager.events_queue.get(
471
raw_event = self._watch_manager.events_queue.get(
472
timeout=self._timeout)
473
watch = self._watch_manager.get_watch(raw_event.wd)
475
# Not really sure how we ended up here, nor how we should
476
# handle these types of events and if it is appropriate to
477
# completly skip them (like we are doing here).
478
self.log.warning('Unable to retrieve Watch object '
479
+ 'associated to %s', raw_event)
480
processed_events += 1
482
if watch and watch.proc_fun:
483
self.log.debug('Executing proc_fun from watch.')
484
watch.proc_fun(raw_event) # user processings
486
self.log.debug('Executing default_proc_fun')
487
self._default_proc_fun(raw_event)
488
processed_events += 1
490
# increase the number of processed events, and continue
491
processed_events += 1
495
"""Stop processing events and the watch manager."""
496
self._watch_manager.stop()
499
class NotifyProcessor(ProcessEvent):
500
"""Processor that takes care of dealing with the events."""
502
def __init__(self, monitor, ignore_config=None):
    """Create the platform-independent processor and reset the held event.

    The general processor receives the monitor, this instance's
    ``handle_dir_delete`` and ``platform_is_ignored`` callbacks, the
    event-name translation table and the ``IN_IGNORED`` mask.
    """
    self.general_processor = GeneralINotifyProcessor(
        monitor,
        self.handle_dir_delete,
        NAME_TRANSLATIONS,
        self.platform_is_ignored,
        IN_IGNORED,
        ignore_config=ignore_config)
    # nothing is on hold until a MOVED_FROM event is captured
    self.held_event = None
508
def rm_from_mute_filter(self, event, paths):
    """Forward the removal of ``event``/``paths`` to the general processor."""
    processor = self.general_processor
    processor.rm_from_mute_filter(event, paths)
512
def add_to_mute_filter(self, event, paths):
    """Forward ``event`` and its ``paths`` to the general processor filter."""
    processor = self.general_processor
    processor.add_to_mute_filter(event, paths)
516
def platform_is_ignored(self, path):
517
"""Should we ignore this path in the current platform.?"""
518
# don't support links yet
519
if path.endswith('.lnk'):
523
def is_ignored(self, path):
    """Return the general processor's verdict on whether to ignore ``path``."""
    verdict = self.general_processor.is_ignored(path)
    return verdict
527
def release_held_event(self, timed_out=False):
    """Push the event on hold to the general processor and clear the hold.

    Does nothing when no event is being held, so ``None`` is never pushed.
    ``timed_out`` is accepted for interface compatibility and is not used.
    """
    held = self.held_event
    if held is None:
        # nothing to release
        return
    self.general_processor.push_event(held)
    self.held_event = None
532
def process_IN_MODIFY(self, event):
533
"""Capture a modify event and fake an open ^ close write events."""
534
# on windows we just get IN_MODIFY, lets always fake
535
# an OPEN & CLOSE_WRITE couple
536
raw_open = raw_close = {
542
# caculate the open mask
544
raw_open['mask'] = IN_OPEN | IN_ISDIR
546
raw_open['mask'] = IN_OPEN
547
# create the event using the raw data, then fix the pathname param
548
open_event = Event(raw_open)
549
open_event.pathname = event.pathname
551
self.general_processor.push_event(open_event)
552
# calculate the close mask
554
raw_close['mask'] = IN_CLOSE_WRITE | IN_ISDIR
556
raw_close['mask'] = IN_CLOSE_WRITE
557
close_event = Event(raw_close)
558
close_event.pathname = event.pathname
559
# push the close event
560
self.general_processor.push_event(close_event)
562
def process_IN_MOVED_FROM(self, event):
    """Hold a MOVED_FROM event so a later MOVED_TO can be paired with it."""
    previous = self.held_event
    if previous is not None:
        # another MOVED_FROM arrived while one was still on hold: the
        # older event is only logged and then replaced
        self.general_processor.log.warn('Lost pair event of %s', previous)
    self.held_event = event
568
def process_IN_MOVED_TO(self, event):
569
"""Capture the MOVED_TO to maybe syntethize FILE_MOVED."""
570
if self.held_event is not None:
571
if event.cookie == self.held_event.cookie:
572
f_path_dir = os.path.split(self.held_event.pathname)[0]
573
t_path_dir = os.path.split(event.pathname)[0]
575
is_from_forreal = not self.is_ignored(self.held_event.pathname)
576
is_to_forreal = not self.is_ignored(event.pathname)
577
if is_from_forreal and is_to_forreal:
578
f_share_id = self.general_processor.get_path_share_id(
580
t_share_id = self.general_processor.get_path_share_id(
586
if f_share_id != t_share_id:
587
# if the share_id are != push a delete/create
588
m = "Delete because of different shares: %r"
589
self.log.info(m, self.held_event.pathname)
590
self.general_processor.eq_push(
591
evtname + "DELETE", path=self.held_event.pathname)
592
self.general_processor.eq_push(
593
evtname + "CREATE", path=event.pathname)
595
self.general_processor.eq_push(
596
'FS_FILE_CLOSE_WRITE', path=event.pathname)
598
self.general_processor.eq_push(evtname + "MOVE",
599
path_from=self.held_event.pathname,
600
path_to=event.pathname)
602
# this is the case of a MOVE from something ignored
603
# to a valid filename
608
self.general_processor.eq_push(evtname + "CREATE",
611
self.general_processor.eq_push('FS_FILE_CLOSE_WRITE',
614
self.held_event = None
617
self.release_held_event()
618
self.general_processor.push_event(event)
620
# We should never get here on windows, I really do not know how we
622
self.general_processor.log.warn('Cookie does not match the previoues held event!')
623
self.general_processor.log.warn('Ignoring %s', event)
625
def process_default(self, event):
    """Release any event on hold, then push ``event`` to the processor."""
    pending = self.held_event
    if pending is not None:
        # the held event goes first so the original order is kept
        self.release_held_event()
    self.general_processor.push_event(event)
631
def handle_dir_delete(self, fullpath):
632
"""Some special work when a directory is deleted."""
633
# remove the watch on that dir from our structures
634
self.general_processor.rm_watch(fullpath)
636
# handle the case of move a dir to a non-watched directory
637
paths = self.general_processor.get_paths_starting_with(fullpath,
640
paths.sort(reverse=True)
641
for path, is_dir in paths:
642
m = "Pushing deletion because of parent dir move: (is_dir=%s) %r"
643
self.general_processor.log.info(m, is_dir, path)
645
self.general_processor.rm_watch(path)
646
self.general_processor.eq_push('FS_DIR_DELETE', path=path)
648
self.general_processor.eq_push('FS_FILE_DELETE', path=path)
650
def freeze_begin(self, path):
    """Ask the general processor to start freezing ``path``."""
    processor = self.general_processor
    processor.freeze_begin(path)
654
def freeze_rollback(self):
    """Ask the general processor to roll back the current freeze."""
    processor = self.general_processor
    processor.freeze_rollback()
658
def freeze_commit(self, events):
659
"""Unfreezes the frozen path, sending received events if not dirty.
661
If events for that path happened:
664
- push the here received events, return False
666
return self.general_processor.freeze_commit(events)
669
def mute_filter(self):
670
"""Return the mute filter used by the processor."""
671
return self.general_processor.filter
674
def frozen_path(self):
675
"""Return the frozen path."""
676
return self.general_processor.frozen_path
680
"""Return the logger of the instance."""
681
return self.general_processor.log
18
684
class FilesystemMonitor(object):
19
685
"""Manages the signals from filesystem."""
20
# TODO: Implement this on windows!
687
def __init__(self, eq, fs, ignore_config=None, timeout=0.1):
688
self.log = logging.getLogger('ubuntuone.SyncDaemon.FSMonitor')
691
# XXX: We need to find a decent time for the time out, is 0.2 seconds
693
self.timeout = timeout
695
self._watch_manager = WatchManager()
696
self._processor = NotifyProcessor(self, ignore_config)
697
self._notifier = Notifier(self._watch_manager, self._processor)
698
self._process_task = self._hook_inotify_to_twisted()
700
def _fix_mute_filter_info(self, info):
    """Prefix the path entries of ``info`` with ``LONG_PATH_PREFIX``.

    ``info`` is modified in place. Only the 'path', 'path_to' and
    'path_from' keys are considered, and a value that already starts with
    the prefix is left as it is.
    """
    # we always make sure that the long path prefix is present so
    # that we can deal with the paths
    for key in ('path', 'path_to', 'path_from'):
        if key not in info:
            continue
        value = info[key]
        if not value.startswith(LONG_PATH_PREFIX):
            info[key] = LONG_PATH_PREFIX + value
712
def add_to_mute_filter(self, event, **info):
    """Normalise the paths in ``info`` and add it to the mute filter."""
    # the paths are fixed in place before the processor sees them
    self._fix_mute_filter_info(info)
    processor = self._processor
    processor.add_to_mute_filter(event, info)
717
def rm_from_mute_filter(self, event, **info):
    """Normalise the paths in ``info`` and remove it from the mute filter."""
    # the paths are fixed in place before the processor sees them
    self._fix_mute_filter_info(info)
    processor = self._processor
    processor.rm_from_mute_filter(event, info)
722
def _hook_inotify_to_twisted(self):
723
"""This will hook inotify to twisted."""
725
# since the select does not work on windows besides sockets
726
# we have to use a much uglier techinique which is to perform
727
# a pool our selfs which might not have events in the Queue of the
728
# notifier. To make this as less painfull as possible we did set the
729
# notifier to not have a timeout
730
def process_events():
731
self._notifier.process_events()
733
process_task = task.LoopingCall(process_events)
734
process_task.start(self.timeout)
738
"""Prepares the EQ to be closed."""
739
self._notifier.stop()
740
self._process_task.stop()
742
def rm_watch(self, dirpath):
    """Stop watching ``dirpath`` by delegating to the watch manager."""
    # the manager decides how the path is actually removed
    manager = self._watch_manager
    manager.rm_path(dirpath)
747
def add_watch(self, dirpath):
748
"""Add watch to a dir."""
749
# we try to get a watch descriptor, it it exists it means that
750
# the path is watch either by an specific watch of by a watch that
751
# is also watching it kids
752
if not self._watch_manager.get_wd(dirpath):
753
# we need to add a watch which will also watch its kids
754
self._watch_manager.add_watch(dirpath, FILESYSTEM_MONITOR_MASK,
757
def has_watch(self, dirpath):
    """Return True when the manager has a descriptor for ``dirpath``."""
    descriptor = self._watch_manager.get_wd(dirpath)
    return descriptor is not None
762
"""Checks if there's something frozen."""
763
return self._processor.frozen_path is not None
765
def freeze_begin(self, path):
    """Start holding the events for ``path``.

    Raises ValueError when the processor already has a frozen path.
    """
    processor = self._processor
    if processor.frozen_path is None:
        processor.freeze_begin(path)
        return
    raise ValueError("There's something already frozen!")
771
def freeze_rollback(self):
    """Drop the current freeze and go back to the idle state.

    Raises ValueError when the processor has no frozen path.
    """
    processor = self._processor
    if processor.frozen_path is not None:
        processor.freeze_rollback()
        return
    raise ValueError("Rolling back with nothing frozen!")
777
def freeze_commit(self, events):
778
"""Unfreezes the frozen path, sending received events if not dirty.
780
If events for that path happened:
783
- push the here received events, return False
785
if self._processor.frozen_path is None:
786
raise ValueError("Commiting with nothing frozen!")
788
d = defer.execute(self._processor.freeze_commit, events)