~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/test/test_threads.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2008 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for implementations of L{IReactorThreads}.
 
6
"""
 
7
 
 
8
__metaclass__ = type
 
9
 
 
10
from weakref import ref
 
11
import gc
 
12
 
 
13
from twisted.internet.test.reactormixins import ReactorBuilder
 
14
from twisted.python.threadpool import ThreadPool
 
15
 
 
16
 
 
17
class ThreadTestsBuilder(ReactorBuilder):
 
18
    """
 
19
    Builder for defining tests relating to L{IReactorThreads}.
 
20
    """
 
21
    def test_getThreadPool(self):
 
22
        """
 
23
        C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
 
24
        starts when C{reactor.run()} is called and stops before it returns.
 
25
        """
 
26
        state = []
 
27
        reactor = self.buildReactor()
 
28
 
 
29
        pool = reactor.getThreadPool()
 
30
        self.assertIsInstance(pool, ThreadPool)
 
31
        self.assertFalse(
 
32
            pool.started, "Pool should not start before reactor.run")
 
33
 
 
34
        def f():
 
35
            # Record the state for later assertions
 
36
            state.append(pool.started)
 
37
            state.append(pool.joined)
 
38
            reactor.stop()
 
39
 
 
40
        reactor.callWhenRunning(f)
 
41
        self.runReactor(reactor, 2)
 
42
 
 
43
        self.assertTrue(
 
44
            state[0], "Pool should start after reactor.run")
 
45
        self.assertFalse(
 
46
            state[1], "Pool should not be joined before reactor.stop")
 
47
        self.assertTrue(
 
48
            pool.joined,
 
49
            "Pool should be stopped after reactor.run returns")
 
50
 
 
51
 
 
52
    def test_suggestThreadPoolSize(self):
 
53
        """
 
54
        C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
 
55
        threadpool.
 
56
        """
 
57
        reactor = self.buildReactor()
 
58
        reactor.suggestThreadPoolSize(17)
 
59
        pool = reactor.getThreadPool()
 
60
        self.assertEqual(pool.max, 17)
 
61
 
 
62
 
 
63
    def test_delayedCallFromThread(self):
 
64
        """
 
65
        A function scheduled with L{IReactorThreads.callFromThread} invoked
 
66
        from a delayed call is run immediately in the next reactor iteration.
 
67
 
 
68
        When invoked from the reactor thread, previous implementations of
 
69
        L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
 
70
        up step, assuming the reactor would wake up on its own.  However, this
 
71
        resulted in the reactor not noticing a insert into the thread queue at
 
72
        the right time (in this case, after the thread queue has been processed
 
73
        for that reactor iteration).
 
74
        """
 
75
        reactor = self.buildReactor()
 
76
 
 
77
        def threadCall():
 
78
            reactor.stop()
 
79
 
 
80
        # Set up the use of callFromThread being tested.
 
81
        reactor.callLater(0, reactor.callFromThread, threadCall)
 
82
 
 
83
        before = reactor.seconds()
 
84
        self.runReactor(reactor, 60)
 
85
        after = reactor.seconds()
 
86
 
 
87
        # We specified a timeout of 60 seconds.  The timeout code in runReactor
 
88
        # probably won't actually work, though.  If the reactor comes out of
 
89
        # the event notification API just a little bit early, say after 59.9999
 
90
        # seconds instead of after 60 seconds, then the queued thread call will
 
91
        # get processed but the timeout delayed call runReactor sets up won't!
 
92
        # Then the reactor will stop and runReactor will return without the
 
93
        # timeout firing.  As it turns out, select() and poll() are quite
 
94
        # likely to return *slightly* earlier than we ask them to, so the
 
95
        # timeout will rarely happen, even if callFromThread is broken.  So,
 
96
        # instead we'll measure the elapsed time and make sure it's something
 
97
        # less than about half of the timeout we specified.  This is heuristic.
 
98
        # It assumes that select() won't ever return after 30 seconds when we
 
99
        # asked it to timeout after 60 seconds.  And of course like all
 
100
        # time-based tests, it's slightly non-deterministic.  If the OS doesn't
 
101
        # schedule this process for 30 seconds, then the test might fail even
 
102
        # if callFromThread is working.
 
103
        self.assertTrue(after - before < 30)
 
104
 
 
105
 
 
106
    def test_stopThreadPool(self):
 
107
        """
 
108
        When the reactor stops, L{ReactorBase._stopThreadPool} drops the
 
109
        reactor's direct reference to its internal threadpool and removes
 
110
        the associated startup and shutdown triggers.
 
111
 
 
112
        This is the case of the thread pool being created before the reactor
 
113
        is run.
 
114
        """
 
115
        reactor = self.buildReactor()
 
116
        threadpool = ref(reactor.getThreadPool())
 
117
        reactor.callWhenRunning(reactor.stop)
 
118
        self.runReactor(reactor)
 
119
        gc.collect()
 
120
        self.assertIdentical(threadpool(), None)
 
121
 
 
122
 
 
123
    def test_stopThreadPoolWhenStartedAfterReactorRan(self):
 
124
        """
 
125
        We must handle the case of shutting down the thread pool when it was
 
126
        started after the reactor was run in a special way.
 
127
 
 
128
        Some implementation background: The thread pool is started with
 
129
        callWhenRunning, which only returns a system trigger ID when it is
 
130
        invoked before the reactor is started.
 
131
 
 
132
        This is the case of the thread pool being created after the reactor
 
133
        is started.
 
134
        """
 
135
        reactor = self.buildReactor()
 
136
        threadPoolRefs = []
 
137
        def acquireThreadPool():
 
138
            threadPoolRefs.append(ref(reactor.getThreadPool()))
 
139
            reactor.stop()
 
140
        reactor.callWhenRunning(acquireThreadPool)
 
141
        self.runReactor(reactor)
 
142
        gc.collect()
 
143
        self.assertIdentical(threadPoolRefs[0](), None)
 
144
 
 
145
 
 
146
    def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
 
147
        """
 
148
        When the reactor has its shutdown event fired before it is run, the
 
149
        thread pool is completely destroyed.
 
150
 
 
151
        For what it's worth, the reason we support this behavior at all is
 
152
        because Trial does this.
 
153
 
 
154
        This is the case of the thread pool being created without the reactor
 
155
        being started at al.
 
156
        """
 
157
        reactor = self.buildReactor()
 
158
        threadPoolRef = ref(reactor.getThreadPool())
 
159
        reactor.fireSystemEvent("shutdown")
 
160
        self.assertIdentical(threadPoolRef(), None)
 
161
 
 
162
 
 
163
globals().update(ThreadTestsBuilder.makeTestCaseClasses())