~ubuntu-branches/ubuntu/precise/ubuntuone-client/precise-201112142106

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_eventsnanny.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2011-08-09 12:47:56 UTC
  • mfrom: (1.1.53 upstream)
  • Revision ID: james.westby@ubuntu.com-20110809124756-7nzilp3oix0a1yl9
Tags: 1.7.1-0ubuntu1
* New upstream release.
* debian/*:
  - Removed obsolete pycompat file
  - Removed ubuntuone-client-gnome deps and binary packaging, as it was
    moved out to separate project upstream.
  - Updated copyright to remove obsolete file reference
* debian/patches:
  - Removed patches which have been included upstream.

Show diffs side-by-side

added added

removed removed

Lines of Context:
20
20
 
21
21
import inspect
22
22
import os
23
 
import shutil
24
23
import unittest
25
24
 
26
25
from twisted.internet import defer, reactor
29
28
from ubuntuone.syncdaemon import (filesystem_manager, event_queue,
30
29
                                  events_nanny, hash_queue, tritcask)
31
30
 
32
 
TESTS_DIR = os.path.join(os.getcwd(), "tmp")
33
 
 
34
31
 
35
32
class EventListener(object):
36
33
    """Store the events."""
63
60
    """Test the AQ Download Finished Nanny behaviour."""
64
61
    timeout = 2
65
62
 
 
63
    @defer.inlineCallbacks
66
64
    def setUp(self):
67
65
        """set up the test."""
68
 
        testcase.BaseTwistedTestCase.setUp(self)
69
 
        try:
70
 
            os.mkdir(TESTS_DIR)
71
 
        except OSError:
72
 
            # already there, remove it to clean and create again
73
 
            shutil.rmtree(TESTS_DIR)
74
 
            os.mkdir(TESTS_DIR)
75
 
        self.usrdir = os.path.join(TESTS_DIR, "usrdir")
76
 
        self.partials_dir = os.path.join(TESTS_DIR, "partials")
 
66
        yield super(DownloadFinishedTests, self).setUp()
 
67
        self.usrdir = self.mktemp("usrdir")
 
68
        self.partials_dir = self.mktemp("partials")
77
69
 
78
70
        # hack hash queue
79
71
        self._original_hash_run = hash_queue._Hasher.run
97
89
        fsm.create(self.tf, "")
98
90
        fsm.set_node_id(self.tf, "nodeid")
99
91
 
 
92
    @defer.inlineCallbacks
100
93
    def tearDown(self):
101
94
        """tear down the test."""
102
95
        hash_queue._Hasher.run = self._original_hash_run
103
96
        self.hq.shutdown()
104
 
        testcase.BaseTwistedTestCase.tearDown(self)
 
97
        yield super(DownloadFinishedTests, self).tearDown()
105
98
 
106
99
    def insert_in_hq(self, path, node_id):
107
100
        """Inserts something in HQ and waits that thread."""
862
855
                defined_args = inspect.getargspec(meth)[0]
863
856
                self.assertEqual(defined_args[0], 'self')
864
857
                self.assertEqual(set(defined_args[1:]), set(evtargs))
865
 
 
866
 
 
867
 
def test_suite():
868
 
    # pylint: disable-msg=C0111
869
 
    return unittest.TestLoader().loadTestsFromName(__name__)
870
 
 
871
 
if __name__ == "__main__":
872
 
    unittest.main()