1
#-*- coding: ISO-8859-1 -*-
2
# pysqlite2/test/types.py: tests for type conversion and detection
4
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
6
# This file is part of pysqlite.
8
# This software is provided 'as-is', without any express or implied
9
# warranty. In no event will the authors be held liable for any damages
10
# arising from the use of this software.
12
# Permission is granted to anyone to use this software for any purpose,
13
# including commercial applications, and to alter it and redistribute it
14
# freely, subject to the following restrictions:
16
# 1. The origin of this software must not be misrepresented; you must not
17
# claim that you wrote the original software. If you use this software
18
# in a product, an acknowledgment in the product documentation would be
19
# appreciated but is not required.
20
# 2. Altered source versions must be plainly marked as such, and must not be
21
# misrepresented as being the original software.
22
# 3. This notice may not be removed or altered from any source distribution.
26
import sqlite3 as sqlite
28
class SqliteTypeTests(unittest.TestCase):
30
self.con = sqlite.connect(":memory:")
31
self.cur = self.con.cursor()
32
self.cur.execute("create table test(i integer, s varchar, f number, b blob)")
38
def CheckString(self):
    """Round-trip a non-ASCII unicode string through column ``s``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table.  The value is inserted through a bound parameter, read
    back, and compared with what was stored.
    """
    text = u"ļæ½sterreich"
    self.cur.execute("insert into test(s) values (?)", (text,))
    self.cur.execute("select s from test")
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], text)
44
def CheckSmallInt(self):
    """Round-trip the small integer 42 through column ``i``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table.
    """
    expected = 42
    self.cur.execute("insert into test(i) values (?)", (expected,))
    self.cur.execute("select i from test")
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], expected)
50
def CheckLargeInt(self):
52
self.cur.execute("insert into test(i) values (?)", (num,))
53
self.cur.execute("select i from test")
54
row = self.cur.fetchone()
55
self.failUnlessEqual(row[0], num)
59
self.cur.execute("insert into test(f) values (?)", (val,))
60
self.cur.execute("select f from test")
61
row = self.cur.fetchone()
62
self.failUnlessEqual(row[0], val)
65
val = buffer("Guglhupf")
66
self.cur.execute("insert into test(b) values (?)", (val,))
67
self.cur.execute("select b from test")
68
row = self.cur.fetchone()
69
self.failUnlessEqual(row[0], val)
71
def CheckUnicodeExecute(self):
    """Execute a unicode SQL statement containing a non-ASCII literal.

    The statement selects a string literal; the test verifies that the
    fetched value equals the same text as a unicode string.
    """
    self.cur.execute(u"select 'ļæ½sterreich'")
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], u"ļæ½sterreich")
76
class DeclTypesTests(unittest.TestCase):
78
def __init__(self, _val):
81
def __cmp__(self, other):
82
if not isinstance(other, DeclTypesTests.Foo):
84
if self.val == other.val:
89
def __conform__(self, protocol):
90
if protocol is sqlite.PrepareProtocol:
96
return "<%s>" % self.val
99
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
100
self.cur = self.con.cursor()
101
self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")
103
# override float, make them always return the same number
104
sqlite.converters["FLOAT"] = lambda x: 47.2
106
# and implement two custom ones
107
sqlite.converters["BOOL"] = lambda x: bool(int(x))
108
sqlite.converters["FOO"] = DeclTypesTests.Foo
109
sqlite.converters["WRONG"] = lambda x: "WRONG"
110
sqlite.converters["NUMBER"] = float
113
del sqlite.converters["FLOAT"]
114
del sqlite.converters["BOOL"]
115
del sqlite.converters["FOO"]
116
del sqlite.converters["NUMBER"]
120
def CheckString(self):
    """Verify a plain string comes back unconverted from column ``s``.

    The select aliases the column as ``s [WRONG]``.  The test expects the
    stored value ``"foo"`` back, i.e. the ``[WRONG]`` tag in the column
    name must not cause any converter to be applied.
    """
    self.cur.execute("insert into test(s) values (?)", ("foo",))
    self.cur.execute('select s as "s [WRONG]" from test')
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], "foo")
127
def CheckSmallInt(self):
    """Round-trip the small integer 42 through column ``i``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the value must come back equal to what was inserted.
    """
    expected = 42
    self.cur.execute("insert into test(i) values (?)", (expected,))
    self.cur.execute("select i from test")
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], expected)
134
def CheckLargeInt(self):
137
self.cur.execute("insert into test(i) values (?)", (num,))
138
self.cur.execute("select i from test")
139
row = self.cur.fetchone()
140
self.failUnlessEqual(row[0], num)
142
def CheckFloat(self):
145
self.cur.execute("insert into test(f) values (?)", (val,))
146
self.cur.execute("select f from test")
147
row = self.cur.fetchone()
148
self.failUnlessEqual(row[0], 47.2)
152
self.cur.execute("insert into test(b) values (?)", (False,))
153
self.cur.execute("select b from test")
154
row = self.cur.fetchone()
155
self.failUnlessEqual(row[0], False)
157
self.cur.execute("delete from test")
158
self.cur.execute("insert into test(b) values (?)", (True,))
159
self.cur.execute("select b from test")
160
row = self.cur.fetchone()
161
self.failUnlessEqual(row[0], True)
163
def CheckUnicode(self):
    """Round-trip a non-ASCII unicode string through column ``u``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the value must come back equal to what was inserted.
    """
    text = u"\xd6sterreich"
    self.cur.execute("insert into test(u) values (?)", (text,))
    self.cur.execute("select u from test")
    fetched = self.cur.fetchone()
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(fetched[0], text)
172
val = DeclTypesTests.Foo("bla")
173
self.cur.execute("insert into test(foo) values (?)", (val,))
174
self.cur.execute("select foo from test")
175
row = self.cur.fetchone()
176
self.failUnlessEqual(row[0], val)
178
def CheckUnsupportedSeq(self):
182
self.cur.execute("insert into test(f) values (?)", (val,))
183
self.fail("should have raised an InterfaceError")
184
except sqlite.InterfaceError:
187
self.fail("should have raised an InterfaceError")
189
def CheckUnsupportedDict(self):
193
self.cur.execute("insert into test(f) values (:val)", {"val": val})
194
self.fail("should have raised an InterfaceError")
195
except sqlite.InterfaceError:
198
self.fail("should have raised an InterfaceError")
202
val = buffer("Guglhupf")
203
self.cur.execute("insert into test(bin) values (?)", (val,))
204
self.cur.execute("select bin from test")
205
row = self.cur.fetchone()
206
self.failUnlessEqual(row[0], val)
208
def CheckNumber1(self):
    """Verify the converter for declared type ``number`` is applied to ``n1``.

    The literal 5 is inserted into ``n1``.  If the converter is not used the
    value comes back as an int instead of a float, so the fetched value's
    type is checked.
    """
    self.cur.execute("insert into test(n1) values (5)")
    fetched = self.cur.execute("select n1 from test").fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(type(fetched), float)
214
def CheckNumber2(self):
    """Check whether converter names are cut off at '(' characters.

    Column ``n2`` is used with the literal 5.  If the converter is not used
    the value comes back as an int instead of a float, so the fetched
    value's type is checked.
    """
    self.cur.execute("insert into test(n2) values (5)")
    fetched = self.cur.execute("select n2 from test").fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(type(fetched), float)
221
class ColNamesTests(unittest.TestCase):
223
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
224
self.cur = self.con.cursor()
225
self.cur.execute("create table test(x foo)")
227
sqlite.converters["FOO"] = lambda x: "[%s]" % x
228
sqlite.converters["BAR"] = lambda x: "<%s>" % x
229
sqlite.converters["EXC"] = lambda x: 5/0
230
sqlite.converters["B1B1"] = lambda x: "MARKER"
233
del sqlite.converters["FOO"]
234
del sqlite.converters["BAR"]
235
del sqlite.converters["EXC"]
236
del sqlite.converters["B1B1"]
240
def CheckDeclTypeNotUsed(self):
242
Assures that the declared type is not used when PARSE_DECLTYPES
245
self.cur.execute("insert into test(x) values (?)", ("xxx",))
246
self.cur.execute("select x from test")
247
val = self.cur.fetchone()[0]
248
self.failUnlessEqual(val, "xxx")
251
self.cur.execute("insert into test(x) values (?)", (None,))
252
self.cur.execute("select x from test")
253
val = self.cur.fetchone()[0]
254
self.failUnlessEqual(val, None)
256
def CheckColName(self):
    """Verify a ``[bar]`` tag in the column alias selects the BAR converter.

    Inserts ``"xxx"``, selects it aliased as ``x [bar]`` and expects the
    value wrapped in angle brackets.  It also checks that the reported
    column name has the tag stripped.
    """
    self.cur.execute("insert into test(x) values (?)", ("xxx",))
    self.cur.execute('select x as "x [bar]" from test')
    converted = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(converted, "<xxx>")

    # Check that the stripping of column names works: everything after the
    # first whitespace should be removed from the description entry.
    column_name = self.cur.description[0][0]
    self.assertEqual(column_name, "x")
266
def CheckCaseInConverterName(self):
    """Verify a lower-case converter tag in a column alias is honoured.

    The alias is ``x [b1b1]``; the test expects the value ``"MARKER"``
    back instead of the selected literal ``'other'``.
    """
    # Same SQL text as before, written with single quotes so the trailing
    # double quote no longer needs an escape next to the closing delimiter.
    self.cur.execute('select \'other\' as "x [b1b1]"')
    converted = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(converted, "MARKER")
271
def CheckCursorDescriptionNoRow(self):
273
cursor.description should at least provide the column name(s), even if
276
self.cur.execute("select * from test where 0 = 1")
277
self.assert_(self.cur.description[0][0] == "x")
279
class ObjectAdaptationTests(unittest.TestCase):
282
cast = staticmethod(cast)
285
self.con = sqlite.connect(":memory:")
287
del sqlite.adapters[int]
290
sqlite.register_adapter(int, ObjectAdaptationTests.cast)
291
self.cur = self.con.cursor()
294
del sqlite.adapters[(int, sqlite.PrepareProtocol)]
298
def CheckCasterIsUsed(self):
    """Verify the adapter registered for ``int`` is applied to parameters.

    Binds the integer 4 to a bare ``select ?`` and expects a float back,
    which only happens when the int adapter has converted the parameter.
    """
    self.cur.execute("select ?", (4,))
    fetched = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(type(fetched), float)
303
class BinaryConverterTests(unittest.TestCase):
305
return zlib.decompress(s)
306
convert = staticmethod(convert)
309
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
310
sqlite.register_converter("bin", BinaryConverterTests.convert)
315
def CheckBinaryInputForConverter(self):
    """Pass a zlib-compressed blob through the ``bin`` column converter.

    The compressed payload is bound as a blob and selected under the alias
    ``x [bin]``; the test expects the original uncompressed data back.
    Expects ``self.con`` to be a connection for which a ``bin`` converter
    has been registered.
    """
    # A bytes literal keeps zlib.compress working on Python 3, where it
    # rejects text strings.
    testdata = b"abcdefg" * 10
    # sqlite.Binary is the DB-API blob constructor; unlike the ``buffer``
    # builtin it is also available on Python 3.
    blob = sqlite.Binary(zlib.compress(testdata))
    result = self.con.execute('select ? as "x [bin]"', (blob,)).fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(testdata, result)
320
class DateTimeTests(unittest.TestCase):
322
self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
323
self.cur = self.con.cursor()
324
self.cur.execute("create table test(d date, ts timestamp)")
330
def CheckSqliteDate(self):
    """Round-trip a ``sqlite.Date`` value through column ``d``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the fetched value must equal the inserted date.
    """
    stored = sqlite.Date(2004, 2, 14)
    self.cur.execute("insert into test(d) values (?)", (stored,))
    self.cur.execute("select d from test")
    fetched = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(stored, fetched)
337
def CheckSqliteTimestamp(self):
    """Round-trip a ``sqlite.Timestamp`` value through column ``ts``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the fetched value must equal the inserted timestamp.
    """
    stored = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
    self.cur.execute("insert into test(ts) values (?)", (stored,))
    self.cur.execute("select ts from test")
    fetched = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(stored, fetched)
344
def CheckSqlTimestamp(self):
345
# The date functions are only available in SQLite version 3.1 or later
346
if sqlite.sqlite_version_info < (3, 1):
349
# SQLite's current_timestamp uses UTC time, while datetime.datetime.now() uses local time.
350
now = datetime.datetime.now()
351
self.cur.execute("insert into test(ts) values (current_timestamp)")
352
self.cur.execute("select ts from test")
353
ts = self.cur.fetchone()[0]
354
self.failUnlessEqual(type(ts), datetime.datetime)
355
self.failUnlessEqual(ts.year, now.year)
357
def CheckDateTimeSubSeconds(self):
    """Round-trip a timestamp with a 500000 microsecond part through ``ts``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the fetched value must equal the inserted timestamp,
    including its sub-second component.
    """
    stored = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
    self.cur.execute("insert into test(ts) values (?)", (stored,))
    self.cur.execute("select ts from test")
    fetched = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(stored, fetched)
364
def CheckDateTimeSubSecondsFloatingPoint(self):
    """Round-trip a timestamp with a 510241 microsecond part through ``ts``.

    Expects ``self.cur`` to be a cursor on a database that contains the
    ``test`` table; the fetched value must equal the inserted timestamp
    exactly, down to the microsecond.
    """
    stored = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
    self.cur.execute("insert into test(ts) values (?)", (stored,))
    self.cur.execute("select ts from test")
    fetched = self.cur.fetchone()[0]
    # assertEqual is the canonical spelling; the failUnlessEqual alias is
    # deprecated and no longer exists in Python 3.12.
    self.assertEqual(stored, fetched)
372
sqlite_type_suite = unittest.makeSuite(SqliteTypeTests, "Check")
373
decltypes_type_suite = unittest.makeSuite(DeclTypesTests, "Check")
374
colnames_type_suite = unittest.makeSuite(ColNamesTests, "Check")
375
adaptation_suite = unittest.makeSuite(ObjectAdaptationTests, "Check")
376
bin_suite = unittest.makeSuite(BinaryConverterTests, "Check")
377
date_suite = unittest.makeSuite(DateTimeTests, "Check")
378
return unittest.TestSuite((sqlite_type_suite, decltypes_type_suite, colnames_type_suite, adaptation_suite, bin_suite, date_suite))
381
runner = unittest.TextTestRunner()
384
if __name__ == "__main__":