diff options
Diffstat (limited to 'src/lib-storage/index/index-thread.c')
-rw-r--r-- | src/lib-storage/index/index-thread.c | 670 |
1 files changed, 670 insertions, 0 deletions
diff --git a/src/lib-storage/index/index-thread.c b/src/lib-storage/index/index-thread.c new file mode 100644 index 0000000..2b1ba04 --- /dev/null +++ b/src/lib-storage/index/index-thread.c @@ -0,0 +1,670 @@ +/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */ + +/* doc/thread-refs.txt describes the incremental algorithm we use here. */ + +#include "lib.h" +#include "array.h" +#include "bsearch-insert-pos.h" +#include "hash2.h" +#include "message-id.h" +#include "mail-search.h" +#include "mail-search-build.h" +#include "mailbox-search-result-private.h" +#include "index-storage.h" +#include "index-thread-private.h" + + +#define MAIL_THREAD_CONTEXT(obj) \ + MODULE_CONTEXT(obj, mail_thread_storage_module) +#define MAIL_THREAD_CONTEXT_REQUIRE(obj) \ + MODULE_CONTEXT_REQUIRE(obj, mail_thread_storage_module) + +struct mail_thread_context { + struct mailbox *box; + struct mailbox_transaction_context *t; + struct mail_index_strmap_view_sync *strmap_sync; + + struct mail *tmp_mail; + struct mail_search_args *search_args; + ARRAY_TYPE(seq_range) added_uids; + + bool failed:1; + bool corrupted:1; +}; + +struct mail_thread_mailbox { + union mailbox_module_context module_ctx; + + unsigned int next_msgid_idx; + struct mail_thread_cache *cache; + + struct mail_index_strmap *strmap; + struct mail_index_strmap_view *strmap_view; + /* sorted by UID, ref_index */ + const ARRAY_TYPE(mail_index_strmap_rec) *msgid_map; + const struct hash2_table *msgid_hash; + + /* set only temporarily while needed */ + struct mail_thread_context *ctx; +}; + +static MODULE_CONTEXT_DEFINE_INIT(mail_thread_storage_module, + &mail_storage_module_register); + +static void mail_thread_clear(struct mail_thread_context *ctx); + +static int +mail_strmap_rec_get_msgid(struct mail_thread_context *ctx, + const struct mail_index_strmap_rec *rec, + const char **msgid_r) +{ + struct mail *mail = ctx->tmp_mail; + const char *msgids = NULL, *msgid; + unsigned int n = 0; + int ret; + + if (!mail_set_uid(mail, rec->uid)) + return 0; + + switch (rec->ref_index) { + case MAIL_THREAD_NODE_REF_MSGID: + /* Message-ID: header */ + ret = mail_get_first_header(mail, HDR_MESSAGE_ID, &msgids); + break; + case MAIL_THREAD_NODE_REF_INREPLYTO: + /* In-Reply-To: header */ + ret = mail_get_first_header(mail, HDR_IN_REPLY_TO, &msgids); + break; + default: + /* References: header */ + ret = mail_get_first_header(mail, HDR_REFERENCES, &msgids); + n = rec->ref_index - MAIL_THREAD_NODE_REF_REFERENCES1; + break; + } + + if (ret < 0) { + if (mail->expunged) { + /* treat it as if it didn't exist. trying to add it + again will result in failure. */ + return 0; + } + return -1; + } + + /* get the nth message-id */ + msgid = message_id_get_next(&msgids); + if (msgid != NULL) { + for (; n > 0; n--) + msgid = message_id_get_next(&msgids); + } + + if (msgid == NULL) { + /* shouldn't have happened, probably corrupted */ + mail_set_critical(mail, + "Corrupted thread index: lost Message ID %u", + rec->ref_index); + ctx->failed = TRUE; + ctx->corrupted = TRUE; + return -1; + } + *msgid_r = msgid; + return 1; +} + +static bool +mail_thread_hash_key_cmp(const char *key, + const struct mail_index_strmap_rec *rec, + void *context) +{ + struct mail_thread_mailbox *tbox = context; + struct mail_thread_context *ctx = tbox->ctx; + const char *msgid; + bool cmp_ret; + int ret; + + /* either a match or a collision, need to look closer */ + T_BEGIN { + ret = mail_strmap_rec_get_msgid(ctx, rec, &msgid); + if (ret <= 0) { + if (ret < 0) + ctx->failed = TRUE; + cmp_ret = FALSE; + } else { + cmp_ret = strcmp(msgid, key) == 0; + } + } T_END; + return cmp_ret; +} + +static int +mail_thread_hash_rec_cmp(const struct mail_index_strmap_rec *rec1, + const struct mail_index_strmap_rec *rec2, + void *context) +{ + struct mail_thread_mailbox *tbox = context; + struct mail_thread_context *ctx = tbox->ctx; + const char *msgid1, *msgid2; + int ret; + + T_BEGIN { + ret = mail_strmap_rec_get_msgid(ctx, rec1, &msgid1); + if (ret > 0) { + msgid1 = t_strdup(msgid1); + ret = mail_strmap_rec_get_msgid(ctx, rec2, &msgid2); + } + ret = ret <= 0 ? -1 : + strcmp(msgid1, msgid2) == 0; + } T_END; + return ret; +} + +static void mail_thread_strmap_remap(const uint32_t *idx_map, + unsigned int old_count, + unsigned int new_count, void *context) +{ + struct mail_thread_mailbox *tbox = context; + struct mail_thread_cache *cache = tbox->cache; + ARRAY_TYPE(mail_thread_node) new_nodes; + const struct mail_thread_node *old_nodes; + struct mail_thread_node *node; + unsigned int i, nodes_count, max, new_first_invalid, invalid_count; + + if (cache->search_result == NULL) + return; + + if (new_count == 0) { + /* strmap was reset, we'll need to rebuild thread */ + mailbox_search_result_free(&cache->search_result); + return; + } + + invalid_count = cache->next_invalid_msgid_str_idx - + cache->first_invalid_msgid_str_idx; + + old_nodes = array_get(&cache->thread_nodes, &nodes_count); + i_array_init(&new_nodes, new_count + invalid_count + 32); + + /* optimization: allocate all nodes initially */ + (void)array_idx_modifiable(&new_nodes, new_count-1); + + /* renumber existing valid nodes. all existing records in old_nodes + should also exist in idx_map since we've removed expunged messages + from the cache before committing the sync. */ + max = I_MIN(I_MIN(old_count, nodes_count), + cache->first_invalid_msgid_str_idx); + for (i = 0; i < max; i++) { + if (idx_map[i] == 0) { + /* expunged record. */ + i_assert(old_nodes[i].uid == 0); + } else { + node = array_idx_modifiable(&new_nodes, idx_map[i]); + *node = old_nodes[i]; + if (node->parent_idx != 0) { + node->parent_idx = idx_map[node->parent_idx]; + i_assert(node->parent_idx != 0); + } + } + } + + /* copy invalid nodes, if any. no other messages point to them, + so this is safe. we still need to update their parent_idx + pointers though. */ + new_first_invalid = new_count + 1 + + THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT; + for (i = 0; i < invalid_count; i++) { + node = array_idx_modifiable(&new_nodes, new_first_invalid + i); + *node = old_nodes[cache->first_invalid_msgid_str_idx + i]; + if (node->parent_idx != 0) { + node->parent_idx = idx_map[node->parent_idx]; + i_assert(node->parent_idx != 0); + } + } + cache->first_invalid_msgid_str_idx = new_first_invalid; + cache->next_invalid_msgid_str_idx = new_first_invalid + invalid_count; + + /* replace the old nodes with the renumbered ones */ + array_free(&cache->thread_nodes); + cache->thread_nodes = new_nodes; +} + +static int thread_get_mail_header(struct mail *mail, const char *name, + const char **value_r) +{ + if (mail_get_first_header(mail, name, value_r) < 0) { + if (!mail->expunged) + return -1; + + /* Message is expunged. Instead of failing the entire THREAD + command, just treat the header as nonexistent. */ + *value_r = NULL; + } + return 0; +} + +static int +mail_thread_map_add_mail(struct mail_thread_context *ctx, struct mail *mail) +{ + const char *message_id, *in_reply_to, *references, *msgid; + uint32_t ref_index; + + if (thread_get_mail_header(mail, HDR_MESSAGE_ID, &message_id) < 0 || + thread_get_mail_header(mail, HDR_REFERENCES, &references) < 0) + return -1; + + /* add Message-ID: */ + msgid = message_id_get_next(&message_id); + if (msgid != NULL) { + mail_index_strmap_view_sync_add(ctx->strmap_sync, mail->uid, + MAIL_THREAD_NODE_REF_MSGID, + msgid); + } else { + mail_index_strmap_view_sync_add_unique(ctx->strmap_sync, + mail->uid, MAIL_THREAD_NODE_REF_MSGID); + } + + /* add References: if there are any valid ones */ + msgid = message_id_get_next(&references); + if (msgid != NULL) { + ref_index = MAIL_THREAD_NODE_REF_REFERENCES1; + do { + mail_index_strmap_view_sync_add(ctx->strmap_sync, + mail->uid, + ref_index, msgid); + ref_index++; + msgid = message_id_get_next(&references); + } while (msgid != NULL); + } else { + /* no References:, use In-Reply-To: */ + if (thread_get_mail_header(mail, HDR_IN_REPLY_TO, + &in_reply_to) < 0) + return -1; + + msgid = message_id_get_next(&in_reply_to); + if (msgid != NULL) { + mail_index_strmap_view_sync_add(ctx->strmap_sync, + mail->uid, MAIL_THREAD_NODE_REF_INREPLYTO, + msgid); + } + } + if (ctx->failed) { + /* message-id lookup failed in hash compare */ + return -1; + } + return 0; +} + +static int mail_thread_index_map_build(struct mail_thread_context *ctx) +{ + static const char *wanted_headers[] = { + HDR_MESSAGE_ID, HDR_IN_REPLY_TO, HDR_REFERENCES, HDR_SUBJECT, + NULL + }; + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box); + struct mailbox_header_lookup_ctx *headers_ctx; + struct mail_search_args *search_args; + struct mail_search_context *search_ctx; + struct mail *mail; + uint32_t last_uid, seq1, seq2; + int ret = 0; + + if (tbox->strmap_view == NULL) { + /* first time we're threading this mailbox */ + tbox->strmap_view = + mail_index_strmap_view_open(tbox->strmap, + ctx->box->view, + mail_thread_hash_key_cmp, + mail_thread_hash_rec_cmp, + mail_thread_strmap_remap, + tbox, &tbox->msgid_map, + &tbox->msgid_hash); + } + + headers_ctx = mailbox_header_lookup_init(ctx->box, wanted_headers); + ctx->tmp_mail = mail_alloc(ctx->t, MAIL_FETCH_DATE | + MAIL_FETCH_RECEIVED_DATE, headers_ctx); + + /* add all missing UIDs */ + ctx->strmap_sync = mail_index_strmap_view_sync_init(tbox->strmap_view, + &last_uid); + mailbox_get_seq_range(ctx->box, last_uid + 1, (uint32_t)-1, + &seq1, &seq2); + if (seq1 == 0) { + /* nothing is missing */ + mail_index_strmap_view_sync_commit(&ctx->strmap_sync); + mailbox_header_lookup_unref(&headers_ctx); + return 0; + } + + search_args = mail_search_build_init(); + mail_search_build_add_seqset(search_args, seq1, seq2); + search_ctx = mailbox_search_init(ctx->t, search_args, NULL, + MAIL_FETCH_DATE | + MAIL_FETCH_RECEIVED_DATE, + headers_ctx); + mailbox_header_lookup_unref(&headers_ctx); + mail_search_args_unref(&search_args); + + while (mailbox_search_next(search_ctx, &mail)) { + if (mail_thread_map_add_mail(ctx, mail) < 0) { + ret = -1; + break; + } + } + if (mailbox_search_deinit(&search_ctx) < 0) + ret = -1; + + if (ret < 0) + mail_index_strmap_view_sync_rollback(&ctx->strmap_sync); + else + mail_index_strmap_view_sync_commit(&ctx->strmap_sync); + return ret; +} + +static int msgid_map_cmp(const uint32_t *uid, + const struct mail_index_strmap_rec *rec) +{ + return *uid < rec->uid ? -1 : + (*uid > rec->uid ? 1 : 0); +} + +static bool mail_thread_cache_update_removes(struct mail_thread_mailbox *tbox, + ARRAY_TYPE(seq_range) *added_uids) +{ + struct mail_thread_cache *cache = tbox->cache; + ARRAY_TYPE(seq_range) removed_uids; + const struct seq_range *uids; + const struct mail_index_strmap_rec *msgid_map; + unsigned int i, j, idx, map_count, uid_count; + uint32_t uid; + + t_array_init(&removed_uids, 64); + mailbox_search_result_sync(cache->search_result, + &removed_uids, added_uids); + + /* first check that we're not inserting any messages in the middle */ + uids = array_get(added_uids, &uid_count); + if (uid_count > 0 && uids[0].seq1 <= cache->last_uid) + return FALSE; + + /* next remove messages so we'll see early if we have to rebuild. + we expect to find all removed UIDs from msgid_map that are <= max + UID in msgid_map */ + msgid_map = array_get(tbox->msgid_map, &map_count); + uids = array_get(&removed_uids, &uid_count); + for (i = j = 0; i < uid_count; i++) { + /* find and remove from the map */ + bsearch_insert_pos(&uids[i].seq1, &msgid_map[j], + map_count - j, sizeof(*msgid_map), + msgid_map_cmp, &idx); + j += idx; + if (j == map_count) { + /* all removals after this are about messages we never + even added to the cache */ + i_assert(uids[i].seq1 > cache->last_uid); + break; + } + while (j > 0 && msgid_map[j-1].uid == msgid_map[j].uid) + j--; + + /* remove the messages from cache */ + for (uid = uids[i].seq1; uid <= uids[i].seq2; uid++) { + if (j == map_count) { + i_assert(uid > cache->last_uid); + break; + } + i_assert(msgid_map[j].uid == uid); + if (!mail_thread_remove(cache, msgid_map + j, &j)) + return FALSE; + } + } + return TRUE; +} + +static void mail_thread_cache_update_adds(struct mail_thread_mailbox *tbox, + ARRAY_TYPE(seq_range) *added_uids) +{ + struct mail_thread_cache *cache = tbox->cache; + const struct seq_range *uids; + const struct mail_index_strmap_rec *msgid_map; + unsigned int i, j, map_count, uid_count; + uint32_t uid; + + /* everything removed successfully, add the new messages. all of them + should already be in msgid_map. */ + uids = array_get(added_uids, &uid_count); + if (uid_count == 0) + return; + + (void)array_bsearch_insert_pos(tbox->msgid_map, &uids[0].seq1, + msgid_map_cmp, &j); + msgid_map = array_get(tbox->msgid_map, &map_count); + i_assert(j < map_count); + while (j > 0 && msgid_map[j-1].uid == msgid_map[j].uid) + j--; + + for (i = 0; i < uid_count; i++) { + for (uid = uids[i].seq1; uid <= uids[i].seq2; uid++) { + while (j < map_count && msgid_map[j].uid < uid) + j++; + i_assert(j < map_count && msgid_map[j].uid == uid); + mail_thread_add(cache, msgid_map+j, &j); + } + } +} + +static void +mail_thread_cache_fix_invalid_indexes(struct mail_thread_mailbox *tbox) +{ + struct mail_thread_cache *cache = tbox->cache; + uint32_t highest_idx, new_first_idx, count; + + highest_idx = mail_index_strmap_view_get_highest_idx(tbox->strmap_view); + new_first_idx = highest_idx + 1 + + THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT; + count = cache->next_invalid_msgid_str_idx - + cache->first_invalid_msgid_str_idx; + + if (count == 0) { + /* there are no invalid indexes yet, we can update the first + invalid index position to delay conflicts. */ + cache->first_invalid_msgid_str_idx = + cache->next_invalid_msgid_str_idx = new_first_idx; + } else if (highest_idx >= cache->first_invalid_msgid_str_idx) { + /* conflict - move the invalid indexes forward */ + array_copy(&cache->thread_nodes.arr, new_first_idx, + &cache->thread_nodes.arr, + cache->first_invalid_msgid_str_idx, count); + cache->first_invalid_msgid_str_idx = new_first_idx; + cache->next_invalid_msgid_str_idx = new_first_idx + count; + } +} + +static void mail_thread_cache_sync_remove(struct mail_thread_mailbox *tbox, + struct mail_thread_context *ctx) +{ + struct mail_thread_cache *cache = tbox->cache; + + if (cache->search_result == NULL) + return; + + if (mail_search_args_equal(ctx->search_args, + cache->search_result->search_args)) { + t_array_init(&ctx->added_uids, 64); + if (mail_thread_cache_update_removes(tbox, &ctx->added_uids)) { + /* successfully updated the cache */ + return; + } + } + /* failed to use the cache, rebuild */ + mailbox_search_result_free(&cache->search_result); +} + +static void mail_thread_cache_sync_add(struct mail_thread_mailbox *tbox, + struct mail_thread_context *ctx, + struct mail_search_context *search_ctx) +{ + struct mail_thread_cache *cache = tbox->cache; + struct mail *mail; + const struct mail_index_strmap_rec *msgid_map; + unsigned int i, count; + + mail_thread_cache_fix_invalid_indexes(tbox); + + if (cache->search_result != NULL) { + /* we already checked at sync_remove that we can use this + search result. */ + mail_thread_cache_update_adds(tbox, &ctx->added_uids); + return; + } + + cache->last_uid = 0; + cache->first_invalid_msgid_str_idx = cache->next_invalid_msgid_str_idx = + mail_index_strmap_view_get_highest_idx(tbox->strmap_view) + 1 + + THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT; + array_clear(&cache->thread_nodes); + + cache->search_result = + mailbox_search_result_save(search_ctx, + MAILBOX_SEARCH_RESULT_FLAG_UPDATE | + MAILBOX_SEARCH_RESULT_FLAG_QUEUE_SYNC); + + msgid_map = array_get(tbox->msgid_map, &count); + /* we're relying on the array being zero-terminated (outside used + count - kind of kludgy) */ + i_assert(msgid_map[count].uid == 0); + i = 0; + while (i < count && mailbox_search_next(search_ctx, &mail)) { + while (msgid_map[i].uid < mail->uid) + i++; + i_assert(i < count); + mail_thread_add(cache, msgid_map+i, &i); + } +} + +int mail_thread_init(struct mailbox *box, struct mail_search_args *args, + struct mail_thread_context **ctx_r) +{ + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box); + struct mail_thread_context *ctx; + struct mail_search_context *search_ctx; + int ret; + + i_assert(tbox->ctx == NULL); + + struct event_reason *reason = event_reason_begin("mailbox:thread"); + if (args != NULL) + mail_search_args_ref(args); + else { + args = mail_search_build_init(); + mail_search_build_add_all(args); + mail_search_args_init(args, box, FALSE, NULL); + } + + ctx = i_new(struct mail_thread_context, 1); + ctx->box = box; + ctx->search_args = args; + ctx->t = mailbox_transaction_begin(ctx->box, 0, __func__); + /* perform search first, so we don't break if there are INTHREAD keys */ + search_ctx = mailbox_search_init(ctx->t, args, NULL, 0, NULL); + + tbox->ctx = ctx; + + mail_thread_cache_sync_remove(tbox, ctx); + ret = mail_thread_index_map_build(ctx); + if (ret == 0) + mail_thread_cache_sync_add(tbox, ctx, search_ctx); + if (mailbox_search_deinit(&search_ctx) < 0) + ret = -1; + if (ctx->failed) { + ret = -1; + if (ctx->corrupted) + mail_index_strmap_view_set_corrupted(tbox->strmap_view); + } + event_reason_end(&reason); + if (ret < 0) { + mail_thread_deinit(&ctx); + return -1; + } else { + i_zero(&ctx->added_uids); + *ctx_r = ctx; + return 0; + } +} + +static void mail_thread_clear(struct mail_thread_context *ctx) +{ + mail_free(&ctx->tmp_mail); + (void)mailbox_transaction_commit(&ctx->t); +} + +void mail_thread_deinit(struct mail_thread_context **_ctx) +{ + struct mail_thread_context *ctx = *_ctx; + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box); + + *_ctx = NULL; + + mail_thread_clear(ctx); + mail_search_args_unref(&ctx->search_args); + tbox->ctx = NULL; + i_free(ctx); +} + +struct mail_thread_iterate_context * +mail_thread_iterate_init(struct mail_thread_context *ctx, + enum mail_thread_type thread_type, bool write_seqs) +{ + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box); + + return mail_thread_iterate_init_full(tbox->cache, ctx->tmp_mail, + thread_type, write_seqs); +} + +static void mail_thread_mailbox_close(struct mailbox *box) +{ + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box); + + i_assert(tbox->ctx == NULL); + + if (tbox->strmap_view != NULL) + mail_index_strmap_view_close(&tbox->strmap_view); + if (tbox->cache->search_result != NULL) + mailbox_search_result_free(&tbox->cache->search_result); + tbox->module_ctx.super.close(box); +} + +static void mail_thread_mailbox_free(struct mailbox *box) +{ + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box); + + mail_index_strmap_deinit(&tbox->strmap); + tbox->module_ctx.super.free(box); + + array_free(&tbox->cache->thread_nodes); + i_free(tbox->cache); + i_free(tbox); +} + +void index_thread_mailbox_opened(struct mailbox *box) +{ + struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT(box); + + if (tbox != NULL) { + /* mailbox was already opened+closed once. */ + return; + } + + tbox = i_new(struct mail_thread_mailbox, 1); + tbox->module_ctx.super = box->v; + box->v.close = mail_thread_mailbox_close; + box->v.free = mail_thread_mailbox_free; + + tbox->strmap = mail_index_strmap_init(box->index, + MAIL_THREAD_INDEX_SUFFIX); + tbox->next_msgid_idx = 1; + + tbox->cache = i_new(struct mail_thread_cache, 1); + i_array_init(&tbox->cache->thread_nodes, 128); + + MODULE_CONTEXT_SET(box, mail_thread_storage_module, tbox); +} |