~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/orm/test_session.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from test.lib.testing import eq_, assert_raises, \
2
 
    assert_raises_message, assert_warnings
3
 
from test.lib.util import gc_collect
4
 
from test.lib import pickleable
 
1
from sqlalchemy.testing import eq_, assert_raises, \
 
2
    assert_raises_message
 
3
from sqlalchemy.testing.util import gc_collect
 
4
from sqlalchemy.testing import pickleable
5
5
from sqlalchemy.util import pickle
6
6
import inspect
7
7
from sqlalchemy.orm import create_session, sessionmaker, attributes, \
8
8
    make_transient, Session
9
9
import sqlalchemy as sa
10
 
from test.lib import engines, testing, config
 
10
from sqlalchemy.testing import engines, config
 
11
from sqlalchemy import testing
11
12
from sqlalchemy import Integer, String, Sequence
12
 
from test.lib.schema import Table, Column
 
13
from sqlalchemy.testing.schema import Table, Column
13
14
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
14
 
    exc as orm_exc, object_session
 
15
    exc as orm_exc, object_session, was_deleted
15
16
from sqlalchemy.util import pypy
16
 
from test.lib import fixtures
17
 
from test.lib import fixtures
 
17
from sqlalchemy.testing import fixtures
18
18
from test.orm import _fixtures
 
19
from sqlalchemy import event, ForeignKey
19
20
 
20
 
class SessionTest(_fixtures.FixtureTest):
 
21
class BindTest(_fixtures.FixtureTest):
21
22
    run_inserts = None
22
23
 
23
 
    def test_no_close_on_flush(self):
24
 
        """Flush() doesn't close a connection the session didn't open"""
25
 
 
26
 
        User, users = self.classes.User, self.tables.users
27
 
 
28
 
        c = testing.db.connect()
29
 
        c.execute("select * from users")
30
 
 
31
 
        mapper(User, users)
32
 
        s = create_session(bind=c)
33
 
        s.add(User(name='first'))
34
 
        s.flush()
35
 
        c.execute("select * from users")
36
 
 
37
 
    def test_close(self):
38
 
        """close() doesn't close a connection the session didn't open"""
39
 
 
40
 
        User, users = self.classes.User, self.tables.users
41
 
 
42
 
        c = testing.db.connect()
43
 
        c.execute("select * from users")
44
 
 
45
 
        mapper(User, users)
46
 
        s = create_session(bind=c)
47
 
        s.add(User(name='first'))
48
 
        s.flush()
49
 
        c.execute("select * from users")
50
 
        s.close()
51
 
        c.execute("select * from users")
52
 
 
53
 
    def test_object_session_raises(self):
54
 
        User = self.classes.User
55
 
 
56
 
        assert_raises(
57
 
            orm_exc.UnmappedInstanceError,
58
 
            object_session,
59
 
            object()
60
 
        )
61
 
 
62
 
        assert_raises(
63
 
            orm_exc.UnmappedInstanceError,
64
 
            object_session,
65
 
            User()
66
 
        )
67
 
 
68
 
    @testing.requires.sequences
69
 
    def test_sequence_execute(self):
70
 
        seq = Sequence("some_sequence")
71
 
        seq.create(testing.db)
72
 
        try:
73
 
            sess = create_session(bind=testing.db)
74
 
            eq_(sess.execute(seq), 1)
75
 
        finally:
76
 
            seq.drop(testing.db)
77
 
 
78
 
 
79
 
 
80
 
    @engines.close_open_connections
81
24
    def test_mapped_binds(self):
82
25
        Address, addresses, users, User = (self.classes.Address,
83
26
                                self.tables.addresses,
87
30
 
88
31
        # ensure tables are unbound
89
32
        m2 = sa.MetaData()
90
 
        users_unbound =users.tometadata(m2)
 
33
        users_unbound = users.tometadata(m2)
91
34
        addresses_unbound = addresses.tometadata(m2)
92
35
 
93
36
        mapper(Address, addresses_unbound)
94
37
        mapper(User, users_unbound, properties={
95
 
            'addresses':relationship(Address,
 
38
            'addresses': relationship(Address,
96
39
                                 backref=backref("user", cascade="all"),
97
40
                                 cascade="all")})
98
41
 
99
 
        Session = sessionmaker(binds={User: self.metadata.bind,
 
42
        sess = Session(binds={User: self.metadata.bind,
100
43
                                      Address: self.metadata.bind})
101
 
        sess = Session()
102
44
 
103
45
        u1 = User(id=1, name='ed')
104
46
        sess.add(u1)
105
 
        eq_(sess.query(User).filter(User.id==1).all(),
 
47
        eq_(sess.query(User).filter(User.id == 1).all(),
106
48
            [User(id=1, name='ed')])
107
49
 
108
50
        # test expression binding
120
62
 
121
63
        sess.close()
122
64
 
123
 
    @engines.close_open_connections
124
65
    def test_table_binds(self):
125
66
        Address, addresses, users, User = (self.classes.Address,
126
67
                                self.tables.addresses,
130
71
 
131
72
        # ensure tables are unbound
132
73
        m2 = sa.MetaData()
133
 
        users_unbound =users.tometadata(m2)
 
74
        users_unbound = users.tometadata(m2)
134
75
        addresses_unbound = addresses.tometadata(m2)
135
76
 
136
77
        mapper(Address, addresses_unbound)
137
78
        mapper(User, users_unbound, properties={
138
 
            'addresses':relationship(Address,
 
79
            'addresses': relationship(Address,
139
80
                                 backref=backref("user", cascade="all"),
140
81
                                 cascade="all")})
141
82
 
145
86
 
146
87
        u1 = User(id=1, name='ed')
147
88
        sess.add(u1)
148
 
        eq_(sess.query(User).filter(User.id==1).all(),
 
89
        eq_(sess.query(User).filter(User.id == 1).all(),
149
90
            [User(id=1, name='ed')])
150
91
 
151
92
        sess.execute(users_unbound.insert(), params=dict(id=2, name='jack'))
161
102
 
162
103
        sess.close()
163
104
 
164
 
    @engines.close_open_connections
165
105
    def test_bind_from_metadata(self):
166
106
        users, User = self.tables.users, self.classes.User
167
107
 
177
117
        assert len(session.query(User).filter_by(name='Johnny').all()) == 0
178
118
        session.close()
179
119
 
 
120
    def test_bind_arguments(self):
 
121
        users, Address, addresses, User = (self.tables.users,
 
122
                                self.classes.Address,
 
123
                                self.tables.addresses,
 
124
                                self.classes.User)
 
125
 
 
126
        mapper(User, users)
 
127
        mapper(Address, addresses)
 
128
 
 
129
        e1 = engines.testing_engine()
 
130
        e2 = engines.testing_engine()
 
131
        e3 = engines.testing_engine()
 
132
 
 
133
        sess = Session(e3)
 
134
        sess.bind_mapper(User, e1)
 
135
        sess.bind_mapper(Address, e2)
 
136
 
 
137
        assert sess.connection().engine is e3
 
138
        assert sess.connection(bind=e1).engine is e1
 
139
        assert sess.connection(mapper=Address, bind=e1).engine is e1
 
140
        assert sess.connection(mapper=Address).engine is e2
 
141
        assert sess.connection(clause=addresses.select()).engine is e2
 
142
        assert sess.connection(mapper=User,
 
143
                                clause=addresses.select()).engine is e1
 
144
        assert sess.connection(mapper=User,
 
145
                                clause=addresses.select(),
 
146
                                bind=e2).engine is e2
 
147
 
 
148
        sess.close()
 
149
 
 
150
    @engines.close_open_connections
 
151
    def test_bound_connection(self):
 
152
        users, User = self.tables.users, self.classes.User
 
153
 
 
154
        mapper(User, users)
 
155
        c = testing.db.connect()
 
156
        sess = create_session(bind=c)
 
157
        sess.begin()
 
158
        transaction = sess.transaction
 
159
        u = User(name='u1')
 
160
        sess.add(u)
 
161
        sess.flush()
 
162
        assert transaction._connection_for_bind(testing.db) \
 
163
            is transaction._connection_for_bind(c) is c
 
164
 
 
165
        assert_raises_message(sa.exc.InvalidRequestError,
 
166
                              'Session already has a Connection '
 
167
                              'associated',
 
168
                              transaction._connection_for_bind,
 
169
                              testing.db.connect())
 
170
        transaction.rollback()
 
171
        assert len(sess.query(User).all()) == 0
 
172
        sess.close()
 
173
 
 
174
    def test_bound_connection_transactional(self):
 
175
        User, users = self.classes.User, self.tables.users
 
176
 
 
177
        mapper(User, users)
 
178
        c = testing.db.connect()
 
179
 
 
180
        sess = create_session(bind=c, autocommit=False)
 
181
        u = User(name='u1')
 
182
        sess.add(u)
 
183
        sess.flush()
 
184
        sess.close()
 
185
        assert not c.in_transaction()
 
186
        assert c.scalar("select count(1) from users") == 0
 
187
 
 
188
        sess = create_session(bind=c, autocommit=False)
 
189
        u = User(name='u2')
 
190
        sess.add(u)
 
191
        sess.flush()
 
192
        sess.commit()
 
193
        assert not c.in_transaction()
 
194
        assert c.scalar("select count(1) from users") == 1
 
195
        c.execute("delete from users")
 
196
        assert c.scalar("select count(1) from users") == 0
 
197
 
 
198
        c = testing.db.connect()
 
199
 
 
200
        trans = c.begin()
 
201
        sess = create_session(bind=c, autocommit=True)
 
202
        u = User(name='u3')
 
203
        sess.add(u)
 
204
        sess.flush()
 
205
        assert c.in_transaction()
 
206
        trans.commit()
 
207
        assert not c.in_transaction()
 
208
        assert c.scalar("select count(1) from users") == 1
 
209
 
 
210
class ExecutionTest(_fixtures.FixtureTest):
 
211
    run_inserts = None
 
212
 
 
213
    @testing.requires.sequences
 
214
    def test_sequence_execute(self):
 
215
        seq = Sequence("some_sequence")
 
216
        seq.create(testing.db)
 
217
        try:
 
218
            sess = create_session(bind=testing.db)
 
219
            eq_(sess.execute(seq), 1)
 
220
        finally:
 
221
            seq.drop(testing.db)
 
222
 
 
223
    def test_textual_execute(self):
 
224
        """test that Session.execute() converts to text()"""
 
225
 
 
226
        users = self.tables.users
 
227
 
 
228
 
 
229
        sess = create_session(bind=self.metadata.bind)
 
230
        users.insert().execute(id=7, name='jack')
 
231
 
 
232
        # use :bindparam style
 
233
        eq_(sess.execute("select * from users where id=:id",
 
234
                         {'id': 7}).fetchall(),
 
235
            [(7, u'jack')])
 
236
 
 
237
 
 
238
        # use :bindparam style
 
239
        eq_(sess.scalar("select id from users where id=:id",
 
240
                         {'id': 7}),
 
241
            7)
 
242
 
 
243
    def test_parameter_execute(self):
 
244
        users = self.tables.users
 
245
        sess = Session(bind=testing.db)
 
246
        sess.execute(users.insert(), [
 
247
                {"id": 7, "name": "u7"},
 
248
                {"id": 8, "name": "u8"}
 
249
            ]
 
250
        )
 
251
        sess.execute(users.insert(), {"id": 9, "name": "u9"})
 
252
        eq_(
 
253
            sess.execute(sa.select([users.c.id]).\
 
254
                    order_by(users.c.id)).fetchall(),
 
255
            [(7, ), (8, ), (9, )]
 
256
        )
 
257
 
 
258
 
 
259
class TransScopingTest(_fixtures.FixtureTest):
 
260
    run_inserts = None
 
261
 
 
262
    def test_no_close_on_flush(self):
 
263
        """Flush() doesn't close a connection the session didn't open"""
 
264
 
 
265
        User, users = self.classes.User, self.tables.users
 
266
 
 
267
        c = testing.db.connect()
 
268
        c.execute("select * from users")
 
269
 
 
270
        mapper(User, users)
 
271
        s = create_session(bind=c)
 
272
        s.add(User(name='first'))
 
273
        s.flush()
 
274
        c.execute("select * from users")
 
275
 
 
276
    def test_close(self):
 
277
        """close() doesn't close a connection the session didn't open"""
 
278
 
 
279
        User, users = self.classes.User, self.tables.users
 
280
 
 
281
        c = testing.db.connect()
 
282
        c.execute("select * from users")
 
283
 
 
284
        mapper(User, users)
 
285
        s = create_session(bind=c)
 
286
        s.add(User(name='first'))
 
287
        s.flush()
 
288
        c.execute("select * from users")
 
289
        s.close()
 
290
        c.execute("select * from users")
 
291
 
180
292
    @testing.requires.independent_connections
181
293
    @engines.close_open_connections
182
294
    def test_transaction(self):
199
311
                ).scalar() == 1
200
312
        sess.close()
201
313
 
 
314
class SessionUtilTest(_fixtures.FixtureTest):
 
315
    run_inserts = None
 
316
 
 
317
    def test_object_session_raises(self):
 
318
        User = self.classes.User
 
319
 
 
320
        assert_raises(
 
321
            orm_exc.UnmappedInstanceError,
 
322
            object_session,
 
323
            object()
 
324
        )
 
325
 
 
326
        assert_raises(
 
327
            orm_exc.UnmappedInstanceError,
 
328
            object_session,
 
329
            User()
 
330
        )
 
331
 
 
332
    def test_make_transient(self):
 
333
        users, User = self.tables.users, self.classes.User
 
334
 
 
335
        mapper(User, users)
 
336
        sess = create_session()
 
337
        sess.add(User(name='test'))
 
338
        sess.flush()
 
339
 
 
340
        u1 = sess.query(User).first()
 
341
        make_transient(u1)
 
342
        assert u1 not in sess
 
343
        sess.add(u1)
 
344
        assert u1 in sess.new
 
345
 
 
346
        u1 = sess.query(User).first()
 
347
        sess.expunge(u1)
 
348
        make_transient(u1)
 
349
        sess.add(u1)
 
350
        assert u1 in sess.new
 
351
 
 
352
        # test expired attributes
 
353
        # get unexpired
 
354
        u1 = sess.query(User).first()
 
355
        sess.expire(u1)
 
356
        make_transient(u1)
 
357
        assert u1.id is None
 
358
        assert u1.name is None
 
359
 
 
360
        # works twice
 
361
        make_transient(u1)
 
362
 
 
363
        sess.close()
 
364
 
 
365
        u1.name = 'test2'
 
366
        sess.add(u1)
 
367
        sess.flush()
 
368
        assert u1 in sess
 
369
        sess.delete(u1)
 
370
        sess.flush()
 
371
        assert u1 not in sess
 
372
 
 
373
        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
 
374
        make_transient(u1)
 
375
        sess.add(u1)
 
376
        sess.flush()
 
377
        assert u1 in sess
 
378
 
 
379
    def test_make_transient_plus_rollback(self):
 
380
        # test for [ticket:2182]
 
381
        users, User = self.tables.users, self.classes.User
 
382
 
 
383
        mapper(User, users)
 
384
        sess = Session()
 
385
        u1 = User(name='test')
 
386
        sess.add(u1)
 
387
        sess.commit()
 
388
 
 
389
        sess.delete(u1)
 
390
        sess.flush()
 
391
        make_transient(u1)
 
392
        sess.rollback()
 
393
 
 
394
class SessionStateTest(_fixtures.FixtureTest):
 
395
    run_inserts = None
 
396
 
 
397
 
202
398
    @testing.requires.independent_connections
203
399
    @engines.close_open_connections
204
400
    def test_autoflush(self):
211
407
 
212
408
        sess = create_session(bind=conn1, autocommit=False, autoflush=True)
213
409
        u = User()
214
 
        u.name='ed'
 
410
        u.name = 'ed'
215
411
        sess.add(u)
216
412
        u2 = sess.query(User).filter_by(name='ed').one()
217
413
        assert u2 is u
239
435
        assert u in sess.query(User).all()
240
436
        assert u not in sess.new
241
437
 
242
 
    def test_make_transient(self):
243
 
        users, User = self.tables.users, self.classes.User
244
 
 
245
 
        mapper(User, users)
246
 
        sess = create_session()
247
 
        sess.add(User(name='test'))
248
 
        sess.flush()
249
 
 
250
 
        u1 = sess.query(User).first()
251
 
        make_transient(u1)
252
 
        assert u1 not in sess
253
 
        sess.add(u1)
254
 
        assert u1 in sess.new
255
 
 
256
 
        u1 = sess.query(User).first()
257
 
        sess.expunge(u1)
258
 
        make_transient(u1)
259
 
        sess.add(u1)
260
 
        assert u1 in sess.new
261
 
 
262
 
        # test expired attributes
263
 
        # get unexpired
264
 
        u1 = sess.query(User).first()
265
 
        sess.expire(u1)
266
 
        make_transient(u1)
267
 
        assert u1.id is None
268
 
        assert u1.name is None
269
 
 
270
 
        # works twice
271
 
        make_transient(u1)
272
 
 
273
 
        sess.close()
274
 
 
275
 
        u1.name = 'test2'
276
 
        sess.add(u1)
277
 
        sess.flush()
278
 
        assert u1 in sess
279
 
        sess.delete(u1)
280
 
        sess.flush()
281
 
        assert u1 not in sess
282
 
 
283
 
        assert_raises(sa.exc.InvalidRequestError, sess.add, u1)
284
 
        make_transient(u1)
285
 
        sess.add(u1)
286
 
        sess.flush()
287
 
        assert u1 in sess
288
 
 
289
 
    def test_make_transient_plus_rollback(self):
290
 
        # test for [ticket:2182]
291
 
        users, User = self.tables.users, self.classes.User
292
 
 
293
 
        mapper(User, users)
294
 
        sess = Session()
295
 
        u1 = User(name='test')
296
 
        sess.add(u1)
297
 
        sess.commit()
298
 
 
299
 
        sess.delete(u1)
300
 
        sess.flush()
301
 
        make_transient(u1)
302
 
        sess.rollback()
303
438
 
304
439
    def test_deleted_flag(self):
305
440
        users, User = self.tables.users, self.classes.User
343
478
                                self.classes.User)
344
479
 
345
480
        mapper(User, users, properties={
346
 
            'addresses':relationship(Address, backref="user")})
 
481
            'addresses': relationship(Address, backref="user")})
347
482
        mapper(Address, addresses)
348
483
 
349
484
        sess = create_session(autoflush=True, autocommit=False)
350
485
        u = User(name='ed', addresses=[Address(email_address='foo')])
351
486
        sess.add(u)
352
 
        eq_(sess.query(Address).filter(Address.user==u).one(),
 
487
        eq_(sess.query(Address).filter(Address.user == u).one(),
353
488
            Address(email_address='foo'))
354
489
 
355
490
        # still works after "u" is garbage collected
356
491
        sess.commit()
357
492
        sess.close()
358
493
        u = sess.query(User).get(u.id)
359
 
        q = sess.query(Address).filter(Address.user==u)
 
494
        q = sess.query(Address).filter(Address.user == u)
360
495
        del u
361
496
        gc_collect()
362
497
        eq_(q.one(), Address(email_address='foo'))
396
531
 
397
532
        mapper(User, users)
398
533
        conn1 = testing.db.connect()
399
 
        conn2 = testing.db.connect()
400
534
        sess = create_session(bind=conn1, autocommit=False,
401
535
                              autoflush=True)
402
536
        u = User()
429
563
        sess.rollback()
430
564
        assert not sess.is_active
431
565
 
432
 
    def test_textual_execute(self):
433
 
        """test that Session.execute() converts to text()"""
434
 
 
435
 
        users = self.tables.users
436
 
 
437
 
 
438
 
        sess = create_session(bind=self.metadata.bind)
439
 
        users.insert().execute(id=7, name='jack')
440
 
 
441
 
        # use :bindparam style
442
 
        eq_(sess.execute("select * from users where id=:id",
443
 
                         {'id':7}).fetchall(),
444
 
            [(7, u'jack')])
445
 
 
446
 
 
447
 
        # use :bindparam style
448
 
        eq_(sess.scalar("select id from users where id=:id",
449
 
                         {'id':7}),
450
 
            7)
451
 
 
452
 
    def test_parameter_execute(self):
453
 
        users = self.tables.users
454
 
        sess = Session(bind=testing.db)
455
 
        sess.execute(users.insert(), [
456
 
                {"id": 7, "name": "u7"},
457
 
                {"id": 8, "name": "u8"}
458
 
            ]
459
 
        )
460
 
        sess.execute(users.insert(), {"id": 9, "name": "u9"})
461
 
        eq_(
462
 
            sess.execute(sa.select([users.c.id]).\
463
 
                    order_by(users.c.id)).fetchall(),
464
 
            [(7, ), (8, ), (9, )]
465
 
        )
466
 
 
467
 
 
468
 
    @engines.close_open_connections
469
 
    def test_bound_connection(self):
470
 
        users, User = self.tables.users, self.classes.User
471
 
 
472
 
        mapper(User, users)
473
 
        c = testing.db.connect()
474
 
        sess = create_session(bind=c)
475
 
        sess.begin()
476
 
        transaction = sess.transaction
477
 
        u = User(name='u1')
478
 
        sess.add(u)
479
 
        sess.flush()
480
 
        assert transaction._connection_for_bind(testing.db) \
481
 
            is transaction._connection_for_bind(c) is c
482
 
 
483
 
        assert_raises_message(sa.exc.InvalidRequestError,
484
 
                              'Session already has a Connection '
485
 
                              'associated',
486
 
                              transaction._connection_for_bind,
487
 
                              testing.db.connect())
488
 
        transaction.rollback()
489
 
        assert len(sess.query(User).all()) == 0
490
 
        sess.close()
491
 
 
492
 
    def test_bound_connection_transactional(self):
493
 
        User, users = self.classes.User, self.tables.users
494
 
 
495
 
        mapper(User, users)
496
 
        c = testing.db.connect()
497
 
 
498
 
        sess = create_session(bind=c, autocommit=False)
499
 
        u = User(name='u1')
500
 
        sess.add(u)
501
 
        sess.flush()
502
 
        sess.close()
503
 
        assert not c.in_transaction()
504
 
        assert c.scalar("select count(1) from users") == 0
505
 
 
506
 
        sess = create_session(bind=c, autocommit=False)
507
 
        u = User(name='u2')
508
 
        sess.add(u)
509
 
        sess.flush()
510
 
        sess.commit()
511
 
        assert not c.in_transaction()
512
 
        assert c.scalar("select count(1) from users") == 1
513
 
        c.execute("delete from users")
514
 
        assert c.scalar("select count(1) from users") == 0
515
 
 
516
 
        c = testing.db.connect()
517
 
 
518
 
        trans = c.begin()
519
 
        sess = create_session(bind=c, autocommit=True)
520
 
        u = User(name='u3')
521
 
        sess.add(u)
522
 
        sess.flush()
523
 
        assert c.in_transaction()
524
 
        trans.commit()
525
 
        assert not c.in_transaction()
526
 
        assert c.scalar("select count(1) from users") == 1
527
 
 
528
 
    def test_bind_arguments(self):
529
 
        users, Address, addresses, User = (self.tables.users,
530
 
                                self.classes.Address,
531
 
                                self.tables.addresses,
532
 
                                self.classes.User)
533
 
 
534
 
        mapper(User, users)
535
 
        mapper(Address, addresses)
536
 
 
537
 
        e1 = engines.testing_engine()
538
 
        e2 = engines.testing_engine()
539
 
        e3 = engines.testing_engine()
540
 
 
541
 
        sess = Session(e3)
542
 
        sess.bind_mapper(User, e1)
543
 
        sess.bind_mapper(Address, e2)
544
 
 
545
 
        assert sess.connection().engine is e3
546
 
        assert sess.connection(bind=e1).engine is e1
547
 
        assert sess.connection(mapper=Address, bind=e1).engine is e1
548
 
        assert sess.connection(mapper=Address).engine is e2
549
 
        assert sess.connection(clause=addresses.select()).engine is e2
550
 
        assert sess.connection(mapper=User,
551
 
                                clause=addresses.select()).engine is e1
552
 
        assert sess.connection(mapper=User,
553
 
                                clause=addresses.select(),
554
 
                                bind=e2).engine is e2
555
 
 
556
 
        sess.close()
557
566
 
558
567
    @engines.close_open_connections
559
568
    def test_add_delete(self):
565
574
 
566
575
        s = create_session()
567
576
        mapper(User, users, properties={
568
 
            'addresses':relationship(Address, cascade="all, delete")
 
577
            'addresses': relationship(Address, cascade="all, delete")
569
578
        })
570
579
        mapper(Address, addresses)
571
580
 
616
625
        assert s.query(User).count() == 0
617
626
 
618
627
 
619
 
    def test_weak_ref(self):
620
 
        """test the weak-referencing identity map, which strongly-
621
 
        references modified items."""
622
 
 
623
 
        users, User = self.tables.users, self.classes.User
624
 
 
625
 
 
626
 
        s = create_session()
627
 
        mapper(User, users)
628
 
 
629
 
        s.add(User(name='ed'))
630
 
        s.flush()
631
 
        assert not s.dirty
632
 
 
633
 
        user = s.query(User).one()
634
 
        del user
635
 
        gc_collect()
636
 
        assert len(s.identity_map) == 0
637
 
 
638
 
        user = s.query(User).one()
639
 
        user.name = 'fred'
640
 
        del user
641
 
        gc_collect()
642
 
        assert len(s.identity_map) == 1
643
 
        assert len(s.dirty) == 1
644
 
        assert None not in s.dirty
645
 
        s.flush()
646
 
        gc_collect()
647
 
        assert not s.dirty
648
 
        assert not s.identity_map
649
 
 
650
 
        user = s.query(User).one()
651
 
        assert user.name == 'fred'
652
 
        assert s.identity_map
653
 
 
654
 
    def test_weak_ref_pickled(self):
655
 
        users, User = self.tables.users, pickleable.User
656
 
 
657
 
        s = create_session()
658
 
        mapper(User, users)
659
 
 
660
 
        s.add(User(name='ed'))
661
 
        s.flush()
662
 
        assert not s.dirty
663
 
 
664
 
        user = s.query(User).one()
665
 
        user.name = 'fred'
666
 
        s.expunge(user)
667
 
 
668
 
        u2 = pickle.loads(pickle.dumps(user))
669
 
 
670
 
        del user
671
 
        s.add(u2)
672
 
 
673
 
        del u2
674
 
        gc_collect()
675
 
 
676
 
        assert len(s.identity_map) == 1
677
 
        assert len(s.dirty) == 1
678
 
        assert None not in s.dirty
679
 
        s.flush()
680
 
        gc_collect()
681
 
        assert not s.dirty
682
 
 
683
 
        assert not s.identity_map
684
 
 
685
628
    @testing.uses_deprecated()
686
629
    def test_identity_conflict(self):
687
630
        users, User = self.tables.users, self.classes.User
703
646
            assert_raises(AssertionError, s.identity_map.add,
704
647
                          sa.orm.attributes.instance_state(u2))
705
648
 
706
 
 
707
 
    def test_weakref_with_cycles_o2m(self):
708
 
        Address, addresses, users, User = (self.classes.Address,
709
 
                                self.tables.addresses,
710
 
                                self.tables.users,
711
 
                                self.classes.User)
712
 
 
713
 
        s = sessionmaker()()
714
 
        mapper(User, users, properties={
715
 
            "addresses":relationship(Address, backref="user")
716
 
        })
717
 
        mapper(Address, addresses)
718
 
        s.add(User(name="ed", addresses=[Address(email_address="ed1")]))
719
 
        s.commit()
720
 
 
721
 
        user = s.query(User).options(joinedload(User.addresses)).one()
722
 
        user.addresses[0].user # lazyload
723
 
        eq_(user, User(name="ed", addresses=[Address(email_address="ed1")]))
724
 
 
725
 
        del user
726
 
        gc_collect()
727
 
        assert len(s.identity_map) == 0
728
 
 
729
 
        user = s.query(User).options(joinedload(User.addresses)).one()
730
 
        user.addresses[0].email_address='ed2'
731
 
        user.addresses[0].user # lazyload
732
 
        del user
733
 
        gc_collect()
734
 
        assert len(s.identity_map) == 2
735
 
 
736
 
        s.commit()
737
 
        user = s.query(User).options(joinedload(User.addresses)).one()
738
 
        eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
739
 
 
740
 
    def test_weakref_with_cycles_o2o(self):
741
 
        Address, addresses, users, User = (self.classes.Address,
742
 
                                self.tables.addresses,
743
 
                                self.tables.users,
744
 
                                self.classes.User)
745
 
 
746
 
        s = sessionmaker()()
747
 
        mapper(User, users, properties={
748
 
            "address":relationship(Address, backref="user", uselist=False)
749
 
        })
750
 
        mapper(Address, addresses)
751
 
        s.add(User(name="ed", address=Address(email_address="ed1")))
752
 
        s.commit()
753
 
 
754
 
        user = s.query(User).options(joinedload(User.address)).one()
755
 
        user.address.user
756
 
        eq_(user, User(name="ed", address=Address(email_address="ed1")))
757
 
 
758
 
        del user
759
 
        gc_collect()
760
 
        assert len(s.identity_map) == 0
761
 
 
762
 
        user = s.query(User).options(joinedload(User.address)).one()
763
 
        user.address.email_address='ed2'
764
 
        user.address.user # lazyload
765
 
 
766
 
        del user
767
 
        gc_collect()
768
 
        assert len(s.identity_map) == 2
769
 
 
770
 
        s.commit()
771
 
        user = s.query(User).options(joinedload(User.address)).one()
772
 
        eq_(user, User(name="ed", address=Address(email_address="ed2")))
773
 
 
774
 
    @testing.uses_deprecated()
775
 
    def test_strong_ref(self):
776
 
        users, User = self.tables.users, self.classes.User
777
 
 
778
 
        s = create_session(weak_identity_map=False)
779
 
        mapper(User, users)
780
 
 
781
 
        # save user
782
 
        s.add(User(name='u1'))
783
 
        s.flush()
784
 
        user = s.query(User).one()
785
 
        user = None
786
 
        print s.identity_map
787
 
        gc_collect()
788
 
        assert len(s.identity_map) == 1
789
 
 
790
 
        user = s.query(User).one()
791
 
        assert not s.identity_map._modified
792
 
        user.name = 'u2'
793
 
        assert s.identity_map._modified
794
 
        s.flush()
795
 
        eq_(users.select().execute().fetchall(), [(user.id, 'u2')])
796
 
 
797
 
    @testing.uses_deprecated()
798
 
    @testing.fails_if(lambda: pypy, "pypy has a real GC")
799
 
    @testing.fails_on('+zxjdbc', 'http://www.sqlalchemy.org/trac/ticket/1473')
800
 
    def test_prune(self):
801
 
        users, User = self.tables.users, self.classes.User
802
 
 
803
 
        s = create_session(weak_identity_map=False)
804
 
        mapper(User, users)
805
 
 
806
 
        for o in [User(name='u%s' % x) for x in xrange(10)]:
807
 
            s.add(o)
808
 
        # o is still live after this loop...
809
 
 
810
 
        self.assert_(len(s.identity_map) == 0)
811
 
        self.assert_(s.prune() == 0)
812
 
        s.flush()
813
 
        gc_collect()
814
 
        self.assert_(s.prune() == 9)
815
 
        self.assert_(len(s.identity_map) == 1)
816
 
 
817
 
        id = o.id
818
 
        del o
819
 
        self.assert_(s.prune() == 1)
820
 
        self.assert_(len(s.identity_map) == 0)
821
 
 
822
 
        u = s.query(User).get(id)
823
 
        self.assert_(s.prune() == 0)
824
 
        self.assert_(len(s.identity_map) == 1)
825
 
        u.name = 'squiznart'
826
 
        del u
827
 
        self.assert_(s.prune() == 0)
828
 
        self.assert_(len(s.identity_map) == 1)
829
 
        s.flush()
830
 
        self.assert_(s.prune() == 1)
831
 
        self.assert_(len(s.identity_map) == 0)
832
 
 
833
 
        s.add(User(name='x'))
834
 
        self.assert_(s.prune() == 0)
835
 
        self.assert_(len(s.identity_map) == 0)
836
 
        s.flush()
837
 
        self.assert_(len(s.identity_map) == 1)
838
 
        self.assert_(s.prune() == 1)
839
 
        self.assert_(len(s.identity_map) == 0)
840
 
 
841
 
        u = s.query(User).get(id)
842
 
        s.delete(u)
843
 
        del u
844
 
        self.assert_(s.prune() == 0)
845
 
        self.assert_(len(s.identity_map) == 1)
846
 
        s.flush()
847
 
        self.assert_(s.prune() == 0)
848
 
        self.assert_(len(s.identity_map) == 0)
849
 
 
850
 
 
851
649
    def test_pickled_update(self):
852
650
        users, User = self.tables.users, pickleable.User
853
651
 
931
729
        sess.add_all([User(name='u1'), User(name='u2'), User(name='u3')])
932
730
        sess.commit()
933
731
 
 
732
        # TODO: what are we testing here ?   that iteritems() can
 
733
        # withstand a change?  should this be
 
734
        # more directly attempting to manipulate the identity_map ?
934
735
        u1, u2, u3 = sess.query(User).all()
935
736
        for i, (key, value) in enumerate(sess.identity_map.iteritems()):
936
737
            if i == 2:
937
738
                del u3
938
739
                gc_collect()
939
740
 
940
 
    def test_auto_detach_on_gc_session(self):
 
741
    def _test_extra_dirty_state(self):
 
742
        users, User = self.tables.users, self.classes.User
 
743
        m = mapper(User, users)
 
744
 
 
745
        s = Session()
 
746
 
 
747
        @event.listens_for(m, "after_update")
 
748
        def e(mapper, conn, target):
 
749
            sess = object_session(target)
 
750
            for entry in sess.identity_map.values():
 
751
                entry.name = "5"
 
752
 
 
753
        a1, a2 = User(name="1"), User(name="2")
 
754
 
 
755
        s.add_all([a1, a2])
 
756
        s.commit()
 
757
 
 
758
        a1.name = "3"
 
759
        return s, a1, a2
 
760
 
 
761
    def test_extra_dirty_state_post_flush_warning(self):
 
762
        s, a1, a2 = self._test_extra_dirty_state()
 
763
        assert_raises_message(
 
764
            sa.exc.SAWarning,
 
765
            "Attribute history events accumulated on 1 previously "
 
766
            "clean instances",
 
767
            s.commit
 
768
        )
 
769
 
 
770
    def test_extra_dirty_state_post_flush_state(self):
 
771
        s, a1, a2 = self._test_extra_dirty_state()
 
772
        canary = []
 
773
        @event.listens_for(s, "after_flush_postexec")
 
774
        def e(sess, ctx):
 
775
            canary.append(bool(sess.identity_map._modified))
 
776
 
 
777
        @testing.emits_warning("Attribute")
 
778
        def go():
 
779
            s.commit()
 
780
        go()
 
781
        eq_(canary, [False])
 
782
 
 
783
    def test_deleted_expunged(self):
941
784
        users, User = self.tables.users, self.classes.User
942
785
 
943
786
        mapper(User, users)
944
 
 
945
787
        sess = Session()
946
 
 
947
 
        u1 = User(name='u1')
948
 
        sess.add(u1)
949
 
        sess.commit()
950
 
 
951
 
        # can't add u1 to Session,
952
 
        # already belongs to u2
953
 
        s2 = Session()
954
 
        assert_raises_message(
955
 
            sa.exc.InvalidRequestError,
956
 
            r".*is already attached to session",
957
 
            s2.add, u1
958
 
        )
959
 
 
960
 
        # garbage collect sess
961
 
        del sess
962
 
        gc_collect()
963
 
 
964
 
        # s2 lets it in now despite u1 having
965
 
        # session_key
966
 
        s2.add(u1)
967
 
        assert u1 in s2
968
 
 
969
 
class SessionDataTest(_fixtures.FixtureTest):
970
 
    def test_expunge_cascade(self):
971
 
        Address, addresses, users, User = (self.classes.Address,
972
 
                                self.tables.addresses,
973
 
                                self.tables.users,
974
 
                                self.classes.User)
975
 
 
976
 
        mapper(Address, addresses)
977
 
        mapper(User, users, properties={
978
 
            'addresses':relationship(Address,
979
 
                                 backref=backref("user", cascade="all"),
980
 
                                 cascade="all")})
981
 
 
982
 
        session = create_session()
983
 
        u = session.query(User).filter_by(id=7).one()
984
 
 
985
 
        # get everything to load in both directions
986
 
        print [a.user for a in u.addresses]
987
 
 
988
 
        # then see if expunge fails
989
 
        session.expunge(u)
990
 
 
991
 
        assert sa.orm.object_session(u) is None
992
 
        assert sa.orm.attributes.instance_state(u).session_id is None
993
 
        for a in u.addresses:
994
 
            assert sa.orm.object_session(a) is None
995
 
            assert sa.orm.attributes.instance_state(a).session_id is None
 
788
        sess.add(User(name='x'))
 
789
        sess.commit()
 
790
 
 
791
        u1 = sess.query(User).first()
 
792
        sess.delete(u1)
 
793
 
 
794
        assert not was_deleted(u1)
 
795
        sess.flush()
 
796
 
 
797
        assert was_deleted(u1)
 
798
        assert u1 not in sess
 
799
        assert object_session(u1) is sess
 
800
        sess.commit()
 
801
 
 
802
        assert object_session(u1) is None
 
803
 
 
804
class SessionStateWFixtureTest(_fixtures.FixtureTest):
996
805
 
997
806
    def test_autoflush_rollback(self):
998
807
        Address, addresses, users, User = (self.classes.Address,
1002
811
 
1003
812
        mapper(Address, addresses)
1004
813
        mapper(User, users, properties={
1005
 
            'addresses':relationship(Address)})
 
814
            'addresses': relationship(Address)})
1006
815
 
1007
816
        sess = create_session(autocommit=False, autoflush=True)
1008
817
        u = sess.query(User).get(8)
1020
829
        # pending objects dont get expired
1021
830
        assert newad.email_address == 'a new address'
1022
831
 
 
832
    def test_expunge_cascade(self):
 
833
        Address, addresses, users, User = (self.classes.Address,
 
834
                                self.tables.addresses,
 
835
                                self.tables.users,
 
836
                                self.classes.User)
 
837
 
 
838
        mapper(Address, addresses)
 
839
        mapper(User, users, properties={
 
840
            'addresses': relationship(Address,
 
841
                                 backref=backref("user", cascade="all"),
 
842
                                 cascade="all")})
 
843
 
 
844
        session = create_session()
 
845
        u = session.query(User).filter_by(id=7).one()
 
846
 
 
847
        # get everything to load in both directions
 
848
        print [a.user for a in u.addresses]
 
849
 
 
850
        # then see if expunge fails
 
851
        session.expunge(u)
 
852
 
 
853
        assert sa.orm.object_session(u) is None
 
854
        assert sa.orm.attributes.instance_state(u).session_id is None
 
855
        for a in u.addresses:
 
856
            assert sa.orm.object_session(a) is None
 
857
            assert sa.orm.attributes.instance_state(a).session_id is None
 
858
 
 
859
 
 
860
class NoCyclesOnTransientDetachedTest(_fixtures.FixtureTest):
 
861
    """Test the instance_state._strong_obj link that it
 
862
    is present only on persistent/pending objects and never
 
863
    transient/detached.
 
864
 
 
865
    """
 
866
    run_inserts = None
 
867
 
 
868
    def setup(self):
 
869
        mapper(self.classes.User, self.tables.users)
 
870
 
 
871
    def _assert_modified(self, u1):
 
872
        assert sa.orm.attributes.instance_state(u1).modified
 
873
 
 
874
    def _assert_not_modified(self, u1):
 
875
        assert not sa.orm.attributes.instance_state(u1).modified
 
876
 
 
877
    def _assert_cycle(self, u1):
 
878
        assert sa.orm.attributes.instance_state(u1)._strong_obj is not None
 
879
 
 
880
    def _assert_no_cycle(self, u1):
 
881
        assert sa.orm.attributes.instance_state(u1)._strong_obj is None
 
882
 
 
883
    def _persistent_fixture(self):
 
884
        User = self.classes.User
 
885
        u1 = User()
 
886
        u1.name = "ed"
 
887
        sess = Session()
 
888
        sess.add(u1)
 
889
        sess.flush()
 
890
        return sess, u1
 
891
 
 
892
    def test_transient(self):
 
893
        User = self.classes.User
 
894
        u1 = User()
 
895
        u1.name = 'ed'
 
896
        self._assert_no_cycle(u1)
 
897
        self._assert_modified(u1)
 
898
 
 
899
    def test_transient_to_pending(self):
 
900
        User = self.classes.User
 
901
        u1 = User()
 
902
        u1.name = 'ed'
 
903
        self._assert_modified(u1)
 
904
        self._assert_no_cycle(u1)
 
905
        sess = Session()
 
906
        sess.add(u1)
 
907
        self._assert_cycle(u1)
 
908
        sess.flush()
 
909
        self._assert_no_cycle(u1)
 
910
        self._assert_not_modified(u1)
 
911
 
 
912
    def test_dirty_persistent_to_detached_via_expunge(self):
 
913
        sess, u1 = self._persistent_fixture()
 
914
        u1.name = 'edchanged'
 
915
        self._assert_cycle(u1)
 
916
        sess.expunge(u1)
 
917
        self._assert_no_cycle(u1)
 
918
 
 
919
    def test_dirty_persistent_to_detached_via_close(self):
 
920
        sess, u1 = self._persistent_fixture()
 
921
        u1.name = 'edchanged'
 
922
        self._assert_cycle(u1)
 
923
        sess.close()
 
924
        self._assert_no_cycle(u1)
 
925
 
 
926
    def test_clean_persistent_to_detached_via_close(self):
 
927
        sess, u1 = self._persistent_fixture()
 
928
        self._assert_no_cycle(u1)
 
929
        self._assert_not_modified(u1)
 
930
        sess.close()
 
931
        u1.name = 'edchanged'
 
932
        self._assert_modified(u1)
 
933
        self._assert_no_cycle(u1)
 
934
 
 
935
    def test_detached_to_dirty_deleted(self):
 
936
        sess, u1 = self._persistent_fixture()
 
937
        sess.expunge(u1)
 
938
        u1.name = 'edchanged'
 
939
        self._assert_no_cycle(u1)
 
940
        sess.delete(u1)
 
941
        self._assert_cycle(u1)
 
942
 
 
943
    def test_detached_to_dirty_persistent(self):
 
944
        sess, u1 = self._persistent_fixture()
 
945
        sess.expunge(u1)
 
946
        u1.name = 'edchanged'
 
947
        self._assert_modified(u1)
 
948
        self._assert_no_cycle(u1)
 
949
        sess.add(u1)
 
950
        self._assert_cycle(u1)
 
951
        self._assert_modified(u1)
 
952
 
 
953
    def test_detached_to_clean_persistent(self):
 
954
        sess, u1 = self._persistent_fixture()
 
955
        sess.expunge(u1)
 
956
        self._assert_no_cycle(u1)
 
957
        self._assert_not_modified(u1)
 
958
        sess.add(u1)
 
959
        self._assert_no_cycle(u1)
 
960
        self._assert_not_modified(u1)
 
961
 
 
962
    def test_move_persistent_clean(self):
 
963
        sess, u1 = self._persistent_fixture()
 
964
        sess.close()
 
965
        s2 = Session()
 
966
        s2.add(u1)
 
967
        self._assert_no_cycle(u1)
 
968
        self._assert_not_modified(u1)
 
969
 
 
970
    def test_move_persistent_dirty(self):
 
971
        sess, u1 = self._persistent_fixture()
 
972
        u1.name = 'edchanged'
 
973
        self._assert_cycle(u1)
 
974
        self._assert_modified(u1)
 
975
        sess.close()
 
976
        self._assert_no_cycle(u1)
 
977
        s2 = Session()
 
978
        s2.add(u1)
 
979
        self._assert_cycle(u1)
 
980
        self._assert_modified(u1)
 
981
 
 
982
    @testing.requires.predictable_gc
 
983
    def test_move_gc_session_persistent_dirty(self):
 
984
        sess, u1 = self._persistent_fixture()
 
985
        u1.name = 'edchanged'
 
986
        self._assert_cycle(u1)
 
987
        self._assert_modified(u1)
 
988
        del sess
 
989
        gc_collect()
 
990
        self._assert_cycle(u1)
 
991
        s2 = Session()
 
992
        s2.add(u1)
 
993
        self._assert_cycle(u1)
 
994
        self._assert_modified(u1)
 
995
 
 
996
    def test_persistent_dirty_to_expired(self):
 
997
        sess, u1 = self._persistent_fixture()
 
998
        u1.name = 'edchanged'
 
999
        self._assert_cycle(u1)
 
1000
        self._assert_modified(u1)
 
1001
        sess.expire(u1)
 
1002
        self._assert_no_cycle(u1)
 
1003
        self._assert_not_modified(u1)
 
1004
 
 
1005
class WeakIdentityMapTest(_fixtures.FixtureTest):
 
1006
    run_inserts = None
 
1007
 
 
1008
    @testing.requires.predictable_gc
 
1009
    def test_weakref(self):
 
1010
        """test the weak-referencing identity map, which strongly-
 
1011
        references modified items."""
 
1012
 
 
1013
        users, User = self.tables.users, self.classes.User
 
1014
 
 
1015
 
 
1016
        s = create_session()
 
1017
        mapper(User, users)
 
1018
 
 
1019
        s.add(User(name='ed'))
 
1020
        s.flush()
 
1021
        assert not s.dirty
 
1022
 
 
1023
        user = s.query(User).one()
 
1024
        del user
 
1025
        gc_collect()
 
1026
        assert len(s.identity_map) == 0
 
1027
 
 
1028
        user = s.query(User).one()
 
1029
        user.name = 'fred'
 
1030
        del user
 
1031
        gc_collect()
 
1032
        assert len(s.identity_map) == 1
 
1033
        assert len(s.dirty) == 1
 
1034
        assert None not in s.dirty
 
1035
        s.flush()
 
1036
        gc_collect()
 
1037
        assert not s.dirty
 
1038
        assert not s.identity_map
 
1039
 
 
1040
        user = s.query(User).one()
 
1041
        assert user.name == 'fred'
 
1042
        assert s.identity_map
 
1043
 
 
1044
    @testing.requires.predictable_gc
 
1045
    def test_weakref_pickled(self):
 
1046
        users, User = self.tables.users, pickleable.User
 
1047
 
 
1048
        s = create_session()
 
1049
        mapper(User, users)
 
1050
 
 
1051
        s.add(User(name='ed'))
 
1052
        s.flush()
 
1053
        assert not s.dirty
 
1054
 
 
1055
        user = s.query(User).one()
 
1056
        user.name = 'fred'
 
1057
        s.expunge(user)
 
1058
 
 
1059
        u2 = pickle.loads(pickle.dumps(user))
 
1060
 
 
1061
        del user
 
1062
        s.add(u2)
 
1063
 
 
1064
        del u2
 
1065
        gc_collect()
 
1066
 
 
1067
        assert len(s.identity_map) == 1
 
1068
        assert len(s.dirty) == 1
 
1069
        assert None not in s.dirty
 
1070
        s.flush()
 
1071
        gc_collect()
 
1072
        assert not s.dirty
 
1073
 
 
1074
        assert not s.identity_map
 
1075
 
 
1076
    @testing.requires.predictable_gc
 
1077
    def test_weakref_with_cycles_o2m(self):
 
1078
        Address, addresses, users, User = (self.classes.Address,
 
1079
                                self.tables.addresses,
 
1080
                                self.tables.users,
 
1081
                                self.classes.User)
 
1082
 
 
1083
        s = sessionmaker()()
 
1084
        mapper(User, users, properties={
 
1085
            "addresses": relationship(Address, backref="user")
 
1086
        })
 
1087
        mapper(Address, addresses)
 
1088
        s.add(User(name="ed", addresses=[Address(email_address="ed1")]))
 
1089
        s.commit()
 
1090
 
 
1091
        user = s.query(User).options(joinedload(User.addresses)).one()
 
1092
        user.addresses[0].user  # lazyload
 
1093
        eq_(user, User(name="ed", addresses=[Address(email_address="ed1")]))
 
1094
 
 
1095
        del user
 
1096
        gc_collect()
 
1097
        assert len(s.identity_map) == 0
 
1098
 
 
1099
        user = s.query(User).options(joinedload(User.addresses)).one()
 
1100
        user.addresses[0].email_address = 'ed2'
 
1101
        user.addresses[0].user  # lazyload
 
1102
        del user
 
1103
        gc_collect()
 
1104
        assert len(s.identity_map) == 2
 
1105
 
 
1106
        s.commit()
 
1107
        user = s.query(User).options(joinedload(User.addresses)).one()
 
1108
        eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
 
1109
 
 
1110
    @testing.requires.predictable_gc
 
1111
    def test_weakref_with_cycles_o2o(self):
 
1112
        Address, addresses, users, User = (self.classes.Address,
 
1113
                                self.tables.addresses,
 
1114
                                self.tables.users,
 
1115
                                self.classes.User)
 
1116
 
 
1117
        s = sessionmaker()()
 
1118
        mapper(User, users, properties={
 
1119
            "address": relationship(Address, backref="user", uselist=False)
 
1120
        })
 
1121
        mapper(Address, addresses)
 
1122
        s.add(User(name="ed", address=Address(email_address="ed1")))
 
1123
        s.commit()
 
1124
 
 
1125
        user = s.query(User).options(joinedload(User.address)).one()
 
1126
        user.address.user
 
1127
        eq_(user, User(name="ed", address=Address(email_address="ed1")))
 
1128
 
 
1129
        del user
 
1130
        gc_collect()
 
1131
        assert len(s.identity_map) == 0
 
1132
 
 
1133
        user = s.query(User).options(joinedload(User.address)).one()
 
1134
        user.address.email_address = 'ed2'
 
1135
        user.address.user  # lazyload
 
1136
 
 
1137
        del user
 
1138
        gc_collect()
 
1139
        assert len(s.identity_map) == 2
 
1140
 
 
1141
        s.commit()
 
1142
        user = s.query(User).options(joinedload(User.address)).one()
 
1143
        eq_(user, User(name="ed", address=Address(email_address="ed2")))
 
1144
 
 
1145
    def test_auto_detach_on_gc_session(self):
 
1146
        users, User = self.tables.users, self.classes.User
 
1147
 
 
1148
        mapper(User, users)
 
1149
 
 
1150
        sess = Session()
 
1151
 
 
1152
        u1 = User(name='u1')
 
1153
        sess.add(u1)
 
1154
        sess.commit()
 
1155
 
 
1156
        # can't add u1 to Session,
 
1157
        # already belongs to u2
 
1158
        s2 = Session()
 
1159
        assert_raises_message(
 
1160
            sa.exc.InvalidRequestError,
 
1161
            r".*is already attached to session",
 
1162
            s2.add, u1
 
1163
        )
 
1164
 
 
1165
        # garbage collect sess
 
1166
        del sess
 
1167
        gc_collect()
 
1168
 
 
1169
        # s2 lets it in now despite u1 having
 
1170
        # session_key
 
1171
        s2.add(u1)
 
1172
        assert u1 in s2
 
1173
 
 
1174
 
 
1175
class StrongIdentityMapTest(_fixtures.FixtureTest):
 
1176
    run_inserts = None
 
1177
 
 
1178
    @testing.uses_deprecated()
 
1179
    def test_strong_ref(self):
 
1180
        users, User = self.tables.users, self.classes.User
 
1181
 
 
1182
        s = create_session(weak_identity_map=False)
 
1183
        mapper(User, users)
 
1184
 
 
1185
        # save user
 
1186
        s.add(User(name='u1'))
 
1187
        s.flush()
 
1188
        user = s.query(User).one()
 
1189
        user = None
 
1190
        print s.identity_map
 
1191
        gc_collect()
 
1192
        assert len(s.identity_map) == 1
 
1193
 
 
1194
        user = s.query(User).one()
 
1195
        assert not s.identity_map._modified
 
1196
        user.name = 'u2'
 
1197
        assert s.identity_map._modified
 
1198
        s.flush()
 
1199
        eq_(users.select().execute().fetchall(), [(user.id, 'u2')])
 
1200
 
 
1201
    @testing.uses_deprecated()
 
1202
    @testing.fails_if(lambda: pypy, "pypy has a real GC")
 
1203
    @testing.fails_on('+zxjdbc', 'http://www.sqlalchemy.org/trac/ticket/1473')
 
1204
    def test_prune(self):
 
1205
        users, User = self.tables.users, self.classes.User
 
1206
 
 
1207
        s = create_session(weak_identity_map=False)
 
1208
        mapper(User, users)
 
1209
 
 
1210
        for o in [User(name='u%s' % x) for x in xrange(10)]:
 
1211
            s.add(o)
 
1212
        # o is still live after this loop...
 
1213
 
 
1214
        self.assert_(len(s.identity_map) == 0)
 
1215
        self.assert_(s.prune() == 0)
 
1216
        s.flush()
 
1217
        gc_collect()
 
1218
        self.assert_(s.prune() == 9)
 
1219
        self.assert_(len(s.identity_map) == 1)
 
1220
 
 
1221
        id = o.id
 
1222
        del o
 
1223
        self.assert_(s.prune() == 1)
 
1224
        self.assert_(len(s.identity_map) == 0)
 
1225
 
 
1226
        u = s.query(User).get(id)
 
1227
        self.assert_(s.prune() == 0)
 
1228
        self.assert_(len(s.identity_map) == 1)
 
1229
        u.name = 'squiznart'
 
1230
        del u
 
1231
        self.assert_(s.prune() == 0)
 
1232
        self.assert_(len(s.identity_map) == 1)
 
1233
        s.flush()
 
1234
        self.assert_(s.prune() == 1)
 
1235
        self.assert_(len(s.identity_map) == 0)
 
1236
 
 
1237
        s.add(User(name='x'))
 
1238
        self.assert_(s.prune() == 0)
 
1239
        self.assert_(len(s.identity_map) == 0)
 
1240
        s.flush()
 
1241
        self.assert_(len(s.identity_map) == 1)
 
1242
        self.assert_(s.prune() == 1)
 
1243
        self.assert_(len(s.identity_map) == 0)
 
1244
 
 
1245
        u = s.query(User).get(id)
 
1246
        s.delete(u)
 
1247
        del u
 
1248
        self.assert_(s.prune() == 0)
 
1249
        self.assert_(len(s.identity_map) == 1)
 
1250
        s.flush()
 
1251
        self.assert_(s.prune() == 0)
 
1252
        self.assert_(len(s.identity_map) == 0)
 
1253
 
 
1254
 
1023
1255
class IsModifiedTest(_fixtures.FixtureTest):
1024
1256
    run_inserts = None
1025
1257
 
1027
1259
        User, Address = self.classes.User, self.classes.Address
1028
1260
        users, addresses = self.tables.users, self.tables.addresses
1029
1261
        mapper(User, users, properties={
1030
 
            "addresses":relationship(Address)
 
1262
            "addresses": relationship(Address)
1031
1263
        })
1032
1264
        mapper(Address, addresses)
1033
1265
        return User, Address
1063
1295
        assert not s.is_modified(user, include_collections=False)
1064
1296
 
1065
1297
    def test_is_modified_passive_off(self):
 
1298
        """as of 0.8 no SQL is emitted for is_modified()
 
1299
        regardless of the passive flag"""
 
1300
 
1066
1301
        User, Address = self._default_mapping_fixture()
1067
1302
 
1068
1303
        s = Session()
1077
1312
        self.assert_sql_count(
1078
1313
            testing.db,
1079
1314
            go,
1080
 
            1
 
1315
            0
1081
1316
        )
1082
1317
 
1083
1318
        s.expire_all()
1086
1321
        # can't predict result here
1087
1322
        # deterministically, depending on if
1088
1323
        # 'name' or 'addresses' is tested first
1089
 
        mod  = s.is_modified(u)
 
1324
        mod = s.is_modified(u)
1090
1325
        addresses_loaded = 'addresses' in u.__dict__
1091
1326
        assert mod is not addresses_loaded
1092
1327
 
1121
1356
 
1122
1357
        s = sessionmaker()()
1123
1358
 
1124
 
        mapper(User, users, properties={'uname':sa.orm.synonym('name')})
 
1359
        mapper(User, users, properties={'uname': sa.orm.synonym('name')})
1125
1360
        u = User(uname='fred')
1126
1361
        assert s.is_modified(u)
1127
1362
        s.add(u)
1135
1370
 
1136
1371
    @classmethod
1137
1372
    def define_tables(cls, metadata):
1138
 
        global t1
1139
 
        t1 = Table('t1', metadata, Column('id', Integer,
 
1373
        Table('t1', metadata, Column('id', Integer,
1140
1374
                   primary_key=True, test_needs_autoincrement=True),
1141
1375
                   Column('data', String(50)))
1142
1376
 
1143
1377
    @classmethod
1144
 
    def setup_mappers(cls):
1145
 
        global T
1146
 
        class T(object):
 
1378
    def setup_classes(cls):
 
1379
        class T(cls.Basic):
1147
1380
            def __init__(self, data):
1148
1381
                self.data = data
1149
 
        mapper(T, t1)
 
1382
        mapper(T, cls.tables.t1)
1150
1383
 
1151
1384
    def teardown(self):
1152
1385
        from sqlalchemy.orm.session import _sessions
1169
1402
        """
1170
1403
 
1171
1404
        all_states = sess.identity_map.all_states()
1172
 
        sess.identity_map.all_states = lambda : all_states
 
1405
        sess.identity_map.all_states = lambda: all_states
1173
1406
        for obj in objs:
1174
1407
            state = attributes.instance_state(obj)
1175
1408
            sess.identity_map.discard(state)
1176
 
            state.dispose()
 
1409
            state._dispose()
1177
1410
 
1178
1411
    def _test_session(self, **kwargs):
1179
 
        global sess
 
1412
        T = self.classes.T
1180
1413
        sess = create_session(**kwargs)
1181
1414
 
1182
1415
        data = o1, o2, o3, o4, o5 = [T('t1'), T('t2'), T('t3'), T('t4'
1380
1613
        sess.flush()
1381
1614
        self.bind.commit()
1382
1615
 
 
1616
 
 
1617
 
 
1618
class FlushWarningsTest(fixtures.MappedTest):
 
1619
    run_setup_mappers = 'each'
 
1620
 
 
1621
    @classmethod
 
1622
    def define_tables(cls, metadata):
 
1623
        Table('user', metadata,
 
1624
                Column('id', Integer, primary_key=True,
 
1625
                            test_needs_autoincrement=True),
 
1626
                Column('name', String(20))
 
1627
            )
 
1628
 
 
1629
        Table('address', metadata,
 
1630
                Column('id', Integer, primary_key=True,
 
1631
                            test_needs_autoincrement=True),
 
1632
                Column('user_id', Integer, ForeignKey('user.id')),
 
1633
                Column('email', String(20))
 
1634
            )
 
1635
 
 
1636
    @classmethod
 
1637
    def setup_classes(cls):
 
1638
        class User(cls.Basic):
 
1639
            pass
 
1640
        class Address(cls.Basic):
 
1641
            pass
 
1642
 
 
1643
    @classmethod
 
1644
    def setup_mappers(cls):
 
1645
        user, User = cls.tables.user, cls.classes.User
 
1646
        address, Address = cls.tables.address, cls.classes.Address
 
1647
        mapper(User, user, properties={
 
1648
                'addresses': relationship(Address, backref="user")
 
1649
            })
 
1650
        mapper(Address, address)
 
1651
 
 
1652
    def test_o2m_cascade_add(self):
 
1653
        Address = self.classes.Address
 
1654
        def evt(mapper, conn, instance):
 
1655
            instance.addresses.append(Address(email='x1'))
 
1656
        self._test(evt, "collection append")
 
1657
 
 
1658
    def test_o2m_cascade_remove(self):
 
1659
        def evt(mapper, conn, instance):
 
1660
            del instance.addresses[0]
 
1661
        self._test(evt, "collection remove")
 
1662
 
 
1663
    def test_m2o_cascade_add(self):
 
1664
        User = self.classes.User
 
1665
        def evt(mapper, conn, instance):
 
1666
            instance.addresses[0].user = User(name='u2')
 
1667
        self._test(evt, "related attribute set")
 
1668
 
 
1669
    def test_m2o_cascade_remove(self):
 
1670
        def evt(mapper, conn, instance):
 
1671
            a1 = instance.addresses[0]
 
1672
            del a1.user
 
1673
        self._test(evt, "related attribute delete")
 
1674
 
 
1675
    def test_plain_add(self):
 
1676
        Address = self.classes.Address
 
1677
        def evt(mapper, conn, instance):
 
1678
            object_session(instance).add(Address(email='x1'))
 
1679
        self._test(evt, "Session.add\(\)")
 
1680
 
 
1681
    def test_plain_merge(self):
 
1682
        Address = self.classes.Address
 
1683
        def evt(mapper, conn, instance):
 
1684
            object_session(instance).merge(Address(email='x1'))
 
1685
        self._test(evt, "Session.merge\(\)")
 
1686
 
 
1687
    def test_plain_delete(self):
 
1688
        Address = self.classes.Address
 
1689
        def evt(mapper, conn, instance):
 
1690
            object_session(instance).delete(Address(email='x1'))
 
1691
        self._test(evt, "Session.delete\(\)")
 
1692
 
 
1693
    def _test(self, fn, method):
 
1694
        User = self.classes.User
 
1695
        Address = self.classes.Address
 
1696
 
 
1697
        s = Session()
 
1698
        event.listen(User, "after_insert", fn)
 
1699
 
 
1700
        u1 = User(name='u1', addresses=[Address(name='a1')])
 
1701
        s.add(u1)
 
1702
        assert_raises_message(
 
1703
            sa.exc.SAWarning,
 
1704
            "Usage of the '%s'" % method,
 
1705
            s.commit
 
1706
        )