17
17
__maintainer2__ = 'Florian Boucault <florian@fluendo.com>'
22
20
from elisa.core import log
23
21
from elisa.core.components.message import Message
24
from elisa.core.utils import threadsafe_list
25
from twisted.internet import reactor
22
from elisa.core.utils import threadsafe_list, defer
23
from twisted.internet import reactor, task
27
25
class Bus(log.Loggable):
70
68
# callbacks dictionary mapping callables to Message type lists
71
69
self._callbacks = {}
73
# threading lock for operations on self._callbacks
74
self._callbacks_lock = threading.Lock()
76
def _dispatch_queued_msgs(self):
77
# dispatch pending messages
79
message, sender = self._queue.pop()
80
reactor.callFromThread(self._dispatch, message, sender)
71
def deferred_dispatch(self):
73
Dequeue and dispatch each queued message. This method is
74
called when the bus starts, to dispatch all the messages that
75
were sent before startup.
77
This method can also be used to force dispatch on a bus you
78
don't want to start, for instance for functional testing purposes.
80
@returns: a deferred fired when all messages have been dispatched
81
@rtype: L{twisted.internet.defer.Deferred}
85
message, sender = self._queue.pop()
86
yield self._dispatch(message, sender)
88
return task.coiterate(iterate())
84
92
Start message dispatching: once started, messages sent over the bus
85
93
are guaranteed to be dispatched.
87
Queued messages are dispatched inside a separate thread.
95
@returns: a deferred fired when all queued messages have been
97
@rtype: L{elisa.core.utils.defer.Deferred}
89
99
self.info("Starting")
90
100
self._running = True
91
self._dispatch_queued_msgs()
101
return self.deferred_dispatch()
98
108
self.info("Stopping")
99
109
self._running = False
101
111
def send_message(self, message, sender=None):
102
112
"""Send a message over the bus. The message is automatically
103
dispatched (inside a thread) if the L{Bus} is
104
running. Otherwise the message is locally queued until the
113
dispatched if the L{Bus} is running. Otherwise the message is
114
locally queued until the L{Bus} starts.
109
116
@param message: the message to send
110
117
@type message: L{elisa.core.components.message.Message}
111
118
@param sender: the sender object. None by default. Will be passed to
112
119
receiver callbacks.
113
120
@type sender: object
121
@returns: a deferred fired when the message has been
123
@rtype: L{elisa.core.utils.defer.Deferred}
115
125
assert isinstance(message, Message), message
116
126
self.debug("Sending message %r", message)
118
128
if self._running:
119
reactor.callFromThread(self._dispatch, message, sender)
129
dfr = self._dispatch(message, sender)
121
self._queue.append((message, sender))
131
dfr = defer.maybeDeferred(self._queue.append, (message, sender))
123
134
def register(self, callback, *message_types):
125
136
Register a new callback with the bus. The given callback will be
126
137
called when a message of one of the given types is dispatched on the
131
140
@param callback: the callback to register
132
141
@type callback: callable
133
142
@param message_types: Message types to filter on
139
148
self.debug("Registering callback %r for %r message types",
140
149
callback, message_types)
143
self._callbacks_lock.acquire()
144
self._callbacks[callback] = message_types
146
self._callbacks_lock.release()
151
self._callbacks[callback] = message_types
148
153
def unregister(self, callback):
149
154
"""Unregister a callback from the bus.
153
156
@param callback: the callback to unregister
154
157
@type callback: callable
157
self._callbacks_lock.acquire()
158
if callback in self._callbacks:
159
del self._callbacks[callback]
161
self._callbacks_lock.release()
159
if callback in self._callbacks:
160
del self._callbacks[callback]
163
162
def _dispatch(self, message, sender):
164
"""Dispatch messages to registered callbacks and empty the
163
"""Dispatch one message to registered callbacks.
165
@param message: the message to dispatch
166
@type message: L{elisa.core.components.message.Message}
167
@param sender: the sender object. None by default. Will be passed to
170
@returns: a deferred fired when the message has been
172
@rtype: L{twisted.internet.defer.Deferred}
169
self._callbacks_lock.acquire()
175
def got_failure(failure, message, cb_name):
176
self.warning("Exception happened during dispatch of %r to %s: %s",
177
message, cb_name, failure.getTraceback())
170
181
for callback, mfilter in self._callbacks.iteritems():
171
182
if isinstance(message, mfilter):
174
185
callback.__name__)
175
186
except AttributeError:
176
187
cb_name = callback.__name__
177
self.debug("Dispatching message %r to %r", message, cb_name)
178
callback(message, sender)
188
self.debug("Dispatching message %r to %r", message,
179
190
dispatched = True
181
self._callbacks_lock.release()
184
self.debug("Undispatched message: %r", message)
191
dfr = defer.maybeDeferred(callback, message, sender)
192
dfr.addErrback(got_failure, message, cb_name)
196
self.debug("Undispatched message: %r", message)
198
return task.coiterate(iterate())
187
200
def bus_listener(bus, *message_types):
188
201
""" Utility decorator to simply register a function or method on