1
// instance.cpp : Global state variables and functions.
5
* Copyright (C) 2008 10gen Inc.
7
* This program is free software: you can redistribute it and/or modify
8
* it under the terms of the GNU Affero General Public License, version 3,
9
* as published by the Free Software Foundation.
11
* This program is distributed in the hope that it will be useful,
12
* but WITHOUT ANY WARRANTY; without even the implied warranty of
13
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14
* GNU Affero General Public License for more details.
16
* You should have received a copy of the GNU Affero General Public License
17
* along with this program. If not, see <http://www.gnu.org/licenses/>.
23
#include "introspect.h"
25
#include "dbmessage.h"
27
#include "lasterror.h"
32
#include "../s/d_logic.h"
33
#include "../util/file_allocator.h"
42
void receivedKillCursors(Message& m);
43
void receivedUpdate(Message& m, CurOp& op);
44
void receivedDelete(Message& m, CurOp& op);
45
void receivedInsert(Message& m, CurOp& op);
46
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop );
51
#define LOGSOME if( ++nloggedsome < 1000 || nloggedsome % 100 == 0 )
53
SlaveTypes slave = NotSlave;
54
bool master = false; // true means keep an op log
55
bool autoresync = false;
57
/* we use new here so we don't have to worry about destructor orders at program shutdown */
58
MongoMutex &dbMutex( *(new MongoMutex) );
59
// MutexInfo dbMutexInfo;
65
char *appsrvPath = null;
69
int opIdMem = 100000000;
71
bool useCursors = true;
74
void closeAllSockets();
75
void flushOpLog( stringstream &ss ) {
76
if( _diaglog.f && _diaglog.f->is_open() ) {
77
ss << "flushing op log and files\n";
84
KillCurrentOp killCurrentOp;
89
unsigned lockedForWriting;
90
boost::mutex lockedForWritingMutex;
91
bool unlockRequested = false;
93
void inProgCmd( Message &m, DbResponse &dbresponse ) {
96
AuthenticationInfo *ai = cc().ai;
97
if( !ai->isAuthorized("admin") ) {
99
b.append("err", "unauthorized");
102
vector<BSONObj> vals;
104
boostlock bl(Client::clientsMutex);
105
for( set<Client*>::iterator i = Client::clients.begin(); i != Client::clients.end(); i++ ) {
107
CurOp& co = *(c->curop());
109
vals.push_back( co.infoNoauth() );
112
b.append("inprog", vals);
113
unsigned x = lockedForWriting;
115
b.append("fsyncLock", x);
116
b.append("info", "use command {unlock:0} to terminate the fsync write/snapshot lock");
120
replyToQuery(0, m, dbresponse, b.obj());
123
void killOp( Message &m, DbResponse &dbresponse ) {
125
AuthenticationInfo *ai = currentClient.get()->ai;
126
if( !ai->isAuthorized("admin") ) {
127
obj = fromjson("{\"err\":\"unauthorized\"}");
129
/*else if( !dbMutexInfo.isLocked() )
130
obj = fromjson("{\"info\":\"no op in progress/not locked\"}");
135
BSONElement e = q.query.getField("op");
136
if( !e.isNumber() ) {
137
obj = fromjson("{\"err\":\"no op number field specified?\"}");
140
obj = fromjson("{\"info\":\"attempting to kill op\"}");
141
killCurrentOp.kill( (unsigned) e.number() );
144
replyToQuery(0, m, dbresponse, obj);
147
void unlockFsync(const char *ns, Message& m, DbResponse &dbresponse) {
149
AuthenticationInfo *ai = currentClient.get()->ai;
150
if( !ai->isAuthorized("admin") || strncmp(ns, "admin.", 6) != 0 ) {
151
obj = fromjson("{\"err\":\"unauthorized\"}");
154
if( lockedForWriting ) {
155
log() << "command: unlock requested" << endl;
156
obj = fromjson("{ok:1,\"info\":\"unlock requested\"}");
157
unlockRequested = true;
160
obj = fromjson("{ok:0,\"errmsg\":\"not locked\"}");
163
replyToQuery(0, m, dbresponse, obj);
166
static bool receivedQuery(DbResponse& dbresponse, Message& m,
167
CurOp& op, bool logit,
171
MSGID responseTo = m.data->id;
175
QueryResult* msgdata;
180
if (q.fields.get() && q.fields->errmsg)
181
uassert( 10053 , q.fields->errmsg, false);
183
/* note these are logged BEFORE authentication -- which is sort of ok */
184
if ( _diaglog.level && logit ) {
185
if ( strstr(q.ns, ".$cmd") ) {
186
/* $cmd queries are "commands" and usually best treated as write operations */
194
setClient( q.ns, dbpath, &lock );
196
c.curop()->setNS(q.ns);
197
msgdata = runQuery(m, q, op ).release();
199
catch ( AssertionException& e ) {
201
op.debug().str << " exception ";
202
LOGSOME problem() << " Caught Assertion in runQuery ns:" << q.ns << ' ' << e.toString() << '\n';
203
log() << " ntoskip:" << q.ntoskip << " ntoreturn:" << q.ntoreturn << '\n';
204
if ( q.query.valid() )
205
log() << " query:" << q.query.toString() << endl;
207
log() << " query object is not valid!" << endl;
210
err.append("$err", e.msg.empty() ? "assertion during query" : e.msg);
211
BSONObj errObj = err.done();
214
b.skip(sizeof(QueryResult));
215
b.append((void*) errObj.objdata(), errObj.objsize());
217
// todo: call replyToQuery() from here instead of this!!! see dbmessage.h
218
msgdata = (QueryResult *) b.buf();
220
QueryResult *qr = msgdata;
221
qr->_resultFlags() = QueryResult::ResultFlag_ErrSet;
223
qr->setOperation(opReply);
225
qr->startingFrom = 0;
229
Message *resp = new Message();
230
resp->setData(msgdata, true); // transport will free
231
dbresponse.response = resp;
232
dbresponse.responseTo = responseTo;
233
Database *database = c.database();
235
if ( database->profile )
236
op.debug().str << " bytes:" << resp->data->dataLen();
239
if ( strstr(q.ns, "$cmd") == 0 ) // (this condition is normal for $cmd dropDatabase)
240
log() << "ERROR: receiveQuery: database is null; ns=" << q.ns << endl;
246
bool commandIsReadOnly(BSONObj& _cmdobj);
248
// Returns false when request includes 'end'
249
bool assembleResponse( Message &m, DbResponse &dbresponse, const sockaddr_in &client ) {
251
bool writeLock = true;
254
int op = m.data->operation();
255
globalOpCounters.gotOp( op );
256
const char *ns = m.data->_data + 4;
257
if ( op == dbQuery ) {
258
if( strstr(ns, ".$cmd") ) {
259
if( strstr(ns, ".$cmd.sys.") ) {
260
if( strstr(ns, "$cmd.sys.inprog") ) {
261
inProgCmd(m, dbresponse);
264
if( strstr(ns, "$cmd.sys.killop") ) {
265
killOp(m, dbresponse);
268
if( strstr(ns, "$cmd.sys.unlock") ) {
269
unlockFsync(ns, m, dbresponse);
275
writeLock = !commandIsReadOnly(q.query);
280
else if( op == dbGetMore ) {
284
if ( handlePossibleShardedMessage( m , dbresponse ) ){
285
/* important to do this before we lock
286
so if a message has to be forwarded, doesn't block for that
294
auto_ptr<CurOp> nestedOp;
295
CurOp* currentOpP = c.curop();
296
if ( currentOpP->active() ){
297
nestedOp.reset( new CurOp() );
298
currentOpP = nestedOp.get();
300
CurOp& currentOp = *currentOpP;
301
currentOp.reset(client);
304
OpDebug& debug = currentOp.debug();
305
StringBuilder& ss = debug.str;
307
int logThreshold = cmdLine.slowMS;
308
bool log = logLevel >= 1;
310
Timer t( currentOp.startTime() );
312
mongolock lk(writeLock);
315
/* use this if you only want to process operations for a particular namespace.
316
maybe add to cmd line parms or something fancier.
319
if ( strncmp(ddd.getns(), "clusterstock", 12) != 0 ) {
322
out() << "TEMP skip " << ddd.getns() << endl;
327
if ( op == dbQuery ) {
328
// receivedQuery() does its own authorization processing.
329
if ( ! receivedQuery(dbresponse, m, currentOp, true, lk) )
332
else if ( op == dbGetMore ) {
333
// does its own authorization processing.
337
if ( ! receivedGetMore(dbresponse, m, currentOp) )
340
else if ( op == dbMsg ) {
341
/* deprecated / rarely used. intended for connection diagnostics. */
343
char *p = m.data->_data;
346
out() << curTimeMillis() % 10000 <<
347
" long msg received, len:" << len <<
348
" ends with: " << p + len - 10 << endl;
349
bool end = false; //strcmp("end", p) == 0;
350
Message *resp = new Message();
351
resp->setData(opReply, "i am fine");
352
dbresponse.response = resp;
353
dbresponse.responseTo = m.data->id;
354
//dbMsgPort.reply(m, resp);
359
const char *ns = m.data->_data + 4;
361
nsToDatabase(ns, cl);
363
AuthenticationInfo *ai = currentClient.get()->ai;
364
if( !ai->isAuthorized(cl) ) {
365
uassert_nothrow("unauthorized");
367
else if ( op == dbInsert ) {
371
receivedInsert(m, currentOp);
373
catch ( AssertionException& e ) {
374
LOGSOME problem() << " Caught Assertion insert, continuing\n";
375
ss << " exception " << e.toString();
379
else if ( op == dbUpdate ) {
383
receivedUpdate(m, currentOp);
385
catch ( AssertionException& e ) {
386
LOGSOME problem() << " Caught Assertion update, continuing" << endl;
387
ss << " exception " << e.toString();
391
else if ( op == dbDelete ) {
395
receivedDelete(m, currentOp);
397
catch ( AssertionException& e ) {
398
LOGSOME problem() << " Caught Assertion receivedDelete, continuing" << endl;
399
ss << " exception " << e.toString();
403
else if ( op == dbKillCursors ) {
407
ss << "killcursors ";
408
receivedKillCursors(m);
410
catch ( AssertionException& e ) {
411
problem() << " Caught Assertion in kill cursors, continuing" << endl;
412
ss << " exception " + e.toString();
417
out() << " operation isn't supported: " << op << endl;
418
currentOp.setActive(false);
423
log = log || (logLevel >= 2 && ++ctr % 512 == 0);
425
if ( log || ms > logThreshold ) {
426
ss << ' ' << ms << "ms";
427
mongo::log() << ss.str() << endl;
429
Database *database = c.database();
430
if ( database && database->profile >= 1 ) {
431
if ( database->profile >= 2 || ms >= cmdLine.slowMS ) {
432
// performance profiling is on
433
if ( dbMutex.getState() > 1 || dbMutex.getState() < -1 ){
434
out() << "warning: not profiling because recursive lock" << endl;
437
string old_ns = c.ns();
438
Database * old_db = c.database();
439
lk.releaseAndWriteLock();
440
Client::Context c( old_ns , old_db );
441
profile(ss.str().c_str(), ms);
446
currentOp.setActive(false);
448
} /* assembleResponse() */
450
void killCursors(int n, long long *ids);
451
void receivedKillCursors(Message& m) {
452
int *x = (int *) m.data->_data;
457
problem() << "Assertion failure, receivedKillCursors, n=" << n << endl;
460
killCursors(n, (long long *) x);
463
/* cl - database name
466
void closeDatabase( const char *cl, const string& path ) {
467
Database *database = cc().database();
469
assert( database->name == cl );
471
if ( string("local") != cl ) {
476
/* important: kill all open cursors on the database */
479
ClientCursor::invalidate(prefix.c_str());
481
NamespaceDetailsTransient::clearForPrefix( prefix.c_str() );
483
dbHolder.erase( cl, path );
484
delete database; // closes files
488
void receivedUpdate(Message& m, CurOp& op) {
490
const char *ns = d.getns();
492
uassert( 10054 , "not master", isMasterNs( ns ) );
494
Client& client = cc();
495
client.top.setWrite();
496
op.debug().str << ns << ' ';
497
int flags = d.pullInt();
498
BSONObj query = d.nextJsObj();
500
assert( d.moreJSObjs() );
501
assert( query.objsize() < m.data->dataLen() );
502
BSONObj toupdate = d.nextJsObj();
503
uassert( 10055 , "update object too large", toupdate.objsize() <= MaxBSONObjectSize);
504
assert( toupdate.objsize() < m.data->dataLen() );
505
assert( query.objsize() + toupdate.objsize() < m.data->dataLen() );
506
bool upsert = flags & UpdateOption_Upsert;
507
bool multi = flags & UpdateOption_Multi;
509
string s = query.toString();
510
/* todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down. */
511
op.debug().str << " query: " << s;
512
CurOp& currentOp = *client.curop();
513
currentOp.setQuery(query);
515
UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug() );
516
/* TODO FIX: recordUpdate should take a long int for parm #2 */
517
recordUpdate( res.existing , (int) res.num ); // for getlasterror
520
void receivedDelete(Message& m, CurOp& op) {
522
const char *ns = d.getns();
524
uassert( 10056 , "not master", isMasterNs( ns ) );
526
Client& client = cc();
527
client.top.setWrite();
528
int flags = d.pullInt();
529
bool justOne = flags & 1;
530
assert( d.moreJSObjs() );
531
BSONObj pattern = d.nextJsObj();
533
string s = pattern.toString();
534
op.debug().str << " query: " << s;
535
CurOp& currentOp = *client.curop();
536
currentOp.setQuery(pattern);
538
int n = deleteObjects(ns, pattern, justOne, true);
542
QueryResult* emptyMoreResult(long long);
544
bool receivedGetMore(DbResponse& dbresponse, Message& m, CurOp& curop ) {
547
const char *ns = d.getns();
548
StringBuilder& ss = curop.debug().str;
552
int ntoreturn = d.pullInt();
553
long long cursorid = d.pullInt64();
554
ss << " cid:" << cursorid;
555
ss << " ntoreturn:" << ntoreturn;
556
QueryResult* msgdata;
558
AuthenticationInfo *ai = currentClient.get()->ai;
559
uassert( 10057 , "unauthorized", ai->isAuthorized(cc().database()->name.c_str()));
560
msgdata = getMore(ns, ntoreturn, cursorid, curop);
562
catch ( AssertionException& e ) {
563
ss << " exception " + e.toString();
564
msgdata = emptyMoreResult(cursorid);
567
Message *resp = new Message();
568
resp->setData(msgdata, true);
569
ss << " bytes:" << resp->data->dataLen();
570
ss << " nreturned:" << msgdata->nReturned;
571
dbresponse.response = resp;
572
dbresponse.responseTo = m.data->id;
573
//dbMsgPort.reply(m, resp);
577
void receivedInsert(Message& m, CurOp& op) {
579
const char *ns = d.getns();
581
uassert( 10058 , "not master", isMasterNs( ns ) );
584
op.debug().str << ns;
586
while ( d.moreJSObjs() ) {
587
BSONObj js = d.nextJsObj();
588
uassert( 10059 , "object to insert too large", js.objsize() <= MaxBSONObjectSize);
589
theDataFileMgr.insert(ns, js, false);
594
class JniMessagingPort : public AbstractMessagingPort {
596
JniMessagingPort(Message& _container) : container(_container) { }
597
void reply(Message& received, Message& response, MSGID) {
598
container = response;
600
void reply(Message& received, Message& response) {
601
container = response;
603
unsigned remotePort(){
609
void getDatabaseNames( vector< string > &names ) {
610
boost::filesystem::path path( dbpath );
611
for ( boost::filesystem::directory_iterator i( path );
612
i != boost::filesystem::directory_iterator(); ++i ) {
613
string fileName = boost::filesystem::path(*i).leaf();
614
if ( fileName.length() > 3 && fileName.substr( fileName.length() - 3, 3 ) == ".ns" )
615
names.push_back( fileName.substr( 0, fileName.length() - 3 ) );
619
bool DBDirectClient::call( Message &toSend, Message &response, bool assertOk ) {
621
if ( lastError._get() )
622
lastError.startRequest( toSend, lastError._get() );
623
DbResponse dbResponse;
624
assembleResponse( toSend, dbResponse );
625
assert( dbResponse.response );
626
response = *dbResponse.response;
630
void DBDirectClient::say( Message &toSend ) {
632
if ( lastError._get() )
633
lastError.startRequest( toSend, lastError._get() );
634
DbResponse dbResponse;
635
assembleResponse( toSend, dbResponse );
638
auto_ptr<DBClientCursor> DBDirectClient::query(const string &ns, Query query, int nToReturn , int nToSkip ,
639
const BSONObj *fieldsToReturn , int queryOptions ){
641
//if ( ! query.obj.isEmpty() || nToReturn != 0 || nToSkip != 0 || fieldsToReturn || queryOptions )
642
return DBClientBase::query( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions );
644
//assert( query.obj.isEmpty() );
645
//throw UserException( (string)"yay:" + ns );
649
DBDirectClient::AlwaysAuthorized DBDirectClient::SavedContext::always;
651
DBClientBase * createDirectClient(){
652
return new DBDirectClient();
655
void recCacheCloseAll();
657
boost::mutex &exitMutex( *( new boost::mutex ) );
658
int numExitCalls = 0;
662
return numExitCalls > 0;
665
void tryToOutputFatal( const string& s ){
678
// uh - oh, not sure there is anything else we can do...
681
/* not using log() herein in case we are already locked */
682
void dbexit( ExitCode rc, const char *why) {
684
boostlock lk( exitMutex );
685
if ( numExitCalls++ > 0 ) {
686
if ( numExitCalls > 5 ){
687
// this means something horrible has happened
691
ss << "dbexit: " << why << "; exiting immediately" << endl;
692
tryToOutputFatal( ss.str() );
698
ss << "dbexit: " << why << endl;
699
tryToOutputFatal( ss.str() );
702
shutdown(); // gracefully shutdown instance
705
tryToOutputFatal( "shutdown failed with exception" );
708
tryToOutputFatal( "dbexit: really exiting now\n" );
715
log() << "\t shutdown: going to close listening sockets..." << endl;
716
ListeningSockets::get()->closeAll();
718
log() << "\t shutdown: going to flush oplog..." << endl;
723
/* must do this before unmapping mem or you may get a seg fault */
724
log() << "\t shutdown: going to close sockets..." << endl;
725
boost::thread close_socket_thread(closeAllSockets);
727
// wait until file preallocation finishes
728
// we would only hang here if the file_allocator code generates a
729
// synchronous signal, which we don't expect
730
log() << "\t shutdown: waiting for fs preallocator..." << endl;
731
theFileAllocator().waitUntilFinished();
733
log() << "\t shutdown: closing all files..." << endl;
735
MemoryMappedFile::closeAllFiles( ss3 );
738
// should we be locked here? we aren't. might be ok as-is.
741
#if !defined(_WIN32) && !defined(__sunos__)
743
log() << "\t shutdown: removing fs lock..." << endl;
744
if( ftruncate( lockFile , 0 ) )
745
log() << "\t couldn't remove fs lock " << OUTPUT_ERRNO << endl;
746
flock( lockFile, LOCK_UN );
751
void acquirePathLock() {
752
#if !defined(_WIN32) && !defined(__sunos__)
753
string name = ( boost::filesystem::path( dbpath ) / "mongod.lock" ).native_file_string();
754
lockFile = open( name.c_str(), O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO );
755
massert( 10309 , "Unable to create / open lock file for dbpath: " + name, lockFile > 0 );
756
massert( 10310 , "Unable to acquire lock for dbpath: " + name, flock( lockFile, LOCK_EX | LOCK_NB ) == 0 );
759
ss << getpid() << endl;
761
const char * data = s.c_str();
762
assert( write( lockFile , data , strlen( data ) ) );