~0x44/nova/bug838466

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/protocols/pcp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- test-case-name: twisted.test.test_pcp -*-
 
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
"""
 
6
Producer-Consumer Proxy.
 
7
"""
 
8
 
 
9
from zope.interface import implements
 
10
 
 
11
from twisted.internet import interfaces
 
12
 
 
13
 
 
14
class BasicProducerConsumerProxy:
 
15
    """
 
16
    I can act as a man in the middle between any Producer and Consumer.
 
17
 
 
18
    @ivar producer: the Producer I subscribe to.
 
19
    @type producer: L{IProducer<interfaces.IProducer>}
 
20
    @ivar consumer: the Consumer I publish to.
 
21
    @type consumer: L{IConsumer<interfaces.IConsumer>}
 
22
    @ivar paused: As a Producer, am I paused?
 
23
    @type paused: bool
 
24
    """
 
25
    implements(interfaces.IProducer, interfaces.IConsumer)
 
26
 
 
27
    consumer = None
 
28
    producer = None
 
29
    producerIsStreaming = None
 
30
    iAmStreaming = True
 
31
    outstandingPull = False
 
32
    paused = False
 
33
    stopped = False
 
34
 
 
35
    def __init__(self, consumer):
 
36
        self._buffer = []
 
37
        if consumer is not None:
 
38
            self.consumer = consumer
 
39
            consumer.registerProducer(self, self.iAmStreaming)
 
40
 
 
41
    # Producer methods:
 
42
 
 
43
    def pauseProducing(self):
 
44
        self.paused = True
 
45
        if self.producer:
 
46
            self.producer.pauseProducing()
 
47
 
 
48
    def resumeProducing(self):
 
49
        self.paused = False
 
50
        if self._buffer:
 
51
            # TODO: Check to see if consumer supports writeSeq.
 
52
            self.consumer.write(''.join(self._buffer))
 
53
            self._buffer[:] = []
 
54
        else:
 
55
            if not self.iAmStreaming:
 
56
                self.outstandingPull = True
 
57
 
 
58
        if self.producer is not None:
 
59
            self.producer.resumeProducing()
 
60
 
 
61
    def stopProducing(self):
 
62
        if self.producer is not None:
 
63
            self.producer.stopProducing()
 
64
        if self.consumer is not None:
 
65
            del self.consumer
 
66
 
 
67
    # Consumer methods:
 
68
 
 
69
    def write(self, data):
 
70
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
 
71
            # We could use that fifo queue here.
 
72
            self._buffer.append(data)
 
73
 
 
74
        elif self.consumer is not None:
 
75
            self.consumer.write(data)
 
76
            self.outstandingPull = False
 
77
 
 
78
    def finish(self):
 
79
        if self.consumer is not None:
 
80
            self.consumer.finish()
 
81
        self.unregisterProducer()
 
82
 
 
83
    def registerProducer(self, producer, streaming):
 
84
        self.producer = producer
 
85
        self.producerIsStreaming = streaming
 
86
 
 
87
    def unregisterProducer(self):
 
88
        if self.producer is not None:
 
89
            del self.producer
 
90
            del self.producerIsStreaming
 
91
        if self.consumer:
 
92
            self.consumer.unregisterProducer()
 
93
 
 
94
    def __repr__(self):
 
95
        return '<%s@%x around %s>' % (self.__class__, id(self), self.consumer)
 
96
 
 
97
 
 
98
class ProducerConsumerProxy(BasicProducerConsumerProxy):
 
99
    """ProducerConsumerProxy with a finite buffer.
 
100
 
 
101
    When my buffer fills up, I have my parent Producer pause until my buffer
 
102
    has room in it again.
 
103
    """
 
104
    # Copies much from abstract.FileDescriptor
 
105
    bufferSize = 2**2**2**2
 
106
 
 
107
    producerPaused = False
 
108
    unregistered = False
 
109
 
 
110
    def pauseProducing(self):
 
111
        # Does *not* call up to ProducerConsumerProxy to relay the pause
 
112
        # message through to my parent Producer.
 
113
        self.paused = True
 
114
 
 
115
    def resumeProducing(self):
 
116
        self.paused = False
 
117
        if self._buffer:
 
118
            data = ''.join(self._buffer)
 
119
            bytesSent = self._writeSomeData(data)
 
120
            if bytesSent < len(data):
 
121
                unsent = data[bytesSent:]
 
122
                assert not self.iAmStreaming, (
 
123
                    "Streaming producer did not write all its data.")
 
124
                self._buffer[:] = [unsent]
 
125
            else:
 
126
                self._buffer[:] = []
 
127
        else:
 
128
            bytesSent = 0
 
129
 
 
130
        if (self.unregistered and bytesSent and not self._buffer and
 
131
            self.consumer is not None):
 
132
            self.consumer.unregisterProducer()
 
133
 
 
134
        if not self.iAmStreaming:
 
135
            self.outstandingPull = not bytesSent
 
136
 
 
137
        if self.producer is not None:
 
138
            bytesBuffered = sum([len(s) for s in self._buffer])
 
139
            # TODO: You can see here the potential for high and low
 
140
            # watermarks, where bufferSize would be the high mark when we
 
141
            # ask the upstream producer to pause, and we wouldn't have
 
142
            # it resume again until it hit the low mark.  Or if producer
 
143
            # is Pull, maybe we'd like to pull from it as much as necessary
 
144
            # to keep our buffer full to the low mark, so we're never caught
 
145
            # without something to send.
 
146
            if self.producerPaused and (bytesBuffered < self.bufferSize):
 
147
                # Now that our buffer is empty,
 
148
                self.producerPaused = False
 
149
                self.producer.resumeProducing()
 
150
            elif self.outstandingPull:
 
151
                # I did not have any data to write in response to a pull,
 
152
                # so I'd better pull some myself.
 
153
                self.producer.resumeProducing()
 
154
 
 
155
    def write(self, data):
 
156
        if self.paused or (not self.iAmStreaming and not self.outstandingPull):
 
157
            # We could use that fifo queue here.
 
158
            self._buffer.append(data)
 
159
 
 
160
        elif self.consumer is not None:
 
161
            assert not self._buffer, (
 
162
                "Writing fresh data to consumer before my buffer is empty!")
 
163
            # I'm going to use _writeSomeData here so that there is only one
 
164
            # path to self.consumer.write.  But it doesn't actually make sense,
 
165
            # if I am streaming, for some data to not be all data.  But maybe I
 
166
            # am not streaming, but I am writing here anyway, because there was
 
167
            # an earlier request for data which was not answered.
 
168
            bytesSent = self._writeSomeData(data)
 
169
            self.outstandingPull = False
 
170
            if not bytesSent == len(data):
 
171
                assert not self.iAmStreaming, (
 
172
                    "Streaming producer did not write all its data.")
 
173
                self._buffer.append(data[bytesSent:])
 
174
 
 
175
        if (self.producer is not None) and self.producerIsStreaming:
 
176
            bytesBuffered = sum([len(s) for s in self._buffer])
 
177
            if bytesBuffered >= self.bufferSize:
 
178
 
 
179
                self.producer.pauseProducing()
 
180
                self.producerPaused = True
 
181
 
 
182
    def registerProducer(self, producer, streaming):
 
183
        self.unregistered = False
 
184
        BasicProducerConsumerProxy.registerProducer(self, producer, streaming)
 
185
        if not streaming:
 
186
            producer.resumeProducing()
 
187
 
 
188
    def unregisterProducer(self):
 
189
        if self.producer is not None:
 
190
            del self.producer
 
191
            del self.producerIsStreaming
 
192
        self.unregistered = True
 
193
        if self.consumer and not self._buffer:
 
194
            self.consumer.unregisterProducer()
 
195
 
 
196
    def _writeSomeData(self, data):
 
197
        """Write as much of this data as possible.
 
198
 
 
199
        @returns: The number of bytes written.
 
200
        """
 
201
        if self.consumer is None:
 
202
            return 0
 
203
        self.consumer.write(data)
 
204
        return len(data)