1
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2
# See LICENSE for details.
5
from twisted.trial import unittest
8
import cStringIO as StringIO
14
from twisted.spread import banana
15
from twisted.python import failure
16
from twisted.internet import protocol, main
18
class MathTestCase(unittest.TestCase):
    """
    Tests for the integer helpers L{banana.int2b128} and L{banana.b1282int}.
    """

    def testInt2b128(self):
        """
        Each sample integer written with L{banana.int2b128} is read back
        unchanged by L{banana.b1282int}.
        """
        # Three runs of consecutive values plus one value (1024 ** 10) far
        # beyond the range of a machine word.
        funkylist = (list(range(0, 100)) + list(range(1000, 1100)) +
                     list(range(1000000, 1000100)) + [1024 ** 10])
        for i in funkylist:
            # A fresh buffer per value so encodings never run together.
            x = StringIO.StringIO()
            banana.int2b128(i, x.write)
            v = x.getvalue()
            y = banana.b1282int(v)
            self.assertEqual(y, i, "y = %s; i = %s" % (y, i))
28
class BananaTestCase(unittest.TestCase):
    """
    Round-trip and limit tests for the protocol class named by C{encClass}.

    Each test writes through C{self.enc} into an in-memory file, feeds the
    captured bytes back into the same instance and checks the expression
    delivered to L{putResult}.
    """

    encClass = banana.Banana

    def setUp(self):
        """
        Connect a fresh C{encClass} instance to an in-memory transport and
        route every decoded expression to L{putResult}.
        """
        self.io = StringIO.StringIO()
        self.enc = self.encClass()
        self.enc.makeConnection(protocol.FileWrapper(self.io))
        self.enc._selectDialect("none")
        self.enc.expressionReceived = self.putResult

    def putResult(self, result):
        """
        Remember the most recently decoded expression as C{self.result}.
        """
        self.result = result

    def tearDown(self):
        """
        Tell the protocol instance that its connection closed cleanly.
        """
        self.enc.connectionLost(failure.Failure(main.CONNECTION_DONE))

    def testString(self):
        self.enc.sendEncoded("hello")
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, 'hello')

    def testLong(self):
        self.enc.sendEncoded(long(1015))
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, long(1015),
                         "should be 1015l, got %s" % self.result)

    def test_largeLong(self):
        """
        Test that various longs greater than 2 ** 32 - 1 round-trip through
        banana properly.
        """
        # NOTE(review): the statements deriving n from exp were missing from
        # this copy of the file; the derivation below is a reconstruction and
        # should be confirmed against the upstream test.
        for exp in (32, 64, 128, 256):
            for add in (0, 1):
                m = 2 ** exp + add
                for n in (m, -m - 1):
                    # Start from an empty buffer so that only the encoding
                    # of n is fed back to the decoder.
                    self.io.truncate(0)
                    self.enc.sendEncoded(n)
                    self.enc.dataReceived(self.io.getvalue())
                    self.assertEqual(self.result, n)

    def _getSmallest(self):
        """
        Return the smallest positive integer that should be too large to
        encode under the current C{prefixLimit} of C{self.enc}.
        """
        # How many bytes of prefix our implementation allows
        prefixBytes = self.enc.prefixLimit
        # How many useful bits we can extract from that based on Banana's
        # base-128 representation.
        bits = prefixBytes * 7
        # The largest number we _should_ be able to encode
        largest = 2 ** bits - 1
        # The smallest number we _shouldn't_ be able to encode
        smallest = largest + 1
        return smallest

    def test_encodeTooLargeLong(self):
        """
        Test that a long above the implementation-specific limit is rejected
        as too large to be encoded.
        """
        smallest = self._getSmallest()
        self.assertRaises(banana.BananaError, self.enc.sendEncoded, smallest)

    def test_decodeTooLargeLong(self):
        """
        Test that a long above the implementation specific limit is rejected
        as too large to be decoded.
        """
        smallest = self._getSmallest()
        # Temporarily double the limit so the value can be encoded at all.
        self.enc.setPrefixLimit(self.enc.prefixLimit * 2)
        self.enc.sendEncoded(smallest)
        encoded = self.io.getvalue()
        # Restore the limit; floor division keeps it an integer.
        self.enc.setPrefixLimit(self.enc.prefixLimit // 2)

        self.assertRaises(banana.BananaError, self.enc.dataReceived, encoded)

    def _getLargest(self):
        """
        Return the negation of L{_getSmallest}, used by the too-small tests
        as a negative value that should be rejected.
        """
        return -self._getSmallest()

    def test_encodeTooSmallLong(self):
        """
        Test that a negative long below the implementation-specific limit is
        rejected as too small to be encoded.
        """
        largest = self._getLargest()
        self.assertRaises(banana.BananaError, self.enc.sendEncoded, largest)

    def test_decodeTooSmallLong(self):
        """
        Test that a negative long below the implementation specific limit is
        rejected as too small to be decoded.
        """
        largest = self._getLargest()
        # Temporarily double the limit so the value can be encoded at all.
        self.enc.setPrefixLimit(self.enc.prefixLimit * 2)
        self.enc.sendEncoded(largest)
        encoded = self.io.getvalue()
        # Restore the limit; floor division keeps it an integer.
        self.enc.setPrefixLimit(self.enc.prefixLimit // 2)

        self.assertRaises(banana.BananaError, self.enc.dataReceived, encoded)

    def testNegativeLong(self):
        self.enc.sendEncoded(long(-1015))
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, long(-1015),
                         "should be -1015l, got %s" % self.result)

    def testInteger(self):
        self.enc.sendEncoded(1015)
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, 1015,
                         "should be 1015, got %s" % self.result)

    def testNegative(self):
        self.enc.sendEncoded(-1015)
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, -1015,
                         "should be -1015, got %s" % self.result)

    def testFloat(self):
        self.enc.sendEncoded(1015.)
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, 1015.)

    def testList(self):
        foo = [1, 2, [3, 4], [30.5, 40.2], 5,
               ["six", "seven", ["eight", 9]], [10], []]
        self.enc.sendEncoded(foo)
        self.enc.dataReceived(self.io.getvalue())
        # Show both sides of the comparison in the failure message.
        self.assertEqual(self.result, foo,
                         "%s!=%s" % (repr(self.result), repr(foo)))

    def testPartial(self):
        """
        A nested expression still decodes when its encoding is delivered one
        byte per C{dataReceived} call.
        """
        # Imported here because no module-level import of sys is visible in
        # this file.
        import sys
        foo = [1, 2, [3, 4], [30.5, 40.2], 5,
               ["six", "seven", ["eight", 9]], [10],
               # TODO: currently the C implementation's a bit buggy...
               sys.maxint * 3, sys.maxint * 2, sys.maxint * -2]
        self.enc.sendEncoded(foo)
        for byte in self.io.getvalue():
            self.enc.dataReceived(byte)
        self.assertEqual(self.result, foo,
                         "%s!=%s" % (repr(self.result), repr(foo)))

    def feed(self, data):
        """
        Deliver C{data} to the protocol one byte at a time.
        """
        for byte in data:
            self.enc.dataReceived(byte)

    def testOversizedList(self):
        data = '\x02\x01\x01\x01\x01\x80'
        # list(size=0x0101010102, about 4.3e9)
        self.assertRaises(banana.BananaError, self.feed, data)

    def testOversizedString(self):
        data = '\x02\x01\x01\x01\x01\x82'
        # string(size=0x0101010102, about 4.3e9)
        self.assertRaises(banana.BananaError, self.feed, data)

    def testCrashString(self):
        crashString = '\x00\x00\x00\x00\x04\x80'
        # string(size=0x0400000000, about 17.2e9)

        # cBanana would fold that into a 32-bit 'int', then try to allocate
        # a list with PyList_New(). cBanana ignored the NULL return value,
        # so it would segfault when trying to free the imaginary list.

        # This variant doesn't segfault straight out in my environment.
        # Instead, it takes up large amounts of CPU and memory...
        #crashString = '\x00\x00\x00\x00\x01\x80'

        try:
            # Historical note: should now raise MemoryError. Only
            # BananaError is tolerated here; any other exception propagates
            # and fails the test.
            self.enc.dataReceived(crashString)
        except banana.BananaError:
            pass

    def testCrashNegativeLong(self):
        # There was a bug in cBanana which relied on negating a negative integer
        # always giving a positive result, but for the lowest possible number in
        # 2s-complement arithmetic, that's not true, i.e.
        #     long x = -2147483648;
        #     long y = -x;
        #     x == y;  /* true! */
        # (assuming 32-bit longs)
        self.enc.sendEncoded(-2147483648)
        self.enc.dataReceived(self.io.getvalue())
        self.assertEqual(self.result, -2147483648,
                         "should be -2147483648, got %s" % self.result)
217
class GlobalCoderTests(unittest.TestCase):
    """
    Tests for the free functions L{banana.encode} and L{banana.decode}.
    """

    def test_statelessDecode(self):
        """
        Test that state doesn't carry over between calls to L{banana.decode}.
        """
        # Sixty-five prefix bytes of 0x7f (2 ** 455 - 1 when read as base-128
        # digits) followed by a 0x85 type byte; decoding this must fail.
        undecodable = '\x7f' * 65 + '\x85'
        self.assertRaises(banana.BananaError, banana.decode, undecodable)

        # Banana encoding of 1.  It must decode normally even though the
        # previous call raised part-way through its input.
        decodable = '\x01\x81'
        self.assertEqual(banana.decode(decodable), 1)