1
/* Copyright (c) 2009-2012 Dovecot authors, see the included COPYING file */
4
This code contains the step 6 explained in dsync-brain-msgs.c:
5
It saves/copies new messages and gives new UIDs for conflicting messages.
7
The input is both workers' msg iterators' new_msgs and uid_conflicts
8
variables. They're first sorted by mailbox and secondarily by wanted
9
destination UID. Destination UIDs of conflicts should always be higher
12
Mailboxes are handled one at a time:
14
1. Go through all saved messages. If we've already seen an instance of this
15
message, try to copy it. Otherwise save a new instance of it.
16
2. Some of the copies may fail because they're already expunged by that
17
time. A list of these failed copies are saved to copy_retry_indexes.
18
3. UID conflicts are resolved by assigning a new UID to the message.
19
To avoid delays with remote dsync, this is done via worker API.
20
Internally the local worker copies the message to its new UID and
21
once the copy succeeds, the old UID is expunged. If the copy fails, it's
22
either due to message already being expunged or something more fatal.
23
4. Once all messages are saved/copied, see if there are any failed copies.
24
If so, goto 1, but going through only the failed messages.
25
5. If there are more mailboxes left, go to next one and goto 1.
27
Step 4 may require waiting for remote worker to send all replies.
34
#include "dsync-worker.h"
35
#include "dsync-brain-private.h"
37
struct dsync_brain_msg_copy_context {
38
struct dsync_brain_msg_iter *iter;
42
struct dsync_brain_msg_save_context {
43
struct dsync_brain_msg_iter *iter;
44
const struct dsync_message *msg;
45
unsigned int mailbox_idx;
49
dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter);
51
static void msg_save_callback(void *context)
53
struct dsync_brain_msg_save_context *ctx = context;
55
dsync_brain_guid_add(ctx->iter, ctx->mailbox_idx, ctx->msg);
56
if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
57
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
61
static void msg_get_callback(enum dsync_msg_get_result result,
62
const struct dsync_msg_static_data *data,
65
struct dsync_brain_msg_save_context *ctx = context;
66
const struct dsync_brain_mailbox *mailbox;
67
struct istream *input;
69
i_assert(ctx->iter->save_results_left > 0);
71
mailbox = array_idx(&ctx->iter->sync->mailboxes, ctx->mailbox_idx);
73
case DSYNC_MSG_GET_RESULT_SUCCESS:
74
/* the mailbox may have changed, make sure we've the
76
dsync_worker_select_mailbox(ctx->iter->worker, &mailbox->box);
79
dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data,
80
msg_save_callback, ctx);
81
i_stream_unref(&input);
83
case DSYNC_MSG_GET_RESULT_EXPUNGED:
84
/* mail got expunged during sync. just skip this. */
85
msg_save_callback(ctx);
87
case DSYNC_MSG_GET_RESULT_FAILED:
88
i_error("msg-get failed: box=%s uid=%u guid=%s",
89
mailbox->box.name, ctx->msg->uid, ctx->msg->guid);
90
dsync_brain_fail(ctx->iter->sync->brain);
91
msg_save_callback(ctx);
97
dsync_brain_sync_remove_guid_instance(struct dsync_brain_msg_iter *iter,
98
const struct dsync_brain_new_msg *msg)
100
struct dsync_brain_guid_instance *inst;
101
void *orig_key, *orig_value;
103
if (!hash_table_lookup_full(iter->guid_hash, msg->msg->guid,
104
&orig_key, &orig_value)) {
105
/* another failed copy already removed it */
110
if (inst->next == NULL)
111
hash_table_remove(iter->guid_hash, orig_key);
113
hash_table_update(iter->guid_hash, orig_key, inst->next);
116
static void dsync_brain_copy_callback(bool success, void *context)
118
struct dsync_brain_msg_copy_context *ctx = context;
119
struct dsync_brain_new_msg *msg;
122
/* mark the guid instance invalid and try again later */
123
msg = array_idx_modifiable(&ctx->iter->new_msgs, ctx->msg_idx);
124
i_assert(msg->saved);
127
if (ctx->iter->next_new_msg > ctx->msg_idx)
128
ctx->iter->next_new_msg = ctx->msg_idx;
130
dsync_brain_sync_remove_guid_instance(ctx->iter, msg);
133
if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
134
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
139
dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
140
const mailbox_guid_t *src_mailbox,
141
unsigned int msg_idx,
142
struct dsync_brain_new_msg *msg)
144
struct dsync_brain_msg_save_context *save_ctx;
145
struct dsync_brain_msg_copy_context *copy_ctx;
146
struct dsync_brain_msg_iter *src_iter;
147
const struct dsync_brain_guid_instance *inst;
148
const struct dsync_brain_mailbox *inst_box;
152
inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
154
/* we can save this by copying an existing message */
155
inst_box = array_idx(&dest_iter->sync->mailboxes,
158
copy_ctx = i_new(struct dsync_brain_msg_copy_context, 1);
159
copy_ctx->iter = dest_iter;
160
copy_ctx->msg_idx = msg_idx;
162
dest_iter->copy_results_left++;
163
dest_iter->adding_msgs = TRUE;
164
dsync_worker_msg_copy(dest_iter->worker,
165
&inst_box->box.mailbox_guid,
167
dsync_brain_copy_callback, copy_ctx);
168
dest_iter->adding_msgs = FALSE;
170
src_iter = dest_iter == dest_iter->sync->dest_msg_iter ?
171
dest_iter->sync->src_msg_iter :
172
dest_iter->sync->dest_msg_iter;
174
save_ctx = i_new(struct dsync_brain_msg_save_context, 1);
175
save_ctx->iter = dest_iter;
176
save_ctx->msg = msg->msg;
177
save_ctx->mailbox_idx = dest_iter->mailbox_idx;
179
dest_iter->save_results_left++;
180
dest_iter->adding_msgs = TRUE;
181
dsync_worker_msg_get(src_iter->worker, src_mailbox,
182
msg->orig_uid, msg_get_callback, save_ctx);
183
dest_iter->adding_msgs = FALSE;
184
if (dsync_worker_output_flush(src_iter->worker) < 0)
186
if (dsync_worker_is_output_full(dest_iter->worker)) {
187
/* see if the output becomes less full by flushing */
188
if (dsync_worker_output_flush(dest_iter->worker) < 0)
192
return dsync_worker_is_output_full(dest_iter->worker) ? 0 : 1;
196
dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
197
const mailbox_guid_t *mailbox_guid)
199
struct dsync_brain_new_msg *msgs;
200
unsigned int msg_count;
203
msgs = array_get_modifiable(&iter->new_msgs, &msg_count);
204
while (iter->next_new_msg < msg_count) {
205
struct dsync_brain_new_msg *msg = &msgs[iter->next_new_msg];
207
if (msg->mailbox_idx != iter->mailbox_idx) {
208
i_assert(msg->mailbox_idx > iter->mailbox_idx);
212
iter->next_new_msg++;
216
if (dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
217
iter->next_new_msg - 1,
219
/* failed / continue later */
223
if (iter->next_new_msg == msg_count)
226
/* flush copy commands */
227
if (dsync_worker_output_flush(iter->worker) > 0 && ret) {
228
/* we have more space again, continue */
229
return dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid);
236
dsync_brain_mailbox_save_conflicts(struct dsync_brain_msg_iter *iter)
238
const struct dsync_brain_uid_conflict *conflicts;
239
unsigned int i, count;
241
conflicts = array_get(&iter->uid_conflicts, &count);
242
for (i = iter->next_conflict; i < count; i++) {
243
if (conflicts[i].mailbox_idx != iter->mailbox_idx)
246
dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid,
247
conflicts[i].new_uid);
249
iter->next_conflict = i;
253
dsync_brain_msg_sync_finish(struct dsync_brain_msg_iter *iter)
255
struct dsync_brain_mailbox_sync *sync = iter->sync;
257
i_assert(sync->brain->state == DSYNC_STATE_SYNC_MSGS);
259
iter->msgs_sent = TRUE;
261
/* done with all mailboxes from this iter */
262
dsync_worker_set_input_callback(iter->worker, NULL, NULL);
264
if (sync->src_msg_iter->msgs_sent &&
265
sync->dest_msg_iter->msgs_sent &&
266
sync->src_msg_iter->save_results_left == 0 &&
267
sync->dest_msg_iter->save_results_left == 0 &&
268
dsync_worker_output_flush(sync->dest_worker) > 0 &&
269
dsync_worker_output_flush(sync->src_worker) > 0) {
270
dsync_worker_set_output_callback(sync->src_msg_iter->worker,
272
dsync_worker_set_output_callback(sync->dest_msg_iter->worker,
274
sync->brain->state++;
275
dsync_brain_sync(sync->brain);
280
dsync_brain_msg_sync_select_mailbox(struct dsync_brain_msg_iter *iter)
282
const struct dsync_brain_mailbox *mailbox;
284
while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
285
if (array_count(&iter->new_msgs) == 0 &&
286
array_count(&iter->uid_conflicts) == 0) {
287
/* optimization: don't even bother selecting this
293
mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
294
dsync_worker_select_mailbox(iter->worker, &mailbox->box);
297
dsync_brain_msg_sync_finish(iter);
302
dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
304
const struct dsync_brain_mailbox *mailbox;
305
const mailbox_guid_t *mailbox_guid;
307
if (iter->msgs_sent) {
308
dsync_brain_msg_sync_finish(iter);
313
mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
314
mailbox_guid = &mailbox->box.mailbox_guid;
315
if (dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid)) {
320
/* all messages saved for this mailbox. continue with saving
321
its conflicts and waiting for copies to finish. */
322
dsync_brain_mailbox_save_conflicts(iter);
323
if (iter->save_results_left > 0 ||
324
iter->copy_results_left > 0) {
325
/* wait for saves/copies to finish */
329
/* done with this mailbox, try the next one */
331
} while (dsync_brain_msg_sync_select_mailbox(iter));
334
static void dsync_worker_new_msg_output(void *context)
336
struct dsync_brain_msg_iter *iter = context;
338
dsync_brain_msg_sync_add_new_msgs(iter);
341
static int dsync_brain_new_msg_cmp(const struct dsync_brain_new_msg *m1,
342
const struct dsync_brain_new_msg *m2)
344
if (m1->mailbox_idx < m2->mailbox_idx)
346
if (m1->mailbox_idx > m2->mailbox_idx)
349
if (m1->msg->uid < m2->msg->uid)
351
if (m1->msg->uid > m2->msg->uid)
357
dsync_brain_uid_conflict_cmp(const struct dsync_brain_uid_conflict *c1,
358
const struct dsync_brain_uid_conflict *c2)
360
if (c1->mailbox_idx < c2->mailbox_idx)
362
if (c1->mailbox_idx < c2->mailbox_idx)
365
if (c1->new_uid < c2->new_uid)
367
if (c1->new_uid > c2->new_uid)
373
dsync_brain_msg_iter_sync_new_msgs(struct dsync_brain_msg_iter *iter)
375
iter->mailbox_idx = 0;
377
/* sort input by 1) mailbox, 2) new message UID */
378
array_sort(&iter->new_msgs, dsync_brain_new_msg_cmp);
379
array_sort(&iter->uid_conflicts, dsync_brain_uid_conflict_cmp);
381
dsync_worker_set_input_callback(iter->worker, NULL, iter);
382
dsync_worker_set_output_callback(iter->worker,
383
dsync_worker_new_msg_output, iter);
385
if (dsync_brain_msg_sync_select_mailbox(iter))
386
dsync_brain_msg_sync_add_new_msgs(iter);
389
void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync)
391
dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter);
392
dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter);