1
from txtorcon.util import maybe_ip_addr
2
from twisted.trial import unittest
3
from twisted.internet import defer
4
from zope.interface import implements
6
from txtorcon import Stream, IStreamListener, ICircuitContainer, StreamListenerMixin
10
# NOTE(review): the enclosing `class` line and the body of this constructor
# are absent from this extract (the embedded line numbers jump from 6 to 10
# and from 10 to 15). Presumably this is FakeCircuit.__init__, since the
# tests below call FakeCircuit(<int>) -- confirm against the original file.
def __init__(self, id=-999):
15
class Listener(object):
16
implements(IStreamListener)
18
def __init__(self, expected):
    """Remember the events this listener should be told about.

    :param expected: a list of tuples of the form
        ``(event, {key: value, key1: value1, ...})``. The list is stored
        by reference on ``self.expected``.
    """
    self.expected = expected
22
# Compares one incoming listener callback against the head entry of
# self.expected and, when nothing mismatches, drops that head entry.
#   state        -- event name supplied by the calling stream_* method
#   stream       -- object whose attributes are compared via getattr()
#   *args / **kw -- extra values forwarded by the calling stream_* method
# Every mismatch visible below is reported by raising RuntimeError.
# NOTE(review): embedded lines 26, 27, 29 and 31 are absent from this
# extract, so the conditions guarding the first two `raise` statements and
# the inner `for` loop are not visible. Presumably they test `k` against
# 'args' / 'kwargs' (the tests in this file pass a 'kwargs' key) and test
# whether `key` is present in `kw` -- confirm against the original file.
# NOTE(review): `print key, value, ...` below is Python 2 statement syntax.
def checker(self, state, stream, *args, **kw):
23
if self.expected[0][0] != state:
24
raise RuntimeError('Expected event "%s" not "%s".' % (self.expected[0][0], state))
25
for (k, v) in self.expected[0][1].items():
28
raise RuntimeError('Expected argument to have value "%s", not "%s"' % (v, args))
30
for (key, value) in v.items():
32
print key, value, k, v, kw
33
raise RuntimeError('Expected keyword argument for key "%s" but found nothing.' % key)
34
elif kw[key] != value:
35
raise RuntimeError('KW Argument expected "%s" but got "%s"' % (value, kw[key]))
36
elif getattr(stream, k) != v:
37
raise RuntimeError('Expected attribute "%s" to have value "%s", not "%s"' % (k, v, getattr(stream, k)))
38
# Consume the entry that was just verified.
self.expected = self.expected[1:]
40
def stream_new(self, stream):
    """A new stream has been created; verify it as a 'new' event.

    :param stream: the stream handed on to ``self.checker``.
    """
    self.checker('new', stream)
44
def stream_succeeded(self, stream):
    """The stream has succeeded; verify it as a 'succeeded' event.

    :param stream: the stream handed on to ``self.checker``.
    """
    self.checker('succeeded', stream)
48
def stream_attach(self, stream, circuit):
    """The stream has been attached to a circuit; verify as 'attach'.

    :param stream: the stream handed on to ``self.checker``.
    :param circuit: forwarded to ``self.checker`` as an extra positional
        argument.
    """
    self.checker('attach', stream, circuit)
52
def stream_detach(self, stream, **kw):
    """The stream has been detached from its circuit; verify as 'detach'.

    :param stream: the stream handed on to ``self.checker``.
    :param kw: keyword arguments forwarded unchanged to ``self.checker``.
    """
    self.checker('detach', stream, **kw)
56
def stream_closed(self, stream, **kw):
    """The stream has been closed; verify it as a 'closed' event.

    Per the original note, a closed stream won't be in the controller's
    list anymore.

    :param stream: the stream handed on to ``self.checker``.
    :param kw: keyword arguments forwarded unchanged to ``self.checker``.
    """
    self.checker('closed', stream, **kw)
60
def stream_failed(self, stream, **kw):
    """The stream failed for some reason; verify it as a 'failed' event.

    Per the original note, a failed stream won't be in the controller's
    list anymore.

    :param stream: the stream handed on to ``self.checker``.
    :param kw: keyword arguments forwarded unchanged to ``self.checker``.
    """
    self.checker('failed', stream, **kw)
65
class StreamTests(unittest.TestCase):
67
implements(ICircuitContainer)
69
def find_circuit(self, id):
    """Return the circuit stored under ``id`` in ``self.circuits``.

    :param id: key to look up (the name is kept as-is so existing callers
        keep working, even though it shadows the builtin).
    :raises KeyError: if ``self.circuits`` has no entry for ``id``.
    """
    return self.circuits[id]
72
def close_circuit(self, circuit, **kw):
    """Closing circuits is not supported by this test double.

    :raises NotImplementedError: always.
    """
    raise NotImplementedError()
75
def close_stream(self, stream, **kw):
    """Pretend to close ``stream``.

    The arguments are ignored.

    :returns: the result of ``defer.succeed('OK')``.
    """
    return defer.succeed('OK')
81
# Asserts that the mapping returned by stream._create_flags() contains both
# the upper-case keys it was given (FOO, BAR) and their lower-case forms, and
# that each pair of keys refers to the very same value object.
# NOTE(review): embedded line 83 is absent from this extract and no visible
# line binds the name `stream`; presumably the missing line imports the
# txtorcon stream module -- confirm against the original file.
def test_lowercase_flags(self):
82
## testing an internal method, maybe a no-no?
84
kw = dict(FOO='bar', BAR='baz')
85
flags = stream._create_flags(kw)
86
self.assertTrue('FOO' in flags)
87
self.assertTrue('foo' in flags)
88
self.assertTrue(flags['foo'] is flags['FOO'])
90
self.assertTrue('BAR' in flags)
91
self.assertTrue('bar' in flags)
92
self.assertTrue(flags['bar'] is flags['BAR'])
94
# Verifies that a StreamListenerMixin instance satisfies IStreamListener
# (via zope's verifyObject), then walks every name declared on the interface
# and builds a list of None placeholders, one per positional argument.
# NOTE(review): embedded line 105 is absent from this extract; `method` and
# `args` are never used in the visible lines, so presumably the missing line
# invokes method(*args) as the comment below describes -- confirm.
def test_listener_mixin(self):
95
listener = StreamListenerMixin()
96
from zope.interface.verify import verifyObject
97
self.assertTrue(verifyObject(IStreamListener, listener))
99
## call all the methods with None for each arg. This is mostly
100
## just to gratuitously increase test coverage, but also
101
## serves to ensure these methods don't just blow up
102
for (methodname, desc) in IStreamListener.namesAndDescriptions():
103
method = getattr(listener, methodname)
104
args = [None] * len(desc.positional)
107
def test_circuit_already_valid_in_new(self):
    """A NEW update on a stream that already has a circuit logs an error.

    Exactly one logged error is expected, and its message must contain
    the word 'Weird'.
    """
    stream = Stream(self)
    stream.circuit = FakeCircuit(1)
    stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
    errs = self.flushLoggedErrors()
    self.assertEqual(len(errs), 1)
    # assertIn reports both operands on failure, unlike assertTrue(x in y).
    self.assertIn('Weird', errs[0].getErrorMessage())
115
def test_magic_circuit_detach(self):
    """A SENTCONNECT update with circuit field 0 must clear the circuit.

    The stream starts out attached to a fake circuit that lists it; after
    the update ``stream.circuit`` is expected to be None.
    """
    stream = Stream(self)
    stream.circuit = FakeCircuit(1)
    stream.circuit.streams = [stream]
    stream.update("1 SENTCONNECT 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
    self.assertTrue(stream.circuit is None)
122
def test_args_in_ctor(self):
    """A NEW update must set the stream's id to 1 and its state to 'NEW'."""
    stream = Stream(self)
    stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
    self.assertEqual(stream.id, 1)
    self.assertEqual(stream.state, 'NEW')
128
def test_parse_resolve(self):
    """A NEWRESOLVE update must leave the stream in state 'NEWRESOLVE'."""
    stream = Stream(self)
    stream.update("1604 NEWRESOLVE 0 www.google.ca:0 PURPOSE=DNS_REQUEST".split())
    self.assertEqual(stream.state, 'NEWRESOLVE')
133
def test_listener_new(self):
    """A NEW update is expected to reach the listener as a 'new' event.

    The listener additionally requires the stream's ``target_port`` to be
    9001 when the event arrives.
    """
    listener = Listener([('new', {'target_port': 9001})])

    stream = Stream(self)
    stream.listen(listener)
    stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
140
def test_listener_attach(self):
    """NEW followed by REMAP is expected to yield 'new' then 'attach'.

    Afterwards the stream must be the first entry in circuit 186's
    ``streams`` list.
    """
    self.circuits[186] = FakeCircuit(186)

    listener = Listener([
        ('new', {'target_host': 'www.yahoo.com', 'target_port': 80}),
        ('attach', {'target_addr': maybe_ip_addr('1.2.3.4')}),
    ])

    stream = Stream(self)
    stream.listen(listener)
    stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
    stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())

    self.assertEqual(self.circuits[186].streams[0], stream)
153
# Feeds a NEW and then a SENTCONNECT update (naming circuit 186) to a stream
# with a Listener attached, and asserts the stream ends up as the first
# entry in circuit 186's `streams` list.
# NOTE(review): embedded lines 158-159 are absent from this extract, so the
# `Listener([` literal opened below is never closed in the visible text.
# Presumably the missing text holds the remaining expected event(s) and the
# closing brackets -- confirm against the original file.
def test_listener_attach_no_remap(self):
154
"Attachment is via SENTCONNECT on .onion addresses (for example)"
155
self.circuits[186] = FakeCircuit(186)
157
listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
160
stream = Stream(self)
161
stream.listen(listener)
162
stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
163
stream.update("316 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
165
self.assertEqual(self.circuits[186].streams[0], stream)
167
# Sends a NEW update for stream id 316 and then an update carrying id 999 to
# the same Stream object; the final assertion looks for the text
# 'wrong stream' in str(e).
# NOTE(review): embedded lines 172, 174 and 175 are absent from this extract
# and nothing visible binds `e`. Presumably the second update is wrapped in a
# try/except that captures the exception as `e` -- confirm against the
# original file.
def test_update_wrong_stream(self):
168
self.circuits[186] = FakeCircuit(186)
170
stream = Stream(self)
171
stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
173
stream.update("999 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
176
self.assertTrue('wrong stream' in str(e))
178
# Sends an update whose state token is 'FOO' to a fresh Stream; the final
# assertion looks for the text 'Unknown state' in str(e).
# NOTE(review): embedded lines 182, 184 and 185 are absent from this extract
# and nothing visible binds `e`. Presumably the update is wrapped in a
# try/except that captures the exception as `e` -- confirm against the
# original file.
def test_update_illegal_state(self):
179
self.circuits[186] = FakeCircuit(186)
181
stream = Stream(self)
183
stream.update("316 FOO 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
186
self.assertTrue('Unknown state' in str(e))
188
def test_listen_unlisten(self):
    """listen() followed by unlisten() must leave no listeners behind."""
    self.circuits[186] = FakeCircuit(186)

    listener = Listener([])

    stream = Stream(self)
    stream.listen(listener)
    stream.unlisten(listener)
    self.assertEqual(len(stream.listeners), 0)
198
# Attaches a stream to circuit 186 through NEW + SENTCONNECT updates, then
# sends a SUCCEEDED update naming circuit 999. Exactly one logged error is
# expected, and its string form must contain '186 to 999'.
# NOTE(review): embedded lines 203-205 are absent from this extract, so the
# `Listener([` literal opened below is never closed in the visible text.
# Presumably the missing lines hold the remaining expected events and the
# closing brackets -- confirm against the original file.
def test_stream_changed(self):
199
"Change a stream-id mid-stream."
200
self.circuits[186] = FakeCircuit(186)
202
listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
206
stream = Stream(self)
207
stream.listen(listener)
208
stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
209
stream.update("316 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
210
self.assertEqual(self.circuits[186].streams[0], stream)
212
# magically change circuit ID without a DETACHED, should fail
213
stream.update("316 SUCCEEDED 999 1.2.3.4:80 SOURCE=EXIT".split())
214
errs = self.flushLoggedErrors()
215
self.assertEqual(len(errs), 1)
216
# kind of fragile to look at strings, but...
217
self.assertTrue('186 to 999' in str(errs[0]))
219
# Moves a stream from circuit 123 to circuit 456 with a DETACHED update in
# between, checking after each step which circuit's `streams` list holds it:
# one entry on 123 after the first SENTCONNECT, none after DETACHED, and one
# entry on 456 after the second SENTCONNECT.
# NOTE(review): embedded lines 225, 227 and 228 are absent from this
# extract, so only the 'new' and 'detach' entries of the expected-event list
# are visible and the `Listener([` literal is never closed. Presumably the
# missing lines hold 'attach' entries and the closing brackets -- confirm
# against the original file.
def test_stream_changed_with_detach(self):
220
"Change a stream-id mid-stream, but with a DETACHED message"
221
self.circuits[123] = FakeCircuit(123)
222
self.circuits[456] = FakeCircuit(456)
224
listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
226
('detach', {'kwargs': dict(reason='END', remote_reason='MISC')}),
229
stream = Stream(self)
230
stream.listen(listener)
231
stream.update("999 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
232
stream.update("999 SENTCONNECT 123 1.2.3.4:80".split())
233
self.assertEqual(len(self.circuits[123].streams), 1)
234
self.assertEqual(self.circuits[123].streams[0], stream)
236
stream.update("999 DETACHED 123 1.2.3.4:80 REASON=END REMOTE_REASON=MISC".split())
237
self.assertEqual(len(self.circuits[123].streams), 0)
239
stream.update("999 SENTCONNECT 456 1.2.3.4:80 SOURCE=EXIT".split())
240
self.assertEqual(len(self.circuits[456].streams), 1)
241
self.assertEqual(self.circuits[456].streams[0], stream)
243
def test_listener_close(self):
    """NEW, REMAP, CLOSED is expected to yield 'new', 'attach', 'closed'.

    The 'closed' event must carry the REASON and REMOTE_REASON keyword
    arguments, and circuit 186 must hold no streams afterwards.
    """
    self.circuits[186] = FakeCircuit(186)

    listener = Listener([
        ('new', {'target_host': 'www.yahoo.com', 'target_port': 80}),
        ('attach', {'target_addr': maybe_ip_addr('1.2.3.4')}),
        ('closed', {'kwargs': dict(REASON='END', REMOTE_REASON='DONE')}),
    ])
    stream = Stream(self)
    stream.listen(listener)
    stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
    stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
    stream.update("316 CLOSED 186 1.2.3.4:80 REASON=END REMOTE_REASON=DONE".split())

    self.assertEqual(len(self.circuits[186].streams), 0)
257
def test_listener_fail(self):
    """NEW, REMAP, FAILED is expected to yield 'new', 'attach', 'failed'.

    The 'failed' event must carry the REASON and REMOTE_REASON keyword
    arguments.
    """
    listener = Listener([
        ('new', {'target_host': 'www.yahoo.com', 'target_port': 80}),
        ('attach', {'target_addr': maybe_ip_addr('1.2.3.4')}),
        ('failed', {'kwargs': dict(REASON='TIMEOUT', REMOTE_REASON='DESTROYED')}),
    ])
    stream = Stream(self)
    stream.listen(listener)
    stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
    # As in the original, the circuit is registered only after the NEW update.
    self.circuits[186] = FakeCircuit(186)
    stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
    stream.update("316 FAILED 0 1.2.3.4:80 REASON=TIMEOUT REMOTE_REASON=DESTROYED".split())
269
# NOTE(review): the embedded numbering jumps from 266 to 269 before these
# statements and from 271 to 275 after them, so the `def` line that owns
# them (and at least one trailing line) is absent from this extract. They do
# not obviously belong to the preceding test -- recover the missing header
# from the original file before relying on this code.
stream = Stream(self)
270
stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
271
stream.circuit = FakeCircuit(1)
275
# NOTE(review): embedded line 274 is absent from this extract, so the `def`
# line that owns these statements is not visible. The visible lines expect a
# 'new' event with target_host '::1' and target_port 80 from an update whose
# target is "::1:80"; presumably this is an IPv6-target test -- confirm the
# missing header against the original file.
listener = Listener([('new', {'target_host':'::1', 'target_port':80})])
277
stream = Stream(self)
278
stream.listen(listener)
279
stream.update("1234 NEW 0 ::1:80 SOURCE_ADDR=127.0.0.1:57349 PURPOSE=USER".split())
281
def test_ipv6_remap(self):
    """A REMAP update targeting ``::1:80`` must set an IPv6 target_addr.

    ``stream.target_addr`` is compared against ``maybe_ip_addr('::1')``.
    """
    stream = Stream(self)
    stream.update("1234 REMAP 0 ::1:80 SOURCE_ADDR=127.0.0.1:57349 PURPOSE=USER".split())
    self.assertEqual(stream.target_addr, maybe_ip_addr('::1'))
286
def test_ipv6_source(self):
    """A NEW update with ``SOURCE_ADDR=::1:12345`` must parse as IPv6.

    The listener requires ``source_addr`` to equal
    ``maybe_ip_addr('::1')`` and ``source_port`` to equal 12345 when the
    'new' event arrives.
    """
    listener = Listener([
        ('new', {'source_addr': maybe_ip_addr('::1'), 'source_port': 12345}),
    ])

    stream = Stream(self)
    stream.listen(listener)
    stream.update("1234 NEW 0 127.0.0.1:80 SOURCE_ADDR=::1:12345 PURPOSE=USER".split())
293
# For each of three target address forms (plain IP, `.exit` and `.onion`)
# and each listed state, sends one update line to the same Stream object and
# asserts that stream.state equals the state just sent.
# NOTE(review): embedded lines 303 and 305 are absent from this extract, so
# the list of states opened below is truncated and never closed in the
# visible text. Presumably the missing lines name further states and end the
# `for` header -- confirm against the original file.
def test_states_and_uris(self):
294
self.circuits[1] = FakeCircuit(1)
296
stream = Stream(self)
297
for address in ['1.2.3.4:80',
298
'1.2.3.4.315D5684D5343580D409F16119F78D776A58AEFB.exit:80',
299
'timaq4ygg2iegci7.onion:80']:
301
line = "316 %s 1 %s REASON=FOO"
302
for state in ['NEW', 'SUCCEEDED', 'REMAP',
304
'DETACHED', 'NEWRESOLVE', 'SENTRESOLVE',
306
stream.update((line % (state, address)).split(' '))
307
self.assertEqual(stream.state, state)
309
# Attaches a stream to circuit 186 through NEW + REMAP updates, then checks
# that `d` has not been called and the stream is still listed on the
# circuit, and that after a CLOSED update `d` has been called and the
# circuit's `streams` list is empty.
# NOTE(review): embedded lines 316-317 are absent from this extract and
# nothing visible binds `d`. Presumably `d` is a Deferred obtained by asking
# the stream to close -- confirm against the original file.
def test_close_stream(self):
310
self.circuits[186] = FakeCircuit(186)
311
stream = Stream(self)
312
stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
313
stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
315
self.assertEqual(len(self.circuits[186].streams), 1)
318
self.assertTrue(not d.called)
319
self.assertEqual(len(self.circuits[186].streams), 1)
321
stream.update("316 CLOSED 186 1.2.3.4:80 REASON=END REMOTE_REASON=DONE".split())
322
self.assertTrue(d.called)
323
self.assertEqual(len(self.circuits[186].streams), 0)