2
* Copyright (C) 2013 MongoDB Inc.
4
* This program is free software: you can redistribute it and/or modify
5
* it under the terms of the GNU Affero General Public License, version 3,
6
* as published by the Free Software Foundation.
8
* This program is distributed in the hope that it will be useful,
9
* but WITHOUT ANY WARRANTY; without even the implied warranty of
10
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11
* GNU Affero General Public License for more details.
13
* You should have received a copy of the GNU Affero General Public License
14
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16
* As a special exception, the copyright holders give permission to link the
17
* code of portions of this program with the OpenSSL library under certain
18
* conditions as described in each individual source file and distribute
19
* linked combinations including the program with the OpenSSL library. You
20
* must comply with the GNU Affero General Public License in all respects for
21
* all of the code used other than as permitted herein. If you modify file(s)
22
* with this exception, you may extend this exception to your version of the
23
* file(s), but you are not obligated to do so. If you do not wish to do so,
24
* delete this exception statement from your version. If you delete this
25
* exception statement from all source files in the program, then also delete
26
* it in the license file.
29
#include "mongo/s/dbclient_multi_command.h"
31
#include "mongo/bson/mutable/document.h"
32
#include "mongo/db/audit.h"
33
#include "mongo/db/client_basic.h"
34
#include "mongo/db/dbmessage.h"
35
#include "mongo/db/wire_version.h"
36
#include "mongo/s/shard.h"
37
#include "mongo/db/server_parameters.h"
38
#include "mongo/s/write_ops/batch_downconvert.h"
39
#include "mongo/s/write_ops/dbclient_safe_writer.h"
40
#include "mongo/util/net/message.h"
44
// Server parameter (bool, default false) that can force the write mode used for the
// shards to not use batch write commands, even if they are available.
// When set to true, hasBatchWriteFeature() reports false for every connection.
45
MONGO_EXPORT_SERVER_PARAMETER( _forceLegacyShardWriteMode, bool, false );
47
// Constructs the bookkeeping record for one command that is queued for dispatch.
//
// endpoint - host the command is addressed to
// dbName   - database name; copied into an owned string via toString()
// cmdObj   - the command document
//
// The status starts out as Status::OK(); sendAll() overwrites it if dispatch throws.
DBClientMultiCommand::PendingCommand::PendingCommand( const ConnectionString& endpoint,
48
const StringData& dbName,
49
const BSONObj& cmdObj ) :
51
dbName( dbName.toString() ),
54
status( Status::OK() ) {
57
// Queues a command for later dispatch; nothing is sent from this function.
//
// endpoint - host the command should be sent to
// dbName   - database the command runs against
// request  - command to send; serialized with toBSON() at queue time
//
// The heap-allocated PendingCommand is appended to _pendingCommands. recvAny() takes
// ownership of each entry when it pops it from the front of the queue.
void DBClientMultiCommand::addCommand( const ConnectionString& endpoint,
58
const StringData& dbName,
59
const BSONSerializable& request ) {
60
PendingCommand* command = new PendingCommand( endpoint, dbName, request.toBSON() );
61
_pendingCommands.push_back( command );
67
// Stuff we need for batch downconversion
68
// TODO: Remove post-2.6
71
// Classifies a command document by the name of its first field.
// Returns BatchType_Insert / BatchType_Update / BatchType_Delete for the field names
// "insert" / "update" / "delete", and BatchType_Unknown for anything else.
BatchedCommandRequest::BatchType getBatchWriteType( const BSONObj& cmdObj ) {
72
// The command name is taken from the first element of the document.
string cmdName = cmdObj.firstElement().fieldName();
73
if ( cmdName == "insert" ) return BatchedCommandRequest::BatchType_Insert;
74
if ( cmdName == "update" ) return BatchedCommandRequest::BatchType_Update;
75
if ( cmdName == "delete" ) return BatchedCommandRequest::BatchType_Delete;
76
return BatchedCommandRequest::BatchType_Unknown;
79
// Returns true when cmdObj is an insert, update or delete batch write command,
// i.e. when getBatchWriteType() recognizes its first field name.
bool isBatchWriteCommand( const BSONObj& cmdObj ) {
80
return getBatchWriteType( cmdObj ) != BatchedCommandRequest::BatchType_Unknown;
83
// Returns true when batch write commands may be sent over conn.
// That requires both:
//  - the _forceLegacyShardWriteMode server parameter is not set, and
//  - BATCH_COMMANDS lies within the [min, max] wire version range reported by conn.
bool hasBatchWriteFeature( DBClientBase* conn ) {
84
return !_forceLegacyShardWriteMode
85
&& conn->getMinWireVersion() <= BATCH_COMMANDS
86
&& conn->getMaxWireVersion() >= BATCH_COMMANDS;
90
* Parses and re-BSON's a batch write command in order to send it as a set of safe writes.
92
// Executes a batch write command against conn through BatchSafeWriter instead of
// sending it as a command.
//
// conn        - connection the writes are issued on
// dbName      - database name; prepended to the request's namespace
// cmdRequest  - batch write command document (insert / update / delete)
// cmdResponse - out: receives the BSON form of the BatchedCommandResponse
//
// Parse and validity failures are only checked with dassert.
void legacySafeWrite( DBClientBase* conn,
93
const StringData& dbName,
94
const BSONObj& cmdRequest,
95
BSONObj* cmdResponse ) {
97
// Translate from BSON
98
BatchedCommandRequest request( getBatchWriteType( cmdRequest ) );
100
// This should *always* parse correctly
101
bool parsed = request.parseBSON( cmdRequest, NULL );
102
(void) parsed; // for non-debug compile
103
dassert( parsed && request.isValid( NULL ) );
105
// Collection name is sent without db to the dispatcher
106
// so rebuild the namespace as "<dbName>.<collection>" before writing.
request.setNS( dbName.toString() + "." + request.getNS() );
108
DBClientSafeWriter safeWriter;
109
BatchSafeWriter batchSafeWriter( &safeWriter );
110
BatchedCommandResponse response;
111
batchSafeWriter.safeWriteBatch( conn, request, &response );
114
dassert( response.isValid( NULL ) );
115
// Hand the response back in BSON form, the same shape a command reply would have.
*cmdResponse = response.toBSON();
120
// Builds the query-style wire message that carries cmdObj as a command on
// "<dbName>.$cmd".
//
// conn   - connection the message is intended for
// dbName - database the command runs against
// cmdObj - command document; impersonated-user information is appended to a copy of it
//
// NOTE(review): the declarations of bufB and toSend, and the statement that transmits
// toSend on conn, are not visible in this block - confirm against the full function.
static void sayAsCmd( DBClientBase* conn, const StringData& dbName, const BSONObj& cmdObj ) {
122
// Copy the command's fields, then let the audit subsystem append impersonated users.
BSONObjBuilder usersBuilder;
123
usersBuilder.appendElements(cmdObj);
124
audit::appendImpersonatedUsers(&usersBuilder);
126
// see query.h for the protocol we are using here.
128
bufB.appendNum( 0 ); // command/query options
129
bufB.appendStr( dbName.toString() + ".$cmd" ); // write command ns
130
bufB.appendNum( 0 ); // ntoskip (0 for command)
131
bufB.appendNum( 1 ); // ntoreturn (1 for command)
132
usersBuilder.obj().appendSelfToBufBuilder( bufB );
133
// Wrap the assembled buffer as a dbQuery message.
toSend.setData( dbQuery, bufB.buf(), bufB.len() );
140
// Receives the reply to a command previously sent on conn.
//
// conn   - connection to read from
// toRecv - out: message object the reply is received into
// result - out: BSONObj constructed from the reply's data
//
// Raises a user assertion (code 17255) when conn->recv() reports failure.
// NOTE(review): *result is built directly from the received buffer, so it presumably
// must not outlive *toRecv - confirm BSONObj ownership semantics.
static void recvAsCmd( DBClientBase* conn, Message* toRecv, BSONObj* result ) {
142
if ( !conn->recv( *toRecv ) ) {
143
// Confusingly, socket exceptions here are written to the log, not thrown.
144
uasserted( 17255, "error receiving write command response, "
145
"possible socket exception - see logs" );
148
// A query result is returned from commands
149
QueryResult* recvdQuery = reinterpret_cast<QueryResult*>( toRecv->singleData() );
150
*result = BSONObj( recvdQuery->data() );
153
// Acquires a pooled connection for every queued command and dispatches those commands
// that can be sent without blocking.
//
// Batch write commands addressed to hosts without batch write support are not sent
// here; recvAny() performs them as legacy safe writes.
// A DBException during dispatch is stored in the command's status rather than propagated.
void DBClientMultiCommand::sendAll() {
155
for ( deque<PendingCommand*>::iterator it = _pendingCommands.begin();
156
it != _pendingCommands.end(); ++it ) {
158
PendingCommand* command = *it;
159
// A command must not already hold a connection when it is dispatched.
dassert( NULL == command->conn );
162
dassert( command->endpoint.type() == ConnectionString::MASTER ||
163
command->endpoint.type() == ConnectionString::CUSTOM );
165
// TODO: Fix the pool up to take millis directly
166
// Integer division: the sub-second part of the timeout is dropped, so any value
// below 1000 ms becomes 0 here.
// NOTE(review): confirm what a 0 second timeout means to the connection pool.
int timeoutSecs = _timeoutMillis / 1000;
167
command->conn = shardConnectionPool.get( command->endpoint, timeoutSecs );
169
if ( hasBatchWriteFeature( command->conn )
170
|| !isBatchWriteCommand( command->cmdObj ) ) {
171
// Do normal command dispatch
172
sayAsCmd( command->conn, command->dbName, command->cmdObj );
175
// Sending a batch as safe writes necessarily blocks, so we can't do anything
176
// here. Instead we do the safe writes in recvAny(), which can block.
179
catch ( const DBException& ex ) {
180
// Remember the failure; recvAny() returns this status for the command.
command->status = ex.toStatus();
182
if ( NULL != command->conn ) {
184
// Confusingly, the pool needs to know about failed connections so that it can
185
// invalidate other connections which might be bad. But if the connection
186
// doesn't seem bad, don't send it back, because we don't want to reuse it.
187
if ( !command->conn->isFailed() ) {
188
delete command->conn;
191
shardConnectionPool.release( command->endpoint.toString(), command->conn );
194
// The command no longer holds a connection after a failed dispatch.
command->conn = NULL;
200
// Returns the number of queued commands whose result has not yet been consumed by
// recvAny(), i.e. the current size of _pendingCommands.
int DBClientMultiCommand::numPending() const {
201
return static_cast<int>( _pendingCommands.size() );
204
// Consumes the oldest queued command and produces its result.
//
// endpoint - out: set to the endpoint of the command being consumed
// response - out: populated by parsing the command's BSON result
//
// Returns the stored dispatch error if sendAll() recorded one,
// ErrorCodes::FailedToParse if the result cannot be parsed or is not valid,
// or the status of any DBException caught while receiving.
//
// NOTE(review): no emptiness check on _pendingCommands is visible before front();
// callers presumably consult numPending() first - confirm.
Status DBClientMultiCommand::recvAny( ConnectionString* endpoint, BSONSerializable* response ) {
206
// Take ownership of the queue entry so it is freed on every return path.
scoped_ptr<PendingCommand> command( _pendingCommands.front() );
207
_pendingCommands.pop_front();
209
*endpoint = command->endpoint;
210
// A dispatch failure recorded by sendAll() is reported without touching the network.
if ( !command->status.isOK() ) return command->status;
212
dassert( NULL != command->conn );
216
// Holds the data and BSONObj for the command result
220
// Same condition as in sendAll(): true means the command was already sent there.
if ( hasBatchWriteFeature( command->conn )
221
|| !isBatchWriteCommand( command->cmdObj ) ) {
222
// Recv data from command sent earlier
223
recvAsCmd( command->conn, &toRecv, &result );
226
// We can safely block in recvAny, so dispatch writes as safe writes for hosts
227
// that don't understand batch write commands.
228
legacySafeWrite( command->conn, command->dbName, command->cmdObj, &result );
231
// The exchange is complete; hand the connection back to the pool.
shardConnectionPool.release( command->endpoint.toString(), command->conn );
232
command->conn = NULL;
235
if ( !response->parseBSON( result, &errMsg ) || !response->isValid( &errMsg ) ) {
236
return Status( ErrorCodes::FailedToParse, errMsg );
239
catch ( const DBException& ex ) {
241
// Confusingly, the pool needs to know about failed connections so that it can
242
// invalidate other connections which might be bad. But if the connection doesn't seem
243
// bad, don't send it back, because we don't want to reuse it.
244
if ( !command->conn->isFailed() ) {
245
delete command->conn;
248
shardConnectionPool.release( command->endpoint.toString(), command->conn );
250
command->conn = NULL;
252
return ex.toStatus();
258
// Destroys any commands that were queued but never consumed by recvAny().
// Connections still held by such commands are deleted rather than released to the pool.
DBClientMultiCommand::~DBClientMultiCommand() {
260
// Cleanup anything outstanding, do *not* return stuff to the pool, that might error
261
for ( deque<PendingCommand*>::iterator it = _pendingCommands.begin();
262
it != _pendingCommands.end(); ++it ) {
264
PendingCommand* command = *it;
266
if ( NULL != command->conn ) delete command->conn;
271
_pendingCommands.clear();
274
// Sets the timeout, in milliseconds, used when sendAll() acquires connections.
// sendAll() converts the value to whole seconds before passing it to the pool.
void DBClientMultiCommand::setTimeoutMillis( int milliSecs ) {
275
_timeoutMillis = milliSecs;