summaryrefslogtreecommitdiffstats
path: root/src/indexer/indexer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/indexer/indexer.c')
-rw-r--r--src/indexer/indexer.c137
1 files changed, 137 insertions, 0 deletions
diff --git a/src/indexer/indexer.c b/src/indexer/indexer.c
new file mode 100644
index 0000000..6175b54
--- /dev/null
+++ b/src/indexer/indexer.c
@@ -0,0 +1,137 @@
+/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "restrict-access.h"
+#include "process-title.h"
+#include "master-service.h"
+#include "master-service-settings.h"
+#include "indexer-client.h"
+#include "indexer-queue.h"
+#include "worker-pool.h"
+#include "worker-connection.h"
+
+static const struct master_service_settings *set;
+static struct indexer_queue *queue;
+static struct worker_pool *worker_pool;
+
+void indexer_refresh_proctitle(void)
+{
+ if (!set->verbose_proctitle)
+ return;
+
+ process_title_set(t_strdup_printf("[%u clients, %u requests]",
+ indexer_clients_get_count(),
+ indexer_queue_count(queue)));
+}
+
+static bool idle_die(void)
+{
+ return indexer_queue_is_empty(queue) &&
+ !worker_pool_have_connections(worker_pool);
+}
+
+static void client_connected(struct master_service_connection *conn)
+{
+ master_service_client_connection_accept(conn);
+ indexer_client_create(conn, queue);
+}
+
+static void worker_send_request(struct connection *conn,
+ struct indexer_request *request)
+{
+ indexer_queue_request_work(request);
+ worker_connection_request(conn, request);
+}
+
+static void queue_try_send_more(struct indexer_queue *queue)
+{
+ struct connection *conn;
+ struct indexer_request *request, *first_moved_request = NULL;
+
+ while ((request = indexer_queue_request_peek(queue)) != NULL) {
+ conn = worker_pool_find_username_connection(worker_pool,
+ request->username);
+ if (conn != NULL) {
+ /* There is already a connection handling a request
+ * for this user. Move the request to the back of the
+ * queue and handle requests from other users.
+ * Terminate if we went through all requests. */
+ if (request == first_moved_request) {
+ /* all requests are waiting for existing users
+ to finish. */
+ break;
+ }
+ if (first_moved_request == NULL)
+ first_moved_request = request;
+ indexer_queue_move_head_to_tail(queue);
+ continue;
+ } else {
+ /* create a new connection to a worker */
+ if (!worker_pool_get_connection(worker_pool, &conn))
+ break;
+ }
+ indexer_queue_request_remove(queue);
+ worker_send_request(conn, request);
+ }
+}
+
+static void queue_listen_callback(struct indexer_queue *queue)
+{
+ queue_try_send_more(queue);
+}
+
+static void
+worker_status_callback(int percentage, struct indexer_request *request)
+{
+ if (percentage >= 0 && percentage < 100) {
+ indexer_queue_request_status(queue, request,
+ percentage);
+ return;
+ }
+
+ indexer_queue_request_finish(queue, &request,
+ percentage == 100);
+}
+
+static void worker_avail_callback(void)
+{
+ /* A new worker became available. Try to shrink the queue. */
+ queue_try_send_more(queue);
+}
+
+int main(int argc, char *argv[])
+{
+ const char *error;
+
+ master_service = master_service_init("indexer", 0, &argc, &argv, "");
+ if (master_getopt(master_service) > 0)
+ return FATAL_DEFAULT;
+
+ if (master_service_settings_read_simple(master_service, NULL,
+ &error) < 0)
+ i_fatal("Error reading configuration: %s", error);
+ set = master_service_settings_get(master_service);
+
+ master_service_init_log(master_service);
+ restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL);
+ restrict_access_allow_coredumps(TRUE);
+ master_service_set_idle_die_callback(master_service, idle_die);
+
+ queue = indexer_queue_init(indexer_client_status_callback);
+ indexer_queue_set_listen_callback(queue, queue_listen_callback);
+ worker_pool = worker_pool_init("indexer-worker",
+ worker_status_callback,
+ worker_avail_callback);
+ master_service_init_finish(master_service);
+
+ master_service_run(master_service, client_connected);
+
+ indexer_queue_cancel_all(queue);
+ indexer_clients_destroy_all();
+ worker_pool_deinit(&worker_pool);
+ indexer_queue_deinit(&queue);
+
+ master_service_deinit(&master_service);
+ return 0;
+}