diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:51:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-28 09:51:24 +0000 |
commit | f7548d6d28c313cf80e6f3ef89aed16a19815df1 (patch) | |
tree | a3f6f2a3f247293bee59ecd28e8cd8ceb6ca064a /src/director/director.c | |
parent | Initial commit. (diff) | |
download | dovecot-f7548d6d28c313cf80e6f3ef89aed16a19815df1.tar.xz dovecot-f7548d6d28c313cf80e6f3ef89aed16a19815df1.zip |
Adding upstream version 1:2.3.19.1+dfsg1.upstream/1%2.3.19.1+dfsg1upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/director/director.c')
-rw-r--r-- | src/director/director.c | 1589 |
1 files changed, 1589 insertions, 0 deletions
diff --git a/src/director/director.c b/src/director/director.c new file mode 100644 index 0000000..317cff1 --- /dev/null +++ b/src/director/director.c @@ -0,0 +1,1589 @@ +/* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "str.h" +#include "strescape.h" +#include "log-throttle.h" +#include "ipc-client.h" +#include "program-client.h" +#include "var-expand.h" +#include "istream.h" +#include "ostream.h" +#include "iostream-temp.h" +#include "mail-user-hash.h" +#include "user-directory.h" +#include "mail-host.h" +#include "director-host.h" +#include "director-connection.h" +#include "director.h" + +#define DIRECTOR_IPC_PROXY_PATH "ipc" +#define DIRECTOR_DNS_SOCKET_PATH "dns-client" +#define DIRECTOR_RECONNECT_RETRY_SECS 60 +#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000) +#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000) +#define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000) +#define DIRECTOR_RING_MIN_WAIT_SECS 20 +#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000 +#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30) + +const char *user_kill_state_names[USER_KILL_STATE_DELAY+1] = { + "none", + "killing", + "notify-received", + "waiting-for-notify", + "waiting-for-everyone", + "flushing", + "delay", +}; + +static struct event_category event_category_director = { + .name = "director", +}; + +static struct log_throttle *user_move_throttle; +static struct log_throttle *user_kill_fail_throttle; + +static void director_hosts_purge_removed(struct director *dir); + +static const struct log_throttle_settings director_log_throttle_settings = { + .throttle_at_max_per_interval = 100, + .unthrottle_at_max_per_interval = 2, +}; + +static void +director_user_kill_finish_delayed(struct director *dir, struct user *user, + bool skip_delay); + +static bool director_is_self_ip_set(struct director *dir) +{ + if (net_ip_compare(&dir->self_ip, &net_ip4_any)) + return FALSE; + + if (net_ip_compare(&dir->self_ip, &net_ip6_any)) + return FALSE; + + return TRUE; +} + +static void director_find_self_ip(struct director *dir) +{ + struct director_host *const *hosts; + unsigned int i, count; + + hosts = array_get(&dir->dir_hosts, &count); + for (i = 0; i < count; i++) { + if (net_try_bind(&hosts[i]->ip) == 0) { + dir->self_ip = hosts[i]->ip; + return; + } + } + i_fatal("director_servers doesn't list ourself"); +} + +void director_find_self(struct director *dir) +{ + if (dir->self_host != NULL) + return; + + if (!director_is_self_ip_set(dir)) + director_find_self_ip(dir); + + dir->self_host = director_host_lookup(dir, &dir->self_ip, + dir->self_port); + if (dir->self_host == NULL) { + i_fatal("director_servers doesn't list ourself (%s:%u)", + net_ip2addr(&dir->self_ip), dir->self_port); + } + dir->self_host->self = TRUE; +} + +static unsigned int director_find_self_idx(struct director *dir) +{ + struct director_host *const *hosts; + unsigned int i, count; + + i_assert(dir->self_host != NULL); + + hosts = array_get(&dir->dir_hosts, &count); + for (i = 0; i < count; i++) { + if (hosts[i] == dir->self_host) + return i; + } + i_unreached(); +} + +static bool +director_has_outgoing_connection(struct director *dir, + struct director_host *host) +{ + struct director_connection *conn; + + array_foreach_elem(&dir->connections, conn) { + if (director_connection_get_host(conn) == host && + !director_connection_is_incoming(conn)) + return TRUE; + } + return FALSE; +} + +static void +director_log_connect(struct director *dir, struct director_host *host, + const char *reason) +{ + string_t *str = t_str_new(128); + + if (host->last_network_failure > 0) { + str_printfa(str, ", last network failure %ds ago", + (int)(ioloop_time - host->last_network_failure)); + } + if (host->last_protocol_failure > 0) { + str_printfa(str, ", last protocol failure %ds ago", + (int)(ioloop_time - host->last_protocol_failure)); + } + e_info(dir->event, "Connecting to %s:%u (as %s%s): %s", + host->ip_str, host->port, + net_ip2addr(&dir->self_ip), str_c(str), reason); +} + +int director_connect_host(struct director *dir, struct director_host *host, + const char *reason) +{ + in_port_t port; + int fd; + + if (director_has_outgoing_connection(dir, host)) + return 0; + + director_log_connect(dir, host, reason); + port = dir->test_port != 0 ? dir->test_port : host->port; + fd = net_connect_ip(&host->ip, port, &dir->self_ip); + if (fd == -1) { + host->last_network_failure = ioloop_time; + e_error(dir->event, "connect(%s) failed: %m", host->name); + return -1; + } + /* Reset timestamp so that director_connect() won't skip this host + while we're still trying to connect to it */ + host->last_network_failure = 0; + + (void)director_connection_init_out(dir, fd, host); + return 0; +} + +static struct director_host * +director_get_preferred_right_host(struct director *dir) +{ + struct director_host *const *hosts, *host; + unsigned int i, count, self_idx; + + hosts = array_get(&dir->dir_hosts, &count); + if (count == 1) { + /* self */ + return NULL; + } + + self_idx = director_find_self_idx(dir); + for (i = 0; i < count; i++) { + host = hosts[(self_idx + i + 1) % count]; + if (!host->removed) + return host; + } + /* self, with some removed hosts */ + return NULL; +} + +static void director_quick_reconnect_retry(struct director *dir) +{ + director_connect(dir, "Alone in director ring - trying to connect to others"); +} + +static bool director_wait_for_others(struct director *dir) +{ + struct director_host *host; + + /* don't assume we're alone until we've attempted to connect + to others for a while */ + if (dir->ring_first_alone != 0 && + ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS) + return FALSE; + + if (dir->ring_first_alone == 0) + dir->ring_first_alone = ioloop_time; + /* reset all failures and try again */ + array_foreach_elem(&dir->dir_hosts, host) { + host->last_network_failure = 0; + host->last_protocol_failure = 0; + } + timeout_remove(&dir->to_reconnect); + dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS, + director_quick_reconnect_retry, dir); + return TRUE; +} + +void director_connect(struct director *dir, const char *reason) +{ + struct director_host *const *hosts; + unsigned int i, count, self_idx; + + self_idx = director_find_self_idx(dir); + + /* try to connect to first working server on our right side. + the left side is supposed to connect to us. */ + hosts = array_get(&dir->dir_hosts, &count); + for (i = 1; i < count; i++) { + unsigned int idx = (self_idx + i) % count; + + if (hosts[idx]->removed) + continue; + + if (hosts[idx]->last_network_failure + + DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) { + /* connection failed recently, don't try retrying here */ + continue; + } + if (hosts[idx]->last_protocol_failure + + DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) { + /* the director recently sent invalid protocol data, + don't try retrying yet */ + continue; + } + + if (director_connect_host(dir, hosts[idx], reason) == 0) { + /* success */ + return; + } + } + + if (count > 1 && director_wait_for_others(dir)) + return; + + /* we're the only one */ + if (count > 1) { + e_warning(dir->event, + "director: Couldn't connect to right side, " + "we must be the only director left"); + } + if (dir->left != NULL) { + /* since we couldn't connect to it, + it must have failed recently */ + e_warning(dir->event, + "director: Assuming %s is dead, disconnecting", + director_connection_get_name(dir->left)); + director_connection_deinit(&dir->left, + "This connection is dead?"); + } + dir->ring_min_version = DIRECTOR_VERSION_MINOR; + if (!dir->ring_handshaked) + director_set_ring_handshaked(dir); + else if (!dir->ring_synced) + director_set_ring_synced(dir); +} + +void director_set_ring_handshaked(struct director *dir) +{ + i_assert(!dir->ring_handshaked); + + timeout_remove(&dir->to_handshake_warning); + if (dir->ring_handshake_warning_sent) { + e_warning(dir->event, + "Directors have been connected, " + "continuing delayed requests"); + dir->ring_handshake_warning_sent = FALSE; + } + e_debug(dir->event, "Director ring handshaked"); + + dir->ring_handshaked = TRUE; + director_set_ring_synced(dir); +} + +static void director_reconnect_timeout(struct director *dir) +{ + struct director_host *cur_host, *preferred_host = + director_get_preferred_right_host(dir); + + cur_host = dir->right == NULL ? NULL : + director_connection_get_host(dir->right); + + if (preferred_host == NULL) { + /* all directors have been removed, try again later */ + } else if (cur_host != preferred_host) { + (void)director_connect_host(dir, preferred_host, + "Reconnect attempt to preferred director"); + } else { + /* the connection hasn't finished sync yet. + keep this timeout for now. */ + } +} + +void director_set_ring_synced(struct director *dir) +{ + struct director_host *host; + + i_assert(!dir->ring_synced); + i_assert((dir->left != NULL && dir->right != NULL) || + (dir->left == NULL && dir->right == NULL)); + + timeout_remove(&dir->to_handshake_warning); + if (dir->ring_handshake_warning_sent) { + e_warning(dir->event, + "Ring is synced, continuing delayed requests " + "(syncing took %d secs, hosts_hash=%u)", + (int)(ioloop_time - dir->ring_last_sync_time), + mail_hosts_hash(dir->mail_hosts)); + dir->ring_handshake_warning_sent = FALSE; + } + + host = dir->right == NULL ? NULL : + director_connection_get_host(dir->right); + + timeout_remove(&dir->to_reconnect); + if (host != director_get_preferred_right_host(dir)) { + /* try to reconnect to preferred host later */ + dir->to_reconnect = + timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS, + director_reconnect_timeout, dir); + } + + if (dir->left != NULL) + director_connection_set_synced(dir->left, TRUE); + if (dir->right != NULL) + director_connection_set_synced(dir->right, TRUE); + timeout_remove(&dir->to_sync); + dir->ring_synced = TRUE; + dir->ring_last_sync_time = ioloop_time; + /* If there are any director hosts still marked as "removed", we can + safely remove those now. The entire director cluster knows about the + removal now. */ + director_hosts_purge_removed(dir); + mail_hosts_set_synced(dir->mail_hosts); + director_set_state_changed(dir); +} + +void director_sync_send(struct director *dir, struct director_host *host, + uint32_t seq, unsigned int minor_version, + unsigned int timestamp, unsigned int hosts_hash) +{ + string_t *str; + + if (host == dir->self_host) { + dir->last_sync_sent_ring_change_counter = dir->ring_change_counter; + dir->last_sync_start_time = ioloop_timeval; + } + + str = t_str_new(128); + str_printfa(str, "SYNC\t%s\t%u\t%u", + host->ip_str, host->port, seq); + if (minor_version > 0 && + director_connection_get_minor_version(dir->right) > 0) { + /* only minor_version>0 supports extra parameters */ + str_printfa(str, "\t%u\t%u\t%u", minor_version, + timestamp, hosts_hash); + } + str_append_c(str, '\n'); + director_connection_send(dir->right, str_c(str)); + + /* ping our connections in case either of them are hanging. + if they are, we want to know it fast. */ + if (dir->left != NULL) + director_connection_ping(dir->left); + director_connection_ping(dir->right); +} + +static bool +director_has_any_outgoing_connections(struct director *dir) +{ + struct director_connection *conn; + + array_foreach_elem(&dir->connections, conn) { + if (!director_connection_is_incoming(conn)) + return TRUE; + } + return FALSE; +} + +bool director_resend_sync(struct director *dir) +{ + if (dir->ring_synced) { + /* everything ok, no need to do anything */ + return FALSE; + } + + if (dir->right == NULL) { + /* right side connection is missing. make sure we're not + hanging due to some bug. */ + if (dir->to_reconnect == NULL && + !director_has_any_outgoing_connections(dir)) { + e_warning(dir->event, + "Right side connection is unexpectedly lost, reconnecting"); + director_connect(dir, "Right side connection lost"); + } + } else if (dir->left != NULL) { + /* send a new SYNC in case the previous one got dropped */ + dir->self_host->last_sync_timestamp = ioloop_time; + director_sync_send(dir, dir->self_host, dir->sync_seq, + DIRECTOR_VERSION_MINOR, ioloop_time, + mail_hosts_hash(dir->mail_hosts)); + if (dir->to_sync != NULL) + timeout_reset(dir->to_sync); + return TRUE; + } + return FALSE; +} + +static void director_sync_timeout(struct director *dir) +{ + i_assert(!dir->ring_synced); + + if (director_resend_sync(dir)) + e_error(dir->event, "Ring SYNC seq=%u appears to have got lost, resending", dir->sync_seq); +} + +void director_set_ring_unsynced(struct director *dir) +{ + if (dir->ring_synced) { + dir->ring_synced = FALSE; + dir->ring_last_sync_time = ioloop_time; + } + + if (dir->to_sync == NULL) { + dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS, + director_sync_timeout, dir); + } else { + timeout_reset(dir->to_sync); + } +} + +static void director_sync(struct director *dir) +{ + /* we're synced again when we receive this SYNC back */ + dir->sync_seq++; + if (dir->right == NULL && dir->left == NULL) { + /* we're alone. if we're already synced, + don't become unsynced. */ + return; + } + director_set_ring_unsynced(dir); + + if (dir->sync_frozen) { + dir->sync_pending = TRUE; + return; + } + if (dir->right == NULL) { + i_assert(!dir->ring_synced || + (dir->left == NULL && dir->right == NULL)); + e_debug(dir->event, "Ring is desynced (seq=%u, no right connection)", + dir->sync_seq); + return; + } + + e_debug(dir->event, "Ring is desynced (seq=%u, sending SYNC to %s)", + dir->sync_seq, dir->right == NULL ? "(nowhere)" : + director_connection_get_name(dir->right)); + + /* send PINGs to our connections more rapidly until we've synced again. + if the connection has actually died, we don't need to wait (and + delay requests) for as long to detect it */ + if (dir->left != NULL) + director_connection_set_synced(dir->left, FALSE); + director_connection_set_synced(dir->right, FALSE); + director_sync_send(dir, dir->self_host, dir->sync_seq, + DIRECTOR_VERSION_MINOR, ioloop_time, + mail_hosts_hash(dir->mail_hosts)); +} + +void director_sync_freeze(struct director *dir) +{ + struct director_connection *conn; + + i_assert(!dir->sync_frozen); + i_assert(!dir->sync_pending); + + array_foreach_elem(&dir->connections, conn) + director_connection_cork(conn); + dir->sync_frozen = TRUE; +} + +void director_sync_thaw(struct director *dir) +{ + struct director_connection *conn; + + i_assert(dir->sync_frozen); + + dir->sync_frozen = FALSE; + if (dir->sync_pending) { + dir->sync_pending = FALSE; + director_sync(dir); + } + array_foreach_elem(&dir->connections, conn) + director_connection_uncork(conn); +} + +void director_notify_ring_added(struct director_host *added_host, + struct director_host *src, bool log) +{ + const char *cmd; + + if (log) { + e_info(added_host->dir->event, + "Adding director %s to ring (requested by %s)", + added_host->name, src->name); + } + + added_host->dir->ring_change_counter++; + cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n", + added_host->ip_str, added_host->port); + director_update_send(added_host->dir, src, cmd); +} + +static void director_hosts_purge_removed(struct director *dir) +{ + struct director_host *const *hosts, *host; + unsigned int i, count; + + timeout_remove(&dir->to_remove_dirs); + + hosts = array_get(&dir->dir_hosts, &count); + for (i = 0; i < count; ) { + if (hosts[i]->removed) { + host = hosts[i]; + director_host_free(&host); + hosts = array_get(&dir->dir_hosts, &count); + } else { + i++; + } + } +} + +void director_ring_remove(struct director_host *removed_host, + struct director_host *src) +{ + struct director *dir = removed_host->dir; + struct director_connection *const *conns, *conn; + unsigned int i, count; + const char *cmd; + + e_info(dir->event, "Removing director %s from ring (requested by %s)", + removed_host->name, src->name); + + if (removed_host->self && !src->self) { + /* others will just disconnect us */ + return; + } + + if (!removed_host->self) { + /* mark the host as removed and fully remove it later. this + delay is needed, because the removal may trigger director + reconnections, which may send the director back and we don't + want to re-add it */ + removed_host->removed = TRUE; + if (dir->to_remove_dirs == NULL) { + dir->to_remove_dirs = + timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS, + director_hosts_purge_removed, dir); + } + } + + /* if our left or ride side gets removed, notify them first + before disconnecting. */ + cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n", + removed_host->ip_str, removed_host->port); + director_update_send_version(dir, src, + DIRECTOR_VERSION_RING_REMOVE, cmd); + + /* disconnect any connections to the host */ + conns = array_get(&dir->connections, &count); + for (i = 0; i < count; ) { + conn = conns[i]; + if (director_connection_get_host(conn) != removed_host || + removed_host->self) + i++; + else { + director_connection_deinit(&conn, "Removing from ring"); + conns = array_get(&dir->connections, &count); + } + } + if (dir->right == NULL) + director_connect(dir, "Reconnecting after director was removed"); + director_sync(dir); +} + +static void +director_send_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, + struct mail_host *host) +{ + const char *host_tag = mail_host_get_tag(host); + string_t *str; + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + + str = t_str_new(128); + str_printfa(str, "HOST\t%s\t%u\t%u\t%s\t%u", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + host->ip_str, host->vhost_count); + if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) { + str_append_c(str, '\t'); + str_append_tabescaped(str, host_tag); + } else if (host_tag[0] != '\0' && + dir->ring_min_version < DIRECTOR_VERSION_TAGS_V2) { + if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) { + e_error(dir->event, "Ring has directors that don't support tags - removing host %s with tag '%s'", + host->ip_str, host_tag); + } else { + e_error(dir->event, "Ring has directors that support mixed versions of tags - removing host %s with tag '%s'", + host->ip_str, host_tag); + } + director_remove_host(dir, NULL, NULL, host); + return; + } + if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) { + str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U', + (long)host->last_updown_change); + /* add any further version checks here - these directors ignore + any extra unknown arguments */ + if (host->hostname != NULL) + str_append_tabescaped(str, host->hostname); + } + str_append_c(str, '\n'); + director_update_send(dir, src, str_c(str)); +} + +void director_resend_hosts(struct director *dir) +{ + struct mail_host *host; + + array_foreach_elem(mail_hosts_get(dir->mail_hosts), host) + director_send_host(dir, dir->self_host, NULL, host); +} + +void director_update_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, + struct mail_host *host) +{ + /* update state in case this is the first mail host being added */ + director_set_state_changed(dir); + + e_debug(dir->event, "Updating host %s vhost_count=%u " + "down=%d last_updown_change=%ld (hosts_hash=%u)", + host->ip_str, host->vhost_count, host->down ? 1 : 0, + (long)host->last_updown_change, + mail_hosts_hash(dir->mail_hosts)); + + director_send_host(dir, src, orig_src, host); + + /* mark the host desynced until ring is synced again. except if we're + alone in the ring that never happens. */ + if (dir->right != NULL || dir->left != NULL) + host->desynced = TRUE; + director_sync(dir); +} + +void director_remove_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, + struct mail_host *host) +{ + struct user_directory *users = host->tag->users; + + if (src != NULL) { + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + + director_update_send(dir, src, t_strdup_printf( + "HOST-REMOVE\t%s\t%u\t%u\t%s\n", + orig_src->ip_str, orig_src->port, + orig_src->last_seq, host->ip_str)); + } + + user_directory_remove_host(users, host); + mail_host_remove(host); + director_sync(dir); +} + +void director_flush_host(struct director *dir, struct director_host *src, + struct director_host *orig_src, + struct mail_host *host) +{ + struct user_directory *users = host->tag->users; + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + + director_update_send(dir, src, t_strdup_printf( + "HOST-FLUSH\t%s\t%u\t%u\t%s\n", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + host->ip_str)); + user_directory_remove_host(users, host); + director_sync(dir); +} + +void director_update_user(struct director *dir, struct director_host *src, + struct user *user) +{ + struct director_connection *conn; + + i_assert(src != NULL); + i_assert(!user->weak); + + array_foreach_elem(&dir->connections, conn) { + if (director_connection_get_host(conn) == src) + continue; + + if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_USER_TIMESTAMP) { + director_connection_send(conn, t_strdup_printf( + "USER\t%u\t%s\t%u\n", user->username_hash, user->host->ip_str, + user->timestamp)); + } else { + director_connection_send(conn, t_strdup_printf( + "USER\t%u\t%s\n", user->username_hash, user->host->ip_str)); + } + } +} + +void director_update_user_weak(struct director *dir, struct director_host *src, + struct director_connection *src_conn, + struct director_host *orig_src, + struct user *user) +{ + const char *cmd; + + i_assert(src != NULL); + i_assert(user->weak); + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + + cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + user->username_hash, user->host->ip_str); + + if (src != dir->self_host && dir->left != NULL && dir->right != NULL && + director_connection_get_host(dir->left) == + director_connection_get_host(dir->right)) { + /* only two directors in this ring and we're forwarding + USER-WEAK from one director back to itself via another + so it sees we've received it. we can't use + director_update_send() for this, because it doesn't send + data back to the source. */ + if (dir->right == src_conn) + director_connection_send(dir->left, cmd); + else if (dir->left == src_conn) + director_connection_send(dir->right, cmd); + else + i_unreached(); + } else { + director_update_send(dir, src, cmd); + } +} + +static void +director_flush_user_continue(enum program_client_exit_status result, + struct director_kill_context *ctx) +{ + struct director *dir = ctx->dir; + ctx->callback_pending = FALSE; + + struct user *user = user_directory_lookup(ctx->tag->users, + ctx->username_hash); + + if (result == PROGRAM_CLIENT_EXIT_STATUS_FAILURE) { + struct istream *is = iostream_temp_finish(&ctx->reply, SIZE_MAX); + char *data; + i_stream_set_return_partial_line(is, TRUE); + data = i_stream_read_next_line(is); + e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s", + ctx->socket_path, + ctx->username_hash, + net_ip2addr(&ctx->host_ip), + data == NULL ? "(no output to stdout)" : data); + while((data = i_stream_read_next_line(is)) != NULL) { + e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s", + ctx->socket_path, + ctx->username_hash, + net_ip2addr(&ctx->host_ip), data); + } + i_stream_unref(&is); + } else { + o_stream_unref(&ctx->reply); + } + program_client_destroy(&ctx->pclient); + + if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) { + /* user was already freed - ignore */ + e_debug(dir->event, "User %u freed while flushing, result=%d", + ctx->username_hash, result); + i_assert(ctx->to_move == NULL); + i_free(ctx); + } else { + /* ctx is freed later via user->kill_ctx */ + e_debug(dir->event, "Flushing user %u finished, result=%d", + ctx->username_hash, result); + director_user_kill_finish_delayed(dir, user, + result == PROGRAM_CLIENT_EXIT_STATUS_SUCCESS); + } +} + +static void +director_flush_user(struct director *dir, struct user *user) +{ + struct director_kill_context *ctx = user->kill_ctx; + struct var_expand_table tab[] = { + { 'i', user->host->ip_str, "ip" }, + { 'h', user->host->hostname, "host" }, + { '\0', NULL, NULL } + }; + const char *error; + + /* Execute flush script, if set. Only the director that started the + user moving will call the flush script. Having each director do it + would be redundant since they're all supposed to be performing the + same flush task to the same backend. + + Flushing is also not triggered if we're moving a user that we just + created due to the user move. This means that the user doesn't have + an old host, so we couldn't really even perform any flushing on the + backend. */ + if (*dir->set->director_flush_socket == '\0' || + ctx->old_host_ip.family == 0 || + !ctx->kill_is_self_initiated) { + director_user_kill_finish_delayed(dir, user, FALSE); + return; + } + + ctx->host_ip = user->host->ip; + + string_t *s_sock = str_new(default_pool, 32); + if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) { + e_error(dir->event, "Failed to expand director_flush_socket=%s: %s", + dir->set->director_flush_socket, error); + director_user_kill_finish_delayed(dir, user, FALSE); + return; + } + ctx->socket_path = str_free_without_data(&s_sock); + + struct program_client_settings set = { + .client_connect_timeout_msecs = 10000, + .dns_client_socket_path = DIRECTOR_DNS_SOCKET_PATH, + }; + + restrict_access_init(&set.restrict_set); + + const char *const args[] = { + "FLUSH", + t_strdup_printf("%u", user->username_hash), + net_ip2addr(&ctx->old_host_ip), + user->host->ip_str, + ctx->old_host_down ? "down" : "up", + dec2str(ctx->old_host_vhost_count), + NULL + }; + + ctx->kill_state = USER_KILL_STATE_FLUSHING; + e_debug(dir->event, "Flushing user %u via %s", user->username_hash, + ctx->socket_path); + + if ((program_client_create(ctx->socket_path, args, &set, FALSE, + &ctx->pclient, &error)) != 0) { + e_error(dir->event, "%s: Failed to flush user hash %u in host %s: %s", + ctx->socket_path, + user->username_hash, + user->host->ip_str, + error); + director_flush_user_continue(PROGRAM_CLIENT_EXIT_STATUS_FAILURE, + ctx); + return; + } + + ctx->reply = + iostream_temp_create_named("/tmp", 0, + t_strdup_printf("flush response from %s", + user->host->ip_str)); + o_stream_set_no_error_handling(ctx->reply, TRUE); + program_client_set_output(ctx->pclient, ctx->reply); + ctx->callback_pending = TRUE; + program_client_run_async(ctx->pclient, director_flush_user_continue, ctx); +} + +static void director_user_move_finished(struct director *dir) +{ + i_assert(dir->users_moving_count > 0); + dir->users_moving_count--; + + director_set_state_changed(dir); +} + +static void director_user_move_free(struct user *user) +{ + struct director *dir = user->kill_ctx->dir; + struct director_kill_context *kill_ctx = user->kill_ctx; + + i_assert(kill_ctx != NULL); + + e_debug(dir->event, "User %u move finished at state=%s", user->username_hash, + user_kill_state_names[kill_ctx->kill_state]); + + if (kill_ctx->ipc_cmd != NULL) + ipc_client_cmd_abort(dir->ipc_proxy, &kill_ctx->ipc_cmd); + timeout_remove(&kill_ctx->to_move); + i_free(kill_ctx->socket_path); + i_free(kill_ctx); + user->kill_ctx = NULL; + + director_user_move_finished(dir); +} + +static void +director_user_kill_finish_delayed_to(struct user *user) +{ + i_assert(user->kill_ctx != NULL); + i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY); + + director_user_move_free(user); +} + +static void +director_user_kill_finish_delayed(struct director *dir, struct user *user, + bool skip_delay) +{ + if (skip_delay) { + user->kill_ctx->kill_state = USER_KILL_STATE_NONE; + director_user_move_free(user); + return; + } + + user->kill_ctx->kill_state = USER_KILL_STATE_DELAY; + + /* wait for a while for the kills to finish in the backend server, + so there are no longer any processes running for the user before we + start letting new in connections to the new server. */ + timeout_remove(&user->kill_ctx->to_move); + user->kill_ctx->to_move = + timeout_add(dir->set->director_user_kick_delay * 1000, + director_user_kill_finish_delayed_to, user); +} + +static void +director_finish_user_kill(struct director *dir, struct user *user, bool self) +{ + struct director_kill_context *kill_ctx = user->kill_ctx; + + i_assert(kill_ctx != NULL); + i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING); + i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY); + + e_debug(dir->event, "User %u kill finished - %sstate=%s", user->username_hash, + self ? "we started it " : "", + user_kill_state_names[kill_ctx->kill_state]); + + if (dir->right == NULL) { + /* we're alone */ + director_flush_user(dir, user); + } else if (self || + kill_ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) { + director_connection_send(dir->right, t_strdup_printf( + "USER-KILLED\t%u\n", user->username_hash)); + kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE; + } else { + i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING); + kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY; + } +} + +static void director_user_kill_fail_throttled(unsigned int new_events_count, + void *context ATTR_UNUSED) +{ + i_error("Failed to kill %u users' connections", new_events_count); +} + +static void director_kill_user_callback(enum ipc_client_cmd_state state, + const char *data, void *context) +{ + struct director_kill_context *ctx = context; + struct user *user; + + /* don't try to abort the IPC command anymore */ + ctx->ipc_cmd = NULL; + + /* this is an asynchronous notification about user being killed. + there are no guarantees about what might have happened to the user + in the mean time. */ + switch (state) { + case IPC_CLIENT_CMD_STATE_REPLY: + /* shouldn't get here. the command reply isn't finished yet. */ + e_error(ctx->dir->event, + "login process sent unexpected reply to kick: %s", data); + return; + case IPC_CLIENT_CMD_STATE_OK: + break; + case IPC_CLIENT_CMD_STATE_ERROR: + if (log_throttle_accept(user_kill_fail_throttle)) { + e_error(ctx->dir->event, + "Failed to kill user %u connections: %s", + ctx->username_hash, data); + } + /* we can't really do anything but continue anyway */ + break; + } + + i_assert(ctx->dir->users_kicking_count > 0); + ctx->dir->users_kicking_count--; + if (ctx->dir->kick_callback != NULL) + ctx->dir->kick_callback(ctx->dir); + + user = user_directory_lookup(ctx->tag->users, ctx->username_hash); + if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) { + /* user was already freed - ignore */ + i_assert(ctx->to_move == NULL); + director_user_move_finished(ctx->dir); + i_free(ctx); + } else { + i_assert(ctx->kill_state == USER_KILL_STATE_KILLING || + ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED); + /* we were still waiting for the kill notification */ + director_finish_user_kill(ctx->dir, user, ctx->kill_is_self_initiated); + } +} + +static void director_user_move_throttled(unsigned int new_events_count, + void *context ATTR_UNUSED) +{ + i_error("%u users' move timed out, their state may now be inconsistent", + new_events_count); +} + +static void director_user_move_timeout(struct user *user) +{ + i_assert(user->kill_ctx != NULL); + i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY); + + if (log_throttle_accept(user_move_throttle)) { + e_error(user->kill_ctx->dir->event, + "Finishing user %u move timed out, " + "its state may now be inconsistent (state=%s)", + user->username_hash, + user_kill_state_names[user->kill_ctx->kill_state]); + } + if (user->kill_ctx->kill_state == USER_KILL_STATE_FLUSHING) { + o_stream_unref(&user->kill_ctx->reply); + program_client_destroy(&user->kill_ctx->pclient); + } + director_user_move_free(user); +} + +void director_kill_user(struct director *dir, struct director_host *src, + struct user *user, struct mail_tag *tag, + struct mail_host *old_host, bool forced_kick) +{ + struct director_kill_context *ctx; + const char *cmd; + + if (USER_IS_BEING_KILLED(user)) { + /* User is being moved again before the previous move + finished. We'll just continue wherever we left off + earlier. */ + e_debug(dir->event, "User %u move restarted - previous kill_state=%s", + user->username_hash, + user_kill_state_names[user->kill_ctx->kill_state]); + return; + } + + user->kill_ctx = ctx = i_new(struct director_kill_context, 1); + ctx->dir = dir; + ctx->tag = tag; + ctx->username_hash = user->username_hash; + ctx->kill_is_self_initiated = src->self; + if (old_host != NULL) { + ctx->old_host_ip = old_host->ip; + ctx->old_host_down = old_host->down; + ctx->old_host_vhost_count = old_host->vhost_count; + } + + dir->users_moving_count++; + ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS, + director_user_move_timeout, user); + ctx->kill_state = USER_KILL_STATE_KILLING; + + if ((old_host != NULL && old_host != user->host) || forced_kick) { + cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u", + user->username_hash); + dir->users_kicking_count++; + ctx->ipc_cmd = ipc_client_cmd(dir->ipc_proxy, cmd, + director_kill_user_callback, ctx); + } else { + /* a) we didn't even know about the user before now. + don't bother performing a local kick, since it wouldn't + kick anything. + b) our host was already correct. notify others that we have + killed the user, but don't really do it. */ + director_finish_user_kill(ctx->dir, user, + ctx->kill_is_self_initiated); + } +} + +void director_move_user(struct director *dir, struct director_host *src, + struct director_host *orig_src, + unsigned int username_hash, struct mail_host *host) +{ + struct user_directory *users = host->tag->users; + struct mail_host *old_host = NULL; + struct user *user; + + /* 1. move this user's host, and set its "killing" flag to delay all of + its future connections until all directors have killed the + connections and notified us about it. + + 2. tell the other directors about the move + + 3. once user kill callback is called, tell the other directors + with USER-KILLED that we're done killing the user. + + 4. when some director gets a duplicate USER-KILLED, it's + responsible for notifying all directors that user is completely + killed. + + 5. after receiving USER-KILLED-EVERYWHERE notification, + new connections are again allowed for the user. + */ + user = user_directory_lookup(users, username_hash); + if (user == NULL) { + e_debug(dir->event, "User %u move started: User was nonexistent", + username_hash); + user = user_directory_add(users, username_hash, + host, ioloop_time); + } else if (user->host == host) { + /* User is already in the wanted host, but another director + didn't think so. We'll need to finish the move without + killing any of our connections. */ + old_host = user->host; + user->timestamp = ioloop_time; + e_debug(dir->event, "User %u move forwarded: host is already %s", + username_hash, host->ip_str); + } else { + /* user is looked up via the new host's tag, so if it's found + the old tag has to be the same. */ + i_assert(user->host->tag == host->tag); + + old_host = user->host; + user->host->user_count--; + user->host = host; + user->host->user_count++; + user->timestamp = ioloop_time; + e_debug(dir->event, "User %u move started: host %s -> %s", + username_hash, old_host->ip_str, + host->ip_str); + } + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + director_update_send(dir, src, t_strdup_printf( + "USER-MOVE\t%s\t%u\t%u\t%u\t%s\n", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + user->username_hash, user->host->ip_str)); + /* kill the user only after sending the USER-MOVE, because the kill + may finish instantly. */ + director_kill_user(dir, src, user, host->tag, old_host, FALSE); +} + +static void +director_kick_user_callback(enum ipc_client_cmd_state state, + const char *data, void *context) +{ + struct director *dir = context; + + if (state == IPC_CLIENT_CMD_STATE_REPLY) { + /* shouldn't get here. the command reply isn't finished yet. */ + e_error(dir->event, "login process sent unexpected reply to kick: %s", data); + return; + } + + i_assert(dir->users_kicking_count > 0); + dir->users_kicking_count--; + if (dir->kick_callback != NULL) + dir->kick_callback(dir); +} + +void director_kick_user(struct director *dir, struct director_host *src, + struct director_host *orig_src, const char *username) +{ + string_t *cmd = t_str_new(64); + + str_append(cmd, "proxy\t*\tKICK\t"); + str_append_tabescaped(cmd, username); + dir->users_kicking_count++; + ipc_client_cmd(dir->ipc_proxy, str_c(cmd), + director_kick_user_callback, dir); + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + str_truncate(cmd, 0); + str_printfa(cmd, "USER-KICK\t%s\t%u\t%u\t", + orig_src->ip_str, orig_src->port, orig_src->last_seq); + str_append_tabescaped(cmd, username); + str_append_c(cmd, '\n'); + director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, str_c(cmd)); +} + +void director_kick_user_alt(struct director *dir, struct director_host *src, + struct director_host *orig_src, + const char *field, const char *value) +{ + string_t *cmd = t_str_new(64); + + str_append(cmd, "proxy\t*\tKICK-ALT\t"); + str_append_tabescaped(cmd, field); + str_append_c(cmd, '\t'); + str_append_tabescaped(cmd, value); + dir->users_kicking_count++; + ipc_client_cmd(dir->ipc_proxy, str_c(cmd), + director_kick_user_callback, dir); + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + str_truncate(cmd, 0); + str_printfa(cmd, "USER-KICK-ALT\t%s\t%u\t%u\t", + orig_src->ip_str, orig_src->port, orig_src->last_seq); + str_append_tabescaped(cmd, field); + str_append_c(cmd, '\t'); + str_append_tabescaped(cmd, value); + str_append_c(cmd, '\n'); + director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT, str_c(cmd)); +} + +void director_kick_user_hash(struct director *dir, struct director_host *src, + struct director_host *orig_src, + unsigned int username_hash, + const struct ip_addr *except_ip) +{ + const char *cmd; + + cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s", + username_hash, net_ip2addr(except_ip)); + dir->users_kicking_count++; + ipc_client_cmd(dir->ipc_proxy, cmd, + director_kick_user_callback, dir); + + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + username_hash, net_ip2addr(except_ip)); + director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd); +} + +static void +director_send_user_killed_everywhere(struct director *dir, + struct director_host *src, + struct director_host *orig_src, + unsigned int username_hash) +{ + if (orig_src == NULL) { + orig_src = dir->self_host; + orig_src->last_seq++; + } + director_update_send(dir, src, t_strdup_printf( + "USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n", + orig_src->ip_str, orig_src->port, orig_src->last_seq, + username_hash)); +} + +static void +director_user_tag_killed(struct director *dir, struct mail_tag *tag, + unsigned int username_hash) +{ + struct user *user; + + user = user_directory_lookup(tag->users, username_hash); + if (user == NULL || !USER_IS_BEING_KILLED(user)) + return; + + switch (user->kill_ctx->kill_state) { + case USER_KILL_STATE_KILLING: + user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED; + break; + case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY: + director_finish_user_kill(dir, user, TRUE); + break; + case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED: + e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED", + username_hash, user_kill_state_names[user->kill_ctx->kill_state]); + break; + case USER_KILL_STATE_NONE: + case USER_KILL_STATE_FLUSHING: + case USER_KILL_STATE_DELAY: + /* move restarted. state=none can also happen if USER-MOVE was + sent while we were still moving. send back + USER-KILLED-EVERYWHERE to avoid hangs. */ + director_send_user_killed_everywhere(dir, dir->self_host, NULL, + username_hash); + break; + case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE: + director_user_killed_everywhere(dir, dir->self_host, + NULL, username_hash); + break; + } +} + +void director_user_killed(struct director *dir, unsigned int username_hash) +{ + struct mail_tag *tag; + + array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) + director_user_tag_killed(dir, tag, username_hash); +} + +static void +director_user_tag_killed_everywhere(struct director *dir, + struct mail_tag *tag, + struct director_host *src, + struct director_host *orig_src, + unsigned int username_hash) +{ + struct user *user; + + user = user_directory_lookup(tag->users, username_hash); + if (user == NULL) { + e_debug(dir->event, "User %u no longer exists - ignoring USER-KILLED-EVERYWHERE", + username_hash); + return; + } + if (!USER_IS_BEING_KILLED(user)) { + e_debug(dir->event, "User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE", + username_hash); + return; + } + if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) { + e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE", + username_hash, user_kill_state_names[user->kill_ctx->kill_state]); + return; + } + + director_flush_user(dir, user); + director_send_user_killed_everywhere(dir, src, orig_src, username_hash); +} + +void director_user_killed_everywhere(struct director *dir, + struct director_host *src, + struct director_host *orig_src, + unsigned int username_hash) +{ + struct mail_tag *tag; + + array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) { + director_user_tag_killed_everywhere(dir, tag, src, orig_src, + username_hash); + } +} + +static void director_state_callback_timeout(struct director *dir) +{ + timeout_remove(&dir->to_callback); + dir->state_change_callback(dir); +} + +void director_set_state_changed(struct director *dir) +{ + /* we may get called to here from various places. use a timeout to + make sure the state callback is called with a clean state. */ + if (dir->to_callback == NULL) { + dir->to_callback = + timeout_add(0, director_state_callback_timeout, dir); + } +} + +void director_update_send(struct director *dir, struct director_host *src, + const char *cmd) +{ + director_update_send_version(dir, src, 0, cmd); +} + +void director_update_send_version(struct director *dir, + struct director_host *src, + unsigned int min_version, const char *cmd) +{ + struct director_connection *conn; + + i_assert(src != NULL); + + array_foreach_elem(&dir->connections, conn) { + if (director_connection_get_host(conn) != src && + director_connection_get_minor_version(conn) >= min_version) + director_connection_send(conn, cmd); + } +} + +static void director_user_freed(struct user *user) +{ + if (user->kill_ctx != NULL) { + /* director_user_expire is very short. user expired before + moving the user finished or timed out. */ + if (user->kill_ctx->callback_pending) { + /* kill_ctx is used as a callback parameter. + only remove the timeout and finish the free later. */ + timeout_remove(&user->kill_ctx->to_move); + } else { + director_user_move_free(user); + } + } +} + +struct director * +director_init(const struct director_settings *set, + const struct ip_addr *listen_ip, in_port_t listen_port, + director_state_change_callback_t *callback, + director_kick_callback_t *kick_callback) +{ + struct director *dir; + + dir = i_new(struct director, 1); + dir->set = set; + dir->self_port = listen_port; + dir->self_ip = *listen_ip; + dir->state_change_callback = callback; + dir->kick_callback = kick_callback; + dir->event = event_create(NULL); + event_add_category(dir->event, &event_category_director); + i_array_init(&dir->dir_hosts, 16); + i_array_init(&dir->pending_requests, 16); + i_array_init(&dir->connections, 8); + dir->mail_hosts = mail_hosts_init(dir, set->director_user_expire, + director_user_freed); + + dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH); + dir->ring_min_version = DIRECTOR_VERSION_MINOR; + return dir; +} + +void director_deinit(struct director **_dir) +{ + struct director *dir = *_dir; + struct director_host *const *hostp, *host; + struct director_connection *conn, *const *connp; + + *_dir = NULL; + + while (array_count(&dir->connections) > 0) { + connp = array_front(&dir->connections); + conn = *connp; + director_connection_deinit(&conn, "Shutting down"); + } + + mail_hosts_deinit(&dir->mail_hosts); + mail_hosts_deinit(&dir->orig_config_hosts); + + ipc_client_deinit(&dir->ipc_proxy); + timeout_remove(&dir->to_reconnect); + timeout_remove(&dir->to_handshake_warning); + timeout_remove(&dir->to_request); + timeout_remove(&dir->to_sync); + timeout_remove(&dir->to_remove_dirs); + timeout_remove(&dir->to_callback); + while (array_count(&dir->dir_hosts) > 0) { + hostp = array_front(&dir->dir_hosts); + host = *hostp; + director_host_free(&host); + } + array_free(&dir->pending_requests); + array_free(&dir->dir_hosts); + array_free(&dir->connections); + event_unref(&dir->event); + i_free(dir); +} + +struct director_user_iter { + struct director *dir; + unsigned int tag_idx; + struct user_directory_iter *user_iter; + bool iter_until_current_tail; +}; + +struct director_user_iter * +director_iterate_users_init(struct director *dir, bool iter_until_current_tail) +{ + struct director_user_iter *iter = i_new(struct director_user_iter, 1); + iter->dir = dir; + iter->iter_until_current_tail = iter_until_current_tail; + return iter; +} + +struct user *director_iterate_users_next(struct director_user_iter *iter) +{ + const ARRAY_TYPE(mail_tag) *tags; + struct user *user; + + i_assert(iter != NULL); + + if (iter->user_iter == NULL) { + tags = mail_hosts_get_tags(iter->dir->mail_hosts); + if (iter->tag_idx >= array_count(tags)) + return NULL; + struct mail_tag *tag = array_idx_elem(tags, iter->tag_idx); + iter->user_iter = user_directory_iter_init(tag->users, + iter->iter_until_current_tail); + } + user = user_directory_iter_next(iter->user_iter); + if (user == NULL) { + user_directory_iter_deinit(&iter->user_iter); + iter->tag_idx++; + return director_iterate_users_next(iter); + } else + return user; +} + +void director_iterate_users_deinit(struct director_user_iter **_iter) +{ + i_assert(_iter != NULL && *_iter != NULL); + struct director_user_iter *iter = *_iter; + *_iter = NULL; + if (iter->user_iter != NULL) + user_directory_iter_deinit(&iter->user_iter); + i_free(iter); +} + +bool +director_get_username_hash(struct director *dir, const char *username, + unsigned int *hash_r) +{ + const char *error; + + if (mail_user_hash(username, dir->set->director_username_hash, hash_r, + &error)) + return TRUE; + e_error(dir->event, "Failed to expand director_username_hash=%s: %s", + dir->set->director_username_hash, error); + return FALSE; +} + +void directors_init(void) +{ + user_move_throttle = + log_throttle_init(&director_log_throttle_settings, + director_user_move_throttled, NULL); + user_kill_fail_throttle = + log_throttle_init(&director_log_throttle_settings, + director_user_kill_fail_throttled, NULL); +} + +void directors_deinit(void) +{ + log_throttle_deinit(&user_move_throttle); + log_throttle_deinit(&user_kill_fail_throttle); +} |