summaryrefslogtreecommitdiffstats
path: root/src/replication/replicator/replicator-brain.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/replication/replicator/replicator-brain.c')
-rw-r--r--src/replication/replicator/replicator-brain.c202
1 files changed, 202 insertions, 0 deletions
diff --git a/src/replication/replicator/replicator-brain.c b/src/replication/replicator/replicator-brain.c
new file mode 100644
index 0000000..65cfcec
--- /dev/null
+++ b/src/replication/replicator/replicator-brain.c
@@ -0,0 +1,202 @@
+/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "dsync-client.h"
+#include "replicator-settings.h"
+#include "replicator-queue.h"
+#include "replicator-brain.h"
+
+struct replicator_sync_context {
+ struct replicator_brain *brain;
+ struct replicator_user *user;
+};
+
+struct replicator_brain {
+ pool_t pool;
+ struct replicator_queue *queue;
+ const struct replicator_settings *set;
+ struct timeout *to;
+
+ ARRAY_TYPE(dsync_client) dsync_clients;
+
+ bool deinitializing:1;
+};
+
+static void replicator_brain_fill(struct replicator_brain *brain);
+
+static void replicator_brain_timeout(struct replicator_brain *brain)
+{
+ timeout_remove(&brain->to);
+ replicator_brain_fill(brain);
+}
+
+static void replicator_brain_queue_changed(void *context)
+{
+ struct replicator_brain *brain = context;
+
+ /* Delay a bit filling the replication. We could have gotten here
+ before the replicator_user change was fully filled out. */
+ timeout_remove(&brain->to);
+ brain->to = timeout_add_short(0, replicator_brain_timeout, brain);
+}
+
+struct replicator_brain *
+replicator_brain_init(struct replicator_queue *queue,
+ const struct replicator_settings *set)
+{
+ struct replicator_brain *brain;
+ pool_t pool;
+
+ pool = pool_alloconly_create("replication brain", 1024);
+ brain = p_new(pool, struct replicator_brain, 1);
+ brain->pool = pool;
+ brain->queue = queue;
+ brain->set = set;
+ p_array_init(&brain->dsync_clients, pool, 16);
+ replicator_queue_set_change_callback(queue,
+ replicator_brain_queue_changed, brain);
+ replicator_brain_fill(brain);
+ return brain;
+}
+
+void replicator_brain_deinit(struct replicator_brain **_brain)
+{
+ struct replicator_brain *brain = *_brain;
+ struct dsync_client *conn;
+
+ *_brain = NULL;
+
+ brain->deinitializing = TRUE;
+ array_foreach_elem(&brain->dsync_clients, conn)
+ dsync_client_deinit(&conn);
+ timeout_remove(&brain->to);
+ pool_unref(&brain->pool);
+}
+
+struct replicator_queue *
+replicator_brain_get_queue(struct replicator_brain *brain)
+{
+ return brain->queue;
+}
+
+const struct replicator_settings *
+replicator_brain_get_settings(struct replicator_brain *brain)
+{
+ return brain->set;
+}
+
+const ARRAY_TYPE(dsync_client) *
+replicator_brain_get_dsync_clients(struct replicator_brain *brain)
+{
+ return &brain->dsync_clients;
+}
+
+static struct dsync_client *
+get_dsync_client(struct replicator_brain *brain)
+{
+ struct dsync_client *conn;
+
+ array_foreach_elem(&brain->dsync_clients, conn) {
+ if (!dsync_client_is_busy(conn))
+ return conn;
+ }
+ if (array_count(&brain->dsync_clients) ==
+ brain->set->replication_max_conns)
+ return NULL;
+
+ conn = dsync_client_init(brain->set->doveadm_socket_path,
+ brain->set->replication_dsync_parameters);
+ array_push_back(&brain->dsync_clients, &conn);
+ return conn;
+}
+
+static void dsync_callback(enum dsync_reply reply, const char *state,
+ void *context)
+{
+ struct replicator_sync_context *ctx = context;
+ struct replicator_user *user = ctx->user;
+
+ if (!replicator_user_unref(&user)) {
+ /* user was already removed */
+ } else if (reply == DSYNC_REPLY_NOUSER ||
+ reply == DSYNC_REPLY_NOREPLICATE) {
+ /* user no longer exists, or is not wanted for replication,
+ remove from replication */
+ replicator_queue_remove(ctx->brain->queue, &ctx->user);
+ } else {
+ i_free(ctx->user->state);
+ ctx->user->state = i_strdup_empty(state);
+ ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK;
+ if (reply == DSYNC_REPLY_OK)
+ ctx->user->last_successful_sync = ioloop_time;
+ replicator_queue_push(ctx->brain->queue, ctx->user);
+ }
+ if (!ctx->brain->deinitializing)
+ replicator_brain_fill(ctx->brain);
+ i_free(ctx);
+}
+
+static bool
+dsync_replicate(struct replicator_brain *brain, struct replicator_user *user)
+{
+ struct replicator_sync_context *ctx;
+ struct dsync_client *conn;
+ time_t next_full_sync;
+ bool full;
+
+ conn = get_dsync_client(brain);
+ if (conn == NULL)
+ return FALSE;
+
+ next_full_sync = user->last_full_sync +
+ brain->set->replication_full_sync_interval;
+ full = next_full_sync <= ioloop_time;
+ /* update the sync times immediately. if the replication fails we still
+ wouldn't want it to be retried immediately. */
+ user->last_fast_sync = ioloop_time;
+ if (full || user->force_full_sync) {
+ user->last_full_sync = ioloop_time;
+ user->force_full_sync = FALSE;
+ }
+ /* reset priority also. if more updates arrive during replication
+ we'll do another replication to make sure nothing gets lost */
+ user->priority = REPLICATION_PRIORITY_NONE;
+
+ ctx = i_new(struct replicator_sync_context, 1);
+ ctx->brain = brain;
+ ctx->user = user;
+ replicator_user_ref(user);
+ dsync_client_sync(conn, user->username, user->state, full,
+ dsync_callback, ctx);
+ return TRUE;
+}
+
+static bool replicator_brain_fill_next(struct replicator_brain *brain)
+{
+ struct replicator_user *user;
+ unsigned int next_secs;
+
+ user = replicator_queue_pop(brain->queue, &next_secs);
+ if (user == NULL) {
+ /* nothing more to do */
+ timeout_remove(&brain->to);
+ brain->to = timeout_add(next_secs * 1000,
+ replicator_brain_timeout, brain);
+ return FALSE;
+ }
+
+ if (!dsync_replicate(brain, user)) {
+ /* all connections were full, put the user back to queue */
+ replicator_queue_push(brain->queue, user);
+ return FALSE;
+ }
+ /* replication started for the user */
+ return TRUE;
+}
+
+static void replicator_brain_fill(struct replicator_brain *brain)
+{
+ while (replicator_brain_fill_next(brain)) ;
+}