~ubuntu-branches/ubuntu/maverick/landscape-client/maverick-proposed

« back to all changes in this revision

Viewing changes to landscape/lib/amp.py

  • Committer: Bazaar Package Importer
  • Author(s): Free Ekanayaka
  • Date: 2010-04-21 19:58:10 UTC
  • mfrom: (1.1.16 upstream)
  • Revision ID: james.westby@ubuntu.com-20100421195810-s30uv3s6i27lue38
Tags: 1.5.2-0ubuntu0.10.10.0
* New upstream version (LP: #594594):
  - A new includes information about active network devices and their
    IP address in sysinfo output (LP: #272344).
  - A new plugin collects information about network traffic (#LP :284662).
  - Report information about which packages requested a reboot (LP: #538253).
  - Fix breakage on Lucid AMIs having no ramdisk (LP: #574810).
  - Migrate the inter-process communication system from DBus to Twisted AMP.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
"""Expose the methods of a remote object over AMP."""
 
2
 
 
3
from twisted.internet.defer import Deferred, maybeDeferred
 
4
from twisted.internet.protocol import ReconnectingClientFactory
 
5
try:
 
6
    from twisted.protocols.amp import (
 
7
        Argument, String, Integer, Command, AMP, MAX_VALUE_LENGTH)
 
8
except ImportError:
 
9
    from landscape.lib.twisted_amp import (
 
10
        Argument, String, Integer, Command, AMP, MAX_VALUE_LENGTH)
 
11
    
 
12
from twisted.python.failure import Failure
 
13
 
 
14
from landscape.lib.bpickle import loads, dumps, dumps_table
 
15
 
 
16
 
 
17
class MethodCallArgument(Argument):
 
18
    """A bpickle-compatible argument."""
 
19
 
 
20
    def toString(self, inObject):
 
21
        """Serialize an argument."""
 
22
        return dumps(inObject)
 
23
 
 
24
    def fromString(self, inString):
 
25
        """Unserialize an argument."""
 
26
        return loads(inString)
 
27
 
 
28
    @classmethod
 
29
    def check(cls, inObject):
 
30
        """Check if an argument is serializable."""
 
31
        return type(inObject) in dumps_table
 
32
 
 
33
 
 
34
class MethodCallError(Exception):
 
35
    """Raised when a L{MethodCall} command fails."""
 
36
 
 
37
 
 
38
class MethodCall(Command):
 
39
    """Call a method on the object exposed by a L{MethodCallProtocol}."""
 
40
 
 
41
    arguments = [("sequence", Integer()),
 
42
                 ("method", String()),
 
43
                 ("arguments", String())]
 
44
 
 
45
    response = [("result", MethodCallArgument())]
 
46
 
 
47
    errors = {MethodCallError: "METHOD_CALL_ERROR"}
 
48
 
 
49
 
 
50
class MethodCallChunk(Command):
 
51
    """Send a chunk of L{MethodCall} containing a portion of the arguments.
 
52
 
 
53
    When a the arguments of a L{MethodCall} are bigger than 64k, they get split
 
54
    in several L{MethodCallChunk}s that are buffered on the receiver side.
 
55
    """
 
56
 
 
57
    arguments = [("sequence", Integer()),
 
58
                 ("chunk", String())]
 
59
 
 
60
    response = [("result", Integer())]
 
61
 
 
62
    errors = {MethodCallError: "METHOD_CALL_ERROR"}
 
63
 
 
64
 
 
65
class MethodCallServerProtocol(AMP):
 
66
    """Expose methods of a local object over AMP.
 
67
 
 
68
    The object to be exposed is expected to be the C{object} attribute of our
 
69
    protocol factory.
 
70
 
 
71
    @cvar methods: The list of exposed object's methods that can be called with
 
72
        the protocol. It must be defined by sub-classes.
 
73
    """
 
74
 
 
75
    methods = []
 
76
 
 
77
    def __init__(self):
 
78
        AMP.__init__(self)
 
79
        self._pending_chunks = {}
 
80
 
 
81
    @MethodCall.responder
 
82
    def receive_method_call(self, sequence, method, arguments):
 
83
        """Call an object's method with the given arguments.
 
84
 
 
85
        If a connected client sends a L{MethodCall} for method C{foo_bar}, then
 
86
        the actual method C{foo_bar} of the object associated with the protocol
 
87
        will be called with the given C{args} and C{kwargs} and its return
 
88
        value delivered back to the client as response to the command.
 
89
 
 
90
        @param sequence: The integer that uniquely identifies the L{MethodCall}
 
91
            being received.
 
92
        @param method: The name of the object's method to call.
 
93
        @param arguments: A bpickle'd binary tuple of (args, kwargs) to be
 
94
           passed to the method. In case this L{MethodCall} has been preceded
 
95
           by one or more L{MethodCallChunk}s, C{arguments} is the last chunk
 
96
           of data.
 
97
        """
 
98
        chunks = self._pending_chunks.pop(sequence, None)
 
99
        if chunks is not None:
 
100
            # We got some L{MethodCallChunk}s before, this is the last.
 
101
            chunks.append(arguments)
 
102
            arguments = "".join(chunks)
 
103
 
 
104
        args, kwargs = loads(arguments)
 
105
 
 
106
        if not method in self.methods:
 
107
            raise MethodCallError("Forbidden method '%s'" % method)
 
108
 
 
109
        method_func = getattr(self.factory.object, method)
 
110
 
 
111
        def handle_result(result):
 
112
            return {"result": self._check_result(result)}
 
113
 
 
114
        def handle_failure(failure):
 
115
            raise MethodCallError(failure.value)
 
116
 
 
117
        deferred = maybeDeferred(method_func, *args, **kwargs)
 
118
        deferred.addCallback(handle_result)
 
119
        deferred.addErrback(handle_failure)
 
120
        return deferred
 
121
 
 
122
    @MethodCallChunk.responder
 
123
    def receive_method_call_chunk(self, sequence, chunk):
 
124
        """Receive a part of a multi-chunk L{MethodCall}.
 
125
 
 
126
        Add the received C{chunk} to the buffer of the L{MethodCall} identified
 
127
        by C{sequence}.
 
128
        """
 
129
        self._pending_chunks.setdefault(sequence, []).append(chunk)
 
130
        return {"result": sequence}
 
131
 
 
132
    def _check_result(self, result):
 
133
        """Check that the C{result} we're about to return is serializable.
 
134
 
 
135
        @return: The C{result} itself if valid.
 
136
        @raises: L{MethodCallError} if C{result} is not serializable.
 
137
        """
 
138
        if not MethodCallArgument.check(result):
 
139
            raise MethodCallError("Non-serializable result")
 
140
        return result
 
141
 
 
142
 
 
143
class MethodCallClientProtocol(AMP):
 
144
    """Calls methods of a remote object over L{AMP}.
 
145
 
 
146
    @note: If the remote method returns a deferred, the associated local
 
147
        deferred returned by L{send_method_call} will result in the same
 
148
        callback value of the remote deferred.
 
149
    @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
 
150
        response for the deferred is not received within this amount of
 
151
        seconds, the remote method call will errback with a L{MethodCallError}.
 
152
    """
 
153
    timeout = 60
 
154
    chunk_size = MAX_VALUE_LENGTH
 
155
 
 
156
    def __init__(self):
 
157
        AMP.__init__(self)
 
158
        self._pending_responses = []
 
159
        self._sequence = 0
 
160
 
 
161
    def _create_sequence(self):
 
162
        """Return a unique sequence number for a L{MethodCall}."""
 
163
        self._sequence += 1
 
164
        return self._sequence
 
165
 
 
166
    def _call_remote_with_timeout(self, command, **kwargs):
 
167
        """Send an L{AMP} command that will errback in case of a timeout.
 
168
 
 
169
        @return: A deferred resulting in the command's response (or failure) if
 
170
            the peer responds within L{MethodClientProtocol.timeout} seconds,
 
171
            or that errbacks with a L{MethodCallError} otherwise.
 
172
        """
 
173
        deferred = Deferred()
 
174
 
 
175
        def handle_response(response):
 
176
            if call.called:
 
177
                # Late response for a request that has timeout,
 
178
                # just ignore it.
 
179
                return
 
180
            call.cancel()
 
181
            deferred.callback(response)
 
182
 
 
183
        def handle_timeout():
 
184
            # The peer didn't respond on time, raise an error.
 
185
            deferred.errback(MethodCallError("timeout"))
 
186
 
 
187
        call = self.factory.reactor.callLater(self.timeout, handle_timeout)
 
188
 
 
189
        result = self.callRemote(command, **kwargs)
 
190
        result.addBoth(handle_response)
 
191
        return deferred
 
192
 
 
193
    def send_method_call(self, method, args=[], kwargs={}):
 
194
        """Send a L{MethodCall} command with the given arguments.
 
195
 
 
196
        @param method: The name of the remote method to invoke.
 
197
        @param args: The positional arguments to pass to the remote method.
 
198
        @param kwargs: The keyword arguments to pass to the remote method.
 
199
        """
 
200
        arguments = dumps((args, kwargs))
 
201
        sequence = self._create_sequence()
 
202
 
 
203
        # Split the given arguments in one or more chunks
 
204
        chunks = [arguments[i:i + self.chunk_size]
 
205
                  for i in xrange(0, len(arguments), self.chunk_size)]
 
206
 
 
207
        result = Deferred()
 
208
        if len(chunks) > 1:
 
209
            # If we have N chunks, send the first N-1 as MethodCallChunk's
 
210
            for chunk in chunks[:-1]:
 
211
 
 
212
                def create_send_chunk(sequence, chunk):
 
213
                    send_chunk = lambda x: self.callRemote(
 
214
                        MethodCallChunk, sequence=sequence, chunk=chunk)
 
215
                    return send_chunk
 
216
 
 
217
                result.addCallback(create_send_chunk(sequence, chunk))
 
218
 
 
219
        def send_last_chunk(ignored):
 
220
            chunk = chunks[-1]
 
221
            return self._call_remote_with_timeout(
 
222
                MethodCall, sequence=sequence, method=method, arguments=chunk)
 
223
 
 
224
        result.addCallback(send_last_chunk)
 
225
        result.callback(None)
 
226
        return result
 
227
 
 
228
 
 
229
class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol):
 
230
    """Can be used both for sending and receiving L{MethodCall}s."""
 
231
 
 
232
    def __init__(self):
 
233
        MethodCallServerProtocol.__init__(self)
 
234
        MethodCallClientProtocol.__init__(self)
 
235
 
 
236
 
 
237
class MethodCallFactory(ReconnectingClientFactory):
 
238
    """
 
239
    Factory for L{MethodCallProtocol}s exposing an object or connecting to
 
240
    to L{MethodCall} servers.
 
241
 
 
242
    When used to connect, if the connection fails or is lost the factory
 
243
    will keep retrying to establish it.
 
244
 
 
245
    @cvar protocol: The factory used to build protocol instances.
 
246
    @cvar factor: The time factor by which the delay between two subsequent
 
247
        connection retries will increase.
 
248
    @cvar maxDelay: Maximum number of seconds between connection attempts.
 
249
    """
 
250
 
 
251
    protocol = MethodCallProtocol
 
252
    factor = 1.6180339887498948
 
253
    maxDelay = 30
 
254
 
 
255
    def __init__(self, object=None, reactor=None):
 
256
        """
 
257
        @param object: The object exposed by the L{MethodCallProtocol}s
 
258
            instances created by this factory.
 
259
        @param reactor: The reactor used by the created protocols
 
260
            to schedule notifications and timeouts.
 
261
        """
 
262
        self.object = object
 
263
        self.reactor = reactor
 
264
        self.clock = self.reactor
 
265
        self.delay = self.initialDelay
 
266
        self._notifiers = []
 
267
 
 
268
    def add_notifier(self, callback, errback=None):
 
269
        """Call the given function on connection, reconnection or give up.
 
270
 
 
271
        @param notifier: A function that will be called when the factory builds
 
272
            a new connected protocol or gives up connecting.  It will be passed
 
273
            the new protocol instance as argument, or the connectionf failure.
 
274
        """
 
275
        self._notifiers.append((callback, errback))
 
276
 
 
277
    def remove_notifier(self, callback, errback=None):
 
278
        """Remove a notifier."""
 
279
        self._notifiers.remove((callback, errback))
 
280
 
 
281
    def notify_success(self, *args, **kwargs):
 
282
        """Notify all registered notifier callbacks."""
 
283
        for callback, _ in self._notifiers:
 
284
            self.reactor.callLater(0, callback, *args, **kwargs)
 
285
 
 
286
    def notify_failure(self, failure):
 
287
        """Notify all registered notifier errbacks."""
 
288
        for _, errback in self._notifiers:
 
289
            if errback is not None:
 
290
                self.reactor.callLater(0, errback, failure)
 
291
 
 
292
    def clientConnectionFailed(self, connector, reason):
 
293
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
 
294
                                                         reason)
 
295
        if self.maxRetries is not None and (self.retries > self.maxRetries):
 
296
            self.notify_failure(reason) # Give up
 
297
 
 
298
    def buildProtocol(self, addr):
 
299
        self.resetDelay()
 
300
        protocol = self.protocol()
 
301
        protocol.factory = self
 
302
        self.notify_success(protocol)
 
303
        return protocol
 
304
 
 
305
 
 
306
class RemoteObject(object):
 
307
    """An object able to transparently call methods on a remote object.
 
308
 
 
309
    Any method call on a L{RemoteObject} instance will return a L{Deferred}
 
310
    resulting in the return value of the same method call performed on
 
311
    the remote object exposed by the peer.
 
312
    """
 
313
 
 
314
    def __init__(self, protocol, retry_on_reconnect=False, timeout=None):
 
315
        """
 
316
        @param protocol: A reference to a connected L{AMP} protocol instance,
 
317
            which will be used to send L{MethodCall} commands.
 
318
        @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry
 
319
            to perform again requests that failed due to a lost connection, as
 
320
            soon as a new connection is available.
 
321
        @param timeout: A timeout for failed requests, if the L{RemoteObject}
 
322
            can't perform them again successfully within this number of
 
323
            seconds, they will errback with a L{MethodCallError}.
 
324
        """
 
325
        self._protocol = protocol
 
326
        self._factory = protocol.factory
 
327
        self._reactor = protocol.factory.reactor
 
328
        self._retry_on_reconnect = retry_on_reconnect
 
329
        self._timeout = timeout
 
330
        self._pending_requests = {}
 
331
        self._factory.add_notifier(self._handle_reconnect)
 
332
 
 
333
    def __getattr__(self, method):
 
334
        """Return a function sending a L{MethodCall} for the given C{method}.
 
335
 
 
336
        When the created function is called, it sends the an appropriate
 
337
        L{MethodCall} to the remote peer passing it the arguments and
 
338
        keyword arguments it was called with, and returning a L{Deferred}
 
339
        resulting in the L{MethodCall}'s response value.
 
340
        """
 
341
 
 
342
        def send_method_call(*args, **kwargs):
 
343
            result = self._protocol.send_method_call(method=method,
 
344
                                                     args=args,
 
345
                                                     kwargs=kwargs)
 
346
            deferred = Deferred()
 
347
            result.addCallback(self._handle_response, deferred)
 
348
            result.addErrback(self._handle_failure, method, args, kwargs,
 
349
                              deferred)
 
350
            return deferred
 
351
 
 
352
        return send_method_call
 
353
 
 
354
    def _handle_response(self, response, deferred, call=None):
 
355
        """Handles a successful L{MethodCall} response.
 
356
 
 
357
        @param response: The L{MethodCall} response.
 
358
        @param deferred: The deferred that was returned to the caller.
 
359
        @param call: If not C{None}, the scheduled timeout call associated with
 
360
            the given deferred.
 
361
        """
 
362
        result = response["result"]
 
363
        if call is not None:
 
364
            call.cancel() # This is a successful retry, cancel the timeout.
 
365
        deferred.callback(result)
 
366
 
 
367
    def _handle_failure(self, failure, method, args, kwargs, deferred,
 
368
                        call=None):
 
369
        """Called when a L{MethodCall} command fails.
 
370
 
 
371
        If a failure is due to a connection error and if C{retry_on_reconnect}
 
372
        is C{True}, we will try to perform the requested L{MethodCall} again
 
373
        as soon as a new connection becomes available, giving up after the
 
374
        specified C{timeout}, if any.
 
375
 
 
376
        @param failure: The L{Failure} raised by the requested L{MethodCall}.
 
377
        @param name: The method name associated with the failed L{MethodCall}.
 
378
        @param args: The positional arguments of the failed L{MethodCall}.
 
379
        @param kwargs: The keyword arguments of the failed L{MethodCall}.
 
380
        @param deferred: The deferred that was returned to the caller.
 
381
        @param call: If not C{None}, the scheduled timeout call associated with
 
382
            the given deferred.
 
383
        """
 
384
        is_method_call_error = failure.type is MethodCallError
 
385
        dont_retry = self._retry_on_reconnect == False
 
386
 
 
387
        if is_method_call_error or dont_retry:
 
388
            # This means either that the connection is working, and a
 
389
            # MethodCall protocol error occured, or that we gave up
 
390
            # trying and raised a timeout. In any case just propagate
 
391
            # the error.
 
392
            if deferred in self._pending_requests:
 
393
                self._pending_requests.pop(deferred)
 
394
            if call:
 
395
                call.cancel()
 
396
            deferred.errback(failure)
 
397
            return
 
398
 
 
399
        if self._timeout and call is None:
 
400
            # This is the first failure for this request, let's schedule a
 
401
            # timeout call.
 
402
            timeout = Failure(MethodCallError("timeout"))
 
403
            call = self._reactor.callLater(self._timeout,
 
404
                                           self._handle_failure,
 
405
                                           timeout, method, args,
 
406
                                           kwargs, deferred=deferred)
 
407
 
 
408
        self._pending_requests[deferred] = (method, args, kwargs, call)
 
409
 
 
410
    def _handle_reconnect(self, protocol):
 
411
        """Handles a reconnection.
 
412
 
 
413
        @param protocol: The newly connected protocol instance.
 
414
        """
 
415
        self._protocol = protocol
 
416
        if self._retry_on_reconnect:
 
417
            self._retry()
 
418
 
 
419
    def _retry(self):
 
420
        """Try to perform again requests that failed."""
 
421
 
 
422
        # We need to copy the requests list before iterating over it, because
 
423
        # if we are actually still disconnected, callRemote will return a
 
424
        # failed deferred and the _handle_failure errback will be executed
 
425
        # synchronously during the loop, modifing the requests list itself.
 
426
        requests = self._pending_requests.copy()
 
427
        self._pending_requests.clear()
 
428
 
 
429
        while requests:
 
430
            deferred, (method, args, kwargs, call) = requests.popitem()
 
431
            result = self._protocol.send_method_call(method, args, kwargs)
 
432
            result.addCallback(self._handle_response,
 
433
                               deferred=deferred, call=call)
 
434
            result.addErrback(self._handle_failure, method, args, kwargs,
 
435
                              deferred=deferred, call=call)
 
436
 
 
437
 
 
438
class RemoteObjectConnector(object):
 
439
    """Connect to remote objects exposed by a L{MethodCallProtocol}."""
 
440
 
 
441
    factory = MethodCallFactory
 
442
    remote = RemoteObject
 
443
 
 
444
    def __init__(self, reactor, socket_path, *args, **kwargs):
 
445
        """
 
446
        @param reactor: A reactor able to connect to Unix sockets.
 
447
        @param socket: The path to the socket we want to connect to.
 
448
        @param args: Arguments to be passed to the created L{RemoteObject}.
 
449
        @param kwargs: Keyword arguments for the created L{RemoteObject}.
 
450
        """
 
451
        self._socket_path = socket_path
 
452
        self._reactor = reactor
 
453
        self._args = args
 
454
        self._kwargs = kwargs
 
455
        self._remote = None
 
456
        self._factory = None
 
457
 
 
458
    def connect(self, max_retries=None, factor=None):
 
459
        """Connect to a remote object exposed by a L{MethodCallProtocol}.
 
460
 
 
461
        This method will connect to the socket provided in the constructor
 
462
        and return a L{Deferred} resulting in a connected L{RemoteObject}.
 
463
 
 
464
        @param max_retries: If not C{None} give up try to connect after this
 
465
            amount of times, otherwise keep trying to connect forever.
 
466
        @param factor: Optionally a float indicating by which factor the
 
467
            delay between subsequent retries should increase. Smaller values
 
468
            result in a faster reconnection attempts pace.
 
469
        """
 
470
        self._connected = Deferred()
 
471
        self._factory = self.factory(reactor=self._reactor)
 
472
        self._factory.maxRetries = max_retries
 
473
        if factor:
 
474
            self._factory.factor = factor
 
475
        self._factory.add_notifier(self._success, self._failure)
 
476
        self._reactor.connectUNIX(self._socket_path, self._factory)
 
477
        return self._connected
 
478
 
 
479
    def _success(self, result):
 
480
        """Called when the first connection has been established"""
 
481
 
 
482
        # We did our job, remove our own notifier and let the remote object
 
483
        # handle reconnections.
 
484
        self._factory.remove_notifier(self._success, self._failure)
 
485
        self._remote = self.remote(result, *self._args, **self._kwargs)
 
486
        self._connected.callback(self._remote)
 
487
 
 
488
    def _failure(self, failure):
 
489
        """Called when the first connection has failed"""
 
490
        self._connected.errback(failure)
 
491
 
 
492
    def disconnect(self):
 
493
        """Disconnect the L{RemoteObject} that we have created."""
 
494
        if self._factory:
 
495
            self._factory.stopTrying()
 
496
        if self._remote:
 
497
            if self._remote._protocol.transport:
 
498
                self._remote._protocol.transport.loseConnection()
 
499
            self._remote = None