~ubuntu-branches/ubuntu/trusty/mongodb/trusty-proposed

« back to all changes in this revision

Viewing changes to s/strategy.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Antonin Kral
  • Date: 2010-01-29 19:48:45 UTC
  • Revision ID: james.westby@ubuntu.com-20100129194845-8wbmkf626fwcavc9
Tags: upstream-1.3.1
ImportĀ upstreamĀ versionĀ 1.3.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// stragegy.cpp
 
2
 
 
3
#include "stdafx.h"
 
4
#include "request.h"
 
5
#include "../util/background.h"
 
6
#include "../client/connpool.h"
 
7
#include "../db/commands.h"
 
8
#include "server.h"
 
9
 
 
10
namespace mongo {
 
11
 
 
12
    // ----- Strategy ------
 
13
 
 
14
    void Strategy::doWrite( int op , Request& r , string server ){
 
15
        ScopedDbConnection dbcon( server );
 
16
        DBClientBase &_c = dbcon.conn();
 
17
        
 
18
        /* TODO FIX - do not case and call DBClientBase::say() */
 
19
        DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
 
20
        c.port().say( r.m() );
 
21
        
 
22
        dbcon.done();
 
23
    }
 
24
 
 
25
    void Strategy::doQuery( Request& r , string server ){
 
26
        try{
 
27
            ScopedDbConnection dbcon( server );
 
28
            DBClientBase &_c = dbcon.conn();
 
29
            
 
30
            checkShardVersion( _c , r.getns() );
 
31
            
 
32
            // TODO: This will not work with Paired connections.  Fix. 
 
33
            DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
 
34
            Message response;
 
35
            bool ok = c.port().call( r.m(), response);
 
36
 
 
37
            {
 
38
                QueryResult *qr = (QueryResult *) response.data;
 
39
                if ( qr->resultFlags() & QueryResult::ResultFlag_ShardConfigStale ){
 
40
                    dbcon.done();
 
41
                    throw StaleConfigException( r.getns() , "Strategy::doQuery" );
 
42
                }
 
43
            }
 
44
 
 
45
            uassert( 10200 , "mongos: error calling db", ok);
 
46
            r.reply( response );
 
47
            dbcon.done();
 
48
        }
 
49
        catch ( AssertionException& e ) {
 
50
            BSONObjBuilder err;
 
51
            err.append("$err", string("mongos: ") + (e.msg.empty() ? "assertion during query" : e.msg));
 
52
            BSONObj errObj = err.done();
 
53
            replyToQuery(QueryResult::ResultFlag_ErrSet, r.p() , r.m() , errObj);
 
54
        }
 
55
    }
 
56
    
 
57
    void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
 
58
        ScopedDbConnection dbcon( server );
 
59
        checkShardVersion( dbcon.conn() , ns );
 
60
        dbcon->insert( ns , obj );
 
61
        dbcon.done();
 
62
    }
 
63
 
 
64
    map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;
 
65
 
 
66
    class WriteBackListener : public BackgroundJob {
 
67
    protected:
 
68
        
 
69
        WriteBackListener( const string& addr ) : _addr( addr ){
 
70
            cout << "creating WriteBackListener for: " << addr << endl;
 
71
        }
 
72
        
 
73
        void run(){
 
74
            int secsToSleep = 0;
 
75
            while ( 1 ){
 
76
                try {
 
77
                    ScopedDbConnection conn( _addr );
 
78
                    
 
79
                    BSONObj result;
 
80
                    
 
81
                    {
 
82
                        BSONObjBuilder cmd;
 
83
                        cmd.appendOID( "writebacklisten" , &serverID );
 
84
                        if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
 
85
                            log() <<  "writebacklisten command failed!  "  << result << endl;
 
86
                            conn.done();
 
87
                            continue;
 
88
                        }
 
89
 
 
90
                    }
 
91
                    
 
92
                    log(1) << "writebacklisten result: " << result << endl;
 
93
                    
 
94
                    BSONObj data = result.getObjectField( "data" );
 
95
                    if ( data.getBoolField( "writeBack" ) ){
 
96
                        string ns = data["ns"].valuestrsafe();
 
97
 
 
98
                        int len;
 
99
 
 
100
                        Message m( (void*)data["msg"].binData( len ) , false );
 
101
                        massert( 10427 ,  "invalid writeback message" , m.data->valid() );                        
 
102
 
 
103
                        grid.getDBConfig( ns )->getChunkManager( ns , true );
 
104
                        
 
105
                        Request r( m , 0 );
 
106
                        r.process();
 
107
                    }
 
108
                    else {
 
109
                        log() << "unknown writeBack result: " << result << endl;
 
110
                    }
 
111
                    
 
112
                    conn.done();
 
113
                    secsToSleep = 0;
 
114
                }
 
115
                catch ( std::exception e ){
 
116
                    log() << "WriteBackListener exception : " << e.what() << endl;
 
117
                }
 
118
                catch ( ... ){
 
119
                    log() << "WriteBackListener uncaught exception!" << endl;
 
120
                }
 
121
                secsToSleep++;
 
122
                sleepsecs(secsToSleep);
 
123
                if ( secsToSleep > 10 )
 
124
                    secsToSleep = 0;
 
125
            }
 
126
        }
 
127
        
 
128
    private:
 
129
        string _addr;
 
130
        static map<string,WriteBackListener*> _cache;
 
131
 
 
132
    public:
 
133
        static void init( DBClientBase& conn ){
 
134
            WriteBackListener*& l = _cache[conn.getServerAddress()];
 
135
            if ( l )
 
136
                return;
 
137
            l = new WriteBackListener( conn.getServerAddress() );
 
138
            l->go();
 
139
        }
 
140
 
 
141
    };
 
142
 
 
143
    map<string,WriteBackListener*> WriteBackListener::_cache;
 
144
    
 
145
 
 
146
    void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
 
147
        // TODO: cache, optimize, etc...
 
148
        
 
149
        WriteBackListener::init( conn );
 
150
 
 
151
        DBConfig * conf = grid.getDBConfig( ns );
 
152
        if ( ! conf )
 
153
            return;
 
154
        
 
155
        ShardChunkVersion version = 0;
 
156
        unsigned long long officialSequenceNumber = 0;
 
157
 
 
158
        if ( conf->isSharded( ns ) ){
 
159
            ChunkManager * manager = conf->getChunkManager( ns , authoritative );
 
160
            officialSequenceNumber = manager->getSequenceNumber();
 
161
            version = manager->getVersion( conn.getServerAddress() );
 
162
        }
 
163
 
 
164
        unsigned long long & sequenceNumber = checkShardVersionLastSequence[ &conn ];        
 
165
        if ( officialSequenceNumber == sequenceNumber )
 
166
            return;
 
167
        
 
168
        log(2) << " have to set shard version for conn: " << &conn << " ns:" << ns << " my last seq: " << sequenceNumber << "  current: " << officialSequenceNumber << endl;
 
169
 
 
170
        BSONObj result;
 
171
        if ( setShardVersion( conn , ns , version , authoritative , result ) ){
 
172
            // success!
 
173
            log(1) << "      setShardVersion success!" << endl;
 
174
            sequenceNumber = officialSequenceNumber;
 
175
            return;
 
176
        }
 
177
 
 
178
        log(1) << "       setShardVersion failed!\n" << result << endl;
 
179
 
 
180
        if ( result.getBoolField( "need_authoritative" ) )
 
181
            massert( 10428 ,  "need_authoritative set but in authoritative mode already" , ! authoritative );
 
182
        
 
183
        if ( ! authoritative ){
 
184
            checkShardVersion( conn , ns , 1 );
 
185
            return;
 
186
        }
 
187
        
 
188
        log(1) << "     setShardVersion failed: " << result << endl;
 
189
        massert( 10429 ,  "setShardVersion failed!" , 0 );
 
190
    }
 
191
    
 
192
    bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ){
 
193
 
 
194
        BSONObjBuilder cmdBuilder;
 
195
        cmdBuilder.append( "setShardVersion" , ns.c_str() );
 
196
        cmdBuilder.append( "configdb" , configServer.modelServer() );
 
197
        cmdBuilder.appendTimestamp( "version" , version );
 
198
        cmdBuilder.appendOID( "serverID" , &serverID );
 
199
        if ( authoritative )
 
200
            cmdBuilder.appendBool( "authoritative" , 1 );
 
201
        BSONObj cmd = cmdBuilder.obj();
 
202
        
 
203
        log(1) << "    setShardVersion  " << conn.getServerAddress() << "  " << ns << "  " << cmd << " " << &conn << endl;
 
204
        
 
205
        return conn.runCommand( "admin" , cmd , result );
 
206
    }
 
207
 
 
208
    bool lockNamespaceOnServer( const string& server , const string& ns ){
 
209
        ScopedDbConnection conn( server );
 
210
        bool res = lockNamespaceOnServer( conn.conn() , ns );
 
211
        conn.done();
 
212
        return res;
 
213
    }
 
214
 
 
215
    bool lockNamespaceOnServer( DBClientBase& conn , const string& ns ){
 
216
        BSONObj lockResult;
 
217
        return setShardVersion( conn , ns , grid.getNextOpTime() , true , lockResult );
 
218
    }
 
219
 
 
220
    
 
221
}