summaryrefslogtreecommitdiffstats
path: root/src/indexer/indexer.c
blob: 6175b5476b3df264d11ca4c2cd6b44a001f8c2e1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "restrict-access.h"
#include "process-title.h"
#include "master-service.h"
#include "master-service-settings.h"
#include "indexer-client.h"
#include "indexer-queue.h"
#include "worker-pool.h"
#include "worker-connection.h"

static const struct master_service_settings *set;
static struct indexer_queue *queue;
static struct worker_pool *worker_pool;

void indexer_refresh_proctitle(void)
{
	if (!set->verbose_proctitle)
		return;

	process_title_set(t_strdup_printf("[%u clients, %u requests]",
					  indexer_clients_get_count(),
					  indexer_queue_count(queue)));
}

static bool idle_die(void)
{
	return indexer_queue_is_empty(queue) &&
		!worker_pool_have_connections(worker_pool);
}

static void client_connected(struct master_service_connection *conn)
{
	master_service_client_connection_accept(conn);
	indexer_client_create(conn, queue);
}

static void worker_send_request(struct connection *conn,
				struct indexer_request *request)
{
	indexer_queue_request_work(request);
	worker_connection_request(conn, request);
}

static void queue_try_send_more(struct indexer_queue *queue)
{
	struct connection *conn;
	struct indexer_request *request, *first_moved_request = NULL;

	while ((request = indexer_queue_request_peek(queue)) != NULL) {
		conn = worker_pool_find_username_connection(worker_pool,
							    request->username);
		if (conn != NULL) {
			/* There is already a connection handling a request
			 * for this user. Move the request to the back of the
			 * queue and handle requests from other users.
			 * Terminate if we went through all requests. */
			if (request == first_moved_request) {
				/* all requests are waiting for existing users
				   to finish. */
				break;
			}
			if (first_moved_request == NULL)
				first_moved_request = request;
			indexer_queue_move_head_to_tail(queue);
			continue;
		} else {
			/* create a new connection to a worker */
			if (!worker_pool_get_connection(worker_pool, &conn))
				break;
		}
		indexer_queue_request_remove(queue);
		worker_send_request(conn, request);
	}
}

static void queue_listen_callback(struct indexer_queue *queue)
{
	queue_try_send_more(queue);
}

static void
worker_status_callback(int percentage, struct indexer_request *request)
{
	if (percentage >= 0 && percentage < 100) {
		indexer_queue_request_status(queue, request,
					     percentage);
		return;
	}

	indexer_queue_request_finish(queue, &request,
				     percentage == 100);
}

static void worker_avail_callback(void)
{
	/* A new worker became available. Try to shrink the queue. */
	queue_try_send_more(queue);
}

int main(int argc, char *argv[])
{
	const char *error;

	master_service = master_service_init("indexer", 0, &argc, &argv, "");
	if (master_getopt(master_service) > 0)
		return FATAL_DEFAULT;

	if (master_service_settings_read_simple(master_service, NULL,
						&error) < 0)
		i_fatal("Error reading configuration: %s", error);
	set = master_service_settings_get(master_service);

	master_service_init_log(master_service);
	restrict_access_by_env(RESTRICT_ACCESS_FLAG_ALLOW_ROOT, NULL);
	restrict_access_allow_coredumps(TRUE);
	master_service_set_idle_die_callback(master_service, idle_die);

	queue = indexer_queue_init(indexer_client_status_callback);
	indexer_queue_set_listen_callback(queue, queue_listen_callback);
	worker_pool = worker_pool_init("indexer-worker",
				       worker_status_callback,
				       worker_avail_callback);
	master_service_init_finish(master_service);

	master_service_run(master_service, client_connected);

	indexer_queue_cancel_all(queue);
	indexer_clients_destroy_all();
	worker_pool_deinit(&worker_pool);
	indexer_queue_deinit(&queue);

	master_service_deinit(&master_service);
        return 0;
}