~canonical-livepatch-dependencies/canonical-livepatch-service-dependencies/twisted

« back to all changes in this revision

Viewing changes to docs/core/examples/threadedselect/blockingdemo.py

  • Committer: Free Ekanayaka
  • Date: 2016-07-01 12:22:33 UTC
  • Revision ID: free.ekanayaka@canonical.com-20160701122233-nh55w514zwzoz1ip
Tags: upstream-16.2.0
ImportĀ upstreamĀ versionĀ 16.2.0

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
 
 
5
from twisted.internet import _threadedselect
 
6
_threadedselect.install()
 
7
 
 
8
from twisted.internet.defer import Deferred
 
9
from twisted.python.failure import Failure
 
10
from twisted.internet import reactor
 
11
from twisted.python.runtime import seconds
 
12
from itertools import count
 
13
from Queue import Queue, Empty
 
14
 
 
15
class TwistedManager(object):
 
16
    def __init__(self):
 
17
        self.twistedQueue = Queue()
 
18
        self.key = count()
 
19
        self.results = {}
 
20
 
 
21
    def getKey(self):
 
22
        # get a unique identifier
 
23
        return self.key.next()
 
24
 
 
25
    def start(self):
 
26
        # start the reactor
 
27
        reactor.interleave(self.twistedQueue.put)
 
28
 
 
29
    def _stopIterating(self, value, key):
 
30
        self.results[key] = value
 
31
 
 
32
    def stop(self):
 
33
        # stop the reactor
 
34
        key = self.getKey()
 
35
        reactor.addSystemEventTrigger('after', 'shutdown',
 
36
            self._stopIterating, True, key)
 
37
        reactor.stop()
 
38
        self.iterate(key)
 
39
 
 
40
    def getDeferred(self, d):
 
41
        # get the result of a deferred or raise if it failed
 
42
        key = self.getKey()
 
43
        d.addBoth(self._stopIterating, key)
 
44
        res = self.iterate(key)
 
45
        if isinstance(res, Failure):
 
46
            res.raiseException()
 
47
        return res
 
48
    
 
49
    def poll(self, noLongerThan=1.0):
 
50
        # poll the reactor for up to noLongerThan seconds
 
51
        base = seconds()
 
52
        try:
 
53
            while (seconds() - base) <= noLongerThan:
 
54
                callback = self.twistedQueue.get_nowait()
 
55
                callback()
 
56
        except Empty:
 
57
            pass
 
58
    
 
59
    def iterate(self, key=None):
 
60
        # iterate the reactor until it has the result we're looking for
 
61
        while key not in self.results:
 
62
            callback = self.twistedQueue.get()
 
63
            callback()
 
64
        return self.results.pop(key)
 
65
 
 
66
def fakeDeferred(msg):
 
67
    d = Deferred()
 
68
    def cb():
 
69
        print "deferred called back"
 
70
        d.callback(msg)
 
71
    reactor.callLater(2, cb)
 
72
    return d
 
73
 
 
74
def fakeCallback():
 
75
    print "twisted is still running"
 
76
 
 
77
def main():
 
78
    m = TwistedManager()
 
79
    print "starting"
 
80
    m.start()
 
81
    print "setting up a 1sec callback"
 
82
    reactor.callLater(1, fakeCallback)
 
83
    print "getting a deferred"
 
84
    res = m.getDeferred(fakeDeferred("got it!"))
 
85
    print "got the deferred:", res
 
86
    print "stopping"
 
87
    m.stop()
 
88
    print "stopped"
 
89
 
 
90
 
 
91
if __name__ == '__main__':
 
92
    main()