1
# Copyright (c) 2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for implementations of L{IReactorThreads}.
10
from weakref import ref
13
from twisted.internet.test.reactormixins import ReactorBuilder
14
from twisted.python.threadpool import ThreadPool
17
class ThreadTestsBuilder(ReactorBuilder):
19
Builder for defining tests relating to L{IReactorThreads}.
21
def test_getThreadPool(self):
23
C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
24
starts when C{reactor.run()} is called and stops before it returns.
27
reactor = self.buildReactor()
29
pool = reactor.getThreadPool()
30
self.assertIsInstance(pool, ThreadPool)
32
pool.started, "Pool should not start before reactor.run")
35
# Record the state for later assertions
36
state.append(pool.started)
37
state.append(pool.joined)
40
reactor.callWhenRunning(f)
41
self.runReactor(reactor, 2)
44
state[0], "Pool should start after reactor.run")
46
state[1], "Pool should not be joined before reactor.stop")
49
"Pool should be stopped after reactor.run returns")
52
def test_suggestThreadPoolSize(self):
54
C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
57
reactor = self.buildReactor()
58
reactor.suggestThreadPoolSize(17)
59
pool = reactor.getThreadPool()
60
self.assertEqual(pool.max, 17)
63
def test_delayedCallFromThread(self):
65
A function scheduled with L{IReactorThreads.callFromThread} invoked
66
from a delayed call is run immediately in the next reactor iteration.
68
When invoked from the reactor thread, previous implementations of
69
L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
70
up step, assuming the reactor would wake up on its own. However, this
71
resulted in the reactor not noticing a insert into the thread queue at
72
the right time (in this case, after the thread queue has been processed
73
for that reactor iteration).
75
reactor = self.buildReactor()
80
# Set up the use of callFromThread being tested.
81
reactor.callLater(0, reactor.callFromThread, threadCall)
83
before = reactor.seconds()
84
self.runReactor(reactor, 60)
85
after = reactor.seconds()
87
# We specified a timeout of 60 seconds. The timeout code in runReactor
88
# probably won't actually work, though. If the reactor comes out of
89
# the event notification API just a little bit early, say after 59.9999
90
# seconds instead of after 60 seconds, then the queued thread call will
91
# get processed but the timeout delayed call runReactor sets up won't!
92
# Then the reactor will stop and runReactor will return without the
93
# timeout firing. As it turns out, select() and poll() are quite
94
# likely to return *slightly* earlier than we ask them to, so the
95
# timeout will rarely happen, even if callFromThread is broken. So,
96
# instead we'll measure the elapsed time and make sure it's something
97
# less than about half of the timeout we specified. This is heuristic.
98
# It assumes that select() won't ever return after 30 seconds when we
99
# asked it to timeout after 60 seconds. And of course like all
100
# time-based tests, it's slightly non-deterministic. If the OS doesn't
101
# schedule this process for 30 seconds, then the test might fail even
102
# if callFromThread is working.
103
self.assertTrue(after - before < 30)
106
def test_stopThreadPool(self):
108
When the reactor stops, L{ReactorBase._stopThreadPool} drops the
109
reactor's direct reference to its internal threadpool and removes
110
the associated startup and shutdown triggers.
112
This is the case of the thread pool being created before the reactor
115
reactor = self.buildReactor()
116
threadpool = ref(reactor.getThreadPool())
117
reactor.callWhenRunning(reactor.stop)
118
self.runReactor(reactor)
120
self.assertIdentical(threadpool(), None)
123
def test_stopThreadPoolWhenStartedAfterReactorRan(self):
125
We must handle the case of shutting down the thread pool when it was
126
started after the reactor was run in a special way.
128
Some implementation background: The thread pool is started with
129
callWhenRunning, which only returns a system trigger ID when it is
130
invoked before the reactor is started.
132
This is the case of the thread pool being created after the reactor
135
reactor = self.buildReactor()
137
def acquireThreadPool():
138
threadPoolRefs.append(ref(reactor.getThreadPool()))
140
reactor.callWhenRunning(acquireThreadPool)
141
self.runReactor(reactor)
143
self.assertIdentical(threadPoolRefs[0](), None)
146
def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
148
When the reactor has its shutdown event fired before it is run, the
149
thread pool is completely destroyed.
151
For what it's worth, the reason we support this behavior at all is
152
because Trial does this.
154
This is the case of the thread pool being created without the reactor
157
reactor = self.buildReactor()
158
threadPoolRef = ref(reactor.getThreadPool())
159
reactor.fireSystemEvent("shutdown")
160
self.assertIdentical(threadPoolRef(), None)
163
globals().update(ThreadTestsBuilder.makeTestCaseClasses())