~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/sql/test_update.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL
2
 
import datetime
3
1
from sqlalchemy import *
4
 
from sqlalchemy import exc, sql, util
5
 
from sqlalchemy.engine import default, base
6
 
from test.lib import *
7
 
from test.lib.schema import Table, Column
 
2
from sqlalchemy import testing
8
3
from sqlalchemy.dialects import mysql
 
4
from sqlalchemy.engine import default
 
5
from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures
 
6
from sqlalchemy.testing.schema import Table, Column
 
7
 
9
8
 
10
9
class _UpdateFromTestBase(object):
11
10
    @classmethod
12
11
    def define_tables(cls, metadata):
 
12
        Table('mytable', metadata,
 
13
              Column('myid', Integer),
 
14
              Column('name', String(30)),
 
15
              Column('description', String(50)))
 
16
        Table('myothertable', metadata,
 
17
              Column('otherid', Integer),
 
18
              Column('othername', String(30)))
13
19
        Table('users', metadata,
14
20
              Column('id', Integer, primary_key=True,
15
 
                            test_needs_autoincrement=True),
16
 
              Column('name', String(30), nullable=False),
17
 
        )
18
 
 
 
21
                     test_needs_autoincrement=True),
 
22
              Column('name', String(30), nullable=False))
19
23
        Table('addresses', metadata,
20
24
              Column('id', Integer, primary_key=True,
21
 
                            test_needs_autoincrement=True),
 
25
                     test_needs_autoincrement=True),
22
26
              Column('user_id', None, ForeignKey('users.id')),
23
27
              Column('name', String(30), nullable=False),
24
 
              Column('email_address', String(50), nullable=False),
25
 
        )
26
 
 
27
 
        Table("dingalings", metadata,
 
28
              Column('email_address', String(50), nullable=False))
 
29
        Table('dingalings', metadata,
28
30
              Column('id', Integer, primary_key=True,
29
 
                            test_needs_autoincrement=True),
 
31
                     test_needs_autoincrement=True),
30
32
              Column('address_id', None, ForeignKey('addresses.id')),
31
 
              Column('data', String(30)),
32
 
        )
 
33
              Column('data', String(30)))
33
34
 
34
35
    @classmethod
35
36
    def fixtures(cls):
36
37
        return dict(
37
 
            users = (
 
38
            users=(
38
39
                ('id', 'name'),
39
40
                (7, 'jack'),
40
41
                (8, 'ed'),
41
42
                (9, 'fred'),
42
43
                (10, 'chuck')
43
44
            ),
44
 
 
45
45
            addresses = (
46
46
                ('id', 'user_id', 'name', 'email_address'),
47
 
                (1, 7, 'x', "jack@bean.com"),
48
 
                (2, 8, 'x', "ed@wood.com"),
49
 
                (3, 8, 'x', "ed@bettyboop.com"),
50
 
                (4, 8, 'x', "ed@lala.com"),
51
 
                (5, 9, 'x', "fred@fred.com")
 
47
                (1, 7, 'x', 'jack@bean.com'),
 
48
                (2, 8, 'x', 'ed@wood.com'),
 
49
                (3, 8, 'x', 'ed@bettyboop.com'),
 
50
                (4, 8, 'x', 'ed@lala.com'),
 
51
                (5, 9, 'x', 'fred@fred.com')
52
52
            ),
53
53
            dingalings = (
54
54
                ('id', 'address_id', 'data'),
58
58
        )
59
59
 
60
60
 
61
 
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
 
61
class UpdateTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL):
 
62
    __dialect__ = 'default'
 
63
 
 
64
    def test_update_1(self):
 
65
        table1 = self.tables.mytable
 
66
 
 
67
        self.assert_compile(
 
68
            update(table1, table1.c.myid == 7),
 
69
            'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
 
70
            params={table1.c.name: 'fred'})
 
71
 
 
72
    def test_update_2(self):
 
73
        table1 = self.tables.mytable
 
74
 
 
75
        self.assert_compile(
 
76
            table1.update().
 
77
                where(table1.c.myid == 7).
 
78
                values({table1.c.myid: 5}),
 
79
            'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
 
80
            checkparams={'myid': 5, 'myid_1': 7})
 
81
 
 
82
    def test_update_3(self):
 
83
        table1 = self.tables.mytable
 
84
 
 
85
        self.assert_compile(
 
86
            update(table1, table1.c.myid == 7),
 
87
            'UPDATE mytable SET name=:name WHERE mytable.myid = :myid_1',
 
88
            params={'name': 'fred'})
 
89
 
 
90
    def test_update_4(self):
 
91
        table1 = self.tables.mytable
 
92
 
 
93
        self.assert_compile(
 
94
            update(table1, values={table1.c.name: table1.c.myid}),
 
95
            'UPDATE mytable SET name=mytable.myid')
 
96
 
 
97
    def test_update_5(self):
 
98
        table1 = self.tables.mytable
 
99
 
 
100
        self.assert_compile(
 
101
            update(table1,
 
102
                whereclause=table1.c.name == bindparam('crit'),
 
103
                values={table1.c.name: 'hi'}),
 
104
            'UPDATE mytable SET name=:name WHERE mytable.name = :crit',
 
105
            params={'crit': 'notthere'},
 
106
            checkparams={'crit': 'notthere', 'name': 'hi'})
 
107
 
 
108
    def test_update_6(self):
 
109
        table1 = self.tables.mytable
 
110
 
 
111
        self.assert_compile(
 
112
            update(table1,
 
113
                table1.c.myid == 12,
 
114
                values={table1.c.name: table1.c.myid}),
 
115
            'UPDATE mytable '
 
116
            'SET name=mytable.myid, description=:description '
 
117
            'WHERE mytable.myid = :myid_1',
 
118
            params={'description': 'test'},
 
119
            checkparams={'description': 'test', 'myid_1': 12})
 
120
 
 
121
    def test_update_7(self):
 
122
        table1 = self.tables.mytable
 
123
 
 
124
        self.assert_compile(
 
125
            update(table1, table1.c.myid == 12, values={table1.c.myid: 9}),
 
126
            'UPDATE mytable '
 
127
            'SET myid=:myid, description=:description '
 
128
            'WHERE mytable.myid = :myid_1',
 
129
            params={'myid_1': 12, 'myid': 9, 'description': 'test'})
 
130
 
 
131
    def test_update_8(self):
 
132
        table1 = self.tables.mytable
 
133
 
 
134
        self.assert_compile(
 
135
            update(table1, table1.c.myid == 12),
 
136
            'UPDATE mytable SET myid=:myid WHERE mytable.myid = :myid_1',
 
137
            params={'myid': 18}, checkparams={'myid': 18, 'myid_1': 12})
 
138
 
 
139
    def test_update_9(self):
 
140
        table1 = self.tables.mytable
 
141
 
 
142
        s = table1.update(table1.c.myid == 12, values={table1.c.name: 'lala'})
 
143
        c = s.compile(column_keys=['id', 'name'])
 
144
        eq_(str(s), str(c))
 
145
 
 
146
    def test_update_10(self):
 
147
        table1 = self.tables.mytable
 
148
 
 
149
        v1 = {table1.c.name: table1.c.myid}
 
150
        v2 = {table1.c.name: table1.c.name + 'foo'}
 
151
        self.assert_compile(
 
152
            update(table1, table1.c.myid == 12, values=v1).values(v2),
 
153
            'UPDATE mytable '
 
154
            'SET '
 
155
                'name=(mytable.name || :name_1), '
 
156
                'description=:description '
 
157
            'WHERE mytable.myid = :myid_1',
 
158
            params={'description': 'test'})
 
159
 
 
160
    def test_update_11(self):
 
161
        table1 = self.tables.mytable
 
162
 
 
163
        values = {
 
164
            table1.c.name: table1.c.name + 'lala',
 
165
            table1.c.myid: func.do_stuff(table1.c.myid, literal('hoho'))
 
166
        }
 
167
        self.assert_compile(update(table1,
 
168
            (table1.c.myid == func.hoho(4)) &
 
169
            (table1.c.name == literal('foo') +
 
170
             table1.c.name + literal('lala')),
 
171
            values=values),
 
172
            'UPDATE mytable '
 
173
            'SET '
 
174
                'myid=do_stuff(mytable.myid, :param_1), '
 
175
                'name=(mytable.name || :name_1) '
 
176
            'WHERE '
 
177
                'mytable.myid = hoho(:hoho_1) AND '
 
178
                'mytable.name = :param_2 || mytable.name || :param_3')
 
179
 
 
180
    def test_prefix_with(self):
 
181
        table1 = self.tables.mytable
 
182
 
 
183
        stmt = table1.update().\
 
184
            prefix_with('A', 'B', dialect='mysql').\
 
185
            prefix_with('C', 'D')
 
186
 
 
187
        self.assert_compile(stmt,
 
188
            'UPDATE C D mytable SET myid=:myid, name=:name, '
 
189
            'description=:description')
 
190
 
 
191
        self.assert_compile(stmt,
 
192
            'UPDATE A B C D mytable SET myid=%s, name=%s, description=%s',
 
193
            dialect=mysql.dialect())
 
194
 
 
195
    def test_alias(self):
 
196
        table1 = self.tables.mytable
 
197
        talias1 = table1.alias('t1')
 
198
 
 
199
        self.assert_compile(update(talias1, talias1.c.myid == 7),
 
200
            'UPDATE mytable AS t1 '
 
201
            'SET name=:name '
 
202
            'WHERE t1.myid = :myid_1',
 
203
            params={table1.c.name: 'fred'})
 
204
 
 
205
        self.assert_compile(update(talias1, table1.c.myid == 7),
 
206
            'UPDATE mytable AS t1 '
 
207
            'SET name=:name '
 
208
            'FROM mytable '
 
209
            'WHERE mytable.myid = :myid_1',
 
210
            params={table1.c.name: 'fred'})
 
211
 
 
212
    def test_update_to_expression(self):
 
213
        """test update from an expression.
 
214
 
 
215
        this logic is triggered currently by a left side that doesn't
 
216
        have a key.  The current supported use case is updating the index
 
217
        of a Postgresql ARRAY type.
 
218
 
 
219
        """
 
220
        table1 = self.tables.mytable
 
221
        expr = func.foo(table1.c.myid)
 
222
        assert not hasattr(expr, 'key')
 
223
        self.assert_compile(table1.update().values({expr: 'bar'}),
 
224
            'UPDATE mytable SET foo(myid)=:param_1')
 
225
 
 
226
    def test_update_bound_ordering(self):
 
227
        """test that bound parameters between the UPDATE and FROM clauses
 
228
        order correctly in different SQL compilation scenarios.
 
229
 
 
230
        """
 
231
        table1 = self.tables.mytable
 
232
        table2 = self.tables.myothertable
 
233
        sel = select([table2]).where(table2.c.otherid == 5).alias()
 
234
        upd = table1.update().\
 
235
                    where(table1.c.name == sel.c.othername).\
 
236
                    values(name='foo')
 
237
 
 
238
        dialect = default.DefaultDialect()
 
239
        dialect.positional = True
 
240
        self.assert_compile(
 
241
            upd,
 
242
            "UPDATE mytable SET name=:name FROM (SELECT "
 
243
            "myothertable.otherid AS otherid, "
 
244
            "myothertable.othername AS othername "
 
245
            "FROM myothertable "
 
246
            "WHERE myothertable.otherid = :otherid_1) AS anon_1 "
 
247
            "WHERE mytable.name = anon_1.othername",
 
248
            checkpositional=('foo', 5),
 
249
            dialect=dialect
 
250
        )
 
251
 
 
252
        self.assert_compile(
 
253
            upd,
 
254
            "UPDATE mytable, (SELECT myothertable.otherid AS otherid, "
 
255
            "myothertable.othername AS othername "
 
256
            "FROM myothertable "
 
257
            "WHERE myothertable.otherid = %s) AS anon_1 SET mytable.name=%s "
 
258
            "WHERE mytable.name = anon_1.othername",
 
259
            checkpositional=(5, 'foo'),
 
260
            dialect=mysql.dialect()
 
261
        )
 
262
 
 
263
 
 
264
 
 
265
class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest,
 
266
                            AssertsCompiledSQL):
62
267
    __dialect__ = 'default'
63
268
 
64
269
    run_create_tables = run_inserts = run_deletes = None
65
270
 
66
271
    def test_render_table(self):
67
272
        users, addresses = self.tables.users, self.tables.addresses
 
273
 
68
274
        self.assert_compile(
69
 
            users.update().\
70
 
                values(name='newname').\
71
 
                where(users.c.id==addresses.c.user_id).\
72
 
                where(addresses.c.email_address=='e1'),
73
 
            "UPDATE users SET name=:name FROM addresses "
74
 
            "WHERE users.id = addresses.user_id AND "
75
 
            "addresses.email_address = :email_address_1",
76
 
            checkparams={u'email_address_1': 'e1', 'name': 'newname'}
77
 
        )
 
275
            users.update().
 
276
                values(name='newname').
 
277
                where(users.c.id == addresses.c.user_id).
 
278
                where(addresses.c.email_address == 'e1'),
 
279
            'UPDATE users '
 
280
            'SET name=:name FROM addresses '
 
281
            'WHERE '
 
282
                'users.id = addresses.user_id AND '
 
283
                'addresses.email_address = :email_address_1',
 
284
            checkparams={u'email_address_1': 'e1', 'name': 'newname'})
78
285
 
79
286
    def test_render_multi_table(self):
80
 
        users, addresses, dingalings = \
81
 
                self.tables.users, \
82
 
                self.tables.addresses, \
83
 
                self.tables.dingalings
 
287
        users = self.tables.users
 
288
        addresses = self.tables.addresses
 
289
        dingalings = self.tables.dingalings
 
290
 
 
291
        checkparams = {
 
292
            u'email_address_1': 'e1',
 
293
            u'id_1': 2,
 
294
            'name': 'newname'
 
295
        }
 
296
 
84
297
        self.assert_compile(
85
 
            users.update().\
86
 
                values(name='newname').\
87
 
                where(users.c.id==addresses.c.user_id).\
88
 
                where(addresses.c.email_address=='e1').\
89
 
                where(addresses.c.id==dingalings.c.address_id).\
90
 
                where(dingalings.c.id==2),
91
 
            "UPDATE users SET name=:name FROM addresses, "
92
 
            "dingalings WHERE users.id = addresses.user_id "
93
 
            "AND addresses.email_address = :email_address_1 "
94
 
            "AND addresses.id = dingalings.address_id AND "
95
 
            "dingalings.id = :id_1",
96
 
            checkparams={u'email_address_1': 'e1', u'id_1': 2,
97
 
                                'name': 'newname'}
98
 
        )
 
298
            users.update().
 
299
                values(name='newname').
 
300
                where(users.c.id == addresses.c.user_id).
 
301
                where(addresses.c.email_address == 'e1').
 
302
                where(addresses.c.id == dingalings.c.address_id).
 
303
                where(dingalings.c.id == 2),
 
304
            'UPDATE users '
 
305
            'SET name=:name '
 
306
            'FROM addresses, dingalings '
 
307
            'WHERE '
 
308
                'users.id = addresses.user_id AND '
 
309
                'addresses.email_address = :email_address_1 AND '
 
310
                'addresses.id = dingalings.address_id AND '
 
311
                'dingalings.id = :id_1',
 
312
            checkparams=checkparams)
99
313
 
100
314
    def test_render_table_mysql(self):
101
315
        users, addresses = self.tables.users, self.tables.addresses
 
316
 
102
317
        self.assert_compile(
103
 
            users.update().\
104
 
                values(name='newname').\
105
 
                where(users.c.id==addresses.c.user_id).\
106
 
                where(addresses.c.email_address=='e1'),
107
 
            "UPDATE users, addresses SET users.name=%s "
108
 
            "WHERE users.id = addresses.user_id AND "
109
 
            "addresses.email_address = %s",
 
318
            users.update().
 
319
                values(name='newname').
 
320
                where(users.c.id == addresses.c.user_id).
 
321
                where(addresses.c.email_address == 'e1'),
 
322
            'UPDATE users, addresses '
 
323
            'SET users.name=%s '
 
324
            'WHERE '
 
325
                'users.id = addresses.user_id AND '
 
326
                'addresses.email_address = %s',
110
327
            checkparams={u'email_address_1': 'e1', 'name': 'newname'},
111
 
            dialect=mysql.dialect()
112
 
        )
 
328
            dialect=mysql.dialect())
113
329
 
114
330
    def test_render_subquery(self):
115
331
        users, addresses = self.tables.users, self.tables.addresses
116
 
        subq = select([addresses.c.id,
117
 
                        addresses.c.user_id,
118
 
                        addresses.c.email_address]).\
119
 
                            where(addresses.c.id==7).alias()
 
332
 
 
333
        checkparams = {
 
334
            u'email_address_1': 'e1',
 
335
            u'id_1': 7,
 
336
            'name': 'newname'
 
337
        }
 
338
 
 
339
        cols = [
 
340
            addresses.c.id,
 
341
            addresses.c.user_id,
 
342
            addresses.c.email_address
 
343
        ]
 
344
 
 
345
        subq = select(cols).where(addresses.c.id == 7).alias()
120
346
        self.assert_compile(
121
 
            users.update().\
122
 
                values(name='newname').\
123
 
                where(users.c.id==subq.c.user_id).\
124
 
                where(subq.c.email_address=='e1'),
125
 
            "UPDATE users SET name=:name FROM "
126
 
            "(SELECT addresses.id AS id, addresses.user_id "
127
 
            "AS user_id, addresses.email_address AS "
128
 
            "email_address FROM addresses WHERE addresses.id = "
129
 
            ":id_1) AS anon_1 WHERE users.id = anon_1.user_id "
130
 
            "AND anon_1.email_address = :email_address_1",
131
 
            checkparams={u'email_address_1': 'e1',
132
 
                            u'id_1': 7, 'name': 'newname'}
133
 
        )
 
347
            users.update().
 
348
                values(name='newname').
 
349
                where(users.c.id == subq.c.user_id).
 
350
                where(subq.c.email_address == 'e1'),
 
351
            'UPDATE users '
 
352
            'SET name=:name FROM ('
 
353
                'SELECT '
 
354
                    'addresses.id AS id, '
 
355
                    'addresses.user_id AS user_id, '
 
356
                    'addresses.email_address AS email_address '
 
357
                'FROM addresses '
 
358
                'WHERE addresses.id = :id_1'
 
359
            ') AS anon_1 '
 
360
            'WHERE users.id = anon_1.user_id '
 
361
            'AND anon_1.email_address = :email_address_1',
 
362
            checkparams=checkparams)
 
363
 
134
364
 
135
365
class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest):
136
366
 
137
367
    @testing.requires.update_from
138
368
    def test_exec_two_table(self):
139
369
        users, addresses = self.tables.users, self.tables.addresses
 
370
 
140
371
        testing.db.execute(
141
 
            addresses.update().\
142
 
                values(email_address=users.c.name).\
143
 
                where(users.c.id==addresses.c.user_id).\
144
 
                where(users.c.name=='ed')
145
 
        )
146
 
        eq_(
147
 
            testing.db.execute(
148
 
                addresses.select().\
149
 
                    order_by(addresses.c.id)).fetchall(),
150
 
            [
151
 
                (1, 7, 'x', "jack@bean.com"),
152
 
                (2, 8, 'x', "ed"),
153
 
                (3, 8, 'x', "ed"),
154
 
                (4, 8, 'x', "ed"),
155
 
                (5, 9, 'x', "fred@fred.com")
156
 
            ]
157
 
        )
 
372
            addresses.update().
 
373
                values(email_address=users.c.name).
 
374
                where(users.c.id == addresses.c.user_id).
 
375
                where(users.c.name == 'ed'))
 
376
 
 
377
        expected = [
 
378
            (1, 7, 'x', 'jack@bean.com'),
 
379
            (2, 8, 'x', 'ed'),
 
380
            (3, 8, 'x', 'ed'),
 
381
            (4, 8, 'x', 'ed'),
 
382
            (5, 9, 'x', 'fred@fred.com')]
 
383
        self._assert_addresses(addresses, expected)
158
384
 
159
385
    @testing.requires.update_from
160
386
    def test_exec_two_table_plus_alias(self):
161
387
        users, addresses = self.tables.users, self.tables.addresses
 
388
 
162
389
        a1 = addresses.alias()
163
 
 
164
390
        testing.db.execute(
165
 
            addresses.update().\
166
 
                values(email_address=users.c.name).\
167
 
                where(users.c.id==a1.c.user_id).\
168
 
                where(users.c.name=='ed').\
169
 
                where(a1.c.id==addresses.c.id)
170
 
        )
171
 
        eq_(
172
 
            testing.db.execute(
173
 
                addresses.select().\
174
 
                    order_by(addresses.c.id)).fetchall(),
175
 
            [
176
 
                (1, 7, 'x', "jack@bean.com"),
177
 
                (2, 8, 'x', "ed"),
178
 
                (3, 8, 'x', "ed"),
179
 
                (4, 8, 'x', "ed"),
180
 
                (5, 9, 'x', "fred@fred.com")
181
 
            ]
182
 
        )
 
391
            addresses.update().
 
392
                values(email_address=users.c.name).
 
393
                where(users.c.id == a1.c.user_id).
 
394
                where(users.c.name == 'ed').
 
395
                where(a1.c.id == addresses.c.id)
 
396
        )
 
397
 
 
398
        expected = [
 
399
            (1, 7, 'x', 'jack@bean.com'),
 
400
            (2, 8, 'x', 'ed'),
 
401
            (3, 8, 'x', 'ed'),
 
402
            (4, 8, 'x', 'ed'),
 
403
            (5, 9, 'x', 'fred@fred.com')]
 
404
        self._assert_addresses(addresses, expected)
183
405
 
184
406
    @testing.requires.update_from
185
407
    def test_exec_three_table(self):
186
 
        users, addresses, dingalings = \
187
 
                self.tables.users, \
188
 
                self.tables.addresses, \
189
 
                self.tables.dingalings
 
408
        users = self.tables.users
 
409
        addresses = self.tables.addresses
 
410
        dingalings = self.tables.dingalings
 
411
 
190
412
        testing.db.execute(
191
 
            addresses.update().\
192
 
                values(email_address=users.c.name).\
193
 
                where(users.c.id==addresses.c.user_id).\
194
 
                where(users.c.name=='ed').
195
 
                where(addresses.c.id==dingalings.c.address_id).\
196
 
                where(dingalings.c.id==1),
197
 
        )
198
 
        eq_(
199
 
            testing.db.execute(
200
 
                    addresses.select().order_by(addresses.c.id)
201
 
                ).fetchall(),
202
 
            [
203
 
                (1, 7, 'x', "jack@bean.com"),
204
 
                (2, 8, 'x', "ed"),
205
 
                (3, 8, 'x', "ed@bettyboop.com"),
206
 
                (4, 8, 'x', "ed@lala.com"),
207
 
                (5, 9, 'x', "fred@fred.com")
208
 
            ]
209
 
        )
 
413
            addresses.update().
 
414
                values(email_address=users.c.name).
 
415
                where(users.c.id == addresses.c.user_id).
 
416
                where(users.c.name == 'ed').
 
417
                where(addresses.c.id == dingalings.c.address_id).
 
418
                where(dingalings.c.id == 1))
 
419
 
 
420
        expected = [
 
421
            (1, 7, 'x', 'jack@bean.com'),
 
422
            (2, 8, 'x', 'ed'),
 
423
            (3, 8, 'x', 'ed@bettyboop.com'),
 
424
            (4, 8, 'x', 'ed@lala.com'),
 
425
            (5, 9, 'x', 'fred@fred.com')]
 
426
        self._assert_addresses(addresses, expected)
210
427
 
211
428
    @testing.only_on('mysql', 'Multi table update')
212
429
    def test_exec_multitable(self):
213
430
        users, addresses = self.tables.users, self.tables.addresses
 
431
 
 
432
        values = {
 
433
            addresses.c.email_address: users.c.name,
 
434
            users.c.name: 'ed2'
 
435
        }
 
436
 
214
437
        testing.db.execute(
215
 
            addresses.update().\
216
 
                values({
217
 
                        addresses.c.email_address:users.c.name,
218
 
                        users.c.name:'ed2'
219
 
                }).\
220
 
                where(users.c.id==addresses.c.user_id).\
221
 
                where(users.c.name=='ed')
222
 
        )
223
 
        eq_(
224
 
            testing.db.execute(
225
 
                addresses.select().order_by(addresses.c.id)).fetchall(),
226
 
            [
227
 
                (1, 7, 'x', "jack@bean.com"),
228
 
                (2, 8, 'x', "ed"),
229
 
                (3, 8, 'x', "ed"),
230
 
                (4, 8, 'x', "ed"),
231
 
                (5, 9, 'x', "fred@fred.com")
232
 
            ]
233
 
        )
234
 
        eq_(
235
 
            testing.db.execute(
236
 
                users.select().order_by(users.c.id)).fetchall(),
237
 
            [
238
 
                (7, 'jack'),
239
 
                (8, 'ed2'),
240
 
                (9, 'fred'),
241
 
                (10, 'chuck')
242
 
            ]
243
 
        )
244
 
 
245
 
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest):
 
438
            addresses.update().
 
439
                values(values).
 
440
                where(users.c.id == addresses.c.user_id).
 
441
                where(users.c.name == 'ed'))
 
442
 
 
443
        expected = [
 
444
            (1, 7, 'x', 'jack@bean.com'),
 
445
            (2, 8, 'x', 'ed'),
 
446
            (3, 8, 'x', 'ed'),
 
447
            (4, 8, 'x', 'ed'),
 
448
            (5, 9, 'x', 'fred@fred.com')]
 
449
        self._assert_addresses(addresses, expected)
 
450
 
 
451
        expected = [
 
452
            (7, 'jack'),
 
453
            (8, 'ed2'),
 
454
            (9, 'fred'),
 
455
            (10, 'chuck')]
 
456
        self._assert_users(users, expected)
 
457
 
 
458
    def _assert_addresses(self, addresses, expected):
 
459
        stmt = addresses.select().order_by(addresses.c.id)
 
460
        eq_(testing.db.execute(stmt).fetchall(), expected)
 
461
 
 
462
    def _assert_users(self, users, expected):
 
463
        stmt = users.select().order_by(users.c.id)
 
464
        eq_(testing.db.execute(stmt).fetchall(), expected)
 
465
 
 
466
 
 
467
class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase,
 
468
                                             fixtures.TablesTest):
246
469
    @classmethod
247
470
    def define_tables(cls, metadata):
248
471
        Table('users', metadata,
249
472
              Column('id', Integer, primary_key=True,
250
 
                            test_needs_autoincrement=True),
 
473
                     test_needs_autoincrement=True),
251
474
              Column('name', String(30), nullable=False),
252
 
              Column('some_update', String(30), onupdate="im the update")
253
 
        )
 
475
              Column('some_update', String(30), onupdate='im the update'))
254
476
 
255
477
        Table('addresses', metadata,
256
478
              Column('id', Integer, primary_key=True,
257
 
                            test_needs_autoincrement=True),
 
479
                     test_needs_autoincrement=True),
258
480
              Column('user_id', None, ForeignKey('users.id')),
259
 
              Column('email_address', String(50), nullable=False),
260
 
        )
 
481
              Column('email_address', String(50), nullable=False))
261
482
 
262
483
    @classmethod
263
484
    def fixtures(cls):
264
485
        return dict(
265
 
            users = (
 
486
            users=(
266
487
                ('id', 'name', 'some_update'),
267
488
                (8, 'ed', 'value'),
268
489
                (9, 'fred', 'value'),
269
490
            ),
270
 
 
271
 
            addresses = (
 
491
            addresses=(
272
492
                ('id', 'user_id', 'email_address'),
273
 
                (2, 8, "ed@wood.com"),
274
 
                (3, 8, "ed@bettyboop.com"),
275
 
                (4, 9, "fred@fred.com")
 
493
                (2, 8, 'ed@wood.com'),
 
494
                (3, 8, 'ed@bettyboop.com'),
 
495
                (4, 9, 'fred@fred.com')
276
496
            ),
277
497
        )
278
498
 
279
499
    @testing.only_on('mysql', 'Multi table update')
280
500
    def test_defaults_second_table(self):
281
501
        users, addresses = self.tables.users, self.tables.addresses
 
502
 
 
503
        values = {
 
504
            addresses.c.email_address: users.c.name,
 
505
            users.c.name: 'ed2'
 
506
        }
 
507
 
282
508
        ret = testing.db.execute(
283
 
            addresses.update().\
284
 
                values({
285
 
                        addresses.c.email_address:users.c.name,
286
 
                        users.c.name:'ed2'
287
 
                }).\
288
 
                where(users.c.id==addresses.c.user_id).\
289
 
                where(users.c.name=='ed')
290
 
        )
291
 
        eq_(
292
 
            set(ret.prefetch_cols()),
293
 
            set([users.c.some_update])
294
 
        )
295
 
        eq_(
296
 
            testing.db.execute(
297
 
                addresses.select().order_by(addresses.c.id)).fetchall(),
298
 
            [
299
 
                (2, 8, "ed"),
300
 
                (3, 8, "ed"),
301
 
                (4, 9, "fred@fred.com")
302
 
            ]
303
 
        )
304
 
        eq_(
305
 
            testing.db.execute(
306
 
                users.select().order_by(users.c.id)).fetchall(),
307
 
            [
308
 
                (8, 'ed2', 'im the update'),
309
 
                (9, 'fred', 'value'),
310
 
            ]
311
 
        )
 
509
            addresses.update().
 
510
                values(values).
 
511
                where(users.c.id == addresses.c.user_id).
 
512
                where(users.c.name == 'ed'))
 
513
 
 
514
        eq_(set(ret.prefetch_cols()), set([users.c.some_update]))
 
515
 
 
516
        expected = [
 
517
            (2, 8, 'ed'),
 
518
            (3, 8, 'ed'),
 
519
            (4, 9, 'fred@fred.com')]
 
520
        self._assert_addresses(addresses, expected)
 
521
 
 
522
        expected = [
 
523
            (8, 'ed2', 'im the update'),
 
524
            (9, 'fred', 'value')]
 
525
        self._assert_users(users, expected)
312
526
 
313
527
    @testing.only_on('mysql', 'Multi table update')
314
528
    def test_no_defaults_second_table(self):
315
529
        users, addresses = self.tables.users, self.tables.addresses
 
530
 
316
531
        ret = testing.db.execute(
317
 
            addresses.update().\
318
 
                values({
319
 
                        'email_address':users.c.name,
320
 
                }).\
321
 
                where(users.c.id==addresses.c.user_id).\
322
 
                where(users.c.name=='ed')
323
 
        )
324
 
        eq_(
325
 
            ret.prefetch_cols(),[]
326
 
        )
327
 
        eq_(
328
 
            testing.db.execute(
329
 
                addresses.select().order_by(addresses.c.id)).fetchall(),
330
 
            [
331
 
                (2, 8, "ed"),
332
 
                (3, 8, "ed"),
333
 
                (4, 9, "fred@fred.com")
334
 
            ]
335
 
        )
336
 
        # users table not actually updated,
337
 
        # so no onupdate
338
 
        eq_(
339
 
            testing.db.execute(
340
 
                users.select().order_by(users.c.id)).fetchall(),
341
 
            [
342
 
                (8, 'ed', 'value'),
343
 
                (9, 'fred', 'value'),
344
 
            ]
345
 
        )
 
532
            addresses.update().
 
533
                values({'email_address': users.c.name}).
 
534
                where(users.c.id == addresses.c.user_id).
 
535
                where(users.c.name == 'ed'))
 
536
 
 
537
        eq_(ret.prefetch_cols(), [])
 
538
 
 
539
        expected = [
 
540
            (2, 8, 'ed'),
 
541
            (3, 8, 'ed'),
 
542
            (4, 9, 'fred@fred.com')]
 
543
        self._assert_addresses(addresses, expected)
 
544
 
 
545
        # users table not actually updated, so no onupdate
 
546
        expected = [
 
547
            (8, 'ed', 'value'),
 
548
            (9, 'fred', 'value')]
 
549
        self._assert_users(users, expected)
 
550
 
 
551
    def _assert_addresses(self, addresses, expected):
 
552
        stmt = addresses.select().order_by(addresses.c.id)
 
553
        eq_(testing.db.execute(stmt).fetchall(), expected)
 
554
 
 
555
    def _assert_users(self, users, expected):
 
556
        stmt = users.select().order_by(users.c.id)
 
557
        eq_(testing.db.execute(stmt).fetchall(), expected)