3
from twisted.internet.defer import Deferred
5
from landscape.lib.twisted_util import gather_results
6
from landscape.amp import RemoteComponentsRegistry
7
from landscape.manager.manager import FAILED
11
"""Turns a L{BrokerServer} method into an event broadcaster.
13
When the decorated method is called, an event is fired on all connected
14
clients. The event will have the same name as the method being called,
15
except that any underscore in the method name will be replaced with a dash.
17
event_type = method.__name__.replace("_", "-")
19
def broadcast_event(self, *args, **kwargs):
    """Fire C{event_type} on every client and gather the outcomes.

    C{event_type} is a free variable taken from the enclosing scope.

    @param args: Positional arguments forwarded to each C{fire_event}.
    @param kwargs: Keyword arguments forwarded to each C{fire_event}.
    @return: Whatever C{gather_results} returns for the collected
        C{fire_event} results.
    """
    # Build the list in one expression so the accumulator always exists
    # (the loop form in this view appended to a name that was never bound).
    fired = [client.fire_event(event_type, *args, **kwargs)
             for client in self.get_clients()]
    return gather_results(fired)
25
return broadcast_event
28
class BrokerServer(object):
30
A broker server capable of handling messages from plugins connected using
31
the L{BrokerProtocol}.
34
connectors_registry = RemoteComponentsRegistry
36
def __init__(self, config, reactor, exchange, registration,
             message_store):
    """
    @param config: The L{BrokerConfiguration} used by the broker.
    @param reactor: The L{TwistedReactor} driving the broker's events.
    @param exchange: The L{MessageExchange} to send messages with.
    @param registration: The L{RegistrationHandler}.
    @param message_store: The broker's L{MessageStore}.
    """
    # NOTE(review): the tail of the signature is restored from the @param
    # list and from the attributes assigned below -- confirm that there
    # are no further parameters.
    self._config = config  # read by register_client/reload_configuration
    self._reactor = reactor
    self._exchanger = exchange
    self._registration = registration
    self._message_store = message_store
    # Client name -> remote client, filled in by register_client.
    self._registered_clients = {}
    # Remote client -> connector, filled in by register_client.
    self._connectors = {}

    # Relay reactor events to the matching methods of this object.
    reactor.call_on("message", self.broadcast_message)
    reactor.call_on("impending-exchange", self.impending_exchange)
    reactor.call_on("message-type-acceptance-changed",
                    self.message_type_acceptance_changed)
    reactor.call_on("server-uuid-changed", self.server_uuid_changed)
    reactor.call_on("package-data-changed", self.package_data_changed)
    reactor.call_on("resynchronize-clients", self.resynchronize)
65
def register_client(self, name):
    """Register a broker client called C{name}.

    Various broker clients interact with the broker server, such as the
    monitor for example, using the L{BrokerServerProtocol} for performing
    remote method calls on the L{BrokerServer}.

    They establish connectivity with the broker by connecting and
    registering themselves, the L{BrokerServer} will in turn connect
    to them in order to be able to perform remote method calls like
    broadcasting events and messages.

    @param name: The name of the client, such as C{monitor} or C{manager}.
    @return: The result of C{connector.connect()} with the bookkeeping
        callback added to it.
    """
    connector_class = self.connectors_registry.get(name)
    connector = connector_class(self._reactor, self._config)

    def register(remote_client):
        # Index the client by name and its connector by client, so that
        # get_connector can go name -> client -> connector.
        self._registered_clients[name] = remote_client
        self._connectors[remote_client] = connector

    connected = connector.connect()
    return connected.addCallback(register)
89
def get_clients(self):
    """Get L{RemoteClient} instances for registered clients.

    @return: The values of the name -> client registry.
    """
    return self._registered_clients.values()
93
def get_client(self, name):
    """Return the client with the given C{name} or C{None}.

    @param name: The name the client was registered under.
    """
    return self._registered_clients.get(name)
97
def get_connectors(self):
    """Get connectors for registered clients.

    @see L{RemoteLandscapeComponentCreator}.
    @return: The values of the client -> connector mapping.
    """
    return self._connectors.values()
104
def get_connector(self, name):
    """Return the connector for the given C{name} or C{None}.

    @param name: The name the client was registered under.
    """
    # Connectors are keyed by client object, so resolve the name first.
    client = self.get_client(name)
    return self._connectors.get(client)
108
def send_message(self, message, urgent=False):
    """Queue C{message} for delivery to the server at the next exchange.

    @param message: The message C{dict} to send to the server.  It must
        have a C{type} key and be compatible with C{landscape.lib.bpickle}.
    @param urgent: If C{True}, exchange urgently, otherwise exchange
        during the next regularly scheduled exchange.
    @return: The message identifier created when queuing C{message}.
    """
    return self._exchanger.send(message, urgent=urgent)
119
def is_message_pending(self, message_id):
    """Indicate if a message with given C{message_id} is pending.

    @param message_id: The identifier to look up in the message store.
    @return: Whatever the store's C{is_pending} reports for that id.
    """
    return self._message_store.is_pending(message_id)
123
def stop_clients(self):
    """Tell all the clients to exit.

    @return: The C{gather_results} result for every client's C{exit()}
        call, chained with a callback that replaces the value by C{None}.
    """
    # FIXME: check whether the client are still alive
    # Built as a single expression so the list is always bound (the loop
    # form in this view appended to a name that was never created).
    results = [client.exit() for client in self.get_clients()]
    result = gather_results(results, consume_errors=True)
    return result.addCallback(lambda ignored: None)
132
def reload_configuration(self):
    """Reload the configuration file, and stop all clients.

    @return: The result of L{stop_clients}.
    """
    self._config.reload()
    # Now we'll kill off everything else so that they can be restarted and
    # notice configuration changes.
    return self.stop_clients()
140
"""Attempt to register with the Landscape server.
142
@see: L{RegistrationHandler.register}
144
return self._registration.register()
146
def get_accepted_message_types(self):
    """Return the message types accepted by the Landscape server.

    @return: Whatever the message store's C{get_accepted_types} returns.
    """
    return self._message_store.get_accepted_types()
150
def get_server_uuid(self):
    """Return the uuid of the Landscape server we're pointing at.

    @return: The value stored in the message store, unchanged.
    """
    # NOTE(review): a previous comment here described converting None to
    # an empty string for the remote side, but no conversion is performed:
    # the stored value (possibly None) is returned as-is.
    return self._message_store.get_server_uuid()
156
def register_client_accepted_message_type(self, type):
    """Register a new message type which can be accepted by this client.

    @param type: The message type to accept.
    """
    # The parameter name shadows the builtin, but it is part of the
    # method's interface and is therefore kept.
    self._exchanger.register_client_accepted_message_type(type)
163
def fire_event(self, event_type):
    """Fire an event in the broker reactor.

    @param event_type: The event type passed to the reactor's C{fire}.
    """
    self._reactor.fire(event_type)
168
"""Request a graceful exit from the broker server.
170
Before this method returns, all broker clients will be notified
171
of the server broker's intention of exiting, so that they have
172
the chance to stop whatever they're doing in a graceful way, and
173
then exit themselves.
175
This method will only return a result when all plugins returned
178
# Fire pre-exit before calling any of the plugins, so that everything
179
# in the broker acknowledges that we're about to exit and asking
180
# broker clients to die. This prevents any exchanges from happening,
182
self._reactor.fire("pre-exit")
184
clients_stopped = self.stop_clients()
186
def fire_post_exit(ignored):
    """Schedule the C{post-exit} event one second from now.

    C{self} is a free variable taken from the enclosing scope.

    @param ignored: The value handed over by the callback chain; unused.
    """
    # Fire it shortly, to give us a chance to send an AMP reply.
    self._reactor.call_later(
        1, lambda: self._reactor.fire("post-exit"))
191
return clients_stopped.addBoth(fire_post_exit)
194
def resynchronize(self):
    """Broadcast a C{resynchronize} event to the clients."""
    # NOTE(review): the body is docstring-only in this view; the
    # broadcasting is presumably supplied by a decorator on a line that
    # is not visible here -- confirm before relying on the return value.
198
def impending_exchange(self):
    """Broadcast an C{impending-exchange} event to the clients."""
    # NOTE(review): the body is docstring-only in this view; the
    # broadcasting is presumably supplied by a decorator on a line that
    # is not visible here -- confirm before relying on the return value.
201
def listen_events(self, event_types):
    """
    Return a C{Deferred} that fires when the first event occurs among the
    given ones.

    @param event_types: An iterable of event type names to listen for.
    @return: A C{Deferred} called back with the type of the first event
        that occurred.
    """
    # NOTE(review): the inner handler, the list of registrations and the
    # final return are restored from the docstring and the surviving
    # statements -- confirm against the upstream implementation.
    deferred = Deferred()
    calls = []

    def get_handler(event_type):
        # A factory is needed so each handler captures its own event_type
        # instead of the loop variable's final value.

        def handler(*args, **kwargs):
            # One event is enough: drop every registration made below so
            # the Deferred cannot be called back a second time.  Any event
            # payload is ignored.
            for call in calls:
                self._reactor.cancel_call(call)
            deferred.callback(event_type)

        return handler

    for event_type in event_types:
        call = self._reactor.call_on(event_type, get_handler(event_type))
        calls.append(call)
    return deferred
224
def broker_reconnect(self):
    """Broadcast a C{broker-reconnect} event to the clients."""
    # NOTE(review): the body is docstring-only in this view; the
    # broadcasting is presumably supplied by a decorator on a line that
    # is not visible here -- confirm before relying on the return value.
228
def server_uuid_changed(self, old_uuid, new_uuid):
    """Broadcast a C{server-uuid-changed} event to the clients.

    @param old_uuid: The previous server uuid.
    @param new_uuid: The new server uuid.
    """
    # NOTE(review): the body is docstring-only in this view; the
    # broadcasting is presumably supplied by a decorator on a line that
    # is not visible here -- confirm before relying on the return value.
232
def message_type_acceptance_changed(self, type, accepted):
    """Handle a C{message-type-acceptance-changed} event.

    @param type: The message type whose acceptance changed.
    @param accepted: The new acceptance state for that type.
    """
    # NOTE(review): no body survives in this view.  The sibling event
    # methods are docstring-only and described as broadcasting to the
    # clients, presumably through a decorator that is not visible here;
    # this one is assumed to follow the same pattern -- confirm.
236
def package_data_changed(self):
    """Fire a package-data-changed event in the reactor of each client."""
    # NOTE(review): the body is docstring-only in this view; the
    # broadcasting is presumably supplied by a decorator on a line that
    # is not visible here -- confirm before relying on the return value.
239
def broadcast_message(self, message):
    """Call the C{message} method of all the registered plugins.

    @see: L{register_plugin}.
    @param message: The message handed to every client.
    @return: The C{gather_results} result, with L{_message_delivered}
        chained on so unhandled messages get followed up.
    """
    # Built as a single expression so the list is always bound (the loop
    # form in this view appended to a name that was never created).
    results = [client.message(message) for client in self.get_clients()]
    result = gather_results(results)
    return result.addCallback(self._message_delivered, message)
250
def _message_delivered(self, results, message):
    """
    If the message wasn't handled, and it's an operation request (i.e. it
    has an operation-id), then respond with a failing operation result
    to the server.

    @param results: The values the clients returned for the message; a
        C{True} entry means some client handled it.
    @param message: The message C{dict} that was broadcast.
    """
    # NOTE(review): the C{opid is not None} test, the C{result_text} /
    # C{response} assignments and the C{"status": FAILED} entry are
    # restored from the docstring, the surviving lines and the file's
    # FAILED import -- confirm against the upstream implementation.
    opid = message.get("operation-id")
    if (True not in results
            and opid is not None
            and message["type"] != "resynchronize"):
        mtype = message["type"]
        logging.error("Nobody handled the %s message." % (mtype,))

        result_text = """\
Landscape client failed to handle this request (%s) because the
plugin which should handle it isn't available. This could mean that the
plugin has been intentionally disabled, or that the client isn't running
properly, or you may be running an older version of the client that doesn't
support this feature.

Please contact the Landscape team for more information.
""" % (mtype,)
        response = {
            "type": "operation-result",
            "status": FAILED,
            "result-text": result_text,
            "operation-id": opid}
        # Urgent, so the server learns about the failure promptly.
        self._exchanger.send(response, urgent=True)