~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_threadpool.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
import pickle, time, weakref, gc
 
6
 
 
7
from twisted.trial import unittest, util
 
8
from twisted.python import threadable, failure, context
 
9
from twisted.internet import reactor, interfaces
 
10
from twisted.internet.defer import Deferred
 
11
 
 
12
#
 
13
# See the end of this module for the remainder of the imports.
 
14
#
 
15
 
 
16
class Synchronization(object):
 
17
    failures = 0
 
18
 
 
19
    def __init__(self, N, waiting):
 
20
        self.N = N
 
21
        self.waiting = waiting
 
22
        self.lock = threading.Lock()
 
23
        self.runs = []
 
24
 
 
25
    def run(self):
 
26
        # This is the testy part: this is supposed to be invoked
 
27
        # serially from multiple threads.  If that is actually the
 
28
        # case, we will never fail to acquire this lock.  If it is
 
29
        # *not* the case, we might get here while someone else is
 
30
        # holding the lock.
 
31
        if self.lock.acquire(False):
 
32
            if not len(self.runs) % 5:
 
33
                time.sleep(0.0002) # Constant selected based on
 
34
                                   # empirical data to maximize the
 
35
                                   # chance of a quick failure if this
 
36
                                   # code is broken.
 
37
            self.lock.release()
 
38
        else:
 
39
            self.failures += 1
 
40
 
 
41
        # This is just the only way I can think of to wake up the test
 
42
        # method.  It doesn't actually have anything to do with the
 
43
        # test.
 
44
        self.lock.acquire()
 
45
        self.runs.append(None)
 
46
        if len(self.runs) == self.N:
 
47
            self.waiting.release()
 
48
        self.lock.release()
 
49
 
 
50
    synchronized = ["run"]
 
51
threadable.synchronize(Synchronization)
 
52
 
 
53
 
 
54
 
 
55
class ThreadPoolTestCase(unittest.TestCase):
 
56
    """
 
57
    Test threadpools.
 
58
    """
 
59
    def _waitForLock(self, lock):
 
60
        for i in xrange(1000000):
 
61
            if lock.acquire(False):
 
62
                break
 
63
            time.sleep(1e-5)
 
64
        else:
 
65
            self.fail("A long time passed without succeeding")
 
66
 
 
67
 
 
68
    def test_attributes(self):
 
69
        """
 
70
        L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
 
71
        L{ThreadPool.__init__}.
 
72
        """
 
73
        pool = threadpool.ThreadPool(12, 22)
 
74
        self.assertEqual(pool.min, 12)
 
75
        self.assertEqual(pool.max, 22)
 
76
 
 
77
 
 
78
    def test_start(self):
 
79
        """
 
80
        L{ThreadPool.start} creates the minimum number of threads specified.
 
81
        """
 
82
        pool = threadpool.ThreadPool(0, 5)
 
83
        pool.start()
 
84
        self.addCleanup(pool.stop)
 
85
        self.assertEqual(len(pool.threads), 0)
 
86
 
 
87
        pool = threadpool.ThreadPool(3, 10)
 
88
        self.assertEqual(len(pool.threads), 0)
 
89
        pool.start()
 
90
        self.addCleanup(pool.stop)
 
91
        self.assertEqual(len(pool.threads), 3)
 
92
 
 
93
 
 
94
    def test_threadCreationArguments(self):
 
95
        """
 
96
        Test that creating threads in the threadpool with application-level
 
97
        objects as arguments doesn't results in those objects never being
 
98
        freed, with the thread maintaining a reference to them as long as it
 
99
        exists.
 
100
        """
 
101
        tp = threadpool.ThreadPool(0, 1)
 
102
        tp.start()
 
103
        self.addCleanup(tp.stop)
 
104
 
 
105
        # Sanity check - no threads should have been started yet.
 
106
        self.assertEqual(tp.threads, [])
 
107
 
 
108
        # Here's our function
 
109
        def worker(arg):
 
110
            pass
 
111
        # weakref needs an object subclass
 
112
        class Dumb(object):
 
113
            pass
 
114
        # And here's the unique object
 
115
        unique = Dumb()
 
116
 
 
117
        workerRef = weakref.ref(worker)
 
118
        uniqueRef = weakref.ref(unique)
 
119
 
 
120
        # Put some work in
 
121
        tp.callInThread(worker, unique)
 
122
 
 
123
        # Add an event to wait completion
 
124
        event = threading.Event()
 
125
        tp.callInThread(event.set)
 
126
        event.wait(self.getTimeout())
 
127
 
 
128
        del worker
 
129
        del unique
 
130
        gc.collect()
 
131
        self.assertEquals(uniqueRef(), None)
 
132
        self.assertEquals(workerRef(), None)
 
133
 
 
134
 
 
135
    def test_threadCreationArgumentsCallInThreadWithCallback(self):
 
136
        """
 
137
        As C{test_threadCreationArguments} above, but for
 
138
        callInThreadWithCallback.
 
139
        """
 
140
 
 
141
        tp = threadpool.ThreadPool(0, 1)
 
142
        tp.start()
 
143
        self.addCleanup(tp.stop)
 
144
 
 
145
        # Sanity check - no threads should have been started yet.
 
146
        self.assertEqual(tp.threads, [])
 
147
 
 
148
        # this holds references obtained in onResult
 
149
        refdict = {} # name -> ref value
 
150
 
 
151
        onResultWait = threading.Event()
 
152
        onResultDone = threading.Event()
 
153
 
 
154
        resultRef = []
 
155
 
 
156
        # result callback
 
157
        def onResult(success, result):
 
158
            onResultWait.wait(self.getTimeout())
 
159
            refdict['workerRef'] = workerRef()
 
160
            refdict['uniqueRef'] = uniqueRef()
 
161
            onResultDone.set()
 
162
            resultRef.append(weakref.ref(result))
 
163
 
 
164
        # Here's our function
 
165
        def worker(arg, test):
 
166
            return Dumb()
 
167
 
 
168
        # weakref needs an object subclass
 
169
        class Dumb(object):
 
170
            pass
 
171
 
 
172
        # And here's the unique object
 
173
        unique = Dumb()
 
174
 
 
175
        onResultRef = weakref.ref(onResult)
 
176
        workerRef = weakref.ref(worker)
 
177
        uniqueRef = weakref.ref(unique)
 
178
 
 
179
        # Put some work in
 
180
        tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
 
181
 
 
182
        del worker
 
183
        del unique
 
184
        gc.collect()
 
185
 
 
186
        # let onResult collect the refs
 
187
        onResultWait.set()
 
188
        # wait for onResult
 
189
        onResultDone.wait(self.getTimeout())
 
190
 
 
191
        self.assertEquals(uniqueRef(), None)
 
192
        self.assertEquals(workerRef(), None)
 
193
 
 
194
        # XXX There's a race right here - has onResult in the worker thread
 
195
        # returned and the locals in _worker holding it and the result been
 
196
        # deleted yet?
 
197
 
 
198
        del onResult
 
199
        gc.collect()
 
200
        self.assertEqual(onResultRef(), None)
 
201
        self.assertEqual(resultRef[0](), None)
 
202
 
 
203
 
 
204
    def test_persistence(self):
 
205
        """
 
206
        Threadpools can be pickled and unpickled, which should preserve the
 
207
        number of threads and other parameters.
 
208
        """
 
209
        pool = threadpool.ThreadPool(7, 20)
 
210
 
 
211
        self.assertEquals(pool.min, 7)
 
212
        self.assertEquals(pool.max, 20)
 
213
 
 
214
        # check that unpickled threadpool has same number of threads
 
215
        copy = pickle.loads(pickle.dumps(pool))
 
216
 
 
217
        self.assertEquals(copy.min, 7)
 
218
        self.assertEquals(copy.max, 20)
 
219
 
 
220
 
 
221
    def _threadpoolTest(self, method):
 
222
        """
 
223
        Test synchronization of calls made with C{method}, which should be
 
224
        one of the mechanisms of the threadpool to execute work in threads.
 
225
        """
 
226
        # This is a schizophrenic test: it seems to be trying to test
 
227
        # both the callInThread()/dispatch() behavior of the ThreadPool as well
 
228
        # as the serialization behavior of threadable.synchronize().  It
 
229
        # would probably make more sense as two much simpler tests.
 
230
        N = 10
 
231
 
 
232
        tp = threadpool.ThreadPool()
 
233
        tp.start()
 
234
        self.addCleanup(tp.stop)
 
235
 
 
236
        waiting = threading.Lock()
 
237
        waiting.acquire()
 
238
        actor = Synchronization(N, waiting)
 
239
 
 
240
        for i in xrange(N):
 
241
            method(tp, actor)
 
242
 
 
243
        self._waitForLock(waiting)
 
244
 
 
245
        self.failIf(actor.failures, "run() re-entered %d times" %
 
246
                                    (actor.failures,))
 
247
 
 
248
 
 
249
    def test_dispatch(self):
 
250
        """
 
251
        Call C{_threadpoolTest} with C{dispatch}.
 
252
        """
 
253
        return self._threadpoolTest(
 
254
            lambda tp, actor: tp.dispatch(actor, actor.run))
 
255
 
 
256
    test_dispatch.suppress = [util.suppress(
 
257
                message="dispatch\(\) is deprecated since Twisted 8.0, "
 
258
                        "use callInThread\(\) instead",
 
259
                category=DeprecationWarning)]
 
260
 
 
261
 
 
262
    def test_callInThread(self):
 
263
        """
 
264
        Call C{_threadpoolTest} with C{callInThread}.
 
265
        """
 
266
        return self._threadpoolTest(
 
267
            lambda tp, actor: tp.callInThread(actor.run))
 
268
 
 
269
 
 
270
    def test_callInThreadException(self):
 
271
        """
 
272
        L{ThreadPool.callInThread} logs exceptions raised by the callable it
 
273
        is passed.
 
274
        """
 
275
        class NewError(Exception):
 
276
            pass
 
277
 
 
278
        def raiseError():
 
279
            raise NewError()
 
280
 
 
281
        tp = threadpool.ThreadPool(0, 1)
 
282
        tp.callInThread(raiseError)
 
283
        tp.start()
 
284
        tp.stop()
 
285
 
 
286
        errors = self.flushLoggedErrors(NewError)
 
287
        self.assertEqual(len(errors), 1)
 
288
 
 
289
 
 
290
    def test_callInThreadWithCallback(self):
 
291
        """
 
292
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
 
293
        two-tuple of C{(True, result)} where C{result} is the value returned
 
294
        by the callable supplied.
 
295
        """
 
296
        waiter = threading.Lock()
 
297
        waiter.acquire()
 
298
 
 
299
        results = []
 
300
 
 
301
        def onResult(success, result):
 
302
            waiter.release()
 
303
            results.append(success)
 
304
            results.append(result)
 
305
 
 
306
        tp = threadpool.ThreadPool(0, 1)
 
307
        tp.callInThreadWithCallback(onResult, lambda : "test")
 
308
        tp.start()
 
309
 
 
310
        try:
 
311
            self._waitForLock(waiter)
 
312
        finally:
 
313
            tp.stop()
 
314
 
 
315
        self.assertTrue(results[0])
 
316
        self.assertEqual(results[1], "test")
 
317
 
 
318
 
 
319
    def test_callInThreadWithCallbackExceptionInCallback(self):
 
320
        """
 
321
        L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
 
322
        two-tuple of C{(False, failure)} where C{failure} represents the
 
323
        exception raised by the callable supplied.
 
324
        """
 
325
        class NewError(Exception):
 
326
            pass
 
327
 
 
328
        def raiseError():
 
329
            raise NewError()
 
330
 
 
331
        waiter = threading.Lock()
 
332
        waiter.acquire()
 
333
 
 
334
        results = []
 
335
 
 
336
        def onResult(success, result):
 
337
            waiter.release()
 
338
            results.append(success)
 
339
            results.append(result)
 
340
 
 
341
        tp = threadpool.ThreadPool(0, 1)
 
342
        tp.callInThreadWithCallback(onResult, raiseError)
 
343
        tp.start()
 
344
 
 
345
        try:
 
346
            self._waitForLock(waiter)
 
347
        finally:
 
348
            tp.stop()
 
349
 
 
350
        self.assertFalse(results[0])
 
351
        self.assertTrue(isinstance(results[1], failure.Failure))
 
352
        self.assertTrue(issubclass(results[1].type, NewError))
 
353
 
 
354
 
 
355
    def test_callInThreadWithCallbackExceptionInOnResult(self):
 
356
        """
 
357
        L{ThreadPool.callInThreadWithCallback} logs the exception raised by
 
358
        C{onResult}.
 
359
        """
 
360
        class NewError(Exception):
 
361
            pass
 
362
 
 
363
        waiter = threading.Lock()
 
364
        waiter.acquire()
 
365
 
 
366
        results = []
 
367
 
 
368
        def onResult(success, result):
 
369
            results.append(success)
 
370
            results.append(result)
 
371
            raise NewError()
 
372
 
 
373
        tp = threadpool.ThreadPool(0, 1)
 
374
        tp.callInThreadWithCallback(onResult, lambda : None)
 
375
        tp.callInThread(waiter.release)
 
376
        tp.start()
 
377
 
 
378
        try:
 
379
            self._waitForLock(waiter)
 
380
        finally:
 
381
            tp.stop()
 
382
 
 
383
        errors = self.flushLoggedErrors(NewError)
 
384
        self.assertEqual(len(errors), 1)
 
385
 
 
386
        self.assertTrue(results[0])
 
387
        self.assertEqual(results[1], None)
 
388
 
 
389
 
 
390
    def test_callbackThread(self):
 
391
        """
 
392
        L{ThreadPool.callInThreadWithCallback} calls the function it is
 
393
        given and the C{onResult} callback in the same thread.
 
394
        """
 
395
        threadIds = []
 
396
 
 
397
        import thread
 
398
 
 
399
        event = threading.Event()
 
400
 
 
401
        def onResult(success, result):
 
402
            threadIds.append(thread.get_ident())
 
403
            event.set()
 
404
 
 
405
        def func():
 
406
            threadIds.append(thread.get_ident())
 
407
 
 
408
        tp = threadpool.ThreadPool(0, 1)
 
409
        tp.callInThreadWithCallback(onResult, func)
 
410
        tp.start()
 
411
        self.addCleanup(tp.stop)
 
412
 
 
413
        event.wait(self.getTimeout())
 
414
        self.assertEqual(len(threadIds), 2)
 
415
        self.assertEqual(threadIds[0], threadIds[1])
 
416
 
 
417
 
 
418
    def test_callbackContext(self):
 
419
        """
 
420
        The context L{ThreadPool.callInThreadWithCallback} is invoked in is
 
421
        shared by the context the callable and C{onResult} callback are
 
422
        invoked in.
 
423
        """
 
424
        myctx = context.theContextTracker.currentContext().contexts[-1]
 
425
        myctx['testing'] = 'this must be present'
 
426
 
 
427
        contexts = []
 
428
 
 
429
        event = threading.Event()
 
430
 
 
431
        def onResult(success, result):
 
432
            ctx = context.theContextTracker.currentContext().contexts[-1]
 
433
            contexts.append(ctx)
 
434
            event.set()
 
435
 
 
436
        def func():
 
437
            ctx = context.theContextTracker.currentContext().contexts[-1]
 
438
            contexts.append(ctx)
 
439
 
 
440
        tp = threadpool.ThreadPool(0, 1)
 
441
        tp.callInThreadWithCallback(onResult, func)
 
442
        tp.start()
 
443
        self.addCleanup(tp.stop)
 
444
 
 
445
        event.wait(self.getTimeout())
 
446
 
 
447
        self.assertEqual(len(contexts), 2)
 
448
        self.assertEqual(myctx, contexts[0])
 
449
        self.assertEqual(myctx, contexts[1])
 
450
 
 
451
 
 
452
    def test_existingWork(self):
 
453
        """
 
454
        Work added to the threadpool before its start should be executed once
 
455
        the threadpool is started: this is ensured by trying to release a lock
 
456
        previously acquired.
 
457
        """
 
458
        waiter = threading.Lock()
 
459
        waiter.acquire()
 
460
 
 
461
        tp = threadpool.ThreadPool(0, 1)
 
462
        tp.callInThread(waiter.release) # before start()
 
463
        tp.start()
 
464
 
 
465
        try:
 
466
            self._waitForLock(waiter)
 
467
        finally:
 
468
            tp.stop()
 
469
 
 
470
 
 
471
    def test_dispatchDeprecation(self):
 
472
        """
 
473
        Test for the deprecation of the dispatch method.
 
474
        """
 
475
        tp = threadpool.ThreadPool()
 
476
        tp.start()
 
477
        self.addCleanup(tp.stop)
 
478
 
 
479
        def cb():
 
480
            return tp.dispatch(None, lambda: None)
 
481
 
 
482
        self.assertWarns(DeprecationWarning,
 
483
                         "dispatch() is deprecated since Twisted 8.0, "
 
484
                         "use callInThread() instead",
 
485
                         __file__, cb)
 
486
 
 
487
 
 
488
    def test_dispatchWithCallbackDeprecation(self):
 
489
        """
 
490
        Test for the deprecation of the dispatchWithCallback method.
 
491
        """
 
492
        tp = threadpool.ThreadPool()
 
493
        tp.start()
 
494
        self.addCleanup(tp.stop)
 
495
 
 
496
        def cb():
 
497
            return tp.dispatchWithCallback(
 
498
                None,
 
499
                lambda x: None,
 
500
                lambda x: None,
 
501
                lambda: None)
 
502
 
 
503
        self.assertWarns(DeprecationWarning,
 
504
                     "dispatchWithCallback() is deprecated since Twisted 8.0, "
 
505
                     "use twisted.internet.threads.deferToThread() instead.",
 
506
                     __file__, cb)
 
507
 
 
508
 
 
509
 
 
510
class RaceConditionTestCase(unittest.TestCase):
 
511
    def setUp(self):
 
512
        self.event = threading.Event()
 
513
        self.threadpool = threadpool.ThreadPool(0, 10)
 
514
        self.threadpool.start()
 
515
 
 
516
 
 
517
    def tearDown(self):
 
518
        del self.event
 
519
        self.threadpool.stop()
 
520
        del self.threadpool
 
521
 
 
522
 
 
523
    def test_synchronization(self):
 
524
        """
 
525
        Test a race condition: ensure that actions run in the pool synchronize
 
526
        with actions run in the main thread.
 
527
        """
 
528
        timeout = self.getTimeout()
 
529
        self.threadpool.callInThread(self.event.set)
 
530
        self.event.wait(timeout)
 
531
        self.event.clear()
 
532
        for i in range(3):
 
533
            self.threadpool.callInThread(self.event.wait)
 
534
        self.threadpool.callInThread(self.event.set)
 
535
        self.event.wait(timeout)
 
536
        if not self.event.isSet():
 
537
            self.event.set()
 
538
            self.fail("Actions not synchronized")
 
539
 
 
540
 
 
541
    def test_singleThread(self):
 
542
        """
 
543
        The submission of a new job to a thread pool in response to the
 
544
        C{onResult} callback does not cause a new thread to be added to the
 
545
        thread pool.
 
546
 
 
547
        This requires that the thread which calls C{onResult} to have first
 
548
        marked itself as available so that when the new job is queued, that
 
549
        thread may be considered to run it.  This is desirable so that when
 
550
        only N jobs are ever being executed in the thread pool at once only
 
551
        N threads will ever be created.
 
552
        """
 
553
        # Ensure no threads running
 
554
        self.assertEquals(self.threadpool.workers, 0)
 
555
 
 
556
        loopDeferred = Deferred()
 
557
 
 
558
        def onResult(success, counter):
 
559
            reactor.callFromThread(submit, counter)
 
560
 
 
561
        def submit(counter):
 
562
            if counter:
 
563
                self.threadpool.callInThreadWithCallback(
 
564
                    onResult, lambda: counter - 1)
 
565
            else:
 
566
                loopDeferred.callback(None)
 
567
 
 
568
        def cbLoop(ignored):
 
569
            # Ensure there is only one thread running.
 
570
            self.assertEqual(self.threadpool.workers, 1)
 
571
 
 
572
        loopDeferred.addCallback(cbLoop)
 
573
        submit(10)
 
574
        return loopDeferred
 
575
 
 
576
 
 
577
 
 
578
if interfaces.IReactorThreads(reactor, None) is None:
 
579
    for cls in ThreadPoolTestCase, RaceConditionTestCase:
 
580
        setattr(cls, 'skip', "No thread support, nothing to test here")
 
581
else:
 
582
    import threading
 
583
    from twisted.python import threadpool