~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/internet/threadedselectreactor.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.test.test_internet -*-
2
 
# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
3
 
#
4
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
5
 
# See LICENSE for details.
6
 
 
7
 
from __future__ import generators
8
 
 
9
 
"""Threaded select reactor
10
 
 
11
 
API Stability: unstable
12
 
 
13
 
Maintainer: U{Bob Ippolito<mailto:bob@redivi.com>}
14
 
 
15
 
 
16
 
The threadedselectreactor is a specialized reactor for integrating with
17
 
arbitrary foreign event loop, such as those you find in GUI toolkits.
18
 
 
19
 
There are three things you'll need to do to use this reactor.
20
 
 
21
 
Install the reactor at the beginning of your program, before importing
22
 
the rest of Twisted::
23
 
 
24
 
    | from twisted.internet import threadedselectreactor
25
 
    | threadedselectreactor.install()
26
 
 
27
 
Interleave this reactor with your foreign event loop, at some point after
28
 
your event loop is initialized::
29
 
    
30
 
    | from twisted.internet import reactor
31
 
    | reactor.interleave(foreignEventLoopWakerFunction)
32
 
    | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
33
 
 
34
 
Instead of shutting down the foreign event loop directly, shut down the
35
 
reactor::
36
 
    
37
 
    | from twisted.internet import reactor
38
 
    | reactor.stop()
39
 
 
40
 
In order for Twisted to do its work in the main thread (the thread that
41
 
interleave is called from), a waker function is necessary.  The waker function
42
 
will be called from a "background" thread with one argument: func.
43
 
The waker function's purpose is to call func() from the main thread.
44
 
Many GUI toolkits ship with appropriate waker functions.
45
 
Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
46
 
older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
47
 
These would be used in place of "foreignEventLoopWakerFunction" in the above
48
 
example.
49
 
 
50
 
The other integration point at which the foreign event loop and this reactor
51
 
must integrate is shutdown.  In order to ensure clean shutdown of Twisted,
52
 
you must allow for Twisted to come to a complete stop before quitting the
53
 
application.  Typically, you will do this by setting up an after shutdown
54
 
trigger to stop your foreign event loop, and call reactor.stop() where you
55
 
would normally have initiated the shutdown procedure for the foreign event
56
 
loop.  Shutdown functions that could be used in place of 
57
 
"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance
58
 
with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
59
 
"""
60
 
 
61
 
from threading import Thread
62
 
from Queue import Queue, Empty
63
 
from time import sleep
64
 
import sys
65
 
 
66
 
from zope.interface import implements
67
 
 
68
 
from twisted.internet.interfaces import IReactorFDSet
69
 
from twisted.internet import error
70
 
from twisted.internet import posixbase
71
 
from twisted.python import log, components, failure, threadable
72
 
from twisted.persisted import styles
73
 
from twisted.python.runtime import platformType
74
 
 
75
 
import select
76
 
from errno import EINTR, EBADF
77
 
 
78
 
from twisted.internet.selectreactor import _select
79
 
 
80
 
# Exceptions that doSelect might return frequently
81
 
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
82
 
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
83
 
 
84
 
def dictRemove(dct, value):
85
 
    try:
86
 
        del dct[value]
87
 
    except KeyError:
88
 
        pass
89
 
 
90
 
def raiseException(e):
91
 
    raise e
92
 
 
93
 
class ThreadedSelectReactor(posixbase.PosixReactorBase):
94
 
    """A threaded select() based reactor - runs on all POSIX platforms and on
95
 
    Win32.
96
 
    """
97
 
    implements(IReactorFDSet)
98
 
 
99
 
    def __init__(self):
100
 
        threadable.init(1)
101
 
        self.reads = {}
102
 
        self.writes = {}
103
 
        self.toThreadQueue = Queue()
104
 
        self.toMainThread = Queue()
105
 
        self.workerThread = None
106
 
        self.mainWaker = None
107
 
        posixbase.PosixReactorBase.__init__(self)
108
 
        self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
109
 
 
110
 
    def wakeUp(self):
111
 
        # we want to wake up from any thread
112
 
        self.waker.wakeUp()
113
 
 
114
 
    def callLater(self, *args, **kw):
115
 
        tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
116
 
        self.wakeUp()
117
 
        return tple
118
 
    
119
 
    def _sendToMain(self, msg, *args):
120
 
        #print >>sys.stderr, 'sendToMain', msg, args
121
 
        self.toMainThread.put((msg, args))
122
 
        if self.mainWaker is not None:
123
 
            self.mainWaker()
124
 
 
125
 
    def _sendToThread(self, fn, *args):
126
 
        #print >>sys.stderr, 'sendToThread', fn, args
127
 
        self.toThreadQueue.put((fn, args))
128
 
    
129
 
    def _preenDescriptorsInThread(self):
130
 
        log.msg("Malformed file descriptor found.  Preening lists.")
131
 
        readers = self.reads.keys()
132
 
        writers = self.writes.keys()
133
 
        self.reads.clear()
134
 
        self.writes.clear()
135
 
        for selDict, selList in ((self.reads, readers), (self.writes, writers)):
136
 
            for selectable in selList:
137
 
                try:
138
 
                    select.select([selectable], [selectable], [selectable], 0)
139
 
                except:
140
 
                    log.msg("bad descriptor %s" % selectable)
141
 
                else:
142
 
                    selDict[selectable] = 1
143
 
 
144
 
    def _workerInThread(self):
145
 
        try:
146
 
            while 1:
147
 
                fn, args = self.toThreadQueue.get()
148
 
                #print >>sys.stderr, "worker got", fn, args
149
 
                fn(*args)
150
 
        except SystemExit:
151
 
            pass
152
 
        except:
153
 
            f = failure.Failure()
154
 
            self._sendToMain('Failure', f)
155
 
        #print >>sys.stderr, "worker finished"
156
 
    
157
 
    def _doSelectInThread(self, timeout):
158
 
        """Run one iteration of the I/O monitor loop.
159
 
 
160
 
        This will run all selectables who had input or output readiness
161
 
        waiting for them.
162
 
        """
163
 
        reads = self.reads
164
 
        writes = self.writes
165
 
        while 1:
166
 
            try:
167
 
                r, w, ignored = _select(reads.keys(),
168
 
                                        writes.keys(),
169
 
                                        [], timeout)
170
 
                break
171
 
            except ValueError, ve:
172
 
                # Possibly a file descriptor has gone negative?
173
 
                log.err()
174
 
                self._preenDescriptorsInThread()
175
 
            except TypeError, te:
176
 
                # Something *totally* invalid (object w/o fileno, non-integral
177
 
                # result) was passed
178
 
                log.err()
179
 
                self._preenDescriptorsInThread()
180
 
            except (select.error, IOError), se:
181
 
                # select(2) encountered an error
182
 
                if se.args[0] in (0, 2):
183
 
                    # windows does this if it got an empty list
184
 
                    if (not reads) and (not writes):
185
 
                        return
186
 
                    else:
187
 
                        raise
188
 
                elif se.args[0] == EINTR:
189
 
                    return
190
 
                elif se.args[0] == EBADF:
191
 
                    self._preenDescriptorsInThread()
192
 
                else:
193
 
                    # OK, I really don't know what's going on.  Blow up.
194
 
                    raise
195
 
        self._sendToMain('Notify', r, w)
196
 
        
197
 
    def _process_Notify(self, r, w):
198
 
        #print >>sys.stderr, "_process_Notify"
199
 
        reads = self.reads
200
 
        writes = self.writes
201
 
    
202
 
        _drdw = self._doReadOrWrite
203
 
        _logrun = log.callWithLogger
204
 
        for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
205
 
            for selectable in selectables:
206
 
                # if this was disconnected in another thread, kill it.
207
 
                if selectable not in dct:
208
 
                    continue
209
 
                # This for pausing input when we're not ready for more.
210
 
                _logrun(selectable, _drdw, selectable, method, dct)
211
 
        #print >>sys.stderr, "done _process_Notify"
212
 
 
213
 
    def _process_Failure(self, f):
214
 
        f.raiseException()
215
 
 
216
 
    _doIterationInThread = _doSelectInThread
217
 
 
218
 
    def ensureWorkerThread(self):
219
 
        if self.workerThread is None or not self.workerThread.isAlive():
220
 
            self.workerThread = Thread(target=self._workerInThread)
221
 
            self.workerThread.setDaemon(True)
222
 
            self.workerThread.start()
223
 
    
224
 
    def doThreadIteration(self, timeout):
225
 
        self._sendToThread(self._doIterationInThread, timeout)
226
 
        self.ensureWorkerThread()
227
 
        #print >>sys.stderr, 'getting...'
228
 
        msg, args = self.toMainThread.get()
229
 
        #print >>sys.stderr, 'got', msg, args
230
 
        getattr(self, '_process_' + msg)(*args)
231
 
    
232
 
    doIteration = doThreadIteration
233
 
 
234
 
    def mainLoopBegin(self):
235
 
        if self.running:
236
 
            self.runUntilCurrent()
237
 
 
238
 
    def _interleave(self):
239
 
        while self.running:
240
 
            #print >>sys.stderr, "runUntilCurrent"
241
 
            self.runUntilCurrent()
242
 
            t2 = self.timeout()
243
 
            t = self.running and t2
244
 
            self._sendToThread(self._doIterationInThread, t)
245
 
            #print >>sys.stderr, "yielding"
246
 
            yield None
247
 
            #print >>sys.stderr, "fetching"
248
 
            msg, args = self.toMainThread.get_nowait()
249
 
            getattr(self, '_process_' + msg)(*args)
250
 
 
251
 
    def interleave(self, waker, *args, **kw):
252
 
        """
253
 
        interleave(waker) interleaves this reactor with the
254
 
        current application by moving the blocking parts of
255
 
        the reactor (select() in this case) to a separate
256
 
        thread.  This is typically useful for integration with
257
 
        GUI applications which have their own event loop
258
 
        already running.
259
 
 
260
 
        See the module docstring for more information.
261
 
        """
262
 
        self.startRunning(*args, **kw)
263
 
        loop = self._interleave()
264
 
        def mainWaker(waker=waker, loop=loop):
265
 
            #print >>sys.stderr, "mainWaker()"
266
 
            waker(loop.next)
267
 
        self.mainWaker = mainWaker
268
 
        loop.next()
269
 
        self.ensureWorkerThread()
270
 
    
271
 
    def _mainLoopShutdown(self):
272
 
        self.mainWaker = None
273
 
        if self.workerThread is not None:
274
 
            #print >>sys.stderr, 'getting...'
275
 
            self._sendToThread(raiseException, SystemExit)
276
 
            self.wakeUp()
277
 
            try:
278
 
                while 1:
279
 
                    msg, args = self.toMainThread.get_nowait()
280
 
                    #print >>sys.stderr, "ignored:", (msg, args)
281
 
            except Empty:
282
 
                pass
283
 
            self.workerThread.join()
284
 
            self.workerThread = None
285
 
        try:
286
 
            while 1:
287
 
                fn, args = self.toThreadQueue.get_nowait()
288
 
                if fn is self._doIterationInThread:
289
 
                    log.msg('Iteration is still in the thread queue!')
290
 
                elif fn is raiseException and args[0] is SystemExit:
291
 
                    pass
292
 
                else:
293
 
                    fn(*args)
294
 
        except Empty:
295
 
            pass
296
 
 
297
 
    def _doReadOrWrite(self, selectable, method, dict):
298
 
        try:
299
 
            why = getattr(selectable, method)()
300
 
            handfn = getattr(selectable, 'fileno', None)
301
 
            if not handfn:
302
 
                why = _NO_FILENO
303
 
            elif handfn() == -1:
304
 
                why = _NO_FILEDESC
305
 
        except:
306
 
            why = sys.exc_info()[1]
307
 
            log.err()
308
 
        if why:
309
 
            self._disconnectSelectable(selectable, why, method == "doRead")
310
 
    
311
 
    def addReader(self, reader):
312
 
        """Add a FileDescriptor for notification of data available to read.
313
 
        """
314
 
        self._sendToThread(self.reads.__setitem__, reader, 1)
315
 
        self.wakeUp()
316
 
 
317
 
    def addWriter(self, writer):
318
 
        """Add a FileDescriptor for notification of data available to write.
319
 
        """
320
 
        self._sendToThread(self.writes.__setitem__, writer, 1)
321
 
        self.wakeUp()
322
 
 
323
 
    def removeReader(self, reader):
324
 
        """Remove a Selectable for notification of data available to read.
325
 
        """
326
 
        self._sendToThread(dictRemove, self.reads, reader)
327
 
 
328
 
    def removeWriter(self, writer):
329
 
        """Remove a Selectable for notification of data available to write.
330
 
        """
331
 
        self._sendToThread(dictRemove, self.writes, writer)
332
 
 
333
 
    def removeAll(self):
334
 
        return self._removeAll(self.reads, self.writes)
335
 
 
336
 
    def run(self, installSignalHandlers=1):
337
 
        self.startRunning(installSignalHandlers=installSignalHandlers)
338
 
        self.mainLoop()
339
 
 
340
 
    def mainLoop(self):
341
 
        q = Queue()
342
 
        self.interleave(q.put)
343
 
        while self.running:
344
 
            try:
345
 
                q.get()()
346
 
            except StopIteration:
347
 
                break
348
 
        
349
 
    
350
 
 
351
 
def install():
352
 
    """Configure the twisted mainloop to be run using the select() reactor.
353
 
    """
354
 
    reactor = ThreadedSelectReactor()
355
 
    from twisted.internet.main import installReactor
356
 
    installReactor(reactor)
357
 
    return reactor
358
 
 
359
 
__all__ = ['install']