1
from logging import info, exception
3
from twisted.internet.defer import maybeDeferred
5
from landscape.log import format_object
6
from landscape.lib.twisted_util import gather_results
9
class HandlerNotFoundError(Exception):
    """A handler for the given message type was not found."""
13
class BrokerClientPlugin(object):
    """A convenience for writing L{BrokerClient} plugins.

    This provides a register method which will set up a bunch of
    reactor handlers in the idiomatic way.

    If C{run} is defined on subclasses, it will be called every C{run_interval}
    seconds after being registered.

    @cvar run_interval: The interval, in seconds, to execute the C{run} method.
        If set to C{None}, then C{run} will not be scheduled.
    @cvar run_immediately: If C{True} the plugin will be run immediately after
        it is registered.
    """

    # NOTE(review): the C{run_interval} default was lost from the damaged
    # listing this class was rebuilt from; 5 seconds is assumed -- confirm
    # against the canonical file.
    run_interval = 5
    run_immediately = False

    def register(self, client):
        """Attach this plugin to C{client} and schedule C{run} if defined.

        @param client: The object the plugin is being registered with. Its
            C{reactor} attribute must provide C{call_every(interval, f)}.
        """
        # Keep a reference first: C{run} may need the client when invoked
        # immediately below.
        self.client = client
        run = getattr(self, "run", None)
        if run is None:
            # Subclass has nothing to run periodically.
            return
        if self.run_immediately:
            run()
        if self.run_interval is not None:
            client.reactor.call_every(self.run_interval, run)

    # NOTE(review): the property name was lost from the damaged listing (only
    # its docstring survived); C{registry} is assumed -- confirm.
    @property
    def registry(self):
        """An alias for the C{client} attribute."""
        return self.client
45
class BrokerClient(object):
    """Basic plugin registry for clients that have to deal with the broker.

    This knows about the needs of a client when dealing with the Landscape
    broker, including interest in messages of a particular type delivered
    by the broker to the client.

    @cvar name: The name used when registering to the broker, it must be
        defined by sub-classes.
    @ivar broker: A reference to a connected L{RemoteBroker}, it must be set
        by the connecting machinery at service startup.
    """

    # NOTE(review): the class-level default for C{name} was lost from the
    # damaged listing this class was rebuilt from; "client" is assumed --
    # confirm against the canonical file.
    name = "client"

    def __init__(self, reactor):
        """
        @param reactor: A L{TwistedReactor}.
        """
        super(BrokerClient, self).__init__()
        self.reactor = reactor
        # NOTE(review): assumed initial value; the class docstring says the
        # connecting machinery sets C{broker} at service startup.
        self.broker = None
        # Maps message type -> handler callable.
        self._registered_messages = {}
        # Plugins in registration order, plus a by-name index for those
        # exposing a C{plugin_name} attribute.
        self._plugins = []
        self._plugin_names = {}

        # Register event handlers
        self.reactor.call_on("impending-exchange", self.notify_exchange)
        self.reactor.call_on("broker-reconnect", self.handle_reconnect)

    def add(self, plugin):
        """Add a plugin.

        The plugin's C{register} method will be called with this broker client
        as its argument.

        If the plugin has a C{plugin_name} attribute, it will be possible to
        look up the plugin later with L{get_plugin}.

        @param plugin: The plugin object to add.
        """
        info("Registering plugin %s.", format_object(plugin))
        self._plugins.append(plugin)
        if hasattr(plugin, 'plugin_name'):
            self._plugin_names[plugin.plugin_name] = plugin
        plugin.register(self)

    def get_plugins(self):
        """Get the list of plugins.

        @return: A shallow copy, so callers may mutate it freely.
        """
        return self._plugins[:]

    def get_plugin(self, name):
        """Get a particular plugin by name.

        @param name: The C{plugin_name} the plugin was added with.
        @raises: C{KeyError} if no plugin with that name was added.
        """
        return self._plugin_names[name]

    def register_message(self, type, handler):
        """
        Register interest in a particular type of Landscape server->client
        message.

        @param type: The type of message to register C{handler} for.
        @param handler: A callable taking a message as a parameter, called
            when messages of C{type} are received.
        @return: A C{Deferred} that will fire when registration completes.
        """
        self._registered_messages[type] = handler
        return self.broker.register_client_accepted_message_type(type)

    def dispatch_message(self, message):
        """Run the handler registered for the type of the given message.

        If the handler itself fails, the error is logged and C{None} is
        returned.

        @param message: A mapping with at least a C{"type"} key.
        @return: The return value of the handler, if found.
        @raises: HandlerNotFoundError if the handler was not found
        """
        message_type = message["type"]
        handler = self._registered_messages.get(message_type)
        if handler is None:
            raise HandlerNotFoundError(message_type)
        try:
            return handler(message)
        except Exception:
            # NOTE(review): the listing shows only the logging call after the
            # handler invocation; whether the error was also meant to be
            # re-raised is not visible -- confirm.
            exception("Error running message handler for type %r: %r",
                      message_type, handler)

    def message(self, message):
        """Call C{dispatch_message} for the given C{message}.

        @return: A boolean indicating if a handler for the message was found.
        """
        try:
            self.dispatch_message(message)
        except HandlerNotFoundError:
            return False
        return True

    # NOTE(review): the method name was lost from the damaged listing (only
    # its docstring and body survived); C{exchange} is assumed -- confirm.
    def exchange(self):
        """Call C{exchange} on all plugins."""
        for plugin in self.get_plugins():
            if not hasattr(plugin, "exchange"):
                continue
            try:
                plugin.exchange()
            except Exception:
                # One failing plugin must not prevent the others from
                # exchanging.
                exception("Error during plugin exchange")

    def notify_exchange(self):
        """Notify all plugins about an impending exchange."""
        info("Got notification of impending exchange. Notifying all plugins.")
        self.exchange()

    def fire_event(self, event_type, *args, **kwargs):
        """Fire an event of a given type.

        @param event_type: The type of event to fire on the reactor.
        @return: A L{Deferred} resulting in a list of returns values of
            the fired event handlers, in the order they were fired.
        """
        if event_type == "message-type-acceptance-changed":
            # This event is fired under a composite (event, message type)
            # key, with the acceptance flag as the only argument.
            message_type = args[0]
            acceptance = args[1]
            results = self.reactor.fire((event_type, message_type), acceptance)
        else:
            results = self.reactor.fire(event_type, *args, **kwargs)
        # Wrap every result so plain values and Deferreds are gathered alike.
        return gather_results([
            maybeDeferred(lambda x: x, result) for result in results])

    def handle_reconnect(self):
        """Called when the connection with the broker is established again.

        The following needs to be done:

          - Re-register any previously registered message types, so the broker
            knows we have interest on them.

          - Re-register ourselves as client, so the broker knows we exist and
            will talk to us firing events and dispatching messages.
        """
        for message_type in self._registered_messages:
            self.broker.register_client_accepted_message_type(message_type)
        self.broker.register_client(self.name)

    # NOTE(review): the method name was lost from the damaged listing (only
    # its docstring and body survived); C{exit} is assumed -- confirm.
    def exit(self):
        """Stop the reactor and exit the process."""
        # Stop with a short delay to give a chance to reply to the
        # caller before the reactor goes away.
        self.reactor.call_later(0.1, self.reactor.stop)