diff options
Diffstat (limited to 'src/indexer/indexer.c')
-rw-r--r-- | src/indexer/indexer.c | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/src/indexer/indexer.c b/src/indexer/indexer.c new file mode 100644 index 0000000..6175b54 --- /dev/null +++ b/src/indexer/indexer.c @@ -0,0 +1,137 @@ +/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "restrict-access.h" +#include "process-title.h" +#include "master-service.h" +#include "master-service-settings.h" +#include "indexer-client.h" +#include "indexer-queue.h" +#include "worker-pool.h" +#include "worker-connection.h" + +static const struct master_service_settings *set; +static struct indexer_queue *queue; +static struct worker_pool *worker_pool; + +void indexer_refresh_proctitle(void) +{ + if (!set->verbose_proctitle) + return; + + process_title_set(t_strdup_printf("[%u clients, %u requests]", + indexer_clients_get_count(), + indexer_queue_count(queue))); +} + +static bool idle_die(void) +{ + return indexer_queue_is_empty(queue) && + !worker_pool_have_connections(worker_pool); +} + +static void client_connected(struct master_service_connection *conn) +{ + master_service_client_connection_accept(conn); + indexer_client_create(conn, queue); +} + +static void worker_send_request(struct connection *conn, + struct indexer_request *request) +{ + indexer_queue_request_work(request); + worker_connection_request(conn, request); +} + +static void queue_try_send_more(struct indexer_queue *queue) +{ + struct connection *conn; + struct indexer_request *request, *first_moved_request = NULL; + + while ((request = indexer_queue_request_peek(queue)) != NULL) { + conn = worker_pool_find_username_connection(worker_pool, + request->username); + if (conn != NULL) { + /* There is already a connection handling a request + * for this user. Move the request to the back of the + * queue and handle requests from other users. + * Terminate if we went through all requests. */ + if (request == first_moved_request) { + /* all requests are waiting for existing users + to finish. */ + break; + } + if (first_moved_request == NULL) + first_moved_request = request; + indexer_queue_move_head_to_tail(queue); + continue; + } else { + /* create a new connection to a worker */ + if (!worker_pool_get_connection(worker_pool, &conn)) + break; + } + indexer_queue_request_remove(queue); + worker_send_request(conn, request); + } +} + +static void queue_listen_callback(struct indexer_queue *queue) +{ + queue_try_send_more(queue); +} + +static void +worker_status_callback(int percentage, struct indexer_request *request) +{ + if (percentage >= 0 && percentage < 100) { + indexer_queue_request_status(queue, request, + percentage); + return; + } + + indexer_queue_request_finish(queue, &request, + percentage == 100); +} + +static void worker_avail_callback(void) +{ + /* A new worker became available. Try to shrink the queue. */ + queue_try_send_more(queue); +} + +int main(int argc, char *argv[]) +{ + const char *error; + + master_service = master_service_init("indexer", 0, &argc, &argv, ""); + if (master_getopt(master_service) > 0) + return FATAL_DEFAULT; + + if (master_service_settings_read_simple(master_service, NULL, + &error) < 0) + i_fatal("Error reading configuration: %s", error); + set = master_service_settings_get(master_service); + + master_service_init_log(master_service); + restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL); + restrict_access_allow_coredumps(TRUE); + master_service_set_idle_die_callback(master_service, idle_die); + + queue = indexer_queue_init(indexer_client_status_callback); + indexer_queue_set_listen_callback(queue, queue_listen_callback); + worker_pool = worker_pool_init("indexer-worker", + worker_status_callback, + worker_avail_callback); + master_service_init_finish(master_service); + + master_service_run(master_service, client_connected); + + indexer_queue_cancel_all(queue); + indexer_clients_destroy_all(); + worker_pool_deinit(&worker_pool); + indexer_queue_deinit(&queue); + + master_service_deinit(&master_service); + return 0; +} |