232
by Fabio Tranchitella
Added psycopg2da, the zope3 database adapter for psycopg2. |
1 |
# psycopg2da
|
2 |
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
|
|
3 |
#
|
|
4 |
# This program is free software; you can redistribute it and/or modify
|
|
5 |
# it under the terms of the GNU General Public License as published by
|
|
6 |
# the Free Software Foundation; either version 2 of the License, or
|
|
7 |
# (at your option) any later version.
|
|
8 |
#
|
|
9 |
# This program is distributed in the hope that it will be useful,
|
|
10 |
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
11 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
12 |
# GNU General Public License for more details.
|
|
13 |
#
|
|
14 |
# You should have received a copy of the GNU General Public License
|
|
15 |
# along with this program; if not, write to the Free Software
|
|
16 |
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
17 |
#
|
|
18 |
"""Unit tests for Psycopg2DA."""
|
|
19 |
||
20 |
from unittest import TestCase, TestSuite, main, makeSuite |
|
21 |
from datetime import tzinfo, timedelta |
|
22 |
||
23 |
import psycopg2 |
|
24 |
import psycopg2.extensions |
|
25 |
||
26 |
||
27 |
class Stub(object):
    """Plain attribute bag: each keyword argument becomes an instance attribute."""

    def __init__(self, **kw):
        # Copy the keyword arguments straight into the instance namespace.
        vars(self).update(kw)
|
31 |
||
32 |
class TZStub(tzinfo):
    """Fixed-offset ``tzinfo`` used to build expected values in the tests.

    ``h`` and ``m`` are combined into ``self.offset``, a number of minutes
    that ``utcoffset()`` reports as a ``timedelta``.
    """

    def __init__(self, h, m):
        # Stored as total minutes; a negative ``m`` with ``h == 0`` gives a
        # negative sub-hour offset (e.g. TZStub(0, -30) -> -30 minutes).
        self.offset = h * 60 + m

    def utcoffset(self, dt):
        """Return the fixed offset as a ``timedelta``; *dt* is ignored."""
        return timedelta(minutes=self.offset)

    def dst(self, dt):
        """Return a zero ``timedelta``: the stub never applies DST."""
        # tzinfo.dst() must return None or a timedelta; returning the int 0
        # makes datetime.dst() and datetime.timetuple() raise TypeError.
        return timedelta(0)

    def tzname(self, dt):
        """Return an empty time zone name."""
        return ''

    def __repr__(self):
        return 'tzinfo(%d)' % self.offset

    def __reduce__(self):
        """Support copy/pickle by rebuilding the stub from its offset."""
        # __init__ requires two arguments, so an empty argument tuple would
        # fail on reconstruction.  (0, offset) reproduces the same offset
        # because offset == 0 * 60 + offset.
        return type(self), (0, self.offset), self.__dict__
|
51 |
||
52 |
class ConnectionStub(object):
    """Connection object handed out by ``Psycopg2Stub.connect``."""

    def set_isolation_level(self, level):
        """Accept *level* and do nothing with it."""
        return None
|
|
56 |
||
57 |
class Psycopg2Stub(object):
    """Substitute for the ``psycopg2`` module that records calls made on it.

    Every instance shares a single attribute dictionary (the 'Borg'
    pattern), and creating a new instance resets the shared ``types``
    registry to an empty dict.
    """

    __shared_state = {}  # 'Borg' design pattern

    # Type objects and constants are taken from the real psycopg2 package.
    DATE = psycopg2.extensions.DATE
    TIME = psycopg2.extensions.TIME
    DATETIME = psycopg2.DATETIME
    INTERVAL = psycopg2.extensions.INTERVAL
    ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE

    def __init__(self):
        # Bind this instance to the shared namespace, then start with an
        # empty registry of registered types.
        shared = self.__shared_state
        self.__dict__ = shared
        shared['types'] = {}

    def connect(self, connection_string):
        """Remember *connection_string* and return a ``ConnectionStub``."""
        self.last_connection_string = connection_string
        connection = ConnectionStub()
        return connection

    def new_type(self, values, name, converter):
        """Return a ``Stub`` carrying *name* and *values*; *converter* is unused."""
        return Stub(name=name, values=values)

    def register_type(self, type):
        """Record *type* in ``self.types`` under each id in ``type.values``."""
        self.types.update((typeid, type) for typeid in type.values)

    def getExtensions(self):
        """Return this object itself, so it doubles as ``psycopg2.extensions``."""
        return self
    extensions = property(getExtensions)
|
85 |
||
86 |
||
87 |
class TestPsycopg2TypeConversion(TestCase):
    """Check the string converters imported from ``psycopg2da.adapter``.

    Each test wraps one converter in a helper ``c(s)`` that calls it with
    ``None`` as the second argument, then compares the result with the
    expected Python value.
    """

    # Integer literals are written without leading zeros: ``01`` is octal
    # syntax in Python 2 and a SyntaxError in Python 3.  ``assertEqual`` is
    # used throughout because ``assertEquals`` is a deprecated alias.

    def test_conv_date(self):
        from psycopg2da.adapter import _conv_date
        from datetime import date
        def c(s):
            return _conv_date(s, None)
        self.assertEqual(c(''), None)
        self.assertEqual(c('2001-03-02'), date(2001, 3, 2))

    def test_conv_time(self):
        from psycopg2da.adapter import _conv_time
        from datetime import time
        def c(s):
            return _conv_time(s, None)
        self.assertEqual(c(''), None)
        self.assertEqual(c('23:17:57'), time(23, 17, 57))
        self.assertEqual(c('23:17:57.037'), time(23, 17, 57, 37000))

    def test_conv_timetz(self):
        from psycopg2da.adapter import _conv_timetz
        from datetime import time
        def c(s):
            return _conv_timetz(s, None)
        self.assertEqual(c(''), None)
        self.assertEqual(c('12:44:01+01:00'),
                         time(12, 44, 1, 0, TZStub(1, 0)))
        self.assertEqual(c('12:44:01.037-00:30'),
                         time(12, 44, 1, 37000, TZStub(0, -30)))

    def test_conv_timestamp(self):
        from psycopg2da.adapter import _conv_timestamp
        from datetime import datetime
        def c(s):
            return _conv_timestamp(s, None)
        self.assertEqual(c(''), None)
        self.assertEqual(c('2001-03-02 12:44:01'),
                         datetime(2001, 3, 2, 12, 44, 1))
        self.assertEqual(c('2001-03-02 12:44:01.037'),
                         datetime(2001, 3, 2, 12, 44, 1, 37000))
        self.assertEqual(c('2001-03-02 12:44:01.000001'),
                         datetime(2001, 3, 2, 12, 44, 1, 1))

    def test_conv_timestamptz(self):
        from psycopg2da.adapter import _conv_timestamptz
        from datetime import datetime
        def c(s):
            return _conv_timestamptz(s, None)
        self.assertEqual(c(''), None)

        self.assertEqual(c('2001-03-02 12:44:01+01:00'),
                         datetime(2001, 3, 2, 12, 44, 1, 0, TZStub(1, 0)))
        self.assertEqual(c('2001-03-02 12:44:01.037-00:30'),
                         datetime(2001, 3, 2, 12, 44, 1, 37000, TZStub(0, -30)))
        self.assertEqual(c('2001-03-02 12:44:01.000001+12:00'),
                         datetime(2001, 3, 2, 12, 44, 1, 1, TZStub(12, 0)))
        self.assertEqual(c('2001-06-25 12:14:00-07'),
                         datetime(2001, 6, 25, 12, 14, 0, 0, TZStub(-7, 0)))

    def test_conv_interval(self):
        from psycopg2da.adapter import _conv_interval
        from datetime import timedelta
        def c(s):
            return _conv_interval(s, None)

        self.assertEqual(c(''), None)
        self.assertEqual(c('01:00'), timedelta(hours=1))
        self.assertEqual(c('00:15'), timedelta(minutes=15))
        self.assertEqual(c('00:00:47'), timedelta(seconds=47))
        self.assertEqual(c('00:00:00.037'), timedelta(microseconds=37000))
        self.assertEqual(c('00:00:00.111111'), timedelta(microseconds=111111))
        self.assertEqual(c('1 day'), timedelta(days=1))
        self.assertEqual(c('2 days'), timedelta(days=2))
        self.assertEqual(c('374 days'), timedelta(days=374))
        self.assertEqual(c('2 days 03:20:15.123456'),
                         timedelta(days=2, hours=3, minutes=20,
                                   seconds=15, microseconds=123456))
        # XXX There's a problem with years and months.  Currently timedelta
        # cannot represent them accurately, so these inputs are expected to
        # come back as the unchanged string.
        self.assertEqual(c('1 month'), '1 month')
        self.assertEqual(c('2 months'), '2 months')
        self.assertEqual(c('1 mon'), '1 mon')
        self.assertEqual(c('2 mons'), '2 mons')
        self.assertEqual(c('1 year'), '1 year')
        self.assertEqual(c('3 years'), '3 years')
        # Later we might be able to use
        ## self.assertEqual(c('1 month'), timedelta(months=1))
        ## self.assertEqual(c('2 months'), timedelta(months=2))
        ## self.assertEqual(c('1 year'), timedelta(years=1))
        ## self.assertEqual(c('3 years'), timedelta(years=3))

        self.assertRaises(ValueError, c, '2 day')
        self.assertRaises(ValueError, c, '2days')
        self.assertRaises(ValueError, c, '123')

    def test_conv_string(self):
        from psycopg2da.adapter import _get_string_conv
        _conv_string = _get_string_conv("utf-8")
        def c(s):
            return _conv_string(s, None)
        self.assertEqual(c(None), None)
        self.assertEqual(c(''), u'')
        self.assertEqual(c('c'), u'c')
        self.assertEqual(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2')
        self.assertEqual(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2')
|
190 |
||
191 |
class TestPsycopg2Adapter(TestCase):
    """Exercise ``Psycopg2Adapter`` with ``Psycopg2Stub`` swapped in for psycopg2."""

    def setUp(self):
        # Replace the adapter module's ``psycopg2`` attribute with a stub and
        # keep the original so tearDown can restore it.
        import psycopg2da.adapter as adapter
        self.real_psycopg2 = adapter.psycopg2
        adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub()

    def tearDown(self):
        import psycopg2da.adapter as adapter
        adapter.psycopg2 = self.real_psycopg2

    def test_connection_factory(self):
        from psycopg2da.adapter import Psycopg2Adapter
        a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored')
        # Only the side effect matters: the stub records the connection
        # string it was given, so the returned connection is not kept.
        a._connection_factory()
        # The test does not depend on the order of the key=value pairs, so
        # compare them sorted.
        args = sorted(self.psycopg2_stub.last_connection_string.split())
        self.assertEqual(args, ['dbname=dbname', 'host=hostname',
                                'password=password', 'port=port',
                                'user=username'])

    def test_registerTypes(self):
        import psycopg2da.adapter as adapter
        from psycopg2da.adapter import Psycopg2Adapter
        a = Psycopg2Adapter('dbi://')
        a.registerTypes()
        for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP',
                         'TIMESTAMPTZ', 'INTERVAL'):
            typeid = getattr(adapter, '%s_OID' % typename)
            result = self.psycopg2_stub.types.get(typeid, None)
            if not result:
                # comparing None with psycopg2.type object segfaults
                self.fail("did not register %s (%d): got None, not Z%s"
                          % (typename, typeid, typename))
            else:
                result_name = getattr(result, 'name', 'None')
                self.assertEqual(result, getattr(adapter, typename),
                                 "did not register %s (%d): got %s, not Z%s"
                                 % (typename, typeid, result_name, typename))
|
230 |
||
231 |
||
232 |
class TestISODateTime(TestCase):
    """Table-driven tests for the ``parse_*`` functions of ``psycopg2da.adapter``."""

    # Test if date/time parsing functions accept a sensible subset of ISO-8601
    # compliant date/time strings.
    #
    # Resources:
    #   http://www.cl.cam.ac.uk/~mgk25/iso-time.html
    #   http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html
    #   http://www.w3.org/TR/NOTE-datetime
    #   http://www.ietf.org/rfc/rfc3339.txt

    # Expected tuples use plain decimal literals: a leading zero (``03``) is
    # octal syntax in Python 2 and a SyntaxError in Python 3.
    basic_dates = (('20020304', (2002, 3, 4)),
                   ('20000229', (2000, 2, 29)))

    extended_dates = (('2002-03-04', (2002, 3, 4)),
                      ('2000-02-29', (2000, 2, 29)))

    basic_times = (('12', (12, 0, 0)),
                   ('1234', (12, 34, 0)),
                   ('123417', (12, 34, 17)),
                   ('123417.5', (12, 34, 17.5)),
                   ('123417,5', (12, 34, 17.5)))

    extended_times = (('12', (12, 0, 0)),
                      ('12:34', (12, 34, 0)),
                      ('12:34:17', (12, 34, 17)),
                      ('12:34:17.5', (12, 34, 17.5)),
                      ('12:34:17,5', (12, 34, 17.5)))

    basic_tzs = (('Z', 0),
                 ('+02', 2*60),
                 ('+1130', 11*60+30),
                 ('-05', -5*60),
                 ('-0030', -30))

    extended_tzs = (('Z', 0),
                    ('+02', 2*60),
                    ('+11:30', 11*60+30),
                    ('-05', -5*60),
                    ('-00:30', -30))

    time_separators = (' ', 'T')

    bad_dates = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29',
                 '1990/13/14')

    bad_times = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,')

    bad_timetzs = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q')

    bad_datetimes = ('', 'foo', '2002-03-0412:33')

    bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200')

    # Exception every parser is expected to raise on malformed input.
    exception_type = ValueError

    # We need the following functions:
    #   parse_date -> (year, month, day)
    #   parse_time -> (hour, minute, second)
    #   parse_timetz -> (hour, minute, second, tzoffset)
    #   parse_datetime -> (year, month, day, hour, minute, second)
    #   parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset)
    # second can be a float, all other values are ints
    # tzoffset is offset in minutes east of UTC

    def setUp(self):
        # Bind the parsers as instance attributes so the tests call them
        # through ``self``.
        from psycopg2da.adapter import parse_date, parse_time, \
                parse_timetz, parse_datetime, parse_datetimetz
        self.parse_date = parse_date
        self.parse_time = parse_time
        self.parse_timetz = parse_timetz
        self.parse_datetime = parse_datetime
        self.parse_datetimetz = parse_datetimetz

    def test_basic_date(self):
        for s, d in self.basic_dates:
            self.assertEqual(self.parse_date(s), d)

    def test_extended_date(self):
        for s, d in self.extended_dates:
            self.assertEqual(self.parse_date(s), d)

    def test_bad_date(self):
        for s in self.bad_dates:
            self.assertRaises(self.exception_type, self.parse_date, s)

    def test_basic_time(self):
        for s, t in self.basic_times:
            self.assertEqual(self.parse_time(s), t)

    def test_extended_time(self):
        for s, t in self.extended_times:
            self.assertEqual(self.parse_time(s), t)

    def test_bad_time(self):
        for s in self.bad_times:
            self.assertRaises(self.exception_type, self.parse_time, s)

    def test_basic_timetz(self):
        # Every time string is combined with every zone suffix.
        for s, t in self.basic_times:
            for tz, off in self.basic_tzs:
                self.assertEqual(self.parse_timetz(s+tz), t + (off,))

    def test_extended_timetz(self):
        for s, t in self.extended_times:
            for tz, off in self.extended_tzs:
                self.assertEqual(self.parse_timetz(s+tz), t + (off,))

    def test_bad_timetzs(self):
        for s in self.bad_timetzs:
            self.assertRaises(self.exception_type, self.parse_timetz, s)

    def test_basic_datetime(self):
        # Date and time parts are joined with each allowed separator.
        for ds, d in self.basic_dates:
            for ts, t in self.basic_times:
                for sep in self.time_separators:
                    self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)

    def test_extended_datetime(self):
        for ds, d in self.extended_dates:
            for ts, t in self.extended_times:
                for sep in self.time_separators:
                    self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)

    def test_bad_datetimes(self):
        for s in self.bad_datetimes:
            self.assertRaises(self.exception_type, self.parse_datetime, s)

    def test_basic_datetimetz(self):
        for ds, d in self.basic_dates:
            for ts, t in self.basic_times:
                for tz, off in self.basic_tzs:
                    for sep in self.time_separators:
                        self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
                                         d + t + (off,))

    def test_extended_datetimetz(self):
        for ds, d in self.extended_dates:
            for ts, t in self.extended_times:
                for tz, off in self.extended_tzs:
                    for sep in self.time_separators:
                        self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
                                         d + t + (off,))

    def test_bad_datetimetzs(self):
        for s in self.bad_datetimetzs:
            self.assertRaises(self.exception_type, self.parse_datetimetz, s)
|
379 |
||
380 |
||
381 |
def test_suite():
    """Return a ``TestSuite`` holding the tests of this module's three test classes."""
    # unittest.makeSuite() is deprecated and removed in Python 3.13.  The
    # default loader's loadTestsFromTestCase() collects the same ``test*``
    # methods and is available in old and new Python versions alike.
    from unittest import defaultTestLoader
    load = defaultTestLoader.loadTestsFromTestCase
    return TestSuite((
        load(TestPsycopg2TypeConversion),
        load(TestPsycopg2Adapter),
        load(TestISODateTime),
    ))
|
|
387 |
||
388 |
# When executed as a script, run the suite built by test_suite().
if __name__ == '__main__':
    main(defaultTest='test_suite')