~ubuntu-branches/ubuntu/wily/pymongo/wily-proposed

« back to all changes in this revision

Viewing changes to test/utils.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2014-09-28 11:54:48 UTC
  • mfrom: (24.1.4 sid)
  • Revision ID: package-import@ubuntu.com-20140928115448-2j07tau6d9ye3s9o
Tags: 2.7.2-1
* New upstream release.
* Add architectures armel armhf mipsel ppc64el

Show diffs side-by-side

added added

removed removed

Lines of Context:
15
15
"""Utilities for testing pymongo
16
16
"""
17
17
 
 
18
import gc
18
19
import os
19
20
import struct
20
21
import sys
21
22
import threading
 
23
import time
22
24
 
23
25
from nose.plugins.skip import SkipTest
 
26
 
 
27
from bson.son import SON
24
28
from pymongo import MongoClient, MongoReplicaSetClient
25
 
from pymongo.errors import AutoReconnect
 
29
from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure
26
30
from pymongo.pool import NO_REQUEST, NO_SOCKET_YET, SocketInfo
27
31
from test import host, port, version
28
32
 
453
457
    # Make concurrency bugs more likely to manifest.
454
458
    interval = None
455
459
    if not sys.platform.startswith('java'):
456
 
        if sys.version_info >= (3, 2):
 
460
        if hasattr(sys, 'getswitchinterval'):
457
461
            interval = sys.getswitchinterval()
458
462
            sys.setswitchinterval(1e-6)
459
463
        else:
472
476
 
473
477
    finally:
474
478
        if not sys.platform.startswith('java'):
475
 
            if sys.version_info >= (3, 2):
 
479
            if hasattr(sys, 'setswitchinterval'):
476
480
                sys.setswitchinterval(interval)
477
481
            else:
478
482
                sys.setcheckinterval(interval)
586
590
                c.max_message_size)
587
591
 
588
592
 
 
593
class _TestExhaustCursorMixin(object):
 
594
    """Test that clients properly handle errors from exhaust cursors.
 
595
 
 
596
    Inherit from this class and from unittest.TestCase, and override
 
597
    _get_client(self, **kwargs).
 
598
    """
 
599
    def test_exhaust_query_server_error(self):
 
600
        # When doing an exhaust query, the socket stays checked out on success
 
601
        # but must be checked in on error to avoid semaphore leaks.
 
602
        client = self._get_client(max_pool_size=1)
 
603
        if is_mongos(client):
 
604
            raise SkipTest("Can't use exhaust cursors with mongos")
 
605
        if not version.at_least(client, (2, 2, 0)):
 
606
            raise SkipTest("mongod < 2.2.0 closes exhaust socket on error")
 
607
 
 
608
        collection = client.pymongo_test.test
 
609
        pool = get_pool(client)
 
610
 
 
611
        sock_info = one(pool.sockets)
 
612
        # This will cause OperationFailure in all mongo versions since
 
613
        # the value for $orderby must be a document.
 
614
        cursor = collection.find(
 
615
            SON([('$query', {}), ('$orderby', True)]), exhaust=True)
 
616
        self.assertRaises(OperationFailure, cursor.next)
 
617
        self.assertFalse(sock_info.closed)
 
618
 
 
619
        # The semaphore was decremented despite the error.
 
620
        self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
 
621
 
 
622
    def test_exhaust_getmore_server_error(self):
 
623
        # When doing a getmore on an exhaust cursor, the socket stays checked
 
624
        # out on success but must be checked in on error to avoid semaphore
 
625
        # leaks.
 
626
        client = self._get_client(max_pool_size=1)
 
627
        if is_mongos(client):
 
628
            raise SkipTest("Can't use exhaust cursors with mongos")
 
629
 
 
630
        # A separate client that doesn't affect the test client's pool.
 
631
        client2 = self._get_client()
 
632
 
 
633
        collection = client.pymongo_test.test
 
634
        collection.remove()
 
635
 
 
636
        # Enough data to ensure it streams down for a few milliseconds.
 
637
        long_str = 'a' * (256 * 1024)
 
638
        collection.insert([{'a': long_str} for _ in range(200)])
 
639
 
 
640
        pool = get_pool(client)
 
641
        pool._check_interval_seconds = None  # Never check.
 
642
        sock_info = one(pool.sockets)
 
643
 
 
644
        cursor = collection.find(exhaust=True)
 
645
 
 
646
        # Initial query succeeds.
 
647
        cursor.next()
 
648
 
 
649
        # Cause a server error on getmore.
 
650
        client2.pymongo_test.test.drop()
 
651
        self.assertRaises(OperationFailure, list, cursor)
 
652
 
 
653
        # Make sure the socket is still valid
 
654
        self.assertEqual(0, collection.count())
 
655
 
 
656
    def test_exhaust_query_network_error(self):
 
657
        # When doing an exhaust query, the socket stays checked out on success
 
658
        # but must be checked in on error to avoid semaphore leaks.
 
659
        client = self._get_client(max_pool_size=1)
 
660
        if is_mongos(client):
 
661
            raise SkipTest("Can't use exhaust cursors with mongos")
 
662
 
 
663
        collection = client.pymongo_test.test
 
664
        pool = get_pool(client)
 
665
        pool._check_interval_seconds = None  # Never check.
 
666
 
 
667
        # Cause a network error.
 
668
        sock_info = one(pool.sockets)
 
669
        sock_info.sock.close()
 
670
        cursor = collection.find(exhaust=True)
 
671
        self.assertRaises(ConnectionFailure, cursor.next)
 
672
        self.assertTrue(sock_info.closed)
 
673
 
 
674
        # The semaphore was decremented despite the error.
 
675
        self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
 
676
 
 
677
    def test_exhaust_getmore_network_error(self):
 
678
        # When doing a getmore on an exhaust cursor, the socket stays checked
 
679
        # out on success but must be checked in on error to avoid semaphore
 
680
        # leaks.
 
681
        client = self._get_client(max_pool_size=1)
 
682
        if is_mongos(client):
 
683
            raise SkipTest("Can't use exhaust cursors with mongos")
 
684
 
 
685
        collection = client.pymongo_test.test
 
686
        collection.remove()
 
687
        collection.insert([{} for _ in range(200)])  # More than one batch.
 
688
        pool = get_pool(client)
 
689
        pool._check_interval_seconds = None  # Never check.
 
690
 
 
691
        cursor = collection.find(exhaust=True)
 
692
 
 
693
        # Initial query succeeds.
 
694
        cursor.next()
 
695
 
 
696
        # Cause a network error.
 
697
        sock_info = cursor._Cursor__exhaust_mgr.sock
 
698
        sock_info.sock.close()
 
699
 
 
700
        # A getmore fails.
 
701
        self.assertRaises(ConnectionFailure, list, cursor)
 
702
        self.assertTrue(sock_info.closed)
 
703
 
 
704
        # The semaphore was decremented despite the error.
 
705
        self.assertTrue(pool._socket_semaphore.acquire(blocking=False))
 
706
 
 
707
 
589
708
# Backport of WarningMessage from python 2.6, with fixed syntax for python 2.4.
590
709
class WarningMessage(object):
591
710