# ~psycopg/psycopg/2.0.x
#
# 232 by Fabio Tranchitella
# Added psycopg2da, the zope3 database adapter for psycopg2.
# psycopg2da
# Copyright (C) 2006 Fabio Tranchitella <fabio@tranchitella.it>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
"""Unit tests for Psycopg2DA."""
19
20
from unittest import TestCase, TestSuite, main, makeSuite
21
from datetime import tzinfo, timedelta
22
23
import psycopg2
24
import psycopg2.extensions
25
26
27
class Stub(object):
    """Bare attribute bag: every keyword argument becomes an attribute."""

    def __init__(self, **kw):
        # Copy the keywords straight into the instance dictionary.
        self.__dict__.update(kw)
31
32
class TZStub(tzinfo):
    """Fixed-offset ``tzinfo`` used to build expected values in the tests.

    The offset is kept in ``self.offset`` as a number of minutes, computed
    from the hours ``h`` and minutes ``m`` given to the constructor.
    """

    def __init__(self, h, m):
        # Fold hours and minutes into a single minute count.
        self.offset = h * 60 + m

    def utcoffset(self, dt):
        """Return the fixed offset as a ``timedelta``; *dt* is ignored."""
        return timedelta(minutes=self.offset)

    def dst(self, dt):
        """Return a zero ``timedelta``; *dt* is ignored.

        The ``tzinfo`` protocol only allows ``None`` or a ``timedelta`` as
        the result; a plain ``0`` makes ``datetime.dst()`` raise TypeError.
        """
        return timedelta(0)

    def tzname(self, dt):
        """Return an empty time zone name; *dt* is ignored."""
        return ''

    def __repr__(self):
        return 'tzinfo(%d)' % self.offset

    def __reduce__(self):
        # __init__ has two required arguments, so an empty argument tuple
        # would make unpickling/copying fail.  (0, offset) reproduces the
        # same minute count; the instance dictionary is restored on top.
        return type(self), (0, self.offset), self.__dict__
51
52
class ConnectionStub(object):
    """Connection stand-in returned by ``Psycopg2Stub.connect``."""

    def set_isolation_level(self, level):
        """Accept and ignore the requested isolation *level*."""
        pass
56
57
class Psycopg2Stub(object):
    """Stand-in for the ``psycopg2`` module used by the adapter tests.

    All instances share a single attribute dictionary, so state recorded
    through one instance (the last connection string, the registered types)
    is visible through every other instance.
    """

    __shared_state = {}     # 'Borg' design pattern

    # Type objects and constants re-exported from the real psycopg2.
    DATE = psycopg2.extensions.DATE
    TIME = psycopg2.extensions.TIME
    DATETIME = psycopg2.DATETIME
    INTERVAL = psycopg2.extensions.INTERVAL
    ISOLATION_LEVEL_SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE

    def __init__(self):
        self.__dict__ = self.__shared_state
        # Written into the shared dictionary: creating a new stub empties
        # the type registry for every existing instance as well.
        self.types = {}

    def connect(self, connection_string):
        """Record *connection_string* and return a ``ConnectionStub``."""
        self.last_connection_string = connection_string
        return ConnectionStub()

    def new_type(self, values, name, converter):
        """Return a ``Stub`` carrying *name* and *values*.

        *converter* is accepted but not stored.
        """
        return Stub(name=name, values=values)

    def register_type(self, type):
        """Map every id in ``type.values`` to *type* in ``self.types``."""
        for typeid in type.values:
            self.types[typeid] = type

    def getExtensions(self):
        """Return the stub itself, so it also serves as ``extensions``."""
        return self
    extensions = property(getExtensions)
85
86
87
class TestPsycopg2TypeConversion(TestCase):
88
89
    def test_conv_date(self):
90
        from psycopg2da.adapter import _conv_date
91
        from datetime import date
92
        def c(s):
93
            return _conv_date(s, None)
94
        self.assertEquals(c(''), None)
95
        self.assertEquals(c('2001-03-02'), date(2001, 3, 2))
96
97
    def test_conv_time(self):
98
        from psycopg2da.adapter import _conv_time
99
        from datetime import time
100
        def c(s):
101
            return _conv_time(s, None)
102
        self.assertEquals(c(''), None)
103
        self.assertEquals(c('23:17:57'), time(23, 17, 57))
104
        self.assertEquals(c('23:17:57.037'), time(23, 17, 57, 37000))
105
106
    def test_conv_timetz(self):
107
        from psycopg2da.adapter import _conv_timetz
108
        from datetime import time
109
        def c(s):
110
            return _conv_timetz(s, None)
111
        self.assertEquals(c(''), None)
112
        self.assertEquals(c('12:44:01+01:00'), time(12, 44, 01, 0, TZStub(1,0)))
113
        self.assertEquals(c('12:44:01.037-00:30'), time(12, 44, 01, 37000, TZStub(0,-30)))
114
115
    def test_conv_timestamp(self):
116
        from psycopg2da.adapter import _conv_timestamp
117
        from datetime import datetime
118
        def c(s):
119
            return _conv_timestamp(s, None)
120
        self.assertEquals(c(''), None)
121
        self.assertEquals(c('2001-03-02 12:44:01'),
122
                          datetime(2001, 3, 2, 12, 44, 01))
123
        self.assertEquals(c('2001-03-02 12:44:01.037'),
124
                          datetime(2001, 3, 2, 12, 44, 01, 37000))
125
        self.assertEquals(c('2001-03-02 12:44:01.000001'),
126
                          datetime(2001, 3, 2, 12, 44, 01, 1))
127
128
    def test_conv_timestamptz(self):
129
        from psycopg2da.adapter import _conv_timestamptz
130
        from datetime import datetime
131
        def c(s):
132
            return _conv_timestamptz(s, None)
133
        self.assertEquals(c(''), None)
134
135
        self.assertEquals(c('2001-03-02 12:44:01+01:00'),
136
                  datetime(2001, 3, 2, 12, 44, 01, 0, TZStub(1,0)))
137
        self.assertEquals(c('2001-03-02 12:44:01.037-00:30'),
138
                  datetime(2001, 3, 2, 12, 44, 01, 37000, TZStub(0,-30)))
139
        self.assertEquals(c('2001-03-02 12:44:01.000001+12:00'),
140
                  datetime(2001, 3, 2, 12, 44, 01, 1, TZStub(12,0)))
141
        self.assertEquals(c('2001-06-25 12:14:00-07'),
142
                  datetime(2001, 6, 25, 12, 14, 00, 0, TZStub(-7,0)))
143
144
    def test_conv_interval(self):
145
        from psycopg2da.adapter import _conv_interval
146
        from datetime import timedelta
147
        def c(s):
148
            return _conv_interval(s, None)
149
150
        self.assertEquals(c(''), None)
151
        self.assertEquals(c('01:00'), timedelta(hours=1))
152
        self.assertEquals(c('00:15'), timedelta(minutes=15))
153
        self.assertEquals(c('00:00:47'), timedelta(seconds=47))
154
        self.assertEquals(c('00:00:00.037'), timedelta(microseconds=37000))
155
        self.assertEquals(c('00:00:00.111111'), timedelta(microseconds=111111))
156
        self.assertEquals(c('1 day'), timedelta(days=1))
157
        self.assertEquals(c('2 days'), timedelta(days=2))
158
        self.assertEquals(c('374 days'), timedelta(days=374))
159
        self.assertEquals(c('2 days 03:20:15.123456'),
160
                          timedelta(days=2, hours=3, minutes=20,
161
                                    seconds=15, microseconds=123456))
162
        # XXX There's a problem with years and months.  Currently timedelta
163
        # cannot represent them accurately
164
        self.assertEquals(c('1 month'), '1 month')
165
        self.assertEquals(c('2 months'), '2 months')
166
        self.assertEquals(c('1 mon'), '1 mon')
167
        self.assertEquals(c('2 mons'), '2 mons')
168
        self.assertEquals(c('1 year'), '1 year')
169
        self.assertEquals(c('3 years'), '3 years')
170
        # Later we might be able to use
171
        ## self.assertEquals(c('1 month'), timedelta(months=1))
172
        ## self.assertEquals(c('2 months'), timedelta(months=2))
173
        ## self.assertEquals(c('1 year'), timedelta(years=1))
174
        ## self.assertEquals(c('3 years'), timedelta(years=3))
175
176
        self.assertRaises(ValueError, c, '2 day')
177
        self.assertRaises(ValueError, c, '2days')
178
        self.assertRaises(ValueError, c, '123')
179
180
    def test_conv_string(self):
181
        from psycopg2da.adapter import _get_string_conv
182
        _conv_string = _get_string_conv("utf-8")
183
        def c(s):
184
            return _conv_string(s, None)
185
        self.assertEquals(c(None), None)
186
        self.assertEquals(c(''), u'')
187
        self.assertEquals(c('c'), u'c')
188
        self.assertEquals(c('\xc3\x82\xc2\xa2'), u'\xc2\xa2')
189
        self.assertEquals(c('c\xc3\x82\xc2\xa2'), u'c\xc2\xa2')
190
191
class TestPsycopg2Adapter(TestCase):
192
193
    def setUp(self):
194
        import psycopg2da.adapter as adapter
195
        self.real_psycopg2 = adapter.psycopg2
196
        adapter.psycopg2 = self.psycopg2_stub = Psycopg2Stub()
197
198
    def tearDown(self):
199
        import psycopg2da.adapter as adapter
200
        adapter.psycopg2 = self.real_psycopg2
201
202
    def test_connection_factory(self):
203
        from psycopg2da.adapter import Psycopg2Adapter
204
        a = Psycopg2Adapter('dbi://username:password@hostname:port/dbname;junk=ignored')
205
        c = a._connection_factory()
206
        args = self.psycopg2_stub.last_connection_string.split()
207
        args.sort()
208
        self.assertEquals(args, ['dbname=dbname', 'host=hostname',
209
                                 'password=password', 'port=port',
210
                                 'user=username'])
211
212
    def test_registerTypes(self):
213
        import psycopg2da.adapter as adapter
214
        from psycopg2da.adapter import Psycopg2Adapter
215
        a = Psycopg2Adapter('dbi://')
216
        a.registerTypes()
217
        for typename in ('DATE', 'TIME', 'TIMETZ', 'TIMESTAMP',
218
                         'TIMESTAMPTZ', 'INTERVAL'):
219
            typeid = getattr(adapter, '%s_OID' % typename)
220
            result = self.psycopg2_stub.types.get(typeid, None)
221
            if not result:
222
                # comparing None with psycopg2.type object segfaults
223
                self.fail("did not register %s (%d): got None, not Z%s"
224
                          % (typename, typeid, typename))
225
            else:
226
                result_name = getattr(result, 'name', 'None')
227
                self.assertEquals(result, getattr(adapter, typename),
228
                              "did not register %s (%d): got %s, not Z%s"
229
                              % (typename, typeid, result_name, typename))
230
231
232
class TestISODateTime(TestCase):
233
234
    # Test if date/time parsing functions accept a sensible subset of ISO-8601
235
    # compliant date/time strings.
236
    #
237
    # Resources:
238
    #   http://www.cl.cam.ac.uk/~mgk25/iso-time.html
239
    #   http://www.mcs.vuw.ac.nz/technical/software/SGML/doc/iso8601/ISO8601.html
240
    #   http://www.w3.org/TR/NOTE-datetime
241
    #   http://www.ietf.org/rfc/rfc3339.txt
242
243
    basic_dates     = (('20020304',   (2002, 03, 04)),
244
                       ('20000229',   (2000, 02, 29)))
245
246
    extended_dates  = (('2002-03-04', (2002, 03, 04)),
247
                       ('2000-02-29', (2000, 02, 29)))
248
249
    basic_times     = (('12',         (12, 0, 0)),
250
                       ('1234',       (12, 34, 0)),
251
                       ('123417',     (12, 34, 17)),
252
                       ('123417.5',   (12, 34, 17.5)),
253
                       ('123417,5',   (12, 34, 17.5)))
254
255
    extended_times  = (('12',         (12, 0, 0)),
256
                       ('12:34',      (12, 34, 0)),
257
                       ('12:34:17',   (12, 34, 17)),
258
                       ('12:34:17.5', (12, 34, 17.5)),
259
                       ('12:34:17,5', (12, 34, 17.5)))
260
261
    basic_tzs       = (('Z', 0),
262
                       ('+02', 2*60),
263
                       ('+1130', 11*60+30),
264
                       ('-05', -5*60),
265
                       ('-0030', -30))
266
267
    extended_tzs    = (('Z', 0),
268
                       ('+02', 2*60),
269
                       ('+11:30', 11*60+30),
270
                       ('-05', -5*60),
271
                       ('-00:30', -30))
272
273
    time_separators = (' ', 'T')
274
275
    bad_dates       = ('', 'foo', 'XXXXXXXX', 'XXXX-XX-XX', '2001-2-29',
276
                       '1990/13/14')
277
278
    bad_times       = ('', 'foo', 'XXXXXX', '12:34,5', '12:34:56,')
279
280
    bad_timetzs     = ('12+12 34', '15:45 +1234', '18:00-12:34:56', '18:00+123', '18:00Q')
281
282
    bad_datetimes   = ('', 'foo', '2002-03-0412:33')
283
284
    bad_datetimetzs = ('', 'foo', '2002-03-04T12:33 +1200')
285
286
    exception_type  = ValueError
287
288
    # We need the following funcions:
289
    #   parse_date       -> (year, month, day)
290
    #   parse_time       -> (hour, minute, second)
291
    #   parse_timetz     -> (hour, minute, second, tzoffset)
292
    #   parse_datetime   -> (year, month, day, hour, minute, second)
293
    #   parse_datetimetz -> (year, month, day, hour, minute, second, tzoffset)
294
    # second can be a float, all other values are ints
295
    # tzoffset is offset in minutes east of UTC
296
297
    def setUp(self):
298
        from psycopg2da.adapter import parse_date, parse_time, \
299
                                parse_timetz, parse_datetime, parse_datetimetz
300
        self.parse_date = parse_date
301
        self.parse_time = parse_time
302
        self.parse_timetz = parse_timetz
303
        self.parse_datetime = parse_datetime
304
        self.parse_datetimetz = parse_datetimetz
305
306
    def test_basic_date(self):
307
        for s, d in self.basic_dates:
308
            self.assertEqual(self.parse_date(s), d)
309
310
    def test_extended_date(self):
311
        for s, d in self.extended_dates:
312
            self.assertEqual(self.parse_date(s), d)
313
314
    def test_bad_date(self):
315
        for s in self.bad_dates:
316
            self.assertRaises(self.exception_type, self.parse_date, s)
317
318
    def test_basic_time(self):
319
        for s, t in self.basic_times:
320
            self.assertEqual(self.parse_time(s), t)
321
322
    def test_extended_time(self):
323
        for s, t in self.extended_times:
324
            self.assertEqual(self.parse_time(s), t)
325
326
    def test_bad_time(self):
327
        for s in self.bad_times:
328
            self.assertRaises(self.exception_type, self.parse_time, s)
329
330
    def test_basic_timetz(self):
331
        for s, t in self.basic_times:
332
            for tz, off in self.basic_tzs:
333
                self.assertEqual(self.parse_timetz(s+tz), t + (off,))
334
335
    def test_extended_timetz(self):
336
        for s, t in self.extended_times:
337
            for tz, off in self.extended_tzs:
338
                self.assertEqual(self.parse_timetz(s+tz), t + (off,))
339
340
    def test_bad_timetzs(self):
341
        for s in self.bad_timetzs:
342
            self.assertRaises(self.exception_type, self.parse_timetz, s)
343
344
    def test_basic_datetime(self):
345
        for ds, d in self.basic_dates:
346
            for ts, t in self.basic_times:
347
                for sep in self.time_separators:
348
                    self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
349
350
    def test_extended_datetime(self):
351
        for ds, d in self.extended_dates:
352
            for ts, t in self.extended_times:
353
                for sep in self.time_separators:
354
                    self.assertEqual(self.parse_datetime(ds+sep+ts), d + t)
355
356
    def test_bad_datetimes(self):
357
        for s in self.bad_datetimes:
358
            self.assertRaises(self.exception_type, self.parse_datetime, s)
359
360
    def test_basic_datetimetz(self):
361
        for ds, d in self.basic_dates:
362
            for ts, t in self.basic_times:
363
                for tz, off in self.basic_tzs:
364
                    for sep in self.time_separators:
365
                        self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
366
                                         d + t + (off,))
367
368
    def test_extended_datetimetz(self):
369
        for ds, d in self.extended_dates:
370
            for ts, t in self.extended_times:
371
                for tz, off in self.extended_tzs:
372
                    for sep in self.time_separators:
373
                        self.assertEqual(self.parse_datetimetz(ds+sep+ts+tz),
374
                                         d + t + (off,))
375
376
    def test_bad_datetimetzs(self):
377
        for s in self.bad_datetimetzs:
378
            self.assertRaises(self.exception_type, self.parse_datetimetz, s)
379
380
381
def test_suite():
    """Return one ``TestSuite`` holding the three test classes of this module."""
    return TestSuite((
        makeSuite(TestPsycopg2TypeConversion),
        makeSuite(TestPsycopg2Adapter),
        makeSuite(TestISODateTime),
        ))
387
388
# When run as a script, let unittest build and run the suite from test_suite().
if __name__ == '__main__':
    main(defaultTest='test_suite')