78
89
s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),))
79
90
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
80
91
self.failUnless(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s))
93
return self.skipTest("decimal not available")
95
def testFloatNan(self):
99
return self.skipTest("nan not available on this platform")
83
101
s = self.execute("SELECT %s AS foo", (float("nan"),))
84
102
self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s))
85
103
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
105
def testFloatInf(self):
107
self.execute("select 'inf'::float")
108
except psycopg2.DataError:
109
return self.skipTest("inf::float not available on the server")
111
return self.skipTest("inf not available on this platform")
86
112
s = self.execute("SELECT %s AS foo", (float("inf"),))
87
113
self.failUnless(str(s) == "inf", "wrong float quoting: " + str(s))
88
114
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
116
s = self.execute("SELECT %s AS foo", (float("-inf"),))
117
self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s))
90
119
def testBinary(self):
91
s = ''.join([chr(x) for x in range(256)])
92
b = psycopg2.Binary(s)
120
if sys.version_info[0] < 3:
121
s = ''.join([chr(x) for x in range(256)])
122
b = psycopg2.Binary(s)
123
buf = self.execute("SELECT %s::bytea AS foo", (b,))
124
self.assertEqual(s, str(buf))
126
s = bytes(range(256))
127
b = psycopg2.Binary(s)
128
buf = self.execute("SELECT %s::bytea AS foo", (b,))
129
self.assertEqual(s, buf)
131
def testBinaryNone(self):
132
b = psycopg2.Binary(None)
93
133
buf = self.execute("SELECT %s::bytea AS foo", (b,))
94
self.failUnless(str(buf) == s, "wrong binary quoting")
134
self.assertEqual(buf, None)
96
136
def testBinaryEmptyString(self):
97
137
# test to make sure an empty Binary is converted to an empty string
98
b = psycopg2.Binary('')
99
self.assertEqual(str(b), "''::bytea")
138
if sys.version_info[0] < 3:
139
b = psycopg2.Binary('')
140
self.assertEqual(str(b), "''::bytea")
142
b = psycopg2.Binary(bytes([]))
143
self.assertEqual(str(b), "''::bytea")
101
145
def testBinaryRoundTrip(self):
102
146
# test to make sure buffers returned by psycopg2 are
103
147
# understood by execute:
104
s = ''.join([chr(x) for x in range(256)])
105
buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
106
buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
107
self.failUnless(str(buf2) == s, "wrong binary quoting")
148
if sys.version_info[0] < 3:
149
s = ''.join([chr(x) for x in range(256)])
150
buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
151
buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
152
self.assertEqual(s, str(buf2))
154
s = bytes(range(256))
155
buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
156
buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
157
self.assertEqual(s, buf2)
109
159
def testArray(self):
110
160
s = self.execute("SELECT %s AS foo", ([[1,2],[3,4]],))
111
self.failUnless(s == [[1,2],[3,4]], "wrong array quoting " + str(s))
161
self.failUnlessEqual(s, [[1,2],[3,4]])
112
162
s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],))
113
self.failUnless(s == ['one', 'two', 'three'],
114
"wrong array quoting " + str(s))
116
def testTypeRoundtripBinary(self):
163
self.failUnlessEqual(s, ['one', 'two', 'three'])
165
def testEmptyArrayRegression(self):
168
curs = self.conn.cursor()
169
curs.execute("create table array_test (id integer, col timestamp without time zone[])")
171
curs.execute("insert into array_test values (%s, %s)", (1, [datetime.date(2011,2,14)]))
172
curs.execute("select col from array_test where id = 1")
173
self.assertEqual(curs.fetchone()[0], [datetime.datetime(2011, 2, 14, 0, 0)])
175
curs.execute("insert into array_test values (%s, %s)", (2, []))
176
curs.execute("select col from array_test where id = 2")
177
self.assertEqual(curs.fetchone()[0], [])
179
def testEmptyArray(self):
180
s = self.execute("SELECT '{}' AS foo")
181
self.failUnlessEqual(s, [])
182
s = self.execute("SELECT '{}'::text[] AS foo")
183
self.failUnlessEqual(s, [])
184
s = self.execute("SELECT %s AS foo", ([],))
185
self.failUnlessEqual(s, [])
186
s = self.execute("SELECT 1 != ALL(%s)", ([],))
187
self.failUnlessEqual(s, True)
188
# but don't break the strings :)
189
s = self.execute("SELECT '{}'::text AS foo")
190
self.failUnlessEqual(s, "{}")
192
@testutils.skip_from_python(3)
193
def testTypeRoundtripBuffer(self):
117
194
o1 = buffer("".join(map(chr, range(256))))
118
195
o2 = self.execute("select %s;", (o1,))
119
196
self.assertEqual(type(o1), type(o2))
123
200
o2 = self.execute("select %s;", (o1,))
124
201
self.assertEqual(type(o1), type(o2))
202
self.assertEqual(str(o1), str(o2))
126
def testTypeRoundtripBinaryArray(self):
204
@testutils.skip_from_python(3)
205
def testTypeRoundtripBufferArray(self):
127
206
o1 = buffer("".join(map(chr, range(256))))
129
208
o2 = self.execute("select %s;", (o1,))
130
209
self.assertEqual(type(o1[0]), type(o2[0]))
210
self.assertEqual(str(o1[0]), str(o2[0]))
212
@testutils.skip_before_python(3)
213
def testTypeRoundtripBytes(self):
214
o1 = bytes(range(256))
215
o2 = self.execute("select %s;", (o1,))
216
self.assertEqual(memoryview, type(o2))
218
# Test with an empty buffer
220
o2 = self.execute("select %s;", (o1,))
221
self.assertEqual(memoryview, type(o2))
223
@testutils.skip_before_python(3)
224
def testTypeRoundtripBytesArray(self):
225
o1 = bytes(range(256))
227
o2 = self.execute("select %s;", (o1,))
228
self.assertEqual(memoryview, type(o2[0]))
230
@testutils.skip_before_python(2, 6)
231
def testAdaptBytearray(self):
232
o1 = bytearray(range(256))
233
o2 = self.execute("select %s;", (o1,))
235
if sys.version_info[0] < 3:
236
self.assertEqual(buffer, type(o2))
238
self.assertEqual(memoryview, type(o2))
240
self.assertEqual(len(o1), len(o2))
241
for c1, c2 in zip(o1, o2):
242
self.assertEqual(c1, ord(c2))
244
# Test with an empty buffer
246
o2 = self.execute("select %s;", (o1,))
248
self.assertEqual(len(o2), 0)
249
if sys.version_info[0] < 3:
250
self.assertEqual(buffer, type(o2))
252
self.assertEqual(memoryview, type(o2))
254
@testutils.skip_before_python(2, 7)
255
def testAdaptMemoryview(self):
256
o1 = memoryview(bytearray(range(256)))
257
o2 = self.execute("select %s;", (o1,))
258
if sys.version_info[0] < 3:
259
self.assertEqual(buffer, type(o2))
261
self.assertEqual(memoryview, type(o2))
263
# Test with an empty buffer
264
o1 = memoryview(bytearray([]))
265
o2 = self.execute("select %s;", (o1,))
266
if sys.version_info[0] < 3:
267
self.assertEqual(buffer, type(o2))
269
self.assertEqual(memoryview, type(o2))
271
def testByteaHexCheckFalsePositive(self):
272
# the check \x -> x to detect bad bytea decode
273
# may be fooled if the first char is really an 'x'
274
o1 = psycopg2.Binary(b('x'))
275
o2 = self.execute("SELECT %s::bytea AS foo", (o1,))
276
self.assertEqual(b('x'), o2[0])
278
def testNegNumber(self):
279
d1 = self.execute("select -%s;", (decimal.Decimal('-1.0'),))
280
self.assertEqual(1, d1)
281
f1 = self.execute("select -%s;", (-1.0,))
282
self.assertEqual(1, f1)
283
i1 = self.execute("select -%s;", (-1,))
284
self.assertEqual(1, i1)
285
l1 = self.execute("select -%s;", (-1L,))
286
self.assertEqual(1, l1)
289
class AdaptSubclassTest(unittest.TestCase):
290
def test_adapt_subtype(self):
291
from psycopg2.extensions import adapt
295
self.assertEqual(adapt(s1).getquoted(), adapt(s2).getquoted())
297
def test_adapt_most_specific(self):
298
from psycopg2.extensions import adapt, register_adapter, AsIs
300
class A(object): pass
304
register_adapter(A, lambda a: AsIs("a"))
305
register_adapter(B, lambda b: AsIs("b"))
307
self.assertEqual(b('b'), adapt(C()).getquoted())
309
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
310
del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
312
@testutils.skip_from_python(3)
313
def test_no_mro_no_joy(self):
314
from psycopg2.extensions import adapt, register_adapter, AsIs
319
register_adapter(A, lambda a: AsIs("a"))
321
self.assertRaises(psycopg2.ProgrammingError, adapt, B())
323
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
326
@testutils.skip_before_python(3)
327
def test_adapt_subtype_3(self):
328
from psycopg2.extensions import adapt, register_adapter, AsIs
333
register_adapter(A, lambda a: AsIs("a"))
335
self.assertEqual(b("a"), adapt(B()).getquoted())
337
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
340
class ByteaParserTest(unittest.TestCase):
341
"""Unit test for our bytea format parser."""
344
self._cast = self._import_cast()
349
def _import_cast(self):
350
"""Use ctypes to access the C function.
352
Raise any sort of error: we just support this where ctypes works as
356
lib = ctypes.cdll.LoadLibrary(psycopg2._psycopg.__file__)
357
cast = lib.typecast_BINARY_cast
358
cast.argtypes = [ctypes.c_char_p, ctypes.c_size_t, ctypes.py_object]
359
cast.restype = ctypes.py_object
362
def cast(self, buffer):
363
"""Cast a buffer from the output format"""
364
l = buffer and len(buffer) or 0
365
rv = self._cast(buffer, l, None)
370
if sys.version_info[0] < 3:
377
self.assertEqual(rv, None)
379
def test_blank(self):
380
rv = self.cast(b(''))
381
self.assertEqual(rv, b(''))
383
def test_blank_hex(self):
384
# Reported as problematic in ticket #48
385
rv = self.cast(b('\\x'))
386
self.assertEqual(rv, b(''))
388
def test_full_hex(self, upper=False):
389
buf = ''.join(("%02x" % i) for i in range(256))
390
if upper: buf = buf.upper()
392
rv = self.cast(b(buf))
393
if sys.version_info[0] < 3:
394
self.assertEqual(rv, ''.join(map(chr, range(256))))
396
self.assertEqual(rv, bytes(range(256)))
398
def test_full_hex_upper(self):
399
return self.test_full_hex(upper=True)
401
def test_full_escaped_octal(self):
402
buf = ''.join(("\\%03o" % i) for i in range(256))
403
rv = self.cast(b(buf))
404
if sys.version_info[0] < 3:
405
self.assertEqual(rv, ''.join(map(chr, range(256))))
407
self.assertEqual(rv, bytes(range(256)))
409
def test_escaped_mixed(self):
411
buf = ''.join(("\\%03o" % i) for i in range(32))
412
buf += string.ascii_letters
413
buf += ''.join('\\' + c for c in string.ascii_letters)
415
rv = self.cast(b(buf))
416
if sys.version_info[0] < 3:
417
tgt = ''.join(map(chr, range(32))) \
418
+ string.ascii_letters * 2 + '\\'
420
tgt = bytes(range(32)) + \
421
(string.ascii_letters * 2 + '\\').encode('ascii')
423
self.assertEqual(rv, tgt)
425
def skip_if_cant_cast(f):
426
def skip_if_cant_cast_(self, *args, **kwargs):
427
if self._cast is None:
428
return self.skipTest("can't test bytea parser: %s - %s"
429
% (self._exc.__class__.__name__, self._exc))
431
return f(self, *args, **kwargs)
433
return skip_if_cant_cast_
435
decorate_all_tests(ByteaParserTest, skip_if_cant_cast)
132
438
def test_suite():
133
439
return unittest.TestLoader().loadTestsFromName(__name__)