~cbehrens/nova/lp844160-build-works-with-zones

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_pcp.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# -*- Python -*-
 
2
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
3
# See LICENSE for details.
 
4
 
 
5
 
 
6
__version__ = '$Revision: 1.5 $'[11:-2]
 
7
 
 
8
from StringIO import StringIO
 
9
from twisted.trial import unittest
 
10
from twisted.protocols import pcp
 
11
 
 
12
# Goal:
 
13
 
 
14
# Take a Protocol instance.  Own all outgoing data - anything that
 
15
# would go to p.transport.write.  Own all incoming data - anything
 
16
# that comes to p.dataReceived.
 
17
 
 
18
# I need:
 
19
# Something with the AbstractFileDescriptor interface.
 
20
# That is:
 
21
#  - acts as a Transport
 
22
#    - has a method write()
 
23
#    - which buffers
 
24
#  - acts as a Consumer
 
25
#    - has a registerProducer, unRegisterProducer
 
26
#    - tells the Producer to back off (pauseProducing) when its buffer is full.
 
27
#    - tells the Producer to resumeProducing when its buffer is not so full.
 
28
#  - acts as a Producer
 
29
#    - calls registerProducer
 
30
#    - calls write() on consumers
 
31
#    - honors requests to pause/resume producing
 
32
#    - honors stopProducing, and passes it along to upstream Producers
 
33
 
 
34
 
 
35
class DummyTransport:
 
36
    """A dumb transport to wrap around."""
 
37
 
 
38
    def __init__(self):
 
39
        self._writes = []
 
40
 
 
41
    def write(self, data):
 
42
        self._writes.append(data)
 
43
 
 
44
    def getvalue(self):
 
45
        return ''.join(self._writes)
 
46
 
 
47
class DummyProducer:
 
48
    resumed = False
 
49
    stopped = False
 
50
    paused = False
 
51
 
 
52
    def __init__(self, consumer):
 
53
        self.consumer = consumer
 
54
 
 
55
    def resumeProducing(self):
 
56
        self.resumed = True
 
57
        self.paused = False
 
58
 
 
59
    def pauseProducing(self):
 
60
        self.paused = True
 
61
 
 
62
    def stopProducing(self):
 
63
        self.stopped = True
 
64
 
 
65
 
 
66
class DummyConsumer(DummyTransport):
 
67
    producer = None
 
68
    finished = False
 
69
    unregistered = True
 
70
 
 
71
    def registerProducer(self, producer, streaming):
 
72
        self.producer = (producer, streaming)
 
73
 
 
74
    def unregisterProducer(self):
 
75
        self.unregistered = True
 
76
 
 
77
    def finish(self):
 
78
        self.finished = True
 
79
 
 
80
class TransportInterfaceTest(unittest.TestCase):
 
81
    proxyClass = pcp.BasicProducerConsumerProxy
 
82
 
 
83
    def setUp(self):
 
84
        self.underlying = DummyConsumer()
 
85
        self.transport = self.proxyClass(self.underlying)
 
86
 
 
87
    def testWrite(self):
 
88
        self.transport.write("some bytes")
 
89
 
 
90
class ConsumerInterfaceTest:
 
91
    """Test ProducerConsumerProxy as a Consumer.
 
92
 
 
93
    Normally we have ProducingServer -> ConsumingTransport.
 
94
 
 
95
    If I am to go between (Server -> Shaper -> Transport), I have to
 
96
    play the role of Consumer convincingly for the ProducingServer.
 
97
    """
 
98
 
 
99
    def setUp(self):
 
100
        self.underlying = DummyConsumer()
 
101
        self.consumer = self.proxyClass(self.underlying)
 
102
        self.producer = DummyProducer(self.consumer)
 
103
 
 
104
    def testRegisterPush(self):
 
105
        self.consumer.registerProducer(self.producer, True)
 
106
        ## Consumer should NOT have called PushProducer.resumeProducing
 
107
        self.failIf(self.producer.resumed)
 
108
 
 
109
    ## I'm I'm just a proxy, should I only do resumeProducing when
 
110
    ## I get poked myself?
 
111
    #def testRegisterPull(self):
 
112
    #    self.consumer.registerProducer(self.producer, False)
 
113
    #    ## Consumer SHOULD have called PushProducer.resumeProducing
 
114
    #    self.failUnless(self.producer.resumed)
 
115
 
 
116
    def testUnregister(self):
 
117
        self.consumer.registerProducer(self.producer, False)
 
118
        self.consumer.unregisterProducer()
 
119
        # Now when the consumer would ordinarily want more data, it
 
120
        # shouldn't ask producer for it.
 
121
        # The most succinct way to trigger "want more data" is to proxy for
 
122
        # a PullProducer and have someone ask me for data.
 
123
        self.producer.resumed = False
 
124
        self.consumer.resumeProducing()
 
125
        self.failIf(self.producer.resumed)
 
126
 
 
127
    def testFinish(self):
 
128
        self.consumer.registerProducer(self.producer, False)
 
129
        self.consumer.finish()
 
130
        # I guess finish should behave like unregister?
 
131
        self.producer.resumed = False
 
132
        self.consumer.resumeProducing()
 
133
        self.failIf(self.producer.resumed)
 
134
 
 
135
 
 
136
class ProducerInterfaceTest:
 
137
    """Test ProducerConsumerProxy as a Producer.
 
138
 
 
139
    Normally we have ProducingServer -> ConsumingTransport.
 
140
 
 
141
    If I am to go between (Server -> Shaper -> Transport), I have to
 
142
    play the role of Producer convincingly for the ConsumingTransport.
 
143
    """
 
144
 
 
145
    def setUp(self):
 
146
        self.consumer = DummyConsumer()
 
147
        self.producer = self.proxyClass(self.consumer)
 
148
 
 
149
    def testRegistersProducer(self):
 
150
        self.failUnlessEqual(self.consumer.producer[0], self.producer)
 
151
 
 
152
    def testPause(self):
 
153
        self.producer.pauseProducing()
 
154
        self.producer.write("yakkity yak")
 
155
        self.failIf(self.consumer.getvalue(),
 
156
                    "Paused producer should not have sent data.")
 
157
 
 
158
    def testResume(self):
 
159
        self.producer.pauseProducing()
 
160
        self.producer.resumeProducing()
 
161
        self.producer.write("yakkity yak")
 
162
        self.failUnlessEqual(self.consumer.getvalue(), "yakkity yak")
 
163
 
 
164
    def testResumeNoEmptyWrite(self):
 
165
        self.producer.pauseProducing()
 
166
        self.producer.resumeProducing()
 
167
        self.failUnlessEqual(len(self.consumer._writes), 0,
 
168
                             "Resume triggered an empty write.")
 
169
 
 
170
    def testResumeBuffer(self):
 
171
        self.producer.pauseProducing()
 
172
        self.producer.write("buffer this")
 
173
        self.producer.resumeProducing()
 
174
        self.failUnlessEqual(self.consumer.getvalue(), "buffer this")
 
175
 
 
176
    def testStop(self):
 
177
        self.producer.stopProducing()
 
178
        self.producer.write("yakkity yak")
 
179
        self.failIf(self.consumer.getvalue(),
 
180
                    "Stopped producer should not have sent data.")
 
181
 
 
182
 
 
183
class PCP_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
 
184
    proxyClass = pcp.BasicProducerConsumerProxy
 
185
 
 
186
class PCPII_ConsumerInterfaceTest(ConsumerInterfaceTest, unittest.TestCase):
 
187
    proxyClass = pcp.ProducerConsumerProxy
 
188
 
 
189
class PCP_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
 
190
    proxyClass = pcp.BasicProducerConsumerProxy
 
191
 
 
192
class PCPII_ProducerInterfaceTest(ProducerInterfaceTest, unittest.TestCase):
 
193
    proxyClass = pcp.ProducerConsumerProxy
 
194
 
 
195
class ProducerProxyTest(unittest.TestCase):
 
196
    """Producer methods on me should be relayed to the Producer I proxy.
 
197
    """
 
198
    proxyClass = pcp.BasicProducerConsumerProxy
 
199
 
 
200
    def setUp(self):
 
201
        self.proxy = self.proxyClass(None)
 
202
        self.parentProducer = DummyProducer(self.proxy)
 
203
        self.proxy.registerProducer(self.parentProducer, True)
 
204
 
 
205
    def testStop(self):
 
206
        self.proxy.stopProducing()
 
207
        self.failUnless(self.parentProducer.stopped)
 
208
 
 
209
 
 
210
class ConsumerProxyTest(unittest.TestCase):
 
211
    """Consumer methods on me should be relayed to the Consumer I proxy.
 
212
    """
 
213
    proxyClass = pcp.BasicProducerConsumerProxy
 
214
 
 
215
    def setUp(self):
 
216
        self.underlying = DummyConsumer()
 
217
        self.consumer = self.proxyClass(self.underlying)
 
218
 
 
219
    def testWrite(self):
 
220
        # NOTE: This test only valid for streaming (Push) systems.
 
221
        self.consumer.write("some bytes")
 
222
        self.failUnlessEqual(self.underlying.getvalue(), "some bytes")
 
223
 
 
224
    def testFinish(self):
 
225
        self.consumer.finish()
 
226
        self.failUnless(self.underlying.finished)
 
227
 
 
228
    def testUnregister(self):
 
229
        self.consumer.unregisterProducer()
 
230
        self.failUnless(self.underlying.unregistered)
 
231
 
 
232
 
 
233
class PullProducerTest:
 
234
    def setUp(self):
 
235
        self.underlying = DummyConsumer()
 
236
        self.proxy = self.proxyClass(self.underlying)
 
237
        self.parentProducer = DummyProducer(self.proxy)
 
238
        self.proxy.registerProducer(self.parentProducer, True)
 
239
 
 
240
    def testHoldWrites(self):
 
241
        self.proxy.write("hello")
 
242
        # Consumer should get no data before it says resumeProducing.
 
243
        self.failIf(self.underlying.getvalue(),
 
244
                    "Pulling Consumer got data before it pulled.")
 
245
 
 
246
    def testPull(self):
 
247
        self.proxy.write("hello")
 
248
        self.proxy.resumeProducing()
 
249
        self.failUnlessEqual(self.underlying.getvalue(), "hello")
 
250
 
 
251
    def testMergeWrites(self):
 
252
        self.proxy.write("hello ")
 
253
        self.proxy.write("sunshine")
 
254
        self.proxy.resumeProducing()
 
255
        nwrites = len(self.underlying._writes)
 
256
        self.failUnlessEqual(nwrites, 1, "Pull resulted in %d writes instead "
 
257
                             "of 1." % (nwrites,))
 
258
        self.failUnlessEqual(self.underlying.getvalue(), "hello sunshine")
 
259
 
 
260
 
 
261
    def testLateWrite(self):
 
262
        # consumer sends its initial pull before we have data
 
263
        self.proxy.resumeProducing()
 
264
        self.proxy.write("data")
 
265
        # This data should answer that pull request.
 
266
        self.failUnlessEqual(self.underlying.getvalue(), "data")
 
267
 
 
268
class PCP_PullProducerTest(PullProducerTest, unittest.TestCase):
 
269
    class proxyClass(pcp.BasicProducerConsumerProxy):
 
270
        iAmStreaming = False
 
271
 
 
272
class PCPII_PullProducerTest(PullProducerTest, unittest.TestCase):
 
273
    class proxyClass(pcp.ProducerConsumerProxy):
 
274
        iAmStreaming = False
 
275
 
 
276
# Buffering!
 
277
 
 
278
class BufferedConsumerTest(unittest.TestCase):
 
279
    """As a consumer, ask the producer to pause after too much data."""
 
280
 
 
281
    proxyClass = pcp.ProducerConsumerProxy
 
282
 
 
283
    def setUp(self):
 
284
        self.underlying = DummyConsumer()
 
285
        self.proxy = self.proxyClass(self.underlying)
 
286
        self.proxy.bufferSize = 100
 
287
 
 
288
        self.parentProducer = DummyProducer(self.proxy)
 
289
        self.proxy.registerProducer(self.parentProducer, True)
 
290
 
 
291
    def testRegisterPull(self):
 
292
        self.proxy.registerProducer(self.parentProducer, False)
 
293
        ## Consumer SHOULD have called PushProducer.resumeProducing
 
294
        self.failUnless(self.parentProducer.resumed)
 
295
 
 
296
    def testPauseIntercept(self):
 
297
        self.proxy.pauseProducing()
 
298
        self.failIf(self.parentProducer.paused)
 
299
 
 
300
    def testResumeIntercept(self):
 
301
        self.proxy.pauseProducing()
 
302
        self.proxy.resumeProducing()
 
303
        # With a streaming producer, just because the proxy was resumed is
 
304
        # not necessarily a reason to resume the parent producer.  The state
 
305
        # of the buffer should decide that.
 
306
        self.failIf(self.parentProducer.resumed)
 
307
 
 
308
    def testTriggerPause(self):
 
309
        """Make sure I say \"when.\""""
 
310
 
 
311
        # Pause the proxy so data sent to it builds up in its buffer.
 
312
        self.proxy.pauseProducing()
 
313
        self.failIf(self.parentProducer.paused, "don't pause yet")
 
314
        self.proxy.write("x" * 51)
 
315
        self.failIf(self.parentProducer.paused, "don't pause yet")
 
316
        self.proxy.write("x" * 51)
 
317
        self.failUnless(self.parentProducer.paused)
 
318
 
 
319
    def testTriggerResume(self):
 
320
        """Make sure I resumeProducing when my buffer empties."""
 
321
        self.proxy.pauseProducing()
 
322
        self.proxy.write("x" * 102)
 
323
        self.failUnless(self.parentProducer.paused, "should be paused")
 
324
        self.proxy.resumeProducing()
 
325
        # Resuming should have emptied my buffer, so I should tell my
 
326
        # parent to resume too.
 
327
        self.failIf(self.parentProducer.paused,
 
328
                    "Producer should have resumed.")
 
329
        self.failIf(self.proxy.producerPaused)
 
330
 
 
331
class BufferedPullTests(unittest.TestCase):
 
332
    class proxyClass(pcp.ProducerConsumerProxy):
 
333
        iAmStreaming = False
 
334
 
 
335
        def _writeSomeData(self, data):
 
336
            pcp.ProducerConsumerProxy._writeSomeData(self, data[:100])
 
337
            return min(len(data), 100)
 
338
 
 
339
    def setUp(self):
 
340
        self.underlying = DummyConsumer()
 
341
        self.proxy = self.proxyClass(self.underlying)
 
342
        self.proxy.bufferSize = 100
 
343
 
 
344
        self.parentProducer = DummyProducer(self.proxy)
 
345
        self.proxy.registerProducer(self.parentProducer, False)
 
346
 
 
347
    def testResumePull(self):
 
348
        # If proxy has no data to send on resumeProducing, it had better pull
 
349
        # some from its PullProducer.
 
350
        self.parentProducer.resumed = False
 
351
        self.proxy.resumeProducing()
 
352
        self.failUnless(self.parentProducer.resumed)
 
353
 
 
354
    def testLateWriteBuffering(self):
 
355
        # consumer sends its initial pull before we have data
 
356
        self.proxy.resumeProducing()
 
357
        self.proxy.write("datum" * 21)
 
358
        # This data should answer that pull request.
 
359
        self.failUnlessEqual(self.underlying.getvalue(), "datum" * 20)
 
360
        # but there should be some left over
 
361
        self.failUnlessEqual(self.proxy._buffer, ["datum"])
 
362
 
 
363
 
 
364
# TODO:
 
365
#  test that web request finishing bug (when we weren't proxying
 
366
#    unregisterProducer but were proxying finish, web file transfers
 
367
#    would hang on the last block.)
 
368
#  test what happens if writeSomeBytes decided to write zero bytes.