~ubuntu-branches/ubuntu/wily/pymongo/wily

« back to all changes in this revision

Viewing changes to pymongo/replica_set_connection.py

  • Committer: Package Import Robot
  • Author(s): Federico Ceratto
  • Date: 2012-05-10 21:21:40 UTC
  • mfrom: (1.1.11)
  • Revision ID: package-import@ubuntu.com-20120510212140-9c66c00zz850h6l9
Tags: 2.2-1
* New upstream release.
* Dependencies added (Closes: #670268)

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
# Copyright 2009-2011 10gen, Inc.
 
1
# Copyright 2011-2012 10gen, Inc.
2
2
#
3
3
# Licensed under the Apache License, Version 2.0 (the "License"); you
4
4
# may not use this file except in compliance with the License.  You
24
24
.. doctest::
25
25
 
26
26
  >>> from pymongo import ReplicaSetConnection
27
 
  >>> c = ReplicaSetConnection('localhost:31017', replicaSet='repl0')
 
27
  >>> c = ReplicaSetConnection('localhost:27017', replicaSet='repl0')
28
28
  >>> c.test_database
29
29
  Database(ReplicaSetConnection([u'...', u'...']), u'test_database')
30
30
  >>> c['test_database']
34
34
import datetime
35
35
import socket
36
36
import struct
37
 
import sys
38
37
import threading
39
38
import time
40
39
import warnings
41
40
import weakref
42
41
 
 
42
from bson.py3compat import b
43
43
from bson.son import SON
44
44
from pymongo import (common,
45
45
                     database,
55
55
                            InvalidDocument,
56
56
                            OperationFailure)
57
57
 
58
 
 
59
 
if sys.platform.startswith('java'):
60
 
    from select import cpython_compatible_select as select
61
 
else:
62
 
    from select import select
63
 
 
64
 
 
 
58
EMPTY = b("")
65
59
MAX_BSON_SIZE = 4 * 1024 * 1024
66
60
 
67
61
 
68
 
def _closed(sock):
69
 
    """Return True if we know socket has been closed, False otherwise.
70
 
    """
71
 
    try:
72
 
        readers, _, _ = select([sock], [], [], 0)
73
 
    # Any exception here is equally bad (select.error, ValueError, etc.).
74
 
    except Exception:
75
 
        return True
76
 
    return len(readers) > 0
77
 
 
78
 
 
79
62
def _partition_node(node):
80
63
    """Split a host:port string returned from mongod/s into
81
64
    a (host, int(port)) pair needed for socket.connect().
90
73
    return host, port
91
74
 
92
75
 
93
 
class Monitor(threading.Thread):
94
 
    def __init__(self, obj, interval=5):
95
 
        super(Monitor, self).__init__()
96
 
        self.obj = weakref.proxy(obj)
97
 
        self.interval = interval
98
 
 
99
 
    def run(self):
100
 
        while True:
101
 
            try:
102
 
                self.obj.refresh()
103
 
            # The connection object has been
104
 
            # collected so we should die.
105
 
            except ReferenceError:
106
 
                break
107
 
            except:
108
 
                pass
109
 
            time.sleep(self.interval)
 
76
have_gevent = False
 
77
try:
 
78
    import gevent
 
79
    from gevent import Greenlet
 
80
    have_gevent = True
 
81
 
 
82
    class GreenletMonitor(Greenlet):
 
83
        def __init__(self, obj, interval=5):
 
84
            Greenlet.__init__(self)
 
85
            self.obj = weakref.proxy(obj)
 
86
            self.interval = interval
 
87
 
 
88
        def _run(self):
 
89
            while True:
 
90
                try:
 
91
                    self.obj.refresh()
 
92
                # The connection object has been
 
93
                # collected so we should die.
 
94
                except ReferenceError:
 
95
                    break
 
96
                except:
 
97
                    pass
 
98
                gevent.sleep(self.interval)
 
99
 
 
100
except ImportError:
 
101
    pass
110
102
 
111
103
 
112
104
class ReplicaSetConnection(common.BaseObject):
146
138
            documents returned from queries on this connection
147
139
          - `tz_aware` (optional): if ``True``,
148
140
            :class:`~datetime.datetime` instances returned as values
149
 
            in a document by this :class:`Connection` will be timezone
 
141
            in a document by this :class:`ReplicaSetConnection` will be timezone
150
142
            aware (otherwise they will be naive)
151
143
          - `replicaSet`: (required) The name of the replica set to connect to.
152
144
            The driver will verify that each host it connects to is a member of
176
168
          - `ssl`: If True, create the connection to the servers using SSL.
177
169
          - `read_preference`: The read preference for this connection.
178
170
            See :class:`~pymongo.ReadPreference` for available options.
 
171
          - `auto_start_request`: If True (the default), each thread that
 
172
            accesses this :class:`ReplicaSetConnection` has a socket allocated
 
173
            to it for the thread's lifetime, for each member of the set. For
 
174
            :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
 
175
            ensures consistent reads, even if you read after an unsafe
 
176
            write. For read preferences other than PRIMARY, there are no
 
177
            consistency guarantees. (The semantics of auto_start_request,
 
178
            :class:`~pymongo.ReadPreference`, and :class:`ReplicaSetConnection`
 
179
            may change in future releases of PyMongo.)
 
180
          - `use_greenlets` (optional): if ``True``, use a background Greenlet
 
181
            instead of a background thread to monitor state of replica set.
 
182
            :meth:`start_request()` will ensure that the current greenlet uses
 
183
            the same socket for all operations until :meth:`end_request()`.
 
184
            `use_greenlets` with ReplicaSetConnection requires `Gevent
 
185
            <http://gevent.org/>`_ to be installed.
179
186
          - `slave_okay` or `slaveOk` (deprecated): Use `read_preference`
180
187
            instead.
181
 
 
 
188
          - `host`: For compatibility with connection.Connection. If both
 
189
            `host` and `hosts_or_uri` are specified `host` takes precedence.
 
190
          - `port`: For compatibility with connection.Connection. The default
 
191
            port number to use for hosts.
 
192
          - `network_timeout`: For compatibility with connection.Connection.
 
193
            The timeout (in seconds) to use for socket operations - default
 
194
            is no timeout. If both `network_timeout` and `socketTimeoutMS` are
 
195
            are specified `network_timeout` takes precedence, matching
 
196
            connection.Connection.
 
197
 
 
198
 
 
199
        .. versionchanged:: 2.2
 
200
           Added `auto_start_request` and `use_greenlets` options.
 
201
           Added support for `host`, `port`, and `network_timeout` keyword
 
202
           arguments for compatibility with connection.Connection.
182
203
        .. versionadded:: 2.1
183
204
        """
184
 
        self.__max_pool_size = max_pool_size
185
 
        self.__document_class = document_class
186
 
        self.__tz_aware = tz_aware
187
205
        self.__opts = {}
188
206
        self.__seeds = set()
189
207
        self.__hosts = None
193
211
        self.__pools = {}
194
212
        self.__index_cache = {}
195
213
        self.__auth_credentials = {}
 
214
        self.__done = False
 
215
 
 
216
        self.__max_pool_size = common.validate_positive_integer(
 
217
                                        'max_pool_size', max_pool_size)
 
218
        self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
 
219
        self.__document_class = document_class
 
220
 
 
221
        # Compatibility with connection.Connection
 
222
        host = kwargs.pop('host', hosts_or_uri)
 
223
 
 
224
        port = kwargs.pop('port', 27017)
 
225
        if not isinstance(port, int):
 
226
            raise TypeError("port must be an instance of int")
 
227
 
 
228
        network_timeout = kwargs.pop('network_timeout', None)
 
229
        if network_timeout is not None:
 
230
            if (not isinstance(network_timeout, (int, float)) or
 
231
                network_timeout <= 0):
 
232
                raise ConfigurationError("network_timeout must "
 
233
                                         "be a positive integer")
 
234
 
196
235
        username = None
197
236
        db_name = None
198
 
        if hosts_or_uri is None:
199
 
            self.__seeds.add(('localhost', 27017))
200
 
        elif '://' in hosts_or_uri:
201
 
            res = uri_parser.parse_uri(hosts_or_uri)
 
237
        if host is None:
 
238
            self.__seeds.add(('localhost', port))
 
239
        elif '://' in host:
 
240
            res = uri_parser.parse_uri(host, port)
202
241
            self.__seeds.update(res['nodelist'])
203
242
            username = res['username']
204
243
            password = res['password']
205
244
            db_name = res['database']
206
245
            self.__opts = res['options']
207
246
        else:
208
 
            self.__seeds.update(uri_parser.split_hosts(hosts_or_uri))
 
247
            self.__seeds.update(uri_parser.split_hosts(host, port))
209
248
 
210
249
        for option, value in kwargs.iteritems():
211
250
            option, value = common.validate(option, value)
212
251
            self.__opts[option] = value
213
252
 
 
253
        if self.__opts.get('use_greenlets', False):
 
254
            if not have_gevent:
 
255
                raise ConfigurationError(
 
256
                    "The gevent module is not available. "
 
257
                    "Install the gevent package from PyPI."
 
258
                )
 
259
            self.pool_class = pool.GreenletPool
 
260
        else:
 
261
            self.pool_class = pool.Pool
 
262
 
 
263
        self.__auto_start_request = self.__opts.get('auto_start_request', True)
 
264
        self.__in_request = self.__auto_start_request
214
265
        self.__name = self.__opts.get('replicaset')
215
266
        if not self.__name:
216
267
            raise ConfigurationError("the replicaSet "
217
268
                                     "keyword parameter is required.")
218
 
        self.__net_timeout = self.__opts.get('sockettimeoutms')
 
269
 
 
270
 
 
271
        self.__net_timeout = (network_timeout or
 
272
                              self.__opts.get('sockettimeoutms'))
219
273
        self.__conn_timeout = self.__opts.get('connecttimeoutms')
220
274
        self.__use_ssl = self.__opts.get('ssl', False)
221
275
        if self.__use_ssl and not pool.have_ssl:
231
285
 
232
286
        self.refresh()
233
287
 
234
 
        monitor_thread = Monitor(self)
235
 
        monitor_thread.setName("ReplicaSetMonitorThread")
236
 
        monitor_thread.setDaemon(True)
237
 
        monitor_thread.start()
 
288
        if self.__opts.get('use_greenlets', False):
 
289
            monitor = GreenletMonitor(self)
 
290
        else:
 
291
            monitor = threading.Thread(target=self.__refresh_loop)
 
292
            monitor.setName("ReplicaSetMonitorThread")
 
293
            monitor.setDaemon(True)
 
294
        monitor.start()
238
295
 
239
296
        if db_name and username is None:
240
297
            warnings.warn("must provide a username and password "
244
301
            if not self[db_name].authenticate(username, password):
245
302
                raise ConfigurationError("authentication failed")
246
303
 
 
304
    def __del__(self):
 
305
        """Shutdown the monitor thread.
 
306
        """
 
307
        self.__done = True
 
308
 
 
309
    def __refresh_loop(self):
 
310
        """Refresh loop used in the standard monitor thread.
 
311
        """
 
312
        while True:
 
313
            if not self.__done:
 
314
                try:
 
315
                    self.refresh()
 
316
                # Catch literally everything here to avoid
 
317
                # exceptions when the interpreter shuts down.
 
318
                except:
 
319
                    pass
 
320
                if time:
 
321
                    time.sleep(5)
 
322
 
247
323
    def _cached(self, dbname, coll, index):
248
324
        """Test if `index` is cached.
249
325
        """
316
392
        elif db_name in self.__auth_credentials:
317
393
            del self.__auth_credentials[db_name]
318
394
 
319
 
    def __check_auth(self, sock, authset):
 
395
    def __check_auth(self, sock_info):
320
396
        """Authenticate using cached database credentials.
321
397
 
322
398
        If credentials for the 'admin' database are available only
323
399
        this database is authenticated, since this gives global access.
324
400
        """
 
401
        authset = sock_info.authset
325
402
        names = set(self.__auth_credentials.iterkeys())
326
403
 
327
404
        # Logout from any databases no longer listed in the credentials cache.
328
405
        for dbname in authset - names:
329
406
            try:
330
 
                self.__simple_command(sock, dbname, {'logout': 1})
 
407
                self.__simple_command(sock_info, dbname, {'logout': 1})
331
408
            # TODO: We used this socket to logout. Fix logout so we don't
332
409
            # have to catch this.
333
410
            except OperationFailure:
340
417
 
341
418
        if "admin" in self.__auth_credentials:
342
419
            username, password = self.__auth_credentials["admin"]
343
 
            self.__auth(sock, 'admin', username, password)
 
420
            self.__auth(sock_info, 'admin', username, password)
344
421
            authset.add('admin')
345
422
        else:
346
423
            for db_name in names - authset:
347
424
                user, pwd = self.__auth_credentials[db_name]
348
 
                self.__auth(sock, db_name, user, pwd)
 
425
                self.__auth(sock_info, db_name, user, pwd)
349
426
                authset.add(db_name)
350
427
 
351
428
    @property
417
494
            return self.__pools[self.__writer]['max_bson_size']
418
495
        return 0
419
496
 
420
 
    def __simple_command(self, sock, dbname, spec):
 
497
    @property
 
498
    def auto_start_request(self):
 
499
        return self.__auto_start_request
 
500
 
 
501
    def __simple_command(self, sock_info, dbname, spec):
421
502
        """Send a command to the server.
422
503
        """
423
504
        rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
424
 
        sock.sendall(msg)
425
 
        response = self.__recv_msg(1, rqst_id, sock)
 
505
        sock_info.sock.sendall(msg)
 
506
        response = self.__recv_msg(1, rqst_id, sock_info)
426
507
        response = helpers._unpack_response(response)['data'][0]
427
508
        msg = "command %r failed: %%s" % spec
428
509
        helpers._check_command_response(response, None, msg)
429
510
        return response
430
511
 
431
 
    def __auth(self, sock, dbname, user, passwd):
432
 
        """Authenticate socket `sock` against database `dbname`.
 
512
    def __auth(self, sock_info, dbname, user, passwd):
 
513
        """Authenticate socket against database `dbname`.
433
514
        """
434
515
        # Get a nonce
435
 
        response = self.__simple_command(sock, dbname, {'getnonce': 1})
 
516
        response = self.__simple_command(sock_info, dbname, {'getnonce': 1})
436
517
        nonce = response['nonce']
437
518
        key = helpers._auth_key(nonce, user, passwd)
438
519
 
439
520
        # Actually authenticate
440
521
        query = SON([('authenticate', 1),
441
522
                     ('user', user), ('nonce', nonce), ('key', key)])
442
 
        self.__simple_command(sock, dbname, query)
 
523
        self.__simple_command(sock_info, dbname, query)
443
524
 
444
525
    def __is_master(self, host):
445
526
        """Directly call ismaster.
446
527
        """
447
 
        mongo = pool.Pool(host, self.__max_pool_size,
448
 
                          self.__net_timeout, self.__conn_timeout,
449
 
                          self.__use_ssl)
450
 
        sock = mongo.get_socket()[0]
451
 
        response = self.__simple_command(sock, 'admin', {'ismaster': 1})
452
 
        return response, mongo
 
528
        pool = self.pool_class(host, self.__max_pool_size,
 
529
                               self.__net_timeout, self.__conn_timeout,
 
530
                               self.__use_ssl)
 
531
        sock_info = pool.get_socket()
 
532
        response = self.__simple_command(
 
533
            sock_info, 'admin', {'ismaster': 1}
 
534
        )
 
535
 
 
536
        pool.return_socket(sock_info)
 
537
        return response, pool
453
538
 
454
539
    def __update_pools(self):
455
540
        """Update the mapping of (host, port) pairs to connection pools.
456
541
        """
457
542
        secondaries = []
458
543
        for host in self.__hosts:
459
 
            mongo = None
 
544
            mongo, sock_info = None, None
460
545
            try:
461
546
                if host in self.__pools:
462
547
                    mongo = self.__pools[host]
463
 
                    sock = self.__socket(mongo)
464
 
                    res = self.__simple_command(sock, 'admin', {'ismaster': 1})
 
548
                    sock_info = self.__socket(mongo)
 
549
                    res = self.__simple_command(sock_info, 'admin', {'ismaster': 1})
 
550
                    mongo['pool'].return_socket(sock_info)
465
551
                else:
466
552
                    res, conn = self.__is_master(host)
467
553
                    bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
469
555
                                          'last_checkout': time.time(),
470
556
                                          'max_bson_size': bson_max}
471
557
            except (ConnectionFailure, socket.error):
472
 
                if mongo:
473
 
                    mongo['pool'].discard_socket()
 
558
                if mongo and sock_info:
 
559
                    mongo['pool'].discard_socket(sock_info)
474
560
                continue
475
561
            # Only use hosts that are currently in 'secondary' state
476
562
            # as readers.
490
576
        hosts = set()
491
577
 
492
578
        for node in nodes:
493
 
            mongo = None
 
579
            mongo, sock_info = None, None
494
580
            try:
495
581
                if node in self.__pools:
496
582
                    mongo = self.__pools[node]
497
 
                    sock = self.__socket(mongo)
498
 
                    response = self.__simple_command(sock, 'admin',
 
583
                    sock_info = self.__socket(mongo)
 
584
                    response = self.__simple_command(sock_info, 'admin',
499
585
                                                     {'ismaster': 1})
 
586
                    mongo['pool'].return_socket(sock_info)
500
587
                else:
501
588
                    response, conn = self.__is_master(node)
502
589
 
520
607
                    hosts.update([_partition_node(h)
521
608
                                  for h in response["passives"]])
522
609
            except (ConnectionFailure, socket.error), why:
523
 
                if mongo:
524
 
                    mongo['pool'].discard_socket()
 
610
                if mongo and sock_info:
 
611
                    mongo['pool'].discard_socket(sock_info)
525
612
                errors.append("%s:%d: %s" % (node[0], node[1], str(why)))
526
613
            if hosts:
527
614
                self.__hosts = hosts
536
623
    def __check_is_primary(self, host):
537
624
        """Checks if this host is the primary for the replica set.
538
625
        """
 
626
        mongo, sock_info = None, None
539
627
        try:
540
 
            mongo = None
541
628
            if host in self.__pools:
542
629
                mongo = self.__pools[host]
543
 
                sock = self.__socket(mongo)
544
 
                res = self.__simple_command(sock, 'admin', {'ismaster': 1})
 
630
                sock_info = self.__socket(mongo)
 
631
                res = self.__simple_command(
 
632
                    sock_info, 'admin', {'ismaster': 1}
 
633
                )
545
634
            else:
546
635
                res, conn = self.__is_master(host)
547
636
                bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
549
638
                                      'last_checkout': time.time(),
550
639
                                      'max_bson_size': bson_max}
551
640
        except (ConnectionFailure, socket.error), why:
552
 
            if mongo:
553
 
                mongo['pool'].discard_socket()
 
641
            if mongo and sock_info:
 
642
                mongo['pool'].discard_socket(sock_info)
554
643
            raise ConnectionFailure("%s:%d: %s" % (host[0], host[1], str(why)))
 
644
        
 
645
        if mongo and sock_info:
 
646
            mongo['pool'].return_socket(sock_info)
555
647
 
556
648
        if res["ismaster"]:
557
649
            return host
585
677
        raise AutoReconnect(', '.join(errors))
586
678
 
587
679
    def __socket(self, mongo):
588
 
        """Get a socket from the pool.
589
 
 
590
 
        If it's been > 1 second since the last time we checked out a
591
 
        socket, we also check to see if the socket has been closed -
592
 
        this let's us avoid seeing *some*
593
 
        :class:`~pymongo.errors.AutoReconnect` exceptions on server
594
 
        hiccups, etc. We only do this if it's been > 1 second since
595
 
        the last socket checkout, to keep performance reasonable - we
596
 
        can't avoid those completely anyway.
 
680
        """Get a SocketInfo from the pool.
597
681
        """
598
 
        sock, authset = mongo['pool'].get_socket()
599
 
 
600
 
        now = time.time()
601
 
        if now - mongo['last_checkout'] > 1:
602
 
            if _closed(sock):
603
 
                mongo['pool'] = pool.Pool(mongo['pool'].host,
604
 
                                          self.__max_pool_size,
605
 
                                          self.__net_timeout,
606
 
                                          self.__conn_timeout,
607
 
                                          self.__use_ssl)
608
 
                sock, authset = mongo['pool'].get_socket()
609
 
        mongo['last_checkout'] = now
610
 
        if self.__auth_credentials or authset:
611
 
            self.__check_auth(sock, authset)
612
 
        return sock
 
682
        pool = mongo['pool']
 
683
        if self.__auto_start_request:
 
684
            # No effect if a request already started
 
685
            self.start_request()
 
686
 
 
687
        sock_info = pool.get_socket()
 
688
 
 
689
        if self.__auth_credentials:
 
690
            self.__check_auth(sock_info)
 
691
        return sock_info
613
692
 
614
693
    def disconnect(self):
615
694
        """Disconnect from the replica set primary.
616
695
        """
 
696
        mongo = self.__pools.get(self.__writer)
 
697
        if mongo and 'pool' in mongo:
 
698
            mongo['pool'].reset()
617
699
        self.__writer = None
618
700
 
619
701
    def close(self):
652
734
        else:
653
735
            raise OperationFailure(error["err"])
654
736
 
655
 
    def __recv_data(self, length, sock):
 
737
    def __recv_data(self, length, sock_info):
656
738
        """Lowest level receive operation.
657
739
 
658
740
        Takes length to receive and repeatedly calls recv until able to
660
742
        """
661
743
        chunks = []
662
744
        while length:
663
 
            chunk = sock.recv(length)
664
 
            if chunk == "":
 
745
            chunk = sock_info.sock.recv(length)
 
746
            if chunk == EMPTY:
665
747
                raise ConnectionFailure("connection closed")
666
748
            length -= len(chunk)
667
749
            chunks.append(chunk)
668
 
        return "".join(chunks)
 
750
        return EMPTY.join(chunks)
669
751
 
670
752
    def __recv_msg(self, operation, request_id, sock):
671
753
        """Receive a message in response to `request_id` on `sock`.
718
800
        else:
719
801
            mongo = self.__pools[_connection_to_use]
720
802
 
 
803
        sock_info = None
721
804
        try:
722
 
            sock = self.__socket(mongo)
 
805
            sock_info = self.__socket(mongo)
723
806
            rqst_id, data = self.__check_bson_size(msg,
724
807
                                                   mongo['max_bson_size'])
725
 
            sock.sendall(data)
 
808
            sock_info.sock.sendall(data)
726
809
            # Safe mode. We pack the message together with a lastError
727
810
            # message and send both. We then get the response (to the
728
811
            # lastError) and raise OperationFailure if it is an error
729
812
            # response.
 
813
            rv = None
730
814
            if safe:
731
 
                response = self.__recv_msg(1, rqst_id, sock)
732
 
                return self.__check_response_to_last_error(response)
733
 
            return None
 
815
                response = self.__recv_msg(1, rqst_id, sock_info)
 
816
                rv = self.__check_response_to_last_error(response)
 
817
            mongo['pool'].return_socket(sock_info)
 
818
            return rv
734
819
        except(ConnectionFailure, socket.error), why:
735
 
            mongo['pool'].discard_socket()
 
820
            mongo['pool'].discard_socket(sock_info)
736
821
            if _connection_to_use in (None, -1):
737
822
                self.disconnect()
738
823
            raise AutoReconnect(str(why))
739
824
        except:
740
 
            mongo['pool'].discard_socket()
 
825
            mongo['pool'].discard_socket(sock_info)
741
826
            raise
742
827
 
743
 
        mongo['pool'].return_socket()
744
 
 
745
828
    def __send_and_receive(self, mongo, msg, **kwargs):
746
829
        """Send a message on the given socket and return the response data.
747
830
        """
 
831
        sock_info = None
748
832
        try:
749
 
            sock = self.__socket(mongo)
 
833
            sock_info = self.__socket(mongo)
 
834
 
750
835
            if "network_timeout" in kwargs:
751
 
                sock.settimeout(kwargs['network_timeout'])
 
836
                sock_info.sock.settimeout(kwargs['network_timeout'])
752
837
 
753
838
            rqst_id, data = self.__check_bson_size(msg,
754
839
                                                   mongo['max_bson_size'])
755
 
            sock.sendall(data)
756
 
            response = self.__recv_msg(1, rqst_id, sock)
 
840
            sock_info.sock.sendall(data)
 
841
            response = self.__recv_msg(1, rqst_id, sock_info)
757
842
 
758
843
            if "network_timeout" in kwargs:
759
 
                sock.settimeout(self.__net_timeout)
760
 
            mongo['pool'].return_socket()
 
844
                sock_info.sock.settimeout(self.__net_timeout)
 
845
            mongo['pool'].return_socket(sock_info)
761
846
 
762
847
            return response
763
848
        except (ConnectionFailure, socket.error), why:
764
 
            host, port = mongo['pool'].host
765
 
            mongo['pool'].discard_socket()
 
849
            host, port = mongo['pool'].pair
 
850
            mongo['pool'].discard_socket(sock_info)
766
851
            raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
767
852
        except:
768
 
            mongo['pool'].discard_socket()
 
853
            mongo['pool'].discard_socket(sock_info)
769
854
            raise
770
855
 
771
856
    def _send_message_with_response(self, msg, _connection_to_use=None,
785
870
                    mongo = self.__find_primary()
786
871
                else:
787
872
                    mongo = self.__pools[_connection_to_use]
788
 
                return mongo['pool'].host, self.__send_and_receive(mongo,
 
873
                return mongo['pool'].pair, self.__send_and_receive(mongo,
789
874
                                                                   msg,
790
875
                                                                   **kwargs)
791
876
            elif _must_use_master or not read_pref:
792
877
                mongo = self.__find_primary()
793
 
                return mongo['pool'].host, self.__send_and_receive(mongo,
 
878
                return mongo['pool'].pair, self.__send_and_receive(mongo,
794
879
                                                                   msg,
795
880
                                                                   **kwargs)
796
881
        except AutoReconnect:
797
 
            if mongo == self.__writer:
 
882
            if mongo == self.__pools.get(self.__writer):
798
883
                self.disconnect()
799
884
            raise
800
885
 
809
894
        if read_pref == ReadPreference.SECONDARY:
810
895
            try:
811
896
                mongo = self.__find_primary()
812
 
                return mongo['pool'].host, self.__send_and_receive(mongo,
 
897
                return mongo['pool'].pair, self.__send_and_receive(mongo,
813
898
                                                                   msg,
814
899
                                                                   **kwargs)
815
900
            except AutoReconnect, why:
816
901
                self.disconnect()
817
 
                errors.append(why)
 
902
                errors.append(str(why))
818
903
        raise AutoReconnect(', '.join(errors))
819
904
 
820
 
    def __cmp__(self, other):
 
905
    def start_request(self):
 
906
        """Ensure the current thread or greenlet always uses the same socket
 
907
        until it calls :meth:`end_request`. For
 
908
        :class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
 
909
        ensures consistent reads, even if you read after an unsafe write. For
 
910
        read preferences other than PRIMARY, there are no consistency
 
911
        guarantees.
 
912
 
 
913
        In Python 2.6 and above, or in Python 2.5 with
 
914
        "from __future__ import with_statement", :meth:`start_request` can be
 
915
        used as a context manager:
 
916
 
 
917
        >>> connection = pymongo.ReplicaSetConnection(auto_start_request=False)
 
918
        >>> db = connection.test
 
919
        >>> _id = db.test_collection.insert({}, safe=True)
 
920
        >>> with connection.start_request():
 
921
        ...     for i in range(100):
 
922
        ...         db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
 
923
        ...
 
924
        ...     # Definitely read the document after the final update completes
 
925
        ...     print db.test_collection.find({'_id': _id})
 
926
 
 
927
        .. versionadded:: 2.2
 
928
           The :class:`~pymongo.pool.Request` return value.
 
929
           :meth:`start_request` previously returned None
 
930
        """
 
931
        for mongo in self.__pools.values():
 
932
            if 'pool' in mongo:
 
933
                mongo['pool'].start_request()
 
934
 
 
935
        self.__in_request = True
 
936
        return pool.Request(self)
 
937
 
 
938
    def in_request(self):
 
939
        """True if :meth:`start_request` has been called, but not
 
940
        :meth:`end_request`, or if `auto_start_request` is True and
 
941
        :meth:`end_request` has not been called in this thread or greenlet.
 
942
        """
 
943
        return self.__in_request
 
944
 
 
945
    def end_request(self):
 
946
        """Undo :meth:`start_request` and allow this thread's connections to
 
947
        replica set members to return to the pool.
 
948
 
 
949
        Calling :meth:`end_request` allows the :class:`~socket.socket` that has
 
950
        been reserved for this thread by :meth:`start_request` to be returned
 
951
        to the pool. Other threads will then be able to re-use that
 
952
        :class:`~socket.socket`. If your application uses many threads, or has
 
953
        long-running threads that infrequently perform MongoDB operations, then
 
954
        judicious use of this method can lead to performance gains. Care should
 
955
        be taken, however, to make sure that :meth:`end_request` is not called
 
956
        in the middle of a sequence of operations in which ordering is
 
957
        important. This could lead to unexpected results.
 
958
        """
 
959
        for mongo in self.__pools.values():
 
960
            if 'pool' in mongo:
 
961
                mongo['pool'].end_request()
 
962
 
 
963
        self.__in_request = False
 
964
 
 
965
    def __eq__(self, other):
821
966
        # XXX: Implement this?
822
967
        return NotImplemented
823
968
 
878
1023
        """Drop a database.
879
1024
 
880
1025
        Raises :class:`TypeError` if `name_or_database` is not an instance of
881
 
        ``(str, unicode, Database)``
 
1026
        :class:`basestring` (:class:`str` in python 3) or Database
882
1027
 
883
1028
        :Parameters:
884
1029
          - `name_or_database`: the name of a database to drop, or a
891
1036
 
892
1037
        if not isinstance(name, basestring):
893
1038
            raise TypeError("name_or_database must be an instance of "
894
 
                            "(Database, str, unicode)")
 
1039
                            "%s or Database" % (basestring.__name__,))
895
1040
 
896
1041
        self._purge_index(name)
897
1042
        self[name].command("dropDatabase")
901
1046
        """Copy a database, potentially from another host.
902
1047
 
903
1048
        Raises :class:`TypeError` if `from_name` or `to_name` is not
904
 
        an instance of :class:`basestring`. Raises
905
 
        :class:`~pymongo.errors.InvalidName` if `to_name` is not a
906
 
        valid database name.
 
1049
        an instance of :class:`basestring` (:class:`str` in python 3).
 
1050
        Raises :class:`~pymongo.errors.InvalidName` if `to_name` is
 
1051
        not a valid database name.
907
1052
 
908
1053
        If `from_host` is ``None`` the current host is used as the
909
1054
        source. Otherwise the database is copied from `from_host`.
922
1067
           version **>= 1.3.3+**.
923
1068
        """
924
1069
        if not isinstance(from_name, basestring):
925
 
            raise TypeError("from_name must be an instance of basestring")
 
1070
            raise TypeError("from_name must be an instance "
 
1071
                            "of %s" % (basestring.__name__,))
926
1072
        if not isinstance(to_name, basestring):
927
 
            raise TypeError("to_name must be an instance of basestring")
 
1073
            raise TypeError("to_name must be an instance "
 
1074
                            "of %s" % (basestring.__name__,))
928
1075
 
929
1076
        database._check_name(to_name)
930
1077
 
933
1080
        if from_host is not None:
934
1081
            command["fromhost"] = from_host
935
1082
 
936
 
        if username is not None:
937
 
            nonce = self.admin.command("copydbgetnonce",
938
 
                                       fromhost=from_host)["nonce"]
939
 
            command["username"] = username
940
 
            command["nonce"] = nonce
941
 
            command["key"] = helpers._auth_key(nonce, username, password)
 
1083
        in_request = self.in_request()
 
1084
        try:
 
1085
            if not in_request:
 
1086
                self.start_request()
 
1087
            if username is not None:
 
1088
                nonce = self.admin.command("copydbgetnonce",
 
1089
                                           fromhost=from_host)["nonce"]
 
1090
                command["username"] = username
 
1091
                command["nonce"] = nonce
 
1092
                command["key"] = helpers._auth_key(nonce, username, password)
942
1093
 
943
 
        return self.admin.command("copydb", **command)
 
1094
            return self.admin.command("copydb", **command)
 
1095
        finally:
 
1096
            if not in_request:
 
1097
                self.end_request()