1
/* Copyright (c) 2009-2011 Dovecot authors, see the included COPYING file */
3
/* This code synchronizes messages in all mailboxes between two workers.
4
The "src" and "dest" terms don't really have anything to do with reality,
5
they're both treated equally.
7
1. Iterate through all messages in all (wanted) mailboxes. The mailboxes
8
are iterated in the same order and messages in ascending order.
9
All of the expunged messages at the end of mailbox (i.e.
10
last_existing_uid+1 .. next_uid-1) are also returned with
11
DSYNC_MAIL_FLAG_EXPUNGED set. We only care about the end of the mailbox,
12
because we can detect UID conflicts for messages in the middle by looking
13
at the next existing message and seeing if it has UID conflict.
14
2. For each seen non-expunged message, save it to GUID instance hash table:
15
message GUID => linked list of { uid, mailbox }
16
3. Each message in a mailbox is matched between the two workers as long as
17
both have messages left (the last ones may be expunged).
18
The possibilities are:
20
i) We don't know the GUIDs of both messages:
22
a) Message is expunged in both. Do nothing.
23
b) Message is expunged in only one of them. If there have been no UID
24
conflicts seen so far, expunge the message in the other one.
25
Otherwise, give the existing message a new UID (at step 6).
27
ii) We know GUIDs of both messages (one/both of them may be expunged):
29
a) Messages have conflicting GUIDs. Give new UIDs for the non-expunged
30
message(s) (at step 6).
31
b) Messages have matching GUIDs and one of them is expunged.
32
Expunge also the other one. (We don't need to care about previous
33
UID conflicts here, because we know this message is the same with
34
both workers, since they have the same GUID.)
35
c) Messages have matching GUIDs and both of them exist. Sync flags from
36
whichever has the higher modseq. If both modseqs equal but flags
37
don't, pick the one that has more flags. If even the flag count is
38
the same, just pick one of them.
39
4. One of the workers may have messages left in the mailbox. Copy these
40
(non-expunged) messages to the other worker (at step 6).
41
5. If there are more mailboxes left, go to next one and goto 2.
43
6. Copy the new messages and give new UIDs to conflicting messages.
44
This code exists in dsync-brain-msgs-new.c
50
#include "dsync-worker.h"
51
#include "dsync-brain-private.h"
53
/* Remember the iterator's current message in iter->guid_hash (step 2 of the
   file header: message GUID => linked list of { uid, mailbox }).

   The message flags are tested for DSYNC_MAIL_FLAG_EXPUNGED first; per the
   file header only non-expunged messages are meant to be recorded. A new
   instance carrying the iterator's mailbox_idx and the message UID is
   allocated from the sync pool. If the GUID has no entry yet, the instance
   is inserted under a pool-allocated copy of the GUID string; otherwise it
   is linked in directly after the instance already stored for that GUID. */
static void dsync_brain_guid_add(struct dsync_brain_msg_iter *iter)
55
struct dsync_brain_guid_instance *inst, *prev_inst;
57
if ((iter->msg.flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0)
60
inst = p_new(iter->sync->pool, struct dsync_brain_guid_instance, 1);
61
inst->mailbox_idx = iter->mailbox_idx;
62
inst->uid = iter->msg.uid;
64
/* has this iterator already seen a message with the same GUID? */
prev_inst = hash_table_lookup(iter->guid_hash, iter->msg.guid);
65
if (prev_inst == NULL) {
66
hash_table_insert(iter->guid_hash,
67
p_strdup(iter->sync->pool, iter->msg.guid),
70
inst->next = prev_inst->next;
71
prev_inst->next = inst;
75
/* Make sure iter->msg holds a message to work on.

   A NULL msg.guid marks the slot as empty (the callers in this file set it
   to NULL once they are done with a message); in that case the next message
   is requested from the worker's iterator and registered with
   dsync_brain_guid_add(). The iterator's mailbox_idx is then compared with
   sync->wanted_mailbox_idx: a mismatch means the mailbox currently being
   synced has no more messages in this iterator.

   Callers in this file keep looping while the result is > 0 and test 0 and
   negative results separately.
   NOTE(review): the exact meaning of 0 vs. a negative result is not spelled
   out here - confirm before relying on it. */
static int dsync_brain_msg_iter_next(struct dsync_brain_msg_iter *iter)
79
if (iter->msg.guid == NULL) {
80
ret = dsync_worker_msg_iter_next(iter->iter,
84
dsync_brain_guid_add(iter);
87
if (iter->sync->wanted_mailbox_idx != iter->mailbox_idx) {
88
/* finished with this mailbox */
95
/* Throw away whatever is left of the current mailbox on both sides.

   Each iterator is advanced for as long as dsync_brain_msg_iter_next()
   returns > 0, and every message it yields is dropped unprocessed by
   clearing msg.guid. sync->skip_mailbox is reset to FALSE afterwards.
   The only caller, dsync_brain_msg_iter_next_pair(), compares the result
   with 0. */
dsync_brain_msg_iter_skip_mailbox(struct dsync_brain_mailbox_sync *sync)
99
while ((ret = dsync_brain_msg_iter_next(sync->src_msg_iter)) > 0)
100
sync->src_msg_iter->msg.guid = NULL;
104
while ((ret = dsync_brain_msg_iter_next(sync->dest_msg_iter)) > 0)
105
sync->dest_msg_iter->msg.guid = NULL;
109
/* draining is over, later pairs are synced normally again */
sync->skip_mailbox = FALSE;
113
/* Load the next message into both the source and the destination iterator
   so that dsync_brain_msg_sync_pair() can compare them.

   If sync->skip_mailbox is set, the rest of the current mailbox is first
   discarded with dsync_brain_msg_iter_skip_mailbox(). Both iterators are
   then advanced unconditionally - the second call is made even when the
   first one had nothing to return - and the two results are checked for 0
   and for negative values. The caller loops while the result is > 0. */
static int dsync_brain_msg_iter_next_pair(struct dsync_brain_mailbox_sync *sync)
117
if (sync->skip_mailbox) {
118
if (dsync_brain_msg_iter_skip_mailbox(sync) == 0)
122
ret1 = dsync_brain_msg_iter_next(sync->src_msg_iter);
123
ret2 = dsync_brain_msg_iter_next(sync->dest_msg_iter);
124
if (ret1 == 0 || ret2 == 0) {
125
/* make sure we iterate through everything in both iterators
126
(even if it might not seem necessary, because proxy
130
if (ret1 < 0 || ret2 < 0)
136
/* Queue msg for being copied to the worker that iter belongs to.

   A new entry is appended to iter->new_msgs holding the given mailbox_idx,
   the message's original UID and a copy of the message duplicated into the
   sync pool. The flags are tested for DSYNC_MAIL_FLAG_EXPUNGED first; per
   step 4 of the file header only non-expunged messages are copied. The
   queued entries are handled later (step 6 of the file header). */
dsync_brain_msg_sync_save(struct dsync_brain_msg_iter *iter,
137
unsigned int mailbox_idx,
138
const struct dsync_message *msg)
140
struct dsync_brain_new_msg *new_msg;
142
if ((msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0)
145
new_msg = array_append_space(&iter->new_msgs);
146
new_msg->mailbox_idx = mailbox_idx;
147
new_msg->orig_uid = msg->uid;
148
/* msg points into the iterator, so keep a private copy */
new_msg->msg = dsync_message_dup(iter->sync->pool, msg);
152
/* Resolve a UID conflict for msg.

   conflict_iter is the side whose message has to get a new UID, save_iter
   is the side the message gets queued for under that new UID.

   Backup mode: a warning is logged, the mailbox is deleted through the
   destination worker, brain->unexpected_changes is set and
   sync->skip_mailbox is raised so that the rest of this mailbox is skipped.
   NOTE(review): presumably this branch returns before the UID reassignment
   below - confirm.

   Otherwise: a fresh UID is taken from the mailbox's uid_next (which is
   incremented), the old_uid => new_uid mapping is appended to
   conflict_iter->uid_conflicts, and a copy of the message carrying the new
   UID is appended to save_iter->new_msgs. */
dsync_brain_msg_sync_conflict(struct dsync_brain_msg_iter *conflict_iter,
153
struct dsync_brain_msg_iter *save_iter,
154
const struct dsync_message *msg)
156
struct dsync_brain_uid_conflict *conflict;
157
struct dsync_brain_new_msg *new_msg;
158
struct dsync_brain_mailbox *brain_box;
161
/* modifiable, because uid_next gets bumped below */
brain_box = array_idx_modifiable(&save_iter->sync->mailboxes,
162
save_iter->mailbox_idx);
164
if (save_iter->sync->brain->backup) {
165
i_warning("Destination mailbox %s has been modified, "
166
"need to recreate it before we can continue syncing",
167
brain_box->box.name);
168
dsync_worker_delete_mailbox(save_iter->sync->brain->dest_worker,
170
save_iter->sync->brain->unexpected_changes = TRUE;
171
save_iter->sync->skip_mailbox = TRUE;
175
/* reserve the next free UID of this mailbox for the message */
new_uid = brain_box->box.uid_next++;
177
/* remember that the message has to be moved from old_uid to new_uid */
conflict = array_append_space(&conflict_iter->uid_conflicts);
178
conflict->mailbox_idx = conflict_iter->mailbox_idx;
179
conflict->old_uid = msg->uid;
180
conflict->new_uid = new_uid;
182
/* queue a copy of the message, already carrying the new UID */
new_msg = array_append_space(&save_iter->new_msgs);
183
new_msg->mailbox_idx = save_iter->mailbox_idx;
184
new_msg->orig_uid = msg->uid;
185
new_msg->msg = dsync_message_dup(save_iter->sync->pool, msg);
186
new_msg->msg->uid = new_uid;
190
/* Decide which of two copies of the same message has the metadata that
   should win (step 3.ii.c of the file header).

   The modseqs are compared first. If they are equal and both the flags and
   the keyword lists are identical there is nothing to choose between.
   Otherwise each message gets a count that starts at its number of keywords
   and the loop tests every flag bit below MAIL_RECENT in both messages; the
   two counts are then compared. If even those are equal the first message
   is picked.

   The only caller, dsync_brain_msg_sync_existing(), copies m1's metadata to
   the other side when the result is negative.
   NOTE(review): the exact values returned for each case are not spelled out
   here - confirm the sign convention before adding callers. */
dsync_message_flag_importance_cmp(const struct dsync_message *m1,
191
const struct dsync_message *m2)
193
unsigned int i, count1, count2;
195
if (m1->modseq > m2->modseq)
197
else if (m1->modseq < m2->modseq)
200
if (m1->flags == m2->flags &&
201
dsync_keyword_list_equals(m1->keywords, m2->keywords))
204
/* modseqs match, but flags aren't the same. pick the one that
206
count1 = str_array_length(m1->keywords);
207
count2 = str_array_length(m2->keywords);
208
for (i = 1; i != MAIL_RECENT; i <<= 1) {
209
if ((m1->flags & i) != 0)
211
if ((m2->flags & i) != 0)
216
else if (count1 < count2)
219
/* they even have the same number of flags. don't bother with further
220
guessing, just pick the first one. */
224
/* Sync the metadata of a message that exists on both sides.

   dsync_message_flag_importance_cmp() picks the winner. On a negative
   result the source message's metadata is sent to the destination worker;
   in backup mode the same happens for a positive result as well, so the
   destination never overrides the source there. The remaining call sends
   the destination message's metadata to the source worker.
   NOTE(review): the condition guarding that last call is not spelled out
   here - presumably a positive result outside backup mode; confirm. */
static void dsync_brain_msg_sync_existing(struct dsync_brain_mailbox_sync *sync,
225
struct dsync_message *src_msg,
226
struct dsync_message *dest_msg)
230
ret = dsync_message_flag_importance_cmp(src_msg, dest_msg);
231
if (ret < 0 || (sync->brain->backup && ret > 0))
232
dsync_worker_msg_update_metadata(sync->dest_worker, src_msg);
234
dsync_worker_msg_update_metadata(sync->src_worker, dest_msg);
237
/* Compare the current message of the source iterator with the current
   message of the destination iterator and act on the difference (step 3 of
   the file header). Both iterators must be in the same mailbox.

   Preparation: the expunged state of both messages is read from their
   flags. When exactly one side is expunged, the other side's GUID is
   converted with dsync_get_guid_128_str() so that the two GUIDs can be
   compared; otherwise the GUIDs are used as they are.

   src UID < dest UID - the source message has no counterpart in the
   destination. Depending on the state it is left alone (already expunged
   in the source), handed to dsync_brain_msg_sync_conflict() (a UID conflict
   was seen earlier in this mailbox, or backup mode), or expunged from the
   source. Only the source message is consumed.

   src UID > dest UID - the mirror image for the destination message, with
   one difference: the conflict path is taken only when a UID conflict was
   seen and backup mode is off. Only the destination message is consumed.

   UIDs equal - the GUIDs decide. Two different, non-empty GUIDs are a UID
   conflict: sync->uid_conflict is set and dsync_brain_msg_sync_conflict()
   is called for the side(s) still holding the message. Otherwise an
   expunge on one side is propagated to the other (backup mode goes through
   dsync_brain_msg_sync_conflict() for a destination-side expunge), and if
   the message exists on both sides its metadata is synced with
   dsync_brain_msg_sync_existing(). Both messages are consumed.

   A message is marked as consumed by setting its guid to NULL, which makes
   dsync_brain_msg_iter_next() fetch a new one. The caller stops on a
   negative return value. */
static int dsync_brain_msg_sync_pair(struct dsync_brain_mailbox_sync *sync)
239
struct dsync_message *src_msg = &sync->src_msg_iter->msg;
240
struct dsync_message *dest_msg = &sync->dest_msg_iter->msg;
241
const char *src_guid, *dest_guid;
242
unsigned char guid_128_data[MAIL_GUID_128_SIZE * 2 + 1];
243
bool src_expunged, dest_expunged;
245
i_assert(sync->src_msg_iter->mailbox_idx ==
246
sync->dest_msg_iter->mailbox_idx);
248
src_expunged = (src_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
249
dest_expunged = (dest_msg->flags & DSYNC_MAIL_FLAG_EXPUNGED) != 0;
251
/* If a message is expunged, it's guaranteed to have a 128bit GUID.
252
If the other message isn't expunged, we'll need to convert its GUID
253
to the 128bit GUID form (if it's not already) so that we can compare
256
src_guid = src_msg->guid;
257
dest_guid = dsync_get_guid_128_str(dest_msg->guid,
259
sizeof(guid_128_data));
260
} else if (dest_expunged) {
261
src_guid = dsync_get_guid_128_str(src_msg->guid, guid_128_data,
262
sizeof(guid_128_data));
263
dest_guid = dest_msg->guid;
265
src_guid = src_msg->guid;
266
dest_guid = dest_msg->guid;
269
/* FIXME: checking for sync->uid_conflict isn't fully reliable here.
270
we should be checking if the next matching message pair has a
271
conflict, not if the previous pair had one. */
272
if (src_msg->uid < dest_msg->uid) {
273
/* message has been expunged from dest. */
275
/* expunged from source already */
276
} else if (sync->uid_conflict || sync->brain->backup) {
277
/* update uid src, copy to dest */
278
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
282
/* expunge from source */
283
dsync_worker_msg_expunge(sync->src_worker,
286
src_msg->guid = NULL;
288
} else if (src_msg->uid > dest_msg->uid) {
289
/* message has been expunged from src. */
291
/* expunged from dest already */
292
} else if (sync->uid_conflict && !sync->brain->backup) {
293
/* update uid in dest, copy to src */
294
dsync_brain_msg_sync_conflict(sync->dest_msg_iter,
298
/* expunge from dest */
299
dsync_worker_msg_expunge(sync->dest_worker,
302
dest_msg->guid = NULL;
306
/* UIDs match, but do GUIDs? If either of the GUIDs aren't set, it
307
means that either the storage doesn't support GUIDs or we're
308
handling an old-style expunge record. In that case just assume
310
if (strcmp(src_guid, dest_guid) != 0 &&
311
*src_guid != '\0' && *dest_guid != '\0') {
312
/* UID conflict. give new UIDs to messages in both src and
313
dest (if they're not expunged already) */
314
sync->uid_conflict = TRUE;
315
if (!dest_expunged) {
316
dsync_brain_msg_sync_conflict(sync->dest_msg_iter,
321
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
325
} else if (dest_expunged) {
326
/* message expunged from destination */
328
/* expunged from source already */
329
} else if (sync->brain->backup) {
330
dsync_brain_msg_sync_conflict(sync->src_msg_iter,
334
dsync_worker_msg_expunge(sync->src_worker,
337
} else if (src_expunged) {
338
/* message expunged from source, expunge from destination too */
339
dsync_worker_msg_expunge(sync->dest_worker, dest_msg->uid);
341
/* message exists in both source and dest, sync metadata */
342
dsync_brain_msg_sync_existing(sync, src_msg, dest_msg);
344
src_msg->guid = NULL;
345
dest_msg->guid = NULL;
349
/* Copy the messages that are left in iter1's current mailbox over to the
   other side (step 4 of the file header).

   For as long as dsync_brain_msg_iter_next() returns > 0 for iter1, the
   message is queued on iter2 with dsync_brain_msg_sync_save() and then
   marked as consumed by clearing its guid.

   The caller treats a FALSE result as "cannot go on right now".
   NOTE(review): which iterator results map to FALSE is not spelled out
   here - confirm. */
static bool dsync_brain_msg_sync_mailbox_end(struct dsync_brain_msg_iter *iter1,
350
struct dsync_brain_msg_iter *iter2)
354
while ((ret = dsync_brain_msg_iter_next(iter1)) > 0) {
355
dsync_brain_msg_sync_save(iter2, iter1->mailbox_idx,
357
iter1->msg.guid = NULL;
363
/* Sync as much of the current mailbox as is possible right now.

   Message pairs delivered by dsync_brain_msg_iter_next_pair() are handled
   by dsync_brain_msg_sync_pair(). After each pair the destination worker's
   output is checked; when it is full it gets flushed and a flush result
   <= 0 is tested for.

   Once no more pairs are available, the messages left over on one side are
   queued for the other side with dsync_brain_msg_sync_mailbox_end():
   destination leftovers go to the source only when backup mode is off,
   source leftovers go to the destination.

   dsync_brain_msg_sync_more() uses the result as its loop condition: a
   true result makes it move on to the next mailbox. */
dsync_brain_msg_sync_mailbox_more(struct dsync_brain_mailbox_sync *sync)
367
while ((ret = dsync_brain_msg_iter_next_pair(sync)) > 0) {
368
if (dsync_brain_msg_sync_pair(sync) < 0)
370
if (dsync_worker_is_output_full(sync->dest_worker)) {
371
if (dsync_worker_output_flush(sync->dest_worker) <= 0)
378
/* finished syncing messages in this mailbox that exist in both source
379
and destination. if there are messages left, we can't reliably know
380
if they should be expunged, so just copy them to the other side. */
381
if (!sync->brain->backup) {
382
if (!dsync_brain_msg_sync_mailbox_end(sync->dest_msg_iter,
386
if (!dsync_brain_msg_sync_mailbox_end(sync->src_msg_iter,
387
sync->dest_msg_iter))
390
/* done with this mailbox. the same iterator is still used for
391
getting messages from other mailboxes. */
395
/* Continue syncing messages. This is the entry point that the workers'
   input/output callbacks end up in (see dsync_worker_msg_callback()).

   Mailboxes are synced one after the other: every time
   dsync_brain_msg_sync_mailbox_more() reports the current mailbox as done,
   the per-mailbox uid_conflict flag is reset, sync->wanted_mailbox_idx is
   advanced and - as long as there is a mailbox left - that mailbox is
   selected on both workers.

   If the loop stops before the last mailbox has been handled, the output
   buffer is full and syncing has to be resumed later. Once every mailbox
   is done, the callbacks of both workers are cleared, both worker message
   iterators are deinitialized (a failure there is reported with
   dsync_brain_fail()) and dsync_brain_msg_sync_new_msgs() takes over
   (step 6 of the file header). */
void dsync_brain_msg_sync_more(struct dsync_brain_mailbox_sync *sync)
397
const struct dsync_brain_mailbox *mailboxes;
398
unsigned int count, mailbox_idx = 0;
400
mailboxes = array_get(&sync->mailboxes, &count);
401
while (dsync_brain_msg_sync_mailbox_more(sync)) {
402
/* sync the next mailbox */
403
sync->uid_conflict = FALSE;
404
mailbox_idx = ++sync->wanted_mailbox_idx;
405
if (mailbox_idx >= count)
408
dsync_worker_select_mailbox(sync->src_worker,
409
&mailboxes[mailbox_idx].box);
410
dsync_worker_select_mailbox(sync->dest_worker,
411
&mailboxes[mailbox_idx].box);
413
if (mailbox_idx < count) {
414
/* output buffer is full */
418
/* finished with all mailboxes. */
419
dsync_worker_set_input_callback(sync->src_msg_iter->worker, NULL, NULL);
420
dsync_worker_set_output_callback(sync->src_msg_iter->worker, NULL, NULL);
421
dsync_worker_set_input_callback(sync->dest_msg_iter->worker, NULL, NULL);
422
dsync_worker_set_output_callback(sync->dest_msg_iter->worker, NULL, NULL);
424
if (dsync_worker_msg_iter_deinit(&sync->src_msg_iter->iter) < 0 ||
425
dsync_worker_msg_iter_deinit(&sync->dest_msg_iter->iter) < 0) {
426
dsync_brain_fail(sync->brain);
430
dsync_brain_msg_sync_new_msgs(sync);
433
/* Worker input/output callback installed by dsync_brain_msg_iter_init().
   context is the struct dsync_brain_mailbox_sync that was registered
   together with the callback; syncing simply continues from where it
   stopped. */
static void dsync_worker_msg_callback(void *context)
435
struct dsync_brain_mailbox_sync *sync = context;
437
dsync_brain_msg_sync_more(sync);
440
/* Create the brain-side message iterator for one worker.

   The iterator itself is allocated from sync->pool. Its uid_conflicts and
   new_msgs arrays are initialized with i_array_init() and have to be freed
   explicitly, which dsync_brain_msg_iter_deinit() does. The GUID hash
   compares its keys with strcasecmp(), i.e. GUIDs are matched without
   regard to case.

   Message iteration over the given mailboxes is started on the worker,
   dsync_worker_msg_callback() is installed as both its input and its
   output callback with sync as the context, and - if there is at least one
   mailbox - the first mailbox of sync->mailboxes is selected on the
   worker. */
static struct dsync_brain_msg_iter *
441
dsync_brain_msg_iter_init(struct dsync_brain_mailbox_sync *sync,
442
struct dsync_worker *worker,
443
const mailbox_guid_t mailboxes[],
444
unsigned int mailbox_count)
446
struct dsync_brain_msg_iter *iter;
448
iter = p_new(sync->pool, struct dsync_brain_msg_iter, 1);
450
iter->worker = worker;
451
i_array_init(&iter->uid_conflicts, 128);
452
i_array_init(&iter->new_msgs, 128);
453
iter->guid_hash = hash_table_create(default_pool, sync->pool, 10000,
455
(hash_cmp_callback_t *)strcasecmp);
457
iter->iter = dsync_worker_msg_iter_init(worker, mailboxes,
459
/* worker I/O in either direction resumes the sync */
dsync_worker_set_input_callback(worker,
460
dsync_worker_msg_callback, sync);
461
dsync_worker_set_output_callback(worker,
462
dsync_worker_msg_callback, sync);
463
if (mailbox_count > 0) {
464
const struct dsync_brain_mailbox *first;
466
first = array_idx(&sync->mailboxes, 0);
467
dsync_worker_select_mailbox(worker, &first->box);
472
/* Release what dsync_brain_msg_iter_init() set up for iter: the worker's
   message iterator if it is still open (its result is deliberately
   ignored here), the GUID hash table and the uid_conflicts and new_msgs
   arrays. The iterator struct itself is not freed here. */
static void dsync_brain_msg_iter_deinit(struct dsync_brain_msg_iter *iter)
474
/* may already have been closed by dsync_brain_msg_sync_more() */
if (iter->iter != NULL)
475
(void)dsync_worker_msg_iter_deinit(&iter->iter);
477
hash_table_destroy(&iter->guid_hash);
478
array_free(&iter->uid_conflicts);
479
array_free(&iter->new_msgs);
483
/* Fill guids with the mailbox GUID of every entry in mailboxes, keeping
   their order. guids is initialized here with t_array_init(), sized for
   the number of mailboxes; the caller passes it in uninitialized. */
get_mailbox_guids(const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes,
484
ARRAY_TYPE(mailbox_guid) *guids)
486
const struct dsync_brain_mailbox *brain_box;
488
t_array_init(guids, array_count(mailboxes));
489
array_foreach(mailboxes, brain_box)
490
array_append(guids, &brain_box->box.mailbox_guid, 1);
493
/* Start syncing the messages of the given mailboxes between the brain's
   source and destination workers.

   A dedicated alloc-only pool is created and the sync state is allocated
   from it. The workers are taken from brain and the mailbox list is copied
   into sync->mailboxes, so the caller's array is not referenced afterwards.
   The mailbox GUIDs are collected with get_mailbox_guids() and message
   iteration is started on both workers with dsync_brain_msg_iter_init().

   The returned state is released with dsync_brain_msg_sync_deinit(). */
struct dsync_brain_mailbox_sync *
494
dsync_brain_msg_sync_init(struct dsync_brain *brain,
495
const ARRAY_TYPE(dsync_brain_mailbox) *mailboxes)
497
struct dsync_brain_mailbox_sync *sync;
500
pool = pool_alloconly_create("dsync brain mailbox sync", 1024*256);
501
sync = p_new(pool, struct dsync_brain_mailbox_sync, 1);
504
sync->src_worker = brain->src_worker;
505
sync->dest_worker = brain->dest_worker;
507
/* private copy of the mailbox list, lives as long as the pool */
p_array_init(&sync->mailboxes, pool, array_count(mailboxes));
508
array_append_array(&sync->mailboxes, mailboxes);
510
ARRAY_TYPE(mailbox_guid) guids_arr;
511
const mailbox_guid_t *guids;
514
get_mailbox_guids(mailboxes, &guids_arr);
516
/* initialize message iteration on both workers */
517
guids = array_get(&guids_arr, &count);
519
dsync_brain_msg_iter_init(sync, brain->src_worker,
521
sync->dest_msg_iter =
522
dsync_brain_msg_iter_init(sync, brain->dest_worker,
528
/* Tear down a message sync created with dsync_brain_msg_sync_init(): both
   brain-side iterators are deinitialized and the reference to the sync's
   pool is dropped. */
void dsync_brain_msg_sync_deinit(struct dsync_brain_mailbox_sync **_sync)
530
struct dsync_brain_mailbox_sync *sync = *_sync;
534
dsync_brain_msg_iter_deinit(sync->src_msg_iter);
535
dsync_brain_msg_iter_deinit(sync->dest_msg_iter);
536
/* must come last: the sync struct and both iterators are allocated from this pool */
pool_unref(&sync->pool);