~jk0/nova/xs-ipv6

« back to all changes in this revision

Viewing changes to vendor/Twisted-10.0.0/twisted/test/test_socks.py

  • Committer: Jesse Andrews
  • Date: 2010-05-28 06:05:26 UTC
  • Revision ID: git-v1:bf6e6e718cdc7488e2da87b21e258ccc065fe499
initial commit

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
"""
 
5
Tests for L{twisted.protocol.socks}, an implementation of the SOCKSv4 and
 
6
SOCKSv4a protocols.
 
7
"""
 
8
 
 
9
import struct, socket
 
10
 
 
11
from twisted.trial import unittest
 
12
from twisted.test import proto_helpers
 
13
from twisted.internet import defer, address, reactor
 
14
from twisted.internet.error import DNSLookupError
 
15
from twisted.protocols import socks
 
16
 
 
17
 
 
18
class StringTCPTransport(proto_helpers.StringTransport):
 
19
    stringTCPTransport_closing = False
 
20
    peer = None
 
21
 
 
22
    def getPeer(self):
 
23
        return self.peer
 
24
 
 
25
    def getHost(self):
 
26
        return address.IPv4Address('TCP', '2.3.4.5', 42)
 
27
 
 
28
    def loseConnection(self):
 
29
        self.stringTCPTransport_closing = True
 
30
 
 
31
 
 
32
 
 
33
class FakeResolverReactor:
 
34
    """
 
35
    Bare-bones reactor with deterministic behavior for the resolve method.
 
36
    """
 
37
    def __init__(self, names):
 
38
        """
 
39
        @type names: C{dict} containing C{str} keys and C{str} values.
 
40
        @param names: A hostname to IP address mapping. The IP addresses are
 
41
            stringified dotted quads.
 
42
        """
 
43
        self.names = names
 
44
 
 
45
 
 
46
    def resolve(self, hostname):
 
47
        """
 
48
        Resolve a hostname by looking it up in the C{names} dictionary.
 
49
        """
 
50
        try:
 
51
            return defer.succeed(self.names[hostname])
 
52
        except KeyError:
 
53
            return defer.fail(
 
54
                DNSLookupError("FakeResolverReactor couldn't find " + hostname))
 
55
 
 
56
 
 
57
 
 
58
class SOCKSv4Driver(socks.SOCKSv4):
 
59
    # last SOCKSv4Outgoing instantiated
 
60
    driver_outgoing = None
 
61
 
 
62
    # last SOCKSv4IncomingFactory instantiated
 
63
    driver_listen = None
 
64
 
 
65
    def connectClass(self, host, port, klass, *args):
 
66
        # fake it
 
67
        proto = klass(*args)
 
68
        proto.transport = StringTCPTransport()
 
69
        proto.transport.peer = address.IPv4Address('TCP', host, port)
 
70
        proto.connectionMade()
 
71
        self.driver_outgoing = proto
 
72
        return defer.succeed(proto)
 
73
 
 
74
    def listenClass(self, port, klass, *args):
 
75
        # fake it
 
76
        factory = klass(*args)
 
77
        self.driver_listen = factory
 
78
        if port == 0:
 
79
            port = 1234
 
80
        return defer.succeed(('6.7.8.9', port))
 
81
 
 
82
 
 
83
 
 
84
class Connect(unittest.TestCase):
 
85
    """
 
86
    Tests for SOCKS and SOCKSv4a connect requests using the L{SOCKSv4} protocol.
 
87
    """
 
88
    def setUp(self):
 
89
        self.sock = SOCKSv4Driver()
 
90
        self.sock.transport = StringTCPTransport()
 
91
        self.sock.connectionMade()
 
92
        self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
 
93
 
 
94
 
 
95
    def tearDown(self):
 
96
        outgoing = self.sock.driver_outgoing
 
97
        if outgoing is not None:
 
98
            self.assert_(outgoing.transport.stringTCPTransport_closing,
 
99
                         "Outgoing SOCKS connections need to be closed.")
 
100
 
 
101
 
 
102
    def test_simple(self):
 
103
        self.sock.dataReceived(
 
104
            struct.pack('!BBH', 4, 1, 34)
 
105
            + socket.inet_aton('1.2.3.4')
 
106
            + 'fooBAR'
 
107
            + '\0')
 
108
        sent = self.sock.transport.value()
 
109
        self.sock.transport.clear()
 
110
        self.assertEqual(sent,
 
111
                         struct.pack('!BBH', 0, 90, 34)
 
112
                         + socket.inet_aton('1.2.3.4'))
 
113
        self.assert_(not self.sock.transport.stringTCPTransport_closing)
 
114
        self.assert_(self.sock.driver_outgoing is not None)
 
115
 
 
116
        # pass some data through
 
117
        self.sock.dataReceived('hello, world')
 
118
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
 
119
                         'hello, world')
 
120
 
 
121
        # the other way around
 
122
        self.sock.driver_outgoing.dataReceived('hi there')
 
123
        self.assertEqual(self.sock.transport.value(), 'hi there')
 
124
 
 
125
        self.sock.connectionLost('fake reason')
 
126
 
 
127
 
 
128
    def test_socks4aSuccessfulResolution(self):
 
129
        """
 
130
        If the destination IP address has zeros for the first three octets and
 
131
        non-zero for the fourth octet, the client is attempting a v4a
 
132
        connection.  A hostname is specified after the user ID string and the
 
133
        server connects to the address that hostname resolves to.
 
134
 
 
135
        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
 
136
        """
 
137
        # send the domain name "localhost" to be resolved
 
138
        clientRequest = (
 
139
            struct.pack('!BBH', 4, 1, 34)
 
140
            + socket.inet_aton('0.0.0.1')
 
141
            + 'fooBAZ\0'
 
142
            + 'localhost\0')
 
143
 
 
144
        # Deliver the bytes one by one to exercise the protocol's buffering
 
145
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
 
146
        # the hostname.
 
147
        for byte in clientRequest:
 
148
            self.sock.dataReceived(byte)
 
149
 
 
150
        sent = self.sock.transport.value()
 
151
        self.sock.transport.clear()
 
152
 
 
153
        # Verify that the server responded with the address which will be
 
154
        # connected to.
 
155
        self.assertEquals(
 
156
            sent,
 
157
            struct.pack('!BBH', 0, 90, 34) + socket.inet_aton('127.0.0.1'))
 
158
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
 
159
        self.assertNotIdentical(self.sock.driver_outgoing, None)
 
160
 
 
161
        # Pass some data through and verify it is forwarded to the outgoing
 
162
        # connection.
 
163
        self.sock.dataReceived('hello, world')
 
164
        self.assertEquals(
 
165
            self.sock.driver_outgoing.transport.value(), 'hello, world')
 
166
 
 
167
        # Deliver some data from the output connection and verify it is
 
168
        # passed along to the incoming side.
 
169
        self.sock.driver_outgoing.dataReceived('hi there')
 
170
        self.assertEquals(self.sock.transport.value(), 'hi there')
 
171
 
 
172
        self.sock.connectionLost('fake reason')
 
173
 
 
174
 
 
175
    def test_socks4aFailedResolution(self):
 
176
        """
 
177
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
 
178
        response and the connection getting closed.
 
179
        """
 
180
        # send the domain name "failinghost" to be resolved
 
181
        clientRequest = (
 
182
            struct.pack('!BBH', 4, 1, 34)
 
183
            + socket.inet_aton('0.0.0.1')
 
184
            + 'fooBAZ\0'
 
185
            + 'failinghost\0')
 
186
 
 
187
        # Deliver the bytes one by one to exercise the protocol's buffering
 
188
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
 
189
        # the hostname.
 
190
        for byte in clientRequest:
 
191
            self.sock.dataReceived(byte)
 
192
 
 
193
        # Verify that the server responds with a 91 error.
 
194
        sent = self.sock.transport.value()
 
195
        self.assertEquals(
 
196
            sent,
 
197
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
 
198
 
 
199
        # A failed resolution causes the transport to drop the connection.
 
200
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
 
201
        self.assertIdentical(self.sock.driver_outgoing, None)
 
202
 
 
203
 
 
204
    def test_accessDenied(self):
 
205
        self.sock.authorize = lambda code, server, port, user: 0
 
206
        self.sock.dataReceived(
 
207
            struct.pack('!BBH', 4, 1, 4242)
 
208
            + socket.inet_aton('10.2.3.4')
 
209
            + 'fooBAR'
 
210
            + '\0')
 
211
        self.assertEqual(self.sock.transport.value(),
 
212
                         struct.pack('!BBH', 0, 91, 0)
 
213
                         + socket.inet_aton('0.0.0.0'))
 
214
        self.assert_(self.sock.transport.stringTCPTransport_closing)
 
215
        self.assertIdentical(self.sock.driver_outgoing, None)
 
216
 
 
217
 
 
218
    def test_eofRemote(self):
 
219
        self.sock.dataReceived(
 
220
            struct.pack('!BBH', 4, 1, 34)
 
221
            + socket.inet_aton('1.2.3.4')
 
222
            + 'fooBAR'
 
223
            + '\0')
 
224
        sent = self.sock.transport.value()
 
225
        self.sock.transport.clear()
 
226
 
 
227
        # pass some data through
 
228
        self.sock.dataReceived('hello, world')
 
229
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
 
230
                         'hello, world')
 
231
 
 
232
        # now close it from the server side
 
233
        self.sock.driver_outgoing.transport.loseConnection()
 
234
        self.sock.driver_outgoing.connectionLost('fake reason')
 
235
 
 
236
 
 
237
    def test_eofLocal(self):
 
238
        self.sock.dataReceived(
 
239
            struct.pack('!BBH', 4, 1, 34)
 
240
            + socket.inet_aton('1.2.3.4')
 
241
            + 'fooBAR'
 
242
            + '\0')
 
243
        sent = self.sock.transport.value()
 
244
        self.sock.transport.clear()
 
245
 
 
246
        # pass some data through
 
247
        self.sock.dataReceived('hello, world')
 
248
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
 
249
                         'hello, world')
 
250
 
 
251
        # now close it from the client side
 
252
        self.sock.connectionLost('fake reason')
 
253
 
 
254
 
 
255
 
 
256
class Bind(unittest.TestCase):
 
257
    """
 
258
    Tests for SOCKS and SOCKSv4a bind requests using the L{SOCKSv4} protocol.
 
259
    """
 
260
    def setUp(self):
 
261
        self.sock = SOCKSv4Driver()
 
262
        self.sock.transport = StringTCPTransport()
 
263
        self.sock.connectionMade()
 
264
        self.sock.reactor = FakeResolverReactor({"localhost":"127.0.0.1"})
 
265
 
 
266
##     def tearDown(self):
 
267
##         # TODO ensure the listen port is closed
 
268
##         listen = self.sock.driver_listen
 
269
##         if listen is not None:
 
270
##             self.assert_(incoming.transport.stringTCPTransport_closing,
 
271
##                     "Incoming SOCKS connections need to be closed.")
 
272
 
 
273
    def test_simple(self):
 
274
        self.sock.dataReceived(
 
275
            struct.pack('!BBH', 4, 2, 34)
 
276
            + socket.inet_aton('1.2.3.4')
 
277
            + 'fooBAR'
 
278
            + '\0')
 
279
        sent = self.sock.transport.value()
 
280
        self.sock.transport.clear()
 
281
        self.assertEqual(sent,
 
282
                         struct.pack('!BBH', 0, 90, 1234)
 
283
                         + socket.inet_aton('6.7.8.9'))
 
284
        self.assert_(not self.sock.transport.stringTCPTransport_closing)
 
285
        self.assert_(self.sock.driver_listen is not None)
 
286
 
 
287
        # connect
 
288
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
 
289
        self.assertNotIdentical(incoming, None)
 
290
        incoming.transport = StringTCPTransport()
 
291
        incoming.connectionMade()
 
292
 
 
293
        # now we should have the second reply packet
 
294
        sent = self.sock.transport.value()
 
295
        self.sock.transport.clear()
 
296
        self.assertEqual(sent,
 
297
                         struct.pack('!BBH', 0, 90, 0)
 
298
                         + socket.inet_aton('0.0.0.0'))
 
299
        self.assert_(not self.sock.transport.stringTCPTransport_closing)
 
300
 
 
301
        # pass some data through
 
302
        self.sock.dataReceived('hello, world')
 
303
        self.assertEqual(incoming.transport.value(),
 
304
                         'hello, world')
 
305
 
 
306
        # the other way around
 
307
        incoming.dataReceived('hi there')
 
308
        self.assertEqual(self.sock.transport.value(), 'hi there')
 
309
 
 
310
        self.sock.connectionLost('fake reason')
 
311
 
 
312
 
 
313
    def test_socks4a(self):
 
314
        """
 
315
        If the destination IP address has zeros for the first three octets and
 
316
        non-zero for the fourth octet, the client is attempting a v4a
 
317
        connection.  A hostname is specified after the user ID string and the
 
318
        server connects to the address that hostname resolves to.
 
319
 
 
320
        @see: U{http://en.wikipedia.org/wiki/SOCKS#SOCKS_4a_protocol}
 
321
        """
 
322
        # send the domain name "localhost" to be resolved
 
323
        clientRequest = (
 
324
            struct.pack('!BBH', 4, 2, 34)
 
325
            + socket.inet_aton('0.0.0.1')
 
326
            + 'fooBAZ\0'
 
327
            + 'localhost\0')
 
328
 
 
329
        # Deliver the bytes one by one to exercise the protocol's buffering
 
330
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
 
331
        # the hostname.
 
332
        for byte in clientRequest:
 
333
            self.sock.dataReceived(byte)
 
334
 
 
335
        sent = self.sock.transport.value()
 
336
        self.sock.transport.clear()
 
337
 
 
338
        # Verify that the server responded with the address which will be
 
339
        # connected to.
 
340
        self.assertEquals(
 
341
            sent,
 
342
            struct.pack('!BBH', 0, 90, 1234) + socket.inet_aton('6.7.8.9'))
 
343
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
 
344
        self.assertNotIdentical(self.sock.driver_listen, None)
 
345
 
 
346
        # connect
 
347
        incoming = self.sock.driver_listen.buildProtocol(('127.0.0.1', 5345))
 
348
        self.assertNotIdentical(incoming, None)
 
349
        incoming.transport = StringTCPTransport()
 
350
        incoming.connectionMade()
 
351
 
 
352
        # now we should have the second reply packet
 
353
        sent = self.sock.transport.value()
 
354
        self.sock.transport.clear()
 
355
        self.assertEqual(sent,
 
356
                         struct.pack('!BBH', 0, 90, 0)
 
357
                         + socket.inet_aton('0.0.0.0'))
 
358
        self.assertNotIdentical(
 
359
            self.sock.transport.stringTCPTransport_closing, None)
 
360
 
 
361
        # Deliver some data from the output connection and verify it is
 
362
        # passed along to the incoming side.
 
363
        self.sock.dataReceived('hi there')
 
364
        self.assertEquals(incoming.transport.value(), 'hi there')
 
365
 
 
366
        # the other way around
 
367
        incoming.dataReceived('hi there')
 
368
        self.assertEqual(self.sock.transport.value(), 'hi there')
 
369
 
 
370
        self.sock.connectionLost('fake reason')
 
371
 
 
372
 
 
373
    def test_socks4aFailedResolution(self):
 
374
        """
 
375
        Failed hostname resolution on a SOCKSv4a packet results in a 91 error
 
376
        response and the connection getting closed.
 
377
        """
 
378
        # send the domain name "failinghost" to be resolved
 
379
        clientRequest = (
 
380
            struct.pack('!BBH', 4, 2, 34)
 
381
            + socket.inet_aton('0.0.0.1')
 
382
            + 'fooBAZ\0'
 
383
            + 'failinghost\0')
 
384
 
 
385
        # Deliver the bytes one by one to exercise the protocol's buffering
 
386
        # logic. FakeResolverReactor's resolve method is invoked to "resolve"
 
387
        # the hostname.
 
388
        for byte in clientRequest:
 
389
            self.sock.dataReceived(byte)
 
390
 
 
391
        # Verify that the server responds with a 91 error.
 
392
        sent = self.sock.transport.value()
 
393
        self.assertEquals(
 
394
            sent,
 
395
            struct.pack('!BBH', 0, 91, 0) + socket.inet_aton('0.0.0.0'))
 
396
 
 
397
        # A failed resolution causes the transport to drop the connection.
 
398
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
 
399
        self.assertIdentical(self.sock.driver_outgoing, None)
 
400
 
 
401
 
 
402
    def test_accessDenied(self):
 
403
        self.sock.authorize = lambda code, server, port, user: 0
 
404
        self.sock.dataReceived(
 
405
            struct.pack('!BBH', 4, 2, 4242)
 
406
            + socket.inet_aton('10.2.3.4')
 
407
            + 'fooBAR'
 
408
            + '\0')
 
409
        self.assertEqual(self.sock.transport.value(),
 
410
                         struct.pack('!BBH', 0, 91, 0)
 
411
                         + socket.inet_aton('0.0.0.0'))
 
412
        self.assert_(self.sock.transport.stringTCPTransport_closing)
 
413
        self.assertIdentical(self.sock.driver_listen, None)
 
414
 
 
415
    def test_eofRemote(self):
 
416
        self.sock.dataReceived(
 
417
            struct.pack('!BBH', 4, 2, 34)
 
418
            + socket.inet_aton('1.2.3.4')
 
419
            + 'fooBAR'
 
420
            + '\0')
 
421
        sent = self.sock.transport.value()
 
422
        self.sock.transport.clear()
 
423
 
 
424
        # connect
 
425
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
 
426
        self.assertNotIdentical(incoming, None)
 
427
        incoming.transport = StringTCPTransport()
 
428
        incoming.connectionMade()
 
429
 
 
430
        # now we should have the second reply packet
 
431
        sent = self.sock.transport.value()
 
432
        self.sock.transport.clear()
 
433
        self.assertEqual(sent,
 
434
                         struct.pack('!BBH', 0, 90, 0)
 
435
                         + socket.inet_aton('0.0.0.0'))
 
436
        self.assert_(not self.sock.transport.stringTCPTransport_closing)
 
437
 
 
438
        # pass some data through
 
439
        self.sock.dataReceived('hello, world')
 
440
        self.assertEqual(incoming.transport.value(),
 
441
                         'hello, world')
 
442
 
 
443
        # now close it from the server side
 
444
        incoming.transport.loseConnection()
 
445
        incoming.connectionLost('fake reason')
 
446
 
 
447
    def test_eofLocal(self):
 
448
        self.sock.dataReceived(
 
449
            struct.pack('!BBH', 4, 2, 34)
 
450
            + socket.inet_aton('1.2.3.4')
 
451
            + 'fooBAR'
 
452
            + '\0')
 
453
        sent = self.sock.transport.value()
 
454
        self.sock.transport.clear()
 
455
 
 
456
        # connect
 
457
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
 
458
        self.assertNotIdentical(incoming, None)
 
459
        incoming.transport = StringTCPTransport()
 
460
        incoming.connectionMade()
 
461
 
 
462
        # now we should have the second reply packet
 
463
        sent = self.sock.transport.value()
 
464
        self.sock.transport.clear()
 
465
        self.assertEqual(sent,
 
466
                         struct.pack('!BBH', 0, 90, 0)
 
467
                         + socket.inet_aton('0.0.0.0'))
 
468
        self.assert_(not self.sock.transport.stringTCPTransport_closing)
 
469
 
 
470
        # pass some data through
 
471
        self.sock.dataReceived('hello, world')
 
472
        self.assertEqual(incoming.transport.value(),
 
473
                         'hello, world')
 
474
 
 
475
        # now close it from the client side
 
476
        self.sock.connectionLost('fake reason')
 
477
 
 
478
    def test_badSource(self):
 
479
        self.sock.dataReceived(
 
480
            struct.pack('!BBH', 4, 2, 34)
 
481
            + socket.inet_aton('1.2.3.4')
 
482
            + 'fooBAR'
 
483
            + '\0')
 
484
        sent = self.sock.transport.value()
 
485
        self.sock.transport.clear()
 
486
 
 
487
        # connect from WRONG address
 
488
        incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
 
489
        self.assertIdentical(incoming, None)
 
490
 
 
491
        # Now we should have the second reply packet and it should
 
492
        # be a failure. The connection should be closing.
 
493
        sent = self.sock.transport.value()
 
494
        self.sock.transport.clear()
 
495
        self.assertEqual(sent,
 
496
                         struct.pack('!BBH', 0, 91, 0)
 
497
                         + socket.inet_aton('0.0.0.0'))
 
498
        self.assert_(self.sock.transport.stringTCPTransport_closing)