# -*- coding: utf-8 -*-
#
# Copyright 2012 Canonical Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# In addition, as a special exception, the copyright holders give
# permission to link the code of portions of this program with the
# OpenSSL library under certain conditions as described in each
# individual source file, and distribute linked combinations
# including the two.
# You must obey the GNU General Public License in all respects
# for all of the code used other than OpenSSL. If you modify
# file(s) with this exception, you may extend this exception to your
# version of the file(s), but you are not obligated to do so. If you
# do not wish to do so, delete this exception statement from your
# version. If you delete this exception statement from all source
# files in the program, then also delete it here.
"""Tests for the fsevents daemon integration."""
33
import os

from twisted.internet import defer, protocol

from contrib.testing.testcase import BaseTwistedTestCase
from ubuntuone.darwin import fsevents
from ubuntuone.devtools.testcases.txsocketserver import TidyUnixServer
from ubuntuone.platform.filesystem_notifications.monitor.darwin import (
    fsevents_daemon,
)
from ubuntuone.platform.filesystem_notifications.pyinotify_agnostic import (
    IN_CREATE,
    IN_DELETE,
    IN_MOVED_FROM,
    IN_MOVED_TO,
)
48
class FakeServerProtocol(protocol.Protocol):
    """A test protocol that echoes back whatever it receives."""

    def dataReceived(self, data):
        """Echo the data received.

        The bytes in `data` are written straight back to the transport.
        """
        self.transport.write(data)
56
class FakeServerFactory(protocol.ServerFactory):
    """A factory for the test server."""

    # Protocol class the factory uses for the test server connections.
    protocol = FakeServerProtocol
62
class FakeDaemonEvent(object):
    """A fake daemon event."""

    def __init__(self):
        """Create an event with no paths, no type and not a directory."""
        # The tests fill this in with append()/extend(); a rename event
        # carries two entries (source first, destination second).
        self.event_paths = []
        self.is_directory = False
        self.event_type = None
71
class FakeProcessor(object):
    """A fake processor that records every event it is called with."""

    def __init__(self, *args):
        """Create a new instance.

        Any positional arguments are accepted and ignored so that the fake
        can stand in for a processor class that takes constructor arguments.
        """
        self.processed_events = []

    def __call__(self, event):
        """Process an event by remembering it in `processed_events`."""
        self.processed_events.append(event)
83
class FakePyInotifyEventsFactory(object):
    """A fake events factory exposing the state the monitor tests inspect."""

    def __init__(self):
        """Create a new instance."""
        self.processor = FakeProcessor()
        # NOTE(review): the visible text has a gap here; confirm against the
        # repository that no further attribute was initialised.
        self.watched_paths = []
        self.ignored_paths = []
94
class FakeTransport(object):
    """A fake transport for the protocol."""

    def __init__(self):
        """Create a new instance."""
        # Names of the methods that have been invoked, in call order.
        self.called = []

    def loseConnection(self):
        """Record that the connection was lost."""
        self.called.append('loseConnection')
106
class FakeProtocol(object):
    """A fake protocol object to interact with the daemon."""

    def __init__(self):
        """Create a new instance."""
        # Flat log of calls: method names, followed by the path argument
        # for the methods that take one.
        self.called = []
        self.transport = FakeTransport()

    def remove_user(self):
        """Remove the user.

        Returns a deferred that has already fired with None.
        """
        self.called.append('remove_user')
        return defer.succeed(None)

    def remove_path(self, path):
        """Remove a path.

        Returns a deferred that has already fired with True.
        """
        self.called.extend(['remove_path', path])
        return defer.succeed(True)

    def add_path(self, path):
        """Add a path."""
        # NOTE(review): unlike remove_path, nothing is returned here;
        # confirm callers do not rely on a deferred result.
        self.called.extend(['add_path', path])
128
class PyInotifyEventsFactoryTestCase(BaseTwistedTestCase):
    """Test the factory used to receive events."""

    @defer.inlineCallbacks
    def setUp(self):
        """Set up the processor and factory shared by the tests."""
        yield super(PyInotifyEventsFactoryTestCase, self).setUp()
        self.processor = FakeProcessor()
        self.factory = fsevents_daemon.PyInotifyEventsFactory(self.processor)

    def test_path_interesting_not_watched_or_ignored(self):
        """A path that is neither watched nor ignored is not interesting."""
        path = u'/not/watched/path'
        self.assertTrue(self.factory.path_is_not_interesting(path))

    def test_path_interesting_watched_not_ignored(self):
        """A path that is watched and not ignored is interesting."""
        path = u'/watched/path'
        self.factory.watched_paths.append(path)
        self.assertFalse(self.factory.path_is_not_interesting(path))

    def test_path_interesting_watched_but_ignored(self):
        """A path that is watched but also ignored is not interesting."""
        path = u'/ignored/path'
        self.factory.watched_paths.append(path)
        self.factory.ignored_paths.append(path)
        self.assertTrue(self.factory.path_is_not_interesting(path))

    def test_path_interesting_not_watched_but_ignored(self):
        """A path that is ignored and not watched is not interesting."""
        path = u'/ignored/path'
        self.factory.ignored_paths.append(path)
        self.assertTrue(self.factory.path_is_not_interesting(path))

    def test_is_create_false_rename(self):
        """A rename between two watched dirs is not a create."""
        source_path = u'/other/watched/path'
        destination_path = u'/watched/path'
        source_head, _ = os.path.split(source_path)
        destination_head, _ = os.path.split(destination_path)
        self.factory.watched_paths.extend([source_head, destination_head])
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertFalse(self.factory.is_create(event))

    def test_is_create_false_delete(self):
        """A rename out of a watched dir is not a create."""
        source_path = u'/watched/path'
        destination_path = u'/not/watched/path'
        source_head, _ = os.path.split(source_path)
        self.factory.watched_paths.append(source_head)
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertFalse(self.factory.is_create(event))

    def test_is_create_true(self):
        """A rename from an unwatched dir into a watched one is a create."""
        source_path = u'/not/watched/path'
        destination_path = u'/watched/path'
        destination_head, _ = os.path.split(destination_path)
        self.factory.watched_paths.append(destination_head)
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertTrue(self.factory.is_create(event))

    def test_is_delete_false_rename(self):
        """A rename between two watched dirs is not a delete."""
        source_path = u'/other/watched/path'
        destination_path = u'/watched/path'
        source_head, _ = os.path.split(source_path)
        destination_head, _ = os.path.split(destination_path)
        self.factory.watched_paths.extend([source_head, destination_head])
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertFalse(self.factory.is_delete(event))

    def test_is_delete_false_create(self):
        """A rename into a watched dir is not a delete."""
        source_path = u'/not/watched/path'
        destination_path = u'/watched/path'
        destination_head, _ = os.path.split(destination_path)
        self.factory.watched_paths.append(destination_head)
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertFalse(self.factory.is_delete(event))

    def test_is_delete_true(self):
        """A rename from a watched dir to an unwatched one is a delete."""
        source_path = u'/watched/path'
        destination_path = u'/not/watched/path'
        source_head, _ = os.path.split(source_path)
        self.factory.watched_paths.append(source_head)
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        self.assertTrue(self.factory.is_delete(event))

    def test_generate_from_event(self):
        """Test the creation of a fake 'moved from' event."""
        # Arbitrary marker; the test only checks that it is propagated.
        cookie = 'cookie'
        source_path = u'/source/path'
        destination_path = u'/destination/path'
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        pyinotify_event = self.factory.generate_from_event(event, cookie)
        self.assertEqual(cookie, pyinotify_event.cookie)
        self.assertEqual(0, pyinotify_event.wd)
        self.assertEqual(event.is_directory, pyinotify_event.dir)
        self.assertEqual(IN_MOVED_FROM, pyinotify_event.mask)
        self.assertEqual(source_path, pyinotify_event.pathname)

    def test_generate_to_event(self):
        """Test the creation of a fake 'moved to' event."""
        # Arbitrary marker; the test only checks that it is propagated.
        cookie = 'cookie'
        source_path = u'/source/path'
        destination_path = u'/destination/path'
        event = FakeDaemonEvent()
        event.event_paths.extend([source_path, destination_path])
        pyinotify_event = self.factory.generate_to_event(event, cookie)
        self.assertEqual(cookie, pyinotify_event.cookie)
        self.assertEqual(0, pyinotify_event.wd)
        self.assertEqual(event.is_directory, pyinotify_event.dir)
        self.assertEqual(IN_MOVED_TO, pyinotify_event.mask)
        self.assertEqual(destination_path, pyinotify_event.pathname)

    def test_convert_in_pyinotify_event_no_rename(self):
        """Each known non-rename action converts to one matching event."""
        event_path = u'/path/of/the/event'
        for action in fsevents_daemon.DARWIN_ACTIONS:
            event = FakeDaemonEvent()
            event.event_paths.append(event_path)
            event.event_type = action
            converted_events = self.factory.convert_in_pyinotify_event(event)
            self.assertEqual(1, len(converted_events))
            pyinotify_event = converted_events[0]
            self.assertEqual(0, pyinotify_event.wd)
            self.assertEqual(event.is_directory, pyinotify_event.dir)
            self.assertEqual(fsevents_daemon.DARWIN_ACTIONS[action],
                             pyinotify_event.mask)
            self.assertEqual(event_path, pyinotify_event.pathname)

    def test_convert_in_pyinotify_event_rename_create(self):
        """Test the conversion of a rename which is a create."""
        source_path = u'/not/watched/path'
        destination_path = u'/watched/path'
        head, _ = os.path.split(destination_path)
        self.factory.watched_paths.append(head)
        event = FakeDaemonEvent()
        event.event_type = fsevents.FSE_RENAME
        event.event_paths.extend([source_path, destination_path])
        converted_events = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(1, len(converted_events))
        pyinotify_event = converted_events[0]
        self.assertEqual(0, pyinotify_event.wd)
        self.assertEqual(event.is_directory, pyinotify_event.dir)
        self.assertEqual(IN_CREATE, pyinotify_event.mask)
        self.assertEqual(destination_path, pyinotify_event.pathname)

    def test_convert_in_pyinotify_event_rename_delete(self):
        """Test the conversion of a rename which is a delete."""
        source_path = u'/watched/path'
        destination_path = u'/not/watched/path'
        head, _ = os.path.split(source_path)
        self.factory.watched_paths.append(head)
        event = FakeDaemonEvent()
        event.event_type = fsevents.FSE_RENAME
        event.event_paths.extend([source_path, destination_path])
        converted_events = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(1, len(converted_events))
        pyinotify_event = converted_events[0]
        self.assertEqual(0, pyinotify_event.wd)
        self.assertEqual(event.is_directory, pyinotify_event.dir)
        self.assertEqual(IN_DELETE, pyinotify_event.mask)
        self.assertEqual(source_path, pyinotify_event.pathname)

    def test_convert_in_pyinotify_event_rename(self):
        """A rename inside a watched dir yields a from/to event pair."""
        source_path = u'/watched/path1'
        destination_path = u'/watched/path2'
        head, _ = os.path.split(source_path)
        self.factory.watched_paths.append(head)
        event = FakeDaemonEvent()
        event.event_type = fsevents.FSE_RENAME
        event.event_paths.extend([source_path, destination_path])
        converted_events = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(2, len(converted_events))
        from_event = converted_events[0]
        to_event = converted_events[1]
        # assert the 'moved from' half
        self.assertEqual(0, from_event.wd)
        self.assertEqual(event.is_directory, from_event.dir)
        self.assertEqual(IN_MOVED_FROM, from_event.mask)
        self.assertEqual(source_path, from_event.pathname)
        # assert the 'moved to' half
        self.assertEqual(0, to_event.wd)
        self.assertEqual(event.is_directory, to_event.dir)
        self.assertEqual(IN_MOVED_TO, to_event.mask)
        self.assertEqual(destination_path, to_event.pathname)
        # both halves must share the cookie
        self.assertEqual(from_event.cookie, to_event.cookie)

    def test_process_event_ignored_type(self):
        """Events of an ignored type never reach the processor."""
        for action in fsevents_daemon.DARWIN_IGNORED_ACTIONS:
            event = FakeDaemonEvent()
            event.event_type = action
            self.factory.process_event(event)
        self.assertEqual(0, len(self.processor.processed_events))

    def test_process_event_dropped(self):
        """A dropped-events notification calls the events dropper."""
        func_called = []
        event = FakeDaemonEvent()
        event.event_type = fsevents.FSE_EVENTS_DROPPED

        def fake_events_dropped():
            """A fake events dropped implementation."""
            func_called.append('fake_events_dropped')

        self.patch(self.factory, 'events_dropper', fake_events_dropped)
        self.factory.process_event(event)
        self.assertIn('fake_events_dropped', func_called)

    def test_process_ignored_path(self):
        """Events under an ignored path never reach the processor."""
        event_path = u'/path/of/the/event'
        head, _ = os.path.split(event_path)
        self.factory.ignored_paths.append(head)
        event = FakeDaemonEvent()
        event.event_paths.append(event_path)
        event.event_type = fsevents.FSE_CREATE_FILE
        self.factory.process_event(event)
        self.assertEqual(0, len(self.processor.processed_events))

    def test_process_not_ignored_path(self):
        """Events under a watched path are handed to the processor."""
        event_path = u'/path/of/the/event'
        head, _ = os.path.split(event_path)
        self.factory.watched_paths.append(head)
        event = FakeDaemonEvent()
        event.event_paths.append(event_path)
        event.event_type = fsevents.FSE_CREATE_FILE
        self.factory.process_event(event)
        self.assertEqual(1, len(self.processor.processed_events))
        self.assertEqual(event_path,
                         self.processor.processed_events[0].pathname)
375
class FilesystemMonitorTestCase(BaseTwistedTestCase):
    """Test the filesystem monitor against a fake factory and protocol."""

    def fake_connect_to_daemon(self):
        """A fake connection to daemon call.

        Installs the fake protocol on the monitor and returns a deferred
        that has already fired with it, so callers may wait on the result.
        """
        self.monitor._protocol = self.protocol
        return defer.succeed(self.protocol)

    @defer.inlineCallbacks
    def setUp(self):
        """Create a monitor wired to the fake factory and protocol."""
        yield super(FilesystemMonitorTestCase, self).setUp()
        self.patch(fsevents_daemon, 'NotifyProcessor', FakeProcessor)
        self.factory = FakePyInotifyEventsFactory()
        self.protocol = FakeProtocol()
        self.monitor = fsevents_daemon.FilesystemMonitor(None, None)
        self.processor = self.monitor._processor

        # override default objects
        self.monitor._factory = self.factory

        # replace the real connection with the fake one defined above
        self.patch(fsevents_daemon.FilesystemMonitor, '_connect_to_daemon',
                   self.fake_connect_to_daemon)

    @defer.inlineCallbacks
    def test_shutdown_protocol(self):
        """Shutting down with a protocol removes the user from it."""
        self.monitor._protocol = self.protocol
        yield self.monitor.shutdown()
        self.assertIn('remove_user', self.protocol.called)

    @defer.inlineCallbacks
    def test_shutdown_no_protocol(self):
        """Shutting down without a protocol still reports success."""
        stopped = yield self.monitor.shutdown()
        self.assertTrue(stopped)

    @defer.inlineCallbacks
    def test_rm_path_not_root(self):
        """Removing a path that is not itself watched marks it ignored."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/path')
        yield self.monitor.rm_watch(dirpath)
        self.assertIn(dirpath, self.factory.ignored_paths)

    @defer.inlineCallbacks
    def test_rm_path_root(self):
        """Removing a watched root path removes it through the protocol."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append(dirpath)
        yield self.monitor.rm_watch(dirpath)
        self.assertIn('remove_path', self.protocol.called)
        self.assertIn(dirpath, self.protocol.called)
        self.assertNotIn(dirpath, self.factory.watched_paths)

    @defer.inlineCallbacks
    def test_add_watch_not_root(self):
        """Adding a path under a watched root does not call add_path."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/path')
        yield self.monitor.add_watch(dirpath)
        self.assertNotIn('add_path', self.protocol.called)

    @defer.inlineCallbacks
    def test_add_watch_root(self):
        """Adding a path outside every watched root calls add_path."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/other/path')
        yield self.monitor.add_watch(dirpath)
        self.assertIn('add_path', self.protocol.called)
        self.assertIn(dirpath, self.protocol.called)

    @defer.inlineCallbacks
    def test_add_watch_ignored(self):
        """Adding a path that was ignored does not call add_path."""
        dirpath = '/path/to/remove/'
        self.factory.ignored_paths.append(dirpath)
        yield self.monitor.add_watch(dirpath)
        self.assertNotIn('add_path', self.protocol.called)

    @defer.inlineCallbacks
    def test_is_available_monitor_running(self):
        """Test the method when the daemon is indeed running."""
        monitor_cls = fsevents_daemon.FilesystemMonitor

        # start a fake server for the test
        server = TidyUnixServer()
        yield server.listen_server(FakeServerFactory)
        self.addCleanup(server.clean_up)

        # point the module at the fake server socket, restoring it afterwards
        old_socket = fsevents_daemon.DAEMON_SOCKET
        fsevents_daemon.DAEMON_SOCKET = server.path
        self.addCleanup(setattr, fsevents_daemon, 'DAEMON_SOCKET', old_socket)

        result = yield monitor_cls.is_available_monitor()
        self.assertTrue(result)

    @defer.inlineCallbacks
    def test_is_available_monitor_fail(self):
        """Test the method when the daemon is not running."""
        monitor_cls = fsevents_daemon.FilesystemMonitor
        # use a modified socket path for this test, restoring it afterwards
        old_socket = fsevents_daemon.DAEMON_SOCKET
        fsevents_daemon.DAEMON_SOCKET += 'test'
        self.addCleanup(setattr, fsevents_daemon, 'DAEMON_SOCKET', old_socket)

        result = yield monitor_cls.is_available_monitor()
        self.assertFalse(result)