~malept/ubuntu/lucid/python2.6/dev-dependency-fix

« back to all changes in this revision

Viewing changes to Lib/sqlite3/test/types.py

  • Committer: Bazaar Package Importer
  • Author(s): Matthias Klose
  • Date: 2009-02-13 12:51:00 UTC
  • Revision ID: james.westby@ubuntu.com-20090213125100-uufgcb9yeqzujpqw
Tags: upstream-2.6.1
ImportĀ upstreamĀ versionĀ 2.6.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#-*- coding: ISO-8859-1 -*-
 
2
# pysqlite2/test/types.py: tests for type conversion and detection
 
3
#
 
4
# Copyright (C) 2005-2007 Gerhard Hļæ½ring <gh@ghaering.de>
 
5
#
 
6
# This file is part of pysqlite.
 
7
#
 
8
# This software is provided 'as-is', without any express or implied
 
9
# warranty.  In no event will the authors be held liable for any damages
 
10
# arising from the use of this software.
 
11
#
 
12
# Permission is granted to anyone to use this software for any purpose,
 
13
# including commercial applications, and to alter it and redistribute it
 
14
# freely, subject to the following restrictions:
 
15
#
 
16
# 1. The origin of this software must not be misrepresented; you must not
 
17
#    claim that you wrote the original software. If you use this software
 
18
#    in a product, an acknowledgment in the product documentation would be
 
19
#    appreciated but is not required.
 
20
# 2. Altered source versions must be plainly marked as such, and must not be
 
21
#    misrepresented as being the original software.
 
22
# 3. This notice may not be removed or altered from any source distribution.
 
23
 
 
24
import zlib, datetime
 
25
import unittest
 
26
import sqlite3 as sqlite
 
27
 
 
28
class SqliteTypeTests(unittest.TestCase):
 
29
    def setUp(self):
 
30
        self.con = sqlite.connect(":memory:")
 
31
        self.cur = self.con.cursor()
 
32
        self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
 
33
 
 
34
    def tearDown(self):
 
35
        self.cur.close()
 
36
        self.con.close()
 
37
 
 
38
    def CheckString(self):
 
39
        self.cur.execute("insert into test(s) values (?)", (u"ļæ½sterreich",))
 
40
        self.cur.execute("select s from test")
 
41
        row = self.cur.fetchone()
 
42
        self.failUnlessEqual(row[0], u"ļæ½sterreich")
 
43
 
 
44
    def CheckSmallInt(self):
 
45
        self.cur.execute("insert into test(i) values (?)", (42,))
 
46
        self.cur.execute("select i from test")
 
47
        row = self.cur.fetchone()
 
48
        self.failUnlessEqual(row[0], 42)
 
49
 
 
50
    def CheckLargeInt(self):
 
51
        num = 2**40
 
52
        self.cur.execute("insert into test(i) values (?)", (num,))
 
53
        self.cur.execute("select i from test")
 
54
        row = self.cur.fetchone()
 
55
        self.failUnlessEqual(row[0], num)
 
56
 
 
57
    def CheckFloat(self):
 
58
        val = 3.14
 
59
        self.cur.execute("insert into test(f) values (?)", (val,))
 
60
        self.cur.execute("select f from test")
 
61
        row = self.cur.fetchone()
 
62
        self.failUnlessEqual(row[0], val)
 
63
 
 
64
    def CheckBlob(self):
 
65
        val = buffer("Guglhupf")
 
66
        self.cur.execute("insert into test(b) values (?)", (val,))
 
67
        self.cur.execute("select b from test")
 
68
        row = self.cur.fetchone()
 
69
        self.failUnlessEqual(row[0], val)
 
70
 
 
71
    def CheckUnicodeExecute(self):
 
72
        self.cur.execute(u"select 'ļæ½sterreich'")
 
73
        row = self.cur.fetchone()
 
74
        self.failUnlessEqual(row[0], u"ļæ½sterreich")
 
75
 
 
76
class DeclTypesTests(unittest.TestCase):
 
77
    class Foo:
 
78
        def __init__(self, _val):
 
79
            self.val = _val
 
80
 
 
81
        def __cmp__(self, other):
 
82
            if not isinstance(other, DeclTypesTests.Foo):
 
83
                raise ValueError
 
84
            if self.val == other.val:
 
85
                return 0
 
86
            else:
 
87
                return 1
 
88
 
 
89
        def __conform__(self, protocol):
 
90
            if protocol is sqlite.PrepareProtocol:
 
91
                return self.val
 
92
            else:
 
93
                return None
 
94
 
 
95
        def __str__(self):
 
96
            return "<%s>" % self.val
 
97
 
 
98
    def setUp(self):
 
99
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
 
100
        self.cur = self.con.cursor()
 
101
        self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")
 
102
 
 
103
        # override float, make them always return the same number
 
104
        sqlite.converters["FLOAT"] = lambda x: 47.2
 
105
 
 
106
        # and implement two custom ones
 
107
        sqlite.converters["BOOL"] = lambda x: bool(int(x))
 
108
        sqlite.converters["FOO"] = DeclTypesTests.Foo
 
109
        sqlite.converters["WRONG"] = lambda x: "WRONG"
 
110
        sqlite.converters["NUMBER"] = float
 
111
 
 
112
    def tearDown(self):
 
113
        del sqlite.converters["FLOAT"]
 
114
        del sqlite.converters["BOOL"]
 
115
        del sqlite.converters["FOO"]
 
116
        del sqlite.converters["NUMBER"]
 
117
        self.cur.close()
 
118
        self.con.close()
 
119
 
 
120
    def CheckString(self):
 
121
        # default
 
122
        self.cur.execute("insert into test(s) values (?)", ("foo",))
 
123
        self.cur.execute('select s as "s [WRONG]" from test')
 
124
        row = self.cur.fetchone()
 
125
        self.failUnlessEqual(row[0], "foo")
 
126
 
 
127
    def CheckSmallInt(self):
 
128
        # default
 
129
        self.cur.execute("insert into test(i) values (?)", (42,))
 
130
        self.cur.execute("select i from test")
 
131
        row = self.cur.fetchone()
 
132
        self.failUnlessEqual(row[0], 42)
 
133
 
 
134
    def CheckLargeInt(self):
 
135
        # default
 
136
        num = 2**40
 
137
        self.cur.execute("insert into test(i) values (?)", (num,))
 
138
        self.cur.execute("select i from test")
 
139
        row = self.cur.fetchone()
 
140
        self.failUnlessEqual(row[0], num)
 
141
 
 
142
    def CheckFloat(self):
 
143
        # custom
 
144
        val = 3.14
 
145
        self.cur.execute("insert into test(f) values (?)", (val,))
 
146
        self.cur.execute("select f from test")
 
147
        row = self.cur.fetchone()
 
148
        self.failUnlessEqual(row[0], 47.2)
 
149
 
 
150
    def CheckBool(self):
 
151
        # custom
 
152
        self.cur.execute("insert into test(b) values (?)", (False,))
 
153
        self.cur.execute("select b from test")
 
154
        row = self.cur.fetchone()
 
155
        self.failUnlessEqual(row[0], False)
 
156
 
 
157
        self.cur.execute("delete from test")
 
158
        self.cur.execute("insert into test(b) values (?)", (True,))
 
159
        self.cur.execute("select b from test")
 
160
        row = self.cur.fetchone()
 
161
        self.failUnlessEqual(row[0], True)
 
162
 
 
163
    def CheckUnicode(self):
 
164
        # default
 
165
        val = u"\xd6sterreich"
 
166
        self.cur.execute("insert into test(u) values (?)", (val,))
 
167
        self.cur.execute("select u from test")
 
168
        row = self.cur.fetchone()
 
169
        self.failUnlessEqual(row[0], val)
 
170
 
 
171
    def CheckFoo(self):
 
172
        val = DeclTypesTests.Foo("bla")
 
173
        self.cur.execute("insert into test(foo) values (?)", (val,))
 
174
        self.cur.execute("select foo from test")
 
175
        row = self.cur.fetchone()
 
176
        self.failUnlessEqual(row[0], val)
 
177
 
 
178
    def CheckUnsupportedSeq(self):
 
179
        class Bar: pass
 
180
        val = Bar()
 
181
        try:
 
182
            self.cur.execute("insert into test(f) values (?)", (val,))
 
183
            self.fail("should have raised an InterfaceError")
 
184
        except sqlite.InterfaceError:
 
185
            pass
 
186
        except:
 
187
            self.fail("should have raised an InterfaceError")
 
188
 
 
189
    def CheckUnsupportedDict(self):
 
190
        class Bar: pass
 
191
        val = Bar()
 
192
        try:
 
193
            self.cur.execute("insert into test(f) values (:val)", {"val": val})
 
194
            self.fail("should have raised an InterfaceError")
 
195
        except sqlite.InterfaceError:
 
196
            pass
 
197
        except:
 
198
            self.fail("should have raised an InterfaceError")
 
199
 
 
200
    def CheckBlob(self):
 
201
        # default
 
202
        val = buffer("Guglhupf")
 
203
        self.cur.execute("insert into test(bin) values (?)", (val,))
 
204
        self.cur.execute("select bin from test")
 
205
        row = self.cur.fetchone()
 
206
        self.failUnlessEqual(row[0], val)
 
207
 
 
208
    def CheckNumber1(self):
 
209
        self.cur.execute("insert into test(n1) values (5)")
 
210
        value = self.cur.execute("select n1 from test").fetchone()[0]
 
211
        # if the converter is not used, it's an int instead of a float
 
212
        self.failUnlessEqual(type(value), float)
 
213
 
 
214
    def CheckNumber2(self):
 
215
        """Checks wether converter names are cut off at '(' characters"""
 
216
        self.cur.execute("insert into test(n2) values (5)")
 
217
        value = self.cur.execute("select n2 from test").fetchone()[0]
 
218
        # if the converter is not used, it's an int instead of a float
 
219
        self.failUnlessEqual(type(value), float)
 
220
 
 
221
class ColNamesTests(unittest.TestCase):
 
222
    def setUp(self):
 
223
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
 
224
        self.cur = self.con.cursor()
 
225
        self.cur.execute("create table test(x foo)")
 
226
 
 
227
        sqlite.converters["FOO"] = lambda x: "[%s]" % x
 
228
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
 
229
        sqlite.converters["EXC"] = lambda x: 5/0
 
230
        sqlite.converters["B1B1"] = lambda x: "MARKER"
 
231
 
 
232
    def tearDown(self):
 
233
        del sqlite.converters["FOO"]
 
234
        del sqlite.converters["BAR"]
 
235
        del sqlite.converters["EXC"]
 
236
        del sqlite.converters["B1B1"]
 
237
        self.cur.close()
 
238
        self.con.close()
 
239
 
 
240
    def CheckDeclTypeNotUsed(self):
 
241
        """
 
242
        Assures that the declared type is not used when PARSE_DECLTYPES
 
243
        is not set.
 
244
        """
 
245
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
 
246
        self.cur.execute("select x from test")
 
247
        val = self.cur.fetchone()[0]
 
248
        self.failUnlessEqual(val, "xxx")
 
249
 
 
250
    def CheckNone(self):
 
251
        self.cur.execute("insert into test(x) values (?)", (None,))
 
252
        self.cur.execute("select x from test")
 
253
        val = self.cur.fetchone()[0]
 
254
        self.failUnlessEqual(val, None)
 
255
 
 
256
    def CheckColName(self):
 
257
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
 
258
        self.cur.execute('select x as "x [bar]" from test')
 
259
        val = self.cur.fetchone()[0]
 
260
        self.failUnlessEqual(val, "<xxx>")
 
261
 
 
262
        # Check if the stripping of colnames works. Everything after the first
 
263
        # whitespace should be stripped.
 
264
        self.failUnlessEqual(self.cur.description[0][0], "x")
 
265
 
 
266
    def CheckCaseInConverterName(self):
 
267
        self.cur.execute("""select 'other' as "x [b1b1]\"""")
 
268
        val = self.cur.fetchone()[0]
 
269
        self.failUnlessEqual(val, "MARKER")
 
270
 
 
271
    def CheckCursorDescriptionNoRow(self):
 
272
        """
 
273
        cursor.description should at least provide the column name(s), even if
 
274
        no row returned.
 
275
        """
 
276
        self.cur.execute("select * from test where 0 = 1")
 
277
        self.assert_(self.cur.description[0][0] == "x")
 
278
 
 
279
class ObjectAdaptationTests(unittest.TestCase):
 
280
    def cast(obj):
 
281
        return float(obj)
 
282
    cast = staticmethod(cast)
 
283
 
 
284
    def setUp(self):
 
285
        self.con = sqlite.connect(":memory:")
 
286
        try:
 
287
            del sqlite.adapters[int]
 
288
        except:
 
289
            pass
 
290
        sqlite.register_adapter(int, ObjectAdaptationTests.cast)
 
291
        self.cur = self.con.cursor()
 
292
 
 
293
    def tearDown(self):
 
294
        del sqlite.adapters[(int, sqlite.PrepareProtocol)]
 
295
        self.cur.close()
 
296
        self.con.close()
 
297
 
 
298
    def CheckCasterIsUsed(self):
 
299
        self.cur.execute("select ?", (4,))
 
300
        val = self.cur.fetchone()[0]
 
301
        self.failUnlessEqual(type(val), float)
 
302
 
 
303
class BinaryConverterTests(unittest.TestCase):
 
304
    def convert(s):
 
305
        return zlib.decompress(s)
 
306
    convert = staticmethod(convert)
 
307
 
 
308
    def setUp(self):
 
309
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
 
310
        sqlite.register_converter("bin", BinaryConverterTests.convert)
 
311
 
 
312
    def tearDown(self):
 
313
        self.con.close()
 
314
 
 
315
    def CheckBinaryInputForConverter(self):
 
316
        testdata = "abcdefg" * 10
 
317
        result = self.con.execute('select ? as "x [bin]"', (buffer(zlib.compress(testdata)),)).fetchone()[0]
 
318
        self.failUnlessEqual(testdata, result)
 
319
 
 
320
class DateTimeTests(unittest.TestCase):
 
321
    def setUp(self):
 
322
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
 
323
        self.cur = self.con.cursor()
 
324
        self.cur.execute("create table test(d date, ts timestamp)")
 
325
 
 
326
    def tearDown(self):
 
327
        self.cur.close()
 
328
        self.con.close()
 
329
 
 
330
    def CheckSqliteDate(self):
 
331
        d = sqlite.Date(2004, 2, 14)
 
332
        self.cur.execute("insert into test(d) values (?)", (d,))
 
333
        self.cur.execute("select d from test")
 
334
        d2 = self.cur.fetchone()[0]
 
335
        self.failUnlessEqual(d, d2)
 
336
 
 
337
    def CheckSqliteTimestamp(self):
 
338
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
 
339
        self.cur.execute("insert into test(ts) values (?)", (ts,))
 
340
        self.cur.execute("select ts from test")
 
341
        ts2 = self.cur.fetchone()[0]
 
342
        self.failUnlessEqual(ts, ts2)
 
343
 
 
344
    def CheckSqlTimestamp(self):
 
345
        # The date functions are only available in SQLite version 3.1 or later
 
346
        if sqlite.sqlite_version_info < (3, 1):
 
347
            return
 
348
 
 
349
        # SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time.
 
350
        now = datetime.datetime.now()
 
351
        self.cur.execute("insert into test(ts) values (current_timestamp)")
 
352
        self.cur.execute("select ts from test")
 
353
        ts = self.cur.fetchone()[0]
 
354
        self.failUnlessEqual(type(ts), datetime.datetime)
 
355
        self.failUnlessEqual(ts.year, now.year)
 
356
 
 
357
    def CheckDateTimeSubSeconds(self):
 
358
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
 
359
        self.cur.execute("insert into test(ts) values (?)", (ts,))
 
360
        self.cur.execute("select ts from test")
 
361
        ts2 = self.cur.fetchone()[0]
 
362
        self.failUnlessEqual(ts, ts2)
 
363
 
 
364
    def CheckDateTimeSubSecondsFloatingPoint(self):
 
365
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
 
366
        self.cur.execute("insert into test(ts) values (?)", (ts,))
 
367
        self.cur.execute("select ts from test")
 
368
        ts2 = self.cur.fetchone()[0]
 
369
        self.failUnlessEqual(ts, ts2)
 
370
 
 
371
def suite():
 
372
    sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check")
 
373
    decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check")
 
374
    colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check")
 
375
    adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check")
 
376
    bin_suite = unittest.makeSuite(BinaryConverterTests, "Check")
 
377
    date_suite = unittest.makeSuite(DateTimeTests, "Check")
 
378
    return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite))
 
379
 
 
380
def test():
 
381
    runner = unittest.TextTestRunner()
 
382
    runner.run(suite())
 
383
 
 
384
if __name__ == "__main__":
 
385
    test()