1
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
12
#include "default_engine.h"
14
/* Forward Declarations */
15
static void item_link_q(struct default_engine *engine, hash_item *it);
16
static void item_unlink_q(struct default_engine *engine, hash_item *it);
17
static hash_item *do_item_alloc(struct default_engine *engine,
18
const void *key, const size_t nkey,
19
const int flags, const rel_time_t exptime,
22
static hash_item *do_item_get(struct default_engine *engine,
23
const char *key, const size_t nkey);
24
static int do_item_link(struct default_engine *engine, hash_item *it);
25
static void do_item_unlink(struct default_engine *engine, hash_item *it);
26
static void do_item_release(struct default_engine *engine, hash_item *it);
27
static void do_item_update(struct default_engine *engine, hash_item *it);
28
static int do_item_replace(struct default_engine *engine,
29
hash_item *it, hash_item *new_it);
30
static void item_free(struct default_engine *engine, hash_item *it);
33
* We only reposition items in the LRU queue if they haven't been repositioned
34
* in this many seconds. That saves us from churning on frequently-accessed
37
#define ITEM_UPDATE_INTERVAL 60
39
* To avoid scanning through the complete cache in some circumstances we'll
40
* just give up and return an error after inspecting a fixed number of objects.
42
static const int search_items = 50;
44
void item_stats_reset(struct default_engine *engine) {
45
pthread_mutex_lock(&engine->cache_lock);
46
memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
47
pthread_mutex_unlock(&engine->cache_lock);
51
/* warning: don't use these macros with a function, as it evals its arg twice */
52
static inline size_t ITEM_ntotal(struct default_engine *engine,
53
const hash_item *item) {
54
size_t ret = sizeof(*item) + item->nkey + item->nbytes;
55
if (engine->config.use_cas) {
56
ret += sizeof(uint64_t);
62
/* Get the next CAS id for a new item. */
63
static uint64_t get_cas_id(void) {
64
static uint64_t cas_id = 0;
68
/* Enable this for reference-count debugging. */
70
# define DEBUG_REFCNT(it,op) \
71
fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
72
it, op, it->refcount, \
73
(it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
74
(it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
76
# define DEBUG_REFCNT(it,op) while(0)
81
hash_item *do_item_alloc(struct default_engine *engine,
85
const rel_time_t exptime,
89
size_t ntotal = sizeof(hash_item) + nkey + nbytes;
90
if (engine->config.use_cas) {
91
ntotal += sizeof(uint64_t);
94
unsigned int id = slabs_clsid(engine, ntotal);
98
/* do a quick check if we have any expired items in the tail.. */
99
int tries = search_items;
102
rel_time_t current_time = engine->server.core->get_current_time();
104
for (search = engine->items.tails[id];
105
tries > 0 && search != NULL;
106
tries--, search=search->prev) {
107
if (search->refcount == 0 &&
108
(search->exptime != 0 && search->exptime < current_time)) {
110
/* I don't want to actually free the object, just steal
111
* the item to avoid to grab the slab mutex twice ;-)
113
pthread_mutex_lock(&engine->stats.lock);
114
engine->stats.reclaimed++;
115
pthread_mutex_unlock(&engine->stats.lock);
116
engine->items.itemstats[id].reclaimed++;
118
slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
119
do_item_unlink(engine, it);
120
/* Initialize the item block: */
127
if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
129
** Could not find an expired item at the tail, and memory allocation
130
** failed. Try to evict some items!
132
tries = search_items;
134
/* If requested to not push old items out of cache when memory runs out,
135
* we're out of luck at this point...
138
if (engine->config.evict_to_free == 0) {
139
engine->items.itemstats[id].outofmemory++;
144
* try to get one off the right LRU
145
* don't necessariuly unlink the tail because it may be locked: refcount>0
146
* search up from tail an item with refcount==0 and unlink it; give up after search_items
150
if (engine->items.tails[id] == 0) {
151
engine->items.itemstats[id].outofmemory++;
155
for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
156
if (search->refcount == 0) {
157
if (search->exptime == 0 || search->exptime > current_time) {
158
engine->items.itemstats[id].evicted++;
159
engine->items.itemstats[id].evicted_time = current_time - search->time;
160
if (search->exptime != 0) {
161
engine->items.itemstats[id].evicted_nonzero++;
163
pthread_mutex_lock(&engine->stats.lock);
164
engine->stats.evictions++;
165
pthread_mutex_unlock(&engine->stats.lock);
166
engine->server.stat->evicting(cookie,
167
item_get_key(search),
170
engine->items.itemstats[id].reclaimed++;
171
pthread_mutex_lock(&engine->stats.lock);
172
engine->stats.reclaimed++;
173
pthread_mutex_unlock(&engine->stats.lock);
175
do_item_unlink(engine, search);
179
it = slabs_alloc(engine, ntotal, id);
181
engine->items.itemstats[id].outofmemory++;
182
/* Last ditch effort. There is a very rare bug which causes
183
* refcount leaks. We've fixed most of them, but it still happens,
184
* and it may happen in the future.
185
* We can reasonably assume no item can stay locked for more than
186
* three hours, so if we find one in the tail which is that old,
189
tries = search_items;
190
for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
191
if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
192
engine->items.itemstats[id].tailrepairs++;
193
search->refcount = 0;
194
do_item_unlink(engine, search);
198
it = slabs_alloc(engine, ntotal, id);
205
assert(it->slabs_clsid == 0);
207
it->slabs_clsid = id;
209
assert(it != engine->items.heads[it->slabs_clsid]);
211
it->next = it->prev = it->h_next = 0;
212
it->refcount = 1; /* the caller will have a reference */
213
DEBUG_REFCNT(it, '*');
214
it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
218
memcpy((void*)item_get_key(it), key, nkey);
219
it->exptime = exptime;
223
static void item_free(struct default_engine *engine, hash_item *it) {
224
size_t ntotal = ITEM_ntotal(engine, it);
226
assert((it->iflag & ITEM_LINKED) == 0);
227
assert(it != engine->items.heads[it->slabs_clsid]);
228
assert(it != engine->items.tails[it->slabs_clsid]);
229
assert(it->refcount == 0);
231
/* so slab size changer can tell later if item is already free or not */
232
clsid = it->slabs_clsid;
234
it->iflag |= ITEM_SLABBED;
235
DEBUG_REFCNT(it, 'F');
236
slabs_free(engine, it, ntotal, clsid);
239
static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
240
hash_item **head, **tail;
241
assert(it->slabs_clsid < POWER_LARGEST);
242
assert((it->iflag & ITEM_SLABBED) == 0);
244
head = &engine->items.heads[it->slabs_clsid];
245
tail = &engine->items.tails[it->slabs_clsid];
247
assert((*head && *tail) || (*head == 0 && *tail == 0));
250
if (it->next) it->next->prev = it;
252
if (*tail == 0) *tail = it;
253
engine->items.sizes[it->slabs_clsid]++;
257
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
258
hash_item **head, **tail;
259
assert(it->slabs_clsid < POWER_LARGEST);
260
head = &engine->items.heads[it->slabs_clsid];
261
tail = &engine->items.tails[it->slabs_clsid];
264
assert(it->prev == 0);
268
assert(it->next == 0);
271
assert(it->next != it);
272
assert(it->prev != it);
274
if (it->next) it->next->prev = it->prev;
275
if (it->prev) it->prev->next = it->next;
276
engine->items.sizes[it->slabs_clsid]--;
280
int do_item_link(struct default_engine *engine, hash_item *it) {
281
MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
282
assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
283
assert(it->nbytes < (1024 * 1024)); /* 1MB max size */
284
it->iflag |= ITEM_LINKED;
285
it->time = engine->server.core->get_current_time();
286
assoc_insert(engine, engine->server.core->hash(item_get_key(it),
290
pthread_mutex_lock(&engine->stats.lock);
291
engine->stats.curr_bytes += ITEM_ntotal(engine, it);
292
engine->stats.curr_items += 1;
293
engine->stats.total_items += 1;
294
pthread_mutex_unlock(&engine->stats.lock);
296
/* Allocate a new CAS ID on link. */
297
item_set_cas(NULL, NULL, it, get_cas_id());
299
item_link_q(engine, it);
304
void do_item_unlink(struct default_engine *engine, hash_item *it) {
305
MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
306
if ((it->iflag & ITEM_LINKED) != 0) {
307
it->iflag &= ~ITEM_LINKED;
308
pthread_mutex_lock(&engine->stats.lock);
309
engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
310
engine->stats.curr_items -= 1;
311
pthread_mutex_unlock(&engine->stats.lock);
312
assoc_delete(engine, engine->server.core->hash(item_get_key(it),
314
item_get_key(it), it->nkey);
315
item_unlink_q(engine, it);
316
if (it->refcount == 0) {
317
item_free(engine, it);
322
void do_item_release(struct default_engine *engine, hash_item *it) {
323
MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
324
if (it->refcount != 0) {
326
DEBUG_REFCNT(it, '-');
328
if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
329
item_free(engine, it);
333
void do_item_update(struct default_engine *engine, hash_item *it) {
334
rel_time_t current_time = engine->server.core->get_current_time();
335
MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
336
if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
337
assert((it->iflag & ITEM_SLABBED) == 0);
339
if ((it->iflag & ITEM_LINKED) != 0) {
340
item_unlink_q(engine, it);
341
it->time = current_time;
342
item_link_q(engine, it);
347
int do_item_replace(struct default_engine *engine,
348
hash_item *it, hash_item *new_it) {
349
MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
350
item_get_key(new_it), new_it->nkey, new_it->nbytes);
351
assert((it->iflag & ITEM_SLABBED) == 0);
353
do_item_unlink(engine, it);
354
return do_item_link(engine, new_it);
358
static char *do_item_cachedump(const unsigned int slabs_clsid,
359
const unsigned int limit,
360
unsigned int *bytes) {
362
unsigned int memlimit = 2 * 1024 * 1024; /* 2MB max response size */
364
unsigned int bufcurr;
367
unsigned int shown = 0;
368
char key_temp[KEY_MAX_LENGTH + 1];
371
it = engine->items.heads[slabs_clsid];
373
buffer = malloc((size_t)memlimit);
374
if (buffer == 0) return NULL;
378
while (it != NULL && (limit == 0 || shown < limit)) {
379
assert(it->nkey <= KEY_MAX_LENGTH);
380
/* Copy the key since it may not be null-terminated in the struct */
381
strncpy(key_temp, item_get_key(it), it->nkey);
382
key_temp[it->nkey] = 0x00; /* terminate */
383
len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
384
key_temp, it->nbytes,
385
(unsigned long)it->exptime + process_started);
386
if (bufcurr + len + 6 > memlimit) /* 6 is END\r\n\0 */
388
memcpy(buffer + bufcurr, temp, len);
395
memcpy(buffer + bufcurr, "END\r\n", 6);
407
static void do_item_stats(struct default_engine *engine,
408
ADD_STAT add_stats, const void *c) {
410
rel_time_t current_time = engine->server.core->get_current_time();
411
for (i = 0; i < POWER_LARGEST; i++) {
412
if (engine->items.tails[i] != NULL) {
413
int search = search_items;
415
engine->items.tails[i] != NULL &&
416
((engine->config.oldest_live != 0 && /* Item flushd */
417
engine->config.oldest_live <= current_time &&
418
engine->items.tails[i]->time <= engine->config.oldest_live) ||
419
(engine->items.tails[i]->exptime != 0 && /* and not expired */
420
engine->items.tails[i]->exptime < current_time))) {
422
if (engine->items.tails[i]->refcount == 0) {
423
do_item_unlink(engine, engine->items.tails[i]);
428
if (engine->items.tails[i] == NULL) {
429
/* We removed all of the items in this slab class */
433
const char *prefix = "items";
434
add_statistics(c, add_stats, prefix, i, "number", "%u",
435
engine->items.sizes[i]);
436
add_statistics(c, add_stats, prefix, i, "age", "%u",
437
engine->items.tails[i]->time);
438
add_statistics(c, add_stats, prefix, i, "evicted",
439
"%u", engine->items.itemstats[i].evicted);
440
add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
441
"%u", engine->items.itemstats[i].evicted_nonzero);
442
add_statistics(c, add_stats, prefix, i, "evicted_time",
443
"%u", engine->items.itemstats[i].evicted_time);
444
add_statistics(c, add_stats, prefix, i, "outofmemory",
445
"%u", engine->items.itemstats[i].outofmemory);
446
add_statistics(c, add_stats, prefix, i, "tailrepairs",
447
"%u", engine->items.itemstats[i].tailrepairs);;
448
add_statistics(c, add_stats, prefix, i, "reclaimed",
449
"%u", engine->items.itemstats[i].reclaimed);;
454
/** dumps out a list of objects of each size, with granularity of 32 bytes */
456
static void do_item_stats_sizes(struct default_engine *engine,
457
ADD_STAT add_stats, const void *c) {
459
/* max 1MB object, divided into 32 bytes size buckets */
460
const int num_buckets = 32768;
461
unsigned int *histogram = calloc(num_buckets, sizeof(int));
463
if (histogram != NULL) {
466
/* build the histogram */
467
for (i = 0; i < POWER_LARGEST; i++) {
468
hash_item *iter = engine->items.heads[i];
470
int ntotal = ITEM_ntotal(engine, iter);
471
int bucket = ntotal / 32;
472
if ((ntotal % 32) != 0) bucket++;
473
if (bucket < num_buckets) histogram[bucket]++;
478
/* write the buffer */
479
for (i = 0; i < num_buckets; i++) {
480
if (histogram[i] != 0) {
481
char key[8], val[32];
483
klen = snprintf(key, sizeof(key), "%d", i * 32);
484
vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
485
assert(klen < sizeof(key));
486
assert(vlen < sizeof(val));
487
add_stats(key, klen, val, vlen, c);
494
/** wrapper around assoc_find which does the lazy expiration logic */
495
hash_item *do_item_get(struct default_engine *engine,
496
const char *key, const size_t nkey) {
497
rel_time_t current_time = engine->server.core->get_current_time();
498
hash_item *it = assoc_find(engine, engine->server.core->hash(key,
503
if (engine->config.verbose > 2) {
504
EXTENSION_LOGGER_DESCRIPTOR *logger;
505
logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
507
logger->log(EXTENSION_LOG_DEBUG, NULL,
508
"> NOT FOUND %s", key);
510
logger->log(EXTENSION_LOG_DEBUG, NULL,
512
(const char*)item_get_key(it));
517
if (it != NULL && engine->config.oldest_live != 0 &&
518
engine->config.oldest_live <= current_time &&
519
it->time <= engine->config.oldest_live) {
520
do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
524
if (it == NULL && was_found) {
525
EXTENSION_LOGGER_DESCRIPTOR *logger;
526
logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
527
logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
531
if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
532
do_item_unlink(engine, it); /* MTSAFE - cache_lock held */
536
if (it == NULL && was_found) {
537
EXTENSION_LOGGER_DESCRIPTOR *logger;
538
logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
539
logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
545
DEBUG_REFCNT(it, '+');
546
do_item_update(engine, it);
553
* Stores an item in the cache according to the semantics of one of the set
554
* commands. In threaded mode, this is protected by the cache lock.
556
* Returns the state of storage.
558
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
559
hash_item *it, uint64_t *cas,
560
ENGINE_STORE_OPERATION operation,
561
const void *cookie) {
562
const char *key = item_get_key(it);
563
hash_item *old_it = do_item_get(engine, key, it->nkey);
564
ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
566
hash_item *new_it = NULL;
568
if (old_it != NULL && operation == OPERATION_ADD) {
569
/* add only adds a nonexistent item, but promote to head of LRU */
570
do_item_update(engine, old_it);
571
} else if (!old_it && (operation == OPERATION_REPLACE
572
|| operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
574
/* replace only replaces an existing value; don't store */
575
} else if (operation == OPERATION_CAS) {
576
/* validate cas operation */
579
stored = ENGINE_KEY_ENOENT;
581
else if (item_get_cas(it) == item_get_cas(old_it)) {
583
// it and old_it may belong to different classes.
584
// I'm updating the stats for the one that's getting pushed out
585
do_item_replace(engine, old_it, it);
586
stored = ENGINE_SUCCESS;
588
if (engine->config.verbose > 1) {
589
EXTENSION_LOGGER_DESCRIPTOR *logger;
590
logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
591
logger->log(EXTENSION_LOG_INFO, NULL,
592
"CAS: failure: expected %"PRIu64", got %"PRIu64"\n",
593
item_get_cas(old_it),
596
stored = ENGINE_KEY_EEXISTS;
600
* Append - combine new and old record into single one. Here it's
601
* atomic and thread-safe.
603
if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
607
if (item_get_cas(it) != 0) {
609
if (item_get_cas(it) != item_get_cas(old_it)) {
610
stored = ENGINE_KEY_EEXISTS;
614
if (stored == ENGINE_NOT_STORED) {
615
/* we have it and old_it here - alloc memory to hold both */
616
new_it = do_item_alloc(engine, key, it->nkey,
619
it->nbytes + old_it->nbytes,
622
if (new_it == NULL) {
623
/* SERVER_ERROR out of memory */
624
if (old_it != NULL) {
625
do_item_release(engine, old_it);
628
return ENGINE_NOT_STORED;
631
/* copy data from it and old_it to new_it */
633
if (operation == OPERATION_APPEND) {
634
memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
635
memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
637
/* OPERATION_PREPEND */
638
memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
639
memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
646
if (stored == ENGINE_NOT_STORED) {
647
if (old_it != NULL) {
648
do_item_replace(engine, old_it, it);
650
do_item_link(engine, it);
653
*cas = item_get_cas(it);
654
stored = ENGINE_SUCCESS;
658
if (old_it != NULL) {
659
do_item_release(engine, old_it); /* release our reference */
662
if (new_it != NULL) {
663
do_item_release(engine, new_it);
666
if (stored == ENGINE_SUCCESS) {
667
*cas = item_get_cas(it);
675
* adds a delta value to a numeric item.
677
* c connection requesting the operation
679
* incr true to increment value, false to decrement
680
* delta amount to adjust value by
681
* buf buffer for response string
683
* returns a response string to send back to the client.
685
static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
686
hash_item *it, const bool incr,
687
const int64_t delta, uint64_t *rcas,
688
uint64_t *result, const void *cookie) {
694
if (it->nbytes >= (sizeof(buf) - 1)) {
695
return ENGINE_EINVAL;
698
ptr = item_get_data(it);
699
memcpy(buf, ptr, it->nbytes);
700
buf[it->nbytes] = '\0';
702
if (!safe_strtoull(buf, &value)) {
703
return ENGINE_EINVAL;
717
if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
718
return ENGINE_EINVAL;
721
if (it->refcount == 1 && res <= it->nbytes) {
722
// we can do inline replacement
723
memcpy(item_get_data(it), buf, res);
724
memset(item_get_data(it) + res, ' ', it->nbytes - res);
725
item_set_cas(NULL, NULL, it, get_cas_id());
726
*rcas = item_get_cas(it);
728
hash_item *new_it = do_item_alloc(engine, item_get_key(it),
732
if (new_it == NULL) {
733
do_item_unlink(engine, it);
734
return ENGINE_ENOMEM;
736
memcpy(item_get_data(new_it), buf, res);
737
do_item_replace(engine, it, new_it);
738
*rcas = item_get_cas(new_it);
739
do_item_release(engine, new_it); /* release our reference */
742
return ENGINE_SUCCESS;
745
/********************************* ITEM ACCESS *******************************/
748
* Allocates a new item.
750
hash_item *item_alloc(struct default_engine *engine,
751
const void *key, size_t nkey, int flags,
752
rel_time_t exptime, int nbytes, const void *cookie) {
754
pthread_mutex_lock(&engine->cache_lock);
755
it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
756
pthread_mutex_unlock(&engine->cache_lock);
761
* Returns an item if it hasn't been marked as expired,
762
* lazy-expiring as needed.
764
hash_item *item_get(struct default_engine *engine,
765
const void *key, const size_t nkey) {
767
pthread_mutex_lock(&engine->cache_lock);
768
it = do_item_get(engine, key, nkey);
769
pthread_mutex_unlock(&engine->cache_lock);
774
* Decrements the reference count on an item and adds it to the freelist if
777
void item_release(struct default_engine *engine, hash_item *item) {
778
pthread_mutex_lock(&engine->cache_lock);
779
do_item_release(engine, item);
780
pthread_mutex_unlock(&engine->cache_lock);
784
* Unlinks an item from the LRU and hashtable.
786
void item_unlink(struct default_engine *engine, hash_item *item) {
787
pthread_mutex_lock(&engine->cache_lock);
788
do_item_unlink(engine, item);
789
pthread_mutex_unlock(&engine->cache_lock);
792
static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
796
const bool increment,
798
const uint64_t delta,
799
const uint64_t initial,
800
const rel_time_t exptime,
804
hash_item *item = do_item_get(engine, key, nkey);
805
ENGINE_ERROR_CODE ret;
809
return ENGINE_KEY_ENOENT;
812
int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
815
item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
817
return ENGINE_ENOMEM;
819
memcpy((void*)item_get_data(item), buffer, len);
820
if ((ret = do_store_item(engine, item, cas,
821
OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
823
*cas = item_get_cas(item);
825
do_item_release(engine, item);
828
ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
829
do_item_release(engine, item);
835
ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
839
const bool increment,
841
const uint64_t delta,
842
const uint64_t initial,
843
const rel_time_t exptime,
847
ENGINE_ERROR_CODE ret;
849
pthread_mutex_lock(&engine->cache_lock);
850
ret = do_arithmetic(engine, cookie, key, nkey, increment,
851
create, delta, initial, exptime, cas,
853
pthread_mutex_unlock(&engine->cache_lock);
858
* Stores an item in the cache (high level, obeys set/add/replace semantics)
860
ENGINE_ERROR_CODE store_item(struct default_engine *engine,
861
hash_item *item, uint64_t *cas,
862
ENGINE_STORE_OPERATION operation,
863
const void *cookie) {
864
ENGINE_ERROR_CODE ret;
866
pthread_mutex_lock(&engine->cache_lock);
867
ret = do_store_item(engine, item, cas, operation, cookie);
868
pthread_mutex_unlock(&engine->cache_lock);
872
static hash_item *do_touch_item(struct default_engine *engine,
877
hash_item *item = do_item_get(engine, key, nkey);
879
item->exptime = exptime;
884
hash_item *touch_item(struct default_engine *engine,
891
pthread_mutex_lock(&engine->cache_lock);
892
ret = do_touch_item(engine, key, nkey, exptime);
893
pthread_mutex_unlock(&engine->cache_lock);
898
* Flushes expired items after a flush_all call
900
void item_flush_expired(struct default_engine *engine, time_t when) {
902
hash_item *iter, *next;
904
pthread_mutex_lock(&engine->cache_lock);
907
engine->config.oldest_live = engine->server.core->get_current_time() - 1;
909
engine->config.oldest_live = engine->server.core->realtime(when) - 1;
912
if (engine->config.oldest_live != 0) {
913
for (i = 0; i < POWER_LARGEST; i++) {
915
* The LRU is sorted in decreasing time order, and an item's
916
* timestamp is never newer than its last access time, so we
917
* only need to walk back until we hit an item older than the
919
* The oldest_live checking will auto-expire the remaining items.
921
for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
922
if (iter->time >= engine->config.oldest_live) {
924
if ((iter->iflag & ITEM_SLABBED) == 0) {
925
do_item_unlink(engine, iter);
928
/* We've hit the first old item. Continue to the next queue. */
934
pthread_mutex_unlock(&engine->cache_lock);
938
* Dumps part of the cache
940
char *item_cachedump(struct default_engine *engine,
941
unsigned int slabs_clsid,
943
unsigned int *bytes) {
946
pthread_mutex_lock(&engine->cache_lock);
947
ret = do_item_cachedump(slabs_clsid, limit, bytes);
948
pthread_mutex_unlock(&engine->cache_lock);
952
void item_stats(struct default_engine *engine,
953
ADD_STAT add_stat, const void *cookie)
955
pthread_mutex_lock(&engine->cache_lock);
956
do_item_stats(engine, add_stat, cookie);
957
pthread_mutex_unlock(&engine->cache_lock);
961
void item_stats_sizes(struct default_engine *engine,
962
ADD_STAT add_stat, const void *cookie)
964
pthread_mutex_lock(&engine->cache_lock);
965
do_item_stats_sizes(engine, add_stat, cookie);
966
pthread_mutex_unlock(&engine->cache_lock);
969
static void do_item_link_cursor(struct default_engine *engine,
970
hash_item *cursor, int ii)
972
cursor->slabs_clsid = (uint8_t)ii;
974
cursor->prev = engine->items.tails[ii];
975
engine->items.tails[ii]->next = cursor;
976
engine->items.tails[ii] = cursor;
977
engine->items.sizes[ii]++;
980
typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
981
hash_item *item, void *cookie);
983
static bool do_item_walk_cursor(struct default_engine *engine,
988
ENGINE_ERROR_CODE *error)
991
*error = ENGINE_SUCCESS;
993
while (cursor->prev != NULL && ii < steplength) {
996
hash_item *ptr = cursor->prev;
997
item_unlink_q(engine, cursor);
1000
if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1002
cursor->prev = NULL;
1005
cursor->prev = ptr->prev;
1006
cursor->prev->next = cursor;
1010
/* Ignore cursors */
1011
if (ptr->nkey == 0 && ptr->nbytes == 0) {
1014
*error = itemfunc(engine, ptr, itemdata);
1015
if (*error != ENGINE_SUCCESS) {
1025
return (cursor->prev != NULL);
1028
static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
1032
engine->scrubber.visited++;
1033
rel_time_t current_time = engine->server.core->get_current_time();
1034
if (item->refcount == 0 &&
1035
(item->exptime != 0 && item->exptime < current_time)) {
1036
do_item_unlink(engine, item);
1037
engine->scrubber.cleaned++;
1039
return ENGINE_SUCCESS;
1042
static void item_scrub_class(struct default_engine *engine,
1043
hash_item *cursor) {
1045
ENGINE_ERROR_CODE ret;
1048
pthread_mutex_lock(&engine->cache_lock);
1049
more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1050
pthread_mutex_unlock(&engine->cache_lock);
1051
if (ret != ENGINE_SUCCESS) {
1057
static void *item_scubber_main(void *arg)
1059
struct default_engine *engine = arg;
1060
hash_item cursor = { .refcount = 1 };
1062
for (int ii = 0; ii < POWER_LARGEST; ++ii) {
1063
pthread_mutex_lock(&engine->cache_lock);
1065
if (engine->items.heads[ii] == NULL) {
1068
// add the item at the tail
1069
do_item_link_cursor(engine, &cursor, ii);
1071
pthread_mutex_unlock(&engine->cache_lock);
1074
item_scrub_class(engine, &cursor);
1078
pthread_mutex_lock(&engine->scrubber.lock);
1079
engine->scrubber.stopped = time(NULL);
1080
engine->scrubber.running = false;
1081
pthread_mutex_unlock(&engine->scrubber.lock);
1086
bool item_start_scrub(struct default_engine *engine)
1089
pthread_mutex_lock(&engine->scrubber.lock);
1090
if (!engine->scrubber.running) {
1091
engine->scrubber.started = time(NULL);
1092
engine->scrubber.stopped = 0;
1093
engine->scrubber.visited = 0;
1094
engine->scrubber.cleaned = 0;
1095
engine->scrubber.running = true;
1098
pthread_attr_t attr;
1100
if (pthread_attr_init(&attr) != 0 ||
1101
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1102
pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1104
engine->scrubber.running = false;
1109
pthread_mutex_unlock(&engine->scrubber.lock);
1119
static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
1122
struct tap_client *client = cookie;
1124
++client->it->refcount;
1125
return ENGINE_SUCCESS;
1128
static tap_event_t do_item_tap_walker(struct default_engine *engine,
1129
const void *cookie, item **itm,
1130
void **es, uint16_t *nes, uint8_t *ttl,
1131
uint16_t *flags, uint32_t *seqno,
1134
struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
1135
if (client == NULL) {
1136
return TAP_DISCONNECT;
1147
ENGINE_ERROR_CODE r;
1149
if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1150
// find next slab class to look at..
1151
bool linked = false;
1152
for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) {
1153
if (engine->items.heads[ii] != NULL) {
1154
// add the item at the tail
1155
do_item_link_cursor(engine, &client->cursor, ii);
1163
} while (client->it == NULL);
1166
return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1169
tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
1170
const void *cookie, item **itm,
1171
void **es, uint16_t *nes, uint8_t *ttl,
1172
uint16_t *flags, uint32_t *seqno,
1176
struct default_engine *engine = (struct default_engine*)handle;
1177
pthread_mutex_lock(&engine->cache_lock);
1178
ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
1179
pthread_mutex_unlock(&engine->cache_lock);
1184
bool initialize_item_tap_walker(struct default_engine *engine,
1187
struct tap_client *client = calloc(1, sizeof(*client));
1188
if (client == NULL) {
1191
client->cursor.refcount = 1;
1193
/* Link the cursor! */
1194
bool linked = false;
1195
for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1196
pthread_mutex_lock(&engine->cache_lock);
1197
if (engine->items.heads[ii] != NULL) {
1198
// add the item at the tail
1199
do_item_link_cursor(engine, &client->cursor, ii);
1202
pthread_mutex_unlock(&engine->cache_lock);
1205
engine->server.cookie->store_engine_specific(cookie, client);