34
34
from twisted.internet import defer, reactor
36
from ubuntuone.platform.filesystem_notifications import common
36
from ubuntuone.platform.filesystem_notifications.pyinotify_agnostic import (
39
44
# a map between the few events that we have on common platforms and those
40
45
# found in pyinotify
41
common.COMMON_ACTIONS = {
42
fsevents.IN_CREATE: common.IN_CREATE,
43
fsevents.IN_DELETE: common.IN_DELETE,
44
fsevents.IN_MODIFY: common.IN_MODIFY,
45
fsevents.IN_MOVED_FROM: common.IN_MOVED_FROM,
46
fsevents.IN_MOVED_TO: common.IN_MOVED_TO,
47
fsevents.IN_CREATE: IN_CREATE,
48
fsevents.IN_DELETE: IN_DELETE,
49
fsevents.IN_MODIFY: IN_MODIFY,
50
fsevents.IN_MOVED_FROM: IN_MOVED_FROM,
51
fsevents.IN_MOVED_TO: IN_MOVED_TO,
49
54
# a map of the actions to names so that we have better logs.
50
common.COMMON_ACTIONS_NAMES = {
51
fsevents.IN_CREATE: 'IN_CREATE',
52
fsevents.IN_DELETE: 'IN_DELETE',
53
fsevents.IN_MODIFY: 'IN_MODIFY',
54
fsevents.IN_MOVED_FROM: 'IN_MOVED_FROM',
55
fsevents.IN_MOVED_TO: 'IN_MOVED_TO',
56
fsevents.IN_CREATE: 'IN_CREATE',
57
fsevents.IN_DELETE: 'IN_DELETE',
58
fsevents.IN_MODIFY: 'IN_MODIFY',
59
fsevents.IN_MOVED_FROM: 'IN_MOVED_FROM',
60
fsevents.IN_MOVED_TO: 'IN_MOVED_TO',
64
def path_is_ignored(self, path):
65
"""Should we ignore this path in the current platform.?"""
66
# don't support links yet
67
if os.path.islink(path):
59
72
# The implementation of the code that is provided as the pyinotify substitute
60
class Watch(common.Watch):
61
74
"""Implement the same functions as pyinotify.Watch."""
63
def __init__(self, watch_descriptor, path, mask, auto_add, processor,
65
super(Watch, self).__init__(watch_descriptor, path, mask, auto_add,
76
def __init__(self, path, process_events):
77
"""Create a new instance for the given path.
79
The process_events parameter is a callback to be executed in the main
80
reactor thread to convert events in pyinotify events and add them to
83
self.path = os.path.abspath(path)
84
self.process_events = process_events
86
self.ignore_paths = []
67
87
# Create stream with folder to watch
68
88
self.stream = fsevents.Stream(self._process_events,
69
89
path, file_events=True)
75
95
def _process_events_in_main_thread(self, event):
76
96
"""Process the events from the queue."""
77
# do not do it if we stop watching and the events are empty
78
if not self._watching:
81
97
action, cookie, file_name = (event.mask, event.cookie, event.name)
82
if any([file_name.startswith(path)
83
for path in self._ignore_paths]):
85
syncdaemon_path = os.path.join(self._path, file_name)
86
self._process_events_from_filesystem(action, file_name, cookie,
99
syncdaemon_path = os.path.join(self.path, file_name)
100
self.process_events(action, file_name, cookie,
103
def start_watching(self):
104
"""Start watching."""
106
return defer.succeed(self.watching)
108
def stop_watching(self):
110
self.watching = False
111
return defer.succeed(self.watching)
89
113
# For API compatibility
91
115
def started(self):
92
116
"""A deferred that will be called when the watch is running."""
93
return defer.succeed(self._watching)
117
return defer.succeed(self.watching)
96
120
def stopped(self):
97
121
"""A deferred fired when the watch thread has finished."""
98
return defer.succeed(self._watching)
101
class WatchManager(common.WatchManager):
122
return defer.succeed(self.watching)
125
class WatchManager(object):
102
126
"""Implement the same functions as pyinotify.WatchManager.
104
128
All paths passed to methods in this class should be darwin paths.
108
def __init__(self, processor):
132
def __init__(self, log):
109
133
"""Init the manager to keep track of the different watches."""
110
super(WatchManager, self).__init__(processor)
111
135
self.observer = fsevents.Observer()
112
136
self.observer.start()
114
def add_watch(self, path, mask, auto_add=False, quiet=True):
115
"""Add a new path to be watched.
117
The method will ensure that the path is not already present.
119
if not isinstance(path, str):
120
e = NotImplementedError("No implementation on this platform.")
122
wd = self.get_wd(path)
124
self.log.debug('Adding single watch on %r', path)
125
return self._add_single_watch(path, mask, auto_add, quiet)
127
self.log.debug('Watch already exists on %r', path)
128
return self._wdm[wd].started
130
138
def __del__(self):
131
139
"""Stop the observer."""
132
140
self.observer.stop()
142
def stop_watch(self, watch):
143
"""Stop a given watch."""
144
watch.stop_watching()
145
self.observer.unschedule(watch.platform_watch.stream)
146
return defer.succeed(True)
135
"""Close the manager and stop all watches."""
136
self.log.debug('Stopping watches.')
137
for current_wd in self._wdm:
138
self._wdm[current_wd].stop_watching()
139
self.observer.unschedule(self._wdm[current_wd].stream)
140
self.log.debug('Stopping Watch on %r.', self._wdm[current_wd].path)
149
"""Stop the manager."""
141
150
self.observer.stop()
143
def del_watch(self, wd):
144
"""Delete the watch with the given descriptor."""
145
watch = self.get_watch(wd)
152
def del_watch(self, watch):
153
"""Delete the watch and clean resources."""
146
154
self.observer.unschedule(watch.stream)
147
return super(WatchManager, self).del_watch(wd)
149
def _adding_watch(self, path, mask, auto_add):
156
def add_watch(self, watch):
150
157
"""This method perform actually the action of registering the watch."""
151
watch = Watch(self._wd_count, path, mask, auto_add, self._processor)
152
self._wdm[self._wd_count] = watch
153
self._wdm[self._wd_count].start_watching()
154
self.observer.schedule(self._wdm[self._wd_count].stream)
156
return defer.succeed(True)
158
self.observer.schedule(watch.platform_watch.stream)
158
def rm_watch(self, wd, rec=False, quiet=True):
161
def rm_watch(self, watch):
159
162
"""Remove the the watch with the given wd."""
160
watch = self.get_watch(wd)
161
self.observer.unschedule(watch.stream)
162
super(WatchManager, self).del_watch(wd, rec, quiet)
165
class FilesystemMonitor(object):
166
"""Empty implementation of FilesystemMonitor"""
169
class NotifyProcessor(object):
170
"""Empty implementation of NotifyProcessor"""
163
self.observer.unschedule(watch.platform_watch.stream)