1
/* Copyright (c) 2009-2012 Dovecot authors, see the included COPYING file */
6
#include "fd-set-nonblock.h"
8
#include "istream-dot.h"
11
#include "strescape.h"
12
#include "imap-util.h"
13
#include "dsync-proxy.h"
14
#include "dsync-worker-private.h"
19
#define OUTBUF_THROTTLE_SIZE (1024*64)
21
enum proxy_client_request_type {
22
PROXY_CLIENT_REQUEST_TYPE_COPY,
23
PROXY_CLIENT_REQUEST_TYPE_GET,
24
PROXY_CLIENT_REQUEST_TYPE_FINISH
27
struct proxy_client_request {
28
enum proxy_client_request_type type;
31
dsync_worker_msg_callback_t *get;
32
dsync_worker_copy_callback_t *copy;
33
dsync_worker_finish_callback_t *finish;
38
struct proxy_client_dsync_worker_mailbox_iter {
39
struct dsync_worker_mailbox_iter iter;
43
struct proxy_client_dsync_worker_subs_iter {
44
struct dsync_worker_subs_iter iter;
48
struct proxy_client_dsync_worker {
49
struct dsync_worker worker;
52
struct istream *input;
53
struct ostream *output;
54
struct timeout *to, *to_input;
56
mailbox_guid_t selected_box_guid;
58
dsync_worker_save_callback_t *save_callback;
60
struct istream *save_input;
62
bool save_input_last_lf;
65
struct dsync_msg_static_data msg_get_data;
66
ARRAY_DEFINE(request_array, struct proxy_client_request);
67
struct aqueue *request_queue;
68
string_t *pending_commands;
70
unsigned int handshake_received:1;
71
unsigned int finishing:1;
72
unsigned int finished:1;
75
extern struct dsync_worker_vfuncs proxy_client_dsync_worker;
77
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker);
78
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker);
80
static void proxy_client_fail(struct proxy_client_dsync_worker *worker)
82
i_stream_close(worker->input);
83
dsync_worker_set_failure(&worker->worker);
84
io_loop_stop(current_ioloop);
88
proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker,
91
if (worker->worker.failed)
94
*line_r = i_stream_read_next_line(worker->input);
95
if (*line_r == NULL) {
96
if (worker->input->stream_errno != 0) {
97
errno = worker->input->stream_errno;
98
i_error("read() from worker server failed: %m");
99
dsync_worker_set_failure(&worker->worker);
102
if (worker->input->eof) {
103
if (!worker->finished)
104
i_error("read() from worker server failed: EOF");
105
dsync_worker_set_failure(&worker->worker);
112
if (!worker->handshake_received) {
113
if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) {
114
i_error("Invalid server handshake: %s", *line_r);
115
dsync_worker_set_failure(&worker->worker);
118
worker->handshake_received = TRUE;
119
return proxy_client_worker_read_line(worker, line_r);
125
proxy_client_worker_msg_get_finish(struct proxy_client_dsync_worker *worker)
127
worker->msg_get_data.input = NULL;
128
worker->io = io_add(worker->fd_in, IO_READ,
129
proxy_client_worker_input, worker);
131
/* some input may already be buffered. note that we may be coming here
132
from the input function itself, in which case this timeout must not
133
be called (we'll remove it later) */
134
if (worker->to_input == NULL) {
136
timeout_add(0, proxy_client_worker_input, worker);
141
proxy_client_worker_read_to_eof(struct proxy_client_dsync_worker *worker)
143
struct istream *input = worker->msg_get_data.input;
144
const unsigned char *data;
148
while ((ret = i_stream_read_data(input, &data, &size, 0)) > 0)
149
i_stream_skip(input, size);
151
i_stream_unref(&input);
152
io_remove(&worker->io);
153
proxy_client_worker_msg_get_finish(worker);
155
timeout_reset(worker->to);
159
proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
161
struct istream *input = worker->msg_get_data.input;
163
i_assert(worker->io == NULL);
166
proxy_client_worker_msg_get_finish(worker);
168
/* saving read the message only partially. we'll need to read
169
the input until EOF or we'll start treating the input as
171
worker->io = io_add(worker->fd_in, IO_READ,
172
proxy_client_worker_read_to_eof, worker);
173
worker->msg_get_data.input =
174
i_stream_create_dot(worker->input, FALSE);
179
proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
180
const struct proxy_client_request *request,
186
if (line[0] == '1' && line[1] == '\t')
188
else if (line[0] == '0' && line[1] == '\t')
191
i_error("msg-copy returned invalid input: %s", line);
192
proxy_client_fail(worker);
195
uid = strtoul(line + 2, NULL, 10);
196
if (uid != request->uid) {
197
i_error("msg-copy returned invalid uid: %u != %u",
199
proxy_client_fail(worker);
203
request->callback.copy(success, request->context);
208
proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker,
209
const struct proxy_client_request *request,
212
enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED;
213
const char *p, *error;
216
p_clear(worker->msg_get_pool);
224
if ((p = strchr(line, '\t')) == NULL)
226
uid = strtoul(t_strcut(line, '\t'), NULL, 10);
229
if (uid != request->uid) {
230
i_error("msg-get returned invalid uid: %u != %u",
232
proxy_client_fail(worker);
236
if (dsync_proxy_msg_static_import(worker->msg_get_pool,
237
line, &worker->msg_get_data,
239
i_error("Invalid msg-get static input: %s", error);
240
proxy_client_fail(worker);
243
worker->msg_get_data.input =
244
i_stream_create_dot(worker->input, FALSE);
245
i_stream_set_destroy_callback(worker->msg_get_data.input,
246
proxy_client_worker_msg_get_done,
248
io_remove(&worker->io);
249
result = DSYNC_MSG_GET_RESULT_SUCCESS;
253
result = DSYNC_MSG_GET_RESULT_EXPUNGED;
260
request->callback.get(result, &worker->msg_get_data, request->context);
261
return worker->io != NULL && worker->msg_get_data.input == NULL;
265
proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker,
266
const struct proxy_client_request *request,
271
i_assert(worker->finishing);
272
i_assert(!worker->finished);
274
worker->finishing = FALSE;
275
worker->finished = TRUE;
277
if (strcmp(line, "changes") == 0)
278
worker->worker.unexpected_changes = TRUE;
279
else if (strcmp(line, "fail") == 0)
281
else if (strcmp(line, "ok") != 0) {
282
i_error("Unexpected finish reply: %s", line);
286
request->callback.finish(success, request->context);
290
proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker,
293
const struct proxy_client_request *requests;
294
struct proxy_client_request request;
297
i_assert(worker->msg_get_data.input == NULL);
299
if (aqueue_count(worker->request_queue) == 0) {
300
i_error("Unexpected reply from server: %s", line);
301
proxy_client_fail(worker);
305
requests = array_idx(&worker->request_array, 0);
306
request = requests[aqueue_idx(worker->request_queue, 0)];
307
aqueue_delete_tail(worker->request_queue);
309
switch (request.type) {
310
case PROXY_CLIENT_REQUEST_TYPE_COPY:
311
ret = proxy_client_worker_next_copy(worker, &request, line);
313
case PROXY_CLIENT_REQUEST_TYPE_GET:
314
ret = proxy_client_worker_next_msg_get(worker, &request, line);
316
case PROXY_CLIENT_REQUEST_TYPE_FINISH:
317
proxy_client_worker_next_finish(worker, &request, line);
323
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker)
328
if (worker->to_input != NULL)
329
timeout_remove(&worker->to_input);
331
if (worker->worker.input_callback != NULL) {
332
worker->worker.input_callback(worker->worker.input_context);
333
timeout_reset(worker->to);
337
while ((ret = proxy_client_worker_read_line(worker, &line)) > 0) {
338
if (!proxy_client_worker_next_reply(worker, line))
342
/* try to continue */
343
proxy_client_worker_next_reply(worker, "");
346
if (worker->to_input != NULL) {
347
/* input stream's destroy callback was already called.
348
don't get back here. */
349
timeout_remove(&worker->to_input);
351
timeout_reset(worker->to);
355
proxy_client_worker_output_real(struct proxy_client_dsync_worker *worker)
359
if ((ret = o_stream_flush(worker->output)) < 0)
362
if (worker->save_input != NULL) {
363
/* proxy_client_worker_msg_save() hasn't finished yet. */
364
o_stream_cork(worker->output);
365
proxy_client_send_stream(worker);
366
if (worker->save_input != NULL) {
367
/* still unfinished, make sure we get called again */
372
if (worker->worker.output_callback != NULL)
373
worker->worker.output_callback(worker->worker.output_context);
377
static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
381
ret = proxy_client_worker_output_real(worker);
382
timeout_reset(worker->to);
387
proxy_client_worker_timeout(struct proxy_client_dsync_worker *worker)
391
if (worker->save_io != NULL)
392
reason = " (waiting for more input from mail being saved)";
393
else if (worker->save_input != NULL) {
394
size_t bytes = o_stream_get_buffer_used_size(worker->output);
396
reason = t_strdup_printf(" (waiting for output stream to flush, "
397
"%"PRIuSIZE_T" bytes left)", bytes);
398
} else if (worker->msg_get_data.input != NULL) {
399
reason = " (waiting for MSG-GET message from remote)";
403
i_error("proxy client timed out%s", reason);
404
proxy_client_fail(worker);
407
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
409
struct proxy_client_dsync_worker *worker;
411
worker = i_new(struct proxy_client_dsync_worker, 1);
412
worker->worker.v = proxy_client_dsync_worker;
413
worker->fd_in = fd_in;
414
worker->fd_out = fd_out;
415
worker->to = timeout_add(DSYNC_PROXY_CLIENT_TIMEOUT_MSECS,
416
proxy_client_worker_timeout, worker);
417
worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker);
418
worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
419
worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
420
o_stream_send_str(worker->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n");
421
/* we'll keep the output corked until flush is needed */
422
o_stream_cork(worker->output);
423
o_stream_set_flush_callback(worker->output, proxy_client_worker_output,
425
fd_set_nonblock(fd_in, TRUE);
426
fd_set_nonblock(fd_out, TRUE);
428
worker->pending_commands = str_new(default_pool, 1024);
429
worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
430
i_array_init(&worker->request_array, 64);
431
worker->request_queue = aqueue_init(&worker->request_array.arr);
433
return &worker->worker;
436
static void proxy_client_worker_deinit(struct dsync_worker *_worker)
438
struct proxy_client_dsync_worker *worker =
439
(struct proxy_client_dsync_worker *)_worker;
441
timeout_remove(&worker->to);
442
if (worker->to_input != NULL)
443
timeout_remove(&worker->to_input);
444
if (worker->io != NULL)
445
io_remove(&worker->io);
446
i_stream_destroy(&worker->input);
447
o_stream_destroy(&worker->output);
448
if (close(worker->fd_in) < 0)
449
i_error("close(worker input) failed: %m");
450
if (worker->fd_in != worker->fd_out) {
451
if (close(worker->fd_out) < 0)
452
i_error("close(worker output) failed: %m");
454
aqueue_deinit(&worker->request_queue);
455
array_free(&worker->request_array);
456
pool_unref(&worker->msg_get_pool);
457
str_free(&worker->pending_commands);
462
worker_is_output_stream_full(struct proxy_client_dsync_worker *worker)
464
return o_stream_get_buffer_used_size(worker->output) >=
465
OUTBUF_THROTTLE_SIZE;
468
static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker)
470
struct proxy_client_dsync_worker *worker =
471
(struct proxy_client_dsync_worker *)_worker;
473
if (worker->save_input != NULL) {
474
/* we haven't finished sending a message save, so we're full. */
477
return worker_is_output_stream_full(worker);
480
static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
482
struct proxy_client_dsync_worker *worker =
483
(struct proxy_client_dsync_worker *)_worker;
486
if (o_stream_flush(worker->output) < 0)
489
o_stream_uncork(worker->output);
490
if (o_stream_get_buffer_used_size(worker->output) > 0)
493
if (o_stream_send(worker->output, str_data(worker->pending_commands),
494
str_len(worker->pending_commands)) < 0)
496
str_truncate(worker->pending_commands, 0);
497
o_stream_cork(worker->output);
501
static struct dsync_worker_mailbox_iter *
502
proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
504
struct proxy_client_dsync_worker *worker =
505
(struct proxy_client_dsync_worker *)_worker;
506
struct proxy_client_dsync_worker_mailbox_iter *iter;
508
iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1);
509
iter->iter.worker = _worker;
510
iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
511
o_stream_send_str(worker->output, "BOX-LIST\n");
512
(void)proxy_client_worker_output_flush(_worker);
517
proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter,
518
struct dsync_mailbox *dsync_box_r)
520
struct proxy_client_dsync_worker_mailbox_iter *iter =
521
(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
522
struct proxy_client_dsync_worker *worker =
523
(struct proxy_client_dsync_worker *)_iter->worker;
524
const char *line, *error;
527
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
529
_iter->failed = TRUE;
533
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
534
/* end of mailboxes */
535
if (line[0] == '-') {
536
i_error("Worker server's mailbox iteration failed");
537
_iter->failed = TRUE;
543
if (dsync_proxy_mailbox_import(iter->pool, line,
544
dsync_box_r, &error) < 0) {
545
i_error("Invalid mailbox input from worker server: %s", error);
546
_iter->failed = TRUE;
553
proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter)
555
struct proxy_client_dsync_worker_mailbox_iter *iter =
556
(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
557
int ret = _iter->failed ? -1 : 0;
559
pool_unref(&iter->pool);
564
static struct dsync_worker_subs_iter *
565
proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
567
struct proxy_client_dsync_worker *worker =
568
(struct proxy_client_dsync_worker *)_worker;
569
struct proxy_client_dsync_worker_subs_iter *iter;
571
iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
572
iter->iter.worker = _worker;
573
iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
574
o_stream_send_str(worker->output, "SUBS-LIST\n");
575
(void)proxy_client_worker_output_flush(_worker);
580
proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
581
unsigned int wanted_arg_count,
584
struct proxy_client_dsync_worker *worker =
585
(struct proxy_client_dsync_worker *)iter->iter.worker;
590
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
592
iter->iter.failed = TRUE;
596
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
597
/* end of subscribed subscriptions */
598
if (line[0] == '-') {
599
i_error("Worker server's subscription iteration failed");
600
iter->iter.failed = TRUE;
606
args = p_strsplit(iter->pool, line, "\t");
607
if (str_array_length((const char *const *)args) < wanted_arg_count) {
608
i_error("Invalid subscription input from worker server");
609
iter->iter.failed = TRUE;
617
proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
618
struct dsync_worker_subscription *rec_r)
620
struct proxy_client_dsync_worker_subs_iter *iter =
621
(struct proxy_client_dsync_worker_subs_iter *)_iter;
625
ret = proxy_client_worker_subs_iter_next_line(iter, 4, &args);
629
rec_r->vname = str_tabunescape(args[0]);
630
rec_r->storage_name = str_tabunescape(args[1]);
631
rec_r->ns_prefix = str_tabunescape(args[2]);
632
rec_r->last_change = strtoul(args[3], NULL, 10);
637
proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
638
struct dsync_worker_unsubscription *rec_r)
640
struct proxy_client_dsync_worker_subs_iter *iter =
641
(struct proxy_client_dsync_worker_subs_iter *)_iter;
645
ret = proxy_client_worker_subs_iter_next_line(iter, 3, &args);
649
memset(rec_r, 0, sizeof(*rec_r));
650
if (dsync_proxy_mailbox_guid_import(args[0], &rec_r->name_sha1) < 0) {
651
i_error("Invalid subscription input from worker server: "
652
"Invalid unsubscription mailbox GUID");
653
iter->iter.failed = TRUE;
656
rec_r->ns_prefix = str_tabunescape(args[1]);
657
rec_r->last_change = strtoul(args[2], NULL, 10);
662
proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
664
struct proxy_client_dsync_worker_subs_iter *iter =
665
(struct proxy_client_dsync_worker_subs_iter *)_iter;
666
int ret = _iter->failed ? -1 : 0;
668
pool_unref(&iter->pool);
674
proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
675
const char *name, time_t last_change,
678
struct proxy_client_dsync_worker *worker =
679
(struct proxy_client_dsync_worker *)_worker;
682
string_t *str = t_str_new(128);
684
str_append(str, "SUBS-SET\t");
685
str_tabescape_write(str, name);
686
str_printfa(str, "\t%s\t%d\n", dec2str(last_change),
688
o_stream_send(worker->output, str_data(str), str_len(str));
692
struct proxy_client_dsync_worker_msg_iter {
693
struct dsync_worker_msg_iter iter;
698
static struct dsync_worker_msg_iter *
699
proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
700
const mailbox_guid_t mailboxes[],
701
unsigned int mailbox_count)
703
struct proxy_client_dsync_worker *worker =
704
(struct proxy_client_dsync_worker *)_worker;
705
struct proxy_client_dsync_worker_msg_iter *iter;
709
iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1);
710
iter->iter.worker = _worker;
711
iter->pool = pool_alloconly_create("proxy message iter", 10240);
713
str = str_new(iter->pool, 512);
714
str_append(str, "MSG-LIST");
715
for (i = 0; i < mailbox_count; i++) T_BEGIN {
716
str_append_c(str, '\t');
717
dsync_proxy_mailbox_guid_export(str, &mailboxes[i]);
719
str_append_c(str, '\n');
720
o_stream_send(worker->output, str_data(str), str_len(str));
723
(void)proxy_client_worker_output_flush(_worker);
728
proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter,
729
unsigned int *mailbox_idx_r,
730
struct dsync_message *msg_r)
732
struct proxy_client_dsync_worker_msg_iter *iter =
733
(struct proxy_client_dsync_worker_msg_iter *)_iter;
734
struct proxy_client_dsync_worker *worker =
735
(struct proxy_client_dsync_worker *)_iter->worker;
736
const char *line, *error;
742
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
744
_iter->failed = TRUE;
748
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
749
/* end of messages */
750
if (line[0] == '-') {
751
i_error("Worker server's message iteration failed");
752
_iter->failed = TRUE;
759
while (*line >= '0' && *line <= '9') {
760
*mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0');
764
i_error("Invalid mailbox idx from worker server");
765
_iter->failed = TRUE;
771
if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) {
772
i_error("Invalid message input from worker server: %s", error);
773
_iter->failed = TRUE;
780
proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
782
struct proxy_client_dsync_worker_msg_iter *iter =
783
(struct proxy_client_dsync_worker_msg_iter *)_iter;
784
int ret = _iter->failed ? -1 : 0;
786
pool_unref(&iter->pool);
792
proxy_client_worker_cmd(struct proxy_client_dsync_worker *worker, string_t *str)
794
if (worker->save_input == NULL)
795
o_stream_send(worker->output, str_data(str), str_len(str));
797
str_append_str(worker->pending_commands, str);
801
proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
802
const struct dsync_mailbox *dsync_box)
804
struct proxy_client_dsync_worker *worker =
805
(struct proxy_client_dsync_worker *)_worker;
808
string_t *str = t_str_new(128);
810
str_append(str, "BOX-CREATE\t");
811
dsync_proxy_mailbox_export(str, dsync_box);
812
str_append_c(str, '\n');
813
proxy_client_worker_cmd(worker, str);
818
proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
819
const struct dsync_mailbox *dsync_box)
821
struct proxy_client_dsync_worker *worker =
822
(struct proxy_client_dsync_worker *)_worker;
825
string_t *str = t_str_new(128);
827
str_append(str, "BOX-DELETE\t");
828
dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid);
829
str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
830
proxy_client_worker_cmd(worker, str);
835
proxy_client_worker_delete_dir(struct dsync_worker *_worker,
836
const struct dsync_mailbox *dsync_box)
838
struct proxy_client_dsync_worker *worker =
839
(struct proxy_client_dsync_worker *)_worker;
842
string_t *str = t_str_new(128);
844
str_append(str, "DIR-DELETE\t");
845
str_tabescape_write(str, dsync_box->name);
846
str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
847
proxy_client_worker_cmd(worker, str);
852
proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
853
const mailbox_guid_t *mailbox,
854
const struct dsync_mailbox *dsync_box)
856
struct proxy_client_dsync_worker *worker =
857
(struct proxy_client_dsync_worker *)_worker;
861
string_t *str = t_str_new(128);
863
str_append(str, "BOX-RENAME\t");
864
dsync_proxy_mailbox_guid_export(str, mailbox);
865
str_append_c(str, '\t');
866
str_tabescape_write(str, dsync_box->name);
867
str_append_c(str, '\t');
868
sep[0] = dsync_box->name_sep; sep[1] = '\0';
869
str_tabescape_write(str, sep);
870
str_append_c(str, '\n');
871
proxy_client_worker_cmd(worker, str);
876
proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
877
const struct dsync_mailbox *dsync_box)
879
struct proxy_client_dsync_worker *worker =
880
(struct proxy_client_dsync_worker *)_worker;
883
string_t *str = t_str_new(128);
885
str_append(str, "BOX-UPDATE\t");
886
dsync_proxy_mailbox_export(str, dsync_box);
887
str_append_c(str, '\n');
888
proxy_client_worker_cmd(worker, str);
893
proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
894
const mailbox_guid_t *mailbox,
895
const ARRAY_TYPE(mailbox_cache_field) *cache_fields)
897
struct proxy_client_dsync_worker *worker =
898
(struct proxy_client_dsync_worker *)_worker;
900
if (dsync_guid_equals(&worker->selected_box_guid, mailbox))
902
worker->selected_box_guid = *mailbox;
905
string_t *str = t_str_new(128);
907
str_append(str, "BOX-SELECT\t");
908
dsync_proxy_mailbox_guid_export(str, mailbox);
909
if (cache_fields != NULL)
910
dsync_proxy_cache_fields_export(str, cache_fields);
911
str_append_c(str, '\n');
912
proxy_client_worker_cmd(worker, str);
917
proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
918
const struct dsync_message *msg)
920
struct proxy_client_dsync_worker *worker =
921
(struct proxy_client_dsync_worker *)_worker;
924
string_t *str = t_str_new(128);
926
str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid,
927
(unsigned long long)msg->modseq);
928
imap_write_flags(str, msg->flags, msg->keywords);
929
str_append_c(str, '\n');
930
proxy_client_worker_cmd(worker, str);
935
proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
936
uint32_t old_uid, uint32_t new_uid)
938
struct proxy_client_dsync_worker *worker =
939
(struct proxy_client_dsync_worker *)_worker;
942
string_t *str = t_str_new(64);
943
str_printfa(str, "MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid);
944
proxy_client_worker_cmd(worker, str);
949
proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
951
struct proxy_client_dsync_worker *worker =
952
(struct proxy_client_dsync_worker *)_worker;
955
string_t *str = t_str_new(64);
956
str_printfa(str, "MSG-EXPUNGE\t%u\n", uid);
957
proxy_client_worker_cmd(worker, str);
962
proxy_client_worker_msg_copy(struct dsync_worker *_worker,
963
const mailbox_guid_t *src_mailbox,
965
const struct dsync_message *dest_msg,
966
dsync_worker_copy_callback_t *callback,
969
struct proxy_client_dsync_worker *worker =
970
(struct proxy_client_dsync_worker *)_worker;
971
struct proxy_client_request request;
974
string_t *str = t_str_new(128);
976
str_append(str, "MSG-COPY\t");
977
dsync_proxy_mailbox_guid_export(str, src_mailbox);
978
str_printfa(str, "\t%u\t", src_uid);
979
dsync_proxy_msg_export(str, dest_msg);
980
str_append_c(str, '\n');
981
proxy_client_worker_cmd(worker, str);
984
memset(&request, 0, sizeof(request));
985
request.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
986
request.callback.copy = callback;
987
request.context = context;
988
request.uid = src_uid;
989
aqueue_append(worker->request_queue, &request);
993
proxy_client_send_stream_real(struct proxy_client_dsync_worker *worker)
995
dsync_worker_save_callback_t *callback;
997
struct istream *input;
998
const unsigned char *data;
1002
while ((ret = i_stream_read_data(worker->save_input,
1003
&data, &size, 0)) > 0) {
1004
dsync_proxy_send_dot_output(worker->output,
1005
&worker->save_input_last_lf,
1007
i_stream_skip(worker->save_input, size);
1009
if (worker_is_output_stream_full(worker)) {
1010
o_stream_uncork(worker->output);
1011
if (worker_is_output_stream_full(worker))
1013
o_stream_cork(worker->output);
1017
/* waiting for more input */
1018
o_stream_uncork(worker->output);
1019
if (worker->save_io == NULL) {
1020
int fd = i_stream_get_fd(worker->save_input);
1024
proxy_client_send_stream, worker);
1028
if (worker->save_io != NULL)
1029
io_remove(&worker->save_io);
1030
if (worker->save_input->stream_errno != 0) {
1031
errno = worker->save_input->stream_errno;
1032
i_error("proxy: reading message input failed: %m");
1033
o_stream_close(worker->output);
1035
i_assert(!i_stream_have_bytes_left(worker->save_input));
1036
o_stream_send(worker->output, "\n.\n", 3);
1039
callback = worker->save_callback;
1040
context = worker->save_context;
1041
worker->save_callback = NULL;
1042
worker->save_context = NULL;
1044
/* a bit ugly way to free the stream. the problem is that local worker
1045
has set a destroy callback, which in turn can call our msg_save()
1046
again before the i_stream_unref() is finished. */
1047
input = worker->save_input;
1048
worker->save_input = NULL;
1049
i_stream_unref(&input);
1051
(void)proxy_client_worker_output_flush(&worker->worker);
1056
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
1058
proxy_client_send_stream_real(worker);
1059
timeout_reset(worker->to);
1063
proxy_client_worker_msg_save(struct dsync_worker *_worker,
1064
const struct dsync_message *msg,
1065
const struct dsync_msg_static_data *data,
1066
dsync_worker_save_callback_t *callback,
1069
struct proxy_client_dsync_worker *worker =
1070
(struct proxy_client_dsync_worker *)_worker;
1073
string_t *str = t_str_new(128);
1075
str_append(str, "MSG-SAVE\t");
1076
dsync_proxy_msg_static_export(str, data);
1077
str_append_c(str, '\t');
1078
dsync_proxy_msg_export(str, msg);
1079
str_append_c(str, '\n');
1080
proxy_client_worker_cmd(worker, str);
1083
i_assert(worker->save_input == NULL);
1084
worker->save_callback = callback;
1085
worker->save_context = context;
1086
worker->save_input = data->input;
1087
worker->save_input_last_lf = TRUE;
1088
i_stream_ref(worker->save_input);
1089
proxy_client_send_stream(worker);
1093
proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker)
1095
struct proxy_client_dsync_worker *worker =
1096
(struct proxy_client_dsync_worker *)_worker;
1098
if (worker->save_io != NULL)
1099
io_remove(&worker->save_io);
1100
if (worker->save_input != NULL)
1101
i_stream_unref(&worker->save_input);
1105
proxy_client_worker_msg_get(struct dsync_worker *_worker,
1106
const mailbox_guid_t *mailbox, uint32_t uid,
1107
dsync_worker_msg_callback_t *callback,
1110
struct proxy_client_dsync_worker *worker =
1111
(struct proxy_client_dsync_worker *)_worker;
1112
struct proxy_client_request request;
1115
string_t *str = t_str_new(128);
1117
str_append(str, "MSG-GET\t");
1118
dsync_proxy_mailbox_guid_export(str, mailbox);
1119
str_printfa(str, "\t%u\n", uid);
1120
proxy_client_worker_cmd(worker, str);
1123
memset(&request, 0, sizeof(request));
1124
request.type = PROXY_CLIENT_REQUEST_TYPE_GET;
1125
request.callback.get = callback;
1126
request.context = context;
1128
aqueue_append(worker->request_queue, &request);
1132
proxy_client_worker_finish(struct dsync_worker *_worker,
1133
dsync_worker_finish_callback_t *callback,
1136
struct proxy_client_dsync_worker *worker =
1137
(struct proxy_client_dsync_worker *)_worker;
1138
struct proxy_client_request request;
1140
i_assert(worker->save_input == NULL);
1141
i_assert(!worker->finishing);
1143
worker->finishing = TRUE;
1144
worker->finished = FALSE;
1146
o_stream_send_str(worker->output, "FINISH\n");
1147
o_stream_uncork(worker->output);
1149
memset(&request, 0, sizeof(request));
1150
request.type = PROXY_CLIENT_REQUEST_TYPE_FINISH;
1151
request.callback.finish = callback;
1152
request.context = context;
1153
aqueue_append(worker->request_queue, &request);
1156
struct dsync_worker_vfuncs proxy_client_dsync_worker = {
1157
proxy_client_worker_deinit,
1159
proxy_client_worker_is_output_full,
1160
proxy_client_worker_output_flush,
1162
proxy_client_worker_mailbox_iter_init,
1163
proxy_client_worker_mailbox_iter_next,
1164
proxy_client_worker_mailbox_iter_deinit,
1166
proxy_client_worker_subs_iter_init,
1167
proxy_client_worker_subs_iter_next,
1168
proxy_client_worker_subs_iter_next_un,
1169
proxy_client_worker_subs_iter_deinit,
1170
proxy_client_worker_set_subscribed,
1172
proxy_client_worker_msg_iter_init,
1173
proxy_client_worker_msg_iter_next,
1174
proxy_client_worker_msg_iter_deinit,
1176
proxy_client_worker_create_mailbox,
1177
proxy_client_worker_delete_mailbox,
1178
proxy_client_worker_delete_dir,
1179
proxy_client_worker_rename_mailbox,
1180
proxy_client_worker_update_mailbox,
1182
proxy_client_worker_select_mailbox,
1183
proxy_client_worker_msg_update_metadata,
1184
proxy_client_worker_msg_update_uid,
1185
proxy_client_worker_msg_expunge,
1186
proxy_client_worker_msg_copy,
1187
proxy_client_worker_msg_save,
1188
proxy_client_worker_msg_save_cancel,
1189
proxy_client_worker_msg_get,
1190
proxy_client_worker_finish