~ubuntu-branches/ubuntu/utopic/dovecot/utopic-proposed

« back to all changes in this revision

Viewing changes to src/doveadm/dsync/dsync-proxy-client.c

  • Committer: Package Import Robot
  • Author(s): James Page
  • Date: 2014-01-08 09:35:49 UTC
  • mfrom: (4.1.35 sid)
  • Revision ID: package-import@ubuntu.com-20140108093549-i72o93pux8p0dlaf
Tags: 1:2.2.9-1ubuntu1
* Merge from Debian unstable, remaining changes:
  + Add mail-stack-delivery package:
    - Update d/rules
    - d/control: convert existing dovecot-postfix package to a dummy
      package and add new mail-stack-delivery package.
    - Update maintainer scripts.
    - Rename d/dovecot-postfix.* to debian/mail-stack-delivery.*
    - d/mail-stack-delivery.preinst: Move previously installed backups and
      config files to a new package namespace.
    - d/mail-stack-delivery.prerm: Added to handle downgrades.
  + Use Snakeoil SSL certificates by default:
    - d/control: Depend on ssl-cert.
    - d/dovecot-core.postinst: Relax grep for SSL_* a bit.
  + Add autopkgtest to debian/tests/*.
  + Add ufw integration:
    - d/dovecot-core.ufw.profile: new ufw profile.
    - d/rules: install profile in dovecot-core.
    - d/control: dovecot-core - suggest ufw.
  + d/dovecot-core.dirs: Added usr/share/doc/dovecot-core
  + Add apport hook:
    - d/rules, d/source_dovecot.py
  + Add upstart job:
    - d/rules, d/dovecot-core.dovecot.upstart, d/control,
      d/dovecot-core.dirs, dovecot-imapd.{postrm, postinst, prerm},
      d/dovecot-pop3d.{postinst, postrm, prerm}.
      d/mail-stack-deliver.postinst: Convert init script to upstart.
  + Use the autotools-dev dh addon to update config.guess/config.sub for
    arm64.
* Dropped changes, included in Debian:
  - Update Dovecot name to reflect distribution in login greeting.
  - Update Drac plugin for >= 2.0.0 support.
* d/control: Drop dovecot-postfix package as it's no longer required.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
/* Copyright (c) 2009-2012 Dovecot authors, see the included COPYING file */
2
 
 
3
 
#include "lib.h"
4
 
#include "array.h"
5
 
#include "aqueue.h"
6
 
#include "fd-set-nonblock.h"
7
 
#include "istream.h"
8
 
#include "istream-dot.h"
9
 
#include "ostream.h"
10
 
#include "str.h"
11
 
#include "strescape.h"
12
 
#include "imap-util.h"
13
 
#include "dsync-proxy.h"
14
 
#include "dsync-worker-private.h"
15
 
 
16
 
#include <stdlib.h>
17
 
#include <unistd.h>
18
 
 
19
 
/* Once the output stream has at least this many bytes buffered,
   worker_is_output_stream_full() reports the output as full. */
#define OUTBUF_THROTTLE_SIZE (1024*64)
20
 
 
21
 
/* Kind of a queued request; selects which reply handler and which member of
   proxy_client_request.callback is used when the reply line arrives. */
enum proxy_client_request_type {
	/* reply handled by proxy_client_worker_next_copy() */
	PROXY_CLIENT_REQUEST_TYPE_COPY,
	/* reply handled by proxy_client_worker_next_msg_get() */
	PROXY_CLIENT_REQUEST_TYPE_GET,
	/* reply handled by proxy_client_worker_next_finish() */
	PROXY_CLIENT_REQUEST_TYPE_FINISH
};
26
 
 
27
 
struct proxy_client_request {
28
 
        enum proxy_client_request_type type;
29
 
        uint32_t uid;
30
 
        union {
31
 
                dsync_worker_msg_callback_t *get;
32
 
                dsync_worker_copy_callback_t *copy;
33
 
                dsync_worker_finish_callback_t *finish;
34
 
        } callback;
35
 
        void *context;
36
 
};
37
 
 
38
 
struct proxy_client_dsync_worker_mailbox_iter {
39
 
        struct dsync_worker_mailbox_iter iter;
40
 
        pool_t pool;
41
 
};
42
 
 
43
 
struct proxy_client_dsync_worker_subs_iter {
44
 
        struct dsync_worker_subs_iter iter;
45
 
        pool_t pool;
46
 
};
47
 
 
48
 
struct proxy_client_dsync_worker {
49
 
        struct dsync_worker worker;
50
 
        int fd_in, fd_out;
51
 
        struct io *io;
52
 
        struct istream *input;
53
 
        struct ostream *output;
54
 
        struct timeout *to, *to_input;
55
 
 
56
 
        mailbox_guid_t selected_box_guid;
57
 
 
58
 
        dsync_worker_save_callback_t *save_callback;
59
 
        void *save_context;
60
 
        struct istream *save_input;
61
 
        struct io *save_io;
62
 
        bool save_input_last_lf;
63
 
 
64
 
        pool_t msg_get_pool;
65
 
        struct dsync_msg_static_data msg_get_data;
66
 
        ARRAY_DEFINE(request_array, struct proxy_client_request);
67
 
        struct aqueue *request_queue;
68
 
        string_t *pending_commands;
69
 
 
70
 
        unsigned int handshake_received:1;
71
 
        unsigned int finishing:1;
72
 
        unsigned int finished:1;
73
 
};
74
 
 
75
 
extern struct dsync_worker_vfuncs proxy_client_dsync_worker;
76
 
 
77
 
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker);
78
 
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker);
79
 
 
80
 
static void proxy_client_fail(struct proxy_client_dsync_worker *worker)
81
 
{
82
 
        i_stream_close(worker->input);
83
 
        dsync_worker_set_failure(&worker->worker);
84
 
        io_loop_stop(current_ioloop);
85
 
}
86
 
 
87
 
static int
88
 
proxy_client_worker_read_line(struct proxy_client_dsync_worker *worker,
89
 
                              const char **line_r)
90
 
{
91
 
        if (worker->worker.failed)
92
 
                return -1;
93
 
 
94
 
        *line_r = i_stream_read_next_line(worker->input);
95
 
        if (*line_r == NULL) {
96
 
                if (worker->input->stream_errno != 0) {
97
 
                        errno = worker->input->stream_errno;
98
 
                        i_error("read() from worker server failed: %m");
99
 
                        dsync_worker_set_failure(&worker->worker);
100
 
                        return -1;
101
 
                }
102
 
                if (worker->input->eof) {
103
 
                        if (!worker->finished)
104
 
                                i_error("read() from worker server failed: EOF");
105
 
                        dsync_worker_set_failure(&worker->worker);
106
 
                        return -1;
107
 
                }
108
 
        }
109
 
        if (*line_r == NULL)
110
 
                return 0;
111
 
 
112
 
        if (!worker->handshake_received) {
113
 
                if (strcmp(*line_r, DSYNC_PROXY_SERVER_GREETING_LINE) != 0) {
114
 
                        i_error("Invalid server handshake: %s", *line_r);
115
 
                        dsync_worker_set_failure(&worker->worker);
116
 
                        return -1;
117
 
                }
118
 
                worker->handshake_received = TRUE;
119
 
                return proxy_client_worker_read_line(worker, line_r);
120
 
        }
121
 
        return 1;
122
 
}
123
 
 
124
 
static void
125
 
proxy_client_worker_msg_get_finish(struct proxy_client_dsync_worker *worker)
126
 
{
127
 
        worker->msg_get_data.input = NULL;
128
 
        worker->io = io_add(worker->fd_in, IO_READ,
129
 
                            proxy_client_worker_input, worker);
130
 
 
131
 
        /* some input may already be buffered. note that we may be coming here
132
 
           from the input function itself, in which case this timeout must not
133
 
           be called (we'll remove it later) */
134
 
        if (worker->to_input == NULL) {
135
 
                worker->to_input =
136
 
                        timeout_add(0, proxy_client_worker_input, worker);
137
 
        }
138
 
}
139
 
 
140
 
static void
141
 
proxy_client_worker_read_to_eof(struct proxy_client_dsync_worker *worker)
142
 
{
143
 
        struct istream *input = worker->msg_get_data.input;
144
 
        const unsigned char *data;
145
 
        size_t size;
146
 
        int ret;
147
 
 
148
 
        while ((ret = i_stream_read_data(input, &data, &size, 0)) > 0)
149
 
                i_stream_skip(input, size);
150
 
        if (ret == -1) {
151
 
                i_stream_unref(&input);
152
 
                io_remove(&worker->io);
153
 
                proxy_client_worker_msg_get_finish(worker);
154
 
        }
155
 
        timeout_reset(worker->to);
156
 
}
157
 
 
158
 
static void
159
 
proxy_client_worker_msg_get_done(struct proxy_client_dsync_worker *worker)
160
 
{
161
 
        struct istream *input = worker->msg_get_data.input;
162
 
 
163
 
        i_assert(worker->io == NULL);
164
 
 
165
 
        if (input->eof)
166
 
                proxy_client_worker_msg_get_finish(worker);
167
 
        else {
168
 
                /* saving read the message only partially. we'll need to read
169
 
                   the input until EOF or we'll start treating the input as
170
 
                   commands. */
171
 
                worker->io = io_add(worker->fd_in, IO_READ,
172
 
                                    proxy_client_worker_read_to_eof, worker);
173
 
                worker->msg_get_data.input =
174
 
                        i_stream_create_dot(worker->input, FALSE);
175
 
        }
176
 
}
177
 
 
178
 
static bool
179
 
proxy_client_worker_next_copy(struct proxy_client_dsync_worker *worker,
180
 
                              const struct proxy_client_request *request,
181
 
                              const char *line)
182
 
{
183
 
        uint32_t uid;
184
 
        bool success;
185
 
 
186
 
        if (line[0] == '1' && line[1] == '\t')
187
 
                success = TRUE;
188
 
        else if (line[0] == '0' && line[1] == '\t')
189
 
                success = FALSE;
190
 
        else {
191
 
                i_error("msg-copy returned invalid input: %s", line);
192
 
                proxy_client_fail(worker);
193
 
                return FALSE;
194
 
        }
195
 
        uid = strtoul(line + 2, NULL, 10);
196
 
        if (uid != request->uid) {
197
 
                i_error("msg-copy returned invalid uid: %u != %u",
198
 
                        uid, request->uid);
199
 
                proxy_client_fail(worker);
200
 
                return FALSE;
201
 
        }
202
 
 
203
 
        request->callback.copy(success, request->context);
204
 
        return TRUE;
205
 
}
206
 
 
207
 
static bool
208
 
proxy_client_worker_next_msg_get(struct proxy_client_dsync_worker *worker,
209
 
                                 const struct proxy_client_request *request,
210
 
                                 const char *line)
211
 
{
212
 
        enum dsync_msg_get_result result = DSYNC_MSG_GET_RESULT_FAILED;
213
 
        const char *p, *error;
214
 
        uint32_t uid;
215
 
 
216
 
        p_clear(worker->msg_get_pool);
217
 
        switch (line[0]) {
218
 
        case '1':
219
 
                /* ok */
220
 
                if (line[1] != '\t')
221
 
                        break;
222
 
                line += 2;
223
 
 
224
 
                if ((p = strchr(line, '\t')) == NULL)
225
 
                        break;
226
 
                uid = strtoul(t_strcut(line, '\t'), NULL, 10);
227
 
                line = p + 1;
228
 
 
229
 
                if (uid != request->uid) {
230
 
                        i_error("msg-get returned invalid uid: %u != %u",
231
 
                                uid, request->uid);
232
 
                        proxy_client_fail(worker);
233
 
                        return FALSE;
234
 
                }
235
 
 
236
 
                if (dsync_proxy_msg_static_import(worker->msg_get_pool,
237
 
                                                  line, &worker->msg_get_data,
238
 
                                                  &error) < 0) {
239
 
                        i_error("Invalid msg-get static input: %s", error);
240
 
                        proxy_client_fail(worker);
241
 
                        return FALSE;
242
 
                }
243
 
                worker->msg_get_data.input =
244
 
                        i_stream_create_dot(worker->input, FALSE);
245
 
                i_stream_set_destroy_callback(worker->msg_get_data.input,
246
 
                                              proxy_client_worker_msg_get_done,
247
 
                                              worker);
248
 
                io_remove(&worker->io);
249
 
                result = DSYNC_MSG_GET_RESULT_SUCCESS;
250
 
                break;
251
 
        case '0':
252
 
                /* expunged */
253
 
                result = DSYNC_MSG_GET_RESULT_EXPUNGED;
254
 
                break;
255
 
        default:
256
 
                /* failure */
257
 
                break;
258
 
        }
259
 
 
260
 
        request->callback.get(result, &worker->msg_get_data, request->context);
261
 
        return worker->io != NULL && worker->msg_get_data.input == NULL;
262
 
}
263
 
 
264
 
static void
265
 
proxy_client_worker_next_finish(struct proxy_client_dsync_worker *worker,
266
 
                                const struct proxy_client_request *request,
267
 
                                const char *line)
268
 
{
269
 
        bool success = TRUE;
270
 
 
271
 
        i_assert(worker->finishing);
272
 
        i_assert(!worker->finished);
273
 
 
274
 
        worker->finishing = FALSE;
275
 
        worker->finished = TRUE;
276
 
 
277
 
        if (strcmp(line, "changes") == 0)
278
 
                worker->worker.unexpected_changes = TRUE;
279
 
        else if (strcmp(line, "fail") == 0)
280
 
                success = FALSE;
281
 
        else if (strcmp(line, "ok") != 0) {
282
 
                i_error("Unexpected finish reply: %s", line);
283
 
                success = FALSE;
284
 
        }
285
 
                
286
 
        request->callback.finish(success, request->context);
287
 
}
288
 
 
289
 
static bool
290
 
proxy_client_worker_next_reply(struct proxy_client_dsync_worker *worker,
291
 
                               const char *line)
292
 
{
293
 
        const struct proxy_client_request *requests;
294
 
        struct proxy_client_request request;
295
 
        bool ret = TRUE;
296
 
 
297
 
        i_assert(worker->msg_get_data.input == NULL);
298
 
 
299
 
        if (aqueue_count(worker->request_queue) == 0) {
300
 
                i_error("Unexpected reply from server: %s", line);
301
 
                proxy_client_fail(worker);
302
 
                return FALSE;
303
 
        }
304
 
 
305
 
        requests = array_idx(&worker->request_array, 0);
306
 
        request = requests[aqueue_idx(worker->request_queue, 0)];
307
 
        aqueue_delete_tail(worker->request_queue);
308
 
 
309
 
        switch (request.type) {
310
 
        case PROXY_CLIENT_REQUEST_TYPE_COPY:
311
 
                ret = proxy_client_worker_next_copy(worker, &request, line);
312
 
                break;
313
 
        case PROXY_CLIENT_REQUEST_TYPE_GET:
314
 
                ret = proxy_client_worker_next_msg_get(worker, &request, line);
315
 
                break;
316
 
        case PROXY_CLIENT_REQUEST_TYPE_FINISH:
317
 
                proxy_client_worker_next_finish(worker, &request, line);
318
 
                break;
319
 
        }
320
 
        return ret;
321
 
}
322
 
 
323
 
static void proxy_client_worker_input(struct proxy_client_dsync_worker *worker)
324
 
{
325
 
        const char *line;
326
 
        int ret;
327
 
 
328
 
        if (worker->to_input != NULL)
329
 
                timeout_remove(&worker->to_input);
330
 
 
331
 
        if (worker->worker.input_callback != NULL) {
332
 
                worker->worker.input_callback(worker->worker.input_context);
333
 
                timeout_reset(worker->to);
334
 
                return;
335
 
        }
336
 
 
337
 
        while ((ret = proxy_client_worker_read_line(worker, &line)) > 0) {
338
 
                if (!proxy_client_worker_next_reply(worker, line))
339
 
                        break;
340
 
        }
341
 
        if (ret < 0) {
342
 
                /* try to continue */
343
 
                proxy_client_worker_next_reply(worker, "");
344
 
        }
345
 
 
346
 
        if (worker->to_input != NULL) {
347
 
                /* input stream's destroy callback was already called.
348
 
                   don't get back here. */
349
 
                timeout_remove(&worker->to_input);
350
 
        }
351
 
        timeout_reset(worker->to);
352
 
}
353
 
 
354
 
static int
355
 
proxy_client_worker_output_real(struct proxy_client_dsync_worker *worker)
356
 
{
357
 
        int ret;
358
 
 
359
 
        if ((ret = o_stream_flush(worker->output)) < 0)
360
 
                return 1;
361
 
 
362
 
        if (worker->save_input != NULL) {
363
 
                /* proxy_client_worker_msg_save() hasn't finished yet. */
364
 
                o_stream_cork(worker->output);
365
 
                proxy_client_send_stream(worker);
366
 
                if (worker->save_input != NULL) {
367
 
                        /* still unfinished, make sure we get called again */
368
 
                        return 0;
369
 
                }
370
 
        }
371
 
 
372
 
        if (worker->worker.output_callback != NULL)
373
 
                worker->worker.output_callback(worker->worker.output_context);
374
 
        return ret;
375
 
}
376
 
 
377
 
static int proxy_client_worker_output(struct proxy_client_dsync_worker *worker)
378
 
{
379
 
        int ret;
380
 
 
381
 
        ret = proxy_client_worker_output_real(worker);
382
 
        timeout_reset(worker->to);
383
 
        return ret;
384
 
}
385
 
 
386
 
static void
387
 
proxy_client_worker_timeout(struct proxy_client_dsync_worker *worker)
388
 
{
389
 
        const char *reason;
390
 
 
391
 
        if (worker->save_io != NULL)
392
 
                reason = " (waiting for more input from mail being saved)";
393
 
        else if (worker->save_input != NULL) {
394
 
                size_t bytes = o_stream_get_buffer_used_size(worker->output);
395
 
 
396
 
                reason = t_strdup_printf(" (waiting for output stream to flush, "
397
 
                                         "%"PRIuSIZE_T" bytes left)", bytes);
398
 
        } else if (worker->msg_get_data.input != NULL) {
399
 
                reason = " (waiting for MSG-GET message from remote)";
400
 
        } else {
401
 
                reason = "";
402
 
        }
403
 
        i_error("proxy client timed out%s", reason);
404
 
        proxy_client_fail(worker);
405
 
}
406
 
 
407
 
struct dsync_worker *dsync_worker_init_proxy_client(int fd_in, int fd_out)
408
 
{
409
 
        struct proxy_client_dsync_worker *worker;
410
 
 
411
 
        worker = i_new(struct proxy_client_dsync_worker, 1);
412
 
        worker->worker.v = proxy_client_dsync_worker;
413
 
        worker->fd_in = fd_in;
414
 
        worker->fd_out = fd_out;
415
 
        worker->to = timeout_add(DSYNC_PROXY_CLIENT_TIMEOUT_MSECS,
416
 
                                 proxy_client_worker_timeout, worker);
417
 
        worker->io = io_add(fd_in, IO_READ, proxy_client_worker_input, worker);
418
 
        worker->input = i_stream_create_fd(fd_in, (size_t)-1, FALSE);
419
 
        worker->output = o_stream_create_fd(fd_out, (size_t)-1, FALSE);
420
 
        o_stream_send_str(worker->output, DSYNC_PROXY_CLIENT_GREETING_LINE"\n");
421
 
        /* we'll keep the output corked until flush is needed */
422
 
        o_stream_cork(worker->output);
423
 
        o_stream_set_flush_callback(worker->output, proxy_client_worker_output,
424
 
                                    worker);
425
 
        fd_set_nonblock(fd_in, TRUE);
426
 
        fd_set_nonblock(fd_out, TRUE);
427
 
 
428
 
        worker->pending_commands = str_new(default_pool, 1024);
429
 
        worker->msg_get_pool = pool_alloconly_create("dsync proxy msg", 128);
430
 
        i_array_init(&worker->request_array, 64);
431
 
        worker->request_queue = aqueue_init(&worker->request_array.arr);
432
 
 
433
 
        return &worker->worker;
434
 
}
435
 
 
436
 
static void proxy_client_worker_deinit(struct dsync_worker *_worker)
437
 
{
438
 
        struct proxy_client_dsync_worker *worker =
439
 
                (struct proxy_client_dsync_worker *)_worker;
440
 
 
441
 
        timeout_remove(&worker->to);
442
 
        if (worker->to_input != NULL)
443
 
                timeout_remove(&worker->to_input);
444
 
        if (worker->io != NULL)
445
 
                io_remove(&worker->io);
446
 
        i_stream_destroy(&worker->input);
447
 
        o_stream_destroy(&worker->output);
448
 
        if (close(worker->fd_in) < 0)
449
 
                i_error("close(worker input) failed: %m");
450
 
        if (worker->fd_in != worker->fd_out) {
451
 
                if (close(worker->fd_out) < 0)
452
 
                        i_error("close(worker output) failed: %m");
453
 
        }
454
 
        aqueue_deinit(&worker->request_queue);
455
 
        array_free(&worker->request_array);
456
 
        pool_unref(&worker->msg_get_pool);
457
 
        str_free(&worker->pending_commands);
458
 
        i_free(worker);
459
 
}
460
 
 
461
 
static bool
462
 
worker_is_output_stream_full(struct proxy_client_dsync_worker *worker)
463
 
{
464
 
        return o_stream_get_buffer_used_size(worker->output) >=
465
 
                OUTBUF_THROTTLE_SIZE;
466
 
}
467
 
 
468
 
static bool proxy_client_worker_is_output_full(struct dsync_worker *_worker)
469
 
{
470
 
        struct proxy_client_dsync_worker *worker =
471
 
                (struct proxy_client_dsync_worker *)_worker;
472
 
 
473
 
        if (worker->save_input != NULL) {
474
 
                /* we haven't finished sending a message save, so we're full. */
475
 
                return TRUE;
476
 
        }
477
 
        return worker_is_output_stream_full(worker);
478
 
}
479
 
 
480
 
static int proxy_client_worker_output_flush(struct dsync_worker *_worker)
481
 
{
482
 
        struct proxy_client_dsync_worker *worker =
483
 
                (struct proxy_client_dsync_worker *)_worker;
484
 
        int ret = 1;
485
 
 
486
 
        if (o_stream_flush(worker->output) < 0)
487
 
                return -1;
488
 
 
489
 
        o_stream_uncork(worker->output);
490
 
        if (o_stream_get_buffer_used_size(worker->output) > 0)
491
 
                return 0;
492
 
 
493
 
        if (o_stream_send(worker->output, str_data(worker->pending_commands),
494
 
                          str_len(worker->pending_commands)) < 0)
495
 
                ret = -1;
496
 
        str_truncate(worker->pending_commands, 0);
497
 
        o_stream_cork(worker->output);
498
 
        return ret;
499
 
}
500
 
 
501
 
static struct dsync_worker_mailbox_iter *
502
 
proxy_client_worker_mailbox_iter_init(struct dsync_worker *_worker)
503
 
{
504
 
        struct proxy_client_dsync_worker *worker =
505
 
                (struct proxy_client_dsync_worker *)_worker;
506
 
        struct proxy_client_dsync_worker_mailbox_iter *iter;
507
 
 
508
 
        iter = i_new(struct proxy_client_dsync_worker_mailbox_iter, 1);
509
 
        iter->iter.worker = _worker;
510
 
        iter->pool = pool_alloconly_create("proxy mailbox iter", 1024);
511
 
        o_stream_send_str(worker->output, "BOX-LIST\n");
512
 
        (void)proxy_client_worker_output_flush(_worker);
513
 
        return &iter->iter;
514
 
}
515
 
 
516
 
static int
517
 
proxy_client_worker_mailbox_iter_next(struct dsync_worker_mailbox_iter *_iter,
518
 
                                      struct dsync_mailbox *dsync_box_r)
519
 
{
520
 
        struct proxy_client_dsync_worker_mailbox_iter *iter =
521
 
                (struct proxy_client_dsync_worker_mailbox_iter *)_iter;
522
 
        struct proxy_client_dsync_worker *worker =
523
 
                (struct proxy_client_dsync_worker *)_iter->worker;
524
 
        const char *line, *error;
525
 
        int ret;
526
 
 
527
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
528
 
                if (ret < 0)
529
 
                        _iter->failed = TRUE;
530
 
                return ret;
531
 
        }
532
 
 
533
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
534
 
                /* end of mailboxes */
535
 
                if (line[0] == '-') {
536
 
                        i_error("Worker server's mailbox iteration failed");
537
 
                        _iter->failed = TRUE;
538
 
                }
539
 
                return -1;
540
 
        }
541
 
 
542
 
        p_clear(iter->pool);
543
 
        if (dsync_proxy_mailbox_import(iter->pool, line,
544
 
                                       dsync_box_r, &error) < 0) {
545
 
                i_error("Invalid mailbox input from worker server: %s", error);
546
 
                _iter->failed = TRUE;
547
 
                return -1;
548
 
        }
549
 
        return 1;
550
 
}
551
 
 
552
 
static int
553
 
proxy_client_worker_mailbox_iter_deinit(struct dsync_worker_mailbox_iter *_iter)
554
 
{
555
 
        struct proxy_client_dsync_worker_mailbox_iter *iter =
556
 
                (struct proxy_client_dsync_worker_mailbox_iter *)_iter;
557
 
        int ret = _iter->failed ? -1 : 0;
558
 
 
559
 
        pool_unref(&iter->pool);
560
 
        i_free(iter);
561
 
        return ret;
562
 
}
563
 
 
564
 
static struct dsync_worker_subs_iter *
565
 
proxy_client_worker_subs_iter_init(struct dsync_worker *_worker)
566
 
{
567
 
        struct proxy_client_dsync_worker *worker =
568
 
                (struct proxy_client_dsync_worker *)_worker;
569
 
        struct proxy_client_dsync_worker_subs_iter *iter;
570
 
 
571
 
        iter = i_new(struct proxy_client_dsync_worker_subs_iter, 1);
572
 
        iter->iter.worker = _worker;
573
 
        iter->pool = pool_alloconly_create("proxy subscription iter", 1024);
574
 
        o_stream_send_str(worker->output, "SUBS-LIST\n");
575
 
        (void)proxy_client_worker_output_flush(_worker);
576
 
        return &iter->iter;
577
 
}
578
 
 
579
 
static int
580
 
proxy_client_worker_subs_iter_next_line(struct proxy_client_dsync_worker_subs_iter *iter,
581
 
                                        unsigned int wanted_arg_count,
582
 
                                        char ***args_r)
583
 
{
584
 
        struct proxy_client_dsync_worker *worker =
585
 
                (struct proxy_client_dsync_worker *)iter->iter.worker;
586
 
        const char *line;
587
 
        char **args;
588
 
        int ret;
589
 
 
590
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
591
 
                if (ret < 0)
592
 
                        iter->iter.failed = TRUE;
593
 
                return ret;
594
 
        }
595
 
 
596
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
597
 
                /* end of subscribed subscriptions */
598
 
                if (line[0] == '-') {
599
 
                        i_error("Worker server's subscription iteration failed");
600
 
                        iter->iter.failed = TRUE;
601
 
                }
602
 
                return -1;
603
 
        }
604
 
 
605
 
        p_clear(iter->pool);
606
 
        args = p_strsplit(iter->pool, line, "\t");
607
 
        if (str_array_length((const char *const *)args) < wanted_arg_count) {
608
 
                i_error("Invalid subscription input from worker server");
609
 
                iter->iter.failed = TRUE;
610
 
                return -1;
611
 
        }
612
 
        *args_r = args;
613
 
        return 1;
614
 
}
615
 
 
616
 
static int
617
 
proxy_client_worker_subs_iter_next(struct dsync_worker_subs_iter *_iter,
618
 
                                   struct dsync_worker_subscription *rec_r)
619
 
{
620
 
        struct proxy_client_dsync_worker_subs_iter *iter =
621
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
622
 
        char **args;
623
 
        int ret;
624
 
 
625
 
        ret = proxy_client_worker_subs_iter_next_line(iter, 4, &args);
626
 
        if (ret <= 0)
627
 
                return ret;
628
 
 
629
 
        rec_r->vname = str_tabunescape(args[0]);
630
 
        rec_r->storage_name = str_tabunescape(args[1]);
631
 
        rec_r->ns_prefix = str_tabunescape(args[2]);
632
 
        rec_r->last_change = strtoul(args[3], NULL, 10);
633
 
        return 1;
634
 
}
635
 
 
636
 
static int
637
 
proxy_client_worker_subs_iter_next_un(struct dsync_worker_subs_iter *_iter,
638
 
                                      struct dsync_worker_unsubscription *rec_r)
639
 
{
640
 
        struct proxy_client_dsync_worker_subs_iter *iter =
641
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
642
 
        char **args;
643
 
        int ret;
644
 
 
645
 
        ret = proxy_client_worker_subs_iter_next_line(iter, 3, &args);
646
 
        if (ret <= 0)
647
 
                return ret;
648
 
 
649
 
        memset(rec_r, 0, sizeof(*rec_r));
650
 
        if (dsync_proxy_mailbox_guid_import(args[0], &rec_r->name_sha1) < 0) {
651
 
                i_error("Invalid subscription input from worker server: "
652
 
                        "Invalid unsubscription mailbox GUID");
653
 
                iter->iter.failed = TRUE;
654
 
                return -1;
655
 
        }
656
 
        rec_r->ns_prefix = str_tabunescape(args[1]);
657
 
        rec_r->last_change = strtoul(args[2], NULL, 10);
658
 
        return 1;
659
 
}
660
 
 
661
 
static int
662
 
proxy_client_worker_subs_iter_deinit(struct dsync_worker_subs_iter *_iter)
663
 
{
664
 
        struct proxy_client_dsync_worker_subs_iter *iter =
665
 
                (struct proxy_client_dsync_worker_subs_iter *)_iter;
666
 
        int ret = _iter->failed ? -1 : 0;
667
 
 
668
 
        pool_unref(&iter->pool);
669
 
        i_free(iter);
670
 
        return ret;
671
 
}
672
 
 
673
 
static void
674
 
proxy_client_worker_set_subscribed(struct dsync_worker *_worker,
675
 
                                   const char *name, time_t last_change,
676
 
                                   bool set)
677
 
{
678
 
        struct proxy_client_dsync_worker *worker =
679
 
                (struct proxy_client_dsync_worker *)_worker;
680
 
 
681
 
        T_BEGIN {
682
 
                string_t *str = t_str_new(128);
683
 
 
684
 
                str_append(str, "SUBS-SET\t");
685
 
                str_tabescape_write(str, name);
686
 
                str_printfa(str, "\t%s\t%d\n", dec2str(last_change),
687
 
                            set ? 1 : 0);
688
 
                o_stream_send(worker->output, str_data(str), str_len(str));
689
 
        } T_END;
690
 
}
691
 
 
692
 
struct proxy_client_dsync_worker_msg_iter {
693
 
        struct dsync_worker_msg_iter iter;
694
 
        pool_t pool;
695
 
        bool done;
696
 
};
697
 
 
698
 
static struct dsync_worker_msg_iter *
699
 
proxy_client_worker_msg_iter_init(struct dsync_worker *_worker,
700
 
                                  const mailbox_guid_t mailboxes[],
701
 
                                  unsigned int mailbox_count)
702
 
{
703
 
        struct proxy_client_dsync_worker *worker =
704
 
                (struct proxy_client_dsync_worker *)_worker;
705
 
        struct proxy_client_dsync_worker_msg_iter *iter;
706
 
        string_t *str;
707
 
        unsigned int i;
708
 
 
709
 
        iter = i_new(struct proxy_client_dsync_worker_msg_iter, 1);
710
 
        iter->iter.worker = _worker;
711
 
        iter->pool = pool_alloconly_create("proxy message iter", 10240);
712
 
 
713
 
        str = str_new(iter->pool, 512);
714
 
        str_append(str, "MSG-LIST");
715
 
        for (i = 0; i < mailbox_count; i++) T_BEGIN {
716
 
                str_append_c(str, '\t');
717
 
                dsync_proxy_mailbox_guid_export(str, &mailboxes[i]);
718
 
        } T_END;
719
 
        str_append_c(str, '\n');
720
 
        o_stream_send(worker->output, str_data(str), str_len(str));
721
 
        p_clear(iter->pool);
722
 
 
723
 
        (void)proxy_client_worker_output_flush(_worker);
724
 
        return &iter->iter;
725
 
}
726
 
 
727
 
static int
728
 
proxy_client_worker_msg_iter_next(struct dsync_worker_msg_iter *_iter,
729
 
                                  unsigned int *mailbox_idx_r,
730
 
                                  struct dsync_message *msg_r)
731
 
{
732
 
        struct proxy_client_dsync_worker_msg_iter *iter =
733
 
                (struct proxy_client_dsync_worker_msg_iter *)_iter;
734
 
        struct proxy_client_dsync_worker *worker =
735
 
                (struct proxy_client_dsync_worker *)_iter->worker;
736
 
        const char *line, *error;
737
 
        int ret;
738
 
 
739
 
        if (iter->done)
740
 
                return -1;
741
 
 
742
 
        if ((ret = proxy_client_worker_read_line(worker, &line)) <= 0) {
743
 
                if (ret < 0)
744
 
                        _iter->failed = TRUE;
745
 
                return ret;
746
 
        }
747
 
 
748
 
        if ((line[0] == '+' || line[0] == '-') && line[1] == '\0') {
749
 
                /* end of messages */
750
 
                if (line[0] == '-') {
751
 
                        i_error("Worker server's message iteration failed");
752
 
                        _iter->failed = TRUE;
753
 
                }
754
 
                iter->done = TRUE;
755
 
                return -1;
756
 
        }
757
 
 
758
 
        *mailbox_idx_r = 0;
759
 
        while (*line >= '0' && *line <= '9') {
760
 
                *mailbox_idx_r = *mailbox_idx_r * 10 + (*line - '0');
761
 
                line++;
762
 
        }
763
 
        if (*line != '\t') {
764
 
                i_error("Invalid mailbox idx from worker server");
765
 
                _iter->failed = TRUE;
766
 
                return -1;
767
 
        }
768
 
        line++;
769
 
 
770
 
        p_clear(iter->pool);
771
 
        if (dsync_proxy_msg_import(iter->pool, line, msg_r, &error) < 0) {
772
 
                i_error("Invalid message input from worker server: %s", error);
773
 
                _iter->failed = TRUE;
774
 
                return -1;
775
 
        }
776
 
        return 1;
777
 
}
778
 
 
779
 
static int
780
 
proxy_client_worker_msg_iter_deinit(struct dsync_worker_msg_iter *_iter)
781
 
{
782
 
        struct proxy_client_dsync_worker_msg_iter *iter =
783
 
                (struct proxy_client_dsync_worker_msg_iter *)_iter;
784
 
        int ret = _iter->failed ? -1 : 0;
785
 
 
786
 
        pool_unref(&iter->pool);
787
 
        i_free(iter);
788
 
        return ret;
789
 
}
790
 
 
791
 
static void
792
 
proxy_client_worker_cmd(struct proxy_client_dsync_worker *worker, string_t *str)
793
 
{
794
 
        if (worker->save_input == NULL)
795
 
                o_stream_send(worker->output, str_data(str), str_len(str));
796
 
        else
797
 
                str_append_str(worker->pending_commands, str);
798
 
}
799
 
 
800
 
static void
801
 
proxy_client_worker_create_mailbox(struct dsync_worker *_worker,
802
 
                                   const struct dsync_mailbox *dsync_box)
803
 
{
804
 
        struct proxy_client_dsync_worker *worker =
805
 
                (struct proxy_client_dsync_worker *)_worker;
806
 
 
807
 
        T_BEGIN {
808
 
                string_t *str = t_str_new(128);
809
 
 
810
 
                str_append(str, "BOX-CREATE\t");
811
 
                dsync_proxy_mailbox_export(str, dsync_box);
812
 
                str_append_c(str, '\n');
813
 
                proxy_client_worker_cmd(worker, str);
814
 
        } T_END;
815
 
}
816
 
 
817
 
static void
818
 
proxy_client_worker_delete_mailbox(struct dsync_worker *_worker,
819
 
                                   const struct dsync_mailbox *dsync_box)
820
 
{
821
 
        struct proxy_client_dsync_worker *worker =
822
 
                (struct proxy_client_dsync_worker *)_worker;
823
 
 
824
 
        T_BEGIN {
825
 
                string_t *str = t_str_new(128);
826
 
 
827
 
                str_append(str, "BOX-DELETE\t");
828
 
                dsync_proxy_mailbox_guid_export(str, &dsync_box->mailbox_guid);
829
 
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
830
 
                proxy_client_worker_cmd(worker, str);
831
 
        } T_END;
832
 
}
833
 
 
834
 
static void
835
 
proxy_client_worker_delete_dir(struct dsync_worker *_worker,
836
 
                               const struct dsync_mailbox *dsync_box)
837
 
{
838
 
        struct proxy_client_dsync_worker *worker =
839
 
                (struct proxy_client_dsync_worker *)_worker;
840
 
 
841
 
        T_BEGIN {
842
 
                string_t *str = t_str_new(128);
843
 
 
844
 
                str_append(str, "DIR-DELETE\t");
845
 
                str_tabescape_write(str, dsync_box->name);
846
 
                str_printfa(str, "\t%s\n", dec2str(dsync_box->last_change));
847
 
                proxy_client_worker_cmd(worker, str);
848
 
        } T_END;
849
 
}
850
 
 
851
 
static void
852
 
proxy_client_worker_rename_mailbox(struct dsync_worker *_worker,
853
 
                                   const mailbox_guid_t *mailbox,
854
 
                                   const struct dsync_mailbox *dsync_box)
855
 
{
856
 
        struct proxy_client_dsync_worker *worker =
857
 
                (struct proxy_client_dsync_worker *)_worker;
858
 
        char sep[2];
859
 
 
860
 
        T_BEGIN {
861
 
                string_t *str = t_str_new(128);
862
 
 
863
 
                str_append(str, "BOX-RENAME\t");
864
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
865
 
                str_append_c(str, '\t');
866
 
                str_tabescape_write(str, dsync_box->name);
867
 
                str_append_c(str, '\t');
868
 
                sep[0] = dsync_box->name_sep; sep[1] = '\0';
869
 
                str_tabescape_write(str, sep);
870
 
                str_append_c(str, '\n');
871
 
                proxy_client_worker_cmd(worker, str);
872
 
        } T_END;
873
 
}
874
 
 
875
 
static void
876
 
proxy_client_worker_update_mailbox(struct dsync_worker *_worker,
877
 
                                   const struct dsync_mailbox *dsync_box)
878
 
{
879
 
        struct proxy_client_dsync_worker *worker =
880
 
                (struct proxy_client_dsync_worker *)_worker;
881
 
 
882
 
        T_BEGIN {
883
 
                string_t *str = t_str_new(128);
884
 
 
885
 
                str_append(str, "BOX-UPDATE\t");
886
 
                dsync_proxy_mailbox_export(str, dsync_box);
887
 
                str_append_c(str, '\n');
888
 
                proxy_client_worker_cmd(worker, str);
889
 
        } T_END;
890
 
}
891
 
 
892
 
static void
893
 
proxy_client_worker_select_mailbox(struct dsync_worker *_worker,
894
 
                                   const mailbox_guid_t *mailbox,
895
 
                                   const ARRAY_TYPE(mailbox_cache_field) *cache_fields)
896
 
{
897
 
        struct proxy_client_dsync_worker *worker =
898
 
                (struct proxy_client_dsync_worker *)_worker;
899
 
 
900
 
        if (dsync_guid_equals(&worker->selected_box_guid, mailbox))
901
 
                return;
902
 
        worker->selected_box_guid = *mailbox;
903
 
 
904
 
        T_BEGIN {
905
 
                string_t *str = t_str_new(128);
906
 
 
907
 
                str_append(str, "BOX-SELECT\t");
908
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
909
 
                if (cache_fields != NULL)
910
 
                        dsync_proxy_cache_fields_export(str, cache_fields);
911
 
                str_append_c(str, '\n');
912
 
                proxy_client_worker_cmd(worker, str);
913
 
        } T_END;
914
 
}
915
 
 
916
 
static void
917
 
proxy_client_worker_msg_update_metadata(struct dsync_worker *_worker,
918
 
                                        const struct dsync_message *msg)
919
 
{
920
 
        struct proxy_client_dsync_worker *worker =
921
 
                (struct proxy_client_dsync_worker *)_worker;
922
 
 
923
 
        T_BEGIN {
924
 
                string_t *str = t_str_new(128);
925
 
 
926
 
                str_printfa(str, "MSG-UPDATE\t%u\t%llu\t", msg->uid,
927
 
                            (unsigned long long)msg->modseq);
928
 
                imap_write_flags(str, msg->flags, msg->keywords);
929
 
                str_append_c(str, '\n');
930
 
                proxy_client_worker_cmd(worker, str);
931
 
        } T_END;
932
 
}
933
 
 
934
 
static void
935
 
proxy_client_worker_msg_update_uid(struct dsync_worker *_worker,
936
 
                                   uint32_t old_uid, uint32_t new_uid)
937
 
{
938
 
        struct proxy_client_dsync_worker *worker =
939
 
                (struct proxy_client_dsync_worker *)_worker;
940
 
 
941
 
        T_BEGIN {
942
 
                string_t *str = t_str_new(64);
943
 
                str_printfa(str, "MSG-UID-CHANGE\t%u\t%u\n", old_uid, new_uid);
944
 
                proxy_client_worker_cmd(worker, str);
945
 
        } T_END;
946
 
}
947
 
 
948
 
static void
949
 
proxy_client_worker_msg_expunge(struct dsync_worker *_worker, uint32_t uid)
950
 
{
951
 
        struct proxy_client_dsync_worker *worker =
952
 
                (struct proxy_client_dsync_worker *)_worker;
953
 
 
954
 
        T_BEGIN {
955
 
                string_t *str = t_str_new(64);
956
 
                str_printfa(str, "MSG-EXPUNGE\t%u\n", uid);
957
 
                proxy_client_worker_cmd(worker, str);
958
 
        } T_END;
959
 
}
960
 
 
961
 
static void
962
 
proxy_client_worker_msg_copy(struct dsync_worker *_worker,
963
 
                             const mailbox_guid_t *src_mailbox,
964
 
                             uint32_t src_uid,
965
 
                             const struct dsync_message *dest_msg,
966
 
                             dsync_worker_copy_callback_t *callback,
967
 
                             void *context)
968
 
{
969
 
        struct proxy_client_dsync_worker *worker =
970
 
                (struct proxy_client_dsync_worker *)_worker;
971
 
        struct proxy_client_request request;
972
 
 
973
 
        T_BEGIN {
974
 
                string_t *str = t_str_new(128);
975
 
 
976
 
                str_append(str, "MSG-COPY\t");
977
 
                dsync_proxy_mailbox_guid_export(str, src_mailbox);
978
 
                str_printfa(str, "\t%u\t", src_uid);
979
 
                dsync_proxy_msg_export(str, dest_msg);
980
 
                str_append_c(str, '\n');
981
 
                proxy_client_worker_cmd(worker, str);
982
 
        } T_END;
983
 
 
984
 
        memset(&request, 0, sizeof(request));
985
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_COPY;
986
 
        request.callback.copy = callback;
987
 
        request.context = context;
988
 
        request.uid = src_uid;
989
 
        aqueue_append(worker->request_queue, &request);
990
 
}
991
 
 
992
 
static void
993
 
proxy_client_send_stream_real(struct proxy_client_dsync_worker *worker)
994
 
{
995
 
        dsync_worker_save_callback_t *callback;
996
 
        void *context;
997
 
        struct istream *input;
998
 
        const unsigned char *data;
999
 
        size_t size;
1000
 
        int ret;
1001
 
 
1002
 
        while ((ret = i_stream_read_data(worker->save_input,
1003
 
                                         &data, &size, 0)) > 0) {
1004
 
                dsync_proxy_send_dot_output(worker->output,
1005
 
                                            &worker->save_input_last_lf,
1006
 
                                            data, size);
1007
 
                i_stream_skip(worker->save_input, size);
1008
 
 
1009
 
                if (worker_is_output_stream_full(worker)) {
1010
 
                        o_stream_uncork(worker->output);
1011
 
                        if (worker_is_output_stream_full(worker))
1012
 
                                return;
1013
 
                        o_stream_cork(worker->output);
1014
 
                }
1015
 
        }
1016
 
        if (ret == 0) {
1017
 
                /* waiting for more input */
1018
 
                o_stream_uncork(worker->output);
1019
 
                if (worker->save_io == NULL) {
1020
 
                        int fd = i_stream_get_fd(worker->save_input);
1021
 
 
1022
 
                        worker->save_io =
1023
 
                                io_add(fd, IO_READ,
1024
 
                                       proxy_client_send_stream, worker);
1025
 
                }
1026
 
                return;
1027
 
        }
1028
 
        if (worker->save_io != NULL)
1029
 
                io_remove(&worker->save_io);
1030
 
        if (worker->save_input->stream_errno != 0) {
1031
 
                errno = worker->save_input->stream_errno;
1032
 
                i_error("proxy: reading message input failed: %m");
1033
 
                o_stream_close(worker->output);
1034
 
        } else {
1035
 
                i_assert(!i_stream_have_bytes_left(worker->save_input));
1036
 
                o_stream_send(worker->output, "\n.\n", 3);
1037
 
        }
1038
 
 
1039
 
        callback = worker->save_callback;
1040
 
        context = worker->save_context;
1041
 
        worker->save_callback = NULL;
1042
 
        worker->save_context = NULL;
1043
 
 
1044
 
        /* a bit ugly way to free the stream. the problem is that local worker
1045
 
           has set a destroy callback, which in turn can call our msg_save()
1046
 
           again before the i_stream_unref() is finished. */
1047
 
        input = worker->save_input;
1048
 
        worker->save_input = NULL;
1049
 
        i_stream_unref(&input);
1050
 
 
1051
 
        (void)proxy_client_worker_output_flush(&worker->worker);
1052
 
 
1053
 
        callback(context);
1054
 
}
1055
 
 
1056
 
static void proxy_client_send_stream(struct proxy_client_dsync_worker *worker)
1057
 
{
1058
 
        proxy_client_send_stream_real(worker);
1059
 
        timeout_reset(worker->to);
1060
 
}
1061
 
 
1062
 
static void
1063
 
proxy_client_worker_msg_save(struct dsync_worker *_worker,
1064
 
                             const struct dsync_message *msg,
1065
 
                             const struct dsync_msg_static_data *data,
1066
 
                             dsync_worker_save_callback_t *callback,
1067
 
                             void *context)
1068
 
{
1069
 
        struct proxy_client_dsync_worker *worker =
1070
 
                (struct proxy_client_dsync_worker *)_worker;
1071
 
 
1072
 
        T_BEGIN {
1073
 
                string_t *str = t_str_new(128);
1074
 
 
1075
 
                str_append(str, "MSG-SAVE\t");
1076
 
                dsync_proxy_msg_static_export(str, data);
1077
 
                str_append_c(str, '\t');
1078
 
                dsync_proxy_msg_export(str, msg);
1079
 
                str_append_c(str, '\n');
1080
 
                proxy_client_worker_cmd(worker, str);
1081
 
        } T_END;
1082
 
 
1083
 
        i_assert(worker->save_input == NULL);
1084
 
        worker->save_callback = callback;
1085
 
        worker->save_context = context;
1086
 
        worker->save_input = data->input;
1087
 
        worker->save_input_last_lf = TRUE;
1088
 
        i_stream_ref(worker->save_input);
1089
 
        proxy_client_send_stream(worker);
1090
 
}
1091
 
 
1092
 
static void
1093
 
proxy_client_worker_msg_save_cancel(struct dsync_worker *_worker)
1094
 
{
1095
 
        struct proxy_client_dsync_worker *worker =
1096
 
                (struct proxy_client_dsync_worker *)_worker;
1097
 
 
1098
 
        if (worker->save_io != NULL)
1099
 
                io_remove(&worker->save_io);
1100
 
        if (worker->save_input != NULL)
1101
 
                i_stream_unref(&worker->save_input);
1102
 
}
1103
 
 
1104
 
static void
1105
 
proxy_client_worker_msg_get(struct dsync_worker *_worker,
1106
 
                            const mailbox_guid_t *mailbox, uint32_t uid,
1107
 
                            dsync_worker_msg_callback_t *callback,
1108
 
                            void *context)
1109
 
{
1110
 
        struct proxy_client_dsync_worker *worker =
1111
 
                (struct proxy_client_dsync_worker *)_worker;
1112
 
        struct proxy_client_request request;
1113
 
 
1114
 
        T_BEGIN {
1115
 
                string_t *str = t_str_new(128);
1116
 
 
1117
 
                str_append(str, "MSG-GET\t");
1118
 
                dsync_proxy_mailbox_guid_export(str, mailbox);
1119
 
                str_printfa(str, "\t%u\n", uid);
1120
 
                proxy_client_worker_cmd(worker, str);
1121
 
        } T_END;
1122
 
 
1123
 
        memset(&request, 0, sizeof(request));
1124
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_GET;
1125
 
        request.callback.get = callback;
1126
 
        request.context = context;
1127
 
        request.uid = uid;
1128
 
        aqueue_append(worker->request_queue, &request);
1129
 
}
1130
 
 
1131
 
static void
1132
 
proxy_client_worker_finish(struct dsync_worker *_worker,
1133
 
                           dsync_worker_finish_callback_t *callback,
1134
 
                           void *context)
1135
 
{
1136
 
        struct proxy_client_dsync_worker *worker =
1137
 
                (struct proxy_client_dsync_worker *)_worker;
1138
 
        struct proxy_client_request request;
1139
 
 
1140
 
        i_assert(worker->save_input == NULL);
1141
 
        i_assert(!worker->finishing);
1142
 
 
1143
 
        worker->finishing = TRUE;
1144
 
        worker->finished = FALSE;
1145
 
 
1146
 
        o_stream_send_str(worker->output, "FINISH\n");
1147
 
        o_stream_uncork(worker->output);
1148
 
 
1149
 
        memset(&request, 0, sizeof(request));
1150
 
        request.type = PROXY_CLIENT_REQUEST_TYPE_FINISH;
1151
 
        request.callback.finish = callback;
1152
 
        request.context = context;
1153
 
        aqueue_append(worker->request_queue, &request);
1154
 
}
1155
 
 
1156
 
struct dsync_worker_vfuncs proxy_client_dsync_worker = {
1157
 
        proxy_client_worker_deinit,
1158
 
 
1159
 
        proxy_client_worker_is_output_full,
1160
 
        proxy_client_worker_output_flush,
1161
 
 
1162
 
        proxy_client_worker_mailbox_iter_init,
1163
 
        proxy_client_worker_mailbox_iter_next,
1164
 
        proxy_client_worker_mailbox_iter_deinit,
1165
 
 
1166
 
        proxy_client_worker_subs_iter_init,
1167
 
        proxy_client_worker_subs_iter_next,
1168
 
        proxy_client_worker_subs_iter_next_un,
1169
 
        proxy_client_worker_subs_iter_deinit,
1170
 
        proxy_client_worker_set_subscribed,
1171
 
 
1172
 
        proxy_client_worker_msg_iter_init,
1173
 
        proxy_client_worker_msg_iter_next,
1174
 
        proxy_client_worker_msg_iter_deinit,
1175
 
 
1176
 
        proxy_client_worker_create_mailbox,
1177
 
        proxy_client_worker_delete_mailbox,
1178
 
        proxy_client_worker_delete_dir,
1179
 
        proxy_client_worker_rename_mailbox,
1180
 
        proxy_client_worker_update_mailbox,
1181
 
 
1182
 
        proxy_client_worker_select_mailbox,
1183
 
        proxy_client_worker_msg_update_metadata,
1184
 
        proxy_client_worker_msg_update_uid,
1185
 
        proxy_client_worker_msg_expunge,
1186
 
        proxy_client_worker_msg_copy,
1187
 
        proxy_client_worker_msg_save,
1188
 
        proxy_client_worker_msg_save_cancel,
1189
 
        proxy_client_worker_msg_get,
1190
 
        proxy_client_worker_finish
1191
 
};