1
# -*- test-case-name: twisted.test.test_pb -*-
2
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
3
# See LICENSE for details.
8
\"This isn\'t a professional opinion, but it's probably got enough
9
internet to kill you.\" --glyph
14
This is a broker for proxies for and copies of objects. It provides a
15
translucent interface layer to those proxies.
17
The protocol is not opaque, because it provides objects which represent the
18
remote proxies and require no context (server references, IDs) to operate on.
20
It is not transparent because it does I{not} attempt to make remote objects
21
behave identically, or even similarly, to local objects. Method calls are
22
invoked asynchronously, and specific rules are applied when serializing
25
To get started, begin with L{PBClientFactory} and L{PBServerFactory}.
27
@author: Glyph Lefkowitz
34
from zope.interface import implements, Interface
37
from twisted.python import log, failure, reflect
38
from twisted.python.versions import Version
39
from twisted.python.deprecate import deprecated
40
from twisted.python.hashlib import md5
41
from twisted.internet import defer, protocol
42
from twisted.cred.portal import Portal
43
from twisted.cred.credentials import IAnonymous, ICredentials
44
from twisted.cred.credentials import IUsernameHashedPassword, Anonymous
45
from twisted.persisted import styles
46
from twisted.python.components import registerAdapter
48
from twisted.spread.interfaces import IJellyable, IUnjellyable
49
from twisted.spread.jelly import jelly, unjelly, globalSecurity
50
from twisted.spread import banana
52
from twisted.spread.flavors import Serializable
53
from twisted.spread.flavors import Referenceable, NoSuchMethod
54
from twisted.spread.flavors import Root, IPBRoot
55
from twisted.spread.flavors import ViewPoint
56
from twisted.spread.flavors import Viewable
57
from twisted.spread.flavors import Copyable
58
from twisted.spread.flavors import Jellyable
59
from twisted.spread.flavors import Cacheable
60
from twisted.spread.flavors import RemoteCopy
61
from twisted.spread.flavors import RemoteCache
62
from twisted.spread.flavors import RemoteCacheObserver
63
from twisted.spread.flavors import copyTags
65
from twisted.spread.flavors import setUnjellyableForClass
66
from twisted.spread.flavors import setUnjellyableFactoryForClass
67
from twisted.spread.flavors import setUnjellyableForClassTree
68
# These three are backwards compatibility aliases for the previous three.
69
# Ultimately they should be deprecated. -exarkun
70
from twisted.spread.flavors import setCopierForClass
71
from twisted.spread.flavors import setFactoryForClass
72
from twisted.spread.flavors import setCopierForClassTree
75
MAX_BROKER_REFS = 1024
80
class ProtocolError(Exception):
    """
    This error is raised when an invalid protocol statement is received.

    Within this module it is raised by L{Broker} when a non-list expression
    arrives and when the peer announces an incompatible protocol version.
    """
85
class DeadReferenceError(ProtocolError):
    """
    This error is raised when a method is called on a dead reference (one whose
    broker has been disconnected).

    L{Broker._sendMessage} raises it, with the message "Calling Stale Broker",
    when the broker's C{disconnected} flag is set.
    """
91
class Error(Exception):
    """
    This error can be raised to generate known error conditions.

    When a PB callable method (perspective_, remote_, view_) raises
    this error, it indicates that a traceback should not be printed,
    but instead, the string representation of the exception should be
    used.

    L{Broker._sendFailureOrError} checks failures against this class to
    decide whether a failure goes through the logging path
    (L{Broker._sendFailure}) or is sent directly (L{Broker._sendError}).
    """
102
"""This is a translucent reference to a remote message.
104
def __init__(self, obj, name):
105
"""Initialize with a L{RemoteReference} and the name of this message.
110
def __cmp__(self, other):
111
return cmp((self.obj, self.name), other)
114
return hash((self.obj, self.name))
116
def __call__(self, *args, **kw):
117
"""Asynchronously invoke a remote method.
119
return self.obj.broker._sendMessage('',self.obj.perspective, self.obj.luid, self.name, args, kw)
123
@deprecated(Version("twisted", 8, 2, 0))
def noOperation(*args, **kw):
    """
    Accept any positional and keyword arguments and do nothing.

    Neque porro quisquam est qui dolorem ipsum quia dolor sit amet,
    consectetur, adipisci velit...
    """
134
class PBConnectionLost(Exception):
139
def printTraceback(tb):
141
Print a traceback (string) to the standard log.
143
log.msg('Perspective Broker Traceback:' )
145
printTraceback = deprecated(Version("twisted", 8, 2, 0))(printTraceback)
148
class IPerspective(Interface):
150
per*spec*tive, n. : The relationship of aspects of a subject to each
151
other and to a whole: 'a perspective of history'; 'a need to view
152
the problem in the proper perspective'.
154
This is a Perspective Broker-specific wrapper for an avatar. That
155
is to say, a PB-published view on to the business logic for the
156
system's concept of a 'user'.
158
The concept of attached/detached is no longer implemented by the
159
framework. The realm is expected to implement such semantics if
163
def perspectiveMessageReceived(broker, message, args, kwargs):
165
This method is called when a network message is received.
167
@arg broker: The Perspective Broker.
170
@arg message: The name of the method called by the other end.
172
@type args: list in jelly format
173
@arg args: The arguments that were passed by the other end. It
174
is recommended that you use the `unserialize' method of the
175
broker to decode this.
177
@type kwargs: dict in jelly format
178
@arg kwargs: The keyword arguments that were passed by the
179
other end. It is recommended that you use the
180
`unserialize' method of the broker to decode this.
182
@rtype: A jelly list.
183
@return: It is recommended that you use the `serialize' method
184
of the broker on whatever object you need to return to
185
generate the return value.
192
A default IPerspective implementor.
194
This class is intended to be subclassed, and a realm should return
195
an instance of such a subclass when IPerspective is requested of
198
A peer requesting a perspective will receive only a
199
L{RemoteReference} to a pb.Avatar. When a method is called on
200
that L{RemoteReference}, it will translate to a method on the
201
remote perspective named 'perspective_methodname'. (For more
202
information on invoking methods on other objects, see
203
L{flavors.ViewPoint}.)
206
implements(IPerspective)
208
def perspectiveMessageReceived(self, broker, message, args, kw):
210
This method is called when a network message is received.
214
self.perspective_%(message)s(*broker.unserialize(args),
215
**broker.unserialize(kw))
217
to handle the method; subclasses of Avatar are expected to
218
implement methods using this naming convention.
221
args = broker.unserialize(args, self)
222
kw = broker.unserialize(kw, self)
223
method = getattr(self, "perspective_%s" % message)
225
state = method(*args, **kw)
227
log.msg("%s didn't accept %s and %s" % (method, args, kw))
229
return broker.serialize(state, self, method, args, kw)
233
class AsReferenceable(Referenceable):
    """
    A L{Referenceable} whose message handler is borrowed from another object.

    @ivar remoteMessageReceived: the C{<messageType>MessageReceived}
        attribute of the object passed to the initializer.
    """

    def __init__(self, object, messageType="remote"):
        """
        @param object: the object whose message handler should be exposed.
        @param messageType: prefix of the handler's name; the attribute
            looked up on C{object} is C{messageType + "MessageReceived"}.
        """
        handlerName = messageType + "MessageReceived"
        self.remoteMessageReceived = getattr(object, handlerName)
244
class RemoteReference(Serializable, styles.Ephemeral):
246
A translucent reference to a remote object.
248
I may be a reference to a L{flavors.ViewPoint}, a
249
L{flavors.Referenceable}, or an L{IPerspective} implementor (e.g.,
250
pb.Avatar). From the client's perspective, it is not possible to
251
tell which except by convention.
253
I am a \"translucent\" reference because although no additional
254
bookkeeping overhead is given to the application programmer for
255
manipulating a reference, return values are asynchronous.
257
See also L{twisted.internet.defer}.
259
@ivar broker: The broker I am obtained through.
260
@type broker: L{Broker}
263
implements(IUnjellyable)
265
def __init__(self, perspective, broker, luid, doRefCount):
266
"""(internal) Initialize me with a broker and a locally-unique ID.
268
The ID is unique only to the particular Perspective Broker
273
self.doRefCount = doRefCount
274
self.perspective = perspective
275
self.disconnectCallbacks = []
277
def notifyOnDisconnect(self, callback):
    """
    Register a callback to be called if our broker gets disconnected.

    This callback will be called with one argument, this instance.

    @param callback: a callable; anything else trips an assertion.
    """
    assert callable(callback)
    registered = self.disconnectCallbacks
    registered.append(callback)
    # Only the first registration needs to hook into the broker; later
    # callbacks share that single subscription.
    if len(registered) == 1:
        self.broker.notifyOnDisconnect(self._disconnected)
287
def dontNotifyOnDisconnect(self, callback):
    """
    Remove a callback that was registered with notifyOnDisconnect.

    @param callback: the previously registered callable; removing one that
        is not registered raises C{ValueError} from C{list.remove}.
    """
    registered = self.disconnectCallbacks
    registered.remove(callback)
    # Once nothing is left to notify, drop our own hook on the broker too.
    if len(registered) == 0:
        self.broker.dontNotifyOnDisconnect(self._disconnected)
293
def _disconnected(self):
294
"""Called if we are disconnected and have callbacks registered."""
295
for callback in self.disconnectCallbacks:
297
self.disconnectCallbacks = None
299
def jellyFor(self, jellier):
300
"""If I am being sent back to where I came from, serialize as a local backreference.
303
assert self.broker == jellier.invoker, "Can't send references to brokers other than their own."
304
return "local", self.luid
306
return "unpersistable", "References cannot be serialized"
308
def unjellyFor(self, unjellier, unjellyList):
309
self.__init__(unjellier.invoker.unserializingPerspective, unjellier.invoker, unjellyList[1], 1)
312
def callRemote(self, _name, *args, **kw):
313
"""Asynchronously invoke a remote method.
315
@type _name: C{string}
316
@param _name: the name of the remote method to invoke
317
@param args: arguments to serialize for the remote function
318
@param kw: keyword arguments to serialize for the remote function.
319
@rtype: L{twisted.internet.defer.Deferred}
320
@returns: a Deferred which will be fired when the result of
321
this remote call is received.
323
# note that we use '_name' instead of 'name' so the user can call
324
# remote methods with 'name' as a keyword parameter, like this:
325
# ref.callRemote("getPeopleNamed", count=12, name="Bob")
327
return self.broker._sendMessage('',self.perspective, self.luid,
330
def remoteMethod(self, key):
    """
    Get a L{RemoteMethod} for this key.

    @param key: the name of the remote message the returned object stands
        for.
    @return: a new L{RemoteMethod} built from this reference and C{key}.
    """
    method = RemoteMethod(self, key)
    return method
335
def __cmp__(self,other):
    """
    Compare me [to another L{RemoteReference}].

    References obtained through the same broker are ordered by their
    C{luid}; in every other case the result is that of comparing my broker
    with C{other}.
    """
    sameBroker = isinstance(other, RemoteReference) and other.broker == self.broker
    if sameBroker:
        return cmp(self.luid, other.luid)
    return cmp(self.broker, other)
349
"""Do distributed reference counting on finalization.
352
self.broker.sendDecRef(self.luid)
354
setUnjellyableForClass("remote", RemoteReference)
357
"""(internal) A reference to a local object.
360
def __init__(self, object, perspective=None):
364
self.perspective = perspective
368
return "<pb.Local %r ref:%s>" % (self.object, self.refcount)
371
"""Increment and return my reference count.
373
self.refcount = self.refcount + 1
377
"""Decrement and return my reference count.
379
self.refcount = self.refcount - 1
387
class CopyableFailure(failure.Failure, Copyable):
389
A L{flavors.RemoteCopy} and L{flavors.Copyable} version of
390
L{twisted.python.failure.Failure} for serialization.
395
def getStateToCopy(self):
397
Collect state related to the exception which occurred, discarding
398
state which cannot reasonably be serialized.
400
state = self.__dict__.copy()
404
if isinstance(self.value, failure.Failure):
405
state['value'] = failure2Copyable(self.value, self.unsafeTracebacks)
407
state['value'] = str(self.value) # Exception instance
408
if isinstance(self.type, str):
409
state['type'] = self.type
411
state['type'] = reflect.qual(self.type) # Exception class
412
if self.unsafeTracebacks:
413
state['traceback'] = self.getTraceback()
415
state['traceback'] = 'Traceback unavailable\n'
419
class CopiedFailure(RemoteCopy, failure.Failure):
420
def printTraceback(self, file=None, elideFrameworkCode=0, detail='default'):
423
file.write("Traceback from remote host -- ")
424
file.write(self.traceback)
426
printBriefTraceback = printTraceback
427
printDetailedTraceback = printTraceback
429
setUnjellyableForClass(CopyableFailure, CopiedFailure)
431
def failure2Copyable(fail, unsafeTracebacks=0):
432
f = new.instance(CopyableFailure, fail.__dict__)
433
f.unsafeTracebacks = unsafeTracebacks
436
class Broker(banana.Banana):
437
"""I am a broker for objects.
444
def __init__(self, isClient=1, security=globalSecurity):
445
banana.Banana.__init__(self, isClient)
446
self.disconnected = 0
447
self.disconnects = []
450
self.localObjects = {}
451
self.security = security
452
self.pageProducers = []
453
self.currentRequestID = 0
454
self.currentLocalID = 0
456
# PUID: process unique ID; return value of id() function. type "int".
457
# LUID: locally unique ID; an ID unique to an object mapped over this
458
# connection. type "int"
459
# GUID: (not used yet) globally unique ID; an ID for an object which
460
# may be on a redirected or meta server. Type as yet undecided.
461
# Dictionary mapping LUIDs to local objects.
462
# set above to allow root object to be assigned before connection is made
463
# self.localObjects = {}
464
# Dictionary mapping PUIDs to LUIDs.
466
# Dictionary mapping LUIDs to local (remotely cached) objects. Remotely
467
# cached means that they're objects which originate here, and were
469
self.remotelyCachedObjects = {}
470
# Dictionary mapping PUIDs to (cached) LUIDs
471
self.remotelyCachedLUIDs = {}
472
# Dictionary mapping (remote) LUIDs to (locally cached) objects.
473
self.locallyCachedObjects = {}
474
self.waitingForAnswers = {}
476
# Mapping from LUIDs to weakref objects with callbacks for performing
477
# any local cleanup which may be necessary for the corresponding
478
# object once it no longer exists.
479
self._localCleanup = {}
482
def resumeProducing(self):
483
"""Called when the consumer attached to me runs out of buffer.
485
# Go backwards over the list so we can remove indexes from it as we go
486
for pageridx in xrange(len(self.pageProducers)-1, -1, -1):
487
pager = self.pageProducers[pageridx]
489
if not pager.stillPaging():
490
del self.pageProducers[pageridx]
491
if not self.pageProducers:
492
self.transport.unregisterProducer()
494
# Streaming producer methods; not necessary to implement.
495
def pauseProducing(self):
498
def stopProducing(self):
501
def registerPageProducer(self, pager):
    """
    Add C{pager} to the list of page producers.

    When it is the only entry in the list, this broker is registered with
    its transport via C{registerProducer(self, 0)}.

    @param pager: the pager to keep track of.
    """
    producers = self.pageProducers
    producers.append(pager)
    if len(producers) == 1:
        self.transport.registerProducer(self, 0)
506
def expressionReceived(self, sexp):
507
"""Evaluate an expression as it's received.
509
if isinstance(sexp, types.ListType):
511
methodName = "proto_%s" % command
512
method = getattr(self, methodName, None)
516
self.sendCall("didNotUnderstand", command)
518
raise ProtocolError("Non-list expression received.")
521
def proto_version(self, vnum):
    """
    Protocol message: (version version-number)

    Check to make sure that both ends of the protocol are speaking
    the same version dialect.

    @param vnum: the version number announced by the peer.
    @raise ProtocolError: if C{vnum} differs from C{self.version}.
    """
    if vnum != self.version:
        mismatch = (self.version, vnum)
        raise ProtocolError("Version Incompatibility: %s %s" % mismatch)
532
def sendCall(self, *exp):
    """
    Utility method to send an expression to the other side of the connection.

    @param exp: the elements of the expression; they are handed to
        C{sendEncoded} together, as one tuple.
    """
    expression = exp
    self.sendEncoded(expression)
537
def proto_didNotUnderstand(self, command):
    """
    Respond to stock 'C{didNotUnderstand}' message.

    Log the command that was not understood and continue. (Note:
    this will probably be changed to close the connection or raise
    an exception in the future.)

    @param command: the command the other side reported as unknown.
    """
    report = "Didn't understand command: %r" % command
    log.msg(report)
546
def connectionReady(self):
547
"""Initialize. Called after Banana negotiation is done.
549
self.sendCall("version", self.version)
550
for notifier in self.connects:
556
if self.factory: # in tests we won't have factory
557
self.factory.clientConnectionMade(self)
559
def connectionFailed(self):
560
# XXX should never get called anymore? check!
561
for notifier in self.failures:
568
waitingForAnswers = None
570
def connectionLost(self, reason):
571
"""The connection was lost.
573
self.disconnected = 1
574
# nuke potential circular references.
576
if self.waitingForAnswers:
577
for d in self.waitingForAnswers.values():
579
d.errback(failure.Failure(PBConnectionLost(reason)))
582
# Assure all Cacheable.stoppedObserving are called
583
for lobj in self.remotelyCachedObjects.values():
584
cacheable = lobj.object
585
perspective = lobj.perspective
587
cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
590
# Loop on a copy to prevent notifiers to mixup
591
# the list by calling dontNotifyOnDisconnect
592
for notifier in self.disconnects[:]:
597
self.disconnects = None
598
self.waitingForAnswers = None
599
self.localSecurity = None
600
self.remoteSecurity = None
601
self.remotelyCachedObjects = None
602
self.remotelyCachedLUIDs = None
603
self.locallyCachedObjects = None
604
self.localObjects = None
606
def notifyOnDisconnect(self, notifier):
    """
    Call the given callback when the Broker disconnects.

    @param notifier: a callable, appended to C{self.disconnects}; anything
        that is not callable trips an assertion.
    """
    assert callable(notifier)
    pending = self.disconnects
    pending.append(notifier)
611
def notifyOnFail(self, notifier):
    """
    Call the given callback if the Broker fails to connect.

    @param notifier: a callable, appended to C{self.failures}; anything
        that is not callable trips an assertion.
    """
    assert callable(notifier)
    pending = self.failures
    pending.append(notifier)
616
def notifyOnConnect(self, notifier):
617
"""Call the given callback when the Broker connects."""
618
assert callable(notifier)
619
if self.connects is None:
625
self.connects.append(notifier)
627
def dontNotifyOnDisconnect(self, notifier):
628
"""Remove a callback from list of disconnect callbacks."""
630
self.disconnects.remove(notifier)
634
def localObjectForID(self, luid):
636
Get a local object for a locally unique ID.
638
@return: An object previously stored with L{registerReference} or
639
C{None} if there is no object which corresponds to the given
642
lob = self.localObjects.get(luid)
647
maxBrokerRefsViolations = 0
649
def registerReference(self, object):
650
"""Get an ID for a local object.
652
Store a persistent reference to a local object and map its id()
653
to a generated, session-unique ID and return that ID.
656
assert object is not None
657
puid = object.processUniqueID()
658
luid = self.luids.get(puid)
660
if len(self.localObjects) > MAX_BROKER_REFS:
661
self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
662
if self.maxBrokerRefsViolations > 3:
663
self.transport.loseConnection()
664
raise Error("Maximum PB reference count exceeded. "
666
raise Error("Maximum PB reference count exceeded.")
668
luid = self.newLocalID()
669
self.localObjects[luid] = Local(object)
670
self.luids[puid] = luid
672
self.localObjects[luid].incref()
675
def setNameForLocal(self, name, object):
    """
    Store a special (string) ID for this object.

    This is how you specify a 'base' set of objects that the remote
    protocol can connect to.

    @param name: the key under which the object is stored in
        C{self.localObjects}.
    @param object: the object to publish; must not be C{None}. It is
        wrapped in a L{Local} before being stored.
    """
    assert object is not None
    wrapped = Local(object)
    self.localObjects[name] = wrapped
684
def remoteForName(self, name):
    """
    Returns an object from the remote name mapping.

    Note that this does not check the validity of the name, only
    creates a translucent reference for it.

    @param name: the name to use as the reference's C{luid}.
    @return: a L{RemoteReference} through this broker, with no perspective
        and with C{doRefCount} set to 0.
    """
    return RemoteReference(perspective=None, broker=self, luid=name,
                           doRefCount=0)
692
def cachedRemotelyAs(self, instance, incref=0):
693
"""Returns an ID that says what this instance is cached as remotely, or C{None} if it's not.
696
puid = instance.processUniqueID()
697
luid = self.remotelyCachedLUIDs.get(puid)
698
if (luid is not None) and (incref):
699
self.remotelyCachedObjects[luid].incref()
702
def remotelyCachedForLUID(self, luid):
    """
    Returns an instance which is cached remotely, with this LUID.

    @param luid: a key of C{self.remotelyCachedObjects}.
    @return: the C{object} attribute of the entry stored under C{luid}.
    @raise KeyError: if nothing is stored under C{luid}.
    """
    entry = self.remotelyCachedObjects[luid]
    return entry.object
707
def cacheRemotely(self, instance):
710
puid = instance.processUniqueID()
711
luid = self.newLocalID()
712
if len(self.remotelyCachedObjects) > MAX_BROKER_REFS:
713
self.maxBrokerRefsViolations = self.maxBrokerRefsViolations + 1
714
if self.maxBrokerRefsViolations > 3:
715
self.transport.loseConnection()
716
raise Error("Maximum PB cache count exceeded. "
718
raise Error("Maximum PB cache count exceeded.")
720
self.remotelyCachedLUIDs[puid] = luid
721
# This table may not be necessary -- for now, it's to make sure that no
722
# monkey business happens with id(instance)
723
self.remotelyCachedObjects[luid] = Local(instance, self.serializingPerspective)
726
def cacheLocally(self, cid, instance):
    """
    (internal)

    Store a non-filled-out cached instance locally.

    @param cid: the key under which the instance is stored in
        C{self.locallyCachedObjects}.
    @param instance: the instance to remember.
    """
    cache = self.locallyCachedObjects
    cache[cid] = instance
733
def cachedLocallyAs(self, cid):
734
instance = self.locallyCachedObjects[cid]
737
def serialize(self, object, perspective=None, method=None, args=None, kw=None):
738
"""Jelly an object according to the remote security rules for this broker.
741
if isinstance(object, defer.Deferred):
742
object.addCallbacks(self.serialize, lambda x: x,
744
'perspective': perspective,
751
# XXX This call is NOT REENTRANT and testing for reentrancy is just
752
# crazy, so it likely won't be. Don't ever write methods that call the
753
# broker's serialize() method recursively (e.g. sending a method call
754
# from within a getState (this causes concurrency problems anyway so
755
# you really, really shouldn't do it))
757
# self.jellier = _NetJellier(self)
758
self.serializingPerspective = perspective
759
self.jellyMethod = method
760
self.jellyArgs = args
763
return jelly(object, self.security, None, self)
765
self.serializingPerspective = None
766
self.jellyMethod = None
767
self.jellyArgs = None
770
def unserialize(self, sexp, perspective = None):
771
"""Unjelly an sexp according to the local security rules for this broker.
774
self.unserializingPerspective = perspective
776
return unjelly(sexp, self.security, None, self)
778
self.unserializingPerspective = None
780
def newLocalID(self):
    """
    Generate a new LUID.

    @return: C{self.currentLocalID} after it has been advanced by one.
    """
    nextID = self.currentLocalID + 1
    self.currentLocalID = nextID
    return nextID
786
def newRequestID(self):
    """
    Generate a new request ID.

    @return: C{self.currentRequestID} after it has been advanced by one.
    """
    nextID = self.currentRequestID + 1
    self.currentRequestID = nextID
    return nextID
792
def _sendMessage(self, prefix, perspective, objectID, message, args, kw):
796
if kw.has_key('pbcallback'):
797
pbc = kw['pbcallback']
799
if kw.has_key('pberrback'):
800
pbe = kw['pberrback']
802
if kw.has_key('pbanswer'):
803
assert (not pbe) and (not pbc), "You can't specify a no-answer requirement."
804
answerRequired = kw['pbanswer']
806
if self.disconnected:
807
raise DeadReferenceError("Calling Stale Broker")
809
netArgs = self.serialize(args, perspective=perspective, method=message)
810
netKw = self.serialize(kw, perspective=perspective, method=message)
812
return defer.fail(failure.Failure())
813
requestID = self.newRequestID()
815
rval = defer.Deferred()
816
self.waitingForAnswers[requestID] = rval
818
log.msg('warning! using deprecated "pbcallback"')
819
rval.addCallbacks(pbc, pbe)
822
self.sendCall(prefix+"message", requestID, objectID, message, answerRequired, netArgs, netKw)
825
def proto_message(self, requestID, objectID, message, answerRequired, netArgs, netKw):
    """
    Handle a 'message' expression by passing it to L{_recvMessage}, with
    C{self.localObjectForID} as the function used to find the target object.
    """
    finder = self.localObjectForID
    self._recvMessage(finder, requestID, objectID, message,
                      answerRequired, netArgs, netKw)
827
def proto_cachemessage(self, requestID, objectID, message, answerRequired, netArgs, netKw):
    """
    Handle a 'cachemessage' expression by passing it to L{_recvMessage}, with
    C{self.cachedLocallyAs} as the function used to find the target object.
    """
    finder = self.cachedLocallyAs
    self._recvMessage(finder, requestID, objectID, message,
                      answerRequired, netArgs, netKw)
830
def _recvMessage(self, findObjMethod, requestID, objectID, message, answerRequired, netArgs, netKw):
831
"""Received a message-send.
833
Look up message based on object, unserialize the arguments, and
834
invoke it with args, and send an 'answer' or 'error' response.
837
object = findObjMethod(objectID)
839
raise Error("Invalid Object ID")
840
netResult = object.remoteMessageReceived(self, message, netArgs, netKw)
843
# If the error is Jellyable or explicitly allowed via our
844
# security options, send it back and let the code on the
845
# other end deal with unjellying. If it isn't Jellyable,
846
# wrap it in a CopyableFailure, which ensures it can be
847
# unjellied on the other end. We have to do this because
848
# all errors must be sent back.
849
if isinstance(e, Jellyable) or self.security.isClassAllowed(e.__class__):
850
self._sendError(e, requestID)
852
self._sendError(CopyableFailure(e), requestID)
855
log.msg("Peer will receive following PB traceback:", isError=True)
856
f = CopyableFailure()
857
self._sendError(f, requestID)
861
if isinstance(netResult, defer.Deferred):
863
netResult.addCallbacks(self._sendAnswer, self._sendFailureOrError,
864
callbackArgs=args, errbackArgs=args)
865
# XXX Should this be done somewhere else?
867
self._sendAnswer(netResult, requestID)
872
def _sendAnswer(self, netResult, requestID):
873
"""(internal) Send an answer to a previously sent message.
875
self.sendCall("answer", requestID, netResult)
877
def proto_answer(self, requestID, netResult):
    """
    (internal) Got an answer to a previously sent message.

    Look up the appropriate callback and call it.

    @param requestID: a key of C{self.waitingForAnswers}; the entry is
        removed before the result is delivered.
    @param netResult: the result, passed through C{self.unserialize}
        before being given to the waiting object's C{callback}.
    """
    waiting = self.waitingForAnswers
    d = waiting[requestID]
    del waiting[requestID]
    result = self.unserialize(netResult)
    d.callback(result)
889
def _sendFailureOrError(self, fail, requestID):
891
Call L{_sendError} or L{_sendFailure}, depending on whether C{fail}
892
represents an L{Error} subclass or not.
894
if fail.check(Error) is None:
895
self._sendFailure(fail, requestID)
897
self._sendError(fail, requestID)
900
def _sendFailure(self, fail, requestID):
901
"""Log error and then send it."""
902
log.msg("Peer will receive following PB traceback:")
904
self._sendError(fail, requestID)
906
def _sendError(self, fail, requestID):
907
"""(internal) Send an error for a previously sent message.
909
if isinstance(fail, failure.Failure):
910
# If the failures value is jellyable or allowed through security,
912
if (isinstance(fail.value, Jellyable) or
913
self.security.isClassAllowed(fail.value.__class__)):
915
elif not isinstance(fail, CopyableFailure):
916
fail = failure2Copyable(fail, self.factory.unsafeTracebacks)
917
if isinstance(fail, CopyableFailure):
918
fail.unsafeTracebacks = self.factory.unsafeTracebacks
919
self.sendCall("error", requestID, self.serialize(fail))
921
def proto_error(self, requestID, fail):
    """
    (internal) Deal with an error.

    @param requestID: a key of C{self.waitingForAnswers}; the entry is
        removed before the error is delivered.
    @param fail: the error, passed through C{self.unserialize} before
        being given to the waiting object's C{errback}.
    """
    waiting = self.waitingForAnswers
    d = waiting[requestID]
    del waiting[requestID]
    reason = self.unserialize(fail)
    d.errback(reason)
932
def sendDecRef(self, objectID):
    """
    (internal) Send a DECREF directive.

    @param objectID: the ID sent along with the C{"decref"} command.
    """
    directive = ("decref", objectID)
    self.sendCall(*directive)
937
def proto_decref(self, objectID):
938
"""(internal) Decrement the reference count of an object.
940
If the reference count is zero, it will free the reference to this
943
refs = self.localObjects[objectID].decref()
945
puid = self.localObjects[objectID].object.processUniqueID()
947
del self.localObjects[objectID]
948
self._localCleanup.pop(puid, lambda: None)()
954
def decCacheRef(self, objectID):
    """
    (internal) Send a DECACHE directive.

    @param objectID: the ID sent along with the C{"decache"} command.
    """
    directive = ("decache", objectID)
    self.sendCall(*directive)
959
def proto_decache(self, objectID):
960
"""(internal) Decrement the reference count of a cached object.
962
If the reference count is zero, free the reference, then send an
963
'uncached' directive.
965
refs = self.remotelyCachedObjects[objectID].decref()
966
# log.msg('decaching: %s #refs: %s' % (objectID, refs))
968
lobj = self.remotelyCachedObjects[objectID]
969
cacheable = lobj.object
970
perspective = lobj.perspective
971
# TODO: force_decache needs to be able to force-invalidate a
972
# cacheable reference.
974
cacheable.stoppedObserving(perspective, RemoteCacheObserver(self, cacheable, perspective))
977
puid = cacheable.processUniqueID()
978
del self.remotelyCachedLUIDs[puid]
979
del self.remotelyCachedObjects[objectID]
980
self.sendCall("uncache", objectID)
982
def proto_uncache(self, objectID):
983
"""(internal) Tell the client it is now OK to uncache an object.
985
# log.msg("uncaching locally %d" % objectID)
986
obj = self.locallyCachedObjects[objectID]
988
## def reallyDel(obj=obj):
989
## obj.__really_del__()
990
## obj.__del__ = reallyDel
991
del self.locallyCachedObjects[objectID]
995
def respond(challenge, password):
996
"""Respond to a challenge.
998
This is useful for challenge/response authentication.
1002
hashedPassword = m.digest()
1004
m.update(hashedPassword)
1006
doubleHashedPassword = m.digest()
1007
return doubleHashedPassword
1010
"""I return some random data."""
1012
for x in range(random.randrange(15,25)):
1013
crap = crap + chr(random.randint(65,90))
1014
crap = md5(crap).digest()
1018
class PBClientFactory(protocol.ClientFactory):
1020
Client factory for PB brokers.
1022
As with all client factories, use with reactor.connectTCP/SSL/etc..
1023
getPerspective and getRootObject can be called either before or
1028
unsafeTracebacks = False
1030
def __init__(self, unsafeTracebacks=False, security=globalSecurity):
1032
@param unsafeTracebacks: if set, tracebacks for exceptions will be sent
1034
@type unsafeTracebacks: C{bool}
1036
@param security: security options used by the broker, default to
1038
@type security: L{twisted.spread.jelly.SecurityOptions}
1040
self.unsafeTracebacks = unsafeTracebacks
1041
self.security = security
1045
def buildProtocol(self, addr):
1047
Build the broker instance, passing the security options to it.
1049
p = self.protocol(isClient=True, security=self.security)
1055
self.rootObjectRequests = [] # list of deferred
1059
def _failAll(self, reason):
1060
deferreds = self.rootObjectRequests
1065
def clientConnectionFailed(self, connector, reason):
    """
    Pass the reason for a failed connection attempt on to L{_failAll}.

    @param connector: not used here.
    @param reason: why the connection attempt failed.
    """
    failAll = self._failAll
    failAll(reason)
1068
def clientConnectionLost(self, connector, reason, reconnecting=0):
1069
"""Reconnecting subclasses should call with reconnecting=1."""
1071
# any pending requests will go to next connection attempt
1072
# so we don't fail them.
1076
self._failAll(reason)
1078
def clientConnectionMade(self, broker):
1079
self._broker = broker
1080
self._root = broker.remoteForName("root")
1081
ds = self.rootObjectRequests
1082
self.rootObjectRequests = []
1084
d.callback(self._root)
1086
def getRootObject(self):
1087
"""Get root object of remote PB server.
1089
@return: Deferred of the root object.
1091
if self._broker and not self._broker.disconnected:
1092
return defer.succeed(self._root)
1093
d = defer.Deferred()
1094
self.rootObjectRequests.append(d)
1097
def disconnect(self):
1098
"""If the factory is connected, close the connection.
1100
Note that if you set up the factory to reconnect, you will need to
1101
implement extra logic to prevent automatic reconnection after this
1105
self._broker.transport.loseConnection()
1107
def _cbSendUsername(self, root, username, password, client):
1108
return root.callRemote("login", username).addCallback(
1109
self._cbResponse, password, client)
1111
def _cbResponse(self, (challenge, challenger), password, client):
1112
return challenger.callRemote("respond", respond(challenge, password), client)
1115
def _cbLoginAnonymous(self, root, client):
1117
Attempt an anonymous login on the given remote root object.
1119
@type root: L{RemoteReference}
1120
@param root: The object on which to attempt the login, most likely
1121
returned by a call to L{PBClientFactory.getRootObject}.
1123
@param client: A jellyable object which will be used as the I{mind}
1124
parameter for the login attempt.
1127
@return: A L{Deferred} which will be called back with a
1128
L{RemoteReference} to an avatar when anonymous login succeeds, or
1129
which will errback if anonymous login fails.
1131
return root.callRemote("loginAnonymous", client)
1134
def login(self, credentials, client=None):
1136
Login and get perspective from remote PB server.
1138
Currently the following credentials are supported::
1140
L{twisted.cred.credentials.IUsernamePassword}
1141
L{twisted.cred.credentials.IAnonymous}
1144
@return: A L{Deferred} which will be called back with a
1145
L{RemoteReference} for the avatar logged in to, or which will
1146
errback if login fails.
1148
d = self.getRootObject()
1150
if IAnonymous.providedBy(credentials):
1151
d.addCallback(self._cbLoginAnonymous, client)
1154
self._cbSendUsername, credentials.username,
1155
credentials.password, client)
1160
class PBServerFactory(protocol.ServerFactory):
    """
    Server factory for perspective broker.

    Login is done using a Portal object, whose realm is expected to return
    avatars implementing IPerspective. The credential checkers in the portal
    should accept IUsernameHashedPassword or IUsernameMD5Password.

    Alternatively, any object providing or adaptable to L{IPBRoot} can be
    used instead of a portal to provide the root object of the PB server.
    """

    unsafeTracebacks = False

    # object broker factory
    protocol = Broker

    def __init__(self, root, unsafeTracebacks=False, security=globalSecurity):
        """
        @param root: factory providing the root Referenceable used by the broker.
        @type root: object providing or adaptable to L{IPBRoot}.

        @param unsafeTracebacks: if set, tracebacks for exceptions will be sent
            over the wire.
        @type unsafeTracebacks: C{bool}

        @param security: security options used by the broker, default to
            C{globalSecurity}.
        @type security: L{twisted.spread.jelly.SecurityOptions}
        """
        # Adapt eagerly so an unsuitable root fails here, not at connect time.
        self.root = IPBRoot(root)
        self.unsafeTracebacks = unsafeTracebacks
        self.security = security

    def buildProtocol(self, addr):
        """
        Return a Broker attached to the factory (as the service provider).

        @param addr: The address of the connecting peer; unused here.
        """
        proto = self.protocol(isClient=False, security=self.security)
        proto.factory = self
        # Publish the per-connection root object under the name "root".
        proto.setNameForLocal("root", self.root.rootObject(proto))
        return proto

    def clientConnectionMade(self, protocol):
        # XXX does this method make any sense?
        pass
1209
class IUsernameMD5Password(ICredentials):
    """
    I encapsulate a username and a hashed password.

    This credential is used for username/password over PB. CredentialCheckers
    which check this kind of credential must store the passwords in plaintext
    form or as a MD5 digest.

    @type username: C{str} or C{Deferred}
    @ivar username: The username associated with these credentials.
    """

    def checkPassword(password):
        """
        Validate these credentials against the correct password.

        @type password: C{str}
        @param password: The correct, plaintext password against which to
            check.

        @rtype: C{bool} or L{Deferred}
        @return: C{True} if the credentials represented by this object match the
            given password, C{False} if they do not, or a L{Deferred} which will
            be called back with one of these values.
        """

    def checkMD5Password(password):
        """
        Validate these credentials against the correct MD5 digest of the
        password.

        @type password: C{str}
        @param password: The correct MD5 digest of a password against which to
            check.

        @rtype: C{bool} or L{Deferred}
        @return: C{True} if the credentials represented by this object match the
            given digest, C{False} if they do not, or a L{Deferred} which will
            be called back with one of these values.
        """
class _PortalRoot:
    """
    Root object, used to login to portal.

    Registered below as the adapter from L{Portal} to L{IPBRoot}.
    """

    implements(IPBRoot)

    def __init__(self, portal):
        """
        @param portal: The portal that login attempts are made against.
        """
        self.portal = portal

    def rootObject(self, broker):
        """
        Create the root object for one connection.

        @param broker: The broker the root object is being created for.
        @return: A new L{_PortalWrapper} around this root's portal and the
            given broker.
        """
        return _PortalWrapper(self.portal, broker)

registerAdapter(_PortalRoot, Portal, IPBRoot)
1266
class _JellyableAvatarMixin:
    """
    Helper class for code which deals with avatars which PB must be capable of
    sending to a peer.

    Classes mixing this in must provide a C{broker} attribute.
    """

    def _cbLogin(self, loginResult):
        """
        Ensure that the avatar to be returned to the client is jellyable and
        set up disconnection notification to call the realm's logout object.

        @param loginResult: The C{(interface, avatar, logout)} three-tuple
            produced by a successful portal login.

        @return: The avatar, wrapped in L{AsReferenceable} if it did not
            already provide L{IJellyable}.
        """
        # Unpacked here rather than in the signature: tuple parameters are
        # not valid syntax on Python 3.
        interface, avatar, logout = loginResult

        if not IJellyable.providedBy(avatar):
            avatar = AsReferenceable(avatar, "perspective")

        puid = avatar.processUniqueID()

        def dereferenceLogout():
            # Unregister the disconnect notification before running logout.
            self.broker.dontNotifyOnDisconnect(logout)
            logout()

        self.broker._localCleanup[puid] = dereferenceLogout
        # No special helper function is necessary for notifyOnDisconnect
        # because dereference callbacks won't be invoked if the connection is
        # randomly dropped.  I'm not sure those are ideal semantics, but this
        # is the only user of the (private) API at the moment and it works just
        # fine as things are. -exarkun
        self.broker.notifyOnDisconnect(logout)

        return avatar
1296
class _PortalWrapper(Referenceable, _JellyableAvatarMixin):
    """
    Root Referenceable object, used to login to portal.
    """

    def __init__(self, portal, broker):
        """
        @param portal: The portal that login attempts are made against.
        @param broker: The broker for the connection this root belongs to.
        """
        self.portal = portal
        self.broker = broker

    def remote_login(self, username):
        """
        Start of username/password login.

        @param username: The username the client wants to log in as.

        @return: A two-tuple of a fresh challenge and the
            L{_PortalAuthChallenger} that will check the response to it.
        """
        c = challenge()
        return c, _PortalAuthChallenger(self.portal, self.broker, username, c)

    def remote_loginAnonymous(self, mind):
        """
        Attempt an anonymous login.

        @param mind: An object to use as the mind parameter to the portal login
            call (possibly None).

        @rtype: L{Deferred}
        @return: A Deferred which will be called back with an avatar when login
            succeeds or which will be errbacked if login fails somehow.
        """
        d = self.portal.login(Anonymous(), mind, IPerspective)
        d.addCallback(self._cbLogin)
        return d
1331
class _PortalAuthChallenger(Referenceable, _JellyableAvatarMixin):
    """
    Called with response to password challenge.

    This object is itself the credentials passed to the portal: it records
    the client's response and checks it in L{checkPassword} and
    L{checkMD5Password}.
    """
    implements(IUsernameHashedPassword, IUsernameMD5Password)

    def __init__(self, portal, broker, username, challenge):
        """
        @param portal: The portal that the login attempt is made against.
        @param broker: The broker for the connection being authenticated.
        @param username: The username the client is logging in as.
        @param challenge: The challenge that was sent to the client.
        """
        self.portal = portal
        self.broker = broker
        self.username = username
        self.challenge = challenge

    def remote_respond(self, response, mind):
        """
        Record the client's challenge response and attempt the portal login.

        @param response: The client's answer to the challenge.
        @param mind: The mind object to pass to the portal login call.

        @return: The Deferred from the portal login, with L{_cbLogin} added
            as a callback.
        """
        self.response = response
        d = self.portal.login(self, mind, IPerspective)
        d.addCallback(self._cbLogin)
        return d

    # IUsernameHashedPassword:
    def checkPassword(self, password):
        """
        Check the recorded response against a plaintext password.

        @param password: The correct, plaintext password.
        @return: C{True} if the response matches, C{False} otherwise.
        """
        return self.checkMD5Password(md5(password).digest())

    # IUsernameMD5Password
    def checkMD5Password(self, md5Password):
        """
        Check the recorded response against the MD5 digest of the password.

        The expected response is the MD5 digest of C{md5Password} followed by
        the challenge.

        @param md5Password: The MD5 digest of the correct password.
        @return: C{True} if the response matches, C{False} otherwise.
        """
        md = md5()
        md.update(md5Password)
        md.update(self.challenge)
        correct = md.digest()
        # NOTE(review): plain == is not a constant-time comparison.
        return self.response == correct
1366
# Everything from flavors is exposed publicly here.
1367
'IPBRoot', 'Serializable', 'Referenceable', 'NoSuchMethod', 'Root',
1368
'ViewPoint', 'Viewable', 'Copyable', 'Jellyable', 'Cacheable',
1369
'RemoteCopy', 'RemoteCache', 'RemoteCacheObserver', 'copyTags',
1370
'setUnjellyableForClass', 'setUnjellyableFactoryForClass',
1371
'setUnjellyableForClassTree',
1373
'MAX_BROKER_REFS', 'portno',
1375
'ProtocolError', 'DeadReferenceError', 'Error', 'PBConnectionLost',
1376
'RemoteMethod', 'IPerspective', 'Avatar', 'AsReferenceable',
1377
'RemoteReference', 'CopyableFailure', 'CopiedFailure', 'failure2Copyable',
1378
'Broker', 'respond', 'challenge', 'PBClientFactory', 'PBServerFactory',
1379
'IUsernameMD5Password',