summaryrefslogtreecommitdiffstats
path: root/src/indexer/worker-connection.c
blob: 19e2a4160f63e3743240cbda9a549302c0190d1f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/* Copyright (c) 2011-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "array.h"
#include "aqueue.h"
#include "connection.h"
#include "ioloop.h"
#include "istream.h"
#include "llist.h"
#include "ostream.h"
#include "str.h"
#include "strescape.h"
#include "master-service.h"
#include "indexer-queue.h"
#include "worker-connection.h"

#include <unistd.h>

#define INDEXER_PROTOCOL_MAJOR_VERSION 1
#define INDEXER_PROTOCOL_MINOR_VERSION 0

#define INDEXER_MASTER_NAME "indexer-master-worker"
#define INDEXER_WORKER_NAME "indexer-worker-master"

struct worker_connection {
	struct connection conn;

	indexer_status_callback_t *callback;
	worker_available_callback_t *avail_callback;

	char *request_username;
	struct indexer_request *request;
};

static unsigned int worker_last_process_limit = 0;

static void worker_connection_call_callback(struct worker_connection *worker,
					    int percentage)
{
	if (worker->request != NULL)
		worker->callback(percentage, worker->request);
	if (percentage < 0 || percentage == 100)
		worker->request = NULL;
}

void worker_connection_destroy(struct connection *conn)
{
	struct worker_connection *worker =
		container_of(conn, struct worker_connection, conn);

	worker_connection_call_callback(worker, -1);
	i_free_and_null(worker->request_username);
	connection_deinit(conn);

	worker->avail_callback();
	i_free(conn);
}

static int
worker_connection_handshake_args(struct connection *conn, const char *const *args)
{
	unsigned int process_limit;
	int ret;
	if (!conn->version_received) {
		if ((ret = connection_handshake_args_default(conn, args)) < 1)
			return ret;
		/* we are not done yet */
		return 0;
	}
	if (str_to_uint(args[0], &process_limit) < 0 ||
	    process_limit == 0) {
		e_error(conn->event, "Worker sent invalid process limit '%s'",
			args[0]);
		return -1;
	}
	worker_last_process_limit = process_limit;
	return 1;
}

static int
worker_connection_input_args(struct connection *conn, const char *const *args)
{
	struct worker_connection *worker =
		container_of(conn, struct worker_connection, conn);
	int percentage;
	int ret = 1;

	if (str_to_int(args[0], &percentage) < 0 ||
	    percentage < -1 || percentage > 100) {
		e_error(conn->event, "Worker sent invalid progress '%s'", args[0]);
		return -1;
	}

	if (percentage < 0)
		ret = -1;

	worker_connection_call_callback(worker, percentage);
	if (worker->request == NULL) {
		/* disconnect after each request */
		ret = -1;
	}

	return ret;
}

bool worker_connection_is_connected(struct connection *conn)
{
	return !conn->disconnected;
}

unsigned int worker_connections_get_process_limit(void)
{
	return worker_last_process_limit;
}

void worker_connection_request(struct connection *conn,
			       struct indexer_request *request)
{
	struct worker_connection *worker =
		container_of(conn, struct worker_connection, conn);

	i_assert(worker_connection_is_connected(conn));
	i_assert(request->index || request->optimize);

	if (worker->request_username == NULL)
		worker->request_username = i_strdup(request->username);
	else {
		i_assert(strcmp(worker->request_username,
				request->username) == 0);
	}

	worker->request = request;

	T_BEGIN {
		string_t *str = t_str_new(128);

		str_append_tabescaped(str, request->username);
		str_append_c(str, '\t');
		str_append_tabescaped(str, request->mailbox);
		str_append_c(str, '\t');
		if (request->session_id != NULL)
			str_append_tabescaped(str, request->session_id);
		str_printfa(str, "\t%u\t", request->max_recent_msgs);
		if (request->index)
			str_append_c(str, 'i');
		if (request->optimize)
			str_append_c(str, 'o');
		str_append_c(str, '\n');
		o_stream_nsend(conn->output, str_data(str), str_len(str));
	} T_END;
}

const char *worker_connection_get_username(struct connection *conn)
{
	struct worker_connection *worker =
		container_of(conn, struct worker_connection, conn);
	return worker->request_username;
}

static const struct connection_vfuncs worker_connection_vfuncs = {
	.destroy = worker_connection_destroy,
	.input_args = worker_connection_input_args,
	.handshake_args = worker_connection_handshake_args,
};

static const struct connection_settings worker_connection_set = {
	.service_name_in = INDEXER_WORKER_NAME,
	.service_name_out = INDEXER_MASTER_NAME,
	.major_version = INDEXER_PROTOCOL_MAJOR_VERSION,
	.minor_version = INDEXER_PROTOCOL_MINOR_VERSION,
	.input_max_size = SIZE_MAX,
	.output_max_size = SIZE_MAX,
	.client = TRUE,
};

struct connection_list *worker_connection_list_create(void)
{
	return connection_list_init(&worker_connection_set,
				    &worker_connection_vfuncs);
}

struct connection *
worker_connection_create(const char *socket_path,
			 indexer_status_callback_t *callback,
			 worker_available_callback_t *avail_callback,
			 struct connection_list *list)
{
	struct worker_connection *conn;

	conn = i_new(struct worker_connection, 1);
	conn->callback = callback;
	conn->avail_callback = avail_callback;
	connection_init_client_unix(list, &conn->conn, socket_path);

	return &conn->conn;
}