16
16
package com.persistit.stress.unit;
18
18
import java.util.Random;
19
import java.util.SortedMap;
20
import java.util.TreeMap;
22
20
import com.persistit.Exchange;
23
21
import com.persistit.Key;
24
22
import com.persistit.Persistit;
25
import com.persistit.Volume;
26
import com.persistit.exception.PersistitException;
23
import com.persistit.Tree;
24
import com.persistit.TreeBuilder;
25
import com.persistit.Value;
27
26
import com.persistit.stress.AbstractStressTest;
28
27
import com.persistit.util.ArgParser;
29
28
import com.persistit.util.Util;
50
49
private static final Random RANDOM = new Random();
52
51
private int totalRecords;
53
private int recordsPerBucket;
56
* A Comparable wrapper for an Exchange. An instance of this class may be
57
* held in a SortedMap only if the Key of the Exchange does not change. In
58
* this example, the ComparableExchangeHolder is always removed from the
59
* TreeMap before the Key changes and then reinserted into a new location
60
* after the key has changed.
62
static class ComparableExchangeHolder implements Comparable<ComparableExchangeHolder> {
64
final Exchange exchange;
66
ComparableExchangeHolder(final Exchange exchange) {
67
this.exchange = exchange;
71
public int compareTo(final ComparableExchangeHolder ceh) {
72
final Key k1 = exchange.getKey();
73
final Key k2 = ceh.exchange.getKey();
74
return k1.compareTo(k2);
78
53
public BigLoad(final int totalRecords, final int buckets) {
80
55
this.totalRecords = totalRecords;
81
this.recordsPerBucket = totalRecords / buckets;
84
public void load(final Persistit db) throws PersistitException {
58
public void load(final Persistit db) throws Exception {
85
59
final long startLoadTime = System.nanoTime();
86
final Volume sortVolume = db.createTemporaryVolume();
60
TreeBuilder tb = new TreeBuilder(db) {
62
protected void reportSorted(final long count) {
63
System.out.printf("Sorted %,15d records\n", count);
67
protected void reportMerged(final long count) {
68
System.out.printf("Sorted %,15d records\n", count);
72
protected boolean duplicateKeyDetected(final Tree tree, final Key key, final Value v1, final Value v2) {
73
System.out.println("Duplicate key detected: " + key);
87
77
final Exchange resultExchange = db.getExchange("persistit", "sorted", true);
88
System.out.printf("Loading %,d records into %,d buckets\n", totalRecords, totalRecords / recordsPerBucket);
89
final int bucketCount = loadBuckets(db, sortVolume);
78
System.out.printf("Loading %,d records\n", totalRecords);
80
for (int i = 0; i < totalRecords; i++) {
81
resultExchange.clear().append(randomKey());
82
tb.store(resultExchange);
90
84
final long endLoadTime = System.nanoTime();
92
System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords, bucketCount);
93
mergeBuckets(db, bucketCount, sortVolume, resultExchange);
86
System.out.printf("Merging %,d records from %,d buckets into main database\n", totalRecords,
87
tb.getSortVolumeCount());
94
90
final long endMergeTime = System.nanoTime();
95
91
System.out.printf("Merged %,d records in %,dms\n", totalRecords, (endMergeTime - endLoadTime) / Util.NS_PER_MS);
97
93
System.out.printf("Counting keys in main database (100M keys per dot) ");
98
94
resultExchange.clear().append(Key.BEFORE);
110
106
System.out.printf("Total time to load, merge and count %,d records is %,dms", totalRecords,
111
107
(endCountTime - startLoadTime) / Util.NS_PER_MS);
114
private int loadBuckets(final Persistit db, final Volume volume) throws PersistitException {
115
long bucketStartTime = 0;
118
for (int i = 0; i < totalRecords; i++) {
119
if ((i % recordsPerBucket) == 0) {
120
final long now = System.nanoTime();
122
System.out.printf("Loaded bucket %,5d in %,12dms\n", bucket, (now - bucketStartTime)
125
bucketStartTime = now;
127
ex = db.getExchange(volume, "sort" + bucket, true);
129
ex.clear().append(randomKey());
135
private void mergeBuckets(final Persistit db, final int bucketCount, final Volume sortVolume, final Exchange to)
136
throws PersistitException {
137
final long startLoadTime = System.nanoTime();
140
final SortedMap<ComparableExchangeHolder, Integer> sortMap = new TreeMap<ComparableExchangeHolder, Integer>();
142
* Load the sortMap using as keys the first key of each bucket.
144
for (int bucket = 1; bucket <= bucketCount; bucket++) {
145
final Exchange ex = db.getExchange(sortVolume, "sort" + bucket, false);
146
if (ex.append(Key.BEFORE).next()) {
147
final Integer duplicate = sortMap.put(new ComparableExchangeHolder(ex), bucket);
148
showDuplicate(duplicate, bucket, ex);
152
while (!sortMap.isEmpty()) {
153
final ComparableExchangeHolder ceh = sortMap.firstKey();
154
final int bucket = sortMap.remove(ceh);
155
ceh.exchange.getKey().copyTo(to.getKey());
156
if (ceh.exchange.next()) {
157
final Integer duplicate = sortMap.put(ceh, bucket);
158
showDuplicate(duplicate, bucket, ceh.exchange);
161
if ((++loaded % 10000000) == 0) {
162
System.out.printf("Merged %,d records in %,dms\n", loaded, (System.nanoTime() - startLoadTime)
110
final StringBuilder sb = new StringBuilder("00000000000000000000xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
168
112
private String randomKey() {
169
return String.format("%020dxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
170
(RANDOM.nextLong() & Long.MAX_VALUE));
173
private void showDuplicate(final Integer bucket1, final int bucket2, final Exchange ex) {
174
if (bucket1 != null) {
175
System.out.printf("Duplicate key %s in buckets %,d and %,d\n", ex.getKey(), bucket1, bucket2);
113
long n = RANDOM.nextLong() & Long.MAX_VALUE;
114
for (int i = 20; --i >= 0; ) {
115
sb.setCharAt(i, (char)((n % 10) + '0'));
118
return sb.toString();