~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/protocols/pcp.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# -*- test-case-name: twisted.test.test_pcp -*-
2
 
#
3
 
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4
 
# See LICENSE for details.
5
 
 
6
 
 
7
 
"""Producer-Consumer Proxy."""
8
 
 
9
 
__version__ = '$Revision: 1.4 $'[11:-2]
10
 
 
11
 
import operator
12
 
 
13
 
from zope.interface import implements
14
 
 
15
 
from twisted.internet import interfaces
16
 
 
17
 
 
18
 
class BasicProducerConsumerProxy:
19
 
    """ I can act as a man in the middle between any Producer and Consumer.
20
 
 
21
 
    @ivar producer: the Producer I subscribe to.
22
 
    @type producer: L{IProducer<interfaces.IProducer>}
23
 
    @ivar consumer: the Consumer I publish to.
24
 
    @type consumer: L{IConsumer<interfaces.IConsumer>}
25
 
    @ivar paused: As a Producer, am I paused?
26
 
    @type paused: bool
27
 
    """
28
 
    implements(interfaces.IProducer, interfaces.IConsumer)
29
 
 
30
 
    consumer = None
31
 
    producer = None
32
 
    producerIsStreaming = None
33
 
    iAmStreaming = True
34
 
    outstandingPull = False
35
 
    paused = False
36
 
    stopped = False
37
 
 
38
 
    def __init__(self, consumer):
39
 
        self._buffer = []
40
 
        if consumer is not None:
41
 
            self.consumer = consumer
42
 
            consumer.registerProducer(self, self.iAmStreaming)
43
 
 
44
 
    # Producer methods:
45
 
 
46
 
    def pauseProducing(self):
47
 
        self.paused = True
48
 
        if self.producer:
49
 
            self.producer.pauseProducing()
50
 
 
51
 
    def resumeProducing(self):
52
 
        self.paused = False
53
 
        if self._buffer:
54
 
            # TODO: Check to see if consumer supports writeSeq.
55
 
            self.consumer.write(''.join(self._buffer))
56
 
            self._buffer[:] = []
57
 
        else:
58
 
            if not self.iAmStreaming:
59
 
                self.outstandingPull = True
60
 
 
61
 
        if self.producer is not None:
62
 
            self.producer.resumeProducing()
63
 
 
64
 
    def stopProducing(self):
65
 
        if self.producer is not None:
66
 
            self.producer.stopProducing()
67
 
        if self.consumer is not None:
68
 
            del self.consumer
69
 
 
70
 
    # Consumer methods:
71
 
 
72
 
    def write(self, data):
73
 
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
74
 
            # We could use that fifo queue here.
75
 
            self._buffer.append(data)
76
 
 
77
 
        elif self.consumer is not None:
78
 
            self.consumer.write(data)
79
 
            self.outstandingPull = False
80
 
 
81
 
    def finish(self):
82
 
        if self.consumer is not None:
83
 
            self.consumer.finish()
84
 
        self.unregisterProducer()
85
 
 
86
 
    def registerProducer(self, producer, streaming):
87
 
        self.producer = producer
88
 
        self.producerIsStreaming = streaming
89
 
 
90
 
    def unregisterProducer(self):
91
 
        if self.producer is not None:
92
 
            del self.producer
93
 
            del self.producerIsStreaming
94
 
        if self.consumer:
95
 
            self.consumer.unregisterProducer()
96
 
 
97
 
    def __repr__(self):
98
 
        return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
99
 
 
100
 
 
101
 
class ProducerConsumerProxy(BasicProducerConsumerProxy):
102
 
    """ProducerConsumerProxy with a finite buffer.
103
 
 
104
 
    When my buffer fills up, I have my parent Producer pause until my buffer
105
 
    has room in it again.
106
 
    """
107
 
    # Copies much from abstract.FileDescriptor
108
 
    bufferSize = 2**2**2**2
109
 
 
110
 
    producerPaused = False
111
 
    unregistered = False
112
 
 
113
 
    def pauseProducing(self):
114
 
        # Does *not* call up to ProducerConsumerProxy to relay the pause
115
 
        # message through to my parent Producer.
116
 
        self.paused = True
117
 
 
118
 
    def resumeProducing(self):
119
 
        self.paused = False
120
 
        if self._buffer:
121
 
            data = ''.join(self._buffer)
122
 
            bytesSent = self._writeSomeData(data)
123
 
            if bytesSent < len(data):
124
 
                unsent = data[bytesSent:]
125
 
                assert not self.iAmStreaming, (
126
 
                    "Streaming producer did not write all its data.")
127
 
                self._buffer[:] = [unsent]
128
 
            else:
129
 
                self._buffer[:] = []
130
 
        else:
131
 
            bytesSent = 0
132
 
 
133
 
        if (self.unregistered and bytesSent and not self._buffer and
134
 
            self.consumer is not None):
135
 
            self.consumer.unregisterProducer()
136
 
 
137
 
        if not self.iAmStreaming:
138
 
            self.outstandingPull = not bytesSent
139
 
 
140
 
        if self.producer is not None:
141
 
            bytesBuffered = reduce(operator.add,
142
 
                                   [len(s) for s in self._buffer], 0)
143
 
            # TODO: You can see here the potential for high and low
144
 
            # watermarks, where bufferSize would be the high mark when we
145
 
            # ask the upstream producer to pause, and we wouldn't have
146
 
            # it resume again until it hit the low mark.  Or if producer
147
 
            # is Pull, maybe we'd like to pull from it as much as necessary
148
 
            # to keep our buffer full to the low mark, so we're never caught
149
 
            # without something to send.
150
 
            if self.producerPaused and (bytesBuffered < self.bufferSize):
151
 
                # Now that our buffer is empty,
152
 
                self.producerPaused = False
153
 
                self.producer.resumeProducing()
154
 
            elif self.outstandingPull:
155
 
                # I did not have any data to write in response to a pull,
156
 
                # so I'd better pull some myself.
157
 
                self.producer.resumeProducing()
158
 
 
159
 
    def write(self, data):
160
 
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
161
 
            # We could use that fifo queue here.
162
 
            self._buffer.append(data)
163
 
 
164
 
        elif self.consumer is not None:
165
 
            assert not self._buffer, (
166
 
                "Writing fresh data to consumer before my buffer is empty!")
167
 
            # I'm going to use _writeSomeData here so that there is only one
168
 
            # path to self.consumer.write.  But it doesn't actually make sense,
169
 
            # if I am streaming, for some data to not be all data.  But maybe I
170
 
            # am not streaming, but I am writing here anyway, because there was
171
 
            # an earlier request for data which was not answered.
172
 
            bytesSent = self._writeSomeData(data)
173
 
            self.outstandingPull = False
174
 
            if not bytesSent == len(data):
175
 
                assert not self.iAmStreaming, (
176
 
                    "Streaming producer did not write all its data.")
177
 
                self._buffer.append(data[bytesSent:])
178
 
 
179
 
        if (self.producer is not None) and self.producerIsStreaming:
180
 
            bytesBuffered = reduce(operator.add,
181
 
                                   [len(s) for s in self._buffer], 0)
182
 
            if bytesBuffered >= self.bufferSize:
183
 
 
184
 
                self.producer.pauseProducing()
185
 
                self.producerPaused = True
186
 
 
187
 
    def registerProducer(self, producer, streaming):
188
 
        self.unregistered = False
189
 
        BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
190
 
        if not streaming:
191
 
            producer.resumeProducing()
192
 
 
193
 
    def unregisterProducer(self):
194
 
        if self.producer is not None:
195
 
            del self.producer
196
 
            del self.producerIsStreaming
197
 
        self.unregistered = True
198
 
        if self.consumer and not self._buffer:
199
 
            self.consumer.unregisterProducer()
200
 
 
201
 
    def _writeSomeData(self, data):
202
 
        """Write as much of this data as possible.
203
 
 
204
 
        @returns: The number of bytes written.
205
 
        """
206
 
        if self.consumer is None:
207
 
            return 0
208
 
        self.consumer.write(data)
209
 
        return len(data)