summaryrefslogtreecommitdiffstats
path: root/src/director/doveadm-connection.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/director/doveadm-connection.c')
-rw-r--r--src/director/doveadm-connection.c1196
1 files changed, 1196 insertions, 0 deletions
diff --git a/src/director/doveadm-connection.c b/src/director/doveadm-connection.c
new file mode 100644
index 0000000..c840536
--- /dev/null
+++ b/src/director/doveadm-connection.c
@@ -0,0 +1,1196 @@
+/* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "ioloop.h"
+#include "net.h"
+#include "istream.h"
+#include "ostream.h"
+#include "array.h"
+#include "str.h"
+#include "strescape.h"
+#include "llist.h"
+#include "time-util.h"
+#include "master-service.h"
+#include "user-directory.h"
+#include "mail-host.h"
+#include "director.h"
+#include "director-host.h"
+#include "director-request.h"
+#include "director-connection.h"
+#include "doveadm-connection.h"
+
+#include <unistd.h>
+
+#define DOVEADM_PROTOCOL_VERSION_MAJOR 1
+#define DOVEADM_HANDSHAKE "VERSION\tdirector-doveadm\t1\t0\n"
+
+#define MAX_VALID_VHOST_COUNT 1000
+
+#define DOVEADM_CONNECTION_RING_SYNC_TIMEOUT_MSECS (30*1000)
+
+enum doveadm_director_cmd_ret {
+ DOVEADM_DIRECTOR_CMD_RET_FAIL = -1,
+ DOVEADM_DIRECTOR_CMD_RET_UNFINISHED = 0,
+ DOVEADM_DIRECTOR_CMD_RET_OK = 1,
+ DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK,
+};
+
+enum doveadm_director_cmd_flag {
+ DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC = 0x01,
+};
+
+typedef void
+doveadm_connection_ring_sync_callback_t(struct doveadm_connection *);
+
+struct director_reset_cmd {
+ struct director_reset_cmd *prev, *next;
+
+ struct director *dir;
+ struct doveadm_connection *_conn;
+ struct timeval start_time;
+
+ struct director_user_iter *iter;
+ unsigned int host_start_idx, host_idx, hosts_count;
+ unsigned int max_moving_users;
+ unsigned int reset_count;
+ bool users_killed;
+};
+
+struct director_kick_cmd {
+ struct director_kick_cmd *prev, *next;
+
+ struct doveadm_connection *_conn;
+ struct director *dir;
+ char *mask, *field, *value;
+ bool alt:1;
+};
+
+struct doveadm_connection {
+ struct doveadm_connection *prev, *next;
+
+ int fd;
+ struct io *io;
+ struct istream *input;
+ struct ostream *output;
+ struct director *dir;
+
+ struct timeout *to_ring_sync_abort;
+ struct director_reset_cmd *reset_cmd;
+ struct director_kick_cmd *kick_cmd;
+ doveadm_connection_ring_sync_callback_t *ring_sync_callback;
+
+ const char **cmd_pending_args;
+ unsigned int cmd_pending_idx;
+
+ bool handshaked:1;
+};
+
+static struct doveadm_connection *doveadm_connections;
+static struct doveadm_connection *doveadm_ring_sync_pending_connections;
+static struct director_reset_cmd *reset_cmds = NULL;
+static struct director_kick_cmd *kick_cmds = NULL;
+
+static void doveadm_connection_set_io(struct doveadm_connection *conn);
+static void doveadm_connection_deinit(struct doveadm_connection **_conn);
+static void
+doveadm_connection_ring_sync_list_move(struct doveadm_connection *conn);
+static void doveadm_connection_cmd_run_synced(struct doveadm_connection *conn);
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_list(struct doveadm_connection *conn,
+ const char *const *args ATTR_UNUSED)
+{
+ struct mail_host *host;
+ string_t *str = t_str_new(1024);
+
+ array_foreach_elem(mail_hosts_get(conn->dir->mail_hosts), host) {
+ str_printfa(str, "%s\t%u\t%u\t",
+ host->ip_str, host->vhost_count,
+ host->user_count);
+ str_append_tabescaped(str, mail_host_get_tag(host));
+ str_printfa(str, "\t%c\t%ld", host->down ? 'D' : 'U',
+ (long)host->last_updown_change);
+ str_append_c(str, '\n');
+ }
+ str_append_c(str, '\n');
+ o_stream_nsend(conn->output, str_data(str), str_len(str));
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_list_removed(struct doveadm_connection *conn,
+ const char *const *args ATTR_UNUSED)
+{
+ struct mail_host_list *orig_hosts_list;
+ struct mail_host *const *orig_hosts, *const *cur_hosts;
+ unsigned int i, j, orig_hosts_count, cur_hosts_count;
+ string_t *str = t_str_new(1024);
+ int ret;
+
+ orig_hosts_list = mail_hosts_init(conn->dir,
+ conn->dir->set->director_user_expire,
+ NULL);
+ (void)mail_hosts_parse_and_add(orig_hosts_list,
+ conn->dir->set->director_mail_servers);
+
+ orig_hosts = array_get(mail_hosts_get(orig_hosts_list),
+ &orig_hosts_count);
+ cur_hosts = array_get(mail_hosts_get(conn->dir->mail_hosts),
+ &cur_hosts_count);
+
+ /* the hosts are sorted by IP */
+ for (i = j = 0; i < orig_hosts_count && j < cur_hosts_count; ) {
+ ret = net_ip_cmp(&orig_hosts[i]->ip, &cur_hosts[j]->ip);
+ if (ret == 0)
+ i++, j++;
+ else if (ret > 0)
+ j++;
+ else {
+ str_printfa(str, "%s\n", orig_hosts[i]->ip_str);
+ i++;
+ }
+ }
+ for (; i < orig_hosts_count; i++)
+ str_printfa(str, "%s\n", orig_hosts[i]->ip_str);
+ str_append_c(str, '\n');
+ o_stream_nsend(conn->output, str_data(str), str_len(str));
+
+ mail_hosts_deinit(&orig_hosts_list);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static void
+doveadm_director_host_append_status(const struct director_host *host,
+ const char *type, string_t *str)
+{
+ time_t last_failed = I_MAX(host->last_network_failure,
+ host->last_protocol_failure);
+ str_printfa(str, "%s\t%u\t%s\t%"PRIdTIME_T"\t",
+ host->ip_str, host->port, type,
+ last_failed);
+}
+
+static void doveadm_director_append_status(struct director *dir, string_t *str)
+{
+ if (!dir->ring_handshaked)
+ str_append(str, "ring handshaking");
+ else if (dir->ring_synced)
+ str_append(str, "ring synced");
+ else {
+ str_printfa(str, "ring syncing - last sync %d secs ago",
+ (int)(ioloop_time - dir->ring_last_sync_time));
+ }
+ str_printfa(str, "\t%u", dir->last_sync_msecs);
+}
+
+static void
+doveadm_director_connection_append_status(struct director_connection *conn,
+ string_t *str)
+{
+ struct director_connection_status status;
+
+ director_connection_get_status(conn, &status);
+ if (!director_connection_is_handshaked(conn)) {
+ str_append(str, "handshaking - ");
+ if (director_connection_is_incoming(conn))
+ str_printfa(str, "%u USERs received", status.handshake_users_received);
+ else
+ str_printfa(str, "%u USERs sent", status.handshake_users_sent);
+ } else if (director_connection_is_synced(conn))
+ str_append(str, "synced");
+ else
+ str_append(str, "syncing");
+
+ str_printfa(str, "\t%u\t%"PRIuUOFF_T"\t%"PRIuUOFF_T"\t%zu\t%zu\t"
+ "%"PRIdTIME_T"\t%"PRIdTIME_T, status.last_ping_msecs,
+ status.bytes_read, status.bytes_sent,
+ status.bytes_buffered, status.peak_bytes_buffered,
+ status.last_input.tv_sec, status.last_output.tv_sec);
+}
+
+static void
+doveadm_director_connection_append(struct director *dir,
+ struct director_connection *conn,
+ const struct director_host *host,
+ string_t *str)
+{
+ const char *type;
+
+ if (conn == dir->left)
+ type = "left";
+ else if (conn == dir->right)
+ type = "right";
+ else if (director_connection_is_incoming(conn))
+ type = "in";
+ else
+ type = "out";
+
+ if (host != NULL)
+ doveadm_director_host_append_status(host, type, str);
+ doveadm_director_connection_append_status(conn, str);
+ str_append_c(str, '\n');
+}
+
+static void
+doveadm_director_host_append(struct director *dir,
+ const struct director_host *host, string_t *str)
+{
+ const char *type;
+
+ if (host->removed)
+ type = "removed";
+ else if (dir->self_host == host)
+ type = "self";
+ else
+ type = "";
+
+ doveadm_director_host_append_status(host, type, str);
+ if (dir->self_host == host)
+ doveadm_director_append_status(dir, str);
+ str_append_c(str, '\n');
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_director_list(struct doveadm_connection *conn,
+ const char *const *args ATTR_UNUSED)
+{
+ struct director *dir = conn->dir;
+ struct director_host *host;
+ string_t *str = t_str_new(1024);
+ struct director_connection *dir_conn;
+ ARRAY(struct director_host *) hosts;
+
+ t_array_init(&hosts, array_count(&dir->dir_hosts));
+ array_append_array(&hosts, &dir->dir_hosts);
+ array_sort(&hosts, director_host_cmp_p);
+
+ /* first show incoming connections that have no known host yet */
+ array_foreach_elem(&dir->connections, dir_conn) {
+ if (director_connection_get_host(dir_conn) == NULL)
+ doveadm_director_connection_append(dir, dir_conn, NULL, str);
+ }
+
+ /* show other connections and host without connections sorted by host */
+ array_foreach_elem(&hosts, host) {
+ bool have_connections = FALSE;
+
+ array_foreach_elem(&dir->connections, dir_conn) {
+ const struct director_host *conn_host =
+ director_connection_get_host(dir_conn);
+ if (conn_host != host)
+ continue;
+ have_connections = TRUE;
+ doveadm_director_connection_append(dir, dir_conn, host, str);
+ }
+ if (!have_connections)
+ doveadm_director_host_append(dir, host, str);
+ }
+
+ str_append_c(str, '\n');
+ o_stream_nsend(conn->output, str_data(str), str_len(str));
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_director_add(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ struct director_host *host;
+ struct ip_addr ip;
+ in_port_t port = conn->dir->self_port;
+
+ if (args[0] == NULL ||
+ net_addr2ip(args[0], &ip) < 0 ||
+ (args[1] != NULL && net_str2port(args[1], &port) < 0)) {
+ e_error(conn->dir->event, "doveadm sent invalid DIRECTOR-ADD parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ if (director_host_lookup(conn->dir, &ip, port) == NULL) {
+ host = director_host_add(conn->dir, &ip, port);
+ director_notify_ring_added(host, conn->dir->self_host, TRUE);
+ }
+ o_stream_nsend(conn->output, "OK\n", 3);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_director_remove(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ struct director_host *host;
+ struct ip_addr ip;
+ in_port_t port = 0;
+
+ if (args[0] == NULL ||
+ net_addr2ip(args[0], &ip) < 0 ||
+ (args[1] != NULL && net_str2port(args[1], &port) < 0)) {
+ e_error(conn->dir->event, "doveadm sent invalid DIRECTOR-REMOVE parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ host = port != 0 ?
+ director_host_lookup(conn->dir, &ip, port) :
+ director_host_lookup_ip(conn->dir, &ip);
+ if (host == NULL) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else {
+ director_ring_remove(host, conn->dir->self_host);
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+ }
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_set_or_update(struct doveadm_connection *conn,
+ const char *const *args, bool update)
+{
+ struct director *dir = conn->dir;
+ const char *ip_str, *tag = "";
+ struct mail_host *host;
+ struct ip_addr ip;
+ unsigned int vhost_count = UINT_MAX;
+
+ ip_str = args[0];
+ if (ip_str != NULL) {
+ tag = strchr(ip_str, '@');
+ if (tag == NULL)
+ tag = "";
+ else
+ ip_str = t_strdup_until(ip_str, tag++);
+ }
+ if (ip_str == NULL || net_addr2ip(ip_str, &ip) < 0 ||
+ (args[1] != NULL && str_to_uint(args[1], &vhost_count) < 0) ||
+ (args[1] == NULL && update)) {
+ e_error(conn->dir->event, "doveadm sent invalid %s parameters",
+ update ? "HOST-UPDATE" : "HOST-SET");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ if (vhost_count > MAX_VALID_VHOST_COUNT && vhost_count != UINT_MAX) {
+ o_stream_nsend_str(conn->output, "vhost count too large\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ host = mail_host_lookup(dir->mail_hosts, &ip);
+ if (host == NULL) {
+ if (update) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ host = mail_host_add_ip(dir->mail_hosts, &ip, tag);
+ } else if (tag[0] != '\0' && strcmp(mail_host_get_tag(host), tag) != 0) {
+ o_stream_nsend_str(conn->output, "host tag can't be changed\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else if (host->desynced) {
+ o_stream_nsend_str(conn->output,
+ "host is already being updated - try again later\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ if (vhost_count != UINT_MAX)
+ mail_host_set_vhost_count(host, vhost_count, "doveadm: ");
+ /* NOTE: we don't support changing a tag for an existing host.
+ it needs to be removed first. otherwise it would be a bit ugly to
+ handle. */
+ director_update_host(dir, dir->self_host, NULL, host);
+
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_set(struct doveadm_connection *conn, const char *const *args)
+{
+ return doveadm_cmd_host_set_or_update(conn, args, FALSE);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_update(struct doveadm_connection *conn, const char *const *args)
+{
+ return doveadm_cmd_host_set_or_update(conn, args, TRUE);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_updown(struct doveadm_connection *conn, bool down,
+ const char *const *args)
+{
+ struct mail_host *host;
+ struct ip_addr ip;
+
+ if (args[0] == NULL || net_addr2ip(args[0], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid %s parameters: %s",
+ down ? "HOST-DOWN" : "HOST-UP",
+ args[0] == NULL ? "" : args[0]);
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+ if (host == NULL) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ if (host->down == down) {
+ o_stream_nsend_str(conn->output, "OK\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else if (host->desynced) {
+ o_stream_nsend_str(conn->output,
+ "host is already being updated - try again later\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else {
+ mail_host_set_down(host, down, ioloop_time, "doveadm: ");
+ director_update_host(conn->dir, conn->dir->self_host,
+ NULL, host);
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+ }
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_up(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ return doveadm_cmd_host_updown(conn, FALSE, args);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_down(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ return doveadm_cmd_host_updown(conn, TRUE, args);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_remove(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ struct mail_host *host;
+ struct ip_addr ip;
+
+ if (args[0] == NULL || net_addr2ip(args[0], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid HOST-REMOVE parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+ if (host == NULL) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else {
+ director_remove_host(conn->dir, conn->dir->self_host,
+ NULL, host);
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+ }
+}
+
+static void
+doveadm_cmd_host_flush_all(struct doveadm_connection *conn)
+{
+ struct mail_host *host;
+ unsigned int total_user_count = 0;
+
+ array_foreach_elem(mail_hosts_get(conn->dir->mail_hosts), host) {
+ total_user_count += host->user_count;
+ director_flush_host(conn->dir, conn->dir->self_host,
+ NULL, host);
+ }
+ e_warning(conn->dir->event,
+ "Flushed all backend hosts with %u users. This is an unsafe "
+ "operation and may cause the same users to end up in multiple backends.",
+ total_user_count);
+ o_stream_nsend(conn->output, "OK\n", 3);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_flush(struct doveadm_connection *conn, const char *const *args)
+{
+ struct mail_host *host;
+ struct ip_addr ip;
+
+ if (args[0] == NULL || args[0][0] == '\0') {
+ doveadm_cmd_host_flush_all(conn);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+
+ if (net_addr2ip(args[0], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid HOST-FLUSH parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+ if (host == NULL) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ } else {
+ director_flush_host(conn->dir, conn->dir->self_host,
+ NULL, host);
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+ }
+}
+
+static void doveadm_reset_cmd_free(struct director_reset_cmd *cmd)
+{
+ DLLIST_REMOVE(&reset_cmds, cmd);
+
+ if (cmd->iter != NULL)
+ director_iterate_users_deinit(&cmd->iter);
+ if (cmd->_conn != NULL)
+ cmd->_conn->reset_cmd = NULL;
+ i_free(cmd);
+}
+
+static bool
+director_host_reset_users(struct director_reset_cmd *cmd,
+ struct mail_host *host)
+{
+ struct director *dir = cmd->dir;
+ struct user *user;
+ struct mail_host *new_host;
+
+ if (dir->users_moving_count >= cmd->max_moving_users)
+ return FALSE;
+
+ if (dir->right != NULL)
+ director_connection_cork(dir->right);
+
+ if (cmd->iter == NULL) {
+ cmd->iter = director_iterate_users_init(dir, FALSE);
+ cmd->users_killed = FALSE;
+ }
+
+ while ((user = director_iterate_users_next(cmd->iter)) != NULL) {
+ if (user->host != host)
+ continue;
+
+ new_host = mail_host_get_by_hash(dir->mail_hosts,
+ user->username_hash,
+ mail_host_get_tag(host));
+ if (new_host != host) T_BEGIN {
+ if (new_host != NULL) {
+ director_move_user(dir, dir->self_host, NULL,
+ user->username_hash, new_host);
+ } else {
+ /* there are no more available backends.
+ kick the user instead. */
+ director_kill_user(dir, dir->self_host, user,
+ user->host->tag, user->host,
+ TRUE);
+ cmd->users_killed = TRUE;
+ }
+ cmd->reset_count++;
+ } T_END;
+ if (dir->users_moving_count >= cmd->max_moving_users)
+ break;
+ }
+ if (user == NULL) {
+ int msecs = timeval_diff_msecs(&ioloop_timeval, &cmd->start_time);
+ e_info(dir->event,
+ "Moved %u users in %u hosts in %u.%03u secs (max parallel=%u)",
+ cmd->reset_count, cmd->hosts_count - cmd->host_start_idx,
+ msecs / 1000, msecs % 1000, cmd->max_moving_users);
+ director_iterate_users_deinit(&cmd->iter);
+ if (cmd->users_killed) {
+ /* no more backends. we already sent kills. now remove
+ the users entirely from the host. */
+ director_flush_host(dir, dir->self_host, NULL, host);
+ }
+ }
+ if (dir->right != NULL)
+ director_connection_uncork(dir->right);
+ return user == NULL;
+}
+
+static bool
+director_reset_cmd_run(struct director_reset_cmd *cmd)
+{
+ struct mail_host *const *hosts;
+ unsigned int count;
+
+ hosts = array_get(mail_hosts_get(cmd->dir->mail_hosts), &count);
+ if (count > cmd->hosts_count)
+ count = cmd->hosts_count;
+ while (cmd->host_idx < count) {
+ if (!director_host_reset_users(cmd, hosts[cmd->host_idx]))
+ return FALSE;
+ cmd->host_idx++;
+ }
+ if (cmd->_conn != NULL) {
+ struct doveadm_connection *conn = cmd->_conn;
+
+ o_stream_nsend(conn->output, "OK\n", 3);
+ if (conn->io == NULL)
+ doveadm_connection_set_io(conn);
+ }
+ doveadm_reset_cmd_free(cmd);
+ return TRUE;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_host_reset_users(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ struct director_reset_cmd *cmd;
+ struct ip_addr ip;
+ struct mail_host *const *hosts;
+ unsigned int i = 0, count;
+ unsigned int max_moving_users =
+ conn->dir->set->director_max_parallel_moves;
+
+ if (args[0] != NULL && args[1] != NULL &&
+ (str_to_uint(args[1], &max_moving_users) < 0 ||
+ max_moving_users == 0)) {
+ e_error(conn->dir->event, "doveadm sent invalid HOST-RESET-USERS parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ hosts = array_get(mail_hosts_get(conn->dir->mail_hosts), &count);
+ if (args[0] != NULL && args[0][0] != '\0') {
+ if (net_addr2ip(args[0], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid HOST-RESET-USERS ip: %s",
+ args[0]);
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ for (i = 0; i < count; i++) {
+ if (net_ip_compare(&hosts[i]->ip, &ip))
+ break;
+ }
+ if (i == count) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ count = i+1;
+ }
+
+ conn->reset_cmd = cmd = i_new(struct director_reset_cmd, 1);
+ cmd->dir = conn->dir;
+ cmd->_conn = conn;
+ cmd->max_moving_users = max_moving_users;
+ cmd->host_start_idx = i;
+ cmd->host_idx = i;
+ cmd->hosts_count = count;
+ cmd->start_time = ioloop_timeval;
+ DLLIST_PREPEND(&reset_cmds, cmd);
+
+ if (!director_reset_cmd_run(cmd)) {
+ /* we still have work to do. don't handle any more doveadm
+ input until we're finished. */
+ io_remove(&conn->io);
+ return DOVEADM_DIRECTOR_CMD_RET_UNFINISHED;
+ }
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_user_lookup(struct doveadm_connection *conn,
+ const char *const *args)
+{
+ struct user *user;
+ struct mail_host *host;
+ const char *username, *tag;
+ unsigned int username_hash;
+ struct mail_tag *mail_tag;
+ string_t *str = t_str_new(256);
+
+ if (args[0] == NULL) {
+ username = "";
+ tag = "";
+ } else {
+ username = args[0];
+ tag = args[1] != NULL ? args[1] : "";
+ }
+ if (str_to_uint(username, &username_hash) < 0) {
+ if (!director_get_username_hash(conn->dir,
+ username, &username_hash)) {
+ o_stream_nsend_str(conn->output, "TRYAGAIN\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ }
+
+ /* get user's current host */
+ mail_tag = mail_tag_find(conn->dir->mail_hosts, tag);
+ user = mail_tag == NULL ? NULL :
+ user_directory_lookup(mail_tag->users, username_hash);
+ if (user == NULL)
+ str_append(str, "\t0");
+ else {
+ str_printfa(str, "%s\t%u", user->host->ip_str,
+ user->timestamp +
+ conn->dir->set->director_user_expire);
+ }
+
+ /* get host if it wasn't in user directory */
+ host = mail_host_get_by_hash(conn->dir->mail_hosts, username_hash, tag);
+ if (host == NULL)
+ str_append(str, "\t");
+ else
+ str_printfa(str, "\t%s", host->ip_str);
+
+ /* get host with default configuration */
+ host = mail_host_get_by_hash(conn->dir->orig_config_hosts,
+ username_hash, tag);
+ if (host == NULL)
+ str_append(str, "\t\n");
+ else
+ str_printfa(str, "\t%s\n", host->ip_str);
+ o_stream_nsend(conn->output, str_data(str), str_len(str));
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_user_list(struct doveadm_connection *conn, const char *const *args)
+{
+ struct director_user_iter *iter;
+ struct user *user;
+ struct ip_addr ip;
+
+ if (args[0] != NULL && args[0][0] != '\0') {
+ if (net_addr2ip(args[0], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid USER-LIST parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ } else {
+ ip.family = 0;
+ }
+
+ iter = director_iterate_users_init(conn->dir, FALSE);
+ while ((user = director_iterate_users_next(iter)) != NULL) {
+ if (ip.family == 0 ||
+ net_ip_compare(&ip, &user->host->ip)) T_BEGIN {
+ unsigned int expire_time = user->timestamp +
+ conn->dir->set->director_user_expire;
+
+ o_stream_nsend_str(conn->output, t_strdup_printf(
+ "%u\t%u\t%s\n",
+ user->username_hash, expire_time,
+ user->host->ip_str));
+ } T_END;
+ }
+ director_iterate_users_deinit(&iter);
+ o_stream_nsend(conn->output, "\n", 1);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_user_move(struct doveadm_connection *conn, const char *const *args)
+{
+ unsigned int username_hash;
+ struct user *user;
+ struct mail_host *host;
+ struct ip_addr ip;
+
+ if (args[0] == NULL || args[1] == NULL ||
+ net_addr2ip(args[1], &ip) < 0) {
+ e_error(conn->dir->event, "doveadm sent invalid USER-MOVE parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ host = mail_host_lookup(conn->dir->mail_hosts, &ip);
+ if (host == NULL) {
+ o_stream_nsend_str(conn->output, "NOTFOUND\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+
+ if (str_to_uint(args[0], &username_hash) < 0) {
+ if (!director_get_username_hash(conn->dir,
+ args[0], &username_hash)) {
+ o_stream_nsend_str(conn->output, "TRYAGAIN\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ }
+
+ user = user_directory_lookup(host->tag->users, username_hash);
+ if (user != NULL && USER_IS_BEING_KILLED(user)) {
+ o_stream_nsend_str(conn->output, "TRYAGAIN\n");
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+
+ if (user == NULL || user->host != host) {
+ director_move_user(conn->dir, conn->dir->self_host, NULL,
+ username_hash, host);
+ } else {
+ /* already the correct host. reset the user's timeout. */
+ user_directory_refresh(host->tag->users, user);
+ director_update_user(conn->dir, conn->dir->self_host, user);
+ }
+ o_stream_nsend(conn->output, "OK\n", 3);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static void doveadm_kick_cmd_free(struct director_kick_cmd **_cmd)
+{
+ struct director_kick_cmd *cmd = *_cmd;
+ *_cmd = NULL;
+
+ if (cmd->_conn != NULL)
+ cmd->_conn->kick_cmd = NULL;
+
+ i_free(cmd->field);
+ i_free(cmd->value);
+ i_free(cmd->mask);
+ i_free(cmd);
+}
+
+static bool doveadm_cmd_user_kick_run(struct director_kick_cmd *cmd)
+{
+ if (cmd->dir->users_kicking_count >=
+ cmd->dir->set->director_max_parallel_kicks)
+ return FALSE;
+
+ if (cmd->alt)
+ director_kick_user_alt(cmd->dir, cmd->dir->self_host,
+ NULL, cmd->field, cmd->value);
+ else
+ director_kick_user(cmd->dir, cmd->dir->self_host,
+ NULL, cmd->mask);
+ if (cmd->_conn != NULL) {
+ struct doveadm_connection *conn = cmd->_conn;
+
+ o_stream_nsend(conn->output, "OK\n", 3);
+ if (conn->io == NULL)
+ doveadm_connection_set_io(conn);
+ }
+ DLLIST_REMOVE(&kick_cmds, cmd);
+ doveadm_kick_cmd_free(&cmd);
+ return TRUE;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_user_kick(struct doveadm_connection *conn, const char *const *args)
+{
+ struct director_kick_cmd *cmd;
+ bool wait = TRUE;
+
+ if (args[0] == NULL) {
+ e_error(conn->dir->event, "doveadm sent invalid USER-KICK parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ if (null_strcmp(args[1], "nowait") == 0)
+ wait = FALSE;
+
+ cmd = conn->kick_cmd = i_new(struct director_kick_cmd, 1);
+ cmd->alt = FALSE;
+ cmd->mask = i_strdup(args[0]);
+ cmd->dir = conn->dir;
+ cmd->_conn = conn;
+
+ DLLIST_PREPEND(&kick_cmds, cmd);
+
+ if (!doveadm_cmd_user_kick_run(cmd)) {
+ if (wait) {
+ /* we have work to do, wait until it finishes */
+ io_remove(&conn->io);
+ return DOVEADM_DIRECTOR_CMD_RET_UNFINISHED;
+ } else {
+ o_stream_nsend_str(conn->output, "TRYAGAIN\n");
+ /* need to remove it here */
+ DLLIST_REMOVE(&kick_cmds, cmd);
+ doveadm_kick_cmd_free(&cmd);
+ }
+ }
+
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_cmd_user_kick_alt(struct doveadm_connection *conn, const char *const *args)
+{
+ bool wait = TRUE;
+ struct director_kick_cmd *cmd;
+
+ if (str_array_length(args) < 2) {
+ e_error(conn->dir->event, "doveadm sent invalid USER-KICK-ALT parameters");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+
+ if (null_strcmp(args[2], "nowait") == 0)
+ wait = FALSE;
+
+ conn->kick_cmd = cmd = i_new(struct director_kick_cmd, 1);
+ cmd->alt = TRUE;
+ cmd->field = i_strdup(args[0]);
+ cmd->value = i_strdup(args[1]);
+ cmd->dir = conn->dir;
+ cmd->_conn = conn;
+
+ DLLIST_PREPEND(&kick_cmds, cmd);
+
+ if (!doveadm_cmd_user_kick_run(cmd)) {
+ if (wait) {
+ /* we have work to do, wait until it finishes */
+ io_remove(&conn->io);
+ return DOVEADM_DIRECTOR_CMD_RET_UNFINISHED;
+ } else {
+ o_stream_nsend_str(conn->output, "TRYAGAIN\n");
+ DLLIST_REMOVE(&kick_cmds, cmd);
+ doveadm_kick_cmd_free(&cmd);
+ }
+ }
+
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+}
+
+struct {
+ const char *name;
+ enum doveadm_director_cmd_ret (*cmd)
+ (struct doveadm_connection *conn, const char *const *args);
+ enum doveadm_director_cmd_flag flags;
+} doveadm_director_commands[] = {
+ { "HOST-LIST", doveadm_cmd_host_list, 0 },
+ { "HOST-LIST-REMOVED", doveadm_cmd_host_list_removed, 0 },
+ { "DIRECTOR-LIST", doveadm_cmd_director_list, 0 },
+ { "DIRECTOR-ADD", doveadm_cmd_director_add, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "DIRECTOR-REMOVE", doveadm_cmd_director_remove, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-SET", doveadm_cmd_host_set, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-UPDATE", doveadm_cmd_host_update, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-UP", doveadm_cmd_host_up, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-DOWN", doveadm_cmd_host_down, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-REMOVE", doveadm_cmd_host_remove, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-FLUSH", doveadm_cmd_host_flush, DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC },
+ { "HOST-RESET-USERS", doveadm_cmd_host_reset_users, 0 },
+ { "USER-LOOKUP", doveadm_cmd_user_lookup, 0 },
+ { "USER-LIST", doveadm_cmd_user_list, 0 },
+ { "USER-MOVE", doveadm_cmd_user_move, 0 },
+ { "USER-KICK", doveadm_cmd_user_kick, 0 },
+ { "USER-KICK-ALT", doveadm_cmd_user_kick_alt, 0 },
+};
+
+static void
+doveadm_connection_ring_sync_timeout(struct doveadm_connection *conn)
+{
+ doveadm_connection_ring_sync_list_move(conn);
+ o_stream_nsend_str(conn->output, "Ring sync timed out\n");
+
+ i_assert(conn->io == NULL);
+ doveadm_connection_set_io(conn);
+ io_set_pending(conn->io);
+
+ i_free_and_null(conn->cmd_pending_args);
+}
+
+static void
+doveadm_connection_set_ring_sync_callback(struct doveadm_connection *conn,
+ doveadm_connection_ring_sync_callback_t *callback)
+{
+ i_assert(conn->ring_sync_callback == NULL);
+ i_assert(conn->to_ring_sync_abort == NULL);
+
+ conn->ring_sync_callback = callback;
+ io_remove(&conn->io);
+ DLLIST_REMOVE(&doveadm_connections, conn);
+ DLLIST_PREPEND(&doveadm_ring_sync_pending_connections, conn);
+ conn->to_ring_sync_abort =
+ timeout_add(DOVEADM_CONNECTION_RING_SYNC_TIMEOUT_MSECS,
+ doveadm_connection_ring_sync_timeout, conn);
+}
+
+static void doveadm_connection_ret_ok(struct doveadm_connection *conn)
+{
+ o_stream_nsend(conn->output, "OK\n", 3);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_connection_cmd_run(struct doveadm_connection *conn,
+ const char *const *args, unsigned int i)
+{
+ enum doveadm_director_cmd_ret ret;
+
+ if ((doveadm_director_commands[i].flags &
+ DOVEADM_DIRECTOR_CMD_FLAG_PRE_RING_SYNC) != 0 &&
+ !conn->dir->ring_synced) {
+ /* wait for ring to be synced before running the command */
+ conn->cmd_pending_args = p_strarray_dup(default_pool, args);
+ conn->cmd_pending_idx = i;
+ doveadm_connection_set_ring_sync_callback(conn,
+ doveadm_connection_cmd_run_synced);
+ return DOVEADM_DIRECTOR_CMD_RET_UNFINISHED;
+ }
+
+ ret = doveadm_director_commands[i].cmd(conn, args);
+ if (ret != DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK)
+ return ret;
+ /* Delay sending OK until ring is synced. This way doveadm will know
+ whether the call actually succeeded or not. */
+ if (conn->dir->ring_synced) {
+ /* director is alone */
+ i_assert(conn->dir->right == NULL && conn->dir->left == NULL);
+ o_stream_nsend(conn->output, "OK\n", 3);
+ return DOVEADM_DIRECTOR_CMD_RET_OK;
+ }
+ doveadm_connection_set_ring_sync_callback(conn, doveadm_connection_ret_ok);
+ return DOVEADM_DIRECTOR_CMD_RET_RING_SYNC_OK;
+}
+
+static void doveadm_connection_cmd_run_synced(struct doveadm_connection *conn)
+{
+ const char **args = conn->cmd_pending_args;
+
+ conn->cmd_pending_args = NULL;
+ (void)doveadm_connection_cmd_run(conn, args, conn->cmd_pending_idx);
+ i_free(args);
+}
+
+static enum doveadm_director_cmd_ret
+doveadm_connection_cmd(struct doveadm_connection *conn, const char *line)
+{
+ const char *cmd, *const *args;
+
+ args = t_strsplit_tabescaped(line);
+ if (args[0] == NULL) {
+ e_error(conn->dir->event, "doveadm sent empty command line");
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+ }
+ cmd = args[0];
+ args++;
+
+ for (unsigned int i = 0; i < N_ELEMENTS(doveadm_director_commands); i++) {
+ if (strcmp(doveadm_director_commands[i].name, cmd) == 0)
+ return doveadm_connection_cmd_run(conn, args, i);
+ }
+ e_error(conn->dir->event, "doveadm sent unknown command: %s", line);
+ return DOVEADM_DIRECTOR_CMD_RET_FAIL;
+}
+
+static void doveadm_connection_input(struct doveadm_connection *conn)
+{
+ const char *line;
+ enum doveadm_director_cmd_ret ret = DOVEADM_DIRECTOR_CMD_RET_OK;
+
+ if (!conn->handshaked) {
+ if ((line = i_stream_read_next_line(conn->input)) == NULL) {
+ if (conn->input->eof || conn->input->stream_errno != 0)
+ doveadm_connection_deinit(&conn);
+ return;
+ }
+
+ if (!version_string_verify(line, "director-doveadm",
+ DOVEADM_PROTOCOL_VERSION_MAJOR)) {
+ e_error(conn->dir->event, "doveadm not compatible with this server "
+ "(mixed old and new binaries?)");
+ doveadm_connection_deinit(&conn);
+ return;
+ }
+ conn->handshaked = TRUE;
+ }
+
+ while ((line = i_stream_read_next_line(conn->input)) != NULL &&
+ ret == DOVEADM_DIRECTOR_CMD_RET_OK) {
+ T_BEGIN {
+ ret = doveadm_connection_cmd(conn, line);
+ } T_END;
+ }
+ /* Delay deinit if io was removed, even if the client
+ already disconnected. */
+ if (conn->io != NULL &&
+ (conn->input->eof || conn->input->stream_errno != 0 ||
+ ret == DOVEADM_DIRECTOR_CMD_RET_FAIL))
+ doveadm_connection_deinit(&conn);
+}
+
+static void doveadm_connection_set_io(struct doveadm_connection *conn)
+{
+ conn->io = io_add(conn->fd, IO_READ, doveadm_connection_input, conn);
+}
+
+struct doveadm_connection *
+doveadm_connection_init(struct director *dir, int fd)
+{
+ struct doveadm_connection *conn;
+
+ conn = i_new(struct doveadm_connection, 1);
+ conn->fd = fd;
+ conn->dir = dir;
+ conn->input = i_stream_create_fd(conn->fd, 1024);
+ conn->output = o_stream_create_fd(conn->fd, SIZE_MAX);
+ o_stream_set_no_error_handling(conn->output, TRUE);
+ doveadm_connection_set_io(conn);
+ o_stream_nsend_str(conn->output, DOVEADM_HANDSHAKE);
+
+ DLLIST_PREPEND(&doveadm_connections, conn);
+ return conn;
+}
+
+static void doveadm_connection_deinit(struct doveadm_connection **_conn)
+{
+ struct doveadm_connection *conn = *_conn;
+
+ *_conn = NULL;
+
+ i_assert(conn->to_ring_sync_abort == NULL);
+
+ if (conn->reset_cmd != NULL) {
+ /* finish the move even if doveadm disconnected */
+ conn->reset_cmd->_conn = NULL;
+ }
+ if (conn->kick_cmd != NULL) {
+ /* finish the kick even if doveadm disconnected */
+ conn->kick_cmd->_conn = NULL;
+ }
+
+ DLLIST_REMOVE(&doveadm_connections, conn);
+ io_remove(&conn->io);
+ i_stream_unref(&conn->input);
+ o_stream_unref(&conn->output);
+ if (close(conn->fd) < 0)
+ e_error(conn->dir->event, "close(doveadm connection) failed: %m");
+ i_free(conn);
+
+ master_service_client_connection_destroyed(master_service);
+}
+
+static void
+doveadm_connection_ring_sync_list_move(struct doveadm_connection *conn)
+{
+ timeout_remove(&conn->to_ring_sync_abort);
+ DLLIST_REMOVE(&doveadm_ring_sync_pending_connections, conn);
+ DLLIST_PREPEND(&doveadm_connections, conn);
+}
+
+void doveadm_connections_deinit(void)
+{
+ while (reset_cmds != NULL)
+ doveadm_reset_cmd_free(reset_cmds);
+
+ unsigned int pending_count = 0;
+ while (doveadm_ring_sync_pending_connections != NULL) {
+ doveadm_connection_ring_sync_list_move(doveadm_ring_sync_pending_connections);
+ pending_count++;
+ }
+ if (pending_count > 0)
+ i_warning("Shutting down while %u doveadm connections were waiting for ring sync", pending_count);
+ while (doveadm_connections != NULL) {
+ struct doveadm_connection *conn = doveadm_connections;
+
+ doveadm_connection_deinit(&conn);
+ }
+}
+
+void doveadm_connections_kick_callback(struct director *dir ATTR_UNUSED)
+{
+ while(kick_cmds != NULL)
+ if (!doveadm_cmd_user_kick_run(kick_cmds))
+ break;
+}
+
+static void doveadm_connections_continue_reset_cmds(void)
+{
+ while (reset_cmds != NULL) {
+ if (!director_reset_cmd_run(reset_cmds))
+ break;
+ }
+}
+
+void doveadm_connections_ring_synced(struct director *dir)
+{
+ /* Note that it's not possible for a single connection to be multiple
+ times in doveadm_ring_sync_pending_connections. This is prevented
+ by removing input IO from the connection whenever it's added to the
+ list. */
+ while (doveadm_ring_sync_pending_connections != NULL &&
+ dir->ring_synced) {
+ struct doveadm_connection *conn =
+ doveadm_ring_sync_pending_connections;
+ doveadm_connection_ring_sync_callback_t *callback =
+ conn->ring_sync_callback;
+
+ conn->ring_sync_callback = NULL;
+ doveadm_connection_ring_sync_list_move(conn);
+ doveadm_connection_set_io(conn);
+ io_set_pending(conn->io);
+ callback(conn);
+ }
+ if (dir->ring_synced)
+ doveadm_connections_continue_reset_cmds();
+}