~ubuntu-branches/debian/jessie/sqlalchemy/jessie

« back to all changes in this revision

Viewing changes to test/engine/test_pool.py

  • Committer: Package Import Robot
  • Author(s): Piotr Ożarowski, Jakub Wilk, Piotr Ożarowski
  • Date: 2013-07-06 20:53:52 UTC
  • mfrom: (1.4.23) (16.1.17 experimental)
  • Revision ID: package-import@ubuntu.com-20130706205352-ryppl1eto3illd79
Tags: 0.8.2-1
[ Jakub Wilk ]
* Use canonical URIs for Vcs-* fields.

[ Piotr Ożarowski ]
* New upstream release
* Upload to unstable
* Build depend on python3-all instead of -dev, extensions are not built for
  Python 3.X 

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
import threading, time
2
 
from sqlalchemy import pool, interfaces, select, event
 
1
from __future__ import with_statement
 
2
 
 
3
import threading
 
4
import time
 
5
from sqlalchemy import pool, select, event
3
6
import sqlalchemy as tsa
4
 
from test.lib import testing
5
 
from test.lib.util import gc_collect, lazy_gc
6
 
from test.lib.testing import eq_, assert_raises
7
 
from test.lib.engines import testing_engine
8
 
from test.lib import fixtures
9
 
 
10
 
mcid = 1
11
 
class MockDBAPI(object):
12
 
    throw_error = False
13
 
    def connect(self, *args, **kwargs):
14
 
        if self.throw_error:
15
 
            raise Exception("couldnt connect !")
16
 
        delay = kwargs.pop('delay', 0)
17
 
        if delay:
18
 
            time.sleep(delay)
19
 
        return MockConnection()
20
 
class MockConnection(object):
21
 
    closed = False
22
 
    def __init__(self):
23
 
        global mcid
24
 
        self.id = mcid
25
 
        mcid += 1
26
 
    def close(self):
27
 
        self.closed = True
28
 
    def rollback(self):
29
 
        pass
30
 
    def cursor(self):
31
 
        return MockCursor()
32
 
class MockCursor(object):
33
 
    def execute(self, *args, **kw):
34
 
        pass
35
 
    def close(self):
36
 
        pass
 
7
from sqlalchemy import testing
 
8
from sqlalchemy.testing.util import gc_collect, lazy_gc
 
9
from sqlalchemy.testing import eq_, assert_raises, is_not_
 
10
from sqlalchemy.testing.engines import testing_engine
 
11
from sqlalchemy.testing import fixtures
 
12
 
 
13
from sqlalchemy.testing.mock import Mock, call
 
14
 
 
15
def MockDBAPI():
 
16
    def cursor():
 
17
        while True:
 
18
            yield Mock()
 
19
    def connect():
 
20
        while True:
 
21
            yield Mock(cursor=Mock(side_effect=cursor()))
 
22
 
 
23
    return Mock(connect=Mock(side_effect=connect()))
37
24
 
38
25
class PoolTestBase(fixtures.TestBase):
39
26
    def setup(self):
41
28
 
42
29
    @classmethod
43
30
    def teardown_class(cls):
44
 
       pool.clear_managers()
 
31
        pool.clear_managers()
45
32
 
46
33
    def _queuepool_fixture(self, **kw):
47
 
        dbapi = MockDBAPI()
48
 
        return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
 
34
        dbapi, pool = self._queuepool_dbapi_fixture(**kw)
 
35
        return pool
49
36
 
50
37
    def _queuepool_dbapi_fixture(self, **kw):
51
38
        dbapi = MockDBAPI()
52
 
        return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
 
39
        return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
 
40
                        **kw)
53
41
 
54
42
class PoolTest(PoolTestBase):
55
43
    def test_manager(self):
69
57
        assert c4 is not c5
70
58
 
71
59
    def test_manager_with_key(self):
72
 
        class NoKws(object):
73
 
            def connect(self, arg):
74
 
                return MockConnection()
75
60
 
76
 
        manager = pool.manage(NoKws(), use_threadlocal=True)
 
61
        dbapi = MockDBAPI()
 
62
        manager = pool.manage(dbapi, use_threadlocal=True)
77
63
 
78
64
        c1 = manager.connect('foo.db', sa_pool_key="a")
79
65
        c2 = manager.connect('foo.db', sa_pool_key="b")
81
67
 
82
68
        assert c1.cursor() is not None
83
69
        assert c1 is not c2
84
 
        assert c1 is  c3
 
70
        assert c1 is c3
 
71
 
 
72
        eq_(dbapi.connect.mock_calls,
 
73
            [
 
74
                call("foo.db"),
 
75
                call("foo.db"),
 
76
            ]
 
77
        )
 
78
 
85
79
 
86
80
    def test_bad_args(self):
87
81
        manager = pool.manage(MockDBAPI())
88
 
        connection = manager.connect(None)
 
82
        manager.connect(None)
89
83
 
90
84
    def test_non_thread_local_manager(self):
91
 
        manager = pool.manage(MockDBAPI(), use_threadlocal = False)
 
85
        manager = pool.manage(MockDBAPI(), use_threadlocal=False)
92
86
 
93
87
        connection = manager.connect('foo.db')
94
88
        connection2 = manager.connect('foo.db')
106
100
        for row in cursor:
107
101
            eq_(row, expected.pop(0))
108
102
 
 
103
 
109
104
    def test_no_connect_on_recreate(self):
110
105
        def creator():
111
106
            raise Exception("no creates allowed")
121
116
            p = cls(creator=mock_dbapi.connect)
122
117
            conn = p.connect()
123
118
            conn.close()
124
 
            mock_dbapi.throw_error = True
 
119
            mock_dbapi.connect.side_effect = Exception("error!")
125
120
            p.dispose()
126
121
            p.recreate()
127
122
 
182
177
                    lazy_gc()
183
178
                self.assert_(p.checkedout() == 0)
184
179
 
185
 
    def test_properties(self):
 
180
    def test_info(self):
186
181
        p = self._queuepool_fixture(pool_size=1, max_overflow=0)
187
182
 
188
183
        c = p.connect()
205
200
        self.assert_('foo2' in c.info)
206
201
 
207
202
        c2 = p.connect()
208
 
        self.assert_(c.connection is not c2.connection)
209
 
        self.assert_(not c2.info)
210
 
        self.assert_('foo2' in c.info)
211
 
 
212
 
 
213
 
 
214
 
class PoolEventsTest(object): #PoolTestBase):
 
203
        is_not_(c.connection, c2.connection)
 
204
        assert not c2.info
 
205
        assert 'foo2' in c.info
 
206
 
 
207
 
 
208
class PoolDialectTest(PoolTestBase):
 
209
    def _dialect(self):
 
210
        canary = []
 
211
        class PoolDialect(object):
 
212
            def do_rollback(self, dbapi_connection):
 
213
                canary.append('R')
 
214
                dbapi_connection.rollback()
 
215
 
 
216
            def do_commit(self, dbapi_connection):
 
217
                canary.append('C')
 
218
                dbapi_connection.commit()
 
219
 
 
220
            def do_close(self, dbapi_connection):
 
221
                canary.append('CL')
 
222
                dbapi_connection.close()
 
223
        return PoolDialect(), canary
 
224
 
 
225
    def _do_test(self, pool_cls, assertion):
 
226
        mock_dbapi = MockDBAPI()
 
227
        dialect, canary = self._dialect()
 
228
 
 
229
        p = pool_cls(creator=mock_dbapi.connect)
 
230
        p._dialect = dialect
 
231
        conn = p.connect()
 
232
        conn.close()
 
233
        p.dispose()
 
234
        p.recreate()
 
235
        conn = p.connect()
 
236
        conn.close()
 
237
        eq_(canary, assertion)
 
238
 
 
239
    def test_queue_pool(self):
 
240
        self._do_test(pool.QueuePool, ['R', 'CL', 'R'])
 
241
 
 
242
    def test_assertion_pool(self):
 
243
        self._do_test(pool.AssertionPool, ['R', 'CL', 'R'])
 
244
 
 
245
    def test_singleton_pool(self):
 
246
        self._do_test(pool.SingletonThreadPool, ['R', 'CL', 'R'])
 
247
 
 
248
    def test_null_pool(self):
 
249
        self._do_test(pool.NullPool, ['R', 'CL', 'R', 'CL'])
 
250
 
 
251
    def test_static_pool(self):
 
252
        self._do_test(pool.StaticPool, ['R', 'R'])
 
253
 
 
254
 
 
255
class PoolEventsTest(PoolTestBase):
215
256
    def _first_connect_event_fixture(self):
216
257
        p = self._queuepool_fixture()
217
258
        canary = []
249
290
 
250
291
        return p, canary
251
292
 
 
293
    def _reset_event_fixture(self):
 
294
        p = self._queuepool_fixture()
 
295
        canary = []
 
296
        def reset(*arg, **kw):
 
297
            canary.append('reset')
 
298
        event.listen(p, 'reset', reset)
 
299
 
 
300
        return p, canary
 
301
 
252
302
    def test_first_connect_event(self):
253
303
        p, canary = self._first_connect_event_fixture()
254
304
 
344
394
        c1.close()
345
395
        eq_(canary, ['checkin'])
346
396
 
 
397
    def test_reset_event(self):
 
398
        p, canary = self._reset_event_fixture()
 
399
 
 
400
        c1 = p.connect()
 
401
        eq_(canary, [])
 
402
        c1.close()
 
403
        eq_(canary, ['reset'])
 
404
 
347
405
    def test_checkin_event_gc(self):
348
406
        p, canary = self._checkin_event_fixture()
349
407
 
387
445
 
388
446
        engine.execute(select([1])).close()
389
447
        eq_(
390
 
            canary, ["listen_one","listen_four", "listen_two","listen_three"]
 
448
            canary,
 
449
            ["listen_one", "listen_four", "listen_two", "listen_three"]
391
450
        )
392
451
 
393
452
    def test_listen_targets_per_subclass(self):
426
485
        pool.Pool.dispatch._clear()
427
486
 
428
487
class DeprecatedPoolListenerTest(PoolTestBase):
 
488
    @testing.requires.predictable_gc
429
489
    @testing.uses_deprecated(r".*Use event.listen")
430
490
    def test_listeners(self):
431
491
        class InstrumentingListener(object):
725
785
        try:
726
786
            c4 = p.connect()
727
787
            assert False
728
 
        except tsa.exc.TimeoutError, e:
 
788
        except tsa.exc.TimeoutError:
729
789
            assert int(time.time() - now) == 2
730
790
 
 
791
    @testing.requires.threading_with_mock
731
792
    def test_timeout_race(self):
732
793
        # test a race condition where the initial connecting threads all race
733
794
        # to queue.Empty, then block on the mutex.  each thread consumes a
738
799
        # them back to the start of do_get()
739
800
        dbapi = MockDBAPI()
740
801
        p = pool.QueuePool(
741
 
                creator = lambda: dbapi.connect(delay=.05),
742
 
                pool_size = 2,
743
 
                max_overflow = 1, use_threadlocal = False, timeout=3)
 
802
                creator=lambda: dbapi.connect(delay=.05),
 
803
                pool_size=2,
 
804
                max_overflow=1, use_threadlocal=False, timeout=3)
744
805
        timeouts = []
745
806
        def checkout():
746
807
            for x in xrange(1):
747
808
                now = time.time()
748
809
                try:
749
810
                    c1 = p.connect()
750
 
                except tsa.exc.TimeoutError, e:
 
811
                except tsa.exc.TimeoutError:
751
812
                    timeouts.append(time.time() - now)
752
813
                    continue
753
814
                time.sleep(4)
766
827
            assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts
767
828
            # normally, the timeout should under 4 seconds,
768
829
            # but on a loaded down buildbot it can go up.
769
 
            assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts
 
830
            assert t < 14, "Not all timeouts were < 14 seconds %r" % timeouts
770
831
 
771
832
    def _test_overflow(self, thread_count, max_overflow):
772
833
        gc_collect()
803
864
        lazy_gc()
804
865
        assert not pool._refs
805
866
 
 
867
    @testing.requires.threading_with_mock
806
868
    def test_waiters_handled(self):
807
869
        """test that threads waiting for connections are
808
870
        handled when the pool is replaced.
830
892
 
831
893
                for i in range(2):
832
894
                    t = threading.Thread(target=waiter, args=(p, ))
833
 
                    t.setDaemon(True) # so the tests dont hang if this fails
 
895
                    t.setDaemon(True)  # so the tests dont hang if this fails
834
896
                    t.start()
835
897
 
836
898
                c1.invalidate()
839
901
        time.sleep(2)
840
902
        eq_(len(success), 12)
841
903
 
 
904
    @testing.requires.threading_with_mock
842
905
    @testing.requires.python26
843
906
    def test_notify_waiters(self):
844
907
        dbapi = MockDBAPI()
874
937
 
875
938
    def test_dispose_closes_pooled(self):
876
939
        dbapi = MockDBAPI()
877
 
        def creator():
878
 
            return dbapi.connect()
879
940
 
880
 
        p = pool.QueuePool(creator=creator,
 
941
        p = pool.QueuePool(creator=dbapi.connect,
881
942
                           pool_size=2, timeout=None,
882
943
                           max_overflow=0)
883
944
        c1 = p.connect()
884
945
        c2 = p.connect()
885
 
        conns = [c1.connection, c2.connection]
 
946
        c1_con = c1.connection
 
947
        c2_con = c2.connection
 
948
 
886
949
        c1.close()
887
 
        eq_([c.closed for c in conns], [False, False])
 
950
 
 
951
        eq_(c1_con.close.call_count, 0)
 
952
        eq_(c2_con.close.call_count, 0)
 
953
 
888
954
        p.dispose()
889
 
        eq_([c.closed for c in conns], [True, False])
 
955
 
 
956
        eq_(c1_con.close.call_count, 1)
 
957
        eq_(c2_con.close.call_count, 0)
890
958
 
891
959
        # currently, if a ConnectionFairy is closed
892
960
        # after the pool has been disposed, there's no
894
962
        # immediately - it just gets returned to the
895
963
        # pool normally...
896
964
        c2.close()
897
 
        eq_([c.closed for c in conns], [True, False])
 
965
        eq_(c1_con.close.call_count, 1)
 
966
        eq_(c2_con.close.call_count, 0)
898
967
 
899
968
        # ...and that's the one we'll get back next.
900
969
        c3 = p.connect()
901
 
        assert c3.connection is conns[1]
 
970
        assert c3.connection is c2_con
902
971
 
 
972
    @testing.requires.threading_with_mock
903
973
    def test_no_overflow(self):
904
974
        self._test_overflow(40, 0)
905
975
 
 
976
    @testing.requires.threading_with_mock
906
977
    def test_max_overflow(self):
907
978
        self._test_overflow(40, 5)
908
979
 
938
1009
            strong_refs.add(c.connection)
939
1010
            return c
940
1011
 
941
 
        for j in xrange(5):
942
 
            conns = [_conn() for i in xrange(4)]
 
1012
        for j in range(5):
 
1013
            # open 4 conns at a time.  each time this
 
1014
            # will yield two pooled connections + two
 
1015
            # overflow connections.
 
1016
            conns = [_conn() for i in range(4)]
943
1017
            for c in conns:
944
1018
                c.close()
945
1019
 
946
 
        still_opened = len([c for c in strong_refs if not c.closed])
947
 
        eq_(still_opened, 2)
948
 
 
 
1020
        # doing that for a total of 5 times yields
 
1021
        # ten overflow connections closed plus the
 
1022
        # two pooled connections unclosed.
 
1023
 
 
1024
        eq_(
 
1025
            set([c.close.call_count for c in strong_refs]),
 
1026
            set([1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0])
 
1027
        )
 
1028
 
 
1029
    @testing.requires.predictable_gc
949
1030
    def test_weakref_kaboom(self):
950
1031
        p = self._queuepool_fixture(pool_size=3,
951
1032
                           max_overflow=-1, use_threadlocal=True)
994
1075
        assert id(c3.connection) != c_id
995
1076
 
996
1077
    def test_invalidate(self):
997
 
        p  = self._queuepool_fixture(pool_size=1, max_overflow=0)
 
1078
        p = self._queuepool_fixture(pool_size=1, max_overflow=0)
998
1079
        c1 = p.connect()
999
1080
        c_id = c1.connection.id
1000
1081
        c1.close()
1007
1088
        assert c1.connection.id != c_id
1008
1089
 
1009
1090
    def test_recreate(self):
1010
 
        p = self._queuepool_fixture(pool_size=1, max_overflow=0)
 
1091
        p = self._queuepool_fixture(reset_on_return=None, pool_size=1, max_overflow=0)
1011
1092
        p2 = p.recreate()
1012
1093
        assert p2.size() == 1
 
1094
        assert p2._reset_on_return is pool.reset_none
1013
1095
        assert p2._use_threadlocal is False
1014
1096
        assert p2._max_overflow == 0
1015
1097
 
1035
1117
        dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
1036
1118
        c1 = p.connect()
1037
1119
        c1.detach()
1038
 
        c_id = c1.connection.id
1039
 
        c2 = p.connect()
1040
 
        assert c2.connection.id != c1.connection.id
1041
 
        dbapi.raise_error = True
1042
 
        c2.invalidate()
1043
 
        c2 = None
1044
 
        c2 = p.connect()
1045
 
        assert c2.connection.id != c1.connection.id
1046
 
        con = c1.connection
1047
 
        assert not con.closed
 
1120
        c2 = p.connect()
 
1121
        eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
 
1122
 
 
1123
        c1_con = c1.connection
 
1124
        assert c1_con is not None
 
1125
        eq_(c1_con.close.call_count, 0)
1048
1126
        c1.close()
1049
 
        assert con.closed
 
1127
        eq_(c1_con.close.call_count, 1)
 
1128
 
 
1129
    def test_detach_via_invalidate(self):
 
1130
        dbapi, p = self._queuepool_dbapi_fixture(pool_size=1, max_overflow=0)
 
1131
 
 
1132
        c1 = p.connect()
 
1133
        c1_con = c1.connection
 
1134
        c1.invalidate()
 
1135
        assert c1.connection is None
 
1136
        eq_(c1_con.close.call_count, 1)
 
1137
 
 
1138
        c2 = p.connect()
 
1139
        assert c2.connection is not c1_con
 
1140
        c2_con = c2.connection
 
1141
 
 
1142
        c2.close()
 
1143
        eq_(c2_con.close.call_count, 0)
1050
1144
 
1051
1145
    def test_threadfairy(self):
1052
1146
        p = self._queuepool_fixture(pool_size=3, max_overflow=-1, use_threadlocal=True)
1057
1151
 
1058
1152
class SingletonThreadPoolTest(PoolTestBase):
1059
1153
 
 
1154
    @testing.requires.threading_with_mock
1060
1155
    def test_cleanup(self):
1061
1156
        self._test_cleanup(False)
1062
1157
 
 
1158
    @testing.requires.threading_with_mock
1063
1159
    def test_cleanup_no_gc(self):
1064
1160
        self._test_cleanup(True)
1065
1161
 
1068
1164
        been called."""
1069
1165
 
1070
1166
        dbapi = MockDBAPI()
1071
 
        p = pool.SingletonThreadPool(creator=dbapi.connect,
1072
 
                pool_size=3)
 
1167
 
 
1168
        lock = threading.Lock()
 
1169
        def creator():
 
1170
            # the mock iterator isn't threadsafe...
 
1171
            with lock:
 
1172
                return dbapi.connect()
 
1173
        p = pool.SingletonThreadPool(creator=creator, pool_size=3)
1073
1174
 
1074
1175
        if strong_refs:
1075
1176
            sr = set()
1099
1200
        assert len(p._all_conns) == 3
1100
1201
 
1101
1202
        if strong_refs:
1102
 
            still_opened = len([c for c in sr if not c.closed])
 
1203
            still_opened = len([c for c in sr if not c.close.call_count])
1103
1204
            eq_(still_opened, 3)
1104
1205
 
1105
1206
class AssertionPoolTest(PoolTestBase):
1106
1207
    def test_connect_error(self):
1107
1208
        dbapi = MockDBAPI()
1108
 
        p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
 
1209
        p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
1109
1210
        c1 = p.connect()
1110
1211
        assert_raises(AssertionError, p.connect)
1111
1212
 
1112
1213
    def test_connect_multiple(self):
1113
1214
        dbapi = MockDBAPI()
1114
 
        p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
 
1215
        p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
1115
1216
        c1 = p.connect()
1116
1217
        c1.close()
1117
1218
        c2 = p.connect()
1123
1224
class NullPoolTest(PoolTestBase):
1124
1225
    def test_reconnect(self):
1125
1226
        dbapi = MockDBAPI()
1126
 
        p = pool.NullPool(creator = lambda: dbapi.connect('foo.db'))
1127
 
        c1 = p.connect()
1128
 
        c_id = c1.connection.id
1129
 
        c1.close(); c1=None
1130
 
 
1131
 
        c1 = p.connect()
1132
 
        dbapi.raise_error = True
 
1227
        p = pool.NullPool(creator=lambda: dbapi.connect('foo.db'))
 
1228
        c1 = p.connect()
 
1229
 
 
1230
        c1.close()
 
1231
        c1 = None
 
1232
 
 
1233
        c1 = p.connect()
1133
1234
        c1.invalidate()
1134
1235
        c1 = None
1135
1236
 
1136
1237
        c1 = p.connect()
1137
 
        assert c1.connection.id != c_id
 
1238
        dbapi.connect.assert_has_calls([
 
1239
                            call('foo.db'),
 
1240
                            call('foo.db')],
 
1241
                            any_order=True)
1138
1242
 
1139
1243
 
1140
1244
class StaticPoolTest(PoolTestBase):