1
"""The DBUS service which interfaces to the broker."""
5
from dbus.service import signal
8
from landscape.lib.dbus_util import (get_object, Object, method,
9
byte_array, array_to_string)
10
from landscape.lib.bpickle import loads, dumps
11
from landscape.lib.twisted_util import gather_results
13
from landscape.manager.manager import FAILED
16
BUS_NAME = "com.canonical.landscape.Broker"
17
OBJECT_PATH = "/com/canonical/landscape/Broker"
21
class BrokerDBusObject(Object):
22
"""A DBus-published object which allows adding messages to the queue."""
25
object_path = OBJECT_PATH
27
def __init__(self, config, reactor, exchange, registration,
31
L{MessageExchange<landscape.exchange.MessageExchange>} to send
33
@param bus: The L{Bus} that represents where we're listening.
35
super(BrokerDBusObject, self).__init__(bus)
36
self._registered_plugins = set()
39
self.reactor = reactor
40
self.exchange = exchange
41
self.registration = registration
42
self.message_store = message_store
43
reactor.call_on("message", self._broadcast_message)
44
reactor.call_on("impending-exchange", self.impending_exchange)
45
reactor.call_on("exchange-failed", self.exchange_failed)
46
reactor.call_on("registration-done", self.registration_done)
47
reactor.call_on("registration-failed", self.registration_failed)
48
reactor.call_on("message-type-acceptance-changed",
49
self.message_type_acceptance_changed)
50
reactor.call_on("resynchronize-clients", self.resynchronize)
53
def resynchronize(self):
57
def impending_exchange(self):
61
def exchange_failed(self):
64
def _broadcast_message(self, message):
65
blob = byte_array(dumps(message))
67
for plugin in self.get_plugin_objects():
68
results.append(plugin.message(blob))
69
return gather_results(results).addCallback(self._message_delivered,
72
def _message_delivered(self, results, message):
74
If the message wasn't handled, and it's an operation request (i.e. it
75
has an operation-id), then respond with a failing operation result
78
opid = message.get("operation-id")
79
if (True not in results
81
and message["type"] != "resynchronize"):
82
mtype = message["type"]
83
logging.error("Nobody handled the %s message." % (mtype,))
86
Landscape client failed to handle this request (%s) because the
87
plugin which should handle it isn't available. This could mean that the
88
plugin has been intentionally disabled, or that the client isn't running
89
properly, or you may be running an older version of the client that doesn't
92
Please contact the Landscape team for more information.
95
"type": "operation-result",
97
"result-text": result_text,
99
self.exchange.send(response, urgent=True)
108
def send_message(self, message, urgent=False):
109
"""Queue the given message in the message exchange.
111
This method is DBUS-published.
113
@param message: The message dict.
114
@param urgent: If True, exchange urgently. Defaults to False.
116
message = loads(array_to_string(message))
118
logging.debug("Got a %r message over DBUS." % (message["type"],))
119
except (KeyError, TypeError), e:
120
logging.exception(str(e))
121
return self.exchange.send(message, urgent=urgent)
124
def is_message_pending(self, message_id):
125
return self.message_store.is_pending(message_id)
128
def reload_configuration(self):
130
# Now we'll kill off everything else so that they can be restarted and
131
# notice configuration changes.
132
return self.stop_plugins()
136
return self.registration.register()
139
def registration_done(self):
143
def registration_failed(self):
146
@method(IFACE_NAME, out_signature="as")
147
def get_accepted_message_types(self):
148
return self.message_store.get_accepted_types()
151
def message_type_acceptance_changed(self, type, accepted):
155
def register_plugin(self, bus_name, object_path):
156
self._registered_plugins.add((bus_name, object_path))
159
def get_registered_plugins(self):
160
return list(self._registered_plugins)
162
def get_plugin_objects(self, retry_timeout=None):
163
return [get_object(self.bus, bus_name, object_path,
164
retry_timeout=retry_timeout)
165
for bus_name, object_path in self._registered_plugins]
167
def stop_plugins(self):
168
"""Tell all plugins to exit."""
170
# We disable our timeout with retry_timeout=0 here. The process might
171
# already have exited, or be truly wedged, so the default DBus timeout
173
for plugin in self.get_plugin_objects(retry_timeout=0):
174
results.append(plugin.exit())
175
result = gather_results(results, consume_errors=True)
176
result.addCallback(lambda ignored: None)
181
"""Request a graceful exit from the broker.
183
Before this method returns, all plugins will be notified of the
184
broker's intention of exiting, so that they have the chance to
185
stop whatever they're doing in a graceful way, and then exit
188
This method will only return a result when all plugins returned
191
# Fire pre-exit before calling any of the plugins, so that everything
192
# in the broker acknowledges that we're about to exit and asking
193
# plugins to die. This prevents any exchanges from happening,
195
self.reactor.fire("pre-exit")
197
result = self.stop_plugins()
199
def fire_post_exit(ignored):
200
# Fire it shortly, to give us a chance to send a DBUS reply.
201
self.reactor.call_later(1, lambda: self.reactor.fire("post-exit"))
202
result.addBoth(fire_post_exit)