~landscape/landscape-client/trunk

« back to all changes in this revision

Viewing changes to landscape/lib/amp.py

  • Committer: Tarmac
  • Author(s): Free Ekanayaka
  • Date: 2013-04-23 13:50:29 UTC
  • mfrom: (660.2.3 amp-cleanup-2)
  • Revision ID: tarmac-20130423135029-plhso0kausu3xp7u
Merge amp-cleanup-2 [f=1170669] [r=ack,tealeg] [a=Free Ekanayaka]
This branch is a step toward the goal of fully-synchronous tests in the client. In detail:

- Drop the retry_on_reconnect parameter of the RemoteBroker constructor, and
  stick it to MethodCallClientFactory instead. The idea is to make the factory
  a sort of controller of RemoteBroker. Since user code is supposed to deal
  with factories directly, the can control the RemoteBroker's behavior by
  modifying the factory.

- Move the MethodCallFactory class after the RemoteBroker class, since we now need
  to set the latter as 'remote' class attribute of the former.

- Rename MethodCallFactory to MethodCallClientFactory, since this is really
  a protocol factory for clients, not for servers.

- Implement a new interface for MethodCallClientFactory, which should be more convenient
  for user-code. This includes adding a getRemoteObject method and replacing the
  notify_success and notify_failure methods with a single notifyOnConnect (camel case
  for consistency with Twisted).

Show diffs side-by-side

added added

removed removed

Lines of Context:
45
45
for more details about the Twisted AMP protocol.
46
46
"""
47
47
 
48
 
from twisted.internet.defer import Deferred, maybeDeferred
 
48
from twisted.internet.defer import Deferred, maybeDeferred, succeed
49
49
from twisted.internet.protocol import ReconnectingClientFactory
50
50
from twisted.python.failure import Failure
51
51
 
277
277
 
278
278
 
279
279
class MethodCallClientProtocol(AMP):
280
 
    """XXX Placeholder"""
 
280
    """Send L{MethodCall} commands over the wire using the AMP protocol."""
 
281
 
 
282
    def connectionMade(self):
 
283
        """Notify our factory that we're ready to go."""
 
284
        self.factory.clientConnectionMade(self)
281
285
 
282
286
 
283
287
class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol):
288
292
        MethodCallClientProtocol.__init__(self)
289
293
 
290
294
 
291
 
class MethodCallFactory(ReconnectingClientFactory):
292
 
    """
293
 
    Factory for L{MethodCallProtocol}s exposing an object or connecting to
294
 
    L{MethodCall} servers.
295
 
 
296
 
    When used to connect, if the connection fails or is lost the factory
297
 
    will keep retrying to establish it.
298
 
 
299
 
    @cvar protocol: The factory used to build protocol instances.
300
 
    @cvar factor: The time factor by which the delay between two subsequent
301
 
        connection retries will increase.
302
 
    @cvar maxDelay: Maximum number of seconds between connection attempts.
303
 
    """
304
 
 
305
 
    protocol = MethodCallProtocol
306
 
    factor = 1.6180339887498948
307
 
    maxDelay = 30
308
 
 
309
 
    def __init__(self, object=None, reactor=None):
310
 
        """
311
 
        @param object: The object exposed by the L{MethodCallProtocol}s
312
 
            instances created by this factory.
313
 
        @param reactor: The reactor used by the created protocols
314
 
            to schedule notifications and timeouts.
315
 
        """
316
 
        self.object = object
317
 
        self.reactor = reactor
318
 
        self.clock = self.reactor
319
 
        self.delay = self.initialDelay
320
 
        self._notifiers = []
321
 
 
322
 
    def add_notifier(self, callback, errback=None):
323
 
        """Call the given function on connection, reconnection or give up.
324
 
 
325
 
        @param notifier: A function that will be called when the factory builds
326
 
            a new connected protocol or gives up connecting.  It will be passed
327
 
            the new protocol instance as argument, or the connectionf failure.
328
 
        """
329
 
        self._notifiers.append((callback, errback))
330
 
 
331
 
    def remove_notifier(self, callback, errback=None):
332
 
        """Remove a notifier."""
333
 
        self._notifiers.remove((callback, errback))
334
 
 
335
 
    def notify_success(self, *args, **kwargs):
336
 
        """Notify all registered notifier callbacks."""
337
 
        for callback, _ in self._notifiers:
338
 
            self.reactor.callLater(0, callback, *args, **kwargs)
339
 
 
340
 
    def notify_failure(self, failure):
341
 
        """Notify all registered notifier errbacks."""
342
 
        for _, errback in self._notifiers:
343
 
            if errback is not None:
344
 
                self.reactor.callLater(0, errback, failure)
345
 
 
346
 
    def clientConnectionFailed(self, connector, reason):
347
 
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
348
 
                                                         reason)
349
 
        if self.maxRetries is not None and (self.retries > self.maxRetries):
350
 
            self.notify_failure(reason)  # Give up
351
 
 
352
 
    def buildProtocol(self, addr):
353
 
        self.resetDelay()
354
 
        if self.object is not None:
355
 
            # XXX temporary hack to emulate the behavior of this code before
356
 
            # MethodCallReceiver was introduced
357
 
            locator = MethodCallReceiver(self.object, self.protocol.methods)
358
 
            protocol = AMP(locator=locator)
359
 
            protocol.factory = self
360
 
        else:
361
 
            protocol = ReconnectingClientFactory.buildProtocol(self, addr)
362
 
        self.notify_success(protocol)
363
 
        return protocol
364
 
 
365
 
 
366
295
class RemoteObject(object):
367
296
    """An object able to transparently call methods on a remote object.
368
297
 
371
300
    the remote object exposed by the peer.
372
301
    """
373
302
 
374
 
    def __init__(self, sender, retry_on_reconnect=False, timeout=None,
375
 
                 factory=None):
 
303
    def __init__(self, factory):
376
304
        """
377
305
        @param protocol: A reference to a connected L{AMP} protocol instance,
378
306
            which will be used to send L{MethodCall} commands.
383
311
            can't perform them again successfully within this number of
384
312
            seconds, they will errback with a L{MethodCallError}.
385
313
        """
386
 
        self._sender = sender
 
314
        self._sender = None
 
315
        self._pending_requests = {}
387
316
        self._factory = factory
388
 
        self._retry_on_reconnect = retry_on_reconnect
389
 
        self._timeout = timeout
390
 
        self._pending_requests = {}
391
 
        if self._factory:
392
 
            # XXX temporary hack to emulate the behavior of this code before
393
 
            # MethodCallReceiver was introduced
394
 
            self._reactor = factory.reactor
395
 
            self._factory.add_notifier(self._handle_reconnect)
 
317
        self._factory.notifyOnConnect(self._handle_connect)
396
318
 
397
319
    def __getattr__(self, method):
398
320
        """Return a function sending a L{MethodCall} for the given C{method}.
445
367
            the given deferred.
446
368
        """
447
369
        is_method_call_error = failure.type is MethodCallError
448
 
        dont_retry = self._retry_on_reconnect is False
 
370
        dont_retry = self._factory.retryOnReconnect is False
449
371
 
450
372
        if is_method_call_error or dont_retry:
451
373
            # This means either that the connection is working, and a
459
381
            deferred.errback(failure)
460
382
            return
461
383
 
462
 
        if self._timeout and call is None:
 
384
        if self._factory.retryTimeout and call is None:
463
385
            # This is the first failure for this request, let's schedule a
464
386
            # timeout call.
465
387
            timeout = Failure(MethodCallError("timeout"))
466
 
            call = self._reactor.callLater(self._timeout,
467
 
                                           self._handle_failure,
468
 
                                           timeout, method, args,
469
 
                                           kwargs, deferred=deferred)
 
388
            call = self._factory.clock.callLater(self._factory.retryTimeout,
 
389
                                                 self._handle_failure,
 
390
                                                 timeout, method, args,
 
391
                                                 kwargs, deferred=deferred)
470
392
 
471
393
        self._pending_requests[deferred] = (method, args, kwargs, call)
472
394
 
473
 
    def _handle_reconnect(self, protocol):
 
395
    def _handle_connect(self, protocol):
474
396
        """Handles a reconnection.
475
397
 
476
398
        @param protocol: The newly connected protocol instance.
477
399
        """
478
 
        self._sender.protocol = protocol
 
400
        if self._sender is None:
 
401
            self._sender = MethodCallSender(protocol, self._factory.clock)
 
402
        else:
 
403
            self._sender.protocol = protocol
479
404
        if self._retry_on_reconnect:
480
405
            self._retry()
481
406
 
498
423
                              deferred=deferred, call=call)
499
424
 
500
425
 
 
426
class MethodCallClientFactory(ReconnectingClientFactory):
 
427
    """
 
428
    Factory for L{MethodCallProtocol}s exposing an object or connecting to
 
429
    L{MethodCall} servers.
 
430
 
 
431
    When used to connect, if the connection fails or is lost the factory
 
432
    will keep retrying to establish it.
 
433
 
 
434
    @ivar factor: The time factor by which the delay between two subsequent
 
435
        connection retries will increase.
 
436
    @ivar maxDelay: Maximum number of seconds between connection attempts.
 
437
    @ivar protocol: The factory used to build protocol instances.
 
438
    @ivar remote: The factory used to build remote object instances.
 
439
    @ivar retryOnReconnect: If C{True}, the remote object returned by the
 
440
        C{getRemoteObject} method will retry requests that failed, as a
 
441
        result of a lost connection, as soon as a new connection is available.
 
442
    @param retryTimeout: A timeout for retrying requests, if the remote object
 
443
        can't perform them again successfully within this number of seconds,
 
444
        they will errback with a L{MethodCallError}.
 
445
    """
 
446
 
 
447
    factor = 1.6180339887498948
 
448
    maxDelay = 30
 
449
 
 
450
    protocol = MethodCallClientProtocol
 
451
    remote = RemoteObject
 
452
 
 
453
    retryOnReconnect = False
 
454
    retryTimeout = None
 
455
 
 
456
    def __init__(self, reactor=None, object=None):
 
457
        """
 
458
        @param object: The object exposed by the L{MethodCallProtocol}s
 
459
            instances created by this factory.
 
460
        @param reactor: The reactor used by the created protocols
 
461
            to schedule notifications and timeouts.
 
462
        """
 
463
        self.object = object  # XXX
 
464
        self.reactor = reactor
 
465
        self.clock = self.reactor
 
466
        self.delay = self.initialDelay
 
467
        self._connects = []
 
468
        self._requests = []
 
469
        self._remote = None
 
470
 
 
471
    def getRemoteObject(self):
 
472
        """Get a L{RemoteObject} as soon as the connection is ready.
 
473
 
 
474
        @return: A C{Deferred} firing with a connected L{RemoteObject}.
 
475
        """
 
476
        if self._remote is not None:
 
477
            return succeed(self._remote)
 
478
        deferred = Deferred()
 
479
        self._requests.append(deferred)
 
480
        return deferred
 
481
 
 
482
    def notifyOnConnect(self, callback):
 
483
        """Invoke the given C{callback} when a connection is re-established."""
 
484
        self._connects.append(callback)
 
485
 
 
486
    def dontNotifyOnConnect(self, callback):
 
487
        """Remove the given C{callback} from listeners."""
 
488
        self._connects.remove(callback)
 
489
 
 
490
    def clientConnectionMade(self, protocol):
 
491
        """Called when a newly built protocol gets connected."""
 
492
        if self._remote is None:
 
493
            # This is the first time we successfully connect
 
494
            self._remote = self.remote(self)
 
495
 
 
496
        for callback in self._connects:
 
497
            callback(protocol)
 
498
 
 
499
        # In all cases fire pending requests
 
500
        self._fire_requests(self._remote)
 
501
 
 
502
    def clientConnectionFailed(self, connector, reason):
 
503
        """Try to connect again or errback pending request."""
 
504
        ReconnectingClientFactory.clientConnectionFailed(self, connector,
 
505
                                                         reason)
 
506
        if self._callID is None:
 
507
            # The factory won't retry to connect, so notify that we failed
 
508
            self._fire_requests(reason)
 
509
 
 
510
    def buildProtocol(self, addr):
 
511
        self.resetDelay()
 
512
        if self.object is not None:
 
513
            # XXX temporary hack to emulate the behavior of this code before
 
514
            # MethodCallReceiver was introduced
 
515
            locator = MethodCallReceiver(self.object, self.protocol.methods)
 
516
            protocol = AMP(locator=locator)
 
517
            protocol.factory = self
 
518
        else:
 
519
            protocol = ReconnectingClientFactory.buildProtocol(self, addr)
 
520
        return protocol
 
521
 
 
522
    def _fire_requests(self, result):
 
523
        """
 
524
        Fire all pending L{getRemoteObject} deferreds with the given C{result}.
 
525
        """
 
526
        requests = self._requests[:]
 
527
        self._requests = []
 
528
 
 
529
        for deferred in requests:
 
530
            deferred.callback(result)
 
531
 
 
532
 
 
533
class MethodCallFactory(MethodCallClientFactory):
 
534
    """XXX placeholder"""
 
535
 
 
536
 
501
537
class RemoteObjectConnector(object):
502
538
    """Connect to remote objects exposed by a L{MethodCallProtocol}."""
503
539
 
504
 
    factory = MethodCallFactory
 
540
    factory = MethodCallClientFactory
505
541
    remote = RemoteObject
506
542
 
507
 
    def __init__(self, reactor, socket_path, *args, **kwargs):
 
543
    def __init__(self, reactor, socket_path):
508
544
        """
509
545
        @param reactor: A reactor able to connect to Unix sockets.
510
546
        @param socket: The path to the socket we want to connect to.
513
549
        """
514
550
        self._socket_path = socket_path
515
551
        self._reactor = reactor
516
 
        self._args = args
517
 
        self._kwargs = kwargs
518
 
        self._remote = None
519
552
        self._factory = None
520
553
 
521
554
    def connect(self, max_retries=None, factor=None):
530
563
            delay between subsequent retries should increase. Smaller values
531
564
            result in a faster reconnection attempts pace.
532
565
        """
533
 
        self._connected = Deferred()
534
566
        self._factory = self.factory(reactor=self._reactor)
535
567
        self._factory.maxRetries = max_retries
536
568
        if factor:
537
569
            self._factory.factor = factor
538
 
        self._factory.add_notifier(self._success, self._failure)
539
570
        self._reactor.connectUNIX(self._socket_path, self._factory)
540
 
        return self._connected
541
 
 
542
 
    def _success(self, result):
543
 
        """Called when the first connection has been established"""
544
 
 
545
 
        # We did our job, remove our own notifier and let the remote object
546
 
        # handle reconnections.
547
 
        self._factory.remove_notifier(self._success, self._failure)
548
 
        sender = MethodCallSender(result, self._reactor)
549
 
        # XXX temporary hack to emulate the behavior of this code before
550
 
        # MethodCallReceiver was introduced
551
 
        self._kwargs["factory"] = self._factory
552
 
        self._remote = self.remote(sender, *self._args, **self._kwargs)
553
 
        self._connected.callback(self._remote)
554
 
 
555
 
    def _failure(self, failure):
556
 
        """Called when the first connection has failed"""
557
 
        self._connected.errback(failure)
 
571
        return self._factory.getRemoteObject()
558
572
 
559
573
    def disconnect(self):
560
574
        """Disconnect the L{RemoteObject} that we have created."""
561
575
        if self._factory:
562
576
            self._factory.stopTrying()
563
 
        if self._remote:
564
 
            if self._remote._sender.protocol.transport:
565
 
                self._remote._sender.protocol.transport.loseConnection()
566
 
            self._remote = None
 
577
            remote = self._factory._remote
 
578
            if remote:
 
579
                if remote._sender.protocol.transport:
 
580
                    remote._sender.protocol.transport.loseConnection()