~ubuntu-branches/debian/sid/pytds/sid

« back to all changes in this revision

Viewing changes to tests/dbapi20.py

  • Committer: Package Import Robot
  • Author(s): Christopher Hoskin
  • Date: 2017-03-11 20:12:33 UTC
  • Revision ID: package-import@ubuntu.com-20170311201233-voewiramv2n5i4uj
Tags: upstream-1.8.2
ImportĀ upstreamĀ versionĀ 1.8.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#!/usr/bin/env python
 
2
''' Python DB API 2.0 driver compliance unit test suite. 
 
3
    
 
4
    This software is Public Domain and may be used without restrictions.
 
5
 
 
6
 "Now we have booze and barflies entering the discussion, plus rumours of
 
7
  DBAs on drugs... and I won't tell you what flashes through my mind each
 
8
  time I read the subject line with 'Anal Compliance' in it.  All around
 
9
  this is turning out to be a thoroughly unwholesome unit test."
 
10
 
 
11
    -- Ian Bicking
 
12
'''
 
13
 
 
14
__rcs_id__  = '$Id: dbapi20.py,v 1.11 2005/01/02 02:41:01 zenzen Exp $'
 
15
__version__ = '$Revision: 1.12 $'[11:-2]
 
16
__author__ = 'Stuart Bishop <stuart@stuartbishop.net>'
 
17
 
 
18
import unittest
 
19
import time
 
20
import sys
 
21
 
 
22
 
 
23
# Revision 1.12  2009/02/06 03:35:11  kf7xm
 
24
# Tested okay with Python 3.0, includes last minute patches from Mark H.
 
25
#
 
26
# Revision 1.1.1.1.2.1  2008/09/20 19:54:59  rupole
 
27
# Include latest changes from main branch
 
28
# Updates for py3k
 
29
#
 
30
# Revision 1.11  2005/01/02 02:41:01  zenzen
 
31
# Update author email address
 
32
#
 
33
# Revision 1.10  2003/10/09 03:14:14  zenzen
 
34
# Add test for DB API 2.0 optional extension, where database exceptions
 
35
# are exposed as attributes on the Connection object.
 
36
#
 
37
# Revision 1.9  2003/08/13 01:16:36  zenzen
 
38
# Minor tweak from Stefan Fleiter
 
39
#
 
40
# Revision 1.8  2003/04/10 00:13:25  zenzen
 
41
# Changes, as per suggestions by M.-A. Lemburg
 
42
# - Add a table prefix, to ensure namespace collisions can always be avoided
 
43
#
 
44
# Revision 1.7  2003/02/26 23:33:37  zenzen
 
45
# Break out DDL into helper functions, as per request by David Rushby
 
46
#
 
47
# Revision 1.6  2003/02/21 03:04:33  zenzen
 
48
# Stuff from Henrik Ekelund:
 
49
#     added test_None
 
50
#     added test_nextset & hooks
 
51
#
 
52
# Revision 1.5  2003/02/17 22:08:43  zenzen
 
53
# Implement suggestions and code from Henrik Eklund - test that cursor.arraysize
 
54
# defaults to 1 & generic cursor.callproc test added
 
55
#
 
56
# Revision 1.4  2003/02/15 00:16:33  zenzen
 
57
# Changes, as per suggestions and bug reports by M.-A. Lemburg,
 
58
# Matthew T. Kromer, Federico Di Gregorio and Daniel Dittmar
 
59
# - Class renamed
 
60
# - Now a subclass of TestCase, to avoid requiring the driver stub
 
61
#   to use multiple inheritance
 
62
# - Reversed the polarity of buggy test in test_description
 
63
# - Test exception heirarchy correctly
 
64
# - self.populate is now self._populate(), so if a driver stub
 
65
#   overrides self.ddl1 this change propogates
 
66
# - VARCHAR columns now have a width, which will hopefully make the
 
67
#   DDL even more portible (this will be reversed if it causes more problems)
 
68
# - cursor.rowcount being checked after various execute and fetchXXX methods
 
69
# - Check for fetchall and fetchmany returning empty lists after results
 
70
#   are exhausted (already checking for empty lists if select retrieved
 
71
#   nothing
 
72
# - Fix bugs in test_setoutputsize_basic and test_setinputsizes
 
73
#
 
74
def str2bytes(sval):
 
75
    if sys.version_info < (3,0) and isinstance(sval, str):
 
76
        sval = sval.decode("latin1")
 
77
    return sval.encode("latin1")
 
78
 
 
79
class DatabaseAPI20Test(unittest.TestCase):
 
80
    ''' Test a database self.driver for DB API 2.0 compatibility.
 
81
        This implementation tests Gadfly, but the TestCase
 
82
        is structured so that other self.drivers can subclass this 
 
83
        test case to ensure compiliance with the DB-API. It is 
 
84
        expected that this TestCase may be expanded in the future
 
85
        if ambiguities or edge conditions are discovered.
 
86
 
 
87
        The 'Optional Extensions' are not yet being tested.
 
88
 
 
89
        self.drivers should subclass this test, overriding setUp, tearDown,
 
90
        self.driver, connect_args and connect_kw_args. Class specification
 
91
        should be as follows:
 
92
 
 
93
        import dbapi20 
 
94
        class mytest(dbapi20.DatabaseAPI20Test):
 
95
           [...] 
 
96
 
 
97
        Don't 'import DatabaseAPI20Test from dbapi20', or you will
 
98
        confuse the unit tester - just 'import dbapi20'.
 
99
    '''
 
100
 
 
101
    # The self.driver module. This should be the module where the 'connect'
 
102
    # method is to be found
 
103
    driver = None
 
104
    connect_args = () # List of arguments to pass to connect
 
105
    connect_kw_args = {} # Keyword arguments for connect
 
106
    table_prefix = 'dbapi20test_' # If you need to specify a prefix for tables
 
107
 
 
108
    ddl1 = 'create table %sbooze (name varchar(20))' % table_prefix
 
109
    ddl2 = 'create table %sbarflys (name varchar(20))' % table_prefix
 
110
    xddl1 = 'drop table %sbooze' % table_prefix
 
111
    xddl2 = 'drop table %sbarflys' % table_prefix
 
112
 
 
113
    lowerfunc = 'to_lower' # Name of stored procedure to convert string->lowercase
 
114
        
 
115
    # Some drivers may need to override these helpers, for example adding
 
116
    # a 'commit' after the execute.
 
117
    def executeDDL1(self,cursor):
 
118
        cursor.execute(self.ddl1)
 
119
 
 
120
    def executeDDL2(self,cursor):
 
121
        cursor.execute(self.ddl2)
 
122
 
 
123
    def setUp(self):
 
124
        ''' self.drivers should override this method to perform required setup
 
125
            if any is necessary, such as creating the database.
 
126
        '''
 
127
        pass
 
128
 
 
129
    def tearDown(self):
 
130
        ''' self.drivers should override this method to perform required cleanup
 
131
            if any is necessary, such as deleting the test database.
 
132
            The default drops the tables that may be created.
 
133
        '''
 
134
        con = self._connect()
 
135
        try:
 
136
            cur = con.cursor()
 
137
            for ddl in (self.xddl1,self.xddl2):
 
138
                try: 
 
139
                    cur.execute(ddl)
 
140
                    con.commit()
 
141
                except self.driver.Error: 
 
142
                    # Assume table didn't exist. Other tests will check if
 
143
                    # execute is busted.
 
144
                    pass
 
145
        finally:
 
146
            con.close()
 
147
 
 
148
    def _connect(self):
 
149
        try:
 
150
            return self.driver.connect(
 
151
                *self.connect_args,**self.connect_kw_args
 
152
                )
 
153
        except AttributeError:
 
154
            self.fail("No connect method found in self.driver module")
 
155
 
 
156
    def test_connect(self):
 
157
        con = self._connect()
 
158
        con.close()
 
159
 
 
160
    def test_apilevel(self):
 
161
        try:
 
162
            # Must exist
 
163
            apilevel = self.driver.apilevel
 
164
            # Must equal 2.0
 
165
            self.assertEqual(apilevel,'2.0')
 
166
        except AttributeError:
 
167
            self.fail("Driver doesn't define apilevel")
 
168
 
 
169
    def test_threadsafety(self):
 
170
        try:
 
171
            # Must exist
 
172
            threadsafety = self.driver.threadsafety
 
173
            # Must be a valid value
 
174
            self.assertTrue(threadsafety in (0,1,2,3))
 
175
        except AttributeError:
 
176
            self.fail("Driver doesn't define threadsafety")
 
177
 
 
178
    def test_paramstyle(self):
 
179
        try:
 
180
            # Must exist
 
181
            paramstyle = self.driver.paramstyle
 
182
            # Must be a valid value
 
183
            self.assertTrue(paramstyle in (
 
184
                'qmark','numeric','named','format','pyformat'
 
185
                ))
 
186
        except AttributeError:
 
187
            self.fail("Driver doesn't define paramstyle")
 
188
 
 
189
    def test_Exceptions(self):
 
190
        # Make sure required exceptions exist, and are in the
 
191
        # defined heirarchy.
 
192
        if sys.version[0] == '3': #under Python 3 StardardError no longer exists
 
193
            self.assertTrue(issubclass(self.driver.Warning,Exception))
 
194
            self.assertTrue(issubclass(self.driver.Error,Exception))
 
195
        else:
 
196
            self.assertTrue(issubclass(self.driver.Warning,StandardError))
 
197
            self.assertTrue(issubclass(self.driver.Error,StandardError))
 
198
 
 
199
        self.assertTrue(
 
200
            issubclass(self.driver.InterfaceError,self.driver.Error)
 
201
            )
 
202
        self.assertTrue(
 
203
            issubclass(self.driver.DatabaseError,self.driver.Error)
 
204
            )
 
205
        self.assertTrue(
 
206
            issubclass(self.driver.OperationalError,self.driver.Error)
 
207
            )
 
208
        self.assertTrue(
 
209
            issubclass(self.driver.IntegrityError,self.driver.Error)
 
210
            )
 
211
        self.assertTrue(
 
212
            issubclass(self.driver.InternalError,self.driver.Error)
 
213
            )
 
214
        self.assertTrue(
 
215
            issubclass(self.driver.ProgrammingError,self.driver.Error)
 
216
            )
 
217
        self.assertTrue(
 
218
            issubclass(self.driver.NotSupportedError,self.driver.Error)
 
219
            )
 
220
 
 
221
    def test_ExceptionsAsConnectionAttributes(self):
 
222
        # OPTIONAL EXTENSION
 
223
        # Test for the optional DB API 2.0 extension, where the exceptions
 
224
        # are exposed as attributes on the Connection object
 
225
        # I figure this optional extension will be implemented by any
 
226
        # driver author who is using this test suite, so it is enabled
 
227
        # by default.
 
228
        con = self._connect()
 
229
        try:
 
230
            drv = self.driver
 
231
            self.assertTrue(con.Warning is drv.Warning)
 
232
            self.assertTrue(con.Error is drv.Error)
 
233
            self.assertTrue(con.InterfaceError is drv.InterfaceError)
 
234
            self.assertTrue(con.DatabaseError is drv.DatabaseError)
 
235
            self.assertTrue(con.OperationalError is drv.OperationalError)
 
236
            self.assertTrue(con.IntegrityError is drv.IntegrityError)
 
237
            self.assertTrue(con.InternalError is drv.InternalError)
 
238
            self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
 
239
            self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
 
240
        finally:
 
241
            con.close()
 
242
 
 
243
 
 
244
    def test_commit(self):
 
245
        con = self._connect()
 
246
        try:
 
247
            # Commit must work, even if it doesn't do anything
 
248
            con.commit()
 
249
        finally:
 
250
            con.close()
 
251
 
 
252
    def test_rollback(self):
 
253
        con = self._connect()
 
254
        try:
 
255
            # If rollback is defined, it should either work or throw
 
256
            # the documented exception
 
257
            if hasattr(con,'rollback'):
 
258
                try:
 
259
                    con.rollback()
 
260
                except self.driver.NotSupportedError:
 
261
                    pass
 
262
        finally:
 
263
            con.close()
 
264
    
 
265
    def test_cursor(self):
 
266
        con = self._connect()
 
267
        try:
 
268
            cur = con.cursor()
 
269
        finally:
 
270
            con.close()
 
271
 
 
272
    def test_cursor_isolation(self):
 
273
        con = self._connect()
 
274
        try:
 
275
            # Make sure cursors created from the same connection have
 
276
            # the documented transaction isolation level
 
277
            cur1 = con.cursor()
 
278
            cur2 = con.cursor()
 
279
            self.executeDDL1(cur1)
 
280
            cur1.execute("insert into %sbooze values ('Victoria Bitter')" % (
 
281
                self.table_prefix
 
282
                ))
 
283
            cur2.execute("select name from %sbooze" % self.table_prefix)
 
284
            booze = cur2.fetchall()
 
285
            self.assertEqual(len(booze),1)
 
286
            self.assertEqual(len(booze[0]),1)
 
287
            self.assertEqual(booze[0][0],'Victoria Bitter')
 
288
        finally:
 
289
            con.close()
 
290
 
 
291
    def test_description(self):
 
292
        con = self._connect()
 
293
        try:
 
294
            cur = con.cursor()
 
295
            self.executeDDL1(cur)
 
296
            self.assertEqual(cur.description,None,
 
297
                'cursor.description should be none after executing a '
 
298
                'statement that can return no rows (such as DDL)'
 
299
                )
 
300
            cur.execute('select name from %sbooze' % self.table_prefix)
 
301
            self.assertEqual(len(cur.description),1,
 
302
                'cursor.description describes too many columns'
 
303
                )
 
304
            self.assertEqual(len(cur.description[0]),7,
 
305
                'cursor.description[x] tuples must have 7 elements'
 
306
                )
 
307
            self.assertEqual(cur.description[0][0].lower(),'name',
 
308
                'cursor.description[x][0] must return column name'
 
309
                )
 
310
            self.assertEqual(cur.description[0][1],self.driver.STRING,
 
311
                'cursor.description[x][1] must return column type. Got %r'
 
312
                    % cur.description[0][1]
 
313
                )
 
314
 
 
315
            # Make sure self.description gets reset
 
316
            self.executeDDL2(cur)
 
317
            self.assertEqual(cur.description,None,
 
318
                'cursor.description not being set to None when executing '
 
319
                'no-result statements (eg. DDL)'
 
320
                )
 
321
        finally:
 
322
            con.close()
 
323
 
 
324
    def test_rowcount(self):
 
325
        con = self._connect()
 
326
        try:
 
327
            cur = con.cursor()
 
328
            self.executeDDL1(cur)
 
329
            self.assertEqual(cur.rowcount,-1,
 
330
                'cursor.rowcount should be -1 after executing no-result '
 
331
                'statements'
 
332
                )
 
333
            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
 
334
                self.table_prefix
 
335
                ))
 
336
            self.assertTrue(cur.rowcount in (-1,1),
 
337
                'cursor.rowcount should == number or rows inserted, or '
 
338
                'set to -1 after executing an insert statement'
 
339
                )
 
340
            cur.execute("select name from %sbooze" % self.table_prefix)
 
341
            self.assertTrue(cur.rowcount in (-1,1),
 
342
                'cursor.rowcount should == number of rows returned, or '
 
343
                'set to -1 after executing a select statement'
 
344
                )
 
345
            self.executeDDL2(cur)
 
346
            self.assertEqual(cur.rowcount,-1,
 
347
                'cursor.rowcount not being reset to -1 after executing '
 
348
                'no-result statements'
 
349
                )
 
350
        finally:
 
351
            con.close()
 
352
 
 
353
    lower_func = 'to_lower'
 
354
    def test_callproc(self):
 
355
        con = self._connect()
 
356
        try:
 
357
            cur = con.cursor()
 
358
            self._callproc_setup(cur)
 
359
            if self.lower_func and hasattr(cur,'callproc'):
 
360
                r = cur.callproc(self.lower_func,('FOO',))
 
361
                self.assertEqual(len(r),1)
 
362
                self.assertEqual(r[0],'FOO')
 
363
                r = cur.fetchall()
 
364
                self.assertEqual(len(r),1,'callproc produced no result set')
 
365
                self.assertEqual(len(r[0]),1,
 
366
                    'callproc produced invalid result set'
 
367
                    )
 
368
                self.assertEqual(r[0][0],'foo',
 
369
                    'callproc produced invalid results'
 
370
                    )
 
371
        finally:
 
372
            con.close()
 
373
 
 
374
    def test_close(self):
 
375
        con = self._connect()
 
376
        try:
 
377
            cur = con.cursor()
 
378
        finally:
 
379
            con.close()
 
380
 
 
381
        # cursor.execute should raise an Error if called after connection
 
382
        # closed
 
383
        self.assertRaises(self.driver.Error,self.executeDDL1,cur)
 
384
 
 
385
        # connection.commit should raise an Error if called after connection'
 
386
        # closed.'
 
387
        self.assertRaises(self.driver.Error,con.commit)
 
388
 
 
389
        # connection.close should raise an Error if called more than once
 
390
#       # disabled, there is no such requirement in DBAPI PEP-0249
 
391
        #self.assertRaises(self.driver.Error,con.close)
 
392
 
 
393
    def test_execute(self):
 
394
        con = self._connect()
 
395
        try:
 
396
            cur = con.cursor()
 
397
            self._paraminsert(cur)
 
398
        finally:
 
399
            con.close()
 
400
 
 
401
    def _paraminsert(self,cur):
 
402
        self.executeDDL1(cur)
 
403
        cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
 
404
            self.table_prefix
 
405
            ))
 
406
        self.assertTrue(cur.rowcount in (-1,1))
 
407
 
 
408
        if self.driver.paramstyle == 'qmark':
 
409
            cur.execute(
 
410
                'insert into %sbooze values (?)' % self.table_prefix,
 
411
                ("Cooper's",)
 
412
                )
 
413
        elif self.driver.paramstyle == 'numeric':
 
414
            cur.execute(
 
415
                'insert into %sbooze values (:1)' % self.table_prefix,
 
416
                ("Cooper's",)
 
417
                )
 
418
        elif self.driver.paramstyle == 'named':
 
419
            cur.execute(
 
420
                'insert into %sbooze values (:beer)' % self.table_prefix, 
 
421
                {'beer':"Cooper's"}
 
422
                )
 
423
        elif self.driver.paramstyle == 'format':
 
424
            cur.execute(
 
425
                'insert into %sbooze values (%%s)' % self.table_prefix,
 
426
                ("Cooper's",)
 
427
                )
 
428
        elif self.driver.paramstyle == 'pyformat':
 
429
            cur.execute(
 
430
                'insert into %sbooze values (%%(beer)s)' % self.table_prefix,
 
431
                {'beer':"Cooper's"}
 
432
                )
 
433
        else:
 
434
            self.fail('Invalid paramstyle')
 
435
        self.assertTrue(cur.rowcount in (-1,1))
 
436
 
 
437
        cur.execute('select name from %sbooze' % self.table_prefix)
 
438
        res = cur.fetchall()
 
439
        self.assertEqual(len(res),2,'cursor.fetchall returned too few rows')
 
440
        beers = [res[0][0],res[1][0]]
 
441
        beers.sort()
 
442
        self.assertEqual(beers[0],"Cooper's",
 
443
            'cursor.fetchall retrieved incorrect data, or data inserted '
 
444
            'incorrectly'
 
445
            )
 
446
        self.assertEqual(beers[1],"Victoria Bitter",
 
447
            'cursor.fetchall retrieved incorrect data, or data inserted '
 
448
            'incorrectly'
 
449
            )
 
450
 
 
451
    def test_executemany(self):
 
452
        con = self._connect()
 
453
        try:
 
454
            cur = con.cursor()
 
455
            self.executeDDL1(cur)
 
456
            largs = [ ("Cooper's",) , ("Boag's",) ]
 
457
            margs = [ {'beer': "Cooper's"}, {'beer': "Boag's"} ]
 
458
            if self.driver.paramstyle == 'qmark':
 
459
                cur.executemany(
 
460
                    'insert into %sbooze values (?)' % self.table_prefix,
 
461
                    largs
 
462
                    )
 
463
            elif self.driver.paramstyle == 'numeric':
 
464
                cur.executemany(
 
465
                    'insert into %sbooze values (:1)' % self.table_prefix,
 
466
                    largs
 
467
                    )
 
468
            elif self.driver.paramstyle == 'named':
 
469
                cur.executemany(
 
470
                    'insert into %sbooze values (:beer)' % self.table_prefix,
 
471
                    margs
 
472
                    )
 
473
            elif self.driver.paramstyle == 'format':
 
474
                cur.executemany(
 
475
                    'insert into %sbooze values (%%s)' % self.table_prefix,
 
476
                    largs
 
477
                    )
 
478
            elif self.driver.paramstyle == 'pyformat':
 
479
                cur.executemany(
 
480
                    'insert into %sbooze values (%%(beer)s)' % (
 
481
                        self.table_prefix
 
482
                        ),
 
483
                    margs
 
484
                    )
 
485
            else:
 
486
                self.fail('Unknown paramstyle')
 
487
            self.assertTrue(cur.rowcount in (-1,2),
 
488
                'insert using cursor.executemany set cursor.rowcount to '
 
489
                'incorrect value %r' % cur.rowcount
 
490
                )
 
491
            cur.execute('select name from %sbooze' % self.table_prefix)
 
492
            res = cur.fetchall()
 
493
            self.assertEqual(len(res),2,
 
494
                'cursor.fetchall retrieved incorrect number of rows'
 
495
                )
 
496
            beers = [res[0][0],res[1][0]]
 
497
            beers.sort()
 
498
            self.assertEqual(beers[0],"Boag's",'incorrect data retrieved')
 
499
            self.assertEqual(beers[1],"Cooper's",'incorrect data retrieved')
 
500
        finally:
 
501
            con.close()
 
502
 
 
503
    def test_fetchone(self):
 
504
        con = self._connect()
 
505
        try:
 
506
            cur = con.cursor()
 
507
 
 
508
            # cursor.fetchone should raise an Error if called before
 
509
            # executing a select-type query
 
510
            self.assertRaises(self.driver.Error,cur.fetchone)
 
511
 
 
512
            # cursor.fetchone should raise an Error if called after
 
513
            # executing a query that cannnot return rows
 
514
            self.executeDDL1(cur)
 
515
            self.assertRaises(self.driver.Error,cur.fetchone)
 
516
 
 
517
            cur.execute('select name from %sbooze' % self.table_prefix)
 
518
            self.assertEqual(cur.fetchone(),None,
 
519
                'cursor.fetchone should return None if a query retrieves '
 
520
                'no rows'
 
521
                )
 
522
            self.assertTrue(cur.rowcount in (-1,0))
 
523
 
 
524
            # cursor.fetchone should raise an Error if called after
 
525
            # executing a query that cannnot return rows
 
526
            cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
 
527
                self.table_prefix
 
528
                ))
 
529
            self.assertRaises(self.driver.Error,cur.fetchone)
 
530
 
 
531
            cur.execute('select name from %sbooze' % self.table_prefix)
 
532
            r = cur.fetchone()
 
533
            self.assertEqual(len(r),1,
 
534
                'cursor.fetchone should have retrieved a single row'
 
535
                )
 
536
            self.assertEqual(r[0],'Victoria Bitter',
 
537
                'cursor.fetchone retrieved incorrect data'
 
538
                )
 
539
            self.assertEqual(cur.fetchone(),None,
 
540
                'cursor.fetchone should return None if no more rows available'
 
541
                )
 
542
            self.assertTrue(cur.rowcount in (-1,1))
 
543
        finally:
 
544
            con.close()
 
545
 
 
546
    samples = [
 
547
        'Carlton Cold',
 
548
        'Carlton Draft',
 
549
        'Mountain Goat',
 
550
        'Redback',
 
551
        'Victoria Bitter',
 
552
        'XXXX'
 
553
        ]
 
554
 
 
555
    def _populate(self):
 
556
        ''' Return a list of sql commands to setup the DB for the fetch
 
557
            tests.
 
558
        '''
 
559
        populate = [
 
560
            "insert into %sbooze values ('%s')" % (self.table_prefix,s) 
 
561
                for s in self.samples
 
562
            ]
 
563
        return populate
 
564
 
 
565
    def test_fetchmany(self):
 
566
        con = self._connect()
 
567
        try:
 
568
            cur = con.cursor()
 
569
 
 
570
            # cursor.fetchmany should raise an Error if called without
 
571
            #issuing a query
 
572
            self.assertRaises(self.driver.Error,cur.fetchmany,4)
 
573
 
 
574
            self.executeDDL1(cur)
 
575
            for sql in self._populate():
 
576
                cur.execute(sql)
 
577
 
 
578
            cur.execute('select name from %sbooze' % self.table_prefix)
 
579
            r = cur.fetchmany()
 
580
            self.assertEqual(len(r),1,
 
581
                'cursor.fetchmany retrieved incorrect number of rows, '
 
582
                'default of arraysize is one.'
 
583
                )
 
584
            cur.arraysize=10
 
585
            r = cur.fetchmany(3) # Should get 3 rows
 
586
            self.assertEqual(len(r),3,
 
587
                'cursor.fetchmany retrieved incorrect number of rows'
 
588
                )
 
589
            r = cur.fetchmany(4) # Should get 2 more
 
590
            self.assertEqual(len(r),2,
 
591
                'cursor.fetchmany retrieved incorrect number of rows'
 
592
                )
 
593
            r = cur.fetchmany(4) # Should be an empty sequence
 
594
            self.assertEqual(len(r),0,
 
595
                'cursor.fetchmany should return an empty sequence after '
 
596
                'results are exhausted'
 
597
            )
 
598
            self.assertTrue(cur.rowcount in (-1,6))
 
599
 
 
600
            # Same as above, using cursor.arraysize
 
601
            cur.arraysize=4
 
602
            cur.execute('select name from %sbooze' % self.table_prefix)
 
603
            r = cur.fetchmany() # Should get 4 rows
 
604
            self.assertEqual(len(r),4,
 
605
                'cursor.arraysize not being honoured by fetchmany'
 
606
                )
 
607
            r = cur.fetchmany() # Should get 2 more
 
608
            self.assertEqual(len(r),2)
 
609
            r = cur.fetchmany() # Should be an empty sequence
 
610
            self.assertEqual(len(r),0)
 
611
            self.assertTrue(cur.rowcount in (-1,6))
 
612
 
 
613
            cur.arraysize=6
 
614
            cur.execute('select name from %sbooze' % self.table_prefix)
 
615
            rows = cur.fetchmany() # Should get all rows
 
616
            self.assertTrue(cur.rowcount in (-1,6))
 
617
            self.assertEqual(len(rows),6)
 
618
            self.assertEqual(len(rows),6)
 
619
            rows = [r[0] for r in rows]
 
620
            rows.sort()
 
621
          
 
622
            # Make sure we get the right data back out
 
623
            for i in range(0,6):
 
624
                self.assertEqual(rows[i],self.samples[i],
 
625
                    'incorrect data retrieved by cursor.fetchmany'
 
626
                    )
 
627
 
 
628
            rows = cur.fetchmany() # Should return an empty list
 
629
            self.assertEqual(len(rows),0,
 
630
                'cursor.fetchmany should return an empty sequence if '
 
631
                'called after the whole result set has been fetched'
 
632
                )
 
633
            self.assertTrue(cur.rowcount in (-1,6))
 
634
 
 
635
            self.executeDDL2(cur)
 
636
            cur.execute('select name from %sbarflys' % self.table_prefix)
 
637
            r = cur.fetchmany() # Should get empty sequence
 
638
            self.assertEqual(len(r),0,
 
639
                'cursor.fetchmany should return an empty sequence if '
 
640
                'query retrieved no rows'
 
641
                )
 
642
            self.assertTrue(cur.rowcount in (-1,0))
 
643
 
 
644
        finally:
 
645
            con.close()
 
646
 
 
647
    def test_fetchall(self):
 
648
        con = self._connect()
 
649
        try:
 
650
            cur = con.cursor()
 
651
            # cursor.fetchall should raise an Error if called
 
652
            # without executing a query that may return rows (such
 
653
            # as a select)
 
654
            self.assertRaises(self.driver.Error, cur.fetchall)
 
655
 
 
656
            self.executeDDL1(cur)
 
657
            for sql in self._populate():
 
658
                cur.execute(sql)
 
659
 
 
660
            # cursor.fetchall should raise an Error if called
 
661
            # after executing a a statement that cannot return rows
 
662
            self.assertRaises(self.driver.Error,cur.fetchall)
 
663
 
 
664
            cur.execute('select name from %sbooze' % self.table_prefix)
 
665
            rows = cur.fetchall()
 
666
            self.assertTrue(cur.rowcount in (-1,len(self.samples)))
 
667
            self.assertEqual(len(rows),len(self.samples),
 
668
                'cursor.fetchall did not retrieve all rows'
 
669
                )
 
670
            rows = [r[0] for r in rows]
 
671
            rows.sort()
 
672
            for i in range(0,len(self.samples)):
 
673
                self.assertEqual(rows[i],self.samples[i],
 
674
                'cursor.fetchall retrieved incorrect rows'
 
675
                )
 
676
            rows = cur.fetchall()
 
677
            self.assertEqual(
 
678
                len(rows),0,
 
679
                'cursor.fetchall should return an empty list if called '
 
680
                'after the whole result set has been fetched'
 
681
                )
 
682
            self.assertTrue(cur.rowcount in (-1,len(self.samples)))
 
683
 
 
684
            self.executeDDL2(cur)
 
685
            cur.execute('select name from %sbarflys' % self.table_prefix)
 
686
            rows = cur.fetchall()
 
687
            self.assertTrue(cur.rowcount in (-1,0))
 
688
            self.assertEqual(len(rows),0,
 
689
                'cursor.fetchall should return an empty list if '
 
690
                'a select query returns no rows'
 
691
                )
 
692
            
 
693
        finally:
 
694
            con.close()
 
695
    
 
696
    def test_mixedfetch(self):
 
697
        con = self._connect()
 
698
        try:
 
699
            cur = con.cursor()
 
700
            self.executeDDL1(cur)
 
701
            for sql in self._populate():
 
702
                cur.execute(sql)
 
703
 
 
704
            cur.execute('select name from %sbooze' % self.table_prefix)
 
705
            rows1  = cur.fetchone()
 
706
            rows23 = cur.fetchmany(2)
 
707
            rows4  = cur.fetchone()
 
708
            rows56 = cur.fetchall()
 
709
            self.assertTrue(cur.rowcount in (-1,6))
 
710
            self.assertEqual(len(rows23),2,
 
711
                'fetchmany returned incorrect number of rows'
 
712
                )
 
713
            self.assertEqual(len(rows56),2,
 
714
                'fetchall returned incorrect number of rows'
 
715
                )
 
716
 
 
717
            rows = [rows1[0]]
 
718
            rows.extend([rows23[0][0],rows23[1][0]])
 
719
            rows.append(rows4[0])
 
720
            rows.extend([rows56[0][0],rows56[1][0]])
 
721
            rows.sort()
 
722
            for i in range(0,len(self.samples)):
 
723
                self.assertEqual(rows[i],self.samples[i],
 
724
                    'incorrect data retrieved or inserted'
 
725
                    )
 
726
        finally:
 
727
            con.close()
 
728
 
 
729
    def help_nextset_setUp(self,cur):
 
730
        ''' Should create a procedure called deleteme
 
731
            that returns two result sets, first the 
 
732
            number of rows in booze then "name from booze"
 
733
        '''
 
734
        raise NotImplementedError('Helper not implemented')
 
735
        #sql="""
 
736
        #    create procedure deleteme as
 
737
        #    begin
 
738
        #        select count(*) from booze
 
739
        #        select name from booze
 
740
        #    end
 
741
        #"""
 
742
        #cur.execute(sql)
 
743
 
 
744
    def help_nextset_tearDown(self,cur):
 
745
        'If cleaning up is needed after nextSetTest'
 
746
        raise NotImplementedError('Helper not implemented')
 
747
        #cur.execute("drop procedure deleteme")
 
748
 
 
749
    def test_nextset(self):
 
750
        con = self._connect()
 
751
        try:
 
752
            cur = con.cursor()
 
753
            if not hasattr(cur,'nextset'):
 
754
                return
 
755
 
 
756
            try:
 
757
                self.executeDDL1(cur)
 
758
                sql=self._populate()
 
759
                for sql in self._populate():
 
760
                    cur.execute(sql)
 
761
 
 
762
                self.help_nextset_setUp(cur)
 
763
 
 
764
                cur.callproc('deleteme')
 
765
                numberofrows=cur.fetchone()
 
766
                self.assertEqual(numberofrows[0], len(self.samples))
 
767
                assert cur.nextset()
 
768
                names=cur.fetchall()
 
769
                assert len(names) == len(self.samples)
 
770
                s=cur.nextset()
 
771
                assert s == None,'No more return sets, should return None'
 
772
            finally:
 
773
                self.help_nextset_tearDown(cur)
 
774
 
 
775
        finally:
 
776
            con.close()
 
777
 
 
778
    #def test_nextset(self):
 
779
    #    raise NotImplementedError('Drivers need to override this test')
 
780
 
 
781
    def test_arraysize(self):
 
782
        # Not much here - rest of the tests for this are in test_fetchmany
 
783
        con = self._connect()
 
784
        try:
 
785
            cur = con.cursor()
 
786
            self.assertTrue(hasattr(cur,'arraysize'),
 
787
                'cursor.arraysize must be defined'
 
788
                )
 
789
        finally:
 
790
            con.close()
 
791
 
 
792
    def test_setinputsizes(self):
 
793
        con = self._connect()
 
794
        try:
 
795
            cur = con.cursor()
 
796
            cur.setinputsizes( (25,) )
 
797
            self._paraminsert(cur) # Make sure cursor still works
 
798
        finally:
 
799
            con.close()
 
800
 
 
801
    def test_setoutputsize_basic(self):
 
802
        # Basic test is to make sure setoutputsize doesn't blow up
 
803
        con = self._connect()
 
804
        try:
 
805
            cur = con.cursor()
 
806
            cur.setoutputsize(1000)
 
807
            cur.setoutputsize(2000,0)
 
808
            self._paraminsert(cur) # Make sure the cursor still works
 
809
        finally:
 
810
            con.close()
 
811
 
 
812
    def test_setoutputsize(self):
 
813
        # Real test for setoutputsize is driver dependant
 
814
        raise NotImplementedError('Driver needed to override this test')
 
815
 
 
816
    def test_None(self):
 
817
        con = self._connect()
 
818
        try:
 
819
            cur = con.cursor()
 
820
            self.executeDDL1(cur)
 
821
            cur.execute('insert into %sbooze values (NULL)' % self.table_prefix)
 
822
            cur.execute('select name from %sbooze' % self.table_prefix)
 
823
            r = cur.fetchall()
 
824
            self.assertEqual(len(r),1)
 
825
            self.assertEqual(len(r[0]),1)
 
826
            self.assertEqual(r[0][0],None,'NULL value not returned as None')
 
827
        finally:
 
828
            con.close()
 
829
 
 
830
    def test_Date(self):
 
831
        d1 = self.driver.Date(2002,12,25)
 
832
        d2 = self.driver.DateFromTicks(time.mktime((2002,12,25,0,0,0,0,0,0)))
 
833
        # Can we assume this? API doesn't specify, but it seems implied
 
834
        # self.assertEqual(str(d1),str(d2))
 
835
 
 
836
    def test_Time(self):
 
837
        t1 = self.driver.Time(13,45,30)
 
838
        t2 = self.driver.TimeFromTicks(time.mktime((2001,1,1,13,45,30,0,0,0)))
 
839
        # Can we assume this? API doesn't specify, but it seems implied
 
840
        # self.assertEqual(str(t1),str(t2))
 
841
 
 
842
    def test_Timestamp(self):
 
843
        t1 = self.driver.Timestamp(2002,12,25,13,45,30)
 
844
        t2 = self.driver.TimestampFromTicks(
 
845
            time.mktime((2002,12,25,13,45,30,0,0,0))
 
846
            )
 
847
        # Can we assume this? API doesn't specify, but it seems implied
 
848
        # self.assertEqual(str(t1),str(t2))
 
849
 
 
850
    def test_Binary(self):
 
851
        b = self.driver.Binary(str2bytes('Something'))
 
852
        b = self.driver.Binary(str2bytes(''))
 
853
 
 
854
    def test_STRING(self):
 
855
        self.assertTrue(hasattr(self.driver,'STRING'),
 
856
            'module.STRING must be defined'
 
857
            )
 
858
 
 
859
    def test_BINARY(self):
 
860
        self.assertTrue(hasattr(self.driver,'BINARY'),
 
861
            'module.BINARY must be defined.'
 
862
            )
 
863
 
 
864
    def test_NUMBER(self):
 
865
        self.assertTrue(hasattr(self.driver,'NUMBER'),
 
866
            'module.NUMBER must be defined.'
 
867
            )
 
868
 
 
869
    def test_DATETIME(self):
 
870
        self.assertTrue(hasattr(self.driver,'DATETIME'),
 
871
            'module.DATETIME must be defined.'
 
872
            )
 
873
 
 
874
    def test_ROWID(self):
 
875
        self.assertTrue(hasattr(self.driver,'ROWID'),
 
876
            'module.ROWID must be defined.'
 
877
            )
 
878