113
112
assert sel2.corresponding_column(keyed.c.coly) is sel2.c.coly
114
113
assert sel2.corresponding_column(keyed.c.z) is sel2.c.z
115
def test_keyed_label_gen(self):
116
s = select([keyed]).apply_labels()
118
assert s.corresponding_column(keyed.c.colx) is s.c.keyed_colx
119
assert s.corresponding_column(keyed.c.coly) is s.c.keyed_coly
120
assert s.corresponding_column(keyed.c.z) is s.c.keyed_z
123
assert sel2.corresponding_column(keyed.c.colx) is sel2.c.keyed_colx
124
assert sel2.corresponding_column(keyed.c.coly) is sel2.c.keyed_coly
125
assert sel2.corresponding_column(keyed.c.z) is sel2.c.keyed_z
127
def test_keyed_c_collection_upper(self):
128
c = Column('foo', Integer, key='bar')
129
t = Table('t', MetaData(), c)
132
def test_keyed_c_collection_lower(self):
138
def test_clone_c_proxy_key_upper(self):
139
c = Column('foo', Integer, key='bar')
140
t = Table('t', MetaData(), c)
141
s = select([t])._clone()
142
assert c in s.c.bar.proxy_set
144
def test_clone_c_proxy_key_lower(self):
148
s = select([t])._clone()
149
assert c in s.c.bar.proxy_set
151
def test_cloned_intersection(self):
152
t1 = table('t1', column('x'))
153
t2 = table('t2', column('x'))
165
expression._cloned_intersection(
166
[s1c1, s3c1], [s2c1, s1c2]
171
def test_cloned_difference(self):
172
t1 = table('t1', column('x'))
173
t2 = table('t2', column('x'))
186
expression._cloned_difference(
187
[s1c1, s2c1, s3c1], [s2c1, s1c2]
117
193
def test_distance_on_aliases(self):
118
194
a1 = table1.alias('a1')
569
658
Table('t1', MetaData(), c1)
570
659
eq_(c1._label, "t1_c1")
662
class RefreshForNewColTest(fixtures.TestBase):
663
def test_join_uninit(self):
664
a = table('a', column('x'))
665
b = table('b', column('y'))
666
j = a.join(b, a.c.x == b.c.y)
670
j._refresh_for_new_column(q)
673
def test_join_init(self):
674
a = table('a', column('x'))
675
b = table('b', column('y'))
676
j = a.join(b, a.c.x == b.c.y)
680
j._refresh_for_new_column(q)
684
def test_join_samename_init(self):
685
a = table('a', column('x'))
686
b = table('b', column('y'))
687
j = a.join(b, a.c.x == b.c.y)
691
j._refresh_for_new_column(q)
694
def test_select_samename_init(self):
695
a = table('a', column('x'))
696
b = table('b', column('y'))
697
s = select([a, b]).apply_labels()
701
s._refresh_for_new_column(q)
702
assert q in s.c.b_x.proxy_set
704
def test_aliased_select_samename_uninit(self):
705
a = table('a', column('x'))
706
b = table('b', column('y'))
707
s = select([a, b]).apply_labels().alias()
710
s._refresh_for_new_column(q)
711
assert q in s.c.b_x.proxy_set
713
def test_aliased_select_samename_init(self):
714
a = table('a', column('x'))
715
b = table('b', column('y'))
716
s = select([a, b]).apply_labels().alias()
720
s._refresh_for_new_column(q)
721
assert q in s.c.b_x.proxy_set
723
def test_aliased_select_irrelevant(self):
724
a = table('a', column('x'))
725
b = table('b', column('y'))
726
c = table('c', column('z'))
727
s = select([a, b]).apply_labels().alias()
731
s._refresh_for_new_column(q)
732
assert 'c_x' not in s.c
734
def test_aliased_select_no_cols_clause(self):
735
a = table('a', column('x'))
736
s = select([a.c.x]).apply_labels().alias()
740
s._refresh_for_new_column(q)
741
assert 'a_q' not in s.c
743
def test_union_uninit(self):
744
a = table('a', column('x'))
750
s3._refresh_for_new_column(q)
751
assert a.c.q in s3.c.q.proxy_set
753
def test_union_init_raises(self):
754
a = table('a', column('x'))
761
assert_raises_message(
763
"CompoundSelect constructs don't support addition of "
764
"columns to underlying selectables",
765
s3._refresh_for_new_column, q
767
def test_nested_join_uninit(self):
768
a = table('a', column('x'))
769
b = table('b', column('y'))
770
c = table('c', column('z'))
771
j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
775
j._refresh_for_new_column(q)
778
def test_nested_join_init(self):
779
a = table('a', column('x'))
780
b = table('b', column('y'))
781
c = table('c', column('z'))
782
j = a.join(b, a.c.x == b.c.y).join(c, b.c.y == c.c.z)
787
j._refresh_for_new_column(q)
572
790
class AnonLabelTest(fixtures.TestBase):
573
"""Test behaviors that we hope to change with [ticket:2168]."""
791
"""Test behaviors fixed by [ticket:2168]."""
575
793
def test_anon_labels_named_column(self):
579
assert c1.label(None) is c1
580
eq_(str(select([c1.label(None)])), "SELECT x")
796
assert c1.label(None) is not c1
797
eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
582
799
def test_anon_labels_literal_column(self):
583
800
c1 = literal_column('x')
584
assert c1.label(None) is c1
585
eq_(str(select([c1.label(None)])), "SELECT x")
801
assert c1.label(None) is not c1
802
eq_(str(select([c1.label(None)])), "SELECT x AS x_1")
587
804
def test_anon_labels_func(self):
588
805
c1 = func.count('*')
606
823
def test_join_condition(self):
608
825
t1 = Table('t1', m, Column('id', Integer))
609
t2 = Table('t2', m, Column('id', Integer), Column('t1id',
610
ForeignKey('t1.id')))
611
t3 = Table('t3', m, Column('id', Integer), Column('t1id',
612
ForeignKey('t1.id')), Column('t2id',
613
ForeignKey('t2.id')))
614
t4 = Table('t4', m, Column('id', Integer), Column('t2id',
615
ForeignKey('t2.id')))
826
t2 = Table('t2', m, Column('id', Integer),
827
Column('t1id', ForeignKey('t1.id')))
829
Column('id', Integer),
830
Column('t1id', ForeignKey('t1.id')),
831
Column('t2id', ForeignKey('t2.id')))
832
t4 = Table('t4', m, Column('id', Integer),
833
Column('t2id', ForeignKey('t2.id')))
835
Column('t1id1', ForeignKey('t1.id')),
836
Column('t1id2', ForeignKey('t1.id')),
616
838
t1t2 = t1.join(t2)
617
839
t2t3 = t2.join(t3)
618
841
for (left, right, a_subset, expected) in [
619
842
(t1, t2, None, t1.c.id == t2.c.t1id),
620
843
(t1t2, t3, t2, t1t2.c.t2_id == t3.c.t2id),
644
870
als = t2t3.alias()
645
871
# test join's behavior, including natural
646
872
for left, right, expected in [
647
(t1, t2, t1.c.id==t2.c.t1id),
648
(t1t2, t3, t1t2.c.t2_id==t3.c.t2id),
649
(t2t3, t1, t1.c.id==t3.c.t1id),
650
(t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
651
(t2t3, t4, t2t3.c.t2_id==t4.c.t2id),
652
(t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
653
(t2t3.join(t1), t4, t2t3.c.t2_id==t4.c.t2id),
654
(t1t2, als, t1t2.c.t2_id==als.c.t3_t2id)
873
(t1, t2, t1.c.id == t2.c.t1id),
874
(t1t2, t3, t1t2.c.t2_id == t3.c.t2id),
875
(t2t3, t1, t1.c.id == t3.c.t1id),
876
(t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
877
(t2t3, t4, t2t3.c.t2_id == t4.c.t2id),
878
(t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
879
(t2t3.join(t1), t4, t2t3.c.t2_id == t4.c.t2id),
880
(t1t2, als, t1t2.c.t2_id == als.c.t3_t2id)
656
882
assert expected.compare(
657
883
left.join(right).onclause
684
910
Column('q', Integer, ForeignKey('t22.id')),
686
912
t2 = Table('t2', m, Column('id', Integer))
687
assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
688
assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
913
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
914
assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
690
916
def test_join_cond_no_such_unrelated_column(self):
692
918
t1 = Table('t1', m, Column('x', Integer, ForeignKey('t2.id')),
693
919
Column('y', Integer, ForeignKey('t3.q')))
694
920
t2 = Table('t2', m, Column('id', Integer))
695
t3 = Table('t3', m, Column('id', Integer))
696
assert sql_util.join_condition(t1, t2).compare(t1.c.x==t2.c.id)
697
assert sql_util.join_condition(t2, t1).compare(t1.c.x==t2.c.id)
921
Table('t3', m, Column('id', Integer))
922
assert sql_util.join_condition(t1, t2).compare(t1.c.x == t2.c.id)
923
assert sql_util.join_condition(t2, t1).compare(t1.c.x == t2.c.id)
699
925
def test_join_cond_no_such_related_table(self):
876
1104
util.column_set([s.c.engineer_id, s.c.engineer_name,
877
1105
s.c.manager_id]))
1107
def test_reduce_generation(self):
1109
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
1110
Column('y', Integer))
1111
t2 = Table('t2', m, Column('z', Integer, ForeignKey('t1.x')),
1112
Column('q', Integer))
1113
s1 = select([t1, t2])
1114
s2 = s1.reduce_columns(only_synonyms=False)
1116
set(s2.inner_columns),
1117
set([t1.c.x, t1.c.y, t2.c.q])
1120
s2 = s1.reduce_columns()
1122
set(s2.inner_columns),
1123
set([t1.c.x, t1.c.y, t2.c.z, t2.c.q])
1127
def test_reduce_only_synonym_fk(self):
1129
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
1130
Column('y', Integer))
1131
t2 = Table('t2', m, Column('x', Integer, ForeignKey('t1.x')),
1132
Column('q', Integer, ForeignKey('t1.y')))
1133
s1 = select([t1, t2])
1134
s1 = s1.reduce_columns(only_synonyms=True)
1137
set([s1.c.x, s1.c.y, s1.c.q])
1140
def test_reduce_only_synonym_lineage(self):
1142
t1 = Table('t1', m, Column('x', Integer, primary_key=True),
1143
Column('y', Integer),
1144
Column('z', Integer)
1146
# test that the first appearance in the columns clause
1147
# wins - t1 is first, t1.c.x wins
1149
s2 = select([t1, s1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
1151
set(s2.reduce_columns().inner_columns),
1152
set([t1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
1155
# reverse order, s1.c.x wins
1157
s2 = select([s1, t1]).where(t1.c.x == s1.c.x).where(s1.c.y == t1.c.z)
1159
set(s2.reduce_columns().inner_columns),
1160
set([s1.c.x, t1.c.y, t1.c.z, s1.c.y, s1.c.z])
880
1163
def test_reduce_aliased_join(self):
881
1164
metadata = MetaData()
1166
1476
assert b2.left is not bin.left
1167
1477
assert b3.left is not b2.left is not bin.left
1168
1478
assert b4.left is bin.left # since column is immutable
1169
assert b4.right is not bin.right is not b2.right is not b3.right
1479
# deannotate copies the element
1480
assert bin.right is not b2.right is not b3.right is not b4.right
1171
1482
def test_annotate_unique_traversal(self):
1172
1483
"""test that items are copied only once during
1173
1484
annotate, deannotate traversal
1486
#2453 - however note this was modified by
1487
#1401, and it's likely that re49563072578
1488
is helping us with the str() comparison
1489
case now, as deannotate is making
1490
clones again in some cases.
1177
1492
table1 = table('table1', column('x'))
1178
1493
table2 = table('table2', column('y'))
1179
1494
a1 = table1.alias()
1180
1495
s = select([a1.c.x]).select_from(
1181
a1.join(table2, a1.c.x==table2.c.y)
1496
a1.join(table2, a1.c.x == table2.c.y)
1185
1499
sql_util._deep_deannotate(s),
1186
sql_util._deep_annotate(s, {'foo':'bar'}),
1187
1500
visitors.cloned_traverse(s, {}, {}),
1188
visitors.replacement_traverse(s, {}, lambda x:None)
1501
visitors.replacement_traverse(s, {}, lambda x: None)
1190
1503
# the columns clause isn't changed at all
1191
1504
assert sel._raw_columns[0].table is a1
1192
# the from objects are internally consistent,
1193
# i.e. the Alias at position 0 is the same
1194
# Alias in the Join object in position 1
1195
1505
assert sel._froms[0] is sel._froms[1].left
1196
eq_(str(s), str(sel))
1507
eq_(str(s), str(sel))
1509
# when we are modifying annotations sets only
1510
# partially, each element is copied unconditionally
1513
sql_util._deep_deannotate(s, {"foo": "bar"}),
1514
sql_util._deep_annotate(s, {'foo': 'bar'}),
1516
assert sel._froms[0] is not sel._froms[1].left
1518
# but things still work out due to
1520
eq_(str(s), str(sel))
1523
def test_annotate_varied_annot_same_col(self):
1524
"""test two instances of the same column with different annotations
1525
preserving them when deep_annotate is run on them.
1528
t1 = table('table1', column("col1"), column("col2"))
1529
s = select([t1.c.col1._annotate({"foo":"bar"})])
1530
s2 = select([t1.c.col1._annotate({"bat":"hoho"})])
1532
sel = sql_util._deep_annotate(s3, {"new": "thing"})
1535
sel.selects[0]._raw_columns[0]._annotations,
1536
{"foo": "bar", "new": "thing"}
1540
sel.selects[1]._raw_columns[0]._annotations,
1541
{"bat": "hoho", "new": "thing"}
1544
def test_deannotate_2(self):
1545
table1 = table('table1', column("col1"), column("col2"))
1546
j = table1.c.col1._annotate({"remote": True}) == \
1547
table1.c.col2._annotate({"local": True})
1548
j2 = sql_util._deep_deannotate(j)
1550
j.left._annotations, {"remote": True}
1553
j2.left._annotations, {}
1556
def test_deannotate_3(self):
1557
table1 = table('table1', column("col1"), column("col2"),
1558
column("col3"), column("col4"))
1560
table1.c.col1._annotate({"remote": True}) ==
1561
table1.c.col2._annotate({"local": True}),
1562
table1.c.col3._annotate({"remote": True}) ==
1563
table1.c.col4._annotate({"local": True})
1565
j2 = sql_util._deep_deannotate(j)
1567
j.clauses[0].left._annotations, {"remote": True}
1570
j2.clauses[0].left._annotations, {}
1198
1573
def test_annotate_fromlist_preservation(self):
1199
1574
"""test the FROM list in select still works
1225
1600
def test_bind_unique_test(self):
1226
t1 = table('t', column('a'), column('b'))
1601
table('t', column('a'), column('b'))
1228
1603
b = bindparam("bind", value="x", unique=True)
1230
1605
# the annotation of "b" should render the
1231
1606
# same. The "unique" test in compiler should
1232
1607
# also pass, [ticket:2425]
1233
eq_(str(or_(b, b._annotate({"foo":"bar"}))),
1608
eq_(str(or_(b, b._annotate({"foo": "bar"}))),
1234
1609
":bind_1 OR :bind_1")
1611
def test_comparators_cleaned_out_construction(self):
1614
comp1 = c.comparator
1616
c1 = c._annotate({"foo": "bar"})
1617
comp2 = c1.comparator
1618
assert comp1 is not comp2
1620
def test_comparators_cleaned_out_reannotate(self):
1623
c1 = c._annotate({"foo": "bar"})
1624
comp1 = c1.comparator
1626
c2 = c1._annotate({"bat": "hoho"})
1627
comp2 = c2.comparator
1629
assert comp1 is not comp2
1631
def test_comparator_cleanout_integration(self):
1634
c1 = c._annotate({"foo": "bar"})
1635
comp1 = c1.comparator
1637
c2 = c1._annotate({"bat": "hoho"})
1638
comp2 = c2.comparator
1640
assert (c2 == 5).left._annotations == {"foo": "bar", "bat": "hoho"}
1642
class WithLabelsTest(fixtures.TestBase):
1643
def _assert_labels_warning(self, s):
1644
assert_raises_message(
1646
r"replaced by Column.*, which has the same key",
1650
def _assert_result_keys(self, s, keys):
1651
compiled = s.compile()
1652
eq_(set(compiled.result_map), set(keys))
1654
def _assert_subq_result_keys(self, s, keys):
1655
compiled = s.select().compile()
1656
eq_(set(compiled.result_map), set(keys))
1658
def _names_overlap(self):
1660
t1 = Table('t1', m, Column('x', Integer))
1661
t2 = Table('t2', m, Column('x', Integer))
1662
return select([t1, t2])
1664
def test_names_overlap_nolabel(self):
1665
sel = self._names_overlap()
1666
self._assert_labels_warning(sel)
1667
self._assert_result_keys(sel, ['x'])
1669
def test_names_overlap_label(self):
1670
sel = self._names_overlap().apply_labels()
1675
self._assert_result_keys(sel, ['t1_x', 't2_x'])
1677
def _names_overlap_keys_dont(self):
1679
t1 = Table('t1', m, Column('x', Integer, key='a'))
1680
t2 = Table('t2', m, Column('x', Integer, key='b'))
1681
return select([t1, t2])
1683
def test_names_overlap_keys_dont_nolabel(self):
1684
sel = self._names_overlap_keys_dont()
1689
self._assert_result_keys(sel, ['x'])
1691
def test_names_overlap_keys_dont_label(self):
1692
sel = self._names_overlap_keys_dont().apply_labels()
1697
self._assert_result_keys(sel, ['t1_x', 't2_x'])
1699
def _labels_overlap(self):
1701
t1 = Table('t', m, Column('x_id', Integer))
1702
t2 = Table('t_x', m, Column('id', Integer))
1703
return select([t1, t2])
1705
def test_labels_overlap_nolabel(self):
1706
sel = self._labels_overlap()
1711
self._assert_result_keys(sel, ['x_id', 'id'])
1713
def test_labels_overlap_label(self):
1714
sel = self._labels_overlap().apply_labels()
1718
['t_x_id', t2.c.id.anon_label]
1720
self._assert_result_keys(sel, ['t_x_id', 'id_1'])
1721
self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
1723
def _labels_overlap_keylabels_dont(self):
1725
t1 = Table('t', m, Column('x_id', Integer, key='a'))
1726
t2 = Table('t_x', m, Column('id', Integer, key='b'))
1727
return select([t1, t2])
1729
def test_labels_overlap_keylabels_dont_nolabel(self):
1730
sel = self._labels_overlap_keylabels_dont()
1731
eq_(sel.c.keys(), ['a', 'b'])
1732
self._assert_result_keys(sel, ['x_id', 'id'])
1734
def test_labels_overlap_keylabels_dont_label(self):
1735
sel = self._labels_overlap_keylabels_dont().apply_labels()
1736
eq_(sel.c.keys(), ['t_a', 't_x_b'])
1737
self._assert_result_keys(sel, ['t_x_id', 'id_1'])
1739
def _keylabels_overlap_labels_dont(self):
1741
t1 = Table('t', m, Column('a', Integer, key='x_id'))
1742
t2 = Table('t_x', m, Column('b', Integer, key='id'))
1743
return select([t1, t2])
1745
def test_keylabels_overlap_labels_dont_nolabel(self):
1746
sel = self._keylabels_overlap_labels_dont()
1747
eq_(sel.c.keys(), ['x_id', 'id'])
1748
self._assert_result_keys(sel, ['a', 'b'])
1750
def test_keylabels_overlap_labels_dont_label(self):
1751
sel = self._keylabels_overlap_labels_dont().apply_labels()
1753
eq_(sel.c.keys(), ['t_x_id', t2.c.id.anon_label])
1754
self._assert_result_keys(sel, ['t_a', 't_x_b'])
1755
self._assert_subq_result_keys(sel, ['t_a', 't_x_b'])
1757
def _keylabels_overlap_labels_overlap(self):
1759
t1 = Table('t', m, Column('x_id', Integer, key='x_a'))
1760
t2 = Table('t_x', m, Column('id', Integer, key='a'))
1761
return select([t1, t2])
1763
def test_keylabels_overlap_labels_overlap_nolabel(self):
1764
sel = self._keylabels_overlap_labels_overlap()
1765
eq_(sel.c.keys(), ['x_a', 'a'])
1766
self._assert_result_keys(sel, ['x_id', 'id'])
1767
self._assert_subq_result_keys(sel, ['x_id', 'id'])
1769
def test_keylabels_overlap_labels_overlap_label(self):
1770
sel = self._keylabels_overlap_labels_overlap().apply_labels()
1772
eq_(sel.c.keys(), ['t_x_a', t2.c.a.anon_label])
1773
self._assert_result_keys(sel, ['t_x_id', 'id_1'])
1774
self._assert_subq_result_keys(sel, ['t_x_id', 'id_1'])
1776
def _keys_overlap_names_dont(self):
1778
t1 = Table('t1', m, Column('a', Integer, key='x'))
1779
t2 = Table('t2', m, Column('b', Integer, key='x'))
1780
return select([t1, t2])
1782
def test_keys_overlap_names_dont_nolabel(self):
1783
sel = self._keys_overlap_names_dont()
1784
self._assert_labels_warning(sel)
1785
self._assert_result_keys(sel, ['a', 'b'])
1787
def test_keys_overlap_names_dont_label(self):
1788
sel = self._keys_overlap_names_dont().apply_labels()
1793
self._assert_result_keys(sel, ['t1_a', 't2_b'])