1
/* Copyright (c) 2009-2011 Dovecot authors, see the included COPYING file */
6
#include "fd-set-nonblock.h"
8
#include "istream-dot.h"
11
#include "strescape.h"
12
#include "master-service.h"
13
#include "imap-util.h"
14
#include "dsync-proxy.h"
15
#include "dsync-worker-private.h"
20
#define OUTBUF_THROTTLE_SIZE (1024*64)
22
enum proxy_client_request_type {
23
PROXY_CLIENT_REQUEST_TYPE_COPY,
24
PROXY_CLIENT_REQUEST_TYPE_GET,
25
PROXY_CLIENT_REQUEST_TYPE_FINISH
28
struct proxy_client_request {
29
enum proxy_client_request_type type;
32
dsync_worker_msg_callback_t *get;
33
dsync_worker_copy_callback_t *copy;
34
dsync_worker_finish_callback_t *finish;
39
struct proxy_client_dsync_worker_mailbox_iter {
40
struct dsync_worker_mailbox_iter iter;
44
struct proxy_client_dsync_worker_subs_iter {
45
struct dsync_worker_subs_iter iter;
49
struct proxy_client_dsync_worker {
50
struct dsync_worker worker;
53
struct istream *input;
54
struct ostream *output;
55
struct timeout *to, *to_input;
57
mailbox_guid_t selected_box_guid;
59
dsync_worker_save_callback_t *save_callback;
61
struct istream *save_input;
63
bool save_input_last_lf;
66
struct dsync_msg_static_data msg_get_data;
67
ARRAY_DEFINE(request_array, struct proxy_client_request);
68
struct aqueue *request_queue;
69
string_t *pending_commands;
71
unsigned int handshake_received:1;
72
unsigned int finishing:1;
73
unsigned int finished:1;
76
extern struct dsync_worker_vfuncs proxy_client_dsync_worker;
78
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker);
79
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker);
81
static void proxy_client_fail(struct proxy_client_dsync_worker *worker)
83
i_stream_close(worker->input);
84
dsync_worker_set_failure(&worker->worker);
85
master_service_stop(master_service);
89
proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker,
92
if (worker->worker.failed)
95
*line_r = i_stream_read_next_line(worker->input);
96
if (*line_r == NULL) {
97
if (worker->input->stream_errno != 0) {
98
errno = worker->input->stream_errno;
99
i_error("read() from worker server failed: %m");
100
dsync_worker_set_failure(&worker->worker);
103
if (worker->input->eof) {
104
if (!worker->finished)
105
i_error("read() from worker server failed: EOF");
106
dsync_worker_set_failure(&worker->worker);
113
if (!worker->handshake_received) {
114
if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) {
115
i_error("Invalid server handshake: %s", *line_r);
116
dsync_worker_set_failure(&worker->worker);
119
worker->handshake_received = TRUE;
120
return proxy_client_worker_read_line(worker, line_r);
126
proxy_client_worker_msg_get_finish(struct proxy_client_dsync_worker *worker)
128
worker->msg_get_data.input = NULL;
129
worker->io = io_add(worker->fd_in, IO_READ,
130
proxy_client_worker_input, worker);
132
/* some input may already be buffered. note that we may be coming here
133
from the input function itself, in which case this timeout must not
134
be called (we'll remove it later) */
135
if (worker->to_input == NULL) {
137
timeout_add(0, proxy_client_worker_input, worker);
142
proxy_client_worker_read_to_eof(struct proxy_client_dsync_worker *worker)
144
struct istream *input = worker->msg_get_data.input;
145
const unsigned char *data;
149
while ((ret = i_stream_read_data(input, &data, &size, 0)) > 0)
150
i_stream_skip(input, size);
152
i_stream_unref(&input);
153
io_remove(&worker->io);
154
proxy_client_worker_msg_get_finish(worker);
156
timeout_reset(worker->to);
160
proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
162
struct istream *input = worker->msg_get_data.input;
164
i_assert(worker->io == NULL);
167
proxy_client_worker_msg_get_finish(worker);
169
/* saving read the message only partially. we'll need to read
170
the input until EOF or we'll start treating the input as
172
worker->io = io_add(worker->fd_in, IO_READ,
173
proxy_client_worker_read_to_eof, worker);
174
worker->msg_get_data.input =
175
i_stream_create_dot(worker->input, FALSE);
180
proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
181
const struct proxy_client_request *request,
187
if (line[0] == '1' && line[1] == '\t')
189
else if (line[0] == '0' && line[1] == '\t')
192
i_error("msg-copy returned invalid input: %s", line);
193
proxy_client_fail(worker);
196
uid = strtoul(line + 2, NULL, 10);
197
if (uid != request->uid) {
198
i_error("msg-copy returned invalid uid: %u != %u",
200
proxy_client_fail(worker);
204
request->callback.copy(success, request->context);
209
proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker,
210
const struct proxy_client_request *request,
213
enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED;
214
const char *p, *error;
217
p_clear(worker->msg_get_pool);
225
if ((p = strchr(line, '\t')) == NULL)
227
uid = strtoul(t_strcut(line, '\t'), NULL, 10);
230
if (uid != request->uid) {
231
i_error("msg-get returned invalid uid: %u != %u",
233
proxy_client_fail(worker);
237
if (dsync_proxy_msg_static_import(worker->msg_get_pool,
238
line, &worker->msg_get_data,
240
i_error("Invalid msg-get static input: %s", error);
241
proxy_client_fail(worker);
244
worker->msg_get_data.input =
245
i_stream_create_dot(worker->input, FALSE);
246
i_stream_set_destroy_callback(worker->msg_get_data.input,
247
proxy_client_worker_msg_get_done,
249
io_remove(&worker->io);
250
result = DSYNC_MSG_GET_RESULT_SUCCESS;
254
result = DSYNC_MSG_GET_RESULT_EXPUNGED;
261
request->callback.get(result, &worker->msg_get_data, request->context);
262
return worker->io != NULL && worker->msg_get_data.input == NULL;
266
proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker,
267
const struct proxy_client_request *request,
272
i_assert(worker->finishing);
273
i_assert(!worker->finished);
275
worker->finishing = FALSE;
276
worker->finished = TRUE;
278
if (strcmp(line, "changes") == 0)
279
worker->worker.unexpected_changes = TRUE;
280
else if (strcmp(line, "fail") == 0)
282
else if (strcmp(line, "ok") != 0) {
283
i_error("Unexpected finish reply: %s", line);
287
request->callback.finish(success, request->context);
291
proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker,
294
const struct proxy_client_request *requests;
295
struct proxy_client_request request;
298
i_assert(worker->msg_get_data.input == NULL);
300
if (aqueue_count(worker->request_queue) == 0) {
301
i_error("Unexpected reply from server: %s", line);
302
proxy_client_fail(worker);
306
requests = array_idx(&worker->request_array, 0);
307
request = requests[aqueue_idx(worker->request_queue, 0)];
308
aqueue_delete_tail(worker->request_queue);
310
switch (request.type) {
311
case PROXY_CLIENT_REQUEST_TYPE_COPY:
312
ret = proxy_client_worker_next_copy(worker, &request, line);
314
case PROXY_CLIENT_REQUEST_TYPE_GET:
315
ret = proxy_client_worker_next_msg_get(worker, &request, line);
317
case PROXY_CLIENT_REQUEST_TYPE_FINISH:
318
proxy_client_worker_next_finish(worker, &request, line);
324
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker)
329
if (worker->to_input != NULL)
330
timeout_remove(&worker->to_input);
332
if (worker->worker.input_callback != NULL) {
333
worker->worker.input_callback(worker->worker.input_context);
334
timeout_reset(worker->to);
338
while ((ret = proxy_client_worker_read_line(worker, &line)) > 0) {
339
if (!proxy_client_worker_next_reply(worker, line))
343
/* try to continue */
344
proxy_client_worker_next_reply(worker, "");
347
if (worker->to_input != NULL) {
348
/* input stream's destroy callback was already called.
349
don't get back here. */
350
timeout_remove(&worker->to_input);
352
timeout_reset(worker->to);
356
proxy_client_worker_output_real(struct proxy_client_dsync_worker *worker)
360
if ((ret = o_stream_flush(worker->output)) < 0)
363
if (worker->save_input != NULL) {
364
/* proxy_client_worker_msg_save() hasn't finished yet. */
365
o_stream_cork(worker->output);
366
proxy_client_send_stream(worker);
367
if (worker->save_input != NULL) {
368
/* still unfinished, make sure we get called again */
373
if (worker->worker.output_callback != NULL)
374
worker->worker.output_callback(worker->worker.output_context);
378
static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
382
ret = proxy_client_worker_output_real(worker);
383
timeout_reset(worker->to);
388
proxy_client_worker_timeout(struct proxy_client_dsync_worker *worker)
392
if (worker->save_io != NULL)
393
reason = " (waiting for more input from mail being saved)";
394
else if (worker->save_input != NULL) {
395
size_t bytes = o_stream_get_buffer_used_size(worker->output);
397
reason = t_strdup_printf(" (waiting for output stream to flush, "
398
"%"PRIuSIZE_T" bytes left)", bytes);
399
} else if (worker->msg_get_data.input != NULL) {
400
reason = " (waiting for MSG-GET message from remote)";
404
i_error("proxy client timed out%s", reason);
405
proxy_client_fail(worker);
408
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
410
struct proxy_client_dsync_worker *worker;
412
worker = i_new(struct proxy_client_dsync_worker, 1);
413
worker->worker.v = proxy_client_dsync_worker;
414
worker->fd_in = fd_in;
415
worker->fd_out = fd_out;
416
worker->to = timeout_add(DSYNC_PROXY_CLIENT_TIMEOUT_MSECS,
417
proxy_client_worker_timeout, worker);
418
worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker);
419
worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
420
worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
421
o_stream_send_str(worker->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n");
422
/* we'll keep the output corked until flush is needed */
423
o_stream_cork(worker->output);
424
o_stream_set_flush_callback(worker->output, proxy_client_worker_output,
426
fd_set_nonblock(fd_in, TRUE);
427
fd_set_nonblock(fd_out, TRUE);
429
worker->pending_commands = str_new(default_pool, 1024);
430
worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
431
i_array_init(&worker->request_array, 64);
432
worker->request_queue = aqueue_init(&worker->request_array.arr);
434
return &worker->worker;
437
static void proxy_client_worker_deinit(struct dsync_worker *_worker)
439
struct proxy_client_dsync_worker *worker =
440
(struct proxy_client_dsync_worker *)_worker;
442
timeout_remove(&worker->to);
443
if (worker->to_input != NULL)
444
timeout_remove(&worker->to_input);
445
if (worker->io != NULL)
446
io_remove(&worker->io);
447
i_stream_destroy(&worker->input);
448
o_stream_destroy(&worker->output);
449
if (close(worker->fd_in) < 0)
450
i_error("close(worker input) failed: %m");
451
if (worker->fd_in != worker->fd_out) {
452
if (close(worker->fd_out) < 0)
453
i_error("close(worker output) failed: %m");
455
aqueue_deinit(&worker->request_queue);
456
array_free(&worker->request_array);
457
pool_unref(&worker->msg_get_pool);
458
str_free(&worker->pending_commands);
463
worker_is_output_stream_full(struct proxy_client_dsync_worker *worker)
465
return o_stream_get_buffer_used_size(worker->output) >=
466
OUTBUF_THROTTLE_SIZE;
469
static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker)
471
struct proxy_client_dsync_worker *worker =
472
(struct proxy_client_dsync_worker *)_worker;
474
if (worker->save_input != NULL) {
475
/* we haven't finished sending a message save, so we're full. */
478
return worker_is_output_stream_full(worker);
481
static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
483
struct proxy_client_dsync_worker *worker =
484
(struct proxy_client_dsync_worker *)_worker;
487
if (o_stream_flush(worker->output) < 0)
490
o_stream_uncork(worker->output);
491
if (o_stream_get_buffer_used_size(worker->output) > 0)
494
if (o_stream_send(worker->output, str_data(worker->pending_commands),
495
str_len(worker->pending_commands)) < 0)
497
str_truncate(worker->pending_commands, 0);
498
o_stream_cork(worker->output);
502
static struct dsync_worker_mailbox_iter *
503
proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
505
struct proxy_client_dsync_worker *worker =
506
(struct proxy_client_dsync_worker *)_worker;
507
struct proxy_client_dsync_worker_mailbox_iter *iter;
509
iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1);
510
iter->iter.worker = _worker;
511
iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
512
o_stream_send_str(worker->output, "BOX-LIST\n");
513
(void)proxy_client_worker_output_flush(_worker);
518
proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter,
519
struct dsync_mailbox *dsync_box_r)
521
struct proxy_client_dsync_worker_mailbox_iter *iter =
522
(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
523
struct proxy_client_dsync_worker *worker =
524
(struct proxy_client_dsync_worker *)_iter->worker;
525
const char *line, *error;
528
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
530
_iter->failed = TRUE;
534
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
535
/* end of mailboxes */
536
if (line[0] == '-') {
537
i_error("Worker server's mailbox iteration failed");
538
_iter->failed = TRUE;
544
if (dsync_proxy_mailbox_import(iter->pool, line,
545
dsync_box_r, &error) < 0) {
546
i_error("Invalid mailbox input from worker server: %s", error);
547
_iter->failed = TRUE;
554
proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter)
556
struct proxy_client_dsync_worker_mailbox_iter *iter =
557
(struct proxy_client_dsync_worker_mailbox_iter *)_iter;
558
int ret = _iter->failed ? -1 : 0;
560
pool_unref(&iter->pool);
565
static struct dsync_worker_subs_iter *
566
proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
568
struct proxy_client_dsync_worker *worker =
569
(struct proxy_client_dsync_worker *)_worker;
570
struct proxy_client_dsync_worker_subs_iter *iter;
572
iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
573
iter->iter.worker = _worker;
574
iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
575
o_stream_send_str(worker->output, "SUBS-LIST\n");
576
(void)proxy_client_worker_output_flush(_worker);
581
proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
582
unsigned int wanted_arg_count,
585
struct proxy_client_dsync_worker *worker =
586
(struct proxy_client_dsync_worker *)iter->iter.worker;
591
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
593
iter->iter.failed = TRUE;
597
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
598
/* end of subscribed subscriptions */
599
if (line[0] == '-') {
600
i_error("Worker server's subscription iteration failed");
601
iter->iter.failed = TRUE;
607
args = p_strsplit(iter->pool, line, "\t");
608
if (str_array_length((const char *const *)args) < wanted_arg_count) {
609
i_error("Invalid subscription input from worker server");
610
iter->iter.failed = TRUE;
618
proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
619
struct dsync_worker_subscription *rec_r)
621
struct proxy_client_dsync_worker_subs_iter *iter =
622
(struct proxy_client_dsync_worker_subs_iter *)_iter;
626
ret = proxy_client_worker_subs_iter_next_line(iter, 4, &args);
630
rec_r->vname = str_tabunescape(args[0]);
631
rec_r->storage_name = str_tabunescape(args[1]);
632
rec_r->ns_prefix = str_tabunescape(args[2]);
633
rec_r->last_change = strtoul(args[3], NULL, 10);
638
proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
639
struct dsync_worker_unsubscription *rec_r)
641
struct proxy_client_dsync_worker_subs_iter *iter =
642
(struct proxy_client_dsync_worker_subs_iter *)_iter;
646
ret = proxy_client_worker_subs_iter_next_line(iter, 3, &args);
650
memset(rec_r, 0, sizeof(*rec_r));
651
if (dsync_proxy_mailbox_guid_import(args[0], &rec_r->name_sha1) < 0) {
652
i_error("Invalid subscription input from worker server: "
653
"Invalid unsubscription mailbox GUID");
654
iter->iter.failed = TRUE;
657
rec_r->ns_prefix = str_tabunescape(args[1]);
658
rec_r->last_change = strtoul(args[2], NULL, 10);
663
proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
665
struct proxy_client_dsync_worker_subs_iter *iter =
666
(struct proxy_client_dsync_worker_subs_iter *)_iter;
667
int ret = _iter->failed ? -1 : 0;
669
pool_unref(&iter->pool);
675
proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
676
const char *name, time_t last_change,
679
struct proxy_client_dsync_worker *worker =
680
(struct proxy_client_dsync_worker *)_worker;
683
string_t *str = t_str_new(128);
685
str_append(str, "SUBS-SET\t");
686
str_tabescape_write(str, name);
687
str_printfa(str, "\t%s\t%d\n", dec2str(last_change),
689
o_stream_send(worker->output, str_data(str), str_len(str));
693
struct proxy_client_dsync_worker_msg_iter {
694
struct dsync_worker_msg_iter iter;
699
static struct dsync_worker_msg_iter *
700
proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
701
const mailbox_guid_t mailboxes[],
702
unsigned int mailbox_count)
704
struct proxy_client_dsync_worker *worker =
705
(struct proxy_client_dsync_worker *)_worker;
706
struct proxy_client_dsync_worker_msg_iter *iter;
710
iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1);
711
iter->iter.worker = _worker;
712
iter->pool = pool_alloconly_create("proxy message iter", 10240);
714
str = str_new(iter->pool, 512);
715
str_append(str, "MSG-LIST");
716
for (i = 0; i < mailbox_count; i++) T_BEGIN {
717
str_append_c(str, '\t');
718
dsync_proxy_mailbox_guid_export(str, &mailboxes[i]);
720
str_append_c(str, '\n');
721
o_stream_send(worker->output, str_data(str), str_len(str));
724
(void)proxy_client_worker_output_flush(_worker);
729
proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter,
730
unsigned int *mailbox_idx_r,
731
struct dsync_message *msg_r)
733
struct proxy_client_dsync_worker_msg_iter *iter =
734
(struct proxy_client_dsync_worker_msg_iter *)_iter;
735
struct proxy_client_dsync_worker *worker =
736
(struct proxy_client_dsync_worker *)_iter->worker;
737
const char *line, *error;
743
if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
745
_iter->failed = TRUE;
749
if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
750
/* end of messages */
751
if (line[0] == '-') {
752
i_error("Worker server's message iteration failed");
753
_iter->failed = TRUE;
760
while (*line >= '0' && *line <= '9') {
761
*mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0');
765
i_error("Invalid mailbox idx from worker server");
766
_iter->failed = TRUE;
772
if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) {
773
i_error("Invalid message input from worker server: %s", error);
774
_iter->failed = TRUE;
781
proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
783
struct proxy_client_dsync_worker_msg_iter *iter =
784
(struct proxy_client_dsync_worker_msg_iter *)_iter;
785
int ret = _iter->failed ? -1 : 0;
787
pool_unref(&iter->pool);
793
proxy_client_worker_cmd(struct proxy_client_dsync_worker *worker, string_t *str)
795
if (worker->save_input == NULL)
796
o_stream_send(worker->output, str_data(str), str_len(str));
798
str_append_str(worker->pending_commands, str);
802
proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
803
const struct dsync_mailbox *dsync_box)
805
struct proxy_client_dsync_worker *worker =
806
(struct proxy_client_dsync_worker *)_worker;
809
string_t *str = t_str_new(128);
811
str_append(str, "BOX-CREATE\t");
812
dsync_proxy_mailbox_export(str, dsync_box);
813
str_append_c(str, '\n');
814
proxy_client_worker_cmd(worker, str);
819
proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
820
const struct dsync_mailbox *dsync_box)
822
struct proxy_client_dsync_worker *worker =
823
(struct proxy_client_dsync_worker *)_worker;
826
string_t *str = t_str_new(128);
828
str_append(str, "BOX-DELETE\t");
829
dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid);
830
str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
831
proxy_client_worker_cmd(worker, str);
836
proxy_client_worker_delete_dir(struct dsync_worker *_worker,
837
const struct dsync_mailbox *dsync_box)
839
struct proxy_client_dsync_worker *worker =
840
(struct proxy_client_dsync_worker *)_worker;
843
string_t *str = t_str_new(128);
845
str_append(str, "DIR-DELETE\t");
846
str_tabescape_write(str, dsync_box->name);
847
str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
848
proxy_client_worker_cmd(worker, str);
853
proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
854
const mailbox_guid_t *mailbox,
855
const struct dsync_mailbox *dsync_box)
857
struct proxy_client_dsync_worker *worker =
858
(struct proxy_client_dsync_worker *)_worker;
862
string_t *str = t_str_new(128);
864
str_append(str, "BOX-RENAME\t");
865
dsync_proxy_mailbox_guid_export(str, mailbox);
866
str_append_c(str, '\t');
867
str_tabescape_write(str, dsync_box->name);
868
str_append_c(str, '\t');
869
sep[0] = dsync_box->name_sep; sep[1] = '\0';
870
str_tabescape_write(str, sep);
871
str_append_c(str, '\n');
872
proxy_client_worker_cmd(worker, str);
877
proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
878
const struct dsync_mailbox *dsync_box)
880
struct proxy_client_dsync_worker *worker =
881
(struct proxy_client_dsync_worker *)_worker;
884
string_t *str = t_str_new(128);
886
str_append(str, "BOX-UPDATE\t");
887
dsync_proxy_mailbox_export(str, dsync_box);
888
str_append_c(str, '\n');
889
proxy_client_worker_cmd(worker, str);
894
proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
895
const mailbox_guid_t *mailbox,
896
const ARRAY_TYPE(const_string) *cache_fields)
898
struct proxy_client_dsync_worker *worker =
899
(struct proxy_client_dsync_worker *)_worker;
901
if (dsync_guid_equals(&worker->selected_box_guid, mailbox))
903
worker->selected_box_guid = *mailbox;
906
string_t *str = t_str_new(128);
908
str_append(str, "BOX-SELECT\t");
909
dsync_proxy_mailbox_guid_export(str, mailbox);
910
if (cache_fields != NULL)
911
dsync_proxy_strings_export(str, cache_fields);
912
str_append_c(str, '\n');
913
proxy_client_worker_cmd(worker, str);
918
proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
919
const struct dsync_message *msg)
921
struct proxy_client_dsync_worker *worker =
922
(struct proxy_client_dsync_worker *)_worker;
925
string_t *str = t_str_new(128);
927
str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid,
928
(unsigned long long)msg->modseq);
929
imap_write_flags(str, msg->flags, msg->keywords);
930
str_append_c(str, '\n');
931
proxy_client_worker_cmd(worker, str);
936
proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
937
uint32_t old_uid, uint32_t new_uid)
939
struct proxy_client_dsync_worker *worker =
940
(struct proxy_client_dsync_worker *)_worker;
943
string_t *str = t_str_new(64);
944
str_printfa(str, "MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid);
945
proxy_client_worker_cmd(worker, str);
950
proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
952
struct proxy_client_dsync_worker *worker =
953
(struct proxy_client_dsync_worker *)_worker;
956
string_t *str = t_str_new(64);
957
str_printfa(str, "MSG-EXPUNGE\t%u\n", uid);
958
proxy_client_worker_cmd(worker, str);
963
proxy_client_worker_msg_copy(struct dsync_worker *_worker,
964
const mailbox_guid_t *src_mailbox,
966
const struct dsync_message *dest_msg,
967
dsync_worker_copy_callback_t *callback,
970
struct proxy_client_dsync_worker *worker =
971
(struct proxy_client_dsync_worker *)_worker;
972
struct proxy_client_request request;
975
string_t *str = t_str_new(128);
977
str_append(str, "MSG-COPY\t");
978
dsync_proxy_mailbox_guid_export(str, src_mailbox);
979
str_printfa(str, "\t%u\t", src_uid);
980
dsync_proxy_msg_export(str, dest_msg);
981
str_append_c(str, '\n');
982
proxy_client_worker_cmd(worker, str);
985
memset(&request, 0, sizeof(request));
986
request.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
987
request.callback.copy = callback;
988
request.context = context;
989
request.uid = src_uid;
990
aqueue_append(worker->request_queue, &request);
994
proxy_client_send_stream_real(struct proxy_client_dsync_worker *worker)
996
dsync_worker_save_callback_t *callback;
998
struct istream *input;
999
const unsigned char *data;
1003
while ((ret = i_stream_read_data(worker->save_input,
1004
&data, &size, 0)) > 0) {
1005
dsync_proxy_send_dot_output(worker->output,
1006
&worker->save_input_last_lf,
1008
i_stream_skip(worker->save_input, size);
1010
if (worker_is_output_stream_full(worker)) {
1011
o_stream_uncork(worker->output);
1012
if (worker_is_output_stream_full(worker))
1014
o_stream_cork(worker->output);
1018
/* waiting for more input */
1019
o_stream_uncork(worker->output);
1020
if (worker->save_io == NULL) {
1021
int fd = i_stream_get_fd(worker->save_input);
1025
proxy_client_send_stream, worker);
1029
if (worker->save_io != NULL)
1030
io_remove(&worker->save_io);
1031
if (worker->save_input->stream_errno != 0) {
1032
errno = worker->save_input->stream_errno;
1033
i_error("proxy: reading message input failed: %m");
1034
o_stream_close(worker->output);
1036
i_assert(!i_stream_have_bytes_left(worker->save_input));
1037
o_stream_send(worker->output, "\n.\n", 3);
1040
callback = worker->save_callback;
1041
context = worker->save_context;
1042
worker->save_callback = NULL;
1043
worker->save_context = NULL;
1045
/* a bit ugly way to free the stream. the problem is that local worker
1046
has set a destroy callback, which in turn can call our msg_save()
1047
again before the i_stream_unref() is finished. */
1048
input = worker->save_input;
1049
worker->save_input = NULL;
1050
i_stream_unref(&input);
1052
(void)proxy_client_worker_output_flush(&worker->worker);
1057
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
1059
proxy_client_send_stream_real(worker);
1060
timeout_reset(worker->to);
1064
proxy_client_worker_msg_save(struct dsync_worker *_worker,
1065
const struct dsync_message *msg,
1066
const struct dsync_msg_static_data *data,
1067
dsync_worker_save_callback_t *callback,
1070
struct proxy_client_dsync_worker *worker =
1071
(struct proxy_client_dsync_worker *)_worker;
1074
string_t *str = t_str_new(128);
1076
str_append(str, "MSG-SAVE\t");
1077
dsync_proxy_msg_static_export(str, data);
1078
str_append_c(str, '\t');
1079
dsync_proxy_msg_export(str, msg);
1080
str_append_c(str, '\n');
1081
proxy_client_worker_cmd(worker, str);
1084
i_assert(worker->save_input == NULL);
1085
worker->save_callback = callback;
1086
worker->save_context = context;
1087
worker->save_input = data->input;
1088
worker->save_input_last_lf = TRUE;
1089
i_stream_ref(worker->save_input);
1090
proxy_client_send_stream(worker);
1094
proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker)
1096
struct proxy_client_dsync_worker *worker =
1097
(struct proxy_client_dsync_worker *)_worker;
1099
if (worker->save_io != NULL)
1100
io_remove(&worker->save_io);
1101
if (worker->save_input != NULL)
1102
i_stream_unref(&worker->save_input);
1106
proxy_client_worker_msg_get(struct dsync_worker *_worker,
1107
const mailbox_guid_t *mailbox, uint32_t uid,
1108
dsync_worker_msg_callback_t *callback,
1111
struct proxy_client_dsync_worker *worker =
1112
(struct proxy_client_dsync_worker *)_worker;
1113
struct proxy_client_request request;
1116
string_t *str = t_str_new(128);
1118
str_append(str, "MSG-GET\t");
1119
dsync_proxy_mailbox_guid_export(str, mailbox);
1120
str_printfa(str, "\t%u\n", uid);
1121
proxy_client_worker_cmd(worker, str);
1124
memset(&request, 0, sizeof(request));
1125
request.type = PROXY_CLIENT_REQUEST_TYPE_GET;
1126
request.callback.get = callback;
1127
request.context = context;
1129
aqueue_append(worker->request_queue, &request);
1133
proxy_client_worker_finish(struct dsync_worker *_worker,
1134
dsync_worker_finish_callback_t *callback,
1137
struct proxy_client_dsync_worker *worker =
1138
(struct proxy_client_dsync_worker *)_worker;
1139
struct proxy_client_request request;
1141
i_assert(worker->save_input == NULL);
1142
i_assert(!worker->finishing);
1144
worker->finishing = TRUE;
1145
worker->finished = FALSE;
1147
o_stream_send_str(worker->output, "FINISH\n");
1148
o_stream_uncork(worker->output);
1150
memset(&request, 0, sizeof(request));
1151
request.type = PROXY_CLIENT_REQUEST_TYPE_FINISH;
1152
request.callback.finish = callback;
1153
request.context = context;
1154
aqueue_append(worker->request_queue, &request);
1157
struct dsync_worker_vfuncs proxy_client_dsync_worker = {
1158
proxy_client_worker_deinit,
1160
proxy_client_worker_is_output_full,
1161
proxy_client_worker_output_flush,
1163
proxy_client_worker_mailbox_iter_init,
1164
proxy_client_worker_mailbox_iter_next,
1165
proxy_client_worker_mailbox_iter_deinit,
1167
proxy_client_worker_subs_iter_init,
1168
proxy_client_worker_subs_iter_next,
1169
proxy_client_worker_subs_iter_next_un,
1170
proxy_client_worker_subs_iter_deinit,
1171
proxy_client_worker_set_subscribed,
1173
proxy_client_worker_msg_iter_init,
1174
proxy_client_worker_msg_iter_next,
1175
proxy_client_worker_msg_iter_deinit,
1177
proxy_client_worker_create_mailbox,
1178
proxy_client_worker_delete_mailbox,
1179
proxy_client_worker_delete_dir,
1180
proxy_client_worker_rename_mailbox,
1181
proxy_client_worker_update_mailbox,
1183
proxy_client_worker_select_mailbox,
1184
proxy_client_worker_msg_update_metadata,
1185
proxy_client_worker_msg_update_uid,
1186
proxy_client_worker_msg_expunge,
1187
proxy_client_worker_msg_copy,
1188
proxy_client_worker_msg_save,
1189
proxy_client_worker_msg_save_cancel,
1190
proxy_client_worker_msg_get,
1191
proxy_client_worker_finish