~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_selectable.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
"""Test various algorithmic properties of selectables."""
2
2
 
3
 
from test.lib.testing import eq_, assert_raises, \
4
 
    assert_raises_message
 
3
from sqlalchemy.testing import eq_, assert_raises, \
 
4
    assert_raises_message, is_
5
5
from sqlalchemy import *
6
 
from test.lib import *
7
 
from sqlalchemy.sql import util as sql_util, visitors
 
6
from sqlalchemy.testing import fixtures, AssertsCompiledSQL, \
 
7
    AssertsExecutionResults
 
8
from sqlalchemy import testing
 
9
from sqlalchemy.sql import util as sql_util, visitors, expression
8
10
from sqlalchemy import exc
9
11
from sqlalchemy.sql import table, column, null
10
12
from sqlalchemy import util
11
 
from test.lib import fixtures
12
13
 
13
14
metadata = MetaData()
14
15
table1 = Table('table1', metadata,
60
61
 
61
62
        eq_(
62
63
            s1.c.foo.proxy_set,
63
 
            set([s1.c.foo, scalar_select, scalar_select.element, table1.c.col1])
 
64
            set([s1.c.foo, scalar_select, scalar_select.element])
64
65
        )
65
66
        eq_(
66
67
            s2.c.foo.proxy_set,
67
 
            set([s2.c.foo, scalar_select, scalar_select.element, table1.c.col1])
 
68
            set([s2.c.foo, scalar_select, scalar_select.element])
68
69
        )
69
70
 
70
71
        assert s1.corresponding_column(scalar_select) is s1.c.foo
100
101
        s = select([keyed])
101
102
        eq_(s.c.colx.key, 'colx')
102
103
 
103
 
        # this would change to 'colx'
104
 
        # with #2397
105
104
        eq_(s.c.colx.name, 'x')
106
105
 
107
106
        assert s.corresponding_column(keyed.c.colx) is s.c.colx
113
112
        assert sel2.corresponding_column(keyed.c.coly) is sel2.c.coly
114
113
        assert sel2.corresponding_column(keyed.c.z) is sel2.c.z
115
114
 
 
115
    def test_keyed_label_gen(self):
 
116
        s = select([keyed]).apply_labels()
 
117
 
 
118
        assert s.corresponding_column(keyed.c.colx) is s.c.keyed_colx
 
119
        assert s.corresponding_column(keyed.c.coly) is s.c.keyed_coly
 
120
        assert s.corresponding_column(keyed.c.z) is s.c.keyed_z
 
121
 
 
122
        sel2 = s.alias()
 
123
        assert sel2.corresponding_column(keyed.c.colx) is sel2.c.keyed_colx
 
124
        assert sel2.corresponding_column(keyed.c.coly) is sel2.c.keyed_coly
 
125
        assert sel2.corresponding_column(keyed.c.z) is sel2.c.keyed_z
 
126
 
 
127
    def test_keyed_c_collection_upper(self):
 
128
        c = Column('foo', Integer, key='bar')
 
129
        t = Table('t', MetaData(), c)
 
130
        is_(t.c.bar, c)
 
131
 
 
132
    def test_keyed_c_collection_lower(self):
 
133
        c = column('foo')
 
134
        c.key = 'bar'
 
135
        t = table('t', c)
 
136
        is_(t.c.bar, c)
 
137
 
 
138
    def test_clone_c_proxy_key_upper(self):
 
139
        c = Column('foo', Integer, key='bar')
 
140
        t = Table('t', MetaData(), c)
 
141
        s = select([t])._clone()
 
142
        assert c in s.c.bar.proxy_set
 
143
 
 
144
    def test_clone_c_proxy_key_lower(self):
 
145
        c = column('foo')
 
146
        c.key = 'bar'
 
147
        t = table('t', c)
 
148
        s = select([t])._clone()
 
149
        assert c in s.c.bar.proxy_set
 
150
 
 
151
    def test_cloned_intersection(self):
 
152
        t1 = table('t1', column('x'))
 
153
        t2 = table('t2', column('x'))
 
154
 
 
155
        s1 = t1.select()
 
156
        s2 = t2.select()
 
157
        s3 = t1.select()
 
158
 
 
159
        s1c1 = s1._clone()
 
160
        s1c2 = s1._clone()
 
161
        s2c1 = s2._clone()
 
162
        s3c1 = s3._clone()
 
163
 
 
164
        eq_(
 
165
            expression._cloned_intersection(
 
166
                [s1c1, s3c1], [s2c1, s1c2]
 
167
            ),
 
168
            set([s1c1])
 
169
        )
 
170
 
 
171
    def test_cloned_difference(self):
 
172
        t1 = table('t1', column('x'))
 
173
        t2 = table('t2', column('x'))
 
174
 
 
175
        s1 = t1.select()
 
176
        s2 = t2.select()
 
177
        s3 = t1.select()
 
178
 
 
179
        s1c1 = s1._clone()
 
180
        s1c2 = s1._clone()
 
181
        s2c1 = s2._clone()
 
182
        s2c2 = s2._clone()
 
183
        s3c1 = s3._clone()
 
184
 
 
185
        eq_(
 
186
            expression._cloned_difference(
 
187
                [s1c1, s2c1, s3c1], [s2c1, s1c2]
 
188
            ),
 
189
            set([s3c1])
 
190
        )
 
191
 
116
192
 
117
193
    def test_distance_on_aliases(self):
118
194
        a1 = table1.alias('a1')
140
216
 
141
217
    def test_clone_append_column(self):
142
218
        sel = select([literal_column('1').label('a')])
 
219
        eq_(sel.c.keys(), ['a'])
143
220
        cloned = visitors.ReplacingCloningVisitor().traverse(sel)
144
221
        cloned.append_column(literal_column('2').label('b'))
145
222
        cloned.append_column(func.foo())
192
269
            "table1.col3, table1.colx FROM table1) AS anon_1"
193
270
        )
194
271
 
 
272
    def test_type_coerce_preserve_subq(self):
 
273
        class MyType(TypeDecorator):
 
274
            impl = Integer
 
275
 
 
276
        stmt = select([type_coerce(column('x'), MyType).label('foo')])
 
277
        stmt2 = stmt.select()
 
278
        assert isinstance(stmt._raw_columns[0].type, MyType)
 
279
        assert isinstance(stmt.c.foo.type, MyType)
 
280
        assert isinstance(stmt2.c.foo.type, MyType)
195
281
 
196
282
    def test_select_on_table(self):
197
283
        sel = select([table1, table2], use_labels=True)
239
325
                             table2.c.coly]))
240
326
        s1 = table1.select(use_labels=True)
241
327
        s2 = table2.select(use_labels=True)
242
 
        c = u.corresponding_column(s1.c.table1_col2)
 
328
 
243
329
        assert u.corresponding_column(s1.c.table1_col2) is u.c.col2
244
330
        assert u.corresponding_column(s2.c.table2_col2) is u.c.col2
245
331
 
339
425
        criterion = a.c.table1_col1 == table2.c.col2
340
426
        self.assert_(criterion.compare(j.onclause))
341
427
 
 
428
    def test_scalar_cloned_comparator(self):
 
429
        sel = select([table1.c.col1]).as_scalar()
 
430
        expr = sel == table1.c.col1
 
431
 
 
432
        sel2 = visitors.ReplacingCloningVisitor().traverse(sel)
 
433
 
 
434
        expr2 = sel2 == table1.c.col1
 
435
        is_(expr2.left, sel2)
342
436
 
343
437
    def test_column_labels(self):
344
438
        a = select([table1.c.col1.label('acol1'),
368
462
        metadata = MetaData()
369
463
        a = Table('a', metadata,
370
464
            Column('id', Integer, primary_key=True))
371
 
        b = Table('b', metadata,
372
 
            Column('id', Integer, primary_key=True),
373
 
            Column('aid', Integer, ForeignKey('a.id')),
374
 
            )
375
465
 
376
 
        j1 = a.outerjoin(b)
377
466
        j2 = select([a.c.id.label('aid')]).alias('bar')
378
467
 
379
 
        j3 = a.join(j2, j2.c.aid==a.c.id)
 
468
        j3 = a.join(j2, j2.c.aid == a.c.id)
380
469
 
381
470
        j4 = select([j3]).alias('foo')
382
471
        assert j4.corresponding_column(j2.c.aid) is j4.c.aid
433
522
    def test_unusual_column_elements_boolean_clauselist(self):
434
523
        """test that BooleanClauseList is placed as single element in .c."""
435
524
 
436
 
        c2 = and_(table1.c.col2==5, table1.c.col3==4)
 
525
        c2 = and_(table1.c.col2 == 5, table1.c.col3 == 4)
437
526
        s = select([table1.c.col1, c2])
438
527
        eq_(
439
528
            list(s.c),
460
549
        c1 = Column('c1', Integer)
461
550
        c2 = Column('c2', Integer)
462
551
 
463
 
        s = select([c1]).where(c1==5)
 
552
        s = select([c1]).where(c1 == 5)
464
553
 
465
554
        t = Table('t', MetaData(), c1, c2)
466
555
 
477
566
        t1 = Table('t1', m, Column('x', Integer))
478
567
 
479
568
        c1 = Column('c1', Integer)
480
 
        s = select([c1]).where(c1==5).select_from(t1)
 
569
        s = select([c1]).where(c1 == 5).select_from(t1)
481
570
 
482
571
        t2 = Table('t2', MetaData(), c1)
483
572
 
569
658
        Table('t1', MetaData(), c1)
570
659
        eq_(c1._label, "t1_c1")
571
660
 
 
661
 
 
662
class RefreshForNewColTest(fixtures.TestBase):
 
663
    def test_join_uninit(self):
 
664
        a = table('a', column('x'))
 
665
        b = table('b', column('y'))
 
666
        j = a.join(b, a.c.x == b.c.y)
 
667
 
 
668
        q = column('q')
 
669
        b.append_column(q)
 
670
        j._refresh_for_new_column(q)
 
671
        assert j.c.b_q is q
 
672
 
 
673
    def test_join_init(self):
 
674
        a = table('a', column('x'))
 
675
        b = table('b', column('y'))
 
676
        j = a.join(b, a.c.x == b.c.y)
 
677
        j.c
 
678
        q = column('q')
 
679
        b.append_column(q)
 
680
        j._refresh_for_new_column(q)
 
681
        assert j.c.b_q is q
 
682
 
 
683
 
 
684
    def test_join_samename_init(self):
 
685
        a = table('a', column('x'))
 
686
        b = table('b', column('y'))
 
687
        j = a.join(b, a.c.x == b.c.y)
 
688
        j.c
 
689
        q = column('x')
 
690
        b.append_column(q)
 
691
        j._refresh_for_new_column(q)
 
692
        assert j.c.b_x is q
 
693
 
 
694
    def test_select_samename_init(self):
 
695
        a = table('a', column('x'))
 
696
        b = table('b', column('y'))
 
697
        s = select([a, b]).apply_labels()
 
698
        s.c
 
699
        q = column('x')
 
700
        b.append_column(q)
 
701
        s._refresh_for_new_column(q)
 
702
        assert q in s.c.b_x.proxy_set
 
703
 
 
704
    def test_aliased_select_samename_uninit(self):
 
705
        a = table('a', column('x'))
 
706
        b = table('b', column('y'))
 
707
        s = select([a, b]).apply_labels().alias()
 
708
        q = column('x')
 
709
        b.append_column(q)
 
710
        s._refresh_for_new_column(q)
 
711
        assert q in s.c.b_x.proxy_set
 
712
 
 
713
    def test_aliased_select_samename_init(self):
 
714
        a = table('a', column('x'))
 
715
        b = table('b', column('y'))
 
716
        s = select([a, b]).apply_labels().alias()
 
717
        s.c
 
718
        q = column('x')
 
719
        b.append_column(q)
 
720
        s._refresh_for_new_column(q)
 
721
        assert q in s.c.b_x.proxy_set
 
722
 
 
723
    def test_aliased_select_irrelevant(self):
 
724
        a = table('a', column('x'))
 
725
        b = table('b', column('y'))
 
726
        c = table('c', column('z'))
 
727
        s = select([a, b]).apply_labels().alias()
 
728
        s.c
 
729
        q = column('x')
 
730
        c.append_column(q)
 
731
        s._refresh_for_new_column(q)
 
732
        assert 'c_x' not in s.c
 
733
 
 
734
    def test_aliased_select_no_cols_clause(self):
 
735
        a = table('a', column('x'))
 
736
        s = select([a.c.x]).apply_labels().alias()
 
737
        s.c
 
738
        q = column('q')
 
739
        a.append_column(q)
 
740
        s._refresh_for_new_column(q)
 
741
        assert 'a_q' not in s.c
 
742
 
 
743
    def test_union_uninit(self):
 
744
        a = table('a', column('x'))
 
745
        s1 = select([a])
 
746
        s2 = select([a])
 
747
        s3 = s1.union(s2)
 
748
        q = column('q')
 
749
        a.append_column(q)
 
750
        s3._refresh_for_new_column(q)
 
751
        assert a.c.q in s3.c.q.proxy_set
 
752
 
 
753
    def test_union_init_raises(self):
 
754
        a = table('a', column('x'))
 
755
        s1 = select([a])
 
756
        s2 = select([a])
 
757
        s3 = s1.union(s2)
 
758
        s3.c
 
759
        q = column('q')
 
760
        a.append_column(q)
 
761
        assert_raises_message(
 
762
                NotImplementedError,
 
763
                "CompoundSelect constructs don't support addition of "
 
764
                "columns to underlying selectables",
 
765
                s3._refresh_for_new_column, q
 
766
        )
 
767
    def test_nested_join_uninit(self):
 
768
        a = table('a', column('x'))
 
769
        b = table('b', column('y'))
 
770
        c = table('c', column('z'))
 
771
        j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
 
772
 
 
773
        q = column('q')
 
774
        b.append_column(q)
 
775
        j._refresh_for_new_column(q)
 
776
        assert j.c.b_q is q
 
777
 
 
778
    def test_nested_join_init(self):
 
779
        a = table('a', column('x'))
 
780
        b = table('b', column('y'))
 
781
        c = table('c', column('z'))
 
782
        j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
 
783
 
 
784
        j.c
 
785
        q = column('q')
 
786
        b.append_column(q)
 
787
        j._refresh_for_new_column(q)
 
788
        assert j.c.b_q is q
 
789
 
572
790
class AnonLabelTest(fixtures.TestBase):
573
 
    """Test behaviors that we hope to change with [ticket:2168]."""
 
791
    """Test behaviors fixed by [ticket:2168]."""
574
792
 
575
793
    def test_anon_labels_named_column(self):
576
794
        c1 = column('x')
577
795
 
578
 
        # surprising
579
 
        assert c1.label(None) is c1
580
 
        eq_(str(select([c1.label(None)])), "SELECT x")
 
796
        assert c1.label(None) is not c1
 
797
        eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
581
798
 
582
799
    def test_anon_labels_literal_column(self):
583
800
        c1 = literal_column('x')
584
 
        assert c1.label(None) is c1
585
 
        eq_(str(select([c1.label(None)])), "SELECT x")
 
801
        assert c1.label(None) is not c1
 
802
        eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
586
803
 
587
804
    def test_anon_labels_func(self):
588
805
        c1 = func.count('*')
606
823
    def test_join_condition(self):
607
824
        m = MetaData()
608
825
        t1 = Table('t1', m, Column('id', Integer))
609
 
        t2 = Table('t2', m, Column('id', Integer), Column('t1id',
610
 
                   ForeignKey('t1.id')))
611
 
        t3 = Table('t3', m, Column('id', Integer), Column('t1id',
612
 
                   ForeignKey('t1.id')), Column('t2id',
613
 
                   ForeignKey('t2.id')))
614
 
        t4 = Table('t4', m, Column('id', Integer), Column('t2id',
615
 
                   ForeignKey('t2.id')))
 
826
        t2 = Table('t2', m, Column('id', Integer),
 
827
                    Column('t1id', ForeignKey('t1.id')))
 
828
        t3 = Table('t3', m,
 
829
                    Column('id', Integer),
 
830
                    Column('t1id', ForeignKey('t1.id')),
 
831
                    Column('t2id', ForeignKey('t2.id')))
 
832
        t4 = Table('t4', m, Column('id', Integer),
 
833
                    Column('t2id', ForeignKey('t2.id')))
 
834
        t5 = Table('t5', m,
 
835
                    Column('t1id1', ForeignKey('t1.id')),
 
836
                    Column('t1id2', ForeignKey('t1.id')),
 
837
            )
616
838
        t1t2 = t1.join(t2)
617
839
        t2t3 = t2.join(t3)
 
840
 
618
841
        for (left, right, a_subset, expected) in [
619
842
            (t1, t2, None, t1.c.id == t2.c.t1id),
620
843
            (t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id),
628
851
            assert expected.compare(sql_util.join_condition(left,
629
852
                                    right, a_subset=a_subset))
630
853
 
 
854
 
631
855
        # these are ambiguous, or have no joins
632
856
        for left, right, a_subset in [
633
857
            (t1t2, t3, None),
634
858
            (t2t3, t1, None),
635
859
            (t1, t4, None),
636
860
            (t1t2, t2t3, None),
 
861
            (t5, t1, None),
 
862
            (t5.select(use_labels=True), t1, None)
637
863
        ]:
638
864
            assert_raises(
639
865
                exc.ArgumentError,
644
870
        als = t2t3.alias()
645
871
        # test join's behavior, including natural
646
872
        for left, right, expected in [
647
 
            (t1, t2, t1.c.id==t2.c.t1id),
648
 
            (t1t2, t3, t1t2.c.t2_id==t3.c.t2id),
649
 
            (t2t3, t1, t1.c.id==t3.c.t1id),
650
 
            (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
651
 
            (t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
652
 
            (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
653
 
            (t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
654
 
            (t1t2, als, t1t2.c.t2_id==als.c.t3_t2id)
 
873
            (t1, t2, t1.c.id == t2.c.t1id),
 
874
            (t1t2, t3, t1t2.c.t2_id == t3.c.t2id),
 
875
            (t2t3, t1, t1.c.id == t3.c.t1id),
 
876
            (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
 
877
            (t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
 
878
            (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
 
879
            (t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
 
880
            (t1t2, als, t1t2.c.t2_id == als.c.t3_t2id)
655
881
        ]:
656
882
            assert expected.compare(
657
883
                left.join(right).onclause
660
886
 
661
887
 
662
888
        # TODO: this raises due to right side being "grouped", and no
663
 
        # longer has FKs.  Did we want to make _FromGrouping friendlier
 
889
        # longer has FKs.  Did we want to make FromGrouping friendlier
664
890
        # ?
665
891
 
666
892
        assert_raises_message(exc.ArgumentError,
684
910
                            Column('q', Integer, ForeignKey('t22.id')),
685
911
                            )
686
912
        t2 = Table('t2', m, Column('id', Integer))
687
 
        assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
688
 
        assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
 
913
        assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
 
914
        assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
689
915
 
690
916
    def test_join_cond_no_such_unrelated_column(self):
691
917
        m = MetaData()
692
918
        t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')),
693
919
                            Column('y', Integer, ForeignKey('t3.q')))
694
920
        t2 = Table('t2', m, Column('id', Integer))
695
 
        t3 = Table('t3', m, Column('id', Integer))
696
 
        assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
697
 
        assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
 
921
        Table('t3', m, Column('id', Integer))
 
922
        assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
 
923
        assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
698
924
 
699
925
    def test_join_cond_no_such_related_table(self):
700
926
        m1 = MetaData()
723
949
        t2 = Table('t2', m, Column('id', Integer))
724
950
        assert_raises_message(
725
951
            exc.NoReferencedColumnError,
726
 
            "Could not create ForeignKey 't2.q' on table 't1': table 't2' has no column named 'q'",
 
952
            "Could not create ForeignKey 't2.q' on table 't1': "
 
953
                "table 't2' has no column named 'q'",
727
954
            sql_util.join_condition, t1, t2
728
955
        )
729
956
 
730
957
        assert_raises_message(
731
958
            exc.NoReferencedColumnError,
732
 
            "Could not create ForeignKey 't2.q' on table 't1': table 't2' has no column named 'q'",
 
959
            "Could not create ForeignKey 't2.q' on table 't1': "
 
960
                "table 't2' has no column named 'q'",
733
961
            sql_util.join_condition, t2, t1
734
962
        )
735
963
 
812
1040
                Column('id', Integer, ForeignKey('a.id'), primary_key=True),
813
1041
                Column('x', Integer, primary_key=True))
814
1042
 
815
 
        j = a.join(b, and_(a.c.id==b.c.id, b.c.x==5))
 
1043
        j = a.join(b, and_(a.c.id == b.c.id, b.c.x == 5))
816
1044
        assert str(j) == "a JOIN b ON a.id = b.id AND b.x = :x_1", str(j)
817
1045
        assert list(j.primary_key) == [a.c.id, b.c.x]
818
1046
 
819
1047
    def test_onclause_direction(self):
820
1048
        metadata = MetaData()
821
1049
 
822
 
        employee = Table( 'Employee', metadata,
 
1050
        employee = Table('Employee', metadata,
823
1051
            Column('name', String(100)),
824
 
            Column('id', Integer, primary_key= True),
 
1052
            Column('id', Integer, primary_key=True),
825
1053
        )
826
1054
 
827
1055
        engineer = Table('Engineer', metadata,
863
1091
 
864
1092
    def test_reduce_selectable(self):
865
1093
        metadata = MetaData()
866
 
        engineers = Table('engineers', metadata, Column('engineer_id',
867
 
                          Integer, primary_key=True),
 
1094
        engineers = Table('engineers', metadata,
 
1095
                        Column('engineer_id', Integer, primary_key=True),
868
1096
                          Column('engineer_name', String(50)))
869
 
        managers = Table('managers', metadata, Column('manager_id',
870
 
                         Integer, primary_key=True),
 
1097
        managers = Table('managers', metadata,
 
1098
                        Column('manager_id', Integer, primary_key=True),
871
1099
                         Column('manager_name', String(50)))
872
1100
        s = select([engineers,
873
1101
                   managers]).where(engineers.c.engineer_name
876
1104
            util.column_set([s.c.engineer_id, s.c.engineer_name,
877
1105
            s.c.manager_id]))
878
1106
 
 
1107
    def test_reduce_generation(self):
 
1108
        m = MetaData()
 
1109
        t1 = Table('t1', m, Column('x', Integer, primary_key=True),
 
1110
                                        Column('y', Integer))
 
1111
        t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')),
 
1112
                                        Column('q', Integer))
 
1113
        s1 = select([t1, t2])
 
1114
        s2 = s1.reduce_columns(only_synonyms=False)
 
1115
        eq_(
 
1116
            set(s2.inner_columns),
 
1117
            set([t1.c.x, t1.c.y, t2.c.q])
 
1118
        )
 
1119
 
 
1120
        s2 = s1.reduce_columns()
 
1121
        eq_(
 
1122
            set(s2.inner_columns),
 
1123
            set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])
 
1124
        )
 
1125
 
 
1126
 
 
1127
    def test_reduce_only_synonym_fk(self):
 
1128
        m = MetaData()
 
1129
        t1 = Table('t1', m, Column('x', Integer, primary_key=True),
 
1130
                                        Column('y', Integer))
 
1131
        t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
 
1132
                                        Column('q', Integer, ForeignKey('t1.y')))
 
1133
        s1 = select([t1, t2])
 
1134
        s1 = s1.reduce_columns(only_synonyms=True)
 
1135
        eq_(
 
1136
            set(s1.c),
 
1137
            set([s1.c.x, s1.c.y, s1.c.q])
 
1138
        )
 
1139
 
 
1140
    def test_reduce_only_synonym_lineage(self):
 
1141
        m = MetaData()
 
1142
        t1 = Table('t1', m, Column('x', Integer, primary_key=True),
 
1143
                                        Column('y', Integer),
 
1144
                                        Column('z', Integer)
 
1145
                            )
 
1146
        # test that the first appearance in the columns clause
 
1147
        # wins - t1 is first, t1.c.x wins
 
1148
        s1 = select([t1])
 
1149
        s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
 
1150
        eq_(
 
1151
                set(s2.reduce_columns().inner_columns),
 
1152
                set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
 
1153
        )
 
1154
 
 
1155
        # reverse order, s1.c.x wins
 
1156
        s1 = select([t1])
 
1157
        s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
 
1158
        eq_(
 
1159
                set(s2.reduce_columns().inner_columns),
 
1160
                set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
 
1161
        )
879
1162
 
880
1163
    def test_reduce_aliased_join(self):
881
1164
        metadata = MetaData()
929
1212
            util.column_set([item_join.c.id, item_join.c.dummy,
930
1213
            item_join.c.child_name]))
931
1214
 
932
 
 
933
1215
    def test_reduce_aliased_union_2(self):
934
1216
        metadata = MetaData()
935
1217
        page_table = Table('page', metadata, Column('id', Integer,
1052
1334
            t.c.x,
1053
1335
            a,
1054
1336
            s,
1055
 
            s2
 
1337
            s2,
 
1338
            t.c.x > 1,
 
1339
            (t.c.x > 1).label(None)
1056
1340
        ]:
1057
1341
            annot = obj._annotate({})
1058
1342
            eq_(set([obj]), set([annot]))
1059
1343
 
 
1344
    def test_compare(self):
 
1345
        t = table('t', column('x'), column('y'))
 
1346
        x_a = t.c.x._annotate({})
 
1347
        assert t.c.x.compare(x_a)
 
1348
        assert x_a.compare(t.c.x)
 
1349
        assert not x_a.compare(t.c.y)
 
1350
        assert not t.c.y.compare(x_a)
 
1351
        assert (t.c.x == 5).compare(x_a == 5)
 
1352
        assert not (t.c.y == 5).compare(x_a == 5)
 
1353
 
 
1354
        s = select([t])
 
1355
        x_p = s.c.x
 
1356
        assert not x_a.compare(x_p)
 
1357
        assert not t.c.x.compare(x_p)
 
1358
        x_p_a = x_p._annotate({})
 
1359
        assert x_p_a.compare(x_p)
 
1360
        assert x_p.compare(x_p_a)
 
1361
        assert not x_p_a.compare(x_a)
 
1362
 
 
1363
    def test_late_name_add(self):
 
1364
        from sqlalchemy.schema import Column
 
1365
        c1 = Column(Integer)
 
1366
        c1_a = c1._annotate({"foo": "bar"})
 
1367
        c1.name = 'somename'
 
1368
        eq_(c1_a.name, 'somename')
 
1369
 
1060
1370
    def test_custom_constructions(self):
1061
1371
        from sqlalchemy.schema import Column
1062
1372
        class MyColumn(Column):
1108
1418
        def visit_binary(b):
1109
1419
            b.right = table1.c.col2
1110
1420
 
1111
 
        b2 = visitors.cloned_traverse(bin, {}, {'binary':visit_binary})
 
1421
        b2 = visitors.cloned_traverse(bin, {}, {'binary': visit_binary})
1112
1422
        assert str(b2) == "table1.col1 = table1.col2"
1113
1423
 
1114
1424
 
1115
 
        b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary'
1116
 
                : visit_binary})
 
1425
        b3 = visitors.cloned_traverse(bin._annotate({}), {}, {'binary':
 
1426
                                                        visit_binary})
1117
1427
        assert str(b3) == 'table1.col1 = table1.col2'
1118
1428
 
1119
1429
        def visit_binary(b):
1120
1430
            b.left = bindparam('bar')
1121
1431
 
1122
 
        b4 = visitors.cloned_traverse(b2, {}, {'binary':visit_binary})
 
1432
        b4 = visitors.cloned_traverse(b2, {}, {'binary': visit_binary})
1123
1433
        assert str(b4) == ":bar = table1.col2"
1124
1434
 
1125
 
        b5 = visitors.cloned_traverse(b3, {}, {'binary':visit_binary})
 
1435
        b5 = visitors.cloned_traverse(b3, {}, {'binary': visit_binary})
1126
1436
        assert str(b5) == ":bar = table1.col2"
1127
1437
 
1128
1438
    def test_annotate_aliased(self):
1152
1462
 
1153
1463
        bin = table1.c.col1 == bindparam('foo', value=None)
1154
1464
 
1155
 
        b2 = sql_util._deep_annotate(bin, {'_orm_adapt':True})
 
1465
        b2 = sql_util._deep_annotate(bin, {'_orm_adapt': True})
1156
1466
        b3 = sql_util._deep_deannotate(b2)
1157
1467
        b4 = sql_util._deep_deannotate(bin)
1158
1468
 
1166
1476
        assert b2.left is not bin.left
1167
1477
        assert b3.left is not b2.left is not bin.left
1168
1478
        assert b4.left is bin.left  # since column is immutable
1169
 
        assert b4.right is not bin.right is not b2.right is not b3.right
 
1479
        # deannotate copies the element
 
1480
        assert bin.right is not b2.right is not b3.right is not b4.right
1170
1481
 
1171
1482
    def test_annotate_unique_traversal(self):
1172
1483
        """test that items are copied only once during
1173
1484
        annotate, deannotate traversal
1174
1485
 
1175
 
        #2453
 
1486
        #2453 - however note this was modified by
 
1487
        #1401, and it's likely that re49563072578
 
1488
        is helping us with the str() comparison
 
1489
        case now, as deannotate is making
 
1490
        clones again in some cases.
1176
1491
        """
1177
1492
        table1 = table('table1', column('x'))
1178
1493
        table2 = table('table2', column('y'))
1179
1494
        a1 = table1.alias()
1180
1495
        s = select([a1.c.x]).select_from(
1181
 
                a1.join(table2, a1.c.x==table2.c.y)
 
1496
                a1.join(table2, a1.c.x == table2.c.y)
1182
1497
            )
1183
 
 
1184
1498
        for sel in (
1185
1499
            sql_util._deep_deannotate(s),
1186
 
            sql_util._deep_annotate(s, {'foo':'bar'}),
1187
1500
            visitors.cloned_traverse(s, {}, {}),
1188
 
            visitors.replacement_traverse(s, {}, lambda x:None)
 
1501
            visitors.replacement_traverse(s, {}, lambda x: None)
1189
1502
        ):
1190
1503
            # the columns clause isn't changed at all
1191
1504
            assert sel._raw_columns[0].table is a1
1192
 
            # the from objects are internally consistent,
1193
 
            # i.e. the Alias at position 0 is the same
1194
 
            # Alias in the Join object in position 1
1195
1505
            assert sel._froms[0] is sel._froms[1].left
1196
 
            eq_(str(s), str(sel))
 
1506
 
 
1507
            eq_(str(s), str(sel))
 
1508
 
 
1509
        # when we are modifying annotations sets only
 
1510
        # partially, each element is copied unconditionally
 
1511
        # when encountered.
 
1512
        for sel in (
 
1513
            sql_util._deep_deannotate(s, {"foo": "bar"}),
 
1514
            sql_util._deep_annotate(s, {'foo': 'bar'}),
 
1515
        ):
 
1516
            assert sel._froms[0] is not sel._froms[1].left
 
1517
 
 
1518
            # but things still work out due to
 
1519
            # re49563072578
 
1520
            eq_(str(s), str(sel))
 
1521
 
 
1522
 
 
1523
    def test_annotate_varied_annot_same_col(self):
 
1524
        """test two instances of the same column with different annotations
 
1525
        preserving them when deep_annotate is run on them.
 
1526
 
 
1527
        """
 
1528
        t1 = table('table1', column("col1"), column("col2"))
 
1529
        s = select([t1.c.col1._annotate({"foo":"bar"})])
 
1530
        s2 = select([t1.c.col1._annotate({"bat":"hoho"})])
 
1531
        s3 = s.union(s2)
 
1532
        sel = sql_util._deep_annotate(s3, {"new": "thing"})
 
1533
 
 
1534
        eq_(
 
1535
            sel.selects[0]._raw_columns[0]._annotations,
 
1536
            {"foo": "bar", "new": "thing"}
 
1537
        )
 
1538
 
 
1539
        eq_(
 
1540
            sel.selects[1]._raw_columns[0]._annotations,
 
1541
            {"bat": "hoho", "new": "thing"}
 
1542
        )
 
1543
 
 
1544
    def test_deannotate_2(self):
 
1545
        table1 = table('table1', column("col1"), column("col2"))
 
1546
        j = table1.c.col1._annotate({"remote": True}) == \
 
1547
                table1.c.col2._annotate({"local": True})
 
1548
        j2 = sql_util._deep_deannotate(j)
 
1549
        eq_(
 
1550
            j.left._annotations, {"remote": True}
 
1551
        )
 
1552
        eq_(
 
1553
            j2.left._annotations, {}
 
1554
        )
 
1555
 
 
1556
    def test_deannotate_3(self):
 
1557
        table1 = table('table1', column("col1"), column("col2"),
 
1558
                            column("col3"), column("col4"))
 
1559
        j = and_(
 
1560
                table1.c.col1._annotate({"remote": True}) ==
 
1561
                table1.c.col2._annotate({"local": True}),
 
1562
                table1.c.col3._annotate({"remote": True}) ==
 
1563
                table1.c.col4._annotate({"local": True})
 
1564
        )
 
1565
        j2 = sql_util._deep_deannotate(j)
 
1566
        eq_(
 
1567
            j.clauses[0].left._annotations, {"remote": True}
 
1568
        )
 
1569
        eq_(
 
1570
            j2.clauses[0].left._annotations, {}
 
1571
        )
1197
1572
 
1198
1573
    def test_annotate_fromlist_preservation(self):
1199
1574
        """test the FROM list in select still works
1207
1582
        table2 = table('table2', column('y'))
1208
1583
        a1 = table1.alias()
1209
1584
        s = select([a1.c.x]).select_from(
1210
 
                a1.join(table2, a1.c.x==table2.c.y)
 
1585
                a1.join(table2, a1.c.x == table2.c.y)
1211
1586
            )
1212
1587
 
1213
1588
        assert_s = select([select([s])])
1214
1589
        for fn in (
1215
1590
            sql_util._deep_deannotate,
1216
 
            lambda s: sql_util._deep_annotate(s, {'foo':'bar'}),
1217
 
            lambda s:visitors.cloned_traverse(s, {}, {}),
1218
 
            lambda s:visitors.replacement_traverse(s, {}, lambda x:None)
 
1591
            lambda s: sql_util._deep_annotate(s, {'foo': 'bar'}),
 
1592
            lambda s: visitors.cloned_traverse(s, {}, {}),
 
1593
            lambda s: visitors.replacement_traverse(s, {}, lambda x: None)
1219
1594
        ):
1220
1595
 
1221
1596
            sel = fn(select([fn(select([fn(s)]))]))
1223
1598
 
1224
1599
 
1225
1600
    def test_bind_unique_test(self):
1226
 
        t1 = table('t', column('a'), column('b'))
 
1601
        table('t', column('a'), column('b'))
1227
1602
 
1228
1603
        b = bindparam("bind", value="x", unique=True)
1229
1604
 
1230
1605
        # the annotation of "b" should render the
1231
1606
        # same.  The "unique" test in compiler should
1232
1607
        # also pass, [ticket:2425]
1233
 
        eq_(str(or_(b, b._annotate({"foo":"bar"}))),
 
1608
        eq_(str(or_(b, b._annotate({"foo": "bar"}))),
1234
1609
            ":bind_1 OR :bind_1")
 
1610
 
 
1611
    def test_comparators_cleaned_out_construction(self):
 
1612
        c = column('a')
 
1613
 
 
1614
        comp1 = c.comparator
 
1615
 
 
1616
        c1 = c._annotate({"foo": "bar"})
 
1617
        comp2 = c1.comparator
 
1618
        assert comp1 is not comp2
 
1619
 
 
1620
    def test_comparators_cleaned_out_reannotate(self):
 
1621
        c = column('a')
 
1622
 
 
1623
        c1 = c._annotate({"foo": "bar"})
 
1624
        comp1 = c1.comparator
 
1625
 
 
1626
        c2 = c1._annotate({"bat": "hoho"})
 
1627
        comp2 = c2.comparator
 
1628
 
 
1629
        assert comp1 is not comp2
 
1630
 
 
1631
    def test_comparator_cleanout_integration(self):
 
1632
        c = column('a')
 
1633
 
 
1634
        c1 = c._annotate({"foo": "bar"})
 
1635
        comp1 = c1.comparator
 
1636
 
 
1637
        c2 = c1._annotate({"bat": "hoho"})
 
1638
        comp2 = c2.comparator
 
1639
 
 
1640
        assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
 
1641
 
 
1642
class WithLabelsTest(fixtures.TestBase):
 
1643
    def _assert_labels_warning(self, s):
 
1644
        assert_raises_message(
 
1645
            exc.SAWarning,
 
1646
            r"replaced by Column.*, which has the same key",
 
1647
            lambda: s.c
 
1648
        )
 
1649
 
 
1650
    def _assert_result_keys(self, s, keys):
 
1651
        compiled = s.compile()
 
1652
        eq_(set(compiled.result_map), set(keys))
 
1653
 
 
1654
    def _assert_subq_result_keys(self, s, keys):
 
1655
        compiled = s.select().compile()
 
1656
        eq_(set(compiled.result_map), set(keys))
 
1657
 
 
1658
    def _names_overlap(self):
 
1659
        m = MetaData()
 
1660
        t1 = Table('t1', m, Column('x', Integer))
 
1661
        t2 = Table('t2', m, Column('x', Integer))
 
1662
        return select([t1, t2])
 
1663
 
 
1664
    def test_names_overlap_nolabel(self):
 
1665
        sel = self._names_overlap()
 
1666
        self._assert_labels_warning(sel)
 
1667
        self._assert_result_keys(sel, ['x'])
 
1668
 
 
1669
    def test_names_overlap_label(self):
 
1670
        sel = self._names_overlap().apply_labels()
 
1671
        eq_(
 
1672
            sel.c.keys(),
 
1673
            ['t1_x', 't2_x']
 
1674
        )
 
1675
        self._assert_result_keys(sel, ['t1_x', 't2_x'])
 
1676
 
 
1677
    def _names_overlap_keys_dont(self):
 
1678
        m = MetaData()
 
1679
        t1 = Table('t1', m, Column('x', Integer, key='a'))
 
1680
        t2 = Table('t2', m, Column('x', Integer, key='b'))
 
1681
        return select([t1, t2])
 
1682
 
 
1683
    def test_names_overlap_keys_dont_nolabel(self):
 
1684
        sel = self._names_overlap_keys_dont()
 
1685
        eq_(
 
1686
            sel.c.keys(),
 
1687
            ['a', 'b']
 
1688
        )
 
1689
        self._assert_result_keys(sel, ['x'])
 
1690
 
 
1691
    def test_names_overlap_keys_dont_label(self):
 
1692
        sel = self._names_overlap_keys_dont().apply_labels()
 
1693
        eq_(
 
1694
            sel.c.keys(),
 
1695
            ['t1_a', 't2_b']
 
1696
        )
 
1697
        self._assert_result_keys(sel, ['t1_x', 't2_x'])
 
1698
 
 
1699
    def _labels_overlap(self):
 
1700
        m = MetaData()
 
1701
        t1 = Table('t', m, Column('x_id', Integer))
 
1702
        t2 = Table('t_x', m, Column('id', Integer))
 
1703
        return select([t1, t2])
 
1704
 
 
1705
    def test_labels_overlap_nolabel(self):
 
1706
        sel = self._labels_overlap()
 
1707
        eq_(
 
1708
            sel.c.keys(),
 
1709
            ['x_id', 'id']
 
1710
        )
 
1711
        self._assert_result_keys(sel, ['x_id', 'id'])
 
1712
 
 
1713
    def test_labels_overlap_label(self):
 
1714
        sel = self._labels_overlap().apply_labels()
 
1715
        t2 = sel.froms[1]
 
1716
        eq_(
 
1717
            sel.c.keys(),
 
1718
            ['t_x_id', t2.c.id.anon_label]
 
1719
        )
 
1720
        self._assert_result_keys(sel, ['t_x_id', 'id_1'])
 
1721
        self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
 
1722
 
 
1723
    def _labels_overlap_keylabels_dont(self):
 
1724
        m = MetaData()
 
1725
        t1 = Table('t', m, Column('x_id', Integer, key='a'))
 
1726
        t2 = Table('t_x', m, Column('id', Integer, key='b'))
 
1727
        return select([t1, t2])
 
1728
 
 
1729
    def test_labels_overlap_keylabels_dont_nolabel(self):
 
1730
        sel = self._labels_overlap_keylabels_dont()
 
1731
        eq_(sel.c.keys(), ['a', 'b'])
 
1732
        self._assert_result_keys(sel, ['x_id', 'id'])
 
1733
 
 
1734
    def test_labels_overlap_keylabels_dont_label(self):
 
1735
        sel = self._labels_overlap_keylabels_dont().apply_labels()
 
1736
        eq_(sel.c.keys(), ['t_a', 't_x_b'])
 
1737
        self._assert_result_keys(sel, ['t_x_id', 'id_1'])
 
1738
 
 
1739
    def _keylabels_overlap_labels_dont(self):
 
1740
        m = MetaData()
 
1741
        t1 = Table('t', m, Column('a', Integer, key='x_id'))
 
1742
        t2 = Table('t_x', m, Column('b', Integer, key='id'))
 
1743
        return select([t1, t2])
 
1744
 
 
1745
    def test_keylabels_overlap_labels_dont_nolabel(self):
 
1746
        sel = self._keylabels_overlap_labels_dont()
 
1747
        eq_(sel.c.keys(), ['x_id', 'id'])
 
1748
        self._assert_result_keys(sel, ['a', 'b'])
 
1749
 
 
1750
    def test_keylabels_overlap_labels_dont_label(self):
 
1751
        sel = self._keylabels_overlap_labels_dont().apply_labels()
 
1752
        t2 = sel.froms[1]
 
1753
        eq_(sel.c.keys(), ['t_x_id', t2.c.id.anon_label])
 
1754
        self._assert_result_keys(sel, ['t_a', 't_x_b'])
 
1755
        self._assert_subq_result_keys(sel, ['t_a', 't_x_b'])
 
1756
 
 
1757
    def _keylabels_overlap_labels_overlap(self):
 
1758
        m = MetaData()
 
1759
        t1 = Table('t', m, Column('x_id', Integer, key='x_a'))
 
1760
        t2 = Table('t_x', m, Column('id', Integer, key='a'))
 
1761
        return select([t1, t2])
 
1762
 
 
1763
    def test_keylabels_overlap_labels_overlap_nolabel(self):
 
1764
        sel = self._keylabels_overlap_labels_overlap()
 
1765
        eq_(sel.c.keys(), ['x_a', 'a'])
 
1766
        self._assert_result_keys(sel, ['x_id', 'id'])
 
1767
        self._assert_subq_result_keys(sel, ['x_id', 'id'])
 
1768
 
 
1769
    def test_keylabels_overlap_labels_overlap_label(self):
 
1770
        sel = self._keylabels_overlap_labels_overlap().apply_labels()
 
1771
        t2 = sel.froms[1]
 
1772
        eq_(sel.c.keys(), ['t_x_a', t2.c.a.anon_label])
 
1773
        self._assert_result_keys(sel, ['t_x_id', 'id_1'])
 
1774
        self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
 
1775
 
 
1776
    def _keys_overlap_names_dont(self):
 
1777
        m = MetaData()
 
1778
        t1 = Table('t1', m, Column('a', Integer, key='x'))
 
1779
        t2 = Table('t2', m, Column('b', Integer, key='x'))
 
1780
        return select([t1, t2])
 
1781
 
 
1782
    def test_keys_overlap_names_dont_nolabel(self):
 
1783
        sel = self._keys_overlap_names_dont()
 
1784
        self._assert_labels_warning(sel)
 
1785
        self._assert_result_keys(sel, ['a', 'b'])
 
1786
 
 
1787
    def test_keys_overlap_names_dont_label(self):
 
1788
        sel = self._keys_overlap_names_dont().apply_labels()
 
1789
        eq_(
 
1790
            sel.c.keys(),
 
1791
            ['t1_x', 't2_x']
 
1792
        )
 
1793
        self._assert_result_keys(sel, ['t1_a', 't2_b'])