1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
Tests for L{twisted.protocol.socks}, an implementation of the SOCKSv4 and
11
from twisted.trial import unittest
12
from twisted.test import proto_helpers
13
from twisted.internet import defer, address, reactor
14
from twisted.internet.error import DNSLookupError
15
from twisted.protocols import socks
18
class StringTCPTransport(proto_helpers.StringTransport):
19
stringTCPTransport_closing = False
26
return address.IPv4Address('TCP', '2.3.4.5', 42)
28
def loseConnection(self):
29
self.stringTCPTransport_closing = True
33
class FakeResolverReactor:
35
Bare-bones reactor with deterministic behavior for the resolve method.
37
def __init__(self, names):
39
@type names: C{dict} containing C{str} keys and C{str} values.
40
@param names: A hostname to IP address mapping. The IP addresses are
41
stringified dotted quads.
46
def resolve(self, hostname):
48
Resolve a hostname by looking it up in the C{names} dictionary.
51
return defer.succeed(self.names[hostname])
54
DNSLookupError("FakeResolverReactor couldn't find " + hostname))
58
class SOCKSv4Driver(socks.SOCKSv4):
59
# last SOCKSv4Outgoing instantiated
60
driver_outgoing = None
62
# last SOCKSv4IncomingFactory instantiated
65
def connectClass(self, host, port, klass, *args):
68
proto.transport = StringTCPTransport()
69
proto.transport.peer = address.IPv4Address('TCP', host, port)
70
proto.connectionMade()
71
self.driver_outgoing = proto
72
return defer.succeed(proto)
74
def listenClass(self, port, klass, *args):
76
factory = klass(*args)
77
self.driver_listen = factory
80
return defer.succeed(('6.7.8.9', port))
84
class Connect(unittest.TestCase):
86
Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
89
self.sock = SOCKSv4Driver()
90
self.sock.transport = StringTCPTransport()
91
self.sock.connectionMade()
92
self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
96
outgoing = self.sock.driver_outgoing
97
if outgoing is not None:
98
self.assert_(outgoing.transport.stringTCPTransport_closing,
99
"Outgoing SOCKS connections need to be closed.")
102
def test_simple(self):
103
self.sock.dataReceived(
104
struct.pack('!BBH', 4, 1, 34)
105
+ socket.inet_aton('1.2.3.4')
108
sent = self.sock.transport.value()
109
self.sock.transport.clear()
110
self.assertEqual(sent,
111
struct.pack('!BBH', 0, 90, 34)
112
+ socket.inet_aton('1.2.3.4'))
113
self.assert_(not self.sock.transport.stringTCPTransport_closing)
114
self.assert_(self.sock.driver_outgoing is not None)
116
# pass some data through
117
self.sock.dataReceived('hello, world')
118
self.assertEqual(self.sock.driver_outgoing.transport.value(),
121
# the other way around
122
self.sock.driver_outgoing.dataReceived('hi there')
123
self.assertEqual(self.sock.transport.value(), 'hi there')
125
self.sock.connectionLost('fake reason')
128
def test_socks4aSuccessfulResolution(self):
130
If the destination IP address has zeros for the first three octets and
131
non-zero for the fourth octet, the client is attempting a v4a
132
connection. A hostname is specified after the user ID string and the
133
server connects to the address that hostname resolves to.
135
@see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
137
# send the domain name "localhost" to be resolved
139
struct.pack('!BBH', 4, 1, 34)
140
+ socket.inet_aton('0.0.0.1')
144
# Deliver the bytes one by one to exercise the protocol's buffering
145
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
147
for byte in clientRequest:
148
self.sock.dataReceived(byte)
150
sent = self.sock.transport.value()
151
self.sock.transport.clear()
153
# Verify that the server responded with the address which will be
157
struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
158
self.assertFalse(self.sock.transport.stringTCPTransport_closing)
159
self.assertNotIdentical(self.sock.driver_outgoing, None)
161
# Pass some data through and verify it is forwarded to the outgoing
163
self.sock.dataReceived('hello, world')
165
self.sock.driver_outgoing.transport.value(), 'hello, world')
167
# Deliver some data from the output connection and verify it is
168
# passed along to the incoming side.
169
self.sock.driver_outgoing.dataReceived('hi there')
170
self.assertEquals(self.sock.transport.value(), 'hi there')
172
self.sock.connectionLost('fake reason')
175
def test_socks4aFailedResolution(self):
177
Failed hostname resolution on a SOCKSv4a packet results in a 91 error
178
response and the connection getting closed.
180
# send the domain name "failinghost" to be resolved
182
struct.pack('!BBH', 4, 1, 34)
183
+ socket.inet_aton('0.0.0.1')
187
# Deliver the bytes one by one to exercise the protocol's buffering
188
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
190
for byte in clientRequest:
191
self.sock.dataReceived(byte)
193
# Verify that the server responds with a 91 error.
194
sent = self.sock.transport.value()
197
struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
199
# A failed resolution causes the transport to drop the connection.
200
self.assertTrue(self.sock.transport.stringTCPTransport_closing)
201
self.assertIdentical(self.sock.driver_outgoing, None)
204
def test_accessDenied(self):
205
self.sock.authorize = lambda code, server, port, user: 0
206
self.sock.dataReceived(
207
struct.pack('!BBH', 4, 1, 4242)
208
+ socket.inet_aton('10.2.3.4')
211
self.assertEqual(self.sock.transport.value(),
212
struct.pack('!BBH', 0, 91, 0)
213
+ socket.inet_aton('0.0.0.0'))
214
self.assert_(self.sock.transport.stringTCPTransport_closing)
215
self.assertIdentical(self.sock.driver_outgoing, None)
218
def test_eofRemote(self):
219
self.sock.dataReceived(
220
struct.pack('!BBH', 4, 1, 34)
221
+ socket.inet_aton('1.2.3.4')
224
sent = self.sock.transport.value()
225
self.sock.transport.clear()
227
# pass some data through
228
self.sock.dataReceived('hello, world')
229
self.assertEqual(self.sock.driver_outgoing.transport.value(),
232
# now close it from the server side
233
self.sock.driver_outgoing.transport.loseConnection()
234
self.sock.driver_outgoing.connectionLost('fake reason')
237
def test_eofLocal(self):
238
self.sock.dataReceived(
239
struct.pack('!BBH', 4, 1, 34)
240
+ socket.inet_aton('1.2.3.4')
243
sent = self.sock.transport.value()
244
self.sock.transport.clear()
246
# pass some data through
247
self.sock.dataReceived('hello, world')
248
self.assertEqual(self.sock.driver_outgoing.transport.value(),
251
# now close it from the client side
252
self.sock.connectionLost('fake reason')
256
class Bind(unittest.TestCase):
258
Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
261
self.sock = SOCKSv4Driver()
262
self.sock.transport = StringTCPTransport()
263
self.sock.connectionMade()
264
self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
266
## def tearDown(self):
267
## # TODO ensure the listen port is closed
268
## listen = self.sock.driver_listen
269
## if listen is not None:
270
## self.assert_(incoming.transport.stringTCPTransport_closing,
271
## "Incoming SOCKS connections need to be closed.")
273
def test_simple(self):
274
self.sock.dataReceived(
275
struct.pack('!BBH', 4, 2, 34)
276
+ socket.inet_aton('1.2.3.4')
279
sent = self.sock.transport.value()
280
self.sock.transport.clear()
281
self.assertEqual(sent,
282
struct.pack('!BBH', 0, 90, 1234)
283
+ socket.inet_aton('6.7.8.9'))
284
self.assert_(not self.sock.transport.stringTCPTransport_closing)
285
self.assert_(self.sock.driver_listen is not None)
288
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
289
self.assertNotIdentical(incoming, None)
290
incoming.transport = StringTCPTransport()
291
incoming.connectionMade()
293
# now we should have the second reply packet
294
sent = self.sock.transport.value()
295
self.sock.transport.clear()
296
self.assertEqual(sent,
297
struct.pack('!BBH', 0, 90, 0)
298
+ socket.inet_aton('0.0.0.0'))
299
self.assert_(not self.sock.transport.stringTCPTransport_closing)
301
# pass some data through
302
self.sock.dataReceived('hello, world')
303
self.assertEqual(incoming.transport.value(),
306
# the other way around
307
incoming.dataReceived('hi there')
308
self.assertEqual(self.sock.transport.value(), 'hi there')
310
self.sock.connectionLost('fake reason')
313
def test_socks4a(self):
315
If the destination IP address has zeros for the first three octets and
316
non-zero for the fourth octet, the client is attempting a v4a
317
connection. A hostname is specified after the user ID string and the
318
server connects to the address that hostname resolves to.
320
@see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
322
# send the domain name "localhost" to be resolved
324
struct.pack('!BBH', 4, 2, 34)
325
+ socket.inet_aton('0.0.0.1')
329
# Deliver the bytes one by one to exercise the protocol's buffering
330
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
332
for byte in clientRequest:
333
self.sock.dataReceived(byte)
335
sent = self.sock.transport.value()
336
self.sock.transport.clear()
338
# Verify that the server responded with the address which will be
342
struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
343
self.assertFalse(self.sock.transport.stringTCPTransport_closing)
344
self.assertNotIdentical(self.sock.driver_listen, None)
347
incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
348
self.assertNotIdentical(incoming, None)
349
incoming.transport = StringTCPTransport()
350
incoming.connectionMade()
352
# now we should have the second reply packet
353
sent = self.sock.transport.value()
354
self.sock.transport.clear()
355
self.assertEqual(sent,
356
struct.pack('!BBH', 0, 90, 0)
357
+ socket.inet_aton('0.0.0.0'))
358
self.assertNotIdentical(
359
self.sock.transport.stringTCPTransport_closing, None)
361
# Deliver some data from the output connection and verify it is
362
# passed along to the incoming side.
363
self.sock.dataReceived('hi there')
364
self.assertEquals(incoming.transport.value(), 'hi there')
366
# the other way around
367
incoming.dataReceived('hi there')
368
self.assertEqual(self.sock.transport.value(), 'hi there')
370
self.sock.connectionLost('fake reason')
373
def test_socks4aFailedResolution(self):
375
Failed hostname resolution on a SOCKSv4a packet results in a 91 error
376
response and the connection getting closed.
378
# send the domain name "failinghost" to be resolved
380
struct.pack('!BBH', 4, 2, 34)
381
+ socket.inet_aton('0.0.0.1')
385
# Deliver the bytes one by one to exercise the protocol's buffering
386
# logic. FakeResolverReactor's resolve method is invoked to "resolve"
388
for byte in clientRequest:
389
self.sock.dataReceived(byte)
391
# Verify that the server responds with a 91 error.
392
sent = self.sock.transport.value()
395
struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
397
# A failed resolution causes the transport to drop the connection.
398
self.assertTrue(self.sock.transport.stringTCPTransport_closing)
399
self.assertIdentical(self.sock.driver_outgoing, None)
402
def test_accessDenied(self):
403
self.sock.authorize = lambda code, server, port, user: 0
404
self.sock.dataReceived(
405
struct.pack('!BBH', 4, 2, 4242)
406
+ socket.inet_aton('10.2.3.4')
409
self.assertEqual(self.sock.transport.value(),
410
struct.pack('!BBH', 0, 91, 0)
411
+ socket.inet_aton('0.0.0.0'))
412
self.assert_(self.sock.transport.stringTCPTransport_closing)
413
self.assertIdentical(self.sock.driver_listen, None)
415
def test_eofRemote(self):
416
self.sock.dataReceived(
417
struct.pack('!BBH', 4, 2, 34)
418
+ socket.inet_aton('1.2.3.4')
421
sent = self.sock.transport.value()
422
self.sock.transport.clear()
425
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
426
self.assertNotIdentical(incoming, None)
427
incoming.transport = StringTCPTransport()
428
incoming.connectionMade()
430
# now we should have the second reply packet
431
sent = self.sock.transport.value()
432
self.sock.transport.clear()
433
self.assertEqual(sent,
434
struct.pack('!BBH', 0, 90, 0)
435
+ socket.inet_aton('0.0.0.0'))
436
self.assert_(not self.sock.transport.stringTCPTransport_closing)
438
# pass some data through
439
self.sock.dataReceived('hello, world')
440
self.assertEqual(incoming.transport.value(),
443
# now close it from the server side
444
incoming.transport.loseConnection()
445
incoming.connectionLost('fake reason')
447
def test_eofLocal(self):
448
self.sock.dataReceived(
449
struct.pack('!BBH', 4, 2, 34)
450
+ socket.inet_aton('1.2.3.4')
453
sent = self.sock.transport.value()
454
self.sock.transport.clear()
457
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
458
self.assertNotIdentical(incoming, None)
459
incoming.transport = StringTCPTransport()
460
incoming.connectionMade()
462
# now we should have the second reply packet
463
sent = self.sock.transport.value()
464
self.sock.transport.clear()
465
self.assertEqual(sent,
466
struct.pack('!BBH', 0, 90, 0)
467
+ socket.inet_aton('0.0.0.0'))
468
self.assert_(not self.sock.transport.stringTCPTransport_closing)
470
# pass some data through
471
self.sock.dataReceived('hello, world')
472
self.assertEqual(incoming.transport.value(),
475
# now close it from the client side
476
self.sock.connectionLost('fake reason')
478
def test_badSource(self):
479
self.sock.dataReceived(
480
struct.pack('!BBH', 4, 2, 34)
481
+ socket.inet_aton('1.2.3.4')
484
sent = self.sock.transport.value()
485
self.sock.transport.clear()
487
# connect from WRONG address
488
incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
489
self.assertIdentical(incoming, None)
491
# Now we should have the second reply packet and it should
492
# be a failure. The connection should be closing.
493
sent = self.sock.transport.value()
494
self.sock.transport.clear()
495
self.assertEqual(sent,
496
struct.pack('!BBH', 0, 91, 0)
497
+ socket.inet_aton('0.0.0.0'))
498
self.assert_(self.sock.transport.stringTCPTransport_closing)