1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
# Author: Clark Evans (cce@clarkevans.com)
11
Various stages for manipulating data flows, in particular, those stages which
12
take more than one input stages or alternative input, such as a callback.
17
from twisted.python.failure import Failure
21
flow equivalent to map: Map(function, stage, ... )
23
Apply a function to every item yielded and yield the results. If
24
additional stages are passed, the function must take that many arguments
25
and is applied to the items of all lists in parallel. If a list is shorter
26
than another, it is assumed to be extended with None items. If the
27
function is None, the identity function is assumed; if there are multiple
28
list arguments, Map stage returns a sequence consisting of tuples
29
containing the corresponding items from all lists.
36
source = flow.Map(fn,range(4))
39
def __init__(self, func, stage, *stages):
42
self._stage = [wrap(stage)]
44
self._stage.append(wrap(stage))
48
if self.results or self.stop or self.failure:
53
while self._index < len(self._stage):
55
curr = self._stage[idx]
56
instruction = curr._yield()
60
self._curr.append(curr.results.pop(0))
65
self._curr.append(None)
69
self.failure = curr.failure
71
raise AssertionError("flow.Map ; no results, stop or failure?")
75
curr = tuple(self._curr)
78
curr = self.func(*curr)
83
self.failure = Failure()
85
self.results.append(curr)
90
Zips two or more stages into a stream of N tuples
94
source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
98
def __init__(self, *stages):
99
Map.__init__(self, None, stages[0], *stages[1:])
101
class Concurrent(Stage):
103
Executes stages concurrently
105
This stage allows two or more stages (branches) to be executed at the same
106
time. It returns each stage as it becomes available. This can be used if
107
you have N callbacks, and you want to yield and wait for the first
108
available one that produces results. Once a stage is retuned, its next()
109
method should be used to extract the value for the stage.
112
class Instruction(CallLater):
113
def __init__(self, inst):
115
def callLater(self, callable):
116
for inst in self.inst:
117
inst.callLater(callable)
119
def __init__(self, *stages):
123
self._stages.append(wrap(stage))
126
if self.results or self.stop or self.failure:
128
stages = self._stages
132
if stages[0] is exit:
137
instruction = curr._yield()
139
self.results.append(curr)
141
self.failure = curr.failure
152
if isinstance(instruction, CallLater):
153
if instruction not in later:
154
later.append(instruction)
156
raise Unsupported(instruction)
158
return Concurrent.Instruction(later)
163
Merges two or more Stages results into a single stream
167
source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
170
def __init__(self, *stages):
172
self.concurrent = Concurrent(*stages)
175
if self.results or self.stop or self.failure:
177
instruction = self.concurrent._yield()
180
for stage in self.concurrent.results:
181
self.results.extend(stage.results)
183
self.concurrent.results = []
184
if self.concurrent.stop:
186
self.failure = self.concurrent.failure
188
class Callback(Stage):
190
Converts a single-thread push interface into a pull interface.
192
Once this stage is constructed, its result, errback, and finish member
193
variables may be called by a producer. The results of which can be
194
obtained by yielding the Callback and then calling next().
198
source = flow.Callback()
199
reactor.callLater(0, lambda: source.result("one"))
200
reactor.callLater(.5, lambda: source.result("two"))
201
reactor.callLater(1, lambda: source.finish())
205
# TODO: Potentially rename this 'Consumer' and make it
206
# comply with protocols.IConsumer
207
# TODO: Make the inverse stage, which is an IProducer
208
class Instruction(CallLater):
210
self.flow = lambda: True
211
def callLater(self, callable):
213
def __init__(self, *trap):
214
Stage.__init__(self, *trap)
215
self._finished = False
216
self._cooperate = Callback.Instruction()
217
def result(self,result):
218
""" called by the producer to indicate a successful result """
219
self.results.append(result)
220
self._cooperate.flow()
222
""" called by producer to indicate successful stream completion """
223
assert not self.failure, "failed streams should not be finished"
224
self._finished = True
225
self._cooperate.flow()
226
def errback(self, fail):
227
""" called by the producer in case of Failure """
229
self._cooperate.flow()
231
if self.results or self.stop or self.failure:
237
return self._cooperate