~ubuntu-branches/ubuntu/trusty/mongodb/trusty-proposed

« back to all changes in this revision

Viewing changes to db/mr.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Antonin Kral
  • Date: 2010-01-29 19:48:45 UTC
  • Revision ID: james.westby@ubuntu.com-20100129194845-8wbmkf626fwcavc9
Tags: upstream-1.3.1
ImportĀ upstreamĀ versionĀ 1.3.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// mr.cpp
 
2
 
 
3
/**
 
4
 *
 
5
 *    This program is free software: you can redistribute it and/or  modify
 
6
 *    it under the terms of the GNU Affero General Public License, version 3,
 
7
 *    as published by the Free Software Foundation.
 
8
 *
 
9
 *    This program is distributed in the hope that it will be useful,
 
10
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
11
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
12
 *    GNU Affero General Public License for more details.
 
13
 *
 
14
 *    You should have received a copy of the GNU Affero General Public License
 
15
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
16
 */
 
17
 
 
18
#include "stdafx.h"
 
19
#include "db.h"
 
20
#include "instance.h"
 
21
#include "commands.h"
 
22
#include "../scripting/engine.h"
 
23
#include "../client/dbclient.h"
 
24
#include "../client/connpool.h"
 
25
#include "../client/parallel.h"
 
26
 
 
27
namespace mongo {
 
28
 
 
29
    namespace mr {
 
30
 
 
31
        class MyCmp {
 
32
        public:
 
33
            MyCmp(){}
 
34
            bool operator()( const BSONObj &l, const BSONObj &r ) const {
 
35
                return l.firstElement().woCompare( r.firstElement() ) < 0;
 
36
            }
 
37
        };
 
38
 
 
39
        typedef pair<BSONObj,BSONObj> Data;
 
40
        //typedef list< Data > InMemory;
 
41
        typedef map< BSONObj,list<BSONObj>,MyCmp > InMemory;
 
42
 
 
43
        BSONObj reduceValues( list<BSONObj>& values , Scope * s , ScriptingFunction reduce , bool final , ScriptingFunction finalize ){
 
44
            uassert( 10074 ,  "need values" , values.size() );
 
45
            
 
46
            int sizeEstimate = ( values.size() * values.begin()->getField( "value" ).size() ) + 128;
 
47
            BSONObj key;
 
48
 
 
49
            BSONObjBuilder reduceArgs( sizeEstimate );
 
50
        
 
51
            BSONObjBuilder valueBuilder( sizeEstimate );
 
52
            int n = 0;
 
53
            for ( list<BSONObj>::iterator i=values.begin(); i!=values.end(); i++){
 
54
                BSONObj o = *i;
 
55
                BSONObjIterator j(o);
 
56
                BSONElement keyE = j.next();
 
57
                if ( n == 0 ){
 
58
                    reduceArgs.append( keyE );
 
59
                    BSONObjBuilder temp;
 
60
                    temp.append( keyE );
 
61
                    key = temp.obj();
 
62
                }
 
63
                valueBuilder.appendAs( j.next() , BSONObjBuilder::numStr( n++ ).c_str() );
 
64
            }
 
65
        
 
66
            reduceArgs.appendArray( "values" , valueBuilder.obj() );
 
67
            BSONObj args = reduceArgs.obj();
 
68
            
 
69
            s->invokeSafe( reduce , args );
 
70
            if ( s->type( "return" ) == Array ){
 
71
                uassert( 10075 , "reduce -> multiple not supported yet",0);                
 
72
                return BSONObj();
 
73
            }
 
74
            
 
75
            if ( finalize ){
 
76
                BSONObjBuilder b;
 
77
                b.appendAs( key.firstElement() , "_id" );
 
78
                s->append( b , "value" , "return" );
 
79
                s->invokeSafe( finalize , b.obj() );
 
80
            }
 
81
            
 
82
            BSONObjBuilder b;
 
83
            b.appendAs( key.firstElement() , final ? "_id" : "0" );
 
84
            s->append( b , final ? "value" : "1" , "return" );
 
85
            return b.obj();
 
86
        }
 
87
        
 
88
        class MRSetup {
 
89
        public:
 
90
            MRSetup( const string& _dbname , const BSONObj& cmdObj , bool markAsTemp = true ){
 
91
                static int jobNumber = 1;
 
92
                
 
93
                dbname = _dbname;
 
94
                ns = dbname + "." + cmdObj.firstElement().valuestr();
 
95
 
 
96
                verbose = cmdObj["verbose"].trueValue();
 
97
                keeptemp = cmdObj["keeptemp"].trueValue();
 
98
                
 
99
                { // setup names
 
100
                    stringstream ss;
 
101
                    if ( ! keeptemp )
 
102
                        ss << "tmp.";
 
103
                    ss << "mr." << cmdObj.firstElement().fieldName() << "_" << time(0) << "_" << jobNumber++;    
 
104
                    tempShort = ss.str();
 
105
                    tempLong = dbname + "." + tempShort;
 
106
                    incLong = tempLong + "_inc";
 
107
 
 
108
                    if ( ! keeptemp && markAsTemp )
 
109
                        cc().addTempCollection( tempLong );
 
110
 
 
111
                    if ( cmdObj["out"].type() == String )
 
112
                        finalShort = cmdObj["out"].valuestr();
 
113
                    else
 
114
                        finalShort = tempShort;
 
115
                    
 
116
                    finalLong = dbname + "." + finalShort;
 
117
                    
 
118
                }
 
119
             
 
120
                { // code
 
121
                    mapCode = cmdObj["map"].ascode();
 
122
                    reduceCode = cmdObj["reduce"].ascode();
 
123
                    if ( cmdObj["finalize"].type() ){
 
124
                        finalizeCode = cmdObj["finalize"].ascode();
 
125
                    }
 
126
                    
 
127
 
 
128
                    if ( cmdObj["mapparams"].type() == Array ){
 
129
                        mapparams = cmdObj["mapparams"].embeddedObjectUserCheck();
 
130
                    }
 
131
 
 
132
                    if ( cmdObj["scope"].type() == Object ){
 
133
                        scopeSetup = cmdObj["scope"].embeddedObjectUserCheck();
 
134
                    }
 
135
                    
 
136
                }
 
137
                
 
138
                { // query options
 
139
                    if ( cmdObj["query"].type() == Object ){
 
140
                        filter = cmdObj["query"].embeddedObjectUserCheck();
 
141
                        q = filter;
 
142
                    }
 
143
                    
 
144
                    if ( cmdObj["sort"].type() == Object )
 
145
                        q.sort( cmdObj["sort"].embeddedObjectUserCheck() );
 
146
 
 
147
                    if ( cmdObj["limit"].isNumber() )
 
148
                        limit = cmdObj["limit"].numberLong();
 
149
                    else 
 
150
                        limit = 0;
 
151
                }
 
152
            }
 
153
            
 
154
            /**
 
155
               @return number objects in collection
 
156
             */
 
157
            long long renameIfNeeded( DBDirectClient& db ){
 
158
                if ( finalLong != tempLong ){
 
159
                    db.dropCollection( finalLong );
 
160
                    if ( db.count( tempLong ) ){
 
161
                        BSONObj info;
 
162
                        uassert( 10076 ,  "rename failed" , db.runCommand( "admin" , BSON( "renameCollection" << tempLong << "to" << finalLong ) , info ) );
 
163
                    }
 
164
                }
 
165
                return db.count( finalLong );
 
166
            }
 
167
                
 
168
            string dbname;
 
169
            string ns;
 
170
            
 
171
            // options
 
172
            bool verbose;            
 
173
            bool keeptemp;
 
174
 
 
175
            // query options
 
176
            
 
177
            BSONObj filter;
 
178
            Query q;
 
179
            long long limit;
 
180
 
 
181
            // functions
 
182
            
 
183
            string mapCode;
 
184
            string reduceCode;
 
185
            string finalizeCode;
 
186
            
 
187
            BSONObj mapparams;
 
188
            BSONObj scopeSetup;
 
189
            
 
190
            // output tables
 
191
            string incLong;
 
192
            
 
193
            string tempShort;
 
194
            string tempLong;
 
195
            
 
196
            string finalShort;
 
197
            string finalLong;
 
198
            
 
199
        }; // end MRsetup
 
200
 
 
201
        class MRState {
 
202
        public:
 
203
            MRState( MRSetup& s ) : setup(s){
 
204
                scope = globalScriptEngine->getPooledScope( setup.dbname );
 
205
                scope->localConnect( setup.dbname.c_str() );
 
206
                
 
207
                map = scope->createFunction( setup.mapCode.c_str() );
 
208
                if ( ! map )
 
209
                    throw UserException( 9012, (string)"map compile failed: " + scope->getError() );
 
210
 
 
211
                reduce = scope->createFunction( setup.reduceCode.c_str() );
 
212
                if ( ! reduce )
 
213
                    throw UserException( 9013, (string)"reduce compile failed: " + scope->getError() );
 
214
 
 
215
                if ( setup.finalizeCode.size() )
 
216
                    finalize  = scope->createFunction( setup.finalizeCode.c_str() );
 
217
                else
 
218
                    finalize = 0;
 
219
                
 
220
                if ( ! setup.scopeSetup.isEmpty() )
 
221
                    scope->init( &setup.scopeSetup );
 
222
 
 
223
                db.dropCollection( setup.tempLong );
 
224
                db.dropCollection( setup.incLong );
 
225
                
 
226
                writelock l( setup.incLong );
 
227
                string err;
 
228
                assert( userCreateNS( setup.incLong.c_str() , BSON( "autoIndexId" << 0 ) , err , false ) );
 
229
 
 
230
            }
 
231
 
 
232
            void finalReduce( list<BSONObj>& values ){
 
233
                if ( values.size() == 0 )
 
234
                    return;
 
235
 
 
236
                BSONObj key = values.begin()->firstElement().wrap( "_id" );
 
237
                BSONObj res = reduceValues( values , scope.get() , reduce , 1 , finalize );
 
238
                
 
239
                writelock l( setup.tempLong );
 
240
                theDataFileMgr.insertAndLog( setup.tempLong.c_str() , res , false );
 
241
            }
 
242
 
 
243
            
 
244
            MRSetup& setup;
 
245
            auto_ptr<Scope> scope;
 
246
            DBDirectClient db;
 
247
 
 
248
            ScriptingFunction map;
 
249
            ScriptingFunction reduce;
 
250
            ScriptingFunction finalize;
 
251
            
 
252
        };
 
253
        
 
254
        class MRTL {
 
255
        public:
 
256
            MRTL( MRState& state ) : _state( state ){
 
257
                _temp = new InMemory();
 
258
                _size = 0;
 
259
                numEmits = 0;
 
260
            }
 
261
            ~MRTL(){
 
262
                delete _temp;
 
263
            }
 
264
            
 
265
            
 
266
            void reduceInMemory(){
 
267
                
 
268
                InMemory * old = _temp;
 
269
                InMemory * n = new InMemory();
 
270
                _temp = n;
 
271
                _size = 0;
 
272
                
 
273
                for ( InMemory::iterator i=old->begin(); i!=old->end(); i++ ){
 
274
                    BSONObj key = i->first;
 
275
                    list<BSONObj>& all = i->second;
 
276
                    
 
277
                    if ( all.size() == 1 ){
 
278
                        // this key has low cardinality, so just write to db
 
279
                        writelock l(_state.setup.incLong);
 
280
                        write( *(all.begin()) );
 
281
                    }
 
282
                    else if ( all.size() > 1 ){
 
283
                        BSONObj res = reduceValues( all , _state.scope.get() , _state.reduce , false , 0 );
 
284
                        insert( res );
 
285
                    }
 
286
                }
 
287
                
 
288
                delete( old );
 
289
 
 
290
            }
 
291
 
 
292
            void dump(){
 
293
                writelock l(_state.setup.incLong);
 
294
                    
 
295
                for ( InMemory::iterator i=_temp->begin(); i!=_temp->end(); i++ ){
 
296
                    list<BSONObj>& all = i->second;
 
297
                    if ( all.size() < 1 )
 
298
                        continue;
 
299
                    
 
300
                    for ( list<BSONObj>::iterator j=all.begin(); j!=all.end(); j++ )
 
301
                        write( *j );
 
302
                }
 
303
                _temp->clear();
 
304
                _size = 0;
 
305
 
 
306
            }
 
307
            
 
308
            void insert( const BSONObj& a ){
 
309
                list<BSONObj>& all = (*_temp)[a];
 
310
                all.push_back( a );
 
311
                _size += a.objsize() + 16;
 
312
            }
 
313
 
 
314
            void checkSize(){
 
315
                if ( _size < 1024 * 5 )
 
316
                    return;
 
317
 
 
318
                long before = _size;
 
319
                reduceInMemory();
 
320
                log(1) << "  mr: did reduceInMemory  " << before << " -->> " << _size << endl;
 
321
 
 
322
                if ( _size < 1024 * 15 )
 
323
                    return;
 
324
                
 
325
                dump();
 
326
                log(1) << "  mr: dumping to db" << endl;
 
327
            }
 
328
 
 
329
        private:
 
330
            void write( BSONObj& o ){
 
331
                theDataFileMgr.insert( _state.setup.incLong.c_str() , o , true );
 
332
            }
 
333
            
 
334
            MRState& _state;
 
335
        
 
336
            InMemory * _temp;
 
337
            long _size;
 
338
            
 
339
        public:
 
340
            long long numEmits;
 
341
        };
 
342
 
 
343
        boost::thread_specific_ptr<MRTL> _tlmr;
 
344
 
 
345
        BSONObj fast_emit( const BSONObj& args ){
 
346
            uassert( 10077 ,  "fast_emit takes 2 args" , args.nFields() == 2 );
 
347
            _tlmr->insert( args );
 
348
            _tlmr->numEmits++;
 
349
            return BSONObj();
 
350
        }
 
351
 
 
352
        class MapReduceCommand : public Command {
 
353
        public:
 
354
            MapReduceCommand() : Command("mapreduce"){}
 
355
            virtual bool slaveOk() { return true; }
 
356
        
 
357
            virtual void help( stringstream &help ) const {
 
358
                help << "see http://www.mongodb.org/display/DOCS/MapReduce";
 
359
            }
 
360
        
 
361
            bool run(const char *dbname, BSONObj& cmd, string& errmsg, BSONObjBuilder& result, bool fromRepl ){
 
362
                Timer t;
 
363
                Client::GodScope cg;
 
364
                MRSetup mr( cc().database()->name , cmd );
 
365
 
 
366
                log(1) << "mr ns: " << mr.ns << endl;
 
367
                
 
368
                if ( ! db.exists( mr.ns ) ){
 
369
                    errmsg = "ns doesn't exist";
 
370
                    return false;
 
371
                }
 
372
                
 
373
                bool shouldHaveData = false;
 
374
                
 
375
                long long num = 0;
 
376
                long long inReduce = 0;
 
377
                
 
378
                BSONObjBuilder countsBuilder;
 
379
                BSONObjBuilder timingBuilder;
 
380
                try {
 
381
                    
 
382
                    MRState state( mr );
 
383
                    state.scope->injectNative( "emit" , fast_emit );
 
384
                    
 
385
                    MRTL * mrtl = new MRTL( state );
 
386
                    _tlmr.reset( mrtl );
 
387
 
 
388
                    ProgressMeter pm( db.count( mr.ns , mr.filter ) );
 
389
                    auto_ptr<DBClientCursor> cursor = db.query( mr.ns , mr.q );
 
390
                    long long mapTime = 0;
 
391
                    Timer mt;
 
392
                    while ( cursor->more() ){
 
393
                        BSONObj o = cursor->next(); 
 
394
                    
 
395
                        if ( mr.verbose ) mt.reset();
 
396
                        
 
397
                        state.scope->setThis( &o );
 
398
                        if ( state.scope->invoke( state.map , state.setup.mapparams , 0 , true ) )
 
399
                            throw UserException( 9014, (string)"map invoke failed: " + state.scope->getError() );
 
400
                        
 
401
                        if ( mr.verbose ) mapTime += mt.micros();
 
402
                        
 
403
                        num++;
 
404
                        if ( num % 100 == 0 ){
 
405
                            Timer t;
 
406
                            mrtl->checkSize();
 
407
                            inReduce += t.micros();
 
408
                            dbtemprelease temprlease;
 
409
                        }
 
410
                        pm.hit();
 
411
 
 
412
                        if ( mr.limit && num >= mr.limit )
 
413
                            break;
 
414
                    }
 
415
                    
 
416
                    countsBuilder.append( "input" , num );
 
417
                    countsBuilder.append( "emit" , mrtl->numEmits );
 
418
                    if ( mrtl->numEmits )
 
419
                        shouldHaveData = true;
 
420
                    
 
421
                    timingBuilder.append( "mapTime" , mapTime / 1000 );
 
422
                    timingBuilder.append( "emitLoop" , t.millis() );
 
423
                    
 
424
                    // final reduce
 
425
                    
 
426
                    mrtl->reduceInMemory();
 
427
                    mrtl->dump();
 
428
                    
 
429
                    BSONObj sortKey = BSON( "0" << 1 );
 
430
                    db.ensureIndex( mr.incLong , sortKey );
 
431
                    
 
432
                    BSONObj prev;
 
433
                    list<BSONObj> all;
 
434
                    
 
435
                    ProgressMeter fpm( db.count( mr.incLong ) );
 
436
                    cursor = db.query( mr.incLong, Query().sort( sortKey ) );
 
437
 
 
438
                    while ( cursor->more() ){
 
439
                        BSONObj o = cursor->next().getOwned();
 
440
                        
 
441
                        if ( o.woSortOrder( prev , sortKey ) == 0 ){
 
442
                            all.push_back( o );
 
443
                            continue;
 
444
                        }
 
445
                        
 
446
                        state.finalReduce( all );
 
447
                        
 
448
                        all.clear();
 
449
                        prev = o;
 
450
                        all.push_back( o );
 
451
                        fpm.hit();
 
452
                        dbtemprelease tl;
 
453
                    }
 
454
                    
 
455
                    state.finalReduce( all );
 
456
 
 
457
                    _tlmr.reset( 0 );
 
458
                }
 
459
                catch ( ... ){
 
460
                    log() << "mr failed, removing collection" << endl;
 
461
                    db.dropCollection( mr.tempLong );
 
462
                    db.dropCollection( mr.incLong );
 
463
                    throw;
 
464
                }
 
465
                
 
466
                db.dropCollection( mr.incLong );
 
467
                
 
468
                long long finalCount = mr.renameIfNeeded( db );
 
469
 
 
470
                timingBuilder.append( "total" , t.millis() );
 
471
                
 
472
                result.append( "result" , mr.finalShort );
 
473
                result.append( "timeMillis" , t.millis() );
 
474
                countsBuilder.append( "output" , finalCount );
 
475
                if ( mr.verbose ) result.append( "timing" , timingBuilder.obj() );
 
476
                result.append( "counts" , countsBuilder.obj() );
 
477
 
 
478
                if ( finalCount == 0 && shouldHaveData ){
 
479
                    result.append( "cmd" , cmd );
 
480
                    errmsg = "there were emits but no data!";
 
481
                    return false;
 
482
                }
 
483
 
 
484
                return true;
 
485
            }
 
486
 
 
487
        private:
 
488
            DBDirectClient db;
 
489
 
 
490
        } mapReduceCommand;
 
491
        
 
492
        class MapReduceFinishCommand : public Command {
 
493
        public:
 
494
            MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
 
495
            virtual bool slaveOk() { return true; }
 
496
 
 
497
            bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
 
498
                dbtemprelease temprlease; // we don't touch the db directly
 
499
                                    
 
500
                string dbname = cc().database()->name;
 
501
                string shardedOutputCollection = cmdObj["shardedOutputCollection"].valuestrsafe();
 
502
 
 
503
                MRSetup mr( dbname , cmdObj.firstElement().embeddedObjectUserCheck() , false );
 
504
                
 
505
                set<ServerAndQuery> servers;
 
506
                
 
507
                BSONObjBuilder shardCounts;
 
508
                map<string,long long> counts;
 
509
                
 
510
                BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
 
511
                vector< auto_ptr<DBClientCursor> > shardCursors;
 
512
                BSONObjIterator i( shards );
 
513
                while ( i.more() ){
 
514
                    BSONElement e = i.next();
 
515
                    string shard = e.fieldName();
 
516
 
 
517
                    BSONObj res = e.embeddedObjectUserCheck();
 
518
                    
 
519
                    uassert( 10078 ,  "something bad happened" , shardedOutputCollection == res["result"].valuestrsafe() );
 
520
                    servers.insert( shard );
 
521
                    shardCounts.appendAs( res["counts"] , shard.c_str() );
 
522
 
 
523
                    BSONObjIterator j( res["counts"].embeddedObjectUserCheck() );
 
524
                    while ( j.more() ){
 
525
                        BSONElement temp = j.next();
 
526
                        counts[temp.fieldName()] += temp.numberLong();
 
527
                    }
 
528
 
 
529
                }
 
530
 
 
531
                BSONObj sortKey = BSON( "_id" << 1 );
 
532
 
 
533
                ParallelSortClusteredCursor cursor( servers , dbname + "." + shardedOutputCollection ,
 
534
                                                    Query().sort( sortKey ) );
 
535
                
 
536
                
 
537
                auto_ptr<Scope> s = globalScriptEngine->getPooledScope( ns );
 
538
                ScriptingFunction reduceFunction = s->createFunction( mr.reduceCode.c_str() );
 
539
                ScriptingFunction finalizeFunction = 0;
 
540
                if ( mr.finalizeCode.size() )
 
541
                    finalizeFunction = s->createFunction( mr.finalizeCode.c_str() );
 
542
 
 
543
                list<BSONObj> values;
 
544
 
 
545
                result.append( "result" , mr.finalShort );
 
546
 
 
547
                DBDirectClient db;
 
548
                
 
549
                while ( cursor.more() ){
 
550
                    BSONObj t = cursor.next();
 
551
                                        
 
552
                    if ( values.size() == 0 ){
 
553
                        values.push_back( t );
 
554
                        continue;
 
555
                    }
 
556
                    
 
557
                    if ( t.woSortOrder( *(values.begin()) , sortKey ) == 0 ){
 
558
                        values.push_back( t );
 
559
                        continue;
 
560
                    }
 
561
                    
 
562
 
 
563
                    db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
 
564
                    values.clear();
 
565
                    values.push_back( t );
 
566
                }
 
567
                
 
568
                if ( values.size() )
 
569
                    db.insert( mr.tempLong , reduceValues( values , s.get() , reduceFunction , 1 , finalizeFunction ) );
 
570
                
 
571
                long long finalCount = mr.renameIfNeeded( db );
 
572
                log(0) << " mapreducefinishcommand " << mr.finalLong << " " << finalCount << endl;
 
573
 
 
574
                for ( set<ServerAndQuery>::iterator i=servers.begin(); i!=servers.end(); i++ ){
 
575
                    ScopedDbConnection conn( i->_server );
 
576
                    conn->dropCollection( dbname + "." + shardedOutputCollection );
 
577
                }
 
578
                
 
579
                result.append( "shardCounts" , shardCounts.obj() );
 
580
                
 
581
                {
 
582
                    BSONObjBuilder c;
 
583
                    for ( map<string,long long>::iterator i=counts.begin(); i!=counts.end(); i++ ){
 
584
                        c.append( i->first , i->second );
 
585
                    }
 
586
                    result.append( "counts" , c.obj() );
 
587
                }
 
588
 
 
589
                return 1;
 
590
            }
 
591
        } mapReduceFinishCommand;
 
592
 
 
593
    }
 
594
 
 
595
}
 
596