~ubuntu-branches/ubuntu/wily/pymongo/wily

« back to all changes in this revision

Viewing changes to test/test_pooling_base.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2012-05-10 21:21:40 UTC
  • mfrom: (1.1.11)
  • Revision ID: package-import@ubuntu.com-20120510212140-9c66c00zz850h6l9
Tags: 2.2-1
* New upstream release.
* Dependencies added (Closes: #670268)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2012 10gen, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License");
 
4
# you may not use this file except in compliance with the License.
 
5
# You may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
12
# See the License for the specific language governing permissions and
 
13
# limitations under the License.
 
14
 
 
15
"""Base classes to test built-in connection-pooling with threads or greenlets.
 
16
"""
 
17
 
 
18
import gc
 
19
import random
 
20
import socket
 
21
import sys
 
22
import thread
 
23
import threading
 
24
import time
 
25
import bson
 
26
 
 
27
sys.path[0:0] = [""]
 
28
 
 
29
from nose.plugins.skip import SkipTest
 
30
 
 
31
import pymongo.pool
 
32
from pymongo.connection import Connection
 
33
from pymongo.pool import (
 
34
    Pool, GreenletPool, NO_REQUEST, NO_SOCKET_YET, SocketInfo)
 
35
from pymongo.errors import ConfigurationError
 
36
from test import version
 
37
from test.test_connection import get_connection, host, port
 
38
from test.utils import delay
 
39
 
 
40
N = 50
 
41
DB = "pymongo-pooling-tests"
 
42
 
 
43
 
 
44
if sys.version_info[0] >= 3:
 
45
    from imp import reload
 
46
 
 
47
 
 
48
try:
 
49
    import gevent
 
50
    from gevent import Greenlet, monkey
 
51
    has_gevent = True
 
52
except ImportError:
 
53
    has_gevent = False
 
54
 
 
55
 
 
56
def one(s):
 
57
    """Get one element of a set"""
 
58
    return iter(s).next()
 
59
 
 
60
 
 
61
def force_reclaim_sockets(cx_pool, n_expected):
 
62
    # When a thread dies without ending its request, the SocketInfo it was
 
63
    # using is deleted, and in its __del__ it returns the socket to the
 
64
    # pool. However, when exactly that happens is unpredictable. Try
 
65
    # various ways of forcing the issue.
 
66
 
 
67
    if sys.platform.startswith('java'):
 
68
        raise SkipTest("Jython can't reclaim sockets")
 
69
 
 
70
    if 'PyPy' in sys.version:
 
71
        raise SkipTest("Socket reclamation happens at unpredictable time in PyPy")
 
72
 
 
73
    # Bizarre behavior in CPython 2.4, and possibly other CPython versions
 
74
    # less than 2.7: the last dead thread's locals aren't cleaned up until
 
75
    # the local attribute with the same name is accessed from a different
 
76
    # thread. This assert checks that the thread-local is indeed local, and
 
77
    # also triggers the cleanup so the socket is reclaimed.
 
78
    if isinstance(cx_pool, Pool):
 
79
        assert cx_pool.local.sock_info is None
 
80
 
 
81
    # Try for a while to make garbage-collection call SocketInfo.__del__
 
82
    start = time.time()
 
83
    while len(cx_pool.sockets) < n_expected and time.time() - start < 5:
 
84
        try:
 
85
            gc.collect(2)
 
86
        except TypeError:
 
87
            # collect() didn't support 'generation' arg until 2.5
 
88
            gc.collect()
 
89
 
 
90
        time.sleep(0.5)
 
91
 
 
92
 
 
93
class MongoThread(object):
 
94
    """A thread, or a greenlet, that uses a Connection"""
 
95
    def __init__(self, test_case):
 
96
        self.use_greenlets = test_case.use_greenlets
 
97
        self.connection = test_case.c
 
98
        self.db = self.connection[DB]
 
99
        self.ut = test_case
 
100
        self.passed = False
 
101
 
 
102
    def start(self):
 
103
        if self.use_greenlets:
 
104
            # A Gevent extended Greenlet
 
105
            self.thread = Greenlet(self.run)
 
106
        else:
 
107
            self.thread = threading.Thread(target=self.run)
 
108
            self.thread.setDaemon(True) # Don't hang whole test if thread hangs
 
109
 
 
110
 
 
111
        self.thread.start()
 
112
 
 
113
    def join(self):
 
114
        self.thread.join(300)
 
115
        if self.use_greenlets:
 
116
            assert self.thread.dead, "Greenlet timeout"
 
117
        else:
 
118
            assert not self.thread.isAlive(), "Thread timeout"
 
119
 
 
120
        self.thread = None
 
121
 
 
122
    def run(self):
 
123
        self.run_mongo_thread()
 
124
 
 
125
        # No exceptions thrown
 
126
        self.passed = True
 
127
 
 
128
    def run_mongo_thread(self):
 
129
        raise NotImplementedError()
 
130
 
 
131
 
 
132
class SaveAndFind(MongoThread):
 
133
 
 
134
    def run_mongo_thread(self):
 
135
        for _ in xrange(N):
 
136
            rand = random.randint(0, N)
 
137
            _id = self.db.sf.save({"x": rand}, safe=True)
 
138
            self.ut.assertEqual(rand, self.db.sf.find_one(_id)["x"])
 
139
 
 
140
 
 
141
class Unique(MongoThread):
 
142
 
 
143
    def run_mongo_thread(self):
 
144
        for _ in xrange(N):
 
145
            self.connection.start_request()
 
146
            self.db.unique.insert({})
 
147
            self.ut.assertEqual(None, self.db.error())
 
148
            self.connection.end_request()
 
149
 
 
150
 
 
151
class NonUnique(MongoThread):
 
152
 
 
153
    def run_mongo_thread(self):
 
154
        for _ in xrange(N):
 
155
            self.connection.start_request()
 
156
            self.db.unique.insert({"_id": "jesse"})
 
157
            self.ut.assertNotEqual(None, self.db.error())
 
158
            self.connection.end_request()
 
159
 
 
160
 
 
161
class Disconnect(MongoThread):
 
162
 
 
163
    def run_mongo_thread(self):
 
164
        for _ in xrange(N):
 
165
            self.connection.disconnect()
 
166
 
 
167
 
 
168
class NoRequest(MongoThread):
 
169
 
 
170
    def run_mongo_thread(self):
 
171
        self.connection.start_request()
 
172
        errors = 0
 
173
        for _ in xrange(N):
 
174
            self.db.unique.insert({"_id": "jesse"})
 
175
            if not self.db.error():
 
176
                errors += 1
 
177
 
 
178
        self.connection.end_request()
 
179
        self.ut.assertEqual(0, errors)
 
180
 
 
181
 
 
182
def run_cases(ut, cases):
 
183
    threads = []
 
184
    nruns = 10
 
185
    if (
 
186
        ut.use_greenlets and sys.platform == 'darwin'
 
187
        and gevent.version_info[0] < 1
 
188
    ):
 
189
        # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
 
190
        # about 35 Greenlets share a Connection. Apparently fixed in
 
191
        # recent Gevent development.
 
192
        nruns = 5
 
193
 
 
194
    for case in cases:
 
195
        for i in range(nruns):
 
196
            t = case(ut)
 
197
            t.start()
 
198
            threads.append(t)
 
199
 
 
200
    for t in threads:
 
201
        t.join()
 
202
 
 
203
    for t in threads:
 
204
        assert t.passed, "%s.run_mongo_thread() threw an exception" % repr(t)
 
205
 
 
206
 
 
207
class OneOp(MongoThread):
 
208
 
 
209
    def __init__(self, ut):
 
210
        super(OneOp, self).__init__(ut)
 
211
 
 
212
    def run_mongo_thread(self):
 
213
        pool = self.connection._Connection__pool
 
214
        assert len(pool.sockets) == 1, "Expected 1 socket, found %d" % (
 
215
            len(pool.sockets)
 
216
        )
 
217
 
 
218
        sock_info = one(pool.sockets)
 
219
 
 
220
        self.connection.start_request()
 
221
 
 
222
        # start_request() hasn't yet moved the socket from the general pool into
 
223
        # the request
 
224
        assert len(pool.sockets) == 1
 
225
        assert one(pool.sockets) == sock_info
 
226
 
 
227
        self.connection[DB].test.find_one()
 
228
 
 
229
        # find_one() causes the socket to be used in the request, so now it's
 
230
        # bound to this thread
 
231
        assert len(pool.sockets) == 0
 
232
        assert pool._get_request_state() == sock_info
 
233
        self.connection.end_request()
 
234
 
 
235
        # The socket is back in the pool
 
236
        assert len(pool.sockets) == 1
 
237
        assert one(pool.sockets) == sock_info
 
238
 
 
239
 
 
240
class CreateAndReleaseSocket(MongoThread):
 
241
 
 
242
    def __init__(self, ut, connection, start_request, end_request):
 
243
        super(CreateAndReleaseSocket, self).__init__(ut)
 
244
        self.connection = connection
 
245
        self.start_request = start_request
 
246
        self.end_request = end_request
 
247
 
 
248
    def run_mongo_thread(self):
 
249
        # Do an operation that requires a socket.
 
250
        # test_max_pool_size uses this to spin up lots of threads requiring
 
251
        # lots of simultaneous connections, to ensure that Pool obeys its
 
252
        # max_size configuration and closes extra sockets as they're returned.
 
253
        # We need a delay here to ensure that more than max_size sockets are
 
254
        # needed at once.
 
255
        for i in range(self.start_request):
 
256
            self.connection.start_request()
 
257
 
 
258
        self.connection[DB].test.find_one({'$where': delay(0.1)})
 
259
        for i in range(self.end_request):
 
260
            self.connection.end_request()
 
261
 
 
262
 
 
263
class _TestPoolingBase(object):
 
264
    """Base class for all connection-pool tests. Doesn't inherit from
 
265
    unittest.TestCase, and its name is prefixed with "_" to avoid being
 
266
    run by nose. Real tests double-inherit from this base and from TestCase.
 
267
    """
 
268
    use_greenlets = False
 
269
 
 
270
    def setUp(self):
 
271
        if self.use_greenlets:
 
272
            if not has_gevent:
 
273
                raise SkipTest("Gevent not installed")
 
274
 
 
275
            # Note we don't do patch_thread() or patch_all() - we're
 
276
            # testing here that patch_thread() is unnecessary for
 
277
            # the connection pool to work properly.
 
278
            monkey.patch_socket()
 
279
 
 
280
        self.c = self.get_connection(auto_start_request=False)
 
281
 
 
282
        # reset the db
 
283
        db = self.c[DB]
 
284
        db.unique.drop()
 
285
        db.test.drop()
 
286
        db.unique.insert({"_id": "jesse"})
 
287
 
 
288
        db.test.insert([{} for i in range(10)])
 
289
 
 
290
    def tearDown(self):
 
291
        self.c.close()
 
292
        if self.use_greenlets:
 
293
            # Undo patch
 
294
            reload(socket)
 
295
 
 
296
    def get_connection(self, *args, **kwargs):
 
297
        opts = kwargs.copy()
 
298
        opts['use_greenlets'] = self.use_greenlets
 
299
        return get_connection(*args, **opts)
 
300
 
 
301
    def get_pool(self, *args, **kwargs):
 
302
        if self.use_greenlets:
 
303
            klass = GreenletPool
 
304
        else:
 
305
            klass = Pool
 
306
 
 
307
        return klass(*args, **kwargs)
 
308
 
 
309
    def assert_no_request(self):
 
310
        self.assertEqual(
 
311
            NO_REQUEST, self.c._Connection__pool._get_request_state()
 
312
        )
 
313
 
 
314
    def assert_request_without_socket(self):
 
315
        self.assertEqual(
 
316
            NO_SOCKET_YET, self.c._Connection__pool._get_request_state()
 
317
        )
 
318
 
 
319
    def assert_request_with_socket(self):
 
320
        self.assertTrue(isinstance(
 
321
            self.c._Connection__pool._get_request_state(), SocketInfo
 
322
        ))
 
323
 
 
324
    def assert_pool_size(self, pool_size):
 
325
        self.assertEqual(
 
326
            pool_size, len(self.c._Connection__pool.sockets)
 
327
        )
 
328
 
 
329
 
 
330
class _TestPooling(_TestPoolingBase):
 
331
    """Basic pool tests, to be applied both to Pool and GreenletPool"""
 
332
    def test_max_pool_size_validation(self):
 
333
        self.assertRaises(
 
334
            ConfigurationError, Connection, host=host, port=port,
 
335
            max_pool_size=-1
 
336
        )
 
337
 
 
338
        self.assertRaises(
 
339
            ConfigurationError, Connection, host=host, port=port,
 
340
            max_pool_size='foo'
 
341
        )
 
342
 
 
343
        c = Connection(host=host, port=port, max_pool_size=100)
 
344
        self.assertEqual(c.max_pool_size, 100)
 
345
 
 
346
    def test_no_disconnect(self):
 
347
        run_cases(self, [NoRequest, NonUnique, Unique, SaveAndFind])
 
348
 
 
349
    def test_simple_disconnect(self):
 
350
        # Connection just created, expect 1 free socket
 
351
        self.assert_pool_size(1)
 
352
        self.assert_no_request()
 
353
 
 
354
        self.c.start_request()
 
355
        self.assert_request_without_socket()
 
356
        cursor = self.c[DB].stuff.find()
 
357
 
 
358
        # Cursor hasn't actually caused a request yet, so there's still 1 free
 
359
        # socket.
 
360
        self.assert_pool_size(1)
 
361
        self.assert_request_without_socket()
 
362
 
 
363
        # Actually make a request to server, triggering a socket to be
 
364
        # allocated to the request
 
365
        list(cursor)
 
366
        self.assert_pool_size(0)
 
367
        self.assert_request_with_socket()
 
368
 
 
369
        # Pool returns to its original state
 
370
        self.c.end_request()
 
371
        self.assert_no_request()
 
372
        self.assert_pool_size(1)
 
373
 
 
374
        self.c.disconnect()
 
375
        self.assert_pool_size(0)
 
376
        self.assert_no_request()
 
377
 
 
378
    def test_disconnect(self):
 
379
        run_cases(self, [SaveAndFind, Disconnect, Unique])
 
380
 
 
381
    def test_independent_pools(self):
 
382
        p = self.get_pool((host, port), 10, None, None, False)
 
383
        self.c.start_request()
 
384
        self.assertEqual(set(), p.sockets)
 
385
        self.c.end_request()
 
386
        self.assertEqual(set(), p.sockets)
 
387
 
 
388
    def test_dependent_pools(self):
 
389
        self.assert_pool_size(1)
 
390
        self.c.start_request()
 
391
        self.assert_request_without_socket()
 
392
        self.c.test.test.find_one()
 
393
        self.assert_request_with_socket()
 
394
        self.assert_pool_size(0)
 
395
        self.c.end_request()
 
396
        self.assert_pool_size(1)
 
397
 
 
398
        t = OneOp(self)
 
399
        t.start()
 
400
        t.join()
 
401
        self.assertTrue(t.passed, "OneOp.run() threw exception")
 
402
 
 
403
        self.assert_pool_size(1)
 
404
        self.c.test.test.find_one()
 
405
        self.assert_pool_size(1)
 
406
 
 
407
    def test_multiple_connections(self):
 
408
        a = self.get_connection(auto_start_request=False)
 
409
        b = self.get_connection(auto_start_request=False)
 
410
        self.assertEqual(1, len(a._Connection__pool.sockets))
 
411
        self.assertEqual(1, len(b._Connection__pool.sockets))
 
412
 
 
413
        a.start_request()
 
414
        a.test.test.find_one()
 
415
        self.assertEqual(0, len(a._Connection__pool.sockets))
 
416
        a.end_request()
 
417
        self.assertEqual(1, len(a._Connection__pool.sockets))
 
418
        self.assertEqual(1, len(b._Connection__pool.sockets))
 
419
        a_sock = one(a._Connection__pool.sockets)
 
420
 
 
421
        b.end_request()
 
422
        self.assertEqual(1, len(a._Connection__pool.sockets))
 
423
        self.assertEqual(1, len(b._Connection__pool.sockets))
 
424
 
 
425
        b.start_request()
 
426
        b.test.test.find_one()
 
427
        self.assertEqual(1, len(a._Connection__pool.sockets))
 
428
        self.assertEqual(0, len(b._Connection__pool.sockets))
 
429
 
 
430
        b.end_request()
 
431
        b_sock = one(b._Connection__pool.sockets)
 
432
        b.test.test.find_one()
 
433
        a.test.test.find_one()
 
434
 
 
435
        self.assertEqual(b_sock,
 
436
                         b._Connection__pool.get_socket((b.host, b.port)))
 
437
        self.assertEqual(a_sock,
 
438
                         a._Connection__pool.get_socket((a.host, a.port)))
 
439
 
 
440
    def test_request(self):
 
441
        # Check that Pool gives two different sockets in two calls to
 
442
        # get_socket() -- doesn't automatically put us in a request any more
 
443
        cx_pool = self.get_pool(
 
444
            pair=(host,port),
 
445
            max_size=10,
 
446
            net_timeout=1000,
 
447
            conn_timeout=1000,
 
448
            use_ssl=False
 
449
        )
 
450
 
 
451
        sock0 = cx_pool.get_socket()
 
452
        sock1 = cx_pool.get_socket()
 
453
 
 
454
        self.assertNotEqual(sock0, sock1)
 
455
 
 
456
        # Now in a request, we'll get the same socket both times
 
457
        cx_pool.start_request()
 
458
 
 
459
        sock2 = cx_pool.get_socket()
 
460
        sock3 = cx_pool.get_socket()
 
461
        self.assertEqual(sock2, sock3)
 
462
 
 
463
        # Pool didn't keep reference to sock0 or sock1; sock2 and 3 are new
 
464
        self.assertNotEqual(sock0, sock2)
 
465
        self.assertNotEqual(sock1, sock2)
 
466
 
 
467
        # Return the request sock to pool
 
468
        cx_pool.end_request()
 
469
 
 
470
        sock4 = cx_pool.get_socket()
 
471
        sock5 = cx_pool.get_socket()
 
472
 
 
473
        # Not in a request any more, we get different sockets
 
474
        self.assertNotEqual(sock4, sock5)
 
475
 
 
476
        # end_request() returned sock2 to pool
 
477
        self.assertEqual(sock4, sock2)
 
478
 
 
479
    def test_reset_and_request(self):
 
480
        # reset() is called after a fork, or after a socket error. Ensure that
 
481
        # a new request is begun if a request was in progress when the reset()
 
482
        # occurred, otherwise no request is begun.
 
483
        p = self.get_pool((host, port), 10, None, None, False)
 
484
        self.assertFalse(p.in_request())
 
485
        p.start_request()
 
486
        self.assertTrue(p.in_request())
 
487
        p.reset()
 
488
        self.assertTrue(p.in_request())
 
489
        p.end_request()
 
490
        self.assertFalse(p.in_request())
 
491
        p.reset()
 
492
        self.assertFalse(p.in_request())
 
493
 
 
494
    def test_pool_reuses_open_socket(self):
 
495
        # Test Pool's _check_closed() method doesn't close a healthy socket
 
496
        cx_pool = self.get_pool((host,port), 10, None, None, False)
 
497
        sock_info = cx_pool.get_socket()
 
498
        cx_pool.return_socket(sock_info)
 
499
 
 
500
        # trigger _check_closed, which only runs on sockets that haven't been
 
501
        # used in a second
 
502
        time.sleep(1.1)
 
503
        new_sock_info = cx_pool.get_socket()
 
504
        self.assertEqual(sock_info, new_sock_info)
 
505
        del sock_info, new_sock_info
 
506
 
 
507
        # Assert sock_info was returned to the pool *once*
 
508
        force_reclaim_sockets(cx_pool, 1)
 
509
        self.assertEqual(1, len(cx_pool.sockets))
 
510
 
 
511
    def test_pool_removes_dead_socket(self):
 
512
        # Test that Pool removes dead socket and the socket doesn't return
 
513
        # itself PYTHON-344
 
514
        cx_pool = self.get_pool((host,port), 10, None, None, False)
 
515
        sock_info = cx_pool.get_socket()
 
516
 
 
517
        # Simulate a closed socket without telling the SocketInfo it's closed
 
518
        sock_info.sock.close()
 
519
        self.assertTrue(pymongo.pool._closed(sock_info.sock))
 
520
        cx_pool.return_socket(sock_info)
 
521
        time.sleep(1.1) # trigger _check_closed
 
522
        new_sock_info = cx_pool.get_socket()
 
523
        self.assertEqual(0, len(cx_pool.sockets))
 
524
        self.assertNotEqual(sock_info, new_sock_info)
 
525
        del sock_info, new_sock_info
 
526
 
 
527
        # new_sock_info returned to the pool, but not the closed sock_info
 
528
        force_reclaim_sockets(cx_pool, 1)
 
529
        self.assertEqual(1, len(cx_pool.sockets))
 
530
 
 
531
    def test_pool_removes_dead_request_socket(self):
 
532
        # Test that Pool keeps request going even if a socket dies in request
 
533
        cx_pool = self.get_pool((host,port), 10, None, None, False)
 
534
        cx_pool.start_request()
 
535
 
 
536
        # Get the request socket
 
537
        sock_info = cx_pool.get_socket()
 
538
        self.assertEqual(0, len(cx_pool.sockets))
 
539
        self.assertEqual(sock_info, cx_pool._get_request_state())
 
540
        sock_info.sock.close()
 
541
        cx_pool.return_socket(sock_info)
 
542
        time.sleep(1.1) # trigger _check_closed
 
543
 
 
544
        # Although the request socket died, we're still in a request with a
 
545
        # new socket
 
546
        new_sock_info = cx_pool.get_socket()
 
547
        self.assertNotEqual(sock_info, new_sock_info)
 
548
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
 
549
        cx_pool.return_socket(new_sock_info)
 
550
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
 
551
        self.assertEqual(0, len(cx_pool.sockets))
 
552
 
 
553
        cx_pool.end_request()
 
554
        self.assertEqual(1, len(cx_pool.sockets))
 
555
 
 
556
    def test_pool_removes_dead_socket_after_request(self):
 
557
        # Test that Pool handles a socket dying that *used* to be the request
 
558
        # socket.
 
559
        cx_pool = self.get_pool((host,port), 10, None, None, False)
 
560
        cx_pool.start_request()
 
561
 
 
562
        # Get the request socket
 
563
        sock_info = cx_pool.get_socket()
 
564
        self.assertEqual(sock_info, cx_pool._get_request_state())
 
565
 
 
566
        # End request
 
567
        cx_pool.end_request()
 
568
        self.assertEqual(1, len(cx_pool.sockets))
 
569
 
 
570
        # Kill old request socket
 
571
        sock_info.sock.close()
 
572
        old_sock_info_id = id(sock_info)
 
573
        del sock_info
 
574
        time.sleep(1.1) # trigger _check_closed
 
575
 
 
576
        # Dead socket detected and removed
 
577
        new_sock_info = cx_pool.get_socket()
 
578
        self.assertNotEqual(id(new_sock_info), old_sock_info_id)
 
579
        self.assertEqual(0, len(cx_pool.sockets))
 
580
        self.assertFalse(pymongo.pool._closed(new_sock_info.sock))
 
581
 
 
582
    def test_socket_reclamation(self):
 
583
        # Check that if a thread starts a request and dies without ending
 
584
        # the request, that the socket is reclaimed into the pool.
 
585
        cx_pool = self.get_pool(
 
586
            pair=(host,port),
 
587
            max_size=10,
 
588
            net_timeout=1000,
 
589
            conn_timeout=1000,
 
590
            use_ssl=False,
 
591
        )
 
592
 
 
593
        self.assertEqual(0, len(cx_pool.sockets))
 
594
 
 
595
        lock = None
 
596
        the_sock = [None]
 
597
 
 
598
        def leak_request():
 
599
            self.assertEqual(NO_REQUEST, cx_pool._get_request_state())
 
600
            cx_pool.start_request()
 
601
            self.assertEqual(NO_SOCKET_YET, cx_pool._get_request_state())
 
602
            sock_info = cx_pool.get_socket()
 
603
            self.assertEqual(sock_info, cx_pool._get_request_state())
 
604
            the_sock[0] = id(sock_info.sock)
 
605
 
 
606
            if not self.use_greenlets:
 
607
                lock.release()
 
608
 
 
609
        if self.use_greenlets:
 
610
            g = Greenlet(leak_request)
 
611
            g.start()
 
612
            g.join(1)
 
613
            self.assertTrue(g.ready(), "Greenlet is hung")
 
614
        else:
 
615
            lock = thread.allocate_lock()
 
616
            lock.acquire()
 
617
 
 
618
            # Start a thread WITHOUT a threading.Thread - important to test that
 
619
            # Pool can deal with primitive threads.
 
620
            thread.start_new_thread(leak_request, ())
 
621
 
 
622
            # Join thread
 
623
            acquired = lock.acquire()
 
624
            self.assertTrue(acquired, "Thread is hung")
 
625
 
 
626
        force_reclaim_sockets(cx_pool, 1)
 
627
 
 
628
        # Pool reclaimed the socket
 
629
        self.assertEqual(1, len(cx_pool.sockets))
 
630
        self.assertEqual(the_sock[0], id(one(cx_pool.sockets).sock))
 
631
 
 
632
 
 
633
class _TestMaxPoolSize(_TestPoolingBase):
 
634
    """Test that connection pool keeps proper number of idle sockets open,
 
635
    no matter how start/end_request are called. To be applied both to Pool and
 
636
    GreenletPool.
 
637
    """
 
638
    def _test_max_pool_size(self, start_request, end_request):
 
639
        c = self.get_connection(max_pool_size=4, auto_start_request=False)
 
640
        nthreads = 10
 
641
 
 
642
        if (
 
643
            self.use_greenlets and sys.platform == 'darwin'
 
644
            and gevent.version_info[0] < 1
 
645
        ):
 
646
            # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
 
647
            # about 35 Greenlets share a Connection. Apparently fixed in
 
648
            # recent Gevent development.
 
649
            nthreads = 30
 
650
 
 
651
        threads = []
 
652
        for i in range(nthreads):
 
653
            t = CreateAndReleaseSocket(self, c, start_request, end_request)
 
654
            threads.append(t)
 
655
 
 
656
        for t in threads:
 
657
            t.start()
 
658
 
 
659
        for t in threads:
 
660
            t.join()
 
661
 
 
662
        for t in threads:
 
663
            self.assertTrue(t.passed)
 
664
 
 
665
        # Critical: release refs to threads, so SocketInfo.__del__() executes
 
666
        # and reclaims sockets.
 
667
        del threads
 
668
        t = None
 
669
 
 
670
        cx_pool = c._Connection__pool
 
671
        force_reclaim_sockets(cx_pool, 4)
 
672
 
 
673
        nsock = len(cx_pool.sockets)
 
674
 
 
675
        # Socket-reclamation depends on timely garbage-collection, so be lenient
 
676
        self.assertTrue(2 <= nsock <= 4,
 
677
            msg="Expected between 2 and 4 sockets in the pool, got %d" % nsock)
 
678
 
 
679
    def test_max_pool_size(self):
 
680
        self._test_max_pool_size(0, 0)
 
681
 
 
682
    def test_max_pool_size_with_request(self):
 
683
        self._test_max_pool_size(1, 1)
 
684
 
 
685
    def test_max_pool_size_with_redundant_request(self):
 
686
        self._test_max_pool_size(2, 1)
 
687
        self._test_max_pool_size(20, 1)
 
688
 
 
689
    def test_max_pool_size_with_leaked_request(self):
 
690
        # Call start_request() but not end_request() -- when threads die, they
 
691
        # should return their request sockets to the pool.
 
692
        self._test_max_pool_size(1, 0)
 
693
 
 
694
    def test_max_pool_size_with_end_request_only(self):
 
695
        # Call end_request() but not start_request()
 
696
        self._test_max_pool_size(0, 1)
 
697
 
 
698
 
 
699
class _TestPoolSocketSharing(_TestPoolingBase):
 
700
    """Directly test that two simultaneous operations don't share a socket. To
 
701
    be applied both to Pool and GreenletPool.
 
702
    """
 
703
    def _test_pool(self, use_request):
 
704
        """
 
705
        Test that the connection pool prevents both threads and greenlets from
 
706
        using a socket at the same time.
 
707
 
 
708
        Sequence:
 
709
        gr0: start a slow find()
 
710
        gr1: start a fast find()
 
711
        gr1: get results
 
712
        gr0: get results
 
713
        """
 
714
        cx = get_connection(
 
715
            use_greenlets=self.use_greenlets,
 
716
            auto_start_request=False
 
717
        )
 
718
 
 
719
        db = cx.pymongo_test
 
720
        if not version.at_least(db.connection, (1, 7, 2)):
 
721
            raise SkipTest("Need at least MongoDB version 1.7.2 to use"
 
722
                           " db.eval(nolock=True)")
 
723
        
 
724
        db.test.remove(safe=True)
 
725
        db.test.insert({'_id': 1}, safe=True)
 
726
 
 
727
        history = []
 
728
 
 
729
        def find_fast():
 
730
            if use_request:
 
731
                cx.start_request()
 
732
 
 
733
            history.append('find_fast start')
 
734
 
 
735
            # With greenlets and the old connection._Pool, this would throw
 
736
            # AssertionError: "This event is already used by another
 
737
            # greenlet"
 
738
            self.assertEqual({'_id': 1}, db.test.find_one())
 
739
            history.append('find_fast done')
 
740
 
 
741
            if use_request:
 
742
                cx.end_request()
 
743
 
 
744
        def find_slow():
 
745
            if use_request:
 
746
                cx.start_request()
 
747
 
 
748
            history.append('find_slow start')
 
749
 
 
750
            # Javascript function that pauses 5 sec. 'nolock' allows find_fast
 
751
            # to start and finish while we're waiting for this.
 
752
            fn = delay(5)
 
753
            self.assertEqual(
 
754
                {'ok': 1.0, 'retval': True},
 
755
                db.command('eval', fn, nolock=True))
 
756
 
 
757
            history.append('find_slow done')
 
758
 
 
759
            if use_request:
 
760
                cx.end_request()
 
761
 
 
762
        if self.use_greenlets:
 
763
            gr0, gr1 = Greenlet(find_slow), Greenlet(find_fast)
 
764
            gr0.start()
 
765
            gr1.start_later(.1)
 
766
        else:
 
767
            gr0 = threading.Thread(target=find_slow)
 
768
            gr0.setDaemon(True)
 
769
            gr1 = threading.Thread(target=find_fast)
 
770
            gr1.setDaemon(True)
 
771
 
 
772
            gr0.start()
 
773
            time.sleep(.1)
 
774
            gr1.start()
 
775
 
 
776
        gr0.join()
 
777
        gr1.join()
 
778
 
 
779
        self.assertEqual([
 
780
            'find_slow start',
 
781
            'find_fast start',
 
782
            'find_fast done',
 
783
            'find_slow done',
 
784
        ], history)
 
785
 
 
786
    def test_pool(self):
 
787
        self._test_pool(use_request=False)
 
788
 
 
789
    def test_pool_request(self):
 
790
        self._test_pool(use_request=True)