~certify-web-dev/twisted/certify-staging

« back to all changes in this revision

Viewing changes to twisted/test/test_protocols.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2010-01-02 19:38:17 UTC
  • mfrom: (2.2.4 sid)
  • Revision ID: james.westby@ubuntu.com-20100102193817-jphp464ppwh7dulg
Tags: 9.0.0-1
* python-twisted: Depend on the python-twisted-* 9.0 packages.
* python-twisted: Depend on python-zope.interface only. Closes: #557781.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
 
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
2
# See LICENSE for details.
3
3
 
4
 
 
5
4
"""
6
5
Test cases for twisted.protocols package.
7
6
"""
8
7
 
 
8
import struct
 
9
 
9
10
from twisted.trial import unittest
10
11
from twisted.protocols import basic, wire, portforward
11
12
from twisted.internet import reactor, protocol, defer, task, error
12
13
from twisted.test import proto_helpers
13
14
 
14
 
import struct
15
 
import StringIO
16
 
 
17
 
class StringIOWithoutClosing(StringIO.StringIO):
18
 
    """
19
 
    A StringIO that can't be closed.
20
 
    """
21
 
    def close(self):
22
 
        """
23
 
        Do nothing.
24
 
        """
25
15
 
26
16
class LineTester(basic.LineReceiver):
27
17
    """
118
108
    """
119
109
    Test wire protocols.
120
110
    """
121
 
    def testEcho(self):
 
111
    def test_echo(self):
122
112
        """
123
113
        Test wire.Echo protocol: send some data and check that it is sent back.
124
114
        """
125
 
        t = StringIOWithoutClosing()
 
115
        t = proto_helpers.StringTransport()
126
116
        a = wire.Echo()
127
 
        a.makeConnection(protocol.FileWrapper(t))
 
117
        a.makeConnection(t)
128
118
        a.dataReceived("hello")
129
119
        a.dataReceived("world")
130
120
        a.dataReceived("how")
131
121
        a.dataReceived("are")
132
122
        a.dataReceived("you")
133
 
        self.failUnlessEqual(t.getvalue(), "helloworldhowareyou")
134
 
 
135
 
    def testWho(self):
 
123
        self.assertEquals(t.value(), "helloworldhowareyou")
 
124
 
 
125
 
 
126
    def test_who(self):
136
127
        """
137
128
        Test wire.Who protocol.
138
129
        """
139
 
        t = StringIOWithoutClosing()
 
130
        t = proto_helpers.StringTransport()
140
131
        a = wire.Who()
141
 
        a.makeConnection(protocol.FileWrapper(t))
142
 
        self.failUnlessEqual(t.getvalue(), "root\r\n")
143
 
 
144
 
    def testQOTD(self):
 
132
        a.makeConnection(t)
 
133
        self.assertEquals(t.value(), "root\r\n")
 
134
 
 
135
 
 
136
    def test_QOTD(self):
145
137
        """
146
138
        Test wire.QOTD protocol.
147
139
        """
148
 
        t = StringIOWithoutClosing()
 
140
        t = proto_helpers.StringTransport()
149
141
        a = wire.QOTD()
150
 
        a.makeConnection(protocol.FileWrapper(t))
151
 
        self.failUnlessEqual(t.getvalue(),
152
 
                             "An apple a day keeps the doctor away.\r\n")
153
 
 
154
 
    def testDiscard(self):
 
142
        a.makeConnection(t)
 
143
        self.assertEquals(t.value(),
 
144
                          "An apple a day keeps the doctor away.\r\n")
 
145
 
 
146
 
 
147
    def test_discard(self):
155
148
        """
156
149
        Test wire.Discard protocol.
157
150
        """
158
 
        t = StringIOWithoutClosing()
 
151
        t = proto_helpers.StringTransport()
159
152
        a = wire.Discard()
160
 
        a.makeConnection(protocol.FileWrapper(t))
 
153
        a.makeConnection(t)
161
154
        a.dataReceived("hello")
162
155
        a.dataReceived("world")
163
156
        a.dataReceived("how")
164
157
        a.dataReceived("are")
165
158
        a.dataReceived("you")
166
 
        self.failUnlessEqual(t.getvalue(), "")
 
159
        self.assertEqual(t.value(), "")
 
160
 
 
161
 
167
162
 
168
163
class LineReceiverTestCase(unittest.TestCase):
169
164
    """
197
192
        expected data.
198
193
        """
199
194
        for packet_size in range(1, 10):
200
 
            t = StringIOWithoutClosing()
 
195
            t = proto_helpers.StringIOWithoutClosing()
201
196
            a = LineTester()
202
197
            a.makeConnection(protocol.FileWrapper(t))
203
198
            for i in range(len(self.buffer)/packet_size + 1):
211
206
    pause_output1 = ['twiddle1', 'twiddle2', 'pause']
212
207
    pause_output2 = pause_output1+['twiddle3']
213
208
 
214
 
    def testPausing(self):
 
209
    def test_pausing(self):
215
210
        """
216
211
        Test pause inside data receiving. It uses a fake clock to see if
217
212
        pausing/resuming work.
218
213
        """
219
214
        for packet_size in range(1, 10):
220
 
            t = StringIOWithoutClosing()
 
215
            t = proto_helpers.StringIOWithoutClosing()
221
216
            clock = task.Clock()
222
217
            a = LineTester(clock)
223
218
            a.makeConnection(protocol.FileWrapper(t))
224
219
            for i in range(len(self.pause_buf)/packet_size + 1):
225
220
                s = self.pause_buf[i*packet_size:(i+1)*packet_size]
226
221
                a.dataReceived(s)
227
 
            self.failUnlessEqual(self.pause_output1, a.received)
 
222
            self.assertEquals(self.pause_output1, a.received)
228
223
            clock.advance(0)
229
 
            self.failUnlessEqual(self.pause_output2, a.received)
 
224
            self.assertEquals(self.pause_output2, a.received)
230
225
 
231
226
    rawpause_buf = 'twiddle1\ntwiddle2\nlen 5\nrawpause\n12345twiddle3\n'
232
227
 
234
229
    rawpause_output2 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '12345',
235
230
                        'twiddle3']
236
231
 
237
 
    def testRawPausing(self):
 
232
    def test_rawPausing(self):
238
233
        """
239
234
        Test pause inside raw data receiving.
240
235
        """
241
236
        for packet_size in range(1, 10):
242
 
            t = StringIOWithoutClosing()
 
237
            t = proto_helpers.StringIOWithoutClosing()
243
238
            clock = task.Clock()
244
239
            a = LineTester(clock)
245
240
            a.makeConnection(protocol.FileWrapper(t))
246
241
            for i in range(len(self.rawpause_buf)/packet_size + 1):
247
242
                s = self.rawpause_buf[i*packet_size:(i+1)*packet_size]
248
243
                a.dataReceived(s)
249
 
            self.failUnlessEqual(self.rawpause_output1, a.received)
 
244
            self.assertEquals(self.rawpause_output1, a.received)
250
245
            clock.advance(0)
251
 
            self.failUnlessEqual(self.rawpause_output2, a.received)
 
246
            self.assertEquals(self.rawpause_output2, a.received)
252
247
 
253
248
    stop_buf = 'twiddle1\ntwiddle2\nstop\nmore\nstuff\n'
254
249
 
255
250
    stop_output = ['twiddle1', 'twiddle2', 'stop']
256
251
 
257
 
    def testStopProducing(self):
 
252
    def test_stopProducing(self):
258
253
        """
259
254
        Test stop inside producing.
260
255
        """
261
256
        for packet_size in range(1, 10):
262
 
            t = StringIOWithoutClosing()
 
257
            t = proto_helpers.StringIOWithoutClosing()
263
258
            a = LineTester()
264
259
            a.makeConnection(protocol.FileWrapper(t))
265
260
            for i in range(len(self.stop_buf)/packet_size + 1):
266
261
                s = self.stop_buf[i*packet_size:(i+1)*packet_size]
267
262
                a.dataReceived(s)
268
 
            self.failUnlessEqual(self.stop_output, a.received)
269
 
 
270
 
 
271
 
    def testLineReceiverAsProducer(self):
 
263
            self.assertEquals(self.stop_output, a.received)
 
264
 
 
265
 
 
266
    def test_lineReceiverAsProducer(self):
272
267
        """
273
268
        Test produce/unproduce in receiving.
274
269
        """
275
270
        a = LineTester()
276
 
        t = StringIOWithoutClosing()
 
271
        t = proto_helpers.StringIOWithoutClosing()
277
272
        a.makeConnection(protocol.FileWrapper(t))
278
273
        a.dataReceived('produce\nhello world\nunproduce\ngoodbye\n')
279
274
        self.assertEquals(a.received,
280
275
                          ['produce', 'hello world', 'unproduce', 'goodbye'])
281
276
 
282
277
 
 
278
    def test_clearLineBuffer(self):
 
279
        """
 
280
        L{LineReceiver.clearLineBuffer} removes all buffered data and returns
 
281
        it as a C{str} and can be called from beneath C{dataReceived}.
 
282
        """
 
283
        class ClearingReceiver(basic.LineReceiver):
 
284
            def lineReceived(self, line):
 
285
                self.line = line
 
286
                self.rest = self.clearLineBuffer()
 
287
 
 
288
        protocol = ClearingReceiver()
 
289
        protocol.dataReceived('foo\r\nbar\r\nbaz')
 
290
        self.assertEqual(protocol.line, 'foo')
 
291
        self.assertEqual(protocol.rest, 'bar\r\nbaz')
 
292
 
 
293
        # Deliver another line to make sure the previously buffered data is
 
294
        # really gone.
 
295
        protocol.dataReceived('quux\r\n')
 
296
        self.assertEqual(protocol.line, 'quux')
 
297
        self.assertEqual(protocol.rest, '')
 
298
 
 
299
 
 
300
 
283
301
class LineOnlyReceiverTestCase(unittest.TestCase):
284
302
    """
285
303
    Test line only receiver.
290
308
    plastic forks
291
309
    """
292
310
 
293
 
    def testBuffer(self):
 
311
    def test_buffer(self):
294
312
        """
295
313
        Test buffering over line protocol: data received should match buffer.
296
314
        """
297
 
        t = StringIOWithoutClosing()
 
315
        t = proto_helpers.StringTransport()
298
316
        a = LineOnlyTester()
299
 
        a.makeConnection(protocol.FileWrapper(t))
 
317
        a.makeConnection(t)
300
318
        for c in self.buffer:
301
319
            a.dataReceived(c)
302
 
        self.failUnlessEqual(a.received, self.buffer.split('\n')[:-1])
 
320
        self.assertEquals(a.received, self.buffer.split('\n')[:-1])
303
321
 
304
 
    def testLineTooLong(self):
 
322
    def test_lineTooLong(self):
305
323
        """
306
324
        Test sending a line too long: it should close the connection.
307
325
        """
308
 
        t = StringIOWithoutClosing()
 
326
        t = proto_helpers.StringTransport()
309
327
        a = LineOnlyTester()
310
 
        a.makeConnection(protocol.FileWrapper(t))
 
328
        a.makeConnection(t)
311
329
        res = a.dataReceived('x'*200)
312
 
        self.assertTrue(isinstance(res, error.ConnectionLost))
 
330
        self.assertIsInstance(res, error.ConnectionLost)
313
331
 
314
332
 
315
333
 
338
356
    protocol = None
339
357
 
340
358
    def getProtocol(self):
341
 
        t = StringIOWithoutClosing()
 
359
        """
 
360
        Return a new instance of C{self.protocol} connected to a new instance
 
361
        of L{proto_helpers.StringTransport}.
 
362
        """
 
363
        t = proto_helpers.StringTransport()
342
364
        a = self.protocol()
343
 
        a.makeConnection(protocol.FileWrapper(t))
 
365
        a.makeConnection(t)
344
366
        return a
345
367
 
 
368
 
346
369
    def test_illegal(self):
347
370
        """
348
371
        Assert that illegal strings cause the transport to be closed.
351
374
            r = self.getProtocol()
352
375
            for c in s:
353
376
                r.dataReceived(c)
354
 
            self.assertEquals(r.transport.closed, 1)
 
377
            self.assertTrue(r.transport.disconnecting)
355
378
 
356
379
 
357
380
class NetstringReceiverTestCase(unittest.TestCase, LPTestCaseMixin):
364
387
 
365
388
    protocol = TestNetstring
366
389
 
367
 
    def testBuffer(self):
 
390
    def test_buffer(self):
 
391
        """
 
392
        Test that when strings are received in chunks of different lengths,
 
393
        they are still parsed correctly.
 
394
        """
368
395
        for packet_size in range(1, 10):
369
 
            t = StringIOWithoutClosing()
 
396
            t = proto_helpers.StringTransport()
370
397
            a = TestNetstring()
371
398
            a.MAX_LENGTH = 699
372
 
            a.makeConnection(protocol.FileWrapper(t))
 
399
            a.makeConnection(t)
373
400
            for s in self.strings:
374
401
                a.sendString(s)
375
 
            out = t.getvalue()
 
402
            out = t.value()
376
403
            for i in range(len(out)/packet_size + 1):
377
404
                s = out[i*packet_size:(i+1)*packet_size]
378
405
                if s:
396
423
        """
397
424
        r = self.getProtocol()
398
425
        for s in self.strings:
399
 
            for c in struct.pack(self.protocol.structFormat,len(s)) + s:
 
426
            for c in struct.pack(r.structFormat,len(s)) + s:
400
427
                r.dataReceived(c)
401
428
        self.assertEquals(r.received, self.strings)
402
429
 
416
443
        """
417
444
        r = self.getProtocol()
418
445
        r.sendString("b" * 16)
419
 
        self.assertEquals(r.transport.file.getvalue(),
420
 
            struct.pack(self.protocol.structFormat, 16) + "b" * 16)
 
446
        self.assertEquals(r.transport.value(),
 
447
            struct.pack(r.structFormat, 16) + "b" * 16)
421
448
 
422
449
 
423
450
    def test_lengthLimitExceeded(self):
430
457
        r = self.getProtocol()
431
458
        r.lengthLimitExceeded = length.append
432
459
        r.MAX_LENGTH = 10
433
 
        r.dataReceived(struct.pack(self.protocol.structFormat, 11))
 
460
        r.dataReceived(struct.pack(r.structFormat, 11))
434
461
        self.assertEqual(length, [11])
435
462
 
436
463
 
443
470
        r = self.getProtocol()
444
471
        r.MAX_LENGTH = 10
445
472
        r.dataReceived(
446
 
            struct.pack(self.protocol.structFormat, 11) + 'x' * 11)
 
473
            struct.pack(r.structFormat, 11) + 'x' * 11)
447
474
        self.assertEqual(r.received, [])
448
475
 
449
476
 
471
498
        """
472
499
        r = self.getProtocol()
473
500
        r.sendString("foo")
474
 
        self.assertEquals(r.transport.file.getvalue(), "\x00\x00\x00\x03foo")
 
501
        self.assertEquals(r.transport.value(), "\x00\x00\x00\x03foo")
475
502
        r.dataReceived("\x00\x00\x00\x04ubar")
476
503
        self.assertEquals(r.received, ["ubar"])
477
504
 
499
526
        """
500
527
        r = self.getProtocol()
501
528
        r.sendString("foo")
502
 
        self.assertEquals(r.transport.file.getvalue(), "\x00\x03foo")
 
529
        self.assertEquals(r.transport.value(), "\x00\x03foo")
503
530
        r.dataReceived("\x00\x04ubar")
504
531
        self.assertEquals(r.received, ["ubar"])
505
532
 
535
562
        """
536
563
        r = self.getProtocol()
537
564
        r.sendString("foo")
538
 
        self.assertEquals(r.transport.file.getvalue(), "\x03foo")
 
565
        self.assertEquals(r.transport.value(), "\x03foo")
539
566
        r.dataReceived("\x04ubar")
540
567
        self.assertEquals(r.received, ["ubar"])
541
568