1
# -*- test-case-name: twisted.words.test.test_xishutil -*-
3
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
Event Dispatching and Callback utilities.
10
from twisted.python import log
11
from twisted.words.xish import xpath
13
class _MethodWrapper(object):
15
Internal class for tracking method calls.
17
def __init__(self, method, *args, **kwargs):
23
def __call__(self, *args, **kwargs):
24
nargs = self.args + args
25
nkwargs = self.kwargs.copy()
26
nkwargs.update(kwargs)
27
self.method(*nargs, **nkwargs)
33
Container for callbacks.
35
Event queries are linked to lists of callables. When a matching event
36
occurs, these callables are called in sequence. One-time callbacks
37
are removed from the list after the first time the event was triggered.
39
Arguments to callbacks are split spread across two sets. The first set,
40
callback specific, is passed to C{addCallback} and is used for all
41
subsequent event triggers. The second set is passed to C{callback} and is
42
event specific. Positional arguments in the second set come after the
43
positional arguments of the first set. Keyword arguments in the second set
44
override those in the first set.
46
@ivar callbacks: The registered callbacks as mapping from the callable to a
47
tuple of a wrapper for that callable that keeps the
48
callback specific arguments and a boolean that signifies
49
if it is to be called only once.
50
@type callbacks: C{dict}
57
def addCallback(self, onetime, method, *args, **kwargs):
61
The arguments passed are used as callback specific arguments.
63
@param onetime: If C{True}, this callback is called at most once.
64
@type onetime: C{bool}
65
@param method: The callback callable to be added.
66
@param args: Positional arguments to the callable.
68
@param kwargs: Keyword arguments to the callable.
72
if not method in self.callbacks:
73
self.callbacks[method] = (_MethodWrapper(method, *args, **kwargs),
77
def removeCallback(self, method):
81
@param method: The callable to be removed.
84
if method in self.callbacks:
85
del self.callbacks[method]
88
def callback(self, *args, **kwargs):
90
Call all registered callbacks.
92
The passed arguments are event specific and augment and override
93
the callback specific arguments as described above.
95
@note: Exceptions raised by callbacks are trapped and logged. They will
96
not propagate up to make sure other callbacks will still be
97
called, and the event dispatching allways succeeds.
99
@param args: Positional arguments to the callable.
101
@param kwargs: Keyword arguments to the callable.
102
@type kwargs: C{dict}
105
for key, (methodwrapper, onetime) in self.callbacks.items():
107
methodwrapper(*args, **kwargs)
112
del self.callbacks[key]
117
Return if list of registered callbacks is empty.
122
return len(self.callbacks) == 0
126
class EventDispatcher:
128
Event dispatching service.
130
The C{EventDispatcher} allows observers to be registered for certain events
131
that are dispatched. There are two types of events: XPath events and Named
134
Every dispatch is triggered by calling L{dispatch} with a data object and,
135
for named events, the name of the event.
137
When an XPath type event is dispatched, the associated object is assumed to
138
be an L{Element<twisted.words.xish.domish.Element>} instance, which is
139
matched against all registered XPath queries. For every match, the
140
respective observer will be called with the data object.
142
A named event will simply call each registered observer for that particular
143
event name, with the data object. Unlike XPath type events, the data object
144
is not restricted to L{Element<twisted.words.xish.domish.Element>}, but can
147
When registering observers, the event that is to be observed is specified
148
using an L{xpath.XPathQuery} instance or a string. In the latter case, the
149
string can also contain the string representation of an XPath expression.
150
To distinguish these from named events, each named event should start with
151
a special prefix that is stored in C{self.prefix}. It defaults to
154
Observers registered using L{addObserver} are persistent: after the
155
observer has been triggered by a dispatch, it remains registered for a
156
possible next dispatch. If instead L{addOnetimeObserver} was used to
157
observe an event, the observer is removed from the list of observers after
158
the first observed event.
160
Obsevers can also prioritized, by providing an optional C{priority}
161
parameter to the L{addObserver} and L{addOnetimeObserver} methods. Higher
162
priority observers are then called before lower priority observers.
164
Finally, observers can be unregistered by using L{removeObserver}.
167
def __init__(self, eventprefix="//event/"):
168
self.prefix = eventprefix
169
self._eventObservers = {}
170
self._xpathObservers = {}
171
self._dispatchDepth = 0 # Flag indicating levels of dispatching
173
self._updateQueue = [] # Queued updates for observer ops
176
def _getEventAndObservers(self, event):
177
if isinstance(event, xpath.XPathQuery):
179
observers = self._xpathObservers
181
if self.prefix == event[:len(self.prefix)]:
183
observers = self._eventObservers
186
event = xpath.internQuery(event)
187
observers = self._xpathObservers
189
return event, observers
192
def addOnetimeObserver(self, event, observerfn, priority=0, *args, **kwargs):
194
Register a one-time observer for an event.
196
Like L{addObserver}, but is only triggered at most once. See there
197
for a description of the parameters.
199
self._addObserver(True, event, observerfn, priority, *args, **kwargs)
202
def addObserver(self, event, observerfn, priority=0, *args, **kwargs):
204
Register an observer for an event.
206
Each observer will be registered with a certain priority. Higher
207
priority observers get called before lower priority observers.
209
@param event: Name or XPath query for the event to be monitored.
210
@type event: C{str} or L{xpath.XPathQuery}.
211
@param observerfn: Function to be called when the specified event
212
has been triggered. This callable takes
213
one parameter: the data object that triggered
214
the event. When specified, the C{*args} and
215
C{**kwargs} parameters to addObserver are being used
216
as additional parameters to the registered observer
218
@param priority: (Optional) priority of this observer in relation to
219
other observer that match the same event. Defaults to
221
@type priority: C{int}
223
self._addObserver(False, event, observerfn, priority, *args, **kwargs)
226
def _addObserver(self, onetime, event, observerfn, priority, *args, **kwargs):
227
# If this is happening in the middle of the dispatch, queue
228
# it up for processing after the dispatch completes
229
if self._dispatchDepth > 0:
230
self._updateQueue.append(lambda:self._addObserver(onetime, event, observerfn, priority, *args, **kwargs))
233
event, observers = self._getEventAndObservers(event)
235
if priority not in observers:
237
observers[priority] = {event: cbl}
239
priorityObservers = observers[priority]
240
if event not in priorityObservers:
242
observers[priority][event] = cbl
244
cbl = priorityObservers[event]
246
cbl.addCallback(onetime, observerfn, *args, **kwargs)
249
def removeObserver(self, event, observerfn):
251
Remove callable as observer for an event.
253
The observer callable is removed for all priority levels for the
256
@param event: Event for which the observer callable was registered.
257
@type event: C{str} or L{xpath.XPathQuery}
258
@param observerfn: Observer callable to be unregistered.
261
# If this is happening in the middle of the dispatch, queue
262
# it up for processing after the dispatch completes
263
if self._dispatchDepth > 0:
264
self._updateQueue.append(lambda:self.removeObserver(event, observerfn))
267
event, observers = self._getEventAndObservers(event)
270
for priority, priorityObservers in observers.iteritems():
271
for query, callbacklist in priorityObservers.iteritems():
273
callbacklist.removeCallback(observerfn)
274
if callbacklist.isEmpty():
275
emptyLists.append((priority, query))
277
for priority, query in emptyLists:
278
del observers[priority][query]
281
def dispatch(self, obj, event=None):
285
When C{event} is C{None}, an XPath type event is triggered, and
286
C{obj} is assumed to be an instance of
287
L{Element<twisted.words.xish.domish.Element>}. Otherwise, C{event}
288
holds the name of the named event being triggered. In the latter case,
289
C{obj} can be anything.
291
@param obj: The object to be dispatched.
292
@param event: Optional event name.
298
self._dispatchDepth += 1
302
observers = self._eventObservers
303
match = lambda query, obj: query == event
306
observers = self._xpathObservers
307
match = lambda query, obj: query.matches(obj)
309
priorities = observers.keys()
314
for priority in priorities:
315
for query, callbacklist in observers[priority].iteritems():
316
if match(query, obj):
317
callbacklist.callback(obj)
319
if callbacklist.isEmpty():
320
emptyLists.append((priority, query))
322
for priority, query in emptyLists:
323
del observers[priority][query]
325
self._dispatchDepth -= 1
327
# If this is a dispatch within a dispatch, don't
328
# do anything with the updateQueue -- it needs to
329
# wait until we've back all the way out of the stack
330
if self._dispatchDepth == 0:
331
# Deal with pending update operations
332
for f in self._updateQueue:
334
self._updateQueue = []
340
class XmlPipe(object):
344
Connects two objects that communicate stanzas through an XML stream like
345
interface. Each of the ends of the pipe (sink and source) can be used to
346
send XML stanzas to the other side, or add observers to process XML stanzas
347
that were sent from the other side.
349
XML pipes are usually used in place of regular XML streams that are
350
transported over TCP. This is the reason for the use of the names source
351
and sink for both ends of the pipe. The source side corresponds with the
352
entity that initiated the TCP connection, whereas the sink corresponds with
353
the entity that accepts that connection. In this object, though, the source
354
and sink are treated equally.
357
L{XmlStream<twisted.words.protocols.jabber.xmlstream.XmlStream>}s, the sink
358
and source objects are assumed to represent an eternal connected and
359
initialized XML stream. As such, events corresponding to connection,
360
disconnection, initialization and stream errors are not dispatched or
364
@ivar source: Source XML stream.
365
@ivar sink: Sink XML stream.
369
self.source = EventDispatcher()
370
self.sink = EventDispatcher()
371
self.source.send = lambda obj: self.sink.dispatch(obj)
372
self.sink.send = lambda obj: self.source.dispatch(obj)