586
590
c.max_message_size)
593
class _TestExhaustCursorMixin(object):
594
"""Test that clients properly handle errors from exhaust cursors.
596
Inherit from this class and from unittest.TestCase, and override
597
_get_client(self, **kwargs).
599
def test_exhaust_query_server_error(self):
600
# When doing an exhaust query, the socket stays checked out on success
601
# but must be checked in on error to avoid semaphore leaks.
602
client = self._get_client(max_pool_size=1)
603
if is_mongos(client):
604
raise SkipTest("Can't use exhaust cursors with mongos")
605
if not version.at_least(client, (2, 2, 0)):
606
raise SkipTest("mongod < 2.2.0 closes exhaust socket on error")
608
collection = client.pymongo_test.test
609
pool = get_pool(client)
611
sock_info = one(pool.sockets)
612
# This will cause OperationFailure in all mongo versions since
613
# the value for $orderby must be a document.
614
cursor = collection.find(
615
SON([('$query', {}), ('$orderby', True)]), exhaust=True)
616
self.assertRaises(OperationFailure, cursor.next)
617
self.assertFalse(sock_info.closed)
619
# The semaphore was decremented despite the error.
620
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
622
def test_exhaust_getmore_server_error(self):
623
# When doing a getmore on an exhaust cursor, the socket stays checked
624
# out on success but must be checked in on error to avoid semaphore
626
client = self._get_client(max_pool_size=1)
627
if is_mongos(client):
628
raise SkipTest("Can't use exhaust cursors with mongos")
630
# A separate client that doesn't affect the test client's pool.
631
client2 = self._get_client()
633
collection = client.pymongo_test.test
636
# Enough data to ensure it streams down for a few milliseconds.
637
long_str = 'a' * (256 * 1024)
638
collection.insert([{'a': long_str} for _ in range(200)])
640
pool = get_pool(client)
641
pool._check_interval_seconds = None # Never check.
642
sock_info = one(pool.sockets)
644
cursor = collection.find(exhaust=True)
646
# Initial query succeeds.
649
# Cause a server error on getmore.
650
client2.pymongo_test.test.drop()
651
self.assertRaises(OperationFailure, list, cursor)
653
# Make sure the socket is still valid
654
self.assertEqual(0, collection.count())
656
def test_exhaust_query_network_error(self):
657
# When doing an exhaust query, the socket stays checked out on success
658
# but must be checked in on error to avoid semaphore leaks.
659
client = self._get_client(max_pool_size=1)
660
if is_mongos(client):
661
raise SkipTest("Can't use exhaust cursors with mongos")
663
collection = client.pymongo_test.test
664
pool = get_pool(client)
665
pool._check_interval_seconds = None # Never check.
667
# Cause a network error.
668
sock_info = one(pool.sockets)
669
sock_info.sock.close()
670
cursor = collection.find(exhaust=True)
671
self.assertRaises(ConnectionFailure, cursor.next)
672
self.assertTrue(sock_info.closed)
674
# The semaphore was decremented despite the error.
675
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
677
def test_exhaust_getmore_network_error(self):
678
# When doing a getmore on an exhaust cursor, the socket stays checked
679
# out on success but must be checked in on error to avoid semaphore
681
client = self._get_client(max_pool_size=1)
682
if is_mongos(client):
683
raise SkipTest("Can't use exhaust cursors with mongos")
685
collection = client.pymongo_test.test
687
collection.insert([{} for _ in range(200)]) # More than one batch.
688
pool = get_pool(client)
689
pool._check_interval_seconds = None # Never check.
691
cursor = collection.find(exhaust=True)
693
# Initial query succeeds.
696
# Cause a network error.
697
sock_info = cursor._Cursor__exhaust_mgr.sock
698
sock_info.sock.close()
701
self.assertRaises(ConnectionFailure, list, cursor)
702
self.assertTrue(sock_info.closed)
704
# The semaphore was decremented despite the error.
705
self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
589
708
# Backport of WarningMessage from python 2.6, with fixed syntax for python 2.4.
590
709
class WarningMessage(object):