~rmcbride/ubuntu/lucid/ubuntuone-client/fixucg

« back to all changes in this revision

Viewing changes to tests/syncdaemon/test_eventqueue.py

  • Committer: Bazaar Package Importer
  • Author(s): Rodney Dawes
  • Date: 2009-07-16 17:00:00 UTC
  • mto: (17.1.1 ubuntuone-client)
  • mto: This revision was merged to the branch mainline in revision 3.
  • Revision ID: james.westby@ubuntu.com-20090716170000-tnnk149g57bxxo24
Tags: upstream-0.90.4
ImportĀ upstreamĀ versionĀ 0.90.4

Show diffs side-by-side

added added

removed removed

Lines of Context:
40
40
        self.root_dir = self.mktemp('root_dir')
41
41
        self.fs = filesystem_manager.FileSystemManager(self.fsmdir,
42
42
                                   testcase.FakeVolumeManager(self.root_dir))
43
 
        mdobj = self.fs.create(path=self.root_dir,
44
 
                                 share_id='', is_dir=True)
 
43
        self.fs.create(path=self.root_dir,
 
44
                       share_id='', is_dir=True)
45
45
        self.fs.set_by_path(path=self.root_dir,
46
46
                              local_hash=None, server_hash=None)
47
47
        self.eq = event_queue.EventQueue(self.fs)
57
57
 
58
58
    def test_subscription(self):
59
59
        '''Test that subscription works.'''
 
60
        # pylint: disable-msg=W0212
60
61
        # simple one
61
62
        o = object()
62
63
        self.eq.subscribe(o)
70
71
 
71
72
    def test_unsubscription(self):
72
73
        '''Test that unsubscription works.'''
 
74
        # pylint: disable-msg=W0212
73
75
        # simple one
74
76
        o = object()
75
77
        self.eq.subscribe(o)
128
130
        '''Push events and listem them.'''
129
131
 
130
132
        # helper class, pylint: disable-msg=C0111
131
 
        class C(object):
 
133
        class Create(object):
132
134
            def __init__(self):
133
135
                self.a = None
134
136
            def handle_FS_FILE_CREATE(self, a):
135
137
                self.a = a
136
138
 
137
139
        # it get passed!
138
 
        c = C()
 
140
        c = Create()
139
141
        self.eq.subscribe(c)
140
142
        self.eq.push("FS_FILE_CREATE", 1)
141
143
        self.assertEqual(c.a, 1)
142
144
        self.eq.unsubscribe(c)
143
145
 
144
146
        # don't get what don't listen
145
 
        c = C()
 
147
        c = Create()
146
148
        self.eq.subscribe(c)
147
149
        self.eq.push("FS_FILE_DELETE", 1)
148
150
        self.assertEqual(c.a, None)
152
154
        '''Check that the handle signatures are forced when passing.'''
153
155
 
154
156
        # helper class, pylint: disable-msg=C0111
155
 
        class C(object):
 
157
        class Create(object):
156
158
            def handle_FS_FILE_CREATE(self, a, b): # it should be only 'a' here
157
159
                pass
158
160
 
159
161
        # it get passed!
160
 
        c = C()
 
162
        c = Create()
161
163
        self.eq.subscribe(c)
162
164
 
163
165
        # caught by EQ
173
175
        self.eq.push("FS_FILE_CREATE", 1)
174
176
        self.assertEquals(1, len(handler.records))
175
177
        self.assertEquals('FS_FILE_CREATE', handler.records[0].args[0])
176
 
        self.assertEquals(C, type(handler.records[0].args[1]))
 
178
        self.assertEquals(Create, type(handler.records[0].args[1]))
177
179
 
178
180
        self.eq.log.removeHandler(handler)
179
181
        self.eq.unsubscribe(c)
270
272
 
271
273
        # create 10 listeners in order to create an event madness
272
274
        listeners = []
 
275
        # pylint: disable-msg=W0612
273
276
        for i in xrange(0, 10):
274
277
            l = Listener(self.eq)
275
278
            listeners.append(l)