~ubuntu-branches/ubuntu/quantal/ubuntuone-client/quantal

« back to all changes in this revision

Viewing changes to tests/platform/filesystem_notifications/test_fsevents_daemon.py

  • Committer: Package Import Robot
  • Author(s): Rodney Dawes
  • Date: 2012-08-02 09:23:29 UTC
  • mto: This revision was merged to the branch mainline in revision 110.
  • Revision ID: package-import@ubuntu.com-20120802092329-776xphxixowc3i1r
Tags: upstream-3.99.3
Import upstream version 3.99.3

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- coding: utf-8 -*-
 
2
#
 
3
# Copyright 2012 Canonical Ltd.
 
4
#
 
5
# This program is free software: you can redistribute it and/or modify it
 
6
# under the terms of the GNU General Public License version 3, as published
 
7
# by the Free Software Foundation.
 
8
#
 
9
# This program is distributed in the hope that it will be useful, but
 
10
# WITHOUT ANY WARRANTY; without even the implied warranties of
 
11
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
 
12
# PURPOSE.  See the GNU General Public License for more details.
 
13
#
 
14
# You should have received a copy of the GNU General Public License along
 
15
# with this program.  If not, see <http://www.gnu.org/licenses/>.
 
16
#
 
17
# In addition, as a special exception, the copyright holders give
 
18
# permission to link the code of portions of this program with the
 
19
# OpenSSL library under certain conditions as described in each
 
20
# individual source file, and distribute linked combinations
 
21
# including the two.
 
22
# You must obey the GNU General Public License in all respects
 
23
# for all of the code used other than OpenSSL.  If you modify
 
24
# file(s) with this exception, you may extend this exception to your
 
25
# version of the file(s), but you are not obligated to do so.  If you
 
26
# do not wish to do so, delete this exception statement from your
 
27
# version.  If you delete this exception statement from all source
 
28
# files in the program, then also delete it here.
 
29
"""Tests for the fsevents daemon integration."""
 
30
 
 
31
import os
 
32
 
 
33
from twisted.internet import defer, protocol
 
34
 
 
35
from contrib.testing.testcase import BaseTwistedTestCase
 
36
from ubuntuone.darwin import fsevents
 
37
from ubuntuone.devtools.testcases.txsocketserver import TidyUnixServer
 
38
from ubuntuone.platform.filesystem_notifications.monitor.darwin import (
 
39
    fsevents_daemon,
 
40
)
 
41
from ubuntuone.platform.filesystem_notifications.pyinotify_agnostic import (
 
42
    IN_CREATE,
 
43
    IN_DELETE,
 
44
    IN_MOVED_FROM,
 
45
    IN_MOVED_TO,
 
46
)
 
47
 
 
48
class FakeServerProtocol(protocol.Protocol):
    """Echo protocol used by the fake server in the tests."""

    def dataReceived(self, data):
        """Write the received data straight back through the transport."""
        transport = self.transport
        transport.write(data)
 
54
 
 
55
 
 
56
class FakeServerFactory(protocol.ServerFactory):
    """Server factory for the tests, bound to FakeServerProtocol."""

    # Class attribute naming the protocol class of this factory.
    protocol = FakeServerProtocol
 
60
 
 
61
 
 
62
class FakeDaemonEvent(object):
    """Minimal stand-in for a daemon event, filled in by each test."""

    def __init__(self):
        # Defaults: no event type, not a directory, no paths.
        self.event_type = None
        self.is_directory = False
        self.event_paths = []
 
69
 
 
70
 
 
71
class FakeProcessor(object):
    """Callable processor double that records every event it is given."""

    def __init__(self, *args):
        """Start with no recorded events; positional args are ignored."""
        self.processed_events = []

    def __call__(self, event):
        """Process an event by appending it to ``processed_events``."""
        recorded = self.processed_events
        recorded.append(event)
 
81
 
 
82
 
 
83
class FakePyInotifyEventsFactory(object):
    """Factory double exposing the attributes the tests inspect."""

    def __init__(self):
        """Set up empty bookkeeping lists and a fake processor."""
        self.called = []
        self.watched_paths, self.ignored_paths = [], []
        self.processor = FakeProcessor()
 
92
 
 
93
 
 
94
class FakeTransport(object):
    """A fake transport for the protocol.

    Every method invoked on it appends its own name to ``called``.
    """

    def __init__(self):
        """Create a new instance with an empty call log."""
        self.called = []

    def loseConnection(self):
        """Lose the connection; only records that the call happened."""
        self.called.append('loseConnection')
 
104
 
 
105
 
 
106
class FakeProtocol(object):
    """A fake protocol object to interact with the daemon.

    Each method appends its name (and its path argument, if any) to
    ``called`` and returns an already-fired Deferred.
    """

    def __init__(self):
        """Create a new instance with an empty call log and transport."""
        self.called = []
        self.transport = FakeTransport()

    def remove_user(self):
        """Record the call; return a Deferred fired with None."""
        self.called.append('remove_user')
        return defer.succeed(None)

    def remove_path(self, path):
        """Record the call and its path; return a Deferred fired True."""
        self.called.extend(['remove_path', path])
        return defer.succeed(True)

    def add_path(self, path):
        """Record the call and its path; return a Deferred fired True."""
        self.called.extend(['add_path', path])
        # Mirror remove_path so callers can yield on either method.
        # NOTE(review): result value of the real add_path not visible
        # here -- confirm True is an acceptable stand-in.
        return defer.succeed(True)
 
127
 
 
128
class PyInotifyEventsFactoryTestCase(BaseTwistedTestCase):
    """Tests for the factory that receives the daemon events."""

    @defer.inlineCallbacks
    def setUp(self):
        """Create the factory under test on top of a fake processor."""
        yield super(PyInotifyEventsFactoryTestCase, self).setUp()
        fake_processor = FakeProcessor()
        self.processor = fake_processor
        self.factory = fsevents_daemon.PyInotifyEventsFactory(fake_processor)

    def _make_event(self, paths, event_type=None):
        """Return a FakeDaemonEvent carrying the given paths and type."""
        event = FakeDaemonEvent()
        event.event_type = event_type
        event.event_paths.extend(paths)
        return event

    def test_path_interesting_not_watched_or_ignored(self):
        """A path neither watched nor ignored is not interesting."""
        self.assertTrue(
            self.factory.path_is_not_interesting(u'/not/watched/path'))

    def test_path_interesting_watched_not_ignored(self):
        """A watched path that is not ignored is interesting."""
        watched = u'/watched/path'
        self.factory.watched_paths.append(watched)
        self.assertFalse(self.factory.path_is_not_interesting(watched))

    def test_path_interesting_watched_but_ignored(self):
        """A path both watched and ignored is not interesting."""
        ignored = u'/ignored/path'
        self.factory.watched_paths.append(ignored)
        self.factory.ignored_paths.append(ignored)
        self.assertTrue(self.factory.path_is_not_interesting(ignored))

    def test_path_interesting_not_watched_but_ignored(self):
        """A path ignored but not watched is not interesting."""
        ignored = u'/ignored/path'
        self.factory.ignored_paths.append(ignored)
        self.assertTrue(self.factory.path_is_not_interesting(ignored))

    def test_is_create_false_rename(self):
        """A move between two watched dirs is not a create."""
        origin = u'/other/watched/path'
        target = u'/watched/path'
        self.factory.watched_paths.extend(
            [os.path.dirname(origin), os.path.dirname(target)])
        event = self._make_event([origin, target])
        self.assertFalse(self.factory.is_create(event))

    def test_is_create_false_delete(self):
        """A move out of a watched dir is not a create."""
        origin = u'/watched/path'
        target = u'/not/watched/path'
        self.factory.watched_paths.append(os.path.dirname(origin))
        event = self._make_event([origin, target])
        self.assertFalse(self.factory.is_create(event))

    def test_is_create_true(self):
        """A move into a watched dir from outside is a create."""
        origin = u'/not/watched/path'
        target = u'/watched/path'
        self.factory.watched_paths.append(os.path.dirname(target))
        event = self._make_event([origin, target])
        self.assertTrue(self.factory.is_create(event))

    def test_is_delete_false_rename(self):
        """A move between two watched dirs is not a delete."""
        origin = u'/other/watched/path'
        target = u'/watched/path'
        self.factory.watched_paths.extend(
            [os.path.dirname(origin), os.path.dirname(target)])
        event = self._make_event([origin, target])
        self.assertFalse(self.factory.is_delete(event))

    def test_is_delete_false_create(self):
        """A move into a watched dir from outside is not a delete."""
        origin = u'/not/watched/path'
        target = u'/watched/path'
        self.factory.watched_paths.append(os.path.dirname(target))
        event = self._make_event([origin, target])
        self.assertFalse(self.factory.is_delete(event))

    def test_is_delete_true(self):
        """A move out of a watched dir is a delete."""
        origin = u'/watched/path'
        target = u'/not/watched/path'
        self.factory.watched_paths.append(os.path.dirname(origin))
        event = self._make_event([origin, target])
        self.assertTrue(self.factory.is_delete(event))

    def test_generate_from_event(self):
        """generate_from_event builds an IN_MOVED_FROM for the source."""
        cookie = 'cookie'
        origin = u'/source/path'
        target = u'/destination/path'
        event = self._make_event([origin, target])
        generated = self.factory.generate_from_event(event, cookie)
        self.assertEqual(cookie, generated.cookie)
        self.assertEqual(0, generated.wd)
        self.assertEqual(event.is_directory, generated.dir)
        self.assertEqual(IN_MOVED_FROM, generated.mask)
        self.assertEqual(origin, generated.pathname)

    def test_generate_to_event(self):
        """generate_to_event builds an IN_MOVED_TO for the destination."""
        cookie = 'cookie'
        origin = u'/source/path'
        target = u'/destination/path'
        event = self._make_event([origin, target])
        generated = self.factory.generate_to_event(event, cookie)
        self.assertEqual(cookie, generated.cookie)
        self.assertEqual(0, generated.wd)
        self.assertEqual(event.is_directory, generated.dir)
        self.assertEqual(IN_MOVED_TO, generated.mask)
        self.assertEqual(target, generated.pathname)

    def test_convert_in_pyinotify_event_no_rename(self):
        """Each DARWIN_ACTIONS type converts to one event with its mask."""
        event_path = u'/path/of/the/event'
        for action in fsevents_daemon.DARWIN_ACTIONS:
            expected_mask = fsevents_daemon.DARWIN_ACTIONS[action]
            event = self._make_event([event_path], action)
            converted = self.factory.convert_in_pyinotify_event(event)
            self.assertEqual(1, len(converted))
            generated = converted[0]
            self.assertEqual(0, generated.wd)
            self.assertEqual(event.is_directory, generated.dir)
            self.assertEqual(expected_mask, generated.mask)
            self.assertEqual(event_path, generated.pathname)

    def test_convert_in_pyinotify_event_rename_create(self):
        """A rename into a watched dir converts to a single IN_CREATE."""
        origin = u'/not/watched/path'
        target = u'/watched/path'
        self.factory.watched_paths.append(os.path.dirname(target))
        event = self._make_event([origin, target], fsevents.FSE_RENAME)
        converted = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(1, len(converted))
        generated = converted[0]
        self.assertEqual(0, generated.wd)
        self.assertEqual(event.is_directory, generated.dir)
        self.assertEqual(IN_CREATE, generated.mask)
        self.assertEqual(target, generated.pathname)

    def test_convert_in_pyinotify_event_rename_delete(self):
        """A rename out of a watched dir converts to a single IN_DELETE."""
        origin = u'/watched/path'
        target = u'/not/watched/path'
        self.factory.watched_paths.append(os.path.dirname(origin))
        event = self._make_event([origin, target], fsevents.FSE_RENAME)
        converted = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(1, len(converted))
        generated = converted[0]
        self.assertEqual(0, generated.wd)
        self.assertEqual(event.is_directory, generated.dir)
        self.assertEqual(IN_DELETE, generated.mask)
        self.assertEqual(origin, generated.pathname)

    def test_convert_in_pyinotify_event_rename(self):
        """A rename inside a watched dir converts to a from/to pair."""
        origin = u'/watched/path1'
        target = u'/watched/path2'
        self.factory.watched_paths.append(os.path.dirname(origin))
        event = self._make_event([origin, target], fsevents.FSE_RENAME)
        converted = self.factory.convert_in_pyinotify_event(event)
        self.assertEqual(2, len(converted))
        moved_from = converted[0]
        moved_to = converted[1]
        # the "from" half describes the source path
        self.assertEqual(0, moved_from.wd)
        self.assertEqual(event.is_directory, moved_from.dir)
        self.assertEqual(IN_MOVED_FROM, moved_from.mask)
        self.assertEqual(origin, moved_from.pathname)
        # the "to" half describes the destination path
        self.assertEqual(0, moved_to.wd)
        self.assertEqual(event.is_directory, moved_to.dir)
        self.assertEqual(IN_MOVED_TO, moved_to.mask)
        self.assertEqual(target, moved_to.pathname)
        # both halves share the same cookie
        self.assertEqual(moved_from.cookie, moved_to.cookie)

    def test_process_event_ignored_type(self):
        """Events of an ignored type never reach the processor."""
        for action in fsevents_daemon.DARWIN_IGNORED_ACTIONS:
            self.factory.process_event(self._make_event([], action))
        self.assertEqual(0, len(self.processor.processed_events))

    def test_process_event_dropped(self):
        """An FSE_EVENTS_DROPPED event calls the factory events_dropper."""
        calls = []
        event = self._make_event([], fsevents.FSE_EVENTS_DROPPED)
        self.patch(self.factory, 'events_dropper',
                   lambda: calls.append('fake_events_dropped'))
        self.factory.process_event(event)
        self.assertIn('fake_events_dropped', calls)

    def test_process_ignored_path(self):
        """Events under an ignored path never reach the processor."""
        event_path = u'/path/of/the/event'
        self.factory.ignored_paths.append(os.path.dirname(event_path))
        event = self._make_event([event_path], fsevents.FSE_CREATE_FILE)
        self.factory.process_event(event)
        self.assertEqual(0, len(self.processor.processed_events))

    def test_process_not_ignored_path(self):
        """Events under a watched path are handed to the processor."""
        event_path = u'/path/of/the/event'
        self.factory.watched_paths.append(os.path.dirname(event_path))
        event = self._make_event([event_path], fsevents.FSE_CREATE_FILE)
        self.factory.process_event(event)
        processed = self.processor.processed_events
        self.assertEqual(1, len(processed))
        self.assertEqual(event_path, processed[0].pathname)
 
373
 
 
374
 
 
375
class FilesystemMonitorTestCase(BaseTwistedTestCase):
    """Test the fsevents daemon FilesystemMonitor with fake collaborators."""

    def fake_connect_to_daemon(self):
        """A fake connection to daemon call.

        Installs the fake protocol on the monitor and returns a Deferred
        already fired with that protocol.
        """
        self.monitor._protocol = self.protocol
        # Return the Deferred instead of discarding it, so the fake
        # hands its result back to whoever yields on the call.
        return defer.succeed(self.protocol)

    @defer.inlineCallbacks
    def setUp(self):
        """Build a monitor wired to a fake factory and fake protocol."""
        yield super(FilesystemMonitorTestCase, self).setUp()
        self.patch(fsevents_daemon, 'NotifyProcessor', FakeProcessor)
        self.factory = FakePyInotifyEventsFactory()
        self.protocol = FakeProtocol()
        self.monitor = fsevents_daemon.FilesystemMonitor(None, None)
        self.processor = self.monitor._processor

        # override default objects
        self.monitor._factory = self.factory

        # patch the connect
        self.patch(fsevents_daemon.FilesystemMonitor, '_connect_to_daemon',
                   self.fake_connect_to_daemon)

    @defer.inlineCallbacks
    def test_shutdown_protocol(self):
        """Shutdown with a protocol calls remove_user on it."""
        self.monitor._protocol = self.protocol
        yield self.monitor.shutdown()
        self.assertIn('remove_user', self.protocol.called)

    @defer.inlineCallbacks
    def test_shutdown_no_protocol(self):
        """Shutdown without a protocol still yields a true result."""
        stopped = yield self.monitor.shutdown()
        self.assertTrue(stopped)

    @defer.inlineCallbacks
    def test_rm_path_not_root(self):
        """Removing a path under a watched root marks it as ignored."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/path')
        yield self.monitor.rm_watch(dirpath)
        self.assertIn(dirpath, self.factory.ignored_paths)

    @defer.inlineCallbacks
    def test_rm_path_root(self):
        """Removing a watched root calls remove_path and unwatches it."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append(dirpath)
        yield self.monitor.rm_watch(dirpath)
        self.assertIn('remove_path', self.protocol.called)
        self.assertIn(dirpath, self.protocol.called)
        self.assertNotIn(dirpath, self.factory.watched_paths)

    @defer.inlineCallbacks
    def test_add_watch_not_root(self):
        """Adding a path under a watched root does not call add_path."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/path')
        yield self.monitor.add_watch(dirpath)
        self.assertNotIn('add_path', self.protocol.called)

    @defer.inlineCallbacks
    def test_add_watch_root(self):
        """Adding a path outside every watched root calls add_path."""
        dirpath = '/path/to/remove/'
        self.factory.watched_paths.append('/other/path')
        yield self.monitor.add_watch(dirpath)
        self.assertIn('add_path', self.protocol.called)
        self.assertIn(dirpath, self.protocol.called)

    @defer.inlineCallbacks
    def test_add_watch_ignored(self):
        """Adding a path that was ignored does not call add_path."""
        dirpath = '/path/to/remove/'
        self.factory.ignored_paths.append(dirpath)
        yield self.monitor.add_watch(dirpath)
        self.assertNotIn('add_path', self.protocol.called)

    @defer.inlineCallbacks
    def test_is_available_monitor_running(self):
        """is_available_monitor is true when a server is listening."""
        monitor_cls = fsevents_daemon.FilesystemMonitor

        # start a fake server for the test
        server = TidyUnixServer()
        yield server.listen_server(FakeServerFactory)
        self.addCleanup(server.clean_up)

        # point the module at the fake server socket for this test
        self.patch(fsevents_daemon, 'DAEMON_SOCKET', server.path)

        result = yield monitor_cls.is_available_monitor()
        self.assertTrue(result)

    @defer.inlineCallbacks
    def test_is_available_monitor_fail(self):
        """is_available_monitor is false when the daemon is not running."""
        monitor_cls = fsevents_daemon.FilesystemMonitor

        # use a socket path with an extra suffix for this test
        self.patch(fsevents_daemon, 'DAEMON_SOCKET',
                   fsevents_daemon.DAEMON_SOCKET + 'test')

        result = yield monitor_cls.is_available_monitor()
        self.assertFalse(result)