5
* This program is free software: you can redistribute it and/or modify
6
* it under the terms of the GNU Affero General Public License, version 3,
7
* as published by the Free Software Foundation.
9
* This program is distributed in the hope that it will be useful,
10
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12
* GNU Affero General Public License for more details.
14
* You should have received a copy of the GNU Affero General Public License
15
* along with this program. If not, see <http://www.gnu.org/licenses/>.
22
#include "../scripting/engine.h"
23
#include "../client/dbclient.h"
24
#include "../client/connpool.h"
25
#include "../client/parallel.h"
34
// Comparator call operator: returns true when the first element of `l`
// compares less than the first element of `r` (woCompare result < 0).
// Only the first element of each object takes part in the comparison.
bool operator()( const BSONObj &l, const BSONObj &r ) const {
35
return l.firstElement().woCompare( r.firstElement() ) < 0;
39
// A pair of BSON objects.
typedef pair<BSONObj,BSONObj> Data;
40
//typedef list< Data > InMemory;
41
// In-memory buffer type: maps a BSON object key to the list of BSON objects
// stored under it, with keys ordered by MyCmp.
typedef map< BSONObj,list<BSONObj>,MyCmp > InMemory;
43
// Invokes the script `reduce` function on `values` and builds the result object.
//
// values   - objects to reduce; must be non-empty (uassert 10074 otherwise)
// s        - scripting scope used to invoke `reduce` / `finalize` and to read "return"
// reduce   - handle of the reduce function
// final    - selects the output field names in the last visible branch:
//            "_id"/"value" when true, "0"/"1" when false
// finalize - handle of the finalize function; invoked on the { _id, value } object
//            in one visible branch
//
// NOTE(review): several lines of this function (declarations of `j`, `n`, `b`,
// `key`, the branch guards and the return statements) are not visible in this excerpt.
BSONObj reduceValues( list<BSONObj>& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
44
uassert( 10074 , "need values" , values.size() );
46
// Builder size estimate: number of values times the size of the first
// object's "value" field, plus 128.
int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;
49
BSONObjBuilder reduceArgs( sizeEstimate );
51
BSONObjBuilder valueBuilder( sizeEstimate );
53
// Walk every input object. The visible lines read two elements per object
// through `j`: the first is appended to reduceArgs, the second is appended to
// valueBuilder under the numeric-string name produced from n++.
for ( list<BSONObj>::iterator i=values.begin(); i!=values.end(); i++){
56
BSONElement keyE = j.next();
58
reduceArgs.append( keyE );
63
valueBuilder.appendAs( j.next() , BSONObjBuilder::numStr( n++ ).c_str() );
66
// Attach the collected values as the "values" array and freeze the arguments.
reduceArgs.appendArray( "values" , valueBuilder.obj() );
67
BSONObj args = reduceArgs.obj();
69
// Run reduce; an Array-typed "return" is rejected via uassert 10075.
s->invokeSafe( reduce , args );
70
if ( s->type( "return" ) == Array ){
71
uassert( 10075 , "reduce -> multiple not supported yet",0);
77
// Branch that builds { _id: <key>, value: <reduce return> } and passes it to finalize.
b.appendAs( key.firstElement() , "_id" );
78
s->append( b , "value" , "return" );
79
s->invokeSafe( finalize , b.obj() );
83
// Branch that copies the key and the scope's "return" value into `b`; the field
// names depend on `final`.
b.appendAs( key.firstElement() , final ? "_id" : "0" );
84
s->append( b , final ? "value" : "1" , "return" );
90
// Parses a map/reduce command object into the setup fields.
//
// _dbname    - database name (NOTE(review): the visible lines use `dbname`,
//              presumably a member initialised from _dbname on a line not shown)
// cmdObj     - the command; its first element's string value names the source collection
// markAsTemp - when true and "keeptemp" is not set, tempLong is registered with
//              the current client as a temp collection
MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
91
// Function-local counter, appended to the temp collection name and incremented each time.
static int jobNumber = 1;
94
ns = dbname + "." + cmdObj.firstElement().valuestr();
96
verbose = cmdObj["verbose"].trueValue();
97
keeptemp = cmdObj["keeptemp"].trueValue();
103
// Temp collection short name: "mr.<first field name>_<time(0)>_<job number>".
ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++;
104
tempShort = ss.str();
105
tempLong = dbname + "." + tempShort;
106
// Name of the "_inc" collection derived from the temp collection name.
incLong = tempLong + "_inc";
108
if ( ! keeptemp && markAsTemp )
109
cc().addTempCollection( tempLong );
111
// Output collection: the "out" string when given.
// NOTE(review): an `else` presumably sits on the line not shown before the
// fallback assignment to tempShort — confirm.
if ( cmdObj["out"].type() == String )
112
finalShort = cmdObj["out"].valuestr();
114
finalShort = tempShort;
116
finalLong = dbname + "." + finalShort;
121
// Script sources; "finalize" is read only when the field is present.
mapCode = cmdObj["map"].ascode();
122
reduceCode = cmdObj["reduce"].ascode();
123
if ( cmdObj["finalize"].type() ){
124
finalizeCode = cmdObj["finalize"].ascode();
128
// Optional settings, each read only when the field has the expected type.
if ( cmdObj["mapparams"].type() == Array ){
129
mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
132
if ( cmdObj["scope"].type() == Object ){
133
scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
139
if ( cmdObj["query"].type() == Object ){
140
filter = cmdObj["query"].embeddedObjectUserCheck();
144
if ( cmdObj["sort"].type() == Object )
145
q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
147
if ( cmdObj["limit"].isNumber() )
148
limit = cmdObj["limit"].numberLong();
155
@return number of objects in the collection
157
// When the final and temp collection names differ: drops the final collection
// and, if the temp collection holds any documents, renames temp to final with
// the "renameCollection" admin command (uassert 10076 if the command fails).
// Returns db.count( finalLong ).
long long renameIfNeeded( DBDirectClient& db ){
158
if ( finalLong != tempLong ){
159
db.dropCollection( finalLong );
160
// Rename only when the temp collection is non-empty.
if ( db.count( tempLong ) ){
162
uassert( 10076 , "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
165
return db.count( finalLong );
203
// Builds the per-job state from `s`: obtains a pooled scripting scope for the
// database, compiles the map / reduce / (optional) finalize functions, applies
// the user-supplied scope object, and resets the temp and "_inc" collections.
//
// Throws UserException 9012 / 9013 with the scope's error text for map / reduce
// compile failures (the guarding conditions are on lines not shown).
MRState( MRSetup& s ) : setup(s){
204
scope = globalScriptEngine->getPooledScope( setup.dbname );
205
scope->localConnect( setup.dbname.c_str() );
207
map = scope->createFunction( setup.mapCode.c_str() );
209
throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
211
reduce = scope->createFunction( setup.reduceCode.c_str() );
213
throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
215
// finalize is compiled only when finalize code was supplied.
if ( setup.finalizeCode.size() )
216
finalize = scope->createFunction( setup.finalizeCode.c_str() );
220
if ( ! setup.scopeSetup.isEmpty() )
221
scope->init( &setup.scopeSetup );
223
// Drop both working collections (`db` is declared on a line not shown).
db.dropCollection( setup.tempLong );
224
db.dropCollection( setup.incLong );
226
// Create the "_inc" collection under a write lock, with autoIndexId set to 0.
// NOTE(review): the creation call lives inside assert(); confirm assert is
// never compiled out in this build.
writelock l( setup.incLong );
228
assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
232
// Reduces `values` with final=1 and the finalize function, then inserts the
// result into the temp collection under a write lock.
// The statement taken when `values` is empty is on a line not shown.
void finalReduce( list<BSONObj>& values ){
233
if ( values.size() == 0 )
236
// First element of the first value, wrapped under the name "_id".
// NOTE(review): `key` is not referenced by the other visible lines.
BSONObj key = values.begin()->firstElement().wrap( "_id" );
237
BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
239
writelock l( setup.tempLong );
240
theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false );
245
// Scripting scope held by auto_ptr; assigned from getPooledScope in the constructor.
auto_ptr<Scope> scope;
248
// Handles returned by scope->createFunction for the map, reduce and finalize code.
ScriptingFunction map;
249
ScriptingFunction reduce;
250
ScriptingFunction finalize;
256
// Binds this object to `state` and allocates an empty in-memory buffer with `new`.
MRTL( MRState& state ) : _state( state ){
257
_temp = new InMemory();
266
// Processes the current in-memory buffer key by key. For each key in the old buffer:
//   - exactly one value: it is written to the "_inc" collection under a write lock;
//   - more than one value: they are reduced (final=false, finalize=0) into `res`
//     (what happens to `res` is on a line not shown).
// A fresh InMemory `n` is allocated up front; its assignment to _temp and the
// release of `old` are not visible in this excerpt.
void reduceInMemory(){
268
InMemory * old = _temp;
269
InMemory * n = new InMemory();
273
for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
274
BSONObj key = i->first;
275
list<BSONObj>& all = i->second;
277
if ( all.size() == 1 ){
278
// this key has low cardinality, so just write to db
279
writelock l(_state.setup.incLong);
280
write( *(all.begin()) );
282
else if ( all.size() > 1 ){
283
BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
293
// NOTE(review): the original line numbering jumps here (283 -> 293); the lines
// below may belong to a separate member function whose header is not visible.
// They take a write lock on the "_inc" collection, walk every entry of _temp,
// test for entries with fewer than one value, and iterate each entry's values
// (the loop body is not shown).
writelock l(_state.setup.incLong);
295
for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
296
list<BSONObj>& all = i->second;
297
if ( all.size() < 1 )
300
for ( list<BSONObj>::iterator j=all.begin(); j!=all.end(); j++ )
308
// Looks up (creating if absent, via map operator[]) the value list keyed by `a`
// in the in-memory buffer and adds a.objsize() + 16 to the running size counter.
// The statement that stores `a` in the list is on a line not shown.
void insert( const BSONObj& a ){
309
list<BSONObj>& all = (*_temp)[a];
311
_size += a.objsize() + 16;
315
// NOTE(review): the header of the function owning the lines below is not visible.
// They compare _size against two thresholds (1024 * 5 and 1024 * 15) and log,
// at level 1, that an in-memory reduce and a dump to the db took place.
if ( _size < 1024 * 5 )
320
log(1) << " mr: did reduceInMemory " << before << " -->> " << _size << endl;
322
if ( _size < 1024 * 15 )
326
log(1) << " mr: dumping to db" << endl;
330
// Inserts `o` into the "_inc" collection through theDataFileMgr.
// NOTE(review): the meaning of the trailing `true` argument is not visible here — confirm.
void write( BSONObj& o ){
331
theDataFileMgr.insert( _state.setup.incLong.c_str() , o , true );
343
// Per-thread MRTL pointer (boost::thread_specific_ptr); fast_emit inserts through it.
boost::thread_specific_ptr<MRTL> _tlmr;
345
// Native function injected into the scripting scope under the name "emit".
// Requires exactly two fields in `args` (uassert 10077) and inserts `args`
// into the current thread's MRTL buffer.
BSONObj fast_emit( const BSONObj& args ){
346
uassert( 10077 , "fast_emit takes 2 args" , args.nFields() == 2 );
347
_tlmr->insert( args );
352
// The "mapreduce" database command.
// NOTE(review): many lines of run() (declarations of `db`, `state`, `t`, `mt`,
// `num`, `prev`, `all`, the try/catch structure and the returns) are not
// visible in this excerpt.
class MapReduceCommand : public Command {
354
MapReduceCommand() : Command("mapreduce"){}
355
virtual bool slaveOk() { return true; }
357
virtual void help( stringstream &help ) const {
358
help << "see http://www.mongodb.org/display/DOCS/MapReduce";
361
// Runs the map phase over the source collection, reduces the emitted values,
// and reports counts (and timing when verbose) in `result`.
bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
364
// Setup is built from the current client's database name, not the `dbname` argument.
MRSetup mr( cc().database()->name , cmd );
366
log(1) << "mr ns: " << mr.ns << endl;
368
if ( ! db.exists( mr.ns ) ){
369
errmsg = "ns doesn't exist";
373
// Set to true below when at least one emit happened; used for the final sanity check.
bool shouldHaveData = false;
376
long long inReduce = 0;
378
BSONObjBuilder countsBuilder;
379
BSONObjBuilder timingBuilder;
383
// Expose fast_emit to the script as "emit".
state.scope->injectNative( "emit" , fast_emit );
385
// NOTE(review): allocated with raw new; presumably handed to _tlmr on a line not shown.
MRTL * mrtl = new MRTL( state );
388
// Progress meter sized by the number of documents matching the filter.
ProgressMeter pm( db.count( mr.ns , mr.filter ) );
389
auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q );
390
long long mapTime = 0;
392
// Map phase: each document becomes `this` for one invocation of the map function.
while ( cursor->more() ){
393
BSONObj o = cursor->next();
395
if ( mr.verbose ) mt.reset();
397
state.scope->setThis( &o );
398
// A non-zero result from invoke is treated as failure.
if ( state.scope->invoke( state.map , state.setup.mapparams , 0 , true ) )
399
throw UserException( 9014, (string)"map invoke failed: " + state.scope->getError() );
401
if ( mr.verbose ) mapTime += mt.micros();
404
// Periodic work when num is a multiple of 100.
// NOTE(review): dbtemprelease presumably yields the db lock temporarily — confirm.
if ( num % 100 == 0 ){
407
inReduce += t.micros();
408
dbtemprelease temprlease;
412
// A non-zero limit ends the scan once `num` reaches it (the statement is not shown).
if ( mr.limit && num >= mr.limit )
416
countsBuilder.append( "input" , num );
417
countsBuilder.append( "emit" , mrtl->numEmits );
418
if ( mrtl->numEmits )
419
shouldHaveData = true;
421
// mapTime is accumulated in microseconds and reported divided by 1000.
timingBuilder.append( "mapTime" , mapTime / 1000 );
422
timingBuilder.append( "emitLoop" , t.millis() );
426
mrtl->reduceInMemory();
429
// Index the "_inc" collection on field "0" and read it back in that order.
BSONObj sortKey = BSON( "0" << 1 );
430
db.ensureIndex( mr.incLong , sortKey );
435
ProgressMeter fpm( db.count( mr.incLong ) );
436
cursor = db.query( mr.incLong, Query().sort( sortKey ) );
438
// Final reduce: each object is compared with `prev` under sortKey, and
// finalReduce is called on the accumulated list `all` at two visible points.
while ( cursor->more() ){
439
BSONObj o = cursor->next().getOwned();
441
if ( o.woSortOrder( prev , sortKey ) == 0 ){
446
state.finalReduce( all );
455
state.finalReduce( all );
460
// Failure path: log and drop both working collections.
log() << "mr failed, removing collection" << endl;
461
db.dropCollection( mr.tempLong );
462
db.dropCollection( mr.incLong );
466
// Success path: drop the "_inc" collection, then move temp to the final name if needed.
db.dropCollection( mr.incLong );
468
long long finalCount = mr.renameIfNeeded( db );
470
timingBuilder.append( "total" , t.millis() );
472
result.append( "result" , mr.finalShort );
473
result.append( "timeMillis" , t.millis() );
474
countsBuilder.append( "output" , finalCount );
475
if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() );
476
result.append( "counts" , countsBuilder.obj() );
478
// Sanity check: emits happened but the output collection is empty.
if ( finalCount == 0 && shouldHaveData ){
479
result.append( "cmd" , cmd );
480
errmsg = "there were emits but no data!";
492
// The "mapreduce.shardedfinish" command: reads the per-shard output collections
// through one sorted parallel cursor, reduces groups of objects with equal _id,
// and inserts the results into the temp collection.
// NOTE(review): several lines of run() (loop headers, `else` branches, the
// declarations of `db`, `info`, `c`, and the return) are not visible in this excerpt.
class MapReduceFinishCommand : public Command {
494
MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
495
virtual bool slaveOk() { return true; }
497
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
498
dbtemprelease temprlease; // we don't touch the db directly
500
string dbname = cc().database()->name;
501
string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
503
// The original map/reduce command is the embedded object of the first element;
// markAsTemp is false, so the temp collection is not registered with the client.
MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false );
505
set<ServerAndQuery> servers;
507
BSONObjBuilder shardCounts;
508
// Totals per counter name, summed over all shards.
map<string,long long> counts;
510
BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
511
vector< auto_ptr<DBClientCursor> > shardCursors;
512
// Each element of "shards" is named after a shard and holds that shard's result object.
BSONObjIterator i( shards );
514
BSONElement e = i.next();
515
string shard = e.fieldName();
517
BSONObj res = e.embeddedObjectUserCheck();
519
// Every shard must report the expected output collection name.
uassert( 10078 , "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
520
servers.insert( shard );
521
shardCounts.appendAs( res["counts"] , shard.c_str() );
523
BSONObjIterator j( res["counts"].embeddedObjectUserCheck() );
525
BSONElement temp = j.next();
526
counts[temp.fieldName()] += temp.numberLong();
531
// One cursor over the output collection on all servers, sorted by _id.
BSONObj sortKey = BSON( "_id" << 1 );
533
ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
534
Query().sort( sortKey ) );
537
// Scripting scope keyed by the `ns` argument; reduce and optional finalize are compiled in it.
auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
538
ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
539
ScriptingFunction finalizeFunction = 0;
540
if ( mr.finalizeCode.size() )
541
finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
543
// Objects of the group currently being collected.
list<BSONObj> values;
545
result.append( "result" , mr.finalShort );
549
// Group objects by sortKey: an object equal to the first buffered value under
// sortKey joins the group; the visible insert calls reduce a group with final=1
// and write it to the temp collection.
while ( cursor.more() ){
550
BSONObj t = cursor.next();
552
if ( values.size() == 0 ){
553
values.push_back( t );
557
if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
558
values.push_back( t );
563
db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
565
values.push_back( t );
569
db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
571
long long finalCount = mr.renameIfNeeded( db );
572
log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
574
// Drop the sharded output collection on every participating server.
for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
575
ScopedDbConnection conn( i->_server );
576
conn->dropCollection( dbname + "." + shardedOutputCollection );
579
result.append( "shardCounts" , shardCounts.obj() );
583
// Emit the summed counters.
for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ){
584
c.append( i->first , i->second );
586
result.append( "counts" , c.obj() );
591
} mapReduceFinishCommand;