~ubuntu-branches/ubuntu/trusty/mongodb/trusty-proposed

« back to all changes in this revision

Viewing changes to client/dbclient.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Antonin Kral
  • Date: 2010-01-29 19:48:45 UTC
  • Revision ID: james.westby@ubuntu.com-20100129194845-8wbmkf626fwcavc9
Tags: upstream-1.3.1
ImportĀ upstreamĀ versionĀ 1.3.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// dbclient.cpp - connect to a Mongo database as a database, from C++
 
2
 
 
3
/*    Copyright 2009 10gen Inc.
 
4
 *
 
5
 *    Licensed under the Apache License, Version 2.0 (the "License");
 
6
 *    you may not use this file except in compliance with the License.
 
7
 *    You may obtain a copy of the License at
 
8
 *
 
9
 *    http://www.apache.org/licenses/LICENSE-2.0
 
10
 *
 
11
 *    Unless required by applicable law or agreed to in writing, software
 
12
 *    distributed under the License is distributed on an "AS IS" BASIS,
 
13
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
14
 *    See the License for the specific language governing permissions and
 
15
 *    limitations under the License.
 
16
 */
 
17
 
 
18
#include "stdafx.h"
 
19
#include "../db/pdfile.h"
 
20
#include "dbclient.h"
 
21
#include "../util/builder.h"
 
22
#include "../db/jsobj.h"
 
23
#include "../db/json.h"
 
24
#include "../db/instance.h"
 
25
#include "../util/md5.hpp"
 
26
#include "../db/dbmessage.h"
 
27
#include "../db/cmdline.h"
 
28
 
 
29
namespace mongo {
 
30
 
 
31
    Query& Query::where(const string &jscode, BSONObj scope) { 
 
32
        /* use where() before sort() and hint() and explain(), else this will assert. */
 
33
        assert( !obj.hasField("query") );
 
34
        BSONObjBuilder b;
 
35
        b.appendElements(obj);
 
36
        b.appendWhere(jscode, scope);
 
37
        obj = b.obj();
 
38
        return *this;
 
39
    }
 
40
 
 
41
    void Query::makeComplex() {
 
42
        if ( obj.hasElement( "query" ) )
 
43
            return;
 
44
        BSONObjBuilder b;
 
45
        b.append( "query", obj );
 
46
        obj = b.obj();
 
47
    }
 
48
 
 
49
    Query& Query::sort(const BSONObj& s) { 
 
50
        appendComplex( "orderby", s );
 
51
        return *this; 
 
52
    }
 
53
 
 
54
    Query& Query::hint(BSONObj keyPattern) {
 
55
        appendComplex( "$hint", keyPattern );
 
56
        return *this; 
 
57
    }
 
58
 
 
59
    Query& Query::explain() {
 
60
        appendComplex( "$explain", true );
 
61
        return *this; 
 
62
    }
 
63
    
 
64
    Query& Query::snapshot() {
 
65
        appendComplex( "$snapshot", true );
 
66
        return *this; 
 
67
    }
 
68
    
 
69
    Query& Query::minKey( const BSONObj &val ) {
 
70
        appendComplex( "$min", val );
 
71
        return *this; 
 
72
    }
 
73
 
 
74
    Query& Query::maxKey( const BSONObj &val ) {
 
75
        appendComplex( "$max", val );
 
76
        return *this; 
 
77
    }
 
78
 
 
79
    bool Query::isComplex() const{
 
80
        return obj.hasElement( "query" );
 
81
    }
 
82
        
 
83
    BSONObj Query::getFilter() const {
 
84
        if ( ! isComplex() )
 
85
            return obj;
 
86
        return obj.getObjectField( "query" );
 
87
    }
 
88
    BSONObj Query::getSort() const {
 
89
        if ( ! isComplex() )
 
90
            return BSONObj();
 
91
        return obj.getObjectField( "orderby" );
 
92
    }
 
93
    BSONObj Query::getHint() const {
 
94
        if ( ! isComplex() )
 
95
            return BSONObj();
 
96
        return obj.getObjectField( "$hint" );
 
97
    }
 
98
    bool Query::isExplain() const {
 
99
        return isComplex() && obj.getBoolField( "$explain" );
 
100
    }
 
101
    
 
102
    string Query::toString() const{
 
103
        return obj.toString();
 
104
    }
 
105
 
 
106
    /* --- dbclientcommands --- */
 
107
 
 
108
    inline bool DBClientWithCommands::isOk(const BSONObj& o) {
 
109
        return o.getIntField("ok") == 1;
 
110
    }
 
111
 
 
112
    inline bool DBClientWithCommands::runCommand(const string &dbname, const BSONObj& cmd, BSONObj &info, int options) {
 
113
        string ns = dbname + ".$cmd";
 
114
        info = findOne(ns, cmd, 0 , options);
 
115
        return isOk(info);
 
116
    }
 
117
 
 
118
    /* note - we build a bson obj here -- for something that is super common like getlasterror you
 
119
              should have that object prebuilt as that would be faster.
 
120
    */
 
121
    bool DBClientWithCommands::simpleCommand(const string &dbname, BSONObj *info, const string &command) {
 
122
        BSONObj o;
 
123
        if ( info == 0 )
 
124
            info = &o;
 
125
        BSONObjBuilder b;
 
126
        b.append(command, 1);
 
127
        return runCommand(dbname, b.done(), *info);
 
128
    }
 
129
 
 
130
    unsigned long long DBClientWithCommands::count(const string &_ns, const BSONObj& query, int options) { 
 
131
        NamespaceString ns(_ns);
 
132
        BSONObj cmd = BSON( "count" << ns.coll << "query" << query );
 
133
        BSONObj res;
 
134
        if( !runCommand(ns.db.c_str(), cmd, res, options) )
 
135
            uasserted(11010,string("count fails:") + res.toString());
 
136
        return res.getIntField("n");
 
137
    }
 
138
 
 
139
    BSONObj getlasterrorcmdobj = fromjson("{getlasterror:1}");
 
140
 
 
141
    BSONObj DBClientWithCommands::getLastErrorDetailed() { 
 
142
        BSONObj info;
 
143
        runCommand("admin", getlasterrorcmdobj, info);
 
144
                return info;
 
145
    }
 
146
 
 
147
    string DBClientWithCommands::getLastError() { 
 
148
        BSONObj info = getLastErrorDetailed();
 
149
        BSONElement e = info["err"];
 
150
        if( e.eoo() ) return "";
 
151
        if( e.type() == Object ) return e.toString();
 
152
        return e.str();
 
153
    }
 
154
 
 
155
    BSONObj getpreverrorcmdobj = fromjson("{getpreverror:1}");
 
156
 
 
157
    BSONObj DBClientWithCommands::getPrevError() { 
 
158
        BSONObj info;
 
159
        runCommand("admin", getpreverrorcmdobj, info);
 
160
        return info;
 
161
    }
 
162
 
 
163
    BSONObj getnoncecmdobj = fromjson("{getnonce:1}");
 
164
 
 
165
    string DBClientWithCommands::createPasswordDigest( const string & username , const string & clearTextPassword ){
 
166
        md5digest d;
 
167
        {
 
168
            md5_state_t st;
 
169
            md5_init(&st);
 
170
            md5_append(&st, (const md5_byte_t *) username.data(), username.length());
 
171
            md5_append(&st, (const md5_byte_t *) ":mongo:", 7 );
 
172
            md5_append(&st, (const md5_byte_t *) clearTextPassword.data(), clearTextPassword.length());
 
173
            md5_finish(&st, d);
 
174
        }
 
175
        return digestToString( d );
 
176
    }
 
177
 
 
178
    bool DBClientWithCommands::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
 
179
                //cout << "TEMP AUTH " << toString() << dbname << ' ' << username << ' ' << password_text << ' ' << digestPassword << endl;
 
180
 
 
181
                string password = password_text;
 
182
                if( digestPassword ) 
 
183
                        password = createPasswordDigest( username , password_text );
 
184
 
 
185
        BSONObj info;
 
186
        string nonce;
 
187
        if( !runCommand(dbname, getnoncecmdobj, info) ) {
 
188
            errmsg = "getnonce fails - connection problem?";
 
189
            return false;
 
190
        }
 
191
        {
 
192
            BSONElement e = info.getField("nonce");
 
193
            assert( e.type() == String );
 
194
            nonce = e.valuestr();
 
195
        }
 
196
 
 
197
        BSONObj authCmd;
 
198
        BSONObjBuilder b;
 
199
        {
 
200
 
 
201
            b << "authenticate" << 1 << "nonce" << nonce << "user" << username;
 
202
            md5digest d;
 
203
            {
 
204
                md5_state_t st;
 
205
                md5_init(&st);
 
206
                md5_append(&st, (const md5_byte_t *) nonce.c_str(), nonce.size() );
 
207
                md5_append(&st, (const md5_byte_t *) username.data(), username.length());
 
208
                md5_append(&st, (const md5_byte_t *) password.c_str(), password.size() );
 
209
                md5_finish(&st, d);
 
210
            }
 
211
            b << "key" << digestToString( d );
 
212
            authCmd = b.done();
 
213
        }
 
214
        
 
215
        if( runCommand(dbname, authCmd, info) ) 
 
216
            return true;
 
217
 
 
218
        errmsg = info.toString();
 
219
        return false;
 
220
    }
 
221
 
 
222
    BSONObj ismastercmdobj = fromjson("{\"ismaster\":1}");
 
223
 
 
224
    bool DBClientWithCommands::isMaster(bool& isMaster, BSONObj *info) {
 
225
        BSONObj o;
 
226
        if ( info == 0 )        info = &o;
 
227
        bool ok = runCommand("admin", ismastercmdobj, *info);
 
228
        isMaster = (info->getIntField("ismaster") == 1);
 
229
        return ok;
 
230
    }
 
231
 
 
232
    bool DBClientWithCommands::createCollection(const string &ns, unsigned size, bool capped, int max, BSONObj *info) {
 
233
        BSONObj o;
 
234
        if ( info == 0 )        info = &o;
 
235
        BSONObjBuilder b;
 
236
        b.append("create", ns);
 
237
        if ( size ) b.append("size", size);
 
238
        if ( capped ) b.append("capped", true);
 
239
        if ( max ) b.append("max", max);
 
240
        string db = nsToDatabase(ns.c_str());
 
241
        return runCommand(db.c_str(), b.done(), *info);
 
242
    }
 
243
 
 
244
    bool DBClientWithCommands::copyDatabase(const string &fromdb, const string &todb, const string &fromhost, BSONObj *info) {
 
245
        BSONObj o;
 
246
        if ( info == 0 ) info = &o;
 
247
        BSONObjBuilder b;
 
248
        b.append("copydb", 1);
 
249
        b.append("fromhost", fromhost);
 
250
        b.append("fromdb", fromdb);
 
251
        b.append("todb", todb);
 
252
        return runCommand("admin", b.done(), *info);
 
253
    }
 
254
 
 
255
    bool DBClientWithCommands::setDbProfilingLevel(const string &dbname, ProfilingLevel level, BSONObj *info ) {
 
256
        BSONObj o;
 
257
        if ( info == 0 ) info = &o;
 
258
 
 
259
        if ( level ) {
 
260
            // Create system.profile collection.  If it already exists this does nothing.
 
261
            // TODO: move this into the db instead of here so that all
 
262
            //       drivers don't have to do this.
 
263
            string ns = dbname + ".system.profile";
 
264
            createCollection(ns.c_str(), 1024 * 1024, true, 0, info);
 
265
        }
 
266
 
 
267
        BSONObjBuilder b;
 
268
        b.append("profile", (int) level);
 
269
        return runCommand(dbname, b.done(), *info);
 
270
    }
 
271
 
 
272
    BSONObj getprofilingcmdobj = fromjson("{\"profile\":-1}");
 
273
 
 
274
    bool DBClientWithCommands::getDbProfilingLevel(const string &dbname, ProfilingLevel& level, BSONObj *info) {
 
275
        BSONObj o;
 
276
        if ( info == 0 ) info = &o;
 
277
        if ( runCommand(dbname, getprofilingcmdobj, *info) ) {
 
278
            level = (ProfilingLevel) info->getIntField("was");
 
279
            return true;
 
280
        }
 
281
        return false;
 
282
    }
 
283
 
 
284
    BSONObj DBClientWithCommands::mapreduce(const string &ns, const string &jsmapf, const string &jsreducef, BSONObj query, const string& outputcolname) { 
 
285
        BSONObjBuilder b;
 
286
        b.append("mapreduce", nsGetCollection(ns));
 
287
        b.appendCode("map", jsmapf.c_str());
 
288
        b.appendCode("reduce", jsreducef.c_str());
 
289
        if( !query.isEmpty() )
 
290
            b.append("query", query);
 
291
        if( !outputcolname.empty() )
 
292
            b.append("out", outputcolname);
 
293
        BSONObj info;
 
294
        runCommand(nsGetDB(ns), b.done(), info);
 
295
        return info;
 
296
    }
 
297
 
 
298
    bool DBClientWithCommands::eval(const string &dbname, const string &jscode, BSONObj& info, BSONElement& retValue, BSONObj *args) {
 
299
        BSONObjBuilder b;
 
300
        b.appendCode("$eval", jscode.c_str());
 
301
        if ( args )
 
302
            b.appendArray("args", *args);
 
303
        bool ok = runCommand(dbname, b.done(), info);
 
304
        if ( ok )
 
305
            retValue = info.getField("retval");
 
306
        return ok;
 
307
    }
 
308
 
 
309
    bool DBClientWithCommands::eval(const string &dbname, const string &jscode) {
 
310
        BSONObj info;
 
311
        BSONElement retValue;
 
312
        return eval(dbname, jscode, info, retValue);
 
313
    }
 
314
 
 
315
    list<string> DBClientWithCommands::getDatabaseNames(){
 
316
        BSONObj info;
 
317
        uassert( 10005 ,  "listdatabases failed" , runCommand( "admin" , BSON( "listDatabases" << 1 ) , info ) );
 
318
        uassert( 10006 ,  "listDatabases.databases not array" , info["databases"].type() == Array );
 
319
        
 
320
        list<string> names;
 
321
        
 
322
        BSONObjIterator i( info["databases"].embeddedObjectUserCheck() );
 
323
        while ( i.more() ){
 
324
            names.push_back( i.next().embeddedObjectUserCheck()["name"].valuestr() );
 
325
        }
 
326
 
 
327
        return names;
 
328
    }
 
329
 
 
330
    list<string> DBClientWithCommands::getCollectionNames( const string& db ){
 
331
        list<string> names;
 
332
        
 
333
        string ns = db + ".system.namespaces";
 
334
        auto_ptr<DBClientCursor> c = query( ns.c_str() , BSONObj() );
 
335
        while ( c->more() ){
 
336
            string name = c->next()["name"].valuestr();
 
337
            if ( name.find( "$" ) != string::npos )
 
338
                continue;
 
339
            names.push_back( name );
 
340
        }
 
341
        return names;
 
342
    }
 
343
 
 
344
    bool DBClientWithCommands::exists( const string& ns ){
 
345
        list<string> names;
 
346
        
 
347
        string db = nsGetDB( ns ) + ".system.namespaces";
 
348
        BSONObj q = BSON( "name" << ns );
 
349
        return count( db.c_str() , q );
 
350
    }
 
351
 
 
352
 
 
353
    void testSort() { 
 
354
        DBClientConnection c;
 
355
        string err;
 
356
        if ( !c.connect("localhost", err) ) {
 
357
            out() << "can't connect to server " << err << endl;
 
358
            return;
 
359
        }
 
360
 
 
361
        cout << "findOne returns:" << endl;
 
362
        cout << c.findOne("test.foo", QUERY( "x" << 3 ) ).toString() << endl;
 
363
        cout << c.findOne("test.foo", QUERY( "x" << 3 ).sort("name") ).toString() << endl;
 
364
 
 
365
    }
 
366
 
 
367
    /* TODO: unit tests should run this? */
 
368
    void testDbEval() {
 
369
        DBClientConnection c;
 
370
        string err;
 
371
        if ( !c.connect("localhost", err) ) {
 
372
            out() << "can't connect to server " << err << endl;
 
373
            return;
 
374
        }
 
375
 
 
376
        if( !c.auth("dwight", "u", "p", err) ) { 
 
377
            out() << "can't authenticate " << err << endl;
 
378
            return;
 
379
        }
 
380
 
 
381
        BSONObj info;
 
382
        BSONElement retValue;
 
383
        BSONObjBuilder b;
 
384
        b.append("0", 99);
 
385
        BSONObj args = b.done();
 
386
        bool ok = c.eval("dwight", "function() { return args[0]; }", info, retValue, &args);
 
387
        out() << "eval ok=" << ok << endl;
 
388
        out() << "retvalue=" << retValue.toString() << endl;
 
389
        out() << "info=" << info.toString() << endl;
 
390
 
 
391
        out() << endl;
 
392
 
 
393
        int x = 3;
 
394
        assert( c.eval("dwight", "function() { return 3; }", x) );
 
395
 
 
396
        out() << "***\n";
 
397
 
 
398
        BSONObj foo = fromjson("{\"x\":7}");
 
399
        out() << foo.toString() << endl;
 
400
        int res=0;
 
401
        ok = c.eval("dwight", "function(parm1) { return parm1.x; }", foo, res);
 
402
        out() << ok << " retval:" << res << endl;
 
403
    }
 
404
 
 
405
        void testPaired();
 
406
 
 
407
    /* --- dbclientconnection --- */
 
408
 
 
409
        bool DBClientConnection::auth(const string &dbname, const string &username, const string &password_text, string& errmsg, bool digestPassword) {
 
410
                string password = password_text;
 
411
                if( digestPassword ) 
 
412
                        password = createPasswordDigest( username , password_text );
 
413
 
 
414
                if( autoReconnect ) {
 
415
                        /* note we remember the auth info before we attempt to auth -- if the connection is broken, we will 
 
416
                           then have it for the next autoreconnect attempt. 
 
417
                        */
 
418
                        pair<string,string> p = pair<string,string>(username, password);
 
419
                        authCache[dbname] = p;
 
420
                }
 
421
 
 
422
                return DBClientBase::auth(dbname, username, password.c_str(), errmsg, false);
 
423
        }
 
424
 
 
425
    BSONObj DBClientInterface::findOne(const string &ns, Query query, const BSONObj *fieldsToReturn, int queryOptions) {
 
426
        auto_ptr<DBClientCursor> c =
 
427
            this->query(ns, query, 1, 0, fieldsToReturn, queryOptions);
 
428
 
 
429
        massert( 10276 ,  "DBClientBase::findOne: transport error", c.get() );
 
430
 
 
431
        if ( !c->more() )
 
432
            return BSONObj();
 
433
 
 
434
        return c->next().copy();
 
435
    }
 
436
 
 
437
    bool DBClientConnection::connect(const string &_serverAddress, string& errmsg) {
 
438
        serverAddress = _serverAddress;
 
439
 
 
440
        string ip;
 
441
        int port;
 
442
        size_t idx = serverAddress.find( ":" );
 
443
        if ( idx != string::npos ) {
 
444
            port = strtol( serverAddress.substr( idx + 1 ).c_str(), 0, 10 );
 
445
            ip = serverAddress.substr( 0 , idx );
 
446
            ip = hostbyname(ip.c_str());
 
447
        } else {
 
448
            port = CmdLine::DefaultDBPort;
 
449
            ip = hostbyname( serverAddress.c_str() );
 
450
        }
 
451
        massert( 10277 ,  "Unable to parse hostname", !ip.empty() );
 
452
 
 
453
        // we keep around SockAddr for connection life -- maybe MessagingPort
 
454
        // requires that?
 
455
        server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
 
456
        p = auto_ptr<MessagingPort>(new MessagingPort());
 
457
 
 
458
        if ( !p->connect(*server) ) {
 
459
            stringstream ss;
 
460
            ss << "couldn't connect to server " << serverAddress << " " << ip << ":" << port;
 
461
            errmsg = ss.str();
 
462
            failed = true;
 
463
            return false;
 
464
        }
 
465
        return true;
 
466
    }
 
467
 
 
468
    void DBClientConnection::_checkConnection() {
 
469
        if ( !failed )
 
470
            return;
 
471
        if ( lastReconnectTry && time(0)-lastReconnectTry < 2 )
 
472
            return;
 
473
        if ( !autoReconnect )
 
474
            return;
 
475
 
 
476
        lastReconnectTry = time(0);
 
477
        log() << "trying reconnect to " << serverAddress << endl;
 
478
        string errmsg;
 
479
        string tmp = serverAddress;
 
480
        failed = false;
 
481
        if ( !connect(tmp.c_str(), errmsg) ) { 
 
482
            log() << "reconnect " << serverAddress << " failed " << errmsg << endl;
 
483
                        return;
 
484
                }
 
485
 
 
486
                log() << "reconnect " << serverAddress << " ok" << endl;
 
487
                for( map< string, pair<string,string> >::iterator i = authCache.begin(); i != authCache.end(); i++ ) { 
 
488
                        const char *dbname = i->first.c_str();
 
489
                        const char *username = i->second.first.c_str();
 
490
                        const char *password = i->second.second.c_str();
 
491
                        if( !DBClientBase::auth(dbname, username, password, errmsg, false) )
 
492
                                log() << "reconnect: auth failed db:" << dbname << " user:" << username << ' ' << errmsg << '\n';
 
493
                }
 
494
    }
 
495
 
 
496
    auto_ptr<DBClientCursor> DBClientBase::query(const string &ns, Query query, int nToReturn,
 
497
            int nToSkip, const BSONObj *fieldsToReturn, int queryOptions) {
 
498
        auto_ptr<DBClientCursor> c( new DBClientCursor( this,
 
499
                                    ns, query.obj, nToReturn, nToSkip,
 
500
                                    fieldsToReturn, queryOptions ) );
 
501
        if ( c->init() )
 
502
            return c;
 
503
        return auto_ptr< DBClientCursor >( 0 );
 
504
    }
 
505
 
 
506
    auto_ptr<DBClientCursor> DBClientBase::getMore( const string &ns, long long cursorId, int nToReturn, int options ) {
 
507
        auto_ptr<DBClientCursor> c( new DBClientCursor( this, ns, cursorId, nToReturn, options ) );
 
508
        if ( c->init() )
 
509
            return c;
 
510
        return auto_ptr< DBClientCursor >( 0 );
 
511
    }
 
512
 
 
513
    void DBClientBase::insert( const string & ns , BSONObj obj ) {
 
514
        Message toSend;
 
515
 
 
516
        BufBuilder b;
 
517
        int opts = 0;
 
518
        b.append( opts );
 
519
        b.append( ns );
 
520
        obj.appendSelfToBufBuilder( b );
 
521
 
 
522
        toSend.setData( dbInsert , b.buf() , b.len() );
 
523
 
 
524
        say( toSend );
 
525
    }
 
526
 
 
527
    void DBClientBase::insert( const string & ns , const vector< BSONObj > &v ) {
 
528
        Message toSend;
 
529
        
 
530
        BufBuilder b;
 
531
        int opts = 0;
 
532
        b.append( opts );
 
533
        b.append( ns );
 
534
        for( vector< BSONObj >::const_iterator i = v.begin(); i != v.end(); ++i )
 
535
            i->appendSelfToBufBuilder( b );
 
536
        
 
537
        toSend.setData( dbInsert, b.buf(), b.len() );
 
538
        
 
539
        say( toSend );
 
540
    }
 
541
 
 
542
    void DBClientBase::remove( const string & ns , Query obj , bool justOne ) {
 
543
        Message toSend;
 
544
 
 
545
        BufBuilder b;
 
546
        int opts = 0;
 
547
        b.append( opts );
 
548
        b.append( ns );
 
549
 
 
550
        int flags = 0;
 
551
        if ( justOne )
 
552
            flags |= 1;
 
553
        b.append( flags );
 
554
 
 
555
        obj.obj.appendSelfToBufBuilder( b );
 
556
 
 
557
        toSend.setData( dbDelete , b.buf() , b.len() );
 
558
 
 
559
        say( toSend );
 
560
    }
 
561
 
 
562
    void DBClientBase::update( const string & ns , Query query , BSONObj obj , bool upsert , bool multi ) {
 
563
 
 
564
        BufBuilder b;
 
565
        b.append( (int)0 ); // reserverd
 
566
        b.append( ns );
 
567
 
 
568
        int flags = 0;
 
569
        if ( upsert ) flags |= UpdateOption_Upsert;
 
570
        if ( multi ) flags |= UpdateOption_Multi;
 
571
        b.append( flags );
 
572
 
 
573
        query.obj.appendSelfToBufBuilder( b );
 
574
        obj.appendSelfToBufBuilder( b );
 
575
 
 
576
        Message toSend;
 
577
        toSend.setData( dbUpdate , b.buf() , b.len() );
 
578
 
 
579
        say( toSend );
 
580
    }
 
581
 
 
582
    auto_ptr<DBClientCursor> DBClientWithCommands::getIndexes( const string &ns ){
 
583
        return query( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , BSON( "ns" << ns ) );
 
584
    }
 
585
    
 
586
    void DBClientWithCommands::dropIndex( const string& ns , BSONObj keys ){
 
587
        dropIndex( ns , genIndexName( keys ) );
 
588
    }
 
589
 
 
590
 
 
591
    void DBClientWithCommands::dropIndex( const string& ns , const string& indexName ){
 
592
        BSONObj info;
 
593
        if ( ! runCommand( nsToDatabase( ns.c_str() ) , 
 
594
                           BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << indexName ) , 
 
595
                           info ) ){
 
596
            log() << "dropIndex failed: " << info << endl;
 
597
            uassert( 10007 ,  "dropIndex failed" , 0 );
 
598
        }
 
599
        resetIndexCache();
 
600
    }
 
601
    
 
602
    void DBClientWithCommands::dropIndexes( const string& ns ){
 
603
        BSONObj info;
 
604
        uassert( 10008 ,  "dropIndexes failed" , runCommand( nsToDatabase( ns.c_str() ) , 
 
605
                                                    BSON( "deleteIndexes" << NamespaceString( ns ).coll << "index" << "*") , 
 
606
                                                    info ) );
 
607
        resetIndexCache();
 
608
    }
 
609
 
 
610
    void DBClientWithCommands::reIndex( const string& ns ){
 
611
        list<BSONObj> all;
 
612
        auto_ptr<DBClientCursor> i = getIndexes( ns );
 
613
        while ( i->more() ){
 
614
            all.push_back( i->next().getOwned() );
 
615
        }
 
616
        
 
617
        dropIndexes( ns );
 
618
        
 
619
        for ( list<BSONObj>::iterator i=all.begin(); i!=all.end(); i++ ){
 
620
            BSONObj o = *i;
 
621
            insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes" ).c_str() , o );
 
622
        }
 
623
        
 
624
    }
 
625
    
 
626
 
 
627
    string DBClientWithCommands::genIndexName( const BSONObj& keys ){
 
628
        stringstream ss;
 
629
        
 
630
        bool first = 1;
 
631
        for ( BSONObjIterator i(keys); i.more(); ) {
 
632
            BSONElement f = i.next();
 
633
            
 
634
            if ( first )
 
635
                first = 0;
 
636
            else
 
637
                ss << "_";
 
638
            
 
639
            ss << f.fieldName() << "_";
 
640
            if( f.isNumber() )
 
641
                ss << f.numberInt();
 
642
        }
 
643
        return ss.str();
 
644
    }
 
645
 
 
646
    bool DBClientWithCommands::ensureIndex( const string &ns , BSONObj keys , bool unique, const string & name ) {
 
647
        BSONObjBuilder toSave;
 
648
        toSave.append( "ns" , ns );
 
649
        toSave.append( "key" , keys );
 
650
 
 
651
        string cacheKey(ns);
 
652
        cacheKey += "--";
 
653
 
 
654
        if ( name != "" ) {
 
655
            toSave.append( "name" , name );
 
656
            cacheKey += name;
 
657
        }
 
658
        else {
 
659
            string nn = genIndexName( keys );
 
660
            toSave.append( "name" , nn );
 
661
            cacheKey += nn;
 
662
        }
 
663
        
 
664
        if ( unique )
 
665
            toSave.appendBool( "unique", unique );
 
666
 
 
667
        if ( _seenIndexes.count( cacheKey ) )
 
668
            return 0;
 
669
        _seenIndexes.insert( cacheKey );
 
670
 
 
671
        insert( Namespace( ns.c_str() ).getSisterNS( "system.indexes"  ).c_str() , toSave.obj() );
 
672
        return 1;
 
673
    }
 
674
 
 
675
    void DBClientWithCommands::resetIndexCache() {
 
676
        _seenIndexes.clear();
 
677
    }
 
678
 
 
679
    /* -- DBClientCursor ---------------------------------------------- */
 
680
 
 
681
    void assembleRequest( const string &ns, BSONObj query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, Message &toSend ) {
 
682
        CHECK_OBJECT( query , "assembleRequest query" );
 
683
        // see query.h for the protocol we are using here.
 
684
        BufBuilder b;
 
685
        int opts = queryOptions;
 
686
        b.append(opts);
 
687
        b.append(ns.c_str());
 
688
        b.append(nToSkip);
 
689
        b.append(nToReturn);
 
690
        query.appendSelfToBufBuilder(b);
 
691
        if ( fieldsToReturn )
 
692
            fieldsToReturn->appendSelfToBufBuilder(b);
 
693
        toSend.setData(dbQuery, b.buf(), b.len());
 
694
    }
 
695
 
 
696
    void DBClientConnection::say( Message &toSend ) {
 
697
        checkConnection();
 
698
        try { 
 
699
            port().say( toSend );
 
700
        } catch( SocketException & ) { 
 
701
            failed = true;
 
702
            throw;
 
703
        }
 
704
    }
 
705
 
 
706
    void DBClientConnection::sayPiggyBack( Message &toSend ) {
 
707
        port().piggyBack( toSend );
 
708
    }
 
709
 
 
710
    bool DBClientConnection::call( Message &toSend, Message &response, bool assertOk ) {
 
711
        /* todo: this is very ugly messagingport::call returns an error code AND can throw 
 
712
                 an exception.  we should make it return void and just throw an exception anytime 
 
713
                 it fails
 
714
        */
 
715
        try { 
 
716
            if ( !port().call(toSend, response) ) {
 
717
                failed = true;
 
718
                if ( assertOk )
 
719
                    massert( 10278 , "dbclient error communicating with server", false);
 
720
                return false;
 
721
            }
 
722
        }
 
723
        catch( SocketException & ) { 
 
724
            failed = true;
 
725
            throw;
 
726
        }
 
727
        return true;
 
728
    }
 
729
 
 
730
    void DBClientConnection::checkResponse( const char *data, int nReturned ) {
 
731
        /* check for errors.  the only one we really care about at
 
732
         this stage is "not master" */
 
733
        if ( clientPaired && nReturned ) {
 
734
            BSONObj o(data);
 
735
            BSONElement e = o.firstElement();
 
736
            if ( strcmp(e.fieldName(), "$err") == 0 &&
 
737
                    e.type() == String && strncmp(e.valuestr(), "not master", 10) == 0 ) {
 
738
                clientPaired->isntMaster();
 
739
            }
 
740
        }
 
741
    }
 
742
 
 
743
    bool DBClientCursor::init() {
 
744
        Message toSend;
 
745
        if ( !cursorId ) {
 
746
            assembleRequest( ns, query, nToReturn, nToSkip, fieldsToReturn, opts, toSend );
 
747
        } else {
 
748
            BufBuilder b;
 
749
            b.append( opts );
 
750
            b.append( ns.c_str() );
 
751
            b.append( nToReturn );
 
752
            b.append( cursorId );
 
753
            toSend.setData( dbGetMore, b.buf(), b.len() );
 
754
        }
 
755
        if ( !connector->call( toSend, *m, false ) )
 
756
            return false;
 
757
        dataReceived();
 
758
        return true;
 
759
    }
 
760
 
 
761
    void DBClientCursor::requestMore() {
 
762
        assert( cursorId && pos == nReturned );
 
763
 
 
764
        BufBuilder b;
 
765
        b.append(opts);
 
766
        b.append(ns.c_str());
 
767
        b.append(nToReturn);
 
768
        b.append(cursorId);
 
769
 
 
770
        Message toSend;
 
771
        toSend.setData(dbGetMore, b.buf(), b.len());
 
772
        auto_ptr<Message> response(new Message());
 
773
        connector->call( toSend, *response );
 
774
 
 
775
        m = response;
 
776
        dataReceived();
 
777
    }
 
778
 
 
779
    void DBClientCursor::dataReceived() {
 
780
        QueryResult *qr = (QueryResult *) m->data;
 
781
        resultFlags = qr->resultFlags();
 
782
        if ( qr->resultFlags() & QueryResult::ResultFlag_CursorNotFound ) {
 
783
            // cursor id no longer valid at the server.
 
784
            assert( qr->cursorId == 0 );
 
785
            cursorId = 0; // 0 indicates no longer valid (dead)
 
786
            // TODO: should we throw a UserException here???
 
787
        }
 
788
        if ( cursorId == 0 || ! ( opts & QueryOption_CursorTailable ) ) {
 
789
            // only set initially: we don't want to kill it on end of data
 
790
            // if it's a tailable cursor
 
791
            cursorId = qr->cursorId;
 
792
        }
 
793
        nReturned = qr->nReturned;
 
794
        pos = 0;
 
795
        data = qr->data();
 
796
 
 
797
        connector->checkResponse( data, nReturned );
 
798
        /* this assert would fire the way we currently work:
 
799
            assert( nReturned || cursorId == 0 );
 
800
        */
 
801
    }
 
802
 
 
803
    /** If true, safe to call next().  Requests more from server if necessary. */
 
804
    bool DBClientCursor::more() {
 
805
        if ( pos < nReturned )
 
806
            return true;
 
807
 
 
808
        if ( cursorId == 0 )
 
809
            return false;
 
810
 
 
811
        requestMore();
 
812
        return pos < nReturned;
 
813
    }
 
814
 
 
815
    BSONObj DBClientCursor::next() {
 
816
        assert( more() );
 
817
        pos++;
 
818
        BSONObj o(data);
 
819
        data += o.objsize();
 
820
        return o;
 
821
    }
 
822
 
 
823
    DBClientCursor::~DBClientCursor() {
 
824
        if ( cursorId && _ownCursor ) {
 
825
            BufBuilder b;
 
826
            b.append( (int)0 ); // reserved
 
827
            b.append( (int)1 ); // number
 
828
            b.append( cursorId );
 
829
 
 
830
            Message m;
 
831
            m.setData( dbKillCursors , b.buf() , b.len() );
 
832
 
 
833
            connector->sayPiggyBack( m );
 
834
        }
 
835
 
 
836
    }
 
837
 
 
838
    /* --- class dbclientpaired --- */
 
839
 
 
840
    string DBClientPaired::toString() {
 
841
        stringstream ss;
 
842
        ss << "state: " << master << '\n';
 
843
        ss << "left:  " << left.toStringLong() << '\n';
 
844
        ss << "right: " << right.toStringLong() << '\n';
 
845
        return ss.str();
 
846
    }
 
847
 
 
848
#pragma warning(disable: 4355)
 
849
    DBClientPaired::DBClientPaired() :
 
850
                left(true, this), right(true, this)
 
851
    {
 
852
        master = NotSetL;
 
853
    }
 
854
#pragma warning(default: 4355)
 
855
 
 
856
    /* find which server, the left or right, is currently master mode */
 
857
    void DBClientPaired::_checkMaster() {
 
858
        for ( int retry = 0; retry < 2; retry++ ) {
 
859
            int x = master;
 
860
            for ( int pass = 0; pass < 2; pass++ ) {
 
861
                DBClientConnection& c = x == 0 ? left : right;
 
862
                try {
 
863
                    bool im;
 
864
                    BSONObj o;
 
865
                    c.isMaster(im, &o);
 
866
                    if ( retry )
 
867
                        log() << "checkmaster: " << c.toString() << ' ' << o.toString() << '\n';
 
868
                    if ( im ) {
 
869
                        master = (State) (x + 2);
 
870
                        return;
 
871
                    }
 
872
                }
 
873
                catch (AssertionException&) {
 
874
                    if ( retry )
 
875
                        log() << "checkmaster: caught exception " << c.toString() << '\n';
 
876
                }
 
877
                x = x^1;
 
878
            }
 
879
            sleepsecs(1);
 
880
        }
 
881
 
 
882
        uassert( 10009 , "checkmaster: no master found", false);
 
883
    }
 
884
 
 
885
    inline DBClientConnection& DBClientPaired::checkMaster() {
 
886
        if ( master > NotSetR ) {
 
887
            // a master is selected.  let's just make sure connection didn't die
 
888
            DBClientConnection& c = master == Left ? left : right;
 
889
            if ( !c.isFailed() )
 
890
                return c;
 
891
            // after a failure, on the next checkMaster, start with the other
 
892
            // server -- presumably it took over. (not critical which we check first,
 
893
            // just will make the failover slightly faster if we guess right)
 
894
            master = master == Left ? NotSetR : NotSetL;
 
895
        }
 
896
 
 
897
        _checkMaster();
 
898
        assert( master > NotSetR );
 
899
        return master == Left ? left : right;
 
900
    }
 
901
 
 
902
    DBClientConnection& DBClientPaired::slaveConn(){
 
903
        DBClientConnection& m = checkMaster();
 
904
        assert( ! m.isFailed() );
 
905
        return master == Left ? right : left;
 
906
    }
 
907
 
 
908
    bool DBClientPaired::connect(const string &serverHostname1, const string &serverHostname2) {
 
909
        string errmsg;
 
910
        bool l = left.connect(serverHostname1, errmsg);
 
911
        bool r = right.connect(serverHostname2, errmsg);
 
912
        master = l ? NotSetL : NotSetR;
 
913
        if ( !l && !r ) // it would be ok to fall through, but checkMaster will then try an immediate reconnect which is slow
 
914
            return false;
 
915
        try {
 
916
            checkMaster();
 
917
        }
 
918
        catch (AssertionException&) {
 
919
            return false;
 
920
        }
 
921
        return true;
 
922
    }
 
923
 
 
924
    bool DBClientPaired::connect(string hostpairstring) { 
 
925
        size_t comma = hostpairstring.find( "," );
 
926
        uassert( 10010 , "bad hostpairstring", comma != string::npos);
 
927
        return connect( hostpairstring.substr( 0 , comma ) , hostpairstring.substr( comma + 1 ) );
 
928
    }
 
929
 
 
930
        bool DBClientPaired::auth(const string &dbname, const string &username, const string &pwd, string& errmsg) { 
 
931
                DBClientConnection& m = checkMaster();
 
932
                if( !m.auth(dbname, username, pwd, errmsg) )
 
933
                        return false;
 
934
                /* we try to authentiate with the other half of the pair -- even if down, that way the authInfo is cached. */
 
935
                string e;
 
936
                try {
 
937
                        if( &m == &left ) 
 
938
                                right.auth(dbname, username, pwd, e);
 
939
                        else
 
940
                                left.auth(dbname, username, pwd, e);
 
941
                }
 
942
                catch( AssertionException&) { 
 
943
                }
 
944
                return true;
 
945
        }
 
946
 
 
947
    auto_ptr<DBClientCursor> DBClientPaired::query(const string &a, Query b, int c, int d,
 
948
            const BSONObj *e, int f)
 
949
    {
 
950
        return checkMaster().query(a,b,c,d,e,f);
 
951
    }
 
952
 
 
953
    BSONObj DBClientPaired::findOne(const string &a, Query b, const BSONObj *c, int d) {
 
954
        return checkMaster().findOne(a,b,c,d);
 
955
    }
 
956
 
 
957
        void testPaired() { 
 
958
                DBClientPaired p;
 
959
                log() << "connect returns " << p.connect("localhost:27017", "localhost:27018") << endl;
 
960
 
 
961
                //DBClientConnection p(true);
 
962
                string errmsg;
 
963
                //              log() << "connect " << p.connect("localhost", errmsg) << endl;
 
964
                log() << "auth " << p.auth("dwight", "u", "p", errmsg) << endl;
 
965
 
 
966
                while( 1 ) { 
 
967
                        sleepsecs(3);
 
968
                        try { 
 
969
                                log() << "findone returns " << p.findOne("dwight.foo", BSONObj()).toString() << endl;
 
970
                                sleepsecs(3);
 
971
                                BSONObj info;
 
972
                                bool im;
 
973
                                log() << "ismaster returns " << p.isMaster(im,&info) << " info: " << info.toString() << endl;
 
974
                        }
 
975
                        catch(...) { 
 
976
                                cout << "caught exception" << endl;
 
977
                        }
 
978
                }
 
979
        }
 
980
 
 
981
} // namespace mongo