/* Source: src/replication/replicator/replicator-brain.c
   (blob 65cfcecd49c0257e7f757242c262d653ec3661b1) */
/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "array.h"
#include "ioloop.h"
#include "dsync-client.h"
#include "replicator-settings.h"
#include "replicator-queue.h"
#include "replicator-brain.h"

/* Per-sync context handed to dsync_client_sync() and received back in
   dsync_callback(). Allocated in dsync_replicate() and freed at the end of
   dsync_callback(). */
struct replicator_sync_context {
	/* brain that started the sync */
	struct replicator_brain *brain;
	/* user being synced; dsync_replicate() takes a reference with
	   replicator_user_ref() and dsync_callback() drops it */
	struct replicator_user *user;
};

/* State of the replication scheduler: pops users from the queue and starts
   dsync runs for them on a limited set of dsync clients. */
struct replicator_brain {
	/* pool that this struct and the dsync_clients array are allocated
	   from; unreferenced in replicator_brain_deinit() */
	pool_t pool;
	/* queue that users are popped from and pushed back to */
	struct replicator_queue *queue;
	/* settings given to replicator_brain_init() */
	const struct replicator_settings *set;
	/* pending timeout that calls replicator_brain_timeout(), if any */
	struct timeout *to;

	/* clients created by get_dsync_client(); at most
	   set->replication_max_conns are created */
	ARRAY_TYPE(dsync_client) dsync_clients;

	/* set by replicator_brain_deinit(); dsync_callback() doesn't start
	   new replications while this is set */
	bool deinitializing:1;
};

/* Forward declaration: replicator_brain_timeout(), replicator_brain_init()
   and dsync_callback() call this before its definition at the end of the
   file. */
static void replicator_brain_fill(struct replicator_brain *brain);

/* Timeout callback: removes the brain's timeout and then tries to start
   more replications. replicator_brain_fill() may install a new timeout via
   replicator_brain_fill_next() when the queue has nothing to pop. */
static void replicator_brain_timeout(struct replicator_brain *brain)
{
	timeout_remove(&brain->to);
	replicator_brain_fill(brain);
}

/* Queue change callback, registered in replicator_brain_init() with the
   brain as its context. Replaces any pending timeout with a 0 ms one, so
   the queue is processed from replicator_brain_timeout() rather than
   directly from this callback. */
static void replicator_brain_queue_changed(void *context)
{
	struct replicator_brain *brain = context;

	/* Don't fill the replication slots right here: this callback may
	   run before the changed replicator_user has been fully filled out,
	   so defer the work to the timeout handler. */
	timeout_remove(&brain->to);
	brain->to = timeout_add_short(0, replicator_brain_timeout, brain);
}

/* Creates a replication brain working on the given queue with the given
   settings. The brain lives in its own memory pool, registers itself as the
   queue's change callback and immediately tries to start replications for
   whatever the queue already holds.

   Returns the new brain; release it with replicator_brain_deinit(). */
struct replicator_brain *
replicator_brain_init(struct replicator_queue *queue,
		      const struct replicator_settings *set)
{
	pool_t pool = pool_alloconly_create("replication brain", 1024);
	struct replicator_brain *brain =
		p_new(pool, struct replicator_brain, 1);

	brain->pool = pool;
	brain->set = set;
	brain->queue = queue;
	p_array_init(&brain->dsync_clients, pool, 16);

	/* get notified about queue changes from now on */
	replicator_queue_set_change_callback(queue,
					     replicator_brain_queue_changed,
					     brain);
	/* start working on what is already queued */
	replicator_brain_fill(brain);
	return brain;
}

void replicator_brain_deinit(struct replicator_brain **_brain)
{
	struct replicator_brain *brain = *_brain;
	struct dsync_client *conn;

	*_brain = NULL;

	brain->deinitializing = TRUE;
	array_foreach_elem(&brain->dsync_clients, conn)
		dsync_client_deinit(&conn);
	timeout_remove(&brain->to);
	pool_unref(&brain->pool);
}

/* Returns the queue that was given to replicator_brain_init(). */
struct replicator_queue *
replicator_brain_get_queue(struct replicator_brain *brain)
{
	return brain->queue;
}

/* Returns the settings that were given to replicator_brain_init(). */
const struct replicator_settings *
replicator_brain_get_settings(struct replicator_brain *brain)
{
	return brain->set;
}

/* Returns a read-only pointer to the brain's array of dsync clients. The
   array is owned by the brain; it points into the brain struct itself. */
const ARRAY_TYPE(dsync_client) *
replicator_brain_get_dsync_clients(struct replicator_brain *brain)
{
	return &brain->dsync_clients;
}

static struct dsync_client *
get_dsync_client(struct replicator_brain *brain)
{
	struct dsync_client *conn;

	array_foreach_elem(&brain->dsync_clients, conn) {
		if (!dsync_client_is_busy(conn))
			return conn;
	}
	if (array_count(&brain->dsync_clients) ==
	    brain->set->replication_max_conns)
		return NULL;

	conn = dsync_client_init(brain->set->doveadm_socket_path,
				 brain->set->replication_dsync_parameters);
	array_push_back(&brain->dsync_clients, &conn);
	return conn;
}

static void dsync_callback(enum dsync_reply reply, const char *state,
			   void *context)
{
	struct replicator_sync_context *ctx = context;
	struct replicator_user *user = ctx->user;

	if (!replicator_user_unref(&user)) {
		/* user was already removed */
	} else if (reply == DSYNC_REPLY_NOUSER ||
		   reply == DSYNC_REPLY_NOREPLICATE) {
		/* user no longer exists, or is not wanted for replication,
		   remove from replication */
		replicator_queue_remove(ctx->brain->queue, &ctx->user);
	} else {
		i_free(ctx->user->state);
		ctx->user->state = i_strdup_empty(state);
		ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK;
		if (reply == DSYNC_REPLY_OK)
			ctx->user->last_successful_sync = ioloop_time;
		replicator_queue_push(ctx->brain->queue, ctx->user);
	}
	if (!ctx->brain->deinitializing)
		replicator_brain_fill(ctx->brain);
	i_free(ctx);
}

/* Starts replicating the given user on a free dsync client.

   Returns FALSE without touching the user when no dsync client is available
   (see get_dsync_client()). Otherwise updates the user's sync timestamps,
   resets its priority, takes a reference to the user for the duration of
   the sync and returns TRUE; dsync_callback() is called with the result.

   A full sync is requested when replication_full_sync_interval has passed
   since the user's last full sync, or when user->force_full_sync is set. */
static bool
dsync_replicate(struct replicator_brain *brain, struct replicator_user *user)
{
	struct replicator_sync_context *ctx;
	struct dsync_client *conn;
	time_t next_full_sync;
	bool full;

	conn = get_dsync_client(brain);
	if (conn == NULL)
		return FALSE;

	next_full_sync = user->last_full_sync +
		brain->set->replication_full_sync_interval;
	/* An explicit force_full_sync request must upgrade this run to a
	   full sync. Otherwise the flag would be cleared and last_full_sync
	   bumped below even though only a fast sync was started. */
	full = user->force_full_sync || next_full_sync <= ioloop_time;
	/* update the sync times immediately. if the replication fails we still
	   wouldn't want it to be retried immediately. */
	user->last_fast_sync = ioloop_time;
	if (full) {
		user->last_full_sync = ioloop_time;
		user->force_full_sync = FALSE;
	}
	/* reset priority also. if more updates arrive during replication
	   we'll do another replication to make sure nothing gets lost */
	user->priority = REPLICATION_PRIORITY_NONE;

	/* ctx is freed and the user reference dropped in dsync_callback() */
	ctx = i_new(struct replicator_sync_context, 1);
	ctx->brain = brain;
	ctx->user = user;
	replicator_user_ref(user);
	dsync_client_sync(conn, user->username, user->state, full,
			  dsync_callback, ctx);
	return TRUE;
}

/* Tries to start replication for the next user in the queue.

   Returns TRUE if a replication was started, so the caller may try again.
   Returns FALSE if the queue had nothing to pop (a timeout is then set to
   retry after the number of seconds reported by replicator_queue_pop()) or
   if no dsync client was available (the user is then pushed back to the
   queue). */
static bool replicator_brain_fill_next(struct replicator_brain *brain)
{
	struct replicator_user *user;
	unsigned int wait_secs;

	user = replicator_queue_pop(brain->queue, &wait_secs);
	if (user != NULL) {
		if (dsync_replicate(brain, user)) {
			/* replication is now running for this user */
			return TRUE;
		}
		/* no free connections - return the user to the queue */
		replicator_queue_push(brain->queue, user);
		return FALSE;
	}

	/* nothing to replicate right now - wake up again later */
	timeout_remove(&brain->to);
	brain->to = timeout_add(wait_secs * 1000,
				replicator_brain_timeout, brain);
	return FALSE;
}

/* Keeps starting replications until replicator_brain_fill_next() reports
   that nothing more can be started right now. */
static void replicator_brain_fill(struct replicator_brain *brain)
{
	bool started;

	do {
		started = replicator_brain_fill_next(brain);
	} while (started);
}