1
# -*- test-case-name: twisted.test.test_pcp -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Producer-Consumer Proxy.
9
from zope.interface import implements
11
from twisted.internet import interfaces
14
class BasicProducerConsumerProxy:
16
I can act as a man in the middle between any Producer and Consumer.
18
@ivar producer: the Producer I subscribe to.
19
@type producer: L{IProducer<interfaces.IProducer>}
20
@ivar consumer: the Consumer I publish to.
21
@type consumer: L{IConsumer<interfaces.IConsumer>}
22
@ivar paused: As a Producer, am I paused?
25
implements(interfaces.IProducer, interfaces.IConsumer)
29
producerIsStreaming = None
31
outstandingPull = False
35
def __init__(self, consumer):
37
if consumer is not None:
38
self.consumer = consumer
39
consumer.registerProducer(self, self.iAmStreaming)
43
def pauseProducing(self):
46
self.producer.pauseProducing()
48
def resumeProducing(self):
51
# TODO: Check to see if consumer supports writeSeq.
52
self.consumer.write(''.join(self._buffer))
55
if not self.iAmStreaming:
56
self.outstandingPull = True
58
if self.producer is not None:
59
self.producer.resumeProducing()
61
def stopProducing(self):
62
if self.producer is not None:
63
self.producer.stopProducing()
64
if self.consumer is not None:
69
def write(self, data):
70
if self.paused or (not self.iAmStreaming and not self.outstandingPull):
71
# We could use that fifo queue here.
72
self._buffer.append(data)
74
elif self.consumer is not None:
75
self.consumer.write(data)
76
self.outstandingPull = False
79
if self.consumer is not None:
80
self.consumer.finish()
81
self.unregisterProducer()
83
def registerProducer(self, producer, streaming):
84
self.producer = producer
85
self.producerIsStreaming = streaming
87
def unregisterProducer(self):
88
if self.producer is not None:
90
del self.producerIsStreaming
92
self.consumer.unregisterProducer()
95
return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
98
class ProducerConsumerProxy(BasicProducerConsumerProxy):
99
"""ProducerConsumerProxy with a finite buffer.
101
When my buffer fills up, I have my parent Producer pause until my buffer
102
has room in it again.
104
# Copies much from abstract.FileDescriptor
105
bufferSize = 2**2**2**2
107
producerPaused = False
110
def pauseProducing(self):
111
# Does *not* call up to ProducerConsumerProxy to relay the pause
112
# message through to my parent Producer.
115
def resumeProducing(self):
118
data = ''.join(self._buffer)
119
bytesSent = self._writeSomeData(data)
120
if bytesSent < len(data):
121
unsent = data[bytesSent:]
122
assert not self.iAmStreaming, (
123
"Streaming producer did not write all its data.")
124
self._buffer[:] = [unsent]
130
if (self.unregistered and bytesSent and not self._buffer and
131
self.consumer is not None):
132
self.consumer.unregisterProducer()
134
if not self.iAmStreaming:
135
self.outstandingPull = not bytesSent
137
if self.producer is not None:
138
bytesBuffered = sum([len(s) for s in self._buffer])
139
# TODO: You can see here the potential for high and low
140
# watermarks, where bufferSize would be the high mark when we
141
# ask the upstream producer to pause, and we wouldn't have
142
# it resume again until it hit the low mark. Or if producer
143
# is Pull, maybe we'd like to pull from it as much as necessary
144
# to keep our buffer full to the low mark, so we're never caught
145
# without something to send.
146
if self.producerPaused and (bytesBuffered < self.bufferSize):
147
# Now that our buffer is empty,
148
self.producerPaused = False
149
self.producer.resumeProducing()
150
elif self.outstandingPull:
151
# I did not have any data to write in response to a pull,
152
# so I'd better pull some myself.
153
self.producer.resumeProducing()
155
def write(self, data):
156
if self.paused or (not self.iAmStreaming and not self.outstandingPull):
157
# We could use that fifo queue here.
158
self._buffer.append(data)
160
elif self.consumer is not None:
161
assert not self._buffer, (
162
"Writing fresh data to consumer before my buffer is empty!")
163
# I'm going to use _writeSomeData here so that there is only one
164
# path to self.consumer.write. But it doesn't actually make sense,
165
# if I am streaming, for some data to not be all data. But maybe I
166
# am not streaming, but I am writing here anyway, because there was
167
# an earlier request for data which was not answered.
168
bytesSent = self._writeSomeData(data)
169
self.outstandingPull = False
170
if not bytesSent == len(data):
171
assert not self.iAmStreaming, (
172
"Streaming producer did not write all its data.")
173
self._buffer.append(data[bytesSent:])
175
if (self.producer is not None) and self.producerIsStreaming:
176
bytesBuffered = sum([len(s) for s in self._buffer])
177
if bytesBuffered >= self.bufferSize:
179
self.producer.pauseProducing()
180
self.producerPaused = True
182
def registerProducer(self, producer, streaming):
183
self.unregistered = False
184
BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
186
producer.resumeProducing()
188
def unregisterProducer(self):
189
if self.producer is not None:
191
del self.producerIsStreaming
192
self.unregistered = True
193
if self.consumer and not self._buffer:
194
self.consumer.unregisterProducer()
196
def _writeSomeData(self, data):
197
"""Write as much of this data as possible.
199
@returns: The number of bytes written.
201
if self.consumer is None:
203
self.consumer.write(data)