1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
from twisted.trial import unittest
7
from twisted.internet import protocol, reactor, error
8
from twisted.python import failure, components
9
from twisted.pair import ip, raw
10
from zope import interface
13
# zope.interface class-advice call: declares that the enclosing class
# implements raw.IRawDatagramProtocol.
# NOTE(review): the enclosing `class` header is not visible in this view.
interface.implements(raw.IRawDatagramProtocol)
15
def __init__(self, expecting):
    """Remember the datagrams this fake protocol should receive.

    @param expecting: iterable of C{(data, kwargs)} pairs, in the order
        the datagrams are expected to arrive.
    """
    # Copy into a fresh list: datagramReceived() pops entries off it, and
    # popping must not touch the sequence the caller passed in.
    self.expecting = list(expecting)
18
def datagramReceived(self, data, **kw):
    """Check one incoming datagram against the next queued expectation.

    Pops the first C{(data, kwargs)} pair off C{self.expecting} and asserts
    that the keyword names, each keyword value, and the payload all match
    what was received.

    @param data: payload delivered with the datagram.
    @param kw: per-packet metadata, delivered as keyword arguments.
    @raise AssertionError: if no further datagram is expected, or if the
        received keywords or payload differ from the expectation.
    """
    assert self.expecting, 'Got a packet when not expecting anymore.'
    expectData, expectKw = self.expecting.pop(0)

    # sorted() rather than keys() + .sort(): it works whether keys() returns
    # a list or a dict view, and yields the same sorted list of names.
    expectKwKeys = sorted(expectKw)
    kwKeys = sorted(kw)
    assert expectKwKeys == kwKeys, "Expected %r, got %r" % (expectKwKeys, kwKeys)

    # Compare value by value first so a failure names the offending key.
    for k in expectKwKeys:
        assert expectKw[k] == kw[k], "Expected %s=%r, got %r" % (k, expectKw[k], kw[k])
    assert expectKw == kw, "Expected %r, got %r" % (expectKw, kw)
    assert expectData == data, "Expected %r, got %r" % (expectData, data)
31
class IPTestCase(unittest.TestCase):
32
# Feed a single raw IP datagram to ip.IPProtocol and check that the protocol
# registered for number 0x0F has no expectations left afterwards.
# NOTE(review): lines of this method are missing from this view (the
# construction of p1 and most of the expected-keyword dict are not visible);
# confirm against the full file before relying on these comments.
def testPacketParsing(self):
33
proto = ip.IPProtocol()
45
# presumably part of the keyword metadata p1 expects with the packet
'fragment_id': 0xDEAD,
46
# presumably the low 13 bits of the "\xBE\xEF" frag_off bytes fed below
# (0xBEEF & 0x1FFF == 0x1EEF) -- TODO confirm against ip.IPProtocol
'fragment_offset': 0x1EEF,
53
# Register p1 as the handler for protocol number 0x0F.
proto.addProto(0x0F, p1)
55
proto.datagramReceived("\x54" #ihl version
59
+ "\xBE\xEF" #frag_off
63
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
70
# p1.expecting must be empty once the datagram has been delivered.
assert not p1.expecting, \
71
'Should not expect any more packets, but still want %r' % p1.expecting
73
# Feed two datagrams (payloads "foobar" then "quux") to one ip.IPProtocol with
# p1 registered for protocol number 0x0F, then check p1 has nothing left to
# expect.
# NOTE(review): lines of this method are missing from this view (p1's
# construction and most of both expected-keyword dicts); confirm against the
# full file.
def testMultiplePackets(self):
74
proto = ip.IPProtocol()
86
# presumably from the expectation for the first packet
'fragment_id': 0xDEAD,
87
'fragment_offset': 0x1EEF,
102
# presumably from the expectation for the second packet
'fragment_id': 0xDEAD,
103
'fragment_offset': 0x1EEF,
110
proto.addProto(0x0F, p1)
111
proto.datagramReceived("\x54" #ihl version
113
+ "\x00\x1a" #tot_len
115
+ "\xBE\xEF" #frag_off
119
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
125
proto.datagramReceived("\x54" #ihl version
127
+ "\x00\x1a" #tot_len
129
+ "\xBE\xEF" #frag_off
133
+ "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
140
# Both queued expectations must have been consumed.
assert not p1.expecting, \
141
'Should not expect any more packets, but still want %r' % p1.expecting
144
# Register two protocols (p1 and p2) under the same protocol number 0x0F, feed
# one datagram, and check that neither has expectations left afterwards.
# NOTE(review): lines of this method are missing from this view (the
# construction of p1/p2 and most of their expected-keyword dicts); confirm
# against the full file.
def testMultipleSameProtos(self):
145
proto = ip.IPProtocol()
157
# presumably from p1's expectation
'fragment_id': 0xDEAD,
158
'fragment_offset': 0x1EEF,
177
# presumably from p2's expectation
'fragment_id': 0xDEAD,
178
'fragment_offset': 0x1EEF,
186
# Same protocol number for both handlers.
proto.addProto(0x0F, p1)
187
proto.addProto(0x0F, p2)
189
proto.datagramReceived("\x54" #ihl version
191
+ "\x00\x1a" #tot_len
193
+ "\xBE\xEF" #frag_off
197
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
204
# Each handler must have consumed its own expectation.
assert not p1.expecting, \
205
'Should not expect any more packets, but still want %r' % p1.expecting
206
assert not p2.expecting, \
207
'Should not expect any more packets, but still want %r' % p2.expecting
209
# Register p1 for protocol number 1 and feed one datagram to ip.IPProtocol.
# NOTE(review): lines of this method are missing from this view -- p1's
# construction and the datagram's protocol byte are not visible. Presumably
# the datagram carries a different protocol number and p1 expects nothing, so
# any delivery to p1 would trip its assertion; confirm against the full file.
def testWrongProtoNotSeen(self):
210
proto = ip.IPProtocol()
212
proto.addProto(1, p1)
214
proto.datagramReceived("\x54" #ihl version
216
+ "\x00\x1a" #tot_len
218
+ "\xBE\xEF" #frag_off
222
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
229
# Register p1 for protocol number 0x0F and p2 for 0x0A on one ip.IPProtocol,
# feed four datagrams (payloads "quux", "foobar", "quux", "foobar"), and check
# that neither handler has expectations left afterwards.
# NOTE(review): lines of this method are missing from this view (the
# construction of p1/p2, most of the expected-keyword dicts, and each
# datagram's protocol byte); confirm against the full file.
def testDemuxing(self):
230
proto = ip.IPProtocol()
242
# presumably from p1's two expectations (these precede addProto(0x0F, p1))
'fragment_id': 0xDEAD,
243
'fragment_offset': 0x1EEF,
258
'fragment_id': 0xDEAD,
259
'fragment_offset': 0x1EEF,
266
proto.addProto(0x0F, p1)
279
# presumably from p2's two expectations (these precede addProto(0x0A, p2))
'fragment_id': 0xDEAD,
280
'fragment_offset': 0x1EEF,
295
'fragment_id': 0xDEAD,
296
'fragment_offset': 0x1EEF,
304
proto.addProto(0x0A, p2)
306
proto.datagramReceived("\x54" #ihl version
308
+ "\x00\x1a" #tot_len
310
+ "\xBE\xEF" #frag_off
314
+ "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
320
proto.datagramReceived("\x54" #ihl version
322
+ "\x00\x1a" #tot_len
324
+ "\xBE\xEF" #frag_off
328
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
334
proto.datagramReceived("\x54" #ihl version
336
+ "\x00\x1a" #tot_len
338
+ "\xBE\xEF" #frag_off
342
+ "\x06\x07\x08\x09" + "\x05\x04\x03\x02" + "quux",
348
proto.datagramReceived("\x54" #ihl version
350
+ "\x00\x1a" #tot_len
352
+ "\xBE\xEF" #frag_off
356
+ "\x05\x06\x07\x08" + "\x01\x02\x03\x04" + "foobar",
363
# Every expectation queued on either handler must have been consumed.
assert not p1.expecting, \
364
'Should not expect any more packets, but still want %r' % p1.expecting
365
assert not p2.expecting, \
366
'Should not expect any more packets, but still want %r' % p2.expecting
368
# NOTE(review): lines of this method are missing from this view (the creation
# of `e` and the `try:`/`else:` lines are not visible); confirm against the
# full file.
def testAddingBadProtos_WrongLevel(self):
369
"""Adding a wrong level protocol raises an exception."""
372
# A plain string is passed where a protocol object belongs.
e.addProto(42, "silliness")
373
# components.CannotAdapt is the exception this test accepts.
except components.CannotAdapt:
376
# Raising AssertionError fails the test with this message.
# `raise X, msg` is Python 2-only syntax.
raise AssertionError, 'addProto must raise an exception for bad protocols'
379
# NOTE(review): lines of this method are missing from this view (the creation
# of `e` and the `try:`/`except` lines are not visible); confirm against the
# full file.
def testAddingBadProtos_TooSmall(self):
380
"""Adding a protocol with a negative number raises an exception."""
383
# -1 is the out-of-range protocol number under test.
e.addProto(-1, MyProtocol([]))
385
# presumably `e` has been rebound to the caught exception by a handler line
# that is not visible here; the check pins the exact error message.
if e.args == ('Added protocol must be positive or zero',):
390
# Raising AssertionError fails the test with this message.
# `raise X, msg` is Python 2-only syntax.
raise AssertionError, 'addProto must raise an exception for bad protocols'
393
# NOTE(review): lines of this method are missing from this view (the creation
# of `e` and the `try:`/`except` lines are not visible); confirm against the
# full file.
def testAddingBadProtos_TooBig(self):
394
"""Adding a protocol with a number >=2**32 raises an exception."""
397
# 2**32 is the smallest value that no longer fits in 32 bits.
# The `2L` long-integer literal is Python 2-only syntax.
e.addProto(2L**32, MyProtocol([]))
399
# presumably `e` has been rebound to the caught exception by a handler line
# that is not visible here; the check pins the exact error message.
if e.args == ('Added protocol must fit in 32 bits',):
404
# Raising AssertionError fails the test with this message.
# `raise X, msg` is Python 2-only syntax.
raise AssertionError, 'addProto must raise an exception for bad protocols'
406
# Same check as testAddingBadProtos_TooBig, one step further past the limit
# (2**32 + 1 instead of 2**32).
# NOTE(review): lines of this method are missing from this view (the creation
# of `e` and the `try:`/`except` lines are not visible); confirm against the
# full file.
def testAddingBadProtos_TooBig2(self):
407
"""Adding a protocol with a number >=2**32 raises an exception."""
410
# The `2L` long-integer literal is Python 2-only syntax.
e.addProto(2L**32+1, MyProtocol([]))
412
# presumably `e` has been rebound to the caught exception by a handler line
# that is not visible here; the check pins the exact error message.
if e.args == ('Added protocol must fit in 32 bits',):
417
# Raising AssertionError fails the test with this message.
# `raise X, msg` is Python 2-only syntax.
raise AssertionError, 'addProto must raise an exception for bad protocols'