1
from test.lib.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
3
1
from sqlalchemy import *
4
from sqlalchemy import exc, sql, util
5
from sqlalchemy.engine import default, base
7
from test.lib.schema import Table, Column
2
from sqlalchemy import testing
8
3
from sqlalchemy.dialects import mysql
4
from sqlalchemy.engine import default
5
from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
6
from sqlalchemy.testing.schema import Table, Column
10
9
class _UpdateFromTestBase(object):
12
11
def define_tables(cls, metadata):
12
Table('mytable', metadata,
13
Column('myid', Integer),
14
Column('name', String(30)),
15
Column('description', String(50)))
16
Table('myothertable', metadata,
17
Column('otherid', Integer),
18
Column('othername', String(30)))
13
19
Table('users', metadata,
14
20
Column('id', Integer, primary_key=True,
15
test_needs_autoincrement=True),
16
Column('name', String(30), nullable=False),
21
test_needs_autoincrement=True),
22
Column('name', String(30), nullable=False))
19
23
Table('addresses', metadata,
20
24
Column('id', Integer, primary_key=True,
21
test_needs_autoincrement=True),
25
test_needs_autoincrement=True),
22
26
Column('user_id', None, ForeignKey('users.id')),
23
27
Column('name', String(30), nullable=False),
24
Column('email_address', String(50), nullable=False),
27
Table("dingalings", metadata,
28
Column('email_address', String(50), nullable=False))
29
Table('dingalings', metadata,
28
30
Column('id', Integer, primary_key=True,
29
test_needs_autoincrement=True),
31
test_needs_autoincrement=True),
30
32
Column('address_id', None, ForeignKey('addresses.id')),
31
Column('data', String(30)),
33
Column('data', String(30)))
46
46
('id', 'user_id', 'name', 'email_address'),
47
(1, 7, 'x', "jack@bean.com"),
48
(2, 8, 'x', "ed@wood.com"),
49
(3, 8, 'x', "ed@bettyboop.com"),
50
(4, 8, 'x', "ed@lala.com"),
51
(5, 9, 'x', "fred@fred.com")
47
(1, 7, 'x', 'jack@bean.com'),
48
(2, 8, 'x', 'ed@wood.com'),
49
(3, 8, 'x', 'ed@bettyboop.com'),
50
(4, 8, 'x', 'ed@lala.com'),
51
(5, 9, 'x', 'fred@fred.com')
54
54
('id', 'address_id', 'data'),
61
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
61
class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
62
__dialect__ = 'default'
64
def test_update_1(self):
65
table1 = self.tables.mytable
68
update(table1, table1.c.myid == 7),
69
'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
70
params={table1.c.name: 'fred'})
72
def test_update_2(self):
73
table1 = self.tables.mytable
77
where(table1.c.myid == 7).
78
values({table1.c.myid: 5}),
79
'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
80
checkparams={'myid': 5, 'myid_1': 7})
82
def test_update_3(self):
83
table1 = self.tables.mytable
86
update(table1, table1.c.myid == 7),
87
'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
88
params={'name': 'fred'})
90
def test_update_4(self):
91
table1 = self.tables.mytable
94
update(table1, values={table1.c.name: table1.c.myid}),
95
'UPDATE mytable SET name=mytable.myid')
97
def test_update_5(self):
98
table1 = self.tables.mytable
102
whereclause=table1.c.name == bindparam('crit'),
103
values={table1.c.name: 'hi'}),
104
'UPDATE mytable SET name=:name WHERE mytable.name = :crit',
105
params={'crit': 'notthere'},
106
checkparams={'crit': 'notthere', 'name': 'hi'})
108
def test_update_6(self):
109
table1 = self.tables.mytable
114
values={table1.c.name: table1.c.myid}),
116
'SET name=mytable.myid, description=:description '
117
'WHERE mytable.myid = :myid_1',
118
params={'description': 'test'},
119
checkparams={'description': 'test', 'myid_1': 12})
121
def test_update_7(self):
122
table1 = self.tables.mytable
125
update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
127
'SET myid=:myid, description=:description '
128
'WHERE mytable.myid = :myid_1',
129
params={'myid_1': 12, 'myid': 9, 'description': 'test'})
131
def test_update_8(self):
132
table1 = self.tables.mytable
135
update(table1, table1.c.myid == 12),
136
'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
137
params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
139
def test_update_9(self):
140
table1 = self.tables.mytable
142
s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
143
c = s.compile(column_keys=['id', 'name'])
146
def test_update_10(self):
147
table1 = self.tables.mytable
149
v1 = {table1.c.name: table1.c.myid}
150
v2 = {table1.c.name: table1.c.name + 'foo'}
152
update(table1, table1.c.myid == 12, values=v1).values(v2),
155
'name=(mytable.name || :name_1), '
156
'description=:description '
157
'WHERE mytable.myid = :myid_1',
158
params={'description': 'test'})
160
def test_update_11(self):
161
table1 = self.tables.mytable
164
table1.c.name: table1.c.name + 'lala',
165
table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
167
self.assert_compile(update(table1,
168
(table1.c.myid == func.hoho(4)) &
169
(table1.c.name == literal('foo') +
170
table1.c.name + literal('lala')),
174
'myid=do_stuff(mytable.myid, :param_1), '
175
'name=(mytable.name || :name_1) '
177
'mytable.myid = hoho(:hoho_1) AND '
178
'mytable.name = :param_2 || mytable.name || :param_3')
180
def test_prefix_with(self):
181
table1 = self.tables.mytable
183
stmt = table1.update().\
184
prefix_with('A', 'B', dialect='mysql').\
185
prefix_with('C', 'D')
187
self.assert_compile(stmt,
188
'UPDATE C D mytable SET myid=:myid, name=:name, '
189
'description=:description')
191
self.assert_compile(stmt,
192
'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s',
193
dialect=mysql.dialect())
195
def test_alias(self):
196
table1 = self.tables.mytable
197
talias1 = table1.alias('t1')
199
self.assert_compile(update(talias1, talias1.c.myid == 7),
200
'UPDATE mytable AS t1 '
202
'WHERE t1.myid = :myid_1',
203
params={table1.c.name: 'fred'})
205
self.assert_compile(update(talias1, table1.c.myid == 7),
206
'UPDATE mytable AS t1 '
209
'WHERE mytable.myid = :myid_1',
210
params={table1.c.name: 'fred'})
212
def test_update_to_expression(self):
213
"""test update from an expression.
215
this logic is triggered currently by a left side that doesn't
216
have a key. The current supported use case is updating the index
217
of a Postgresql ARRAY type.
220
table1 = self.tables.mytable
221
expr = func.foo(table1.c.myid)
222
assert not hasattr(expr, 'key')
223
self.assert_compile(table1.update().values({expr: 'bar'}),
224
'UPDATE mytable SET foo(myid)=:param_1')
226
def test_update_bound_ordering(self):
227
"""test that bound parameters between the UPDATE and FROM clauses
228
order correctly in different SQL compilation scenarios.
231
table1 = self.tables.mytable
232
table2 = self.tables.myothertable
233
sel = select([table2]).where(table2.c.otherid == 5).alias()
234
upd = table1.update().\
235
where(table1.c.name == sel.c.othername).\
238
dialect = default.DefaultDialect()
239
dialect.positional = True
242
"UPDATE mytable SET name=:name FROM (SELECT "
243
"myothertable.otherid AS otherid, "
244
"myothertable.othername AS othername "
246
"WHERE myothertable.otherid = :otherid_1) AS anon_1 "
247
"WHERE mytable.name = anon_1.othername",
248
checkpositional=('foo', 5),
254
"UPDATE mytable, (SELECT myothertable.otherid AS otherid, "
255
"myothertable.othername AS othername "
257
"WHERE myothertable.otherid = %s) AS anon_1 SET mytable.name=%s "
258
"WHERE mytable.name = anon_1.othername",
259
checkpositional=(5, 'foo'),
260
dialect=mysql.dialect()
265
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
62
267
__dialect__ = 'default'
64
269
run_create_tables = run_inserts = run_deletes = None
66
271
def test_render_table(self):
67
272
users, addresses = self.tables.users, self.tables.addresses
68
274
self.assert_compile(
70
values(name='newname').\
71
where(users.c.id==addresses.c.user_id).\
72
where(addresses.c.email_address=='e1'),
73
"UPDATE users SET name=:name FROM addresses "
74
"WHERE users.id = addresses.user_id AND "
75
"addresses.email_address = :email_address_1",
76
checkparams={u'email_address_1': 'e1', 'name': 'newname'}
276
values(name='newname').
277
where(users.c.id == addresses.c.user_id).
278
where(addresses.c.email_address == 'e1'),
280
'SET name=:name FROM addresses '
282
'users.id = addresses.user_id AND '
283
'addresses.email_address = :email_address_1',
284
checkparams={u'email_address_1': 'e1', 'name': 'newname'})
79
286
def test_render_multi_table(self):
80
users, addresses, dingalings = \
82
self.tables.addresses, \
83
self.tables.dingalings
287
users = self.tables.users
288
addresses = self.tables.addresses
289
dingalings = self.tables.dingalings
292
u'email_address_1': 'e1',
84
297
self.assert_compile(
86
values(name='newname').\
87
where(users.c.id==addresses.c.user_id).\
88
where(addresses.c.email_address=='e1').\
89
where(addresses.c.id==dingalings.c.address_id).\
90
where(dingalings.c.id==2),
91
"UPDATE users SET name=:name FROM addresses, "
92
"dingalings WHERE users.id = addresses.user_id "
93
"AND addresses.email_address = :email_address_1 "
94
"AND addresses.id = dingalings.address_id AND "
95
"dingalings.id = :id_1",
96
checkparams={u'email_address_1': 'e1', u'id_1': 2,
299
values(name='newname').
300
where(users.c.id == addresses.c.user_id).
301
where(addresses.c.email_address == 'e1').
302
where(addresses.c.id == dingalings.c.address_id).
303
where(dingalings.c.id == 2),
306
'FROM addresses, dingalings '
308
'users.id = addresses.user_id AND '
309
'addresses.email_address = :email_address_1 AND '
310
'addresses.id = dingalings.address_id AND '
311
'dingalings.id = :id_1',
312
checkparams=checkparams)
100
314
def test_render_table_mysql(self):
101
315
users, addresses = self.tables.users, self.tables.addresses
102
317
self.assert_compile(
104
values(name='newname').\
105
where(users.c.id==addresses.c.user_id).\
106
where(addresses.c.email_address=='e1'),
107
"UPDATE users, addresses SET users.name=%s "
108
"WHERE users.id = addresses.user_id AND "
109
"addresses.email_address = %s",
319
values(name='newname').
320
where(users.c.id == addresses.c.user_id).
321
where(addresses.c.email_address == 'e1'),
322
'UPDATE users, addresses '
325
'users.id = addresses.user_id AND '
326
'addresses.email_address = %s',
110
327
checkparams={u'email_address_1': 'e1', 'name': 'newname'},
111
dialect=mysql.dialect()
328
dialect=mysql.dialect())
114
330
def test_render_subquery(self):
115
331
users, addresses = self.tables.users, self.tables.addresses
116
subq = select([addresses.c.id,
118
addresses.c.email_address]).\
119
where(addresses.c.id==7).alias()
334
u'email_address_1': 'e1',
342
addresses.c.email_address
345
subq = select(cols).where(addresses.c.id == 7).alias()
120
346
self.assert_compile(
122
values(name='newname').\
123
where(users.c.id==subq.c.user_id).\
124
where(subq.c.email_address=='e1'),
125
"UPDATE users SET name=:name FROM "
126
"(SELECT addresses.id AS id, addresses.user_id "
127
"AS user_id, addresses.email_address AS "
128
"email_address FROM addresses WHERE addresses.id = "
129
":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
130
"AND anon_1.email_address = :email_address_1",
131
checkparams={u'email_address_1': 'e1',
132
u'id_1': 7, 'name': 'newname'}
348
values(name='newname').
349
where(users.c.id == subq.c.user_id).
350
where(subq.c.email_address == 'e1'),
352
'SET name=:name FROM ('
354
'addresses.id AS id, '
355
'addresses.user_id AS user_id, '
356
'addresses.email_address AS email_address '
358
'WHERE addresses.id = :id_1'
360
'WHERE users.id = anon_1.user_id '
361
'AND anon_1.email_address = :email_address_1',
362
checkparams=checkparams)
135
365
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
137
367
@testing.requires.update_from
138
368
def test_exec_two_table(self):
139
369
users, addresses = self.tables.users, self.tables.addresses
140
371
testing.db.execute(
142
values(email_address=users.c.name).\
143
where(users.c.id==addresses.c.user_id).\
144
where(users.c.name=='ed')
149
order_by(addresses.c.id)).fetchall(),
151
(1, 7, 'x', "jack@bean.com"),
155
(5, 9, 'x', "fred@fred.com")
373
values(email_address=users.c.name).
374
where(users.c.id == addresses.c.user_id).
375
where(users.c.name == 'ed'))
378
(1, 7, 'x', 'jack@bean.com'),
382
(5, 9, 'x', 'fred@fred.com')]
383
self._assert_addresses(addresses, expected)
159
385
@testing.requires.update_from
160
386
def test_exec_two_table_plus_alias(self):
161
387
users, addresses = self.tables.users, self.tables.addresses
162
389
a1 = addresses.alias()
164
390
testing.db.execute(
166
values(email_address=users.c.name).\
167
where(users.c.id==a1.c.user_id).\
168
where(users.c.name=='ed').\
169
where(a1.c.id==addresses.c.id)
174
order_by(addresses.c.id)).fetchall(),
176
(1, 7, 'x', "jack@bean.com"),
180
(5, 9, 'x', "fred@fred.com")
392
values(email_address=users.c.name).
393
where(users.c.id == a1.c.user_id).
394
where(users.c.name == 'ed').
395
where(a1.c.id == addresses.c.id)
399
(1, 7, 'x', 'jack@bean.com'),
403
(5, 9, 'x', 'fred@fred.com')]
404
self._assert_addresses(addresses, expected)
184
406
@testing.requires.update_from
185
407
def test_exec_three_table(self):
186
users, addresses, dingalings = \
188
self.tables.addresses, \
189
self.tables.dingalings
408
users = self.tables.users
409
addresses = self.tables.addresses
410
dingalings = self.tables.dingalings
190
412
testing.db.execute(
192
values(email_address=users.c.name).\
193
where(users.c.id==addresses.c.user_id).\
194
where(users.c.name=='ed').
195
where(addresses.c.id==dingalings.c.address_id).\
196
where(dingalings.c.id==1),
200
addresses.select().order_by(addresses.c.id)
203
(1, 7, 'x', "jack@bean.com"),
205
(3, 8, 'x', "ed@bettyboop.com"),
206
(4, 8, 'x', "ed@lala.com"),
207
(5, 9, 'x', "fred@fred.com")
414
values(email_address=users.c.name).
415
where(users.c.id == addresses.c.user_id).
416
where(users.c.name == 'ed').
417
where(addresses.c.id == dingalings.c.address_id).
418
where(dingalings.c.id == 1))
421
(1, 7, 'x', 'jack@bean.com'),
423
(3, 8, 'x', 'ed@bettyboop.com'),
424
(4, 8, 'x', 'ed@lala.com'),
425
(5, 9, 'x', 'fred@fred.com')]
426
self._assert_addresses(addresses, expected)
211
428
@testing.only_on('mysql', 'Multi table update')
212
429
def test_exec_multitable(self):
213
430
users, addresses = self.tables.users, self.tables.addresses
433
addresses.c.email_address: users.c.name,
214
437
testing.db.execute(
217
addresses.c.email_address:users.c.name,
220
where(users.c.id==addresses.c.user_id).\
221
where(users.c.name=='ed')
225
addresses.select().order_by(addresses.c.id)).fetchall(),
227
(1, 7, 'x', "jack@bean.com"),
231
(5, 9, 'x', "fred@fred.com")
236
users.select().order_by(users.c.id)).fetchall(),
245
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
440
where(users.c.id == addresses.c.user_id).
441
where(users.c.name == 'ed'))
444
(1, 7, 'x', 'jack@bean.com'),
448
(5, 9, 'x', 'fred@fred.com')]
449
self._assert_addresses(addresses, expected)
456
self._assert_users(users, expected)
458
def _assert_addresses(self, addresses, expected):
459
stmt = addresses.select().order_by(addresses.c.id)
460
eq_(testing.db.execute(stmt).fetchall(), expected)
462
def _assert_users(self, users, expected):
463
stmt = users.select().order_by(users.c.id)
464
eq_(testing.db.execute(stmt).fetchall(), expected)
467
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
468
fixtures.TablesTest):
247
470
def define_tables(cls, metadata):
248
471
Table('users', metadata,
249
472
Column('id', Integer, primary_key=True,
250
test_needs_autoincrement=True),
473
test_needs_autoincrement=True),
251
474
Column('name', String(30), nullable=False),
252
Column('some_update', String(30), onupdate="im the update")
475
Column('some_update', String(30), onupdate='im the update'))
255
477
Table('addresses', metadata,
256
478
Column('id', Integer, primary_key=True,
257
test_needs_autoincrement=True),
479
test_needs_autoincrement=True),
258
480
Column('user_id', None, ForeignKey('users.id')),
259
Column('email_address', String(50), nullable=False),
481
Column('email_address', String(50), nullable=False))
263
484
def fixtures(cls):
266
487
('id', 'name', 'some_update'),
267
488
(8, 'ed', 'value'),
268
489
(9, 'fred', 'value'),
272
492
('id', 'user_id', 'email_address'),
273
(2, 8, "ed@wood.com"),
274
(3, 8, "ed@bettyboop.com"),
275
(4, 9, "fred@fred.com")
493
(2, 8, 'ed@wood.com'),
494
(3, 8, 'ed@bettyboop.com'),
495
(4, 9, 'fred@fred.com')
279
499
@testing.only_on('mysql', 'Multi table update')
280
500
def test_defaults_second_table(self):
281
501
users, addresses = self.tables.users, self.tables.addresses
504
addresses.c.email_address: users.c.name,
282
508
ret = testing.db.execute(
285
addresses.c.email_address:users.c.name,
288
where(users.c.id==addresses.c.user_id).\
289
where(users.c.name=='ed')
292
set(ret.prefetch_cols()),
293
set([users.c.some_update])
297
addresses.select().order_by(addresses.c.id)).fetchall(),
301
(4, 9, "fred@fred.com")
306
users.select().order_by(users.c.id)).fetchall(),
308
(8, 'ed2', 'im the update'),
309
(9, 'fred', 'value'),
511
where(users.c.id == addresses.c.user_id).
512
where(users.c.name == 'ed'))
514
eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
519
(4, 9, 'fred@fred.com')]
520
self._assert_addresses(addresses, expected)
523
(8, 'ed2', 'im the update'),
524
(9, 'fred', 'value')]
525
self._assert_users(users, expected)
313
527
@testing.only_on('mysql', 'Multi table update')
314
528
def test_no_defaults_second_table(self):
315
529
users, addresses = self.tables.users, self.tables.addresses
316
531
ret = testing.db.execute(
319
'email_address':users.c.name,
321
where(users.c.id==addresses.c.user_id).\
322
where(users.c.name=='ed')
325
ret.prefetch_cols(),[]
329
addresses.select().order_by(addresses.c.id)).fetchall(),
333
(4, 9, "fred@fred.com")
336
# users table not actually updated,
340
users.select().order_by(users.c.id)).fetchall(),
343
(9, 'fred', 'value'),
533
values({'email_address': users.c.name}).
534
where(users.c.id == addresses.c.user_id).
535
where(users.c.name == 'ed'))
537
eq_(ret.prefetch_cols(), [])
542
(4, 9, 'fred@fred.com')]
543
self._assert_addresses(addresses, expected)
545
# users table not actually updated, so no onupdate
548
(9, 'fred', 'value')]
549
self._assert_users(users, expected)
551
def _assert_addresses(self, addresses, expected):
552
stmt = addresses.select().order_by(addresses.c.id)
553
eq_(testing.db.execute(stmt).fetchall(), expected)
555
def _assert_users(self, users, expected):
556
stmt = users.select().order_by(users.c.id)
557
eq_(testing.db.execute(stmt).fetchall(), expected)