~ubuntu-branches/ubuntu/wily/pymongo/wily-proposed

« back to all changes in this revision

Viewing changes to test/test_pooling_base.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2015-04-26 22:43:13 UTC
  • mfrom: (24.1.5 sid)
  • Revision ID: package-import@ubuntu.com-20150426224313-0hga2jphvf0rrmfe
Tags: 3.0.1-1
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2012-2014 MongoDB, Inc.
2
 
#
3
 
# Licensed under the Apache License, Version 2.0 (the "License");
4
 
# you may not use this file except in compliance with the License.
5
 
# You may obtain a copy of the License at
6
 
#
7
 
# http://www.apache.org/licenses/LICENSE-2.0
8
 
#
9
 
# Unless required by applicable law or agreed to in writing, software
10
 
# distributed under the License is distributed on an "AS IS" BASIS,
11
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
 
# See the License for the specific language governing permissions and
13
 
# limitations under the License.
14
 
 
15
 
"""Base classes to test built-in connection-pooling with threads or greenlets.
16
 
"""
17
 
 
18
 
import gc
19
 
import random
20
 
import socket
21
 
import sys
22
 
import thread
23
 
import threading
24
 
import time
25
 
 
26
 
sys.path[0:0] = [""]
27
 
 
28
 
from nose.plugins.skip import SkipTest
29
 
 
30
 
import pymongo.pool
31
 
from pymongo.mongo_client import MongoClient
32
 
from pymongo.pool import Pool, NO_REQUEST, NO_SOCKET_YET, SocketInfo
33
 
from pymongo.errors import ConfigurationError, ConnectionFailure
34
 
from pymongo.errors import ExceededMaxWaiters
35
 
from test import version, host, port
36
 
from test.test_client import get_client
37
 
from test.utils import delay, is_mongos, one, get_pool
38
 
 
39
 
N = 10
40
 
DB = "pymongo-pooling-tests"
41
 
 
42
 
 
43
 
if sys.version_info[0] >= 3:
44
 
    from imp import reload
45
 
 
46
 
 
47
 
try:
48
 
    import gevent
49
 
    from gevent import Greenlet, monkey, hub
50
 
    import gevent.coros, gevent.event
51
 
    has_gevent = True
52
 
except ImportError:
53
 
    has_gevent = False
54
 
 
55
 
 
56
 
def gc_collect_until_done(threads, timeout=60):
57
 
    start = time.time()
58
 
    running = list(threads)
59
 
    while running:
60
 
        assert (time.time() - start) < timeout, "Threads timed out"
61
 
        for t in running:
62
 
            t.thread.join(0.1)
63
 
            if not t.alive:
64
 
                running.remove(t)
65
 
        gc.collect()
66
 
 
67
 
 
68
 
class MongoThread(object):
69
 
    """A thread, or a greenlet, that uses a MongoClient"""
70
 
    def __init__(self, test_case):
71
 
        self.use_greenlets = test_case.use_greenlets
72
 
        self.client = test_case.c
73
 
        self.db = self.client[DB]
74
 
        self.ut = test_case
75
 
        self.passed = False
76
 
 
77
 
    def start(self):
78
 
        if self.use_greenlets:
79
 
            # A Gevent extended Greenlet
80
 
            self.thread = Greenlet(self.run)
81
 
        else:
82
 
            self.thread = threading.Thread(target=self.run)
83
 
            self.thread.setDaemon(True)  # Don't hang whole test if thread hangs
84
 
 
85
 
        self.thread.start()
86
 
 
87
 
    @property
88
 
    def alive(self):
89
 
        if self.use_greenlets:
90
 
            return not self.thread.dead
91
 
        else:
92
 
            return self.thread.isAlive()
93
 
 
94
 
    def join(self):
95
 
        self.thread.join(20)
96
 
        if self.use_greenlets:
97
 
            msg = "Greenlet timeout"
98
 
        else:
99
 
            msg = "Thread timeout"
100
 
        assert not self.alive, msg
101
 
        self.thread = None
102
 
 
103
 
    def run(self):
104
 
        self.run_mongo_thread()
105
 
 
106
 
        # No exceptions thrown
107
 
        self.passed = True
108
 
 
109
 
    def run_mongo_thread(self):
110
 
        raise NotImplementedError()
111
 
 
112
 
 
113
 
class SaveAndFind(MongoThread):
114
 
 
115
 
    def run_mongo_thread(self):
116
 
        for _ in xrange(N):
117
 
            rand = random.randint(0, N)
118
 
            _id = self.db.sf.save({"x": rand})
119
 
            self.ut.assertEqual(rand, self.db.sf.find_one(_id)["x"])
120
 
 
121
 
 
122
 
class Unique(MongoThread):
123
 
 
124
 
    def run_mongo_thread(self):
125
 
        for _ in xrange(N):
126
 
            self.client.start_request()
127
 
            self.db.unique.insert({})  # no error
128
 
            self.client.end_request()
129
 
 
130
 
 
131
 
class NonUnique(MongoThread):
132
 
 
133
 
    def run_mongo_thread(self):
134
 
        for _ in xrange(N):
135
 
            self.client.start_request()
136
 
            self.db.unique.insert({"_id": "jesse"}, w=0)
137
 
            self.ut.assertNotEqual(None, self.db.error())
138
 
            self.client.end_request()
139
 
 
140
 
 
141
 
class Disconnect(MongoThread):
142
 
 
143
 
    def run_mongo_thread(self):
144
 
        for _ in xrange(N):
145
 
            self.client.disconnect()
146
 
 
147
 
 
148
 
class NoRequest(MongoThread):
149
 
 
150
 
    def run_mongo_thread(self):
151
 
        self.client.start_request()
152
 
        errors = 0
153
 
        for _ in xrange(N):
154
 
            self.db.unique.insert({"_id": "jesse"}, w=0)
155
 
            if not self.db.error():
156
 
                errors += 1
157
 
 
158
 
        self.client.end_request()
159
 
        self.ut.assertEqual(0, errors)
160
 
 
161
 
 
162
 
def run_cases(ut, cases):
163
 
    threads = []
164
 
    nruns = 10
165
 
    if (
166
 
        ut.use_greenlets and sys.platform == 'darwin'
167
 
        and gevent.version_info[0] < 1
168
 
    ):
169
 
        # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
170
 
        # about 35 Greenlets share a MongoClient. Apparently fixed in
171
 
        # recent Gevent development.
172
 
        nruns = 5
173
 
 
174
 
    for case in cases:
175
 
        for i in range(nruns):
176
 
            t = case(ut)
177
 
            t.start()
178
 
            threads.append(t)
179
 
 
180
 
    for t in threads:
181
 
        t.join()
182
 
 
183
 
    for t in threads:
184
 
        assert t.passed, "%s.run_mongo_thread() threw an exception" % repr(t)
185
 
 
186
 
 
187
 
class OneOp(MongoThread):
188
 
 
189
 
    def __init__(self, ut):
190
 
        super(OneOp, self).__init__(ut)
191
 
 
192
 
    def run_mongo_thread(self):
193
 
        pool = get_pool(self.client)
194
 
        assert len(pool.sockets) == 1, "Expected 1 socket, found %d" % (
195
 
            len(pool.sockets)
196
 
        )
197
 
 
198
 
        sock_info = one(pool.sockets)
199
 
 
200
 
        self.client.start_request()
201
 
 
202
 
        # start_request() hasn't yet moved the socket from the general pool into
203
 
        # the request
204
 
        assert len(pool.sockets) == 1
205
 
        assert one(pool.sockets) == sock_info
206
 
 
207
 
        self.client[DB].test.find_one()
208
 
 
209
 
        # find_one() causes the socket to be used in the request, so now it's
210
 
        # bound to this thread
211
 
        assert len(pool.sockets) == 0
212
 
        assert pool._get_request_state() == sock_info
213
 
        self.client.end_request()
214
 
 
215
 
        # The socket is back in the pool
216
 
        assert len(pool.sockets) == 1
217
 
        assert one(pool.sockets) == sock_info
218
 
 
219
 
 
220
 
class CreateAndReleaseSocket(MongoThread):
221
 
    """A thread or greenlet that acquires a socket, waits for all other threads
222
 
    to reach rendezvous point, then terminates.
223
 
    """
224
 
    class Rendezvous(object):
225
 
        def __init__(self, nthreads, use_greenlets):
226
 
            self.nthreads = nthreads
227
 
            self.nthreads_run = 0
228
 
            self.use_greenlets = use_greenlets
229
 
            if use_greenlets:
230
 
                self.lock = gevent.coros.RLock()
231
 
            else:
232
 
                self.lock = threading.Lock()
233
 
            self.reset_ready()
234
 
 
235
 
        def reset_ready(self):
236
 
            if self.use_greenlets:
237
 
                self.ready = gevent.event.Event()
238
 
            else:
239
 
                self.ready = threading.Event()
240
 
 
241
 
    def __init__(self, ut, client, start_request, end_request, rendezvous):
242
 
        super(CreateAndReleaseSocket, self).__init__(ut)
243
 
        self.client = client
244
 
        self.start_request = start_request
245
 
        self.end_request = end_request
246
 
        self.rendezvous = rendezvous
247
 
 
248
 
    def run_mongo_thread(self):
249
 
        # Do an operation that requires a socket.
250
 
        # test_max_pool_size uses this to spin up lots of threads requiring
251
 
        # lots of simultaneous connections, to ensure that Pool obeys its
252
 
        # max_size configuration and closes extra sockets as they're returned.
253
 
        for i in range(self.start_request):
254
 
            self.client.start_request()
255
 
 
256
 
        # Use a socket
257
 
        self.client[DB].test.find_one()
258
 
 
259
 
        # Don't finish until all threads reach this point
260
 
        r = self.rendezvous
261
 
        r.lock.acquire()
262
 
        r.nthreads_run += 1
263
 
        if r.nthreads_run == r.nthreads:
264
 
            # Everyone's here, let them finish
265
 
            r.ready.set()
266
 
            r.lock.release()
267
 
        else:
268
 
            r.lock.release()
269
 
            r.ready.wait(30)  # Wait thirty seconds....
270
 
            assert r.ready.isSet(), "Rendezvous timed out"
271
 
 
272
 
        for i in range(self.end_request):
273
 
            self.client.end_request()
274
 
 
275
 
 
276
 
class CreateAndReleaseSocketNoRendezvous(MongoThread):
277
 
    """A thread or greenlet that acquires a socket and terminates without
278
 
    waiting for other threads to reach rendezvous point.
279
 
    """
280
 
    class Rendezvous(object):
281
 
        def __init__(self, nthreads, use_greenlets):
282
 
            self.nthreads = nthreads
283
 
            self.nthreads_run = 0
284
 
            if use_greenlets:
285
 
                self.lock = gevent.coros.RLock()
286
 
                self.ready = gevent.event.Event()
287
 
            else:
288
 
                self.lock = threading.Lock()
289
 
                self.ready = threading.Event()
290
 
 
291
 
    def __init__(self, ut, client, start_request, end_request):
292
 
        super(CreateAndReleaseSocketNoRendezvous, self).__init__(ut)
293
 
        self.client = client
294
 
        self.start_request = start_request
295
 
        self.end_request = end_request
296
 
 
297
 
    def run_mongo_thread(self):
298
 
        # Do an operation that requires a socket.
299
 
        # test_max_pool_size uses this to spin up lots of threads requiring
300
 
        # lots of simultaneous connections, to ensure that Pool obeys its
301
 
        # max_size configuration and closes extra sockets as they're returned.
302
 
        for i in range(self.start_request):
303
 
            self.client.start_request()
304
 
 
305
 
        # Use a socket
306
 
        self.client[DB].test.find_one()
307
 
        for i in range(self.end_request):
308
 
            self.client.end_request()
309
 
 
310
 
 
311
 
class _TestPoolingBase(object):
312
 
    """Base class for all client-pool tests. Doesn't inherit from
313
 
    unittest.TestCase, and its name is prefixed with "_" to avoid being
314
 
    run by nose. Real tests double-inherit from this base and from TestCase.
315
 
    """
316
 
    use_greenlets = False
317
 
 
318
 
    def setUp(self):
319
 
        if self.use_greenlets:
320
 
            if not has_gevent:
321
 
                raise SkipTest("Gevent not installed")
322
 
 
323
 
            # Note we don't do patch_thread() or patch_all() - we're
324
 
            # testing here that patch_thread() is unnecessary for
325
 
            # the client pool to work properly.
326
 
            monkey.patch_socket()
327
 
 
328
 
        self.c = self.get_client(auto_start_request=False)
329
 
 
330
 
        # reset the db
331
 
        db = self.c[DB]
332
 
        db.unique.drop()
333
 
        db.test.drop()
334
 
        db.unique.insert({"_id": "jesse"})
335
 
 
336
 
        db.test.insert([{} for i in range(10)])
337
 
 
338
 
    def tearDown(self):
339
 
        self.c.close()
340
 
        self.c = None
341
 
        if self.use_greenlets:
342
 
            # Undo patch
343
 
            reload(socket)
344
 
 
345
 
    def get_client(self, *args, **kwargs):
346
 
        opts = kwargs.copy()
347
 
        opts['use_greenlets'] = self.use_greenlets
348
 
        return get_client(*args, **opts)
349
 
 
350
 
    def get_pool(self, *args, **kwargs):
351
 
        kwargs['use_greenlets'] = self.use_greenlets
352
 
        return Pool(*args, **kwargs)
353
 
 
354
 
    def sleep(self, seconds):
355
 
        if self.use_greenlets:
356
 
            gevent.sleep(seconds)
357
 
        else:
358
 
            time.sleep(seconds)
359
 
 
360
 
    def assert_no_request(self):
361
 
        self.assertTrue(
362
 
            self.c._MongoClient__member is None or
363
 
            NO_REQUEST == get_pool(self.c)._get_request_state()
364
 
        )
365
 
 
366
 
    def assert_request_without_socket(self):
367
 
        self.assertEqual(
368
 
            NO_SOCKET_YET, get_pool(self.c)._get_request_state()
369
 
        )
370
 
 
371
 
    def assert_request_with_socket(self):
372
 
        self.assertTrue(isinstance(
373
 
            get_pool(self.c)._get_request_state(), SocketInfo
374
 
        ))
375
 
 
376
 
    def assert_pool_size(self, pool_size):
377
 
        if pool_size == 0:
378
 
            self.assertTrue(
379
 
                self.c._MongoClient__member is None
380
 
                or not get_pool(self.c).sockets
381
 
            )
382
 
        else:
383
 
            self.assertEqual(
384
 
                pool_size, len(get_pool(self.c).sockets)
385
 
            )
386
 
 
387
 
 
388
 
class _TestPooling(_TestPoolingBase):
389
 
    """Basic pool tests, to be run both with threads and with greenlets."""
390
 
    def test_max_pool_size_validation(self):
391
 
        self.assertRaises(
392
 
            ConfigurationError, MongoClient, host=host, port=port,
393
 
            max_pool_size=-1
394
 
        )
395
 
 
396
 
        self.assertRaises(
397
 
            ConfigurationError, MongoClient, host=host, port=port,
398
 
            max_pool_size='foo'
399
 
        )
400
 
 
401
 
        c = MongoClient(host=host, port=port, max_pool_size=100)
402
 
        self.assertEqual(c.max_pool_size, 100)
403
 
 
404
 
    def test_no_disconnect(self):
405
 
        run_cases(self, [NoRequest, NonUnique, Unique, SaveAndFind])
406
 
 
407
 
    def test_simple_disconnect(self):
408
 
        # MongoClient just created, expect 1 free socket
409
 
        self.assert_pool_size(1)
410
 
        self.assert_no_request()
411
 
 
412
 
        self.c.start_request()
413
 
        self.assert_request_without_socket()
414
 
        cursor = self.c[DB].stuff.find()
415
 
 
416
 
        # Cursor hasn't actually caused a request yet, so there's still 1 free
417
 
        # socket.
418
 
        self.assert_pool_size(1)
419
 
        self.assert_request_without_socket()
420
 
 
421
 
        # Actually make a request to server, triggering a socket to be
422
 
        # allocated to the request
423
 
        list(cursor)
424
 
        self.assert_pool_size(0)
425
 
        self.assert_request_with_socket()
426
 
 
427
 
        # Pool returns to its original state
428
 
        self.c.end_request()
429
 
        self.assert_no_request()
430
 
        self.assert_pool_size(1)
431
 
 
432
 
        self.c.disconnect()
433
 
        self.assert_pool_size(0)
434
 
        self.assert_no_request()
435
 
 
436
 
    def test_disconnect(self):
437
 
        run_cases(self, [SaveAndFind, Disconnect, Unique])
438
 
 
439
 
    def test_independent_pools(self):
440
 
        # Test for regression of very early PyMongo bug: separate pools shared
441
 
        # state.
442
 
        p = self.get_pool((host, port), 10, None, None, False)
443
 
        self.c.start_request()
444
 
        self.c.pymongo_test.test.find_one()
445
 
        self.assertEqual(set(), p.sockets)
446
 
        self.c.end_request()
447
 
        self.assert_pool_size(1)
448
 
        self.assertEqual(set(), p.sockets)
449
 
 
450
 
    def test_dependent_pools(self):
451
 
        self.assert_pool_size(1)
452
 
        self.c.start_request()
453
 
        self.assert_request_without_socket()
454
 
        self.c.pymongo_test.test.find_one()
455
 
        self.assert_request_with_socket()
456
 
        self.assert_pool_size(0)
457
 
        self.c.end_request()
458
 
        self.assert_pool_size(1)
459
 
 
460
 
        t = OneOp(self)
461
 
        t.start()
462
 
        t.join()
463
 
        self.assertTrue(t.passed, "OneOp.run() threw exception")
464
 
 
465
 
        self.assert_pool_size(1)
466
 
        self.c.pymongo_test.test.find_one()
467
 
        self.assert_pool_size(1)
468
 
 
469
 
    def test_multiple_connections(self):
470
 
        a = self.get_client(auto_start_request=False)
471
 
        b = self.get_client(auto_start_request=False)
472
 
        self.assertEqual(1, len(get_pool(a).sockets))
473
 
        self.assertEqual(1, len(get_pool(b).sockets))
474
 
 
475
 
        a.start_request()
476
 
        a.pymongo_test.test.find_one()
477
 
        self.assertEqual(0, len(get_pool(a).sockets))
478
 
        a.end_request()
479
 
        self.assertEqual(1, len(get_pool(a).sockets))
480
 
        self.assertEqual(1, len(get_pool(b).sockets))
481
 
        a_sock = one(get_pool(a).sockets)
482
 
 
483
 
        b.end_request()
484
 
        self.assertEqual(1, len(get_pool(a).sockets))
485
 
        self.assertEqual(1, len(get_pool(b).sockets))
486
 
 
487
 
        b.start_request()
488
 
        b.pymongo_test.test.find_one()
489
 
        self.assertEqual(1, len(get_pool(a).sockets))
490
 
        self.assertEqual(0, len(get_pool(b).sockets))
491
 
 
492
 
        b.end_request()
493
 
        b_sock = one(get_pool(b).sockets)
494
 
        b.pymongo_test.test.find_one()
495
 
        a.pymongo_test.test.find_one()
496
 
 
497
 
        self.assertEqual(b_sock,
498
 
                         get_pool(b).get_socket())
499
 
        self.assertEqual(a_sock,
500
 
                         get_pool(a).get_socket())
501
 
 
502
 
        a_sock.close()
503
 
        b_sock.close()
504
 
 
505
 
    def test_request(self):
506
 
        # Check that Pool gives two different sockets in two calls to
507
 
        # get_socket() -- doesn't automatically put us in a request any more
508
 
        cx_pool = self.get_pool(
509
 
            pair=(host,port),
510
 
            max_size=10,
511
 
            net_timeout=1000,
512
 
            conn_timeout=1000,
513
 
            use_ssl=False
514
 
        )
515
 
 
516
 
        sock0 = cx_pool.get_socket()
517
 
        sock1 = cx_pool.get_socket()
518
 
 
519
 
        self.assertNotEqual(sock0, sock1)
520
 
 
521
 
        # Now in a request, we'll get the same socket both times
522
 
        cx_pool.start_request()
523
 
 
524
 
        sock2 = cx_pool.get_socket()
525
 
        sock3 = cx_pool.get_socket()
526
 
        self.assertEqual(sock2, sock3)
527
 
 
528
 
        # Pool didn't keep reference to sock0 or sock1; sock2 and 3 are new
529
 
        self.assertNotEqual(sock0, sock2)
530
 
        self.assertNotEqual(sock1, sock2)
531
 
 
532
 
        # Return the request sock to pool
533
 
        cx_pool.end_request()
534
 
 
535
 
        sock4 = cx_pool.get_socket()
536
 
        sock5 = cx_pool.get_socket()
537
 
 
538
 
        # Not in a request any more, we get different sockets
539
 
        self.assertNotEqual(sock4, sock5)
540
 
 
541
 
        # end_request() returned sock2 to pool
542
 
        self.assertEqual(sock4, sock2)
543
 
 
544
 
        for s in [sock0, sock1, sock2, sock3, sock4, sock5]:
545
 
            s.close()
546
 
 
547
 
    def test_reset_and_request(self):
548
 
        # reset() is called after a fork, or after a socket error. Ensure that
549
 
        # a new request is begun if a request was in progress when the reset()
550
 
        # occurred, otherwise no request is begun.
551
 
        p = self.get_pool((host, port), 10, None, None, False)
552
 
        self.assertFalse(p.in_request())
553
 
        p.start_request()
554
 
        self.assertTrue(p.in_request())
555
 
        p.reset()
556
 
        self.assertTrue(p.in_request())
557
 
        p.end_request()
558
 
        self.assertFalse(p.in_request())
559
 
        p.reset()
560
 
        self.assertFalse(p.in_request())
561
 
 
562
 
    def test_pool_reuses_open_socket(self):
563
 
        # Test Pool's _check_closed() method doesn't close a healthy socket
564
 
        cx_pool = self.get_pool((host,port), 10, None, None, False)
565
 
        cx_pool._check_interval_seconds = 0  # Always check.
566
 
        sock_info = cx_pool.get_socket()
567
 
        cx_pool.maybe_return_socket(sock_info)
568
 
 
569
 
        new_sock_info = cx_pool.get_socket()
570
 
        self.assertEqual(sock_info, new_sock_info)
571
 
        cx_pool.maybe_return_socket(new_sock_info)
572
 
        self.assertEqual(1, len(cx_pool.sockets))
573
 
 
574
 
    def test_pool_removes_dead_socket(self):
575
 
        # Test that Pool removes dead socket and the socket doesn't return
576
 
        # itself PYTHON-344
577
 
        cx_pool = self.get_pool((host,port), 10, None, None, False)
578
 
        cx_pool._check_interval_seconds = 0  # Always check.
579
 
        sock_info = cx_pool.get_socket()
580
 
 
581
 
        # Simulate a closed socket without telling the SocketInfo it's closed
582
 
        sock_info.sock.close()
583
 
        self.assertTrue(pymongo.pool._closed(sock_info.sock))
584
 
        cx_pool.maybe_return_socket(sock_info)
585
 
        new_sock_info = cx_pool.get_socket()
586
 
        self.assertEqual(0, len(cx_pool.sockets))
587
 
        self.assertNotEqual(sock_info, new_sock_info)
588
 
        cx_pool.maybe_return_socket(new_sock_info)
589
 
        self.assertEqual(1, len(cx_pool.sockets))
590
 
 
591
 
    def test_pool_removes_dead_request_socket_after_check(self):
592
 
        # Test that Pool keeps request going even if a socket dies in request
593
 
        cx_pool = self.get_pool((host,port), 10, None, None, False)
594
 
        cx_pool._check_interval_seconds = 0  # Always check.
595
 
        cx_pool.start_request()
596
 
 
597
 
        # Get the request socket
598
 
        sock_info = cx_pool.get_socket()
599
 
        self.assertEqual(0, len(cx_pool.sockets))
600
 
        self.assertEqual(sock_info, cx_pool._get_request_state())
601
 
        sock_info.sock.close()
602
 
        cx_pool.maybe_return_socket(sock_info)
603
 
 
604
 
        # Although the request socket died, we're still in a request with a
605
 
        # new socket
606
 
        new_sock_info = cx_pool.get_socket()
607
 
        self.assertTrue(cx_pool.in_request())
608
 
        self.assertNotEqual(sock_info, new_sock_info)
609
 
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
610
 
        cx_pool.maybe_return_socket(new_sock_info)
611
 
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
612
 
        self.assertEqual(0, len(cx_pool.sockets))
613
 
 
614
 
        cx_pool.end_request()
615
 
        self.assertEqual(1, len(cx_pool.sockets))
616
 
 
617
 
    def test_pool_removes_dead_request_socket(self):
618
 
        # Test that Pool keeps request going even if a socket dies in request
619
 
        cx_pool = self.get_pool((host,port), 10, None, None, False)
620
 
        cx_pool.start_request()
621
 
 
622
 
        # Get the request socket
623
 
        sock_info = cx_pool.get_socket()
624
 
        self.assertEqual(0, len(cx_pool.sockets))
625
 
        self.assertEqual(sock_info, cx_pool._get_request_state())
626
 
 
627
 
        # Unlike in test_pool_removes_dead_request_socket_after_check, we
628
 
        # set sock_info.closed and *don't* wait for it to be checked.
629
 
        sock_info.close()
630
 
        cx_pool.maybe_return_socket(sock_info)
631
 
 
632
 
        # Although the request socket died, we're still in a request with a
633
 
        # new socket
634
 
        new_sock_info = cx_pool.get_socket()
635
 
        self.assertTrue(cx_pool.in_request())
636
 
        self.assertNotEqual(sock_info, new_sock_info)
637
 
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
638
 
        cx_pool.maybe_return_socket(new_sock_info)
639
 
        self.assertEqual(new_sock_info, cx_pool._get_request_state())
640
 
        self.assertEqual(0, len(cx_pool.sockets))
641
 
 
642
 
        cx_pool.end_request()
643
 
        self.assertEqual(1, len(cx_pool.sockets))
644
 
 
645
 
    def test_pool_removes_dead_socket_after_request(self):
646
 
        # Test that Pool handles a socket dying that *used* to be the request
647
 
        # socket.
648
 
        cx_pool = self.get_pool((host,port), 10, None, None, False)
649
 
        cx_pool._check_interval_seconds = 0  # Always check.
650
 
        cx_pool.start_request()
651
 
 
652
 
        # Get the request socket
653
 
        sock_info = cx_pool.get_socket()
654
 
        self.assertEqual(sock_info, cx_pool._get_request_state())
655
 
        cx_pool.maybe_return_socket(sock_info)
656
 
 
657
 
        # End request
658
 
        cx_pool.end_request()
659
 
        self.assertEqual(1, len(cx_pool.sockets))
660
 
 
661
 
        # Kill old request socket
662
 
        sock_info.sock.close()
663
 
 
664
 
        # Dead socket detected and removed
665
 
        new_sock_info = cx_pool.get_socket()
666
 
        self.assertFalse(cx_pool.in_request())
667
 
        self.assertNotEqual(sock_info, new_sock_info)
668
 
        self.assertEqual(0, len(cx_pool.sockets))
669
 
        self.assertFalse(pymongo.pool._closed(new_sock_info.sock))
670
 
        cx_pool.maybe_return_socket(new_sock_info)
671
 
        self.assertEqual(1, len(cx_pool.sockets))
672
 
 
673
 
    def test_dead_request_socket_with_max_size(self):
674
 
        # When a pool replaces a dead request socket, the semaphore it uses
675
 
        # to enforce max_size should remain unaffected.
676
 
        cx_pool = self.get_pool(
677
 
            (host, port), 1, None, None, False, wait_queue_timeout=1)
678
 
 
679
 
        cx_pool._check_interval_seconds = 0  # Always check.
680
 
        cx_pool.start_request()
681
 
 
682
 
        # Get and close the request socket.
683
 
        request_sock_info = cx_pool.get_socket()
684
 
        request_sock_info.sock.close()
685
 
        cx_pool.maybe_return_socket(request_sock_info)
686
 
 
687
 
        # Detects closed socket and creates new one, semaphore value still 0.
688
 
        request_sock_info_2 = cx_pool.get_socket()
689
 
        self.assertNotEqual(request_sock_info, request_sock_info_2)
690
 
        cx_pool.maybe_return_socket(request_sock_info_2)
691
 
        cx_pool.end_request()
692
 
 
693
 
        # Semaphore value now 1; we can get a socket.
694
 
        sock_info = cx_pool.get_socket()
695
 
 
696
 
        # Clean up.
697
 
        cx_pool.maybe_return_socket(sock_info)
698
 
 
699
 
    def test_socket_reclamation(self):
700
 
        if sys.platform.startswith('java'):
701
 
            raise SkipTest("Jython can't do socket reclamation")
702
 
 
703
 
        # Check that if a thread starts a request and dies without ending
704
 
        # the request, that the socket is reclaimed into the pool.
705
 
        cx_pool = self.get_pool(
706
 
            pair=(host,port),
707
 
            max_size=10,
708
 
            net_timeout=1000,
709
 
            conn_timeout=1000,
710
 
            use_ssl=False,
711
 
        )
712
 
 
713
 
        self.assertEqual(0, len(cx_pool.sockets))
714
 
 
715
 
        lock = None
716
 
        the_sock = [None]
717
 
 
718
 
        def leak_request():
719
 
            self.assertEqual(NO_REQUEST, cx_pool._get_request_state())
720
 
            cx_pool.start_request()
721
 
            self.assertEqual(NO_SOCKET_YET, cx_pool._get_request_state())
722
 
            sock_info = cx_pool.get_socket()
723
 
            self.assertEqual(sock_info, cx_pool._get_request_state())
724
 
            the_sock[0] = id(sock_info.sock)
725
 
            cx_pool.maybe_return_socket(sock_info)
726
 
 
727
 
            if not self.use_greenlets:
728
 
                lock.release()
729
 
 
730
 
        if self.use_greenlets:
731
 
            g = Greenlet(leak_request)
732
 
            g.start()
733
 
            g.join(1)
734
 
            self.assertTrue(g.ready(), "Greenlet is hung")
735
 
 
736
 
            # In Gevent after 0.13.8, join() returns before the Greenlet.link
737
 
            # callback fires. Give it a moment to reclaim the socket.
738
 
            gevent.sleep(0.1)
739
 
        else:
740
 
            lock = thread.allocate_lock()
741
 
            lock.acquire()
742
 
 
743
 
            # Start a thread WITHOUT a threading.Thread - important to test that
744
 
            # Pool can deal with primitive threads.
745
 
            thread.start_new_thread(leak_request, ())
746
 
 
747
 
            # Join thread
748
 
            acquired = lock.acquire()
749
 
            self.assertTrue(acquired, "Thread is hung")
750
 
 
751
 
            # Make sure thread is really gone
752
 
            time.sleep(1)
753
 
 
754
 
            if 'PyPy' in sys.version:
755
 
                gc.collect()
756
 
 
757
 
            # Access the thread local from the main thread to trigger the
758
 
            # ThreadVigil's delete callback, returning the request socket to
759
 
            # the pool.
760
 
            # In Python 2.7.0 and lesser, a dead thread's locals are deleted
761
 
            # and those locals' weakref callbacks are fired only when another
762
 
            # thread accesses the locals and finds the thread state is stale,
763
 
            # see http://bugs.python.org/issue1868. Accessing the thread
764
 
            # local from the main thread is a necessary part of this test, and
765
 
            # realistic: in a multithreaded web server a new thread will access
766
 
            # Pool._ident._local soon after an old thread has died.
767
 
            cx_pool._ident.get()
768
 
 
769
 
        # Pool reclaimed the socket
770
 
        self.assertEqual(1, len(cx_pool.sockets))
771
 
        self.assertEqual(the_sock[0], id(one(cx_pool.sockets).sock))
772
 
        self.assertEqual(0, len(cx_pool._tid_to_sock))
773
 
 
774
 
 
775
 
class _TestMaxPoolSize(_TestPoolingBase):
776
 
    """Test that connection pool keeps proper number of idle sockets open,
777
 
    no matter how start/end_request are called. To be run both with threads and
778
 
    with greenlets.
779
 
    """
780
 
    def _test_max_pool_size(
781
 
            self, start_request, end_request, max_pool_size=4, nthreads=10):
782
 
        """Start `nthreads` threads. Each calls start_request `start_request`
783
 
        times, then find_one and waits at a barrier; once all reach the barrier
784
 
        each calls end_request `end_request` times. The test asserts that the
785
 
        pool ends with min(max_pool_size, nthreads) sockets or, if
786
 
        start_request wasn't called, at least one socket.
787
 
 
788
 
        This tests both max_pool_size enforcement and that leaked request
789
 
        sockets are eventually returned to the pool when their threads end.
790
 
 
791
 
        You may need to increase ulimit -n on Mac.
792
 
 
793
 
        If you increase nthreads over about 35, note a
794
 
        Gevent 0.13.6 bug on Mac: Greenlet.join() hangs if more than
795
 
        about 35 Greenlets share a MongoClient. Apparently fixed in
796
 
        recent Gevent development.
797
 
        """
798
 
        if start_request:
799
 
            if max_pool_size is not None and max_pool_size < nthreads:
800
 
                raise AssertionError("Deadlock")
801
 
 
802
 
        c = self.get_client(
803
 
            max_pool_size=max_pool_size, auto_start_request=False)
804
 
 
805
 
        rendezvous = CreateAndReleaseSocket.Rendezvous(
806
 
            nthreads, self.use_greenlets)
807
 
 
808
 
        threads = []
809
 
        for i in range(nthreads):
810
 
            t = CreateAndReleaseSocket(
811
 
                self, c, start_request, end_request, rendezvous)
812
 
            threads.append(t)
813
 
 
814
 
        for t in threads:
815
 
            t.start()
816
 
 
817
 
        if 'PyPy' in sys.version:
818
 
            # With PyPy we need to kick off the gc whenever the threads hit the
819
 
            # rendezvous since nthreads > max_pool_size.
820
 
            gc_collect_until_done(threads)
821
 
        else:
822
 
            for t in threads:
823
 
                t.join()
824
 
 
825
 
        # join() returns before the thread state is cleared; give it time.
826
 
        self.sleep(1)
827
 
 
828
 
        for t in threads:
829
 
            self.assertTrue(t.passed)
830
 
 
831
 
        # Socket-reclamation doesn't work in Jython
832
 
        if not sys.platform.startswith('java'):
833
 
            cx_pool = get_pool(c)
834
 
 
835
 
            # Socket-reclamation depends on timely garbage-collection
836
 
            if 'PyPy' in sys.version:
837
 
                gc.collect()
838
 
 
839
 
            if self.use_greenlets:
840
 
                # Wait for Greenlet.link() callbacks to execute
841
 
                the_hub = hub.get_hub()
842
 
                if hasattr(the_hub, 'join'):
843
 
                    # Gevent 1.0
844
 
                    the_hub.join()
845
 
                else:
846
 
                    # Gevent 0.13 and less
847
 
                    the_hub.shutdown()
848
 
 
849
 
            if start_request:
850
 
                # Trigger final cleanup in Python <= 2.7.0.
851
 
                cx_pool._ident.get()
852
 
                expected_idle = min(max_pool_size, nthreads)
853
 
                message = (
854
 
                    '%d idle sockets (expected %d) and %d request sockets'
855
 
                    ' (expected 0)' % (
856
 
                        len(cx_pool.sockets), expected_idle,
857
 
                        len(cx_pool._tid_to_sock)))
858
 
 
859
 
                self.assertEqual(
860
 
                    expected_idle, len(cx_pool.sockets), message)
861
 
            else:
862
 
                # Without calling start_request(), threads can safely share
863
 
                # sockets; the number running concurrently, and hence the
864
 
                # number of sockets needed, is between 1 and 10, depending
865
 
                # on thread-scheduling.
866
 
                self.assertTrue(len(cx_pool.sockets) >= 1)
867
 
 
868
 
            # thread.join completes slightly *before* thread locals are
869
 
            # cleaned up, so wait up to 5 seconds for them.
870
 
            self.sleep(0.1)
871
 
            cx_pool._ident.get()
872
 
            start = time.time()
873
 
 
874
 
            while (
875
 
                not cx_pool.sockets
876
 
                and cx_pool._socket_semaphore.counter < max_pool_size
877
 
                and (time.time() - start) < 5
878
 
            ):
879
 
                self.sleep(0.1)
880
 
                cx_pool._ident.get()
881
 
 
882
 
            if max_pool_size is not None:
883
 
                self.assertEqual(
884
 
                    max_pool_size,
885
 
                    cx_pool._socket_semaphore.counter)
886
 
 
887
 
            self.assertEqual(0, len(cx_pool._tid_to_sock))
888
 
 
889
 
    def _test_max_pool_size_no_rendezvous(self, start_request, end_request):
890
 
        max_pool_size = 5
891
 
        c = self.get_client(
892
 
            max_pool_size=max_pool_size, auto_start_request=False)
893
 
 
894
 
        # If you increase nthreads over about 35, note a
895
 
        # Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
896
 
        # about 35 Greenlets share a MongoClient. Apparently fixed in
897
 
        # recent Gevent development.
898
 
 
899
 
        # On the other hand, nthreads had better be much larger than
900
 
        # max_pool_size to ensure that max_pool_size sockets are actually
901
 
        # required at some point in this test's execution.
902
 
        nthreads = 10
903
 
 
904
 
        if (sys.platform.startswith('java')
905
 
                and start_request > end_request
906
 
                and nthreads > max_pool_size):
907
 
 
908
 
            # Since Jython can't reclaim the socket and release the semaphore
909
 
            # after a thread leaks a request, we'll exhaust the semaphore and
910
 
            # deadlock.
911
 
            raise SkipTest("Jython can't do socket reclamation")
912
 
 
913
 
        threads = []
914
 
        for i in range(nthreads):
915
 
            t = CreateAndReleaseSocketNoRendezvous(
916
 
                self, c, start_request, end_request)
917
 
            threads.append(t)
918
 
 
919
 
        for t in threads:
920
 
            t.start()
921
 
 
922
 
        if 'PyPy' in sys.version:
923
 
            # With PyPy we need to kick off the gc whenever the threads hit the
924
 
            # rendezvous since nthreads > max_pool_size.
925
 
            gc_collect_until_done(threads)
926
 
        else:
927
 
            for t in threads:
928
 
                t.join()
929
 
 
930
 
        for t in threads:
931
 
            self.assertTrue(t.passed)
932
 
 
933
 
        cx_pool = get_pool(c)
934
 
 
935
 
        # Socket-reclamation depends on timely garbage-collection
936
 
        if 'PyPy' in sys.version:
937
 
            gc.collect()
938
 
 
939
 
        if self.use_greenlets:
940
 
            # Wait for Greenlet.link() callbacks to execute
941
 
            the_hub = hub.get_hub()
942
 
            if hasattr(the_hub, 'join'):
943
 
                # Gevent 1.0
944
 
                the_hub.join()
945
 
            else:
946
 
                # Gevent 0.13 and less
947
 
                the_hub.shutdown()
948
 
 
949
 
        # thread.join completes slightly *before* thread locals are
950
 
        # cleaned up, so wait up to 5 seconds for them.
951
 
        self.sleep(0.1)
952
 
        cx_pool._ident.get()
953
 
        start = time.time()
954
 
 
955
 
        while (
956
 
            not cx_pool.sockets
957
 
            and cx_pool._socket_semaphore.counter < max_pool_size
958
 
            and (time.time() - start) < 5
959
 
        ):
960
 
            self.sleep(0.1)
961
 
            cx_pool._ident.get()
962
 
 
963
 
        self.assertTrue(len(cx_pool.sockets) >= 1)
964
 
        self.assertEqual(max_pool_size, cx_pool._socket_semaphore.counter)
965
 
 
966
 
    def test_max_pool_size(self):
967
 
        self._test_max_pool_size(
968
 
            start_request=0, end_request=0, nthreads=10, max_pool_size=4)
969
 
 
970
 
    def test_max_pool_size_none(self):
971
 
        self._test_max_pool_size(
972
 
            start_request=0, end_request=0, nthreads=10, max_pool_size=None)
973
 
 
974
 
    def test_max_pool_size_with_request(self):
975
 
        self._test_max_pool_size(
976
 
            start_request=1, end_request=1, nthreads=10, max_pool_size=10)
977
 
 
978
 
    def test_max_pool_size_with_multiple_request(self):
979
 
        self._test_max_pool_size(
980
 
            start_request=10, end_request=10, nthreads=10, max_pool_size=10)
981
 
 
982
 
    def test_max_pool_size_with_redundant_request(self):
983
 
        self._test_max_pool_size(
984
 
            start_request=2, end_request=1, nthreads=10, max_pool_size=10)
985
 
 
986
 
    def test_max_pool_size_with_redundant_request2(self):
987
 
        self._test_max_pool_size(
988
 
            start_request=20, end_request=1, nthreads=10, max_pool_size=10)
989
 
 
990
 
    def test_max_pool_size_with_redundant_request_no_rendezvous(self):
991
 
        self._test_max_pool_size_no_rendezvous(2, 1)
992
 
 
993
 
    def test_max_pool_size_with_redundant_request_no_rendezvous2(self):
994
 
        self._test_max_pool_size_no_rendezvous(20, 1)
995
 
 
996
 
    def test_max_pool_size_with_leaked_request(self):
997
 
        # Call start_request() but not end_request() -- when threads die, they
998
 
        # should return their request sockets to the pool.
999
 
        self._test_max_pool_size(
1000
 
            start_request=1, end_request=0, nthreads=10, max_pool_size=10)
1001
 
 
1002
 
    def test_max_pool_size_with_leaked_request_no_rendezvous(self):
1003
 
        self._test_max_pool_size_no_rendezvous(1, 0)
1004
 
 
1005
 
    def test_max_pool_size_with_end_request_only(self):
1006
 
        # Call end_request() but not start_request()
1007
 
        self._test_max_pool_size(0, 1)
1008
 
 
1009
 
    def test_max_pool_size_with_connection_failure(self):
1010
 
        # The pool acquires its semaphore before attempting to connect; ensure
1011
 
        # it releases the semaphore on connection failure.
1012
 
        class TestPool(Pool):
1013
 
            def connect(self):
1014
 
                raise socket.error()
1015
 
 
1016
 
        test_pool = TestPool(
1017
 
            pair=('example.com', 27017),
1018
 
            max_size=1,
1019
 
            net_timeout=1,
1020
 
            conn_timeout=1,
1021
 
            use_ssl=False,
1022
 
            wait_queue_timeout=1,
1023
 
            use_greenlets=self.use_greenlets)
1024
 
 
1025
 
        # First call to get_socket fails; if pool doesn't release its semaphore
1026
 
        # then the second call raises "ConnectionFailure: Timed out waiting for
1027
 
        # socket from pool" instead of the socket.error.
1028
 
        for i in range(2):
1029
 
            self.assertRaises(socket.error, test_pool.get_socket)
1030
 
 
1031
 
 
1032
 
class SocketGetter(MongoThread):
1033
 
    """Utility for _TestMaxOpenSockets and _TestWaitQueueMultiple"""
1034
 
    def __init__(self, test_case, pool):
1035
 
        super(SocketGetter, self).__init__(test_case)
1036
 
        self.state = 'init'
1037
 
        self.pool = pool
1038
 
        self.sock = None
1039
 
 
1040
 
    def run(self):
1041
 
        self.state = 'get_socket'
1042
 
        self.sock = self.pool.get_socket()
1043
 
        self.state = 'sock'
1044
 
 
1045
 
 
1046
 
class _TestMaxOpenSockets(_TestPoolingBase):
1047
 
    """Test that connection pool doesn't open more than max_size sockets.
1048
 
    To be run both with threads and with greenlets.
1049
 
    """
1050
 
    def get_pool_with_wait_queue_timeout(self, wait_queue_timeout):
1051
 
        return self.get_pool((host, port),
1052
 
                             1, None, None,
1053
 
                             False,
1054
 
                             wait_queue_timeout=wait_queue_timeout,
1055
 
                             wait_queue_multiple=None)
1056
 
 
1057
 
    def test_wait_queue_timeout(self):
1058
 
        wait_queue_timeout = 2  # Seconds
1059
 
        pool = self.get_pool_with_wait_queue_timeout(wait_queue_timeout)
1060
 
        sock_info = pool.get_socket()
1061
 
        start = time.time()
1062
 
        self.assertRaises(ConnectionFailure, pool.get_socket)
1063
 
        duration = time.time() - start
1064
 
        self.assertTrue(
1065
 
            abs(wait_queue_timeout - duration) < 1,
1066
 
            "Waited %.2f seconds for a socket, expected %f" % (
1067
 
                duration, wait_queue_timeout))
1068
 
 
1069
 
        sock_info.close()
1070
 
 
1071
 
    def test_blocking(self):
1072
 
        # Verify get_socket() with no wait_queue_timeout blocks forever.
1073
 
        pool = self.get_pool_with_wait_queue_timeout(None)
1074
 
 
1075
 
        # Reach max_size.
1076
 
        s1 = pool.get_socket()
1077
 
        t = SocketGetter(self, pool)
1078
 
        t.start()
1079
 
        while t.state != 'get_socket':
1080
 
            self.sleep(0.1)
1081
 
 
1082
 
        self.sleep(1)
1083
 
        self.assertEqual(t.state, 'get_socket')
1084
 
        pool.maybe_return_socket(s1)
1085
 
        while t.state != 'sock':
1086
 
            self.sleep(0.1)
1087
 
 
1088
 
        self.assertEqual(t.state, 'sock')
1089
 
        self.assertEqual(t.sock, s1)
1090
 
        s1.close()
1091
 
 
1092
 
 
1093
 
class _TestWaitQueueMultiple(_TestPoolingBase):
1094
 
    """Test that connection pool doesn't allow more than
1095
 
    waitQueueMultiple * max_size waiters.
1096
 
    To be run both with threads and with greenlets.
1097
 
    """
1098
 
    def get_pool_with_wait_queue_multiple(self, wait_queue_multiple):
1099
 
        return self.get_pool((host, port),
1100
 
                             2, None, None,
1101
 
                             False,
1102
 
                             wait_queue_timeout=None,
1103
 
                             wait_queue_multiple=wait_queue_multiple)
1104
 
 
1105
 
    def test_wait_queue_multiple(self):
1106
 
        pool = self.get_pool_with_wait_queue_multiple(3)
1107
 
 
1108
 
        # Reach max_size sockets.
1109
 
        socket_info_0 = pool.get_socket()
1110
 
        socket_info_1 = pool.get_socket()
1111
 
 
1112
 
        # Reach max_size * wait_queue_multiple waiters.
1113
 
        threads = []
1114
 
        for _ in xrange(6):
1115
 
            t = SocketGetter(self, pool)
1116
 
            t.start()
1117
 
            threads.append(t)
1118
 
 
1119
 
        self.sleep(1)
1120
 
        for t in threads:
1121
 
            self.assertEqual(t.state, 'get_socket')
1122
 
 
1123
 
        self.assertRaises(ExceededMaxWaiters, pool.get_socket)
1124
 
        socket_info_0.close()
1125
 
        socket_info_1.close()
1126
 
 
1127
 
    def test_wait_queue_multiple_unset(self):
1128
 
        pool = self.get_pool_with_wait_queue_multiple(None)
1129
 
        socks = []
1130
 
        for _ in xrange(2):
1131
 
            sock = pool.get_socket()
1132
 
            socks.append(sock)
1133
 
        threads = []
1134
 
        for _ in xrange(30):
1135
 
            t = SocketGetter(self, pool)
1136
 
            t.start()
1137
 
            threads.append(t)
1138
 
        self.sleep(1)
1139
 
        for t in threads:
1140
 
            self.assertEqual(t.state, 'get_socket')
1141
 
 
1142
 
        for socket_info in socks:
1143
 
            socket_info.close()
1144
 
 
1145
 
 
1146
 
class _TestPoolSocketSharing(_TestPoolingBase):
1147
 
    """Directly test that two simultaneous operations don't share a socket. To
1148
 
    be run both with threads and with greenlets.
1149
 
    """
1150
 
    def _test_pool(self, use_request):
1151
 
        """
1152
 
        Test that the connection pool prevents both threads and greenlets from
1153
 
        using a socket at the same time.
1154
 
 
1155
 
        Sequence:
1156
 
        gr0: start a slow find()
1157
 
        gr1: start a fast find()
1158
 
        gr1: get results
1159
 
        gr0: get results
1160
 
        """
1161
 
        cx = get_client(
1162
 
            use_greenlets=self.use_greenlets,
1163
 
            auto_start_request=False
1164
 
        )
1165
 
 
1166
 
        db = cx.pymongo_test
1167
 
        db.test.remove()
1168
 
        db.test.insert({'_id': 1})
1169
 
 
1170
 
        history = []
1171
 
 
1172
 
        def find_fast():
1173
 
            if use_request:
1174
 
                cx.start_request()
1175
 
 
1176
 
            history.append('find_fast start')
1177
 
 
1178
 
            # With greenlets and the old connection._Pool, this would throw
1179
 
            # AssertionError: "This event is already used by another
1180
 
            # greenlet"
1181
 
            self.assertEqual({'_id': 1}, db.test.find_one())
1182
 
            history.append('find_fast done')
1183
 
 
1184
 
            if use_request:
1185
 
                cx.end_request()
1186
 
 
1187
 
        def find_slow():
1188
 
            if use_request:
1189
 
                cx.start_request()
1190
 
 
1191
 
            history.append('find_slow start')
1192
 
 
1193
 
            # Javascript function that pauses N seconds per document
1194
 
            fn = delay(10)
1195
 
            if (is_mongos(db.connection) or not
1196
 
                version.at_least(db.connection, (1, 7, 2))):
1197
 
                # mongos doesn't support eval so we have to use $where
1198
 
                # which is less reliable in this context.
1199
 
                self.assertEqual(1, db.test.find({"$where": fn}).count())
1200
 
            else:
1201
 
                # 'nolock' allows find_fast to start and finish while we're
1202
 
                # waiting for this to complete.
1203
 
                self.assertEqual({'ok': 1.0, 'retval': True},
1204
 
                                 db.command('eval', fn, nolock=True))
1205
 
 
1206
 
            history.append('find_slow done')
1207
 
 
1208
 
            if use_request:
1209
 
                cx.end_request()
1210
 
 
1211
 
        if self.use_greenlets:
1212
 
            gr0, gr1 = Greenlet(find_slow), Greenlet(find_fast)
1213
 
            gr0.start()
1214
 
            gr1.start_later(.1)
1215
 
        else:
1216
 
            gr0 = threading.Thread(target=find_slow)
1217
 
            gr0.setDaemon(True)
1218
 
            gr1 = threading.Thread(target=find_fast)
1219
 
            gr1.setDaemon(True)
1220
 
 
1221
 
            gr0.start()
1222
 
            time.sleep(.1)
1223
 
            gr1.start()
1224
 
 
1225
 
        gr0.join()
1226
 
        gr1.join()
1227
 
 
1228
 
        self.assertEqual([
1229
 
            'find_slow start',
1230
 
            'find_fast start',
1231
 
            'find_fast done',
1232
 
            'find_slow done',
1233
 
        ], history)
1234
 
 
1235
 
    def test_pool(self):
1236
 
        self._test_pool(use_request=False)
1237
 
 
1238
 
    def test_pool_request(self):
1239
 
        self._test_pool(use_request=True)