2
from sqlalchemy import pool, interfaces, select, event
1
from __future__ import with_statement
5
from sqlalchemy import pool, select, event
3
6
import sqlalchemy as tsa
4
from test.lib import testing
5
from test.lib.util import gc_collect, lazy_gc
6
from test.lib.testing import eq_, assert_raises
7
from test.lib.engines import testing_engine
8
from test.lib import fixtures
11
class MockDBAPI(object):
13
def connect(self, *args, **kwargs):
15
raise Exception("couldnt connect !")
16
delay = kwargs.pop('delay', 0)
19
return MockConnection()
20
class MockConnection(object):
32
class MockCursor(object):
33
def execute(self, *args, **kw):
7
from sqlalchemy import testing
8
from sqlalchemy.testing.util import gc_collect, lazy_gc
9
from sqlalchemy.testing import eq_, assert_raises, is_not_
10
from sqlalchemy.testing.engines import testing_engine
11
from sqlalchemy.testing import fixtures
13
from sqlalchemy.testing.mock import Mock, call
21
yield Mock(cursor=Mock(side_effect=cursor()))
23
return Mock(connect=Mock(side_effect=connect()))
38
25
class PoolTestBase(fixtures.TestBase):
43
30
def teardown_class(cls):
46
33
def _queuepool_fixture(self, **kw):
48
return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
34
dbapi, pool = self._queuepool_dbapi_fixture(**kw)
50
37
def _queuepool_dbapi_fixture(self, **kw):
51
38
dbapi = MockDBAPI()
52
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
39
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
54
42
class PoolTest(PoolTestBase):
55
43
def test_manager(self):
69
57
assert c4 is not c5
71
59
def test_manager_with_key(self):
73
def connect(self, arg):
74
return MockConnection()
76
manager = pool.manage(NoKws(), use_threadlocal=True)
62
manager = pool.manage(dbapi, use_threadlocal=True)
78
64
c1 = manager.connect('foo.db', sa_pool_key="a")
79
65
c2 = manager.connect('foo.db', sa_pool_key="b")
82
68
assert c1.cursor() is not None
83
69
assert c1 is not c2
72
eq_(dbapi.connect.mock_calls,
86
80
def test_bad_args(self):
87
81
manager = pool.manage(MockDBAPI())
88
connection = manager.connect(None)
90
84
def test_non_thread_local_manager(self):
91
manager = pool.manage(MockDBAPI(), use_threadlocal = False)
85
manager = pool.manage(MockDBAPI(), use_threadlocal=False)
93
87
connection = manager.connect('foo.db')
94
88
connection2 = manager.connect('foo.db')
205
200
self.assert_('foo2' in c.info)
208
self.assert_(c.connection is not c2.connection)
209
self.assert_(not c2.info)
210
self.assert_('foo2' in c.info)
214
class PoolEventsTest(object): #PoolTestBase):
203
is_not_(c.connection, c2.connection)
205
assert 'foo2' in c.info
208
class PoolDialectTest(PoolTestBase):
211
class PoolDialect(object):
212
def do_rollback(self, dbapi_connection):
214
dbapi_connection.rollback()
216
def do_commit(self, dbapi_connection):
218
dbapi_connection.commit()
220
def do_close(self, dbapi_connection):
222
dbapi_connection.close()
223
return PoolDialect(), canary
225
def _do_test(self, pool_cls, assertion):
226
mock_dbapi = MockDBAPI()
227
dialect, canary = self._dialect()
229
p = pool_cls(creator=mock_dbapi.connect)
237
eq_(canary, assertion)
239
def test_queue_pool(self):
240
self._do_test(pool.QueuePool, ['R', 'CL', 'R'])
242
def test_assertion_pool(self):
243
self._do_test(pool.AssertionPool, ['R', 'CL', 'R'])
245
def test_singleton_pool(self):
246
self._do_test(pool.SingletonThreadPool, ['R', 'CL', 'R'])
248
def test_null_pool(self):
249
self._do_test(pool.NullPool, ['R', 'CL', 'R', 'CL'])
251
def test_static_pool(self):
252
self._do_test(pool.StaticPool, ['R', 'R'])
255
class PoolEventsTest(PoolTestBase):
215
256
def _first_connect_event_fixture(self):
216
257
p = self._queuepool_fixture()
293
def _reset_event_fixture(self):
294
p = self._queuepool_fixture()
296
def reset(*arg, **kw):
297
canary.append('reset')
298
event.listen(p, 'reset', reset)
252
302
def test_first_connect_event(self):
253
303
p, canary = self._first_connect_event_fixture()
345
395
eq_(canary, ['checkin'])
397
def test_reset_event(self):
398
p, canary = self._reset_event_fixture()
403
eq_(canary, ['reset'])
347
405
def test_checkin_event_gc(self):
348
406
p, canary = self._checkin_event_fixture()
388
446
engine.execute(select([1])).close()
390
canary, ["listen_one","listen_four", "listen_two","listen_three"]
449
["listen_one", "listen_four", "listen_two", "listen_three"]
393
452
def test_listen_targets_per_subclass(self):
426
485
pool.Pool.dispatch._clear()
428
487
class DeprecatedPoolListenerTest(PoolTestBase):
488
@testing.requires.predictable_gc
429
489
@testing.uses_deprecated(r".*Use event.listen")
430
490
def test_listeners(self):
431
491
class InstrumentingListener(object):
728
except tsa.exc.TimeoutError, e:
788
except tsa.exc.TimeoutError:
729
789
assert int(time.time() - now) == 2
791
@testing.requires.threading_with_mock
731
792
def test_timeout_race(self):
732
793
# test a race condition where the initial connecting threads all race
733
794
# to queue.Empty, then block on the mutex. each thread consumes a
738
799
# them back to the start of do_get()
739
800
dbapi = MockDBAPI()
740
801
p = pool.QueuePool(
741
creator = lambda: dbapi.connect(delay=.05),
743
max_overflow = 1, use_threadlocal = False, timeout=3)
802
creator=lambda: dbapi.connect(delay=.05),
804
max_overflow=1, use_threadlocal=False, timeout=3)
746
807
for x in xrange(1):
747
808
now = time.time()
750
except tsa.exc.TimeoutError, e:
811
except tsa.exc.TimeoutError:
751
812
timeouts.append(time.time() - now)
766
827
assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts
767
828
# normally, the timeout should under 4 seconds,
768
829
# but on a loaded down buildbot it can go up.
769
assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts
830
assert t < 14, "Not all timeouts were < 14 seconds %r" % timeouts
771
832
def _test_overflow(self, thread_count, max_overflow):
875
938
def test_dispose_closes_pooled(self):
876
939
dbapi = MockDBAPI()
878
return dbapi.connect()
880
p = pool.QueuePool(creator=creator,
941
p = pool.QueuePool(creator=dbapi.connect,
881
942
pool_size=2, timeout=None,
885
conns = [c1.connection, c2.connection]
946
c1_con = c1.connection
947
c2_con = c2.connection
887
eq_([c.closed for c in conns], [False, False])
951
eq_(c1_con.close.call_count, 0)
952
eq_(c2_con.close.call_count, 0)
889
eq_([c.closed for c in conns], [True, False])
956
eq_(c1_con.close.call_count, 1)
957
eq_(c2_con.close.call_count, 0)
891
959
# currently, if a ConnectionFairy is closed
892
960
# after the pool has been disposed, there's no
894
962
# immediately - it just gets returned to the
895
963
# pool normally...
897
eq_([c.closed for c in conns], [True, False])
965
eq_(c1_con.close.call_count, 1)
966
eq_(c2_con.close.call_count, 0)
899
968
# ...and that's the one we'll get back next.
901
assert c3.connection is conns[1]
970
assert c3.connection is c2_con
972
@testing.requires.threading_with_mock
903
973
def test_no_overflow(self):
904
974
self._test_overflow(40, 0)
976
@testing.requires.threading_with_mock
906
977
def test_max_overflow(self):
907
978
self._test_overflow(40, 5)
938
1009
strong_refs.add(c.connection)
942
conns = [_conn() for i in xrange(4)]
1013
# open 4 conns at a time. each time this
1014
# will yield two pooled connections + two
1015
# overflow connections.
1016
conns = [_conn() for i in range(4)]
946
still_opened = len([c for c in strong_refs if not c.closed])
1020
# doing that for a total of 5 times yields
1021
# ten overflow connections closed plus the
1022
# two pooled connections unclosed.
1025
set([c.close.call_count for c in strong_refs]),
1026
set([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0])
1029
@testing.requires.predictable_gc
949
1030
def test_weakref_kaboom(self):
950
1031
p = self._queuepool_fixture(pool_size=3,
951
1032
max_overflow=-1, use_threadlocal=True)
994
1075
assert id(c3.connection) != c_id
996
1077
def test_invalidate(self):
997
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
1078
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
998
1079
c1 = p.connect()
999
1080
c_id = c1.connection.id
1007
1088
assert c1.connection.id != c_id
1009
1090
def test_recreate(self):
1010
p = self._queuepool_fixture(pool_size=1, max_overflow=0)
1091
p = self._queuepool_fixture(reset_on_return=None, pool_size=1, max_overflow=0)
1011
1092
p2 = p.recreate()
1012
1093
assert p2.size() == 1
1094
assert p2._reset_on_return is pool.reset_none
1013
1095
assert p2._use_threadlocal is False
1014
1096
assert p2._max_overflow == 0
1035
1117
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
1036
1118
c1 = p.connect()
1038
c_id = c1.connection.id
1040
assert c2.connection.id != c1.connection.id
1041
dbapi.raise_error = True
1045
assert c2.connection.id != c1.connection.id
1047
assert not con.closed
1121
eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
1123
c1_con = c1.connection
1124
assert c1_con is not None
1125
eq_(c1_con.close.call_count, 0)
1127
eq_(c1_con.close.call_count, 1)
1129
def test_detach_via_invalidate(self):
1130
dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
1133
c1_con = c1.connection
1135
assert c1.connection is None
1136
eq_(c1_con.close.call_count, 1)
1139
assert c2.connection is not c1_con
1140
c2_con = c2.connection
1143
eq_(c2_con.close.call_count, 0)
1051
1145
def test_threadfairy(self):
1052
1146
p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
1058
1152
class SingletonThreadPoolTest(PoolTestBase):
1154
@testing.requires.threading_with_mock
1060
1155
def test_cleanup(self):
1061
1156
self._test_cleanup(False)
1158
@testing.requires.threading_with_mock
1063
1159
def test_cleanup_no_gc(self):
1064
1160
self._test_cleanup(True)
1099
1200
assert len(p._all_conns) == 3
1101
1202
if strong_refs:
1102
still_opened = len([c for c in sr if not c.closed])
1203
still_opened = len([c for c in sr if not c.close.call_count])
1103
1204
eq_(still_opened, 3)
1105
1206
class AssertionPoolTest(PoolTestBase):
1106
1207
def test_connect_error(self):
1107
1208
dbapi = MockDBAPI()
1108
p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
1209
p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
1109
1210
c1 = p.connect()
1110
1211
assert_raises(AssertionError, p.connect)
1112
1213
def test_connect_multiple(self):
1113
1214
dbapi = MockDBAPI()
1114
p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
1215
p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
1115
1216
c1 = p.connect()
1117
1218
c2 = p.connect()
1123
1224
class NullPoolTest(PoolTestBase):
1124
1225
def test_reconnect(self):
1125
1226
dbapi = MockDBAPI()
1126
p = pool.NullPool(creator = lambda: dbapi.connect('foo.db'))
1128
c_id = c1.connection.id
1132
dbapi.raise_error = True
1227
p = pool.NullPool(creator=lambda: dbapi.connect('foo.db'))
1133
1234
c1.invalidate()
1136
1237
c1 = p.connect()
1137
assert c1.connection.id != c_id
1238
dbapi.connect.assert_has_calls([
1140
1244
class StaticPoolTest(PoolTestBase):