~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_compiler.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#! coding:utf-8
2
2
 
3
 
from test.lib.testing import eq_, is_, assert_raises, assert_raises_message
4
 
import datetime, re, operator, decimal
5
 
from sqlalchemy import *
 
3
"""
 
4
compiler tests.
 
5
 
 
6
These tests are among the very first that were written when SQLAlchemy
 
7
began in 2005.  As a result the testing style here is very dense;
 
8
it's an ongoing job to break these into much smaller tests with correct pep8
 
9
styling and coherent test organization.
 
10
 
 
11
"""
 
12
 
 
13
from sqlalchemy.testing import eq_, is_, assert_raises, assert_raises_message
 
14
from sqlalchemy import testing
 
15
from sqlalchemy.testing import fixtures, AssertsCompiledSQL
 
16
from sqlalchemy import Integer, String, MetaData, Table, Column, select, \
 
17
    func, not_, cast, text, tuple_, exists, update, bindparam,\
 
18
    literal, and_, null, type_coerce, alias, or_, literal_column,\
 
19
    Float, TIMESTAMP, Numeric, Date, Text, collate, union, except_,\
 
20
    intersect, union_all, Boolean, distinct, join, outerjoin, asc, desc,\
 
21
    over, subquery, case
 
22
import decimal
6
23
from sqlalchemy import exc, sql, util, types, schema
7
 
from sqlalchemy.sql import table, column, label, compiler
8
 
from sqlalchemy.sql.expression import ClauseList, _literal_as_text
 
24
from sqlalchemy.sql import table, column, label
 
25
from sqlalchemy.sql.expression import ClauseList, _literal_as_text, HasPrefixes
9
26
from sqlalchemy.engine import default
10
 
from sqlalchemy.databases import *
11
 
from test.lib import *
 
27
from sqlalchemy.dialects import mysql, mssql, postgresql, oracle, \
 
28
            sqlite, sybase
12
29
from sqlalchemy.ext.compiler import compiles
13
30
 
14
31
table1 = table('mytable',
37
54
    Column('rem_id', Integer, primary_key=True),
38
55
    Column('datatype_id', Integer),
39
56
    Column('value', String(20)),
40
 
    schema = 'remote_owner'
 
57
    schema='remote_owner'
41
58
)
42
59
 
43
60
# table with a 'multipart' schema
46
63
    Column('rem_id', Integer, primary_key=True),
47
64
    Column('datatype_id', Integer),
48
65
    Column('value', String(20)),
49
 
    schema = 'dbo.remote_owner'
 
66
    schema='dbo.remote_owner'
50
67
)
51
68
 
52
69
users = table('users',
70
87
    Column('z', Integer),
71
88
)
72
89
 
 
90
 
73
91
class SelectTest(fixtures.TestBase, AssertsCompiledSQL):
74
92
    __dialect__ = 'default'
75
93
 
90
108
                'Scalar Select expression has no '
91
109
                'columns; use this object directly within a '
92
110
                'column-level expression.',
93
 
                lambda: hasattr(select([table1.c.myid]).as_scalar().self_group(), 'columns'))
 
111
                lambda: hasattr(
 
112
                        select([table1.c.myid]).as_scalar().self_group(),
 
113
                        'columns'))
94
114
            assert_raises_message(
95
115
                exc.InvalidRequestError,
96
116
                'Scalar Select expression has no '
97
117
                'columns; use this object directly within a '
98
118
                'column-level expression.',
99
 
                lambda: hasattr(select([table1.c.myid]).as_scalar(), 'columns'))
 
119
                lambda: hasattr(select([table1.c.myid]).as_scalar(),
 
120
                            'columns'))
100
121
        else:
101
 
            assert not hasattr(select([table1.c.myid]).as_scalar().self_group(), 'columns')
 
122
            assert not hasattr(
 
123
                    select([table1.c.myid]).as_scalar().self_group(),
 
124
                            'columns')
102
125
            assert not hasattr(select([table1.c.myid]).as_scalar(), 'columns')
103
126
 
 
127
    def test_prefix_constructor(self):
 
128
        class Pref(HasPrefixes):
 
129
            def _generate(self):
 
130
                return self
 
131
        assert_raises(exc.ArgumentError,
 
132
                Pref().prefix_with,
 
133
                "some prefix", not_a_dialect=True
 
134
        )
 
135
 
104
136
    def test_table_select(self):
105
137
        self.assert_compile(table1.select(),
106
138
                            "SELECT mytable.myid, mytable.name, "
107
139
                            "mytable.description FROM mytable")
108
140
 
109
141
        self.assert_compile(select([table1, table2]),
110
 
                            "SELECT mytable.myid, mytable.name, mytable.description, "
111
 
                            "myothertable.otherid, myothertable.othername FROM mytable, "
112
 
                            "myothertable")
 
142
                "SELECT mytable.myid, mytable.name, mytable.description, "
 
143
                "myothertable.otherid, myothertable.othername FROM mytable, "
 
144
                "myothertable")
113
145
 
114
146
    def test_invalid_col_argument(self):
115
147
        assert_raises(exc.ArgumentError, select, table1)
145
177
            self.assert_compile(
146
178
                select([1]).limit(lim).offset(offset),
147
179
                "SELECT 1 " + exp,
148
 
                checkparams =params
 
180
                checkparams=params
149
181
            )
150
182
 
151
183
    def test_from_subquery(self):
152
 
        """tests placing select statements in the column clause of another select, for the
 
184
        """tests placing select statements in the column clause of
 
185
        another select, for the
153
186
        purposes of selecting from the exported columns of that select."""
154
187
 
155
188
        s = select([table1], table1.c.name == 'jack')
157
190
            select(
158
191
                [s],
159
192
                s.c.myid == 7
160
 
            )
161
 
            ,
 
193
            ),
162
194
        "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
163
 
        "mytable.name AS name, mytable.description AS description FROM mytable "
 
195
        "mytable.name AS name, mytable.description AS description "
 
196
        "FROM mytable "
164
197
        "WHERE mytable.name = :name_1) WHERE myid = :myid_1")
165
198
 
166
199
        sq = select([table1])
167
200
        self.assert_compile(
168
201
            sq.select(),
169
 
            "SELECT myid, name, description FROM (SELECT mytable.myid AS myid, "
170
 
            "mytable.name AS name, mytable.description AS description FROM mytable)"
 
202
            "SELECT myid, name, description FROM "
 
203
            "(SELECT mytable.myid AS myid, "
 
204
            "mytable.name AS name, mytable.description "
 
205
            "AS description FROM mytable)"
171
206
        )
172
207
 
173
208
        sq = select(
178
213
            sq.select(sq.c.myid == 7),
179
214
            "SELECT sq.myid, sq.name, sq.description FROM "
180
215
            "(SELECT mytable.myid AS myid, mytable.name AS name, "
181
 
            "mytable.description AS description FROM mytable) AS sq WHERE sq.myid = :myid_1"
 
216
            "mytable.description AS description FROM mytable) AS sq "
 
217
            "WHERE sq.myid = :myid_1"
182
218
        )
183
219
 
184
220
        sq = select(
185
221
            [table1, table2],
186
 
            and_(table1.c.myid ==7, table2.c.otherid==table1.c.myid),
187
 
            use_labels = True
 
222
            and_(table1.c.myid == 7, table2.c.otherid == table1.c.myid),
 
223
            use_labels=True
188
224
        ).alias('sq')
189
225
 
190
226
        sqstring = "SELECT mytable.myid AS mytable_myid, mytable.name AS "\
202
238
 
203
239
        sq2 = select(
204
240
            [sq],
205
 
            use_labels = True
 
241
            use_labels=True
206
242
        ).alias('sq2')
207
243
 
208
244
        self.assert_compile(
209
245
                sq2.select(),
210
246
                "SELECT sq2.sq_mytable_myid, sq2.sq_mytable_name, "
211
247
                "sq2.sq_mytable_description, sq2.sq_myothertable_otherid, "
212
 
                "sq2.sq_myothertable_othername FROM (SELECT sq.mytable_myid AS "
 
248
                "sq2.sq_myothertable_othername FROM "
 
249
                "(SELECT sq.mytable_myid AS "
213
250
                "sq_mytable_myid, sq.mytable_name AS sq_mytable_name, "
214
251
                "sq.mytable_description AS sq_mytable_description, "
215
252
                "sq.myothertable_otherid AS sq_myothertable_otherid, "
218
255
 
219
256
    def test_select_from_clauselist(self):
220
257
        self.assert_compile(
221
 
            select([ClauseList(column('a'), column('b'))]).select_from('sometable'),
 
258
            select([ClauseList(column('a'), column('b'))]
 
259
                    ).select_from('sometable'),
222
260
            'SELECT a, b FROM sometable'
223
261
        )
224
262
 
225
263
    def test_use_labels(self):
226
264
        self.assert_compile(
227
 
            select([table1.c.myid==5], use_labels=True),
 
265
            select([table1.c.myid == 5], use_labels=True),
228
266
            "SELECT mytable.myid = :myid_1 AS anon_1 FROM mytable"
229
267
        )
230
268
 
235
273
 
236
274
        self.assert_compile(
237
275
            select([not_(True)], use_labels=True),
238
 
            "SELECT NOT :param_1"       # TODO: should this make an anon label ??
 
276
            "SELECT NOT :param_1"
239
277
        )
240
278
 
241
279
        self.assert_compile(
244
282
        )
245
283
 
246
284
        self.assert_compile(
247
 
            select([func.sum(func.lala(table1.c.myid).label('foo')).label('bar')]),
 
285
            select([func.sum(
 
286
                    func.lala(table1.c.myid).label('foo')).label('bar')]),
248
287
            "SELECT sum(lala(mytable.myid)) AS bar FROM mytable"
249
288
        )
250
289
 
251
 
        # changes with #2397
252
290
        self.assert_compile(
253
291
            select([keyed]),
254
292
            "SELECT keyed.x, keyed.y"
255
293
            ", keyed.z FROM keyed"
256
294
        )
257
295
 
258
 
        # changes with #2397
259
296
        self.assert_compile(
260
297
            select([keyed]).apply_labels(),
261
298
            "SELECT keyed.x AS keyed_x, keyed.y AS "
267
304
 
268
305
        self.assert_compile(
269
306
            stmt,
270
 
            "select ?, ?, ? from sometable"
271
 
            , dialect=default.DefaultDialect(paramstyle='qmark')
272
 
        )
273
 
        self.assert_compile(
274
 
            stmt,
275
 
            "select :foo, :bar, :bat from sometable"
276
 
            , dialect=default.DefaultDialect(paramstyle='named')
277
 
        )
278
 
        self.assert_compile(
279
 
            stmt,
280
 
            "select %s, %s, %s from sometable"
281
 
            , dialect=default.DefaultDialect(paramstyle='format')
282
 
        )
283
 
        self.assert_compile(
284
 
            stmt,
285
 
            "select :1, :2, :3 from sometable"
286
 
            , dialect=default.DefaultDialect(paramstyle='numeric')
287
 
        )
288
 
        self.assert_compile(
289
 
            stmt,
290
 
            "select %(foo)s, %(bar)s, %(bat)s from sometable"
291
 
            , dialect=default.DefaultDialect(paramstyle='pyformat')
 
307
            "select ?, ?, ? from sometable",
 
308
            dialect=default.DefaultDialect(paramstyle='qmark')
 
309
        )
 
310
        self.assert_compile(
 
311
            stmt,
 
312
            "select :foo, :bar, :bat from sometable",
 
313
            dialect=default.DefaultDialect(paramstyle='named')
 
314
        )
 
315
        self.assert_compile(
 
316
            stmt,
 
317
            "select %s, %s, %s from sometable",
 
318
            dialect=default.DefaultDialect(paramstyle='format')
 
319
        )
 
320
        self.assert_compile(
 
321
            stmt,
 
322
            "select :1, :2, :3 from sometable",
 
323
            dialect=default.DefaultDialect(paramstyle='numeric')
 
324
        )
 
325
        self.assert_compile(
 
326
            stmt,
 
327
            "select %(foo)s, %(bar)s, %(bat)s from sometable",
 
328
            dialect=default.DefaultDialect(paramstyle='pyformat')
292
329
        )
293
330
 
294
331
    def test_dupe_columns(self):
297
334
 
298
335
        self.assert_compile(
299
336
            select([column('a'), column('a'), column('a')]),
300
 
            "SELECT a, a, a"
301
 
            , dialect=default.DefaultDialect()
 
337
            "SELECT a, a, a", dialect=default.DefaultDialect()
302
338
        )
303
339
 
304
340
        c = column('a')
305
341
        self.assert_compile(
306
342
            select([c, c, c]),
307
 
            "SELECT a"
308
 
            , dialect=default.DefaultDialect()
 
343
            "SELECT a", dialect=default.DefaultDialect()
309
344
        )
310
345
 
311
346
        a, b = column('a'), column('b')
312
347
        self.assert_compile(
313
348
            select([a, b, b, b, a, a]),
314
 
            "SELECT a, b"
315
 
            , dialect=default.DefaultDialect()
 
349
            "SELECT a, b", dialect=default.DefaultDialect()
316
350
        )
317
351
 
318
352
        # using alternate keys.
319
 
        # this will change with #2397
320
353
        a, b, c = Column('a', Integer, key='b'), \
321
354
                    Column('b', Integer), \
322
355
                    Column('c', Integer, key='a')
323
356
        self.assert_compile(
324
357
            select([a, b, c, a, b, c]),
325
 
            "SELECT a, b, c"
326
 
            , dialect=default.DefaultDialect()
327
 
        )
328
 
 
329
 
        self.assert_compile(
330
 
            select([bindparam('a'), bindparam('b'), bindparam('c')]),
331
 
            "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3"
332
 
            , dialect=default.DefaultDialect(paramstyle='named')
333
 
        )
334
 
 
335
 
        self.assert_compile(
336
 
            select([bindparam('a'), bindparam('b'), bindparam('c')]),
337
 
            "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3"
338
 
            , dialect=default.DefaultDialect(paramstyle='qmark'),
 
358
            "SELECT a, b, c", dialect=default.DefaultDialect()
 
359
        )
 
360
 
 
361
        self.assert_compile(
 
362
            select([bindparam('a'), bindparam('b'), bindparam('c')]),
 
363
            "SELECT :a AS anon_1, :b AS anon_2, :c AS anon_3",
 
364
                dialect=default.DefaultDialect(paramstyle='named')
 
365
        )
 
366
 
 
367
        self.assert_compile(
 
368
            select([bindparam('a'), bindparam('b'), bindparam('c')]),
 
369
            "SELECT ? AS anon_1, ? AS anon_2, ? AS anon_3",
 
370
                dialect=default.DefaultDialect(paramstyle='qmark'),
339
371
        )
340
372
 
341
373
        self.assert_compile(
370
402
                            'AS anon_1')
371
403
 
372
404
    def test_nested_label_targeting_keyed(self):
373
 
        # this behavior chagnes with #2397
374
405
        s1 = keyed.select()
375
406
        s2 = s1.alias()
376
407
        s3 = select([s2], use_labels=True)
387
418
                    "SELECT anon_1.anon_2_x AS anon_1_anon_2_x, "
388
419
                    "anon_1.anon_2_y AS anon_1_anon_2_y, "
389
420
                    "anon_1.anon_2_z AS anon_1_anon_2_z "
390
 
                    "FROM (SELECT anon_2.x AS anon_2_x, anon_2.y AS anon_2_y, "
 
421
                    "FROM (SELECT anon_2.x AS anon_2_x, "
 
422
                        "anon_2.y AS anon_2_y, "
391
423
                    "anon_2.z AS anon_2_z FROM "
392
424
                    "(SELECT keyed.x AS x, keyed.y AS y, keyed.z "
393
425
                    "AS z FROM keyed) AS anon_2) AS anon_1"
394
426
                    )
395
427
 
396
 
    def test_dont_overcorrelate(self):
397
 
        self.assert_compile(select([table1], from_obj=[table1,
398
 
                            table1.select()]),
399
 
                            "SELECT mytable.myid, mytable.name, "
400
 
                            "mytable.description FROM mytable, (SELECT "
401
 
                            "mytable.myid AS myid, mytable.name AS "
402
 
                            "name, mytable.description AS description "
403
 
                            "FROM mytable)")
404
 
 
405
 
    def test_full_correlate(self):
406
 
        # intentional
407
 
        t = table('t', column('a'), column('b'))
408
 
        s = select([t.c.a]).where(t.c.a==1).correlate(t).as_scalar()
409
 
 
410
 
        s2 = select([t.c.a, s])
411
 
        self.assert_compile(s2, """SELECT t.a, (SELECT t.a WHERE t.a = :a_1) AS anon_1 FROM t""")
412
 
 
413
 
        # unintentional
414
 
        t2 = table('t2', column('c'), column('d'))
415
 
        s = select([t.c.a]).where(t.c.a==t2.c.d).as_scalar()
416
 
        s2 =select([t, t2, s])
417
 
        assert_raises(exc.InvalidRequestError, str, s2)
418
 
 
419
 
        # intentional again
420
 
        s = s.correlate(t, t2)
421
 
        s2 =select([t, t2, s])
422
 
        self.assert_compile(s, "SELECT t.a WHERE t.a = t2.d")
423
 
 
424
428
    def test_exists(self):
425
 
        s = select([table1.c.myid]).where(table1.c.myid==5)
 
429
        s = select([table1.c.myid]).where(table1.c.myid == 5)
426
430
 
427
431
        self.assert_compile(exists(s),
428
 
                    "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
 
432
                    "EXISTS (SELECT mytable.myid FROM mytable "
 
433
                        "WHERE mytable.myid = :myid_1)"
429
434
                )
430
435
 
431
436
        self.assert_compile(exists(s.as_scalar()),
432
 
                    "EXISTS (SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
 
437
                    "EXISTS (SELECT mytable.myid FROM mytable "
 
438
                        "WHERE mytable.myid = :myid_1)"
433
439
                )
434
440
 
435
441
        self.assert_compile(exists([table1.c.myid], table1.c.myid
463
469
                            'EXISTS (SELECT * FROM myothertable WHERE '
464
470
                            'myothertable.otherid = mytable.myid)')
465
471
        self.assert_compile(table1.select(exists().where(table2.c.otherid
466
 
                            == table1.c.myid).correlate(table1)).replace_selectable(table2,
 
472
                            == table1.c.myid).correlate(table1)
 
473
                            ).replace_selectable(table2,
467
474
                            table2.alias()),
468
475
                            'SELECT mytable.myid, mytable.name, '
469
476
                            'mytable.description FROM mytable WHERE '
471
478
                            'myothertable_1 WHERE myothertable_1.otheri'
472
479
                            'd = mytable.myid)')
473
480
        self.assert_compile(table1.select(exists().where(table2.c.otherid
474
 
                            == table1.c.myid).correlate(table1)).select_from(table1.join(table2,
 
481
                            == table1.c.myid).correlate(table1)).select_from(
 
482
                            table1.join(table2,
475
483
                            table1.c.myid
476
484
                            == table2.c.otherid)).replace_selectable(table2,
477
485
                            table2.alias()),
486
494
        self.assert_compile(
487
495
            select([
488
496
                or_(
489
 
                    exists().where(table2.c.otherid=='foo'),
490
 
                    exists().where(table2.c.otherid=='bar')
 
497
                    exists().where(table2.c.otherid == 'foo'),
 
498
                    exists().where(table2.c.otherid == 'bar')
491
499
                )
492
500
            ]),
493
501
            "SELECT (EXISTS (SELECT * FROM myothertable "
500
508
    def test_where_subquery(self):
501
509
        s = select([addresses.c.street], addresses.c.user_id
502
510
                   == users.c.user_id, correlate=True).alias('s')
 
511
 
 
512
        # don't correlate in a FROM list
503
513
        self.assert_compile(select([users, s.c.street], from_obj=s),
504
514
                            "SELECT users.user_id, users.user_name, "
505
515
                            "users.password, s.street FROM users, "
506
516
                            "(SELECT addresses.street AS street FROM "
507
 
                            "addresses WHERE addresses.user_id = "
 
517
                            "addresses, users WHERE addresses.user_id = "
508
518
                            "users.user_id) AS s")
509
519
        self.assert_compile(table1.select(table1.c.myid
510
520
                            == select([table1.c.myid], table1.c.name
540
550
                            'mytable AS ta WHERE EXISTS (SELECT 1 FROM '
541
551
                            'myothertable WHERE myothertable.otherid = '
542
552
                            'ta.myid)) AS sq2, mytable')
543
 
        s = select([addresses.c.street], addresses.c.user_id
544
 
                   == users.c.user_id, correlate=True).alias('s')
545
 
        self.assert_compile(select([users, s.c.street], from_obj=s),
546
 
                            "SELECT users.user_id, users.user_name, "
547
 
                            "users.password, s.street FROM users, "
548
 
                            "(SELECT addresses.street AS street FROM "
549
 
                            "addresses WHERE addresses.user_id = "
550
 
                            "users.user_id) AS s")
 
553
 
551
554
 
552
555
        # test constructing the outer query via append_column(), which
553
556
        # occurs in the ORM's Query object
570
573
                            '(SELECT myothertable.otherid FROM '
571
574
                            'myothertable WHERE mytable.myid = '
572
575
                            'myothertable.otherid)')
573
 
        self.assert_compile(table1.select(order_by=[desc(select([table2.c.otherid],
 
576
        self.assert_compile(table1.select(order_by=[
 
577
                            desc(select([table2.c.otherid],
574
578
                            table1.c.myid == table2.c.otherid))]),
575
579
                            'SELECT mytable.myid, mytable.name, '
576
580
                            'mytable.description FROM mytable ORDER BY '
578
582
                            'myothertable WHERE mytable.myid = '
579
583
                            'myothertable.otherid) DESC')
580
584
 
581
 
    @testing.uses_deprecated('scalar option')
582
585
    def test_scalar_select(self):
583
586
        assert_raises_message(
584
587
            exc.InvalidRequestError,
585
588
            r"Select objects don't have a type\.  Call as_scalar\(\) "
586
 
            "on this Select object to return a 'scalar' version of this Select\.",
 
589
            "on this Select object to return a 'scalar' "
 
590
            "version of this Select\.",
587
591
            func.coalesce, select([table1.c.myid])
588
592
        )
589
593
 
604
608
                            'mytable.description, (SELECT mytable.myid '
605
609
                            'FROM mytable) AS anon_1 FROM mytable')
606
610
 
 
611
        s = select([table1.c.myid]).as_scalar()
 
612
        s2 = s.where(table1.c.myid == 5)
 
613
        self.assert_compile(
 
614
            s2,
 
615
            "(SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1)"
 
616
        )
 
617
        self.assert_compile(
 
618
            s, "(SELECT mytable.myid FROM mytable)"
 
619
        )
607
620
        # test that aliases use as_scalar() when used in an explicitly
608
621
        # scalar context
609
622
 
669
682
            column('nm')
670
683
        )
671
684
        zip = '12345'
672
 
        qlat = select([zips.c.latitude], zips.c.zipcode == zip).correlate(None).as_scalar()
673
 
        qlng = select([zips.c.longitude], zips.c.zipcode == zip).correlate(None).as_scalar()
 
685
        qlat = select([zips.c.latitude], zips.c.zipcode == zip).\
 
686
                        correlate(None).as_scalar()
 
687
        qlng = select([zips.c.longitude], zips.c.zipcode == zip).\
 
688
                        correlate(None).as_scalar()
674
689
 
675
 
        q = select([places.c.id, places.c.nm, zips.c.zipcode, func.latlondist(qlat, qlng).label('dist')],
676
 
                         zips.c.zipcode==zip,
677
 
                         order_by = ['dist', places.c.nm]
 
690
        q = select([places.c.id, places.c.nm, zips.c.zipcode,
 
691
                        func.latlondist(qlat, qlng).label('dist')],
 
692
                         zips.c.zipcode == zip,
 
693
                         order_by=['dist', places.c.nm]
678
694
                         )
679
695
 
680
696
        self.assert_compile(q,
688
704
                            ':zipcode_3 ORDER BY dist, places.nm')
689
705
 
690
706
        zalias = zips.alias('main_zip')
691
 
        qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
692
 
        qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).as_scalar()
 
707
        qlat = select([zips.c.latitude], zips.c.zipcode == zalias.c.zipcode).\
 
708
                    as_scalar()
 
709
        qlng = select([zips.c.longitude], zips.c.zipcode == zalias.c.zipcode).\
 
710
                    as_scalar()
693
711
        q = select([places.c.id, places.c.nm, zalias.c.zipcode,
694
712
                   func.latlondist(qlat, qlng).label('dist')],
695
713
                   order_by=['dist', places.c.nm])
704
722
                            'dist, places.nm')
705
723
 
706
724
        a1 = table2.alias('t2alias')
707
 
        s1 = select([a1.c.otherid], table1.c.myid==a1.c.otherid).as_scalar()
708
 
        j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
 
725
        s1 = select([a1.c.otherid], table1.c.myid == a1.c.otherid).as_scalar()
 
726
        j1 = table1.join(table2, table1.c.myid == table2.c.otherid)
709
727
        s2 = select([table1, s1], from_obj=j1)
710
728
        self.assert_compile(s2,
711
729
                            'SELECT mytable.myid, mytable.name, '
724
742
                            ':param_1')
725
743
 
726
744
        self.assert_compile(
727
 
                label('bar', column('foo', type_=String))+ 'foo',
 
745
                label('bar', column('foo', type_=String)) + 'foo',
728
746
                'foo || :param_1')
729
747
 
730
748
 
739
757
        )
740
758
 
741
759
        self.assert_compile(
742
 
            and_(table1.c.myid == 12, table1.c.name=='asdf',
 
760
            and_(table1.c.myid == 12, table1.c.name == 'asdf',
743
761
                table2.c.othername == 'foo', "sysdate() = today()"),
744
762
            "mytable.myid = :myid_1 AND mytable.name = :name_1 "\
745
 
            "AND myothertable.othername = :othername_1 AND sysdate() = today()"
 
763
            "AND myothertable.othername = "
 
764
            ":othername_1 AND sysdate() = today()"
746
765
        )
747
766
 
748
767
        self.assert_compile(
749
768
            and_(
750
769
                table1.c.myid == 12,
751
 
                or_(table2.c.othername=='asdf',
 
770
                or_(table2.c.othername == 'asdf',
752
771
                    table2.c.othername == 'foo', table2.c.otherid == 9),
753
772
                "sysdate() = today()",
754
773
            ),
756
775
             ':othername_1 OR myothertable.othername = :othername_2 OR '
757
776
             'myothertable.otherid = :otherid_1) AND sysdate() = '
758
777
             'today()',
759
 
            checkparams = {'othername_1': 'asdf', 'othername_2':'foo', 'otherid_1': 9, 'myid_1': 12}
 
778
            checkparams={'othername_1': 'asdf', 'othername_2': 'foo',
 
779
                            'otherid_1': 9, 'myid_1': 12}
760
780
        )
761
781
 
762
782
    def test_nested_conjunctions_short_circuit(self):
766
786
        t = table('t', column('x'))
767
787
 
768
788
        self.assert_compile(
769
 
            select([t]).where(and_(t.c.x==5,
770
 
                or_(and_(or_(t.c.x==7))))),
 
789
            select([t]).where(and_(t.c.x == 5,
 
790
                or_(and_(or_(t.c.x == 7))))),
771
791
            "SELECT t.x FROM t WHERE t.x = :x_1 AND t.x = :x_2"
772
792
        )
773
793
        self.assert_compile(
774
 
            select([t]).where(and_(or_(t.c.x==12,
775
 
                and_(or_(t.c.x==8))))),
 
794
            select([t]).where(and_(or_(t.c.x == 12,
 
795
                and_(or_(t.c.x == 8))))),
776
796
            "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
777
797
        )
778
798
        self.assert_compile(
779
 
            select([t]).where(and_(or_(or_(t.c.x==12),
780
 
                and_(or_(), or_(and_(t.c.x==8)), and_())))),
 
799
            select([t]).where(and_(or_(or_(t.c.x == 12),
 
800
                and_(or_(), or_(and_(t.c.x == 8)), and_())))),
781
801
            "SELECT t.x FROM t WHERE t.x = :x_1 OR t.x = :x_2"
782
802
        )
783
803
 
807
827
            "SELECT count(DISTINCT mytable.myid) AS count_1 FROM mytable"
808
828
        )
809
829
 
810
 
    def test_operators(self):
811
 
        for (py_op, sql_op) in ((operator.add, '+'), (operator.mul, '*'),
812
 
                                (operator.sub, '-'),
813
 
                                # Py3K
814
 
                                #(operator.truediv, '/'),
815
 
                                # Py2K
816
 
                                (operator.div, '/'),
817
 
                                # end Py2K
818
 
                                ):
819
 
            for (lhs, rhs, res) in (
820
 
                (5, table1.c.myid, ':myid_1 %s mytable.myid'),
821
 
                (5, literal(5), ':param_1 %s :param_2'),
822
 
                (table1.c.myid, 'b', 'mytable.myid %s :myid_1'),
823
 
                (table1.c.myid, literal(2.7), 'mytable.myid %s :param_1'),
824
 
                (table1.c.myid, table1.c.myid, 'mytable.myid %s mytable.myid'),
825
 
                (literal(5), 8, ':param_1 %s :param_2'),
826
 
                (literal(6), table1.c.myid, ':param_1 %s mytable.myid'),
827
 
                (literal(7), literal(5.5), ':param_1 %s :param_2'),
828
 
                ):
829
 
                self.assert_compile(py_op(lhs, rhs), res % sql_op)
830
 
 
831
 
        dt = datetime.datetime.today()
832
 
        # exercise comparison operators
833
 
        for (py_op, fwd_op, rev_op) in ((operator.lt, '<', '>'),
834
 
                                        (operator.gt, '>', '<'),
835
 
                                        (operator.eq, '=', '='),
836
 
                                        (operator.ne, '!=', '!='),
837
 
                                        (operator.le, '<=', '>='),
838
 
                                        (operator.ge, '>=', '<=')):
839
 
            for (lhs, rhs, l_sql, r_sql) in (
840
 
                ('a', table1.c.myid, ':myid_1', 'mytable.myid'),
841
 
                ('a', literal('b'), ':param_2', ':param_1'), # note swap!
842
 
                (table1.c.myid, 'b', 'mytable.myid', ':myid_1'),
843
 
                (table1.c.myid, literal('b'), 'mytable.myid', ':param_1'),
844
 
                (table1.c.myid, table1.c.myid, 'mytable.myid', 'mytable.myid'),
845
 
                (literal('a'), 'b', ':param_1', ':param_2'),
846
 
                (literal('a'), table1.c.myid, ':param_1', 'mytable.myid'),
847
 
                (literal('a'), literal('b'), ':param_1', ':param_2'),
848
 
                (dt, literal('b'), ':param_2', ':param_1'),
849
 
                (literal('b'), dt, ':param_1', ':param_2'),
850
 
                ):
851
 
 
852
 
                # the compiled clause should match either (e.g.):
853
 
                # 'a' < 'b' -or- 'b' > 'a'.
854
 
                compiled = str(py_op(lhs, rhs))
855
 
                fwd_sql = "%s %s %s" % (l_sql, fwd_op, r_sql)
856
 
                rev_sql = "%s %s %s" % (r_sql, rev_op, l_sql)
857
 
 
858
 
                self.assert_(compiled == fwd_sql or compiled == rev_sql,
859
 
                             "\n'" + compiled + "'\n does not match\n'" +
860
 
                             fwd_sql + "'\n or\n'" + rev_sql + "'")
861
 
 
862
 
        for (py_op, op) in (
863
 
            (operator.neg, '-'),
864
 
            (operator.inv, 'NOT '),
865
 
        ):
866
 
            for expr, sql in (
867
 
                (table1.c.myid, "mytable.myid"),
868
 
                (literal("foo"), ":param_1"),
869
 
            ):
870
 
 
871
 
                compiled = str(py_op(expr))
872
 
                sql = "%s%s" % (op, sql)
873
 
                eq_(compiled, sql)
874
 
 
875
 
        self.assert_compile(
876
 
         table1.select((table1.c.myid != 12) & ~(table1.c.name=='john')),
877
 
         "SELECT mytable.myid, mytable.name, mytable.description FROM "
878
 
            "mytable WHERE mytable.myid != :myid_1 AND mytable.name != :name_1"
879
 
        )
880
 
 
881
 
        self.assert_compile(
882
 
         table1.select((table1.c.myid != 12) &
883
 
                ~(table1.c.name.between('jack','john'))),
884
 
         "SELECT mytable.myid, mytable.name, mytable.description FROM "
885
 
             "mytable WHERE mytable.myid != :myid_1 AND "\
886
 
             "NOT (mytable.name BETWEEN :name_1 AND :name_2)"
887
 
        )
888
 
 
889
 
        self.assert_compile(
890
 
         table1.select((table1.c.myid != 12) &
891
 
                ~and_(table1.c.name=='john', table1.c.name=='ed', table1.c.name=='fred')),
892
 
         "SELECT mytable.myid, mytable.name, mytable.description FROM "
893
 
         "mytable WHERE mytable.myid != :myid_1 AND "\
894
 
         "NOT (mytable.name = :name_1 AND mytable.name = :name_2 "
895
 
         "AND mytable.name = :name_3)"
896
 
        )
897
 
 
898
 
        self.assert_compile(
899
 
         table1.select((table1.c.myid != 12) & ~table1.c.name),
900
 
         "SELECT mytable.myid, mytable.name, mytable.description FROM "
901
 
            "mytable WHERE mytable.myid != :myid_1 AND NOT mytable.name"
902
 
        )
903
 
 
904
 
        self.assert_compile(
905
 
         literal("a") + literal("b") * literal("c"), ":param_1 || :param_2 * :param_3"
906
 
        )
907
 
 
908
 
        # test the op() function, also that its results are further usable in expressions
909
 
        self.assert_compile(
910
 
            table1.select(table1.c.myid.op('hoho')(12)==14),
911
 
            "SELECT mytable.myid, mytable.name, mytable.description FROM "
912
 
                    "mytable WHERE (mytable.myid hoho :myid_1) = :param_1"
913
 
        )
914
 
 
915
 
        # test that clauses can be pickled (operators need to be module-level, etc.)
916
 
        clause = (table1.c.myid == 12) & table1.c.myid.between(15, 20) & \
917
 
                            table1.c.myid.like('hoho')
918
 
        assert str(clause) == str(util.pickle.loads(util.pickle.dumps(clause)))
919
 
 
920
 
 
921
 
    def test_like(self):
922
 
        for expr, check, dialect in [
923
 
            (
924
 
                table1.c.myid.like('somstr'),
925
 
                "mytable.myid LIKE :myid_1", None),
926
 
            (
927
 
                ~table1.c.myid.like('somstr'),
928
 
                "mytable.myid NOT LIKE :myid_1", None),
929
 
            (
930
 
                table1.c.myid.like('somstr', escape='\\'),
931
 
                "mytable.myid LIKE :myid_1 ESCAPE '\\'",
932
 
                None),
933
 
            (
934
 
                ~table1.c.myid.like('somstr', escape='\\'),
935
 
                "mytable.myid NOT LIKE :myid_1 ESCAPE '\\'",
936
 
                None),
937
 
            (
938
 
                table1.c.myid.ilike('somstr', escape='\\'),
939
 
                "lower(mytable.myid) LIKE lower(:myid_1) ESCAPE '\\'",
940
 
                None),
941
 
            (
942
 
                ~table1.c.myid.ilike('somstr', escape='\\'),
943
 
                "lower(mytable.myid) NOT LIKE lower(:myid_1) ESCAPE '\\'",
944
 
                None),
945
 
            (
946
 
                table1.c.myid.ilike('somstr', escape='\\'),
947
 
                    "mytable.myid ILIKE %(myid_1)s ESCAPE '\\\\'",
948
 
                    postgresql.PGDialect()),
949
 
            (
950
 
                ~table1.c.myid.ilike('somstr', escape='\\'),
951
 
                "mytable.myid NOT ILIKE %(myid_1)s ESCAPE '\\\\'",
952
 
                postgresql.PGDialect()),
953
 
            (
954
 
                table1.c.name.ilike('%something%'),
955
 
                "lower(mytable.name) LIKE lower(:name_1)", None),
956
 
            (
957
 
                table1.c.name.ilike('%something%'),
958
 
                "mytable.name ILIKE %(name_1)s", postgresql.PGDialect()),
959
 
            (
960
 
                ~table1.c.name.ilike('%something%'),
961
 
                "lower(mytable.name) NOT LIKE lower(:name_1)", None),
962
 
            (
963
 
                ~table1.c.name.ilike('%something%'),
964
 
                "mytable.name NOT ILIKE %(name_1)s",
965
 
                postgresql.PGDialect()),
966
 
        ]:
967
 
            self.assert_compile(expr, check, dialect=dialect)
968
 
 
969
 
    def test_match(self):
970
 
        for expr, check, dialect in [
971
 
            (table1.c.myid.match('somstr'),
972
 
                        "mytable.myid MATCH ?", sqlite.SQLiteDialect()),
973
 
            (table1.c.myid.match('somstr'),
974
 
                        "MATCH (mytable.myid) AGAINST (%s IN BOOLEAN MODE)",
975
 
                        mysql.dialect()),
976
 
            (table1.c.myid.match('somstr'),
977
 
                        "CONTAINS (mytable.myid, :myid_1)",
978
 
                        mssql.dialect()),
979
 
            (table1.c.myid.match('somstr'),
980
 
                        "mytable.myid @@ to_tsquery(%(myid_1)s)",
981
 
                        postgresql.dialect()),
982
 
            (table1.c.myid.match('somstr'),
983
 
                        "CONTAINS (mytable.myid, :myid_1)",
984
 
                        oracle.dialect()),
985
 
        ]:
986
 
            self.assert_compile(expr, check, dialect=dialect)
987
 
 
988
 
    def test_composed_string_comparators(self):
989
 
        self.assert_compile(
990
 
            table1.c.name.contains('jo'),
991
 
            "mytable.name LIKE '%%' || :name_1 || '%%'" ,
992
 
            checkparams = {'name_1': u'jo'},
993
 
        )
994
 
        self.assert_compile(
995
 
            table1.c.name.contains('jo'),
996
 
            "mytable.name LIKE concat(concat('%%', %s), '%%')" ,
997
 
            checkparams = {'name_1': u'jo'},
998
 
            dialect=mysql.dialect()
999
 
        )
1000
 
        self.assert_compile(
1001
 
            table1.c.name.contains('jo', escape='\\'),
1002
 
            "mytable.name LIKE '%%' || :name_1 || '%%' ESCAPE '\\'" ,
1003
 
            checkparams = {'name_1': u'jo'},
1004
 
        )
1005
 
        self.assert_compile(
1006
 
            table1.c.name.startswith('jo', escape='\\'),
1007
 
            "mytable.name LIKE :name_1 || '%%' ESCAPE '\\'" )
1008
 
        self.assert_compile(
1009
 
            table1.c.name.endswith('jo', escape='\\'),
1010
 
            "mytable.name LIKE '%%' || :name_1 ESCAPE '\\'" )
1011
 
        self.assert_compile(
1012
 
            table1.c.name.endswith('hn'),
1013
 
            "mytable.name LIKE '%%' || :name_1",
1014
 
            checkparams = {'name_1': u'hn'}, )
1015
 
        self.assert_compile(
1016
 
            table1.c.name.endswith('hn'),
1017
 
            "mytable.name LIKE concat('%%', %s)",
1018
 
            checkparams = {'name_1': u'hn'}, dialect=mysql.dialect()
1019
 
        )
1020
 
        self.assert_compile(
1021
 
            table1.c.name.startswith(u"hi \xf6 \xf5"),
1022
 
            "mytable.name LIKE :name_1 || '%%'",
1023
 
            checkparams = {'name_1': u'hi \xf6 \xf5'},
1024
 
        )
1025
 
        self.assert_compile(
1026
 
                column('name').endswith(text("'foo'")),
1027
 
                "name LIKE '%%' || 'foo'"  )
1028
 
        self.assert_compile(
1029
 
                column('name').endswith(literal_column("'foo'")),
1030
 
                "name LIKE '%%' || 'foo'"  )
1031
 
        self.assert_compile(
1032
 
                column('name').startswith(text("'foo'")),
1033
 
                "name LIKE 'foo' || '%%'"  )
1034
 
        self.assert_compile(
1035
 
                column('name').startswith(text("'foo'")),
1036
 
                 "name LIKE concat('foo', '%%')", dialect=mysql.dialect())
1037
 
        self.assert_compile(
1038
 
                column('name').startswith(literal_column("'foo'")),
1039
 
                "name LIKE 'foo' || '%%'"  )
1040
 
        self.assert_compile(
1041
 
                column('name').startswith(literal_column("'foo'")),
1042
 
                "name LIKE concat('foo', '%%')", dialect=mysql.dialect())
 
830
 
1043
831
 
1044
832
    def test_multiple_col_binds(self):
1045
833
        self.assert_compile(
1046
 
            select(["*"], or_(table1.c.myid == 12, table1.c.myid=='asdf',
 
834
            select(["*"], or_(table1.c.myid == 12, table1.c.myid == 'asdf',
1047
835
                            table1.c.myid == 'foo')),
1048
836
            "SELECT * FROM mytable WHERE mytable.myid = :myid_1 "
1049
837
            "OR mytable.myid = :myid_2 OR mytable.myid = :myid_3"
1051
839
 
1052
840
    def test_order_by_nulls(self):
1053
841
        self.assert_compile(
1054
 
            table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullsfirst()]),
1055
 
            "SELECT myothertable.otherid, myothertable.othername FROM "
1056
 
            "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS FIRST"
1057
 
        )
1058
 
 
1059
 
        self.assert_compile(
1060
 
            table2.select(order_by = [table2.c.otherid, table2.c.othername.desc().nullslast()]),
1061
 
            "SELECT myothertable.otherid, myothertable.othername FROM "
1062
 
            "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC NULLS LAST"
1063
 
        )
1064
 
 
1065
 
        self.assert_compile(
1066
 
            table2.select(order_by = [table2.c.otherid.nullslast(), table2.c.othername.desc().nullsfirst()]),
1067
 
            "SELECT myothertable.otherid, myothertable.othername FROM "
1068
 
            "myothertable ORDER BY myothertable.otherid NULLS LAST, myothertable.othername DESC NULLS FIRST"
1069
 
        )
1070
 
 
1071
 
        self.assert_compile(
1072
 
            table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc()]),
1073
 
            "SELECT myothertable.otherid, myothertable.othername FROM "
1074
 
            "myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC"
1075
 
        )
1076
 
 
1077
 
        self.assert_compile(
1078
 
            table2.select(order_by = [table2.c.otherid.nullsfirst(), table2.c.othername.desc().nullslast()]),
1079
 
            "SELECT myothertable.otherid, myothertable.othername FROM "
1080
 
            "myothertable ORDER BY myothertable.otherid NULLS FIRST, myothertable.othername DESC NULLS LAST"
 
842
            table2.select(order_by=[table2.c.otherid,
 
843
                                    table2.c.othername.desc().nullsfirst()]),
 
844
            "SELECT myothertable.otherid, myothertable.othername FROM "
 
845
            "myothertable ORDER BY myothertable.otherid, "
 
846
            "myothertable.othername DESC NULLS FIRST"
 
847
        )
 
848
 
 
849
        self.assert_compile(
 
850
            table2.select(order_by=[
 
851
                    table2.c.otherid, table2.c.othername.desc().nullslast()]),
 
852
            "SELECT myothertable.otherid, myothertable.othername FROM "
 
853
            "myothertable ORDER BY myothertable.otherid, "
 
854
            "myothertable.othername DESC NULLS LAST"
 
855
        )
 
856
 
 
857
        self.assert_compile(
 
858
            table2.select(order_by=[
 
859
                        table2.c.otherid.nullslast(),
 
860
                        table2.c.othername.desc().nullsfirst()]),
 
861
            "SELECT myothertable.otherid, myothertable.othername FROM "
 
862
            "myothertable ORDER BY myothertable.otherid NULLS LAST, "
 
863
            "myothertable.othername DESC NULLS FIRST"
 
864
        )
 
865
 
 
866
        self.assert_compile(
 
867
            table2.select(order_by=[table2.c.otherid.nullsfirst(),
 
868
                                                table2.c.othername.desc()]),
 
869
            "SELECT myothertable.otherid, myothertable.othername FROM "
 
870
            "myothertable ORDER BY myothertable.otherid NULLS FIRST, "
 
871
            "myothertable.othername DESC"
 
872
        )
 
873
 
 
874
        self.assert_compile(
 
875
            table2.select(order_by=[table2.c.otherid.nullsfirst(),
 
876
                                table2.c.othername.desc().nullslast()]),
 
877
            "SELECT myothertable.otherid, myothertable.othername FROM "
 
878
            "myothertable ORDER BY myothertable.otherid NULLS FIRST, "
 
879
            "myothertable.othername DESC NULLS LAST"
1081
880
        )
1082
881
 
1083
882
    def test_orderby_groupby(self):
1084
883
        self.assert_compile(
1085
 
            table2.select(order_by = [table2.c.otherid, asc(table2.c.othername)]),
 
884
            table2.select(order_by=[table2.c.otherid,
 
885
                                asc(table2.c.othername)]),
1086
886
            "SELECT myothertable.otherid, myothertable.othername FROM "
1087
 
            "myothertable ORDER BY myothertable.otherid, myothertable.othername ASC"
 
887
            "myothertable ORDER BY myothertable.otherid, "
 
888
            "myothertable.othername ASC"
1088
889
        )
1089
890
 
1090
891
        self.assert_compile(
1091
 
            table2.select(order_by = [table2.c.otherid, table2.c.othername.desc()]),
 
892
            table2.select(order_by=[table2.c.otherid,
 
893
                                    table2.c.othername.desc()]),
1092
894
            "SELECT myothertable.otherid, myothertable.othername FROM "
1093
 
            "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
 
895
            "myothertable ORDER BY myothertable.otherid, "
 
896
            "myothertable.othername DESC"
1094
897
        )
1095
898
 
1096
899
        # generative order_by
1097
900
        self.assert_compile(
1098
 
            table2.select().order_by(table2.c.otherid).order_by(table2.c.othername.desc()),
 
901
            table2.select().order_by(table2.c.otherid).\
 
902
                    order_by(table2.c.othername.desc()),
1099
903
            "SELECT myothertable.otherid, myothertable.othername FROM "
1100
 
            "myothertable ORDER BY myothertable.otherid, myothertable.othername DESC"
 
904
            "myothertable ORDER BY myothertable.otherid, "
 
905
            "myothertable.othername DESC"
1101
906
        )
1102
907
 
1103
908
        self.assert_compile(
1104
909
            table2.select().order_by(table2.c.otherid).
1105
 
                                order_by(table2.c.othername.desc()).order_by(None),
1106
 
            "SELECT myothertable.otherid, myothertable.othername FROM myothertable"
 
910
                                order_by(table2.c.othername.desc()
 
911
                                ).order_by(None),
 
912
            "SELECT myothertable.otherid, myothertable.othername "
 
913
            "FROM myothertable"
1107
914
        )
1108
915
 
1109
916
        self.assert_compile(
1110
917
            select(
1111
918
                    [table2.c.othername, func.count(table2.c.otherid)],
1112
 
                    group_by = [table2.c.othername]),
1113
 
            "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
 
919
                    group_by=[table2.c.othername]),
 
920
            "SELECT myothertable.othername, "
 
921
            "count(myothertable.otherid) AS count_1 "
1114
922
            "FROM myothertable GROUP BY myothertable.othername"
1115
923
        )
1116
924
 
1118
926
        self.assert_compile(
1119
927
            select([table2.c.othername, func.count(table2.c.otherid)]).
1120
928
                        group_by(table2.c.othername),
1121
 
            "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
 
929
            "SELECT myothertable.othername, "
 
930
            "count(myothertable.otherid) AS count_1 "
1122
931
            "FROM myothertable GROUP BY myothertable.othername"
1123
932
        )
1124
933
 
1125
934
        self.assert_compile(
1126
935
            select([table2.c.othername, func.count(table2.c.otherid)]).
1127
936
                        group_by(table2.c.othername).group_by(None),
1128
 
            "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
 
937
            "SELECT myothertable.othername, "
 
938
            "count(myothertable.otherid) AS count_1 "
1129
939
            "FROM myothertable"
1130
940
        )
1131
941
 
1132
942
        self.assert_compile(
1133
943
            select([table2.c.othername, func.count(table2.c.otherid)],
1134
 
                        group_by = [table2.c.othername],
1135
 
                        order_by = [table2.c.othername]),
1136
 
            "SELECT myothertable.othername, count(myothertable.otherid) AS count_1 "
1137
 
            "FROM myothertable GROUP BY myothertable.othername ORDER BY myothertable.othername"
 
944
                        group_by=[table2.c.othername],
 
945
                        order_by=[table2.c.othername]),
 
946
            "SELECT myothertable.othername, "
 
947
            "count(myothertable.otherid) AS count_1 "
 
948
            "FROM myothertable "
 
949
            "GROUP BY myothertable.othername ORDER BY myothertable.othername"
1138
950
        )
1139
951
 
1140
952
    def test_for_update(self):
1141
953
        self.assert_compile(
1142
 
            table1.select(table1.c.myid==7, for_update=True),
 
954
            table1.select(table1.c.myid == 7, for_update=True),
1143
955
            "SELECT mytable.myid, mytable.name, mytable.description "
1144
956
            "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
1145
957
 
1146
958
        self.assert_compile(
1147
 
            table1.select(table1.c.myid==7, for_update=False),
 
959
            table1.select(table1.c.myid == 7, for_update=False),
1148
960
            "SELECT mytable.myid, mytable.name, mytable.description "
1149
961
            "FROM mytable WHERE mytable.myid = :myid_1")
1150
962
 
1151
963
        # not supported by dialect, should just use update
1152
964
        self.assert_compile(
1153
 
            table1.select(table1.c.myid==7, for_update='nowait'),
 
965
            table1.select(table1.c.myid == 7, for_update='nowait'),
1154
966
            "SELECT mytable.myid, mytable.name, mytable.description "
1155
967
            "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
1156
968
 
1157
969
        # unknown lock mode
1158
970
        self.assert_compile(
1159
 
            table1.select(table1.c.myid==7, for_update='unknown_mode'),
 
971
            table1.select(table1.c.myid == 7, for_update='unknown_mode'),
1160
972
            "SELECT mytable.myid, mytable.name, mytable.description "
1161
973
            "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE")
1162
974
 
1163
975
        # ----- mysql
1164
976
 
1165
977
        self.assert_compile(
1166
 
            table1.select(table1.c.myid==7, for_update=True),
 
978
            table1.select(table1.c.myid == 7, for_update=True),
1167
979
            "SELECT mytable.myid, mytable.name, mytable.description "
1168
980
            "FROM mytable WHERE mytable.myid = %s FOR UPDATE",
1169
981
            dialect=mysql.dialect())
1170
982
 
1171
983
        self.assert_compile(
1172
 
            table1.select(table1.c.myid==7, for_update="read"),
 
984
            table1.select(table1.c.myid == 7, for_update="read"),
1173
985
            "SELECT mytable.myid, mytable.name, mytable.description "
1174
986
            "FROM mytable WHERE mytable.myid = %s LOCK IN SHARE MODE",
1175
987
            dialect=mysql.dialect())
1177
989
        # ----- oracle
1178
990
 
1179
991
        self.assert_compile(
1180
 
            table1.select(table1.c.myid==7, for_update=True),
 
992
            table1.select(table1.c.myid == 7, for_update=True),
1181
993
            "SELECT mytable.myid, mytable.name, mytable.description "
1182
994
            "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE",
1183
995
            dialect=oracle.dialect())
1184
996
 
1185
997
        self.assert_compile(
1186
 
            table1.select(table1.c.myid==7, for_update="nowait"),
 
998
            table1.select(table1.c.myid == 7, for_update="nowait"),
1187
999
            "SELECT mytable.myid, mytable.name, mytable.description "
1188
1000
            "FROM mytable WHERE mytable.myid = :myid_1 FOR UPDATE NOWAIT",
1189
1001
            dialect=oracle.dialect())
1191
1003
        # ----- postgresql
1192
1004
 
1193
1005
        self.assert_compile(
1194
 
            table1.select(table1.c.myid==7, for_update=True),
 
1006
            table1.select(table1.c.myid == 7, for_update=True),
1195
1007
            "SELECT mytable.myid, mytable.name, mytable.description "
1196
1008
            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE",
1197
1009
            dialect=postgresql.dialect())
1198
1010
 
1199
1011
        self.assert_compile(
1200
 
            table1.select(table1.c.myid==7, for_update="nowait"),
 
1012
            table1.select(table1.c.myid == 7, for_update="nowait"),
1201
1013
            "SELECT mytable.myid, mytable.name, mytable.description "
1202
1014
            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR UPDATE NOWAIT",
1203
1015
            dialect=postgresql.dialect())
1204
1016
 
1205
1017
        self.assert_compile(
1206
 
            table1.select(table1.c.myid==7, for_update="read"),
 
1018
            table1.select(table1.c.myid == 7, for_update="read"),
1207
1019
            "SELECT mytable.myid, mytable.name, mytable.description "
1208
1020
            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE",
1209
1021
            dialect=postgresql.dialect())
1210
1022
 
1211
1023
        self.assert_compile(
1212
 
            table1.select(table1.c.myid==7, for_update="read_nowait"),
 
1024
            table1.select(table1.c.myid == 7, for_update="read_nowait"),
1213
1025
            "SELECT mytable.myid, mytable.name, mytable.description "
1214
1026
            "FROM mytable WHERE mytable.myid = %(myid_1)s FOR SHARE NOWAIT",
1215
1027
            dialect=postgresql.dialect())
1216
1028
 
1217
1029
    def test_alias(self):
1218
 
        # test the alias for a table1.  column names stay the same, table name "changes" to "foo".
 
1030
        # test the alias for a table1.  column names stay the same,
 
1031
        # table name "changes" to "foo".
1219
1032
        self.assert_compile(
1220
 
            select([table1.alias('foo')])
1221
 
            ,"SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
 
1033
            select([table1.alias('foo')]),
 
1034
            "SELECT foo.myid, foo.name, foo.description FROM mytable AS foo")
1222
1035
 
1223
1036
        for dialect in (oracle.dialect(),):
1224
1037
            self.assert_compile(
1225
 
                select([table1.alias('foo')])
1226
 
                ,"SELECT foo.myid, foo.name, foo.description FROM mytable foo"
1227
 
                ,dialect=dialect)
 
1038
                select([table1.alias('foo')]),
 
1039
                "SELECT foo.myid, foo.name, foo.description FROM mytable foo",
 
1040
                dialect=dialect)
1228
1041
 
1229
1042
        self.assert_compile(
1230
1043
            select([table1.alias()]),
1238
1051
        # from the first table1.
1239
1052
        q = select(
1240
1053
                        [table1, table2.c.otherid],
1241
 
                        table1.c.myid == table2.c.otherid, use_labels = True
 
1054
                        table1.c.myid == table2.c.otherid, use_labels=True
1242
1055
                    )
1243
1056
 
1244
1057
        # make an alias of the "selectable".  column names
1245
1058
        # stay the same (i.e. the labels), table name "changes" to "t2view".
1246
1059
        a = alias(q, 't2view')
1247
1060
 
1248
 
        # select from that alias, also using labels.  two levels of labels should produce two underscores.
 
1061
        # select from that alias, also using labels.  two levels of labels
 
1062
        # should produce two underscores.
1249
1063
        # also, reference the column "mytable_myid" off of the t2view alias.
1250
1064
        self.assert_compile(
1251
 
            a.select(a.c.mytable_myid == 9, use_labels = True),
1252
 
            "SELECT t2view.mytable_myid AS t2view_mytable_myid, t2view.mytable_name "
1253
 
            "AS t2view_mytable_name, t2view.mytable_description AS t2view_mytable_description, "
 
1065
            a.select(a.c.mytable_myid == 9, use_labels=True),
 
1066
            "SELECT t2view.mytable_myid AS t2view_mytable_myid, "
 
1067
            "t2view.mytable_name "
 
1068
            "AS t2view_mytable_name, "
 
1069
            "t2view.mytable_description AS t2view_mytable_description, "
1254
1070
            "t2view.myothertable_otherid AS t2view_myothertable_otherid FROM "
1255
 
            "(SELECT mytable.myid AS mytable_myid, mytable.name AS mytable_name, "
1256
 
            "mytable.description AS mytable_description, myothertable.otherid AS "
1257
 
            "myothertable_otherid FROM mytable, myothertable WHERE mytable.myid = "
1258
 
            "myothertable.otherid) AS t2view WHERE t2view.mytable_myid = :mytable_myid_1"
 
1071
            "(SELECT mytable.myid AS mytable_myid, "
 
1072
                "mytable.name AS mytable_name, "
 
1073
            "mytable.description AS mytable_description, "
 
1074
                "myothertable.otherid AS "
 
1075
            "myothertable_otherid FROM mytable, myothertable "
 
1076
                "WHERE mytable.myid = "
 
1077
            "myothertable.otherid) AS t2view "
 
1078
                "WHERE t2view.mytable_myid = :mytable_myid_1"
1259
1079
        )
1260
1080
 
1261
1081
 
1262
 
    def test_prefixes(self):
 
1082
    def test_prefix(self):
1263
1083
        self.assert_compile(
1264
1084
            table1.select().prefix_with("SQL_CALC_FOUND_ROWS").\
1265
1085
                                prefix_with("SQL_SOME_WEIRD_MYSQL_THING"),
1267
1087
            "mytable.myid, mytable.name, mytable.description FROM mytable"
1268
1088
        )
1269
1089
 
 
1090
    def test_prefix_dialect_specific(self):
 
1091
        self.assert_compile(
 
1092
            table1.select().prefix_with("SQL_CALC_FOUND_ROWS",
 
1093
                                dialect='sqlite').\
 
1094
                                prefix_with("SQL_SOME_WEIRD_MYSQL_THING",
 
1095
                                        dialect='mysql'),
 
1096
            "SELECT SQL_SOME_WEIRD_MYSQL_THING "
 
1097
            "mytable.myid, mytable.name, mytable.description FROM mytable",
 
1098
            dialect=mysql.dialect()
 
1099
        )
 
1100
 
1270
1101
    def test_text(self):
1271
1102
        self.assert_compile(
1272
 
            text("select * from foo where lala = bar") ,
 
1103
            text("select * from foo where lala = bar"),
1273
1104
            "select * from foo where lala = bar"
1274
1105
        )
1275
1106
 
1277
1108
        self.assert_compile(select(
1278
1109
            ["foobar(a)", "pk_foo_bar(syslaal)"],
1279
1110
            "a = 12",
1280
 
            from_obj = ["foobar left outer join lala on foobar.foo = lala.foo"]
 
1111
            from_obj=["foobar left outer join lala on foobar.foo = lala.foo"]
1281
1112
            ),
1282
1113
            "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
1283
1114
            "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
1287
1118
        self.assert_compile(select(
1288
1119
            [u"foobar(a)", u"pk_foo_bar(syslaal)"],
1289
1120
            u"a = 12",
1290
 
            from_obj = [u"foobar left outer join lala on foobar.foo = lala.foo"]
 
1121
            from_obj=[u"foobar left outer join lala on foobar.foo = lala.foo"]
1291
1122
            ),
1292
1123
            "SELECT foobar(a), pk_foo_bar(syslaal) FROM foobar "
1293
1124
            "left outer join lala on foobar.foo = lala.foo WHERE a = 12"
1305
1136
                                "column1=12 AND column2=19 ORDER BY column1")
1306
1137
 
1307
1138
        self.assert_compile(
1308
 
            select(["column1", "column2"], from_obj=table1).alias('somealias').select(),
 
1139
            select(["column1", "column2"],
 
1140
                            from_obj=table1).alias('somealias').select(),
1309
1141
            "SELECT somealias.column1, somealias.column2 FROM "
1310
1142
            "(SELECT column1, column2 FROM mytable) AS somealias"
1311
1143
        )
1312
1144
 
1313
1145
        # test that use_labels doesnt interfere with literal columns
1314
1146
        self.assert_compile(
1315
 
            select(["column1", "column2", table1.c.myid], from_obj=table1, use_labels=True),
1316
 
            "SELECT column1, column2, mytable.myid AS mytable_myid FROM mytable"
 
1147
            select(["column1", "column2", table1.c.myid], from_obj=table1,
 
1148
                        use_labels=True),
 
1149
            "SELECT column1, column2, mytable.myid AS mytable_myid "
 
1150
            "FROM mytable"
1317
1151
        )
1318
1152
 
1319
 
        # test that use_labels doesnt interfere with literal columns that have textual labels
 
1153
        # test that use_labels doesnt interfere
 
1154
        # with literal columns that have textual labels
1320
1155
        self.assert_compile(
1321
 
            select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=table1, use_labels=True),
1322
 
            "SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS mytable_myid FROM mytable"
 
1156
            select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
 
1157
                        from_obj=table1, use_labels=True),
 
1158
            "SELECT column1 AS foobar, column2 AS hoho, "
 
1159
            "mytable.myid AS mytable_myid FROM mytable"
1323
1160
        )
1324
1161
 
1325
 
        s1 = select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1])
1326
 
        # test that "auto-labeling of subquery columns" doesnt interfere with literal columns,
 
1162
        # test that "auto-labeling of subquery columns"
 
1163
        # doesnt interfere with literal columns,
1327
1164
        # exported columns dont get quoted
1328
1165
        self.assert_compile(
1329
 
            select(["column1 AS foobar", "column2 AS hoho", table1.c.myid], from_obj=[table1]).select(),
 
1166
            select(["column1 AS foobar", "column2 AS hoho", table1.c.myid],
 
1167
                            from_obj=[table1]).select(),
1330
1168
            "SELECT column1 AS foobar, column2 AS hoho, myid FROM "
1331
 
            "(SELECT column1 AS foobar, column2 AS hoho, mytable.myid AS myid FROM mytable)"
 
1169
            "(SELECT column1 AS foobar, column2 AS hoho, "
 
1170
                "mytable.myid AS myid FROM mytable)"
1332
1171
        )
1333
1172
 
1334
1173
        self.assert_compile(
1335
 
            select(['col1','col2'], from_obj='tablename').alias('myalias'),
 
1174
            select(['col1', 'col2'], from_obj='tablename').alias('myalias'),
1336
1175
            "SELECT col1, col2 FROM tablename"
1337
1176
        )
1338
1177
 
1341
1180
            text("select * from foo where lala=:bar and hoho=:whee",
1342
1181
                bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
1343
1182
                "select * from foo where lala=:bar and hoho=:whee",
1344
 
                checkparams={'bar':4, 'whee': 7},
 
1183
                checkparams={'bar': 4, 'whee': 7},
1345
1184
        )
1346
1185
 
1347
1186
        self.assert_compile(
1354
1193
        dialect = postgresql.dialect()
1355
1194
        self.assert_compile(
1356
1195
            text("select * from foo where lala=:bar and hoho=:whee",
1357
 
                bindparams=[bindparam('bar',4), bindparam('whee',7)]),
 
1196
                bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
1358
1197
                "select * from foo where lala=%(bar)s and hoho=%(whee)s",
1359
 
                checkparams={'bar':4, 'whee': 7},
 
1198
                checkparams={'bar': 4, 'whee': 7},
1360
1199
                dialect=dialect
1361
1200
        )
1362
1201
 
1363
1202
        # test escaping out text() params with a backslash
1364
1203
        self.assert_compile(
1365
 
            text("select * from foo where clock='05:06:07' and mork='\:mindy'"),
 
1204
            text("select * from foo where clock='05:06:07' "
 
1205
                    "and mork='\:mindy'"),
1366
1206
            "select * from foo where clock='05:06:07' and mork=':mindy'",
1367
1207
            checkparams={},
1368
1208
            params={},
1372
1212
        dialect = sqlite.dialect()
1373
1213
        self.assert_compile(
1374
1214
            text("select * from foo where lala=:bar and hoho=:whee",
1375
 
                bindparams=[bindparam('bar',4), bindparam('whee',7)]),
 
1215
                bindparams=[bindparam('bar', 4), bindparam('whee', 7)]),
1376
1216
                "select * from foo where lala=? and hoho=?",
1377
 
                checkparams={'bar':4, 'whee':7},
 
1217
                checkparams={'bar': 4, 'whee': 7},
1378
1218
                dialect=dialect
1379
1219
        )
1380
1220
 
1394
1234
        self.assert_compile(select(
1395
1235
            [alias(table1, 't'), "foo.f"],
1396
1236
            "foo.f = t.id",
1397
 
            from_obj = ["(select f from bar where lala=heyhey) foo"]
 
1237
            from_obj=["(select f from bar where lala=heyhey) foo"]
1398
1238
        ),
1399
1239
        "SELECT t.myid, t.name, t.description, foo.f FROM mytable AS t, "
1400
1240
        "(select f from bar where lala=heyhey) foo WHERE foo.f = t.id")
1402
1242
        # test Text embedded within select_from(), using binds
1403
1243
        generate_series = text(
1404
1244
                            "generate_series(:x, :y, :z) as s(a)",
1405
 
                            bindparams=[bindparam('x'), bindparam('y'), bindparam('z')]
 
1245
                            bindparams=[bindparam('x', None),
 
1246
                                bindparam('y', None), bindparam('z', None)]
1406
1247
                        )
1407
1248
 
1408
 
        s =select([
1409
 
                    (func.current_date() + literal_column("s.a")).label("dates")
 
1249
        s = select([
 
1250
                    (func.current_date() +
 
1251
                            literal_column("s.a")).label("dates")
1410
1252
                ]).select_from(generate_series)
1411
1253
        self.assert_compile(
1412
1254
                    s,
1413
 
                    "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
 
1255
                    "SELECT CURRENT_DATE + s.a AS dates FROM "
 
1256
                                        "generate_series(:x, :y, :z) as s(a)",
1414
1257
                    checkparams={'y': None, 'x': None, 'z': None}
1415
1258
                )
1416
1259
 
1417
1260
        self.assert_compile(
1418
1261
                    s.params(x=5, y=6, z=7),
1419
 
                    "SELECT CURRENT_DATE + s.a AS dates FROM generate_series(:x, :y, :z) as s(a)",
 
1262
                    "SELECT CURRENT_DATE + s.a AS dates FROM "
 
1263
                                        "generate_series(:x, :y, :z) as s(a)",
1420
1264
                    checkparams={'y': 6, 'x': 5, 'z': 7}
1421
1265
                )
1422
1266
 
1468
1312
 
1469
1313
    def test_literal(self):
1470
1314
 
1471
 
        self.assert_compile(select([literal('foo')]), "SELECT :param_1 AS anon_1")
 
1315
        self.assert_compile(select([literal('foo')]),
 
1316
                "SELECT :param_1 AS anon_1")
1472
1317
 
1473
 
        self.assert_compile(select([literal("foo") + literal("bar")], from_obj=[table1]),
 
1318
        self.assert_compile(select([literal("foo") + literal("bar")],
 
1319
                        from_obj=[table1]),
1474
1320
            "SELECT :param_1 || :param_2 AS anon_1 FROM mytable")
1475
1321
 
1476
1322
    def test_calculated_columns(self):
1477
 
         value_tbl = table('values',
 
1323
        value_tbl = table('values',
1478
1324
             column('id', Integer),
1479
1325
             column('val1', Float),
1480
1326
             column('val2', Float),
1481
1327
         )
1482
1328
 
1483
 
         self.assert_compile(
 
1329
        self.assert_compile(
1484
1330
             select([value_tbl.c.id, (value_tbl.c.val2 -
1485
 
     value_tbl.c.val1)/value_tbl.c.val1]),
1486
 
             "SELECT values.id, (values.val2 - values.val1) / values.val1 AS anon_1 FROM values"
 
1331
                        value_tbl.c.val1) / value_tbl.c.val1]),
 
1332
             "SELECT values.id, (values.val2 - values.val1) "
 
1333
             "/ values.val1 AS anon_1 FROM values"
1487
1334
         )
1488
1335
 
1489
 
         self.assert_compile(
 
1336
        self.assert_compile(
1490
1337
             select([value_tbl.c.id], (value_tbl.c.val2 -
1491
 
     value_tbl.c.val1)/value_tbl.c.val1 > 2.0),
1492
 
             "SELECT values.id FROM values WHERE (values.val2 - values.val1) / values.val1 > :param_1"
 
1338
                    value_tbl.c.val1) / value_tbl.c.val1 > 2.0),
 
1339
             "SELECT values.id FROM values WHERE "
 
1340
             "(values.val2 - values.val1) / values.val1 > :param_1"
1493
1341
         )
1494
1342
 
1495
 
         self.assert_compile(
1496
 
             select([value_tbl.c.id], value_tbl.c.val1 / (value_tbl.c.val2 - value_tbl.c.val1) /value_tbl.c.val1 > 2.0),
1497
 
             "SELECT values.id FROM values WHERE (values.val1 / (values.val2 - values.val1)) / values.val1 > :param_1"
 
1343
        self.assert_compile(
 
1344
             select([value_tbl.c.id], value_tbl.c.val1 /
 
1345
                (value_tbl.c.val2 - value_tbl.c.val1) /
 
1346
                    value_tbl.c.val1 > 2.0),
 
1347
             "SELECT values.id FROM values WHERE "
 
1348
                "(values.val1 / (values.val2 - values.val1)) "
 
1349
                "/ values.val1 > :param_1"
1498
1350
         )
1499
1351
 
1500
1352
    def test_collate(self):
1501
1353
        for expr in (select([table1.c.name.collate('latin1_german2_ci')]),
1502
1354
                     select([collate(table1.c.name, 'latin1_german2_ci')])):
1503
1355
            self.assert_compile(
1504
 
                expr, "SELECT mytable.name COLLATE latin1_german2_ci AS anon_1 FROM mytable")
1505
 
 
1506
 
        assert table1.c.name.collate('latin1_german2_ci').type is table1.c.name.type
1507
 
 
1508
 
        expr = select([table1.c.name.collate('latin1_german2_ci').label('k1')]).order_by('k1')
1509
 
        self.assert_compile(expr,"SELECT mytable.name COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
 
1356
                expr, "SELECT mytable.name COLLATE latin1_german2_ci "
 
1357
                "AS anon_1 FROM mytable")
 
1358
 
 
1359
        assert table1.c.name.collate('latin1_german2_ci').type is \
 
1360
            table1.c.name.type
 
1361
 
 
1362
        expr = select([table1.c.name.collate('latin1_german2_ci').\
 
1363
                    label('k1')]).order_by('k1')
 
1364
        self.assert_compile(expr,
 
1365
            "SELECT mytable.name "
 
1366
            "COLLATE latin1_german2_ci AS k1 FROM mytable ORDER BY k1")
1510
1367
 
1511
1368
        expr = select([collate('foo', 'latin1_german2_ci').label('k1')])
1512
 
        self.assert_compile(expr,"SELECT :param_1 COLLATE latin1_german2_ci AS k1")
 
1369
        self.assert_compile(expr,
 
1370
                    "SELECT :param_1 COLLATE latin1_german2_ci AS k1")
1513
1371
 
1514
1372
        expr = select([table1.c.name.collate('latin1_german2_ci').like('%x%')])
1515
1373
        self.assert_compile(expr,
1516
1374
                            "SELECT mytable.name COLLATE latin1_german2_ci "
1517
1375
                            "LIKE :param_1 AS anon_1 FROM mytable")
1518
1376
 
1519
 
        expr = select([table1.c.name.like(collate('%x%', 'latin1_german2_ci'))])
 
1377
        expr = select([table1.c.name.like(collate('%x%',
 
1378
                                'latin1_german2_ci'))])
1520
1379
        self.assert_compile(expr,
1521
 
                            "SELECT mytable.name "
1522
 
                            "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
1523
 
                            "FROM mytable")
 
1380
                        "SELECT mytable.name "
 
1381
                        "LIKE :param_1 COLLATE latin1_german2_ci AS anon_1 "
 
1382
                        "FROM mytable")
1524
1383
 
1525
1384
        expr = select([table1.c.name.collate('col1').like(
1526
1385
            collate('%x%', 'col2'))])
1529
1388
                            "LIKE :param_1 COLLATE col2 AS anon_1 "
1530
1389
                            "FROM mytable")
1531
1390
 
1532
 
        expr = select([func.concat('a', 'b').collate('latin1_german2_ci').label('x')])
 
1391
        expr = select([func.concat('a', 'b').\
 
1392
                collate('latin1_german2_ci').label('x')])
1533
1393
        self.assert_compile(expr,
1534
1394
                            "SELECT concat(:param_1, :param_2) "
1535
1395
                            "COLLATE latin1_german2_ci AS x")
1550
1410
        self.assert_compile(
1551
1411
            t.select(use_labels=True),
1552
1412
            '''SELECT "table%name"."percent%" AS "table%name_percent%", '''\
1553
 
            '''"table%name"."%(oneofthese)s" AS "table%name_%(oneofthese)s", '''\
1554
 
            '''"table%name"."spaces % more spaces" AS "table%name_spaces % '''\
 
1413
            '''"table%name"."%(oneofthese)s" AS '''\
 
1414
            '''"table%name_%(oneofthese)s", '''\
 
1415
            '''"table%name"."spaces % more spaces" AS '''\
 
1416
            '''"table%name_spaces % '''\
1555
1417
            '''more spaces" FROM "table%name"'''
1556
1418
        )
1557
1419
 
1567
1429
        self.assert_compile(
1568
1430
            select(
1569
1431
             [table1],
1570
 
                from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid)]
 
1432
                from_obj=[join(table1, table2, table1.c.myid
 
1433
                                    == table2.c.otherid)]
1571
1434
            ),
1572
1435
        "SELECT mytable.myid, mytable.name, mytable.description FROM "
1573
1436
        "mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
1578
1441
                table3, table1.c.myid == table3.c.userid)]
1579
1442
            ),
1580
1443
            "SELECT mytable.myid, mytable.name, mytable.description, "
1581
 
            "myothertable.otherid, myothertable.othername, thirdtable.userid, "
1582
 
            "thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid ="
1583
 
            " myothertable.otherid JOIN thirdtable ON mytable.myid = thirdtable.userid"
 
1444
            "myothertable.otherid, myothertable.othername, "
 
1445
            "thirdtable.userid, "
 
1446
            "thirdtable.otherstuff FROM mytable JOIN myothertable "
 
1447
            "ON mytable.myid ="
 
1448
            " myothertable.otherid JOIN thirdtable ON "
 
1449
            "mytable.myid = thirdtable.userid"
1584
1450
        )
1585
1451
 
1586
1452
        self.assert_compile(
1587
 
            join(users, addresses, users.c.user_id==addresses.c.user_id).select(),
 
1453
            join(users, addresses, users.c.user_id ==
 
1454
                    addresses.c.user_id).select(),
1588
1455
            "SELECT users.user_id, users.user_name, users.password, "
1589
1456
            "addresses.address_id, addresses.user_id, addresses.street, "
1590
 
            "addresses.city, addresses.state, addresses.zip FROM users JOIN addresses "
 
1457
            "addresses.city, addresses.state, addresses.zip "
 
1458
            "FROM users JOIN addresses "
1591
1459
            "ON users.user_id = addresses.user_id"
1592
1460
        )
1593
1461
 
1594
1462
        self.assert_compile(
1595
1463
                select([table1, table2, table3],
1596
1464
 
1597
 
                from_obj = [join(table1, table2, table1.c.myid == table2.c.otherid).
1598
 
                                    outerjoin(table3, table1.c.myid==table3.c.userid)]
1599
 
                )
1600
 
                ,"SELECT mytable.myid, mytable.name, mytable.description, "
1601
 
                "myothertable.otherid, myothertable.othername, thirdtable.userid,"
1602
 
                " thirdtable.otherstuff FROM mytable JOIN myothertable ON mytable.myid "
1603
 
                "= myothertable.otherid LEFT OUTER JOIN thirdtable ON mytable.myid ="
 
1465
                from_obj=[join(table1, table2,
 
1466
                                    table1.c.myid == table2.c.otherid).
 
1467
                                    outerjoin(table3,
 
1468
                                            table1.c.myid == table3.c.userid)]
 
1469
                ),
 
1470
                "SELECT mytable.myid, mytable.name, mytable.description, "
 
1471
                "myothertable.otherid, myothertable.othername, "
 
1472
                "thirdtable.userid,"
 
1473
                " thirdtable.otherstuff FROM mytable "
 
1474
                "JOIN myothertable ON mytable.myid "
 
1475
                "= myothertable.otherid LEFT OUTER JOIN thirdtable "
 
1476
                "ON mytable.myid ="
1604
1477
                " thirdtable.userid"
1605
1478
            )
1606
1479
        self.assert_compile(
1607
1480
                select([table1, table2, table3],
1608
 
                from_obj = [outerjoin(table1,
1609
 
                                join(table2, table3, table2.c.otherid == table3.c.userid),
1610
 
                                table1.c.myid==table2.c.otherid)]
1611
 
                )
1612
 
                ,"SELECT mytable.myid, mytable.name, mytable.description, "
1613
 
                "myothertable.otherid, myothertable.othername, thirdtable.userid,"
1614
 
                " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN (myothertable "
1615
 
                "JOIN thirdtable ON myothertable.otherid = thirdtable.userid) ON "
 
1481
                from_obj=[outerjoin(table1,
 
1482
                                join(table2, table3, table2.c.otherid
 
1483
                                        == table3.c.userid),
 
1484
                                table1.c.myid == table2.c.otherid)]
 
1485
                ),
 
1486
                "SELECT mytable.myid, mytable.name, mytable.description, "
 
1487
                "myothertable.otherid, myothertable.othername, "
 
1488
                "thirdtable.userid,"
 
1489
                " thirdtable.otherstuff FROM mytable LEFT OUTER JOIN "
 
1490
                "(myothertable "
 
1491
                "JOIN thirdtable ON myothertable.otherid = "
 
1492
                "thirdtable.userid) ON "
1616
1493
                "mytable.myid = myothertable.otherid"
1617
1494
            )
1618
1495
 
1624
1501
                    table2.c.othername != 'jack',
1625
1502
                    "EXISTS (select yay from foo where boo = lar)"
1626
1503
                ),
1627
 
                from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ]
 
1504
                from_obj=[outerjoin(table1, table2,
 
1505
                                table1.c.myid == table2.c.otherid)]
1628
1506
                )
1629
1507
        self.assert_compile(query,
1630
1508
            "SELECT mytable.myid, mytable.name, mytable.description, "
1647
1525
        x = union(
1648
1526
              select([table1], table1.c.myid == 5),
1649
1527
              select([table1], table1.c.myid == 12),
1650
 
              order_by = [table1.c.myid],
 
1528
              order_by=[table1.c.myid],
1651
1529
        )
1652
1530
 
1653
 
        self.assert_compile(x, "SELECT mytable.myid, mytable.name, mytable.description "\
1654
 
                                "FROM mytable WHERE mytable.myid = :myid_1 UNION "\
1655
 
                                "SELECT mytable.myid, mytable.name, mytable.description "\
1656
 
                                "FROM mytable WHERE mytable.myid = :myid_2 ORDER BY mytable.myid")
 
1531
        self.assert_compile(x,
 
1532
                    "SELECT mytable.myid, mytable.name, "
 
1533
                    "mytable.description "
 
1534
                    "FROM mytable WHERE "
 
1535
                    "mytable.myid = :myid_1 UNION "
 
1536
                    "SELECT mytable.myid, mytable.name, mytable.description "
 
1537
                    "FROM mytable WHERE mytable.myid = :myid_2 "
 
1538
                    "ORDER BY mytable.myid")
1657
1539
 
1658
1540
        x = union(
1659
1541
              select([table1]),
1660
1542
              select([table1])
1661
1543
        )
1662
1544
        x = union(x, select([table1]))
1663
 
        self.assert_compile(x, "(SELECT mytable.myid, mytable.name, mytable.description "
1664
 
                                "FROM mytable UNION SELECT mytable.myid, mytable.name, "
1665
 
                                "mytable.description FROM mytable) UNION SELECT mytable.myid,"
1666
 
                                " mytable.name, mytable.description FROM mytable")
 
1545
        self.assert_compile(x,
 
1546
                "(SELECT mytable.myid, mytable.name, mytable.description "
 
1547
                "FROM mytable UNION SELECT mytable.myid, mytable.name, "
 
1548
                "mytable.description FROM mytable) UNION SELECT mytable.myid,"
 
1549
                " mytable.name, mytable.description FROM mytable")
1667
1550
 
1668
1551
        u1 = union(
1669
1552
            select([table1.c.myid, table1.c.name]),
1670
1553
            select([table2]),
1671
1554
            select([table3])
1672
1555
        )
1673
 
        self.assert_compile(u1, "SELECT mytable.myid, mytable.name "
1674
 
                                "FROM mytable UNION SELECT myothertable.otherid, "
1675
 
                                "myothertable.othername FROM myothertable "
1676
 
                                "UNION SELECT thirdtable.userid, thirdtable.otherstuff "
1677
 
                                "FROM thirdtable")
 
1556
        self.assert_compile(u1,
 
1557
                "SELECT mytable.myid, mytable.name "
 
1558
                "FROM mytable UNION SELECT myothertable.otherid, "
 
1559
                "myothertable.othername FROM myothertable "
 
1560
                "UNION SELECT thirdtable.userid, thirdtable.otherstuff "
 
1561
                "FROM thirdtable")
1678
1562
 
1679
1563
        assert u1.corresponding_column(table2.c.otherid) is u1.c.myid
1680
1564
 
1687
1571
                limit=5
1688
1572
            ),
1689
1573
            "SELECT mytable.myid, mytable.name "
1690
 
            "FROM mytable UNION SELECT myothertable.otherid, myothertable.othername "
 
1574
            "FROM mytable UNION SELECT myothertable.otherid, "
 
1575
            "myothertable.othername "
1691
1576
            "FROM myothertable ORDER BY myid LIMIT :param_1 OFFSET :param_2",
1692
 
            {'param_1':5, 'param_2':10}
 
1577
            {'param_1': 5, 'param_2': 10}
1693
1578
        )
1694
1579
 
1695
1580
        self.assert_compile(
1696
1581
            union(
1697
 
                select([table1.c.myid, table1.c.name, func.max(table1.c.description)],
1698
 
                            table1.c.name=='name2',
 
1582
                select([table1.c.myid, table1.c.name,
 
1583
                            func.max(table1.c.description)],
 
1584
                            table1.c.name == 'name2',
1699
1585
                            group_by=[table1.c.myid, table1.c.name]),
1700
 
                table1.select(table1.c.name=='name1')
 
1586
                table1.select(table1.c.name == 'name1')
1701
1587
            ),
1702
 
            "SELECT mytable.myid, mytable.name, max(mytable.description) AS max_1 "
1703
 
            "FROM mytable WHERE mytable.name = :name_1 GROUP BY mytable.myid, "
1704
 
            "mytable.name UNION SELECT mytable.myid, mytable.name, mytable.description "
 
1588
            "SELECT mytable.myid, mytable.name, "
 
1589
            "max(mytable.description) AS max_1 "
 
1590
            "FROM mytable WHERE mytable.name = :name_1 "
 
1591
            "GROUP BY mytable.myid, "
 
1592
            "mytable.name UNION SELECT mytable.myid, mytable.name, "
 
1593
            "mytable.description "
1705
1594
            "FROM mytable WHERE mytable.name = :name_2"
1706
1595
        )
1707
1596
 
1720
1609
                    select([table2.c.otherid]),
1721
1610
                    select([table3.c.userid]),
1722
1611
                )
1723
 
            )
1724
 
            ,
 
1612
            ),
 
1613
 
1725
1614
            "SELECT mytable.myid FROM mytable UNION ALL "
1726
1615
            "(SELECT myothertable.otherid FROM myothertable UNION "
1727
1616
            "SELECT thirdtable.userid FROM thirdtable)"
1730
1619
 
1731
1620
        s = select([column('foo'), column('bar')])
1732
1621
 
1733
 
        # ORDER BY's even though not supported by all DB's, are rendered if requested
 
1622
        # ORDER BY's even though not supported by
 
1623
        # all DB's, are rendered if requested
1734
1624
        self.assert_compile(union(s.order_by("foo"), s.order_by("bar")),
1735
1625
            "SELECT foo, bar ORDER BY foo UNION SELECT foo, bar ORDER BY bar"
1736
1626
        )
1737
1627
        # self_group() is honored
1738
1628
        self.assert_compile(
1739
 
            union(s.order_by("foo").self_group(), s.order_by("bar").limit(10).self_group()),
1740
 
            "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, bar ORDER BY bar LIMIT :param_1)",
1741
 
            {'param_1':10}
 
1629
            union(s.order_by("foo").self_group(),
 
1630
                        s.order_by("bar").limit(10).self_group()),
 
1631
            "(SELECT foo, bar ORDER BY foo) UNION (SELECT foo, "
 
1632
                "bar ORDER BY bar LIMIT :param_1)",
 
1633
            {'param_1': 10}
1742
1634
 
1743
1635
        )
1744
1636
 
1754
1646
        self.assert_compile(
1755
1647
            union(s, s, s, s),
1756
1648
            "SELECT foo, bar FROM bat UNION SELECT foo, bar "
1757
 
            "FROM bat UNION SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat"
 
1649
            "FROM bat UNION SELECT foo, bar FROM bat "
 
1650
            "UNION SELECT foo, bar FROM bat"
1758
1651
        )
1759
1652
 
1760
1653
        self.assert_compile(
1761
1654
            union(s, union(s, union(s, s))),
1762
1655
            "SELECT foo, bar FROM bat UNION (SELECT foo, bar FROM bat "
1763
 
            "UNION (SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat))"
 
1656
            "UNION (SELECT foo, bar FROM bat "
 
1657
                "UNION SELECT foo, bar FROM bat))"
1764
1658
        )
1765
1659
 
1766
1660
        self.assert_compile(
1767
1661
            select([s.alias()]),
1768
 
            'SELECT anon_1.foo, anon_1.bar FROM (SELECT foo, bar FROM bat) AS anon_1'
 
1662
            'SELECT anon_1.foo, anon_1.bar FROM '
 
1663
            '(SELECT foo, bar FROM bat) AS anon_1'
1769
1664
        )
1770
1665
 
1771
1666
        self.assert_compile(
1772
1667
            select([union(s, s).alias()]),
1773
1668
            'SELECT anon_1.foo, anon_1.bar FROM '
1774
 
            '(SELECT foo, bar FROM bat UNION SELECT foo, bar FROM bat) AS anon_1'
 
1669
            '(SELECT foo, bar FROM bat UNION '
 
1670
                'SELECT foo, bar FROM bat) AS anon_1'
1775
1671
        )
1776
1672
 
1777
1673
        self.assert_compile(
1778
1674
            select([except_(s, s).alias()]),
1779
1675
            'SELECT anon_1.foo, anon_1.bar FROM '
1780
 
            '(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1'
 
1676
            '(SELECT foo, bar FROM bat EXCEPT '
 
1677
                'SELECT foo, bar FROM bat) AS anon_1'
1781
1678
        )
1782
1679
 
1783
1680
        # this query sqlite specifically chokes on
1806
1703
                s
1807
1704
            ),
1808
1705
            "SELECT anon_1.foo, anon_1.bar FROM "
1809
 
            "(SELECT foo, bar FROM bat EXCEPT SELECT foo, bar FROM bat) AS anon_1 "
 
1706
            "(SELECT foo, bar FROM bat EXCEPT "
 
1707
                "SELECT foo, bar FROM bat) AS anon_1 "
1810
1708
            "UNION SELECT foo, bar FROM bat"
1811
1709
        )
1812
1710
 
1832
1730
                intersect(s, s)
1833
1731
            ),
1834
1732
            "(SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat) "
1835
 
            "UNION (SELECT foo, bar FROM bat INTERSECT SELECT foo, bar FROM bat)"
 
1733
            "UNION (SELECT foo, bar FROM bat INTERSECT "
 
1734
                "SELECT foo, bar FROM bat)"
1836
1735
        )
1837
1736
 
1838
 
    @testing.uses_deprecated()
1839
1737
    def test_binds(self):
1840
1738
        for (
1841
1739
             stmt,
1866
1764
                 {'mytablename':5}, {'mytablename':5}, [5]
1867
1765
             ),
1868
1766
             (
1869
 
                 select([table1], or_(table1.c.myid==bindparam('myid'),
1870
 
                                        table2.c.otherid==bindparam('myid'))),
 
1767
                 select([table1], or_(table1.c.myid == bindparam('myid'),
 
1768
                                        table2.c.otherid == bindparam('myid'))),
1871
1769
                 "SELECT mytable.myid, mytable.name, mytable.description "
1872
1770
                        "FROM mytable, myothertable WHERE mytable.myid = :myid "
1873
1771
                        "OR myothertable.otherid = :myid",
1874
1772
                 "SELECT mytable.myid, mytable.name, mytable.description "
1875
1773
                        "FROM mytable, myothertable WHERE mytable.myid = ? "
1876
1774
                        "OR myothertable.otherid = ?",
1877
 
                 {'myid':None}, [None, None],
1878
 
                 {'myid':5}, {'myid':5}, [5,5]
 
1775
                 {'myid': None}, [None, None],
 
1776
                 {'myid': 5}, {'myid': 5}, [5, 5]
1879
1777
             ),
1880
1778
             (
1881
1779
                 text("SELECT mytable.myid, mytable.name, mytable.description FROM "
1882
 
                                "mytable, myothertable WHERE mytable.myid = :myid OR "
1883
 
                                "myothertable.otherid = :myid"),
1884
 
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1885
 
                                "mytable, myothertable WHERE mytable.myid = :myid OR "
1886
 
                                "myothertable.otherid = :myid",
1887
 
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1888
 
                                "mytable, myothertable WHERE mytable.myid = ? OR "
1889
 
                                "myothertable.otherid = ?",
 
1780
                            "mytable, myothertable WHERE mytable.myid = :myid OR "
 
1781
                            "myothertable.otherid = :myid"),
 
1782
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
 
1783
                            "mytable, myothertable WHERE mytable.myid = :myid OR "
 
1784
                            "myothertable.otherid = :myid",
 
1785
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
 
1786
                            "mytable, myothertable WHERE mytable.myid = ? OR "
 
1787
                            "myothertable.otherid = ?",
1890
1788
                 {'myid':None}, [None, None],
1891
 
                 {'myid':5}, {'myid':5}, [5,5]
 
1789
                 {'myid': 5}, {'myid': 5}, [5, 5]
1892
1790
             ),
1893
1791
             (
1894
 
                 select([table1], or_(table1.c.myid==bindparam('myid', unique=True),
1895
 
                                    table2.c.otherid==bindparam('myid', unique=True))),
 
1792
                 select([table1], or_(table1.c.myid ==
 
1793
                                    bindparam('myid', unique=True),
 
1794
                                    table2.c.otherid ==
 
1795
                                    bindparam('myid', unique=True))),
1896
1796
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1897
1797
                                "mytable, myothertable WHERE mytable.myid = "
1898
1798
                                ":myid_1 OR myothertable.otherid = :myid_2",
1900
1800
                                "mytable, myothertable WHERE mytable.myid = ? "
1901
1801
                                "OR myothertable.otherid = ?",
1902
1802
                 {'myid_1':None, 'myid_2':None}, [None, None],
1903
 
                 {'myid_1':5, 'myid_2': 6}, {'myid_1':5, 'myid_2':6}, [5,6]
 
1803
                 {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
1904
1804
             ),
1905
1805
             (
1906
 
                bindparam('test', type_=String) + text("'hi'"),
 
1806
                bindparam('test', type_=String, required=False) + text("'hi'"),
1907
1807
                ":test || 'hi'",
1908
1808
                "? || 'hi'",
1909
1809
                {'test':None}, [None],
1910
1810
                {}, {'test':None}, [None]
1911
1811
             ),
1912
1812
             (
1913
 
                 select([table1], or_(table1.c.myid==bindparam('myid'),
1914
 
                                    table2.c.otherid==bindparam('myotherid'))).\
 
1813
                # testing select.params() here - bindparam() objects
 
1814
                # must get required flag set to False
 
1815
                 select([table1], or_(table1.c.myid == bindparam('myid'),
 
1816
                                    table2.c.otherid == bindparam('myotherid'))).\
1915
1817
                                        params({'myid':8, 'myotherid':7}),
1916
1818
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1917
1819
                                    "mytable, myothertable WHERE mytable.myid = "
1919
1821
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1920
1822
                                    "mytable, myothertable WHERE mytable.myid = "
1921
1823
                                    "? OR myothertable.otherid = ?",
1922
 
                 {'myid':8, 'myotherid':7}, [8, 7],
1923
 
                 {'myid':5}, {'myid':5, 'myotherid':7}, [5,7]
 
1824
                 {'myid': 8, 'myotherid': 7}, [8, 7],
 
1825
                 {'myid': 5}, {'myid': 5, 'myotherid': 7}, [5, 7]
1924
1826
             ),
1925
1827
             (
1926
 
                 select([table1], or_(table1.c.myid==bindparam('myid', value=7, unique=True),
1927
 
                                    table2.c.otherid==bindparam('myid', value=8, unique=True))),
 
1828
                 select([table1], or_(table1.c.myid ==
 
1829
                                    bindparam('myid', value=7, unique=True),
 
1830
                                    table2.c.otherid ==
 
1831
                                    bindparam('myid', value=8, unique=True))),
1928
1832
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1929
1833
                                    "mytable, myothertable WHERE mytable.myid = "
1930
1834
                                    ":myid_1 OR myothertable.otherid = :myid_2",
1931
1835
                 "SELECT mytable.myid, mytable.name, mytable.description FROM "
1932
1836
                                    "mytable, myothertable WHERE mytable.myid = "
1933
1837
                                    "? OR myothertable.otherid = ?",
1934
 
                 {'myid_1':7, 'myid_2':8}, [7,8],
1935
 
                 {'myid_1':5, 'myid_2':6}, {'myid_1':5, 'myid_2':6}, [5,6]
 
1838
                 {'myid_1': 7, 'myid_2': 8}, [7, 8],
 
1839
                 {'myid_1': 5, 'myid_2': 6}, {'myid_1': 5, 'myid_2': 6}, [5, 6]
1936
1840
             ),
1937
1841
             ]:
1938
1842
 
1939
 
                self.assert_compile(stmt, expected_named_stmt, params=expected_default_params_dict)
1940
 
                self.assert_compile(stmt, expected_positional_stmt, dialect=sqlite.dialect())
 
1843
                self.assert_compile(stmt, expected_named_stmt,
 
1844
                            params=expected_default_params_dict)
 
1845
                self.assert_compile(stmt, expected_positional_stmt,
 
1846
                            dialect=sqlite.dialect())
1941
1847
                nonpositional = stmt.compile()
1942
1848
                positional = stmt.compile(dialect=sqlite.dialect())
1943
1849
                pp = positional.params
1944
 
                assert [pp[k] for k in positional.positiontup] == expected_default_params_list
1945
 
                assert nonpositional.construct_params(test_param_dict) == expected_test_params_dict, \
1946
 
                                    "expected :%s got %s" % (str(expected_test_params_dict), \
1947
 
                                    str(nonpositional.get_params(**test_param_dict)))
 
1850
                eq_([pp[k] for k in positional.positiontup],
 
1851
                                    expected_default_params_list)
 
1852
 
 
1853
                eq_(nonpositional.construct_params(test_param_dict),
 
1854
                                    expected_test_params_dict)
1948
1855
                pp = positional.construct_params(test_param_dict)
1949
 
                assert [pp[k] for k in positional.positiontup] == expected_test_params_list
 
1856
                eq_(
 
1857
                        [pp[k] for k in positional.positiontup],
 
1858
                        expected_test_params_list
 
1859
                )
1950
1860
 
1951
1861
        # check that params() doesnt modify original statement
1952
 
        s = select([table1], or_(table1.c.myid==bindparam('myid'),
1953
 
                                    table2.c.otherid==bindparam('myotherid')))
1954
 
        s2 = s.params({'myid':8, 'myotherid':7})
1955
 
        s3 = s2.params({'myid':9})
1956
 
        assert s.compile().params == {'myid':None, 'myotherid':None}
1957
 
        assert s2.compile().params == {'myid':8, 'myotherid':7}
1958
 
        assert s3.compile().params == {'myid':9, 'myotherid':7}
 
1862
        s = select([table1], or_(table1.c.myid == bindparam('myid'),
 
1863
                                    table2.c.otherid ==
 
1864
                                        bindparam('myotherid')))
 
1865
        s2 = s.params({'myid': 8, 'myotherid': 7})
 
1866
        s3 = s2.params({'myid': 9})
 
1867
        assert s.compile().params == {'myid': None, 'myotherid': None}
 
1868
        assert s2.compile().params == {'myid': 8, 'myotherid': 7}
 
1869
        assert s3.compile().params == {'myid': 9, 'myotherid': 7}
1959
1870
 
1960
1871
        # test using same 'unique' param object twice in one compile
1961
 
        s = select([table1.c.myid]).where(table1.c.myid==12).as_scalar()
1962
 
        s2 = select([table1, s], table1.c.myid==s)
 
1872
        s = select([table1.c.myid]).where(table1.c.myid == 12).as_scalar()
 
1873
        s2 = select([table1, s], table1.c.myid == s)
1963
1874
        self.assert_compile(s2,
1964
1875
            "SELECT mytable.myid, mytable.name, mytable.description, "
1965
1876
            "(SELECT mytable.myid FROM mytable WHERE mytable.myid = "\
1971
1882
        assert [pp[k] for k in positional.positiontup] == [12, 12]
1972
1883
 
1973
1884
        # check that conflicts with "unique" params are caught
1974
 
        s = select([table1], or_(table1.c.myid==7,
1975
 
                                        table1.c.myid==bindparam('myid_1')))
1976
 
        assert_raises_message(exc.CompileError,
1977
 
                                "conflicts with unique bind parameter "
1978
 
                                "of the same name",
1979
 
                                str, s)
1980
 
 
1981
 
        s = select([table1], or_(table1.c.myid==7, table1.c.myid==8,
1982
 
                                        table1.c.myid==bindparam('myid_1')))
1983
 
        assert_raises_message(exc.CompileError,
1984
 
                                "conflicts with unique bind parameter "
1985
 
                                "of the same name",
1986
 
                                str, s)
1987
 
 
1988
 
    def test_binds_no_hash_collision(self):
1989
 
        """test that construct_params doesn't corrupt dict due to hash collisions"""
 
1885
        s = select([table1], or_(table1.c.myid == 7,
 
1886
                                        table1.c.myid == bindparam('myid_1')))
 
1887
        assert_raises_message(exc.CompileError,
 
1888
                                "conflicts with unique bind parameter "
 
1889
                                "of the same name",
 
1890
                                str, s)
 
1891
 
 
1892
        s = select([table1], or_(table1.c.myid == 7, table1.c.myid == 8,
 
1893
                                        table1.c.myid == bindparam('myid_1')))
 
1894
        assert_raises_message(exc.CompileError,
 
1895
                                "conflicts with unique bind parameter "
 
1896
                                "of the same name",
 
1897
                                str, s)
 
1898
 
 
1899
    def _test_binds_no_hash_collision(self):
 
1900
        """test that construct_params doesn't corrupt dict
 
1901
            due to hash collisions"""
1990
1902
 
1991
1903
        total_params = 100000
1992
1904
 
1993
1905
        in_clause = [':in%d' % i for i in range(total_params)]
1994
1906
        params = dict(('in%d' % i, i) for i in range(total_params))
1995
 
        sql = 'text clause %s' % ', '.join(in_clause)
1996
 
        t = text(sql)
 
1907
        t = text('text clause %s' % ', '.join(in_clause))
1997
1908
        eq_(len(t.bindparams), total_params)
1998
1909
        c = t.compile()
1999
1910
        pp = c.construct_params(params)
2014
1925
        self.assert_compile(
2015
1926
            expr,
2016
1927
            "x = :key",
2017
 
            {'x':12}
 
1928
            {'x': 12}
2018
1929
        )
2019
1930
 
2020
1931
    def test_bind_params_missing(self):
2022
1933
            r"A value is required for bind parameter 'x'",
2023
1934
            select([table1]).where(
2024
1935
                    and_(
2025
 
                        table1.c.myid==bindparam("x", required=True),
2026
 
                        table1.c.name==bindparam("y", required=True)
 
1936
                        table1.c.myid == bindparam("x", required=True),
 
1937
                        table1.c.name == bindparam("y", required=True)
2027
1938
                    )
2028
1939
                ).compile().construct_params,
2029
1940
            params=dict(y=5)
2032
1943
        assert_raises_message(exc.InvalidRequestError,
2033
1944
            r"A value is required for bind parameter 'x'",
2034
1945
            select([table1]).where(
2035
 
                    table1.c.myid==bindparam("x", required=True)
 
1946
                    table1.c.myid == bindparam("x", required=True)
2036
1947
                ).compile().construct_params
2037
1948
        )
2038
1949
 
2041
1952
                "in parameter group 2",
2042
1953
            select([table1]).where(
2043
1954
                    and_(
2044
 
                        table1.c.myid==bindparam("x", required=True),
2045
 
                        table1.c.name==bindparam("y", required=True)
 
1955
                        table1.c.myid == bindparam("x", required=True),
 
1956
                        table1.c.name == bindparam("y", required=True)
2046
1957
                    )
2047
1958
                ).compile().construct_params,
2048
1959
            params=dict(y=5),
2053
1964
            r"A value is required for bind parameter 'x', "
2054
1965
                "in parameter group 2",
2055
1966
            select([table1]).where(
2056
 
                    table1.c.myid==bindparam("x", required=True)
 
1967
                    table1.c.myid == bindparam("x", required=True)
2057
1968
                ).compile().construct_params,
2058
1969
            _group_number=2
2059
1970
        )
2060
1971
 
2061
1972
 
2062
1973
 
2063
 
    @testing.emits_warning('.*empty sequence.*')
2064
 
    def test_in(self):
2065
 
        self.assert_compile(table1.c.myid.in_(['a']),
2066
 
        "mytable.myid IN (:myid_1)")
2067
 
 
2068
 
        self.assert_compile(~table1.c.myid.in_(['a']),
2069
 
        "mytable.myid NOT IN (:myid_1)")
2070
 
 
2071
 
        self.assert_compile(table1.c.myid.in_(['a', 'b']),
2072
 
        "mytable.myid IN (:myid_1, :myid_2)")
2073
 
 
2074
 
        self.assert_compile(table1.c.myid.in_(iter(['a', 'b'])),
2075
 
        "mytable.myid IN (:myid_1, :myid_2)")
2076
 
 
2077
 
        self.assert_compile(table1.c.myid.in_([literal('a')]),
2078
 
        "mytable.myid IN (:param_1)")
2079
 
 
2080
 
        self.assert_compile(table1.c.myid.in_([literal('a'), 'b']),
2081
 
        "mytable.myid IN (:param_1, :myid_1)")
2082
 
 
2083
 
        self.assert_compile(table1.c.myid.in_([literal('a'), literal('b')]),
2084
 
        "mytable.myid IN (:param_1, :param_2)")
2085
 
 
2086
 
        self.assert_compile(table1.c.myid.in_(['a', literal('b')]),
2087
 
        "mytable.myid IN (:myid_1, :param_1)")
2088
 
 
2089
 
        self.assert_compile(table1.c.myid.in_([literal(1) + 'a']),
2090
 
        "mytable.myid IN (:param_1 + :param_2)")
2091
 
 
2092
 
        self.assert_compile(table1.c.myid.in_([literal('a') +'a', 'b']),
2093
 
        "mytable.myid IN (:param_1 || :param_2, :myid_1)")
2094
 
 
2095
 
        self.assert_compile(table1.c.myid.in_([literal('a') + literal('a'), literal('b')]),
2096
 
        "mytable.myid IN (:param_1 || :param_2, :param_3)")
2097
 
 
2098
 
        self.assert_compile(table1.c.myid.in_([1, literal(3) + 4]),
2099
 
        "mytable.myid IN (:myid_1, :param_1 + :param_2)")
2100
 
 
2101
 
        self.assert_compile(table1.c.myid.in_([literal('a') < 'b']),
2102
 
        "mytable.myid IN (:param_1 < :param_2)")
2103
 
 
2104
 
        self.assert_compile(table1.c.myid.in_([table1.c.myid]),
2105
 
        "mytable.myid IN (mytable.myid)")
2106
 
 
2107
 
        self.assert_compile(table1.c.myid.in_(['a', table1.c.myid]),
2108
 
        "mytable.myid IN (:myid_1, mytable.myid)")
2109
 
 
2110
 
        self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid]),
2111
 
        "mytable.myid IN (:param_1, mytable.myid)")
2112
 
 
2113
 
        self.assert_compile(table1.c.myid.in_([literal('a'), table1.c.myid +'a']),
2114
 
        "mytable.myid IN (:param_1, mytable.myid + :myid_1)")
2115
 
 
2116
 
        self.assert_compile(table1.c.myid.in_([literal(1), 'a' + table1.c.myid]),
2117
 
        "mytable.myid IN (:param_1, :myid_1 + mytable.myid)")
2118
 
 
2119
 
        self.assert_compile(table1.c.myid.in_([1, 2, 3]),
2120
 
        "mytable.myid IN (:myid_1, :myid_2, :myid_3)")
2121
 
 
2122
 
        self.assert_compile(table1.c.myid.in_(select([table2.c.otherid])),
2123
 
        "mytable.myid IN (SELECT myothertable.otherid FROM myothertable)")
2124
 
 
2125
 
        self.assert_compile(~table1.c.myid.in_(select([table2.c.otherid])),
2126
 
        "mytable.myid NOT IN (SELECT myothertable.otherid FROM myothertable)")
2127
 
 
2128
 
        # text
2129
 
        self.assert_compile(
2130
 
                table1.c.myid.in_(
2131
 
                        text("SELECT myothertable.otherid FROM myothertable")
2132
 
                    ),
2133
 
                    "mytable.myid IN (SELECT myothertable.otherid "
2134
 
                    "FROM myothertable)"
2135
 
        )
2136
 
 
2137
 
        # test empty in clause
2138
 
        self.assert_compile(table1.c.myid.in_([]),
2139
 
        "mytable.myid != mytable.myid")
2140
 
 
2141
 
        self.assert_compile(
2142
 
            select([table1.c.myid.in_(select([table2.c.otherid]))]),
2143
 
            "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable"
2144
 
        )
2145
 
        self.assert_compile(
2146
 
            select([table1.c.myid.in_(select([table2.c.otherid]).as_scalar())]),
2147
 
            "SELECT mytable.myid IN (SELECT myothertable.otherid FROM myothertable) AS anon_1 FROM mytable"
2148
 
        )
2149
 
 
2150
 
        self.assert_compile(table1.c.myid.in_(
2151
 
            union(
2152
 
                  select([table1.c.myid], table1.c.myid == 5),
2153
 
                  select([table1.c.myid], table1.c.myid == 12),
2154
 
            )
2155
 
        ), "mytable.myid IN ("\
2156
 
        "SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_1 "\
2157
 
        "UNION SELECT mytable.myid FROM mytable WHERE mytable.myid = :myid_2)")
2158
 
 
2159
 
        # test that putting a select in an IN clause does not blow away its ORDER BY clause
2160
 
        self.assert_compile(
2161
 
            select([table1, table2],
2162
 
                table2.c.otherid.in_(
2163
 
                    select([table2.c.otherid], order_by=[table2.c.othername], limit=10, correlate=False)
2164
 
                ),
2165
 
                from_obj=[table1.join(table2, table1.c.myid==table2.c.otherid)], order_by=[table1.c.myid]
2166
 
            ),
2167
 
            "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername FROM mytable "\
2168
 
            "JOIN myothertable ON mytable.myid = myothertable.otherid WHERE myothertable.otherid IN (SELECT myothertable.otherid "\
2169
 
            "FROM myothertable ORDER BY myothertable.othername LIMIT :param_1) ORDER BY mytable.myid",
2170
 
            {'param_1':10}
2171
 
        )
2172
1974
 
2173
1975
    def test_tuple(self):
2174
 
        self.assert_compile(tuple_(table1.c.myid, table1.c.name).in_([(1, 'foo'), (5, 'bar')]),
2175
 
            "(mytable.myid, mytable.name) IN ((:param_1, :param_2), (:param_3, :param_4))"
 
1976
        self.assert_compile(
 
1977
            tuple_(table1.c.myid, table1.c.name).in_(
 
1978
                    [(1, 'foo'), (5, 'bar')]),
 
1979
            "(mytable.myid, mytable.name) IN "
 
1980
            "((:param_1, :param_2), (:param_3, :param_4))"
2176
1981
        )
2177
1982
 
2178
1983
        self.assert_compile(
2179
1984
            tuple_(table1.c.myid, table1.c.name).in_(
2180
1985
                        [tuple_(table2.c.otherid, table2.c.othername)]
2181
1986
                    ),
2182
 
            "(mytable.myid, mytable.name) IN ((myothertable.otherid, myothertable.othername))"
 
1987
            "(mytable.myid, mytable.name) IN "
 
1988
            "((myothertable.otherid, myothertable.othername))"
2183
1989
        )
2184
1990
 
2185
1991
        self.assert_compile(
2211
2017
            eq_(str(cast(1234, Text).compile(dialect=dialect)),
2212
2018
                            'CAST(%s AS %s)' % (literal, expected_results[3]))
2213
2019
            eq_(str(cast('test', String(20)).compile(dialect=dialect)),
2214
 
                            'CAST(%s AS %s)' %(literal, expected_results[4]))
 
2020
                            'CAST(%s AS %s)' % (literal, expected_results[4]))
 
2021
 
2215
2022
            # fixme: shoving all of this dialect-specific stuff in one test
2216
2023
            # is now officialy completely ridiculous AND non-obviously omits
2217
2024
            # coverage on other dialects.
2227
2034
                        "anon_1 \nFROM casttest")
2228
2035
 
2229
2036
        # first test with PostgreSQL engine
2230
 
        check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)'
2231
 
                      , 'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
 
2037
        check_results(postgresql.dialect(), ['NUMERIC', 'NUMERIC(12, 9)',
 
2038
                       'DATE', 'TEXT', 'VARCHAR(20)'], '%(param_1)s')
2232
2039
 
2233
2040
        # then the Oracle engine
2234
2041
        check_results(oracle.dialect(), ['NUMERIC', 'NUMERIC(12, 9)',
2373
2180
        table = Table('dt', metadata,
2374
2181
            Column('date', Date))
2375
2182
        self.assert_compile(
2376
 
            table.select(table.c.date.between(datetime.date(2006,6,1),
2377
 
                                            datetime.date(2006,6,5))),
 
2183
            table.select(table.c.date.between(datetime.date(2006, 6, 1),
 
2184
                                            datetime.date(2006, 6, 5))),
2378
2185
            "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
2379
 
            checkparams={'date_1':datetime.date(2006,6,1),
2380
 
                            'date_2':datetime.date(2006,6,5)})
 
2186
            checkparams={'date_1': datetime.date(2006, 6, 1),
 
2187
                            'date_2': datetime.date(2006, 6, 5)})
2381
2188
 
2382
2189
        self.assert_compile(
2383
 
            table.select(sql.between(table.c.date, datetime.date(2006,6,1),
2384
 
                                        datetime.date(2006,6,5))),
 
2190
            table.select(sql.between(table.c.date, datetime.date(2006, 6, 1),
 
2191
                                        datetime.date(2006, 6, 5))),
2385
2192
            "SELECT dt.date FROM dt WHERE dt.date BETWEEN :date_1 AND :date_2",
2386
 
            checkparams={'date_1':datetime.date(2006,6,1),
2387
 
                            'date_2':datetime.date(2006,6,5)})
2388
 
 
 
2193
            checkparams={'date_1': datetime.date(2006, 6, 1),
 
2194
                            'date_2': datetime.date(2006, 6, 5)})
2389
2195
 
2390
2196
 
2391
2197
    def test_delayed_col_naming(self):
2405
2211
 
2406
2212
        assert_raises_message(
2407
2213
            exc.CompileError,
2408
 
            "Cannot compile Column object until it's 'name' is assigned.",
 
2214
            "Cannot compile Column object until its 'name' is assigned.",
2409
2215
            str, sel2
2410
2216
        )
2411
2217
 
2412
2218
        sel3 = select([my_str]).as_scalar()
2413
2219
        assert_raises_message(
2414
2220
            exc.CompileError,
2415
 
            "Cannot compile Column object until it's 'name' is assigned.",
 
2221
            "Cannot compile Column object until its 'name' is assigned.",
2416
2222
            str, sel3
2417
2223
        )
2418
2224
 
2447
2253
        t1 = Table('mytable', meta, Column('col1', Integer))
2448
2254
 
2449
2255
        exprs = (
2450
 
            table1.c.myid==12,
 
2256
            table1.c.myid == 12,
2451
2257
            func.hoho(table1.c.myid),
2452
2258
            cast(table1.c.name, Numeric)
2453
2259
        )
2455
2261
            (table1.c.name, 'name', 'mytable.name', None),
2456
2262
            (exprs[0], str(exprs[0]), 'mytable.myid = :myid_1', 'anon_1'),
2457
2263
            (exprs[1], str(exprs[1]), 'hoho(mytable.myid)', 'hoho_1'),
2458
 
            (exprs[2], str(exprs[2]), 'CAST(mytable.name AS NUMERIC)', 'anon_1'),
 
2264
            (exprs[2], str(exprs[2]),
 
2265
                        'CAST(mytable.name AS NUMERIC)', 'anon_1'),
2459
2266
            (t1.c.col1, 'col1', 'mytable.col1', None),
2460
 
            (column('some wacky thing'), 'some wacky thing', '"some wacky thing"', '')
 
2267
            (column('some wacky thing'), 'some wacky thing',
 
2268
                '"some wacky thing"', '')
2461
2269
        ):
2462
2270
            if getattr(col, 'table', None) is not None:
2463
2271
                t = col.table
2468
2276
            assert s1.c.keys() == [key], s1.c.keys()
2469
2277
 
2470
2278
            if label:
2471
 
                self.assert_compile(s1, "SELECT %s AS %s FROM mytable" % (expr, label))
 
2279
                self.assert_compile(s1,
 
2280
                        "SELECT %s AS %s FROM mytable" % (expr, label))
2472
2281
            else:
2473
2282
                self.assert_compile(s1, "SELECT %s FROM mytable" % (expr,))
2474
2283
 
2481
2290
                # sqlite rule labels subquery columns
2482
2291
                self.assert_compile(s1,
2483
2292
                            "SELECT %s FROM (SELECT %s AS %s FROM mytable)" %
2484
 
                            (key,expr, key))
 
2293
                            (key, expr, key))
2485
2294
            else:
2486
2295
                self.assert_compile(s1,
2487
2296
                            "SELECT %s FROM (SELECT %s FROM mytable)" %
2488
 
                            (expr,expr))
 
2297
                            (expr, expr))
2489
2298
 
2490
2299
    def test_hints(self):
2491
2300
        s = select([table1.c.myid]).with_hint(table1, "test hint %(name)s")
2499
2308
 
2500
2309
        subs4 = select([
2501
2310
            table1, table2
2502
 
        ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid)).\
 
2311
        ]).select_from(table1.join(table2, table1.c.myid == table2.c.otherid)).\
2503
2312
            with_hint(table1, 'hint1')
2504
2313
 
2505
2314
        s4 = select([table3]).select_from(
2506
2315
                        table3.join(
2507
2316
                                subs4,
2508
 
                                subs4.c.othername==table3.c.otherstuff
 
2317
                                subs4.c.othername == table3.c.otherstuff
2509
2318
                            )
2510
2319
                    ).\
2511
2320
                    with_hint(table3, 'hint3')
2512
2321
 
2513
 
        subs5 = select([
2514
 
            table1, table2
2515
 
        ]).select_from(table1.join(table2, table1.c.myid==table2.c.otherid))
2516
 
        s5 = select([table3]).select_from(
2517
 
                        table3.join(
2518
 
                                subs5,
2519
 
                                subs5.c.othername==table3.c.otherstuff
2520
 
                            )
2521
 
                    ).\
2522
 
                    with_hint(table3, 'hint3').\
2523
 
                    with_hint(table1, 'hint1')
2524
2322
 
2525
2323
        t1 = table('QuotedName', column('col1'))
2526
 
        s6 = select([t1.c.col1]).where(t1.c.col1>10).with_hint(t1, '%(name)s idx1')
 
2324
        s6 = select([t1.c.col1]).where(t1.c.col1 > 10).\
 
2325
                    with_hint(t1, '%(name)s idx1')
2527
2326
        a2 = t1.alias('SomeName')
2528
 
        s7 = select([a2.c.col1]).where(a2.c.col1>10).with_hint(a2, '%(name)s idx1')
 
2327
        s7 = select([a2.c.col1]).where(a2.c.col1 > 10).\
 
2328
                    with_hint(a2, '%(name)s idx1')
2529
2329
 
2530
2330
        mysql_d, oracle_d, sybase_d = \
2531
2331
                            mysql.dialect(), \
2609
2409
        )
2610
2410
 
2611
2411
 
 
2412
class KwargPropagationTest(fixtures.TestBase):
 
2413
 
 
2414
    @classmethod
 
2415
    def setup_class(cls):
 
2416
        from sqlalchemy.sql.expression import ColumnClause, TableClause
 
2417
        class CatchCol(ColumnClause):
 
2418
            pass
 
2419
 
 
2420
        class CatchTable(TableClause):
 
2421
            pass
 
2422
 
 
2423
        cls.column = CatchCol("x")
 
2424
        cls.table = CatchTable("y")
 
2425
        cls.criterion = cls.column == CatchCol('y')
 
2426
 
 
2427
        @compiles(CatchCol)
 
2428
        def compile_col(element, compiler, **kw):
 
2429
            assert "canary" in kw
 
2430
            return compiler.visit_column(element)
 
2431
 
 
2432
        @compiles(CatchTable)
 
2433
        def compile_table(element, compiler, **kw):
 
2434
            assert "canary" in kw
 
2435
            return compiler.visit_table(element)
 
2436
 
 
2437
    def _do_test(self, element):
 
2438
        d = default.DefaultDialect()
 
2439
        d.statement_compiler(d, element,
 
2440
                        compile_kwargs={"canary": True})
 
2441
 
 
2442
    def test_binary(self):
 
2443
        self._do_test(self.column == 5)
 
2444
 
 
2445
    def test_select(self):
 
2446
        s = select([self.column]).select_from(self.table).\
 
2447
                where(self.column == self.criterion).\
 
2448
                order_by(self.column)
 
2449
        self._do_test(s)
 
2450
 
 
2451
    def test_case(self):
 
2452
        c = case([(self.criterion, self.column)], else_=self.column)
 
2453
        self._do_test(c)
 
2454
 
 
2455
    def test_cast(self):
 
2456
        c = cast(self.column, Integer)
 
2457
        self._do_test(c)
 
2458
 
 
2459
 
2612
2460
class CRUDTest(fixtures.TestBase, AssertsCompiledSQL):
2613
2461
    __dialect__ = 'default'
2614
2462
 
2615
 
    def test_insert(self):
2616
 
        # generic insert, will create bind params for all columns
2617
 
        self.assert_compile(insert(table1),
2618
 
                            "INSERT INTO mytable (myid, name, description) "
2619
 
                            "VALUES (:myid, :name, :description)")
2620
 
 
2621
 
        # insert with user-supplied bind params for specific columns,
2622
 
        # cols provided literally
2623
 
        self.assert_compile(
2624
 
            insert(table1, {
2625
 
                            table1.c.myid : bindparam('userid'),
2626
 
                            table1.c.name : bindparam('username')}),
2627
 
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)")
2628
 
 
2629
 
        # insert with user-supplied bind params for specific columns, cols
2630
 
        # provided as strings
2631
 
        self.assert_compile(
2632
 
            insert(table1, dict(myid = 3, name = 'jack')),
2633
 
            "INSERT INTO mytable (myid, name) VALUES (:myid, :name)"
2634
 
        )
2635
 
 
2636
 
        # test with a tuple of params instead of named
2637
 
        self.assert_compile(
2638
 
            insert(table1, (3, 'jack', 'mydescription')),
2639
 
            "INSERT INTO mytable (myid, name, description) VALUES (:myid, :name, :description)",
2640
 
            checkparams = {'myid':3, 'name':'jack', 'description':'mydescription'}
2641
 
        )
2642
 
 
2643
 
        self.assert_compile(
2644
 
            insert(table1, values={
2645
 
                                    table1.c.myid : bindparam('userid')
2646
 
                                }).values({table1.c.name : bindparam('username')}),
2647
 
            "INSERT INTO mytable (myid, name) VALUES (:userid, :username)"
2648
 
        )
2649
 
 
2650
 
        self.assert_compile(
2651
 
                    insert(table1, values=dict(myid=func.lala())),
2652
 
                    "INSERT INTO mytable (myid) VALUES (lala())")
2653
 
 
2654
 
    def test_inline_insert(self):
2655
 
        metadata = MetaData()
2656
 
        table = Table('sometable', metadata,
2657
 
            Column('id', Integer, primary_key=True),
2658
 
            Column('foo', Integer, default=func.foobar()))
2659
 
        self.assert_compile(
2660
 
                    table.insert(values={}, inline=True),
2661
 
                    "INSERT INTO sometable (foo) VALUES (foobar())")
2662
 
        self.assert_compile(
2663
 
                    table.insert(inline=True),
2664
 
                    "INSERT INTO sometable (foo) VALUES (foobar())", params={})
2665
 
 
2666
 
    def test_update(self):
2667
 
        self.assert_compile(
2668
 
                update(table1, table1.c.myid == 7),
2669
 
                "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
2670
 
                params = {table1.c.name:'fred'})
2671
 
        self.assert_compile(
2672
 
                table1.update().where(table1.c.myid==7).
2673
 
                            values({table1.c.myid:5}),
2674
 
                "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
2675
 
                checkparams={'myid':5, 'myid_1':7})
2676
 
        self.assert_compile(
2677
 
                update(table1, table1.c.myid == 7),
2678
 
                "UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1",
2679
 
                params = {'name':'fred'})
2680
 
        self.assert_compile(
2681
 
                update(table1, values = {table1.c.name : table1.c.myid}),
2682
 
                "UPDATE mytable SET name=mytable.myid")
2683
 
        self.assert_compile(
2684
 
                update(table1,
2685
 
                        whereclause = table1.c.name == bindparam('crit'),
2686
 
                        values = {table1.c.name : 'hi'}),
2687
 
                "UPDATE mytable SET name=:name WHERE mytable.name = :crit",
2688
 
                params = {'crit' : 'notthere'},
2689
 
                checkparams={'crit':'notthere', 'name':'hi'})
2690
 
        self.assert_compile(
2691
 
                update(table1, table1.c.myid == 12,
2692
 
                            values = {table1.c.name : table1.c.myid}),
2693
 
                "UPDATE mytable SET name=mytable.myid, description="
2694
 
                ":description WHERE mytable.myid = :myid_1",
2695
 
                params = {'description':'test'},
2696
 
                checkparams={'description':'test', 'myid_1':12})
2697
 
        self.assert_compile(
2698
 
                update(table1, table1.c.myid == 12,
2699
 
                                values = {table1.c.myid : 9}),
2700
 
                "UPDATE mytable SET myid=:myid, description=:description "
2701
 
                "WHERE mytable.myid = :myid_1",
2702
 
                params = {'myid_1': 12, 'myid': 9, 'description': 'test'})
2703
 
        self.assert_compile(
2704
 
                update(table1, table1.c.myid ==12),
2705
 
                "UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1",
2706
 
                params={'myid':18}, checkparams={'myid':18, 'myid_1':12})
2707
 
        s = table1.update(table1.c.myid == 12, values = {table1.c.name : 'lala'})
2708
 
        c = s.compile(column_keys=['id', 'name'])
2709
 
        self.assert_compile(
2710
 
                update(table1, table1.c.myid == 12,
2711
 
                        values = {table1.c.name : table1.c.myid}
2712
 
                    ).values({table1.c.name:table1.c.name + 'foo'}),
2713
 
                "UPDATE mytable SET name=(mytable.name || :name_1), "
2714
 
                "description=:description WHERE mytable.myid = :myid_1",
2715
 
                params = {'description':'test'})
2716
 
        eq_(str(s), str(c))
2717
 
 
2718
 
        self.assert_compile(update(table1,
2719
 
            (table1.c.myid == func.hoho(4)) &
2720
 
            (table1.c.name == literal('foo') + table1.c.name + literal('lala')),
2721
 
            values = {
2722
 
            table1.c.name : table1.c.name + "lala",
2723
 
            table1.c.myid : func.do_stuff(table1.c.myid, literal('hoho'))
2724
 
            }), "UPDATE mytable SET myid=do_stuff(mytable.myid, :param_1), "
2725
 
            "name=(mytable.name || :name_1) "
2726
 
            "WHERE mytable.myid = hoho(:hoho_1) AND mytable.name = :param_2 || "
2727
 
            "mytable.name || :param_3")
2728
2463
 
2729
2464
    def test_correlated_update(self):
2730
2465
        # test against a straight text subquery
2731
 
        u = update(table1, values = {
2732
 
                    table1.c.name :
 
2466
        u = update(table1, values={
 
2467
                    table1.c.name:
2733
2468
                    text("(select name from mytable where id=mytable.id)")})
2734
2469
        self.assert_compile(u,
2735
2470
                    "UPDATE mytable SET name=(select name from mytable "
2736
2471
                    "where id=mytable.id)")
2737
2472
 
2738
2473
        mt = table1.alias()
2739
 
        u = update(table1, values = {
2740
 
                                table1.c.name :
2741
 
                                select([mt.c.name], mt.c.myid==table1.c.myid)
 
2474
        u = update(table1, values={
 
2475
                                table1.c.name:
 
2476
                                select([mt.c.name], mt.c.myid == table1.c.myid)
2742
2477
                            })
2743
2478
        self.assert_compile(u,
2744
2479
                    "UPDATE mytable SET name=(SELECT mytable_1.name FROM "
2745
 
                    "mytable AS mytable_1 WHERE mytable_1.myid = mytable.myid)")
 
2480
                    "mytable AS mytable_1 WHERE "
 
2481
                    "mytable_1.myid = mytable.myid)")
2746
2482
 
2747
2483
        # test against a regular constructed subquery
2748
2484
        s = select([table2], table2.c.otherid == table1.c.myid)
2749
 
        u = update(table1, table1.c.name == 'jack', values = {table1.c.name : s})
 
2485
        u = update(table1, table1.c.name == 'jack', values={table1.c.name: s})
2750
2486
        self.assert_compile(u,
2751
2487
                    "UPDATE mytable SET name=(SELECT myothertable.otherid, "
2752
2488
                    "myothertable.othername FROM myothertable WHERE "
2753
 
                    "myothertable.otherid = mytable.myid) WHERE mytable.name = :name_1")
 
2489
                    "myothertable.otherid = mytable.myid) "
 
2490
                    "WHERE mytable.name = :name_1")
2754
2491
 
2755
2492
        # test a non-correlated WHERE clause
2756
2493
        s = select([table2.c.othername], table2.c.otherid == 7)
2757
 
        u = update(table1, table1.c.name==s)
 
2494
        u = update(table1, table1.c.name == s)
2758
2495
        self.assert_compile(u,
2759
2496
                    "UPDATE mytable SET myid=:myid, name=:name, "
2760
2497
                    "description=:description WHERE mytable.name = "
2763
2500
 
2764
2501
        # test one that is actually correlated...
2765
2502
        s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
2766
 
        u = table1.update(table1.c.name==s)
 
2503
        u = table1.update(table1.c.name == s)
2767
2504
        self.assert_compile(u,
2768
2505
                "UPDATE mytable SET myid=:myid, name=:name, "
2769
2506
                "description=:description WHERE mytable.name = "
2795
2532
                "AND myothertable.othername = mytable_1.name",
2796
2533
                dialect=mssql.dialect())
2797
2534
 
2798
 
    def test_delete(self):
2799
 
        self.assert_compile(
2800
 
                        delete(table1, table1.c.myid == 7),
2801
 
                        "DELETE FROM mytable WHERE mytable.myid = :myid_1")
2802
 
        self.assert_compile(
2803
 
                        table1.delete().where(table1.c.myid == 7),
2804
 
                        "DELETE FROM mytable WHERE mytable.myid = :myid_1")
2805
 
        self.assert_compile(
2806
 
                        table1.delete().where(table1.c.myid == 7).\
2807
 
                                        where(table1.c.name=='somename'),
2808
 
                        "DELETE FROM mytable WHERE mytable.myid = :myid_1 "
2809
 
                        "AND mytable.name = :name_1")
2810
 
 
2811
 
    def test_correlated_delete(self):
2812
 
        # test a non-correlated WHERE clause
2813
 
        s = select([table2.c.othername], table2.c.otherid == 7)
2814
 
        u = delete(table1, table1.c.name==s)
2815
 
        self.assert_compile(u, "DELETE FROM mytable WHERE mytable.name = "\
2816
 
        "(SELECT myothertable.othername FROM myothertable WHERE myothertable.otherid = :otherid_1)")
2817
 
 
2818
 
        # test one that is actually correlated...
2819
 
        s = select([table2.c.othername], table2.c.otherid == table1.c.myid)
2820
 
        u = table1.delete(table1.c.name==s)
2821
 
        self.assert_compile(u,
2822
 
                    "DELETE FROM mytable WHERE mytable.name = (SELECT "
2823
 
                    "myothertable.othername FROM myothertable WHERE "
2824
 
                    "myothertable.otherid = mytable.myid)")
2825
 
 
2826
2535
    def test_binds_that_match_columns(self):
2827
2536
        """test bind params named after column names
2828
2537
        replace the normal SET/VALUES generation."""
2829
2538
 
2830
2539
        t = table('foo', column('x'), column('y'))
2831
2540
 
2832
 
        u = t.update().where(t.c.x==bindparam('x'))
 
2541
        u = t.update().where(t.c.x == bindparam('x'))
2833
2542
 
2834
2543
        assert_raises(exc.CompileError, u.compile)
2835
2544
 
2837
2546
 
2838
2547
        assert_raises(exc.CompileError, u.values(x=7).compile)
2839
2548
 
2840
 
        self.assert_compile(u.values(y=7), "UPDATE foo SET y=:y WHERE foo.x = :x")
 
2549
        self.assert_compile(u.values(y=7),
 
2550
                        "UPDATE foo SET y=:y WHERE foo.x = :x")
2841
2551
 
2842
 
        assert_raises(exc.CompileError, u.values(x=7).compile, column_keys=['x', 'y'])
 
2552
        assert_raises(exc.CompileError,
 
2553
                        u.values(x=7).compile, column_keys=['x', 'y'])
2843
2554
        assert_raises(exc.CompileError, u.compile, column_keys=['x', 'y'])
2844
2555
 
2845
2556
        self.assert_compile(u.values(x=3 + bindparam('x')),
2846
 
                            "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
2847
 
 
2848
 
        self.assert_compile(u.values(x=3 + bindparam('x')),
2849
 
                            "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
2850
 
                            params={'x':1})
2851
 
 
2852
 
        self.assert_compile(u.values(x=3 + bindparam('x')),
2853
 
                            "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
2854
 
                            params={'x':1, 'y':2})
 
2557
                        "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x")
 
2558
 
 
2559
        self.assert_compile(u.values(x=3 + bindparam('x')),
 
2560
                        "UPDATE foo SET x=(:param_1 + :x) WHERE foo.x = :x",
 
2561
                        params={'x': 1})
 
2562
 
 
2563
        self.assert_compile(u.values(x=3 + bindparam('x')),
 
2564
                    "UPDATE foo SET x=(:param_1 + :x), y=:y WHERE foo.x = :x",
 
2565
                    params={'x': 1, 'y': 2})
2855
2566
 
2856
2567
        i = t.insert().values(x=3 + bindparam('x'))
2857
 
        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
2858
 
        self.assert_compile(i,
2859
 
                            "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
2860
 
                            params={'x':1, 'y':2})
 
2568
        self.assert_compile(i,
 
2569
                        "INSERT INTO foo (x) VALUES ((:param_1 + :x))")
 
2570
        self.assert_compile(i,
 
2571
                        "INSERT INTO foo (x, y) VALUES ((:param_1 + :x), :y)",
 
2572
                        params={'x': 1, 'y': 2})
2861
2573
 
2862
2574
        i = t.insert().values(x=bindparam('y'))
2863
2575
        self.assert_compile(i, "INSERT INTO foo (x) VALUES (:y)")
2869
2581
        assert_raises(exc.CompileError, i.compile)
2870
2582
 
2871
2583
        i = t.insert().values(x=3 + bindparam('x2'))
2872
 
        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
2873
 
        self.assert_compile(i, "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
2874
 
        self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
2875
 
                                    params={'x':1, 'y':2})
2876
 
        self.assert_compile(i, "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
2877
 
                                    params={'x2':1, 'y':2})
 
2584
        self.assert_compile(i,
 
2585
                "INSERT INTO foo (x) VALUES ((:param_1 + :x2))")
 
2586
        self.assert_compile(i,
 
2587
                "INSERT INTO foo (x) VALUES ((:param_1 + :x2))", params={})
 
2588
        self.assert_compile(i,
 
2589
                "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
 
2590
                                    params={'x': 1, 'y': 2})
 
2591
        self.assert_compile(i,
 
2592
                "INSERT INTO foo (x, y) VALUES ((:param_1 + :x2), :y)",
 
2593
                                    params={'x2': 1, 'y': 2})
2878
2594
 
2879
2595
    def test_unconsumed_names(self):
2880
2596
        t = table("t", column("x"), column("y"))
2881
2597
        t2 = table("t2", column("q"), column("z"))
2882
2598
        assert_raises_message(
2883
 
            exc.SAWarning,
 
2599
            exc.CompileError,
2884
2600
            "Unconsumed column names: z",
2885
2601
            t.insert().values(x=5, z=5).compile,
2886
2602
        )
2887
2603
        assert_raises_message(
2888
 
            exc.SAWarning,
 
2604
            exc.CompileError,
2889
2605
            "Unconsumed column names: z",
2890
2606
            t.update().values(x=5, z=5).compile,
2891
2607
        )
2892
2608
 
2893
2609
        assert_raises_message(
2894
 
            exc.SAWarning,
 
2610
            exc.CompileError,
2895
2611
            "Unconsumed column names: j",
2896
 
            t.update().values(x=5, j=7).values({t2.c.z:5}).
2897
 
                where(t.c.x==t2.c.q).compile,
 
2612
            t.update().values(x=5, j=7).values({t2.c.z: 5}).
 
2613
                where(t.c.x == t2.c.q).compile,
2898
2614
        )
2899
2615
 
2900
2616
        # bindparam names don't get counted
2909
2625
        self.assert_compile(
2910
2626
            i,
2911
2627
            "INSERT INTO t (x) VALUES ((:param_1 + :x2))",
2912
 
            params={"x2":1}
 
2628
            params={"x2": 1}
2913
2629
        )
2914
2630
 
2915
2631
        assert_raises_message(
2916
 
            exc.SAWarning,
 
2632
            exc.CompileError,
2917
2633
            "Unconsumed column names: j",
2918
2634
            t.update().values(x=5, j=7).compile,
2919
2635
            column_keys=['j']
2925
2641
        t = table('foo', column('id'), column('foo_id'))
2926
2642
 
2927
2643
        self.assert_compile(
2928
 
            t.update().where(t.c.id==5),
 
2644
            t.update().where(t.c.id == 5),
2929
2645
            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :id_1"
2930
2646
        )
2931
2647
 
2932
2648
        self.assert_compile(
2933
 
            t.update().where(t.c.id==bindparam(key=t.c.id._label)),
 
2649
            t.update().where(t.c.id == bindparam(key=t.c.id._label)),
2934
2650
            "UPDATE foo SET id=:id, foo_id=:foo_id WHERE foo.id = :foo_id_1"
2935
2651
        )
2936
2652
 
2977
2693
 
2978
2694
    def test_insert(self):
2979
2695
        m = MetaData()
2980
 
        foo =  Table('foo', m,
 
2696
        foo = Table('foo', m,
2981
2697
            Column('id', Integer))
2982
2698
 
2983
2699
        t = Table('test', m,
2984
2700
            Column('col1', Integer, default=func.foo(1)),
2985
 
            Column('col2', Integer, default=select([func.coalesce(func.max(foo.c.id))])),
 
2701
            Column('col2', Integer, default=select(
 
2702
                        [func.coalesce(func.max(foo.c.id))])),
2986
2703
            )
2987
2704
 
2988
2705
        self.assert_compile(t.insert(inline=True, values={}),
2992
2709
 
2993
2710
    def test_update(self):
2994
2711
        m = MetaData()
2995
 
        foo =  Table('foo', m,
 
2712
        foo = Table('foo', m,
2996
2713
            Column('id', Integer))
2997
2714
 
2998
2715
        t = Table('test', m,
2999
2716
            Column('col1', Integer, onupdate=func.foo(1)),
3000
 
            Column('col2', Integer, onupdate=select([func.coalesce(func.max(foo.c.id))])),
 
2717
            Column('col2', Integer, onupdate=select(
 
2718
                            [func.coalesce(func.max(foo.c.id))])),
3001
2719
            Column('col3', String(30))
3002
2720
            )
3003
2721
 
3004
 
        self.assert_compile(t.update(inline=True, values={'col3':'foo'}),
 
2722
        self.assert_compile(t.update(inline=True, values={'col3': 'foo'}),
3005
2723
                        "UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
3006
2724
                        "coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
3007
2725
                        "col3=:col3")
3011
2729
 
3012
2730
    def test_select(self):
3013
2731
        self.assert_compile(table4.select(),
3014
 
                "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
3015
 
                " remote_owner.remotetable.value FROM remote_owner.remotetable")
 
2732
                "SELECT remote_owner.remotetable.rem_id, "
 
2733
                "remote_owner.remotetable.datatype_id,"
 
2734
                " remote_owner.remotetable.value "
 
2735
                "FROM remote_owner.remotetable")
3016
2736
 
3017
 
        self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
3018
 
                "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id,"
3019
 
                " remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "
 
2737
        self.assert_compile(table4.select(and_(table4.c.datatype_id == 7,
 
2738
                                table4.c.value == 'hi')),
 
2739
                "SELECT remote_owner.remotetable.rem_id, "
 
2740
                "remote_owner.remotetable.datatype_id,"
 
2741
                " remote_owner.remotetable.value "
 
2742
                "FROM remote_owner.remotetable WHERE "
3020
2743
                "remote_owner.remotetable.datatype_id = :datatype_id_1 AND"
3021
2744
                " remote_owner.remotetable.value = :value_1")
3022
2745
 
3023
 
        s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'), use_labels=True)
 
2746
        s = table4.select(and_(table4.c.datatype_id == 7,
 
2747
                            table4.c.value == 'hi'), use_labels=True)
3024
2748
        self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS"
3025
 
            " remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS"
3026
 
            " remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "
3027
 
            "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "
 
2749
            " remote_owner_remotetable_rem_id, "
 
2750
            "remote_owner.remotetable.datatype_id AS"
 
2751
            " remote_owner_remotetable_datatype_id, "
 
2752
            "remote_owner.remotetable.value "
 
2753
            "AS remote_owner_remotetable_value FROM "
 
2754
            "remote_owner.remotetable WHERE "
3028
2755
            "remote_owner.remotetable.datatype_id = :datatype_id_1 AND "
3029
2756
            "remote_owner.remotetable.value = :value_1")
3030
2757
 
3031
2758
        # multi-part schema name
3032
2759
        self.assert_compile(table5.select(),
3033
2760
                'SELECT "dbo.remote_owner".remotetable.rem_id, '
3034
 
                '"dbo.remote_owner".remotetable.datatype_id, "dbo.remote_owner".remotetable.value '
 
2761
                '"dbo.remote_owner".remotetable.datatype_id, '
 
2762
                '"dbo.remote_owner".remotetable.value '
3035
2763
                'FROM "dbo.remote_owner".remotetable'
3036
2764
        )
3037
2765
 
3038
2766
        # multi-part schema name labels - convert '.' to '_'
3039
2767
        self.assert_compile(table5.select(use_labels=True),
3040
2768
                'SELECT "dbo.remote_owner".remotetable.rem_id AS'
3041
 
                ' dbo_remote_owner_remotetable_rem_id, "dbo.remote_owner".remotetable.datatype_id'
 
2769
                ' dbo_remote_owner_remotetable_rem_id, '
 
2770
                '"dbo.remote_owner".remotetable.datatype_id'
3042
2771
                ' AS dbo_remote_owner_remotetable_datatype_id,'
3043
 
                ' "dbo.remote_owner".remotetable.value AS dbo_remote_owner_remotetable_value FROM'
 
2772
                ' "dbo.remote_owner".remotetable.value AS '
 
2773
                'dbo_remote_owner_remotetable_value FROM'
3044
2774
                ' "dbo.remote_owner".remotetable'
3045
2775
        )
3046
2776
 
3047
2777
    def test_alias(self):
3048
2778
        a = alias(table4, 'remtable')
3049
 
        self.assert_compile(a.select(a.c.datatype_id==7),
3050
 
                            "SELECT remtable.rem_id, remtable.datatype_id, remtable.value FROM"
 
2779
        self.assert_compile(a.select(a.c.datatype_id == 7),
 
2780
                            "SELECT remtable.rem_id, remtable.datatype_id, "
 
2781
                            "remtable.value FROM"
3051
2782
                            " remote_owner.remotetable AS remtable "
3052
2783
                            "WHERE remtable.datatype_id = :datatype_id_1")
3053
2784
 
3054
2785
    def test_update(self):
3055
2786
        self.assert_compile(
3056
 
                table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}),
 
2787
                table4.update(table4.c.value == 'test',
 
2788
                        values={table4.c.datatype_id: 12}),
3057
2789
                "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "
3058
2790
                "WHERE remote_owner.remotetable.value = :value_1")
3059
2791
 
3060
2792
    def test_insert(self):
3061
2793
        self.assert_compile(table4.insert(values=(2, 5, 'test')),
3062
 
                    "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "
 
2794
                    "INSERT INTO remote_owner.remotetable "
 
2795
                    "(rem_id, datatype_id, value) VALUES "
3063
2796
                    "(:rem_id, :datatype_id, :value)")
3064
2797
 
3065
2798
 
 
2799
class CorrelateTest(fixtures.TestBase, AssertsCompiledSQL):
 
2800
    __dialect__ = 'default'
 
2801
 
 
2802
    def test_dont_overcorrelate(self):
 
2803
        self.assert_compile(select([table1], from_obj=[table1,
 
2804
                            table1.select()]),
 
2805
                            "SELECT mytable.myid, mytable.name, "
 
2806
                            "mytable.description FROM mytable, (SELECT "
 
2807
                            "mytable.myid AS myid, mytable.name AS "
 
2808
                            "name, mytable.description AS description "
 
2809
                            "FROM mytable)")
 
2810
 
 
2811
    def _fixture(self):
 
2812
        t1 = table('t1', column('a'))
 
2813
        t2 = table('t2', column('a'))
 
2814
        return t1, t2, select([t1]).where(t1.c.a == t2.c.a)
 
2815
 
 
2816
    def _assert_where_correlated(self, stmt):
 
2817
        self.assert_compile(
 
2818
                stmt,
 
2819
                "SELECT t2.a FROM t2 WHERE t2.a = "
 
2820
                "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
 
2821
 
 
2822
    def _assert_where_all_correlated(self, stmt):
 
2823
        self.assert_compile(
 
2824
                stmt,
 
2825
                "SELECT t1.a, t2.a FROM t1, t2 WHERE t2.a = "
 
2826
                "(SELECT t1.a WHERE t1.a = t2.a)")
 
2827
 
 
2828
    # note there's no more "backwards" correlation after
 
2829
    # we've done #2746
 
2830
    #def _assert_where_backwards_correlated(self, stmt):
 
2831
    #    self.assert_compile(
 
2832
    #            stmt,
 
2833
    #            "SELECT t2.a FROM t2 WHERE t2.a = "
 
2834
    #            "(SELECT t1.a FROM t2 WHERE t1.a = t2.a)")
 
2835
 
 
2836
    #def _assert_column_backwards_correlated(self, stmt):
 
2837
    #    self.assert_compile(stmt,
 
2838
    #            "SELECT t2.a, (SELECT t1.a FROM t2 WHERE t1.a = t2.a) "
 
2839
    #            "AS anon_1 FROM t2")
 
2840
 
 
2841
    def _assert_column_correlated(self, stmt):
 
2842
        self.assert_compile(stmt,
 
2843
                "SELECT t2.a, (SELECT t1.a FROM t1 WHERE t1.a = t2.a) "
 
2844
                "AS anon_1 FROM t2")
 
2845
 
 
2846
    def _assert_column_all_correlated(self, stmt):
 
2847
        self.assert_compile(stmt,
 
2848
                "SELECT t1.a, t2.a, "
 
2849
                "(SELECT t1.a WHERE t1.a = t2.a) AS anon_1 FROM t1, t2")
 
2850
 
 
2851
 
 
2852
    def _assert_having_correlated(self, stmt):
 
2853
        self.assert_compile(stmt,
 
2854
                "SELECT t2.a FROM t2 HAVING t2.a = "
 
2855
                "(SELECT t1.a FROM t1 WHERE t1.a = t2.a)")
 
2856
 
 
2857
    def _assert_from_uncorrelated(self, stmt):
 
2858
        self.assert_compile(stmt,
 
2859
                "SELECT t2.a, anon_1.a FROM t2, "
 
2860
                "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
 
2861
 
 
2862
    def _assert_from_all_uncorrelated(self, stmt):
 
2863
        self.assert_compile(stmt,
 
2864
                "SELECT t1.a, t2.a, anon_1.a FROM t1, t2, "
 
2865
                "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a) AS anon_1")
 
2866
 
 
2867
    def _assert_where_uncorrelated(self, stmt):
 
2868
        self.assert_compile(stmt,
 
2869
                "SELECT t2.a FROM t2 WHERE t2.a = "
 
2870
                "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
 
2871
 
 
2872
    def _assert_column_uncorrelated(self, stmt):
 
2873
        self.assert_compile(stmt,
 
2874
                "SELECT t2.a, (SELECT t1.a FROM t1, t2 "
 
2875
                    "WHERE t1.a = t2.a) AS anon_1 FROM t2")
 
2876
 
 
2877
    def _assert_having_uncorrelated(self, stmt):
 
2878
        self.assert_compile(stmt,
 
2879
                "SELECT t2.a FROM t2 HAVING t2.a = "
 
2880
                "(SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a)")
 
2881
 
 
2882
    def _assert_where_single_full_correlated(self, stmt):
 
2883
        self.assert_compile(stmt,
 
2884
            "SELECT t1.a FROM t1 WHERE t1.a = (SELECT t1.a)")
 
2885
 
 
2886
    def test_correlate_semiauto_where(self):
 
2887
        t1, t2, s1 = self._fixture()
 
2888
        self._assert_where_correlated(
 
2889
                select([t2]).where(t2.c.a == s1.correlate(t2)))
 
2890
 
 
2891
    def test_correlate_semiauto_column(self):
 
2892
        t1, t2, s1 = self._fixture()
 
2893
        self._assert_column_correlated(
 
2894
                select([t2, s1.correlate(t2).as_scalar()]))
 
2895
 
 
2896
    def test_correlate_semiauto_from(self):
 
2897
        t1, t2, s1 = self._fixture()
 
2898
        self._assert_from_uncorrelated(
 
2899
                select([t2, s1.correlate(t2).alias()]))
 
2900
 
 
2901
    def test_correlate_semiauto_having(self):
 
2902
        t1, t2, s1 = self._fixture()
 
2903
        self._assert_having_correlated(
 
2904
                select([t2]).having(t2.c.a == s1.correlate(t2)))
 
2905
 
 
2906
    def test_correlate_except_inclusion_where(self):
 
2907
        t1, t2, s1 = self._fixture()
 
2908
        self._assert_where_correlated(
 
2909
                select([t2]).where(t2.c.a == s1.correlate_except(t1)))
 
2910
 
 
2911
    def test_correlate_except_exclusion_where(self):
 
2912
        t1, t2, s1 = self._fixture()
 
2913
        self._assert_where_uncorrelated(
 
2914
                select([t2]).where(t2.c.a == s1.correlate_except(t2)))
 
2915
 
 
2916
    def test_correlate_except_inclusion_column(self):
 
2917
        t1, t2, s1 = self._fixture()
 
2918
        self._assert_column_correlated(
 
2919
                select([t2, s1.correlate_except(t1).as_scalar()]))
 
2920
 
 
2921
    def test_correlate_except_exclusion_column(self):
 
2922
        t1, t2, s1 = self._fixture()
 
2923
        self._assert_column_uncorrelated(
 
2924
                select([t2, s1.correlate_except(t2).as_scalar()]))
 
2925
 
 
2926
    def test_correlate_except_inclusion_from(self):
 
2927
        t1, t2, s1 = self._fixture()
 
2928
        self._assert_from_uncorrelated(
 
2929
                select([t2, s1.correlate_except(t1).alias()]))
 
2930
 
 
2931
    def test_correlate_except_exclusion_from(self):
 
2932
        t1, t2, s1 = self._fixture()
 
2933
        self._assert_from_uncorrelated(
 
2934
                select([t2, s1.correlate_except(t2).alias()]))
 
2935
 
 
2936
    def test_correlate_except_none(self):
 
2937
        t1, t2, s1 = self._fixture()
 
2938
        self._assert_where_all_correlated(
 
2939
                select([t1, t2]).where(t2.c.a == s1.correlate_except(None)))
 
2940
 
 
2941
    def test_correlate_except_having(self):
 
2942
        t1, t2, s1 = self._fixture()
 
2943
        self._assert_having_correlated(
 
2944
                select([t2]).having(t2.c.a == s1.correlate_except(t1)))
 
2945
 
 
2946
    def test_correlate_auto_where(self):
 
2947
        t1, t2, s1 = self._fixture()
 
2948
        self._assert_where_correlated(
 
2949
                select([t2]).where(t2.c.a == s1))
 
2950
 
 
2951
    def test_correlate_auto_column(self):
 
2952
        t1, t2, s1 = self._fixture()
 
2953
        self._assert_column_correlated(
 
2954
                select([t2, s1.as_scalar()]))
 
2955
 
 
2956
    def test_correlate_auto_from(self):
 
2957
        t1, t2, s1 = self._fixture()
 
2958
        self._assert_from_uncorrelated(
 
2959
                select([t2, s1.alias()]))
 
2960
 
 
2961
    def test_correlate_auto_having(self):
 
2962
        t1, t2, s1 = self._fixture()
 
2963
        self._assert_having_correlated(
 
2964
                select([t2]).having(t2.c.a == s1))
 
2965
 
 
2966
    def test_correlate_disabled_where(self):
 
2967
        t1, t2, s1 = self._fixture()
 
2968
        self._assert_where_uncorrelated(
 
2969
                select([t2]).where(t2.c.a == s1.correlate(None)))
 
2970
 
 
2971
    def test_correlate_disabled_column(self):
 
2972
        t1, t2, s1 = self._fixture()
 
2973
        self._assert_column_uncorrelated(
 
2974
                select([t2, s1.correlate(None).as_scalar()]))
 
2975
 
 
2976
    def test_correlate_disabled_from(self):
 
2977
        t1, t2, s1 = self._fixture()
 
2978
        self._assert_from_uncorrelated(
 
2979
                select([t2, s1.correlate(None).alias()]))
 
2980
 
 
2981
    def test_correlate_disabled_having(self):
 
2982
        t1, t2, s1 = self._fixture()
 
2983
        self._assert_having_uncorrelated(
 
2984
                select([t2]).having(t2.c.a == s1.correlate(None)))
 
2985
 
 
2986
    def test_correlate_all_where(self):
 
2987
        t1, t2, s1 = self._fixture()
 
2988
        self._assert_where_all_correlated(
 
2989
                select([t1, t2]).where(t2.c.a == s1.correlate(t1, t2)))
 
2990
 
 
2991
    def test_correlate_all_column(self):
 
2992
        t1, t2, s1 = self._fixture()
 
2993
        self._assert_column_all_correlated(
 
2994
                select([t1, t2, s1.correlate(t1, t2).as_scalar()]))
 
2995
 
 
2996
    def test_correlate_all_from(self):
 
2997
        t1, t2, s1 = self._fixture()
 
2998
        self._assert_from_all_uncorrelated(
 
2999
                select([t1, t2, s1.correlate(t1, t2).alias()]))
 
3000
 
 
3001
    def test_correlate_where_all_unintentional(self):
 
3002
        t1, t2, s1 = self._fixture()
 
3003
        assert_raises_message(
 
3004
            exc.InvalidRequestError,
 
3005
            "returned no FROM clauses due to auto-correlation",
 
3006
            select([t1, t2]).where(t2.c.a == s1).compile
 
3007
        )
 
3008
 
 
3009
    def test_correlate_from_all_ok(self):
 
3010
        t1, t2, s1 = self._fixture()
 
3011
        self.assert_compile(
 
3012
            select([t1, t2, s1]),
 
3013
            "SELECT t1.a, t2.a, a FROM t1, t2, "
 
3014
            "(SELECT t1.a AS a FROM t1, t2 WHERE t1.a = t2.a)"
 
3015
        )
 
3016
 
 
3017
    def test_correlate_auto_where_singlefrom(self):
 
3018
        t1, t2, s1 = self._fixture()
 
3019
        s = select([t1.c.a])
 
3020
        s2 = select([t1]).where(t1.c.a == s)
 
3021
        self.assert_compile(s2,
 
3022
                "SELECT t1.a FROM t1 WHERE t1.a = "
 
3023
                "(SELECT t1.a FROM t1)")
 
3024
 
 
3025
    def test_correlate_semiauto_where_singlefrom(self):
 
3026
        t1, t2, s1 = self._fixture()
 
3027
 
 
3028
        s = select([t1.c.a])
 
3029
 
 
3030
        s2 = select([t1]).where(t1.c.a == s.correlate(t1))
 
3031
        self._assert_where_single_full_correlated(s2)
 
3032
 
 
3033
    def test_correlate_except_semiauto_where_singlefrom(self):
 
3034
        t1, t2, s1 = self._fixture()
 
3035
 
 
3036
        s = select([t1.c.a])
 
3037
 
 
3038
        s2 = select([t1]).where(t1.c.a == s.correlate_except(t2))
 
3039
        self._assert_where_single_full_correlated(s2)
 
3040
 
 
3041
    def test_correlate_alone_noeffect(self):
 
3042
        # new as of #2668
 
3043
        t1, t2, s1 = self._fixture()
 
3044
        self.assert_compile(s1.correlate(t1, t2),
 
3045
            "SELECT t1.a FROM t1, t2 WHERE t1.a = t2.a")
 
3046
 
 
3047
    def test_correlate_except_froms(self):
 
3048
        # new as of #2748
 
3049
        t1 = table('t1', column('a'))
 
3050
        t2 = table('t2', column('a'), column('b'))
 
3051
        s = select([t2.c.b]).where(t1.c.a == t2.c.a)
 
3052
        s = s.correlate_except(t2).alias('s')
 
3053
 
 
3054
        s2 = select([func.foo(s.c.b)]).as_scalar()
 
3055
        s3 = select([t1], order_by=s2)
 
3056
 
 
3057
        self.assert_compile(s3,
 
3058
            "SELECT t1.a FROM t1 ORDER BY "
 
3059
            "(SELECT foo(s.b) AS foo_1 FROM "
 
3060
            "(SELECT t2.b AS b FROM t2 WHERE t1.a = t2.a) AS s)"
 
3061
        )
 
3062
 
 
3063
    def test_multilevel_froms_correlation(self):
 
3064
        # new as of #2748
 
3065
        p = table('parent', column('id'))
 
3066
        c = table('child', column('id'), column('parent_id'), column('pos'))
 
3067
 
 
3068
        s = c.select().where(c.c.parent_id == p.c.id).order_by(c.c.pos).limit(1)
 
3069
        s = s.correlate(p)
 
3070
        s = exists().select_from(s).where(s.c.id == 1)
 
3071
        s = select([p]).where(s)
 
3072
        self.assert_compile(s,
 
3073
            "SELECT parent.id FROM parent WHERE EXISTS (SELECT * "
 
3074
            "FROM (SELECT child.id AS id, child.parent_id AS parent_id, "
 
3075
            "child.pos AS pos FROM child WHERE child.parent_id = parent.id "
 
3076
            "ORDER BY child.pos LIMIT :param_1) WHERE id = :id_1)")
 
3077
 
 
3078
    def test_no_contextless_correlate_except(self):
 
3079
        # new as of #2748
 
3080
 
 
3081
        t1 = table('t1', column('x'))
 
3082
        t2 = table('t2', column('y'))
 
3083
        t3 = table('t3', column('z'))
 
3084
 
 
3085
        s = select([t1]).where(t1.c.x == t2.c.y).\
 
3086
            where(t2.c.y == t3.c.z).correlate_except(t1)
 
3087
        self.assert_compile(s,
 
3088
            "SELECT t1.x FROM t1, t2, t3 WHERE t1.x = t2.y AND t2.y = t3.z")
 
3089
 
 
3090
    def test_multilevel_implicit_correlation_disabled(self):
 
3091
        # test that implicit correlation with multilevel WHERE correlation
 
3092
        # behaves like 0.8.1, 0.7 (i.e. doesn't happen)
 
3093
        t1 = table('t1', column('x'))
 
3094
        t2 = table('t2', column('y'))
 
3095
        t3 = table('t3', column('z'))
 
3096
 
 
3097
        s = select([t1.c.x]).where(t1.c.x == t2.c.y)
 
3098
        s2 = select([t3.c.z]).where(t3.c.z == s.as_scalar())
 
3099
        s3 = select([t1]).where(t1.c.x == s2.as_scalar())
 
3100
 
 
3101
        self.assert_compile(s3,
 
3102
            "SELECT t1.x FROM t1 "
 
3103
            "WHERE t1.x = (SELECT t3.z "
 
3104
            "FROM t3 "
 
3105
            "WHERE t3.z = (SELECT t1.x "
 
3106
            "FROM t1, t2 "
 
3107
            "WHERE t1.x = t2.y))"
 
3108
        )
 
3109
 
 
3110
    def test_from_implicit_correlation_disabled(self):
 
3111
        # test that implicit correlation with immediate and
 
3112
        # multilevel FROM clauses behaves like 0.8.1 (i.e. doesn't happen)
 
3113
        t1 = table('t1', column('x'))
 
3114
        t2 = table('t2', column('y'))
 
3115
        t3 = table('t3', column('z'))
 
3116
 
 
3117
        s = select([t1.c.x]).where(t1.c.x == t2.c.y)
 
3118
        s2 = select([t2, s])
 
3119
        s3 = select([t1, s2])
 
3120
 
 
3121
        self.assert_compile(s3,
 
3122
            "SELECT t1.x, y, x FROM t1, "
 
3123
            "(SELECT t2.y AS y, x FROM t2, "
 
3124
            "(SELECT t1.x AS x FROM t1, t2 WHERE t1.x = t2.y))"
 
3125
        )
 
3126
 
3066
3127
class CoercionTest(fixtures.TestBase, AssertsCompiledSQL):
3067
3128
    __dialect__ = 'default'
3068
3129
 
3072
3133
            Column('id', Integer))
3073
3134
 
3074
3135
    def test_null_constant(self):
3075
 
        t = self._fixture()
3076
3136
        self.assert_compile(_literal_as_text(None), "NULL")
3077
3137
 
3078
3138
    def test_false_constant(self):
3079
 
        t = self._fixture()
3080
3139
        self.assert_compile(_literal_as_text(False), "false")
3081
3140
 
3082
3141
    def test_true_constant(self):
3083
 
        t = self._fixture()
3084
3142
        self.assert_compile(_literal_as_text(True), "true")
3085
3143
 
3086
3144
    def test_val_and_false(self):
3160
3218
             {'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type)},
3161
3219
        )
3162
3220
 
 
3221
    def test_label_plus_element(self):
 
3222
        t = Table('t', MetaData(), Column('a', Integer))
 
3223
        l1 = t.c.a.label('bar')
 
3224
        tc = type_coerce(t.c.a, String)
 
3225
        stmt = select([t.c.a, l1, tc])
 
3226
        comp = stmt.compile()
 
3227
        tc_anon_label = comp.result_map['a_1'][1][0]
 
3228
        eq_(
 
3229
            comp.result_map,
 
3230
             {
 
3231
                'a': ('a', (t.c.a, 'a', 'a'), t.c.a.type),
 
3232
                'bar': ('bar', (l1, 'bar'), l1.type),
 
3233
                'a_1': ('%%(%d a)s' % id(tc), (tc_anon_label, 'a_1'), tc.type),
 
3234
             },
 
3235
        )
 
3236
 
3163
3237
    def test_label_conflict_union(self):
3164
 
        t1 = Table('t1', MetaData(), Column('a', Integer), Column('b', Integer))
 
3238
        t1 = Table('t1', MetaData(), Column('a', Integer),
 
3239
                                    Column('b', Integer))
3165
3240
        t2 = Table('t2', MetaData(), Column('t1_a', Integer))
3166
3241
        union = select([t2]).union(select([t2])).alias()
3167
3242
 
3174
3249
            set(['t1_1_b', 't1_1_a', 't1_a', 't1_b'])
3175
3250
        )
3176
3251
        is_(
3177
 
            comp.result_map['t1_a'][1][1], t1.c.a
3178
 
        )
 
 
b'\\ No newline at end of file'
 
3252
            comp.result_map['t1_a'][1][2], t1.c.a
 
3253
        )