~ubuntu-branches/ubuntu/jaunty/memcached/jaunty

« back to all changes in this revision

Viewing changes to thread.c

  • Committer: Bazaar Package Importer
  • Author(s): Jay Bonci
  • Date: 2007-06-29 10:18:03 UTC
  • mfrom: (1.1.3 upstream) (3.1.1 lenny)
  • Revision ID: james.westby@ubuntu.com-20070629101803-9jtjy7t0j7w4kq78
Tags: 1.2.2-1
New upstream release

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
 
2
/*
 
3
 * Thread management for memcached.
 
4
 *
 
5
 *  $Id$
 
6
 */
 
7
#include "memcached.h"
 
8
#include <stdio.h>
 
9
#include <errno.h>
 
10
#include <stdlib.h>
 
11
#include <errno.h>
 
12
 
 
13
#ifdef HAVE_MALLOC_H
 
14
#include <malloc.h>
 
15
#endif
 
16
 
 
17
#ifdef USE_THREADS
 
18
 
 
19
#include <pthread.h>
 
20
 
 
21
#define ITEMS_PER_ALLOC 64
 
22
 
 
23
/* An item in the connection queue. */
 
24
typedef struct conn_queue_item CQ_ITEM;
 
25
struct conn_queue_item {
 
26
    int     sfd;
 
27
    int     init_state;
 
28
    int     event_flags;
 
29
    int     read_buffer_size;
 
30
    int     is_udp;
 
31
    CQ_ITEM *next;
 
32
};
 
33
 
 
34
/* A connection queue. */
 
35
typedef struct conn_queue CQ;
 
36
struct conn_queue {
 
37
    CQ_ITEM *head;
 
38
    CQ_ITEM *tail;
 
39
    pthread_mutex_t lock;
 
40
    pthread_cond_t  cond;
 
41
};
 
42
 
 
43
/* Lock for connection freelist */
 
44
static pthread_mutex_t conn_lock;
 
45
 
 
46
/* Lock for cache operations (item_*, assoc_*) */
 
47
static pthread_mutex_t cache_lock;
 
48
 
 
49
/* Lock for slab allocator operations */
 
50
static pthread_mutex_t slabs_lock;
 
51
 
 
52
/* Lock for global stats */
 
53
static pthread_mutex_t stats_lock;
 
54
 
 
55
/* Free list of CQ_ITEM structs */
 
56
static CQ_ITEM *cqi_freelist;
 
57
static pthread_mutex_t cqi_freelist_lock;
 
58
 
 
59
/*
 
60
 * Each libevent instance has a wakeup pipe, which other threads
 
61
 * can use to signal that they've put a new connection on its queue.
 
62
 */
 
63
typedef struct {
 
64
    pthread_t thread_id;        /* unique ID of this thread */
 
65
    struct event_base *base;    /* libevent handle this thread uses */
 
66
    struct event notify_event;  /* listen event for notify pipe */
 
67
    int notify_receive_fd;      /* receiving end of notify pipe */
 
68
    int notify_send_fd;         /* sending end of notify pipe */
 
69
    CQ  new_conn_queue;         /* queue of new connections to handle */
 
70
} LIBEVENT_THREAD;
 
71
 
 
72
static LIBEVENT_THREAD *threads;
 
73
 
 
74
/*
 
75
 * Number of threads that have finished setting themselves up.
 
76
 */
 
77
static int init_count = 0;
 
78
static pthread_mutex_t init_lock;
 
79
static pthread_cond_t init_cond;
 
80
 
 
81
 
 
82
static void thread_libevent_process(int fd, short which, void *arg);
 
83
 
 
84
/*
 
85
 * Initializes a connection queue.
 
86
 */
 
87
static void cq_init(CQ *cq) {
 
88
    pthread_mutex_init(&cq->lock, NULL);
 
89
    pthread_cond_init(&cq->cond, NULL);
 
90
    cq->head = NULL;
 
91
    cq->tail = NULL;
 
92
}
 
93
 
 
94
/*
 
95
 * Waits for work on a connection queue.
 
96
 */
 
97
static CQ_ITEM *cq_pop(CQ *cq) {
 
98
    CQ_ITEM *item;
 
99
 
 
100
    pthread_mutex_lock(&cq->lock);
 
101
    while (NULL == cq->head)
 
102
        pthread_cond_wait(&cq->cond, &cq->lock);
 
103
    item = cq->head;
 
104
    cq->head = item->next;
 
105
    if (NULL == cq->head)
 
106
        cq->tail = NULL;
 
107
    pthread_mutex_unlock(&cq->lock);
 
108
 
 
109
    return item;
 
110
}
 
111
 
 
112
/*
 
113
 * Looks for an item on a connection queue, but doesn't block if there isn't
 
114
 * one.
 
115
 */
 
116
static CQ_ITEM *cq_peek(CQ *cq) {
 
117
    CQ_ITEM *item;
 
118
 
 
119
    pthread_mutex_lock(&cq->lock);
 
120
    item = cq->head;
 
121
    if (NULL != item) {
 
122
        cq->head = item->next;
 
123
        if (NULL == cq->head)
 
124
            cq->tail = NULL;
 
125
    }
 
126
    pthread_mutex_unlock(&cq->lock);
 
127
 
 
128
    return item;
 
129
}
 
130
 
 
131
/*
 
132
 * Adds an item to a connection queue.
 
133
 */
 
134
static void cq_push(CQ *cq, CQ_ITEM *item) {
 
135
    item->next = NULL;
 
136
 
 
137
    pthread_mutex_lock(&cq->lock);
 
138
    if (NULL == cq->tail)
 
139
        cq->head = item;
 
140
    else
 
141
        cq->tail->next = item;
 
142
    cq->tail = item;
 
143
    pthread_cond_signal(&cq->cond);
 
144
    pthread_mutex_unlock(&cq->lock);
 
145
}
 
146
 
 
147
/*
 
148
 * Returns a fresh connection queue item.
 
149
 */
 
150
static CQ_ITEM *cqi_new() {
 
151
    CQ_ITEM *item = NULL;
 
152
    pthread_mutex_lock(&cqi_freelist_lock);
 
153
    if (cqi_freelist) {
 
154
        item = cqi_freelist;
 
155
        cqi_freelist = item->next;
 
156
    }
 
157
    pthread_mutex_unlock(&cqi_freelist_lock);
 
158
 
 
159
    if (NULL == item) {
 
160
        int i;
 
161
 
 
162
        /* Allocate a bunch of items at once to reduce fragmentation */
 
163
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
 
164
        if (NULL == item)
 
165
            return NULL;
 
166
 
 
167
        /*
 
168
         * Link together all the new items except the first one
 
169
         * (which we'll return to the caller) for placement on
 
170
         * the freelist.
 
171
         */
 
172
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
 
173
            item[i - 1].next = &item[i];
 
174
 
 
175
        pthread_mutex_lock(&cqi_freelist_lock);
 
176
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
 
177
        cqi_freelist = &item[1];
 
178
        pthread_mutex_unlock(&cqi_freelist_lock);
 
179
    }
 
180
 
 
181
    return item;
 
182
}
 
183
 
 
184
 
 
185
/*
 
186
 * Frees a connection queue item (adds it to the freelist.)
 
187
 */
 
188
static void cqi_free(CQ_ITEM *item) {
 
189
    pthread_mutex_lock(&cqi_freelist_lock);
 
190
    item->next = cqi_freelist;
 
191
    cqi_freelist = item;
 
192
    pthread_mutex_unlock(&cqi_freelist_lock);
 
193
}
 
194
 
 
195
 
 
196
/*
 
197
 * Creates a worker thread.
 
198
 */
 
199
static void create_worker(void *(*func)(void *), void *arg) {
 
200
    pthread_t       thread;
 
201
    pthread_attr_t  attr;
 
202
    int             ret;
 
203
 
 
204
    pthread_attr_init(&attr);
 
205
 
 
206
    if (ret = pthread_create(&thread, &attr, func, arg)) {
 
207
        fprintf(stderr, "Can't create thread: %s\n",
 
208
                strerror(ret));
 
209
        exit(1);
 
210
    }
 
211
}
 
212
 
 
213
 
 
214
/*
 
215
 * Pulls a conn structure from the freelist, if one is available.
 
216
 */
 
217
conn *mt_conn_from_freelist() {
 
218
    conn *c;
 
219
 
 
220
    pthread_mutex_lock(&conn_lock);
 
221
    c = do_conn_from_freelist();
 
222
    pthread_mutex_unlock(&conn_lock);
 
223
 
 
224
    return c;
 
225
}
 
226
 
 
227
 
 
228
/*
 
229
 * Adds a conn structure to the freelist.
 
230
 *
 
231
 * Returns 0 on success, 1 if the structure couldn't be added.
 
232
 */
 
233
int mt_conn_add_to_freelist(conn *c) {
 
234
    int result;
 
235
 
 
236
    pthread_mutex_lock(&conn_lock);
 
237
    result = do_conn_add_to_freelist(c);
 
238
    pthread_mutex_unlock(&conn_lock);
 
239
 
 
240
    return result;
 
241
}
 
242
 
 
243
/****************************** LIBEVENT THREADS *****************************/
 
244
 
 
245
/*
 
246
 * Set up a thread's information.
 
247
 */
 
248
static void setup_thread(LIBEVENT_THREAD *me) {
 
249
    if (! me->base) {
 
250
        me->base = event_init();
 
251
        if (! me->base) {
 
252
            fprintf(stderr, "Can't allocate event base\n");
 
253
            exit(1);
 
254
        }
 
255
    }
 
256
 
 
257
    /* Listen for notifications from other threads */
 
258
    event_set(&me->notify_event, me->notify_receive_fd,
 
259
              EV_READ | EV_PERSIST, thread_libevent_process, me);
 
260
    event_base_set(me->base, &me->notify_event);
 
261
 
 
262
    if (event_add(&me->notify_event, 0) == -1) {
 
263
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
 
264
        exit(1);
 
265
    }
 
266
 
 
267
    cq_init(&me->new_conn_queue);
 
268
}
 
269
 
 
270
 
 
271
/*
 
272
 * Worker thread: main event loop
 
273
 */
 
274
static void *worker_libevent(void *arg) {
 
275
    LIBEVENT_THREAD *me = arg;
 
276
 
 
277
    /* Any per-thread setup can happen here; thread_init() will block until
 
278
     * all threads have finished initializing.
 
279
     */
 
280
 
 
281
    pthread_mutex_lock(&init_lock);
 
282
    init_count++;
 
283
    pthread_cond_signal(&init_cond);
 
284
    pthread_mutex_unlock(&init_lock);
 
285
 
 
286
    event_base_loop(me->base, 0);
 
287
}
 
288
 
 
289
 
 
290
/*
 
291
 * Processes an incoming "handle a new connection" item. This is called when
 
292
 * input arrives on the libevent wakeup pipe.
 
293
 */
 
294
static void thread_libevent_process(int fd, short which, void *arg) {
 
295
    LIBEVENT_THREAD *me = arg;
 
296
    CQ_ITEM *item;
 
297
    char buf[1];
 
298
 
 
299
    if (read(fd, buf, 1) != 1)
 
300
        if (settings.verbose > 0)
 
301
            fprintf(stderr, "Can't read from libevent pipe\n");
 
302
 
 
303
    if (item = cq_peek(&me->new_conn_queue)) {
 
304
    conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
 
305
                       item->read_buffer_size, item->is_udp, me->base);
 
306
    if (!c) {
 
307
        if (item->is_udp) {
 
308
            fprintf(stderr, "Can't listen for events on UDP socket\n");
 
309
            exit(1);
 
310
            }
 
311
        else {
 
312
        if (settings.verbose > 0) {
 
313
                fprintf(stderr, "Can't listen for events on fd %d\n",
 
314
                    item->sfd);
 
315
        }
 
316
        close(item->sfd);
 
317
        }
 
318
    }
 
319
        cqi_free(item);
 
320
    }
 
321
}
 
322
 
 
323
/* Which thread we assigned a connection to most recently. */
 
324
static int last_thread = -1;
 
325
 
 
326
/*
 
327
 * Dispatches a new connection to another thread. This is only ever called
 
328
 * from the main thread, either during initialization (for UDP) or because
 
329
 * of an incoming connection.
 
330
 */
 
331
void dispatch_conn_new(int sfd, int init_state, int event_flags,
 
332
                       int read_buffer_size, int is_udp) {
 
333
    CQ_ITEM *item = cqi_new();
 
334
    int thread = (last_thread + 1) % settings.num_threads;
 
335
 
 
336
    last_thread = thread;
 
337
 
 
338
    item->sfd = sfd;
 
339
    item->init_state = init_state;
 
340
    item->event_flags = event_flags;
 
341
    item->read_buffer_size = read_buffer_size;
 
342
    item->is_udp = is_udp;
 
343
 
 
344
    cq_push(&threads[thread].new_conn_queue, item);
 
345
    if (write(threads[thread].notify_send_fd, "", 1) != 1) {
 
346
        perror("Writing to thread notify pipe");
 
347
    }
 
348
}
 
349
 
 
350
/*
 
351
 * Returns true if this is the thread that listens for new TCP connections.
 
352
 */
 
353
int mt_is_listen_thread() {
 
354
    return pthread_self() == threads[0].thread_id;
 
355
}
 
356
 
 
357
/********************************* ITEM ACCESS *******************************/
 
358
 
 
359
/*
 
360
 * Walks through the list of deletes that have been deferred because the items
 
361
 * were locked down at the tmie.
 
362
 */
 
363
void mt_run_deferred_deletes() {
 
364
    pthread_mutex_lock(&cache_lock);
 
365
    do_run_deferred_deletes();
 
366
    pthread_mutex_unlock(&cache_lock);
 
367
}
 
368
 
 
369
/*
 
370
 * Allocates a new item.
 
371
 */
 
372
item *mt_item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
 
373
    item *it;
 
374
    pthread_mutex_lock(&cache_lock);
 
375
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
 
376
    pthread_mutex_unlock(&cache_lock);
 
377
    return it;
 
378
}
 
379
 
 
380
/*
 
381
 * Returns an item if it hasn't been marked as expired or deleted,
 
382
 * lazy-expiring as needed.
 
383
 */
 
384
item *mt_item_get_notedeleted(char *key, size_t nkey, bool *delete_locked) {
 
385
    item *it;
 
386
    pthread_mutex_lock(&cache_lock);
 
387
    it = do_item_get_notedeleted(key, nkey, delete_locked);
 
388
    pthread_mutex_unlock(&cache_lock);
 
389
    return it;
 
390
}
 
391
 
 
392
/*
 
393
 * Returns an item whether or not it's been marked as expired or deleted.
 
394
 */
 
395
item *mt_item_get_nocheck(char *key, size_t nkey) {
 
396
    item *it;
 
397
 
 
398
    pthread_mutex_lock(&cache_lock);
 
399
    it = assoc_find(key, nkey);
 
400
    it->refcount++;
 
401
    pthread_mutex_unlock(&cache_lock);
 
402
    return it;
 
403
}
 
404
 
 
405
/*
 
406
 * Links an item into the LRU and hashtable.
 
407
 */
 
408
int mt_item_link(item *item) {
 
409
    int ret;
 
410
 
 
411
    pthread_mutex_lock(&cache_lock);
 
412
    ret = do_item_link(item);
 
413
    pthread_mutex_unlock(&cache_lock);
 
414
    return ret;
 
415
}
 
416
 
 
417
/*
 
418
 * Decrements the reference count on an item and adds it to the freelist if
 
419
 * needed.
 
420
 */
 
421
void mt_item_remove(item *item) {
 
422
    pthread_mutex_lock(&cache_lock);
 
423
    do_item_remove(item);
 
424
    pthread_mutex_unlock(&cache_lock);
 
425
}
 
426
 
 
427
/*
 
428
 * Replaces one item with another in the hashtable.
 
429
 */
 
430
int mt_item_replace(item *old, item *new) {
 
431
    int ret;
 
432
 
 
433
    pthread_mutex_lock(&cache_lock);
 
434
    ret = do_item_replace(old, new);
 
435
    pthread_mutex_unlock(&cache_lock);
 
436
    return ret;
 
437
}
 
438
 
 
439
/*
 
440
 * Unlinks an item from the LRU and hashtable.
 
441
 */
 
442
void mt_item_unlink(item *item) {
 
443
    pthread_mutex_lock(&cache_lock);
 
444
    do_item_unlink(item);
 
445
    pthread_mutex_unlock(&cache_lock);
 
446
}
 
447
 
 
448
/*
 
449
 * Moves an item to the back of the LRU queue.
 
450
 */
 
451
void mt_item_update(item *item) {
 
452
    pthread_mutex_lock(&cache_lock);
 
453
    do_item_update(item);
 
454
    pthread_mutex_unlock(&cache_lock);
 
455
}
 
456
 
 
457
/*
 
458
 * Adds an item to the deferred-delete list so it can be reaped later.
 
459
 */
 
460
char *mt_defer_delete(item *item, time_t exptime) {
 
461
    char *ret;
 
462
 
 
463
    pthread_mutex_lock(&cache_lock);
 
464
    ret = do_defer_delete(item, exptime);
 
465
    pthread_mutex_unlock(&cache_lock);
 
466
    return ret;
 
467
}
 
468
 
 
469
/*
 
470
 * Does arithmetic on a numeric item value.
 
471
 */
 
472
char *mt_add_delta(item *item, int incr, unsigned int delta, char *buf) {
 
473
    char *ret;
 
474
 
 
475
    pthread_mutex_lock(&cache_lock);
 
476
    ret = do_add_delta(item, incr, delta, buf);
 
477
    pthread_mutex_unlock(&cache_lock);
 
478
    return ret;
 
479
}
 
480
 
 
481
/*
 
482
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 
483
 */
 
484
int mt_store_item(item *item, int comm) {
 
485
    int ret;
 
486
 
 
487
    pthread_mutex_lock(&cache_lock);
 
488
    ret = do_store_item(item, comm);
 
489
    pthread_mutex_unlock(&cache_lock);
 
490
    return ret;
 
491
}
 
492
 
 
493
/*
 
494
 * Flushes expired items after a flush_all call
 
495
 */
 
496
void mt_item_flush_expired() {
 
497
    pthread_mutex_lock(&cache_lock);
 
498
    do_item_flush_expired();
 
499
    pthread_mutex_unlock(&cache_lock);
 
500
}
 
501
 
 
502
/****************************** HASHTABLE MODULE *****************************/
 
503
 
 
504
void mt_assoc_move_next_bucket() {
 
505
    pthread_mutex_lock(&cache_lock);
 
506
    do_assoc_move_next_bucket();
 
507
    pthread_mutex_unlock(&cache_lock);
 
508
}
 
509
 
 
510
/******************************* SLAB ALLOCATOR ******************************/
 
511
 
 
512
void *mt_slabs_alloc(size_t size) {
 
513
    void *ret;
 
514
 
 
515
    pthread_mutex_lock(&slabs_lock);
 
516
    ret = do_slabs_alloc(size);
 
517
    pthread_mutex_unlock(&slabs_lock);
 
518
    return ret;
 
519
}
 
520
 
 
521
void mt_slabs_free(void *ptr, size_t size) {
 
522
    pthread_mutex_lock(&slabs_lock);
 
523
    do_slabs_free(ptr, size);
 
524
    pthread_mutex_unlock(&slabs_lock);
 
525
}
 
526
 
 
527
char *mt_slabs_stats(int *buflen) {
 
528
    char *ret;
 
529
 
 
530
    pthread_mutex_lock(&slabs_lock);
 
531
    ret = do_slabs_stats(buflen);
 
532
    pthread_mutex_unlock(&slabs_lock);
 
533
    return ret;
 
534
}
 
535
 
 
536
#ifdef ALLOW_SLABS_REASSIGN
 
537
int mt_slabs_reassign(unsigned char srcid, unsigned char dstid) {
 
538
    int ret;
 
539
 
 
540
    pthread_mutex_lock(&slabs_lock);
 
541
    ret = do_slabs_reassign(srcid, dstid);
 
542
    pthread_mutex_unlock(&slabs_lock);
 
543
    return ret;
 
544
}
 
545
#endif
 
546
 
 
547
/******************************* GLOBAL STATS ******************************/
 
548
 
 
549
void mt_stats_lock() {
 
550
    pthread_mutex_lock(&stats_lock);
 
551
}
 
552
 
 
553
void mt_stats_unlock() {
 
554
    pthread_mutex_unlock(&stats_lock);
 
555
}
 
556
 
 
557
/*
 
558
 * Initializes the thread subsystem, creating various worker threads.
 
559
 *
 
560
 * nthreads  Number of event handler threads to spawn
 
561
 * main_base Event base for main thread
 
562
 */
 
563
void thread_init(int nthreads, struct event_base *main_base) {
 
564
    int         i;
 
565
    pthread_t   *thread;
 
566
 
 
567
    pthread_mutex_init(&cache_lock, NULL);
 
568
    pthread_mutex_init(&conn_lock, NULL);
 
569
    pthread_mutex_init(&slabs_lock, NULL);
 
570
    pthread_mutex_init(&stats_lock, NULL);
 
571
 
 
572
    pthread_mutex_init(&init_lock, NULL);
 
573
    pthread_cond_init(&init_cond, NULL);
 
574
 
 
575
    pthread_mutex_init(&cqi_freelist_lock, NULL);
 
576
    cqi_freelist = NULL;
 
577
 
 
578
    threads = malloc(sizeof(LIBEVENT_THREAD) * nthreads);
 
579
    if (! threads) {
 
580
        perror("Can't allocate thread descriptors");
 
581
        exit(1);
 
582
    }
 
583
 
 
584
    threads[0].base = main_base;
 
585
    threads[0].thread_id = pthread_self();
 
586
 
 
587
    for (i = 0; i < nthreads; i++) {
 
588
        int fds[2];
 
589
        if (pipe(fds)) {
 
590
            perror("Can't create notify pipe");
 
591
            exit(1);
 
592
        }
 
593
 
 
594
        threads[i].notify_receive_fd = fds[0];
 
595
        threads[i].notify_send_fd = fds[1];
 
596
 
 
597
    setup_thread(&threads[i]);
 
598
    }
 
599
 
 
600
    /* Create threads after we've done all the libevent setup. */
 
601
    for (i = 1; i < nthreads; i++) {
 
602
        create_worker(worker_libevent, &threads[i]);
 
603
    }
 
604
 
 
605
    /* Wait for all the threads to set themselves up before returning. */
 
606
    pthread_mutex_lock(&init_lock);
 
607
    init_count++; // main thread
 
608
    while (init_count < nthreads) {
 
609
        pthread_cond_wait(&init_cond, &init_lock);
 
610
    }
 
611
    pthread_mutex_unlock(&init_lock);
 
612
}
 
613
 
 
614
#endif