~james-page/ubuntu/raring/dovecot/autopkgtest

« back to all changes in this revision

Viewing changes to src/dsync/dsync-proxy-client.c

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2012-06-11 11:11:54 UTC
  • mfrom: (1.15.2) (4.1.27 sid)
  • Revision ID: package-import@ubuntu.com-20120611111154-678cwbdj6ktgsv1h
Tags: 1:2.1.7-1ubuntu1
* Merge from Debian unstable, remaining changes:
  + Add mail-stack-delivery package:
    - Update d/rules
    - d/control: convert existing dovecot-postfix package to a dummy
      package and add new mail-stack-delivery package.
    - Update maintainer scripts.
    - Rename d/dovecot-postfix.* to debian/mail-stack-delivery.*
    - d/mail-stack-delivery.preinst: Move previously installed backups and
      config files to a new package namespace.
    - d/mail-stack-delivery.prerm: Added to handle downgrades.
  + Use Snakeoil SSL certificates by default:
    - d/control: Depend on ssl-cert.
    - d/dovecot-core.postinst: Relax grep for SSL_* a bit.
  + Add autopkgtest to debian/tests/*.
  + Add ufw integration:
    - d/dovecot-core.ufw.profile: new ufw profile.
    - d/rules: install profile in dovecot-core.
    - d/control: dovecot-core - suggest ufw.
  + d/{control,rules}: enable PIE hardening.
  + d/dovecot-core.dirs: Added usr/share/doc/dovecot-core
  + Add apport hook:
    - d/rules, d/source_dovecot.py
  + Add upstart job:
    - d/rules, d/dovecot-core.dovecot.upstart, d/control,
      d/dovecot-core.dirs, dovecot-imapd.{postrm, postinst, prerm},
      d/dovecot-pop3d.{postinst, postrm, prerm}.
      d/mail-stack-deliver.postinst: Convert init script to upstart.
  + d/control: Added Pre-Depends: dpkg (>= 1.15.6) to dovecot-dbg to support
    xz compression in Ubuntu.
  + d/control: Demote dovecot-common Recommends: to Suggests: to prevent
    install of extra packages on upgrade.
  + d/patches/dovecot-drac.patch: Updated with version for dovecot >= 2.0.0.
  + d/control: Drop B-D on systemd.
* Dropped changes:
  + d/patches/fix-racey-restart.patch: part of 2.1.x, no longer required.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2009-2011 Dovecot authors, see the included COPYING file */
2
 
 
3
 
#include "lib.h"
4
 
#include "array.h"
5
 
#include "aqueue.h"
6
 
#include "fd-set-nonblock.h"
7
 
#include "istream.h"
8
 
#include "istream-dot.h"
9
 
#include "ostream.h"
10
 
#include "str.h"
11
 
#include "strescape.h"
12
 
#include "master-service.h"
13
 
#include "imap-util.h"
14
 
#include "dsync-proxy.h"
15
 
#include "dsync-worker-private.h"
16
 
 
17
 
#include <stdlib.h>
18
 
#include <unistd.h>
19
 
 
20
 
#define OUTBUF_THROTTLE_SIZE (1024*64)
21
 
 
22
 
enum proxy_client_request_type {
23
 
        PROXY_CLIENT_REQUEST_TYPE_COPY,
24
 
        PROXY_CLIENT_REQUEST_TYPE_GET,
25
 
        PROXY_CLIENT_REQUEST_TYPE_FINISH
26
 
};
27
 
 
28
 
struct proxy_client_request {
29
 
        enum proxy_client_request_type type;
30
 
        uint32_t uid;
31
 
        union {
32
 
                dsync_worker_msg_callback_t *get;
33
 
                dsync_worker_copy_callback_t *copy;
34
 
                dsync_worker_finish_callback_t *finish;
35
 
        } callback;
36
 
        void *context;
37
 
};
38
 
 
39
 
struct proxy_client_dsync_worker_mailbox_iter {
40
 
        struct dsync_worker_mailbox_iter iter;
41
 
        pool_t pool;
42
 
};
43
 
 
44
 
struct proxy_client_dsync_worker_subs_iter {
45
 
        struct dsync_worker_subs_iter iter;
46
 
        pool_t pool;
47
 
};
48
 
 
49
 
struct proxy_client_dsync_worker {
50
 
        struct dsync_worker worker;
51
 
        int fd_in, fd_out;
52
 
        struct io *io;
53
 
        struct istream *input;
54
 
        struct ostream *output;
55
 
        struct timeout *to, *to_input;
56
 
 
57
 
        mailbox_guid_t selected_box_guid;
58
 
 
59
 
        dsync_worker_save_callback_t *save_callback;
60
 
        void *save_context;
61
 
        struct istream *save_input;
62
 
        struct io *save_io;
63
 
        bool save_input_last_lf;
64
 
 
65
 
        pool_t msg_get_pool;
66
 
        struct dsync_msg_static_data msg_get_data;
67
 
        ARRAY_DEFINE(request_array, struct proxy_client_request);
68
 
        struct aqueue *request_queue;
69
 
        string_t *pending_commands;
70
 
 
71
 
        unsigned int handshake_received:1;
72
 
        unsigned int finishing:1;
73
 
        unsigned int finished:1;
74
 
};
75
 
 
76
 
extern struct dsync_worker_vfuncs proxy_client_dsync_worker;
77
 
 
78
 
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker);
79
 
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker);
80
 
 
81
 
static void proxy_client_fail(struct proxy_client_dsync_worker *worker)
82
 
{
83
 
        i_stream_close(worker->input);
84
 
        dsync_worker_set_failure(&worker->worker);
85
 
        master_service_stop(master_service);
86
 
}
87
 
 
88
 
static int
89
 
proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker,
90
 
                              const char **line_r)
91
 
{
92
 
        if (worker->worker.failed)
93
 
                return -1;
94
 
 
95
 
        *line_r = i_stream_read_next_line(worker->input);
96
 
        if (*line_r == NULL) {
97
 
                if (worker->input->stream_errno != 0) {
98
 
                        errno = worker->input->stream_errno;
99
 
                        i_error("read() from worker server failed: %m");
100
 
                        dsync_worker_set_failure(&worker->worker);
101
 
                        return -1;
102
 
                }
103
 
                if (worker->input->eof) {
104
 
                        if (!worker->finished)
105
 
                                i_error("read() from worker server failed: EOF");
106
 
                        dsync_worker_set_failure(&worker->worker);
107
 
                        return -1;
108
 
                }
109
 
        }
110
 
        if (*line_r == NULL)
111
 
                return 0;
112
 
 
113
 
        if (!worker->handshake_received) {
114
 
                if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) {
115
 
                        i_error("Invalid server handshake: %s", *line_r);
116
 
                        dsync_worker_set_failure(&worker->worker);
117
 
                        return -1;
118
 
                }
119
 
                worker->handshake_received = TRUE;
120
 
                return proxy_client_worker_read_line(worker, line_r);
121
 
        }
122
 
        return 1;
123
 
}
124
 
 
125
 
static void
126
 
proxy_client_worker_msg_get_finish(struct proxy_client_dsync_worker *worker)
127
 
{
128
 
        worker->msg_get_data.input = NULL;
129
 
        worker->io = io_add(worker->fd_in, IO_READ,
130
 
                            proxy_client_worker_input, worker);
131
 
 
132
 
        /* some input may already be buffered. note that we may be coming here
133
 
           from the input function itself, in which case this timeout must not
134
 
           be called (we'll remove it later) */
135
 
        if (worker->to_input == NULL) {
136
 
                worker->to_input =
137
 
                        timeout_add(0, proxy_client_worker_input, worker);
138
 
        }
139
 
}
140
 
 
141
 
static void
142
 
proxy_client_worker_read_to_eof(struct proxy_client_dsync_worker *worker)
143
 
{
144
 
        struct istream *input = worker->msg_get_data.input;
145
 
        const unsigned char *data;
146
 
        size_t size;
147
 
        int ret;
148
 
 
149
 
        while ((ret = i_stream_read_data(input, &data, &size, 0)) > 0)
150
 
                i_stream_skip(input, size);
151
 
        if (ret == -1) {
152
 
                i_stream_unref(&input);
153
 
                io_remove(&worker->io);
154
 
                proxy_client_worker_msg_get_finish(worker);
155
 
        }
156
 
        timeout_reset(worker->to);
157
 
}
158
 
 
159
 
static void
160
 
proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
161
 
{
162
 
        struct istream *input = worker->msg_get_data.input;
163
 
 
164
 
        i_assert(worker->io == NULL);
165
 
 
166
 
        if (input->eof)
167
 
                proxy_client_worker_msg_get_finish(worker);
168
 
        else {
169
 
                /* saving read the message only partially. we'll need to read
170
 
                   the input until EOF or we'll start treating the input as
171
 
                   commands. */
172
 
                worker->io = io_add(worker->fd_in, IO_READ,
173
 
                                    proxy_client_worker_read_to_eof, worker);
174
 
                worker->msg_get_data.input =
175
 
                        i_stream_create_dot(worker->input, FALSE);
176
 
        }
177
 
}
178
 
 
179
 
static bool
180
 
proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
181
 
                              const struct proxy_client_request *request,
182
 
                              const char *line)
183
 
{
184
 
        uint32_t uid;
185
 
        bool success;
186
 
 
187
 
        if (line[0] == '1' && line[1] == '\t')
188
 
                success = TRUE;
189
 
        else if (line[0] == '0' && line[1] == '\t')
190
 
                success = FALSE;
191
 
        else {
192
 
                i_error("msg-copy returned invalid input: %s", line);
193
 
                proxy_client_fail(worker);
194
 
                return FALSE;
195
 
        }
196
 
        uid = strtoul(line + 2, NULL, 10);
197
 
        if (uid != request->uid) {
198
 
                i_error("msg-copy returned invalid uid: %u != %u",
199
 
                        uid, request->uid);
200
 
                proxy_client_fail(worker);
201
 
                return FALSE;
202
 
        }
203
 
 
204
 
        request->callback.copy(success, request->context);
205
 
        return TRUE;
206
 
}
207
 
 
208
 
static bool
209
 
proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker,
210
 
                                 const struct proxy_client_request *request,
211
 
                                 const char *line)
212
 
{
213
 
        enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED;
214
 
        const char *p, *error;
215
 
        uint32_t uid;
216
 
 
217
 
        p_clear(worker->msg_get_pool);
218
 
        switch (line[0]) {
219
 
        case '1':
220
 
                /* ok */
221
 
                if (line[1] != '\t')
222
 
                        break;
223
 
                line += 2;
224
 
 
225
 
                if ((p = strchr(line, '\t')) == NULL)
226
 
                        break;
227
 
                uid = strtoul(t_strcut(line, '\t'), NULL, 10);
228
 
                line = p + 1;
229
 
 
230
 
                if (uid != request->uid) {
231
 
                        i_error("msg-get returned invalid uid: %u != %u",
232
 
                                uid, request->uid);
233
 
                        proxy_client_fail(worker);
234
 
                        return FALSE;
235
 
                }
236
 
 
237
 
                if (dsync_proxy_msg_static_import(worker->msg_get_pool,
238
 
                                                  line, &worker->msg_get_data,
239
 
                                                  &error) < 0) {
240
 
                        i_error("Invalid msg-get static input: %s", error);
241
 
                        proxy_client_fail(worker);
242
 
                        return FALSE;
243
 
                }
244
 
                worker->msg_get_data.input =
245
 
                        i_stream_create_dot(worker->input, FALSE);
246
 
                i_stream_set_destroy_callback(worker->msg_get_data.input,
247
 
                                              proxy_client_worker_msg_get_done,
248
 
                                              worker);
249
 
                io_remove(&worker->io);
250
 
                result = DSYNC_MSG_GET_RESULT_SUCCESS;
251
 
                break;
252
 
        case '0':
253
 
                /* expunged */
254
 
                result = DSYNC_MSG_GET_RESULT_EXPUNGED;
255
 
                break;
256
 
        default:
257
 
                /* failure */
258
 
                break;
259
 
        }
260
 
 
261
 
        request->callback.get(result, &worker->msg_get_data, request->context);
262
 
        return worker->io != NULL && worker->msg_get_data.input == NULL;
263
 
}
264
 
 
265
 
static void
266
 
proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker,
267
 
                                const struct proxy_client_request *request,
268
 
                                const char *line)
269
 
{
270
 
        bool success = TRUE;
271
 
 
272
 
        i_assert(worker->finishing);
273
 
        i_assert(!worker->finished);
274
 
 
275
 
        worker->finishing = FALSE;
276
 
        worker->finished = TRUE;
277
 
 
278
 
        if (strcmp(line, "changes") == 0)
279
 
                worker->worker.unexpected_changes = TRUE;
280
 
        else if (strcmp(line, "fail") == 0)
281
 
                success = FALSE;
282
 
        else if (strcmp(line, "ok") != 0) {
283
 
                i_error("Unexpected finish reply: %s", line);
284
 
                success = FALSE;
285
 
        }
286
 
                
287
 
        request->callback.finish(success, request->context);
288
 
}
289
 
 
290
 
static bool
291
 
proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker,
292
 
                               const char *line)
293
 
{
294
 
        const struct proxy_client_request *requests;
295
 
        struct proxy_client_request request;
296
 
        bool ret = TRUE;
297
 
 
298
 
        i_assert(worker->msg_get_data.input == NULL);
299
 
 
300
 
        if (aqueue_count(worker->request_queue) == 0) {
301
 
                i_error("Unexpected reply from server: %s", line);
302
 
                proxy_client_fail(worker);
303
 
                return FALSE;
304
 
        }
305
 
 
306
 
        requests = array_idx(&worker->request_array, 0);
307
 
        request = requests[aqueue_idx(worker->request_queue, 0)];
308
 
        aqueue_delete_tail(worker->request_queue);
309
 
 
310
 
        switch (request.type) {
311
 
        case PROXY_CLIENT_REQUEST_TYPE_COPY:
312
 
                ret = proxy_client_worker_next_copy(worker, &request, line);
313
 
                break;
314
 
        case PROXY_CLIENT_REQUEST_TYPE_GET:
315
 
                ret = proxy_client_worker_next_msg_get(worker, &request, line);
316
 
                break;
317
 
        case PROXY_CLIENT_REQUEST_TYPE_FINISH:
318
 
                proxy_client_worker_next_finish(worker, &request, line);
319
 
                break;
320
 
        }
321
 
        return ret;
322
 
}
323
 
 
324
 
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker)
325
 
{
326
 
        const char *line;
327
 
        int ret;
328
 
 
329
 
        if (worker->to_input != NULL)
330
 
                timeout_remove(&worker->to_input);
331
 
 
332
 
        if (worker->worker.input_callback != NULL) {
333
 
                worker->worker.input_callback(worker->worker.input_context);
334
 
                timeout_reset(worker->to);
335
 
                return;
336
 
        }
337
 
 
338
 
        while ((ret = proxy_client_worker_read_line(worker, &line)) > 0) {
339
 
                if (!proxy_client_worker_next_reply(worker, line))
340
 
                        break;
341
 
        }
342
 
        if (ret < 0) {
343
 
                /* try to continue */
344
 
                proxy_client_worker_next_reply(worker, "");
345
 
        }
346
 
 
347
 
        if (worker->to_input != NULL) {
348
 
                /* input stream's destroy callback was already called.
349
 
                   don't get back here. */
350
 
                timeout_remove(&worker->to_input);
351
 
        }
352
 
        timeout_reset(worker->to);
353
 
}
354
 
 
355
 
static int
356
 
proxy_client_worker_output_real(struct proxy_client_dsync_worker *worker)
357
 
{
358
 
        int ret;
359
 
 
360
 
        if ((ret = o_stream_flush(worker->output)) < 0)
361
 
                return 1;
362
 
 
363
 
        if (worker->save_input != NULL) {
364
 
                /* proxy_client_worker_msg_save() hasn't finished yet. */
365
 
                o_stream_cork(worker->output);
366
 
                proxy_client_send_stream(worker);
367
 
                if (worker->save_input != NULL) {
368
 
                        /* still unfinished, make sure we get called again */
369
 
                        return 0;
370
 
                }
371
 
        }
372
 
 
373
 
        if (worker->worker.output_callback != NULL)
374
 
                worker->worker.output_callback(worker->worker.output_context);
375
 
        return ret;
376
 
}
377
 
 
378
 
static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
379
 
{
380
 
        int ret;
381
 
 
382
 
        ret = proxy_client_worker_output_real(worker);
383
 
        timeout_reset(worker->to);
384
 
        return ret;
385
 
}
386
 
 
387
 
static void
388
 
proxy_client_worker_timeout(struct proxy_client_dsync_worker *worker)
389
 
{
390
 
        const char *reason;
391
 
 
392
 
        if (worker->save_io != NULL)
393
 
                reason = " (waiting for more input from mail being saved)";
394
 
        else if (worker->save_input != NULL) {
395
 
                size_t bytes = o_stream_get_buffer_used_size(worker->output);
396
 
 
397
 
                reason = t_strdup_printf(" (waiting for output stream to flush, "
398
 
                                         "%"PRIuSIZE_T" bytes left)", bytes);
399
 
        } else if (worker->msg_get_data.input != NULL) {
400
 
                reason = " (waiting for MSG-GET message from remote)";
401
 
        } else {
402
 
                reason = "";
403
 
        }
404
 
        i_error("proxy client timed out%s", reason);
405
 
        proxy_client_fail(worker);
406
 
}
407
 
 
408
 
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
409
 
{
410
 
        struct proxy_client_dsync_worker *worker;
411
 
 
412
 
        worker = i_new(struct proxy_client_dsync_worker, 1);
413
 
        worker->worker.v = proxy_client_dsync_worker;
414
 
        worker->fd_in = fd_in;
415
 
        worker->fd_out = fd_out;
416
 
        worker->to = timeout_add(DSYNC_PROXY_CLIENT_TIMEOUT_MSECS,
417
 
                                 proxy_client_worker_timeout, worker);
418
 
        worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker);
419
 
        worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
420
 
        worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
421
 
        o_stream_send_str(worker->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n");
422
 
        /* we'll keep the output corked until flush is needed */
423
 
        o_stream_cork(worker->output);
424
 
        o_stream_set_flush_callback(worker->output, proxy_client_worker_output,
425
 
                                    worker);
426
 
        fd_set_nonblock(fd_in, TRUE);
427
 
        fd_set_nonblock(fd_out, TRUE);
428
 
 
429
 
        worker->pending_commands = str_new(default_pool, 1024);
430
 
        worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
431
 
        i_array_init(&worker->request_array, 64);
432
 
        worker->request_queue = aqueue_init(&worker->request_array.arr);
433
 
 
434
 
        return &worker->worker;
435
 
}
436
 
 
437
 
static void proxy_client_worker_deinit(struct dsync_worker *_worker)
438
 
{
439
 
        struct proxy_client_dsync_worker *worker =
440
 
                (struct proxy_client_dsync_worker *)_worker;
441
 
 
442
 
        timeout_remove(&worker->to);
443
 
        if (worker->to_input != NULL)
444
 
                timeout_remove(&worker->to_input);
445
 
        if (worker->io != NULL)
446
 
                io_remove(&worker->io);
447
 
        i_stream_destroy(&worker->input);
448
 
        o_stream_destroy(&worker->output);
449
 
        if (close(worker->fd_in) < 0)
450
 
                i_error("close(worker input) failed: %m");
451
 
        if (worker->fd_in != worker->fd_out) {
452
 
                if (close(worker->fd_out) < 0)
453
 
                        i_error("close(worker output) failed: %m");
454
 
        }
455
 
        aqueue_deinit(&worker->request_queue);
456
 
        array_free(&worker->request_array);
457
 
        pool_unref(&worker->msg_get_pool);
458
 
        str_free(&worker->pending_commands);
459
 
        i_free(worker);
460
 
}
461
 
 
462
 
static bool
463
 
worker_is_output_stream_full(struct proxy_client_dsync_worker *worker)
464
 
{
465
 
        return o_stream_get_buffer_used_size(worker->output) >=
466
 
                OUTBUF_THROTTLE_SIZE;
467
 
}
468
 
 
469
 
static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker)
470
 
{
471
 
        struct proxy_client_dsync_worker *worker =
472
 
                (struct proxy_client_dsync_worker *)_worker;
473
 
 
474
 
        if (worker->save_input != NULL) {
475
 
                /* we haven't finished sending a message save, so we're full. */
476
 
                return TRUE;
477
 
        }
478
 
        return worker_is_output_stream_full(worker);
479
 
}
480
 
 
481
 
static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
482
 
{
483
 
        struct proxy_client_dsync_worker *worker =
484
 
                (struct proxy_client_dsync_worker *)_worker;
485
 
        int ret = 1;
486
 
 
487
 
        if (o_stream_flush(worker->output) < 0)
488
 
                return -1;
489
 
 
490
 
        o_stream_uncork(worker->output);
491
 
        if (o_stream_get_buffer_used_size(worker->output) > 0)
492
 
                return 0;
493
 
 
494
 
        if (o_stream_send(worker->output, str_data(worker->pending_commands),
495
 
                          str_len(worker->pending_commands)) < 0)
496
 
                ret = -1;
497
 
        str_truncate(worker->pending_commands, 0);
498
 
        o_stream_cork(worker->output);
499
 
        return ret;
500
 
}
501
 
 
502
 
static struct dsync_worker_mailbox_iter *
503
 
proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
504
 
{
505
 
        struct proxy_client_dsync_worker *worker =
506
 
                (struct proxy_client_dsync_worker *)_worker;
507
 
        struct proxy_client_dsync_worker_mailbox_iter *iter;
508
 
 
509
 
        iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1);
510
 
        iter->iter.worker = _worker;
511
 
        iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
512
 
        o_stream_send_str(worker->output, "BOX-LIST\n");
513
 
        (void)proxy_client_worker_output_flush(_worker);
514
 
        return &iter->iter;
515
 
}
516
 
 
517
 
static int
518
 
proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter,
519
 
                                      struct dsync_mailbox *dsync_box_r)
520
 
{
521
 
        struct proxy_client_dsync_worker_mailbox_iter *iter =
522
 
                (struct proxy_client_dsync_worker_mailbox_iter *)_iter;
523
 
        struct proxy_client_dsync_worker *worker =
524
 
                (struct proxy_client_dsync_worker *)_iter->worker;
525
 
        const char *line, *error;
526
 
        int ret;
527
 
 
528
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
529
 
                if (ret < 0)
530
 
                        _iter->failed = TRUE;
531
 
                return ret;
532
 
        }
533
 
 
534
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
535
 
                /* end of mailboxes */
536
 
                if (line[0] == '-') {
537
 
                        i_error("Worker server's mailbox iteration failed");
538
 
                        _iter->failed = TRUE;
539
 
                }
540
 
                return -1;
541
 
        }
542
 
 
543
 
        p_clear(iter->pool);
544
 
        if (dsync_proxy_mailbox_import(iter->pool, line,
545
 
                                       dsync_box_r, &error) < 0) {
546
 
                i_error("Invalid mailbox input from worker server: %s", error);
547
 
                _iter->failed = TRUE;
548
 
                return -1;
549
 
        }
550
 
        return 1;
551
 
}
552
 
 
553
 
static int
554
 
proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter)
555
 
{
556
 
        struct proxy_client_dsync_worker_mailbox_iter *iter =
557
 
                (struct proxy_client_dsync_worker_mailbox_iter *)_iter;
558
 
        int ret = _iter->failed ? -1 : 0;
559
 
 
560
 
        pool_unref(&iter->pool);
561
 
        i_free(iter);
562
 
        return ret;
563
 
}
564
 
 
565
 
static struct dsync_worker_subs_iter *
566
 
proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
567
 
{
568
 
        struct proxy_client_dsync_worker *worker =
569
 
                (struct proxy_client_dsync_worker *)_worker;
570
 
        struct proxy_client_dsync_worker_subs_iter *iter;
571
 
 
572
 
        iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
573
 
        iter->iter.worker = _worker;
574
 
        iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
575
 
        o_stream_send_str(worker->output, "SUBS-LIST\n");
576
 
        (void)proxy_client_worker_output_flush(_worker);
577
 
        return &iter->iter;
578
 
}
579
 
 
580
 
static int
581
 
proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
582
 
                                        unsigned int wanted_arg_count,
583
 
                                        char ***args_r)
584
 
{
585
 
        struct proxy_client_dsync_worker *worker =
586
 
                (struct proxy_client_dsync_worker *)iter->iter.worker;
587
 
        const char *line;
588
 
        char **args;
589
 
        int ret;
590
 
 
591
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
592
 
                if (ret < 0)
593
 
                        iter->iter.failed = TRUE;
594
 
                return ret;
595
 
        }
596
 
 
597
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
598
 
                /* end of subscribed subscriptions */
599
 
                if (line[0] == '-') {
600
 
                        i_error("Worker server's subscription iteration failed");
601
 
                        iter->iter.failed = TRUE;
602
 
                }
603
 
                return -1;
604
 
        }
605
 
 
606
 
        p_clear(iter->pool);
607
 
        args = p_strsplit(iter->pool, line, "\t");
608
 
        if (str_array_length((const char *const *)args) < wanted_arg_count) {
609
 
                i_error("Invalid subscription input from worker server");
610
 
                iter->iter.failed = TRUE;
611
 
                return -1;
612
 
        }
613
 
        *args_r = args;
614
 
        return 1;
615
 
}
616
 
 
617
 
static int
618
 
proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
619
 
                                   struct dsync_worker_subscription *rec_r)
620
 
{
621
 
        struct proxy_client_dsync_worker_subs_iter *iter =
622
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
623
 
        char **args;
624
 
        int ret;
625
 
 
626
 
        ret = proxy_client_worker_subs_iter_next_line(iter, 4, &args);
627
 
        if (ret <= 0)
628
 
                return ret;
629
 
 
630
 
        rec_r->vname = str_tabunescape(args[0]);
631
 
        rec_r->storage_name = str_tabunescape(args[1]);
632
 
        rec_r->ns_prefix = str_tabunescape(args[2]);
633
 
        rec_r->last_change = strtoul(args[3], NULL, 10);
634
 
        return 1;
635
 
}
636
 
 
637
 
static int
638
 
proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
639
 
                                      struct dsync_worker_unsubscription *rec_r)
640
 
{
641
 
        struct proxy_client_dsync_worker_subs_iter *iter =
642
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
643
 
        char **args;
644
 
        int ret;
645
 
 
646
 
        ret = proxy_client_worker_subs_iter_next_line(iter, 3, &args);
647
 
        if (ret <= 0)
648
 
                return ret;
649
 
 
650
 
        memset(rec_r, 0, sizeof(*rec_r));
651
 
        if (dsync_proxy_mailbox_guid_import(args[0], &rec_r->name_sha1) < 0) {
652
 
                i_error("Invalid subscription input from worker server: "
653
 
                        "Invalid unsubscription mailbox GUID");
654
 
                iter->iter.failed = TRUE;
655
 
                return -1;
656
 
        }
657
 
        rec_r->ns_prefix = str_tabunescape(args[1]);
658
 
        rec_r->last_change = strtoul(args[2], NULL, 10);
659
 
        return 1;
660
 
}
661
 
 
662
 
static int
663
 
proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
664
 
{
665
 
        struct proxy_client_dsync_worker_subs_iter *iter =
666
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
667
 
        int ret = _iter->failed ? -1 : 0;
668
 
 
669
 
        pool_unref(&iter->pool);
670
 
        i_free(iter);
671
 
        return ret;
672
 
}
673
 
 
674
 
static void
675
 
proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
676
 
                                   const char *name, time_t last_change,
677
 
                                   bool set)
678
 
{
679
 
        struct proxy_client_dsync_worker *worker =
680
 
                (struct proxy_client_dsync_worker *)_worker;
681
 
 
682
 
        T_BEGIN {
683
 
                string_t *str = t_str_new(128);
684
 
 
685
 
                str_append(str, "SUBS-SET\t");
686
 
                str_tabescape_write(str, name);
687
 
                str_printfa(str, "\t%s\t%d\n", dec2str(last_change),
688
 
                            set ? 1 : 0);
689
 
                o_stream_send(worker->output, str_data(str), str_len(str));
690
 
        } T_END;
691
 
}
692
 
 
693
 
struct proxy_client_dsync_worker_msg_iter {
694
 
        struct dsync_worker_msg_iter iter;
695
 
        pool_t pool;
696
 
        bool done;
697
 
};
698
 
 
699
 
static struct dsync_worker_msg_iter *
700
 
proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
701
 
                                  const mailbox_guid_t mailboxes[],
702
 
                                  unsigned int mailbox_count)
703
 
{
704
 
        struct proxy_client_dsync_worker *worker =
705
 
                (struct proxy_client_dsync_worker *)_worker;
706
 
        struct proxy_client_dsync_worker_msg_iter *iter;
707
 
        string_t *str;
708
 
        unsigned int i;
709
 
 
710
 
        iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1);
711
 
        iter->iter.worker = _worker;
712
 
        iter->pool = pool_alloconly_create("proxy message iter", 10240);
713
 
 
714
 
        str = str_new(iter->pool, 512);
715
 
        str_append(str, "MSG-LIST");
716
 
        for (i = 0; i < mailbox_count; i++) T_BEGIN {
717
 
                str_append_c(str, '\t');
718
 
                dsync_proxy_mailbox_guid_export(str, &mailboxes[i]);
719
 
        } T_END;
720
 
        str_append_c(str, '\n');
721
 
        o_stream_send(worker->output, str_data(str), str_len(str));
722
 
        p_clear(iter->pool);
723
 
 
724
 
        (void)proxy_client_worker_output_flush(_worker);
725
 
        return &iter->iter;
726
 
}
727
 
 
728
 
static int
729
 
proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter,
730
 
                                  unsigned int *mailbox_idx_r,
731
 
                                  struct dsync_message *msg_r)
732
 
{
733
 
        struct proxy_client_dsync_worker_msg_iter *iter =
734
 
                (struct proxy_client_dsync_worker_msg_iter *)_iter;
735
 
        struct proxy_client_dsync_worker *worker =
736
 
                (struct proxy_client_dsync_worker *)_iter->worker;
737
 
        const char *line, *error;
738
 
        int ret;
739
 
 
740
 
        if (iter->done)
741
 
                return -1;
742
 
 
743
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
744
 
                if (ret < 0)
745
 
                        _iter->failed = TRUE;
746
 
                return ret;
747
 
        }
748
 
 
749
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
750
 
                /* end of messages */
751
 
                if (line[0] == '-') {
752
 
                        i_error("Worker server's message iteration failed");
753
 
                        _iter->failed = TRUE;
754
 
                }
755
 
                iter->done = TRUE;
756
 
                return -1;
757
 
        }
758
 
 
759
 
        *mailbox_idx_r = 0;
760
 
        while (*line >= '0' && *line <= '9') {
761
 
                *mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0');
762
 
                line++;
763
 
        }
764
 
        if (*line != '\t') {
765
 
                i_error("Invalid mailbox idx from worker server");
766
 
                _iter->failed = TRUE;
767
 
                return -1;
768
 
        }
769
 
        line++;
770
 
 
771
 
        p_clear(iter->pool);
772
 
        if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) {
773
 
                i_error("Invalid message input from worker server: %s", error);
774
 
                _iter->failed = TRUE;
775
 
                return -1;
776
 
        }
777
 
        return 1;
778
 
}
779
 
 
780
 
static int
781
 
proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
782
 
{
783
 
        struct proxy_client_dsync_worker_msg_iter *iter =
784
 
                (struct proxy_client_dsync_worker_msg_iter *)_iter;
785
 
        int ret = _iter->failed ? -1 : 0;
786
 
 
787
 
        pool_unref(&iter->pool);
788
 
        i_free(iter);
789
 
        return ret;
790
 
}
791
 
 
792
 
static void
793
 
proxy_client_worker_cmd(struct proxy_client_dsync_worker *worker, string_t *str)
794
 
{
795
 
        if (worker->save_input == NULL)
796
 
                o_stream_send(worker->output, str_data(str), str_len(str));
797
 
        else
798
 
                str_append_str(worker->pending_commands, str);
799
 
}
800
 
 
801
 
static void
802
 
proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
803
 
                                   const struct dsync_mailbox *dsync_box)
804
 
{
805
 
        struct proxy_client_dsync_worker *worker =
806
 
                (struct proxy_client_dsync_worker *)_worker;
807
 
 
808
 
        T_BEGIN {
809
 
                string_t *str = t_str_new(128);
810
 
 
811
 
                str_append(str, "BOX-CREATE\t");
812
 
                dsync_proxy_mailbox_export(str, dsync_box);
813
 
                str_append_c(str, '\n');
814
 
                proxy_client_worker_cmd(worker, str);
815
 
        } T_END;
816
 
}
817
 
 
818
 
static void
819
 
proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
820
 
                                   const struct dsync_mailbox *dsync_box)
821
 
{
822
 
        struct proxy_client_dsync_worker *worker =
823
 
                (struct proxy_client_dsync_worker *)_worker;
824
 
 
825
 
        T_BEGIN {
826
 
                string_t *str = t_str_new(128);
827
 
 
828
 
                str_append(str, "BOX-DELETE\t");
829
 
                dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid);
830
 
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
831
 
                proxy_client_worker_cmd(worker, str);
832
 
        } T_END;
833
 
}
834
 
 
835
 
static void
836
 
proxy_client_worker_delete_dir(struct dsync_worker *_worker,
837
 
                               const struct dsync_mailbox *dsync_box)
838
 
{
839
 
        struct proxy_client_dsync_worker *worker =
840
 
                (struct proxy_client_dsync_worker *)_worker;
841
 
 
842
 
        T_BEGIN {
843
 
                string_t *str = t_str_new(128);
844
 
 
845
 
                str_append(str, "DIR-DELETE\t");
846
 
                str_tabescape_write(str, dsync_box->name);
847
 
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
848
 
                proxy_client_worker_cmd(worker, str);
849
 
        } T_END;
850
 
}
851
 
 
852
 
static void
853
 
proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
854
 
                                   const mailbox_guid_t *mailbox,
855
 
                                   const struct dsync_mailbox *dsync_box)
856
 
{
857
 
        struct proxy_client_dsync_worker *worker =
858
 
                (struct proxy_client_dsync_worker *)_worker;
859
 
        char sep[2];
860
 
 
861
 
        T_BEGIN {
862
 
                string_t *str = t_str_new(128);
863
 
 
864
 
                str_append(str, "BOX-RENAME\t");
865
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
866
 
                str_append_c(str, '\t');
867
 
                str_tabescape_write(str, dsync_box->name);
868
 
                str_append_c(str, '\t');
869
 
                sep[0] = dsync_box->name_sep; sep[1] = '\0';
870
 
                str_tabescape_write(str, sep);
871
 
                str_append_c(str, '\n');
872
 
                proxy_client_worker_cmd(worker, str);
873
 
        } T_END;
874
 
}
875
 
 
876
 
static void
877
 
proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
878
 
                                   const struct dsync_mailbox *dsync_box)
879
 
{
880
 
        struct proxy_client_dsync_worker *worker =
881
 
                (struct proxy_client_dsync_worker *)_worker;
882
 
 
883
 
        T_BEGIN {
884
 
                string_t *str = t_str_new(128);
885
 
 
886
 
                str_append(str, "BOX-UPDATE\t");
887
 
                dsync_proxy_mailbox_export(str, dsync_box);
888
 
                str_append_c(str, '\n');
889
 
                proxy_client_worker_cmd(worker, str);
890
 
        } T_END;
891
 
}
892
 
 
893
 
static void
894
 
proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
895
 
                                   const mailbox_guid_t *mailbox,
896
 
                                   const ARRAY_TYPE(const_string) *cache_fields)
897
 
{
898
 
        struct proxy_client_dsync_worker *worker =
899
 
                (struct proxy_client_dsync_worker *)_worker;
900
 
 
901
 
        if (dsync_guid_equals(&worker->selected_box_guid, mailbox))
902
 
                return;
903
 
        worker->selected_box_guid = *mailbox;
904
 
 
905
 
        T_BEGIN {
906
 
                string_t *str = t_str_new(128);
907
 
 
908
 
                str_append(str, "BOX-SELECT\t");
909
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
910
 
                if (cache_fields != NULL)
911
 
                        dsync_proxy_strings_export(str, cache_fields);
912
 
                str_append_c(str, '\n');
913
 
                proxy_client_worker_cmd(worker, str);
914
 
        } T_END;
915
 
}
916
 
 
917
 
static void
918
 
proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
919
 
                                        const struct dsync_message *msg)
920
 
{
921
 
        struct proxy_client_dsync_worker *worker =
922
 
                (struct proxy_client_dsync_worker *)_worker;
923
 
 
924
 
        T_BEGIN {
925
 
                string_t *str = t_str_new(128);
926
 
 
927
 
                str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid,
928
 
                            (unsigned long long)msg->modseq);
929
 
                imap_write_flags(str, msg->flags, msg->keywords);
930
 
                str_append_c(str, '\n');
931
 
                proxy_client_worker_cmd(worker, str);
932
 
        } T_END;
933
 
}
934
 
 
935
 
static void
936
 
proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
937
 
                                   uint32_t old_uid, uint32_t new_uid)
938
 
{
939
 
        struct proxy_client_dsync_worker *worker =
940
 
                (struct proxy_client_dsync_worker *)_worker;
941
 
 
942
 
        T_BEGIN {
943
 
                string_t *str = t_str_new(64);
944
 
                str_printfa(str, "MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid);
945
 
                proxy_client_worker_cmd(worker, str);
946
 
        } T_END;
947
 
}
948
 
 
949
 
static void
950
 
proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
951
 
{
952
 
        struct proxy_client_dsync_worker *worker =
953
 
                (struct proxy_client_dsync_worker *)_worker;
954
 
 
955
 
        T_BEGIN {
956
 
                string_t *str = t_str_new(64);
957
 
                str_printfa(str, "MSG-EXPUNGE\t%u\n", uid);
958
 
                proxy_client_worker_cmd(worker, str);
959
 
        } T_END;
960
 
}
961
 
 
962
 
static void
963
 
proxy_client_worker_msg_copy(struct dsync_worker *_worker,
964
 
                             const mailbox_guid_t *src_mailbox,
965
 
                             uint32_t src_uid,
966
 
                             const struct dsync_message *dest_msg,
967
 
                             dsync_worker_copy_callback_t *callback,
968
 
                             void *context)
969
 
{
970
 
        struct proxy_client_dsync_worker *worker =
971
 
                (struct proxy_client_dsync_worker *)_worker;
972
 
        struct proxy_client_request request;
973
 
 
974
 
        T_BEGIN {
975
 
                string_t *str = t_str_new(128);
976
 
 
977
 
                str_append(str, "MSG-COPY\t");
978
 
                dsync_proxy_mailbox_guid_export(str, src_mailbox);
979
 
                str_printfa(str, "\t%u\t", src_uid);
980
 
                dsync_proxy_msg_export(str, dest_msg);
981
 
                str_append_c(str, '\n');
982
 
                proxy_client_worker_cmd(worker, str);
983
 
        } T_END;
984
 
 
985
 
        memset(&request, 0, sizeof(request));
986
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
987
 
        request.callback.copy = callback;
988
 
        request.context = context;
989
 
        request.uid = src_uid;
990
 
        aqueue_append(worker->request_queue, &request);
991
 
}
992
 
 
993
 
static void
994
 
proxy_client_send_stream_real(struct proxy_client_dsync_worker *worker)
995
 
{
996
 
        dsync_worker_save_callback_t *callback;
997
 
        void *context;
998
 
        struct istream *input;
999
 
        const unsigned char *data;
1000
 
        size_t size;
1001
 
        int ret;
1002
 
 
1003
 
        while ((ret = i_stream_read_data(worker->save_input,
1004
 
                                         &data, &size, 0)) > 0) {
1005
 
                dsync_proxy_send_dot_output(worker->output,
1006
 
                                            &worker->save_input_last_lf,
1007
 
                                            data, size);
1008
 
                i_stream_skip(worker->save_input, size);
1009
 
 
1010
 
                if (worker_is_output_stream_full(worker)) {
1011
 
                        o_stream_uncork(worker->output);
1012
 
                        if (worker_is_output_stream_full(worker))
1013
 
                                return;
1014
 
                        o_stream_cork(worker->output);
1015
 
                }
1016
 
        }
1017
 
        if (ret == 0) {
1018
 
                /* waiting for more input */
1019
 
                o_stream_uncork(worker->output);
1020
 
                if (worker->save_io == NULL) {
1021
 
                        int fd = i_stream_get_fd(worker->save_input);
1022
 
 
1023
 
                        worker->save_io =
1024
 
                                io_add(fd, IO_READ,
1025
 
                                       proxy_client_send_stream, worker);
1026
 
                }
1027
 
                return;
1028
 
        }
1029
 
        if (worker->save_io != NULL)
1030
 
                io_remove(&worker->save_io);
1031
 
        if (worker->save_input->stream_errno != 0) {
1032
 
                errno = worker->save_input->stream_errno;
1033
 
                i_error("proxy: reading message input failed: %m");
1034
 
                o_stream_close(worker->output);
1035
 
        } else {
1036
 
                i_assert(!i_stream_have_bytes_left(worker->save_input));
1037
 
                o_stream_send(worker->output, "\n.\n", 3);
1038
 
        }
1039
 
 
1040
 
        callback = worker->save_callback;
1041
 
        context = worker->save_context;
1042
 
        worker->save_callback = NULL;
1043
 
        worker->save_context = NULL;
1044
 
 
1045
 
        /* a bit ugly way to free the stream. the problem is that local worker
1046
 
           has set a destroy callback, which in turn can call our msg_save()
1047
 
           again before the i_stream_unref() is finished. */
1048
 
        input = worker->save_input;
1049
 
        worker->save_input = NULL;
1050
 
        i_stream_unref(&input);
1051
 
 
1052
 
        (void)proxy_client_worker_output_flush(&worker->worker);
1053
 
 
1054
 
        callback(context);
1055
 
}
1056
 
 
1057
 
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
1058
 
{
1059
 
        proxy_client_send_stream_real(worker);
1060
 
        timeout_reset(worker->to);
1061
 
}
1062
 
 
1063
 
static void
1064
 
proxy_client_worker_msg_save(struct dsync_worker *_worker,
1065
 
                             const struct dsync_message *msg,
1066
 
                             const struct dsync_msg_static_data *data,
1067
 
                             dsync_worker_save_callback_t *callback,
1068
 
                             void *context)
1069
 
{
1070
 
        struct proxy_client_dsync_worker *worker =
1071
 
                (struct proxy_client_dsync_worker *)_worker;
1072
 
 
1073
 
        T_BEGIN {
1074
 
                string_t *str = t_str_new(128);
1075
 
 
1076
 
                str_append(str, "MSG-SAVE\t");
1077
 
                dsync_proxy_msg_static_export(str, data);
1078
 
                str_append_c(str, '\t');
1079
 
                dsync_proxy_msg_export(str, msg);
1080
 
                str_append_c(str, '\n');
1081
 
                proxy_client_worker_cmd(worker, str);
1082
 
        } T_END;
1083
 
 
1084
 
        i_assert(worker->save_input == NULL);
1085
 
        worker->save_callback = callback;
1086
 
        worker->save_context = context;
1087
 
        worker->save_input = data->input;
1088
 
        worker->save_input_last_lf = TRUE;
1089
 
        i_stream_ref(worker->save_input);
1090
 
        proxy_client_send_stream(worker);
1091
 
}
1092
 
 
1093
 
static void
1094
 
proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker)
1095
 
{
1096
 
        struct proxy_client_dsync_worker *worker =
1097
 
                (struct proxy_client_dsync_worker *)_worker;
1098
 
 
1099
 
        if (worker->save_io != NULL)
1100
 
                io_remove(&worker->save_io);
1101
 
        if (worker->save_input != NULL)
1102
 
                i_stream_unref(&worker->save_input);
1103
 
}
1104
 
 
1105
 
static void
1106
 
proxy_client_worker_msg_get(struct dsync_worker *_worker,
1107
 
                            const mailbox_guid_t *mailbox, uint32_t uid,
1108
 
                            dsync_worker_msg_callback_t *callback,
1109
 
                            void *context)
1110
 
{
1111
 
        struct proxy_client_dsync_worker *worker =
1112
 
                (struct proxy_client_dsync_worker *)_worker;
1113
 
        struct proxy_client_request request;
1114
 
 
1115
 
        T_BEGIN {
1116
 
                string_t *str = t_str_new(128);
1117
 
 
1118
 
                str_append(str, "MSG-GET\t");
1119
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
1120
 
                str_printfa(str, "\t%u\n", uid);
1121
 
                proxy_client_worker_cmd(worker, str);
1122
 
        } T_END;
1123
 
 
1124
 
        memset(&request, 0, sizeof(request));
1125
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_GET;
1126
 
        request.callback.get = callback;
1127
 
        request.context = context;
1128
 
        request.uid = uid;
1129
 
        aqueue_append(worker->request_queue, &request);
1130
 
}
1131
 
 
1132
 
static void
1133
 
proxy_client_worker_finish(struct dsync_worker *_worker,
1134
 
                           dsync_worker_finish_callback_t *callback,
1135
 
                           void *context)
1136
 
{
1137
 
        struct proxy_client_dsync_worker *worker =
1138
 
                (struct proxy_client_dsync_worker *)_worker;
1139
 
        struct proxy_client_request request;
1140
 
 
1141
 
        i_assert(worker->save_input == NULL);
1142
 
        i_assert(!worker->finishing);
1143
 
 
1144
 
        worker->finishing = TRUE;
1145
 
        worker->finished = FALSE;
1146
 
 
1147
 
        o_stream_send_str(worker->output, "FINISH\n");
1148
 
        o_stream_uncork(worker->output);
1149
 
 
1150
 
        memset(&request, 0, sizeof(request));
1151
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_FINISH;
1152
 
        request.callback.finish = callback;
1153
 
        request.context = context;
1154
 
        aqueue_append(worker->request_queue, &request);
1155
 
}
1156
 
 
1157
 
struct dsync_worker_vfuncs proxy_client_dsync_worker = {
1158
 
        proxy_client_worker_deinit,
1159
 
 
1160
 
        proxy_client_worker_is_output_full,
1161
 
        proxy_client_worker_output_flush,
1162
 
 
1163
 
        proxy_client_worker_mailbox_iter_init,
1164
 
        proxy_client_worker_mailbox_iter_next,
1165
 
        proxy_client_worker_mailbox_iter_deinit,
1166
 
 
1167
 
        proxy_client_worker_subs_iter_init,
1168
 
        proxy_client_worker_subs_iter_next,
1169
 
        proxy_client_worker_subs_iter_next_un,
1170
 
        proxy_client_worker_subs_iter_deinit,
1171
 
        proxy_client_worker_set_subscribed,
1172
 
 
1173
 
        proxy_client_worker_msg_iter_init,
1174
 
        proxy_client_worker_msg_iter_next,
1175
 
        proxy_client_worker_msg_iter_deinit,
1176
 
 
1177
 
        proxy_client_worker_create_mailbox,
1178
 
        proxy_client_worker_delete_mailbox,
1179
 
        proxy_client_worker_delete_dir,
1180
 
        proxy_client_worker_rename_mailbox,
1181
 
        proxy_client_worker_update_mailbox,
1182
 
 
1183
 
        proxy_client_worker_select_mailbox,
1184
 
        proxy_client_worker_msg_update_metadata,
1185
 
        proxy_client_worker_msg_update_uid,
1186
 
        proxy_client_worker_msg_expunge,
1187
 
        proxy_client_worker_msg_copy,
1188
 
        proxy_client_worker_msg_save,
1189
 
        proxy_client_worker_msg_save_cancel,
1190
 
        proxy_client_worker_msg_get,
1191
 
        proxy_client_worker_finish
1192
 
};