~ubuntu-branches/ubuntu/trusty/txtorcon/trusty-proposed

« back to all changes in this revision

Viewing changes to test/test_stream.py

  • Committer: Package Import Robot
  • Author(s): Jérémy Bobbio
  • Date: 2014-01-21 15:10:52 UTC
  • mfrom: (1.1.3)
  • Revision ID: package-import@ubuntu.com-20140121151052-r1dw06if9912ihbp
Tags: 0.9.1-1
* New upstream release.
* Update debian/watch and verify upstream signature
* Improve debian/README.source

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
from txtorcon.util import maybe_ip_addr
 
2
from twisted.trial import unittest
 
3
from twisted.internet import defer
 
4
from zope.interface import implements
 
5
 
 
6
from txtorcon import Stream, IStreamListener, ICircuitContainer, StreamListenerMixin
 
7
 
 
8
 
 
9
class FakeCircuit:
 
10
    def __init__(self, id=-999):
 
11
        self.streams = []
 
12
        self.id = id
 
13
 
 
14
 
 
15
class Listener(object):
 
16
    implements(IStreamListener)
 
17
 
 
18
    def __init__(self, expected):
 
19
        "expect is a list of tuples: (event, {key:value, key1:value1, ..})"
 
20
        self.expected = expected
 
21
 
 
22
    def checker(self, state, stream, *args, **kw):
 
23
        if self.expected[0][0] != state:
 
24
            raise RuntimeError('Expected event "%s" not "%s".' % (self.expected[0][0], state))
 
25
        for (k, v) in self.expected[0][1].items():
 
26
            if k == 'args':
 
27
                if v != args:
 
28
                    raise RuntimeError('Expected argument to have value "%s", not "%s"' % (v, args))
 
29
            elif k == 'kwargs':
 
30
                for (key, value) in v.items():
 
31
                    if not key in kw:
 
32
                        print key, value, k, v, kw
 
33
                        raise RuntimeError('Expected keyword argument for key "%s" but found nothing.' % key)
 
34
                    elif kw[key] != value:
 
35
                        raise RuntimeError('KW Argument expected "%s" but got "%s"' % (value, kw[key]))
 
36
            elif getattr(stream, k) != v:
 
37
                raise RuntimeError('Expected attribute "%s" to have value "%s", not "%s"' % (k, v, getattr(stream, k)))
 
38
        self.expected = self.expected[1:]
 
39
 
 
40
    def stream_new(self, stream):
 
41
        "a new stream has been created"
 
42
        self.checker('new', stream)
 
43
 
 
44
    def stream_succeeded(self, stream):
 
45
        "stream has succeeded"
 
46
        self.checker('succeeded', stream)
 
47
 
 
48
    def stream_attach(self, stream, circuit):
 
49
        "the stream has been attached to a circuit"
 
50
        self.checker('attach', stream, circuit)
 
51
 
 
52
    def stream_detach(self, stream, **kw):
 
53
        "the stream has been attached to a circuit"
 
54
        self.checker('detach', stream, **kw)
 
55
 
 
56
    def stream_closed(self, stream, **kw):
 
57
        "stream has been closed (won't be in controller's list anymore)"
 
58
        self.checker('closed', stream, **kw)
 
59
 
 
60
    def stream_failed(self, stream, **kw):
 
61
        "stream failed for some reason (won't be in controller's list anymore)"
 
62
        self.checker('failed', stream, **kw)
 
63
 
 
64
 
 
65
class StreamTests(unittest.TestCase):
 
66
 
 
67
    implements(ICircuitContainer)
 
68
 
 
69
    def find_circuit(self, id):
 
70
        return self.circuits[id]
 
71
 
 
72
    def close_circuit(self, circuit, **kw):
 
73
        raise NotImplementedError()
 
74
 
 
75
    def close_stream(self, stream, **kw):
 
76
        return defer.succeed('OK')
 
77
 
 
78
    def setUp(self):
 
79
        self.circuits = {}
 
80
 
 
81
    def test_lowercase_flags(self):
 
82
        ## testing an internal method, maybe a no-no?
 
83
        stream = Stream(self)
 
84
        kw = dict(FOO='bar', BAR='baz')
 
85
        flags = stream._create_flags(kw)
 
86
        self.assertTrue('FOO' in flags)
 
87
        self.assertTrue('foo' in flags)
 
88
        self.assertTrue(flags['foo'] is flags['FOO'])
 
89
 
 
90
        self.assertTrue('BAR' in flags)
 
91
        self.assertTrue('bar' in flags)
 
92
        self.assertTrue(flags['bar'] is flags['BAR'])
 
93
 
 
94
    def test_listener_mixin(self):
 
95
        listener = StreamListenerMixin()
 
96
        from zope.interface.verify import verifyObject
 
97
        self.assertTrue(verifyObject(IStreamListener, listener))
 
98
 
 
99
        ## call all the methods with None for each arg. This is mostly
 
100
        ## just to gratuitously increase test coverage, but also
 
101
        ## serves to ensure these methods don't just blow up
 
102
        for (methodname, desc) in IStreamListener.namesAndDescriptions():
 
103
            method = getattr(listener, methodname)
 
104
            args = [None] * len(desc.positional)
 
105
            method(*args)
 
106
 
 
107
    def test_circuit_already_valid_in_new(self):
 
108
        stream = Stream(self)
 
109
        stream.circuit = FakeCircuit(1)
 
110
        stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
 
111
        errs = self.flushLoggedErrors()
 
112
        self.assertEqual(len(errs), 1)
 
113
        self.assertTrue('Weird' in errs[0].getErrorMessage())
 
114
 
 
115
    def test_magic_circuit_detach(self):
 
116
        stream = Stream(self)
 
117
        stream.circuit = FakeCircuit(1)
 
118
        stream.circuit.streams = [stream]
 
119
        stream.update("1 SENTCONNECT 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
 
120
        self.assertTrue(stream.circuit is None)
 
121
 
 
122
    def test_args_in_ctor(self):
 
123
        stream = Stream(self)
 
124
        stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
 
125
        self.assertEqual(stream.id, 1)
 
126
        self.assertEqual(stream.state, 'NEW')
 
127
 
 
128
    def test_parse_resolve(self):
 
129
        stream = Stream(self)
 
130
        stream.update("1604 NEWRESOLVE 0 www.google.ca:0 PURPOSE=DNS_REQUEST".split())
 
131
        self.assertEqual(stream.state, 'NEWRESOLVE')
 
132
 
 
133
    def test_listener_new(self):
 
134
        listener = Listener([('new', {'target_port':9001})])
 
135
 
 
136
        stream = Stream(self)
 
137
        stream.listen(listener)
 
138
        stream.update("1 NEW 0 94.23.164.42.$43ED8310EB968746970896E8835C2F1991E50B69.exit:9001 SOURCE_ADDR=(Tor_internal):0 PURPOSE=DIR_FETCH".split())
 
139
 
 
140
    def test_listener_attach(self):
 
141
        self.circuits[186] = FakeCircuit(186)
 
142
 
 
143
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
144
                             ('attach', {'target_addr':maybe_ip_addr('1.2.3.4')})])
 
145
 
 
146
        stream = Stream(self)
 
147
        stream.listen(listener)
 
148
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
149
        stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
 
150
 
 
151
        self.assertEqual(self.circuits[186].streams[0], stream)
 
152
 
 
153
    def test_listener_attach_no_remap(self):
 
154
        "Attachment is via SENTCONNECT on .onion addresses (for example)"
 
155
        self.circuits[186] = FakeCircuit(186)
 
156
 
 
157
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
158
                             ('attach', {})])
 
159
 
 
160
        stream = Stream(self)
 
161
        stream.listen(listener)
 
162
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
163
        stream.update("316 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
 
164
 
 
165
        self.assertEqual(self.circuits[186].streams[0], stream)
 
166
 
 
167
    def test_update_wrong_stream(self):
 
168
        self.circuits[186] = FakeCircuit(186)
 
169
 
 
170
        stream = Stream(self)
 
171
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
172
        try:
 
173
            stream.update("999 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
 
174
            self.fail()
 
175
        except Exception, e:
 
176
            self.assertTrue('wrong stream' in str(e))
 
177
 
 
178
    def test_update_illegal_state(self):
 
179
        self.circuits[186] = FakeCircuit(186)
 
180
 
 
181
        stream = Stream(self)
 
182
        try:
 
183
            stream.update("316 FOO 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
184
            self.fail()
 
185
        except Exception, e:
 
186
            self.assertTrue('Unknown state' in str(e))
 
187
 
 
188
    def test_listen_unlisten(self):
 
189
        self.circuits[186] = FakeCircuit(186)
 
190
 
 
191
        listener = Listener([])
 
192
 
 
193
        stream = Stream(self)
 
194
        stream.listen(listener)
 
195
        stream.unlisten(listener)
 
196
        self.assertEqual(len(stream.listeners), 0)
 
197
 
 
198
    def test_stream_changed(self):
 
199
        "Change a stream-id mid-stream."
 
200
        self.circuits[186] = FakeCircuit(186)
 
201
 
 
202
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
203
                             ('attach', {}),
 
204
                             ('succeeded', {})])
 
205
 
 
206
        stream = Stream(self)
 
207
        stream.listen(listener)
 
208
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
209
        stream.update("316 SENTCONNECT 186 1.2.3.4:80 SOURCE=EXIT".split())
 
210
        self.assertEqual(self.circuits[186].streams[0], stream)
 
211
 
 
212
        # magically change circuit ID without a DETACHED, should fail
 
213
        stream.update("316 SUCCEEDED 999 1.2.3.4:80 SOURCE=EXIT".split())
 
214
        errs = self.flushLoggedErrors()
 
215
        self.assertEqual(len(errs), 1)
 
216
        # kind of fragile to look at strings, but...
 
217
        self.assertTrue('186 to 999' in str(errs[0]))
 
218
 
 
219
    def test_stream_changed_with_detach(self):
 
220
        "Change a stream-id mid-stream, but with a DETACHED message"
 
221
        self.circuits[123] = FakeCircuit(123)
 
222
        self.circuits[456] = FakeCircuit(456)
 
223
 
 
224
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
225
                             ('attach', {}),
 
226
                             ('detach', {'kwargs': dict(reason='END', remote_reason='MISC')}),
 
227
                             ('attach', {})])
 
228
 
 
229
        stream = Stream(self)
 
230
        stream.listen(listener)
 
231
        stream.update("999 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
232
        stream.update("999 SENTCONNECT 123 1.2.3.4:80".split())
 
233
        self.assertEqual(len(self.circuits[123].streams), 1)
 
234
        self.assertEqual(self.circuits[123].streams[0], stream)
 
235
 
 
236
        stream.update("999 DETACHED 123 1.2.3.4:80 REASON=END REMOTE_REASON=MISC".split())
 
237
        self.assertEqual(len(self.circuits[123].streams), 0)
 
238
 
 
239
        stream.update("999 SENTCONNECT 456 1.2.3.4:80 SOURCE=EXIT".split())
 
240
        self.assertEqual(len(self.circuits[456].streams), 1)
 
241
        self.assertEqual(self.circuits[456].streams[0], stream)
 
242
 
 
243
    def test_listener_close(self):
 
244
        self.circuits[186] = FakeCircuit(186)
 
245
 
 
246
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
247
                             ('attach', {'target_addr':maybe_ip_addr('1.2.3.4')}),
 
248
                             ('closed', {'kwargs': dict(REASON='END', REMOTE_REASON='DONE')})])
 
249
        stream = Stream(self)
 
250
        stream.listen(listener)
 
251
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
252
        stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
 
253
        stream.update("316 CLOSED 186 1.2.3.4:80 REASON=END REMOTE_REASON=DONE".split())
 
254
 
 
255
        self.assertEqual(len(self.circuits[186].streams), 0)
 
256
 
 
257
    def test_listener_fail(self):
 
258
        listener = Listener([('new', {'target_host':'www.yahoo.com', 'target_port':80}),
 
259
                             ('attach', {'target_addr':maybe_ip_addr('1.2.3.4')}),
 
260
                             ('failed', {'kwargs': dict(REASON='TIMEOUT', REMOTE_REASON='DESTROYED')})])
 
261
        stream = Stream(self)
 
262
        stream.listen(listener)
 
263
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
264
        self.circuits[186] = FakeCircuit(186)
 
265
        stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
 
266
        stream.update("316 FAILED 0 1.2.3.4:80 REASON=TIMEOUT REMOTE_REASON=DESTROYED".split())
 
267
 
 
268
    def test_str(self):
 
269
        stream = Stream(self)
 
270
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
271
        stream.circuit = FakeCircuit(1)
 
272
        str(stream)
 
273
 
 
274
    def test_ipv6(self):
 
275
        listener = Listener([('new', {'target_host':'::1', 'target_port':80})])
 
276
 
 
277
        stream = Stream(self)
 
278
        stream.listen(listener)
 
279
        stream.update("1234 NEW 0 ::1:80 SOURCE_ADDR=127.0.0.1:57349 PURPOSE=USER".split())
 
280
 
 
281
    def test_ipv6_remap(self):
 
282
        stream = Stream(self)
 
283
        stream.update("1234 REMAP 0 ::1:80 SOURCE_ADDR=127.0.0.1:57349 PURPOSE=USER".split())
 
284
        self.assertEqual(stream.target_addr, maybe_ip_addr('::1'))
 
285
 
 
286
    def test_ipv6_source(self):
 
287
        listener = Listener([('new', {'source_addr':maybe_ip_addr('::1'), 'source_port':12345})])
 
288
 
 
289
        stream = Stream(self)
 
290
        stream.listen(listener)
 
291
        stream.update("1234 NEW 0 127.0.0.1:80 SOURCE_ADDR=::1:12345 PURPOSE=USER".split())
 
292
 
 
293
    def test_states_and_uris(self):
 
294
        self.circuits[1] = FakeCircuit(1)
 
295
 
 
296
        stream = Stream(self)
 
297
        for address in ['1.2.3.4:80',
 
298
                        '1.2.3.4.315D5684D5343580D409F16119F78D776A58AEFB.exit:80',
 
299
                        'timaq4ygg2iegci7.onion:80']:
 
300
 
 
301
            line = "316 %s 1 %s REASON=FOO"
 
302
            for state in ['NEW', 'SUCCEEDED', 'REMAP',
 
303
                          'SENTCONNECT',
 
304
                          'DETACHED', 'NEWRESOLVE', 'SENTRESOLVE',
 
305
                          'FAILED', 'CLOSED']:
 
306
                stream.update((line % (state, address)).split(' '))
 
307
                self.assertEqual(stream.state, state)
 
308
 
 
309
    def test_close_stream(self):
 
310
        self.circuits[186] = FakeCircuit(186)
 
311
        stream = Stream(self)
 
312
        stream.update("316 NEW 0 www.yahoo.com:80 SOURCE_ADDR=127.0.0.1:55877 PURPOSE=USER".split())
 
313
        stream.update("316 REMAP 186 1.2.3.4:80 SOURCE=EXIT".split())
 
314
 
 
315
        self.assertEqual(len(self.circuits[186].streams), 1)
 
316
 
 
317
        d = stream.close()
 
318
        self.assertTrue(not d.called)
 
319
        self.assertEqual(len(self.circuits[186].streams), 1)
 
320
 
 
321
        stream.update("316 CLOSED 186 1.2.3.4:80 REASON=END REMOTE_REASON=DONE".split())
 
322
        self.assertTrue(d.called)
 
323
        self.assertEqual(len(self.circuits[186].streams), 0)