1
"""Expose the methods of a remote object over AMP."""
3
from twisted.internet.defer import Deferred, maybeDeferred
4
from twisted.internet.protocol import ReconnectingClientFactory
6
from twisted.protocols.amp import (
7
Argument, String, Integer, Command, AMP, MAX_VALUE_LENGTH)
9
from landscape.lib.twisted_amp import (
10
Argument, String, Integer, Command, AMP, MAX_VALUE_LENGTH)
12
from twisted.python.failure import Failure
14
from landscape.lib.bpickle import loads, dumps, dumps_table
17
class MethodCallArgument(Argument):
18
"""A bpickle-compatible argument."""
20
def toString(self, inObject):
21
"""Serialize an argument."""
22
return dumps(inObject)
24
def fromString(self, inString):
25
"""Unserialize an argument."""
26
return loads(inString)
29
def check(cls, inObject):
30
"""Check if an argument is serializable."""
31
return type(inObject) in dumps_table
34
class MethodCallError(Exception):
35
"""Raised when a L{MethodCall} command fails."""
38
class MethodCall(Command):
39
"""Call a method on the object exposed by a L{MethodCallProtocol}."""
41
arguments = [("sequence", Integer()),
43
("arguments", String())]
45
response = [("result", MethodCallArgument())]
47
errors = {MethodCallError: "METHOD_CALL_ERROR"}
50
class MethodCallChunk(Command):
51
"""Send a chunk of L{MethodCall} containing a portion of the arguments.
53
When a the arguments of a L{MethodCall} are bigger than 64k, they get split
54
in several L{MethodCallChunk}s that are buffered on the receiver side.
57
arguments = [("sequence", Integer()),
60
response = [("result", Integer())]
62
errors = {MethodCallError: "METHOD_CALL_ERROR"}
65
class MethodCallServerProtocol(AMP):
66
"""Expose methods of a local object over AMP.
68
The object to be exposed is expected to be the C{object} attribute of our
71
@cvar methods: The list of exposed object's methods that can be called with
72
the protocol. It must be defined by sub-classes.
79
self._pending_chunks = {}
82
def receive_method_call(self, sequence, method, arguments):
83
"""Call an object's method with the given arguments.
85
If a connected client sends a L{MethodCall} for method C{foo_bar}, then
86
the actual method C{foo_bar} of the object associated with the protocol
87
will be called with the given C{args} and C{kwargs} and its return
88
value delivered back to the client as response to the command.
90
@param sequence: The integer that uniquely identifies the L{MethodCall}
92
@param method: The name of the object's method to call.
93
@param arguments: A bpickle'd binary tuple of (args, kwargs) to be
94
passed to the method. In case this L{MethodCall} has been preceded
95
by one or more L{MethodCallChunk}s, C{arguments} is the last chunk
98
chunks = self._pending_chunks.pop(sequence, None)
99
if chunks is not None:
100
# We got some L{MethodCallChunk}s before, this is the last.
101
chunks.append(arguments)
102
arguments = "".join(chunks)
104
args, kwargs = loads(arguments)
106
if not method in self.methods:
107
raise MethodCallError("Forbidden method '%s'" % method)
109
method_func = getattr(self.factory.object, method)
111
def handle_result(result):
112
return {"result": self._check_result(result)}
114
def handle_failure(failure):
115
raise MethodCallError(failure.value)
117
deferred = maybeDeferred(method_func, *args, **kwargs)
118
deferred.addCallback(handle_result)
119
deferred.addErrback(handle_failure)
122
@MethodCallChunk.responder
123
def receive_method_call_chunk(self, sequence, chunk):
124
"""Receive a part of a multi-chunk L{MethodCall}.
126
Add the received C{chunk} to the buffer of the L{MethodCall} identified
129
self._pending_chunks.setdefault(sequence, []).append(chunk)
130
return {"result": sequence}
132
def _check_result(self, result):
133
"""Check that the C{result} we're about to return is serializable.
135
@return: The C{result} itself if valid.
136
@raises: L{MethodCallError} if C{result} is not serializable.
138
if not MethodCallArgument.check(result):
139
raise MethodCallError("Non-serializable result")
143
class MethodCallClientProtocol(AMP):
144
"""Calls methods of a remote object over L{AMP}.
146
@note: If the remote method returns a deferred, the associated local
147
deferred returned by L{send_method_call} will result in the same
148
callback value of the remote deferred.
149
@cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
150
response for the deferred is not received within this amount of
151
seconds, the remote method call will errback with a L{MethodCallError}.
154
chunk_size = MAX_VALUE_LENGTH
158
self._pending_responses = []
161
def _create_sequence(self):
162
"""Return a unique sequence number for a L{MethodCall}."""
164
return self._sequence
166
def _call_remote_with_timeout(self, command, **kwargs):
167
"""Send an L{AMP} command that will errback in case of a timeout.
169
@return: A deferred resulting in the command's response (or failure) if
170
the peer responds within L{MethodClientProtocol.timeout} seconds,
171
or that errbacks with a L{MethodCallError} otherwise.
173
deferred = Deferred()
175
def handle_response(response):
177
# Late response for a request that has timeout,
181
deferred.callback(response)
183
def handle_timeout():
184
# The peer didn't respond on time, raise an error.
185
deferred.errback(MethodCallError("timeout"))
187
call = self.factory.reactor.callLater(self.timeout, handle_timeout)
189
result = self.callRemote(command, **kwargs)
190
result.addBoth(handle_response)
193
def send_method_call(self, method, args=[], kwargs={}):
194
"""Send a L{MethodCall} command with the given arguments.
196
@param method: The name of the remote method to invoke.
197
@param args: The positional arguments to pass to the remote method.
198
@param kwargs: The keyword arguments to pass to the remote method.
200
arguments = dumps((args, kwargs))
201
sequence = self._create_sequence()
203
# Split the given arguments in one or more chunks
204
chunks = [arguments[i:i + self.chunk_size]
205
for i in xrange(0, len(arguments), self.chunk_size)]
209
# If we have N chunks, send the first N-1 as MethodCallChunk's
210
for chunk in chunks[:-1]:
212
def create_send_chunk(sequence, chunk):
213
send_chunk = lambda x: self.callRemote(
214
MethodCallChunk, sequence=sequence, chunk=chunk)
217
result.addCallback(create_send_chunk(sequence, chunk))
219
def send_last_chunk(ignored):
221
return self._call_remote_with_timeout(
222
MethodCall, sequence=sequence, method=method, arguments=chunk)
224
result.addCallback(send_last_chunk)
225
result.callback(None)
229
class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol):
230
"""Can be used both for sending and receiving L{MethodCall}s."""
233
MethodCallServerProtocol.__init__(self)
234
MethodCallClientProtocol.__init__(self)
237
class MethodCallFactory(ReconnectingClientFactory):
239
Factory for L{MethodCallProtocol}s exposing an object or connecting to
240
to L{MethodCall} servers.
242
When used to connect, if the connection fails or is lost the factory
243
will keep retrying to establish it.
245
@cvar protocol: The factory used to build protocol instances.
246
@cvar factor: The time factor by which the delay between two subsequent
247
connection retries will increase.
248
@cvar maxDelay: Maximum number of seconds between connection attempts.
251
protocol = MethodCallProtocol
252
factor = 1.6180339887498948
255
def __init__(self, object=None, reactor=None):
257
@param object: The object exposed by the L{MethodCallProtocol}s
258
instances created by this factory.
259
@param reactor: The reactor used by the created protocols
260
to schedule notifications and timeouts.
263
self.reactor = reactor
264
self.clock = self.reactor
265
self.delay = self.initialDelay
268
def add_notifier(self, callback, errback=None):
269
"""Call the given function on connection, reconnection or give up.
271
@param notifier: A function that will be called when the factory builds
272
a new connected protocol or gives up connecting. It will be passed
273
the new protocol instance as argument, or the connectionf failure.
275
self._notifiers.append((callback, errback))
277
def remove_notifier(self, callback, errback=None):
278
"""Remove a notifier."""
279
self._notifiers.remove((callback, errback))
281
def notify_success(self, *args, **kwargs):
282
"""Notify all registered notifier callbacks."""
283
for callback, _ in self._notifiers:
284
self.reactor.callLater(0, callback, *args, **kwargs)
286
def notify_failure(self, failure):
287
"""Notify all registered notifier errbacks."""
288
for _, errback in self._notifiers:
289
if errback is not None:
290
self.reactor.callLater(0, errback, failure)
292
def clientConnectionFailed(self, connector, reason):
293
ReconnectingClientFactory.clientConnectionFailed(self, connector,
295
if self.maxRetries is not None and (self.retries > self.maxRetries):
296
self.notify_failure(reason) # Give up
298
def buildProtocol(self, addr):
300
protocol = self.protocol()
301
protocol.factory = self
302
self.notify_success(protocol)
306
class RemoteObject(object):
307
"""An object able to transparently call methods on a remote object.
309
Any method call on a L{RemoteObject} instance will return a L{Deferred}
310
resulting in the return value of the same method call performed on
311
the remote object exposed by the peer.
314
def __init__(self, protocol, retry_on_reconnect=False, timeout=None):
316
@param protocol: A reference to a connected L{AMP} protocol instance,
317
which will be used to send L{MethodCall} commands.
318
@param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry
319
to perform again requests that failed due to a lost connection, as
320
soon as a new connection is available.
321
@param timeout: A timeout for failed requests, if the L{RemoteObject}
322
can't perform them again successfully within this number of
323
seconds, they will errback with a L{MethodCallError}.
325
self._protocol = protocol
326
self._factory = protocol.factory
327
self._reactor = protocol.factory.reactor
328
self._retry_on_reconnect = retry_on_reconnect
329
self._timeout = timeout
330
self._pending_requests = {}
331
self._factory.add_notifier(self._handle_reconnect)
333
def __getattr__(self, method):
334
"""Return a function sending a L{MethodCall} for the given C{method}.
336
When the created function is called, it sends the an appropriate
337
L{MethodCall} to the remote peer passing it the arguments and
338
keyword arguments it was called with, and returning a L{Deferred}
339
resulting in the L{MethodCall}'s response value.
342
def send_method_call(*args, **kwargs):
343
result = self._protocol.send_method_call(method=method,
346
deferred = Deferred()
347
result.addCallback(self._handle_response, deferred)
348
result.addErrback(self._handle_failure, method, args, kwargs,
352
return send_method_call
354
def _handle_response(self, response, deferred, call=None):
355
"""Handles a successful L{MethodCall} response.
357
@param response: The L{MethodCall} response.
358
@param deferred: The deferred that was returned to the caller.
359
@param call: If not C{None}, the scheduled timeout call associated with
362
result = response["result"]
364
call.cancel() # This is a successful retry, cancel the timeout.
365
deferred.callback(result)
367
def _handle_failure(self, failure, method, args, kwargs, deferred,
369
"""Called when a L{MethodCall} command fails.
371
If a failure is due to a connection error and if C{retry_on_reconnect}
372
is C{True}, we will try to perform the requested L{MethodCall} again
373
as soon as a new connection becomes available, giving up after the
374
specified C{timeout}, if any.
376
@param failure: The L{Failure} raised by the requested L{MethodCall}.
377
@param name: The method name associated with the failed L{MethodCall}.
378
@param args: The positional arguments of the failed L{MethodCall}.
379
@param kwargs: The keyword arguments of the failed L{MethodCall}.
380
@param deferred: The deferred that was returned to the caller.
381
@param call: If not C{None}, the scheduled timeout call associated with
384
is_method_call_error = failure.type is MethodCallError
385
dont_retry = self._retry_on_reconnect == False
387
if is_method_call_error or dont_retry:
388
# This means either that the connection is working, and a
389
# MethodCall protocol error occured, or that we gave up
390
# trying and raised a timeout. In any case just propagate
392
if deferred in self._pending_requests:
393
self._pending_requests.pop(deferred)
396
deferred.errback(failure)
399
if self._timeout and call is None:
400
# This is the first failure for this request, let's schedule a
402
timeout = Failure(MethodCallError("timeout"))
403
call = self._reactor.callLater(self._timeout,
404
self._handle_failure,
405
timeout, method, args,
406
kwargs, deferred=deferred)
408
self._pending_requests[deferred] = (method, args, kwargs, call)
410
def _handle_reconnect(self, protocol):
411
"""Handles a reconnection.
413
@param protocol: The newly connected protocol instance.
415
self._protocol = protocol
416
if self._retry_on_reconnect:
420
"""Try to perform again requests that failed."""
422
# We need to copy the requests list before iterating over it, because
423
# if we are actually still disconnected, callRemote will return a
424
# failed deferred and the _handle_failure errback will be executed
425
# synchronously during the loop, modifing the requests list itself.
426
requests = self._pending_requests.copy()
427
self._pending_requests.clear()
430
deferred, (method, args, kwargs, call) = requests.popitem()
431
result = self._protocol.send_method_call(method, args, kwargs)
432
result.addCallback(self._handle_response,
433
deferred=deferred, call=call)
434
result.addErrback(self._handle_failure, method, args, kwargs,
435
deferred=deferred, call=call)
438
class RemoteObjectConnector(object):
439
"""Connect to remote objects exposed by a L{MethodCallProtocol}."""
441
factory = MethodCallFactory
442
remote = RemoteObject
444
def __init__(self, reactor, socket_path, *args, **kwargs):
446
@param reactor: A reactor able to connect to Unix sockets.
447
@param socket: The path to the socket we want to connect to.
448
@param args: Arguments to be passed to the created L{RemoteObject}.
449
@param kwargs: Keyword arguments for the created L{RemoteObject}.
451
self._socket_path = socket_path
452
self._reactor = reactor
454
self._kwargs = kwargs
458
def connect(self, max_retries=None, factor=None):
459
"""Connect to a remote object exposed by a L{MethodCallProtocol}.
461
This method will connect to the socket provided in the constructor
462
and return a L{Deferred} resulting in a connected L{RemoteObject}.
464
@param max_retries: If not C{None} give up try to connect after this
465
amount of times, otherwise keep trying to connect forever.
466
@param factor: Optionally a float indicating by which factor the
467
delay between subsequent retries should increase. Smaller values
468
result in a faster reconnection attempts pace.
470
self._connected = Deferred()
471
self._factory = self.factory(reactor=self._reactor)
472
self._factory.maxRetries = max_retries
474
self._factory.factor = factor
475
self._factory.add_notifier(self._success, self._failure)
476
self._reactor.connectUNIX(self._socket_path, self._factory)
477
return self._connected
479
def _success(self, result):
480
"""Called when the first connection has been established"""
482
# We did our job, remove our own notifier and let the remote object
483
# handle reconnections.
484
self._factory.remove_notifier(self._success, self._failure)
485
self._remote = self.remote(result, *self._args, **self._kwargs)
486
self._connected.callback(self._remote)
488
def _failure(self, failure):
489
"""Called when the first connection has failed"""
490
self._connected.errback(failure)
492
def disconnect(self):
493
"""Disconnect the L{RemoteObject} that we have created."""
495
self._factory.stopTrying()
497
if self._remote._protocol.transport:
498
self._remote._protocol.transport.loseConnection()