diff options
Diffstat (limited to 'src/director/director-connection.c')
-rw-r--r-- | src/director/director-connection.c | 2712 |
1 files changed, 2712 insertions, 0 deletions
diff --git a/src/director/director-connection.c b/src/director/director-connection.c new file mode 100644 index 0000000..a89cc2e --- /dev/null +++ b/src/director/director-connection.c @@ -0,0 +1,2712 @@ +/* Copyright (c) 2010-2018 Dovecot authors, see the included COPYING file */ + +/* + Handshaking: + + Incoming director connections send: + + VERSION + ME + <wait for DONE from remote handshake> + DONE + <make this connection our "left" connection, potentially disconnecting + another one> + + Outgoing director connections send: + + VERSION + ME + [0..n] DIRECTOR + HOST-HAND-START + [0..n] HOST + HOST-HAND-END + [0..n] USER + <possibly other non-handshake commands between USERs> + DONE + <wait for DONE from remote> + <make this connection our "right" connection, potentially disconnecting + another one> +*/ + +#include "lib.h" +#include "ioloop.h" +#include "array.h" +#include "net.h" +#include "istream.h" +#include "ostream.h" +#include "str.h" +#include "strescape.h" +#include "time-util.h" +#include "master-service.h" +#include "mail-host.h" +#include "director.h" +#include "director-host.h" +#include "director-request.h" +#include "director-connection.h" + +#include <unistd.h> +#include <sys/time.h> +#include <sys/resource.h> + +#define MAX_INBUF_SIZE 1024 +#define OUTBUF_FLUSH_THRESHOLD (1024*128) +/* Max time to wait for connect() to finish before aborting */ +#define DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS (10*1000) +/* Max idling time before "ME" command must have been received, + or we'll disconnect. */ +#define DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS (10*1000) +/* Max time to wait for USERs in handshake to be sent. With a lot of users the + kernel may quickly eat up everything we send, while the receiver is busy + parsing the data. */ +#define DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS (30*1000) +/* Max idling time before "DONE" command must have been received, + or we'll disconnect. Use a slightly larger value than for _SEND_USERS_ so + that we'll get a better error if the sender decides to disconnect. */ +#define DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS (40*1000) +/* How long to wait to send PING when connection is idle */ +#define DIRECTOR_CONNECTION_PING_INTERVAL_MSECS (15*1000) +/* How long to wait before sending PING while waiting for SYNC reply */ +#define DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS 1000 +/* Log a warning if PING reply or PONG response takes longer than this */ +#define DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS (5*1000) +/* If outgoing director connection exists for less than this many seconds, + mark the host as failed so we won't try to reconnect to it immediately */ +#define DIRECTOR_SUCCESS_MIN_CONNECT_SECS 40 +/* If USER request doesn't have a timestamp, user isn't refreshed if it was + already refreshed director_user_expire/4 seconds ago. This value is the + hardcoded maximum for that value. */ +#define DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS 15 +#define DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS 1000 +#define DIRECTOR_WAIT_DISCONNECT_SECS 10 +#define DIRECTOR_HANDSHAKE_WARN_SECS 29 +#define DIRECTOR_HANDSHAKE_BYTES_LOG_MIN_SECS (60*30) +#define DIRECTOR_MAX_SYNC_SEQ_DUPLICATES 4 +/* If we receive SYNCs with a timestamp this many seconds higher than the last + valid received SYNC timestamp, assume that we lost the director's restart + notification and reset the last_sync_seq */ +#define DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS (60*2) +#define DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS 1 +/* How many USER entries to send during handshake before going back to ioloop + to see if there's other work to be done as well. */ +#define DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH 10000 + +#define CMD_IS_USER_HANDSHAKE(minor_version, args) \ + ((minor_version) < DIRECTOR_VERSION_HANDSHAKE_U_CMD && \ + str_array_length(args) > 2) + +#define DIRECTOR_OPT_CONSISTENT_HASHING "consistent-hashing" + +struct director_connection { + int refcount; + struct director *dir; + struct event *event; + char *name; + struct timeval created, connected_time, me_received_time; + struct timeval connected_user_cpu; + unsigned int minor_version; + + struct timeval last_input, last_output; + size_t peak_bytes_buffered; + + struct timeval ping_sent_time; + size_t ping_sent_buffer_size; + struct timeval ping_sent_user_cpu; + uoff_t ping_sent_input_offset, ping_sent_output_offset; + unsigned int last_ping_msecs; + + /* for incoming connections the director host isn't known until + ME-line is received */ + struct director_host *host; + /* this is set only for wrong connections: */ + struct director_host *connect_request_to; + + int fd; + struct io *io; + struct istream *input; + struct ostream *output; + struct timeout *to_disconnect, *to_ping, *to_pong; + + struct director_user_iter *user_iter; + unsigned int users_received, handshake_users_received; + unsigned int handshake_users_sent; + + /* set during command execution */ + const char *cur_cmd, *const *cur_args; + + bool in:1; + bool connected:1; + bool version_received:1; + bool me_received:1; + bool handshake_received:1; + bool ignore_host_events:1; + bool handshake_sending_hosts:1; + bool ping_waiting:1; + bool synced:1; + bool wrong_host:1; + bool verifying_left:1; + bool users_unsorted:1; + bool connected_user_cpu_set:1; +}; + +static bool director_connection_unref(struct director_connection *conn); +static void director_finish_sending_handshake(struct director_connection *conn); +static void director_connection_disconnected(struct director_connection **conn, + const char *reason); +static void director_connection_reconnect(struct director_connection **conn, + const char *reason); +static void +director_connection_log_disconnect(struct director_connection *conn, int err, + const char *errstr); +static int director_connection_send_done(struct director_connection *conn); + +static void +director_connection_set_name(struct director_connection *conn, const char *name) +{ + char *old_name = conn->name; + conn->name = i_strdup(name); + i_free(old_name); + + event_set_append_log_prefix(conn->event, + t_strdup_printf("director(%s): ", conn->name)); +} + +static void ATTR_FORMAT(2, 3) +director_cmd_error(struct director_connection *conn, const char *fmt, ...) +{ + va_list args; + + va_start(args, fmt); + e_error(conn->event, "Command %s: %s (input: %s)", + conn->cur_cmd, t_strdup_vprintf(fmt, args), + t_strarray_join(conn->cur_args, "\t")); + va_end(args); + + if (conn->host != NULL) + conn->host->last_protocol_failure = ioloop_time; +} + +static void +director_connection_append_stats(struct director_connection *conn, string_t *str) +{ + struct rusage usage; + + str_printfa(str, "bytes in=%"PRIuUOFF_T", bytes out=%"PRIuUOFF_T, + conn->input->v_offset, conn->output->offset); + str_printfa(str, ", %u+%u USERs received", + conn->handshake_users_received, conn->users_received); + if (conn->handshake_users_sent > 0) { + str_printfa(str, ", %u USERs sent in handshake", + conn->handshake_users_sent); + } + if (conn->last_input.tv_sec > 0) { + int input_msecs = timeval_diff_msecs(&ioloop_timeval, + &conn->last_input); + str_printfa(str, ", last input %u.%03u s ago", + input_msecs/1000, input_msecs%1000); + } + if (conn->last_output.tv_sec > 0) { + int output_msecs = timeval_diff_msecs(&ioloop_timeval, + &conn->last_output); + str_printfa(str, ", last output %u.%03u s ago", + output_msecs/1000, output_msecs%1000); + } + if (conn->connected) { + int connected_msecs = timeval_diff_msecs(&ioloop_timeval, + &conn->connected_time); + str_printfa(str, ", connected %u.%03u s ago", + connected_msecs/1000, connected_msecs%1000); + } + if (o_stream_get_buffer_used_size(conn->output) > 0) { + str_printfa(str, ", %zu bytes in output buffer", + o_stream_get_buffer_used_size(conn->output)); + } + str_printfa(str, ", %zu peak output buffer size", + conn->peak_bytes_buffered); + if (conn->connected_user_cpu_set && + getrusage(RUSAGE_SELF, &usage) == 0) { + /* this isn't measuring the CPU usage used by the connection + itself, but it can still be a useful measurement */ + int diff = timeval_diff_msecs(&usage.ru_utime, + &conn->connected_user_cpu); + str_printfa(str, ", %d.%03d CPU secs since connected", + diff / 1000, diff % 1000); + } +} + +static void +director_connection_init_timeout(struct director_connection *conn) +{ + struct timeval start_time; + string_t *reason = t_str_new(128); + + if (!conn->connected) { + start_time = conn->created; + str_append(reason, "Connect timed out"); + } else if (!conn->me_received) { + start_time = conn->connected_time; + str_append(reason, "Handshaking ME timed out"); + } else if (!conn->in) { + start_time = conn->me_received_time; + str_append(reason, "Sending handshake timed out"); + } else { + start_time = conn->me_received_time; + str_append(reason, "Handshaking DONE timed out"); + } + int msecs = timeval_diff_msecs(&ioloop_timeval, &start_time); + str_printfa(reason, " (%u.%03u secs, ", msecs/1000, msecs%1000); + director_connection_append_stats(conn, reason); + str_append_c(reason, ')'); + + e_error(conn->event, "%s", str_c(reason)); + director_connection_disconnected(&conn, "Handshake timeout"); +} + +static void +director_connection_set_ping_timeout(struct director_connection *conn) +{ + unsigned int msecs; + + msecs = conn->synced || !conn->handshake_received ? + DIRECTOR_CONNECTION_PING_INTERVAL_MSECS : + DIRECTOR_CONNECTION_PING_SYNC_INTERVAL_MSECS; + + timeout_remove(&conn->to_ping); + conn->to_ping = timeout_add(msecs, director_connection_ping, conn); +} + +static void director_connection_wait_timeout(struct director_connection *conn) +{ + director_connection_log_disconnect(conn, ETIMEDOUT, ""); + director_connection_deinit(&conn, + "Timeout waiting for disconnect after CONNECT"); +} + +static void director_connection_send_connect(struct director_connection *conn, + struct director_host *host) +{ + const char *connect_str; + + if (conn->to_disconnect != NULL) + return; + + connect_str = t_strdup_printf("CONNECT\t%s\t%u\n", + host->ip_str, host->port); + director_connection_send(conn, connect_str); + o_stream_uncork(conn->output); + + /* wait for a while for the remote to disconnect, so it will hopefully + see our CONNECT command. we'll also log the warning later to avoid + multiple log lines about it. */ + conn->connect_request_to = host; + director_host_ref(conn->connect_request_to); + + conn->to_disconnect = + timeout_add(DIRECTOR_WAIT_DISCONNECT_SECS*1000, + director_connection_wait_timeout, conn); +} + +static void director_connection_assigned(struct director_connection *conn) +{ + struct director *dir = conn->dir; + + if (dir->left != NULL && dir->right != NULL) { + /* we're connected to both directors. see if the ring is + finished by sending a SYNC. if we get it back, it's done. */ + dir->sync_seq++; + director_set_ring_unsynced(dir); + director_sync_send(dir, dir->self_host, dir->sync_seq, + DIRECTOR_VERSION_MINOR, ioloop_time, + mail_hosts_hash(dir->mail_hosts)); + } + director_connection_set_ping_timeout(conn); +} + +static bool director_connection_assign_left(struct director_connection *conn) +{ + struct director *dir = conn->dir; + + i_assert(conn->in); + i_assert(dir->left != conn); + + /* make sure this is the correct incoming connection */ + if (conn->host->self) { + e_error(conn->event, "Connection from self, dropping"); + return FALSE; + } else if (dir->left == NULL) { + /* no conflicts yet */ + } else if (dir->left->host == conn->host) { + e_warning(conn->event, + "Replacing left director connection %s with %s", + dir->left->host->name, conn->host->name); + director_connection_deinit(&dir->left, t_strdup_printf( + "Replacing with %s", conn->host->name)); + } else if (dir->left->verifying_left) { + /* we're waiting to verify if our current left is still + working. if we don't receive a PONG, the current left + gets disconnected and a new left gets assigned. if we do + receive a PONG, we'll wait until the current left + disconnects us and then reassign the new left. */ + return TRUE; + } else if (director_host_cmp_to_self(dir->left->host, conn->host, + dir->self_host) < 0) { + /* the old connection is the correct one. + refer the client there (FIXME: do we ever get here?) */ + director_connection_send_connect(conn, dir->left->host); + return TRUE; + } else { + /* this new connection is the correct one, but wait until the + old connection gets disconnected before using this one. + that guarantees that the director inserting itself into + the ring has finished handshaking its left side, so the + switch will be fast. */ + return TRUE; + } + dir->left = conn; + director_connection_set_name(conn, + t_strdup_printf("%s/left", conn->host->name)); + director_connection_assigned(conn); + return TRUE; +} + +static void director_assign_left(struct director *dir) +{ + struct director_connection *conn; + + array_foreach_elem(&dir->connections, conn) { + if (conn->in && conn->handshake_received && + conn->to_disconnect == NULL && conn != dir->left) { + /* either use this or disconnect it */ + if (!director_connection_assign_left(conn)) { + /* we don't want this */ + director_connection_deinit(&conn, + "Unwanted incoming connection"); + director_assign_left(dir); + break; + } + } + } +} + +static bool director_has_outgoing_connections(struct director *dir) +{ + struct director_connection *conn; + + array_foreach_elem(&dir->connections, conn) { + if (!conn->in && conn->to_disconnect == NULL) + return TRUE; + } + return FALSE; +} + +static void director_send_delayed_syncs(struct director *dir) +{ + struct director_host *host; + + i_assert(dir->right != NULL); + + e_debug(dir->right->event, "Sending delayed SYNCs"); + array_foreach_elem(&dir->dir_hosts, host) { + if (host->delayed_sync_seq == 0) + continue; + + director_sync_send(dir, host, host->delayed_sync_seq, + host->delayed_sync_minor_version, + host->delayed_sync_timestamp, + host->delayed_sync_hosts_hash); + host->delayed_sync_seq = 0; + } +} + +static bool director_connection_assign_right(struct director_connection *conn) +{ + struct director *dir = conn->dir; + + i_assert(!conn->in); + + if (dir->right != NULL) { + /* see if we should disconnect or keep the existing + connection. */ + if (director_host_cmp_to_self(conn->host, dir->right->host, + dir->self_host) <= 0) { + /* the old connection is the correct one */ + e_warning(conn->event, + "Aborting incorrect outgoing connection to %s " + "(already connected to correct one: %s)", + conn->host->name, dir->right->host->name); + conn->wrong_host = TRUE; + return FALSE; + } + e_warning(conn->event, + "Replacing right director connection %s with %s", + dir->right->host->name, conn->host->name); + director_connection_deinit(&dir->right, t_strdup_printf( + "Replacing with %s", conn->host->name)); + } + dir->right = conn; + director_connection_set_name(conn, + t_strdup_printf("%s/right", conn->host->name)); + director_connection_assigned(conn); + director_send_delayed_syncs(dir); + return TRUE; +} + +static bool +director_args_parse_ip_port(struct director_connection *conn, + const char *const *args, + struct ip_addr *ip_r, in_port_t *port_r) +{ + if (args[0] == NULL || args[1] == NULL) { + director_cmd_error(conn, "Missing IP+port parameters"); + return FALSE; + } + if (net_addr2ip(args[0], ip_r) < 0) { + director_cmd_error(conn, "Invalid IP address: %s", args[0]); + return FALSE; + } + if (net_str2port(args[1], port_r) < 0) { + director_cmd_error(conn, "Invalid port: %s", args[1]); + return FALSE; + } + return TRUE; +} + +static bool director_cmd_me(struct director_connection *conn, + const char *const *args) +{ + struct director *dir = conn->dir; + const char *connect_str; + struct ip_addr ip; + in_port_t port; + time_t next_comm_attempt; + + if (!director_args_parse_ip_port(conn, args, &ip, &port)) + return FALSE; + if (conn->me_received) { + director_cmd_error(conn, "Duplicate ME"); + return FALSE; + } + + if (!conn->in && (!net_ip_compare(&conn->host->ip, &ip) || + conn->host->port != port)) { + e_error(conn->event, + "Remote director thinks it's someone else " + "(connected to %s:%u, remote says it's %s:%u)", + conn->host->ip_str, conn->host->port, + net_ip2addr(&ip), port); + return FALSE; + } + conn->me_received = TRUE; + conn->me_received_time = ioloop_timeval; + + if (args[2] != NULL) { + time_t remote_time; + int diff; + + if (str_to_time(args[2], &remote_time) < 0) { + director_cmd_error(conn, "Invalid ME timestamp"); + return FALSE; + } + diff = ioloop_time - remote_time; + if (diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS || + (diff < 0 && -diff > DIRECTOR_MAX_CLOCK_DIFF_WARN_SECS)) { + e_warning(conn->event, + "Director %s clock differs from ours by %d secs", + conn->name, diff); + } + } + + timeout_remove(&conn->to_ping); + if (conn->in) { + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_DONE_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + } else { + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_SEND_USERS_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + } + + if (!conn->in) + return TRUE; + + /* Incoming connection: + + a) we don't have an established ring yet. make sure we're connecting + to our right side (which might become our left side). + + b) it's our current "left" connection. the previous connection + is most likely dead. + + c) we have an existing ring. tell our current "left" to connect to + it with CONNECT command. + + d) the incoming connection doesn't belong to us at all, refer it + elsewhere with CONNECT. however, before disconnecting it verify + first that our left side is actually still functional. + */ + i_assert(conn->host == NULL); + conn->host = director_host_get(dir, &ip, port); + /* the host shouldn't be removed at this point, but if for some + reason it is we don't want to crash */ + conn->host->removed = FALSE; + director_host_ref(conn->host); + /* make sure we don't keep old sequence values across restarts */ + director_host_restarted(conn->host); + + next_comm_attempt = conn->host->last_protocol_failure + + DIRECTOR_PROTOCOL_FAILURE_RETRY_SECS; + if (next_comm_attempt > ioloop_time) { + /* the director recently sent invalid protocol data, + don't try retrying yet */ + e_error(conn->event, + "Remote sent invalid protocol data recently, " + "waiting %u secs before allowing further communication", + (unsigned int)(next_comm_attempt-ioloop_time)); + return FALSE; + } else if (dir->left == NULL) { + /* a) - just in case the left is also our right side reset + its failed state, so we can connect to it */ + conn->host->last_network_failure = 0; + if (!director_has_outgoing_connections(dir)) + director_connect(dir, "Connecting to left"); + } else if (dir->left->host == conn->host) { + /* b) */ + i_assert(dir->left != conn); + director_connection_deinit(&dir->left, + "Replacing with new incoming connection"); + } else if (director_host_cmp_to_self(conn->host, dir->left->host, + dir->self_host) < 0) { + /* c) */ + connect_str = t_strdup_printf("CONNECT\t%s\t%u\n", + conn->host->ip_str, + conn->host->port); + director_connection_send(dir->left, connect_str); + } else { + /* d) */ + dir->left->verifying_left = TRUE; + director_connection_ping(dir->left); + } + return TRUE; +} + +static inline bool +user_need_refresh(struct director *dir, struct user *user, + time_t timestamp, bool unknown_timestamp) +{ + if (timestamp <= (time_t)user->timestamp) { + /* we already have this timestamp */ + return FALSE; + } + if (unknown_timestamp) { + /* Old director sent USER command without timestamp. We don't + know what it is exactly, but we can assume that it's very + close to the current time (which timestamp parameter is + already set to). However, try to break USER loops here when + director ring latency is >1sec, but below skip_recent_secs + by just not refreshing the user. */ + time_t skip_recent_secs = + I_MIN(dir->set->director_user_expire/4, + DIRECTOR_SKIP_RECENT_REFRESH_MAX_SECS); + if ((time_t)user->timestamp + skip_recent_secs >= timestamp) + return FALSE; + } + return TRUE; +} + +static int +director_user_refresh(struct director_connection *conn, + unsigned int username_hash, struct mail_host *host, + time_t timestamp, bool weak, bool *forced_r, + struct user **user_r) +{ + struct director *dir = conn->dir; + struct user *user; + bool ret = FALSE, unset_weak_user = FALSE; + struct user_directory *users = host->tag->users; + bool unknown_timestamp = (timestamp == (time_t)-1); + + *forced_r = FALSE; + + if (unknown_timestamp) { + /* Old director version sent USER without timestamp. */ + timestamp = ioloop_time; + } + + if (timestamp + (time_t)dir->set->director_user_expire <= ioloop_time) { + /* Ignore this refresh entirely, regardless of whether the + user already exists or not. */ + e_debug(conn->event, + "user refresh: %u has expired timestamp %"PRIdTIME_T, + username_hash, timestamp); + return -1; + } + + user = user_directory_lookup(users, username_hash); + if (user == NULL) { + *user_r = user_directory_add(users, username_hash, + host, timestamp); + (*user_r)->weak = weak; + e_debug(conn->event, "user refresh: %u added", username_hash); + return 1; + } + + if (user->weak) { + if (!weak) { + /* removing user's weakness */ + e_debug(conn->event, "user refresh: %u weakness removed", + username_hash); + unset_weak_user = TRUE; + user->weak = FALSE; + ret = TRUE; + } else { + /* weak user marked again as weak */ + } + } else if (weak && + !user_directory_user_is_recently_updated(users, user)) { + /* mark the user as weak */ + e_debug(conn->event, "user refresh: %u set weak", username_hash); + user->weak = TRUE; + ret = TRUE; + } else if (weak) { + e_debug(conn->event, + "user refresh: %u weak update to %s ignored, " + "we recently changed it to %s", + username_hash, host->ip_str, + user->host->ip_str); + host = user->host; + ret = TRUE; + } else if (user->host == host) { + /* update to the same host */ + } else if (user_directory_user_is_near_expiring(users, user)) { + /* host conflict for a user that is already near expiring. we can + assume that the other director had already dropped this user + and we should have as well. use the new host. */ + e_debug(conn->event, "user refresh: %u is nearly expired, " + "replacing host %s with %s", username_hash, + user->host->ip_str, host->ip_str); + ret = TRUE; + } else if (USER_IS_BEING_KILLED(user)) { + /* user is still being moved - ignore conflicting host updates + from other directors who don't yet know about the move. */ + e_debug(conn->event, "user refresh: %u is being moved, " + "preserve its host %s instead of replacing with %s", + username_hash, user->host->ip_str, host->ip_str); + host = user->host; + } else { + /* non-weak user received a non-weak update with + conflicting host. this shouldn't happen. */ + string_t *str = t_str_new(128); + + str_printfa(str, "User hash %u " + "is being redirected to two hosts: %s and %s", + username_hash, user->host->ip_str, host->ip_str); + str_printfa(str, " (old_ts=%ld", (long)user->timestamp); + + if (!conn->handshake_received) { + str_printfa(str, ",handshaking,recv_ts=%ld", + (long)timestamp); + } + if (USER_IS_BEING_KILLED(user)) { + if (user->kill_ctx->to_move != NULL) + str_append(str, ",moving"); + str_printfa(str, ",kill_state=%s", + user_kill_state_names[user->kill_ctx->kill_state]); + } + str_append_c(str, ')'); + e_error(conn->event, "%s", str_c(str)); + + /* we want all the directors to redirect the user to same + server, but we don't want two directors fighting over which + server it belongs to, so always use the lower IP address */ + if (net_ip_cmp(&user->host->ip, &host->ip) > 0) { + /* change the host. we'll also need to remove the user + from the old host's user_count, because we can't + keep track of the user for more than one host. + + send the updated USER back to the sender as well. */ + *forced_r = TRUE; + } else { + /* keep the host */ + host = user->host; + } + /* especially IMAP connections can take a long time to die. + make sure we kill off the connections in the wrong + backends. */ + director_kick_user_hash(dir, dir->self_host, NULL, + username_hash, &host->ip); + ret = TRUE; + } + if (user->host != host) { + user->host->user_count--; + user->host = host; + user->host->user_count++; + ret = TRUE; + } + /* Update user's timestamp if it's higher than the current one. Note + that we'll preserve the original timestamp. This is important when + the director ring is slow and a single USER can traverse through + the ring more than a second. We don't want to get into a loop where + the same USER goes through the ring forever. */ + if (user_need_refresh(dir, user, timestamp, unknown_timestamp)) { + /* NOTE: This makes the users list somewhat out-of-order. + It's not a big problem - most likely it's only a few seconds + difference. The worst that can happen is that some users + take up memory that should have been freed already. */ + e_debug(conn->event, "user refresh: %u refreshed timestamp from %u to %"PRIdTIME_T, + username_hash, user->timestamp, timestamp); + user_directory_refresh(users, user); + user->timestamp = timestamp; + ret = TRUE; + } else { + e_debug(conn->event, "user refresh: %u ignored timestamp %"PRIdTIME_T" (we have %u)", + username_hash, timestamp, user->timestamp); + } + + if (unset_weak_user) { + /* user is no longer weak. handle pending requests for + this user if there are any */ + director_set_state_changed(conn->dir); + } + + *user_r = user; + return ret ? 1 : 0; +} + +static bool +director_handshake_cmd_user(struct director_connection *conn, + const char *const *args) +{ + unsigned int username_hash, timestamp; + struct ip_addr ip; + struct mail_host *host; + struct user *user; + bool weak, forced; + + if (str_array_length(args) < 3 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &ip) < 0 || + str_to_uint(args[2], ×tamp) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + weak = args[3] != NULL && args[3][0] == 'w'; + conn->handshake_users_received++; + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host == NULL) { + e_error(conn->event, "USER used unknown host %s in handshake", + args[1]); + return FALSE; + } + + if ((time_t)timestamp > ioloop_time) { + /* The other director's clock seems to be into the future + compared to us. Don't set any of our users' timestamps into + future though. It's most likely only 1 second difference. */ + timestamp = ioloop_time; + } + conn->dir->num_incoming_requests++; + if (director_user_refresh(conn, username_hash, host, + timestamp, weak, &forced, &user) < 0) { + /* user expired - ignore */ + return TRUE; + } + /* Possibilities: + + a) The user didn't exist yet, and it was added with the given + timestamp. + + b) The user existed, but with an older timestamp. The timestamp + wasn't yet updated, so do it here below. + + c) The user existed with a newer timestamp. This is either because + we already received a non-handshake USER update for this user, or + our director saw a login for this user. Ignore this update. + + (We never want to change the user's timestamp to be older, because + that could result in directors going to a loop fighting each others + over a flipping timestamp.) */ + if (user->timestamp < timestamp) + user->timestamp = timestamp; + /* always sort users after handshaking to make sure the order + is correct */ + conn->users_unsorted = TRUE; + return TRUE; +} + +static bool +director_cmd_user(struct director_connection *conn, + const char *const *args) +{ + unsigned int username_hash; + struct ip_addr ip; + struct mail_host *host; + struct user *user; + bool forced; + time_t timestamp = (time_t)-1; + + if (str_array_length(args) < 2 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &ip) < 0 || + (args[2] != NULL && str_to_time(args[2], ×tamp) < 0)) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + /* could this before it's potentially ignored */ + conn->dir->num_incoming_requests++; + + conn->users_received++; + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host == NULL) { + /* we probably just removed this host. */ + return TRUE; + } + + if (director_user_refresh(conn, username_hash, + host, timestamp, FALSE, &forced, &user) > 0) { + /* user changed - forward the USER in the ring */ + struct director_host *src_host = + forced ? conn->dir->self_host : conn->host; + i_assert(!user->weak); + director_update_user(conn->dir, src_host, user); + } + return TRUE; +} + +static bool director_cmd_director(struct director_connection *conn, + const char *const *args) +{ + struct director_host *host; + struct ip_addr ip; + in_port_t port; + bool log_add = FALSE; + + if (!director_args_parse_ip_port(conn, args, &ip, &port)) + return FALSE; + + host = director_host_lookup(conn->dir, &ip, port); + if (host != NULL) { + if (host == conn->dir->self_host) { + /* ignore updates to ourself */ + return TRUE; + } + if (host->removed) { + /* ignore re-adds of removed directors */ + return TRUE; + } + + /* already have this. just reset its last_network_failure + timestamp, since it might be up now, but only if this + isn't part of the handshake. (if it was, reseting the + timestamp could cause us to rapidly keep trying to connect + to it) */ + if (conn->handshake_received) + host->last_network_failure = 0; + /* it also may have been restarted, reset its state */ + director_host_restarted(host); + } else { + /* save the director and forward it */ + host = director_host_add(conn->dir, &ip, port); + log_add = TRUE; + } + /* just forward this to the entire ring until it reaches back to + itself. some hosts may see this twice, but that's the only way to + guarantee that it gets seen by everyone. resetting the host multiple + times may cause us to handle its commands multiple times, but the + commands can handle that. however, we need to also handle a + situation where the added director never comes back - we don't want + to send the director information in a loop forever. */ + if (conn->dir->right != NULL && + director_host_cmp_to_self(host, conn->dir->right->host, + conn->dir->self_host) > 0) { + e_debug(conn->event, + "Received DIRECTOR update for a host where we should be connected to. " + "Not forwarding it since it's probably crashed."); + } else { + director_notify_ring_added(host, + director_connection_get_host(conn), log_add); + } + return TRUE; +} + +static bool director_cmd_director_remove(struct director_connection *conn, + const char *const *args) +{ + struct director_host *host; + struct ip_addr ip; + in_port_t port; + + if (!director_args_parse_ip_port(conn, args, &ip, &port)) + return FALSE; + + host = director_host_lookup(conn->dir, &ip, port); + if (host != NULL && !host->removed) + director_ring_remove(host, director_connection_get_host(conn)); + return TRUE; +} + +static bool +director_cmd_host_hand_start(struct director_connection *conn, + const char *const *args) +{ + const ARRAY_TYPE(mail_host) *hosts; + struct mail_host *const *hostp; + unsigned int remote_ring_completed; + + if (args[0] == NULL || + str_to_uint(args[0], &remote_ring_completed) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + if (remote_ring_completed != 0 && !conn->dir->ring_handshaked) { + /* clear everything we have and use only what remote sends us */ + e_debug(conn->event, "We're joining a ring - replace all hosts"); + hosts = mail_hosts_get(conn->dir->mail_hosts); + while (array_count(hosts) > 0) { + hostp = array_front(hosts); + director_remove_host(conn->dir, NULL, NULL, *hostp); + } + } else if (remote_ring_completed == 0 && conn->dir->ring_handshaked) { + /* ignore whatever remote sends */ + e_debug(conn->event, "Remote is joining our ring - " + "ignore all remote HOSTs"); + conn->ignore_host_events = TRUE; + } else { + e_debug(conn->event, "Merge rings' hosts"); + } + conn->handshake_sending_hosts = TRUE; + return TRUE; +} + +static int +director_cmd_is_seen_full(struct director_connection *conn, + const char *const **_args, unsigned int *seq_r, + struct director_host **host_r) +{ + const char *const *args = *_args; + struct ip_addr ip; + in_port_t port; + unsigned int seq; + struct director_host *host; + + if (str_array_length(args) < 3 || + net_addr2ip(args[0], &ip) < 0 || + net_str2port(args[1], &port) < 0 || + str_to_uint(args[2], &seq) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return -1; + } + *_args = args + 3; + *seq_r = seq; + + host = director_host_lookup(conn->dir, &ip, port); + if (host == NULL || host->removed) { + /* director is already gone, but we can't be sure if this + command was sent everywhere. re-send it as if it was from + ourself. */ + *host_r = NULL; + } else { + *host_r = host; + if (seq <= host->last_seq) { + /* already seen this */ + return 1; + } + host->last_seq = seq; + } + return 0; +} + +static int +director_cmd_is_seen(struct director_connection *conn, + const char *const **_args, + struct director_host **host_r) +{ + unsigned int seq; + + return director_cmd_is_seen_full(conn, _args, &seq, host_r); +} + +static bool +director_cmd_user_weak(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + struct ip_addr ip; + unsigned int username_hash; + struct mail_host *host; + struct user *user; + struct director_host *src_host = conn->host; + bool weak = TRUE, weak_forward = FALSE, forced; + int ret; + + /* note that unlike other commands we don't want to just ignore + duplicate commands */ + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) < 0) + return FALSE; + + /* could this before it's potentially ignored */ + conn->dir->num_incoming_requests++; + + if (str_array_length(args) != 2 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &ip) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host == NULL) { + /* we probably just removed this host. */ + return TRUE; + } + + if (ret == 0) { + /* First time we're seeing this - forward it to others also. + We'll want to do it even if the user was already marked as + weak, because otherwise if two directors mark the user weak + at the same time both the USER-WEAK notifications reach + only half the directors until they collide and neither one + finishes going through the whole ring marking the user + non-weak. */ + weak_forward = TRUE; + } else if (dir_host == conn->dir->self_host) { + /* We originated this USER-WEAK request. The entire ring has seen + it and there weren't any conflicts. Make the user non-weak. */ + e_debug(conn->event, + "user refresh: %u Our USER-WEAK seen by the entire ring", + username_hash); + src_host = conn->dir->self_host; + weak = FALSE; + } else { + /* The original USER-WEAK sender will send a new non-weak USER + update saying what really happened. We'll still need to forward + this around the ring to the origin so it also knows it has + travelled through the ring. */ + e_debug(conn->event, + "user refresh: %u Remote USER-WEAK from %s seen by the entire ring, ignoring", + username_hash, dir_host->ip_str); + weak_forward = TRUE; + } + + ret = director_user_refresh(conn, username_hash, + host, ioloop_time, weak, &forced, &user); + /* user is refreshed with ioloop_time, it can't be expired already */ + i_assert(ret >= 0); + if (ret > 0 || weak_forward) { + /* user changed, or we've decided that we need to forward + the weakness notification to the rest of the ring even + though we already knew it. */ + if (forced) + src_host = conn->dir->self_host; + if (!user->weak) + director_update_user(conn->dir, src_host, user); + else { + director_update_user_weak(conn->dir, src_host, conn, + dir_host, user); + } + } + return TRUE; +} + +static bool ATTR_NULL(3) +director_cmd_host_int(struct director_connection *conn, const char *const *args, + struct director_host *dir_host) +{ + struct director_host *src_host = conn->host; + struct mail_host *host; + struct ip_addr ip; + const char *tag = "", *host_tag, *hostname = NULL; + unsigned int arg_count, vhost_count; + bool update, down = FALSE, tag_changed = FALSE; + time_t last_updown_change = 0; + + arg_count = str_array_length(args); + if (arg_count < 2 || + net_addr2ip(args[0], &ip) < 0 || + str_to_uint(args[1], &vhost_count) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + if (arg_count >= 3) + tag = args[2]; + if (arg_count >= 4) { + if ((args[3][0] != 'D' && args[3][0] != 'U') || + str_to_time(args[3]+1, &last_updown_change) < 0) { + director_cmd_error(conn, "Invalid updown parameters"); + return FALSE; + } + down = args[3][0] == 'D'; + } + if (arg_count >= 5) + hostname = args[4]; + if (conn->ignore_host_events) { + /* remote is sending hosts in a handshake, but it doesn't have + a completed ring and we do. */ + i_assert(conn->handshake_sending_hosts); + return TRUE; + } + if (tag[0] != '\0' && conn->minor_version < DIRECTOR_VERSION_TAGS_V2) { + director_cmd_error(conn, "Received a host tag from older director version with incompatible tagging support"); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host == NULL) { + host = mail_host_add_hostname(conn->dir->mail_hosts, + hostname, &ip, tag); + update = TRUE; + } else { + host_tag = mail_host_get_tag(host); + tag_changed = strcmp(tag, host_tag) != 0; + update = host->vhost_count != vhost_count || + host->down != down || tag_changed; + + if (update && host->desynced) { + string_t *str = t_str_new(128); + + str_printfa(str, "Host %s is being updated before previous update had finished (", + host->ip_str); + if (host->down != down && + host->last_updown_change > last_updown_change) { + /* our host has a newer change. preserve it. */ + down = host->down; + } + if (host->down != down) { + if (host->down) + str_append(str, "down -> up"); + else + str_append(str, "up -> down"); + } + if (host->vhost_count != vhost_count) { + if (host->down != down) + str_append(str, ", "); + str_printfa(str, "vhosts %u -> %u", + host->vhost_count, vhost_count); + } + str_append(str, ") - "); + + vhost_count = I_MIN(vhost_count, host->vhost_count); + str_printfa(str, "setting to state=%s vhosts=%u", + down ? "down" : "up", vhost_count); + e_warning(conn->event, "%s", str_c(str)); + /* make the change appear to come from us, so it + reaches the full ring */ + dir_host = NULL; + src_host = conn->dir->self_host; + } + if (update) { + /* Make sure the host's timestamp never shrinks. + Otherwise we might get into a loop where the up/down + state keeps switching. */ + last_updown_change = I_MAX(last_updown_change, + host->last_updown_change); + } + } + + if (update) { + const char *log_prefix = t_strdup_printf("director(%s): ", + conn->name); + if (tag_changed) { + e_error(conn->event, + "Host %s changed tag from '%s' to '%s'", + host->ip_str, + mail_host_get_tag(host), tag); + mail_host_set_tag(host, tag); + } + mail_host_set_down(host, down, last_updown_change, log_prefix); + mail_host_set_vhost_count(host, vhost_count, log_prefix); + director_update_host(conn->dir, src_host, dir_host, host); + } else { + e_debug(conn->event, + "Ignoring host %s update vhost_count=%u " + "down=%d last_updown_change=%ld (hosts_hash=%u)", + net_ip2addr(&ip), vhost_count, down ? 1 : 0, + (long)last_updown_change, + mail_hosts_hash(conn->dir->mail_hosts)); + } + return TRUE; +} + +static bool +director_cmd_host_handshake(struct director_connection *conn, + const char *const *args) +{ + return director_cmd_host_int(conn, args, NULL); +} + +static bool +director_cmd_host(struct director_connection *conn, const char *const *args) +{ + struct director_host *dir_host; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + return director_cmd_host_int(conn, args, dir_host); +} + +static bool +director_cmd_host_remove(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + struct mail_host *host; + struct ip_addr ip; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 1 || + net_addr2ip(args[0], &ip) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host != NULL) + director_remove_host(conn->dir, conn->host, dir_host, host); + return TRUE; +} + +static bool +director_cmd_host_flush(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + struct mail_host *host; + struct ip_addr ip; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 1 || + net_addr2ip(args[0], &ip) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host != NULL) + director_flush_host(conn->dir, conn->host, dir_host, host); + return TRUE; +} + +static bool +director_cmd_user_move(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + struct mail_host *host; + struct ip_addr ip; + unsigned int username_hash; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 2 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &ip) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + host = mail_host_lookup(conn->dir->mail_hosts, &ip); + if (host != NULL) { + director_move_user(conn->dir, conn->host, dir_host, + username_hash, host); + } + return TRUE; +} + +static bool +director_cmd_user_kick(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 1) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + director_kick_user(conn->dir, conn->host, dir_host, args[0]); + return TRUE; +} + +static bool +director_cmd_user_kick_alt(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 2) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + director_kick_user_alt(conn->dir, conn->host, dir_host, args[0], args[1]); + return TRUE; +} + +static bool +director_cmd_user_kick_hash(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + unsigned int username_hash; + struct ip_addr except_ip; + int ret; + + if ((ret = director_cmd_is_seen(conn, &args, &dir_host)) != 0) + return ret > 0; + + if (str_array_length(args) != 2 || + str_to_uint(args[0], &username_hash) < 0 || + net_addr2ip(args[1], &except_ip) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + director_kick_user_hash(conn->dir, conn->host, dir_host, + username_hash, &except_ip); + return TRUE; +} + +static bool +director_cmd_user_killed(struct director_connection *conn, + const char *const *args) +{ + unsigned int username_hash; + + if (str_array_length(args) != 1 || + str_to_uint(args[0], &username_hash) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + director_user_killed(conn->dir, username_hash); + return TRUE; +} + +static bool +director_cmd_user_killed_everywhere(struct director_connection *conn, + const char *const *args) +{ + struct director_host *dir_host; + unsigned int seq, username_hash; + int ret; + + if ((ret = director_cmd_is_seen_full(conn, &args, &seq, &dir_host)) < 0) + return FALSE; + + if (str_array_length(args) != 1 || + str_to_uint(args[0], &username_hash) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + if (ret > 0) { + i_assert(dir_host != NULL); + e_debug(conn->event, + "User %u - ignoring already seen USER-KILLED-EVERYWHERE " + "with seq=%u <= %s.last_seq=%u", username_hash, + seq, dir_host->name, dir_host->last_seq); + return TRUE; + } + + director_user_killed_everywhere(conn->dir, conn->host, + dir_host, username_hash); + return TRUE; +} + +static bool director_handshake_cmd_done(struct director_connection *conn) +{ + struct director *dir = conn->dir; + int handshake_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->connected_time); + string_t *str; + + if (conn->users_unsorted && conn->user_iter == NULL) { + /* we sent our user list before receiving remote's */ + conn->users_unsorted = FALSE; + mail_hosts_sort_users(conn->dir->mail_hosts); + } + + str = t_str_new(128); + str_printfa(str, "Handshake finished in %u.%03u secs (", + handshake_msecs/1000, handshake_msecs%1000); + director_connection_append_stats(conn, str); + str_append_c(str, ')'); + if (handshake_msecs >= DIRECTOR_HANDSHAKE_WARN_SECS*1000) + e_warning(conn->event, "%s", str_c(str)); + else + e_info(conn->event, "%s", str_c(str)); + + /* the host is up now, make sure we can connect to it immediately + if needed */ + conn->host->last_network_failure = 0; + + conn->handshake_received = TRUE; + if (conn->in) { + /* handshaked to left side. tell it we've received the + whole handshake. */ + director_connection_send(conn, "DONE\n"); + + /* tell the "right" director about the "left" one */ + director_update_send(dir, director_connection_get_host(conn), + t_strdup_printf("DIRECTOR\t%s\t%u\n", + conn->host->ip_str, + conn->host->port)); + /* this is our "left" side. */ + return director_connection_assign_left(conn); + } else { + /* handshaked to "right" side. */ + return director_connection_assign_right(conn); + } +} + +static int +director_handshake_cmd_options(struct director_connection *conn, + const char *const *args) +{ + bool consistent_hashing = FALSE; + unsigned int i; + + for (i = 0; args[i] != NULL; i++) { + if (strcmp(args[i], DIRECTOR_OPT_CONSISTENT_HASHING) == 0) + consistent_hashing = TRUE; + } + if (!consistent_hashing) { + e_error(conn->event, "director_consistent_hashing settings " + "differ between directors. Set " + "director_consistent_hashing=yes on old directors"); + return -1; + } + return 1; +} + +static int +director_connection_handle_handshake(struct director_connection *conn, + const char *cmd, const char *const *args) +{ + unsigned int major_version; + + /* both incoming and outgoing connections get VERSION and ME */ + if (strcmp(cmd, "VERSION") == 0 && str_array_length(args) >= 3) { + if (strcmp(args[0], DIRECTOR_VERSION_NAME) != 0) { + e_error(conn->event, "Wrong protocol in socket " + "(%s vs %s)", + args[0], DIRECTOR_VERSION_NAME); + return -1; + } else if (str_to_uint(args[1], &major_version) < 0 || + str_to_uint(args[2], &conn->minor_version) < 0) { + e_error(conn->event, "Invalid protocol version: " + "%s.%s", args[1], args[2]); + return -1; + } else if (major_version != DIRECTOR_VERSION_MAJOR) { + e_error(conn->event, "Incompatible protocol version: " + "%u vs %u", major_version, + DIRECTOR_VERSION_MAJOR); + return -1; + } + if (conn->minor_version < DIRECTOR_VERSION_TAGS_V2 && + mail_hosts_have_tags(conn->dir->mail_hosts)) { + e_error(conn->event, "Director version supports incompatible tags"); + return -1; + } + conn->version_received = TRUE; + director_finish_sending_handshake(conn); + return 1; + } + if (!conn->version_received) { + director_cmd_error(conn, "Incompatible protocol"); + return -1; + } + + if (strcmp(cmd, "ME") == 0) + return director_cmd_me(conn, args) ? 1 : -1; + if (!conn->me_received) { + director_cmd_error(conn, "Expecting ME command first"); + return -1; + } + + /* incoming connections get a HOST list */ + if (conn->handshake_sending_hosts) { + if (strcmp(cmd, "HOST") == 0) + return director_cmd_host_handshake(conn, args) ? 1 : -1; + if (strcmp(cmd, "HOST-HAND-END") == 0) { + conn->ignore_host_events = FALSE; + conn->handshake_sending_hosts = FALSE; + return 1; + } + director_cmd_error(conn, "Unexpected command during host list"); + return -1; + } + if (strcmp(cmd, "OPTIONS") == 0) + return director_handshake_cmd_options(conn, args); + if (strcmp(cmd, "HOST-HAND-START") == 0) { + if (!conn->in) { + director_cmd_error(conn, + "Host list is only for incoming connections"); + return -1; + } + return director_cmd_host_hand_start(conn, args) ? 1 : -1; + } + + if (conn->in && + (strcmp(cmd, "U") == 0 || + (strcmp(cmd, "USER") == 0 && + CMD_IS_USER_HANDSHAKE(conn->minor_version, args)))) + return director_handshake_cmd_user(conn, args) ? 1 : -1; + + /* both get DONE */ + if (strcmp(cmd, "DONE") == 0) + return director_handshake_cmd_done(conn) ? 1 : -1; + return 0; +} + +static bool +director_connection_sync_host(struct director_connection *conn, + struct director_host *host, + uint32_t seq, unsigned int minor_version, + unsigned int timestamp, unsigned int hosts_hash) +{ + struct director *dir = conn->dir; + + if (minor_version > DIRECTOR_VERSION_MINOR) { + /* we're not up to date */ + minor_version = DIRECTOR_VERSION_MINOR; + } + + if (host->self) { + if (dir->sync_seq != seq) { + /* stale SYNC event */ + return FALSE; + } + /* sync_seq increases when we get disconnected, so we must be + successfully connected to both directions */ + i_assert(dir->left != NULL && dir->right != NULL); + + if (hosts_hash != 0 && + hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) { + e_error(conn->event, "Hosts unexpectedly changed during SYNC reply - resending" + "(seq=%u, old hosts_hash=%u, new hosts_hash=%u)", + seq, hosts_hash, + mail_hosts_hash(dir->mail_hosts)); + (void)director_resend_sync(dir); + return FALSE; + } + + dir->ring_min_version = minor_version; + if (!dir->ring_handshaked) { + /* the ring is handshaked */ + director_set_ring_handshaked(dir); + } else if (dir->ring_synced) { + /* duplicate SYNC (which was sent just in case the + previous one got lost) */ + } else { + e_debug(conn->event, "Ring is synced (%s sent seq=%u, hosts_hash=%u)", + conn->name, seq, + mail_hosts_hash(dir->mail_hosts)); + int sync_msecs = + timeval_diff_msecs(&ioloop_timeval, &dir->last_sync_start_time); + if (sync_msecs >= 0) + dir->last_sync_msecs = sync_msecs; + director_set_ring_synced(dir); + } + } else { + if (seq < host->last_sync_seq && + timestamp < host->last_sync_timestamp + + DIRECTOR_SYNC_STALE_TIMESTAMP_RESET_SECS) { + /* stale SYNC event */ + e_debug(conn->event, "Ignore stale SYNC event for %s " + "(seq %u < %u, timestamp=%u)", + host->name, seq, host->last_sync_seq, + timestamp); + return FALSE; + } else if (seq < host->last_sync_seq) { + e_warning(conn->event, + "Last SYNC seq for %s appears to be stale, resetting " + "(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)", + host->name, host->last_sync_seq, + host->last_sync_timestamp, seq, timestamp); + host->last_sync_seq = seq; + host->last_sync_timestamp = timestamp; + host->last_sync_seq_counter = 1; + } else if (seq > host->last_sync_seq || + timestamp > host->last_sync_timestamp) { + host->last_sync_seq = seq; + host->last_sync_timestamp = timestamp; + host->last_sync_seq_counter = 1; + e_debug(conn->event, "Update SYNC for %s " + "(seq=%u, timestamp=%u -> seq=%u, timestamp=%u)", + host->name, host->last_sync_seq, + host->last_sync_timestamp, seq, timestamp); + } else if (++host->last_sync_seq_counter > + DIRECTOR_MAX_SYNC_SEQ_DUPLICATES) { + /* we've received this too many times already */ + e_debug(conn->event, "Ignore duplicate #%u SYNC event for %s " + "(seq=%u, timestamp %u <= %u)", + host->last_sync_seq_counter, host->name, seq, + timestamp, host->last_sync_timestamp); + return FALSE; + } + + if (hosts_hash != 0 && + hosts_hash != mail_hosts_hash(conn->dir->mail_hosts)) { + if (host->desynced_hosts_hash != hosts_hash) { + e_debug(conn->event, "Ignore director %s stale SYNC request whose hosts don't match us " + "(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)", + host->ip_str, seq, hosts_hash, + mail_hosts_hash(dir->mail_hosts)); + host->desynced_hosts_hash = hosts_hash; + return FALSE; + } + /* we'll get here only if we received a SYNC twice + with the same wrong hosts_hash. FIXME: this gets + triggered unnecessarily sometimes if hosts are + changing rapidly. */ + e_error(conn->event, "Director %s SYNC request hosts don't match us - resending hosts " + "(seq=%u, remote hosts_hash=%u, my hosts_hash=%u)", + host->ip_str, seq, + hosts_hash, mail_hosts_hash(dir->mail_hosts)); + director_resend_hosts(dir); + return FALSE; + } + host->desynced_hosts_hash = 0; + if (dir->right != NULL) { + /* forward it to the connection on right */ + director_sync_send(dir, host, seq, minor_version, + timestamp, hosts_hash); + } else { + e_debug(conn->event, "We have no right connection - " + "delay replying to SYNC until finished"); + host->delayed_sync_seq = seq; + host->delayed_sync_minor_version = minor_version; + host->delayed_sync_timestamp = timestamp; + host->delayed_sync_hosts_hash = hosts_hash; + } + } + return TRUE; +} + +static bool director_connection_sync(struct director_connection *conn, + const char *const *args) +{ + struct director *dir = conn->dir; + struct director_host *host; + struct ip_addr ip; + in_port_t port; + unsigned int arg_count, seq, minor_version = 0, timestamp = ioloop_time; + unsigned int hosts_hash = 0; + + arg_count = str_array_length(args); + if (arg_count < 3 || + !director_args_parse_ip_port(conn, args, &ip, &port) || + str_to_uint(args[2], &seq) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + if (arg_count >= 4 && str_to_uint(args[3], &minor_version) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + if (arg_count >= 5 && str_to_uint(args[4], ×tamp) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + if (arg_count >= 6 && str_to_uint(args[5], &hosts_hash) < 0) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + /* find the originating director. if we don't see it, it was already + removed and we can ignore this sync. */ + host = director_host_lookup(dir, &ip, port); + if (host != NULL) { + if (!director_connection_sync_host(conn, host, seq, + minor_version, timestamp, + hosts_hash)) + return TRUE; + } + + /* If directors got disconnected while we were waiting a SYNC reply, + it might have gotten lost. If we've received a DIRECTOR update since + the last time we sent a SYNC, retry sending it here to make sure + it doesn't get stuck. We don't want to do this too eagerly because + it may trigger desynced_hosts_hash != hosts_hash mismatch, which + causes unnecessary error logging and hosts-resending. */ + if ((host == NULL || !host->self) && + dir->last_sync_sent_ring_change_counter != dir->ring_change_counter && + (time_t)dir->self_host->last_sync_timestamp != ioloop_time) + (void)director_resend_sync(dir); + return TRUE; +} + +static void director_disconnect_timeout(struct director_connection *conn) +{ + director_connection_deinit(&conn, "CONNECT requested"); +} + +static void +director_reconnect_after_wrong_connect_timeout(struct director_connection *conn) +{ + struct director *dir = conn->dir; + + director_connection_deinit(&conn, "Wrong CONNECT requested"); + if (dir->right == NULL) + director_connect(dir, "Reconnecting after wrong CONNECT request"); +} + +static void +director_reconnect_after_wrong_connect(struct director_connection *conn) +{ + if (conn->to_disconnect != NULL) + return; + conn->to_disconnect = + timeout_add_short(DIRECTOR_RECONNECT_AFTER_WRONG_CONNECT_MSECS, + director_reconnect_after_wrong_connect_timeout, conn); +} + +static bool director_cmd_connect(struct director_connection *conn, + const char *const *args) +{ + struct director *dir = conn->dir; + struct director_host *host; + struct ip_addr ip; + in_port_t port; + const char *right_state; + + if (str_array_length(args) != 2 || + !director_args_parse_ip_port(conn, args, &ip, &port)) { + director_cmd_error(conn, "Invalid parameters"); + return FALSE; + } + + host = director_host_get(conn->dir, &ip, port); + + /* remote suggests us to connect elsewhere */ + if (dir->right != NULL && + director_host_cmp_to_self(host, dir->right->host, + dir->self_host) <= 0) { + /* the old connection is the correct one */ + e_debug(conn->event, "Ignoring CONNECT request to %s (current right is %s)", + host->name, dir->right->name); + director_reconnect_after_wrong_connect(conn); + return TRUE; + } + if (host->removed) { + e_debug(conn->event, "Ignoring CONNECT request to %s (director is removed)", + host->name); + director_reconnect_after_wrong_connect(conn); + return TRUE; + } + + /* reset failure timestamp so we'll actually try to connect there. */ + host->last_network_failure = 0; + /* reset removed-flag, so we don't crash */ + host->removed = FALSE; + + if (dir->right == NULL) { + right_state = "initializing right"; + } else { + right_state = t_strdup_printf("replacing current right %s", + dir->right->name); + /* disconnect from right side immediately - it's not accepting + any further commands from us. */ + if (conn->dir->right != conn) + director_connection_deinit(&conn->dir->right, "CONNECT requested"); + else if (conn->to_disconnect == NULL) { + conn->to_disconnect = + timeout_add_short(0, director_disconnect_timeout, conn); + } + } + + /* connect here */ + (void)director_connect_host(dir, host, t_strdup_printf( + "Received CONNECT request from %s - %s", + conn->name, right_state)); + return TRUE; +} + +static void director_disconnect_wrong_lefts(struct director *dir) +{ + struct director_connection *conn; + + array_foreach_elem(&dir->connections, conn) { + if (conn->in && conn != dir->left && conn->me_received && + conn->to_disconnect == NULL && + director_host_cmp_to_self(dir->left->host, conn->host, + dir->self_host) < 0) + director_connection_send_connect(conn, dir->left->host); + } +} + +static bool director_cmd_ping(struct director_connection *conn, + const char *const *args) +{ + time_t sent_time; + uintmax_t send_buffer_size; + + if (str_array_length(args) >= 2 && + str_to_time(args[0], &sent_time) == 0 && + str_to_uintmax(args[1], &send_buffer_size) == 0) { + int diff_secs = ioloop_time - sent_time; + if (diff_secs*1000+500 > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) { + e_warning(conn->event, + "PING response took %d secs to receive " + "(send buffer was %ju bytes)", + diff_secs, send_buffer_size); + } + } + director_connection_send(conn, + t_strdup_printf("PONG\t%"PRIdTIME_T"\t%zu\n", + ioloop_time, o_stream_get_buffer_used_size(conn->output))); + return TRUE; +} + +static void +director_ping_append_extra(struct director_connection *conn, string_t *str, + time_t pong_sent_time, + uintmax_t pong_send_buffer_size) +{ + struct rusage usage; + + str_printfa(str, "buffer size at PING was %zu bytes", conn->ping_sent_buffer_size); + if (pong_sent_time != 0) { + str_printfa(str, ", remote sent it %"PRIdTIME_T" secs ago", + ioloop_time - pong_sent_time); + } + if (pong_send_buffer_size != (uintmax_t)-1) { + str_printfa(str, ", remote buffer size at PONG was %ju bytes", + pong_send_buffer_size); + } + if (conn->ping_sent_user_cpu.tv_sec != (time_t)-1 && + getrusage(RUSAGE_SELF, &usage) == 0) { + int diff = timeval_diff_msecs(&usage.ru_utime, + &conn->ping_sent_user_cpu); + str_printfa(str, ", %u.%03u CPU secs since PING was sent", + diff/1000, diff%1000); + } + str_printfa(str, ", %"PRIuUOFF_T" bytes input", + conn->input->v_offset - conn->ping_sent_input_offset); + str_printfa(str, ", %"PRIuUOFF_T" bytes output", + conn->output->offset - conn->ping_sent_output_offset); +} + +static bool director_cmd_pong(struct director_connection *conn, + const char *const *args) +{ + time_t sent_time; + uintmax_t send_buffer_size; + + if (!conn->ping_waiting) + return TRUE; + conn->ping_waiting = FALSE; + timeout_remove(&conn->to_pong); + + if (str_array_length(args) < 2 || + str_to_time(args[0], &sent_time) < 0 || + str_to_uintmax(args[1], &send_buffer_size) < 0) { + sent_time = 0; + send_buffer_size = (uintmax_t)-1; + } + + int ping_msecs = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time); + if (ping_msecs >= 0) { + if (ping_msecs > DIRECTOR_CONNECTION_PINGPONG_WARN_MSECS) { + string_t *extra = t_str_new(128); + director_ping_append_extra(conn, extra, sent_time, send_buffer_size); + e_warning(conn->event, + "PONG response took %u.%03u secs (%s)", + ping_msecs/1000, ping_msecs%1000, + str_c(extra)); + } + conn->last_ping_msecs = ping_msecs; + } + + if (conn->verifying_left) { + conn->verifying_left = FALSE; + if (conn == conn->dir->left) { + /* our left side is functional. tell all the wrong + incoming connections to connect to it instead. */ + director_disconnect_wrong_lefts(conn->dir); + } + } + + director_connection_set_ping_timeout(conn); + return TRUE; +} + +static bool +director_connection_handle_cmd(struct director_connection *conn, + const char *cmd, const char *const *args) +{ + int ret; + + if (!conn->handshake_received) { + ret = director_connection_handle_handshake(conn, cmd, args); + if (ret > 0) + return TRUE; + if (ret < 0) { + /* invalid commands during handshake, + we probably don't want to reconnect here */ + return FALSE; + } + /* allow also other commands during handshake */ + } + + if (strcmp(cmd, "PING") == 0) + return director_cmd_ping(conn, args); + if (strcmp(cmd, "PONG") == 0) + return director_cmd_pong(conn, args); + if (strcmp(cmd, "USER") == 0) + return director_cmd_user(conn, args); + if (strcmp(cmd, "USER-WEAK") == 0) + return director_cmd_user_weak(conn, args); + if (strcmp(cmd, "HOST") == 0) + return director_cmd_host(conn, args); + if (strcmp(cmd, "HOST-REMOVE") == 0) + return director_cmd_host_remove(conn, args); + if (strcmp(cmd, "HOST-FLUSH") == 0) + return director_cmd_host_flush(conn, args); + if (strcmp(cmd, "USER-MOVE") == 0) + return director_cmd_user_move(conn, args); + if (strcmp(cmd, "USER-KICK") == 0) + return director_cmd_user_kick(conn, args); + if (strcmp(cmd, "USER-KICK-ALT") == 0) + return director_cmd_user_kick_alt(conn, args); + if (strcmp(cmd, "USER-KICK-HASH") == 0) + return director_cmd_user_kick_hash(conn, args); + if (strcmp(cmd, "USER-KILLED") == 0) + return director_cmd_user_killed(conn, args); + if (strcmp(cmd, "USER-KILLED-EVERYWHERE") == 0) + return director_cmd_user_killed_everywhere(conn, args); + if (strcmp(cmd, "DIRECTOR") == 0) + return director_cmd_director(conn, args); + if (strcmp(cmd, "DIRECTOR-REMOVE") == 0) + return director_cmd_director_remove(conn, args); + if (strcmp(cmd, "SYNC") == 0) + return director_connection_sync(conn, args); + if (strcmp(cmd, "CONNECT") == 0) + return director_cmd_connect(conn, args); + if (strcmp(cmd, "QUIT") == 0) { + e_warning(conn->event, + "Director %s disconnected us with reason: %s", + conn->name, t_strarray_join(args, " ")); + return FALSE; + } + + director_cmd_error(conn, "Unknown command %s", cmd); + return FALSE; +} + +static bool +director_connection_handle_line(struct director_connection *conn, + char *line) +{ + const char *cmd, *const *args; + bool ret; + + e_debug(conn->event, "input: %s", line); + + args = t_strsplit_tabescaped_inplace(line); + cmd = args[0]; + if (cmd == NULL) { + director_cmd_error(conn, "Received empty line"); + return FALSE; + } + + conn->cur_cmd = cmd; + conn->cur_args = args; + ret = director_connection_handle_cmd(conn, cmd, args+1); + conn->cur_cmd = NULL; + conn->cur_args = NULL; + return ret; +} + +static void +director_connection_log_disconnect(struct director_connection *conn, int err, + const char *errstr) +{ + string_t *str = t_str_new(128); + + i_assert(conn->connected); + + if (conn->connect_request_to != NULL) { + e_warning(conn->event, + "Director %s tried to connect to us, " + "should use %s instead", + conn->name, conn->connect_request_to->name); + return; + } + + str_printfa(str, "Director %s disconnected: ", conn->name); + str_append(str, "Connection closed"); + if (err != 0 && err != EPIPE) { + errno = err; + if (errstr[0] == '\0') + str_printfa(str, ": %m"); + else + str_printfa(str, ": %s", errstr); + } + + str_append(str, " ("); + director_connection_append_stats(conn, str); + + if (!conn->me_received) + str_append(str, ", handshake ME not received"); + else if (!conn->handshake_received) + str_append(str, ", handshake DONE not received"); + if (conn->synced) + str_append(str, ", synced"); + str_append_c(str, ')'); + e_error(conn->event, "%s", str_c(str)); +} + +static void director_connection_input(struct director_connection *conn) +{ + struct director *dir = conn->dir; + char *line; + uoff_t prev_offset; + bool ret; + + switch (i_stream_read(conn->input)) { + case 0: + return; + case -1: + /* disconnected */ + director_connection_log_disconnect(conn, conn->input->stream_errno, + i_stream_get_error(conn->input)); + director_connection_disconnected(&conn, i_stream_get_error(conn->input)); + return; + case -2: + /* buffer full */ + director_cmd_error(conn, "Director sent us more than %d bytes", + MAX_INBUF_SIZE); + director_connection_reconnect(&conn, "Too long input line"); + return; + } + + if (conn->to_disconnect != NULL) { + /* just read everything the remote sends, and wait for it + to disconnect. we mainly just want the remote to read the + CONNECT we sent it. */ + i_stream_skip(conn->input, i_stream_get_data_size(conn->input)); + return; + } + conn->last_input = ioloop_timeval; + conn->refcount++; + + director_sync_freeze(dir); + prev_offset = conn->input->v_offset; + while ((line = i_stream_next_line(conn->input)) != NULL) { + dir->ring_traffic_input += conn->input->v_offset - prev_offset; + prev_offset = conn->input->v_offset; + + T_BEGIN { + ret = director_connection_handle_line(conn, line); + } T_END; + + if (!ret) { + if (!director_connection_unref(conn)) + break; + director_connection_reconnect(&conn, t_strdup_printf( + "Invalid input: %s", line)); + break; + } + } + director_sync_thaw(dir); + if (conn != NULL) { + if (director_connection_unref(conn)) + timeout_reset(conn->to_ping); + } +} + +static void director_connection_send_directors(struct director_connection *conn) +{ + struct director_host *host; + string_t *str = t_str_new(64); + + array_foreach_elem(&conn->dir->dir_hosts, host) { + if (host->removed) + continue; + + str_truncate(str, 0); + str_printfa(str, "DIRECTOR\t%s\t%u\n", + host->ip_str, host->port); + director_connection_send(conn, str_c(str)); + } +} + +static void +director_connection_send_hosts(struct director_connection *conn) +{ + struct mail_host *host; + bool send_updowns; + string_t *str = t_str_new(128); + + i_assert(conn->version_received); + + send_updowns = conn->minor_version >= DIRECTOR_VERSION_UPDOWN; + + str_printfa(str, "HOST-HAND-START\t%u\n", + conn->dir->ring_handshaked ? 1 : 0); + array_foreach_elem(mail_hosts_get(conn->dir->mail_hosts), host) { + const char *host_tag = mail_host_get_tag(host); + + str_printfa(str, "HOST\t%s\t%u", + host->ip_str, host->vhost_count); + if (host_tag[0] != '\0' || send_updowns) { + str_append_c(str, '\t'); + str_append_tabescaped(str, host_tag); + } + if (send_updowns) { + str_printfa(str, "\t%c%ld\t", host->down ? 'D' : 'U', + (long)host->last_updown_change); + if (host->hostname != NULL) + str_append_tabescaped(str, host->hostname); + } + str_append_c(str, '\n'); + director_connection_send(conn, str_c(str)); + str_truncate(str, 0); + } + str_printfa(str, "HOST-HAND-END\t%u\n", + conn->dir->ring_handshaked ? 1 : 0); + director_connection_send(conn, str_c(str)); +} + +static int director_connection_send_done(struct director_connection *conn) +{ + i_assert(conn->version_received); + + if (conn->minor_version >= DIRECTOR_VERSION_OPTIONS) { + director_connection_send(conn, + "OPTIONS\t"DIRECTOR_OPT_CONSISTENT_HASHING"\n"); + } else { + e_error(conn->event, "Director version is too old for supporting director_consistent_hashing=yes"); + return -1; + } + director_connection_send(conn, "DONE\n"); + return 0; +} + +static int director_connection_send_users(struct director_connection *conn) +{ + struct user *user; + string_t *str = t_str_new(128); + char dec_buf[MAX_INT_STRLEN]; + unsigned int sent_count = 0; + int ret; + + i_assert(conn->version_received); + + /* with new versions use "U" for sending the handshake users, because + otherwise their parameters may look identical and can't be + distinguished. */ + if (director_connection_get_minor_version(conn) >= DIRECTOR_VERSION_HANDSHAKE_U_CMD) + str_append(str, "U\t"); + else + str_append(str, "USER\t"); + size_t cmd_prefix_len = str_len(str); + while ((user = director_iterate_users_next(conn->user_iter)) != NULL) { + str_truncate(str, cmd_prefix_len); + str_append(str, dec2str_buf(dec_buf, user->username_hash)); + str_append_c(str, '\t'); + str_append(str, user->host->ip_str); + str_append_c(str, '\t'); + str_append(str, dec2str_buf(dec_buf, user->timestamp)); + if (user->weak) + str_append(str, "\tw"); + str_append_c(str, '\n'); + + conn->handshake_users_sent++; + director_connection_send(conn, str_c(str)); + if (++sent_count >= DIRECTOR_HANDSHAKE_MAX_USERS_SENT_PER_FLUSH) { + /* Don't send too much at once to avoid hangs */ + timeout_reset(conn->to_ping); + return 0; + } + + if (o_stream_get_buffer_used_size(conn->output) >= OUTBUF_FLUSH_THRESHOLD) { + if ((ret = o_stream_flush(conn->output)) <= 0) { + /* continue later */ + timeout_reset(conn->to_ping); + return ret; + } + } + } + director_iterate_users_deinit(&conn->user_iter); + if (director_connection_send_done(conn) < 0) + return -1; + + if (conn->users_unsorted && conn->handshake_received) { + /* we received remote's list of users before sending ours */ + conn->users_unsorted = FALSE; + mail_hosts_sort_users(conn->dir->mail_hosts); + } + + ret = o_stream_flush(conn->output); + timeout_reset(conn->to_ping); + return ret; +} + +static int director_connection_output(struct director_connection *conn) +{ + int ret; + + conn->last_output = ioloop_timeval; + if (conn->user_iter != NULL) { + /* still handshaking USER list */ + ret = director_connection_send_users(conn); + if (ret < 0) { + director_connection_log_disconnect(conn, conn->output->stream_errno, + o_stream_get_error(conn->output)); + director_connection_disconnected(&conn, + o_stream_get_error(conn->output)); + } else { + o_stream_set_flush_pending(conn->output, TRUE); + } + return ret; + } + return o_stream_flush(conn->output); +} + +static struct director_connection * +director_connection_init_common(struct director *dir, int fd) +{ + struct director_connection *conn; + + conn = i_new(struct director_connection, 1); + conn->refcount = 1; + conn->created = ioloop_timeval; + conn->fd = fd; + conn->dir = dir; + conn->event = event_create(dir->event); + conn->input = i_stream_create_fd(conn->fd, MAX_INBUF_SIZE); + conn->output = o_stream_create_fd(conn->fd, dir->set->director_output_buffer_size); + o_stream_set_no_error_handling(conn->output, TRUE); + array_push_back(&dir->connections, &conn); + return conn; +} + +static void director_connection_send_handshake(struct director_connection *conn) +{ + director_connection_send(conn, t_strdup_printf( + "VERSION\t"DIRECTOR_VERSION_NAME"\t%u\t%u\n" + "ME\t%s\t%u\t%lld\n", + DIRECTOR_VERSION_MAJOR, DIRECTOR_VERSION_MINOR, + net_ip2addr(&conn->dir->self_ip), conn->dir->self_port, + (long long)time(NULL))); +} + +static void director_connection_set_connected(struct director_connection *conn) +{ + struct rusage usage; + + conn->connected = TRUE; + conn->connected_time = ioloop_timeval; + + if (getrusage(RUSAGE_SELF, &usage) == 0) { + conn->connected_user_cpu_set = TRUE; + conn->connected_user_cpu = usage.ru_utime; + } +} + +struct director_connection * +director_connection_init_in(struct director *dir, int fd, + const struct ip_addr *ip) +{ + struct director_connection *conn; + + conn = director_connection_init_common(dir, fd); + conn->in = TRUE; + director_connection_set_connected(conn); + director_connection_set_name(conn, + t_strdup_printf("%s/in", net_ip2addr(ip))); + conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + + e_info(conn->event, "Incoming connection from director %s", conn->name); + director_connection_send_handshake(conn); + return conn; +} + +static void director_connection_connected(struct director_connection *conn) +{ + int err; + + if ((err = net_geterror(conn->fd)) != 0) { + e_error(conn->event, "connect() failed: %s", strerror(err)); + director_connection_disconnected(&conn, strerror(err)); + return; + } + director_connection_set_connected(conn); + o_stream_set_flush_callback(conn->output, + director_connection_output, conn); + + io_remove(&conn->io); + conn->io = io_add(conn->fd, IO_READ, director_connection_input, conn); + + timeout_remove(&conn->to_ping); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_ME_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + + o_stream_cork(conn->output); + director_connection_send_handshake(conn); + director_connection_send_directors(conn); + o_stream_uncork(conn->output); + /* send the rest of the handshake after we've received the remote's + version number */ +} + +static void director_finish_sending_handshake(struct director_connection *conn) +{ + if ( + conn->in) { + /* only outgoing connections send hosts & users */ + return; + } + o_stream_cork(conn->output); + director_connection_send_hosts(conn); + + i_assert(conn->user_iter == NULL); + /* Iterate only through users that aren't refreshed since the + iteration started. The refreshed users will already be sent as + regular USER updates, so they don't need to be sent again. + + We especially don't want to send these users again, because + otherwise in a rapidly changing director we might never end up + sending all the users when they constantly keep being added to the + end of the list. (The iteration lists users in order from older to + newer.) */ + conn->user_iter = director_iterate_users_init(conn->dir, TRUE); + + if (director_connection_send_users(conn) == 0) + o_stream_set_flush_pending(conn->output, TRUE); + + o_stream_uncork(conn->output); +} + +struct director_connection * +director_connection_init_out(struct director *dir, int fd, + struct director_host *host) +{ + struct director_connection *conn; + + i_assert(!host->removed); + + /* make sure we don't keep old sequence values across restarts */ + director_host_restarted(host); + + conn = director_connection_init_common(dir, fd); + director_connection_set_name(conn, + t_strdup_printf("%s/out", host->name)); + conn->host = host; + director_host_ref(host); + conn->io = io_add(conn->fd, IO_WRITE, + director_connection_connected, conn); + conn->to_ping = timeout_add(DIRECTOR_CONNECTION_CONNECT_TIMEOUT_MSECS, + director_connection_init_timeout, conn); + return conn; +} + +void director_connection_deinit(struct director_connection **_conn, + const char *remote_reason) +{ + struct director_connection *const *conns, *conn = *_conn; + struct director *dir = conn->dir; + unsigned int i, count; + + *_conn = NULL; + + i_assert(conn->fd != -1); + + if (conn->host != NULL) { + e_debug(conn->event, "Disconnecting from %s: %s", + conn->host->name, remote_reason); + } + if (*remote_reason != '\0' && + conn->minor_version >= DIRECTOR_VERSION_QUIT) { + o_stream_nsend_str(conn->output, t_strdup_printf( + "QUIT\t%s\n", remote_reason)); + } + + conns = array_get(&dir->connections, &count); + for (i = 0; i < count; i++) { + if (conns[i] == conn) { + array_delete(&dir->connections, i, 1); + break; + } + } + i_assert(i < count); + if (dir->left == conn) { + dir->left = NULL; + /* if there is already another handshaked incoming connection, + use it as the new "left" */ + director_assign_left(dir); + } + if (dir->right == conn) + dir->right = NULL; + + if (conn->users_unsorted) { + /* Users were received, but handshake didn't finish. + Finish sorting so the users won't stay in wrong order. */ + mail_hosts_sort_users(conn->dir->mail_hosts); + } + + if (conn->connect_request_to != NULL) { + director_host_unref(conn->connect_request_to); + conn->connect_request_to = NULL; + } + if (conn->user_iter != NULL) + director_iterate_users_deinit(&conn->user_iter); + timeout_remove(&conn->to_disconnect); + timeout_remove(&conn->to_pong); + timeout_remove(&conn->to_ping); + io_remove(&conn->io); + i_stream_close(conn->input); + o_stream_close(conn->output); + i_close_fd(&conn->fd); + + if (conn->in) + master_service_client_connection_destroyed(master_service); + director_connection_unref(conn); + + if (dir->left == NULL || dir->right == NULL) { + /* we aren't synced until we're again connected to a ring */ + dir->sync_seq++; + director_set_ring_unsynced(dir); + } +} + +static bool director_connection_unref(struct director_connection *conn) +{ + i_assert(conn->refcount > 0); + if (--conn->refcount > 0) + return TRUE; + + if (conn->host != NULL) + director_host_unref(conn->host); + i_stream_unref(&conn->input); + o_stream_unref(&conn->output); + event_unref(&conn->event); + i_free(conn->name); + i_free(conn); + return FALSE; +} + +static void director_connection_disconnected(struct director_connection **_conn, + const char *reason) +{ + struct director_connection *conn = *_conn; + struct director *dir = conn->dir; + + if ((conn->connected_time.tv_sec == 0 || + conn->connected_time.tv_sec + DIRECTOR_SUCCESS_MIN_CONNECT_SECS > ioloop_time) && + conn->host != NULL) { + /* connection didn't exist for very long, assume it has a + network problem */ + conn->host->last_network_failure = ioloop_time; + } + + director_connection_deinit(_conn, reason); + if (dir->right == NULL) + director_connect(dir, "Reconnecting after disconnection"); +} + +static void director_connection_reconnect(struct director_connection **_conn, + const char *reason) +{ + struct director_connection *conn = *_conn; + struct director *dir = conn->dir; + + director_connection_deinit(_conn, reason); + if (dir->right == NULL) + director_connect(dir, "Reconnecting after error"); +} + +static void director_disconnect_write_error(struct director_connection *conn) +{ + struct director *dir = conn->dir; + + director_connection_deinit(&conn, "write failure"); + if (dir->right == NULL) + director_connect(dir, "Reconnecting after write failure"); +} + +void director_connection_send(struct director_connection *conn, + const char *data) +{ + size_t len = strlen(data); + off_t ret; + + if (conn->output->closed || !conn->connected) + return; + + if (event_want_debug(conn->event)) T_BEGIN { + const char *const *lines = t_strsplit(data, "\n"); + for (; lines[1] != NULL; lines++) + e_debug(conn->event, "output: %s", *lines); + } T_END; + ret = o_stream_send(conn->output, data, len); + if (ret != (off_t)len) { + if (ret < 0) { + director_connection_log_disconnect(conn, + conn->output->stream_errno, + t_strdup_printf("write() failed: %s", + o_stream_get_error(conn->output))); + } else { + director_connection_log_disconnect(conn, EINVAL, + t_strdup_printf("Output buffer full at %zu", + o_stream_get_buffer_used_size(conn->output))); + } + o_stream_close(conn->output); + /* closing the stream when output buffer is full doesn't cause + disconnection itself. */ + timeout_remove(&conn->to_disconnect); + conn->to_disconnect = + timeout_add_short(0, director_disconnect_write_error, conn); + } else { + conn->dir->ring_traffic_output += len; + conn->last_output = ioloop_timeval; + conn->peak_bytes_buffered = + I_MAX(conn->peak_bytes_buffered, + o_stream_get_buffer_used_size(conn->output)); + } +} + +static void +director_connection_ping_idle_timeout(struct director_connection *conn) +{ + string_t *str = t_str_new(128); + int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time); + + str_printfa(str, "Ping timed out in %u.%03u secs: ", + diff/1000, diff%1000); + director_ping_append_extra(conn, str, 0, (uintmax_t)-1); + director_connection_log_disconnect(conn, EINVAL, str_c(str)); + director_connection_disconnected(&conn, "Ping timeout"); +} + +static void director_connection_pong_timeout(struct director_connection *conn) +{ + int diff = timeval_diff_msecs(&ioloop_timeval, &conn->ping_sent_time); + const char *errstr; + + errstr = t_strdup_printf( + "PONG reply not received in %u.%03u secs, " + "although other input keeps coming", + diff/1000, diff%1000); + director_connection_log_disconnect(conn, EINVAL, errstr); + director_connection_disconnected(&conn, "Pong timeout"); +} + +void director_connection_ping(struct director_connection *conn) +{ + if (conn->ping_waiting) + return; + + timeout_remove(&conn->to_ping); + conn->to_ping = timeout_add(conn->dir->set->director_ping_idle_timeout*1000, + director_connection_ping_idle_timeout, conn); + conn->to_pong = timeout_add(conn->dir->set->director_ping_max_timeout*1000, + director_connection_pong_timeout, conn); + conn->ping_waiting = TRUE; + conn->ping_sent_time = ioloop_timeval; + conn->ping_sent_buffer_size = o_stream_get_buffer_used_size(conn->output); + conn->ping_sent_input_offset = conn->input->v_offset; + conn->ping_sent_output_offset = conn->output->offset; + + struct rusage usage; + if (getrusage(RUSAGE_SELF, &usage) == 0) + conn->ping_sent_user_cpu = usage.ru_utime; + else + conn->ping_sent_user_cpu.tv_sec = (time_t)-1; + /* send it after getting the buffer size */ + director_connection_send(conn, t_strdup_printf( + "PING\t%"PRIdTIME_T"\t%zu\n", ioloop_time, + conn->ping_sent_buffer_size)); +} + +const char *director_connection_get_name(struct director_connection *conn) +{ + return conn->name; +} + +struct director_host * +director_connection_get_host(struct director_connection *conn) +{ + return conn->host; +} + +bool director_connection_is_handshaked(struct director_connection *conn) +{ + return conn->handshake_received; +} + +bool director_connection_is_synced(struct director_connection *conn) +{ + return conn->synced; +} + +bool director_connection_is_incoming(struct director_connection *conn) +{ + return conn->in; +} + +unsigned int +director_connection_get_minor_version(struct director_connection *conn) +{ + return conn->minor_version; +} + +void director_connection_cork(struct director_connection *conn) +{ + o_stream_cork(conn->output); +} + +void director_connection_uncork(struct director_connection *conn) +{ + o_stream_uncork(conn->output); +} + +void director_connection_set_synced(struct director_connection *conn, + bool synced) +{ + if (conn->synced == synced) + return; + conn->synced = synced; + + /* switch ping timeout, unless we're already waiting for PONG */ + if (conn->ping_waiting) + return; + + director_connection_set_ping_timeout(conn); +} + +void director_connection_get_status(struct director_connection *conn, + struct director_connection_status *status_r) +{ + i_zero(status_r); + status_r->bytes_read = conn->input->v_offset; + status_r->bytes_sent = conn->output->offset; + status_r->bytes_buffered = o_stream_get_buffer_used_size(conn->output); + status_r->peak_bytes_buffered = conn->peak_bytes_buffered; + status_r->last_input = conn->last_input; + status_r->last_output = conn->last_output; + status_r->last_ping_msecs = conn->last_ping_msecs; + status_r->handshake_users_sent = conn->handshake_users_sent; + status_r->handshake_users_received = conn->handshake_users_received; +} |