1
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file. See the AUTHORS file for names of contributors.
5
#include "db/log_reader.h"
6
#include "db/log_writer.h"
7
#include "leveldb/env.h"
8
#include "util/coding.h"
9
#include "util/crc32c.h"
10
#include "util/random.h"
11
#include "util/testharness.h"
16
// Construct a string of exactly n bytes by repeating partial_string and
// cutting off whatever overshoots.  partial_string must be non-empty when
// n > 0; otherwise the loop below can never reach the requested length.
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string result;
  while (result.size() < n) {
    result.append(partial_string);
  }
  // The last append may have gone past n; trim back to the requested size.
  result.resize(n);
  return result;
}
27
// Construct a string from a number: the decimal form of n followed by '.'.
static std::string NumberString(int n) {
  // Comfortably larger than the longest "%d." rendering of an int plus NUL.
  char buf[50];
  snprintf(buf, sizeof(buf), "%d.", n);
  return std::string(buf);
}
34
// Return a skewed potentially long string
35
// The text is NumberString(i) repeated by BigString; the requested length is
// whatever rnd->Skewed(17) produces for this call.
static std::string RandomSkewedString(int i, Random* rnd) {
36
return BigString(NumberString(i), rnd->Skewed(17));
41
// WritableFile that keeps everything in memory: Append() accumulates bytes in
// contents_, while Close(), Flush() and Sync() do nothing and return OK.
class StringDest : public WritableFile {
43
// Every byte appended so far, in order.
std::string contents_;
45
virtual Status Close() { return Status::OK(); }
46
virtual Status Flush() { return Status::OK(); }
47
virtual Status Sync() { return Status::OK(); }
48
// Copies the bytes of slice onto the end of contents_.
virtual Status Append(const Slice& slice) {
49
contents_.append(slice.data(), slice.size());
54
// SequentialFile backed by an in-memory buffer (contents_) that Read() and
// Skip() consume from the front.
class StringSource : public SequentialFile {
58
// Set once Read() has reported an error or come up short; a further Read()
// after that trips the assertion at the top of Read().
bool returned_partial_;
59
StringSource() : force_error_(false), returned_partial_(false) { }
61
// Hands out bytes from the front of contents_ through *result and removes
// them from contents_.
virtual Status Read(size_t n, Slice* result, char* scratch) {
62
ASSERT_TRUE(!returned_partial_) << "must not Read() after eof/error";
66
// Error path: remember that reading is over and report corruption.
// NOTE(review): the condition guarding this path is not visible in this
// excerpt; presumably it tests force_error_.
returned_partial_ = true;
67
return Status::Corruption("read error");
70
// Fewer than n bytes remain, so this read comes up short.
if (contents_.size() < n) {
72
returned_partial_ = true;
74
*result = Slice(contents_.data(), n);
75
contents_.remove_prefix(n);
79
// Discards n bytes from the front of contents_; asking for more than is
// left yields NotFound.
virtual Status Skip(uint64_t n) {
80
if (n > contents_.size()) {
82
return Status::NotFound("in-memory file skipped past end");
85
contents_.remove_prefix(n);
91
// Reader::Reporter that records what it is told: byte counts are summed and
// status texts are concatenated.
class ReportCollector : public Reader::Reporter {
93
// Sum of the byte counts passed to Corruption().
size_t dropped_bytes_;
96
ReportCollector() : dropped_bytes_(0) { }
97
// Adds bytes to dropped_bytes_ and appends status.ToString() to message_.
virtual void Corruption(size_t bytes, const Status& status) {
98
dropped_bytes_ += bytes;
99
message_.append(status.ToString());
104
// In-memory input file; its address is handed to the Reader objects.
StringSource source_;
105
// Collects the corruption reports; its address is handed to the Readers too.
ReportCollector report_;
110
// Record metadata for testing initial offset functionality
111
// Payload length of each record written by WriteInitialOffsetLog().
static size_t initial_offset_record_sizes_[];
112
// Values compared against Reader::LastRecordOffset() in
// CheckInitialOffsetRecord(), indexed the same way as the sizes.
static uint64_t initial_offset_last_record_offsets_[];
115
// Starts with reading_ false (so Write() is allowed) and builds reader_ over
// source_ and report_ with checksum checking on and an initial offset of 0.
LogTest() : reading_(false),
117
reader_(&source_, &report_, true/*checksum*/,
118
0/*initial_offset*/) {
121
// Adds msg to the log as one record through writer_.  Asserts that reading
// has not started yet.
void Write(const std::string& msg) {
122
ASSERT_TRUE(!reading_) << "Write() after starting to read";
123
writer_.AddRecord(Slice(msg));
126
// Number of bytes currently held in dest_.contents_.
size_t WrittenBytes() const {
127
return dest_.contents_.size();
133
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt; the tests call a Read() helper, which this presumably belongs to.
// Point the source at the bytes accumulated in dest_.contents_.
source_.contents_ = Slice(dest_.contents_);
137
// A successfully read record is returned to the caller as a std::string.
if (reader_.ReadRecord(&record, &scratch)) {
138
return record.ToString();
144
// Adds delta to the byte at index offset of the written data.
void IncrementByte(int offset, int delta) {
145
dest_.contents_[offset] += delta;
148
// Overwrites the byte at index offset of the written data with new_byte.
void SetByte(int offset, char new_byte) {
149
dest_.contents_[offset] = new_byte;
152
// Cuts the given number of bytes off the end of the written data.
void ShrinkSize(int bytes) {
153
dest_.contents_.resize(dest_.contents_.size() - bytes);
156
// Recomputes the checksum for the record whose header starts at
// header_offset and writes it back at the start of that header.  len is the
// number of bytes that follow the type byte and are covered by the checksum.
void FixChecksum(int header_offset, int len) {
157
// Compute crc over the type byte (header[6]) and the len bytes after it
158
uint32_t crc = crc32c::Value(&dest_.contents_[header_offset+6], 1 + len);
159
// The masked form of the crc is what gets stored.
crc = crc32c::Mask(crc);
160
EncodeFixed32(&dest_.contents_[header_offset], crc);
164
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt.  Sets the source's force_error_ flag.
source_.force_error_ = true;
167
// Total number of bytes that report_ has been told were dropped.
size_t DroppedBytes() const {
168
return report_.dropped_bytes_;
171
// Copy of the message text accumulated in report_.
std::string ReportMessage() const {
172
return report_.message_;
175
// Returns OK iff recorded error message contains "msg"
176
// If msg does not occur anywhere in the recorded message, the whole recorded
// message is handed back instead.
std::string MatchError(const std::string& msg) const {
177
if (report_.message_.find(msg) == std::string::npos) {
178
return report_.message_;
184
// Builds four records for the initial-offset tests: record i consists of
// initial_offset_record_sizes_[i] copies of the character 'a' + i.
void WriteInitialOffsetLog() {
185
for (int i = 0; i < 4; i++) {
186
std::string record(initial_offset_record_sizes_[i],
187
static_cast<char>('a' + i));
192
// Writes the initial-offset log, then opens a separate Reader whose initial
// offset is WrittenBytes() + offset_past_end and asserts that ReadRecord()
// returns false on it.
void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
193
WriteInitialOffsetLog();
195
// Feed the reader the bytes that were just written.
source_.contents_ = Slice(dest_.contents_);
196
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
197
WrittenBytes() + offset_past_end);
200
ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
201
delete offset_reader;
204
// Writes the initial-offset log, reads one record through a separate Reader
// and checks it against entry expected_record_offset of the metadata tables
// (initial_offset_record_sizes_ and initial_offset_last_record_offsets_) and
// against the expected first byte, 'a' + expected_record_offset.
void CheckInitialOffsetRecord(uint64_t initial_offset,
205
int expected_record_offset) {
206
WriteInitialOffsetLog();
208
// Feed the reader the bytes that were just written.
source_.contents_ = Slice(dest_.contents_);
209
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
213
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
214
ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
216
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
217
offset_reader->LastRecordOffset());
218
// Record i was filled with the character 'a' + i.
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
219
delete offset_reader;
224
// Payload lengths of the records produced by WriteInitialOffsetLog().
size_t LogTest::initial_offset_record_sizes_[] =
225
{10000, // Two sizable records in first block
227
2 * log::kBlockSize - 1000, // Span three blocks
230
// Offsets that CheckInitialOffsetRecord() compares with LastRecordOffset().
// Each visible entry adds up what precedes the record: one header plus 10000
// payload bytes for each of the first two records and, for the last entry,
// the (2 * kBlockSize - 1000)-byte record together with three headers.
uint64_t LogTest::initial_offset_last_record_offsets_[] =
233
2 * (kHeaderSize + 10000),
234
2 * (kHeaderSize + 10000) +
235
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
238
// Without any Write(), the very first Read() must report EOF.
TEST(LogTest, Empty) {
239
ASSERT_EQ("EOF", Read());
242
// Expects "foo", "bar", an empty record and "xxxx" in that order, then EOF.
TEST(LogTest, ReadWrite) {
247
ASSERT_EQ("foo", Read());
248
ASSERT_EQ("bar", Read());
249
ASSERT_EQ("", Read());
250
ASSERT_EQ("xxxx", Read());
251
ASSERT_EQ("EOF", Read());
252
ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
255
// Writes NumberString(0) .. NumberString(99999) and expects to read them
// back in the same order, followed by EOF.
TEST(LogTest, ManyBlocks) {
256
for (int i = 0; i < 100000; i++) {
257
Write(NumberString(i));
259
for (int i = 0; i < 100000; i++) {
260
ASSERT_EQ(NumberString(i), Read());
262
ASSERT_EQ("EOF", Read());
265
// Expects "small", then a 50000-byte and a 100000-byte record, then EOF.
TEST(LogTest, Fragmentation) {
267
Write(BigString("medium", 50000));
268
Write(BigString("large", 100000));
269
ASSERT_EQ("small", Read());
270
ASSERT_EQ(BigString("medium", 50000), Read());
271
ASSERT_EQ(BigString("large", 100000), Read());
272
ASSERT_EQ("EOF", Read());
275
// The first record is sized so that kBlockSize - kHeaderSize bytes have been
// written, i.e. exactly one header's worth of space is left in the block.
// The reads then expect that record, an empty record, "bar" and EOF.
TEST(LogTest, MarginalTrailer) {
276
// Make a trailer that is exactly the same length as an empty record.
277
const int n = kBlockSize - 2*kHeaderSize;
278
Write(BigString("foo", n));
279
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
282
ASSERT_EQ(BigString("foo", n), Read());
283
ASSERT_EQ("", Read());
284
ASSERT_EQ("bar", Read());
285
ASSERT_EQ("EOF", Read());
288
// Same first record as MarginalTrailer, but "bar" is expected right after
// it, and nothing may be reported as dropped along the way.
TEST(LogTest, MarginalTrailer2) {
289
// Make a trailer that is exactly the same length as an empty record.
290
const int n = kBlockSize - 2*kHeaderSize;
291
Write(BigString("foo", n));
292
ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
294
ASSERT_EQ(BigString("foo", n), Read());
295
ASSERT_EQ("bar", Read());
296
ASSERT_EQ("EOF", Read());
297
// No bytes dropped and no report text recorded.
ASSERT_EQ(0, DroppedBytes());
298
ASSERT_EQ("", ReportMessage());
301
// The first record leaves only kHeaderSize - 4 bytes in the first block,
// which is less than one header.  The reads expect that record, an empty
// record, "bar" and EOF.
TEST(LogTest, ShortTrailer) {
302
const int n = kBlockSize - 2*kHeaderSize + 4;
303
Write(BigString("foo", n));
304
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
307
ASSERT_EQ(BigString("foo", n), Read());
308
ASSERT_EQ("", Read());
309
ASSERT_EQ("bar", Read());
310
ASSERT_EQ("EOF", Read());
313
// Same record size as ShortTrailer, but nothing else is read afterwards:
// the read following the record must be EOF.
TEST(LogTest, AlignedEof) {
314
const int n = kBlockSize - 2*kHeaderSize + 4;
315
Write(BigString("foo", n));
316
ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
317
ASSERT_EQ(BigString("foo", n), Read());
318
ASSERT_EQ("EOF", Read());
321
// Writes N random skewed strings and checks them on the way back.  Both
// generators are seeded with 301 so the expected strings can be regenerated.
TEST(LogTest, RandomRead) {
323
Random write_rnd(301);
324
for (int i = 0; i < N; i++) {
325
Write(RandomSkewedString(i, &write_rnd));
327
Random read_rnd(301);
328
for (int i = 0; i < N; i++) {
329
ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
331
ASSERT_EQ("EOF", Read());
334
// Tests of all the error paths in log_reader.cc follow:
336
// Expects EOF, kBlockSize dropped bytes and a report containing
// "read error".
TEST(LogTest, ReadError) {
339
ASSERT_EQ("EOF", Read());
340
ASSERT_EQ(kBlockSize, DroppedBytes());
341
ASSERT_EQ("OK", MatchError("read error"));
344
// Adds 100 to the type byte of the first header; expects EOF, 3 dropped
// bytes and a report containing "unknown record type".
TEST(LogTest, BadRecordType) {
346
// Type is stored in header[6]
347
IncrementByte(6, 100);
349
ASSERT_EQ("EOF", Read());
350
ASSERT_EQ(3, DroppedBytes());
351
ASSERT_EQ("OK", MatchError("unknown record type"));
354
// Cuts 4 bytes off the end of the log; expects EOF, kHeaderSize - 1 dropped
// bytes and a report containing "truncated record at end of file".
TEST(LogTest, TruncatedTrailingRecord) {
356
ShrinkSize(4); // Drop all payload as well as a header byte
357
ASSERT_EQ("EOF", Read());
358
ASSERT_EQ(kHeaderSize - 1, DroppedBytes());
359
ASSERT_EQ("OK", MatchError("truncated record at end of file"));
362
// Expects EOF, kHeaderSize + 2 dropped bytes and a report containing
// "bad record length".
TEST(LogTest, BadLength) {
365
ASSERT_EQ("EOF", Read());
366
ASSERT_EQ(kHeaderSize + 2, DroppedBytes());
367
ASSERT_EQ("OK", MatchError("bad record length"));
370
// Adds 10 to byte 0 of the log (FixChecksum() stores the crc at the start of
// the header); expects EOF, 10 dropped bytes and "checksum mismatch".
TEST(LogTest, ChecksumMismatch) {
372
IncrementByte(0, 10);
373
ASSERT_EQ("EOF", Read());
374
ASSERT_EQ(10, DroppedBytes());
375
ASSERT_EQ("OK", MatchError("checksum mismatch"));
378
// Rewrites the first header's type byte to kMiddleType; expects EOF, 3
// dropped bytes and a report containing "missing start".
TEST(LogTest, UnexpectedMiddleType) {
380
SetByte(6, kMiddleType);
382
ASSERT_EQ("EOF", Read());
383
ASSERT_EQ(3, DroppedBytes());
384
ASSERT_EQ("OK", MatchError("missing start"));
387
// Rewrites the first header's type byte to kLastType; expects EOF, 3 dropped
// bytes and a report containing "missing start".
TEST(LogTest, UnexpectedLastType) {
389
SetByte(6, kLastType);
391
ASSERT_EQ("EOF", Read());
392
ASSERT_EQ(3, DroppedBytes());
393
ASSERT_EQ("OK", MatchError("missing start"));
396
// Rewrites the first header's type byte to kFirstType.  "bar" must still be
// returned, followed by EOF, with 3 dropped bytes and a report containing
// "partial record without end".
TEST(LogTest, UnexpectedFullType) {
399
SetByte(6, kFirstType);
401
ASSERT_EQ("bar", Read());
402
ASSERT_EQ("EOF", Read());
403
ASSERT_EQ(3, DroppedBytes());
404
ASSERT_EQ("OK", MatchError("partial record without end"));
407
// Like UnexpectedFullType, but the record that must survive is a
// 100000-byte one.
TEST(LogTest, UnexpectedFirstType) {
409
Write(BigString("bar", 100000));
410
SetByte(6, kFirstType);
412
ASSERT_EQ(BigString("bar", 100000), Read());
413
ASSERT_EQ("EOF", Read());
414
ASSERT_EQ(3, DroppedBytes());
415
ASSERT_EQ("OK", MatchError("partial record without end"));
418
TEST(LogTest, ErrorJoinsRecords) {
419
// Consider two fragmented records:
420
// first(R1) last(R1) first(R2) last(R2)
421
// where the middle two fragments disappear. We do not want
422
// first(R1),last(R2) to get joined and returned as a valid record.
424
// Write records that span two blocks
425
Write(BigString("foo", kBlockSize));
426
Write(BigString("bar", kBlockSize));
429
// Wipe the middle block
430
for (int offset = kBlockSize; offset < 2*kBlockSize; offset++) {
431
SetByte(offset, 'x');
434
// Only "correct" may come back; neither wiped record may be returned.
ASSERT_EQ("correct", Read());
435
ASSERT_EQ("EOF", Read());
436
// The dropped total must lie in [2*kBlockSize, 2*kBlockSize + 100].
const int dropped = DroppedBytes();
437
ASSERT_LE(dropped, 2*kBlockSize + 100);
438
ASSERT_GE(dropped, 2*kBlockSize);
441
// Initial offset 0: the record at index 0 is expected.
TEST(LogTest, ReadStart) {
442
CheckInitialOffsetRecord(0, 0);
445
// Initial offset 1, one byte into the log: the record at index 1 is expected.
TEST(LogTest, ReadSecondOneOff) {
446
CheckInitialOffsetRecord(1, 1);
449
// Initial offset 10000: the record at index 1 is expected.
TEST(LogTest, ReadSecondTenThousand) {
450
CheckInitialOffsetRecord(10000, 1);
453
// Initial offset 10007 -- presumably kHeaderSize + 10000, the first byte of
// the record at index 1 (kHeaderSize's value is not visible here).
TEST(LogTest, ReadSecondStart) {
454
CheckInitialOffsetRecord(10007, 1);
457
// Initial offset 10008, one byte past the ReadSecondStart offset: the record
// at index 2 is expected.
TEST(LogTest, ReadThirdOneOff) {
458
CheckInitialOffsetRecord(10008, 2);
461
// Initial offset 20014 -- presumably 2 * (kHeaderSize + 10000), the value
// listed in initial_offset_last_record_offsets_ for the record at index 2.
TEST(LogTest, ReadThirdStart) {
462
CheckInitialOffsetRecord(20014, 2);
465
// Initial offset 20015, one byte past the ReadThirdStart offset: the record
// at index 3 is expected.
TEST(LogTest, ReadFourthOneOff) {
466
CheckInitialOffsetRecord(20015, 3);
469
// Initial offset four bytes before the end of the first block: the record at
// index 3 is expected.
TEST(LogTest, ReadFourthFirstBlockTrailer) {
470
CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
473
// Initial offset one byte into the second block: the record at index 3 is
// expected.
TEST(LogTest, ReadFourthMiddleBlock) {
474
CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
477
// Initial offset one byte into the third block: the record at index 3 is
// expected.
TEST(LogTest, ReadFourthLastBlock) {
478
CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
481
// Start reading exactly at the first byte of the record at index 3.  The
// offset is the same sum as the last entry of
// initial_offset_last_record_offsets_: two 10000-byte records with one header
// each, plus the (2 * kBlockSize - 1000)-byte record with its three headers.
// (Using 1000 instead of 10000 would start inside the previous record.)
TEST(LogTest, ReadFourthStart) {
482
CheckInitialOffsetRecord(
483
2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
487
// Initial offset exactly at the end of the written data: no record may be
// returned.
TEST(LogTest, ReadEnd) {
488
CheckOffsetPastEndReturnsNoRecords(0);
491
// Initial offset 5 bytes beyond the end of the written data: no record may
// be returned.
TEST(LogTest, ReadPastEnd) {
492
CheckOffsetPastEndReturnsNoRecords(5);
496
} // namespace leveldb
498
// Entry point: the result of leveldb::test::RunAllTests() becomes the
// process exit status.  argc and argv are not used.
int main(int argc, char** argv) {
499
return leveldb::test::RunAllTests();