/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __sun
#include <atomic.h>
#endif

/* Number of CQ_ITEMs to allocate in one batch (see cqi_new) */
#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int                    sfd;
    enum conn_states       init_state;
    int                    event_flags;
    int                    read_buffer_size;
    enum network_transport transport;
    CQ_ITEM               *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

/* Lock for cache operations (item_*, assoc_*) */
pthread_mutex_t cache_lock;

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Fallback lock for refcounts when neither GCC nor Solaris atomics exist */
#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
/* size - 1 for lookup masking */
static uint32_t item_lock_mask;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void thread_libevent_process(int fd, short which, void *arg);

unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    res = ++*refcount;
    pthread_mutex_unlock(&atomics_mutex);
    return res;
#endif
}

unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    res = --*refcount;
    pthread_mutex_unlock(&atomics_mutex);
    return res;
#endif
}
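
/*
 * Callers use the returned count to decide ownership: the thread whose
 * decrement takes the count to zero holds the last reference and is the
 * one responsible for freeing the item (the do_item_remove path in
 * items.c follows this pattern).
 */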

void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & item_lock_mask]);
}

void item_unlock(uint32_t hv) {
    pthread_mutex_unlock(&item_locks[hv & item_lock_mask]);
}
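
/*
 * Worked example of the striping above: with power = 13 in thread_init,
 * item_lock_count is 8192 and item_lock_mask is 0x1fff, so hash values
 * 0x1a2b and 0x3a2b (equal in their low 13 bits) map to the same mutex,
 * while 0x1a2b and 0x1a2c do not.
 */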

/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available.
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}
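
/*
 * cq_push signals cq->cond while still holding cq->lock, so a consumer
 * blocking on the condition variable could not miss a wakeup.  In this
 * file, though, cq_pop never blocks and workers are woken through the
 * notify pipe instead; the condition variable is effectively a
 * provision for blocking consumers.
 */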

/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}
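
/*
 * With ITEMS_PER_ALLOC == 64, a single malloc above hands item[0] to the
 * caller and chains item[1] through item[63] onto the freelist, so the
 * next 63 calls to cqi_new() cost only a mutex and two pointer moves.
 */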

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    event_base_loop(me->base, 0);
    return NULL;
}

#ifndef __INTEL_COMPILER
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    /* Round-robin assignment across the worker threads */
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey, 0);
    item_lock(hv);
    it = do_item_get(key, nkey, hv);
    item_unlock(hv);
    return it;
}
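
/*
 * item_get shows the pattern most of the wrappers below repeat: hash the
 * key outside any lock, take the striped item lock for that hash, call
 * the do_* worker, then unlock.  Only the do_* functions touch shared
 * state.
 */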

item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey, 0);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv);
    item_unlock(hv);
    return it;
}

/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);

    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey, 0);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Flushes expired items after a flush_all call
 */
void item_flush_expired() {
    mutex_lock(&cache_lock);
    do_item_flush_expired();
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Dumps part of the cache
 */
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
    char *ret;

    mutex_lock(&cache_lock);
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Dumps statistics about slab classes
 */
void item_stats(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats(add_stats, c);
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Dumps a list of objects of each size in 32-byte increments
 */
void item_stats_sizes(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_sizes(add_stats, c);
    pthread_mutex_unlock(&cache_lock);
}
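
/*
 * Unlike the per-key wrappers earlier, the four cache-wide calls above
 * walk whole LRU lists, so they serialize on the global cache_lock rather
 * than on a single item-lock stripe.
 */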

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        threads[ii].stats.get_cmds = 0;
        threads[ii].stats.get_misses = 0;
        threads[ii].stats.touch_cmds = 0;
        threads[ii].stats.touch_misses = 0;
        threads[ii].stats.delete_misses = 0;
        threads[ii].stats.incr_misses = 0;
        threads[ii].stats.decr_misses = 0;
        threads[ii].stats.cas_misses = 0;
        threads[ii].stats.bytes_read = 0;
        threads[ii].stats.bytes_written = 0;
        threads[ii].stats.flush_cmds = 0;
        threads[ii].stats.conn_yields = 0;
        threads[ii].stats.auth_cmds = 0;
        threads[ii].stats.auth_errors = 0;

        for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
            threads[ii].stats.slab_stats[sid].get_hits = 0;
            threads[ii].stats.slab_stats[sid].touch_hits = 0;
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        stats->get_cmds += threads[ii].stats.get_cmds;
        stats->get_misses += threads[ii].stats.get_misses;
        stats->touch_cmds += threads[ii].stats.touch_cmds;
        stats->touch_misses += threads[ii].stats.touch_misses;
        stats->delete_misses += threads[ii].stats.delete_misses;
        stats->decr_misses += threads[ii].stats.decr_misses;
        stats->incr_misses += threads[ii].stats.incr_misses;
        stats->cas_misses += threads[ii].stats.cas_misses;
        stats->bytes_read += threads[ii].stats.bytes_read;
        stats->bytes_written += threads[ii].stats.bytes_written;
        stats->flush_cmds += threads[ii].stats.flush_cmds;
        stats->conn_yields += threads[ii].stats.conn_yields;
        stats->auth_cmds += threads[ii].stats.auth_cmds;
        stats->auth_errors += threads[ii].stats.auth_errors;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].set_cmds +=
                threads[ii].stats.slab_stats[sid].set_cmds;
            stats->slab_stats[sid].get_hits +=
                threads[ii].stats.slab_stats[sid].get_hits;
            stats->slab_stats[sid].touch_hits +=
                threads[ii].stats.slab_stats[sid].touch_hits;
            stats->slab_stats[sid].delete_hits +=
                threads[ii].stats.slab_stats[sid].delete_hits;
            stats->slab_stats[sid].decr_hits +=
                threads[ii].stats.slab_stats[sid].decr_hits;
            stats->slab_stats[sid].incr_hits +=
                threads[ii].stats.slab_stats[sid].incr_hits;
            stats->slab_stats[sid].cas_hits +=
                threads[ii].stats.slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                threads[ii].stats.slab_stats[sid].cas_badval;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}
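
/*
 * Aggregation locks one worker's stats at a time rather than all of them
 * at once, so the totals are a consistent snapshot per thread but not
 * across threads; counters that move while the loop runs may be only
 * partly reflected.
 */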

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    out->set_cmds = 0;
    out->get_hits = 0;
    out->touch_hits = 0;
    out->delete_hits = 0;
    out->incr_hits = 0;
    out->decr_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->set_cmds += stats->slab_stats[sid].set_cmds;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->touch_hits += stats->slab_stats[sid].touch_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->decr_hits += stats->slab_stats[sid].decr_hits;
        out->incr_hits += stats->slab_stats[sid].incr_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}

#ifndef __INTEL_COMPILER
#pragma GCC diagnostic ignored "-Wsign-compare"
#endif
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = ((unsigned long int)1 << (power));
    item_lock_mask  = item_lock_count - 1;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
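
/*
 * Expected startup order (a sketch of the call site in memcached.c's
 * main(); the real sequence has more steps):
 *
 *     struct event_base *main_base = event_init();
 *     thread_init(settings.num_threads, main_base);
 *     ...   (create listening sockets, etc.)
 *     event_base_loop(main_base, 0);
 */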