diff options
Diffstat (limited to 'src/plugins/fts/fts-expunge-log.c')
-rw-r--r-- | src/plugins/fts/fts-expunge-log.c | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/src/plugins/fts/fts-expunge-log.c b/src/plugins/fts/fts-expunge-log.c new file mode 100644 index 0000000..d39ceea --- /dev/null +++ b/src/plugins/fts/fts-expunge-log.c @@ -0,0 +1,617 @@ +/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "crc32.h" +#include "hash.h" +#include "istream.h" +#include "write-full.h" +#include "seq-range-array.h" +#include "mail-storage.h" +#include "fts-expunge-log.h" + +#include <sys/stat.h> +#include <unistd.h> +#include <fcntl.h> + +struct fts_expunge_log_record { + /* CRC32 of this entire record (except this checksum) */ + uint32_t checksum; + /* Size of this entire record */ + uint32_t record_size; + + /* Mailbox GUID */ + guid_128_t guid; + /* { uid1, uid2 } pairs */ + /* uint32_t expunge_uid_ranges[]; */ + + /* Total number of messages expunged so far in this log */ + /* uint32_t expunge_count; */ +}; + +struct fts_expunge_log { + char *path; + + int fd; + struct stat st; +}; + +struct fts_expunge_log_mailbox { + guid_128_t guid; + ARRAY_TYPE(seq_range) uids; + unsigned uids_count; +}; + +struct fts_expunge_log_append_ctx { + struct fts_expunge_log *log; + pool_t pool; + + HASH_TABLE(uint8_t *, struct fts_expunge_log_mailbox *) mailboxes; + struct fts_expunge_log_mailbox *prev_mailbox; + + bool failed; +}; + +struct fts_expunge_log_read_ctx { + struct fts_expunge_log *log; + + struct istream *input; + buffer_t buffer; + struct fts_expunge_log_read_record read_rec; + + bool failed; + bool corrupted; + bool unlink; +}; + +struct fts_expunge_log *fts_expunge_log_init(const char *path) +{ + struct fts_expunge_log *log; + + log = i_new(struct fts_expunge_log, 1); + log->path = i_strdup(path); + log->fd = -1; + return log; +} + +void fts_expunge_log_deinit(struct fts_expunge_log **_log) +{ + struct fts_expunge_log *log = *_log; + + *_log = NULL; + i_close_fd(&log->fd); + i_free(log->path); + i_free(log); +} + +static int fts_expunge_log_open(struct fts_expunge_log *log, bool create) +{ + int fd; + + i_assert(log->fd == -1); + + /* FIXME: use proper permissions */ + fd = open(log->path, O_RDWR | O_APPEND | (create ? O_CREAT : 0), 0600); + if (fd == -1) { + if (errno == ENOENT && !create) + return 0; + + i_error("open(%s) failed: %m", log->path); + return -1; + } + if (fstat(fd, &log->st) < 0) { + i_error("fstat(%s) failed: %m", log->path); + i_close_fd(&fd); + return -1; + } + log->fd = fd; + return 1; +} + +static int +fts_expunge_log_reopen_if_needed(struct fts_expunge_log *log, bool create) +{ + struct stat st; + + if (log->fd == -1) + return fts_expunge_log_open(log, create); + + if (stat(log->path, &st) == 0) { + if (st.st_ino == log->st.st_ino && + CMP_DEV_T(st.st_dev, log->st.st_dev)) { + /* same file */ + return 0; + } + /* file changed */ + } else if (errno == ENOENT) { + /* recreate the file */ + } else { + i_error("stat(%s) failed: %m", log->path); + return -1; + } + if (close(log->fd) < 0) + i_error("close(%s) failed: %m", log->path); + log->fd = -1; + return fts_expunge_log_open(log, create); +} + +static int +fts_expunge_log_read_expunge_count(struct fts_expunge_log *log, + uint32_t *expunge_count_r) +{ + ssize_t ret; + + i_assert(log->fd != -1); + + if (fstat(log->fd, &log->st) < 0) { + i_error("fstat(%s) failed: %m", log->path); + return -1; + } + if ((uoff_t)log->st.st_size < sizeof(*expunge_count_r)) { + *expunge_count_r = 0; + return 0; + } + /* we'll assume that write()s atomically grow the file size, as + O_APPEND almost guarantees. even if not, having a race condition + isn't the end of the world. the expunge count is simply read wrong + and fts optimize is performed earlier or later than intended. */ + ret = pread(log->fd, expunge_count_r, sizeof(*expunge_count_r), + log->st.st_size - 4); + if (ret < 0) { + i_error("pread(%s) failed: %m", log->path); + return -1; + } + if (ret != sizeof(*expunge_count_r)) { + i_error("pread(%s) read only %d of %d bytes", log->path, + (int)ret, (int)sizeof(*expunge_count_r)); + return -1; + } + return 0; +} + +struct fts_expunge_log_append_ctx * +fts_expunge_log_append_begin(struct fts_expunge_log *log) +{ + struct fts_expunge_log_append_ctx *ctx; + pool_t pool; + + pool = pool_alloconly_create("fts expunge log append", 1024); + ctx = p_new(pool, struct fts_expunge_log_append_ctx, 1); + ctx->log = log; + ctx->pool = pool; + hash_table_create(&ctx->mailboxes, pool, 0, guid_128_hash, guid_128_cmp); + + if (log != NULL && fts_expunge_log_reopen_if_needed(log, TRUE) < 0) + ctx->failed = TRUE; + return ctx; +} + +static struct fts_expunge_log_mailbox * +fts_expunge_log_mailbox_alloc(struct fts_expunge_log_append_ctx *ctx, + const guid_128_t mailbox_guid) +{ + uint8_t *guid_p; + struct fts_expunge_log_mailbox *mailbox; + + mailbox = p_new(ctx->pool, struct fts_expunge_log_mailbox, 1); + guid_128_copy(mailbox->guid, mailbox_guid); + p_array_init(&mailbox->uids, ctx->pool, 16); + + guid_p = mailbox->guid; + hash_table_insert(ctx->mailboxes, guid_p, mailbox); + return mailbox; +} + +static struct fts_expunge_log_mailbox * +fts_expunge_log_append_mailbox(struct fts_expunge_log_append_ctx *ctx, + const guid_128_t mailbox_guid) +{ + const uint8_t *guid_p = mailbox_guid; + struct fts_expunge_log_mailbox *mailbox; + + if (ctx->prev_mailbox != NULL && + guid_128_equals(mailbox_guid, ctx->prev_mailbox->guid)) + mailbox = ctx->prev_mailbox; + else { + mailbox = hash_table_lookup(ctx->mailboxes, guid_p); + if (mailbox == NULL) + mailbox = fts_expunge_log_mailbox_alloc(ctx, mailbox_guid); + ctx->prev_mailbox = mailbox; + } + return mailbox; +} +void fts_expunge_log_append_next(struct fts_expunge_log_append_ctx *ctx, + const guid_128_t mailbox_guid, + uint32_t uid) +{ + struct fts_expunge_log_mailbox *mailbox; + + mailbox = fts_expunge_log_append_mailbox(ctx, mailbox_guid); + if (!seq_range_array_add(&mailbox->uids, uid)) + mailbox->uids_count++; +} +void fts_expunge_log_append_range(struct fts_expunge_log_append_ctx *ctx, + const guid_128_t mailbox_guid, + const struct seq_range *uids) +{ + struct fts_expunge_log_mailbox *mailbox; + + mailbox = fts_expunge_log_append_mailbox(ctx, mailbox_guid); + mailbox->uids_count += seq_range_array_add_range_count(&mailbox->uids, + uids->seq1, uids->seq2); + /* To be honest, an unbacked log doesn't need to maintain the uids_count, + but we don't know here if we're supporting an unbacked log or not, so we + have to maintain the value, just in case. + At the moment, the only caller of this function is for unbacked logs. */ +} +void fts_expunge_log_append_record(struct fts_expunge_log_append_ctx *ctx, + const struct fts_expunge_log_read_record *record) +{ + const struct seq_range *range; + /* FIXME: Optimise with a merge */ + array_foreach(&record->uids, range) + fts_expunge_log_append_range(ctx, record->mailbox_guid, range); +} +static void fts_expunge_log_append_mailbox_record(struct fts_expunge_log_append_ctx *ctx, + struct fts_expunge_log_mailbox *mailbox) +{ + const struct seq_range *range; + /* FIXME: Optimise with a merge */ + array_foreach(&mailbox->uids, range) + fts_expunge_log_append_range(ctx, mailbox->guid, range); +} + +static void +fts_expunge_log_export(struct fts_expunge_log_append_ctx *ctx, + uint32_t expunge_count, buffer_t *output) +{ + struct hash_iterate_context *iter; + uint8_t *guid_p; + struct fts_expunge_log_mailbox *mailbox; + struct fts_expunge_log_record *rec; + size_t rec_offset; + + iter = hash_table_iterate_init(ctx->mailboxes); + while (hash_table_iterate(iter, ctx->mailboxes, &guid_p, &mailbox)) { + rec_offset = output->used; + rec = buffer_append_space_unsafe(output, sizeof(*rec)); + memcpy(rec->guid, mailbox->guid, sizeof(rec->guid)); + + /* uint32_t expunge_uid_ranges[]; */ + buffer_append(output, array_front(&mailbox->uids), + array_count(&mailbox->uids) * + sizeof(struct seq_range)); + /* uint32_t expunge_count; */ + expunge_count += mailbox->uids_count; + buffer_append(output, &expunge_count, sizeof(expunge_count)); + + /* update the header now that we know the record contents */ + rec = buffer_get_space_unsafe(output, rec_offset, + output->used - rec_offset); + rec->record_size = output->used - rec_offset; + rec->checksum = crc32_data(&rec->record_size, + rec->record_size - + sizeof(rec->checksum)); + } + hash_table_iterate_deinit(&iter); +} + +static int +fts_expunge_log_write(struct fts_expunge_log_append_ctx *ctx) +{ + struct fts_expunge_log *log = ctx->log; + buffer_t *buf; + uint32_t expunge_count, *e; + int ret; + + /* Unbacked expunge logs cannot be written, by definition */ + i_assert(log != NULL); + + /* try to append to the latest file */ + if (fts_expunge_log_reopen_if_needed(log, TRUE) < 0) + return -1; + + if (fts_expunge_log_read_expunge_count(log, &expunge_count) < 0) + return -1; + + buf = buffer_create_dynamic(default_pool, 1024); + fts_expunge_log_export(ctx, expunge_count, buf); + /* the file was opened with O_APPEND, so this write() should be + appended atomically without any need for locking. */ + for (;;) { + if (write_full(log->fd, buf->data, buf->used) < 0) { + i_error("write(%s) failed: %m", log->path); + if (ftruncate(log->fd, log->st.st_size) < 0) + i_error("ftruncate(%s) failed: %m", log->path); + } + if ((ret = fts_expunge_log_reopen_if_needed(log, TRUE)) <= 0) + break; + /* the log was unlinked, so we'll need to write again to + the new file. the expunge_count needs to be reset to zero + from here. */ + e = buffer_get_space_unsafe(buf, buf->used - sizeof(uint32_t), + sizeof(uint32_t)); + i_assert(*e > expunge_count); + *e -= expunge_count; + expunge_count = 0; + } + buffer_free(&buf); + + if (ret == 0) { + /* finish by closing the log. this forces NFS to flush the + changes to disk without our having to explicitly play with + fsync() */ + if (close(log->fd) < 0) { + /* FIXME: we should ftruncate() in case there + were partial writes.. */ + i_error("close(%s) failed: %m", log->path); + ret = -1; + } + log->fd = -1; + } + return ret; +} + +static int fts_expunge_log_append_finalize(struct fts_expunge_log_append_ctx **_ctx, + bool commit) +{ + struct fts_expunge_log_append_ctx *ctx = *_ctx; + int ret = ctx->failed ? -1 : 0; + + *_ctx = NULL; + if (commit && ret == 0) + ret = fts_expunge_log_write(ctx); + + hash_table_destroy(&ctx->mailboxes); + pool_unref(&ctx->pool); + return ret; +} + +int fts_expunge_log_uid_count(struct fts_expunge_log *log, + unsigned int *expunges_r) +{ + int ret; + + if ((ret = fts_expunge_log_reopen_if_needed(log, FALSE)) <= 0) { + *expunges_r = 0; + return ret; + } + + return fts_expunge_log_read_expunge_count(log, expunges_r); +} + +int fts_expunge_log_append_commit(struct fts_expunge_log_append_ctx **_ctx) +{ + return fts_expunge_log_append_finalize(_ctx, TRUE); +} + +int fts_expunge_log_append_abort(struct fts_expunge_log_append_ctx **_ctx) +{ + return fts_expunge_log_append_finalize(_ctx, FALSE); +} + +struct fts_expunge_log_read_ctx * +fts_expunge_log_read_begin(struct fts_expunge_log *log) +{ + struct fts_expunge_log_read_ctx *ctx; + + ctx = i_new(struct fts_expunge_log_read_ctx, 1); + ctx->log = log; + if (fts_expunge_log_reopen_if_needed(log, FALSE) < 0) + ctx->failed = TRUE; + else if (log->fd != -1) + ctx->input = i_stream_create_fd(log->fd, SIZE_MAX); + ctx->unlink = TRUE; + return ctx; +} + +static bool +fts_expunge_log_record_size_is_valid(const struct fts_expunge_log_record *rec, + unsigned int *uids_size_r) +{ + if (rec->record_size < sizeof(*rec) + sizeof(uint32_t)*3) + return FALSE; + *uids_size_r = rec->record_size - sizeof(*rec) - sizeof(uint32_t); + return *uids_size_r % sizeof(uint32_t)*2 == 0; +} + +static void +fts_expunge_log_read_failure(struct fts_expunge_log_read_ctx *ctx, + unsigned int wanted_size) +{ + size_t size; + + if (ctx->input->stream_errno != 0) { + ctx->failed = TRUE; + i_error("read(%s) failed: %s", ctx->log->path, + i_stream_get_error(ctx->input)); + } else { + size = i_stream_get_data_size(ctx->input); + ctx->corrupted = TRUE; + i_error("Corrupted fts expunge log %s: " + "Unexpected EOF (read %zu / %u bytes)", + ctx->log->path, size, wanted_size); + } +} + +const struct fts_expunge_log_read_record * +fts_expunge_log_read_next(struct fts_expunge_log_read_ctx *ctx) +{ + const unsigned char *data; + const struct fts_expunge_log_record *rec; + unsigned int uids_size; + size_t size; + uint32_t checksum; + + if (ctx->input == NULL) + return NULL; + + /* initial read to try to get the record */ + (void)i_stream_read_bytes(ctx->input, &data, &size, IO_BLOCK_SIZE); + if (size == 0 && ctx->input->stream_errno == 0) { + /* expected EOF - mark the file as read by unlinking it */ + if (ctx->unlink) + i_unlink_if_exists(ctx->log->path); + + /* try reading again, in case something new was written */ + i_stream_sync(ctx->input); + (void)i_stream_read_bytes(ctx->input, &data, &size, + IO_BLOCK_SIZE); + } + if (size < sizeof(*rec)) { + if (size == 0 && ctx->input->stream_errno == 0) { + /* expected EOF */ + return NULL; + } + fts_expunge_log_read_failure(ctx, sizeof(*rec)); + return NULL; + } + rec = (const void *)data; + + if (!fts_expunge_log_record_size_is_valid(rec, &uids_size)) { + ctx->corrupted = TRUE; + i_error("Corrupted fts expunge log %s: " + "Invalid record size: %u", + ctx->log->path, rec->record_size); + return NULL; + } + + /* read the entire record */ + while (size < rec->record_size) { + if (i_stream_read_bytes(ctx->input, &data, &size, rec->record_size) < 0) { + fts_expunge_log_read_failure(ctx, rec->record_size); + return NULL; + } + rec = (const void *)data; + } + + /* verify that the record checksum is valid */ + checksum = crc32_data(&rec->record_size, + rec->record_size - sizeof(rec->checksum)); + if (checksum != rec->checksum) { + ctx->corrupted = TRUE; + i_error("Corrupted fts expunge log %s: " + "Record checksum mismatch: %u != %u", + ctx->log->path, checksum, rec->checksum); + return NULL; + } + + memcpy(ctx->read_rec.mailbox_guid, rec->guid, + sizeof(ctx->read_rec.mailbox_guid)); + /* create the UIDs array by pointing it directly into input + stream's buffer */ + buffer_create_from_const_data(&ctx->buffer, rec + 1, uids_size); + array_create_from_buffer(&ctx->read_rec.uids, &ctx->buffer, + sizeof(struct seq_range)); + + i_stream_skip(ctx->input, rec->record_size); + return &ctx->read_rec; +} + +int fts_expunge_log_read_end(struct fts_expunge_log_read_ctx **_ctx) +{ + struct fts_expunge_log_read_ctx *ctx = *_ctx; + int ret = ctx->failed ? -1 : (ctx->corrupted ? 0 : 1); + + *_ctx = NULL; + + if (ctx->corrupted) { + if (ctx->unlink) + i_unlink_if_exists(ctx->log->path); + } + + i_stream_unref(&ctx->input); + i_free(ctx); + return ret; +} + +int fts_expunge_log_flatten(const char *path, + struct fts_expunge_log_append_ctx **flattened_r) +{ + struct fts_expunge_log *read; + struct fts_expunge_log_read_ctx *read_ctx; + const struct fts_expunge_log_read_record *record; + struct fts_expunge_log_append_ctx *append; + int ret; + + i_assert(path != NULL && flattened_r != NULL); + read = fts_expunge_log_init(path); + + read_ctx = fts_expunge_log_read_begin(read); + read_ctx->unlink = FALSE; + + append = fts_expunge_log_append_begin(NULL); + while((record = fts_expunge_log_read_next(read_ctx)) != NULL) { + fts_expunge_log_append_record(append, record); + } + + if ((ret = fts_expunge_log_read_end(&read_ctx)) > 0) + *flattened_r = append; + fts_expunge_log_deinit(&read); + + return ret; +} +bool fts_expunge_log_contains(const struct fts_expunge_log_append_ctx *ctx, + const guid_128_t mailbox_guid, uint32_t uid) +{ + const struct fts_expunge_log_mailbox *mailbox; + const uint8_t *guid_p = mailbox_guid; + + mailbox = hash_table_lookup(ctx->mailboxes, guid_p); + if (mailbox == NULL) + return FALSE; + return seq_range_exists(&mailbox->uids, uid); +} +int fts_expunge_log_append_remove(struct fts_expunge_log_append_ctx *from, + const struct fts_expunge_log_read_record *record) +{ + const uint8_t *guid_p = record->mailbox_guid; + struct fts_expunge_log_mailbox *mailbox = hash_table_lookup(from->mailboxes, guid_p); + if (mailbox == NULL) + return 0; /* may only remove things that exist */ + + mailbox->uids_count -= seq_range_array_remove_seq_range(&mailbox->uids, &record->uids); + return 1; +} +int fts_expunge_log_subtract(struct fts_expunge_log_append_ctx *from, + struct fts_expunge_log *subtract) +{ + unsigned int failures = 0; + struct fts_expunge_log_read_ctx *read_ctx = fts_expunge_log_read_begin(subtract); + read_ctx->unlink = FALSE; + + const struct fts_expunge_log_read_record *record; + while ((record = fts_expunge_log_read_next(read_ctx)) != NULL) { + if (fts_expunge_log_append_remove(from, record) <= 0) + failures++; + } + if (failures > 0) + i_warning("fts: Expunge log subtract ignored %u nonexistent mailbox GUIDs", + failures); + return fts_expunge_log_read_end(&read_ctx); +} +/* It could be argued that somehow adding a log (file) to the append context + and then calling the _write() helper would be easier. But then there's the + _commit() vs. _abort() cleanup that would need to be addressed. Just creating + a copy is simpler. */ +int fts_expunge_log_flat_write(const struct fts_expunge_log_append_ctx *read_log, + const char *path) +{ + int ret; + struct fts_expunge_log *nlog = fts_expunge_log_init(path); + struct fts_expunge_log_append_ctx *nappend = fts_expunge_log_append_begin(nlog); + + struct hash_iterate_context *iter; + uint8_t *guid_p; + struct fts_expunge_log_mailbox *mailbox; + + iter = hash_table_iterate_init(read_log->mailboxes); + while (hash_table_iterate(iter, read_log->mailboxes, &guid_p, &mailbox)) + fts_expunge_log_append_mailbox_record(nappend, mailbox); + + hash_table_iterate_deinit(&iter); + ret = fts_expunge_log_append_commit(&nappend); + fts_expunge_log_deinit(&nlog); + + return ret; +} |