1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
import sys, pickle
try:
import threading
except ImportError:
threading = None
from twisted.trial import unittest
from twisted.python import threadable
from twisted.internet import defer, reactor
class TestObject:
    """
    Helper whose C{aMethod} temporarily breaks, then restores, the invariant
    C{x + y == 0}.

    If calls to C{aMethod} are not serialized, a thread switch in the middle
    of the swap could let another thread observe a non-zero sum, which
    C{aMethod} reports by raising C{AssertionError}.
    """

    # Method names listed for threadable.synchronize(), which is applied to
    # this class right after its definition.
    synchronized = ['aMethod']

    # Class-level defaults; aMethod rebinds them on the instance.
    x = -1
    y = 1

    def aMethod(self):
        """
        Swap C{x} and C{y} ten times, storing their sum in C{z} after each
        swap and checking that it is zero.

        @raise AssertionError: if C{x + y} is not zero after a swap.
        """
        # range() instead of xrange(): identical behaviour for ten
        # iterations, and it exists on both Python 2 and Python 3.
        for i in range(10):
            self.x, self.y = self.y, self.x
            self.z = self.x + self.y
            assert self.z == 0, "z == %d, not 0 as expected" % (self.z,)
# Hand TestObject to threadable.synchronize(); presumably this wraps the
# methods named in TestObject.synchronized so that calls to them are
# serialized -- confirm against twisted.python.threadable.  The
# synchronization tests below rely on aMethod never observing x + y != 0.
threadable.synchronize(TestObject)
class SynchronizationTestCase(unittest.TestCase):
    """
    Tests for L{threadable.isInIOThread} and for calling the synchronized
    C{TestObject.aMethod} with and without concurrent threads.
    """

    def setUp(self):
        """
        Make the interpreter switch threads much more often, hopefully
        exercising more possible race conditions.  Also, delay actual test
        startup until the reactor has been started.

        @return: a L{defer.Deferred} fired from a zero-delay reactor call.
        """
        if hasattr(sys, 'getswitchinterval'):
            # Python 3.2+: the check interval no longer has any effect; the
            # switch interval (in seconds) is the knob that matters.
            self.addCleanup(sys.setswitchinterval, sys.getswitchinterval())
            sys.setswitchinterval(1e-6)
        elif hasattr(sys, 'getcheckinterval'):
            self.addCleanup(sys.setcheckinterval, sys.getcheckinterval())
            sys.setcheckinterval(7)

        # XXX This is a trial hack.  We need to make sure the reactor
        # actually *starts* for isInIOThread() to have a meaningful result.
        # Returning a Deferred here should force that to happen, if it has
        # not happened already.  In the future, this should not be
        # necessary.
        d = defer.Deferred()
        reactor.callLater(0, d.callback, None)
        return d

    def testIsInIOThread(self):
        """
        L{threadable.isInIOThread} is false when called from a freshly
        started thread and true when called from the thread running the test.
        """
        foreignResult = []
        t = threading.Thread(
            target=lambda: foreignResult.append(threadable.isInIOThread()))
        t.start()
        # join() guarantees the worker has appended its result before it is
        # read below.
        t.join()
        self.failIf(foreignResult[0], "Non-IO thread reported as IO thread")
        self.failUnless(
            threadable.isInIOThread(), "IO thread reported as not IO thread")

    def testThreadedSynchronization(self):
        """
        Five threads each call C{aMethod} on one shared C{TestObject} 1000
        times; the test fails if any call raised C{AssertionError}.
        """
        o = TestObject()
        errors = []

        def callMethodLots():
            try:
                for i in range(1000):
                    o.aMethod()
            except AssertionError:
                # sys.exc_info() is used instead of binding the exception in
                # the except clause so this parses on every Python version
                # ("except X, e" is Python 2 only; "except X as e" needs 2.6).
                # The failure is recorded rather than raised because an
                # exception in a worker thread would not fail the test.
                errors.append(str(sys.exc_info()[1]))

        threads = []
        for unused in range(5):
            t = threading.Thread(target=callMethodLots)
            threads.append(t)
            t.start()

        for t in threads:
            t.join()

        if errors:
            raise unittest.FailTest(errors)

    def testUnthreadedSynchronization(self):
        """
        Calling the synchronized C{aMethod} 1000 times from a single thread
        raises no exception.
        """
        o = TestObject()
        for i in range(1000):
            o.aMethod()
class SerializationTestCase(unittest.TestCase):
    """
    Tests for pickling and unpickling the lock objects provided by
    L{threadable}.
    """

    def testPickling(self):
        """
        A L{threadable.XLock} dumped with the default pickle protocol loads
        back as an instance of the same type.
        """
        original = threadable.XLock()
        restored = pickle.loads(pickle.dumps(original))
        self.failUnless(isinstance(restored, type(original)))

    def testUnpickling(self):
        """
        A stored pickle that calls C{twisted.python.threadable.unpickle_lock}
        can be loaded, and the resulting object can be dumped with protocol 2
        and loaded again.  Only the absence of exceptions is checked.
        """
        storedPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n.'
        revived = pickle.loads(storedPickle)
        pickle.loads(pickle.dumps(revived, 2))
if threading is None:
    # Without the threading module, skip every test that starts a thread or
    # that the original module already skipped for lack of thread support.
    #
    # The skip attribute is set on the plain function stored in the class
    # namespace.  On Python 2, looking the method up through the class gives
    # an unbound method object, and assigning an attribute to one raises
    # AttributeError; reading the attribute back through a method still works.
    vars(SynchronizationTestCase)['testIsInIOThread'].skip = (
        "Platform lacks thread support")
    vars(SynchronizationTestCase)['testThreadedSynchronization'].skip = (
        "Platform lacks thread support")
    vars(SerializationTestCase)['testPickling'].skip = (
        "Platform lacks thread support")
|