~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/flow/stage.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
#
 
5
# Author: Clark Evans  (cce@clarkevans.com)
 
6
#
 
7
 
 
8
"""
 
9
flow.stage
 
10
 
 
11
Various stages for manipulating data flows, in particular, those stages which
 
12
take more than one input stages or alternative input, such as a callback.
 
13
"""
 
14
 
 
15
from base import *
 
16
from wrap import wrap
 
17
from twisted.python.failure import Failure
 
18
 
 
19
class Map(Stage):
 
20
    """
 
21
    flow equivalent to map:  Map(function, stage, ... )
 
22
 
 
23
    Apply a function to every item yielded and yield the results.  If
 
24
    additional stages are passed, the function must take that many arguments
 
25
    and is applied to the items of all lists in parallel.  If a list is shorter
 
26
    than another, it is assumed to be extended with None items.  If the
 
27
    function is None, the identity function is assumed; if there are multiple
 
28
    list arguments, Map stage returns a sequence consisting of tuples
 
29
    containing the corresponding items from all lists.
 
30
 
 
31
    For example::
 
32
 
 
33
        def fn(val):
 
34
            return val + 10
 
35
 
 
36
        source = flow.Map(fn,range(4))
 
37
        printFlow(source)
 
38
    """
 
39
    def __init__(self, func, stage, *stages):
 
40
        Stage.__init__(self)
 
41
        self.func = func
 
42
        self._stage  = [wrap(stage)]
 
43
        for stage in stages:
 
44
            self._stage.append(wrap(stage))
 
45
        self._index  = 0
 
46
 
 
47
    def _yield(self):
 
48
        if self.results or self.stop or self.failure:
 
49
            return
 
50
        if not self._index:
 
51
            self._curr = []
 
52
            self._done = True
 
53
        while self._index < len(self._stage):
 
54
            idx = self._index
 
55
            curr = self._stage[idx]
 
56
            instruction = curr._yield()
 
57
            if instruction:
 
58
                return instruction
 
59
            if curr.results:
 
60
                self._curr.append(curr.results.pop(0))
 
61
                self._index += 1
 
62
                self._done = False
 
63
                continue
 
64
            if curr.stop:
 
65
                self._curr.append(None)
 
66
                self._index += 1
 
67
                continue
 
68
            if curr.failure:
 
69
                self.failure = curr.failure
 
70
                return
 
71
            raise AssertionError("flow.Map ; no results, stop or failure?")
 
72
        if self._done:
 
73
            self.stop = 1
 
74
            return
 
75
        curr = tuple(self._curr)
 
76
        if self.func:
 
77
            try:
 
78
                curr = self.func(*curr)
 
79
            except Failure, fail:
 
80
                self.failure = fail
 
81
                return
 
82
            except:
 
83
                self.failure = Failure()
 
84
                return
 
85
        self.results.append(curr)
 
86
        self._index  = 0
 
87
 
 
88
class Zip(Map):
 
89
    """
 
90
    Zips two or more stages into a stream of N tuples
 
91
 
 
92
    For example::
 
93
 
 
94
        source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
 
95
        printFlow(source)
 
96
 
 
97
    """
 
98
    def __init__(self, *stages):
 
99
        Map.__init__(self, None, stages[0], *stages[1:])
 
100
 
 
101
class Concurrent(Stage):
 
102
    """
 
103
    Executes stages concurrently
 
104
 
 
105
    This stage allows two or more stages (branches) to be executed at the same
 
106
    time.  It returns each stage as it becomes available.  This can be used if
 
107
    you have N callbacks, and you want to yield and wait for the first
 
108
    available one that produces results.  Once a stage is retuned, its next()
 
109
    method should be used to extract the value for the stage.
 
110
    """
 
111
 
 
112
    class Instruction(CallLater):
 
113
        def __init__(self, inst):
 
114
            self.inst = inst
 
115
        def callLater(self, callable):
 
116
            for inst in self.inst:
 
117
                inst.callLater(callable)
 
118
 
 
119
    def __init__(self, *stages):
 
120
        Stage.__init__(self)
 
121
        self._stages = []
 
122
        for stage in stages:
 
123
            self._stages.append(wrap(stage))
 
124
 
 
125
    def _yield(self):
 
126
        if self.results or self.stop or self.failure:
 
127
            return
 
128
        stages = self._stages
 
129
        later = []
 
130
        exit = None
 
131
        while stages:
 
132
            if stages[0] is exit:
 
133
                if self.results:
 
134
                    return
 
135
                break
 
136
            curr = stages.pop(0)
 
137
            instruction = curr._yield()
 
138
            if curr.results:
 
139
                self.results.append(curr)
 
140
            if curr.failure:
 
141
                self.failure = curr.failure
 
142
                return
 
143
            if curr.stop:
 
144
                exit = None
 
145
                if self.results:
 
146
                    return
 
147
                continue
 
148
            stages.append(curr)
 
149
            if not exit:
 
150
                exit = curr
 
151
            if instruction:
 
152
                if isinstance(instruction, CallLater):
 
153
                    if instruction not in later:
 
154
                        later.append(instruction)
 
155
                    continue
 
156
                raise Unsupported(instruction)
 
157
        if later:
 
158
            return Concurrent.Instruction(later)
 
159
        self.stop = True
 
160
 
 
161
class Merge(Stage):
 
162
    """
 
163
    Merges two or more Stages results into a single stream
 
164
 
 
165
    For example::
 
166
 
 
167
        source = flow.Zip([1,flow.Cooperate(),2,3],["one","two"])
 
168
        printFlow(source)
 
169
    """
 
170
    def __init__(self, *stages):
 
171
        Stage.__init__(self)
 
172
        self.concurrent = Concurrent(*stages)
 
173
 
 
174
    def _yield(self):
 
175
        if self.results or self.stop or self.failure:
 
176
            return
 
177
        instruction = self.concurrent._yield()
 
178
        if instruction: 
 
179
            return instruction
 
180
        for stage in self.concurrent.results:
 
181
            self.results.extend(stage.results)
 
182
            stage.results = []
 
183
        self.concurrent.results = []
 
184
        if self.concurrent.stop:
 
185
            self.stop = True
 
186
        self.failure =  self.concurrent.failure
 
187
 
 
188
class Callback(Stage):
 
189
    """
 
190
    Converts a single-thread push interface into a pull interface.
 
191
 
 
192
    Once this stage is constructed, its result, errback, and finish member
 
193
    variables may be called by a producer.  The results of which can be
 
194
    obtained by yielding the Callback and then calling next().
 
195
 
 
196
    For example::
 
197
 
 
198
        source = flow.Callback()
 
199
        reactor.callLater(0, lambda: source.result("one"))
 
200
        reactor.callLater(.5, lambda: source.result("two"))
 
201
        reactor.callLater(1, lambda: source.finish())
 
202
        printFlow(source)
 
203
 
 
204
    """
 
205
    # TODO: Potentially rename this 'Consumer' and make it
 
206
    #       comply with protocols.IConsumer
 
207
    # TODO: Make the inverse stage, which is an IProducer
 
208
    class Instruction(CallLater):
 
209
        def __init__(self):
 
210
            self.flow = lambda: True
 
211
        def callLater(self, callable):
 
212
            self.flow = callable
 
213
    def __init__(self, *trap):
 
214
        Stage.__init__(self, *trap)
 
215
        self._finished   = False
 
216
        self._cooperate  = Callback.Instruction()
 
217
    def result(self,result):
 
218
        """ called by the producer to indicate a successful result """
 
219
        self.results.append(result)
 
220
        self._cooperate.flow()
 
221
    def finish(self):
 
222
        """ called by producer to indicate successful stream completion """
 
223
        assert not self.failure, "failed streams should not be finished"
 
224
        self._finished = True
 
225
        self._cooperate.flow()
 
226
    def errback(self, fail):
 
227
        """ called by the producer in case of Failure """
 
228
        self.failure = fail
 
229
        self._cooperate.flow()
 
230
    def _yield(self):
 
231
        if self.results or self.stop or self.failure:
 
232
            return
 
233
        if not self.results: 
 
234
            if self._finished:
 
235
                self.stop = True
 
236
                return
 
237
            return self._cooperate
 
238
    __call__ = result
 
239