107
static bool AfterFile(const Comparator* ucmp,
108
const Slice* user_key, const FileMetaData* f) {
109
// NULL user_key occurs before all keys and is therefore never after *f
110
return (user_key != NULL &&
111
ucmp->Compare(*user_key, f->largest.user_key()) > 0);
114
static bool BeforeFile(const Comparator* ucmp,
115
const Slice* user_key, const FileMetaData* f) {
116
// NULL user_key occurs after all keys and is therefore never before *f
117
return (user_key != NULL &&
118
ucmp->Compare(*user_key, f->smallest.user_key()) < 0);
99
121
bool SomeFileOverlapsRange(
100
122
const InternalKeyComparator& icmp,
123
bool disjoint_sorted_files,
101
124
const std::vector<FileMetaData*>& files,
102
const Slice& smallest_user_key,
103
const Slice& largest_user_key) {
104
// Find the earliest possible internal key for smallest_user_key
105
InternalKey small(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
106
const uint32_t index = FindFile(icmp, files, small.Encode());
107
return ((index < files.size()) &&
108
icmp.user_comparator()->Compare(
109
largest_user_key, files[index]->smallest.user_key()) >= 0);
125
const Slice* smallest_user_key,
126
const Slice* largest_user_key) {
127
const Comparator* ucmp = icmp.user_comparator();
128
if (!disjoint_sorted_files) {
129
// Need to check against all files
130
for (int i = 0; i < files.size(); i++) {
131
const FileMetaData* f = files[i];
132
if (AfterFile(ucmp, smallest_user_key, f) ||
133
BeforeFile(ucmp, largest_user_key, f)) {
136
return true; // Overlap
142
// Binary search over file list
144
if (smallest_user_key != NULL) {
145
// Find the earliest possible internal key for smallest_user_key
146
InternalKey small(*smallest_user_key, kMaxSequenceNumber,kValueTypeForSeek);
147
index = FindFile(icmp, files, small.Encode());
150
if (index >= files.size()) {
151
// beginning of range is after all files, so no overlap.
155
return !BeforeFile(ucmp, largest_user_key, files[index]);
112
158
// An internal iterator. For a given version/level pair, yields
207
253
// If "*iter" points at a value or deletion for user_key, store
208
254
// either the value, or a NotFound error and return true.
209
255
// Else return false.
210
static bool GetValue(Iterator* iter, const Slice& user_key,
256
static bool GetValue(const Comparator* cmp,
257
Iterator* iter, const Slice& user_key,
211
258
std::string* value,
213
260
if (!iter->Valid()) {
360
407
bool Version::OverlapInLevel(int level,
361
const Slice& smallest_user_key,
362
const Slice& largest_user_key) {
363
return SomeFileOverlapsRange(vset_->icmp_, files_[level],
408
const Slice* smallest_user_key,
409
const Slice* largest_user_key) {
410
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
411
smallest_user_key, largest_user_key);
414
int Version::PickLevelForMemTableOutput(
415
const Slice& smallest_user_key,
416
const Slice& largest_user_key) {
418
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
419
// Push to next level if there is no overlap in next level,
420
// and the #bytes overlapping in the level after that are limited.
421
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
422
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
423
std::vector<FileMetaData*> overlaps;
424
while (level < config::kMaxMemCompactLevel) {
425
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
428
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
429
const int64_t sum = TotalFileSize(overlaps);
430
if (sum > kMaxGrandParentOverlapBytes) {
439
// Store in "*inputs" all files in "level" that overlap [begin,end]
440
void Version::GetOverlappingInputs(
442
const InternalKey* begin,
443
const InternalKey* end,
444
std::vector<FileMetaData*>* inputs) {
446
Slice user_begin, user_end;
448
user_begin = begin->user_key();
451
user_end = end->user_key();
453
const Comparator* user_cmp = vset_->icmp_.user_comparator();
454
for (size_t i = 0; i < files_[level].size(); ) {
455
FileMetaData* f = files_[level][i++];
456
const Slice file_start = f->smallest.user_key();
457
const Slice file_limit = f->largest.user_key();
458
if (begin != NULL && user_cmp->Compare(file_limit, user_begin) < 0) {
459
// "f" is completely before specified range; skip it
460
} else if (end != NULL && user_cmp->Compare(file_start, user_end) > 0) {
461
// "f" is completely after specified range; skip it
463
inputs->push_back(f);
465
// Level-0 files may overlap each other. So check if the newly
466
// added file has expanded the range. If so, restart search.
467
if (begin != NULL && user_cmp->Compare(file_start, user_begin) < 0) {
468
user_begin = file_start;
471
} else if (end != NULL && user_cmp->Compare(file_limit, user_end) > 0) {
472
user_end = file_limit;
368
481
std::string Version::DebugString() const {
540
653
const InternalKey& this_begin = v->files_[level][i]->smallest;
541
654
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
542
655
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
543
EscapeString(prev_end.Encode()).c_str(),
544
EscapeString(this_begin.Encode()).c_str());
656
prev_end.DebugString().c_str(),
657
this_begin.DebugString().c_str());
967
1072
for (int level = 1; level < config::kNumLevels - 1; level++) {
968
1073
for (size_t i = 0; i < current_->files_[level].size(); i++) {
969
1074
const FileMetaData* f = current_->files_[level][i];
970
GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps);
1075
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
971
1077
const int64_t sum = TotalFileSize(overlaps);
972
1078
if (sum > result) {
980
// Store in "*inputs" all files in "level" that overlap [begin,end]
981
void VersionSet::GetOverlappingInputs(
983
const InternalKey& begin,
984
const InternalKey& end,
985
std::vector<FileMetaData*>* inputs) {
987
Slice user_begin = begin.user_key();
988
Slice user_end = end.user_key();
989
const Comparator* user_cmp = icmp_.user_comparator();
990
for (size_t i = 0; i < current_->files_[level].size(); i++) {
991
FileMetaData* f = current_->files_[level][i];
992
if (user_cmp->Compare(f->largest.user_key(), user_begin) < 0 ||
993
user_cmp->Compare(f->smallest.user_key(), user_end) > 0) {
994
// Either completely before or after range; skip it
996
inputs->push_back(f);
1001
1086
// Stores the minimal range that covers all entries in inputs in
1002
1087
// *smallest, *largest.
1003
1088
// REQUIRES: inputs is not empty
1113
1198
// Note that the next call will discard the file we placed in
1114
1199
// c->inputs_[0] earlier and replace it with an overlapping set
1115
1200
// which will include the picked file.
1116
GetOverlappingInputs(0, smallest, largest, &c->inputs_[0]);
1201
current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
1117
1202
assert(!c->inputs_[0].empty());
1127
1212
InternalKey smallest, largest;
1128
1213
GetRange(c->inputs_[0], &smallest, &largest);
1130
GetOverlappingInputs(level+1, smallest, largest, &c->inputs_[1]);
1215
current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1]);
1132
1217
// Get entire range covered by compaction
1133
1218
InternalKey all_start, all_limit;
1137
1222
// changing the number of "level+1" files we pick up.
1138
1223
if (!c->inputs_[1].empty()) {
1139
1224
std::vector<FileMetaData*> expanded0;
1140
GetOverlappingInputs(level, all_start, all_limit, &expanded0);
1225
current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0);
1141
1226
if (expanded0.size() > c->inputs_[0].size()) {
1142
1227
InternalKey new_start, new_limit;
1143
1228
GetRange(expanded0, &new_start, &new_limit);
1144
1229
std::vector<FileMetaData*> expanded1;
1145
GetOverlappingInputs(level+1, new_start, new_limit, &expanded1);
1230
current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
1146
1232
if (expanded1.size() == c->inputs_[1].size()) {
1147
1233
Log(options_->info_log,
1148
1234
"Expanding@%d %d+%d to %d+%d\n",
1163
1249
// Compute the set of grandparent files that overlap this compaction
1164
1250
// (parent == level+1; grandparent == level+2)
1165
1251
if (level + 2 < config::kNumLevels) {
1166
GetOverlappingInputs(level + 2, all_start, all_limit, &c->grandparents_);
1252
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
1170
1257
Log(options_->info_log, "Compacting %d '%s' .. '%s'",
1172
EscapeString(smallest.Encode()).c_str(),
1173
EscapeString(largest.Encode()).c_str());
1259
smallest.DebugString().c_str(),
1260
largest.DebugString().c_str());
1176
1263
// Update the place where we will do the next compaction for this level.
1184
1271
Compaction* VersionSet::CompactRange(
1186
const InternalKey& begin,
1187
const InternalKey& end) {
1273
const InternalKey* begin,
1274
const InternalKey* end) {
1188
1275
std::vector<FileMetaData*> inputs;
1189
GetOverlappingInputs(level, begin, end, &inputs);
1276
current_->GetOverlappingInputs(level, begin, end, &inputs);
1190
1277
if (inputs.empty()) {
1281
// Avoid compacting too much in one shot in case the range is large.
1282
const uint64_t limit = MaxFileSizeForLevel(level);
1284
for (int i = 0; i < inputs.size(); i++) {
1285
uint64_t s = inputs[i]->file_size;
1287
if (total >= limit) {
1288
inputs.resize(i + 1);
1194
1293
Compaction* c = new Compaction(level);
1195
1294
c->input_version_ = current_;
1196
1295
c->input_version_->Ref();