~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/_threadedselect.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_internet -*-
 
2
# $Id: default.py,v 1.90 2004/01/06 22:35:22 warner Exp $
 
3
#
 
4
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
5
# See LICENSE for details.
 
6
 
 
7
from __future__ import generators
 
8
 
 
9
"""
 
10
Threaded select reactor
 
11
 
 
12
Maintainer: Bob Ippolito
 
13
 
 
14
 
 
15
The threadedselectreactor is a specialized reactor for integrating with
 
16
arbitrary foreign event loop, such as those you find in GUI toolkits.
 
17
 
 
18
There are three things you'll need to do to use this reactor.
 
19
 
 
20
Install the reactor at the beginning of your program, before importing
 
21
the rest of Twisted::
 
22
 
 
23
    | from twisted.internet import _threadedselect
 
24
    | _threadedselect.install()
 
25
 
 
26
Interleave this reactor with your foreign event loop, at some point after
 
27
your event loop is initialized::
 
28
 
 
29
    | from twisted.internet import reactor
 
30
    | reactor.interleave(foreignEventLoopWakerFunction)
 
31
    | self.addSystemEventTrigger('after', 'shutdown', foreignEventLoopStop)
 
32
 
 
33
Instead of shutting down the foreign event loop directly, shut down the
 
34
reactor::
 
35
 
 
36
    | from twisted.internet import reactor
 
37
    | reactor.stop()
 
38
 
 
39
In order for Twisted to do its work in the main thread (the thread that
 
40
interleave is called from), a waker function is necessary.  The waker function
 
41
will be called from a "background" thread with one argument: func.
 
42
The waker function's purpose is to call func() from the main thread.
 
43
Many GUI toolkits ship with appropriate waker functions.
 
44
Some examples of this are wxPython's wx.callAfter (may be wxCallAfter in
 
45
older versions of wxPython) or PyObjC's PyObjCTools.AppHelper.callAfter.
 
46
These would be used in place of "foreignEventLoopWakerFunction" in the above
 
47
example.
 
48
 
 
49
The other integration point at which the foreign event loop and this reactor
 
50
must integrate is shutdown.  In order to ensure clean shutdown of Twisted,
 
51
you must allow for Twisted to come to a complete stop before quitting the
 
52
application.  Typically, you will do this by setting up an after shutdown
 
53
trigger to stop your foreign event loop, and call reactor.stop() where you
 
54
would normally have initiated the shutdown procedure for the foreign event
 
55
loop.  Shutdown functions that could be used in place of
 
56
"foreignEventloopStop" would be the ExitMainLoop method of the wxApp instance
 
57
with wxPython, or the PyObjCTools.AppHelper.stopEventLoop function.
 
58
"""
 
59
 
 
60
from threading import Thread
 
61
from Queue import Queue, Empty
 
62
from time import sleep
 
63
import sys
 
64
 
 
65
from zope.interface import implements
 
66
 
 
67
from twisted.internet.interfaces import IReactorFDSet
 
68
from twisted.internet import error
 
69
from twisted.internet import posixbase
 
70
from twisted.python import log, failure, threadable
 
71
from twisted.persisted import styles
 
72
from twisted.python.runtime import platformType
 
73
 
 
74
import select
 
75
from errno import EINTR, EBADF
 
76
 
 
77
from twisted.internet.selectreactor import _select
 
78
 
 
79
# Exceptions that doSelect might return frequently
 
80
_NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
 
81
_NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
 
82
 
 
83
def dictRemove(dct, value):
 
84
    try:
 
85
        del dct[value]
 
86
    except KeyError:
 
87
        pass
 
88
 
 
89
def raiseException(e):
 
90
    raise e
 
91
 
 
92
class ThreadedSelectReactor(posixbase.PosixReactorBase):
 
93
    """A threaded select() based reactor - runs on all POSIX platforms and on
 
94
    Win32.
 
95
    """
 
96
    implements(IReactorFDSet)
 
97
 
 
98
    def __init__(self):
 
99
        threadable.init(1)
 
100
        self.reads = {}
 
101
        self.writes = {}
 
102
        self.toThreadQueue = Queue()
 
103
        self.toMainThread = Queue()
 
104
        self.workerThread = None
 
105
        self.mainWaker = None
 
106
        posixbase.PosixReactorBase.__init__(self)
 
107
        self.addSystemEventTrigger('after', 'shutdown', self._mainLoopShutdown)
 
108
 
 
109
    def wakeUp(self):
 
110
        # we want to wake up from any thread
 
111
        self.waker.wakeUp()
 
112
 
 
113
    def callLater(self, *args, **kw):
 
114
        tple = posixbase.PosixReactorBase.callLater(self, *args, **kw)
 
115
        self.wakeUp()
 
116
        return tple
 
117
 
 
118
    def _sendToMain(self, msg, *args):
 
119
        #print >>sys.stderr, 'sendToMain', msg, args
 
120
        self.toMainThread.put((msg, args))
 
121
        if self.mainWaker is not None:
 
122
            self.mainWaker()
 
123
 
 
124
    def _sendToThread(self, fn, *args):
 
125
        #print >>sys.stderr, 'sendToThread', fn, args
 
126
        self.toThreadQueue.put((fn, args))
 
127
 
 
128
    def _preenDescriptorsInThread(self):
 
129
        log.msg("Malformed file descriptor found.  Preening lists.")
 
130
        readers = self.reads.keys()
 
131
        writers = self.writes.keys()
 
132
        self.reads.clear()
 
133
        self.writes.clear()
 
134
        for selDict, selList in ((self.reads, readers), (self.writes, writers)):
 
135
            for selectable in selList:
 
136
                try:
 
137
                    select.select([selectable], [selectable], [selectable], 0)
 
138
                except:
 
139
                    log.msg("bad descriptor %s" % selectable)
 
140
                else:
 
141
                    selDict[selectable] = 1
 
142
 
 
143
    def _workerInThread(self):
 
144
        try:
 
145
            while 1:
 
146
                fn, args = self.toThreadQueue.get()
 
147
                #print >>sys.stderr, "worker got", fn, args
 
148
                fn(*args)
 
149
        except SystemExit:
 
150
            pass # exception indicates this thread should exit
 
151
        except:
 
152
            f = failure.Failure()
 
153
            self._sendToMain('Failure', f)
 
154
        #print >>sys.stderr, "worker finished"
 
155
 
 
156
    def _doSelectInThread(self, timeout):
 
157
        """Run one iteration of the I/O monitor loop.
 
158
 
 
159
        This will run all selectables who had input or output readiness
 
160
        waiting for them.
 
161
        """
 
162
        reads = self.reads
 
163
        writes = self.writes
 
164
        while 1:
 
165
            try:
 
166
                r, w, ignored = _select(reads.keys(),
 
167
                                        writes.keys(),
 
168
                                        [], timeout)
 
169
                break
 
170
            except ValueError, ve:
 
171
                # Possibly a file descriptor has gone negative?
 
172
                log.err()
 
173
                self._preenDescriptorsInThread()
 
174
            except TypeError, te:
 
175
                # Something *totally* invalid (object w/o fileno, non-integral
 
176
                # result) was passed
 
177
                log.err()
 
178
                self._preenDescriptorsInThread()
 
179
            except (select.error, IOError), se:
 
180
                # select(2) encountered an error
 
181
                if se.args[0] in (0, 2):
 
182
                    # windows does this if it got an empty list
 
183
                    if (not reads) and (not writes):
 
184
                        return
 
185
                    else:
 
186
                        raise
 
187
                elif se.args[0] == EINTR:
 
188
                    return
 
189
                elif se.args[0] == EBADF:
 
190
                    self._preenDescriptorsInThread()
 
191
                else:
 
192
                    # OK, I really don't know what's going on.  Blow up.
 
193
                    raise
 
194
        self._sendToMain('Notify', r, w)
 
195
 
 
196
    def _process_Notify(self, r, w):
 
197
        #print >>sys.stderr, "_process_Notify"
 
198
        reads = self.reads
 
199
        writes = self.writes
 
200
 
 
201
        _drdw = self._doReadOrWrite
 
202
        _logrun = log.callWithLogger
 
203
        for selectables, method, dct in ((r, "doRead", reads), (w, "doWrite", writes)):
 
204
            for selectable in selectables:
 
205
                # if this was disconnected in another thread, kill it.
 
206
                if selectable not in dct:
 
207
                    continue
 
208
                # This for pausing input when we're not ready for more.
 
209
                _logrun(selectable, _drdw, selectable, method, dct)
 
210
        #print >>sys.stderr, "done _process_Notify"
 
211
 
 
212
    def _process_Failure(self, f):
 
213
        f.raiseException()
 
214
 
 
215
    _doIterationInThread = _doSelectInThread
 
216
 
 
217
    def ensureWorkerThread(self):
 
218
        if self.workerThread is None or not self.workerThread.isAlive():
 
219
            self.workerThread = Thread(target=self._workerInThread)
 
220
            self.workerThread.start()
 
221
 
 
222
    def doThreadIteration(self, timeout):
 
223
        self._sendToThread(self._doIterationInThread, timeout)
 
224
        self.ensureWorkerThread()
 
225
        #print >>sys.stderr, 'getting...'
 
226
        msg, args = self.toMainThread.get()
 
227
        #print >>sys.stderr, 'got', msg, args
 
228
        getattr(self, '_process_' + msg)(*args)
 
229
 
 
230
    doIteration = doThreadIteration
 
231
 
 
232
    def _interleave(self):
 
233
        while self.running:
 
234
            #print >>sys.stderr, "runUntilCurrent"
 
235
            self.runUntilCurrent()
 
236
            t2 = self.timeout()
 
237
            t = self.running and t2
 
238
            self._sendToThread(self._doIterationInThread, t)
 
239
            #print >>sys.stderr, "yielding"
 
240
            yield None
 
241
            #print >>sys.stderr, "fetching"
 
242
            msg, args = self.toMainThread.get_nowait()
 
243
            getattr(self, '_process_' + msg)(*args)
 
244
 
 
245
    def interleave(self, waker, *args, **kw):
 
246
        """
 
247
        interleave(waker) interleaves this reactor with the
 
248
        current application by moving the blocking parts of
 
249
        the reactor (select() in this case) to a separate
 
250
        thread.  This is typically useful for integration with
 
251
        GUI applications which have their own event loop
 
252
        already running.
 
253
 
 
254
        See the module docstring for more information.
 
255
        """
 
256
        self.startRunning(*args, **kw)
 
257
        loop = self._interleave()
 
258
        def mainWaker(waker=waker, loop=loop):
 
259
            #print >>sys.stderr, "mainWaker()"
 
260
            waker(loop.next)
 
261
        self.mainWaker = mainWaker
 
262
        loop.next()
 
263
        self.ensureWorkerThread()
 
264
 
 
265
    def _mainLoopShutdown(self):
 
266
        self.mainWaker = None
 
267
        if self.workerThread is not None:
 
268
            #print >>sys.stderr, 'getting...'
 
269
            self._sendToThread(raiseException, SystemExit)
 
270
            self.wakeUp()
 
271
            try:
 
272
                while 1:
 
273
                    msg, args = self.toMainThread.get_nowait()
 
274
                    #print >>sys.stderr, "ignored:", (msg, args)
 
275
            except Empty:
 
276
                pass
 
277
            self.workerThread.join()
 
278
            self.workerThread = None
 
279
        try:
 
280
            while 1:
 
281
                fn, args = self.toThreadQueue.get_nowait()
 
282
                if fn is self._doIterationInThread:
 
283
                    log.msg('Iteration is still in the thread queue!')
 
284
                elif fn is raiseException and args[0] is SystemExit:
 
285
                    pass
 
286
                else:
 
287
                    fn(*args)
 
288
        except Empty:
 
289
            pass
 
290
 
 
291
    def _doReadOrWrite(self, selectable, method, dict):
 
292
        try:
 
293
            why = getattr(selectable, method)()
 
294
            handfn = getattr(selectable, 'fileno', None)
 
295
            if not handfn:
 
296
                why = _NO_FILENO
 
297
            elif handfn() == -1:
 
298
                why = _NO_FILEDESC
 
299
        except:
 
300
            why = sys.exc_info()[1]
 
301
            log.err()
 
302
        if why:
 
303
            self._disconnectSelectable(selectable, why, method == "doRead")
 
304
 
 
305
    def addReader(self, reader):
 
306
        """Add a FileDescriptor for notification of data available to read.
 
307
        """
 
308
        self._sendToThread(self.reads.__setitem__, reader, 1)
 
309
        self.wakeUp()
 
310
 
 
311
    def addWriter(self, writer):
 
312
        """Add a FileDescriptor for notification of data available to write.
 
313
        """
 
314
        self._sendToThread(self.writes.__setitem__, writer, 1)
 
315
        self.wakeUp()
 
316
 
 
317
    def removeReader(self, reader):
 
318
        """Remove a Selectable for notification of data available to read.
 
319
        """
 
320
        self._sendToThread(dictRemove, self.reads, reader)
 
321
 
 
322
    def removeWriter(self, writer):
 
323
        """Remove a Selectable for notification of data available to write.
 
324
        """
 
325
        self._sendToThread(dictRemove, self.writes, writer)
 
326
 
 
327
    def removeAll(self):
 
328
        return self._removeAll(self.reads, self.writes)
 
329
 
 
330
 
 
331
    def getReaders(self):
 
332
        return self.reads.keys()
 
333
 
 
334
 
 
335
    def getWriters(self):
 
336
        return self.writes.keys()
 
337
 
 
338
 
 
339
    def run(self, installSignalHandlers=1):
 
340
        self.startRunning(installSignalHandlers=installSignalHandlers)
 
341
        self.mainLoop()
 
342
 
 
343
    def mainLoop(self):
 
344
        q = Queue()
 
345
        self.interleave(q.put)
 
346
        while self.running:
 
347
            try:
 
348
                q.get()()
 
349
            except StopIteration:
 
350
                break
 
351
 
 
352
 
 
353
 
 
354
def install():
 
355
    """Configure the twisted mainloop to be run using the select() reactor.
 
356
    """
 
357
    reactor = ThreadedSelectReactor()
 
358
    from twisted.internet.main import installReactor
 
359
    installReactor(reactor)
 
360
    return reactor
 
361
 
 
362
__all__ = ['install']