1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
# Author: Clark Evans (cce@clarkevans.com)
11
This implements the various flow controllers, that is, those things which run
17
from twisted.internet import defer
19
class Block(Controller,Stage):
21
A controller which blocks on Cooperate events
23
This converts a Stage into an iterable which can be used directly in python
24
for loops and other iteratable constructs. It does this by eating any
25
Cooperate values and sleeping. This is largely helpful for testing or
26
within a threaded environment. It converts other stages into one which
27
does not emit cooperate events, ie::
29
[1,2, Cooperate(), 3] => [1,2,3]
31
def __init__(self, stage, *trap):
33
self._stage = wrap(stage,*trap)
34
self.block = time.sleep
37
""" fetch the next value from the Stage flow """
40
result = stage._yield()
42
if isinstance(result, Cooperate):
43
if result.__class__ == Cooperate:
44
self.block(result.timeout)
46
raise Unsupported(result)
49
class Deferred(Controller, defer.Deferred):
51
wraps up a Stage with a Deferred interface
53
In this version, the results of the Stage are used to construct a list of
54
results and then sent to deferred. Further, in this version Cooperate is
55
implemented via reactor's callLater.
59
from twisted.internet import reactor
60
from twisted.flow import flow
63
d = flow.Deferred([1,2,3])
67
def __init__(self, stage, *trap):
68
defer.Deferred.__init__(self)
70
self._stage = wrap(stage, *trap)
73
def results(self, results):
74
self._results.extend(results)
76
def _execute(self, dummy = None):
81
self.results(cmd.results)
85
self.callback(self._results)
90
error = cmd.failure.check(*cmd._trap)
92
self._results.append(error)
94
self.errback(cmd.failure)
97
if isinstance(result, CallLater):
98
result.callLater(self._execute)
100
raise Unsupported(result)