~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/flow/pipe.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
#
 
5
# Author: Clark Evans  (cce@clarkevans.com)
 
6
 
 
7
"""
 
8
flow.pipe
 
9
 
 
10
This contains various filter stages which have exactly one input stage.  These
 
11
stages take a single input and modify its results, ie a rewrite stage.
 
12
"""
 
13
 
 
14
from base import *
 
15
from wrap import wrap
 
16
from twisted.python.failure import Failure
 
17
 
 
18
class Pipe(Stage):
 
19
    """ abstract stage which takes a single input stage """
 
20
    def __init__(self, source, *trap):
 
21
        Stage.__init__(self, *trap)
 
22
        self._source = wrap(source)
 
23
 
 
24
    def _yield(self):
 
25
        while not self.results \
 
26
          and not self.stop \
 
27
          and not self.failure:
 
28
            source = self._source
 
29
            instruction = source._yield()
 
30
            if instruction:
 
31
                return instruction
 
32
            if source.failure:
 
33
                self.failure = source.failure
 
34
                return
 
35
            results = source.results
 
36
            stop = source.stop
 
37
            if stop:
 
38
                self.stop = True
 
39
            source.results = []
 
40
            self.process(results, stop)
 
41
 
 
42
    def process(self, results):
 
43
        """ process implemented by the pipe
 
44
 
 
45
            Take a set of possibly empty results and sets the member 
 
46
            variables: results, stop, or failure appropriately
 
47
        """
 
48
        raise NotImplementedError
 
49
 
 
50
class Filter(Pipe):
 
51
    """
 
52
    flow equivalent to filter:  Filter(function, source, ... )
 
53
 
 
54
    Yield those elements from a source stage for which a function returns true.
 
55
    If the function is None, the identity function is assumed, that is, all
 
56
    items yielded that are false (zero or empty) are discarded.
 
57
 
 
58
    For example::
 
59
 
 
60
        def odd(val):
 
61
            if val % 2:
 
62
                return True
 
63
 
 
64
        def range():
 
65
            yield 1
 
66
            yield 2
 
67
            yield 3
 
68
            yield 4
 
69
 
 
70
        source = flow.Filter(odd,range)
 
71
        printFlow(source)
 
72
    """
 
73
    def __init__(self, func, source, *trap):
 
74
        Pipe.__init__(self, source, *trap)
 
75
        self._func = func
 
76
 
 
77
    def process(self, results, stop):
 
78
        self.results.extend(filter(self._func,results))
 
79
 
 
80
class LineBreak(Pipe):
 
81
    """ pipe stage which breaks its input into lines """
 
82
    def __init__(self, source, *trap, **kwargs):
 
83
        Pipe.__init__(self, source, *trap)
 
84
        self._delimiter = kwargs.get('delimiter','\r\n')
 
85
        self._maxlen    = int(kwargs.get('maxlength', 16384))+1
 
86
        self._trailer   = int(kwargs.get('trailer',False))
 
87
        self._buffer    = []     
 
88
        self._currlen   = 0
 
89
 
 
90
    def process(self, results, stop):
 
91
        for block in results:
 
92
            lines = str(block).split(self._delimiter)
 
93
            if len(lines) < 2:
 
94
                tail = lines[0]
 
95
            else:
 
96
                tail = lines.pop()
 
97
                if self._buffer:
 
98
                    self._buffer.append(lines.pop(0))
 
99
                    self.results.append("".join(self._buffer))
 
100
                    self._buffer = []
 
101
                self.results.extend(lines) 
 
102
                self._currlen = 0
 
103
            if tail:
 
104
                self._currlen += len(tail)
 
105
                self._buffer.append(tail)
 
106
        if stop and self._buffer:
 
107
            tail = "".join(self._buffer)
 
108
            if self._trailer:
 
109
                self.results.append(tail)
 
110
            else:
 
111
                raise RuntimeError, "trailing data remains: '%s'" % tail[:10]
 
112