1
# Copyright 2009-2014 MongoDB, Inc.
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
7
# http://www.apache.org/licenses/LICENSE-2.0
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
15
"""Master-Slave connection to Mongo.
17
Performs all writes to Master instance and distributes reads among all
18
slaves. Reads are tried on each slave in turn until the read succeeds
or all slaves have failed.
"""
22
from pymongo import helpers, thread_util
23
from pymongo import ReadPreference
24
from pymongo.common import BaseObject
25
from pymongo.mongo_client import MongoClient
26
from pymongo.database import Database
27
from pymongo.errors import AutoReconnect
30
class MasterSlaveConnection(BaseObject):
31
"""A master-slave connection to Mongo.
34
def __init__(self, master, slaves=[], document_class=dict, tz_aware=False):
35
"""Create a new Master-Slave connection.
37
The resultant connection should be interacted with using the same
38
mechanisms as a regular `MongoClient`. The `MongoClient` instances used
39
to create this `MasterSlaveConnection` can themselves make use of
40
connection pooling, etc. `MongoClient` instances used as slaves should
41
be created with the read_preference option set to
42
:attr:`~pymongo.read_preferences.ReadPreference.SECONDARY`. Write
43
concerns are inherited from `master` and can be changed in this
46
Raises TypeError if `master` is not an instance of `MongoClient` or
47
slaves is not a list of at least one `MongoClient` instances.
50
- `master`: `MongoClient` instance for the writable Master
51
- `slaves`: list of `MongoClient` instances for the
53
- `document_class` (optional): default class to use for
54
documents returned from queries on this connection
55
- `tz_aware` (optional): if ``True``,
56
:class:`~datetime.datetime` instances returned as values
57
in a document by this :class:`MasterSlaveConnection` will be timezone
58
aware (otherwise they will be naive)
60
if not isinstance(master, MongoClient):
61
raise TypeError("master must be a MongoClient instance")
62
if not isinstance(slaves, list) or len(slaves) == 0:
63
raise TypeError("slaves must be a list of length >= 1")
66
if not isinstance(slave, MongoClient):
67
raise TypeError("slave %r is not an instance of MongoClient" %
70
super(MasterSlaveConnection,
71
self).__init__(read_preference=ReadPreference.SECONDARY,
73
**master.write_concern)
75
self.__master = master
76
self.__slaves = slaves
77
self.__document_class = document_class
78
self.__tz_aware = tz_aware
79
self.__request_counter = thread_util.Counter(master.use_greenlets)
91
"""If this MasterSlaveConnection is connected to mongos (always False)
98
def use_greenlets(self):
    """Report the master's ``use_greenlets`` setting.

    The value is read straight from the master. It tells whether
    calling :meth:`start_request` assigns greenlet-local, rather than
    thread-local, sockets.

    .. versionadded:: 2.4.2
    """
    primary = self.master
    return primary.use_greenlets
106
def get_document_class(self):
    """Return the default class used for documents on this connection."""
    klass = self.__document_class
    return klass
109
def set_document_class(self, klass):
    """Replace the default class used for documents on this connection.

    :Parameters:
      - `klass`: the new default document class
    """
    self.__document_class = klass
112
document_class = property(get_document_class, set_document_class,
113
doc="""Default class to use for documents
114
returned on this connection.""")
118
return self.__tz_aware
121
def max_bson_size(self):
    """Largest BSON object, in bytes, that the connected master accepts.

    The value is read from the master. Defaults to 4MB in server < 1.7.4.

    .. versionadded:: 2.6
    """
    primary = self.master
    return primary.max_bson_size
130
def max_message_size(self):
    """Maximum message size reported by the connected master.

    The value is read from the master's ``max_message_size``.

    .. versionadded:: 2.6
    """
    primary = self.master
    return primary.max_message_size
139
def min_wire_version(self):
    """The minWireVersion reported by the server, as seen by the master.

    Returns ``0`` when connected to server versions prior to MongoDB 2.6.

    .. versionadded:: 2.7
    """
    primary = self.master
    return primary.min_wire_version
149
def max_wire_version(self):
    """The maxWireVersion reported by the server, as seen by the master.

    Returns ``0`` when connected to server versions prior to MongoDB 2.6.

    .. versionadded:: 2.7
    """
    primary = self.master
    return primary.max_wire_version
159
def max_write_batch_size(self):
    """The maxWriteBatchSize reported by the server, as seen by the master.

    The value is read from the master's ``max_write_batch_size``; older
    servers that do not report one yield a default value.

    .. versionadded:: 2.7
    """
    primary = self.master
    return primary.max_write_batch_size
169
def disconnect(self):
    """Disconnect from MongoDB.

    Calls ``disconnect`` on the master first and then on every slave,
    in list order.

    .. seealso:: Module :mod:`~pymongo.mongo_client`
    .. versionadded:: 1.10.1
    """
    self.__master.disconnect()
    for slave in self.__slaves:
        # Each slave is its own client and must be disconnected too.
        slave.disconnect()
182
def set_cursor_manager(self, manager_class):
    """Set the cursor manager for this connection.

    Forwards the call to the master first and then to each slave, so
    every `MongoClient` instance making up this `MasterSlaveConnection`
    receives the same `manager_class`.

    :Parameters:
      - `manager_class`: passed unchanged to each member's
        ``set_cursor_manager``
    """
    primary = self.__master
    primary.set_cursor_manager(manager_class)
    for replica in self.__slaves:
        replica.set_cursor_manager(manager_class)
192
def _ensure_connected(self, sync):
193
"""Ensure the master is connected to a mongod/s.
195
self.__master._ensure_connected(sync)
197
# _connection_to_use is a hack that we need to include to make sure
198
# that killcursor operations can be sent to the same instance on which
199
# the cursor actually resides...
200
def _send_message(self, message,
201
with_last_error=False,
202
command=False, _connection_to_use=None):
203
"""Say something to Mongo.
205
Sends a message on the Master connection. This is used for inserts,
206
updates, and deletes.
208
Raises ConnectionFailure if the message cannot be sent. Returns the
209
request id of the sent message.
212
- `operation`: opcode of the message
213
- `data`: data to send
214
- `safe`: perform a getLastError after sending the message
216
if _connection_to_use is None or _connection_to_use == -1:
217
return self.__master._send_message(message,
218
with_last_error, command)
219
return self.__slaves[_connection_to_use]._send_message(
220
message, with_last_error, command, check_primary=False)
222
# _connection_to_use is a hack that we need to include to make sure
223
# that getmore operations can be sent to the same instance on which
224
# the cursor actually resides...
225
def _send_message_with_response(self, message, _connection_to_use=None,
                                _must_use_master=False, **kwargs):
    """Receive a message from Mongo.

    Sends the given message and returns a (connection_id, response) pair,
    where connection_id is ``-1`` for the master and otherwise the index
    of the slave that answered.

    Raises AutoReconnect if the read was distributed among the slaves
    and every one of them raised AutoReconnect.

    :Parameters:
      - `message`: the message to send
      - `_connection_to_use`: when not ``None``, pins the member to use
        (``-1`` for the master, otherwise an index into the slaves)
      - `_must_use_master`: send to the master even outside a request
      - `**kwargs`: forwarded to the member's
        ``_send_message_with_response``
    """
    if _connection_to_use is not None:
        if _connection_to_use == -1:
            member = self.__master
            conn = -1
        else:
            member = self.__slaves[_connection_to_use]
            conn = _connection_to_use
        return (conn,
                member._send_message_with_response(message, **kwargs)[1])

    # _must_use_master is set for commands, which must be sent to the
    # master instance. any queries in a request must be sent to the
    # master since that is where writes go.
    if _must_use_master or self.in_request():
        return (-1, self.__master._send_message_with_response(message,
                                                              **kwargs)[1])

    # Iterate through the slaves randomly until we have success. Raise
    # reconnect if they all fail.
    for connection_id in helpers.shuffled(xrange(len(self.__slaves))):
        try:
            slave = self.__slaves[connection_id]
            return (connection_id,
                    slave._send_message_with_response(message,
                                                      **kwargs)[1])
        except AutoReconnect:
            # This slave is unreachable; fall through to the next one.
            pass

    raise AutoReconnect("failed to connect to slaves")
265
def start_request(self):
    """Start a "request".

    Begins a sequence of operations in which order matters. Every
    operation performed within a request is sent using the Master
    connection. The request counter is incremented before the master's
    own ``start_request`` is called.
    """
    counter = self.__request_counter
    counter.inc()
    primary = self.master
    primary.start_request()
275
def in_request(self):
    """Return ``True`` while the request counter holds a truthy value."""
    depth = self.__request_counter.get()
    return True if depth else False
278
def end_request(self):
    """End the current "request".

    The request counter is decremented before the master's own
    ``end_request`` is called.

    See documentation for `MongoClient.end_request`.
    """
    counter = self.__request_counter
    counter.dec()
    primary = self.master
    primary.end_request()
286
def __eq__(self, other):
    """Compare two connections by their master and slaves.

    Returns the result of comparing the (master, slaves) pairs when
    `other` is a `MasterSlaveConnection`; otherwise returns
    ``NotImplemented`` so Python can try the reflected comparison.
    """
    if isinstance(other, MasterSlaveConnection):
        us = (self.__master, self.slaves)
        them = (other.__master, other.__slaves)
        return us == them
    return NotImplemented
293
def __ne__(self, other):
294
return not self == other
297
return "MasterSlaveConnection(%r, %r)" % (self.__master, self.__slaves)
299
def __getattr__(self, name):
    """Get a database by name.

    Builds a `Database` from this connection and `name`.

    Raises InvalidName if an invalid database name is used.

    :Parameters:
      - `name`: the name of the database to get
    """
    database = Database(self, name)
    return database
309
def __getitem__(self, name):
310
"""Get a database by name.
312
Raises InvalidName if an invalid database name is used.
315
- `name`: the name of the database to get
317
return self.__getattr__(name)
319
def close_cursor(self, cursor_id, connection_id):
    """Close a single database cursor.

    The call is forwarded to the member identified by `connection_id`
    and that member's ``close_cursor`` result is returned. What closing
    the cursor actually means depends on the member's cursor handling.

    Raises TypeError if cursor_id is not an instance of (int, long).

    :Parameters:
      - `cursor_id`: cursor id to close
      - `connection_id`: id of the `MongoClient` instance holding the
        cursor: ``-1`` for the master, otherwise an index into the slaves
    """
    if connection_id == -1:
        owner = self.__master
    else:
        owner = self.__slaves[connection_id]
    return owner.close_cursor(cursor_id)
335
def database_names(self):
    """Get a list of all database names, as reported by the master."""
    primary = self.__master
    return primary.database_names()
340
def drop_database(self, name_or_database):
    """Drop a database by delegating to the master.

    Returns whatever the master's ``drop_database`` returns.

    :Parameters:
      - `name_or_database`: the name of a database to drop or the object
        representing it (presumably a `Database` -- TODO confirm)
    """
    primary = self.__master
    return primary.drop_database(name_or_database)
353
raise TypeError("'MasterSlaveConnection' object is not iterable")
355
def _cached(self, database_name, collection_name, index_name):
356
return self.__master._cached(database_name,
357
collection_name, index_name)
359
def _cache_index(self, database_name, collection_name,
360
index_name, cache_for):
361
return self.__master._cache_index(database_name, collection_name,
362
index_name, cache_for)
364
def _purge_index(self, database_name,
365
collection_name=None, index_name=None):
366
return self.__master._purge_index(database_name,