~evarlast/ubuntu/utopic/mongodb/upstart-workaround-debian-bug-718702

« back to all changes in this revision

Viewing changes to src/mongo/s/strategy_shard.cpp

  • Committer: Package Import Robot
  • Author(s): James Page, James Page, Robie Basak
  • Date: 2013-05-29 17:44:42 UTC
  • mfrom: (44.1.7 sid)
  • Revision ID: package-import@ubuntu.com-20130529174442-z0a4qmoww4y0t458
Tags: 1:2.4.3-1ubuntu1
[ James Page ]
* Merge from Debian unstable, remaining changes:
  - Enable SSL support:
    + d/control: Add libssl-dev to BD's.
    + d/rules: Enabled --ssl option.
    + d/mongodb.conf: Add example SSL configuration options.
  - d/mongodb-server.mongodb.upstart: Add upstart configuration.
  - d/rules: Don't strip binaries during scons build for Ubuntu.
  - d/control: Add armhf to target archs.
  - d/p/SConscript.client.patch: fixup install of client libraries.
  - d/p/0010-install-libs-to-usr-lib-not-usr-lib64-Closes-588557.patch:
    Install libraries to lib not lib64.
* Dropped changes:
  - d/p/arm-support.patch: Included in Debian.
  - d/p/double-alignment.patch: Included in Debian.
  - d/rules,control: Debian also builds with avaliable system libraries
    now.
* Fix FTBFS due to gcc and boost upgrades in saucy:
  - d/p/0008-ignore-unused-local-typedefs.patch: Add -Wno-unused-typedefs
    to unbreak building with g++-4.8.
  - d/p/0009-boost-1.53.patch: Fixup signed/unsigned casting issue.

[ Robie Basak ]
* d/p/0011-Use-a-signed-char-to-store-BSONType-enumerations.patch: Fixup
  build failure on ARM due to missing signed'ness of char cast.

Show diffs side-by-side

added added

removed removed

Lines of Context:
18
18
 
19
19
#include "pch.h"
20
20
 
 
21
#include "mongo/base/status.h"
21
22
#include "mongo/bson/util/builder.h"
22
23
#include "mongo/client/connpool.h"
23
24
#include "mongo/client/dbclientcursor.h"
 
25
#include "mongo/db/auth/action_type.h"
 
26
#include "mongo/db/auth/authorization_manager.h"
24
27
#include "mongo/db/commands.h"
25
28
#include "mongo/db/index.h"
26
29
#include "mongo/db/namespacestring.h"
 
30
#include "mongo/db/stats/counters.h"
27
31
#include "mongo/s/client_info.h"
28
32
#include "mongo/s/chunk.h"
 
33
#include "mongo/s/chunk_version.h"
29
34
#include "mongo/s/cursors.h"
30
35
#include "mongo/s/grid.h"
31
36
#include "mongo/s/request.h"
32
 
#include "mongo/s/stats.h"
 
37
#include "mongo/s/version_manager.h"
 
38
#include "mongo/util/mongoutils/str.h"
33
39
 
34
40
// error codes 8010-8040
35
41
 
38
44
    class ShardStrategy : public Strategy {
39
45
 
40
46
        bool _isSystemIndexes( const char* ns ) {
41
 
            return strstr( ns , ".system.indexes" ) == strchr( ns , '.' ) && strchr( ns , '.' );
 
47
            return NamespaceString(ns).coll == "system.indexes";
42
48
        }
43
49
 
44
50
        virtual void queryOp( Request& r ) {
45
51
 
46
 
            // TODO: These probably should just be handled here.
47
 
            if ( r.isCommand() ) {
48
 
                SINGLE->queryOp( r );
49
 
                return;
50
 
            }
 
52
            verify(!r.isCommand()); // Commands are handled in strategy_single.cpp
51
53
 
52
54
            QueryMessage q( r.d() );
53
55
 
54
 
            Auth::Level authRequired = NamespaceString(q.ns).coll == "system.users" ?
55
 
                    Auth::WRITE : Auth::READ;
56
 
            r.checkAuth(authRequired);
 
56
            AuthorizationManager* authManager =
 
57
                    ClientBasic::getCurrent()->getAuthorizationManager();
 
58
            Status status = authManager->checkAuthForQuery(q.ns);
 
59
            uassert(16549, status.reason(), status.isOK());
57
60
 
58
61
            LOG(3) << "shard query: " << q.ns << "  " << q.query << endl;
59
62
 
77
80
                    myShard.reset( new Shard( *shards.begin() ) );
78
81
                }
79
82
                
80
 
                doQuery( r, *myShard );
 
83
                doIndexQuery( r, *myShard );
81
84
                return;
82
85
            }
83
86
            
90
93
                if ( qSpec.isExplain() ) start_millis = curTimeMillis64();
91
94
                cursor->init();
92
95
 
93
 
                LOG(5) << "   cursor type: " << cursor->type() << endl;
94
 
                shardedCursorTypes.hit( cursor->type() );
95
 
 
96
96
                if ( qSpec.isExplain() ) {
97
97
                    // fetch elapsed time for the query
98
98
                    long long elapsed_millis = curTimeMillis64() - start_millis;
149
149
                                const string& versionedNS, const BSONObj& filter,
150
150
                                map<Shard,BSONObj>& results )
151
151
        {
152
 
            const BSONObj& commandWithAuth = ClientBasic::getCurrent()->getAuthenticationInfo()->
153
 
                    getAuthTable().copyCommandObjAddingAuth( command );
154
152
 
155
 
            QuerySpec qSpec( db + ".$cmd",
156
 
                             noauth ? command : commandWithAuth,
157
 
                             BSONObj(),
158
 
                             0,
159
 
                             1,
160
 
                             options );
 
153
            QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options);
161
154
 
162
155
            ParallelSortClusteredCursor cursor( qSpec, CommandInfo( versionedNS, filter ) );
163
156
 
175
168
 
176
169
        virtual void getMore( Request& r ) {
177
170
 
 
171
            const char *ns = r.getns();
 
172
 
 
173
            AuthorizationManager* authManager =
 
174
                    ClientBasic::getCurrent()->getAuthorizationManager();
 
175
            Status status = authManager->checkAuthForGetMore(ns);
 
176
            uassert(16539, status.reason(), status.isOK());
 
177
 
178
178
            // TODO:  Handle stale config exceptions here from coll being dropped or sharded during op
179
179
            // for now has same semantics as legacy request
180
180
            ChunkManagerPtr info = r.getChunkManager();
185
185
 
186
186
            if( ! info ){
187
187
 
188
 
                const char *ns = r.getns();
189
 
 
190
188
                LOG(3) << "single getmore: " << ns << endl;
191
189
 
192
190
                long long id = r.d().getInt64( 4 );
214
212
                Message response;
215
213
                bool ok = conn->get()->callRead( r.m() , response);
216
214
                uassert( 10204 , "dbgrid: getmore: error calling db", ok);
 
215
 
 
216
                bool hasMore = (response.singleData()->getCursor() != 0);
 
217
 
 
218
                if ( !hasMore ) {
 
219
                    cursorCache.removeRef( id );
 
220
                }
 
221
 
217
222
                r.reply( response , "" /*conn->getServerAddress() */ );
218
 
 
219
223
                conn->done();
220
224
                return;
221
225
            }
265
269
         * Each thread gets its own backoff wait sequence, to avoid interfering with other valid
266
270
         * operations.
267
271
         */
268
 
        void _sleepForVerifiedLocalError(){
 
272
        void _sleepForVerifiedLocalError() {
269
273
 
270
 
            if( ! perThreadBackoff.get() )
271
 
                perThreadBackoff.reset( new Backoff( maxWaitMillis, maxWaitMillis * 2 ) );
 
274
            if (!perThreadBackoff.get()) perThreadBackoff.reset(new Backoff(maxWaitMillis,
 
275
                                                                            maxWaitMillis * 2));
272
276
 
273
277
            perThreadBackoff.get()->nextSleepMillis();
274
278
        }
275
279
 
276
 
        void _handleRetries( const string& op,
277
 
                             int retries,
278
 
                             const string& ns,
279
 
                             const BSONObj& query,
280
 
                             StaleConfigException& e,
281
 
                             Request& r ) // TODO: remove
 
280
        void _handleRetries(const string& op,
 
281
                            int retries,
 
282
                            const string& ns,
 
283
                            const BSONObj& query,
 
284
                            StaleConfigException& e,
 
285
                            Request& r) // TODO: remove
282
286
        {
283
 
 
284
287
            static const int MAX_RETRIES = 5;
285
 
            if( retries >= MAX_RETRIES ) throw e;
 
288
            if (retries >= MAX_RETRIES) {
 
289
                // If we rethrow b/c too many retries, make sure we add as much data as possible
 
290
                e.addContext(query.toString());
 
291
                throw e;
 
292
            }
286
293
 
287
294
            //
288
295
            // On a stale config exception, we have to assume that the entire collection could have
291
298
            //
292
299
 
293
300
            LOG( retries == 0 ? 1 : 0 ) << op << " will be retried b/c sharding config info is stale, "
294
 
                                << " retries: " << retries
295
 
                                << " ns: " << ns
296
 
                                << " data: " << query << endl;
 
301
                              << " retries: " << retries
 
302
                              << " ns: " << ns
 
303
                              << " data: " << query << endl;
297
304
 
298
 
            if( retries > 2 ){
299
 
                versionManager.forceRemoteCheckShardVersionCB( ns );
 
305
            if (retries > 2) {
 
306
                versionManager.forceRemoteCheckShardVersionCB(ns);
300
307
            }
301
308
 
302
309
            r.reset();
303
310
        }
304
311
 
305
 
        void _groupInserts( const string& ns,
306
 
                            vector<BSONObj>& inserts,
307
 
                            map<ChunkPtr,vector<BSONObj> >& insertsForChunks,
308
 
                            ChunkManagerPtr& manager,
309
 
                            ShardPtr& primary,
310
 
                            bool reloadedConfigData = false )
311
 
        {
312
 
 
313
 
            grid.getDBConfig( ns )->getChunkManagerOrPrimary( ns, manager, primary );
314
 
 
315
 
            // Redo all inserts for chunks which have changed
316
 
            map<ChunkPtr,vector<BSONObj> >::iterator i = insertsForChunks.begin();
317
 
            while( ! insertsForChunks.empty() && i != insertsForChunks.end() ){
318
 
 
319
 
                // If we don't have a manger, our chunk is empty, or our manager is incompatible with the chunk
320
 
                // we assigned inserts to, re-map the inserts to new chunks
321
 
                if( ! manager || ! ( i->first.get() ) || ( manager && ! manager->compatibleWith( i->first ) ) ){
322
 
                    inserts.insert( inserts.end(), i->second.begin(), i->second.end() );
323
 
                    insertsForChunks.erase( i++ );
324
 
                }
325
 
                else ++i;
326
 
 
327
 
            }
328
 
 
329
 
            // Used for storing non-sharded insert data
330
 
            ChunkPtr empty;
331
 
 
332
 
            // Figure out inserts we haven't chunked yet
333
 
            for( vector<BSONObj>::iterator i = inserts.begin(); i != inserts.end(); ++i ){
334
 
 
335
 
                BSONObj o = *i;
336
 
 
337
 
                if ( manager && ! manager->hasShardKey( o ) ) {
 
312
        struct InsertGroup {
 
313
 
 
314
            InsertGroup() :
 
315
                    reloadedConfig(false)
 
316
            {
 
317
            }
 
318
 
 
319
            // Does NOT reset our config reload flag
 
320
            void resetInsertData() {
 
321
                shard.reset();
 
322
                manager.reset();
 
323
                inserts.clear();
 
324
                chunkData.clear();
 
325
                errMsg.clear();
 
326
            }
 
327
 
 
328
            bool hasException() {
 
329
                return errMsg != "";
 
330
            }
 
331
 
 
332
            void setException(int code, const string& errMsg) {
 
333
                this->errCode = code;
 
334
                this->errMsg = errMsg;
 
335
            }
 
336
 
 
337
            ShardPtr shard;
 
338
            ChunkManagerPtr manager;
 
339
            vector<BSONObj> inserts;
 
340
            map<ChunkPtr, int> chunkData;
 
341
            bool reloadedConfig;
 
342
 
 
343
            string errMsg;
 
344
            int errCode;
 
345
 
 
346
        };
 
347
 
 
348
        /**
 
349
         * Given a ns and insert message (with flags), returns the shard (and chunkmanager if
 
350
         * needed) required to do a bulk insert of the next N inserts until the shard changes.
 
351
         *
 
352
         * Also tracks the data inserted per chunk on the current shard.
 
353
         *
 
354
         * Returns whether or not the config data was reloaded
 
355
         */
 
356
        void _getNextInsertGroup(const string& ns, DbMessage& d, int flags, InsertGroup* group) {
 
357
            grid.getDBConfig(ns)->getChunkManagerOrPrimary(ns, group->manager, group->shard);
 
358
            // shard is either primary or nothing, if there's a chunk manager
 
359
 
 
360
            // Set our current position, so we can jump back if we have a stale config error for
 
361
            // this group
 
362
            d.markSet();
 
363
 
 
364
            int totalInsertSize = 0;
 
365
 
 
366
            while (d.moreJSObjs()) {
 
367
 
 
368
                const char* prevObjMark = d.markGet();
 
369
                BSONObj o = d.nextJsObj();
 
370
 
 
371
                if (group->manager && !group->manager->hasShardKey(o)) {
338
372
 
339
373
                    bool bad = true;
340
374
 
341
 
                    // Add autogenerated _id to item and see if we now have a shard key
342
 
                    if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
 
375
                    // If _id is part of shard key pattern, but item doesn't already have one,
 
376
                    // add autogenerated _id and see if we now have a shard key.
 
377
                    if (group->manager->getShardKey().partOfShardKey("_id") && !o.hasField("_id")) {
343
378
 
344
379
                        BSONObjBuilder b;
345
 
                        b.appendOID( "_id" , 0 , true );
346
 
                        b.appendElements( o );
 
380
                        b.appendOID("_id", 0, true);
 
381
                        b.appendElements(o);
347
382
                        o = b.obj();
348
 
                        bad = ! manager->hasShardKey( o );
 
383
                        bad = !group->manager->hasShardKey(o);
349
384
 
350
385
                    }
351
386
 
352
 
                    if( bad && ! reloadedConfigData ){
 
387
                    if (bad && !group->reloadedConfig) {
353
388
 
354
389
                        //
355
390
                        // The shard key may not match because it has changed on us (new collection), and we are now
370
405
                        //
371
406
 
372
407
                        warning() << "shard key mismatch for insert " << o
373
 
                                  << ", expected values for " << manager->getShardKey()
 
408
                                  << ", expected values for " << group->manager->getShardKey()
374
409
                                  << ", reloading config data to ensure not stale" << endl;
375
410
 
 
411
                        // Reset the selected shard and manager
 
412
                        group->shard.reset();
 
413
                        group->manager.reset();
376
414
                        // Remove all the previously grouped inserts...
377
 
                        inserts.erase( inserts.begin(), i );
 
415
                        group->inserts.clear();
 
416
                        // Reset the chunk data...
 
417
                        group->chunkData.clear();
378
418
 
379
419
                        // If this is our retry, force talking to the config server
380
 
                        grid.getDBConfig( ns )->getChunkManagerIfExists( ns, true );
381
 
                        _groupInserts( ns, inserts, insertsForChunks, manager, primary, true );
 
420
                        grid.getDBConfig(ns)->getChunkManagerIfExists(ns, true);
 
421
 
 
422
                        // Now we've reloaded the config once
 
423
                        group->reloadedConfig = true;
 
424
 
 
425
                        // Reset our current position to the start of the last group of inserts
 
426
                        d.markReset();
 
427
 
 
428
                        _getNextInsertGroup(ns, d, flags, group);
382
429
                        return;
383
430
                    }
384
431
 
385
 
                    if( bad ){
 
432
                    if (bad) {
386
433
 
387
434
                        // Sleep to avoid DOS'ing config server when we have invalid inserts
388
435
                        _sleepForVerifiedLocalError();
389
436
 
390
437
                        // TODO: Matching old behavior, but do we need this log line?
391
438
                        log() << "tried to insert object with no valid shard key for "
392
 
                              << manager->getShardKey() << " : " << o << endl;
 
439
                              << group->manager->getShardKey() << " : " << o << endl;
393
440
 
394
 
                        uasserted( 8011,
395
 
                              str::stream() << "tried to insert object with no valid shard key for "
396
 
                                            << manager->getShardKey().toString() << " : " << o.toString() );
 
441
                        group->setException(8011,
 
442
                                            str::stream()
 
443
                                                    << "tried to insert object with no valid shard key for "
 
444
                                                    << group->manager->getShardKey().toString()
 
445
                                                    << " : " << o.toString());
 
446
                        return;
397
447
                    }
398
448
                }
399
449
 
 
450
                int objSize = o.objsize();
 
451
                totalInsertSize += objSize;
 
452
 
 
453
                // Insert at least one document, but otherwise no more than 8MB of data, otherwise
 
454
                // the WBL will not work
 
455
                if (group->inserts.size() > 0 && totalInsertSize > BSONObjMaxUserSize / 2) {
 
456
                    // Reset to after the previous insert
 
457
                    d.markReset(prevObjMark);
 
458
 
 
459
                    LOG(3) << "breaking up bulk insert group to " << ns << " at size "
 
460
                               << (totalInsertSize - objSize) << " (" << group->inserts.size()
 
461
                               << " documents)" << endl;
 
462
 
 
463
                    // Too much data would be inserted, break out of our bulk insert loop
 
464
                    break;
 
465
                }
 
466
 
 
467
                // Make sure our objSize is not greater than maximum, otherwise WBL won't work
 
468
                verify( objSize <= BSONObjMaxUserSize );
 
469
 
400
470
                // Many operations benefit from having the shard key early in the object
401
 
                if( manager ){
402
 
                    o = manager->getShardKey().moveToFront(o);
403
 
                    insertsForChunks[manager->findChunk(o)].push_back(o);
 
471
                if (group->manager) {
 
472
 
 
473
                    //
 
474
                    // Sharded insert
 
475
                    //
 
476
 
 
477
                    ChunkPtr chunk = group->manager->findChunkForDoc(o);
 
478
 
 
479
                    if (!group->shard) {
 
480
                        group->shard.reset(new Shard(chunk->getShard()));
 
481
                    }
 
482
                    else if (group->shard->getName() != chunk->getShard().getName()) {
 
483
 
 
484
                        // Reset to after the previous insert
 
485
                        d.markReset(prevObjMark);
 
486
                        // Our shard has changed, break out of bulk insert loop
 
487
                        break;
 
488
                    }
 
489
 
 
490
                    o = group->manager->getShardKey().moveToFront(o);
 
491
                    group->inserts.push_back(o);
 
492
                    group->chunkData[chunk] += objSize;
404
493
                }
405
 
                else{
406
 
                    insertsForChunks[ empty ].push_back(o);
 
494
                else {
 
495
 
 
496
                    //
 
497
                    // Unsharded insert
 
498
                    //
 
499
 
 
500
                    group->inserts.push_back(o);
407
501
                }
408
502
            }
409
 
 
410
 
            inserts.clear();
411
 
            return;
412
503
        }
413
504
 
414
505
        /**
418
509
         * 1) Error is thrown immediately for corrupt objects
419
510
         * 2) Error is thrown only for UserExceptions during the insert process, if last obj had error that's thrown
420
511
         */
421
 
        void _insert( Request& r , DbMessage& d ){
 
512
        void _insert(Request& r, DbMessage& d) {
422
513
 
423
514
            const string& ns = r.getns();
424
515
 
425
 
            vector<BSONObj> insertsRemaining;
426
 
            while ( d.moreJSObjs() ){
427
 
                insertsRemaining.push_back( d.nextJsObj() );
428
 
            }
 
516
            AuthorizationManager* authManager =
 
517
                    ClientBasic::getCurrent()->getAuthorizationManager();
 
518
            Status status = authManager->checkAuthForInsert(ns);
 
519
            uassert(16540, status.reason(), status.isOK());
 
520
 
429
521
 
430
522
            int flags = 0;
431
523
 
432
 
            if( d.reservedField() & Reserved_InsertOption_ContinueOnError )
433
 
                flags |= InsertOption_ContinueOnError;
434
 
 
435
 
            if( d.reservedField() & Reserved_FromWriteback )
436
 
                flags |= WriteOption_FromWriteback;
437
 
 
438
 
            _insert( ns, insertsRemaining, flags, r, d );
439
 
        }
440
 
 
441
 
        void _insert( const string& ns,
442
 
                      vector<BSONObj>& inserts,
443
 
                      int flags,
444
 
                      Request& r , DbMessage& d ) // TODO: remove
445
 
        {
446
 
            map<ChunkPtr, vector<BSONObj> > insertsForChunks; // Map for bulk inserts to diff chunks
447
 
            _insert( ns, inserts, insertsForChunks, flags, r, d );
448
 
        }
449
 
 
450
 
        void _insert( const string& ns,
451
 
                      vector<BSONObj>& insertsRemaining,
452
 
                      map<ChunkPtr, vector<BSONObj> >& insertsForChunks,
453
 
                      int flags,
454
 
                      Request& r, DbMessage& d, // TODO: remove
455
 
                      int retries = 0 )
456
 
        {
457
 
            // TODO: Replace this with a better check to see if we're making progress
458
 
            uassert( 16055, str::stream() << "too many retries during bulk insert, " << insertsRemaining.size() << " inserts remaining", retries < 30 );
459
 
            uassert( 16056, str::stream() << "shutting down server during bulk insert, " << insertsRemaining.size() << " inserts remaining", ! inShutdown() );
460
 
 
461
 
            ChunkManagerPtr manager;
462
 
            ShardPtr primary;
463
 
 
464
 
            // This function handles grouping the inserts per-shard whether the collection is sharded or not.
465
 
            _groupInserts( ns, insertsRemaining, insertsForChunks, manager, primary );
466
 
 
467
 
            // ContinueOnError is always on when using sharding.
468
 
            flags |= manager ? InsertOption_ContinueOnError : 0;
469
 
 
470
 
            while( ! insertsForChunks.empty() ){
471
 
 
472
 
                ChunkPtr c = insertsForChunks.begin()->first;
473
 
                vector<BSONObj>& objs = insertsForChunks.begin()->second;
474
 
 
475
 
                //
476
 
                // Careful - if primary exists, c will be empty
477
 
                //
478
 
 
479
 
                const Shard& shard = c ? c->getShard() : primary.get();
480
 
 
481
 
                ShardConnection dbcon( shard, ns, manager );
 
524
            if (d.reservedField() & Reserved_InsertOption_ContinueOnError) flags |=
 
525
                    InsertOption_ContinueOnError;
 
526
 
 
527
            if (d.reservedField() & Reserved_FromWriteback) flags |= WriteOption_FromWriteback;
 
528
 
 
529
            if (!d.moreJSObjs()) return;
 
530
 
 
531
            _insert(ns, d, flags, r);
 
532
        }
 
533
 
 
534
        void _insert(const string& ns, DbMessage& d, int flags, Request& r) // TODO: remove
 
535
        {
 
536
            uassert( 16056, str::stream() << "shutting down server during insert", ! inShutdown() );
 
537
 
 
538
            bool continueOnError = flags & InsertOption_ContinueOnError;
 
539
 
 
540
            // Sanity check, probably not needed but for safety
 
541
            int retries = 0;
 
542
 
 
543
            InsertGroup group;
 
544
 
 
545
            bool prevInsertException = false;
 
546
 
 
547
            while (d.moreJSObjs()) {
 
548
 
 
549
                // TODO: Replace this with a better check to see if we're making progress
 
550
                uassert( 16055, str::stream() << "too many retries during insert", retries < 30 );
 
551
 
 
552
                //
 
553
                // PREPARE INSERT
 
554
                //
 
555
 
 
556
                group.resetInsertData();
 
557
 
 
558
                // This function handles grouping the inserts per-shard whether the collection
 
559
                // is sharded or not.
 
560
                //
 
561
                // Can record errors on bad insert object format (i.e. doesn't have the shard
 
562
                // key), which should be handled in-order with mongod errors.
 
563
                //
 
564
                _getNextInsertGroup(ns, d, flags, &group);
 
565
 
 
566
                // We should always have a shard if we have any inserts
 
567
                verify(group.inserts.size() == 0 || group.shard.get());
 
568
 
 
569
                if (group.inserts.size() > 0 && group.hasException()) {
 
570
                    warning() << "problem preparing batch insert detected, first inserting "
 
571
                              << group.inserts.size() << " intermediate documents" << endl;
 
572
                }
 
573
 
 
574
                scoped_ptr<ShardConnection> dbconPtr;
482
575
 
483
576
                try {
484
577
 
485
 
                    LOG(4) << "inserting " << objs.size() << " documents to shard " << shard
486
 
                           << " at version "
487
 
                           << ( manager.get() ? manager->getVersion().toString() :
488
 
                                                ShardChunkVersion( 0, OID() ).toString() ) << endl;
489
 
 
490
 
                    // Taken from single-shard bulk insert, should not need multiple methods in future
491
 
                    // insert( c->getShard() , r.getns() , objs , flags);
492
 
 
493
 
                    // It's okay if the version is set here, an exception will be thrown if the version is incompatible
494
 
                    try{
 
578
                    //
 
579
                    // DO ALL VALID INSERTS
 
580
                    //
 
581
 
 
582
                    if (group.inserts.size() > 0) {
 
583
 
 
584
                        dbconPtr.reset(new ShardConnection(*(group.shard), ns, group.manager));
 
585
                        ShardConnection& dbcon = *dbconPtr;
 
586
 
 
587
                        LOG(5)
 
588
                                << "inserting "
 
589
                                << group.inserts.size()
 
590
                                << " documents to shard "
 
591
                                << group.shard
 
592
                                << " at version "
 
593
                                << (group.manager.get() ?
 
594
                                    group.manager->getVersion().toString() :
 
595
                                    ChunkVersion(0, OID()).toString())
 
596
                                << endl;
 
597
 
 
598
                        //
 
599
                        // CHECK VERSION
 
600
                        //
 
601
 
 
602
                        // Will throw SCE if we need to reset our version before sending.
495
603
                        dbcon.setVersion();
496
 
                    }
497
 
                    catch ( StaleConfigException& e ) {
498
 
                        // External try block is still needed to match bulk insert mongod
499
 
                        // behavior
500
 
                        dbcon.done();
501
 
                        _handleRetries( "insert", retries, ns, objs[0], e, r );
502
 
                        _insert( ns, insertsRemaining, insertsForChunks, flags, r, d, retries + 1 );
503
 
                        return;
504
 
                    }
505
 
 
506
 
                    // Certain conn types can't handle bulk inserts, so don't use unless we need to
507
 
                    if( objs.size() == 1 ){
508
 
                        dbcon->insert( ns, objs[0], flags );
509
 
                    }
510
 
                    else{
511
 
                        dbcon->insert( ns , objs , flags);
512
 
                    }
513
 
 
514
 
                    // TODO: Option for safe inserts here - can then use this for all inserts
515
 
                    // Not sure what this means?
516
 
 
517
 
                    dbcon.done();
518
 
 
519
 
                    int bytesWritten = 0;
520
 
                    for (vector<BSONObj>::iterator vecIt = objs.begin(); vecIt != objs.end(); ++vecIt) {
521
 
                        r.gotInsert(); // Record the correct number of individual inserts
522
 
                        bytesWritten += (*vecIt).objsize();
523
 
                    }
524
 
 
525
 
                    // TODO: The only reason we're grouping by chunks here is for auto-split, more efficient
526
 
                    // to track this separately and bulk insert to shards
527
 
                    if ( c && r.getClientInfo()->autoSplitOk() )
528
 
                        c->splitIfShould( bytesWritten );
529
 
 
530
 
                }
531
 
                catch( UserException& e ){
532
 
                    // Unexpected exception, so don't clean up the conn
533
 
                    dbcon.kill();
534
 
 
535
 
                    // These inserts won't be retried, as something weird happened here
536
 
                    insertsForChunks.erase( insertsForChunks.begin() );
537
 
 
538
 
                    // Throw if this is the last chunk bulk-inserted to
539
 
                    if( insertsForChunks.empty() ){
 
604
 
 
605
                        // Reset our retries to zero since this batch's version went through
 
606
                        retries = 0;
 
607
 
 
608
                        //
 
609
                        // SEND INSERT
 
610
                        //
 
611
 
 
612
                        string insertErr = "";
 
613
 
 
614
                        try {
 
615
 
 
616
                            dbcon->insert(ns, group.inserts, flags);
 
617
 
 
618
                            //
 
619
                            // WARNING: We *have* to return the connection here, otherwise the
 
620
                            // error gets checked on a different connection!
 
621
                            //
 
622
                            dbcon.done();
 
623
 
 
624
                            globalOpCounters.incInsertInWriteLock(group.inserts.size());
 
625
 
 
626
                            //
 
627
                            // CHECK INTERMEDIATE ERROR
 
628
                            //
 
629
 
 
630
                            // We need to check the mongod error if we're inserting more documents,
 
631
                            // or if a later mongos error might mask an insert error,
 
632
                            // or if an earlier error might mask this error from GLE
 
633
                            if (d.moreJSObjs() || group.hasException() || prevInsertException) {
 
634
 
 
635
                                LOG(3) << "running intermediate GLE to "
 
636
                                       << group.shard->toString() << " during bulk insert "
 
637
                                       << "because "
 
638
                                       << (d.moreJSObjs() ? "we have more documents to insert" : 
 
639
                                          (group.hasException() ? "exception detected while preparing group" :
 
640
                                                                      "a previous error exists"))
 
641
                                       << endl;
 
642
 
 
643
                                ClientInfo* ci = r.getClientInfo();
 
644
 
 
645
                                //
 
646
                                // WARNING: Without this, we will use the *previous* shard for GLE
 
647
                                //
 
648
                                ci->newRequest();
 
649
 
 
650
                                BSONObjBuilder gleB;
 
651
                                string errMsg;
 
652
 
 
653
                                // TODO: Can't actually pass GLE parameters here,
 
654
                                // so we use defaults?
 
655
                                ci->getLastError("admin",
 
656
                                                 BSON( "getLastError" << 1 ),
 
657
                                                 gleB,
 
658
                                                 errMsg,
 
659
                                                 false);
 
660
 
 
661
                                insertErr = errMsg;
 
662
                                BSONObj gle = gleB.obj();
 
663
                                if (gle["err"].type() == String)
 
664
                                    insertErr = gle["err"].String();
 
665
 
 
666
                                LOG(3) << "intermediate GLE result was " << gle
 
667
                                       << " errmsg: " << errMsg << endl;
 
668
 
 
669
                                //
 
670
                                // Clear out the shards we've checked so far, if we're successful
 
671
                                //
 
672
                                ci->clearSinceLastGetError();
 
673
                            }
 
674
                        }
 
675
                        catch (DBException& e) {
 
676
                            // Network error on send or GLE
 
677
                            insertErr = e.what();
 
678
                            dbcon.kill();
 
679
                        }
 
680
 
 
681
                        //
 
682
                        // If the insert had an error, figure out if we should throw right now.
 
683
                        //
 
684
 
 
685
                        if (insertErr.size() > 0) {
 
686
 
 
687
                            string errMsg = str::stream()
 
688
                                    << "error inserting "
 
689
                                    << group.inserts.size()
 
690
                                    << " documents to shard "
 
691
                                    << group.shard->toString()
 
692
                                    << " at version "
 
693
                                    << (group.manager.get() ?
 
694
                                        group.manager->getVersion().toString() :
 
695
                                        ChunkVersion(0, OID()).toString())
 
696
                                    << causedBy(insertErr);
 
697
 
 
698
                            // If we're continuing-on-error and the insert error is superseded by
 
699
                            // a later error from mongos, we shouldn't throw this error but the
 
700
                            // later one.
 
701
                            if (group.hasException() && continueOnError) warning() << errMsg;
 
702
                            else uasserted(16460, errMsg);
 
703
                        }
 
704
 
 
705
                        //
 
706
                        // SPLIT CHUNKS IF NEEDED
 
707
                        //
 
708
 
 
709
                        // Should never throw errors!
 
710
                        if (!group.chunkData.empty() && r.getClientInfo()->autoSplitOk()) {
 
711
 
 
712
                            for (map<ChunkPtr, int>::iterator it = group.chunkData.begin();
 
713
                                    it != group.chunkData.end(); ++it)
 
714
                            {
 
715
                                ChunkPtr c = it->first;
 
716
                                int bytesWritten = it->second;
 
717
 
 
718
                                c->splitIfShould(bytesWritten);
 
719
                            }
 
720
                        }
 
721
                    }
 
722
 
 
723
                    //
 
724
                    // CHECK AND RE-THROW MONGOS ERROR
 
725
                    //
 
726
 
 
727
                    if (group.hasException()) {
 
728
 
 
729
                        string errMsg = str::stream() << "error preparing documents for insert"
 
730
                                                      << causedBy(group.errMsg);
 
731
 
 
732
                        uasserted(group.errCode, errMsg);
 
733
                    }
 
734
                }
 
735
                catch (StaleConfigException& e) {
 
736
 
 
737
                    // Clean up the conn if needed
 
738
                    if (dbconPtr) dbconPtr->done();
 
739
 
 
740
                    // Note - this can throw a SCE which will abort *all* the rest of the inserts
 
741
                    // if we retry too many times.  We assume that this cannot happen.  In any
 
742
                    // case, the user gets an error.
 
743
                    _handleRetries("insert", retries, ns, group.inserts[0], e, r);
 
744
                    retries++;
 
745
 
 
746
                    // Go back to the start of the inserts
 
747
                    d.markReset();
 
748
 
 
749
                    verify( d.moreJSObjs() );
 
750
                }
 
751
                catch (UserException& e) {
 
752
 
 
753
                    // Unexpected exception, cleans up the conn if not already done()
 
754
                    if (dbconPtr) dbconPtr->kill();
 
755
 
 
756
                    warning() << "exception during insert"
 
757
                              << (continueOnError ? " (continue on error set)" : "") << causedBy(e)
 
758
                              << endl;
 
759
 
 
760
                    prevInsertException = true;
 
761
 
 
762
                    //
 
763
                    // Throw if this is the last chunk bulk-inserted to, or if continue-on-error is
 
764
                    // not set.
 
765
                    //
 
766
                    // If this is the last chunk bulk-inserted to, then if continue-on-error is
 
767
                    // true, we'll have the final error, and if continue-on-error is false, we
 
768
                    // should abort anyway.
 
769
                    //
 
770
 
 
771
                    if (!d.moreJSObjs() || !continueOnError) {
540
772
                        throw;
541
773
                    }
542
774
 
547
779
                    // TODO: Make better semantics
548
780
                    //
549
781
 
550
 
                    warning() << "swallowing exception during batch insert"
551
 
                              << causedBy( e ) << endl;
 
782
                    warning() << "swallowing exception during insert" << causedBy(e) << endl;
552
783
                }
553
784
 
554
 
                insertsForChunks.erase( insertsForChunks.begin() );
 
785
                // Reset our list of last shards we talked to, since we already got writebacks
 
786
                // earlier.
 
787
                if (d.moreJSObjs()) r.getClientInfo()->clearSinceLastGetError();
555
788
            }
556
789
        }
557
790
 
558
 
        void _prepareUpdate( const string& ns,
559
 
                             const BSONObj& query,
560
 
                             const BSONObj& toUpdate,
561
 
                             int flags,
562
 
                             // Output
563
 
                             ChunkPtr& chunk,
564
 
                             ShardPtr& shard,
565
 
                             ChunkManagerPtr& manager,
566
 
                             ShardPtr& primary,
567
 
                             // Input
568
 
                             bool reloadConfigData = false )
 
791
        void _prepareUpdate(const string& ns,
 
792
                            const BSONObj& query,
 
793
                            const BSONObj& toUpdate,
 
794
                            int flags,
 
795
                            // Output
 
796
                            ChunkPtr& chunk,
 
797
                            ShardPtr& shard,
 
798
                            ChunkManagerPtr& manager,
 
799
                            ShardPtr& primary,
 
800
                            // Input
 
801
                            bool reloadConfigData = false)
569
802
        {
570
803
            //
571
804
            // Updates have three basic targeting options :
685
918
                    if( ! skPattern.partOfShardKey( field.fieldName() ) || getGtLtOp( field ) != BSONObj::Equality )
686
919
                        continue;
687
920
 
688
 
                    if( field != shardKey[ field.fieldName() ] ){
 
921
                    if( field != toUpdate[ field.fieldName() ] ){
689
922
 
690
923
                        // Retry reloading the config data once
691
924
                        if( ! reloadConfigData ){
714
947
            verify( manager );
715
948
            if( ! shardKey.isEmpty() ){
716
949
 
717
 
                chunk = manager->findChunk( shardKey );
 
950
                chunk = manager->findIntersectingChunk( shardKey );
718
951
                shard = ShardPtr( new Shard( chunk->getShard() ) );
719
952
                return;
720
953
            }
774
1007
            int flags = d.pullInt();
775
1008
            const BSONObj query = d.nextJsObj();
776
1009
 
 
1010
            bool upsert = flags & UpdateOption_Upsert;
 
1011
            AuthorizationManager* authManager =
 
1012
                    ClientBasic::getCurrent()->getAuthorizationManager();
 
1013
            Status status = authManager->checkAuthForUpdate(ns, upsert);
 
1014
            uassert(16537, status.reason(), status.isOK());
 
1015
 
777
1016
            uassert( 10201 ,  "invalid update" , d.moreJSObjs() );
778
1017
 
779
1018
            const BSONObj toUpdate = d.nextJsObj();
930
1169
            const string& ns = r.getns();
931
1170
            int flags = d.pullInt();
932
1171
 
 
1172
            AuthorizationManager* authManager =
 
1173
                    ClientBasic::getCurrent()->getAuthorizationManager();
 
1174
            Status status = authManager->checkAuthForDelete(ns);
 
1175
            uassert(16541, status.reason(), status.isOK());
 
1176
 
933
1177
            uassert( 10203 ,  "bad delete message" , d.moreJSObjs() );
934
1178
 
935
1179
            const BSONObj query = d.nextJsObj();
957
1201
 
958
1202
                int * x = (int*)(r.d().afterNS());
959
1203
                x[0] |= RemoveOption_Broadcast; // this means don't check shard version in mongod
960
 
                // TODO: Why is this an update op here?
961
 
                broadcastWrite( dbUpdate, r );
 
1204
                broadcastWrite(dbDelete, r);
962
1205
                return;
963
1206
            }
964
1207
 
992
1235
            // TODO: This block goes away, system.indexes needs to handle better
993
1236
            if( isIndexWrite ){
994
1237
 
 
1238
                if (op == dbInsert) {
 
1239
                    // Insert is the only write op allowed on system.indexes, so it's the only one
 
1240
                    // we check auth for.
 
1241
                    AuthorizationManager* authManager =
 
1242
                            ClientBasic::getCurrent()->getAuthorizationManager();
 
1243
                    uassert(16547,
 
1244
                            mongoutils::str::stream() << "not authorized to create index on " << ns,
 
1245
                            authManager->checkAuthorization(ns, ActionType::ensureIndex));
 
1246
                }
 
1247
 
995
1248
                if ( r.getConfig()->isShardingEnabled() ){
996
1249
                    LOG(1) << "sharded index write for " << ns << endl;
997
1250
                    handleIndexWrite( op , r );
1035
1288
                while( d.moreJSObjs() ) {
1036
1289
                    BSONObj o = d.nextJsObj();
1037
1290
                    const char * ns = o["ns"].valuestr();
 
1291
 
1038
1292
                    if ( r.getConfig()->isSharded( ns ) ) {
1039
1293
                        BSONObj newIndexKey = o["key"].embeddedObjectUserCheck();
1040
1294