# Copyright 2012 10gen, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base classes to test built-in connection-pooling with threads or greenlets.
29
from nose.plugins.skip import SkipTest
32
from pymongo.connection import Connection
33
from pymongo.pool import (
34
Pool, GreenletPool, NO_REQUEST, NO_SOCKET_YET, SocketInfo)
35
from pymongo.errors import ConfigurationError
36
from test import version
37
from test.test_connection import get_connection, host, port
38
from test.utils import delay
41
DB = "pymongo-pooling-tests"
44
if sys.version_info[0] >= 3:
45
from imp import reload
50
from gevent import Greenlet, monkey
def one(s):
    """Get one element of a set (or any iterable)."""
    # next(iter(...)) works on both Python 2.6+ and Python 3,
    # unlike the Python-2-only iter(s).next().
    return next(iter(s))
def force_reclaim_sockets(cx_pool, n_expected):
    """Encourage sockets leaked by dead threads to return to `cx_pool`.

    When a thread dies without ending its request, the SocketInfo it was
    using is deleted, and in its __del__ it returns the socket to the
    pool. However, when exactly that happens is unpredictable. Try
    various ways of forcing the issue.
    """
    if sys.platform.startswith('java'):
        raise SkipTest("Jython can't reclaim sockets")

    if 'PyPy' in sys.version:
        raise SkipTest("Socket reclamation happens at unpredictable time in PyPy")

    # Bizarre behavior in CPython 2.4, and possibly other CPython versions
    # less than 2.7: the last dead thread's locals aren't cleaned up until
    # the local attribute with the same name is accessed from a different
    # thread. This assert checks that the thread-local is indeed local, and
    # also triggers the cleanup so the socket is reclaimed.
    if isinstance(cx_pool, Pool):
        assert cx_pool.local.sock_info is None

    # Try for a while to make garbage-collection call SocketInfo.__del__
    start = time.time()
    while len(cx_pool.sockets) < n_expected and time.time() - start < 5:
        try:
            # collect() didn't support 'generation' arg until 2.5
            gc.collect(2)
        except TypeError:
            gc.collect()

        time.sleep(0.5)
class MongoThread(object):
    """A thread, or a greenlet, that uses a Connection"""

    def __init__(self, test_case):
        self.use_greenlets = test_case.use_greenlets
        self.connection = test_case.c
        self.db = self.connection[DB]
        self.ut = test_case      # the TestCase, for its assert* methods
        self.passed = False      # set True by run() when no exception is thrown

    def start(self):
        if self.use_greenlets:
            # A Gevent extended Greenlet
            self.thread = Greenlet(self.run)
        else:
            self.thread = threading.Thread(target=self.run)
            self.thread.setDaemon(True)  # Don't hang whole test if thread hangs

        self.thread.start()

    def join(self):
        self.thread.join(300)
        if self.use_greenlets:
            assert self.thread.dead, "Greenlet timeout"
        else:
            assert not self.thread.isAlive(), "Thread timeout"

        self.thread = None

    def run(self):
        self.run_mongo_thread()

        # No exceptions thrown
        self.passed = True

    def run_mongo_thread(self):
        """Subclasses override this with the actual per-thread workload."""
        raise NotImplementedError()
class SaveAndFind(MongoThread):
    """Save random docs and immediately read them back."""

    def run_mongo_thread(self):
        for _ in range(N):
            rand = random.randint(0, N)
            _id = self.db.sf.save({"x": rand}, safe=True)
            self.ut.assertEqual(rand, self.db.sf.find_one(_id)["x"])
class Unique(MongoThread):
    """Inserts with auto-generated _ids never raise a duplicate-key error."""

    def run_mongo_thread(self):
        for _ in range(N):
            self.connection.start_request()
            self.db.unique.insert({})  # no error
            self.ut.assertEqual(None, self.db.error())
            self.connection.end_request()
class NonUnique(MongoThread):
    """Inserting a duplicate _id always produces an error."""

    def run_mongo_thread(self):
        for _ in range(N):
            self.connection.start_request()
            self.db.unique.insert({"_id": "jesse"})  # duplicate key
            self.ut.assertNotEqual(None, self.db.error())
            self.connection.end_request()
class Disconnect(MongoThread):
    """Repeatedly disconnect, stressing pool reset under concurrency."""

    def run_mongo_thread(self):
        for _ in range(N):
            self.connection.disconnect()
class NoRequest(MongoThread):
    """Duplicate-key inserts in a request must error every single time."""

    def run_mongo_thread(self):
        self.connection.start_request()
        errors = 0
        for _ in range(N):
            self.db.unique.insert({"_id": "jesse"})
            # Every insert is a duplicate; the absence of an error is a bug.
            if not self.db.error():
                errors += 1

        self.connection.end_request()
        self.ut.assertEqual(0, errors)
def run_cases(ut, cases):
    """Start several instances of each MongoThread case, join them, and
    assert that every one completed its workload without an exception.
    """
    threads = []
    # NOTE(review): per-case instance counts reconstructed -- confirm values.
    nruns = 10

    if (
        ut.use_greenlets and sys.platform == 'darwin'
        and gevent.version_info[0] < 1
    ):
        # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
        # about 35 Greenlets share a Connection. Apparently fixed in
        # recent Gevent development.
        nruns = 5

    for case in cases:
        for i in range(nruns):
            t = case(ut)
            t.start()
            threads.append(t)

    for t in threads:
        t.join()

    for t in threads:
        assert t.passed, "%s.run_mongo_thread() threw an exception" % repr(t)
class OneOp(MongoThread):
    """Trace a single find_one through the request life cycle, asserting the
    socket's location (general pool vs. bound to the request) at each step.
    """

    def __init__(self, ut):
        super(OneOp, self).__init__(ut)

    def run_mongo_thread(self):
        pool = self.connection._Connection__pool
        assert len(pool.sockets) == 1, "Expected 1 socket, found %d" % (
            len(pool.sockets)
        )

        sock_info = one(pool.sockets)

        self.connection.start_request()

        # start_request() hasn't yet moved the socket from the general pool into
        # the request
        assert len(pool.sockets) == 1
        assert one(pool.sockets) == sock_info

        self.connection[DB].test.find_one()

        # find_one() causes the socket to be used in the request, so now it's
        # bound to this thread
        assert len(pool.sockets) == 0
        assert pool._get_request_state() == sock_info
        self.connection.end_request()

        # The socket is back in the pool
        assert len(pool.sockets) == 1
        assert one(pool.sockets) == sock_info
class CreateAndReleaseSocket(MongoThread):
    """Perform one socket-using operation with a configurable number of
    start_request() and end_request() calls around it.
    """

    def __init__(self, ut, connection, start_request, end_request):
        super(CreateAndReleaseSocket, self).__init__(ut)
        self.connection = connection
        self.start_request = start_request    # how many times to start_request()
        self.end_request = end_request        # how many times to end_request()

    def run_mongo_thread(self):
        # Do an operation that requires a socket.
        # test_max_pool_size uses this to spin up lots of threads requiring
        # lots of simultaneous connections, to ensure that Pool obeys its
        # max_size configuration and closes extra sockets as they're returned.
        # We need a delay here to ensure that more than max_size sockets are
        # needed at once.
        for i in range(self.start_request):
            self.connection.start_request()

        self.connection[DB].test.find_one({'$where': delay(0.1)})
        for i in range(self.end_request):
            self.connection.end_request()
class _TestPoolingBase(object):
    """Base class for all connection-pool tests. Doesn't inherit from
    unittest.TestCase, and its name is prefixed with "_" to avoid being
    run by nose. Real tests double-inherit from this base and from TestCase.
    """
    use_greenlets = False

    def setUp(self):
        if self.use_greenlets:
            if not has_gevent:
                raise SkipTest("Gevent not installed")

            # Note we don't do patch_thread() or patch_all() - we're
            # testing here that patch_thread() is unnecessary for
            # the connection pool to work properly.
            monkey.patch_socket()

        self.c = self.get_connection(auto_start_request=False)

        # Reset the test db to a known state.
        db = self.c[DB]
        db.unique.drop()
        db.test.drop()
        db.unique.insert({"_id": "jesse"})

        db.test.insert([{} for i in range(10)])

    def tearDown(self):
        # NOTE(review): tearDown body partially lost in extraction; releasing
        # the connection and undoing the socket patch is the visible intent.
        self.c = None
        if self.use_greenlets:
            # Undo patch_socket() so later non-gevent tests get real sockets.
            reload(socket)

    def get_connection(self, *args, **kwargs):
        """Like test.test_connection.get_connection, forcing use_greenlets."""
        opts = kwargs.copy()
        opts['use_greenlets'] = self.use_greenlets
        return get_connection(*args, **opts)

    def get_pool(self, *args, **kwargs):
        """Create a Pool or GreenletPool matching self.use_greenlets."""
        if self.use_greenlets:
            klass = GreenletPool
        else:
            klass = Pool

        return klass(*args, **kwargs)

    def assert_no_request(self):
        self.assertEqual(
            NO_REQUEST, self.c._Connection__pool._get_request_state()
        )

    def assert_request_without_socket(self):
        self.assertEqual(
            NO_SOCKET_YET, self.c._Connection__pool._get_request_state()
        )

    def assert_request_with_socket(self):
        self.assertTrue(isinstance(
            self.c._Connection__pool._get_request_state(), SocketInfo
        ))

    def assert_pool_size(self, pool_size):
        self.assertEqual(
            pool_size, len(self.c._Connection__pool.sockets)
        )
class _TestPooling(_TestPoolingBase):
    """Basic pool tests, to be applied both to Pool and GreenletPool"""

    def test_max_pool_size_validation(self):
        # NOTE(review): the invalid max_pool_size arguments were lost in
        # extraction; a negative value and a non-integer are the obvious
        # invalid inputs -- confirm against upstream.
        self.assertRaises(
            ConfigurationError, Connection, host=host, port=port,
            max_pool_size=-1)

        self.assertRaises(
            ConfigurationError, Connection, host=host, port=port,
            max_pool_size='foo')

        c = Connection(host=host, port=port, max_pool_size=100)
        self.assertEqual(c.max_pool_size, 100)

    def test_no_disconnect(self):
        run_cases(self, [NoRequest, NonUnique, Unique, SaveAndFind])

    def test_simple_disconnect(self):
        # Connection just created, expect 1 free socket
        self.assert_pool_size(1)
        self.assert_no_request()

        self.c.start_request()
        self.assert_request_without_socket()
        cursor = self.c[DB].stuff.find()

        # Cursor hasn't actually caused a request yet, so there's still 1 free
        # socket.
        self.assert_pool_size(1)
        self.assert_request_without_socket()

        # Actually make a request to server, triggering a socket to be
        # allocated to the request
        list(cursor)
        self.assert_pool_size(0)
        self.assert_request_with_socket()

        # Pool returns to its original state
        self.c.end_request()
        self.assert_no_request()
        self.assert_pool_size(1)

        self.c.disconnect()
        self.assert_pool_size(0)
        self.assert_no_request()

    def test_disconnect(self):
        run_cases(self, [SaveAndFind, Disconnect, Unique])

    def test_independent_pools(self):
        # A standalone pool is unaffected by requests on the Connection's pool
        p = self.get_pool((host, port), 10, None, None, False)
        self.c.start_request()
        self.assertEqual(set(), p.sockets)
        self.c.end_request()
        self.assertEqual(set(), p.sockets)

    def test_dependent_pools(self):
        self.assert_pool_size(1)
        self.c.start_request()
        self.assert_request_without_socket()
        self.c.test.test.find_one()
        self.assert_request_with_socket()
        self.assert_pool_size(0)
        self.c.end_request()
        self.assert_pool_size(1)

        t = OneOp(self)
        t.start()
        t.join()
        self.assertTrue(t.passed, "OneOp.run() threw exception")

        self.assert_pool_size(1)
        self.c.test.test.find_one()
        self.assert_pool_size(1)

    def test_multiple_connections(self):
        a = self.get_connection(auto_start_request=False)
        b = self.get_connection(auto_start_request=False)
        self.assertEqual(1, len(a._Connection__pool.sockets))
        self.assertEqual(1, len(b._Connection__pool.sockets))

        a.start_request()
        a.test.test.find_one()
        self.assertEqual(0, len(a._Connection__pool.sockets))
        a.end_request()
        self.assertEqual(1, len(a._Connection__pool.sockets))
        self.assertEqual(1, len(b._Connection__pool.sockets))
        a_sock = one(a._Connection__pool.sockets)

        b.end_request()
        self.assertEqual(1, len(a._Connection__pool.sockets))
        self.assertEqual(1, len(b._Connection__pool.sockets))

        b.start_request()
        b.test.test.find_one()
        self.assertEqual(1, len(a._Connection__pool.sockets))
        self.assertEqual(0, len(b._Connection__pool.sockets))

        b.end_request()
        b_sock = one(b._Connection__pool.sockets)
        b.test.test.find_one()
        a.test.test.find_one()

        self.assertEqual(b_sock,
                         b._Connection__pool.get_socket((b.host, b.port)))
        self.assertEqual(a_sock,
                         a._Connection__pool.get_socket((a.host, a.port)))

    def test_request(self):
        # Check that Pool gives two different sockets in two calls to
        # get_socket() -- doesn't automatically put us in a request any more
        cx_pool = self.get_pool(
            (host, port), 10, None, None, False)

        sock0 = cx_pool.get_socket()
        sock1 = cx_pool.get_socket()

        self.assertNotEqual(sock0, sock1)

        # Now in a request, we'll get the same socket both times
        cx_pool.start_request()

        sock2 = cx_pool.get_socket()
        sock3 = cx_pool.get_socket()
        self.assertEqual(sock2, sock3)

        # Pool didn't keep reference to sock0 or sock1; sock2 and 3 are new
        self.assertNotEqual(sock0, sock2)
        self.assertNotEqual(sock1, sock2)

        # Return the request sock to pool
        cx_pool.end_request()

        sock4 = cx_pool.get_socket()
        sock5 = cx_pool.get_socket()

        # Not in a request any more, we get different sockets
        self.assertNotEqual(sock4, sock5)

        # end_request() returned sock2 to pool
        self.assertEqual(sock4, sock2)

    def test_reset_and_request(self):
        # reset() is called after a fork, or after a socket error. Ensure that
        # a new request is begun if a request was in progress when the reset()
        # occurred, otherwise no request is begun.
        p = self.get_pool((host, port), 10, None, None, False)
        self.assertFalse(p.in_request())
        p.start_request()
        self.assertTrue(p.in_request())
        p.reset()
        self.assertTrue(p.in_request())
        p.end_request()
        self.assertFalse(p.in_request())
        p.reset()
        self.assertFalse(p.in_request())

    def test_pool_reuses_open_socket(self):
        # Test Pool's _check_closed() method doesn't close a healthy socket
        cx_pool = self.get_pool((host, port), 10, None, None, False)
        sock_info = cx_pool.get_socket()
        cx_pool.return_socket(sock_info)

        # trigger _check_closed, which only runs on sockets that haven't been
        # used in over a second
        time.sleep(1.1)
        new_sock_info = cx_pool.get_socket()
        self.assertEqual(sock_info, new_sock_info)
        del sock_info, new_sock_info

        # Assert sock_info was returned to the pool *once*
        force_reclaim_sockets(cx_pool, 1)
        self.assertEqual(1, len(cx_pool.sockets))

    def test_pool_removes_dead_socket(self):
        # Test that Pool removes dead socket and the socket doesn't return
        # itself to the pool
        cx_pool = self.get_pool((host, port), 10, None, None, False)
        sock_info = cx_pool.get_socket()

        # Simulate a closed socket without telling the SocketInfo it's closed
        sock_info.sock.close()
        self.assertTrue(pymongo.pool._closed(sock_info.sock))
        cx_pool.return_socket(sock_info)
        time.sleep(1.1)  # trigger _check_closed
        new_sock_info = cx_pool.get_socket()
        self.assertEqual(0, len(cx_pool.sockets))
        self.assertNotEqual(sock_info, new_sock_info)
        cx_pool.return_socket(new_sock_info)
        del sock_info, new_sock_info

        # new_sock_info returned to the pool, but not the closed sock_info
        force_reclaim_sockets(cx_pool, 1)
        self.assertEqual(1, len(cx_pool.sockets))

    def test_pool_removes_dead_request_socket(self):
        # Test that Pool keeps request going even if a socket dies in request
        cx_pool = self.get_pool((host, port), 10, None, None, False)
        cx_pool.start_request()

        # Get the request socket
        sock_info = cx_pool.get_socket()
        self.assertEqual(0, len(cx_pool.sockets))
        self.assertEqual(sock_info, cx_pool._get_request_state())
        sock_info.sock.close()
        cx_pool.return_socket(sock_info)
        time.sleep(1.1)  # trigger _check_closed

        # Although the request socket died, we're still in a request with a
        # new socket
        new_sock_info = cx_pool.get_socket()
        self.assertNotEqual(sock_info, new_sock_info)
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
        cx_pool.return_socket(new_sock_info)
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
        self.assertEqual(0, len(cx_pool.sockets))

        cx_pool.end_request()
        self.assertEqual(1, len(cx_pool.sockets))

    def test_pool_removes_dead_socket_after_request(self):
        # Test that Pool handles a socket dying that *used* to be the request
        # socket.
        cx_pool = self.get_pool((host, port), 10, None, None, False)
        cx_pool.start_request()

        # Get the request socket
        sock_info = cx_pool.get_socket()
        self.assertEqual(sock_info, cx_pool._get_request_state())
        cx_pool.return_socket(sock_info)
        cx_pool.end_request()
        self.assertEqual(1, len(cx_pool.sockets))

        # Kill old request socket
        sock_info.sock.close()
        old_sock_info_id = id(sock_info)
        del sock_info
        time.sleep(1.1)  # trigger _check_closed

        # Dead socket detected and removed
        new_sock_info = cx_pool.get_socket()
        self.assertNotEqual(id(new_sock_info), old_sock_info_id)
        self.assertEqual(0, len(cx_pool.sockets))
        self.assertFalse(pymongo.pool._closed(new_sock_info.sock))

    def test_socket_reclamation(self):
        # Check that if a thread starts a request and dies without ending
        # the request, that the socket is reclaimed into the pool.
        cx_pool = self.get_pool(
            (host, port), 10, None, None, False)

        self.assertEqual(0, len(cx_pool.sockets))

        # Communicates the id() of the leaked socket out of the worker.
        the_sock = [None]
        lock = None

        def leak_request():
            self.assertEqual(NO_REQUEST, cx_pool._get_request_state())
            cx_pool.start_request()
            self.assertEqual(NO_SOCKET_YET, cx_pool._get_request_state())
            sock_info = cx_pool.get_socket()
            self.assertEqual(sock_info, cx_pool._get_request_state())
            the_sock[0] = id(sock_info.sock)

            # Deliberately do NOT call end_request() -- the thread/greenlet
            # dies holding the request socket.
            if not self.use_greenlets:
                lock.release()

        if self.use_greenlets:
            g = Greenlet(leak_request)
            g.start()
            g.join(1)
            self.assertTrue(g.ready(), "Greenlet is hung")
        else:
            lock = thread.allocate_lock()
            lock.acquire()

            # Start a thread WITHOUT a threading.Thread - important to test that
            # Pool can deal with primitive threads.
            thread.start_new_thread(leak_request, ())

            # Let the thread die; it releases the lock just before exiting.
            acquired = lock.acquire()
            self.assertTrue(acquired, "Thread is hung")

        force_reclaim_sockets(cx_pool, 1)

        # Pool reclaimed the socket
        self.assertEqual(1, len(cx_pool.sockets))
        self.assertEqual(the_sock[0], id(one(cx_pool.sockets).sock))
class _TestMaxPoolSize(_TestPoolingBase):
    """Test that connection pool keeps proper number of idle sockets open,
    no matter how start/end_request are called. To be applied both to Pool and
    GreenletPool.
    """

    def _test_max_pool_size(self, start_request, end_request):
        c = self.get_connection(max_pool_size=4, auto_start_request=False)
        # NOTE(review): thread counts reconstructed -- confirm values.
        nthreads = 40

        if (
            self.use_greenlets and sys.platform == 'darwin'
            and gevent.version_info[0] < 1
        ):
            # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
            # about 35 Greenlets share a Connection. Apparently fixed in
            # recent Gevent development.
            nthreads = 30

        threads = []
        for i in range(nthreads):
            t = CreateAndReleaseSocket(self, c, start_request, end_request)
            threads.append(t)

        for t in threads:
            t.start()

        for t in threads:
            t.join()

        for t in threads:
            self.assertTrue(t.passed)

        # Critical: release refs to threads, so SocketInfo.__del__() executes
        # and reclaims sockets.
        del threads
        t = None

        cx_pool = c._Connection__pool
        force_reclaim_sockets(cx_pool, 4)

        nsock = len(cx_pool.sockets)

        # Socket-reclamation depends on timely garbage-collection, so be lenient
        self.assertTrue(2 <= nsock <= 4,
            msg="Expected between 2 and 4 sockets in the pool, got %d" % nsock)

    def test_max_pool_size(self):
        self._test_max_pool_size(0, 0)

    def test_max_pool_size_with_request(self):
        self._test_max_pool_size(1, 1)

    def test_max_pool_size_with_redundant_request(self):
        self._test_max_pool_size(2, 1)
        self._test_max_pool_size(20, 1)

    def test_max_pool_size_with_leaked_request(self):
        # Call start_request() but not end_request() -- when threads die, they
        # should return their request sockets to the pool.
        self._test_max_pool_size(1, 0)

    def test_max_pool_size_with_end_request_only(self):
        # Call end_request() but not start_request()
        self._test_max_pool_size(0, 1)
class _TestPoolSocketSharing(_TestPoolingBase):
    """Directly test that two simultaneous operations don't share a socket. To
    be applied both to Pool and GreenletPool.
    """

    def _test_pool(self, use_request):
        """
        Test that the connection pool prevents both threads and greenlets from
        using a socket at the same time.

        Sequence:
        gr0: start a slow find()
        gr1: start a fast find()
        gr1: get results
        gr0: get results
        """
        cx = get_connection(
            use_greenlets=self.use_greenlets,
            auto_start_request=False
        )

        db = cx[DB]
        if not version.at_least(db.connection, (1, 7, 2)):
            raise SkipTest("Need at least MongoDB version 1.7.2 to use"
                           " db.eval(nolock=True)")

        db.test.remove(safe=True)
        db.test.insert({'_id': 1}, safe=True)

        history = []

        def find_fast():
            if use_request:
                cx.start_request()

            history.append('find_fast start')

            # With greenlets and the old connection._Pool, this would throw
            # AssertionError: "This event is already used by another
            # greenlet"
            self.assertEqual({'_id': 1}, db.test.find_one())
            history.append('find_fast done')

            if use_request:
                cx.end_request()

        def find_slow():
            if use_request:
                cx.start_request()

            history.append('find_slow start')

            # Javascript function that pauses 5 sec. 'nolock' allows find_fast
            # to start and finish while we're waiting for this.
            fn = delay(5)
            self.assertEqual(
                {'ok': 1.0, 'retval': True},
                db.command('eval', fn, nolock=True))

            history.append('find_slow done')

            if use_request:
                cx.end_request()

        if self.use_greenlets:
            gr0, gr1 = Greenlet(find_slow), Greenlet(find_fast)
            gr0.start()
            gr1.start_later(.1)
        else:
            gr0 = threading.Thread(target=find_slow)
            gr0.setDaemon(True)
            gr1 = threading.Thread(target=find_fast)
            gr1.setDaemon(True)

            gr0.start()
            time.sleep(.1)
            gr1.start()

        gr0.join()
        gr1.join()

        # find_fast must have completed while find_slow was still waiting.
        self.assertEqual([
            'find_slow start',
            'find_fast start',
            'find_fast done',
            'find_slow done',
        ], history)

    def test_pool(self):
        self._test_pool(use_request=False)

    def test_pool_request(self):
        self._test_pool(use_request=True)