diff options
Diffstat (limited to 'src/replication/replicator/replicator-brain.c')
-rw-r--r-- | src/replication/replicator/replicator-brain.c | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/src/replication/replicator/replicator-brain.c b/src/replication/replicator/replicator-brain.c new file mode 100644 index 0000000..65cfcec --- /dev/null +++ b/src/replication/replicator/replicator-brain.c @@ -0,0 +1,202 @@ +/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "array.h" +#include "ioloop.h" +#include "dsync-client.h" +#include "replicator-settings.h" +#include "replicator-queue.h" +#include "replicator-brain.h" + +struct replicator_sync_context { + struct replicator_brain *brain; + struct replicator_user *user; +}; + +struct replicator_brain { + pool_t pool; + struct replicator_queue *queue; + const struct replicator_settings *set; + struct timeout *to; + + ARRAY_TYPE(dsync_client) dsync_clients; + + bool deinitializing:1; +}; + +static void replicator_brain_fill(struct replicator_brain *brain); + +static void replicator_brain_timeout(struct replicator_brain *brain) +{ + timeout_remove(&brain->to); + replicator_brain_fill(brain); +} + +static void replicator_brain_queue_changed(void *context) +{ + struct replicator_brain *brain = context; + + /* Delay a bit filling the replication. We could have gotten here + before the replicator_user change was fully filled out. */ + timeout_remove(&brain->to); + brain->to = timeout_add_short(0, replicator_brain_timeout, brain); +} + +struct replicator_brain * +replicator_brain_init(struct replicator_queue *queue, + const struct replicator_settings *set) +{ + struct replicator_brain *brain; + pool_t pool; + + pool = pool_alloconly_create("replication brain", 1024); + brain = p_new(pool, struct replicator_brain, 1); + brain->pool = pool; + brain->queue = queue; + brain->set = set; + p_array_init(&brain->dsync_clients, pool, 16); + replicator_queue_set_change_callback(queue, + replicator_brain_queue_changed, brain); + replicator_brain_fill(brain); + return brain; +} + +void replicator_brain_deinit(struct replicator_brain **_brain) +{ + struct replicator_brain *brain = *_brain; + struct dsync_client *conn; + + *_brain = NULL; + + brain->deinitializing = TRUE; + array_foreach_elem(&brain->dsync_clients, conn) + dsync_client_deinit(&conn); + timeout_remove(&brain->to); + pool_unref(&brain->pool); +} + +struct replicator_queue * +replicator_brain_get_queue(struct replicator_brain *brain) +{ + return brain->queue; +} + +const struct replicator_settings * +replicator_brain_get_settings(struct replicator_brain *brain) +{ + return brain->set; +} + +const ARRAY_TYPE(dsync_client) * +replicator_brain_get_dsync_clients(struct replicator_brain *brain) +{ + return &brain->dsync_clients; +} + +static struct dsync_client * +get_dsync_client(struct replicator_brain *brain) +{ + struct dsync_client *conn; + + array_foreach_elem(&brain->dsync_clients, conn) { + if (!dsync_client_is_busy(conn)) + return conn; + } + if (array_count(&brain->dsync_clients) == + brain->set->replication_max_conns) + return NULL; + + conn = dsync_client_init(brain->set->doveadm_socket_path, + brain->set->replication_dsync_parameters); + array_push_back(&brain->dsync_clients, &conn); + return conn; +} + +static void dsync_callback(enum dsync_reply reply, const char *state, + void *context) +{ + struct replicator_sync_context *ctx = context; + struct replicator_user *user = ctx->user; + + if (!replicator_user_unref(&user)) { + /* user was already removed */ + } else if (reply == DSYNC_REPLY_NOUSER || + reply == DSYNC_REPLY_NOREPLICATE) { + /* user no longer exists, or is not wanted for replication, + remove from replication */ + replicator_queue_remove(ctx->brain->queue, &ctx->user); + } else { + i_free(ctx->user->state); + ctx->user->state = i_strdup_empty(state); + ctx->user->last_sync_failed = reply != DSYNC_REPLY_OK; + if (reply == DSYNC_REPLY_OK) + ctx->user->last_successful_sync = ioloop_time; + replicator_queue_push(ctx->brain->queue, ctx->user); + } + if (!ctx->brain->deinitializing) + replicator_brain_fill(ctx->brain); + i_free(ctx); +} + +static bool +dsync_replicate(struct replicator_brain *brain, struct replicator_user *user) +{ + struct replicator_sync_context *ctx; + struct dsync_client *conn; + time_t next_full_sync; + bool full; + + conn = get_dsync_client(brain); + if (conn == NULL) + return FALSE; + + next_full_sync = user->last_full_sync + + brain->set->replication_full_sync_interval; + full = next_full_sync <= ioloop_time; + /* update the sync times immediately. if the replication fails we still + wouldn't want it to be retried immediately. */ + user->last_fast_sync = ioloop_time; + if (full || user->force_full_sync) { + user->last_full_sync = ioloop_time; + user->force_full_sync = FALSE; + } + /* reset priority also. if more updates arrive during replication + we'll do another replication to make sure nothing gets lost */ + user->priority = REPLICATION_PRIORITY_NONE; + + ctx = i_new(struct replicator_sync_context, 1); + ctx->brain = brain; + ctx->user = user; + replicator_user_ref(user); + dsync_client_sync(conn, user->username, user->state, full, + dsync_callback, ctx); + return TRUE; +} + +static bool replicator_brain_fill_next(struct replicator_brain *brain) +{ + struct replicator_user *user; + unsigned int next_secs; + + user = replicator_queue_pop(brain->queue, &next_secs); + if (user == NULL) { + /* nothing more to do */ + timeout_remove(&brain->to); + brain->to = timeout_add(next_secs * 1000, + replicator_brain_timeout, brain); + return FALSE; + } + + if (!dsync_replicate(brain, user)) { + /* all connections were full, put the user back to queue */ + replicator_queue_push(brain->queue, user); + return FALSE; + } + /* replication started for the user */ + return TRUE; +} + +static void replicator_brain_fill(struct replicator_brain *brain) +{ + while (replicator_brain_fill_next(brain)) ; +} |