~hudson-openstack/nova/trunk

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/internet/cfreactor.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
"""
 
6
This module provides support for Twisted to interact with CoreFoundation
 
7
CFRunLoops.  This includes Cocoa's NSRunLoop.
 
8
 
 
9
In order to use this support, simply do the following::
 
10
 
 
11
    |  from twisted.internet import cfreactor
 
12
    |  cfreactor.install()
 
13
 
 
14
Then use the twisted.internet APIs as usual.  The other methods here are not
 
15
intended to be called directly under normal use.  However, install can take
 
16
a runLoop kwarg, and run will take a withRunLoop arg if you need to explicitly
 
17
pass a CFRunLoop for some reason.  Otherwise it will make a pretty good guess
 
18
as to which runLoop you want (the current NSRunLoop if PyObjC is imported,
 
19
otherwise the current CFRunLoop.  Either way, if one doesn't exist, it will
 
20
be created).
 
21
 
 
22
Maintainer: Bob Ippolito
 
23
"""
 
24
 
 
25
__all__ = ['install']
 
26
 
 
27
import sys
 
28
 
 
29
# hints for py2app
 
30
import Carbon.CF
 
31
import traceback
 
32
 
 
33
import cfsupport as cf
 
34
 
 
35
from zope.interface import implements
 
36
 
 
37
from twisted.python import log, threadable, failure
 
38
from twisted.internet.interfaces import IReactorFDSet
 
39
from twisted.internet import posixbase, error
 
40
from weakref import WeakKeyDictionary
 
41
from Foundation import NSRunLoop
 
42
from AppKit import NSApp
 
43
 
 
44
# Cache Failure objects for the two extremely common "failures".  They are
# built once at import time from bare exception instances, so they carry no
# traceback info, and are looked up by exception class.
_faildict = dict(
    (excType, failure.Failure(excType()))
    for excType in (error.ConnectionDone, error.ConnectionLost)
)
 
49
 
 
50
class SelectableSocketWrapper(object):
    """
    Glue between one selectable object (something with C{fileno},
    C{doRead}, C{doWrite} and C{connectionLost}) and a C{PyCFSocket}
    registered on the reactor's run loop.

    Instances are normally obtained through
    L{socketWrapperForReactorAndObject}, which keeps one wrapper per
    wrapped object in a class-wide cache.

    While wrapped, the object's C{connectionLost} is replaced by
    L{objConnectionLost} so the wrapper can unregister itself before the
    original method is invoked.
    """

    # Maps wrapped object -> wrapper.  Shared by every instance.
    _objCache = WeakKeyDictionary()

    # The PyCFSocket driving this wrapper; None on the class (i.e. before
    # __init__ has run) and reset to None once the connection is lost.
    cf = None

    def socketWrapperForReactorAndObject(klass, reactor, obj):
        """
        Return the cached wrapper for C{obj}, creating and caching a new one
        if none exists yet.

        @param reactor: the reactor handed to a newly created wrapper.  It is
            ignored when a cached wrapper already exists.
        @param obj: the selectable object to wrap.
        @return: the L{SelectableSocketWrapper} for C{obj}.
        """
        cache = klass._objCache
        if obj in cache:
            return cache[obj]
        wrapper = cache[obj] = klass(reactor, obj)
        return wrapper
    socketWrapperForReactorAndObject = classmethod(socketWrapperForReactorAndObject)

    def __init__(self, reactor, obj):
        """
        Wrap C{obj} and register a new C{PyCFSocket} for its file descriptor
        on the reactor's run loop.

        @param reactor: object providing C{getRunLoop}, C{removeReader},
            C{removeWriter}, C{callLater}, C{simulate} and C{running}.
        @param obj: the selectable object to wrap.
        @raise ValueError: if this wrapper already has a C{cf} socket.
        """
        if self.cf:
            raise ValueError("This socket wrapper is already initialized")
        self.reactor = reactor
        self.obj = obj
        # Hook connectionLost so the wrapper is torn down together with the
        # object; the original bound method is stashed on the object itself.
        obj._orig_ssw_connectionLost = obj.connectionLost
        obj.connectionLost = self.objConnectionLost
        self.fd = obj.fileno()
        # reading/writing: whether the reactor currently wants events.
        # wouldRead/wouldWrite: an event arrived while it was not wanted and
        # is replayed by the next startReading()/startWriting().
        self.writing = False
        self.reading = False
        self.wouldRead = False
        self.wouldWrite = False
        self.cf = cf.PyCFSocket(obj.fileno(), self.doRead, self.doWrite, self.doConnect)
        self.cf.stopWriting()
        reactor.getRunLoop().addSocket(self.cf)

    def __repr__(self):
        return 'SSW(fd=%r r=%r w=%r x=%08x o=%08x)' % (self.fd, int(self.reading), int(self.writing), id(self), id(self.obj))

    def objConnectionLost(self, *args, **kwargs):
        """
        Replacement for the wrapped object's C{connectionLost}.

        Removes the object from the reactor's readers and writers, restores
        the original C{connectionLost} and calls it with the given arguments,
        then drops the cache entry and this wrapper's references to the
        object and the CFSocket.  The cleanup runs even when the original
        C{connectionLost} raises; the exception still propagates.
        """
        obj = self.obj
        self.reactor.removeReader(obj)
        self.reactor.removeWriter(obj)
        obj.connectionLost = obj._orig_ssw_connectionLost
        try:
            obj.connectionLost(*args, **kwargs)
        finally:
            try:
                del self._objCache[obj]
            except (KeyError, TypeError):
                # Not cached (wrapper was built directly) or the object
                # cannot be used as a weak key; nothing to forget.
                pass
            self.obj = None
            self.cf = None

    def doConnect(self, why):
        """
        Connect callback handed to C{PyCFSocket}; intentionally does nothing.
        """
        pass

    def startReading(self):
        """
        Enable read events, replaying a read that arrived while reading was
        disabled.  The replay is scheduled with C{callLater(0, ...)} when the
        reactor is not running and performed immediately otherwise.

        @return: C{self}
        """
        self.cf.startReading()
        self.reading = True
        if self.wouldRead:
            if not self.reactor.running:
                self.reactor.callLater(0, self.doRead)
            else:
                self.doRead()
            self.wouldRead = False
        return self

    def stopReading(self):
        """
        Disable read events and forget any pending replayed read.

        @return: C{self}
        """
        self.cf.stopReading()
        self.reading = False
        self.wouldRead = False
        return self

    def startWriting(self):
        """
        Enable write events, replaying a write that arrived while writing was
        disabled.  The replay is scheduled with C{callLater(0, ...)} when the
        reactor is not running and performed immediately otherwise.

        @return: C{self}
        """
        self.cf.startWriting()
        self.writing = True
        if self.wouldWrite:
            if not self.reactor.running:
                self.reactor.callLater(0, self.doWrite)
            else:
                self.doWrite()
            self.wouldWrite = False
        return self

    def stopWriting(self):
        """
        Disable write events and forget any pending replayed write.

        @return: C{self}, like the other start/stop methods.
        """
        self.cf.stopWriting()
        self.writing = False
        self.wouldWrite = False
        return self

    def _finishReadOrWrite(self, fn, faildict=_faildict):
        """
        Run one I/O handler and deal with its outcome.

        @param fn: zero-argument callable (the wrapped object's C{doRead} or
            C{doWrite}).  A true return value, or a raised exception, is
            treated as the reason the connection was lost.
        @param faildict: maps exception classes to pre-built failures that
            are reused instead of constructing a new one.
        """
        # Bare excepts are deliberate: this is the boundary where handler
        # errors are logged rather than propagated into the run loop.
        try:
            why = fn()
        except:
            why = sys.exc_info()[1]
            log.err()
        if why:
            try:
                f = faildict.get(why.__class__) or failure.Failure(why)
                self.objConnectionLost(f)
            except:
                log.err()
        if self.reactor.running:
            self.reactor.simulate()

    def doRead(self):
        """
        Read callback handed to C{PyCFSocket}.

        Does nothing once the wrapped object is gone.  If reading is
        currently disabled the event is remembered in C{wouldRead};
        otherwise the object's C{doRead} is run.
        """
        obj = self.obj
        if not obj:
            return
        if not self.reading:
            self.wouldRead = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doRead)

    def doWrite(self):
        """
        Write callback handed to C{PyCFSocket}.

        Does nothing once the wrapped object is gone.  If writing is
        currently disabled the event is remembered in C{wouldWrite};
        otherwise the object's C{doWrite} is run.
        """
        obj = self.obj
        if not obj:
            return
        if not self.writing:
            self.wouldWrite = True
            if self.reactor.running:
                self.reactor.simulate()
            return
        self._finishReadOrWrite(obj.doWrite)

    def __hash__(self):
        # Hash by the file descriptor captured at construction time.
        return hash(self.fd)
 
169
 
 
170
class CFReactor(posixbase.PosixReactorBase):
    """
    Reactor that runs on a CoreFoundation run loop.

    Readers and writers are wrapped in L{SelectableSocketWrapper} objects,
    and a run-loop timer whose callback is L{simulate} drives the reactor's
    timed calls.
    """
    implements(IReactorFDSet)

    # how long to poll if we don't care about signals
    longIntervalOfTime = 60.0

    # how long we should poll if we do care about signals
    shortIntervalOfTime = 1.0

    # don't set this
    pollInterval = longIntervalOfTime

    def __init__(self, runLoop=None):
        """
        @param runLoop: optional run loop object to use; when omitted the
            run loop is looked up lazily by L{getRunLoop}.
        """
        self.readers = {}       # reader -> SelectableSocketWrapper
        self.writers = {}       # writer -> SelectableSocketWrapper
        self.running = False
        self.crashing = False
        # When False, simulate() skips runUntilCurrent() and crashes the
        # reactor after a single pass (used by doIteration()).
        self._doRunUntilCurrent = True
        self.timer = None
        self.runLoop = None
        self.nsRunLoop = None
        self.didStartRunLoop = False
        if runLoop is not None:
            self.getRunLoop(runLoop)
        posixbase.PosixReactorBase.__init__(self)

    def getRunLoop(self, runLoop=None):
        """
        Return the C{PyCFRunLoop} used by this reactor, creating it on the
        first call.

        @param runLoop: run loop to adopt on the first call; defaults to
            C{NSRunLoop.currentRunLoop()}.  Ignored once a run loop has been
            chosen.
        """
        if self.runLoop is None:
            self.nsRunLoop = runLoop or NSRunLoop.currentRunLoop()
            self.runLoop = cf.PyCFRunLoop(self.nsRunLoop.getCFRunLoop())
        return self.runLoop

    def addReader(self, reader):
        """
        Start watching C{reader} for read events.
        """
        self.readers[reader] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, reader).startReading()

    def addWriter(self, writer):
        """
        Start watching C{writer} for write events.
        """
        self.writers[writer] = SelectableSocketWrapper.socketWrapperForReactorAndObject(self, writer).startWriting()

    def removeReader(self, reader):
        """
        Stop watching C{reader} for read events; unknown readers are ignored.
        """
        wrapped = self.readers.get(reader, None)
        if wrapped is not None:
            del self.readers[reader]
            wrapped.stopReading()

    def removeWriter(self, writer):
        """
        Stop watching C{writer} for write events; unknown writers are ignored.
        """
        wrapped = self.writers.get(writer, None)
        if wrapped is not None:
            del self.writers[writer]
            wrapped.stopWriting()

    def getReaders(self):
        """
        Return a list of the objects currently registered as readers.
        """
        return list(self.readers)

    def getWriters(self):
        """
        Return a list of the objects currently registered as writers.
        """
        return list(self.writers)

    def removeAll(self):
        """
        Stop watching every reader and writer.

        @return: a list of all the objects that were removed: the readers
            followed by those writers that were not also readers, each
            object listed once.
        """
        removed = list(self.readers)
        removed.extend([w for w in self.writers if w not in self.readers])
        for wrapper in self.readers.values():
            wrapper.stopReading()
        for wrapper in self.writers.values():
            wrapper.stopWriting()
        self.readers.clear()
        self.writers.clear()
        return removed

    def run(self, installSignalHandlers=1, withRunLoop=None):
        """
        Start the reactor.

        @param installSignalHandlers: passed on to C{startRunning}; when true
            the short poll interval is selected.
        @param withRunLoop: run loop passed to L{getRunLoop}.
        @raise ValueError: if the reactor is already running.
        """
        if self.running:
            raise ValueError("Reactor already running")
        if installSignalHandlers:
            self.pollInterval = self.shortIntervalOfTime
        runLoop = self.getRunLoop(withRunLoop)
        self._startup()

        self.startRunning(installSignalHandlers=installSignalHandlers)

        self.running = True
        if NSApp() is None and self.nsRunLoop.currentMode() is None:
            # Most of the time the NSRunLoop will have already started,
            # but in this case it wasn't.
            runLoop.run()
            # NOTE(review): these flags are only updated after runLoop.run()
            # has returned, so crash() sees didStartRunLoop == False during
            # the first run -- confirm this is intended.
            self.crashing = False
            self.didStartRunLoop = True

    def callLater(self, howlong, *args, **kwargs):
        """
        Schedule a call as in the base class, and pull the run-loop timer's
        next fire date forward if the timer would otherwise fire too late.

        @return: whatever the base class C{callLater} returns.
        """
        rval = posixbase.PosixReactorBase.callLater(self, howlong, *args, **kwargs)
        if self.timer:
            timeout = self.timeout()
            if timeout is None:
                timeout = howlong
            sleepUntil = cf.now() + min(timeout, howlong)
            if sleepUntil < self.timer.getNextFireDate():
                self.timer.setNextFireDate(sleepUntil)
        return rval

    def iterate(self, howlong=0.0):
        """
        Run pending timed calls, then perform one iteration.

        @raise ValueError: if the reactor is running.
        """
        if self.running:
            raise ValueError("Can't iterate a running reactor")
        self.runUntilCurrent()
        self.doIteration(howlong)

    def doIteration(self, howlong):
        """
        Run the reactor for a single L{simulate} pass.

        C{pollInterval} and C{_doRunUntilCurrent} are overridden for the
        duration of the call and restored afterwards, even if L{run} raises.

        @param howlong: poll interval to use; a false value means 0.01.
        @raise ValueError: if the reactor is running.
        """
        if self.running:
            raise ValueError("Can't iterate a running reactor")
        howlong = howlong or 0.01
        savedInterval = self.pollInterval
        self.pollInterval = howlong
        self._doRunUntilCurrent = False
        try:
            self.run()
        finally:
            self._doRunUntilCurrent = True
            self.pollInterval = savedInterval

    def simulate(self):
        """
        Timer callback: run due timed calls and re-arm the timer.

        The next fire date is set to now plus the smaller of C{pollInterval}
        and the time until the next timed call.  When C{_doRunUntilCurrent}
        is false the reactor is crashed after this single pass.

        @raise ValueError: if the reactor is neither running nor crashing.
        """
        if self.crashing:
            return
        if not self.running:
            raise ValueError("You can't simulate a stopped reactor")
        if self._doRunUntilCurrent:
            self.runUntilCurrent()
        # runUntilCurrent() may have crashed the reactor or dropped the timer.
        if self.crashing:
            return
        if self.timer is None:
            return
        nap = self.timeout()
        if nap is None:
            nap = self.pollInterval
        else:
            nap = min(self.pollInterval, nap)
        if self.running:
            self.timer.setNextFireDate(cf.now() + nap)
        if not self._doRunUntilCurrent:
            self.crash()

    def _startup(self):
        """
        Create the run-loop timer that calls L{simulate} and add it to the
        run loop.

        @raise ValueError: if the reactor is running.
        """
        if self.running:
            raise ValueError("Can't bootstrap a running reactor")
        self.timer = cf.PyCFRunLoopTimer(cf.now(), self.pollInterval, self.simulate)
        self.runLoop.addTimer(self.timer)

    def cleanup(self):
        """
        Shutdown hook; currently does nothing.
        """
        pass

    def sigInt(self, *args):
        """
        Schedule L{stop} to run from the reactor rather than calling it
        directly.
        """
        self.callLater(0.0, self.stop)

    def crash(self):
        """
        Crash the reactor: run the base class crash, remove the timer from
        the run loop, and stop the run loop if C{didStartRunLoop} is set.

        @raise ValueError: if the reactor is not running.
        """
        if not self.running:
            raise ValueError("Can't crash a stopped reactor")
        posixbase.PosixReactorBase.crash(self)
        self.crashing = True
        if self.timer is not None:
            self.runLoop.removeTimer(self.timer)
            self.timer = None
        if self.didStartRunLoop:
            self.runLoop.stop()

    def stop(self):
        """
        Stop the reactor via the base class.

        @raise ValueError: if the reactor is not running.
        """
        if not self.running:
            raise ValueError("Can't stop a stopped reactor")
        posixbase.PosixReactorBase.stop(self)
 
334
 
 
335
def install(runLoop=None):
    """
    Build a L{CFReactor} and install it as the twisted reactor.

    @param runLoop: optional run loop passed through to L{CFReactor}.
    @return: the newly installed reactor.
    """
    from twisted.internet import main

    cfReactor = CFReactor(runLoop=runLoop)
    # Have the reactor's cleanup hook run once shutdown has completed.
    cfReactor.addSystemEventTrigger('after', 'shutdown', cfReactor.cleanup)
    main.installReactor(cfReactor)
    return cfReactor