1
"""A client for the service in L{landscape.broker.broker.BrokerDBusObject}."""
3
from twisted.internet.defer import execute, maybeDeferred, succeed
5
from dbus import DBusException
7
from landscape.broker.broker import BUS_NAME, OBJECT_PATH, IFACE_NAME
8
from landscape.lib.dbus_util import ServiceUnknownError, get_object, byte_array
9
from landscape.lib.bpickle import dumps
12
class RemoteBroker(object):
14
An object which knows how to talk to a remote BrokerDBusObject
18
def __init__(self, bus, retry_timeout=None):
21
self.broker = get_object(bus, BUS_NAME, OBJECT_PATH,
22
retry_timeout=retry_timeout)
23
except DBusException, e:
24
if str(e).startswith("org.freedesktop.DBus.Error.ServiceUnknown"):
25
raise ServiceUnknownError()
27
def connect_to_signal(self, *args, **kwargs):
28
kwargs["dbus_interface"] = IFACE_NAME
29
return self.broker.connect_to_signal(*args, **kwargs)
31
def send_message(self, message, urgent=False):
32
"""Send a message to the message exchange service.
34
@return: A deferred which will fire with the result of the send() call.
36
return self._perform_call("send_message",
37
byte_array(dumps(message)), urgent)
39
def fire_event(self, event_type):
40
"""Fire an event in the broker reactor."""
41
return self._perform_call("fire_event", event_type)
43
def reload_configuration(self):
44
"""Reload the broker configuration.
46
@return: A deferred which will fire with the result of the
47
reload_configuration() call.
49
return self._perform_call("reload_configuration")
51
def register(self, timeout=1):
52
return self._perform_call("register", timeout=timeout)
54
def get_accepted_message_types(self):
55
return self._perform_call("get_accepted_message_types")
57
def call_if_accepted(self, type, callable, *args):
58
deferred_types = self.get_accepted_message_types()
59
def got_accepted_types(result):
61
return callable(*args)
62
deferred_types.addCallback(got_accepted_types)
65
def register_client_accepted_message_type(self, type):
66
return self._perform_call("register_client_accepted_message_type",
69
def is_message_pending(self, message_id):
70
return self._perform_call("is_message_pending", message_id)
72
def register_plugin(self, service_name, path):
73
return self._perform_call("register_plugin", service_name, path)
75
def get_registered_plugins(self):
77
return [(str(service), str(path)) for service, path in result]
78
result = self._perform_call("get_registered_plugins")
79
return result.addCallback(convert)
81
def get_server_uuid(self):
82
# DBus doesn't like Nones, so we transfer them as empty strings.
83
def empty_string_to_none(uuid):
85
result = self._perform_call("get_server_uuid")
86
result.addCallback(empty_string_to_none)
90
return self._perform_call("exit")
92
def _perform_call(self, name, *args, **kwargs):
93
method = getattr(self.broker, name)
94
result = method(*args, **kwargs)
98
class FakeRemoteBroker(object):
99
"""Looks like L{RemoteBroker}, but actually talks to local objects."""
101
def __init__(self, exchanger, message_store):
102
self.exchanger = exchanger
103
self.message_store = message_store
105
def call_if_accepted(self, type, callable, *args):
106
if type in self.message_store.get_accepted_types():
107
return maybeDeferred(callable, *args)
110
def send_message(self, message, urgent=False):
111
"""Send to the previously given L{MessageExchange} object."""
112
return execute(self.exchanger.send, message, urgent=urgent)
114
def register_client_accepted_message_type(self, type):
115
return execute(self.exchanger.register_client_accepted_message_type, type)
118
class DBusSignalToReactorTransmitter(object):
120
An object which broadcasts Landscape messages received via DBUS to the
121
reactor. The event key is C{("message", message-type)}, and one argument,
122
the message, will be passed.
124
In addition, C{resynchronize} signals will be translated to
125
C{resynchronize} reactor events.
127
def __init__(self, bus, reactor):
129
self.reactor = reactor
130
bus.add_signal_receiver(self._broadcast_resynchronize, "resynchronize")
131
bus.add_signal_receiver(self._broadcast_message_type_acceptance_changed,
132
"message_type_acceptance_changed")
133
bus.add_signal_receiver(self._broadcast_server_uuid_changed,
134
"server_uuid_changed")
135
bus.add_signal_receiver(self._broadcast_package_data_changed,
136
"package_data_changed")
139
def _broadcast_resynchronize(self):
140
# XXX This event should probably be renamed to something like
141
# "clear data" since the only result of this event being fired
142
# is that persist data is cleared out, no actual data uploads
143
# are triggered by it.
144
self.reactor.fire("resynchronize")
146
def _broadcast_message_type_acceptance_changed(self, type, acceptance):
147
self.reactor.fire(("message-type-acceptance-changed", type), acceptance)
149
def _broadcast_server_uuid_changed(self, old_uuid, new_uuid):
150
# DBus doesn't work well with Nones, so the signal emitter converts
151
# them to empty strings when sending the signal. The remote should
152
# then convert them back to Nones so that we have the same API on
154
self.reactor.fire("server-uuid-changed",
155
old_uuid or None, new_uuid or None)
157
def _broadcast_package_data_changed(self):
158
self.reactor.fire("package-data-changed")