~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/orm/test_session.py

  • Committer: Bazaar Package Importer
  • Author(s): Piotr Ożarowski
  • Date: 2011-08-01 23:18:16 UTC
  • mfrom: (1.4.15 upstream) (16.1.14 experimental)
  • Revision ID: james.westby@ubuntu.com-20110801231816-6lx797pi3q1fpqst
Tags: 0.7.2-1
* New upstream release
* Bump minimum required python-mako version to 0.4.1 (closes: 635898)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
from sqlalchemy.test.testing import eq_, assert_raises, \
 
1
from test.lib.testing import eq_, assert_raises, \
2
2
    assert_raises_message
3
 
from sqlalchemy.test.util import gc_collect
 
3
from test.lib.util import gc_collect
 
4
from test.lib import pickleable
 
5
from sqlalchemy.util import pickle
4
6
import inspect
5
 
import pickle
6
7
from sqlalchemy.orm import create_session, sessionmaker, attributes, \
7
8
    make_transient, Session
8
 
from sqlalchemy.orm.attributes import instance_state
9
9
import sqlalchemy as sa
10
 
from sqlalchemy.test import engines, testing, config
 
10
from test.lib import engines, testing, config
11
11
from sqlalchemy import Integer, String, Sequence
12
 
from sqlalchemy.test.schema import Table, Column
 
12
from test.lib.schema import Table, Column
13
13
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
14
14
    exc as orm_exc, object_session
15
 
from sqlalchemy.test.testing import eq_
16
 
from test.engine import _base as engine_base
17
 
from test.orm import _base, _fixtures
18
 
 
 
15
from sqlalchemy.util import pypy
 
16
from test.lib import fixtures
 
17
from test.lib import fixtures
 
18
from test.orm import _fixtures
19
19
 
20
20
class SessionTest(_fixtures.FixtureTest):
21
21
    run_inserts = None
22
22
 
23
 
    @testing.resolve_artifact_names
24
23
    def test_no_close_on_flush(self):
25
24
        """Flush() doesn't close a connection the session didn't open"""
 
25
 
 
26
        User, users = self.classes.User, self.tables.users
 
27
 
26
28
        c = testing.db.connect()
27
29
        c.execute("select * from users")
28
30
 
32
34
        s.flush()
33
35
        c.execute("select * from users")
34
36
 
35
 
    @testing.resolve_artifact_names
36
37
    def test_close(self):
37
38
        """close() doesn't close a connection the session didn't open"""
 
39
 
 
40
        User, users = self.classes.User, self.tables.users
 
41
 
38
42
        c = testing.db.connect()
39
43
        c.execute("select * from users")
40
44
 
46
50
        s.close()
47
51
        c.execute("select * from users")
48
52
 
49
 
    @testing.resolve_artifact_names
50
53
    def test_no_close_transaction_on_flulsh(self):
 
54
        User, users = self.classes.User, self.tables.users
 
55
 
51
56
        c = testing.db.connect()
52
57
        try:
53
58
            mapper(User, users)
68
73
        finally:
69
74
            c.close()
70
75
 
71
 
    @testing.resolve_artifact_names
72
76
    def test_object_session_raises(self):
 
77
        User = self.classes.User
 
78
 
73
79
        assert_raises(
74
80
            orm_exc.UnmappedInstanceError,
75
 
            object_session, 
 
81
            object_session,
76
82
            object()
77
83
        )
78
84
 
79
85
        assert_raises(
80
86
            orm_exc.UnmappedInstanceError,
81
 
            object_session, 
 
87
            object_session,
82
88
            User()
83
89
        )
84
90
 
93
99
            seq.drop(testing.db)
94
100
 
95
101
 
96
 
    @testing.resolve_artifact_names
97
 
    def test_expunge_cascade(self):
98
 
        mapper(Address, addresses)
99
 
        mapper(User, users, properties={
100
 
            'addresses':relationship(Address,
101
 
                                 backref=backref("user", cascade="all"),
102
 
                                 cascade="all")})
103
 
 
104
 
        _fixtures.run_inserts_for(users)
105
 
        _fixtures.run_inserts_for(addresses)
106
 
 
107
 
        session = create_session()
108
 
        u = session.query(User).filter_by(id=7).one()
109
 
 
110
 
        # get everything to load in both directions
111
 
        print [a.user for a in u.addresses]
112
 
 
113
 
        # then see if expunge fails
114
 
        session.expunge(u)
115
 
 
116
 
        assert sa.orm.object_session(u) is None
117
 
        assert sa.orm.attributes.instance_state(u).session_id is None
118
 
        for a in u.addresses:
119
 
            assert sa.orm.object_session(a) is None
120
 
            assert sa.orm.attributes.instance_state(a).session_id is None
121
102
 
122
103
    @engines.close_open_connections
123
 
    @testing.resolve_artifact_names
124
104
    def test_mapped_binds(self):
 
105
        Address, addresses, users, User = (self.classes.Address,
 
106
                                self.tables.addresses,
 
107
                                self.tables.users,
 
108
                                self.classes.User)
 
109
 
125
110
 
126
111
        # ensure tables are unbound
127
112
        m2 = sa.MetaData()
159
144
        sess.close()
160
145
 
161
146
    @engines.close_open_connections
162
 
    @testing.resolve_artifact_names
163
147
    def test_table_binds(self):
 
148
        Address, addresses, users, User = (self.classes.Address,
 
149
                                self.tables.addresses,
 
150
                                self.tables.users,
 
151
                                self.classes.User)
 
152
 
164
153
 
165
154
        # ensure tables are unbound
166
155
        m2 = sa.MetaData()
196
185
        sess.close()
197
186
 
198
187
    @engines.close_open_connections
199
 
    @testing.resolve_artifact_names
200
188
    def test_bind_from_metadata(self):
 
189
        users, User = self.tables.users, self.classes.User
 
190
 
201
191
        mapper(User, users)
202
192
 
203
193
        session = create_session()
212
202
 
213
203
    @testing.requires.independent_connections
214
204
    @engines.close_open_connections
215
 
    @testing.resolve_artifact_names
216
205
    def test_transaction(self):
 
206
        User, users = self.classes.User, self.tables.users
 
207
 
217
208
        mapper(User, users)
218
209
        conn1 = testing.db.connect()
219
210
        conn2 = testing.db.connect()
233
224
 
234
225
    @testing.requires.independent_connections
235
226
    @engines.close_open_connections
236
 
    @testing.resolve_artifact_names
237
227
    def test_autoflush(self):
 
228
        User, users = self.classes.User, self.tables.users
 
229
 
238
230
        bind = self.metadata.bind
239
231
        mapper(User, users)
240
232
        conn1 = bind.connect()
253
245
        eq_(bind.connect().execute("select count(1) from users").scalar(), 1)
254
246
        sess.close()
255
247
 
256
 
    @testing.resolve_artifact_names
257
248
    def test_make_transient(self):
 
249
        users, User = self.tables.users, self.classes.User
 
250
 
258
251
        mapper(User, users)
259
252
        sess = create_session()
260
253
        sess.add(User(name='test'))
272
265
        sess.add(u1)
273
266
        assert u1 in sess.new
274
267
 
275
 
        # test expired attributes 
 
268
        # test expired attributes
276
269
        # get unexpired
277
270
        u1 = sess.query(User).first()
278
271
        sess.expire(u1)
299
292
        sess.flush()
300
293
        assert u1 in sess
301
294
 
302
 
    @testing.resolve_artifact_names
303
295
    def test_make_transient_plus_rollback(self):
304
296
        # test for [ticket:2182]
 
297
        users, User = self.tables.users, self.classes.User
 
298
 
305
299
        mapper(User, users)
306
300
        sess = Session()
307
301
        u1 = User(name='test')
313
307
        make_transient(u1)
314
308
        sess.rollback()
315
309
 
316
 
    @testing.resolve_artifact_names
317
310
    def test_deleted_flag(self):
 
311
        users, User = self.tables.users, self.classes.User
 
312
 
318
313
        mapper(User, users)
319
314
 
320
315
        sess = sessionmaker()()
341
336
 
342
337
        eq_(sess.query(User).count(), 1)
343
338
 
344
 
    @testing.resolve_artifact_names
345
339
    def test_autoflush_expressions(self):
346
 
        """test that an expression which is dependent on object state is 
 
340
        """test that an expression which is dependent on object state is
347
341
        evaluated after the session autoflushes.   This is the lambda
348
342
        inside of strategies.py lazy_clause.
349
343
 
350
344
        """
 
345
 
 
346
        users, Address, addresses, User = (self.tables.users,
 
347
                                self.classes.Address,
 
348
                                self.tables.addresses,
 
349
                                self.classes.User)
 
350
 
351
351
        mapper(User, users, properties={
352
352
            'addresses':relationship(Address, backref="user")})
353
353
        mapper(Address, addresses)
371
371
 
372
372
    @testing.requires.independent_connections
373
373
    @engines.close_open_connections
374
 
    @testing.resolve_artifact_names
375
374
    def test_autoflush_unbound(self):
 
375
        User, users = self.classes.User, self.tables.users
 
376
 
376
377
        mapper(User, users)
377
378
        try:
378
379
            sess = create_session(autocommit=False, autoflush=True)
396
397
            raise
397
398
 
398
399
    @engines.close_open_connections
399
 
    @testing.resolve_artifact_names
400
400
    def test_autoflush_2(self):
 
401
        User, users = self.classes.User, self.tables.users
 
402
 
401
403
        mapper(User, users)
402
404
        conn1 = testing.db.connect()
403
405
        conn2 = testing.db.connect()
412
414
                ).scalar() == 1
413
415
        sess.commit()
414
416
 
415
 
    @testing.resolve_artifact_names
416
 
    def test_autoflush_rollback(self):
417
 
        mapper(Address, addresses)
418
 
        mapper(User, users, properties={
419
 
            'addresses':relationship(Address)})
420
 
 
421
 
        _fixtures.run_inserts_for(users)
422
 
        _fixtures.run_inserts_for(addresses)
423
 
 
424
 
        sess = create_session(autocommit=False, autoflush=True)
425
 
        u = sess.query(User).get(8)
426
 
        newad = Address(email_address='a new address')
427
 
        u.addresses.append(newad)
428
 
        u.name = 'some new name'
429
 
        assert u.name == 'some new name'
430
 
        assert len(u.addresses) == 4
431
 
        assert newad in u.addresses
432
 
        sess.rollback()
433
 
        assert u.name == 'ed'
434
 
        assert len(u.addresses) == 3
435
 
 
436
 
        assert newad not in u.addresses
437
 
        # pending objects dont get expired
438
 
        assert newad.email_address == 'a new address'
439
 
 
440
 
    @testing.resolve_artifact_names
 
417
 
441
418
    def test_autocommit_doesnt_raise_on_pending(self):
 
419
        User, users = self.classes.User, self.tables.users
 
420
 
442
421
        mapper(User, users)
443
422
        session = create_session(autocommit=True)
444
423
 
456
435
        sess.rollback()
457
436
        assert not sess.is_active
458
437
 
459
 
    @testing.resolve_artifact_names
460
438
    def test_textual_execute(self):
461
439
        """test that Session.execute() converts to text()"""
462
440
 
 
441
        users = self.tables.users
 
442
 
 
443
 
463
444
        sess = create_session(bind=self.metadata.bind)
464
445
        users.insert().execute(id=7, name='jack')
465
446
 
475
456
            7)
476
457
 
477
458
    @engines.close_open_connections
478
 
    @testing.resolve_artifact_names
479
459
    def test_subtransaction_on_external(self):
 
460
        users, User = self.tables.users, self.classes.User
 
461
 
480
462
        mapper(User, users)
481
463
        conn = testing.db.connect()
482
464
        trans = conn.begin()
492
474
 
493
475
    @testing.requires.savepoints
494
476
    @engines.close_open_connections
495
 
    @testing.resolve_artifact_names
496
477
    def test_external_nested_transaction(self):
 
478
        users, User = self.tables.users, self.classes.User
 
479
 
497
480
        mapper(User, users)
498
481
        try:
499
482
            conn = testing.db.connect()
516
499
            raise
517
500
 
518
501
    @testing.requires.savepoints
519
 
    @testing.resolve_artifact_names
520
502
    def test_heavy_nesting(self):
 
503
        users = self.tables.users
 
504
 
521
505
        session = create_session(bind=testing.db)
522
506
        session.begin()
523
507
        session.connection().execute(users.insert().values(name='user1'
537
521
        assert session.connection().execute('select count(1) from users'
538
522
                ).scalar() == 2
539
523
 
540
 
    @testing.fails_on('sqlite', 'FIXME: unknown')
541
 
    @testing.resolve_artifact_names
 
524
    @testing.requires.independent_connections
542
525
    def test_transactions_isolated(self):
 
526
        User, users = self.classes.User, self.tables.users
 
527
 
543
528
        mapper(User, users)
544
529
        users.delete().execute()
545
530
 
552
537
        assert s2.query(User).all() == []
553
538
 
554
539
    @testing.requires.two_phase_transactions
555
 
    @testing.resolve_artifact_names
556
540
    def test_twophase(self):
 
541
        users, Address, addresses, User = (self.tables.users,
 
542
                                self.classes.Address,
 
543
                                self.tables.addresses,
 
544
                                self.classes.User)
 
545
 
557
546
        # TODO: mock up a failure condition here
558
547
        # to ensure a rollback succeeds
559
548
        mapper(User, users)
573
562
        assert users.count().scalar() == 1
574
563
        assert addresses.count().scalar() == 1
575
564
 
576
 
    @testing.resolve_artifact_names
577
565
    def test_subtransaction_on_noautocommit(self):
 
566
        User, users = self.classes.User, self.tables.users
 
567
 
578
568
        mapper(User, users)
579
569
        sess = create_session(autocommit=False, autoflush=True)
580
570
        sess.begin(subtransactions=True)
587
577
        sess.close()
588
578
 
589
579
    @testing.requires.savepoints
590
 
    @testing.resolve_artifact_names
591
580
    def test_nested_transaction(self):
 
581
        User, users = self.classes.User, self.tables.users
 
582
 
592
583
        mapper(User, users)
593
584
        sess = create_session()
594
585
        sess.begin()
610
601
        sess.close()
611
602
 
612
603
    @testing.requires.savepoints
613
 
    @testing.resolve_artifact_names
614
604
    def test_nested_autotrans(self):
 
605
        User, users = self.classes.User, self.tables.users
 
606
 
615
607
        mapper(User, users)
616
608
        sess = create_session(autocommit=False)
617
609
        u = User(name='u1')
631
623
        sess.close()
632
624
 
633
625
    @testing.requires.savepoints
634
 
    @testing.resolve_artifact_names
635
626
    def test_nested_transaction_connection_add(self):
 
627
        users, User = self.tables.users, self.classes.User
 
628
 
636
629
        mapper(User, users)
637
630
 
638
631
        sess = create_session(autocommit=True)
666
659
        sess.close()
667
660
 
668
661
    @testing.requires.savepoints
669
 
    @testing.resolve_artifact_names
670
662
    def test_mixed_transaction_control(self):
 
663
        users, User = self.tables.users, self.classes.User
 
664
 
671
665
        mapper(User, users)
672
666
 
673
667
        sess = create_session(autocommit=True)
697
691
        sess.close()
698
692
 
699
693
    @testing.requires.savepoints
700
 
    @testing.resolve_artifact_names
701
694
    def test_mixed_transaction_close(self):
 
695
        users, User = self.tables.users, self.classes.User
 
696
 
702
697
        mapper(User, users)
703
698
 
704
699
        sess = create_session(autocommit=False)
717
712
 
718
713
        eq_(len(sess.query(User).all()), 1)
719
714
 
720
 
    @testing.resolve_artifact_names
721
715
    def test_error_on_using_inactive_session(self):
 
716
        users, User = self.tables.users, self.classes.User
 
717
 
722
718
        mapper(User, users)
723
719
        sess = create_session(autocommit=True)
724
720
        sess.begin()
734
730
                              sess.begin, subtransactions=True)
735
731
        sess.close()
736
732
 
737
 
    @testing.resolve_artifact_names
738
733
    def test_preserve_flush_error(self):
 
734
        users, User = self.tables.users, self.classes.User
 
735
 
739
736
        mapper(User, users)
740
737
        sess = Session()
741
738
 
759
756
        sess.commit()
760
757
 
761
758
 
762
 
    @testing.resolve_artifact_names
763
759
    def test_no_autocommit_with_explicit_commit(self):
 
760
        User, users = self.classes.User, self.tables.users
 
761
 
764
762
        mapper(User, users)
765
763
        session = create_session(autocommit=False)
766
764
        session.add(User(name='ed'))
769
767
            'autocommit=False should start a new transaction'
770
768
 
771
769
    @engines.close_open_connections
772
 
    @testing.resolve_artifact_names
773
770
    def test_bound_connection(self):
 
771
        users, User = self.tables.users, self.classes.User
 
772
 
774
773
        mapper(User, users)
775
774
        c = testing.db.connect()
776
775
        sess = create_session(bind=c)
791
790
        assert len(sess.query(User).all()) == 0
792
791
        sess.close()
793
792
 
794
 
    @testing.resolve_artifact_names
795
793
    def test_bound_connection_transactional(self):
 
794
        User, users = self.classes.User, self.tables.users
 
795
 
796
796
        mapper(User, users)
797
797
        c = testing.db.connect()
798
798
 
826
826
        assert not c.in_transaction()
827
827
        assert c.scalar("select count(1) from users") == 1
828
828
 
 
829
    def test_bind_arguments(self):
 
830
        users, Address, addresses, User = (self.tables.users,
 
831
                                self.classes.Address,
 
832
                                self.tables.addresses,
 
833
                                self.classes.User)
 
834
 
 
835
        mapper(User, users)
 
836
        mapper(Address, addresses)
 
837
 
 
838
        e1 = engines.testing_engine()
 
839
        e2 = engines.testing_engine()
 
840
        e3 = engines.testing_engine()
 
841
 
 
842
        sess = Session(e3)
 
843
        sess.bind_mapper(User, e1)
 
844
        sess.bind_mapper(Address, e2)
 
845
 
 
846
        assert sess.connection().engine is e3
 
847
        assert sess.connection(bind=e1).engine is e1
 
848
        assert sess.connection(mapper=Address, bind=e1).engine is e1
 
849
        assert sess.connection(mapper=Address).engine is e2
 
850
        assert sess.connection(clause=addresses.select()).engine is e2
 
851
        assert sess.connection(mapper=User,
 
852
                                clause=addresses.select()).engine is e1
 
853
        assert sess.connection(mapper=User,
 
854
                                clause=addresses.select(),
 
855
                                bind=e2).engine is e2
 
856
 
 
857
        sess.close()
829
858
 
830
859
    @engines.close_open_connections
831
 
    @testing.resolve_artifact_names
832
860
    def test_add_delete(self):
 
861
        User, Address, addresses, users = (self.classes.User,
 
862
                                self.classes.Address,
 
863
                                self.tables.addresses,
 
864
                                self.tables.users)
 
865
 
833
866
 
834
867
        s = create_session()
835
868
        mapper(User, users, properties={
883
916
        assert user not in s
884
917
        assert s.query(User).count() == 0
885
918
 
886
 
    @testing.resolve_artifact_names
887
919
    def test_is_modified(self):
 
920
        User, Address, addresses, users = (self.classes.User,
 
921
                                self.classes.Address,
 
922
                                self.tables.addresses,
 
923
                                self.tables.users)
 
924
 
888
925
        s = create_session()
889
926
 
890
927
        mapper(User, users, properties={'addresses':relationship(Address)})
915
952
        assert s.is_modified(user)
916
953
        assert not s.is_modified(user, include_collections=False)
917
954
 
918
 
    @testing.resolve_artifact_names
919
955
    def test_is_modified_syn(self):
 
956
        User, users = self.classes.User, self.tables.users
 
957
 
920
958
        s = sessionmaker()()
921
959
 
922
960
        mapper(User, users, properties={'uname':sa.orm.synonym('name')})
926
964
        s.commit()
927
965
        assert not s.is_modified(u)
928
966
 
929
 
    @testing.resolve_artifact_names
930
967
    def test_weak_ref(self):
931
968
        """test the weak-referencing identity map, which strongly-
932
969
        references modified items."""
933
970
 
 
971
        users, User = self.tables.users, self.classes.User
 
972
 
 
973
 
934
974
        s = create_session()
935
975
        mapper(User, users)
936
976
 
959
999
        assert user.name == 'fred'
960
1000
        assert s.identity_map
961
1001
 
962
 
    @testing.resolve_artifact_names
963
1002
    def test_weak_ref_pickled(self):
 
1003
        users, User = self.tables.users, pickleable.User
 
1004
 
964
1005
        s = create_session()
965
1006
        mapper(User, users)
966
1007
 
989
1030
 
990
1031
        assert not s.identity_map
991
1032
 
992
 
    @testing.resolve_artifact_names
 
1033
    @testing.uses_deprecated()
993
1034
    def test_identity_conflict(self):
 
1035
        users, User = self.tables.users, self.classes.User
 
1036
 
994
1037
        mapper(User, users)
995
1038
        for s in (
996
1039
            create_session(),
1009
1052
                          sa.orm.attributes.instance_state(u2))
1010
1053
 
1011
1054
 
1012
 
    @testing.resolve_artifact_names
1013
1055
    def test_weakref_with_cycles_o2m(self):
 
1056
        Address, addresses, users, User = (self.classes.Address,
 
1057
                                self.tables.addresses,
 
1058
                                self.tables.users,
 
1059
                                self.classes.User)
 
1060
 
1014
1061
        s = sessionmaker()()
1015
1062
        mapper(User, users, properties={
1016
1063
            "addresses":relationship(Address, backref="user")
1038
1085
        user = s.query(User).options(joinedload(User.addresses)).one()
1039
1086
        eq_(user, User(name="ed", addresses=[Address(email_address="ed2")]))
1040
1087
 
1041
 
    @testing.resolve_artifact_names
1042
1088
    def test_weakref_with_cycles_o2o(self):
 
1089
        Address, addresses, users, User = (self.classes.Address,
 
1090
                                self.tables.addresses,
 
1091
                                self.tables.users,
 
1092
                                self.classes.User)
 
1093
 
1043
1094
        s = sessionmaker()()
1044
1095
        mapper(User, users, properties={
1045
1096
            "address":relationship(Address, backref="user", uselist=False)
1068
1119
        user = s.query(User).options(joinedload(User.address)).one()
1069
1120
        eq_(user, User(name="ed", address=Address(email_address="ed2")))
1070
1121
 
1071
 
    @testing.resolve_artifact_names
 
1122
    @testing.uses_deprecated()
1072
1123
    def test_strong_ref(self):
 
1124
        users, User = self.tables.users, self.classes.User
 
1125
 
1073
1126
        s = create_session(weak_identity_map=False)
1074
1127
        mapper(User, users)
1075
1128
 
1089
1142
        s.flush()
1090
1143
        eq_(users.select().execute().fetchall(), [(user.id, 'u2')])
1091
1144
 
 
1145
    @testing.uses_deprecated()
 
1146
    @testing.fails_if(lambda: pypy, "pypy has a real GC")
1092
1147
    @testing.fails_on('+zxjdbc', 'http://www.sqlalchemy.org/trac/ticket/1473')
1093
 
    @testing.resolve_artifact_names
1094
1148
    def test_prune(self):
 
1149
        users, User = self.tables.users, self.classes.User
 
1150
 
1095
1151
        s = create_session(weak_identity_map=False)
1096
1152
        mapper(User, users)
1097
1153
 
1139
1195
        self.assert_(s.prune() == 0)
1140
1196
        self.assert_(len(s.identity_map) == 0)
1141
1197
 
1142
 
    @testing.resolve_artifact_names
1143
 
    def test_no_save_cascade_1(self):
1144
 
        mapper(Address, addresses)
1145
 
        mapper(User, users, properties=dict(
1146
 
            addresses=relationship(Address, cascade="none", backref="user")))
1147
 
        s = create_session()
1148
 
 
1149
 
        u = User(name='u1')
1150
 
        s.add(u)
1151
 
        a = Address(email_address='u1@e')
1152
 
        u.addresses.append(a)
1153
 
        assert u in s
1154
 
        assert a not in s
1155
 
        s.flush()
1156
 
        print "\n".join([repr(x.__dict__) for x in s])
1157
 
        s.expunge_all()
1158
 
        assert s.query(User).one().id == u.id
1159
 
        assert s.query(Address).first() is None
1160
 
 
1161
 
    @testing.resolve_artifact_names
1162
 
    def test_no_save_cascade_2(self):
1163
 
        mapper(Address, addresses)
1164
 
        mapper(User, users, properties=dict(
1165
 
            addresses=relationship(Address,
1166
 
                               cascade="all",
1167
 
                               backref=backref("user", cascade="none"))))
1168
 
 
1169
 
        s = create_session()
1170
 
        u = User(name='u1')
1171
 
        a = Address(email_address='u1@e')
1172
 
        a.user = u
1173
 
        s.add(a)
1174
 
        assert u not in s
1175
 
        assert a in s
1176
 
        s.flush()
1177
 
        s.expunge_all()
1178
 
        assert s.query(Address).one().id == a.id
1179
 
        assert s.query(User).first() is None
1180
 
 
1181
 
    @testing.resolve_artifact_names
1182
 
    def test_extension(self):
1183
 
        mapper(User, users)
1184
 
        log = []
1185
 
        class MyExt(sa.orm.session.SessionExtension):
1186
 
            def before_commit(self, session):
1187
 
                log.append('before_commit')
1188
 
            def after_commit(self, session):
1189
 
                log.append('after_commit')
1190
 
            def after_rollback(self, session):
1191
 
                log.append('after_rollback')
1192
 
            def before_flush(self, session, flush_context, objects):
1193
 
                log.append('before_flush')
1194
 
            def after_flush(self, session, flush_context):
1195
 
                log.append('after_flush')
1196
 
            def after_flush_postexec(self, session, flush_context):
1197
 
                log.append('after_flush_postexec')
1198
 
            def after_begin(self, session, transaction, connection):
1199
 
                log.append('after_begin')
1200
 
            def after_attach(self, session, instance):
1201
 
                log.append('after_attach')
1202
 
            def after_bulk_update(
1203
 
                self,
1204
 
                session,
1205
 
                query,
1206
 
                query_context,
1207
 
                result,
1208
 
                ):
1209
 
                log.append('after_bulk_update')
1210
 
 
1211
 
            def after_bulk_delete(
1212
 
                self,
1213
 
                session,
1214
 
                query,
1215
 
                query_context,
1216
 
                result,
1217
 
                ):
1218
 
                log.append('after_bulk_delete')
1219
 
 
1220
 
        sess = create_session(extension = MyExt())
1221
 
        u = User(name='u1')
1222
 
        sess.add(u)
1223
 
        sess.flush()
1224
 
        assert log == [
1225
 
            'after_attach',
1226
 
            'before_flush',
1227
 
            'after_begin',
1228
 
            'after_flush',
1229
 
            'before_commit',
1230
 
            'after_commit',
1231
 
            'after_flush_postexec',
1232
 
            ]
1233
 
        log = []
1234
 
        sess = create_session(autocommit=False, extension=MyExt())
1235
 
        u = User(name='u1')
1236
 
        sess.add(u)
1237
 
        sess.flush()
1238
 
        assert log == ['after_attach', 'before_flush', 'after_begin',
1239
 
                       'after_flush', 'after_flush_postexec']
1240
 
        log = []
1241
 
        u.name = 'ed'
1242
 
        sess.commit()
1243
 
        assert log == ['before_commit', 'before_flush', 'after_flush',
1244
 
                       'after_flush_postexec', 'after_commit']
1245
 
        log = []
1246
 
        sess.commit()
1247
 
        assert log == ['before_commit', 'after_commit']
1248
 
        log = []
1249
 
        sess.query(User).delete()
1250
 
        assert log == ['after_begin', 'after_bulk_delete']
1251
 
        log = []
1252
 
        sess.query(User).update({'name': 'foo'})
1253
 
        assert log == ['after_bulk_update']
1254
 
        log = []
1255
 
        sess = create_session(autocommit=False, extension=MyExt(),
1256
 
                              bind=testing.db)
1257
 
        conn = sess.connection()
1258
 
        assert log == ['after_begin']
1259
 
 
1260
 
    @testing.resolve_artifact_names
1261
 
    def test_before_flush(self):
1262
 
        """test that the flush plan can be affected during before_flush()"""
1263
 
 
1264
 
        mapper(User, users)
1265
 
 
1266
 
        class MyExt(sa.orm.session.SessionExtension):
1267
 
            def before_flush(self, session, flush_context, objects):
1268
 
                for obj in list(session.new) + list(session.dirty):
1269
 
                    if isinstance(obj, User):
1270
 
                        session.add(User(name='another %s' % obj.name))
1271
 
                for obj in list(session.deleted):
1272
 
                    if isinstance(obj, User):
1273
 
                        x = session.query(User).filter(User.name
1274
 
                                == 'another %s' % obj.name).one()
1275
 
                        session.delete(x)
1276
 
 
1277
 
        sess = create_session(extension = MyExt(), autoflush=True)
1278
 
        u = User(name='u1')
1279
 
        sess.add(u)
1280
 
        sess.flush()
1281
 
        eq_(sess.query(User).order_by(User.name).all(), 
1282
 
            [
1283
 
                User(name='another u1'),
1284
 
                User(name='u1')
1285
 
            ]
1286
 
        )
1287
 
 
1288
 
        sess.flush()
1289
 
        eq_(sess.query(User).order_by(User.name).all(), 
1290
 
            [
1291
 
                User(name='another u1'),
1292
 
                User(name='u1')
1293
 
            ]
1294
 
        )
1295
 
 
1296
 
        u.name='u2'
1297
 
        sess.flush()
1298
 
        eq_(sess.query(User).order_by(User.name).all(), 
1299
 
            [
1300
 
                User(name='another u1'),
1301
 
                User(name='another u2'),
1302
 
                User(name='u2')
1303
 
            ]
1304
 
        )
1305
 
 
1306
 
        sess.delete(u)
1307
 
        sess.flush()
1308
 
        eq_(sess.query(User).order_by(User.name).all(), 
1309
 
            [
1310
 
                User(name='another u1'),
1311
 
            ]
1312
 
        )
1313
 
 
1314
 
    @testing.resolve_artifact_names
1315
 
    def test_before_flush_affects_dirty(self):
1316
 
        mapper(User, users)
1317
 
 
1318
 
        class MyExt(sa.orm.session.SessionExtension):
1319
 
            def before_flush(self, session, flush_context, objects):
1320
 
                for obj in list(session.identity_map.values()):
1321
 
                    obj.name += " modified"
1322
 
 
1323
 
        sess = create_session(extension = MyExt(), autoflush=True)
1324
 
        u = User(name='u1')
1325
 
        sess.add(u)
1326
 
        sess.flush()
1327
 
        eq_(sess.query(User).order_by(User.name).all(), 
1328
 
            [
1329
 
                User(name='u1')
1330
 
            ]
1331
 
        )
1332
 
 
1333
 
        sess.add(User(name='u2'))
1334
 
        sess.flush()
1335
 
        sess.expunge_all()
1336
 
        eq_(sess.query(User).order_by(User.name).all(), 
1337
 
            [
1338
 
                User(name='u1 modified'),
1339
 
                User(name='u2')
1340
 
            ]
1341
 
        )
1342
 
 
1343
 
    @testing.resolve_artifact_names
1344
 
    def test_reentrant_flush(self):
1345
 
 
1346
 
        mapper(User, users)
1347
 
 
1348
 
        class MyExt(sa.orm.session.SessionExtension):
1349
 
            def before_flush(s, session, flush_context, objects):
1350
 
                session.flush()
1351
 
 
1352
 
        sess = create_session(extension=MyExt())
1353
 
        sess.add(User(name='foo'))
1354
 
        assert_raises_message(sa.exc.InvalidRequestError,
1355
 
                              'already flushing', sess.flush)
1356
 
 
1357
 
    @testing.resolve_artifact_names
 
1198
 
1358
1199
    def test_pickled_update(self):
 
1200
        users, User = self.tables.users, pickleable.User
 
1201
 
1359
1202
        mapper(User, users)
1360
1203
        sess1 = create_session()
1361
1204
        sess2 = create_session()
1367
1210
        u2 = pickle.loads(pickle.dumps(u1))
1368
1211
        sess2.add(u2)
1369
1212
 
1370
 
    @testing.resolve_artifact_names
1371
1213
    def test_duplicate_update(self):
 
1214
        users, User = self.tables.users, self.classes.User
 
1215
 
1372
1216
        mapper(User, users)
1373
1217
        Session = sessionmaker()
1374
1218
        sess = Session()
1407
1251
        u3 = sess.query(User).get(u1.id)
1408
1252
        assert u3 is not u1 and u3 is not u2 and u3.name == u1.name
1409
1253
 
1410
 
    @testing.resolve_artifact_names
1411
1254
    def test_no_double_save(self):
 
1255
        users = self.tables.users
 
1256
 
1412
1257
        sess = create_session()
1413
1258
        class Foo(object):
1414
1259
            def __init__(self):
1424
1269
        assert b in sess
1425
1270
        assert len(list(sess)) == 1
1426
1271
 
1427
 
    @testing.resolve_artifact_names
1428
1272
    def test_identity_map_mutate(self):
 
1273
        users, User = self.tables.users, self.classes.User
 
1274
 
1429
1275
        mapper(User, users)
1430
1276
 
1431
1277
        sess = Session()
1439
1285
                del u3
1440
1286
                gc_collect()
1441
1287
 
1442
 
 
1443
 
class DisposedStates(_base.MappedTest):
 
1288
class SessionDataTest(_fixtures.FixtureTest):
    """Session tests that read rows already present in the fixture tables.

    The assertions below rely on a user with id 7, and on a user with id 8
    named 'ed' that has three addresses.
    """

    def test_expunge_cascade(self):
        """expunge() detaches a User and each of its addresses.

        Both the 'addresses' relationship and its 'user' backref are mapped
        with cascade="all".
        """
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(Address,
                                 backref=backref("user", cascade="all"),
                                 cascade="all")})

        session = create_session()
        u = session.query(User).filter_by(id=7).one()

        # get everything to load in both directions
        # Parenthesized so the line parses on Python 3 as well; with a single
        # argument Python 2 prints exactly what the statement form printed.
        print([a.user for a in u.addresses])

        # then see if expunge fails
        session.expunge(u)

        assert sa.orm.object_session(u) is None
        assert sa.orm.attributes.instance_state(u).session_id is None
        for a in u.addresses:
            assert sa.orm.object_session(a) is None
            assert sa.orm.attributes.instance_state(a).session_id is None

    def test_autoflush_rollback(self):
        """rollback() restores a loaded User's name and address collection.

        A new Address is appended and the name is changed inside a
        transactional, autoflushing session.  After rollback() the user
        reads 'ed' with three addresses again, while the new Address object
        keeps its own attribute value.
        """
        Address, addresses, users, User = (self.classes.Address,
                                self.tables.addresses,
                                self.tables.users,
                                self.classes.User)

        mapper(Address, addresses)
        mapper(User, users, properties={
            'addresses': relationship(Address)})

        sess = create_session(autocommit=False, autoflush=True)
        u = sess.query(User).get(8)
        newad = Address(email_address='a new address')
        u.addresses.append(newad)
        u.name = 'some new name'
        assert u.name == 'some new name'
        assert len(u.addresses) == 4
        assert newad in u.addresses
        sess.rollback()
        assert u.name == 'ed'
        assert len(u.addresses) == 3

        assert newad not in u.addresses
        # pending objects don't get expired
        assert newad.email_address == 'a new address'
 
1341
 
 
1342
 
 
1343
class DisposedStates(fixtures.MappedTest):
1444
1344
    run_setup_mappers = 'once'
1445
1345
    run_inserts = 'once'
1446
1346
    run_deletes = None
1484
1384
        sess.identity_map.all_states = lambda : all_states
1485
1385
        for obj in objs:
1486
1386
            state = attributes.instance_state(obj)
1487
 
            sess.identity_map.remove(state)
 
1387
            sess.identity_map.discard(state)
1488
1388
            state.dispose()
1489
1389
 
1490
1390
    def _test_session(self, **kwargs):
1526
1426
        sess.rollback()
1527
1427
 
1528
1428
 
1529
 
class SessionInterface(testing.TestBase):
 
1429
class SessionInterface(fixtures.TestBase):
1530
1430
    """Bogus args to Session methods produce actionable exceptions."""
1531
1431
 
1532
1432
    # TODO: expand with message body assertions.
1658
1558
        self._test_class_guards(early)
1659
1559
 
1660
1560
 
1661
 
class TLTransactionTest(engine_base.AltEngineTest, _base.MappedTest):
 
1561
class TLTransactionTest(fixtures.MappedTest):
 
1562
    run_dispose_bind = 'once'
 
1563
 
1662
1564
    @classmethod
1663
 
    def create_engine(cls):
 
1565
    def setup_bind(cls):
1664
1566
        return engines.testing_engine(options=dict(strategy='threadlocal'))
1665
1567
 
1666
1568
    @classmethod
1671
1573
 
1672
1574
    @classmethod
1673
1575
    def setup_classes(cls):
1674
 
        class User(_base.BasicEntity):
 
1576
        class User(cls.Basic):
1675
1577
            pass
1676
1578
 
1677
1579
    @classmethod
1678
 
    @testing.resolve_artifact_names
1679
1580
    def setup_mappers(cls):
 
1581
        users, User = cls.tables.users, cls.classes.User
 
1582
 
1680
1583
        mapper(User, users)
1681
1584
 
1682
1585
    @testing.exclude('mysql', '<', (5, 0, 3), 'FIXME: unknown')
1683
 
    @testing.resolve_artifact_names
1684
1586
    def test_session_nesting(self):
1685
 
        sess = create_session(bind=self.engine)
1686
 
        self.engine.begin()
 
1587
        User = self.classes.User
 
1588
 
 
1589
        sess = create_session(bind=self.bind)
 
1590
        self.bind.begin()
1687
1591
        u = User(name='ed')
1688
1592
        sess.add(u)
1689
1593
        sess.flush()
1690
 
        self.engine.commit()
1691
 
 
 
1594
        self.bind.commit()
1692
1595