1
"""SOCKS unit tests."""
3
from twisted.trial import unittest
4
from twisted.test import proto_helpers
6
from twisted.internet import defer, address
7
from twisted.protocols import socks
9
class StringTCPTransport(proto_helpers.StringTransport):
10
stringTCPTransport_closing = False
17
return address.IPv4Address('TCP', '2.3.4.5', 42)
19
def loseConnection(self):
20
self.stringTCPTransport_closing = True
23
class SOCKSv4Driver(socks.SOCKSv4):
24
# last SOCKSv4Outgoing instantiated
25
driver_outgoing = None
27
# last SOCKSv4IncomingFactory instantiated
30
def connectClass(self, host, port, klass, *args):
33
proto.transport = StringTCPTransport()
34
proto.transport.peer = address.IPv4Address('TCP', host, port)
35
proto.connectionMade()
36
self.driver_outgoing = proto
37
return defer.succeed(proto)
39
def listenClass(self, port, klass, *args):
41
factory = klass(*args)
42
self.driver_listen = factory
45
return defer.succeed(('6.7.8.9', port))
47
class Connect(unittest.TestCase):
49
self.sock = SOCKSv4Driver()
50
self.sock.transport = StringTCPTransport()
51
self.sock.connectionMade()
54
outgoing = self.sock.driver_outgoing
55
if outgoing is not None:
56
self.assert_(outgoing.transport.stringTCPTransport_closing,
57
"Outgoing SOCKS connections need to be closed.")
59
def test_simple(self):
60
self.sock.dataReceived(
61
struct.pack('!BBH', 4, 1, 34)
62
+ socket.inet_aton('1.2.3.4')
65
sent = self.sock.transport.value()
66
self.sock.transport.clear()
67
self.assertEqual(sent,
68
struct.pack('!BBH', 0, 90, 34)
69
+ socket.inet_aton('1.2.3.4'))
70
self.assert_(not self.sock.transport.stringTCPTransport_closing)
71
self.assert_(self.sock.driver_outgoing is not None)
73
# pass some data through
74
self.sock.dataReceived('hello, world')
75
self.assertEqual(self.sock.driver_outgoing.transport.value(),
78
# the other way around
79
self.sock.driver_outgoing.dataReceived('hi there')
80
self.assertEqual(self.sock.transport.value(), 'hi there')
82
self.sock.connectionLost('fake reason')
84
def test_access_denied(self):
85
self.sock.authorize = lambda code, server, port, user: 0
86
self.sock.dataReceived(
87
struct.pack('!BBH', 4, 1, 4242)
88
+ socket.inet_aton('10.2.3.4')
91
self.assertEqual(self.sock.transport.value(),
92
struct.pack('!BBH', 0, 91, 0)
93
+ socket.inet_aton('0.0.0.0'))
94
self.assert_(self.sock.transport.stringTCPTransport_closing)
95
self.assertIdentical(self.sock.driver_outgoing, None)
97
def test_eof_remote(self):
98
self.sock.dataReceived(
99
struct.pack('!BBH', 4, 1, 34)
100
+ socket.inet_aton('1.2.3.4')
103
sent = self.sock.transport.value()
104
self.sock.transport.clear()
106
# pass some data through
107
self.sock.dataReceived('hello, world')
108
self.assertEqual(self.sock.driver_outgoing.transport.value(),
111
# now close it from the server side
112
self.sock.driver_outgoing.transport.loseConnection()
113
self.sock.driver_outgoing.connectionLost('fake reason')
115
def test_eof_local(self):
116
self.sock.dataReceived(
117
struct.pack('!BBH', 4, 1, 34)
118
+ socket.inet_aton('1.2.3.4')
121
sent = self.sock.transport.value()
122
self.sock.transport.clear()
124
# pass some data through
125
self.sock.dataReceived('hello, world')
126
self.assertEqual(self.sock.driver_outgoing.transport.value(),
129
# now close it from the client side
130
self.sock.connectionLost('fake reason')
132
class Bind(unittest.TestCase):
134
self.sock = SOCKSv4Driver()
135
self.sock.transport = StringTCPTransport()
136
self.sock.connectionMade()
138
## def tearDown(self):
139
## # TODO ensure the listen port is closed
140
## listen = self.sock.driver_listen
141
## if listen is not None:
142
## self.assert_(incoming.transport.stringTCPTransport_closing,
143
## "Incoming SOCKS connections need to be closed.")
145
def test_simple(self):
146
self.sock.dataReceived(
147
struct.pack('!BBH', 4, 2, 34)
148
+ socket.inet_aton('1.2.3.4')
151
sent = self.sock.transport.value()
152
self.sock.transport.clear()
153
self.assertEqual(sent,
154
struct.pack('!BBH', 0, 90, 1234)
155
+ socket.inet_aton('6.7.8.9'))
156
self.assert_(not self.sock.transport.stringTCPTransport_closing)
157
self.assert_(self.sock.driver_listen is not None)
160
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
161
self.assertNotIdentical(incoming, None)
162
incoming.transport = StringTCPTransport()
163
incoming.connectionMade()
165
# now we should have the second reply packet
166
sent = self.sock.transport.value()
167
self.sock.transport.clear()
168
self.assertEqual(sent,
169
struct.pack('!BBH', 0, 90, 0)
170
+ socket.inet_aton('0.0.0.0'))
171
self.assert_(not self.sock.transport.stringTCPTransport_closing)
173
# pass some data through
174
self.sock.dataReceived('hello, world')
175
self.assertEqual(incoming.transport.value(),
178
# the other way around
179
incoming.dataReceived('hi there')
180
self.assertEqual(self.sock.transport.value(), 'hi there')
182
self.sock.connectionLost('fake reason')
184
def test_access_denied(self):
185
self.sock.authorize = lambda code, server, port, user: 0
186
self.sock.dataReceived(
187
struct.pack('!BBH', 4, 2, 4242)
188
+ socket.inet_aton('10.2.3.4')
191
self.assertEqual(self.sock.transport.value(),
192
struct.pack('!BBH', 0, 91, 0)
193
+ socket.inet_aton('0.0.0.0'))
194
self.assert_(self.sock.transport.stringTCPTransport_closing)
195
self.assertIdentical(self.sock.driver_listen, None)
197
def test_eof_remote(self):
198
self.sock.dataReceived(
199
struct.pack('!BBH', 4, 2, 34)
200
+ socket.inet_aton('1.2.3.4')
203
sent = self.sock.transport.value()
204
self.sock.transport.clear()
207
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
208
self.assertNotIdentical(incoming, None)
209
incoming.transport = StringTCPTransport()
210
incoming.connectionMade()
212
# now we should have the second reply packet
213
sent = self.sock.transport.value()
214
self.sock.transport.clear()
215
self.assertEqual(sent,
216
struct.pack('!BBH', 0, 90, 0)
217
+ socket.inet_aton('0.0.0.0'))
218
self.assert_(not self.sock.transport.stringTCPTransport_closing)
220
# pass some data through
221
self.sock.dataReceived('hello, world')
222
self.assertEqual(incoming.transport.value(),
225
# now close it from the server side
226
incoming.transport.loseConnection()
227
incoming.connectionLost('fake reason')
229
def test_eof_local(self):
230
self.sock.dataReceived(
231
struct.pack('!BBH', 4, 2, 34)
232
+ socket.inet_aton('1.2.3.4')
235
sent = self.sock.transport.value()
236
self.sock.transport.clear()
239
incoming = self.sock.driver_listen.buildProtocol(('1.2.3.4', 5345))
240
self.assertNotIdentical(incoming, None)
241
incoming.transport = StringTCPTransport()
242
incoming.connectionMade()
244
# now we should have the second reply packet
245
sent = self.sock.transport.value()
246
self.sock.transport.clear()
247
self.assertEqual(sent,
248
struct.pack('!BBH', 0, 90, 0)
249
+ socket.inet_aton('0.0.0.0'))
250
self.assert_(not self.sock.transport.stringTCPTransport_closing)
252
# pass some data through
253
self.sock.dataReceived('hello, world')
254
self.assertEqual(incoming.transport.value(),
257
# now close it from the client side
258
self.sock.connectionLost('fake reason')
260
def test_bad_source(self):
261
self.sock.dataReceived(
262
struct.pack('!BBH', 4, 2, 34)
263
+ socket.inet_aton('1.2.3.4')
266
sent = self.sock.transport.value()
267
self.sock.transport.clear()
269
# connect from WRONG address
270
incoming = self.sock.driver_listen.buildProtocol(('1.6.6.6', 666))
271
self.assertIdentical(incoming, None)
273
# Now we should have the second reply packet and it should
274
# be a failure. The connection should be closing.
275
sent = self.sock.transport.value()
276
self.sock.transport.clear()
277
self.assertEqual(sent,
278
struct.pack('!BBH', 0, 91, 0)
279
+ socket.inet_aton('0.0.0.0'))
280
self.assert_(self.sock.transport.stringTCPTransport_closing)