1
/* Copyright (c) 2011-2012 Dovecot authors, see the included COPYING file */
8
#include "write-full.h"
9
#include "seq-range-array.h"
10
#include "mail-storage.h"
11
#include "fts-expunge-log.h"
17
struct fts_expunge_log_record {
18
/* CRC32 of this entire record (except this checksum) */
20
/* Size of this entire record */
25
/* { uid1, uid2 } pairs */
26
/* uint32_t expunge_uid_ranges[]; */
28
/* Total number of messages expunged so far in this log */
29
/* uint32_t expunge_count; */
32
struct fts_expunge_log {
39
struct fts_expunge_log_mailbox {
41
ARRAY_TYPE(seq_range) uids;
45
struct fts_expunge_log_append_ctx {
46
struct fts_expunge_log *log;
49
struct hash_table *mailboxes;
50
struct fts_expunge_log_mailbox *prev_mailbox;
55
struct fts_expunge_log_read_ctx {
56
struct fts_expunge_log *log;
58
struct istream *input;
60
struct fts_expunge_log_read_record read_rec;
66
struct fts_expunge_log *fts_expunge_log_init(const char *path)
68
struct fts_expunge_log *log;
70
log = i_new(struct fts_expunge_log, 1);
71
log->path = i_strdup(path);
76
void fts_expunge_log_deinit(struct fts_expunge_log **_log)
78
struct fts_expunge_log *log = *_log;
85
static int fts_expunge_log_open(struct fts_expunge_log *log, bool create)
89
i_assert(log->fd == -1);
91
/* FIXME: use proper permissions */
92
fd = open(log->path, O_RDWR | O_APPEND | (create ? O_CREAT : 0), 0600);
94
if (errno == ENOENT && !create)
97
i_error("open(%s) failed: %m", log->path);
100
if (fstat(fd, &log->st) < 0) {
101
i_error("fstat(%s) failed: %m", log->path);
110
fts_expunge_log_reopen_if_needed(struct fts_expunge_log *log, bool create)
115
return fts_expunge_log_open(log, create);
117
if (stat(log->path, &st) == 0) {
118
if (st.st_ino == log->st.st_ino &&
119
CMP_DEV_T(st.st_dev, log->st.st_dev)) {
124
} else if (errno == ENOENT) {
125
/* recreate the file */
127
i_error("stat(%s) failed: %m", log->path);
130
if (close(log->fd) < 0)
131
i_error("close(%s) failed: %m", log->path);
133
return fts_expunge_log_open(log, create);
137
fts_expunge_log_read_expunge_count(struct fts_expunge_log *log,
138
uint32_t *expunge_count_r)
142
i_assert(log->fd != -1);
144
if (fstat(log->fd, &log->st) < 0) {
145
i_error("fstat(%s) failed: %m", log->path);
148
if ((uoff_t)log->st.st_size < sizeof(*expunge_count_r)) {
149
*expunge_count_r = 0;
152
/* we'll assume that write()s atomically grow the file size, as
153
O_APPEND almost guarantees. even if not, having a race condition
154
isn't the end of the world. the expunge count is simply read wrong
155
and fts optimize is performed earlier or later than intended. */
156
ret = pread(log->fd, expunge_count_r, sizeof(*expunge_count_r),
157
log->st.st_size - 4);
159
i_error("pread(%s) failed: %m", log->path);
162
if (ret != sizeof(*expunge_count_r)) {
163
i_error("pread(%s) read only %d of %d bytes", log->path,
164
(int)ret, (int)sizeof(*expunge_count_r));
170
struct fts_expunge_log_append_ctx *
171
fts_expunge_log_append_begin(struct fts_expunge_log *log)
173
struct fts_expunge_log_append_ctx *ctx;
176
pool = pool_alloconly_create("fts expunge log append", 1024);
177
ctx = p_new(pool, struct fts_expunge_log_append_ctx, 1);
181
hash_table_create(default_pool, pool, 0,
182
guid_128_hash, guid_128_cmp);
184
if (fts_expunge_log_reopen_if_needed(log, TRUE) < 0)
189
static struct fts_expunge_log_mailbox *
190
fts_expunge_log_mailbox_alloc(struct fts_expunge_log_append_ctx *ctx,
191
const guid_128_t mailbox_guid)
193
struct fts_expunge_log_mailbox *mailbox;
195
mailbox = p_new(ctx->pool, struct fts_expunge_log_mailbox, 1);
196
memcpy(mailbox->guid, mailbox_guid, sizeof(mailbox->guid));
197
p_array_init(&mailbox->uids, ctx->pool, 16);
198
hash_table_insert(ctx->mailboxes, mailbox->guid, mailbox);
202
void fts_expunge_log_append_next(struct fts_expunge_log_append_ctx *ctx,
203
const guid_128_t mailbox_guid,
206
struct fts_expunge_log_mailbox *mailbox;
208
if (ctx->prev_mailbox != NULL &&
209
memcmp(mailbox_guid, ctx->prev_mailbox->guid, GUID_128_SIZE) == 0)
210
mailbox = ctx->prev_mailbox;
212
mailbox = hash_table_lookup(ctx->mailboxes, mailbox_guid);
214
mailbox = fts_expunge_log_mailbox_alloc(ctx, mailbox_guid);
215
ctx->prev_mailbox = mailbox;
217
if (!seq_range_array_add(&mailbox->uids, 0, uid))
218
mailbox->uids_count++;
222
fts_expunge_log_export(struct fts_expunge_log_append_ctx *ctx,
223
uint32_t expunge_count, buffer_t *output)
225
struct hash_iterate_context *iter;
227
struct fts_expunge_log_record *rec;
230
iter = hash_table_iterate_init(ctx->mailboxes);
231
while (hash_table_iterate(iter, &key, &value)) {
232
struct fts_expunge_log_mailbox *mailbox = value;
234
rec_offset = output->used;
235
rec = buffer_append_space_unsafe(output, sizeof(*rec));
236
memcpy(rec->guid, mailbox->guid, sizeof(rec->guid));
238
/* uint32_t expunge_uid_ranges[]; */
239
buffer_append(output, array_idx(&mailbox->uids, 0),
240
array_count(&mailbox->uids) *
241
sizeof(struct seq_range));
242
/* uint32_t expunge_count; */
243
expunge_count += mailbox->uids_count;
244
buffer_append(output, &expunge_count, sizeof(expunge_count));
246
/* update the header now that we know the record contents */
247
rec = buffer_get_space_unsafe(output, rec_offset,
248
output->used - rec_offset);
249
rec->record_size = output->used - rec_offset;
250
rec->checksum = crc32_data(&rec->record_size,
252
sizeof(rec->checksum));
254
hash_table_iterate_deinit(&iter);
258
fts_expunge_log_write(struct fts_expunge_log_append_ctx *ctx)
260
struct fts_expunge_log *log = ctx->log;
262
uint32_t expunge_count, *e;
265
/* try to append to the latest file */
266
if (fts_expunge_log_reopen_if_needed(log, TRUE) < 0)
269
if (fts_expunge_log_read_expunge_count(log, &expunge_count) < 0)
272
buf = buffer_create_dynamic(default_pool, 1024);
273
fts_expunge_log_export(ctx, expunge_count, buf);
274
/* the file was opened with O_APPEND, so this write() should be
275
appended atomically without any need for locking. */
277
if ((ret = write_full(log->fd, buf->data, buf->used)) < 0) {
278
i_error("write(%s) failed: %m", log->path);
279
if (ftruncate(log->fd, log->st.st_size) < 0)
280
i_error("ftruncate(%s) failed: %m", log->path);
282
if ((ret = fts_expunge_log_reopen_if_needed(log, TRUE)) <= 0)
284
/* the log was unlinked, so we'll need to write again to
285
the new file. the expunge_count needs to be reset to zero
287
e = buffer_get_space_unsafe(buf, buf->used - sizeof(uint32_t),
289
i_assert(*e > expunge_count);
296
/* finish by closing the log. this forces NFS to flush the
297
changes to disk without our having to explicitly play with
299
if (close(log->fd) < 0) {
300
/* FIXME: we should ftruncate() in case there
301
were partial writes.. */
302
i_error("close(%s) failed: %m", log->path);
310
int fts_expunge_log_append_commit(struct fts_expunge_log_append_ctx **_ctx)
312
struct fts_expunge_log_append_ctx *ctx = *_ctx;
313
int ret = ctx->failed ? -1 : 0;
317
ret = fts_expunge_log_write(ctx);
319
hash_table_destroy(&ctx->mailboxes);
320
pool_unref(&ctx->pool);
324
int fts_expunge_log_uid_count(struct fts_expunge_log *log,
325
unsigned int *expunges_r)
329
if ((ret = fts_expunge_log_reopen_if_needed(log, FALSE)) <= 0) {
334
return fts_expunge_log_read_expunge_count(log, expunges_r);
337
struct fts_expunge_log_read_ctx *
338
fts_expunge_log_read_begin(struct fts_expunge_log *log)
340
struct fts_expunge_log_read_ctx *ctx;
342
ctx = i_new(struct fts_expunge_log_read_ctx, 1);
344
if (fts_expunge_log_reopen_if_needed(log, FALSE) < 0)
346
else if (log->fd != -1)
347
ctx->input = i_stream_create_fd(log->fd, (size_t)-1, FALSE);
352
fts_expunge_log_record_size_is_valid(const struct fts_expunge_log_record *rec,
353
unsigned int *uids_size_r)
355
if (rec->record_size < sizeof(*rec) + sizeof(uint32_t)*3)
357
*uids_size_r = rec->record_size - sizeof(*rec) - sizeof(uint32_t);
358
return *uids_size_r % sizeof(uint32_t)*2 == 0;
362
fts_expunge_log_read_failure(struct fts_expunge_log_read_ctx *ctx,
363
unsigned int wanted_size)
367
if (ctx->input->stream_errno != 0) {
369
i_error("read(%s) failed: %m", ctx->log->path);
371
(void)i_stream_get_data(ctx->input, &size);
372
ctx->corrupted = TRUE;
373
i_error("Corrupted fts expunge log %s: "
374
"Unexpected EOF (read %"PRIuSIZE_T" / %u bytes)",
375
ctx->log->path, size, wanted_size);
379
const struct fts_expunge_log_read_record *
380
fts_expunge_log_read_next(struct fts_expunge_log_read_ctx *ctx)
382
const unsigned char *data;
383
const struct fts_expunge_log_record *rec;
384
unsigned int uids_size;
388
if (ctx->input == NULL)
391
/* initial read to try to get the record */
392
(void)i_stream_read_data(ctx->input, &data, &size, IO_BLOCK_SIZE);
393
if (size == 0 && ctx->input->stream_errno == 0) {
394
/* expected EOF - mark the file as read by unlinking it */
395
if (unlink(ctx->log->path) < 0 && errno != ENOENT)
396
i_error("unlink(%s) failed: %m", ctx->log->path);
398
/* try reading again, in case something new was written */
399
i_stream_sync(ctx->input);
400
(void)i_stream_read_data(ctx->input, &data, &size,
403
if (size < sizeof(*rec)) {
404
if (size == 0 && ctx->input->stream_errno == 0) {
408
fts_expunge_log_read_failure(ctx, sizeof(*rec));
411
rec = (const void *)data;
413
if (!fts_expunge_log_record_size_is_valid(rec, &uids_size)) {
414
ctx->corrupted = TRUE;
415
i_error("Corrupted fts expunge log %s: "
416
"Invalid record size: %u",
417
ctx->log->path, rec->record_size);
421
/* read the entire record */
422
while (size < rec->record_size) {
423
if (i_stream_read_data(ctx->input, &data, &size,
424
rec->record_size-1) < 0) {
425
fts_expunge_log_read_failure(ctx, rec->record_size);
428
rec = (const void *)data;
431
/* verify that the record checksum is valid */
432
checksum = crc32_data(&rec->record_size,
433
rec->record_size - sizeof(rec->checksum));
434
if (checksum != rec->checksum) {
435
ctx->corrupted = TRUE;
436
i_error("Corrupted fts expunge log %s: "
437
"Record checksum mismatch: %u != %u",
438
ctx->log->path, checksum, rec->checksum);
442
memcpy(ctx->read_rec.mailbox_guid, rec->guid,
443
sizeof(ctx->read_rec.mailbox_guid));
444
/* create the UIDs array by pointing it directly into input
446
buffer_create_const_data(&ctx->buffer, rec + 1, uids_size);
447
array_create_from_buffer(&ctx->read_rec.uids, &ctx->buffer,
448
sizeof(struct seq_range));
450
i_stream_skip(ctx->input, rec->record_size);
451
return &ctx->read_rec;
454
int fts_expunge_log_read_end(struct fts_expunge_log_read_ctx **_ctx)
456
struct fts_expunge_log_read_ctx *ctx = *_ctx;
457
int ret = ctx->failed ? -1 : (ctx->corrupted ? 0 : 1);
461
if (ctx->input != NULL)
462
i_stream_unref(&ctx->input);