/* Copyright (c) 2002-2018 Dovecot authors, see the included COPYING file */

/* doc/thread-refs.txt describes the incremental algorithm we use here. */

#include "lib.h"
#include "array.h"
#include "bsearch-insert-pos.h"
#include "hash2.h"
#include "message-id.h"
#include "mail-search.h"
#include "mail-search-build.h"
#include "mailbox-search-result-private.h"
#include "index-storage.h"
#include "index-thread-private.h"

#define MAIL_THREAD_CONTEXT(obj) \
	MODULE_CONTEXT(obj, mail_thread_storage_module)
#define MAIL_THREAD_CONTEXT_REQUIRE(obj) \
	MODULE_CONTEXT_REQUIRE(obj, mail_thread_storage_module)

/* State of one THREAD operation. Created by mail_thread_init() and freed by
   mail_thread_deinit(). */
struct mail_thread_context {
	struct mailbox *box;
	struct mailbox_transaction_context *t;
	/* strmap view sync; only in use while mail_thread_index_map_build()
	   runs (it is committed or rolled back before that returns) */
	struct mail_index_strmap_view_sync *strmap_sync;
	/* mail used for header lookups by the strmap compare callbacks and
	   passed on to the thread iterator */
	struct mail *tmp_mail;
	struct mail_search_args *search_args;
	/* UIDs added to the cached search result. Initialized with
	   t_array_init() in mail_thread_cache_sync_remove() and zeroed before
	   a successful mail_thread_init() returns. */
	ARRAY_TYPE(seq_range) added_uids;

	/* a Message-ID lookup failed, possibly inside a strmap callback */
	bool failed:1;
	/* the thread index didn't match the mail contents */
	bool corrupted:1;
};

/* Per-mailbox threading state, attached to struct mailbox through
   mail_thread_storage_module in index_thread_mailbox_opened(). */
struct mail_thread_mailbox {
	union mailbox_module_context module_ctx;
	unsigned int next_msgid_idx;

	struct mail_thread_cache *cache;

	struct mail_index_strmap *strmap;
	struct mail_index_strmap_view *strmap_view;
	/* sorted by UID, ref_index */
	const ARRAY_TYPE(mail_index_strmap_rec) *msgid_map;
	const struct hash2_table *msgid_hash;

	/* set only temporarily while needed */
	struct mail_thread_context *ctx;
};

static MODULE_CONTEXT_DEFINE_INIT(mail_thread_storage_module,
				  &mail_storage_module_register);

static void mail_thread_clear(struct mail_thread_context *ctx);

/* Resolve a strmap record back to its Message-ID string by reading the
   header of mail rec->uid that the record's ref_index points to
   (Message-ID:, In-Reply-To: or the n-th References: entry).

   Returns 1 and sets *msgid_r on success, 0 if the mail can't be accessed
   anymore (mail_set_uid() failed, or the mail was expunged), -1 on failure.
   If the header no longer contains the referenced message-id, the index is
   flagged as corrupted (ctx->failed and ctx->corrupted are set).
   NOTE(review): the returned string appears not to survive another lookup
   through tmp_mail - mail_thread_hash_rec_cmp() copies it for that reason. */
static int
mail_strmap_rec_get_msgid(struct mail_thread_context *ctx,
			  const struct mail_index_strmap_rec *rec,
			  const char **msgid_r)
{
	struct mail *mail = ctx->tmp_mail;
	const char *msgids = NULL, *msgid;
	unsigned int n = 0;
	int ret;

	if (!mail_set_uid(mail, rec->uid))
		return 0;

	switch (rec->ref_index) {
	case MAIL_THREAD_NODE_REF_MSGID:
		/* Message-ID: header */
		ret = mail_get_first_header(mail, HDR_MESSAGE_ID, &msgids);
		break;
	case MAIL_THREAD_NODE_REF_INREPLYTO:
		/* In-Reply-To: header */
		ret = mail_get_first_header(mail, HDR_IN_REPLY_TO, &msgids);
		break;
	default:
		/* References: header. ref_index selects which of its
		   message-ids this record refers to. */
		ret = mail_get_first_header(mail, HDR_REFERENCES, &msgids);
		n = rec->ref_index - MAIL_THREAD_NODE_REF_REFERENCES1;
		break;
	}

	if (ret < 0) {
		if (mail->expunged) {
			/* treat it as if it didn't exist. trying to add it
			   again will result in failure. */
			return 0;
		}
		return -1;
	}

	/* get the nth message-id. Stop at the first NULL, the same way
	   mail_thread_map_add_mail() stops numbering References: entries,
	   so the index -> message-id mapping stays consistent. */
	msgid = message_id_get_next(&msgids);
	for (; msgid != NULL && n > 0; n--)
		msgid = message_id_get_next(&msgids);

	if (msgid == NULL) {
		/* shouldn't have happened, probably corrupted */
		mail_set_critical(mail,
			"Corrupted thread index: lost Message ID %u",
			rec->ref_index);
		ctx->failed = TRUE;
		ctx->corrupted = TRUE;
		return -1;
	}
	*msgid_r = msgid;
	return 1;
}

/* strmap key compare callback. Returns TRUE if the Message-ID referenced by
   rec equals key. A lookup failure sets ctx->failed and compares as
   unequal. */
static bool
mail_thread_hash_key_cmp(const char *key,
			 const struct mail_index_strmap_rec *rec,
			 void *context)
{
	struct mail_thread_mailbox *tbox = context;
	struct mail_thread_context *ctx = tbox->ctx;
	const char *msgid;
	bool cmp_ret;
	int ret;

	/* either a match or a collision, need to look closer */
	T_BEGIN {
		ret = mail_strmap_rec_get_msgid(ctx, rec, &msgid);
		if (ret <= 0) {
			if (ret < 0)
				ctx->failed = TRUE;
			cmp_ret = FALSE;
		} else {
			cmp_ret = strcmp(msgid, key) == 0;
		}
	} T_END;
	return cmp_ret;
}

/* strmap record compare callback. Returns 1 if both records refer to the
   same Message-ID, 0 if they differ, or -1 if either one couldn't be looked
   up. */
static int
mail_thread_hash_rec_cmp(const struct mail_index_strmap_rec *rec1,
			 const struct mail_index_strmap_rec *rec2,
			 void *context)
{
	struct mail_thread_mailbox *tbox = context;
	struct mail_thread_context *ctx = tbox->ctx;
	const char *msgid1, *msgid2;
	int ret;

	T_BEGIN {
		ret = mail_strmap_rec_get_msgid(ctx, rec1, &msgid1);
		if (ret > 0) {
			/* copy it: the second lookup reuses tmp_mail */
			msgid1 = t_strdup(msgid1);
			ret = mail_strmap_rec_get_msgid(ctx, rec2, &msgid2);
		}
		ret = ret <= 0 ? -1 :
			strcmp(msgid1, msgid2) == 0;
	} T_END;
	return ret;
}

/* strmap remap callback, called when the strmap renumbers its string
   indexes. idx_map[old_idx] gives the new index, or 0 if the record was
   removed.

   Rebuilds cache->thread_nodes indexed by the new numbers, remaps parent
   pointers, and moves the "invalid msgid" nodes so they start
   THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT slots after new_count. If the
   strmap was reset (new_count == 0), the cached search result is freed so
   the threads get rebuilt from scratch. */
static void mail_thread_strmap_remap(const uint32_t *idx_map,
				     unsigned int old_count,
				     unsigned int new_count, void *context)
{
	struct mail_thread_mailbox *tbox = context;
	struct mail_thread_cache *cache = tbox->cache;
	ARRAY_TYPE(mail_thread_node) new_nodes;
	const struct mail_thread_node *old_nodes;
	struct mail_thread_node *node;
	unsigned int i, nodes_count, max, new_first_invalid, invalid_count;

	if (cache->search_result == NULL)
		return;

	if (new_count == 0) {
		/* strmap was reset, we'll need to rebuild thread */
		mailbox_search_result_free(&cache->search_result);
		return;
	}

	invalid_count = cache->next_invalid_msgid_str_idx -
		cache->first_invalid_msgid_str_idx;

	old_nodes = array_get(&cache->thread_nodes, &nodes_count);
	i_array_init(&new_nodes, new_count + invalid_count + 32);

	/* optimization: allocate all nodes initially */
	(void)array_idx_modifiable(&new_nodes, new_count-1);

	/* renumber existing valid nodes. all existing records in old_nodes
	   should also exist in idx_map since we've removed expunged messages
	   from the cache before committing the sync. */
	max = I_MIN(I_MIN(old_count, nodes_count),
		    cache->first_invalid_msgid_str_idx);
	for (i = 0; i < max; i++) {
		if (idx_map[i] == 0) {
			/* expunged record. */
			i_assert(old_nodes[i].uid == 0);
		} else {
			node = array_idx_modifiable(&new_nodes, idx_map[i]);
			*node = old_nodes[i];
			if (node->parent_idx != 0) {
				node->parent_idx = idx_map[node->parent_idx];
				i_assert(node->parent_idx != 0);
			}
		}
	}

	/* copy invalid nodes, if any. no other messages point to them,
	   so this is safe. we still need to update their parent_idx
	   pointers though. */
	new_first_invalid = new_count + 1 +
		THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT;
	for (i = 0; i < invalid_count; i++) {
		node = array_idx_modifiable(&new_nodes, new_first_invalid + i);
		*node = old_nodes[cache->first_invalid_msgid_str_idx + i];
		if (node->parent_idx != 0) {
			node->parent_idx = idx_map[node->parent_idx];
			i_assert(node->parent_idx != 0);
		}
	}
	cache->first_invalid_msgid_str_idx = new_first_invalid;
	cache->next_invalid_msgid_str_idx = new_first_invalid + invalid_count;

	/* replace the old nodes with the renumbered ones */
	array_free(&cache->thread_nodes);
	cache->thread_nodes = new_nodes;
}

/* Get the first value of header name into *value_r. Returns 0 on success,
   -1 on failure. If the mail has been expunged, *value_r is set to NULL
   and 0 is returned, so the header is treated as nonexistent. */
static int thread_get_mail_header(struct mail *mail, const char *name,
				  const char **value_r)
{
	if (mail_get_first_header(mail, name, value_r) < 0) {
		if (!mail->expunged)
			return -1;
		/* Message is expunged. Instead of failing the entire THREAD
		   command, just treat the header as nonexistent. */
		*value_r = NULL;
	}
	return 0;
}

/* Add strmap records for one mail:
    - its Message-ID (or a unique placeholder record if it has none),
    - each valid References: message-id, numbered from
      MAIL_THREAD_NODE_REF_REFERENCES1,
    - or, only if References: had no valid ids, the In-Reply-To: id.
   Returns 0 on success, -1 on failure, including failures reported by the
   strmap compare callbacks through ctx->failed. */
static int
mail_thread_map_add_mail(struct mail_thread_context *ctx, struct mail *mail)
{
	const char *message_id, *in_reply_to, *references, *msgid;
	uint32_t ref_index;

	if (thread_get_mail_header(mail, HDR_MESSAGE_ID, &message_id) < 0 ||
	    thread_get_mail_header(mail, HDR_REFERENCES, &references) < 0)
		return -1;

	/* add Message-ID: */
	msgid = message_id_get_next(&message_id);
	if (msgid != NULL) {
		mail_index_strmap_view_sync_add(ctx->strmap_sync, mail->uid,
			MAIL_THREAD_NODE_REF_MSGID, msgid);
	} else {
		mail_index_strmap_view_sync_add_unique(ctx->strmap_sync,
			mail->uid, MAIL_THREAD_NODE_REF_MSGID);
	}

	/* add References: if there are any valid ones */
	msgid = message_id_get_next(&references);
	if (msgid != NULL) {
		ref_index = MAIL_THREAD_NODE_REF_REFERENCES1;
		do {
			mail_index_strmap_view_sync_add(ctx->strmap_sync,
							mail->uid,
							ref_index, msgid);
			ref_index++;
			msgid = message_id_get_next(&references);
		} while (msgid != NULL);
	} else {
		/* no References:, use In-Reply-To: */
		if (thread_get_mail_header(mail, HDR_IN_REPLY_TO,
					   &in_reply_to) < 0)
			return -1;

		msgid = message_id_get_next(&in_reply_to);
		if (msgid != NULL) {
			mail_index_strmap_view_sync_add(ctx->strmap_sync,
				mail->uid, MAIL_THREAD_NODE_REF_INREPLYTO,
				msgid);
		}
	}
	if (ctx->failed) {
		/* message-id lookup failed in hash compare */
		return -1;
	}
	return 0;
}

/* Bring the strmap up to date by adding records for every mail whose UID
   is higher than the last UID the strmap already knows about. Opens the
   strmap view on first use and allocates ctx->tmp_mail, which
   mail_thread_clear() frees.

   Returns 0 on success. Returns -1 on failure, in which case the strmap
   sync is rolled back instead of committed. */
static int mail_thread_index_map_build(struct mail_thread_context *ctx)
{
	static const char *wanted_headers[] = {
		HDR_MESSAGE_ID, HDR_IN_REPLY_TO, HDR_REFERENCES, HDR_SUBJECT,
		NULL
	};
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box);
	struct mailbox_header_lookup_ctx *headers_ctx;
	struct mail_search_args *search_args;
	struct mail_search_context *search_ctx;
	struct mail *mail;
	uint32_t last_uid, seq1, seq2;
	int ret = 0;

	if (tbox->strmap_view == NULL) {
		/* first time we're threading this mailbox */
		tbox->strmap_view =
			mail_index_strmap_view_open(tbox->strmap,
						    ctx->box->view,
						    mail_thread_hash_key_cmp,
						    mail_thread_hash_rec_cmp,
						    mail_thread_strmap_remap,
						    tbox, &tbox->msgid_map,
						    &tbox->msgid_hash);
	}

	headers_ctx = mailbox_header_lookup_init(ctx->box, wanted_headers);
	ctx->tmp_mail = mail_alloc(ctx->t,
				   MAIL_FETCH_DATE | MAIL_FETCH_RECEIVED_DATE,
				   headers_ctx);

	/* add all missing UIDs */
	ctx->strmap_sync = mail_index_strmap_view_sync_init(tbox->strmap_view,
							    &last_uid);
	mailbox_get_seq_range(ctx->box, last_uid + 1, (uint32_t)-1,
			      &seq1, &seq2);
	if (seq1 == 0) {
		/* nothing is missing */
		mail_index_strmap_view_sync_commit(&ctx->strmap_sync);
		mailbox_header_lookup_unref(&headers_ctx);
		return 0;
	}

	search_args = mail_search_build_init();
	mail_search_build_add_seqset(search_args, seq1, seq2);
	search_ctx = mailbox_search_init(ctx->t, search_args, NULL,
					 MAIL_FETCH_DATE |
					 MAIL_FETCH_RECEIVED_DATE,
					 headers_ctx);
	mailbox_header_lookup_unref(&headers_ctx);
	mail_search_args_unref(&search_args);

	while (mailbox_search_next(search_ctx, &mail)) {
		if (mail_thread_map_add_mail(ctx, mail) < 0) {
			ret = -1;
			break;
		}
	}
	if (mailbox_search_deinit(&search_ctx) < 0)
		ret = -1;

	if (ret < 0)
		mail_index_strmap_view_sync_rollback(&ctx->strmap_sync);
	else
		mail_index_strmap_view_sync_commit(&ctx->strmap_sync);
	return ret;
}

/* Compare a UID key against a msgid_map record's UID. Used for binary
   searches over msgid_map. */
static int msgid_map_cmp(const uint32_t *uid,
			 const struct mail_index_strmap_rec *rec)
{
	return *uid < rec->uid ? -1 :
		(*uid > rec->uid ? 1 : 0);
}

/* Sync the cached search result, then remove the messages that are no
   longer in it from the thread cache. UIDs newly added to the result are
   returned in added_uids.

   Returns FALSE if the cache can't be updated incrementally and has to be
   rebuilt. That happens when an added UID isn't above cache->last_uid, or
   when mail_thread_remove() fails. */
static bool
mail_thread_cache_update_removes(struct mail_thread_mailbox *tbox,
				 ARRAY_TYPE(seq_range) *added_uids)
{
	struct mail_thread_cache *cache = tbox->cache;
	ARRAY_TYPE(seq_range) removed_uids;
	const struct seq_range *uids;
	const struct mail_index_strmap_rec *msgid_map;
	unsigned int i, j, idx, map_count, uid_count;
	uint32_t uid;

	t_array_init(&removed_uids, 64);
	mailbox_search_result_sync(cache->search_result,
				   &removed_uids, added_uids);

	/* first check that we're not inserting any messages in the middle */
	uids = array_get(added_uids, &uid_count);
	if (uid_count > 0 && uids[0].seq1 <= cache->last_uid)
		return FALSE;

	/* next remove messages so we'll see early if we have to rebuild.
	   we expect to find all removed UIDs from msgid_map that are <= max
	   UID in msgid_map */
	msgid_map = array_get(tbox->msgid_map, &map_count);
	uids = array_get(&removed_uids, &uid_count);
	for (i = j = 0; i < uid_count; i++) {
		/* find and remove from the map */
		bsearch_insert_pos(&uids[i].seq1, &msgid_map[j],
				   map_count - j, sizeof(*msgid_map),
				   msgid_map_cmp, &idx);
		j += idx;
		if (j == map_count) {
			/* all removals after this are about messages we never
			   even added to the cache */
			i_assert(uids[i].seq1 > cache->last_uid);
			break;
		}
		/* rewind to the first record of this UID */
		while (j > 0 && msgid_map[j-1].uid == msgid_map[j].uid)
			j--;

		/* remove the messages from cache */
		for (uid = uids[i].seq1; uid <= uids[i].seq2; uid++) {
			if (j == map_count) {
				i_assert(uid > cache->last_uid);
				break;
			}
			i_assert(msgid_map[j].uid == uid);
			if (!mail_thread_remove(cache, msgid_map + j, &j))
				return FALSE;
		}
	}
	return TRUE;
}

/* Add the UIDs in added_uids to the thread cache. All of them are expected
   to already have records in msgid_map. */
static void
mail_thread_cache_update_adds(struct mail_thread_mailbox *tbox,
			      ARRAY_TYPE(seq_range) *added_uids)
{
	struct mail_thread_cache *cache = tbox->cache;
	const struct seq_range *uids;
	const struct mail_index_strmap_rec *msgid_map;
	unsigned int i, j, map_count, uid_count;
	uint32_t uid;

	/* everything removed successfully, add the new messages. all of them
	   should already be in msgid_map. */
	uids = array_get(added_uids, &uid_count);
	if (uid_count == 0)
		return;

	(void)array_bsearch_insert_pos(tbox->msgid_map, &uids[0].seq1,
				       msgid_map_cmp, &j);
	msgid_map = array_get(tbox->msgid_map, &map_count);
	i_assert(j < map_count);
	/* rewind to the first record of this UID */
	while (j > 0 && msgid_map[j-1].uid == msgid_map[j].uid)
		j--;

	for (i = 0; i < uid_count; i++) {
		for (uid = uids[i].seq1; uid <= uids[i].seq2; uid++) {
			while (j < map_count && msgid_map[j].uid < uid)
				j++;
			i_assert(j < map_count && msgid_map[j].uid == uid);
			mail_thread_add(cache, msgid_map+j, &j);
		}
	}
}

/* Keep the "invalid msgid" node indexes above the strmap's highest index,
   starting THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT slots after it. If no
   invalid indexes are in use, only the start position is moved. If the
   strmap has grown into the invalid range, the existing invalid nodes are
   copied forward. */
static void
mail_thread_cache_fix_invalid_indexes(struct mail_thread_mailbox *tbox)
{
	struct mail_thread_cache *cache = tbox->cache;
	uint32_t highest_idx, new_first_idx, count;

	highest_idx = mail_index_strmap_view_get_highest_idx(tbox->strmap_view);
	new_first_idx = highest_idx + 1 +
		THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT;
	count = cache->next_invalid_msgid_str_idx -
		cache->first_invalid_msgid_str_idx;

	if (count == 0) {
		/* there are no invalid indexes yet, we can update the first
		   invalid index position to delay conflicts. */
		cache->first_invalid_msgid_str_idx =
			cache->next_invalid_msgid_str_idx = new_first_idx;
	} else if (highest_idx >= cache->first_invalid_msgid_str_idx) {
		/* conflict - move the invalid indexes forward */
		array_copy(&cache->thread_nodes.arr, new_first_idx,
			   &cache->thread_nodes.arr,
			   cache->first_invalid_msgid_str_idx, count);
		cache->first_invalid_msgid_str_idx = new_first_idx;
		cache->next_invalid_msgid_str_idx = new_first_idx + count;
	}
}

/* First half of the cache sync, run before the strmap is updated. If the
   cached search result was built with the same search args, it is updated
   incrementally (removals applied now, additions collected into
   ctx->added_uids). Otherwise the cached search result is freed so
   mail_thread_cache_sync_add() rebuilds the cache. */
static void
mail_thread_cache_sync_remove(struct mail_thread_mailbox *tbox,
			      struct mail_thread_context *ctx)
{
	struct mail_thread_cache *cache = tbox->cache;

	if (cache->search_result == NULL)
		return;

	if (mail_search_args_equal(ctx->search_args,
				   cache->search_result->search_args)) {
		t_array_init(&ctx->added_uids, 64);
		if (mail_thread_cache_update_removes(tbox, &ctx->added_uids)) {
			/* successfully updated the cache */
			return;
		}
	}
	/* failed to use the cache, rebuild */
	mailbox_search_result_free(&cache->search_result);
}

/* Second half of the cache sync, run after the strmap is updated. Either
   inserts the UIDs collected by mail_thread_cache_sync_remove(), or resets
   the cache and rebuilds it from every mail returned by search_ctx. In the
   rebuild case the search result is also saved, so later calls can update
   the cache incrementally. */
static void
mail_thread_cache_sync_add(struct mail_thread_mailbox *tbox,
			   struct mail_thread_context *ctx,
			   struct mail_search_context *search_ctx)
{
	struct mail_thread_cache *cache = tbox->cache;
	struct mail *mail;
	const struct mail_index_strmap_rec *msgid_map;
	unsigned int i, count;

	mail_thread_cache_fix_invalid_indexes(tbox);

	if (cache->search_result != NULL) {
		/* we already checked at sync_remove that we can use this
		   search result. */
		mail_thread_cache_update_adds(tbox, &ctx->added_uids);
		return;
	}

	cache->last_uid = 0;
	cache->first_invalid_msgid_str_idx = cache->next_invalid_msgid_str_idx =
		mail_index_strmap_view_get_highest_idx(tbox->strmap_view) +
		1 + THREAD_INVALID_MSGID_STR_IDX_SKIP_COUNT;
	array_clear(&cache->thread_nodes);

	cache->search_result =
		mailbox_search_result_save(search_ctx,
			MAILBOX_SEARCH_RESULT_FLAG_UPDATE |
			MAILBOX_SEARCH_RESULT_FLAG_QUEUE_SYNC);

	msgid_map = array_get(tbox->msgid_map, &count);
	/* we're relying on the array being zero-terminated (outside used
	   count - kind of kludgy). NOTE(review): presumably
	   mail_thread_add() uses the terminator to detect the end of a UID's
	   records; it does not bound the scan below, since uid 0 is always
	   less than mail->uid. */
	i_assert(msgid_map[count].uid == 0);
	i = 0;
	while (i < count && mailbox_search_next(search_ctx, &mail)) {
		/* every searched mail should have records in msgid_map.
		   Bound the scan so that a broken invariant trips the assert
		   below instead of reading past the end of the map. */
		while (i < count && msgid_map[i].uid < mail->uid)
			i++;
		i_assert(i < count);
		mail_thread_add(cache, msgid_map+i, &i);
	}
}

/* Start a THREAD operation on box for the mails matching args, or for all
   mails if args is NULL. Updates the Message-ID strmap and the thread cache.

   On success returns 0 and sets *ctx_r, which the caller frees with
   mail_thread_deinit(). Returns -1 on failure. If the failure was caused by
   index corruption, the strmap view is marked corrupted. */
int mail_thread_init(struct mailbox *box, struct mail_search_args *args,
		     struct mail_thread_context **ctx_r)
{
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box);
	struct mail_thread_context *ctx;
	struct mail_search_context *search_ctx;
	int ret;

	i_assert(tbox->ctx == NULL);

	struct event_reason *reason = event_reason_begin("mailbox:thread");
	if (args != NULL)
		mail_search_args_ref(args);
	else {
		args = mail_search_build_init();
		mail_search_build_add_all(args);
		mail_search_args_init(args, box, FALSE, NULL);
	}

	ctx = i_new(struct mail_thread_context, 1);
	ctx->box = box;
	ctx->search_args = args;
	ctx->t = mailbox_transaction_begin(ctx->box, 0, __func__);
	/* perform search first, so we don't break if there are INTHREAD keys */
	search_ctx = mailbox_search_init(ctx->t, args, NULL, 0, NULL);

	/* the strmap callbacks find ctx through tbox */
	tbox->ctx = ctx;

	mail_thread_cache_sync_remove(tbox, ctx);
	ret = mail_thread_index_map_build(ctx);
	if (ret == 0)
		mail_thread_cache_sync_add(tbox, ctx, search_ctx);
	if (mailbox_search_deinit(&search_ctx) < 0)
		ret = -1;
	if (ctx->failed) {
		ret = -1;
		if (ctx->corrupted)
			mail_index_strmap_view_set_corrupted(tbox->strmap_view);
	}
	event_reason_end(&reason);
	if (ret < 0) {
		mail_thread_deinit(&ctx);
		return -1;
	} else {
		/* added_uids was only needed during the sync */
		i_zero(&ctx->added_uids);
		*ctx_r = ctx;
		return 0;
	}
}

/* Free the temporary mail and commit the context's transaction. */
static void mail_thread_clear(struct mail_thread_context *ctx)
{
	mail_free(&ctx->tmp_mail);
	(void)mailbox_transaction_commit(&ctx->t);
}

/* Finish a THREAD operation started by mail_thread_init(), free the context
   and set *_ctx to NULL. */
void mail_thread_deinit(struct mail_thread_context **_ctx)
{
	struct mail_thread_context *ctx = *_ctx;
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box);

	*_ctx = NULL;

	mail_thread_clear(ctx);
	mail_search_args_unref(&ctx->search_args);
	tbox->ctx = NULL;
	i_free(ctx);
}

/* Start iterating over the threads built in the mailbox's thread cache. */
struct mail_thread_iterate_context *
mail_thread_iterate_init(struct mail_thread_context *ctx,
			 enum mail_thread_type thread_type, bool write_seqs)
{
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(ctx->box);

	return mail_thread_iterate_init_full(tbox->cache, ctx->tmp_mail,
					     thread_type, write_seqs);
}

/* Mailbox close hook: closes the strmap view and drops the cached search
   result before calling the parent close(). The thread cache itself is
   kept until the mailbox is freed. */
static void mail_thread_mailbox_close(struct mailbox *box)
{
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box);

	i_assert(tbox->ctx == NULL);

	if (tbox->strmap_view != NULL)
		mail_index_strmap_view_close(&tbox->strmap_view);
	if (tbox->cache->search_result != NULL)
		mailbox_search_result_free(&tbox->cache->search_result);
	tbox->module_ctx.super.close(box);
}

/* Mailbox free hook: releases the strmap, calls the parent free(), and then
   frees the thread cache and the per-mailbox context. */
static void mail_thread_mailbox_free(struct mailbox *box)
{
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT_REQUIRE(box);

	mail_index_strmap_deinit(&tbox->strmap);
	tbox->module_ctx.super.free(box);

	array_free(&tbox->cache->thread_nodes);
	i_free(tbox->cache);
	i_free(tbox);
}

/* Attach threading state to box the first time it is opened. The state is
   kept across close/reopen and freed by mail_thread_mailbox_free(). */
void index_thread_mailbox_opened(struct mailbox *box)
{
	struct mail_thread_mailbox *tbox = MAIL_THREAD_CONTEXT(box);

	if (tbox != NULL) {
		/* mailbox was already opened+closed once. */
		return;
	}

	tbox = i_new(struct mail_thread_mailbox, 1);
	tbox->module_ctx.super = box->v;
	box->v.close = mail_thread_mailbox_close;
	box->v.free = mail_thread_mailbox_free;

	tbox->strmap = mail_index_strmap_init(box->index,
					      MAIL_THREAD_INDEX_SUFFIX);
	tbox->next_msgid_idx = 1;

	tbox->cache = i_new(struct mail_thread_cache, 1);
	i_array_init(&tbox->cache->thread_nodes, 128);

	MODULE_CONTEXT_SET(box, mail_thread_storage_module, tbox);
}