~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/engine/test_reflection.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import eq_, assert_raises, assert_raises_message
2
 
import StringIO, unicodedata
3
 
from sqlalchemy import types as sql_types
4
 
from sqlalchemy import schema, events, event
5
 
from sqlalchemy.engine.reflection import Inspector
 
1
import unicodedata
 
2
import sqlalchemy as sa
 
3
from sqlalchemy import schema, events, event, inspect
6
4
from sqlalchemy import MetaData, Integer, String
7
 
from test.lib.schema import Table, Column
8
 
import sqlalchemy as sa
9
 
from test.lib import ComparesTables, \
10
 
                            testing, engines, AssertsCompiledSQL
11
 
from test.lib import fixtures
12
 
 
13
 
create_inspector = Inspector.from_engine
 
5
from sqlalchemy.testing import ComparesTables, \
 
6
                            engines, AssertsCompiledSQL, fixtures
 
7
from sqlalchemy.testing.schema import Table, Column
 
8
from sqlalchemy.testing import eq_, assert_raises, assert_raises_message
 
9
from sqlalchemy import testing
14
10
 
15
11
metadata, users = None, None
16
12
 
30
26
            Column('test1', sa.CHAR(5), nullable=False),
31
27
            Column('test2', sa.Float(5), nullable=False),
32
28
            Column('test3', sa.Text),
33
 
            Column('test4', sa.Numeric(10, 2), nullable = False),
 
29
            Column('test4', sa.Numeric(10, 2), nullable=False),
34
30
            Column('test5', sa.Date),
35
31
            Column('parent_user_id', sa.Integer,
36
32
                   sa.ForeignKey('engine_users.user_id')),
66
62
    @testing.provide_metadata
67
63
    def test_two_foreign_keys(self):
68
64
        meta = self.metadata
69
 
        t1 = Table(
 
65
        Table(
70
66
            't1',
71
67
            meta,
72
68
            Column('id', sa.Integer, primary_key=True),
74
70
            Column('t3id', sa.Integer, sa.ForeignKey('t3.id')),
75
71
            test_needs_fk=True,
76
72
            )
77
 
        t2 = Table('t2', meta, Column('id', sa.Integer,
78
 
                   primary_key=True), test_needs_fk=True)
79
 
        t3 = Table('t3', meta, Column('id', sa.Integer,
80
 
                   primary_key=True), test_needs_fk=True)
 
73
        Table('t2', meta,
 
74
                    Column('id', sa.Integer, primary_key=True),
 
75
                    test_needs_fk=True)
 
76
        Table('t3', meta,
 
77
                    Column('id', sa.Integer, primary_key=True),
 
78
                    test_needs_fk=True)
81
79
        meta.create_all()
82
80
        meta2 = MetaData()
83
81
        t1r, t2r, t3r = [Table(x, meta2, autoload=True,
122
120
    def test_extend_existing(self):
123
121
        meta = self.metadata
124
122
 
125
 
        t1 = Table('t', meta,
 
123
        Table('t', meta,
126
124
            Column('id', Integer, primary_key=True),
127
125
            Column('x', Integer),
128
126
            Column('y', Integer),
203
201
        assert len(t2.indexes) == 2
204
202
 
205
203
    @testing.provide_metadata
206
 
    def test_autoload_replace_foreign_key(self):
207
 
        a = Table('a', self.metadata, Column('id', Integer, primary_key=True))
208
 
        b = Table('b', self.metadata, Column('id', Integer, primary_key=True),
 
204
    def test_autoload_replace_foreign_key_nonpresent(self):
 
205
        """test autoload_replace=False with col plus FK
 
206
        establishes the FK not present in the DB.
 
207
 
 
208
        """
 
209
        Table('a', self.metadata, Column('id', Integer, primary_key=True))
 
210
        Table('b', self.metadata, Column('id', Integer, primary_key=True),
209
211
                                    Column('a_id', Integer))
210
212
        self.metadata.create_all()
211
213
 
221
223
        eq_(len(b2.constraints), 2)
222
224
 
223
225
    @testing.provide_metadata
 
226
    def test_autoload_replace_foreign_key_ispresent(self):
 
227
        """test autoload_replace=False with col plus FK mirroring
 
228
        DB-reflected FK skips the reflected FK and installs
 
229
        the in-python one only.
 
230
 
 
231
        """
 
232
        Table('a', self.metadata, Column('id', Integer, primary_key=True))
 
233
        Table('b', self.metadata, Column('id', Integer, primary_key=True),
 
234
                                    Column('a_id', Integer, sa.ForeignKey('a.id')))
 
235
        self.metadata.create_all()
 
236
 
 
237
        m2 = MetaData()
 
238
        b2 = Table('b', m2, Column('a_id', Integer, sa.ForeignKey('a.id')))
 
239
        a2 = Table('a', m2, autoload=True, autoload_with=testing.db)
 
240
        b2 = Table('b', m2, extend_existing=True, autoload=True,
 
241
                                autoload_with=testing.db,
 
242
                                autoload_replace=False)
 
243
 
 
244
        assert b2.c.id is not None
 
245
        assert b2.c.a_id.references(a2.c.id)
 
246
        eq_(len(b2.constraints), 2)
 
247
 
 
248
    @testing.provide_metadata
 
249
    def test_autoload_replace_foreign_key_removed(self):
 
250
        """test autoload_replace=False with col minus FK that's in the
 
251
        DB means the FK is skipped and doesn't get installed at all.
 
252
 
 
253
        """
 
254
        Table('a', self.metadata, Column('id', Integer, primary_key=True))
 
255
        Table('b', self.metadata, Column('id', Integer, primary_key=True),
 
256
                                    Column('a_id', Integer, sa.ForeignKey('a.id')))
 
257
        self.metadata.create_all()
 
258
 
 
259
        m2 = MetaData()
 
260
        b2 = Table('b', m2, Column('a_id', Integer))
 
261
        a2 = Table('a', m2, autoload=True, autoload_with=testing.db)
 
262
        b2 = Table('b', m2, extend_existing=True, autoload=True,
 
263
                                autoload_with=testing.db,
 
264
                                autoload_replace=False)
 
265
 
 
266
        assert b2.c.id is not None
 
267
        assert not b2.c.a_id.references(a2.c.id)
 
268
        eq_(len(b2.constraints), 1)
 
269
 
 
270
    @testing.provide_metadata
224
271
    def test_autoload_replace_primary_key(self):
225
 
        a = Table('a', self.metadata, Column('id', Integer))
 
272
        Table('a', self.metadata, Column('id', Integer))
226
273
        self.metadata.create_all()
227
274
 
228
275
        m2 = MetaData()
246
293
        """
247
294
 
248
295
        meta = self.metadata
249
 
        t1 = Table('test', meta,
 
296
        Table('test', meta,
250
297
            Column('id', sa.Integer, primary_key=True),
251
298
            Column('data', sa.String(50)),
252
299
            mysql_engine='MyISAM'
253
300
        )
254
 
        t2 = Table('test2', meta,
 
301
        Table('test2', meta,
255
302
            Column('id', sa.Integer, sa.ForeignKey('test.id'),
256
303
                                        primary_key=True),
257
304
            Column('id2', sa.Integer, primary_key=True),
316
363
        a primary key column"""
317
364
 
318
365
        meta = self.metadata
319
 
        users = Table('users', meta,
 
366
        Table('users', meta,
320
367
            Column('id', sa.Integer, primary_key=True),
321
368
            Column('name', sa.String(30)))
322
 
        addresses = Table('addresses', meta,
 
369
        Table('addresses', meta,
323
370
            Column('id', sa.Integer, primary_key=True),
324
371
            Column('street', sa.String(30)))
325
372
 
334
381
 
335
382
        assert list(a2.primary_key) == [a2.c.id]
336
383
        assert list(u2.primary_key) == [u2.c.id]
337
 
        assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
 
384
        assert u2.join(a2).onclause.compare(u2.c.id == a2.c.id)
338
385
 
339
386
        meta3 = MetaData(testing.db)
340
387
        u3 = Table('users', meta3, autoload=True)
345
392
 
346
393
        assert list(a3.primary_key) == [a3.c.id]
347
394
        assert list(u3.primary_key) == [u3.c.id]
348
 
        assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
 
395
        assert u3.join(a3).onclause.compare(u3.c.id == a3.c.id)
349
396
 
350
397
    @testing.provide_metadata
351
398
    def test_override_nonexistent_fk(self):
354
401
        is common with MySQL MyISAM tables."""
355
402
 
356
403
        meta = self.metadata
357
 
        users = Table('users', meta,
 
404
        Table('users', meta,
358
405
            Column('id', sa.Integer, primary_key=True),
359
406
            Column('name', sa.String(30)))
360
 
        addresses = Table('addresses', meta,
 
407
        Table('addresses', meta,
361
408
            Column('id', sa.Integer, primary_key=True),
362
409
            Column('street', sa.String(30)),
363
410
            Column('user_id', sa.Integer))
365
412
        meta.create_all()
366
413
        meta2 = MetaData(testing.db)
367
414
        a2 = Table('addresses', meta2,
368
 
                Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
 
415
                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
369
416
               autoload=True)
370
417
        u2 = Table('users', meta2, autoload=True)
371
418
        assert len(a2.c.user_id.foreign_keys) == 1
414
461
 
415
462
        metadata = self.metadata
416
463
 
417
 
        a = Table('a',
 
464
        Table('a',
418
465
            metadata,
419
466
            Column('x', sa.Integer, primary_key=True),
420
467
            Column('y', sa.Integer, primary_key=True),
421
468
        )
422
469
 
423
 
        b = Table('b',
 
470
        Table('b',
424
471
            metadata,
425
472
            Column('x', sa.Integer, primary_key=True),
426
473
            Column('y', sa.Integer, primary_key=True),
453
500
        and that ForeignKey targeting during reflection still works."""
454
501
 
455
502
        meta = self.metadata
456
 
        a1 = Table('a', meta,
 
503
        Table('a', meta,
457
504
            Column('x', sa.Integer, primary_key=True),
458
505
            Column('z', sa.Integer),
459
506
            test_needs_fk=True
460
507
        )
461
 
        b1 = Table('b', meta,
 
508
        Table('b', meta,
462
509
            Column('y', sa.Integer, sa.ForeignKey('a.x')),
463
510
            test_needs_fk=True
464
511
        )
480
527
        """
481
528
 
482
529
        meta = self.metadata
483
 
        a1 = Table('a', meta,
 
530
        Table('a', meta,
484
531
            Column('x', sa.Integer, primary_key=True),
485
532
            Column('z', sa.Integer),
486
533
            test_needs_fk=True
487
534
        )
488
 
        b1 = Table('b', meta,
 
535
        Table('b', meta,
489
536
            Column('y', sa.Integer, sa.ForeignKey('a.x')),
490
537
            test_needs_fk=True
491
538
        )
504
551
        have that foreign key, and that the FK is not duped. """
505
552
 
506
553
        meta = self.metadata
507
 
        users = Table('users', meta,
 
554
        Table('users', meta,
508
555
            Column('id', sa.Integer, primary_key=True),
509
556
            Column('name', sa.String(30)),
510
557
            test_needs_fk=True)
511
 
        addresses = Table('addresses', meta,
 
558
        Table('addresses', meta,
512
559
            Column('id', sa.Integer, primary_key=True),
513
560
            Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
514
561
            test_needs_fk=True)
516
563
        meta.create_all()
517
564
        meta2 = MetaData(testing.db)
518
565
        a2 = Table('addresses', meta2,
519
 
                Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
 
566
                Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
520
567
                autoload=True)
521
568
        u2 = Table('users', meta2, autoload=True)
522
569
        s = sa.select([a2])
575
622
 
576
623
    def test_fk_error(self):
577
624
        metadata = MetaData(testing.db)
578
 
        slots_table = Table('slots', metadata,
 
625
        Table('slots', metadata,
579
626
            Column('slot_id', sa.Integer, primary_key=True),
580
627
            Column('pkg_id', sa.Integer, sa.ForeignKey('pkgs.pkg_id')),
581
628
            Column('slot', sa.String(128)),
595
642
            id INTEGER NOT NULL,
596
643
            isbn VARCHAR(50) NOT NULL,
597
644
            title VARCHAR(100) NOT NULL,
598
 
            series INTEGER,
599
 
            series_id INTEGER,
 
645
            series INTEGER NOT NULL,
 
646
            series_id INTEGER NOT NULL,
600
647
            UNIQUE(series, series_id),
601
648
            PRIMARY KEY(id, isbn)
602
649
        )""")
680
727
            check_col = 'true'
681
728
        quoter = meta.bind.dialect.identifier_preparer.quote_identifier
682
729
 
683
 
        table_b = Table('false', meta,
 
730
        Table('false', meta,
684
731
                    Column('create', sa.Integer, primary_key=True),
685
 
                    Column('true', sa.Integer,sa.ForeignKey('select.not')),
 
732
                    Column('true', sa.Integer, sa.ForeignKey('select.not')),
686
733
                    sa.CheckConstraint('%s <> 1'
687
734
                        % quoter(check_col), name='limit')
688
735
                    )
696
743
        meta.create_all()
697
744
        index_c.drop()
698
745
        meta2 = MetaData(testing.db)
699
 
        table_a2 = Table('select', meta2, autoload=True)
700
 
        table_b2 = Table('false', meta2, autoload=True)
701
 
        table_c2 = Table('is', meta2, autoload=True)
 
746
        Table('select', meta2, autoload=True)
 
747
        Table('false', meta2, autoload=True)
 
748
        Table('is', meta2, autoload=True)
 
749
 
 
750
    @testing.provide_metadata
 
751
    def _test_reflect_uses_bind(self, fn):
 
752
        from sqlalchemy.pool import AssertionPool
 
753
        e = engines.testing_engine(options={"poolclass": AssertionPool})
 
754
        fn(e)
 
755
 
 
756
    @testing.uses_deprecated
 
757
    def test_reflect_uses_bind_constructor_conn(self):
 
758
        self._test_reflect_uses_bind(lambda e: MetaData(e.connect(),
 
759
                    reflect=True))
 
760
 
 
761
    @testing.uses_deprecated
 
762
    def test_reflect_uses_bind_constructor_engine(self):
 
763
        self._test_reflect_uses_bind(lambda e: MetaData(e, reflect=True))
 
764
 
 
765
    def test_reflect_uses_bind_constructor_conn_reflect(self):
 
766
        self._test_reflect_uses_bind(lambda e: MetaData(e.connect()).reflect())
 
767
 
 
768
    def test_reflect_uses_bind_constructor_engine_reflect(self):
 
769
        self._test_reflect_uses_bind(lambda e: MetaData(e).reflect())
 
770
 
 
771
    def test_reflect_uses_bind_conn_reflect(self):
 
772
        self._test_reflect_uses_bind(lambda e: MetaData().reflect(e.connect()))
 
773
 
 
774
    def test_reflect_uses_bind_engine_reflect(self):
 
775
        self._test_reflect_uses_bind(lambda e: MetaData().reflect(e))
702
776
 
703
777
    @testing.provide_metadata
704
778
    def test_reflect_all(self):
705
779
        existing = testing.db.table_names()
706
780
 
707
 
        names = ['rt_%s' % name for name in ('a','b','c','d','e')]
 
781
        names = ['rt_%s' % name for name in ('a', 'b', 'c', 'd', 'e')]
708
782
        nameset = set(names)
709
783
        for name in names:
710
784
            # be sure our starting environment is sane
745
819
        m6.reflect(only=lambda n, m: False)
746
820
        self.assert_(not m6.tables)
747
821
 
748
 
        m7 = MetaData(testing.db, reflect=True)
 
822
        m7 = MetaData(testing.db)
 
823
        m7.reflect()
749
824
        self.assert_(nameset.issubset(set(m7.tables.keys())))
750
825
 
751
 
        try:
752
 
            m8 = MetaData(reflect=True)
753
 
            self.assert_(False)
754
 
        except sa.exc.ArgumentError, e:
755
 
            self.assert_(e.args[0]
756
 
                         == 'A bind must be supplied in '
757
 
                         'conjunction with reflect=True')
 
826
        m8 = MetaData()
 
827
        assert_raises(
 
828
            sa.exc.UnboundExecutionError,
 
829
            m8.reflect
 
830
        )
758
831
 
759
832
        if existing:
760
833
            print "Other tables present in database, skipping some checks."
771
844
        assert not c.closed
772
845
 
773
846
    def test_inspector_conn_closing(self):
774
 
        m1 = MetaData()
775
847
        c = testing.db.connect()
776
 
        i = Inspector.from_engine(testing.db)
 
848
        inspect(c)
777
849
        assert not c.closed
778
850
 
779
851
    @testing.provide_metadata
783
855
            Column('id', sa.Integer, nullable=False),
784
856
            Column('name', sa.String(20), index=True)
785
857
            )
786
 
        i1 = sa.Index('idx1', t1.c.id, unique=True)
787
 
        i2 = sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
 
858
        sa.Index('idx1', t1.c.id, unique=True)
 
859
        sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
788
860
        m1.create_all()
789
861
        m2 = MetaData(testing.db)
790
862
        t2 = Table('party', m2, autoload=True)
862
934
                    Column('user_id', sa.Integer,
863
935
                      sa.Sequence('user_id_seq', optional=True),
864
936
                      primary_key=True),
865
 
                    Column('user_name',sa.String(40)))
 
937
                    Column('user_name', sa.String(40)))
866
938
 
867
 
        addresses = Table('email_addresses', metadata,
 
939
        Table('email_addresses', metadata,
868
940
                      Column('address_id', sa.Integer,
869
941
                          sa.Sequence('address_id_seq', optional=True),
870
942
                          primary_key=True),
872
944
                          sa.Integer, sa.ForeignKey(users.c.user_id)),
873
945
                      Column('email_address', sa.String(40)))
874
946
 
875
 
        orders = Table(
 
947
        Table(
876
948
            'orders',
877
949
            metadata,
878
950
            Column('order_id', sa.Integer, sa.Sequence('order_id_seq',
882
954
            Column('description', sa.String(50)),
883
955
            Column('isopen', sa.Integer),
884
956
            )
885
 
        orderitems = Table('items', metadata, Column('item_id', sa.INT,
 
957
        Table('items', metadata,
 
958
                        Column('item_id', sa.INT,
886
959
                           sa.Sequence('items_id_seq', optional=True),
887
 
                           primary_key=True), Column('order_id',
 
960
                               primary_key=True),
 
961
                        Column('order_id',
888
962
                           sa.INT, sa.ForeignKey('orders')),
889
963
                           Column('item_name', sa.VARCHAR(50)))
890
964
 
891
 
    def test_sorter( self ):
 
965
    def test_sorter(self):
892
966
        tables = metadata.sorted_tables
893
967
        table_names = [t.name for t in tables]
894
968
        ua = [n for n in table_names if n in ('users', 'email_addresses')]
914
988
 
915
989
    def test_createdrop(self):
916
990
        metadata.create_all(bind=testing.db)
917
 
        eq_( testing.db.has_table('items'), True )
918
 
        eq_( testing.db.has_table('email_addresses'), True )
 
991
        eq_(testing.db.has_table('items'), True)
 
992
        eq_(testing.db.has_table('email_addresses'), True)
919
993
        metadata.create_all(bind=testing.db)
920
 
        eq_( testing.db.has_table('items'), True )
 
994
        eq_(testing.db.has_table('items'), True)
921
995
 
922
996
        metadata.drop_all(bind=testing.db)
923
 
        eq_( testing.db.has_table('items'), False )
924
 
        eq_( testing.db.has_table('email_addresses'), False )
 
997
        eq_(testing.db.has_table('items'), False)
 
998
        eq_(testing.db.has_table('email_addresses'), False)
925
999
        metadata.drop_all(bind=testing.db)
926
 
        eq_( testing.db.has_table('items'), False )
 
1000
        eq_(testing.db.has_table('items'), False)
927
1001
 
928
1002
    def test_tablenames(self):
929
1003
        metadata.create_all(bind=testing.db)
946
1020
                        Column('id', sa.Integer),
947
1021
                        Column('user_id', sa.Integer))
948
1022
 
949
 
        fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id])
 
1023
        fk = sa.ForeignKeyConstraint(['user_id'], [users.c.id])
950
1024
 
951
1025
        addresses.append_constraint(fk)
952
1026
        addresses.append_constraint(fk)
960
1034
        testing.db.connect().close()
961
1035
 
962
1036
        cls.bind = bind = engines.utf8_engine(
963
 
                            options={'convert_unicode' : True})
 
1037
                            options={'convert_unicode': True})
964
1038
 
965
1039
        cls.metadata = metadata = MetaData()
966
1040
 
984
1058
        # are really limited unless you're on PG or SQLite
985
1059
 
986
1060
        # forget about it on these backends
987
 
        if testing.against('sybase', 'maxdb', 'oracle'):
 
1061
        if not testing.requires.unicode_ddl.enabled:
988
1062
            names = no_multibyte_period
989
1063
        # mysql can't handle casing usually
990
1064
        elif testing.against("mysql") and \
991
 
            not testing.requires._has_mysql_fully_case_sensitive():
 
1065
                not testing.requires._has_mysql_fully_case_sensitive():
992
1066
            names = no_multibyte_period.union(no_case_sensitivity)
993
1067
        # mssql + pyodbc + freetds can't compare multibyte names to
994
1068
        # information_schema.tables.table_name
998
1072
            names = no_multibyte_period.union(full)
999
1073
 
1000
1074
        for tname, cname, ixname in names:
1001
 
            t = Table(tname, metadata, Column('id', sa.Integer,
1002
 
                  sa.Sequence(cname + '_id_seq'), primary_key=True),
1003
 
                  Column(cname, Integer)
 
1075
            t = Table(tname, metadata,
 
1076
                        Column('id', sa.Integer,
 
1077
                                sa.Sequence(cname + '_id_seq'),
 
1078
                                primary_key=True),
 
1079
                        Column(cname, Integer)
1004
1080
                  )
1005
1081
            schema.Index(ixname, t.c[cname])
1006
1082
 
1031
1107
 
1032
1108
        # Jython 2.5 on Java 5 lacks unicodedata.normalize
1033
1109
 
1034
 
        if not names.issubset(reflected) and hasattr(unicodedata,'normalize'):
 
1110
        if not names.issubset(reflected) and hasattr(unicodedata, 'normalize'):
1035
1111
 
1036
1112
            # Python source files in the utf-8 coding seem to
1037
1113
            # normalize literals as NFC (and the above are
1044
1120
            # Yep.  But still ensure that bulk reflection and
1045
1121
            # create/drop work with either normalization.
1046
1122
 
1047
 
        r = MetaData(bind, reflect=True)
 
1123
        r = MetaData(bind)
 
1124
        r.reflect()
1048
1125
        r.drop_all(checkfirst=False)
1049
1126
        r.create_all(checkfirst=False)
1050
1127
 
1051
1128
    @testing.requires.unicode_connections
1052
1129
    def test_get_names(self):
1053
 
        inspector = Inspector.from_engine(self.bind)
 
1130
        inspector = inspect(self.bind)
1054
1131
        names = dict(
1055
1132
            (tname, (cname, ixname)) for tname, cname, ixname in self.names
1056
1133
        )
1067
1144
class SchemaTest(fixtures.TestBase):
1068
1145
 
1069
1146
    @testing.requires.schemas
1070
 
    @testing.fails_on_everything_except("postgresql", "unimplemented feature")
 
1147
    @testing.requires.cross_schema_fk_reflection
1071
1148
    def test_has_schema(self):
1072
1149
        eq_(testing.db.dialect.has_schema(testing.db, 'test_schema'), True)
1073
1150
        eq_(testing.db.dialect.has_schema(testing.db, 'sa_fake_schema_123'), False)
1074
1151
 
1075
 
    @testing.crashes('firebird', 'No schema support')
 
1152
    @testing.requires.schemas
1076
1153
    @testing.fails_on('sqlite', 'FIXME: unknown')
1077
 
    # fixme: revisit these below.
1078
 
    @testing.fails_on('access', 'FIXME: unknown')
1079
1154
    @testing.fails_on('sybase', 'FIXME: unknown')
1080
1155
    def test_explicit_default_schema(self):
1081
1156
        engine = testing.db
1092
1167
        assert bool(schema)
1093
1168
 
1094
1169
        metadata = MetaData(engine)
1095
 
        table1 = Table('table1', metadata,
 
1170
        Table('table1', metadata,
1096
1171
                       Column('col1', sa.Integer, primary_key=True),
1097
1172
                       test_needs_fk=True,
1098
1173
                       schema=schema)
1099
 
        table2 = Table('table2', metadata,
 
1174
        Table('table2', metadata,
1100
1175
                       Column('col1', sa.Integer, primary_key=True),
1101
1176
                       Column('col2', sa.Integer,
1102
1177
                              sa.ForeignKey('%s.table1.col1' % schema)),
1108
1183
            assert len(metadata.tables) == 2
1109
1184
            metadata.clear()
1110
1185
 
1111
 
            table1 = Table('table1', metadata, autoload=True, schema=schema)
1112
 
            table2 = Table('table2', metadata, autoload=True, schema=schema)
 
1186
            Table('table1', metadata, autoload=True, schema=schema)
 
1187
            Table('table2', metadata, autoload=True, schema=schema)
1113
1188
            assert len(metadata.tables) == 2
1114
1189
        finally:
1115
1190
            metadata.drop_all()
1116
1191
 
1117
 
    @testing.crashes('firebird', 'No schema support')
1118
 
    # fixme: revisit these below.
1119
 
    @testing.fails_on('access', 'FIXME: unknown')
 
1192
    @testing.requires.schemas
1120
1193
    @testing.fails_on('sybase', 'FIXME: unknown')
1121
1194
    def test_explicit_default_schema_metadata(self):
1122
1195
        engine = testing.db
1132
1205
        assert bool(schema)
1133
1206
 
1134
1207
        metadata = MetaData(engine, schema=schema)
1135
 
        table1 = Table('table1', metadata,
 
1208
        Table('table1', metadata,
1136
1209
                       Column('col1', sa.Integer, primary_key=True),
1137
1210
                       test_needs_fk=True)
1138
 
        table2 = Table('table2', metadata,
 
1211
        Table('table2', metadata,
1139
1212
                       Column('col1', sa.Integer, primary_key=True),
1140
1213
                       Column('col2', sa.Integer,
1141
1214
                              sa.ForeignKey('table1.col1')),
1146
1219
            assert len(metadata.tables) == 2
1147
1220
            metadata.clear()
1148
1221
 
1149
 
            table1 = Table('table1', metadata, autoload=True)
1150
 
            table2 = Table('table2', metadata, autoload=True)
 
1222
            Table('table1', metadata, autoload=True)
 
1223
            Table('table2', metadata, autoload=True)
1151
1224
            assert len(metadata.tables) == 2
1152
1225
        finally:
1153
1226
            metadata.drop_all()
1166
1239
                'test_schema.email_addresses'])
1167
1240
        )
1168
1241
 
1169
 
class HasSequenceTest(fixtures.TestBase):
1170
 
 
1171
 
    @testing.requires.sequences
1172
 
    def test_has_sequence(self):
1173
 
        metadata = MetaData()
1174
 
        users = Table('users', metadata, Column('user_id', sa.Integer,
1175
 
                      sa.Sequence('user_id_seq'), primary_key=True),
1176
 
                      Column('user_name', sa.String(40)))
1177
 
        metadata.create_all(bind=testing.db)
1178
 
        try:
1179
 
            eq_(testing.db.dialect.has_sequence(testing.db,
1180
 
                'user_id_seq'), True)
1181
 
        finally:
1182
 
            metadata.drop_all(bind=testing.db)
1183
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
1184
 
            False)
1185
 
 
1186
1242
    @testing.requires.schemas
1187
 
    @testing.requires.sequences
1188
 
    def test_has_sequence_schema(self):
1189
 
        test_schema = 'test_schema'
1190
 
        s1 = sa.Sequence('user_id_seq', schema=test_schema)
1191
 
        s2 = sa.Sequence('user_id_seq')
1192
 
        testing.db.execute(schema.CreateSequence(s1))
1193
 
        testing.db.execute(schema.CreateSequence(s2))
1194
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
1195
 
            schema=test_schema), True)
1196
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
1197
 
            True)
1198
 
        testing.db.execute(schema.DropSequence(s1))
1199
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
1200
 
            schema=test_schema), False)
1201
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
1202
 
            True)
1203
 
        testing.db.execute(schema.DropSequence(s2))
1204
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq',
1205
 
            schema=test_schema), False)
1206
 
        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'),
1207
 
            False)
 
1243
    @testing.requires.cross_schema_fk_reflection
 
1244
    @testing.provide_metadata
 
1245
    def test_reflect_all_schemas_default_overlap(self):
 
1246
        t1 = Table('t', self.metadata,
 
1247
                Column('id', Integer, primary_key=True))
 
1248
 
 
1249
        t2 = Table('t', self.metadata,
 
1250
            Column('id1', sa.ForeignKey('t.id')),
 
1251
            schema="test_schema"
 
1252
        )
 
1253
 
 
1254
        self.metadata.create_all()
 
1255
        m2 = MetaData()
 
1256
        m2.reflect(testing.db, schema="test_schema")
 
1257
 
 
1258
        m3 = MetaData()
 
1259
        m3.reflect(testing.db)
 
1260
        m3.reflect(testing.db, schema="test_schema")
 
1261
 
 
1262
        eq_(
 
1263
            set((t.name, t.schema) for t in m2.tables.values()),
 
1264
            set((t.name, t.schema) for t in m3.tables.values())
 
1265
        )
 
1266
 
1208
1267
 
1209
1268
 
1210
1269
 
1223
1282
        Column('test1', sa.CHAR(5), nullable=False),
1224
1283
        Column('test2', sa.Float(5), nullable=False),
1225
1284
        Column('test3', sa.Text),
1226
 
        Column('test4', sa.Numeric(10, 2), nullable = False),
 
1285
        Column('test4', sa.Numeric(10, 2), nullable=False),
1227
1286
        Column('test5', sa.Date),
1228
1287
        Column('test5_1', sa.TIMESTAMP),
1229
1288
        Column('parent_user_id', sa.Integer,
1238
1297
        test_needs_fk=True,
1239
1298
    )
1240
1299
    dingalings = Table("dingalings", meta,
1241
 
              Column('dingaling_id', sa.Integer, primary_key=True),
1242
 
              Column('address_id', sa.Integer,
1243
 
                    sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
1244
 
              Column('data', sa.String(30)),
1245
 
              schema=schema,
1246
 
              test_needs_fk=True,
1247
 
        )
 
1300
                Column('dingaling_id', sa.Integer, primary_key=True),
 
1301
                Column('address_id', sa.Integer,
 
1302
                     sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
 
1303
                Column('data', sa.String(30)),
 
1304
                schema=schema, test_needs_fk=True,
 
1305
            )
1248
1306
    addresses = Table('email_addresses', meta,
1249
1307
        Column('address_id', sa.Integer),
1250
1308
        Column('remote_user_id', sa.Integer,
1358
1416
        eq_(t2.name, "sOmEtAbLe")
1359
1417
 
1360
1418
 
1361
 
class ComponentReflectionTest(fixtures.TestBase):
1362
 
 
1363
 
    @testing.requires.schemas
1364
 
    def test_get_schema_names(self):
1365
 
        insp = Inspector(testing.db)
1366
 
 
1367
 
        self.assert_('test_schema' in insp.get_schema_names())
1368
 
 
1369
 
    def test_dialect_initialize(self):
1370
 
        engine = engines.testing_engine()
1371
 
        assert not hasattr(engine.dialect, 'default_schema_name')
1372
 
        insp = Inspector(engine)
1373
 
        assert hasattr(engine.dialect, 'default_schema_name')
1374
 
 
1375
 
    def test_get_default_schema_name(self):
1376
 
        insp = Inspector(testing.db)
1377
 
        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
1378
 
 
1379
 
    @testing.provide_metadata
1380
 
    def _test_get_table_names(self, schema=None, table_type='table',
1381
 
                              order_by=None):
1382
 
        meta = self.metadata
1383
 
        users, addresses, dingalings = createTables(meta, schema)
1384
 
        meta.create_all()
1385
 
        _create_views(meta.bind, schema)
1386
 
        try:
1387
 
            insp = Inspector(meta.bind)
1388
 
            if table_type == 'view':
1389
 
                table_names = insp.get_view_names(schema)
1390
 
                table_names.sort()
1391
 
                answer = ['email_addresses_v', 'users_v']
1392
 
            else:
1393
 
                table_names = insp.get_table_names(schema,
1394
 
                                                   order_by=order_by)
1395
 
                if order_by == 'foreign_key':
1396
 
                    answer = ['dingalings', 'email_addresses', 'users']
1397
 
                    eq_(table_names, answer)
1398
 
                else:
1399
 
                    answer = ['dingalings', 'email_addresses', 'users']
1400
 
                    eq_(sorted(table_names), answer)
1401
 
        finally:
1402
 
            _drop_views(meta.bind, schema)
1403
 
 
1404
 
    def test_get_table_names(self):
1405
 
        self._test_get_table_names()
1406
 
 
1407
 
    def test_get_table_names_fks(self):
1408
 
        self._test_get_table_names(order_by='foreign_key')
1409
 
 
1410
 
    @testing.requires.schemas
1411
 
    def test_get_table_names_with_schema(self):
1412
 
        self._test_get_table_names('test_schema')
1413
 
 
1414
 
    @testing.requires.views
1415
 
    def test_get_view_names(self):
1416
 
        self._test_get_table_names(table_type='view')
1417
 
 
1418
 
    @testing.requires.schemas
1419
 
    def test_get_view_names_with_schema(self):
1420
 
        self._test_get_table_names('test_schema', table_type='view')
1421
 
 
1422
 
    def _test_get_columns(self, schema=None, table_type='table'):
1423
 
        meta = MetaData(testing.db)
1424
 
        users, addresses, dingalings = createTables(meta, schema)
1425
 
        table_names = ['users', 'email_addresses']
1426
 
        meta.create_all()
1427
 
        if table_type == 'view':
1428
 
            _create_views(meta.bind, schema)
1429
 
            table_names = ['users_v', 'email_addresses_v']
1430
 
        try:
1431
 
            insp = Inspector(meta.bind)
1432
 
            for table_name, table in zip(table_names, (users,
1433
 
                    addresses)):
1434
 
                schema_name = schema
1435
 
                cols = insp.get_columns(table_name, schema=schema_name)
1436
 
                self.assert_(len(cols) > 0, len(cols))
1437
 
 
1438
 
                # should be in order
1439
 
 
1440
 
                for i, col in enumerate(table.columns):
1441
 
                    eq_(col.name, cols[i]['name'])
1442
 
                    ctype = cols[i]['type'].__class__
1443
 
                    ctype_def = col.type
1444
 
                    if isinstance(ctype_def, sa.types.TypeEngine):
1445
 
                        ctype_def = ctype_def.__class__
1446
 
 
1447
 
                    # Oracle returns Date for DateTime.
1448
 
 
1449
 
                    if testing.against('oracle') and ctype_def \
1450
 
                        in (sql_types.Date, sql_types.DateTime):
1451
 
                        ctype_def = sql_types.Date
1452
 
 
1453
 
                    # assert that the desired type and return type share
1454
 
                    # a base within one of the generic types.
1455
 
 
1456
 
                    self.assert_(len(set(ctype.__mro__).
1457
 
                        intersection(ctype_def.__mro__).intersection([
1458
 
                        sql_types.Integer,
1459
 
                        sql_types.Numeric,
1460
 
                        sql_types.DateTime,
1461
 
                        sql_types.Date,
1462
 
                        sql_types.Time,
1463
 
                        sql_types.String,
1464
 
                        sql_types._Binary,
1465
 
                        ])) > 0, '%s(%s), %s(%s)' % (col.name,
1466
 
                                col.type, cols[i]['name'], ctype))
1467
 
        finally:
1468
 
            if table_type == 'view':
1469
 
                _drop_views(meta.bind, schema)
1470
 
            meta.drop_all()
1471
 
 
1472
 
    def test_get_columns(self):
1473
 
        self._test_get_columns()
1474
 
 
1475
 
    @testing.requires.schemas
1476
 
    def test_get_columns_with_schema(self):
1477
 
        self._test_get_columns(schema='test_schema')
1478
 
 
1479
 
    @testing.requires.views
1480
 
    def test_get_view_columns(self):
1481
 
        self._test_get_columns(table_type='view')
1482
 
 
1483
 
    @testing.requires.views
1484
 
    @testing.requires.schemas
1485
 
    def test_get_view_columns_with_schema(self):
1486
 
        self._test_get_columns(schema='test_schema', table_type='view')
1487
 
 
1488
 
    @testing.provide_metadata
1489
 
    def _test_get_primary_keys(self, schema=None):
1490
 
        meta = self.metadata
1491
 
        users, addresses, dingalings = createTables(meta, schema)
1492
 
        meta.create_all()
1493
 
        insp = Inspector(meta.bind)
1494
 
        users_pkeys = insp.get_primary_keys(users.name,
1495
 
                                            schema=schema)
1496
 
        eq_(users_pkeys,  ['user_id'])
1497
 
        addr_cons = insp.get_pk_constraint(addresses.name,
1498
 
                                            schema=schema)
1499
 
 
1500
 
        addr_pkeys = addr_cons['constrained_columns']
1501
 
        eq_(addr_pkeys,  ['address_id'])
1502
 
 
1503
 
        @testing.requires.reflects_pk_names
1504
 
        def go():
1505
 
            eq_(addr_cons['name'], 'email_ad_pk')
1506
 
        go()
1507
 
 
1508
 
    def test_get_primary_keys(self):
1509
 
        self._test_get_primary_keys()
1510
 
 
1511
 
    @testing.fails_on('sqlite', 'no schemas')
1512
 
    def test_get_primary_keys_with_schema(self):
1513
 
        self._test_get_primary_keys(schema='test_schema')
1514
 
 
1515
 
    @testing.provide_metadata
1516
 
    def _test_get_foreign_keys(self, schema=None):
1517
 
        meta = self.metadata
1518
 
        users, addresses, dingalings = createTables(meta, schema)
1519
 
        meta.create_all()
1520
 
        insp = Inspector(meta.bind)
1521
 
        expected_schema = schema
1522
 
        # users
1523
 
        users_fkeys = insp.get_foreign_keys(users.name,
1524
 
                                            schema=schema)
1525
 
        fkey1 = users_fkeys[0]
1526
 
 
1527
 
        @testing.fails_on('sqlite', 'no support for constraint names')
1528
 
        def go():
1529
 
            self.assert_(fkey1['name'] is not None)
1530
 
        go()
1531
 
 
1532
 
        eq_(fkey1['referred_schema'], expected_schema)
1533
 
        eq_(fkey1['referred_table'], users.name)
1534
 
        eq_(fkey1['referred_columns'], ['user_id', ])
1535
 
        eq_(fkey1['constrained_columns'], ['parent_user_id'])
1536
 
        #addresses
1537
 
        addr_fkeys = insp.get_foreign_keys(addresses.name,
1538
 
                                           schema=schema)
1539
 
        fkey1 = addr_fkeys[0]
1540
 
        @testing.fails_on('sqlite', 'no support for constraint names')
1541
 
        def go():
1542
 
            self.assert_(fkey1['name'] is not None)
1543
 
        go()
1544
 
        eq_(fkey1['referred_schema'], expected_schema)
1545
 
        eq_(fkey1['referred_table'], users.name)
1546
 
        eq_(fkey1['referred_columns'], ['user_id', ])
1547
 
        eq_(fkey1['constrained_columns'], ['remote_user_id'])
1548
 
 
1549
 
    def test_get_foreign_keys(self):
1550
 
        self._test_get_foreign_keys()
1551
 
 
1552
 
    @testing.requires.schemas
1553
 
    def test_get_foreign_keys_with_schema(self):
1554
 
        self._test_get_foreign_keys(schema='test_schema')
1555
 
 
1556
 
    @testing.provide_metadata
1557
 
    def _test_get_indexes(self, schema=None):
1558
 
        meta = self.metadata
1559
 
        users, addresses, dingalings = createTables(meta, schema)
1560
 
        meta.create_all()
1561
 
        createIndexes(meta.bind, schema)
1562
 
        # The database may decide to create indexes for foreign keys, etc.
1563
 
        # so there may be more indexes than expected.
1564
 
        insp = Inspector(meta.bind)
1565
 
        indexes = insp.get_indexes('users', schema=schema)
1566
 
        expected_indexes = [
1567
 
            {'unique': False,
1568
 
             'column_names': ['test1', 'test2'],
1569
 
             'name': 'users_t_idx'}]
1570
 
        index_names = [d['name'] for d in indexes]
1571
 
        for e_index in expected_indexes:
1572
 
            assert e_index['name'] in index_names
1573
 
            index = indexes[index_names.index(e_index['name'])]
1574
 
            for key in e_index:
1575
 
                eq_(e_index[key], index[key])
1576
 
 
1577
 
    def test_get_indexes(self):
1578
 
        self._test_get_indexes()
1579
 
 
1580
 
    @testing.requires.schemas
1581
 
    def test_get_indexes_with_schema(self):
1582
 
        self._test_get_indexes(schema='test_schema')
1583
 
 
1584
 
    @testing.provide_metadata
1585
 
    def _test_get_view_definition(self, schema=None):
1586
 
        meta = self.metadata
1587
 
        users, addresses, dingalings = createTables(meta, schema)
1588
 
        meta.create_all()
1589
 
        _create_views(meta.bind, schema)
1590
 
        view_name1 = 'users_v'
1591
 
        view_name2 = 'email_addresses_v'
1592
 
        try:
1593
 
            insp = Inspector(meta.bind)
1594
 
            v1 = insp.get_view_definition(view_name1, schema=schema)
1595
 
            self.assert_(v1)
1596
 
            v2 = insp.get_view_definition(view_name2, schema=schema)
1597
 
            self.assert_(v2)
1598
 
        finally:
1599
 
            _drop_views(meta.bind, schema)
1600
 
 
1601
 
    @testing.requires.views
1602
 
    def test_get_view_definition(self):
1603
 
        self._test_get_view_definition()
1604
 
 
1605
 
    @testing.requires.views
1606
 
    @testing.requires.schemas
1607
 
    def test_get_view_definition_with_schema(self):
1608
 
        self._test_get_view_definition(schema='test_schema')
1609
 
 
1610
 
    @testing.only_on("postgresql", "PG specific feature")
1611
 
    @testing.provide_metadata
1612
 
    def _test_get_table_oid(self, table_name, schema=None):
1613
 
        meta = self.metadata
1614
 
        users, addresses, dingalings = createTables(meta, schema)
1615
 
        meta.create_all()
1616
 
        insp = create_inspector(meta.bind)
1617
 
        oid = insp.get_table_oid(table_name, schema)
1618
 
        self.assert_(isinstance(oid, (int, long)))
1619
 
 
1620
 
    def test_get_table_oid(self):
1621
 
        self._test_get_table_oid('users')
1622
 
 
1623
 
    @testing.requires.schemas
1624
 
    def test_get_table_oid_with_schema(self):
1625
 
        self._test_get_table_oid('users', schema='test_schema')
1626
1419
 
1627
1420
class ColumnEventsTest(fixtures.TestBase):
1628
1421
    @classmethod
1648
1441
        from sqlalchemy.schema import Table
1649
1442
 
1650
1443
        m = MetaData(testing.db)
1651
 
        def column_reflect(table, column_info):
 
1444
        def column_reflect(insp, table, column_info):
1652
1445
            if column_info['name'] == col:
1653
1446
                column_info.update(update)
1654
1447
 
1664
1457
 
1665
1458
    def test_override_key(self):
1666
1459
        self._do_test(
1667
 
            "x", {"key":"YXZ"},
 
1460
            "x", {"key": "YXZ"},
1668
1461
            lambda table: eq_(table.c.YXZ.name, "x")
1669
1462
        )
1670
1463
 
1672
1465
        def assert_(table):
1673
1466
            assert isinstance(table.c.x.type, sa.String)
1674
1467
        self._do_test(
1675
 
            "x", {"type":sa.String},
 
1468
            "x", {"type": sa.String},
1676
1469
            assert_
1677
1470
        )
1678
1471
 
1679
1472
    def test_override_info(self):
1680
1473
        self._do_test(
1681
 
            "x", {"info":{"a":"b"}},
1682
 
            lambda table: eq_(table.c.x.info, {"a":"b"})
 
1474
            "x", {"info": {"a": "b"}},
 
1475
            lambda table: eq_(table.c.x.info, {"a": "b"})
1683
1476
        )