~pbeaman/akiban-persistit/tree-builder3

« back to all changes in this revision

Viewing changes to src/test/java/com/persistit/stress/unit/BigLoad.java

  • Committer: Peter Beaman
  • Date: 2012-12-31 15:54:13 UTC
  • Revision ID: pbeaman@akiban.com-20121231155413-sqd2uo4ux1ijvnee
Modify InsertBigLoad stress test to use TreeBuilder

Show diffs side-by-side

added added

removed removed

Lines of Context:
16
16
package com.persistit.stress.unit;
17
17
 
18
18
import java.util.Random;
19
 
import java.util.SortedMap;
20
 
import java.util.TreeMap;
21
19
 
22
20
import com.persistit.Exchange;
23
21
import com.persistit.Key;
24
22
import com.persistit.Persistit;
25
 
import com.persistit.Volume;
26
 
import com.persistit.exception.PersistitException;
 
23
import com.persistit.Tree;
 
24
import com.persistit.TreeBuilder;
 
25
import com.persistit.Value;
27
26
import com.persistit.stress.AbstractStressTest;
28
27
import com.persistit.util.ArgParser;
29
28
import com.persistit.util.Util;
50
49
    private static final Random RANDOM = new Random();
51
50
 
52
51
    private int totalRecords;
53
 
    private int recordsPerBucket;
54
 
 
55
 
    /**
56
 
     * A Comparable wrapper for an Exchange. An instance of this class may be
57
 
     * held in a SortedMap only if the Key of the Exchange does not change. In
58
 
     * this example, the ComparableExchangeHolder is always removed from the
59
 
     * TreeMap before the Key changes and then reinserted into a new location
60
 
     * after the key has changed.
61
 
     */
62
 
    static class ComparableExchangeHolder implements Comparable<ComparableExchangeHolder> {
63
 
 
64
 
        final Exchange exchange;
65
 
 
66
 
        ComparableExchangeHolder(final Exchange exchange) {
67
 
            this.exchange = exchange;
68
 
        }
69
 
 
70
 
        @Override
71
 
        public int compareTo(final ComparableExchangeHolder ceh) {
72
 
            final Key k1 = exchange.getKey();
73
 
            final Key k2 = ceh.exchange.getKey();
74
 
            return k1.compareTo(k2);
75
 
        }
76
 
    }
77
52
 
78
53
    public BigLoad(final int totalRecords, final int buckets) {
79
54
        super("");
80
55
        this.totalRecords = totalRecords;
81
 
        this.recordsPerBucket = totalRecords / buckets;
82
56
    }
83
57
 
84
 
    public void load(final Persistit db) throws PersistitException {
 
58
    public void load(final Persistit db) throws Exception {
85
59
        final long startLoadTime = System.nanoTime();
86
 
        final Volume sortVolume = db.createTemporaryVolume();
 
60
        TreeBuilder tb = new TreeBuilder(db) {
 
61
            @Override
 
62
            protected void reportSorted(final long count) {
 
63
                System.out.printf("Sorted %,15d records\n", count);
 
64
            }
 
65
 
 
66
            @Override
 
67
            protected void reportMerged(final long count) {
 
68
                System.out.printf("Sorted %,15d records\n", count);
 
69
            }
 
70
 
 
71
            @Override
 
72
            protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
 
73
                System.out.println("Duplicate  key detected: " + key);
 
74
                return true;
 
75
            }
 
76
        };
87
77
        final Exchange resultExchange = db.getExchange("persistit", "sorted", true);
88
 
        System.out.printf("Loading %,d records into %,d buckets\n", totalRecords, totalRecords / recordsPerBucket);
89
 
        final int bucketCount = loadBuckets(db, sortVolume);
 
78
        System.out.printf("Loading %,d records\n", totalRecords);
 
79
 
 
80
        for (int i = 0; i < totalRecords; i++) {
 
81
            resultExchange.clear().append(randomKey());
 
82
            tb.store(resultExchange);
 
83
        }
90
84
        final long endLoadTime = System.nanoTime();
91
85
 
92
 
        System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords, bucketCount);
93
 
        mergeBuckets(db, bucketCount, sortVolume, resultExchange);
 
86
        System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords,
 
87
                tb.getSortVolumeCount());
 
88
 
 
89
        tb.merge();
94
90
        final long endMergeTime = System.nanoTime();
95
91
        System.out.printf("Merged %,d records in %,dms\n", totalRecords, (endMergeTime - endLoadTime) / Util.NS_PER_MS);
96
 
        sortVolume.close();
 
92
        tb.close();
97
93
        System.out.printf("Counting keys in main database (100M keys per dot) ");
98
94
        resultExchange.clear().append(Key.BEFORE);
99
95
        long count = 0;
110
106
        System.out.printf("Total time to load, merge and count %,d records is %,dms", totalRecords,
111
107
                (endCountTime - startLoadTime) / Util.NS_PER_MS);
112
108
    }
113
 
 
114
 
    private int loadBuckets(final Persistit db, final Volume volume) throws PersistitException {
115
 
        long bucketStartTime = 0;
116
 
        int bucket = 0;
117
 
        Exchange ex = null;
118
 
        for (int i = 0; i < totalRecords; i++) {
119
 
            if ((i % recordsPerBucket) == 0) {
120
 
                final long now = System.nanoTime();
121
 
                if (i > 0) {
122
 
                    System.out.printf("Loaded bucket %,5d in %,12dms\n", bucket, (now - bucketStartTime)
123
 
                            / Util.NS_PER_MS);
124
 
                }
125
 
                bucketStartTime = now;
126
 
                bucket++;
127
 
                ex = db.getExchange(volume, "sort" + bucket, true);
128
 
            }
129
 
            ex.clear().append(randomKey());
130
 
            ex.store();
131
 
        }
132
 
        return bucket;
133
 
    }
134
 
 
135
 
    private void mergeBuckets(final Persistit db, final int bucketCount, final Volume sortVolume, final Exchange to)
136
 
            throws PersistitException {
137
 
        final long startLoadTime = System.nanoTime();
138
 
        int loaded = 0;
139
 
 
140
 
        final SortedMap<ComparableExchangeHolder, Integer> sortMap = new TreeMap<ComparableExchangeHolder, Integer>();
141
 
        /*
142
 
         * Load the sortMap using as keys the first key of each bucket.
143
 
         */
144
 
        for (int bucket = 1; bucket <= bucketCount; bucket++) {
145
 
            final Exchange ex = db.getExchange(sortVolume, "sort" + bucket, false);
146
 
            if (ex.append(Key.BEFORE).next()) {
147
 
                final Integer duplicate = sortMap.put(new ComparableExchangeHolder(ex), bucket);
148
 
                showDuplicate(duplicate, bucket, ex);
149
 
            }
150
 
        }
151
 
 
152
 
        while (!sortMap.isEmpty()) {
153
 
            final ComparableExchangeHolder ceh = sortMap.firstKey();
154
 
            final int bucket = sortMap.remove(ceh);
155
 
            ceh.exchange.getKey().copyTo(to.getKey());
156
 
            if (ceh.exchange.next()) {
157
 
                final Integer duplicate = sortMap.put(ceh, bucket);
158
 
                showDuplicate(duplicate, bucket, ceh.exchange);
159
 
            }
160
 
            to.store();
161
 
            if ((++loaded % 10000000) == 0) {
162
 
                System.out.printf("Merged %,d records in %,dms\n", loaded, (System.nanoTime() - startLoadTime)
163
 
                        / Util.NS_PER_MS);
164
 
            }
165
 
        }
166
 
    }
 
109
    
 
110
    final StringBuilder sb = new StringBuilder("00000000000000000000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
167
111
 
168
112
    private String randomKey() {
169
 
        return String.format("%020dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
170
 
                (RANDOM.nextLong() & Long.MAX_VALUE));
171
 
    }
172
 
 
173
 
    private void showDuplicate(final Integer bucket1, final int bucket2, final Exchange ex) {
174
 
        if (bucket1 != null) {
175
 
            System.out.printf("Duplicate key %s in buckets %,d and %,d\n", ex.getKey(), bucket1, bucket2);
 
113
        long n = RANDOM.nextLong() & Long.MAX_VALUE;
 
114
        for (int i = 20; --i >= 0; ) {
 
115
            sb.setCharAt(i, (char)((n % 10) + '0'));
 
116
            n /= 10;
176
117
        }
 
118
        return sb.toString();
177
119
    }
178
120
 
179
121
    /**
220
162
    }
221
163
 
222
164
    private final static String[] ARGS_TEMPLATE = { "records|int:1000000:1:1000000000|Total records to create",
223
 
            "buckets|int:100:1:1000000|Number of sort buckets", "tmpdir|string:|Temporary volume path" };
 
165
            "tmpdir|string:|Temporary volume path" };
224
166
 
225
167
    /**
226
168
     * Method to parse stress test arguments passed by the stress test suite.
230
172
        super.setUp();
231
173
        final ArgParser ap = new ArgParser("com.persistit.BigLoad", _args, ARGS_TEMPLATE).strict();
232
174
        totalRecords = ap.getIntValue("records");
233
 
        recordsPerBucket = totalRecords / ap.getIntValue("buckets");
234
175
        final String path = ap.getStringValue("tmpdir");
235
176
        if (path != null && !path.isEmpty()) {
236
177
            getPersistit().getConfiguration().setTmpVolDir(path);