summaryrefslogtreecommitdiffstats
path: root/src/plugins/replication/replication-plugin.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/replication/replication-plugin.c')
-rw-r--r--src/plugins/replication/replication-plugin.c404
1 files changed, 404 insertions, 0 deletions
diff --git a/src/plugins/replication/replication-plugin.c b/src/plugins/replication/replication-plugin.c
new file mode 100644
index 0000000..9b4bb08
--- /dev/null
+++ b/src/plugins/replication/replication-plugin.c
@@ -0,0 +1,404 @@
+/* Copyright (c) 2013-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "str.h"
+#include "strescape.h"
+#include "ioloop.h"
+#include "net.h"
+#include "write-full.h"
+#include "mail-user.h"
+#include "mail-namespace.h"
+#include "mail-storage-private.h"
+#include "notify-plugin.h"
+#include "replication-common.h"
+#include "replication-plugin.h"
+
+
+#define REPLICATION_SOCKET_NAME "replication-notify"
+#define REPLICATION_FIFO_NAME "replication-notify-fifo"
+#define REPLICATION_NOTIFY_DELAY_MSECS 500
+#define REPLICATION_SYNC_TIMEOUT_SECS 10
+
+#define REPLICATION_USER_CONTEXT(obj) \
+ MODULE_CONTEXT(obj, replication_user_module)
+
+struct replication_user {
+ union mail_user_module_context module_ctx;
+
+ const char *socket_path;
+
+ struct timeout *to;
+ enum replication_priority priority;
+ unsigned int sync_secs;
+};
+
+struct replication_mail_txn_context {
+ struct mail_namespace *ns;
+ bool new_messages;
+ bool sync_trans;
+ char *reason;
+};
+
+static MODULE_CONTEXT_DEFINE_INIT(replication_user_module,
+ &mail_user_module_register);
+static int fifo_fd;
+static bool fifo_failed;
+static char *fifo_path;
+
+static int
+replication_fifo_notify(struct mail_user *user,
+ enum replication_priority priority)
+{
+ string_t *str;
+ ssize_t ret;
+
+ if (fifo_failed)
+ return -1;
+ if (fifo_fd == -1) {
+ fifo_fd = open(fifo_path, O_WRONLY | O_NONBLOCK);
+ if (fifo_fd == -1) {
+ i_error("open(%s) failed: %m", fifo_path);
+ fifo_failed = TRUE;
+ return -1;
+ }
+ }
+ /* <username> \t <priority> */
+ str = t_str_new(256);
+ str_append_tabescaped(str, user->username);
+ str_append_c(str, '\t');
+ switch (priority) {
+ case REPLICATION_PRIORITY_NONE:
+ case REPLICATION_PRIORITY_SYNC:
+ i_unreached();
+ case REPLICATION_PRIORITY_LOW:
+ str_append(str, "low");
+ break;
+ case REPLICATION_PRIORITY_HIGH:
+ str_append(str, "high");
+ break;
+ }
+ str_append_c(str, '\n');
+ ret = write(fifo_fd, str_data(str), str_len(str));
+ i_assert(ret != 0);
+ if (ret != (ssize_t)str_len(str)) {
+ if (ret > 0)
+ i_error("write(%s) wrote partial data", fifo_path);
+ else if (errno == EAGAIN) {
+ /* busy, try again later */
+ return 0;
+ } else if (errno != EPIPE) {
+ i_error("write(%s) failed: %m", fifo_path);
+ } else {
+ /* server was probably restarted, don't bother logging
+ this. */
+ }
+ if (close(fifo_fd) < 0)
+ i_error("close(%s) failed: %m", fifo_path);
+ fifo_fd = -1;
+ return -1;
+ }
+ return 1;
+}
+
+static void replication_notify_now(struct mail_user *user)
+{
+ struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+ int ret;
+
+ i_assert(ruser != NULL);
+ i_assert(ruser->priority != REPLICATION_PRIORITY_NONE);
+ i_assert(ruser->priority != REPLICATION_PRIORITY_SYNC);
+
+ if ((ret = replication_fifo_notify(user, ruser->priority)) < 0 &&
+ !fifo_failed) {
+ /* retry once, in case replication server was restarted */
+ ret = replication_fifo_notify(user, ruser->priority);
+ }
+ if (ret != 0) {
+ timeout_remove(&ruser->to);
+ ruser->priority = REPLICATION_PRIORITY_NONE;
+ }
+}
+
+static int replication_notify_sync(struct mail_user *user)
+{
+ struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+ string_t *str;
+ char buf[1024];
+ int fd;
+ ssize_t ret;
+ bool success = FALSE;
+
+ i_assert(ruser != NULL);
+
+ fd = net_connect_unix(ruser->socket_path);
+ if (fd == -1) {
+ i_error("net_connect_unix(%s) failed: %m", ruser->socket_path);
+ return -1;
+ }
+ net_set_nonblock(fd, FALSE);
+
+ /* <username> \t "sync" */
+ str = t_str_new(256);
+ str_append_tabescaped(str, user->username);
+ str_append(str, "\tsync\n");
+ alarm(ruser->sync_secs);
+ if (write_full(fd, str_data(str), str_len(str)) < 0) {
+ i_error("write(%s) failed: %m", ruser->socket_path);
+ } else {
+ /* + | - */
+ ret = read(fd, buf, sizeof(buf));
+ if (ret < 0) {
+ if (errno != EINTR) {
+ i_error("read(%s) failed: %m",
+ ruser->socket_path);
+ } else {
+ i_warning("replication(%s): Sync failure: "
+ "Timeout in %u secs",
+ user->username, ruser->sync_secs);
+ }
+ } else if (ret == 0) {
+ i_error("read(%s) failed: EOF", ruser->socket_path);
+ } else if (buf[0] == '+') {
+ /* success */
+ success = TRUE;
+ } else if (buf[0] == '-') {
+ /* failure */
+ if (buf[ret-1] == '\n') ret--;
+ i_warning("replication(%s): Sync failure: %s",
+ user->username, t_strndup(buf+1, ret-1));
+ i_warning("replication(%s): "
+ "Remote sent invalid input: %s",
+ user->username, t_strndup(buf, ret));
+ }
+ }
+ alarm(0);
+ if (close(fd) < 0)
+ i_error("close(%s) failed: %m", ruser->socket_path);
+ return success ? 0 : -1;
+}
+
+static void replication_notify(struct mail_namespace *ns,
+ enum replication_priority priority,
+ const char *event)
+{
+ struct replication_user *ruser;
+
+ ruser = REPLICATION_USER_CONTEXT(ns->user);
+ if (ruser == NULL)
+ return;
+
+ e_debug(ns->user->event,
+ "replication: Replication requested by '%s', priority=%d",
+ event, priority);
+
+ if (priority == REPLICATION_PRIORITY_SYNC) {
+ if (replication_notify_sync(ns->user) == 0) {
+ timeout_remove(&ruser->to);
+ ruser->priority = REPLICATION_PRIORITY_NONE;
+ return;
+ }
+ /* sync replication failed, try as "high" via fifo */
+ priority = REPLICATION_PRIORITY_HIGH;
+ }
+
+ if (ruser->priority < priority)
+ ruser->priority = priority;
+ if (ruser->to == NULL) {
+ ruser->to = timeout_add_short(REPLICATION_NOTIFY_DELAY_MSECS,
+ replication_notify_now, ns->user);
+ }
+}
+
+static void *
+replication_mail_transaction_begin(struct mailbox_transaction_context *t)
+{
+ struct replication_mail_txn_context *ctx;
+
+ ctx = i_new(struct replication_mail_txn_context, 1);
+ ctx->ns = mailbox_get_namespace(t->box);
+ ctx->reason = i_strdup(t->reason);
+ if ((t->flags & MAILBOX_TRANSACTION_FLAG_SYNC) != 0) {
+ /* Transaction is from dsync. Don't trigger replication back. */
+ ctx->sync_trans = TRUE;
+ }
+ return ctx;
+}
+
+static void replication_mail_save(void *txn, struct mail *mail ATTR_UNUSED)
+{
+ struct replication_mail_txn_context *ctx =
+ (struct replication_mail_txn_context *)txn;
+
+ ctx->new_messages = TRUE;
+}
+
+static void replication_mail_copy(void *txn, struct mail *src,
+ struct mail *dst)
+{
+ struct replication_mail_txn_context *ctx =
+ (struct replication_mail_txn_context *)txn;
+
+ if (src->box->storage != dst->box->storage) {
+ /* copy between storages, e.g. new mail delivery */
+ ctx->new_messages = TRUE;
+ } else {
+ /* copy within storage, which isn't as high priority since the
+ mail already exists. and especially copies to Trash or to
+ lazy-expunge namespace is pretty low priority. */
+ }
+}
+
+static bool
+replication_want_sync_changes(const struct mail_transaction_commit_changes *changes)
+{
+ /* Replication needs to be triggered on all the user-visible changes,
+ but not e.g. due to writes to cache file. */
+ return (changes->changes_mask &
+ ENUM_NEGATE(MAIL_INDEX_TRANSACTION_CHANGE_OTHERS)) != 0;
+}
+
+static void
+replication_mail_transaction_commit(void *txn,
+ struct mail_transaction_commit_changes *changes)
+{
+ struct replication_mail_txn_context *ctx =
+ (struct replication_mail_txn_context *)txn;
+ struct replication_user *ruser =
+ REPLICATION_USER_CONTEXT(ctx->ns->user);
+ enum replication_priority priority;
+
+ if (ruser != NULL && !ctx->sync_trans &&
+ (ctx->new_messages || replication_want_sync_changes(changes))) {
+ priority = !ctx->new_messages ? REPLICATION_PRIORITY_LOW :
+ ruser->sync_secs == 0 ? REPLICATION_PRIORITY_HIGH :
+ REPLICATION_PRIORITY_SYNC;
+ replication_notify(ctx->ns, priority, ctx->reason);
+ }
+ i_free(ctx->reason);
+ i_free(ctx);
+}
+
+static void replication_mailbox_create(struct mailbox *box)
+{
+ replication_notify(mailbox_get_namespace(box),
+ REPLICATION_PRIORITY_LOW, "mailbox create");
+}
+
+static void
+replication_mailbox_delete_commit(void *txn ATTR_UNUSED,
+ struct mailbox *box)
+{
+ replication_notify(mailbox_get_namespace(box),
+ REPLICATION_PRIORITY_LOW, "mailbox delete");
+}
+
+static void
+replication_mailbox_rename(struct mailbox *src ATTR_UNUSED,
+ struct mailbox *dest)
+{
+ replication_notify(mailbox_get_namespace(dest),
+ REPLICATION_PRIORITY_LOW, "mailbox rename");
+}
+
+static void replication_mailbox_set_subscribed(struct mailbox *box,
+ bool subscribed ATTR_UNUSED)
+{
+ replication_notify(mailbox_get_namespace(box),
+ REPLICATION_PRIORITY_LOW, "mailbox subscribe");
+}
+
+static void replication_user_deinit(struct mail_user *user)
+{
+ struct replication_user *ruser = REPLICATION_USER_CONTEXT(user);
+
+ i_assert(ruser != NULL);
+
+ if (ruser->to != NULL) {
+ replication_notify_now(user);
+ if (ruser->to != NULL) {
+ i_warning("%s: Couldn't send final notification "
+ "due to fifo being busy", fifo_path);
+ timeout_remove(&ruser->to);
+ }
+ }
+
+ ruser->module_ctx.super.deinit(user);
+}
+
+static void replication_user_created(struct mail_user *user)
+{
+ struct mail_user_vfuncs *v = user->vlast;
+ struct replication_user *ruser;
+ const char *value;
+
+ value = mail_user_plugin_getenv(user, "mail_replica");
+ if (value == NULL || value[0] == '\0') {
+ e_debug(user->event, "replication: No mail_replica setting - replication disabled");
+ return;
+ }
+
+ if (user->dsyncing) {
+ /* we're running dsync, which means that the remote is telling
+ us about a change. don't trigger a replication back to it */
+ e_debug(user->event, "replication: We're running dsync - replication disabled");
+ return;
+ }
+
+ ruser = p_new(user->pool, struct replication_user, 1);
+ ruser->module_ctx.super = *v;
+ user->vlast = &ruser->module_ctx.super;
+ v->deinit = replication_user_deinit;
+ MODULE_CONTEXT_SET(user, replication_user_module, ruser);
+
+ if (fifo_path == NULL) {
+ /* we'll assume that all users have the same base_dir.
+ they really should. */
+ fifo_path = i_strconcat(user->set->base_dir,
+ "/"REPLICATION_FIFO_NAME, NULL);
+ }
+ ruser->socket_path = p_strconcat(user->pool, user->set->base_dir,
+ "/"REPLICATION_SOCKET_NAME, NULL);
+ value = mail_user_plugin_getenv(user, "replication_sync_timeout");
+ if (value != NULL && str_to_uint(value, &ruser->sync_secs) < 0) {
+ i_error("replication(%s): "
+ "Invalid replication_sync_timeout value: %s",
+ user->username, value);
+ }
+}
+
+static const struct notify_vfuncs replication_vfuncs = {
+ .mail_transaction_begin = replication_mail_transaction_begin,
+ .mail_save = replication_mail_save,
+ .mail_copy = replication_mail_copy,
+ .mail_transaction_commit = replication_mail_transaction_commit,
+ .mailbox_create = replication_mailbox_create,
+ .mailbox_delete_commit = replication_mailbox_delete_commit,
+ .mailbox_rename = replication_mailbox_rename,
+ .mailbox_set_subscribed = replication_mailbox_set_subscribed
+};
+
+static struct notify_context *replication_ctx;
+
+static struct mail_storage_hooks replication_mail_storage_hooks = {
+ .mail_user_created = replication_user_created
+};
+
+void replication_plugin_init(struct module *module)
+{
+ fifo_fd = -1;
+ replication_ctx = notify_register(&replication_vfuncs);
+ mail_storage_hooks_add(module, &replication_mail_storage_hooks);
+}
+
+void replication_plugin_deinit(void)
+{
+ i_close_fd_path(&fifo_fd, fifo_path);
+ i_free_and_null(fifo_path);
+
+ mail_storage_hooks_remove(&replication_mail_storage_hooks);
+ notify_unregister(replication_ctx);
+}
+
+const char *replication_plugin_dependencies[] = { "notify", NULL };