1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
import pickle, time, weakref, gc
7
from twisted.trial import unittest, util
8
from twisted.python import threadable, failure, context
9
from twisted.internet import reactor, interfaces
10
from twisted.internet.defer import Deferred
13
# See the end of this module for the remainder of the imports.
16
class Synchronization(object):
19
def __init__(self, N, waiting):
21
self.waiting = waiting
22
self.lock = threading.Lock()
26
# This is the testy part: this is supposed to be invoked
27
# serially from multiple threads. If that is actually the
28
# case, we will never fail to acquire this lock. If it is
29
# *not* the case, we might get here while someone else is
31
if self.lock.acquire(False):
32
if not len(self.runs) % 5:
33
time.sleep(0.0002) # Constant selected based on
34
# empirical data to maximize the
35
# chance of a quick failure if this
41
# This is just the only way I can think of to wake up the test
42
# method. It doesn't actually have anything to do with the
45
self.runs.append(None)
46
if len(self.runs) == self.N:
47
self.waiting.release()
50
synchronized = ["run"]
51
threadable.synchronize(Synchronization)
55
class ThreadPoolTestCase(unittest.TestCase):
59
def _waitForLock(self, lock):
60
for i in xrange(1000000):
61
if lock.acquire(False):
65
self.fail("A long time passed without succeeding")
68
def test_attributes(self):
70
L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
71
L{ThreadPool.__init__}.
73
pool = threadpool.ThreadPool(12, 22)
74
self.assertEqual(pool.min, 12)
75
self.assertEqual(pool.max, 22)
80
L{ThreadPool.start} creates the minimum number of threads specified.
82
pool = threadpool.ThreadPool(0, 5)
84
self.addCleanup(pool.stop)
85
self.assertEqual(len(pool.threads), 0)
87
pool = threadpool.ThreadPool(3, 10)
88
self.assertEqual(len(pool.threads), 0)
90
self.addCleanup(pool.stop)
91
self.assertEqual(len(pool.threads), 3)
94
def test_threadCreationArguments(self):
96
Test that creating threads in the threadpool with application-level
97
objects as arguments doesn't results in those objects never being
98
freed, with the thread maintaining a reference to them as long as it
101
tp = threadpool.ThreadPool(0, 1)
103
self.addCleanup(tp.stop)
105
# Sanity check - no threads should have been started yet.
106
self.assertEqual(tp.threads, [])
108
# Here's our function
111
# weakref needs an object subclass
114
# And here's the unique object
117
workerRef = weakref.ref(worker)
118
uniqueRef = weakref.ref(unique)
121
tp.callInThread(worker, unique)
123
# Add an event to wait completion
124
event = threading.Event()
125
tp.callInThread(event.set)
126
event.wait(self.getTimeout())
131
self.assertEquals(uniqueRef(), None)
132
self.assertEquals(workerRef(), None)
135
def test_threadCreationArgumentsCallInThreadWithCallback(self):
137
As C{test_threadCreationArguments} above, but for
138
callInThreadWithCallback.
141
tp = threadpool.ThreadPool(0, 1)
143
self.addCleanup(tp.stop)
145
# Sanity check - no threads should have been started yet.
146
self.assertEqual(tp.threads, [])
148
# this holds references obtained in onResult
149
refdict = {} # name -> ref value
151
onResultWait = threading.Event()
152
onResultDone = threading.Event()
157
def onResult(success, result):
158
onResultWait.wait(self.getTimeout())
159
refdict['workerRef'] = workerRef()
160
refdict['uniqueRef'] = uniqueRef()
162
resultRef.append(weakref.ref(result))
164
# Here's our function
165
def worker(arg, test):
168
# weakref needs an object subclass
172
# And here's the unique object
175
onResultRef = weakref.ref(onResult)
176
workerRef = weakref.ref(worker)
177
uniqueRef = weakref.ref(unique)
180
tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
186
# let onResult collect the refs
189
onResultDone.wait(self.getTimeout())
191
self.assertEquals(uniqueRef(), None)
192
self.assertEquals(workerRef(), None)
194
# XXX There's a race right here - has onResult in the worker thread
195
# returned and the locals in _worker holding it and the result been
200
self.assertEqual(onResultRef(), None)
201
self.assertEqual(resultRef[0](), None)
204
def test_persistence(self):
206
Threadpools can be pickled and unpickled, which should preserve the
207
number of threads and other parameters.
209
pool = threadpool.ThreadPool(7, 20)
211
self.assertEquals(pool.min, 7)
212
self.assertEquals(pool.max, 20)
214
# check that unpickled threadpool has same number of threads
215
copy = pickle.loads(pickle.dumps(pool))
217
self.assertEquals(copy.min, 7)
218
self.assertEquals(copy.max, 20)
221
def _threadpoolTest(self, method):
223
Test synchronization of calls made with C{method}, which should be
224
one of the mechanisms of the threadpool to execute work in threads.
226
# This is a schizophrenic test: it seems to be trying to test
227
# both the callInThread()/dispatch() behavior of the ThreadPool as well
228
# as the serialization behavior of threadable.synchronize(). It
229
# would probably make more sense as two much simpler tests.
232
tp = threadpool.ThreadPool()
234
self.addCleanup(tp.stop)
236
waiting = threading.Lock()
238
actor = Synchronization(N, waiting)
243
self._waitForLock(waiting)
245
self.failIf(actor.failures, "run() re-entered %d times" %
249
def test_dispatch(self):
251
Call C{_threadpoolTest} with C{dispatch}.
253
return self._threadpoolTest(
254
lambda tp, actor: tp.dispatch(actor, actor.run))
256
test_dispatch.suppress = [util.suppress(
257
message="dispatch\(\) is deprecated since Twisted 8.0, "
258
"use callInThread\(\) instead",
259
category=DeprecationWarning)]
262
def test_callInThread(self):
264
Call C{_threadpoolTest} with C{callInThread}.
266
return self._threadpoolTest(
267
lambda tp, actor: tp.callInThread(actor.run))
270
def test_callInThreadException(self):
272
L{ThreadPool.callInThread} logs exceptions raised by the callable it
275
class NewError(Exception):
281
tp = threadpool.ThreadPool(0, 1)
282
tp.callInThread(raiseError)
286
errors = self.flushLoggedErrors(NewError)
287
self.assertEqual(len(errors), 1)
290
def test_callInThreadWithCallback(self):
292
L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
293
two-tuple of C{(True, result)} where C{result} is the value returned
294
by the callable supplied.
296
waiter = threading.Lock()
301
def onResult(success, result):
303
results.append(success)
304
results.append(result)
306
tp = threadpool.ThreadPool(0, 1)
307
tp.callInThreadWithCallback(onResult, lambda : "test")
311
self._waitForLock(waiter)
315
self.assertTrue(results[0])
316
self.assertEqual(results[1], "test")
319
def test_callInThreadWithCallbackExceptionInCallback(self):
321
L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
322
two-tuple of C{(False, failure)} where C{failure} represents the
323
exception raised by the callable supplied.
325
class NewError(Exception):
331
waiter = threading.Lock()
336
def onResult(success, result):
338
results.append(success)
339
results.append(result)
341
tp = threadpool.ThreadPool(0, 1)
342
tp.callInThreadWithCallback(onResult, raiseError)
346
self._waitForLock(waiter)
350
self.assertFalse(results[0])
351
self.assertTrue(isinstance(results[1], failure.Failure))
352
self.assertTrue(issubclass(results[1].type, NewError))
355
def test_callInThreadWithCallbackExceptionInOnResult(self):
357
L{ThreadPool.callInThreadWithCallback} logs the exception raised by
360
class NewError(Exception):
363
waiter = threading.Lock()
368
def onResult(success, result):
369
results.append(success)
370
results.append(result)
373
tp = threadpool.ThreadPool(0, 1)
374
tp.callInThreadWithCallback(onResult, lambda : None)
375
tp.callInThread(waiter.release)
379
self._waitForLock(waiter)
383
errors = self.flushLoggedErrors(NewError)
384
self.assertEqual(len(errors), 1)
386
self.assertTrue(results[0])
387
self.assertEqual(results[1], None)
390
def test_callbackThread(self):
392
L{ThreadPool.callInThreadWithCallback} calls the function it is
393
given and the C{onResult} callback in the same thread.
399
event = threading.Event()
401
def onResult(success, result):
402
threadIds.append(thread.get_ident())
406
threadIds.append(thread.get_ident())
408
tp = threadpool.ThreadPool(0, 1)
409
tp.callInThreadWithCallback(onResult, func)
411
self.addCleanup(tp.stop)
413
event.wait(self.getTimeout())
414
self.assertEqual(len(threadIds), 2)
415
self.assertEqual(threadIds[0], threadIds[1])
418
def test_callbackContext(self):
420
The context L{ThreadPool.callInThreadWithCallback} is invoked in is
421
shared by the context the callable and C{onResult} callback are
424
myctx = context.theContextTracker.currentContext().contexts[-1]
425
myctx['testing'] = 'this must be present'
429
event = threading.Event()
431
def onResult(success, result):
432
ctx = context.theContextTracker.currentContext().contexts[-1]
437
ctx = context.theContextTracker.currentContext().contexts[-1]
440
tp = threadpool.ThreadPool(0, 1)
441
tp.callInThreadWithCallback(onResult, func)
443
self.addCleanup(tp.stop)
445
event.wait(self.getTimeout())
447
self.assertEqual(len(contexts), 2)
448
self.assertEqual(myctx, contexts[0])
449
self.assertEqual(myctx, contexts[1])
452
def test_existingWork(self):
454
Work added to the threadpool before its start should be executed once
455
the threadpool is started: this is ensured by trying to release a lock
458
waiter = threading.Lock()
461
tp = threadpool.ThreadPool(0, 1)
462
tp.callInThread(waiter.release) # before start()
466
self._waitForLock(waiter)
471
def test_dispatchDeprecation(self):
473
Test for the deprecation of the dispatch method.
475
tp = threadpool.ThreadPool()
477
self.addCleanup(tp.stop)
480
return tp.dispatch(None, lambda: None)
482
self.assertWarns(DeprecationWarning,
483
"dispatch() is deprecated since Twisted 8.0, "
484
"use callInThread() instead",
488
def test_dispatchWithCallbackDeprecation(self):
490
Test for the deprecation of the dispatchWithCallback method.
492
tp = threadpool.ThreadPool()
494
self.addCleanup(tp.stop)
497
return tp.dispatchWithCallback(
503
self.assertWarns(DeprecationWarning,
504
"dispatchWithCallback() is deprecated since Twisted 8.0, "
505
"use twisted.internet.threads.deferToThread() instead.",
510
class RaceConditionTestCase(unittest.TestCase):
512
self.event = threading.Event()
513
self.threadpool = threadpool.ThreadPool(0, 10)
514
self.threadpool.start()
519
self.threadpool.stop()
523
def test_synchronization(self):
525
Test a race condition: ensure that actions run in the pool synchronize
526
with actions run in the main thread.
528
timeout = self.getTimeout()
529
self.threadpool.callInThread(self.event.set)
530
self.event.wait(timeout)
533
self.threadpool.callInThread(self.event.wait)
534
self.threadpool.callInThread(self.event.set)
535
self.event.wait(timeout)
536
if not self.event.isSet():
538
self.fail("Actions not synchronized")
541
def test_singleThread(self):
543
The submission of a new job to a thread pool in response to the
544
C{onResult} callback does not cause a new thread to be added to the
547
This requires that the thread which calls C{onResult} to have first
548
marked itself as available so that when the new job is queued, that
549
thread may be considered to run it. This is desirable so that when
550
only N jobs are ever being executed in the thread pool at once only
551
N threads will ever be created.
553
# Ensure no threads running
554
self.assertEquals(self.threadpool.workers, 0)
556
loopDeferred = Deferred()
558
def onResult(success, counter):
559
reactor.callFromThread(submit, counter)
563
self.threadpool.callInThreadWithCallback(
564
onResult, lambda: counter - 1)
566
loopDeferred.callback(None)
569
# Ensure there is only one thread running.
570
self.assertEqual(self.threadpool.workers, 1)
572
loopDeferred.addCallback(cbLoop)
578
if interfaces.IReactorThreads(reactor, None) is None:
579
for cls in ThreadPoolTestCase, RaceConditionTestCase:
580
setattr(cls, 'skip', "No thread support, nothing to test here")
583
from twisted.python import threadpool