1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Test cases for twisted.words.xish.utility
8
from twisted.trial import unittest
10
from twisted.python.util import OrderedDict
11
from twisted.words.xish import utility
12
from twisted.words.xish.domish import Element
13
from twisted.words.xish.utility import EventDispatcher
15
class CallbackTracker:
17
Test helper for tracking callbacks.
19
Increases a counter on each call to L{call} and stores the object
29
# Bump the invocation counter by one.
self.called = self.called + 1
34
class OrderedCallbackTracker:
36
Test helper for tracking callbacks and their order.
43
# Each callN appends its own bound method to self.callList, so the list
# records which callbacks ran and in what order.
# NOTE(review): the parameter name `object` shadows the builtin; it is not
# used in the visible method bodies.
def call1(self, object):
44
self.callList.append(self.call1)
47
def call2(self, object):
48
self.callList.append(self.call2)
51
def call3(self, object):
52
self.callList.append(self.call3)
56
class EventDispatcherTest(unittest.TestCase):
58
Tests for L{EventDispatcher}.
63
# One tracker per kind of observer exercised below.
cb1 = CallbackTracker()
64
cb2 = CallbackTracker()
65
cb3 = CallbackTracker()
67
# cb1 is registered under two different queries; the assertions below
# expect it to have been called twice.
d.addObserver("/message/body", cb1.call)
68
d.addObserver("/message", cb1.call)
69
d.addObserver("/presence", cb2.call)
70
# Queries of this form are dispatched further down by passing the same
# string as the second argument to dispatch().
d.addObserver("//event/testevent", cb3.call)
72
# A message element with a body child.
msg = Element(("ns", "message"))
73
msg.addElement("body")
75
pres = Element(("ns", "presence"))
76
pres.addElement("presence")
79
# Expected state for the message element: only cb1 was called, and it
# stored msg. (The dispatch call itself is not among the visible lines.)
self.assertEquals(cb1.called, 2)
80
self.assertEquals(cb1.obj, msg)
81
self.assertEquals(cb2.called, 0)
84
# Expected state for the presence element: cb1 is unchanged, cb2 was called
# once with pres, and cb3 has not been called yet.
self.assertEquals(cb1.called, 2)
85
self.assertEquals(cb2.called, 1)
86
self.assertEquals(cb2.obj, pres)
87
self.assertEquals(cb3.called, 0)
89
# Dispatch by event name: the object passed along is the dispatcher itself.
d.dispatch(d, "//event/testevent")
90
self.assertEquals(cb3.called, 1)
91
self.assertEquals(cb3.obj, d)
93
# After removing the /presence observer, cb2's count is expected to stay
# at 1.
d.removeObserver("/presence", cb2.call)
95
self.assertEquals(cb2.called, 1)
98
def test_addObserverTwice(self):
100
Test adding two observers for the same query.
102
When the event is dispatched both of the observers need to be called.
104
d = EventDispatcher()
105
cb1 = CallbackTracker()
106
cb2 = CallbackTracker()
108
# Both trackers observe the same named event.
d.addObserver("//event/testevent", cb1.call)
109
d.addObserver("//event/testevent", cb2.call)
110
# A single dispatch, passing the dispatcher itself as the event object.
d.dispatch(d, "//event/testevent")
112
# Each tracker must have been called exactly once and have stored d.
self.assertEquals(cb1.called, 1)
113
self.assertEquals(cb1.obj, d)
114
self.assertEquals(cb2.called, 1)
115
self.assertEquals(cb2.obj, d)
118
def test_addObserverInDispatch(self):
120
Test for registration of an observer during dispatch.
122
d = EventDispatcher()
123
msg = Element(("ns", "message"))
124
cb = CallbackTracker()
127
# NOTE(review): given the test's purpose, this call presumably lives inside
# the onMessage helper, whose definition is not among the visible lines --
# TODO confirm.
d.addObserver("/message", cb.call)
129
d.addOnetimeObserver("/message", onMessage)
132
# Expected progression of cb.called: 0, then 1, then 2. The dispatch calls
# between these assertions are not among the visible lines.
self.assertEquals(cb.called, 0)
135
self.assertEquals(cb.called, 1)
138
self.assertEquals(cb.called, 2)
141
def test_addOnetimeObserverInDispatch(self):
143
Test for registration of a onetime observer during dispatch.
145
d = EventDispatcher()
146
msg = Element(("ns", "message"))
147
cb = CallbackTracker()
150
# NOTE(review): given the test's purpose, this call presumably lives inside
# the onMessage helper, whose definition is not among the visible lines --
# TODO confirm.
d.addOnetimeObserver("/message", cb.call)
152
d.addOnetimeObserver("/message", onMessage)
155
# Expected progression of cb.called: 0, then 1, then still 1, i.e. the
# count must not grow between the last two checks.
self.assertEquals(cb.called, 0)
158
self.assertEquals(cb.called, 1)
161
self.assertEquals(cb.called, 1)
164
# Checks a onetime observer: cb.called is asserted to be 1 twice in a row,
# so the count must not grow between the two checks. (The dispatch calls
# are not among the visible lines.)
def testOnetimeDispatch(self):
165
d = EventDispatcher()
166
msg = Element(("ns", "message"))
167
cb = CallbackTracker()
169
d.addOnetimeObserver("/message", cb.call)
171
self.assertEquals(cb.called, 1)
173
self.assertEquals(cb.called, 1)
176
# Checks the return value of dispatch(): False is expected for an element
# with no observer added, True for one that has an observer.
def testDispatcherResult(self):
177
d = EventDispatcher()
178
msg = Element(("ns", "message"))
179
pres = Element(("ns", "presence"))
180
cb = CallbackTracker()
182
# Only /presence gets an observer; nothing is added for the message.
d.addObserver("/presence", cb.call)
183
result = d.dispatch(msg)
184
self.assertEquals(False, result)
186
result = d.dispatch(pres)
187
self.assertEquals(True, result)
190
# Checks the order in which observers for one element are called.
def testOrderedXPathDispatch(self):
191
d = EventDispatcher()
192
cb = OrderedCallbackTracker()
193
# The optional third addObserver argument is presumably a priority: the
# expected order below is call1 (1), call2 (no argument), call3 (-1) --
# TODO confirm against EventDispatcher.addObserver.
d.addObserver("/message/body", cb.call2)
194
d.addObserver("/message", cb.call3, -1)
195
d.addObserver("/message/body", cb.call1, 1)
197
msg = Element(("ns", "message"))
198
msg.addElement("body")
200
# On failure, the message lists the callback names in the order actually
# recorded.
self.assertEquals(cb.callList, [cb.call1, cb.call2, cb.call3],
201
"Calls out of order: %s" %
202
repr([c.__name__ for c in cb.callList]))
205
# Observers are put into CallbackLists that are then put into dictionaries
206
# keyed by the event trigger. Upon removal of the last observer for a
207
# particular event trigger, the (now empty) CallbackList and corresponding
208
# event trigger should be removed from those dictionaries to prevent
209
# slowdown and memory leakage.
211
def test_cleanUpRemoveEventObserver(self):
213
Test observer clean-up after removeObserver for named events.
216
d = EventDispatcher()
217
cb = CallbackTracker()
219
d.addObserver('//event/test', cb.call)
220
# Dispatch once first and confirm the observer was called.
d.dispatch(None, '//event/test')
221
self.assertEqual(1, cb.called)
222
d.removeObserver('//event/test', cb.call)
223
# Inspects private dispatcher state: the container popped from
# _eventObservers must be empty once the only observer is removed.
# pop(0) also removes that entry from d (presumably keyed by priority --
# TODO confirm).
self.assertEqual(0, len(d._eventObservers.pop(0)))
226
def test_cleanUpRemoveXPathObserver(self):
228
Test observer clean-up after removeObserver for XPath events.
231
d = EventDispatcher()
232
cb = CallbackTracker()
233
# The message element is created without a namespace (None).
msg = Element((None, "message"))
235
d.addObserver('/message', cb.call)
237
# The observer must have been called once before it is removed. (The
# dispatch call is not among the visible lines.)
self.assertEqual(1, cb.called)
238
d.removeObserver('/message', cb.call)
239
# Inspects private dispatcher state: the container popped from
# _xpathObservers must be empty once the only observer is removed.
# pop(0) also removes that entry from d.
self.assertEqual(0, len(d._xpathObservers.pop(0)))
242
def test_cleanUpOnetimeEventObserver(self):
244
Test observer clean-up after onetime named events.
247
d = EventDispatcher()
248
cb = CallbackTracker()
250
d.addOnetimeObserver('//event/test', cb.call)
251
d.dispatch(None, '//event/test')
252
self.assertEqual(1, cb.called)
253
# No removeObserver call here: after the single dispatch, the container
# popped from the private _eventObservers must already be empty.
self.assertEqual(0, len(d._eventObservers.pop(0)))
256
def test_cleanUpOnetimeXPathObserver(self):
258
Test observer clean-up after onetime XPath events.
261
d = EventDispatcher()
262
cb = CallbackTracker()
263
# The message element is created without a namespace (None).
msg = Element((None, "message"))
265
d.addOnetimeObserver('/message', cb.call)
267
# The onetime observer must have been called once. (The dispatch call is
# not among the visible lines.)
self.assertEqual(1, cb.called)
268
# No removeObserver call here: the container popped from the private
# _xpathObservers must already be empty.
self.assertEqual(0, len(d._xpathObservers.pop(0)))
271
def test_observerRaisingException(self):
273
Test that exceptions in observers do not bubble up to dispatch.
275
The exceptions raised in observers should be logged and other
276
observers should be called as if nothing happened.
279
# CallbackList variant that keeps its callbacks in an OrderedDict --
# presumably so the two observers run in registration order; TODO confirm.
class OrderedCallbackList(utility.CallbackList):
281
self.callbacks = OrderedDict()
283
# Dedicated exception type, so only this test's logged error is flushed
# further down.
class TestError(Exception):
289
d = EventDispatcher()
290
cb = CallbackTracker()
292
# Keep the real class so it can be put back at the end of the test.
originalCallbackList = utility.CallbackList
295
# Replace the module-level utility.CallbackList for the rest of the test.
utility.CallbackList = OrderedCallbackList
297
# raiseError is not defined in the visible lines; presumably it raises
# TestError -- TODO confirm.
d.addObserver('//event/test', raiseError)
298
d.addObserver('//event/test', cb.call)
300
d.dispatch(None, '//event/test')
302
# NOTE(review): presumably inside an except clause for TestError that is
# not among the visible lines.
self.fail("TestError raised. Should have been logged instead.")
304
# Exactly one TestError must have been logged, and the second observer
# must still have run once.
self.assertEqual(1, len(self.flushLoggedErrors(TestError)))
305
self.assertEqual(1, cb.called)
307
# Put the original class back on the utility module.
utility.CallbackList = originalCallbackList
311
class XmlPipeTest(unittest.TestCase):
313
Tests for L{twisted.words.xish.utility.XmlPipe}.
317
# Stores a new XmlPipe on the test case as self.pipe (presumably from
# setUp, whose def line is not among the visible lines).
self.pipe = utility.XmlPipe()
320
def test_sendFromSource(self):
322
Send an element from the source and observe it from the sink.
328
# cb and called are defined in lines not among the visible ones; presumably
# cb appends each observed element to the called list -- TODO confirm.
self.pipe.sink.addObserver('/test[@xmlns="testns"]', cb)
329
element = Element(('testns', 'test'))
330
self.pipe.source.send(element)
331
# called must hold exactly the one element that was sent.
self.assertEquals([element], called)
334
def test_sendFromSink(self):
336
Send an element from the sink and observe it from the source.
342
# cb and called are defined in lines not among the visible ones; presumably
# cb appends each observed element to the called list -- TODO confirm.
self.pipe.source.addObserver('/test[@xmlns="testns"]', cb)
343
element = Element(('testns', 'test'))
344
self.pipe.sink.send(element)
345
# called must hold exactly the one element that was sent.
self.assertEquals([element], called)