1
# -*- test-case-name: twisted.test.test_pcp -*-
3
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
4
# See LICENSE for details.
7
"""Producer-Consumer Proxy."""
9
__version__ = '$Revision: 1.4 $'[11:-2]
13
from zope.interface import implements
15
from twisted.internet import interfaces
18
class BasicProducerConsumerProxy:
19
""" I can act as a man in the middle between any Producer and Consumer.
21
@ivar producer: the Producer I subscribe to.
22
@type producer: L{IProducer<interfaces.IProducer>}
23
@ivar consumer: the Consumer I publish to.
24
@type consumer: L{IConsumer<interfaces.IConsumer>}
25
@ivar paused: As a Producer, am I paused?
28
implements(interfaces.IProducer, interfaces.IConsumer)
32
producerIsStreaming = None
34
outstandingPull = False
38
def __init__(self, consumer):
40
if consumer is not None:
41
self.consumer = consumer
42
consumer.registerProducer(self, self.iAmStreaming)
46
def pauseProducing(self):
49
self.producer.pauseProducing()
51
def resumeProducing(self):
54
# TODO: Check to see if consumer supports writeSeq.
55
self.consumer.write(''.join(self._buffer))
58
if not self.iAmStreaming:
59
self.outstandingPull = True
61
if self.producer is not None:
62
self.producer.resumeProducing()
64
def stopProducing(self):
65
if self.producer is not None:
66
self.producer.stopProducing()
67
if self.consumer is not None:
72
def write(self, data):
73
if self.paused or (not self.iAmStreaming and not self.outstandingPull):
74
# We could use that fifo queue here.
75
self._buffer.append(data)
77
elif self.consumer is not None:
78
self.consumer.write(data)
79
self.outstandingPull = False
82
if self.consumer is not None:
83
self.consumer.finish()
84
self.unregisterProducer()
86
def registerProducer(self, producer, streaming):
87
self.producer = producer
88
self.producerIsStreaming = streaming
90
def unregisterProducer(self):
91
if self.producer is not None:
93
del self.producerIsStreaming
95
self.consumer.unregisterProducer()
98
return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
101
class ProducerConsumerProxy(BasicProducerConsumerProxy):
102
"""ProducerConsumerProxy with a finite buffer.
104
When my buffer fills up, I have my parent Producer pause until my buffer
105
has room in it again.
107
# Copies much from abstract.FileDescriptor
108
bufferSize = 2**2**2**2
110
producerPaused = False
113
def pauseProducing(self):
114
# Does *not* call up to ProducerConsumerProxy to relay the pause
115
# message through to my parent Producer.
118
def resumeProducing(self):
121
data = ''.join(self._buffer)
122
bytesSent = self._writeSomeData(data)
123
if bytesSent < len(data):
124
unsent = data[bytesSent:]
125
assert not self.iAmStreaming, (
126
"Streaming producer did not write all its data.")
127
self._buffer[:] = [unsent]
133
if (self.unregistered and bytesSent and not self._buffer and
134
self.consumer is not None):
135
self.consumer.unregisterProducer()
137
if not self.iAmStreaming:
138
self.outstandingPull = not bytesSent
140
if self.producer is not None:
141
bytesBuffered = reduce(operator.add,
142
[len(s) for s in self._buffer], 0)
143
# TODO: You can see here the potential for high and low
144
# watermarks, where bufferSize would be the high mark when we
145
# ask the upstream producer to pause, and we wouldn't have
146
# it resume again until it hit the low mark. Or if producer
147
# is Pull, maybe we'd like to pull from it as much as necessary
148
# to keep our buffer full to the low mark, so we're never caught
149
# without something to send.
150
if self.producerPaused and (bytesBuffered < self.bufferSize):
151
# Now that our buffer is empty,
152
self.producerPaused = False
153
self.producer.resumeProducing()
154
elif self.outstandingPull:
155
# I did not have any data to write in response to a pull,
156
# so I'd better pull some myself.
157
self.producer.resumeProducing()
159
def write(self, data):
160
if self.paused or (not self.iAmStreaming and not self.outstandingPull):
161
# We could use that fifo queue here.
162
self._buffer.append(data)
164
elif self.consumer is not None:
165
assert not self._buffer, (
166
"Writing fresh data to consumer before my buffer is empty!")
167
# I'm going to use _writeSomeData here so that there is only one
168
# path to self.consumer.write. But it doesn't actually make sense,
169
# if I am streaming, for some data to not be all data. But maybe I
170
# am not streaming, but I am writing here anyway, because there was
171
# an earlier request for data which was not answered.
172
bytesSent = self._writeSomeData(data)
173
self.outstandingPull = False
174
if not bytesSent == len(data):
175
assert not self.iAmStreaming, (
176
"Streaming producer did not write all its data.")
177
self._buffer.append(data[bytesSent:])
179
if (self.producer is not None) and self.producerIsStreaming:
180
bytesBuffered = reduce(operator.add,
181
[len(s) for s in self._buffer], 0)
182
if bytesBuffered >= self.bufferSize:
184
self.producer.pauseProducing()
185
self.producerPaused = True
187
def registerProducer(self, producer, streaming):
188
self.unregistered = False
189
BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
191
producer.resumeProducing()
193
def unregisterProducer(self):
194
if self.producer is not None:
196
del self.producerIsStreaming
197
self.unregistered = True
198
if self.consumer and not self._buffer:
199
self.consumer.unregisterProducer()
201
def _writeSomeData(self, data):
202
"""Write as much of this data as possible.
204
@returns: The number of bytes written.
206
if self.consumer is None:
208
self.consumer.write(data)