~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/flow/pipe.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
 
# See LICENSE for details.
3
 
 
4
 
#
5
 
# Author: Clark Evans  (cce@clarkevans.com)
6
 
 
7
 
"""
8
 
flow.pipe
9
 
 
10
 
This contains various filter stages which have exactly one input stage.  These
11
 
stages take a single input and modify its results, ie a rewrite stage.
12
 
"""
13
 
 
14
 
from base import *
15
 
from wrap import wrap
16
 
from twisted.python.failure import Failure
17
 
 
18
 
class Pipe(Stage):
19
 
    """ abstract stage which takes a single input stage """
20
 
    def __init__(self, source, *trap):
21
 
        Stage.__init__(self, *trap)
22
 
        self._source = wrap(source)
23
 
 
24
 
    def _yield(self):
25
 
        while not self.results \
26
 
          and not self.stop \
27
 
          and not self.failure:
28
 
            source = self._source
29
 
            instruction = source._yield()
30
 
            if instruction:
31
 
                return instruction
32
 
            if source.failure:
33
 
                self.failure = source.failure
34
 
                return
35
 
            results = source.results
36
 
            stop = source.stop
37
 
            if stop:
38
 
                self.stop = True
39
 
            source.results = []
40
 
            self.process(results, stop)
41
 
 
42
 
    def process(self, results):
43
 
        """ process implemented by the pipe
44
 
 
45
 
            Take a set of possibly empty results and sets the member 
46
 
            variables: results, stop, or failure appropriately
47
 
        """
48
 
        raise NotImplementedError
49
 
 
50
 
class Filter(Pipe):
51
 
    """
52
 
    flow equivalent to filter:  Filter(function, source, ... )
53
 
 
54
 
    Yield those elements from a source stage for which a function returns true.
55
 
    If the function is None, the identity function is assumed, that is, all
56
 
    items yielded that are false (zero or empty) are discarded.
57
 
 
58
 
    For example::
59
 
 
60
 
        def odd(val):
61
 
            if val % 2:
62
 
                return True
63
 
 
64
 
        def range():
65
 
            yield 1
66
 
            yield 2
67
 
            yield 3
68
 
            yield 4
69
 
 
70
 
        source = flow.Filter(odd,range)
71
 
        printFlow(source)
72
 
    """
73
 
    def __init__(self, func, source, *trap):
74
 
        Pipe.__init__(self, source, *trap)
75
 
        self._func = func
76
 
 
77
 
    def process(self, results, stop):
78
 
        self.results.extend(filter(self._func,results))
79
 
 
80
 
class LineBreak(Pipe):
81
 
    """ pipe stage which breaks its input into lines """
82
 
    def __init__(self, source, *trap, **kwargs):
83
 
        Pipe.__init__(self, source, *trap)
84
 
        self._delimiter = kwargs.get('delimiter','\r\n')
85
 
        self._maxlen    = int(kwargs.get('maxlength', 16384))+1
86
 
        self._trailer   = int(kwargs.get('trailer',False))
87
 
        self._buffer    = []     
88
 
        self._currlen   = 0
89
 
 
90
 
    def process(self, results, stop):
91
 
        for block in results:
92
 
            lines = str(block).split(self._delimiter)
93
 
            if len(lines) < 2:
94
 
                tail = lines[0]
95
 
            else:
96
 
                tail = lines.pop()
97
 
                if self._buffer:
98
 
                    self._buffer.append(lines.pop(0))
99
 
                    self.results.append("".join(self._buffer))
100
 
                    self._buffer = []
101
 
                self.results.extend(lines) 
102
 
                self._currlen = 0
103
 
            if tail:
104
 
                self._currlen += len(tail)
105
 
                self._buffer.append(tail)
106
 
        if stop and self._buffer:
107
 
            tail = "".join(self._buffer)
108
 
            if self._trailer:
109
 
                self.results.append(tail)
110
 
            else:
111
 
                raise RuntimeError, "trailing data remains: '%s'" % tail[:10]
112