1
# Copyright 2009-2012 10gen, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License"); you
4
# may not use this file except in compliance with the License. You
5
# may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
# implied. See the License for the specific language governing
13
# permissions and limitations under the License.
15
"""Tools for connecting to MongoDB.
17
.. seealso:: Module :mod:`~pymongo.master_slave_connection` for
18
connecting to master-slave clusters, and
19
:doc:`/examples/high_availability` for an example of how to connect
20
to a replica set, or specify a list of mongos instances for automatic
23
To get a :class:`~pymongo.database.Database` instance from a
24
:class:`MongoClient` use either dictionary-style or attribute-style
29
>>> from pymongo import MongoClient
32
Database(MongoClient('localhost', 27017), u'test_database')
33
>>> c['test-database']
34
Database(MongoClient('localhost', 27017), u'test-database')
44
from bson.py3compat import b
45
from bson.son import SON
46
from pymongo import (common,
52
from pymongo.cursor_manager import CursorManager
53
from pymongo.errors import (AutoReconnect,
63
def _partition_node(node):
64
"""Split a host:port string returned from mongod/s into
65
a (host, int(port)) pair needed for socket.connect().
71
host, port = node[:idx], int(node[idx + 1:])
72
if host.startswith('['):
77
class MongoClient(common.BaseObject):
78
"""Connection to MongoDB.
84
__max_bson_size = 4 * 1024 * 1024
86
def __init__(self, host=None, port=None, max_pool_size=10,
87
document_class=dict, tz_aware=False, _connect=True, **kwargs):
88
"""Create a new connection to a single MongoDB instance at *host:port*.
90
The resultant connection object has connection-pooling built
91
in. It also performs auto-reconnection when necessary. If an
92
operation fails because of a connection error,
93
:class:`~pymongo.errors.ConnectionFailure` is raised. If
94
auto-reconnection will be performed,
95
:class:`~pymongo.errors.AutoReconnect` will be
96
raised. Application code should handle this exception
97
(recognizing that the operation failed) and then continue to
100
Raises :class:`TypeError` if port is not an instance of
101
``int``. Raises :class:`~pymongo.errors.ConnectionFailure` if
102
the connection cannot be made.
104
The `host` parameter can be a full `mongodb URI
105
<http://dochub.mongodb.org/core/connections>`_, in addition to
106
a simple hostname. It can also be a list of hostnames or
107
URIs. Any port specified in the host string(s) will override
108
the `port` parameter. If multiple mongodb URIs containing
109
database or auth information are passed, the last database,
110
username, and password present will be used. For username and
111
passwords reserved characters like ':', '/', '+' and '@' must be
112
escaped following RFC 2396.
115
- `host` (optional): hostname or IP address of the
116
instance to connect to, or a mongodb URI, or a list of
117
hostnames / mongodb URIs. If `host` is an IPv6 literal
118
it must be enclosed in '[' and ']' characters following
119
the RFC2732 URL syntax (e.g. '[::1]' for localhost)
120
- `port` (optional): port number on which to connect
121
- `max_pool_size` (optional): The maximum number of idle connections
122
to keep open in the pool for future use
123
- `document_class` (optional): default class to use for
124
documents returned from queries on this connection
125
- `tz_aware` (optional): if ``True``,
126
:class:`~datetime.datetime` instances returned as values
127
in a document by this :class:`MongoClient` will be timezone
128
aware (otherwise they will be naive)
130
**Other optional parameters can be passed as keyword arguments:**
132
- `w`: (integer or string) If this is a replica set, write operations
133
will block until they have been replicated to the specified number
134
or tagged set of servers. `w=<int>` always includes the replica set
135
primary (e.g. w=3 means write to the primary and wait until
136
replicated to **two** secondaries). **Passing w=0 disables write
137
acknowledgement and all other write concern options.**
138
- `wtimeout`: (integer) Used in conjunction with `w`. Specify a value
139
in milliseconds to control how long to wait for write propagation
140
to complete. If replication does not complete in the given
141
timeframe, a timeout exception is raised.
142
- `j`: If ``True`` block until write operations have been committed
143
to the journal. Ignored if the server is running without journaling.
144
- `fsync`: If ``True`` force the database to fsync all files before
145
returning. When used with `j` the server awaits the next group
146
commit before returning.
147
- `replicaSet`: (string) The name of the replica set to connect to.
148
The driver will verify that the replica set it connects to matches
149
this name. Implies that the hosts specified are a seed list and the
150
driver should attempt to find all members of the set.
151
- `socketTimeoutMS`: (integer) How long (in milliseconds) a send or
152
receive on a socket can take before timing out.
153
- `connectTimeoutMS`: (integer) How long (in milliseconds) a
154
connection can take to be opened before timing out.
155
- `ssl`: If ``True``, create the connection to the server using SSL.
156
- `read_preference`: The read preference for this connection.
157
See :class:`~pymongo.read_preferences.ReadPreference` for available
159
- `auto_start_request`: If ``True``, each thread that accesses
160
this :class:`MongoClient` has a socket allocated to it for the
161
thread's lifetime. This ensures consistent reads, even if you
162
read after an unacknowledged write. Defaults to ``False``
163
- `use_greenlets`: If ``True``, :meth:`start_request()` will ensure
164
that the current greenlet uses the same socket for all
165
operations until :meth:`end_request()`
167
.. seealso:: :meth:`end_request`
169
.. mongodoc:: connections
171
.. versionadded:: 2.4
175
if isinstance(host, basestring):
179
if not isinstance(port, int):
180
raise TypeError("port must be an instance of int")
189
if entity.startswith("mongodb://"):
190
res = uri_parser.parse_uri(entity, port)
191
seeds.update(res["nodelist"])
192
username = res["username"] or username
193
password = res["password"] or password
194
db = res["database"] or db
195
opts = res["options"]
197
idx = entity.find("://")
198
raise InvalidURI("Invalid URI scheme: "
199
"%s" % (entity[:idx],))
201
seeds.update(uri_parser.split_hosts(entity, port))
203
raise ConfigurationError("need to specify at least one host")
208
self.__is_primary = False
209
self.__is_mongos = False
211
# _pool_class option is for deep customization of PyMongo, e.g. Motor.
212
# SHOULD NOT BE USED BY DEVELOPERS EXTERNAL TO 10GEN.
213
pool_class = kwargs.pop('_pool_class', pool.Pool)
216
for option, value in kwargs.iteritems():
217
option, value = common.validate(option, value)
218
options[option] = value
221
self.__max_pool_size = common.validate_positive_integer(
222
'max_pool_size', max_pool_size)
224
self.__cursor_manager = CursorManager(self)
226
self.__repl = options.get('replicaset')
227
if len(seeds) == 1 and not self.__repl:
230
self.__direct = False
233
self.__net_timeout = options.get('sockettimeoutms')
234
self.__conn_timeout = options.get('connecttimeoutms')
235
self.__use_ssl = options.get('ssl', False)
236
if self.__use_ssl and not pool.have_ssl:
237
raise ConfigurationError("The ssl module is not available. If you "
238
"are using a python version previous to "
239
"2.6 you must install the ssl package "
242
self.__use_greenlets = options.get('use_greenlets', False)
243
self.__pool = pool_class(
245
self.__max_pool_size,
249
use_greenlets=self.__use_greenlets)
251
self.__document_class = document_class
252
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
253
self.__auto_start_request = options.get('auto_start_request', False)
255
# cache of existing indexes used by ensure_index ops
256
self.__index_cache = {}
257
self.__auth_credentials = {}
259
super(MongoClient, self).__init__(**options)
261
warnings.warn("slave_okay is deprecated. Please "
262
"use read_preference instead.", DeprecationWarning)
266
self.__find_node(seeds)
267
except AutoReconnect, e:
268
# ConnectionFailure makes more sense here than AutoReconnect
269
raise ConnectionFailure(str(e))
271
if db and username is None:
272
warnings.warn("database name in URI is being ignored. If you wish "
273
"to authenticate to %s, you must provide a username "
274
"and password." % (db,))
277
if not self[db].authenticate(username, password):
278
raise ConfigurationError("authentication failed")
280
def _cached(self, dbname, coll, index):
281
"""Test if `index` is cached.
283
cache = self.__index_cache
284
now = datetime.datetime.utcnow()
285
return (dbname in cache and
286
coll in cache[dbname] and
287
index in cache[dbname][coll] and
288
now < cache[dbname][coll][index])
290
def _cache_index(self, database, collection, index, cache_for):
291
"""Add an index to the index cache for ensure_index operations.
293
now = datetime.datetime.utcnow()
294
expire = datetime.timedelta(seconds=cache_for) + now
296
if database not in self.__index_cache:
297
self.__index_cache[database] = {}
298
self.__index_cache[database][collection] = {}
299
self.__index_cache[database][collection][index] = expire
301
elif collection not in self.__index_cache[database]:
302
self.__index_cache[database][collection] = {}
303
self.__index_cache[database][collection][index] = expire
306
self.__index_cache[database][collection][index] = expire
308
def _purge_index(self, database_name,
309
collection_name=None, index_name=None):
310
"""Purge an index from the index cache.
312
If `index_name` is None purge an entire collection.
314
If `collection_name` is None purge an entire database.
316
if not database_name in self.__index_cache:
319
if collection_name is None:
320
del self.__index_cache[database_name]
323
if not collection_name in self.__index_cache[database_name]:
326
if index_name is None:
327
del self.__index_cache[database_name][collection_name]
330
if index_name in self.__index_cache[database_name][collection_name]:
331
del self.__index_cache[database_name][collection_name][index_name]
333
def _cache_credentials(self, db_name, username, password):
334
"""Add credentials to the database authentication cache
335
for automatic login when a socket is created.
337
If credentials are already cached for `db_name` they
340
self.__auth_credentials[db_name] = (username, password)
342
def _purge_credentials(self, db_name=None):
343
"""Purge credentials from the database authentication cache.
345
If `db_name` is None purge credentials for all databases.
348
self.__auth_credentials.clear()
349
elif db_name in self.__auth_credentials:
350
del self.__auth_credentials[db_name]
352
def __check_auth(self, sock_info):
353
"""Authenticate using cached database credentials.
355
authset = sock_info.authset
356
names = set(self.__auth_credentials.iterkeys())
358
# Logout from any databases no longer listed in the credentials cache.
359
for dbname in authset - names:
361
self.__simple_command(sock_info, dbname, {'logout': 1})
362
# TODO: We used this socket to logout. Fix logout so we don't
363
# have to catch this.
364
except OperationFailure:
366
authset.discard(dbname)
368
for db_name in names - authset:
369
user, pwd = self.__auth_credentials[db_name]
370
self.__auth(sock_info, db_name, user, pwd)
375
"""Current connected host.
377
.. versionchanged:: 1.3
378
``host`` is now a property rather than a method.
384
"""Current connected port.
386
.. versionchanged:: 1.3
387
``port`` is now a property rather than a method.
392
def is_primary(self):
393
"""If this instance is connected to a standalone, a replica-set
394
primary, or the master of a master-slave set.
396
.. versionadded:: 2.3
398
return self.__is_primary
402
"""If this instance is connected to mongos.
404
.. versionadded:: 2.3
406
return self.__is_mongos
409
def max_pool_size(self):
410
"""The maximum number of idle connections kept open in the pool for
413
.. note:: ``max_pool_size`` does not cap the number of concurrent
414
connections to the server; there is currently no way to limit the
415
number of connections. ``max_pool_size`` only limits the number of
416
**idle** connections kept open when they are returned to the pool.
418
.. versionadded:: 1.11
420
return self.__max_pool_size
423
def use_greenlets(self):
424
"""Whether calling :meth:`start_request` assigns greenlet-local,
425
rather than thread-local, sockets.
427
.. versionadded:: 2.4.2
429
return self.__use_greenlets
433
"""List of all known nodes.
435
Includes both nodes specified when this instance was created,
436
as well as nodes discovered through the replica set discovery
439
.. versionadded:: 1.8
444
def auto_start_request(self):
445
"""Is auto_start_request enabled?
447
return self.__auto_start_request
449
def get_document_class(self):
450
return self.__document_class
452
def set_document_class(self, klass):
453
self.__document_class = klass
455
document_class = property(get_document_class, set_document_class,
456
doc="""Default class to use for documents
457
returned on this connection.
459
.. versionadded:: 1.7
464
"""Does this connection return timezone-aware datetimes?
466
.. versionadded:: 1.8
468
return self.__tz_aware
471
def max_bson_size(self):
472
"""Return the maximum size BSON object the connected server
473
accepts in bytes. Defaults to 4MB in server < 1.7.4.
475
.. versionadded:: 1.10
477
return self.__max_bson_size
479
def __simple_command(self, sock_info, dbname, spec):
480
"""Send a command to the server.
482
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
485
sock_info.sock.sendall(msg)
486
response = self.__receive_message_on_socket(1, rqst_id, sock_info)
492
response = helpers._unpack_response(response)['data'][0]
493
msg = "command %r failed: %%s" % spec
494
helpers._check_command_response(response, None, msg)
495
return response, end - start
497
def __auth(self, sock_info, dbname, user, passwd):
498
"""Authenticate socket against database `dbname`.
501
response, _ = self.__simple_command(sock_info, dbname, {'getnonce': 1})
502
nonce = response['nonce']
503
key = helpers._auth_key(nonce, user, passwd)
505
# Actually authenticate
506
query = SON([('authenticate', 1),
507
('user', user), ('nonce', nonce), ('key', key)])
508
self.__simple_command(sock_info, dbname, query)
510
def __try_node(self, node):
511
"""Try to connect to this node and see if it works for our connection
512
type. Returns ((host, port), ismaster, isdbgrid, res_time).
515
- `node`: The (host, port) pair to try.
518
self.__host, self.__port = node
520
# Call 'ismaster' directly so we can get a response time.
521
sock_info = self.__socket()
522
response, res_time = self.__simple_command(sock_info,
525
self.__pool.maybe_return_socket(sock_info)
527
# Are we talking to a mongos?
528
isdbgrid = response.get('msg', '') == 'isdbgrid'
530
if "maxBsonObjectSize" in response:
531
self.__max_bson_size = response["maxBsonObjectSize"]
534
if not self.__direct:
535
# Check that this host is part of the given replica set.
537
set_name = response.get('setName')
538
# The 'setName' field isn't returned by mongod before 1.6.2
539
# so we can't assume that if it's missing this host isn't in
541
if set_name and set_name != self.__repl:
542
raise ConfigurationError("%s:%d is not a member of "
544
% (node[0], node[1], self.__repl))
545
if "hosts" in response:
546
self.__nodes = set([_partition_node(h)
547
for h in response["hosts"]])
549
# The user passed a seed list of standalone or
551
self.__nodes.add(node)
552
if response["ismaster"]:
553
return node, True, isdbgrid, res_time
554
elif "primary" in response:
555
candidate = _partition_node(response["primary"])
556
return self.__try_node(candidate)
558
# Explain why we aren't using this connection.
559
raise AutoReconnect('%s:%d is not primary or master' % node)
562
if response.get("arbiterOnly", False) and not self.__direct:
563
raise ConfigurationError("%s:%d is an arbiter" % node)
564
return node, response['ismaster'], isdbgrid, res_time
566
def __pick_nearest(self, candidates):
567
"""Return the 'nearest' candidate based on response time.
569
latency = self.secondary_acceptable_latency_ms
570
# Only used for mongos high availability, res_time is in seconds.
571
fastest = min([res_time for candidate, res_time in candidates])
573
candidate for candidate, res_time in candidates
574
if res_time - fastest < latency / 1000.0
577
node = random.choice(near_candidates)
578
# Clear the pool from the last choice.
580
self.__host, self.__port = node
583
def __find_node(self, seeds=None):
584
"""Find a host, port pair suitable for our connection type.
586
If only one host was supplied to __init__ see if we can connect
587
to it. Don't check if the host is a master/primary so we can make
588
a direct connection to read from a secondary or send commands to
591
If more than one host was supplied treat them as a seed list for
592
connecting to a replica set or to support high availability for
593
mongos. If connecting to a replica set try to find the primary
594
and fail if we can't, possibly updating any replSet information
595
on success. If a mongos seed list was provided find the "nearest"
596
mongos and return it.
598
Otherwise we iterate through the list trying to find a host we can
599
send write operations to.
601
Sets __host and __port so that :attr:`host` and :attr:`port`
602
will return the address of the connected host. Sets __is_primary to
603
True if this is a primary or master, else False. Sets __is_mongos
604
to True if the connection is to a mongos.
607
mongos_candidates = []
608
candidates = seeds or self.__nodes.copy()
609
for candidate in candidates:
611
node, ismaster, isdbgrid, res_time = self.__try_node(candidate)
612
self.__is_primary = ismaster
613
self.__is_mongos = isdbgrid
614
# No need to calculate nearest if we only have one mongos.
615
if isdbgrid and not self.__direct:
616
mongos_candidates.append((node, res_time))
618
elif len(mongos_candidates):
619
raise ConfigurationError("Seed list cannot contain a mix "
620
"of mongod and mongos instances.")
622
except Exception, why:
623
errors.append(str(why))
625
# If we have a mongos seed list, pick the "nearest" member.
626
if len(mongos_candidates):
627
self.__is_mongos = True
628
return self.__pick_nearest(mongos_candidates)
630
# Otherwise, try any hosts we discovered that were not in the seed list.
631
for candidate in self.__nodes - candidates:
633
node, ismaster, isdbgrid, _ = self.__try_node(candidate)
634
self.__is_primary = ismaster
635
self.__is_mongos = isdbgrid
637
except Exception, why:
638
errors.append(str(why))
639
# Couldn't find a suitable host.
641
raise AutoReconnect(', '.join(errors))
644
"""Get a SocketInfo from the pool.
646
host, port = (self.__host, self.__port)
647
if host is None or (port is None and '/' not in host):
648
host, port = self.__find_node()
651
if self.auto_start_request and not self.in_request():
654
sock_info = self.__pool.get_socket((host, port))
655
except socket.error, why:
658
# Check if a unix domain socket
659
if host.endswith('.sock'):
660
host_details = "%s:" % host
662
host_details = "%s:%d:" % (host, port)
663
raise AutoReconnect("could not connect to "
664
"%s %s" % (host_details, str(why)))
665
if self.__auth_credentials:
666
self.__check_auth(sock_info)
669
def disconnect(self):
670
"""Disconnect from MongoDB.
672
Disconnecting will close all underlying sockets in the connection
673
pool. If this instance is used again it will be automatically
674
re-opened. Care should be taken to make sure that :meth:`disconnect`
675
is not called in the middle of a sequence of operations in which
676
ordering is important. This could lead to unexpected results.
678
.. seealso:: :meth:`end_request`
679
.. versionadded:: 1.3
686
"""Alias for :meth:`disconnect`
688
Disconnecting will close all underlying sockets in the connection
689
pool. If this instance is used again it will be automatically
690
re-opened. Care should be taken to make sure that :meth:`disconnect`
691
is not called in the middle of a sequence of operations in which
692
ordering is important. This could lead to unexpected results.
694
.. seealso:: :meth:`end_request`
695
.. versionadded:: 2.1
700
"""Return ``False`` if there has been an error communicating with the
701
server, else ``True``.
703
This method attempts to check the status of the server with minimal I/O.
704
The current thread / greenlet retrieves a socket from the pool (its
705
request socket if it's in a request, or a random idle socket if it's not
706
in a request) and checks whether calling `select`_ on it raises an
707
error. If there are currently no idle sockets, :meth:`alive` will
708
attempt to actually connect to the server.
710
A more certain way to determine server availability is::
712
connection.admin.command('ping')
714
.. _select: http://docs.python.org/2/library/select.html#select.select
716
# In the common case, a socket is available and was used recently, so
717
# calling select() on it is a reasonable attempt to see if the OS has
718
# reported an error. Note this can be wasteful: __socket implicitly
719
# calls select() if the socket hasn't been checked in the last second,
720
# or it may create a new socket, in which case calling select() is
723
sock_info = self.__socket()
724
return not pool._closed(sock_info.sock)
725
except (socket.error, ConnectionFailure):
728
def set_cursor_manager(self, manager_class):
729
"""Set this connection's cursor manager.
731
Raises :class:`TypeError` if `manager_class` is not a subclass of
732
:class:`~pymongo.cursor_manager.CursorManager`. A cursor manager
733
handles closing cursors. Different managers can implement different
734
policies in terms of when to actually kill a cursor that has
738
- `manager_class`: cursor manager to use
740
.. versionchanged:: 2.1+
741
Deprecated support for external cursor managers.
743
warnings.warn("Support for external cursor managers is deprecated "
744
"and will be removed in PyMongo 3.0.", DeprecationWarning)
745
manager = manager_class(self)
746
if not isinstance(manager, CursorManager):
747
raise TypeError("manager_class must be a subclass of "
750
self.__cursor_manager = manager
752
def __check_response_to_last_error(self, response):
753
"""Check a response to a lastError message for errors.
755
`response` is a byte string representing a response to the message.
756
If it represents an error response we raise OperationFailure.
758
Return the response as a document.
760
response = helpers._unpack_response(response)
762
assert response["number_returned"] == 1
763
error = response["data"][0]
765
helpers._check_command_response(error, self.disconnect)
767
error_msg = error.get("err", "")
768
if error_msg is None:
770
if error_msg.startswith("not master"):
772
raise AutoReconnect(error_msg)
775
# mongos returns the error code in an error object
777
if "errObjects" in error:
778
for errobj in error["errObjects"]:
779
if errobj["err"] == error_msg:
783
if "code" in details:
784
if details["code"] in [11000, 11001, 12582]:
785
raise DuplicateKeyError(details["err"])
787
raise OperationFailure(details["err"], details["code"])
789
raise OperationFailure(details["err"])
791
def __check_bson_size(self, message):
792
"""Make sure the message doesn't include BSON documents larger
793
than the connected server will accept.
796
- `message`: message to check
798
if len(message) == 3:
799
(request_id, data, max_doc_size) = message
800
if max_doc_size > self.__max_bson_size:
801
raise InvalidDocument("BSON document too large (%d bytes)"
802
" - the connected server supports"
803
" BSON document sizes up to %d"
805
(max_doc_size, self.__max_bson_size))
806
return (request_id, data)
808
# get_more and kill_cursors messages
809
# don't include BSON documents.
812
def _send_message(self, message, with_last_error=False, check_primary=True):
813
"""Say something to Mongo.
815
Raises ConnectionFailure if the message cannot be sent. Raises
816
OperationFailure if `with_last_error` is ``True`` and the
817
response to the getLastError call returns an error. Return the
818
response from lastError, or ``None`` if `with_last_error`
822
- `message`: message to send
823
- `with_last_error`: check getLastError status after sending the
825
- `check_primary`: don't try to write to a non-primary; see
826
kill_cursors for an exception to this rule
828
if check_primary and not with_last_error and not self.is_primary:
829
# The write won't succeed, bail as if we'd done a getLastError
830
raise AutoReconnect("not master")
832
sock_info = self.__socket()
834
(request_id, data) = self.__check_bson_size(message)
835
sock_info.sock.sendall(data)
836
# Safe mode. We pack the message together with a lastError
837
# message and send both. We then get the response (to the
838
# lastError) and raise OperationFailure if it is an error
842
response = self.__receive_message_on_socket(1, request_id,
844
rv = self.__check_response_to_last_error(response)
846
self.__pool.maybe_return_socket(sock_info)
848
except OperationFailure:
849
self.__pool.maybe_return_socket(sock_info)
851
except (ConnectionFailure, socket.error), e:
853
raise AutoReconnect(str(e))
858
def __receive_data_on_socket(self, length, sock_info):
859
"""Lowest level receive operation.
861
Takes length to receive and repeatedly calls recv until able to
862
return a buffer of that length, raising ConnectionFailure on error.
866
chunk = sock_info.sock.recv(length)
868
raise ConnectionFailure("connection closed")
873
def __receive_message_on_socket(self, operation, request_id, sock_info):
874
"""Receive a message in response to `request_id` on `sock`.
876
Returns the response data with the header removed.
878
header = self.__receive_data_on_socket(16, sock_info)
879
length = struct.unpack("<i", header[:4])[0]
880
assert request_id == struct.unpack("<i", header[8:12])[0], \
881
"ids don't match %r %r" % (request_id,
882
struct.unpack("<i", header[8:12])[0])
883
assert operation == struct.unpack("<i", header[12:])[0]
885
return self.__receive_data_on_socket(length - 16, sock_info)
887
def __send_and_receive(self, message, sock_info):
888
"""Send a message on the given socket and return the response data.
890
(request_id, data) = self.__check_bson_size(message)
892
sock_info.sock.sendall(data)
893
return self.__receive_message_on_socket(1, request_id, sock_info)
898
# we just ignore _must_use_master here: it's only relevant for
899
# MasterSlaveConnection instances.
900
def _send_message_with_response(self, message,
901
_must_use_master=False, **kwargs):
902
"""Send a message to Mongo and return the response.
904
Sends the given message and returns the response.
907
- `message`: (request_id, data) pair making up the message to send
909
sock_info = self.__socket()
913
if "network_timeout" in kwargs:
914
sock_info.sock.settimeout(kwargs["network_timeout"])
915
return self.__send_and_receive(message, sock_info)
916
except (ConnectionFailure, socket.error), e:
918
raise AutoReconnect(str(e))
920
if "network_timeout" in kwargs:
922
# Restore the socket's original timeout and return it to
924
sock_info.sock.settimeout(self.__net_timeout)
925
self.__pool.maybe_return_socket(sock_info)
927
# There was an exception and we've closed the socket
930
self.__pool.maybe_return_socket(sock_info)
932
def start_request(self):
933
"""Ensure the current thread or greenlet always uses the same socket
934
until it calls :meth:`end_request`. This ensures consistent reads,
935
even if you read after an unacknowledged write.
937
In Python 2.6 and above, or in Python 2.5 with
938
"from __future__ import with_statement", :meth:`start_request` can be
939
used as a context manager:
941
>>> connection = pymongo.MongoClient(auto_start_request=False)
942
>>> db = connection.test
943
>>> _id = db.test_collection.insert({})
944
>>> with connection.start_request():
945
... for i in range(100):
946
... db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
948
... # Definitely read the document after the final update completes
949
... print db.test_collection.find({'_id': _id})
951
If a thread or greenlet calls start_request multiple times, an equal
952
number of calls to :meth:`end_request` is required to end the request.
954
.. versionchanged:: 2.4
955
Now counts the number of calls to start_request and doesn't end
956
request until an equal number of calls to end_request.
958
.. versionadded:: 2.2
959
The :class:`~pymongo.pool.Request` return value.
960
:meth:`start_request` previously returned None
962
self.__pool.start_request()
963
return pool.Request(self)
965
def in_request(self):
966
"""True if this thread is in a request, meaning it has a socket
967
reserved for its exclusive use.
969
return self.__pool.in_request()
971
def end_request(self):
972
"""Undo :meth:`start_request`. If :meth:`end_request` is called as many
973
times as :meth:`start_request`, the request is over and this thread's
974
connection returns to the pool. Extra calls to :meth:`end_request` have
977
Ending a request allows the :class:`~socket.socket` that has
978
been reserved for this thread by :meth:`start_request` to be returned to
979
the pool. Other threads will then be able to re-use that
980
:class:`~socket.socket`. If your application uses many threads, or has
981
long-running threads that infrequently perform MongoDB operations, then
982
judicious use of this method can lead to performance gains. Care should
983
be taken, however, to make sure that :meth:`end_request` is not called
984
in the middle of a sequence of operations in which ordering is
985
important. This could lead to unexpected results.
987
self.__pool.end_request()
989
def __eq__(self, other):
990
if isinstance(other, self.__class__):
991
us = (self.__host, self.__port)
992
them = (other.__host, other.__port)
994
return NotImplemented
996
def __ne__(self, other):
997
return not self == other
1000
if len(self.__nodes) == 1:
1001
return "MongoClient(%r, %r)" % (self.__host, self.__port)
1003
return "MongoClient(%r)" % ["%s:%d" % n for n in self.__nodes]
1005
def __getattr__(self, name):
1006
"""Get a database by name.
1008
Raises :class:`~pymongo.errors.InvalidName` if an invalid
1009
database name is used.
1012
- `name`: the name of the database to get
1014
return database.Database(self, name)
1016
def __getitem__(self, name):
1017
"""Get a database by name.
1019
Raises :class:`~pymongo.errors.InvalidName` if an invalid
1020
database name is used.
1023
- `name`: the name of the database to get
1025
return self.__getattr__(name)
1027
def close_cursor(self, cursor_id):
1028
"""Close a single database cursor.
1030
Raises :class:`TypeError` if `cursor_id` is not an instance of
1031
``(int, long)``. What closing the cursor actually means
1032
depends on this connection's cursor manager.
1035
- `cursor_id`: id of cursor to close
1037
.. seealso:: :meth:`set_cursor_manager` and
1038
the :mod:`~pymongo.cursor_manager` module
1040
if not isinstance(cursor_id, (int, long)):
1041
raise TypeError("cursor_id must be an instance of (int, long)")
1043
self.__cursor_manager.close(cursor_id)
1045
def kill_cursors(self, cursor_ids):
1046
"""Send a kill cursors message with the given ids.
1048
Raises :class:`TypeError` if `cursor_ids` is not an instance of
1052
- `cursor_ids`: list of cursor ids to kill
1054
if not isinstance(cursor_ids, list):
1055
raise TypeError("cursor_ids must be a list")
1056
return self._send_message(
1057
message.kill_cursors(cursor_ids), check_primary=False)
1059
def server_info(self):
1060
"""Get information about the MongoDB server we're connected to.
1062
return self.admin.command("buildinfo")
1064
def database_names(self):
1065
"""Get a list of the names of all databases on the connected server.
1067
return [db["name"] for db in
1068
self.admin.command("listDatabases")["databases"]]
1070
def drop_database(self, name_or_database):
1073
Raises :class:`TypeError` if `name_or_database` is not an instance of
1074
:class:`basestring` (:class:`str` in python 3) or Database.
1077
- `name_or_database`: the name of a database to drop, or a
1078
:class:`~pymongo.database.Database` instance representing the
1081
name = name_or_database
1082
if isinstance(name, database.Database):
1085
if not isinstance(name, basestring):
1086
raise TypeError("name_or_database must be an instance of "
1087
"%s or Database" % (basestring.__name__,))
1089
self._purge_index(name)
1090
self[name].command("dropDatabase")
1092
def copy_database(self, from_name, to_name,
1093
from_host=None, username=None, password=None):
1094
"""Copy a database, potentially from another host.
1096
Raises :class:`TypeError` if `from_name` or `to_name` is not
1097
an instance of :class:`basestring` (:class:`str` in python 3).
1098
Raises :class:`~pymongo.errors.InvalidName` if `to_name` is
1099
not a valid database name.
1101
If `from_host` is ``None`` the current host is used as the
1102
source. Otherwise the database is copied from `from_host`.
1104
If the source database requires authentication, `username` and
1105
`password` must be specified.
1108
- `from_name`: the name of the source database
1109
- `to_name`: the name of the target database
1110
- `from_host` (optional): host name to copy from
1111
- `username` (optional): username for source database
1112
- `password` (optional): password for source database
1114
.. note:: Specifying `username` and `password` requires server
1115
version **>= 1.3.3+**.
1117
.. versionadded:: 1.5
1119
if not isinstance(from_name, basestring):
1120
raise TypeError("from_name must be an instance "
1121
"of %s" % (basestring.__name__,))
1122
if not isinstance(to_name, basestring):
1123
raise TypeError("to_name must be an instance "
1124
"of %s" % (basestring.__name__,))
1126
database._check_name(to_name)
1128
command = {"fromdb": from_name, "todb": to_name}
1130
if from_host is not None:
1131
command["fromhost"] = from_host
1134
self.start_request()
1136
if username is not None:
1137
nonce = self.admin.command("copydbgetnonce",
1138
fromhost=from_host)["nonce"]
1139
command["username"] = username
1140
command["nonce"] = nonce
1141
command["key"] = helpers._auth_key(nonce, username, password)
1143
return self.admin.command("copydb", **command)
1148
def is_locked(self):
1149
"""Is this server locked? While locked, all write operations
1150
are blocked, although read operations may still be allowed.
1151
Use :meth:`unlock` to unlock.
1153
.. versionadded:: 2.0
1155
ops = self.admin.current_op()
1156
return bool(ops.get('fsyncLock', 0))
1158
def fsync(self, **kwargs):
1159
"""Flush all pending writes to datafiles.
1163
Optional parameters can be passed as keyword arguments:
1165
- `lock`: If True lock the server to disallow writes.
1166
- `async`: If True don't block while synchronizing.
1168
.. warning:: `async` and `lock` can not be used together.
1170
.. warning:: MongoDB does not support the `async` option
1171
on Windows and will raise an exception on that
1174
.. versionadded:: 2.0
1176
self.admin.command("fsync", **kwargs)
1179
"""Unlock a previously locked server.
1181
.. versionadded:: 2.0
1183
self.admin['$cmd'].sys.unlock.find_one()
1185
def __enter__(self):
1188
def __exit__(self, exc_type, exc_val, exc_tb):
1195
raise TypeError("'MongoClient' object is not iterable")