~ntt-pf-lab/nova/monkey_patch_notification

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/python/threadpool.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_threadpool -*-
 
2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
"""
 
7
twisted.threadpool: a pool of threads to which we dispatch tasks.
 
8
 
 
9
In most cases you can just use reactor.callInThread and friends
 
10
instead of creating a thread pool directly.
 
11
"""
 
12
 
 
13
# System Imports
 
14
import Queue
 
15
import threading
 
16
import copy
 
17
import sys
 
18
import warnings
 
19
 
 
20
 
 
21
# Twisted Imports
 
22
from twisted.python import log, runtime, context, failure
 
23
 
 
24
WorkerStop = object()
 
25
 
 
26
 
 
27
class ThreadPool:
 
28
    """
 
29
    This class (hopefully) generalizes the functionality of a pool of
 
30
    threads to which work can be dispatched.
 
31
 
 
32
    callInThread() and stop() should only be called from
 
33
    a single thread, unless you make a subclass where stop() and
 
34
    _startSomeWorkers() are synchronized.
 
35
    """
 
36
    min = 5
 
37
    max = 20
 
38
    joined = False
 
39
    started = False
 
40
    workers = 0
 
41
    name = None
 
42
 
 
43
    threadFactory = threading.Thread
 
44
    currentThread = staticmethod(threading.currentThread)
 
45
 
 
46
    def __init__(self, minthreads=5, maxthreads=20, name=None):
 
47
        """
 
48
        Create a new threadpool.
 
49
 
 
50
        @param minthreads: minimum number of threads in the pool
 
51
 
 
52
        @param maxthreads: maximum number of threads in the pool
 
53
        """
 
54
        assert minthreads >= 0, 'minimum is negative'
 
55
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
 
56
        self.q = Queue.Queue(0)
 
57
        self.min = minthreads
 
58
        self.max = maxthreads
 
59
        self.name = name
 
60
        if runtime.platform.getType() != "java":
 
61
            self.waiters = []
 
62
            self.threads = []
 
63
            self.working = []
 
64
        else:
 
65
            self.waiters = ThreadSafeList()
 
66
            self.threads = ThreadSafeList()
 
67
            self.working = ThreadSafeList()
 
68
 
 
69
    def start(self):
 
70
        """
 
71
        Start the threadpool.
 
72
        """
 
73
        self.joined = False
 
74
        self.started = True
 
75
        # Start some threads.
 
76
        self.adjustPoolsize()
 
77
 
 
78
    def startAWorker(self):
 
79
        self.workers += 1
 
80
        name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
 
81
        newThread = self.threadFactory(target=self._worker, name=name)
 
82
        self.threads.append(newThread)
 
83
        newThread.start()
 
84
 
 
85
    def stopAWorker(self):
 
86
        self.q.put(WorkerStop)
 
87
        self.workers -= 1
 
88
 
 
89
    def __setstate__(self, state):
 
90
        self.__dict__ = state
 
91
        ThreadPool.__init__(self, self.min, self.max)
 
92
 
 
93
    def __getstate__(self):
 
94
        state = {}
 
95
        state['min'] = self.min
 
96
        state['max'] = self.max
 
97
        return state
 
98
 
 
99
    def _startSomeWorkers(self):
 
100
        neededSize = self.q.qsize() + len(self.working)
 
101
        # Create enough, but not too many
 
102
        while self.workers < min(self.max, neededSize):
 
103
            self.startAWorker()
 
104
 
 
105
 
 
106
    def dispatch(self, owner, func, *args, **kw):
 
107
        """
 
108
        DEPRECATED: use L{callInThread} instead.
 
109
 
 
110
        Dispatch a function to be a run in a thread.
 
111
        """
 
112
        warnings.warn("dispatch() is deprecated since Twisted 8.0, "
 
113
                      "use callInThread() instead",
 
114
                      DeprecationWarning, stacklevel=2)
 
115
        self.callInThread(func, *args, **kw)
 
116
 
 
117
 
 
118
    def callInThread(self, func, *args, **kw):
 
119
        """
 
120
        Call a callable object in a separate thread.
 
121
 
 
122
        @param func: callable object to be called in separate thread
 
123
 
 
124
        @param *args: positional arguments to be passed to func
 
125
 
 
126
        @param **kw: keyword args to be passed to func
 
127
        """
 
128
        self.callInThreadWithCallback(None, func, *args, **kw)
 
129
 
 
130
 
 
131
    def callInThreadWithCallback(self, onResult, func, *args, **kw):
 
132
        """
 
133
        Call a callable object in a separate thread and call onResult
 
134
        with the return value, or a L{twisted.python.failure.Failure}
 
135
        if the callable raises an exception.
 
136
 
 
137
        The callable is allowed to block, but the onResult function
 
138
        must not block and should perform as little work as possible.
 
139
 
 
140
        A typical action for onResult for a threadpool used with a
 
141
        Twisted reactor would be to schedule a Deferred to fire in the
 
142
        main reactor thread using C{.callFromThread}.  Note that
 
143
        onResult is called inside the separate thread, not inside the
 
144
        reactor thread.
 
145
 
 
146
        @param onResult: a callable with the signature (success, result).
 
147
            If the callable returns normally, onResult is called with
 
148
            (True, result) where result is the return value of the callable.
 
149
            If the callable throws an exception, onResult is called with
 
150
            (False, failure).
 
151
 
 
152
            Optionally, onResult may be None, in which case it is not
 
153
            called at all.
 
154
 
 
155
        @param func: callable object to be called in separate thread
 
156
 
 
157
        @param *args: positional arguments to be passed to func
 
158
 
 
159
        @param **kwargs: keyword arguments to be passed to func
 
160
        """
 
161
        if self.joined:
 
162
            return
 
163
        ctx = context.theContextTracker.currentContext().contexts[-1]
 
164
        o = (ctx, func, args, kw, onResult)
 
165
        self.q.put(o)
 
166
        if self.started:
 
167
            self._startSomeWorkers()
 
168
 
 
169
 
 
170
    def _runWithCallback(self, callback, errback, func, args, kwargs):
 
171
        try:
 
172
            result = apply(func, args, kwargs)
 
173
        except:
 
174
            errback(sys.exc_info()[1])
 
175
        else:
 
176
            callback(result)
 
177
 
 
178
 
 
179
    def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw):
 
180
        """
 
181
        DEPRECATED: use L{twisted.internet.threads.deferToThread} instead.
 
182
 
 
183
        Dispatch a function, returning the result to a callback function.
 
184
 
 
185
        The callback function will be called in the thread - make sure it is
 
186
        thread-safe.
 
187
        """
 
188
        warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, "
 
189
                      "use twisted.internet.threads.deferToThread() instead.",
 
190
                      DeprecationWarning, stacklevel=2)
 
191
        self.callInThread(
 
192
            self._runWithCallback, callback, errback, func, args, kw
 
193
        )
 
194
 
 
195
 
 
196
    def _worker(self):
 
197
        """
 
198
        Method used as target of the created threads: retrieve task to run
 
199
        from the threadpool, run it, and proceed to the next task until
 
200
        threadpool is stopped.
 
201
        """
 
202
        ct = self.currentThread()
 
203
        o = self.q.get()
 
204
        while o is not WorkerStop:
 
205
            self.working.append(ct)
 
206
            ctx, function, args, kwargs, onResult = o
 
207
            del o
 
208
 
 
209
            try:
 
210
                result = context.call(ctx, function, *args, **kwargs)
 
211
                success = True
 
212
            except:
 
213
                success = False
 
214
                if onResult is None:
 
215
                    context.call(ctx, log.err)
 
216
                    result = None
 
217
                else:
 
218
                    result = failure.Failure()
 
219
 
 
220
            del function, args, kwargs
 
221
 
 
222
            self.working.remove(ct)
 
223
 
 
224
            if onResult is not None:
 
225
                try:
 
226
                    context.call(ctx, onResult, success, result)
 
227
                except:
 
228
                    context.call(ctx, log.err)
 
229
 
 
230
            del ctx, onResult, result
 
231
 
 
232
            self.waiters.append(ct)
 
233
            o = self.q.get()
 
234
            self.waiters.remove(ct)
 
235
 
 
236
        self.threads.remove(ct)
 
237
 
 
238
    def stop(self):
 
239
        """
 
240
        Shutdown the threads in the threadpool.
 
241
        """
 
242
        self.joined = True
 
243
        threads = copy.copy(self.threads)
 
244
        while self.workers:
 
245
            self.q.put(WorkerStop)
 
246
            self.workers -= 1
 
247
 
 
248
        # and let's just make sure
 
249
        # FIXME: threads that have died before calling stop() are not joined.
 
250
        for thread in threads:
 
251
            thread.join()
 
252
 
 
253
    def adjustPoolsize(self, minthreads=None, maxthreads=None):
 
254
        if minthreads is None:
 
255
            minthreads = self.min
 
256
        if maxthreads is None:
 
257
            maxthreads = self.max
 
258
 
 
259
        assert minthreads >= 0, 'minimum is negative'
 
260
        assert minthreads <= maxthreads, 'minimum is greater than maximum'
 
261
 
 
262
        self.min = minthreads
 
263
        self.max = maxthreads
 
264
        if not self.started:
 
265
            return
 
266
 
 
267
        # Kill of some threads if we have too many.
 
268
        while self.workers > self.max:
 
269
            self.stopAWorker()
 
270
        # Start some threads if we have too few.
 
271
        while self.workers < self.min:
 
272
            self.startAWorker()
 
273
        # Start some threads if there is a need.
 
274
        self._startSomeWorkers()
 
275
 
 
276
    def dumpStats(self):
 
277
        log.msg('queue: %s'   % self.q.queue)
 
278
        log.msg('waiters: %s' % self.waiters)
 
279
        log.msg('workers: %s' % self.working)
 
280
        log.msg('total: %s'   % self.threads)
 
281
 
 
282
 
 
283
class ThreadSafeList:
 
284
    """
 
285
    In Jython 2.1 lists aren't thread-safe, so this wraps it.
 
286
    """
 
287
 
 
288
    def __init__(self):
 
289
        self.lock = threading.Lock()
 
290
        self.l = []
 
291
 
 
292
    def append(self, i):
 
293
        self.lock.acquire()
 
294
        try:
 
295
            self.l.append(i)
 
296
        finally:
 
297
            self.lock.release()
 
298
 
 
299
    def remove(self, i):
 
300
        self.lock.acquire()
 
301
        try:
 
302
            self.l.remove(i)
 
303
        finally:
 
304
            self.lock.release()
 
305
 
 
306
    def __len__(self):
 
307
        return len(self.l)
 
308