~free.ekanayaka/landscape-client/jaunty-proposed

1.1.1 by Rick Clark
Import upstream version 1.0.18
1
import thread
2
import time
3
import sys
4
import logging
5
import bisect
6
import socket
7
8
import gobject
9
10
from twisted.test.proto_helpers import FakeDatagramTransport
11
from twisted.internet.defer import succeed, fail
12
from twisted.internet.error import DNSLookupError
13
14
from landscape.log import format_object
15
16
17
class InvalidID(Exception):
    """Signal that C{cancel_call()} was handed an ID it cannot handle."""
19
20
21
class CallHookError(Exception):
    """Signal an incorrect attempt to hook a call on a reactor."""
23
24
25
class EventID(object):
    """Opaque token identifying one registered event handler."""

    def __init__(self, event_type, pair):
        """Remember which registration this ID refers to.

        @param event_type: Name of the event type the handler is bound to.
        @param pair: The C{(handler, priority)} tuple stored for the
            handler function.
        """
        self._event_type, self._pair = event_type, pair
36
37
38
class EventHandlingReactorMixin(object):
    """Fire events identified by strings and register handlers for them.

    Handlers are stored in C{self._event_handlers}, a dict mapping each
    event type name to a list of C{(handler, priority)} tuples kept sorted
    by ascending priority.
    """

    def __init__(self):
        super(EventHandlingReactorMixin, self).__init__()
        self._event_handlers = {}

    def call_on(self, event_type, handler, priority=0):
        """Register an event handler.

        @param event_type: The name of the event type to handle.
        @param handler: The function handling the given event type.
        @param priority: The priority of the given handler function;
            handlers with a lower value are called first.

        @return: The L{EventID} of the registered handler.
        """
        pair = (handler, priority)

        handlers = self._event_handlers.setdefault(event_type, [])
        handlers.append(pair)
        # list.sort() is stable, so handlers sharing the same priority keep
        # their registration order.
        handlers.sort(key=lambda pair: pair[1])

        return EventID(event_type, pair)

    def fire(self, event_type, *args, **kwargs):
        """Fire an event of a given type.

        Call all handlers registered for the given C{event_type}, in order
        of priority.  The set of handlers is fixed when firing starts: a
        handler cancelled while the event is being fired is still called
        for this firing, and a handler registered meanwhile is only called
        from the next firing on.

        @param event_type: The name of the event type to fire.
        @param args: Positional arguments to pass to the registered handlers.
        @param kwargs: Keyword arguments to pass to the registered handlers.

        @return: A list with the return values of the handlers that
            completed without raising, in the order they were called.
        """
        logging.debug("Started firing %s.", event_type)
        results = []
        # Iterate over a snapshot: handlers may call call_on()/cancel_call()
        # themselves, and mutating the list being iterated would make the
        # loop skip or revisit entries.
        handlers = list(self._event_handlers.get(event_type, ()))
        for handler, priority in handlers:
            try:
                logging.debug("Calling %s for %s with priority %d.",
                              format_object(handler), event_type, priority)
                results.append(handler(*args, **kwargs))
            except KeyboardInterrupt:
                logging.exception("Keyboard interrupt while running event "
                                  "handler %s for event type %r with "
                                  "args %r %r.", format_object(handler),
                                  event_type, args, kwargs)
                # stop() is not defined here; it is expected to come from
                # the reactor class this mixin is combined with.
                self.stop()
                raise
            except:
                # Deliberately broad: one failing handler must not prevent
                # the remaining handlers from running.
                logging.exception("Error running event handler %s for "
                                  "event type %r with args %r %r.",
                                  format_object(handler), event_type,
                                  args, kwargs)
        logging.debug("Finished firing %s.", event_type)
        return results

    def cancel_call(self, id):
        """Unregister an event handler.

        @param id: the L{EventID} of the handler to unregister.

        @raise InvalidID: if C{id} is not an L{EventID} instance.
        """
        if isinstance(id, EventID):
            self._event_handlers[id._event_type].remove(id._pair)
        else:
            # Wrap the value in a 1-tuple so that a tuple passed by mistake
            # is reported through InvalidID instead of breaking the string
            # formatting with a TypeError.
            raise InvalidID("EventID instance expected, received %r" % (id,))
103
104
105
class ThreadedCallsReactorMixin(object):
    """Run functions in new threads and queue calls for the main thread.

    Calls queued with L{call_in_main} pile up in
    C{self._threaded_callbacks} until L{_run_threaded_callbacks} drains
    them.
    """

    def __init__(self):
        super(ThreadedCallsReactorMixin, self).__init__()
        self._threaded_callbacks = []

    def call_in_main(self, f, *args, **kwargs):
        """Queue C{f(*args, **kwargs)} for execution in the main thread."""
        def queued_call():
            return f(*args, **kwargs)
        self._threaded_callbacks.append(queued_call)

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        """
        Run a callable object in a newly started thread.

        @param callback: A function to call in case C{f} was successful, it
            will be passed the return value of C{f}.
        @param errback: A function to call in case C{f} raised an exception,
            it will be passed a C{(type, value, traceback)} tuple giving
            information about the raised exception (see L{sys.exc_info}).
            If it is C{None} the exception is logged instead.

        @note: Both C{callback} and C{errback} are queued through
            L{call_in_main} rather than being run by the new thread.
        """
        worker_args = (callback, errback, f, args, kwargs)
        thread.start_new_thread(self._in_thread, worker_args)

    def _in_thread(self, callback, errback, f, args, kwargs):
        """Run C{f} and queue the matching completion call."""
        try:
            result = f(*args, **kwargs)
        except Exception:
            exc_info = sys.exc_info()
            if errback is not None:
                self.call_in_main(errback, *exc_info)
            else:
                # No errback given: log the exception from the main thread.
                self.call_in_main(logging.error, exc_info[1],
                                  exc_info=exc_info)
            return
        if callback:
            self.call_in_main(callback, result)

    def _run_threaded_callbacks(self):
        """Drain the queue of pending calls, logging any failure."""
        while self._threaded_callbacks:
            try:
                queued_call = self._threaded_callbacks.pop(0)
                queued_call()
            except Exception:
                logging.exception(sys.exc_info()[1])

    def _hook_threaded_callbacks(self):
        """Start draining the queue every half second via C{call_every}."""
        self._run_threaded_callbacks_id = self.call_every(
            0.5, self._run_threaded_callbacks)

    def _unhook_threaded_callbacks(self):
        """Cancel the periodic draining set up when hooking."""
        self.cancel_call(self._run_threaded_callbacks_id)
158
159
160
class ReactorID(object):
    """Handle for a call scheduled on a L{Reactor}."""

    def __init__(self, timeout):
        """
        @param timeout: The timeout object backing the scheduled call; it
            is what L{Reactor.cancel_call} destroys.
        """
        self._timeout = timeout
164
165
166
class Reactor(EventHandlingReactorMixin,
              ThreadedCallsReactorMixin):
    """Reactor scheduling its calls on a private gobject main loop."""

    def __init__(self):
        super(Reactor, self).__init__()
        context = gobject.MainContext()
        self._context = context
        self._mainloop = gobject.MainLoop(context=context)

    def _attach_timeout(self, seconds, callback):
        """Attach a timeout source running C{callback} to our context.

        @param seconds: The delay, converted to whole milliseconds.
        @param callback: The zero-argument function set on the source.

        @return: A L{ReactorID} wrapping the new timeout source.
        """
        source = gobject.Timeout(int(seconds*1000))
        source.set_callback(callback)
        source.attach(self._context)
        return ReactorID(source)

    def call_later(self, timeout, function, *args, **kwargs):
        """Schedule C{function(*args, **kwargs)} after C{timeout} seconds.

        @return: A L{ReactorID} that can be passed to L{cancel_call}.
        """
        def run_once():
            function(*args, **kwargs)
            # Returning False presumably follows the GLib source-callback
            # convention of not being invoked again (one-shot).
            return False
        return self._attach_timeout(timeout, run_once)

    def cancel_call(self, id):
        """Cancel a scheduled call or unregister an event handler.

        @param id: A L{ReactorID}; anything else is handed over to the
            parent classes' C{cancel_call}.
        """
        if type(id) is not ReactorID:
            super(Reactor, self).cancel_call(id)
            return
        id._timeout.destroy()

    def call_every(self, timeout, function, *args, **kwargs):
        """Schedule C{function(*args, **kwargs)} every C{timeout} seconds.

        @return: A L{ReactorID} that can be passed to L{cancel_call}.
        """
        def run_again():
            function(*args, **kwargs)
            # Returning True presumably keeps the source installed so that
            # it fires again (GLib source-callback convention).
            return True
        return self._attach_timeout(timeout, run_again)

    def run(self):
        """Fire C{"run"}, run the main loop until it quits, fire C{"stop"}.

        Threaded callbacks are hooked only while the main loop is running.
        """
        self.fire("run")
        self._hook_threaded_callbacks()
        self._mainloop.run()
        self._unhook_threaded_callbacks()
        self.fire("stop")

    def stop(self):
        """Ask the main loop to quit."""
        self._mainloop.quit()
207
208
209
class FakeReactorID(object):
    """Handle for a call scheduled on a L{FakeReactor}.

    @ivar active: C{True} until the call gets cancelled through
        L{FakeReactor.cancel_call}.
    """

    def __init__(self, data):
        """
        @param data: The scheduled call entry this ID refers to.
        """
        self._data = data
        self.active = True
214
215
216
class FakeReactor(EventHandlingReactorMixin,
                  ThreadedCallsReactorMixin):
    """
    Reactor for tests whose clock only moves when L{advance} is called.

    Scheduled calls are kept in C{self._calls}, a list of
    C{(scheduled_time, f, args, kwargs)} tuples sorted by scheduled time.

    @ivar udp_transports: dict of {port: (protocol, transport)}
    @ivar hosts: Dict of {hostname: ip}. Users should populate this
        and L{resolve} will use it.
    """
    def __init__(self):
        super(FakeReactor, self).__init__()
        self._current_time = 0
        self._calls = []
        self._running = False
        self.udp_transports = {}
        self.hosts = {}

    def time(self):
        """Return the current fake time, as a float."""
        return float(self._current_time)

    def call_later(self, seconds, f, *args, **kwargs):
        """Schedule C{f(*args, **kwargs)} C{seconds} from the current time.

        Calls scheduled for the same time run in the order they were
        scheduled.

        @return: A L{FakeReactorID} that can be passed to L{cancel_call}.
        """
        scheduled_time = self._current_time + seconds
        call = (scheduled_time, f, args, kwargs)
        # Order on the scheduled time only. Sorting the whole tuples would
        # fall back to comparing the callables (and their arguments) when
        # two calls share the same time, which gives an arbitrary order on
        # Python 2 and raises TypeError on Python 3.
        times = [pending[0] for pending in self._calls]
        self._calls.insert(bisect.bisect_right(times, scheduled_time), call)
        return FakeReactorID(call)

    def cancel_call(self, id):
        """Cancel a scheduled call or unregister an event handler.

        @param id: A L{FakeReactorID}; anything else is handed over to
            the parent classes' C{cancel_call}.
        """
        if isinstance(id, FakeReactorID):
            if id._data in self._calls:
                self._calls.remove(id._data)
            id.active = False
        else:
            super(FakeReactor, self).cancel_call(id)

    def call_every(self, seconds, f, *args, **kwargs):
        """Schedule C{f(*args, **kwargs)} to run every C{seconds}.

        If C{f} raises, the repetition is cancelled and the exception is
        re-raised.

        @return: A L{FakeReactorID} that stays valid across repetitions.
        """
        def fake():
            # update the call so that cancellation will continue
            # working with the same ID. And do it *before* the call
            # because the call might cancel it!
            call._data = self.call_later(seconds, fake)._data
            try:
                f(*args, **kwargs)
            except:
                if call.active:
                    self.cancel_call(call)
                raise
        call = self.call_later(seconds, fake)
        return call

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        """Run C{f} synchronously, then run the queued callbacks at once."""
        self._in_thread(callback, errback, f, args, kwargs)

        # Running threaded callbacks here doesn't reflect reality, since
        # they're usually run while the main reactor loop is active.
        # At the same time, this is convenient as it means we don't need
        # to run the reactor with all registered handlers to test for
        # actions performed on completion of specific events (e.g. firing
        # exchange will fire exchange-done when ready). IOW, it's easier
        # to test things synchronously.
        self._run_threaded_callbacks()

    def advance(self, seconds):
        """Advance this reactor C{seconds} into the future.

        This is the preferred method for advancing time in your unit tests.
        Exceptions raised by the scheduled calls are logged, not propagated.
        """
        while (self._calls and self._calls[0][0]
               <= self._current_time + seconds):
            call = self._calls.pop(0)
            # If we find a call within the time we're advancing,
            # before calling it, let's advance the time *just* to
            # when that call is expecting to be run, so that if it
            # schedules any calls itself they will be relative to
            # the correct time.
            seconds -= call[0] - self._current_time
            self._current_time = call[0]
            try:
                call[1](*call[2], **call[3])
            except Exception:
                logging.exception(sys.exc_info()[1])
        self._current_time += seconds

    def run(self):
        """Continuously advance this reactor until reactor.stop() is called.

        The reactor also stops by itself once no scheduled call is left,
        as nothing could ever happen after that point.
        """
        self.fire("run")
        self._running = True
        while self._running and self._calls:
            # advance() expects a delta, not an absolute time: move just
            # far enough to reach the next scheduled call, so that a stop()
            # issued by that call takes effect before later calls are run.
            self.advance(max(0, self._calls[0][0] - self._current_time))
        self._running = False
        self.fire("stop")

    def stop(self):
        """Make L{run} return after the current round of calls."""
        self._running = False

    def listen_udp(self, port, protocol):
        """
        Connect the given protocol with a fake transport, and keep the
        transport in C{self.udp_transports}.
        """
        transport = FakeDatagramTransport()
        self.udp_transports[port] = (protocol, transport)
        protocol.makeConnection(transport)

    def resolve(self, hostname):
        """Look up the hostname in C{self.hosts}.

        A C{hostname} that is already an IP address is returned as is.

        @return: A Deferred resulting in the IP address, or failing with
            C{DNSLookupError} if the hostname is not in C{self.hosts}.
        """
        try:
            # is it an IP address?
            socket.inet_aton(hostname)
        except socket.error:  # no
            if hostname in self.hosts:
                return succeed(self.hosts[hostname])
            else:
                return fail(DNSLookupError(hostname))
        else:  # yes
            return succeed(hostname)
331
332
333
334
class TwistedReactor(EventHandlingReactorMixin,
                     ThreadedCallsReactorMixin):
    """Wrap and add functionalities to the Twisted C{reactor}."""

    def __init__(self):
        # Imported here rather than at module level, so that merely
        # importing this module doesn't install the global Twisted reactor.
        from twisted.internet import reactor
        from twisted.internet.task import LoopingCall
        self._LoopingCall = LoopingCall
        self._reactor = reactor
        self._cleanup()

        super(TwistedReactor, self).__init__()

    def _cleanup(self):
        """Cancel every delayed call still active on the wrapped reactor."""
        # Since the reactor is global, we should clean it up when we
        # initialize one of our wrappers.
        for call in self._reactor.getDelayedCalls():
            if call.active():
                call.cancel()

    def call_later(self, *args, **kwargs):
        """Call a function later.

        Simply call C{callLater(*args, **kwargs)} and return its result.

        @see: L{twisted.internet.interfaces.IReactorTime.callLater}.

        """
        return self._reactor.callLater(*args, **kwargs)

    def call_every(self, seconds, f, *args, **kwargs):
        """Call a function repeatedly.

        Create a new L{twisted.internet.task.LoopingCall} object and
        start it; the first call happens after C{seconds}, not immediately.

        @return: the created C{LoopingCall} object.
        """
        lc = self._LoopingCall(f, *args, **kwargs)
        lc.start(seconds, now=False)
        return lc

    def cancel_call(self, id):
        """Cancel a scheduled function or event handler.

        Cancelling a call that is not pending anymore is a no-op, both for
        looping calls and for delayed calls.

        @param id: The function call or handler to remove. It can be an
            L{EventID}, a L{LoopingCall} or a C{IDelayedCall}, as returned
            by L{call_on}, L{call_every} and L{call_later} respectively.
        """
        if isinstance(id, EventID):
            return EventHandlingReactorMixin.cancel_call(self, id)
        if isinstance(id, self._LoopingCall):
            # LoopingCall.stop() asserts that the loop is running, and a
            # loop also stops by itself when its function raises. Skip
            # stopped loops, just like inactive delayed calls are skipped
            # below.
            if id.running:
                return id.stop()
            return None
        if id.active():
            id.cancel()

    def call_in_main(self, f, *args, **kwargs):
        """Cause a function to be executed by the reactor thread.

        @param f: The callable object to execute.
        @param args: The arguments to call it with.
        @param kwargs: The keyword arguments to call it with.

        @see: L{twisted.internet.interfaces.IReactorThreads.callFromThread}
        """
        self._reactor.callFromThread(f, *args, **kwargs)

    def run(self):
        """Run the reactor, firing C{"run"} before and C{"stop"} after."""

        self.fire("run")
        self._reactor.run()
        self.fire("stop")

    def stop(self):
        """Stop the reactor and cancel its pending delayed calls.

        The C{"stop"} event itself is fired by L{run}, once the wrapped
        reactor has returned.
        """

        self._reactor.crash()
        self._cleanup()

    def time(self):
        """Get current time.

        @see L{time.time}
        """
        return time.time()

    def listen_udp(self, port, protocol):
        """Connect the given protocol with a UDP transport.

        @see L{twisted.internet.interfaces.IReactorUDP.listenUDP}.
        """
        return self._reactor.listenUDP(port, protocol)

    def resolve(self, host):
        """Look up the IP of the given host.

        @return: A L{Deferred} resulting in the IP address.

        @see L{twisted.internet.interfaces.IReactorCore.resolve}.

        """
        return self._reactor.resolve(host)