diff options
Diffstat (limited to 'src/indexer/master-connection.c')
-rw-r--r-- | src/indexer/master-connection.c | 365 |
1 files changed, 365 insertions, 0 deletions
diff --git a/src/indexer/master-connection.c b/src/indexer/master-connection.c new file mode 100644 index 0000000..14f3c7d --- /dev/null +++ b/src/indexer/master-connection.c @@ -0,0 +1,365 @@ +/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "connection.h" +#include "ioloop.h" +#include "istream.h" +#include "ostream.h" +#include "strescape.h" +#include "process-title.h" +#include "master-service.h" +#include "master-service-settings.h" +#include "mail-namespace.h" +#include "mail-storage-private.h" +#include "mail-storage-service.h" +#include "mail-search-build.h" +#include "master-connection.h" + +#include <unistd.h> + +#define INDEXER_PROTOCOL_MAJOR_VERSION 1 +#define INDEXER_PROTOCOL_MINOR_VERSION 0 + +#define INDEXER_MASTER_NAME "indexer-master-worker" +#define INDEXER_WORKER_NAME "indexer-worker-master" + +static struct event_category event_category_indexer_worker = { + .name = "indexer-worker", +}; + +struct master_connection { + struct connection conn; + struct mail_storage_service_ctx *storage_service; + + bool version_received:1; +}; + +static void ATTR_NULL(1, 2) +indexer_worker_refresh_proctitle(const char *username, const char *mailbox, + uint32_t seq1, uint32_t seq2) +{ + if (!master_service_settings_get(master_service)->verbose_proctitle) + return; + + if (username == NULL) + process_title_set("[idling]"); + else if (seq1 == 0) + process_title_set(t_strdup_printf("[%s %s]", username, mailbox)); + else { + process_title_set(t_strdup_printf("[%s %s - %u/%u]", + username, mailbox, seq1, seq2)); + } +} + +static const char * +get_attempt_error(unsigned int counter, uint32_t first_uid, uint32_t last_uid) +{ + if (counter == 0) + return " (no mails indexed)"; + return t_strdup_printf( + " (attempted to index %u messages between UIDs %u..%u)", + counter, first_uid, last_uid); +} + +static int +index_mailbox_precache(struct master_connection *conn, struct mailbox *box) +{ + struct mail_storage *storage = mailbox_get_storage(box); + const char *username = mail_storage_get_user(storage)->username; + const char *box_vname = mailbox_get_vname(box); + const char *errstr; + enum mail_error error; + struct mailbox_status status; + struct mailbox_transaction_context *trans; + struct mail_search_args *search_args; + struct mail_search_context *ctx; + struct mail *mail; + struct mailbox_metadata metadata; + uint32_t seq, first_uid = 0, last_uid = 0; + char percentage_str[2+1+1]; + unsigned int counter = 0, max, percentage, percentage_sent = 0; + int ret = 0; + struct event *index_event = event_create(box->event); + event_add_category(index_event, &event_category_indexer_worker); + + if (mailbox_get_metadata(box, MAILBOX_METADATA_PRECACHE_FIELDS, + &metadata) < 0) { + e_error(index_event, "Precache-fields lookup failed: %s", + mailbox_get_last_internal_error(box, NULL)); + event_unref(&index_event); + return -1; + } + if (mailbox_get_status(box, STATUS_MESSAGES | STATUS_LAST_CACHED_SEQ, + &status) < 0) { + e_error(index_event, "Status lookup failed: %s", + mailbox_get_last_internal_error(box, NULL)); + event_unref(&index_event); + return -1; + } + seq = status.last_cached_seq + 1; + + trans = mailbox_transaction_begin(box, MAILBOX_TRANSACTION_FLAG_NO_CACHE_DEC, + "indexing"); + search_args = mail_search_build_init(); + mail_search_build_add_seqset(search_args, seq, status.messages); + + event_enable_user_cpu_usecs(index_event); + + ctx = mailbox_search_init(trans, search_args, NULL, + metadata.precache_fields, NULL); + mail_search_args_unref(&search_args); + + max = status.messages + 1 - seq; + while (mailbox_search_next(ctx, &mail)) { + if (first_uid == 0) + first_uid = mail->uid; + last_uid = mail->uid; + e_debug(index_event, "Indexing UID=%u", mail->uid); + + if (mail_precache(mail) < 0) { + e_error(index_event, "Precache for UID=%u failed: %s%s", + mail->uid, + mailbox_get_last_internal_error(box, NULL), + get_attempt_error(counter, first_uid, last_uid)); + ret = -1; + break; + } + if (++counter % 100 == 0) { + percentage = counter*100 / max; + if (percentage != percentage_sent && percentage < 100) { + percentage_sent = percentage; + if (i_snprintf(percentage_str, + sizeof(percentage_str), "%u\n", + percentage) < 0) + i_unreached(); + o_stream_nsend_str(conn->conn.output, + percentage_str); + } + indexer_worker_refresh_proctitle(username, box_vname, + counter, max); + } + } + if (mailbox_search_deinit(&ctx) < 0) { + e_error(index_event, "Mail search failed: %s%s", + mailbox_get_last_internal_error(box, NULL), + get_attempt_error(counter, first_uid, last_uid)); + ret = -1; + } + const char *uids = first_uid == 0 ? "" : + t_strdup_printf(" (UIDs %u..%u)", first_uid, last_uid); + event_add_int(index_event, "message_count", counter); + event_add_int(index_event, "first_uid", first_uid); + event_add_int(index_event, "last_uid", last_uid); + +#define FINISHED_EVENT_NAME "indexer_worker_indexing_finished" + if (mailbox_transaction_commit(&trans) < 0) { + struct event_passthrough *e = event_create_passthrough(index_event)-> + set_name(FINISHED_EVENT_NAME); + errstr = t_strdup_printf("Transaction commit failed: %s", + mailbox_get_last_internal_error(box, &error)); + e->add_str("error", errstr); + const char *log_error = t_strdup_printf("%s (attempted to index %u messages%s)", + errstr, counter, uids); + if (error != MAIL_ERROR_NOTFOUND) + e_error(e->event(), "%s", log_error); + else + e_debug(e->event(), "%s", log_error); + ret = -1; + } else { + struct event_passthrough *e = event_create_passthrough(index_event)-> + set_name(FINISHED_EVENT_NAME); + e_debug(e->event(), "Indexed %u messages%s", counter, uids); + } + event_unref(&index_event); + return ret; +} + +static int +index_mailbox(struct master_connection *conn, struct mail_user *user, + const char *mailbox, unsigned int max_recent_msgs, + const char *what) +{ + struct mail_namespace *ns; + struct mailbox *box; + struct mailbox_status status; + const char *path, *errstr; + enum mail_error error; + enum mailbox_sync_flags sync_flags = MAILBOX_SYNC_FLAG_FULL_READ; + int ret; + + ns = mail_namespace_find(user->namespaces, mailbox); + box = mailbox_alloc(ns->list, mailbox, 0); + ret = mailbox_get_path_to(box, MAILBOX_LIST_PATH_TYPE_INDEX, &path); + if (ret < 0) { + errstr = mailbox_get_last_internal_error(box, &error); + if (error != MAIL_ERROR_NOTFOUND) + e_error(box->event, "Getting path failed: %s", errstr); + mailbox_free(&box); + return -1; + } + if (ret == 0) { + e_info(box->event, "Indexes disabled, skipping"); + mailbox_free(&box); + return 0; + } + ret = 0; + + if (max_recent_msgs != 0) { + /* index only if there aren't too many recent messages. + don't bother syncing the mailbox, that alone can take a + while with large maildirs. */ + if (mailbox_open(box) < 0) { + errstr = mailbox_get_last_internal_error(box, &error); + if (error != MAIL_ERROR_NOTFOUND) + e_error(box->event, "Opening failed: %s", + errstr); + ret = -1; + } else { + mailbox_get_open_status(box, STATUS_RECENT, &status); + } + if (ret < 0 || status.recent > max_recent_msgs) { + mailbox_free(&box); + return ret; + } + } + + if (strchr(what, 'o') != NULL) + sync_flags |= MAILBOX_SYNC_FLAG_OPTIMIZE; + + if (mailbox_sync(box, sync_flags) < 0) { + errstr = mailbox_get_last_internal_error(box, &error); + if (error != MAIL_ERROR_NOTFOUND) { + e_error(box->event, "Syncing failed: %s", errstr); + } else { + e_debug(box->event, "Syncing failed: %s", errstr); + } + ret = -1; + } else if (strchr(what, 'i') != NULL) { + if (index_mailbox_precache(conn, box) < 0) + ret = -1; + } + mailbox_free(&box); + return ret; +} + +static int +master_connection_input_args(struct connection *_conn, const char *const *args) +{ + struct master_connection *conn = + container_of(_conn, struct master_connection, conn); + struct mail_storage_service_input input; + struct mail_storage_service_user *service_user; + struct mail_user *user; + const char *str, *error; + unsigned int max_recent_msgs; + int ret; + + /* <username> <mailbox> <session ID> <max_recent_msgs> [i][o] */ + if (str_array_length(args) != 5 || + str_to_uint(args[3], &max_recent_msgs) < 0 || args[4][0] == '\0') { + e_error(conn->conn.event, "Invalid input from master: %s", + t_strarray_join(args, "\t")); + return -1; + } + + i_zero(&input); + input.module = "mail"; + input.service = "indexer-worker"; + input.username = args[0]; + /* if session-id is given, use it as a prefix to a unique session ID. + we can't use the session-id directly or stats process will complain + about duplicates. (especially LMTP would use the same session-id for + multiple users' indexing at the same time.) */ + if (args[2][0] != '\0') + input.session_id_prefix = args[2]; + + if (mail_storage_service_lookup_next(conn->storage_service, &input, + &service_user, &user, &error) <= 0) { + e_error(conn->conn.event, "User %s lookup failed: %s", args[0], error); + ret = -1; + } else { + indexer_worker_refresh_proctitle(user->username, args[1], 0, 0); + struct event_reason *reason = + event_reason_begin("indexer:index_mailbox"); + ret = index_mailbox(conn, user, args[1], + max_recent_msgs, args[4]); + event_reason_end(&reason); + /* refresh proctitle before a potentially long-running + user unref */ + indexer_worker_refresh_proctitle(user->username, "(deinit)", 0, 0); + mail_user_deinit(&user); + mail_storage_service_user_unref(&service_user); + indexer_worker_refresh_proctitle(NULL, NULL, 0, 0); + } + + str = ret < 0 ? "-1\n" : "100\n"; + o_stream_nsend_str(conn->conn.output, str); + return ret; +} + +static void master_connection_destroy(struct connection *connection) +{ + connection_deinit(connection); + i_free(connection); + master_service_client_connection_destroyed(master_service); +} + +static int master_connection_handshake_args(struct connection *connection, + const char *const *args) +{ + int ret; + if ((ret = connection_handshake_args_default(connection, args)) < 1) + return ret; + const char *limit = t_strdup_printf("%u\n", + master_service_get_process_limit(master_service)); + o_stream_nsend_str(connection->output, limit); + return 1; +} + +static struct connection_list *master_connection_list = NULL; + +static const struct connection_vfuncs master_connection_vfuncs = { + .destroy = master_connection_destroy, + .input_args = master_connection_input_args, + .handshake_args = master_connection_handshake_args, +}; + +static const struct connection_settings master_connection_set = { + .service_name_in = INDEXER_MASTER_NAME, + .service_name_out = INDEXER_WORKER_NAME, + .major_version = INDEXER_PROTOCOL_MAJOR_VERSION, + .minor_version = INDEXER_PROTOCOL_MINOR_VERSION, + .input_max_size = SIZE_MAX, + .output_max_size = SIZE_MAX, +}; + +bool +master_connection_create(struct master_service_connection *master, + struct mail_storage_service_ctx *storage_service) +{ + struct master_connection *conn; + + if (master_connection_list == NULL) { + master_connection_list = + connection_list_init(&master_connection_set, + &master_connection_vfuncs); + } else if (master_connection_list->connections_count > 0) { + return FALSE; + } + + conn = i_new(struct master_connection, 1); + conn->storage_service = storage_service; + conn->conn.event_parent = event_create(NULL); + event_add_category(conn->conn.event_parent, &event_category_indexer_worker); + connection_init_server(master_connection_list, &conn->conn, + master->name, master->fd, master->fd); + + event_unref(&conn->conn.event_parent); + return TRUE; +} + +void master_connections_destroy(void) +{ + if (master_connection_list != NULL) + connection_list_deinit(&master_connection_list); +} |