1
/* Copyright (c) 2009-2011 Dovecot authors, see the included COPYING file */
9
#include "istream-dot.h"
11
#include "imap-util.h"
12
#include "master-service.h"
13
#include "dsync-worker.h"
14
#include "dsync-proxy.h"
15
#include "dsync-proxy-server.h"
19
#define OUTBUF_THROTTLE_SIZE (1024*64)
22
proxy_server_is_output_full(struct dsync_proxy_server *server)
24
return o_stream_get_buffer_used_size(server->output) >=
29
cmd_box_list(struct dsync_proxy_server *server,
30
const char *const *args ATTR_UNUSED)
32
struct dsync_mailbox dsync_box;
36
if (server->mailbox_iter == NULL) {
37
server->mailbox_iter =
38
dsync_worker_mailbox_iter_init(server->worker);
42
while ((ret = dsync_worker_mailbox_iter_next(server->mailbox_iter,
45
dsync_proxy_mailbox_export(str, &dsync_box);
46
str_append_c(str, '\n');
47
o_stream_send(server->output, str_data(str), str_len(str));
48
if (proxy_server_is_output_full(server))
53
o_stream_set_flush_pending(server->output, TRUE);
56
if (dsync_worker_mailbox_iter_deinit(&server->mailbox_iter) < 0) {
57
o_stream_send(server->output, "-\n", 2);
60
o_stream_send(server->output, "+\n", 2);
65
static bool cmd_subs_list_subscriptions(struct dsync_proxy_server *server)
67
struct dsync_worker_subscription rec;
72
while ((ret = dsync_worker_subs_iter_next(server->subs_iter,
75
str_tabescape_write(str, rec.vname);
76
str_append_c(str, '\t');
77
str_tabescape_write(str, rec.storage_name);
78
str_append_c(str, '\t');
79
str_tabescape_write(str, rec.ns_prefix);
80
str_printfa(str, "\t%ld\n", (long)rec.last_change);
81
o_stream_send(server->output, str_data(str), str_len(str));
82
if (proxy_server_is_output_full(server))
87
o_stream_set_flush_pending(server->output, TRUE);
93
static bool cmd_subs_list_unsubscriptions(struct dsync_proxy_server *server)
95
struct dsync_worker_unsubscription rec;
100
while ((ret = dsync_worker_subs_iter_next_un(server->subs_iter,
102
str_truncate(str, 0);
103
dsync_proxy_mailbox_guid_export(str, &rec.name_sha1);
104
str_append_c(str, '\t');
105
str_tabescape_write(str, rec.ns_prefix);
106
str_printfa(str, "\t%ld\n", (long)rec.last_change);
107
o_stream_send(server->output, str_data(str), str_len(str));
108
if (proxy_server_is_output_full(server))
113
o_stream_set_flush_pending(server->output, TRUE);
120
cmd_subs_list(struct dsync_proxy_server *server,
121
const char *const *args ATTR_UNUSED)
123
if (server->subs_iter == NULL) {
125
dsync_worker_subs_iter_init(server->worker);
128
if (!server->subs_sending_unsubscriptions) {
129
if (!cmd_subs_list_subscriptions(server))
131
/* a bit hacky way to handle this. this assumes that caller
132
goes through all subscriptions first, and next starts
133
going through unsubscriptions */
134
o_stream_send(server->output, "+\n", 2);
135
server->subs_sending_unsubscriptions = TRUE;
137
if (!cmd_subs_list_unsubscriptions(server))
140
server->subs_sending_unsubscriptions = FALSE;
141
if (dsync_worker_subs_iter_deinit(&server->subs_iter) < 0) {
142
o_stream_send(server->output, "-\n", 2);
145
o_stream_send(server->output, "+\n", 2);
151
cmd_subs_set(struct dsync_proxy_server *server, const char *const *args)
153
if (str_array_length(args) < 3) {
154
i_error("subs-set: Missing parameters");
158
dsync_worker_set_subscribed(server->worker, args[0],
159
strtoul(args[1], NULL, 10),
160
strcmp(args[2], "1") == 0);
165
cmd_msg_list_init(struct dsync_proxy_server *server, const char *const *args)
167
mailbox_guid_t *mailboxes;
168
unsigned int i, count;
171
count = str_array_length(args);
172
mailboxes = count == 0 ? NULL : t_new(mailbox_guid_t, count);
173
for (i = 0; i < count; i++) {
175
ret = dsync_proxy_mailbox_guid_import(args[i],
180
i_error("msg-list: Invalid mailbox GUID '%s'", args[i]);
184
server->msg_iter = dsync_worker_msg_iter_init(server->worker,
190
cmd_msg_list(struct dsync_proxy_server *server, const char *const *args)
192
unsigned int mailbox_idx;
193
struct dsync_message msg;
197
if (server->msg_iter == NULL) {
198
if (cmd_msg_list_init(server, args) < 0)
202
str = t_str_new(256);
203
while ((ret = dsync_worker_msg_iter_next(server->msg_iter,
204
&mailbox_idx, &msg)) > 0) {
205
str_truncate(str, 0);
206
str_printfa(str, "%u\t", mailbox_idx);
207
dsync_proxy_msg_export(str, &msg);
208
str_append_c(str, '\n');
209
o_stream_send(server->output, str_data(str), str_len(str));
210
if (proxy_server_is_output_full(server))
215
o_stream_set_flush_pending(server->output, TRUE);
218
if (dsync_worker_msg_iter_deinit(&server->msg_iter) < 0) {
219
o_stream_send(server->output, "-\n", 2);
222
o_stream_send(server->output, "+\n", 2);
228
cmd_box_create(struct dsync_proxy_server *server, const char *const *args)
230
struct dsync_mailbox dsync_box;
233
if (dsync_proxy_mailbox_import_unescaped(pool_datastack_create(),
236
i_error("Invalid mailbox input: %s", error);
239
dsync_worker_create_mailbox(server->worker, &dsync_box);
244
cmd_box_delete(struct dsync_proxy_server *server, const char *const *args)
247
struct dsync_mailbox dsync_box;
249
if (str_array_length(args) < 2)
251
if (dsync_proxy_mailbox_guid_import(args[0], &guid) < 0) {
252
i_error("box-delete: Invalid mailbox GUID '%s'", args[0]);
256
memset(&dsync_box, 0, sizeof(dsync_box));
257
dsync_box.mailbox_guid = guid;
258
dsync_box.last_change = strtoul(args[1], NULL, 10);
259
dsync_worker_delete_mailbox(server->worker, &dsync_box);
264
cmd_dir_delete(struct dsync_proxy_server *server, const char *const *args)
266
struct dsync_mailbox dsync_box;
268
if (str_array_length(args) < 2)
271
memset(&dsync_box, 0, sizeof(dsync_box));
272
dsync_box.name = str_tabunescape(t_strdup_noconst(args[0]));
273
dsync_box.last_change = strtoul(args[1], NULL, 10);
274
dsync_worker_delete_dir(server->worker, &dsync_box);
279
cmd_box_rename(struct dsync_proxy_server *server, const char *const *args)
282
struct dsync_mailbox dsync_box;
284
if (str_array_length(args) < 3)
286
if (dsync_proxy_mailbox_guid_import(args[0], &guid) < 0) {
287
i_error("box-delete: Invalid mailbox GUID '%s'", args[0]);
291
memset(&dsync_box, 0, sizeof(dsync_box));
292
dsync_box.name = args[1];
293
dsync_box.name_sep = args[2][0];
294
dsync_worker_rename_mailbox(server->worker, &guid, &dsync_box);
299
cmd_box_update(struct dsync_proxy_server *server, const char *const *args)
301
struct dsync_mailbox dsync_box;
304
if (dsync_proxy_mailbox_import_unescaped(pool_datastack_create(),
307
i_error("Invalid mailbox input: %s", error);
310
dsync_worker_update_mailbox(server->worker, &dsync_box);
315
cmd_box_select(struct dsync_proxy_server *server, const char *const *args)
317
struct dsync_mailbox box;
318
unsigned int i, count;
320
memset(&box, 0, sizeof(box));
321
if (args[0] == NULL ||
322
dsync_proxy_mailbox_guid_import(args[0], &box.mailbox_guid) < 0) {
323
i_error("box-select: Invalid mailbox GUID '%s'", args[0]);
328
count = str_array_length(args);
329
t_array_init(&box.cache_fields, count + 1);
330
for (i = 0; i < count; i++)
331
array_append(&box.cache_fields, &args[i], 1);
332
dsync_worker_select_mailbox(server->worker, &box);
337
cmd_msg_update(struct dsync_proxy_server *server, const char *const *args)
339
struct dsync_message msg;
341
/* uid modseq flags */
342
if (str_array_length(args) < 3)
345
memset(&msg, 0, sizeof(msg));
346
msg.uid = strtoul(args[0], NULL, 10);
347
msg.modseq = strtoull(args[1], NULL, 10);
348
if (dsync_proxy_msg_parse_flags(pool_datastack_create(),
352
dsync_worker_msg_update_metadata(server->worker, &msg);
357
cmd_msg_uid_change(struct dsync_proxy_server *server, const char *const *args)
359
if (args[0] == NULL || args[1] == NULL)
362
dsync_worker_msg_update_uid(server->worker,
363
strtoul(args[0], NULL, 10),
364
strtoul(args[1], NULL, 10));
369
cmd_msg_expunge(struct dsync_proxy_server *server, const char *const *args)
374
dsync_worker_msg_expunge(server->worker, strtoul(args[0], NULL, 10));
378
static void copy_callback(bool success, void *context)
380
struct dsync_proxy_server *server = context;
383
i_assert(server->copy_uid != 0);
385
reply = t_strdup_printf("%d\t%u\n", success ? 1 : 0, server->copy_uid);
386
o_stream_send_str(server->output, reply);
390
cmd_msg_copy(struct dsync_proxy_server *server, const char *const *args)
392
mailbox_guid_t src_mailbox_guid;
394
struct dsync_message msg;
397
/* src_mailbox_guid src_uid <message> */
398
if (str_array_length(args) < 3)
401
if (dsync_proxy_mailbox_guid_import(args[0], &src_mailbox_guid) < 0) {
402
i_error("msg-copy: Invalid mailbox GUID '%s'", args[0]);
405
src_uid = strtoul(args[1], NULL, 10);
407
if (dsync_proxy_msg_import_unescaped(pool_datastack_create(),
408
args + 2, &msg, &error) < 0)
409
i_error("Invalid message input: %s", error);
411
server->copy_uid = src_uid;
412
dsync_worker_msg_copy(server->worker, &src_mailbox_guid, src_uid, &msg,
413
copy_callback, server);
414
server->copy_uid = 0;
418
static void cmd_msg_save_callback(void *context)
420
struct dsync_proxy_server *server = context;
422
server->save_finished = TRUE;
426
cmd_msg_save(struct dsync_proxy_server *server, const char *const *args)
428
struct dsync_message msg;
429
struct dsync_msg_static_data data;
433
if (dsync_proxy_msg_static_import_unescaped(pool_datastack_create(),
434
args, &data, &error) < 0) {
435
i_error("Invalid message input: %s", error);
438
data.input = i_stream_create_dot(server->input, FALSE);
440
if (dsync_proxy_msg_import_unescaped(pool_datastack_create(),
441
args + 2, &msg, &error) < 0) {
442
i_error("Invalid message input: %s", error);
446
/* we rely on save reading the entire input */
447
server->save_finished = FALSE;
448
net_set_nonblock(server->fd_in, FALSE);
449
dsync_worker_msg_save(server->worker, &msg, &data,
450
cmd_msg_save_callback, server);
451
net_set_nonblock(server->fd_in, TRUE);
452
ret = dsync_worker_has_failed(server->worker) ? -1 : 1;
453
i_assert(server->save_finished);
454
i_assert(data.input->eof || ret < 0);
455
i_stream_destroy(&data.input);
459
static void cmd_msg_get_send_more(struct dsync_proxy_server *server)
461
const unsigned char *data;
465
while (!proxy_server_is_output_full(server)) {
466
ret = i_stream_read_data(server->get_input, &data, &size, 0);
469
o_stream_send(server->output, "\n.\n", 3);
470
i_stream_unref(&server->get_input);
473
/* for now we assume input is blocking */
477
dsync_proxy_send_dot_output(server->output,
478
&server->get_input_last_lf,
480
i_stream_skip(server->get_input, size);
482
o_stream_set_flush_pending(server->output, TRUE);
486
cmd_msg_get_callback(enum dsync_msg_get_result result,
487
const struct dsync_msg_static_data *data, void *context)
489
struct dsync_proxy_server *server = context;
492
i_assert(server->get_uid != 0);
495
case DSYNC_MSG_GET_RESULT_SUCCESS:
497
case DSYNC_MSG_GET_RESULT_EXPUNGED:
498
o_stream_send(server->output, "0\n", 3);
500
case DSYNC_MSG_GET_RESULT_FAILED:
501
o_stream_send(server->output, "-\n", 3);
505
str = t_str_new(128);
506
str_printfa(str, "1\t%u\t", server->get_uid);
507
dsync_proxy_msg_static_export(str, data);
508
str_append_c(str, '\n');
509
o_stream_send(server->output, str_data(str), str_len(str));
511
/* then we'll still have to send the message body. */
512
server->get_input = data->input;
513
cmd_msg_get_send_more(server);
514
if (server->get_input == NULL) {
515
/* if we came here from ioloop, make sure the command gets
516
freed in the output flush callback */
517
o_stream_set_flush_pending(server->output, TRUE);
522
cmd_msg_get(struct dsync_proxy_server *server, const char *const *args)
524
mailbox_guid_t mailbox_guid;
527
if (str_array_length(args) < 2)
530
if (dsync_proxy_mailbox_guid_import(args[0], &mailbox_guid) < 0) {
531
i_error("msg-get: Invalid mailbox GUID '%s'", args[0]);
535
uid = strtoul(args[1], NULL, 10);
539
if (server->get_input != NULL) {
540
i_assert(server->get_uid == uid);
541
cmd_msg_get_send_more(server);
543
server->get_uid = uid;
544
dsync_worker_msg_get(server->worker, &mailbox_guid, uid,
545
cmd_msg_get_callback, server);
547
if (server->get_input != NULL)
553
static void cmd_finish_callback(bool success, void *context)
555
struct dsync_proxy_server *server = context;
560
else if (dsync_worker_has_unexpected_changes(server->worker))
565
server->finished = TRUE;
566
o_stream_send_str(server->output, reply);
570
cmd_finish(struct dsync_proxy_server *server,
571
const char *const *args ATTR_UNUSED)
573
dsync_worker_finish(server->worker, cmd_finish_callback, server);
577
static struct dsync_proxy_server_command commands[] = {
578
{ "BOX-LIST", cmd_box_list },
579
{ "SUBS-LIST", cmd_subs_list },
580
{ "SUBS-SET", cmd_subs_set },
581
{ "MSG-LIST", cmd_msg_list },
582
{ "BOX-CREATE", cmd_box_create },
583
{ "BOX-DELETE", cmd_box_delete },
584
{ "DIR-DELETE", cmd_dir_delete },
585
{ "BOX-RENAME", cmd_box_rename },
586
{ "BOX-UPDATE", cmd_box_update },
587
{ "BOX-SELECT", cmd_box_select },
588
{ "MSG-UPDATE", cmd_msg_update },
589
{ "MSG-UID-CHANGE", cmd_msg_uid_change },
590
{ "MSG-EXPUNGE", cmd_msg_expunge },
591
{ "MSG-COPY", cmd_msg_copy },
592
{ "MSG-SAVE", cmd_msg_save },
593
{ "MSG-GET", cmd_msg_get },
594
{ "FINISH", cmd_finish },
598
struct dsync_proxy_server_command *
599
dsync_proxy_server_command_find(const char *name)
603
for (i = 0; commands[i].name != NULL; i++) {
604
if (strcasecmp(commands[i].name, name) == 0)