/* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */

#include "lib.h"
#include "ioloop.h"
#include "array.h"
#include "str.h"
#include "strescape.h"
#include "log-throttle.h"
#include "ipc-client.h"
#include "program-client.h"
#include "var-expand.h"
#include "istream.h"
#include "ostream.h"
#include "iostream-temp.h"
#include "mail-user-hash.h"
#include "user-directory.h"
#include "mail-host.h"
#include "director-host.h"
#include "director-connection.h"
#include "director.h"

#define DIRECTOR_IPC_PROXY_PATH "ipc"
#define DIRECTOR_DNS_SOCKET_PATH "dns-client"
#define DIRECTOR_RECONNECT_RETRY_SECS 60
#define DIRECTOR_RECONNECT_TIMEOUT_MSECS (30*1000)
#define DIRECTOR_USER_MOVE_TIMEOUT_MSECS (30*1000)
#define DIRECTOR_SYNC_TIMEOUT_MSECS (5*1000)
#define DIRECTOR_RING_MIN_WAIT_SECS 20
#define DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS 1000
#define DIRECTOR_DELAYED_DIR_REMOVE_MSECS (1000*30)

/* Human readable names for the user kill states, indexed by the state
   value. Used only for logging in this file. */
const char *user_kill_state_names[USER_KILL_STATE_DELAY+1] = {
	"none",
	"killing",
	"notify-received",
	"waiting-for-notify",
	"waiting-for-everyone",
	"flushing",
	"delay",
};

static struct event_category event_category_director = {
	.name = "director",
};

static struct log_throttle *user_move_throttle;
static struct log_throttle *user_kill_fail_throttle;

static void director_hosts_purge_removed(struct director *dir);

static const struct log_throttle_settings director_log_throttle_settings = {
	.throttle_at_max_per_interval = 100,
	.unthrottle_at_max_per_interval = 2,
};

static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
				  bool skip_delay);

/* Returns TRUE if dir->self_ip is something else than the IPv4 or IPv6
   "any" address. */
static bool director_is_self_ip_set(struct director *dir)
{
	if (net_ip_compare(&dir->self_ip, &net_ip4_any))
		return FALSE;
	if (net_ip_compare(&dir->self_ip, &net_ip6_any))
		return FALSE;
	return TRUE;
}

/* Set dir->self_ip to the IP of the first configured director host for
   which net_try_bind() succeeds. Dies with i_fatal() if there is none. */
static void director_find_self_ip(struct director *dir)
{
	struct director_host *const *hosts;
	unsigned int i, count;

	hosts = array_get(&dir->dir_hosts, &count);
	for (i = 0; i < count; i++) {
		if (net_try_bind(&hosts[i]->ip) == 0) {
			dir->self_ip = hosts[i]->ip;
			return;
		}
	}
	i_fatal("director_servers doesn't list ourself");
}

/* Make sure dir->self_host points to our own entry in the director host
   list and flag that entry as "self". Does nothing if self_host is already
   set. Dies with i_fatal() if our IP:port isn't in the list. */
void director_find_self(struct director *dir)
{
	if (dir->self_host != NULL)
		return;

	if (!director_is_self_ip_set(dir))
		director_find_self_ip(dir);

	dir->self_host = director_host_lookup(dir, &dir->self_ip,
					      dir->self_port);
	if (dir->self_host == NULL) {
		i_fatal("director_servers doesn't list ourself (%s:%u)",
			net_ip2addr(&dir->self_ip), dir->self_port);
	}
	dir->self_host->self = TRUE;
}

/* Returns the index of dir->self_host in dir->dir_hosts. self_host must
   already be set and must exist in the array. */
static unsigned int director_find_self_idx(struct director *dir)
{
	struct director_host *const *hosts;
	unsigned int i, count;

	i_assert(dir->self_host != NULL);

	hosts = array_get(&dir->dir_hosts, &count);
	for (i = 0; i < count; i++) {
		if (hosts[i] == dir->self_host)
			return i;
	}
	i_unreached();
}

/* Returns TRUE if there already is a non-incoming connection to the given
   host. */
static bool
director_has_outgoing_connection(struct director *dir,
				 struct director_host *host)
{
	struct director_connection *conn;

	array_foreach_elem(&dir->connections, conn) {
		if (director_connection_get_host(conn) == host &&
		    !director_connection_is_incoming(conn))
			return TRUE;
	}
	return FALSE;
}

/* Log an info line about connecting to the host, including how long ago
   its last network/protocol failures were (when there have been any). */
static void
director_log_connect(struct director *dir, struct director_host *host,
		     const char *reason)
{
	string_t *str = t_str_new(128);

	if (host->last_network_failure > 0) {
		str_printfa(str, ", last network failure %ds ago",
			    (int)(ioloop_time - host->last_network_failure));
	}
	if (host->last_protocol_failure > 0) {
		str_printfa(str, ", last protocol failure %ds ago",
			    (int)(ioloop_time - host->last_protocol_failure));
	}
	e_info(dir->event, "Connecting to %s:%u (as %s%s): %s",
	       host->ip_str, host->port,
	       net_ip2addr(&dir->self_ip), str_c(str), reason);
}

/* Start an outgoing connection to the host, unless one already exists.
   Returns 0 if a connection exists or was started, -1 if net_connect_ip()
   failed (host->last_network_failure is then updated). */
int director_connect_host(struct director *dir, struct director_host *host,
			  const char *reason)
{
	in_port_t port;
	int fd;

	if (director_has_outgoing_connection(dir, host))
		return 0;

	director_log_connect(dir, host, reason);
	port = dir->test_port != 0 ? dir->test_port : host->port;
	fd = net_connect_ip(&host->ip, port, &dir->self_ip);
	if (fd == -1) {
		host->last_network_failure = ioloop_time;
		e_error(dir->event, "connect(%s) failed: %m", host->name);
		return -1;
	}
	/* Reset timestamp so that director_connect() won't skip this host
	   while we're still trying to connect to it */
	host->last_network_failure = 0;

	(void)director_connection_init_out(dir, fd, host);
	return 0;
}

/* Returns the first host after ourself (in ring order) that isn't marked
   as removed, or NULL if there is no such host.

   NOTE(review): on the last iteration (i == count-1) the index wraps back
   to self_idx, so self_host itself is returned when every other host is
   marked removed and self isn't. The trailing "return NULL" suggests NULL
   was expected in that case - confirm whether this is intended. */
static struct director_host *
director_get_preferred_right_host(struct director *dir)
{
	struct director_host *const *hosts, *host;
	unsigned int i, count, self_idx;

	hosts = array_get(&dir->dir_hosts, &count);
	if (count == 1) {
		/* self */
		return NULL;
	}

	self_idx = director_find_self_idx(dir);
	for (i = 0; i < count; i++) {
		host = hosts[(self_idx + i + 1) % count];
		if (!host->removed)
			return host;
	}
	/* self, with some removed hosts */
	return NULL;
}

/* Timeout callback added by director_wait_for_others(): retry connecting
   to the other directors. */
static void director_quick_reconnect_retry(struct director *dir)
{
	director_connect(dir, "Alone in director ring - trying to connect to others");
}

/* Called when no right side connection could be started. Returns TRUE if
   we should still keep retrying (a quick reconnect timeout was added),
   FALSE once we've been alone for more than DIRECTOR_RING_MIN_WAIT_SECS. */
static bool director_wait_for_others(struct director *dir)
{
	struct director_host *host;

	/* don't assume we're alone until we've attempted to connect
	   to others for a while */
	if (dir->ring_first_alone != 0 &&
	    ioloop_time - dir->ring_first_alone > DIRECTOR_RING_MIN_WAIT_SECS)
		return FALSE;

	if (dir->ring_first_alone == 0)
		dir->ring_first_alone = ioloop_time;
	/* reset all failures and try again */
	array_foreach_elem(&dir->dir_hosts, host) {
		host->last_network_failure = 0;
		host->last_protocol_failure = 0;
	}
	timeout_remove(&dir->to_reconnect);
	dir->to_reconnect = timeout_add(DIRECTOR_QUICK_RECONNECT_TIMEOUT_MSECS,
					director_quick_reconnect_retry, dir);
	return TRUE;
}

/* Try to start a connection to the nearest usable director on our right
   side. If none can be started and we've waited long enough, assume we're
   alone: drop the left connection and mark the ring handshaked/synced.
   The reason string is used only for logging. */
void director_connect(struct director *dir, const char *reason)
{
	struct director_host *const *hosts;
	unsigned int i, count, self_idx;

	self_idx = director_find_self_idx(dir);

	/* try to connect to first working server on our right side.
	   the left side is supposed to connect to us. */
	hosts = array_get(&dir->dir_hosts, &count);
	for (i = 1; i < count; i++) {
		unsigned int idx = (self_idx + i) % count;

		if (hosts[idx]->removed)
			continue;

		if (hosts[idx]->last_network_failure +
		    DIRECTOR_RECONNECT_RETRY_SECS > ioloop_time) {
			/* connection failed recently, don't try retrying here */
			continue;
		}
		if (hosts[idx]->last_protocol_failure +
		    DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS > ioloop_time) {
			/* the director recently sent invalid protocol data,
			   don't try retrying yet */
			continue;
		}

		if (director_connect_host(dir, hosts[idx], reason) == 0) {
			/* success */
			return;
		}
	}

	if (count > 1 && director_wait_for_others(dir))
		return;

	/* we're the only one */
	if (count > 1) {
		e_warning(dir->event,
			  "director: Couldn't connect to right side, "
			  "we must be the only director left");
	}
	if (dir->left != NULL) {
		/* since we couldn't connect to it,
		   it must have failed recently */
		e_warning(dir->event,
			  "director: Assuming %s is dead, disconnecting",
			  director_connection_get_name(dir->left));
		director_connection_deinit(&dir->left,
					   "This connection is dead?");
	}
	dir->ring_min_version = DIRECTOR_VERSION_MINOR;
	if (!dir->ring_handshaked)
		director_set_ring_handshaked(dir);
	else if (!dir->ring_synced)
		director_set_ring_synced(dir);
}

/* Mark the ring as handshaked and also as synced. Must not be called when
   the ring is already handshaked. */
void director_set_ring_handshaked(struct director *dir)
{
	i_assert(!dir->ring_handshaked);

	timeout_remove(&dir->to_handshake_warning);
	if (dir->ring_handshake_warning_sent) {
		e_warning(dir->event,
			  "Directors have been connected, "
			  "continuing delayed requests");
		dir->ring_handshake_warning_sent = FALSE;
	}
	e_debug(dir->event, "Director ring handshaked");

	dir->ring_handshaked = TRUE;
	director_set_ring_synced(dir);
}

/* Timeout callback: if our right side isn't the preferred right host, try
   connecting to the preferred one. */
static void director_reconnect_timeout(struct director *dir)
{
	struct director_host *cur_host, *preferred_host =
		director_get_preferred_right_host(dir);

	cur_host = dir->right == NULL ? NULL :
		director_connection_get_host(dir->right);

	if (preferred_host == NULL) {
		/* all directors have been removed, try again later */
	} else if (cur_host != preferred_host) {
		(void)director_connect_host(dir, preferred_host,
			"Reconnect attempt to preferred director");
	} else {
		/* the connection hasn't finished sync yet.
		   keep this timeout for now. */
	}
}

/* Mark the ring as synced: flag the left/right connections as synced,
   stop the sync timeout, purge removed directors and mark mail hosts
   synced. A reconnect timeout is added if the right side isn't the
   preferred host. Must not be called when already synced, and left/right
   must either both exist or both be missing. */
void director_set_ring_synced(struct director *dir)
{
	struct director_host *host;

	i_assert(!dir->ring_synced);
	i_assert((dir->left != NULL && dir->right != NULL) ||
		 (dir->left == NULL && dir->right == NULL));

	timeout_remove(&dir->to_handshake_warning);
	if (dir->ring_handshake_warning_sent) {
		e_warning(dir->event,
			  "Ring is synced, continuing delayed requests "
			  "(syncing took %d secs, hosts_hash=%u)",
			  (int)(ioloop_time - dir->ring_last_sync_time),
			  mail_hosts_hash(dir->mail_hosts));
		dir->ring_handshake_warning_sent = FALSE;
	}

	host = dir->right == NULL ? NULL :
		director_connection_get_host(dir->right);

	timeout_remove(&dir->to_reconnect);
	if (host != director_get_preferred_right_host(dir)) {
		/* try to reconnect to preferred host later */
		dir->to_reconnect =
			timeout_add(DIRECTOR_RECONNECT_TIMEOUT_MSECS,
				    director_reconnect_timeout, dir);
	}

	if (dir->left != NULL)
		director_connection_set_synced(dir->left, TRUE);
	if (dir->right != NULL)
		director_connection_set_synced(dir->right, TRUE);
	timeout_remove(&dir->to_sync);
	dir->ring_synced = TRUE;
	dir->ring_last_sync_time = ioloop_time;
	/* If there are any director hosts still marked as "removed", we can
	   safely remove those now. The entire director cluster knows about the
	   removal now. */
	director_hosts_purge_removed(dir);
	mail_hosts_set_synced(dir->mail_hosts);
	director_set_state_changed(dir);
}

/* Send a SYNC line for the given originating host to our right side and
   ping the connections. dir->right is used unconditionally, so the caller
   must make sure it exists. The extra parameters are sent only when both
   minor_version and the right side's minor version are above 0. */
void director_sync_send(struct director *dir, struct director_host *host,
			uint32_t seq, unsigned int minor_version,
			unsigned int timestamp, unsigned int hosts_hash)
{
	string_t *str;

	if (host == dir->self_host) {
		dir->last_sync_sent_ring_change_counter =
			dir->ring_change_counter;
		dir->last_sync_start_time = ioloop_timeval;
	}

	str = t_str_new(128);
	str_printfa(str, "SYNC\t%s\t%u\t%u",
		    host->ip_str, host->port, seq);
	if (minor_version > 0 &&
	    director_connection_get_minor_version(dir->right) > 0) {
		/* only minor_version>0 supports extra parameters */
		str_printfa(str, "\t%u\t%u\t%u", minor_version,
			    timestamp, hosts_hash);
	}
	str_append_c(str, '\n');
	director_connection_send(dir->right, str_c(str));

	/* ping our connections in case either of them are hanging.
	   if they are, we want to know it fast. */
	if (dir->left != NULL)
		director_connection_ping(dir->left);
	director_connection_ping(dir->right);
}

/* Returns TRUE if any of our connections is a non-incoming one. */
static bool director_has_any_outgoing_connections(struct director *dir)
{
	struct director_connection *conn;

	array_foreach_elem(&dir->connections, conn) {
		if (!director_connection_is_incoming(conn))
			return TRUE;
	}
	return FALSE;
}

/* If the ring isn't synced, resend our SYNC (when both left and right
   exist) or start reconnecting (when the right side is missing and nothing
   is already trying to connect). Returns TRUE only if a SYNC was sent. */
bool director_resend_sync(struct director *dir)
{
	if (dir->ring_synced) {
		/* everything ok, no need to do anything */
		return FALSE;
	}

	if (dir->right == NULL) {
		/* right side connection is missing. make sure we're not
		   hanging due to some bug. */
		if (dir->to_reconnect == NULL &&
		    !director_has_any_outgoing_connections(dir)) {
			e_warning(dir->event,
				  "Right side connection is unexpectedly lost, reconnecting");
			director_connect(dir, "Right side connection lost");
		}
	} else if (dir->left != NULL) {
		/* send a new SYNC in case the previous one got dropped */
		dir->self_host->last_sync_timestamp = ioloop_time;
		director_sync_send(dir, dir->self_host, dir->sync_seq,
				   DIRECTOR_VERSION_MINOR, ioloop_time,
				   mail_hosts_hash(dir->mail_hosts));
		if (dir->to_sync != NULL)
			timeout_reset(dir->to_sync);
		return TRUE;
	}
	return FALSE;
}

/* Sync timeout callback: the SYNC didn't come back in time, so try
   resending it. */
static void director_sync_timeout(struct director *dir)
{
	i_assert(!dir->ring_synced);

	if (director_resend_sync(dir)) {
		e_error(dir->event,
			"Ring SYNC seq=%u appears to have got lost, resending",
			dir->sync_seq);
	}
}

/* Mark the ring as unsynced and (re)start the sync timeout. */
void director_set_ring_unsynced(struct director *dir)
{
	if (dir->ring_synced) {
		dir->ring_synced = FALSE;
		dir->ring_last_sync_time = ioloop_time;
	}

	if (dir->to_sync == NULL) {
		dir->to_sync = timeout_add(DIRECTOR_SYNC_TIMEOUT_MSECS,
					   director_sync_timeout, dir);
	} else {
		timeout_reset(dir->to_sync);
	}
}

/* Start a new ring sync: bump the sync sequence, mark the ring unsynced
   and send a SYNC to our right side. When syncing is frozen the SYNC is
   only marked as pending. Nothing is unsynced if we have no connections. */
static void director_sync(struct director *dir)
{
	/* we're synced again when we receive this SYNC back */
	dir->sync_seq++;
	if (dir->right == NULL && dir->left == NULL) {
		/* we're alone. if we're already synced,
		   don't become unsynced. */
		return;
	}
	director_set_ring_unsynced(dir);

	if (dir->sync_frozen) {
		dir->sync_pending = TRUE;
		return;
	}
	if (dir->right == NULL) {
		i_assert(!dir->ring_synced ||
			 (dir->left == NULL && dir->right == NULL));
		e_debug(dir->event,
			"Ring is desynced (seq=%u, no right connection)",
			dir->sync_seq);
		return;
	}

	/* dir->right is known to be non-NULL from here on */
	e_debug(dir->event, "Ring is desynced (seq=%u, sending SYNC to %s)",
		dir->sync_seq, director_connection_get_name(dir->right));

	/* send PINGs to our connections more rapidly until we've synced again.
	   if the connection has actually died, we don't need to wait (and
	   delay requests) for as long to detect it */
	if (dir->left != NULL)
		director_connection_set_synced(dir->left, FALSE);
	director_connection_set_synced(dir->right, FALSE);
	director_sync_send(dir, dir->self_host, dir->sync_seq,
			   DIRECTOR_VERSION_MINOR, ioloop_time,
			   mail_hosts_hash(dir->mail_hosts));
}

/* Cork all connections and make director_sync() only mark syncs as
   pending until director_sync_thaw() is called. Must not already be
   frozen. */
void director_sync_freeze(struct director *dir)
{
	struct director_connection *conn;

	i_assert(!dir->sync_frozen);
	i_assert(!dir->sync_pending);

	array_foreach_elem(&dir->connections, conn)
		director_connection_cork(conn);
	dir->sync_frozen = TRUE;
}

/* Undo director_sync_freeze(): run the pending sync (if any) and uncork
   all connections. */
void director_sync_thaw(struct director *dir)
{
	struct director_connection *conn;

	i_assert(dir->sync_frozen);

	dir->sync_frozen = FALSE;
	if (dir->sync_pending) {
		dir->sync_pending = FALSE;
		director_sync(dir);
	}
	array_foreach_elem(&dir->connections, conn)
		director_connection_uncork(conn);
}

/* Bump the ring change counter and send a DIRECTOR line about the added
   host via director_update_send(). If log is TRUE, the addition is also
   logged. */
void director_notify_ring_added(struct director_host *added_host,
				struct director_host *src, bool log)
{
	const char *cmd;

	if (log) {
		e_info(added_host->dir->event,
		       "Adding director %s to ring (requested by %s)",
		       added_host->name, src->name);
	}

	added_host->dir->ring_change_counter++;
	cmd = t_strdup_printf("DIRECTOR\t%s\t%u\n",
			      added_host->ip_str, added_host->port);
	director_update_send(added_host->dir, src, cmd);
}

/* Free all director hosts that are marked as removed and stop the delayed
   removal timeout. */
static void director_hosts_purge_removed(struct director *dir)
{
	struct director_host *const *hosts, *host;
	unsigned int i, count;

	timeout_remove(&dir->to_remove_dirs);

	hosts = array_get(&dir->dir_hosts, &count);
	for (i = 0; i < count; ) {
		if (hosts[i]->removed) {
			host = hosts[i];
			director_host_free(&host);
			/* re-fetch the array after freeing the host; the
			   index isn't advanced in this branch */
			hosts = array_get(&dir->dir_hosts, &count);
		} else {
			i++;
		}
	}
}

/* Handle removal of a director from the ring: mark the host removed,
   send DIRECTOR-REMOVE onwards, drop our connections to the host and
   start a new sync. If we are the one being removed on someone else's
   request, nothing is done here. */
void director_ring_remove(struct director_host *removed_host,
			  struct director_host *src)
{
	struct director *dir = removed_host->dir;
	struct director_connection *const *conns, *conn;
	unsigned int i, count;
	const char *cmd;

	e_info(dir->event, "Removing director %s from ring (requested by %s)",
	       removed_host->name, src->name);

	if (removed_host->self && !src->self) {
		/* others will just disconnect us */
		return;
	}

	if (!removed_host->self) {
		/* mark the host as removed and fully remove it later. this
		   delay is needed, because the removal may trigger director
		   reconnections, which may send the director back and we don't
		   want to re-add it */
		removed_host->removed = TRUE;
		if (dir->to_remove_dirs == NULL) {
			dir->to_remove_dirs =
				timeout_add(DIRECTOR_DELAYED_DIR_REMOVE_MSECS,
					    director_hosts_purge_removed, dir);
		}
	}

	/* if our left or right side gets removed, notify them first
	   before disconnecting. */
	cmd = t_strdup_printf("DIRECTOR-REMOVE\t%s\t%u\n",
			      removed_host->ip_str, removed_host->port);
	director_update_send_version(dir, src,
				     DIRECTOR_VERSION_RING_REMOVE, cmd);

	/* disconnect any connections to the host */
	conns = array_get(&dir->connections, &count);
	for (i = 0; i < count; ) {
		conn = conns[i];
		if (director_connection_get_host(conn) != removed_host ||
		    removed_host->self)
			i++;
		else {
			director_connection_deinit(&conn, "Removing from ring");
			/* re-fetch the array after the deinit; the index
			   isn't advanced in this branch */
			conns = array_get(&dir->connections, &count);
		}
	}
	if (dir->right == NULL)
		director_connect(dir, "Reconnecting after director was removed");
	director_sync(dir);
}

/* Build and send a HOST line describing the mail host. If orig_src is
   NULL, we are the origin and our sequence number is bumped. If the host
   has a tag but the ring's minimum version can't carry it, the host is
   removed locally instead and nothing is sent. */
static void
director_send_host(struct director *dir, struct director_host *src,
		   struct director_host *orig_src,
		   struct mail_host *host)
{
	const char *host_tag = mail_host_get_tag(host);
	string_t *str;

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	str = t_str_new(128);
	str_printfa(str, "HOST\t%s\t%u\t%u\t%s\t%u",
		    orig_src->ip_str, orig_src->port, orig_src->last_seq,
		    host->ip_str, host->vhost_count);
	if (dir->ring_min_version >= DIRECTOR_VERSION_TAGS_V2) {
		str_append_c(str, '\t');
		str_append_tabescaped(str, host_tag);
	} else if (host_tag[0] != '\0') {
		/* ring_min_version is below DIRECTOR_VERSION_TAGS_V2 here */
		if (dir->ring_min_version < DIRECTOR_VERSION_TAGS) {
			e_error(dir->event,
				"Ring has directors that don't support tags - "
				"removing host %s with tag '%s'",
				host->ip_str, host_tag);
		} else {
			e_error(dir->event,
				"Ring has directors that support mixed versions of tags - "
				"removing host %s with tag '%s'",
				host->ip_str, host_tag);
		}
		director_remove_host(dir, NULL, NULL, host);
		return;
	}
	if (dir->ring_min_version >= DIRECTOR_VERSION_UPDOWN) {
		str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U',
			    (long)host->last_updown_change);
		/* add any further version checks here - these directors ignore
		   any extra unknown arguments */
		if (host->hostname != NULL)
			str_append_tabescaped(str, host->hostname);
	}
	str_append_c(str, '\n');
	director_update_send(dir, src, str_c(str));
}

/* Send a HOST line for every mail host, with ourself as the origin. */
void director_resend_hosts(struct director *dir)
{
	struct mail_host *host;

	array_foreach_elem(mail_hosts_get(dir->mail_hosts), host)
		director_send_host(dir, dir->self_host, NULL, host);
}

/* Notify the ring about a changed mail host and start a new sync. */
void director_update_host(struct director *dir, struct director_host *src,
			  struct director_host *orig_src,
			  struct mail_host *host)
{
	/* update state in case this is the first mail host being added */
	director_set_state_changed(dir);

	e_debug(dir->event, "Updating host %s vhost_count=%u "
		"down=%d last_updown_change=%ld (hosts_hash=%u)",
		host->ip_str, host->vhost_count, host->down ? 1 : 0,
		(long)host->last_updown_change,
		mail_hosts_hash(dir->mail_hosts));

	director_send_host(dir, src, orig_src, host);

	/* mark the host desynced until ring is synced again. except if we're
	   alone in the ring that never happens.

	   NOTE(review): director_send_host() may have called
	   director_remove_host() for this host (unsupported tag) - confirm
	   that the host is still valid to touch here. */
	if (dir->right != NULL || dir->left != NULL)
		host->desynced = TRUE;
	director_sync(dir);
}

/* Remove the mail host and its users locally and start a new sync. If src
   is non-NULL, a HOST-REMOVE line is also sent (with ourself as the origin
   when orig_src is NULL). */
void director_remove_host(struct director *dir, struct director_host *src,
			  struct director_host *orig_src,
			  struct mail_host *host)
{
	struct user_directory *users = host->tag->users;

	if (src != NULL) {
		if (orig_src == NULL) {
			orig_src = dir->self_host;
			orig_src->last_seq++;
		}

		director_update_send(dir, src, t_strdup_printf(
			"HOST-REMOVE\t%s\t%u\t%u\t%s\n",
			orig_src->ip_str, orig_src->port,
			orig_src->last_seq, host->ip_str));
	}

	user_directory_remove_host(users, host);
	mail_host_remove(host);
	director_sync(dir);
}

/* Send a HOST-FLUSH line, remove the host's users from the user directory
   (the host itself is kept) and start a new sync. We're the origin when
   orig_src is NULL. */
void director_flush_host(struct director *dir, struct director_host *src,
			 struct director_host *orig_src,
			 struct mail_host *host)
{
	struct user_directory *users = host->tag->users;

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	director_update_send(dir, src, t_strdup_printf(
		"HOST-FLUSH\t%s\t%u\t%u\t%s\n",
		orig_src->ip_str, orig_src->port, orig_src->last_seq,
		host->ip_str));
	user_directory_remove_host(users, host);
	director_sync(dir);
}

/* Send a USER line about a (non-weak) user to every connection whose host
   isn't src. The timestamp is included only for connections whose minor
   version supports it. */
void director_update_user(struct director *dir, struct director_host *src,
			  struct user *user)
{
	struct director_connection *conn;

	i_assert(src != NULL);
	i_assert(!user->weak);

	array_foreach_elem(&dir->connections, conn) {
		if (director_connection_get_host(conn) == src)
			continue;

		if (director_connection_get_minor_version(conn) >=
		    DIRECTOR_VERSION_USER_TIMESTAMP) {
			director_connection_send(conn, t_strdup_printf(
				"USER\t%u\t%s\t%u\n", user->username_hash,
				user->host->ip_str, user->timestamp));
		} else {
			director_connection_send(conn, t_strdup_printf(
				"USER\t%u\t%s\n", user->username_hash,
				user->host->ip_str));
		}
	}
}

/* Send a USER-WEAK line about a weak user. We're the origin when orig_src
   is NULL. src_conn is the connection the update arrived from; it's used
   only in the two-director case described below. */
void director_update_user_weak(struct director *dir, struct director_host *src,
			       struct director_connection *src_conn,
			       struct director_host *orig_src,
			       struct user *user)
{
	const char *cmd;

	i_assert(src != NULL);
	i_assert(user->weak);

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}

	cmd = t_strdup_printf("USER-WEAK\t%s\t%u\t%u\t%u\t%s\n",
		orig_src->ip_str, orig_src->port, orig_src->last_seq,
		user->username_hash, user->host->ip_str);

	if (src != dir->self_host && dir->left != NULL && dir->right != NULL &&
	    director_connection_get_host(dir->left) ==
	    director_connection_get_host(dir->right)) {
		/* only two directors in this ring and we're forwarding
		   USER-WEAK from one director back to itself via another
		   so it sees we've received it. we can't use
		   director_update_send() for this, because it doesn't send
		   data back to the source. */
		if (dir->right == src_conn)
			director_connection_send(dir->left, cmd);
		else if (dir->left == src_conn)
			director_connection_send(dir->right, cmd);
		else
			i_unreached();
	} else {
		director_update_send(dir, src, cmd);
	}
}

/* Flush script finished (program client callback). On failure every line
   of the script's captured output is logged as an error. The program
   client is destroyed and, if the user still owns this kill context, the
   move is finished - without the extra delay only if the flush
   succeeded. */
static void
director_flush_user_continue(enum program_client_exit_status result,
			     struct director_kill_context *ctx)
{
	struct director *dir = ctx->dir;
	ctx->callback_pending = FALSE;

	struct user *user = user_directory_lookup(ctx->tag->users,
						  ctx->username_hash);

	if (result == PROGRAM_CLIENT_EXIT_STATUS_FAILURE) {
		struct istream *is =
			iostream_temp_finish(&ctx->reply, SIZE_MAX);
		char *data;

		i_stream_set_return_partial_line(is, TRUE);
		data = i_stream_read_next_line(is);
		e_error(dir->event,
			"%s: Failed to flush user hash %u in host %s: %s",
			ctx->socket_path,
			ctx->username_hash,
			net_ip2addr(&ctx->host_ip),
			data == NULL ? "(no output to stdout)" : data);
		while((data = i_stream_read_next_line(is)) != NULL) {
			e_error(dir->event,
				"%s: Failed to flush user hash %u in host %s: %s",
				ctx->socket_path,
				ctx->username_hash,
				net_ip2addr(&ctx->host_ip),
				data);
		}
		i_stream_unref(&is);
	} else {
		o_stream_unref(&ctx->reply);
	}
	program_client_destroy(&ctx->pclient);

	if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
		/* user was already freed - ignore */
		e_debug(dir->event, "User %u freed while flushing, result=%d",
			ctx->username_hash, result);
		i_assert(ctx->to_move == NULL);
		/* socket_path was allocated for this ctx in
		   director_flush_user(), so release it together with ctx
		   (no-op if it's already NULL) */
		i_free(ctx->socket_path);
		i_free(ctx);
	} else {
		/* ctx is freed later via user->kill_ctx */
		e_debug(dir->event, "Flushing user %u finished, result=%d",
			ctx->username_hash, result);
		director_user_kill_finish_delayed(dir, user,
			result == PROGRAM_CLIENT_EXIT_STATUS_SUCCESS);
	}
}

/* Run the director_flush_socket script for the user's move, if one is
   needed. The move is finished either directly (no flush needed, or the
   flush couldn't be started) or asynchronously from
   director_flush_user_continue(). */
static void
director_flush_user(struct director *dir, struct user *user)
{
	struct director_kill_context *ctx = user->kill_ctx;
	struct var_expand_table tab[] = {
		{ 'i', user->host->ip_str, "ip" },
		{ 'h', user->host->hostname, "host" },
		{ '\0', NULL, NULL }
	};
	const char *error;

	/* Execute flush script, if set. Only the director that started the
	   user moving will call the flush script. Having each director do it
	   would be redundant since they're all supposed to be performing the
	   same flush task to the same backend.

	   Flushing is also not triggered if we're moving a user that we just
	   created due to the user move. This means that the user doesn't have
	   an old host, so we couldn't really even perform any flushing on the
	   backend. */
	if (*dir->set->director_flush_socket == '\0' ||
	    ctx->old_host_ip.family == 0 ||
	    !ctx->kill_is_self_initiated) {
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}

	ctx->host_ip = user->host->ip;

	string_t *s_sock = str_new(default_pool, 32);
	if (var_expand(s_sock, dir->set->director_flush_socket, tab, &error) <= 0) {
		e_error(dir->event,
			"Failed to expand director_flush_socket=%s: %s",
			dir->set->director_flush_socket, error);
		/* s_sock is allocated from default_pool, so it must be
		   freed explicitly on this error path */
		str_free(&s_sock);
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}
	/* ctx takes ownership of the expanded path */
	ctx->socket_path = str_free_without_data(&s_sock);

	struct program_client_settings set = {
		.client_connect_timeout_msecs = 10000,
		.dns_client_socket_path = DIRECTOR_DNS_SOCKET_PATH,
	};

	restrict_access_init(&set.restrict_set);

	const char *const args[] = {
		"FLUSH",
		t_strdup_printf("%u", user->username_hash),
		net_ip2addr(&ctx->old_host_ip),
		user->host->ip_str,
		ctx->old_host_down ? "down" : "up",
		dec2str(ctx->old_host_vhost_count),
		NULL
	};

	ctx->kill_state = USER_KILL_STATE_FLUSHING;

	e_debug(dir->event, "Flushing user %u via %s",
		user->username_hash, ctx->socket_path);

	if ((program_client_create(ctx->socket_path, args, &set, FALSE,
				   &ctx->pclient, &error)) != 0) {
		e_error(dir->event,
			"%s: Failed to flush user hash %u in host %s: %s",
			ctx->socket_path,
			user->username_hash,
			user->host->ip_str,
			error);
		/* ctx->reply hasn't been created yet, so we can't go through
		   director_flush_user_continue(), which reads the reply
		   stream on failure. Finish the move the same way the
		   continuation would for a failed flush. */
		if (ctx->pclient != NULL)
			program_client_destroy(&ctx->pclient);
		director_user_kill_finish_delayed(dir, user, FALSE);
		return;
	}

	ctx->reply =
		iostream_temp_create_named("/tmp", 0,
					   t_strdup_printf("flush response from %s",
							   user->host->ip_str));
	o_stream_set_no_error_handling(ctx->reply, TRUE);
	program_client_set_output(ctx->pclient, ctx->reply);
	ctx->callback_pending = TRUE;
	program_client_run_async(ctx->pclient, director_flush_user_continue, ctx);
}

/* One user move has ended: decrease the moving users counter and flag the
   director state as changed. */
static void director_user_move_finished(struct director *dir)
{
	i_assert(dir->users_moving_count > 0);
	dir->users_moving_count--;

	director_set_state_changed(dir);
}

/* Free the user's kill context: abort a still pending IPC command, remove
   the move timeout and release the context's memory. */
static void director_user_move_free(struct user *user)
{
	struct director_kill_context *kill_ctx = user->kill_ctx;
	struct director *dir;

	/* check the context before reading anything through it */
	i_assert(kill_ctx != NULL);
	dir = kill_ctx->dir;

	e_debug(dir->event, "User %u move finished at state=%s",
		user->username_hash,
		user_kill_state_names[kill_ctx->kill_state]);

	if (kill_ctx->ipc_cmd != NULL)
		ipc_client_cmd_abort(dir->ipc_proxy, &kill_ctx->ipc_cmd);
	timeout_remove(&kill_ctx->to_move);
	i_free(kill_ctx->socket_path);
	i_free(kill_ctx);
	user->kill_ctx = NULL;

	director_user_move_finished(dir);
}

/* Timeout callback for the post-kill delay: the move is now finished. */
static void director_user_kill_finish_delayed_to(struct user *user)
{
	i_assert(user->kill_ctx != NULL);
	i_assert(user->kill_ctx->kill_state == USER_KILL_STATE_DELAY);

	director_user_move_free(user);
}

/* Finish the user's move either immediately (skip_delay=TRUE) or after
   director_user_kick_delay seconds. */
static void
director_user_kill_finish_delayed(struct director *dir, struct user *user,
				  bool skip_delay)
{
	if (skip_delay) {
		user->kill_ctx->kill_state = USER_KILL_STATE_NONE;
		director_user_move_free(user);
		return;
	}

	user->kill_ctx->kill_state = USER_KILL_STATE_DELAY;

	/* wait for a while for the kills to finish in the backend server,
	   so there are no longer any processes running for the user before we
	   start letting new in connections to the new server. */
	timeout_remove(&user->kill_ctx->to_move);
	user->kill_ctx->to_move =
		timeout_add(dir->set->director_user_kick_delay * 1000,
			    director_user_kill_finish_delayed_to, user);
}

/* Our local kill of the user's connections is done. Depending on the
   state, either start flushing (when we have no right side), send
   USER-KILLED to the right side, or start waiting for the notification.
   "self" tells whether we started the move ourself. */
static void
director_finish_user_kill(struct director *dir, struct user *user, bool self)
{
	struct director_kill_context *kill_ctx = user->kill_ctx;

	i_assert(kill_ctx != NULL);
	i_assert(kill_ctx->kill_state != USER_KILL_STATE_FLUSHING);
	i_assert(kill_ctx->kill_state != USER_KILL_STATE_DELAY);

	e_debug(dir->event, "User %u kill finished - %sstate=%s",
		user->username_hash,
		self ? "we started it " : "",
		user_kill_state_names[kill_ctx->kill_state]);

	if (dir->right == NULL) {
		/* we're alone */
		director_flush_user(dir, user);
	} else if (self ||
		   kill_ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED) {
		director_connection_send(dir->right, t_strdup_printf(
			"USER-KILLED\t%u\n", user->username_hash));
		kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE;
	} else {
		i_assert(kill_ctx->kill_state == USER_KILL_STATE_KILLING);
		kill_ctx->kill_state = USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY;
	}
}

/* Log throttle callback: summarize the kill failures that weren't logged
   individually. */
static void
director_user_kill_fail_throttled(unsigned int new_events_count,
				  void *context ATTR_UNUSED)
{
	i_error("Failed to kill %u users' connections", new_events_count);
}

/* IPC callback for the KICK-DIRECTOR-HASH command started by
   director_kill_user(). context is the kill context, which may have
   outlived its user. */
static void
director_kill_user_callback(enum ipc_client_cmd_state state,
			    const char *data, void *context)
{
	struct director_kill_context *ctx = context;
	struct user *user;

	/* don't try to abort the IPC command anymore */
	ctx->ipc_cmd = NULL;

	/* this is an asynchronous notification about user being killed.
	   there are no guarantees about what might have happened to the user
	   in the mean time. */
	switch (state) {
	case IPC_CLIENT_CMD_STATE_REPLY:
		/* shouldn't get here. the command reply isn't finished yet. */
		e_error(ctx->dir->event,
			"login process sent unexpected reply to kick: %s", data);
		return;
	case IPC_CLIENT_CMD_STATE_OK:
		break;
	case IPC_CLIENT_CMD_STATE_ERROR:
		if (log_throttle_accept(user_kill_fail_throttle)) {
			e_error(ctx->dir->event,
				"Failed to kill user %u connections: %s",
				ctx->username_hash, data);
		}
		/* we can't really do anything but continue anyway */
		break;
	}

	i_assert(ctx->dir->users_kicking_count > 0);
	ctx->dir->users_kicking_count--;
	if (ctx->dir->kick_callback != NULL)
		ctx->dir->kick_callback(ctx->dir);

	user = user_directory_lookup(ctx->tag->users, ctx->username_hash);
	if (!DIRECTOR_KILL_CONTEXT_IS_VALID(user, ctx)) {
		/* user was already freed - ignore */
		i_assert(ctx->to_move == NULL);
		director_user_move_finished(ctx->dir);
		i_free(ctx);
	} else {
		i_assert(ctx->kill_state == USER_KILL_STATE_KILLING ||
			 ctx->kill_state == USER_KILL_STATE_KILLING_NOTIFY_RECEIVED);
		/* we were still waiting for the kill notification */
		director_finish_user_kill(ctx->dir, user,
					  ctx->kill_is_self_initiated);
	}
}

/* Log throttle callback: summarize the move timeouts that weren't logged
   individually. */
static void
director_user_move_throttled(unsigned int new_events_count,
			     void *context ATTR_UNUSED)
{
	i_error("%u users' move timed out, their state may now be inconsistent",
		new_events_count);
}

/* Timeout callback: the user's move didn't finish in
   DIRECTOR_USER_MOVE_TIMEOUT_MSECS. Give up on it, aborting a running
   flush if there is one. */
static void director_user_move_timeout(struct user *user)
{
	i_assert(user->kill_ctx != NULL);
	i_assert(user->kill_ctx->kill_state != USER_KILL_STATE_DELAY);

	if (log_throttle_accept(user_move_throttle)) {
		e_error(user->kill_ctx->dir->event,
			"Finishing user %u move timed out, "
			"its state may now be inconsistent (state=%s)",
			user->username_hash,
			user_kill_state_names[user->kill_ctx->kill_state]);
	}
	if (user->kill_ctx->kill_state == USER_KILL_STATE_FLUSHING) {
		o_stream_unref(&user->kill_ctx->reply);
		program_client_destroy(&user->kill_ctx->pclient);
	}
	director_user_move_free(user);
}

/* Start killing the user's connections as part of a move. Creates the
   user's kill context and either sends a kick command via IPC (when the
   host changed or forced_kick is set) or finishes the kill immediately.
   Does nothing if the user is already being killed. old_host may be
   NULL. */
void director_kill_user(struct director *dir, struct director_host *src,
			struct user *user, struct mail_tag *tag,
			struct mail_host *old_host, bool forced_kick)
{
	struct director_kill_context *ctx;
	const char *cmd;

	if (USER_IS_BEING_KILLED(user)) {
		/* User is being moved again before the previous move
		   finished. We'll just continue wherever we left off
		   earlier. */
		e_debug(dir->event,
			"User %u move restarted - previous kill_state=%s",
			user->username_hash,
			user_kill_state_names[user->kill_ctx->kill_state]);
		return;
	}

	user->kill_ctx = ctx = i_new(struct director_kill_context, 1);
	ctx->dir = dir;
	ctx->tag = tag;
	ctx->username_hash = user->username_hash;
	ctx->kill_is_self_initiated = src->self;
	if (old_host != NULL) {
		ctx->old_host_ip = old_host->ip;
		ctx->old_host_down = old_host->down;
		ctx->old_host_vhost_count = old_host->vhost_count;
	}

	dir->users_moving_count++;
	ctx->to_move = timeout_add(DIRECTOR_USER_MOVE_TIMEOUT_MSECS,
				   director_user_move_timeout, user);
	ctx->kill_state = USER_KILL_STATE_KILLING;

	if ((old_host != NULL && old_host != user->host) || forced_kick) {
		cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u",
				      user->username_hash);
		dir->users_kicking_count++;
		ctx->ipc_cmd = ipc_client_cmd(dir->ipc_proxy, cmd,
					      director_kill_user_callback, ctx);
	} else {
		/* a) we didn't even know about the user before now.
		   don't bother performing a local kick, since it wouldn't
		   kick anything.
		   b) our host was already correct. notify others that we have
		   killed the user, but don't really do it. */
		director_finish_user_kill(ctx->dir, user,
					  ctx->kill_is_self_initiated);
	}
}

/* Move the user with the given hash to the given host: update (or create)
   the user locally, send USER-MOVE onwards and start killing the user's
   existing connections. We're the origin when orig_src is NULL. */
void director_move_user(struct director *dir, struct director_host *src,
			struct director_host *orig_src,
			unsigned int username_hash, struct mail_host *host)
{
	struct user_directory *users = host->tag->users;
	struct mail_host *old_host = NULL;
	struct user *user;

	/* 1. move this user's host, and set its "killing" flag to delay all of
	   its future connections until all directors have killed the
	   connections and notified us about it.

	   2. tell the other directors about the move

	   3. once user kill callback is called, tell the other directors
	   with USER-KILLED that we're done killing the user.

	   4. when some director gets a duplicate USER-KILLED, it's
	   responsible for notifying all directors that user is completely
	   killed.

	   5. after receiving USER-KILLED-EVERYWHERE notification,
	   new connections are again allowed for the user.
	*/
	user = user_directory_lookup(users, username_hash);
	if (user == NULL) {
		e_debug(dir->event,
			"User %u move started: User was nonexistent",
			username_hash);
		user = user_directory_add(users, username_hash,
					  host, ioloop_time);
	} else if (user->host == host) {
		/* User is already in the wanted host, but another director
		   didn't think so. We'll need to finish the move without
		   killing any of our connections. */
		old_host = user->host;
		user->timestamp = ioloop_time;
		e_debug(dir->event,
			"User %u move forwarded: host is already %s",
			username_hash, host->ip_str);
	} else {
		/* user is looked up via the new host's tag, so if it's found
		   the old tag has to be the same. */
		i_assert(user->host->tag == host->tag);

		old_host = user->host;
		user->host->user_count--;
		user->host = host;
		user->host->user_count++;
		user->timestamp = ioloop_time;
		e_debug(dir->event, "User %u move started: host %s -> %s",
			username_hash, old_host->ip_str, host->ip_str);
	}

	if (orig_src == NULL) {
		orig_src = dir->self_host;
		orig_src->last_seq++;
	}
	director_update_send(dir, src, t_strdup_printf(
		"USER-MOVE\t%s\t%u\t%u\t%u\t%s\n",
		orig_src->ip_str, orig_src->port, orig_src->last_seq,
		user->username_hash, user->host->ip_str));
	/* kill the user only after sending the USER-MOVE, because the kill
	   may finish instantly. */
	director_kill_user(dir, src, user, host->tag, old_host, FALSE);
}

static void
director_kick_user_callback(enum ipc_client_cmd_state state,
			    const char *data, void *context)
{
	struct director *dir = context;

	if (state == IPC_CLIENT_CMD_STATE_REPLY) {
		/* shouldn't get here. the command reply isn't finished yet.
*/ e_error(dir->event, "login process sent unexpected reply to kick: %s", data); return; } i_assert(dir->users_kicking_count > 0); dir->users_kicking_count--; if (dir->kick_callback != NULL) dir->kick_callback(dir); } void director_kick_user(struct director *dir, struct director_host *src, struct director_host *orig_src, const char *username) { string_t *cmd = t_str_new(64); str_append(cmd, "proxy\t*\tKICK\t"); str_append_tabescaped(cmd, username); dir->users_kicking_count++; ipc_client_cmd(dir->ipc_proxy, str_c(cmd), director_kick_user_callback, dir); if (orig_src == NULL) { orig_src = dir->self_host; orig_src->last_seq++; } str_truncate(cmd, 0); str_printfa(cmd, "USER-KICK\t%s\t%u\t%u\t", orig_src->ip_str, orig_src->port, orig_src->last_seq); str_append_tabescaped(cmd, username); str_append_c(cmd, '\n'); director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, str_c(cmd)); } void director_kick_user_alt(struct director *dir, struct director_host *src, struct director_host *orig_src, const char *field, const char *value) { string_t *cmd = t_str_new(64); str_append(cmd, "proxy\t*\tKICK-ALT\t"); str_append_tabescaped(cmd, field); str_append_c(cmd, '\t'); str_append_tabescaped(cmd, value); dir->users_kicking_count++; ipc_client_cmd(dir->ipc_proxy, str_c(cmd), director_kick_user_callback, dir); if (orig_src == NULL) { orig_src = dir->self_host; orig_src->last_seq++; } str_truncate(cmd, 0); str_printfa(cmd, "USER-KICK-ALT\t%s\t%u\t%u\t", orig_src->ip_str, orig_src->port, orig_src->last_seq); str_append_tabescaped(cmd, field); str_append_c(cmd, '\t'); str_append_tabescaped(cmd, value); str_append_c(cmd, '\n'); director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK_ALT, str_c(cmd)); } void director_kick_user_hash(struct director *dir, struct director_host *src, struct director_host *orig_src, unsigned int username_hash, const struct ip_addr *except_ip) { const char *cmd; cmd = t_strdup_printf("proxy\t*\tKICK-DIRECTOR-HASH\t%u\t%s", username_hash, 
net_ip2addr(except_ip)); dir->users_kicking_count++; ipc_client_cmd(dir->ipc_proxy, cmd, director_kick_user_callback, dir); if (orig_src == NULL) { orig_src = dir->self_host; orig_src->last_seq++; } cmd = t_strdup_printf("USER-KICK-HASH\t%s\t%u\t%u\t%u\t%s\n", orig_src->ip_str, orig_src->port, orig_src->last_seq, username_hash, net_ip2addr(except_ip)); director_update_send_version(dir, src, DIRECTOR_VERSION_USER_KICK, cmd); } static void director_send_user_killed_everywhere(struct director *dir, struct director_host *src, struct director_host *orig_src, unsigned int username_hash) { if (orig_src == NULL) { orig_src = dir->self_host; orig_src->last_seq++; } director_update_send(dir, src, t_strdup_printf( "USER-KILLED-EVERYWHERE\t%s\t%u\t%u\t%u\n", orig_src->ip_str, orig_src->port, orig_src->last_seq, username_hash)); } static void director_user_tag_killed(struct director *dir, struct mail_tag *tag, unsigned int username_hash) { struct user *user; user = user_directory_lookup(tag->users, username_hash); if (user == NULL || !USER_IS_BEING_KILLED(user)) return; switch (user->kill_ctx->kill_state) { case USER_KILL_STATE_KILLING: user->kill_ctx->kill_state = USER_KILL_STATE_KILLING_NOTIFY_RECEIVED; break; case USER_KILL_STATE_KILLED_WAITING_FOR_NOTIFY: director_finish_user_kill(dir, user, TRUE); break; case USER_KILL_STATE_KILLING_NOTIFY_RECEIVED: e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED", username_hash, user_kill_state_names[user->kill_ctx->kill_state]); break; case USER_KILL_STATE_NONE: case USER_KILL_STATE_FLUSHING: case USER_KILL_STATE_DELAY: /* move restarted. state=none can also happen if USER-MOVE was sent while we were still moving. send back USER-KILLED-EVERYWHERE to avoid hangs. 
*/ director_send_user_killed_everywhere(dir, dir->self_host, NULL, username_hash); break; case USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE: director_user_killed_everywhere(dir, dir->self_host, NULL, username_hash); break; } } void director_user_killed(struct director *dir, unsigned int username_hash) { struct mail_tag *tag; array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) director_user_tag_killed(dir, tag, username_hash); } static void director_user_tag_killed_everywhere(struct director *dir, struct mail_tag *tag, struct director_host *src, struct director_host *orig_src, unsigned int username_hash) { struct user *user; user = user_directory_lookup(tag->users, username_hash); if (user == NULL) { e_debug(dir->event, "User %u no longer exists - ignoring USER-KILLED-EVERYWHERE", username_hash); return; } if (!USER_IS_BEING_KILLED(user)) { e_debug(dir->event, "User %u is no longer being killed - ignoring USER-KILLED-EVERYWHERE", username_hash); return; } if (user->kill_ctx->kill_state != USER_KILL_STATE_KILLED_WAITING_FOR_EVERYONE) { e_debug(dir->event, "User %u kill_state=%s - ignoring USER-KILLED-EVERYWHERE", username_hash, user_kill_state_names[user->kill_ctx->kill_state]); return; } director_flush_user(dir, user); director_send_user_killed_everywhere(dir, src, orig_src, username_hash); } void director_user_killed_everywhere(struct director *dir, struct director_host *src, struct director_host *orig_src, unsigned int username_hash) { struct mail_tag *tag; array_foreach_elem(mail_hosts_get_tags(dir->mail_hosts), tag) { director_user_tag_killed_everywhere(dir, tag, src, orig_src, username_hash); } } static void director_state_callback_timeout(struct director *dir) { timeout_remove(&dir->to_callback); dir->state_change_callback(dir); } void director_set_state_changed(struct director *dir) { /* we may get called to here from various places. use a timeout to make sure the state callback is called with a clean state. 
*/ if (dir->to_callback == NULL) { dir->to_callback = timeout_add(0, director_state_callback_timeout, dir); } } void director_update_send(struct director *dir, struct director_host *src, const char *cmd) { director_update_send_version(dir, src, 0, cmd); } void director_update_send_version(struct director *dir, struct director_host *src, unsigned int min_version, const char *cmd) { struct director_connection *conn; i_assert(src != NULL); array_foreach_elem(&dir->connections, conn) { if (director_connection_get_host(conn) != src && director_connection_get_minor_version(conn) >= min_version) director_connection_send(conn, cmd); } } static void director_user_freed(struct user *user) { if (user->kill_ctx != NULL) { /* director_user_expire is very short. user expired before moving the user finished or timed out. */ if (user->kill_ctx->callback_pending) { /* kill_ctx is used as a callback parameter. only remove the timeout and finish the free later. */ timeout_remove(&user->kill_ctx->to_move); } else { director_user_move_free(user); } } } struct director * director_init(const struct director_settings *set, const struct ip_addr *listen_ip, in_port_t listen_port, director_state_change_callback_t *callback, director_kick_callback_t *kick_callback) { struct director *dir; dir = i_new(struct director, 1); dir->set = set; dir->self_port = listen_port; dir->self_ip = *listen_ip; dir->state_change_callback = callback; dir->kick_callback = kick_callback; dir->event = event_create(NULL); event_add_category(dir->event, &event_category_director); i_array_init(&dir->dir_hosts, 16); i_array_init(&dir->pending_requests, 16); i_array_init(&dir->connections, 8); dir->mail_hosts = mail_hosts_init(dir, set->director_user_expire, director_user_freed); dir->ipc_proxy = ipc_client_init(DIRECTOR_IPC_PROXY_PATH); dir->ring_min_version = DIRECTOR_VERSION_MINOR; return dir; } void director_deinit(struct director **_dir) { struct director *dir = *_dir; struct director_host *const *hostp, *host; 
struct director_connection *conn, *const *connp; *_dir = NULL; while (array_count(&dir->connections) > 0) { connp = array_front(&dir->connections); conn = *connp; director_connection_deinit(&conn, "Shutting down"); } mail_hosts_deinit(&dir->mail_hosts); mail_hosts_deinit(&dir->orig_config_hosts); ipc_client_deinit(&dir->ipc_proxy); timeout_remove(&dir->to_reconnect); timeout_remove(&dir->to_handshake_warning); timeout_remove(&dir->to_request); timeout_remove(&dir->to_sync); timeout_remove(&dir->to_remove_dirs); timeout_remove(&dir->to_callback); while (array_count(&dir->dir_hosts) > 0) { hostp = array_front(&dir->dir_hosts); host = *hostp; director_host_free(&host); } array_free(&dir->pending_requests); array_free(&dir->dir_hosts); array_free(&dir->connections); event_unref(&dir->event); i_free(dir); } struct director_user_iter { struct director *dir; unsigned int tag_idx; struct user_directory_iter *user_iter; bool iter_until_current_tail; }; struct director_user_iter * director_iterate_users_init(struct director *dir, bool iter_until_current_tail) { struct director_user_iter *iter = i_new(struct director_user_iter, 1); iter->dir = dir; iter->iter_until_current_tail = iter_until_current_tail; return iter; } struct user *director_iterate_users_next(struct director_user_iter *iter) { const ARRAY_TYPE(mail_tag) *tags; struct user *user; i_assert(iter != NULL); if (iter->user_iter == NULL) { tags = mail_hosts_get_tags(iter->dir->mail_hosts); if (iter->tag_idx >= array_count(tags)) return NULL; struct mail_tag *tag = array_idx_elem(tags, iter->tag_idx); iter->user_iter = user_directory_iter_init(tag->users, iter->iter_until_current_tail); } user = user_directory_iter_next(iter->user_iter); if (user == NULL) { user_directory_iter_deinit(&iter->user_iter); iter->tag_idx++; return director_iterate_users_next(iter); } else return user; } void director_iterate_users_deinit(struct director_user_iter **_iter) { i_assert(_iter != NULL && *_iter != NULL); struct 
director_user_iter *iter = *_iter; *_iter = NULL; if (iter->user_iter != NULL) user_directory_iter_deinit(&iter->user_iter); i_free(iter); } bool director_get_username_hash(struct director *dir, const char *username, unsigned int *hash_r) { const char *error; if (mail_user_hash(username, dir->set->director_username_hash, hash_r, &error)) return TRUE; e_error(dir->event, "Failed to expand director_username_hash=%s: %s", dir->set->director_username_hash, error); return FALSE; } void directors_init(void) { user_move_throttle = log_throttle_init(&director_log_throttle_settings, director_user_move_throttled, NULL); user_kill_fail_throttle = log_throttle_init(&director_log_throttle_settings, director_user_kill_fail_throttled, NULL); } void directors_deinit(void) { log_throttle_deinit(&user_move_throttle); log_throttle_deinit(&user_kill_fail_throttle); }