1
# -*- test-case-name: twisted.test.test_internet -*-
2
# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
4
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
5
# See LICENSE for details.
7
from __future__ import generators
9
"""Threaded select reactor
11
API Stability: unstable
13
Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
16
The threadedselectreactor is a specialized reactor for integrating with
17
arbitrary foreign event loop, such as those you find in GUI toolkits.
19
There are three things you'll need to do to use this reactor.
21
Install the reactor at the beginning of your program, before importing
24
| from twisted.internet import threadedselectreactor
25
| threadedselectreactor.install()
27
Interleave this reactor with your foreign event loop, at some point after
28
your event loop is initialized::
30
| from twisted.internet import reactor
31
| reactor.interleave(foreignEventLoopWakerFunction)
32
| self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
34
Instead of shutting down the foreign event loop directly, shut down the
37
| from twisted.internet import reactor
40
In order for Twisted to do its work in the main thread (the thread that
41
interleave is called from), a waker function is necessary. The waker function
42
will be called from a "background" thread with one argument: func.
43
The waker function's purpose is to call func() from the main thread.
44
Many GUI toolkits ship with appropriate waker functions.
45
Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
46
older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
47
These would be used in place of "foreignEventLoopWakerFunction" in the above
50
The other integration point at which the foreign event loop and this reactor
51
must integrate is shutdown. In order to ensure clean shutdown of Twisted,
52
you must allow for Twisted to come to a complete stop before quitting the
53
application. Typically, you will do this by setting up an after shutdown
54
trigger to stop your foreign event loop, and call reactor.stop() where you
55
would normally have initiated the shutdown procedure for the foreign event
56
loop. Shutdown functions that could be used in place of
57
"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance
58
with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
61
from threading import Thread
62
from Queue import Queue, Empty
63
from time import sleep
66
from zope.interface import implements
68
from twisted.internet.interfaces import IReactorFDSet
69
from twisted.internet import error
70
from twisted.internet import posixbase
71
from twisted.python import log, components, failure, threadable
72
from twisted.persisted import styles
73
from twisted.python.runtime import platformType
76
from errno import EINTR, EBADF
78
from twisted.internet.selectreactor import _select
80
# Exceptions that doSelect might return frequently
81
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
82
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
84
def dictRemove(dct, value):
90
def raiseException(e):
93
class ThreadedSelectReactor(posixbase.PosixReactorBase):
94
"""A threaded select() based reactor - runs on all POSIX platforms and on
97
implements(IReactorFDSet)
103
self.toThreadQueue = Queue()
104
self.toMainThread = Queue()
105
self.workerThread = None
106
self.mainWaker = None
107
posixbase.PosixReactorBase.__init__(self)
108
self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
111
# we want to wake up from any thread
114
def callLater(self, *args, **kw):
115
tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
119
def _sendToMain(self, msg, *args):
120
#print >>sys.stderr, 'sendToMain', msg, args
121
self.toMainThread.put((msg, args))
122
if self.mainWaker is not None:
125
def _sendToThread(self, fn, *args):
126
#print >>sys.stderr, 'sendToThread', fn, args
127
self.toThreadQueue.put((fn, args))
129
def _preenDescriptorsInThread(self):
130
log.msg("Malformed file descriptor found. Preening lists.")
131
readers = self.reads.keys()
132
writers = self.writes.keys()
135
for selDict, selList in ((self.reads, readers), (self.writes, writers)):
136
for selectable in selList:
138
select.select([selectable], [selectable], [selectable], 0)
140
log.msg("bad descriptor %s" % selectable)
142
selDict[selectable] = 1
144
def _workerInThread(self):
147
fn, args = self.toThreadQueue.get()
148
#print >>sys.stderr, "worker got", fn, args
153
f = failure.Failure()
154
self._sendToMain('Failure', f)
155
#print >>sys.stderr, "worker finished"
157
def _doSelectInThread(self, timeout):
158
"""Run one iteration of the I/O monitor loop.
160
This will run all selectables who had input or output readiness
167
r, w, ignored = _select(reads.keys(),
171
except ValueError, ve:
172
# Possibly a file descriptor has gone negative?
174
self._preenDescriptorsInThread()
175
except TypeError, te:
176
# Something *totally* invalid (object w/o fileno, non-integral
179
self._preenDescriptorsInThread()
180
except (select.error, IOError), se:
181
# select(2) encountered an error
182
if se.args[0] in (0, 2):
183
# windows does this if it got an empty list
184
if (not reads) and (not writes):
188
elif se.args[0] == EINTR:
190
elif se.args[0] == EBADF:
191
self._preenDescriptorsInThread()
193
# OK, I really don't know what's going on. Blow up.
195
self._sendToMain('Notify', r, w)
197
def _process_Notify(self, r, w):
198
#print >>sys.stderr, "_process_Notify"
202
_drdw = self._doReadOrWrite
203
_logrun = log.callWithLogger
204
for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
205
for selectable in selectables:
206
# if this was disconnected in another thread, kill it.
207
if selectable not in dct:
209
# This for pausing input when we're not ready for more.
210
_logrun(selectable, _drdw, selectable, method, dct)
211
#print >>sys.stderr, "done _process_Notify"
213
def _process_Failure(self, f):
216
_doIterationInThread = _doSelectInThread
218
def ensureWorkerThread(self):
219
if self.workerThread is None or not self.workerThread.isAlive():
220
self.workerThread = Thread(target=self._workerInThread)
221
self.workerThread.setDaemon(True)
222
self.workerThread.start()
224
def doThreadIteration(self, timeout):
225
self._sendToThread(self._doIterationInThread, timeout)
226
self.ensureWorkerThread()
227
#print >>sys.stderr, 'getting...'
228
msg, args = self.toMainThread.get()
229
#print >>sys.stderr, 'got', msg, args
230
getattr(self, '_process_' + msg)(*args)
232
doIteration = doThreadIteration
234
def mainLoopBegin(self):
236
self.runUntilCurrent()
238
def _interleave(self):
240
#print >>sys.stderr, "runUntilCurrent"
241
self.runUntilCurrent()
243
t = self.running and t2
244
self._sendToThread(self._doIterationInThread, t)
245
#print >>sys.stderr, "yielding"
247
#print >>sys.stderr, "fetching"
248
msg, args = self.toMainThread.get_nowait()
249
getattr(self, '_process_' + msg)(*args)
251
def interleave(self, waker, *args, **kw):
253
interleave(waker) interleaves this reactor with the
254
current application by moving the blocking parts of
255
the reactor (select() in this case) to a separate
256
thread. This is typically useful for integration with
257
GUI applications which have their own event loop
260
See the module docstring for more information.
262
self.startRunning(*args, **kw)
263
loop = self._interleave()
264
def mainWaker(waker=waker, loop=loop):
265
#print >>sys.stderr, "mainWaker()"
267
self.mainWaker = mainWaker
269
self.ensureWorkerThread()
271
def _mainLoopShutdown(self):
272
self.mainWaker = None
273
if self.workerThread is not None:
274
#print >>sys.stderr, 'getting...'
275
self._sendToThread(raiseException, SystemExit)
279
msg, args = self.toMainThread.get_nowait()
280
#print >>sys.stderr, "ignored:", (msg, args)
283
self.workerThread.join()
284
self.workerThread = None
287
fn, args = self.toThreadQueue.get_nowait()
288
if fn is self._doIterationInThread:
289
log.msg('Iteration is still in the thread queue!')
290
elif fn is raiseException and args[0] is SystemExit:
297
def _doReadOrWrite(self, selectable, method, dict):
299
why = getattr(selectable, method)()
300
handfn = getattr(selectable, 'fileno', None)
306
why = sys.exc_info()[1]
309
self._disconnectSelectable(selectable, why, method == "doRead")
311
def addReader(self, reader):
312
"""Add a FileDescriptor for notification of data available to read.
314
self._sendToThread(self.reads.__setitem__, reader, 1)
317
def addWriter(self, writer):
318
"""Add a FileDescriptor for notification of data available to write.
320
self._sendToThread(self.writes.__setitem__, writer, 1)
323
def removeReader(self, reader):
324
"""Remove a Selectable for notification of data available to read.
326
self._sendToThread(dictRemove, self.reads, reader)
328
def removeWriter(self, writer):
329
"""Remove a Selectable for notification of data available to write.
331
self._sendToThread(dictRemove, self.writes, writer)
334
return self._removeAll(self.reads, self.writes)
336
def run(self, installSignalHandlers=1):
337
self.startRunning(installSignalHandlers=installSignalHandlers)
342
self.interleave(q.put)
346
except StopIteration:
352
"""Configure the twisted mainloop to be run using the select() reactor.
354
reactor = ThreadedSelectReactor()
355
from twisted.internet.main import installReactor
356
installReactor(reactor)
359
__all__ = ['install']