1
# -*- test-case-name: twisted.test.test_internet -*-
2
# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
4
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
5
# See LICENSE for details.
7
from __future__ import generators
10
Threaded select reactor
12
Maintainer: Bob Ippolito
15
The threadedselectreactor is a specialized reactor for integrating with
16
arbitrary foreign event loops, such as those you find in GUI toolkits.
18
There are three things you'll need to do to use this reactor.
20
Install the reactor at the beginning of your program, before importing
the rest of Twisted::
23
| from twisted.internet import _threadedselect
24
| _threadedselect.install()
26
Interleave this reactor with your foreign event loop, at some point after
27
your event loop is initialized::
29
| from twisted.internet import reactor
30
| reactor.interleave(foreignEventLoopWakerFunction)
31
| reactor.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
33
Instead of shutting down the foreign event loop directly, shut down the
reactor::
36
| from twisted.internet import reactor
| reactor.stop()
39
In order for Twisted to do its work in the main thread (the thread that
40
interleave is called from), a waker function is necessary. The waker function
41
will be called from a "background" thread with one argument: func.
42
The waker function's purpose is to call func() from the main thread.
43
Many GUI toolkits ship with appropriate waker functions.
44
Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
45
older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
46
These would be used in place of "foreignEventLoopWakerFunction" in the above
example.
49
The other integration point at which the foreign event loop and this reactor
50
must integrate is shutdown. In order to ensure clean shutdown of Twisted,
51
you must allow for Twisted to come to a complete stop before quitting the
52
application. Typically, you will do this by setting up an after shutdown
53
trigger to stop your foreign event loop, and call reactor.stop() where you
54
would normally have initiated the shutdown procedure for the foreign event
55
loop. Shutdown functions that could be used in place of
56
"foreignEventLoopStop" would be the ExitMainLoop method of the wxApp instance
57
with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
60
import select
import sys
from errno import EINTR, EBADF
from threading import Thread
from time import sleep
from Queue import Queue, Empty

from zope.interface import implements

from twisted.internet import error
from twisted.internet import posixbase
from twisted.internet.interfaces import IReactorFDSet
from twisted.internet.selectreactor import _select
from twisted.persisted import styles
from twisted.python import log, failure, threadable
from twisted.python.runtime import platformType
79
# Exceptions that doSelect might return frequently
# Both are built once at import time; each message names the condition it
# reports.  NOTE(review): presumably used by _doReadOrWrite as the disconnect
# reason when a selectable has no usable fileno() -- confirm.
80
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
81
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
83
def dictRemove(dct, value):
    """
    Delete the key C{value} from the dictionary C{dct}, in place.

    A key that is not present is ignored.

    NOTE(review): only the C{def} line survived in this copy of the file.  The
    body is restored from the call sites, which queue
    C{dictRemove(self.reads, reader)} for the worker thread -- confirm against
    the canonical source.

    @param dct: the dictionary to mutate.
    @param value: the key to remove.
    """
    try:
        del dct[value]
    except KeyError:
        # Unregistering something that is not registered is a no-op; letting
        # the error escape would end the worker thread's dispatch loop.
        pass
89
def raiseException(e):
    """
    Raise C{e}.

    This gives the reactor a plain callable it can queue for the worker
    thread; C{_mainLoopShutdown} queues C{raiseException(SystemExit)}.

    NOTE(review): only the C{def} line survived in this copy of the file; the
    body is restored from the function name and that call site -- confirm.

    @param e: an exception class or instance.
    """
    raise e
92
class ThreadedSelectReactor(posixbase.PosixReactorBase):
    """
    A threaded select() based reactor - runs on all POSIX platforms and on
    Windows.

    The blocking select() call is executed on a worker thread.  Work is handed
    to that thread through C{toThreadQueue}; results come back through
    C{toMainThread} and are dispatched to the matching C{_process_*} method.

    @ivar reads: selectables watched for readability.  Used as a set: every
        value stored is C{1}.
    @ivar writes: selectables watched for writability; same layout as
        C{reads}.
    @ivar toThreadQueue: queue of C{(fn, args)} pairs for the worker thread.
    @ivar toMainThread: queue of C{(msg, args)} pairs for the main thread.
    @ivar workerThread: the worker C{Thread}, or C{None} before it is started
        and after shutdown.
    @ivar mainWaker: callable invoked after a message has been queued for the
        main thread, or C{None} when no foreign loop is being interleaved.
    """
    implements(IReactorFDSet)

    def __init__(self):
        # NOTE(review): the `def` line and the next three statements were
        # missing from this copy of the file.  `reads`/`writes` must exist as
        # dicts because every other method uses them that way; the
        # threadable.init(1) call is presumed from the otherwise unused
        # `threadable` import -- confirm against the canonical source.
        threadable.init(1)
        self.reads = {}
        self.writes = {}
        self.toThreadQueue = Queue()
        self.toMainThread = Queue()
        self.workerThread = None
        self.mainWaker = None
        posixbase.PosixReactorBase.__init__(self)
        # Stop the worker thread and drain both queues once shutdown is done.
        self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
110
# we want to wake up from any thread
113
def callLater(self, *args, **kw):
    """
    Schedule a delayed call through L{posixbase.PosixReactorBase.callLater}
    and return whatever it returns.

    All arguments are passed through unchanged.
    """
    delayedCall = posixbase.PosixReactorBase.callLater(self, *args, **kw)
    # NOTE(review): both statements below were missing from this copy of the
    # file.  The wake-up is presumed necessary so that a select() already
    # blocking in the worker thread is interrupted and its timeout recomputed
    # for the new call -- confirm against the canonical source.
    self.wakeUp()
    return delayedCall
118
def _sendToMain(self, msg, *args):
    """
    Queue a message for the main thread and, if a waker is installed, call it.

    @param msg: name of the message; the main thread dispatches it to the
        method named C{'_process_' + msg}.
    @param args: positional arguments for that method.
    """
    self.toMainThread.put((msg, args))
    # Read the attribute once: _mainLoopShutdown sets it to None from another
    # thread, so re-reading it between the check and the call could end up
    # calling None.
    # NOTE(review): the call itself was missing from this copy of the file
    # (the `if` had no body) -- confirm against the canonical source.
    waker = self.mainWaker
    if waker is not None:
        waker()
124
def _sendToThread(self, fn, *args):
    """
    Queue the call C{fn(*args)} for the worker thread.

    @param fn: the callable to run on the worker thread.
    @param args: positional arguments for C{fn}.
    """
    self.toThreadQueue.put((fn, args))
128
def _preenDescriptorsInThread(self):
    """
    Rebuild C{reads} and C{writes}, keeping only selectables that select()
    accepts.

    Every registered selectable is probed on its own with a zero-timeout
    select(); those that make select() raise are logged and left out.
    """
    log.msg("Malformed file descriptor found. Preening lists.")
    # Snapshot the keys with list() so the copy survives the clear() below
    # even where keys() returns a live view rather than a list.
    readers = list(self.reads)
    writers = list(self.writes)
    # NOTE(review): the two clear() calls and the try/except/else skeleton
    # were missing from this copy of the file.  They are restored from the
    # surviving lines: the probe, the "bad descriptor" log and the re-insert
    # -- confirm against the canonical source.
    self.reads.clear()
    self.writes.clear()
    for selDict, selList in ((self.reads, readers), (self.writes, writers)):
        for selectable in selList:
            try:
                select.select([selectable], [selectable], [selectable], 0)
            except Exception:
                log.msg("bad descriptor %s" % selectable)
            else:
                selDict[selectable] = 1
143
def _workerInThread(self):
    """
    Body of the worker thread: run queued C{(fn, args)} jobs until stopped.

    A job that raises C{SystemExit} ends the loop quietly.  Any other
    exception also ends the loop, after being wrapped in a
    L{failure.Failure} and sent to the main thread as a C{'Failure'} message.

    NOTE(review): the C{try}/C{while} skeleton, the C{fn(*args)} call and both
    C{except} headers were missing from this copy of the file.  They are
    restored around the surviving statements -- confirm against the canonical
    source.
    """
    try:
        while 1:
            fn, args = self.toThreadQueue.get()
            fn(*args)
    except SystemExit:
        pass # exception indicates this thread should exit
    except:
        # Deliberately broad: whatever went wrong must be handed to the main
        # thread rather than dying unseen on this one.
        f = failure.Failure()
        self._sendToMain('Failure', f)
156
def _doSelectInThread(self, timeout):
    """
    Run one iteration of the I/O monitor loop.

    Calls select() on the current C{reads} and C{writes} and sends the ready
    selectables to the main thread as a C{'Notify'} message.  When select()
    rejects the descriptor sets, they are preened and the call is retried.

    NOTE(review): the retry loop, C{break}, the C{log.err()} calls and every
    C{return}/C{raise}/C{else} were missing from this copy of the file.  They
    are restored around the surviving C{except} headers and comments --
    confirm against the canonical source.  As restored, the two C{return}
    paths send nothing to the main thread; check that callers waiting on
    C{toMainThread} tolerate that.

    @param timeout: passed to select() as its timeout.
    """
    reads = self.reads
    writes = self.writes
    while 1:
        try:
            r, w, ignored = _select(reads.keys(),
                                    writes.keys(),
                                    [], timeout)
            break
        except ValueError:
            # Possibly a file descriptor has gone negative?
            log.err()
            self._preenDescriptorsInThread()
        except TypeError:
            # Something *totally* invalid (object w/o fileno, non-integral
            # result) was passed
            log.err()
            self._preenDescriptorsInThread()
        except (select.error, IOError):
            # select(2) encountered an error.  The exception is fetched via
            # sys.exc_info(), the same idiom _doReadOrWrite uses, so the
            # clause does not depend on version-specific binding syntax.
            se = sys.exc_info()[1]
            code = se.args[0]
            if code in (0, 2):
                # windows does this if it got an empty list
                if (not reads) and (not writes):
                    return
                raise
            elif code == EINTR:
                return
            elif code == EBADF:
                self._preenDescriptorsInThread()
            else:
                # OK, I really don't know what's going on. Blow up.
                raise
    self._sendToMain('Notify', r, w)
196
def _process_Notify(self, r, w):
    """
    Handle a C{'Notify'} message: dispatch I/O events to ready selectables.

    @param r: selectables reported readable; each gets C{doRead}.
    @param w: selectables reported writable; each gets C{doWrite}.
    """
    _drdw = self._doReadOrWrite
    _logrun = log.callWithLogger
    # NOTE(review): the bindings for the two registries and the `continue`
    # below were missing from this copy of the file; they are restored from
    # the names the surviving loop uses -- confirm.
    for selectables, method, dct in ((r, "doRead", self.reads),
                                     (w, "doWrite", self.writes)):
        for selectable in selectables:
            # if this was disconnected in another thread, kill it.
            if selectable not in dct:
                continue
            # This for pausing input when we're not ready for more.
            _logrun(selectable, _drdw, selectable, method, dct)
212
def _process_Failure(self, f):
    """
    Handle a C{'Failure'} message by re-raising the error in the calling
    thread.

    NOTE(review): only the C{def} line survived in this copy of the file.  The
    body assumes C{f} is the L{failure.Failure} built in C{_workerInThread}
    and that re-raising is the intended handling -- confirm.

    @param f: the failure captured on the worker thread.
    """
    f.raiseException()
215
# The unit of work queued for the worker thread on each iteration (see the
# _sendToThread(self._doIterationInThread, ...) calls) is one select() pass.
_doIterationInThread = _doSelectInThread
217
def ensureWorkerThread(self):
    """
    Start the worker thread unless one is already alive.

    A new C{Thread} running C{_workerInThread} is created when there is no
    worker yet or the previous one has exited.
    """
    worker = self.workerThread
    # NOTE(review): isAlive() is the Python 2 spelling and matches the rest
    # of this file (it imports the Python 2 `Queue` module); Python 3.9
    # removed it in favour of is_alive().
    if worker is not None and worker.isAlive():
        return
    self.workerThread = Thread(target=self._workerInThread)
    self.workerThread.start()
222
def doThreadIteration(self, timeout):
    """
    Run one reactor iteration, blocking until the worker thread answers.

    Queues one C{_doIterationInThread} pass, makes sure the worker thread is
    running, waits for a single message on C{toMainThread} and dispatches it
    to the method named C{'_process_' + msg}.

    @param timeout: forwarded to C{_doIterationInThread}.
    """
    self._sendToThread(self._doIterationInThread, timeout)
    self.ensureWorkerThread()
    msg, args = self.toMainThread.get()
    handler = getattr(self, '_process_' + msg)
    handler(*args)
230
# Publish the thread-backed iteration under the name doIteration.
# NOTE(review): presumably this is the hook the base reactor calls for each
# iteration -- confirm.
doIteration = doThreadIteration
232
def _interleave(self):
    """
    Generator that advances the reactor by one step each time it is resumed.

    Each step runs the pending calls, queues one select() pass for the worker
    thread and yields.  When resumed, it takes the worker's reply from
    C{toMainThread} without blocking and dispatches it to the method named
    C{'_process_' + msg}.

    NOTE(review): the C{while} header, the C{t2} assignment and the C{yield}
    were missing from this copy of the file.  They are restored from the
    surviving C{t = self.running and t2} line and the "yielding" marker --
    confirm against the canonical source.
    """
    while self.running:
        self.runUntilCurrent()
        t2 = self.timeout()
        # If runUntilCurrent() stopped the reactor, this evaluates to the
        # falsy `running` value instead of the computed timeout --
        # presumably so the final select() pass does not block.
        t = self.running and t2
        self._sendToThread(self._doIterationInThread, t)
        yield None
        msg, args = self.toMainThread.get_nowait()
        getattr(self, '_process_' + msg)(*args)
245
def interleave(self, waker, *args, **kw):
    """
    interleave(waker) interleaves this reactor with the
    current application by moving the blocking parts of
    the reactor (select() in this case) to a separate
    thread. This is typically useful for integration with
    GUI applications which have their own event loop
    already running.

    See the module docstring for more information.

    @param waker: callable taking one argument, a zero-argument function;
        it is invoked whenever the reactor has work for the main thread.
    @param args: forwarded to C{startRunning}.
    @param kw: forwarded to C{startRunning}.
    """
    self.startRunning(*args, **kw)
    loop = self._interleave()
    def mainWaker(waker=waker, loop=loop):
        # NOTE(review): this body and the priming loop.next() call below were
        # missing from this copy of the file; restored on the assumption that
        # the waker is handed the generator's step function -- confirm.
        waker(loop.next)
    self.mainWaker = mainWaker
    # Run the first step now so a select() pass is queued before the worker
    # thread starts.
    loop.next()
    self.ensureWorkerThread()
265
def _mainLoopShutdown(self):
    """
    Stop the worker thread and drain both queues.

    Registered in the constructor as an 'after shutdown' system event
    trigger.

    NOTE(review): the C{wakeUp()} call, both C{try}/C{while}/C{except Empty}
    skeletons and the final C{else: fn(*args)} branch were missing from this
    copy of the file.  They are restored around the surviving statements --
    confirm against the canonical source.
    """
    self.mainWaker = None
    if self.workerThread is not None:
        # Ask the worker loop to exit, then wake it in case it is blocked.
        self._sendToThread(raiseException, SystemExit)
        self.wakeUp()
        # Discard whatever the worker had already reported.
        try:
            while 1:
                msg, args = self.toMainThread.get_nowait()
        except Empty:
            pass
        self.workerThread.join()
        self.workerThread = None
    # Run whatever is still queued for the worker (e.g. registrations), but
    # not select() passes or stop requests.
    try:
        while 1:
            fn, args = self.toThreadQueue.get_nowait()
            if fn is self._doIterationInThread:
                log.msg('Iteration is still in the thread queue!')
            elif fn is raiseException and args[0] is SystemExit:
                pass
            else:
                fn(*args)
    except Empty:
        pass
291
def _doReadOrWrite(self, selectable, method, dict):
    """
    Call C{selectable.doRead()} or C{selectable.doWrite()} and disconnect the
    selectable if that yields a reason to.

    NOTE(review): the C{try}/C{except} skeleton, both fileno checks, the
    C{log.err()} call and the C{if why:} guard were missing from this copy of
    the file.  They are restored from the surviving statements and the
    module-level C{_NO_FILENO}/C{_NO_FILEDESC} constants -- confirm against
    the canonical source.

    @param selectable: the object to dispatch to.
    @param method: C{"doRead"} or C{"doWrite"}.
    @param dict: the registry the selectable came from; not used here.
    """
    try:
        why = getattr(selectable, method)()
        handfn = getattr(selectable, 'fileno', None)
        if not handfn:
            why = _NO_FILENO
        elif handfn() == -1:
            why = _NO_FILEDESC
    except:
        # Deliberately broad: any error from the handler becomes the
        # disconnect reason instead of propagating into the reactor loop.
        why = sys.exc_info()[1]
        log.err()
    if why:
        self._disconnectSelectable(selectable, why, method == "doRead")
305
def addReader(self, reader):
    """
    Add a FileDescriptor for notification of data available to read.

    The registration is queued for the worker thread rather than applied
    immediately.

    @param reader: the selectable to watch for readability.
    """
    self._sendToThread(self.reads.__setitem__, reader, 1)
    # NOTE(review): this call was missing from this copy of the file.
    # Presumably it interrupts a select() already blocking in the worker
    # thread so the queued registration is picked up -- confirm.
    self.wakeUp()
311
def addWriter(self, writer):
    """
    Add a FileDescriptor for notification of data available to write.

    The registration is queued for the worker thread rather than applied
    immediately.

    @param writer: the selectable to watch for writability.
    """
    self._sendToThread(self.writes.__setitem__, writer, 1)
    # NOTE(review): this call was missing from this copy of the file.
    # Presumably it interrupts a select() already blocking in the worker
    # thread so the queued registration is picked up -- confirm.
    self.wakeUp()
317
def removeReader(self, reader):
    """
    Remove a Selectable for notification of data available to read.

    The removal is queued for the worker thread rather than applied
    immediately.

    @param reader: the selectable to stop watching for readability.
    """
    self._sendToThread(dictRemove, self.reads, reader)
322
def removeWriter(self, writer):
    """
    Remove a Selectable for notification of data available to write.

    The removal is queued for the worker thread rather than applied
    immediately.

    @param writer: the selectable to stop watching for writability.
    """
    self._sendToThread(dictRemove, self.writes, writer)
328
def removeAll(self):
    """
    Hand both registries to C{self._removeAll} and return its result.

    NOTE(review): the C{def} line was missing from this copy of the file.  The
    name is presumed from the C{_removeAll} helper it delegates to and from
    the C{IReactorFDSet} interface the class declares -- confirm.
    """
    return self._removeAll(self.reads, self.writes)
331
def getReaders(self):
    """
    Return the selectables currently registered for read notification.
    """
    return self.reads.keys()
335
def getWriters(self):
    """
    Return the selectables currently registered for write notification.
    """
    return self.writes.keys()
339
def run(self, installSignalHandlers=1):
    """
    Start the reactor and run its main loop until it stops.

    @param installSignalHandlers: forwarded to C{startRunning}.
    """
    self.startRunning(installSignalHandlers=installSignalHandlers)
    # NOTE(review): this call and the `def mainLoop` line below were missing
    # from this copy of the file; the method name is an assumption -- confirm
    # against the canonical source.  Also note that interleave() calls
    # startRunning() again.
    self.mainLoop()

def mainLoop(self):
    """
    Drive the interleaved reactor from the calling thread.

    A private queue plays the part of the foreign event loop: its C{put} is
    the waker, and each function it receives is called here in turn.

    NOTE(review): the C{while}/C{try} skeleton and the C{q.get()()} call were
    missing from this copy of the file.  They are restored around the
    surviving C{self.interleave(q.put)} and C{except StopIteration:} lines --
    confirm.
    """
    q = Queue()
    self.interleave(q.put)
    while self.running:
        try:
            q.get()()
        except StopIteration:
            # The stepping generator is exhausted: the reactor has stopped.
            break
def install():
    """
    Configure the twisted mainloop to be run using the select() reactor.

    Creates a L{ThreadedSelectReactor} and installs it with
    C{twisted.internet.main.installReactor}.

    NOTE(review): the C{def} line and the C{return} were missing from this
    copy of the file.  The name comes from C{__all__}; returning the reactor
    is an assumption -- confirm against the canonical source.

    @return: the newly installed reactor.
    """
    reactor = ThreadedSelectReactor()
    # Imported here rather than at module level, as in the original.
    from twisted.internet.main import installReactor
    installReactor(reactor)
    return reactor
362
# install is the only name exported by `from <this module> import *`.
__all__ = ['install']