~ubuntu-branches/ubuntu/trusty/mysql-5.6/trusty

« back to all changes in this revision

Viewing changes to plugin/innodb_memcached/daemon_memcached/engines/default_engine/items.c

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-02-12 11:54:27 UTC
  • Revision ID: package-import@ubuntu.com-20140212115427-oq6tfsqxl1wuwehi
Tags: upstream-5.6.15
ImportĀ upstreamĀ versionĀ 5.6.15

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 
2
#include "config.h"
 
3
#include <fcntl.h>
 
4
#include <errno.h>
 
5
#include <stdlib.h>
 
6
#include <stdio.h>
 
7
#include <string.h>
 
8
#include <time.h>
 
9
#include <assert.h>
 
10
#include <inttypes.h>
 
11
 
 
12
#include "default_engine.h"
 
13
 
 
14
/* Forward Declarations */
 
15
static void item_link_q(struct default_engine *engine, hash_item *it);
 
16
static void item_unlink_q(struct default_engine *engine, hash_item *it);
 
17
static hash_item *do_item_alloc(struct default_engine *engine,
 
18
                                const void *key, const size_t nkey,
 
19
                                const int flags, const rel_time_t exptime,
 
20
                                const int nbytes,
 
21
                                const void *cookie);
 
22
static hash_item *do_item_get(struct default_engine *engine,
 
23
                              const char *key, const size_t nkey);
 
24
static int do_item_link(struct default_engine *engine, hash_item *it);
 
25
static void do_item_unlink(struct default_engine *engine, hash_item *it);
 
26
static void do_item_release(struct default_engine *engine, hash_item *it);
 
27
static void do_item_update(struct default_engine *engine, hash_item *it);
 
28
static int do_item_replace(struct default_engine *engine,
 
29
                            hash_item *it, hash_item *new_it);
 
30
static void item_free(struct default_engine *engine, hash_item *it);
 
31
 
 
32
/*
 
33
 * We only reposition items in the LRU queue if they haven't been repositioned
 
34
 * in this many seconds. That saves us from churning on frequently-accessed
 
35
 * items.
 
36
 */
 
37
#define ITEM_UPDATE_INTERVAL 60
 
38
/*
 
39
 * To avoid scanning through the complete cache in some circumstances we'll
 
40
 * just give up and return an error after inspecting a fixed number of objects.
 
41
 */
 
42
static const int search_items = 50;
 
43
 
 
44
void item_stats_reset(struct default_engine *engine) {
 
45
    pthread_mutex_lock(&engine->cache_lock);
 
46
    memset(engine->items.itemstats, 0, sizeof(engine->items.itemstats));
 
47
    pthread_mutex_unlock(&engine->cache_lock);
 
48
}
 
49
 
 
50
 
 
51
/* warning: don't use these macros with a function, as it evals its arg twice */
 
52
static inline size_t ITEM_ntotal(struct default_engine *engine,
 
53
                                 const hash_item *item) {
 
54
    size_t ret = sizeof(*item) + item->nkey + item->nbytes;
 
55
    if (engine->config.use_cas) {
 
56
        ret += sizeof(uint64_t);
 
57
    }
 
58
 
 
59
    return ret;
 
60
}
 
61
 
 
62
/* Get the next CAS id for a new item. */
 
63
static uint64_t get_cas_id(void) {
 
64
    static uint64_t cas_id = 0;
 
65
    return ++cas_id;
 
66
}
 
67
 
 
68
/* Enable this for reference-count debugging. */
 
69
#if 0
 
70
# define DEBUG_REFCNT(it,op) \
 
71
                fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
 
72
                        it, op, it->refcount, \
 
73
                        (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
 
74
                        (it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
 
75
#else
 
76
# define DEBUG_REFCNT(it,op) while(0)
 
77
#endif
 
78
 
 
79
 
 
80
/*@null@*/
 
81
hash_item *do_item_alloc(struct default_engine *engine,
 
82
                         const void *key,
 
83
                         const size_t nkey,
 
84
                         const int flags,
 
85
                         const rel_time_t exptime,
 
86
                         const int nbytes,
 
87
                         const void *cookie) {
 
88
    hash_item *it = NULL;
 
89
    size_t ntotal = sizeof(hash_item) + nkey + nbytes;
 
90
    if (engine->config.use_cas) {
 
91
        ntotal += sizeof(uint64_t);
 
92
    }
 
93
 
 
94
    unsigned int id = slabs_clsid(engine, ntotal);
 
95
    if (id == 0)
 
96
        return 0;
 
97
 
 
98
    /* do a quick check if we have any expired items in the tail.. */
 
99
    int tries = search_items;
 
100
    hash_item *search;
 
101
 
 
102
    rel_time_t current_time = engine->server.core->get_current_time();
 
103
 
 
104
    for (search = engine->items.tails[id];
 
105
         tries > 0 && search != NULL;
 
106
         tries--, search=search->prev) {
 
107
        if (search->refcount == 0 &&
 
108
            (search->exptime != 0 && search->exptime < current_time)) {
 
109
            it = search;
 
110
            /* I don't want to actually free the object, just steal
 
111
             * the item to avoid to grab the slab mutex twice ;-)
 
112
             */
 
113
            pthread_mutex_lock(&engine->stats.lock);
 
114
            engine->stats.reclaimed++;
 
115
            pthread_mutex_unlock(&engine->stats.lock);
 
116
            engine->items.itemstats[id].reclaimed++;
 
117
            it->refcount = 1;
 
118
            slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
 
119
            do_item_unlink(engine, it);
 
120
            /* Initialize the item block: */
 
121
            it->slabs_clsid = 0;
 
122
            it->refcount = 0;
 
123
            break;
 
124
        }
 
125
    }
 
126
 
 
127
    if (it == NULL && (it = slabs_alloc(engine, ntotal, id)) == NULL) {
 
128
        /*
 
129
        ** Could not find an expired item at the tail, and memory allocation
 
130
        ** failed. Try to evict some items!
 
131
        */
 
132
        tries = search_items;
 
133
 
 
134
        /* If requested to not push old items out of cache when memory runs out,
 
135
         * we're out of luck at this point...
 
136
         */
 
137
 
 
138
        if (engine->config.evict_to_free == 0) {
 
139
            engine->items.itemstats[id].outofmemory++;
 
140
            return NULL;
 
141
        }
 
142
 
 
143
        /*
 
144
         * try to get one off the right LRU
 
145
         * don't necessariuly unlink the tail because it may be locked: refcount>0
 
146
         * search up from tail an item with refcount==0 and unlink it; give up after search_items
 
147
         * tries
 
148
         */
 
149
 
 
150
        if (engine->items.tails[id] == 0) {
 
151
            engine->items.itemstats[id].outofmemory++;
 
152
            return NULL;
 
153
        }
 
154
 
 
155
        for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
 
156
            if (search->refcount == 0) {
 
157
                if (search->exptime == 0 || search->exptime > current_time) {
 
158
                    engine->items.itemstats[id].evicted++;
 
159
                    engine->items.itemstats[id].evicted_time = current_time - search->time;
 
160
                    if (search->exptime != 0) {
 
161
                        engine->items.itemstats[id].evicted_nonzero++;
 
162
                    }
 
163
                    pthread_mutex_lock(&engine->stats.lock);
 
164
                    engine->stats.evictions++;
 
165
                    pthread_mutex_unlock(&engine->stats.lock);
 
166
                    engine->server.stat->evicting(cookie,
 
167
                                                  item_get_key(search),
 
168
                                                  search->nkey);
 
169
                } else {
 
170
                    engine->items.itemstats[id].reclaimed++;
 
171
                    pthread_mutex_lock(&engine->stats.lock);
 
172
                    engine->stats.reclaimed++;
 
173
                    pthread_mutex_unlock(&engine->stats.lock);
 
174
                }
 
175
                do_item_unlink(engine, search);
 
176
                break;
 
177
            }
 
178
        }
 
179
        it = slabs_alloc(engine, ntotal, id);
 
180
        if (it == 0) {
 
181
            engine->items.itemstats[id].outofmemory++;
 
182
            /* Last ditch effort. There is a very rare bug which causes
 
183
             * refcount leaks. We've fixed most of them, but it still happens,
 
184
             * and it may happen in the future.
 
185
             * We can reasonably assume no item can stay locked for more than
 
186
             * three hours, so if we find one in the tail which is that old,
 
187
             * free it anyway.
 
188
             */
 
189
            tries = search_items;
 
190
            for (search = engine->items.tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
 
191
                if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
 
192
                    engine->items.itemstats[id].tailrepairs++;
 
193
                    search->refcount = 0;
 
194
                    do_item_unlink(engine, search);
 
195
                    break;
 
196
                }
 
197
            }
 
198
            it = slabs_alloc(engine, ntotal, id);
 
199
            if (it == 0) {
 
200
                return NULL;
 
201
            }
 
202
        }
 
203
    }
 
204
 
 
205
    assert(it->slabs_clsid == 0);
 
206
 
 
207
    it->slabs_clsid = id;
 
208
 
 
209
    assert(it != engine->items.heads[it->slabs_clsid]);
 
210
 
 
211
    it->next = it->prev = it->h_next = 0;
 
212
    it->refcount = 1;     /* the caller will have a reference */
 
213
    DEBUG_REFCNT(it, '*');
 
214
    it->iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
 
215
    it->nkey = nkey;
 
216
    it->nbytes = nbytes;
 
217
    it->flags = flags;
 
218
    memcpy((void*)item_get_key(it), key, nkey);
 
219
    it->exptime = exptime;
 
220
    return it;
 
221
}
 
222
 
 
223
static void item_free(struct default_engine *engine, hash_item *it) {
 
224
    size_t ntotal = ITEM_ntotal(engine, it);
 
225
    unsigned int clsid;
 
226
    assert((it->iflag & ITEM_LINKED) == 0);
 
227
    assert(it != engine->items.heads[it->slabs_clsid]);
 
228
    assert(it != engine->items.tails[it->slabs_clsid]);
 
229
    assert(it->refcount == 0);
 
230
 
 
231
    /* so slab size changer can tell later if item is already free or not */
 
232
    clsid = it->slabs_clsid;
 
233
    it->slabs_clsid = 0;
 
234
    it->iflag |= ITEM_SLABBED;
 
235
    DEBUG_REFCNT(it, 'F');
 
236
    slabs_free(engine, it, ntotal, clsid);
 
237
}
 
238
 
 
239
static void item_link_q(struct default_engine *engine, hash_item *it) { /* item is the new head */
 
240
    hash_item **head, **tail;
 
241
    assert(it->slabs_clsid < POWER_LARGEST);
 
242
    assert((it->iflag & ITEM_SLABBED) == 0);
 
243
 
 
244
    head = &engine->items.heads[it->slabs_clsid];
 
245
    tail = &engine->items.tails[it->slabs_clsid];
 
246
    assert(it != *head);
 
247
    assert((*head && *tail) || (*head == 0 && *tail == 0));
 
248
    it->prev = 0;
 
249
    it->next = *head;
 
250
    if (it->next) it->next->prev = it;
 
251
    *head = it;
 
252
    if (*tail == 0) *tail = it;
 
253
    engine->items.sizes[it->slabs_clsid]++;
 
254
    return;
 
255
}
 
256
 
 
257
static void item_unlink_q(struct default_engine *engine, hash_item *it) {
 
258
    hash_item **head, **tail;
 
259
    assert(it->slabs_clsid < POWER_LARGEST);
 
260
    head = &engine->items.heads[it->slabs_clsid];
 
261
    tail = &engine->items.tails[it->slabs_clsid];
 
262
 
 
263
    if (*head == it) {
 
264
        assert(it->prev == 0);
 
265
        *head = it->next;
 
266
    }
 
267
    if (*tail == it) {
 
268
        assert(it->next == 0);
 
269
        *tail = it->prev;
 
270
    }
 
271
    assert(it->next != it);
 
272
    assert(it->prev != it);
 
273
 
 
274
    if (it->next) it->next->prev = it->prev;
 
275
    if (it->prev) it->prev->next = it->next;
 
276
    engine->items.sizes[it->slabs_clsid]--;
 
277
    return;
 
278
}
 
279
 
 
280
int do_item_link(struct default_engine *engine, hash_item *it) {
 
281
    MEMCACHED_ITEM_LINK(item_get_key(it), it->nkey, it->nbytes);
 
282
    assert((it->iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
 
283
    assert(it->nbytes < (1024 * 1024));  /* 1MB max size */
 
284
    it->iflag |= ITEM_LINKED;
 
285
    it->time = engine->server.core->get_current_time();
 
286
    assoc_insert(engine, engine->server.core->hash(item_get_key(it),
 
287
                                                        it->nkey, 0),
 
288
                 it);
 
289
 
 
290
    pthread_mutex_lock(&engine->stats.lock);
 
291
    engine->stats.curr_bytes += ITEM_ntotal(engine, it);
 
292
    engine->stats.curr_items += 1;
 
293
    engine->stats.total_items += 1;
 
294
    pthread_mutex_unlock(&engine->stats.lock);
 
295
 
 
296
    /* Allocate a new CAS ID on link. */
 
297
    item_set_cas(NULL, NULL, it, get_cas_id());
 
298
 
 
299
    item_link_q(engine, it);
 
300
 
 
301
    return 1;
 
302
}
 
303
 
 
304
void do_item_unlink(struct default_engine *engine, hash_item *it) {
 
305
    MEMCACHED_ITEM_UNLINK(item_get_key(it), it->nkey, it->nbytes);
 
306
    if ((it->iflag & ITEM_LINKED) != 0) {
 
307
        it->iflag &= ~ITEM_LINKED;
 
308
        pthread_mutex_lock(&engine->stats.lock);
 
309
        engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
 
310
        engine->stats.curr_items -= 1;
 
311
        pthread_mutex_unlock(&engine->stats.lock);
 
312
        assoc_delete(engine, engine->server.core->hash(item_get_key(it),
 
313
                                                            it->nkey, 0),
 
314
                     item_get_key(it), it->nkey);
 
315
        item_unlink_q(engine, it);
 
316
        if (it->refcount == 0) {
 
317
            item_free(engine, it);
 
318
        }
 
319
    }
 
320
}
 
321
 
 
322
void do_item_release(struct default_engine *engine, hash_item *it) {
 
323
    MEMCACHED_ITEM_REMOVE(item_get_key(it), it->nkey, it->nbytes);
 
324
    if (it->refcount != 0) {
 
325
        it->refcount--;
 
326
        DEBUG_REFCNT(it, '-');
 
327
    }
 
328
    if (it->refcount == 0 && (it->iflag & ITEM_LINKED) == 0) {
 
329
        item_free(engine, it);
 
330
    }
 
331
}
 
332
 
 
333
void do_item_update(struct default_engine *engine, hash_item *it) {
 
334
    rel_time_t current_time = engine->server.core->get_current_time();
 
335
    MEMCACHED_ITEM_UPDATE(item_get_key(it), it->nkey, it->nbytes);
 
336
    if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
 
337
        assert((it->iflag & ITEM_SLABBED) == 0);
 
338
 
 
339
        if ((it->iflag & ITEM_LINKED) != 0) {
 
340
            item_unlink_q(engine, it);
 
341
            it->time = current_time;
 
342
            item_link_q(engine, it);
 
343
        }
 
344
    }
 
345
}
 
346
 
 
347
int do_item_replace(struct default_engine *engine,
 
348
                    hash_item *it, hash_item *new_it) {
 
349
    MEMCACHED_ITEM_REPLACE(item_get_key(it), it->nkey, it->nbytes,
 
350
                           item_get_key(new_it), new_it->nkey, new_it->nbytes);
 
351
    assert((it->iflag & ITEM_SLABBED) == 0);
 
352
 
 
353
    do_item_unlink(engine, it);
 
354
    return do_item_link(engine, new_it);
 
355
}
 
356
 
 
357
/*@null@*/
 
358
static char *do_item_cachedump(const unsigned int slabs_clsid,
 
359
                               const unsigned int limit,
 
360
                               unsigned int *bytes) {
 
361
#ifdef FUTURE
 
362
    unsigned int memlimit = 2 * 1024 * 1024;   /* 2MB max response size */
 
363
    char *buffer;
 
364
    unsigned int bufcurr;
 
365
    hash_item *it;
 
366
    unsigned int len;
 
367
    unsigned int shown = 0;
 
368
    char key_temp[KEY_MAX_LENGTH + 1];
 
369
    char temp[512];
 
370
 
 
371
    it = engine->items.heads[slabs_clsid];
 
372
 
 
373
    buffer = malloc((size_t)memlimit);
 
374
    if (buffer == 0) return NULL;
 
375
    bufcurr = 0;
 
376
 
 
377
 
 
378
    while (it != NULL && (limit == 0 || shown < limit)) {
 
379
        assert(it->nkey <= KEY_MAX_LENGTH);
 
380
        /* Copy the key since it may not be null-terminated in the struct */
 
381
        strncpy(key_temp, item_get_key(it), it->nkey);
 
382
        key_temp[it->nkey] = 0x00; /* terminate */
 
383
        len = snprintf(temp, sizeof(temp), "ITEM %s [%d b; %lu s]\r\n",
 
384
                       key_temp, it->nbytes,
 
385
                       (unsigned long)it->exptime + process_started);
 
386
        if (bufcurr + len + 6 > memlimit)  /* 6 is END\r\n\0 */
 
387
            break;
 
388
        memcpy(buffer + bufcurr, temp, len);
 
389
        bufcurr += len;
 
390
        shown++;
 
391
        it = it->next;
 
392
    }
 
393
 
 
394
 
 
395
    memcpy(buffer + bufcurr, "END\r\n", 6);
 
396
    bufcurr += 5;
 
397
 
 
398
    *bytes = bufcurr;
 
399
    return buffer;
 
400
#endif
 
401
    (void)slabs_clsid;
 
402
    (void)limit;
 
403
    (void)bytes;
 
404
    return NULL;
 
405
}
 
406
 
 
407
static void do_item_stats(struct default_engine *engine,
 
408
                          ADD_STAT add_stats, const void *c) {
 
409
    int i;
 
410
    rel_time_t current_time = engine->server.core->get_current_time();
 
411
    for (i = 0; i < POWER_LARGEST; i++) {
 
412
        if (engine->items.tails[i] != NULL) {
 
413
            int search = search_items;
 
414
            while (search > 0 &&
 
415
                   engine->items.tails[i] != NULL &&
 
416
                   ((engine->config.oldest_live != 0 && /* Item flushd */
 
417
                     engine->config.oldest_live <= current_time &&
 
418
                     engine->items.tails[i]->time <= engine->config.oldest_live) ||
 
419
                    (engine->items.tails[i]->exptime != 0 && /* and not expired */
 
420
                     engine->items.tails[i]->exptime < current_time))) {
 
421
                --search;
 
422
                if (engine->items.tails[i]->refcount == 0) {
 
423
                    do_item_unlink(engine, engine->items.tails[i]);
 
424
                } else {
 
425
                    break;
 
426
                }
 
427
            }
 
428
            if (engine->items.tails[i] == NULL) {
 
429
                /* We removed all of the items in this slab class */
 
430
                continue;
 
431
            }
 
432
 
 
433
            const char *prefix = "items";
 
434
            add_statistics(c, add_stats, prefix, i, "number", "%u",
 
435
                           engine->items.sizes[i]);
 
436
            add_statistics(c, add_stats, prefix, i, "age", "%u",
 
437
                           engine->items.tails[i]->time);
 
438
            add_statistics(c, add_stats, prefix, i, "evicted",
 
439
                           "%u", engine->items.itemstats[i].evicted);
 
440
            add_statistics(c, add_stats, prefix, i, "evicted_nonzero",
 
441
                           "%u", engine->items.itemstats[i].evicted_nonzero);
 
442
            add_statistics(c, add_stats, prefix, i, "evicted_time",
 
443
                           "%u", engine->items.itemstats[i].evicted_time);
 
444
            add_statistics(c, add_stats, prefix, i, "outofmemory",
 
445
                           "%u", engine->items.itemstats[i].outofmemory);
 
446
            add_statistics(c, add_stats, prefix, i, "tailrepairs",
 
447
                           "%u", engine->items.itemstats[i].tailrepairs);;
 
448
            add_statistics(c, add_stats, prefix, i, "reclaimed",
 
449
                           "%u", engine->items.itemstats[i].reclaimed);;
 
450
        }
 
451
    }
 
452
}
 
453
 
 
454
/** dumps out a list of objects of each size, with granularity of 32 bytes */
 
455
/*@null@*/
 
456
static void do_item_stats_sizes(struct default_engine *engine,
 
457
                                ADD_STAT add_stats, const void *c) {
 
458
 
 
459
    /* max 1MB object, divided into 32 bytes size buckets */
 
460
    const int num_buckets = 32768;
 
461
    unsigned int *histogram = calloc(num_buckets, sizeof(int));
 
462
 
 
463
    if (histogram != NULL) {
 
464
        int i;
 
465
 
 
466
        /* build the histogram */
 
467
        for (i = 0; i < POWER_LARGEST; i++) {
 
468
            hash_item *iter = engine->items.heads[i];
 
469
            while (iter) {
 
470
                int ntotal = ITEM_ntotal(engine, iter);
 
471
                int bucket = ntotal / 32;
 
472
                if ((ntotal % 32) != 0) bucket++;
 
473
                if (bucket < num_buckets) histogram[bucket]++;
 
474
                iter = iter->next;
 
475
            }
 
476
        }
 
477
 
 
478
        /* write the buffer */
 
479
        for (i = 0; i < num_buckets; i++) {
 
480
            if (histogram[i] != 0) {
 
481
                char key[8], val[32];
 
482
                int klen, vlen;
 
483
                klen = snprintf(key, sizeof(key), "%d", i * 32);
 
484
                vlen = snprintf(val, sizeof(val), "%u", histogram[i]);
 
485
                assert(klen < sizeof(key));
 
486
                assert(vlen < sizeof(val));
 
487
                add_stats(key, klen, val, vlen, c);
 
488
            }
 
489
        }
 
490
        free(histogram);
 
491
    }
 
492
}
 
493
 
 
494
/** wrapper around assoc_find which does the lazy expiration logic */
 
495
hash_item *do_item_get(struct default_engine *engine,
 
496
                       const char *key, const size_t nkey) {
 
497
    rel_time_t current_time = engine->server.core->get_current_time();
 
498
    hash_item *it = assoc_find(engine, engine->server.core->hash(key,
 
499
                                                                      nkey, 0),
 
500
                               key, nkey);
 
501
    int was_found = 0;
 
502
 
 
503
    if (engine->config.verbose > 2) {
 
504
        EXTENSION_LOGGER_DESCRIPTOR *logger;
 
505
        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
 
506
        if (it == NULL) {
 
507
            logger->log(EXTENSION_LOG_DEBUG, NULL,
 
508
                        "> NOT FOUND %s", key);
 
509
        } else {
 
510
            logger->log(EXTENSION_LOG_DEBUG, NULL,
 
511
                        "> FOUND KEY %s",
 
512
                        (const char*)item_get_key(it));
 
513
            was_found++;
 
514
        }
 
515
    }
 
516
 
 
517
    if (it != NULL && engine->config.oldest_live != 0 &&
 
518
        engine->config.oldest_live <= current_time &&
 
519
        it->time <= engine->config.oldest_live) {
 
520
        do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
 
521
        it = NULL;
 
522
    }
 
523
 
 
524
    if (it == NULL && was_found) {
 
525
        EXTENSION_LOGGER_DESCRIPTOR *logger;
 
526
        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
 
527
        logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by flush");
 
528
        was_found--;
 
529
    }
 
530
 
 
531
    if (it != NULL && it->exptime != 0 && it->exptime <= current_time) {
 
532
        do_item_unlink(engine, it);           /* MTSAFE - cache_lock held */
 
533
        it = NULL;
 
534
    }
 
535
 
 
536
    if (it == NULL && was_found) {
 
537
        EXTENSION_LOGGER_DESCRIPTOR *logger;
 
538
        logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
 
539
        logger->log(EXTENSION_LOG_DEBUG, NULL, " -nuked by expire");
 
540
        was_found--;
 
541
    }
 
542
 
 
543
    if (it != NULL) {
 
544
        it->refcount++;
 
545
        DEBUG_REFCNT(it, '+');
 
546
        do_item_update(engine, it);
 
547
    }
 
548
 
 
549
    return it;
 
550
}
 
551
 
 
552
/*
 
553
 * Stores an item in the cache according to the semantics of one of the set
 
554
 * commands. In threaded mode, this is protected by the cache lock.
 
555
 *
 
556
 * Returns the state of storage.
 
557
 */
 
558
static ENGINE_ERROR_CODE do_store_item(struct default_engine *engine,
 
559
                                       hash_item *it, uint64_t *cas,
 
560
                                       ENGINE_STORE_OPERATION operation,
 
561
                                       const void *cookie) {
 
562
    const char *key = item_get_key(it);
 
563
    hash_item *old_it = do_item_get(engine, key, it->nkey);
 
564
    ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
 
565
 
 
566
    hash_item *new_it = NULL;
 
567
 
 
568
    if (old_it != NULL && operation == OPERATION_ADD) {
 
569
        /* add only adds a nonexistent item, but promote to head of LRU */
 
570
        do_item_update(engine, old_it);
 
571
    } else if (!old_it && (operation == OPERATION_REPLACE
 
572
        || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
 
573
    {
 
574
        /* replace only replaces an existing value; don't store */
 
575
    } else if (operation == OPERATION_CAS) {
 
576
        /* validate cas operation */
 
577
        if(old_it == NULL) {
 
578
            // LRU expired
 
579
            stored = ENGINE_KEY_ENOENT;
 
580
        }
 
581
        else if (item_get_cas(it) == item_get_cas(old_it)) {
 
582
            // cas validates
 
583
            // it and old_it may belong to different classes.
 
584
            // I'm updating the stats for the one that's getting pushed out
 
585
            do_item_replace(engine, old_it, it);
 
586
            stored = ENGINE_SUCCESS;
 
587
        } else {
 
588
            if (engine->config.verbose > 1) {
 
589
                EXTENSION_LOGGER_DESCRIPTOR *logger;
 
590
                logger = (void*)engine->server.extension->get_extension(EXTENSION_LOGGER);
 
591
                logger->log(EXTENSION_LOG_INFO, NULL,
 
592
                        "CAS:  failure: expected %"PRIu64", got %"PRIu64"\n",
 
593
                        item_get_cas(old_it),
 
594
                        item_get_cas(it));
 
595
            }
 
596
            stored = ENGINE_KEY_EEXISTS;
 
597
        }
 
598
    } else {
 
599
        /*
 
600
         * Append - combine new and old record into single one. Here it's
 
601
         * atomic and thread-safe.
 
602
         */
 
603
        if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
 
604
            /*
 
605
             * Validate CAS
 
606
             */
 
607
            if (item_get_cas(it) != 0) {
 
608
                // CAS much be equal
 
609
                if (item_get_cas(it) != item_get_cas(old_it)) {
 
610
                    stored = ENGINE_KEY_EEXISTS;
 
611
                }
 
612
            }
 
613
 
 
614
            if (stored == ENGINE_NOT_STORED) {
 
615
                /* we have it and old_it here - alloc memory to hold both */
 
616
                new_it = do_item_alloc(engine, key, it->nkey,
 
617
                                       old_it->flags,
 
618
                                       old_it->exptime,
 
619
                                       it->nbytes + old_it->nbytes,
 
620
                                       cookie);
 
621
 
 
622
                if (new_it == NULL) {
 
623
                    /* SERVER_ERROR out of memory */
 
624
                    if (old_it != NULL) {
 
625
                        do_item_release(engine, old_it);
 
626
                    }
 
627
 
 
628
                    return ENGINE_NOT_STORED;
 
629
                }
 
630
 
 
631
                /* copy data from it and old_it to new_it */
 
632
 
 
633
                if (operation == OPERATION_APPEND) {
 
634
                    memcpy(item_get_data(new_it), item_get_data(old_it), old_it->nbytes);
 
635
                    memcpy(item_get_data(new_it) + old_it->nbytes, item_get_data(it), it->nbytes);
 
636
                } else {
 
637
                    /* OPERATION_PREPEND */
 
638
                    memcpy(item_get_data(new_it), item_get_data(it), it->nbytes);
 
639
                    memcpy(item_get_data(new_it) + it->nbytes, item_get_data(old_it), old_it->nbytes);
 
640
                }
 
641
 
 
642
                it = new_it;
 
643
            }
 
644
        }
 
645
 
 
646
        if (stored == ENGINE_NOT_STORED) {
 
647
            if (old_it != NULL) {
 
648
                do_item_replace(engine, old_it, it);
 
649
            } else {
 
650
                do_item_link(engine, it);
 
651
            }
 
652
 
 
653
            *cas = item_get_cas(it);
 
654
            stored = ENGINE_SUCCESS;
 
655
        }
 
656
    }
 
657
 
 
658
    if (old_it != NULL) {
 
659
        do_item_release(engine, old_it);         /* release our reference */
 
660
    }
 
661
 
 
662
    if (new_it != NULL) {
 
663
        do_item_release(engine, new_it);
 
664
    }
 
665
 
 
666
    if (stored == ENGINE_SUCCESS) {
 
667
        *cas = item_get_cas(it);
 
668
    }
 
669
 
 
670
    return stored;
 
671
}
 
672
 
 
673
 
 
674
/*
 
675
 * adds a delta value to a numeric item.
 
676
 *
 
677
 * c     connection requesting the operation
 
678
 * it    item to adjust
 
679
 * incr  true to increment value, false to decrement
 
680
 * delta amount to adjust value by
 
681
 * buf   buffer for response string
 
682
 *
 
683
 * returns a response string to send back to the client.
 
684
 */
 
685
static ENGINE_ERROR_CODE do_add_delta(struct default_engine *engine,
 
686
                                      hash_item *it, const bool incr,
 
687
                                      const int64_t delta, uint64_t *rcas,
 
688
                                      uint64_t *result, const void *cookie) {
 
689
    const char *ptr;
 
690
    uint64_t value;
 
691
    char buf[80];
 
692
    int res;
 
693
 
 
694
    if (it->nbytes >= (sizeof(buf) - 1)) {
 
695
        return ENGINE_EINVAL;
 
696
    }
 
697
 
 
698
    ptr = item_get_data(it);
 
699
    memcpy(buf, ptr, it->nbytes);
 
700
    buf[it->nbytes] = '\0';
 
701
 
 
702
    if (!safe_strtoull(buf, &value)) {
 
703
        return ENGINE_EINVAL;
 
704
    }
 
705
 
 
706
    if (incr) {
 
707
        value += delta;
 
708
    } else {
 
709
        if(delta > value) {
 
710
            value = 0;
 
711
        } else {
 
712
            value -= delta;
 
713
        }
 
714
    }
 
715
 
 
716
    *result = value;
 
717
    if ((res = snprintf(buf, sizeof(buf), "%" PRIu64, value)) == -1) {
 
718
        return ENGINE_EINVAL;
 
719
    }
 
720
 
 
721
    if (it->refcount == 1 && res <= it->nbytes) {
 
722
        // we can do inline replacement
 
723
        memcpy(item_get_data(it), buf, res);
 
724
        memset(item_get_data(it) + res, ' ', it->nbytes - res);
 
725
        item_set_cas(NULL, NULL, it, get_cas_id());
 
726
        *rcas = item_get_cas(it);
 
727
    } else {
 
728
        hash_item *new_it = do_item_alloc(engine, item_get_key(it),
 
729
                                          it->nkey, it->flags,
 
730
                                          it->exptime, res,
 
731
                                          cookie);
 
732
        if (new_it == NULL) {
 
733
            do_item_unlink(engine, it);
 
734
            return ENGINE_ENOMEM;
 
735
        }
 
736
        memcpy(item_get_data(new_it), buf, res);
 
737
        do_item_replace(engine, it, new_it);
 
738
        *rcas = item_get_cas(new_it);
 
739
        do_item_release(engine, new_it);       /* release our reference */
 
740
    }
 
741
 
 
742
    return ENGINE_SUCCESS;
 
743
}
 
744
 
 
745
/********************************* ITEM ACCESS *******************************/
 
746
 
 
747
/*
 
748
 * Allocates a new item.
 
749
 */
 
750
hash_item *item_alloc(struct default_engine *engine,
 
751
                      const void *key, size_t nkey, int flags,
 
752
                      rel_time_t exptime, int nbytes, const void *cookie) {
 
753
    hash_item *it;
 
754
    pthread_mutex_lock(&engine->cache_lock);
 
755
    it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
 
756
    pthread_mutex_unlock(&engine->cache_lock);
 
757
    return it;
 
758
}
 
759
 
 
760
/*
 
761
 * Returns an item if it hasn't been marked as expired,
 
762
 * lazy-expiring as needed.
 
763
 */
 
764
hash_item *item_get(struct default_engine *engine,
 
765
                    const void *key, const size_t nkey) {
 
766
    hash_item *it;
 
767
    pthread_mutex_lock(&engine->cache_lock);
 
768
    it = do_item_get(engine, key, nkey);
 
769
    pthread_mutex_unlock(&engine->cache_lock);
 
770
    return it;
 
771
}
 
772
 
 
773
/*
 
774
 * Decrements the reference count on an item and adds it to the freelist if
 
775
 * needed.
 
776
 */
 
777
void item_release(struct default_engine *engine, hash_item *item) {
 
778
    pthread_mutex_lock(&engine->cache_lock);
 
779
    do_item_release(engine, item);
 
780
    pthread_mutex_unlock(&engine->cache_lock);
 
781
}
 
782
 
 
783
/*
 
784
 * Unlinks an item from the LRU and hashtable.
 
785
 */
 
786
void item_unlink(struct default_engine *engine, hash_item *item) {
 
787
    pthread_mutex_lock(&engine->cache_lock);
 
788
    do_item_unlink(engine, item);
 
789
    pthread_mutex_unlock(&engine->cache_lock);
 
790
}
 
791
 
 
792
static ENGINE_ERROR_CODE do_arithmetic(struct default_engine *engine,
 
793
                                       const void* cookie,
 
794
                                       const void* key,
 
795
                                       const int nkey,
 
796
                                       const bool increment,
 
797
                                       const bool create,
 
798
                                       const uint64_t delta,
 
799
                                       const uint64_t initial,
 
800
                                       const rel_time_t exptime,
 
801
                                       uint64_t *cas,
 
802
                                       uint64_t *result)
 
803
{
 
804
   hash_item *item = do_item_get(engine, key, nkey);
 
805
   ENGINE_ERROR_CODE ret;
 
806
 
 
807
   if (item == NULL) {
 
808
      if (!create) {
 
809
         return ENGINE_KEY_ENOENT;
 
810
      } else {
 
811
         char buffer[128];
 
812
         int len = snprintf(buffer, sizeof(buffer), "%"PRIu64,
 
813
                            (uint64_t)initial);
 
814
 
 
815
         item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
 
816
         if (item == NULL) {
 
817
            return ENGINE_ENOMEM;
 
818
         }
 
819
         memcpy((void*)item_get_data(item), buffer, len);
 
820
         if ((ret = do_store_item(engine, item, cas,
 
821
                                  OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
 
822
             *result = initial;
 
823
             *cas = item_get_cas(item);
 
824
         }
 
825
         do_item_release(engine, item);
 
826
      }
 
827
   } else {
 
828
      ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
 
829
      do_item_release(engine, item);
 
830
   }
 
831
 
 
832
   return ret;
 
833
}
 
834
 
 
835
ENGINE_ERROR_CODE arithmetic(struct default_engine *engine,
 
836
                             const void* cookie,
 
837
                             const void* key,
 
838
                             const int nkey,
 
839
                             const bool increment,
 
840
                             const bool create,
 
841
                             const uint64_t delta,
 
842
                             const uint64_t initial,
 
843
                             const rel_time_t exptime,
 
844
                             uint64_t *cas,
 
845
                             uint64_t *result)
 
846
{
 
847
    ENGINE_ERROR_CODE ret;
 
848
 
 
849
    pthread_mutex_lock(&engine->cache_lock);
 
850
    ret = do_arithmetic(engine, cookie, key, nkey, increment,
 
851
                        create, delta, initial, exptime, cas,
 
852
                        result);
 
853
    pthread_mutex_unlock(&engine->cache_lock);
 
854
    return ret;
 
855
}
 
856
 
 
857
/*
 
858
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 
859
 */
 
860
ENGINE_ERROR_CODE store_item(struct default_engine *engine,
 
861
                             hash_item *item, uint64_t *cas,
 
862
                             ENGINE_STORE_OPERATION operation,
 
863
                             const void *cookie) {
 
864
    ENGINE_ERROR_CODE ret;
 
865
 
 
866
    pthread_mutex_lock(&engine->cache_lock);
 
867
    ret = do_store_item(engine, item, cas, operation, cookie);
 
868
    pthread_mutex_unlock(&engine->cache_lock);
 
869
    return ret;
 
870
}
 
871
 
 
872
static hash_item *do_touch_item(struct default_engine *engine,
 
873
                                     const void *key,
 
874
                                     uint16_t nkey,
 
875
                                     uint32_t exptime)
 
876
{
 
877
   hash_item *item = do_item_get(engine, key, nkey);
 
878
   if (item != NULL) {
 
879
       item->exptime = exptime;
 
880
   }
 
881
   return item;
 
882
}
 
883
 
 
884
hash_item *touch_item(struct default_engine *engine,
 
885
                           const void *key,
 
886
                           uint16_t nkey,
 
887
                           uint32_t exptime)
 
888
{
 
889
    hash_item *ret;
 
890
 
 
891
    pthread_mutex_lock(&engine->cache_lock);
 
892
    ret = do_touch_item(engine, key, nkey, exptime);
 
893
    pthread_mutex_unlock(&engine->cache_lock);
 
894
    return ret;
 
895
}
 
896
 
 
897
/*
 
898
 * Flushes expired items after a flush_all call
 
899
 */
 
900
void item_flush_expired(struct default_engine *engine, time_t when) {
 
901
    int i;
 
902
    hash_item *iter, *next;
 
903
 
 
904
    pthread_mutex_lock(&engine->cache_lock);
 
905
 
 
906
    if (when == 0) {
 
907
        engine->config.oldest_live = engine->server.core->get_current_time() - 1;
 
908
    } else {
 
909
        engine->config.oldest_live = engine->server.core->realtime(when) - 1;
 
910
    }
 
911
 
 
912
    if (engine->config.oldest_live != 0) {
 
913
        for (i = 0; i < POWER_LARGEST; i++) {
 
914
            /*
 
915
             * The LRU is sorted in decreasing time order, and an item's
 
916
             * timestamp is never newer than its last access time, so we
 
917
             * only need to walk back until we hit an item older than the
 
918
             * oldest_live time.
 
919
             * The oldest_live checking will auto-expire the remaining items.
 
920
             */
 
921
            for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
 
922
                if (iter->time >= engine->config.oldest_live) {
 
923
                    next = iter->next;
 
924
                    if ((iter->iflag & ITEM_SLABBED) == 0) {
 
925
                        do_item_unlink(engine, iter);
 
926
                    }
 
927
                } else {
 
928
                    /* We've hit the first old item. Continue to the next queue. */
 
929
                    break;
 
930
                }
 
931
            }
 
932
        }
 
933
    }
 
934
    pthread_mutex_unlock(&engine->cache_lock);
 
935
}
 
936
 
 
937
/*
 
938
 * Dumps part of the cache
 
939
 */
 
940
char *item_cachedump(struct default_engine *engine,
 
941
                     unsigned int slabs_clsid,
 
942
                     unsigned int limit,
 
943
                     unsigned int *bytes) {
 
944
    char *ret;
 
945
 
 
946
    pthread_mutex_lock(&engine->cache_lock);
 
947
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
 
948
    pthread_mutex_unlock(&engine->cache_lock);
 
949
    return ret;
 
950
}
 
951
 
 
952
void item_stats(struct default_engine *engine,
 
953
                   ADD_STAT add_stat, const void *cookie)
 
954
{
 
955
    pthread_mutex_lock(&engine->cache_lock);
 
956
    do_item_stats(engine, add_stat, cookie);
 
957
    pthread_mutex_unlock(&engine->cache_lock);
 
958
}
 
959
 
 
960
 
 
961
void item_stats_sizes(struct default_engine *engine,
 
962
                      ADD_STAT add_stat, const void *cookie)
 
963
{
 
964
    pthread_mutex_lock(&engine->cache_lock);
 
965
    do_item_stats_sizes(engine, add_stat, cookie);
 
966
    pthread_mutex_unlock(&engine->cache_lock);
 
967
}
 
968
 
 
969
static void do_item_link_cursor(struct default_engine *engine,
 
970
                                hash_item *cursor, int ii)
 
971
{
 
972
    cursor->slabs_clsid = (uint8_t)ii;
 
973
    cursor->next = NULL;
 
974
    cursor->prev = engine->items.tails[ii];
 
975
    engine->items.tails[ii]->next = cursor;
 
976
    engine->items.tails[ii] = cursor;
 
977
    engine->items.sizes[ii]++;
 
978
}
 
979
 
 
980
typedef ENGINE_ERROR_CODE (*ITERFUNC)(struct default_engine *engine,
 
981
                                      hash_item *item, void *cookie);
 
982
 
 
983
static bool do_item_walk_cursor(struct default_engine *engine,
 
984
                                hash_item *cursor,
 
985
                                int steplength,
 
986
                                ITERFUNC itemfunc,
 
987
                                void* itemdata,
 
988
                                ENGINE_ERROR_CODE *error)
 
989
{
 
990
    int ii = 0;
 
991
    *error = ENGINE_SUCCESS;
 
992
 
 
993
    while (cursor->prev != NULL && ii < steplength) {
 
994
        ++ii;
 
995
        /* Move cursor */
 
996
        hash_item *ptr = cursor->prev;
 
997
        item_unlink_q(engine, cursor);
 
998
 
 
999
        bool done = false;
 
1000
        if (ptr == engine->items.heads[cursor->slabs_clsid]) {
 
1001
            done = true;
 
1002
            cursor->prev = NULL;
 
1003
        } else {
 
1004
            cursor->next = ptr;
 
1005
            cursor->prev = ptr->prev;
 
1006
            cursor->prev->next = cursor;
 
1007
            ptr->prev = cursor;
 
1008
        }
 
1009
 
 
1010
        /* Ignore cursors */
 
1011
        if (ptr->nkey == 0 && ptr->nbytes == 0) {
 
1012
            --ii;
 
1013
        } else {
 
1014
            *error = itemfunc(engine, ptr, itemdata);
 
1015
            if (*error != ENGINE_SUCCESS) {
 
1016
                return false;
 
1017
            }
 
1018
        }
 
1019
 
 
1020
        if (done) {
 
1021
            return false;
 
1022
        }
 
1023
    }
 
1024
 
 
1025
    return (cursor->prev != NULL);
 
1026
}
 
1027
 
 
1028
static ENGINE_ERROR_CODE item_scrub(struct default_engine *engine,
 
1029
                                    hash_item *item,
 
1030
                                    void *cookie) {
 
1031
    (void)cookie;
 
1032
    engine->scrubber.visited++;
 
1033
    rel_time_t current_time = engine->server.core->get_current_time();
 
1034
    if (item->refcount == 0 &&
 
1035
        (item->exptime != 0 && item->exptime < current_time)) {
 
1036
        do_item_unlink(engine, item);
 
1037
        engine->scrubber.cleaned++;
 
1038
    }
 
1039
    return ENGINE_SUCCESS;
 
1040
}
 
1041
 
 
1042
static void item_scrub_class(struct default_engine *engine,
 
1043
                             hash_item *cursor) {
 
1044
 
 
1045
    ENGINE_ERROR_CODE ret;
 
1046
    bool more;
 
1047
    do {
 
1048
        pthread_mutex_lock(&engine->cache_lock);
 
1049
        more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
 
1050
        pthread_mutex_unlock(&engine->cache_lock);
 
1051
        if (ret != ENGINE_SUCCESS) {
 
1052
            break;
 
1053
        }
 
1054
    } while (more);
 
1055
}
 
1056
 
 
1057
static void *item_scubber_main(void *arg)
 
1058
{
 
1059
    struct default_engine *engine = arg;
 
1060
    hash_item cursor = { .refcount = 1 };
 
1061
 
 
1062
    for (int ii = 0; ii < POWER_LARGEST; ++ii) {
 
1063
        pthread_mutex_lock(&engine->cache_lock);
 
1064
        bool skip = false;
 
1065
        if (engine->items.heads[ii] == NULL) {
 
1066
            skip = true;
 
1067
        } else {
 
1068
            // add the item at the tail
 
1069
            do_item_link_cursor(engine, &cursor, ii);
 
1070
        }
 
1071
        pthread_mutex_unlock(&engine->cache_lock);
 
1072
 
 
1073
        if (!skip) {
 
1074
            item_scrub_class(engine, &cursor);
 
1075
        }
 
1076
    }
 
1077
 
 
1078
    pthread_mutex_lock(&engine->scrubber.lock);
 
1079
    engine->scrubber.stopped = time(NULL);
 
1080
    engine->scrubber.running = false;
 
1081
    pthread_mutex_unlock(&engine->scrubber.lock);
 
1082
 
 
1083
    return NULL;
 
1084
}
 
1085
 
 
1086
bool item_start_scrub(struct default_engine *engine)
 
1087
{
 
1088
    bool ret = false;
 
1089
    pthread_mutex_lock(&engine->scrubber.lock);
 
1090
    if (!engine->scrubber.running) {
 
1091
        engine->scrubber.started = time(NULL);
 
1092
        engine->scrubber.stopped = 0;
 
1093
        engine->scrubber.visited = 0;
 
1094
        engine->scrubber.cleaned = 0;
 
1095
        engine->scrubber.running = true;
 
1096
 
 
1097
        pthread_t t;
 
1098
        pthread_attr_t attr;
 
1099
 
 
1100
        if (pthread_attr_init(&attr) != 0 ||
 
1101
            pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
 
1102
            pthread_create(&t, &attr, item_scubber_main, engine) != 0)
 
1103
        {
 
1104
            engine->scrubber.running = false;
 
1105
        } else {
 
1106
            ret = true;
 
1107
        }
 
1108
    }
 
1109
    pthread_mutex_unlock(&engine->scrubber.lock);
 
1110
 
 
1111
    return ret;
 
1112
}
 
1113
 
 
1114
struct tap_client {
 
1115
    hash_item cursor;
 
1116
    hash_item *it;
 
1117
};
 
1118
 
 
1119
static ENGINE_ERROR_CODE item_tap_iterfunc(struct default_engine *engine,
 
1120
                                    hash_item *item,
 
1121
                                    void *cookie) {
 
1122
    struct tap_client *client = cookie;
 
1123
    client->it = item;
 
1124
    ++client->it->refcount;
 
1125
    return ENGINE_SUCCESS;
 
1126
}
 
1127
 
 
1128
static tap_event_t do_item_tap_walker(struct default_engine *engine,
 
1129
                                         const void *cookie, item **itm,
 
1130
                                         void **es, uint16_t *nes, uint8_t *ttl,
 
1131
                                         uint16_t *flags, uint32_t *seqno,
 
1132
                                         uint16_t *vbucket)
 
1133
{
 
1134
    struct tap_client *client = engine->server.cookie->get_engine_specific(cookie);
 
1135
    if (client == NULL) {
 
1136
        return TAP_DISCONNECT;
 
1137
    }
 
1138
 
 
1139
    *es = NULL;
 
1140
    *nes = 0;
 
1141
    *ttl = (uint8_t)-1;
 
1142
    *seqno = 0;
 
1143
    *flags = 0;
 
1144
    *vbucket = 0;
 
1145
    client->it = NULL;
 
1146
 
 
1147
    ENGINE_ERROR_CODE r;
 
1148
    do {
 
1149
        if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
 
1150
            // find next slab class to look at..
 
1151
            bool linked = false;
 
1152
            for (int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked;  ++ii) {
 
1153
                if (engine->items.heads[ii] != NULL) {
 
1154
                    // add the item at the tail
 
1155
                    do_item_link_cursor(engine, &client->cursor, ii);
 
1156
                    linked = true;
 
1157
                }
 
1158
            }
 
1159
            if (!linked) {
 
1160
                break;
 
1161
            }
 
1162
        }
 
1163
    } while (client->it == NULL);
 
1164
    *itm = client->it;
 
1165
 
 
1166
    return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
 
1167
}
 
1168
 
 
1169
tap_event_t item_tap_walker(ENGINE_HANDLE* handle,
 
1170
                            const void *cookie, item **itm,
 
1171
                            void **es, uint16_t *nes, uint8_t *ttl,
 
1172
                            uint16_t *flags, uint32_t *seqno,
 
1173
                            uint16_t *vbucket)
 
1174
{
 
1175
    tap_event_t ret;
 
1176
    struct default_engine *engine = (struct default_engine*)handle;
 
1177
    pthread_mutex_lock(&engine->cache_lock);
 
1178
    ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
 
1179
    pthread_mutex_unlock(&engine->cache_lock);
 
1180
 
 
1181
    return ret;
 
1182
}
 
1183
 
 
1184
bool initialize_item_tap_walker(struct default_engine *engine,
 
1185
                                const void* cookie)
 
1186
{
 
1187
    struct tap_client *client = calloc(1, sizeof(*client));
 
1188
    if (client == NULL) {
 
1189
        return false;
 
1190
    }
 
1191
    client->cursor.refcount = 1;
 
1192
 
 
1193
    /* Link the cursor! */
 
1194
    bool linked = false;
 
1195
    for (int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
 
1196
        pthread_mutex_lock(&engine->cache_lock);
 
1197
        if (engine->items.heads[ii] != NULL) {
 
1198
            // add the item at the tail
 
1199
            do_item_link_cursor(engine, &client->cursor, ii);
 
1200
            linked = true;
 
1201
        }
 
1202
        pthread_mutex_unlock(&engine->cache_lock);
 
1203
    }
 
1204
 
 
1205
    engine->server.cookie->store_engine_specific(cookie, client);
 
1206
    return true;
 
1207
}