~certify-web-dev/twisted/certify-trunk

« back to all changes in this revision

Viewing changes to twisted/pair/test/test_ip.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2007-01-17 14:52:35 UTC
  • mfrom: (1.1.5 upstream) (2.1.2 etch)
  • Revision ID: james.westby@ubuntu.com-20070117145235-btmig6qfmqfen0om
Tags: 2.5.0-0ubuntu1
New upstream version, compatible with python2.5.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
 
2
# See LICENSE for details.
 
3
 
 
4
#
 
5
from twisted.trial import unittest
 
6
 
 
7
from twisted.internet import protocol, reactor, error
 
8
from twisted.python import failure, components
 
9
from twisted.pair import ip, raw
 
10
from zope import interface
 
11
 
 
12
class MyProtocol:
 
13
    interface.implements(raw.IRawDatagramProtocol)
 
14
    
 
15
    def __init__(self, expecting):
 
16
        self.expecting = list(expecting)
 
17
 
 
18
    def datagramReceived(self, data, **kw):
 
19
        assert self.expecting, 'Got a packet when not expecting anymore.'
 
20
        expectData, expectKw = self.expecting.pop(0)
 
21
 
 
22
        expectKwKeys = expectKw.keys(); expectKwKeys.sort()
 
23
        kwKeys = kw.keys(); kwKeys.sort()
 
24
        assert expectKwKeys == kwKeys, "Expected %r, got %r" % (expectKwKeys, kwKeys)
 
25
 
 
26
        for k in expectKwKeys:
 
27
            assert expectKw[k] == kw[k], "Expected %s=%r, got %r" % (k, expectKw[k], kw[k])
 
28
        assert expectKw == kw, "Expected %r, got %r" % (expectKw, kw)
 
29
        assert expectData == data, "Expected %r, got %r" % (expectData, data)
 
30
 
 
31
class IPTestCase(unittest.TestCase):
 
32
    def testPacketParsing(self):
 
33
        proto = ip.IPProtocol()
 
34
        p1 = MyProtocol([
 
35
 
 
36
            ('foobar', {
 
37
            'partial': 0,
 
38
            'dest': '1.2.3.4',
 
39
            'source': '5.6.7.8',
 
40
            'protocol': 0x0F,
 
41
            'version': 4,
 
42
            'ihl': 20,
 
43
            'tos': 7,
 
44
            'tot_len': 20+6,
 
45
            'fragment_id': 0xDEAD,
 
46
            'fragment_offset': 0x1EEF,
 
47
            'dont_fragment': 0,
 
48
            'more_fragments': 1,
 
49
            'ttl': 0xC0,
 
50
            }),
 
51
 
 
52
            ])
 
53
        proto.addProto(0x0F, p1)
 
54
 
 
55
        proto.datagramReceived("\x54" #ihl version
 
56
                               + "\x07" #tos
 
57
                               + "\x00\x1a" #tot_len
 
58
                               + "\xDE\xAD" #id
 
59
                               + "\xBE\xEF" #frag_off
 
60
                               + "\xC0" #ttl
 
61
                               + "\x0F" #protocol
 
62
                               + "FE" #checksum
 
63
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
64
                               partial=0,
 
65
                               dest='dummy',
 
66
                               source='dummy',
 
67
                               protocol='dummy',
 
68
                               )
 
69
 
 
70
        assert not p1.expecting, \
 
71
               'Should not expect any more packets, but still want %r' % p1.expecting
 
72
 
 
73
    def testMultiplePackets(self):
 
74
        proto = ip.IPProtocol()
 
75
        p1 = MyProtocol([
 
76
 
 
77
            ('foobar', {
 
78
            'partial': 0,
 
79
            'dest': '1.2.3.4',
 
80
            'source': '5.6.7.8',
 
81
            'protocol': 0x0F,
 
82
            'version': 4,
 
83
            'ihl': 20,
 
84
            'tos': 7,
 
85
            'tot_len': 20+6,
 
86
            'fragment_id': 0xDEAD,
 
87
            'fragment_offset': 0x1EEF,
 
88
            'dont_fragment': 0,
 
89
            'more_fragments': 1,
 
90
            'ttl': 0xC0,
 
91
            }),
 
92
 
 
93
            ('quux', {
 
94
            'partial': 1,
 
95
            'dest': '5.4.3.2',
 
96
            'source': '6.7.8.9',
 
97
            'protocol': 0x0F,
 
98
            'version': 4,
 
99
            'ihl': 20,
 
100
            'tos': 7,
 
101
            'tot_len': 20+6,
 
102
            'fragment_id': 0xDEAD,
 
103
            'fragment_offset': 0x1EEF,
 
104
            'dont_fragment': 0,
 
105
            'more_fragments': 1,
 
106
            'ttl': 0xC0,
 
107
            }),
 
108
 
 
109
            ])
 
110
        proto.addProto(0x0F, p1)
 
111
        proto.datagramReceived("\x54" #ihl version
 
112
                               + "\x07" #tos
 
113
                               + "\x00\x1a" #tot_len
 
114
                               + "\xDE\xAD" #id
 
115
                               + "\xBE\xEF" #frag_off
 
116
                               + "\xC0" #ttl
 
117
                               + "\x0F" #protocol
 
118
                               + "FE" #checksum
 
119
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
120
                               partial=0,
 
121
                               dest='dummy',
 
122
                               source='dummy',
 
123
                               protocol='dummy',
 
124
                               )
 
125
        proto.datagramReceived("\x54" #ihl version
 
126
                               + "\x07" #tos
 
127
                               + "\x00\x1a" #tot_len
 
128
                               + "\xDE\xAD" #id
 
129
                               + "\xBE\xEF" #frag_off
 
130
                               + "\xC0" #ttl
 
131
                               + "\x0F" #protocol
 
132
                               + "FE" #checksum
 
133
                               + "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
 
134
                               partial=1,
 
135
                               dest='dummy',
 
136
                               source='dummy',
 
137
                               protocol='dummy',
 
138
                               )
 
139
 
 
140
        assert not p1.expecting, \
 
141
               'Should not expect any more packets, but still want %r' % p1.expecting
 
142
 
 
143
 
 
144
    def testMultipleSameProtos(self):
 
145
        proto = ip.IPProtocol()
 
146
        p1 = MyProtocol([
 
147
 
 
148
            ('foobar', {
 
149
            'partial': 0,
 
150
            'dest': '1.2.3.4',
 
151
            'source': '5.6.7.8',
 
152
            'protocol': 0x0F,
 
153
            'version': 4,
 
154
            'ihl': 20,
 
155
            'tos': 7,
 
156
            'tot_len': 20+6,
 
157
            'fragment_id': 0xDEAD,
 
158
            'fragment_offset': 0x1EEF,
 
159
            'dont_fragment': 0,
 
160
            'more_fragments': 1,
 
161
            'ttl': 0xC0,
 
162
            }),
 
163
 
 
164
            ])
 
165
 
 
166
        p2 = MyProtocol([
 
167
 
 
168
            ('foobar', {
 
169
            'partial': 0,
 
170
            'dest': '1.2.3.4',
 
171
            'source': '5.6.7.8',
 
172
            'protocol': 0x0F,
 
173
            'version': 4,
 
174
            'ihl': 20,
 
175
            'tos': 7,
 
176
            'tot_len': 20+6,
 
177
            'fragment_id': 0xDEAD,
 
178
            'fragment_offset': 0x1EEF,
 
179
            'dont_fragment': 0,
 
180
            'more_fragments': 1,
 
181
            'ttl': 0xC0,
 
182
            }),
 
183
 
 
184
            ])
 
185
 
 
186
        proto.addProto(0x0F, p1)
 
187
        proto.addProto(0x0F, p2)
 
188
 
 
189
        proto.datagramReceived("\x54" #ihl version
 
190
                               + "\x07" #tos
 
191
                               + "\x00\x1a" #tot_len
 
192
                               + "\xDE\xAD" #id
 
193
                               + "\xBE\xEF" #frag_off
 
194
                               + "\xC0" #ttl
 
195
                               + "\x0F" #protocol
 
196
                               + "FE" #checksum
 
197
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
198
                               partial=0,
 
199
                               dest='dummy',
 
200
                               source='dummy',
 
201
                               protocol='dummy',
 
202
                               )
 
203
 
 
204
        assert not p1.expecting, \
 
205
               'Should not expect any more packets, but still want %r' % p1.expecting
 
206
        assert not p2.expecting, \
 
207
               'Should not expect any more packets, but still want %r' % p2.expecting
 
208
 
 
209
    def testWrongProtoNotSeen(self):
 
210
        proto = ip.IPProtocol()
 
211
        p1 = MyProtocol([])
 
212
        proto.addProto(1, p1)
 
213
 
 
214
        proto.datagramReceived("\x54" #ihl version
 
215
                               + "\x07" #tos
 
216
                               + "\x00\x1a" #tot_len
 
217
                               + "\xDE\xAD" #id
 
218
                               + "\xBE\xEF" #frag_off
 
219
                               + "\xC0" #ttl
 
220
                               + "\x0F" #protocol
 
221
                               + "FE" #checksum
 
222
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
223
                               partial=0,
 
224
                               dest='dummy',
 
225
                               source='dummy',
 
226
                               protocol='dummy',
 
227
                               )
 
228
 
 
229
    def testDemuxing(self):
 
230
        proto = ip.IPProtocol()
 
231
        p1 = MyProtocol([
 
232
 
 
233
            ('foobar', {
 
234
            'partial': 0,
 
235
            'dest': '1.2.3.4',
 
236
            'source': '5.6.7.8',
 
237
            'protocol': 0x0F,
 
238
            'version': 4,
 
239
            'ihl': 20,
 
240
            'tos': 7,
 
241
            'tot_len': 20+6,
 
242
            'fragment_id': 0xDEAD,
 
243
            'fragment_offset': 0x1EEF,
 
244
            'dont_fragment': 0,
 
245
            'more_fragments': 1,
 
246
            'ttl': 0xC0,
 
247
            }),
 
248
 
 
249
            ('quux', {
 
250
            'partial': 1,
 
251
            'dest': '5.4.3.2',
 
252
            'source': '6.7.8.9',
 
253
            'protocol': 0x0F,
 
254
            'version': 4,
 
255
            'ihl': 20,
 
256
            'tos': 7,
 
257
            'tot_len': 20+6,
 
258
            'fragment_id': 0xDEAD,
 
259
            'fragment_offset': 0x1EEF,
 
260
            'dont_fragment': 0,
 
261
            'more_fragments': 1,
 
262
            'ttl': 0xC0,
 
263
            }),
 
264
 
 
265
            ])
 
266
        proto.addProto(0x0F, p1)
 
267
 
 
268
        p2 = MyProtocol([
 
269
 
 
270
            ('quux', {
 
271
            'partial': 1,
 
272
            'dest': '5.4.3.2',
 
273
            'source': '6.7.8.9',
 
274
            'protocol': 0x0A,
 
275
            'version': 4,
 
276
            'ihl': 20,
 
277
            'tos': 7,
 
278
            'tot_len': 20+6,
 
279
            'fragment_id': 0xDEAD,
 
280
            'fragment_offset': 0x1EEF,
 
281
            'dont_fragment': 0,
 
282
            'more_fragments': 1,
 
283
            'ttl': 0xC0,
 
284
            }),
 
285
 
 
286
            ('foobar', {
 
287
            'partial': 0,
 
288
            'dest': '1.2.3.4',
 
289
            'source': '5.6.7.8',
 
290
            'protocol': 0x0A,
 
291
            'version': 4,
 
292
            'ihl': 20,
 
293
            'tos': 7,
 
294
            'tot_len': 20+6,
 
295
            'fragment_id': 0xDEAD,
 
296
            'fragment_offset': 0x1EEF,
 
297
            'dont_fragment': 0,
 
298
            'more_fragments': 1,
 
299
            'ttl': 0xC0,
 
300
            }),
 
301
 
 
302
 
 
303
            ])
 
304
        proto.addProto(0x0A, p2)
 
305
 
 
306
        proto.datagramReceived("\x54" #ihl version
 
307
                               + "\x07" #tos
 
308
                               + "\x00\x1a" #tot_len
 
309
                               + "\xDE\xAD" #id
 
310
                               + "\xBE\xEF" #frag_off
 
311
                               + "\xC0" #ttl
 
312
                               + "\x0A" #protocol
 
313
                               + "FE" #checksum
 
314
                               + "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
 
315
                               partial=1,
 
316
                               dest='dummy',
 
317
                               source='dummy',
 
318
                               protocol='dummy',
 
319
                               )
 
320
        proto.datagramReceived("\x54" #ihl version
 
321
                               + "\x07" #tos
 
322
                               + "\x00\x1a" #tot_len
 
323
                               + "\xDE\xAD" #id
 
324
                               + "\xBE\xEF" #frag_off
 
325
                               + "\xC0" #ttl
 
326
                               + "\x0F" #protocol
 
327
                               + "FE" #checksum
 
328
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
329
                               partial=0,
 
330
                               dest='dummy',
 
331
                               source='dummy',
 
332
                               protocol='dummy',
 
333
                               )
 
334
        proto.datagramReceived("\x54" #ihl version
 
335
                               + "\x07" #tos
 
336
                               + "\x00\x1a" #tot_len
 
337
                               + "\xDE\xAD" #id
 
338
                               + "\xBE\xEF" #frag_off
 
339
                               + "\xC0" #ttl
 
340
                               + "\x0F" #protocol
 
341
                               + "FE" #checksum
 
342
                               + "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
 
343
                               partial=1,
 
344
                               dest='dummy',
 
345
                               source='dummy',
 
346
                               protocol='dummy',
 
347
                               )
 
348
        proto.datagramReceived("\x54" #ihl version
 
349
                               + "\x07" #tos
 
350
                               + "\x00\x1a" #tot_len
 
351
                               + "\xDE\xAD" #id
 
352
                               + "\xBE\xEF" #frag_off
 
353
                               + "\xC0" #ttl
 
354
                               + "\x0A" #protocol
 
355
                               + "FE" #checksum
 
356
                               + "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
 
357
                               partial=0,
 
358
                               dest='dummy',
 
359
                               source='dummy',
 
360
                               protocol='dummy',
 
361
                               )
 
362
 
 
363
        assert not p1.expecting, \
 
364
               'Should not expect any more packets, but still want %r' % p1.expecting
 
365
        assert not p2.expecting, \
 
366
               'Should not expect any more packets, but still want %r' % p2.expecting
 
367
 
 
368
    def testAddingBadProtos_WrongLevel(self):
 
369
        """Adding a wrong level protocol raises an exception."""
 
370
        e = ip.IPProtocol()
 
371
        try:
 
372
            e.addProto(42, "silliness")
 
373
        except components.CannotAdapt:
 
374
            pass
 
375
        else:
 
376
            raise AssertionError, 'addProto must raise an exception for bad protocols'
 
377
 
 
378
 
 
379
    def testAddingBadProtos_TooSmall(self):
 
380
        """Adding a protocol with a negative number raises an exception."""
 
381
        e = ip.IPProtocol()
 
382
        try:
 
383
            e.addProto(-1, MyProtocol([]))
 
384
        except TypeError, e:
 
385
            if e.args == ('Added protocol must be positive or zero',):
 
386
                pass
 
387
            else:
 
388
                raise
 
389
        else:
 
390
            raise AssertionError, 'addProto must raise an exception for bad protocols'
 
391
 
 
392
 
 
393
    def testAddingBadProtos_TooBig(self):
 
394
        """Adding a protocol with a number >=2**32 raises an exception."""
 
395
        e = ip.IPProtocol()
 
396
        try:
 
397
            e.addProto(2L**32, MyProtocol([]))
 
398
        except TypeError, e:
 
399
            if e.args == ('Added protocol must fit in 32 bits',):
 
400
                pass
 
401
            else:
 
402
                raise
 
403
        else:
 
404
            raise AssertionError, 'addProto must raise an exception for bad protocols'
 
405
 
 
406
    def testAddingBadProtos_TooBig2(self):
 
407
        """Adding a protocol with a number >=2**32 raises an exception."""
 
408
        e = ip.IPProtocol()
 
409
        try:
 
410
            e.addProto(2L**32+1, MyProtocol([]))
 
411
        except TypeError, e:
 
412
            if e.args == ('Added protocol must fit in 32 bits',):
 
413
                pass
 
414
            else:
 
415
                raise
 
416
        else:
 
417
            raise AssertionError, 'addProto must raise an exception for bad protocols'