~0x44/nova/extdoc

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/words/test/test_xishutil.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Test cases for twisted.words.xish.utility
 
6
"""
 
7
 
 
8
from twisted.trial import unittest
 
9
 
 
10
from twisted.python.util import OrderedDict
 
11
from twisted.words.xish import utility
 
12
from twisted.words.xish.domish import Element
 
13
from twisted.words.xish.utility import EventDispatcher
 
14
 
 
15
class CallbackTracker:
 
16
    """
 
17
    Test helper for tracking callbacks.
 
18
 
 
19
    Increases a counter on each call to L{call} and stores the object
 
20
    passed in the call.
 
21
    """
 
22
 
 
23
    def __init__(self):
 
24
        self.called = 0
 
25
        self.obj = None
 
26
 
 
27
 
 
28
    def call(self, obj):
 
29
        self.called = self.called + 1
 
30
        self.obj = obj
 
31
 
 
32
 
 
33
 
 
34
class OrderedCallbackTracker:
 
35
    """
 
36
    Test helper for tracking callbacks and their order.
 
37
    """
 
38
 
 
39
    def __init__(self):
 
40
        self.callList = []
 
41
 
 
42
 
 
43
    def call1(self, object):
 
44
        self.callList.append(self.call1)
 
45
 
 
46
 
 
47
    def call2(self, object):
 
48
        self.callList.append(self.call2)
 
49
 
 
50
 
 
51
    def call3(self, object):
 
52
        self.callList.append(self.call3)
 
53
 
 
54
 
 
55
 
 
56
class EventDispatcherTest(unittest.TestCase):
 
57
    """
 
58
    Tests for L{EventDispatcher}.
 
59
    """
 
60
 
 
61
    def testStuff(self):
 
62
        d = EventDispatcher()
 
63
        cb1 = CallbackTracker()
 
64
        cb2 = CallbackTracker()
 
65
        cb3 = CallbackTracker()
 
66
 
 
67
        d.addObserver("/message/body", cb1.call)
 
68
        d.addObserver("/message", cb1.call)
 
69
        d.addObserver("/presence", cb2.call)
 
70
        d.addObserver("//event/testevent", cb3.call)
 
71
 
 
72
        msg = Element(("ns", "message"))
 
73
        msg.addElement("body")
 
74
 
 
75
        pres = Element(("ns", "presence"))
 
76
        pres.addElement("presence")
 
77
 
 
78
        d.dispatch(msg)
 
79
        self.assertEquals(cb1.called, 2)
 
80
        self.assertEquals(cb1.obj, msg)
 
81
        self.assertEquals(cb2.called, 0)
 
82
 
 
83
        d.dispatch(pres)
 
84
        self.assertEquals(cb1.called, 2)
 
85
        self.assertEquals(cb2.called, 1)
 
86
        self.assertEquals(cb2.obj, pres)
 
87
        self.assertEquals(cb3.called, 0)
 
88
 
 
89
        d.dispatch(d, "//event/testevent")
 
90
        self.assertEquals(cb3.called, 1)
 
91
        self.assertEquals(cb3.obj, d)
 
92
 
 
93
        d.removeObserver("/presence", cb2.call)
 
94
        d.dispatch(pres)
 
95
        self.assertEquals(cb2.called, 1)
 
96
 
 
97
 
 
98
    def test_addObserverTwice(self):
 
99
        """
 
100
        Test adding two observers for the same query.
 
101
 
 
102
        When the event is dispath both of the observers need to be called.
 
103
        """
 
104
        d = EventDispatcher()
 
105
        cb1 = CallbackTracker()
 
106
        cb2 = CallbackTracker()
 
107
 
 
108
        d.addObserver("//event/testevent", cb1.call)
 
109
        d.addObserver("//event/testevent", cb2.call)
 
110
        d.dispatch(d, "//event/testevent")
 
111
 
 
112
        self.assertEquals(cb1.called, 1)
 
113
        self.assertEquals(cb1.obj, d)
 
114
        self.assertEquals(cb2.called, 1)
 
115
        self.assertEquals(cb2.obj, d)
 
116
 
 
117
 
 
118
    def test_addObserverInDispatch(self):
 
119
        """
 
120
        Test for registration of an observer during dispatch.
 
121
        """
 
122
        d = EventDispatcher()
 
123
        msg = Element(("ns", "message"))
 
124
        cb = CallbackTracker()
 
125
 
 
126
        def onMessage(_):
 
127
            d.addObserver("/message", cb.call)
 
128
 
 
129
        d.addOnetimeObserver("/message", onMessage)
 
130
 
 
131
        d.dispatch(msg)
 
132
        self.assertEquals(cb.called, 0)
 
133
 
 
134
        d.dispatch(msg)
 
135
        self.assertEquals(cb.called, 1)
 
136
 
 
137
        d.dispatch(msg)
 
138
        self.assertEquals(cb.called, 2)
 
139
 
 
140
 
 
141
    def test_addOnetimeObserverInDispatch(self):
 
142
        """
 
143
        Test for registration of a onetime observer during dispatch.
 
144
        """
 
145
        d = EventDispatcher()
 
146
        msg = Element(("ns", "message"))
 
147
        cb = CallbackTracker()
 
148
 
 
149
        def onMessage(msg):
 
150
            d.addOnetimeObserver("/message", cb.call)
 
151
 
 
152
        d.addOnetimeObserver("/message", onMessage)
 
153
 
 
154
        d.dispatch(msg)
 
155
        self.assertEquals(cb.called, 0)
 
156
 
 
157
        d.dispatch(msg)
 
158
        self.assertEquals(cb.called, 1)
 
159
 
 
160
        d.dispatch(msg)
 
161
        self.assertEquals(cb.called, 1)
 
162
 
 
163
 
 
164
    def testOnetimeDispatch(self):
 
165
        d = EventDispatcher()
 
166
        msg = Element(("ns", "message"))
 
167
        cb = CallbackTracker()
 
168
 
 
169
        d.addOnetimeObserver("/message", cb.call)
 
170
        d.dispatch(msg)
 
171
        self.assertEquals(cb.called, 1)
 
172
        d.dispatch(msg)
 
173
        self.assertEquals(cb.called, 1)
 
174
 
 
175
 
 
176
    def testDispatcherResult(self):
 
177
        d = EventDispatcher()
 
178
        msg = Element(("ns", "message"))
 
179
        pres = Element(("ns", "presence"))
 
180
        cb = CallbackTracker()
 
181
 
 
182
        d.addObserver("/presence", cb.call)
 
183
        result = d.dispatch(msg)
 
184
        self.assertEquals(False, result)
 
185
 
 
186
        result = d.dispatch(pres)
 
187
        self.assertEquals(True, result)
 
188
 
 
189
 
 
190
    def testOrderedXPathDispatch(self):
 
191
        d = EventDispatcher()
 
192
        cb = OrderedCallbackTracker()
 
193
        d.addObserver("/message/body", cb.call2)
 
194
        d.addObserver("/message", cb.call3, -1)
 
195
        d.addObserver("/message/body", cb.call1, 1)
 
196
 
 
197
        msg = Element(("ns", "message"))
 
198
        msg.addElement("body")
 
199
        d.dispatch(msg)
 
200
        self.assertEquals(cb.callList, [cb.call1, cb.call2, cb.call3],
 
201
                          "Calls out of order: %s" %
 
202
                          repr([c.__name__ for c in cb.callList]))
 
203
 
 
204
 
 
205
    # Observers are put into CallbackLists that are then put into dictionaries
 
206
    # keyed by the event trigger. Upon removal of the last observer for a
 
207
    # particular event trigger, the (now empty) CallbackList and corresponding
 
208
    # event trigger should be removed from those dictionaries to prevent
 
209
    # slowdown and memory leakage.
 
210
 
 
211
    def test_cleanUpRemoveEventObserver(self):
 
212
        """
 
213
        Test observer clean-up after removeObserver for named events.
 
214
        """
 
215
 
 
216
        d = EventDispatcher()
 
217
        cb = CallbackTracker()
 
218
 
 
219
        d.addObserver('//event/test', cb.call)
 
220
        d.dispatch(None, '//event/test')
 
221
        self.assertEqual(1, cb.called)
 
222
        d.removeObserver('//event/test', cb.call)
 
223
        self.assertEqual(0, len(d._eventObservers.pop(0)))
 
224
 
 
225
 
 
226
    def test_cleanUpRemoveXPathObserver(self):
 
227
        """
 
228
        Test observer clean-up after removeObserver for XPath events.
 
229
        """
 
230
 
 
231
        d = EventDispatcher()
 
232
        cb = CallbackTracker()
 
233
        msg = Element((None, "message"))
 
234
 
 
235
        d.addObserver('/message', cb.call)
 
236
        d.dispatch(msg)
 
237
        self.assertEqual(1, cb.called)
 
238
        d.removeObserver('/message', cb.call)
 
239
        self.assertEqual(0, len(d._xpathObservers.pop(0)))
 
240
 
 
241
 
 
242
    def test_cleanUpOnetimeEventObserver(self):
 
243
        """
 
244
        Test observer clean-up after onetime named events.
 
245
        """
 
246
 
 
247
        d = EventDispatcher()
 
248
        cb = CallbackTracker()
 
249
 
 
250
        d.addOnetimeObserver('//event/test', cb.call)
 
251
        d.dispatch(None, '//event/test')
 
252
        self.assertEqual(1, cb.called)
 
253
        self.assertEqual(0, len(d._eventObservers.pop(0)))
 
254
 
 
255
 
 
256
    def test_cleanUpOnetimeXPathObserver(self):
 
257
        """
 
258
        Test observer clean-up after onetime XPath events.
 
259
        """
 
260
 
 
261
        d = EventDispatcher()
 
262
        cb = CallbackTracker()
 
263
        msg = Element((None, "message"))
 
264
 
 
265
        d.addOnetimeObserver('/message', cb.call)
 
266
        d.dispatch(msg)
 
267
        self.assertEqual(1, cb.called)
 
268
        self.assertEqual(0, len(d._xpathObservers.pop(0)))
 
269
 
 
270
 
 
271
    def test_observerRaisingException(self):
 
272
        """
 
273
        Test that exceptions in observers do not bubble up to dispatch.
 
274
 
 
275
        The exceptions raised in observers should be logged and other
 
276
        observers should be called as if nothing happened.
 
277
        """
 
278
 
 
279
        class OrderedCallbackList(utility.CallbackList):
 
280
            def __init__(self):
 
281
                self.callbacks = OrderedDict()
 
282
 
 
283
        class TestError(Exception):
 
284
            pass
 
285
 
 
286
        def raiseError(_):
 
287
            raise TestError()
 
288
 
 
289
        d = EventDispatcher()
 
290
        cb = CallbackTracker()
 
291
 
 
292
        originalCallbackList = utility.CallbackList
 
293
 
 
294
        try:
 
295
            utility.CallbackList = OrderedCallbackList
 
296
 
 
297
            d.addObserver('//event/test', raiseError)
 
298
            d.addObserver('//event/test', cb.call)
 
299
            try:
 
300
                d.dispatch(None, '//event/test')
 
301
            except TestError:
 
302
                self.fail("TestError raised. Should have been logged instead.")
 
303
 
 
304
            self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
 
305
            self.assertEqual(1, cb.called)
 
306
        finally:
 
307
            utility.CallbackList = originalCallbackList
 
308
 
 
309
 
 
310
 
 
311
class XmlPipeTest(unittest.TestCase):
 
312
    """
 
313
    Tests for L{twisted.words.xish.utility.XmlPipe}.
 
314
    """
 
315
 
 
316
    def setUp(self):
 
317
        self.pipe = utility.XmlPipe()
 
318
 
 
319
 
 
320
    def test_sendFromSource(self):
 
321
        """
 
322
        Send an element from the source and observe it from the sink.
 
323
        """
 
324
        def cb(obj):
 
325
            called.append(obj)
 
326
 
 
327
        called = []
 
328
        self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
 
329
        element = Element(('testns', 'test'))
 
330
        self.pipe.source.send(element)
 
331
        self.assertEquals([element], called)
 
332
 
 
333
 
 
334
    def test_sendFromSink(self):
 
335
        """
 
336
        Send an element from the sink and observe it from the source.
 
337
        """
 
338
        def cb(obj):
 
339
            called.append(obj)
 
340
 
 
341
        called = []
 
342
        self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
 
343
        element = Element(('testns', 'test'))
 
344
        self.pipe.sink.send(element)
 
345
        self.assertEquals([element], called)