~ahasenack/landscape-client/landscape-client-11.02-0ubuntu0.8.04.1

« back to all changes in this revision

Viewing changes to landscape/broker/server.py

  • Committer: Andreas Hasenack
  • Date: 2011-05-05 14:12:15 UTC
  • Revision ID: andreas@canonical.com-20110505141215-5ymuyyh5es9pwa6p
Added hardy files.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import logging
 
2
 
 
3
from twisted.internet.defer import Deferred
 
4
 
 
5
from landscape.lib.twisted_util import gather_results
 
6
from landscape.amp import RemoteComponentsRegistry
 
7
from landscape.manager.manager import FAILED
 
8
 
 
9
 
 
10
def event(method):
 
11
    """Turns a L{BrokerServer} method into an event broadcaster.
 
12
 
 
13
    When the decorated method is called, an event is fired on all connected
 
14
    clients. The event will have the same name as the method being called,
 
15
    except that any underscore in the method name will be replaced with a dash.
 
16
    """
 
17
    event_type = method.__name__.replace("_", "-")
 
18
 
 
19
    def broadcast_event(self, *args, **kwargs):
 
20
        fired = []
 
21
        for client in self.get_clients():
 
22
            fired.append(client.fire_event(event_type, *args, **kwargs))
 
23
        return gather_results(fired)
 
24
 
 
25
    return broadcast_event
 
26
 
 
27
 
 
28
class BrokerServer(object):
 
29
    """
 
30
    A broker server capable of handling messages from plugins connected using
 
31
    the L{BrokerProtocol}.
 
32
    """
 
33
    name = "broker"
 
34
    connectors_registry = RemoteComponentsRegistry
 
35
 
 
36
    def __init__(self, config, reactor, exchange, registration,
 
37
                 message_store):
 
38
        """
 
39
        @param config: The L{BrokerConfiguration} used by the broker.
 
40
        @param reactor: The L{TwistedReactor} driving the broker's events.
 
41
        @param exchange: The L{MessageExchange} to send messages with.
 
42
        @param registration: The {RegistrationHandler}.
 
43
        @param message_store: The broker's L{MessageStore}.
 
44
        """
 
45
        self._config = config
 
46
        self._reactor = reactor
 
47
        self._exchanger = exchange
 
48
        self._registration = registration
 
49
        self._message_store = message_store
 
50
        self._registered_clients = {}
 
51
        self._connectors = {}
 
52
 
 
53
        reactor.call_on("message", self.broadcast_message)
 
54
        reactor.call_on("impending-exchange", self.impending_exchange)
 
55
        reactor.call_on("message-type-acceptance-changed",
 
56
                        self.message_type_acceptance_changed)
 
57
        reactor.call_on("server-uuid-changed", self.server_uuid_changed)
 
58
        reactor.call_on("package-data-changed", self.package_data_changed)
 
59
        reactor.call_on("resynchronize-clients", self.resynchronize)
 
60
 
 
61
    def ping(self):
 
62
        """Return C{True}."""
 
63
        return True
 
64
 
 
65
    def register_client(self, name):
 
66
        """Register a broker client called C{name}.
 
67
 
 
68
        Various broker clients interact with the broker server, such as the
 
69
        monitor for example, using the L{BrokerServerProtocol} for performing
 
70
        remote method calls on the L{BrokerServer}.
 
71
 
 
72
        They establish connectivity with the broker by connecting and
 
73
        registering themselves, the L{BrokerServer} will in turn connect
 
74
        to them in order to be able to perform remote method calls like
 
75
        broadcasting events and messages.
 
76
 
 
77
        @param name: The name of the client, such a C{monitor} or C{manager}.
 
78
        """
 
79
        connector_class = self.connectors_registry.get(name)
 
80
        connector = connector_class(self._reactor, self._config)
 
81
 
 
82
        def register(remote_client):
 
83
            self._registered_clients[name] = remote_client
 
84
            self._connectors[remote_client] = connector
 
85
 
 
86
        connected = connector.connect()
 
87
        return connected.addCallback(register)
 
88
 
 
89
    def get_clients(self):
 
90
        """Get L{RemoteClient} instances for registered clients."""
 
91
        return self._registered_clients.values()
 
92
 
 
93
    def get_client(self, name):
 
94
        """Return the client with the given C{name} or C{None}."""
 
95
        return self._registered_clients.get(name)
 
96
 
 
97
    def get_connectors(self):
 
98
        """Get connectors for registered clients.
 
99
 
 
100
        @see L{RemoteLandscapeComponentCreator}.
 
101
        """
 
102
        return self._connectors.values()
 
103
 
 
104
    def get_connector(self, name):
 
105
        """Return the connector for the given C{name} or C{None}."""
 
106
        return self._connectors.get(self.get_client(name))
 
107
 
 
108
    def send_message(self, message, urgent=False):
 
109
        """Queue C{message} for delivery to the server at the next exchange.
 
110
 
 
111
        @param message: The message C{dict} to send to the server.  It must
 
112
            have a C{type} key and be compatible with C{landscape.lib.bpickle}.
 
113
        @param urgent: If C{True}, exchange urgently, otherwise exchange
 
114
            during the next regularly scheduled exchange.
 
115
        @return: The message identifier created when queuing C{message}.
 
116
        """
 
117
        return self._exchanger.send(message, urgent=urgent)
 
118
 
 
119
    def is_message_pending(self, message_id):
 
120
        """Indicate if a message with given C{message_id} is pending."""
 
121
        return self._message_store.is_pending(message_id)
 
122
 
 
123
    def stop_clients(self):
 
124
        """Tell all the clients to exit."""
 
125
        results = []
 
126
        # FIXME: check whether the client are still alive
 
127
        for client in self.get_clients():
 
128
            results.append(client.exit())
 
129
        result = gather_results(results, consume_errors=True)
 
130
        return result.addCallback(lambda ignored: None)
 
131
 
 
132
    def reload_configuration(self):
 
133
        """Reload the configuration file, and stop all clients."""
 
134
        self._config.reload()
 
135
        # Now we'll kill off everything else so that they can be restarted and
 
136
        # notice configuration changes.
 
137
        return self.stop_clients()
 
138
 
 
139
    def register(self):
 
140
        """Attempt to register with the Landscape server.
 
141
 
 
142
        @see: L{RegistrationHandler.register}
 
143
        """
 
144
        return self._registration.register()
 
145
 
 
146
    def get_accepted_message_types(self):
 
147
        """Return the message types accepted by the Landscape server."""
 
148
        return self._message_store.get_accepted_types()
 
149
 
 
150
    def get_server_uuid(self):
 
151
        """Return the uuid of the Landscape server we're pointing at."""
 
152
        # Convert Nones to empty strings.  The Remote will
 
153
        # convert them back to Nones.
 
154
        return self._message_store.get_server_uuid()
 
155
 
 
156
    def register_client_accepted_message_type(self, type):
 
157
        """Register a new message type which can be accepted by this client.
 
158
 
 
159
        @param type: The message type to accept.
 
160
        """
 
161
        self._exchanger.register_client_accepted_message_type(type)
 
162
 
 
163
    def fire_event(self, event_type):
 
164
        """Fire an event in the broker reactor."""
 
165
        self._reactor.fire(event_type)
 
166
 
 
167
    def exit(self):
 
168
        """Request a graceful exit from the broker server.
 
169
 
 
170
        Before this method returns, all broker clients will be notified
 
171
        of the server broker's intention of exiting, so that they have
 
172
        the chance to stop whatever they're doing in a graceful way, and
 
173
        then exit themselves.
 
174
 
 
175
        This method will only return a result when all plugins returned
 
176
        their own results.
 
177
        """
 
178
        # Fire pre-exit before calling any of the plugins, so that everything
 
179
        # in the broker acknowledges that we're about to exit and asking
 
180
        # broker clients to die.  This prevents any exchanges from happening,
 
181
        # for instance.
 
182
        self._reactor.fire("pre-exit")
 
183
 
 
184
        clients_stopped = self.stop_clients()
 
185
 
 
186
        def fire_post_exit(ignored):
 
187
            # Fire it shortly, to give us a chance to send an AMP reply.
 
188
            self._reactor.call_later(
 
189
                1, lambda: self._reactor.fire("post-exit"))
 
190
 
 
191
        return clients_stopped.addBoth(fire_post_exit)
 
192
 
 
193
    @event
 
194
    def resynchronize(self):
 
195
        """Broadcast a C{resynchronize} event to the clients."""
 
196
 
 
197
    @event
 
198
    def impending_exchange(self):
 
199
        """Broadcast an C{impending-exchange} event to the clients."""
 
200
 
 
201
    def listen_events(self, event_types):
 
202
        """
 
203
        Return a C{Deferred} that fires when the first event occurs among the
 
204
        given ones.
 
205
        """
 
206
        deferred = Deferred()
 
207
        calls = []
 
208
 
 
209
        def get_handler(event_type):
 
210
 
 
211
            def handler():
 
212
                for call in calls:
 
213
                    self._reactor.cancel_call(call)
 
214
                deferred.callback(event_type)
 
215
 
 
216
            return handler
 
217
 
 
218
        for event_type in event_types:
 
219
            call = self._reactor.call_on(event_type, get_handler(event_type))
 
220
            calls.append(call)
 
221
        return deferred
 
222
 
 
223
    @event
 
224
    def broker_reconnect(self):
 
225
        """Broadcast a C{broker-reconnect} event to the clients."""
 
226
 
 
227
    @event
 
228
    def server_uuid_changed(self, old_uuid, new_uuid):
 
229
        """Broadcast a C{server-uuid-changed} event to the clients."""
 
230
 
 
231
    @event
 
232
    def message_type_acceptance_changed(self, type, accepted):
 
233
        pass
 
234
 
 
235
    @event
 
236
    def package_data_changed(self):
 
237
        """Fire a package-data-changed event in the reactor of each client."""
 
238
 
 
239
    def broadcast_message(self, message):
 
240
        """Call the C{message} method of all the registered plugins.
 
241
 
 
242
        @see: L{register_plugin}.
 
243
        """
 
244
        results = []
 
245
        for client in self.get_clients():
 
246
            results.append(client.message(message))
 
247
        result = gather_results(results)
 
248
        return result.addCallback(self._message_delivered, message)
 
249
 
 
250
    def _message_delivered(self, results, message):
 
251
        """
 
252
        If the message wasn't handled, and it's an operation request (i.e. it
 
253
        has an operation-id), then respond with a failing operation result
 
254
        indicating as such.
 
255
        """
 
256
        opid = message.get("operation-id")
 
257
        if (True not in results
 
258
            and opid is not None
 
259
            and message["type"] != "resynchronize"):
 
260
            mtype = message["type"]
 
261
            logging.error("Nobody handled the %s message." % (mtype,))
 
262
 
 
263
            result_text = """\
 
264
Landscape client failed to handle this request (%s) because the
 
265
plugin which should handle it isn't available.  This could mean that the
 
266
plugin has been intentionally disabled, or that the client isn't running
 
267
properly, or you may be running an older version of the client that doesn't
 
268
support this feature.
 
269
 
 
270
Please contact the Landscape team for more information.
 
271
""" % (mtype,)
 
272
            response = {
 
273
                "type": "operation-result",
 
274
                "status": FAILED,
 
275
                "result-text": result_text,
 
276
                "operation-id": opid}
 
277
            self._exchanger.send(response, urgent=True)