~justin-fathomdb/nova/justinsb-openstack-api-volumes

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_loopback.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Test case for L{twisted.protocols.loopback}.
 
6
"""
 
7
 
 
8
from zope.interface import implements
 
9
 
 
10
from twisted.trial import unittest
 
11
from twisted.trial.util import suppress as SUPPRESS
 
12
from twisted.protocols import basic, loopback
 
13
from twisted.internet import defer
 
14
from twisted.internet.protocol import Protocol
 
15
from twisted.internet.defer import Deferred
 
16
from twisted.internet.interfaces import IAddress, IPushProducer, IPullProducer
 
17
from twisted.internet import reactor, interfaces
 
18
 
 
19
 
 
20
class SimpleProtocol(basic.LineReceiver):
    """
    Line-oriented protocol which simply records what happens to it.

    @ivar conn: A L{defer.Deferred} which is fired with C{None} from
        C{connectionMade}.
    @ivar lines: Every line handed to C{lineReceived}, in arrival order.
    @ivar connLost: Every reason handed to C{connectionLost}, in order.
    """
    def __init__(self):
        self.lines, self.connLost = [], []
        self.conn = defer.Deferred()


    def connectionMade(self):
        """
        Announce the connection by firing C{self.conn} with C{None}.
        """
        self.conn.callback(None)


    def lineReceived(self, line):
        """
        Remember C{line} at the end of C{self.lines}.
        """
        self.lines.append(line)


    def connectionLost(self, reason):
        """
        Remember C{reason} at the end of C{self.connLost}.
        """
        self.connLost.append(reason)
 
34
 
 
35
 
 
36
class DoomProtocol(SimpleProtocol):
    """
    A L{SimpleProtocol} which answers the lines it receives with
    C{"Hello <n>"} and closes its transport once the most recently recorded
    line is C{"Hello 3"}.

    @ivar i: The number of lines received so far.
    """
    i = 0

    def lineReceived(self, line):
        """
        Count the line, possibly reply to it, record it, and hang up if the
        line just recorded is C{"Hello 3"}.
        """
        count = self.i + 1
        self.i = count
        if count < 4:
            # The connection should already have been closed before a
            # fourth line shows up; this guard only makes sure that
            # 'Hello 4' is never sent even if it was not.
            self.sendLine("Hello %d" % count)
        SimpleProtocol.lineReceived(self, line)
        latest = self.lines[-1]
        if latest == "Hello 3":
            self.transport.loseConnection()
 
47
 
 
48
 
 
49
class LoopbackTestCaseMixin:
    """
    Tests shared by the loopback test cases in this module.

    A class using this mixin has to supply a C{loopbackFunc} attribute: a
    callable accepting a server protocol and a client protocol.  Whatever it
    returns is passed through L{defer.maybeDeferred} before the checks run.
    """
    def testRegularFunction(self):
        """
        A line written by the server just before it drops the connection is
        received by the client, and both sides record exactly one
        C{connectionLost} call.
        """
        server = SimpleProtocol()
        client = SimpleProtocol()

        def greetAndHangUp(ignored):
            server.sendLine("THIS IS LINE ONE!")
            server.transport.loseConnection()
        server.conn.addCallback(greetAndHangUp)

        def verify(ignored):
            self.assertEqual(client.lines, ["THIS IS LINE ONE!"])
            self.assertEqual(len(server.connLost), 1)
            self.assertEqual(len(client.connLost), 1)

        finished = defer.maybeDeferred(self.loopbackFunc, server, client)
        return finished.addCallback(verify)


    def testSneakyHiddenDoom(self):
        """
        Two L{DoomProtocol} instances exchange C{"Hello"} lines after the
        server opens with C{"DOOM LINE"}; the exchange stops at
        C{"Hello 3"} and both sides record exactly one C{connectionLost}
        call.
        """
        server = DoomProtocol()
        client = DoomProtocol()

        def openWithDoom(ignored):
            server.sendLine("DOOM LINE")
        server.conn.addCallback(openWithDoom)

        def verify(ignored):
            self.assertEqual(server.lines, ['Hello 1', 'Hello 2', 'Hello 3'])
            self.assertEqual(
                client.lines,
                ['DOOM LINE', 'Hello 1', 'Hello 2', 'Hello 3'])
            self.assertEqual(len(server.connLost), 1)
            self.assertEqual(len(client.connLost), 1)

        finished = defer.maybeDeferred(self.loopbackFunc, server, client)
        return finished.addCallback(verify)
 
83
 
 
84
 
 
85
 
 
86
class LoopbackTestCase(LoopbackTestCaseMixin, unittest.TestCase):
    """
    Run the L{LoopbackTestCaseMixin} tests against L{loopback.loopback}.
    """
    loopbackFunc = staticmethod(loopback.loopback)

    def testRegularFunction(self):
        """
        Suppress loopback deprecation warning.
        """
        return LoopbackTestCaseMixin.testRegularFunction(self)
    # The message contains escaped parentheses, so it is written as a raw
    # string: "\(" is not a valid string escape and a plain literal only
    # works because Python currently passes unknown escapes through.
    testRegularFunction.suppress = [
        SUPPRESS(message=r"loopback\(\) is deprecated",
                 category=DeprecationWarning)]
 
97
 
 
98
 
 
99
 
 
100
class LoopbackAsyncTestCase(LoopbackTestCase):
    """
    Run the inherited loopback tests against L{loopback.loopbackAsync} and
    add tests specific to it: connection setup, addresses, greetings,
    producers, reentrancy and pump policies.
    """
    loopbackFunc = staticmethod(loopback.loopbackAsync)


    def test_makeConnection(self):
        """
        Test that the client and server protocol both have makeConnection
        invoked on them by loopbackAsync.
        """
        class TestProtocol(Protocol):
            transport = None
            def makeConnection(self, transport):
                self.transport = transport

        server = TestProtocol()
        client = TestProtocol()
        loopback.loopbackAsync(server, client)
        self.assertNotEqual(client.transport, None)
        self.assertNotEqual(server.transport, None)


    def _hostpeertest(self, get, testServer):
        """
        Test one of the permutations of client/server host/peer.

        @param get: The name of the transport method to call, for example
            C{"getHost"} or C{"getPeer"}.
        @param testServer: If true the server's transport is inspected,
            otherwise the client's.

        @return: A L{Deferred} which fires after the address returned by the
            transport method has been checked to provide L{IAddress}.
        """
        class TestProtocol(Protocol):
            def makeConnection(self, transport):
                Protocol.makeConnection(self, transport)
                self.onConnection.callback(transport)

        if testServer:
            server = TestProtocol()
            d = server.onConnection = Deferred()
            client = Protocol()
        else:
            server = Protocol()
            client = TestProtocol()
            d = client.onConnection = Deferred()

        loopback.loopbackAsync(server, client)

        def connected(transport):
            host = getattr(transport, get)()
            self.assertTrue(IAddress.providedBy(host))

        return d.addCallback(connected)


    def test_serverHost(self):
        """
        Test that the server gets a transport with a properly functioning
        implementation of L{ITransport.getHost}.
        """
        return self._hostpeertest("getHost", True)


    def test_serverPeer(self):
        """
        Like C{test_serverHost} but for L{ITransport.getPeer}
        """
        return self._hostpeertest("getPeer", True)


    def test_clientHost(self, get="getHost"):
        """
        Test that the client gets a transport with a properly functioning
        implementation of L{ITransport.getHost}.

        @param get: Unused; kept only so the signature stays unchanged.
        """
        return self._hostpeertest("getHost", False)


    def test_clientPeer(self):
        """
        Like C{test_clientHost} but for L{ITransport.getPeer}.
        """
        return self._hostpeertest("getPeer", False)


    def _greetingtest(self, write, testServer):
        """
        Test one of the permutations of write/writeSequence client/server.

        @param write: The name of the transport method used to send the
            greeting, for example C{"write"} or C{"writeSequence"}.
        @param testServer: If true the server sends the greeting and the
            client receives it, otherwise the roles are swapped.

        @return: A L{Deferred} fired with C{None} once the receiving side has
            accumulated exactly C{"bytes"}.
        """
        class GreeteeProtocol(Protocol):
            bytes = ""
            def dataReceived(self, data):
                self.bytes += data
                if self.bytes == "bytes":
                    self.received.callback(None)

        class GreeterProtocol(Protocol):
            def connectionMade(self):
                getattr(self.transport, write)("bytes")

        if testServer:
            server = GreeterProtocol()
            client = GreeteeProtocol()
            d = client.received = Deferred()
        else:
            server = GreeteeProtocol()
            d = server.received = Deferred()
            client = GreeterProtocol()

        loopback.loopbackAsync(server, client)
        return d


    def test_clientGreeting(self):
        """
        Test that on a connection where the client speaks first, the server
        receives the bytes sent by the client.
        """
        return self._greetingtest("write", False)


    def test_clientGreetingSequence(self):
        """
        Like C{test_clientGreeting}, but use C{writeSequence} instead of
        C{write} to issue the greeting.
        """
        return self._greetingtest("writeSequence", False)


    def test_serverGreeting(self, write="write"):
        """
        Test that on a connection where the server speaks first, the client
        receives the bytes sent by the server.

        @param write: Unused; kept only so the signature stays unchanged.
        """
        return self._greetingtest("write", True)


    def test_serverGreetingSequence(self):
        """
        Like C{test_serverGreeting}, but use C{writeSequence} instead of
        C{write} to issue the greeting.
        """
        return self._greetingtest("writeSequence", True)


    def _producertest(self, producerClass):
        """
        Connect a producing server to a receiving client over
        L{loopback.loopbackAsync}.

        @param producerClass: A callable which is passed a list of strings
            and returns an object with a C{start} method; C{start} is called
            with the server's transport from C{connectionMade}.

        @return: A L{Deferred} fired with a C{(client, server)} tuple once
            the client has accumulated exactly the concatenation of all the
            strings to produce.
        """
        # A real list, not the result of map(): the sequence is copied for
        # the producer and joined again on every dataReceived call, which a
        # one-shot iterator would not survive.
        toProduce = [str(n) for n in range(0, 10)]

        class ProducingProtocol(Protocol):
            def connectionMade(self):
                self.producer = producerClass(list(toProduce))
                self.producer.start(self.transport)

        class ReceivingProtocol(Protocol):
            bytes = ""
            def dataReceived(self, data):
                self.bytes += data
                if self.bytes == ''.join(toProduce):
                    self.received.callback((client, server))

        server = ProducingProtocol()
        client = ReceivingProtocol()
        client.received = Deferred()

        loopback.loopbackAsync(server, client)
        return client.received


    def test_pushProducer(self):
        """
        Test a push producer registered against a loopback transport.
        """
        class PushProducer(object):
            implements(IPushProducer)
            resumed = False

            def __init__(self, toProduce):
                self.toProduce = toProduce

            def resumeProducing(self):
                self.resumed = True

            def start(self, consumer):
                self.consumer = consumer
                consumer.registerProducer(self, True)
                self._produceAndSchedule()

            def _produceAndSchedule(self):
                if self.toProduce:
                    self.consumer.write(self.toProduce.pop(0))
                    reactor.callLater(0, self._produceAndSchedule)
                else:
                    self.consumer.unregisterProducer()
        d = self._producertest(PushProducer)

        def finished(result):
            # Unpack in the body; tuple parameters in the signature are
            # not valid syntax on newer Python versions.
            client, server = result
            self.assertFalse(
                server.producer.resumed,
                "Streaming producer should not have been resumed.")
        d.addCallback(finished)
        return d


    def test_pullProducer(self):
        """
        Test a pull producer registered against a loopback transport.
        """
        class PullProducer(object):
            implements(IPullProducer)

            def __init__(self, toProduce):
                self.toProduce = toProduce

            def start(self, consumer):
                self.consumer = consumer
                self.consumer.registerProducer(self, False)

            def resumeProducing(self):
                self.consumer.write(self.toProduce.pop(0))
                if not self.toProduce:
                    self.consumer.unregisterProducer()
        return self._producertest(PullProducer)


    def test_writeNotReentrant(self):
        """
        L{loopback.loopbackAsync} does not call a protocol's C{dataReceived}
        method while that protocol's transport's C{write} method is higher up
        on the stack.
        """
        class Server(Protocol):
            def dataReceived(self, bytes):
                self.transport.write("bytes")

        class Client(Protocol):
            ready = False

            def connectionMade(self):
                reactor.callLater(0, self.go)

            def go(self):
                self.transport.write("foo")
                # Only set after write() has returned: if dataReceived ran
                # reentrantly from inside write(), it would see ready False.
                self.ready = True

            def dataReceived(self, bytes):
                self.wasReady = self.ready
                self.transport.loseConnection()


        server = Server()
        client = Client()
        d = loopback.loopbackAsync(client, server)
        def cbFinished(ignored):
            self.assertTrue(client.wasReady)
        d.addCallback(cbFinished)
        return d


    def test_pumpPolicy(self):
        """
        The callable passed as the value for the C{pumpPolicy} parameter to
        L{loopbackAsync} is called with a L{_LoopbackQueue} of pending bytes
        and a protocol to which they should be delivered.
        """
        pumpCalls = []
        def dummyPolicy(queue, target):
            chunks = []
            while queue:
                chunks.append(queue.get())
            pumpCalls.append((target, chunks))

        client = Protocol()
        server = Protocol()

        finished = loopback.loopbackAsync(server, client, dummyPolicy)
        self.assertEqual(pumpCalls, [])

        client.transport.write("foo")
        client.transport.write("bar")
        server.transport.write("baz")
        server.transport.write("quux")
        server.transport.loseConnection()

        def cbComplete(ignored):
            self.assertEqual(
                pumpCalls,
                # The order here is somewhat arbitrary.  The implementation
                # happens to always deliver data to the client first.
                [(client, ["baz", "quux", None]),
                 (server, ["foo", "bar"])])
        finished.addCallback(cbComplete)
        return finished


    def test_identityPumpPolicy(self):
        """
        L{identityPumpPolicy} is a pump policy which calls the target's
        C{dataReceived} method once for each string in the queue passed to
        it.
        """
        received = []
        client = Protocol()
        client.dataReceived = received.append
        queue = loopback._LoopbackQueue()
        queue.put("foo")
        queue.put("bar")
        queue.put(None)

        loopback.identityPumpPolicy(queue, client)

        self.assertEqual(received, ["foo", "bar"])


    def test_collapsingPumpPolicy(self):
        """
        L{collapsingPumpPolicy} is a pump policy which calls the target's
        C{dataReceived} only once with all of the strings in the queue passed
        to it joined together.
        """
        received = []
        client = Protocol()
        client.dataReceived = received.append
        queue = loopback._LoopbackQueue()
        queue.put("foo")
        queue.put("bar")
        queue.put(None)

        loopback.collapsingPumpPolicy(queue, client)

        self.assertEqual(received, ["foobar"])
 
422
 
 
423
 
 
424
 
 
425
class LoopbackTCPTestCase(LoopbackTestCase):
    """
    Run the inherited L{LoopbackTestCase} tests with
    L{loopback.loopbackTCP} as the loopback function.
    """
    loopbackFunc = staticmethod(loopback.loopbackTCP)
 
427
 
 
428
 
 
429
class LoopbackUNIXTestCase(LoopbackTestCase):
    """
    Run the inherited L{LoopbackTestCase} tests with
    L{loopback.loopbackUNIX} as the loopback function.
    """
    loopbackFunc = staticmethod(loopback.loopbackUNIX)

    # Adapting the reactor with a default of None yields None when it does
    # not provide IReactorUNIX; in that case mark the whole case as skipped.
    if interfaces.IReactorUNIX(reactor, None) is None:
        skip = "Current reactor does not support UNIX sockets"