~ci-train-bot/thumbnailer/thumbnailer-ubuntu-yakkety-landing-072

« back to all changes in this revision

Viewing changes to src/core/internal/persistent_string_cache_impl.cpp

  • Committer: CI Train Bot
  • Author(s): Michi Henning
  • Date: 2015-09-15 11:04:11 UTC
  • mfrom: (125.1.2 landing150915)
  • Revision ID: ci-train-bot@canonical.com-20150915110411-233xw0fljaq7p2o0
Landing changes on devel to trunk.
Approved by: James Henstridge

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/*
2
 
 * Copyright (C) 2015 Canonical Ltd.
3
 
 *
4
 
 * This program is free software: you can redistribute it and/or modify
5
 
 * it under the terms of the GNU Lesser General Public License version 3 as
6
 
 * published by the Free Software Foundation.
7
 
 *
8
 
 * This program is distributed in the hope that it will be useful,
9
 
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10
 
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
 
 * GNU Lesser General Public License for more details.
12
 
 *
13
 
 * You should have received a copy of the GNU Lesser General Public License
14
 
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
15
 
 *
16
 
 * Authored by: Michi Henning <michi.henning@canonical.com>
17
 
 */
18
 
 
19
 
#include <core/internal/persistent_string_cache_impl.h>
20
 
 
21
 
#include <core/internal/persistent_string_cache_stats.h>
22
 
 
23
 
#include <leveldb/cache.h>
24
 
#include <leveldb/write_batch.h>
25
 
 
26
 
#include <iomanip>
27
 
#include <iostream>
28
 
#include <system_error>
29
 
 
30
 
/*
31
 
    We have three tables and two secondary indexes in the DB:
32
 
 
33
 
    - Key -> Value
34
 
      The Values table maps keys to values.
35
 
 
36
 
    - Key -> <Access time, Expiry time, Size>
37
 
      The Data table maps keys to the access time, expire time,
38
 
      and entry size. (Size is the sum of key, value, and metadata sizes.)
39
 
 
40
 
    - Key -> Metadata
41
 
      The Metadata table maps keys to metadata for the entry.
42
 
      If no metadata exists for an entry, there is no row
43
 
      in the table.
44
 
 
45
 
    - <Access time, Key> -> Size
46
 
      The Atime index provides access in order of oldest-to-newest access time.
47
 
      This allows efficient trimming based on LRU order.
48
 
 
49
 
    - <Expiry time, Key> -> Size
50
 
      The Etime index provides access in order of soonest-to-latest expiry time.
51
 
      This allows efficient trimming of expired entries. For lru_only,
52
 
      no entry is added to this index, and the corresponding expiry time in the
53
 
      Data table is 0. For lru_ttl, only entries that actually
54
 
      do have an expiry time are added.
55
 
 
56
 
    The tables and indexes each map to a different region of the leveldb based on a prefix.
57
 
    Tuple entries are separated by spaces. Entries are sorted in lexicographical order
58
 
    by the DB; to ensure correct numerical comparison for the secondary indexes,
59
 
    times are in milliseconds since the epoch, with fixed-width width zero padding
60
 
    and 13 decimal digits. (That works out to more than 316 years past the epoch.)
61
 
 
62
 
    Some examples to illustrate how it hangs together with lru_ttl. (Note that,
63
 
    in reality, all four tables really sit inside the single leveldb table, separated
64
 
    by the prefixes of their keys. They are shown as separate tables below to
65
 
    make things easier to read.)
66
 
 
67
 
    At time 0010, insert Bjarne -> Stroustrup, expires 1010,
68
 
    at time 0020, insert Andy   -> Koenig,     expires 2020,
69
 
    at time 0030, insert Scott  -> Meyers,     does not expire
70
 
    at time 0040, insert Stan   -> Lippman,    expires 1040
71
 
 
72
 
    Values:                         Data:
73
 
 
74
 
    Key     | Value                 Key     | Access time | Expiry time | Size
75
 
    --------+-----------            --------+---------------------------------
76
 
    AAndy   | Koenig                BAndy   |      20     |    2020     |  10
77
 
    ABjarne | Stroustrup            BBjarne |      10     |    1010     |  16
78
 
    AScott  | Meyers                BScott  |      30     |       0     |  11
79
 
    AStan   | Lippman               BStan   |      40     |    1040     |  11
80
 
 
81
 
 
82
 
    Atime index:                    Etime index:
83
 
 
84
 
    Key                   | Size    Key                   | Size
85
 
    ----------------------+-----    ----------------------+-----
86
 
    D0000000000010 Bjarne |  16     E0000000001010 Bjarne |  16
87
 
    D0000000000020 Andy   |  10     E0000000001040 Stan   |  11
88
 
    D0000000000030 Scott  |  11     E0000000002020 Andy   |  10
89
 
    D0000000000040 Stan   |  11
90
 
 
91
 
    Note that, because the expiry time for Scott is infinite, no entry appears in the Etime index.
92
 
 
93
 
    At time 100, we call get("Bjarne"). This updates the Data table and Atime index with the new access time.
94
 
    (The Values table and Etime index are unchanged.)
95
 
 
96
 
    Values:                         Data:
97
 
 
98
 
    Key     | Value                 Key     | Access time | Expiry time | Size
99
 
    --------+-----------            --------+---------------------------------
100
 
    AAndy   | Koenig                BAndy   |      20     |    2020     |  10
101
 
    ABjarne | Stroustrup            BBjarne |     100     |    1010     |  16
102
 
    AScott  | Meyers                BScott  |      30     |       0     |  11
103
 
    AStan   | Lippman               BStan   |      40     |    1040     |  11
104
 
 
105
 
 
106
 
    Atime index:                    Etime index:
107
 
 
108
 
    Key                   | Size    Key                   | Size
109
 
    ----------------------+-----    ----------------------+-----
110
 
    D0000000000020 Andy   |  10     E0000000001010 Bjarne |  16
111
 
    D0000000000030 Scott  |  11     E0000000001040 Stan   |  11
112
 
    D0000000000040 Stan   |  11     E0000000002020 Andy   |  10
113
 
    D0000000000100 Bjarne |  16
114
 
 
115
 
    In other words, the Atime and Etime indexes are always sorted in earliest-to-latest order
116
 
    of expiry; this allows us to efficiently trim the cache once it is full. The sizes are
117
 
    stored redundantly so we can efficiently determine the point at which we have removed
118
 
    enough entries in order to make room for a new one.
119
 
 
120
 
    If this example were to use lru_only, the Etime index would remain empty, and the expiry times
121
 
    in the Data table would all be chrono::duration_cast<chrono::milliseconds>(chrono::time_point()).count().
122
 
    That value typically is zero (but this is not guaranteed by the standard).
123
 
 
124
 
    The Metadata table simply maps each key to its metadata. If no metadata exists
125
 
    for an entry, there is no corresponding row in the table. For example, if Andy
126
 
    and Scott had metadata, but Stan and Bjarne didn't, we'd have:
127
 
 
128
 
    Metadata table:
129
 
 
130
 
    Key     | Metadata
131
 
    --------+----------
132
 
    CAndy   |  <data>
133
 
    CScott  |  <data>
134
 
*/
135
 
 
136
 
using namespace std;
137
 
 
138
 
namespace core
139
 
{
140
 
 
141
 
namespace internal
142
 
{
143
 
 
144
 
namespace
145
 
{
146
 
 
147
 
static string const class_name = "PersistentStringCache";  // For exception messages
148
 
 
149
 
static leveldb::WriteOptions write_options;
150
 
static leveldb::ReadOptions read_options;
151
 
 
152
 
// Schema version. If the way things are written to leveldb changes, the
153
 
// schema version here must be changed, too. If an existing cache is opened
154
 
// with a different schema version, the cache is simply thrown away, so
155
 
// it will automatically be re-created using the latest schema.
156
 
 
157
 
static int const SCHEMA_VERSION = 2;  // Increment whenever schema changes!
158
 
 
159
 
// Prefixes to divide the key space into logical tables/indexes.
160
 
// All prefixes must have length 1. The end prefix must be
161
 
// the "one past the end" value for the prefix range. For example,
162
 
// "B" is the first prefix that can't be an entry in the Values table.
163
 
//
164
 
// Do not change the prefix without also checking that ALL_BEGIN and
165
 
// ALL_END are still correct!
166
 
 
167
 
static string const VALUES_BEGIN = "A";
168
 
static string const VALUES_END = "B";
169
 
 
170
 
static string const DATA_BEGIN = "B";
171
 
static string const DATA_END = "C";
172
 
 
173
 
static string const METADATA_BEGIN = "C";
174
 
static string const METADATA_END = "D";
175
 
 
176
 
static string const ATIME_BEGIN = "D";
177
 
static string const ATIME_END = "E";
178
 
 
179
 
static string const ETIME_BEGIN = "E";
180
 
static string const ETIME_END = "F";
181
 
 
182
 
// We store the stats so they are not lost across process re-starts.
183
 
 
184
 
static string const STATS_BEGIN = "X";
185
 
static string const STATS_END = "Y";
186
 
 
187
 
// The settings range stores data about the cache itself, such as
188
 
// max size and expiration policy. The prefix for this
189
 
// range must be outside the range [ALL_BEGIN..ALL_END).
190
 
// The schema version is there so we can change the way things are written into leveldb
191
 
// and detect when an old cache is opened with a newer version.
192
 
 
193
 
static string const SETTINGS_BEGIN = "Y";
194
 
static string const SETTINGS_END = "Z";
195
 
 
196
 
// This key stores a dirty flag that we set after successful open
197
 
// and clear in the destructor. This allows to detect, on start-up
198
 
// if we shut down cleanly.
199
 
 
200
 
static string const DIRTY_FLAG = "!DIRTY";
201
 
 
202
 
// These span the entire range of keys in all tables and stats (except settings and dirty flag).
203
 
 
204
 
static string const ALL_BEGIN = VALUES_BEGIN;  // Must be lowest prefix for all tables and indexes, incl stats.
205
 
static string const ALL_END = SETTINGS_BEGIN;  // Must be highest prefix for all tables and indexes, incl stats.
206
 
 
207
 
static string const SETTINGS_MAX_SIZE = SETTINGS_BEGIN + "MAX_SIZE";
208
 
static string const SETTINGS_POLICY = SETTINGS_BEGIN + "POLICY";
209
 
static string const SETTINGS_SCHEMA_VERSION = SETTINGS_BEGIN + "SCHEMA_VERSION";
210
 
 
211
 
static string const STATS_VALUES = STATS_BEGIN + "VALUES";
212
 
 
213
 
// Simple struct to serialize/deserialize a time-key tuple.
214
 
// For the stringified representation, time and key are
215
 
// separated by a space.
216
 
 
217
 
struct TimeKeyTuple
218
 
{
219
 
    int64_t time;  // msec since the epoch
220
 
    string key;
221
 
 
222
 
    TimeKeyTuple(int64_t t, string const& s)
223
 
        : time(t)
224
 
        , key(s)
225
 
    {
226
 
    }
227
 
 
228
 
    TimeKeyTuple(string const& s)
229
 
    {
230
 
        auto pos = s.find(' ');
231
 
        assert(pos != string::npos);
232
 
        string t(s.substr(0, pos));
233
 
        istringstream is(t);
234
 
        is >> time;
235
 
        assert(!is.bad());
236
 
        key = s.substr(pos + 1);
237
 
    }
238
 
 
239
 
    TimeKeyTuple(TimeKeyTuple const&) = default;
240
 
    TimeKeyTuple(TimeKeyTuple&&) = default;
241
 
 
242
 
    TimeKeyTuple& operator=(TimeKeyTuple const&) = default;
243
 
    TimeKeyTuple& operator=(TimeKeyTuple&&) = default;
244
 
 
245
 
    string to_string() const
246
 
    {
247
 
        // We zero-fill the time, so entries collate lexicographically in old-to-new order.
248
 
        ostringstream os;
249
 
        os << setfill('0') << setw(13) << time << " " << key;
250
 
        return os.str();
251
 
    }
252
 
};
253
 
 
254
 
// Key creation methods. These methods return the key into the corresponding
255
 
// table or index with the correct prefix and with tuple keys concatenated
256
 
// with a space separator.
257
 
 
258
 
string k_data(string const& key)
259
 
{
260
 
    return DATA_BEGIN + key;
261
 
}
262
 
 
263
 
string k_metadata(string const& key)
264
 
{
265
 
    return METADATA_BEGIN + key;
266
 
}
267
 
 
268
 
string k_atime_index(int64_t atime, string const& key)
269
 
{
270
 
    return ATIME_BEGIN + TimeKeyTuple(atime, key).to_string();
271
 
}
272
 
 
273
 
string k_etime_index(int64_t etime, string const& key)
274
 
{
275
 
    return ETIME_BEGIN + TimeKeyTuple(etime, key).to_string();
276
 
}
277
 
 
278
 
// Little helpers to get milliseconds since the epoch.
279
 
 
280
 
int64_t ticks(chrono::time_point<chrono::system_clock> tp) noexcept
281
 
{
282
 
    return chrono::duration_cast<chrono::milliseconds>(tp.time_since_epoch()).count();
283
 
}
284
 
 
285
 
int64_t now_ticks() noexcept
286
 
{
287
 
    return ticks(chrono::system_clock::now());
288
 
}
289
 
 
290
 
// Usually zero, but the standard doesn't guarantee this.
291
 
 
292
 
static auto const clock_origin = ticks(chrono::system_clock::time_point());
293
 
 
294
 
int64_t epoch_ticks() noexcept
295
 
{
296
 
    return clock_origin;
297
 
}
298
 
 
299
 
typedef std::unique_ptr<leveldb::Iterator> IteratorUPtr;
300
 
 
301
 
#ifndef NDEBUG
302
 
 
303
 
// For assertions, so we can verify that num_entries_ matches the sum of entries in the histogram.
304
 
 
305
 
int64_t hist_sum(PersistentCacheStats::Histogram const& h) noexcept
306
 
{
307
 
    int64_t size = 0;
308
 
    for (auto num : h)
309
 
    {
310
 
        size += num;
311
 
    }
312
 
    return size;
313
 
}
314
 
 
315
 
#endif
316
 
 
317
 
}  // namespace
318
 
 
319
 
void PersistentStringCacheImpl::init_stats()
320
 
{
321
 
    int64_t num = 0;
322
 
    int64_t size = 0;
323
 
 
324
 
    // If we shut down cleanly last time, read the saved stats values.
325
 
    bool is_dirty = read_dirty_flag();
326
 
    if (!is_dirty)
327
 
    {
328
 
        read_stats();
329
 
    }
330
 
    else
331
 
    {
332
 
        // We didn't shut down cleanly or the cache is new.
333
 
        // Run over the Atime index (it's smaller than the Data table)
334
 
        // and count the number of entries and bytes, and initialize
335
 
        // the histogram.
336
 
        IteratorUPtr it(db_->NewIterator(read_options));
337
 
        leveldb::Slice const atime_prefix(ATIME_BEGIN);
338
 
        it->Seek(atime_prefix);
339
 
        while (it->Valid() && it->key().starts_with(atime_prefix))
340
 
        {
341
 
            ++num;
342
 
            auto bytes = stoll(it->value().ToString());
343
 
            size += bytes;
344
 
            stats_->hist_increment(bytes);
345
 
            it->Next();
346
 
        }
347
 
        throw_if_error(it->status(), "cannot initialize cache");
348
 
        stats_->num_entries_ = num;
349
 
        stats_->cache_size_ = size;
350
 
    }
351
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
352
 
}
353
 
 
354
 
// Open existing database or create an empty one.
355
 
 
356
 
PersistentStringCacheImpl::PersistentStringCacheImpl(string const& cache_path,
357
 
                                                     int64_t max_size_in_bytes,
358
 
                                                     CacheDiscardPolicy policy,
359
 
                                                     PersistentStringCache* pimpl)
360
 
    : pimpl_(pimpl)
361
 
    , stats_(make_shared<PersistentStringCacheStats>())
362
 
{
363
 
    stats_->cache_path_ = cache_path;
364
 
    if (max_size_in_bytes < 1)
365
 
    {
366
 
        throw_invalid_argument("invalid max_size_in_bytes (" + to_string(max_size_in_bytes) + "): value must be > 0");
367
 
    }
368
 
    stats_->max_cache_size_ = max_size_in_bytes;
369
 
    stats_->policy_ = policy;
370
 
 
371
 
    leveldb::Options options;
372
 
    options.create_if_missing = true;
373
 
 
374
 
    // For small caches, reduce memory consumption by reducing the size of the internal block cache.
375
 
    // The block cache size is at least 512 kB. For caches 5-80 MB, it is 10% of the nominal cache size.
376
 
    // For caches > 80 MB, the block cache is left at the default of 8 MB.
377
 
    size_t block_cache_size = max_size_in_bytes / 10;
378
 
    if (block_cache_size < 512 * 1024)
379
 
    {
380
 
        block_cache_size = 512 * 1024;
381
 
    }
382
 
    if (block_cache_size < 8 * 1024 * 1024)
383
 
    {
384
 
        block_cache_.reset(leveldb::NewLRUCache(block_cache_size));
385
 
        options.block_cache = block_cache_.get();
386
 
    }
387
 
 
388
 
    init_db(options);
389
 
 
390
 
    if (cache_is_new())
391
 
    {
392
 
        write_version();
393
 
        write_settings();
394
 
        write_stats();
395
 
    }
396
 
    else
397
 
    {
398
 
        check_version();  // Wipes DB if version doesn't match.
399
 
        read_settings();
400
 
 
401
 
        // For an already-existing cache, check that size and policy match.
402
 
        if (stats_->max_cache_size_ != max_size_in_bytes)
403
 
        {
404
 
            throw_logic_error(string("existing cache opened with different max_size_in_bytes (") +
405
 
                              to_string(max_size_in_bytes) + "), existing size = " +
406
 
                              to_string(stats_->max_cache_size_));
407
 
        }
408
 
        if (stats_->policy_ != policy)
409
 
        {
410
 
            auto to_string = [](CacheDiscardPolicy p)
411
 
            {
412
 
                return p == core::CacheDiscardPolicy::lru_only ? "lru_only" : "lru_ttl";
413
 
            };
414
 
            string msg = string("existing cache opened with different policy (") + to_string(policy) +
415
 
                         "), existing policy = " + to_string(stats_->policy_);
416
 
            throw_logic_error(msg);
417
 
        }
418
 
    }
419
 
 
420
 
    init_stats();
421
 
    write_dirty_flag(true);
422
 
}
423
 
 
424
 
// Open existing database.
425
 
 
426
 
PersistentStringCacheImpl::PersistentStringCacheImpl(string const& cache_path, PersistentStringCache* pimpl)
427
 
    : pimpl_(pimpl)
428
 
    , stats_(make_shared<PersistentStringCacheStats>())
429
 
{
430
 
    stats_->cache_path_ = cache_path;
431
 
 
432
 
    init_db(leveldb::Options());  // Throws if DB doesn't exist.
433
 
 
434
 
    check_version();  // Wipes DB if version doesn't match.
435
 
    read_settings();
436
 
 
437
 
    init_stats();
438
 
    write_dirty_flag(true);
439
 
}
440
 
 
441
 
PersistentStringCacheImpl::~PersistentStringCacheImpl()
442
 
{
443
 
    try
444
 
    {
445
 
        write_stats();
446
 
        write_dirty_flag(false);
447
 
    }
448
 
    // LCOV_EXCL_START
449
 
    catch (std::exception const& e)
450
 
    {
451
 
        cerr << make_message(string("~PersistentStringCacheImpl(): ") + e.what()) << endl;
452
 
    }
453
 
    catch (...)
454
 
    {
455
 
        cerr << make_message("~PersistentStringCacheImpl(): unknown exception") << endl;
456
 
    }
457
 
    // LCOV_EXCL_STOP
458
 
}
459
 
 
460
 
bool PersistentStringCacheImpl::get(string const& key, string& value) const
461
 
{
462
 
    return get(key, value, nullptr);
463
 
}
464
 
 
465
 
bool PersistentStringCacheImpl::get(string const& key, string& value, string* metadata) const
466
 
{
467
 
    if (key.empty())
468
 
    {
469
 
        throw_invalid_argument("get(): key must be non-empty");
470
 
    }
471
 
 
472
 
    lock_guard<decltype(mutex_)> lock(mutex_);
473
 
 
474
 
    string data_key = k_data(key);
475
 
    DataTuple dt;
476
 
    bool found = get_value_and_metadata(key, dt, value, metadata);
477
 
    if (!found)
478
 
    {
479
 
        stats_->inc_misses();
480
 
        call_handler(key, CacheEventIndex::miss);
481
 
        return false;
482
 
    }
483
 
 
484
 
    // Don't return expired entry.
485
 
    int64_t new_atime = now_ticks();
486
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= new_atime)
487
 
    {
488
 
        call_handler(key, CacheEventIndex::miss);
489
 
        stats_->inc_misses();
490
 
        return false;
491
 
    }
492
 
 
493
 
    leveldb::WriteBatch batch;
494
 
 
495
 
    batch.Delete(k_atime_index(dt.atime, key));  // Delete old atime entry
496
 
    dt.atime = new_atime;
497
 
    dt.size = key.size() + value.size();
498
 
    if (metadata)
499
 
    {
500
 
        dt.size += metadata->size();
501
 
    }
502
 
    batch.Put(data_key, dt.to_string());
503
 
    batch.Put(k_atime_index(dt.atime, key), to_string(dt.size));
504
 
 
505
 
    auto s = db_->Write(write_options, &batch);
506
 
    throw_if_error(s, "put()");
507
 
 
508
 
    stats_->inc_hits();
509
 
    call_handler(key, CacheEventIndex::get);
510
 
    return true;
511
 
}
512
 
 
513
 
bool PersistentStringCacheImpl::get_metadata(string const& key, string& metadata) const
514
 
{
515
 
    if (key.empty())
516
 
    {
517
 
        throw_invalid_argument("get_metadata(): key must be non-empty");
518
 
    }
519
 
 
520
 
    lock_guard<decltype(mutex_)> lock(mutex_);
521
 
 
522
 
    string data_key = k_data(key);
523
 
    bool found;
524
 
    auto dt = get_data(data_key, found);
525
 
    if (!found)
526
 
    {
527
 
        return false;
528
 
    }
529
 
 
530
 
    // Don't return expired entry.
531
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
532
 
    {
533
 
        return false;
534
 
    }
535
 
    auto s = db_->Get(read_options, k_metadata(key), &metadata);
536
 
    throw_if_error(s, "get_metadata()");
537
 
    return !s.IsNotFound();
538
 
}
539
 
 
540
 
bool PersistentStringCacheImpl::contains_key(string const& key) const
541
 
{
542
 
    if (key.empty())
543
 
    {
544
 
        throw_invalid_argument("contains_key(): key must be non-empty");
545
 
    }
546
 
 
547
 
    lock_guard<decltype(mutex_)> lock(mutex_);
548
 
 
549
 
    string data_key = k_data(key);
550
 
    bool found;
551
 
    auto dt = get_data(data_key, found);
552
 
    if (!found)
553
 
    {
554
 
        return false;
555
 
    }
556
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
557
 
    {
558
 
        return false;  // Expired entries are not returned.
559
 
    }
560
 
    return true;
561
 
}
562
 
 
563
 
int64_t PersistentStringCacheImpl::size() const noexcept
564
 
{
565
 
    lock_guard<decltype(mutex_)> lock(mutex_);
566
 
 
567
 
    return stats_->num_entries_;
568
 
}
569
 
 
570
 
int64_t PersistentStringCacheImpl::size_in_bytes() const noexcept
571
 
{
572
 
    lock_guard<decltype(mutex_)> lock(mutex_);
573
 
 
574
 
    return stats_->cache_size_;
575
 
}
576
 
 
577
 
int64_t PersistentStringCacheImpl::max_size_in_bytes() const noexcept
578
 
{
579
 
    lock_guard<decltype(mutex_)> lock(mutex_);
580
 
 
581
 
    return stats_->max_cache_size_;
582
 
}
583
 
 
584
 
int64_t PersistentStringCacheImpl::disk_size_in_bytes() const
585
 
{
586
 
    lock_guard<decltype(mutex_)> lock(mutex_);
587
 
 
588
 
    leveldb::Range everything(ALL_BEGIN, SETTINGS_END);
589
 
    array<uint64_t, 1> sizes = {{0}};
590
 
    db_->GetApproximateSizes(&everything, 1, sizes.data());
591
 
    return sizes[0];
592
 
}
593
 
 
594
 
CacheDiscardPolicy PersistentStringCacheImpl::discard_policy() const noexcept
595
 
{
596
 
    return stats_->policy_;  // Immutable
597
 
}
598
 
 
599
 
PersistentCacheStats PersistentStringCacheImpl::stats() const
600
 
{
601
 
    lock_guard<decltype(mutex_)> lock(mutex_);
602
 
 
603
 
    // We make a copy here so values can't change underneath the caller.
604
 
    return PersistentCacheStats(make_shared<PersistentStringCacheStats>(*stats_));
605
 
}
606
 
 
607
 
bool PersistentStringCacheImpl::put(string const& key,
608
 
                                    string const& value,
609
 
                                    chrono::time_point<chrono::system_clock> expiry_time)
610
 
{
611
 
    return put(key, value.data(), value.size(), nullptr, 0, expiry_time);
612
 
}
613
 
 
614
 
bool PersistentStringCacheImpl::put(string const& key,
615
 
                                    char const* value_data,
616
 
                                    int64_t value_size,
617
 
                                    chrono::time_point<chrono::system_clock> expiry_time)
618
 
{
619
 
    return put(key, value_data, value_size, nullptr, 0, expiry_time);
620
 
}
621
 
 
622
 
bool PersistentStringCacheImpl::put(string const& key,
623
 
                                    string const& value,
624
 
                                    string const* metadata,
625
 
                                    chrono::time_point<chrono::system_clock> expiry_time)
626
 
{
627
 
    assert(metadata);
628
 
    return put(key,
629
 
               value.data(), value.size(),
630
 
               metadata->data(), metadata->size(),
631
 
               expiry_time);
632
 
}
633
 
 
634
 
bool PersistentStringCacheImpl::put(string const& key,
635
 
                                    char const* value_data,
636
 
                                    int64_t value_size,
637
 
                                    char const* metadata_data,
638
 
                                    int64_t metadata_size,
639
 
                                    chrono::time_point<chrono::system_clock> expiry_time)
640
 
{
641
 
    if (key.empty())
642
 
    {
643
 
        throw_invalid_argument("put(): key must be non-empty");
644
 
    }
645
 
    if (!value_data)
646
 
    {
647
 
        throw_invalid_argument("put(): value must not be nullptr");
648
 
    }
649
 
    if (value_size < 0)
650
 
    {
651
 
        throw_invalid_argument("put(): invalid negative value size: " + to_string(value_size));
652
 
    }
653
 
    if (metadata_data && metadata_size < 0)
654
 
    {
655
 
        throw_invalid_argument("put(): invalid negative metadata size: " + to_string(metadata_size));
656
 
    }
657
 
 
658
 
    int64_t new_size = key.size() + value_size;
659
 
    if (metadata_data)
660
 
    {
661
 
        new_size += metadata_size;
662
 
    }
663
 
    if (new_size > stats_->max_cache_size_)
664
 
    {
665
 
        throw_logic_error(string("put(): cannot add ") + to_string(new_size) +
666
 
                          "-byte record to cache with maximum size of " + to_string(stats_->max_cache_size_));
667
 
    }
668
 
 
669
 
    auto etime = ticks(expiry_time);
670
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_only && etime != epoch_ticks())
671
 
    {
672
 
        throw_logic_error(string("put(): policy is lru_only, but expiry_time (") + to_string(etime) +
673
 
                          ") is not infinite");
674
 
    }
675
 
 
676
 
    lock_guard<decltype(mutex_)> lock(mutex_);
677
 
 
678
 
    auto atime = now_ticks();
679
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && etime != epoch_ticks() && etime <= atime)
680
 
    {
681
 
        return false;  // Already expired, so don't add it.
682
 
    }
683
 
 
684
 
    // The entry may or may not exist already.
685
 
    // Work out how many bytes of space we need.
686
 
    int64_t bytes_needed = new_size;
687
 
 
688
 
    string prefixed_key = k_data(key);
689
 
    bool found;
690
 
    auto old_data = get_data(prefixed_key, found);
691
 
    if (found)
692
 
    {
693
 
        bytes_needed = max(new_size - old_data.size, int64_t(0));  // new_size could be < old size
694
 
    }
695
 
    auto avail_bytes = stats_->max_cache_size_ - stats_->cache_size_;
696
 
 
697
 
    // Make room to add or replace the entry.
698
 
    if (bytes_needed > avail_bytes)
699
 
    {
700
 
        delete_at_least(bytes_needed - avail_bytes, key);  // Don't delete the entry about to be updated!
701
 
    }
702
 
 
703
 
    leveldb::WriteBatch batch;
704
 
 
705
 
    // Update the Data table.
706
 
    DataTuple new_meta(atime, etime, new_size);
707
 
    batch.Put(prefixed_key, new_meta.to_string());
708
 
 
709
 
    // Add or replace the entry in the Values table.
710
 
    prefixed_key[0] = VALUES_BEGIN[0];  // Avoid string copy.
711
 
    batch.Put(prefixed_key, leveldb::Slice(value_data, value_size));
712
 
 
713
 
    // Update metadata.
714
 
    prefixed_key[0] = METADATA_BEGIN[0];  // Avoid string copy.
715
 
    batch.Delete(prefixed_key);           // In case there was metadata previously.
716
 
    if (metadata_data)
717
 
    {
718
 
        batch.Put(prefixed_key, leveldb::Slice(metadata_data, metadata_size));
719
 
    }
720
 
 
721
 
    // Update the Atime index.
722
 
    string atime_key = k_atime_index(atime, key);
723
 
    if (found)
724
 
    {
725
 
        batch.Delete(k_atime_index(old_data.atime, key));
726
 
    }
727
 
    batch.Put(atime_key, to_string(new_size));
728
 
 
729
 
    // Update the Etime index.
730
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
731
 
    {
732
 
        if (found && old_data.etime != epoch_ticks())
733
 
        {
734
 
            batch.Delete(k_etime_index(old_data.etime, key));
735
 
        }
736
 
        // Etime index is not written to for non-expiring entries.
737
 
        if (etime != epoch_ticks())
738
 
        {
739
 
            batch.Put(k_etime_index(etime, key), to_string(new_size));
740
 
        }
741
 
    }
742
 
 
743
 
    // Write the batch.
744
 
    auto s = db_->Write(write_options, &batch);
745
 
    throw_if_error(s, "put()");
746
 
 
747
 
    // Update cache size and number of entries;
748
 
    stats_->cache_size_ = stats_->cache_size_ - old_data.size + new_size;
749
 
    stats_->hist_increment(new_size);
750
 
    if (!found)
751
 
    {
752
 
        ++stats_->num_entries_;
753
 
    }
754
 
    else
755
 
    {
756
 
        stats_->hist_decrement(old_data.size);
757
 
    }
758
 
    assert(stats_->num_entries_ >= 0);
759
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
760
 
    assert(stats_->cache_size_ >= 0);
761
 
    assert(stats_->cache_size_ <= stats_->max_cache_size_);
762
 
    assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
763
 
    assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
764
 
 
765
 
    call_handler(key, CacheEventIndex::put);
766
 
 
767
 
    return true;
768
 
}
769
 
 
770
 
bool PersistentStringCacheImpl::get_or_put(string const& key, string& value, PersistentStringCache::Loader load_func)
771
 
{
772
 
    return get_or_put(key, value, nullptr, load_func);
773
 
}
774
 
 
775
 
bool PersistentStringCacheImpl::get_or_put(string const& key,
776
 
                                           string& value,
777
 
                                           string* metadata,
778
 
                                           PersistentStringCache::Loader load_func)
779
 
{
780
 
    if (key.empty())
781
 
    {
782
 
        throw_invalid_argument("get_or_put(): key must be non-empty");
783
 
    }
784
 
 
785
 
    lock_guard<decltype(mutex_)> lock(mutex_);
786
 
 
787
 
    // Call the normal get() here, so the hit/miss counters and callbacks are correct.
788
 
    if (get(key, value, metadata))
789
 
    {
790
 
        return true;
791
 
    }
792
 
 
793
 
    try
794
 
    {
795
 
        load_func(key, *pimpl_);  // Expected to put the value.
796
 
    }
797
 
    catch (std::exception const& e)
798
 
    {
799
 
        throw runtime_error(make_message(string("get_or_put(): load_func exception: ") + e.what()));
800
 
    }
801
 
    catch (...)
802
 
    {
803
 
        throw runtime_error(make_message("get_or_put(): load_func: unknown exception"));
804
 
    }
805
 
 
806
 
    // We go for the raw DB here, to avoid counting an extra hit or miss.
807
 
    DataTuple dt;
808
 
    bool loaded = get_value_and_metadata(key, dt, value, metadata);
809
 
    return loaded;
810
 
}
811
 
 
812
 
bool PersistentStringCacheImpl::put_metadata(std::string const& key, std::string const& metadata)
813
 
{
814
 
    return put_metadata(key, metadata.data(), metadata.size());
815
 
}
816
 
 
817
 
bool PersistentStringCacheImpl::put_metadata(std::string const& key, const char* metadata, int64_t metadata_size)
818
 
{
819
 
    if (key.empty())
820
 
    {
821
 
        throw_invalid_argument("put_metadata(): key must be non-empty");
822
 
    }
823
 
    if (!metadata)
824
 
    {
825
 
        throw_invalid_argument("put_metadata(): metadata must not be nullptr");
826
 
    }
827
 
    if (metadata_size < 0)
828
 
    {
829
 
        throw_invalid_argument("put_metadata(): invalid negative size: " + to_string(metadata_size));
830
 
    }
831
 
 
832
 
    lock_guard<decltype(mutex_)> lock(mutex_);
833
 
 
834
 
    string data_key = k_data(key);
835
 
    bool found;
836
 
    auto dt = get_data(data_key, found);
837
 
    if (!found)
838
 
    {
839
 
        return false;
840
 
    }
841
 
 
842
 
    int64_t old_meta_size = 0;
843
 
    IteratorUPtr it(db_->NewIterator(read_options));
844
 
    string metadata_key = k_metadata(key);
845
 
    it->Seek(metadata_key);
846
 
    if (it->Valid() && it->key().ToString() == metadata_key)
847
 
    {
848
 
        old_meta_size = it->value().size();
849
 
    }
850
 
    int64_t new_meta_size = metadata_size;
851
 
    if (dt.size - old_meta_size + new_meta_size > stats_->max_cache_size_)
852
 
    {
853
 
        throw_logic_error(string("put_metadata(): cannot add ") + to_string(new_meta_size) +
854
 
                          "-byte metadata: record size (" + to_string(dt.size - old_meta_size + new_meta_size) +
855
 
                          ") exceeds maximum cache size of " + to_string(stats_->max_cache_size_));
856
 
    }
857
 
    int64_t original_size = dt.size;
858
 
    dt.size = dt.size - old_meta_size + new_meta_size;
859
 
 
860
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
861
 
    {
862
 
        return false;  // Entry has expired.
863
 
    }
864
 
 
865
 
    // The new entry may be larger than the old one. If so, we may need to
866
 
    // evict some other entries to make room. However, we exclude this
867
 
    // record from trimming so we don't end up trimming the entry
868
 
    // that's about to be modified.
869
 
    if (new_meta_size > old_meta_size)
870
 
    {
871
 
        auto avail_bytes = stats_->max_cache_size_ - stats_->cache_size_;
872
 
        int64_t bytes_needed = new_meta_size - old_meta_size;
873
 
        if (bytes_needed > avail_bytes)
874
 
        {
875
 
            bytes_needed = min(bytes_needed, avail_bytes);
876
 
            delete_at_least(bytes_needed, key);  // Don't delete the entry about to be updated!
877
 
        }
878
 
    }
879
 
 
880
 
    leveldb::WriteBatch batch;
881
 
 
882
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks())
883
 
    {
884
 
        it->Seek(k_etime_index(dt.etime, key));
885
 
        assert(it->Valid());
886
 
        assert(it->key().ToString() == k_etime_index(dt.etime, key));
887
 
        batch.Put(it->key(), to_string(dt.size));  // Update Etime index with new size (expiry time is not modified).
888
 
    }
889
 
 
890
 
    batch.Put(data_key, dt.to_string());                               // Update data.
891
 
    batch.Put(metadata_key, leveldb::Slice(metadata, metadata_size));  // Update metadata.
892
 
 
893
 
    it->Seek(k_atime_index(dt.atime, key));
894
 
    assert(it->Valid());
895
 
    assert(it->key().ToString() == k_atime_index(dt.atime, key));
896
 
    batch.Put(it->key(), to_string(dt.size));  // Update Atime index with new size (access time is not modified).
897
 
 
898
 
    auto s = db_->Write(write_options, &batch);
899
 
    throw_if_error(s, "put_metadata(): batch write error");
900
 
 
901
 
    stats_->cache_size_ = stats_->cache_size_ - old_meta_size + new_meta_size;
902
 
    stats_->hist_increment(dt.size);
903
 
    stats_->hist_decrement(original_size);
904
 
 
905
 
    assert(stats_->num_entries_ >= 0);
906
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
907
 
    assert(stats_->cache_size_ >= 0);
908
 
    assert(stats_->cache_size_ <= stats_->max_cache_size_);
909
 
    assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
910
 
    assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
911
 
 
912
 
    return true;
913
 
}
914
 
 
915
 
bool PersistentStringCacheImpl::take(string const& key, string& value)
916
 
{
917
 
    return take(key, value, nullptr);
918
 
}
919
 
 
920
 
bool PersistentStringCacheImpl::take(string const& key, string& value, string* metadata)
921
 
{
922
 
    if (key.empty())
923
 
    {
924
 
        throw_invalid_argument("take(): key must be non-empty");
925
 
    }
926
 
 
927
 
    lock_guard<decltype(mutex_)> lock(mutex_);
928
 
 
929
 
    string data_key = k_data(key);
930
 
    DataTuple dt;
931
 
    string val;
932
 
    bool found = get_value_and_metadata(key, dt, val, metadata);
933
 
    if (!found)
934
 
    {
935
 
        stats_->inc_misses();
936
 
        call_handler(key, CacheEventIndex::miss);
937
 
        return false;
938
 
    }
939
 
 
940
 
    // Delete the entry whether it expired or not. Seeing that we have just done
941
 
    // a lot of work finding it, we may as well finish the job.
942
 
    delete_entry(key, dt);
943
 
 
944
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime <= now_ticks())
945
 
    {
946
 
        stats_->inc_misses();
947
 
        call_handler(key, CacheEventIndex::invalidate);
948
 
        call_handler(key, CacheEventIndex::miss);
949
 
        return false;  // Expired entries are hidden.
950
 
    }
951
 
    stats_->inc_hits();
952
 
    value = move(val);
953
 
    call_handler(key, CacheEventIndex::get);
954
 
    call_handler(key, CacheEventIndex::invalidate);
955
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
956
 
    return true;
957
 
}
958
 
 
959
 
bool PersistentStringCacheImpl::invalidate(string const& key)
960
 
{
961
 
    if (key.empty())
962
 
    {
963
 
        throw_invalid_argument("invalidate(): key must be non-empty");
964
 
    }
965
 
 
966
 
    lock_guard<decltype(mutex_)> lock(mutex_);
967
 
 
968
 
    string prefixed_key = k_data(key);
969
 
    bool found;
970
 
    auto dt = get_data(prefixed_key, found);
971
 
    if (!found)
972
 
    {
973
 
        return false;
974
 
    }
975
 
 
976
 
    // Delete the entry whether it expired or not. Seeing that we have just done
977
 
    // a lot of work finding it, we may as well finish the job.
978
 
    delete_entry(key, dt);
979
 
 
980
 
    call_handler(key, CacheEventIndex::invalidate);
981
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && dt.etime != epoch_ticks() && dt.etime < now_ticks())
982
 
    {
983
 
        return false;  // Expired entries are hidden.
984
 
    }
985
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
986
 
    return true;
987
 
}
988
 
 
989
 
void PersistentStringCacheImpl::invalidate(vector<string> const& keys)
990
 
{
991
 
    invalidate(keys.begin(), keys.end());
992
 
}
993
 
 
994
 
template<typename It>
995
 
void PersistentStringCacheImpl::invalidate(It begin, It end)
996
 
{
997
 
    lock_guard<decltype(mutex_)> lock(mutex_);
998
 
 
999
 
    leveldb::WriteBatch batch;
1000
 
 
1001
 
    for (auto&& it = begin; it < end; ++it)
1002
 
    {
1003
 
        if (it->empty())
1004
 
        {
1005
 
            continue;
1006
 
        }
1007
 
        bool found;
1008
 
        auto dt = get_data(k_data(*it), found);
1009
 
        if (!found)
1010
 
        {
1011
 
            continue;
1012
 
        }
1013
 
        batch_delete(*it, dt, batch);
1014
 
 
1015
 
        // Update cache size and entries.
1016
 
        stats_->hist_decrement(dt.size);
1017
 
        stats_->cache_size_ -= dt.size;
1018
 
        assert(stats_->cache_size_ >= 0);
1019
 
        assert(stats_->cache_size_ <= stats_->max_cache_size_);
1020
 
        --stats_->num_entries_;
1021
 
        assert(stats_->num_entries_ >= 0);
1022
 
        assert(stats_->num_entries_ == hist_sum(stats_->hist_));
1023
 
        assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
1024
 
        assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
1025
 
 
1026
 
        call_handler(*it, CacheEventIndex::invalidate);
1027
 
    }
1028
 
 
1029
 
    auto s = db_->Write(write_options, &batch);
1030
 
    throw_if_error(s, "invalidate(): batch write error");
1031
 
}
1032
 
 
1033
 
void PersistentStringCacheImpl::invalidate(initializer_list<std::string> const& keys)
1034
 
{
1035
 
    invalidate(keys.begin(), keys.end());
1036
 
}
1037
 
 
1038
 
void PersistentStringCacheImpl::invalidate()
1039
 
{
1040
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1041
 
 
1042
 
    {
1043
 
        int64_t count = 0;
1044
 
 
1045
 
        leveldb::WriteBatch batch;
1046
 
        int64_t const batch_size = 1000;
1047
 
 
1048
 
        PersistentStringCache::EventCallback cb =
1049
 
            handlers_[static_cast<underlying_type<CacheEventIndex>::type>(CacheEventIndex::invalidate)];
1050
 
 
1051
 
        IteratorUPtr it(db_->NewIterator(read_options));
1052
 
        it->Seek(ALL_BEGIN);
1053
 
        leveldb::Slice const atime_prefix = ATIME_BEGIN;
1054
 
        leveldb::Slice const all_end = ALL_END;
1055
 
        while (it->Valid() && it->key().compare(all_end) < 0)
1056
 
        {
1057
 
            auto key = it->key();
1058
 
            batch.Delete(key);
1059
 
            if (cb && key.starts_with(atime_prefix))
1060
 
            {
1061
 
                TimeKeyTuple atk(key.ToString().substr(1));
1062
 
                --stats_->num_entries_;
1063
 
                auto size = stoll(it->value().ToString());
1064
 
                stats_->cache_size_ -= size;
1065
 
                call_handler(atk.key, CacheEventIndex::invalidate);
1066
 
            }
1067
 
            if (++count == batch_size)
1068
 
            {
1069
 
                auto s = db_->Write(write_options, &batch);
1070
 
                throw_if_error(s, "invalidate(): batch write error");
1071
 
                batch.Clear();
1072
 
                count = 0;
1073
 
            }
1074
 
            it->Next();
1075
 
        }
1076
 
        throw_if_error(it->status(), "invalidate(): iterator error");
1077
 
 
1078
 
        if (count != 0)
1079
 
        {
1080
 
            auto s = db_->Write(write_options, &batch);
1081
 
            throw_if_error(s, "invalidate(): final batch write error");
1082
 
        }
1083
 
    }  // Close batch
1084
 
 
1085
 
    stats_->num_entries_ = 0;
1086
 
    stats_->hist_clear();
1087
 
    stats_->cache_size_ = 0;
1088
 
    // Clear ephemeral stats too.
1089
 
    stats_->clear();
1090
 
    write_stats();
1091
 
}
1092
 
 
1093
 
bool PersistentStringCacheImpl::touch(string const& key, chrono::time_point<chrono::system_clock> expiry_time)
1094
 
{
1095
 
    if (key.empty())
1096
 
    {
1097
 
        throw_invalid_argument("touch(): key must be non-empty");
1098
 
    }
1099
 
 
1100
 
    int64_t new_etime = ticks(expiry_time);
1101
 
 
1102
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_only && new_etime != epoch_ticks())
1103
 
    {
1104
 
        throw_logic_error(string("touch(): policy is lru_only, but expiry_time (") + to_string(new_etime) +
1105
 
                          ") is not infinite");
1106
 
    }
1107
 
 
1108
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1109
 
 
1110
 
    string data_key = k_data(key);
1111
 
    bool found;
1112
 
    auto dt = get_data(data_key, found);
1113
 
    if (!found)
1114
 
    {
1115
 
        return false;
1116
 
    }
1117
 
 
1118
 
    int64_t now = now_ticks();
1119
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl && new_etime != epoch_ticks() && new_etime <= now)
1120
 
    {
1121
 
        return false;  // New expiry time is already older than the time now.
1122
 
    }
1123
 
 
1124
 
    leveldb::WriteBatch batch;
1125
 
 
1126
 
    string size = to_string(dt.size);
1127
 
    batch.Delete(k_atime_index(dt.atime, key));  // Delete old Atime index entry.
1128
 
    batch.Put(k_atime_index(now, key), size);    // Write new Atime index entry.
1129
 
 
1130
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
1131
 
    {
1132
 
        batch.Delete(k_etime_index(dt.etime, key));  // Delete old Etime index entry.
1133
 
        if (new_etime != epoch_ticks())
1134
 
        {
1135
 
            batch.Put(k_etime_index(new_etime, key), size);  // Write new Etime index entry.
1136
 
        }
1137
 
    }
1138
 
    dt.atime = now;
1139
 
    dt.etime = new_etime;
1140
 
    batch.Put(data_key, dt.to_string());  // Write new data.
1141
 
 
1142
 
    auto s = db_->Write(write_options, &batch);
1143
 
    throw_if_error(s, "touch(): batch write error");
1144
 
 
1145
 
    call_handler(key, CacheEventIndex::touch);
1146
 
 
1147
 
    return true;
1148
 
}
1149
 
 
1150
 
void PersistentStringCacheImpl::clear_stats() noexcept
1151
 
{
1152
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1153
 
 
1154
 
    stats_->clear();
1155
 
    write_stats();
1156
 
}
1157
 
 
1158
 
void PersistentStringCacheImpl::resize(int64_t size_in_bytes)
1159
 
{
1160
 
    if (size_in_bytes < 1)
1161
 
    {
1162
 
        throw_invalid_argument("resize(): invalid size_in_bytes (" + to_string(size_in_bytes) + "): value must be > 0");
1163
 
    }
1164
 
 
1165
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1166
 
 
1167
 
    if (size_in_bytes < stats_->max_cache_size_)
1168
 
    {
1169
 
        trim_to(size_in_bytes);
1170
 
        db_->CompactRange(nullptr, nullptr);  // Avoid bulk deletions slowing down subsequent accesses.
1171
 
    }
1172
 
 
1173
 
    auto s = db_->Put(write_options, SETTINGS_MAX_SIZE, to_string(size_in_bytes));
1174
 
    throw_if_error(s, "resize(): cannot write max size");
1175
 
    stats_->max_cache_size_ = size_in_bytes;
1176
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
1177
 
}
1178
 
 
1179
 
void PersistentStringCacheImpl::trim_to(int64_t used_size_in_bytes)
1180
 
{
1181
 
    if (used_size_in_bytes < 0)
1182
 
    {
1183
 
        throw_invalid_argument("trim_to(): invalid used_size_in_bytes (" + to_string(used_size_in_bytes) +
1184
 
                               "): value must be >= 0");
1185
 
    }
1186
 
    if (used_size_in_bytes > stats_->max_cache_size_)
1187
 
    {
1188
 
        throw_logic_error(string("trim_to(): invalid used_size_in_bytes (") + to_string(used_size_in_bytes) +
1189
 
                          "): value must be <= max_size_in_bytes (" + to_string(stats_->max_cache_size_) + ")");
1190
 
    }
1191
 
 
1192
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1193
 
 
1194
 
    if (used_size_in_bytes < stats_->cache_size_)
1195
 
    {
1196
 
        delete_at_least(stats_->cache_size_ - used_size_in_bytes);
1197
 
    }
1198
 
    assert(stats_->num_entries_ == hist_sum(stats_->hist_));
1199
 
}
1200
 
 
1201
 
void PersistentStringCacheImpl::compact()
1202
 
{
1203
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1204
 
 
1205
 
    db_->CompactRange(nullptr, nullptr);
1206
 
}
1207
 
 
1208
 
void PersistentStringCacheImpl::set_handler(CacheEvent events, PersistentStringCache::EventCallback cb)
1209
 
{
1210
 
    static constexpr auto limit = underlying_type<CacheEvent>::type(CacheEvent::END_);
1211
 
 
1212
 
    auto evs = underlying_type<CacheEvent>::type(events);
1213
 
    if (evs == 0 || evs > limit - 1)
1214
 
    {
1215
 
        throw_invalid_argument("set_handler(): invalid events (" + to_string(evs) +
1216
 
                               "): value must be in the range [1.." + to_string(limit - 1) + "]");
1217
 
    }
1218
 
 
1219
 
    lock_guard<decltype(mutex_)> lock(mutex_);
1220
 
 
1221
 
    static constexpr auto index_limit = underlying_type<CacheEventIndex>::type(CacheEventIndex::END_);
1222
 
    for (underlying_type<CacheEventIndex>::type i = 0; i < index_limit; ++i)
1223
 
    {
1224
 
        if ((evs >> i) & 1)
1225
 
        {
1226
 
            handlers_[i] = cb;
1227
 
        }
1228
 
    }
1229
 
}
1230
 
 
1231
 
void PersistentStringCacheImpl::init_db(leveldb::Options options)
1232
 
{
1233
 
#ifndef NDEBUG
1234
 
    options.paranoid_checks = true;
1235
 
    read_options.verify_checksums = true;
1236
 
#endif
1237
 
 
1238
 
    leveldb::DB* db;
1239
 
    auto s = leveldb::DB::Open(options, stats_->cache_path_, &db);
1240
 
    throw_if_error(s, "cannot open or create cache");
1241
 
    db_.reset(db);
1242
 
}
1243
 
 
1244
 
bool PersistentStringCacheImpl::cache_is_new() const
1245
 
{
1246
 
    string val;
1247
 
    auto s = db_->Get(read_options, SETTINGS_SCHEMA_VERSION, &val);
1248
 
    throw_if_error(s, "cannot read schema version");
1249
 
    return s.IsNotFound();
1250
 
}
1251
 
 
1252
 
void PersistentStringCacheImpl::write_version()
1253
 
{
1254
 
    auto s = db_->Put(write_options, SETTINGS_SCHEMA_VERSION, to_string(SCHEMA_VERSION));
1255
 
    throw_if_error(s, "cannot read schema version");
1256
 
}
1257
 
 
1258
 
// Check if the version of the DB matches the expected version.
1259
 
// Pre: Version exists in the DB.
1260
 
// If the version can be read and make sense as a number, but
1261
 
// differs from the expected version, wipe the data (but
1262
 
// not the settings).
1263
 
// If the version can be read, but doesn't parse as a number, throw.
1264
 
// If the version matches the expected version, do nothing.
1265
 
 
1266
 
void PersistentStringCacheImpl::check_version()
1267
 
{
1268
 
    // Check schema version.
1269
 
    string val = "not found";
1270
 
    auto s = db_->Get(read_options, SETTINGS_SCHEMA_VERSION, &val);
1271
 
    throw_if_error(s, "cannot read schema version");
1272
 
    assert(!s.IsNotFound());
1273
 
 
1274
 
    int old_version;
1275
 
    try
1276
 
    {
1277
 
        old_version = stoi(val);
1278
 
    }
1279
 
    catch (std::exception const&)
1280
 
    {
1281
 
        throw_corrupt_error("check_version(): bad version: \"" + val + "\"");
1282
 
    }
1283
 
    if (old_version != SCHEMA_VERSION)
1284
 
    {
1285
 
        // Wipe all tables and stats (but not settings).
1286
 
        leveldb::WriteBatch batch;
1287
 
        IteratorUPtr it(db_->NewIterator(read_options));
1288
 
 
1289
 
        it->Seek(ALL_BEGIN);
1290
 
        leveldb::Slice const all_end(ALL_END);
1291
 
        while (it->Valid() && it->key().compare(ALL_END) < 0)
1292
 
        {
1293
 
            batch.Delete(it->key());
1294
 
            it->Next();
1295
 
        }
1296
 
 
1297
 
        // Note: any migration of settings for newer versions should happen here.
1298
 
 
1299
 
        // Write new schema version.
1300
 
        batch.Put(SETTINGS_SCHEMA_VERSION, to_string(SCHEMA_VERSION));
1301
 
        s = db_->Write(write_options, &batch);
1302
 
        throw_if_error(s, string("cannot clear DB after version mismatch, old version = ") + to_string(old_version) +
1303
 
                              ", new version = " + to_string(SCHEMA_VERSION));
1304
 
 
1305
 
        stats_->num_entries_ = 0;
1306
 
        stats_->hist_clear();
1307
 
        stats_->cache_size_ = 0;
1308
 
 
1309
 
        // init_stats() (called later) calls deserialize() on the stats,
1310
 
        // so we need to create a proper stats record here.
1311
 
        write_stats();
1312
 
    }
1313
 
}
1314
 
 
1315
 
void PersistentStringCacheImpl::read_settings()
1316
 
{
1317
 
    // Note: Loose error checking here. If someone deliberately
1318
 
    //       corrupts the settings table by writing garbage values for valid
1319
 
    //       keys, that's just too bad. We can't protect against Machiavelli.
1320
 
 
1321
 
    string val;
1322
 
 
1323
 
    auto s = db_->Get(read_options, SETTINGS_MAX_SIZE, &val);
1324
 
    throw_if_error(s, "read_settings(): cannot read max size");
1325
 
    stats_->max_cache_size_ = stoll(val);
1326
 
 
1327
 
    s = db_->Get(read_options, SETTINGS_POLICY, &val);
1328
 
    throw_if_error(s, "read_settings(): cannot read policy");
1329
 
    stats_->policy_ = static_cast<CacheDiscardPolicy>(stoi(val));
1330
 
}
1331
 
 
1332
 
void PersistentStringCacheImpl::write_settings()
1333
 
{
1334
 
    leveldb::WriteBatch batch;
1335
 
 
1336
 
    batch.Put(SETTINGS_MAX_SIZE, to_string(stats_->max_cache_size_));
1337
 
    batch.Put(SETTINGS_POLICY, to_string(static_cast<int>(stats_->policy_)));
1338
 
 
1339
 
    auto s = db_->Write(write_options, &batch);
1340
 
    throw_if_error(s, "write_settings()");
1341
 
}
1342
 
 
1343
 
void PersistentStringCacheImpl::read_stats()
1344
 
{
1345
 
    string val;
1346
 
    auto s = db_->Get(read_options, STATS_VALUES, &val);
1347
 
    throw_if_error(s, "read_stats()");
1348
 
    stats_->deserialize(val);
1349
 
}
1350
 
 
1351
 
void PersistentStringCacheImpl::write_stats()
1352
 
{
1353
 
    auto s = db_->Put(write_options, STATS_VALUES, stats_->serialize());
1354
 
    throw_if_error(s, "write_stats()");
1355
 
}
1356
 
 
1357
 
bool PersistentStringCacheImpl::read_dirty_flag() const
1358
 
{
1359
 
    string dirty;
1360
 
    auto s = db_->Get(read_options, DIRTY_FLAG, &dirty);
1361
 
    if (s.IsNotFound())
1362
 
    {
1363
 
        return true;
1364
 
    }
1365
 
    return dirty != "0";
1366
 
}
1367
 
 
1368
 
void PersistentStringCacheImpl::write_dirty_flag(bool is_dirty)
1369
 
{
1370
 
    auto s = db_->Put(write_options, DIRTY_FLAG, is_dirty ? "1" : "0");
1371
 
    throw_if_error(s, "write_dirty_flag()");
1372
 
}
1373
 
 
1374
 
PersistentStringCacheImpl::DataTuple PersistentStringCacheImpl::get_data(string const& key, bool& found) const
1375
 
{
1376
 
    // mutex_ must be locked here!
1377
 
 
1378
 
    assert(key[0] == DATA_BEGIN[0]);
1379
 
 
1380
 
    string val;
1381
 
    auto s = db_->Get(read_options, key, &val);
1382
 
    throw_if_error(s, "get_data(): cannot read data");
1383
 
    if (!s.IsNotFound())
1384
 
    {
1385
 
        found = true;
1386
 
        return DataTuple(val);
1387
 
    }
1388
 
    found = false;
1389
 
    return DataTuple();
1390
 
}
1391
 
 
1392
 
bool PersistentStringCacheImpl::get_value_and_metadata(string const& key,
1393
 
                                                       DataTuple& data,
1394
 
                                                       string& value,
1395
 
                                                       string* metadata) const
1396
 
{
1397
 
    // mutex_ must be locked here!
1398
 
 
1399
 
    // Note: key is the un-prefixed key!
1400
 
    string prefixed_key = k_data(key);
1401
 
 
1402
 
    IteratorUPtr it(db_->NewIterator(read_options));
1403
 
    it->Seek(prefixed_key);
1404
 
    throw_if_error(it->status(), "get_value_and_metadata(): iterator error");
1405
 
    assert(it->Valid());
1406
 
    if (it->key() != leveldb::Slice(prefixed_key))
1407
 
    {
1408
 
        return false;
1409
 
    }
1410
 
 
1411
 
    data = DataTuple(it->value().ToString());
1412
 
    prefixed_key[0] = VALUES_BEGIN[0];  // Avoid string copy.
1413
 
    it->Seek(prefixed_key);
1414
 
    assert(it->Valid() && it->key().compare(prefixed_key) == 0);
1415
 
    value = it->value().ToString();
1416
 
    if (metadata)
1417
 
    {
1418
 
        prefixed_key[0] = METADATA_BEGIN[0];  // Avoid string copy.
1419
 
        it->Seek(prefixed_key);
1420
 
        if (it->key().compare(prefixed_key) == 0)
1421
 
        {
1422
 
            *metadata = it->value().ToString();
1423
 
        }
1424
 
        else
1425
 
        {
1426
 
            metadata->clear();  // Metadata may have been there previously, but isn't now.
1427
 
        }
1428
 
    }
1429
 
    return true;
1430
 
}
1431
 
 
1432
 
void PersistentStringCacheImpl::batch_delete(string const& key, DataTuple const& data, leveldb::WriteBatch& batch)
1433
 
{
1434
 
    // mutex_ must be locked here!
1435
 
 
1436
 
    string prefixed_key = k_data(key);
1437
 
    batch.Delete(prefixed_key);                    // Delete data.
1438
 
    prefixed_key[0] = VALUES_BEGIN[0];             // Avoid string copy.
1439
 
    batch.Delete(prefixed_key);                    // Delete value.
1440
 
    prefixed_key[0] = METADATA_BEGIN[0];           // Avoid string copy.
1441
 
    batch.Delete(prefixed_key);                    // Delete metadata
1442
 
    batch.Delete(k_atime_index(data.atime, key));  // Delete atime index
1443
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
1444
 
    {
1445
 
        string etime_key = k_etime_index(data.etime, key);
1446
 
        batch.Delete(etime_key);
1447
 
    }
1448
 
}
1449
 
 
1450
 
void PersistentStringCacheImpl::delete_entry(string const& key, DataTuple const& data)
1451
 
{
1452
 
    // mutex_ must be locked here!
1453
 
 
1454
 
    leveldb::WriteBatch batch;
1455
 
    batch_delete(key, data, batch);
1456
 
    auto s = db_->Write(write_options, &batch);
1457
 
    throw_if_error(s, "delete_entry()");
1458
 
 
1459
 
    // Update cache size and entries.
1460
 
    stats_->hist_decrement(data.size);
1461
 
    stats_->cache_size_ -= data.size;
1462
 
    assert(stats_->cache_size_ >= 0);
1463
 
    assert(stats_->cache_size_ <= stats_->max_cache_size_);
1464
 
    --stats_->num_entries_;
1465
 
    assert(stats_->num_entries_ >= 0);
1466
 
    assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
1467
 
    assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
1468
 
}
1469
 
 
1470
 
void PersistentStringCacheImpl::delete_at_least(int64_t bytes_needed, string const& skip_key)
1471
 
{
1472
 
    // mutex_ must be locked here!
1473
 
 
1474
 
    assert(bytes_needed > 0);
1475
 
    assert(bytes_needed <= stats_->cache_size_);
1476
 
 
1477
 
    int64_t deleted_bytes = 0;
1478
 
    int64_t deleted_entries = 0;
1479
 
 
1480
 
    leveldb::WriteBatch batch;
1481
 
 
1482
 
    // Step 1: Delete all expired entries.
1483
 
    if (stats_->policy_ == CacheDiscardPolicy::lru_ttl)
1484
 
    {
1485
 
        auto now_time = now_ticks();
1486
 
        IteratorUPtr it(db_->NewIterator(read_options));
1487
 
        leveldb::Slice const etime_prefix(ETIME_BEGIN);
1488
 
        it->Seek(etime_prefix);
1489
 
        while (it->Valid())
1490
 
        {
1491
 
            if (!it->key().starts_with(etime_prefix))
1492
 
            {
1493
 
                break;
1494
 
            }
1495
 
            string etime_key = it->key().ToString();
1496
 
            TimeKeyTuple ek(etime_key.substr(1));  // Strip prefix to create the etime/key tuple.
1497
 
            if (!skip_key.empty() && ek.key == skip_key)
1498
 
            {
1499
 
                // Too hard to hit with a test because the entry must expire
1500
 
                // in between put_metadata() having decided that it's still
1501
 
                // unexpired and now_time taken above.
1502
 
                // LCOV_EXCL_START
1503
 
                it->Next();
1504
 
                continue;  // This entry must not be deleted (see put_metadata()).
1505
 
                // LCOV_EXCL_STOP
1506
 
            }
1507
 
            if (ek.time > now_time)
1508
 
            {
1509
 
                break;  // Anything past this point has not expired yet.
1510
 
            }
1511
 
 
1512
 
            string prefixed_key = k_data(ek.key);
1513
 
            string val;
1514
 
            auto s = db_->Get(read_options, prefixed_key, &val);
1515
 
            throw_if_error(s, "delete_at_least: cannot read data");
1516
 
            DataTuple dt(move(val));
1517
 
 
1518
 
            int64_t size = stoll(it->value().ToString());
1519
 
            deleted_bytes += size;
1520
 
            bytes_needed -= size;
1521
 
            ++deleted_entries;
1522
 
            batch_delete(ek.key, dt, batch);
1523
 
 
1524
 
            --stats_->num_entries_;
1525
 
            ++stats_->ttl_evictions_;
1526
 
            stats_->hist_decrement(size);
1527
 
            stats_->cache_size_ -= size;
1528
 
            call_handler(ek.key, CacheEventIndex::evict_ttl);
1529
 
 
1530
 
            it->Next();
1531
 
        }
1532
 
        throw_if_error(it->status(), "delete_at_least(): expiry iterator error");
1533
 
    }  // Close iterator.
1534
 
 
1535
 
    if (deleted_entries)
1536
 
    {
1537
 
        // Need to commit the batch here, otherwise what follows will not see the changes made above.
1538
 
        auto s = db_->Write(write_options, &batch);
1539
 
        throw_if_error(s, "delete_at_least(): expiry write error");
1540
 
        batch.Clear();
1541
 
    }
1542
 
 
1543
 
    // Step 2: If we still need more room, delete entries in LRU order until we have enough room.
1544
 
    if (bytes_needed > 0)
1545
 
    {
1546
 
        // Run over the Atime index and delete in old-to-new order.
1547
 
        IteratorUPtr it(db_->NewIterator(read_options));
1548
 
        leveldb::Slice const atime_prefix(ATIME_BEGIN);
1549
 
        it->Seek(atime_prefix);
1550
 
        while (it->Valid() && bytes_needed > 0 && it->key().starts_with(atime_prefix))
1551
 
        {
1552
 
            TimeKeyTuple atk(it->key().ToString().substr(1));  // Strip prefix to create the atime/key tuple.
1553
 
            if (!skip_key.empty() && atk.key == skip_key)
1554
 
            {
1555
 
                it->Next();
1556
 
                continue;  // This entry must not be deleted (see put_metadata()).
1557
 
            }
1558
 
 
1559
 
            int64_t size = stoll(it->value().ToString());
1560
 
            deleted_bytes += size;
1561
 
            bytes_needed -= size;
1562
 
            ++deleted_entries;
1563
 
 
1564
 
            string data_string;
1565
 
            string prefixed_key = k_data(atk.key);
1566
 
            auto s = db_->Get(read_options, prefixed_key, &data_string);
1567
 
            assert(!s.IsNotFound());
1568
 
            throw_if_error(s, "delete_at_least()");
1569
 
            DataTuple dt(move(data_string));
1570
 
            batch_delete(atk.key, dt, batch);
1571
 
 
1572
 
            --stats_->num_entries_;
1573
 
            ++stats_->lru_evictions_;
1574
 
            stats_->hist_decrement(size);
1575
 
            stats_->cache_size_ -= size;
1576
 
            call_handler(atk.key, CacheEventIndex::evict_lru);
1577
 
 
1578
 
            it->Next();
1579
 
        }
1580
 
        throw_if_error(it->status(), "delete_at_least(): LRU iterator error");
1581
 
        assert(deleted_bytes > 0);
1582
 
        assert(bytes_needed <= 0);
1583
 
    }
1584
 
 
1585
 
    auto s = db_->Write(write_options, &batch);
1586
 
    throw_if_error(s, "delete_at_least(): LRU write error");
1587
 
 
1588
 
    assert(stats_->cache_size_ >= 0);
1589
 
    assert(stats_->num_entries_ >= 0);
1590
 
    assert(stats_->cache_size_ == 0 || stats_->num_entries_ != 0);
1591
 
    assert(stats_->num_entries_ == 0 || stats_->cache_size_ != 0);
1592
 
}
1593
 
 
1594
 
void PersistentStringCacheImpl::call_handler(string const& key, CacheEventIndex event_index) const
1595
 
{
1596
 
    // mutex_ must be locked here!
1597
 
 
1598
 
    typedef underlying_type<CacheEventIndex>::type IndexType;
1599
 
    auto handler = handlers_[static_cast<IndexType>(event_index)];
1600
 
    if (handler)
1601
 
    {
1602
 
        try
1603
 
        {
1604
 
            IndexType index = static_cast<IndexType>(event_index);
1605
 
            handler(key, static_cast<CacheEvent>(1 << index), stats_);
1606
 
        }
1607
 
        catch (...)
1608
 
        {
1609
 
            // Ignored
1610
 
        }
1611
 
    }
1612
 
}
1613
 
 
1614
 
string PersistentStringCacheImpl::make_message(leveldb::Status const& s, string const& msg) const
1615
 
{
1616
 
    return class_name + ": " + msg + ": " + s.ToString() + " (cache_path: " + stats_->cache_path_ + ")";
1617
 
}
1618
 
 
1619
 
string PersistentStringCacheImpl::make_message(string const& msg) const
1620
 
{
1621
 
    return class_name + ": " + msg + " (cache_path: " + stats_->cache_path_ + ")";
1622
 
}
1623
 
 
1624
 
void PersistentStringCacheImpl::throw_if_error(leveldb::Status const& s, string const& msg) const
1625
 
{
1626
 
    if (!s.ok() && !s.IsNotFound())
1627
 
    {
1628
 
        if (s.IsCorruption())
1629
 
        {
1630
 
            throw system_error(666, generic_category(), make_message(s, msg));  // LCOV_EXCL_LINE
1631
 
        }
1632
 
        throw runtime_error(make_message(s, msg));
1633
 
    }
1634
 
}
1635
 
 
1636
 
void PersistentStringCacheImpl::throw_logic_error(string const& msg) const
1637
 
{
1638
 
    throw logic_error(make_message(msg));
1639
 
}
1640
 
 
1641
 
void PersistentStringCacheImpl::throw_invalid_argument(string const& msg) const
1642
 
{
1643
 
    throw invalid_argument(make_message(msg));
1644
 
}
1645
 
 
1646
 
void PersistentStringCacheImpl::throw_corrupt_error(string const& msg) const
1647
 
{
1648
 
    throw system_error(666, generic_category(), make_message(msg));
1649
 
}
1650
 
 
1651
 
}  // namespace internal
1652
 
 
1653
 
}  // namespace core