~ubuntu-branches/ubuntu/utopic/mongodb/utopic

« back to all changes in this revision

Viewing changes to src/mongo/s/dbclient_multi_command.cpp

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-07-03 09:23:46 UTC
  • mfrom: (1.3.10) (44.1.14 sid)
  • Revision ID: package-import@ubuntu.com-20140703092346-c5bvt46wnzougyly
Tags: 1:2.6.3-0ubuntu1
* New upstream stable release:
  - Dropped patches, included upstream:
    + 0003-All-platforms-but-Windows-find-hash-in-std-tr1.patch
    + 0008-Use-system-libstemmer.patch
    + 0011-Use-a-signed-char-to-store-BSONType-enumerations.patch
    + 0001-SERVER-12064-Atomic-operations-for-gcc-non-Intel-arc.patch
    + 0002-SERVER-12065-Support-ARM-and-AArch64-builds.patch
  - d/p/*: Refreshed/rebased remaining patches.
  - Use system provided libyaml-cpp:
    + d/control: Add libyaml-cpp-dev to BD's.
    + d/rules: Enable --with-system-yaml option.
    + d/p/fix-yaml-detection.patch: Fix detection of libyaml-cpp library.
  - d/mongodb-server.mongodb.upstart: Sync changes from upstream.
  - d/control,mongodb-dev.*: Drop mongodb-dev package; it has no reverse
    dependencies and upstream no longer installs header files.
  - d/NEWS: Point users to upstream upgrade documentation for upgrades
    from 2.4 to 2.6.
* Merge from Debian unstable.
* d/control: BD on libv8-3.14-dev to ensure that transitioning to new v8
  versions is an explicit action due to changes in behaviour in >= 3.25
  (LP: #1295723).
* d/mongodb-server.prerm: Dropped debug echo call from maintainer script
  (LP: #1294455).

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/**
 
2
 *    Copyright (C) 2013 MongoDB Inc.
 
3
 *
 
4
 *    This program is free software: you can redistribute it and/or  modify
 
5
 *    it under the terms of the GNU Affero General Public License, version 3,
 
6
 *    as published by the Free Software Foundation.
 
7
 *
 
8
 *    This program is distributed in the hope that it will be useful,
 
9
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
10
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
11
 *    GNU Affero General Public License for more details.
 
12
 *
 
13
 *    You should have received a copy of the GNU Affero General Public License
 
14
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
15
 *
 
16
 *    As a special exception, the copyright holders give permission to link the
 
17
 *    code of portions of this program with the OpenSSL library under certain
 
18
 *    conditions as described in each individual source file and distribute
 
19
 *    linked combinations including the program with the OpenSSL library. You
 
20
 *    must comply with the GNU Affero General Public License in all respects for
 
21
 *    all of the code used other than as permitted herein. If you modify file(s)
 
22
 *    with this exception, you may extend this exception to your version of the
 
23
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 
24
 *    delete this exception statement from your version. If you delete this
 
25
 *    exception statement from all source files in the program, then also delete
 
26
 *    it in the license file.
 
27
 */
 
28
 
 
29
#include "mongo/s/dbclient_multi_command.h"
 
30
 
 
31
#include "mongo/bson/mutable/document.h"
 
32
#include "mongo/db/audit.h"
 
33
#include "mongo/db/client_basic.h"
 
34
#include "mongo/db/dbmessage.h"
 
35
#include "mongo/db/wire_version.h"
 
36
#include "mongo/s/shard.h"
 
37
#include "mongo/db/server_parameters.h"
 
38
#include "mongo/s/write_ops/batch_downconvert.h"
 
39
#include "mongo/s/write_ops/dbclient_safe_writer.h"
 
40
#include "mongo/util/net/message.h"
 
41
 
 
42
namespace mongo {
 
43
 
 
44
    // Can force the write mode used for the shards to not use commands, even if available.
 
45
    MONGO_EXPORT_SERVER_PARAMETER( _forceLegacyShardWriteMode, bool, false );
 
46
 
 
47
    DBClientMultiCommand::PendingCommand::PendingCommand( const ConnectionString& endpoint,
 
48
                                                          const StringData& dbName,
 
49
                                                          const BSONObj& cmdObj ) :
 
50
        endpoint( endpoint ),
 
51
        dbName( dbName.toString() ),
 
52
        cmdObj( cmdObj ),
 
53
        conn( NULL ),
 
54
        status( Status::OK() ) {
 
55
    }
 
56
 
 
57
    void DBClientMultiCommand::addCommand( const ConnectionString& endpoint,
 
58
                                           const StringData& dbName,
 
59
                                           const BSONSerializable& request ) {
 
60
        PendingCommand* command = new PendingCommand( endpoint, dbName, request.toBSON() );
 
61
        _pendingCommands.push_back( command );
 
62
    }
 
63
 
 
64
    namespace {
 
65
 
 
66
        //
 
67
        // Stuff we need for batch downconversion
 
68
        // TODO: Remove post-2.6
 
69
        //
 
70
 
 
71
        BatchedCommandRequest::BatchType getBatchWriteType( const BSONObj& cmdObj ) {
 
72
            string cmdName = cmdObj.firstElement().fieldName();
 
73
            if ( cmdName == "insert" ) return BatchedCommandRequest::BatchType_Insert;
 
74
            if ( cmdName == "update" ) return BatchedCommandRequest::BatchType_Update;
 
75
            if ( cmdName == "delete" ) return BatchedCommandRequest::BatchType_Delete;
 
76
            return BatchedCommandRequest::BatchType_Unknown;
 
77
        }
 
78
 
 
79
        bool isBatchWriteCommand( const BSONObj& cmdObj ) {
 
80
            return getBatchWriteType( cmdObj ) != BatchedCommandRequest::BatchType_Unknown;
 
81
        }
 
82
 
 
83
        bool hasBatchWriteFeature( DBClientBase* conn ) {
 
84
            return !_forceLegacyShardWriteMode
 
85
                   && conn->getMinWireVersion() <= BATCH_COMMANDS
 
86
                   && conn->getMaxWireVersion() >= BATCH_COMMANDS;
 
87
        }
 
88
 
 
89
        /**
 
90
         * Parses and re-BSON's a batch write command in order to send it as a set of safe writes.
 
91
         */
 
92
        void legacySafeWrite( DBClientBase* conn,
 
93
                              const StringData& dbName,
 
94
                              const BSONObj& cmdRequest,
 
95
                              BSONObj* cmdResponse ) {
 
96
 
 
97
            // Translate from BSON
 
98
            BatchedCommandRequest request( getBatchWriteType( cmdRequest ) );
 
99
 
 
100
            // This should *always* parse correctly
 
101
            bool parsed = request.parseBSON( cmdRequest, NULL );
 
102
            (void) parsed; // for non-debug compile
 
103
            dassert( parsed && request.isValid( NULL ) );
 
104
 
 
105
            // Collection name is sent without db to the dispatcher
 
106
            request.setNS( dbName.toString() + "." + request.getNS() );
 
107
 
 
108
            DBClientSafeWriter safeWriter;
 
109
            BatchSafeWriter batchSafeWriter( &safeWriter );
 
110
            BatchedCommandResponse response;
 
111
            batchSafeWriter.safeWriteBatch( conn, request, &response );
 
112
 
 
113
            // Back to BSON
 
114
            dassert( response.isValid( NULL ) );
 
115
            *cmdResponse = response.toBSON();
 
116
        }
 
117
    }
 
118
 
 
119
    // THROWS
 
120
    static void sayAsCmd( DBClientBase* conn, const StringData& dbName, const BSONObj& cmdObj ) {
 
121
        Message toSend;
 
122
        BSONObjBuilder usersBuilder;
 
123
        usersBuilder.appendElements(cmdObj);
 
124
        audit::appendImpersonatedUsers(&usersBuilder);
 
125
        
 
126
        // see query.h for the protocol we are using here.
 
127
        BufBuilder bufB;
 
128
        bufB.appendNum( 0 ); // command/query options
 
129
        bufB.appendStr( dbName.toString() + ".$cmd" ); // write command ns
 
130
        bufB.appendNum( 0 ); // ntoskip (0 for command)
 
131
        bufB.appendNum( 1 ); // ntoreturn (1 for command)
 
132
        usersBuilder.obj().appendSelfToBufBuilder( bufB );
 
133
        toSend.setData( dbQuery, bufB.buf(), bufB.len() );
 
134
 
 
135
        // Send our command
 
136
        conn->say( toSend );
 
137
    }
 
138
 
 
139
    // THROWS
 
140
    static void recvAsCmd( DBClientBase* conn, Message* toRecv, BSONObj* result ) {
 
141
 
 
142
        if ( !conn->recv( *toRecv ) ) {
 
143
            // Confusingly, socket exceptions here are written to the log, not thrown.
 
144
            uasserted( 17255, "error receiving write command response, "
 
145
                       "possible socket exception - see logs" );
 
146
        }
 
147
 
 
148
        // A query result is returned from commands
 
149
        QueryResult* recvdQuery = reinterpret_cast<QueryResult*>( toRecv->singleData() );
 
150
        *result = BSONObj( recvdQuery->data() );
 
151
    }
 
152
 
 
153
    void DBClientMultiCommand::sendAll() {
 
154
 
 
155
        for ( deque<PendingCommand*>::iterator it = _pendingCommands.begin();
 
156
            it != _pendingCommands.end(); ++it ) {
 
157
 
 
158
            PendingCommand* command = *it;
 
159
            dassert( NULL == command->conn );
 
160
 
 
161
            try {
 
162
                dassert( command->endpoint.type() == ConnectionString::MASTER ||
 
163
                    command->endpoint.type() == ConnectionString::CUSTOM );
 
164
 
 
165
                // TODO: Fix the pool up to take millis directly
 
166
                int timeoutSecs = _timeoutMillis / 1000;
 
167
                command->conn = shardConnectionPool.get( command->endpoint, timeoutSecs );
 
168
 
 
169
                if ( hasBatchWriteFeature( command->conn )
 
170
                     || !isBatchWriteCommand( command->cmdObj ) ) {
 
171
                    // Do normal command dispatch
 
172
                    sayAsCmd( command->conn, command->dbName, command->cmdObj );
 
173
                }
 
174
                else {
 
175
                    // Sending a batch as safe writes necessarily blocks, so we can't do anything
 
176
                    // here.  Instead we do the safe writes in recvAny(), which can block.
 
177
                }
 
178
            }
 
179
            catch ( const DBException& ex ) {
 
180
                command->status = ex.toStatus();
 
181
 
 
182
                if ( NULL != command->conn ) {
 
183
 
 
184
                    // Confusingly, the pool needs to know about failed connections so that it can
 
185
                    // invalidate other connections which might be bad.  But if the connection
 
186
                    // doesn't seem bad, don't send it back, because we don't want to reuse it.
 
187
                    if ( !command->conn->isFailed() ) {
 
188
                        delete command->conn;
 
189
                    }
 
190
                    else {
 
191
                        shardConnectionPool.release( command->endpoint.toString(), command->conn );
 
192
                    }
 
193
 
 
194
                    command->conn = NULL;
 
195
                }
 
196
            }
 
197
        }
 
198
    }
 
199
 
 
200
    int DBClientMultiCommand::numPending() const {
 
201
        return static_cast<int>( _pendingCommands.size() );
 
202
    }
 
203
 
 
204
    Status DBClientMultiCommand::recvAny( ConnectionString* endpoint, BSONSerializable* response ) {
 
205
 
 
206
        scoped_ptr<PendingCommand> command( _pendingCommands.front() );
 
207
        _pendingCommands.pop_front();
 
208
 
 
209
        *endpoint = command->endpoint;
 
210
        if ( !command->status.isOK() ) return command->status;
 
211
 
 
212
        dassert( NULL != command->conn );
 
213
 
 
214
        try {
 
215
 
 
216
            // Holds the data and BSONObj for the command result
 
217
            Message toRecv;
 
218
            BSONObj result;
 
219
 
 
220
            if ( hasBatchWriteFeature( command->conn )
 
221
                 || !isBatchWriteCommand( command->cmdObj ) ) {
 
222
                // Recv data from command sent earlier
 
223
                recvAsCmd( command->conn, &toRecv, &result );
 
224
            }
 
225
            else {
 
226
                // We can safely block in recvAny, so dispatch writes as safe writes for hosts
 
227
                // that don't understand batch write commands.
 
228
                legacySafeWrite( command->conn, command->dbName, command->cmdObj, &result );
 
229
            }
 
230
 
 
231
            shardConnectionPool.release( command->endpoint.toString(), command->conn );
 
232
            command->conn = NULL;
 
233
 
 
234
            string errMsg;
 
235
            if ( !response->parseBSON( result, &errMsg ) || !response->isValid( &errMsg ) ) {
 
236
                return Status( ErrorCodes::FailedToParse, errMsg );
 
237
            }
 
238
        }
 
239
        catch ( const DBException& ex ) {
 
240
 
 
241
            // Confusingly, the pool needs to know about failed connections so that it can
 
242
            // invalidate other connections which might be bad.  But if the connection doesn't seem
 
243
            // bad, don't send it back, because we don't want to reuse it.
 
244
            if ( !command->conn->isFailed() ) {
 
245
                delete command->conn;
 
246
            }
 
247
            else {
 
248
                shardConnectionPool.release( command->endpoint.toString(), command->conn );
 
249
            }
 
250
            command->conn = NULL;
 
251
 
 
252
            return ex.toStatus();
 
253
        }
 
254
 
 
255
        return Status::OK();
 
256
    }
 
257
 
 
258
    DBClientMultiCommand::~DBClientMultiCommand() {
 
259
 
 
260
        // Cleanup anything outstanding, do *not* return stuff to the pool, that might error
 
261
        for ( deque<PendingCommand*>::iterator it = _pendingCommands.begin();
 
262
            it != _pendingCommands.end(); ++it ) {
 
263
 
 
264
            PendingCommand* command = *it;
 
265
 
 
266
            if ( NULL != command->conn ) delete command->conn;
 
267
            delete command;
 
268
            command = NULL;
 
269
        }
 
270
 
 
271
        _pendingCommands.clear();
 
272
    }
 
273
 
 
274
    void DBClientMultiCommand::setTimeoutMillis( int milliSecs ) {
 
275
        _timeoutMillis = milliSecs;
 
276
    }
 
277
}