2
Tests for a running Pyro server, without timeouts.
4
Pyro - Python Remote Objects. Copyright by Irmen de Jong.
5
irmen@razorvine.net - http://www.razorvine.net/python/Pyro
8
from __future__ import with_statement
14
from Pyro import threadutil
16
class MyThing(object):
18
self.dictionary={"number":42}
20
return self.dictionary
21
def multiply(self,x,y):
25
def delay(self, delay):
27
return "slept %d seconds" % delay
28
def delayAndId(self, delay, id):
30
return "slept for "+str(id)
31
def testargs(self,x,*args,**kwargs):
34
class DaemonLoopThread(threadutil.Thread):
35
def __init__(self, pyrodaemon):
36
super(DaemonLoopThread,self).__init__()
38
self.pyrodaemon=pyrodaemon
39
self.running=threadutil.Event()
44
self.pyrodaemon.requestLoop()
46
print "Swallow exception from terminated daemon"
48
class ServerTestsThreadNoTimeout(unittest.TestCase):
52
Pyro.config.POLLTIMEOUT=0.1
53
Pyro.config.SERVERTYPE=self.SERVERTYPE
54
Pyro.config.COMMTIMEOUT=self.COMMTIMEOUT
55
Pyro.config.THREADPOOL_MINTHREADS=2
56
Pyro.config.THREADPOOL_MAXTHREADS=20
57
self.daemon=Pyro.core.Daemon(port=0)
59
uri=self.daemon.register(obj, "something")
61
self.daemonthread=DaemonLoopThread(self.daemon)
62
self.daemonthread.start()
63
self.daemonthread.running.wait()
66
self.daemon.shutdown()
67
self.daemonthread.join()
68
Pyro.config.SERVERTYPE="thread"
69
Pyro.config.COMMTIMEOUT=None
71
def testNoDottedNames(self):
72
Pyro.config.DOTTEDNAMES=False
73
with Pyro.core.Proxy(self.objectUri) as p:
74
self.assertEqual(55,p.multiply(5,11))
76
self.assertEqual({"number":42}, x)
78
p.dictionary.update({"more":666}) # should fail with DOTTEDNAMES=False (the default)
79
self.fail("expected AttributeError")
80
except AttributeError:
83
self.assertEqual({"number":42}, x)
84
# also test some argument type things
85
self.assertEqual((1,(),{}), p.testargs(1))
86
self.assertEqual((1,(2,3),{'a':4}), p.testargs(1,2,3,a=4))
87
self.assertEqual((1,(),{'a':2}), p.testargs(1, **{'a':2}))
88
if sys.version_info>=(2,6,5):
89
# python 2.6.5 and later support unicode keyword args
90
result=p.testargs(1, **{unichr(0x20ac):2})
91
key=result[2].keys()[0]
92
self.assertTrue(key==unichr(0x20ac))
95
def testDottedNames(self):
97
Pyro.config.DOTTEDNAMES=True
98
with Pyro.core.Proxy(self.objectUri) as p:
99
self.assertEqual(55,p.multiply(5,11))
101
self.assertEqual({"number":42}, x)
102
p.dictionary.update({"more":666}) # updating it remotely should work with DOTTEDNAMES=True
104
self.assertEqual({"number":42, "more":666}, x) # eek, it got updated!
106
Pyro.config.DOTTEDNAMES=False
108
def testConnectionStuff(self):
109
p1=Pyro.core.Proxy(self.objectUri)
110
p2=Pyro.core.Proxy(self.objectUri)
111
self.assertTrue(p1._pyroConnection is None)
112
self.assertTrue(p2._pyroConnection is None)
117
self.assertTrue(p1._pyroConnection is not None)
118
self.assertTrue(p2._pyroConnection is not None)
123
self.assertTrue(p1._pyroConnection is None)
124
self.assertTrue(p2._pyroConnection is None)
128
self.assertTrue(p1._pyroConnection is not None)
129
self.assertTrue(p2._pyroConnection is not None)
130
self.assertEqual("PYRO",p1._pyroUri.protocol)
131
self.assertEqual("PYRO",p2._pyroUri.protocol)
135
def testReconnectAndCompression(self):
137
with Pyro.core.Proxy(self.objectUri) as p:
138
self.assertTrue(p._pyroConnection is None)
139
p._pyroReconnect(tries=100)
140
self.assertTrue(p._pyroConnection is not None)
141
self.assertTrue(p._pyroConnection is None)
144
with Pyro.core.Proxy(self.objectUri) as p:
145
Pyro.config.COMPRESSION=True
146
self.assertEqual(55, p.multiply(5,11))
147
self.assertEqual("*"*1000, p.multiply("*"*500,2))
149
Pyro.config.COMPRESSION=False
151
def testOneway(self):
152
with Pyro.core.Proxy(self.objectUri) as p:
153
self.assertEquals(55, p.multiply(5,11))
154
p._pyroOneway.add("multiply")
155
self.assertEquals(None, p.multiply(5,11))
156
self.assertEquals(None, p.multiply(5,11))
157
self.assertEquals(None, p.multiply(5,11))
158
p._pyroOneway.remove("multiply")
159
self.assertEquals(55, p.multiply(5,11))
160
self.assertEquals(55, p.multiply(5,11))
161
self.assertEquals(55, p.multiply(5,11))
162
# check nonexisting method behavoir
163
self.assertRaises(AttributeError, p.nonexisting)
164
p._pyroOneway.add("nonexisting")
165
# now it shouldn't fail because of oneway semantics
167
# also test on class:
168
class ProxyWithOneway(Pyro.core.Proxy):
169
def __init__(self, arg):
170
super(ProxyWithOneway,self).__init__(arg)
171
self._pyroOneway=["multiply"] # set is faster but don't care for this test
172
with ProxyWithOneway(self.objectUri) as p:
173
self.assertEquals(None, p.multiply(5,11))
174
p._pyroOneway=[] # empty set is better but don't care in this test
175
self.assertEquals(55, p.multiply(5,11))
177
def testOnewayDelayed(self):
179
with Pyro.core.Proxy(self.objectUri) as p:
181
Pyro.config.ONEWAY_THREADED=True # the default
182
p._pyroOneway.add("delay")
184
p.delay(1) # oneway so we should continue right away
185
self.assertTrue(time.time()-now < 0.2, "delay should be running as oneway")
187
self.assertEquals(55,p.multiply(5,11), "expected a normal result from a non-oneway call")
188
self.assertTrue(time.time()-now < 0.2, "delay should be running in its own thread")
189
# make oneway calls run in the server thread
190
# we can change the config here and the server will pick it up on the fly
191
Pyro.config.ONEWAY_THREADED=False
193
p.delay(1) # oneway so we should continue right away
194
self.assertTrue(time.time()-now < 0.2, "delay should be running as oneway")
196
self.assertEquals(55,p.multiply(5,11), "expected a normal result from a non-oneway call")
197
self.assertFalse(time.time()-now < 0.2, "delay should be running in the server thread")
199
Pyro.config.ONEWAY_THREADED=True # back to normal
201
def testSerializeConnected(self):
202
# online serialization tests
203
ser=Pyro.util.Serializer()
204
proxy=Pyro.core.Proxy(self.objectUri)
206
self.assertFalse(proxy._pyroConnection is None)
207
p,_=ser.serialize(proxy)
208
proxy2=ser.deserialize(p)
209
self.assertTrue(proxy2._pyroConnection is None)
210
self.assertFalse(proxy._pyroConnection is None)
211
self.assertEqual(proxy2._pyroUri, proxy._pyroUri)
212
self.assertEqual(proxy2._pyroSerializer, proxy._pyroSerializer)
214
self.assertFalse(proxy2._pyroConnection is None)
215
self.assertFalse(proxy2._pyroConnection is proxy._pyroConnection)
217
proxy2._pyroRelease()
218
self.assertTrue(proxy._pyroConnection is None)
219
self.assertTrue(proxy2._pyroConnection is None)
222
# try copying a connected proxy
224
proxy3=copy.copy(proxy)
225
self.assertTrue(proxy3._pyroConnection is None)
226
self.assertFalse(proxy._pyroConnection is None)
227
self.assertEqual(proxy3._pyroUri, proxy._pyroUri)
228
self.assertFalse(proxy3._pyroUri is proxy._pyroUri)
229
self.assertEqual(proxy3._pyroSerializer, proxy._pyroSerializer)
231
proxy2._pyroRelease()
232
proxy3._pyroRelease()
234
def testTimeoutCall(self):
235
Pyro.config.COMMTIMEOUT=None
236
with Pyro.core.Proxy(self.objectUri) as p:
240
duration=time.time()-start
241
self.assertAlmostEqual(0.5, duration, 1)
244
self.assertRaises(Pyro.errors.TimeoutError, p.delay, 1)
245
duration=time.time()-start
246
if sys.platform!="cli":
247
self.assertAlmostEqual(0.1, duration, 1)
249
# ironpython's time is wonky
250
self.assertTrue(0.0<duration<0.7)
252
def testTimeoutConnect(self):
253
# set up a unresponsive daemon
254
with Pyro.core.Daemon(port=0) as d:
258
# we're not going to start the daemon's event loop
259
p=Pyro.core.Proxy(uri)
262
self.assertRaises(Pyro.errors.TimeoutError, p.ping)
263
duration=time.time()-start
264
self.assertTrue(duration<2.0)
266
def testProxySharing(self):
267
class SharedProxyThread(threadutil.Thread):
268
def __init__(self, proxy):
269
super(SharedProxyThread,self).__init__()
276
while not self.terminate:
277
reply=self.proxy.multiply(5,11)
282
print "Something went wrong in the thread (SharedProxyThread):"
283
print "".join(Pyro.util.getPyroTraceback())
284
with Pyro.core.Proxy(self.objectUri) as p:
287
t=SharedProxyThread(p)
295
self.assertFalse(t.error, "all threads should report no errors")
297
def testServerConnections(self):
298
# check if the server allows to grow the number of connections
299
proxies=[Pyro.core.Proxy(self.objectUri) for _ in range(10)]
310
def testServerParallelism(self):
311
class ClientThread(threadutil.Thread):
312
def __init__(self, uri, name):
313
super(ClientThread,self).__init__()
315
self.proxy=Pyro.core.Proxy(uri)
318
self.proxy._pyroTimeout=5.0
319
self.proxy._pyroBind()
322
reply=self.proxy.delayAndId(0.5, self.name)
323
assert reply=="slept for "+self.name
326
self.proxy._pyroRelease()
331
t=ClientThread(self.objectUri,"t%d" % i)
334
# some exception (probably timeout) while creating clients
335
# try to clean up some connections first
337
t.proxy._pyroRelease()
338
raise # re-raise the exception
343
self.assertFalse(t.error, "all threads should report no errors")
345
duration=time.time()-start
346
if Pyro.config.SERVERTYPE=="select":
347
# select based server doesn't execute calls in parallel,
348
# so 6 threads times 0.5 seconds =~ 3 seconds
349
self.assertTrue(2.5<duration<3.5)
351
# thread based server does execute calls in parallel,
352
# so 6 threads taking 0.5 seconds =~ 0.5 seconds passed
353
self.assertTrue(0.3<duration<0.7)
356
class ServerTestsSelectNoTimeout(ServerTestsThreadNoTimeout):
359
def testProxySharing(self):
362
if __name__ == "__main__":
363
#import sys;sys.argv = ['', 'Test.testName']