/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */ #include "lib.h" #include "array.h" #include "llist.h" #include "hash.h" #include "indexer-queue.h" struct indexer_queue { indexer_queue_callback_t *callback; void (*listen_callback)(struct indexer_queue *); /* username+mailbox -> indexer_request */ HASH_TABLE(struct indexer_request *, struct indexer_request *) requests; struct indexer_request *head, *tail; }; static unsigned int indexer_request_hash(const struct indexer_request *request) { return str_hash(request->username) ^ str_hash(request->mailbox); } static int indexer_request_cmp(const struct indexer_request *r1, const struct indexer_request *r2) { return strcmp(r1->username, r2->username) == 0 && strcmp(r1->mailbox, r2->mailbox) == 0 ? 0 : 1; } struct indexer_queue * indexer_queue_init(indexer_queue_callback_t *callback) { struct indexer_queue *queue; queue = i_new(struct indexer_queue, 1); queue->callback = callback; hash_table_create(&queue->requests, default_pool, 0, indexer_request_hash, indexer_request_cmp); return queue; } void indexer_queue_deinit(struct indexer_queue **_queue) { struct indexer_queue *queue = *_queue; *_queue = NULL; i_assert(indexer_queue_is_empty(queue)); hash_table_destroy(&queue->requests); i_free(queue); } void indexer_queue_set_listen_callback(struct indexer_queue *queue, void (*callback)(struct indexer_queue *)) { queue->listen_callback = callback; } static struct indexer_request * indexer_queue_lookup(struct indexer_queue *queue, const char *username, const char *mailbox) { struct indexer_request lookup_request; lookup_request.username = (void *)username; lookup_request.mailbox = (void *)mailbox; return hash_table_lookup(queue->requests, &lookup_request); } static void request_add_context(struct indexer_request *request, void *context) { if (context == NULL) return; if (!array_is_created(&request->contexts)) i_array_init(&request->contexts, 2); array_push_back(&request->contexts, &context); } static struct indexer_request * indexer_queue_append_request(struct indexer_queue *queue, bool append, const char *username, const char *mailbox, const char *session_id, unsigned int max_recent_msgs, void *context) { struct indexer_request *request; request = indexer_queue_lookup(queue, username, mailbox); if (request == NULL) { request = i_new(struct indexer_request, 1); request->username = i_strdup(username); request->mailbox = i_strdup(mailbox); request->session_id = i_strdup(session_id); request->max_recent_msgs = max_recent_msgs; request_add_context(request, context); hash_table_insert(queue->requests, request, request); } else { if (request->max_recent_msgs > max_recent_msgs) request->max_recent_msgs = max_recent_msgs; request_add_context(request, context); if (request->working) { /* we're already indexing this mailbox. */ if (append) request->reindex_tail = TRUE; else request->reindex_head = TRUE; return request; } if (append) { /* keep the request in its old position */ return request; } /* move request to beginning of the queue */ DLLIST2_REMOVE(&queue->head, &queue->tail, request); } if (append) DLLIST2_APPEND(&queue->head, &queue->tail, request); else DLLIST2_PREPEND(&queue->head, &queue->tail, request); return request; } static void indexer_queue_append_finish(struct indexer_queue *queue) { if (queue->listen_callback != NULL) queue->listen_callback(queue); indexer_refresh_proctitle(); } void indexer_queue_append(struct indexer_queue *queue, bool append, const char *username, const char *mailbox, const char *session_id, unsigned int max_recent_msgs, void *context) { struct indexer_request *request; request = indexer_queue_append_request(queue, append, username, mailbox, session_id, max_recent_msgs, context); request->index = TRUE; indexer_queue_append_finish(queue); } void indexer_queue_append_optimize(struct indexer_queue *queue, const char *username, const char *mailbox, void *context) { struct indexer_request *request; request = indexer_queue_append_request(queue, TRUE, username, mailbox, NULL, 0, context); request->optimize = TRUE; indexer_queue_append_finish(queue); } struct indexer_request *indexer_queue_request_peek(struct indexer_queue *queue) { return queue->head; } void indexer_queue_request_remove(struct indexer_queue *queue) { struct indexer_request *request = queue->head; i_assert(request != NULL); DLLIST2_REMOVE(&queue->head, &queue->tail, request); } static void indexer_queue_request_status_int(struct indexer_queue *queue, struct indexer_request *request, int percentage) { void *context; unsigned int i; for (i = 0; i < request->working_context_idx; i++) { context = array_idx_elem(&request->contexts, i); queue->callback(percentage, context); } } void indexer_queue_request_status(struct indexer_queue *queue, struct indexer_request *request, int percentage) { i_assert(percentage >= 0 && percentage < 100); indexer_queue_request_status_int(queue, request, percentage); } void indexer_queue_move_head_to_tail(struct indexer_queue *queue) { struct indexer_request *request = queue->head; indexer_queue_request_remove(queue); DLLIST2_APPEND(&queue->head, &queue->tail, request); } void indexer_queue_request_work(struct indexer_request *request) { request->working = TRUE; request->working_context_idx = !array_is_created(&request->contexts) ? 0 : array_count(&request->contexts); } void indexer_queue_request_finish(struct indexer_queue *queue, struct indexer_request **_request, bool success) { struct indexer_request *request = *_request; *_request = NULL; indexer_queue_request_status_int(queue, request, success ? 100 : -1); if (request->reindex_head || request->reindex_tail) { i_assert(request->working); request->working = FALSE; request->reindex_head = FALSE; request->reindex_tail = FALSE; if (request->working_context_idx > 0) { array_delete(&request->contexts, 0, request->working_context_idx); } if (request->reindex_head) DLLIST2_PREPEND(&queue->head, &queue->tail, request); else DLLIST2_APPEND(&queue->head, &queue->tail, request); return; } hash_table_remove(queue->requests, request); if (array_is_created(&request->contexts)) array_free(&request->contexts); i_free(request->username); i_free(request->mailbox); i_free(request->session_id); i_free(request); indexer_refresh_proctitle(); } void indexer_queue_cancel_all(struct indexer_queue *queue) { struct indexer_request *request; struct hash_iterate_context *iter; /* remove all reindex-markers so when the current requests finish (or are cancelled) we don't try to retry them (especially during deinit where it crashes) */ iter = hash_table_iterate_init(queue->requests); while (hash_table_iterate(iter, queue->requests, &request, &request)) request->reindex_head = request->reindex_tail = FALSE; hash_table_iterate_deinit(&iter); while ((request = indexer_queue_request_peek(queue)) != NULL) { indexer_queue_request_remove(queue); indexer_queue_request_finish(queue, &request, FALSE); } } bool indexer_queue_is_empty(struct indexer_queue *queue) { return queue->head == NULL; } unsigned int indexer_queue_count(struct indexer_queue *queue) { return hash_table_count(queue->requests); }