/*
 * Copyright (C) 2015 Canonical Ltd.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License version 3 as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * Authored by: Michi Henning <michi.henning@canonical.com>
 */

#include <core/internal/persistent_string_cache_impl.h>
#include <core/internal/persistent_string_cache_stats.h>
#include <leveldb/cache.h>
#include <leveldb/write_batch.h>
#include <system_error>

We have three tables and two secondary indexes in the DB:

- Key -> Value
  The Values table maps keys to values.

- Key -> <Access time, Expiry time, Size>
  The Data table maps keys to the access time, expiry time,
  and entry size. (Size is the sum of key, value, and metadata sizes.)

- Key -> Metadata
  The Metadata table maps keys to metadata for the entry.
  If no metadata exists for an entry, there is no row
  in the table.

- <Access time, Key> -> Size
  The Atime index provides access in order of oldest-to-newest access time.
  This allows efficient trimming based on LRU order.

- <Expiry time, Key> -> Size
  The Etime index provides access in order of soonest-to-latest expiry time.
  This allows efficient trimming of expired entries. For lru_only,
  no entry is added to this index, and the corresponding expiry time in the
  Data table is 0. For lru_ttl, only entries that actually
  do have an expiry time are added.

The tables and indexes each map to a different region of the leveldb key space based on a prefix.
Tuple entries are separated by spaces. Entries are sorted in lexicographical order
by the DB; to ensure correct numerical comparison for the secondary indexes,
times are in milliseconds since the epoch, with fixed-width zero padding
and 13 decimal digits. (That works out to more than 316 years past the epoch.)
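
To see why the padding matters, here is a minimal sketch of the encoding. The helper names encode_time_key and encode_unpadded are made up for this illustration and are not part of the cache; the padded form mirrors the 13-digit, space-separated layout described above.

```cpp
#include <cassert>
#include <cstdint>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: 13 zero-padded decimal digits, a space, then the key.
std::string encode_time_key(std::int64_t msec, std::string const& key)
{
    assert(msec >= 0);  // Negative times would break the fixed-width layout.
    std::ostringstream os;
    os << std::setfill('0') << std::setw(13) << msec << " " << key;
    return os.str();
}

// Hypothetical counter-example: the same tuple without padding.
std::string encode_unpadded(std::int64_t msec, std::string const& key)
{
    return std::to_string(msec) + " " + key;
}
```

With padding, the key for time 20 compares less than the key for time 100, so lexicographic order matches numeric order. Without padding, the key for time 100 sorts first because '1' < '2'.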

Some examples to illustrate how it hangs together with lru_ttl. (Note that,
in reality, all tables and indexes sit inside the single leveldb table, separated
by the prefixes of their keys. They are shown as separate tables below to
make things easier to read.)
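
As a rough illustration of prefix partitioning, the sketch below uses a std::map as a stand-in for the sorted leveldb key space. The Db alias, the helpers keys_in_range and make_example_db, and the string layout of the Data rows are assumptions made for this example only.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Stand-in for the single sorted key space (assumption: plain std::map).
using Db = std::map<std::string, std::string>;

// Returns all keys in [begin_prefix, end_prefix), that is, one logical table.
std::vector<std::string> keys_in_range(Db const& db,
                                       std::string const& begin_prefix,
                                       std::string const& end_prefix)
{
    assert(begin_prefix < end_prefix);
    std::vector<std::string> keys;
    for (auto it = db.lower_bound(begin_prefix); it != db.end() && it->first < end_prefix; ++it)
    {
        keys.push_back(it->first);
    }
    return keys;
}

// A few rows per region; the Data row format is illustrative only.
Db make_example_db()
{
    return Db{
        {"AAndy", "Koenig"},
        {"ABjarne", "Stroustrup"},
        {"BAndy", "20 2020 10"},
        {"BBjarne", "10 1010 16"},
        {"D0000000000010 Bjarne", "16"},
        {"D0000000000020 Andy", "10"},
        {"E0000000001010 Bjarne", "16"},
        {"E0000000002020 Andy", "10"},
    };
}
```

Scanning from "A" up to (but excluding) "B" yields only the Values rows, and scanning from "D" up to "E" yields the Atime index in oldest-first order.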

At time 0010, insert Bjarne -> Stroustrup, expires 1010,
at time 0020, insert Andy -> Koenig, expires 2020,
at time 0030, insert Scott -> Meyers, does not expire,
at time 0040, insert Stan -> Lippman, expires 1040.

Values table:            Data table:

Key     | Value          Key     | Access time | Expiry time | Size
--------+-----------     --------+---------------------------------
AAndy   | Koenig         BAndy   |          20 |        2020 |   10
ABjarne | Stroustrup     BBjarne |          10 |        1010 |   16
AScott  | Meyers         BScott  |          30 |           0 |   11
AStan   | Lippman        BStan   |          40 |        1040 |   11

Atime index:                    Etime index:

Key                   | Size    Key                   | Size
----------------------+-----    ----------------------+-----
D0000000000010 Bjarne |   16    E0000000001010 Bjarne |   16
D0000000000020 Andy   |   10    E0000000001040 Stan   |   11
D0000000000030 Scott  |   11    E0000000002020 Andy   |   10
D0000000000040 Stan   |   11

Note that, because the expiry time for Scott is infinite, no entry appears in the Etime index.

At time 100, we call get("Bjarne"). This updates the Data table and Atime index with the new access time.
(The Values table and Etime index are unchanged.)

Values table:            Data table:

Key     | Value          Key     | Access time | Expiry time | Size
--------+-----------     --------+---------------------------------
AAndy   | Koenig         BAndy   |          20 |        2020 |   10
ABjarne | Stroustrup     BBjarne |         100 |        1010 |   16
AScott  | Meyers         BScott  |          30 |           0 |   11
AStan   | Lippman        BStan   |          40 |        1040 |   11

Atime index:                    Etime index:

Key                   | Size    Key                   | Size
----------------------+-----    ----------------------+-----
D0000000000020 Andy   |   10    E0000000001010 Bjarne |   16
D0000000000030 Scott  |   11    E0000000001040 Stan   |   11
D0000000000040 Stan   |   11    E0000000002020 Andy   |   10
D0000000000100 Bjarne |   16

In other words, the Atime and Etime indexes are always sorted in earliest-to-latest order
of access time and expiry time, respectively; this allows us to efficiently trim the cache
once it is full. The sizes are stored redundantly so we can efficiently determine the point
at which we have removed enough entries to make room for a new one.
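
The role of the redundant sizes can be sketched as follows. This is a simplified illustration, not the cache's actual trimming code: it ignores the Etime index and the option to protect one key from eviction, and the names AtimeIndex and evict_at_least are invented for the example. Index keys are shown without the "D" prefix.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Stand-in for the Atime index: "<padded time> <key>" -> entry size.
using AtimeIndex = std::map<std::string, std::int64_t>;

// Removes the oldest entries until at least bytes_needed bytes are freed
// (or the index is empty). Returns the index keys that were evicted.
std::vector<std::string> evict_at_least(AtimeIndex& index, std::int64_t bytes_needed)
{
    assert(bytes_needed >= 0);
    std::vector<std::string> evicted;
    std::int64_t freed = 0;
    auto it = index.begin();
    while (it != index.end() && freed < bytes_needed)
    {
        freed += it->second;  // Size is read from the index itself; no second lookup.
        evicted.push_back(it->first);
        it = index.erase(it);
    }
    return evicted;
}
```

Applied to the Atime index after the get("Bjarne") call above, a request to free 15 bytes evicts Andy (10 bytes) and then Scott (11 bytes), and stops there.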

If this example were to use lru_only, the Etime index would remain empty, and the expiry times
in the Data table would all be
chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::time_point().time_since_epoch()).count().
That value is zero, because a default-constructed time_point represents the clock's epoch.
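
A minimal sketch of how such a "no expiry" sentinel can be computed and tested. The names ticks_ms, no_expiry_ticks, and has_expiry are hypothetical and chosen for this illustration. A default-constructed time_point holds a zero duration since its clock's epoch, so the sentinel evaluates to 0 here.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Milliseconds between the clock's epoch and tp.
std::int64_t ticks_ms(std::chrono::system_clock::time_point tp) noexcept
{
    return std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch()).count();
}

// Sentinel meaning "does not expire": the tick count of a default time_point.
std::int64_t no_expiry_ticks() noexcept
{
    return ticks_ms(std::chrono::system_clock::time_point());
}

// True if etime denotes a real expiry time rather than the sentinel.
bool has_expiry(std::int64_t etime) noexcept
{
    assert(etime >= 0);
    return etime != no_expiry_ticks();
}
```

In the example tables, Scott's expiry time of 0 is the sentinel, whereas Bjarne's 1010 is a real expiry time.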

The Metadata table simply maps each key to its metadata. If no metadata exists
for an entry, there is no corresponding row in the table. For example, if Andy
and Scott had metadata, but Stan and Bjarne didn't, we'd have:

static string const class_name = "PersistentStringCache"; // For exception messages

static leveldb::WriteOptions write_options;
static leveldb::ReadOptions read_options;

// Schema version. If the way things are written to leveldb changes, the
// schema version here must be changed, too. If an existing cache is opened
// with a different schema version, the cache is simply thrown away, so
// it will automatically be re-created using the latest schema.

static int const SCHEMA_VERSION = 2; // Increment whenever schema changes!

// Prefixes to divide the key space into logical tables/indexes.
// All prefixes must have length 1. The end prefix must be
// the "one past the end" value for the prefix range. For example,
// "B" is the first prefix that can't be an entry in the Values table.

// Do not change the prefix without also checking that ALL_BEGIN and
// ALL_END are still correct!

static string const VALUES_BEGIN = "A";
static string const VALUES_END = "B";

static string const DATA_BEGIN = "B";
static string const DATA_END = "C";

static string const METADATA_BEGIN = "C";
static string const METADATA_END = "D";

static string const ATIME_BEGIN = "D";
static string const ATIME_END = "E";

static string const ETIME_BEGIN = "E";
static string const ETIME_END = "F";

// We store the stats so they are not lost across process re-starts.

static string const STATS_BEGIN = "X";
static string const STATS_END = "Y";

// The settings range stores data about the cache itself, such as
// max size and expiration policy. The prefix for this
// range must be outside the range [ALL_BEGIN..ALL_END).
// The schema version is there so we can change the way things are written into leveldb
// and detect when an old cache is opened with a newer version.

static string const SETTINGS_BEGIN = "Y";
static string const SETTINGS_END = "Z";

// This key stores a dirty flag that we set after a successful open
// and clear in the destructor. This allows us to detect, on start-up,
// whether we shut down cleanly.

static string const DIRTY_FLAG = "!DIRTY";

// These span the entire range of keys in all tables and stats (except settings and dirty flag).

static string const ALL_BEGIN = VALUES_BEGIN; // Must be the lowest prefix for all tables and indexes, incl stats.
static string const ALL_END = SETTINGS_BEGIN; // Must be one past the highest prefix for all tables and indexes, incl stats.

static string const SETTINGS_MAX_SIZE = SETTINGS_BEGIN + "MAX_SIZE";
static string const SETTINGS_POLICY = SETTINGS_BEGIN + "POLICY";
static string const SETTINGS_SCHEMA_VERSION = SETTINGS_BEGIN + "SCHEMA_VERSION";

static string const STATS_VALUES = STATS_BEGIN + "VALUES";

// Simple struct to serialize/deserialize a time-key tuple.
// For the stringified representation, time and key are
// separated by a space.
int64_t time; // msec since the epoch
TimeKeyTuple(int64_t t, string const& s)
TimeKeyTuple(string const& s)
auto pos = s.find(' ');
assert(pos != string::npos);
string t(s.substr(0, pos));
key = s.substr(pos + 1);
TimeKeyTuple(TimeKeyTuple const&) = default;
TimeKeyTuple(TimeKeyTuple&&) = default;
TimeKeyTuple& operator=(TimeKeyTuple const&) = default;
TimeKeyTuple& operator=(TimeKeyTuple&&) = default;
string to_string() const
// We zero-fill the time, so entries collate lexicographically in old-to-new order.
os << setfill('0') << setw(13) << time << " " << key;

// Key creation methods. These methods return the key into the corresponding
// table or index with the correct prefix and with tuple keys concatenated
// with a space separator.

string k_data(string const& key)
return DATA_BEGIN + key;

string k_metadata(string const& key)
return METADATA_BEGIN + key;

string k_atime_index(int64_t atime, string const& key)
return ATIME_BEGIN + TimeKeyTuple(atime, key).to_string();

string k_etime_index(int64_t etime, string const& key)
return ETIME_BEGIN + TimeKeyTuple(etime, key).to_string();

// Little helpers to get milliseconds since the epoch.

int64_t ticks(chrono::time_point<chrono::system_clock> tp) noexcept
return chrono::duration_cast<chrono::milliseconds>(tp.time_since_epoch()).count();

int64_t now_ticks() noexcept
return ticks(chrono::system_clock::now());

// Zero: a default-constructed time_point represents the clock's epoch.
static auto const clock_origin = ticks(chrono::system_clock::time_point());

int64_t epoch_ticks() noexcept

typedef std::unique_ptr<leveldb::Iterator> IteratorUPtr;

// For assertions, so we can verify that num_entries_ matches the sum of entries in the histogram.

int64_t hist_sum(PersistentCacheStats::Histogram const& h) noexcept

void PersistentStringCacheImpl::init_stats()
// If we shut down cleanly last time, read the saved stats values.
bool is_dirty = read_dirty_flag();
// We didn't shut down cleanly or the cache is new.
// Run over the Atime index (it's smaller than the Data table)
// and count the number of entries and bytes, and initialize
// the histogram.
IteratorUPtr it(db_->NewIterator(read_options));
leveldb::Slice const atime_prefix(ATIME_BEGIN);
it->Seek(atime_prefix);
while (it->Valid() && it->key().starts_with(atime_prefix))
auto bytes = stoll(it->value().ToString());
stats_->hist_increment(bytes);
throw_if_error(it->status(), "cannot initialize cache");
stats_->num_entries_ = num;
stats_->cache_size_ = size;
assert(stats_->num_entries_ == hist_sum(stats_->hist_));

// Open existing database or create an empty one.

PersistentStringCacheImpl::PersistentStringCacheImpl(string const& cache_path,
                                                     int64_t max_size_in_bytes,
                                                     CacheDiscardPolicy policy,
                                                     PersistentStringCache* pimpl)
, stats_(make_shared<PersistentStringCacheStats>())
stats_->cache_path_ = cache_path;
if (max_size_in_bytes < 1)
throw_invalid_argument("invalid max_size_in_bytes (" + to_string(max_size_in_bytes) + "): value must be > 0");
stats_->max_cache_size_ = max_size_in_bytes;
stats_->policy_ = policy;
leveldb::Options options;
options.create_if_missing = true;
// For small caches, reduce memory consumption by reducing the size of the internal block cache.
// The block cache size is at least 512 kB. For caches 5-80 MB, it is 10% of the nominal cache size.
// For caches > 80 MB, the block cache is left at the default of 8 MB.
size_t block_cache_size = max_size_in_bytes / 10;
if (block_cache_size < 512 * 1024)
block_cache_size = 512 * 1024;
if (block_cache_size < 8 * 1024 * 1024)
block_cache_.reset(leveldb::NewLRUCache(block_cache_size));
options.block_cache = block_cache_.get();
check_version(); // Wipes DB if version doesn't match.
// For an already-existing cache, check that size and policy match.
if (stats_->max_cache_size_ != max_size_in_bytes)
throw_logic_error(string("existing cache opened with different max_size_in_bytes (") +
                  to_string(max_size_in_bytes) + "), existing size = " +
                  to_string(stats_->max_cache_size_));
if (stats_->policy_ != policy)
auto to_string = [](CacheDiscardPolicy p)
return p == core::CacheDiscardPolicy::lru_only ? "lru_only" : "lru_ttl";
string msg = string("existing cache opened with different policy (") + to_string(policy) +
             "), existing policy = " + to_string(stats_->policy_);
throw_logic_error(msg);
write_dirty_flag(true);

// Open existing database.

PersistentStringCacheImpl::PersistentStringCacheImpl(string const& cache_path, PersistentStringCache* pimpl)
, stats_(make_shared<PersistentStringCacheStats>())
stats_->cache_path_ = cache_path;
init_db(leveldb::Options()); // Throws if DB doesn't exist.
check_version(); // Wipes DB if version doesn't match.
write_dirty_flag(true);

PersistentStringCacheImpl::~PersistentStringCacheImpl()
write_dirty_flag(false);
catch (std::exception const& e)
cerr << make_message(string("~PersistentStringCacheImpl(): ") + e.what()) << endl;
cerr << make_message("~PersistentStringCacheImpl(): unknown exception") << endl;

bool PersistentStringCacheImpl::get(string const& key, string& value) const
return get(key, value, nullptr);

bool PersistentStringCacheImpl::get(string const& key, string& value, string* metadata) const
throw_invalid_argument("get(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
bool found = get_value_and_metadata(key, dt, value, metadata);
stats_->inc_misses();
call_handler(key, CacheEventIndex::miss);
// Don't return expired entry.
int64_t new_atime = now_ticks();
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= new_atime)
call_handler(key, CacheEventIndex::miss);
stats_->inc_misses();
leveldb::WriteBatch batch;
batch.Delete(k_atime_index(dt.atime, key)); // Delete old atime entry
dt.atime = new_atime;
dt.size = key.size() + value.size();
dt.size += metadata->size();
batch.Put(data_key, dt.to_string());
batch.Put(k_atime_index(dt.atime, key), to_string(dt.size));
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "get()");
call_handler(key, CacheEventIndex::get);

bool PersistentStringCacheImpl::get_metadata(string const& key, string& metadata) const
throw_invalid_argument("get_metadata(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
auto dt = get_data(data_key, found);
// Don't return expired entry.
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
auto s = db_->Get(read_options, k_metadata(key), &metadata);
throw_if_error(s, "get_metadata()");
return !s.IsNotFound();

bool PersistentStringCacheImpl::contains_key(string const& key) const
throw_invalid_argument("contains_key(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
auto dt = get_data(data_key, found);
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
return false; // Expired entries are not returned.

int64_t PersistentStringCacheImpl::size() const noexcept
lock_guard<decltype(mutex_)> lock(mutex_);
return stats_->num_entries_;

int64_t PersistentStringCacheImpl::size_in_bytes() const noexcept
lock_guard<decltype(mutex_)> lock(mutex_);
return stats_->cache_size_;

int64_t PersistentStringCacheImpl::max_size_in_bytes() const noexcept
lock_guard<decltype(mutex_)> lock(mutex_);
return stats_->max_cache_size_;

int64_t PersistentStringCacheImpl::disk_size_in_bytes() const
lock_guard<decltype(mutex_)> lock(mutex_);
leveldb::Range everything(ALL_BEGIN, SETTINGS_END);
array<uint64_t, 1> sizes = {{0}};
db_->GetApproximateSizes(&everything, 1, sizes.data());

CacheDiscardPolicy PersistentStringCacheImpl::discard_policy() const noexcept
return stats_->policy_; // Immutable

PersistentCacheStats PersistentStringCacheImpl::stats() const
lock_guard<decltype(mutex_)> lock(mutex_);
// We make a copy here so values can't change underneath the caller.
return PersistentCacheStats(make_shared<PersistentStringCacheStats>(*stats_));

bool PersistentStringCacheImpl::put(string const& key,
                                    chrono::time_point<chrono::system_clock> expiry_time)
return put(key, value.data(), value.size(), nullptr, 0, expiry_time);

bool PersistentStringCacheImpl::put(string const& key,
                                    char const* value_data,
                                    chrono::time_point<chrono::system_clock> expiry_time)
return put(key, value_data, value_size, nullptr, 0, expiry_time);

bool PersistentStringCacheImpl::put(string const& key,
                                    string const* metadata,
                                    chrono::time_point<chrono::system_clock> expiry_time)
value.data(), value.size(),
metadata->data(), metadata->size(),

bool PersistentStringCacheImpl::put(string const& key,
                                    char const* value_data,
                                    char const* metadata_data,
                                    int64_t metadata_size,
                                    chrono::time_point<chrono::system_clock> expiry_time)
throw_invalid_argument("put(): key must be non-empty");
throw_invalid_argument("put(): value must not be nullptr");
throw_invalid_argument("put(): invalid negative value size: " + to_string(value_size));
if (metadata_data && metadata_size < 0)
throw_invalid_argument("put(): invalid negative metadata size: " + to_string(metadata_size));
int64_t new_size = key.size() + value_size;
new_size += metadata_size;
if (new_size > stats_->max_cache_size_)
throw_logic_error(string("put(): cannot add ") + to_string(new_size) +
                  "-byte record to cache with maximum size of " + to_string(stats_->max_cache_size_));
auto etime = ticks(expiry_time);
if (stats_->policy_ == CacheDiscardPolicy::lru_only && etime != epoch_ticks())
throw_logic_error(string("put(): policy is lru_only, but expiry_time (") + to_string(etime) +
                  ") is not infinite");
lock_guard<decltype(mutex_)> lock(mutex_);
auto atime = now_ticks();
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && etime != epoch_ticks() && etime <= atime)
return false; // Already expired, so don't add it.
// The entry may or may not exist already.
// Work out how many bytes of space we need.
int64_t bytes_needed = new_size;
string prefixed_key = k_data(key);
auto old_data = get_data(prefixed_key, found);
bytes_needed = max(new_size - old_data.size, int64_t(0)); // new_size could be < old size
auto avail_bytes = stats_->max_cache_size_ - stats_->cache_size_;
// Make room to add or replace the entry.
if (bytes_needed > avail_bytes)
delete_at_least(bytes_needed - avail_bytes, key); // Don't delete the entry about to be updated!
leveldb::WriteBatch batch;
// Update the Data table.
DataTuple new_meta(atime, etime, new_size);
batch.Put(prefixed_key, new_meta.to_string());
// Add or replace the entry in the Values table.
prefixed_key[0] = VALUES_BEGIN[0]; // Avoid string copy.
batch.Put(prefixed_key, leveldb::Slice(value_data, value_size));
prefixed_key[0] = METADATA_BEGIN[0]; // Avoid string copy.
batch.Delete(prefixed_key); // In case there was metadata previously.
batch.Put(prefixed_key, leveldb::Slice(metadata_data, metadata_size));
// Update the Atime index.
string atime_key = k_atime_index(atime, key);
batch.Delete(k_atime_index(old_data.atime, key));
batch.Put(atime_key, to_string(new_size));
// Update the Etime index.
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
if (found && old_data.etime != epoch_ticks())
batch.Delete(k_etime_index(old_data.etime, key));
// Etime index is not written to for non-expiring entries.
if (etime != epoch_ticks())
batch.Put(k_etime_index(etime, key), to_string(new_size));
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "put()");
// Update cache size and number of entries.
stats_->cache_size_ = stats_->cache_size_ - old_data.size + new_size;
stats_->hist_increment(new_size);
++stats_->num_entries_;
stats_->hist_decrement(old_data.size);
assert(stats_->num_entries_ >= 0);
assert(stats_->num_entries_ == hist_sum(stats_->hist_));
assert(stats_->cache_size_ >= 0);
assert(stats_->cache_size_ <= stats_->max_cache_size_);
assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
call_handler(key, CacheEventIndex::put);

bool PersistentStringCacheImpl::get_or_put(string const& key, string& value, PersistentStringCache::Loader load_func)
return get_or_put(key, value, nullptr, load_func);

bool PersistentStringCacheImpl::get_or_put(string const& key,
                                           PersistentStringCache::Loader load_func)
throw_invalid_argument("get_or_put(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
// Call the normal get() here, so the hit/miss counters and callbacks are correct.
if (get(key, value, metadata))
load_func(key, *pimpl_); // Expected to put the value.
catch (std::exception const& e)
throw runtime_error(make_message(string("get_or_put(): load_func exception: ") + e.what()));
throw runtime_error(make_message("get_or_put(): load_func: unknown exception"));
// We go for the raw DB here, to avoid counting an extra hit or miss.
bool loaded = get_value_and_metadata(key, dt, value, metadata);

bool PersistentStringCacheImpl::put_metadata(std::string const& key, std::string const& metadata)
return put_metadata(key, metadata.data(), metadata.size());

bool PersistentStringCacheImpl::put_metadata(std::string const& key, const char* metadata, int64_t metadata_size)
throw_invalid_argument("put_metadata(): key must be non-empty");
throw_invalid_argument("put_metadata(): metadata must not be nullptr");
if (metadata_size < 0)
throw_invalid_argument("put_metadata(): invalid negative size: " + to_string(metadata_size));
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
auto dt = get_data(data_key, found);
int64_t old_meta_size = 0;
IteratorUPtr it(db_->NewIterator(read_options));
string metadata_key = k_metadata(key);
it->Seek(metadata_key);
if (it->Valid() && it->key().ToString() == metadata_key)
old_meta_size = it->value().size();
int64_t new_meta_size = metadata_size;
if (dt.size - old_meta_size + new_meta_size > stats_->max_cache_size_)
throw_logic_error(string("put_metadata(): cannot add ") + to_string(new_meta_size) +
                  "-byte metadata: record size (" + to_string(dt.size - old_meta_size + new_meta_size) +
                  ") exceeds maximum cache size of " + to_string(stats_->max_cache_size_));
int64_t original_size = dt.size;
dt.size = dt.size - old_meta_size + new_meta_size;
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
return false; // Entry has expired.
// The new entry may be larger than the old one. If so, we may need to
// evict some other entries to make room. However, we exclude this
// record from trimming so we don't end up trimming the entry
// that's about to be modified.
if (new_meta_size > old_meta_size)
auto avail_bytes = stats_->max_cache_size_ - stats_->cache_size_;
int64_t bytes_needed = new_meta_size - old_meta_size;
if (bytes_needed > avail_bytes)
// Free only the shortfall, as put() does. Don't delete the entry about to be updated!
delete_at_least(bytes_needed - avail_bytes, key);
leveldb::WriteBatch batch;
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks())
it->Seek(k_etime_index(dt.etime, key));
assert(it->key().ToString() == k_etime_index(dt.etime, key));
batch.Put(it->key(), to_string(dt.size)); // Update Etime index with new size (expiry time is not modified).
batch.Put(data_key, dt.to_string()); // Update data.
batch.Put(metadata_key, leveldb::Slice(metadata, metadata_size)); // Update metadata.
it->Seek(k_atime_index(dt.atime, key));
assert(it->key().ToString() == k_atime_index(dt.atime, key));
batch.Put(it->key(), to_string(dt.size)); // Update Atime index with new size (access time is not modified).
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "put_metadata(): batch write error");
stats_->cache_size_ = stats_->cache_size_ - old_meta_size + new_meta_size;
stats_->hist_increment(dt.size);
stats_->hist_decrement(original_size);
assert(stats_->num_entries_ >= 0);
assert(stats_->num_entries_ == hist_sum(stats_->hist_));
assert(stats_->cache_size_ >= 0);
assert(stats_->cache_size_ <= stats_->max_cache_size_);
assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);

bool PersistentStringCacheImpl::take(string const& key, string& value)
return take(key, value, nullptr);

bool PersistentStringCacheImpl::take(string const& key, string& value, string* metadata)
throw_invalid_argument("take(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
bool found = get_value_and_metadata(key, dt, val, metadata);
stats_->inc_misses();
call_handler(key, CacheEventIndex::miss);
// Delete the entry whether it expired or not. Seeing that we have just done
// a lot of work finding it, we may as well finish the job.
delete_entry(key, dt);
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
stats_->inc_misses();
call_handler(key, CacheEventIndex::invalidate);
call_handler(key, CacheEventIndex::miss);
return false; // Expired entries are hidden.
call_handler(key, CacheEventIndex::get);
call_handler(key, CacheEventIndex::invalidate);
assert(stats_->num_entries_ == hist_sum(stats_->hist_));

bool PersistentStringCacheImpl::invalidate(string const& key)
throw_invalid_argument("invalidate(): key must be non-empty");
lock_guard<decltype(mutex_)> lock(mutex_);
string prefixed_key = k_data(key);
auto dt = get_data(prefixed_key, found);
// Delete the entry whether it expired or not. Seeing that we have just done
// a lot of work finding it, we may as well finish the job.
delete_entry(key, dt);
call_handler(key, CacheEventIndex::invalidate);
// Use <= so the expiry test matches get(), contains_key(), and take().
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
return false; // Expired entries are hidden.
assert(stats_->num_entries_ == hist_sum(stats_->hist_));

void PersistentStringCacheImpl::invalidate(vector<string> const& keys)
invalidate(keys.begin(), keys.end());

template<typename It>
void PersistentStringCacheImpl::invalidate(It begin, It end)
lock_guard<decltype(mutex_)> lock(mutex_);
leveldb::WriteBatch batch;
for (auto it = begin; it != end; ++it)
auto dt = get_data(k_data(*it), found);
batch_delete(*it, dt, batch);
// Update cache size and entries.
stats_->hist_decrement(dt.size);
stats_->cache_size_ -= dt.size;
assert(stats_->cache_size_ >= 0);
assert(stats_->cache_size_ <= stats_->max_cache_size_);
--stats_->num_entries_;
assert(stats_->num_entries_ >= 0);
assert(stats_->num_entries_ == hist_sum(stats_->hist_));
assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
call_handler(*it, CacheEventIndex::invalidate);
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "invalidate(): batch write error");

void PersistentStringCacheImpl::invalidate(initializer_list<std::string> const& keys)
invalidate(keys.begin(), keys.end());

void PersistentStringCacheImpl::invalidate()
lock_guard<decltype(mutex_)> lock(mutex_);
leveldb::WriteBatch batch;
int64_t const batch_size = 1000;
PersistentStringCache::EventCallback cb =
    handlers_[static_cast<underlying_type<CacheEventIndex>::type>(CacheEventIndex::invalidate)];
IteratorUPtr it(db_->NewIterator(read_options));
it->Seek(ALL_BEGIN);
leveldb::Slice const atime_prefix = ATIME_BEGIN;
leveldb::Slice const all_end = ALL_END;
while (it->Valid() && it->key().compare(all_end) < 0)
auto key = it->key();
if (cb && key.starts_with(atime_prefix))
TimeKeyTuple atk(key.ToString().substr(1));
--stats_->num_entries_;
auto size = stoll(it->value().ToString());
stats_->cache_size_ -= size;
call_handler(atk.key, CacheEventIndex::invalidate);
if (++count == batch_size)
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "invalidate(): batch write error");
throw_if_error(it->status(), "invalidate(): iterator error");
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "invalidate(): final batch write error");
stats_->num_entries_ = 0;
stats_->hist_clear();
stats_->cache_size_ = 0;
// Clear ephemeral stats too.

bool PersistentStringCacheImpl::touch(string const& key, chrono::time_point<chrono::system_clock> expiry_time)
throw_invalid_argument("touch(): key must be non-empty");
int64_t new_etime = ticks(expiry_time);
if (stats_->policy_ == CacheDiscardPolicy::lru_only && new_etime != epoch_ticks())
throw_logic_error(string("touch(): policy is lru_only, but expiry_time (") + to_string(new_etime) +
                  ") is not infinite");
lock_guard<decltype(mutex_)> lock(mutex_);
string data_key = k_data(key);
auto dt = get_data(data_key, found);
int64_t now = now_ticks();
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && new_etime != epoch_ticks() && new_etime <= now)
return false; // New expiry time is already older than the time now.
leveldb::WriteBatch batch;
string size = to_string(dt.size);
batch.Delete(k_atime_index(dt.atime, key)); // Delete old Atime index entry.
batch.Put(k_atime_index(now, key), size); // Write new Atime index entry.
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
batch.Delete(k_etime_index(dt.etime, key)); // Delete old Etime index entry.
if (new_etime != epoch_ticks())
batch.Put(k_etime_index(new_etime, key), size); // Write new Etime index entry.
dt.etime = new_etime;
batch.Put(data_key, dt.to_string()); // Write new data.
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "touch(): batch write error");
call_handler(key, CacheEventIndex::touch);

void PersistentStringCacheImpl::clear_stats() noexcept
lock_guard<decltype(mutex_)> lock(mutex_);

void PersistentStringCacheImpl::resize(int64_t size_in_bytes)
if (size_in_bytes < 1)
throw_invalid_argument("resize(): invalid size_in_bytes (" + to_string(size_in_bytes) + "): value must be > 0");
lock_guard<decltype(mutex_)> lock(mutex_);
if (size_in_bytes < stats_->max_cache_size_)
trim_to(size_in_bytes);
db_->CompactRange(nullptr, nullptr); // Avoid bulk deletions slowing down subsequent accesses.
auto s = db_->Put(write_options, SETTINGS_MAX_SIZE, to_string(size_in_bytes));
throw_if_error(s, "resize(): cannot write max size");
stats_->max_cache_size_ = size_in_bytes;
assert(stats_->num_entries_ == hist_sum(stats_->hist_));

void PersistentStringCacheImpl::trim_to(int64_t used_size_in_bytes)
if (used_size_in_bytes < 0)
throw_invalid_argument("trim_to(): invalid used_size_in_bytes (" + to_string(used_size_in_bytes) +
                       "): value must be >= 0");
if (used_size_in_bytes > stats_->max_cache_size_)
throw_logic_error(string("trim_to(): invalid used_size_in_bytes (") + to_string(used_size_in_bytes) +
                  "): value must be <= max_size_in_bytes (" + to_string(stats_->max_cache_size_) + ")");
lock_guard<decltype(mutex_)> lock(mutex_);
if (used_size_in_bytes < stats_->cache_size_)
delete_at_least(stats_->cache_size_ - used_size_in_bytes);
assert(stats_->num_entries_ == hist_sum(stats_->hist_));

void PersistentStringCacheImpl::compact()
lock_guard<decltype(mutex_)> lock(mutex_);
db_->CompactRange(nullptr, nullptr);
void PersistentStringCacheImpl::set_handler(CacheEvent events, PersistentStringCache::EventCallback cb)
static constexpr auto limit = underlying_type<CacheEvent>::type(CacheEvent::END_);
auto evs = underlying_type<CacheEvent>::type(events);
if (evs == 0 || evs > limit - 1)
throw_invalid_argument("set_handler(): invalid events (" + to_string(evs) +
"): value must be in the range [1.." + to_string(limit - 1) + "]");
lock_guard<decltype(mutex_)> lock(mutex_);
static constexpr auto index_limit = underlying_type<CacheEventIndex>::type(CacheEventIndex::END_);
for (underlying_type<CacheEventIndex>::type i = 0; i < index_limit; ++i)
void PersistentStringCacheImpl::init_db(leveldb::Options options)
options.paranoid_checks = true;
read_options.verify_checksums = true;
auto s = leveldb::DB::Open(options, stats_->cache_path_, &db);
throw_if_error(s, "cannot open or create cache");

bool PersistentStringCacheImpl::cache_is_new() const
auto s = db_->Get(read_options, SETTINGS_SCHEMA_VERSION, &val);
throw_if_error(s, "cannot read schema version");
return s.IsNotFound();

void PersistentStringCacheImpl::write_version()
auto s = db_->Put(write_options, SETTINGS_SCHEMA_VERSION, to_string(SCHEMA_VERSION));
throw_if_error(s, "cannot write schema version");
// Check if the version of the DB matches the expected version.
// Pre: Version exists in the DB.
// If the version can be read and makes sense as a number, but
// differs from the expected version, wipe the data (but
// not the settings).
// If the version can be read, but doesn't parse as a number, throw.
// If the version matches the expected version, do nothing.
void PersistentStringCacheImpl::check_version()
// Check schema version.
string val = "not found";
auto s = db_->Get(read_options, SETTINGS_SCHEMA_VERSION, &val);
throw_if_error(s, "cannot read schema version");
assert(!s.IsNotFound());
old_version = stoi(val);
catch (std::exception const&)
throw_corrupt_error("check_version(): bad version: \"" + val + "\"");
if (old_version != SCHEMA_VERSION)
// Wipe all tables and stats (but not settings).
leveldb::WriteBatch batch;
IteratorUPtr it(db_->NewIterator(read_options));
it->Seek(ALL_BEGIN);
leveldb::Slice const all_end(ALL_END);
while (it->Valid() && it->key().compare(all_end) < 0)
batch.Delete(it->key());
// Note: any migration of settings for newer versions should happen here.
// Write new schema version.
batch.Put(SETTINGS_SCHEMA_VERSION, to_string(SCHEMA_VERSION));
s = db_->Write(write_options, &batch);
throw_if_error(s, string("cannot clear DB after version mismatch, old version = ") + to_string(old_version) +
", new version = " + to_string(SCHEMA_VERSION));
stats_->num_entries_ = 0;
stats_->hist_clear();
stats_->cache_size_ = 0;
// init_stats() (called later) calls deserialize() on the stats,
// so we need to create a proper stats record here.
void PersistentStringCacheImpl::read_settings()
// Note: Loose error checking here. If someone deliberately
// corrupts the settings table by writing garbage values for valid
// keys, that's just too bad. We can't protect against Machiavelli.
auto s = db_->Get(read_options, SETTINGS_MAX_SIZE, &val);
throw_if_error(s, "read_settings(): cannot read max size");
stats_->max_cache_size_ = stoll(val);
s = db_->Get(read_options, SETTINGS_POLICY, &val);
throw_if_error(s, "read_settings(): cannot read policy");
stats_->policy_ = static_cast<CacheDiscardPolicy>(stoi(val));

void PersistentStringCacheImpl::write_settings()
leveldb::WriteBatch batch;
batch.Put(SETTINGS_MAX_SIZE, to_string(stats_->max_cache_size_));
batch.Put(SETTINGS_POLICY, to_string(static_cast<int>(stats_->policy_)));
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "write_settings()");
void PersistentStringCacheImpl::read_stats()
auto s = db_->Get(read_options, STATS_VALUES, &val);
throw_if_error(s, "read_stats()");
stats_->deserialize(val);

void PersistentStringCacheImpl::write_stats()
auto s = db_->Put(write_options, STATS_VALUES, stats_->serialize());
throw_if_error(s, "write_stats()");

bool PersistentStringCacheImpl::read_dirty_flag() const
auto s = db_->Get(read_options, DIRTY_FLAG, &dirty);
return dirty != "0";

void PersistentStringCacheImpl::write_dirty_flag(bool is_dirty)
auto s = db_->Put(write_options, DIRTY_FLAG, is_dirty ? "1" : "0");
throw_if_error(s, "write_dirty_flag()");
PersistentStringCacheImpl::DataTuple PersistentStringCacheImpl::get_data(string const& key, bool& found) const
// mutex_ must be locked here!
assert(key[0] == DATA_BEGIN[0]);
auto s = db_->Get(read_options, key, &val);
throw_if_error(s, "get_data(): cannot read data");
if (!s.IsNotFound())
return DataTuple(val);

bool PersistentStringCacheImpl::get_value_and_metadata(string const& key,
string* metadata) const
// mutex_ must be locked here!
// Note: key is the un-prefixed key!
string prefixed_key = k_data(key);
IteratorUPtr it(db_->NewIterator(read_options));
it->Seek(prefixed_key);
throw_if_error(it->status(), "get_value_and_metadata(): iterator error");
assert(it->Valid());
if (it->key() != leveldb::Slice(prefixed_key))
data = DataTuple(it->value().ToString());
prefixed_key[0] = VALUES_BEGIN[0]; // Avoid string copy.
it->Seek(prefixed_key);
assert(it->Valid() && it->key().compare(prefixed_key) == 0);
value = it->value().ToString();
prefixed_key[0] = METADATA_BEGIN[0]; // Avoid string copy.
it->Seek(prefixed_key);
if (it->key().compare(prefixed_key) == 0)
*metadata = it->value().ToString();
metadata->clear(); // Metadata may have been there previously, but isn't now.
void PersistentStringCacheImpl::batch_delete(string const& key, DataTuple const& data, leveldb::WriteBatch& batch)
// mutex_ must be locked here!
string prefixed_key = k_data(key);
batch.Delete(prefixed_key); // Delete data.
prefixed_key[0] = VALUES_BEGIN[0]; // Avoid string copy.
batch.Delete(prefixed_key); // Delete value.
prefixed_key[0] = METADATA_BEGIN[0]; // Avoid string copy.
batch.Delete(prefixed_key); // Delete metadata.
batch.Delete(k_atime_index(data.atime, key)); // Delete Atime index entry.
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
string etime_key = k_etime_index(data.etime, key);
batch.Delete(etime_key); // Delete Etime index entry.

void PersistentStringCacheImpl::delete_entry(string const& key, DataTuple const& data)
// mutex_ must be locked here!
leveldb::WriteBatch batch;
batch_delete(key, data, batch);
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "delete_entry()");
// Update cache size and entries.
stats_->hist_decrement(data.size);
stats_->cache_size_ -= data.size;
assert(stats_->cache_size_ >= 0);
assert(stats_->cache_size_ <= stats_->max_cache_size_);
--stats_->num_entries_;
assert(stats_->num_entries_ >= 0);
assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
void PersistentStringCacheImpl::delete_at_least(int64_t bytes_needed, string const& skip_key)
// mutex_ must be locked here!
assert(bytes_needed > 0);
assert(bytes_needed <= stats_->cache_size_);
int64_t deleted_bytes = 0;
int64_t deleted_entries = 0;
leveldb::WriteBatch batch;
// Step 1: Delete all expired entries.
if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
auto now_time = now_ticks();
IteratorUPtr it(db_->NewIterator(read_options));
leveldb::Slice const etime_prefix(ETIME_BEGIN);
it->Seek(etime_prefix);
if (!it->key().starts_with(etime_prefix))
string etime_key = it->key().ToString();
TimeKeyTuple ek(etime_key.substr(1)); // Strip prefix to create the etime/key tuple.
if (!skip_key.empty() && ek.key == skip_key)
// Too hard to hit with a test because the entry must expire
// in between put_metadata() having decided that it's still
// unexpired and now_time taken above.
continue; // This entry must not be deleted (see put_metadata()).
if (ek.time > now_time)
break; // Anything past this point has not expired yet.
string prefixed_key = k_data(ek.key);
auto s = db_->Get(read_options, prefixed_key, &val);
throw_if_error(s, "delete_at_least(): cannot read data");
DataTuple dt(move(val));
int64_t size = stoll(it->value().ToString());
deleted_bytes += size;
bytes_needed -= size;
batch_delete(ek.key, dt, batch);
--stats_->num_entries_;
++stats_->ttl_evictions_;
stats_->hist_decrement(size);
stats_->cache_size_ -= size;
call_handler(ek.key, CacheEventIndex::evict_ttl);
throw_if_error(it->status(), "delete_at_least(): expiry iterator error");
} // Close iterator.
if (deleted_entries)
// Need to commit the batch here, otherwise what follows will not see the changes made above.
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "delete_at_least(): expiry write error");
// Step 2: If we still need more room, delete entries in LRU order until we have enough room.
if (bytes_needed > 0)
// Run over the Atime index and delete in old-to-new order.
IteratorUPtr it(db_->NewIterator(read_options));
leveldb::Slice const atime_prefix(ATIME_BEGIN);
it->Seek(atime_prefix);
while (it->Valid() && bytes_needed > 0 && it->key().starts_with(atime_prefix))
TimeKeyTuple atk(it->key().ToString().substr(1)); // Strip prefix to create the atime/key tuple.
if (!skip_key.empty() && atk.key == skip_key)
continue; // This entry must not be deleted (see put_metadata()).
int64_t size = stoll(it->value().ToString());
deleted_bytes += size;
bytes_needed -= size;
string prefixed_key = k_data(atk.key);
auto s = db_->Get(read_options, prefixed_key, &data_string);
assert(!s.IsNotFound());
throw_if_error(s, "delete_at_least()");
DataTuple dt(move(data_string));
batch_delete(atk.key, dt, batch);
--stats_->num_entries_;
++stats_->lru_evictions_;
stats_->hist_decrement(size);
stats_->cache_size_ -= size;
call_handler(atk.key, CacheEventIndex::evict_lru);
throw_if_error(it->status(), "delete_at_least(): LRU iterator error");
assert(deleted_bytes > 0);
assert(bytes_needed <= 0);
auto s = db_->Write(write_options, &batch);
throw_if_error(s, "delete_at_least(): LRU write error");
assert(stats_->cache_size_ >= 0);
assert(stats_->num_entries_ >= 0);
assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
void PersistentStringCacheImpl::call_handler(string const& key, CacheEventIndex event_index) const
// mutex_ must be locked here!
typedef underlying_type<CacheEventIndex>::type IndexType;
auto handler = handlers_[static_cast<IndexType>(event_index)];
IndexType index = static_cast<IndexType>(event_index);
handler(key, static_cast<CacheEvent>(1 << index), stats_);

string PersistentStringCacheImpl::make_message(leveldb::Status const& s, string const& msg) const
return class_name + ": " + msg + ": " + s.ToString() + " (cache_path: " + stats_->cache_path_ + ")";

string PersistentStringCacheImpl::make_message(string const& msg) const
return class_name + ": " + msg + " (cache_path: " + stats_->cache_path_ + ")";

void PersistentStringCacheImpl::throw_if_error(leveldb::Status const& s, string const& msg) const
if (!s.ok() && !s.IsNotFound())
if (s.IsCorruption())
throw system_error(666, generic_category(), make_message(s, msg)); // LCOV_EXCL_LINE
throw runtime_error(make_message(s, msg));

void PersistentStringCacheImpl::throw_logic_error(string const& msg) const
throw logic_error(make_message(msg));

void PersistentStringCacheImpl::throw_invalid_argument(string const& msg) const
throw invalid_argument(make_message(msg));

void PersistentStringCacheImpl::throw_corrupt_error(string const& msg) const
throw system_error(666, generic_category(), make_message(msg));

} // namespace internal