1
# Copyright (c) Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
from twisted.internet import _threadedselect
6
_threadedselect.install()
8
from twisted.internet.defer import Deferred
9
from twisted.python.failure import Failure
10
from twisted.internet import reactor
11
from twisted.python.runtime import seconds
12
from itertools import count
13
from Queue import Queue, Empty
15
class TwistedManager(object):
17
self.twistedQueue = Queue()
22
# get a unique identifier
23
return self.key.next()
27
reactor.interleave(self.twistedQueue.put)
29
def _stopIterating(self, value, key):
    """Record *value* as the finished result for *key*.

    This is the callback side of the blocking helpers: ``getDeferred``
    attaches it to a deferred with ``addBoth`` and it is also registered as
    an after-shutdown system event trigger.  ``iterate`` keeps running queued
    callbacks until *key* shows up in ``self.results``, so storing the value
    here is what lets that loop finish.

    value -- whatever the caller wants handed back for *key* (for a deferred
             this is its result or failure).
    key   -- the identifier the waiting ``iterate`` call is looking for.
    """
    self.results[key] = value
35
reactor.addSystemEventTrigger('after', 'shutdown',
36
self._stopIterating, True, key)
40
def getDeferred(self, d):
    """Block until deferred *d* fires and return its result.

    A fresh key is drawn from ``self.key``, ``_stopIterating`` is attached
    to *d* with ``addBoth`` so that both success and failure are recorded
    under that key, and ``iterate`` is then run until the key has a result.

    d -- the deferred to wait on.

    Returns the value *d* fired with.  If *d* fired with a ``Failure``, the
    failure's exception is re-raised instead of being returned.
    """
    # Each wait needs its own identifier so concurrent results in
    # self.results cannot be confused with one another.
    key = next(self.key)
    d.addBoth(self._stopIterating, key)
    res = self.iterate(key)
    if isinstance(res, Failure):
        # Surface the asynchronous error to the blocking caller.
        res.raiseException()
    return res
49
def poll(self, noLongerThan=1.0):
    """Run pending reactor callbacks for up to *noLongerThan* seconds.

    Callables are taken from ``self.twistedQueue`` without blocking and
    invoked in the order they were queued.  The call returns as soon as the
    queue is empty, or once more than *noLongerThan* seconds have passed
    since it started.  The time limit is only checked between callbacks, so
    a single slow callback can overrun it.

    noLongerThan -- time budget in seconds (default 1.0).

    Returns None.
    """
    base = seconds()
    try:
        while (seconds() - base) <= noLongerThan:
            callback = self.twistedQueue.get_nowait()
            callback()
    except Empty:
        # Queue drained before the time budget ran out; nothing left to run.
        pass
59
def iterate(self, key=None):
    """Run queued reactor callbacks until a result for *key* is available.

    While *key* is absent from ``self.results`` this blocks on
    ``self.twistedQueue.get()`` and invokes each callable it receives; one
    of those calls is expected to end up storing the result (see
    ``_stopIterating``).  If *key* is already present, nothing is dequeued.

    key -- identifier to wait for.  The default ``None`` is treated like any
           other key, i.e. the loop waits for ``None`` to appear in
           ``self.results``.

    Returns the stored result, removing it from ``self.results``.
    """
    while key not in self.results:
        callback = self.twistedQueue.get()
        # Without running the callback the reactor makes no progress and
        # the awaited key could never be filled in.
        callback()
    return self.results.pop(key)
66
def fakeDeferred(msg):
    """Return a deferred that fires with *msg* roughly two seconds from now.

    A callback is scheduled on the reactor with ``callLater(2, ...)``; when
    it runs it prints a notice and fires the deferred with *msg*.

    msg -- the value the returned deferred is called back with.
    """
    d = Deferred()

    def cb():
        print("deferred called back")
        d.callback(msg)

    reactor.callLater(2, cb)
    return d
75
print "twisted is still running"
81
print "setting up a 1sec callback"
82
reactor.callLater(1, fakeCallback)
83
print "getting a deferred"
84
res = m.getDeferred(fakeDeferred("got it!"))
85
print "got the deferred:", res
91
if __name__ == '__main__':