~pbeaman/akiban-persistit/fix_1005206_infinite_loop

« back to all changes in this revision

Viewing changes to core/src/main/java/com/persistit/Buffer.java

  • Committer: build-akiban
  • Date: 2012-05-15 21:24:26 UTC
  • mfrom: (292.3.38 fix_959456)
  • Revision ID: build-akiban-20120515212426-6lp526nql1wu790g
merge pbeaman: This branch fixes several problems related to https://bugs.launchpad.net/akiban-persistit/+bug/959456 Journal copier falls behind in 4-hour Persistit TPCC test.

https://code.launchpad.net/~pbeaman/akiban-persistit/fix_959456/+merge/105529

Show diffs side-by-side

added added

removed removed

Lines of Context:
38
38
import java.util.List;
39
39
import java.util.Set;
40
40
import java.util.Stack;
41
 
import java.util.concurrent.atomic.AtomicLong;
42
41
 
43
42
import com.persistit.CleanupManager.CleanupAntiValue;
44
43
import com.persistit.Exchange.Sequence;
62
61
 
63
62
/**
64
63
 * <p>
65
 
 * Memory structure that holds and manipulates the state of a fixed-length
66
 
 * page of a {@link Volume}. Persistit manipulates the content of a page by
67
 
 * copying it into a <code>Buffer</code>, reading and/or modifying modifying the
 
64
 * Memory structure that holds and manipulates the state of a fixed-length page
 
65
 * of a {@link Volume}. Persistit manipulates the content of a page by copying
 
66
 * it into a <code>Buffer</code>, reading and/or modifying modifying the
68
67
 * <code>Buffer</code>, and then writing the <code>Buffer</code>'s content back
69
68
 * into the page. There are several types of pages within the BTree structure -
70
69
 * e.g., index pages and data pages. A <code>Buffer</code> can hold and
383
382
     */
384
383
    private Buffer _next = null;
385
384
 
386
 
    private AtomicLong _lastPruningActionEnqueuedTime = new AtomicLong();
 
385
    private volatile long _lastPrunedTime;
 
386
 
 
387
    private volatile boolean _enqueuedForAntiValuePruning;
387
388
 
388
389
    /**
389
390
     * Construct a new buffer.
451
452
        _alloc = _bufferSize;
452
453
        _slack = 0;
453
454
        _mvvCount = 0;
454
 
        _lastPruningActionEnqueuedTime.set(0);
 
455
        clearEnqueuedForPruning();
455
456
        bumpGeneration();
456
457
    }
457
458
 
 
459
    void clearEnqueuedForPruning() {
 
460
        _enqueuedForAntiValuePruning = false;
 
461
        _lastPrunedTime = 0;
 
462
    }
 
463
 
458
464
    /**
 
465
     * 
459
466
     * Extract fields from the buffer.
460
467
     * 
461
468
     * @throws PersistitIOException
501
508
                if (isDataPage()) {
502
509
                    _tailHeaderSize = TAILBLOCK_HDR_SIZE_DATA;
503
510
                    _mvvCount = Integer.MAX_VALUE;
 
511
                    clearEnqueuedForPruning();
504
512
                } else if (isIndexPage()) {
505
513
                    _tailHeaderSize = TAILBLOCK_HDR_SIZE_INDEX;
506
514
                }
579
587
        super.release();
580
588
    }
581
589
 
582
 
 
583
590
    void releaseTouched() {
584
591
        setTouched();
585
592
        release();
788
795
        return _next;
789
796
    }
790
797
 
791
 
 
792
798
    /**
793
799
     * Finds the keyblock in this page that exactly matches or immediately
794
800
     * follows the supplied key.
3128
3134
                }
3129
3135
            }
3130
3136
        }
3131
 
        releaseRepackPlanBuffer(plan);
3132
3137
    }
3133
3138
 
3134
3139
    /**
3399
3404
    }
3400
3405
 
3401
3406
    final int[] getRepackPlanBuffer() {
3402
 
        synchronized (REPACK_BUFFER_STACK) {
3403
 
            if (REPACK_BUFFER_STACK.isEmpty()) {
3404
 
                return new int[MAX_BUFFER_SIZE / TAILBLOCK_FACTOR];
3405
 
            } else
3406
 
                return (int[]) REPACK_BUFFER_STACK.pop();
3407
 
        }
3408
 
    }
3409
 
 
3410
 
    final void releaseRepackPlanBuffer(int[] plan) {
3411
 
        synchronized (REPACK_BUFFER_STACK) {
3412
 
            REPACK_BUFFER_STACK.push(plan);
3413
 
        }
 
3407
        return _persistit.getThreadLocalIntArray(MAX_BUFFER_SIZE / TAILBLOCK_FACTOR);
3414
3408
    }
3415
3409
 
3416
3410
    PersistitException verify(Key key, VerifyVisitor visitor) {
3553
3547
                }
3554
3548
                tail += ((size + ~TAILBLOCK_MASK) & TAILBLOCK_MASK);
3555
3549
            }
3556
 
            releaseRepackPlanBuffer(plan);
3557
3550
            return null;
3558
3551
        } catch (PersistitException pe) {
3559
3552
            return pe;
3583
3576
        }
3584
3577
        if (isDataPage() && _mvvCount != 0) {
3585
3578
            final long timestamp = _persistit.getTimestampAllocator().updateTimestamp();
 
3579
            _mvvCount = 0;
3586
3580
            writePageOnCheckpoint(timestamp);
3587
 
            _mvvCount = 0;
3588
3581
            List<PrunedVersion> prunedVersions = new ArrayList<PrunedVersion>();
3589
3582
            for (int p = KEY_BLOCK_START; p < _keyBlockEnd; p += KEYBLOCK_LENGTH) {
3590
3583
                final int kbData = getInt(p);
3623
3616
 
3624
3617
                    if (valueByte == MVV.TYPE_ANTIVALUE) {
3625
3618
                        if (p == KEY_BLOCK_START) {
3626
 
                            if (tree != null) {
3627
 
                                _mvvCount++;
3628
 
                                _persistit.getCleanupManager().offer(
3629
 
                                        new CleanupAntiValue(tree.getHandle(), getPageAddress()));
 
3619
                            if (tree != null && !_enqueuedForAntiValuePruning) {
 
3620
                                if (_persistit.getCleanupManager().offer(
 
3621
                                        new CleanupAntiValue(tree.getHandle(), getPageAddress()))) {
 
3622
                                    _enqueuedForAntiValuePruning = true;
 
3623
                                }
3630
3624
                            }
3631
3625
                        } else if (p == _keyBlockEnd - KEYBLOCK_LENGTH) {
3632
3626
                            Debug.$assert1.t(false);
3633
 
                        } else {
3634
 
                            final boolean removed  = removeKeys(p | EXACT_MASK, p | EXACT_MASK, spareKey);
 
3627
                        } else if (spareKey != null) {
 
3628
                            final boolean removed = removeKeys(p | EXACT_MASK, p | EXACT_MASK, spareKey);
3635
3629
                            Debug.$assert0.t(removed);
3636
3630
                            p -= KEYBLOCK_LENGTH;
3637
3631
                            changed = true;
3980
3974
        if (_mvvCount > 0) {
3981
3975
            long delay = _persistit.getCleanupManager().getMinimumPruningDelay();
3982
3976
            if (delay > 0) {
3983
 
                long last = _lastPruningActionEnqueuedTime.get();
 
3977
                long last = _lastPrunedTime;
3984
3978
                long now = System.currentTimeMillis();
3985
 
                if (now - last > delay && _lastPruningActionEnqueuedTime.compareAndSet(last, now)) {
 
3979
                if (now - last > delay) {
 
3980
                    _lastPrunedTime = now;
3986
3981
                    _persistit.getCleanupManager().offer(
3987
3982
                            new CleanupManager.CleanupPruneAction(treeHandle, getPageAddress()));
3988
3983
                }
4196
4191
    }
4197
4192
 
4198
4193
    static boolean isLongMVV(byte[] bytes, int offset, int length) {
4199
 
        return isLongRecord(bytes, offset, length) &&
4200
 
               (length > LONGREC_PREFIX_OFFSET) &&
4201
 
               MVV.isArrayMVV(bytes, offset + LONGREC_PREFIX_OFFSET, length - LONGREC_PREFIX_OFFSET);
 
4194
        return isLongRecord(bytes, offset, length) && (length > LONGREC_PREFIX_OFFSET)
 
4195
                && MVV.isArrayMVV(bytes, offset + LONGREC_PREFIX_OFFSET, length - LONGREC_PREFIX_OFFSET);
4202
4196
    }
4203
4197
 
4204
4198
    static boolean isValueMVV(byte[] bytes, int offset, int length) {