1
# -*- test-case-name: twisted.pb.test.test_sturdyref -*-
3
# this module is responsible for sending and receiving OnlyReferenceable and
4
# Referenceable (callable) objects. All details of actually invoking methods
9
from zope.interface import interface
10
from zope.interface import implements, providedBy
11
from twisted.python.components import registerAdapter
12
Interface = interface.Interface
13
from twisted.internet import defer, error
14
from twisted.python import failure, log
16
from twisted.pb import ipb, schema, slicer, tokens, call
17
BananaError = tokens.BananaError
18
Violation = tokens.Violation
19
from twisted.pb.remoteinterface import getRemoteInterface, getRemoteInterfaceByName
20
from twisted.pb.copyable import Copyable, RemoteCopy
22
class OnlyReferenceable(object):
23
implements(ipb.IReferenceable)
25
def processUniqueID(self):
28
class Referenceable(OnlyReferenceable):
29
implements(ipb.IReferenceable, ipb.IRemotelyCallable)
33
# TODO: this code wants to be in an adapter, not a base class. Also, it
34
# would be nice to cache this across the class: if every instance has the
35
# same interfaces, they will have the same values of _interface and
36
# _interfaceName, and it feels silly to store this data separately for
37
# each instance. Perhaps we could compare the instance's interface list
38
# with that of the class and only recompute this stuff if they differ.
40
def getInterface(self):
41
if not self._interface:
42
self._interface = getRemoteInterface(self)
44
self._interfaceName = self._interface.__remote_name__
46
self._interfaceName = None
47
return self._interface
49
def getInterfaceName(self):
51
return self._interfaceName
53
def doRemoteCall(self, methodname, kwargs):
54
meth = getattr(self, "remote_%s" % methodname)
58
class ReferenceableTracker:
59
"""I hold the data which tracks a local Referenceable that is in use by
62
@ivar obj: the actual object
63
@ivar refcount: the number of times this reference has been sent to the
64
remote end, minus the number of DECREF messages which it
65
has sent back. When it goes to zero, the remote end has
66
forgotten the RemoteReference, and is prepared to forget
67
the RemoteReferenceData as soon as the DECREF message is
69
@ivar clid: the connection-local ID used to represent this object on the
73
def __init__(self, tub, obj, puid, clid):
81
"""Increment the refcount.
82
@return: True if this is the first transmission of the reference.
85
if self.refcount == 1:
90
return self.tub.getURLForReference(self.obj)
93
def decref(self, count):
94
"""Call this in response to a DECREF message from the other end.
95
@return: True if the refcount went to zero, meaning this clid should
98
assert self.refcount >= count, "decref(%d) but refcount was %d" % (count, self.refcount)
99
self.refcount -= count
100
if self.refcount == 0:
104
# TODO: rather than subclassing Referenceable, ReferenceableSlicer should be
105
# registered to use for anything which provides any RemoteInterface
107
class ReferenceableSlicer(slicer.BaseSlicer):
108
"""I handle pb.Referenceable objects (things with remotely invokable
109
methods, which are copied by reference).
111
opentype = ('my-reference',)
113
def sliceBody(self, streamable, broker):
114
puid = ipb.IReferenceable(self.obj).processUniqueID()
115
tracker = broker.getTrackerForMyReference(puid, self.obj)
117
firstTime = tracker.send()
119
# this is the first time the Referenceable has crossed this wire.
120
# In addition to the clid, send the interface name (if any), and
121
# any URL this reference might be known by
122
iname = ipb.IRemotelyCallable(self.obj).getInterfaceName()
127
url = tracker.getURL()
131
registerAdapter(ReferenceableSlicer, Referenceable, ipb.ISlicer)
133
class CallableSlicer(slicer.BaseSlicer):
134
"""Bound methods are serialized as my-reference sequences with negative
136
opentype = ('my-reference',)
138
def sliceBody(self, streamable, broker):
139
# TODO: consider this requirement, maybe based upon a Tub flag
140
# assert ipb.ISlicer(self.obj.im_self)
141
# or maybe even isinstance(self.obj.im_self, Referenceable)
143
tracker = broker.getTrackerForMyCall(puid, self.obj)
145
firstTime = tracker.send()
147
# this is the first time the Call has crossed this wire. In
148
# addition to the clid, send the schema name and any URL this
149
# reference might be known by
150
schema = self.getSchema()
155
url = tracker.getURL()
160
return None # TODO: not quite ready yet
161
# callables which are actually bound methods of a pb.Referenceable
162
# can use the schema from that
163
s = ipb.IReferenceable(self.obj.im_self, None)
165
return s.getSchemaForMethodNamed(self.obj.im_func.__name__)
166
# both bound methods and raw callables can also use a .schema
168
return getattr(self.obj, "schema", None)
171
# The CallableSlicer is activated through PBRootSlicer.slicerTable, because a
172
# StorageBanana might want to stick with the old MethodSlicer/FunctionSlicer
174
#registerAdapter(CallableSlicer, types.MethodType, ipb.ISlicer)
177
class ReferenceUnslicer(slicer.BaseUnslicer):
178
"""I turn an incoming 'my-reference' sequence into a RemoteReference or a
179
RemoteMethodReference."""
184
inameConstraint = schema.StringConstraint(200) # TODO: only known RI names?
185
urlConstraint = schema.StringConstraint(200)
187
def checkToken(self, typebyte, size):
189
if typebyte not in (tokens.INT, tokens.NEG):
190
raise BananaError("reference ID must be an INT or NEG")
191
elif self.state == 1:
192
self.inameConstraint.checkToken(typebyte, size)
193
elif self.state == 2:
194
self.urlConstraint.checkToken(typebyte, size)
196
raise Violation("too many parameters in my-reference")
198
def receiveChild(self, obj, ready_deferred=None):
199
assert not isinstance(obj, defer.Deferred)
200
assert ready_deferred is None
204
elif self.state == 1:
205
# must be the interface name
206
self.interfaceName = obj
208
self.interfaceName = None
210
elif self.state == 2:
215
raise BananaError("Too many my-reference parameters")
217
def receiveClose(self):
218
if self.clid is None:
219
raise BananaError("sequence ended too early")
220
tracker = self.broker.getTrackerForYourReference(self.clid,
223
return tracker.getRef(), None
226
if self.clid is None:
228
return "<ref-%s>" % self.clid
232
class RemoteReferenceTracker:
233
"""I hold the data necessary to locate (or create) a RemoteReference.
235
@ivar url: the target Referenceable's global URL
236
@ivar broker: the Broker which holds this RemoteReference
237
@ivar clid: for that Broker, the your-reference CLID for the
239
@ivar interfaceName: the name of a RemoteInterface object that the
240
RemoteReference claims to implement
241
@ivar interface: our version of a RemoteInterface object that corresponds
243
@ivar received_count: the number of times the remote end has sent us this
244
object. We must send back decref() calls to match.
245
@ivar ref: a weakref to the RemoteReference itself
248
def __init__(self, parent, clid, url, interfaceName):
251
# TODO: the remote end sends us a global URL, when really it should
252
# probably send us a per-Tub name, which we can then concatenate to
253
# their TubID if/when we pass it on to others. By accepting a full
254
# URL, we give them the ability to sort-of spoof others. We could
255
# check that url.startswith(broker.remoteTub.baseURL), but the Right
256
# Way is to just not have them send the base part in the first place.
257
# I haven't yet made this change because I'm not yet positive it
258
# would work.. how exactly does the base url get sent, anyway? What
259
# about Tubs visible through multiple names?
261
self.interfaceName = interfaceName
262
self.interface = getRemoteInterfaceByName(interfaceName)
263
self.received_count = 0
267
s = "<RemoteReferenceTracker(clid=%d,url=%s)>" % (self.clid, self.url)
271
"""Return the actual RemoteReference that we hold, creating it if
274
ref = RemoteReference(self)
275
self.ref = weakref.ref(ref, self._refLost)
276
self.received_count += 1
279
def _refLost(self, wref):
# Callback handed to weakref.ref() when this tracker wraps a newly created
# RemoteReference; `wref` is the weak reference object and is not used here.
#
# The accumulated received_count is read and reset to zero in a single
# tuple assignment, and the old value is passed to the broker through
# freeYourReference().
# NOTE(review): presumably the broker uses the count to send the matching
# decref to the remote end, as the class docstring describes -- confirm in
# the broker code.
280
count, self.received_count = self.received_count, 0
281
self.broker.freeYourReference(self, count)
284
class RemoteReferenceOnly(object):
285
def __init__(self, tracker):
286
"""Remember the tracker that this reference belongs to.

@param tracker: the RemoteReferenceTracker which points to us
"""
287
self.tracker = tracker
289
def getSturdyRef(self):
# Return the tracker's `sturdy` attribute unchanged.
# NOTE(review): the assignment of `tracker.sturdy` is not visible in this
# listing; presumably it is a SturdyRef built from the tracker's URL --
# confirm in RemoteReferenceTracker.__init__.
290
return self.tracker.sturdy
292
def notifyOnDisconnect(self, callback):
# Forward `callback` to the notifyOnDisconnect() method of the broker that
# holds this reference (reached through self.tracker.broker). The broker's
# return value is discarded, so this method returns None.
293
self.tracker.broker.notifyOnDisconnect(callback)
294
def dontNotifyOnDisconnect(self, callback):
# Counterpart of notifyOnDisconnect(): forward `callback` to the broker's
# dontNotifyOnDisconnect() method. Returns None.
295
self.tracker.broker.dontNotifyOnDisconnect(callback)
298
r = "<%s at 0x%x" % (self.__class__.__name__, abs(id(self)))
300
r += " [%s]" % self.tracker.url
304
class RemoteReference(RemoteReferenceOnly):
305
def callRemote(self, _name, *args, **kwargs):
306
# Note: for consistency, *all* failures are reported asynchronously.
309
broker = self.tracker.broker
311
# remember that "none" is not a valid constraint, so we use it to
312
# mean "not set by the caller", which means we fall back to whatever
313
# the RemoteInterface says. Using None would mean an AnyConstraint,
314
# which is not the same thing.
315
methodConstraintOverride = kwargs.get("_methodConstraint", "none")
316
resultConstraint = kwargs.get("_resultConstraint", "none")
317
useSchema = kwargs.get("_useSchema", True)
319
if "_methodConstraint" in kwargs:
320
del kwargs["_methodConstraint"]
321
if "_resultConstraint" in kwargs:
322
del kwargs["_resultConstraint"]
323
if "_useSchema" in kwargs:
324
del kwargs["_useSchema"]
327
# newRequestID() could fail with a DeadReferenceError
328
reqID = broker.newRequestID()
333
# in this clause, we validate the outbound arguments against our
334
# notion of what the other end will accept (the RemoteInterface)
335
req = call.PendingRequest(reqID, self)
337
# first, figure out which method they want to invoke
339
(methodName, methodSchema) = self._getMethodInfo(_name)
340
req.methodName = methodName # for debugging
341
if methodConstraintOverride != "none":
342
methodSchema = methodConstraintOverride
343
if useSchema and methodSchema:
344
# turn positional arguments into kwargs. mapArguments() could
345
# fail for bad argument names or missing required parameters
346
argsdict = methodSchema.mapArguments(args, kwargs)
348
# check args against the arg constraint. This could fail if
349
# any arguments are of the wrong type
350
methodSchema.checkAllArgs(kwargs)
352
# the Interface gets to constrain the return value too, so
353
# make a note of it to use later
354
req.setConstraint(methodSchema.getResponseConstraint())
357
why = "positional arguments require a RemoteInterface"
358
why += " for %s.%s()" % (self, methodName)
359
raise tokens.BananaError(why)
362
# if the caller specified a _resultConstraint, that overrides
364
if resultConstraint != "none":
366
req.setConstraint(schema.makeConstraint(resultConstraint))
368
except: # TODO: merge this with the next try/except clause
369
# we have not yet sent anything to the far end. A failure here
370
# is entirely local: stale broker, bad method name, bad
371
# arguments. We abandon the PendingRequest, but errback the
372
# Deferred it was going to use
373
req.fail(failure.Failure())
377
# once we start sending the CallSlicer, we could get either a
378
# local or a remote failure, so we must be prepared to accept an
379
# answer. After this point, we assign all responsibility to the
380
# PendingRequest structure.
381
self.tracker.broker.addRequest(req)
383
# TODO: there is a decidability problem here: if the reqID made
384
# it through, the other end will send us an answer (possibly an
385
# error if the remaining slices were aborted). If not, we will
386
# not get an answer. To decide whether we should remove our
387
# broker.waitingForAnswers[] entry, we need to know how far the
388
# slicing process made it.
390
slicer = call.CallSlicer(reqID, self.tracker.clid,
391
methodName, argsdict)
393
# this could fail if any of the arguments (or their children)
395
d = broker.send(slicer)
396
# d will fire when the last argument has been serialized. It
397
# will errback if the arguments could not be serialized. We need
398
# to catch this case and errback the caller.
401
req.fail(failure.Failure())
404
# if we got here, we have been able to start serializing the
405
# arguments. If serialization fails, the PendingRequest needs to be
406
# flunked (because we aren't guaranteed that the far end will do it).
408
d.addErrback(req.fail)
410
# the remote end could send back an error response for many reasons:
412
# bad argument types (violated their schema)
413
# exception during method execution
414
# method result violated the results schema
415
# something else could occur to cause an errback:
416
# connection lost before response completely received
417
# exception during deserialization of the response
418
# [but only if it occurs after the reqID is received]
419
# method result violated our results schema
420
# if none of those occurred, the callback will be run
424
def _getMethodInfo(self, name):
425
assert type(name) is str
429
iface = self.tracker.interface
431
interfaceName = iface.__remote_name__
433
methodSchema = iface[name]
435
raise Violation("%s(%s) does not offer %s" % \
436
(interfaceName, self, name))
437
return methodName, methodSchema
440
class RemoteMethodReferenceTracker(RemoteReferenceTracker):
443
ref = RemoteMethodReference(self)
444
self.ref = weakref.ref(ref, self._refLost)
445
self.received_count += 1
448
class RemoteMethodReference(RemoteReference):
449
def callRemote(self, *args, **kwargs):
450
# TODO: I suspect it would be safer to use something other than
452
# TODO: this probably needs a very different implementation
454
# there is no schema support yet, so we can't convert positional args
457
return RemoteReference.callRemote(self, "", *args, **kwargs)
459
def _getMethodInfo(self, name):
462
return methodName, methodSchema
465
class YourReferenceSlicer(slicer.BaseSlicer):
466
"""I handle pb.RemoteReference objects (being sent back home to the
467
original pb.Referenceable-holder)
470
def slice(self, streamable, broker):
471
self.streamable = streamable
472
tracker = self.obj.tracker
473
if tracker.broker == broker:
474
# sending back to home broker
475
yield 'your-reference'
478
# sending somewhere else
479
giftID = broker.makeGift(self.obj)
480
yield 'their-reference'
485
return "<your-ref-%s>" % self.obj.tracker.clid
487
registerAdapter(YourReferenceSlicer, RemoteReference, ipb.ISlicer)
489
class YourReferenceUnslicer(slicer.LeafUnslicer):
490
"""I accept incoming (integer) your-reference sequences and try to turn
491
them back into the original Referenceable. I also accept (string)
492
your-reference sequences and try to turn them into a published
493
Referenceable that they did not have access to before."""
496
def checkToken(self, typebyte, size):
# Accept only INT tokens for the body of a your-reference sequence; any
# other token type raises BananaError. `size` is not examined.
# NOTE(review): the class docstring also mentions (string) your-reference
# sequences, but this check rejects everything except tokens.INT.
497
if typebyte != tokens.INT:
498
raise BananaError("your-reference ID must be an INT")
500
def receiveChild(self, obj, ready_deferred=None):
501
assert not isinstance(obj, defer.Deferred)
502
assert ready_deferred is None
505
def receiveClose(self):
506
if self.clid is None:
507
raise BananaError("sequence ended too early")
508
obj = self.broker.getMyReferenceByCLID(self.clid)
510
raise Violation("unknown clid '%s'" % self.clid)
514
return "<your-ref-%s>" % self.obj.refID
517
class TheirReferenceUnslicer(slicer.LeafUnslicer):
518
"""I accept gifts of third-party references. This is turned into a live
519
reference upon receipt."""
520
# (their-reference, giftID, URL)
524
urlConstraint = schema.StringConstraint(200)
526
def checkToken(self, typebyte, size):
528
if typebyte != tokens.INT:
529
raise BananaError("their-reference giftID must be an INT")
530
elif self.state == 1:
531
self.urlConstraint.checkToken(typebyte, size)
533
raise Violation("too many parameters in their-reference")
535
def receiveChild(self, obj, ready_deferred=None):
536
assert not isinstance(obj, defer.Deferred)
537
assert ready_deferred is None
541
elif self.state == 1:
546
raise BananaError("Too many their-reference parameters")
548
def receiveClose(self):
549
if self.giftID is None or self.url is None:
550
raise BananaError("sequence ended too early")
551
d = self.broker.tub.getReference(self.url)
552
d.addBoth(self.ackGift)
555
def ackGift(self, rref):
556
d = self.broker.remote_broker.callRemote("decgift",
557
giftID=self.giftID, count=1)
558
# if we lose the connection, they'll decref the gift anyway
559
d.addErrback(lambda f: f.trap(ipb.DeadReferenceError))
560
d.addErrback(lambda f: f.trap(error.ConnectionLost))
561
d.addErrback(lambda f: f.trap(error.ConnectionDone))
565
if self.giftID is None:
567
return "<gift-%s>" % self.giftID
569
class SturdyRef(Copyable, RemoteCopy):
570
"""I am a pointer to a Referenceable that lives in some (probably remote)
571
Tub. This pointer is long-lived, however you cannot send messages with it
572
directly. To use it, you must ask your Tub to turn it into a
573
RemoteReference with tub.getReference(sturdyref).
575
The SturdyRef is associated with a URL: you can create a SturdyRef out of
576
a URL that you obtain from some other source, and you can ask the
577
SturdyRef for its URL.
579
SturdyRefs are serialized by copying their URL, and create an identical
580
SturdyRef on the receiving side."""
588
def __init__(self, url=None):
590
# pb://key@{ip:port,host:port,[ipv6]:port}[/unix]/swissnumber
591
# i.e. pb://tubID@{locationHints..}/name
593
# it can live at any one of a variety of network-accessible
594
# locations, or at a single UNIX-domain socket.
596
# there is also an unencrypted form, which is indexed by the
597
# single locationHint, because it does not have a TubID
599
if url.startswith("pb://"):
600
self.encrypted = True
601
url = url[len("pb://"):]
602
slash = url.rfind("/")
603
self.name = url[slash+1:]
606
self.tubID = url[:at]
607
self.locationHints = url[at+1:slash].split(",")
608
elif url.startswith("pbu://"):
609
self.encrypted = False
610
url = url[len("pbu://"):]
611
slash = url.rfind("/")
612
self.name = url[slash+1:]
614
self.location = url[:slash]
616
raise ValueError("unknown PB-URL prefix in '%s'" % url)
620
return TubRef(self.tubID, self.locationHints)
621
return NoAuthTubRef(self.location)
625
return ("pb://" + self.tubID + "@" +
626
",".join(self.locationHints) +
628
return "pbu://" + self.location + "/" + self.name
633
def _distinguishers(self):
634
"""Two SturdyRefs are equivalent if they point to the same object.
635
SturdyRefs to encrypted Tubs only pay attention to the TubID and the
636
reference name. SturdyRefs to unencrypted Tubs must use the location
637
hint instead of the (missing) TubID. This method makes it easier to
638
compare a pair of SturdyRefs."""
640
return (True, self.tubID, self.name)
641
return (False, self.location, self.name)
644
return hash(self._distinguishers())
645
def __cmp__(self, them):
# Three-way comparison built on the builtin cmp() (Python 2 only).
# The `or` chain returns the first non-zero result: type() is compared
# first, then __class__, and only when both match are the
# _distinguishers() tuples compared. `them` must therefore provide
# _distinguishers() whenever its type and class compare equal to ours.
646
return (cmp(type(self), type(them)) or
647
cmp(self.__class__, them.__class__) or
648
cmp(self._distinguishers(), them._distinguishers()))
651
"""Return an object that can be sent over the wire and unserialized
652
as a live RemoteReference on the far end. Use this when you have a
653
SturdyRef and want to give someone a reference to its target, but
654
when you haven't bothered to acquire your own live reference to it."""
656
return _AsLiveRef(self)
659
implements(ipb.ISlicer)
661
def __init__(self, sturdy):
664
def slice(self, streamable, banana):
665
yield 'their-reference'
667
yield self.target.getURL()
668
yield [] # interfacenames
672
"""This is a little helper class which provides a comparable identifier
673
for Tubs. TubRefs can be used as keys in dictionaries that track
674
connections to remote Tubs."""
677
def __init__(self, tubID, locationHints=None):
679
self.locationHints = locationHints
681
def getLocations(self):
# Return the locationHints value stored by __init__, unchanged. Because
# that constructor parameter defaults to None, the result may be None
# rather than a list.
682
return self.locationHints
685
return "pb://" + self.tubID
687
def _distinguishers(self):
688
"""This serves the same purpose as SturdyRef._distinguishers."""
692
return hash(self._distinguishers())
693
def __cmp__(self, them):
# Three-way comparison built on the builtin cmp() (Python 2 only).
# The `or` chain returns the first non-zero result: type() first, then
# __class__, and finally the _distinguishers() tuples, so objects of
# different types never reach the _distinguishers() call.
694
return (cmp(type(self), type(them)) or
695
cmp(self.__class__, them.__class__) or
696
cmp(self._distinguishers(), them._distinguishers()))
698
class NoAuthTubRef(TubRef):
699
# this is only used on outbound connections
702
def __init__(self, location):
# Store the single location this reference points at. TubRef.__init__ is
# not called here, so only `location` is set by this constructor.
703
self.location = location
705
def getLocations(self):
# Return the single stored location wrapped in a new one-element list.
706
return [self.location]
709
return "pbu://" + self.location
711
def _distinguishers(self):
712
"""This serves the same purpose as SturdyRef._distinguishers."""
# Identity is the location alone, returned as a one-element tuple.
# NOTE(review): the hash/compare code visible in TubRef calls
# self._distinguishers(); presumably NoAuthTubRef inherits it -- confirm.
713
return (self.location,)