22
22
Cursor -- and its derived classes -- are our internal cursors.
26
#include "clientcursor.h"
27
#include "introspect.h"
25
#include "mongo/pch.h"
27
#include "mongo/db/clientcursor.h"
31
#include "repl_block.h"
32
#include "../util/processinfo.h"
33
#include "../util/timer.h"
34
33
#include "mongo/client/dbclientinterface.h"
34
#include "mongo/db/auth/action_set.h"
35
#include "mongo/db/auth/action_type.h"
36
#include "mongo/db/auth/authorization_manager.h"
37
#include "mongo/db/auth/privilege.h"
38
#include "mongo/db/commands.h"
39
#include "mongo/db/commands/server_status.h"
40
#include "mongo/db/db.h"
41
#include "mongo/db/introspect.h"
42
#include "mongo/db/jsobj.h"
43
#include "mongo/db/kill_current_op.h"
44
#include "mongo/db/pagefault.h"
45
#include "mongo/db/repl/rs.h"
46
#include "mongo/db/repl_block.h"
35
47
#include "mongo/db/scanandorder.h"
36
#include "pagefault.h"
37
#include "mongo/db/repl/rs.h"
48
#include "mongo/platform/random.h"
49
#include "mongo/util/processinfo.h"
50
#include "mongo/util/timer.h"
42
55
boost::recursive_mutex& ClientCursor::ccmutex( *(new boost::recursive_mutex()) );
43
56
long long ClientCursor::numberTimedOut = 0;
45
void aboutToDeleteForSharding( const Database* db , const DiskLoc& dl ); // from s/d_logic.h
58
void aboutToDeleteForSharding( const Database* db, const NamespaceDetails* nsd, const DiskLoc& dl ); // from s/d_logic.h
47
60
/*static*/ void ClientCursor::assertNoCursors() {
48
61
recursive_scoped_lock lock(ccmutex);
198
211
Database *db = cc().database();
199
212
CCByLoc& bl = db->ccByLoc;
200
213
RARELY if ( bl.size() > 70 ) {
201
log() << "perf warning: byLoc.size=" << bl.size() << " in aboutToDeleteBucket\n";
214
log() << "perf warning: byLoc.size=" << bl.size() << " in aboutToDeleteBucket" << endl;
203
216
for ( CCByLoc::iterator i = bl.begin(); i != bl.end(); i++ )
204
217
i->second->_c->aboutToDeleteBucket(b);
210
223
/* must call this on a delete so we clean up the cursors. */
211
void ClientCursor::aboutToDelete(const DiskLoc& dl) {
224
void ClientCursor::aboutToDelete(const NamespaceDetails* nsd, const DiskLoc& dl) {
212
225
NoPageFaultsAllowed npfa;
214
227
recursive_scoped_lock lock(ccmutex);
216
229
Database *db = cc().database();
219
aboutToDeleteForSharding( db , dl );
232
aboutToDeleteForSharding( db, nsd, dl );
221
234
CCByLoc& bl = db->ccByLoc;
222
235
CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl));
438
BSONObj ClientCursor::extractKey( const KeyPattern& usingKeyPattern ) const {
439
KeyPattern currentIndex( _c->indexKeyPattern() );
440
if ( usingKeyPattern.isCoveredBy( currentIndex ) && ! currentIndex.isSpecial() ){
441
BSONObj currKey = _c->currKey();
442
BSONObj prettyKey = currKey.replaceFieldNames( currentIndex.toBSON() );
443
return usingKeyPattern.extractSingleKey( prettyKey );
445
return usingKeyPattern.extractSingleKey( _c->current() );
426
448
void ClientCursor::fillQueryResultFromObj( BufBuilder &b, const MatchDetails* details ) const {
427
449
const Projection::KeyOnly *keyFieldsOnly = c()->keyFieldsOnly();
428
450
if ( keyFieldsOnly ) {
546
568
dbtempreleasecond unlock;
547
569
if ( unlock.unlocked() ) {
548
570
if ( haveReadLock ) {
549
// don't sleep with a read lock
571
// This sleep helps reader threads yield to writer threads.
572
// Without this, the underlying reader/writer lock implementations
573
// are not sufficiently writer-greedy.
552
577
if ( micros == -1 )
636
661
return ClientCursor::recoverFromYield( data );
640
long long ctmLast = 0; // so we don't have to do find() which is a little slow very often.
665
// so we don't have to do find() which is a little slow very often.
666
long long cursorGenTSLast = 0;
667
PseudoRandom* cursorGenRandom = NULL;
641
670
long long ClientCursor::allocCursorId_inlock() {
642
long long ctm = curTimeMillis64();
671
// It is important that cursor IDs not be reused within a short period of time.
673
if ( ! cursorGenRandom ) {
674
scoped_ptr<SecureRandom> sr( SecureRandom::create() );
675
cursorGenRandom = new PseudoRandom( sr->nextInt64() );
678
const long long ts = Listener::getElapsedTimeMillis();
646
x = (((long long)rand()) << 32);
648
if ( ctm != ctmLast || ClientCursor::find_inlock(x, false) == 0 )
684
x |= cursorGenRandom->nextInt32();
692
if ( ts != cursorGenTSLast || ClientCursor::find_inlock(x, false) == 0 )
696
cursorGenTSLast = ts;
701
747
help << " example: { cursorInfo : 1 }";
703
749
virtual LockType locktype() const { return NONE; }
750
virtual void addRequiredPrivileges(const std::string& dbname,
751
const BSONObj& cmdObj,
752
std::vector<Privilege>* out) {
754
actions.addAction(ActionType::cursorInfo);
755
out->push_back(Privilege(AuthorizationManager::SERVER_RESOURCE_NAME, actions));
704
757
bool run(const string& dbname, BSONObj& jsobj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl ) {
705
758
ClientCursor::appendStats( result );
763
class CursorServerStats : public ServerStatusSection {
766
CursorServerStats() : ServerStatusSection( "cursors" ){}
767
virtual bool includeByDefault() const { return true; }
769
BSONObj generateSection(const BSONElement& configElement) const {
771
ClientCursor::appendStats( b );
711
778
Mem() { res = virt = mapped = 0; }
742
809
log() << " mapped:" << totalMapped;
744
log() << " connections:" << connTicketHolder.used();
811
log() << " connections:" << Listener::globalTicketHolder.used();
745
812
if (theReplSet) {
746
813
log() << " replication threads:" <<
747
814
ReplSetImpl::replWriterThreadCount +
786
bool ClientCursor::erase( CursorId id ) {
787
recursive_scoped_lock lock( ccmutex );
788
ClientCursor *cursor = find_inlock( id );
792
if ( ! cc().getAuthenticationInfo()->isAuthorizedReads( nsToDatabase( cursor->ns() ) ) )
853
bool ClientCursor::_erase_inlock(ClientCursor* cursor) {
795
854
// Must not have an active ClientCursor::Pin.
797
str::stream() << "Cannot kill active cursor " << id,
856
str::stream() << "Cannot kill active cursor " << cursor->cursorid(),
798
857
cursor->_pinValue < 100 );
863
bool ClientCursor::erase(CursorId id) {
864
recursive_scoped_lock lock(ccmutex);
865
ClientCursor* cursor = find_inlock(id);
870
return _erase_inlock(cursor);
873
bool ClientCursor::eraseIfAuthorized(CursorId id) {
876
recursive_scoped_lock lock(ccmutex);
877
ClientCursor* cursor = find_inlock(id);
884
// Can't be in a lock when checking authorization
885
if (!cc().getAuthorizationManager()->checkAuthorization(ns, ActionType::killCursors)) {
889
// It is safe to lookup the cursor again after temporarily releasing the mutex because
890
// of 2 invariants: that the cursor ID won't be re-used in a short period of time, and that
891
// the namespace associated with a cursor cannot change.
892
recursive_scoped_lock lock(ccmutex);
893
ClientCursor* cursor = find_inlock(id);
895
// Cursor was deleted in another thread since we found it earlier in this function.
898
if (cursor->ns() != ns) {
899
warning() << "Cursor namespace changed. Previous ns: " << ns << ", current ns: "
900
<< cursor->ns() << endl;
904
return _erase_inlock(cursor);
804
907
int ClientCursor::erase(int n, long long *ids) {
806
909
for ( int i = 0; i < n; i++ ) {
919
int ClientCursor::eraseIfAuthorized(int n, long long *ids) {
921
for ( int i = 0; i < n; i++ ) {
922
if ( eraseIfAuthorized(ids[i]))
817
931
ClientCursor::YieldLock::YieldLock( ptr<ClientCursor> cc )