1
/* Copyright (c) 2008-2013 Dovecot authors, see the included COPYING memcached_ascii */
8
#include "connection.h"
9
#include "dict-transaction-memory.h"
10
#include "dict-private.h"
12
#define MEMCACHED_DEFAULT_PORT 11211
13
#define MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
14
#define DICT_USERNAME_SEPARATOR '/'
16
enum memcached_ascii_input_state {
17
/* GET: expecting VALUE or END */
18
MEMCACHED_INPUT_STATE_GET,
19
/* SET/(APPEND+ADD): expecting STORED / NOT_STORED */
20
MEMCACHED_INPUT_STATE_STORED,
21
/* DELETE: expecting DELETED */
22
MEMCACHED_INPUT_STATE_DELETED,
23
/* (INCR+ADD)/DECR: expecting number / NOT_FOUND / STORED / NOT_STORED */
24
MEMCACHED_INPUT_STATE_INCRDECR
27
struct memcached_ascii_connection {
28
struct connection conn;
29
struct memcached_ascii_dict *dict;
32
unsigned int reply_bytes_left;
34
bool value_waiting_end;
37
struct memcached_ascii_dict_reply {
38
unsigned int reply_count;
39
dict_transaction_commit_callback_t *callback;
43
struct dict_memcached_ascii_commit_ctx {
44
struct memcached_ascii_dict *dict;
45
struct dict_transaction_memory_context *memctx;
48
dict_transaction_commit_callback_t *callback;
52
struct memcached_ascii_dict {
55
char *username, *key_prefix;
57
unsigned int timeout_msecs;
59
struct ioloop *ioloop;
61
struct memcached_ascii_connection conn;
63
ARRAY(enum memcached_ascii_input_state) input_states;
64
ARRAY(struct memcached_ascii_dict_reply) replies;
67
static struct connection_list *memcached_ascii_connections;
69
static void memcached_ascii_conn_destroy(struct connection *_conn)
71
struct memcached_ascii_connection *conn =
72
(struct memcached_ascii_connection *)_conn;
73
const struct memcached_ascii_dict_reply *reply;
75
connection_disconnect(_conn);
76
if (conn->dict->ioloop != NULL)
77
io_loop_stop(conn->dict->ioloop);
79
array_foreach(&conn->dict->replies, reply) {
80
if (reply->callback != NULL)
81
reply->callback(-1, reply->context);
83
array_clear(&conn->dict->replies);
84
array_clear(&conn->dict->input_states);
85
conn->reply_bytes_left = 0;
88
static bool memcached_ascii_input_value(struct memcached_ascii_connection *conn)
90
const unsigned char *data;
93
data = i_stream_get_data(conn->conn.input, &size);
94
if (size > conn->reply_bytes_left)
95
size = conn->reply_bytes_left;
96
conn->reply_bytes_left -= size;
98
str_append_n(conn->reply_str, data, size);
99
i_stream_skip(conn->conn.input, size);
100
if (conn->reply_bytes_left > 0)
103
/* finished. drop the trailing CRLF */
104
str_truncate(conn->reply_str, str_len(conn->reply_str)-2);
105
conn->value_received = TRUE;
109
static int memcached_ascii_input_reply_read(struct memcached_ascii_dict *dict)
111
struct memcached_ascii_connection *conn = &dict->conn;
112
const enum memcached_ascii_input_state *states;
113
const char *line, *p;
117
if (conn->reply_bytes_left > 0) {
118
/* continue reading bulk reply */
119
if (!memcached_ascii_input_value(conn))
121
conn->value_waiting_end = TRUE;
122
} else if (conn->value_waiting_end) {
123
conn->value_waiting_end = FALSE;
125
str_truncate(conn->reply_str, 0);
126
conn->value_received = FALSE;
129
line = i_stream_next_line(conn->conn.input);
133
states = array_get(&dict->input_states, &count);
135
i_error("memcached_ascii: Unexpected input (expected nothing): %s",
140
case MEMCACHED_INPUT_STATE_GET:
141
/* VALUE <key> <flags> <bytes>
143
if (strncmp(line, "VALUE ", 6) == 0) {
144
p = strrchr(line, ' ');
145
if (str_to_uint(p+1, &conn->reply_bytes_left) < 0)
147
conn->reply_bytes_left += 2; /* CRLF */
148
return memcached_ascii_input_reply_read(dict);
149
} else if (strcmp(line, "END") == 0)
152
case MEMCACHED_INPUT_STATE_STORED:
153
if (strcmp(line, "STORED") != 0 &&
154
strcmp(line, "NOT_STORED") != 0)
157
case MEMCACHED_INPUT_STATE_DELETED:
158
if (strcmp(line, "DELETED") != 0)
161
case MEMCACHED_INPUT_STATE_INCRDECR:
162
if (strcmp(line, "NOT_FOUND") != 0 &&
163
strcmp(line, "STORED") != 0 &&
164
strcmp(line, "NOT_STORED") != 0 &&
165
str_to_llong(line, &num) < 0)
169
i_error("memcached_ascii: Unexpected input (state=%d): %s",
174
static int memcached_ascii_input_reply(struct memcached_ascii_dict *dict)
176
struct memcached_ascii_dict_reply *replies;
180
if ((ret = memcached_ascii_input_reply_read(dict)) <= 0)
182
/* finished a reply */
183
array_delete(&dict->input_states, 0, 1);
185
replies = array_get_modifiable(&dict->replies, &count);
187
i_assert(replies[0].reply_count > 0);
188
if (--replies[0].reply_count == 0) {
189
if (replies[0].callback != NULL)
190
replies[0].callback(1, replies[0].context);
191
array_delete(&dict->replies, 0, 1);
196
static void memcached_ascii_conn_input(struct connection *_conn)
198
struct memcached_ascii_connection *conn =
199
(struct memcached_ascii_connection *)_conn;
202
switch (i_stream_read(_conn->input)) {
206
memcached_ascii_conn_destroy(_conn);
212
while ((ret = memcached_ascii_input_reply(conn->dict)) > 0) ;
214
memcached_ascii_conn_destroy(_conn);
215
io_loop_stop(conn->dict->ioloop);
218
static int memcached_ascii_input_wait(struct memcached_ascii_dict *dict)
220
struct ioloop *old_ioloop = current_ioloop;
222
current_ioloop = dict->ioloop;
223
if (dict->to != NULL)
224
dict->to = io_loop_move_timeout(&dict->to);
225
connection_switch_ioloop(&dict->conn.conn);
226
io_loop_run(dict->ioloop);
228
current_ioloop = old_ioloop;
229
if (dict->to != NULL)
230
dict->to = io_loop_move_timeout(&dict->to);
231
connection_switch_ioloop(&dict->conn.conn);
233
return dict->conn.conn.fd_in == -1 ? -1 : 0;
236
static void memcached_ascii_input_timeout(struct memcached_ascii_dict *dict)
238
i_error("memcached_ascii: Request timed out in %u.%03u secs",
239
dict->timeout_msecs/1000, dict->timeout_msecs%1000);
240
memcached_ascii_conn_destroy(&dict->conn.conn);
243
static int memcached_ascii_wait_replies(struct memcached_ascii_dict *dict)
247
dict->to = timeout_add(dict->timeout_msecs,
248
memcached_ascii_input_timeout, dict);
249
while (array_count(&dict->input_states) > 0) {
250
i_assert(array_count(&dict->replies) > 0);
252
if ((ret = memcached_ascii_input_reply(dict)) != 0) {
254
memcached_ascii_conn_destroy(&dict->conn.conn);
257
ret = memcached_ascii_input_wait(dict);
262
timeout_remove(&dict->to);
263
return ret < 0 ? -1 : 0;
266
static int memcached_ascii_wait(struct memcached_ascii_dict *dict)
270
i_assert(dict->conn.conn.fd_in != -1);
272
if (dict->conn.conn.input == NULL) {
273
/* waiting for connection to finish */
274
dict->to = timeout_add(dict->timeout_msecs,
275
memcached_ascii_input_timeout, dict);
276
ret = memcached_ascii_input_wait(dict);
277
timeout_remove(&dict->to);
281
if (memcached_ascii_wait_replies(dict) < 0)
283
i_assert(array_count(&dict->input_states) == 0);
284
i_assert(array_count(&dict->replies) == 0);
289
memcached_ascii_conn_connected(struct connection *_conn, bool success)
291
struct memcached_ascii_connection *conn = (struct memcached_ascii_connection *)_conn;
294
i_error("memcached_ascii: connect(%s, %u) failed: %m",
295
net_ip2addr(&conn->dict->ip), conn->dict->port);
297
if (conn->dict->ioloop != NULL)
298
io_loop_stop(conn->dict->ioloop);
301
static const struct connection_settings memcached_ascii_conn_set = {
302
.input_max_size = (size_t)-1,
303
.output_max_size = (size_t)-1,
307
static const struct connection_vfuncs memcached_ascii_conn_vfuncs = {
308
.destroy = memcached_ascii_conn_destroy,
309
.input = memcached_ascii_conn_input,
310
.client_connected = memcached_ascii_conn_connected
313
static const char *memcached_ascii_escape_username(const char *username)
316
string_t *str = t_str_new(64);
318
for (p = username; *p != '\0'; p++) {
320
case DICT_USERNAME_SEPARATOR:
321
str_append(str, "\\-");
324
str_append(str, "\\\\");
327
str_append_c(str, *p);
334
memcached_ascii_dict_init(struct dict *driver, const char *uri,
335
enum dict_data_type value_type ATTR_UNUSED,
336
const char *username,
337
const char *base_dir ATTR_UNUSED,
338
struct dict **dict_r, const char **error_r)
340
struct memcached_ascii_dict *dict;
341
const char *const *args;
342
struct ioloop *old_ioloop = current_ioloop;
345
if (memcached_ascii_connections == NULL) {
346
memcached_ascii_connections =
347
connection_list_init(&memcached_ascii_conn_set,
348
&memcached_ascii_conn_vfuncs);
351
dict = i_new(struct memcached_ascii_dict, 1);
352
if (net_addr2ip("127.0.0.1", &dict->ip) < 0)
354
dict->port = MEMCACHED_DEFAULT_PORT;
355
dict->timeout_msecs = MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS;
356
dict->key_prefix = i_strdup("");
358
args = t_strsplit(uri, ":");
359
for (; *args != NULL; args++) {
360
if (strncmp(*args, "host=", 5) == 0) {
361
if (net_addr2ip(*args+5, &dict->ip) < 0) {
362
*error_r = t_strdup_printf("Invalid IP: %s",
366
} else if (strncmp(*args, "port=", 5) == 0) {
367
if (str_to_uint(*args+5, &dict->port) < 0) {
368
*error_r = t_strdup_printf("Invalid port: %s",
372
} else if (strncmp(*args, "prefix=", 7) == 0) {
373
i_free(dict->key_prefix);
374
dict->key_prefix = i_strdup(*args + 7);
375
} else if (strncmp(*args, "timeout_msecs=", 14) == 0) {
376
if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) {
377
*error_r = t_strdup_printf(
378
"Invalid timeout_msecs: %s", *args+14);
382
*error_r = t_strdup_printf("Unknown parameter: %s",
388
i_free(dict->key_prefix);
393
connection_init_client_ip(memcached_ascii_connections, &dict->conn.conn,
394
&dict->ip, dict->port);
395
dict->dict = *driver;
396
dict->conn.reply_str = str_new(default_pool, 256);
397
dict->conn.dict = dict;
399
if (strchr(username, DICT_USERNAME_SEPARATOR) == NULL)
400
dict->username = i_strdup(username);
402
/* escape the username */
403
dict->username = i_strdup(memcached_ascii_escape_username(username));
405
i_array_init(&dict->input_states, 4);
406
i_array_init(&dict->replies, 4);
408
dict->ioloop = io_loop_create();
409
current_ioloop = old_ioloop;
410
*dict_r = &dict->dict;
414
static void memcached_ascii_dict_deinit(struct dict *_dict)
416
struct memcached_ascii_dict *dict =
417
(struct memcached_ascii_dict *)_dict;
418
struct ioloop *old_ioloop = current_ioloop;
420
if (array_count(&dict->input_states) > 0)
421
(void)memcached_ascii_wait(dict);
422
connection_deinit(&dict->conn.conn);
424
current_ioloop = dict->ioloop;
425
io_loop_destroy(&dict->ioloop);
426
current_ioloop = old_ioloop;
428
str_free(&dict->conn.reply_str);
429
array_free(&dict->replies);
430
array_free(&dict->input_states);
431
i_free(dict->key_prefix);
432
i_free(dict->username);
435
if (memcached_ascii_connections->connections == NULL)
436
connection_list_deinit(&memcached_ascii_connections);
439
static int memcached_ascii_connect(struct memcached_ascii_dict *dict)
441
if (dict->conn.conn.input != NULL)
444
if (dict->conn.conn.fd_in == -1) {
445
if (connection_client_connect(&dict->conn.conn) < 0) {
446
i_error("memcached_ascii: Couldn't connect to %s:%u",
447
net_ip2addr(&dict->ip), dict->port);
451
return memcached_ascii_wait(dict);
455
memcached_ascii_dict_get_full_key(struct memcached_ascii_dict *dict,
458
if (strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0)
459
key += strlen(DICT_PATH_SHARED);
460
else if (strncmp(key, DICT_PATH_PRIVATE, strlen(DICT_PATH_PRIVATE)) == 0) {
461
key = t_strdup_printf("%s%c%s", dict->username,
462
DICT_USERNAME_SEPARATOR,
463
key + strlen(DICT_PATH_PRIVATE));
467
if (*dict->key_prefix != '\0')
468
key = t_strconcat(dict->key_prefix, key, NULL);
473
memcached_ascii_dict_lookup_real(struct memcached_ascii_dict *dict, pool_t pool,
474
const char *key, const char **value_r)
476
enum memcached_ascii_input_state state = MEMCACHED_INPUT_STATE_GET;
477
struct memcached_ascii_dict_reply *reply;
479
if (memcached_ascii_connect(dict) < 0)
482
key = memcached_ascii_dict_get_full_key(dict, key);
483
o_stream_nsend_str(dict->conn.conn.output,
484
t_strdup_printf("get %s\r\n", key));
485
array_append(&dict->input_states, &state, 1);
487
reply = array_append_space(&dict->replies);
488
reply->reply_count = 1;
490
if (memcached_ascii_wait(dict) < 0)
493
*value_r = p_strdup(pool, str_c(dict->conn.reply_str));
494
return dict->conn.value_received ? 1 : 0;
498
memcached_ascii_dict_lookup(struct dict *_dict, pool_t pool,
499
const char *key, const char **value_r)
501
struct memcached_ascii_dict *dict = (struct memcached_ascii_dict *)_dict;
504
if (pool->datastack_pool)
505
ret = memcached_ascii_dict_lookup_real(dict, pool, key, value_r);
507
ret = memcached_ascii_dict_lookup_real(dict, pool, key, value_r);
512
static struct dict_transaction_context *
513
memcached_ascii_transaction_init(struct dict *_dict)
515
struct dict_transaction_memory_context *ctx;
518
pool = pool_alloconly_create("file dict transaction", 2048);
519
ctx = p_new(pool, struct dict_transaction_memory_context, 1);
520
dict_transaction_memory_init(ctx, _dict, pool);
525
memcached_send_change(struct dict_memcached_ascii_commit_ctx *ctx,
526
const struct dict_transaction_memory_change *change)
528
enum memcached_ascii_input_state state;
529
const char *key, *value;
531
key = memcached_ascii_dict_get_full_key(ctx->dict, change->key);
533
str_truncate(ctx->str, 0);
534
switch (change->type) {
535
case DICT_CHANGE_TYPE_SET:
536
state = MEMCACHED_INPUT_STATE_STORED;
537
str_printfa(ctx->str, "set %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
538
key, strlen(change->value.str), change->value.str);
540
case DICT_CHANGE_TYPE_UNSET:
541
state = MEMCACHED_INPUT_STATE_DELETED;
542
str_printfa(ctx->str, "delete %s\r\n", key);
544
case DICT_CHANGE_TYPE_APPEND:
545
state = MEMCACHED_INPUT_STATE_STORED;
546
str_printfa(ctx->str, "append %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
547
key, strlen(change->value.str), change->value.str);
548
array_append(&ctx->dict->input_states, &state, 1);
549
/* we'd preferably want an append that always works, but
550
this kludge works for that too.. */
551
str_printfa(ctx->str, "add %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
552
key, strlen(change->value.str), change->value.str);
554
case DICT_CHANGE_TYPE_INC:
555
state = MEMCACHED_INPUT_STATE_INCRDECR;
556
if (change->value.diff > 0) {
557
str_printfa(ctx->str, "incr %s %lld\r\n",
558
key, change->value.diff);
559
array_append(&ctx->dict->input_states, &state, 1);
560
/* same kludge as with append */
561
value = t_strdup_printf("%lld", change->value.diff);
562
str_printfa(ctx->str, "add %s 0 0 %u\r\n%s\r\n",
563
key, (unsigned int)strlen(value), value);
565
str_printfa(ctx->str, "decr %s %lld\r\n",
566
key, -change->value.diff);
570
array_append(&ctx->dict->input_states, &state, 1);
571
o_stream_nsend(ctx->dict->conn.conn.output,
572
str_data(ctx->str), str_len(ctx->str));
576
memcached_ascii_transaction_send(struct dict_memcached_ascii_commit_ctx *ctx)
578
struct memcached_ascii_dict *dict = ctx->dict;
579
struct memcached_ascii_dict_reply *reply;
580
const struct dict_transaction_memory_change *changes;
581
unsigned int i, count, old_state_count;
583
if (memcached_ascii_connect(dict) < 0)
586
old_state_count = array_count(&dict->input_states);
587
changes = array_get(&ctx->memctx->changes, &count);
590
o_stream_cork(dict->conn.conn.output);
591
for (i = 0; i < count; i++) T_BEGIN {
592
memcached_send_change(ctx, &changes[i]);
594
o_stream_uncork(dict->conn.conn.output);
596
reply = array_append_space(&dict->replies);
597
reply->callback = ctx->callback;
598
reply->context = ctx->context;
599
reply->reply_count = array_count(&dict->input_states) - old_state_count;
604
memcached_ascii_transaction_commit(struct dict_transaction_context *_ctx,
606
dict_transaction_commit_callback_t *callback,
609
struct dict_transaction_memory_context *ctx =
610
(struct dict_transaction_memory_context *)_ctx;
611
struct memcached_ascii_dict *dict =
612
(struct memcached_ascii_dict *)_ctx->dict;
613
struct dict_memcached_ascii_commit_ctx commit_ctx;
617
memset(&commit_ctx, 0, sizeof(commit_ctx));
618
commit_ctx.dict = dict;
619
commit_ctx.memctx = ctx;
620
commit_ctx.callback = callback;
621
commit_ctx.context = context;
622
commit_ctx.str = str_new(default_pool, 128);
624
ret = memcached_ascii_transaction_send(&commit_ctx);
625
if (!async && ret >= 0) {
626
if (memcached_ascii_wait(dict) < 0)
629
str_free(&commit_ctx.str);
631
if (callback != NULL)
632
callback(ret, context);
633
pool_unref(&ctx->pool);
637
struct dict dict_driver_memcached_ascii = {
638
.name = "memcached_ascii",
640
memcached_ascii_dict_init,
641
memcached_ascii_dict_deinit,
643
memcached_ascii_dict_lookup,
647
memcached_ascii_transaction_init,
648
memcached_ascii_transaction_commit,
649
dict_transaction_memory_rollback,
650
dict_transaction_memory_set,
651
dict_transaction_memory_unset,
652
dict_transaction_memory_append,
653
dict_transaction_memory_atomic_inc