diff options
Diffstat (limited to 'src/auth/auth-worker-server.c')
-rw-r--r-- | src/auth/auth-worker-server.c | 519 |
1 files changed, 519 insertions, 0 deletions
diff --git a/src/auth/auth-worker-server.c b/src/auth/auth-worker-server.c new file mode 100644 index 0000000..41b2b2b --- /dev/null +++ b/src/auth/auth-worker-server.c @@ -0,0 +1,519 @@ +/* Copyright (c) 2005-2018 Dovecot authors, see the included COPYING file */ + +#include "auth-common.h" +#include "ioloop.h" +#include "array.h" +#include "aqueue.h" +#include "net.h" +#include "istream.h" +#include "ostream.h" +#include "hex-binary.h" +#include "str.h" +#include "eacces-error.h" +#include "auth-request.h" +#include "auth-worker-client.h" +#include "auth-worker-server.h" + +#include <unistd.h> + +/* Initial lookup timeout */ +#define AUTH_WORKER_LOOKUP_TIMEOUT_SECS 60 +/* Timeout for multi-line replies, e.g. listing users. This should be a much + higher value, because e.g. doveadm could be doing some long-running commands + for the users. And because of buffering this timeout is for handling + multiple users, not just one. */ +#define AUTH_WORKER_RESUME_TIMEOUT_SECS (30*60) +#define AUTH_WORKER_MAX_IDLE_SECS (60*5) +#define AUTH_WORKER_ABORT_SECS 60 +#define AUTH_WORKER_DELAY_WARN_SECS 3 +#define AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS 300 + +struct auth_worker_request { + unsigned int id; + time_t created; + const char *username; + const char *data; + auth_worker_callback_t *callback; + void *context; +}; + +struct auth_worker_connection { + int fd; + + struct event *event; + struct io *io; + struct istream *input; + struct ostream *output; + struct timeout *to; + + struct auth_worker_request *request; + unsigned int id_counter; + + bool received_error:1; + bool restart:1; + bool shutdown:1; + bool timeout_pending_resume:1; + bool resuming:1; +}; + +static ARRAY(struct auth_worker_connection *) connections = ARRAY_INIT; +static unsigned int idle_count = 0, auth_workers_with_errors = 0; +static ARRAY(struct auth_worker_request *) worker_request_array; +static struct aqueue *worker_request_queue; +static time_t auth_worker_last_warn; +static unsigned int auth_workers_throttle_count; + +static const char *worker_socket_path; + +static void worker_input(struct auth_worker_connection *conn); +static void auth_worker_destroy(struct auth_worker_connection **conn, + const char *reason, bool restart) ATTR_NULL(2); + +static void auth_worker_idle_timeout(struct auth_worker_connection *conn) +{ + i_assert(conn->request == NULL); + + if (idle_count > 1) + auth_worker_destroy(&conn, NULL, FALSE); + else + timeout_reset(conn->to); +} + +static void auth_worker_call_timeout(struct auth_worker_connection *conn) +{ + i_assert(conn->request != NULL); + + auth_worker_destroy(&conn, "Lookup timed out", TRUE); +} + +static bool auth_worker_request_send(struct auth_worker_connection *conn, + struct auth_worker_request *request) +{ + struct const_iovec iov[3]; + unsigned int age_secs = ioloop_time - request->created; + + i_assert(conn->to != NULL); + + if (age_secs >= AUTH_WORKER_ABORT_SECS) { + e_error(conn->event, + "Aborting auth request that was queued for %d secs, " + "%d left in queue", + age_secs, aqueue_count(worker_request_queue)); + request->callback(conn, t_strdup_printf( + "FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE), + request->context); + return FALSE; + } + if (age_secs >= AUTH_WORKER_DELAY_WARN_SECS && + ioloop_time - auth_worker_last_warn > + AUTH_WORKER_DELAY_WARN_MIN_INTERVAL_SECS) { + auth_worker_last_warn = ioloop_time; + e_error(conn->event, "Auth request was queued for %d " + "seconds, %d left in queue " + "(see auth_worker_max_count)", + age_secs, aqueue_count(worker_request_queue)); + } + + request->id = ++conn->id_counter; + + iov[0].iov_base = t_strdup_printf("%d\t", request->id); + iov[0].iov_len = strlen(iov[0].iov_base); + iov[1].iov_base = request->data; + iov[1].iov_len = strlen(request->data); + iov[2].iov_base = "\n"; + iov[2].iov_len = 1; + + o_stream_nsendv(conn->output, iov, 3); + + i_assert(conn->request == NULL); + conn->request = request; + + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_LOOKUP_TIMEOUT_SECS * 1000, + auth_worker_call_timeout, conn); + idle_count--; + return TRUE; +} + +static void auth_worker_request_send_next(struct auth_worker_connection *conn) +{ + struct auth_worker_request *request; + + do { + if (aqueue_count(worker_request_queue) == 0) + return; + + request = array_idx_elem(&worker_request_array, + aqueue_idx(worker_request_queue, 0)); + aqueue_delete_tail(worker_request_queue); + } while (!auth_worker_request_send(conn, request)); +} + +static void auth_worker_send_handshake(struct auth_worker_connection *conn) +{ + string_t *str; + unsigned char passdb_md5[MD5_RESULTLEN]; + unsigned char userdb_md5[MD5_RESULTLEN]; + + str = t_str_new(128); + str_printfa(str, "VERSION\tauth-worker\t%u\t%u\n", + AUTH_WORKER_PROTOCOL_MAJOR_VERSION, + AUTH_WORKER_PROTOCOL_MINOR_VERSION); + + passdbs_generate_md5(passdb_md5); + userdbs_generate_md5(userdb_md5); + str_append(str, "DBHASH\t"); + binary_to_hex_append(str, passdb_md5, sizeof(passdb_md5)); + str_append_c(str, '\t'); + binary_to_hex_append(str, userdb_md5, sizeof(userdb_md5)); + str_append_c(str, '\n'); + + o_stream_nsend(conn->output, str_data(str), str_len(str)); +} + +static struct auth_worker_connection *auth_worker_create(void) +{ + struct auth_worker_connection *conn; + struct event *event; + int fd; + + if (array_count(&connections) >= auth_workers_throttle_count) + return NULL; + + event = event_create(auth_event); + event_set_append_log_prefix(event, "auth-worker: "); + + fd = net_connect_unix_with_retries(worker_socket_path, 5000); + if (fd == -1) { + if (errno == EACCES) { + e_error(event, "%s", + eacces_error_get("net_connect_unix", + worker_socket_path)); + } else { + e_error(event, "net_connect_unix(%s) failed: %m", + worker_socket_path); + } + event_unref(&event); + return NULL; + } + + conn = i_new(struct auth_worker_connection, 1); + conn->fd = fd; + conn->input = i_stream_create_fd(fd, AUTH_WORKER_MAX_LINE_LENGTH); + conn->output = o_stream_create_fd(fd, SIZE_MAX); + o_stream_set_no_error_handling(conn->output, TRUE); + conn->io = io_add(fd, IO_READ, worker_input, conn); + conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000, + auth_worker_idle_timeout, conn); + conn->event = event; + auth_worker_send_handshake(conn); + + idle_count++; + array_push_back(&connections, &conn); + return conn; +} + +static void auth_worker_destroy(struct auth_worker_connection **_conn, + const char *reason, bool restart) +{ + struct auth_worker_connection *conn = *_conn; + struct auth_worker_connection *const *conns; + unsigned int idx; + + *_conn = NULL; + + if (conn->received_error) { + i_assert(auth_workers_with_errors > 0); + i_assert(auth_workers_with_errors <= array_count(&connections)); + auth_workers_with_errors--; + } + + array_foreach(&connections, conns) { + if (*conns == conn) { + idx = array_foreach_idx(&connections, conns); + array_delete(&connections, idx, 1); + break; + } + } + + if (conn->request == NULL) + idle_count--; + + if (conn->request != NULL) { + e_error(conn->event, "Aborted %s request for %s: %s", + t_strcut(conn->request->data, '\t'), + conn->request->username, reason); + conn->request->callback(conn, t_strdup_printf( + "FAIL\t%d", PASSDB_RESULT_INTERNAL_FAILURE), + conn->request->context); + } + + io_remove(&conn->io); + i_stream_destroy(&conn->input); + o_stream_destroy(&conn->output); + timeout_remove(&conn->to); + + if (close(conn->fd) < 0) + e_error(conn->event, "close() failed: %m"); + event_unref(&conn->event); + i_free(conn); + + if (idle_count == 0 && restart) { + conn = auth_worker_create(); + if (conn != NULL) + auth_worker_request_send_next(conn); + } +} + +static struct auth_worker_connection *auth_worker_find_free(void) +{ + struct auth_worker_connection *conn; + + if (idle_count == 0) + return NULL; + + array_foreach_elem(&connections, conn) { + if (conn->request == NULL) + return conn; + } + i_unreached(); + return NULL; +} + +static bool auth_worker_request_handle(struct auth_worker_connection *conn, + struct auth_worker_request *request, + const char *line) +{ + if (str_begins(line, "*\t")) { + /* multi-line reply, not finished yet */ + if (conn->resuming) + timeout_reset(conn->to); + else { + conn->resuming = TRUE; + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_RESUME_TIMEOUT_SECS * 1000, + auth_worker_call_timeout, conn); + } + } else { + conn->resuming = FALSE; + conn->request = NULL; + conn->timeout_pending_resume = FALSE; + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_MAX_IDLE_SECS * 1000, + auth_worker_idle_timeout, conn); + idle_count++; + } + + if (!request->callback(conn, line, request->context) && conn->io != NULL) { + conn->timeout_pending_resume = FALSE; + timeout_remove(&conn->to); + io_remove(&conn->io); + return FALSE; + } + return TRUE; +} + +static bool auth_worker_error(struct auth_worker_connection *conn) +{ + if (conn->received_error) + return TRUE; + conn->received_error = TRUE; + auth_workers_with_errors++; + i_assert(auth_workers_with_errors <= array_count(&connections)); + + if (auth_workers_with_errors == 1) { + /* this is the only failing auth worker connection. + don't create new ones until this one sends SUCCESS. */ + auth_workers_throttle_count = array_count(&connections); + return TRUE; + } + + /* too many auth workers, reduce them */ + i_assert(array_count(&connections) > 1); + if (auth_workers_throttle_count >= array_count(&connections)) + auth_workers_throttle_count = array_count(&connections)-1; + else if (auth_workers_throttle_count > 1) + auth_workers_throttle_count--; + auth_worker_destroy(&conn, "Internal auth worker failure", FALSE); + return FALSE; +} + +static void auth_worker_success(struct auth_worker_connection *conn) +{ + unsigned int max_count = global_auth_settings->worker_max_count; + + if (!conn->received_error) + return; + + i_assert(auth_workers_with_errors > 0); + i_assert(auth_workers_with_errors <= array_count(&connections)); + auth_workers_with_errors--; + + if (auth_workers_with_errors == 0) { + /* all workers are succeeding now, set the limit back to + original. */ + auth_workers_throttle_count = max_count; + } else if (auth_workers_throttle_count < max_count) + auth_workers_throttle_count++; + conn->received_error = FALSE; +} + +static void worker_input(struct auth_worker_connection *conn) +{ + const char *line, *id_str; + unsigned int id; + + switch (i_stream_read(conn->input)) { + case 0: + return; + case -1: + /* disconnected */ + auth_worker_destroy(&conn, "Worker process died unexpectedly", + TRUE); + return; + case -2: + /* buffer full */ + e_error(conn->event, + "BUG: Auth worker sent us more than %d bytes", + (int)AUTH_WORKER_MAX_LINE_LENGTH); + auth_worker_destroy(&conn, "Worker is buggy", TRUE); + return; + } + + while ((line = i_stream_next_line(conn->input)) != NULL) { + if (strcmp(line, "RESTART") == 0) { + conn->restart = TRUE; + continue; + } + if (strcmp(line, "SHUTDOWN") == 0) { + conn->shutdown = TRUE; + continue; + } + if (strcmp(line, "ERROR") == 0) { + if (!auth_worker_error(conn)) + return; + continue; + } + if (strcmp(line, "SUCCESS") == 0) { + auth_worker_success(conn); + continue; + } + id_str = line; + line = strchr(line, '\t'); + if (line == NULL || + str_to_uint(t_strdup_until(id_str, line), &id) < 0) + continue; + + if (conn->request != NULL && id == conn->request->id) { + if (!auth_worker_request_handle(conn, conn->request, + line + 1)) + break; + } else { + if (conn->request != NULL) { + e_error(conn->event, + "BUG: Worker sent reply with id %u, " + "expected %u", id, conn->request->id); + } else { + e_error(conn->event, + "BUG: Worker sent reply with id %u, " + "none was expected", id); + } + auth_worker_destroy(&conn, "Worker is buggy", TRUE); + return; + } + } + + if (conn->request != NULL) { + /* there's still a pending request */ + } else if (conn->restart) + auth_worker_destroy(&conn, "Max requests limit", TRUE); + else if (conn->shutdown) + auth_worker_destroy(&conn, "Idle kill", FALSE); + else + auth_worker_request_send_next(conn); +} + +static void worker_input_resume(struct auth_worker_connection *conn) +{ + conn->timeout_pending_resume = FALSE; + timeout_remove(&conn->to); + conn->to = timeout_add(AUTH_WORKER_RESUME_TIMEOUT_SECS * 1000, + auth_worker_call_timeout, conn); + worker_input(conn); +} + +void auth_worker_call(pool_t pool, const char *username, const char *data, + auth_worker_callback_t *callback, void *context) +{ + struct auth_worker_connection *conn; + struct auth_worker_request *request; + + request = p_new(pool, struct auth_worker_request, 1); + request->created = ioloop_time; + request->username = p_strdup(pool, username); + request->data = p_strdup(pool, data); + request->callback = callback; + request->context = context; + + if (aqueue_count(worker_request_queue) > 0) { + /* requests are already being queued, no chance of + finding/creating a worker */ + conn = NULL; + } else { + conn = auth_worker_find_free(); + if (conn == NULL) { + /* no free connections, create a new one */ + conn = auth_worker_create(); + } + } + if (conn != NULL) { + if (!auth_worker_request_send(conn, request)) + i_unreached(); + } else { + /* reached the limit, queue the request */ + aqueue_append(worker_request_queue, &request); + } +} + +void auth_worker_server_resume_input(struct auth_worker_connection *conn) +{ + if (conn->request == NULL) { + /* request was just finished, don't try to resume it */ + return; + } + + if (conn->io == NULL) + conn->io = io_add(conn->fd, IO_READ, worker_input, conn); + if (!conn->timeout_pending_resume) { + conn->timeout_pending_resume = TRUE; + timeout_remove(&conn->to); + conn->to = timeout_add_short(0, worker_input_resume, conn); + } +} + +void auth_worker_server_init(void) +{ + worker_socket_path = "auth-worker"; + auth_workers_throttle_count = global_auth_settings->worker_max_count; + i_assert(auth_workers_throttle_count > 0); + + i_array_init(&worker_request_array, 128); + worker_request_queue = aqueue_init(&worker_request_array.arr); + + i_array_init(&connections, 16); +} + +void auth_worker_server_deinit(void) +{ + struct auth_worker_connection **connp, *conn; + + while (array_count(&connections) > 0) { + connp = array_front_modifiable(&connections); + conn = *connp; + auth_worker_destroy(&conn, "Shutting down", FALSE); + } + array_free(&connections); + + aqueue_deinit(&worker_request_queue); + array_free(&worker_request_array); +} |