1
from test.lib.testing import eq_, assert_raises_message, assert_raises
1
from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, is_
2
from sqlalchemy import testing
3
from sqlalchemy.testing import fixtures, engines
4
from sqlalchemy import util
3
6
from sqlalchemy import *
4
from sqlalchemy import exc, sql, util
5
from sqlalchemy.engine import default, base
7
from test.lib.schema import Table, Column
7
from sqlalchemy import exc, sql
8
from sqlalchemy.engine import default, result as _result
9
from sqlalchemy.testing.schema import Table, Column
11
# ongoing - these are old tests. those which are of general use
12
# to test a dialect are being slowly migrated to
13
# sqlalchemy.testing.suite
9
15
class QueryTest(fixtures.TestBase):
42
48
def teardown_class(cls):
43
49
metadata.drop_all()
45
def test_insert(self):
    """Insert a single row using keyword parameters and check the row count."""
    users.insert().execute(user_id=7, user_name='jack')
    # eq_ reports both compared values on failure, unlike a bare assert.
    eq_(users.count().scalar(), 1)
51
@testing.requires.multivalues_inserts
def test_multivalues_insert(self):
    """Insert several rows by passing a list of parameter sets to
    ``insert(values=...)``.

    Two shapes of the ``values`` list are exercised: a list of
    dictionaries keyed by column name, then a list of positional tuples.
    After each insert the table is re-read ordered by ``user_id`` and the
    newly added rows are compared against the expected tuples.
    """
    # Dictionary form.
    users.insert(values=[{'user_id': 7, 'user_name': 'jack'},
                         {'user_id': 8, 'user_name': 'ed'}]).execute()
    rows = users.select().order_by(users.c.user_id).execute().fetchall()
    # eq_ keeps both operands in the failure message; asserting on a
    # pre-computed boolean would only report "False".
    eq_(rows[0], (7, 'jack'))
    eq_(rows[1], (8, 'ed'))

    # Positional tuple form; these rows sort after the first two.
    users.insert(values=[(9, 'jack'), (10, 'ed')]).execute()
    rows = users.select().order_by(users.c.user_id).execute().fetchall()
    eq_(rows[2], (9, 'jack'))
    eq_(rows[3], (10, 'ed'))
49
63
def test_insert_heterogeneous_params(self):
50
"""test that executemany parameters are asserted to match the parameter set of the first."""
64
"""test that executemany parameters are asserted to match the
65
parameter set of the first."""
52
67
assert_raises_message(exc.StatementError,
53
68
r"A value is required for bind parameter 'user_name', in "
183
191
table.create(bind=engine, checkfirst=True)
184
192
i = insert_values(engine, table, values)
185
assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues))
193
assert i == assertvalues, "tablename: %s %r %r" % \
194
(table.name, repr(i), repr(assertvalues))
187
196
table.drop(bind=engine)
198
@testing.only_on('sqlite+pysqlite')
199
@testing.provide_metadata
200
def test_lastrowid_zero(self):
201
from sqlalchemy.dialects import sqlite
202
eng = engines.testing_engine()
203
class ExcCtx(sqlite.base.SQLiteExecutionContext):
204
def get_lastrowid(self):
206
eng.dialect.execution_ctx_cls = ExcCtx
207
t = Table('t', MetaData(), Column('x', Integer, primary_key=True),
208
Column('y', Integer))
210
r = eng.execute(t.insert().values(y=5))
211
eq_(r.inserted_primary_key, [0])
189
214
@testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
190
215
def test_misordered_lastrow(self):
191
216
related = Table('related', metadata,
932
assert row.keys() == ["case_insensitive", "CaseSensitive"]
892
eq_(row.keys(), ["case_insensitive", "CaseSensitive"])
893
eq_(row["case_insensitive"], 1)
894
eq_(row["CaseSensitive"], 2)
898
lambda: row["Case_insensitive"]
902
lambda: row["casesensitive"]
905
def test_row_case_insensitive(self):
906
ins_db = engines.testing_engine(options={"case_sensitive":False})
907
row = ins_db.execute(
909
literal_column("1").label("case_insensitive"),
910
literal_column("2").label("CaseSensitive")
914
eq_(row.keys(), ["case_insensitive", "CaseSensitive"])
915
eq_(row["case_insensitive"], 1)
916
eq_(row["CaseSensitive"], 2)
917
eq_(row["Case_insensitive"],1)
918
eq_(row["casesensitive"],2)
935
921
def test_row_as_args(self):
940
926
eq_(users.select().execute().fetchall(), [(1, 'john')])
942
928
def test_result_as_args(self):
943
users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
929
users.insert().execute([
930
dict(user_id=1, user_name='john'),
931
dict(user_id=2, user_name='ed')])
944
932
r = users.select().execute()
945
933
users2.insert().execute(list(r))
946
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
935
users2.select().order_by(users2.c.user_id).execute().fetchall(),
936
[(1, 'john'), (2, 'ed')]
948
939
users2.delete().execute()
949
940
r = users.select().execute()
950
941
users2.insert().execute(*list(r))
951
assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
943
users2.select().order_by(users2.c.user_id).execute().fetchall(),
944
[(1, 'john'), (2, 'ed')]
953
947
def test_ambiguous_column(self):
954
948
users.insert().execute(user_id=1, user_name='john')
955
r = users.outerjoin(addresses).select().execute().first()
949
result = users.outerjoin(addresses).select().execute()
956
952
assert_raises_message(
957
953
exc.InvalidRequestError,
958
954
"Ambiguous column name",
959
955
lambda: r['user_id']
958
assert_raises_message(
959
exc.InvalidRequestError,
960
"Ambiguous column name",
961
lambda: r[users.c.user_id]
964
assert_raises_message(
965
exc.InvalidRequestError,
966
"Ambiguous column name",
967
lambda: r[addresses.c.user_id]
970
# try to trick it - fake_table isn't in the result!
971
# we get the correct error
972
fake_table = Table('fake', MetaData(), Column('user_id', Integer))
973
assert_raises_message(
974
exc.InvalidRequestError,
975
"Could not locate column in row for column 'fake.user_id'",
976
lambda: r[fake_table.c.user_id]
962
979
r = util.pickle.loads(util.pickle.dumps(r))
963
980
assert_raises_message(
964
981
exc.InvalidRequestError,
989
1006
lambda: row[users.c.user_id]
992
# this is a bug, should be ambiguous.
994
eq_(row[ua.c.user_id], 1)
1009
assert_raises_message(
1010
exc.InvalidRequestError,
1011
"Ambiguous column name",
1012
lambda: row[ua.c.user_id]
996
# this is also a less severe bug - u2.c.user_id
997
# is not in the row at all so is not actually
998
# ambiguous. Still is like this in 0.8
999
# and is due to overly liberal "this is a derived column"
1015
# Unfortunately, this fails -
1017
# "Could not locate column in row"
1018
# to be raised here, but the check for
1019
# "common column" in _compare_name_for_result()
1020
# has other requirements to be more liberal.
1022
# expression system would need a way to determine
1023
# if given two columns in a "proxy" relationship, if they
1024
# refer to a different parent table
1001
1025
assert_raises_message(
1002
1026
exc.InvalidRequestError,
1003
1027
"Ambiguous column name",
1004
1028
lambda: row[u2.c.user_id]
1031
def test_ambiguous_column_contains(self):
1032
# ticket 2702. in 0.7 we'd get True, False.
1033
# in 0.8, both columns are present so it's True;
1034
# but when they're fetched you'll get the ambiguous error.
1035
users.insert().execute(user_id=1, user_name='john')
1038
addresses.c.user_id]).\
1039
select_from(users.outerjoin(addresses)).execute()
1040
row = result.first()
1043
set([users.c.user_id in row, addresses.c.user_id in row]),
1047
def test_ambiguous_column_by_col_plus_label(self):
1048
users.insert().execute(user_id=1, user_name='john')
1049
result = select([users.c.user_id,
1050
type_coerce(users.c.user_id, Integer).label('foo')]
1052
row = result.first()
1054
row[users.c.user_id], 1
1007
1060
@testing.requires.subqueries
1008
1061
def test_column_label_targeting(self):
1009
1062
users.insert().execute(user_id=7, user_name='ed')
1190
1233
r = s.execute().fetchall()
1191
1234
assert len(r) == 1
1236
class RequiredBindTest(fixtures.TablesTest):
1237
run_create_tables = None
1241
def define_tables(cls, metadata):
1242
Table('foo', metadata,
1243
Column('id', Integer, primary_key=True),
1244
Column('data', String(50)),
1245
Column('x', Integer)
1248
def _assert_raises(self, stmt, params):
1249
assert_raises_message(
1251
"A value is required for bind parameter 'x'",
1252
testing.db.execute, stmt, **params)
1254
assert_raises_message(
1256
"A value is required for bind parameter 'x'",
1257
testing.db.execute, stmt, params)
1259
def test_insert(self):
1260
stmt = self.tables.foo.insert().values(x=bindparam('x'),
1261
data=bindparam('data'))
1262
self._assert_raises(
1263
stmt, {'data': 'data'}
1266
def test_select_where(self):
1267
stmt = select([self.tables.foo]).\
1268
where(self.tables.foo.c.data == bindparam('data')).\
1269
where(self.tables.foo.c.x == bindparam('x'))
1270
self._assert_raises(
1271
stmt, {'data': 'data'}
1274
@testing.requires.standalone_binds
1275
def test_select_columns(self):
1276
stmt = select([bindparam('data'), bindparam('x')])
1277
self._assert_raises(
1278
stmt, {'data': 'data'}
1281
def test_text(self):
1282
stmt = text("select * from foo where x=:x and data=:data1")
1283
self._assert_raises(
1284
stmt, {'data1': 'data'}
1287
def test_required_flag(self):
1288
is_(bindparam('foo').required, True)
1289
is_(bindparam('foo', required=False).required, False)
1290
is_(bindparam('foo', 'bar').required, False)
1291
is_(bindparam('foo', 'bar', required=True).required, True)
1294
is_(bindparam('foo', callable_=c, required=True).required, True)
1295
is_(bindparam('foo', callable_=c).required, False)
1296
is_(bindparam('foo', callable_=c, required=False).required, False)
1298
class TableInsertTest(fixtures.TablesTest):
1299
"""test for consistent insert behavior across dialects
1300
regarding the inline=True flag, lower-case 't' tables.
1303
run_create_tables = 'each'
1306
def define_tables(cls, metadata):
1307
Table('foo', metadata,
1308
Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
1309
Column('data', String(50)),
1310
Column('x', Integer)
1313
def _fixture(self, types=True):
1315
t = sql.table('foo', sql.column('id', Integer),
1316
sql.column('data', String),
1317
sql.column('x', Integer))
1319
t = sql.table('foo', sql.column('id'),
1324
def _test(self, stmt, row, returning=None, inserted_primary_key=False):
1325
r = testing.db.execute(stmt)
1328
returned = r.first()
1329
eq_(returned, returning)
1330
elif inserted_primary_key is not False:
1331
eq_(r.inserted_primary_key, inserted_primary_key)
1333
eq_(testing.db.execute(self.tables.foo.select()).first(), row)
1335
def _test_multi(self, stmt, rows, data):
1336
testing.db.execute(stmt, rows)
1338
testing.db.execute(self.tables.foo.select().
1339
order_by(self.tables.foo.c.id)).fetchall(),
1342
@testing.requires.sequences
1343
def test_expicit_sequence(self):
1347
id=func.next_value(Sequence('t_id_seq')),
1353
def test_uppercase(self):
1361
inserted_primary_key=[1]
1364
def test_uppercase_inline(self):
1367
t.insert(inline=True).values(
1372
inserted_primary_key=[1]
1375
def test_uppercase_inline_implicit(self):
1378
t.insert(inline=True).values(
1382
inserted_primary_key=[None]
1385
def test_uppercase_implicit(self):
1388
t.insert().values(data='data', x=5),
1390
inserted_primary_key=[1]
1393
def test_uppercase_direct_params(self):
1396
t.insert().values(id=1, data='data', x=5),
1398
inserted_primary_key=[1]
1401
@testing.requires.returning
1402
def test_uppercase_direct_params_returning(self):
1406
id=1, data='data', x=5).returning(t.c.id, t.c.x),
1411
@testing.fails_on('mssql',
1412
"lowercase table doesn't support identity insert disable")
1413
def test_direct_params(self):
1416
t.insert().values(id=1, data='data', x=5),
1418
inserted_primary_key=[]
1421
@testing.fails_on('mssql',
1422
"lowercase table doesn't support identity insert disable")
1423
@testing.requires.returning
1424
def test_direct_params_returning(self):
1428
id=1, data='data', x=5).returning(t.c.id, t.c.x),
1433
@testing.requires.emulated_lastrowid
1434
def test_implicit_pk(self):
1440
inserted_primary_key=[]
1443
@testing.requires.emulated_lastrowid
1444
def test_implicit_pk_multi_rows(self):
1449
{'data':'d1', 'x':5},
1450
{'data':'d2', 'x':6},
1451
{'data':'d3', 'x':7},
1460
@testing.requires.emulated_lastrowid
1461
def test_implicit_pk_inline(self):
1464
t.insert(inline=True).values(data='data', x=5),
1466
inserted_primary_key=[]
1193
1469
class PercentSchemaNamesTest(fixtures.TestBase):
1194
1470
"""tests using percent signs, spaces in table and column names.
1559
1823
global metadata, t1, t2, t3
1560
1824
metadata = MetaData(testing.db)
1561
1825
t1 = Table('t1', metadata,
1562
Column('col1', Integer, Sequence('t1pkseq'), primary_key=True),
1826
Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1563
1827
Column('col2', String(30)),
1564
1828
Column('col3', String(40)),
1565
1829
Column('col4', String(30))
1567
1831
t2 = Table('t2', metadata,
1568
Column('col1', Integer, Sequence('t2pkseq'), primary_key=True),
1832
Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1569
1833
Column('col2', String(30)),
1570
1834
Column('col3', String(40)),
1571
1835
Column('col4', String(30)))
1572
1836
t3 = Table('t3', metadata,
1573
Column('col1', Integer, Sequence('t3pkseq'), primary_key=True),
1837
Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1574
1838
Column('col2', String(30)),
1575
1839
Column('col3', String(40)),
1576
1840
Column('col4', String(30)))