1
from sqlalchemy.test.testing import eq_, assert_raises, \
1
from test.lib.testing import eq_, assert_raises, \
2
2
assert_raises_message
3
from sqlalchemy.test.util import gc_collect
3
from test.lib.util import gc_collect
4
from test.lib import pickleable
5
from sqlalchemy.util import pickle
6
7
from sqlalchemy.orm import create_session, sessionmaker, attributes, \
7
8
make_transient, Session
8
from sqlalchemy.orm.attributes import instance_state
9
9
import sqlalchemy as sa
10
from sqlalchemy.test import engines, testing, config
10
from test.lib import engines, testing, config
11
11
from sqlalchemy import Integer, String, Sequence
12
from sqlalchemy.test.schema import Table, Column
12
from test.lib.schema import Table, Column
13
13
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
14
14
exc as orm_exc, object_session
15
from sqlalchemy.test.testing import eq_
16
from test.engine import _base as engine_base
17
from test.orm import _base, _fixtures
15
from sqlalchemy.util import pypy
16
from test.lib import fixtures
17
from test.lib import fixtures
18
from test.orm import _fixtures
20
20
class SessionTest(_fixtures.FixtureTest):
23
@testing.resolve_artifact_names
24
23
def test_no_close_on_flush(self):
25
24
"""Flush() doesn't close a connection the session didn't open"""
26
User, users = self.classes.User, self.tables.users
26
28
c = testing.db.connect()
27
29
c.execute("select * from users")
93
99
seq.drop(testing.db)
96
@testing.resolve_artifact_names
97
def test_expunge_cascade(self):
98
mapper(Address, addresses)
99
mapper(User, users, properties={
100
'addresses':relationship(Address,
101
backref=backref("user", cascade="all"),
104
_fixtures.run_inserts_for(users)
105
_fixtures.run_inserts_for(addresses)
107
session = create_session()
108
u = session.query(User).filter_by(id=7).one()
110
# get everything to load in both directions
111
print [a.user for a in u.addresses]
113
# then see if expunge fails
116
assert sa.orm.object_session(u) is None
117
assert sa.orm.attributes.instance_state(u).session_id is None
118
for a in u.addresses:
119
assert sa.orm.object_session(a) is None
120
assert sa.orm.attributes.instance_state(a).session_id is None
122
103
@engines.close_open_connections
123
@testing.resolve_artifact_names
124
104
def test_mapped_binds(self):
105
Address, addresses, users, User = (self.classes.Address,
106
self.tables.addresses,
126
111
# ensure tables are unbound
127
112
m2 = sa.MetaData()
415
@testing.resolve_artifact_names
416
def test_autoflush_rollback(self):
417
mapper(Address, addresses)
418
mapper(User, users, properties={
419
'addresses':relationship(Address)})
421
_fixtures.run_inserts_for(users)
422
_fixtures.run_inserts_for(addresses)
424
sess = create_session(autocommit=False, autoflush=True)
425
u = sess.query(User).get(8)
426
newad = Address(email_address='a new address')
427
u.addresses.append(newad)
428
u.name = 'some new name'
429
assert u.name == 'some new name'
430
assert len(u.addresses) == 4
431
assert newad in u.addresses
433
assert u.name == 'ed'
434
assert len(u.addresses) == 3
436
assert newad not in u.addresses
437
# pending objects dont get expired
438
assert newad.email_address == 'a new address'
440
@testing.resolve_artifact_names
441
418
def test_autocommit_doesnt_raise_on_pending(self):
419
User, users = self.classes.User, self.tables.users
442
421
mapper(User, users)
443
422
session = create_session(autocommit=True)
826
826
assert not c.in_transaction()
827
827
assert c.scalar("select count(1) from users") == 1
829
def test_bind_arguments(self):
830
users, Address, addresses, User = (self.tables.users,
831
self.classes.Address,
832
self.tables.addresses,
836
mapper(Address, addresses)
838
e1 = engines.testing_engine()
839
e2 = engines.testing_engine()
840
e3 = engines.testing_engine()
843
sess.bind_mapper(User, e1)
844
sess.bind_mapper(Address, e2)
846
assert sess.connection().engine is e3
847
assert sess.connection(bind=e1).engine is e1
848
assert sess.connection(mapper=Address, bind=e1).engine is e1
849
assert sess.connection(mapper=Address).engine is e2
850
assert sess.connection(clause=addresses.select()).engine is e2
851
assert sess.connection(mapper=User,
852
clause=addresses.select()).engine is e1
853
assert sess.connection(mapper=User,
854
clause=addresses.select(),
855
bind=e2).engine is e2
830
859
@engines.close_open_connections
831
@testing.resolve_artifact_names
832
860
def test_add_delete(self):
861
User, Address, addresses, users = (self.classes.User,
862
self.classes.Address,
863
self.tables.addresses,
834
867
s = create_session()
835
868
mapper(User, users, properties={
1139
1195
self.assert_(s.prune() == 0)
1140
1196
self.assert_(len(s.identity_map) == 0)
1142
@testing.resolve_artifact_names
1143
def test_no_save_cascade_1(self):
1144
mapper(Address, addresses)
1145
mapper(User, users, properties=dict(
1146
addresses=relationship(Address, cascade="none", backref="user")))
1147
s = create_session()
1151
a = Address(email_address='u1@e')
1152
u.addresses.append(a)
1156
print "\n".join([repr(x.__dict__) for x in s])
1158
assert s.query(User).one().id == u.id
1159
assert s.query(Address).first() is None
1161
@testing.resolve_artifact_names
1162
def test_no_save_cascade_2(self):
1163
mapper(Address, addresses)
1164
mapper(User, users, properties=dict(
1165
addresses=relationship(Address,
1167
backref=backref("user", cascade="none"))))
1169
s = create_session()
1171
a = Address(email_address='u1@e')
1178
assert s.query(Address).one().id == a.id
1179
assert s.query(User).first() is None
1181
@testing.resolve_artifact_names
1182
def test_extension(self):
1185
class MyExt(sa.orm.session.SessionExtension):
1186
def before_commit(self, session):
1187
log.append('before_commit')
1188
def after_commit(self, session):
1189
log.append('after_commit')
1190
def after_rollback(self, session):
1191
log.append('after_rollback')
1192
def before_flush(self, session, flush_context, objects):
1193
log.append('before_flush')
1194
def after_flush(self, session, flush_context):
1195
log.append('after_flush')
1196
def after_flush_postexec(self, session, flush_context):
1197
log.append('after_flush_postexec')
1198
def after_begin(self, session, transaction, connection):
1199
log.append('after_begin')
1200
def after_attach(self, session, instance):
1201
log.append('after_attach')
1202
def after_bulk_update(
1209
log.append('after_bulk_update')
1211
def after_bulk_delete(
1218
log.append('after_bulk_delete')
1220
sess = create_session(extension = MyExt())
1231
'after_flush_postexec',
1234
sess = create_session(autocommit=False, extension=MyExt())
1238
assert log == ['after_attach', 'before_flush', 'after_begin',
1239
'after_flush', 'after_flush_postexec']
1243
assert log == ['before_commit', 'before_flush', 'after_flush',
1244
'after_flush_postexec', 'after_commit']
1247
assert log == ['before_commit', 'after_commit']
1249
sess.query(User).delete()
1250
assert log == ['after_begin', 'after_bulk_delete']
1252
sess.query(User).update({'name': 'foo'})
1253
assert log == ['after_bulk_update']
1255
sess = create_session(autocommit=False, extension=MyExt(),
1257
conn = sess.connection()
1258
assert log == ['after_begin']
1260
@testing.resolve_artifact_names
1261
def test_before_flush(self):
1262
"""test that the flush plan can be affected during before_flush()"""
1266
class MyExt(sa.orm.session.SessionExtension):
1267
def before_flush(self, session, flush_context, objects):
1268
for obj in list(session.new) + list(session.dirty):
1269
if isinstance(obj, User):
1270
session.add(User(name='another %s' % obj.name))
1271
for obj in list(session.deleted):
1272
if isinstance(obj, User):
1273
x = session.query(User).filter(User.name
1274
== 'another %s' % obj.name).one()
1277
sess = create_session(extension = MyExt(), autoflush=True)
1281
eq_(sess.query(User).order_by(User.name).all(),
1283
User(name='another u1'),
1289
eq_(sess.query(User).order_by(User.name).all(),
1291
User(name='another u1'),
1298
eq_(sess.query(User).order_by(User.name).all(),
1300
User(name='another u1'),
1301
User(name='another u2'),
1308
eq_(sess.query(User).order_by(User.name).all(),
1310
User(name='another u1'),
1314
@testing.resolve_artifact_names
1315
def test_before_flush_affects_dirty(self):
1318
class MyExt(sa.orm.session.SessionExtension):
1319
def before_flush(self, session, flush_context, objects):
1320
for obj in list(session.identity_map.values()):
1321
obj.name += " modified"
1323
sess = create_session(extension = MyExt(), autoflush=True)
1327
eq_(sess.query(User).order_by(User.name).all(),
1333
sess.add(User(name='u2'))
1336
eq_(sess.query(User).order_by(User.name).all(),
1338
User(name='u1 modified'),
1343
@testing.resolve_artifact_names
1344
def test_reentrant_flush(self):
1348
class MyExt(sa.orm.session.SessionExtension):
1349
def before_flush(s, session, flush_context, objects):
1352
sess = create_session(extension=MyExt())
1353
sess.add(User(name='foo'))
1354
assert_raises_message(sa.exc.InvalidRequestError,
1355
'already flushing', sess.flush)
1357
@testing.resolve_artifact_names
1358
1199
def test_pickled_update(self):
1200
users, User = self.tables.users, pickleable.User
1359
1202
mapper(User, users)
1360
1203
sess1 = create_session()
1361
1204
sess2 = create_session()
1443
class DisposedStates(_base.MappedTest):
1288
class SessionDataTest(_fixtures.FixtureTest):
1289
def test_expunge_cascade(self):
1290
Address, addresses, users, User = (self.classes.Address,
1291
self.tables.addresses,
1295
mapper(Address, addresses)
1296
mapper(User, users, properties={
1297
'addresses':relationship(Address,
1298
backref=backref("user", cascade="all"),
1301
session = create_session()
1302
u = session.query(User).filter_by(id=7).one()
1304
# get everything to load in both directions
1305
print [a.user for a in u.addresses]
1307
# then see if expunge fails
1310
assert sa.orm.object_session(u) is None
1311
assert sa.orm.attributes.instance_state(u).session_id is None
1312
for a in u.addresses:
1313
assert sa.orm.object_session(a) is None
1314
assert sa.orm.attributes.instance_state(a).session_id is None
1316
def test_autoflush_rollback(self):
1317
Address, addresses, users, User = (self.classes.Address,
1318
self.tables.addresses,
1322
mapper(Address, addresses)
1323
mapper(User, users, properties={
1324
'addresses':relationship(Address)})
1326
sess = create_session(autocommit=False, autoflush=True)
1327
u = sess.query(User).get(8)
1328
newad = Address(email_address='a new address')
1329
u.addresses.append(newad)
1330
u.name = 'some new name'
1331
assert u.name == 'some new name'
1332
assert len(u.addresses) == 4
1333
assert newad in u.addresses
1335
assert u.name == 'ed'
1336
assert len(u.addresses) == 3
1338
assert newad not in u.addresses
1339
# pending objects dont get expired
1340
assert newad.email_address == 'a new address'
1343
class DisposedStates(fixtures.MappedTest):
1444
1344
run_setup_mappers = 'once'
1445
1345
run_inserts = 'once'
1446
1346
run_deletes = None