~free.ekanayaka/landscape-client/config-failure

« back to all changes in this revision

Viewing changes to landscape/broker/server.py

  • Committer: Free Ekanayaka
  • Date: 2010-04-28 13:38:56 UTC
  • mfrom: (182.1.30 amp-trunk)
  • Revision ID: free.ekanayaka@canonical.com-20100428133856-wo697ypl3e96cd6i
Merge amp-trunk [trivial]

Migrate our inter-process communication system from D-Bus to a Twisted AMP-based
protocol.

The change has been splitted into serveral branches that have been reviewed
separately in the LP bugs 570763, 570192, 568983, 568499, 567152, 564587,
564620, 562330, 561471, 559312, 558455, 556603, 552414, 546825, 544070, 539658,
537538, 535227, 499176, 531480, 499121, 514250, 499225 and 499018.

For sake of convenience here follows a summary of the changes introduced by
each branch:

  - Cleanup for having the base classes for monitor and manager plugins
    in landscape.monitor.plugin and landscape.manager.plugin.
  
  - Install the dbus bpickle extensions when starting a LandscapeService, letting
    plugins like the HAL/hardware one send dbus data types over AMP.
  
  - Make the monitor and manager test cases inherit from LandscapeTest instead of
    LandscapeIsolatedTest, as they don't need DBus anymore.
  
  - Make the landscape-config script use the AMP-based protocol to communicate
    with the broker. 

 -  Change the MethodCall protocol to split arguments in chunks of
    64k and transparently send them over several AMP commands.

 -  Make all task handlers derived from PackageTaskHandlers use the AMP-based
    protocol instead of D-Bus to communicate with the broker.
  
  - Port the behavior introduced in Bug #542215 to the AMP-based
    BrokerServer, which now broadcasts package-data-changed events.
  
  - Make the BrokerServiceHelper provide a 'live' RemoteBroker instead
    of a FakeRemoteBroker. The former BrokerServiceHelper has been renamed
    to FakeBrokerServiceHelper.
  
  - Migrate the watchdog to AMP, replacing the existing DBus-based
    communication mechanism with the AMP-based one.

  - Change is in the shutdown logic in the watchdog to be fully asynchronous,
    while before was relying on DBus being synchronous.
  
  - Add a 'factor' parameter to the RemoteComponentConnector.connect
    mehtod, for setting the pace at which service will try to reconnect.
  
  - Make services pass "wantPID" to the reactor.listenUNIX method, which
    cleans up left-over unix sockets on the filesystem (e.g. the former
    process died).
  
  - Change the landscape-{broker,monitor,manager} scripts to use the AMP-based versions
    of the associated services.
  
  - Move the BrokerConfiguration from landscape.broker.deployment to landscape.broker.config,
    for consistency with the monitor and the manager equivalent classes.

  - Replace the DBus-based MonitorHelper and ManagerHelper with their
    AMP-based equivalent.
  
  - Make the BrokerServer subscribe to the exchanger events, like "message",
   "impending-exchange", "exchange-failed", "registration-done", etc. and
    notifiy all connected clients about them.
        
  - Convert the usermanager and usermonitor communication mechanism from
    D-Bus to our AMP-based protocol.
  
  - Add a ManagerService class that takes care of starting the manager
    and make it listen to the correct Unix socket for incoming AMP
    connections.
 
  - Add a MonitorService class that takes care of starting the monitor
    and make it listen to the correct Unix socket for incoming AMP
    connections.

  - Add a BrokerService class that starts listening for incoming AMP connections
    on a Unix socket.
  
  - Add a BrokerClientProtocol for exposing the methods of a broker client
    to the broker server.

  - Decouple the broker client API logic from DBus, adding a new BrokerClient
    class implementing the behavior the broker expects for its clients.

  - Add a BrokerServerProtocol class that can be used by the broker
    clients (monitor and manager) to perform remote method calls
    on the BrokerServer.

  - Add a MethodCall-based protocol for basic communication between the
    various Landscape components.
  
  - Add a BrokerServer class implementing the same methods and
    interfaces of BrokerDBusObject, but not publishing them on
    DBus. The methods of this class will be exposed to the broker
    clients (the monitor and the manager) by an AMP-based protocol.

  - Add a reconnection mechanism to the MethodCall protocol makes it:
  
    * Keep trying to connect if the very fist attempt failed (for instance the
      remote process we're trying to contact is not yet ready to accept
      connections).
  
    * Try to reconnect if the connection drops (for instance the remote
      process died and the watchdog started it again).
  
    * Try to re-perform a failed MethodCall request as soon as the connection
      is up again, in case it failed because it was issued during a connection 
      blackout (for instance a monitor plugin tries to call a 
      RemoteBroker.send_message while the broker is restarting).
  
  - Make the MethodCall protocol able to cope with remote methods returning
    deferreds.
  
  - Add an AMP-based protocol to easily expose objects over AMP and perform
    remote method calls on them.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
import logging
 
2
 
 
3
from twisted.internet.defer import Deferred
 
4
 
 
5
from landscape.lib.twisted_util import gather_results
 
6
from landscape.amp import RemoteComponentsRegistry
 
7
from landscape.manager.manager import FAILED
 
8
 
 
9
 
 
10
def event(method):
 
11
    """Turns a L{BrokerServer} method into an event broadcaster.
 
12
 
 
13
    When the decorated method is called, an event is fired on all connected
 
14
    clients. The event will have the same name as the method being called,
 
15
    except that any underscore in the method name will be replaced with a dash.
 
16
    """
 
17
    event_type = method.__name__.replace("_", "-")
 
18
 
 
19
    def broadcast_event(self, *args, **kwargs):
 
20
        fired = []
 
21
        for client in self.get_clients():
 
22
            fired.append(client.fire_event(event_type, *args, **kwargs))
 
23
        return gather_results(fired)
 
24
 
 
25
    return broadcast_event
 
26
 
 
27
 
 
28
class BrokerServer(object):
 
29
    """
 
30
    A broker server capable of handling messages from plugins connected using
 
31
    the L{BrokerProtocol}.
 
32
    """
 
33
    name = "broker"
 
34
    connectors_registry = RemoteComponentsRegistry
 
35
 
 
36
    def __init__(self, config, reactor, exchange, registration,
 
37
                 message_store):
 
38
        """
 
39
        @param config: The L{BrokerConfiguration} used by the broker.
 
40
        @param reactor: The L{TwistedReactor} driving the broker's events.
 
41
        @param exchange: The L{MessageExchange} to send messages with.
 
42
        @param registration: The {RegistrationHandler}.
 
43
        @param message_store: The broker's L{MessageStore}.
 
44
        """
 
45
        self._config = config
 
46
        self._reactor = reactor
 
47
        self._exchanger = exchange
 
48
        self._registration = registration
 
49
        self._message_store = message_store
 
50
        self._registered_clients = {}
 
51
        self._connectors = {}
 
52
 
 
53
        reactor.call_on("message", self.broadcast_message)
 
54
        reactor.call_on("impending-exchange", self.impending_exchange)
 
55
        reactor.call_on("exchange-failed", self.exchange_failed)
 
56
        reactor.call_on("registration-done", self.registration_done)
 
57
        reactor.call_on("registration-failed", self.registration_failed)
 
58
        reactor.call_on("message-type-acceptance-changed",
 
59
                        self.message_type_acceptance_changed)
 
60
        reactor.call_on("server-uuid-changed", self.server_uuid_changed)
 
61
        reactor.call_on("package-data-changed", self.package_data_changed)
 
62
        reactor.call_on("resynchronize-clients", self.resynchronize)
 
63
 
 
64
    def ping(self):
 
65
        """Return C{True}."""
 
66
        return True
 
67
 
 
68
    def register_client(self, name):
 
69
        """Register a broker client called C{name}.
 
70
 
 
71
        Various broker clients interact with the broker server, such as the
 
72
        monitor for example, using the L{BrokerServerProtocol} for performing
 
73
        remote method calls on the L{BrokerServer}.
 
74
 
 
75
        They establish connectivity with the broker by connecting and
 
76
        registering themselves, the L{BrokerServer} will in turn connect
 
77
        to them in order to be able to perform remote method calls like
 
78
        broadcasting events and messages.
 
79
 
 
80
        @param name: The name of the client, such a C{monitor} or C{manager}.
 
81
        """
 
82
        connector_class = self.connectors_registry.get(name)
 
83
        connector = connector_class(self._reactor, self._config)
 
84
 
 
85
        def register(remote_client):
 
86
            self._registered_clients[name] = remote_client
 
87
            self._connectors[remote_client] = connector
 
88
 
 
89
        connected = connector.connect()
 
90
        return connected.addCallback(register)
 
91
 
 
92
    def get_clients(self):
 
93
        """Get L{RemoteClient} instances for registered clients."""
 
94
        return self._registered_clients.values()
 
95
 
 
96
    def get_client(self, name):
 
97
        """Return the client with the given C{name} or C{None}."""
 
98
        return self._registered_clients.get(name)
 
99
 
 
100
    def get_connectors(self):
 
101
        """Get connectors for registered clients.
 
102
 
 
103
        @see L{RemoteLandscapeComponentCreator}.
 
104
        """
 
105
        return self._connectors.values()
 
106
 
 
107
    def get_connector(self, name):
 
108
        """Return the connector for the given C{name} or C{None}."""
 
109
        return self._connectors.get(self.get_client(name))
 
110
 
 
111
    def send_message(self, message, urgent=False):
 
112
        """Queue C{message} for delivery to the server at the next exchange.
 
113
 
 
114
        @param message: The message C{dict} to send to the server.  It must
 
115
            have a C{type} key and be compatible with C{landscape.lib.bpickle}.
 
116
        @param urgent: If C{True}, exchange urgently, otherwise exchange
 
117
            during the next regularly scheduled exchange.
 
118
        @return: The message identifier created when queuing C{message}.
 
119
        """
 
120
        return self._exchanger.send(message, urgent=urgent)
 
121
 
 
122
    def is_message_pending(self, message_id):
 
123
        """Indicate if a message with given C{message_id} is pending."""
 
124
        return self._message_store.is_pending(message_id)
 
125
 
 
126
    def stop_clients(self):
 
127
        """Tell all the clients to exit."""
 
128
        results = []
 
129
        # FIXME: check whether the client are still alive
 
130
        for client in self.get_clients():
 
131
            results.append(client.exit())
 
132
        result = gather_results(results, consume_errors=True)
 
133
        return result.addCallback(lambda ignored: None)
 
134
 
 
135
    def reload_configuration(self):
 
136
        """Reload the configuration file, and stop all clients."""
 
137
        self._config.reload()
 
138
        # Now we'll kill off everything else so that they can be restarted and
 
139
        # notice configuration changes.
 
140
        return self.stop_clients()
 
141
 
 
142
    def register(self):
 
143
        """Attempt to register with the Landscape server.
 
144
 
 
145
        @see: L{RegistrationHandler.register}
 
146
        """
 
147
        return self._registration.register()
 
148
 
 
149
    def get_accepted_message_types(self):
 
150
        """Return the message types accepted by the Landscape server."""
 
151
        return self._message_store.get_accepted_types()
 
152
 
 
153
    def get_server_uuid(self):
 
154
        """Return the uuid of the Landscape server we're pointing at."""
 
155
        # Convert Nones to empty strings.  The Remote will
 
156
        # convert them back to Nones.
 
157
        return self._message_store.get_server_uuid()
 
158
 
 
159
    def register_client_accepted_message_type(self, type):
 
160
        """Register a new message type which can be accepted by this client.
 
161
 
 
162
        @param type: The message type to accept.
 
163
        """
 
164
        self._exchanger.register_client_accepted_message_type(type)
 
165
 
 
166
    def fire_event(self, event_type):
 
167
        """Fire an event in the broker reactor."""
 
168
        self._reactor.fire(event_type)
 
169
 
 
170
    def exit(self):
 
171
        """Request a graceful exit from the broker server.
 
172
 
 
173
        Before this method returns, all broker clients will be notified
 
174
        of the server broker's intention of exiting, so that they have
 
175
        the chance to stop whatever they're doing in a graceful way, and
 
176
        then exit themselves.
 
177
 
 
178
        This method will only return a result when all plugins returned
 
179
        their own results.
 
180
        """
 
181
        # Fire pre-exit before calling any of the plugins, so that everything
 
182
        # in the broker acknowledges that we're about to exit and asking
 
183
        # broker clients to die.  This prevents any exchanges from happening,
 
184
        # for instance.
 
185
        self._reactor.fire("pre-exit")
 
186
 
 
187
        clients_stopped = self.stop_clients()
 
188
 
 
189
        def fire_post_exit(ignored):
 
190
            # Fire it shortly, to give us a chance to send an AMP reply.
 
191
            self._reactor.call_later(
 
192
                1, lambda: self._reactor.fire("post-exit"))
 
193
 
 
194
        return clients_stopped.addBoth(fire_post_exit)
 
195
 
 
196
    @event
 
197
    def resynchronize(self):
 
198
        """Broadcast a C{resynchronize} event to the clients."""
 
199
 
 
200
    @event
 
201
    def impending_exchange(self):
 
202
        """Broadcast an C{impending-exchange} event to the clients."""
 
203
 
 
204
    @event
 
205
    def exchange_failed(self):
 
206
        """Broadcast a C{exchange-failed} event to the clients."""
 
207
 
 
208
    @event
 
209
    def registration_done(self):
 
210
        """Broadcast a C{registration-done} event to the clients."""
 
211
 
 
212
    @event
 
213
    def registration_failed(self):
 
214
        """Broadcast a C{registration-failed} event to the clients."""
 
215
 
 
216
    def listen_events(self, event_types):
 
217
        """
 
218
        Return a C{Deferred} that fires when the first event occurs among the
 
219
        given ones.
 
220
        """
 
221
        deferred = Deferred()
 
222
        calls = []
 
223
 
 
224
        def get_handler(event_type):
 
225
 
 
226
            def handler():
 
227
                for call in calls:
 
228
                    self._reactor.cancel_call(call)
 
229
                deferred.callback(event_type)
 
230
 
 
231
            return handler
 
232
 
 
233
        for event_type in event_types:
 
234
            call = self._reactor.call_on(event_type, get_handler(event_type))
 
235
            calls.append(call)
 
236
        return deferred
 
237
 
 
238
    @event
 
239
    def broker_reconnect(self):
 
240
        """Broadcast a C{broker-reconnect} event to the clients."""
 
241
 
 
242
    @event
 
243
    def server_uuid_changed(self, old_uuid, new_uuid):
 
244
        """Broadcast a C{server-uuid-changed} event to the clients."""
 
245
 
 
246
    @event
 
247
    def message_type_acceptance_changed(self, type, accepted):
 
248
        pass
 
249
 
 
250
    @event
 
251
    def package_data_changed(self):
 
252
        """Fire a package-data-changed event in the reactor of each client."""
 
253
 
 
254
    def broadcast_message(self, message):
 
255
        """Call the C{message} method of all the registered plugins.
 
256
 
 
257
        @see: L{register_plugin}.
 
258
        """
 
259
        results = []
 
260
        for client in self.get_clients():
 
261
            results.append(client.message(message))
 
262
        result = gather_results(results)
 
263
        return result.addCallback(self._message_delivered, message)
 
264
 
 
265
    def _message_delivered(self, results, message):
 
266
        """
 
267
        If the message wasn't handled, and it's an operation request (i.e. it
 
268
        has an operation-id), then respond with a failing operation result
 
269
        indicating as such.
 
270
        """
 
271
        opid = message.get("operation-id")
 
272
        if (True not in results
 
273
            and opid is not None
 
274
            and message["type"] != "resynchronize"):
 
275
            mtype = message["type"]
 
276
            logging.error("Nobody handled the %s message." % (mtype,))
 
277
 
 
278
            result_text = """\
 
279
Landscape client failed to handle this request (%s) because the
 
280
plugin which should handle it isn't available.  This could mean that the
 
281
plugin has been intentionally disabled, or that the client isn't running
 
282
properly, or you may be running an older version of the client that doesn't
 
283
support this feature.
 
284
 
 
285
Please contact the Landscape team for more information.
 
286
""" % (mtype,)
 
287
            response = {
 
288
                "type": "operation-result",
 
289
                "status": FAILED,
 
290
                "result-text": result_text,
 
291
                "operation-id": opid}
 
292
            self._exchanger.send(response, urgent=True)