5
#include "../util/background.h"
6
#include "../client/connpool.h"
7
#include "../db/commands.h"
12
// ----- Strategy ------
14
// Sends the write request `r` to the shard at `server`.
// Acquires a scoped connection for `server`, downcasts it to DBClientConnection and
// hands the request's message to that connection's port through say().
// NOTE(review): `op` is not referenced in the lines visible here.
void Strategy::doWrite( int op , Request& r , string server ){
15
ScopedDbConnection dbcon( server );
16
DBClientBase &_c = dbcon.conn();
18
/* TODO FIX - do not cast and call DBClientBase::say() */
19
// A reference dynamic_cast throws std::bad_cast if _c is not a DBClientConnection.
DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
20
c.port().say( r.m() );
25
// Runs the query request `r` against the shard at `server`.
// Visible flow: acquire a scoped connection, make sure the shard has the current
// version for the request's namespace, send the message and read the reply into
// `response`, then raise StaleConfigException if the shard reports stale config.
// An AssertionException is turned into a "$err" reply flagged ResultFlag_ErrSet.
// `response` and `err` are declared on lines not visible here.
void Strategy::doQuery( Request& r , string server ){
27
ScopedDbConnection dbcon( server );
28
DBClientBase &_c = dbcon.conn();
30
checkShardVersion( _c , r.getns() );
32
// TODO: This will not work with Paired connections. Fix.
33
// A reference dynamic_cast throws std::bad_cast if _c is not a DBClientConnection.
DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
35
bool ok = c.port().call( r.m(), response);
38
// NOTE(review): in the visible lines the reply is dereferenced before `ok` is
// asserted below — confirm response.data is valid when call() returns false.
QueryResult *qr = (QueryResult *) response.data;
39
if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
41
throw StaleConfigException( r.getns() , "Strategy::doQuery" );
45
uassert( 10200 , "mongos: error calling db", ok);
49
catch ( AssertionException& e ) {
51
// Prefix the message with "mongos: "; use a generic text when the exception has none.
err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
52
BSONObj errObj = err.done();
53
replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
57
// Inserts `obj` into namespace `ns` on the shard at `server`.
// The shard version for `ns` is checked on the connection before the insert is issued.
void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
58
ScopedDbConnection dbcon( server );
59
checkShardVersion( dbcon.conn() , ns );
60
dbcon->insert( ns , obj );
64
// Keyed by connection address: the sequence number recorded by checkShardVersion()
// after its last successful setShardVersion() on that connection.
map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;
66
// Background job tied to one shard address (`_addr`).
// Visible behavior: it issues the "writebacklisten" admin command carrying this
// process's serverID, inspects the "data" field of the result, and for a result
// marked "writeBack" validates the embedded message and requests the chunk manager
// for the affected namespace with the second argument set to true.
// Exceptions are logged; the job then sleeps `secsToSleep` seconds.
// Instances are created through init() and kept in the static `_cache`, keyed by
// server address. Several lines of this class (method headers, loop/try structure,
// closing braces) are not visible here.
class WriteBackListener : public BackgroundJob {
69
WriteBackListener( const string& addr ) : _addr( addr ){
70
cout << "creating WriteBackListener for: " << addr << endl;
77
ScopedDbConnection conn( _addr );
83
cmd.appendOID( "writebacklisten" , &serverID );
84
if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
85
log() << "writebacklisten command failed!  " << result << endl;
92
log(1) << "writebacklisten result: " << result << endl;
94
BSONObj data = result.getObjectField( "data" );
95
if ( data.getBoolField( "writeBack" ) ){
96
string ns = data["ns"].valuestrsafe();
100
// Wrap the binary "msg" payload as a Message; `len` is declared on a line not visible here.
Message m( (void*)data["msg"].binData( len ) , false );
101
massert( 10427 , "invalid writeback message" , m.data->valid() );
103
// Second argument is true — presumably forces a reload of the chunk manager; confirm.
grid.getDBConfig( ns )->getChunkManager( ns , true );
109
log() << "unknown writeBack result: " << result << endl;
115
// Catch by reference: catching std::exception by value slices derived exception
// objects, so what() would report the base-class message instead of the real one.
catch ( const std::exception& e ){
116
log() << "WriteBackListener exception : " << e.what() << endl;
119
log() << "WriteBackListener uncaught exception!" << endl;
122
sleepsecs(secsToSleep);
123
if ( secsToSleep > 10 )
130
// One listener per server address.
static map<string,WriteBackListener*> _cache;
133
// Looks up (default-inserting if absent) the cache slot for conn's server address
// and stores a newly allocated listener in it. Any guard between the lookup and
// the allocation is on lines not visible here.
static void init( DBClientBase& conn ){
134
WriteBackListener*& l = _cache[conn.getServerAddress()];
137
l = new WriteBackListener( conn.getServerAddress() );
143
// Out-of-class definition of the static listener cache declared in WriteBackListener.
map<string,WriteBackListener*> WriteBackListener::_cache;
146
// Makes sure the shard behind `conn` has been told the current chunk version of `ns`.
//   conn          - connection to the shard; also the key into checkShardVersionLastSequence
//   ns            - namespace being accessed
//   authoritative - forwarded to getChunkManager() and setShardVersion()
// Visible flow: start a WriteBackListener for the connection's server, look up the
// chunk manager's sequence number and version for sharded namespaces (both stay 0
// otherwise), compare the sequence number with the one remembered for this
// connection, and call setShardVersion(). On success the remembered sequence number
// is updated. On failure a non-authoritative call retries itself authoritatively.
// Early returns and closing braces are on lines not visible here.
void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
147
// TODO: cache, optimize, etc...
149
WriteBackListener::init( conn );
151
DBConfig * conf = grid.getDBConfig( ns );
155
ShardChunkVersion version = 0;
156
unsigned long long officialSequenceNumber = 0;
158
if ( conf->isSharded( ns ) ){
159
ChunkManager * manager = conf->getChunkManager( ns , authoritative );
160
officialSequenceNumber = manager->getSequenceNumber();
161
version = manager->getVersion( conn.getServerAddress() );
164
// operator[] default-inserts 0 for a connection that has not been seen before.
unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ];
165
// Statement guarded by this condition is not visible here (presumably an early return — confirm).
if ( officialSequenceNumber == sequenceNumber )
168
log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << "  current: " << officialSequenceNumber << endl;
171
if ( setShardVersion( conn , ns , version , authoritative , result ) ){
173
log(1) << "      setShardVersion success!" << endl;
174
sequenceNumber = officialSequenceNumber;
178
log(1) << "       setShardVersion failed!\n" << result << endl;
180
// The shard asking for an authoritative call is only legal if this call was not already authoritative.
if ( result.getBoolField( "need_authoritative" ) )
181
massert( 10428 ,  "need_authoritative set but in authoritative mode already" , ! authoritative );
183
if ( ! authoritative ){
184
// Retry once in authoritative mode (1 converts to true).
checkShardVersion( conn , ns , 1 );
188
log(1) << "     setShardVersion failed: " << result << endl;
189
// Condition is the constant 0, so this massert always fails when reached.
massert( 10429 , "setShardVersion failed!" , 0 );
192
// Builds a "setShardVersion" command for `ns` and runs it against the "admin"
// database over `conn`.
//   version       - appended as the timestamp field "version"
//   authoritative - see note at the "authoritative" field below
//   result        - receives the command's reply object
// Returns whatever conn.runCommand() returns.
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
194
BSONObjBuilder cmdBuilder;
195
cmdBuilder.append( "setShardVersion" , ns.c_str() );
196
cmdBuilder.append( "configdb" , configServer.modelServer() );
197
cmdBuilder.appendTimestamp( "version" , version );
198
cmdBuilder.appendOID( "serverID" , &serverID );
200
// NOTE(review): the line just before this one is not visible here, so it cannot be
// determined from this view whether the field is appended only when `authoritative` is true.
cmdBuilder.appendBool( "authoritative" , 1 );
201
BSONObj cmd = cmdBuilder.obj();
203
log(1) << "    setShardVersion  " << conn.getServerAddress() << "  " << ns << "  " << cmd << " " << &conn << endl;
205
return conn.runCommand( "admin" , cmd , result );
208
// Convenience overload: acquires a scoped connection to `server` and delegates to
// the DBClientBase& overload, keeping its result in `res`.
// What is done with `res` afterwards is on lines not visible here.
bool lockNamespaceOnServer( const string& server , const string& ns ){
209
ScopedDbConnection conn( server );
210
bool res = lockNamespaceOnServer( conn.conn() , ns );
215
// Issues an authoritative setShardVersion for `ns` on `conn`, using
// grid.getNextOpTime() as the version, and returns that call's result.
// `lockResult` (the reply object) is declared on a line not visible here.
bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){
217
return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult );