~crunch.io/ubuntu/precise/pymongo/unstable

« back to all changes in this revision

Viewing changes to pymongo/mongo_client.py

  • Committer: Joseph Tate
  • Date: 2013-01-31 08:00:57 UTC
  • mfrom: (1.1.12)
  • Revision ID: jtate@dragonstrider.com-20130131080057-y7lv17xi6x8c1j5x
New upstream release.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
# Copyright 2009-2012 10gen, Inc.
 
2
#
 
3
# Licensed under the Apache License, Version 2.0 (the "License"); you
 
4
# may not use this file except in compliance with the License.  You
 
5
# may obtain a copy of the License at
 
6
#
 
7
# http://www.apache.org/licenses/LICENSE-2.0
 
8
#
 
9
# Unless required by applicable law or agreed to in writing, software
 
10
# distributed under the License is distributed on an "AS IS" BASIS,
 
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
12
# implied.  See the License for the specific language governing
 
13
# permissions and limitations under the License.
 
14
 
 
15
"""Tools for connecting to MongoDB.
 
16
 
 
17
.. seealso:: Module :mod:`~pymongo.master_slave_connection` for
 
18
   connecting to master-slave clusters, and
 
19
   :doc:`/examples/high_availability` for an example of how to connect
 
20
   to a replica set, or specify a list of mongos instances for automatic
 
21
   failover.
 
22
 
 
23
To get a :class:`~pymongo.database.Database` instance from a
 
24
:class:`MongoClient` use either dictionary-style or attribute-style
 
25
access:
 
26
 
 
27
.. doctest::
 
28
 
 
29
  >>> from pymongo import MongoClient
 
30
  >>> c = MongoClient()
 
31
  >>> c.test_database
 
32
  Database(MongoClient('localhost', 27017), u'test_database')
 
33
  >>> c['test-database']
 
34
  Database(MongoClient('localhost', 27017), u'test-database')
 
35
"""
 
36
 
 
37
import datetime
 
38
import random
 
39
import socket
 
40
import struct
 
41
import time
 
42
import warnings
 
43
 
 
44
from bson.py3compat import b
 
45
from bson.son import SON
 
46
from pymongo import (common,
 
47
                     database,
 
48
                     helpers,
 
49
                     message,
 
50
                     pool,
 
51
                     uri_parser)
 
52
from pymongo.cursor_manager import CursorManager
 
53
from pymongo.errors import (AutoReconnect,
 
54
                            ConfigurationError,
 
55
                            ConnectionFailure,
 
56
                            DuplicateKeyError,
 
57
                            InvalidDocument,
 
58
                            InvalidURI,
 
59
                            OperationFailure)
 
60
 
 
61
EMPTY = b("")
 
62
 
 
63
def _partition_node(node):
 
64
    """Split a host:port string returned from mongod/s into
 
65
    a (host, int(port)) pair needed for socket.connect().
 
66
    """
 
67
    host = node
 
68
    port = 27017
 
69
    idx = node.rfind(':')
 
70
    if idx != -1:
 
71
        host, port = node[:idx], int(node[idx + 1:])
 
72
    if host.startswith('['):
 
73
        host = host[1:-1]
 
74
    return host, port
 
75
 
 
76
 
 
77
class MongoClient(common.BaseObject):
 
78
    """Connection to MongoDB.
 
79
    """
 
80
 
 
81
    HOST = "localhost"
 
82
    PORT = 27017
 
83
 
 
84
    __max_bson_size = 4 * 1024 * 1024
 
85
 
 
86
    def __init__(self, host=None, port=None, max_pool_size=10,
 
87
                 document_class=dict, tz_aware=False, _connect=True, **kwargs):
 
88
        """Create a new connection to a single MongoDB instance at *host:port*.
 
89
 
 
90
        The resultant connection object has connection-pooling built
 
91
        in. It also performs auto-reconnection when necessary. If an
 
92
        operation fails because of a connection error,
 
93
        :class:`~pymongo.errors.ConnectionFailure` is raised. If
 
94
        auto-reconnection will be performed,
 
95
        :class:`~pymongo.errors.AutoReconnect` will be
 
96
        raised. Application code should handle this exception
 
97
        (recognizing that the operation failed) and then continue to
 
98
        execute.
 
99
 
 
100
        Raises :class:`TypeError` if port is not an instance of
 
101
        ``int``. Raises :class:`~pymongo.errors.ConnectionFailure` if
 
102
        the connection cannot be made.
 
103
 
 
104
        The `host` parameter can be a full `mongodb URI
 
105
        <http://dochub.mongodb.org/core/connections>`_, in addition to
 
106
        a simple hostname. It can also be a list of hostnames or
 
107
        URIs. Any port specified in the host string(s) will override
 
108
        the `port` parameter. If multiple mongodb URIs containing
 
109
        database or auth information are passed, the last database,
 
110
        username, and password present will be used.  For username and
 
111
        passwords reserved characters like ':', '/', '+' and '@' must be
 
112
        escaped following RFC 2396.
 
113
 
 
114
        :Parameters:
 
115
          - `host` (optional): hostname or IP address of the
 
116
            instance to connect to, or a mongodb URI, or a list of
 
117
            hostnames / mongodb URIs. If `host` is an IPv6 literal
 
118
            it must be enclosed in '[' and ']' characters following
 
119
            the RFC2732 URL syntax (e.g. '[::1]' for localhost)
 
120
          - `port` (optional): port number on which to connect
 
121
          - `max_pool_size` (optional): The maximum number of idle connections
 
122
            to keep open in the pool for future use
 
123
          - `document_class` (optional): default class to use for
 
124
            documents returned from queries on this connection
 
125
          - `tz_aware` (optional): if ``True``,
 
126
            :class:`~datetime.datetime` instances returned as values
 
127
            in a document by this :class:`MongoClient` will be timezone
 
128
            aware (otherwise they will be naive)
 
129
 
 
130
          **Other optional parameters can be passed as keyword arguments:**
 
131
 
 
132
          - `w`: (integer or string) If this is a replica set, write operations
 
133
            will block until they have been replicated to the specified number
 
134
            or tagged set of servers. `w=<int>` always includes the replica set
 
135
            primary (e.g. w=3 means write to the primary and wait until
 
136
            replicated to **two** secondaries). **Passing w=0 disables write
 
137
            acknowledgement and all other write concern options.**
 
138
          - `wtimeout`: (integer) Used in conjunction with `w`. Specify a value
 
139
            in milliseconds to control how long to wait for write propagation
 
140
            to complete. If replication does not complete in the given
 
141
            timeframe, a timeout exception is raised.
 
142
          - `j`: If ``True`` block until write operations have been committed
 
143
            to the journal. Ignored if the server is running without journaling.
 
144
          - `fsync`: If ``True`` force the database to fsync all files before
 
145
            returning. When used with `j` the server awaits the next group
 
146
            commit before returning.
 
147
          - `replicaSet`: (string) The name of the replica set to connect to.
 
148
            The driver will verify that the replica set it connects to matches
 
149
            this name. Implies that the hosts specified are a seed list and the
 
150
            driver should attempt to find all members of the set.
 
151
          - `socketTimeoutMS`: (integer) How long (in milliseconds) a send or
 
152
            receive on a socket can take before timing out.
 
153
          - `connectTimeoutMS`: (integer) How long (in milliseconds) a
 
154
            connection can take to be opened before timing out.
 
155
          - `ssl`: If ``True``, create the connection to the server using SSL.
 
156
          - `read_preference`: The read preference for this connection.
 
157
            See :class:`~pymongo.read_preferences.ReadPreference` for available
 
158
            options.
 
159
          - `auto_start_request`: If ``True``, each thread that accesses
 
160
            this :class:`MongoClient` has a socket allocated to it for the
 
161
            thread's lifetime.  This ensures consistent reads, even if you
 
162
            read after an unacknowledged write. Defaults to ``False``
 
163
          - `use_greenlets`: If ``True``, :meth:`start_request()` will ensure
 
164
            that the current greenlet uses the same socket for all
 
165
            operations until :meth:`end_request()`
 
166
 
 
167
        .. seealso:: :meth:`end_request`
 
168
 
 
169
        .. mongodoc:: connections
 
170
 
 
171
        .. versionadded:: 2.4
 
172
        """
 
173
        if host is None:
 
174
            host = self.HOST
 
175
        if isinstance(host, basestring):
 
176
            host = [host]
 
177
        if port is None:
 
178
            port = self.PORT
 
179
        if not isinstance(port, int):
 
180
            raise TypeError("port must be an instance of int")
 
181
 
 
182
        seeds = set()
 
183
        username = None
 
184
        password = None
 
185
        db = None
 
186
        opts = {}
 
187
        for entity in host:
 
188
            if "://" in entity:
 
189
                if entity.startswith("mongodb://"):
 
190
                    res = uri_parser.parse_uri(entity, port)
 
191
                    seeds.update(res["nodelist"])
 
192
                    username = res["username"] or username
 
193
                    password = res["password"] or password
 
194
                    db = res["database"] or db
 
195
                    opts = res["options"]
 
196
                else:
 
197
                    idx = entity.find("://")
 
198
                    raise InvalidURI("Invalid URI scheme: "
 
199
                                     "%s" % (entity[:idx],))
 
200
            else:
 
201
                seeds.update(uri_parser.split_hosts(entity, port))
 
202
        if not seeds:
 
203
            raise ConfigurationError("need to specify at least one host")
 
204
 
 
205
        self.__nodes = seeds
 
206
        self.__host = None
 
207
        self.__port = None
 
208
        self.__is_primary = False
 
209
        self.__is_mongos = False
 
210
 
 
211
        # _pool_class option is for deep customization of PyMongo, e.g. Motor.
 
212
        # SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO 10GEN.
 
213
        pool_class = kwargs.pop('_pool_class', pool.Pool)
 
214
 
 
215
        options = {}
 
216
        for option, value in kwargs.iteritems():
 
217
            option, value = common.validate(option, value)
 
218
            options[option] = value
 
219
        options.update(opts)
 
220
 
 
221
        self.__max_pool_size = common.validate_positive_integer(
 
222
                                                'max_pool_size', max_pool_size)
 
223
 
 
224
        self.__cursor_manager = CursorManager(self)
 
225
 
 
226
        self.__repl = options.get('replicaset')
 
227
        if len(seeds) == 1 and not self.__repl:
 
228
            self.__direct = True
 
229
        else:
 
230
            self.__direct = False
 
231
            self.__nodes = set()
 
232
 
 
233
        self.__net_timeout = options.get('sockettimeoutms')
 
234
        self.__conn_timeout = options.get('connecttimeoutms')
 
235
        self.__use_ssl = options.get('ssl', False)
 
236
        if self.__use_ssl and not pool.have_ssl:
 
237
            raise ConfigurationError("The ssl module is not available. If you "
 
238
                                     "are using a python version previous to "
 
239
                                     "2.6 you must install the ssl package "
 
240
                                     "from PyPI.")
 
241
 
 
242
        self.__use_greenlets = options.get('use_greenlets', False)
 
243
        self.__pool = pool_class(
 
244
            None,
 
245
            self.__max_pool_size,
 
246
            self.__net_timeout,
 
247
            self.__conn_timeout,
 
248
            self.__use_ssl,
 
249
            use_greenlets=self.__use_greenlets)
 
250
 
 
251
        self.__document_class = document_class
 
252
        self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
 
253
        self.__auto_start_request = options.get('auto_start_request', False)
 
254
 
 
255
        # cache of existing indexes used by ensure_index ops
 
256
        self.__index_cache = {}
 
257
        self.__auth_credentials = {}
 
258
 
 
259
        super(MongoClient, self).__init__(**options)
 
260
        if self.slave_okay:
 
261
            warnings.warn("slave_okay is deprecated. Please "
 
262
                          "use read_preference instead.", DeprecationWarning)
 
263
 
 
264
        if _connect:
 
265
            try:
 
266
                self.__find_node(seeds)
 
267
            except AutoReconnect, e:
 
268
                # ConnectionFailure makes more sense here than AutoReconnect
 
269
                raise ConnectionFailure(str(e))
 
270
 
 
271
        if db and username is None:
 
272
            warnings.warn("database name in URI is being ignored. If you wish "
 
273
                          "to authenticate to %s, you must provide a username "
 
274
                          "and password." % (db,))
 
275
        if username:
 
276
            db = db or "admin"
 
277
            if not self[db].authenticate(username, password):
 
278
                raise ConfigurationError("authentication failed")
 
279
 
 
280
    def _cached(self, dbname, coll, index):
 
281
        """Test if `index` is cached.
 
282
        """
 
283
        cache = self.__index_cache
 
284
        now = datetime.datetime.utcnow()
 
285
        return (dbname in cache and
 
286
                coll in cache[dbname] and
 
287
                index in cache[dbname][coll] and
 
288
                now < cache[dbname][coll][index])
 
289
 
 
290
    def _cache_index(self, database, collection, index, cache_for):
 
291
        """Add an index to the index cache for ensure_index operations.
 
292
        """
 
293
        now = datetime.datetime.utcnow()
 
294
        expire = datetime.timedelta(seconds=cache_for) + now
 
295
 
 
296
        if database not in self.__index_cache:
 
297
            self.__index_cache[database] = {}
 
298
            self.__index_cache[database][collection] = {}
 
299
            self.__index_cache[database][collection][index] = expire
 
300
 
 
301
        elif collection not in self.__index_cache[database]:
 
302
            self.__index_cache[database][collection] = {}
 
303
            self.__index_cache[database][collection][index] = expire
 
304
 
 
305
        else:
 
306
            self.__index_cache[database][collection][index] = expire
 
307
 
 
308
    def _purge_index(self, database_name,
 
309
                     collection_name=None, index_name=None):
 
310
        """Purge an index from the index cache.
 
311
 
 
312
        If `index_name` is None purge an entire collection.
 
313
 
 
314
        If `collection_name` is None purge an entire database.
 
315
        """
 
316
        if not database_name in self.__index_cache:
 
317
            return
 
318
 
 
319
        if collection_name is None:
 
320
            del self.__index_cache[database_name]
 
321
            return
 
322
 
 
323
        if not collection_name in self.__index_cache[database_name]:
 
324
            return
 
325
 
 
326
        if index_name is None:
 
327
            del self.__index_cache[database_name][collection_name]
 
328
            return
 
329
 
 
330
        if index_name in self.__index_cache[database_name][collection_name]:
 
331
            del self.__index_cache[database_name][collection_name][index_name]
 
332
 
 
333
    def _cache_credentials(self, db_name, username, password):
 
334
        """Add credentials to the database authentication cache
 
335
        for automatic login when a socket is created.
 
336
 
 
337
        If credentials are already cached for `db_name` they
 
338
        will be replaced.
 
339
        """
 
340
        self.__auth_credentials[db_name] = (username, password)
 
341
 
 
342
    def _purge_credentials(self, db_name=None):
 
343
        """Purge credentials from the database authentication cache.
 
344
 
 
345
        If `db_name` is None purge credentials for all databases.
 
346
        """
 
347
        if db_name is None:
 
348
            self.__auth_credentials.clear()
 
349
        elif db_name in self.__auth_credentials:
 
350
            del self.__auth_credentials[db_name]
 
351
 
 
352
    def __check_auth(self, sock_info):
 
353
        """Authenticate using cached database credentials.
 
354
        """
 
355
        authset = sock_info.authset
 
356
        names = set(self.__auth_credentials.iterkeys())
 
357
 
 
358
        # Logout from any databases no longer listed in the credentials cache.
 
359
        for dbname in authset - names:
 
360
            try:
 
361
                self.__simple_command(sock_info, dbname, {'logout': 1})
 
362
            # TODO: We used this socket to logout. Fix logout so we don't
 
363
            # have to catch this.
 
364
            except OperationFailure:
 
365
                pass
 
366
            authset.discard(dbname)
 
367
 
 
368
        for db_name in names - authset:
 
369
            user, pwd = self.__auth_credentials[db_name]
 
370
            self.__auth(sock_info, db_name, user, pwd)
 
371
            authset.add(db_name)
 
372
 
 
373
    @property
 
374
    def host(self):
 
375
        """Current connected host.
 
376
 
 
377
        .. versionchanged:: 1.3
 
378
           ``host`` is now a property rather than a method.
 
379
        """
 
380
        return self.__host
 
381
 
 
382
    @property
 
383
    def port(self):
 
384
        """Current connected port.
 
385
 
 
386
        .. versionchanged:: 1.3
 
387
           ``port`` is now a property rather than a method.
 
388
        """
 
389
        return self.__port
 
390
 
 
391
    @property
 
392
    def is_primary(self):
 
393
        """If this instance is connected to a standalone, a replica-set
 
394
        primary, or the master of a master-slave set.
 
395
 
 
396
        .. versionadded:: 2.3
 
397
        """
 
398
        return self.__is_primary
 
399
 
 
400
    @property
 
401
    def is_mongos(self):
 
402
        """If this instance is connected to mongos.
 
403
 
 
404
        .. versionadded:: 2.3
 
405
        """
 
406
        return self.__is_mongos
 
407
 
 
408
    @property
 
409
    def max_pool_size(self):
 
410
        """The maximum number of idle connections kept open in the pool for
 
411
        future use.
 
412
 
 
413
        .. note:: ``max_pool_size`` does not cap the number of concurrent
 
414
          connections to the server; there is currently no way to limit the
 
415
          number of connections. ``max_pool_size`` only limits the number of
 
416
          **idle** connections kept open when they are returned to the pool.
 
417
 
 
418
        .. versionadded:: 1.11
 
419
        """
 
420
        return self.__max_pool_size
 
421
 
 
422
    @property
 
423
    def use_greenlets(self):
 
424
        """Whether calling :meth:`start_request` assigns greenlet-local,
 
425
        rather than thread-local, sockets.
 
426
 
 
427
        .. versionadded:: 2.4.2
 
428
        """
 
429
        return self.__use_greenlets
 
430
 
 
431
    @property
 
432
    def nodes(self):
 
433
        """List of all known nodes.
 
434
 
 
435
        Includes both nodes specified when this instance was created,
 
436
        as well as nodes discovered through the replica set discovery
 
437
        mechanism.
 
438
 
 
439
        .. versionadded:: 1.8
 
440
        """
 
441
        return self.__nodes
 
442
 
 
443
    @property
 
444
    def auto_start_request(self):
 
445
        """Is auto_start_request enabled?
 
446
        """
 
447
        return self.__auto_start_request
 
448
 
 
449
    def get_document_class(self):
 
450
        return self.__document_class
 
451
 
 
452
    def set_document_class(self, klass):
 
453
        self.__document_class = klass
 
454
 
 
455
    document_class = property(get_document_class, set_document_class,
 
456
                              doc="""Default class to use for documents
 
457
                              returned on this connection.
 
458
 
 
459
                              .. versionadded:: 1.7
 
460
                              """)
 
461
 
 
462
    @property
 
463
    def tz_aware(self):
 
464
        """Does this connection return timezone-aware datetimes?
 
465
 
 
466
        .. versionadded:: 1.8
 
467
        """
 
468
        return self.__tz_aware
 
469
 
 
470
    @property
 
471
    def max_bson_size(self):
 
472
        """Return the maximum size BSON object the connected server
 
473
        accepts in bytes. Defaults to 4MB in server < 1.7.4.
 
474
 
 
475
        .. versionadded:: 1.10
 
476
        """
 
477
        return self.__max_bson_size
 
478
 
 
479
    def __simple_command(self, sock_info, dbname, spec):
 
480
        """Send a command to the server.
 
481
        """
 
482
        rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
 
483
        start = time.time()
 
484
        try:
 
485
            sock_info.sock.sendall(msg)
 
486
            response = self.__receive_message_on_socket(1, rqst_id, sock_info)
 
487
        except:
 
488
            sock_info.close()
 
489
            raise
 
490
 
 
491
        end = time.time()
 
492
        response = helpers._unpack_response(response)['data'][0]
 
493
        msg = "command %r failed: %%s" % spec
 
494
        helpers._check_command_response(response, None, msg)
 
495
        return response, end - start
 
496
 
 
497
    def __auth(self, sock_info, dbname, user, passwd):
 
498
        """Authenticate socket against database `dbname`.
 
499
        """
 
500
        # Get a nonce
 
501
        response, _ = self.__simple_command(sock_info, dbname, {'getnonce': 1})
 
502
        nonce = response['nonce']
 
503
        key = helpers._auth_key(nonce, user, passwd)
 
504
 
 
505
        # Actually authenticate
 
506
        query = SON([('authenticate', 1),
 
507
            ('user', user), ('nonce', nonce), ('key', key)])
 
508
        self.__simple_command(sock_info, dbname, query)
 
509
 
 
510
    def __try_node(self, node):
 
511
        """Try to connect to this node and see if it works for our connection
 
512
        type. Returns ((host, port), ismaster, isdbgrid, res_time).
 
513
 
 
514
        :Parameters:
 
515
         - `node`: The (host, port) pair to try.
 
516
        """
 
517
        self.disconnect()
 
518
        self.__host, self.__port = node
 
519
 
 
520
        # Call 'ismaster' directly so we can get a response time.
 
521
        sock_info = self.__socket()
 
522
        response, res_time = self.__simple_command(sock_info,
 
523
                                                   'admin',
 
524
                                                   {'ismaster': 1})
 
525
        self.__pool.maybe_return_socket(sock_info)
 
526
 
 
527
        # Are we talking to a mongos?
 
528
        isdbgrid = response.get('msg', '') == 'isdbgrid'
 
529
 
 
530
        if "maxBsonObjectSize" in response:
 
531
            self.__max_bson_size = response["maxBsonObjectSize"]
 
532
 
 
533
        # Replica Set?
 
534
        if not self.__direct:
 
535
            # Check that this host is part of the given replica set.
 
536
            if self.__repl:
 
537
                set_name = response.get('setName')
 
538
                # The 'setName' field isn't returned by mongod before 1.6.2
 
539
                # so we can't assume that if it's missing this host isn't in
 
540
                # the specified set.
 
541
                if set_name and set_name != self.__repl:
 
542
                    raise ConfigurationError("%s:%d is not a member of "
 
543
                                             "replica set %s"
 
544
                                             % (node[0], node[1], self.__repl))
 
545
            if "hosts" in response:
 
546
                self.__nodes = set([_partition_node(h)
 
547
                                    for h in response["hosts"]])
 
548
            else:
 
549
                # The user passed a seed list of standalone or
 
550
                # mongos instances.
 
551
                self.__nodes.add(node)
 
552
            if response["ismaster"]:
 
553
                return node, True, isdbgrid, res_time
 
554
            elif "primary" in response:
 
555
                candidate = _partition_node(response["primary"])
 
556
                return self.__try_node(candidate)
 
557
 
 
558
            # Explain why we aren't using this connection.
 
559
            raise AutoReconnect('%s:%d is not primary or master' % node)
 
560
 
 
561
        # Direct connection
 
562
        if response.get("arbiterOnly", False) and not self.__direct:
 
563
            raise ConfigurationError("%s:%d is an arbiter" % node)
 
564
        return node, response['ismaster'], isdbgrid, res_time
 
565
 
 
566
    def __pick_nearest(self, candidates):
 
567
        """Return the 'nearest' candidate based on response time.
 
568
        """
 
569
        latency = self.secondary_acceptable_latency_ms
 
570
        # Only used for mongos high availability, res_time is in seconds.
 
571
        fastest = min([res_time for candidate, res_time in candidates])
 
572
        near_candidates = [
 
573
            candidate for candidate, res_time in candidates
 
574
            if res_time - fastest < latency / 1000.0
 
575
        ]
 
576
 
 
577
        node = random.choice(near_candidates)
 
578
        # Clear the pool from the last choice.
 
579
        self.disconnect()
 
580
        self.__host, self.__port = node
 
581
        return node
 
582
 
 
583
    def __find_node(self, seeds=None):
 
584
        """Find a host, port pair suitable for our connection type.
 
585
 
 
586
        If only one host was supplied to __init__ see if we can connect
 
587
        to it. Don't check if the host is a master/primary so we can make
 
588
        a direct connection to read from a secondary or send commands to
 
589
        an arbiter.
 
590
 
 
591
        If more than one host was supplied treat them as a seed list for
 
592
        connecting to a replica set or to support high availability for
 
593
        mongos. If connecting to a replica set try to find the primary
 
594
        and fail if we can't, possibly updating any replSet information
 
595
        on success. If a mongos seed list was provided find the "nearest"
 
596
        mongos and return it.
 
597
 
 
598
        Otherwise we iterate through the list trying to find a host we can
 
599
        send write operations to.
 
600
 
 
601
        Sets __host and __port so that :attr:`host` and :attr:`port`
 
602
        will return the address of the connected host. Sets __is_primary to
 
603
        True if this is a primary or master, else False. Sets __is_mongos
 
604
        to True if the connection is to a mongos.
 
605
        """
 
606
        errors = []
 
607
        mongos_candidates = []
 
608
        candidates = seeds or self.__nodes.copy()
 
609
        for candidate in candidates:
 
610
            try:
 
611
                node, ismaster, isdbgrid, res_time = self.__try_node(candidate)
 
612
                self.__is_primary = ismaster
 
613
                self.__is_mongos = isdbgrid
 
614
                # No need to calculate nearest if we only have one mongos.
 
615
                if isdbgrid and not self.__direct:
 
616
                    mongos_candidates.append((node, res_time))
 
617
                    continue
 
618
                elif len(mongos_candidates):
 
619
                    raise ConfigurationError("Seed list cannot contain a mix "
 
620
                                             "of mongod and mongos instances.")
 
621
                return node
 
622
            except Exception, why:
 
623
                errors.append(str(why))
 
624
 
 
625
        # If we have a mongos seed list, pick the "nearest" member.
 
626
        if len(mongos_candidates):
 
627
            self.__is_mongos = True
 
628
            return self.__pick_nearest(mongos_candidates)
 
629
 
 
630
        # Otherwise, try any hosts we discovered that were not in the seed list.
 
631
        for candidate in self.__nodes - candidates:
 
632
            try:
 
633
                node, ismaster, isdbgrid, _ = self.__try_node(candidate)
 
634
                self.__is_primary = ismaster
 
635
                self.__is_mongos = isdbgrid
 
636
                return node
 
637
            except Exception, why:
 
638
                errors.append(str(why))
 
639
        # Couldn't find a suitable host.
 
640
        self.disconnect()
 
641
        raise AutoReconnect(', '.join(errors))
 
642
 
 
643
    def __socket(self):
 
644
        """Get a SocketInfo from the pool.
 
645
        """
 
646
        host, port = (self.__host, self.__port)
 
647
        if host is None or (port is None and '/' not in host):
 
648
            host, port = self.__find_node()
 
649
 
 
650
        try:
 
651
            if self.auto_start_request and not self.in_request():
 
652
                self.start_request()
 
653
 
 
654
            sock_info = self.__pool.get_socket((host, port))
 
655
        except socket.error, why:
 
656
            self.disconnect()
 
657
 
 
658
            # Check if a unix domain socket
 
659
            if host.endswith('.sock'):
 
660
                host_details = "%s:" % host
 
661
            else:
 
662
                host_details = "%s:%d:" % (host, port)
 
663
            raise AutoReconnect("could not connect to "
 
664
                                "%s %s" % (host_details, str(why)))
 
665
        if self.__auth_credentials:
 
666
            self.__check_auth(sock_info)
 
667
        return sock_info
 
668
 
 
669
    def disconnect(self):
 
670
        """Disconnect from MongoDB.
 
671
 
 
672
        Disconnecting will close all underlying sockets in the connection
 
673
        pool. If this instance is used again it will be automatically
 
674
        re-opened. Care should be taken to make sure that :meth:`disconnect`
 
675
        is not called in the middle of a sequence of operations in which
 
676
        ordering is important. This could lead to unexpected results.
 
677
 
 
678
        .. seealso:: :meth:`end_request`
 
679
        .. versionadded:: 1.3
 
680
        """
 
681
        self.__pool.reset()
 
682
        self.__host = None
 
683
        self.__port = None
 
684
 
 
685
    def close(self):
 
686
        """Alias for :meth:`disconnect`
 
687
 
 
688
        Disconnecting will close all underlying sockets in the connection
 
689
        pool. If this instance is used again it will be automatically
 
690
        re-opened. Care should be taken to make sure that :meth:`disconnect`
 
691
        is not called in the middle of a sequence of operations in which
 
692
        ordering is important. This could lead to unexpected results.
 
693
 
 
694
        .. seealso:: :meth:`end_request`
 
695
        .. versionadded:: 2.1
 
696
        """
 
697
        self.disconnect()
 
698
 
 
699
    def alive(self):
 
700
        """Return ``False`` if there has been an error communicating with the
 
701
        server, else ``True``.
 
702
 
 
703
        This method attempts to check the status of the server with minimal I/O.
 
704
        The current thread / greenlet retrieves a socket from the pool (its
 
705
        request socket if it's in a request, or a random idle socket if it's not
 
706
        in a request) and checks whether calling `select`_ on it raises an
 
707
        error. If there are currently no idle sockets, :meth:`alive` will
 
708
        attempt to actually connect to the server.
 
709
 
 
710
        A more certain way to determine server availability is::
 
711
 
 
712
            connection.admin.command('ping')
 
713
 
 
714
        .. _select: http://docs.python.org/2/library/select.html#select.select
 
715
        """
 
716
        # In the common case, a socket is available and was used recently, so
 
717
        # calling select() on it is a reasonable attempt to see if the OS has
 
718
        # reported an error. Note this can be wasteful: __socket implicitly
 
719
        # calls select() if the socket hasn't been checked in the last second,
 
720
        # or it may create a new socket, in which case calling select() is
 
721
        # redundant.
 
722
        try:
 
723
            sock_info = self.__socket()
 
724
            return not pool._closed(sock_info.sock)
 
725
        except (socket.error, ConnectionFailure):
 
726
            return False
 
727
 
 
728
    def set_cursor_manager(self, manager_class):
 
729
        """Set this connection's cursor manager.
 
730
 
 
731
        Raises :class:`TypeError` if `manager_class` is not a subclass of
 
732
        :class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
 
733
        handles closing cursors. Different managers can implement different
 
734
        policies in terms of when to actually kill a cursor that has
 
735
        been closed.
 
736
 
 
737
        :Parameters:
 
738
          - `manager_class`: cursor manager to use
 
739
 
 
740
        .. versionchanged:: 2.1+
 
741
           Deprecated support for external cursor managers.
 
742
        """
 
743
        warnings.warn("Support for external cursor managers is deprecated "
 
744
                      "and will be removed in PyMongo 3.0.", DeprecationWarning)
 
745
        manager = manager_class(self)
 
746
        if not isinstance(manager, CursorManager):
 
747
            raise TypeError("manager_class must be a subclass of "
 
748
                            "CursorManager")
 
749
 
 
750
        self.__cursor_manager = manager
 
751
 
 
752
    def __check_response_to_last_error(self, response):
 
753
        """Check a response to a lastError message for errors.
 
754
 
 
755
        `response` is a byte string representing a response to the message.
 
756
        If it represents an error response we raise OperationFailure.
 
757
 
 
758
        Return the response as a document.
 
759
        """
 
760
        response = helpers._unpack_response(response)
 
761
 
 
762
        assert response["number_returned"] == 1
 
763
        error = response["data"][0]
 
764
 
 
765
        helpers._check_command_response(error, self.disconnect)
 
766
 
 
767
        error_msg = error.get("err", "")
 
768
        if error_msg is None:
 
769
            return error
 
770
        if error_msg.startswith("not master"):
 
771
            self.disconnect()
 
772
            raise AutoReconnect(error_msg)
 
773
 
 
774
        details = error
 
775
        # mongos returns the error code in an error object
 
776
        # for some errors.
 
777
        if "errObjects" in error:
 
778
            for errobj in error["errObjects"]:
 
779
                if errobj["err"] == error_msg:
 
780
                    details = errobj
 
781
                    break
 
782
 
 
783
        if "code" in details:
 
784
            if details["code"] in [11000, 11001, 12582]:
 
785
                raise DuplicateKeyError(details["err"])
 
786
            else:
 
787
                raise OperationFailure(details["err"], details["code"])
 
788
        else:
 
789
            raise OperationFailure(details["err"])
 
790
 
 
791
    def __check_bson_size(self, message):
 
792
        """Make sure the message doesn't include BSON documents larger
 
793
        than the connected server will accept.
 
794
 
 
795
        :Parameters:
 
796
          - `message`: message to check
 
797
        """
 
798
        if len(message) == 3:
 
799
            (request_id, data, max_doc_size) = message
 
800
            if max_doc_size > self.__max_bson_size:
 
801
                raise InvalidDocument("BSON document too large (%d bytes)"
 
802
                                      " - the connected server supports"
 
803
                                      " BSON document sizes up to %d"
 
804
                                      " bytes." %
 
805
                                      (max_doc_size, self.__max_bson_size))
 
806
            return (request_id, data)
 
807
        else:
 
808
            # get_more and kill_cursors messages
 
809
            # don't include BSON documents.
 
810
            return message
 
811
 
 
812
    def _send_message(self, message, with_last_error=False, check_primary=True):
 
813
        """Say something to Mongo.
 
814
 
 
815
        Raises ConnectionFailure if the message cannot be sent. Raises
 
816
        OperationFailure if `with_last_error` is ``True`` and the
 
817
        response to the getLastError call returns an error. Return the
 
818
        response from lastError, or ``None`` if `with_last_error`
 
819
        is ``False``.
 
820
 
 
821
        :Parameters:
 
822
          - `message`: message to send
 
823
          - `with_last_error`: check getLastError status after sending the
 
824
            message
 
825
          - `check_primary`: don't try to write to a non-primary; see
 
826
            kill_cursors for an exception to this rule
 
827
        """
 
828
        if check_primary and not with_last_error and not self.is_primary:
 
829
            # The write won't succeed, bail as if we'd done a getLastError
 
830
            raise AutoReconnect("not master")
 
831
 
 
832
        sock_info = self.__socket()
 
833
        try:
 
834
            (request_id, data) = self.__check_bson_size(message)
 
835
            sock_info.sock.sendall(data)
 
836
            # Safe mode. We pack the message together with a lastError
 
837
            # message and send both. We then get the response (to the
 
838
            # lastError) and raise OperationFailure if it is an error
 
839
            # response.
 
840
            rv = None
 
841
            if with_last_error:
 
842
                response = self.__receive_message_on_socket(1, request_id,
 
843
                                                            sock_info)
 
844
                rv = self.__check_response_to_last_error(response)
 
845
 
 
846
            self.__pool.maybe_return_socket(sock_info)
 
847
            return rv
 
848
        except OperationFailure:
 
849
            self.__pool.maybe_return_socket(sock_info)
 
850
            raise
 
851
        except (ConnectionFailure, socket.error), e:
 
852
            self.disconnect()
 
853
            raise AutoReconnect(str(e))
 
854
        except:
 
855
            sock_info.close()
 
856
            raise
 
857
 
 
858
    def __receive_data_on_socket(self, length, sock_info):
 
859
        """Lowest level receive operation.
 
860
 
 
861
        Takes length to receive and repeatedly calls recv until able to
 
862
        return a buffer of that length, raising ConnectionFailure on error.
 
863
        """
 
864
        message = EMPTY
 
865
        while length:
 
866
            chunk = sock_info.sock.recv(length)
 
867
            if chunk == EMPTY:
 
868
                raise ConnectionFailure("connection closed")
 
869
            length -= len(chunk)
 
870
            message += chunk
 
871
        return message
 
872
 
 
873
    def __receive_message_on_socket(self, operation, request_id, sock_info):
 
874
        """Receive a message in response to `request_id` on `sock`.
 
875
 
 
876
        Returns the response data with the header removed.
 
877
        """
 
878
        header = self.__receive_data_on_socket(16, sock_info)
 
879
        length = struct.unpack("<i", header[:4])[0]
 
880
        assert request_id == struct.unpack("<i", header[8:12])[0], \
 
881
            "ids don't match %r %r" % (request_id,
 
882
                                       struct.unpack("<i", header[8:12])[0])
 
883
        assert operation == struct.unpack("<i", header[12:])[0]
 
884
 
 
885
        return self.__receive_data_on_socket(length - 16, sock_info)
 
886
 
 
887
    def __send_and_receive(self, message, sock_info):
 
888
        """Send a message on the given socket and return the response data.
 
889
        """
 
890
        (request_id, data) = self.__check_bson_size(message)
 
891
        try:
 
892
            sock_info.sock.sendall(data)
 
893
            return self.__receive_message_on_socket(1, request_id, sock_info)
 
894
        except:
 
895
            sock_info.close()
 
896
            raise
 
897
 
 
898
    # we just ignore _must_use_master here: it's only relevant for
 
899
    # MasterSlaveConnection instances.
 
900
    def _send_message_with_response(self, message,
 
901
                                    _must_use_master=False, **kwargs):
 
902
        """Send a message to Mongo and return the response.
 
903
 
 
904
        Sends the given message and returns the response.
 
905
 
 
906
        :Parameters:
 
907
          - `message`: (request_id, data) pair making up the message to send
 
908
        """
 
909
        sock_info = self.__socket()
 
910
 
 
911
        try:
 
912
            try:
 
913
                if "network_timeout" in kwargs:
 
914
                    sock_info.sock.settimeout(kwargs["network_timeout"])
 
915
                return self.__send_and_receive(message, sock_info)
 
916
            except (ConnectionFailure, socket.error), e:
 
917
                self.disconnect()
 
918
                raise AutoReconnect(str(e))
 
919
        finally:
 
920
            if "network_timeout" in kwargs:
 
921
                try:
 
922
                    # Restore the socket's original timeout and return it to
 
923
                    # the pool
 
924
                    sock_info.sock.settimeout(self.__net_timeout)
 
925
                    self.__pool.maybe_return_socket(sock_info)
 
926
                except socket.error:
 
927
                    # There was an exception and we've closed the socket
 
928
                    pass
 
929
            else:
 
930
                self.__pool.maybe_return_socket(sock_info)
 
931
 
 
932
    def start_request(self):
 
933
        """Ensure the current thread or greenlet always uses the same socket
 
934
        until it calls :meth:`end_request`. This ensures consistent reads,
 
935
        even if you read after an unacknowledged write.
 
936
 
 
937
        In Python 2.6 and above, or in Python 2.5 with
 
938
        "from __future__ import with_statement", :meth:`start_request` can be
 
939
        used as a context manager:
 
940
 
 
941
        >>> connection = pymongo.MongoClient(auto_start_request=False)
 
942
        >>> db = connection.test
 
943
        >>> _id = db.test_collection.insert({})
 
944
        >>> with connection.start_request():
 
945
        ...     for i in range(100):
 
946
        ...         db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
 
947
        ...
 
948
        ...     # Definitely read the document after the final update completes
 
949
        ...     print db.test_collection.find({'_id': _id})
 
950
 
 
951
        If a thread or greenlet calls start_request multiple times, an equal
 
952
        number of calls to :meth:`end_request` is required to end the request.
 
953
 
 
954
        .. versionchanged:: 2.4
 
955
           Now counts the number of calls to start_request and doesn't end
 
956
           request until an equal number of calls to end_request.
 
957
 
 
958
        .. versionadded:: 2.2
 
959
           The :class:`~pymongo.pool.Request` return value.
 
960
           :meth:`start_request` previously returned None
 
961
        """
 
962
        self.__pool.start_request()
 
963
        return pool.Request(self)
 
964
 
 
965
    def in_request(self):
 
966
        """True if this thread is in a request, meaning it has a socket
 
967
        reserved for its exclusive use.
 
968
        """
 
969
        return self.__pool.in_request()
 
970
 
 
971
    def end_request(self):
 
972
        """Undo :meth:`start_request`. If :meth:`end_request` is called as many
 
973
        times as :meth:`start_request`, the request is over and this thread's
 
974
        connection returns to the pool. Extra calls to :meth:`end_request` have
 
975
        no effect.
 
976
 
 
977
        Ending a request allows the :class:`~socket.socket` that has
 
978
        been reserved for this thread by :meth:`start_request` to be returned to
 
979
        the pool. Other threads will then be able to re-use that
 
980
        :class:`~socket.socket`. If your application uses many threads, or has
 
981
        long-running threads that infrequently perform MongoDB operations, then
 
982
        judicious use of this method can lead to performance gains. Care should
 
983
        be taken, however, to make sure that :meth:`end_request` is not called
 
984
        in the middle of a sequence of operations in which ordering is
 
985
        important. This could lead to unexpected results.
 
986
        """
 
987
        self.__pool.end_request()
 
988
 
 
989
    def __eq__(self, other):
 
990
        if isinstance(other, self.__class__):
 
991
            us = (self.__host, self.__port)
 
992
            them = (other.__host, other.__port)
 
993
            return us == them
 
994
        return NotImplemented
 
995
 
 
996
    def __ne__(self, other):
 
997
        return not self == other
 
998
 
 
999
    def __repr__(self):
 
1000
        if len(self.__nodes) == 1:
 
1001
            return "MongoClient(%r, %r)" % (self.__host, self.__port)
 
1002
        else:
 
1003
            return "MongoClient(%r)" % ["%s:%d" % n for n in self.__nodes]
 
1004
 
 
1005
    def __getattr__(self, name):
 
1006
        """Get a database by name.
 
1007
 
 
1008
        Raises :class:`~pymongo.errors.InvalidName` if an invalid
 
1009
        database name is used.
 
1010
 
 
1011
        :Parameters:
 
1012
          - `name`: the name of the database to get
 
1013
        """
 
1014
        return database.Database(self, name)
 
1015
 
 
1016
    def __getitem__(self, name):
 
1017
        """Get a database by name.
 
1018
 
 
1019
        Raises :class:`~pymongo.errors.InvalidName` if an invalid
 
1020
        database name is used.
 
1021
 
 
1022
        :Parameters:
 
1023
          - `name`: the name of the database to get
 
1024
        """
 
1025
        return self.__getattr__(name)
 
1026
 
 
1027
    def close_cursor(self, cursor_id):
 
1028
        """Close a single database cursor.
 
1029
 
 
1030
        Raises :class:`TypeError` if `cursor_id` is not an instance of
 
1031
        ``(int, long)``. What closing the cursor actually means
 
1032
        depends on this connection's cursor manager.
 
1033
 
 
1034
        :Parameters:
 
1035
          - `cursor_id`: id of cursor to close
 
1036
 
 
1037
        .. seealso:: :meth:`set_cursor_manager` and
 
1038
           the :mod:`~pymongo.cursor_manager` module
 
1039
        """
 
1040
        if not isinstance(cursor_id, (int, long)):
 
1041
            raise TypeError("cursor_id must be an instance of (int, long)")
 
1042
 
 
1043
        self.__cursor_manager.close(cursor_id)
 
1044
 
 
1045
    def kill_cursors(self, cursor_ids):
 
1046
        """Send a kill cursors message with the given ids.
 
1047
 
 
1048
        Raises :class:`TypeError` if `cursor_ids` is not an instance of
 
1049
        ``list``.
 
1050
 
 
1051
        :Parameters:
 
1052
          - `cursor_ids`: list of cursor ids to kill
 
1053
        """
 
1054
        if not isinstance(cursor_ids, list):
 
1055
            raise TypeError("cursor_ids must be a list")
 
1056
        return self._send_message(
 
1057
            message.kill_cursors(cursor_ids), check_primary=False)
 
1058
 
 
1059
    def server_info(self):
 
1060
        """Get information about the MongoDB server we're connected to.
 
1061
        """
 
1062
        return self.admin.command("buildinfo")
 
1063
 
 
1064
    def database_names(self):
 
1065
        """Get a list of the names of all databases on the connected server.
 
1066
        """
 
1067
        return [db["name"] for db in
 
1068
                self.admin.command("listDatabases")["databases"]]
 
1069
 
 
1070
    def drop_database(self, name_or_database):
 
1071
        """Drop a database.
 
1072
 
 
1073
        Raises :class:`TypeError` if `name_or_database` is not an instance of
 
1074
        :class:`basestring` (:class:`str` in python 3) or Database.
 
1075
 
 
1076
        :Parameters:
 
1077
          - `name_or_database`: the name of a database to drop, or a
 
1078
            :class:`~pymongo.database.Database` instance representing the
 
1079
            database to drop
 
1080
        """
 
1081
        name = name_or_database
 
1082
        if isinstance(name, database.Database):
 
1083
            name = name.name
 
1084
 
 
1085
        if not isinstance(name, basestring):
 
1086
            raise TypeError("name_or_database must be an instance of "
 
1087
                            "%s or Database" % (basestring.__name__,))
 
1088
 
 
1089
        self._purge_index(name)
 
1090
        self[name].command("dropDatabase")
 
1091
 
 
1092
    def copy_database(self, from_name, to_name,
 
1093
                      from_host=None, username=None, password=None):
 
1094
        """Copy a database, potentially from another host.
 
1095
 
 
1096
        Raises :class:`TypeError` if `from_name` or `to_name` is not
 
1097
        an instance of :class:`basestring` (:class:`str` in python 3).
 
1098
        Raises :class:`~pymongo.errors.InvalidName` if `to_name` is
 
1099
        not a valid database name.
 
1100
 
 
1101
        If `from_host` is ``None`` the current host is used as the
 
1102
        source. Otherwise the database is copied from `from_host`.
 
1103
 
 
1104
        If the source database requires authentication, `username` and
 
1105
        `password` must be specified.
 
1106
 
 
1107
        :Parameters:
 
1108
          - `from_name`: the name of the source database
 
1109
          - `to_name`: the name of the target database
 
1110
          - `from_host` (optional): host name to copy from
 
1111
          - `username` (optional): username for source database
 
1112
          - `password` (optional): password for source database
 
1113
 
 
1114
        .. note:: Specifying `username` and `password` requires server
 
1115
           version **>= 1.3.3+**.
 
1116
 
 
1117
        .. versionadded:: 1.5
 
1118
        """
 
1119
        if not isinstance(from_name, basestring):
 
1120
            raise TypeError("from_name must be an instance "
 
1121
                            "of %s" % (basestring.__name__,))
 
1122
        if not isinstance(to_name, basestring):
 
1123
            raise TypeError("to_name must be an instance "
 
1124
                            "of %s" % (basestring.__name__,))
 
1125
 
 
1126
        database._check_name(to_name)
 
1127
 
 
1128
        command = {"fromdb": from_name, "todb": to_name}
 
1129
 
 
1130
        if from_host is not None:
 
1131
            command["fromhost"] = from_host
 
1132
 
 
1133
        try:
 
1134
            self.start_request()
 
1135
 
 
1136
            if username is not None:
 
1137
                nonce = self.admin.command("copydbgetnonce",
 
1138
                                           fromhost=from_host)["nonce"]
 
1139
                command["username"] = username
 
1140
                command["nonce"] = nonce
 
1141
                command["key"] = helpers._auth_key(nonce, username, password)
 
1142
 
 
1143
            return self.admin.command("copydb", **command)
 
1144
        finally:
 
1145
            self.end_request()
 
1146
 
 
1147
    @property
 
1148
    def is_locked(self):
 
1149
        """Is this server locked? While locked, all write operations
 
1150
        are blocked, although read operations may still be allowed.
 
1151
        Use :meth:`unlock` to unlock.
 
1152
 
 
1153
        .. versionadded:: 2.0
 
1154
        """
 
1155
        ops = self.admin.current_op()
 
1156
        return bool(ops.get('fsyncLock', 0))
 
1157
 
 
1158
    def fsync(self, **kwargs):
 
1159
        """Flush all pending writes to datafiles.
 
1160
 
 
1161
        :Parameters:
 
1162
 
 
1163
            Optional parameters can be passed as keyword arguments:
 
1164
 
 
1165
            - `lock`: If True lock the server to disallow writes.
 
1166
            - `async`: If True don't block while synchronizing.
 
1167
 
 
1168
            .. warning:: `async` and `lock` can not be used together.
 
1169
 
 
1170
            .. warning:: MongoDB does not support the `async` option
 
1171
                         on Windows and will raise an exception on that
 
1172
                         platform.
 
1173
 
 
1174
        .. versionadded:: 2.0
 
1175
        """
 
1176
        self.admin.command("fsync", **kwargs)
 
1177
 
 
1178
    def unlock(self):
 
1179
        """Unlock a previously locked server.
 
1180
 
 
1181
        .. versionadded:: 2.0
 
1182
        """
 
1183
        self.admin['$cmd'].sys.unlock.find_one()
 
1184
 
 
1185
    def __enter__(self):
 
1186
        return self
 
1187
 
 
1188
    def __exit__(self, exc_type, exc_val, exc_tb):
 
1189
        self.disconnect()
 
1190
 
 
1191
    def __iter__(self):
 
1192
        return self
 
1193
 
 
1194
    def next(self):
 
1195
        raise TypeError("'MongoClient' object is not iterable")