1
"""A client for the service in L{landscape.broker.broker.BrokerDBusObject}."""
3
from twisted.internet.defer import execute, maybeDeferred, succeed
5
from dbus import DBusException
7
from landscape.broker.broker import BUS_NAME, OBJECT_PATH, IFACE_NAME
8
from landscape.lib.dbus_util import ServiceUnknownError, get_object, byte_array
9
from landscape.lib.bpickle import dumps
12
class RemoteBroker(object):
    """
    An object which knows how to talk to a remote BrokerDBusObject
    service over DBus.

    Remote method calls are funnelled through L{_perform_call}, which looks
    the method up on the proxy stored in C{self.broker}.
    """

    def __init__(self, bus, retry_timeout=None):
        """
        @param bus: The bus object handed to L{get_object} to look up the
            broker service.
        @param retry_timeout: Passed through unchanged to L{get_object}.
        @raise ServiceUnknownError: If the lookup fails with a DBus
            C{ServiceUnknown} error.
        @raise DBusException: For any other DBus failure during lookup.
        """
        try:
            self.broker = get_object(bus, BUS_NAME, OBJECT_PATH,
                                     retry_timeout=retry_timeout)
        except DBusException as e:
            if str(e).startswith("org.freedesktop.DBus.Error.ServiceUnknown"):
                raise ServiceUnknownError()
            # Swallowing other errors would leave C{self.broker} unset and
            # turn every later call into an unrelated AttributeError, so
            # let the original failure propagate instead.
            raise

    def connect_to_signal(self, *args, **kwargs):
        """Connect to a signal of the broker object.

        The C{dbus_interface} keyword is always forced to L{IFACE_NAME};
        all other arguments are handed to the proxy's own
        C{connect_to_signal}.

        @return: Whatever the proxy's C{connect_to_signal} returns.
        """
        kwargs["dbus_interface"] = IFACE_NAME
        return self.broker.connect_to_signal(*args, **kwargs)

    def send_message(self, message, urgent=False):
        """Send a message to the message exchange service.

        The message is serialized with L{dumps} and wrapped with
        L{byte_array} before being handed to the remote method.

        @return: A deferred which will fire with the result of the send() call.
        """
        return self._perform_call("send_message",
                                  byte_array(dumps(message)), urgent)

    def reload_configuration(self):
        """Reload the broker configuration.

        @return: A deferred which will fire with the result of the
                 reload_configuration() call.
        """
        return self._perform_call("reload_configuration")

    def register(self, timeout=1):
        """Call the remote C{register} method.

        @param timeout: Forwarded to the remote method as the C{timeout}
            keyword argument.
        @return: The result of the remote call.
        """
        return self._perform_call("register", timeout=timeout)

    def get_accepted_message_types(self):
        """Call the remote C{get_accepted_message_types} method.

        @return: The result of the remote call.
        """
        return self._perform_call("get_accepted_message_types")

    def call_if_accepted(self, type, callable, *args):
        """Call C{callable} with C{args} if C{type} is an accepted type.

        The accepted types are fetched with L{get_accepted_message_types};
        C{callable} only runs when C{type} is among them.

        @return: The deferred obtained from L{get_accepted_message_types},
            with the conditional call chained onto it.
        """
        deferred_types = self.get_accepted_message_types()

        def got_accepted_types(result):
            # Only invoke the callable for accepted types; otherwise the
            # callback chain simply continues with C{None}.
            if type in result:
                return callable(*args)

        deferred_types.addCallback(got_accepted_types)
        return deferred_types

    def is_message_pending(self, message_id):
        """Call the remote C{is_message_pending} method for C{message_id}.

        @return: The result of the remote call.
        """
        return self._perform_call("is_message_pending", message_id)

    def register_plugin(self, service_name, path):
        """Call the remote C{register_plugin} method.

        @param service_name: Forwarded as the first positional argument.
        @param path: Forwarded as the second positional argument.
        @return: The result of the remote call.
        """
        return self._perform_call("register_plugin", service_name, path)

    def get_registered_plugins(self):
        """Fetch the registered plugins as plain string pairs.

        @return: The result of the remote call with a callback added that
            converts each C{(service, path)} pair to a tuple of C{str}.
        """
        def convert(result):
            return [(str(service), str(path)) for service, path in result]

        result = self._perform_call("get_registered_plugins")
        return result.addCallback(convert)

    def exit(self):
        """Call the remote C{exit} method.

        @return: The result of the remote call.
        """
        return self._perform_call("exit")

    def _perform_call(self, name, *args, **kwargs):
        """Invoke the method called C{name} on the broker proxy.

        @param name: Attribute name of the remote method on C{self.broker}.
        @return: Whatever the remote method returns.
        """
        method = getattr(self.broker, name)
        return method(*args, **kwargs)
82
class FakeRemoteBroker(object):
    """Looks like L{RemoteBroker}, but actually talks to local objects."""

    def __init__(self, exchanger, message_store):
        """
        @param exchanger: The object whose C{send} method is used by
            L{send_message}.
        @param message_store: The object whose C{get_accepted_types} method
            is consulted by L{call_if_accepted}.
        """
        self.exchanger = exchanger
        self.message_store = message_store

    def call_if_accepted(self, type, callable, *args):
        """Call C{callable} with C{args} if C{type} is an accepted type.

        @return: The result of C{maybeDeferred(callable, *args)} when
            C{type} is accepted, otherwise C{succeed(None)}, so callers
            never receive a bare C{None}.
        """
        if type in self.message_store.get_accepted_types():
            return maybeDeferred(callable, *args)
        return succeed(None)

    def send_message(self, message, urgent=False):
        """Send to the previously given L{MessageExchange} object."""
        return execute(self.exchanger.send, message, urgent=urgent)
99
class DBusSignalToReactorTransmitter(object):
    """
    An object which rebroadcasts signals received via DBus as reactor events.

    C{resynchronize} signals are translated to C{resynchronize} reactor
    events, fired without arguments.

    C{message_type_acceptance_changed} signals are translated to events
    whose key is C{("message-type-acceptance-changed", type)}, with the
    acceptance value passed as the single argument.
    """

    def __init__(self, bus, reactor):
        """
        @param bus: The bus object on which the signal receivers are
            registered via C{add_signal_receiver}.
        @param reactor: The object whose C{fire} method is called for each
            received signal.
        """
        self.reactor = reactor
        bus.add_signal_receiver(self._broadcast_resynchronize, "resynchronize")
        bus.add_signal_receiver(self._broadcast_message_type_acceptance_changed,
                                "message_type_acceptance_changed")

    def _broadcast_resynchronize(self):
        """Fire the C{resynchronize} reactor event."""
        # XXX This event should probably be renamed to something like
        # "clear data" since the only result of this event being fired
        # is that persist data is cleared out, no actual data uploads
        # are triggered by it.
        self.reactor.fire("resynchronize")

    def _broadcast_message_type_acceptance_changed(self, type, acceptance):
        """Fire the acceptance-changed reactor event for C{type}.

        @param type: The message type, used as the second element of the
            event key.
        @param acceptance: Passed to the event handlers as the argument.
        """
        self.reactor.fire(("message-type-acceptance-changed", type), acceptance)