~ubuntu-branches/ubuntu/trusty/mongodb/trusty-proposed

« back to all changes in this revision

Viewing changes to db/instance.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Antonin Kral
  • Date: 2010-01-29 19:48:45 UTC
  • Revision ID: james.westby@ubuntu.com-20100129194845-8wbmkf626fwcavc9
Tags: upstream-1.3.1
ImportĀ upstreamĀ versionĀ 1.3.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// instance.cpp : Global state variables and functions.
 
2
//
 
3
 
 
4
/**
 
5
*    Copyright (C) 2008 10gen Inc.
 
6
*
 
7
*    This program is free software: you can redistribute it and/or  modify
 
8
*    it under the terms of the GNU Affero General Public License, version 3,
 
9
*    as published by the Free Software Foundation.
 
10
*
 
11
*    This program is distributed in the hope that it will be useful,
 
12
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
13
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
14
*    GNU Affero General Public License for more details.
 
15
*
 
16
*    You should have received a copy of the GNU Affero General Public License
 
17
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
18
*/
 
19
 
 
20
#include "stdafx.h"
 
21
#include "db.h"
 
22
#include "query.h"
 
23
#include "introspect.h"
 
24
#include "repl.h"
 
25
#include "dbmessage.h"
 
26
#include "instance.h"
 
27
#include "lasterror.h"
 
28
#include "security.h"
 
29
#include "json.h"
 
30
#include "reccache.h"
 
31
#include "replset.h"
 
32
#include "../s/d_logic.h"
 
33
#include "../util/file_allocator.h"
 
34
#include "cmdline.h"
 
35
#if !defined(_WIN32)
 
36
#include <sys/file.h>
 
37
#endif
 
38
#include "dbstats.h"
 
39
 
 
40
namespace mongo {
 
41
 
 
42
    void receivedKillCursors(Message& m);
 
43
    void receivedUpdate(Message& m, CurOp& op);
 
44
    void receivedDelete(Message& m, CurOp& op);
 
45
    void receivedInsert(Message& m, CurOp& op);
 
46
    bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop );
 
47
 
 
48
    CmdLine cmdLine;
 
49
 
 
50
    int nloggedsome = 0;
 
51
#define LOGSOME if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
 
52
 
 
53
    SlaveTypes slave = NotSlave;
 
54
    bool master = false; // true means keep an op log
 
55
    bool autoresync = false;
 
56
    
 
57
    /* we use new here so we don't have to worry about destructor orders at program shutdown */
 
58
    MongoMutex &dbMutex( *(new MongoMutex) );
 
59
//    MutexInfo dbMutexInfo;
 
60
 
 
61
    string dbExecCommand;
 
62
 
 
63
    string bind_ip = "";
 
64
 
 
65
    char *appsrvPath = null;
 
66
 
 
67
    DiagLog _diaglog;
 
68
 
 
69
    int opIdMem = 100000000;
 
70
 
 
71
    bool useCursors = true;
 
72
    bool useHints = true;
 
73
    
 
74
    void closeAllSockets();
 
75
    void flushOpLog( stringstream &ss ) {
 
76
        if( _diaglog.f && _diaglog.f->is_open() ) {
 
77
            ss << "flushing op log and files\n";
 
78
            _diaglog.flush();
 
79
        }
 
80
    }
 
81
 
 
82
    int ctr = 0;
 
83
 
 
84
    KillCurrentOp killCurrentOp;
 
85
    
 
86
    int lockFile = 0;
 
87
 
 
88
    // see FSyncCommand:
 
89
    unsigned lockedForWriting; 
 
90
    boost::mutex lockedForWritingMutex;
 
91
    bool unlockRequested = false;
 
92
 
 
93
    void inProgCmd( Message &m, DbResponse &dbresponse ) {
 
94
        BSONObjBuilder b;
 
95
 
 
96
        AuthenticationInfo *ai = cc().ai;
 
97
        if( !ai->isAuthorized("admin") ) { 
 
98
            BSONObjBuilder b;
 
99
            b.append("err", "unauthorized");
 
100
        }
 
101
        else {
 
102
            vector<BSONObj> vals;
 
103
            {
 
104
                boostlock bl(Client::clientsMutex);
 
105
                for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) { 
 
106
                    Client *c = *i;
 
107
                    CurOp& co = *(c->curop());
 
108
                    if( co.active() )
 
109
                        vals.push_back( co.infoNoauth() );
 
110
                }
 
111
            }
 
112
            b.append("inprog", vals);
 
113
            unsigned x = lockedForWriting;
 
114
            if( x ) {
 
115
                b.append("fsyncLock", x);
 
116
                b.append("info", "use command {unlock:0} to terminate the fsync write/snapshot lock");
 
117
            }
 
118
        }
 
119
 
 
120
        replyToQuery(0, m, dbresponse, b.obj());
 
121
    }
 
122
    
 
123
    void killOp( Message &m, DbResponse &dbresponse ) {
 
124
        BSONObj obj;
 
125
        AuthenticationInfo *ai = currentClient.get()->ai;
 
126
        if( !ai->isAuthorized("admin") ) { 
 
127
            obj = fromjson("{\"err\":\"unauthorized\"}");
 
128
        }
 
129
        /*else if( !dbMutexInfo.isLocked() ) 
 
130
            obj = fromjson("{\"info\":\"no op in progress/not locked\"}");
 
131
            */
 
132
        else {
 
133
            DbMessage d(m);
 
134
            QueryMessage q(d);
 
135
            BSONElement e = q.query.getField("op");
 
136
            if( !e.isNumber() ) { 
 
137
                obj = fromjson("{\"err\":\"no op number field specified?\"}");
 
138
            }
 
139
            else { 
 
140
                obj = fromjson("{\"info\":\"attempting to kill op\"}");
 
141
                killCurrentOp.kill( (unsigned) e.number() );
 
142
            }
 
143
        }
 
144
        replyToQuery(0, m, dbresponse, obj);
 
145
    }
 
146
 
 
147
    void unlockFsync(const char *ns, Message& m, DbResponse &dbresponse) {
 
148
        BSONObj obj;
 
149
        AuthenticationInfo *ai = currentClient.get()->ai;
 
150
        if( !ai->isAuthorized("admin") || strncmp(ns, "admin.", 6) != 0 ) { 
 
151
            obj = fromjson("{\"err\":\"unauthorized\"}");
 
152
        }
 
153
        else {
 
154
            if( lockedForWriting ) { 
 
155
                                log() << "command: unlock requested" << endl;
 
156
                obj = fromjson("{ok:1,\"info\":\"unlock requested\"}");
 
157
                unlockRequested = true;
 
158
            }
 
159
            else { 
 
160
                obj = fromjson("{ok:0,\"errmsg\":\"not locked\"}");
 
161
            }
 
162
        }
 
163
        replyToQuery(0, m, dbresponse, obj);
 
164
    }
 
165
 
 
166
    static bool receivedQuery(DbResponse& dbresponse, Message& m, 
 
167
                              CurOp& op, bool logit, 
 
168
                              mongolock& lock
 
169
      ) {
 
170
        bool ok = true;
 
171
        MSGID responseTo = m.data->id;
 
172
 
 
173
        DbMessage d(m);
 
174
        QueryMessage q(d);
 
175
        QueryResult* msgdata;
 
176
 
 
177
        Client& c = cc();
 
178
 
 
179
        try {
 
180
            if (q.fields.get() && q.fields->errmsg)
 
181
                uassert( 10053 , q.fields->errmsg, false);
 
182
 
 
183
            /* note these are logged BEFORE authentication -- which is sort of ok */
 
184
            if ( _diaglog.level && logit ) {
 
185
                if ( strstr(q.ns, ".$cmd") ) {
 
186
                    /* $cmd queries are "commands" and usually best treated as write operations */
 
187
                    OPWRITE;
 
188
                }
 
189
                else {
 
190
                    OPREAD;
 
191
                }
 
192
            }
 
193
 
 
194
            setClient( q.ns, dbpath, &lock );
 
195
            c.top.setRead();
 
196
            c.curop()->setNS(q.ns);
 
197
            msgdata = runQuery(m, q, op ).release();
 
198
        }
 
199
        catch ( AssertionException& e ) {
 
200
            ok = false;
 
201
            op.debug().str << " exception ";
 
202
            LOGSOME problem() << " Caught Assertion in runQuery ns:" << q.ns << ' ' << e.toString() << '\n';
 
203
            log() << "  ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << '\n';
 
204
            if ( q.query.valid() )
 
205
                log() << "  query:" << q.query.toString() << endl;
 
206
            else
 
207
                log() << "  query object is not valid!" << endl;
 
208
 
 
209
            BSONObjBuilder err;
 
210
            err.append("$err", e.msg.empty() ? "assertion during query" : e.msg);
 
211
            BSONObj errObj = err.done();
 
212
 
 
213
            BufBuilder b;
 
214
            b.skip(sizeof(QueryResult));
 
215
            b.append((void*) errObj.objdata(), errObj.objsize());
 
216
 
 
217
            // todo: call replyToQuery() from here instead of this!!! see dbmessage.h
 
218
            msgdata = (QueryResult *) b.buf();
 
219
            b.decouple();
 
220
            QueryResult *qr = msgdata;
 
221
            qr->_resultFlags() = QueryResult::ResultFlag_ErrSet;
 
222
            qr->len = b.len();
 
223
            qr->setOperation(opReply);
 
224
            qr->cursorId = 0;
 
225
            qr->startingFrom = 0;
 
226
            qr->nReturned = 1;
 
227
 
 
228
        }
 
229
        Message *resp = new Message();
 
230
        resp->setData(msgdata, true); // transport will free
 
231
        dbresponse.response = resp;
 
232
        dbresponse.responseTo = responseTo;
 
233
        Database *database = c.database();
 
234
        if ( database ) {
 
235
            if ( database->profile )
 
236
                op.debug().str << " bytes:" << resp->data->dataLen();
 
237
        }
 
238
        else {
 
239
            if ( strstr(q.ns, "$cmd") == 0 ) // (this condition is normal for $cmd dropDatabase)
 
240
                log() << "ERROR: receiveQuery: database is null; ns=" << q.ns << endl;
 
241
        }
 
242
 
 
243
        return ok;
 
244
    }
 
245
 
 
246
    bool commandIsReadOnly(BSONObj& _cmdobj);
 
247
 
 
248
    // Returns false when request includes 'end'
 
249
    bool assembleResponse( Message &m, DbResponse &dbresponse, const sockaddr_in &client ) {
 
250
 
 
251
        bool writeLock = true;
 
252
 
 
253
        // before we lock...
 
254
        int op = m.data->operation();
 
255
        globalOpCounters.gotOp( op );
 
256
        const char *ns = m.data->_data + 4;
 
257
        if ( op == dbQuery ) {
 
258
            if( strstr(ns, ".$cmd") ) {
 
259
                if( strstr(ns, ".$cmd.sys.") ) { 
 
260
                    if( strstr(ns, "$cmd.sys.inprog") ) {
 
261
                        inProgCmd(m, dbresponse);
 
262
                        return true;
 
263
                    }
 
264
                    if( strstr(ns, "$cmd.sys.killop") ) { 
 
265
                        killOp(m, dbresponse);
 
266
                        return true;
 
267
                    }
 
268
                    if( strstr(ns, "$cmd.sys.unlock") ) { 
 
269
                        unlockFsync(ns, m, dbresponse);
 
270
                        return true;
 
271
                    }
 
272
                }
 
273
                DbMessage d( m );
 
274
                QueryMessage q( d );
 
275
                writeLock = !commandIsReadOnly(q.query);
 
276
            }
 
277
            else
 
278
                writeLock = false;
 
279
        }
 
280
        else if( op == dbGetMore ) {
 
281
            writeLock = false;
 
282
        }
 
283
        
 
284
        if ( handlePossibleShardedMessage( m , dbresponse ) ){
 
285
            /* important to do this before we lock
 
286
               so if a message has to be forwarded, doesn't block for that
 
287
            */
 
288
            return true;
 
289
        }
 
290
 
 
291
        Client& c = cc();
 
292
        c.clearns();
 
293
        
 
294
        auto_ptr<CurOp> nestedOp;
 
295
        CurOp* currentOpP = c.curop();
 
296
        if ( currentOpP->active() ){
 
297
            nestedOp.reset( new CurOp() );
 
298
            currentOpP = nestedOp.get();
 
299
        }
 
300
        CurOp& currentOp = *currentOpP;
 
301
        currentOp.reset(client);
 
302
        currentOp.setOp(op);
 
303
        
 
304
        OpDebug& debug = currentOp.debug();
 
305
        StringBuilder& ss = debug.str;
 
306
 
 
307
        int logThreshold = cmdLine.slowMS;
 
308
        bool log = logLevel >= 1;
 
309
 
 
310
        Timer t( currentOp.startTime() );
 
311
 
 
312
        mongolock lk(writeLock);
 
313
 
 
314
#if 0
 
315
        /* use this if you only want to process operations for a particular namespace.
 
316
         maybe add to cmd line parms or something fancier.
 
317
         */
 
318
        DbMessage ddd(m);
 
319
        if ( strncmp(ddd.getns(), "clusterstock", 12) != 0 ) {
 
320
            static int q;
 
321
            if ( ++q < 20 )
 
322
                out() << "TEMP skip " << ddd.getns() << endl;
 
323
            goto skip;
 
324
        }
 
325
#endif
 
326
 
 
327
        if ( op == dbQuery ) {
 
328
            // receivedQuery() does its own authorization processing.
 
329
            if ( ! receivedQuery(dbresponse, m, currentOp, true, lk) )
 
330
                log = true;
 
331
        }
 
332
        else if ( op == dbGetMore ) {
 
333
            // does its own authorization processing.
 
334
            OPREAD;
 
335
            DEV log = true;
 
336
            ss << "getmore ";
 
337
            if ( ! receivedGetMore(dbresponse, m, currentOp) )
 
338
                log = true;
 
339
        }
 
340
        else if ( op == dbMsg ) {
 
341
                        /* deprecated / rarely used.  intended for connection diagnostics. */
 
342
            ss << "msg ";
 
343
            char *p = m.data->_data;
 
344
            int len = strlen(p);
 
345
            if ( len > 400 )
 
346
                out() << curTimeMillis() % 10000 <<
 
347
                     " long msg received, len:" << len <<
 
348
                     " ends with: " << p + len - 10 << endl;
 
349
            bool end = false; //strcmp("end", p) == 0;
 
350
            Message *resp = new Message();
 
351
            resp->setData(opReply, "i am fine");
 
352
            dbresponse.response = resp;
 
353
            dbresponse.responseTo = m.data->id;
 
354
            //dbMsgPort.reply(m, resp);
 
355
            if ( end )
 
356
                return false;
 
357
        }
 
358
        else {
 
359
            const char *ns = m.data->_data + 4;
 
360
            char cl[256];
 
361
            nsToDatabase(ns, cl);
 
362
            currentOp.setNS(ns);
 
363
            AuthenticationInfo *ai = currentClient.get()->ai;
 
364
            if( !ai->isAuthorized(cl) ) { 
 
365
                uassert_nothrow("unauthorized");
 
366
            }
 
367
            else if ( op == dbInsert ) {
 
368
                OPWRITE;
 
369
                try {
 
370
                    ss << "insert ";
 
371
                    receivedInsert(m, currentOp);
 
372
                }
 
373
                catch ( AssertionException& e ) {
 
374
                    LOGSOME problem() << " Caught Assertion insert, continuing\n";
 
375
                    ss << " exception " << e.toString();
 
376
                    log = true;
 
377
                }
 
378
            }
 
379
            else if ( op == dbUpdate ) {
 
380
                OPWRITE;
 
381
                try {
 
382
                    ss << "update ";
 
383
                    receivedUpdate(m, currentOp);
 
384
                }
 
385
                catch ( AssertionException& e ) {
 
386
                    LOGSOME problem() << " Caught Assertion update, continuing" << endl;
 
387
                    ss << " exception " << e.toString();
 
388
                    log = true;
 
389
                }
 
390
            }
 
391
            else if ( op == dbDelete ) {
 
392
                OPWRITE;
 
393
                try {
 
394
                    ss << "remove ";
 
395
                    receivedDelete(m, currentOp);
 
396
                }
 
397
                catch ( AssertionException& e ) {
 
398
                    LOGSOME problem() << " Caught Assertion receivedDelete, continuing" << endl;
 
399
                    ss << " exception " << e.toString();
 
400
                    log = true;
 
401
                }
 
402
            }
 
403
            else if ( op == dbKillCursors ) {
 
404
                OPREAD;
 
405
                try {
 
406
                    logThreshold = 10;
 
407
                    ss << "killcursors ";
 
408
                    receivedKillCursors(m);
 
409
                }
 
410
                catch ( AssertionException& e ) {
 
411
                    problem() << " Caught Assertion in kill cursors, continuing" << endl;
 
412
                    ss << " exception " + e.toString();
 
413
                    log = true;
 
414
                }
 
415
            }
 
416
            else {
 
417
                out() << "    operation isn't supported: " << op << endl;
 
418
                currentOp.setActive(false);
 
419
                assert(false);
 
420
            }
 
421
        }
 
422
        int ms = t.millis();
 
423
        log = log || (logLevel >= 2 && ++ctr % 512 == 0);
 
424
        DEV log = true;
 
425
        if ( log || ms > logThreshold ) {
 
426
            ss << ' ' << ms << "ms";
 
427
            mongo::log() << ss.str() << endl;
 
428
        }
 
429
        Database *database = c.database();
 
430
        if ( database && database->profile >= 1 ) {
 
431
            if ( database->profile >= 2 || ms >= cmdLine.slowMS ) {
 
432
                // performance profiling is on
 
433
                if ( dbMutex.getState() > 1 || dbMutex.getState() < -1 ){
 
434
                    out() << "warning: not profiling because recursive lock" << endl;
 
435
                }
 
436
                else {
 
437
                    string old_ns = c.ns();
 
438
                    Database * old_db = c.database();
 
439
                    lk.releaseAndWriteLock();
 
440
                    Client::Context c( old_ns , old_db );
 
441
                    profile(ss.str().c_str(), ms);
 
442
                }
 
443
            }
 
444
        }
 
445
 
 
446
        currentOp.setActive(false);
 
447
        return true;
 
448
    } /* assembleResponse() */
 
449
 
 
450
    void killCursors(int n, long long *ids);
 
451
    void receivedKillCursors(Message& m) {
 
452
        int *x = (int *) m.data->_data;
 
453
        x++; // reserved
 
454
        int n = *x++;
 
455
        assert( n >= 1 );
 
456
        if ( n > 2000 ) {
 
457
            problem() << "Assertion failure, receivedKillCursors, n=" << n << endl;
 
458
            assert( n < 30000 );
 
459
        }
 
460
        killCursors(n, (long long *) x);
 
461
    }
 
462
 
 
463
    /* cl - database name
 
464
       path - db directory
 
465
    */
 
466
    void closeDatabase( const char *cl, const string& path ) {
 
467
        Database *database = cc().database();
 
468
        assert( database );
 
469
        assert( database->name == cl );
 
470
                /*
 
471
        if ( string("local") != cl ) {
 
472
            DBInfo i(cl);
 
473
            i.dbDropped();
 
474
                        }*/
 
475
 
 
476
        /* important: kill all open cursors on the database */
 
477
        string prefix(cl);
 
478
        prefix += '.';
 
479
        ClientCursor::invalidate(prefix.c_str());
 
480
 
 
481
        NamespaceDetailsTransient::clearForPrefix( prefix.c_str() );
 
482
 
 
483
        dbHolder.erase( cl, path );
 
484
        delete database; // closes files
 
485
        cc().clearns();
 
486
    }
 
487
 
 
488
    void receivedUpdate(Message& m, CurOp& op) {
 
489
        DbMessage d(m);
 
490
        const char *ns = d.getns();
 
491
        assert(*ns);
 
492
        uassert( 10054 ,  "not master", isMasterNs( ns ) );
 
493
        setClient(ns);
 
494
        Client& client = cc();
 
495
        client.top.setWrite();
 
496
        op.debug().str << ns << ' ';
 
497
        int flags = d.pullInt();
 
498
        BSONObj query = d.nextJsObj();
 
499
 
 
500
        assert( d.moreJSObjs() );
 
501
        assert( query.objsize() < m.data->dataLen() );
 
502
        BSONObj toupdate = d.nextJsObj();
 
503
        uassert( 10055 , "update object too large", toupdate.objsize() <= MaxBSONObjectSize);
 
504
        assert( toupdate.objsize() < m.data->dataLen() );
 
505
        assert( query.objsize() + toupdate.objsize() < m.data->dataLen() );
 
506
        bool upsert = flags & UpdateOption_Upsert;
 
507
        bool multi = flags & UpdateOption_Multi;
 
508
        {
 
509
            string s = query.toString();
 
510
            /* todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down. */
 
511
            op.debug().str << " query: " << s;
 
512
            CurOp& currentOp = *client.curop();
 
513
            currentOp.setQuery(query);
 
514
        }        
 
515
        UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug() );
 
516
        /* TODO FIX: recordUpdate should take a long int for parm #2 */
 
517
        recordUpdate( res.existing , (int) res.num ); // for getlasterror
 
518
    }
 
519
 
 
520
    void receivedDelete(Message& m, CurOp& op) {
 
521
        DbMessage d(m);
 
522
        const char *ns = d.getns();
 
523
        assert(*ns);
 
524
        uassert( 10056 ,  "not master", isMasterNs( ns ) );
 
525
        setClient(ns);
 
526
        Client& client = cc();
 
527
        client.top.setWrite();
 
528
        int flags = d.pullInt();
 
529
        bool justOne = flags & 1;
 
530
        assert( d.moreJSObjs() );
 
531
        BSONObj pattern = d.nextJsObj();
 
532
        {
 
533
            string s = pattern.toString();
 
534
            op.debug().str << " query: " << s;
 
535
            CurOp& currentOp = *client.curop();
 
536
            currentOp.setQuery(pattern);
 
537
        }        
 
538
        int n = deleteObjects(ns, pattern, justOne, true);
 
539
        recordDelete( n );
 
540
    }
 
541
    
 
542
    QueryResult* emptyMoreResult(long long);
 
543
 
 
544
    bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ) {
 
545
        bool ok = true;
 
546
        DbMessage d(m);
 
547
        const char *ns = d.getns();
 
548
        StringBuilder& ss = curop.debug().str;
 
549
        ss << ns;
 
550
        setClient(ns);
 
551
        cc().top.setRead();
 
552
        int ntoreturn = d.pullInt();
 
553
        long long cursorid = d.pullInt64();
 
554
        ss << " cid:" << cursorid;
 
555
        ss << " ntoreturn:" << ntoreturn;
 
556
        QueryResult* msgdata;
 
557
        try {
 
558
            AuthenticationInfo *ai = currentClient.get()->ai;
 
559
            uassert( 10057 , "unauthorized", ai->isAuthorized(cc().database()->name.c_str()));
 
560
            msgdata = getMore(ns, ntoreturn, cursorid, curop);
 
561
        }
 
562
        catch ( AssertionException& e ) {
 
563
            ss << " exception " + e.toString();
 
564
            msgdata = emptyMoreResult(cursorid);
 
565
            ok = false;
 
566
        }
 
567
        Message *resp = new Message();
 
568
        resp->setData(msgdata, true);
 
569
        ss << " bytes:" << resp->data->dataLen();
 
570
        ss << " nreturned:" << msgdata->nReturned;
 
571
        dbresponse.response = resp;
 
572
        dbresponse.responseTo = m.data->id;
 
573
        //dbMsgPort.reply(m, resp);
 
574
        return ok;
 
575
    }
 
576
 
 
577
    void receivedInsert(Message& m, CurOp& op) {
 
578
        DbMessage d(m);
 
579
                const char *ns = d.getns();
 
580
                assert(*ns);
 
581
        uassert( 10058 ,  "not master", isMasterNs( ns ) );
 
582
                setClient(ns);
 
583
        cc().top.setWrite();
 
584
        op.debug().str << ns;
 
585
                
 
586
        while ( d.moreJSObjs() ) {
 
587
            BSONObj js = d.nextJsObj();
 
588
            uassert( 10059 , "object to insert too large", js.objsize() <= MaxBSONObjectSize);
 
589
            theDataFileMgr.insert(ns, js, false);
 
590
            logOp("i", ns, js);
 
591
        }
 
592
    }
 
593
 
 
594
    class JniMessagingPort : public AbstractMessagingPort {
 
595
    public:
 
596
        JniMessagingPort(Message& _container) : container(_container) { }
 
597
        void reply(Message& received, Message& response, MSGID) {
 
598
            container = response;
 
599
        }
 
600
        void reply(Message& received, Message& response) {
 
601
            container = response;
 
602
        }
 
603
        unsigned remotePort(){
 
604
            return 1;
 
605
        }
 
606
        Message & container;
 
607
    };
 
608
    
 
609
    void getDatabaseNames( vector< string > &names ) {
 
610
        boost::filesystem::path path( dbpath );
 
611
        for ( boost::filesystem::directory_iterator i( path );
 
612
                i != boost::filesystem::directory_iterator(); ++i ) {
 
613
            string fileName = boost::filesystem::path(*i).leaf();
 
614
            if ( fileName.length() > 3 && fileName.substr( fileName.length() - 3, 3 ) == ".ns" )
 
615
                names.push_back( fileName.substr( 0, fileName.length() - 3 ) );
 
616
        }
 
617
    }
 
618
 
 
619
    bool DBDirectClient::call( Message &toSend, Message &response, bool assertOk ) {
 
620
        SavedContext c;
 
621
        if ( lastError._get() )
 
622
            lastError.startRequest( toSend, lastError._get() );
 
623
        DbResponse dbResponse;
 
624
        assembleResponse( toSend, dbResponse );
 
625
        assert( dbResponse.response );
 
626
        response = *dbResponse.response;
 
627
        return true;
 
628
    }
 
629
 
 
630
    void DBDirectClient::say( Message &toSend ) {
 
631
        SavedContext c;
 
632
        if ( lastError._get() )
 
633
            lastError.startRequest( toSend, lastError._get() );
 
634
        DbResponse dbResponse;
 
635
        assembleResponse( toSend, dbResponse );
 
636
    }
 
637
 
 
638
    auto_ptr<DBClientCursor> DBDirectClient::query(const string &ns, Query query, int nToReturn , int nToSkip ,
 
639
                                                   const BSONObj *fieldsToReturn , int queryOptions ){
 
640
        
 
641
        //if ( ! query.obj.isEmpty() || nToReturn != 0 || nToSkip != 0 || fieldsToReturn || queryOptions )
 
642
        return DBClientBase::query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions );
 
643
        //
 
644
        //assert( query.obj.isEmpty() );
 
645
        //throw UserException( (string)"yay:" + ns );
 
646
    }
 
647
 
 
648
 
 
649
    DBDirectClient::AlwaysAuthorized DBDirectClient::SavedContext::always;
 
650
 
 
651
    DBClientBase * createDirectClient(){
 
652
        return new DBDirectClient();
 
653
    }
 
654
 
 
655
    void recCacheCloseAll();
 
656
 
 
657
    boost::mutex &exitMutex( *( new boost::mutex ) );
 
658
    int numExitCalls = 0;
 
659
    void shutdown();
 
660
 
 
661
    bool inShutdown(){
 
662
        return numExitCalls > 0;
 
663
    }
 
664
 
 
665
    void tryToOutputFatal( const string& s ){
 
666
        try {
 
667
            rawOut( s );
 
668
            return;
 
669
        }
 
670
        catch ( ... ){}
 
671
 
 
672
        try {
 
673
            cerr << s << endl;
 
674
            return;
 
675
        }
 
676
        catch ( ... ){}
 
677
        
 
678
        // uh - oh, not sure there is anything else we can do...
 
679
    }
 
680
 
 
681
    /* not using log() herein in case we are already locked */
 
682
    void dbexit( ExitCode rc, const char *why) {        
 
683
        {
 
684
            boostlock lk( exitMutex );
 
685
            if ( numExitCalls++ > 0 ) {
 
686
                if ( numExitCalls > 5 ){
 
687
                    // this means something horrible has happened
 
688
                    ::_exit( rc );
 
689
                }
 
690
                stringstream ss;
 
691
                ss << "dbexit: " << why << "; exiting immediately" << endl;
 
692
                tryToOutputFatal( ss.str() );
 
693
                ::exit( rc );                
 
694
            }
 
695
        }
 
696
        
 
697
        stringstream ss;
 
698
        ss << "dbexit: " << why << endl;
 
699
        tryToOutputFatal( ss.str() );
 
700
        
 
701
        try {
 
702
            shutdown(); // gracefully shutdown instance
 
703
        }
 
704
        catch ( ... ){
 
705
            tryToOutputFatal( "shutdown failed with exception" );
 
706
        }
 
707
        
 
708
        tryToOutputFatal( "dbexit: really exiting now\n" );
 
709
        ::exit(rc);
 
710
    }
 
711
    
 
712
    void shutdown() {
 
713
 
 
714
 
 
715
        log() << "\t shutdown: going to close listening sockets..." << endl;        
 
716
        ListeningSockets::get()->closeAll();
 
717
 
 
718
        log() << "\t shutdown: going to flush oplog..." << endl;
 
719
        stringstream ss2;
 
720
        flushOpLog( ss2 );
 
721
        rawOut( ss2.str() );
 
722
 
 
723
        /* must do this before unmapping mem or you may get a seg fault */
 
724
        log() << "\t shutdown: going to close sockets..." << endl;
 
725
        boost::thread close_socket_thread(closeAllSockets);
 
726
 
 
727
        // wait until file preallocation finishes
 
728
        // we would only hang here if the file_allocator code generates a
 
729
        // synchronous signal, which we don't expect
 
730
        log() << "\t shutdown: waiting for fs preallocator..." << endl;
 
731
        theFileAllocator().waitUntilFinished();
 
732
        
 
733
        log() << "\t shutdown: closing all files..." << endl;
 
734
        stringstream ss3;
 
735
        MemoryMappedFile::closeAllFiles( ss3 );
 
736
        rawOut( ss3.str() );
 
737
 
 
738
        // should we be locked here?  we aren't. might be ok as-is.
 
739
        recCacheCloseAll();
 
740
        
 
741
#if !defined(_WIN32) && !defined(__sunos__)
 
742
        if ( lockFile ){
 
743
            log() << "\t shutdown: removing fs lock..." << endl;
 
744
            if( ftruncate( lockFile , 0 ) ) 
 
745
                log() << "\t couldn't remove fs lock " << OUTPUT_ERRNO << endl;
 
746
            flock( lockFile, LOCK_UN );
 
747
        }
 
748
#endif
 
749
    }
 
750
 
 
751
    void acquirePathLock() {
 
752
#if !defined(_WIN32) && !defined(__sunos__)
 
753
        string name = ( boost::filesystem::path( dbpath ) / "mongod.lock" ).native_file_string();
 
754
        lockFile = open( name.c_str(), O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO );
 
755
        massert( 10309 ,  "Unable to create / open lock file for dbpath: " + name, lockFile > 0 );
 
756
        massert( 10310 ,  "Unable to acquire lock for dbpath: " + name, flock( lockFile, LOCK_EX | LOCK_NB ) == 0 );
 
757
        
 
758
        stringstream ss;
 
759
        ss << getpid() << endl;
 
760
        string s = ss.str();
 
761
        const char * data = s.c_str();
 
762
        assert( write( lockFile , data , strlen( data ) ) );
 
763
        fsync( lockFile );
 
764
#endif        
 
765
    }
 
766
    
 
767
} // namespace mongo