2
''' Python DB API 2.0 driver compliance unit test suite.
4
This software is Public Domain and may be used without restrictions.
6
"Now we have booze and barflies entering the discussion, plus rumours of
7
DBAs on drugs... and I won't tell you what flashes through my mind each
8
time I read the subject line with 'Anal Compliance' in it. All around
9
this is turning out to be a thoroughly unwholesome unit test."
14
__rcs_id__ = '$Id: dbapi20.py,v 1.11 2005/01/02 02:41:01 zenzen Exp $'
15
__version__ = '$Revision: 1.12 $'[11:-2]
16
__author__ = 'Stuart Bishop <stuart@stuartbishop.net>'
23
# Revision 1.12 2009/02/06 03:35:11 kf7xm
24
# Tested okay with Python 3.0, includes last minute patches from Mark H.
26
# Revision 1.1.1.1.2.1 2008/09/20 19:54:59 rupole
27
# Include latest changes from main branch
30
# Revision 1.11 2005/01/02 02:41:01 zenzen
31
# Update author email address
33
# Revision 1.10 2003/10/09 03:14:14 zenzen
34
# Add test for DB API 2.0 optional extension, where database exceptions
35
# are exposed as attributes on the Connection object.
37
# Revision 1.9 2003/08/13 01:16:36 zenzen
38
# Minor tweak from Stefan Fleiter
40
# Revision 1.8 2003/04/10 00:13:25 zenzen
41
# Changes, as per suggestions by M.-A. Lemburg
42
# - Add a table prefix, to ensure namespace collisions can always be avoided
44
# Revision 1.7 2003/02/26 23:33:37 zenzen
45
# Break out DDL into helper functions, as per request by David Rushby
47
# Revision 1.6 2003/02/21 03:04:33 zenzen
48
# Stuff from Henrik Ekelund:
50
# added test_nextset & hooks
52
# Revision 1.5 2003/02/17 22:08:43 zenzen
53
# Implement suggestions and code from Henrik Eklund - test that cursor.arraysize
54
# defaults to 1 & generic cursor.callproc test added
56
# Revision 1.4 2003/02/15 00:16:33 zenzen
57
# Changes, as per suggestions and bug reports by M.-A. Lemburg,
58
# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
60
# - Now a subclass of TestCase, to avoid requiring the driver stub
61
# to use multiple inheritance
62
# - Reversed the polarity of buggy test in test_description
63
# - Test exception heirarchy correctly
64
# - self.populate is now self._populate(), so if a driver stub
65
# overrides self.ddl1 this change propogates
66
# - VARCHAR columns now have a width, which will hopefully make the
67
# DDL even more portible (this will be reversed if it causes more problems)
68
# - cursor.rowcount being checked after various execute and fetchXXX methods
69
# - Check for fetchall and fetchmany returning empty lists after results
70
# are exhausted (already checking for empty lists if select retrieved
72
# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
75
if sys.version_info < (3,0) and isinstance(sval, str):
76
sval = sval.decode("latin1")
77
return sval.encode("latin1")
79
class DatabaseAPI20Test(unittest.TestCase):
80
''' Test a database self.driver for DB API 2.0 compatibility.
81
This implementation tests Gadfly, but the TestCase
82
is structured so that other self.drivers can subclass this
83
test case to ensure compiliance with the DB-API. It is
84
expected that this TestCase may be expanded in the future
85
if ambiguities or edge conditions are discovered.
87
The 'Optional Extensions' are not yet being tested.
89
self.drivers should subclass this test, overriding setUp, tearDown,
90
self.driver, connect_args and connect_kw_args. Class specification
94
class mytest(dbapi20.DatabaseAPI20Test):
97
Don't 'import DatabaseAPI20Test from dbapi20', or you will
98
confuse the unit tester - just 'import dbapi20'.
101
# The self.driver module. This should be the module where the 'connect'
102
# method is to be found
104
connect_args = () # List of arguments to pass to connect
105
connect_kw_args = {} # Keyword arguments for connect
106
table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
108
ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
109
ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
110
xddl1 = 'drop table %sbooze' % table_prefix
111
xddl2 = 'drop table %sbarflys' % table_prefix
113
lowerfunc = 'to_lower' # Name of stored procedure to convert string->lowercase
115
# Some drivers may need to override these helpers, for example adding
116
# a 'commit' after the execute.
117
def executeDDL1(self,cursor):
118
cursor.execute(self.ddl1)
120
def executeDDL2(self,cursor):
121
cursor.execute(self.ddl2)
124
''' self.drivers should override this method to perform required setup
125
if any is necessary, such as creating the database.
130
''' self.drivers should override this method to perform required cleanup
131
if any is necessary, such as deleting the test database.
132
The default drops the tables that may be created.
134
con = self._connect()
137
for ddl in (self.xddl1,self.xddl2):
141
except self.driver.Error:
142
# Assume table didn't exist. Other tests will check if
150
return self.driver.connect(
151
*self.connect_args,**self.connect_kw_args
153
except AttributeError:
154
self.fail("No connect method found in self.driver module")
156
def test_connect(self):
157
con = self._connect()
160
def test_apilevel(self):
163
apilevel = self.driver.apilevel
165
self.assertEqual(apilevel,'2.0')
166
except AttributeError:
167
self.fail("Driver doesn't define apilevel")
169
def test_threadsafety(self):
172
threadsafety = self.driver.threadsafety
173
# Must be a valid value
174
self.assertTrue(threadsafety in (0,1,2,3))
175
except AttributeError:
176
self.fail("Driver doesn't define threadsafety")
178
def test_paramstyle(self):
181
paramstyle = self.driver.paramstyle
182
# Must be a valid value
183
self.assertTrue(paramstyle in (
184
'qmark','numeric','named','format','pyformat'
186
except AttributeError:
187
self.fail("Driver doesn't define paramstyle")
189
def test_Exceptions(self):
190
# Make sure required exceptions exist, and are in the
192
if sys.version[0] == '3': #under Python 3 StardardError no longer exists
193
self.assertTrue(issubclass(self.driver.Warning,Exception))
194
self.assertTrue(issubclass(self.driver.Error,Exception))
196
self.assertTrue(issubclass(self.driver.Warning,StandardError))
197
self.assertTrue(issubclass(self.driver.Error,StandardError))
200
issubclass(self.driver.InterfaceError,self.driver.Error)
203
issubclass(self.driver.DatabaseError,self.driver.Error)
206
issubclass(self.driver.OperationalError,self.driver.Error)
209
issubclass(self.driver.IntegrityError,self.driver.Error)
212
issubclass(self.driver.InternalError,self.driver.Error)
215
issubclass(self.driver.ProgrammingError,self.driver.Error)
218
issubclass(self.driver.NotSupportedError,self.driver.Error)
221
def test_ExceptionsAsConnectionAttributes(self):
223
# Test for the optional DB API 2.0 extension, where the exceptions
224
# are exposed as attributes on the Connection object
225
# I figure this optional extension will be implemented by any
226
# driver author who is using this test suite, so it is enabled
228
con = self._connect()
231
self.assertTrue(con.Warning is drv.Warning)
232
self.assertTrue(con.Error is drv.Error)
233
self.assertTrue(con.InterfaceError is drv.InterfaceError)
234
self.assertTrue(con.DatabaseError is drv.DatabaseError)
235
self.assertTrue(con.OperationalError is drv.OperationalError)
236
self.assertTrue(con.IntegrityError is drv.IntegrityError)
237
self.assertTrue(con.InternalError is drv.InternalError)
238
self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
239
self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
244
def test_commit(self):
245
con = self._connect()
247
# Commit must work, even if it doesn't do anything
252
def test_rollback(self):
253
con = self._connect()
255
# If rollback is defined, it should either work or throw
256
# the documented exception
257
if hasattr(con,'rollback'):
260
except self.driver.NotSupportedError:
265
def test_cursor(self):
266
con = self._connect()
272
def test_cursor_isolation(self):
273
con = self._connect()
275
# Make sure cursors created from the same connection have
276
# the documented transaction isolation level
279
self.executeDDL1(cur1)
280
cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
283
cur2.execute("select name from %sbooze" % self.table_prefix)
284
booze = cur2.fetchall()
285
self.assertEqual(len(booze),1)
286
self.assertEqual(len(booze[0]),1)
287
self.assertEqual(booze[0][0],'Victoria Bitter')
291
def test_description(self):
292
con = self._connect()
295
self.executeDDL1(cur)
296
self.assertEqual(cur.description,None,
297
'cursor.description should be none after executing a '
298
'statement that can return no rows (such as DDL)'
300
cur.execute('select name from %sbooze' % self.table_prefix)
301
self.assertEqual(len(cur.description),1,
302
'cursor.description describes too many columns'
304
self.assertEqual(len(cur.description[0]),7,
305
'cursor.description[x] tuples must have 7 elements'
307
self.assertEqual(cur.description[0][0].lower(),'name',
308
'cursor.description[x][0] must return column name'
310
self.assertEqual(cur.description[0][1],self.driver.STRING,
311
'cursor.description[x][1] must return column type. Got %r'
312
% cur.description[0][1]
315
# Make sure self.description gets reset
316
self.executeDDL2(cur)
317
self.assertEqual(cur.description,None,
318
'cursor.description not being set to None when executing '
319
'no-result statements (eg. DDL)'
324
def test_rowcount(self):
325
con = self._connect()
328
self.executeDDL1(cur)
329
self.assertEqual(cur.rowcount,-1,
330
'cursor.rowcount should be -1 after executing no-result '
333
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
336
self.assertTrue(cur.rowcount in (-1,1),
337
'cursor.rowcount should == number or rows inserted, or '
338
'set to -1 after executing an insert statement'
340
cur.execute("select name from %sbooze" % self.table_prefix)
341
self.assertTrue(cur.rowcount in (-1,1),
342
'cursor.rowcount should == number of rows returned, or '
343
'set to -1 after executing a select statement'
345
self.executeDDL2(cur)
346
self.assertEqual(cur.rowcount,-1,
347
'cursor.rowcount not being reset to -1 after executing '
348
'no-result statements'
353
lower_func = 'to_lower'
354
def test_callproc(self):
355
con = self._connect()
358
self._callproc_setup(cur)
359
if self.lower_func and hasattr(cur,'callproc'):
360
r = cur.callproc(self.lower_func,('FOO',))
361
self.assertEqual(len(r),1)
362
self.assertEqual(r[0],'FOO')
364
self.assertEqual(len(r),1,'callproc produced no result set')
365
self.assertEqual(len(r[0]),1,
366
'callproc produced invalid result set'
368
self.assertEqual(r[0][0],'foo',
369
'callproc produced invalid results'
374
def test_close(self):
375
con = self._connect()
381
# cursor.execute should raise an Error if called after connection
383
self.assertRaises(self.driver.Error,self.executeDDL1,cur)
385
# connection.commit should raise an Error if called after connection'
387
self.assertRaises(self.driver.Error,con.commit)
389
# connection.close should raise an Error if called more than once
390
# # disabled, there is no such requirement in DBAPI PEP-0249
391
#self.assertRaises(self.driver.Error,con.close)
393
def test_execute(self):
394
con = self._connect()
397
self._paraminsert(cur)
401
def _paraminsert(self,cur):
402
self.executeDDL1(cur)
403
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
406
self.assertTrue(cur.rowcount in (-1,1))
408
if self.driver.paramstyle == 'qmark':
410
'insert into %sbooze values (?)' % self.table_prefix,
413
elif self.driver.paramstyle == 'numeric':
415
'insert into %sbooze values (:1)' % self.table_prefix,
418
elif self.driver.paramstyle == 'named':
420
'insert into %sbooze values (:beer)' % self.table_prefix,
423
elif self.driver.paramstyle == 'format':
425
'insert into %sbooze values (%%s)' % self.table_prefix,
428
elif self.driver.paramstyle == 'pyformat':
430
'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
434
self.fail('Invalid paramstyle')
435
self.assertTrue(cur.rowcount in (-1,1))
437
cur.execute('select name from %sbooze' % self.table_prefix)
439
self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
440
beers = [res[0][0],res[1][0]]
442
self.assertEqual(beers[0],"Cooper's",
443
'cursor.fetchall retrieved incorrect data, or data inserted '
446
self.assertEqual(beers[1],"Victoria Bitter",
447
'cursor.fetchall retrieved incorrect data, or data inserted '
451
def test_executemany(self):
452
con = self._connect()
455
self.executeDDL1(cur)
456
largs = [ ("Cooper's",) , ("Boag's",) ]
457
margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
458
if self.driver.paramstyle == 'qmark':
460
'insert into %sbooze values (?)' % self.table_prefix,
463
elif self.driver.paramstyle == 'numeric':
465
'insert into %sbooze values (:1)' % self.table_prefix,
468
elif self.driver.paramstyle == 'named':
470
'insert into %sbooze values (:beer)' % self.table_prefix,
473
elif self.driver.paramstyle == 'format':
475
'insert into %sbooze values (%%s)' % self.table_prefix,
478
elif self.driver.paramstyle == 'pyformat':
480
'insert into %sbooze values (%%(beer)s)' % (
486
self.fail('Unknown paramstyle')
487
self.assertTrue(cur.rowcount in (-1,2),
488
'insert using cursor.executemany set cursor.rowcount to '
489
'incorrect value %r' % cur.rowcount
491
cur.execute('select name from %sbooze' % self.table_prefix)
493
self.assertEqual(len(res),2,
494
'cursor.fetchall retrieved incorrect number of rows'
496
beers = [res[0][0],res[1][0]]
498
self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
499
self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
503
def test_fetchone(self):
504
con = self._connect()
508
# cursor.fetchone should raise an Error if called before
509
# executing a select-type query
510
self.assertRaises(self.driver.Error,cur.fetchone)
512
# cursor.fetchone should raise an Error if called after
513
# executing a query that cannnot return rows
514
self.executeDDL1(cur)
515
self.assertRaises(self.driver.Error,cur.fetchone)
517
cur.execute('select name from %sbooze' % self.table_prefix)
518
self.assertEqual(cur.fetchone(),None,
519
'cursor.fetchone should return None if a query retrieves '
522
self.assertTrue(cur.rowcount in (-1,0))
524
# cursor.fetchone should raise an Error if called after
525
# executing a query that cannnot return rows
526
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
529
self.assertRaises(self.driver.Error,cur.fetchone)
531
cur.execute('select name from %sbooze' % self.table_prefix)
533
self.assertEqual(len(r),1,
534
'cursor.fetchone should have retrieved a single row'
536
self.assertEqual(r[0],'Victoria Bitter',
537
'cursor.fetchone retrieved incorrect data'
539
self.assertEqual(cur.fetchone(),None,
540
'cursor.fetchone should return None if no more rows available'
542
self.assertTrue(cur.rowcount in (-1,1))
556
''' Return a list of sql commands to setup the DB for the fetch
560
"insert into %sbooze values ('%s')" % (self.table_prefix,s)
561
for s in self.samples
565
def test_fetchmany(self):
566
con = self._connect()
570
# cursor.fetchmany should raise an Error if called without
572
self.assertRaises(self.driver.Error,cur.fetchmany,4)
574
self.executeDDL1(cur)
575
for sql in self._populate():
578
cur.execute('select name from %sbooze' % self.table_prefix)
580
self.assertEqual(len(r),1,
581
'cursor.fetchmany retrieved incorrect number of rows, '
582
'default of arraysize is one.'
585
r = cur.fetchmany(3) # Should get 3 rows
586
self.assertEqual(len(r),3,
587
'cursor.fetchmany retrieved incorrect number of rows'
589
r = cur.fetchmany(4) # Should get 2 more
590
self.assertEqual(len(r),2,
591
'cursor.fetchmany retrieved incorrect number of rows'
593
r = cur.fetchmany(4) # Should be an empty sequence
594
self.assertEqual(len(r),0,
595
'cursor.fetchmany should return an empty sequence after '
596
'results are exhausted'
598
self.assertTrue(cur.rowcount in (-1,6))
600
# Same as above, using cursor.arraysize
602
cur.execute('select name from %sbooze' % self.table_prefix)
603
r = cur.fetchmany() # Should get 4 rows
604
self.assertEqual(len(r),4,
605
'cursor.arraysize not being honoured by fetchmany'
607
r = cur.fetchmany() # Should get 2 more
608
self.assertEqual(len(r),2)
609
r = cur.fetchmany() # Should be an empty sequence
610
self.assertEqual(len(r),0)
611
self.assertTrue(cur.rowcount in (-1,6))
614
cur.execute('select name from %sbooze' % self.table_prefix)
615
rows = cur.fetchmany() # Should get all rows
616
self.assertTrue(cur.rowcount in (-1,6))
617
self.assertEqual(len(rows),6)
618
self.assertEqual(len(rows),6)
619
rows = [r[0] for r in rows]
622
# Make sure we get the right data back out
624
self.assertEqual(rows[i],self.samples[i],
625
'incorrect data retrieved by cursor.fetchmany'
628
rows = cur.fetchmany() # Should return an empty list
629
self.assertEqual(len(rows),0,
630
'cursor.fetchmany should return an empty sequence if '
631
'called after the whole result set has been fetched'
633
self.assertTrue(cur.rowcount in (-1,6))
635
self.executeDDL2(cur)
636
cur.execute('select name from %sbarflys' % self.table_prefix)
637
r = cur.fetchmany() # Should get empty sequence
638
self.assertEqual(len(r),0,
639
'cursor.fetchmany should return an empty sequence if '
640
'query retrieved no rows'
642
self.assertTrue(cur.rowcount in (-1,0))
647
def test_fetchall(self):
648
con = self._connect()
651
# cursor.fetchall should raise an Error if called
652
# without executing a query that may return rows (such
654
self.assertRaises(self.driver.Error, cur.fetchall)
656
self.executeDDL1(cur)
657
for sql in self._populate():
660
# cursor.fetchall should raise an Error if called
661
# after executing a a statement that cannot return rows
662
self.assertRaises(self.driver.Error,cur.fetchall)
664
cur.execute('select name from %sbooze' % self.table_prefix)
665
rows = cur.fetchall()
666
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
667
self.assertEqual(len(rows),len(self.samples),
668
'cursor.fetchall did not retrieve all rows'
670
rows = [r[0] for r in rows]
672
for i in range(0,len(self.samples)):
673
self.assertEqual(rows[i],self.samples[i],
674
'cursor.fetchall retrieved incorrect rows'
676
rows = cur.fetchall()
679
'cursor.fetchall should return an empty list if called '
680
'after the whole result set has been fetched'
682
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
684
self.executeDDL2(cur)
685
cur.execute('select name from %sbarflys' % self.table_prefix)
686
rows = cur.fetchall()
687
self.assertTrue(cur.rowcount in (-1,0))
688
self.assertEqual(len(rows),0,
689
'cursor.fetchall should return an empty list if '
690
'a select query returns no rows'
696
def test_mixedfetch(self):
697
con = self._connect()
700
self.executeDDL1(cur)
701
for sql in self._populate():
704
cur.execute('select name from %sbooze' % self.table_prefix)
705
rows1 = cur.fetchone()
706
rows23 = cur.fetchmany(2)
707
rows4 = cur.fetchone()
708
rows56 = cur.fetchall()
709
self.assertTrue(cur.rowcount in (-1,6))
710
self.assertEqual(len(rows23),2,
711
'fetchmany returned incorrect number of rows'
713
self.assertEqual(len(rows56),2,
714
'fetchall returned incorrect number of rows'
718
rows.extend([rows23[0][0],rows23[1][0]])
719
rows.append(rows4[0])
720
rows.extend([rows56[0][0],rows56[1][0]])
722
for i in range(0,len(self.samples)):
723
self.assertEqual(rows[i],self.samples[i],
724
'incorrect data retrieved or inserted'
729
def help_nextset_setUp(self,cur):
730
''' Should create a procedure called deleteme
731
that returns two result sets, first the
732
number of rows in booze then "name from booze"
734
raise NotImplementedError('Helper not implemented')
736
# create procedure deleteme as
738
# select count(*) from booze
739
# select name from booze
744
def help_nextset_tearDown(self,cur):
745
'If cleaning up is needed after nextSetTest'
746
raise NotImplementedError('Helper not implemented')
747
#cur.execute("drop procedure deleteme")
749
def test_nextset(self):
750
con = self._connect()
753
if not hasattr(cur,'nextset'):
757
self.executeDDL1(cur)
759
for sql in self._populate():
762
self.help_nextset_setUp(cur)
764
cur.callproc('deleteme')
765
numberofrows=cur.fetchone()
766
self.assertEqual(numberofrows[0], len(self.samples))
769
assert len(names) == len(self.samples)
771
assert s == None,'No more return sets, should return None'
773
self.help_nextset_tearDown(cur)
778
#def test_nextset(self):
779
# raise NotImplementedError('Drivers need to override this test')
781
def test_arraysize(self):
782
# Not much here - rest of the tests for this are in test_fetchmany
783
con = self._connect()
786
self.assertTrue(hasattr(cur,'arraysize'),
787
'cursor.arraysize must be defined'
792
def test_setinputsizes(self):
793
con = self._connect()
796
cur.setinputsizes( (25,) )
797
self._paraminsert(cur) # Make sure cursor still works
801
def test_setoutputsize_basic(self):
802
# Basic test is to make sure setoutputsize doesn't blow up
803
con = self._connect()
806
cur.setoutputsize(1000)
807
cur.setoutputsize(2000,0)
808
self._paraminsert(cur) # Make sure the cursor still works
812
def test_setoutputsize(self):
813
# Real test for setoutputsize is driver dependant
814
raise NotImplementedError('Driver needed to override this test')
817
con = self._connect()
820
self.executeDDL1(cur)
821
cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
822
cur.execute('select name from %sbooze' % self.table_prefix)
824
self.assertEqual(len(r),1)
825
self.assertEqual(len(r[0]),1)
826
self.assertEqual(r[0][0],None,'NULL value not returned as None')
831
d1 = self.driver.Date(2002,12,25)
832
d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
833
# Can we assume this? API doesn't specify, but it seems implied
834
# self.assertEqual(str(d1),str(d2))
837
t1 = self.driver.Time(13,45,30)
838
t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
839
# Can we assume this? API doesn't specify, but it seems implied
840
# self.assertEqual(str(t1),str(t2))
842
def test_Timestamp(self):
843
t1 = self.driver.Timestamp(2002,12,25,13,45,30)
844
t2 = self.driver.TimestampFromTicks(
845
time.mktime((2002,12,25,13,45,30,0,0,0))
847
# Can we assume this? API doesn't specify, but it seems implied
848
# self.assertEqual(str(t1),str(t2))
850
def test_Binary(self):
851
b = self.driver.Binary(str2bytes('Something'))
852
b = self.driver.Binary(str2bytes(''))
854
def test_STRING(self):
855
self.assertTrue(hasattr(self.driver,'STRING'),
856
'module.STRING must be defined'
859
def test_BINARY(self):
860
self.assertTrue(hasattr(self.driver,'BINARY'),
861
'module.BINARY must be defined.'
864
def test_NUMBER(self):
865
self.assertTrue(hasattr(self.driver,'NUMBER'),
866
'module.NUMBER must be defined.'
869
def test_DATETIME(self):
870
self.assertTrue(hasattr(self.driver,'DATETIME'),
871
'module.DATETIME must be defined.'
874
def test_ROWID(self):
875
self.assertTrue(hasattr(self.driver,'ROWID'),
876
'module.ROWID must be defined.'