~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise-201112142106

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_eventqueue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-08-09 12:47:56 UTC
  • mfrom: (1.1.53 upstream)
  • Revision ID: james.westby@ubuntu.com-20110809124756-7nzilp3oix0a1yl9
Tags: 1.7.1-0ubuntu1
* New upstream release.
* debian/*:
  - Removed obsolete pycompat file
  - Removed ubuntuone-client-gnome deps and binary packaging, as it was
    moved out to separate project upstream.
  - Updated copyright to remove obsolete file reference
* debian/patches:
  - Removed patches which have been included upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
"""Tests for the Event Queue."""
21
21
 
22
22
import logging
23
 
import unittest
24
23
 
25
24
from ubuntuone.syncdaemon import (
26
25
    event_queue,
27
26
    filesystem_manager,
28
27
    tritcask,
29
28
)
30
 
from contrib.testing import testcase
 
29
from contrib.testing.testcase import BaseTwistedTestCase, FakeVolumeManager
31
30
from twisted.internet import defer
32
31
from ubuntuone.devtools.handlers import MementoHandler
33
32
 
34
33
 
35
 
class BaseEQTestCase(testcase.BaseTwistedTestCase):
 
34
class BaseEQTestCase(BaseTwistedTestCase):
36
35
    """ Setup an EQ for test. """
37
36
 
 
37
    @defer.inlineCallbacks
38
38
    def setUp(self):
39
39
        """Setup the test."""
40
 
        testcase.BaseTwistedTestCase.setUp(self)
 
40
        yield super(BaseEQTestCase, self).setUp()
41
41
        self.fsmdir = self.mktemp('fsmdir')
42
42
        self.partials_dir = self.mktemp('partials_dir')
43
43
        self.root_dir = self.mktemp('root_dir')
44
44
        self.home_dir = self.mktemp('home_dir')
45
 
        self.vm = testcase.FakeVolumeManager(self.root_dir)
 
45
        self.vm = FakeVolumeManager(self.root_dir)
46
46
        self.db = tritcask.Tritcask(self.mktemp('tritcask'))
 
47
        self.addCleanup(self.db.shutdown)
47
48
        self.fs = filesystem_manager.FileSystemManager(self.fsmdir,
48
49
                                                       self.partials_dir,
49
50
                                                       self.vm, self.db)
52
53
        self.fs.set_by_path(path=self.root_dir,
53
54
                              local_hash=None, server_hash=None)
54
55
        self.eq = event_queue.EventQueue(self.fs)
 
56
        self.eq.listener_map = {}
 
57
        self.addCleanup(self.eq.shutdown)
55
58
        self.fs.register_eq(self.eq)
56
59
 
57
60
        # add a Memento handler to the logger
59
62
        self.log_handler.setLevel(logging.DEBUG)
60
63
        self.eq.log.addHandler(self.log_handler)
61
64
 
62
 
    def tearDown(self):
63
 
        """Clean up the tests."""
64
 
        self.eq.listener_map = {}
65
 
        self.eq.shutdown()
66
 
        self.db.shutdown()
67
 
        self.rmtree(self.tmpdir)
68
 
        testcase.BaseTwistedTestCase.tearDown(self)
69
 
 
70
65
 
71
66
class SubscriptionTests(BaseEQTestCase):
72
67
    """Test to subscribe and unsubscribe to the EQ."""
384
379
 
385
380
        d.addCallback(callback)
386
381
        return d
387
 
 
388
 
 
389
 
def test_suite():
390
 
    # pylint: disable-msg=C0111
391
 
    return unittest.TestLoader().loadTestsFromName(__name__)
392
 
 
393
 
if __name__ == "__main__":
394
 
    unittest.main()