1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for Perspective Broker module.
7
TODO: update protocol level tests to use new connection API, leaving
8
only specific tests for old API.
11
# issue1195 TODOs: replace pump.pump() with something involving Deferreds.
12
# Clean up warning suppression.
14
import sys, os, time, gc
16
from cStringIO import StringIO
17
from zope.interface import implements, Interface
19
from twisted.python.versions import Version
20
from twisted.trial import unittest
21
from twisted.spread import pb, util, publish, jelly
22
from twisted.internet import protocol, main, reactor
23
from twisted.internet.error import ConnectionRefusedError
24
from twisted.internet.defer import Deferred, gatherResults, succeed
25
from twisted.protocols.policies import WrappingFactory
26
from twisted.python import failure, log
27
from twisted.cred.error import UnauthorizedLogin, UnhandledCredentials
28
from twisted.cred import portal, checkers, credentials
31
class Dummy(pb.Viewable):
32
def view_doNothing(self, user):
33
if isinstance(user, DummyPerspective):
36
return 'goodbye, cruel world!'
39
class DummyPerspective(pb.Avatar):
41
An L{IPerspective} avatar which will be used in some tests.
43
def perspective_getDummyViewPoint(self):
48
class DummyRealm(object):
    """
    A realm which hands out a L{DummyPerspective} avatar to any login which
    asks for L{pb.IPerspective}.
    """
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Create an avatar for C{avatarId}.

        @param avatarId: the identifier handed to L{DummyPerspective}.
        @param mind: unused.
        @param interfaces: the interfaces the caller is willing to accept.

        @return: an C{(interface, avatar, logout)} tuple, where C{logout} is
            a callable which does nothing.

        @raise NotImplementedError: if L{pb.IPerspective} is not among
            C{interfaces}.
        """
        for iface in interfaces:
            if iface is pb.IPerspective:
                return iface, DummyPerspective(avatarId), lambda: None
        # Falling off the end used to return None, which only blows up later
        # when the caller unpacks the expected 3-tuple.  Fail explicitly.
        raise NotImplementedError(
            "DummyRealm only supports pb.IPerspective, not %r" % (interfaces,))
59
Utility to pump data between clients and servers for protocol testing.
61
Perhaps this is a utility worthy of being in protocol.py?
63
def __init__(self, client, server, clientIO, serverIO):
66
self.clientIO = clientIO
67
self.serverIO = serverIO
71
Pump until there is no more input or output. This does not run any
72
timers, so don't use it with any code that calls reactor.callLater.
75
timeout = time.time() + 5
77
if time.time() > timeout:
82
Move data back and forth.
84
Returns whether any data was moved.
88
cData = self.clientIO.read()
89
sData = self.serverIO.read()
92
self.clientIO.truncate()
93
self.serverIO.truncate()
94
self.client.transport._checkProducer()
95
self.server.transport._checkProducer()
97
self.server.dataReceived(byte)
99
self.client.dataReceived(byte)
106
def connectedServerAndClient():
108
Returns a 3-tuple: (client, server, pump).
110
clientBroker = pb.Broker()
111
checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(guest='guest')
112
factory = pb.PBServerFactory(portal.Portal(DummyRealm(), [checker]))
113
serverBroker = factory.buildProtocol(('127.0.0.1',))
115
clientTransport = StringIO()
116
serverTransport = StringIO()
117
clientBroker.makeConnection(protocol.FileWrapper(clientTransport))
118
serverBroker.makeConnection(protocol.FileWrapper(serverTransport))
119
pump = IOPump(clientBroker, serverBroker, clientTransport, serverTransport)
120
# Challenge-response authentication:
122
return clientBroker, serverBroker, pump
125
class SimpleRemote(pb.Referenceable):
126
def remote_thunk(self, arg):
130
def remote_knuth(self, arg):
134
class NestedRemote(pb.Referenceable):
    """
    A referenceable whose only remote method hands back a new
    L{SimpleRemote}.
    """

    def remote_getSimple(self):
        """
        Build and return a fresh L{SimpleRemote} instance.
        """
        simple = SimpleRemote()
        return simple
139
class SimpleCopy(pb.Copyable):
142
self.y = {"Hello":"World"}
146
class SimpleLocalCopy(pb.RemoteCopy):
149
pb.setUnjellyableForClass(SimpleCopy, SimpleLocalCopy)
152
class SimpleFactoryCopy(pb.Copyable):
154
@cvar allIDs: holds every created instance of this class.
155
@type allIDs: C{dict}
158
def __init__(self, id):
160
SimpleFactoryCopy.allIDs[id] = self
163
def createFactoryCopy(state):
165
Factory of L{SimpleFactoryCopy}, getting a created instance given the
166
C{id} found in C{state}.
168
stateId = state.get("id", None)
170
raise RuntimeError("factory copy state has no 'id' member %s" %
172
if not stateId in SimpleFactoryCopy.allIDs:
173
raise RuntimeError("factory class has no ID: %s" %
174
(SimpleFactoryCopy.allIDs,))
175
inst = SimpleFactoryCopy.allIDs[stateId]
177
raise RuntimeError("factory method found no object with id")
180
pb.setUnjellyableFactoryForClass(SimpleFactoryCopy, createFactoryCopy)
183
class NestedCopy(pb.Referenceable):
184
def remote_getCopy(self):
187
def remote_getFactory(self, value):
188
return SimpleFactoryCopy(value)
192
class SimpleCache(pb.Cacheable):
195
self.y = {"Hello":"World"}
199
class NestedComplicatedCache(pb.Referenceable):
201
self.c = VeryVeryComplicatedCacheable()
203
def remote_getCache(self):
207
class VeryVeryComplicatedCacheable(pb.Cacheable):
215
self.observer.callRemote('foo',4)
217
def getStateToCacheAndObserveFor(self, perspective, observer):
218
self.observer = observer
223
def stoppedObserving(self, perspective, observer):
224
log.msg("stopped observing")
225
observer.callRemote("end")
226
if observer == self.observer:
230
class RatherBaroqueCache(pb.RemoteCache):
231
def observe_foo(self, newFoo):
234
def observe_end(self):
235
log.msg("the end of things")
237
pb.setUnjellyableForClass(VeryVeryComplicatedCacheable, RatherBaroqueCache)
240
class SimpleLocalCache(pb.RemoteCache):
241
def setCopyableState(self, state):
242
self.__dict__.update(state)
244
def checkMethod(self):
253
pb.setUnjellyableForClass(SimpleCache, SimpleLocalCache)
256
class NestedCache(pb.Referenceable):
258
self.x = SimpleCache()
260
def remote_getCache(self):
261
return [self.x,self.x]
263
def remote_putCache(self, cache):
264
return (self.x is cache)
267
class Observable(pb.Referenceable):
271
def remote_observe(self, obs):
272
self.observers.append(obs)
274
def remote_unobserve(self, obs):
275
self.observers.remove(obs)
277
def notify(self, obj):
278
for observer in self.observers:
279
observer.callRemote('notify', self, obj)
282
class DeferredRemote(pb.Referenceable):
286
def runMe(self, arg):
290
def dontRunMe(self, arg):
291
assert 0, "shouldn't have been run!"
293
def remote_doItLater(self):
295
Return a L{Deferred} to be fired on client side. When fired,
296
C{self.runMe} is called.
299
d.addCallbacks(self.runMe, self.dontRunMe)
304
class Observer(pb.Referenceable):
307
def remote_notify(self, other, obj):
309
self.notified = self.notified + 1
310
other.callRemote('unobserve',self)
313
class NewStyleCopy(pb.Copyable, pb.RemoteCopy, object):
314
def __init__(self, s):
316
pb.setUnjellyableForClass(NewStyleCopy, NewStyleCopy)
319
class NewStyleCopy2(pb.Copyable, pb.RemoteCopy, object):
325
NewStyleCopy2.allocated += 1
326
inst = object.__new__(self)
331
NewStyleCopy2.initialized += 1
333
pb.setUnjellyableForClass(NewStyleCopy2, NewStyleCopy2)
336
class NewStyleCacheCopy(pb.Cacheable, pb.RemoteCache, object):
337
def getStateToCacheAndObserveFor(self, perspective, observer):
340
pb.setUnjellyableForClass(NewStyleCacheCopy, NewStyleCacheCopy)
343
class Echoer(pb.Root):
344
def remote_echo(self, st):
348
class CachedReturner(pb.Root):
349
def __init__(self, cache):
351
def remote_giveMeCache(self, st):
355
class NewStyleTestCase(unittest.TestCase):
358
Create a pb server using L{Echoer} protocol and connect a client to it.
360
self.serverFactory = pb.PBServerFactory(Echoer())
361
self.wrapper = WrappingFactory(self.serverFactory)
362
self.server = reactor.listenTCP(0, self.wrapper)
363
clientFactory = pb.PBClientFactory()
364
reactor.connectTCP("localhost", self.server.getHost().port,
368
return clientFactory.getRootObject().addCallback(gotRoot)
373
Close client and server connections, reset values of L{NewStyleCopy2}
376
NewStyleCopy2.allocated = 0
377
NewStyleCopy2.initialized = 0
378
NewStyleCopy2.value = 1
379
self.ref.broker.transport.loseConnection()
380
# Disconnect any server-side connections too.
381
for proto in self.wrapper.protocols:
382
proto.transport.loseConnection()
383
return self.server.stopListening()
385
def test_newStyle(self):
387
Create a new style object, send it over the wire, and check the result.
389
orig = NewStyleCopy("value")
390
d = self.ref.callRemote("echo", orig)
392
self.failUnless(isinstance(res, NewStyleCopy))
393
self.failUnlessEqual(res.s, "value")
394
self.failIf(res is orig) # no cheating :)
398
def test_alloc(self):
400
Send a new style object and check the number of allocations.
402
orig = NewStyleCopy2()
403
self.failUnlessEqual(NewStyleCopy2.allocated, 1)
404
self.failUnlessEqual(NewStyleCopy2.initialized, 1)
405
d = self.ref.callRemote("echo", orig)
407
# receiving the response creates a third one on the way back
408
self.failUnless(isinstance(res, NewStyleCopy2))
409
self.failUnlessEqual(res.value, 2)
410
self.failUnlessEqual(NewStyleCopy2.allocated, 3)
411
self.failUnlessEqual(NewStyleCopy2.initialized, 1)
412
self.failIf(res is orig) # no cheating :)
413
# sending the object creates a second one on the far side
419
class ConnectionNotifyServerFactory(pb.PBServerFactory):
421
A server factory which stores the last connection and fires a
422
L{Deferred} on connection made. This factory can handle only one
425
@ivar protocolInstance: the last protocol instance.
426
@type protocolInstance: C{pb.Broker}
428
@ivar connectionMade: the deferred fired upon connection.
429
@type connectionMade: C{Deferred}
431
protocolInstance = None
433
def __init__(self, root):
435
Initialize the factory.
437
pb.PBServerFactory.__init__(self, root)
438
self.connectionMade = Deferred()
441
def clientConnectionMade(self, protocol):
443
Store the protocol and fire the connection deferred.
445
self.protocolInstance = protocol
446
d, self.connectionMade = self.connectionMade, None
452
class NewStyleCachedTestCase(unittest.TestCase):
455
Create a pb server using L{CachedReturner} protocol and connect a
458
self.orig = NewStyleCacheCopy()
459
self.orig.s = "value"
460
self.server = reactor.listenTCP(0,
461
ConnectionNotifyServerFactory(CachedReturner(self.orig)))
462
clientFactory = pb.PBClientFactory()
463
reactor.connectTCP("localhost", self.server.getHost().port,
467
d1 = clientFactory.getRootObject().addCallback(gotRoot)
468
d2 = self.server.factory.connectionMade
469
return gatherResults([d1, d2])
474
Close client and server connections.
476
self.server.factory.protocolInstance.transport.loseConnection()
477
self.ref.broker.transport.loseConnection()
478
return self.server.stopListening()
481
def test_newStyleCache(self):
483
Get the object from the cache, and check its properties.
485
d = self.ref.callRemote("giveMeCache", self.orig)
487
self.failUnless(isinstance(res, NewStyleCacheCopy))
488
self.failUnlessEqual(res.s, "value")
489
self.failIf(res is self.orig) # no cheating :)
495
class BrokerTestCase(unittest.TestCase):
500
# from RemotePublished.getFileName
501
os.unlink('None-None-TESTING.pub')
505
def thunkErrorBad(self, error):
    """
    Errback for remote calls which are expected to succeed: fail the test,
    reporting the unexpected C{error}.
    """
    message = "This should cause a return value, not %s" % (error,)
    self.fail(message)
508
def thunkResultGood(self, result):
509
self.thunkResult = result
511
def thunkErrorGood(self, tb):
514
def thunkResultBad(self, result):
    """
    Callback for remote calls which are expected to fail: fail the test,
    reporting the unexpected C{result}.
    """
    message = "This should cause an error, not %s" % (result,)
    self.fail(message)
517
def test_reference(self):
518
c, s, pump = connectedServerAndClient()
520
class X(pb.Referenceable):
521
def remote_catch(self,arg):
524
class Y(pb.Referenceable):
525
def remote_throw(self, a, b):
526
a.callRemote('catch', b)
528
s.setNameForLocal("y", Y())
529
y = c.remoteForName("y")
532
y.callRemote('throw', x, z)
536
self.assertIdentical(x.caught, z, "X should have caught Z")
538
# make sure references to remote methods are equal
539
self.assertEquals(y.remoteMethod('throw'), y.remoteMethod('throw'))
541
def test_result(self):
542
c, s, pump = connectedServerAndClient()
543
for x, y in (c, s), (s, c):
546
x.setNameForLocal("foo", foo)
547
bar = y.remoteForName("foo")
548
self.expectedThunkResult = 8
549
bar.callRemote('thunk',self.expectedThunkResult - 1
550
).addCallbacks(self.thunkResultGood, self.thunkErrorBad)
555
# Shouldn't require any more pumping than that...
556
self.assertEquals(self.thunkResult, self.expectedThunkResult,
557
"result wasn't received.")
559
def refcountResult(self, result):
560
self.nestedRemote = result
562
def test_tooManyRefs(self):
565
c, s, pump = connectedServerAndClient()
567
s.setNameForLocal("foo", foo)
568
x = c.remoteForName("foo")
569
for igno in xrange(pb.MAX_BROKER_REFS + 10):
570
if s.transport.closed or c.transport.closed:
572
x.callRemote("getSimple").addCallbacks(l.append, e.append)
574
expected = (pb.MAX_BROKER_REFS - 1)
575
self.assertTrue(s.transport.closed, "transport was not closed")
576
self.assertEquals(len(l), expected,
577
"expected %s got %s" % (expected, len(l)))
580
c, s, pump = connectedServerAndClient()
582
s.setNameForLocal("foo", foo)
583
x = c.remoteForName("foo")
584
x.callRemote('getCopy'
585
).addCallbacks(self.thunkResultGood, self.thunkErrorBad)
588
self.assertEquals(self.thunkResult.x, 1)
589
self.assertEquals(self.thunkResult.y['Hello'], 'World')
590
self.assertEquals(self.thunkResult.z[0], 'test')
592
def test_observe(self):
593
c, s, pump = connectedServerAndClient()
595
# this is really testing the comparison between remote objects, to make
596
# sure that you can *UN*observe when you have an observer architecture.
599
s.setNameForLocal("a", a)
600
ra = c.remoteForName("a")
601
ra.callRemote('observe',b)
609
self.assertNotIdentical(b.obj, None, "didn't notify")
610
self.assertEquals(b.obj, 1, 'notified too much')
612
def test_defer(self):
613
c, s, pump = connectedServerAndClient()
615
s.setNameForLocal("d", d)
616
e = c.remoteForName("d")
617
pump.pump(); pump.pump()
619
e.callRemote('doItLater').addCallback(results.append)
620
pump.pump(); pump.pump()
621
self.assertFalse(d.run, "Deferred method run too early.")
623
self.assertEquals(d.run, 5, "Deferred method run too late.")
624
pump.pump(); pump.pump()
625
self.assertEquals(results[0], 6, "Incorrect result.")
628
def test_refcount(self):
629
c, s, pump = connectedServerAndClient()
631
s.setNameForLocal("foo", foo)
632
bar = c.remoteForName("foo")
633
bar.callRemote('getSimple'
634
).addCallbacks(self.refcountResult, self.thunkErrorBad)
641
# delving into internal structures here, because GC is sort of
642
# inherently internal.
643
rluid = self.nestedRemote.luid
644
self.assertIn(rluid, s.localObjects)
645
del self.nestedRemote
647
if sys.hexversion >= 0x2000000 and os.name != "java":
649
# try to nudge the GC even if we can't really
653
self.assertNotIn(rluid, s.localObjects)
655
def test_cache(self):
656
c, s, pump = connectedServerAndClient()
658
obj2 = NestedComplicatedCache()
660
s.setNameForLocal("obj", obj)
661
s.setNameForLocal("xxx", obj2)
662
o2 = c.remoteForName("obj")
663
o3 = c.remoteForName("xxx")
665
o2.callRemote("getCache"
666
).addCallback(coll.append).addErrback(coll.append)
667
o2.callRemote("getCache"
668
).addCallback(coll.append).addErrback(coll.append)
670
o3.callRemote("getCache").addCallback(complex.append)
671
o3.callRemote("getCache").addCallback(complex.append)
673
# `worst things first'
674
self.assertEquals(complex[0].x, 1)
675
self.assertEquals(complex[0].y, 2)
676
self.assertEquals(complex[0].foo, 3)
680
self.assertEquals(complex[0].foo, 4)
681
self.assertEquals(len(coll), 2)
683
self.assertIdentical(cp.checkMethod().im_self, cp,
684
"potential refcounting issue")
685
self.assertIdentical(cp.checkSelf(), cp,
686
"other potential refcounting issue")
688
o2.callRemote('putCache',cp).addCallback(col2.append)
690
# The objects were the same (testing lcache identity)
691
self.assertTrue(col2[0])
692
# test equality of references to methods
693
self.assertEquals(o2.remoteMethod("getCache"),
694
o2.remoteMethod("getCache"))
696
# now, refcounting (similar to test_refcount)
698
baroqueLuid = complex[0].luid
699
self.assertIn(luid, s.remotelyCachedObjects,
700
"remote cache doesn't have it")
710
if sys.hexversion >= 0x2000000 and os.name != "java":
712
# try to nudge the GC even if we can't really
714
# The GC is done with it.
715
self.assertNotIn(luid, s.remotelyCachedObjects,
716
"Server still had it after GC")
717
self.assertNotIn(luid, c.locallyCachedObjects,
718
"Client still had it after GC")
719
self.assertNotIn(baroqueLuid, s.remotelyCachedObjects,
720
"Server still had complex after GC")
721
self.assertNotIn(baroqueLuid, c.locallyCachedObjects,
722
"Client still had complex after GC")
723
self.assertIdentical(vcc.observer, None, "observer was not removed")
725
def test_publishable(self):
727
os.unlink('None-None-TESTING.pub') # from RemotePublished.getFileName
729
pass # Sometimes it's not there.
730
c, s, pump = connectedServerAndClient()
732
# foo.pub.timestamp = 1.0
733
s.setNameForLocal("foo", foo)
734
bar = c.remoteForName("foo")
736
bar.callRemote('getPub').addCallbacks(accum.append, self.thunkErrorBad)
739
self.assertEquals(obj.activateCalled, 1)
740
self.assertEquals(obj.isActivated, 1)
741
self.assertEquals(obj.yayIGotPublished, 1)
742
# timestamp's dirty, we don't have a cache file
743
self.assertEquals(obj._wasCleanWhenLoaded, 0)
744
c, s, pump = connectedServerAndClient()
745
s.setNameForLocal("foo", foo)
746
bar = c.remoteForName("foo")
747
bar.callRemote('getPub').addCallbacks(accum.append, self.thunkErrorBad)
750
# timestamp's clean, our cache file is up-to-date
751
self.assertEquals(obj._wasCleanWhenLoaded, 1)
753
def gotCopy(self, val):
754
self.thunkResult = val.id
757
def test_factoryCopy(self):
758
c, s, pump = connectedServerAndClient()
761
s.setNameForLocal("foo", obj)
762
x = c.remoteForName("foo")
763
x.callRemote('getFactory', ID
764
).addCallbacks(self.gotCopy, self.thunkResultBad)
768
self.assertEquals(self.thunkResult, ID,
769
"ID not correct on factory object %s" % (self.thunkResult,))
772
bigString = "helloworld" * 50
775
callbackKeyword = None
777
def finishedCallback(*args, **kw):
778
global callbackArgs, callbackKeyword
783
class Pagerizer(pb.Referenceable):
    """
    A referenceable which sends C{bigString} to a collector through a
    L{util.StringPager}.
    """

    def __init__(self, callback, *args, **kw):
        """
        @param callback: handed to the pager together with C{args} and C{kw}.
        @param args: extra positional arguments forwarded to the pager.
        @param kw: extra keyword arguments forwarded to the pager.
        """
        self.callback = callback
        self.args = args
        self.kw = kw

    def remote_getPages(self, collector):
        """
        Start a L{util.StringPager} sending C{bigString} to C{collector}.

        100 is passed as the third pager argument (presumably the chunk
        size -- confirm against L{util.StringPager}).  The stored extra
        arguments are dropped afterwards.
        """
        positional, keywords = self.args, self.kw
        util.StringPager(
            collector, bigString, 100, self.callback,
            *positional, **keywords)
        self.args = None
        self.kw = None
793
class FilePagerizer(pb.Referenceable):
796
def __init__(self, filename, callback, *args, **kw):
797
self.filename = filename
798
self.callback, self.args, self.kw = callback, args, kw
800
def remote_getPages(self, collector):
801
self.pager = util.FilePager(collector, file(self.filename),
802
self.callback, *self.args, **self.kw)
803
self.args = self.kw = None
807
class PagingTestCase(unittest.TestCase):
809
Test pb objects sending data by pages.
814
Create a file used to test L{util.FilePager}.
816
self.filename = self.mktemp()
817
fd = file(self.filename, 'w')
822
def test_pagingWithCallback(self):
824
Test L{util.StringPager}, passing a callback to fire when all pages
827
c, s, pump = connectedServerAndClient()
828
s.setNameForLocal("foo", Pagerizer(finishedCallback, 'hello', value=10))
829
x = c.remoteForName("foo")
831
util.getAllPages(x, "getPages").addCallback(l.append)
834
self.assertEquals(''.join(l[0]), bigString,
835
"Pages received not equal to pages sent!")
836
self.assertEquals(callbackArgs, ('hello',),
837
"Completed callback not invoked")
838
self.assertEquals(callbackKeyword, {'value': 10},
839
"Completed callback not invoked")
842
def test_pagingWithoutCallback(self):
844
Test L{util.StringPager} without a callback.
846
c, s, pump = connectedServerAndClient()
847
s.setNameForLocal("foo", Pagerizer(None))
848
x = c.remoteForName("foo")
850
util.getAllPages(x, "getPages").addCallback(l.append)
853
self.assertEquals(''.join(l[0]), bigString,
854
"Pages received not equal to pages sent!")
857
def test_emptyFilePaging(self):
859
Test L{util.FilePager}, sending an empty file.
861
filenameEmpty = self.mktemp()
862
fd = file(filenameEmpty, 'w')
864
c, s, pump = connectedServerAndClient()
865
pagerizer = FilePagerizer(filenameEmpty, None)
866
s.setNameForLocal("bar", pagerizer)
867
x = c.remoteForName("bar")
869
util.getAllPages(x, "getPages").addCallback(l.append)
871
while not l and ttl > 0:
875
self.fail('getAllPages timed out')
876
self.assertEquals(''.join(l[0]), '',
877
"Pages received not equal to pages sent!")
880
def test_filePagingWithCallback(self):
882
Test L{util.FilePager}, passing a callback to fire when all pages
883
are sent, and verify that the pager doesn't keep chunks in memory.
885
c, s, pump = connectedServerAndClient()
886
pagerizer = FilePagerizer(self.filename, finishedCallback,
888
s.setNameForLocal("bar", pagerizer)
889
x = c.remoteForName("bar")
891
util.getAllPages(x, "getPages").addCallback(l.append)
894
self.assertEquals(''.join(l[0]), bigString,
895
"Pages received not equal to pages sent!")
896
self.assertEquals(callbackArgs, ('frodo',),
897
"Completed callback not invoked")
898
self.assertEquals(callbackKeyword, {'value': 9},
899
"Completed callback not invoked")
900
self.assertEquals(pagerizer.pager.chunks, [])
903
def test_filePagingWithoutCallback(self):
905
Test L{util.FilePager} without a callback.
907
c, s, pump = connectedServerAndClient()
908
pagerizer = FilePagerizer(self.filename, None)
909
s.setNameForLocal("bar", pagerizer)
910
x = c.remoteForName("bar")
912
util.getAllPages(x, "getPages").addCallback(l.append)
915
self.assertEquals(''.join(l[0]), bigString,
916
"Pages received not equal to pages sent!")
917
self.assertEquals(pagerizer.pager.chunks, [])
921
class DumbPublishable(publish.Publishable):
    """
    A minimal publishable exposing a single fixed piece of state.
    """

    def getStateToPublish(self):
        """
        Return the state to publish: a dict mapping C{"yayIGotPublished"}
        to C{1}.
        """
        state = {}
        state["yayIGotPublished"] = 1
        return state
926
class DumbPub(publish.RemotePublished):
928
self.activateCalled = 1
931
class GetPublisher(pb.Referenceable):
933
self.pub = DumbPublishable("TESTING")
935
def remote_getPub(self):
939
pb.setUnjellyableForClass(DumbPublishable, DumbPub)
941
class DisconnectionTestCase(unittest.TestCase):
943
Test disconnection callbacks.
946
def error(self, *args):
    """
    A callback which must never run: raise L{RuntimeError} naming the
    arguments it was called with.
    """
    message = "I shouldn't have been called: %s" % (args,)
    raise RuntimeError(message)
950
def gotDisconnected(self):
952
Called on broker disconnect.
956
def objectDisconnected(self, o):
958
Called on RemoteReference disconnect.
960
self.assertEquals(o, self.remoteObject)
961
self.objectCallback = 1
963
def test_badSerialization(self):
964
c, s, pump = connectedServerAndClient()
966
s.setNameForLocal("o", BadCopySet())
967
g = c.remoteForName("o")
969
g.callRemote("setBadCopy", BadCopyable()).addErrback(l.append)
971
self.assertEquals(len(l), 1)
973
def test_disconnection(self):
974
c, s, pump = connectedServerAndClient()
976
s.setNameForLocal("o", SimpleRemote())
978
# get a client reference to server object
979
r = c.remoteForName("o")
984
# register and then unregister disconnect callbacks
985
# making sure they get unregistered
986
c.notifyOnDisconnect(self.error)
987
self.assertIn(self.error, c.disconnects)
988
c.dontNotifyOnDisconnect(self.error)
989
self.assertNotIn(self.error, c.disconnects)
991
r.notifyOnDisconnect(self.error)
992
self.assertIn(r._disconnected, c.disconnects)
993
self.assertIn(self.error, r.disconnectCallbacks)
994
r.dontNotifyOnDisconnect(self.error)
995
self.assertNotIn(r._disconnected, c.disconnects)
996
self.assertNotIn(self.error, r.disconnectCallbacks)
998
# register disconnect callbacks
999
c.notifyOnDisconnect(self.gotDisconnected)
1000
r.notifyOnDisconnect(self.objectDisconnected)
1001
self.remoteObject = r
1004
c.connectionLost(failure.Failure(main.CONNECTION_DONE))
1005
self.assertTrue(self.gotCallback)
1006
self.assertTrue(self.objectCallback)
1009
class FreakOut(Exception):
1013
class BadCopyable(pb.Copyable):
1014
def getStateToCopyFor(self, p):
1018
class BadCopySet(pb.Referenceable):
1019
def remote_setBadCopy(self, bc):
1023
class LocalRemoteTest(util.LocalAsRemote):
1024
reportAllTracebacks = 0
1026
def sync_add1(self, x):
1029
def async_add(self, x=0, y=1):
1032
def async_fail(self):
1033
raise RuntimeError()
1037
class MyPerspective(pb.Avatar):
1039
@ivar loggedIn: set to C{True} when the avatar is logged in.
1040
@type loggedIn: C{bool}
1042
@ivar loggedOut: set to C{True} when the avatar is logged out.
1043
@type loggedOut: C{bool}
1045
implements(pb.IPerspective)
1047
loggedIn = loggedOut = False
1049
def __init__(self, avatarId):
1050
self.avatarId = avatarId
1053
def perspective_getAvatarId(self):
1055
Return the avatar identifier which was used to access this avatar.
1057
return self.avatarId
1060
def perspective_getViewPoint(self):
1064
def perspective_add(self, a, b):
1066
Add the given objects and return the result. This is a method
1067
unavailable on L{Echoer}, so it can only be invoked by authenticated
1068
users who received their avatar from L{TestRealm}.
1074
self.loggedOut = True
1078
class TestRealm(object):
1080
A realm which repeatedly gives out a single instance of L{MyPerspective}
1081
for non-anonymous logins and which gives out a new instance of L{Echoer}
1082
for each anonymous login.
1084
@ivar lastPerspective: The L{MyPerspective} most recently created and
1085
returned from C{requestAvatar}.
1087
@ivar perspectiveFactory: A one-argument callable which will be used to
1088
create avatars to be returned from C{requestAvatar}.
1090
perspectiveFactory = MyPerspective
1092
lastPerspective = None
1094
def requestAvatar(self, avatarId, mind, interface):
1096
Verify that the mind and interface supplied have the expected values
1097
(this should really be done somewhere else, like inside a test method)
1098
and return an avatar appropriate for the given identifier.
1100
assert interface == pb.IPerspective
1101
assert mind == "BRAINS!"
1102
if avatarId is checkers.ANONYMOUS:
1103
return pb.IPerspective, Echoer(), lambda: None
1105
self.lastPerspective = self.perspectiveFactory(avatarId)
1106
self.lastPerspective.loggedIn = True
1108
pb.IPerspective, self.lastPerspective,
1109
self.lastPerspective.logout)
1113
class MyView(pb.Viewable):
    """
    A viewable which reports whether the viewing user is a
    L{MyPerspective}.
    """

    def view_check(self, user):
        """
        @return: C{True} if C{user} is a L{MyPerspective} instance, C{False}
            otherwise.
        """
        if isinstance(user, MyPerspective):
            return True
        return False
1120
class NewCredTestCase(unittest.TestCase):
1122
Tests related to the L{twisted.cred} support in PB.
1126
Create a portal with no checkers and wrap it around a simple test
1127
realm. Set up a PB server on a TCP port which serves perspectives
1130
self.realm = TestRealm()
1131
self.portal = portal.Portal(self.realm)
1132
self.factory = ConnectionNotifyServerFactory(self.portal)
1133
self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
1134
self.portno = self.port.getHost().port
1139
Shut down the TCP port created by L{setUp}.
1141
return self.port.stopListening()
1144
def getFactoryAndRootObject(self, clientFactory=pb.PBClientFactory):
    """
    Open a client connection to the test server.

    @param clientFactory: the factory class to instantiate for the
        connection.

    @return: a tuple (C{factory}, C{deferred}): the C{clientFactory}
        instance and the L{Deferred} returned by its C{getRootObject}
        method.
    """
    clientSide = clientFactory()
    rootDeferred = clientSide.getRootObject()
    # Register the connector's disconnect so the connection attempt is torn
    # down when the test finishes.
    self.addCleanup(
        reactor.connectTCP('127.0.0.1', self.portno, clientSide).disconnect)
    return clientSide, rootDeferred
1161
def test_getRootObject(self):
    """
    The Deferred returned by L{PBClientFactory.getRootObject} fires with a
    L{RemoteReference}; nothing else is asserted.
    """
    factory, rootObjDeferred = self.getFactoryAndRootObject()

    def checkAndDisconnect(root):
        self.assertIsInstance(root, pb.RemoteReference)
        # Keep the test running until the reference has been notified of
        # the disconnection.
        whenDisconnected = Deferred()
        root.notifyOnDisconnect(whenDisconnected.callback)
        factory.disconnect()
        return whenDisconnected

    rootObjDeferred.addCallback(checkAndDisconnect)
    return rootObjDeferred
1178
def test_deadReferenceError(self):
1180
Test that when a connection is lost, calling a method on a
1181
RemoteReference obtained from it raises DeadReferenceError.
1183
factory, rootObjDeferred = self.getFactoryAndRootObject()
1185
def gotRootObject(rootObj):
1186
disconnectedDeferred = Deferred()
1187
rootObj.notifyOnDisconnect(disconnectedDeferred.callback)
1189
def lostConnection(ign):
1191
pb.DeadReferenceError,
1192
rootObj.callRemote, 'method')
1194
disconnectedDeferred.addCallback(lostConnection)
1195
factory.disconnect()
1196
return disconnectedDeferred
1198
return rootObjDeferred.addCallback(gotRootObject)
1201
def test_clientConnectionLost(self):
1203
Test that if the L{reconnecting} flag is passed with a True value then
1204
a remote call made from a disconnection notification callback gets a
1205
result successfully.
1207
class ReconnectOnce(pb.PBClientFactory):
1208
reconnectedAlready = False
1209
def clientConnectionLost(self, connector, reason):
1210
reconnecting = not self.reconnectedAlready
1211
self.reconnectedAlready = True
1214
return pb.PBClientFactory.clientConnectionLost(
1215
self, connector, reason, reconnecting)
1217
factory, rootObjDeferred = self.getFactoryAndRootObject(ReconnectOnce)
1219
def gotRootObject(rootObj):
1220
self.assertIsInstance(rootObj, pb.RemoteReference)
1223
rootObj.notifyOnDisconnect(d.callback)
1224
factory.disconnect()
1226
def disconnected(ign):
1227
d = factory.getRootObject()
1229
def gotAnotherRootObject(anotherRootObj):
1230
self.assertIsInstance(anotherRootObj, pb.RemoteReference)
1233
anotherRootObj.notifyOnDisconnect(d.callback)
1234
factory.disconnect()
1236
return d.addCallback(gotAnotherRootObject)
1237
return d.addCallback(disconnected)
1238
return rootObjDeferred.addCallback(gotRootObject)
1241
def test_immediateClose(self):
    """
    A Broker which loses its connection before receiving any bytes neither
    raises an exception nor logs an error.
    """
    broker = self.factory.buildProtocol(('127.0.0.1', 12345))
    transport = protocol.FileWrapper(StringIO())
    broker.makeConnection(transport)
    reason = failure.Failure(main.CONNECTION_DONE)
    broker.connectionLost(reason)
1251
def test_loginConnectionRefused(self):
1253
L{PBClientFactory.login} returns a L{Deferred} which is errbacked
1254
with the L{ConnectionRefusedError} if the underlying connection is
1257
clientFactory = pb.PBClientFactory()
1258
loginDeferred = clientFactory.login(
1259
credentials.UsernamePassword("foo", "bar"))
1260
clientFactory.clientConnectionFailed(
1263
ConnectionRefusedError("Test simulated refused connection")))
1264
return self.assertFailure(loginDeferred, ConnectionRefusedError)
1267
def _disconnect(self, ignore, factory):
    """
    Disconnect the given client factory and wait for the server side to
    notice.

    @param ignore: unused; present so this method can be added directly as
        a Deferred callback.
    @param factory: the client factory to disconnect.

    @return: a C{Deferred} fired with C{None} when the server-side
        protocol's disconnect notification runs.
    """
    serverNoticed = Deferred()

    def fire():
        serverNoticed.callback(None)

    self.factory.protocolInstance.notifyOnDisconnect(fire)
    factory.disconnect()
    return serverNoticed
1280
def test_loginLogout(self):
1282
Test that login can be performed with IUsernamePassword credentials and
1283
that when the connection is dropped the avatar is logged out.
1285
self.portal.registerChecker(
1286
checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
1287
factory = pb.PBClientFactory()
1288
creds = credentials.UsernamePassword("user", "pass")
1290
# NOTE: real code probably won't need anything where we have the
1291
# "BRAINS!" argument, passing None is fine. We just do it here to
1292
# test that it is being passed. It is used to give additional info to
1293
# the realm to aid perspective creation, if you don't need that,
1297
d = factory.login(creds, mind)
1298
def cbLogin(perspective):
1299
self.assertTrue(self.realm.lastPerspective.loggedIn)
1300
self.assertIsInstance(perspective, pb.RemoteReference)
1301
return self._disconnect(None, factory)
1302
d.addCallback(cbLogin)
1304
def cbLogout(ignored):
1305
self.assertTrue(self.realm.lastPerspective.loggedOut)
1306
d.addCallback(cbLogout)
1308
connector = reactor.connectTCP("127.0.0.1", self.portno, factory)
1309
self.addCleanup(connector.disconnect)
1313
def test_logoutAfterDecref(self):
    """
    If a L{RemoteReference} to an L{IPerspective} avatar is decrefed and
    there remain no other references to the avatar on the server, the
    avatar is garbage collected and the logout method called.
    """
    loggedOut = Deferred()

    class EventPerspective(pb.Avatar):
        """
        An avatar which fires a Deferred when it is logged out.
        """
        def __init__(self, avatarId):
            # NOTE(review): the constructor body was absent from the
            # extracted source; assumed to be a no-op -- confirm.
            pass

        # NOTE(review): this method header was absent from the extracted
        # source; ``logout`` is inferred from the class docstring -- confirm.
        def logout(self):
            loggedOut.callback(None)

    self.realm.perspectiveFactory = EventPerspective

    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(foo='bar'))
    factory = pb.PBClientFactory()
    # NOTE(review): the ``d = factory.login(`` opener was absent from the
    # extracted source; restored to match the sibling tests -- confirm.
    d = factory.login(
        credentials.UsernamePassword('foo', 'bar'), "BRAINS!")

    def cbLoggedIn(avatar):
        # Just wait for the logout to happen, as it should since the
        # reference to the avatar will shortly no longer exist.
        # NOTE(review): the return statement was absent from the extracted
        # source; reconstructed from the comment above -- confirm.
        return loggedOut
    d.addCallback(cbLoggedIn)

    def cbLoggedOut(ignored):
        # Verify that the server broker's _localCleanup dict isn't growing
        # (it is expected to be empty once the avatar is gone).
        self.assertEqual(self.factory.protocolInstance._localCleanup, {})
    d.addCallback(cbLoggedOut)
    d.addCallback(self._disconnect, factory)
    connector = reactor.connectTCP("127.0.0.1", self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return restored so trial waits on the Deferred
    # -- confirm.
    return d
1354
def test_concurrentLogin(self):
    """
    Two different correct login attempts can be made on the same root
    object at the same time and produce two different resulting avatars.
    """
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(
            foo='bar', baz='quux'))
    factory = pb.PBClientFactory()

    firstLogin = factory.login(
        credentials.UsernamePassword('foo', 'bar'), "BRAINS!")
    secondLogin = factory.login(
        credentials.UsernamePassword('baz', 'quux'), "BRAINS!")
    d = gatherResults([firstLogin, secondLogin])

    # The callbacks unpack their argument explicitly rather than using
    # tuple parameters, which keeps the syntax valid on every Python version.
    def cbLoggedIn(avatars):
        first, second = avatars
        return gatherResults([
            first.callRemote('getAvatarId'),
            second.callRemote('getAvatarId')])
    d.addCallback(cbLoggedIn)

    def cbAvatarIds(avatarIds):
        first, second = avatarIds
        self.assertEqual(first, 'foo')
        self.assertEqual(second, 'baz')
    d.addCallback(cbAvatarIds)
    d.addCallback(self._disconnect, factory)

    connector = reactor.connectTCP('127.0.0.1', self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return was absent from the extracted source;
    # restored so trial waits on the Deferred -- confirm.
    return d
1385
def test_badUsernamePasswordLogin(self):
    """
    Test that a login attempt with an invalid user or invalid password
    fails in the appropriate way.
    """
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
    factory = pb.PBClientFactory()

    firstLogin = factory.login(
        credentials.UsernamePassword('nosuchuser', 'pass'))
    secondLogin = factory.login(
        credentials.UsernamePassword('user', 'wrongpass'))

    self.assertFailure(firstLogin, UnauthorizedLogin)
    self.assertFailure(secondLogin, UnauthorizedLogin)
    d = gatherResults([firstLogin, secondLogin])

    def cleanup(ignore):
        # Both rejected logins are expected to have logged an error; flush
        # them so they do not fail the test.
        errors = self.flushLoggedErrors(UnauthorizedLogin)
        self.assertEqual(len(errors), 2)
        return self._disconnect(None, factory)
    d.addCallback(cleanup)

    connector = reactor.connectTCP("127.0.0.1", self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return was absent from the extracted source;
    # restored so trial waits on the Deferred -- confirm.
    return d
1414
def test_anonymousLogin(self):
    """
    Verify that a PB server using a portal configured with a checker which
    allows IAnonymous credentials can be logged into using IAnonymous
    credentials.
    """
    self.portal.registerChecker(checkers.AllowAnonymousAccess())
    factory = pb.PBClientFactory()
    d = factory.login(credentials.Anonymous(), "BRAINS!")

    def cbLoggedIn(perspective):
        return perspective.callRemote('echo', 123)
    d.addCallback(cbLoggedIn)

    d.addCallback(self.assertEqual, 123)

    d.addCallback(self._disconnect, factory)

    connector = reactor.connectTCP("127.0.0.1", self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return was absent from the extracted source;
    # restored so trial waits on the Deferred -- confirm.
    return d
1437
def test_anonymousLoginNotPermitted(self):
    """
    Verify that without an anonymous checker set up, anonymous login is
    rejected.
    """
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
    factory = pb.PBClientFactory()
    d = factory.login(credentials.Anonymous(), "BRAINS!")
    self.assertFailure(d, UnhandledCredentials)

    def cleanup(ignore):
        # The rejected login is expected to have logged exactly one error;
        # flush it so it does not fail the test.
        errors = self.flushLoggedErrors(UnhandledCredentials)
        self.assertEqual(len(errors), 1)
        return self._disconnect(None, factory)
    d.addCallback(cleanup)

    connector = reactor.connectTCP('127.0.0.1', self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return was absent from the extracted source;
    # restored so trial waits on the Deferred -- confirm.
    return d
1459
def test_anonymousLoginWithMultipleCheckers(self):
    """
    Like L{test_anonymousLogin} but against a portal with a checker for
    both IAnonymous and IUsernamePassword.
    """
    self.portal.registerChecker(checkers.AllowAnonymousAccess())
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
    factory = pb.PBClientFactory()
    d = factory.login(credentials.Anonymous(), "BRAINS!")

    def cbLogin(perspective):
        return perspective.callRemote('echo', 123)
    d.addCallback(cbLogin)

    d.addCallback(self.assertEqual, 123)

    d.addCallback(self._disconnect, factory)

    connector = reactor.connectTCP('127.0.0.1', self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return was absent from the extracted source;
    # restored so trial waits on the Deferred -- confirm.
    return d
1483
def test_authenticatedLoginWithMultipleCheckers(self):
    """
    Like L{test_anonymousLoginWithMultipleCheckers} but check that
    username/password authentication works.
    """
    self.portal.registerChecker(checkers.AllowAnonymousAccess())
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
    factory = pb.PBClientFactory()
    # NOTE(review): the ``d = factory.login(`` opener was absent from the
    # extracted source; restored to match the sibling tests -- confirm.
    d = factory.login(
        credentials.UsernamePassword('user', 'pass'), "BRAINS!")

    def cbLogin(perspective):
        return perspective.callRemote('add', 100, 23)
    d.addCallback(cbLogin)

    d.addCallback(self.assertEqual, 123)

    d.addCallback(self._disconnect, factory)

    connector = reactor.connectTCP('127.0.0.1', self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return restored so trial waits on the Deferred
    # -- confirm.
    return d
1508
def test_view(self):
    """
    Verify that a viewpoint can be retrieved after authenticating with
    cred.
    """
    self.portal.registerChecker(
        checkers.InMemoryUsernamePasswordDatabaseDontUse(user='pass'))
    factory = pb.PBClientFactory()
    # NOTE(review): the ``d = factory.login(`` opener was absent from the
    # extracted source; restored to match the sibling tests -- confirm.
    d = factory.login(
        credentials.UsernamePassword("user", "pass"), "BRAINS!")

    def cbLogin(perspective):
        return perspective.callRemote("getViewPoint")
    d.addCallback(cbLogin)

    def cbView(viewpoint):
        return viewpoint.callRemote("check")
    d.addCallback(cbView)

    # The remote "check" call must produce a true result.
    d.addCallback(self.assertTrue)

    d.addCallback(self._disconnect, factory)

    connector = reactor.connectTCP("127.0.0.1", self.portno, factory)
    self.addCleanup(connector.disconnect)
    # NOTE(review): trailing return restored so trial waits on the Deferred
    # -- confirm.
    return d
1537
class NonSubclassingPerspective:
    """
    An L{pb.IPerspective} implementation which does not subclass
    L{pb.Avatar}; every received message is echoed back to the caller.
    """
    implements(pb.IPerspective)

    def __init__(self, avatarId):
        # NOTE(review): the constructor body was absent from the extracted
        # source; assumed to be a no-op -- confirm.
        pass

    # IPerspective implementation
    def perspectiveMessageReceived(self, broker, message, args, kwargs):
        """
        Unserialize the arguments and return the serialized
        C{(message, args, kwargs)} tuple as the result of the call.
        """
        args = broker.unserialize(args, self)
        kwargs = broker.unserialize(kwargs, self)
        return broker.serialize((message, args, kwargs))

    # Methods required by TestRealm
    # NOTE(review): this method header was absent from the extracted source;
    # ``logout`` is inferred from the comment above and the flag it sets
    # -- confirm.
    def logout(self):
        self.loggedOut = True
1555
class NSPTestCase(unittest.TestCase):
    """
    Tests for authentication against a realm where the L{IPerspective}
    implementation is not a subclass of L{Avatar}.
    """
    # NOTE(review): the ``setUp`` header was absent from the extracted
    # source; the fixture statements below are assumed to belong to it
    # -- confirm.
    def setUp(self):
        """
        Start a PB server on a local TCP port, backed by a realm producing
        L{NonSubclassingPerspective} avatars and a single user/password pair.
        """
        self.realm = TestRealm()
        self.realm.perspectiveFactory = NonSubclassingPerspective
        self.portal = portal.Portal(self.realm)
        self.checker = checkers.InMemoryUsernamePasswordDatabaseDontUse()
        self.checker.addUser("user", "pass")
        self.portal.registerChecker(self.checker)
        self.factory = WrappingFactory(pb.PBServerFactory(self.portal))
        # Port 0 is passed; the port actually bound is read back below.
        self.port = reactor.listenTCP(0, self.factory, interface="127.0.0.1")
        self.addCleanup(self.port.stopListening)
        self.portno = self.port.getHost().port


    # NOTE(review): the test method header was absent from the extracted
    # source; the name ``test_NSP`` is a reconstruction -- confirm.
    def test_NSP(self):
        """
        An L{IPerspective} implementation which does not subclass
        L{Avatar} can expose remote methods for the client to call.
        """
        factory = pb.PBClientFactory()
        # NOTE(review): the second argument of this call was absent from the
        # extracted source; "BRAINS!" matches the sibling tests -- confirm.
        d = factory.login(credentials.UsernamePassword('user', 'pass'),
                          "BRAINS!")
        reactor.connectTCP('127.0.0.1', self.portno, factory)
        d.addCallback(lambda p: p.callRemote('ANYTHING', 'here', bar='baz'))
        d.addCallback(self.assertEqual,
                      ('ANYTHING', ('here',), {'bar': 'baz'}))

        def cleanup(ignored):
            # Drop the client side, then every server-side connection
            # tracked by the wrapping factory.
            factory.disconnect()
            for p in self.factory.protocols:
                p.transport.loseConnection()
        d.addCallback(cleanup)
        # NOTE(review): trailing return restored so trial waits on the
        # Deferred -- confirm.
        return d
1594
class IForwarded(Interface):
    """
    Interface used for testing L{util.LocalAsyncForwarder}.
    """

    # NOTE(review): this method header was absent from the extracted source;
    # ``forwardMe`` is inferred from the implementation and from the test
    # which expects that call to be forwarded -- confirm.
    def forwardMe():
        """
        Simple synchronous method.
        """

    def forwardDeferred():
        """
        Simple asynchronous method.
        """
1612
# NOTE(review): the class header and the two class-level flag defaults were
# absent from the extracted source; they are reconstructed from the docstring
# and from the test which reads C{unforwarded} without ever setting it
# -- confirm.
class Forwarded:
    """
    Test implementation of L{IForwarded}.

    @ivar forwarded: set if C{forwardMe} is called.
    @type forwarded: C{bool}
    @ivar unforwarded: set if C{dontForwardMe} is called.
    @type unforwarded: C{bool}
    """
    implements(IForwarded)
    forwarded = False
    unforwarded = False

    def forwardMe(self):
        """
        Set a local flag to test afterwards.
        """
        self.forwarded = True

    def dontForwardMe(self):
        """
        Set a local flag to test afterwards. This should not be called as it's
        not in the interface.
        """
        self.unforwarded = True

    def forwardDeferred(self):
        """
        Asynchronously return C{True}.
        """
        return succeed(True)
1643
class SpreadUtilTestCase(unittest.TestCase):
    """
    Tests for L{twisted.spread.util}.
    """

    def test_sync(self):
        """
        Call a synchronous method of a L{util.LocalAsRemote} object and check
        the result.
        """
        o = LocalRemoteTest()
        self.assertEqual(o.callRemote("add1", 2), 3)

    def test_async(self):
        """
        Call an asynchronous method of a L{util.LocalAsRemote} object and check
        the result.
        """
        # The original constructed this object twice in a row; one instance
        # is enough.
        o = LocalRemoteTest()
        d = o.callRemote("add", 2, y=4)
        self.assertIsInstance(d, Deferred)
        d.addCallback(self.assertEqual, 6)
        # NOTE(review): trailing return was absent from the extracted source;
        # restored so trial waits on the Deferred -- confirm.
        return d

    def test_asyncFail(self):
        """
        Test an asynchronous failure on a remote method call.
        """
        o = LocalRemoteTest()
        d = o.callRemote("fail")

        # NOTE(review): the errback header was absent from the extracted
        # source; the name ``eb`` is taken from the addCallbacks call below.
        def eb(f):
            self.assertIsInstance(f, failure.Failure)
            f.trap(RuntimeError)
        d.addCallbacks(lambda res: self.fail("supposed to fail"), eb)
        # NOTE(review): trailing return restored so trial waits on the
        # Deferred -- confirm.
        return d

    def test_remoteMethod(self):
        """
        Test the C{remoteMethod} facility of L{util.LocalAsRemote}.
        """
        o = LocalRemoteTest()
        m = o.remoteMethod("add1")
        self.assertEqual(m(3), 4)

    def test_localAsyncForwarder(self):
        """
        Test a call to L{util.LocalAsyncForwarder} using L{Forwarded} local
        object.
        """
        # NOTE(review): the construction of ``f`` and of the result list were
        # absent from the extracted source; reconstructed from their uses
        # below -- confirm.
        f = Forwarded()
        lf = util.LocalAsyncForwarder(f, IForwarded)
        lf.callRemote("forwardMe")
        self.assertTrue(f.forwarded)
        # This call must leave the flag untouched.
        lf.callRemote("dontForwardMe")
        self.assertFalse(f.unforwarded)
        rr = lf.callRemote("forwardDeferred")
        l = []
        rr.addCallback(l.append)
        self.assertEqual(l[0], 1)
1707
class PBWithSecurityOptionsTest(unittest.TestCase):
    """
    Test security customization.
    """

    def test_clientDefaultSecurityOptions(self):
        """
        By default, client broker should use C{jelly.globalSecurity} as
        security settings.
        """
        factory = pb.PBClientFactory()
        broker = factory.buildProtocol(None)
        self.assertIdentical(broker.security, jelly.globalSecurity)


    def test_serverDefaultSecurityOptions(self):
        """
        By default, server broker should use C{jelly.globalSecurity} as
        security settings.
        """
        factory = pb.PBServerFactory(Echoer())
        broker = factory.buildProtocol(None)
        self.assertIdentical(broker.security, jelly.globalSecurity)


    def test_clientSecurityCustomization(self):
        """
        Check that the security settings are passed from the client factory to
        the broker object.
        """
        security = jelly.SecurityOptions()
        factory = pb.PBClientFactory(security=security)
        broker = factory.buildProtocol(None)
        self.assertIdentical(broker.security, security)


    def test_serverSecurityCustomization(self):
        """
        Check that the security settings are passed from the server factory to
        the broker object.
        """
        security = jelly.SecurityOptions()
        factory = pb.PBServerFactory(Echoer(), security=security)
        broker = factory.buildProtocol(None)
        self.assertIdentical(broker.security, security)
1755
class DeprecationTests(unittest.TestCase):
    """
    Tests for certain deprecations of free-functions in L{twisted.spread.pb}.
    """
    def test_noOperationDeprecated(self):
        """
        L{pb.noOperation} is deprecated.
        """
        self.callDeprecated(
            Version("twisted", 8, 2, 0),
            pb.noOperation, 1, 2, x=3, y=4)


    def test_printTraceback(self):
        """
        L{pb.printTraceback} is deprecated.
        """
        # NOTE(review): the callable argument was absent from the extracted
        # source; ``pb.printTraceback`` is inferred from the docstring and
        # the string argument -- confirm.
        self.callDeprecated(
            Version("twisted", 8, 2, 0),
            pb.printTraceback,
            "printTraceback deprecation fake traceback value")