288
292
MethodCallClientProtocol.__init__(self)
291
class MethodCallFactory(ReconnectingClientFactory):
293
Factory for L{MethodCallProtocol}s exposing an object or connecting to
294
L{MethodCall} servers.
296
When used to connect, if the connection fails or is lost the factory
297
will keep retrying to establish it.
299
@cvar protocol: The factory used to build protocol instances.
300
@cvar factor: The time factor by which the delay between two subsequent
301
connection retries will increase.
302
@cvar maxDelay: Maximum number of seconds between connection attempts.
305
protocol = MethodCallProtocol
306
factor = 1.6180339887498948
309
def __init__(self, object=None, reactor=None):
311
@param object: The object exposed by the L{MethodCallProtocol}s
312
instances created by this factory.
313
@param reactor: The reactor used by the created protocols
314
to schedule notifications and timeouts.
317
self.reactor = reactor
318
self.clock = self.reactor
319
self.delay = self.initialDelay
322
def add_notifier(self, callback, errback=None):
323
"""Call the given function on connection, reconnection or give up.
325
@param notifier: A function that will be called when the factory builds
326
a new connected protocol or gives up connecting. It will be passed
327
the new protocol instance as argument, or the connection failure.
329
self._notifiers.append((callback, errback))
331
def remove_notifier(self, callback, errback=None):
    """Remove a notifier.

    @param callback: The success callback of the pair to remove.
    @param errback: The failure callback it was paired with, or C{None}
        if no errback was given.

    The C{(callback, errback)} pair is removed from C{self._notifiers}
    as a tuple, so both values must match the pair that was added.
    """
    self._notifiers.remove((callback, errback))
335
def notify_success(self, *args, **kwargs):
    """Notify all registered notifier callbacks.

    Each callback is scheduled through C{self.reactor.callLater} with a
    delay of C{0} instead of being invoked inline. All positional and
    keyword arguments are forwarded unchanged to every callback.
    """
    for callback, _ in self._notifiers:
        self.reactor.callLater(0, callback, *args, **kwargs)
340
def notify_failure(self, failure):
    """Notify all registered notifier errbacks.

    @param failure: The value handed to every errback.

    Notifiers registered without an errback (C{None}) are skipped. The
    others are scheduled through C{self.reactor.callLater} with a delay
    of C{0} instead of being invoked inline.
    """
    for _, errback in self._notifiers:
        if errback is not None:
            self.reactor.callLater(0, errback, failure)
346
def clientConnectionFailed(self, connector, reason):
347
ReconnectingClientFactory.clientConnectionFailed(self, connector,
349
if self.maxRetries is not None and (self.retries > self.maxRetries):
350
self.notify_failure(reason) # Give up
352
def buildProtocol(self, addr):
354
if self.object is not None:
355
# XXX temporary hack to emulate the behavior of this code before
356
# MethodCallReceiver was introduced
357
locator = MethodCallReceiver(self.object, self.protocol.methods)
358
protocol = AMP(locator=locator)
359
protocol.factory = self
361
protocol = ReconnectingClientFactory.buildProtocol(self, addr)
362
self.notify_success(protocol)
366
295
class RemoteObject(object):
367
296
"""An object able to transparently call methods on a remote object.
459
381
deferred.errback(failure)
462
if self._timeout and call is None:
384
if self._factory.retryTimeout and call is None:
463
385
# This is the first failure for this request, let's schedule a
465
387
timeout = Failure(MethodCallError("timeout"))
466
call = self._reactor.callLater(self._timeout,
467
self._handle_failure,
468
timeout, method, args,
469
kwargs, deferred=deferred)
388
call = self._factory.clock.callLater(self._factory.retryTimeout,
389
self._handle_failure,
390
timeout, method, args,
391
kwargs, deferred=deferred)
471
393
self._pending_requests[deferred] = (method, args, kwargs, call)
473
def _handle_reconnect(self, protocol):
395
def _handle_connect(self, protocol):
474
396
"""Handles a reconnection.
476
398
@param protocol: The newly connected protocol instance.
478
self._sender.protocol = protocol
400
if self._sender is None:
401
self._sender = MethodCallSender(protocol, self._factory.clock)
403
self._sender.protocol = protocol
479
404
if self._retry_on_reconnect:
498
423
deferred=deferred, call=call)
426
class MethodCallClientFactory(ReconnectingClientFactory):
428
Factory for L{MethodCallProtocol}s exposing an object or connecting to
429
L{MethodCall} servers.
431
When used to connect, if the connection fails or is lost the factory
432
will keep retrying to establish it.
434
@ivar factor: The time factor by which the delay between two subsequent
435
connection retries will increase.
436
@ivar maxDelay: Maximum number of seconds between connection attempts.
437
@ivar protocol: The factory used to build protocol instances.
438
@ivar remote: The factory used to build remote object instances.
439
@ivar retryOnReconnect: If C{True}, the remote object returned by the
440
C{getRemoteObject} method will retry requests that failed, as a
441
result of a lost connection, as soon as a new connection is available.
442
@ivar retryTimeout: A timeout for retrying requests, if the remote object
443
can't perform them again successfully within this number of seconds,
444
they will errback with a L{MethodCallError}.
447
factor = 1.6180339887498948
450
protocol = MethodCallClientProtocol
451
remote = RemoteObject
453
retryOnReconnect = False
456
def __init__(self, reactor=None, object=None):
458
@param object: The object exposed by the L{MethodCallProtocol}s
459
instances created by this factory.
460
@param reactor: The reactor used by the created protocols
461
to schedule notifications and timeouts.
463
self.object = object # XXX
464
self.reactor = reactor
465
self.clock = self.reactor
466
self.delay = self.initialDelay
471
def getRemoteObject(self):
472
"""Get a L{RemoteObject} as soon as the connection is ready.
474
@return: A C{Deferred} firing with a connected L{RemoteObject}.
476
if self._remote is not None:
477
return succeed(self._remote)
478
deferred = Deferred()
479
self._requests.append(deferred)
482
def notifyOnConnect(self, callback):
    """Invoke the given C{callback} when a connection is re-established.

    @param callback: The callable to register; it is appended to
        C{self._connects}.
    """
    self._connects.append(callback)
486
def dontNotifyOnConnect(self, callback):
    """Remove the given C{callback} from listeners.

    @param callback: A callable previously passed to C{notifyOnConnect};
        it is removed from C{self._connects}.
    """
    self._connects.remove(callback)
490
def clientConnectionMade(self, protocol):
491
"""Called when a newly built protocol gets connected."""
492
if self._remote is None:
493
# This is the first time we successfully connect
494
self._remote = self.remote(self)
496
for callback in self._connects:
499
# In all cases fire pending requests
500
self._fire_requests(self._remote)
502
def clientConnectionFailed(self, connector, reason):
503
"""Try to connect again or errback pending request."""
504
ReconnectingClientFactory.clientConnectionFailed(self, connector,
506
if self._callID is None:
507
# The factory won't retry to connect, so notify that we failed
508
self._fire_requests(reason)
510
def buildProtocol(self, addr):
512
if self.object is not None:
513
# XXX temporary hack to emulate the behavior of this code before
514
# MethodCallReceiver was introduced
515
locator = MethodCallReceiver(self.object, self.protocol.methods)
516
protocol = AMP(locator=locator)
517
protocol.factory = self
519
protocol = ReconnectingClientFactory.buildProtocol(self, addr)
522
def _fire_requests(self, result):
524
Fire all pending L{getRemoteObject} deferreds with the given C{result}.
526
requests = self._requests[:]
529
for deferred in requests:
530
deferred.callback(result)
533
class MethodCallFactory(MethodCallClientFactory):
    """XXX placeholder

    Currently adds nothing on top of L{MethodCallClientFactory}.
    """
501
537
class RemoteObjectConnector(object):
502
538
"""Connect to remote objects exposed by a L{MethodCallProtocol}."""
504
factory = MethodCallFactory
540
factory = MethodCallClientFactory
505
541
remote = RemoteObject
507
def __init__(self, reactor, socket_path, *args, **kwargs):
543
def __init__(self, reactor, socket_path):
509
545
@param reactor: A reactor able to connect to Unix sockets.
510
546
@param socket: The path to the socket we want to connect to.
530
563
delay between subsequent retries should increase. Smaller values
531
564
result in a faster reconnection attempts pace.
533
self._connected = Deferred()
534
566
self._factory = self.factory(reactor=self._reactor)
535
567
self._factory.maxRetries = max_retries
537
569
self._factory.factor = factor
538
self._factory.add_notifier(self._success, self._failure)
539
570
self._reactor.connectUNIX(self._socket_path, self._factory)
540
return self._connected
542
def _success(self, result):
    """Called when the first connection has been established

    @param result: The value passed by the factory notifier; it is handed
        to L{MethodCallSender} as its first argument.
    """
    # We did our job, remove our own notifier and let the remote object
    # handle reconnections.
    self._factory.remove_notifier(self._success, self._failure)
    sender = MethodCallSender(result, self._reactor)
    # XXX temporary hack to emulate the behavior of this code before
    # MethodCallReceiver was introduced
    self._kwargs["factory"] = self._factory
    self._remote = self.remote(sender, *self._args, **self._kwargs)
    # Fire the deferred stored in self._connected with the new remote.
    self._connected.callback(self._remote)
555
def _failure(self, failure):
556
"""Called when the first connection has failed"""
557
self._connected.errback(failure)
571
return self._factory.getRemoteObject()
559
573
def disconnect(self):
560
574
"""Disconnect the L{RemoteObject} that we have created."""
561
575
if self._factory:
562
576
self._factory.stopTrying()
564
if self._remote._sender.protocol.transport:
565
self._remote._sender.protocol.transport.loseConnection()
577
remote = self._factory._remote
579
if remote._sender.protocol.transport:
580
remote._sender.protocol.transport.loseConnection()