~landscape/zope3/newer-from-ztk

« back to all changes in this revision

Viewing changes to src/twisted/test/test_socks.py

  • Committer: Thomas Hervé
  • Date: 2009-07-08 13:52:04 UTC
  • Revision ID: thomas@canonical.com-20090708135204-df5eesrthifpylf8
Remove twisted copy

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""SOCKS unit tests."""
2
 
 
3
 
from twisted.trial import unittest
4
 
from twisted.test import proto_helpers
5
 
import struct, socket
6
 
from twisted.internet import defer, address
7
 
from twisted.protocols import socks
8
 
 
9
 
class StringTCPTransport(proto_helpers.StringTransport):
    """
    String transport used by the SOCKS tests in place of a real TCP
    transport.

    It reports a fixed local address, hands back whatever peer address was
    assigned to it, and only records (in a flag) that a close was requested.
    """

    # Address handed back by getPeer(); assigned by whoever creates the
    # transport.
    peer = None
    # Becomes True once loseConnection() has been called.
    stringTCPTransport_closing = False

    def loseConnection(self):
        """Remember that the connection was asked to close."""
        self.stringTCPTransport_closing = True

    def getHost(self):
        """Return the fixed local TCP address 2.3.4.5, port 42."""
        localAddress = address.IPv4Address('TCP', '2.3.4.5', 42)
        return localAddress

    def getPeer(self):
        """Return the address stored in C{peer} (C{None} if never set)."""
        return self.peer
21
 
 
22
 
 
23
 
class SOCKSv4Driver(socks.SOCKSv4):
    """
    SOCKSv4 protocol whose C{connectClass} and C{listenClass} hooks are
    faked: nothing touches the network, and the objects they create are kept
    on the instance so tests can drive them directly.
    """

    # last SOCKSv4IncomingFactory instantiated (set by listenClass)
    driver_listen = None

    # last SOCKSv4Outgoing instantiated (set by connectClass)
    driver_outgoing = None

    def connectClass(self, host, port, klass, *args):
        """
        Pretend to connect to C{host}:C{port}.

        C{klass(*args)} is instantiated, wired to a L{StringTCPTransport}
        whose peer is the requested address, told the connection is made and
        remembered as C{driver_outgoing}.

        @return: an already-fired Deferred carrying the new protocol.
        """
        outgoing = klass(*args)
        fakeTransport = StringTCPTransport()
        fakeTransport.peer = address.IPv4Address('TCP', host, port)
        outgoing.transport = fakeTransport
        outgoing.connectionMade()
        self.driver_outgoing = outgoing
        return defer.succeed(outgoing)

    def listenClass(self, port, klass, *args):
        """
        Pretend to listen on C{port}.

        C{klass(*args)} is instantiated and remembered as C{driver_listen}.
        A requested port of 0 is reported back as 1234.

        @return: an already-fired Deferred carrying C{('6.7.8.9', port)}.
        """
        self.driver_listen = klass(*args)
        reportedPort = 1234 if port == 0 else port
        return defer.succeed(('6.7.8.9', reportedPort))
46
 
 
47
 
class Connect(unittest.TestCase):
    """
    Tests for SOCKSv4 requests sent with command code 1 (CONNECT), driven
    through L{SOCKSv4Driver} so that no real connection is made.
    """

    def setUp(self):
        """Create a driver protocol attached to a fake client transport."""
        self.sock = SOCKSv4Driver()
        self.sock.transport = StringTCPTransport()
        self.sock.connectionMade()

    def tearDown(self):
        """
        Check that any outgoing connection made during the test was asked to
        close by the time the test finished.
        """
        outgoing = self.sock.driver_outgoing
        if outgoing is not None:
            self.assertTrue(outgoing.transport.stringTCPTransport_closing,
                            "Outgoing SOCKS connections need to be closed.")

    def _sendConnectRequest(self, port=34, host='1.2.3.4'):
        """
        Feed the protocol a version 4, command 1 request for C{host}:C{port}
        followed by the C{'fooBAR'} user id and its NUL terminator.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 1, port)
            + socket.inet_aton(host)
            + 'fooBAR'
            + '\0')

    def _relayFromClient(self):
        """
        Send data from the client side and check that it shows up on the
        outgoing connection's transport.
        """
        self.sock.dataReceived('hello, world')
        self.assertEqual(self.sock.driver_outgoing.transport.value(),
                         'hello, world')

    def test_simple(self):
        """
        A request is answered with reply code 90 and data is then relayed in
        both directions.
        """
        self._sendConnectRequest()
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 34)
                         + socket.inet_aton('1.2.3.4'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_outgoing, None)

        # pass some data through
        self._relayFromClient()

        # the other way around
        self.sock.driver_outgoing.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')

    def test_access_denied(self):
        """
        When C{authorize} returns 0 the request is answered with reply code
        91, the client transport is closed and no outgoing connection is
        created.
        """
        self.sock.authorize = lambda code, server, port, user: 0
        self._sendConnectRequest(port=4242, host='10.2.3.4')
        self.assertEqual(self.sock.transport.value(),
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIdentical(self.sock.driver_outgoing, None)

    def test_eof_remote(self):
        """
        Data is relayed, then the outgoing (server side) connection goes
        away.
        """
        self._sendConnectRequest()
        # Discard the reply; its contents are checked in test_simple.
        self.sock.transport.clear()

        # pass some data through
        self._relayFromClient()

        # now close it from the server side
        self.sock.driver_outgoing.transport.loseConnection()
        self.sock.driver_outgoing.connectionLost('fake reason')

    def test_eof_local(self):
        """
        Data is relayed, then the client connection goes away; tearDown
        verifies the outgoing connection was asked to close.
        """
        self._sendConnectRequest()
        # Discard the reply; its contents are checked in test_simple.
        self.sock.transport.clear()

        # pass some data through
        self._relayFromClient()

        # now close it from the client side
        self.sock.connectionLost('fake reason')
131
 
 
132
 
class Bind(unittest.TestCase):
    """
    Tests for SOCKSv4 requests sent with command code 2 (BIND), driven
    through L{SOCKSv4Driver} so that no real port is opened.
    """

    def setUp(self):
        """Create a driver protocol attached to a fake client transport."""
        self.sock = SOCKSv4Driver()
        self.sock.transport = StringTCPTransport()
        self.sock.connectionMade()

    # TODO: add a tearDown that ensures the listen port is closed.  The
    # sketch that used to be commented out here fetched
    # self.sock.driver_listen but then checked an undefined name
    # (incoming), so it could not be enabled as written.

    def _sendBindRequest(self, port=34, host='1.2.3.4'):
        """
        Feed the protocol a version 4, command 2 request for C{host}:C{port}
        followed by the C{'fooBAR'} user id and its NUL terminator.
        """
        self.sock.dataReceived(
            struct.pack('!BBH', 4, 2, port)
            + socket.inet_aton(host)
            + 'fooBAR'
            + '\0')

    def _acceptFromExpectedPeer(self):
        """
        Simulate a connection to the listening factory from 1.2.3.4:5345 and
        check the second reply packet sent to the client.

        @return: the connected incoming protocol.
        """
        # connect
        incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
        self.assertNotIdentical(incoming, None)
        incoming.transport = StringTCPTransport()
        incoming.connectionMade()

        # now we should have the second reply packet
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        return incoming

    def _relayFromClient(self, incoming):
        """
        Send data from the client side and check that it shows up on the
        incoming connection's transport.
        """
        self.sock.dataReceived('hello, world')
        self.assertEqual(incoming.transport.value(),
                         'hello, world')

    def test_simple(self):
        """
        A request is answered with the listening address, an accepted
        connection triggers the second reply, and data is then relayed in
        both directions.
        """
        self._sendBindRequest()
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 90, 1234)
                         + socket.inet_aton('6.7.8.9'))
        self.assertFalse(self.sock.transport.stringTCPTransport_closing)
        self.assertNotIdentical(self.sock.driver_listen, None)

        incoming = self._acceptFromExpectedPeer()

        # pass some data through
        self._relayFromClient(incoming)

        # the other way around
        incoming.dataReceived('hi there')
        self.assertEqual(self.sock.transport.value(), 'hi there')

        self.sock.connectionLost('fake reason')

    def test_access_denied(self):
        """
        When C{authorize} returns 0 the request is answered with reply code
        91, the client transport is closed and no listening factory is
        created.
        """
        self.sock.authorize = lambda code, server, port, user: 0
        self._sendBindRequest(port=4242, host='10.2.3.4')
        self.assertEqual(self.sock.transport.value(),
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)
        self.assertIdentical(self.sock.driver_listen, None)

    def test_eof_remote(self):
        """
        Data is relayed, then the incoming (server side) connection goes
        away.
        """
        self._sendBindRequest()
        # Discard the first reply; its contents are checked in test_simple.
        self.sock.transport.clear()

        incoming = self._acceptFromExpectedPeer()

        # pass some data through
        self._relayFromClient(incoming)

        # now close it from the server side
        incoming.transport.loseConnection()
        incoming.connectionLost('fake reason')

    def test_eof_local(self):
        """
        Data is relayed, then the client connection goes away.
        """
        self._sendBindRequest()
        # Discard the first reply; its contents are checked in test_simple.
        self.sock.transport.clear()

        incoming = self._acceptFromExpectedPeer()

        # pass some data through
        self._relayFromClient(incoming)

        # now close it from the client side
        self.sock.connectionLost('fake reason')

    def test_bad_source(self):
        """
        A connection arriving from an address other than the one named in
        the request is refused and the client is told the request failed.
        """
        self._sendBindRequest()
        # Discard the first reply; its contents are checked in test_simple.
        self.sock.transport.clear()

        # connect from WRONG address
        incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
        self.assertIdentical(incoming, None)

        # Now we should have the second reply packet and it should
        # be a failure. The connection should be closing.
        sent = self.sock.transport.value()
        self.sock.transport.clear()
        self.assertEqual(sent,
                         struct.pack('!BBH', 0, 91, 0)
                         + socket.inet_aton('0.0.0.0'))
        self.assertTrue(self.sock.transport.stringTCPTransport_closing)