1
# -*- test-case-name: twisted.test.test_threadpool -*-
2
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
3
# See LICENSE for details.
7
twisted.threadpool: a pool of threads to which we dispatch tasks.
9
In most cases you can just use reactor.callInThread and friends
10
instead of creating a thread pool directly.
22
from twisted.python import log, runtime, context, failure
29
This class (hopefully) generalizes the functionality of a pool of
30
threads to which work can be dispatched.
32
callInThread() and stop() should only be called from
33
a single thread, unless you make a subclass where stop() and
34
_startSomeWorkers() are synchronized.
43
threadFactory = threading.Thread
44
currentThread = staticmethod(threading.currentThread)
46
def __init__(self, minthreads=5, maxthreads=20, name=None):
48
Create a new threadpool.
50
@param minthreads: minimum number of threads in the pool
52
@param maxthreads: maximum number of threads in the pool
54
assert minthreads >= 0, 'minimum is negative'
55
assert minthreads <= maxthreads, 'minimum is greater than maximum'
56
self.q = Queue.Queue(0)
60
if runtime.platform.getType() != "java":
65
self.waiters = ThreadSafeList()
66
self.threads = ThreadSafeList()
67
self.working = ThreadSafeList()
78
def startAWorker(self):
80
name = "PoolThread-%s-%s" % (self.name or id(self), self.workers)
81
newThread = self.threadFactory(target=self._worker, name=name)
82
self.threads.append(newThread)
85
def stopAWorker(self):
86
self.q.put(WorkerStop)
89
def __setstate__(self, state):
91
ThreadPool.__init__(self, self.min, self.max)
93
def __getstate__(self):
95
state['min'] = self.min
96
state['max'] = self.max
99
def _startSomeWorkers(self):
100
neededSize = self.q.qsize() + len(self.working)
101
# Create enough, but not too many
102
while self.workers < min(self.max, neededSize):
106
def dispatch(self, owner, func, *args, **kw):
    """
    DEPRECATED: use L{callInThread} instead.

    Dispatch a function to be run in a thread.

    @param owner: accepted for backward compatibility; this method never
        uses it.

    @param func: callable object to be called in separate thread

    @param *args: positional arguments to be passed to func

    @param **kw: keyword args to be passed to func
    """
    # stacklevel=2 makes the warning point at the caller of dispatch()
    # rather than at this line.
    warnings.warn("dispatch() is deprecated since Twisted 8.0, "
                  "use callInThread() instead",
                  DeprecationWarning, stacklevel=2)
    # Everything except the ignored owner is forwarded unchanged.
    self.callInThread(func, *args, **kw)
118
def callInThread(self, func, *args, **kw):
    """
    Call a callable object in a separate thread.

    This delegates to L{callInThreadWithCallback}, passing C{None} as the
    onResult callback.

    @param func: callable object to be called in separate thread

    @param *args: positional arguments to be passed to func

    @param **kw: keyword args to be passed to func
    """
    self.callInThreadWithCallback(None, func, *args, **kw)
131
def callInThreadWithCallback(self, onResult, func, *args, **kw):
133
Call a callable object in a separate thread and call onResult
134
with the return value, or a L{twisted.python.failure.Failure}
135
if the callable raises an exception.
137
The callable is allowed to block, but the onResult function
138
must not block and should perform as little work as possible.
140
A typical action for onResult for a threadpool used with a
141
Twisted reactor would be to schedule a Deferred to fire in the
142
main reactor thread using C{.callFromThread}. Note that
143
onResult is called inside the separate thread, not inside the
146
@param onResult: a callable with the signature (success, result).
147
If the callable returns normally, onResult is called with
148
(True, result) where result is the return value of the callable.
149
If the callable throws an exception, onResult is called with
152
Optionally, onResult may be None, in which case it is not
155
@param func: callable object to be called in separate thread
157
@param *args: positional arguments to be passed to func
159
@param **kw: keyword arguments to be passed to func
163
ctx = context.theContextTracker.currentContext().contexts[-1]
164
o = (ctx, func, args, kw, onResult)
167
self._startSomeWorkers()
170
def _runWithCallback(self, callback, errback, func, args, kwargs):
172
result = apply(func, args, kwargs)
174
errback(sys.exc_info()[1])
179
def dispatchWithCallback(self, owner, callback, errback, func, *args, **kw):
181
DEPRECATED: use L{twisted.internet.threads.deferToThread} instead.
183
Dispatch a function, returning the result to a callback function.
185
The callback function will be called in the thread - make sure it is
188
warnings.warn("dispatchWithCallback() is deprecated since Twisted 8.0, "
189
"use twisted.internet.threads.deferToThread() instead.",
190
DeprecationWarning, stacklevel=2)
192
self._runWithCallback, callback, errback, func, args, kw
198
Method used as target of the created threads: retrieve task to run
199
from the threadpool, run it, and proceed to the next task until
200
threadpool is stopped.
202
ct = self.currentThread()
204
while o is not WorkerStop:
205
self.working.append(ct)
206
ctx, function, args, kwargs, onResult = o
210
result = context.call(ctx, function, *args, **kwargs)
215
context.call(ctx, log.err)
218
result = failure.Failure()
220
del function, args, kwargs
222
self.working.remove(ct)
224
if onResult is not None:
226
context.call(ctx, onResult, success, result)
228
context.call(ctx, log.err)
230
del ctx, onResult, result
232
self.waiters.append(ct)
234
self.waiters.remove(ct)
236
self.threads.remove(ct)
240
Shutdown the threads in the threadpool.
243
threads = copy.copy(self.threads)
245
self.q.put(WorkerStop)
248
# and let's just make sure
249
# FIXME: threads that have died before calling stop() are not joined.
250
for thread in threads:
253
def adjustPoolsize(self, minthreads=None, maxthreads=None):
254
if minthreads is None:
255
minthreads = self.min
256
if maxthreads is None:
257
maxthreads = self.max
259
assert minthreads >= 0, 'minimum is negative'
260
assert minthreads <= maxthreads, 'minimum is greater than maximum'
262
self.min = minthreads
263
self.max = maxthreads
267
# Kill off some threads if we have too many.
268
while self.workers > self.max:
270
# Start some threads if we have too few.
271
while self.workers < self.min:
273
# Start some threads if there is a need.
274
self._startSomeWorkers()
277
log.msg('queue: %s' % self.q.queue)
278
log.msg('waiters: %s' % self.waiters)
279
log.msg('workers: %s' % self.working)
280
log.msg('total: %s' % self.threads)
283
class ThreadSafeList:
285
In Jython 2.1 lists aren't thread-safe, so this wraps it.
289
self.lock = threading.Lock()