59
if sys.platform.startswith('java'):
60
from select import cpython_compatible_select as select
62
from select import select
65
59
MAX_BSON_SIZE = 4 * 1024 * 1024
69
"""Return True if we know socket has been closed, False otherwise.
72
readers, _, _ = select([sock], [], [], 0)
73
# Any exception here is equally bad (select.error, ValueError, etc.).
76
return len(readers) > 0
79
62
def _partition_node(node):
80
63
"""Split a host:port string returned from mongod/s into
81
64
a (host, int(port)) pair needed for socket.connect().
93
class Monitor(threading.Thread):
94
def __init__(self, obj, interval=5):
95
super(Monitor, self).__init__()
96
self.obj = weakref.proxy(obj)
97
self.interval = interval
103
# The connection object has been
104
# collected so we should die.
105
except ReferenceError:
109
time.sleep(self.interval)
79
from gevent import Greenlet
82
class GreenletMonitor(Greenlet):
83
def __init__(self, obj, interval=5):
84
Greenlet.__init__(self)
85
self.obj = weakref.proxy(obj)
86
self.interval = interval
92
# The connection object has been
93
# collected so we should die.
94
except ReferenceError:
98
gevent.sleep(self.interval)
112
104
class ReplicaSetConnection(common.BaseObject):
176
168
- `ssl`: If True, create the connection to the servers using SSL.
177
169
- `read_preference`: The read preference for this connection.
178
170
See :class:`~pymongo.ReadPreference` for available options.
171
- `auto_start_request`: If True (the default), each thread that
172
accesses this :class:`ReplicaSetConnection` has a socket allocated
173
to it for the thread's lifetime, for each member of the set. For
174
:class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
175
ensures consistent reads, even if you read after an unsafe
176
write. For read preferences other than PRIMARY, there are no
177
consistency guarantees. (The semantics of auto_start_request,
178
:class:`~pymongo.ReadPreference`, and :class:`ReplicaSetConnection`
179
may change in future releases of PyMongo.)
180
- `use_greenlets` (optional): if ``True``, use a background Greenlet
181
instead of a background thread to monitor state of replica set.
182
:meth:`start_request()` will ensure that the current greenlet uses
183
the same socket for all operations until :meth:`end_request()`.
184
`use_greenlets` with ReplicaSetConnection requires `Gevent
185
<http://gevent.org/>`_ to be installed.
179
186
- `slave_okay` or `slaveOk` (deprecated): Use `read_preference`
188
- `host`: For compatibility with connection.Connection. If both
189
`host` and `hosts_or_uri` are specified `host` takes precedence.
190
- `port`: For compatibility with connection.Connection. The default
191
port number to use for hosts.
192
- `network_timeout`: For compatibility with connection.Connection.
193
The timeout (in seconds) to use for socket operations - default
194
is no timeout. If both `network_timeout` and `socketTimeoutMS` are
195
specified `network_timeout` takes precedence, matching
196
connection.Connection.
199
.. versionchanged:: 2.2
200
Added `auto_start_request` and `use_greenlets` options.
201
Added support for `host`, `port`, and `network_timeout` keyword
202
arguments for compatibility with connection.Connection.
182
203
.. versionadded:: 2.1
184
self.__max_pool_size = max_pool_size
185
self.__document_class = document_class
186
self.__tz_aware = tz_aware
188
206
self.__seeds = set()
189
207
self.__hosts = None
193
211
self.__pools = {}
194
212
self.__index_cache = {}
195
213
self.__auth_credentials = {}
216
self.__max_pool_size = common.validate_positive_integer(
217
'max_pool_size', max_pool_size)
218
self.__tz_aware = common.validate_boolean('tz_aware', tz_aware)
219
self.__document_class = document_class
221
# Compatibility with connection.Connection
222
host = kwargs.pop('host', hosts_or_uri)
224
port = kwargs.pop('port', 27017)
225
if not isinstance(port, int):
226
raise TypeError("port must be an instance of int")
228
network_timeout = kwargs.pop('network_timeout', None)
229
if network_timeout is not None:
230
if (not isinstance(network_timeout, (int, float)) or
231
network_timeout <= 0):
232
raise ConfigurationError("network_timeout must "
233
"be a positive integer")
198
if hosts_or_uri is None:
199
self.__seeds.add(('localhost', 27017))
200
elif '://' in hosts_or_uri:
201
res = uri_parser.parse_uri(hosts_or_uri)
238
self.__seeds.add(('localhost', port))
240
res = uri_parser.parse_uri(host, port)
202
241
self.__seeds.update(res['nodelist'])
203
242
username = res['username']
204
243
password = res['password']
205
244
db_name = res['database']
206
245
self.__opts = res['options']
208
self.__seeds.update(uri_parser.split_hosts(hosts_or_uri))
247
self.__seeds.update(uri_parser.split_hosts(host, port))
210
249
for option, value in kwargs.iteritems():
211
250
option, value = common.validate(option, value)
212
251
self.__opts[option] = value
253
if self.__opts.get('use_greenlets', False):
255
raise ConfigurationError(
256
"The gevent module is not available. "
257
"Install the gevent package from PyPI."
259
self.pool_class = pool.GreenletPool
261
self.pool_class = pool.Pool
263
self.__auto_start_request = self.__opts.get('auto_start_request', True)
264
self.__in_request = self.__auto_start_request
214
265
self.__name = self.__opts.get('replicaset')
215
266
if not self.__name:
216
267
raise ConfigurationError("the replicaSet "
217
268
"keyword parameter is required.")
218
self.__net_timeout = self.__opts.get('sockettimeoutms')
271
self.__net_timeout = (network_timeout or
272
self.__opts.get('sockettimeoutms'))
219
273
self.__conn_timeout = self.__opts.get('connecttimeoutms')
220
274
self.__use_ssl = self.__opts.get('ssl', False)
221
275
if self.__use_ssl and not pool.have_ssl:
316
392
elif db_name in self.__auth_credentials:
317
393
del self.__auth_credentials[db_name]
319
def __check_auth(self, sock, authset):
395
def __check_auth(self, sock_info):
320
396
"""Authenticate using cached database credentials.
322
398
If credentials for the 'admin' database are available only
323
399
this database is authenticated, since this gives global access.
401
authset = sock_info.authset
325
402
names = set(self.__auth_credentials.iterkeys())
327
404
# Logout from any databases no longer listed in the credentials cache.
328
405
for dbname in authset - names:
330
self.__simple_command(sock, dbname, {'logout': 1})
407
self.__simple_command(sock_info, dbname, {'logout': 1})
331
408
# TODO: We used this socket to logout. Fix logout so we don't
332
409
# have to catch this.
333
410
except OperationFailure:
341
418
if "admin" in self.__auth_credentials:
342
419
username, password = self.__auth_credentials["admin"]
343
self.__auth(sock, 'admin', username, password)
420
self.__auth(sock_info, 'admin', username, password)
344
421
authset.add('admin')
346
423
for db_name in names - authset:
347
424
user, pwd = self.__auth_credentials[db_name]
348
self.__auth(sock, db_name, user, pwd)
425
self.__auth(sock_info, db_name, user, pwd)
349
426
authset.add(db_name)
417
494
return self.__pools[self.__writer]['max_bson_size']
420
def __simple_command(self, sock, dbname, spec):
498
def auto_start_request(self):
499
return self.__auto_start_request
501
def __simple_command(self, sock_info, dbname, spec):
421
502
"""Send a command to the server.
423
504
rqst_id, msg, _ = message.query(0, dbname + '.$cmd', 0, -1, spec)
425
response = self.__recv_msg(1, rqst_id, sock)
505
sock_info.sock.sendall(msg)
506
response = self.__recv_msg(1, rqst_id, sock_info)
426
507
response = helpers._unpack_response(response)['data'][0]
427
508
msg = "command %r failed: %%s" % spec
428
509
helpers._check_command_response(response, None, msg)
431
def __auth(self, sock, dbname, user, passwd):
432
"""Authenticate socket `sock` against database `dbname`.
512
def __auth(self, sock_info, dbname, user, passwd):
513
"""Authenticate socket against database `dbname`.
435
response = self.__simple_command(sock, dbname, {'getnonce': 1})
516
response = self.__simple_command(sock_info, dbname, {'getnonce': 1})
436
517
nonce = response['nonce']
437
518
key = helpers._auth_key(nonce, user, passwd)
439
520
# Actually authenticate
440
521
query = SON([('authenticate', 1),
441
522
('user', user), ('nonce', nonce), ('key', key)])
442
self.__simple_command(sock, dbname, query)
523
self.__simple_command(sock_info, dbname, query)
444
525
def __is_master(self, host):
445
526
"""Directly call ismaster.
447
mongo = pool.Pool(host, self.__max_pool_size,
448
self.__net_timeout, self.__conn_timeout,
450
sock = mongo.get_socket()[0]
451
response = self.__simple_command(sock, 'admin', {'ismaster': 1})
452
return response, mongo
528
pool = self.pool_class(host, self.__max_pool_size,
529
self.__net_timeout, self.__conn_timeout,
531
sock_info = pool.get_socket()
532
response = self.__simple_command(
533
sock_info, 'admin', {'ismaster': 1}
536
pool.return_socket(sock_info)
537
return response, pool
454
539
def __update_pools(self):
455
540
"""Update the mapping of (host, port) pairs to connection pools.
458
543
for host in self.__hosts:
544
mongo, sock_info = None, None
461
546
if host in self.__pools:
462
547
mongo = self.__pools[host]
463
sock = self.__socket(mongo)
464
res = self.__simple_command(sock, 'admin', {'ismaster': 1})
548
sock_info = self.__socket(mongo)
549
res = self.__simple_command(sock_info, 'admin', {'ismaster': 1})
550
mongo['pool'].return_socket(sock_info)
466
552
res, conn = self.__is_master(host)
467
553
bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
536
623
def __check_is_primary(self, host):
537
624
"""Checks if this host is the primary for the replica set.
626
mongo, sock_info = None, None
541
628
if host in self.__pools:
542
629
mongo = self.__pools[host]
543
sock = self.__socket(mongo)
544
res = self.__simple_command(sock, 'admin', {'ismaster': 1})
630
sock_info = self.__socket(mongo)
631
res = self.__simple_command(
632
sock_info, 'admin', {'ismaster': 1}
546
635
res, conn = self.__is_master(host)
547
636
bson_max = res.get('maxBsonObjectSize', MAX_BSON_SIZE)
585
677
raise AutoReconnect(', '.join(errors))
587
679
def __socket(self, mongo):
588
"""Get a socket from the pool.
590
If it's been > 1 second since the last time we checked out a
591
socket, we also check to see if the socket has been closed -
592
this lets us avoid seeing *some*
593
:class:`~pymongo.errors.AutoReconnect` exceptions on server
594
hiccups, etc. We only do this if it's been > 1 second since
595
the last socket checkout, to keep performance reasonable - we
596
can't avoid those completely anyway.
680
"""Get a SocketInfo from the pool.
598
sock, authset = mongo['pool'].get_socket()
601
if now - mongo['last_checkout'] > 1:
603
mongo['pool'] = pool.Pool(mongo['pool'].host,
604
self.__max_pool_size,
608
sock, authset = mongo['pool'].get_socket()
609
mongo['last_checkout'] = now
610
if self.__auth_credentials or authset:
611
self.__check_auth(sock, authset)
683
if self.__auto_start_request:
684
# No effect if a request already started
687
sock_info = pool.get_socket()
689
if self.__auth_credentials:
690
self.__check_auth(sock_info)
614
693
def disconnect(self):
615
694
"""Disconnect from the replica set primary.
696
mongo = self.__pools.get(self.__writer)
697
if mongo and 'pool' in mongo:
698
mongo['pool'].reset()
617
699
self.__writer = None
719
801
mongo = self.__pools[_connection_to_use]
722
sock = self.__socket(mongo)
805
sock_info = self.__socket(mongo)
723
806
rqst_id, data = self.__check_bson_size(msg,
724
807
mongo['max_bson_size'])
808
sock_info.sock.sendall(data)
726
809
# Safe mode. We pack the message together with a lastError
727
810
# message and send both. We then get the response (to the
728
811
# lastError) and raise OperationFailure if it is an error
731
response = self.__recv_msg(1, rqst_id, sock)
732
return self.__check_response_to_last_error(response)
815
response = self.__recv_msg(1, rqst_id, sock_info)
816
rv = self.__check_response_to_last_error(response)
817
mongo['pool'].return_socket(sock_info)
734
819
except(ConnectionFailure, socket.error), why:
735
mongo['pool'].discard_socket()
820
mongo['pool'].discard_socket(sock_info)
736
821
if _connection_to_use in (None, -1):
737
822
self.disconnect()
738
823
raise AutoReconnect(str(why))
740
mongo['pool'].discard_socket()
825
mongo['pool'].discard_socket(sock_info)
743
mongo['pool'].return_socket()
745
828
def __send_and_receive(self, mongo, msg, **kwargs):
746
829
"""Send a message on the given socket and return the response data.
749
sock = self.__socket(mongo)
833
sock_info = self.__socket(mongo)
750
835
if "network_timeout" in kwargs:
751
sock.settimeout(kwargs['network_timeout'])
836
sock_info.sock.settimeout(kwargs['network_timeout'])
753
838
rqst_id, data = self.__check_bson_size(msg,
754
839
mongo['max_bson_size'])
756
response = self.__recv_msg(1, rqst_id, sock)
840
sock_info.sock.sendall(data)
841
response = self.__recv_msg(1, rqst_id, sock_info)
758
843
if "network_timeout" in kwargs:
759
sock.settimeout(self.__net_timeout)
760
mongo['pool'].return_socket()
844
sock_info.sock.settimeout(self.__net_timeout)
845
mongo['pool'].return_socket(sock_info)
763
848
except (ConnectionFailure, socket.error), why:
764
host, port = mongo['pool'].host
765
mongo['pool'].discard_socket()
849
host, port = mongo['pool'].pair
850
mongo['pool'].discard_socket(sock_info)
766
851
raise AutoReconnect("%s:%d: %s" % (host, port, str(why)))
768
mongo['pool'].discard_socket()
853
mongo['pool'].discard_socket(sock_info)
771
856
def _send_message_with_response(self, msg, _connection_to_use=None,
785
870
mongo = self.__find_primary()
787
872
mongo = self.__pools[_connection_to_use]
788
return mongo['pool'].host, self.__send_and_receive(mongo,
873
return mongo['pool'].pair, self.__send_and_receive(mongo,
791
876
elif _must_use_master or not read_pref:
792
877
mongo = self.__find_primary()
793
return mongo['pool'].host, self.__send_and_receive(mongo,
878
return mongo['pool'].pair, self.__send_and_receive(mongo,
796
881
except AutoReconnect:
797
if mongo == self.__writer:
882
if mongo == self.__pools.get(self.__writer):
798
883
self.disconnect()
809
894
if read_pref == ReadPreference.SECONDARY:
811
896
mongo = self.__find_primary()
812
return mongo['pool'].host, self.__send_and_receive(mongo,
897
return mongo['pool'].pair, self.__send_and_receive(mongo,
815
900
except AutoReconnect, why:
816
901
self.disconnect()
902
errors.append(str(why))
818
903
raise AutoReconnect(', '.join(errors))
820
def __cmp__(self, other):
905
def start_request(self):
906
"""Ensure the current thread or greenlet always uses the same socket
907
until it calls :meth:`end_request`. For
908
:class:`~pymongo.ReadPreference` PRIMARY, auto_start_request=True
909
ensures consistent reads, even if you read after an unsafe write. For
910
read preferences other than PRIMARY, there are no consistency
913
In Python 2.6 and above, or in Python 2.5 with
914
"from __future__ import with_statement", :meth:`start_request` can be
915
used as a context manager:
917
>>> connection = pymongo.ReplicaSetConnection(auto_start_request=False)
918
>>> db = connection.test
919
>>> _id = db.test_collection.insert({}, safe=True)
920
>>> with connection.start_request():
921
... for i in range(100):
922
... db.test_collection.update({'_id': _id}, {'$set': {'i':i}})
924
... # Definitely read the document after the final update completes
925
... print db.test_collection.find({'_id': _id})
927
.. versionadded:: 2.2
928
The :class:`~pymongo.pool.Request` return value.
929
:meth:`start_request` previously returned None
931
for mongo in self.__pools.values():
933
mongo['pool'].start_request()
935
self.__in_request = True
936
return pool.Request(self)
938
def in_request(self):
939
"""True if :meth:`start_request` has been called, but not
940
:meth:`end_request`, or if `auto_start_request` is True and
941
:meth:`end_request` has not been called in this thread or greenlet.
943
return self.__in_request
945
def end_request(self):
946
"""Undo :meth:`start_request` and allow this thread's connections to
947
replica set members to return to the pool.
949
Calling :meth:`end_request` allows the :class:`~socket.socket` that has
950
been reserved for this thread by :meth:`start_request` to be returned
951
to the pool. Other threads will then be able to re-use that
952
:class:`~socket.socket`. If your application uses many threads, or has
953
long-running threads that infrequently perform MongoDB operations, then
954
judicious use of this method can lead to performance gains. Care should
955
be taken, however, to make sure that :meth:`end_request` is not called
956
in the middle of a sequence of operations in which ordering is
957
important. This could lead to unexpected results.
959
for mongo in self.__pools.values():
961
mongo['pool'].end_request()
963
self.__in_request = False
965
def __eq__(self, other):
821
966
# XXX: Implement this?
822
967
return NotImplemented
933
1080
if from_host is not None:
934
1081
command["fromhost"] = from_host
936
if username is not None:
937
nonce = self.admin.command("copydbgetnonce",
938
fromhost=from_host)["nonce"]
939
command["username"] = username
940
command["nonce"] = nonce
941
command["key"] = helpers._auth_key(nonce, username, password)
1083
in_request = self.in_request()
1086
self.start_request()
1087
if username is not None:
1088
nonce = self.admin.command("copydbgetnonce",
1089
fromhost=from_host)["nonce"]
1090
command["username"] = username
1091
command["nonce"] = nonce
1092
command["key"] = helpers._auth_key(nonce, username, password)
943
return self.admin.command("copydb", **command)
1094
return self.admin.command("copydb", **command)