~free.ekanayaka/landscape-client/jaunty-proposed

« back to all changes in this revision

Viewing changes to landscape/reactor.py

  • Committer: Bazaar Package Importer
  • Author(s): Free Ekanayaka
  • Date: 2009-09-21 17:59:31 UTC
  • mfrom: (1.2.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20090921175931-4ucv40j9ro26i3lm
Tags: 1.3.2.3-0ubuntu0.9.04.0
* New upstream release (LP: #347983):
  - Don't clear the hash_id_requests table upon resynchronize (LP #417122)
  - Include the README file in landscape-client (LP: #396260)
  - Fix client capturing stderr from run_command when constructing
    hash-id-databases url (LP: #397480)
  - Use substvars to conditionally depend on update-motd or
    libpam-modules (LP: #393454)
  - Fix reporting wrong version to the server (LP: #391225)
  - The init script does not wait for the network to be available
    before checking for EC2 user data (LP: #383336)
  - When the broker is restarted by the watchdog, the state of the client
    is inconsistent (LP: #380633)
  - Package stays unknown forever in the client with hash-id-databases
    support (LP: #381356)
  - Standard error not captured when calling smart-update (LP: #387441)
  - Changer calls reporter without switching groups, just user (LP: #388092)
  - Run smart update in the package-reporter instead of having a cronjob (LP: #362355)
  - Package changer does not inherit proxy settings (LP: #381241)
  - The ./test script doesn't work in landscape-client (LP: #381613)
  - The source package should build on all supported releases (LP: #385098)
  - Strip smart update's output (LP: #387331)
  - The fetch() timeout isn't based on activity (#389224)
  - Client can use a UUID of "None" when fetching the hash-id-database (LP: #381291)
  - Registration should use the fqdn rather than just the hostname (LP: #385730)

Show diffs side-by-side

added added

removed removed

Lines of Context:
23
23
 
24
24
 
25
25
class EventID(object):
 
26
    """Unique identifier for an event handler."""
26
27
 
27
28
    def __init__(self, event_type, pair):
 
29
        """
 
30
        @param event_type: Name of the event type handled by the handler.
 
31
        @param pair: Binary tuple C{(handler, priority)} holding the handler
 
32
            function and its priority.
 
33
        """
28
34
        self._event_type = event_type
29
35
        self._pair = pair
30
36
 
31
37
 
32
38
class EventHandlingReactorMixin(object):
 
39
    """Fire events identified by strings and register handlers for them."""
33
40
 
34
41
    def __init__(self):
35
42
        super(EventHandlingReactorMixin, self).__init__()
36
43
        self._event_handlers = {}
37
44
 
38
45
    def call_on(self, event_type, handler, priority=0):
 
46
        """Register an event handler.
 
47
 
 
48
        @param event_type: The name of the event type to handle.
 
49
        @param handler: The function handling the given event type.
 
50
        @param priority: The priority of the given handler function.
 
51
 
 
52
        @return: The L{EventID} of the registered handler.
 
53
        """
39
54
        pair = (handler, priority)
40
55
 
41
56
        handlers = self._event_handlers.setdefault(event_type, [])
45
60
        return EventID(event_type, pair)
46
61
 
47
62
    def fire(self, event_type, *args, **kwargs):
 
63
        """Fire an event of a given type.
 
64
 
 
65
        Call all handlers registered for the given C{event_type}, in order
 
66
        of priority.
 
67
 
 
68
        @param event_type: The name of the event type to fire.
 
69
        @param args: Positional arguments to pass to the registered handlers.
 
70
        @param kwargs: Keyword arguments to pass to the registered handlers.
 
71
        """
48
72
        logging.debug("Started firing %s.", event_type)
49
73
        results = []
50
74
        for handler, priority in self._event_handlers.get(event_type, ()):
68
92
        return results
69
93
 
70
94
    def cancel_call(self, id):
 
95
        """Unregister an event handler.
 
96
 
 
97
        @param id: the L{EventID} of the handler to unregister.
 
98
        """
71
99
        if type(id) is EventID:
72
100
            self._event_handlers[id._event_type].remove(id._pair)
73
101
        else:
75
103
 
76
104
 
77
105
class ThreadedCallsReactorMixin(object):
 
106
    """Schedule functions for execution in the main thread or in new ones."""
78
107
 
79
108
    def __init__(self):
80
109
        super(ThreadedCallsReactorMixin, self).__init__()
81
110
        self._threaded_callbacks = []
82
111
 
83
112
    def call_in_main(self, f, *args, **kwargs):
 
113
        """Schedule a function for execution in the main thread."""
84
114
        self._threaded_callbacks.append(lambda: f(*args, **kwargs))
85
115
 
86
116
    def call_in_thread(self, callback, errback, f, *args, **kwargs):
 
117
        """
 
118
        Execute a callable object in a new separate thread.
 
119
 
 
120
        @param callback: A function to call in case C{f} was successful, it
 
121
            will be passed the return value of C{f}.
 
122
        @param errback: A function to call in case C{f} raised an exception,
 
123
            it will be pass a C{(type, value, traceback)} tuple giving
 
124
            information about the raised exception (see L{sys.exc_info}).
 
125
 
 
126
        @note: Both C{callback} and C{errback} will be executed in the
 
127
            the parent thread.
 
128
        """
87
129
        thread.start_new_thread(self._in_thread,
88
130
                                (callback, errback, f, args, kwargs))
89
131
 
291
333
 
292
334
class TwistedReactor(EventHandlingReactorMixin,
293
335
                     ThreadedCallsReactorMixin):
 
336
    """Wrap and add functionalities to the Twisted C{reactor}."""
294
337
 
295
338
    def __init__(self):
296
339
        from twisted.internet import reactor
309
352
                call.cancel()
310
353
 
311
354
    def call_later(self, *args, **kwargs):
 
355
        """Call a function later.
 
356
 
 
357
        Simply call C{callLater(*args, **kwargs)} and return its result.
 
358
 
 
359
        @see: L{twisted.internet.interfaces.IReactorTime.callLater}.
 
360
 
 
361
        """
312
362
        return self._reactor.callLater(*args, **kwargs)
313
363
 
314
364
    def call_every(self, seconds, f, *args, **kwargs):
 
365
        """Call a function repeatedly.
 
366
 
 
367
        Create a new L{twisted.internet.task.LoopingCall} object and
 
368
        start it.
 
369
 
 
370
        @return: the created C{LoopingCall} object.
 
371
        """
315
372
        lc = self._LoopingCall(f, *args, **kwargs)
316
373
        lc.start(seconds, now=False)
317
374
        return lc
318
375
 
319
376
    def cancel_call(self, id):
 
377
        """Cancel a scheduled function or event handler.
 
378
 
 
379
        @param id: The function call or handler to remove. It can be an
 
380
            L{EventID}, a L{LoopingCall} or a C{IDelayedCall}, as returned
 
381
            by L{call_on}, L{call_every} and L{call_later} respectively.
 
382
        """
320
383
        if isinstance(id, EventID):
321
384
            return EventHandlingReactorMixin.cancel_call(self, id)
322
385
        if isinstance(id, self._LoopingCall):
325
388
            id.cancel()
326
389
 
327
390
    def call_in_main(self, f, *args, **kwargs):
 
391
        """Cause a function to be executed by the reactor thread.
 
392
 
 
393
        @param f: The callable object to execute.
 
394
        @param args: The arguments to call it with.
 
395
        @param kwargs: The keyword arguments to call it with.
 
396
 
 
397
        @see: L{twisted.internet.interfaces.IReactorThreads.callFromThread}
 
398
        """
328
399
        self._reactor.callFromThread(f, *args, **kwargs)
329
400
 
330
401
    def run(self):
 
402
        """Start the reactor, a C{"run"} event will be fired."""
 
403
 
331
404
        self.fire("run")
332
405
        self._reactor.run()
333
406
        self.fire("stop")
334
407
 
335
408
    def stop(self):
 
409
        """Stop the reactor, a C{"stop"} event will be fired."""
 
410
 
336
411
        self._reactor.crash()
337
412
        self._cleanup()
338
413
 
339
414
    def time(self):
 
415
        """Get current time.
 
416
 
 
417
        @see L{time.time}
 
418
        """
340
419
        return time.time()
341
420
 
342
421
 
343
422
    def listen_udp(self, port, protocol):
344
423
        """Connect the given protocol with a UDP transport.
345
424
 
346
 
        See L{twisted.internet.interfaces.IReactorUDP.listenUDP}.
 
425
        @see L{twisted.internet.interfaces.IReactorUDP.listenUDP}.
347
426
        """
348
427
        return self._reactor.listenUDP(port, protocol)
349
428
 
350
429
    def resolve(self, host):
351
430
        """Look up the IP of the given host.
352
431
 
353
 
        See L{twisted.internet.interfaces.IReactorCore.resolve}.
354
 
 
355
 
        @return: A Deferred resulting in the hostname.
 
432
        @return: A L{Deferred} resulting in the hostname.
 
433
 
 
434
        @see L{twisted.internet.interfaces.IReactorCore.resolve}.
 
435
 
356
436
        """
357
437
        return self._reactor.resolve(host)