~ubuntu-branches/ubuntu/quantal/ceph/quantal

« back to all changes in this revision

Viewing changes to src/leveldb/db/db_bench.cc

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-07-16 09:56:24 UTC
  • mfrom: (0.3.11)
  • mto: This revision was merged to the branch mainline in revision 17.
  • Revision ID: package-import@ubuntu.com-20120716095624-azr2w4hbhei1rxmx
Tags: upstream-0.48
ImportĀ upstreamĀ versionĀ 0.48

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2
 
// Use of this source code is governed by a BSD-style license that can be
3
 
// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
 
 
5
 
#include <sys/types.h>
6
 
#include <stdio.h>
7
 
#include <stdlib.h>
8
 
#include "db/db_impl.h"
9
 
#include "db/version_set.h"
10
 
#include "leveldb/cache.h"
11
 
#include "leveldb/db.h"
12
 
#include "leveldb/env.h"
13
 
#include "leveldb/write_batch.h"
14
 
#include "port/port.h"
15
 
#include "util/crc32c.h"
16
 
#include "util/histogram.h"
17
 
#include "util/mutexlock.h"
18
 
#include "util/random.h"
19
 
#include "util/testutil.h"
20
 
 
21
 
// Comma-separated list of operations to run in the specified order
22
 
//   Actual benchmarks:
23
 
//      fillseq       -- write N values in sequential key order in async mode
24
 
//      fillrandom    -- write N values in random key order in async mode
25
 
//      overwrite     -- overwrite N values in random key order in async mode
26
 
//      fillsync      -- write N/100 values in random key order in sync mode
27
 
//      fill100K      -- write N/1000 100K values in random order in async mode
28
 
//      readseq       -- read N times sequentially
29
 
//      readreverse   -- read N times in reverse order
30
 
//      readrandom    -- read N times in random order
31
 
//      readhot       -- read N times in random order from 1% section of DB
32
 
//      crc32c        -- repeated crc32c of 4K of data
33
 
//      acquireload   -- load N*1000 times
34
 
//   Meta operations:
35
 
//      compact     -- Compact the entire DB
36
 
//      stats       -- Print DB stats
37
 
//      heapprofile -- Dump a heap profile (if supported by this port)
38
 
static const char* FLAGS_benchmarks =
39
 
    "fillseq,"
40
 
    "fillsync,"
41
 
    "fillrandom,"
42
 
    "overwrite,"
43
 
    "readrandom,"
44
 
    "readrandom,"  // Extra run to allow previous compactions to quiesce
45
 
    "readseq,"
46
 
    "readreverse,"
47
 
    "compact,"
48
 
    "readrandom,"
49
 
    "readseq,"
50
 
    "readreverse,"
51
 
    "fill100K,"
52
 
    "crc32c,"
53
 
    "snappycomp,"
54
 
    "snappyuncomp,"
55
 
    "acquireload,"
56
 
    ;
57
 
 
58
 
// Number of key/values to place in database
59
 
static int FLAGS_num = 1000000;
60
 
 
61
 
// Number of read operations to do.  If negative, do FLAGS_num reads.
62
 
static int FLAGS_reads = -1;
63
 
 
64
 
// Number of concurrent threads to run.
65
 
static int FLAGS_threads = 1;
66
 
 
67
 
// Size of each value
68
 
static int FLAGS_value_size = 100;
69
 
 
70
 
// Arrange to generate values that shrink to this fraction of
71
 
// their original size after compression
72
 
static double FLAGS_compression_ratio = 0.5;
73
 
 
74
 
// Print histogram of operation timings
75
 
static bool FLAGS_histogram = false;
76
 
 
77
 
// Number of bytes to buffer in memtable before compacting
78
 
// (initialized to default value by "main")
79
 
static int FLAGS_write_buffer_size = 0;
80
 
 
81
 
// Number of bytes to use as a cache of uncompressed data.
82
 
// Negative means use default settings.
83
 
static int FLAGS_cache_size = -1;
84
 
 
85
 
// Maximum number of files to keep open at the same time (use default if == 0)
86
 
static int FLAGS_open_files = 0;
87
 
 
88
 
// If true, do not destroy the existing database.  If you set this
89
 
// flag and also specify a benchmark that wants a fresh database, that
90
 
// benchmark will fail.
91
 
static bool FLAGS_use_existing_db = false;
92
 
 
93
 
// Use the db with the following name.
94
 
static const char* FLAGS_db = "/tmp/dbbench";
95
 
 
96
 
namespace leveldb {
97
 
 
98
 
namespace {
99
 
 
100
 
// Helper for quickly generating random data.
101
 
class RandomGenerator {
102
 
 private:
103
 
  std::string data_;
104
 
  int pos_;
105
 
 
106
 
 public:
107
 
  RandomGenerator() {
108
 
    // We use a limited amount of data over and over again and ensure
109
 
    // that it is larger than the compression window (32KB), and also
110
 
    // large enough to serve all typical value sizes we want to write.
111
 
    Random rnd(301);
112
 
    std::string piece;
113
 
    while (data_.size() < 1048576) {
114
 
      // Add a short fragment that is as compressible as specified
115
 
      // by FLAGS_compression_ratio.
116
 
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
117
 
      data_.append(piece);
118
 
    }
119
 
    pos_ = 0;
120
 
  }
121
 
 
122
 
  Slice Generate(int len) {
123
 
    if (pos_ + len > data_.size()) {
124
 
      pos_ = 0;
125
 
      assert(len < data_.size());
126
 
    }
127
 
    pos_ += len;
128
 
    return Slice(data_.data() + pos_ - len, len);
129
 
  }
130
 
};
131
 
 
132
 
static Slice TrimSpace(Slice s) {
133
 
  int start = 0;
134
 
  while (start < s.size() && isspace(s[start])) {
135
 
    start++;
136
 
  }
137
 
  int limit = s.size();
138
 
  while (limit > start && isspace(s[limit-1])) {
139
 
    limit--;
140
 
  }
141
 
  return Slice(s.data() + start, limit - start);
142
 
}
143
 
 
144
 
static void AppendWithSpace(std::string* str, Slice msg) {
145
 
  if (msg.empty()) return;
146
 
  if (!str->empty()) {
147
 
    str->push_back(' ');
148
 
  }
149
 
  str->append(msg.data(), msg.size());
150
 
}
151
 
 
152
 
class Stats {
153
 
 private:
154
 
  double start_;
155
 
  double finish_;
156
 
  double seconds_;
157
 
  int done_;
158
 
  int next_report_;
159
 
  int64_t bytes_;
160
 
  double last_op_finish_;
161
 
  Histogram hist_;
162
 
  std::string message_;
163
 
 
164
 
 public:
165
 
  Stats() { Start(); }
166
 
 
167
 
  void Start() {
168
 
    next_report_ = 100;
169
 
    last_op_finish_ = start_;
170
 
    hist_.Clear();
171
 
    done_ = 0;
172
 
    bytes_ = 0;
173
 
    seconds_ = 0;
174
 
    start_ = Env::Default()->NowMicros();
175
 
    finish_ = start_;
176
 
    message_.clear();
177
 
  }
178
 
 
179
 
  void Merge(const Stats& other) {
180
 
    hist_.Merge(other.hist_);
181
 
    done_ += other.done_;
182
 
    bytes_ += other.bytes_;
183
 
    seconds_ += other.seconds_;
184
 
    if (other.start_ < start_) start_ = other.start_;
185
 
    if (other.finish_ > finish_) finish_ = other.finish_;
186
 
 
187
 
    // Just keep the messages from one thread
188
 
    if (message_.empty()) message_ = other.message_;
189
 
  }
190
 
 
191
 
  void Stop() {
192
 
    finish_ = Env::Default()->NowMicros();
193
 
    seconds_ = (finish_ - start_) * 1e-6;
194
 
  }
195
 
 
196
 
  void AddMessage(Slice msg) {
197
 
    AppendWithSpace(&message_, msg);
198
 
  }
199
 
 
200
 
  void FinishedSingleOp() {
201
 
    if (FLAGS_histogram) {
202
 
      double now = Env::Default()->NowMicros();
203
 
      double micros = now - last_op_finish_;
204
 
      hist_.Add(micros);
205
 
      if (micros > 20000) {
206
 
        fprintf(stderr, "long op: %.1f micros%30s\r", micros, "");
207
 
        fflush(stderr);
208
 
      }
209
 
      last_op_finish_ = now;
210
 
    }
211
 
 
212
 
    done_++;
213
 
    if (done_ >= next_report_) {
214
 
      if      (next_report_ < 1000)   next_report_ += 100;
215
 
      else if (next_report_ < 5000)   next_report_ += 500;
216
 
      else if (next_report_ < 10000)  next_report_ += 1000;
217
 
      else if (next_report_ < 50000)  next_report_ += 5000;
218
 
      else if (next_report_ < 100000) next_report_ += 10000;
219
 
      else if (next_report_ < 500000) next_report_ += 50000;
220
 
      else                            next_report_ += 100000;
221
 
      fprintf(stderr, "... finished %d ops%30s\r", done_, "");
222
 
      fflush(stderr);
223
 
    }
224
 
  }
225
 
 
226
 
  void AddBytes(int64_t n) {
227
 
    bytes_ += n;
228
 
  }
229
 
 
230
 
  void Report(const Slice& name) {
231
 
    // Pretend at least one op was done in case we are running a benchmark
232
 
    // that does not call FinishedSingleOp().
233
 
    if (done_ < 1) done_ = 1;
234
 
 
235
 
    std::string extra;
236
 
    if (bytes_ > 0) {
237
 
      // Rate is computed on actual elapsed time, not the sum of per-thread
238
 
      // elapsed times.
239
 
      double elapsed = (finish_ - start_) * 1e-6;
240
 
      char rate[100];
241
 
      snprintf(rate, sizeof(rate), "%6.1f MB/s",
242
 
               (bytes_ / 1048576.0) / elapsed);
243
 
      extra = rate;
244
 
    }
245
 
    AppendWithSpace(&extra, message_);
246
 
 
247
 
    fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
248
 
            name.ToString().c_str(),
249
 
            seconds_ * 1e6 / done_,
250
 
            (extra.empty() ? "" : " "),
251
 
            extra.c_str());
252
 
    if (FLAGS_histogram) {
253
 
      fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
254
 
    }
255
 
    fflush(stdout);
256
 
  }
257
 
};
258
 
 
259
 
// State shared by all concurrent executions of the same benchmark.
260
 
struct SharedState {
261
 
  port::Mutex mu;
262
 
  port::CondVar cv;
263
 
  int total;
264
 
 
265
 
  // Each thread goes through the following states:
266
 
  //    (1) initializing
267
 
  //    (2) waiting for others to be initialized
268
 
  //    (3) running
269
 
  //    (4) done
270
 
 
271
 
  int num_initialized;
272
 
  int num_done;
273
 
  bool start;
274
 
 
275
 
  SharedState() : cv(&mu) { }
276
 
};
277
 
 
278
 
// Per-thread state for concurrent executions of the same benchmark.
279
 
struct ThreadState {
280
 
  int tid;             // 0..n-1 when running in n threads
281
 
  Random rand;         // Has different seeds for different threads
282
 
  Stats stats;
283
 
  SharedState* shared;
284
 
 
285
 
  ThreadState(int index)
286
 
      : tid(index),
287
 
        rand(1000 + index) {
288
 
  }
289
 
};
290
 
 
291
 
}  // namespace
292
 
 
293
 
class Benchmark {
294
 
 private:
295
 
  Cache* cache_;
296
 
  DB* db_;
297
 
  int num_;
298
 
  int value_size_;
299
 
  int entries_per_batch_;
300
 
  WriteOptions write_options_;
301
 
  int reads_;
302
 
  int heap_counter_;
303
 
 
304
 
  void PrintHeader() {
305
 
    const int kKeySize = 16;
306
 
    PrintEnvironment();
307
 
    fprintf(stdout, "Keys:       %d bytes each\n", kKeySize);
308
 
    fprintf(stdout, "Values:     %d bytes each (%d bytes after compression)\n",
309
 
            FLAGS_value_size,
310
 
            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
311
 
    fprintf(stdout, "Entries:    %d\n", num_);
312
 
    fprintf(stdout, "RawSize:    %.1f MB (estimated)\n",
313
 
            ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_)
314
 
             / 1048576.0));
315
 
    fprintf(stdout, "FileSize:   %.1f MB (estimated)\n",
316
 
            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
317
 
             / 1048576.0));
318
 
    PrintWarnings();
319
 
    fprintf(stdout, "------------------------------------------------\n");
320
 
  }
321
 
 
322
 
  void PrintWarnings() {
323
 
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
324
 
    fprintf(stdout,
325
 
            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
326
 
            );
327
 
#endif
328
 
#ifndef NDEBUG
329
 
    fprintf(stdout,
330
 
            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
331
 
#endif
332
 
 
333
 
    // See if snappy is working by attempting to compress a compressible string
334
 
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
335
 
    std::string compressed;
336
 
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
337
 
      fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
338
 
    } else if (compressed.size() >= sizeof(text)) {
339
 
      fprintf(stdout, "WARNING: Snappy compression is not effective\n");
340
 
    }
341
 
  }
342
 
 
343
 
  void PrintEnvironment() {
344
 
    fprintf(stderr, "LevelDB:    version %d.%d\n",
345
 
            kMajorVersion, kMinorVersion);
346
 
 
347
 
#if defined(__linux)
348
 
    time_t now = time(NULL);
349
 
    fprintf(stderr, "Date:       %s", ctime(&now));  // ctime() adds newline
350
 
 
351
 
    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
352
 
    if (cpuinfo != NULL) {
353
 
      char line[1000];
354
 
      int num_cpus = 0;
355
 
      std::string cpu_type;
356
 
      std::string cache_size;
357
 
      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
358
 
        const char* sep = strchr(line, ':');
359
 
        if (sep == NULL) {
360
 
          continue;
361
 
        }
362
 
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
363
 
        Slice val = TrimSpace(Slice(sep + 1));
364
 
        if (key == "model name") {
365
 
          ++num_cpus;
366
 
          cpu_type = val.ToString();
367
 
        } else if (key == "cache size") {
368
 
          cache_size = val.ToString();
369
 
        }
370
 
      }
371
 
      fclose(cpuinfo);
372
 
      fprintf(stderr, "CPU:        %d * %s\n", num_cpus, cpu_type.c_str());
373
 
      fprintf(stderr, "CPUCache:   %s\n", cache_size.c_str());
374
 
    }
375
 
#endif
376
 
  }
377
 
 
378
 
 public:
379
 
  Benchmark()
380
 
  : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL),
381
 
    db_(NULL),
382
 
    num_(FLAGS_num),
383
 
    value_size_(FLAGS_value_size),
384
 
    entries_per_batch_(1),
385
 
    reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
386
 
    heap_counter_(0) {
387
 
    std::vector<std::string> files;
388
 
    Env::Default()->GetChildren(FLAGS_db, &files);
389
 
    for (int i = 0; i < files.size(); i++) {
390
 
      if (Slice(files[i]).starts_with("heap-")) {
391
 
        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
392
 
      }
393
 
    }
394
 
    if (!FLAGS_use_existing_db) {
395
 
      DestroyDB(FLAGS_db, Options());
396
 
    }
397
 
  }
398
 
 
399
 
  ~Benchmark() {
400
 
    delete db_;
401
 
    delete cache_;
402
 
  }
403
 
 
404
 
  void Run() {
405
 
    PrintHeader();
406
 
    Open();
407
 
 
408
 
    const char* benchmarks = FLAGS_benchmarks;
409
 
    while (benchmarks != NULL) {
410
 
      const char* sep = strchr(benchmarks, ',');
411
 
      Slice name;
412
 
      if (sep == NULL) {
413
 
        name = benchmarks;
414
 
        benchmarks = NULL;
415
 
      } else {
416
 
        name = Slice(benchmarks, sep - benchmarks);
417
 
        benchmarks = sep + 1;
418
 
      }
419
 
 
420
 
      // Reset parameters that may be overriddden bwlow
421
 
      num_ = FLAGS_num;
422
 
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
423
 
      value_size_ = FLAGS_value_size;
424
 
      entries_per_batch_ = 1;
425
 
      write_options_ = WriteOptions();
426
 
 
427
 
      void (Benchmark::*method)(ThreadState*) = NULL;
428
 
      bool fresh_db = false;
429
 
      int num_threads = FLAGS_threads;
430
 
 
431
 
      if (name == Slice("fillseq")) {
432
 
        fresh_db = true;
433
 
        method = &Benchmark::WriteSeq;
434
 
      } else if (name == Slice("fillbatch")) {
435
 
        fresh_db = true;
436
 
        entries_per_batch_ = 1000;
437
 
        method = &Benchmark::WriteSeq;
438
 
      } else if (name == Slice("fillrandom")) {
439
 
        fresh_db = true;
440
 
        method = &Benchmark::WriteRandom;
441
 
      } else if (name == Slice("overwrite")) {
442
 
        fresh_db = false;
443
 
        method = &Benchmark::WriteRandom;
444
 
      } else if (name == Slice("fillsync")) {
445
 
        fresh_db = true;
446
 
        num_ /= 1000;
447
 
        write_options_.sync = true;
448
 
        method = &Benchmark::WriteRandom;
449
 
      } else if (name == Slice("fill100K")) {
450
 
        fresh_db = true;
451
 
        num_ /= 1000;
452
 
        value_size_ = 100 * 1000;
453
 
        method = &Benchmark::WriteRandom;
454
 
      } else if (name == Slice("readseq")) {
455
 
        method = &Benchmark::ReadSequential;
456
 
      } else if (name == Slice("readreverse")) {
457
 
        method = &Benchmark::ReadReverse;
458
 
      } else if (name == Slice("readrandom")) {
459
 
        method = &Benchmark::ReadRandom;
460
 
      } else if (name == Slice("readhot")) {
461
 
        method = &Benchmark::ReadHot;
462
 
      } else if (name == Slice("readrandomsmall")) {
463
 
        reads_ /= 1000;
464
 
        method = &Benchmark::ReadRandom;
465
 
      } else if (name == Slice("readwhilewriting")) {
466
 
        num_threads++;  // Add extra thread for writing
467
 
        method = &Benchmark::ReadWhileWriting;
468
 
      } else if (name == Slice("compact")) {
469
 
        method = &Benchmark::Compact;
470
 
      } else if (name == Slice("crc32c")) {
471
 
        method = &Benchmark::Crc32c;
472
 
      } else if (name == Slice("acquireload")) {
473
 
        method = &Benchmark::AcquireLoad;
474
 
      } else if (name == Slice("snappycomp")) {
475
 
        method = &Benchmark::SnappyCompress;
476
 
      } else if (name == Slice("snappyuncomp")) {
477
 
        method = &Benchmark::SnappyUncompress;
478
 
      } else if (name == Slice("heapprofile")) {
479
 
        HeapProfile();
480
 
      } else if (name == Slice("stats")) {
481
 
        PrintStats();
482
 
      } else {
483
 
        if (name != Slice()) {  // No error message for empty name
484
 
          fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
485
 
        }
486
 
      }
487
 
 
488
 
      if (fresh_db) {
489
 
        if (FLAGS_use_existing_db) {
490
 
          fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
491
 
                  name.ToString().c_str());
492
 
          method = NULL;
493
 
        } else {
494
 
          delete db_;
495
 
          db_ = NULL;
496
 
          DestroyDB(FLAGS_db, Options());
497
 
          Open();
498
 
        }
499
 
      }
500
 
 
501
 
      if (method != NULL) {
502
 
        RunBenchmark(num_threads, name, method);
503
 
      }
504
 
    }
505
 
  }
506
 
 
507
 
 private:
508
 
  struct ThreadArg {
509
 
    Benchmark* bm;
510
 
    SharedState* shared;
511
 
    ThreadState* thread;
512
 
    void (Benchmark::*method)(ThreadState*);
513
 
  };
514
 
 
515
 
  static void ThreadBody(void* v) {
516
 
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
517
 
    SharedState* shared = arg->shared;
518
 
    ThreadState* thread = arg->thread;
519
 
    {
520
 
      MutexLock l(&shared->mu);
521
 
      shared->num_initialized++;
522
 
      if (shared->num_initialized >= shared->total) {
523
 
        shared->cv.SignalAll();
524
 
      }
525
 
      while (!shared->start) {
526
 
        shared->cv.Wait();
527
 
      }
528
 
    }
529
 
 
530
 
    thread->stats.Start();
531
 
    (arg->bm->*(arg->method))(thread);
532
 
    thread->stats.Stop();
533
 
 
534
 
    {
535
 
      MutexLock l(&shared->mu);
536
 
      shared->num_done++;
537
 
      if (shared->num_done >= shared->total) {
538
 
        shared->cv.SignalAll();
539
 
      }
540
 
    }
541
 
  }
542
 
 
543
 
  void RunBenchmark(int n, Slice name,
544
 
                    void (Benchmark::*method)(ThreadState*)) {
545
 
    SharedState shared;
546
 
    shared.total = n;
547
 
    shared.num_initialized = 0;
548
 
    shared.num_done = 0;
549
 
    shared.start = false;
550
 
 
551
 
    ThreadArg* arg = new ThreadArg[n];
552
 
    for (int i = 0; i < n; i++) {
553
 
      arg[i].bm = this;
554
 
      arg[i].method = method;
555
 
      arg[i].shared = &shared;
556
 
      arg[i].thread = new ThreadState(i);
557
 
      arg[i].thread->shared = &shared;
558
 
      Env::Default()->StartThread(ThreadBody, &arg[i]);
559
 
    }
560
 
 
561
 
    shared.mu.Lock();
562
 
    while (shared.num_initialized < n) {
563
 
      shared.cv.Wait();
564
 
    }
565
 
 
566
 
    shared.start = true;
567
 
    shared.cv.SignalAll();
568
 
    while (shared.num_done < n) {
569
 
      shared.cv.Wait();
570
 
    }
571
 
    shared.mu.Unlock();
572
 
 
573
 
    for (int i = 1; i < n; i++) {
574
 
      arg[0].thread->stats.Merge(arg[i].thread->stats);
575
 
    }
576
 
    arg[0].thread->stats.Report(name);
577
 
 
578
 
    for (int i = 0; i < n; i++) {
579
 
      delete arg[i].thread;
580
 
    }
581
 
    delete[] arg;
582
 
  }
583
 
 
584
 
  void Crc32c(ThreadState* thread) {
585
 
    // Checksum about 500MB of data total
586
 
    const int size = 4096;
587
 
    const char* label = "(4K per op)";
588
 
    std::string data(size, 'x');
589
 
    int64_t bytes = 0;
590
 
    uint32_t crc = 0;
591
 
    while (bytes < 500 * 1048576) {
592
 
      crc = crc32c::Value(data.data(), size);
593
 
      thread->stats.FinishedSingleOp();
594
 
      bytes += size;
595
 
    }
596
 
    // Print so result is not dead
597
 
    fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc));
598
 
 
599
 
    thread->stats.AddBytes(bytes);
600
 
    thread->stats.AddMessage(label);
601
 
  }
602
 
 
603
 
  void AcquireLoad(ThreadState* thread) {
604
 
    int dummy;
605
 
    port::AtomicPointer ap(&dummy);
606
 
    int count = 0;
607
 
    void *ptr = NULL;
608
 
    thread->stats.AddMessage("(each op is 1000 loads)");
609
 
    while (count < 100000) {
610
 
      for (int i = 0; i < 1000; i++) {
611
 
        ptr = ap.Acquire_Load();
612
 
      }
613
 
      count++;
614
 
      thread->stats.FinishedSingleOp();
615
 
    }
616
 
    if (ptr == NULL) exit(1); // Disable unused variable warning.
617
 
  }
618
 
 
619
 
  void SnappyCompress(ThreadState* thread) {
620
 
    RandomGenerator gen;
621
 
    Slice input = gen.Generate(Options().block_size);
622
 
    int64_t bytes = 0;
623
 
    int64_t produced = 0;
624
 
    bool ok = true;
625
 
    std::string compressed;
626
 
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
627
 
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
628
 
      produced += compressed.size();
629
 
      bytes += input.size();
630
 
      thread->stats.FinishedSingleOp();
631
 
    }
632
 
 
633
 
    if (!ok) {
634
 
      thread->stats.AddMessage("(snappy failure)");
635
 
    } else {
636
 
      char buf[100];
637
 
      snprintf(buf, sizeof(buf), "(output: %.1f%%)",
638
 
               (produced * 100.0) / bytes);
639
 
      thread->stats.AddMessage(buf);
640
 
      thread->stats.AddBytes(bytes);
641
 
    }
642
 
  }
643
 
 
644
 
  void SnappyUncompress(ThreadState* thread) {
645
 
    RandomGenerator gen;
646
 
    Slice input = gen.Generate(Options().block_size);
647
 
    std::string compressed;
648
 
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
649
 
    int64_t bytes = 0;
650
 
    char* uncompressed = new char[input.size()];
651
 
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
652
 
      ok =  port::Snappy_Uncompress(compressed.data(), compressed.size(),
653
 
                                    uncompressed);
654
 
      bytes += input.size();
655
 
      thread->stats.FinishedSingleOp();
656
 
    }
657
 
    delete[] uncompressed;
658
 
 
659
 
    if (!ok) {
660
 
      thread->stats.AddMessage("(snappy failure)");
661
 
    } else {
662
 
      thread->stats.AddBytes(bytes);
663
 
    }
664
 
  }
665
 
 
666
 
  void Open() {
667
 
    assert(db_ == NULL);
668
 
    Options options;
669
 
    options.create_if_missing = !FLAGS_use_existing_db;
670
 
    options.block_cache = cache_;
671
 
    options.write_buffer_size = FLAGS_write_buffer_size;
672
 
    Status s = DB::Open(options, FLAGS_db, &db_);
673
 
    if (!s.ok()) {
674
 
      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
675
 
      exit(1);
676
 
    }
677
 
  }
678
 
 
679
 
  void WriteSeq(ThreadState* thread) {
680
 
    DoWrite(thread, true);
681
 
  }
682
 
 
683
 
  void WriteRandom(ThreadState* thread) {
684
 
    DoWrite(thread, false);
685
 
  }
686
 
 
687
 
  void DoWrite(ThreadState* thread, bool seq) {
688
 
    if (num_ != FLAGS_num) {
689
 
      char msg[100];
690
 
      snprintf(msg, sizeof(msg), "(%d ops)", num_);
691
 
      thread->stats.AddMessage(msg);
692
 
    }
693
 
 
694
 
    RandomGenerator gen;
695
 
    WriteBatch batch;
696
 
    Status s;
697
 
    int64_t bytes = 0;
698
 
    for (int i = 0; i < num_; i += entries_per_batch_) {
699
 
      batch.Clear();
700
 
      for (int j = 0; j < entries_per_batch_; j++) {
701
 
        const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
702
 
        char key[100];
703
 
        snprintf(key, sizeof(key), "%016d", k);
704
 
        batch.Put(key, gen.Generate(value_size_));
705
 
        bytes += value_size_ + strlen(key);
706
 
        thread->stats.FinishedSingleOp();
707
 
      }
708
 
      s = db_->Write(write_options_, &batch);
709
 
      if (!s.ok()) {
710
 
        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
711
 
        exit(1);
712
 
      }
713
 
    }
714
 
    thread->stats.AddBytes(bytes);
715
 
  }
716
 
 
717
 
  void ReadSequential(ThreadState* thread) {
718
 
    Iterator* iter = db_->NewIterator(ReadOptions());
719
 
    int i = 0;
720
 
    int64_t bytes = 0;
721
 
    for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) {
722
 
      bytes += iter->key().size() + iter->value().size();
723
 
      thread->stats.FinishedSingleOp();
724
 
      ++i;
725
 
    }
726
 
    delete iter;
727
 
    thread->stats.AddBytes(bytes);
728
 
  }
729
 
 
730
 
  void ReadReverse(ThreadState* thread) {
731
 
    Iterator* iter = db_->NewIterator(ReadOptions());
732
 
    int i = 0;
733
 
    int64_t bytes = 0;
734
 
    for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
735
 
      bytes += iter->key().size() + iter->value().size();
736
 
      thread->stats.FinishedSingleOp();
737
 
      ++i;
738
 
    }
739
 
    delete iter;
740
 
    thread->stats.AddBytes(bytes);
741
 
  }
742
 
 
743
 
  void ReadRandom(ThreadState* thread) {
744
 
    ReadOptions options;
745
 
    std::string value;
746
 
    for (int i = 0; i < reads_; i++) {
747
 
      char key[100];
748
 
      const int k = thread->rand.Next() % FLAGS_num;
749
 
      snprintf(key, sizeof(key), "%016d", k);
750
 
      db_->Get(options, key, &value);
751
 
      thread->stats.FinishedSingleOp();
752
 
    }
753
 
  }
754
 
 
755
 
  void ReadHot(ThreadState* thread) {
756
 
    ReadOptions options;
757
 
    std::string value;
758
 
    const int range = (FLAGS_num + 99) / 100;
759
 
    for (int i = 0; i < reads_; i++) {
760
 
      char key[100];
761
 
      const int k = thread->rand.Next() % range;
762
 
      snprintf(key, sizeof(key), "%016d", k);
763
 
      db_->Get(options, key, &value);
764
 
      thread->stats.FinishedSingleOp();
765
 
    }
766
 
  }
767
 
 
768
 
  void ReadWhileWriting(ThreadState* thread) {
769
 
    if (thread->tid > 0) {
770
 
      ReadRandom(thread);
771
 
    } else {
772
 
      // Special thread that keeps writing until other threads are done.
773
 
      RandomGenerator gen;
774
 
      while (true) {
775
 
        {
776
 
          MutexLock l(&thread->shared->mu);
777
 
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
778
 
            // Other threads have finished
779
 
            break;
780
 
          }
781
 
        }
782
 
 
783
 
        const int k = thread->rand.Next() % FLAGS_num;
784
 
        char key[100];
785
 
        snprintf(key, sizeof(key), "%016d", k);
786
 
        Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
787
 
        if (!s.ok()) {
788
 
          fprintf(stderr, "put error: %s\n", s.ToString().c_str());
789
 
          exit(1);
790
 
        }
791
 
      }
792
 
 
793
 
      // Do not count any of the preceding work/delay in stats.
794
 
      thread->stats.Start();
795
 
    }
796
 
  }
797
 
 
798
 
  void Compact(ThreadState* thread) {
799
 
    db_->CompactRange(NULL, NULL);
800
 
  }
801
 
 
802
 
  void PrintStats() {
803
 
    std::string stats;
804
 
    if (!db_->GetProperty("leveldb.stats", &stats)) {
805
 
      stats = "(failed)";
806
 
    }
807
 
    fprintf(stdout, "\n%s\n", stats.c_str());
808
 
  }
809
 
 
810
 
  static void WriteToFile(void* arg, const char* buf, int n) {
811
 
    reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
812
 
  }
813
 
 
814
 
  void HeapProfile() {
815
 
    char fname[100];
816
 
    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
817
 
    WritableFile* file;
818
 
    Status s = Env::Default()->NewWritableFile(fname, &file);
819
 
    if (!s.ok()) {
820
 
      fprintf(stderr, "%s\n", s.ToString().c_str());
821
 
      return;
822
 
    }
823
 
    bool ok = port::GetHeapProfile(WriteToFile, file);
824
 
    delete file;
825
 
    if (!ok) {
826
 
      fprintf(stderr, "heap profiling not supported\n");
827
 
      Env::Default()->DeleteFile(fname);
828
 
    }
829
 
  }
830
 
};
831
 
 
832
 
}  // namespace leveldb
833
 
 
834
 
int main(int argc, char** argv) {
835
 
  FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
836
 
  FLAGS_open_files = leveldb::Options().max_open_files;
837
 
 
838
 
  for (int i = 1; i < argc; i++) {
839
 
    double d;
840
 
    int n;
841
 
    char junk;
842
 
    if (leveldb::Slice(argv[i]).starts_with("--benchmarks=")) {
843
 
      FLAGS_benchmarks = argv[i] + strlen("--benchmarks=");
844
 
    } else if (sscanf(argv[i], "--compression_ratio=%lf%c", &d, &junk) == 1) {
845
 
      FLAGS_compression_ratio = d;
846
 
    } else if (sscanf(argv[i], "--histogram=%d%c", &n, &junk) == 1 &&
847
 
               (n == 0 || n == 1)) {
848
 
      FLAGS_histogram = n;
849
 
    } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
850
 
               (n == 0 || n == 1)) {
851
 
      FLAGS_use_existing_db = n;
852
 
    } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
853
 
      FLAGS_num = n;
854
 
    } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
855
 
      FLAGS_reads = n;
856
 
    } else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
857
 
      FLAGS_threads = n;
858
 
    } else if (sscanf(argv[i], "--value_size=%d%c", &n, &junk) == 1) {
859
 
      FLAGS_value_size = n;
860
 
    } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
861
 
      FLAGS_write_buffer_size = n;
862
 
    } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) {
863
 
      FLAGS_cache_size = n;
864
 
    } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
865
 
      FLAGS_open_files = n;
866
 
    } else if (strncmp(argv[i], "--db=", 5) == 0) {
867
 
      FLAGS_db = argv[i] + 5;
868
 
    } else {
869
 
      fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
870
 
      exit(1);
871
 
    }
872
 
  }
873
 
 
874
 
  leveldb::Benchmark benchmark;
875
 
  benchmark.Run();
876
 
  return 0;
877
 
}