1
from logging import info, exception
1
from logging import info
3
from landscape.lib.bpickle import loads
4
from landscape.lib.dbus_util import Object, array_to_string
5
3
from landscape.log import format_object
8
class HandlerNotFoundError(Exception):
9
"""A handler for the given message type was not found."""
12
6
class PluginConfigError(Exception):
13
7
"""There was an error registering or configuring a plugin."""
44
38
return self._plugin_names[name]
47
class BrokerClientPluginRegistry(PluginRegistry):
48
"""Basic plugin registry for clients that have to deal with the broker.
50
This knows about the needs of a client when dealing with the Landscape
51
broker, including interest in messages of a particular type delivered
52
by the broker to the client.
55
def __init__(self, broker):
56
super(BrokerClientPluginRegistry, self).__init__()
57
self._registered_messages = {}
60
def register_message(self, type, handler):
62
Register interest in a particular type of Landscape server->client
65
self._registered_messages[type] = handler
66
return self.broker.register_client_accepted_message_type(type)
68
def broker_started(self):
70
Re-register any previously registered message types when the broker
73
for type in self._registered_messages:
74
self.broker.register_client_accepted_message_type(type)
76
def dispatch_message(self, message):
77
"""Run the handler registered for the type of the given message."""
78
type = message["type"]
79
handler = self._registered_messages.get(type)
80
if handler is not None:
82
return handler(message)
84
exception("Error running message handler for type %r: %r"
87
raise HandlerNotFoundError(type)
90
"""Call C{exchange} on all plugins."""
91
for plugin in self._plugins:
92
if hasattr(plugin, "exchange"):
96
exception("Error during plugin exchange")
99
41
class Plugin(object):
100
42
"""A convenience for writing plugins.
116
58
self.registry = registry
117
59
if hasattr(self, "run") and self.run_interval is not None:
118
60
registry.reactor.call_every(self.run_interval, self.run)
121
class BrokerPlugin(Object):
123
A DBus object which exposes the 'plugin' interface that the Broker expects
127
def __init__(self, bus, registry):
129
@param bus: a DBus connection, typically a C{dbus.SystemBus} object.
130
@param registry: an instance of L{PluginRegistry} or of a subclass of
133
Object.__init__(self, bus)
134
self.registry = registry
135
bus.add_signal_receiver(self.notify_exchange, "impending_exchange")
137
def notify_exchange(self):
138
info("Got notification of impending exchange. Notifying all plugins.")
139
self.registry.exchange()
145
from twisted.internet import reactor
146
reactor.callLater(0.1, reactor.stop)
148
def dispatch_message(self, blob):
150
Call the L{PluginRegistry}'s C{dispatch_message} method and return True
151
if a message handler was found and False otherwise.
153
message = loads(array_to_string(blob))
155
self.registry.dispatch_message(message)
157
except HandlerNotFoundError:
160
def message(self, blob):
162
Call L{dispatch_message} with C{blob} and return the result.
164
return self.dispatch_message(blob)