~ubuntu-branches/ubuntu/jaunty/memcached/jaunty

« back to all changes in this revision

Viewing changes to memcached.c

  • Committer: Bazaar Package Importer
  • Author(s): Jay Bonci
  • Date: 2007-05-02 11:35:42 UTC
  • mfrom: (1.1.2 upstream)
  • Revision ID: james.westby@ubuntu.com-20070502113542-qpoxsq28fjb7s9wc
Tags: 1.2.1-1
* New upstream release (Closes: #405054)
* Fix to logfile output so logrotate will work (Closes: #417941)
* Listen in on localhost by default (Closes: #383660)
* Default configuration suggests nobody by default (Closes: #391351)
* Bumped policy version to 3.7.2.2 (No other changes)

Show diffs side-by-side

added added

removed removed

Lines of Context:
13
13
 *      Anatoly Vorobey <mellon@pobox.com>
14
14
 *      Brad Fitzpatrick <brad@danga.com>
15
15
 *
16
 
 *  $Id: memcached.c,v 1.56 2005/04/05 00:10:26 bradfitz Exp $
 
16
 *  $Id: memcached.c 450 2006-11-26 21:33:47Z sgrimm $
17
17
 */
18
 
 
19
18
#include "config.h"
20
19
#include <sys/types.h>
21
20
#include <sys/stat.h>
22
21
#include <sys/time.h>
23
22
#include <sys/socket.h>
 
23
#include <sys/un.h>
24
24
#include <sys/signal.h>
25
25
#include <sys/resource.h>
 
26
#include <sys/uio.h>
 
27
 
26
28
/* some POSIX systems need the following definition
27
29
 * to get mlockall flags out of sys/mman.h.  */
28
30
#ifndef _P1003_1B_VISIBLE
29
31
#define _P1003_1B_VISIBLE
30
32
#endif
 
33
/* need this to get IOV_MAX on some platforms. */
 
34
#ifndef __need_IOV_MAX
 
35
#define __need_IOV_MAX
 
36
#endif
31
37
#include <pwd.h>
32
38
#include <sys/mman.h>
33
39
#include <fcntl.h>
42
48
#include <time.h>
43
49
#include <event.h>
44
50
#include <assert.h>
 
51
#include <limits.h>
45
52
 
46
53
#ifdef HAVE_MALLOC_H
 
54
/* OpenBSD has a malloc.h, but warns to use stdlib.h instead */
 
55
#ifndef __OpenBSD__
47
56
#include <malloc.h>
48
57
#endif
 
58
#endif
 
59
 
 
60
/* FreeBSD 4.x doesn't have IOV_MAX exposed. */
 
61
#ifndef IOV_MAX
 
62
#if defined(__FreeBSD__)
 
63
# define IOV_MAX 1024
 
64
#endif
 
65
#endif
49
66
 
50
67
#include "memcached.h"
51
68
 
55
72
static item **todelete = 0;
56
73
static int delcurr;
57
74
static int deltotal;
58
 
 
59
 
time_t realtime(time_t exptime) {
60
 
    time_t now;
61
 
 
 
75
static conn *listen_conn;
 
76
 
 
77
#define TRANSMIT_COMPLETE   0
 
78
#define TRANSMIT_INCOMPLETE 1
 
79
#define TRANSMIT_SOFT_ERROR 2
 
80
#define TRANSMIT_HARD_ERROR 3
 
81
 
 
82
int *buckets = 0; /* bucket->generation array for a managed instance */
 
83
 
 
84
#define REALTIME_MAXDELTA 60*60*24*30
 
85
rel_time_t realtime(time_t exptime) {
62
86
    /* no. of seconds in 30 days - largest possible delta exptime */
63
 
    #define REALTIME_MAXDELTA 60*60*24*30
64
87
 
65
88
    if (exptime == 0) return 0; /* 0 means never expire */
66
89
 
67
90
    if (exptime > REALTIME_MAXDELTA)
68
 
        return exptime;
 
91
        return (rel_time_t) (exptime - stats.started);
69
92
    else {
70
 
        now = time(0);
71
 
        return exptime + now;
 
93
        return (rel_time_t) (exptime + current_time);
72
94
    }
73
95
}
74
96
 
76
98
    stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
77
99
    stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = 0;
78
100
    stats.curr_bytes = stats.bytes_read = stats.bytes_written = 0;
79
 
    stats.started = time(0);
 
101
 
 
102
    /* make the time we started always be 2 seconds before we really
 
103
       did, so time(0) - time.started is never zero.  if so, things
 
104
       like 'settings.oldest_live' which act as booleans as well as
 
105
       values are now false in boolean context... */
 
106
    stats.started = time(0) - 2;
80
107
}
81
108
 
82
109
void stats_reset(void) {
87
114
 
88
115
void settings_init(void) {
89
116
    settings.port = 11211;
 
117
    settings.udpport = 0;
90
118
    settings.interface.s_addr = htonl(INADDR_ANY);
91
119
    settings.maxbytes = 64*1024*1024; /* default is 64MB */
92
120
    settings.maxconns = 1024;         /* to limit connections-related memory to about 5MB */
93
121
    settings.verbose = 0;
94
122
    settings.oldest_live = 0;
95
123
    settings.evict_to_free = 1;       /* push old items out of cache when memory runs out */
 
124
    settings.socketpath = NULL;       /* by default, not using a unix socket */
 
125
    settings.managed = 0;
 
126
    settings.factor = 1.25;
 
127
    settings.chunk_size = 48;         /* space for a modest key and value */
 
128
}
 
129
 
 
130
/* returns true if a deleted item's delete-locked-time is over, and it
 
131
   should be removed from the namespace */
 
132
int item_delete_lock_over (item *it) {
 
133
    assert(it->it_flags & ITEM_DELETED);
 
134
    return (current_time >= it->exptime);
 
135
}
 
136
 
 
137
/* wrapper around assoc_find which does the lazy expiration/deletion logic */
 
138
item *get_item_notedeleted(char *key, size_t nkey, int *delete_locked) {
 
139
    item *it = assoc_find(key, nkey);
 
140
    if (delete_locked) *delete_locked = 0;
 
141
    if (it && (it->it_flags & ITEM_DELETED)) {
 
142
        /* it's flagged as delete-locked.  let's see if that condition
 
143
           is past due, and the 5-second delete_timer just hasn't
 
144
           gotten to it yet... */
 
145
        if (! item_delete_lock_over(it)) {
 
146
            if (delete_locked) *delete_locked = 1;
 
147
            it = 0;
 
148
        }
 
149
    }
 
150
    if (it && settings.oldest_live && settings.oldest_live <= current_time &&
 
151
        it->time <= settings.oldest_live) {
 
152
        item_unlink(it);
 
153
        it = 0;
 
154
    }
 
155
    if (it && it->exptime && it->exptime <= current_time) {
 
156
        item_unlink(it);
 
157
        it = 0;
 
158
    }
 
159
    return it;
 
160
}
 
161
 
 
162
item *get_item(char *key, size_t nkey) {
 
163
    return get_item_notedeleted(key, nkey, 0);
 
164
}
 
165
 
 
166
/*
 
167
 * Adds a message header to a connection.
 
168
 *
 
169
 * Returns 0 on success, -1 on out-of-memory.
 
170
 */
 
171
int add_msghdr(conn *c)
 
172
{
 
173
    struct msghdr *msg;
 
174
 
 
175
    if (c->msgsize == c->msgused) {
 
176
        msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
 
177
        if (! msg)
 
178
            return -1;
 
179
        c->msglist = msg;
 
180
        c->msgsize *= 2;
 
181
    }
 
182
 
 
183
    msg = c->msglist + c->msgused;
 
184
 
 
185
    /* this wipes msg_iovlen, msg_control, msg_controllen, and
 
186
       msg_flags, the last 3 of which aren't defined on solaris: */
 
187
    memset(msg, 0, sizeof(struct msghdr));
 
188
 
 
189
    msg->msg_iov = &c->iov[c->iovused];
 
190
    msg->msg_name = &c->request_addr;
 
191
    msg->msg_namelen = c->request_addr_size;
 
192
 
 
193
    c->msgbytes = 0;
 
194
    c->msgused++;
 
195
 
 
196
    if (c->udp) {
 
197
        /* Leave room for the UDP header, which we'll fill in later. */
 
198
        return add_iov(c, NULL, UDP_HEADER_SIZE);
 
199
    }
 
200
 
 
201
    return 0;
96
202
}
97
203
 
98
204
conn **freeconns;
99
205
int freetotal;
100
206
int freecurr;
101
207
 
102
 
void set_cork (conn *c, int val) {
103
 
    if (c->is_corked == val) return;
104
 
    c->is_corked = val;
105
 
#ifdef TCP_NOPUSH
106
 
    setsockopt(c->sfd, IPPROTO_TCP, TCP_NOPUSH, &val, sizeof(val));
107
 
#endif
108
 
}
109
 
 
110
208
void conn_init(void) {
111
209
    freetotal = 200;
112
210
    freecurr = 0;
114
212
    return;
115
213
}
116
214
 
117
 
conn *conn_new(int sfd, int init_state, int event_flags) {
 
215
conn *conn_new(int sfd, int init_state, int event_flags, int read_buffer_size,
 
216
                int is_udp) {
118
217
    conn *c;
119
218
 
120
219
    /* do we have a free conn structure from a previous close? */
127
226
        }
128
227
        c->rbuf = c->wbuf = 0;
129
228
        c->ilist = 0;
130
 
 
131
 
        c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);
132
 
        c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);
133
 
        c->ilist = (item **) malloc(sizeof(item *)*200);
134
 
 
135
 
        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
 
229
        c->iov = 0;
 
230
        c->msglist = 0;
 
231
        c->hdrbuf = 0;
 
232
 
 
233
        c->rsize = read_buffer_size;
 
234
        c->wsize = DATA_BUFFER_SIZE;
 
235
        c->isize = ITEM_LIST_INITIAL;
 
236
        c->iovsize = IOV_LIST_INITIAL;
 
237
        c->msgsize = MSG_LIST_INITIAL;
 
238
        c->hdrsize = 0;
 
239
 
 
240
        c->rbuf = (char *) malloc(c->rsize);
 
241
        c->wbuf = (char *) malloc(c->wsize);
 
242
        c->ilist = (item **) malloc(sizeof(item *) * c->isize);
 
243
        c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
 
244
        c->msglist = (struct msghdr *) malloc(sizeof(struct msghdr) * c->msgsize);
 
245
 
 
246
        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
 
247
                c->msglist == 0) {
136
248
            if (c->rbuf != 0) free(c->rbuf);
137
249
            if (c->wbuf != 0) free(c->wbuf);
138
250
            if (c->ilist !=0) free(c->ilist);
 
251
            if (c->iov != 0) free(c->iov);
 
252
            if (c->msglist != 0) free(c->msglist);
139
253
            free(c);
140
254
            perror("malloc()");
141
255
            return 0;
142
256
        }
143
 
        c->rsize = c->wsize = DATA_BUFFER_SIZE;
144
 
        c->isize = 200;
 
257
 
145
258
        stats.conn_structs++;
146
259
    }
147
260
 
148
261
    if (settings.verbose > 1) {
149
262
        if (init_state == conn_listening)
150
263
            fprintf(stderr, "<%d server listening\n", sfd);
 
264
        else if (is_udp)
 
265
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
151
266
        else
152
267
            fprintf(stderr, "<%d new client connection\n", sfd);
153
268
    }
154
269
 
155
270
    c->sfd = sfd;
 
271
    c->udp = is_udp;
156
272
    c->state = init_state;
157
273
    c->rlbytes = 0;
158
274
    c->rbytes = c->wbytes = 0;
159
275
    c->wcurr = c->wbuf;
160
276
    c->rcurr = c->rbuf;
161
 
    c->icurr = c->ilist; 
 
277
    c->ritem = 0;
 
278
    c->icurr = c->ilist;
162
279
    c->ileft = 0;
163
 
    c->iptr = c->ibuf;
164
 
    c->ibytes = 0;
 
280
    c->iovused = 0;
 
281
    c->msgcurr = 0;
 
282
    c->msgused = 0;
165
283
 
166
284
    c->write_and_go = conn_read;
167
285
    c->write_and_free = 0;
168
286
    c->item = 0;
169
 
 
170
 
    c->is_corked = 0;
 
287
    c->bucket = -1;
 
288
    c->gen = 0;
171
289
 
172
290
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
173
291
    c->ev_flags = event_flags;
176
294
        if (freecurr < freetotal) {
177
295
            freeconns[freecurr++] = c;
178
296
        } else {
 
297
            if (c->hdrbuf)
 
298
                free (c->hdrbuf);
 
299
            free (c->msglist);
179
300
            free (c->rbuf);
180
301
            free (c->wbuf);
181
302
            free (c->ilist);
 
303
            free (c->iov);
182
304
            free (c);
183
305
        }
184
306
        return 0;
190
312
    return c;
191
313
}
192
314
 
193
 
void conn_close(conn *c) {
194
 
    /* delete the event, the socket and the conn */
195
 
    event_del(&c->event);
196
 
 
197
 
    if (settings.verbose > 1)
198
 
        fprintf(stderr, "<%d connection closed.\n", c->sfd);
199
 
 
200
 
    close(c->sfd);
201
 
 
 
315
void conn_cleanup(conn *c) {
202
316
    if (c->item) {
203
317
        item_free(c->item);
 
318
        c->item = 0;
204
319
    }
205
320
 
206
321
    if (c->ileft) {
211
326
 
212
327
    if (c->write_and_free) {
213
328
        free(c->write_and_free);
214
 
    }
215
 
 
216
 
    /* if we have enough space in the free connections array, put the structure there */
217
 
    if (freecurr < freetotal) {
 
329
        c->write_and_free = 0;
 
330
    }
 
331
}
 
332
 
 
333
/*
 
334
 * Frees a connection.
 
335
 */
 
336
static void conn_free(conn *c) {
 
337
    if (c) {
 
338
        if (c->hdrbuf)
 
339
            free(c->hdrbuf);
 
340
        if (c->msglist)
 
341
            free(c->msglist);
 
342
        if (c->rbuf)
 
343
            free(c->rbuf);
 
344
        if (c->wbuf)
 
345
            free(c->wbuf);
 
346
        if (c->ilist)
 
347
            free(c->ilist);
 
348
        if (c->iov)
 
349
            free(c->iov);
 
350
        free(c);
 
351
    }
 
352
}
 
353
 
 
354
void conn_close(conn *c) {
 
355
    /* delete the event, the socket and the conn */
 
356
    event_del(&c->event);
 
357
 
 
358
    if (settings.verbose > 1)
 
359
        fprintf(stderr, "<%d connection closed.\n", c->sfd);
 
360
 
 
361
    close(c->sfd);
 
362
    accept_new_conns(1);
 
363
    conn_cleanup(c);
 
364
 
 
365
    /* if the connection has big buffers, just free it */
 
366
    if (c->rsize > READ_BUFFER_HIGHWAT) {
 
367
        conn_free(c);
 
368
    } else if (freecurr < freetotal) {
 
369
        /* if we have enough space in the free connections array, put the structure there */
218
370
        freeconns[freecurr++] = c;
219
371
    } else {
220
372
        /* try to enlarge free connections array */
224
376
            freeconns = new_freeconns;
225
377
            freeconns[freecurr++] = c;
226
378
        } else {
227
 
            free(c->rbuf);
228
 
            free(c->wbuf);
229
 
            free(c->ilist);
230
 
            free(c);
 
379
            conn_free(c);
231
380
        }
232
381
    }
233
382
 
236
385
    return;
237
386
}
238
387
 
 
388
/*
 
389
 * Reallocates memory and updates a buffer size if successful.
 
390
 */
 
391
int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
 
392
    void *newbuf = realloc(*orig, newsize * bytes_per_item);
 
393
    if (newbuf) {
 
394
        *orig = newbuf;
 
395
        *size = newsize;
 
396
        return 1;
 
397
    }
 
398
    return 0;
 
399
}
 
400
 
 
401
/*
 
402
 * Shrinks a connection's buffers if they're too big.  This prevents
 
403
 * periodic large "get" requests from permanently chewing lots of server
 
404
 * memory.
 
405
 *
 
406
 * This should only be called in between requests since it can wipe output
 
407
 * buffers!
 
408
 */
 
409
void conn_shrink(conn *c) {
 
410
    if (c->udp)
 
411
        return;
 
412
 
 
413
    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
 
414
        if (c->rcurr != c->rbuf)
 
415
            memmove(c->rbuf, c->rcurr, c->rbytes);
 
416
        do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
 
417
        c->rcurr = c->rbuf;
 
418
    }
 
419
 
 
420
    if (c->isize > ITEM_LIST_HIGHWAT) {
 
421
        do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
 
422
    }
 
423
 
 
424
    if (c->msgsize > MSG_LIST_HIGHWAT) {
 
425
        do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
 
426
    }
 
427
 
 
428
    if (c->iovsize > IOV_LIST_HIGHWAT) {
 
429
        do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
 
430
    }
 
431
}
 
432
 
 
433
/*
 
434
 * Sets a connection's current state in the state machine. Any special
 
435
 * processing that needs to happen on certain state transitions can
 
436
 * happen here.
 
437
 */
 
438
void conn_set_state(conn *c, int state) {
 
439
    if (state != c->state) {
 
440
        if (state == conn_read) {
 
441
            conn_shrink(c);
 
442
            assoc_move_next_bucket();
 
443
        }
 
444
        c->state = state;
 
445
    }
 
446
}
 
447
 
 
448
 
 
449
/*
 
450
 * Ensures that there is room for another struct iovec in a connection's
 
451
 * iov list.
 
452
 *
 
453
 * Returns 0 on success, -1 on out-of-memory.
 
454
 */
 
455
int ensure_iov_space(conn *c) {
 
456
    if (c->iovused >= c->iovsize) {
 
457
        int i, iovnum;
 
458
        struct iovec *new_iov = (struct iovec *) realloc(c->iov,
 
459
                                (c->iovsize * 2) * sizeof(struct iovec));
 
460
        if (! new_iov)
 
461
            return -1;
 
462
        c->iov = new_iov;
 
463
        c->iovsize *= 2;
 
464
 
 
465
        /* Point all the msghdr structures at the new list. */
 
466
        for (i = 0, iovnum = 0; i < c->msgused; i++) {
 
467
            c->msglist[i].msg_iov = &c->iov[iovnum];
 
468
            iovnum += c->msglist[i].msg_iovlen;
 
469
        }
 
470
    }
 
471
 
 
472
    return 0;
 
473
}
 
474
 
 
475
 
 
476
/*
 
477
 * Adds data to the list of pending data that will be written out to a
 
478
 * connection.
 
479
 *
 
480
 * Returns 0 on success, -1 on out-of-memory.
 
481
 */
 
482
 
 
483
int add_iov(conn *c, const void *buf, int len) {
 
484
    struct msghdr *m;
 
485
    int leftover;
 
486
    int limit_to_mtu;
 
487
 
 
488
    do {
 
489
        m = &c->msglist[c->msgused - 1];
 
490
 
 
491
        /*
 
492
         * Limit UDP packets, and the first payloads of TCP replies, to
 
493
         * UDP_MAX_PAYLOAD_SIZE bytes.
 
494
         */
 
495
        limit_to_mtu = c->udp || (1 == c->msgused);
 
496
 
 
497
        /* We may need to start a new msghdr if this one is full. */
 
498
        if (m->msg_iovlen == IOV_MAX ||
 
499
            (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
 
500
            add_msghdr(c);
 
501
            m = &c->msglist[c->msgused - 1];
 
502
        }
 
503
 
 
504
        if (ensure_iov_space(c))
 
505
            return -1;
 
506
 
 
507
        /* If the fragment is too big to fit in the datagram, split it up */
 
508
        if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
 
509
            leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
 
510
            len -= leftover;
 
511
        } else {
 
512
            leftover = 0;
 
513
        }
 
514
 
 
515
        m = &c->msglist[c->msgused - 1];
 
516
        m->msg_iov[m->msg_iovlen].iov_base = (void*) buf;
 
517
        m->msg_iov[m->msg_iovlen].iov_len = len;
 
518
 
 
519
        c->msgbytes += len;
 
520
        c->iovused++;
 
521
        m->msg_iovlen++;
 
522
 
 
523
        buf = ((char *)buf) + len;
 
524
        len = leftover;
 
525
    } while (leftover > 0);
 
526
 
 
527
    return 0;
 
528
}
 
529
 
 
530
 
 
531
/*
 
532
 * Constructs a set of UDP headers and attaches them to the outgoing messages.
 
533
 */
 
534
int build_udp_headers(conn *c) {
 
535
    int i;
 
536
    unsigned char *hdr;
 
537
 
 
538
    if (c->msgused > c->hdrsize) {
 
539
        void *new_hdrbuf;
 
540
        if (c->hdrbuf)
 
541
            new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
 
542
        else
 
543
            new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
 
544
        if (! new_hdrbuf)
 
545
            return -1;
 
546
        c->hdrbuf = (unsigned char *) new_hdrbuf;
 
547
        c->hdrsize = c->msgused * 2;
 
548
    }
 
549
 
 
550
    hdr = c->hdrbuf;
 
551
    for (i = 0; i < c->msgused; i++) {
 
552
        c->msglist[i].msg_iov[0].iov_base = hdr;
 
553
        c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
 
554
        *hdr++ = c->request_id / 256;
 
555
        *hdr++ = c->request_id % 256;
 
556
        *hdr++ = i / 256;
 
557
        *hdr++ = i % 256;
 
558
        *hdr++ = c->msgused / 256;
 
559
        *hdr++ = c->msgused % 256;
 
560
        *hdr++ = 0;
 
561
        *hdr++ = 0;
 
562
        assert((void*) hdr == (void*) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
 
563
    }
 
564
 
 
565
    return 0;
 
566
}
 
567
 
 
568
 
239
569
void out_string(conn *c, char *str) {
240
570
    int len;
241
571
 
250
580
    }
251
581
 
252
582
    strcpy(c->wbuf, str);
253
 
    strcat(c->wbuf, "\r\n");
 
583
    strcpy(c->wbuf + len, "\r\n");
254
584
    c->wbytes = len + 2;
255
585
    c->wcurr = c->wbuf;
256
586
 
257
 
    c->state = conn_write;
 
587
    conn_set_state(c, conn_write);
258
588
    c->write_and_go = conn_read;
259
589
    return;
260
590
}
261
591
 
262
 
/* 
 
592
/*
263
593
 * we get here after reading the value in set/add/replace commands. The command
264
594
 * has been stored in c->item_comm, and the item is ready in c->item.
265
595
 */
268
598
    item *it = c->item;
269
599
    int comm = c->item_comm;
270
600
    item *old_it;
271
 
    time_t now = time(0);
 
601
    int delete_locked = 0;
 
602
    char *key = ITEM_key(it);
272
603
 
273
604
    stats.set_cmds++;
274
605
 
275
 
    while(1) {
276
 
        if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
277
 
            out_string(c, "CLIENT_ERROR bad data chunk");
278
 
            break;
279
 
        }
280
 
 
281
 
        old_it = assoc_find(ITEM_key(it));
282
 
 
283
 
        if (old_it && settings.oldest_live &&
284
 
            old_it->time <= settings.oldest_live) {
285
 
            item_unlink(old_it);
286
 
            old_it = 0;
287
 
        }
288
 
 
289
 
        if (old_it && old_it->exptime && old_it->exptime < now) {
290
 
            item_unlink(old_it);
291
 
            old_it = 0;
292
 
        }
293
 
 
294
 
        if (old_it && comm==NREAD_ADD) {
295
 
            item_update(old_it);
296
 
            out_string(c, "NOT_STORED");
297
 
            break;
298
 
        }
299
 
        
300
 
        if (!old_it && comm == NREAD_REPLACE) {
301
 
            out_string(c, "NOT_STORED");
302
 
            break;
303
 
        }
304
 
 
305
 
        if (old_it && (old_it->it_flags & ITEM_DELETED) && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
306
 
            out_string(c, "NOT_STORED");
307
 
            break;
308
 
        }
309
 
        
310
 
        if (old_it) {
311
 
            item_replace(old_it, it);
312
 
        } else item_link(it);
313
 
        
314
 
        c->item = 0;
315
 
        out_string(c, "STORED");
 
606
    if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
 
607
        out_string(c, "CLIENT_ERROR bad data chunk");
 
608
        goto err;
 
609
    }
 
610
 
 
611
    old_it = get_item_notedeleted(key, it->nkey, &delete_locked);
 
612
 
 
613
    if (old_it && comm == NREAD_ADD) {
 
614
        item_update(old_it);  /* touches item, promotes to head of LRU */
 
615
        out_string(c, "NOT_STORED");
 
616
        goto err;
 
617
    }
 
618
 
 
619
    if (!old_it && comm == NREAD_REPLACE) {
 
620
        out_string(c, "NOT_STORED");
 
621
        goto err;
 
622
    }
 
623
 
 
624
    if (delete_locked) {
 
625
        if (comm == NREAD_REPLACE || comm == NREAD_ADD) {
 
626
            out_string(c, "NOT_STORED");
 
627
            goto err;
 
628
        }
 
629
 
 
630
        /* but "set" commands can override the delete lock
 
631
         window... in which case we have to find the old hidden item
 
632
         that's in the namespace/LRU but wasn't returned by
 
633
         get_item.... because we need to replace it (below) */
 
634
        old_it = assoc_find(key, it->nkey);
 
635
    }
 
636
 
 
637
    if (old_it)
 
638
        item_replace(old_it, it);
 
639
    else
 
640
        item_link(it);
 
641
 
 
642
    c->item = 0;
 
643
    out_string(c, "STORED");
 
644
    return;
 
645
 
 
646
err:
 
647
     item_free(it);
 
648
     c->item = 0;
 
649
     return;
 
650
}
 
651
 
 
652
typedef struct token_s {
 
653
    char* value;
 
654
    size_t length;
 
655
} token_t;
 
656
 
 
657
#define COMMAND_TOKEN 0
 
658
#define SUBCOMMAND_TOKEN 1
 
659
#define KEY_TOKEN 1
 
660
#define KEY_MAX_LENGTH 250
 
661
 
 
662
#define MAX_TOKENS 6 
 
663
 
 
664
/*
 
665
 * Tokenize the command string by replacing whitespace with '\0' and update
 
666
 * the token array tokens with pointer to start of each token and length.
 
667
 * Returns total number of tokens.  The last valid token is the terminal
 
668
 * token (value points to the first unprocessed character of the string and
 
669
 * length zero).  
 
670
 *
 
671
 * Usage example:
 
672
 *
 
673
 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
 
674
 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
 
675
 *          ...
 
676
 *      }
 
677
 *      ncommand = tokens[ix].value - command;
 
678
 *      command  = tokens[ix].value;
 
679
 *   }
 
680
 */
 
681
size_t tokenize_command(char* command, token_t* tokens, size_t max_tokens)  {
 
682
    char* cp;
 
683
    char* value = NULL;
 
684
    size_t length = 0;
 
685
    size_t ntokens = 0;
 
686
 
 
687
    assert(command != NULL && tokens != NULL && max_tokens > 1); 
 
688
 
 
689
    cp = command;
 
690
    while(*cp != '\0' && ntokens < max_tokens - 1) {
 
691
        if(*cp == ' ') {
 
692
            // If we've accumulated a token, this is the end of it. 
 
693
            if(length > 0) {
 
694
                tokens[ntokens].value = value;
 
695
                tokens[ntokens].length = length;
 
696
                ntokens++;
 
697
                length = 0;
 
698
                value = NULL;
 
699
            }
 
700
            *cp = '\0';
 
701
        } else {
 
702
            if(length == 0) {
 
703
                value = cp;
 
704
            }
 
705
            length++;
 
706
        }
 
707
        cp++;
 
708
    }
 
709
 
 
710
    if(ntokens < max_tokens - 1 && length > 0) {
 
711
        tokens[ntokens].value = value;
 
712
        tokens[ntokens].length = length;
 
713
        ntokens++;
 
714
    }
 
715
 
 
716
    /*
 
717
     * If we scanned the whole string, the terminal value pointer is null,
 
718
     * otherwise it is the first unprocessed character.
 
719
     */
 
720
    tokens[ntokens].value =  *cp == '\0' ? NULL : cp;
 
721
    tokens[ntokens].length = 0;
 
722
    ntokens++;
 
723
 
 
724
    return ntokens;
 
725
}
 
726
 
 
727
void process_stat(conn *c, token_t* tokens, size_t ntokens) {
 
728
    rel_time_t now = current_time;
 
729
    char* command;
 
730
    char* subcommand;
 
731
 
 
732
    if(ntokens < 2) {
 
733
        out_string(c, "CLIENT_ERROR bad command line");
316
734
        return;
317
735
    }
318
 
            
319
 
    item_free(it); 
320
 
    c->item = 0; 
321
 
    return;
322
 
}
323
 
 
324
 
void process_stat(conn *c, char *command) {
325
 
    time_t now = time(0);
326
 
 
327
 
    if (strcmp(command, "stats") == 0) {
 
736
 
 
737
    command = tokens[COMMAND_TOKEN].value;
 
738
 
 
739
    if (ntokens == 2 && strcmp(command, "stats") == 0) {
328
740
        char temp[1024];
329
741
        pid_t pid = getpid();
330
742
        char *pos = temp;
331
743
        struct rusage usage;
332
 
        
 
744
 
333
745
        getrusage(RUSAGE_SELF, &usage);
334
746
 
335
747
        pos += sprintf(pos, "STAT pid %u\r\n", pid);
336
 
        pos += sprintf(pos, "STAT uptime %lu\r\n", now - stats.started);
337
 
        pos += sprintf(pos, "STAT time %ld\r\n", now);
 
748
        pos += sprintf(pos, "STAT uptime %u\r\n", now);
 
749
        pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
338
750
        pos += sprintf(pos, "STAT version " VERSION "\r\n");
 
751
        pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void*));
339
752
        pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
340
753
        pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
341
754
        pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
344
757
        pos += sprintf(pos, "STAT curr_connections %u\r\n", stats.curr_conns - 1); /* ignore listening conn */
345
758
        pos += sprintf(pos, "STAT total_connections %u\r\n", stats.total_conns);
346
759
        pos += sprintf(pos, "STAT connection_structures %u\r\n", stats.conn_structs);
347
 
        pos += sprintf(pos, "STAT cmd_get %u\r\n", stats.get_cmds);
348
 
        pos += sprintf(pos, "STAT cmd_set %u\r\n", stats.set_cmds);
349
 
        pos += sprintf(pos, "STAT get_hits %u\r\n", stats.get_hits);
350
 
        pos += sprintf(pos, "STAT get_misses %u\r\n", stats.get_misses);
 
760
        pos += sprintf(pos, "STAT cmd_get %llu\r\n", stats.get_cmds);
 
761
        pos += sprintf(pos, "STAT cmd_set %llu\r\n", stats.set_cmds);
 
762
        pos += sprintf(pos, "STAT get_hits %llu\r\n", stats.get_hits);
 
763
        pos += sprintf(pos, "STAT get_misses %llu\r\n", stats.get_misses);
351
764
        pos += sprintf(pos, "STAT bytes_read %llu\r\n", stats.bytes_read);
352
765
        pos += sprintf(pos, "STAT bytes_written %llu\r\n", stats.bytes_written);
353
 
        pos += sprintf(pos, "STAT limit_maxbytes %u\r\n", settings.maxbytes);
 
766
        pos += sprintf(pos, "STAT limit_maxbytes %llu\r\n", (unsigned long long) settings.maxbytes);
354
767
        pos += sprintf(pos, "END");
355
768
        out_string(c, temp);
356
769
        return;
357
770
    }
358
771
 
359
 
    if (strcmp(command, "stats reset") == 0) {
 
772
    subcommand = tokens[SUBCOMMAND_TOKEN].value;
 
773
 
 
774
    if (strcmp(subcommand, "reset") == 0) {
360
775
        stats_reset();
361
776
        out_string(c, "RESET");
362
777
        return;
364
779
 
365
780
#ifdef HAVE_MALLOC_H
366
781
#ifdef HAVE_STRUCT_MALLINFO
367
 
    if (strcmp(command, "stats malloc") == 0) {
 
782
    if (strcmp(subcommand, "malloc") == 0) {
368
783
        char temp[512];
369
784
        struct mallinfo info;
370
785
        char *pos = temp;
386
801
#endif /* HAVE_STRUCT_MALLINFO */
387
802
#endif /* HAVE_MALLOC_H */
388
803
 
389
 
    if (strcmp(command, "stats maps") == 0) {
 
804
    if (strcmp(subcommand, "maps") == 0) {
390
805
        char *wbuf;
391
806
        int wsize = 8192; /* should be enough */
392
807
        int fd;
419
834
        strcpy(wbuf + res, "END\r\n");
420
835
        c->write_and_free=wbuf;
421
836
        c->wcurr=wbuf;
422
 
        c->wbytes = res + 6;
423
 
        c->state = conn_write;
 
837
        c->wbytes = res + 5; // Don't write the terminal '\0' 
 
838
        conn_set_state(c, conn_write);
424
839
        c->write_and_go = conn_read;
425
840
        close(fd);
426
841
        return;
427
842
    }
428
843
 
429
 
    if (strncmp(command, "stats cachedump", 15) == 0) {
 
844
    if (strcmp(subcommand, "cachedump") == 0) {
 
845
 
430
846
        char *buf;
431
847
        unsigned int bytes, id, limit = 0;
432
 
        char *start = command + 15;
433
 
        if (sscanf(start, "%u %u\r\n", &id, &limit) < 1) {
 
848
 
 
849
        if(ntokens < 5) {
434
850
            out_string(c, "CLIENT_ERROR bad command line");
435
851
            return;
436
852
        }
437
853
 
 
854
        id = strtoul(tokens[2].value, NULL, 10);
 
855
        limit = strtoul(tokens[3].value, NULL, 10);
 
856
 
 
857
        if(errno == ERANGE) {
 
858
            out_string(c, "CLIENT_ERROR bad command line format");
 
859
            return;
 
860
        }
 
861
 
438
862
        buf = item_cachedump(id, limit, &bytes);
439
863
        if (buf == 0) {
440
864
            out_string(c, "SERVER_ERROR out of memory");
444
868
        c->write_and_free = buf;
445
869
        c->wcurr = buf;
446
870
        c->wbytes = bytes;
447
 
        c->state = conn_write;
 
871
        conn_set_state(c, conn_write);
448
872
        c->write_and_go = conn_read;
449
873
        return;
450
874
    }
451
875
 
452
 
    if (strcmp(command, "stats slabs")==0) {
 
876
    if (strcmp(subcommand, "slabs")==0) {
453
877
        int bytes = 0;
454
878
        char *buf = slabs_stats(&bytes);
455
879
        if (!buf) {
459
883
        c->write_and_free = buf;
460
884
        c->wcurr = buf;
461
885
        c->wbytes = bytes;
462
 
        c->state = conn_write;
 
886
        conn_set_state(c, conn_write);
463
887
        c->write_and_go = conn_read;
464
888
        return;
465
889
    }
466
890
 
467
 
    if (strcmp(command, "stats items")==0) {
 
891
    if (strcmp(subcommand, "items")==0) {
468
892
        char buffer[4096];
469
893
        item_stats(buffer, 4096);
470
894
        out_string(c, buffer);
471
895
        return;
472
896
    }
473
897
 
474
 
    if (strcmp(command, "stats sizes")==0) {
 
898
    if (strcmp(subcommand, "sizes")==0) {
475
899
        int bytes = 0;
476
900
        char *buf = item_stats_sizes(&bytes);
477
901
        if (! buf) {
482
906
        c->write_and_free = buf;
483
907
        c->wcurr = buf;
484
908
        c->wbytes = bytes;
485
 
        c->state = conn_write;
 
909
        conn_set_state(c, conn_write);
486
910
        c->write_and_go = conn_read;
487
911
        return;
488
912
    }
490
914
    out_string(c, "ERROR");
491
915
}
492
916
 
493
 
void process_command(conn *c, char *command) {
494
 
    
495
 
    int comm = 0;
496
 
    int incr = 0;
497
 
 
498
 
    /* 
499
 
     * for commands set/add/replace, we build an item and read the data
500
 
     * directly into it, then continue in nread_complete().
501
 
     */ 
502
 
 
503
 
    if (settings.verbose > 1)
504
 
        fprintf(stderr, "<%d %s\n", c->sfd, command);
505
 
 
506
 
    /* All incoming commands will require a response, so we cork at the beginning,
507
 
       and uncork at the very end (usually by means of out_string)  */
508
 
    set_cork(c, 1);
509
 
 
510
 
    if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) || 
511
 
        (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
512
 
        (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
513
 
 
514
 
        char key[251];
515
 
        int flags;
516
 
        time_t expire;
517
 
        int len, res;
518
 
        item *it;
519
 
 
520
 
        res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len);
521
 
        if (res!=4 || strlen(key)==0 ) {
522
 
            out_string(c, "CLIENT_ERROR bad command line format");
523
 
            return;
524
 
        }
525
 
        expire = realtime(expire);
526
 
        it = item_alloc(key, flags, expire, len+2);
527
 
        if (it == 0) {
528
 
            out_string(c, "SERVER_ERROR out of memory");
529
 
            /* swallow the data line */
530
 
            c->write_and_go = conn_swallow;
531
 
            c->sbytes = len+2;
532
 
            return;
533
 
        }
534
 
 
535
 
        c->item_comm = comm;
536
 
        c->item = it;
537
 
        c->rcurr = ITEM_data(it);
538
 
        c->rlbytes = it->nbytes;
539
 
        c->state = conn_nread;
540
 
        return;
 
917
inline void process_get_command(conn *c, token_t* tokens, size_t ntokens) {
 
918
    char *key;
 
919
    size_t nkey;
 
920
    int i = 0;
 
921
    item *it;
 
922
    token_t* key_token = &tokens[KEY_TOKEN];
 
923
 
 
924
    if (settings.managed) {
 
925
        int bucket = c->bucket;
 
926
        if (bucket == -1) {
 
927
            out_string(c, "CLIENT_ERROR no BG data in managed mode");
 
928
            return;
 
929
        }
 
930
        c->bucket = -1;
 
931
        if (buckets[bucket] != c->gen) {
 
932
            out_string(c, "ERROR_NOT_OWNER");
 
933
            return;
 
934
        }
541
935
    }
542
936
 
543
 
    if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) ||
544
 
        (strncmp(command, "decr ", 5) == 0)) {
545
 
        char temp[32];
546
 
        unsigned int value;
547
 
        item *it;
548
 
        unsigned int delta;
549
 
        char key[251];
550
 
        int res;
551
 
        char *ptr;
552
 
        time_t now = time(0);
553
 
 
554
 
        res = sscanf(command, "%*s %250s %u\n", key, &delta);
555
 
        if (res!=2 || strlen(key)==0 ) {
556
 
            out_string(c, "CLIENT_ERROR bad command line format");
557
 
            return;
558
 
        }
559
 
        
560
 
        it = assoc_find(key);
561
 
        if (it && (it->it_flags & ITEM_DELETED)) {
562
 
            it = 0;
563
 
        }
564
 
        if (it && it->exptime && it->exptime < now) {
565
 
            item_unlink(it);
566
 
            it = 0;
567
 
        }
568
 
 
569
 
        if (!it) {
570
 
            out_string(c, "NOT_FOUND");
571
 
            return;
572
 
        }
573
 
 
574
 
        ptr = ITEM_data(it);
575
 
        while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
576
 
        
577
 
        value = atoi(ptr);
578
 
 
579
 
        if (incr)
580
 
            value+=delta;
581
 
        else {
582
 
            if (delta >= value) value = 0;
583
 
            else value-=delta;
584
 
        }
585
 
 
586
 
        sprintf(temp, "%u", value);
587
 
        res = strlen(temp);
588
 
        if (res + 2 > it->nbytes) { /* need to realloc */
589
 
            item *new_it;
590
 
            new_it = item_alloc(ITEM_key(it), it->flags, it->exptime, res + 2 );
591
 
            if (new_it == 0) {
592
 
                out_string(c, "SERVER_ERROR out of memory");
 
937
    do {
 
938
        while(key_token->length != 0) {
 
939
            
 
940
            key = key_token->value;
 
941
            nkey = key_token->length;
 
942
            
 
943
            if(nkey > KEY_MAX_LENGTH) {
 
944
                out_string(c, "CLIENT_ERROR bad command line format");
593
945
                return;
594
946
            }
595
 
            memcpy(ITEM_data(new_it), temp, res);
596
 
            memcpy(ITEM_data(new_it) + res, "\r\n", 2);
597
 
            item_replace(it, new_it);
598
 
        } else { /* replace in-place */
599
 
            memcpy(ITEM_data(it), temp, res);
600
 
            memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
601
 
        }
602
 
        out_string(c, temp);
603
 
        return;
604
 
    }
605
 
        
606
 
    if (strncmp(command, "get ", 4) == 0) {
607
 
 
608
 
        char *start = command + 4;
609
 
        char key[251];
610
 
        int next;
611
 
        int i = 0;
612
 
        item *it;
613
 
        time_t now = time(0);
614
 
 
615
 
        while(sscanf(start, " %250s%n", key, &next) >= 1) {
616
 
            start+=next;
 
947
                
617
948
            stats.get_cmds++;
618
 
            it = assoc_find(key);
619
 
            if (it && (it->it_flags & ITEM_DELETED)) {
620
 
                it = 0;
621
 
            }
622
 
            if (settings.oldest_live && it &&
623
 
                it->time <= settings.oldest_live) {
624
 
                item_unlink(it);
625
 
                it = 0;
626
 
            }
627
 
            if (it && it->exptime && it->exptime < now) {
628
 
                item_unlink(it);
629
 
                it = 0;
630
 
            }
631
 
 
 
949
            it = get_item(key, nkey);
632
950
            if (it) {
633
951
                if (i >= c->isize) {
634
952
                    item **new_list = realloc(c->ilist, sizeof(item *)*c->isize*2);
637
955
                        c->ilist = new_list;
638
956
                    } else break;
639
957
                }
 
958
                    
 
959
                /*
 
960
                 * Construct the response. Each hit adds three elements to the
 
961
                 * outgoing data list:
 
962
                 *   "VALUE "
 
963
                 *   key
 
964
                 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
 
965
                 */
 
966
                if (add_iov(c, "VALUE ", 6) ||
 
967
                    add_iov(c, ITEM_key(it), it->nkey) ||
 
968
                    add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
 
969
                    {
 
970
                        break;
 
971
                    }
 
972
                if (settings.verbose > 1)
 
973
                    fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
 
974
 
640
975
                stats.get_hits++;
641
976
                it->refcount++;
642
977
                item_update(it);
643
978
                *(c->ilist + i) = it;
644
979
                i++;
 
980
                
645
981
            } else stats.get_misses++;
646
 
        }
647
 
        c->icurr = c->ilist;
648
 
        c->ileft = i;
649
 
        if (c->ileft) {
650
 
            c->ipart = 0;
651
 
            c->state = conn_mwrite;
652
 
            c->ibytes = 0;
653
 
            return;
654
 
        } else {
655
 
            out_string(c, "END");
656
 
            return;
657
 
        }
658
 
    }
659
 
 
660
 
    if (strncmp(command, "delete ", 7) == 0) {
661
 
        char key[251];
662
 
        item *it;
663
 
        int res;
 
982
            
 
983
            key_token++;
 
984
        }
 
985
 
 
986
        /*
 
987
         * If the command string hasn't been fully processed, get the next set
 
988
         * of tokens.
 
989
         */
 
990
        if(key_token->value != NULL) {
 
991
           ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
 
992
            key_token = tokens;
 
993
        }
 
994
        
 
995
    } while(key_token->value != NULL);
 
996
 
 
997
    c->icurr = c->ilist;
 
998
    c->ileft = i;
 
999
            
 
1000
    if (settings.verbose > 1)
 
1001
        fprintf(stderr, ">%d END\n", c->sfd);
 
1002
    add_iov(c, "END\r\n", 5);
 
1003
        
 
1004
    if (c->udp && build_udp_headers(c)) {
 
1005
        out_string(c, "SERVER_ERROR out of memory");
 
1006
    }
 
1007
    else {
 
1008
        conn_set_state(c, conn_mwrite);
 
1009
        c->msgcurr = 0;
 
1010
    }
 
1011
    return;
 
1012
}
 
1013
 
 
1014
void process_update_command(conn *c, token_t* tokens, size_t ntokens, int comm) {
 
1015
    char *key;
 
1016
    size_t nkey;
 
1017
    int flags;
 
1018
    time_t exptime;
 
1019
    int vlen;
 
1020
    item *it;
 
1021
 
 
1022
    if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
 
1023
        out_string(c, "CLIENT_ERROR bad command line format");
 
1024
        return;
 
1025
    }
 
1026
 
 
1027
    key = tokens[KEY_TOKEN].value;
 
1028
    nkey = tokens[KEY_TOKEN].length;
 
1029
 
 
1030
    flags = strtoul(tokens[2].value, NULL, 10);
 
1031
    exptime = strtol(tokens[3].value, NULL, 10);
 
1032
    vlen = strtol(tokens[4].value, NULL, 10);
 
1033
    
 
1034
    if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
 
1035
        out_string(c, "CLIENT_ERROR bad command line format");
 
1036
        return;
 
1037
    }
 
1038
    
 
1039
    if (settings.managed) {
 
1040
        int bucket = c->bucket;
 
1041
        if (bucket == -1) {
 
1042
            out_string(c, "CLIENT_ERROR no BG data in managed mode");
 
1043
            return;
 
1044
        }
 
1045
        c->bucket = -1;
 
1046
        if (buckets[bucket] != c->gen) {
 
1047
            out_string(c, "ERROR_NOT_OWNER");
 
1048
            return;
 
1049
        }
 
1050
    }
 
1051
 
 
1052
    it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
 
1053
 
 
1054
    if (it == 0) {
 
1055
        if (! item_size_ok(key, nkey, flags, vlen + 2))
 
1056
            out_string(c, "SERVER_ERROR object too large for cache");
 
1057
        else
 
1058
            out_string(c, "SERVER_ERROR out of memory");
 
1059
        /* swallow the data line */
 
1060
        c->write_and_go = conn_swallow;
 
1061
        c->sbytes = vlen+2;
 
1062
        return;
 
1063
    }
 
1064
    
 
1065
    c->item_comm = comm;
 
1066
    c->item = it;
 
1067
    c->ritem = ITEM_data(it);
 
1068
    c->rlbytes = it->nbytes;
 
1069
    conn_set_state(c, conn_nread);
 
1070
    return;
 
1071
}
 
1072
 
 
1073
void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) {
 
1074
    char temp[32];
 
1075
    unsigned int value;
 
1076
    item *it;
 
1077
    unsigned int delta;
 
1078
    char *key;
 
1079
    size_t nkey;
 
1080
    int res;
 
1081
    char *ptr;
 
1082
    
 
1083
    if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) { 
 
1084
        out_string(c, "CLIENT_ERROR bad command line format");
 
1085
        return;
 
1086
    }
 
1087
 
 
1088
    key = tokens[KEY_TOKEN].value;
 
1089
    nkey = tokens[KEY_TOKEN].length;
 
1090
        
 
1091
    if (settings.managed) {
 
1092
        int bucket = c->bucket;
 
1093
        if (bucket == -1) {
 
1094
            out_string(c, "CLIENT_ERROR no BG data in managed mode");
 
1095
            return;
 
1096
        }
 
1097
        c->bucket = -1;
 
1098
        if (buckets[bucket] != c->gen) {
 
1099
            out_string(c, "ERROR_NOT_OWNER");
 
1100
            return;
 
1101
        }
 
1102
    }
 
1103
 
 
1104
    it = get_item(key, nkey);
 
1105
    if (!it) {
 
1106
        out_string(c, "NOT_FOUND");
 
1107
        return;
 
1108
    }
 
1109
 
 
1110
    delta = strtoul(tokens[2].value, NULL, 10);
 
1111
        
 
1112
    if(errno == ERANGE) {
 
1113
        out_string(c, "CLIENT_ERROR bad command line format");
 
1114
        return;
 
1115
    }
 
1116
 
 
1117
    ptr = ITEM_data(it);
 
1118
    while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;    // BUG: can't be true
 
1119
        
 
1120
    value = strtol(ptr, NULL, 10);
 
1121
 
 
1122
    if(errno == ERANGE) {
 
1123
        out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
 
1124
        return;
 
1125
    }
 
1126
    
 
1127
    if (incr)
 
1128
        value+=delta;
 
1129
    else {
 
1130
        if (delta >= value) value = 0;
 
1131
        else value-=delta;
 
1132
    }
 
1133
    sprintf(temp, "%u", value);
 
1134
    res = strlen(temp);
 
1135
    if (res + 2 > it->nbytes) { /* need to realloc */
 
1136
        item *new_it;
 
1137
        new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
 
1138
        if (new_it == 0) {
 
1139
            out_string(c, "SERVER_ERROR out of memory");
 
1140
            return;
 
1141
        }
 
1142
        memcpy(ITEM_data(new_it), temp, res);
 
1143
        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
 
1144
        item_replace(it, new_it);
 
1145
    } else { /* replace in-place */
 
1146
        memcpy(ITEM_data(it), temp, res);
 
1147
        memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
 
1148
    }
 
1149
    out_string(c, temp);
 
1150
    return;
 
1151
}
 
1152
 
 
1153
void process_delete_command(conn *c, token_t* tokens, size_t ntokens) {
 
1154
    char *key;
 
1155
    size_t nkey;
 
1156
    item *it;
 
1157
    time_t exptime = 0;
 
1158
    
 
1159
    if (settings.managed) {
 
1160
        int bucket = c->bucket;
 
1161
        if (bucket == -1) {
 
1162
            out_string(c, "CLIENT_ERROR no BG data in managed mode");
 
1163
            return;
 
1164
        }
 
1165
        c->bucket = -1;
 
1166
        if (buckets[bucket] != c->gen) {
 
1167
            out_string(c, "ERROR_NOT_OWNER");
 
1168
            return;
 
1169
        }
 
1170
    }
 
1171
    
 
1172
    key = tokens[KEY_TOKEN].value;
 
1173
    nkey = tokens[KEY_TOKEN].length;
 
1174
 
 
1175
    if(nkey > KEY_MAX_LENGTH) {
 
1176
        out_string(c, "CLIENT_ERROR bad command line format");
 
1177
        return;
 
1178
    }
 
1179
 
 
1180
    if(ntokens == 4) {
 
1181
        exptime = strtol(tokens[2].value, NULL, 10);
 
1182
        
 
1183
        if(errno == ERANGE) {
 
1184
            out_string(c, "CLIENT_ERROR bad command line format");
 
1185
            return;
 
1186
        }
 
1187
    }
 
1188
 
 
1189
    it = get_item(key, nkey);
 
1190
    if (!it) {
 
1191
        out_string(c, "NOT_FOUND");
 
1192
        return;
 
1193
    }
 
1194
    
 
1195
    if (exptime == 0) {
 
1196
        item_unlink(it);
 
1197
        out_string(c, "DELETED");
 
1198
        return;
 
1199
    }
 
1200
    if (delcurr >= deltotal) {
 
1201
        item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
 
1202
        if (new_delete) {
 
1203
            todelete = new_delete;
 
1204
            deltotal *= 2;
 
1205
        } else { 
 
1206
            /* 
 
1207
             * can't delete it immediately, user wants a delay,
 
1208
             * but we ran out of memory for the delete queue
 
1209
             */
 
1210
            out_string(c, "SERVER_ERROR out of memory");
 
1211
            return;
 
1212
        }
 
1213
    }
 
1214
    
 
1215
    it->refcount++;
 
1216
    /* use its expiration time as its deletion time now */
 
1217
    it->exptime = realtime(exptime);
 
1218
    it->it_flags |= ITEM_DELETED;
 
1219
    todelete[delcurr++] = it;
 
1220
    out_string(c, "DELETED");
 
1221
    return;
 
1222
}
 
1223
 
 
1224
void process_command(conn *c, char *command) {
 
1225
    
 
1226
    token_t tokens[MAX_TOKENS];
 
1227
    size_t ntokens;
 
1228
    int comm;
 
1229
 
 
1230
    if (settings.verbose > 1)
 
1231
        fprintf(stderr, "<%d %s\n", c->sfd, command);
 
1232
 
 
1233
    /* 
 
1234
     * for commands set/add/replace, we build an item and read the data
 
1235
     * directly into it, then continue in nread_complete().
 
1236
     */ 
 
1237
    
 
1238
    c->msgcurr = 0;
 
1239
    c->msgused = 0;
 
1240
    c->iovused = 0;
 
1241
    if (add_msghdr(c)) {
 
1242
        out_string(c, "SERVER_ERROR out of memory");
 
1243
        return;
 
1244
    }
 
1245
 
 
1246
    ntokens = tokenize_command(command, tokens, MAX_TOKENS);
 
1247
 
 
1248
    if (ntokens >= 3 &&
 
1249
        ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
 
1250
         (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
 
1251
        
 
1252
        process_get_command(c, tokens, ntokens);
 
1253
 
 
1254
    } else if (ntokens == 6 && 
 
1255
               ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || 
 
1256
                (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
 
1257
                (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
 
1258
        
 
1259
        process_update_command(c, tokens, ntokens, comm);
 
1260
 
 
1261
    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
 
1262
 
 
1263
        process_arithmetic_command(c, tokens, ntokens, 1);
 
1264
 
 
1265
    } else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
 
1266
 
 
1267
        process_arithmetic_command(c, tokens, ntokens, 0);
 
1268
 
 
1269
    } else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
 
1270
 
 
1271
        process_delete_command(c, tokens, ntokens);
 
1272
 
 
1273
    } else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
 
1274
        unsigned int bucket, gen;
 
1275
        if (!settings.managed) {
 
1276
            out_string(c, "CLIENT_ERROR not a managed instance");
 
1277
            return;
 
1278
        }
 
1279
        
 
1280
        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
 
1281
            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
 
1282
                out_string(c, "CLIENT_ERROR bucket number out of range");
 
1283
                return;
 
1284
            }
 
1285
            buckets[bucket] = gen;
 
1286
            out_string(c, "OWNED");
 
1287
            return;
 
1288
        } else {
 
1289
            out_string(c, "CLIENT_ERROR bad format");
 
1290
            return;
 
1291
        }
 
1292
 
 
1293
    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
 
1294
 
 
1295
        int bucket;
 
1296
        if (!settings.managed) {
 
1297
            out_string(c, "CLIENT_ERROR not a managed instance");
 
1298
            return;
 
1299
        }
 
1300
        if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
 
1301
            if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
 
1302
                out_string(c, "CLIENT_ERROR bucket number out of range");
 
1303
                return;
 
1304
            }
 
1305
            buckets[bucket] = 0;
 
1306
            out_string(c, "DISOWNED");
 
1307
            return;
 
1308
        } else {
 
1309
            out_string(c, "CLIENT_ERROR bad format");
 
1310
            return;
 
1311
        }
 
1312
 
 
1313
    } else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
 
1314
        int bucket, gen;
 
1315
        if (!settings.managed) {
 
1316
            out_string(c, "CLIENT_ERROR not a managed instance");
 
1317
            return;
 
1318
        }
 
1319
        if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
 
1320
            /* we never write anything back, even if input's wrong */
 
1321
            if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen<=0)) {
 
1322
                /* do nothing, bad input */
 
1323
            } else {
 
1324
                c->bucket = bucket;
 
1325
                c->gen = gen;
 
1326
            }
 
1327
            conn_set_state(c, conn_read);
 
1328
            return;
 
1329
        } else {
 
1330
            out_string(c, "CLIENT_ERROR bad format");
 
1331
            return;
 
1332
        }
 
1333
 
 
1334
    } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
 
1335
        
 
1336
        process_stat(c, tokens, ntokens);
 
1337
 
 
1338
    } else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
664
1339
        time_t exptime = 0;
665
 
 
666
 
        res = sscanf(command, "%*s %250s %ld", key, &exptime);
667
 
        it = assoc_find(key);
668
 
        if (!it) {
669
 
            out_string(c, "NOT_FOUND");
670
 
            return;
671
 
        }
672
 
 
673
 
        if (exptime == 0) {
674
 
            item_unlink(it);
675
 
            out_string(c, "DELETED");
676
 
            return;
677
 
        }
678
 
 
679
 
        if (delcurr >= deltotal) {
680
 
            item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
681
 
            if (new_delete) {
682
 
                todelete = new_delete;
683
 
                deltotal *= 2;
684
 
            } else { 
685
 
                /* 
686
 
                 * can't delete it immediately, user wants a delay,
687
 
                 * but we ran out of memory for the delete queue
688
 
                 */
689
 
                out_string(c, "SERVER_ERROR out of memory");
690
 
                return;
691
 
            }
692
 
        }
693
 
            
694
 
        exptime = realtime(exptime);
695
 
 
696
 
        it->refcount++;
697
 
        /* use its expiration time as its deletion time now */
698
 
        it->exptime = exptime;
699
 
        it->it_flags |= ITEM_DELETED;
700
 
        todelete[delcurr++] = it;
701
 
        out_string(c, "DELETED");
702
 
        return;
703
 
    }
704
 
        
705
 
    if (strncmp(command, "stats", 5) == 0) {
706
 
        process_stat(c, command);
707
 
        return;
708
 
    }
709
 
 
710
 
    if (strcmp(command, "flush_all") == 0) {
711
 
        settings.oldest_live = time(0);
 
1340
        set_current_time();
 
1341
 
 
1342
        if(ntokens == 2) {
 
1343
            settings.oldest_live = current_time - 1;
 
1344
            item_flush_expired();
 
1345
            out_string(c, "OK");
 
1346
            return;
 
1347
        }
 
1348
 
 
1349
        exptime = strtol(tokens[1].value, NULL, 10);
 
1350
        if(errno == ERANGE) {
 
1351
            out_string(c, "CLIENT_ERROR bad command line format");
 
1352
            return;
 
1353
        }
 
1354
 
 
1355
        settings.oldest_live = realtime(exptime) - 1;
 
1356
        item_flush_expired();
712
1357
        out_string(c, "OK");
713
1358
        return;
714
 
    }
 
1359
 
 
1360
    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
715
1361
 
716
 
    if (strcmp(command, "version") == 0) {
717
1362
        out_string(c, "VERSION " VERSION);
718
 
        return;
719
 
    }
720
 
 
721
 
    if (strcmp(command, "quit") == 0) {
722
 
        c->state = conn_closing;
723
 
        return;
724
 
    }
725
 
 
726
 
    if (strncmp(command, "slabs reassign ", 15) == 0) {
727
 
        int src, dst;
728
 
        char *start = command+15;
729
 
        if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
730
 
            int rv = slabs_reassign(src, dst);
731
 
            if (rv == 1) {
732
 
                out_string(c, "DONE");
733
 
                return;
734
 
            }
735
 
            if (rv == 0) {
736
 
                out_string(c, "CANT");
737
 
                return;
738
 
            }
739
 
            if (rv == -1) {
740
 
                out_string(c, "BUSY");
741
 
                return;
742
 
            }
743
 
        }
744
 
        out_string(c, "CLIENT_ERROR bogus command");
745
 
        return;
746
 
    }
747
 
    
748
 
    out_string(c, "ERROR");
 
1363
 
 
1364
    } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
 
1365
 
 
1366
        conn_set_state(c, conn_closing);
 
1367
        
 
1368
    } else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
 
1369
                                strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
 
1370
#ifdef ALLOW_SLABS_REASSIGN
 
1371
 
 
1372
        int src, dst, rv;
 
1373
 
 
1374
        src = strtol(tokens[2].value, NULL, 10);
 
1375
        dst  = strtol(tokens[3].value, NULL, 10);
 
1376
 
 
1377
        if(errno == ERANGE) {
 
1378
            out_string(c, "CLIENT_ERROR bad command line format");
 
1379
            return;
 
1380
        }
 
1381
 
 
1382
        rv = slabs_reassign(src, dst);
 
1383
        if (rv == 1) {
 
1384
            out_string(c, "DONE");
 
1385
            return;
 
1386
        }
 
1387
        if (rv == 0) {
 
1388
            out_string(c, "CANT");
 
1389
            return;
 
1390
        }
 
1391
        if (rv == -1) {
 
1392
            out_string(c, "BUSY");
 
1393
            return;
 
1394
        }
 
1395
#else
 
1396
        out_string(c, "CLIENT_ERROR Slab reassignment not supported");
 
1397
#endif
 
1398
 
 
1399
    } else {
 
1400
        out_string(c, "ERROR");
 
1401
    }
749
1402
    return;
750
1403
}
751
1404
 
752
 
/* 
753
 
 * if we have a complete line in the buffer, process it and move whatever
754
 
 * remains in the buffer to its beginning.
 
1405
/*
 
1406
 * if we have a complete line in the buffer, process it.
755
1407
 */
756
1408
int try_read_command(conn *c) {
757
1409
    char *el, *cont;
758
1410
 
 
1411
    assert(c->rcurr <= c->rbuf + c->rsize);
 
1412
 
759
1413
    if (!c->rbytes)
760
1414
        return 0;
761
 
    el = memchr(c->rbuf, '\n', c->rbytes);
 
1415
    el = memchr(c->rcurr, '\n', c->rbytes);
762
1416
    if (!el)
763
1417
        return 0;
764
1418
    cont = el + 1;
765
 
    if (el - c->rbuf > 1 && *(el - 1) == '\r') {
 
1419
    if (el - c->rcurr > 1 && *(el - 1) == '\r') {
766
1420
        el--;
767
1421
    }
768
1422
    *el = '\0';
769
1423
 
770
 
    process_command(c, c->rbuf);
771
 
 
772
 
    if (cont - c->rbuf < c->rbytes) { /* more stuff in the buffer */
773
 
        memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
 
1424
    assert(cont <= c->rcurr + c->rbytes);
 
1425
 
 
1426
    process_command(c, c->rcurr);
 
1427
 
 
1428
    c->rbytes -= (cont - c->rcurr);
 
1429
    c->rcurr = cont;
 
1430
 
 
1431
    assert(c->rcurr <= c->rbuf + c->rsize);
 
1432
 
 
1433
    return 1;
 
1434
}
 
1435
 
 
1436
/*
 
1437
 * read a UDP request.
 
1438
 * return 0 if there's nothing to read.
 
1439
 */
 
1440
int try_read_udp(conn *c) {
 
1441
    int res;
 
1442
 
 
1443
    c->request_addr_size = sizeof(c->request_addr);
 
1444
    res = recvfrom(c->sfd, c->rbuf, c->rsize,
 
1445
                   0, &c->request_addr, &c->request_addr_size);
 
1446
    if (res > 8) {
 
1447
        unsigned char *buf = (unsigned char *)c->rbuf;
 
1448
        stats.bytes_read += res;
 
1449
 
 
1450
        /* Beginning of UDP packet is the request ID; save it. */
 
1451
        c->request_id = buf[0] * 256 + buf[1];
 
1452
 
 
1453
        /* If this is a multi-packet request, drop it. */
 
1454
        if (buf[4] != 0 || buf[5] != 1) {
 
1455
            out_string(c, "SERVER_ERROR multi-packet request not supported");
 
1456
            return 0;
 
1457
        }
 
1458
 
 
1459
        /* Don't care about any of the rest of the header. */
 
1460
        res -= 8;
 
1461
        memmove(c->rbuf, c->rbuf + 8, res);
 
1462
 
 
1463
        c->rbytes += res;
 
1464
        c->rcurr = c->rbuf;
 
1465
        return 1;
774
1466
    }
775
 
    c->rbytes -= (cont - c->rbuf);
776
 
    return 1;
 
1467
    return 0;
777
1468
}
778
1469
 
779
1470
/*
780
1471
 * read from network as much as we can, handle buffer overflow and connection
781
 
 * close. 
 
1472
 * close.
 
1473
 * before reading, move the remaining incomplete fragment of a command
 
1474
 * (if any) to the beginning of the buffer.
782
1475
 * return 0 if there's nothing to read on the first read.
783
1476
 */
784
1477
int try_read_network(conn *c) {
785
1478
    int gotdata = 0;
786
1479
    int res;
 
1480
 
 
1481
    if (c->rcurr != c->rbuf) {
 
1482
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
 
1483
            memmove(c->rbuf, c->rcurr, c->rbytes);
 
1484
        c->rcurr = c->rbuf;
 
1485
    }
 
1486
 
787
1487
    while (1) {
788
1488
        if (c->rbytes >= c->rsize) {
789
1489
            char *new_rbuf = realloc(c->rbuf, c->rsize*2);
795
1495
                c->write_and_go = conn_closing;
796
1496
                return 1;
797
1497
            }
798
 
            c->rbuf = new_rbuf; c->rsize *= 2;
799
 
        }
 
1498
            c->rcurr  = c->rbuf = new_rbuf;
 
1499
            c->rsize *= 2;
 
1500
        }
 
1501
 
 
1502
        /* unix socket mode doesn't need this, so zeroed out.  but why
 
1503
         * is this done for every command?  presumably for UDP
 
1504
         * mode.  */
 
1505
        if (!settings.socketpath) {
 
1506
            c->request_addr_size = sizeof(c->request_addr);
 
1507
        } else {
 
1508
            c->request_addr_size = 0;
 
1509
        }
 
1510
 
800
1511
        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
801
1512
        if (res > 0) {
802
1513
            stats.bytes_read += res;
806
1517
        }
807
1518
        if (res == 0) {
808
1519
            /* connection closed */
809
 
            c->state = conn_closing;
 
1520
            conn_set_state(c, conn_closing);
810
1521
            return 1;
811
1522
        }
812
1523
        if (res == -1) {
826
1537
    if (event_add(&c->event, 0) == -1) return 0;
827
1538
    return 1;
828
1539
}
829
 
    
 
1540
 
 
1541
/*
 
1542
 * Sets whether we are listening for new connections or not.
 
1543
 */
 
1544
void accept_new_conns(int do_accept) {
 
1545
    if (do_accept) {
 
1546
        update_event(listen_conn, EV_READ | EV_PERSIST);
 
1547
        if (listen(listen_conn->sfd, 1024)) {
 
1548
            perror("listen");
 
1549
        }
 
1550
    }
 
1551
    else {
 
1552
        update_event(listen_conn, 0);
 
1553
        if (listen(listen_conn->sfd, 0)) {
 
1554
            perror("listen");
 
1555
        }
 
1556
    }
 
1557
}
 
1558
 
 
1559
 
 
1560
/*
 
1561
 * Transmit the next chunk of data from our list of msgbuf structures.
 
1562
 *
 
1563
 * Returns:
 
1564
 *   TRANSMIT_COMPLETE   All done writing.
 
1565
 *   TRANSMIT_INCOMPLETE More data remaining to write.
 
1566
 *   TRANSMIT_SOFT_ERROR Can't write any more right now.
 
1567
 *   TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
 
1568
 */
 
1569
int transmit(conn *c) {
 
1570
    int res;
 
1571
 
 
1572
    if (c->msgcurr < c->msgused &&
 
1573
            c->msglist[c->msgcurr].msg_iovlen == 0) {
 
1574
        /* Finished writing the current msg; advance to the next. */
 
1575
        c->msgcurr++;
 
1576
    }
 
1577
    if (c->msgcurr < c->msgused) {
 
1578
        struct msghdr *m = &c->msglist[c->msgcurr];
 
1579
        res = sendmsg(c->sfd, m, 0);
 
1580
        if (res > 0) {
 
1581
            stats.bytes_written += res;
 
1582
 
 
1583
            /* We've written some of the data. Remove the completed
 
1584
               iovec entries from the list of pending writes. */
 
1585
            while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
 
1586
                res -= m->msg_iov->iov_len;
 
1587
                m->msg_iovlen--;
 
1588
                m->msg_iov++;
 
1589
            }
 
1590
 
 
1591
            /* Might have written just part of the last iovec entry;
 
1592
               adjust it so the next write will do the rest. */
 
1593
            if (res > 0) {
 
1594
                m->msg_iov->iov_base += res;
 
1595
                m->msg_iov->iov_len -= res;
 
1596
            }
 
1597
            return TRANSMIT_INCOMPLETE;
 
1598
        }
 
1599
        if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
 
1600
            if (!update_event(c, EV_WRITE | EV_PERSIST)) {
 
1601
                if (settings.verbose > 0)
 
1602
                    fprintf(stderr, "Couldn't update event\n");
 
1603
                conn_set_state(c, conn_closing);
 
1604
                return TRANSMIT_HARD_ERROR;
 
1605
            }
 
1606
            return TRANSMIT_SOFT_ERROR;
 
1607
        }
 
1608
        /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
 
1609
           we have a real error, on which we close the connection */
 
1610
        if (settings.verbose > 0)
 
1611
            perror("Failed to write, and not due to blocking");
 
1612
 
 
1613
        if (c->udp)
 
1614
            conn_set_state(c, conn_read);
 
1615
        else
 
1616
            conn_set_state(c, conn_closing);
 
1617
        return TRANSMIT_HARD_ERROR;
 
1618
    } else {
 
1619
        return TRANSMIT_COMPLETE;
 
1620
    }
 
1621
}
 
1622
 
830
1623
void drive_machine(conn *c) {
831
1624
 
832
 
    int exit = 0;
 
1625
    int stop = 0;
833
1626
    int sfd, flags = 1;
834
1627
    socklen_t addrlen;
835
1628
    struct sockaddr addr;
836
1629
    conn *newc;
837
1630
    int res;
838
1631
 
839
 
    while (!exit) {
840
 
        /* printf("state %d\n", c->state);*/
 
1632
    while (!stop) {
841
1633
        switch(c->state) {
842
1634
        case conn_listening:
843
1635
            addrlen = sizeof(addr);
844
1636
            if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
845
1637
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
846
 
                    exit = 1;
 
1638
                    stop = 1;
847
1639
                    break;
 
1640
                } else if (errno == EMFILE) {
 
1641
                    if (settings.verbose > 0)
 
1642
                        fprintf(stderr, "Too many open connections\n");
 
1643
                    accept_new_conns(0);
848
1644
                } else {
849
1645
                    perror("accept()");
850
1646
                }
855
1651
                perror("setting O_NONBLOCK");
856
1652
                close(sfd);
857
1653
                break;
858
 
            }            
859
 
            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
 
1654
            }
 
1655
            newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
 
1656
                            DATA_BUFFER_SIZE, 0);
860
1657
            if (!newc) {
861
1658
                if (settings.verbose > 0)
862
1659
                    fprintf(stderr, "couldn't create new connection\n");
870
1667
            if (try_read_command(c)) {
871
1668
                continue;
872
1669
            }
873
 
            if (try_read_network(c)) {
 
1670
            if (c->udp ? try_read_udp(c) : try_read_network(c)) {
874
1671
                continue;
875
1672
            }
876
1673
            /* we have no command line and no data to read from network */
877
1674
            if (!update_event(c, EV_READ | EV_PERSIST)) {
878
1675
                if (settings.verbose > 0)
879
1676
                    fprintf(stderr, "Couldn't update event\n");
880
 
                c->state = conn_closing;
 
1677
                conn_set_state(c, conn_closing);
881
1678
                break;
882
1679
            }
883
 
            exit = 1;
 
1680
            stop = 1;
884
1681
            break;
885
1682
 
886
1683
        case conn_nread:
887
 
            /* we are reading rlbytes into rcurr; */
 
1684
            /* we are reading rlbytes into ritem; */
888
1685
            if (c->rlbytes == 0) {
889
1686
                complete_nread(c);
890
1687
                break;
892
1689
            /* first check if we have leftovers in the conn_read buffer */
893
1690
            if (c->rbytes > 0) {
894
1691
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
895
 
                memcpy(c->rcurr, c->rbuf, tocopy);
896
 
                c->rcurr += tocopy;
 
1692
                memcpy(c->ritem, c->rcurr, tocopy);
 
1693
                c->ritem += tocopy;
897
1694
                c->rlbytes -= tocopy;
898
 
                if (c->rbytes > tocopy) {
899
 
                    memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
900
 
                }
 
1695
                c->rcurr += tocopy;
901
1696
                c->rbytes -= tocopy;
902
1697
                break;
903
1698
            }
904
1699
 
905
1700
            /*  now try reading from the socket */
906
 
            res = read(c->sfd, c->rcurr, c->rlbytes);
 
1701
            res = read(c->sfd, c->ritem, c->rlbytes);
907
1702
            if (res > 0) {
908
1703
                stats.bytes_read += res;
909
 
                c->rcurr += res;
 
1704
                c->ritem += res;
910
1705
                c->rlbytes -= res;
911
1706
                break;
912
1707
            }
913
1708
            if (res == 0) { /* end of stream */
914
 
                c->state = conn_closing;
 
1709
                conn_set_state(c, conn_closing);
915
1710
                break;
916
1711
            }
917
1712
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
918
1713
                if (!update_event(c, EV_READ | EV_PERSIST)) {
919
 
                    if (settings.verbose > 0) 
 
1714
                    if (settings.verbose > 0)
920
1715
                        fprintf(stderr, "Couldn't update event\n");
921
 
                    c->state = conn_closing;
 
1716
                    conn_set_state(c, conn_closing);
922
1717
                    break;
923
1718
                }
924
 
                exit = 1;
 
1719
                stop = 1;
925
1720
                break;
926
1721
            }
927
1722
            /* otherwise we have a real error, on which we close the connection */
928
1723
            if (settings.verbose > 0)
929
1724
                fprintf(stderr, "Failed to read, and not due to blocking\n");
930
 
            c->state = conn_closing;
 
1725
            conn_set_state(c, conn_closing);
931
1726
            break;
932
1727
 
933
1728
        case conn_swallow:
934
1729
            /* we are reading sbytes and throwing them away */
935
1730
            if (c->sbytes == 0) {
936
 
                c->state = conn_read;
 
1731
                conn_set_state(c, conn_read);
937
1732
                break;
938
1733
            }
939
1734
 
941
1736
            if (c->rbytes > 0) {
942
1737
                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
943
1738
                c->sbytes -= tocopy;
944
 
                if (c->rbytes > tocopy) {
945
 
                    memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
946
 
                }
 
1739
                c->rcurr += tocopy;
947
1740
                c->rbytes -= tocopy;
948
1741
                break;
949
1742
            }
956
1749
                break;
957
1750
            }
958
1751
            if (res == 0) { /* end of stream */
959
 
                c->state = conn_closing;
 
1752
                conn_set_state(c, conn_closing);
960
1753
                break;
961
1754
            }
962
1755
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
963
1756
                if (!update_event(c, EV_READ | EV_PERSIST)) {
964
1757
                    if (settings.verbose > 0)
965
1758
                        fprintf(stderr, "Couldn't update event\n");
966
 
                    c->state = conn_closing;
 
1759
                    conn_set_state(c, conn_closing);
967
1760
                    break;
968
1761
                }
969
 
                exit = 1;
 
1762
                stop = 1;
970
1763
                break;
971
1764
            }
972
1765
            /* otherwise we have a real error, on which we close the connection */
973
1766
            if (settings.verbose > 0)
974
1767
                fprintf(stderr, "Failed to read, and not due to blocking\n");
975
 
            c->state = conn_closing;
 
1768
            conn_set_state(c, conn_closing);
976
1769
            break;
977
1770
 
978
1771
        case conn_write:
979
 
            /* we are writing wbytes bytes starting from wcurr */
980
 
            if (c->wbytes == 0) {
981
 
                if (c->write_and_free) {
982
 
                    free(c->write_and_free);
983
 
                    c->write_and_free = 0;
984
 
                }
985
 
                c->state = c->write_and_go;
986
 
                if (c->state == conn_read)
987
 
                    set_cork(c, 0);
988
 
                break;
989
 
            }
990
 
            res = write(c->sfd, c->wcurr, c->wbytes);
991
 
            if (res > 0) {
992
 
                stats.bytes_written += res;
993
 
                c->wcurr  += res;
994
 
                c->wbytes -= res;
995
 
                break;
996
 
            }
997
 
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
998
 
                if (!update_event(c, EV_WRITE | EV_PERSIST)) {
 
1772
            /*
 
1773
             * We want to write out a simple response. If we haven't already,
 
1774
             * assemble it into a msgbuf list (this will be a single-entry
 
1775
             * list for TCP or a two-entry list for UDP).
 
1776
             */
 
1777
            if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
 
1778
                if (add_iov(c, c->wcurr, c->wbytes) ||
 
1779
                    (c->udp && build_udp_headers(c))) {
999
1780
                    if (settings.verbose > 0)
1000
 
                        fprintf(stderr, "Couldn't update event\n");
1001
 
                    c->state = conn_closing;
 
1781
                        fprintf(stderr, "Couldn't build response\n");
 
1782
                    conn_set_state(c, conn_closing);
1002
1783
                    break;
1003
 
                }                
1004
 
                exit = 1;
1005
 
                break;
 
1784
                }
1006
1785
            }
1007
 
            /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1008
 
               we have a real error, on which we close the connection */
1009
 
            if (settings.verbose > 0)
1010
 
                fprintf(stderr, "Failed to write, and not due to blocking\n");
1011
 
            c->state = conn_closing;
1012
 
            break;
 
1786
 
 
1787
            /* fall through... */
 
1788
 
1013
1789
        case conn_mwrite:
1014
 
            /* 
1015
 
             * we're writing ibytes bytes from iptr. iptr alternates between
1016
 
             * ibuf, where we build a string "VALUE...", and ITEM_data(it) for the 
1017
 
             * current item. When we finish a chunk, we choose the next one using 
1018
 
             * ipart, which has the following semantics: 0 - start the loop, 1 - 
1019
 
             * we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
1020
 
             * move to the next item and build its ibuf; 3 - we finished all items, 
1021
 
             * write "END".
1022
 
             */
1023
 
            if (c->ibytes > 0) {
1024
 
                res = write(c->sfd, c->iptr, c->ibytes);
1025
 
                if (res > 0) {
1026
 
                    stats.bytes_written += res;
1027
 
                    c->iptr += res;
1028
 
                    c->ibytes -= res;
1029
 
                    break;
1030
 
                }
1031
 
                if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1032
 
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1033
 
                        if (settings.verbose > 0)
1034
 
                            fprintf(stderr, "Couldn't update event\n");
1035
 
                        c->state = conn_closing;
1036
 
                        break;
1037
 
                    }
1038
 
                    exit = 1;
1039
 
                    break;
1040
 
                }
1041
 
                /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1042
 
                   we have a real error, on which we close the connection */
1043
 
                if (settings.verbose > 0)
1044
 
                    fprintf(stderr, "Failed to write, and not due to blocking\n");
1045
 
                c->state = conn_closing;
1046
 
                break;
1047
 
            } else {
1048
 
                item *it;
1049
 
                /* we finished a chunk, decide what to do next */
1050
 
                switch (c->ipart) {
1051
 
                case 1:
1052
 
                    it = *(c->icurr);
1053
 
                    assert((it->it_flags & ITEM_SLABBED) == 0);
1054
 
                    c->iptr = ITEM_data(it);
1055
 
                    c->ibytes = it->nbytes;
1056
 
                    c->ipart = 2;
1057
 
                    break;
1058
 
                case 2:
1059
 
                    it = *(c->icurr);
1060
 
                    item_remove(it);
1061
 
                    c->ileft--;
1062
 
                    if (c->ileft <= 0) {
1063
 
                        c->ipart = 3;
1064
 
                        break;
1065
 
                    } else {
 
1790
            switch (transmit(c)) {
 
1791
            case TRANSMIT_COMPLETE:
 
1792
                if (c->state == conn_mwrite) {
 
1793
                    while (c->ileft > 0) {
 
1794
                        item *it = *(c->icurr);
 
1795
                        assert((it->it_flags & ITEM_SLABBED) == 0);
 
1796
                        item_remove(it);
1066
1797
                        c->icurr++;
1067
 
                    }
1068
 
                    /* FALL THROUGH */
1069
 
                case 0:
1070
 
                    it = *(c->icurr);
1071
 
                    assert((it->it_flags & ITEM_SLABBED) == 0);
1072
 
                    c->ibytes = sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
1073
 
                    if (settings.verbose > 1)
1074
 
                        fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1075
 
                    c->iptr = c->ibuf;
1076
 
                    c->ipart = 1;
1077
 
                    break;
1078
 
                case 3:
1079
 
                    out_string(c, "END");
1080
 
                    break;
 
1798
                        c->ileft--;
 
1799
                    }
 
1800
                    conn_set_state(c, conn_read);
 
1801
                } else if (c->state == conn_write) {
 
1802
                    if (c->write_and_free) {
 
1803
                        free(c->write_and_free);
 
1804
                        c->write_and_free = 0;
 
1805
                    }
 
1806
                    conn_set_state(c, c->write_and_go);
 
1807
                } else {
 
1808
                    if (settings.verbose > 0)
 
1809
                        fprintf(stderr, "Unexpected state %d\n", c->state);
 
1810
                    conn_set_state(c, conn_closing);
1081
1811
                }
 
1812
                break;
 
1813
 
 
1814
            case TRANSMIT_INCOMPLETE:
 
1815
            case TRANSMIT_HARD_ERROR:
 
1816
                break;                   /* Continue in state machine. */
 
1817
 
 
1818
            case TRANSMIT_SOFT_ERROR:
 
1819
                stop = 1;
 
1820
                break;
1082
1821
            }
1083
1822
            break;
1084
1823
 
1085
1824
        case conn_closing:
1086
 
            conn_close(c);
1087
 
            exit = 1;
 
1825
            if (c->udp)
 
1826
                conn_cleanup(c);
 
1827
            else
 
1828
                conn_close(c);
 
1829
            stop = 1;
1088
1830
            break;
1089
1831
        }
1090
1832
 
1093
1835
    return;
1094
1836
}
1095
1837
 
1096
 
 
1097
1838
void event_handler(int fd, short which, void *arg) {
1098
1839
    conn *c;
1099
 
    
 
1840
 
1100
1841
    c = (conn *)arg;
1101
1842
    c->which = which;
1102
1843
 
1115
1856
    return;
1116
1857
}
1117
1858
 
1118
 
int new_socket(void) {
 
1859
int new_socket(int is_udp) {
1119
1860
    int sfd;
1120
1861
    int flags;
1121
1862
 
1122
 
    if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
 
1863
    if ((sfd = socket(AF_INET, is_udp ? SOCK_DGRAM : SOCK_STREAM, 0)) == -1) {
1123
1864
        perror("socket()");
1124
1865
        return -1;
1125
1866
    }
1133
1874
    return sfd;
1134
1875
}
1135
1876
 
1136
 
int server_socket(int port) {
 
1877
 
 
1878
/*
 
1879
 * Sets a socket's send buffer size to the maximum allowed by the system.
 
1880
 */
 
1881
void maximize_sndbuf(int sfd) {
 
1882
    socklen_t intsize = sizeof(int);
 
1883
    int last_good = 0;
 
1884
    int min, max, avg;
 
1885
    int old_size;
 
1886
 
 
1887
    /* Start with the default size. */
 
1888
    if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize)) {
 
1889
        if (settings.verbose > 0)
 
1890
            perror("getsockopt(SO_SNDBUF)");
 
1891
        return;
 
1892
    }
 
1893
 
 
1894
    /* Binary-search for the real maximum. */
 
1895
    min = old_size;
 
1896
    max = MAX_SENDBUF_SIZE;
 
1897
 
 
1898
    while (min <= max) {
 
1899
        avg = ((unsigned int) min + max) / 2;
 
1900
        if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &avg, intsize) == 0) {
 
1901
            last_good = avg;
 
1902
            min = avg + 1;
 
1903
        } else {
 
1904
            max = avg - 1;
 
1905
        }
 
1906
    }
 
1907
 
 
1908
    if (settings.verbose > 1)
 
1909
        fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
 
1910
}
 
1911
 
 
1912
 
 
1913
int server_socket(int port, int is_udp) {
1137
1914
    int sfd;
1138
1915
    struct linger ling = {0, 0};
1139
1916
    struct sockaddr_in addr;
1140
1917
    int flags =1;
1141
1918
 
1142
 
    if ((sfd = new_socket()) == -1) {
 
1919
    if ((sfd = new_socket(is_udp)) == -1) {
1143
1920
        return -1;
1144
1921
    }
1145
1922
 
1146
1923
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
1147
 
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
1148
 
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
1149
 
#if !defined(TCP_NOPUSH)
1150
 
    setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
1151
 
#endif
 
1924
    if (is_udp) {
 
1925
        maximize_sndbuf(sfd);
 
1926
    } else {
 
1927
        setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
 
1928
        setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
 
1929
        setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags));
 
1930
    }
1152
1931
 
1153
 
    /* 
 
1932
    /*
1154
1933
     * the memset call clears nonstandard fields in some impementations
1155
1934
     * that otherwise mess things up.
1156
1935
     */
1164
1943
        close(sfd);
1165
1944
        return -1;
1166
1945
    }
 
1946
    if (! is_udp && listen(sfd, 1024) == -1) {
 
1947
        perror("listen()");
 
1948
        close(sfd);
 
1949
        return -1;
 
1950
    }
 
1951
    return sfd;
 
1952
}
 
1953
 
 
1954
int new_socket_unix(void) {
 
1955
    int sfd;
 
1956
    int flags;
 
1957
 
 
1958
    if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
 
1959
        perror("socket()");
 
1960
        return -1;
 
1961
    }
 
1962
 
 
1963
    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
 
1964
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
 
1965
        perror("setting O_NONBLOCK");
 
1966
        close(sfd);
 
1967
        return -1;
 
1968
    }
 
1969
    return sfd;
 
1970
}
 
1971
 
 
1972
int server_socket_unix(char *path) {
 
1973
    int sfd;
 
1974
    struct linger ling = {0, 0};
 
1975
    struct sockaddr_un addr;
 
1976
    struct stat tstat;
 
1977
    int flags =1;
 
1978
 
 
1979
    if (!path) {
 
1980
        return -1;
 
1981
    }
 
1982
 
 
1983
    if ((sfd = new_socket_unix()) == -1) {
 
1984
        return -1;
 
1985
    }
 
1986
 
 
1987
    /*
 
1988
     * Clean up a previous socket file if we left it around
 
1989
     */
 
1990
    if (!lstat(path, &tstat)) {
 
1991
        if (S_ISSOCK(tstat.st_mode))
 
1992
            unlink(path);
 
1993
    }
 
1994
 
 
1995
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
 
1996
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags));
 
1997
    setsockopt(sfd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
 
1998
 
 
1999
    /*
 
2000
     * the memset call clears nonstandard fields in some impementations
 
2001
     * that otherwise mess things up.
 
2002
     */
 
2003
    memset(&addr, 0, sizeof(addr));
 
2004
 
 
2005
    addr.sun_family = AF_UNIX;
 
2006
    strcpy(addr.sun_path, path);
 
2007
    if (bind(sfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
 
2008
        perror("bind()");
 
2009
        close(sfd);
 
2010
        return -1;
 
2011
    }
1167
2012
    if (listen(sfd, 1024) == -1) {
1168
2013
        perror("listen()");
1169
2014
        close(sfd);
1172
2017
    return sfd;
1173
2018
}
1174
2019
 
 
2020
 
1175
2021
/* invoke right before gdb is called, on assert */
1176
2022
void pre_gdb () {
1177
2023
    int i = 0;
1178
2024
    if(l_socket) close(l_socket);
 
2025
    if(u_socket > -1) close(u_socket);
1179
2026
    for (i=3; i<=500; i++) close(i); /* so lame */
1180
2027
    kill(getpid(), SIGABRT);
1181
2028
}
1182
2029
 
 
2030
/*
 
2031
 * We keep the current time of day in a global variable that's updated by a
 
2032
 * timer event. This saves us a bunch of time() system calls (we really only
 
2033
 * need to get the time once a second, whereas there can be tens of thousands
 
2034
 * of requests a second) and allows us to use server-start-relative timestamps
 
2035
 * rather than absolute UNIX timestamps, a space savings on systems where
 
2036
 * sizeof(time_t) > sizeof(unsigned int).
 
2037
 */
 
2038
volatile rel_time_t current_time;
 
2039
struct event clockevent;
 
2040
 
 
2041
/* time-sensitive callers can call it by hand with this, outside the normal ever-1-second timer */
 
2042
void set_current_time () {
 
2043
    current_time = (rel_time_t) (time(0) - stats.started);
 
2044
}
 
2045
 
 
2046
void clock_handler(int fd, short which, void *arg) {
 
2047
    struct timeval t;
 
2048
    static int initialized = 0;
 
2049
 
 
2050
    if (initialized) {
 
2051
        /* only delete the event if it's actually there. */
 
2052
        evtimer_del(&clockevent);
 
2053
    } else {
 
2054
        initialized = 1;
 
2055
    }
 
2056
 
 
2057
    evtimer_set(&clockevent, clock_handler, 0);
 
2058
    t.tv_sec = 1;
 
2059
    t.tv_usec = 0;
 
2060
    evtimer_add(&clockevent, &t);
 
2061
 
 
2062
    set_current_time();
 
2063
}
 
2064
 
1183
2065
struct event deleteevent;
1184
2066
 
1185
2067
void delete_handler(int fd, short which, void *arg) {
1200
2082
 
1201
2083
    {
1202
2084
        int i, j=0;
1203
 
        time_t now = time(0);
1204
2085
        for (i=0; i<delcurr; i++) {
1205
2086
            item *it = todelete[i];
1206
 
            if (it->exptime < now) {
 
2087
            if (item_delete_lock_over(it)) {
1207
2088
                assert(it->refcount > 0);
1208
2089
                it->it_flags &= ~ITEM_DELETED;
1209
2090
                item_unlink(it);
1214
2095
        }
1215
2096
        delcurr = j;
1216
2097
    }
1217
 
                
1218
 
    return;
1219
2098
}
1220
 
        
 
2099
 
1221
2100
void usage(void) {
1222
2101
    printf(PACKAGE " " VERSION "\n");
1223
 
    printf("-p <num>      port number to listen on\n");
 
2102
    printf("-p <num>      TCP port number to listen on (default: 11211)\n");
 
2103
    printf("-U <num>      UDP port number to listen on (default: 0, off)\n");
 
2104
    printf("-s <file>     unix socket path to listen on (disables network support)\n");
1224
2105
    printf("-l <ip_addr>  interface to listen on, default is INDRR_ANY\n");
1225
2106
    printf("-d            run as a daemon\n");
1226
2107
    printf("-r            maximize core file limit\n");
1233
2114
    printf("-vv           very verbose (also print client commands/reponses)\n");
1234
2115
    printf("-h            print this help and exit\n");
1235
2116
    printf("-i            print memcached and libevent license\n");
 
2117
    printf("-b            run a managed instanced (mnemonic: buckets)\n");
1236
2118
    printf("-P <file>     save PID in <file>, only used with -d option\n");
 
2119
    printf("-f <factor>   chunk size growth factor, default 1.25\n");
 
2120
    printf("-n <bytes>    minimum space allocated for key+value+flags, default 48\n");
1237
2121
    return;
1238
2122
}
1239
2123
 
1240
2124
void usage_license(void) {
1241
2125
    printf(PACKAGE " " VERSION "\n\n");
1242
2126
    printf(
1243
 
        "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
1244
 
        "All rights reserved.\n"
1245
 
        "\n"
1246
 
        "Redistribution and use in source and binary forms, with or without\n"
1247
 
        "modification, are permitted provided that the following conditions are\n"
1248
 
        "met:\n"
1249
 
        "\n"
1250
 
        "    * Redistributions of source code must retain the above copyright\n"
1251
 
        "notice, this list of conditions and the following disclaimer.\n"
1252
 
        "\n"
1253
 
        "    * Redistributions in binary form must reproduce the above\n"
1254
 
        "copyright notice, this list of conditions and the following disclaimer\n"
1255
 
        "in the documentation and/or other materials provided with the\n"
1256
 
        "distribution.\n"
1257
 
        "\n"
1258
 
        "    * Neither the name of the Danga Interactive nor the names of its\n"
1259
 
        "contributors may be used to endorse or promote products derived from\n"
1260
 
        "this software without specific prior written permission.\n"
1261
 
        "\n"
1262
 
        "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
1263
 
        "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
1264
 
        "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
1265
 
        "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
1266
 
        "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
1267
 
        "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
1268
 
        "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
1269
 
        "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
1270
 
        "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
1271
 
        "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
1272
 
        "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
1273
 
        "\n"
1274
 
        "\n"
1275
 
        "This product includes software developed by Niels Provos.\n"
1276
 
        "\n"
1277
 
        "[ libevent ]\n"
1278
 
        "\n"
1279
 
        "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
1280
 
        "All rights reserved.\n"
1281
 
        "\n"
1282
 
        "Redistribution and use in source and binary forms, with or without\n"
1283
 
        "modification, are permitted provided that the following conditions\n"
1284
 
        "are met:\n"
1285
 
        "1. Redistributions of source code must retain the above copyright\n"
1286
 
        "   notice, this list of conditions and the following disclaimer.\n"
1287
 
        "2. Redistributions in binary form must reproduce the above copyright\n"
1288
 
        "   notice, this list of conditions and the following disclaimer in the\n"
1289
 
        "   documentation and/or other materials provided with the distribution.\n"
1290
 
        "3. All advertising materials mentioning features or use of this software\n"
1291
 
        "   must display the following acknowledgement:\n"
1292
 
        "      This product includes software developed by Niels Provos.\n"
1293
 
        "4. The name of the author may not be used to endorse or promote products\n"
1294
 
        "   derived from this software without specific prior written permission.\n"
1295
 
        "\n"
1296
 
        "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
1297
 
        "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
1298
 
        "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
1299
 
        "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
1300
 
        "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
1301
 
        "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
1302
 
        "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
1303
 
        "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
1304
 
        "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
1305
 
        "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
 
2127
    "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
 
2128
    "All rights reserved.\n"
 
2129
    "\n"
 
2130
    "Redistribution and use in source and binary forms, with or without\n"
 
2131
    "modification, are permitted provided that the following conditions are\n"
 
2132
    "met:\n"
 
2133
    "\n"
 
2134
    "    * Redistributions of source code must retain the above copyright\n"
 
2135
    "notice, this list of conditions and the following disclaimer.\n"
 
2136
    "\n"
 
2137
    "    * Redistributions in binary form must reproduce the above\n"
 
2138
    "copyright notice, this list of conditions and the following disclaimer\n"
 
2139
    "in the documentation and/or other materials provided with the\n"
 
2140
    "distribution.\n"
 
2141
    "\n"
 
2142
    "    * Neither the name of the Danga Interactive nor the names of its\n"
 
2143
    "contributors may be used to endorse or promote products derived from\n"
 
2144
    "this software without specific prior written permission.\n"
 
2145
    "\n"
 
2146
    "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
 
2147
    "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
 
2148
    "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
 
2149
    "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
 
2150
    "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
 
2151
    "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
 
2152
    "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
 
2153
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
 
2154
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
 
2155
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
 
2156
    "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
 
2157
    "\n"
 
2158
    "\n"
 
2159
    "This product includes software developed by Niels Provos.\n"
 
2160
    "\n"
 
2161
    "[ libevent ]\n"
 
2162
    "\n"
 
2163
    "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
 
2164
    "All rights reserved.\n"
 
2165
    "\n"
 
2166
    "Redistribution and use in source and binary forms, with or without\n"
 
2167
    "modification, are permitted provided that the following conditions\n"
 
2168
    "are met:\n"
 
2169
    "1. Redistributions of source code must retain the above copyright\n"
 
2170
    "   notice, this list of conditions and the following disclaimer.\n"
 
2171
    "2. Redistributions in binary form must reproduce the above copyright\n"
 
2172
    "   notice, this list of conditions and the following disclaimer in the\n"
 
2173
    "   documentation and/or other materials provided with the distribution.\n"
 
2174
    "3. All advertising materials mentioning features or use of this software\n"
 
2175
    "   must display the following acknowledgement:\n"
 
2176
    "      This product includes software developed by Niels Provos.\n"
 
2177
    "4. The name of the author may not be used to endorse or promote products\n"
 
2178
    "   derived from this software without specific prior written permission.\n"
 
2179
    "\n"
 
2180
    "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
 
2181
    "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
 
2182
    "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
 
2183
    "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
 
2184
    "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
 
2185
    "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
 
2186
    "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
 
2187
    "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
 
2188
    "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
 
2189
    "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
1306
2190
    );
1307
2191
 
1308
2192
    return;
1336
2220
}
1337
2221
 
1338
2222
int l_socket=0;
 
2223
int u_socket=-1;
 
2224
 
 
2225
void sig_handler(int sig) {
 
2226
    printf("SIGINT handled.\n");
 
2227
    exit(0);
 
2228
}
1339
2229
 
1340
2230
int main (int argc, char **argv) {
1341
2231
    int c;
1342
 
    conn *l_conn;
 
2232
    conn *u_conn;
1343
2233
    struct in_addr addr;
1344
2234
    int lock_memory = 0;
1345
2235
    int daemonize = 0;
1350
2240
    struct rlimit rlim;
1351
2241
    char *pid_file = NULL;
1352
2242
 
 
2243
    /* handle SIGINT */
 
2244
    signal(SIGINT, sig_handler);
 
2245
 
1353
2246
    /* init settings */
1354
2247
    settings_init();
1355
2248
 
 
2249
    /* set stderr non-buffering (for running under, say, daemontools) */
 
2250
    setbuf(stderr, NULL);
 
2251
 
1356
2252
    /* process arguments */
1357
 
    while ((c = getopt(argc, argv, "p:m:Mc:khirvdl:u:P:")) != -1) {
 
2253
    while ((c = getopt(argc, argv, "bp:s:U:m:Mc:khirvdl:u:P:f:s:")) != -1) {
1358
2254
        switch (c) {
 
2255
        case 'U':
 
2256
            settings.udpport = atoi(optarg);
 
2257
            break;
 
2258
        case 'b':
 
2259
            settings.managed = 1;
 
2260
            break;
1359
2261
        case 'p':
1360
2262
            settings.port = atoi(optarg);
1361
2263
            break;
 
2264
        case 's':
 
2265
            settings.socketpath = optarg;
 
2266
            break;
1362
2267
        case 'm':
1363
 
            settings.maxbytes = atoi(optarg)*1024*1024;
 
2268
            settings.maxbytes = ((size_t)atoi(optarg))*1024*1024;
1364
2269
            break;
1365
2270
        case 'M':
1366
2271
            settings.evict_to_free = 0;
1381
2286
            settings.verbose++;
1382
2287
            break;
1383
2288
        case 'l':
1384
 
            if (!inet_aton(optarg, &addr)) {
 
2289
            if (!inet_pton(AF_INET, optarg, &addr)) {
1385
2290
                fprintf(stderr, "Illegal address: %s\n", optarg);
1386
2291
                return 1;
1387
2292
            } else {
1400
2305
        case 'P':
1401
2306
            pid_file = optarg;
1402
2307
            break;
 
2308
        case 'f':
 
2309
            settings.factor = atof(optarg);
 
2310
            if (settings.factor <= 1.0) {
 
2311
                fprintf(stderr, "Factor must be greater than 1\n");
 
2312
                return 1;
 
2313
            }
 
2314
            break;
 
2315
        case 'n':
 
2316
            settings.chunk_size = atoi(optarg);
 
2317
            if (settings.chunk_size == 0) {
 
2318
                fprintf(stderr, "Chunk size must be greater than 0\n");
 
2319
                return 1;
 
2320
            }
 
2321
            break;
1403
2322
        default:
1404
2323
            fprintf(stderr, "Illegal argument \"%c\"\n", c);
1405
2324
            return 1;
1408
2327
 
1409
2328
    if (maxcore) {
1410
2329
        struct rlimit rlim_new;
1411
 
        /* 
 
2330
        /*
1412
2331
         * First try raising to infinity; if that fails, try bringing
1413
 
         * the soft limit to the hard. 
 
2332
         * the soft limit to the hard.
1414
2333
         */
1415
2334
        if (getrlimit(RLIMIT_CORE, &rlim)==0) {
1416
2335
            rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
1417
2336
            if (setrlimit(RLIMIT_CORE, &rlim_new)!=0) {
1418
2337
                /* failed. try raising just to the old max */
1419
 
                rlim_new.rlim_cur = rlim_new.rlim_max = 
 
2338
                rlim_new.rlim_cur = rlim_new.rlim_max =
1420
2339
                    rlim.rlim_max;
1421
2340
                (void) setrlimit(RLIMIT_CORE, &rlim_new);
1422
2341
            }
1423
2342
        }
1424
 
        /* 
1425
 
         * getrlimit again to see what we ended up with. Only fail if 
1426
 
         * the soft limit ends up 0, because then no core files will be 
 
2343
        /*
 
2344
         * getrlimit again to see what we ended up with. Only fail if
 
2345
         * the soft limit ends up 0, because then no core files will be
1427
2346
         * created at all.
1428
2347
         */
1429
 
           
 
2348
 
1430
2349
        if ((getrlimit(RLIMIT_CORE, &rlim)!=0) || rlim.rlim_cur==0) {
1431
2350
            fprintf(stderr, "failed to ensure corefile creation\n");
1432
2351
            exit(1);
1433
2352
        }
1434
2353
    }
1435
 
                        
1436
 
    /* 
 
2354
 
 
2355
    /*
1437
2356
     * If needed, increase rlimits to allow as many connections
1438
2357
     * as needed.
1439
2358
     */
1440
 
    
 
2359
 
1441
2360
    if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
1442
2361
        fprintf(stderr, "failed to getrlimit number of files\n");
1443
2362
        exit(1);
1444
2363
    } else {
1445
2364
        int maxfiles = settings.maxconns;
1446
 
        if (rlim.rlim_cur < maxfiles) 
 
2365
        if (rlim.rlim_cur < maxfiles)
1447
2366
            rlim.rlim_cur = maxfiles + 3;
1448
2367
        if (rlim.rlim_max < rlim.rlim_cur)
1449
2368
            rlim.rlim_max = rlim.rlim_cur;
1453
2372
        }
1454
2373
    }
1455
2374
 
1456
 
    /* 
1457
 
     * initialization order: first create the listening socket
 
2375
    /*
 
2376
     * initialization order: first create the listening sockets
1458
2377
     * (may need root on low ports), then drop root if needed,
1459
2378
     * then daemonise if needed, then init libevent (in some cases
1460
2379
     * descriptors created by libevent wouldn't survive forking).
1461
2380
     */
1462
2381
 
1463
2382
    /* create the listening socket and bind it */
1464
 
    l_socket = server_socket(settings.port);
1465
 
    if (l_socket == -1) {
1466
 
        fprintf(stderr, "failed to listen\n");
1467
 
        exit(1);
 
2383
    if (!settings.socketpath) {
 
2384
        l_socket = server_socket(settings.port, 0);
 
2385
        if (l_socket == -1) {
 
2386
            fprintf(stderr, "failed to listen\n");
 
2387
            exit(1);
 
2388
        }
 
2389
    }
 
2390
 
 
2391
    if (settings.udpport > 0 && ! settings.socketpath) {
 
2392
        /* create the UDP listening socket and bind it */
 
2393
        u_socket = server_socket(settings.udpport, 1);
 
2394
        if (u_socket == -1) {
 
2395
            fprintf(stderr, "failed to listen on UDP port %d\n", settings.udpport);
 
2396
            exit(1);
 
2397
        }
1468
2398
    }
1469
2399
 
1470
2400
    /* lose root privileges if we have them */
1483
2413
        }
1484
2414
    }
1485
2415
 
 
2416
    /* create unix mode sockets after dropping privileges */
 
2417
    if (settings.socketpath) {
 
2418
        l_socket = server_socket_unix(settings.socketpath);
 
2419
        if (l_socket == -1) {
 
2420
            fprintf(stderr, "failed to listen\n");
 
2421
            exit(1);
 
2422
        }
 
2423
    }
 
2424
 
1486
2425
    /* daemonize if requested */
1487
2426
    /* if we want to ensure our ability to dump core, don't chdir to / */
1488
2427
    if (daemonize) {
1501
2440
    stats_init();
1502
2441
    assoc_init();
1503
2442
    conn_init();
1504
 
    slabs_init(settings.maxbytes);
 
2443
    slabs_init(settings.maxbytes, settings.factor);
 
2444
 
 
2445
    /* managed instance? alloc and zero a bucket array */
 
2446
    if (settings.managed) {
 
2447
        buckets = malloc(sizeof(int)*MAX_BUCKETS);
 
2448
        if (buckets == 0) {
 
2449
            fprintf(stderr, "failed to allocate the bucket array");
 
2450
            exit(1);
 
2451
        }
 
2452
        memset(buckets, 0, sizeof(int)*MAX_BUCKETS);
 
2453
    }
1505
2454
 
1506
2455
    /* lock paged memory if needed */
1507
2456
    if (lock_memory) {
1521
2470
    if (sigemptyset(&sa.sa_mask) == -1 ||
1522
2471
        sigaction(SIGPIPE, &sa, 0) == -1) {
1523
2472
        perror("failed to ignore SIGPIPE; sigaction");
1524
 
        exit(1); 
 
2473
        exit(1);
1525
2474
    }
1526
 
 
1527
2475
    /* create the initial listening connection */
1528
 
    if (!(l_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST))) {
 
2476
    if (!(listen_conn = conn_new(l_socket, conn_listening, EV_READ | EV_PERSIST, 1, 0))) {
1529
2477
        fprintf(stderr, "failed to create listening connection");
1530
2478
        exit(1);
1531
2479
    }
1532
 
 
 
2480
    /* create the initial listening udp connection */
 
2481
    if (u_socket > -1 &&
 
2482
        !(u_conn = conn_new(u_socket, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, 1))) {
 
2483
        fprintf(stderr, "failed to create udp connection");
 
2484
        exit(1);
 
2485
    }
 
2486
    /* initialise clock event */
 
2487
    clock_handler(0,0,0);
1533
2488
    /* initialise deletion array and timer event */
1534
2489
    deltotal = 200; delcurr = 0;
1535
2490
    todelete = malloc(sizeof(item *)*deltotal);
1536
2491
    delete_handler(0,0,0); /* sets up the event */
1537
 
 
1538
2492
    /* save the PID in if we're a daemon */
1539
2493
    if (daemonize)
1540
2494
        save_pid(getpid(),pid_file);
1541
 
 
1542
2495
    /* enter the loop */
1543
2496
    event_loop(0);
1544
 
 
1545
2497
    /* remove the PID file if we're a daemon */
1546
2498
    if (daemonize)
1547
2499
        remove_pidfile(pid_file);
1548
 
 
1549
2500
    return 0;
1550
2501
}
1551