~brianaker/libmemcached/1164440

« back to all changes in this revision

Viewing changes to memcached/thread.c

Merge working tree with build tree.

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 
2
/*
 
3
 * Thread management for memcached.
 
4
 */
 
5
#include "memcached.h"
 
6
#include <assert.h>
 
7
#include <stdio.h>
 
8
#include <errno.h>
 
9
#include <stdlib.h>
 
10
#include <errno.h>
 
11
#include <string.h>
 
12
#include <pthread.h>
 
13
 
 
14
#ifdef __sun
 
15
#include <atomic.h>
 
16
#endif
 
17
 
 
18
#define ITEMS_PER_ALLOC 64
 
19
 
 
20
/* An item in the connection queue. */
 
21
typedef struct conn_queue_item CQ_ITEM;
 
22
struct conn_queue_item {
 
23
    int               sfd;
 
24
    enum conn_states  init_state;
 
25
    int               event_flags;
 
26
    int               read_buffer_size;
 
27
    enum network_transport     transport;
 
28
    CQ_ITEM          *next;
 
29
};
 
30
 
 
31
/* A connection queue. */
 
32
typedef struct conn_queue CQ;
 
33
struct conn_queue {
 
34
    CQ_ITEM *head;
 
35
    CQ_ITEM *tail;
 
36
    pthread_mutex_t lock;
 
37
    pthread_cond_t  cond;
 
38
};
 
39
 
 
40
/* Lock for cache operations (item_*, assoc_*) */
 
41
pthread_mutex_t cache_lock;
 
42
 
 
43
/* Connection lock around accepting new connections */
 
44
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
 
45
 
 
46
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
 
47
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
 
48
#endif
 
49
 
 
50
/* Lock for global stats */
 
51
static pthread_mutex_t stats_lock;
 
52
 
 
53
/* Free list of CQ_ITEM structs */
 
54
static CQ_ITEM *cqi_freelist;
 
55
static pthread_mutex_t cqi_freelist_lock;
 
56
 
 
57
static pthread_mutex_t *item_locks;
 
58
/* size of the item lock hash table */
 
59
static uint32_t item_lock_count;
 
60
/* size - 1 for lookup masking */
 
61
static uint32_t item_lock_mask;
 
62
 
 
63
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
 
64
 
 
65
/*
 
66
 * Each libevent instance has a wakeup pipe, which other threads
 
67
 * can use to signal that they've put a new connection on its queue.
 
68
 */
 
69
static LIBEVENT_THREAD *threads;
 
70
 
 
71
/*
 
72
 * Number of worker threads that have finished setting themselves up.
 
73
 */
 
74
static int init_count = 0;
 
75
static pthread_mutex_t init_lock;
 
76
static pthread_cond_t init_cond;
 
77
 
 
78
 
 
79
static void thread_libevent_process(int fd, short which, void *arg);
 
80
 
 
81
unsigned short refcount_incr(unsigned short *refcount) {
 
82
#ifdef HAVE_GCC_ATOMICS
 
83
    return __sync_add_and_fetch(refcount, 1);
 
84
#elif defined(__sun)
 
85
    return atomic_inc_ushort_nv(refcount);
 
86
#else
 
87
    unsigned short res;
 
88
    mutex_lock(&atomics_mutex);
 
89
    (*refcount)++;
 
90
    res = *refcount;
 
91
    pthread_mutex_unlock(&atomics_mutex);
 
92
    return res;
 
93
#endif
 
94
}
 
95
 
 
96
unsigned short refcount_decr(unsigned short *refcount) {
 
97
#ifdef HAVE_GCC_ATOMICS
 
98
    return __sync_sub_and_fetch(refcount, 1);
 
99
#elif defined(__sun)
 
100
    return atomic_dec_ushort_nv(refcount);
 
101
#else
 
102
    unsigned short res;
 
103
    mutex_lock(&atomics_mutex);
 
104
    (*refcount)--;
 
105
    res = *refcount;
 
106
    pthread_mutex_unlock(&atomics_mutex);
 
107
    return res;
 
108
#endif
 
109
}
 
110
 
 
111
void item_lock(uint32_t hv) {
 
112
    mutex_lock(&item_locks[hv & item_lock_mask]);
 
113
}
 
114
 
 
115
void item_unlock(uint32_t hv) {
 
116
    pthread_mutex_unlock(&item_locks[hv & item_lock_mask]);
 
117
}
 
118
 
 
119
/*
 
120
 * Initializes a connection queue.
 
121
 */
 
122
static void cq_init(CQ *cq) {
 
123
    pthread_mutex_init(&cq->lock, NULL);
 
124
    pthread_cond_init(&cq->cond, NULL);
 
125
    cq->head = NULL;
 
126
    cq->tail = NULL;
 
127
}
 
128
 
 
129
/*
 
130
 * Looks for an item on a connection queue, but doesn't block if there isn't
 
131
 * one.
 
132
 * Returns the item, or NULL if no item is available
 
133
 */
 
134
static CQ_ITEM *cq_pop(CQ *cq) {
 
135
    CQ_ITEM *item;
 
136
 
 
137
    pthread_mutex_lock(&cq->lock);
 
138
    item = cq->head;
 
139
    if (NULL != item) {
 
140
        cq->head = item->next;
 
141
        if (NULL == cq->head)
 
142
            cq->tail = NULL;
 
143
    }
 
144
    pthread_mutex_unlock(&cq->lock);
 
145
 
 
146
    return item;
 
147
}
 
148
 
 
149
/*
 
150
 * Adds an item to a connection queue.
 
151
 */
 
152
static void cq_push(CQ *cq, CQ_ITEM *item) {
 
153
    item->next = NULL;
 
154
 
 
155
    pthread_mutex_lock(&cq->lock);
 
156
    if (NULL == cq->tail)
 
157
        cq->head = item;
 
158
    else
 
159
        cq->tail->next = item;
 
160
    cq->tail = item;
 
161
    pthread_cond_signal(&cq->cond);
 
162
    pthread_mutex_unlock(&cq->lock);
 
163
}
 
164
 
 
165
/*
 
166
 * Returns a fresh connection queue item.
 
167
 */
 
168
static CQ_ITEM *cqi_new(void) {
 
169
    CQ_ITEM *item = NULL;
 
170
    pthread_mutex_lock(&cqi_freelist_lock);
 
171
    if (cqi_freelist) {
 
172
        item = cqi_freelist;
 
173
        cqi_freelist = item->next;
 
174
    }
 
175
    pthread_mutex_unlock(&cqi_freelist_lock);
 
176
 
 
177
    if (NULL == item) {
 
178
        int i;
 
179
 
 
180
        /* Allocate a bunch of items at once to reduce fragmentation */
 
181
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
 
182
        if (NULL == item)
 
183
            return NULL;
 
184
 
 
185
        /*
 
186
         * Link together all the new items except the first one
 
187
         * (which we'll return to the caller) for placement on
 
188
         * the freelist.
 
189
         */
 
190
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
 
191
            item[i - 1].next = &item[i];
 
192
 
 
193
        pthread_mutex_lock(&cqi_freelist_lock);
 
194
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
 
195
        cqi_freelist = &item[1];
 
196
        pthread_mutex_unlock(&cqi_freelist_lock);
 
197
    }
 
198
 
 
199
    return item;
 
200
}
 
201
 
 
202
 
 
203
/*
 
204
 * Frees a connection queue item (adds it to the freelist.)
 
205
 */
 
206
static void cqi_free(CQ_ITEM *item) {
 
207
    pthread_mutex_lock(&cqi_freelist_lock);
 
208
    item->next = cqi_freelist;
 
209
    cqi_freelist = item;
 
210
    pthread_mutex_unlock(&cqi_freelist_lock);
 
211
}
 
212
 
 
213
 
 
214
/*
 
215
 * Creates a worker thread.
 
216
 */
 
217
static void create_worker(void *(*func)(void *), void *arg) {
 
218
    pthread_t       thread;
 
219
    pthread_attr_t  attr;
 
220
    int             ret;
 
221
 
 
222
    pthread_attr_init(&attr);
 
223
 
 
224
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
 
225
        fprintf(stderr, "Can't create thread: %s\n",
 
226
                strerror(ret));
 
227
        exit(1);
 
228
    }
 
229
}
 
230
 
 
231
/*
 
232
 * Sets whether or not we accept new connections.
 
233
 */
 
234
void accept_new_conns(const bool do_accept) {
 
235
    pthread_mutex_lock(&conn_lock);
 
236
    do_accept_new_conns(do_accept);
 
237
    pthread_mutex_unlock(&conn_lock);
 
238
}
 
239
/****************************** LIBEVENT THREADS *****************************/
 
240
 
 
241
/*
 
242
 * Set up a thread's information.
 
243
 */
 
244
static void setup_thread(LIBEVENT_THREAD *me) {
 
245
    me->base = event_init();
 
246
    if (! me->base) {
 
247
        fprintf(stderr, "Can't allocate event base\n");
 
248
        exit(1);
 
249
    }
 
250
 
 
251
    /* Listen for notifications from other threads */
 
252
    event_set(&me->notify_event, me->notify_receive_fd,
 
253
              EV_READ | EV_PERSIST, thread_libevent_process, me);
 
254
    event_base_set(me->base, &me->notify_event);
 
255
 
 
256
    if (event_add(&me->notify_event, 0) == -1) {
 
257
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
 
258
        exit(1);
 
259
    }
 
260
 
 
261
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
 
262
    if (me->new_conn_queue == NULL) {
 
263
        perror("Failed to allocate memory for connection queue");
 
264
        exit(EXIT_FAILURE);
 
265
    }
 
266
    cq_init(me->new_conn_queue);
 
267
 
 
268
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
 
269
        perror("Failed to initialize mutex");
 
270
        exit(EXIT_FAILURE);
 
271
    }
 
272
 
 
273
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
 
274
                                    NULL, NULL);
 
275
    if (me->suffix_cache == NULL) {
 
276
        fprintf(stderr, "Failed to create suffix cache\n");
 
277
        exit(EXIT_FAILURE);
 
278
    }
 
279
}
 
280
 
 
281
 
 
282
/*
 
283
 * Worker thread: main event loop
 
284
 */
 
285
static void *worker_libevent(void *arg) {
 
286
    LIBEVENT_THREAD *me = arg;
 
287
 
 
288
    /* Any per-thread setup can happen here; thread_init() will block until
 
289
     * all threads have finished initializing.
 
290
     */
 
291
 
 
292
    pthread_mutex_lock(&init_lock);
 
293
    init_count++;
 
294
    pthread_cond_signal(&init_cond);
 
295
    pthread_mutex_unlock(&init_lock);
 
296
 
 
297
    event_base_loop(me->base, 0);
 
298
    return NULL;
 
299
}
 
300
 
 
301
 
 
302
#ifndef __INTEL_COMPILER
 
303
#pragma GCC diagnostic ignored "-Wunused-parameter"
 
304
#endif
 
305
/*
 
306
 * Processes an incoming "handle a new connection" item. This is called when
 
307
 * input arrives on the libevent wakeup pipe.
 
308
 */
 
309
static void thread_libevent_process(int fd, short which, void *arg) {
 
310
    LIBEVENT_THREAD *me = arg;
 
311
    CQ_ITEM *item;
 
312
    char buf[1];
 
313
 
 
314
    if (read(fd, buf, 1) != 1)
 
315
        if (settings.verbose > 0)
 
316
            fprintf(stderr, "Can't read from libevent pipe\n");
 
317
 
 
318
    item = cq_pop(me->new_conn_queue);
 
319
 
 
320
    if (NULL != item) {
 
321
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
 
322
                           item->read_buffer_size, item->transport, me->base);
 
323
        if (c == NULL) {
 
324
            if (IS_UDP(item->transport)) {
 
325
                fprintf(stderr, "Can't listen for events on UDP socket\n");
 
326
                exit(1);
 
327
            } else {
 
328
                if (settings.verbose > 0) {
 
329
                    fprintf(stderr, "Can't listen for events on fd %d\n",
 
330
                        item->sfd);
 
331
                }
 
332
                close(item->sfd);
 
333
            }
 
334
        } else {
 
335
            c->thread = me;
 
336
        }
 
337
        cqi_free(item);
 
338
    }
 
339
}
 
340
 
 
341
/* Which thread we assigned a connection to most recently. */
 
342
static int last_thread = -1;
 
343
 
 
344
/*
 
345
 * Dispatches a new connection to another thread. This is only ever called
 
346
 * from the main thread, either during initialization (for UDP) or because
 
347
 * of an incoming connection.
 
348
 */
 
349
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
 
350
                       int read_buffer_size, enum network_transport transport) {
 
351
    CQ_ITEM *item = cqi_new();
 
352
    int tid = (last_thread + 1) % settings.num_threads;
 
353
 
 
354
    LIBEVENT_THREAD *thread = threads + tid;
 
355
 
 
356
    last_thread = tid;
 
357
 
 
358
    item->sfd = sfd;
 
359
    item->init_state = init_state;
 
360
    item->event_flags = event_flags;
 
361
    item->read_buffer_size = read_buffer_size;
 
362
    item->transport = transport;
 
363
 
 
364
    cq_push(thread->new_conn_queue, item);
 
365
 
 
366
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
 
367
    if (write(thread->notify_send_fd, "", 1) != 1) {
 
368
        perror("Writing to thread notify pipe");
 
369
    }
 
370
}
 
371
 
 
372
/*
 
373
 * Returns true if this is the thread that listens for new TCP connections.
 
374
 */
 
375
int is_listen_thread() {
 
376
    return pthread_self() == dispatcher_thread.thread_id;
 
377
}
 
378
 
 
379
/********************************* ITEM ACCESS *******************************/
 
380
 
 
381
/*
 
382
 * Allocates a new item.
 
383
 */
 
384
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
 
385
    item *it;
 
386
    /* do_item_alloc handles its own locks */
 
387
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
 
388
    return it;
 
389
}
 
390
 
 
391
/*
 
392
 * Returns an item if it hasn't been marked as expired,
 
393
 * lazy-expiring as needed.
 
394
 */
 
395
item *item_get(const char *key, const size_t nkey) {
 
396
    item *it;
 
397
    uint32_t hv;
 
398
    hv = hash(key, nkey, 0);
 
399
    item_lock(hv);
 
400
    it = do_item_get(key, nkey, hv);
 
401
    item_unlock(hv);
 
402
    return it;
 
403
}
 
404
 
 
405
item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
 
406
    item *it;
 
407
    uint32_t hv;
 
408
    hv = hash(key, nkey, 0);
 
409
    item_lock(hv);
 
410
    it = do_item_touch(key, nkey, exptime, hv);
 
411
    item_unlock(hv);
 
412
    return it;
 
413
}
 
414
 
 
415
/*
 
416
 * Links an item into the LRU and hashtable.
 
417
 */
 
418
int item_link(item *item) {
 
419
    int ret;
 
420
    uint32_t hv;
 
421
 
 
422
    hv = hash(ITEM_key(item), item->nkey, 0);
 
423
    item_lock(hv);
 
424
    ret = do_item_link(item, hv);
 
425
    item_unlock(hv);
 
426
    return ret;
 
427
}
 
428
 
 
429
/*
 
430
 * Decrements the reference count on an item and adds it to the freelist if
 
431
 * needed.
 
432
 */
 
433
void item_remove(item *item) {
 
434
    uint32_t hv;
 
435
    hv = hash(ITEM_key(item), item->nkey, 0);
 
436
 
 
437
    item_lock(hv);
 
438
    do_item_remove(item);
 
439
    item_unlock(hv);
 
440
}
 
441
 
 
442
/*
 
443
 * Replaces one item with another in the hashtable.
 
444
 * Unprotected by a mutex lock since the core server does not require
 
445
 * it to be thread-safe.
 
446
 */
 
447
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
 
448
    return do_item_replace(old_it, new_it, hv);
 
449
}
 
450
 
 
451
/*
 
452
 * Unlinks an item from the LRU and hashtable.
 
453
 */
 
454
void item_unlink(item *item) {
 
455
    uint32_t hv;
 
456
    hv = hash(ITEM_key(item), item->nkey, 0);
 
457
    item_lock(hv);
 
458
    do_item_unlink(item, hv);
 
459
    item_unlock(hv);
 
460
}
 
461
 
 
462
/*
 
463
 * Moves an item to the back of the LRU queue.
 
464
 */
 
465
void item_update(item *item) {
 
466
    uint32_t hv;
 
467
    hv = hash(ITEM_key(item), item->nkey, 0);
 
468
 
 
469
    item_lock(hv);
 
470
    do_item_update(item);
 
471
    item_unlock(hv);
 
472
}
 
473
 
 
474
/*
 
475
 * Does arithmetic on a numeric item value.
 
476
 */
 
477
enum delta_result_type add_delta(conn *c, const char *key,
 
478
                                 const size_t nkey, int incr,
 
479
                                 const int64_t delta, char *buf,
 
480
                                 uint64_t *cas) {
 
481
    enum delta_result_type ret;
 
482
    uint32_t hv;
 
483
 
 
484
    hv = hash(key, nkey, 0);
 
485
    item_lock(hv);
 
486
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
 
487
    item_unlock(hv);
 
488
    return ret;
 
489
}
 
490
 
 
491
/*
 
492
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 
493
 */
 
494
enum store_item_type store_item(item *item, int comm, conn* c) {
 
495
    enum store_item_type ret;
 
496
    uint32_t hv;
 
497
 
 
498
    hv = hash(ITEM_key(item), item->nkey, 0);
 
499
    item_lock(hv);
 
500
    ret = do_store_item(item, comm, c, hv);
 
501
    item_unlock(hv);
 
502
    return ret;
 
503
}
 
504
 
 
505
/*
 
506
 * Flushes expired items after a flush_all call
 
507
 */
 
508
void item_flush_expired() {
 
509
    mutex_lock(&cache_lock);
 
510
    do_item_flush_expired();
 
511
    pthread_mutex_unlock(&cache_lock);
 
512
}
 
513
 
 
514
/*
 
515
 * Dumps part of the cache
 
516
 */
 
517
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
 
518
    char *ret;
 
519
 
 
520
    mutex_lock(&cache_lock);
 
521
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
 
522
    pthread_mutex_unlock(&cache_lock);
 
523
    return ret;
 
524
}
 
525
 
 
526
/*
 
527
 * Dumps statistics about slab classes
 
528
 */
 
529
void  item_stats(ADD_STAT add_stats, void *c) {
 
530
    mutex_lock(&cache_lock);
 
531
    do_item_stats(add_stats, c);
 
532
    pthread_mutex_unlock(&cache_lock);
 
533
}
 
534
 
 
535
/*
 
536
 * Dumps a list of objects of each size in 32-byte increments
 
537
 */
 
538
void  item_stats_sizes(ADD_STAT add_stats, void *c) {
 
539
    mutex_lock(&cache_lock);
 
540
    do_item_stats_sizes(add_stats, c);
 
541
    pthread_mutex_unlock(&cache_lock);
 
542
}
 
543
 
 
544
/******************************* GLOBAL STATS ******************************/
 
545
 
 
546
void STATS_LOCK() {
 
547
    pthread_mutex_lock(&stats_lock);
 
548
}
 
549
 
 
550
void STATS_UNLOCK() {
 
551
    pthread_mutex_unlock(&stats_lock);
 
552
}
 
553
 
 
554
void threadlocal_stats_reset(void) {
 
555
    int ii, sid;
 
556
    for (ii = 0; ii < settings.num_threads; ++ii) {
 
557
        pthread_mutex_lock(&threads[ii].stats.mutex);
 
558
 
 
559
        threads[ii].stats.get_cmds = 0;
 
560
        threads[ii].stats.get_misses = 0;
 
561
        threads[ii].stats.touch_cmds = 0;
 
562
        threads[ii].stats.touch_misses = 0;
 
563
        threads[ii].stats.delete_misses = 0;
 
564
        threads[ii].stats.incr_misses = 0;
 
565
        threads[ii].stats.decr_misses = 0;
 
566
        threads[ii].stats.cas_misses = 0;
 
567
        threads[ii].stats.bytes_read = 0;
 
568
        threads[ii].stats.bytes_written = 0;
 
569
        threads[ii].stats.flush_cmds = 0;
 
570
        threads[ii].stats.conn_yields = 0;
 
571
        threads[ii].stats.auth_cmds = 0;
 
572
        threads[ii].stats.auth_errors = 0;
 
573
 
 
574
        for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
575
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
 
576
            threads[ii].stats.slab_stats[sid].get_hits = 0;
 
577
            threads[ii].stats.slab_stats[sid].touch_hits = 0;
 
578
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
 
579
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
 
580
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
 
581
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
 
582
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
 
583
        }
 
584
 
 
585
        pthread_mutex_unlock(&threads[ii].stats.mutex);
 
586
    }
 
587
}
 
588
 
 
589
void threadlocal_stats_aggregate(struct thread_stats *stats) {
 
590
    int ii, sid;
 
591
 
 
592
    /* The struct has a mutex, but we can safely set the whole thing
 
593
     * to zero since it is unused when aggregating. */
 
594
    memset(stats, 0, sizeof(*stats));
 
595
 
 
596
    for (ii = 0; ii < settings.num_threads; ++ii) {
 
597
        pthread_mutex_lock(&threads[ii].stats.mutex);
 
598
 
 
599
        stats->get_cmds += threads[ii].stats.get_cmds;
 
600
        stats->get_misses += threads[ii].stats.get_misses;
 
601
        stats->touch_cmds += threads[ii].stats.touch_cmds;
 
602
        stats->touch_misses += threads[ii].stats.touch_misses;
 
603
        stats->delete_misses += threads[ii].stats.delete_misses;
 
604
        stats->decr_misses += threads[ii].stats.decr_misses;
 
605
        stats->incr_misses += threads[ii].stats.incr_misses;
 
606
        stats->cas_misses += threads[ii].stats.cas_misses;
 
607
        stats->bytes_read += threads[ii].stats.bytes_read;
 
608
        stats->bytes_written += threads[ii].stats.bytes_written;
 
609
        stats->flush_cmds += threads[ii].stats.flush_cmds;
 
610
        stats->conn_yields += threads[ii].stats.conn_yields;
 
611
        stats->auth_cmds += threads[ii].stats.auth_cmds;
 
612
        stats->auth_errors += threads[ii].stats.auth_errors;
 
613
 
 
614
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
615
            stats->slab_stats[sid].set_cmds +=
 
616
                threads[ii].stats.slab_stats[sid].set_cmds;
 
617
            stats->slab_stats[sid].get_hits +=
 
618
                threads[ii].stats.slab_stats[sid].get_hits;
 
619
            stats->slab_stats[sid].touch_hits +=
 
620
                threads[ii].stats.slab_stats[sid].touch_hits;
 
621
            stats->slab_stats[sid].delete_hits +=
 
622
                threads[ii].stats.slab_stats[sid].delete_hits;
 
623
            stats->slab_stats[sid].decr_hits +=
 
624
                threads[ii].stats.slab_stats[sid].decr_hits;
 
625
            stats->slab_stats[sid].incr_hits +=
 
626
                threads[ii].stats.slab_stats[sid].incr_hits;
 
627
            stats->slab_stats[sid].cas_hits +=
 
628
                threads[ii].stats.slab_stats[sid].cas_hits;
 
629
            stats->slab_stats[sid].cas_badval +=
 
630
                threads[ii].stats.slab_stats[sid].cas_badval;
 
631
        }
 
632
 
 
633
        pthread_mutex_unlock(&threads[ii].stats.mutex);
 
634
    }
 
635
}
 
636
 
 
637
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
 
638
    int sid;
 
639
 
 
640
    out->set_cmds = 0;
 
641
    out->get_hits = 0;
 
642
    out->touch_hits = 0;
 
643
    out->delete_hits = 0;
 
644
    out->incr_hits = 0;
 
645
    out->decr_hits = 0;
 
646
    out->cas_hits = 0;
 
647
    out->cas_badval = 0;
 
648
 
 
649
    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
650
        out->set_cmds += stats->slab_stats[sid].set_cmds;
 
651
        out->get_hits += stats->slab_stats[sid].get_hits;
 
652
        out->touch_hits += stats->slab_stats[sid].touch_hits;
 
653
        out->delete_hits += stats->slab_stats[sid].delete_hits;
 
654
        out->decr_hits += stats->slab_stats[sid].decr_hits;
 
655
        out->incr_hits += stats->slab_stats[sid].incr_hits;
 
656
        out->cas_hits += stats->slab_stats[sid].cas_hits;
 
657
        out->cas_badval += stats->slab_stats[sid].cas_badval;
 
658
    }
 
659
}
 
660
 
 
661
#ifndef __INTEL_COMPILER
 
662
#pragma GCC diagnostic ignored "-Wsign-compare"
 
663
#endif
 
664
/*
 
665
 * Initializes the thread subsystem, creating various worker threads.
 
666
 *
 
667
 * nthreads  Number of worker event handler threads to spawn
 
668
 * main_base Event base for main thread
 
669
 */
 
670
void thread_init(int nthreads, struct event_base *main_base) {
 
671
    int         i;
 
672
    int         power;
 
673
 
 
674
    pthread_mutex_init(&cache_lock, NULL);
 
675
    pthread_mutex_init(&stats_lock, NULL);
 
676
 
 
677
    pthread_mutex_init(&init_lock, NULL);
 
678
    pthread_cond_init(&init_cond, NULL);
 
679
 
 
680
    pthread_mutex_init(&cqi_freelist_lock, NULL);
 
681
    cqi_freelist = NULL;
 
682
 
 
683
    /* Want a wide lock table, but don't waste memory */
 
684
    if (nthreads < 3) {
 
685
        power = 10;
 
686
    } else if (nthreads < 4) {
 
687
        power = 11;
 
688
    } else if (nthreads < 5) {
 
689
        power = 12;
 
690
    } else {
 
691
        /* 8192 buckets, and central locks don't scale much past 5 threads */
 
692
        power = 13;
 
693
    }
 
694
 
 
695
    item_lock_count = ((unsigned long int)1 << (power));
 
696
    item_lock_mask  = item_lock_count - 1;
 
697
 
 
698
    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
 
699
    if (! item_locks) {
 
700
        perror("Can't allocate item locks");
 
701
        exit(1);
 
702
    }
 
703
    for (i = 0; i < item_lock_count; i++) {
 
704
        pthread_mutex_init(&item_locks[i], NULL);
 
705
    }
 
706
 
 
707
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
 
708
    if (! threads) {
 
709
        perror("Can't allocate thread descriptors");
 
710
        exit(1);
 
711
    }
 
712
 
 
713
    dispatcher_thread.base = main_base;
 
714
    dispatcher_thread.thread_id = pthread_self();
 
715
 
 
716
    for (i = 0; i < nthreads; i++) {
 
717
        int fds[2];
 
718
        if (pipe(fds)) {
 
719
            perror("Can't create notify pipe");
 
720
            exit(1);
 
721
        }
 
722
 
 
723
        threads[i].notify_receive_fd = fds[0];
 
724
        threads[i].notify_send_fd = fds[1];
 
725
 
 
726
        setup_thread(&threads[i]);
 
727
        /* Reserve three fds for the libevent base, and two for the pipe */
 
728
        stats.reserved_fds += 5;
 
729
    }
 
730
 
 
731
    /* Create threads after we've done all the libevent setup. */
 
732
    for (i = 0; i < nthreads; i++) {
 
733
        create_worker(worker_libevent, &threads[i]);
 
734
    }
 
735
 
 
736
    /* Wait for all the threads to set themselves up before returning. */
 
737
    pthread_mutex_lock(&init_lock);
 
738
    while (init_count < nthreads) {
 
739
        pthread_cond_wait(&init_cond, &init_lock);
 
740
    }
 
741
    pthread_mutex_unlock(&init_lock);
 
742
}
 
743