~ubuntu-branches/ubuntu/precise/memcached/precise

« back to all changes in this revision

Viewing changes to thread.c

  • Committer: Bazaar Package Importer
  • Author(s): David Martínez Moreno
  • Date: 2009-10-16 15:09:43 UTC
  • mfrom: (1.3.5 upstream)
  • mto: This revision was merged to the branch mainline in revision 8.
  • Revision ID: james.westby@ubuntu.com-20091016150943-l96biwf7siwdt1ci
Tags: upstream-1.4.2
Import upstream version 1.4.2

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2
2
/*
3
3
 * Thread management for memcached.
4
 
 *
5
 
 *  $Id$
6
4
 */
7
5
#include "memcached.h"
8
6
#include <assert.h>
10
8
#include <errno.h>
11
9
#include <stdlib.h>
12
10
#include <errno.h>
13
 
 
14
 
#ifdef HAVE_MALLOC_H
15
 
#include <malloc.h>
16
 
#endif
17
 
 
18
 
#ifdef HAVE_STRING_H
19
11
#include <string.h>
20
 
#endif
21
 
 
22
 
#ifdef USE_THREADS
23
 
 
24
12
#include <pthread.h>
25
13
 
26
14
#define ITEMS_PER_ALLOC 64
28
16
/* An item in the connection queue. */
29
17
typedef struct conn_queue_item CQ_ITEM;
30
18
struct conn_queue_item {
31
 
    int     sfd;
32
 
    int     init_state;
33
 
    int     event_flags;
34
 
    int     read_buffer_size;
35
 
    int     is_udp;
36
 
    CQ_ITEM *next;
 
19
    int               sfd;
 
20
    enum conn_states  init_state;
 
21
    int               event_flags;
 
22
    int               read_buffer_size;
 
23
    enum network_transport     transport;
 
24
    CQ_ITEM          *next;
37
25
};
38
26
 
39
27
/* A connection queue. */
45
33
    pthread_cond_t  cond;
46
34
};
47
35
 
48
 
/* Lock for connection freelist */
49
 
static pthread_mutex_t conn_lock;
50
 
 
51
 
/* Lock for alternative item suffix freelist */
52
 
static pthread_mutex_t suffix_lock;
53
 
 
54
36
/* Lock for cache operations (item_*, assoc_*) */
55
 
static pthread_mutex_t cache_lock;
 
37
pthread_mutex_t cache_lock;
56
38
 
57
 
/* Lock for slab allocator operations */
58
 
static pthread_mutex_t slabs_lock;
 
39
/* Connection lock around accepting new connections */
 
40
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
59
41
 
60
42
/* Lock for global stats */
61
43
static pthread_mutex_t stats_lock;
64
46
static CQ_ITEM *cqi_freelist;
65
47
static pthread_mutex_t cqi_freelist_lock;
66
48
 
 
49
static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;
 
50
 
67
51
/*
68
52
 * Each libevent instance has a wakeup pipe, which other threads
69
53
 * can use to signal that they've put a new connection on its queue.
70
54
 */
71
 
typedef struct {
72
 
    pthread_t thread_id;        /* unique ID of this thread */
73
 
    struct event_base *base;    /* libevent handle this thread uses */
74
 
    struct event notify_event;  /* listen event for notify pipe */
75
 
    int notify_receive_fd;      /* receiving end of notify pipe */
76
 
    int notify_send_fd;         /* sending end of notify pipe */
77
 
    CQ  new_conn_queue;         /* queue of new connections to handle */
78
 
} LIBEVENT_THREAD;
79
 
 
80
55
static LIBEVENT_THREAD *threads;
81
56
 
82
57
/*
83
 
 * Number of threads that have finished setting themselves up.
 
58
 * Number of worker threads that have finished setting themselves up.
84
59
 */
85
60
static int init_count = 0;
86
61
static pthread_mutex_t init_lock;
100
75
}
101
76
 
102
77
/*
103
 
 * Waits for work on a connection queue.
104
 
 */
105
 
static CQ_ITEM *cq_pop(CQ *cq) {
106
 
    CQ_ITEM *item;
107
 
 
108
 
    pthread_mutex_lock(&cq->lock);
109
 
    while (NULL == cq->head)
110
 
        pthread_cond_wait(&cq->cond, &cq->lock);
111
 
    item = cq->head;
112
 
    cq->head = item->next;
113
 
    if (NULL == cq->head)
114
 
        cq->tail = NULL;
115
 
    pthread_mutex_unlock(&cq->lock);
116
 
 
117
 
    return item;
118
 
}
119
 
 
120
 
/*
121
78
 * Looks for an item on a connection queue, but doesn't block if there isn't
122
79
 * one.
123
80
 * Returns the item, or NULL if no item is available
124
81
 */
125
 
static CQ_ITEM *cq_peek(CQ *cq) {
 
82
static CQ_ITEM *cq_pop(CQ *cq) {
126
83
    CQ_ITEM *item;
127
84
 
128
85
    pthread_mutex_lock(&cq->lock);
156
113
/*
157
114
 * Returns a fresh connection queue item.
158
115
 */
159
 
static CQ_ITEM *cqi_new() {
 
116
static CQ_ITEM *cqi_new(void) {
160
117
    CQ_ITEM *item = NULL;
161
118
    pthread_mutex_lock(&cqi_freelist_lock);
162
119
    if (cqi_freelist) {
222
179
/*
223
180
 * Sets whether or not we accept new connections.
224
181
 */
225
 
void mt_accept_new_conns(const bool do_accept) {
 
182
void accept_new_conns(const bool do_accept) {
226
183
    pthread_mutex_lock(&conn_lock);
227
184
    do_accept_new_conns(do_accept);
228
185
    pthread_mutex_unlock(&conn_lock);
229
186
}
230
 
 
231
 
/*
232
 
 * Pulls a conn structure from the freelist, if one is available.
233
 
 */
234
 
conn *mt_conn_from_freelist() {
235
 
    conn *c;
236
 
 
237
 
    pthread_mutex_lock(&conn_lock);
238
 
    c = do_conn_from_freelist();
239
 
    pthread_mutex_unlock(&conn_lock);
240
 
 
241
 
    return c;
242
 
}
243
 
 
244
 
 
245
 
/*
246
 
 * Adds a conn structure to the freelist.
247
 
 *
248
 
 * Returns 0 on success, 1 if the structure couldn't be added.
249
 
 */
250
 
bool mt_conn_add_to_freelist(conn *c) {
251
 
    bool result;
252
 
 
253
 
    pthread_mutex_lock(&conn_lock);
254
 
    result = do_conn_add_to_freelist(c);
255
 
    pthread_mutex_unlock(&conn_lock);
256
 
 
257
 
    return result;
258
 
}
259
 
 
260
 
/*
261
 
 * Pulls a suffix buffer from the freelist, if one is available.
262
 
 */
263
 
char *mt_suffix_from_freelist() {
264
 
    char *s;
265
 
 
266
 
    pthread_mutex_lock(&suffix_lock);
267
 
    s = do_suffix_from_freelist();
268
 
    pthread_mutex_unlock(&suffix_lock);
269
 
 
270
 
    return s;
271
 
}
272
 
 
273
 
 
274
 
/*
275
 
 * Adds a suffix buffer to the freelist.
276
 
 *
277
 
 * Returns 0 on success, 1 if the buffer couldn't be added.
278
 
 */
279
 
bool mt_suffix_add_to_freelist(char *s) {
280
 
    bool result;
281
 
 
282
 
    pthread_mutex_lock(&suffix_lock);
283
 
    result = do_suffix_add_to_freelist(s);
284
 
    pthread_mutex_unlock(&suffix_lock);
285
 
 
286
 
    return result;
287
 
}
288
 
 
289
 
 
290
187
/****************************** LIBEVENT THREADS *****************************/
291
188
 
292
189
/*
293
190
 * Set up a thread's information.
294
191
 */
295
192
static void setup_thread(LIBEVENT_THREAD *me) {
 
193
    me->base = event_init();
296
194
    if (! me->base) {
297
 
        me->base = event_init();
298
 
        if (! me->base) {
299
 
            fprintf(stderr, "Can't allocate event base\n");
300
 
            exit(1);
301
 
        }
 
195
        fprintf(stderr, "Can't allocate event base\n");
 
196
        exit(1);
302
197
    }
303
198
 
304
199
    /* Listen for notifications from other threads */
311
206
        exit(1);
312
207
    }
313
208
 
314
 
    cq_init(&me->new_conn_queue);
 
209
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
 
210
    if (me->new_conn_queue == NULL) {
 
211
        perror("Failed to allocate memory for connection queue");
 
212
        exit(EXIT_FAILURE);
 
213
    }
 
214
    cq_init(me->new_conn_queue);
 
215
 
 
216
    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
 
217
        perror("Failed to initialize mutex");
 
218
        exit(EXIT_FAILURE);
 
219
    }
 
220
 
 
221
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
 
222
                                    NULL, NULL);
 
223
    if (me->suffix_cache == NULL) {
 
224
        fprintf(stderr, "Failed to create suffix cache\n");
 
225
        exit(EXIT_FAILURE);
 
226
    }
315
227
}
316
228
 
317
229
 
330
242
    pthread_cond_signal(&init_cond);
331
243
    pthread_mutex_unlock(&init_lock);
332
244
 
333
 
    return (void*) event_base_loop(me->base, 0);
 
245
    event_base_loop(me->base, 0);
 
246
    return NULL;
334
247
}
335
248
 
336
249
 
347
260
        if (settings.verbose > 0)
348
261
            fprintf(stderr, "Can't read from libevent pipe\n");
349
262
 
350
 
    item = cq_peek(&me->new_conn_queue);
 
263
    item = cq_pop(me->new_conn_queue);
351
264
 
352
265
    if (NULL != item) {
353
266
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
354
 
                           item->read_buffer_size, item->is_udp, me->base);
 
267
                           item->read_buffer_size, item->transport, me->base);
355
268
        if (c == NULL) {
356
 
            if (item->is_udp) {
 
269
            if (IS_UDP(item->transport)) {
357
270
                fprintf(stderr, "Can't listen for events on UDP socket\n");
358
271
                exit(1);
359
272
            } else {
363
276
                }
364
277
                close(item->sfd);
365
278
            }
 
279
        } else {
 
280
            c->thread = me;
366
281
        }
367
282
        cqi_free(item);
368
283
    }
369
284
}
370
285
 
371
286
/* Which thread we assigned a connection to most recently. */
372
 
static int last_thread = 0;
 
287
static int last_thread = -1;
373
288
 
374
289
/*
375
290
 * Dispatches a new connection to another thread. This is only ever called
376
291
 * from the main thread, either during initialization (for UDP) or because
377
292
 * of an incoming connection.
378
293
 */
379
 
void dispatch_conn_new(int sfd, int init_state, int event_flags,
380
 
                       int read_buffer_size, int is_udp) {
 
294
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
 
295
                       int read_buffer_size, enum network_transport transport) {
381
296
    CQ_ITEM *item = cqi_new();
382
 
 
383
 
    int tid = last_thread % (settings.num_threads - 1);
384
 
 
385
 
    /* Skip the dispatch thread (0) */
386
 
    tid++;
387
 
    assert(tid != 0);
388
 
    assert(tid < settings.num_threads);
 
297
    int tid = (last_thread + 1) % settings.num_threads;
 
298
 
389
299
    LIBEVENT_THREAD *thread = threads + tid;
390
300
 
391
301
    last_thread = tid;
394
304
    item->init_state = init_state;
395
305
    item->event_flags = event_flags;
396
306
    item->read_buffer_size = read_buffer_size;
397
 
    item->is_udp = is_udp;
 
307
    item->transport = transport;
398
308
 
399
 
    cq_push(&thread->new_conn_queue, item);
 
309
    cq_push(thread->new_conn_queue, item);
400
310
 
401
311
    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
402
312
    if (write(thread->notify_send_fd, "", 1) != 1) {
407
317
/*
408
318
 * Returns true if this is the thread that listens for new TCP connections.
409
319
 */
410
 
int mt_is_listen_thread() {
411
 
    return pthread_self() == threads[0].thread_id;
 
320
int is_listen_thread() {
 
321
    return pthread_self() == dispatcher_thread.thread_id;
412
322
}
413
323
 
414
324
/********************************* ITEM ACCESS *******************************/
415
325
 
416
326
/*
417
 
 * Walks through the list of deletes that have been deferred because the items
418
 
 * were locked down at the tmie.
419
 
 */
420
 
void mt_run_deferred_deletes() {
421
 
    pthread_mutex_lock(&cache_lock);
422
 
    do_run_deferred_deletes();
423
 
    pthread_mutex_unlock(&cache_lock);
424
 
}
425
 
 
426
 
/*
427
327
 * Allocates a new item.
428
328
 */
429
 
item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
 
329
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
430
330
    item *it;
431
331
    pthread_mutex_lock(&cache_lock);
432
332
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
435
335
}
436
336
 
437
337
/*
438
 
 * Returns an item if it hasn't been marked as expired or deleted,
 
338
 * Returns an item if it hasn't been marked as expired,
439
339
 * lazy-expiring as needed.
440
340
 */
441
 
item *mt_item_get_notedeleted(const char *key, const size_t nkey, bool *delete_locked) {
 
341
item *item_get(const char *key, const size_t nkey) {
442
342
    item *it;
443
343
    pthread_mutex_lock(&cache_lock);
444
 
    it = do_item_get_notedeleted(key, nkey, delete_locked);
 
344
    it = do_item_get(key, nkey);
445
345
    pthread_mutex_unlock(&cache_lock);
446
346
    return it;
447
347
}
449
349
/*
450
350
 * Links an item into the LRU and hashtable.
451
351
 */
452
 
int mt_item_link(item *item) {
 
352
int item_link(item *item) {
453
353
    int ret;
454
354
 
455
355
    pthread_mutex_lock(&cache_lock);
462
362
 * Decrements the reference count on an item and adds it to the freelist if
463
363
 * needed.
464
364
 */
465
 
void mt_item_remove(item *item) {
 
365
void item_remove(item *item) {
466
366
    pthread_mutex_lock(&cache_lock);
467
367
    do_item_remove(item);
468
368
    pthread_mutex_unlock(&cache_lock);
470
370
 
471
371
/*
472
372
 * Replaces one item with another in the hashtable.
 
373
 * Unprotected by a mutex lock since the core server does not require
 
374
 * it to be thread-safe.
473
375
 */
474
 
int mt_item_replace(item *old, item *new) {
475
 
    int ret;
476
 
 
477
 
    pthread_mutex_lock(&cache_lock);
478
 
    ret = do_item_replace(old, new);
479
 
    pthread_mutex_unlock(&cache_lock);
480
 
    return ret;
 
376
int item_replace(item *old_it, item *new_it) {
 
377
    return do_item_replace(old_it, new_it);
481
378
}
482
379
 
483
380
/*
484
381
 * Unlinks an item from the LRU and hashtable.
485
382
 */
486
 
void mt_item_unlink(item *item) {
 
383
void item_unlink(item *item) {
487
384
    pthread_mutex_lock(&cache_lock);
488
385
    do_item_unlink(item);
489
386
    pthread_mutex_unlock(&cache_lock);
492
389
/*
493
390
 * Moves an item to the back of the LRU queue.
494
391
 */
495
 
void mt_item_update(item *item) {
 
392
void item_update(item *item) {
496
393
    pthread_mutex_lock(&cache_lock);
497
394
    do_item_update(item);
498
395
    pthread_mutex_unlock(&cache_lock);
499
396
}
500
397
 
501
398
/*
502
 
 * Adds an item to the deferred-delete list so it can be reaped later.
503
 
 */
504
 
char *mt_defer_delete(item *item, time_t exptime) {
505
 
    char *ret;
506
 
 
507
 
    pthread_mutex_lock(&cache_lock);
508
 
    ret = do_defer_delete(item, exptime);
509
 
    pthread_mutex_unlock(&cache_lock);
510
 
    return ret;
511
 
}
512
 
 
513
 
/*
514
399
 * Does arithmetic on a numeric item value.
515
400
 */
516
 
char *mt_add_delta(conn *c, item *item, int incr, const int64_t delta,
517
 
                   char *buf) {
518
 
    char *ret;
 
401
enum delta_result_type add_delta(conn *c, item *item, int incr,
 
402
                                 const int64_t delta, char *buf) {
 
403
    enum delta_result_type ret;
519
404
 
520
405
    pthread_mutex_lock(&cache_lock);
521
406
    ret = do_add_delta(c, item, incr, delta, buf);
526
411
/*
527
412
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
528
413
 */
529
 
int mt_store_item(item *item, int comm) {
530
 
    int ret;
 
414
enum store_item_type store_item(item *item, int comm, conn* c) {
 
415
    enum store_item_type ret;
531
416
 
532
417
    pthread_mutex_lock(&cache_lock);
533
 
    ret = do_store_item(item, comm);
 
418
    ret = do_store_item(item, comm, c);
534
419
    pthread_mutex_unlock(&cache_lock);
535
420
    return ret;
536
421
}
538
423
/*
539
424
 * Flushes expired items after a flush_all call
540
425
 */
541
 
void mt_item_flush_expired() {
 
426
void item_flush_expired() {
542
427
    pthread_mutex_lock(&cache_lock);
543
428
    do_item_flush_expired();
544
429
    pthread_mutex_unlock(&cache_lock);
547
432
/*
548
433
 * Dumps part of the cache
549
434
 */
550
 
char *mt_item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
 
435
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
551
436
    char *ret;
552
437
 
553
438
    pthread_mutex_lock(&cache_lock);
559
444
/*
560
445
 * Dumps statistics about slab classes
561
446
 */
562
 
char *mt_item_stats(int *bytes) {
563
 
    char *ret;
564
 
 
 
447
void  item_stats(ADD_STAT add_stats, void *c) {
565
448
    pthread_mutex_lock(&cache_lock);
566
 
    ret = do_item_stats(bytes);
 
449
    do_item_stats(add_stats, c);
567
450
    pthread_mutex_unlock(&cache_lock);
568
 
    return ret;
569
451
}
570
452
 
571
453
/*
572
454
 * Dumps a list of objects of each size in 32-byte increments
573
455
 */
574
 
char *mt_item_stats_sizes(int *bytes) {
575
 
    char *ret;
576
 
 
577
 
    pthread_mutex_lock(&cache_lock);
578
 
    ret = do_item_stats_sizes(bytes);
579
 
    pthread_mutex_unlock(&cache_lock);
580
 
    return ret;
581
 
}
582
 
 
583
 
/****************************** HASHTABLE MODULE *****************************/
584
 
 
585
 
void mt_assoc_move_next_bucket() {
586
 
    pthread_mutex_lock(&cache_lock);
587
 
    do_assoc_move_next_bucket();
588
 
    pthread_mutex_unlock(&cache_lock);
589
 
}
590
 
 
591
 
/******************************* SLAB ALLOCATOR ******************************/
592
 
 
593
 
void *mt_slabs_alloc(size_t size, unsigned int id) {
594
 
    void *ret;
595
 
 
596
 
    pthread_mutex_lock(&slabs_lock);
597
 
    ret = do_slabs_alloc(size, id);
598
 
    pthread_mutex_unlock(&slabs_lock);
599
 
    return ret;
600
 
}
601
 
 
602
 
void mt_slabs_free(void *ptr, size_t size, unsigned int id) {
603
 
    pthread_mutex_lock(&slabs_lock);
604
 
    do_slabs_free(ptr, size, id);
605
 
    pthread_mutex_unlock(&slabs_lock);
606
 
}
607
 
 
608
 
char *mt_slabs_stats(int *buflen) {
609
 
    char *ret;
610
 
 
611
 
    pthread_mutex_lock(&slabs_lock);
612
 
    ret = do_slabs_stats(buflen);
613
 
    pthread_mutex_unlock(&slabs_lock);
614
 
    return ret;
615
 
}
616
 
 
617
 
#ifdef ALLOW_SLABS_REASSIGN
618
 
int mt_slabs_reassign(unsigned char srcid, unsigned char dstid) {
619
 
    int ret;
620
 
 
621
 
    pthread_mutex_lock(&slabs_lock);
622
 
    ret = do_slabs_reassign(srcid, dstid);
623
 
    pthread_mutex_unlock(&slabs_lock);
624
 
    return ret;
625
 
}
626
 
#endif
 
456
void  item_stats_sizes(ADD_STAT add_stats, void *c) {
 
457
    pthread_mutex_lock(&cache_lock);
 
458
    do_item_stats_sizes(add_stats, c);
 
459
    pthread_mutex_unlock(&cache_lock);
 
460
}
627
461
 
628
462
/******************************* GLOBAL STATS ******************************/
629
463
 
630
 
void mt_stats_lock() {
 
464
void STATS_LOCK() {
631
465
    pthread_mutex_lock(&stats_lock);
632
466
}
633
467
 
634
 
void mt_stats_unlock() {
 
468
void STATS_UNLOCK() {
635
469
    pthread_mutex_unlock(&stats_lock);
636
470
}
637
471
 
 
472
void threadlocal_stats_reset(void) {
 
473
    int ii, sid;
 
474
    for (ii = 0; ii < settings.num_threads; ++ii) {
 
475
        pthread_mutex_lock(&threads[ii].stats.mutex);
 
476
 
 
477
        threads[ii].stats.get_cmds = 0;
 
478
        threads[ii].stats.get_misses = 0;
 
479
        threads[ii].stats.delete_misses = 0;
 
480
        threads[ii].stats.incr_misses = 0;
 
481
        threads[ii].stats.decr_misses = 0;
 
482
        threads[ii].stats.cas_misses = 0;
 
483
        threads[ii].stats.bytes_read = 0;
 
484
        threads[ii].stats.bytes_written = 0;
 
485
        threads[ii].stats.flush_cmds = 0;
 
486
        threads[ii].stats.conn_yields = 0;
 
487
 
 
488
        for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
489
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
 
490
            threads[ii].stats.slab_stats[sid].get_hits = 0;
 
491
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
 
492
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
 
493
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
 
494
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
 
495
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
 
496
        }
 
497
 
 
498
        pthread_mutex_unlock(&threads[ii].stats.mutex);
 
499
    }
 
500
}
 
501
 
 
502
void threadlocal_stats_aggregate(struct thread_stats *stats) {
 
503
    int ii, sid;
 
504
    /* The struct contains a mutex, so I should probably not memset it.. */
 
505
    stats->get_cmds = 0;
 
506
    stats->get_misses = 0;
 
507
    stats->delete_misses = 0;
 
508
    stats->incr_misses = 0;
 
509
    stats->decr_misses = 0;
 
510
    stats->cas_misses = 0;
 
511
    stats->bytes_written = 0;
 
512
    stats->bytes_read = 0;
 
513
    stats->flush_cmds = 0;
 
514
    stats->conn_yields = 0;
 
515
 
 
516
    memset(stats->slab_stats, 0,
 
517
           sizeof(struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
 
518
 
 
519
    for (ii = 0; ii < settings.num_threads; ++ii) {
 
520
        pthread_mutex_lock(&threads[ii].stats.mutex);
 
521
 
 
522
        stats->get_cmds += threads[ii].stats.get_cmds;
 
523
        stats->get_misses += threads[ii].stats.get_misses;
 
524
        stats->delete_misses += threads[ii].stats.delete_misses;
 
525
        stats->decr_misses += threads[ii].stats.decr_misses;
 
526
        stats->incr_misses += threads[ii].stats.incr_misses;
 
527
        stats->cas_misses += threads[ii].stats.cas_misses;
 
528
        stats->bytes_read += threads[ii].stats.bytes_read;
 
529
        stats->bytes_written += threads[ii].stats.bytes_written;
 
530
        stats->flush_cmds += threads[ii].stats.flush_cmds;
 
531
        stats->conn_yields += threads[ii].stats.conn_yields;
 
532
 
 
533
        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
534
            stats->slab_stats[sid].set_cmds +=
 
535
                threads[ii].stats.slab_stats[sid].set_cmds;
 
536
            stats->slab_stats[sid].get_hits +=
 
537
                threads[ii].stats.slab_stats[sid].get_hits;
 
538
            stats->slab_stats[sid].delete_hits +=
 
539
                threads[ii].stats.slab_stats[sid].delete_hits;
 
540
            stats->slab_stats[sid].decr_hits +=
 
541
                threads[ii].stats.slab_stats[sid].decr_hits;
 
542
            stats->slab_stats[sid].incr_hits +=
 
543
                threads[ii].stats.slab_stats[sid].incr_hits;
 
544
            stats->slab_stats[sid].cas_hits +=
 
545
                threads[ii].stats.slab_stats[sid].cas_hits;
 
546
            stats->slab_stats[sid].cas_badval +=
 
547
                threads[ii].stats.slab_stats[sid].cas_badval;
 
548
        }
 
549
 
 
550
        pthread_mutex_unlock(&threads[ii].stats.mutex);
 
551
    }
 
552
}
 
553
 
 
554
void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
 
555
    int sid;
 
556
 
 
557
    out->set_cmds = 0;
 
558
    out->get_hits = 0;
 
559
    out->delete_hits = 0;
 
560
    out->incr_hits = 0;
 
561
    out->decr_hits = 0;
 
562
    out->cas_hits = 0;
 
563
    out->cas_badval = 0;
 
564
 
 
565
    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
 
566
        out->set_cmds += stats->slab_stats[sid].set_cmds;
 
567
        out->get_hits += stats->slab_stats[sid].get_hits;
 
568
        out->delete_hits += stats->slab_stats[sid].delete_hits;
 
569
        out->decr_hits += stats->slab_stats[sid].decr_hits;
 
570
        out->incr_hits += stats->slab_stats[sid].incr_hits;
 
571
        out->cas_hits += stats->slab_stats[sid].cas_hits;
 
572
        out->cas_badval += stats->slab_stats[sid].cas_badval;
 
573
    }
 
574
}
 
575
 
638
576
/*
639
577
 * Initializes the thread subsystem, creating various worker threads.
640
578
 *
641
 
 * nthreads  Number of event handler threads to spawn
 
579
 * nthreads  Number of worker event handler threads to spawn
642
580
 * main_base Event base for main thread
643
581
 */
644
582
void thread_init(int nthreads, struct event_base *main_base) {
645
583
    int         i;
646
584
 
647
585
    pthread_mutex_init(&cache_lock, NULL);
648
 
    pthread_mutex_init(&conn_lock, NULL);
649
 
    pthread_mutex_init(&slabs_lock, NULL);
650
586
    pthread_mutex_init(&stats_lock, NULL);
651
587
 
652
588
    pthread_mutex_init(&init_lock, NULL);
661
597
        exit(1);
662
598
    }
663
599
 
664
 
    threads[0].base = main_base;
665
 
    threads[0].thread_id = pthread_self();
 
600
    dispatcher_thread.base = main_base;
 
601
    dispatcher_thread.thread_id = pthread_self();
666
602
 
667
603
    for (i = 0; i < nthreads; i++) {
668
604
        int fds[2];
674
610
        threads[i].notify_receive_fd = fds[0];
675
611
        threads[i].notify_send_fd = fds[1];
676
612
 
677
 
    setup_thread(&threads[i]);
 
613
        setup_thread(&threads[i]);
678
614
    }
679
615
 
680
616
    /* Create threads after we've done all the libevent setup. */
681
 
    for (i = 1; i < nthreads; i++) {
 
617
    for (i = 0; i < nthreads; i++) {
682
618
        create_worker(worker_libevent, &threads[i]);
683
619
    }
684
620
 
685
621
    /* Wait for all the threads to set themselves up before returning. */
686
622
    pthread_mutex_lock(&init_lock);
687
 
    init_count++; /* main thread */
688
623
    while (init_count < nthreads) {
689
624
        pthread_cond_wait(&init_cond, &init_lock);
690
625
    }
691
626
    pthread_mutex_unlock(&init_lock);
692
627
}
693
628
 
694
 
#endif