1.1.1
by Rick Clark
Import upstream version 1.0.18 |
1 |
import thread |
2 |
import time |
|
3 |
import sys |
|
4 |
import logging |
|
5 |
import bisect |
|
6 |
import socket |
|
7 |
||
8 |
import gobject |
|
9 |
||
10 |
from twisted.test.proto_helpers import FakeDatagramTransport |
|
11 |
from twisted.internet.defer import succeed, fail |
|
12 |
from twisted.internet.error import DNSLookupError |
|
13 |
||
14 |
from landscape.log import format_object |
|
15 |
||
16 |
||
17 |
class InvalidID(Exception):
    """Error raised by C{cancel_call()} when given an ID it cannot handle.

    In this module L{EventHandlingReactorMixin.cancel_call} raises it when
    the argument is not an L{EventID}.
    """
|
|
19 |
||
20 |
||
21 |
class CallHookError(Exception):
    """Error signalling that something was hooked on a reactor incorrectly."""
|
|
23 |
||
24 |
||
25 |
class EventID(object):
    """Opaque handle identifying one registered event handler.

    Instances simply remember which event type the handler was registered
    for and the exact C{(handler, priority)} pair that was stored, so that
    the registration can later be found and removed.
    """

    def __init__(self, event_type, pair):
        """
        @param event_type: Name of the event type the handler is bound to.
        @param pair: The C{(handler, priority)} 2-tuple holding the handler
            function and its priority.
        """
        self._event_type, self._pair = event_type, pair
|
36 |
||
37 |
||
38 |
class EventHandlingReactorMixin(object):
    """Fire events identified by strings and register handlers for them.

    Registered handlers are kept in C{self._event_handlers}, a dict mapping
    each event type to a list of C{(handler, priority)} pairs sorted by
    ascending priority.

    The class mixing this in must provide a C{stop()} method, which
    L{fire} calls when a handler is interrupted by C{KeyboardInterrupt}.
    """

    def __init__(self):
        super(EventHandlingReactorMixin, self).__init__()
        self._event_handlers = {}

    def call_on(self, event_type, handler, priority=0):
        """Register an event handler.

        @param event_type: The name of the event type to handle.
        @param handler: The function handling the given event type.
        @param priority: The priority of the given handler function.
            Handlers with a lower value are called first.

        @return: The L{EventID} of the registered handler.
        """
        pair = (handler, priority)

        handlers = self._event_handlers.setdefault(event_type, [])
        handlers.append(pair)
        # list.sort() is stable, so handlers sharing a priority keep their
        # registration order.
        handlers.sort(key=lambda pair: pair[1])

        return EventID(event_type, pair)

    def fire(self, event_type, *args, **kwargs):
        """Fire an event of a given type.

        Call all handlers registered for the given C{event_type}, in order
        of priority.  Exceptions raised by a handler are logged and do not
        prevent the remaining handlers from running, except for
        C{KeyboardInterrupt}, which stops the reactor and is re-raised.

        The set of handlers is fixed when firing starts: handlers added or
        cancelled by another handler during this call only take effect on
        the next firing.

        @param event_type: The name of the event type to fire.
        @param args: Positional arguments to pass to the registered handlers.
        @param kwargs: Keyword arguments to pass to the registered handlers.

        @return: A list with the return values of the handlers that
            completed without raising, in call order.
        """
        logging.debug("Started firing %s.", event_type)
        results = []
        # Iterate over a snapshot: a handler may call call_on() or
        # cancel_call() for this same event type, and mutating the list
        # while looping over it would silently skip handlers.
        handlers = list(self._event_handlers.get(event_type, ()))
        for handler, priority in handlers:
            try:
                logging.debug("Calling %s for %s with priority %d.",
                              format_object(handler), event_type, priority)
                results.append(handler(*args, **kwargs))
            except KeyboardInterrupt:
                logging.exception("Keyboard interrupt while running event "
                                  "handler %s for event type %r with "
                                  "args %r %r.", format_object(handler),
                                  event_type, args, kwargs)
                self.stop()
                raise
            except:
                # Deliberately broad: one failing handler must not keep the
                # others from running.  Note this also catches SystemExit.
                logging.exception("Error running event handler %s for "
                                  "event type %r with args %r %r.",
                                  format_object(handler), event_type,
                                  args, kwargs)
        logging.debug("Finished firing %s.", event_type)
        return results

    def cancel_call(self, id):
        """Unregister an event handler.

        @param id: the L{EventID} of the handler to unregister.

        @raise InvalidID: if C{id} is not an L{EventID}.
        @raise KeyError: if no handler was ever registered for the event
            type recorded in C{id}.
        @raise ValueError: if the handler is no longer registered.
        """
        if not isinstance(id, EventID):
            # Wrap in a 1-tuple so that a tuple passed as id is formatted
            # as a single value instead of breaking the % operator.
            raise InvalidID("EventID instance expected, received %r" % (id,))
        self._event_handlers[id._event_type].remove(id._pair)
|
103 |
||
104 |
||
105 |
class ThreadedCallsReactorMixin(object):
    """Schedule functions for execution in the main thread or in new ones.

    Calls destined for the main thread are queued in
    C{self._threaded_callbacks} and drained by L{_run_threaded_callbacks}.
    The class mixing this in must provide C{call_every()} and
    C{cancel_call()}, which are used to hook and unhook the periodic drain.
    """

    def __init__(self):
        super(ThreadedCallsReactorMixin, self).__init__()
        self._threaded_callbacks = []

    def call_in_main(self, f, *args, **kwargs):
        """Schedule a function for execution in the main thread.

        The call is only queued here; it runs the next time
        L{_run_threaded_callbacks} drains the queue.
        """
        def invoke():
            return f(*args, **kwargs)

        self._threaded_callbacks.append(invoke)

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        """
        Execute a callable object in a new separate thread.

        @param callback: A function to call in case C{f} was successful, it
            will be passed the return value of C{f}.
        @param errback: A function to call in case C{f} raised an exception,
            it will be passed a C{(type, value, traceback)} tuple giving
            information about the raised exception (see L{sys.exc_info}).

        @note: Both C{callback} and C{errback} will be executed in the
            parent thread.
        """
        worker_args = (callback, errback, f, args, kwargs)
        thread.start_new_thread(self._in_thread, worker_args)

    def _in_thread(self, callback, errback, f, args, kwargs):
        """Run C{f} and queue the matching completion call for the main thread.

        On failure C{errback} receives the C{sys.exc_info()} triple as three
        positional arguments; without an C{errback} the exception is queued
        for C{logging.error}.  On success a truthy C{callback} receives the
        result.
        """
        try:
            result = f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            if errback is not None:
                self.call_in_main(errback, *exc_info)
            else:
                self.call_in_main(logging.error, exc_info[1],
                                  exc_info=exc_info)
            return
        if callback:
            self.call_in_main(callback, result)

    def _run_threaded_callbacks(self):
        """Run every queued main-thread call, oldest first, logging failures."""
        while self._threaded_callbacks:
            try:
                queued = self._threaded_callbacks.pop(0)
                queued()
            except Exception:
                logging.exception(sys.exc_info()[1])

    def _hook_threaded_callbacks(self):
        """Start draining the queue every 0.5 via C{call_every()}."""
        self._run_threaded_callbacks_id = self.call_every(
            0.5, self._run_threaded_callbacks)

    def _unhook_threaded_callbacks(self):
        """Cancel the periodic drain set up by L{_hook_threaded_callbacks}."""
        self.cancel_call(self._run_threaded_callbacks_id)
|
158 |
||
159 |
||
160 |
class ReactorID(object):
    """Handle for a call scheduled on L{Reactor}.

    It wraps the timeout source object created for the call so that
    C{Reactor.cancel_call()} can destroy it later.
    """

    def __init__(self, timeout):
        """
        @param timeout: The timeout source object backing the scheduled call.
        """
        self._timeout = timeout
|
164 |
||
165 |
||
166 |
class Reactor(EventHandlingReactorMixin,
              ThreadedCallsReactorMixin):
    """Reactor running on its own C{gobject} main context and main loop."""

    def __init__(self):
        super(Reactor, self).__init__()
        self._context = gobject.MainContext()
        self._mainloop = gobject.MainLoop(context=self._context)

    def call_later(self, timeout, function, *args, **kwargs):
        """Schedule C{function(*args, **kwargs)} to run once.

        @param timeout: The delay; it is multiplied by 1000 and truncated to
            an int before being handed to C{gobject.Timeout} (presumably
            seconds converted to milliseconds).

        @return: A L{ReactorID} usable with L{cancel_call}.
        """
        def run_once():
            function(*args, **kwargs)
            # Returning False tells the main loop not to call us again.
            return False

        source = gobject.Timeout(int(timeout*1000))
        source.set_callback(run_once)
        source.attach(self._context)
        return ReactorID(source)

    def cancel_call(self, id):
        """Cancel a scheduled call or unregister an event handler.

        @param id: A L{ReactorID}, whose timeout source is destroyed;
            anything else is passed on to the next C{cancel_call()} in the
            class hierarchy.
        """
        if type(id) is not ReactorID:
            super(Reactor, self).cancel_call(id)
            return
        id._timeout.destroy()

    def call_every(self, timeout, function, *args, **kwargs):
        """Schedule C{function(*args, **kwargs)} to run repeatedly.

        @param timeout: The interval, converted as in L{call_later}.

        @return: A L{ReactorID} usable with L{cancel_call}.
        """
        def run_repeatedly():
            function(*args, **kwargs)
            # Returning True keeps the timeout source installed.
            return True

        source = gobject.Timeout(int(timeout*1000))
        source.set_callback(run_repeatedly)
        source.attach(self._context)
        return ReactorID(source)

    def run(self):
        """Fire C{"run"}, run the main loop, then fire C{"stop"}.

        Threaded callbacks are hooked just before the main loop starts and
        unhooked right after it returns.
        """
        self.fire("run")
        self._hook_threaded_callbacks()
        self._mainloop.run()
        self._unhook_threaded_callbacks()
        self.fire("stop")

    def stop(self):
        """Ask the main loop to quit."""
        self._mainloop.quit()
|
207 |
||
208 |
||
209 |
class FakeReactorID(object):
    """Handle for a call scheduled on L{FakeReactor}.

    @ivar active: C{True} on creation; L{FakeReactor.cancel_call} sets it to
        C{False}.
    """

    def __init__(self, data):
        """
        @param data: The scheduled call entry this handle refers to.
        """
        self._data = data
        self.active = True
|
214 |
||
215 |
||
216 |
class FakeReactor(EventHandlingReactorMixin,
                  ThreadedCallsReactorMixin):
    """Reactor with a manually advanced clock, meant for tests.

    Scheduled calls are kept in C{self._calls}, a list of
    C{(scheduled_time, f, args, kwargs)} tuples sorted by scheduled time.

    @ivar udp_transports: dict of {port: (protocol, transport)}
    @ivar hosts: Dict of {hostname: ip}. Users should populate this
        and L{resolve} will use it.
    """

    def __init__(self):
        super(FakeReactor, self).__init__()
        self._current_time = 0
        self._calls = []
        self.udp_transports = {}
        self.hosts = {}

    def time(self):
        """Return the current fake time as a float."""
        return float(self._current_time)

    def call_later(self, seconds, f, *args, **kwargs):
        """Schedule C{f(*args, **kwargs)} C{seconds} after the current time.

        Calls scheduled for the same instant run in the order in which they
        were scheduled.

        @return: A L{FakeReactorID} usable with L{cancel_call}.
        """
        scheduled_time = self._current_time + seconds
        call = (scheduled_time, f, args, kwargs)
        # Order on the scheduled time only.  Comparing the whole tuples
        # would fall back to comparing the callables whenever two times
        # tie, which gives an address-dependent order on Python 2 and a
        # TypeError on Python 3.
        times = [pending[0] for pending in self._calls]
        self._calls.insert(bisect.bisect_right(times, scheduled_time), call)
        return FakeReactorID(call)

    def cancel_call(self, id):
        """Cancel a scheduled call or unregister an event handler.

        @param id: A L{FakeReactorID}, which is marked inactive and whose
            call is dropped if still pending; anything else is passed on to
            the next C{cancel_call()} in the class hierarchy.
        """
        if isinstance(id, FakeReactorID):
            if id._data in self._calls:
                self._calls.remove(id._data)
            id.active = False
        else:
            super(FakeReactor, self).cancel_call(id)

    def call_every(self, seconds, f, *args, **kwargs):
        """Schedule C{f(*args, **kwargs)} to run every C{seconds}.

        If C{f} raises, the repetition is cancelled and the exception is
        re-raised.

        @return: A L{FakeReactorID} that stays valid across repetitions.
        """
        def fake():
            # update the call so that cancellation will continue
            # working with the same ID.  And do it *before* the call
            # because the call might cancel it!
            call._data = self.call_later(seconds, fake)._data
            try:
                f(*args, **kwargs)
            except:
                if call.active:
                    self.cancel_call(call)
                raise
        call = self.call_later(seconds, fake)
        return call

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        """Run C{f} synchronously instead of in a new thread.

        The resulting C{callback} or C{errback} call is executed before
        this method returns.
        """
        self._in_thread(callback, errback, f, args, kwargs)
        # Running threaded callbacks here doesn't reflect reality, since
        # they're usually run while the main reactor loop is active.
        # At the same time, this is convenient as it means we don't need
        # to run the reactor with all registered handlers to test for
        # actions performed on completion of specific events (e.g. firing
        # exchange will fire exchange-done when ready).  IOW, it's easier
        # to test things synchronously.
        self._run_threaded_callbacks()

    def advance(self, seconds):
        """Advance this reactor C{seconds} into the future.

        This is the preferred method for advancing time in your unit tests.
        Every call falling due within the window is run in order;
        exceptions derived from C{Exception} are logged and do not stop
        the advance.
        """
        while (self._calls and self._calls[0][0]
               <= self._current_time + seconds):
            call = self._calls.pop(0)
            # If we find a call within the time we're advancing,
            # before calling it, let's advance the time *just* to
            # when that call is expecting to be run, so that if it
            # schedules any calls itself they will be relative to
            # the correct time.
            seconds -= call[0] - self._current_time
            self._current_time = call[0]
            try:
                call[1](*call[2], **call[3])
            except Exception:
                logging.exception(sys.exc_info()[1])
        self._current_time += seconds

    def run(self):
        """Continuously advance this reactor until reactor.stop() is called.

        @raise IndexError: if no scheduled call is left while the reactor
            is still running.
        """
        self.fire("run")
        self._running = True
        while self._running:
            # NOTE(review): the absolute time of the next call is passed as
            # the number of seconds to advance -- confirm this is intended.
            self.advance(self._calls[0][0])
        self.fire("stop")

    def stop(self):
        """Make L{run} return once the current advance is finished."""
        self._running = False

    def listen_udp(self, port, protocol):
        """
        Connect the given protocol with a fake transport, and keep the
        transport in C{self.udp_transports}.
        """
        transport = FakeDatagramTransport()
        self.udp_transports[port] = (protocol, transport)
        protocol.makeConnection(transport)

    def resolve(self, hostname):
        """Look up the hostname in C{self.hosts}.

        A C{hostname} that C{socket.inet_aton} accepts as an IP address is
        returned unchanged.

        @return: A Deferred resulting in the IP address, or failing with
            C{DNSLookupError} if the hostname is unknown.
        """
        try:
            # is it an IP address?
            socket.inet_aton(hostname)
        except socket.error:  # no
            if hostname in self.hosts:
                return succeed(self.hosts[hostname])
            else:
                return fail(DNSLookupError(hostname))
        else:  # yes
            return succeed(hostname)
|
331 |
||
332 |
||
333 |
||
334 |
class TwistedReactor(EventHandlingReactorMixin,
                     ThreadedCallsReactorMixin):
    """Wrap and add functionalities to the Twisted C{reactor}."""

    def __init__(self):
        # Imported here rather than at module level, so the Twisted reactor
        # is only imported when a wrapper is actually created.
        from twisted.internet import reactor
        from twisted.internet.task import LoopingCall
        self._LoopingCall = LoopingCall
        self._reactor = reactor
        self._cleanup()

        super(TwistedReactor, self).__init__()

    def _cleanup(self):
        """Cancel every delayed call still active on the wrapped reactor."""
        # Since the reactor is global, we should clean it up when we
        # initialize one of our wrappers.
        for delayed_call in self._reactor.getDelayedCalls():
            if not delayed_call.active():
                continue
            delayed_call.cancel()

    def call_later(self, *args, **kwargs):
        """Call a function later.

        Simply call C{callLater(*args, **kwargs)} and return its result.

        @see: L{twisted.internet.interfaces.IReactorTime.callLater}.
        """
        return self._reactor.callLater(*args, **kwargs)

    def call_every(self, seconds, f, *args, **kwargs):
        """Call a function repeatedly.

        Create a new L{twisted.internet.task.LoopingCall} object and
        start it with C{now=False}.

        @return: the created C{LoopingCall} object.
        """
        looping_call = self._LoopingCall(f, *args, **kwargs)
        looping_call.start(seconds, now=False)
        return looping_call

    def cancel_call(self, id):
        """Cancel a scheduled function or event handler.

        @param id: The function call or handler to remove. It can be an
            L{EventID}, a L{LoopingCall} or a C{IDelayedCall}, as returned
            by L{call_on}, L{call_every} and L{call_later} respectively.
        """
        if isinstance(id, EventID):
            return EventHandlingReactorMixin.cancel_call(self, id)
        elif isinstance(id, self._LoopingCall):
            return id.stop()
        elif id.active():
            # Anything else is treated as a delayed call, and only
            # cancelled while it is still active.
            id.cancel()

    def call_in_main(self, f, *args, **kwargs):
        """Cause a function to be executed by the reactor thread.

        @param f: The callable object to execute.
        @param args: The arguments to call it with.
        @param kwargs: The keyword arguments to call it with.

        @see: L{twisted.internet.interfaces.IReactorThreads.callFromThread}
        """
        self._reactor.callFromThread(f, *args, **kwargs)

    def run(self):
        """Start the reactor, a C{"run"} event will be fired.

        A C{"stop"} event is fired once the wrapped reactor returns.
        """
        self.fire("run")
        self._reactor.run()
        self.fire("stop")

    def stop(self):
        """Stop the reactor, a C{"stop"} event will be fired.

        The wrapped reactor is crashed and its remaining delayed calls are
        cancelled.
        """
        self._reactor.crash()
        self._cleanup()

    def time(self):
        """Get current time.

        @see L{time.time}
        """
        return time.time()

    def listen_udp(self, port, protocol):
        """Connect the given protocol with a UDP transport.

        @see L{twisted.internet.interfaces.IReactorUDP.listenUDP}.
        """
        return self._reactor.listenUDP(port, protocol)

    def resolve(self, host):
        """Look up the IP of the given host.

        @return: A L{Deferred} resulting in the IP address.

        @see L{twisted.internet.interfaces.IReactorCore.resolve}.
        """
        return self._reactor.resolve(host)