~ubuntu-branches/ubuntu/wily/dovecot/wily

« back to all changes in this revision

Viewing changes to src/lib-dict/dict-memcached-ascii.c

  • Committer: Package Import Robot
  • Author(s): Jaldhar H. Vyas
  • Date: 2013-09-09 00:57:32 UTC
  • mfrom: (1.13.11)
  • mto: (4.8.5 experimental) (1.16.1)
  • mto: This revision was merged to the branch mainline in revision 97.
  • Revision ID: package-import@ubuntu.com-20130909005732-dn1eell8srqbhh0e
Tags: upstream-2.2.5
Import upstream version 2.2.5

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/* Copyright (c) 2008-2013 Dovecot authors, see the included COPYING file */
 
2
 
 
3
#include "lib.h"
#include "array.h"
#include "buffer.h"
#include "str.h"
#include "istream.h"
#include "ostream.h"
#include "connection.h"
#include "dict-transaction-memory.h"
#include "dict-private.h"
 
11
 
 
12
/* Server port used when the dict URI carries no port= parameter. */
#define MEMCACHED_DEFAULT_PORT 11211
/* Default request timeout (30 seconds) when timeout_msecs= is not given. */
#define MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS (30*1000)
/* Character placed between the username and the key of a private key. */
#define DICT_USERNAME_SEPARATOR '/'
 
15
 
 
16
/* What kind of server reply is expected next. One entry is queued in
   memcached_ascii_dict.input_states for every command that was sent. */
enum memcached_ascii_input_state {
	/* "get" was sent: reply is either a VALUE block or a bare END */
	MEMCACHED_INPUT_STATE_GET,
	/* "set", "append" or "add" was sent: reply is STORED or NOT_STORED */
	MEMCACHED_INPUT_STATE_STORED,
	/* "delete" was sent: reply is DELETED */
	MEMCACHED_INPUT_STATE_DELETED,
	/* "incr" (and its fallback "add") or "decr" was sent: reply is a
	   number, NOT_FOUND, STORED or NOT_STORED */
	MEMCACHED_INPUT_STATE_INCRDECR
};
 
26
 
 
27
struct memcached_ascii_connection {
 
28
        struct connection conn;
 
29
        struct memcached_ascii_dict *dict;
 
30
 
 
31
        string_t *reply_str;
 
32
        unsigned int reply_bytes_left;
 
33
        bool value_received;
 
34
        bool value_waiting_end;
 
35
};
 
36
 
 
37
struct memcached_ascii_dict_reply {
 
38
        unsigned int reply_count;
 
39
        dict_transaction_commit_callback_t *callback;
 
40
        void *context;
 
41
};
 
42
 
 
43
struct dict_memcached_ascii_commit_ctx {
 
44
        struct memcached_ascii_dict *dict;
 
45
        struct dict_transaction_memory_context *memctx;
 
46
        string_t *str;
 
47
 
 
48
        dict_transaction_commit_callback_t *callback;
 
49
        void *context;
 
50
};
 
51
 
 
52
struct memcached_ascii_dict {
 
53
        struct dict dict;
 
54
        struct ip_addr ip;
 
55
        char *username, *key_prefix;
 
56
        unsigned int port;
 
57
        unsigned int timeout_msecs;
 
58
 
 
59
        struct ioloop *ioloop;
 
60
        struct timeout *to;
 
61
        struct memcached_ascii_connection conn;
 
62
 
 
63
        ARRAY(enum memcached_ascii_input_state) input_states;
 
64
        ARRAY(struct memcached_ascii_dict_reply) replies;
 
65
};
 
66
 
 
67
static struct connection_list *memcached_ascii_connections;
 
68
 
 
69
static void memcached_ascii_conn_destroy(struct connection *_conn)
 
70
{
 
71
        struct memcached_ascii_connection *conn =
 
72
                (struct memcached_ascii_connection *)_conn;
 
73
        const struct memcached_ascii_dict_reply *reply;
 
74
 
 
75
        connection_disconnect(_conn);
 
76
        if (conn->dict->ioloop != NULL)
 
77
                io_loop_stop(conn->dict->ioloop);
 
78
 
 
79
        array_foreach(&conn->dict->replies, reply) {
 
80
                if (reply->callback != NULL)
 
81
                        reply->callback(-1, reply->context);
 
82
        }
 
83
        array_clear(&conn->dict->replies);
 
84
        array_clear(&conn->dict->input_states);
 
85
        conn->reply_bytes_left = 0;
 
86
}
 
87
 
 
88
static bool memcached_ascii_input_value(struct memcached_ascii_connection *conn)
 
89
{
 
90
        const unsigned char *data;
 
91
        size_t size;
 
92
 
 
93
        data = i_stream_get_data(conn->conn.input, &size);
 
94
        if (size > conn->reply_bytes_left)
 
95
                size = conn->reply_bytes_left;
 
96
        conn->reply_bytes_left -= size;
 
97
 
 
98
        str_append_n(conn->reply_str, data, size);
 
99
        i_stream_skip(conn->conn.input, size);
 
100
        if (conn->reply_bytes_left > 0)
 
101
                return FALSE;
 
102
 
 
103
        /* finished. drop the trailing CRLF */
 
104
        str_truncate(conn->reply_str, str_len(conn->reply_str)-2);
 
105
        conn->value_received = TRUE;
 
106
        return TRUE;
 
107
}
 
108
 
 
109
static int memcached_ascii_input_reply_read(struct memcached_ascii_dict *dict)
 
110
{
 
111
        struct memcached_ascii_connection *conn = &dict->conn;
 
112
        const enum memcached_ascii_input_state *states;
 
113
        const char *line, *p;
 
114
        unsigned int count;
 
115
        long long num;
 
116
 
 
117
        if (conn->reply_bytes_left > 0) {
 
118
                /* continue reading bulk reply */
 
119
                if (!memcached_ascii_input_value(conn))
 
120
                        return 0;
 
121
                conn->value_waiting_end = TRUE;
 
122
        } else if (conn->value_waiting_end) {
 
123
                conn->value_waiting_end = FALSE;
 
124
        } else {
 
125
                str_truncate(conn->reply_str, 0);
 
126
                conn->value_received = FALSE;
 
127
        }
 
128
 
 
129
        line = i_stream_next_line(conn->conn.input);
 
130
        if (line == NULL)
 
131
                return 0;
 
132
 
 
133
        states = array_get(&dict->input_states, &count);
 
134
        if (count == 0) {
 
135
                i_error("memcached_ascii: Unexpected input (expected nothing): %s",
 
136
                        line);
 
137
                return -1;
 
138
        }
 
139
        switch (states[0]) {
 
140
        case MEMCACHED_INPUT_STATE_GET:
 
141
                /* VALUE <key> <flags> <bytes>
 
142
                   END */
 
143
                if (strncmp(line, "VALUE ", 6) == 0) {
 
144
                        p = strrchr(line, ' ');
 
145
                        if (str_to_uint(p+1, &conn->reply_bytes_left) < 0)
 
146
                                break;
 
147
                        conn->reply_bytes_left += 2; /* CRLF */
 
148
                        return memcached_ascii_input_reply_read(dict);
 
149
                } else if (strcmp(line, "END") == 0)
 
150
                        return 1;
 
151
                break;
 
152
        case MEMCACHED_INPUT_STATE_STORED:
 
153
                if (strcmp(line, "STORED") != 0 &&
 
154
                    strcmp(line, "NOT_STORED") != 0)
 
155
                        break;
 
156
                return 1;
 
157
        case MEMCACHED_INPUT_STATE_DELETED:
 
158
                if (strcmp(line, "DELETED") != 0)
 
159
                        break;
 
160
                return 1;
 
161
        case MEMCACHED_INPUT_STATE_INCRDECR:
 
162
                if (strcmp(line, "NOT_FOUND") != 0 &&
 
163
                    strcmp(line, "STORED") != 0 &&
 
164
                    strcmp(line, "NOT_STORED") != 0 &&
 
165
                    str_to_llong(line, &num) < 0)
 
166
                        break;
 
167
                return 1;
 
168
        }
 
169
        i_error("memcached_ascii: Unexpected input (state=%d): %s",
 
170
                states[0], line);
 
171
        return -1;
 
172
}
 
173
 
 
174
static int memcached_ascii_input_reply(struct memcached_ascii_dict *dict)
 
175
{
 
176
        struct memcached_ascii_dict_reply *replies;
 
177
        unsigned int count;
 
178
        int ret;
 
179
 
 
180
        if ((ret = memcached_ascii_input_reply_read(dict)) <= 0)
 
181
                return ret;
 
182
        /* finished a reply */
 
183
        array_delete(&dict->input_states, 0, 1);
 
184
 
 
185
        replies = array_get_modifiable(&dict->replies, &count);
 
186
        i_assert(count > 0);
 
187
        i_assert(replies[0].reply_count > 0);
 
188
        if (--replies[0].reply_count == 0) {
 
189
                if (replies[0].callback != NULL)
 
190
                        replies[0].callback(1, replies[0].context);
 
191
                array_delete(&dict->replies, 0, 1);
 
192
        }
 
193
        return 1;
 
194
}
 
195
 
 
196
static void memcached_ascii_conn_input(struct connection *_conn)
 
197
{
 
198
        struct memcached_ascii_connection *conn =
 
199
                (struct memcached_ascii_connection *)_conn;
 
200
        int ret;
 
201
 
 
202
        switch (i_stream_read(_conn->input)) {
 
203
        case 0:
 
204
                return;
 
205
        case -1:
 
206
                memcached_ascii_conn_destroy(_conn);
 
207
                return;
 
208
        default:
 
209
                break;
 
210
        }
 
211
 
 
212
        while ((ret = memcached_ascii_input_reply(conn->dict)) > 0) ;
 
213
        if (ret < 0)
 
214
                memcached_ascii_conn_destroy(_conn);
 
215
        io_loop_stop(conn->dict->ioloop);
 
216
}
 
217
 
 
218
static int memcached_ascii_input_wait(struct memcached_ascii_dict *dict)
 
219
{
 
220
        struct ioloop *old_ioloop = current_ioloop;
 
221
 
 
222
        current_ioloop = dict->ioloop;
 
223
        if (dict->to != NULL)
 
224
                dict->to = io_loop_move_timeout(&dict->to);
 
225
        connection_switch_ioloop(&dict->conn.conn);
 
226
        io_loop_run(dict->ioloop);
 
227
 
 
228
        current_ioloop = old_ioloop;
 
229
        if (dict->to != NULL)
 
230
                dict->to = io_loop_move_timeout(&dict->to);
 
231
        connection_switch_ioloop(&dict->conn.conn);
 
232
 
 
233
        return dict->conn.conn.fd_in == -1 ? -1 : 0;
 
234
}
 
235
 
 
236
static void memcached_ascii_input_timeout(struct memcached_ascii_dict *dict)
 
237
{
 
238
        i_error("memcached_ascii: Request timed out in %u.%03u secs",
 
239
                dict->timeout_msecs/1000, dict->timeout_msecs%1000);
 
240
        memcached_ascii_conn_destroy(&dict->conn.conn);
 
241
}
 
242
 
 
243
static int memcached_ascii_wait_replies(struct memcached_ascii_dict *dict)
 
244
{
 
245
        int ret = 0;
 
246
 
 
247
        dict->to = timeout_add(dict->timeout_msecs,
 
248
                               memcached_ascii_input_timeout, dict);
 
249
        while (array_count(&dict->input_states) > 0) {
 
250
                i_assert(array_count(&dict->replies) > 0);
 
251
 
 
252
                if ((ret = memcached_ascii_input_reply(dict)) != 0) {
 
253
                        if (ret < 0)
 
254
                                memcached_ascii_conn_destroy(&dict->conn.conn);
 
255
                        break;
 
256
                }
 
257
                ret = memcached_ascii_input_wait(dict);
 
258
                if (ret != 0)
 
259
                        break;
 
260
        }
 
261
 
 
262
        timeout_remove(&dict->to);
 
263
        return ret < 0 ? -1 : 0;
 
264
}
 
265
 
 
266
static int memcached_ascii_wait(struct memcached_ascii_dict *dict)
 
267
{
 
268
        int ret;
 
269
 
 
270
        i_assert(dict->conn.conn.fd_in != -1);
 
271
 
 
272
        if (dict->conn.conn.input == NULL) {
 
273
                /* waiting for connection to finish */
 
274
                dict->to = timeout_add(dict->timeout_msecs,
 
275
                                       memcached_ascii_input_timeout, dict);
 
276
                ret = memcached_ascii_input_wait(dict);
 
277
                timeout_remove(&dict->to);
 
278
                if (ret < 0)
 
279
                        return -1;
 
280
        }
 
281
        if (memcached_ascii_wait_replies(dict) < 0)
 
282
                return -1;
 
283
        i_assert(array_count(&dict->input_states) == 0);
 
284
        i_assert(array_count(&dict->replies) == 0);
 
285
        return 0;
 
286
}
 
287
 
 
288
static void
 
289
memcached_ascii_conn_connected(struct connection *_conn, bool success)
 
290
{
 
291
        struct memcached_ascii_connection *conn = (struct memcached_ascii_connection *)_conn;
 
292
 
 
293
        if (!success) {
 
294
                i_error("memcached_ascii: connect(%s, %u) failed: %m",
 
295
                        net_ip2addr(&conn->dict->ip), conn->dict->port);
 
296
        }
 
297
        if (conn->dict->ioloop != NULL)
 
298
                io_loop_stop(conn->dict->ioloop);
 
299
}
 
300
 
 
301
static const struct connection_settings memcached_ascii_conn_set = {
 
302
        .input_max_size = (size_t)-1,
 
303
        .output_max_size = (size_t)-1,
 
304
        .client = TRUE
 
305
};
 
306
 
 
307
static const struct connection_vfuncs memcached_ascii_conn_vfuncs = {
 
308
        .destroy = memcached_ascii_conn_destroy,
 
309
        .input = memcached_ascii_conn_input,
 
310
        .client_connected = memcached_ascii_conn_connected
 
311
};
 
312
 
 
313
static const char *memcached_ascii_escape_username(const char *username)
 
314
{
 
315
        const char *p;
 
316
        string_t *str = t_str_new(64);
 
317
 
 
318
        for (p = username; *p != '\0'; p++) {
 
319
                switch (*p) {
 
320
                case DICT_USERNAME_SEPARATOR:
 
321
                        str_append(str, "\\-");
 
322
                        break;
 
323
                case '\\':
 
324
                        str_append(str, "\\\\");
 
325
                        break;
 
326
                default:
 
327
                        str_append_c(str, *p);
 
328
                }
 
329
        }
 
330
        return str_c(str);
 
331
}
 
332
 
 
333
static int
 
334
memcached_ascii_dict_init(struct dict *driver, const char *uri,
 
335
                          enum dict_data_type value_type ATTR_UNUSED,
 
336
                          const char *username,
 
337
                          const char *base_dir ATTR_UNUSED,
 
338
                          struct dict **dict_r, const char **error_r)
 
339
{
 
340
        struct memcached_ascii_dict *dict;
 
341
        const char *const *args;
 
342
        struct ioloop *old_ioloop = current_ioloop;
 
343
        int ret = 0;
 
344
 
 
345
        if (memcached_ascii_connections == NULL) {
 
346
                memcached_ascii_connections =
 
347
                        connection_list_init(&memcached_ascii_conn_set,
 
348
                                             &memcached_ascii_conn_vfuncs);
 
349
        }
 
350
 
 
351
        dict = i_new(struct memcached_ascii_dict, 1);
 
352
        if (net_addr2ip("127.0.0.1", &dict->ip) < 0)
 
353
                i_unreached();
 
354
        dict->port = MEMCACHED_DEFAULT_PORT;
 
355
        dict->timeout_msecs = MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS;
 
356
        dict->key_prefix = i_strdup("");
 
357
 
 
358
        args = t_strsplit(uri, ":");
 
359
        for (; *args != NULL; args++) {
 
360
                if (strncmp(*args, "host=", 5) == 0) {
 
361
                        if (net_addr2ip(*args+5, &dict->ip) < 0) {
 
362
                                *error_r = t_strdup_printf("Invalid IP: %s",
 
363
                                                           *args+5);
 
364
                                ret = -1;
 
365
                        }
 
366
                } else if (strncmp(*args, "port=", 5) == 0) {
 
367
                        if (str_to_uint(*args+5, &dict->port) < 0) {
 
368
                                *error_r = t_strdup_printf("Invalid port: %s",
 
369
                                                           *args+5);
 
370
                                ret = -1;
 
371
                        }
 
372
                } else if (strncmp(*args, "prefix=", 7) == 0) {
 
373
                        i_free(dict->key_prefix);
 
374
                        dict->key_prefix = i_strdup(*args + 7);
 
375
                } else if (strncmp(*args, "timeout_msecs=", 14) == 0) {
 
376
                        if (str_to_uint(*args+14, &dict->timeout_msecs) < 0) {
 
377
                                *error_r = t_strdup_printf(
 
378
                                        "Invalid timeout_msecs: %s", *args+14);
 
379
                                ret = -1;
 
380
                        }
 
381
                } else {
 
382
                        *error_r = t_strdup_printf("Unknown parameter: %s",
 
383
                                                   *args);
 
384
                        ret = -1;
 
385
                }
 
386
        }
 
387
        if (ret < 0) {
 
388
                i_free(dict->key_prefix);
 
389
                i_free(dict);
 
390
                return -1;
 
391
        }
 
392
 
 
393
        connection_init_client_ip(memcached_ascii_connections, &dict->conn.conn,
 
394
                                  &dict->ip, dict->port);
 
395
        dict->dict = *driver;
 
396
        dict->conn.reply_str = str_new(default_pool, 256);
 
397
        dict->conn.dict = dict;
 
398
 
 
399
        if (strchr(username, DICT_USERNAME_SEPARATOR) == NULL)
 
400
                dict->username = i_strdup(username);
 
401
        else {
 
402
                /* escape the username */
 
403
                dict->username = i_strdup(memcached_ascii_escape_username(username));
 
404
        }
 
405
        i_array_init(&dict->input_states, 4);
 
406
        i_array_init(&dict->replies, 4);
 
407
 
 
408
        dict->ioloop = io_loop_create();
 
409
        current_ioloop = old_ioloop;
 
410
        *dict_r = &dict->dict;
 
411
        return 0;
 
412
}
 
413
 
 
414
static void memcached_ascii_dict_deinit(struct dict *_dict)
 
415
{
 
416
        struct memcached_ascii_dict *dict =
 
417
                (struct memcached_ascii_dict *)_dict;
 
418
        struct ioloop *old_ioloop = current_ioloop;
 
419
 
 
420
        if (array_count(&dict->input_states) > 0)
 
421
                (void)memcached_ascii_wait(dict);
 
422
        connection_deinit(&dict->conn.conn);
 
423
 
 
424
        current_ioloop = dict->ioloop;
 
425
        io_loop_destroy(&dict->ioloop);
 
426
        current_ioloop = old_ioloop;
 
427
 
 
428
        str_free(&dict->conn.reply_str);
 
429
        array_free(&dict->replies);
 
430
        array_free(&dict->input_states);
 
431
        i_free(dict->key_prefix);
 
432
        i_free(dict->username);
 
433
        i_free(dict);
 
434
 
 
435
        if (memcached_ascii_connections->connections == NULL)
 
436
                connection_list_deinit(&memcached_ascii_connections);
 
437
}
 
438
 
 
439
static int memcached_ascii_connect(struct memcached_ascii_dict *dict)
 
440
{
 
441
        if (dict->conn.conn.input != NULL)
 
442
                return 0;
 
443
 
 
444
        if (dict->conn.conn.fd_in == -1) {
 
445
                if (connection_client_connect(&dict->conn.conn) < 0) {
 
446
                        i_error("memcached_ascii: Couldn't connect to %s:%u",
 
447
                                net_ip2addr(&dict->ip), dict->port);
 
448
                        return -1;
 
449
                }
 
450
        }
 
451
        return memcached_ascii_wait(dict);
 
452
}
 
453
 
 
454
static const char *
 
455
memcached_ascii_dict_get_full_key(struct memcached_ascii_dict *dict,
 
456
                                  const char *key)
 
457
{
 
458
        if (strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0)
 
459
                key += strlen(DICT_PATH_SHARED);
 
460
        else if (strncmp(key, DICT_PATH_PRIVATE, strlen(DICT_PATH_PRIVATE)) == 0) {
 
461
                key = t_strdup_printf("%s%c%s", dict->username,
 
462
                                      DICT_USERNAME_SEPARATOR,
 
463
                                      key + strlen(DICT_PATH_PRIVATE));
 
464
        } else {
 
465
                i_unreached();
 
466
        }
 
467
        if (*dict->key_prefix != '\0')
 
468
                key = t_strconcat(dict->key_prefix, key, NULL);
 
469
        return key;
 
470
}
 
471
 
 
472
static int
 
473
memcached_ascii_dict_lookup_real(struct memcached_ascii_dict *dict, pool_t pool,
 
474
                                 const char *key, const char **value_r)
 
475
{
 
476
        enum memcached_ascii_input_state state = MEMCACHED_INPUT_STATE_GET;
 
477
        struct memcached_ascii_dict_reply *reply;
 
478
 
 
479
        if (memcached_ascii_connect(dict) < 0)
 
480
                return -1;
 
481
 
 
482
        key = memcached_ascii_dict_get_full_key(dict, key);
 
483
        o_stream_nsend_str(dict->conn.conn.output,
 
484
                           t_strdup_printf("get %s\r\n", key));
 
485
        array_append(&dict->input_states, &state, 1);
 
486
 
 
487
        reply = array_append_space(&dict->replies);
 
488
        reply->reply_count = 1;
 
489
 
 
490
        if (memcached_ascii_wait(dict) < 0)
 
491
                return -1;
 
492
 
 
493
        *value_r = p_strdup(pool, str_c(dict->conn.reply_str));
 
494
        return dict->conn.value_received ? 1 : 0;
 
495
}
 
496
 
 
497
static int
 
498
memcached_ascii_dict_lookup(struct dict *_dict, pool_t pool,
 
499
                            const char *key, const char **value_r)
 
500
{
 
501
        struct memcached_ascii_dict *dict = (struct memcached_ascii_dict *)_dict;
 
502
        int ret;
 
503
 
 
504
        if (pool->datastack_pool)
 
505
                ret = memcached_ascii_dict_lookup_real(dict, pool, key, value_r);
 
506
        else T_BEGIN {
 
507
                ret = memcached_ascii_dict_lookup_real(dict, pool, key, value_r);
 
508
        } T_END;
 
509
        return ret;
 
510
}
 
511
 
 
512
static struct dict_transaction_context *
 
513
memcached_ascii_transaction_init(struct dict *_dict)
 
514
{
 
515
        struct dict_transaction_memory_context *ctx;
 
516
        pool_t pool;
 
517
 
 
518
        pool = pool_alloconly_create("file dict transaction", 2048);
 
519
        ctx = p_new(pool, struct dict_transaction_memory_context, 1);
 
520
        dict_transaction_memory_init(ctx, _dict, pool);
 
521
        return &ctx->ctx;
 
522
}
 
523
 
 
524
static void
 
525
memcached_send_change(struct dict_memcached_ascii_commit_ctx *ctx,
 
526
                      const struct dict_transaction_memory_change *change)
 
527
{
 
528
        enum memcached_ascii_input_state state;
 
529
        const char *key, *value;
 
530
 
 
531
        key = memcached_ascii_dict_get_full_key(ctx->dict, change->key);
 
532
 
 
533
        str_truncate(ctx->str, 0);
 
534
        switch (change->type) {
 
535
        case DICT_CHANGE_TYPE_SET:
 
536
                state = MEMCACHED_INPUT_STATE_STORED;
 
537
                str_printfa(ctx->str, "set %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
 
538
                            key, strlen(change->value.str), change->value.str);
 
539
                break;
 
540
        case DICT_CHANGE_TYPE_UNSET:
 
541
                state = MEMCACHED_INPUT_STATE_DELETED;
 
542
                str_printfa(ctx->str, "delete %s\r\n", key);
 
543
                break;
 
544
        case DICT_CHANGE_TYPE_APPEND:
 
545
                state = MEMCACHED_INPUT_STATE_STORED;
 
546
                str_printfa(ctx->str, "append %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
 
547
                            key, strlen(change->value.str), change->value.str);
 
548
                array_append(&ctx->dict->input_states, &state, 1);
 
549
                /* we'd preferably want an append that always works, but
 
550
                   this kludge works for that too.. */
 
551
                str_printfa(ctx->str, "add %s 0 0 %"PRIuSIZE_T"\r\n%s\r\n",
 
552
                            key, strlen(change->value.str), change->value.str);
 
553
                break;
 
554
        case DICT_CHANGE_TYPE_INC:
 
555
                state = MEMCACHED_INPUT_STATE_INCRDECR;
 
556
                if (change->value.diff > 0) {
 
557
                        str_printfa(ctx->str, "incr %s %lld\r\n",
 
558
                                    key, change->value.diff);
 
559
                        array_append(&ctx->dict->input_states, &state, 1);
 
560
                        /* same kludge as with append */
 
561
                        value = t_strdup_printf("%lld", change->value.diff);
 
562
                        str_printfa(ctx->str, "add %s 0 0 %u\r\n%s\r\n",
 
563
                                    key, (unsigned int)strlen(value), value);
 
564
                } else {
 
565
                        str_printfa(ctx->str, "decr %s %lld\r\n",
 
566
                                    key, -change->value.diff);
 
567
                }
 
568
                break;
 
569
        }
 
570
        array_append(&ctx->dict->input_states, &state, 1);
 
571
        o_stream_nsend(ctx->dict->conn.conn.output,
 
572
                       str_data(ctx->str), str_len(ctx->str));
 
573
}
 
574
 
 
575
static int
 
576
memcached_ascii_transaction_send(struct dict_memcached_ascii_commit_ctx *ctx)
 
577
{
 
578
        struct memcached_ascii_dict *dict = ctx->dict;
 
579
        struct memcached_ascii_dict_reply *reply;
 
580
        const struct dict_transaction_memory_change *changes;
 
581
        unsigned int i, count, old_state_count;
 
582
 
 
583
        if (memcached_ascii_connect(dict) < 0)
 
584
                return -1;
 
585
 
 
586
        old_state_count = array_count(&dict->input_states);
 
587
        changes = array_get(&ctx->memctx->changes, &count);
 
588
        i_assert(count > 0);
 
589
 
 
590
        o_stream_cork(dict->conn.conn.output);
 
591
        for (i = 0; i < count; i++) T_BEGIN {
 
592
                memcached_send_change(ctx, &changes[i]);
 
593
        } T_END;
 
594
        o_stream_uncork(dict->conn.conn.output);
 
595
 
 
596
        reply = array_append_space(&dict->replies);
 
597
        reply->callback = ctx->callback;
 
598
        reply->context = ctx->context;
 
599
        reply->reply_count = array_count(&dict->input_states) - old_state_count;
 
600
        return 1;
 
601
}
 
602
 
 
603
static int
 
604
memcached_ascii_transaction_commit(struct dict_transaction_context *_ctx,
 
605
                                   bool async,
 
606
                                   dict_transaction_commit_callback_t *callback,
 
607
                                   void *context)
 
608
{
 
609
        struct dict_transaction_memory_context *ctx =
 
610
                (struct dict_transaction_memory_context *)_ctx;
 
611
        struct memcached_ascii_dict *dict =
 
612
                (struct memcached_ascii_dict *)_ctx->dict;
 
613
        struct dict_memcached_ascii_commit_ctx commit_ctx;
 
614
        int ret = 1;
 
615
 
 
616
        if (_ctx->changed) {
 
617
                memset(&commit_ctx, 0, sizeof(commit_ctx));
 
618
                commit_ctx.dict = dict;
 
619
                commit_ctx.memctx = ctx;
 
620
                commit_ctx.callback = callback;
 
621
                commit_ctx.context = context;
 
622
                commit_ctx.str = str_new(default_pool, 128);
 
623
 
 
624
                ret = memcached_ascii_transaction_send(&commit_ctx);
 
625
                if (!async && ret >= 0) {
 
626
                        if (memcached_ascii_wait(dict) < 0)
 
627
                                ret = -1;
 
628
                }
 
629
                str_free(&commit_ctx.str);
 
630
        }
 
631
        if (callback != NULL)
 
632
                callback(ret, context);
 
633
        pool_unref(&ctx->pool);
 
634
        return ret;
 
635
}
 
636
 
 
637
struct dict dict_driver_memcached_ascii = {
 
638
        .name = "memcached_ascii",
 
639
        {
 
640
                memcached_ascii_dict_init,
 
641
                memcached_ascii_dict_deinit,
 
642
                NULL,
 
643
                memcached_ascii_dict_lookup,
 
644
                NULL,
 
645
                NULL,
 
646
                NULL,
 
647
                memcached_ascii_transaction_init,
 
648
                memcached_ascii_transaction_commit,
 
649
                dict_transaction_memory_rollback,
 
650
                dict_transaction_memory_set,
 
651
                dict_transaction_memory_unset,
 
652
                dict_transaction_memory_append,
 
653
                dict_transaction_memory_atomic_inc
 
654
        }
 
655
};