~moovida-developers/moovida/account_video_intro

« back to all changes in this revision

Viewing changes to elisa-core/elisa/core/bus.py

  • Committer: Philippe Normand
  • Date: 2009-08-19 11:53:04 UTC
  • mfrom: (1485.2.3 moovida_bus)
  • Revision ID: philippe@fluendo.com-20090819115304-xybmptvgvw648u5a
reworked the message bus to dispatch messages using task coiterators

Show diffs side-by-side

added added

removed removed

Lines of Context:
17
17
__maintainer2__ = 'Florian Boucault <florian@fluendo.com>'
18
18
 
19
19
 
20
 
import threading
21
 
 
22
20
from elisa.core import log
23
21
from elisa.core.components.message import Message
24
 
from elisa.core.utils import threadsafe_list
25
 
from twisted.internet import reactor
 
22
from elisa.core.utils import threadsafe_list, defer
 
23
from twisted.internet import reactor, task
26
24
 
27
25
class Bus(log.Loggable):
28
26
    """
70
68
        # callbacks dictionary mapping callables to Message type lists
71
69
        self._callbacks = {}
72
70
 
73
 
        # threading lock for operations on self._callbacks
74
 
        self._callbacks_lock = threading.Lock()
75
 
 
76
 
    def _dispatch_queued_msgs(self):
77
 
        # dispatch pending messages
78
 
        while self._queue:
79
 
            message, sender = self._queue.pop()
80
 
            reactor.callFromThread(self._dispatch, message, sender)
 
71
    def deferred_dispatch(self):
 
72
        """
 
73
        Dequeue and dispatch each queued message. This method is
 
74
        called when the bus starts, to dispatch all the messages that
 
75
        were sent before startup.
 
76
 
 
77
        This method can also be used to force dispatch on a bus you
 
78
        don't want to start, for instance for functional test purpose.
 
79
 
 
80
        @returns: a deferred fired when all messages have been dispatched
 
81
        @rtype:   L{twisted.internet.defer.Deferred}
 
82
        """
 
83
        def iterate():
 
84
            while self._queue:
 
85
                message, sender = self._queue.pop()
 
86
                yield self._dispatch(message, sender)
 
87
 
 
88
        return task.coiterate(iterate())
81
89
 
82
90
    def start(self):
83
91
        """
84
92
        Start message dispatching: once started, messages sent over the bus
85
93
        are guaranteed to be dispatched.
86
94
 
87
 
        Queued messages are dispatched inside a separate thread.
 
95
        @returns: a deferred fired when all queued messages have been
 
96
                  dispatched
 
97
        @rtype:   L{elisa.core.utils.defer.Deferred}
88
98
        """
89
99
        self.info("Starting")
90
100
        self._running = True
91
 
        self._dispatch_queued_msgs()
 
101
        return self.deferred_dispatch()
92
102
 
93
103
    def stop(self):
94
104
        """
97
107
        """
98
108
        self.info("Stopping")
99
109
        self._running = False
100
 
        
 
110
 
101
111
    def send_message(self, message, sender=None):
102
112
        """Send a message over the bus. The message is automatically
103
 
        dispatched (inside a thread) if the L{Bus} is
104
 
        running. Otherwise the message is locally queued until the
105
 
        L{Bus} starts.
106
 
 
107
 
        MT safe.
 
113
        dispatched if the L{Bus} is running. Otherwise the message is
 
114
        locally queued until the L{Bus} starts.
108
115
 
109
116
        @param message: the message to send
110
117
        @type message:  L{elisa.core.components.message.Message}
111
118
        @param sender:  the sender object. None by default. Will be passed to
112
119
                        receiver callbacks.
113
120
        @type sender:   object
 
121
        @returns:       a deferred fired when the message have been
 
122
                        dispatched or queued
 
123
        @rtype:         L{elisa.core.utils.defer.Deferred}
114
124
        """
115
125
        assert isinstance(message, Message), message
116
126
        self.debug("Sending message %r", message)
117
127
 
118
128
        if self._running:
119
 
            reactor.callFromThread(self._dispatch, message, sender)
 
129
            dfr = self._dispatch(message, sender)
120
130
        else:
121
 
            self._queue.append((message, sender))
122
 
    
 
131
            dfr = defer.maybeDeferred(self._queue.append, (message, sender))
 
132
        return dfr
 
133
 
123
134
    def register(self, callback, *message_types):
124
135
        """
125
136
        Register a new callback with the bus. The given callback will be
126
137
        called when a message of one of the given types is dispatched on the
127
138
        bus.
128
139
 
129
 
        MT safe.
130
 
 
131
140
        @param callback:      the callback to register
132
141
        @type callback:       callable
133
142
        @param message_types: Message types to filter on
139
148
        self.debug("Registering callback %r for %r message types",
140
149
                   callback, message_types)
141
150
 
142
 
        try:
143
 
            self._callbacks_lock.acquire()
144
 
            self._callbacks[callback] = message_types
145
 
        finally:
146
 
            self._callbacks_lock.release()
 
151
        self._callbacks[callback] = message_types
147
152
 
148
153
    def unregister(self, callback):
149
154
        """Unregister a callback from the bus.
150
155
 
151
 
        MT safe.
152
 
 
153
156
        @param callback: the callback to register
154
157
        @type callback:  callable
155
158
        """
156
 
        try:
157
 
            self._callbacks_lock.acquire()
158
 
            if callback in self._callbacks:
159
 
                del self._callbacks[callback]
160
 
        finally:
161
 
            self._callbacks_lock.release()
 
159
        if callback in self._callbacks:
 
160
            del self._callbacks[callback]
162
161
 
163
162
    def _dispatch(self, message, sender):
164
 
        """Dispatch messages to registered callbacks and empty the
165
 
        message queue.
 
163
        """Dispatch one message to registered callbacks.
 
164
 
 
165
        @param message: the message to dispatch
 
166
        @type message:  L{elisa.core.components.message.Message}
 
167
        @param sender:  the sender object. None by default. Will be passed to
 
168
                        receiver callbacks.
 
169
        @type sender:   object
 
170
        @returns:       a deferred fired when the message have been
 
171
                        dispatched
 
172
        @rtype:         L{twisted.internet.defer.Deferred}
166
173
        """
167
 
        dispatched = False
168
 
        try:
169
 
            self._callbacks_lock.acquire()
 
174
 
 
175
        def got_failure(failure, message, cb_name):
 
176
            self.warning("Exception happened during dispatch of %r to %s: %s",
 
177
                         message, cb_name, failure.getTraceback())
 
178
 
 
179
        def iterate():
 
180
            dispatched = False
170
181
            for callback, mfilter in self._callbacks.iteritems():
171
182
                if isinstance(message, mfilter):
172
183
                    try:
174
185
                                             callback.__name__)
175
186
                    except AttributeError:
176
187
                        cb_name = callback.__name__
177
 
                    self.debug("Dispatching message %r to %r", message, cb_name)
178
 
                    callback(message, sender)
 
188
                    self.debug("Dispatching message %r to %r", message,
 
189
                               cb_name)
179
190
                    dispatched = True
180
 
        finally:
181
 
            self._callbacks_lock.release()
182
 
 
183
 
        if not dispatched:
184
 
            self.debug("Undispatched message: %r", message)
185
 
        return dispatched
186
 
    
 
191
                    dfr = defer.maybeDeferred(callback, message, sender)
 
192
                    dfr.addErrback(got_failure, message, cb_name)
 
193
                    yield dfr
 
194
 
 
195
            if not dispatched:
 
196
                self.debug("Undispatched message: %r", message)
 
197
 
 
198
        return task.coiterate(iterate())
 
199
 
187
200
def bus_listener(bus, *message_types):
188
201
    """ Utility decorator to simply register a function or method on
189
202
    the message bus.