389
* Reallocates memory and updates a buffer size if successful.
391
int do_realloc(void **orig, int newsize, int bytes_per_item, int *size) {
392
void *newbuf = realloc(*orig, newsize * bytes_per_item);
402
* Shrinks a connection's buffers if they're too big. This prevents
403
* periodic large "get" requests from permanently chewing lots of server
406
* This should only be called in between requests since it can wipe output
409
void conn_shrink(conn *c) {
413
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
414
if (c->rcurr != c->rbuf)
415
memmove(c->rbuf, c->rcurr, c->rbytes);
416
do_realloc((void **)&c->rbuf, DATA_BUFFER_SIZE, 1, &c->rsize);
420
if (c->isize > ITEM_LIST_HIGHWAT) {
421
do_realloc((void **)&c->ilist, ITEM_LIST_INITIAL, sizeof(c->ilist[0]), &c->isize);
424
if (c->msgsize > MSG_LIST_HIGHWAT) {
425
do_realloc((void **)&c->msglist, MSG_LIST_INITIAL, sizeof(c->msglist[0]), &c->msgsize);
428
if (c->iovsize > IOV_LIST_HIGHWAT) {
429
do_realloc((void **)&c->iov, IOV_LIST_INITIAL, sizeof(c->iov[0]), &c->iovsize);
434
* Sets a connection's current state in the state machine. Any special
435
* processing that needs to happen on certain state transitions can
438
void conn_set_state(conn *c, int state) {
439
if (state != c->state) {
440
if (state == conn_read) {
442
assoc_move_next_bucket();
450
* Ensures that there is room for another struct iovec in a connection's
453
* Returns 0 on success, -1 on out-of-memory.
455
int ensure_iov_space(conn *c) {
456
if (c->iovused >= c->iovsize) {
458
struct iovec *new_iov = (struct iovec *) realloc(c->iov,
459
(c->iovsize * 2) * sizeof(struct iovec));
465
/* Point all the msghdr structures at the new list. */
466
for (i = 0, iovnum = 0; i < c->msgused; i++) {
467
c->msglist[i].msg_iov = &c->iov[iovnum];
468
iovnum += c->msglist[i].msg_iovlen;
477
* Adds data to the list of pending data that will be written out to a
480
* Returns 0 on success, -1 on out-of-memory.
483
int add_iov(conn *c, const void *buf, int len) {
489
m = &c->msglist[c->msgused - 1];
492
* Limit UDP packets, and the first payloads of TCP replies, to
493
* UDP_MAX_PAYLOAD_SIZE bytes.
495
limit_to_mtu = c->udp || (1 == c->msgused);
497
/* We may need to start a new msghdr if this one is full. */
498
if (m->msg_iovlen == IOV_MAX ||
499
(limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
501
m = &c->msglist[c->msgused - 1];
504
if (ensure_iov_space(c))
507
/* If the fragment is too big to fit in the datagram, split it up */
508
if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
509
leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
515
m = &c->msglist[c->msgused - 1];
516
m->msg_iov[m->msg_iovlen].iov_base = (void*) buf;
517
m->msg_iov[m->msg_iovlen].iov_len = len;
523
buf = ((char *)buf) + len;
525
} while (leftover > 0);
532
* Constructs a set of UDP headers and attaches them to the outgoing messages.
534
int build_udp_headers(conn *c) {
538
if (c->msgused > c->hdrsize) {
541
new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
543
new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
546
c->hdrbuf = (unsigned char *) new_hdrbuf;
547
c->hdrsize = c->msgused * 2;
551
for (i = 0; i < c->msgused; i++) {
552
c->msglist[i].msg_iov[0].iov_base = hdr;
553
c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
554
*hdr++ = c->request_id / 256;
555
*hdr++ = c->request_id % 256;
558
*hdr++ = c->msgused / 256;
559
*hdr++ = c->msgused % 256;
562
assert((void*) hdr == (void*) c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
239
569
void out_string(conn *c, char *str) {
268
598
item *it = c->item;
269
599
int comm = c->item_comm;
271
time_t now = time(0);
601
int delete_locked = 0;
602
char *key = ITEM_key(it);
273
604
stats.set_cmds++;
276
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
277
out_string(c, "CLIENT_ERROR bad data chunk");
281
old_it = assoc_find(ITEM_key(it));
283
if (old_it && settings.oldest_live &&
284
old_it->time <= settings.oldest_live) {
289
if (old_it && old_it->exptime && old_it->exptime < now) {
294
if (old_it && comm==NREAD_ADD) {
296
out_string(c, "NOT_STORED");
300
if (!old_it && comm == NREAD_REPLACE) {
301
out_string(c, "NOT_STORED");
305
if (old_it && (old_it->it_flags & ITEM_DELETED) && (comm == NREAD_REPLACE || comm == NREAD_ADD)) {
306
out_string(c, "NOT_STORED");
311
item_replace(old_it, it);
312
} else item_link(it);
315
out_string(c, "STORED");
606
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
607
out_string(c, "CLIENT_ERROR bad data chunk");
611
old_it = get_item_notedeleted(key, it->nkey, &delete_locked);
613
if (old_it && comm == NREAD_ADD) {
614
item_update(old_it); /* touches item, promotes to head of LRU */
615
out_string(c, "NOT_STORED");
619
if (!old_it && comm == NREAD_REPLACE) {
620
out_string(c, "NOT_STORED");
625
if (comm == NREAD_REPLACE || comm == NREAD_ADD) {
626
out_string(c, "NOT_STORED");
630
/* but "set" commands can override the delete lock
631
window... in which case we have to find the old hidden item
632
that's in the namespace/LRU but wasn't returned by
633
get_item.... because we need to replace it (below) */
634
old_it = assoc_find(key, it->nkey);
638
item_replace(old_it, it);
643
out_string(c, "STORED");
652
typedef struct token_s {
657
#define COMMAND_TOKEN 0
658
#define SUBCOMMAND_TOKEN 1
660
#define KEY_MAX_LENGTH 250
665
* Tokenize the command string by replacing whitespace with '\0' and update
666
* the token array tokens with pointer to start of each token and length.
667
* Returns total number of tokens. The last valid token is the terminal
668
* token (value points to the first unprocessed character of the string and
673
* while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
674
* for(int ix = 0; tokens[ix].length != 0; ix++) {
677
* ncommand = tokens[ix].value - command;
678
* command = tokens[ix].value;
681
size_t tokenize_command(char* command, token_t* tokens, size_t max_tokens) {
687
assert(command != NULL && tokens != NULL && max_tokens > 1);
690
while(*cp != '\0' && ntokens < max_tokens - 1) {
692
// If we've accumulated a token, this is the end of it.
694
tokens[ntokens].value = value;
695
tokens[ntokens].length = length;
710
if(ntokens < max_tokens - 1 && length > 0) {
711
tokens[ntokens].value = value;
712
tokens[ntokens].length = length;
717
* If we scanned the whole string, the terminal value pointer is null,
718
* otherwise it is the first unprocessed character.
720
tokens[ntokens].value = *cp == '\0' ? NULL : cp;
721
tokens[ntokens].length = 0;
727
void process_stat(conn *c, token_t* tokens, size_t ntokens) {
728
rel_time_t now = current_time;
733
out_string(c, "CLIENT_ERROR bad command line");
324
void process_stat(conn *c, char *command) {
325
time_t now = time(0);
327
if (strcmp(command, "stats") == 0) {
737
command = tokens[COMMAND_TOKEN].value;
739
if (ntokens == 2 && strcmp(command, "stats") == 0) {
329
741
pid_t pid = getpid();
330
742
char *pos = temp;
331
743
struct rusage usage;
333
745
getrusage(RUSAGE_SELF, &usage);
335
747
pos += sprintf(pos, "STAT pid %u\r\n", pid);
336
pos += sprintf(pos, "STAT uptime %lu\r\n", now - stats.started);
337
pos += sprintf(pos, "STAT time %ld\r\n", now);
748
pos += sprintf(pos, "STAT uptime %u\r\n", now);
749
pos += sprintf(pos, "STAT time %ld\r\n", now + stats.started);
338
750
pos += sprintf(pos, "STAT version " VERSION "\r\n");
751
pos += sprintf(pos, "STAT pointer_size %d\r\n", 8 * sizeof(void*));
339
752
pos += sprintf(pos, "STAT rusage_user %ld.%06ld\r\n", usage.ru_utime.tv_sec, usage.ru_utime.tv_usec);
340
753
pos += sprintf(pos, "STAT rusage_system %ld.%06ld\r\n", usage.ru_stime.tv_sec, usage.ru_stime.tv_usec);
341
754
pos += sprintf(pos, "STAT curr_items %u\r\n", stats.curr_items);
490
914
out_string(c, "ERROR");
493
void process_command(conn *c, char *command) {
499
* for commands set/add/replace, we build an item and read the data
500
* directly into it, then continue in nread_complete().
503
if (settings.verbose > 1)
504
fprintf(stderr, "<%d %s\n", c->sfd, command);
506
/* All incoming commands will require a response, so we cork at the beginning,
507
and uncork at the very end (usually by means of out_string) */
510
if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
511
(strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
512
(strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
520
res = sscanf(command, "%*s %250s %u %ld %d\n", key, &flags, &expire, &len);
521
if (res!=4 || strlen(key)==0 ) {
522
out_string(c, "CLIENT_ERROR bad command line format");
525
expire = realtime(expire);
526
it = item_alloc(key, flags, expire, len+2);
528
out_string(c, "SERVER_ERROR out of memory");
529
/* swallow the data line */
530
c->write_and_go = conn_swallow;
537
c->rcurr = ITEM_data(it);
538
c->rlbytes = it->nbytes;
539
c->state = conn_nread;
917
inline void process_get_command(conn *c, token_t* tokens, size_t ntokens) {
922
token_t* key_token = &tokens[KEY_TOKEN];
924
if (settings.managed) {
925
int bucket = c->bucket;
927
out_string(c, "CLIENT_ERROR no BG data in managed mode");
931
if (buckets[bucket] != c->gen) {
932
out_string(c, "ERROR_NOT_OWNER");
543
if ((strncmp(command, "incr ", 5) == 0 && (incr = 1)) ||
544
(strncmp(command, "decr ", 5) == 0)) {
552
time_t now = time(0);
554
res = sscanf(command, "%*s %250s %u\n", key, &delta);
555
if (res!=2 || strlen(key)==0 ) {
556
out_string(c, "CLIENT_ERROR bad command line format");
560
it = assoc_find(key);
561
if (it && (it->it_flags & ITEM_DELETED)) {
564
if (it && it->exptime && it->exptime < now) {
570
out_string(c, "NOT_FOUND");
575
while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++;
582
if (delta >= value) value = 0;
586
sprintf(temp, "%u", value);
588
if (res + 2 > it->nbytes) { /* need to realloc */
590
new_it = item_alloc(ITEM_key(it), it->flags, it->exptime, res + 2 );
592
out_string(c, "SERVER_ERROR out of memory");
938
while(key_token->length != 0) {
940
key = key_token->value;
941
nkey = key_token->length;
943
if(nkey > KEY_MAX_LENGTH) {
944
out_string(c, "CLIENT_ERROR bad command line format");
595
memcpy(ITEM_data(new_it), temp, res);
596
memcpy(ITEM_data(new_it) + res, "\r\n", 2);
597
item_replace(it, new_it);
598
} else { /* replace in-place */
599
memcpy(ITEM_data(it), temp, res);
600
memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
606
if (strncmp(command, "get ", 4) == 0) {
608
char *start = command + 4;
613
time_t now = time(0);
615
while(sscanf(start, " %250s%n", key, &next) >= 1) {
617
948
stats.get_cmds++;
618
it = assoc_find(key);
619
if (it && (it->it_flags & ITEM_DELETED)) {
622
if (settings.oldest_live && it &&
623
it->time <= settings.oldest_live) {
627
if (it && it->exptime && it->exptime < now) {
949
it = get_item(key, nkey);
633
951
if (i >= c->isize) {
634
952
item **new_list = realloc(c->ilist, sizeof(item *)*c->isize*2);
637
955
c->ilist = new_list;
960
* Construct the response. Each hit adds three elements to the
961
* outgoing data list:
964
* " " + flags + " " + data length + "\r\n" + data (with \r\n)
966
if (add_iov(c, "VALUE ", 6) ||
967
add_iov(c, ITEM_key(it), it->nkey) ||
968
add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes))
972
if (settings.verbose > 1)
973
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
640
975
stats.get_hits++;
643
978
*(c->ilist + i) = it;
645
981
} else stats.get_misses++;
651
c->state = conn_mwrite;
655
out_string(c, "END");
660
if (strncmp(command, "delete ", 7) == 0) {
987
* If the command string hasn't been fully processed, get the next set
990
if(key_token->value != NULL) {
991
ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
995
} while(key_token->value != NULL);
1000
if (settings.verbose > 1)
1001
fprintf(stderr, ">%d END\n", c->sfd);
1002
add_iov(c, "END\r\n", 5);
1004
if (c->udp && build_udp_headers(c)) {
1005
out_string(c, "SERVER_ERROR out of memory");
1008
conn_set_state(c, conn_mwrite);
1014
void process_update_command(conn *c, token_t* tokens, size_t ntokens, int comm) {
1022
if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1023
out_string(c, "CLIENT_ERROR bad command line format");
1027
key = tokens[KEY_TOKEN].value;
1028
nkey = tokens[KEY_TOKEN].length;
1030
flags = strtoul(tokens[2].value, NULL, 10);
1031
exptime = strtol(tokens[3].value, NULL, 10);
1032
vlen = strtol(tokens[4].value, NULL, 10);
1034
if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
1035
out_string(c, "CLIENT_ERROR bad command line format");
1039
if (settings.managed) {
1040
int bucket = c->bucket;
1042
out_string(c, "CLIENT_ERROR no BG data in managed mode");
1046
if (buckets[bucket] != c->gen) {
1047
out_string(c, "ERROR_NOT_OWNER");
1052
it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
1055
if (! item_size_ok(key, nkey, flags, vlen + 2))
1056
out_string(c, "SERVER_ERROR object too large for cache");
1058
out_string(c, "SERVER_ERROR out of memory");
1059
/* swallow the data line */
1060
c->write_and_go = conn_swallow;
1065
c->item_comm = comm;
1067
c->ritem = ITEM_data(it);
1068
c->rlbytes = it->nbytes;
1069
conn_set_state(c, conn_nread);
1073
void process_arithmetic_command(conn *c, token_t* tokens, size_t ntokens, int incr) {
1083
if(tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
1084
out_string(c, "CLIENT_ERROR bad command line format");
1088
key = tokens[KEY_TOKEN].value;
1089
nkey = tokens[KEY_TOKEN].length;
1091
if (settings.managed) {
1092
int bucket = c->bucket;
1094
out_string(c, "CLIENT_ERROR no BG data in managed mode");
1098
if (buckets[bucket] != c->gen) {
1099
out_string(c, "ERROR_NOT_OWNER");
1104
it = get_item(key, nkey);
1106
out_string(c, "NOT_FOUND");
1110
delta = strtoul(tokens[2].value, NULL, 10);
1112
if(errno == ERANGE) {
1113
out_string(c, "CLIENT_ERROR bad command line format");
1117
ptr = ITEM_data(it);
1118
while (*ptr && (*ptr<'0' && *ptr>'9')) ptr++; // BUG: can't be true
1120
value = strtol(ptr, NULL, 10);
1122
if(errno == ERANGE) {
1123
out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
1130
if (delta >= value) value = 0;
1133
sprintf(temp, "%u", value);
1135
if (res + 2 > it->nbytes) { /* need to realloc */
1137
new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
1139
out_string(c, "SERVER_ERROR out of memory");
1142
memcpy(ITEM_data(new_it), temp, res);
1143
memcpy(ITEM_data(new_it) + res, "\r\n", 2);
1144
item_replace(it, new_it);
1145
} else { /* replace in-place */
1146
memcpy(ITEM_data(it), temp, res);
1147
memset(ITEM_data(it) + res, ' ', it->nbytes-res-2);
1149
out_string(c, temp);
1153
void process_delete_command(conn *c, token_t* tokens, size_t ntokens) {
1159
if (settings.managed) {
1160
int bucket = c->bucket;
1162
out_string(c, "CLIENT_ERROR no BG data in managed mode");
1166
if (buckets[bucket] != c->gen) {
1167
out_string(c, "ERROR_NOT_OWNER");
1172
key = tokens[KEY_TOKEN].value;
1173
nkey = tokens[KEY_TOKEN].length;
1175
if(nkey > KEY_MAX_LENGTH) {
1176
out_string(c, "CLIENT_ERROR bad command line format");
1181
exptime = strtol(tokens[2].value, NULL, 10);
1183
if(errno == ERANGE) {
1184
out_string(c, "CLIENT_ERROR bad command line format");
1189
it = get_item(key, nkey);
1191
out_string(c, "NOT_FOUND");
1197
out_string(c, "DELETED");
1200
if (delcurr >= deltotal) {
1201
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
1203
todelete = new_delete;
1207
* can't delete it immediately, user wants a delay,
1208
* but we ran out of memory for the delete queue
1210
out_string(c, "SERVER_ERROR out of memory");
1216
/* use its expiration time as its deletion time now */
1217
it->exptime = realtime(exptime);
1218
it->it_flags |= ITEM_DELETED;
1219
todelete[delcurr++] = it;
1220
out_string(c, "DELETED");
1224
void process_command(conn *c, char *command) {
1226
token_t tokens[MAX_TOKENS];
1230
if (settings.verbose > 1)
1231
fprintf(stderr, "<%d %s\n", c->sfd, command);
1234
* for commands set/add/replace, we build an item and read the data
1235
* directly into it, then continue in nread_complete().
1241
if (add_msghdr(c)) {
1242
out_string(c, "SERVER_ERROR out of memory");
1246
ntokens = tokenize_command(command, tokens, MAX_TOKENS);
1249
((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
1250
(strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
1252
process_get_command(c, tokens, ntokens);
1254
} else if (ntokens == 6 &&
1255
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
1256
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
1257
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)))) {
1259
process_update_command(c, tokens, ntokens, comm);
1261
} else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
1263
process_arithmetic_command(c, tokens, ntokens, 1);
1265
} else if (ntokens == 4 && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
1267
process_arithmetic_command(c, tokens, ntokens, 0);
1269
} else if (ntokens >= 3 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
1271
process_delete_command(c, tokens, ntokens);
1273
} else if (ntokens == 3 && strcmp(tokens[COMMAND_TOKEN].value, "own") == 0) {
1274
unsigned int bucket, gen;
1275
if (!settings.managed) {
1276
out_string(c, "CLIENT_ERROR not a managed instance");
1280
if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1281
if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1282
out_string(c, "CLIENT_ERROR bucket number out of range");
1285
buckets[bucket] = gen;
1286
out_string(c, "OWNED");
1289
out_string(c, "CLIENT_ERROR bad format");
1293
} else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "disown")) == 0) {
1296
if (!settings.managed) {
1297
out_string(c, "CLIENT_ERROR not a managed instance");
1300
if (sscanf(tokens[1].value, "%u", &bucket) == 1) {
1301
if ((bucket < 0) || (bucket >= MAX_BUCKETS)) {
1302
out_string(c, "CLIENT_ERROR bucket number out of range");
1305
buckets[bucket] = 0;
1306
out_string(c, "DISOWNED");
1309
out_string(c, "CLIENT_ERROR bad format");
1313
} else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value, "bg")) == 0) {
1315
if (!settings.managed) {
1316
out_string(c, "CLIENT_ERROR not a managed instance");
1319
if (sscanf(tokens[1].value, "%u:%u", &bucket,&gen) == 2) {
1320
/* we never write anything back, even if input's wrong */
1321
if ((bucket < 0) || (bucket >= MAX_BUCKETS) || (gen<=0)) {
1322
/* do nothing, bad input */
1327
conn_set_state(c, conn_read);
1330
out_string(c, "CLIENT_ERROR bad format");
1334
} else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
1336
process_stat(c, tokens, ntokens);
1338
} else if (ntokens >= 2 && ntokens <= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
664
1339
time_t exptime = 0;
666
res = sscanf(command, "%*s %250s %ld", key, &exptime);
667
it = assoc_find(key);
669
out_string(c, "NOT_FOUND");
675
out_string(c, "DELETED");
679
if (delcurr >= deltotal) {
680
item **new_delete = realloc(todelete, sizeof(item *) * deltotal * 2);
682
todelete = new_delete;
686
* can't delete it immediately, user wants a delay,
687
* but we ran out of memory for the delete queue
689
out_string(c, "SERVER_ERROR out of memory");
694
exptime = realtime(exptime);
697
/* use its expiration time as its deletion time now */
698
it->exptime = exptime;
699
it->it_flags |= ITEM_DELETED;
700
todelete[delcurr++] = it;
701
out_string(c, "DELETED");
705
if (strncmp(command, "stats", 5) == 0) {
706
process_stat(c, command);
710
if (strcmp(command, "flush_all") == 0) {
711
settings.oldest_live = time(0);
1343
settings.oldest_live = current_time - 1;
1344
item_flush_expired();
1345
out_string(c, "OK");
1349
exptime = strtol(tokens[1].value, NULL, 10);
1350
if(errno == ERANGE) {
1351
out_string(c, "CLIENT_ERROR bad command line format");
1355
settings.oldest_live = realtime(exptime) - 1;
1356
item_flush_expired();
712
1357
out_string(c, "OK");
1360
} else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
716
if (strcmp(command, "version") == 0) {
717
1362
out_string(c, "VERSION " VERSION);
721
if (strcmp(command, "quit") == 0) {
722
c->state = conn_closing;
726
if (strncmp(command, "slabs reassign ", 15) == 0) {
728
char *start = command+15;
729
if (sscanf(start, "%u %u\r\n", &src, &dst) == 2) {
730
int rv = slabs_reassign(src, dst);
732
out_string(c, "DONE");
736
out_string(c, "CANT");
740
out_string(c, "BUSY");
744
out_string(c, "CLIENT_ERROR bogus command");
748
out_string(c, "ERROR");
1364
} else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
1366
conn_set_state(c, conn_closing);
1368
} else if (ntokens == 5 && (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0 &&
1369
strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0)) {
1370
#ifdef ALLOW_SLABS_REASSIGN
1374
src = strtol(tokens[2].value, NULL, 10);
1375
dst = strtol(tokens[3].value, NULL, 10);
1377
if(errno == ERANGE) {
1378
out_string(c, "CLIENT_ERROR bad command line format");
1382
rv = slabs_reassign(src, dst);
1384
out_string(c, "DONE");
1388
out_string(c, "CANT");
1392
out_string(c, "BUSY");
1396
out_string(c, "CLIENT_ERROR Slab reassignment not supported");
1400
out_string(c, "ERROR");
753
* if we have a complete line in the buffer, process it and move whatever
754
* remains in the buffer to its beginning.
1406
* if we have a complete line in the buffer, process it.
756
1408
int try_read_command(conn *c) {
757
1409
char *el, *cont;
1411
assert(c->rcurr <= c->rbuf + c->rsize);
761
el = memchr(c->rbuf, '\n', c->rbytes);
1415
el = memchr(c->rcurr, '\n', c->rbytes);
765
if (el - c->rbuf > 1 && *(el - 1) == '\r') {
1419
if (el - c->rcurr > 1 && *(el - 1) == '\r') {
770
process_command(c, c->rbuf);
772
if (cont - c->rbuf < c->rbytes) { /* more stuff in the buffer */
773
memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
1424
assert(cont <= c->rcurr + c->rbytes);
1426
process_command(c, c->rcurr);
1428
c->rbytes -= (cont - c->rcurr);
1431
assert(c->rcurr <= c->rbuf + c->rsize);
1437
* read a UDP request.
1438
* return 0 if there's nothing to read.
1440
int try_read_udp(conn *c) {
1443
c->request_addr_size = sizeof(c->request_addr);
1444
res = recvfrom(c->sfd, c->rbuf, c->rsize,
1445
0, &c->request_addr, &c->request_addr_size);
1447
unsigned char *buf = (unsigned char *)c->rbuf;
1448
stats.bytes_read += res;
1450
/* Beginning of UDP packet is the request ID; save it. */
1451
c->request_id = buf[0] * 256 + buf[1];
1453
/* If this is a multi-packet request, drop it. */
1454
if (buf[4] != 0 || buf[5] != 1) {
1455
out_string(c, "SERVER_ERROR multi-packet request not supported");
1459
/* Don't care about any of the rest of the header. */
1461
memmove(c->rbuf, c->rbuf + 8, res);
775
c->rbytes -= (cont - c->rbuf);
780
1471
* read from network as much as we can, handle buffer overflow and connection
1473
* before reading, move the remaining incomplete fragment of a command
1474
* (if any) to the beginning of the buffer.
782
1475
* return 0 if there's nothing to read on the first read.
784
1477
int try_read_network(conn *c) {
785
1478
int gotdata = 0;
1481
if (c->rcurr != c->rbuf) {
1482
if (c->rbytes != 0) /* otherwise there's nothing to copy */
1483
memmove(c->rbuf, c->rcurr, c->rbytes);
788
1488
if (c->rbytes >= c->rsize) {
789
1489
char *new_rbuf = realloc(c->rbuf, c->rsize*2);
958
1751
if (res == 0) { /* end of stream */
959
c->state = conn_closing;
1752
conn_set_state(c, conn_closing);
962
1755
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
963
1756
if (!update_event(c, EV_READ | EV_PERSIST)) {
964
1757
if (settings.verbose > 0)
965
1758
fprintf(stderr, "Couldn't update event\n");
966
c->state = conn_closing;
1759
conn_set_state(c, conn_closing);
972
1765
/* otherwise we have a real error, on which we close the connection */
973
1766
if (settings.verbose > 0)
974
1767
fprintf(stderr, "Failed to read, and not due to blocking\n");
975
c->state = conn_closing;
1768
conn_set_state(c, conn_closing);
978
1771
case conn_write:
979
/* we are writing wbytes bytes starting from wcurr */
980
if (c->wbytes == 0) {
981
if (c->write_and_free) {
982
free(c->write_and_free);
983
c->write_and_free = 0;
985
c->state = c->write_and_go;
986
if (c->state == conn_read)
990
res = write(c->sfd, c->wcurr, c->wbytes);
992
stats.bytes_written += res;
997
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
998
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1773
* We want to write out a simple response. If we haven't already,
1774
* assemble it into a msgbuf list (this will be a single-entry
1775
* list for TCP or a two-entry list for UDP).
1777
if (c->iovused == 0 || (c->udp && c->iovused == 1)) {
1778
if (add_iov(c, c->wcurr, c->wbytes) ||
1779
(c->udp && build_udp_headers(c))) {
999
1780
if (settings.verbose > 0)
1000
fprintf(stderr, "Couldn't update event\n");
1001
c->state = conn_closing;
1781
fprintf(stderr, "Couldn't build response\n");
1782
conn_set_state(c, conn_closing);
1007
/* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1008
we have a real error, on which we close the connection */
1009
if (settings.verbose > 0)
1010
fprintf(stderr, "Failed to write, and not due to blocking\n");
1011
c->state = conn_closing;
1787
/* fall through... */
1013
1789
case conn_mwrite:
1015
* we're writing ibytes bytes from iptr. iptr alternates between
1016
* ibuf, where we build a string "VALUE...", and ITEM_data(it) for the
1017
* current item. When we finish a chunk, we choose the next one using
1018
* ipart, which has the following semantics: 0 - start the loop, 1 -
1019
* we finished ibuf, go to current ITEM_data(it); 2 - we finished ITEM_data(it),
1020
* move to the next item and build its ibuf; 3 - we finished all items,
1023
if (c->ibytes > 0) {
1024
res = write(c->sfd, c->iptr, c->ibytes);
1026
stats.bytes_written += res;
1031
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
1032
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
1033
if (settings.verbose > 0)
1034
fprintf(stderr, "Couldn't update event\n");
1035
c->state = conn_closing;
1041
/* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
1042
we have a real error, on which we close the connection */
1043
if (settings.verbose > 0)
1044
fprintf(stderr, "Failed to write, and not due to blocking\n");
1045
c->state = conn_closing;
1049
/* we finished a chunk, decide what to do next */
1053
assert((it->it_flags & ITEM_SLABBED) == 0);
1054
c->iptr = ITEM_data(it);
1055
c->ibytes = it->nbytes;
1062
if (c->ileft <= 0) {
1790
switch (transmit(c)) {
1791
case TRANSMIT_COMPLETE:
1792
if (c->state == conn_mwrite) {
1793
while (c->ileft > 0) {
1794
item *it = *(c->icurr);
1795
assert((it->it_flags & ITEM_SLABBED) == 0);
1071
assert((it->it_flags & ITEM_SLABBED) == 0);
1072
c->ibytes = sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
1073
if (settings.verbose > 1)
1074
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
1079
out_string(c, "END");
1800
conn_set_state(c, conn_read);
1801
} else if (c->state == conn_write) {
1802
if (c->write_and_free) {
1803
free(c->write_and_free);
1804
c->write_and_free = 0;
1806
conn_set_state(c, c->write_and_go);
1808
if (settings.verbose > 0)
1809
fprintf(stderr, "Unexpected state %d\n", c->state);
1810
conn_set_state(c, conn_closing);
1814
case TRANSMIT_INCOMPLETE:
1815
case TRANSMIT_HARD_ERROR:
1816
break; /* Continue in state machine. */
1818
case TRANSMIT_SOFT_ERROR:
1085
1824
case conn_closing:
1233
2114
printf("-vv very verbose (also print client commands/reponses)\n");
1234
2115
printf("-h print this help and exit\n");
1235
2116
printf("-i print memcached and libevent license\n");
2117
printf("-b run a managed instanced (mnemonic: buckets)\n");
1236
2118
printf("-P <file> save PID in <file>, only used with -d option\n");
2119
printf("-f <factor> chunk size growth factor, default 1.25\n");
2120
printf("-n <bytes> minimum space allocated for key+value+flags, default 48\n");
1240
2124
void usage_license(void) {
1241
2125
printf(PACKAGE " " VERSION "\n\n");
1243
"Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
1244
"All rights reserved.\n"
1246
"Redistribution and use in source and binary forms, with or without\n"
1247
"modification, are permitted provided that the following conditions are\n"
1250
" * Redistributions of source code must retain the above copyright\n"
1251
"notice, this list of conditions and the following disclaimer.\n"
1253
" * Redistributions in binary form must reproduce the above\n"
1254
"copyright notice, this list of conditions and the following disclaimer\n"
1255
"in the documentation and/or other materials provided with the\n"
1258
" * Neither the name of the Danga Interactive nor the names of its\n"
1259
"contributors may be used to endorse or promote products derived from\n"
1260
"this software without specific prior written permission.\n"
1262
"THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
1263
"\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
1264
"LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
1265
"A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
1266
"OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
1267
"SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
1268
"LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
1269
"DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
1270
"THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
1271
"(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
1272
"OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
1275
"This product includes software developed by Niels Provos.\n"
1279
"Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
1280
"All rights reserved.\n"
1282
"Redistribution and use in source and binary forms, with or without\n"
1283
"modification, are permitted provided that the following conditions\n"
1285
"1. Redistributions of source code must retain the above copyright\n"
1286
" notice, this list of conditions and the following disclaimer.\n"
1287
"2. Redistributions in binary form must reproduce the above copyright\n"
1288
" notice, this list of conditions and the following disclaimer in the\n"
1289
" documentation and/or other materials provided with the distribution.\n"
1290
"3. All advertising materials mentioning features or use of this software\n"
1291
" must display the following acknowledgement:\n"
1292
" This product includes software developed by Niels Provos.\n"
1293
"4. The name of the author may not be used to endorse or promote products\n"
1294
" derived from this software without specific prior written permission.\n"
1296
"THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
1297
"IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
1298
"OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
1299
"IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
1300
"INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
1301
"NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
1302
"DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
1303
"THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
1304
"(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
1305
"THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2127
"Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
2128
"All rights reserved.\n"
2130
"Redistribution and use in source and binary forms, with or without\n"
2131
"modification, are permitted provided that the following conditions are\n"
2134
" * Redistributions of source code must retain the above copyright\n"
2135
"notice, this list of conditions and the following disclaimer.\n"
2137
" * Redistributions in binary form must reproduce the above\n"
2138
"copyright notice, this list of conditions and the following disclaimer\n"
2139
"in the documentation and/or other materials provided with the\n"
2142
" * Neither the name of the Danga Interactive nor the names of its\n"
2143
"contributors may be used to endorse or promote products derived from\n"
2144
"this software without specific prior written permission.\n"
2146
"THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
2147
"\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
2148
"LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
2149
"A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
2150
"OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
2151
"SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
2152
"LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2153
"DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2154
"THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2155
"(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
2156
"OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
2159
"This product includes software developed by Niels Provos.\n"
2163
"Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
2164
"All rights reserved.\n"
2166
"Redistribution and use in source and binary forms, with or without\n"
2167
"modification, are permitted provided that the following conditions\n"
2169
"1. Redistributions of source code must retain the above copyright\n"
2170
" notice, this list of conditions and the following disclaimer.\n"
2171
"2. Redistributions in binary form must reproduce the above copyright\n"
2172
" notice, this list of conditions and the following disclaimer in the\n"
2173
" documentation and/or other materials provided with the distribution.\n"
2174
"3. All advertising materials mentioning features or use of this software\n"
2175
" must display the following acknowledgement:\n"
2176
" This product includes software developed by Niels Provos.\n"
2177
"4. The name of the author may not be used to endorse or promote products\n"
2178
" derived from this software without specific prior written permission.\n"
2180
"THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
2181
"IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
2182
"OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
2183
"IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
2184
"INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
2185
"NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
2186
"DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
2187
"THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
2188
"(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
2189
"THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"