~james-w/ubuntu/lucid/psycopg2/precise-backport

« back to all changes in this revision

Viewing changes to tests/types_basic.py

  • Committer: Bazaar Package Importer
  • Author(s): Fabio Tranchitella, Jakub Wilk, Fabio Tranchitella
  • Date: 2011-06-19 18:25:53 UTC
  • mfrom: (5.1.10 sid)
  • Revision ID: james.westby@ubuntu.com-20110619182553-uye7z0g5ewab98px
Tags: 2.4.2-1
[ Jakub Wilk ]
* Add Debian Python Modules Team to Uploaders.

[ Fabio Tranchitella ]
* New upstream release.
* debian/watch: updated, use pypi.
* debian/control, debian/rules: switched to dh_python2.
* debian/control: bumped Standard-Version to 3.9.2, no changes required.

Show diffs side-by-side

added added

removed removed

Lines of Context:
27
27
except:
28
28
    pass
29
29
import sys
30
 
import unittest
 
30
import testutils
 
31
from testutils import unittest, decorate_all_tests
 
32
from testconfig import dsn
31
33
 
32
34
import psycopg2
33
 
import tests
 
35
from psycopg2.extensions import b
34
36
 
35
37
 
36
38
class TypesBasicTests(unittest.TestCase):
37
39
    """Test that all type conversions are working."""
38
40
 
39
41
    def setUp(self):
40
 
        self.conn = psycopg2.connect(tests.dsn)
 
42
        self.conn = psycopg2.connect(dsn)
 
43
 
 
44
    def tearDown(self):
 
45
        self.conn.close()
41
46
 
42
47
    def execute(self, *args):
43
48
        curs = self.conn.cursor()
59
64
        self.failUnless(s == 1971, "wrong integer quoting: " + str(s))
60
65
        s = self.execute("SELECT %s AS foo", (1971L,))
61
66
        self.failUnless(s == 1971L, "wrong integer quoting: " + str(s))
62
 
        if sys.version_info[0] < 2 or sys.version_info[1] < 4:
 
67
        if sys.version_info[0:2] < (2, 4):
63
68
            s = self.execute("SELECT %s AS foo", (19.10,))
64
69
            self.failUnless(abs(s - 19.10) < 0.001,
65
70
                        "wrong float quoting: " + str(s))
66
71
 
 
72
    def testBoolean(self):
 
73
        x = self.execute("SELECT %s as foo", (False,))
 
74
        self.assert_(x is False)
 
75
        x = self.execute("SELECT %s as foo", (True,))
 
76
        self.assert_(x is True)
 
77
 
67
78
    def testDecimal(self):
68
 
        if sys.version_info[0] >= 2 and sys.version_info[1] >= 4:
 
79
        if sys.version_info[0:2] >= (2, 4):
69
80
            s = self.execute("SELECT %s AS foo", (decimal.Decimal("19.10"),))
70
81
            self.failUnless(s - decimal.Decimal("19.10") == 0,
71
82
                            "wrong decimal quoting: " + str(s))
78
89
            s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),))
79
90
            self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
80
91
            self.failUnless(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s))
81
 
 
82
 
    def testFloat(self):
 
92
        else:
 
93
            return self.skipTest("decimal not available")
 
94
 
 
95
    def testFloatNan(self):
 
96
        try:
 
97
            float("nan")
 
98
        except ValueError:
 
99
            return self.skipTest("nan not available on this platform")
 
100
 
83
101
        s = self.execute("SELECT %s AS foo", (float("nan"),))
84
102
        self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s))
85
103
        self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
 
104
 
 
105
    def testFloatInf(self):
 
106
        try:
 
107
            self.execute("select 'inf'::float")
 
108
        except psycopg2.DataError:
 
109
            return self.skipTest("inf::float not available on the server")
 
110
        except ValueError:
 
111
            return self.skipTest("inf not available on this platform")
86
112
        s = self.execute("SELECT %s AS foo", (float("inf"),))
87
113
        self.failUnless(str(s) == "inf", "wrong float quoting: " + str(s))      
88
114
        self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
89
115
 
 
116
        s = self.execute("SELECT %s AS foo", (float("-inf"),))
 
117
        self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s))      
 
118
 
90
119
    def testBinary(self):
91
 
        s = ''.join([chr(x) for x in range(256)])
92
 
        b = psycopg2.Binary(s)
 
120
        if sys.version_info[0] < 3:
 
121
            s = ''.join([chr(x) for x in range(256)])
 
122
            b = psycopg2.Binary(s)
 
123
            buf = self.execute("SELECT %s::bytea AS foo", (b,))
 
124
            self.assertEqual(s, str(buf))
 
125
        else:
 
126
            s = bytes(range(256))
 
127
            b = psycopg2.Binary(s)
 
128
            buf = self.execute("SELECT %s::bytea AS foo", (b,))
 
129
            self.assertEqual(s, buf)
 
130
 
 
131
    def testBinaryNone(self):
 
132
        b = psycopg2.Binary(None)
93
133
        buf = self.execute("SELECT %s::bytea AS foo", (b,))
94
 
        self.failUnless(str(buf) == s, "wrong binary quoting")
 
134
        self.assertEqual(buf, None)
95
135
 
96
136
    def testBinaryEmptyString(self):
97
137
        # test to make sure an empty Binary is converted to an empty string
98
 
        b = psycopg2.Binary('')
99
 
        self.assertEqual(str(b), "''::bytea")
 
138
        if sys.version_info[0] < 3:
 
139
            b = psycopg2.Binary('')
 
140
            self.assertEqual(str(b), "''::bytea")
 
141
        else:
 
142
            b = psycopg2.Binary(bytes([]))
 
143
            self.assertEqual(str(b), "''::bytea")
100
144
 
101
145
    def testBinaryRoundTrip(self):
102
146
        # test to make sure buffers returned by psycopg2 are
103
147
        # understood by execute:
104
 
        s = ''.join([chr(x) for x in range(256)])
105
 
        buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
106
 
        buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
107
 
        self.failUnless(str(buf2) == s, "wrong binary quoting")
 
148
        if sys.version_info[0] < 3:
 
149
            s = ''.join([chr(x) for x in range(256)])
 
150
            buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
 
151
            buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
 
152
            self.assertEqual(s, str(buf2))
 
153
        else:
 
154
            s = bytes(range(256))
 
155
            buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
 
156
            buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
 
157
            self.assertEqual(s, buf2)
108
158
 
109
159
    def testArray(self):
110
160
        s = self.execute("SELECT %s AS foo", ([[1,2],[3,4]],))
111
 
        self.failUnless(s == [[1,2],[3,4]], "wrong array quoting " + str(s))
 
161
        self.failUnlessEqual(s, [[1,2],[3,4]])
112
162
        s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],))
113
 
        self.failUnless(s == ['one', 'two', 'three'],
114
 
                        "wrong array quoting " + str(s))
115
 
 
116
 
    def testTypeRoundtripBinary(self):
 
163
        self.failUnlessEqual(s, ['one', 'two', 'three'])
 
164
 
 
165
    def testEmptyArrayRegression(self):
 
166
        # ticket #42
 
167
        import datetime
 
168
        curs = self.conn.cursor()
 
169
        curs.execute("create table array_test (id integer, col timestamp without time zone[])")
 
170
 
 
171
        curs.execute("insert into array_test values (%s, %s)", (1, [datetime.date(2011,2,14)]))
 
172
        curs.execute("select col from array_test where id = 1")
 
173
        self.assertEqual(curs.fetchone()[0], [datetime.datetime(2011, 2, 14, 0, 0)])
 
174
 
 
175
        curs.execute("insert into array_test values (%s, %s)", (2, []))
 
176
        curs.execute("select col from array_test where id = 2")
 
177
        self.assertEqual(curs.fetchone()[0], [])
 
178
 
 
179
    def testEmptyArray(self):
 
180
        s = self.execute("SELECT '{}' AS foo")
 
181
        self.failUnlessEqual(s, [])
 
182
        s = self.execute("SELECT '{}'::text[] AS foo")
 
183
        self.failUnlessEqual(s, [])
 
184
        s = self.execute("SELECT %s AS foo", ([],))
 
185
        self.failUnlessEqual(s, [])
 
186
        s = self.execute("SELECT 1 != ALL(%s)", ([],))
 
187
        self.failUnlessEqual(s, True)
 
188
        # but don't break the strings :)
 
189
        s = self.execute("SELECT '{}'::text AS foo")
 
190
        self.failUnlessEqual(s, "{}")
 
191
 
 
192
    @testutils.skip_from_python(3)
 
193
    def testTypeRoundtripBuffer(self):
117
194
        o1 = buffer("".join(map(chr, range(256))))
118
195
        o2 = self.execute("select %s;", (o1,))
119
196
        self.assertEqual(type(o1), type(o2))
122
199
        o1 = buffer("")
123
200
        o2 = self.execute("select %s;", (o1,))
124
201
        self.assertEqual(type(o1), type(o2))
 
202
        self.assertEqual(str(o1), str(o2))
125
203
 
126
 
    def testTypeRoundtripBinaryArray(self):
 
204
    @testutils.skip_from_python(3)
 
205
    def testTypeRoundtripBufferArray(self):
127
206
        o1 = buffer("".join(map(chr, range(256))))
128
207
        o1 = [o1]
129
208
        o2 = self.execute("select %s;", (o1,))
130
209
        self.assertEqual(type(o1[0]), type(o2[0]))
 
210
        self.assertEqual(str(o1[0]), str(o2[0]))
 
211
 
 
212
    @testutils.skip_before_python(3)
 
213
    def testTypeRoundtripBytes(self):
 
214
        o1 = bytes(range(256))
 
215
        o2 = self.execute("select %s;", (o1,))
 
216
        self.assertEqual(memoryview, type(o2))
 
217
 
 
218
        # Test with an empty buffer
 
219
        o1 = bytes([])
 
220
        o2 = self.execute("select %s;", (o1,))
 
221
        self.assertEqual(memoryview, type(o2))
 
222
 
 
223
    @testutils.skip_before_python(3)
 
224
    def testTypeRoundtripBytesArray(self):
 
225
        o1 = bytes(range(256))
 
226
        o1 = [o1]
 
227
        o2 = self.execute("select %s;", (o1,))
 
228
        self.assertEqual(memoryview, type(o2[0]))
 
229
 
 
230
    @testutils.skip_before_python(2, 6)
 
231
    def testAdaptBytearray(self):
 
232
        o1 = bytearray(range(256))
 
233
        o2 = self.execute("select %s;", (o1,))
 
234
 
 
235
        if sys.version_info[0] < 3:
 
236
            self.assertEqual(buffer, type(o2))
 
237
        else:
 
238
            self.assertEqual(memoryview, type(o2))
 
239
 
 
240
        self.assertEqual(len(o1), len(o2))
 
241
        for c1, c2 in zip(o1, o2):
 
242
            self.assertEqual(c1, ord(c2))
 
243
 
 
244
        # Test with an empty buffer
 
245
        o1 = bytearray([])
 
246
        o2 = self.execute("select %s;", (o1,))
 
247
 
 
248
        self.assertEqual(len(o2), 0)
 
249
        if sys.version_info[0] < 3:
 
250
            self.assertEqual(buffer, type(o2))
 
251
        else:
 
252
            self.assertEqual(memoryview, type(o2))
 
253
 
 
254
    @testutils.skip_before_python(2, 7)
 
255
    def testAdaptMemoryview(self):
 
256
        o1 = memoryview(bytearray(range(256)))
 
257
        o2 = self.execute("select %s;", (o1,))
 
258
        if sys.version_info[0] < 3:
 
259
            self.assertEqual(buffer, type(o2))
 
260
        else:
 
261
            self.assertEqual(memoryview, type(o2))
 
262
 
 
263
        # Test with an empty buffer
 
264
        o1 = memoryview(bytearray([]))
 
265
        o2 = self.execute("select %s;", (o1,))
 
266
        if sys.version_info[0] < 3:
 
267
            self.assertEqual(buffer, type(o2))
 
268
        else:
 
269
            self.assertEqual(memoryview, type(o2))
 
270
 
 
271
    def testByteaHexCheckFalsePositive(self):
 
272
        # the check \x -> x to detect bad bytea decode
 
273
        # may be fooled if the first char is really an 'x'
 
274
        o1 = psycopg2.Binary(b('x'))
 
275
        o2 = self.execute("SELECT %s::bytea AS foo", (o1,))
 
276
        self.assertEqual(b('x'), o2[0])
 
277
 
 
278
    def testNegNumber(self):
 
279
        d1 = self.execute("select -%s;", (decimal.Decimal('-1.0'),))
 
280
        self.assertEqual(1, d1)
 
281
        f1 = self.execute("select -%s;", (-1.0,))
 
282
        self.assertEqual(1, f1)
 
283
        i1 = self.execute("select -%s;", (-1,))
 
284
        self.assertEqual(1, i1)
 
285
        l1 = self.execute("select -%s;", (-1L,))
 
286
        self.assertEqual(1, l1)
 
287
 
 
288
 
 
289
class AdaptSubclassTest(unittest.TestCase):
 
290
    def test_adapt_subtype(self):
 
291
        from psycopg2.extensions import adapt
 
292
        class Sub(str): pass
 
293
        s1 = "hel'lo"
 
294
        s2 = Sub(s1)
 
295
        self.assertEqual(adapt(s1).getquoted(), adapt(s2).getquoted())
 
296
 
 
297
    def test_adapt_most_specific(self):
 
298
        from psycopg2.extensions import adapt, register_adapter, AsIs
 
299
 
 
300
        class A(object): pass
 
301
        class B(A): pass
 
302
        class C(B): pass
 
303
 
 
304
        register_adapter(A, lambda a: AsIs("a"))
 
305
        register_adapter(B, lambda b: AsIs("b"))
 
306
        try:
 
307
            self.assertEqual(b('b'), adapt(C()).getquoted())
 
308
        finally:
 
309
           del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
 
310
           del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
 
311
 
 
312
    @testutils.skip_from_python(3)
 
313
    def test_no_mro_no_joy(self):
 
314
        from psycopg2.extensions import adapt, register_adapter, AsIs
 
315
 
 
316
        class A: pass
 
317
        class B(A): pass
 
318
 
 
319
        register_adapter(A, lambda a: AsIs("a"))
 
320
        try:
 
321
            self.assertRaises(psycopg2.ProgrammingError, adapt, B())
 
322
        finally:
 
323
           del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
 
324
 
 
325
 
 
326
    @testutils.skip_before_python(3)
 
327
    def test_adapt_subtype_3(self):
 
328
        from psycopg2.extensions import adapt, register_adapter, AsIs
 
329
 
 
330
        class A: pass
 
331
        class B(A): pass
 
332
 
 
333
        register_adapter(A, lambda a: AsIs("a"))
 
334
        try:
 
335
            self.assertEqual(b("a"), adapt(B()).getquoted())
 
336
        finally:
 
337
           del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
 
338
 
 
339
 
 
340
class ByteaParserTest(unittest.TestCase):
 
341
    """Unit test for our bytea format parser."""
 
342
    def setUp(self):
 
343
        try:
 
344
            self._cast = self._import_cast()
 
345
        except Exception, e:
 
346
            self._cast = None
 
347
            self._exc = e
 
348
 
 
349
    def _import_cast(self):
 
350
        """Use ctypes to access the C function.
 
351
 
 
352
        Raise any sort of error: we just support this where ctypes works as
 
353
        expected.
 
354
        """
 
355
        import ctypes
 
356
        lib = ctypes.cdll.LoadLibrary(psycopg2._psycopg.__file__)
 
357
        cast = lib.typecast_BINARY_cast
 
358
        cast.argtypes = [ctypes.c_char_p, ctypes.c_size_t, ctypes.py_object]
 
359
        cast.restype = ctypes.py_object
 
360
        return cast
 
361
 
 
362
    def cast(self, buffer):
 
363
        """Cast a buffer from the output format"""
 
364
        l = buffer and len(buffer) or 0
 
365
        rv = self._cast(buffer, l, None)
 
366
 
 
367
        if rv is None:
 
368
            return None
 
369
 
 
370
        if sys.version_info[0] < 3:
 
371
            return str(rv)
 
372
        else:
 
373
            return rv.tobytes()
 
374
 
 
375
    def test_null(self):
 
376
        rv = self.cast(None)
 
377
        self.assertEqual(rv, None)
 
378
 
 
379
    def test_blank(self):
 
380
        rv = self.cast(b(''))
 
381
        self.assertEqual(rv, b(''))
 
382
 
 
383
    def test_blank_hex(self):
 
384
        # Reported as problematic in ticket #48
 
385
        rv = self.cast(b('\\x'))
 
386
        self.assertEqual(rv, b(''))
 
387
 
 
388
    def test_full_hex(self, upper=False):
 
389
        buf = ''.join(("%02x" % i) for i in range(256))
 
390
        if upper: buf = buf.upper()
 
391
        buf = '\\x' + buf
 
392
        rv = self.cast(b(buf))
 
393
        if sys.version_info[0] < 3:
 
394
            self.assertEqual(rv, ''.join(map(chr, range(256))))
 
395
        else:
 
396
            self.assertEqual(rv, bytes(range(256)))
 
397
 
 
398
    def test_full_hex_upper(self):
 
399
        return self.test_full_hex(upper=True)
 
400
 
 
401
    def test_full_escaped_octal(self):
 
402
        buf = ''.join(("\\%03o" % i) for i in range(256))
 
403
        rv = self.cast(b(buf))
 
404
        if sys.version_info[0] < 3:
 
405
            self.assertEqual(rv, ''.join(map(chr, range(256))))
 
406
        else:
 
407
            self.assertEqual(rv, bytes(range(256)))
 
408
 
 
409
    def test_escaped_mixed(self):
 
410
        import string
 
411
        buf = ''.join(("\\%03o" % i) for i in range(32))
 
412
        buf += string.ascii_letters
 
413
        buf += ''.join('\\' + c for c in string.ascii_letters)
 
414
        buf += '\\\\'
 
415
        rv = self.cast(b(buf))
 
416
        if sys.version_info[0] < 3:
 
417
            tgt = ''.join(map(chr, range(32))) \
 
418
                + string.ascii_letters * 2 + '\\'
 
419
        else:
 
420
            tgt = bytes(range(32)) + \
 
421
                (string.ascii_letters * 2 + '\\').encode('ascii')
 
422
 
 
423
        self.assertEqual(rv, tgt)
 
424
 
 
425
def skip_if_cant_cast(f):
 
426
    def skip_if_cant_cast_(self, *args, **kwargs):
 
427
        if self._cast is None:
 
428
            return self.skipTest("can't test bytea parser: %s - %s"
 
429
                % (self._exc.__class__.__name__, self._exc))
 
430
 
 
431
        return f(self, *args, **kwargs)
 
432
 
 
433
    return skip_if_cant_cast_
 
434
 
 
435
decorate_all_tests(ByteaParserTest, skip_if_cant_cast)
 
436
 
131
437
 
132
438
def test_suite():
133
439
    return unittest.TestLoader().loadTestsFromName(__name__)