1
# Copyright 2012-2014 MongoDB, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
15
"""Base classes to test built-in connection-pooling with threads or greenlets.
28
from nose.plugins.skip import SkipTest
31
from pymongo.mongo_client import MongoClient
32
from pymongo.pool import Pool, NO_REQUEST, NO_SOCKET_YET, SocketInfo
33
from pymongo.errors import ConfigurationError, ConnectionFailure
34
from pymongo.errors import ExceededMaxWaiters
35
from test import version, host, port
36
from test.test_client import get_client
37
from test.utils import delay, is_mongos, one, get_pool
40
DB = "pymongo-pooling-tests"
43
if sys.version_info[0] >= 3:
44
from imp import reload
49
from gevent import Greenlet, monkey, hub
50
import gevent.coros, gevent.event
56
def gc_collect_until_done(threads, timeout=60):
58
running = list(threads)
60
assert (time.time() - start) < timeout, "Threads timed out"
68
class MongoThread(object):
69
"""A thread, or a greenlet, that uses a MongoClient"""
70
def __init__(self, test_case):
71
self.use_greenlets = test_case.use_greenlets
72
self.client = test_case.c
73
self.db = self.client[DB]
78
if self.use_greenlets:
79
# A Gevent extended Greenlet
80
self.thread = Greenlet(self.run)
82
self.thread = threading.Thread(target=self.run)
83
self.thread.setDaemon(True) # Don't hang whole test if thread hangs
89
if self.use_greenlets:
90
return not self.thread.dead
92
return self.thread.isAlive()
96
if self.use_greenlets:
97
msg = "Greenlet timeout"
99
msg = "Thread timeout"
100
assert not self.alive, msg
104
self.run_mongo_thread()
106
# No exceptions thrown
109
def run_mongo_thread(self):
    """Override in subclasses with the actual work for this thread.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError()
113
class SaveAndFind(MongoThread):
    """Save a document with a random value, then read it back and verify."""

    def run_mongo_thread(self):
        # N is a module-level constant bounding the random value
        # (defined elsewhere in this file).
        rand = random.randint(0, N)
        _id = self.db.sf.save({"x": rand})
        # self.ut is the owning TestCase -- presumably set by
        # MongoThread.__init__ (not visible here); TODO confirm.
        self.ut.assertEqual(rand, self.db.sf.find_one(_id)["x"])
122
class Unique(MongoThread):
    """Insert an empty document inside a request; expect no error."""

    def run_mongo_thread(self):
        self.client.start_request()
        self.db.unique.insert({})  # no error
        self.client.end_request()
131
class NonUnique(MongoThread):
    """Insert a duplicate _id with w=0 and assert the server reported an error.

    The insert uses w=0 (unacknowledged), so the duplicate-key failure is
    only visible via db.error() on the same socket -- which is why the
    operation runs inside a request.
    """

    def run_mongo_thread(self):
        self.client.start_request()
        self.db.unique.insert({"_id": "jesse"}, w=0)
        self.ut.assertNotEqual(None, self.db.error())
        self.client.end_request()
141
class Disconnect(MongoThread):
    """A thread that simply disconnects the shared MongoClient."""

    def run_mongo_thread(self):
        self.client.disconnect()
148
class NoRequest(MongoThread):
150
def run_mongo_thread(self):
151
self.client.start_request()
154
self.db.unique.insert({"_id": "jesse"}, w=0)
155
if not self.db.error():
158
self.client.end_request()
159
self.ut.assertEqual(0, errors)
162
def run_cases(ut, cases):
166
ut.use_greenlets and sys.platform == 'darwin'
167
and gevent.version_info[0] < 1
169
# Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
170
# about 35 Greenlets share a MongoClient. Apparently fixed in
171
# recent Gevent development.
175
for i in range(nruns):
184
assert t.passed, "%s.run_mongo_thread() threw an exception" % repr(t)
187
class OneOp(MongoThread):
189
def __init__(self, ut):
    """Initialize via MongoThread; no extra state of its own."""
    super(OneOp, self).__init__(ut)
192
def run_mongo_thread(self):
193
pool = get_pool(self.client)
194
assert len(pool.sockets) == 1, "Expected 1 socket, found %d" % (
198
sock_info = one(pool.sockets)
200
self.client.start_request()
202
# start_request() hasn't yet moved the socket from the general pool into
204
assert len(pool.sockets) == 1
205
assert one(pool.sockets) == sock_info
207
self.client[DB].test.find_one()
209
# find_one() causes the socket to be used in the request, so now it's
210
# bound to this thread
211
assert len(pool.sockets) == 0
212
assert pool._get_request_state() == sock_info
213
self.client.end_request()
215
# The socket is back in the pool
216
assert len(pool.sockets) == 1
217
assert one(pool.sockets) == sock_info
220
class CreateAndReleaseSocket(MongoThread):
221
"""A thread or greenlet that acquires a socket, waits for all other threads
222
to reach rendezvous point, then terminates.
224
class Rendezvous(object):
225
def __init__(self, nthreads, use_greenlets):
226
self.nthreads = nthreads
227
self.nthreads_run = 0
228
self.use_greenlets = use_greenlets
230
self.lock = gevent.coros.RLock()
232
self.lock = threading.Lock()
235
def reset_ready(self):
236
if self.use_greenlets:
237
self.ready = gevent.event.Event()
239
self.ready = threading.Event()
241
def __init__(self, ut, client, start_request, end_request, rendezvous):
242
super(CreateAndReleaseSocket, self).__init__(ut)
244
self.start_request = start_request
245
self.end_request = end_request
246
self.rendezvous = rendezvous
248
def run_mongo_thread(self):
249
# Do an operation that requires a socket.
250
# test_max_pool_size uses this to spin up lots of threads requiring
251
# lots of simultaneous connections, to ensure that Pool obeys its
252
# max_size configuration and closes extra sockets as they're returned.
253
for i in range(self.start_request):
254
self.client.start_request()
257
self.client[DB].test.find_one()
259
# Don't finish until all threads reach this point
263
if r.nthreads_run == r.nthreads:
264
# Everyone's here, let them finish
269
r.ready.wait(30) # Wait thirty seconds....
270
assert r.ready.isSet(), "Rendezvous timed out"
272
for i in range(self.end_request):
273
self.client.end_request()
276
class CreateAndReleaseSocketNoRendezvous(MongoThread):
277
"""A thread or greenlet that acquires a socket and terminates without
278
waiting for other threads to reach rendezvous point.
280
class Rendezvous(object):
281
def __init__(self, nthreads, use_greenlets):
282
self.nthreads = nthreads
283
self.nthreads_run = 0
285
self.lock = gevent.coros.RLock()
286
self.ready = gevent.event.Event()
288
self.lock = threading.Lock()
289
self.ready = threading.Event()
291
def __init__(self, ut, client, start_request, end_request):
292
super(CreateAndReleaseSocketNoRendezvous, self).__init__(ut)
294
self.start_request = start_request
295
self.end_request = end_request
297
def run_mongo_thread(self):
298
# Do an operation that requires a socket.
299
# test_max_pool_size uses this to spin up lots of threads requiring
300
# lots of simultaneous connections, to ensure that Pool obeys its
301
# max_size configuration and closes extra sockets as they're returned.
302
for i in range(self.start_request):
303
self.client.start_request()
306
self.client[DB].test.find_one()
307
for i in range(self.end_request):
308
self.client.end_request()
311
class _TestPoolingBase(object):
312
"""Base class for all client-pool tests. Doesn't inherit from
313
unittest.TestCase, and its name is prefixed with "_" to avoid being
314
run by nose. Real tests double-inherit from this base and from TestCase.
316
use_greenlets = False
319
if self.use_greenlets:
321
raise SkipTest("Gevent not installed")
323
# Note we don't do patch_thread() or patch_all() - we're
324
# testing here that patch_thread() is unnecessary for
325
# the client pool to work properly.
326
monkey.patch_socket()
328
self.c = self.get_client(auto_start_request=False)
334
db.unique.insert({"_id": "jesse"})
336
db.test.insert([{} for i in range(10)])
341
if self.use_greenlets:
345
def get_client(self, *args, **kwargs):
347
opts['use_greenlets'] = self.use_greenlets
348
return get_client(*args, **opts)
350
def get_pool(self, *args, **kwargs):
    """Create a Pool configured for this test's threading model.

    Forwards ``use_greenlets`` so the pool uses greenlet-aware
    primitives when the test subclass runs under Gevent.
    """
    kwargs['use_greenlets'] = self.use_greenlets
    return Pool(*args, **kwargs)
354
def sleep(self, seconds):
355
if self.use_greenlets:
356
gevent.sleep(seconds)
360
def assert_no_request(self):
362
self.c._MongoClient__member is None or
363
NO_REQUEST == get_pool(self.c)._get_request_state()
366
def assert_request_without_socket(self):
368
NO_SOCKET_YET, get_pool(self.c)._get_request_state()
371
def assert_request_with_socket(self):
372
self.assertTrue(isinstance(
373
get_pool(self.c)._get_request_state(), SocketInfo
376
def assert_pool_size(self, pool_size):
379
self.c._MongoClient__member is None
380
or not get_pool(self.c).sockets
384
pool_size, len(get_pool(self.c).sockets)
388
class _TestPooling(_TestPoolingBase):
389
"""Basic pool tests, to be run both with threads and with greenlets."""
390
def test_max_pool_size_validation(self):
392
ConfigurationError, MongoClient, host=host, port=port,
397
ConfigurationError, MongoClient, host=host, port=port,
401
c = MongoClient(host=host, port=port, max_pool_size=100)
402
self.assertEqual(c.max_pool_size, 100)
404
def test_no_disconnect(self):
    # Exercise the pool with worker threads that never disconnect.
    run_cases(self, [NoRequest, NonUnique, Unique, SaveAndFind])
407
def test_simple_disconnect(self):
408
# MongoClient just created, expect 1 free socket
409
self.assert_pool_size(1)
410
self.assert_no_request()
412
self.c.start_request()
413
self.assert_request_without_socket()
414
cursor = self.c[DB].stuff.find()
416
# Cursor hasn't actually caused a request yet, so there's still 1 free
418
self.assert_pool_size(1)
419
self.assert_request_without_socket()
421
# Actually make a request to server, triggering a socket to be
422
# allocated to the request
424
self.assert_pool_size(0)
425
self.assert_request_with_socket()
427
# Pool returns to its original state
429
self.assert_no_request()
430
self.assert_pool_size(1)
433
self.assert_pool_size(0)
434
self.assert_no_request()
436
def test_disconnect(self):
    # Mix normal operations with threads that call client.disconnect().
    run_cases(self, [SaveAndFind, Disconnect, Unique])
439
def test_independent_pools(self):
440
# Test for regression of very early PyMongo bug: separate pools shared
442
p = self.get_pool((host, port), 10, None, None, False)
443
self.c.start_request()
444
self.c.pymongo_test.test.find_one()
445
self.assertEqual(set(), p.sockets)
447
self.assert_pool_size(1)
448
self.assertEqual(set(), p.sockets)
450
def test_dependent_pools(self):
451
self.assert_pool_size(1)
452
self.c.start_request()
453
self.assert_request_without_socket()
454
self.c.pymongo_test.test.find_one()
455
self.assert_request_with_socket()
456
self.assert_pool_size(0)
458
self.assert_pool_size(1)
463
self.assertTrue(t.passed, "OneOp.run() threw exception")
465
self.assert_pool_size(1)
466
self.c.pymongo_test.test.find_one()
467
self.assert_pool_size(1)
469
def test_multiple_connections(self):
470
a = self.get_client(auto_start_request=False)
471
b = self.get_client(auto_start_request=False)
472
self.assertEqual(1, len(get_pool(a).sockets))
473
self.assertEqual(1, len(get_pool(b).sockets))
476
a.pymongo_test.test.find_one()
477
self.assertEqual(0, len(get_pool(a).sockets))
479
self.assertEqual(1, len(get_pool(a).sockets))
480
self.assertEqual(1, len(get_pool(b).sockets))
481
a_sock = one(get_pool(a).sockets)
484
self.assertEqual(1, len(get_pool(a).sockets))
485
self.assertEqual(1, len(get_pool(b).sockets))
488
b.pymongo_test.test.find_one()
489
self.assertEqual(1, len(get_pool(a).sockets))
490
self.assertEqual(0, len(get_pool(b).sockets))
493
b_sock = one(get_pool(b).sockets)
494
b.pymongo_test.test.find_one()
495
a.pymongo_test.test.find_one()
497
self.assertEqual(b_sock,
498
get_pool(b).get_socket())
499
self.assertEqual(a_sock,
500
get_pool(a).get_socket())
505
def test_request(self):
506
# Check that Pool gives two different sockets in two calls to
507
# get_socket() -- doesn't automatically put us in a request any more
508
cx_pool = self.get_pool(
516
sock0 = cx_pool.get_socket()
517
sock1 = cx_pool.get_socket()
519
self.assertNotEqual(sock0, sock1)
521
# Now in a request, we'll get the same socket both times
522
cx_pool.start_request()
524
sock2 = cx_pool.get_socket()
525
sock3 = cx_pool.get_socket()
526
self.assertEqual(sock2, sock3)
528
# Pool didn't keep reference to sock0 or sock1; sock2 and 3 are new
529
self.assertNotEqual(sock0, sock2)
530
self.assertNotEqual(sock1, sock2)
532
# Return the request sock to pool
533
cx_pool.end_request()
535
sock4 = cx_pool.get_socket()
536
sock5 = cx_pool.get_socket()
538
# Not in a request any more, we get different sockets
539
self.assertNotEqual(sock4, sock5)
541
# end_request() returned sock2 to pool
542
self.assertEqual(sock4, sock2)
544
for s in [sock0, sock1, sock2, sock3, sock4, sock5]:
547
def test_reset_and_request(self):
548
# reset() is called after a fork, or after a socket error. Ensure that
549
# a new request is begun if a request was in progress when the reset()
550
# occurred, otherwise no request is begun.
551
p = self.get_pool((host, port), 10, None, None, False)
552
self.assertFalse(p.in_request())
554
self.assertTrue(p.in_request())
556
self.assertTrue(p.in_request())
558
self.assertFalse(p.in_request())
560
self.assertFalse(p.in_request())
562
def test_pool_reuses_open_socket(self):
563
# Test Pool's _check_closed() method doesn't close a healthy socket
564
cx_pool = self.get_pool((host,port), 10, None, None, False)
565
cx_pool._check_interval_seconds = 0 # Always check.
566
sock_info = cx_pool.get_socket()
567
cx_pool.maybe_return_socket(sock_info)
569
new_sock_info = cx_pool.get_socket()
570
self.assertEqual(sock_info, new_sock_info)
571
cx_pool.maybe_return_socket(new_sock_info)
572
self.assertEqual(1, len(cx_pool.sockets))
574
def test_pool_removes_dead_socket(self):
575
# Test that Pool removes dead socket and the socket doesn't return
577
cx_pool = self.get_pool((host,port), 10, None, None, False)
578
cx_pool._check_interval_seconds = 0 # Always check.
579
sock_info = cx_pool.get_socket()
581
# Simulate a closed socket without telling the SocketInfo it's closed
582
sock_info.sock.close()
583
self.assertTrue(pymongo.pool._closed(sock_info.sock))
584
cx_pool.maybe_return_socket(sock_info)
585
new_sock_info = cx_pool.get_socket()
586
self.assertEqual(0, len(cx_pool.sockets))
587
self.assertNotEqual(sock_info, new_sock_info)
588
cx_pool.maybe_return_socket(new_sock_info)
589
self.assertEqual(1, len(cx_pool.sockets))
591
def test_pool_removes_dead_request_socket_after_check(self):
592
# Test that Pool keeps request going even if a socket dies in request
593
cx_pool = self.get_pool((host,port), 10, None, None, False)
594
cx_pool._check_interval_seconds = 0 # Always check.
595
cx_pool.start_request()
597
# Get the request socket
598
sock_info = cx_pool.get_socket()
599
self.assertEqual(0, len(cx_pool.sockets))
600
self.assertEqual(sock_info, cx_pool._get_request_state())
601
sock_info.sock.close()
602
cx_pool.maybe_return_socket(sock_info)
604
# Although the request socket died, we're still in a request with a
606
new_sock_info = cx_pool.get_socket()
607
self.assertTrue(cx_pool.in_request())
608
self.assertNotEqual(sock_info, new_sock_info)
609
self.assertEqual(new_sock_info, cx_pool._get_request_state())
610
cx_pool.maybe_return_socket(new_sock_info)
611
self.assertEqual(new_sock_info, cx_pool._get_request_state())
612
self.assertEqual(0, len(cx_pool.sockets))
614
cx_pool.end_request()
615
self.assertEqual(1, len(cx_pool.sockets))
617
def test_pool_removes_dead_request_socket(self):
618
# Test that Pool keeps request going even if a socket dies in request
619
cx_pool = self.get_pool((host,port), 10, None, None, False)
620
cx_pool.start_request()
622
# Get the request socket
623
sock_info = cx_pool.get_socket()
624
self.assertEqual(0, len(cx_pool.sockets))
625
self.assertEqual(sock_info, cx_pool._get_request_state())
627
# Unlike in test_pool_removes_dead_request_socket_after_check, we
628
# set sock_info.closed and *don't* wait for it to be checked.
630
cx_pool.maybe_return_socket(sock_info)
632
# Although the request socket died, we're still in a request with a
634
new_sock_info = cx_pool.get_socket()
635
self.assertTrue(cx_pool.in_request())
636
self.assertNotEqual(sock_info, new_sock_info)
637
self.assertEqual(new_sock_info, cx_pool._get_request_state())
638
cx_pool.maybe_return_socket(new_sock_info)
639
self.assertEqual(new_sock_info, cx_pool._get_request_state())
640
self.assertEqual(0, len(cx_pool.sockets))
642
cx_pool.end_request()
643
self.assertEqual(1, len(cx_pool.sockets))
645
def test_pool_removes_dead_socket_after_request(self):
646
# Test that Pool handles a socket dying that *used* to be the request
648
cx_pool = self.get_pool((host,port), 10, None, None, False)
649
cx_pool._check_interval_seconds = 0 # Always check.
650
cx_pool.start_request()
652
# Get the request socket
653
sock_info = cx_pool.get_socket()
654
self.assertEqual(sock_info, cx_pool._get_request_state())
655
cx_pool.maybe_return_socket(sock_info)
658
cx_pool.end_request()
659
self.assertEqual(1, len(cx_pool.sockets))
661
# Kill old request socket
662
sock_info.sock.close()
664
# Dead socket detected and removed
665
new_sock_info = cx_pool.get_socket()
666
self.assertFalse(cx_pool.in_request())
667
self.assertNotEqual(sock_info, new_sock_info)
668
self.assertEqual(0, len(cx_pool.sockets))
669
self.assertFalse(pymongo.pool._closed(new_sock_info.sock))
670
cx_pool.maybe_return_socket(new_sock_info)
671
self.assertEqual(1, len(cx_pool.sockets))
673
def test_dead_request_socket_with_max_size(self):
674
# When a pool replaces a dead request socket, the semaphore it uses
675
# to enforce max_size should remain unaffected.
676
cx_pool = self.get_pool(
677
(host, port), 1, None, None, False, wait_queue_timeout=1)
679
cx_pool._check_interval_seconds = 0 # Always check.
680
cx_pool.start_request()
682
# Get and close the request socket.
683
request_sock_info = cx_pool.get_socket()
684
request_sock_info.sock.close()
685
cx_pool.maybe_return_socket(request_sock_info)
687
# Detects closed socket and creates new one, semaphore value still 0.
688
request_sock_info_2 = cx_pool.get_socket()
689
self.assertNotEqual(request_sock_info, request_sock_info_2)
690
cx_pool.maybe_return_socket(request_sock_info_2)
691
cx_pool.end_request()
693
# Semaphore value now 1; we can get a socket.
694
sock_info = cx_pool.get_socket()
697
cx_pool.maybe_return_socket(sock_info)
699
def test_socket_reclamation(self):
700
if sys.platform.startswith('java'):
701
raise SkipTest("Jython can't do socket reclamation")
703
# Check that if a thread starts a request and dies without ending
704
# the request, that the socket is reclaimed into the pool.
705
cx_pool = self.get_pool(
713
self.assertEqual(0, len(cx_pool.sockets))
719
self.assertEqual(NO_REQUEST, cx_pool._get_request_state())
720
cx_pool.start_request()
721
self.assertEqual(NO_SOCKET_YET, cx_pool._get_request_state())
722
sock_info = cx_pool.get_socket()
723
self.assertEqual(sock_info, cx_pool._get_request_state())
724
the_sock[0] = id(sock_info.sock)
725
cx_pool.maybe_return_socket(sock_info)
727
if not self.use_greenlets:
730
if self.use_greenlets:
731
g = Greenlet(leak_request)
734
self.assertTrue(g.ready(), "Greenlet is hung")
736
# In Gevent after 0.13.8, join() returns before the Greenlet.link
737
# callback fires. Give it a moment to reclaim the socket.
740
lock = thread.allocate_lock()
743
# Start a thread WITHOUT a threading.Thread - important to test that
744
# Pool can deal with primitive threads.
745
thread.start_new_thread(leak_request, ())
748
acquired = lock.acquire()
749
self.assertTrue(acquired, "Thread is hung")
751
# Make sure thread is really gone
754
if 'PyPy' in sys.version:
757
# Access the thread local from the main thread to trigger the
758
# ThreadVigil's delete callback, returning the request socket to
760
# In Python 2.7.0 and lesser, a dead thread's locals are deleted
761
# and those locals' weakref callbacks are fired only when another
762
# thread accesses the locals and finds the thread state is stale,
763
# see http://bugs.python.org/issue1868. Accessing the thread
764
# local from the main thread is a necessary part of this test, and
765
# realistic: in a multithreaded web server a new thread will access
766
# Pool._ident._local soon after an old thread has died.
769
# Pool reclaimed the socket
770
self.assertEqual(1, len(cx_pool.sockets))
771
self.assertEqual(the_sock[0], id(one(cx_pool.sockets).sock))
772
self.assertEqual(0, len(cx_pool._tid_to_sock))
775
class _TestMaxPoolSize(_TestPoolingBase):
776
"""Test that connection pool keeps proper number of idle sockets open,
777
no matter how start/end_request are called. To be run both with threads and
780
def _test_max_pool_size(
781
self, start_request, end_request, max_pool_size=4, nthreads=10):
782
"""Start `nthreads` threads. Each calls start_request `start_request`
783
times, then find_one and waits at a barrier; once all reach the barrier
784
each calls end_request `end_request` times. The test asserts that the
785
pool ends with min(max_pool_size, nthreads) sockets or, if
786
start_request wasn't called, at least one socket.
788
This tests both max_pool_size enforcement and that leaked request
789
sockets are eventually returned to the pool when their threads end.
791
You may need to increase ulimit -n on Mac.
793
If you increase nthreads over about 35, note a
794
Gevent 0.13.6 bug on Mac: Greenlet.join() hangs if more than
795
about 35 Greenlets share a MongoClient. Apparently fixed in
796
recent Gevent development.
799
if max_pool_size is not None and max_pool_size < nthreads:
800
raise AssertionError("Deadlock")
803
max_pool_size=max_pool_size, auto_start_request=False)
805
rendezvous = CreateAndReleaseSocket.Rendezvous(
806
nthreads, self.use_greenlets)
809
for i in range(nthreads):
810
t = CreateAndReleaseSocket(
811
self, c, start_request, end_request, rendezvous)
817
if 'PyPy' in sys.version:
818
# With PyPy we need to kick off the gc whenever the threads hit the
819
# rendezvous since nthreads > max_pool_size.
820
gc_collect_until_done(threads)
825
# join() returns before the thread state is cleared; give it time.
829
self.assertTrue(t.passed)
831
# Socket-reclamation doesn't work in Jython
832
if not sys.platform.startswith('java'):
833
cx_pool = get_pool(c)
835
# Socket-reclamation depends on timely garbage-collection
836
if 'PyPy' in sys.version:
839
if self.use_greenlets:
840
# Wait for Greenlet.link() callbacks to execute
841
the_hub = hub.get_hub()
842
if hasattr(the_hub, 'join'):
846
# Gevent 0.13 and less
850
# Trigger final cleanup in Python <= 2.7.0.
852
expected_idle = min(max_pool_size, nthreads)
854
'%d idle sockets (expected %d) and %d request sockets'
856
len(cx_pool.sockets), expected_idle,
857
len(cx_pool._tid_to_sock)))
860
expected_idle, len(cx_pool.sockets), message)
862
# Without calling start_request(), threads can safely share
863
# sockets; the number running concurrently, and hence the
864
# number of sockets needed, is between 1 and 10, depending
865
# on thread-scheduling.
866
self.assertTrue(len(cx_pool.sockets) >= 1)
868
# thread.join completes slightly *before* thread locals are
869
# cleaned up, so wait up to 5 seconds for them.
876
and cx_pool._socket_semaphore.counter < max_pool_size
877
and (time.time() - start) < 5
882
if max_pool_size is not None:
885
cx_pool._socket_semaphore.counter)
887
self.assertEqual(0, len(cx_pool._tid_to_sock))
889
def _test_max_pool_size_no_rendezvous(self, start_request, end_request):
892
max_pool_size=max_pool_size, auto_start_request=False)
894
# If you increase nthreads over about 35, note a
895
# Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
896
# about 35 Greenlets share a MongoClient. Apparently fixed in
897
# recent Gevent development.
899
# On the other hand, nthreads had better be much larger than
900
# max_pool_size to ensure that max_pool_size sockets are actually
901
# required at some point in this test's execution.
904
if (sys.platform.startswith('java')
905
and start_request > end_request
906
and nthreads > max_pool_size):
908
# Since Jython can't reclaim the socket and release the semaphore
909
# after a thread leaks a request, we'll exhaust the semaphore and
911
raise SkipTest("Jython can't do socket reclamation")
914
for i in range(nthreads):
915
t = CreateAndReleaseSocketNoRendezvous(
916
self, c, start_request, end_request)
922
if 'PyPy' in sys.version:
923
# With PyPy we need to kick off the gc whenever the threads hit the
924
# rendezvous since nthreads > max_pool_size.
925
gc_collect_until_done(threads)
931
self.assertTrue(t.passed)
933
cx_pool = get_pool(c)
935
# Socket-reclamation depends on timely garbage-collection
936
if 'PyPy' in sys.version:
939
if self.use_greenlets:
940
# Wait for Greenlet.link() callbacks to execute
941
the_hub = hub.get_hub()
942
if hasattr(the_hub, 'join'):
946
# Gevent 0.13 and less
949
# thread.join completes slightly *before* thread locals are
950
# cleaned up, so wait up to 5 seconds for them.
957
and cx_pool._socket_semaphore.counter < max_pool_size
958
and (time.time() - start) < 5
963
self.assertTrue(len(cx_pool.sockets) >= 1)
964
self.assertEqual(max_pool_size, cx_pool._socket_semaphore.counter)
966
def test_max_pool_size(self):
    # More threads than sockets allowed: the pool must cap idle
    # sockets at max_pool_size.
    self._test_max_pool_size(
        start_request=0, end_request=0, nthreads=10, max_pool_size=4)

def test_max_pool_size_none(self):
    # max_pool_size=None: pool size is unbounded.
    self._test_max_pool_size(
        start_request=0, end_request=0, nthreads=10, max_pool_size=None)

def test_max_pool_size_with_request(self):
    # Balanced start_request() / end_request() pairs.
    self._test_max_pool_size(
        start_request=1, end_request=1, nthreads=10, max_pool_size=10)

def test_max_pool_size_with_multiple_request(self):
    # Deeply nested, but still balanced, requests.
    self._test_max_pool_size(
        start_request=10, end_request=10, nthreads=10, max_pool_size=10)

def test_max_pool_size_with_redundant_request(self):
    # More start_request() calls than end_request() calls.
    self._test_max_pool_size(
        start_request=2, end_request=1, nthreads=10, max_pool_size=10)

def test_max_pool_size_with_redundant_request2(self):
    self._test_max_pool_size(
        start_request=20, end_request=1, nthreads=10, max_pool_size=10)

def test_max_pool_size_with_redundant_request_no_rendezvous(self):
    self._test_max_pool_size_no_rendezvous(2, 1)

def test_max_pool_size_with_redundant_request_no_rendezvous2(self):
    self._test_max_pool_size_no_rendezvous(20, 1)

def test_max_pool_size_with_leaked_request(self):
    # Call start_request() but not end_request() -- when threads die, they
    # should return their request sockets to the pool.
    self._test_max_pool_size(
        start_request=1, end_request=0, nthreads=10, max_pool_size=10)

def test_max_pool_size_with_leaked_request_no_rendezvous(self):
    self._test_max_pool_size_no_rendezvous(1, 0)

def test_max_pool_size_with_end_request_only(self):
    # Call end_request() but not start_request()
    self._test_max_pool_size(0, 1)
1009
def test_max_pool_size_with_connection_failure(self):
1010
# The pool acquires its semaphore before attempting to connect; ensure
1011
# it releases the semaphore on connection failure.
1012
class TestPool(Pool):
1014
raise socket.error()
1016
test_pool = TestPool(
1017
pair=('example.com', 27017),
1022
wait_queue_timeout=1,
1023
use_greenlets=self.use_greenlets)
1025
# First call to get_socket fails; if pool doesn't release its semaphore
1026
# then the second call raises "ConnectionFailure: Timed out waiting for
1027
# socket from pool" instead of the socket.error.
1029
self.assertRaises(socket.error, test_pool.get_socket)
1032
class SocketGetter(MongoThread):
1033
"""Utility for _TestMaxOpenSockets and _TestWaitQueueMultiple"""
1034
def __init__(self, test_case, pool):
1035
super(SocketGetter, self).__init__(test_case)
1041
self.state = 'get_socket'
1042
self.sock = self.pool.get_socket()
1046
class _TestMaxOpenSockets(_TestPoolingBase):
1047
"""Test that connection pool doesn't open more than max_size sockets.
1048
To be run both with threads and with greenlets.
1050
def get_pool_with_wait_queue_timeout(self, wait_queue_timeout):
1051
return self.get_pool((host, port),
1054
wait_queue_timeout=wait_queue_timeout,
1055
wait_queue_multiple=None)
1057
def test_wait_queue_timeout(self):
1058
wait_queue_timeout = 2 # Seconds
1059
pool = self.get_pool_with_wait_queue_timeout(wait_queue_timeout)
1060
sock_info = pool.get_socket()
1062
self.assertRaises(ConnectionFailure, pool.get_socket)
1063
duration = time.time() - start
1065
abs(wait_queue_timeout - duration) < 1,
1066
"Waited %.2f seconds for a socket, expected %f" % (
1067
duration, wait_queue_timeout))
1071
def test_blocking(self):
1072
# Verify get_socket() with no wait_queue_timeout blocks forever.
1073
pool = self.get_pool_with_wait_queue_timeout(None)
1076
s1 = pool.get_socket()
1077
t = SocketGetter(self, pool)
1079
while t.state != 'get_socket':
1083
self.assertEqual(t.state, 'get_socket')
1084
pool.maybe_return_socket(s1)
1085
while t.state != 'sock':
1088
self.assertEqual(t.state, 'sock')
1089
self.assertEqual(t.sock, s1)
1093
class _TestWaitQueueMultiple(_TestPoolingBase):
1094
"""Test that connection pool doesn't allow more than
1095
waitQueueMultiple * max_size waiters.
1096
To be run both with threads and with greenlets.
1098
def get_pool_with_wait_queue_multiple(self, wait_queue_multiple):
1099
return self.get_pool((host, port),
1102
wait_queue_timeout=None,
1103
wait_queue_multiple=wait_queue_multiple)
1105
def test_wait_queue_multiple(self):
1106
pool = self.get_pool_with_wait_queue_multiple(3)
1108
# Reach max_size sockets.
1109
socket_info_0 = pool.get_socket()
1110
socket_info_1 = pool.get_socket()
1112
# Reach max_size * wait_queue_multiple waiters.
1115
t = SocketGetter(self, pool)
1121
self.assertEqual(t.state, 'get_socket')
1123
self.assertRaises(ExceededMaxWaiters, pool.get_socket)
1124
socket_info_0.close()
1125
socket_info_1.close()
1127
def test_wait_queue_multiple_unset(self):
1128
pool = self.get_pool_with_wait_queue_multiple(None)
1131
sock = pool.get_socket()
1134
for _ in xrange(30):
1135
t = SocketGetter(self, pool)
1140
self.assertEqual(t.state, 'get_socket')
1142
for socket_info in socks:
1146
class _TestPoolSocketSharing(_TestPoolingBase):
1147
"""Directly test that two simultaneous operations don't share a socket. To
1148
be run both with threads and with greenlets.
1150
def _test_pool(self, use_request):
1152
Test that the connection pool prevents both threads and greenlets from
1153
using a socket at the same time.
1156
gr0: start a slow find()
1157
gr1: start a fast find()
1162
use_greenlets=self.use_greenlets,
1163
auto_start_request=False
1166
db = cx.pymongo_test
1168
db.test.insert({'_id': 1})
1176
history.append('find_fast start')
1178
# With greenlets and the old connection._Pool, this would throw
1179
# AssertionError: "This event is already used by another
1181
self.assertEqual({'_id': 1}, db.test.find_one())
1182
history.append('find_fast done')
1191
history.append('find_slow start')
1193
# Javascript function that pauses N seconds per document
1195
if (is_mongos(db.connection) or not
1196
version.at_least(db.connection, (1, 7, 2))):
1197
# mongos doesn't support eval so we have to use $where
1198
# which is less reliable in this context.
1199
self.assertEqual(1, db.test.find({"$where": fn}).count())
1201
# 'nolock' allows find_fast to start and finish while we're
1202
# waiting for this to complete.
1203
self.assertEqual({'ok': 1.0, 'retval': True},
1204
db.command('eval', fn, nolock=True))
1206
history.append('find_slow done')
1211
if self.use_greenlets:
1212
gr0, gr1 = Greenlet(find_slow), Greenlet(find_fast)
1216
gr0 = threading.Thread(target=find_slow)
1218
gr1 = threading.Thread(target=find_fast)
1235
def test_pool(self):
    # Two concurrent operations, no explicit request.
    self._test_pool(use_request=False)

def test_pool_request(self):
    # Same scenario, but each worker runs inside a request.
    self._test_pool(use_request=True)