1
# -*- test-case-name: twisted.conch.test.test_telnet -*-
2
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
3
# See LICENSE for details.
6
Tests for L{twisted.conch.telnet}.
9
from zope.interface import implements
11
from twisted.internet import defer
13
from twisted.conch import telnet
15
from twisted.trial import unittest
16
from twisted.test import proto_helpers
20
implements(telnet.ITelnetProtocol)
30
self.enabledLocal = []
31
self.enabledRemote = []
32
self.disabledLocal = []
33
self.disabledRemote = []
35
def makeConnection(self, transport):
    """
    Install this protocol's recording hooks on C{transport}.

    C{transport.negotiationMap} is replaced by a new dict holding a single
    entry that routes to L{neg_TEST_COMMAND}, and C{transport.commandMap} is
    replaced by a copy in which each simple command appends its name to
    C{self.calls}.
    """
    negotiators = {}
    transport.negotiationMap = negotiators
    negotiators['\x12'] = self.neg_TEST_COMMAND

    handlers = transport.commandMap.copy()
    transport.commandMap = handlers
    for name in ('NOP', 'DM', 'BRK', 'IP', 'AO', 'AYT', 'EC', 'EL', 'GA'):
        # The default argument pins the current name; without it every
        # handler would record the last name in the loop.
        handlers[getattr(telnet, name)] = (
            lambda arg, cmd=name: self.calls.append(cmd))
43
def dataReceived(self, bytes):
46
def connectionLost(self, reason):
49
def neg_TEST_COMMAND(self, payload):
52
def enableLocal(self, option):
53
if option in self.localEnableable:
54
self.enabledLocal.append(option)
58
def disableLocal(self, option):
    """
    Record that C{option} was disabled locally.
    """
    recorded = self.disabledLocal
    recorded.append(option)
61
def enableRemote(self, option):
62
if option in self.remoteEnableable:
63
self.enabledRemote.append(option)
67
def disableRemote(self, option):
    """
    Record that C{option} was disabled remotely.
    """
    recorded = self.disabledRemote
    recorded.append(option)
72
class TelnetTransportTestCase(unittest.TestCase):
74
Tests for L{telnet.TelnetTransport}.
77
self.p = telnet.TelnetTransport(TestProtocol)
78
self.t = proto_helpers.StringTransport()
79
self.p.makeConnection(self.t)
81
def testRegularBytes(self):
82
# Just send a bunch of bytes. None of these do anything
83
# with telnet. They should pass right through to the
87
L = ["here are some bytes la la la",
88
"some more arrive here",
89
"lots of bytes to play with",
94
self.p.dataReceived(b)
96
self.assertEquals(h.bytes, ''.join(L))
98
def testNewlineHandling(self):
99
# Send various kinds of newlines and make sure they get translated
103
L = ["here is the first line\r\n",
104
"here is the second line\r\0",
105
"here is the third line\r\n",
106
"here is the last line\r\0"]
109
self.p.dataReceived(b)
111
self.assertEquals(h.bytes, L[0][:-2] + '\n' +
116
def testIACEscape(self):
117
# Send a bunch of bytes and a couple quoted \xFFs. Unquoted,
118
# \xFF is a telnet command. Quoted, one of them from each pair
119
# should be passed through to the application layer.
122
L = ["here are some bytes\xff\xff with an embedded IAC",
123
"and here is a test of a border escape\xff",
124
"\xff did you get that IAC?"]
127
self.p.dataReceived(b)
129
self.assertEquals(h.bytes, ''.join(L).replace('\xff\xff', '\xff'))
131
def _simpleCommandTest(self, cmdName):
132
# Send a single simple telnet command and make sure
133
# it gets noticed and the appropriate method gets
137
cmd = telnet.IAC + getattr(telnet, cmdName)
138
L = ["Here's some bytes, tra la la",
139
"But ono!" + cmd + " an interrupt"]
142
self.p.dataReceived(b)
144
self.assertEquals(h.calls, [cmdName])
145
self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
147
def testInterrupt(self):
    """
    Run the shared simple-command check for the C{IP} command.
    """
    command = "IP"
    self._simpleCommandTest(command)
150
def testNoOperation(self):
    """
    Run the shared simple-command check for the C{NOP} command.
    """
    command = "NOP"
    self._simpleCommandTest(command)
153
def testDataMark(self):
    """
    Run the shared simple-command check for the C{DM} command.
    """
    command = "DM"
    self._simpleCommandTest(command)
157
self._simpleCommandTest("BRK")
159
def testAbortOutput(self):
    """
    Run the shared simple-command check for the C{AO} command.
    """
    command = "AO"
    self._simpleCommandTest(command)
162
def testAreYouThere(self):
    """
    Run the shared simple-command check for the C{AYT} command.
    """
    command = "AYT"
    self._simpleCommandTest(command)
165
def testEraseCharacter(self):
    """
    Run the shared simple-command check for the C{EC} command.
    """
    command = "EC"
    self._simpleCommandTest(command)
168
def testEraseLine(self):
    """
    Run the shared simple-command check for the C{EL} command.
    """
    command = "EL"
    self._simpleCommandTest(command)
171
def testGoAhead(self):
    """
    Run the shared simple-command check for the C{GA} command.
    """
    command = "GA"
    self._simpleCommandTest(command)
174
def testSubnegotiation(self):
175
# Send a subnegotiation command and make sure it gets
176
# parsed and that the correct method is called.
179
cmd = telnet.IAC + telnet.SB + '\x12hello world' + telnet.IAC + telnet.SE
180
L = ["These are some bytes but soon" + cmd,
181
"there will be some more"]
184
self.p.dataReceived(b)
186
self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
187
self.assertEquals(h.subcmd, list("hello world"))
189
def testSubnegotiationWithEmbeddedSE(self):
190
# Send a subnegotiation command with an embedded SE. Make sure
191
# that SE gets passed to the correct method.
194
cmd = (telnet.IAC + telnet.SB +
196
telnet.IAC + telnet.SE)
198
L = ["Some bytes are here" + cmd + "and here",
202
self.p.dataReceived(b)
204
self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
205
self.assertEquals(h.subcmd, [telnet.SE])
207
def testBoundarySubnegotiation(self):
208
# Send a subnegotiation command. Split it at every possible byte boundary
209
# and make sure it always gets parsed and that it is passed to the correct
211
cmd = (telnet.IAC + telnet.SB +
212
'\x12' + telnet.SE + 'hello' +
213
telnet.IAC + telnet.SE)
215
for i in range(len(cmd)):
216
h = self.p.protocol = TestProtocol()
217
h.makeConnection(self.p)
219
a, b = cmd[:i], cmd[i:]
220
L = ["first part" + a,
224
self.p.dataReceived(bytes)
226
self.assertEquals(h.bytes, ''.join(L).replace(cmd, ''))
227
self.assertEquals(h.subcmd, [telnet.SE] + list('hello'))
229
def _enabledHelper(self, o, eL=None, eR=None, dL=None, dR=None):
    """
    Assert that C{o} recorded exactly the expected option state changes.

    @param o: the object whose C{enabledLocal}, C{enabledRemote},
        C{disabledLocal} and C{disabledRemote} attributes are checked.
    @param eL: the expected value of C{o.enabledLocal}; an empty list when
        omitted.
    @param eR: the expected value of C{o.enabledRemote}; an empty list when
        omitted.
    @param dL: the expected value of C{o.disabledLocal}; an empty list when
        omitted.
    @param dR: the expected value of C{o.disabledRemote}; an empty list when
        omitted.
    """
    # None stands in for "nothing expected" so that no list object is
    # created once at definition time and then shared by every call.
    expectations = (
        ('enabledLocal', eL),
        ('enabledRemote', eR),
        ('disabledLocal', dL),
        ('disabledRemote', dR),
    )
    for attribute, expected in expectations:
        if expected is None:
            expected = []
        self.assertEquals(getattr(o, attribute), expected)
235
def testRefuseWill(self):
    """
    A WILL request for option C{'\\x12'} is refused: the command is
    stripped from the application bytes, DONT is written to the transport,
    and no enable or disable hook is recorded.
    """
    option = '\x12'
    request = telnet.IAC + telnet.WILL + option

    data = "surrounding bytes" + request + "to spice things up"
    self.p.dataReceived(data)

    protocol = self.p.protocol
    self.assertEquals(protocol.bytes, data.replace(request, ''))
    self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + option)
    self._enabledHelper(protocol)
246
def testRefuseDo(self):
    """
    A DO request for option C{'\\x12'} is refused: the command is stripped
    from the application bytes, WONT is written to the transport, and no
    enable or disable hook is recorded.
    """
    option = '\x12'
    request = telnet.IAC + telnet.DO + option

    data = "surrounding bytes" + request + "to spice things up"
    self.p.dataReceived(data)

    protocol = self.p.protocol
    self.assertEquals(protocol.bytes, data.replace(request, ''))
    self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + option)
    self._enabledHelper(protocol)
257
def testAcceptDo(self):
258
# Try to enable an option. The option is in our localEnableable
259
# list, so we will allow it to be enabled.
260
cmd = telnet.IAC + telnet.DO + '\x19'
261
bytes = 'padding' + cmd + 'trailer'
264
h.localEnableable = ('\x19',)
265
self.p.dataReceived(bytes)
267
self.assertEquals(self.t.value(), telnet.IAC + telnet.WILL + '\x19')
268
self._enabledHelper(h, eL=['\x19'])
270
def testAcceptWill(self):
271
# Same as testAcceptDo, but reversed.
272
cmd = telnet.IAC + telnet.WILL + '\x91'
273
bytes = 'header' + cmd + 'padding'
276
h.remoteEnableable = ('\x91',)
277
self.p.dataReceived(bytes)
279
self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x91')
280
self._enabledHelper(h, eR=['\x91'])
282
def testAcceptWont(self):
283
# Try to disable an option. The server must allow any option to
284
# be disabled at any time. Make sure it disables it and sends
285
# back an acknowledgement of this.
286
cmd = telnet.IAC + telnet.WONT + '\x29'
288
# Jimmy it - after these two lines, the server will be in a state
289
# such that it believes the option to have been previously enabled
290
# via normal negotiation.
291
s = self.p.getOptionState('\x29')
294
bytes = "fiddle dee" + cmd
295
self.p.dataReceived(bytes)
297
self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
298
self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x29')
299
self.assertEquals(s.him.state, 'no')
300
self._enabledHelper(self.p.protocol, dR=['\x29'])
302
def testAcceptDont(self):
303
# Try to disable an option. The server must allow any option to
304
# be disabled at any time. Make sure it disables it and sends
305
# back an acknowledgement of this.
306
cmd = telnet.IAC + telnet.DONT + '\x29'
308
# Jimmy it - after these two lines, the server will be in a state
309
# such that it believes the option to have been previously enabled
310
# via normal negotiation.
311
s = self.p.getOptionState('\x29')
314
bytes = "fiddle dum " + cmd
315
self.p.dataReceived(bytes)
317
self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
318
self.assertEquals(self.t.value(), telnet.IAC + telnet.WONT + '\x29')
319
self.assertEquals(s.us.state, 'no')
320
self._enabledHelper(self.p.protocol, dL=['\x29'])
322
def testIgnoreWont(self):
    """
    A WONT for an option that is already disabled is swallowed: the
    command is stripped from the application bytes, nothing is written to
    the transport, and no enable or disable hook is recorded.
    """
    request = telnet.IAC + telnet.WONT + '\x47'

    data = "dum de dum" + request + "tra la la"
    self.p.dataReceived(data)

    protocol = self.p.protocol
    self.assertEquals(protocol.bytes, data.replace(request, ''))
    self.assertEquals(self.t.value(), '')
    self._enabledHelper(protocol)
334
def testIgnoreDont(self):
    """
    A DONT for an option that is already disabled is swallowed: the
    command is stripped from the application bytes, nothing is written to
    the transport, and no enable or disable hook is recorded.

    Replying here could start a negotiation loop.
    """
    request = telnet.IAC + telnet.DONT + '\x47'

    data = "dum de dum" + request + "tra la la"
    self.p.dataReceived(data)

    protocol = self.p.protocol
    self.assertEquals(protocol.bytes, data.replace(request, ''))
    self.assertEquals(self.t.value(), '')
    self._enabledHelper(protocol)
347
def testIgnoreWill(self):
348
# Try to enable an option. The option is already enabled. The
349
# server should send nothing in response to this. Doing so could
350
# lead to a negotiation loop.
351
cmd = telnet.IAC + telnet.WILL + '\x56'
353
# Jimmy it - after these two lines, the server will be in a state
354
# such that it believes the option to have been previously enabled
355
# via normal negotiation.
356
s = self.p.getOptionState('\x56')
359
bytes = "tra la la" + cmd + "dum de dum"
360
self.p.dataReceived(bytes)
362
self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
363
self.assertEquals(self.t.value(), '')
364
self._enabledHelper(self.p.protocol)
366
def testIgnoreDo(self):
367
# Try to enable an option. The option is already enabled. The
368
# server should send nothing in response to this. Doing so could
369
# lead to a negotiation loop.
370
cmd = telnet.IAC + telnet.DO + '\x56'
372
# Jimmy it - after these two lines, the server will be in a state
373
# such that it believes the option to have been previously enabled
374
# via normal negotiation.
375
s = self.p.getOptionState('\x56')
378
bytes = "tra la la" + cmd + "dum de dum"
379
self.p.dataReceived(bytes)
381
self.assertEquals(self.p.protocol.bytes, bytes.replace(cmd, ''))
382
self.assertEquals(self.t.value(), '')
383
self._enabledHelper(self.p.protocol)
385
def testAcceptedEnableRequest(self):
386
# Try to enable an option through the user-level API. This
387
# returns a Deferred that fires when negotiation about the option
388
# finishes. Make sure it fires, make sure state gets updated
389
# properly, make sure the result indicates the option was enabled.
390
d = self.p.do('\x42')
393
h.remoteEnableable = ('\x42',)
395
self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42')
397
self.p.dataReceived(telnet.IAC + telnet.WILL + '\x42')
399
d.addCallback(self.assertEquals, True)
400
d.addCallback(lambda _: self._enabledHelper(h, eR=['\x42']))
403
def testRefusedEnableRequest(self):
404
# Try to enable an option through the user-level API. This
405
# returns a Deferred that fires when negotiation about the option
406
# finishes. Make sure it fires, make sure state gets updated
407
# properly, make sure the result indicates the option was refused.
408
d = self.p.do('\x42')
410
self.assertEquals(self.t.value(), telnet.IAC + telnet.DO + '\x42')
412
self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
414
d = self.assertFailure(d, telnet.OptionRefused)
415
d.addCallback(lambda _: self._enabledHelper(self.p.protocol))
418
def testAcceptedDisableRequest(self):
419
# Try to disable an option through the user-level API. This
420
# returns a Deferred that fires when negotiation about the option
421
# finishes. Make sure it fires, make sure state gets updated
422
# properly, make sure the result indicates the option was disabled.
423
s = self.p.getOptionState('\x42')
426
d = self.p.dont('\x42')
428
self.assertEquals(self.t.value(), telnet.IAC + telnet.DONT + '\x42')
430
self.p.dataReceived(telnet.IAC + telnet.WONT + '\x42')
432
d.addCallback(self.assertEquals, True)
433
d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
437
def testNegotiationBlocksFurtherNegotiation(self):
438
# Try to disable an option, then immediately try to enable it, then
439
# immediately try to disable it. Ensure that the 2nd and 3rd calls
440
# fail quickly with the right exception.
441
s = self.p.getOptionState('\x24')
443
d2 = self.p.dont('\x24') # fires after the first line of _final
446
d = self.p.do('\x24')
447
return self.assertFailure(d, telnet.AlreadyNegotiating)
450
d = self.p.dont('\x24')
451
return self.assertFailure(d, telnet.AlreadyNegotiating)
454
self.p.dataReceived(telnet.IAC + telnet.WONT + '\x24')
455
# an assertion that only passes if d2 has fired
456
self._enabledHelper(self.p.protocol, dR=['\x24'])
457
# Make sure we allow this
458
self.p.protocol.remoteEnableable = ('\x24',)
459
d = self.p.do('\x24')
460
self.p.dataReceived(telnet.IAC + telnet.WILL + '\x24')
461
d.addCallback(self.assertEquals, True)
462
d.addCallback(lambda _: self._enabledHelper(self.p.protocol,
469
d.addCallback(_final)
472
def testSuperfluousDisableRequestRaises(self):
    """
    Asking to disable an option that is already disabled fails with
    L{telnet.AlreadyDisabled}.
    """
    request = self.p.dont('\xab')
    return self.assertFailure(request, telnet.AlreadyDisabled)
477
def testSuperfluousEnableRequestRaises(self):
478
# Try to enable an enabled option. Make sure it fails properly.
479
s = self.p.getOptionState('\xab')
481
d = self.p.do('\xab')
482
return self.assertFailure(d, telnet.AlreadyEnabled)
484
def testLostConnectionFailsDeferreds(self):
485
d1 = self.p.do('\x12')
486
d2 = self.p.do('\x23')
487
d3 = self.p.do('\x34')
489
class TestException(Exception):
492
self.p.connectionLost(TestException("Total failure!"))
494
d1 = self.assertFailure(d1, TestException)
495
d2 = self.assertFailure(d2, TestException)
496
d3 = self.assertFailure(d3, TestException)
497
return defer.gatherResults([d1, d2, d3])
500
class TestTelnet(telnet.Telnet):
502
A trivial extension of the telnet protocol class useful to unit tests.
505
telnet.Telnet.__init__(self)
509
def applicationDataReceived(self, bytes):
    """
    Append a C{('bytes', bytes)} entry to C{self.events}.
    """
    event = ('bytes', bytes)
    self.events.append(event)
516
def unhandledCommand(self, command, bytes):
    """
    Append a C{('command', command, bytes)} entry to C{self.events}.
    """
    event = ('command', command, bytes)
    self.events.append(event)
523
def unhandledSubnegotiation(self, command, bytes):
    """
    Append a C{('negotiate', command, bytes)} entry to C{self.events}.
    """
    event = ('negotiate', command, bytes)
    self.events.append(event)
531
class TelnetTests(unittest.TestCase):
533
Tests for L{telnet.Telnet}.
535
L{telnet.Telnet} implements the TELNET protocol (RFC 854), including option
536
and suboption negotiation, and option state tracking.
540
Create an unconnected L{telnet.Telnet} to be used by tests.
542
self.protocol = TestTelnet()
545
def test_enableLocal(self):
    """
    L{telnet.Telnet.enableLocal} refuses every option, because
    L{telnet.Telnet} does not know how to implement any options.
    """
    accepted = self.protocol.enableLocal('\0')
    self.assertFalse(accepted)
553
def test_enableRemote(self):
    """
    L{telnet.Telnet.enableRemote} refuses every option, because
    L{telnet.Telnet} does not know how to implement any options.
    """
    accepted = self.protocol.enableRemote('\0')
    self.assertFalse(accepted)
561
def test_disableLocal(self):
563
It is an error for L{telnet.Telnet.disableLocal} to be called, since
564
L{telnet.Telnet.enableLocal} will never allow any options to be enabled
565
locally. If a subclass overrides enableLocal, it must also override
568
self.assertRaises(NotImplementedError, self.protocol.disableLocal, '\0')
571
def test_disableRemote(self):
    """
    Calling L{telnet.Telnet.disableRemote} raises L{NotImplementedError}.

    L{telnet.Telnet.enableRemote} never allows an option to be enabled
    remotely, so there is never anything to disable. A subclass that
    overrides enableRemote must also override disableRemote.
    """
    disable = self.protocol.disableRemote
    self.assertRaises(NotImplementedError, disable, '\0')
581
def test_requestNegotiation(self):
583
L{telnet.Telnet.requestNegotiation} formats the feature byte and the
584
payload bytes into the subnegotiation format and sends them.
588
transport = proto_helpers.StringTransport()
589
self.protocol.makeConnection(transport)
590
self.protocol.requestNegotiation('\x01', '\x02\x03')
593
# IAC SB feature bytes IAC SE
594
'\xff\xfa\x01\x02\x03\xff\xf0')
597
def test_requestNegotiationEscapesIAC(self):
599
If the payload for a subnegotiation includes I{IAC}, it is escaped by
600
L{telnet.Telnet.requestNegotiation} with another I{IAC}.
604
transport = proto_helpers.StringTransport()
605
self.protocol.makeConnection(transport)
606
self.protocol.requestNegotiation('\x01', '\xff')
609
'\xff\xfa\x01\xff\xff\xff\xf0')
612
def _deliver(self, bytes, *expected):
    """
    Feed C{bytes} to the protocol and check the events it records.

    @param bytes: the data handed to the protocol's C{dataReceived}.
    @param expected: the events that must be recorded, in order; with none
        given, no event may be recorded.
    """
    # Start from a fresh event list so earlier deliveries do not count.
    events = []
    self.protocol.events = events
    self.protocol.dataReceived(bytes)
    self.assertEqual(events, list(expected))
622
def test_oneApplicationDataByte(self):
624
One application-data byte in the default state gets delivered right
627
self._deliver('a', ('bytes', 'a'))
630
def test_twoApplicationDataBytes(self):
632
Two application-data bytes in the default state get delivered
635
self._deliver('bc', ('bytes', 'bc'))
638
def test_threeApplicationDataBytes(self):
    """
    Three application-data bytes followed by a control byte are
    delivered; the trailing control byte is not.
    """
    payload = 'def'
    self._deliver(payload + telnet.IAC, ('bytes', payload))
646
def test_escapedControl(self):
    """
    An IAC received in the escaped state is delivered, together with the
    application-data byte that follows it.
    """
    # A lone IAC delivers nothing; it only puts the protocol into the
    # escaped state.
    self._deliver(telnet.IAC)
    escaped = telnet.IAC + 'g'
    self._deliver(escaped, ('bytes', escaped))
655
def test_carriageReturn(self):
657
A carriage return only puts the protocol into the newline state. A
658
linefeed in the newline state causes just the newline to be
659
delivered. A nul in the newline state causes a carriage return to
660
be delivered. An IAC in the newline state causes a carriage return
661
to be delivered and puts the protocol into the escaped state.
662
Anything else causes a carriage return and that thing to be
666
self._deliver('\n', ('bytes', '\n'))
667
self._deliver('\r\n', ('bytes', '\n'))
670
self._deliver('\0', ('bytes', '\r'))
671
self._deliver('\r\0', ('bytes', '\r'))
674
self._deliver('a', ('bytes', '\ra'))
675
self._deliver('\ra', ('bytes', '\ra'))
679
telnet.IAC + telnet.IAC + 'x', ('bytes', '\r' + telnet.IAC + 'x'))
682
def test_applicationDataBeforeSimpleCommand(self):
684
Application bytes received before a command are delivered before the
685
command is processed.
688
'x' + telnet.IAC + telnet.NOP,
689
('bytes', 'x'), ('command', telnet.NOP, None))
692
def test_applicationDataBeforeCommand(self):
694
Application bytes received before a WILL/WONT/DO/DONT are delivered
695
before the command is processed.
697
self.protocol.commandMap = {}
699
'y' + telnet.IAC + telnet.WILL + '\x00',
700
('bytes', 'y'), ('command', telnet.WILL, '\x00'))
703
def test_applicationDataBeforeSubnegotiation(self):
705
Application bytes received before a subnegotiation command are
706
delivered before the negotiation is processed.
709
'z' + telnet.IAC + telnet.SB + 'Qx' + telnet.IAC + telnet.SE,
710
('bytes', 'z'), ('negotiate', 'Q', ['x']))