293
300
LOG( retries == 0 ? 1 : 0 ) << op << " will be retried b/c sharding config info is stale, "
294
<< " retries: " << retries
296
<< " data: " << query << endl;
301
<< " retries: " << retries
303
<< " data: " << query << endl;
299
versionManager.forceRemoteCheckShardVersionCB( ns );
306
versionManager.forceRemoteCheckShardVersionCB(ns);
305
void _groupInserts( const string& ns,
306
vector<BSONObj>& inserts,
307
map<ChunkPtr,vector<BSONObj> >& insertsForChunks,
308
ChunkManagerPtr& manager,
310
bool reloadedConfigData = false )
313
grid.getDBConfig( ns )->getChunkManagerOrPrimary( ns, manager, primary );
315
// Redo all inserts for chunks which have changed
316
map<ChunkPtr,vector<BSONObj> >::iterator i = insertsForChunks.begin();
317
while( ! insertsForChunks.empty() && i != insertsForChunks.end() ){
319
// If we don't have a manager, our chunk is empty, or our manager is incompatible with the chunk
320
// we assigned inserts to, re-map the inserts to new chunks
321
if( ! manager || ! ( i->first.get() ) || ( manager && ! manager->compatibleWith( i->first ) ) ){
322
inserts.insert( inserts.end(), i->second.begin(), i->second.end() );
323
insertsForChunks.erase( i++ );
329
// Used for storing non-sharded insert data
332
// Figure out inserts we haven't chunked yet
333
for( vector<BSONObj>::iterator i = inserts.begin(); i != inserts.end(); ++i ){
337
if ( manager && ! manager->hasShardKey( o ) ) {
315
reloadedConfig(false)
319
// Does NOT reset our config reload flag
320
void resetInsertData() {
328
bool hasException() {
332
void setException(int code, const string& errMsg) {
333
this->errCode = code;
334
this->errMsg = errMsg;
338
ChunkManagerPtr manager;
339
vector<BSONObj> inserts;
340
map<ChunkPtr, int> chunkData;
349
* Given a ns and insert message (with flags), returns the shard (and chunkmanager if
350
* needed) required to do a bulk insert of the next N inserts until the shard changes.
352
* Also tracks the data inserted per chunk on the current shard.
354
* Records in group->reloadedConfig whether or not the config data was reloaded
356
void _getNextInsertGroup(const string& ns, DbMessage& d, int flags, InsertGroup* group) {
357
grid.getDBConfig(ns)->getChunkManagerOrPrimary(ns, group->manager, group->shard);
358
// shard is either primary or nothing, if there's a chunk manager
360
// Set our current position, so we can jump back if we have a stale config error for
364
int totalInsertSize = 0;
366
while (d.moreJSObjs()) {
368
const char* prevObjMark = d.markGet();
369
BSONObj o = d.nextJsObj();
371
if (group->manager && !group->manager->hasShardKey(o)) {
341
// Add autogenerated _id to item and see if we now have a shard key
342
if ( manager->getShardKey().partOfShardKey( "_id" ) ) {
375
// If _id is part of shard key pattern, but item doesn't already have one,
376
// add autogenerated _id and see if we now have a shard key.
377
if (group->manager->getShardKey().partOfShardKey("_id") && !o.hasField("_id")) {
344
379
BSONObjBuilder b;
345
b.appendOID( "_id" , 0 , true );
346
b.appendElements( o );
380
b.appendOID("_id", 0, true);
348
bad = ! manager->hasShardKey( o );
383
bad = !group->manager->hasShardKey(o);
352
if( bad && ! reloadedConfigData ){
387
if (bad && !group->reloadedConfig) {
355
390
// The shard key may not match because it has changed on us (new collection), and we are now
372
407
warning() << "shard key mismatch for insert " << o
373
<< ", expected values for " << manager->getShardKey()
408
<< ", expected values for " << group->manager->getShardKey()
374
409
<< ", reloading config data to ensure not stale" << endl;
411
// Reset the selected shard and manager
412
group->shard.reset();
413
group->manager.reset();
376
414
// Remove all the previously grouped inserts...
377
inserts.erase( inserts.begin(), i );
415
group->inserts.clear();
416
// Reset the chunk data...
417
group->chunkData.clear();
379
419
// If this is our retry, force talking to the config server
380
grid.getDBConfig( ns )->getChunkManagerIfExists( ns, true );
381
_groupInserts( ns, inserts, insertsForChunks, manager, primary, true );
420
grid.getDBConfig(ns)->getChunkManagerIfExists(ns, true);
422
// Now we've reloaded the config once
423
group->reloadedConfig = true;
425
// Reset our current position to the start of the last group of inserts
428
_getNextInsertGroup(ns, d, flags, group);
387
434
// Sleep to avoid DOS'ing config server when we have invalid inserts
388
435
_sleepForVerifiedLocalError();
390
437
// TODO: Matching old behavior, but do we need this log line?
391
438
log() << "tried to insert object with no valid shard key for "
392
<< manager->getShardKey() << " : " << o << endl;
439
<< group->manager->getShardKey() << " : " << o << endl;
395
str::stream() << "tried to insert object with no valid shard key for "
396
<< manager->getShardKey().toString() << " : " << o.toString() );
441
group->setException(8011,
443
<< "tried to insert object with no valid shard key for "
444
<< group->manager->getShardKey().toString()
445
<< " : " << o.toString());
450
int objSize = o.objsize();
451
totalInsertSize += objSize;
453
// Insert at least one document, but otherwise no more than 8MB of data, otherwise
454
// the WBL will not work
455
if (group->inserts.size() > 0 && totalInsertSize > BSONObjMaxUserSize / 2) {
456
// Reset to after the previous insert
457
d.markReset(prevObjMark);
459
LOG(3) << "breaking up bulk insert group to " << ns << " at size "
460
<< (totalInsertSize - objSize) << " (" << group->inserts.size()
461
<< " documents)" << endl;
463
// Too much data would be inserted, break out of our bulk insert loop
467
// Make sure our objSize is not greater than maximum, otherwise WBL won't work
468
verify( objSize <= BSONObjMaxUserSize );
400
470
// Many operations benefit from having the shard key early in the object
402
o = manager->getShardKey().moveToFront(o);
403
insertsForChunks[manager->findChunk(o)].push_back(o);
471
if (group->manager) {
477
ChunkPtr chunk = group->manager->findChunkForDoc(o);
480
group->shard.reset(new Shard(chunk->getShard()));
482
else if (group->shard->getName() != chunk->getShard().getName()) {
484
// Reset to after the previous insert
485
d.markReset(prevObjMark);
486
// Our shard has changed, break out of bulk insert loop
490
o = group->manager->getShardKey().moveToFront(o);
491
group->inserts.push_back(o);
492
group->chunkData[chunk] += objSize;
406
insertsForChunks[ empty ].push_back(o);
500
group->inserts.push_back(o);
418
509
* 1) Error is thrown immediately for corrupt objects
419
510
* 2) Error is thrown only for UserExceptions during the insert process, if last obj had error that's thrown
421
void _insert( Request& r , DbMessage& d ){
512
void _insert(Request& r, DbMessage& d) {
423
514
const string& ns = r.getns();
425
vector<BSONObj> insertsRemaining;
426
while ( d.moreJSObjs() ){
427
insertsRemaining.push_back( d.nextJsObj() );
516
AuthorizationManager* authManager =
517
ClientBasic::getCurrent()->getAuthorizationManager();
518
Status status = authManager->checkAuthForInsert(ns);
519
uassert(16540, status.reason(), status.isOK());
432
if( d.reservedField() & Reserved_InsertOption_ContinueOnError )
433
flags |= InsertOption_ContinueOnError;
435
if( d.reservedField() & Reserved_FromWriteback )
436
flags |= WriteOption_FromWriteback;
438
_insert( ns, insertsRemaining, flags, r, d );
441
void _insert( const string& ns,
442
vector<BSONObj>& inserts,
444
Request& r , DbMessage& d ) // TODO: remove
446
map<ChunkPtr, vector<BSONObj> > insertsForChunks; // Map for bulk inserts to diff chunks
447
_insert( ns, inserts, insertsForChunks, flags, r, d );
450
void _insert( const string& ns,
451
vector<BSONObj>& insertsRemaining,
452
map<ChunkPtr, vector<BSONObj> >& insertsForChunks,
454
Request& r, DbMessage& d, // TODO: remove
457
// TODO: Replace this with a better check to see if we're making progress
458
uassert( 16055, str::stream() << "too many retries during bulk insert, " << insertsRemaining.size() << " inserts remaining", retries < 30 );
459
uassert( 16056, str::stream() << "shutting down server during bulk insert, " << insertsRemaining.size() << " inserts remaining", ! inShutdown() );
461
ChunkManagerPtr manager;
464
// This function handles grouping the inserts per-shard whether the collection is sharded or not.
465
_groupInserts( ns, insertsRemaining, insertsForChunks, manager, primary );
467
// ContinueOnError is always on when using sharding.
468
flags |= manager ? InsertOption_ContinueOnError : 0;
470
while( ! insertsForChunks.empty() ){
472
ChunkPtr c = insertsForChunks.begin()->first;
473
vector<BSONObj>& objs = insertsForChunks.begin()->second;
476
// Careful - if primary exists, c will be empty
479
const Shard& shard = c ? c->getShard() : primary.get();
481
ShardConnection dbcon( shard, ns, manager );
524
if (d.reservedField() & Reserved_InsertOption_ContinueOnError) flags |=
525
InsertOption_ContinueOnError;
527
if (d.reservedField() & Reserved_FromWriteback) flags |= WriteOption_FromWriteback;
529
if (!d.moreJSObjs()) return;
531
_insert(ns, d, flags, r);
534
void _insert(const string& ns, DbMessage& d, int flags, Request& r) // TODO: remove
536
uassert( 16056, str::stream() << "shutting down server during insert", ! inShutdown() );
538
bool continueOnError = flags & InsertOption_ContinueOnError;
540
// Sanity check, probably not needed but for safety
545
bool prevInsertException = false;
547
while (d.moreJSObjs()) {
549
// TODO: Replace this with a better check to see if we're making progress
550
uassert( 16055, str::stream() << "too many retries during insert", retries < 30 );
556
group.resetInsertData();
558
// This function handles grouping the inserts per-shard whether the collection
559
// is sharded or not.
561
// Can record errors on bad insert object format (i.e. doesn't have the shard
562
// key), which should be handled in-order with mongod errors.
564
_getNextInsertGroup(ns, d, flags, &group);
566
// We should always have a shard if we have any inserts
567
verify(group.inserts.size() == 0 || group.shard.get());
569
if (group.inserts.size() > 0 && group.hasException()) {
570
warning() << "problem preparing batch insert detected, first inserting "
571
<< group.inserts.size() << " intermediate documents" << endl;
574
scoped_ptr<ShardConnection> dbconPtr;
485
LOG(4) << "inserting " << objs.size() << " documents to shard " << shard
487
<< ( manager.get() ? manager->getVersion().toString() :
488
ShardChunkVersion( 0, OID() ).toString() ) << endl;
490
// Taken from single-shard bulk insert, should not need multiple methods in future
491
// insert( c->getShard() , r.getns() , objs , flags);
493
// It's okay if the version is set here, an exception will be thrown if the version is incompatible
579
// DO ALL VALID INSERTS
582
if (group.inserts.size() > 0) {
584
dbconPtr.reset(new ShardConnection(*(group.shard), ns, group.manager));
585
ShardConnection& dbcon = *dbconPtr;
589
<< group.inserts.size()
590
<< " documents to shard "
593
<< (group.manager.get() ?
594
group.manager->getVersion().toString() :
595
ChunkVersion(0, OID()).toString())
602
// Will throw SCE if we need to reset our version before sending.
495
603
dbcon.setVersion();
497
catch ( StaleConfigException& e ) {
498
// External try block is still needed to match bulk insert mongod
501
_handleRetries( "insert", retries, ns, objs[0], e, r );
502
_insert( ns, insertsRemaining, insertsForChunks, flags, r, d, retries + 1 );
506
// Certain conn types can't handle bulk inserts, so don't use unless we need to
507
if( objs.size() == 1 ){
508
dbcon->insert( ns, objs[0], flags );
511
dbcon->insert( ns , objs , flags);
514
// TODO: Option for safe inserts here - can then use this for all inserts
515
// Not sure what this means?
519
int bytesWritten = 0;
520
for (vector<BSONObj>::iterator vecIt = objs.begin(); vecIt != objs.end(); ++vecIt) {
521
r.gotInsert(); // Record the correct number of individual inserts
522
bytesWritten += (*vecIt).objsize();
525
// TODO: The only reason we're grouping by chunks here is for auto-split, more efficient
526
// to track this separately and bulk insert to shards
527
if ( c && r.getClientInfo()->autoSplitOk() )
528
c->splitIfShould( bytesWritten );
531
catch( UserException& e ){
532
// Unexpected exception, so don't clean up the conn
535
// These inserts won't be retried, as something weird happened here
536
insertsForChunks.erase( insertsForChunks.begin() );
538
// Throw if this is the last chunk bulk-inserted to
539
if( insertsForChunks.empty() ){
605
// Reset our retries to zero since this batch's version went through
612
string insertErr = "";
616
dbcon->insert(ns, group.inserts, flags);
619
// WARNING: We *have* to return the connection here, otherwise the
620
// error gets checked on a different connection!
624
globalOpCounters.incInsertInWriteLock(group.inserts.size());
627
// CHECK INTERMEDIATE ERROR
630
// We need to check the mongod error if we're inserting more documents,
631
// or if a later mongos error might mask an insert error,
632
// or if an earlier error might mask this error from GLE
633
if (d.moreJSObjs() || group.hasException() || prevInsertException) {
635
LOG(3) << "running intermediate GLE to "
636
<< group.shard->toString() << " during bulk insert "
638
<< (d.moreJSObjs() ? "we have more documents to insert" :
639
(group.hasException() ? "exception detected while preparing group" :
640
"a previous error exists"))
643
ClientInfo* ci = r.getClientInfo();
646
// WARNING: Without this, we will use the *previous* shard for GLE
653
// TODO: Can't actually pass GLE parameters here,
654
// so we use defaults?
655
ci->getLastError("admin",
656
BSON( "getLastError" << 1 ),
662
BSONObj gle = gleB.obj();
663
if (gle["err"].type() == String)
664
insertErr = gle["err"].String();
666
LOG(3) << "intermediate GLE result was " << gle
667
<< " errmsg: " << errMsg << endl;
670
// Clear out the shards we've checked so far, if we're successful
672
ci->clearSinceLastGetError();
675
catch (DBException& e) {
676
// Network error on send or GLE
677
insertErr = e.what();
682
// If the insert had an error, figure out if we should throw right now.
685
if (insertErr.size() > 0) {
687
string errMsg = str::stream()
688
<< "error inserting "
689
<< group.inserts.size()
690
<< " documents to shard "
691
<< group.shard->toString()
693
<< (group.manager.get() ?
694
group.manager->getVersion().toString() :
695
ChunkVersion(0, OID()).toString())
696
<< causedBy(insertErr);
698
// If we're continuing-on-error and the insert error is superseded by
699
// a later error from mongos, we shouldn't throw this error but the
701
if (group.hasException() && continueOnError) warning() << errMsg;
702
else uasserted(16460, errMsg);
706
// SPLIT CHUNKS IF NEEDED
709
// Should never throw errors!
710
if (!group.chunkData.empty() && r.getClientInfo()->autoSplitOk()) {
712
for (map<ChunkPtr, int>::iterator it = group.chunkData.begin();
713
it != group.chunkData.end(); ++it)
715
ChunkPtr c = it->first;
716
int bytesWritten = it->second;
718
c->splitIfShould(bytesWritten);
724
// CHECK AND RE-THROW MONGOS ERROR
727
if (group.hasException()) {
729
string errMsg = str::stream() << "error preparing documents for insert"
730
<< causedBy(group.errMsg);
732
uasserted(group.errCode, errMsg);
735
catch (StaleConfigException& e) {
737
// Clean up the conn if needed
738
if (dbconPtr) dbconPtr->done();
740
// Note - this can throw a SCE which will abort *all* the rest of the inserts
741
// if we retry too many times. We assume that this cannot happen. In any
742
// case, the user gets an error.
743
_handleRetries("insert", retries, ns, group.inserts[0], e, r);
746
// Go back to the start of the inserts
749
verify( d.moreJSObjs() );
751
catch (UserException& e) {
753
// Unexpected exception, cleans up the conn if not already done()
754
if (dbconPtr) dbconPtr->kill();
756
warning() << "exception during insert"
757
<< (continueOnError ? " (continue on error set)" : "") << causedBy(e)
760
prevInsertException = true;
763
// Throw if this is the last chunk bulk-inserted to, or if continue-on-error is
766
// If this is the last chunk bulk-inserted to, then if continue-on-error is
767
// true, we'll have the final error, and if continue-on-error is false, we
768
// should abort anyway.
771
if (!d.moreJSObjs() || !continueOnError) {