807
827
"SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
810
def test_operators(self):
811
for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
814
#(operator.truediv, '/'),
819
for (lhs, rhs, res) in (
820
(5, table1.c.myid, ':myid_1 %s mytable.myid'),
821
(5, literal(5), ':param_1 %s :param_2'),
822
(table1.c.myid, 'b', 'mytable.myid %s :myid_1'),
823
(table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
824
(table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
825
(literal(5), 8, ':param_1 %s :param_2'),
826
(literal(6), table1.c.myid, ':param_1 %s mytable.myid'),
827
(literal(7), literal(5.5), ':param_1 %s :param_2'),
829
self.assert_compile(py_op(lhs, rhs), res % sql_op)
831
dt = datetime.datetime.today()
832
# exercise comparison operators
833
for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'),
834
(operator.gt, '>', '<'),
835
(operator.eq, '=', '='),
836
(operator.ne, '!=', '!='),
837
(operator.le, '<=', '>='),
838
(operator.ge, '>=', '<=')):
839
for (lhs, rhs, l_sql, r_sql) in (
840
('a', table1.c.myid, ':myid_1', 'mytable.myid'),
841
('a', literal('b'), ':param_2', ':param_1'), # note swap!
842
(table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
843
(table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
844
(table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
845
(literal('a'), 'b', ':param_1', ':param_2'),
846
(literal('a'), table1.c.myid, ':param_1', 'mytable.myid'),
847
(literal('a'), literal('b'), ':param_1', ':param_2'),
848
(dt, literal('b'), ':param_2', ':param_1'),
849
(literal('b'), dt, ':param_1', ':param_2'),
852
# the compiled clause should match either (e.g.):
853
# 'a' < 'b' -or- 'b' > 'a'.
854
compiled = str(py_op(lhs, rhs))
855
fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
856
rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
858
self.assert_(compiled == fwd_sql or compiled == rev_sql,
859
"\n'" + compiled + "'\n does not match\n'" +
860
fwd_sql + "'\n or\n'" + rev_sql + "'")
864
(operator.inv, 'NOT '),
867
(table1.c.myid, "mytable.myid"),
868
(literal("foo"), ":param_1"),
871
compiled = str(py_op(expr))
872
sql = "%s%s" % (op, sql)
876
table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
877
"SELECT mytable.myid, mytable.name, mytable.description FROM "
878
"mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
882
table1.select((table1.c.myid != 12) &
883
~(table1.c.name.between('jack','john'))),
884
"SELECT mytable.myid, mytable.name, mytable.description FROM "
885
"mytable WHERE mytable.myid != :myid_1 AND "\
886
"NOT (mytable.name BETWEEN :name_1 AND :name_2)"
890
table1.select((table1.c.myid != 12) &
891
~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
892
"SELECT mytable.myid, mytable.name, mytable.description FROM "
893
"mytable WHERE mytable.myid != :myid_1 AND "\
894
"NOT (mytable.name = :name_1 AND mytable.name = :name_2 "
895
"AND mytable.name = :name_3)"
899
table1.select((table1.c.myid != 12) & ~table1.c.name),
900
"SELECT mytable.myid, mytable.name, mytable.description FROM "
901
"mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name"
905
literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
908
# test the op() function, also that its results are further usable in expressions
910
table1.select(table1.c.myid.op('hoho')(12)==14),
911
"SELECT mytable.myid, mytable.name, mytable.description FROM "
912
"mytable WHERE (mytable.myid hoho :myid_1) = :param_1"
915
# test that clauses can be pickled (operators need to be module-level, etc.)
916
clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & \
917
table1.c.myid.like('hoho')
918
assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause)))
922
for expr, check, dialect in [
924
table1.c.myid.like('somstr'),
925
"mytable.myid LIKE :myid_1", None),
927
~table1.c.myid.like('somstr'),
928
"mytable.myid NOT LIKE :myid_1", None),
930
table1.c.myid.like('somstr', escape='\\'),
931
"mytable.myid LIKE :myid_1 ESCAPE '\\'",
934
~table1.c.myid.like('somstr', escape='\\'),
935
"mytable.myid NOT LIKE :myid_1 ESCAPE '\\'",
938
table1.c.myid.ilike('somstr', escape='\\'),
939
"lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'",
942
~table1.c.myid.ilike('somstr', escape='\\'),
943
"lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'",
946
table1.c.myid.ilike('somstr', escape='\\'),
947
"mytable.myid ILIKE %(myid_1)s ESCAPE '\\\\'",
948
postgresql.PGDialect()),
950
~table1.c.myid.ilike('somstr', escape='\\'),
951
"mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\\\'",
952
postgresql.PGDialect()),
954
table1.c.name.ilike('%something%'),
955
"lower(mytable.name) LIKE lower(:name_1)", None),
957
table1.c.name.ilike('%something%'),
958
"mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
960
~table1.c.name.ilike('%something%'),
961
"lower(mytable.name) NOT LIKE lower(:name_1)", None),
963
~table1.c.name.ilike('%something%'),
964
"mytable.name NOT ILIKE %(name_1)s",
965
postgresql.PGDialect()),
967
self.assert_compile(expr, check, dialect=dialect)
969
def test_match(self):
970
for expr, check, dialect in [
971
(table1.c.myid.match('somstr'),
972
"mytable.myid MATCH ?", sqlite.SQLiteDialect()),
973
(table1.c.myid.match('somstr'),
974
"MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
976
(table1.c.myid.match('somstr'),
977
"CONTAINS (mytable.myid, :myid_1)",
979
(table1.c.myid.match('somstr'),
980
"mytable.myid @@ to_tsquery(%(myid_1)s)",
981
postgresql.dialect()),
982
(table1.c.myid.match('somstr'),
983
"CONTAINS (mytable.myid, :myid_1)",
986
self.assert_compile(expr, check, dialect=dialect)
988
def test_composed_string_comparators(self):
990
table1.c.name.contains('jo'),
991
"mytable.name LIKE '%%' || :name_1 || '%%'" ,
992
checkparams = {'name_1': u'jo'},
995
table1.c.name.contains('jo'),
996
"mytable.name LIKE concat(concat('%%', %s), '%%')" ,
997
checkparams = {'name_1': u'jo'},
998
dialect=mysql.dialect()
1000
self.assert_compile(
1001
table1.c.name.contains('jo', escape='\\'),
1002
"mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" ,
1003
checkparams = {'name_1': u'jo'},
1005
self.assert_compile(
1006
table1.c.name.startswith('jo', escape='\\'),
1007
"mytable.name LIKE :name_1 || '%%' ESCAPE '\\'" )
1008
self.assert_compile(
1009
table1.c.name.endswith('jo', escape='\\'),
1010
"mytable.name LIKE '%%' || :name_1 ESCAPE '\\'" )
1011
self.assert_compile(
1012
table1.c.name.endswith('hn'),
1013
"mytable.name LIKE '%%' || :name_1",
1014
checkparams = {'name_1': u'hn'}, )
1015
self.assert_compile(
1016
table1.c.name.endswith('hn'),
1017
"mytable.name LIKE concat('%%', %s)",
1018
checkparams = {'name_1': u'hn'}, dialect=mysql.dialect()
1020
self.assert_compile(
1021
table1.c.name.startswith(u"hi \xf6 \xf5"),
1022
"mytable.name LIKE :name_1 || '%%'",
1023
checkparams = {'name_1': u'hi \xf6 \xf5'},
1025
self.assert_compile(
1026
column('name').endswith(text("'foo'")),
1027
"name LIKE '%%' || 'foo'" )
1028
self.assert_compile(
1029
column('name').endswith(literal_column("'foo'")),
1030
"name LIKE '%%' || 'foo'" )
1031
self.assert_compile(
1032
column('name').startswith(text("'foo'")),
1033
"name LIKE 'foo' || '%%'" )
1034
self.assert_compile(
1035
column('name').startswith(text("'foo'")),
1036
"name LIKE concat('foo', '%%')", dialect=mysql.dialect())
1037
self.assert_compile(
1038
column('name').startswith(literal_column("'foo'")),
1039
"name LIKE 'foo' || '%%'" )
1040
self.assert_compile(
1041
column('name').startswith(literal_column("'foo'")),
1042
"name LIKE concat('foo', '%%')", dialect=mysql.dialect())
1044
832
def test_multiple_col_binds(self):
1045
833
self.assert_compile(
1046
select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf',
834
select(["*"], or_(table1.c.myid == 12, table1.c.myid == 'asdf',
1047
835
table1.c.myid == 'foo')),
1048
836
"SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
1049
837
"OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
1052
840
def test_order_by_nulls(self):
1053
841
self.assert_compile(
1054
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullsfirst()]),
1055
"SELECT myothertable.otherid, myothertable.othername FROM "
1056
"myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS FIRST"
1059
self.assert_compile(
1060
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullslast()]),
1061
"SELECT myothertable.otherid, myothertable.othername FROM "
1062
"myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS LAST"
1065
self.assert_compile(
1066
table2.select(order_by = [table2.c.otherid.nullslast(), table2.c.othername.desc().nullsfirst()]),
1067
"SELECT myothertable.otherid, myothertable.othername FROM "
1068
"myothertable ORDER BY myothertable.otherid NULLS LAST, myothertable.othername DESC NULLS FIRST"
1071
self.assert_compile(
1072
table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc()]),
1073
"SELECT myothertable.otherid, myothertable.othername FROM "
1074
"myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC"
1077
self.assert_compile(
1078
table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc().nullslast()]),
1079
"SELECT myothertable.otherid, myothertable.othername FROM "
1080
"myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC NULLS LAST"
842
table2.select(order_by=[table2.c.otherid,
843
table2.c.othername.desc().nullsfirst()]),
844
"SELECT myothertable.otherid, myothertable.othername FROM "
845
"myothertable ORDER BY myothertable.otherid, "
846
"myothertable.othername DESC NULLS FIRST"
850
table2.select(order_by=[
851
table2.c.otherid, table2.c.othername.desc().nullslast()]),
852
"SELECT myothertable.otherid, myothertable.othername FROM "
853
"myothertable ORDER BY myothertable.otherid, "
854
"myothertable.othername DESC NULLS LAST"
858
table2.select(order_by=[
859
table2.c.otherid.nullslast(),
860
table2.c.othername.desc().nullsfirst()]),
861
"SELECT myothertable.otherid, myothertable.othername FROM "
862
"myothertable ORDER BY myothertable.otherid NULLS LAST, "
863
"myothertable.othername DESC NULLS FIRST"
867
table2.select(order_by=[table2.c.otherid.nullsfirst(),
868
table2.c.othername.desc()]),
869
"SELECT myothertable.otherid, myothertable.othername FROM "
870
"myothertable ORDER BY myothertable.otherid NULLS FIRST, "
871
"myothertable.othername DESC"
875
table2.select(order_by=[table2.c.otherid.nullsfirst(),
876
table2.c.othername.desc().nullslast()]),
877
"SELECT myothertable.otherid, myothertable.othername FROM "
878
"myothertable ORDER BY myothertable.otherid NULLS FIRST, "
879
"myothertable.othername DESC NULLS LAST"
1083
882
def test_orderby_groupby(self):
1084
883
self.assert_compile(
1085
table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
884
table2.select(order_by=[table2.c.otherid,
885
asc(table2.c.othername)]),
1086
886
"SELECT myothertable.otherid, myothertable.othername FROM "
1087
"myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
887
"myothertable ORDER BY myothertable.otherid, "
888
"myothertable.othername ASC"
1090
891
self.assert_compile(
1091
table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
892
table2.select(order_by=[table2.c.otherid,
893
table2.c.othername.desc()]),
1092
894
"SELECT myothertable.otherid, myothertable.othername FROM "
1093
"myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
895
"myothertable ORDER BY myothertable.otherid, "
896
"myothertable.othername DESC"
1096
899
# generative order_by
1097
900
self.assert_compile(
1098
table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
901
table2.select().order_by(table2.c.otherid).\
902
order_by(table2.c.othername.desc()),
1099
903
"SELECT myothertable.otherid, myothertable.othername FROM "
1100
"myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
904
"myothertable ORDER BY myothertable.otherid, "
905
"myothertable.othername DESC"
1103
908
self.assert_compile(
1104
909
table2.select().order_by(table2.c.otherid).
1105
order_by(table2.c.othername.desc()).order_by(None),
1106
"SELECT myothertable.otherid, myothertable.othername FROM myothertable"
910
order_by(table2.c.othername.desc()
912
"SELECT myothertable.otherid, myothertable.othername "
1109
916
self.assert_compile(
1111
918
[table2.c.othername, func.count(table2.c.otherid)],
1112
group_by = [table2.c.othername]),
1113
"SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
919
group_by=[table2.c.othername]),
920
"SELECT myothertable.othername, "
921
"count(myothertable.otherid) AS count_1 "
1114
922
"FROM myothertable GROUP BY myothertable.othername"
1578
1441
table3, table1.c.myid == table3.c.userid)]
1580
1443
"SELECT mytable.myid, mytable.name, mytable.description, "
1581
"myothertable.otherid, myothertable.othername, thirdtable.userid, "
1582
"thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid ="
1583
" myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
1444
"myothertable.otherid, myothertable.othername, "
1445
"thirdtable.userid, "
1446
"thirdtable.otherstuff FROM mytable JOIN myothertable "
1448
" myothertable.otherid JOIN thirdtable ON "
1449
"mytable.myid = thirdtable.userid"
1586
1452
self.assert_compile(
1587
join(users, addresses, users.c.user_id==addresses.c.user_id).select(),
1453
join(users, addresses, users.c.user_id ==
1454
addresses.c.user_id).select(),
1588
1455
"SELECT users.user_id, users.user_name, users.password, "
1589
1456
"addresses.address_id, addresses.user_id, addresses.street, "
1590
"addresses.city, addresses.state, addresses.zip FROM users JOIN addresses "
1457
"addresses.city, addresses.state, addresses.zip "
1458
"FROM users JOIN addresses "
1591
1459
"ON users.user_id = addresses.user_id"
1594
1462
self.assert_compile(
1595
1463
select([table1, table2, table3],
1597
from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).
1598
outerjoin(table3, table1.c.myid==table3.c.userid)]
1600
,"SELECT mytable.myid, mytable.name, mytable.description, "
1601
"myothertable.otherid, myothertable.othername, thirdtable.userid,"
1602
" thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid "
1603
"= myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid ="
1465
from_obj=[join(table1, table2,
1466
table1.c.myid == table2.c.otherid).
1468
table1.c.myid == table3.c.userid)]
1470
"SELECT mytable.myid, mytable.name, mytable.description, "
1471
"myothertable.otherid, myothertable.othername, "
1472
"thirdtable.userid,"
1473
" thirdtable.otherstuff FROM mytable "
1474
"JOIN myothertable ON mytable.myid "
1475
"= myothertable.otherid LEFT OUTER JOIN thirdtable "
1604
1477
" thirdtable.userid"
1606
1479
self.assert_compile(
1607
1480
select([table1, table2, table3],
1608
from_obj = [outerjoin(table1,
1609
join(table2, table3, table2.c.otherid == table3.c.userid),
1610
table1.c.myid==table2.c.otherid)]
1612
,"SELECT mytable.myid, mytable.name, mytable.description, "
1613
"myothertable.otherid, myothertable.othername, thirdtable.userid,"
1614
" thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable "
1615
"JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON "
1481
from_obj=[outerjoin(table1,
1482
join(table2, table3, table2.c.otherid
1483
== table3.c.userid),
1484
table1.c.myid == table2.c.otherid)]
1486
"SELECT mytable.myid, mytable.name, mytable.description, "
1487
"myothertable.otherid, myothertable.othername, "
1488
"thirdtable.userid,"
1489
" thirdtable.otherstuff FROM mytable LEFT OUTER JOIN "
1491
"JOIN thirdtable ON myothertable.otherid = "
1492
"thirdtable.userid) ON "
1616
1493
"mytable.myid = myothertable.otherid"
1866
1764
{'mytablename':5}, {'mytablename':5}, [5]
1869
select([table1], or_(table1.c.myid==bindparam('myid'),
1870
table2.c.otherid==bindparam('myid'))),
1767
select([table1], or_(table1.c.myid == bindparam('myid'),
1768
table2.c.otherid == bindparam('myid'))),
1871
1769
"SELECT mytable.myid, mytable.name, mytable.description "
1872
1770
"FROM mytable, myothertable WHERE mytable.myid = :myid "
1873
1771
"OR myothertable.otherid = :myid",
1874
1772
"SELECT mytable.myid, mytable.name, mytable.description "
1875
1773
"FROM mytable, myothertable WHERE mytable.myid = ? "
1876
1774
"OR myothertable.otherid = ?",
1877
{'myid':None}, [None, None],
1878
{'myid':5}, {'myid':5}, [5,5]
1775
{'myid': None}, [None, None],
1776
{'myid': 5}, {'myid': 5}, [5, 5]
1881
1779
text("SELECT mytable.myid, mytable.name, mytable.description FROM "
1882
"mytable, myothertable WHERE mytable.myid = :myid OR "
1883
"myothertable.otherid = :myid"),
1884
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1885
"mytable, myothertable WHERE mytable.myid = :myid OR "
1886
"myothertable.otherid = :myid",
1887
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1888
"mytable, myothertable WHERE mytable.myid = ? OR "
1889
"myothertable.otherid = ?",
1780
"mytable, myothertable WHERE mytable.myid = :myid OR "
1781
"myothertable.otherid = :myid"),
1782
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1783
"mytable, myothertable WHERE mytable.myid = :myid OR "
1784
"myothertable.otherid = :myid",
1785
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1786
"mytable, myothertable WHERE mytable.myid = ? OR "
1787
"myothertable.otherid = ?",
1890
1788
{'myid':None}, [None, None],
1891
{'myid':5}, {'myid':5}, [5,5]
1789
{'myid': 5}, {'myid': 5}, [5, 5]
1894
select([table1], or_(table1.c.myid==bindparam('myid', unique=True),
1895
table2.c.otherid==bindparam('myid', unique=True))),
1792
select([table1], or_(table1.c.myid ==
1793
bindparam('myid', unique=True),
1795
bindparam('myid', unique=True))),
1896
1796
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1897
1797
"mytable, myothertable WHERE mytable.myid = "
1898
1798
":myid_1 OR myothertable.otherid = :myid_2",
1919
1821
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1920
1822
"mytable, myothertable WHERE mytable.myid = "
1921
1823
"? OR myothertable.otherid = ?",
1922
{'myid':8, 'myotherid':7}, [8, 7],
1923
{'myid':5}, {'myid':5, 'myotherid':7}, [5,7]
1824
{'myid': 8, 'myotherid': 7}, [8, 7],
1825
{'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7]
1926
select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True),
1927
table2.c.otherid==bindparam('myid', value=8, unique=True))),
1828
select([table1], or_(table1.c.myid ==
1829
bindparam('myid', value=7, unique=True),
1831
bindparam('myid', value=8, unique=True))),
1928
1832
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1929
1833
"mytable, myothertable WHERE mytable.myid = "
1930
1834
":myid_1 OR myothertable.otherid = :myid_2",
1931
1835
"SELECT mytable.myid, mytable.name, mytable.description FROM "
1932
1836
"mytable, myothertable WHERE mytable.myid = "
1933
1837
"? OR myothertable.otherid = ?",
1934
{'myid_1':7, 'myid_2':8}, [7,8],
1935
{'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6]
1838
{'myid_1': 7, 'myid_2': 8}, [7, 8],
1839
{'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
1939
self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict)
1940
self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
1843
self.assert_compile(stmt, expected_named_stmt,
1844
params=expected_default_params_dict)
1845
self.assert_compile(stmt, expected_positional_stmt,
1846
dialect=sqlite.dialect())
1941
1847
nonpositional = stmt.compile()
1942
1848
positional = stmt.compile(dialect=sqlite.dialect())
1943
1849
pp = positional.params
1944
assert [pp[k] for k in positional.positiontup] == expected_default_params_list
1945
assert nonpositional.construct_params(test_param_dict) == expected_test_params_dict, \
1946
"expected :%s got %s" % (str(expected_test_params_dict), \
1947
str(nonpositional.get_params(**test_param_dict)))
1850
eq_([pp[k] for k in positional.positiontup],
1851
expected_default_params_list)
1853
eq_(nonpositional.construct_params(test_param_dict),
1854
expected_test_params_dict)
1948
1855
pp = positional.construct_params(test_param_dict)
1949
assert [pp[k] for k in positional.positiontup] == expected_test_params_list
1857
[pp[k] for k in positional.positiontup],
1858
expected_test_params_list
1951
1861
# check that params() doesnt modify original statement
1952
s = select([table1], or_(table1.c.myid==bindparam('myid'),
1953
table2.c.otherid==bindparam('myotherid')))
1954
s2 = s.params({'myid':8, 'myotherid':7})
1955
s3 = s2.params({'myid':9})
1956
assert s.compile().params == {'myid':None, 'myotherid':None}
1957
assert s2.compile().params == {'myid':8, 'myotherid':7}
1958
assert s3.compile().params == {'myid':9, 'myotherid':7}
1862
s = select([table1], or_(table1.c.myid == bindparam('myid'),
1864
bindparam('myotherid')))
1865
s2 = s.params({'myid': 8, 'myotherid': 7})
1866
s3 = s2.params({'myid': 9})
1867
assert s.compile().params == {'myid': None, 'myotherid': None}
1868
assert s2.compile().params == {'myid': 8, 'myotherid': 7}
1869
assert s3.compile().params == {'myid': 9, 'myotherid': 7}
1960
1871
# test using same 'unique' param object twice in one compile
1961
s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
1962
s2 = select([table1, s], table1.c.myid==s)
1872
s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar()
1873
s2 = select([table1, s], table1.c.myid == s)
1963
1874
self.assert_compile(s2,
1964
1875
"SELECT mytable.myid, mytable.name, mytable.description, "
1965
1876
"(SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
2053
1964
r"A value is required for bind parameter 'x', "
2054
1965
"in parameter group 2",
2055
1966
select([table1]).where(
2056
table1.c.myid==bindparam("x", required=True)
1967
table1.c.myid == bindparam("x", required=True)
2057
1968
).compile().construct_params,
2058
1969
_group_number=2
2063
@testing.emits_warning('.*empty sequence.*')
2065
self.assert_compile(table1.c.myid.in_(['a']),
2066
"mytable.myid IN (:myid_1)")
2068
self.assert_compile(~table1.c.myid.in_(['a']),
2069
"mytable.myid NOT IN (:myid_1)")
2071
self.assert_compile(table1.c.myid.in_(['a', 'b']),
2072
"mytable.myid IN (:myid_1, :myid_2)")
2074
self.assert_compile(table1.c.myid.in_(iter(['a', 'b'])),
2075
"mytable.myid IN (:myid_1, :myid_2)")
2077
self.assert_compile(table1.c.myid.in_([literal('a')]),
2078
"mytable.myid IN (:param_1)")
2080
self.assert_compile(table1.c.myid.in_([literal('a'), 'b']),
2081
"mytable.myid IN (:param_1, :myid_1)")
2083
self.assert_compile(table1.c.myid.in_([literal('a'), literal('b')]),
2084
"mytable.myid IN (:param_1, :param_2)")
2086
self.assert_compile(table1.c.myid.in_(['a', literal('b')]),
2087
"mytable.myid IN (:myid_1, :param_1)")
2089
self.assert_compile(table1.c.myid.in_([literal(1) + 'a']),
2090
"mytable.myid IN (:param_1 + :param_2)")
2092
self.assert_compile(table1.c.myid.in_([literal('a') +'a', 'b']),
2093
"mytable.myid IN (:param_1 || :param_2, :myid_1)")
2095
self.assert_compile(table1.c.myid.in_([literal('a') + literal('a'), literal('b')]),
2096
"mytable.myid IN (:param_1 || :param_2, :param_3)")
2098
self.assert_compile(table1.c.myid.in_([1, literal(3) + 4]),
2099
"mytable.myid IN (:myid_1, :param_1 + :param_2)")
2101
self.assert_compile(table1.c.myid.in_([literal('a') < 'b']),
2102
"mytable.myid IN (:param_1 < :param_2)")
2104
self.assert_compile(table1.c.myid.in_([table1.c.myid]),
2105
"mytable.myid IN (mytable.myid)")
2107
self.assert_compile(table1.c.myid.in_(['a', table1.c.myid]),
2108
"mytable.myid IN (:myid_1, mytable.myid)")
2110
self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid]),
2111
"mytable.myid IN (:param_1, mytable.myid)")
2113
self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid +'a']),
2114
"mytable.myid IN (:param_1, mytable.myid + :myid_1)")
2116
self.assert_compile(table1.c.myid.in_([literal(1), 'a' + table1.c.myid]),
2117
"mytable.myid IN (:param_1, :myid_1 + mytable.myid)")
2119
self.assert_compile(table1.c.myid.in_([1, 2, 3]),
2120
"mytable.myid IN (:myid_1, :myid_2, :myid_3)")
2122
self.assert_compile(table1.c.myid.in_(select([table2.c.otherid])),
2123
"mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
2125
self.assert_compile(~table1.c.myid.in_(select([table2.c.otherid])),
2126
"mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)")
2129
self.assert_compile(
2131
text("SELECT myothertable.otherid FROM myothertable")
2133
"mytable.myid IN (SELECT myothertable.otherid "
2134
"FROM myothertable)"
2137
# test empty in clause
2138
self.assert_compile(table1.c.myid.in_([]),
2139
"mytable.myid != mytable.myid")
2141
self.assert_compile(
2142
select([table1.c.myid.in_(select([table2.c.otherid]))]),
2143
"SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable"
2145
self.assert_compile(
2146
select([table1.c.myid.in_(select([table2.c.otherid]).as_scalar())]),
2147
"SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable"
2150
self.assert_compile(table1.c.myid.in_(
2152
select([table1.c.myid], table1.c.myid == 5),
2153
select([table1.c.myid], table1.c.myid == 12),
2155
), "mytable.myid IN ("\
2156
"SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 "\
2157
"UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
2159
# test that putting a select in an IN clause does not blow away its ORDER BY clause
2160
self.assert_compile(
2161
select([table1, table2],
2162
table2.c.otherid.in_(
2163
select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False)
2165
from_obj=[table1.join(table2, table1.c.myid==table2.c.otherid)], order_by=[table1.c.myid]
2167
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable "\
2168
"JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid "\
2169
"FROM myothertable ORDER BY myothertable.othername LIMIT :param_1) ORDER BY mytable.myid",
2173
1975
def test_tuple(self):
2174
self.assert_compile(tuple_(table1.c.myid, table1.c.name).in_([(1, 'foo'), (5, 'bar')]),
2175
"(mytable.myid, mytable.name) IN ((:param_1, :param_2), (:param_3, :param_4))"
1976
self.assert_compile(
1977
tuple_(table1.c.myid, table1.c.name).in_(
1978
[(1, 'foo'), (5, 'bar')]),
1979
"(mytable.myid, mytable.name) IN "
1980
"((:param_1, :param_2), (:param_3, :param_4))"
2178
1983
self.assert_compile(
2179
1984
tuple_(table1.c.myid, table1.c.name).in_(
2180
1985
[tuple_(table2.c.otherid, table2.c.othername)]
2182
"(mytable.myid, mytable.name) IN ((myothertable.otherid, myothertable.othername))"
1987
"(mytable.myid, mytable.name) IN "
1988
"((myothertable.otherid, myothertable.othername))"
2185
1991
self.assert_compile(
2412
class KwargPropagationTest(fixtures.TestBase):
2415
def setup_class(cls):
2416
from sqlalchemy.sql.expression import ColumnClause, TableClause
2417
class CatchCol(ColumnClause):
2420
class CatchTable(TableClause):
2423
cls.column = CatchCol("x")
2424
cls.table = CatchTable("y")
2425
cls.criterion = cls.column == CatchCol('y')
2428
def compile_col(element, compiler, **kw):
2429
assert "canary" in kw
2430
return compiler.visit_column(element)
2432
@compiles(CatchTable)
2433
def compile_table(element, compiler, **kw):
2434
assert "canary" in kw
2435
return compiler.visit_table(element)
2437
def _do_test(self, element):
2438
d = default.DefaultDialect()
2439
d.statement_compiler(d, element,
2440
compile_kwargs={"canary": True})
2442
def test_binary(self):
2443
self._do_test(self.column == 5)
2445
def test_select(self):
2446
s = select([self.column]).select_from(self.table).\
2447
where(self.column == self.criterion).\
2448
order_by(self.column)
2451
def test_case(self):
2452
c = case([(self.criterion, self.column)], else_=self.column)
2455
def test_cast(self):
2456
c = cast(self.column, Integer)
2612
2460
class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
2613
2461
__dialect__ = 'default'
2615
def test_insert(self):
2616
# generic insert, will create bind params for all columns
2617
self.assert_compile(insert(table1),
2618
"INSERT INTO mytable (myid, name, description) "
2619
"VALUES (:myid, :name, :description)")
2621
# insert with user-supplied bind params for specific columns,
2622
# cols provided literally
2623
self.assert_compile(
2625
table1.c.myid : bindparam('userid'),
2626
table1.c.name : bindparam('username')}),
2627
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
2629
# insert with user-supplied bind params for specific columns, cols
2630
# provided as strings
2631
self.assert_compile(
2632
insert(table1, dict(myid = 3, name = 'jack')),
2633
"INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
2636
# test with a tuple of params instead of named
2637
self.assert_compile(
2638
insert(table1, (3, 'jack', 'mydescription')),
2639
"INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
2640
checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'}
2643
self.assert_compile(
2644
insert(table1, values={
2645
table1.c.myid : bindparam('userid')
2646
}).values({table1.c.name : bindparam('username')}),
2647
"INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
2650
self.assert_compile(
2651
insert(table1, values=dict(myid=func.lala())),
2652
"INSERT INTO mytable (myid) VALUES (lala())")
2654
def test_inline_insert(self):
2655
metadata = MetaData()
2656
table = Table('sometable', metadata,
2657
Column('id', Integer, primary_key=True),
2658
Column('foo', Integer, default=func.foobar()))
2659
self.assert_compile(
2660
table.insert(values={}, inline=True),
2661
"INSERT INTO sometable (foo) VALUES (foobar())")
2662
self.assert_compile(
2663
table.insert(inline=True),
2664
"INSERT INTO sometable (foo) VALUES (foobar())", params={})
2666
def test_update(self):
2667
self.assert_compile(
2668
update(table1, table1.c.myid == 7),
2669
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
2670
params = {table1.c.name:'fred'})
2671
self.assert_compile(
2672
table1.update().where(table1.c.myid==7).
2673
values({table1.c.myid:5}),
2674
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
2675
checkparams={'myid':5, 'myid_1':7})
2676
self.assert_compile(
2677
update(table1, table1.c.myid == 7),
2678
"UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
2679
params = {'name':'fred'})
2680
self.assert_compile(
2681
update(table1, values = {table1.c.name : table1.c.myid}),
2682
"UPDATE mytable SET name=mytable.myid")
2683
self.assert_compile(
2685
whereclause = table1.c.name == bindparam('crit'),
2686
values = {table1.c.name : 'hi'}),
2687
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
2688
params = {'crit' : 'notthere'},
2689
checkparams={'crit':'notthere', 'name':'hi'})
2690
self.assert_compile(
2691
update(table1, table1.c.myid == 12,
2692
values = {table1.c.name : table1.c.myid}),
2693
"UPDATE mytable SET name=mytable.myid, description="
2694
":description WHERE mytable.myid = :myid_1",
2695
params = {'description':'test'},
2696
checkparams={'description':'test', 'myid_1':12})
2697
self.assert_compile(
2698
update(table1, table1.c.myid == 12,
2699
values = {table1.c.myid : 9}),
2700
"UPDATE mytable SET myid=:myid, description=:description "
2701
"WHERE mytable.myid = :myid_1",
2702
params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
2703
self.assert_compile(
2704
update(table1, table1.c.myid ==12),
2705
"UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
2706
params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
2707
s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
2708
c = s.compile(column_keys=['id', 'name'])
2709
self.assert_compile(
2710
update(table1, table1.c.myid == 12,
2711
values = {table1.c.name : table1.c.myid}
2712
).values({table1.c.name:table1.c.name + 'foo'}),
2713
"UPDATE mytable SET name=(mytable.name || :name_1), "
2714
"description=:description WHERE mytable.myid = :myid_1",
2715
params = {'description':'test'})
2718
self.assert_compile(update(table1,
2719
(table1.c.myid == func.hoho(4)) &
2720
(table1.c.name == literal('foo') + table1.c.name + literal('lala')),
2722
table1.c.name : table1.c.name + "lala",
2723
table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
2724
}), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
2725
"name=(mytable.name || :name_1) "
2726
"WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || "
2727
"mytable.name || :param_3")
2729
2464
def test_correlated_update(self):
2730
2465
# test against a straight text subquery
2731
u = update(table1, values = {
2466
u = update(table1, values={
2733
2468
text("(select name from mytable where id=mytable.id)")})
2734
2469
self.assert_compile(u,
2735
2470
"UPDATE mytable SET name=(select name from mytable "
2736
2471
"where id=mytable.id)")
2738
2473
mt = table1.alias()
2739
u = update(table1, values = {
2741
select([mt.c.name], mt.c.myid==table1.c.myid)
2474
u = update(table1, values={
2476
select([mt.c.name], mt.c.myid == table1.c.myid)
2743
2478
self.assert_compile(u,
2744
2479
"UPDATE mytable SET name=(SELECT mytable_1.name FROM "
2745
"mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
2480
"mytable AS mytable_1 WHERE "
2481
"mytable_1.myid = mytable.myid)")
2747
2483
# test against a regular constructed subquery
2748
2484
s = select([table2], table2.c.otherid == table1.c.myid)
2749
u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
2485
u = update(table1, table1.c.name == 'jack', values={table1.c.name: s})
2750
2486
self.assert_compile(u,
2751
2487
"UPDATE mytable SET name=(SELECT myothertable.otherid, "
2752
2488
"myothertable.othername FROM myothertable WHERE "
2753
"myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
2489
"myothertable.otherid = mytable.myid) "
2490
"WHERE mytable.name = :name_1")
2755
2492
# test a non-correlated WHERE clause
2756
2493
s = select([table2.c.othername], table2.c.otherid == 7)
2757
u = update(table1, table1.c.name==s)
2494
u = update(table1, table1.c.name == s)
2758
2495
self.assert_compile(u,
2759
2496
"UPDATE mytable SET myid=:myid, name=:name, "
2760
2497
"description=:description WHERE mytable.name = "
3012
2730
def test_select(self):
3013
2731
self.assert_compile(table4.select(),
3014
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
3015
" remote_owner.remotetable.value FROM remote_owner.remotetable")
2732
"SELECT remote_owner.remotetable.rem_id, "
2733
"remote_owner.remotetable.datatype_id,"
2734
" remote_owner.remotetable.value "
2735
"FROM remote_owner.remotetable")
3017
self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
3018
"SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
3019
" remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "
2737
self.assert_compile(table4.select(and_(table4.c.datatype_id == 7,
2738
table4.c.value == 'hi')),
2739
"SELECT remote_owner.remotetable.rem_id, "
2740
"remote_owner.remotetable.datatype_id,"
2741
" remote_owner.remotetable.value "
2742
"FROM remote_owner.remotetable WHERE "
3020
2743
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND"
3021
2744
" remote_owner.remotetable.value = :value_1")
3023
s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'), use_labels=True)
2746
s = table4.select(and_(table4.c.datatype_id == 7,
2747
table4.c.value == 'hi'), use_labels=True)
3024
2748
self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS"
3025
" remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS"
3026
" remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "
3027
"AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "
2749
" remote_owner_remotetable_rem_id, "
2750
"remote_owner.remotetable.datatype_id AS"
2751
" remote_owner_remotetable_datatype_id, "
2752
"remote_owner.remotetable.value "
2753
"AS remote_owner_remotetable_value FROM "
2754
"remote_owner.remotetable WHERE "
3028
2755
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND "
3029
2756
"remote_owner.remotetable.value = :value_1")
3031
2758
# multi-part schema name
3032
2759
self.assert_compile(table5.select(),
3033
2760
'SELECT "dbo.remote_owner".remotetable.rem_id, '
3034
'"dbo.remote_owner".remotetable.datatype_id, "dbo.remote_owner".remotetable.value '
2761
'"dbo.remote_owner".remotetable.datatype_id, '
2762
'"dbo.remote_owner".remotetable.value '
3035
2763
'FROM "dbo.remote_owner".remotetable'
3038
2766
# multi-part schema name labels - convert '.' to '_'
3039
2767
self.assert_compile(table5.select(use_labels=True),
3040
2768
'SELECT "dbo.remote_owner".remotetable.rem_id AS'
3041
' dbo_remote_owner_remotetable_rem_id, "dbo.remote_owner".remotetable.datatype_id'
2769
' dbo_remote_owner_remotetable_rem_id, '
2770
'"dbo.remote_owner".remotetable.datatype_id'
3042
2771
' AS dbo_remote_owner_remotetable_datatype_id,'
3043
' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM'
2772
' "dbo.remote_owner".remotetable.value AS '
2773
'dbo_remote_owner_remotetable_value FROM'
3044
2774
' "dbo.remote_owner".remotetable'
3047
2777
def test_alias(self):
3048
2778
a = alias(table4, 'remtable')
3049
self.assert_compile(a.select(a.c.datatype_id==7),
3050
"SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM"
2779
self.assert_compile(a.select(a.c.datatype_id == 7),
2780
"SELECT remtable.rem_id, remtable.datatype_id, "
2781
"remtable.value FROM"
3051
2782
" remote_owner.remotetable AS remtable "
3052
2783
"WHERE remtable.datatype_id = :datatype_id_1")
3054
2785
def test_update(self):
3055
2786
self.assert_compile(
3056
table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}),
2787
table4.update(table4.c.value == 'test',
2788
values={table4.c.datatype_id: 12}),
3057
2789
"UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
3058
2790
"WHERE remote_owner.remotetable.value = :value_1")
3060
2792
def test_insert(self):
3061
2793
self.assert_compile(table4.insert(values=(2, 5, 'test')),
3062
"INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "
2794
"INSERT INTO remote_owner.remotetable "
2795
"(rem_id, datatype_id, value) VALUES "
3063
2796
"(:rem_id, :datatype_id, :value)")
2799
class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
2800
__dialect__ = 'default'
2802
def test_dont_overcorrelate(self):
2803
self.assert_compile(select([table1], from_obj=[table1,
2805
"SELECT mytable.myid, mytable.name, "
2806
"mytable.description FROM mytable, (SELECT "
2807
"mytable.myid AS myid, mytable.name AS "
2808
"name, mytable.description AS description "
2812
t1 = table('t1', column('a'))
2813
t2 = table('t2', column('a'))
2814
return t1, t2, select([t1]).where(t1.c.a == t2.c.a)
2816
def _assert_where_correlated(self, stmt):
2817
self.assert_compile(
2819
"SELECT t2.a FROM t2 WHERE t2.a = "
2820
"(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
2822
def _assert_where_all_correlated(self, stmt):
2823
self.assert_compile(
2825
"SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = "
2826
"(SELECT t1.a WHERE t1.a = t2.a)")
2828
# note there's no more "backwards" correlation after
2830
#def _assert_where_backwards_correlated(self, stmt):
2831
# self.assert_compile(
2833
# "SELECT t2.a FROM t2 WHERE t2.a = "
2834
# "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)")
2836
#def _assert_column_backwards_correlated(self, stmt):
2837
# self.assert_compile(stmt,
2838
# "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) "
2839
# "AS anon_1 FROM t2")
2841
def _assert_column_correlated(self, stmt):
2842
self.assert_compile(stmt,
2843
"SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) "
2844
"AS anon_1 FROM t2")
2846
def _assert_column_all_correlated(self, stmt):
2847
self.assert_compile(stmt,
2848
"SELECT t1.a, t2.a, "
2849
"(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2")
2852
def _assert_having_correlated(self, stmt):
2853
self.assert_compile(stmt,
2854
"SELECT t2.a FROM t2 HAVING t2.a = "
2855
"(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
2857
def _assert_from_uncorrelated(self, stmt):
2858
self.assert_compile(stmt,
2859
"SELECT t2.a, anon_1.a FROM t2, "
2860
"(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
2862
def _assert_from_all_uncorrelated(self, stmt):
2863
self.assert_compile(stmt,
2864
"SELECT t1.a, t2.a, anon_1.a FROM t1, t2, "
2865
"(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
2867
def _assert_where_uncorrelated(self, stmt):
2868
self.assert_compile(stmt,
2869
"SELECT t2.a FROM t2 WHERE t2.a = "
2870
"(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
2872
def _assert_column_uncorrelated(self, stmt):
2873
self.assert_compile(stmt,
2874
"SELECT t2.a, (SELECT t1.a FROM t1, t2 "
2875
"WHERE t1.a = t2.a) AS anon_1 FROM t2")
2877
def _assert_having_uncorrelated(self, stmt):
2878
self.assert_compile(stmt,
2879
"SELECT t2.a FROM t2 HAVING t2.a = "
2880
"(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
2882
def _assert_where_single_full_correlated(self, stmt):
2883
self.assert_compile(stmt,
2884
"SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)")
2886
def test_correlate_semiauto_where(self):
2887
t1, t2, s1 = self._fixture()
2888
self._assert_where_correlated(
2889
select([t2]).where(t2.c.a == s1.correlate(t2)))
2891
def test_correlate_semiauto_column(self):
2892
t1, t2, s1 = self._fixture()
2893
self._assert_column_correlated(
2894
select([t2, s1.correlate(t2).as_scalar()]))
2896
def test_correlate_semiauto_from(self):
2897
t1, t2, s1 = self._fixture()
2898
self._assert_from_uncorrelated(
2899
select([t2, s1.correlate(t2).alias()]))
2901
def test_correlate_semiauto_having(self):
2902
t1, t2, s1 = self._fixture()
2903
self._assert_having_correlated(
2904
select([t2]).having(t2.c.a == s1.correlate(t2)))
2906
def test_correlate_except_inclusion_where(self):
2907
t1, t2, s1 = self._fixture()
2908
self._assert_where_correlated(
2909
select([t2]).where(t2.c.a == s1.correlate_except(t1)))
2911
def test_correlate_except_exclusion_where(self):
2912
t1, t2, s1 = self._fixture()
2913
self._assert_where_uncorrelated(
2914
select([t2]).where(t2.c.a == s1.correlate_except(t2)))
2916
def test_correlate_except_inclusion_column(self):
2917
t1, t2, s1 = self._fixture()
2918
self._assert_column_correlated(
2919
select([t2, s1.correlate_except(t1).as_scalar()]))
2921
def test_correlate_except_exclusion_column(self):
2922
t1, t2, s1 = self._fixture()
2923
self._assert_column_uncorrelated(
2924
select([t2, s1.correlate_except(t2).as_scalar()]))
2926
def test_correlate_except_inclusion_from(self):
2927
t1, t2, s1 = self._fixture()
2928
self._assert_from_uncorrelated(
2929
select([t2, s1.correlate_except(t1).alias()]))
2931
def test_correlate_except_exclusion_from(self):
2932
t1, t2, s1 = self._fixture()
2933
self._assert_from_uncorrelated(
2934
select([t2, s1.correlate_except(t2).alias()]))
2936
def test_correlate_except_none(self):
2937
t1, t2, s1 = self._fixture()
2938
self._assert_where_all_correlated(
2939
select([t1, t2]).where(t2.c.a == s1.correlate_except(None)))
2941
def test_correlate_except_having(self):
2942
t1, t2, s1 = self._fixture()
2943
self._assert_having_correlated(
2944
select([t2]).having(t2.c.a == s1.correlate_except(t1)))
2946
def test_correlate_auto_where(self):
2947
t1, t2, s1 = self._fixture()
2948
self._assert_where_correlated(
2949
select([t2]).where(t2.c.a == s1))
2951
def test_correlate_auto_column(self):
2952
t1, t2, s1 = self._fixture()
2953
self._assert_column_correlated(
2954
select([t2, s1.as_scalar()]))
2956
def test_correlate_auto_from(self):
2957
t1, t2, s1 = self._fixture()
2958
self._assert_from_uncorrelated(
2959
select([t2, s1.alias()]))
2961
def test_correlate_auto_having(self):
2962
t1, t2, s1 = self._fixture()
2963
self._assert_having_correlated(
2964
select([t2]).having(t2.c.a == s1))
2966
def test_correlate_disabled_where(self):
2967
t1, t2, s1 = self._fixture()
2968
self._assert_where_uncorrelated(
2969
select([t2]).where(t2.c.a == s1.correlate(None)))
2971
def test_correlate_disabled_column(self):
2972
t1, t2, s1 = self._fixture()
2973
self._assert_column_uncorrelated(
2974
select([t2, s1.correlate(None).as_scalar()]))
2976
def test_correlate_disabled_from(self):
2977
t1, t2, s1 = self._fixture()
2978
self._assert_from_uncorrelated(
2979
select([t2, s1.correlate(None).alias()]))
2981
def test_correlate_disabled_having(self):
2982
t1, t2, s1 = self._fixture()
2983
self._assert_having_uncorrelated(
2984
select([t2]).having(t2.c.a == s1.correlate(None)))
2986
def test_correlate_all_where(self):
2987
t1, t2, s1 = self._fixture()
2988
self._assert_where_all_correlated(
2989
select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)))
2991
def test_correlate_all_column(self):
2992
t1, t2, s1 = self._fixture()
2993
self._assert_column_all_correlated(
2994
select([t1, t2, s1.correlate(t1, t2).as_scalar()]))
2996
def test_correlate_all_from(self):
2997
t1, t2, s1 = self._fixture()
2998
self._assert_from_all_uncorrelated(
2999
select([t1, t2, s1.correlate(t1, t2).alias()]))
3001
def test_correlate_where_all_unintentional(self):
3002
t1, t2, s1 = self._fixture()
3003
assert_raises_message(
3004
exc.InvalidRequestError,
3005
"returned no FROM clauses due to auto-correlation",
3006
select([t1, t2]).where(t2.c.a == s1).compile
3009
def test_correlate_from_all_ok(self):
3010
t1, t2, s1 = self._fixture()
3011
self.assert_compile(
3012
select([t1, t2, s1]),
3013
"SELECT t1.a, t2.a, a FROM t1, t2, "
3014
"(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)"
3017
def test_correlate_auto_where_singlefrom(self):
3018
t1, t2, s1 = self._fixture()
3019
s = select([t1.c.a])
3020
s2 = select([t1]).where(t1.c.a == s)
3021
self.assert_compile(s2,
3022
"SELECT t1.a FROM t1 WHERE t1.a = "
3023
"(SELECT t1.a FROM t1)")
3025
def test_correlate_semiauto_where_singlefrom(self):
3026
t1, t2, s1 = self._fixture()
3028
s = select([t1.c.a])
3030
s2 = select([t1]).where(t1.c.a == s.correlate(t1))
3031
self._assert_where_single_full_correlated(s2)
3033
def test_correlate_except_semiauto_where_singlefrom(self):
3034
t1, t2, s1 = self._fixture()
3036
s = select([t1.c.a])
3038
s2 = select([t1]).where(t1.c.a == s.correlate_except(t2))
3039
self._assert_where_single_full_correlated(s2)
3041
def test_correlate_alone_noeffect(self):
3043
t1, t2, s1 = self._fixture()
3044
self.assert_compile(s1.correlate(t1, t2),
3045
"SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a")
3047
def test_correlate_except_froms(self):
3049
t1 = table('t1', column('a'))
3050
t2 = table('t2', column('a'), column('b'))
3051
s = select([t2.c.b]).where(t1.c.a == t2.c.a)
3052
s = s.correlate_except(t2).alias('s')
3054
s2 = select([func.foo(s.c.b)]).as_scalar()
3055
s3 = select([t1], order_by=s2)
3057
self.assert_compile(s3,
3058
"SELECT t1.a FROM t1 ORDER BY "
3059
"(SELECT foo(s.b) AS foo_1 FROM "
3060
"(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)"
3063
def test_multilevel_froms_correlation(self):
3065
p = table('parent', column('id'))
3066
c = table('child', column('id'), column('parent_id'), column('pos'))
3068
s = c.select().where(c.c.parent_id == p.c.id).order_by(c.c.pos).limit(1)
3070
s = exists().select_from(s).where(s.c.id == 1)
3071
s = select([p]).where(s)
3072
self.assert_compile(s,
3073
"SELECT parent.id FROM parent WHERE EXISTS (SELECT * "
3074
"FROM (SELECT child.id AS id, child.parent_id AS parent_id, "
3075
"child.pos AS pos FROM child WHERE child.parent_id = parent.id "
3076
"ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)")
3078
def test_no_contextless_correlate_except(self):
3081
t1 = table('t1', column('x'))
3082
t2 = table('t2', column('y'))
3083
t3 = table('t3', column('z'))
3085
s = select([t1]).where(t1.c.x == t2.c.y).\
3086
where(t2.c.y == t3.c.z).correlate_except(t1)
3087
self.assert_compile(s,
3088
"SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z")
3090
def test_multilevel_implicit_correlation_disabled(self):
3091
# test that implicit correlation with multilevel WHERE correlation
3092
# behaves like 0.8.1, 0.7 (i.e. doesn't happen)
3093
t1 = table('t1', column('x'))
3094
t2 = table('t2', column('y'))
3095
t3 = table('t3', column('z'))
3097
s = select([t1.c.x]).where(t1.c.x == t2.c.y)
3098
s2 = select([t3.c.z]).where(t3.c.z == s.as_scalar())
3099
s3 = select([t1]).where(t1.c.x == s2.as_scalar())
3101
self.assert_compile(s3,
3102
"SELECT t1.x FROM t1 "
3103
"WHERE t1.x = (SELECT t3.z "
3105
"WHERE t3.z = (SELECT t1.x "
3107
"WHERE t1.x = t2.y))"
3110
def test_from_implicit_correlation_disabled(self):
3111
# test that implicit correlation with immediate and
3112
# multilevel FROM clauses behaves like 0.8.1 (i.e. doesn't happen)
3113
t1 = table('t1', column('x'))
3114
t2 = table('t2', column('y'))
3115
t3 = table('t3', column('z'))
3117
s = select([t1.c.x]).where(t1.c.x == t2.c.y)
3118
s2 = select([t2, s])
3119
s3 = select([t1, s2])
3121
self.assert_compile(s3,
3122
"SELECT t1.x, y, x FROM t1, "
3123
"(SELECT t2.y AS y, x FROM t2, "
3124
"(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))"
3066
3127
class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
3067
3128
__dialect__ = 'default'