~free.ekanayaka/landscape-client/karmic-updates-1.4.4-0ubuntu0.9.10

« back to all changes in this revision

Viewing changes to landscape/broker/remote.py

  • Committer: Bazaar Package Importer
  • Author(s): Rick Clark
  • Date: 2008-09-08 16:35:57 UTC
  • mfrom: (1.1.1 upstream)
  • Revision ID: james.westby@ubuntu.com-20080908163557-l3ixzj5dxz37wnw2
Tags: 1.0.18-0ubuntu1
New upstream release 

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""A client for the service in L{landscape.broker.broker.BrokerDBusObject}."""
 
2
 
 
3
from twisted.internet.defer import execute, maybeDeferred, succeed
 
4
 
 
5
from dbus import DBusException
 
6
 
 
7
from landscape.broker.broker import BUS_NAME, OBJECT_PATH, IFACE_NAME
 
8
from landscape.lib.dbus_util import ServiceUnknownError, get_object, byte_array
 
9
from landscape.lib.bpickle import dumps
 
10
 
 
11
 
 
12
class RemoteBroker(object):
    """
    An object which knows how to talk to a remote BrokerDBusObject
    service over DBUS.
    """

    def __init__(self, bus, retry_timeout=None):
        """Look up the broker object on the given bus.

        @param bus: The bus connection handed to L{get_object}; it is also
            kept as C{self.bus}.
        @param retry_timeout: Passed through unchanged to L{get_object}.
        @raise ServiceUnknownError: If the lookup fails with an error whose
            text starts with C{org.freedesktop.DBus.Error.ServiceUnknown}.
        @raise DBusException: If the lookup fails for any other reason.
        """
        self.bus = bus
        try:
            self.broker = get_object(bus, BUS_NAME, OBJECT_PATH,
                                     retry_timeout=retry_timeout)
        except DBusException as e:
            if str(e).startswith("org.freedesktop.DBus.Error.ServiceUnknown"):
                raise ServiceUnknownError()
            # Do not swallow other failures: without a C{broker} attribute
            # every later call would die with an unrelated AttributeError.
            raise

    def connect_to_signal(self, *args, **kwargs):
        """Connect a handler to a signal emitted by the broker object.

        The C{dbus_interface} keyword is always set to L{IFACE_NAME},
        overriding any value supplied by the caller; everything else is
        forwarded to the broker object's own C{connect_to_signal}.

        @return: Whatever the broker object's C{connect_to_signal} returns.
        """
        kwargs["dbus_interface"] = IFACE_NAME
        return self.broker.connect_to_signal(*args, **kwargs)

    def send_message(self, message, urgent=False):
        """Send a message to the message exchange service.

        The message is serialised with L{dumps} and wrapped with
        L{byte_array} before being handed to the broker.

        @param message: The message to send.
        @param urgent: Forwarded to the broker's C{send_message} as is.
        @return: A deferred which will fire with the result of the send() call.
        """
        return self._perform_call("send_message",
                                  byte_array(dumps(message)), urgent)

    def reload_configuration(self):
        """Reload the broker configuration.

        @return: A deferred which will fire with the result of the
                 reload_configuration() call.
        """
        return self._perform_call("reload_configuration")

    def register(self, timeout=1):
        """Call the broker's C{register} method.

        @param timeout: Passed to the broker as the C{timeout} keyword.
        @return: The result of the broker's C{register} call.
        """
        return self._perform_call("register", timeout=timeout)

    def get_accepted_message_types(self):
        """Call the broker's C{get_accepted_message_types} method.

        @return: The result of the call; L{call_if_accepted} relies on it
            providing C{addCallback}.
        """
        return self._perform_call("get_accepted_message_types")

    def call_if_accepted(self, type, callable, *args):
        """Call C{callable} only if C{type} is an accepted message type.

        @param type: The message type to look for among the accepted types.
        @param callable: Invoked as C{callable(*args)} when C{type} is
            accepted.
        @return: The result of L{get_accepted_message_types} with a callback
            added; that callback returns what C{callable} returned, or
            C{None} when C{type} is not accepted.
        """
        deferred_types = self.get_accepted_message_types()

        def got_accepted_types(result):
            if type in result:
                return callable(*args)

        deferred_types.addCallback(got_accepted_types)
        return deferred_types

    def is_message_pending(self, message_id):
        """Call the broker's C{is_message_pending} with C{message_id}.

        @return: The result of the broker call.
        """
        return self._perform_call("is_message_pending", message_id)

    def register_plugin(self, service_name, path):
        """Call the broker's C{register_plugin} with the given arguments.

        @return: The result of the broker call.
        """
        return self._perform_call("register_plugin", service_name, path)

    def get_registered_plugins(self):
        """Call the broker's C{get_registered_plugins} method.

        @return: The result of the broker call with a callback added that
            turns each C{(service, path)} pair into a tuple of plain C{str}.
        """
        def convert(result):
            return [(str(service), str(path)) for service, path in result]

        result = self._perform_call("get_registered_plugins")
        return result.addCallback(convert)

    def exit(self):
        """Call the broker's C{exit} method.

        @return: The result of the broker call.
        """
        return self._perform_call("exit")

    def _perform_call(self, name, *args, **kwargs):
        """Invoke the method called C{name} on the broker object.

        @param name: Name of the attribute to look up on C{self.broker}.
        @return: Whatever the looked up method returns when called with
            the remaining positional and keyword arguments.
        """
        method = getattr(self.broker, name)
        return method(*args, **kwargs)
 
80
 
 
81
 
 
82
class FakeRemoteBroker(object):
    """Stand-in for L{RemoteBroker} which works on local objects directly."""

    def __init__(self, exchanger, message_store):
        """Remember the local objects the calls are served from.

        @param exchanger: Object whose C{send} method is used by
            L{send_message}.
        @param message_store: Object whose C{get_accepted_types} method is
            used by L{call_if_accepted}.
        """
        self.exchanger = exchanger
        self.message_store = message_store

    def call_if_accepted(self, type, callable, *args):
        """Call C{callable(*args)} only if C{type} is an accepted type.

        @return: The result of C{maybeDeferred(callable, *args)} when
            C{type} is among the message store's accepted types, otherwise
            C{succeed(None)}.
        """
        accepted_types = self.message_store.get_accepted_types()
        if type not in accepted_types:
            return succeed(None)
        return maybeDeferred(callable, *args)

    def send_message(self, message, urgent=False):
        """Send to the previously given L{MessageExchange} object.

        @return: The result of running the exchanger's C{send} through
            C{execute}.
        """
        send = self.exchanger.send
        return execute(send, message, urgent=urgent)
 
97
 
 
98
 
 
99
class DBusSignalToReactorTransmitter(object):
    """
    An object which relays signals received via DBUS to the reactor.

    C{resynchronize} signals are translated to C{resynchronize} reactor
    events. C{message_type_acceptance_changed} signals are translated to
    events whose key is C{("message-type-acceptance-changed", type)}, with
    the acceptance value passed as the one argument.
    """

    def __init__(self, bus, reactor):
        """Register one receiver on C{bus} for each relayed signal.

        @param bus: Object providing C{add_signal_receiver}.
        @param reactor: Object providing C{fire}.
        """
        self.bus = bus
        self.reactor = reactor
        forwarders = (
            (self._broadcast_resynchronize, "resynchronize"),
            (self._broadcast_message_type_acceptance_changed,
             "message_type_acceptance_changed"),
        )
        for handler, signal_name in forwarders:
            bus.add_signal_receiver(handler, signal_name)

    def _broadcast_resynchronize(self):
        """Fire the C{resynchronize} event on the reactor."""
        # XXX A name such as "clear data" would probably suit this event
        # better: all it results in is persist data being cleared out,
        # it does not trigger any actual data uploads.
        self.reactor.fire("resynchronize")

    def _broadcast_message_type_acceptance_changed(self, type, acceptance):
        """Fire the acceptance-changed event for C{type} on the reactor."""
        event_key = ("message-type-acceptance-changed", type)
        self.reactor.fire(event_key, acceptance)