2
# Author: Facundo Batista <facundo@canonical.com>
4
# Author: Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
6
# Copyright 2009 Canonical Ltd.
2
# Authors: Facundo Batista <facundo@canonical.com>
3
# Guillermo Gonzalez <guillermo.gonzalez@canonical.com>
5
# Copyright 2009-2011 Canonical Ltd.
8
7
# This program is free software: you can redistribute it and/or modify it
9
8
# under the terms of the GNU General Public License version 3, as published
34
32
from ubuntuone.syncdaemon import volume_manager
35
33
from ubuntuone.syncdaemon.tritcask import Tritcask
35
# We normally access private attributes in tests
36
# pylint: disable=W0212
38
39
class BaseFSMonitorTestCase(testcase.BaseTwistedTestCase):
39
40
"""Test the structures where we have the path/watch."""
44
@defer.inlineCallbacks
45
testcase.BaseTwistedTestCase.setUp(self)
47
yield super(BaseFSMonitorTestCase, self).setUp()
46
48
fsmdir = self.mktemp('fsmdir')
47
49
partials_dir = self.mktemp('partials_dir')
48
50
self.root_dir = self.mktemp('root_dir')
70
72
self.log_handler.setLevel(logging.DEBUG)
71
73
self.monitor.log.addHandler(self.log_handler)
75
@defer.inlineCallbacks
73
76
def tearDown(self):
74
77
"""Clean up the tests."""
75
78
self.monitor.shutdown()
76
self.rmtree(self.tmpdir)
77
79
self.monitor.log.removeHandler(self.log_handler)
78
testcase.BaseTwistedTestCase.tearDown(self)
80
yield super(BaseFSMonitorTestCase, self).tearDown()
81
83
class WatchManagerTests(BaseFSMonitorTestCase):
89
91
self.monitor.add_watch(path)
91
93
# we have the watch, remove the dir, the watch should be gone
92
self.assertTrue(self.monitor.has_watch(path))
94
self.assertIn(path, self.monitor._general_watchs)
96
98
"""Check state after the event."""
97
self.assertFalse(self.monitor.has_watch(path))
99
self.assertNotIn(path, self.monitor._general_watchs)
99
101
self.deferred.addCallback(check)
100
102
return self.deferred
110
112
# we have the watch, rename the dir, new name should have the watch,
111
113
# old name should not
112
self.assertTrue(self.monitor.has_watch(path1))
114
self.assertIn(path1, self.monitor._general_watchs)
113
115
os.rename(path1, path2)
116
118
"""Check state after the event."""
117
self.assertTrue(self.monitor.has_watch(path2))
118
self.assertFalse(self.monitor.has_watch(path1))
119
self.assertIn(path2, self.monitor._general_watchs)
120
self.assertNotIn(path1, self.monitor._general_watchs)
120
122
self.deferred.addCallback(check)
121
123
return self.deferred
131
133
self.monitor.add_watch(path1)
133
135
# we have the watch, move it outside watched structure, no more watches
134
self.assertTrue(self.monitor.has_watch(path1))
136
self.assertIn(path1, self.monitor._general_watchs)
135
137
os.rename(path1, path2)
138
140
"""Check state after the event."""
139
self.assertFalse(self.monitor.has_watch(path1))
140
self.assertFalse(self.monitor.has_watch(path2))
141
self.assertNotIn(path1, self.monitor._general_watchs)
142
self.assertNotIn(path2, self.monitor._general_watchs)
142
144
self.deferred.addCallback(check)
143
145
return self.deferred
211
class BaseTwisted(BaseFSMonitorTestCase):
212
"""Base class for twisted tests."""
214
# this timeout must be bigger than the one used in event_queue
218
"""Setup the test."""
219
BaseFSMonitorTestCase.setUp(self)
221
# create the deferred for the tests
222
self._deferred = defer.Deferred()
224
def finished_ok(self):
225
"""Called to indicate that the tests finished ok."""
226
self._deferred.callback(True)
228
def finished_error(self, msg):
229
"""Called to indicate that the tests finished badly."""
230
self._deferred.errback(Exception(msg))
232
def failUnlessEqual(self, first, second, msg=''):
233
"""Fail the test if C{first} and C{second} are not equal.
235
@param msg: A string describing the failure that's included in the
239
if not first == second:
244
exception = self.failureException(
245
'%snot equal:\na = %s\nb = %s\n'
246
% (msg, repr(first), repr(second)))
247
self.finished_error(exception)
250
assertEqual = assertEquals = failUnlessEquals = failUnlessEqual
253
213
class WatchTests(BaseFSMonitorTestCase):
254
214
"""Test the EQ API to add and remove watchs."""
264
224
def test_add_general_watch(self):
265
225
"""Test that general watchs can be added."""
266
# we should have what we asked for
267
226
self.monitor.add_watch(self.root_dir)
269
# check that only the added dir is in the watches
270
# pylint: disable-msg=W0212
271
self.assertTrue(self.root_dir in self.monitor._general_watchs)
272
self.assertTrue("not-added-dir" not in self.monitor._general_watchs)
228
# check that only the added dir is in the watches, and check the logs
229
self.assertIn(self.root_dir, self.monitor._general_watchs)
230
self.assertNotIn("not-added-dir", self.monitor._general_watchs)
231
self.assertTrue(self.log_handler.check_debug(
232
"Adding general inotify watch", self.root_dir))
274
234
# nothing in the udf ancestors watch
275
235
self.assertEqual(self.monitor._ancestors_watchs, {})
237
def test_add_general_watch_twice(self):
238
"""Test that general watchs can be added."""
239
self.monitor.add_watch(self.root_dir)
240
self.assertTrue(self.log_handler.check_debug(
241
"Adding general inotify watch", self.root_dir))
242
self.assertIn(self.root_dir, self.monitor._general_watchs)
245
self.monitor.add_watch(self.root_dir)
246
self.assertTrue(self.log_handler.check_debug("Watch already there for",
248
self.assertIn(self.root_dir, self.monitor._general_watchs)
277
250
@defer.inlineCallbacks
278
251
def test_add_watch_on_udf_ancestor(self):
279
252
"""Test that ancestors watchs can be added."""
284
257
self.monitor.add_watch(path_ancestor)
286
259
# check that only the added dir is in the watches
287
# pylint: disable-msg=W0212
288
260
self.assertTrue(path_ancestor in self.monitor._ancestors_watchs)
289
261
self.assertTrue("not-added-dir" not in self.monitor._ancestors_watchs)
299
271
yield self._create_udf(path_udf)
300
272
self.monitor.add_watch(path_udf)
302
# pylint: disable-msg=W0212
303
274
self.assertTrue(path_udf in self.monitor._general_watchs)
304
275
self.assertEqual(self.monitor._ancestors_watchs, {})
313
284
os.mkdir(path_ancestor)
314
285
self.monitor.add_watch(path_ancestor)
316
# pylint: disable-msg=W0212
317
287
self.assertTrue(path_ancestor in self.monitor._general_watchs)
318
288
self.assertEqual(self.monitor._ancestors_watchs, {})
327
297
not_existing_dir = "not-added-dir"
328
298
self.monitor.add_watch(not_existing_dir)
329
self.assertTrue(self.monitor.has_watch(not_existing_dir))
299
self.assertIn(not_existing_dir, self.monitor._general_watchs)
330
300
self.monitor.rm_watch(not_existing_dir)
331
self.assertFalse(self.monitor.has_watch(not_existing_dir))
301
self.assertNotIn(not_existing_dir, self.monitor._general_watchs)
333
303
@defer.inlineCallbacks
334
304
def test_rm_watch_wrong(self):
351
321
self.monitor.add_watch(self.root_dir)
352
322
self.monitor.rm_watch(self.root_dir)
354
# pylint: disable-msg=W0212
355
324
self.assertEqual(self.monitor._general_watchs, {})
356
325
self.assertEqual(self.monitor._ancestors_watchs, {})
367
336
# remove what we added
368
337
self.monitor.rm_watch(path_ancestor)
369
# pylint: disable-msg=W0212
370
338
self.assertEqual(self.monitor._general_watchs, {})
371
339
self.assertEqual(self.monitor._ancestors_watchs, {})
373
def test_has_watch_general(self):
374
"""Test that a general path is watched."""
375
self.assertFalse(self.monitor.has_watch(self.root_dir))
378
self.monitor.add_watch(self.root_dir)
379
self.assertTrue(self.monitor.has_watch(self.root_dir))
382
self.monitor.rm_watch(self.root_dir)
383
self.assertFalse(self.monitor.has_watch(self.root_dir))
385
@defer.inlineCallbacks
386
def test_has_watch_ancestor(self):
387
"""Test that an ancestor path is watched."""
388
path_udf = os.path.join(self.home_dir, "path/to/UDF")
389
yield self._create_udf(path_udf)
390
path_ancestor = os.path.join(self.home_dir, "path")
392
self.assertFalse(self.monitor.has_watch(path_ancestor))
395
# the UDF was already created above; now add the watch on its ancestor
396
self.monitor.add_watch(path_ancestor)
397
self.assertTrue(self.monitor.has_watch(path_ancestor))
400
self.monitor.rm_watch(path_ancestor)
401
self.assertFalse(self.monitor.has_watch(path_ancestor))
405
"""Collect the tests."""
407
if sys.platform == 'linux2':
408
tests = unittest.TestLoader().loadTestsFromName(__name__)
413
if __name__ == "__main__":