~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_query.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import eq_, assert_raises_message, assert_raises
 
1
from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, is_
 
2
from sqlalchemy import testing
 
3
from sqlalchemy.testing import fixtures, engines
 
4
from sqlalchemy import util
2
5
import datetime
3
6
from sqlalchemy import *
4
 
from sqlalchemy import exc, sql, util
5
 
from sqlalchemy.engine import default, base
6
 
from test.lib import *
7
 
from test.lib.schema import Table, Column
 
7
from sqlalchemy import exc, sql
 
8
from sqlalchemy.engine import default, result as _result
 
9
from sqlalchemy.testing.schema import Table, Column
 
10
 
 
11
# ongoing - these are old tests.  those which are of general use
 
12
# to test a dialect are being slowly migrated to
 
13
# sqlalhcemy.testing.suite
8
14
 
9
15
class QueryTest(fixtures.TestBase):
10
16
 
42
48
    def teardown_class(cls):
43
49
        metadata.drop_all()
44
50
 
45
 
    def test_insert(self):
46
 
        users.insert().execute(user_id = 7, user_name = 'jack')
47
 
        assert users.count().scalar() == 1
 
51
    @testing.requires.multivalues_inserts
 
52
    def test_multivalues_insert(self):
 
53
        users.insert(values=[{'user_id':7, 'user_name':'jack'},
 
54
            {'user_id':8, 'user_name':'ed'}]).execute()
 
55
        rows = users.select().order_by(users.c.user_id).execute().fetchall()
 
56
        self.assert_(rows[0] == (7, 'jack'))
 
57
        self.assert_(rows[1] == (8, 'ed'))
 
58
        users.insert(values=[(9, 'jack'), (10, 'ed')]).execute()
 
59
        rows = users.select().order_by(users.c.user_id).execute().fetchall()
 
60
        self.assert_(rows[2] == (9, 'jack'))
 
61
        self.assert_(rows[3] == (10, 'ed'))
48
62
 
49
63
    def test_insert_heterogeneous_params(self):
50
 
        """test that executemany parameters are asserted to match the parameter set of the first."""
 
64
        """test that executemany parameters are asserted to match the
 
65
        parameter set of the first."""
51
66
 
52
67
        assert_raises_message(exc.StatementError,
53
68
            r"A value is required for bind parameter 'user_name', in "
68
83
            {'user_id':9}
69
84
        )
70
85
 
71
 
    def test_update(self):
72
 
        users.insert().execute(user_id = 7, user_name = 'jack')
73
 
        assert users.count().scalar() == 1
74
 
 
75
 
        users.update(users.c.user_id == 7).execute(user_name = 'fred')
76
 
        assert users.select(users.c.user_id==7).execute().first()['user_name'] == 'fred'
77
 
 
78
86
    def test_lastrow_accessor(self):
79
87
        """Tests the inserted_primary_key and lastrow_has_id() functions."""
80
88
 
182
190
                try:
183
191
                    table.create(bind=engine, checkfirst=True)
184
192
                    i = insert_values(engine, table, values)
185
 
                    assert i == assertvalues, "tablename: %s %r %r" % (table.name, repr(i), repr(assertvalues))
 
193
                    assert i == assertvalues, "tablename: %s %r %r" % \
 
194
                            (table.name, repr(i), repr(assertvalues))
186
195
                finally:
187
196
                    table.drop(bind=engine)
188
197
 
 
198
    @testing.only_on('sqlite+pysqlite')
 
199
    @testing.provide_metadata
 
200
    def test_lastrowid_zero(self):
 
201
        from sqlalchemy.dialects import sqlite
 
202
        eng = engines.testing_engine()
 
203
        class ExcCtx(sqlite.base.SQLiteExecutionContext):
 
204
            def get_lastrowid(self):
 
205
                return 0
 
206
        eng.dialect.execution_ctx_cls = ExcCtx
 
207
        t = Table('t', MetaData(), Column('x', Integer, primary_key=True),
 
208
                            Column('y', Integer))
 
209
        t.create(eng)
 
210
        r = eng.execute(t.insert().values(y=5))
 
211
        eq_(r.inserted_primary_key, [0])
 
212
 
 
213
 
189
214
    @testing.fails_on('sqlite', "sqlite autoincremnt doesn't work with composite pks")
190
215
    def test_misordered_lastrow(self):
191
216
        related = Table('related', metadata,
194
219
        )
195
220
        t6 = Table("t6", metadata,
196
221
            Column('manual_id', Integer, ForeignKey('related.id'), primary_key=True),
197
 
            Column('auto_id', Integer, primary_key=True, test_needs_autoincrement=True),
 
222
            Column('auto_id', Integer, primary_key=True,
 
223
                                    test_needs_autoincrement=True),
198
224
            mysql_engine='MyISAM'
199
225
        )
200
226
 
206
232
        r = t6.insert().values(manual_id=id).execute()
207
233
        eq_(r.inserted_primary_key, [12, 1])
208
234
 
209
 
    def test_autoclose_on_insert(self):
210
 
        if testing.against('firebird', 'postgresql', 'oracle', 'mssql'):
211
 
            test_engines = [
212
 
                engines.testing_engine(options={'implicit_returning':False}),
213
 
                engines.testing_engine(options={'implicit_returning':True}),
214
 
            ]
215
 
        else:
216
 
            test_engines = [testing.db]
217
 
 
218
 
        for engine in test_engines:
219
 
 
220
 
            r = engine.execute(users.insert(),
221
 
                {'user_name':'jack'},
222
 
            )
223
 
            assert r.closed
224
235
 
225
236
    def test_row_iteration(self):
226
237
        users.insert().execute(
234
245
            l.append(row)
235
246
        self.assert_(len(l) == 3)
236
247
 
237
 
    @testing.fails_on('firebird', "kinterbasdb doesn't send full type information")
238
248
    @testing.requires.subqueries
239
249
    def test_anonymous_rows(self):
240
250
        users.insert().execute(
450
460
        ):
451
461
            eq_(expr.execute().fetchall(), result)
452
462
 
453
 
    @testing.fails_on("firebird", "see dialect.test_firebird:MiscTest.test_percents_in_text")
454
 
    @testing.fails_on("oracle", "neither % nor %% are accepted")
455
 
    @testing.fails_on("informix", "neither % nor %% are accepted")
456
 
    @testing.fails_on("+pg8000", "can't interpret result column from '%%'")
 
463
    @testing.requires.mod_operator_as_percent_sign
457
464
    @testing.emits_warning('.*now automatically escapes.*')
458
465
    def test_percents_in_text(self):
459
466
        for expr, result in (
541
548
        a_eq(prep(r"(\:that$other)"), "(:that$other)")
542
549
        a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
543
550
 
 
551
    @testing.requires.standalone_binds
544
552
    def test_select_from_bindparam(self):
545
553
        """Test result row processing when selecting from a plain bind param."""
546
554
 
562
570
        )
563
571
 
564
572
 
565
 
    def test_delete(self):
566
 
        users.insert().execute(user_id = 7, user_name = 'jack')
567
 
        users.insert().execute(user_id = 8, user_name = 'fred')
568
 
        print repr(users.select().execute().fetchall())
569
 
 
570
 
        users.delete(users.c.user_name == 'fred').execute()
571
 
 
572
 
        print repr(users.select().execute().fetchall())
573
 
 
574
 
 
575
573
 
576
574
    @testing.exclude('mysql', '<', (5, 0, 37), 'database bug')
577
575
    def test_scalar_select(self):
710
708
                              use_labels=labels),
711
709
                 [(3, 'a'), (2, 'b'), (1, None)])
712
710
 
 
711
    @testing.fails_on('mssql+pyodbc',
 
712
        "pyodbc result row doesn't support slicing")
713
713
    def test_column_slices(self):
714
714
        users.insert().execute(user_id=1, user_name='john')
715
715
        users.insert().execute(user_id=2, user_name='jack')
837
837
            lambda: r['foo']
838
838
        )
839
839
 
840
 
    @testing.requires.dbapi_lastrowid
841
 
    def test_native_lastrowid(self):
842
 
        r = testing.db.execute(
843
 
            users.insert(),
844
 
            {'user_id':1, 'user_name':'ed'}
845
 
        )
846
 
 
847
 
        eq_(r.lastrowid, 1)
848
 
 
849
 
    def test_returns_rows_flag_insert(self):
850
 
        r = testing.db.execute(
851
 
            users.insert(),
852
 
            {'user_id':1, 'user_name':'ed'}
853
 
        )
854
 
        assert r.is_insert
855
 
        assert not r.returns_rows
856
 
 
857
 
    def test_returns_rows_flag_update(self):
858
 
        r = testing.db.execute(
859
 
            users.update().values(user_name='fred')
860
 
        )
861
 
        assert not r.is_insert
862
 
        assert not r.returns_rows
863
 
 
864
 
    def test_returns_rows_flag_select(self):
865
 
        r = testing.db.execute(
866
 
            users.select()
867
 
        )
868
 
        assert not r.is_insert
869
 
        assert r.returns_rows
870
 
 
871
 
    @testing.requires.returning
872
 
    def test_returns_rows_flag_insert_returning(self):
873
 
        r = testing.db.execute(
874
 
            users.insert().returning(users.c.user_id),
875
 
            {'user_id':1, 'user_name':'ed'}
876
 
        )
877
 
        assert r.is_insert
878
 
        assert r.returns_rows
 
840
 
879
841
 
880
842
    def test_graceful_fetch_on_non_rows(self):
881
843
        """test that calling fetchone() etc. on a result that doesn't
885
847
 
886
848
        # these proxies don't work with no cursor.description present.
887
849
        # so they don't apply to this test at the moment.
888
 
        # base.FullyBufferedResultProxy,
889
 
        # base.BufferedRowResultProxy,
890
 
        # base.BufferedColumnResultProxy
 
850
        # result.FullyBufferedResultProxy,
 
851
        # result.BufferedRowResultProxy,
 
852
        # result.BufferedColumnResultProxy
891
853
 
892
854
        conn = testing.db.connect()
893
855
        for meth in ('fetchone', 'fetchall', 'first', 'scalar', 'fetchmany'):
919
881
            result.fetchone
920
882
        )
921
883
 
922
 
    def test_result_case_sensitivity(self):
923
 
        """test name normalization for result sets."""
924
 
 
 
884
    def test_row_case_sensitive(self):
925
885
        row = testing.db.execute(
926
886
            select([
927
887
                literal_column("1").label("case_insensitive"),
929
889
            ])
930
890
        ).first()
931
891
 
932
 
        assert row.keys() == ["case_insensitive", "CaseSensitive"]
 
892
        eq_(row.keys(), ["case_insensitive", "CaseSensitive"])
 
893
        eq_(row["case_insensitive"], 1)
 
894
        eq_(row["CaseSensitive"], 2)
 
895
 
 
896
        assert_raises(
 
897
            KeyError,
 
898
            lambda: row["Case_insensitive"]
 
899
        )
 
900
        assert_raises(
 
901
            KeyError,
 
902
            lambda: row["casesensitive"]
 
903
        )
 
904
 
 
905
    def test_row_case_insensitive(self):
 
906
        ins_db = engines.testing_engine(options={"case_sensitive":False})
 
907
        row = ins_db.execute(
 
908
            select([
 
909
                literal_column("1").label("case_insensitive"),
 
910
                literal_column("2").label("CaseSensitive")
 
911
            ])
 
912
        ).first()
 
913
 
 
914
        eq_(row.keys(), ["case_insensitive", "CaseSensitive"])
 
915
        eq_(row["case_insensitive"], 1)
 
916
        eq_(row["CaseSensitive"], 2)
 
917
        eq_(row["Case_insensitive"],1)
 
918
        eq_(row["casesensitive"],2)
933
919
 
934
920
 
935
921
    def test_row_as_args(self):
940
926
        eq_(users.select().execute().fetchall(), [(1, 'john')])
941
927
 
942
928
    def test_result_as_args(self):
943
 
        users.insert().execute([dict(user_id=1, user_name='john'), dict(user_id=2, user_name='ed')])
 
929
        users.insert().execute([
 
930
                dict(user_id=1, user_name='john'),
 
931
                dict(user_id=2, user_name='ed')])
944
932
        r = users.select().execute()
945
933
        users2.insert().execute(list(r))
946
 
        assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
 
934
        eq_(
 
935
            users2.select().order_by(users2.c.user_id).execute().fetchall(),
 
936
            [(1, 'john'), (2, 'ed')]
 
937
        )
947
938
 
948
939
        users2.delete().execute()
949
940
        r = users.select().execute()
950
941
        users2.insert().execute(*list(r))
951
 
        assert users2.select().execute().fetchall() == [(1, 'john'), (2, 'ed')]
 
942
        eq_(
 
943
            users2.select().order_by(users2.c.user_id).execute().fetchall(),
 
944
            [(1, 'john'), (2, 'ed')]
 
945
        )
952
946
 
953
947
    def test_ambiguous_column(self):
954
948
        users.insert().execute(user_id=1, user_name='john')
955
 
        r = users.outerjoin(addresses).select().execute().first()
 
949
        result = users.outerjoin(addresses).select().execute()
 
950
        r = result.first()
 
951
 
956
952
        assert_raises_message(
957
953
            exc.InvalidRequestError,
958
954
            "Ambiguous column name",
959
955
            lambda: r['user_id']
960
956
        )
961
957
 
 
958
        assert_raises_message(
 
959
            exc.InvalidRequestError,
 
960
            "Ambiguous column name",
 
961
            lambda: r[users.c.user_id]
 
962
        )
 
963
 
 
964
        assert_raises_message(
 
965
            exc.InvalidRequestError,
 
966
            "Ambiguous column name",
 
967
            lambda: r[addresses.c.user_id]
 
968
        )
 
969
 
 
970
        # try to trick it - fake_table isn't in the result!
 
971
        # we get the correct error
 
972
        fake_table = Table('fake', MetaData(), Column('user_id', Integer))
 
973
        assert_raises_message(
 
974
            exc.InvalidRequestError,
 
975
            "Could not locate column in row for column 'fake.user_id'",
 
976
            lambda: r[fake_table.c.user_id]
 
977
        )
 
978
 
962
979
        r = util.pickle.loads(util.pickle.dumps(r))
963
980
        assert_raises_message(
964
981
            exc.InvalidRequestError,
967
984
        )
968
985
 
969
986
        result = users.outerjoin(addresses).select().execute()
970
 
        result = base.BufferedColumnResultProxy(result.context)
 
987
        result = _result.BufferedColumnResultProxy(result.context)
971
988
        r = result.first()
972
 
        assert isinstance(r, base.BufferedColumnRow)
 
989
        assert isinstance(r, _result.BufferedColumnRow)
973
990
        assert_raises_message(
974
991
            exc.InvalidRequestError,
975
992
            "Ambiguous column name",
989
1006
            lambda: row[users.c.user_id]
990
1007
        )
991
1008
 
992
 
        # this is a bug, should be ambiguous.
993
 
        # Fixed in 0.8
994
 
        eq_(row[ua.c.user_id], 1)
 
1009
        assert_raises_message(
 
1010
            exc.InvalidRequestError,
 
1011
            "Ambiguous column name",
 
1012
            lambda: row[ua.c.user_id]
 
1013
        )
995
1014
 
996
 
        # this is also a less severe bug - u2.c.user_id
997
 
        # is not in the row at all so is not actually
998
 
        # ambiguous.  Still is like this in 0.8
999
 
        # and is due to overly liberal "this is a derived column"
1000
 
        # rules.
 
1015
        # Unfortunately, this fails -
 
1016
        # we'd like
 
1017
        # "Could not locate column in row"
 
1018
        # to be raised here, but the check for
 
1019
        # "common column" in _compare_name_for_result()
 
1020
        # has other requirements to be more liberal.
 
1021
        # Ultimately the
 
1022
        # expression system would need a way to determine
 
1023
        # if given two columns in a "proxy" relationship, if they
 
1024
        # refer to a different parent table
1001
1025
        assert_raises_message(
1002
1026
            exc.InvalidRequestError,
1003
1027
            "Ambiguous column name",
1004
1028
            lambda: row[u2.c.user_id]
1005
1029
        )
1006
1030
 
 
1031
    def test_ambiguous_column_contains(self):
 
1032
        # ticket 2702.  in 0.7 we'd get True, False.
 
1033
        # in 0.8, both columns are present so it's True;
 
1034
        # but when they're fetched you'll get the ambiguous error.
 
1035
        users.insert().execute(user_id=1, user_name='john')
 
1036
        result = select([
 
1037
                    users.c.user_id,
 
1038
                    addresses.c.user_id]).\
 
1039
                    select_from(users.outerjoin(addresses)).execute()
 
1040
        row = result.first()
 
1041
 
 
1042
        eq_(
 
1043
            set([users.c.user_id in row, addresses.c.user_id in row]),
 
1044
            set([True])
 
1045
        )
 
1046
 
 
1047
    def test_ambiguous_column_by_col_plus_label(self):
 
1048
        users.insert().execute(user_id=1, user_name='john')
 
1049
        result = select([users.c.user_id,
 
1050
                        type_coerce(users.c.user_id, Integer).label('foo')]
 
1051
                    ).execute()
 
1052
        row = result.first()
 
1053
        eq_(
 
1054
            row[users.c.user_id], 1
 
1055
        )
 
1056
        eq_(
 
1057
            row[1], 1
 
1058
        )
 
1059
 
1007
1060
    @testing.requires.subqueries
1008
1061
    def test_column_label_targeting(self):
1009
1062
        users.insert().execute(user_id=7, user_name='ed')
1038
1091
        r = testing.db.execute('select user_name from query_users').first()
1039
1092
        eq_(len(r), 1)
1040
1093
 
1041
 
    @testing.uses_deprecated(r'.*which subclass Executable')
1042
 
    def test_cant_execute_join(self):
1043
 
        try:
1044
 
            users.join(addresses).execute()
1045
 
        except exc.StatementError, e:
1046
 
            assert str(e).startswith('Not an executable clause ')
1047
 
 
1048
 
 
1049
1094
 
1050
1095
    def test_column_order_with_simple_query(self):
1051
1096
        # should return values in column definition order
1129
1174
    @testing.emits_warning('.*empty sequence.*')
1130
1175
    @testing.fails_on('firebird', "uses sql-92 rules")
1131
1176
    @testing.fails_on('sybase', "uses sql-92 rules")
1132
 
    @testing.fails_on('mssql+mxodbc', "uses sql-92 rules")
1133
1177
    @testing.fails_if(lambda:
1134
1178
                         testing.against('mssql+pyodbc') and not testing.db.dialect.freetds,
1135
1179
                         "uses sql-92 rules")
1154
1198
        assert len(r) == 0
1155
1199
 
1156
1200
    @testing.emits_warning('.*empty sequence.*')
1157
 
    @testing.fails_on('firebird', 'uses sql-92 bind rules')
1158
1201
    def test_literal_in(self):
1159
1202
        """similar to test_bind_in but use a bind with a value."""
1160
1203
 
1190
1233
        r = s.execute().fetchall()
1191
1234
        assert len(r) == 1
1192
1235
 
 
1236
class RequiredBindTest(fixtures.TablesTest):
 
1237
    run_create_tables = None
 
1238
    run_deletes = None
 
1239
 
 
1240
    @classmethod
 
1241
    def define_tables(cls, metadata):
 
1242
        Table('foo', metadata,
 
1243
                Column('id', Integer, primary_key=True),
 
1244
                Column('data', String(50)),
 
1245
                Column('x', Integer)
 
1246
            )
 
1247
 
 
1248
    def _assert_raises(self, stmt, params):
 
1249
        assert_raises_message(
 
1250
            exc.StatementError,
 
1251
            "A value is required for bind parameter 'x'",
 
1252
            testing.db.execute, stmt, **params)
 
1253
 
 
1254
        assert_raises_message(
 
1255
            exc.StatementError,
 
1256
            "A value is required for bind parameter 'x'",
 
1257
            testing.db.execute, stmt, params)
 
1258
 
 
1259
    def test_insert(self):
 
1260
        stmt = self.tables.foo.insert().values(x=bindparam('x'),
 
1261
                                    data=bindparam('data'))
 
1262
        self._assert_raises(
 
1263
            stmt, {'data': 'data'}
 
1264
        )
 
1265
 
 
1266
    def test_select_where(self):
 
1267
        stmt = select([self.tables.foo]).\
 
1268
                    where(self.tables.foo.c.data == bindparam('data')).\
 
1269
                    where(self.tables.foo.c.x == bindparam('x'))
 
1270
        self._assert_raises(
 
1271
            stmt, {'data': 'data'}
 
1272
        )
 
1273
 
 
1274
    @testing.requires.standalone_binds
 
1275
    def test_select_columns(self):
 
1276
        stmt = select([bindparam('data'), bindparam('x')])
 
1277
        self._assert_raises(
 
1278
            stmt, {'data': 'data'}
 
1279
        )
 
1280
 
 
1281
    def test_text(self):
 
1282
        stmt = text("select * from foo where x=:x and data=:data1")
 
1283
        self._assert_raises(
 
1284
            stmt, {'data1': 'data'}
 
1285
        )
 
1286
 
 
1287
    def test_required_flag(self):
 
1288
        is_(bindparam('foo').required, True)
 
1289
        is_(bindparam('foo', required=False).required, False)
 
1290
        is_(bindparam('foo', 'bar').required, False)
 
1291
        is_(bindparam('foo', 'bar', required=True).required, True)
 
1292
 
 
1293
        c = lambda: None
 
1294
        is_(bindparam('foo', callable_=c, required=True).required, True)
 
1295
        is_(bindparam('foo', callable_=c).required, False)
 
1296
        is_(bindparam('foo', callable_=c, required=False).required, False)
 
1297
 
 
1298
class TableInsertTest(fixtures.TablesTest):
 
1299
    """test for consistent insert behavior across dialects
 
1300
    regarding the inline=True flag, lower-case 't' tables.
 
1301
 
 
1302
    """
 
1303
    run_create_tables = 'each'
 
1304
 
 
1305
    @classmethod
 
1306
    def define_tables(cls, metadata):
 
1307
        Table('foo', metadata,
 
1308
                Column('id', Integer, Sequence('t_id_seq'), primary_key=True),
 
1309
                Column('data', String(50)),
 
1310
                Column('x', Integer)
 
1311
            )
 
1312
 
 
1313
    def _fixture(self, types=True):
 
1314
        if types:
 
1315
            t = sql.table('foo', sql.column('id', Integer),
 
1316
                        sql.column('data', String),
 
1317
                        sql.column('x', Integer))
 
1318
        else:
 
1319
            t = sql.table('foo', sql.column('id'),
 
1320
                        sql.column('data'),
 
1321
                        sql.column('x'))
 
1322
        return t
 
1323
 
 
1324
    def _test(self, stmt, row, returning=None, inserted_primary_key=False):
 
1325
        r = testing.db.execute(stmt)
 
1326
 
 
1327
        if returning:
 
1328
            returned = r.first()
 
1329
            eq_(returned, returning)
 
1330
        elif inserted_primary_key is not False:
 
1331
            eq_(r.inserted_primary_key, inserted_primary_key)
 
1332
 
 
1333
        eq_(testing.db.execute(self.tables.foo.select()).first(), row)
 
1334
 
 
1335
    def _test_multi(self, stmt, rows, data):
 
1336
        testing.db.execute(stmt, rows)
 
1337
        eq_(
 
1338
            testing.db.execute(self.tables.foo.select().
 
1339
                            order_by(self.tables.foo.c.id)).fetchall(),
 
1340
            data)
 
1341
 
 
1342
    @testing.requires.sequences
 
1343
    def test_expicit_sequence(self):
 
1344
        t = self._fixture()
 
1345
        self._test(
 
1346
            t.insert().values(
 
1347
                        id=func.next_value(Sequence('t_id_seq')),
 
1348
                        data='data', x=5
 
1349
                    ),
 
1350
            (1, 'data', 5)
 
1351
        )
 
1352
 
 
1353
    def test_uppercase(self):
 
1354
        t = self.tables.foo
 
1355
        self._test(
 
1356
            t.insert().values(
 
1357
                        id=1,
 
1358
                        data='data', x=5
 
1359
                    ),
 
1360
            (1, 'data', 5),
 
1361
            inserted_primary_key=[1]
 
1362
        )
 
1363
 
 
1364
    def test_uppercase_inline(self):
 
1365
        t = self.tables.foo
 
1366
        self._test(
 
1367
            t.insert(inline=True).values(
 
1368
                        id=1,
 
1369
                        data='data', x=5
 
1370
                    ),
 
1371
            (1, 'data', 5),
 
1372
            inserted_primary_key=[1]
 
1373
        )
 
1374
 
 
1375
    def test_uppercase_inline_implicit(self):
 
1376
        t = self.tables.foo
 
1377
        self._test(
 
1378
            t.insert(inline=True).values(
 
1379
                        data='data', x=5
 
1380
                    ),
 
1381
            (1, 'data', 5),
 
1382
            inserted_primary_key=[None]
 
1383
        )
 
1384
 
 
1385
    def test_uppercase_implicit(self):
 
1386
        t = self.tables.foo
 
1387
        self._test(
 
1388
            t.insert().values(data='data', x=5),
 
1389
            (1, 'data', 5),
 
1390
            inserted_primary_key=[1]
 
1391
        )
 
1392
 
 
1393
    def test_uppercase_direct_params(self):
 
1394
        t = self.tables.foo
 
1395
        self._test(
 
1396
            t.insert().values(id=1, data='data', x=5),
 
1397
            (1, 'data', 5),
 
1398
            inserted_primary_key=[1]
 
1399
        )
 
1400
 
 
1401
    @testing.requires.returning
 
1402
    def test_uppercase_direct_params_returning(self):
 
1403
        t = self.tables.foo
 
1404
        self._test(
 
1405
            t.insert().values(
 
1406
                        id=1, data='data', x=5).returning(t.c.id, t.c.x),
 
1407
            (1, 'data', 5),
 
1408
            returning=(1, 5)
 
1409
        )
 
1410
 
 
1411
    @testing.fails_on('mssql',
 
1412
        "lowercase table doesn't support identity insert disable")
 
1413
    def test_direct_params(self):
 
1414
        t = self._fixture()
 
1415
        self._test(
 
1416
            t.insert().values(id=1, data='data', x=5),
 
1417
            (1, 'data', 5),
 
1418
            inserted_primary_key=[]
 
1419
        )
 
1420
 
 
1421
    @testing.fails_on('mssql',
 
1422
        "lowercase table doesn't support identity insert disable")
 
1423
    @testing.requires.returning
 
1424
    def test_direct_params_returning(self):
 
1425
        t = self._fixture()
 
1426
        self._test(
 
1427
            t.insert().values(
 
1428
                        id=1, data='data', x=5).returning(t.c.id, t.c.x),
 
1429
            (1, 'data', 5),
 
1430
            returning=(1, 5)
 
1431
        )
 
1432
 
 
1433
    @testing.requires.emulated_lastrowid
 
1434
    def test_implicit_pk(self):
 
1435
        t = self._fixture()
 
1436
        self._test(
 
1437
            t.insert().values(
 
1438
                        data='data', x=5),
 
1439
            (1, 'data', 5),
 
1440
            inserted_primary_key=[]
 
1441
        )
 
1442
 
 
1443
    @testing.requires.emulated_lastrowid
 
1444
    def test_implicit_pk_multi_rows(self):
 
1445
        t = self._fixture()
 
1446
        self._test_multi(
 
1447
            t.insert(),
 
1448
            [
 
1449
                {'data':'d1', 'x':5},
 
1450
                {'data':'d2', 'x':6},
 
1451
                {'data':'d3', 'x':7},
 
1452
            ],
 
1453
            [
 
1454
                (1, 'd1', 5),
 
1455
                (2, 'd2', 6),
 
1456
                (3, 'd3', 7)
 
1457
            ],
 
1458
        )
 
1459
 
 
1460
    @testing.requires.emulated_lastrowid
 
1461
    def test_implicit_pk_inline(self):
 
1462
        t = self._fixture()
 
1463
        self._test(
 
1464
            t.insert(inline=True).values(data='data', x=5),
 
1465
            (1, 'data', 5),
 
1466
            inserted_primary_key=[]
 
1467
        )
 
1468
 
1193
1469
class PercentSchemaNamesTest(fixtures.TestBase):
1194
1470
    """tests using percent signs, spaces in table and column names.
1195
1471
 
1344
1620
            Column('ctype', String(30), key="content_type")
1345
1621
        )
1346
1622
 
 
1623
        if testing.requires.schemas.enabled:
 
1624
            wschema = Table('wschema', metadata,
 
1625
                Column("a", CHAR(2), key="b"),
 
1626
                Column("c", CHAR(2), key="q"),
 
1627
                schema="test_schema"
 
1628
            )
 
1629
 
1347
1630
    @classmethod
1348
1631
    def insert_data(cls):
1349
1632
        cls.tables.keyed1.insert().execute(dict(b="a1", q="c1"))
1352
1635
        cls.tables.keyed4.insert().execute(dict(b="b4", q="q4"))
1353
1636
        cls.tables.content.insert().execute(type="t1")
1354
1637
 
 
1638
        if testing.requires.schemas.enabled:
 
1639
            cls.tables['test_schema.wschema'].insert().execute(dict(b="a1", q="c1"))
 
1640
 
 
1641
    @testing.requires.schemas
 
1642
    def test_keyed_accessor_wschema(self):
 
1643
        keyed1 = self.tables['test_schema.wschema']
 
1644
        row = testing.db.execute(keyed1.select()).first()
 
1645
 
 
1646
        eq_(row.b, "a1")
 
1647
        eq_(row.q, "c1")
 
1648
        eq_(row.a, "a1")
 
1649
        eq_(row.c, "c1")
 
1650
 
 
1651
 
1355
1652
    def test_keyed_accessor_single(self):
1356
1653
        keyed1 = self.tables.keyed1
1357
1654
        row = testing.db.execute(keyed1.select()).first()
1375
1672
        keyed2 = self.tables.keyed2
1376
1673
 
1377
1674
        row = testing.db.execute(select([keyed1, keyed2])).first()
1378
 
        # without #2397, row.b is unambiguous
 
1675
        # row.b is unambiguous
1379
1676
        eq_(row.b, "b2")
1380
1677
        # row.a is ambiguous
1381
1678
        assert_raises_message(
1384
1681
            getattr, row, "a"
1385
1682
        )
1386
1683
 
1387
 
    @testing.fails_if(lambda: True, "Possible future behavior")
1388
 
    def test_keyed_accessor_composite_conflict_2397(self):
1389
 
        keyed1 = self.tables.keyed1
1390
 
        keyed2 = self.tables.keyed2
1391
 
 
1392
 
        row = testing.db.execute(select([keyed1, keyed2])).first()
1393
 
        # with #2397, row.a is unambiguous
1394
 
        eq_(row.a, "a2")
1395
 
        # row.b is ambiguous
1396
 
        assert_raises_message(
1397
 
            exc.InvalidRequestError,
1398
 
            "Ambiguous column name 'b'",
1399
 
            getattr, row, 'b'
1400
 
        )
1401
 
 
1402
1684
    def test_keyed_accessor_composite_names_precedent(self):
1403
1685
        keyed1 = self.tables.keyed1
1404
1686
        keyed4 = self.tables.keyed4
1414
1696
        keyed3 = self.tables.keyed3
1415
1697
 
1416
1698
        row = testing.db.execute(select([keyed1, keyed3])).first()
1417
 
        assert 'b' not in row
1418
1699
        eq_(row.q, "c1")
1419
1700
        assert_raises_message(
1420
1701
            exc.InvalidRequestError,
 
1702
            "Ambiguous column name 'b'",
 
1703
            getattr, row, "b"
 
1704
        )
 
1705
        assert_raises_message(
 
1706
            exc.InvalidRequestError,
1421
1707
            "Ambiguous column name 'a'",
1422
1708
            getattr, row, "a"
1423
1709
        )
1424
1710
        eq_(row.d, "d3")
1425
1711
 
1426
 
    @testing.fails_if(lambda: True, "Possible future behavior")
1427
 
    def test_keyed_accessor_composite_2397(self):
1428
 
        keyed1 = self.tables.keyed1
1429
 
        keyed3 = self.tables.keyed3
1430
 
 
1431
 
        row = testing.db.execute(select([keyed1, keyed3])).first()
1432
 
        eq_(row.b, "a1")
1433
 
        eq_(row.q, "c1")
1434
 
        eq_(row.a, "a3")
1435
 
        eq_(row.d, "d3")
1436
 
 
1437
1712
    def test_keyed_accessor_composite_labeled(self):
1438
1713
        keyed1 = self.tables.keyed1
1439
1714
        keyed2 = self.tables.keyed2
1451
1726
    def test_column_label_overlap_fallback(self):
1452
1727
        content, bar = self.tables.content, self.tables.bar
1453
1728
        row = testing.db.execute(select([content.c.type.label("content_type")])).first()
1454
 
        assert content.c.type in row
 
1729
        assert content.c.type not in row
1455
1730
        assert bar.c.content_type not in row
1456
1731
        assert sql.column('content_type') in row
1457
1732
 
1461
1736
        assert sql.column('content_type') in row
1462
1737
 
1463
1738
    def test_column_label_overlap_fallback_2(self):
1464
 
        # this fails with #2397
1465
1739
        content, bar = self.tables.content, self.tables.bar
1466
1740
        row = testing.db.execute(content.select(use_labels=True)).first()
1467
1741
        assert content.c.type in row
1468
1742
        assert bar.c.content_type not in row
1469
1743
        assert sql.column('content_type') not in row
1470
1744
 
1471
 
    @testing.fails_if(lambda: True, "Possible future behavior")
1472
 
    def test_column_label_overlap_fallback_3(self):
1473
 
        # this passes with #2397
1474
 
        content, bar = self.tables.content, self.tables.bar
1475
 
        row = testing.db.execute(content.select(use_labels=True)).first()
1476
 
        assert content.c.type in row
1477
 
        assert bar.c.content_type not in row
1478
 
        assert sql.column('content_type') in row
1479
 
 
1480
1745
 
1481
1746
class LimitTest(fixtures.TestBase):
1482
1747
 
1518
1783
        self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
1519
1784
 
1520
1785
    @testing.requires.offset
1521
 
    @testing.fails_on('maxdb', 'FIXME: unknown')
1522
1786
    def test_select_limit_offset(self):
1523
1787
        """Test the interaction between limit and offset"""
1524
1788
 
1559
1823
        global metadata, t1, t2, t3
1560
1824
        metadata = MetaData(testing.db)
1561
1825
        t1 = Table('t1', metadata,
1562
 
            Column('col1', Integer, Sequence('t1pkseq'), primary_key=True),
 
1826
            Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1563
1827
            Column('col2', String(30)),
1564
1828
            Column('col3', String(40)),
1565
1829
            Column('col4', String(30))
1566
1830
            )
1567
1831
        t2 = Table('t2', metadata,
1568
 
            Column('col1', Integer, Sequence('t2pkseq'), primary_key=True),
 
1832
            Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1569
1833
            Column('col2', String(30)),
1570
1834
            Column('col3', String(40)),
1571
1835
            Column('col4', String(30)))
1572
1836
        t3 = Table('t3', metadata,
1573
 
            Column('col1', Integer, Sequence('t3pkseq'), primary_key=True),
 
1837
            Column('col1', Integer, test_needs_autoincrement=True, primary_key=True),
1574
1838
            Column('col2', String(30)),
1575
1839
            Column('col3', String(40)),
1576
1840
            Column('col4', String(30)))
2143
2407
        eq_(
2144
2408
            select([flds.c.intcol % 3],
2145
2409
                   order_by=flds.c.idcol).execute().fetchall(),
2146
 
            [(2,),(1,)]
 
2410
            [(2,), (1,)]
2147
2411
        )
2148
2412
 
2149
2413
    @testing.requires.window_functions