1
# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
1
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
2
2
# See LICENSE for details.
6
5
Test cases for twisted.protocols package.
9
10
from twisted.trial import unittest
10
11
from twisted.protocols import basic, wire, portforward
11
12
from twisted.internet import reactor, protocol, defer, task, error
12
13
from twisted.test import proto_helpers
17
class StringIOWithoutClosing(StringIO.StringIO):
19
A StringIO that can't be closed.
26
16
class LineTester(basic.LineReceiver):
119
109
Test wire protocols.
123
113
Test wire.Echo protocol: send some data and check that it is sent back.
125
t = StringIOWithoutClosing()
115
t = proto_helpers.StringTransport()
127
a.makeConnection(protocol.FileWrapper(t))
128
118
a.dataReceived("hello")
129
119
a.dataReceived("world")
130
120
a.dataReceived("how")
131
121
a.dataReceived("are")
132
122
a.dataReceived("you")
133
self.failUnlessEqual(t.getvalue(), "helloworldhowareyou")
123
self.assertEquals(t.value(), "helloworldhowareyou")
137
128
Test wire.Who protocol.
139
t = StringIOWithoutClosing()
130
t = proto_helpers.StringTransport()
141
a.makeConnection(protocol.FileWrapper(t))
142
self.failUnlessEqual(t.getvalue(), "root\r\n")
133
self.assertEquals(t.value(), "root\r\n")
146
138
Test wire.QOTD protocol.
148
t = StringIOWithoutClosing()
140
t = proto_helpers.StringTransport()
150
a.makeConnection(protocol.FileWrapper(t))
151
self.failUnlessEqual(t.getvalue(),
152
"An apple a day keeps the doctor away.\r\n")
154
def testDiscard(self):
143
self.assertEquals(t.value(),
144
"An apple a day keeps the doctor away.\r\n")
147
def test_discard(self):
156
149
Test wire.Discard protocol.
158
t = StringIOWithoutClosing()
151
t = proto_helpers.StringTransport()
159
152
a = wire.Discard()
160
a.makeConnection(protocol.FileWrapper(t))
161
154
a.dataReceived("hello")
162
155
a.dataReceived("world")
163
156
a.dataReceived("how")
164
157
a.dataReceived("are")
165
158
a.dataReceived("you")
166
self.failUnlessEqual(t.getvalue(), "")
159
self.assertEqual(t.value(), "")
168
163
class LineReceiverTestCase(unittest.TestCase):
211
206
pause_output1 = ['twiddle1', 'twiddle2', 'pause']
212
207
pause_output2 = pause_output1+['twiddle3']
214
def testPausing(self):
209
def test_pausing(self):
216
211
Test pause inside data receiving. It uses a fake clock to see if
217
212
pausing/resuming work.
219
214
for packet_size in range(1, 10):
220
t = StringIOWithoutClosing()
215
t = proto_helpers.StringIOWithoutClosing()
221
216
clock = task.Clock()
222
217
a = LineTester(clock)
223
218
a.makeConnection(protocol.FileWrapper(t))
224
219
for i in range(len(self.pause_buf)/packet_size + 1):
225
220
s = self.pause_buf[i*packet_size:(i+1)*packet_size]
226
221
a.dataReceived(s)
227
self.failUnlessEqual(self.pause_output1, a.received)
222
self.assertEquals(self.pause_output1, a.received)
229
self.failUnlessEqual(self.pause_output2, a.received)
224
self.assertEquals(self.pause_output2, a.received)
231
226
rawpause_buf = 'twiddle1\ntwiddle2\nlen 5\nrawpause\n12345twiddle3\n'
234
229
rawpause_output2 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '12345',
237
def testRawPausing(self):
232
def test_rawPausing(self):
239
234
Test pause inside raw data receiving.
241
236
for packet_size in range(1, 10):
242
t = StringIOWithoutClosing()
237
t = proto_helpers.StringIOWithoutClosing()
243
238
clock = task.Clock()
244
239
a = LineTester(clock)
245
240
a.makeConnection(protocol.FileWrapper(t))
246
241
for i in range(len(self.rawpause_buf)/packet_size + 1):
247
242
s = self.rawpause_buf[i*packet_size:(i+1)*packet_size]
248
243
a.dataReceived(s)
249
self.failUnlessEqual(self.rawpause_output1, a.received)
244
self.assertEquals(self.rawpause_output1, a.received)
251
self.failUnlessEqual(self.rawpause_output2, a.received)
246
self.assertEquals(self.rawpause_output2, a.received)
253
248
stop_buf = 'twiddle1\ntwiddle2\nstop\nmore\nstuff\n'
255
250
stop_output = ['twiddle1', 'twiddle2', 'stop']
257
def testStopProducing(self):
252
def test_stopProducing(self):
259
254
Test stop inside producing.
261
256
for packet_size in range(1, 10):
262
t = StringIOWithoutClosing()
257
t = proto_helpers.StringIOWithoutClosing()
264
259
a.makeConnection(protocol.FileWrapper(t))
265
260
for i in range(len(self.stop_buf)/packet_size + 1):
266
261
s = self.stop_buf[i*packet_size:(i+1)*packet_size]
267
262
a.dataReceived(s)
268
self.failUnlessEqual(self.stop_output, a.received)
271
def testLineReceiverAsProducer(self):
263
self.assertEquals(self.stop_output, a.received)
266
def test_lineReceiverAsProducer(self):
273
268
Test produce/unproduce in receiving.
276
t = StringIOWithoutClosing()
271
t = proto_helpers.StringIOWithoutClosing()
277
272
a.makeConnection(protocol.FileWrapper(t))
278
273
a.dataReceived('produce\nhello world\nunproduce\ngoodbye\n')
279
274
self.assertEquals(a.received,
280
275
['produce', 'hello world', 'unproduce', 'goodbye'])
278
def test_clearLineBuffer(self):
280
L{LineReceiver.clearLineBuffer} removes all buffered data and returns
281
it as a C{str} and can be called from beneath C{dataReceived}.
283
class ClearingReceiver(basic.LineReceiver):
284
def lineReceived(self, line):
286
self.rest = self.clearLineBuffer()
288
protocol = ClearingReceiver()
289
protocol.dataReceived('foo\r\nbar\r\nbaz')
290
self.assertEqual(protocol.line, 'foo')
291
self.assertEqual(protocol.rest, 'bar\r\nbaz')
293
# Deliver another line to make sure the previously buffered data is
295
protocol.dataReceived('quux\r\n')
296
self.assertEqual(protocol.line, 'quux')
297
self.assertEqual(protocol.rest, '')
283
301
class LineOnlyReceiverTestCase(unittest.TestCase):
285
303
Test line-only receiver.
293
def testBuffer(self):
311
def test_buffer(self):
295
313
Test buffering over line protocol: data received should match buffer.
297
t = StringIOWithoutClosing()
315
t = proto_helpers.StringTransport()
298
316
a = LineOnlyTester()
299
a.makeConnection(protocol.FileWrapper(t))
300
318
for c in self.buffer:
301
319
a.dataReceived(c)
302
self.failUnlessEqual(a.received, self.buffer.split('\n')[:-1])
320
self.assertEquals(a.received, self.buffer.split('\n')[:-1])
304
def testLineTooLong(self):
322
def test_lineTooLong(self):
306
324
Test sending a line too long: it should close the connection.
308
t = StringIOWithoutClosing()
326
t = proto_helpers.StringTransport()
309
327
a = LineOnlyTester()
310
a.makeConnection(protocol.FileWrapper(t))
311
329
res = a.dataReceived('x'*200)
312
self.assertTrue(isinstance(res, error.ConnectionLost))
330
self.assertIsInstance(res, error.ConnectionLost)