3
from twisted.internet.defer import Deferred
5
from landscape.lib.twisted_util import gather_results
6
from landscape.amp import RemoteComponentsRegistry
7
from landscape.manager.manager import FAILED
11
"""Turns a L{BrokerServer} method into an event broadcaster.
13
When the decorated method is called, an event is fired on all connected
14
clients. The event will have the same name as the method being called,
15
except that any underscore in the method name will be replaced with a dash.
17
event_type = method.__name__.replace("_", "-")
19
def broadcast_event(self, *args, **kwargs):
21
for client in self.get_clients():
22
fired.append(client.fire_event(event_type, *args, **kwargs))
23
return gather_results(fired)
25
return broadcast_event
28
class BrokerServer(object):
30
A broker server capable of handling messages from plugins connected using
31
the L{BrokerProtocol}.
34
connectors_registry = RemoteComponentsRegistry
36
def __init__(self, config, reactor, exchange, registration,
39
@param config: The L{BrokerConfiguration} used by the broker.
40
@param reactor: The L{TwistedReactor} driving the broker's events.
41
@param exchange: The L{MessageExchange} to send messages with.
42
@param registration: The {RegistrationHandler}.
43
@param message_store: The broker's L{MessageStore}.
46
self._reactor = reactor
47
self._exchanger = exchange
48
self._registration = registration
49
self._message_store = message_store
50
self._registered_clients = {}
53
reactor.call_on("message", self.broadcast_message)
54
reactor.call_on("impending-exchange", self.impending_exchange)
55
reactor.call_on("exchange-failed", self.exchange_failed)
56
reactor.call_on("registration-done", self.registration_done)
57
reactor.call_on("registration-failed", self.registration_failed)
58
reactor.call_on("message-type-acceptance-changed",
59
self.message_type_acceptance_changed)
60
reactor.call_on("server-uuid-changed", self.server_uuid_changed)
61
reactor.call_on("package-data-changed", self.package_data_changed)
62
reactor.call_on("resynchronize-clients", self.resynchronize)
68
def register_client(self, name):
69
"""Register a broker client called C{name}.
71
Various broker clients interact with the broker server, such as the
72
monitor for example, using the L{BrokerServerProtocol} for performing
73
remote method calls on the L{BrokerServer}.
75
They establish connectivity with the broker by connecting and
76
registering themselves, the L{BrokerServer} will in turn connect
77
to them in order to be able to perform remote method calls like
78
broadcasting events and messages.
80
@param name: The name of the client, such a C{monitor} or C{manager}.
82
connector_class = self.connectors_registry.get(name)
83
connector = connector_class(self._reactor, self._config)
85
def register(remote_client):
86
self._registered_clients[name] = remote_client
87
self._connectors[remote_client] = connector
89
connected = connector.connect()
90
return connected.addCallback(register)
92
def get_clients(self):
93
"""Get L{RemoteClient} instances for registered clients."""
94
return self._registered_clients.values()
96
def get_client(self, name):
97
"""Return the client with the given C{name} or C{None}."""
98
return self._registered_clients.get(name)
100
def get_connectors(self):
101
"""Get connectors for registered clients.
103
@see L{RemoteLandscapeComponentCreator}.
105
return self._connectors.values()
107
def get_connector(self, name):
108
"""Return the connector for the given C{name} or C{None}."""
109
return self._connectors.get(self.get_client(name))
111
def send_message(self, message, urgent=False):
112
"""Queue C{message} for delivery to the server at the next exchange.
114
@param message: The message C{dict} to send to the server. It must
115
have a C{type} key and be compatible with C{landscape.lib.bpickle}.
116
@param urgent: If C{True}, exchange urgently, otherwise exchange
117
during the next regularly scheduled exchange.
118
@return: The message identifier created when queuing C{message}.
120
return self._exchanger.send(message, urgent=urgent)
122
def is_message_pending(self, message_id):
123
"""Indicate if a message with given C{message_id} is pending."""
124
return self._message_store.is_pending(message_id)
126
def stop_clients(self):
127
"""Tell all the clients to exit."""
129
# FIXME: check whether the client are still alive
130
for client in self.get_clients():
131
results.append(client.exit())
132
result = gather_results(results, consume_errors=True)
133
return result.addCallback(lambda ignored: None)
135
def reload_configuration(self):
136
"""Reload the configuration file, and stop all clients."""
137
self._config.reload()
138
# Now we'll kill off everything else so that they can be restarted and
139
# notice configuration changes.
140
return self.stop_clients()
143
"""Attempt to register with the Landscape server.
145
@see: L{RegistrationHandler.register}
147
return self._registration.register()
149
def get_accepted_message_types(self):
150
"""Return the message types accepted by the Landscape server."""
151
return self._message_store.get_accepted_types()
153
def get_server_uuid(self):
154
"""Return the uuid of the Landscape server we're pointing at."""
155
# Convert Nones to empty strings. The Remote will
156
# convert them back to Nones.
157
return self._message_store.get_server_uuid()
159
def register_client_accepted_message_type(self, type):
160
"""Register a new message type which can be accepted by this client.
162
@param type: The message type to accept.
164
self._exchanger.register_client_accepted_message_type(type)
166
def fire_event(self, event_type):
167
"""Fire an event in the broker reactor."""
168
self._reactor.fire(event_type)
171
"""Request a graceful exit from the broker server.
173
Before this method returns, all broker clients will be notified
174
of the server broker's intention of exiting, so that they have
175
the chance to stop whatever they're doing in a graceful way, and
176
then exit themselves.
178
This method will only return a result when all plugins returned
181
# Fire pre-exit before calling any of the plugins, so that everything
182
# in the broker acknowledges that we're about to exit and asking
183
# broker clients to die. This prevents any exchanges from happening,
185
self._reactor.fire("pre-exit")
187
clients_stopped = self.stop_clients()
189
def fire_post_exit(ignored):
190
# Fire it shortly, to give us a chance to send an AMP reply.
191
self._reactor.call_later(
192
1, lambda: self._reactor.fire("post-exit"))
194
return clients_stopped.addBoth(fire_post_exit)
197
def resynchronize(self):
198
"""Broadcast a C{resynchronize} event to the clients."""
201
def impending_exchange(self):
202
"""Broadcast an C{impending-exchange} event to the clients."""
205
def exchange_failed(self):
206
"""Broadcast a C{exchange-failed} event to the clients."""
209
def registration_done(self):
210
"""Broadcast a C{registration-done} event to the clients."""
213
def registration_failed(self):
214
"""Broadcast a C{registration-failed} event to the clients."""
216
def listen_events(self, event_types):
218
Return a C{Deferred} that fires when the first event occurs among the
221
deferred = Deferred()
224
def get_handler(event_type):
228
self._reactor.cancel_call(call)
229
deferred.callback(event_type)
233
for event_type in event_types:
234
call = self._reactor.call_on(event_type, get_handler(event_type))
239
def broker_reconnect(self):
240
"""Broadcast a C{broker-reconnect} event to the clients."""
243
def server_uuid_changed(self, old_uuid, new_uuid):
244
"""Broadcast a C{server-uuid-changed} event to the clients."""
247
def message_type_acceptance_changed(self, type, accepted):
251
def package_data_changed(self):
252
"""Fire a package-data-changed event in the reactor of each client."""
254
def broadcast_message(self, message):
255
"""Call the C{message} method of all the registered plugins.
257
@see: L{register_plugin}.
260
for client in self.get_clients():
261
results.append(client.message(message))
262
result = gather_results(results)
263
return result.addCallback(self._message_delivered, message)
265
def _message_delivered(self, results, message):
267
If the message wasn't handled, and it's an operation request (i.e. it
268
has an operation-id), then respond with a failing operation result
271
opid = message.get("operation-id")
272
if (True not in results
274
and message["type"] != "resynchronize"):
275
mtype = message["type"]
276
logging.error("Nobody handled the %s message." % (mtype,))
279
Landscape client failed to handle this request (%s) because the
280
plugin which should handle it isn't available. This could mean that the
281
plugin has been intentionally disabled, or that the client isn't running
282
properly, or you may be running an older version of the client that doesn't
283
support this feature.
285
Please contact the Landscape team for more information.
288
"type": "operation-result",
290
"result-text": result_text,
291
"operation-id": opid}
292
self._exchanger.send(response, urgent=True)