~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_defaults.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import eq_, assert_raises, assert_raises_message
 
1
from sqlalchemy.testing import eq_, assert_raises_message, assert_raises
2
2
import datetime
3
3
from sqlalchemy.schema import CreateSequence, DropSequence
4
 
from sqlalchemy.sql import select, text, literal_column
 
4
from sqlalchemy.sql import select, text
5
5
import sqlalchemy as sa
6
 
from test.lib import testing, engines
 
6
from sqlalchemy import testing
 
7
from sqlalchemy.testing import engines
7
8
from sqlalchemy import MetaData, Integer, String, ForeignKey, Boolean, exc,\
8
9
                Sequence, func, literal, Unicode
9
10
from sqlalchemy.types import TypeDecorator, TypeEngine
10
 
from test.lib.schema import Table, Column
11
 
from test.lib.testing import eq_
 
11
from sqlalchemy.testing.schema import Table, Column
12
12
from sqlalchemy.dialects import sqlite
13
 
from test.lib import fixtures
 
13
from sqlalchemy.testing import fixtures
 
14
 
 
15
t = f = f2 = ts = currenttime = metadata = default_generator = None
14
16
 
15
17
class DefaultTest(fixtures.TestBase):
16
18
 
20
22
 
21
23
        db = testing.db
22
24
        metadata = MetaData(db)
23
 
        default_generator = {'x':50}
 
25
        default_generator = {'x': 50}
24
26
 
25
27
        def mydefault():
26
28
            default_generator['x'] += 1
132
134
 
133
135
    def test_bad_arg_signature(self):
134
136
        ex_msg = \
135
 
          "ColumnDefault Python function takes zero or one positional arguments"
 
137
          "ColumnDefault Python function takes zero "\
 
138
          "or one positional arguments"
136
139
 
137
 
        def fn1(x, y): pass
138
 
        def fn2(x, y, z=3): pass
 
140
        def fn1(x, y):
 
141
            pass
 
142
        def fn2(x, y, z=3):
 
143
            pass
139
144
        class fn3(object):
140
145
            def __init__(self, x, y):
141
146
                pass
150
155
                                     sa.ColumnDefault, fn)
151
156
 
152
157
    def test_arg_signature(self):
153
 
        def fn1(): pass
154
 
        def fn2(): pass
155
 
        def fn3(x=1): pass
156
 
        def fn4(x=1, y=2, z=3): pass
 
158
        def fn1():
 
159
            pass
 
160
        def fn2():
 
161
            pass
 
162
        def fn3(x=1):
 
163
            eq_(x, 1)
 
164
        def fn4(x=1, y=2, z=3):
 
165
            eq_(x, 1)
157
166
        fn5 = list
158
 
        class fn6(object):
 
167
        class fn6a(object):
159
168
            def __init__(self, x):
160
 
                pass
161
 
        class fn6(object):
 
169
                eq_(x, "context")
 
170
        class fn6b(object):
162
171
            def __init__(self, x, y=3):
163
 
                pass
 
172
                eq_(x, "context")
164
173
        class FN7(object):
165
174
            def __call__(self, x):
166
 
                pass
 
175
                eq_(x, "context")
167
176
        fn7 = FN7()
168
177
        class FN8(object):
169
178
            def __call__(self, x, y=3):
170
 
                pass
 
179
                eq_(x, "context")
171
180
        fn8 = FN8()
172
181
 
173
 
        for fn in fn1, fn2, fn3, fn4, fn5, fn6, fn7, fn8:
 
182
        for fn in fn1, fn2, fn3, fn4, fn5, fn6a, fn6b, fn7, fn8:
174
183
            c = sa.ColumnDefault(fn)
 
184
            c.arg("context")
175
185
 
176
186
    @testing.fails_on('firebird', 'Data type unknown')
177
187
    def test_standalone(self):
279
289
        eq_(set(r.context.postfetch_cols),
280
290
            set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]))
281
291
 
282
 
        eq_(t.select(t.c.col1==54).execute().fetchall(),
 
292
        eq_(t.select(t.c.col1 == 54).execute().fetchall(),
283
293
            [(54, 'imthedefault', f, ts, ts, ctexec, True, False,
284
294
              12, today, None)])
285
295
 
287
297
    def test_insertmany(self):
288
298
        # MySQL-Python 1.2.2 breaks functions in execute_many :(
289
299
        if (testing.against('mysql+mysqldb') and
290
 
            testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
 
300
                testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
291
301
            return
292
302
 
293
 
        r = t.insert().execute({}, {}, {})
 
303
        t.insert().execute({}, {}, {})
294
304
 
295
305
        ctexec = currenttime.scalar()
296
306
        l = t.select().execute()
333
343
        assert_raises_message(exc.StatementError,
334
344
            "A value is required for bind parameter 'col7', in parameter group 1",
335
345
            t.insert().execute,
336
 
            {'col4':7, 'col7':12, 'col8':19},
337
 
            {'col4':7, 'col8':19},
338
 
            {'col4':7, 'col7':12, 'col8':19},
 
346
            {'col4': 7, 'col7': 12, 'col8': 19},
 
347
            {'col4': 7, 'col8': 19},
 
348
            {'col4': 7, 'col7': 12, 'col8': 19},
339
349
        )
340
350
 
341
351
    def test_insert_values(self):
342
 
        t.insert(values={'col3':50}).execute()
 
352
        t.insert(values={'col3': 50}).execute()
343
353
        l = t.select().execute()
344
354
        eq_(50, l.first()['col3'])
345
355
 
347
357
    def test_updatemany(self):
348
358
        # MySQL-Python 1.2.2 breaks functions in execute_many :(
349
359
        if (testing.against('mysql+mysqldb') and
350
 
            testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
 
360
                testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2)):
351
361
            return
352
362
 
353
363
        t.insert().execute({}, {}, {})
354
364
 
355
 
        t.update(t.c.col1==sa.bindparam('pkval')).execute(
356
 
            {'pkval':51,'col7':None, 'col8':None, 'boolcol1':False})
 
365
        t.update(t.c.col1 == sa.bindparam('pkval')).execute(
 
366
            {'pkval': 51, 'col7': None, 'col8': None, 'boolcol1': False})
357
367
 
358
 
        t.update(t.c.col1==sa.bindparam('pkval')).execute(
359
 
            {'pkval':51,},
360
 
            {'pkval':52,},
361
 
            {'pkval':53,})
 
368
        t.update(t.c.col1 == sa.bindparam('pkval')).execute(
 
369
            {'pkval': 51},
 
370
            {'pkval': 52},
 
371
            {'pkval': 53})
362
372
 
363
373
        l = t.select().execute()
364
374
        ctexec = currenttime.scalar()
375
385
    def test_update(self):
376
386
        r = t.insert().execute()
377
387
        pk = r.inserted_primary_key[0]
378
 
        t.update(t.c.col1==pk).execute(col4=None, col5=None)
 
388
        t.update(t.c.col1 == pk).execute(col4=None, col5=None)
379
389
        ctexec = currenttime.scalar()
380
 
        l = t.select(t.c.col1==pk).execute()
 
390
        l = t.select(t.c.col1 == pk).execute()
381
391
        l = l.first()
382
392
        eq_(l,
383
393
            (pk, 'im the update', f2, None, None, ctexec, True, False,
388
398
    def test_update_values(self):
389
399
        r = t.insert().execute()
390
400
        pk = r.inserted_primary_key[0]
391
 
        t.update(t.c.col1==pk, values={'col3': 55}).execute()
392
 
        l = t.select(t.c.col1==pk).execute()
 
401
        t.update(t.c.col1 == pk, values={'col3': 55}).execute()
 
402
        l = t.select(t.c.col1 == pk).execute()
393
403
        l = l.first()
394
404
        eq_(55, l['col3'])
395
405
 
420
430
        if not returning and not testing.db.dialect.implicit_returning:
421
431
            engine = testing.db
422
432
        else:
423
 
            engine = engines.testing_engine(options={'implicit_returning':returning})
 
433
            engine = engines.testing_engine(
 
434
                            options={'implicit_returning': returning})
424
435
        engine.execute(t2.insert(), nextid=1)
425
436
        r = engine.execute(t1.insert(), data='hi')
426
437
        eq_([1], r.inserted_primary_key)
464
475
        self.assert_(last not in ids)
465
476
        ids.add(last)
466
477
 
467
 
        rs = bind.execute(aitable.insert(values={'int1':func.length('four')}))
 
478
        rs = bind.execute(aitable.insert(values={'int1': func.length('four')}))
468
479
        last = rs.inserted_primary_key[0]
469
480
        self.assert_(last)
470
481
        self.assert_(last not in ids)
471
482
        ids.add(last)
472
483
 
473
 
        eq_(ids, set([1,2,3,4]))
 
484
        eq_(ids, set([1, 2, 3, 4]))
474
485
 
475
486
        eq_(list(bind.execute(aitable.select().order_by(aitable.c.id))),
476
487
            [(1, 1, None), (2, None, 'row 2'), (3, 3, 'row 3'), (4, 4, None)])
499
510
class EmptyInsertTest(fixtures.TestBase):
500
511
    @testing.exclude('sqlite', '<', (3, 3, 8), 'no empty insert support')
501
512
    @testing.fails_on('oracle', 'FIXME: unknown')
 
513
    @testing.provide_metadata
502
514
    def test_empty_insert(self):
503
 
        metadata = MetaData(testing.db)
504
 
        t1 = Table('t1', metadata,
 
515
        t1 = Table('t1', self.metadata,
505
516
                Column('is_true', Boolean, server_default=('1')))
506
 
        metadata.create_all()
507
 
 
508
 
        try:
509
 
            result = t1.insert().execute()
510
 
            eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
511
 
            eq_(True, t1.select().scalar())
512
 
        finally:
513
 
            metadata.drop_all()
 
517
        self.metadata.create_all()
 
518
        t1.insert().execute()
 
519
        eq_(1, select([func.count(text('*'))], from_obj=t1).scalar())
 
520
        eq_(True, t1.select().scalar())
514
521
 
515
522
class AutoIncrementTest(fixtures.TablesTest):
516
523
    __requires__ = ('identity',)
554
561
 
555
562
    def test_autoincrement_ignore_fk(self):
556
563
        m = MetaData()
557
 
        y = Table('y', m,
 
564
        Table('y', m,
558
565
            Column('id', Integer(), primary_key=True)
559
566
        )
560
567
        x = Table('x', m,
566
573
 
567
574
    def test_autoincrement_fk_disqualifies(self):
568
575
        m = MetaData()
569
 
        y = Table('y', m,
 
576
        Table('y', m,
570
577
            Column('id', Integer(), primary_key=True)
571
578
        )
572
579
        x = Table('x', m,
584
591
            Column('data', String(20)))
585
592
        nonai.create()
586
593
 
587
 
 
588
 
        try:
 
594
        def go():
589
595
            # postgresql + mysql strict will fail on first row,
590
596
            # mysql in legacy mode fails on second row
591
597
            nonai.insert().execute(data='row 1')
592
598
            nonai.insert().execute(data='row 2')
593
 
            assert False
594
 
        except sa.exc.DBAPIError, e:
595
 
            assert True
 
599
        assert_raises(
 
600
            sa.exc.DBAPIError,
 
601
            go
 
602
        )
596
603
 
597
604
        nonai.insert().execute(id=1, data='row 1')
598
605
 
630
637
 
631
638
    @classmethod
632
639
    def setup_class(cls):
633
 
        cls.seq= Sequence("my_sequence")
 
640
        cls.seq = Sequence("my_sequence")
634
641
        cls.seq.create(testing.db)
635
642
 
636
643
    @classmethod
693
700
            Column('x', Integer)
694
701
        )
695
702
        t1.create(testing.db)
696
 
        testing.db.execute(t1.insert(), [{'x':1}, {'x':300}, {'x':301}])
 
703
        testing.db.execute(t1.insert(), [{'x': 1}, {'x': 300}, {'x': 301}])
697
704
        s = Sequence("my_sequence")
698
705
        eq_(
699
706
            testing.db.execute(
725
732
        pk_col=next_value(), implicit returning is not used."""
726
733
 
727
734
        metadata = self.metadata
728
 
        e = engines.testing_engine(options={'implicit_returning':False})
 
735
        e = engines.testing_engine(options={'implicit_returning': False})
729
736
        s = Sequence("my_sequence")
730
737
        metadata.bind = e
731
738
        t1 = Table('t', metadata,
744
751
        pk_col=next_value(), when implicit returning is used."""
745
752
 
746
753
        metadata = self.metadata
747
 
        e = engines.testing_engine(options={'implicit_returning':True})
 
754
        e = engines.testing_engine(options={'implicit_returning': True})
748
755
        s = Sequence("my_sequence")
749
756
        metadata.bind = e
750
757
        t1 = Table('t', metadata,
790
797
                Sequence("my_seq", optional=True)):
791
798
            assert str(s.next_value().
792
799
                    compile(dialect=testing.db.dialect)) in (
793
 
                "nextval('my_seq')",
794
 
                "gen_id(my_seq, 1)",
795
 
                "my_seq.nextval",
796
 
            )
 
800
                        "nextval('my_seq')",
 
801
                        "gen_id(my_seq, 1)",
 
802
                        "my_seq.nextval",
 
803
                    )
797
804
 
798
805
    def test_nextval_unsupported(self):
799
806
        """test next_value() used on non-sequence platform
819
826
 
820
827
    def test_checkfirst_metadata(self):
821
828
        m = MetaData()
822
 
        s = Sequence("my_sequence", metadata=m)
 
829
        Sequence("my_sequence", metadata=m)
823
830
        m.create_all(testing.db, checkfirst=False)
824
831
        assert self._has_sequence('my_sequence')
825
832
        m.create_all(testing.db, checkfirst=True)
841
848
    @testing.provide_metadata
842
849
    def test_table_overrides_metadata_create(self):
843
850
        metadata = self.metadata
844
 
        s1 = Sequence("s1", metadata=metadata)
 
851
        Sequence("s1", metadata=metadata)
845
852
        s2 = Sequence("s2", metadata=metadata)
846
853
        s3 = Sequence("s3")
847
854
        t = Table('t', metadata,
868
875
        assert not self._has_sequence('s1')
869
876
        assert not self._has_sequence('s2')
870
877
 
871
 
 
 
878
cartitems = sometable = metadata = None
872
879
class TableBoundSequenceTest(fixtures.TestBase):
873
880
    __requires__ = ('sequences',)
874
881
 
881
888
            Column("description", String(40)),
882
889
            Column("createdate", sa.DateTime())
883
890
        )
884
 
        sometable = Table( 'Manager', metadata,
885
 
               Column('obj_id', Integer, Sequence('obj_id_seq'), ),
886
 
               Column('name', String(128)),
887
 
               Column('id', Integer, Sequence('Manager_id_seq', optional=True),
 
891
        sometable = Table('Manager', metadata,
 
892
                Column('obj_id', Integer, Sequence('obj_id_seq')),
 
893
                Column('name', String(128)),
 
894
                Column('id', Integer, Sequence('Manager_id_seq', optional=True),
888
895
                      primary_key=True),
889
896
           )
890
897
 
912
919
    def test_seq_nonpk(self):
913
920
        """test sequences fire off as defaults on non-pk columns"""
914
921
 
915
 
        engine = engines.testing_engine(options={'implicit_returning':False})
 
922
        engine = engines.testing_engine(
 
923
                            options={'implicit_returning': False})
916
924
        result = engine.execute(sometable.insert(), name="somename")
917
925
 
918
926
        assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
921
929
        assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
922
930
 
923
931
        sometable.insert().execute(
924
 
            {'name':'name3'},
925
 
            {'name':'name4'})
 
932
            {'name': 'name3'},
 
933
            {'name': 'name4'})
926
934
        eq_(sometable.select().order_by(sometable.c.id).execute().fetchall(),
927
935
            [(1, "somename", 1),
928
936
             (2, "someother", 2),
971
979
        r = t.insert().values(data=5).execute()
972
980
 
973
981
        # we don't pre-fetch 'server_default'.
974
 
        if 'server_default' in kw and (not testing.db.dialect.implicit_returning or not implicit_returning):
 
982
        if 'server_default' in kw and (not
 
983
                    testing.db.dialect.implicit_returning or
 
984
                    not implicit_returning):
975
985
            eq_(r.inserted_primary_key, [None])
976
986
        else:
977
987
            eq_(r.inserted_primary_key, ['INT_1'])
1100
1110
    @testing.provide_metadata
1101
1111
    def test_int_default_none_on_insert_reflected(self):
1102
1112
        metadata = self.metadata
1103
 
        t = Table('x', metadata,
 
1113
        Table('x', metadata,
1104
1114
                Column('y', Integer,
1105
1115
                        server_default='5', primary_key=True),
1106
1116
                Column('data', String(10)),
1144
1154
 
1145
1155
class UnicodeDefaultsTest(fixtures.TestBase):
1146
1156
    def test_no_default(self):
1147
 
        c = Column(Unicode(32))
 
1157
        Column(Unicode(32))
1148
1158
 
1149
1159
    def test_unicode_default(self):
1150
1160
        # Py3K
1152
1162
        # Py2K
1153
1163
        default = u'foo'
1154
1164
        # end Py2K
1155
 
        c = Column(Unicode(32), default=default)
 
1165
        Column(Unicode(32), default=default)
1156
1166
 
1157
1167
 
1158
1168
    def test_nonunicode_default(self):