~ubuntu-branches/ubuntu/trusty/mongodb/trusty-proposed

« back to all changes in this revision

Viewing changes to db/btree.cpp

  • Committer: Bazaar Package Importer
  • Author(s): Antonin Kral
  • Date: 2010-01-29 19:48:45 UTC
  • Revision ID: james.westby@ubuntu.com-20100129194845-8wbmkf626fwcavc9
Tags: upstream-1.3.1
ImportĀ upstreamĀ versionĀ 1.3.1

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
// btree.cpp
 
2
 
 
3
/**
 
4
*    Copyright (C) 2008 10gen Inc.
 
5
*
 
6
*    This program is free software: you can redistribute it and/or  modify
 
7
*    it under the terms of the GNU Affero General Public License, version 3,
 
8
*    as published by the Free Software Foundation.
 
9
*
 
10
*    This program is distributed in the hope that it will be useful,
 
11
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
12
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
13
*    GNU Affero General Public License for more details.
 
14
*
 
15
*    You should have received a copy of the GNU Affero General Public License
 
16
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
17
*/
 
18
 
 
19
#include "stdafx.h"
 
20
#include "db.h"
 
21
#include "btree.h"
 
22
#include "pdfile.h"
 
23
#include "json.h"
 
24
#include "clientcursor.h"
 
25
#include "client.h"
 
26
#include "dbhelpers.h"
 
27
#include "curop.h"
 
28
 
 
29
namespace mongo {
 
30
 
 
31
#define VERIFYTHISLOC dassert( thisLoc.btree() == this );
 
32
 
 
33
    KeyNode::KeyNode(const BucketBasics& bb, const _KeyNode &k) :
 
34
            prevChildBucket(k.prevChildBucket),
 
35
            recordLoc(k.recordLoc), key(bb.data+k.keyDataOfs())
 
36
    { }
 
37
 
 
38
    const int KeyMax = BucketSize / 10;
 
39
 
 
40
    extern int otherTraceLevel;
 
41
    const int split_debug = 0;
 
42
    const int insert_debug = 0;
 
43
 
 
44
    /* BucketBasics --------------------------------------------------- */
 
45
 
 
46
    inline void BucketBasics::modified(const DiskLoc& thisLoc) {
 
47
        VERIFYTHISLOC
 
48
        btreeStore->modified(thisLoc);
 
49
    }
 
50
 
 
51
    int BucketBasics::Size() const {
 
52
        assert( _Size == BucketSize );
 
53
        return _Size;
 
54
    }
 
55
    inline void BucketBasics::setNotPacked() {
 
56
        flags &= ~Packed;
 
57
    }
 
58
    inline void BucketBasics::setPacked() {
 
59
        flags |= Packed;
 
60
    }
 
61
 
 
62
    void BucketBasics::_shape(int level, stringstream& ss) {
 
63
        for ( int i = 0; i < level; i++ ) ss << ' ';
 
64
        ss << "*\n";
 
65
        for ( int i = 0; i < n; i++ )
 
66
            if ( !k(i).prevChildBucket.isNull() )
 
67
                k(i).prevChildBucket.btree()->_shape(level+1,ss);
 
68
        if ( !nextChild.isNull() )
 
69
            nextChild.btree()->_shape(level+1,ss);
 
70
    }
 
71
 
 
72
    int bt_fv=0;
 
73
    int bt_dmp=0;
 
74
 
 
75
    void BucketBasics::dumpTree(DiskLoc thisLoc, const BSONObj &order) {
 
76
        bt_dmp=1;
 
77
        fullValidate(thisLoc, order);
 
78
        bt_dmp=0;
 
79
    }
 
80
 
 
81
    int BucketBasics::fullValidate(const DiskLoc& thisLoc, const BSONObj &order) {
 
82
        {
 
83
            bool f = false;
 
84
            assert( f = true );
 
85
            massert( 10281 , "assert is misdefined", f);
 
86
        }
 
87
 
 
88
        killCurrentOp.checkForInterrupt();
 
89
        assertValid(order, true);
 
90
//      if( bt_fv==0 )
 
91
//              return;
 
92
 
 
93
        if ( bt_dmp ) {
 
94
            out() << thisLoc.toString() << ' ';
 
95
            ((BtreeBucket *) this)->dump();
 
96
        }
 
97
 
 
98
        // keycount
 
99
        int kc = 0;
 
100
 
 
101
        for ( int i = 0; i < n; i++ ) {
 
102
            _KeyNode& kn = k(i);
 
103
 
 
104
            if ( kn.isUsed() ) kc++;
 
105
            if ( !kn.prevChildBucket.isNull() ) {
 
106
                DiskLoc left = kn.prevChildBucket;
 
107
                BtreeBucket *b = left.btree();
 
108
                wassert( b->parent == thisLoc );
 
109
                kc += b->fullValidate(kn.prevChildBucket, order);
 
110
            }
 
111
        }
 
112
        if ( !nextChild.isNull() ) {
 
113
            BtreeBucket *b = nextChild.btree();
 
114
            wassert( b->parent == thisLoc );
 
115
            kc += b->fullValidate(nextChild, order);
 
116
        }
 
117
 
 
118
        return kc;
 
119
    }
 
120
 
 
121
    int nDumped = 0;
 
122
 
 
123
    void BucketBasics::assertValid(const BSONObj &order, bool force) {
 
124
        if ( !debug && !force )
 
125
            return;
 
126
        wassert( n >= 0 && n < Size() );
 
127
        wassert( emptySize >= 0 && emptySize < BucketSize );
 
128
        wassert( topSize >= n && topSize <= BucketSize );
 
129
        DEV {
 
130
            // slow:
 
131
            for ( int i = 0; i < n-1; i++ ) {
 
132
                BSONObj k1 = keyNode(i).key;
 
133
                BSONObj k2 = keyNode(i+1).key;
 
134
                int z = k1.woCompare(k2, order); //OK
 
135
                if ( z > 0 ) {
 
136
                    out() << "ERROR: btree key order corrupt.  Keys:" << endl;
 
137
                    if ( ++nDumped < 5 ) {
 
138
                        for ( int j = 0; j < n; j++ ) {
 
139
                            out() << "  " << keyNode(j).key.toString() << endl;
 
140
                        }
 
141
                        ((BtreeBucket *) this)->dump();
 
142
                    }
 
143
                    wassert(false);
 
144
                    break;
 
145
                }
 
146
                else if ( z == 0 ) {
 
147
                    if ( !(k(i).recordLoc < k(i+1).recordLoc) ) {
 
148
                        out() << "ERROR: btree key order corrupt (recordloc's wrong).  Keys:" << endl;
 
149
                        out() << " k(" << i << "):" << keyNode(i).key.toString() << " RL:" << k(i).recordLoc.toString() << endl;
 
150
                        out() << " k(" << i+1 << "):" << keyNode(i+1).key.toString() << " RL:" << k(i+1).recordLoc.toString() << endl;
 
151
                        wassert( k(i).recordLoc < k(i+1).recordLoc );
 
152
                    }
 
153
                }
 
154
            }
 
155
        }
 
156
        else {
 
157
            //faster:
 
158
            if ( n > 1 ) {
 
159
                BSONObj k1 = keyNode(0).key;
 
160
                BSONObj k2 = keyNode(n-1).key;
 
161
                int z = k1.woCompare(k2, order);
 
162
                //wassert( z <= 0 );
 
163
                if ( z > 0 ) {
 
164
                    problem() << "btree keys out of order" << '\n';
 
165
                    ONCE {
 
166
                        ((BtreeBucket *) this)->dump();
 
167
                    }
 
168
                    assert(false);
 
169
                }
 
170
            }
 
171
        }
 
172
    }
 
173
 
 
174
    inline void BucketBasics::markUnused(int keypos) {
 
175
        assert( keypos >= 0 && keypos < n );
 
176
        k(keypos).setUnused();
 
177
    }
 
178
 
 
179
    inline int BucketBasics::totalDataSize() const {
 
180
        return Size() - (data-(char*)this);
 
181
    }
 
182
 
 
183
    void BucketBasics::init() {
 
184
        parent.Null();
 
185
        nextChild.Null();
 
186
        _Size = BucketSize;
 
187
        flags = Packed;
 
188
        n = 0;
 
189
        emptySize = totalDataSize();
 
190
        topSize = 0;
 
191
        reserved = 0;
 
192
    }
 
193
 
 
194
    /* see _alloc */
 
195
    inline void BucketBasics::_unalloc(int bytes) {
 
196
        topSize -= bytes;
 
197
        emptySize += bytes;
 
198
    }
 
199
 
 
200
    /* we allocate space from the end of the buffer for data.
 
201
       the keynodes grow from the front.
 
202
    */
 
203
    inline int BucketBasics::_alloc(int bytes) {
 
204
        topSize += bytes;
 
205
        emptySize -= bytes;
 
206
        int ofs = totalDataSize() - topSize;
 
207
        assert( ofs > 0 );
 
208
        return ofs;
 
209
    }
 
210
 
 
211
    void BucketBasics::_delKeyAtPos(int keypos) {
 
212
        assert( keypos >= 0 && keypos <= n );
 
213
        assert( childForPos(keypos).isNull() );
 
214
        n--;
 
215
        assert( n > 0 || nextChild.isNull() );
 
216
        for ( int j = keypos; j < n; j++ )
 
217
            k(j) = k(j+1);
 
218
        emptySize += sizeof(_KeyNode);
 
219
        setNotPacked();
 
220
    }
 
221
 
 
222
    /* pull rightmost key from the bucket.  this version requires its right child to be null so it 
 
223
           does not bother returning that value.
 
224
    */
 
225
    void BucketBasics::popBack(DiskLoc& recLoc, BSONObj& key) { 
 
226
        massert( 10282 ,  "n==0 in btree popBack()", n > 0 );
 
227
        assert( k(n-1).isUsed() ); // no unused skipping in this function at this point - btreebuilder doesn't require that
 
228
        KeyNode kn = keyNode(n-1);
 
229
        recLoc = kn.recordLoc;
 
230
        key = kn.key;
 
231
        int keysize = kn.key.objsize();
 
232
 
 
233
                massert( 10283 , "rchild not null in btree popBack()", nextChild.isNull());
 
234
 
 
235
                /* weirdly, we also put the rightmost down pointer in nextchild, even when bucket isn't full. */
 
236
                nextChild = kn.prevChildBucket;
 
237
 
 
238
        n--;
 
239
        emptySize += sizeof(_KeyNode);
 
240
        _unalloc(keysize);
 
241
    }
 
242
 
 
243
    /* add a key.  must be > all existing.  be careful to set next ptr right. */
 
244
    bool BucketBasics::_pushBack(const DiskLoc& recordLoc, BSONObj& key, const BSONObj &order, DiskLoc prevChild) {
 
245
        int bytesNeeded = key.objsize() + sizeof(_KeyNode);
 
246
        if ( bytesNeeded > emptySize )
 
247
            return false;
 
248
        assert( bytesNeeded <= emptySize );
 
249
        assert( n == 0 || keyNode(n-1).key.woCompare(key, order) <= 0 );
 
250
        emptySize -= sizeof(_KeyNode);
 
251
        _KeyNode& kn = k(n++);
 
252
        kn.prevChildBucket = prevChild;
 
253
        kn.recordLoc = recordLoc;
 
254
        kn.setKeyDataOfs( (short) _alloc(key.objsize()) );
 
255
        char *p = dataAt(kn.keyDataOfs());
 
256
        memcpy(p, key.objdata(), key.objsize());
 
257
        return true;
 
258
    }
 
259
    /*void BucketBasics::pushBack(const DiskLoc& recordLoc, BSONObj& key, const BSONObj &order, DiskLoc prevChild, DiskLoc nextChild) { 
 
260
        pushBack(recordLoc, key, order, prevChild);
 
261
        childForPos(n) = nextChild;
 
262
    }*/
 
263
 
 
264
    /* insert a key in a bucket with no complexity -- no splits required */
 
265
    bool BucketBasics::basicInsert(const DiskLoc& thisLoc, int keypos, const DiskLoc& recordLoc, const BSONObj& key, const BSONObj &order) {
 
266
        modified(thisLoc);
 
267
        assert( keypos >= 0 && keypos <= n );
 
268
        int bytesNeeded = key.objsize() + sizeof(_KeyNode);
 
269
        if ( bytesNeeded > emptySize ) {
 
270
            pack( order );
 
271
            if ( bytesNeeded > emptySize )
 
272
                return false;
 
273
        }
 
274
        for ( int j = n; j > keypos; j-- ) // make room
 
275
            k(j) = k(j-1);
 
276
        n++;
 
277
        emptySize -= sizeof(_KeyNode);
 
278
        _KeyNode& kn = k(keypos);
 
279
        kn.prevChildBucket.Null();
 
280
        kn.recordLoc = recordLoc;
 
281
        kn.setKeyDataOfs((short) _alloc(key.objsize()) );
 
282
        char *p = dataAt(kn.keyDataOfs());
 
283
        memcpy(p, key.objdata(), key.objsize());
 
284
        return true;
 
285
    }
 
286
 
 
287
    /* when we delete things we just leave empty space until the node is
 
288
       full and then we repack it.
 
289
    */
 
290
    void BucketBasics::pack( const BSONObj &order ) {
 
291
        if ( flags & Packed )
 
292
            return;
 
293
 
 
294
        int tdz = totalDataSize();
 
295
        char temp[BucketSize];
 
296
        int ofs = tdz;
 
297
        topSize = 0;
 
298
        for ( int j = 0; j < n; j++ ) {
 
299
            short ofsold = k(j).keyDataOfs();
 
300
            int sz = keyNode(j).key.objsize();
 
301
            ofs -= sz;
 
302
            topSize += sz;
 
303
            memcpy(temp+ofs, dataAt(ofsold), sz);
 
304
            k(j).setKeyDataOfsSavingUse( ofs );
 
305
        }
 
306
        int dataUsed = tdz - ofs;
 
307
        memcpy(data + ofs, temp + ofs, dataUsed);
 
308
        emptySize = tdz - dataUsed - n * sizeof(_KeyNode);
 
309
        assert( emptySize >= 0 );
 
310
 
 
311
        setPacked();
 
312
        assertValid( order );
 
313
    }
 
314
 
 
315
    inline void BucketBasics::truncateTo(int N, const BSONObj &order) {
 
316
        n = N;
 
317
        setNotPacked();
 
318
        pack( order );
 
319
    }
 
320
 
 
321
    /* - BtreeBucket --------------------------------------------------- */
 
322
 
 
323
    /* return largest key in the subtree. */
 
324
    void BtreeBucket::findLargestKey(const DiskLoc& thisLoc, DiskLoc& largestLoc, int& largestKey) {
 
325
        DiskLoc loc = thisLoc;
 
326
        while ( 1 ) {
 
327
            BtreeBucket *b = loc.btree();
 
328
            if ( !b->nextChild.isNull() ) {
 
329
                loc = b->nextChild;
 
330
                continue;
 
331
            }
 
332
 
 
333
            assert(b->n>0);
 
334
            largestLoc = loc;
 
335
            largestKey = b->n-1;
 
336
 
 
337
            break;
 
338
        }
 
339
    }
 
340
 
 
341
    bool BtreeBucket::exists(const IndexDetails& idx, DiskLoc thisLoc, const BSONObj& key, BSONObj order) { 
 
342
        int pos;
 
343
        bool found;
 
344
        DiskLoc b = locate(idx, thisLoc, key, order, pos, found, minDiskLoc);
 
345
 
 
346
        // skip unused keys
 
347
        while ( 1 ) {
 
348
            if( b.isNull() )
 
349
                break;
 
350
            BtreeBucket *bucket = b.btree();
 
351
            _KeyNode& kn = bucket->k(pos);
 
352
            if ( kn.isUsed() )
 
353
                return bucket->keyAt(pos).woEqual(key);
 
354
            b = bucket->advance(b, pos, 1, "BtreeBucket::exists");
 
355
        }
 
356
        return false;
 
357
    }
 
358
 
 
359
    string BtreeBucket::dupKeyError( const IndexDetails& idx , const BSONObj& key ){
 
360
        stringstream ss;
 
361
        ss << "E11000 duplicate key error";
 
362
        ss << "index: " << idx.indexNamespace() << "  ";
 
363
        ss << "dup key: " << key;
 
364
        return ss.str();
 
365
    }
 
366
 
 
367
    /* Find a key withing this btree bucket.
 
368
 
 
369
       When duplicate keys are allowed, we use the DiskLoc of the record as if it were part of the 
 
370
       key.  That assures that even when there are many duplicates (e.g., 1 million) for a key,
 
371
       our performance is still good.
 
372
 
 
373
       assertIfDup: if the key exists (ignoring the recordLoc), uassert
 
374
 
 
375
       pos: for existing keys k0...kn-1.
 
376
       returns # it goes BEFORE.  so key[pos-1] < key < key[pos]
 
377
       returns n if it goes after the last existing key.
 
378
       note result might be an Unused location!
 
379
    */
 
380
        char foo;
 
381
    bool BtreeBucket::find(const IndexDetails& idx, const BSONObj& key, DiskLoc recordLoc, const BSONObj &order, int& pos, bool assertIfDup) {
 
382
#if defined(_EXPERIMENT1)
 
383
                {
 
384
                        char *z = (char *) this;
 
385
                        int i = 0;
 
386
                        while( 1 ) {
 
387
                                i += 4096;
 
388
                                if( i >= BucketSize )
 
389
                                        break;
 
390
                                foo += z[i];
 
391
                        }
 
392
                }
 
393
#endif
 
394
        /* binary search for this key */
 
395
        bool dupsChecked = false;
 
396
        int l=0;
 
397
        int h=n-1;
 
398
        while ( l <= h ) {
 
399
            int m = (l+h)/2;
 
400
            KeyNode M = keyNode(m);
 
401
            int x = key.woCompare(M.key, order);
 
402
            if ( x == 0 ) { 
 
403
                if( assertIfDup ) {
 
404
                    if( k(m).isUnused() ) { 
 
405
                        // ok that key is there if unused.  but we need to check that there aren't other 
 
406
                        // entries for the key then.  as it is very rare that we get here, we don't put any 
 
407
                        // coding effort in here to make this particularly fast
 
408
                        if( !dupsChecked ) { 
 
409
                            dupsChecked = true;
 
410
                            if( idx.head.btree()->exists(idx, idx.head, key, order) )
 
411
                                uasserted( ASSERT_ID_DUPKEY , dupKeyError( idx , key ) );
 
412
                        }
 
413
                    }
 
414
                    else
 
415
                        uasserted( ASSERT_ID_DUPKEY , dupKeyError( idx , key ) );
 
416
                }
 
417
 
 
418
                // dup keys allowed.  use recordLoc as if it is part of the key
 
419
                DiskLoc unusedRL = M.recordLoc;
 
420
                unusedRL.GETOFS() &= ~1; // so we can test equality without the used bit messing us up
 
421
                x = recordLoc.compare(unusedRL);
 
422
            }
 
423
            if ( x < 0 ) // key < M.key
 
424
                h = m-1;
 
425
            else if ( x > 0 )
 
426
                l = m+1;
 
427
            else {
 
428
                // found it.
 
429
                pos = m;
 
430
                return true;
 
431
            }
 
432
        }
 
433
        // not found
 
434
        pos = l;
 
435
        if ( pos != n ) {
 
436
            BSONObj keyatpos = keyNode(pos).key;
 
437
            wassert( key.woCompare(keyatpos, order) <= 0 );
 
438
            if ( pos > 0 ) {
 
439
                wassert( keyNode(pos-1).key.woCompare(key, order) <= 0 );
 
440
            }
 
441
        }
 
442
 
 
443
        return false;
 
444
    }
 
445
 
 
446
    void BtreeBucket::delBucket(const DiskLoc& thisLoc, IndexDetails& id) {
 
447
        ClientCursor::informAboutToDeleteBucket(thisLoc);
 
448
        assert( !isHead() );
 
449
 
 
450
        BtreeBucket *p = parent.btreemod();
 
451
        if ( p->nextChild == thisLoc ) {
 
452
            p->nextChild.Null();
 
453
        }
 
454
        else {
 
455
            for ( int i = 0; i < p->n; i++ ) {
 
456
                if ( p->k(i).prevChildBucket == thisLoc ) {
 
457
                    p->k(i).prevChildBucket.Null();
 
458
                    goto found;
 
459
                }
 
460
            }
 
461
            out() << "ERROR: can't find ref to deleted bucket.\n";
 
462
            out() << "To delete:\n";
 
463
            dump();
 
464
            out() << "Parent:\n";
 
465
            p->dump();
 
466
            assert(false);
 
467
        }
 
468
found:
 
469
#if 1
 
470
        /* as a temporary defensive measure, we zap the whole bucket, AND don't truly delete
 
471
           it (meaning it is ineligible for reuse).
 
472
           */
 
473
        memset(this, 0, Size());
 
474
        modified(thisLoc);
 
475
#else
 
476
        //defensive:
 
477
        n = -1;
 
478
        parent.Null();
 
479
        massert( 10284 , "todo: use RecStoreInterface instead", false);
 
480
        // TODO: this was broken anyway as deleteRecord does unindexRecord() call which assumes the data is a BSONObj, 
 
481
        // and it isn't.
 
482
        assert(false);
 
483
//        theDataFileMgr.deleteRecord(id.indexNamespace().c_str(), thisLoc.rec(), thisLoc);
 
484
#endif
 
485
    }
 
486
 
 
487
    /* note: may delete the entire bucket!  this invalid upon return sometimes. */
 
488
    void BtreeBucket::delKeyAtPos(const DiskLoc& thisLoc, IndexDetails& id, int p) {
 
489
        modified(thisLoc);
 
490
        assert(n>0);
 
491
        DiskLoc left = childForPos(p);
 
492
 
 
493
        if ( n == 1 ) {
 
494
            if ( left.isNull() && nextChild.isNull() ) {
 
495
                if ( isHead() )
 
496
                    _delKeyAtPos(p); // we don't delete the top bucket ever
 
497
                else
 
498
                    delBucket(thisLoc, id);
 
499
                return;
 
500
            }
 
501
            markUnused(p);
 
502
            return;
 
503
        }
 
504
 
 
505
        if ( left.isNull() )
 
506
            _delKeyAtPos(p);
 
507
        else
 
508
            markUnused(p);
 
509
    }
 
510
 
 
511
    int qqq = 0;
 
512
 
 
513
    /* remove a key from the index */
 
514
    bool BtreeBucket::unindex(const DiskLoc& thisLoc, IndexDetails& id, BSONObj& key, const DiskLoc& recordLoc ) {
 
515
        if ( key.objsize() > KeyMax ) {
 
516
            OCCASIONALLY problem() << "unindex: key too large to index, skipping " << id.indexNamespace() << /* ' ' << key.toString() << */ '\n';
 
517
            return false;
 
518
        }
 
519
 
 
520
        int pos;
 
521
        bool found;
 
522
        DiskLoc loc = locate(id, thisLoc, key, id.keyPattern(), pos, found, recordLoc, 1);
 
523
        if ( found ) {
 
524
            loc.btree()->delKeyAtPos(loc, id, pos);
 
525
            return true;
 
526
        }
 
527
        return false;
 
528
    }
 
529
 
 
530
    BtreeBucket* BtreeBucket::allocTemp() {
 
531
        BtreeBucket *b = (BtreeBucket*) malloc(BucketSize);
 
532
        b->init();
 
533
        return b;
 
534
    }
 
535
 
 
536
    inline void fix(const DiskLoc& thisLoc, const DiskLoc& child) {
 
537
        if ( !child.isNull() ) {
 
538
            if ( insert_debug )
 
539
                out() << "      " << child.toString() << ".parent=" << thisLoc.toString() << endl;
 
540
            child.btreemod()->parent = thisLoc;
 
541
        }
 
542
    }
 
543
 
 
544
    /* this sucks.  maybe get rid of parent ptrs. */
 
545
    void BtreeBucket::fixParentPtrs(const DiskLoc& thisLoc) {
 
546
        VERIFYTHISLOC
 
547
        fix(thisLoc, nextChild);
 
548
        for ( int i = 0; i < n; i++ )
 
549
            fix(thisLoc, k(i).prevChildBucket);
 
550
    }
 
551
 
 
552
    /* insert a key in this bucket, splitting if necessary.
 
553
       keypos - where to insert the key i3n range 0..n.  0=make leftmost, n=make rightmost.
 
554
    */
 
555
    void BtreeBucket::insertHere(DiskLoc thisLoc, int keypos,
 
556
                                 DiskLoc recordLoc, const BSONObj& key, const BSONObj& order,
 
557
                                 DiskLoc lchild, DiskLoc rchild, IndexDetails& idx)
 
558
    {
 
559
        modified(thisLoc);
 
560
        if ( insert_debug )
 
561
            out() << "   " << thisLoc.toString() << ".insertHere " << key.toString() << '/' << recordLoc.toString() << ' '
 
562
                 << lchild.toString() << ' ' << rchild.toString() << " keypos:" << keypos << endl;
 
563
 
 
564
        DiskLoc oldLoc = thisLoc;
 
565
 
 
566
        if ( basicInsert(thisLoc, keypos, recordLoc, key, order) ) {
 
567
            _KeyNode& kn = k(keypos);
 
568
            if ( keypos+1 == n ) { // last key
 
569
                if ( nextChild != lchild ) {
 
570
                    out() << "ERROR nextChild != lchild" << endl;
 
571
                    out() << "  thisLoc: " << thisLoc.toString() << ' ' << idx.indexNamespace() << endl;
 
572
                    out() << "  keyPos: " << keypos << " n:" << n << endl;
 
573
                    out() << "  nextChild: " << nextChild.toString() << " lchild: " << lchild.toString() << endl;
 
574
                    out() << "  recordLoc: " << recordLoc.toString() << " rchild: " << rchild.toString() << endl;
 
575
                    out() << "  key: " << key.toString() << endl;
 
576
                    dump();
 
577
#if 0
 
578
                    out() << "\n\nDUMPING FULL INDEX" << endl;
 
579
                    bt_dmp=1;
 
580
                    bt_fv=1;
 
581
                    idx.head.btree()->fullValidate(idx.head);
 
582
#endif
 
583
                    assert(false);
 
584
                }
 
585
                kn.prevChildBucket = nextChild;
 
586
                assert( kn.prevChildBucket == lchild );
 
587
                nextChild = rchild;
 
588
                if ( !rchild.isNull() )
 
589
                    rchild.btreemod()->parent = thisLoc;
 
590
            }
 
591
            else {
 
592
                k(keypos).prevChildBucket = lchild;
 
593
                if ( k(keypos+1).prevChildBucket != lchild ) {
 
594
                    out() << "ERROR k(keypos+1).prevChildBucket != lchild" << endl;
 
595
                    out() << "  thisLoc: " << thisLoc.toString() << ' ' << idx.indexNamespace() << endl;
 
596
                    out() << "  keyPos: " << keypos << " n:" << n << endl;
 
597
                    out() << "  k(keypos+1).pcb: " << k(keypos+1).prevChildBucket.toString() << " lchild: " << lchild.toString() << endl;
 
598
                    out() << "  recordLoc: " << recordLoc.toString() << " rchild: " << rchild.toString() << endl;
 
599
                    out() << "  key: " << key.toString() << endl;
 
600
                    dump();
 
601
#if 0
 
602
                    out() << "\n\nDUMPING FULL INDEX" << endl;
 
603
                    bt_dmp=1;
 
604
                    bt_fv=1;
 
605
                    idx.head.btree()->fullValidate(idx.head);
 
606
#endif
 
607
                    assert(false);
 
608
                }
 
609
                k(keypos+1).prevChildBucket = rchild;
 
610
                if ( !rchild.isNull() )
 
611
                    rchild.btreemod()->parent = thisLoc;
 
612
            }
 
613
            return;
 
614
        }
 
615
 
 
616
        /* ---------- split ---------------- */
 
617
 
 
618
        if ( split_debug )
 
619
            out() << "    " << thisLoc.toString() << ".split" << endl;
 
620
 
 
621
        int mid = n / 2;
 
622
 
 
623
        DiskLoc rLoc = addBucket(idx);
 
624
        BtreeBucket *r = rLoc.btreemod();
 
625
        if ( split_debug )
 
626
            out() << "     mid:" << mid << ' ' << keyNode(mid).key.toString() << " n:" << n << endl;
 
627
        for ( int i = mid+1; i < n; i++ ) {
 
628
            KeyNode kn = keyNode(i);
 
629
            r->pushBack(kn.recordLoc, kn.key, order, kn.prevChildBucket);
 
630
        }
 
631
        r->nextChild = nextChild;
 
632
        r->assertValid( order );
 
633
 
 
634
        if ( split_debug )
 
635
            out() << "     new rLoc:" << rLoc.toString() << endl;
 
636
        r = 0;
 
637
        rLoc.btree()->fixParentPtrs(rLoc);
 
638
 
 
639
        {
 
640
            KeyNode middle = keyNode(mid);
 
641
            nextChild = middle.prevChildBucket; // middle key gets promoted, its children will be thisLoc (l) and rLoc (r)
 
642
            if ( split_debug ) {
 
643
                out() << "    middle key:" << middle.key.toString() << endl;
 
644
            }
 
645
 
 
646
            // promote middle to a parent node
 
647
            if ( parent.isNull() ) {
 
648
                // make a new parent if we were the root
 
649
                DiskLoc L = addBucket(idx);
 
650
                BtreeBucket *p = L.btreemod();
 
651
                p->pushBack(middle.recordLoc, middle.key, order, thisLoc);
 
652
                p->nextChild = rLoc;
 
653
                p->assertValid( order );
 
654
                parent = idx.head = L;
 
655
                if ( split_debug )
 
656
                    out() << "    we were root, making new root:" << hex << parent.getOfs() << dec << endl;
 
657
                rLoc.btreemod()->parent = parent;
 
658
            }
 
659
            else {
 
660
                /* set this before calling _insert - if it splits it will do fixParent() logic and change the value.
 
661
                */
 
662
                rLoc.btreemod()->parent = parent;
 
663
                if ( split_debug )
 
664
                    out() << "    promoting middle key " << middle.key.toString() << endl;
 
665
                parent.btree()->_insert(parent, middle.recordLoc, middle.key, order, /*dupsallowed*/true, thisLoc, rLoc, idx);
 
666
            }
 
667
        }
 
668
 
 
669
        truncateTo(mid, order);  // note this may trash middle.key.  thus we had to promote it before finishing up here.
 
670
 
 
671
        // add our new key, there is room now
 
672
        {
 
673
 
 
674
            if ( keypos <= mid ) {
 
675
                if ( split_debug )
 
676
                    out() << "  keypos<mid, insertHere() the new key" << endl;
 
677
                insertHere(thisLoc, keypos, recordLoc, key, order, lchild, rchild, idx);
 
678
            } else {
 
679
                int kp = keypos-mid-1;
 
680
                assert(kp>=0);
 
681
                rLoc.btree()->insertHere(rLoc, kp, recordLoc, key, order, lchild, rchild, idx);
 
682
            }
 
683
        }
 
684
 
 
685
        if ( split_debug )
 
686
            out() << "     split end " << hex << thisLoc.getOfs() << dec << endl;
 
687
    }
 
688
 
 
689
    /* start a new index off, empty */
 
690
    DiskLoc BtreeBucket::addBucket(IndexDetails& id) {
 
691
        DiskLoc loc = btreeStore->insert(id.indexNamespace().c_str(), 0, BucketSize, true);
 
692
        BtreeBucket *b = loc.btreemod();
 
693
        b->init();
 
694
        return loc;
 
695
    }
 
696
 
 
697
    void BtreeBucket::renameIndexNamespace(const char *oldNs, const char *newNs) {
 
698
        btreeStore->rename( oldNs, newNs );
 
699
    }
 
700
 
 
701
    DiskLoc BtreeBucket::getHead(const DiskLoc& thisLoc) {
 
702
        DiskLoc p = thisLoc;
 
703
        while ( !p.btree()->isHead() )
 
704
            p = p.btree()->parent;
 
705
        return p;
 
706
    }
 
707
 
 
708
    DiskLoc BtreeBucket::advance(const DiskLoc& thisLoc, int& keyOfs, int direction, const char *caller) {
 
709
        if ( keyOfs < 0 || keyOfs >= n ) {
 
710
            out() << "ASSERT failure BtreeBucket::advance, caller: " << caller << endl;
 
711
            out() << "  thisLoc: " << thisLoc.toString() << endl;
 
712
            out() << "  keyOfs: " << keyOfs << " n:" << n << " direction: " << direction << endl;
 
713
            out() << bucketSummary() << endl;
 
714
            assert(false);
 
715
        }
 
716
        int adj = direction < 0 ? 1 : 0;
 
717
        int ko = keyOfs + direction;
 
718
        DiskLoc nextDown = childForPos(ko+adj);
 
719
        if ( !nextDown.isNull() ) {
 
720
            while ( 1 ) {
 
721
                keyOfs = direction>0 ? 0 : nextDown.btree()->n - 1;
 
722
                DiskLoc loc = nextDown.btree()->childForPos(keyOfs + adj);
 
723
                if ( loc.isNull() )
 
724
                    break;
 
725
                nextDown = loc;
 
726
            }
 
727
            return nextDown;
 
728
        }
 
729
 
 
730
        if ( ko < n && ko >= 0 ) {
 
731
            keyOfs = ko;
 
732
            return thisLoc;
 
733
        }
 
734
 
 
735
        // end of bucket.  traverse back up.
 
736
        DiskLoc childLoc = thisLoc;
 
737
        DiskLoc ancestor = parent;
 
738
        while ( 1 ) {
 
739
            if ( ancestor.isNull() )
 
740
                break;
 
741
            BtreeBucket *an = ancestor.btree();
 
742
            for ( int i = 0; i < an->n; i++ ) {
 
743
                if ( an->childForPos(i+adj) == childLoc ) {
 
744
                    keyOfs = i;
 
745
                    return ancestor;
 
746
                }
 
747
            }
 
748
            assert( direction<0 || an->nextChild == childLoc );
 
749
            // parent exhausted also, keep going up
 
750
            childLoc = ancestor;
 
751
            ancestor = an->parent;
 
752
        }
 
753
 
 
754
        return DiskLoc();
 
755
    }
 
756
 
 
757
    DiskLoc BtreeBucket::locate(const IndexDetails& idx, const DiskLoc& thisLoc, const BSONObj& key, const BSONObj &order, int& pos, bool& found, DiskLoc recordLoc, int direction) {
 
758
        int p;
 
759
        found = find(idx, key, recordLoc, order, p, /*assertIfDup*/ false);
 
760
        if ( found ) {
 
761
            pos = p;
 
762
            return thisLoc;
 
763
        }
 
764
 
 
765
        DiskLoc child = childForPos(p);
 
766
 
 
767
        if ( !child.isNull() ) {
 
768
            DiskLoc l = child.btree()->locate(idx, child, key, order, pos, found, recordLoc, direction);
 
769
            if ( !l.isNull() )
 
770
                return l;
 
771
        }
 
772
 
 
773
        pos = p;
 
774
        if ( direction < 0 )
 
775
            return --pos == -1 ? DiskLoc() /*theend*/ : thisLoc;
 
776
        else
 
777
            return pos == n ? DiskLoc() /*theend*/ : thisLoc;
 
778
    }
 
779
 
 
780
    /* @thisLoc disk location of *this
 
781
    */
 
782
    int BtreeBucket::_insert(DiskLoc thisLoc, DiskLoc recordLoc,
 
783
                             const BSONObj& key, const BSONObj &order, bool dupsAllowed,
 
784
                             DiskLoc lChild, DiskLoc rChild, IndexDetails& idx) {
 
785
        if ( key.objsize() > KeyMax ) {
 
786
            problem() << "ERROR: key too large len:" << key.objsize() << " max:" << KeyMax << ' ' << key.objsize() << ' ' << idx.indexNamespace() << endl;
 
787
            return 2;
 
788
        }
 
789
        assert( key.objsize() > 0 );
 
790
 
 
791
        int pos;
 
792
        bool found = find(idx, key, recordLoc, order, pos, !dupsAllowed);
 
793
        if ( insert_debug ) {
 
794
            out() << "  " << thisLoc.toString() << '.' << "_insert " <<
 
795
                 key.toString() << '/' << recordLoc.toString() <<
 
796
                 " l:" << lChild.toString() << " r:" << rChild.toString() << endl;
 
797
            out() << "    found:" << found << " pos:" << pos << " n:" << n << endl;
 
798
        }
 
799
 
 
800
        if ( found ) {
 
801
            _KeyNode& kn = k(pos);
 
802
            if ( kn.isUnused() ) {
 
803
                log(4) << "btree _insert: reusing unused key" << endl;
 
804
                massert( 10285 , "_insert: reuse key but lchild is not null", lChild.isNull());
 
805
                massert( 10286 , "_insert: reuse key but rchild is not null", rChild.isNull());
 
806
                kn.setUsed();
 
807
                return 0;
 
808
            }
 
809
 
 
810
            out() << "_insert(): key already exists in index\n";
 
811
            out() << "  " << idx.indexNamespace().c_str() << " thisLoc:" << thisLoc.toString() << '\n';
 
812
            out() << "  " << key.toString() << '\n';
 
813
            out() << "  " << "recordLoc:" << recordLoc.toString() << " pos:" << pos << endl;
 
814
            out() << "  old l r: " << childForPos(pos).toString() << ' ' << childForPos(pos+1).toString() << endl;
 
815
            out() << "  new l r: " << lChild.toString() << ' ' << rChild.toString() << endl;
 
816
            massert( 10287 , "btree: key+recloc already in index", false);
 
817
        }
 
818
 
 
819
        DEBUGGING out() << "TEMP: key: " << key.toString() << endl;
 
820
        DiskLoc& child = childForPos(pos);
 
821
        if ( insert_debug )
 
822
            out() << "    getChild(" << pos << "): " << child.toString() << endl;
 
823
        if ( child.isNull() || !rChild.isNull() /* means an 'internal' insert */ ) {
 
824
            insertHere(thisLoc, pos, recordLoc, key, order, lChild, rChild, idx);
 
825
            return 0;
 
826
        }
 
827
 
 
828
        return child.btree()->bt_insert(child, recordLoc, key, order, dupsAllowed, idx, /*toplevel*/false);
 
829
    }
 
830
 
 
831
    void BtreeBucket::dump() {
 
832
        out() << "DUMP btreebucket n:" << n;
 
833
        out() << " parent:" << hex << parent.getOfs() << dec;
 
834
        for ( int i = 0; i < n; i++ ) {
 
835
            out() << '\n';
 
836
            KeyNode k = keyNode(i);
 
837
            out() << '\t' << i << '\t' << k.key.toString() << "\tleft:" << hex <<
 
838
                 k.prevChildBucket.getOfs() << "\tRecLoc:" << k.recordLoc.toString() << dec;
 
839
            if ( this->k(i).isUnused() )
 
840
                out() << " UNUSED";
 
841
        }
 
842
        out() << " right:" << hex << nextChild.getOfs() << dec << endl;
 
843
    }
 
844
 
 
845
    /* todo: meaning of return code unclear clean up */
 
846
    int BtreeBucket::bt_insert(DiskLoc thisLoc, DiskLoc recordLoc,
 
847
                            const BSONObj& key, const BSONObj &order, bool dupsAllowed,
 
848
                            IndexDetails& idx, bool toplevel)
 
849
    {
 
850
        if ( toplevel ) {
 
851
            if ( key.objsize() > KeyMax ) {
 
852
                problem() << "Btree::insert: key too large to index, skipping " << idx.indexNamespace().c_str() << ' ' << key.objsize() << ' ' << key.toString() << '\n';
 
853
                return 3;
 
854
            }
 
855
        }
 
856
 
 
857
        int x = _insert(thisLoc, recordLoc, key, order, dupsAllowed, DiskLoc(), DiskLoc(), idx);
 
858
        assertValid( order );
 
859
 
 
860
        return x;
 
861
    }
 
862
 
 
863
    void BtreeBucket::shape(stringstream& ss) {
 
864
        _shape(0, ss);
 
865
    }
 
866
    
 
867
    DiskLoc BtreeBucket::findSingle( const IndexDetails& indexdetails , const DiskLoc& thisLoc, const BSONObj& key ){
 
868
        int pos;
 
869
        bool found;
 
870
        DiskLoc bucket = locate( indexdetails , indexdetails.head , key , BSONObj() , pos , found , minDiskLoc );
 
871
        if ( bucket.isNull() )
 
872
            return bucket;
 
873
 
 
874
        BtreeBucket *b = bucket.btree();
 
875
        while ( 1 ){
 
876
            _KeyNode& knraw = b->k(pos);
 
877
            if ( knraw.isUsed() )
 
878
                break;
 
879
            bucket = b->advance( bucket , pos , 1 , "findSingle" );
 
880
            if ( bucket.isNull() )
 
881
                return bucket;
 
882
            b = bucket.btree();
 
883
        }
 
884
        KeyNode kn = b->keyNode( pos );
 
885
        if ( key.woCompare( kn.key ) != 0 )
 
886
            return DiskLoc();
 
887
        return kn.recordLoc;
 
888
    }
 
889
 
 
890
} // namespace mongo
 
891
 
 
892
#include "db.h"
 
893
#include "dbhelpers.h"
 
894
 
 
895
namespace mongo {
 
896
 
 
897
    void BtreeBucket::a_test(IndexDetails& id) {
 
898
        BtreeBucket *b = id.head.btree();
 
899
 
 
900
        // record locs for testing
 
901
        DiskLoc A(1, 20);
 
902
        DiskLoc B(1, 30);
 
903
        DiskLoc C(1, 40);
 
904
 
 
905
        DiskLoc rl;
 
906
        BSONObj key = fromjson("{x:9}");
 
907
        BSONObj order = fromjson("{}");
 
908
 
 
909
        b->bt_insert(id.head, A, key, order, true, id);
 
910
        A.GETOFS() += 2;
 
911
        b->bt_insert(id.head, A, key, order, true, id);
 
912
        A.GETOFS() += 2;
 
913
        b->bt_insert(id.head, A, key, order, true, id);
 
914
        A.GETOFS() += 2;
 
915
        b->bt_insert(id.head, A, key, order, true, id);
 
916
        A.GETOFS() += 2;
 
917
        assert( b->k(0).isUsed() );
 
918
//        b->k(0).setUnused();
 
919
        b->k(1).setUnused();
 
920
        b->k(2).setUnused();
 
921
        b->k(3).setUnused();
 
922
 
 
923
        b->dumpTree(id.head, order);
 
924
 
 
925
        /*        b->bt_insert(id.head, B, key, order, false, id);
 
926
        b->k(1).setUnused();
 
927
 
 
928
        b->dumpTree(id.head, order);
 
929
        cout << "---\n";
 
930
 
 
931
        b->bt_insert(id.head, A, key, order, false, id);
 
932
 
 
933
        b->dumpTree(id.head, order);
 
934
        cout << "---\n";*/
 
935
 
 
936
        // this should assert.  does it? (it might "accidentally" though, not asserting proves a problem, asserting proves nothing)
 
937
        b->bt_insert(id.head, C, key, order, false, id);
 
938
 
 
939
        b->dumpTree(id.head, order);
 
940
    }
 
941
 
 
942
    /* --- BtreeBuilder --- */
 
943
 
 
944
    BtreeBuilder::BtreeBuilder(bool _dupsAllowed, IndexDetails& _idx) : 
 
945
      dupsAllowed(_dupsAllowed), idx(_idx), n(0) 
 
946
    {
 
947
        first = cur = BtreeBucket::addBucket(idx);
 
948
        b = cur.btreemod();
 
949
        order = idx.keyPattern();
 
950
        committed = false;
 
951
    }
 
952
 
 
953
    void BtreeBuilder::newBucket() { 
 
954
        DiskLoc L = BtreeBucket::addBucket(idx);
 
955
        b->tempNext() = L;
 
956
        cur = L;
 
957
        b = cur.btreemod();
 
958
    }
 
959
 
 
960
    void BtreeBuilder::addKey(BSONObj& key, DiskLoc loc) { 
 
961
        if( !dupsAllowed ) {
 
962
            if( n > 0 ) {
 
963
                int cmp = keyLast.woCompare(key, order);
 
964
                massert( 10288 ,  "bad key order in BtreeBuilder - server internal error", cmp <= 0 );
 
965
                if( cmp == 0 ) {
 
966
                    //if( !dupsAllowed )
 
967
                    uasserted( ASSERT_ID_DUPKEY , BtreeBucket::dupKeyError( idx , keyLast ) );
 
968
                }
 
969
            }
 
970
            keyLast = key;
 
971
        }
 
972
 
 
973
        if ( ! b->_pushBack(loc, key, order, DiskLoc()) ){
 
974
            // no room
 
975
            if ( key.objsize() > KeyMax ) {
 
976
                problem() << "Btree::insert: key too large to index, skipping " << idx.indexNamespace().c_str() << ' ' << key.objsize() << ' ' << key.toString() << '\n';
 
977
            }
 
978
            else { 
 
979
                // bucket was full
 
980
                newBucket();
 
981
                b->pushBack(loc, key, order, DiskLoc());
 
982
            }
 
983
        }
 
984
        n++;
 
985
    }
 
986
 
 
987
    void BtreeBuilder::buildNextLevel(DiskLoc loc) { 
 
988
        int levels = 1;
 
989
        while( 1 ) { 
 
990
            if( loc.btree()->tempNext().isNull() ) { 
 
991
                // only 1 bucket at this level. we are done.
 
992
                idx.head = loc;
 
993
                break;
 
994
            }
 
995
            levels++;
 
996
 
 
997
            DiskLoc upLoc = BtreeBucket::addBucket(idx);
 
998
            DiskLoc upStart = upLoc;
 
999
            BtreeBucket *up = upLoc.btreemod();
 
1000
 
 
1001
            DiskLoc xloc = loc;
 
1002
            while( !xloc.isNull() ) { 
 
1003
                BtreeBucket *x = xloc.btreemod();
 
1004
                BSONObj k; 
 
1005
                DiskLoc r;
 
1006
                x->popBack(r,k);
 
1007
                if( x->n == 0 )
 
1008
                    log() << "warning: empty bucket on BtreeBuild " << k.toString() << endl;
 
1009
 
 
1010
                if ( ! up->_pushBack(r, k, order, xloc) ){
 
1011
                    // current bucket full
 
1012
                    DiskLoc n = BtreeBucket::addBucket(idx);
 
1013
                    up->tempNext() = n;
 
1014
                    upLoc = n; 
 
1015
                    up = upLoc.btreemod();
 
1016
                    up->pushBack(r, k, order, xloc);
 
1017
                }
 
1018
 
 
1019
                xloc = x->tempNext(); /* get next in chain at current level */
 
1020
                x->parent = upLoc;
 
1021
            }
 
1022
            
 
1023
            loc = upStart;
 
1024
        }
 
1025
 
 
1026
        if( levels > 1 )
 
1027
            log(2) << "btree levels: " << levels << endl;
 
1028
    }
 
1029
 
 
1030
    /* when all addKeys are done, we then build the higher levels of the tree */
 
1031
    void BtreeBuilder::commit() { 
 
1032
        buildNextLevel(first);
 
1033
        committed = true;
 
1034
    }
 
1035
 
 
1036
    BtreeBuilder::~BtreeBuilder() { 
 
1037
        if( !committed ) { 
 
1038
            log(2) << "Rolling back partially built index space" << endl;
 
1039
            DiskLoc x = first;
 
1040
            while( !x.isNull() ) { 
 
1041
                DiskLoc next = x.btree()->tempNext();
 
1042
                btreeStore->deleteRecord(idx.indexNamespace().c_str(), x);
 
1043
                x = next;
 
1044
            }
 
1045
            assert( idx.head.isNull() );
 
1046
            log(2) << "done rollback" << endl;
 
1047
        }
 
1048
    }
 
1049
 
 
1050
}