diff options
Diffstat (limited to 'logsrvd/logsrvd_queue.c')
-rw-r--r-- | logsrvd/logsrvd_queue.c | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/logsrvd/logsrvd_queue.c b/logsrvd/logsrvd_queue.c new file mode 100644 index 0000000..cf48656 --- /dev/null +++ b/logsrvd/logsrvd_queue.c @@ -0,0 +1,287 @@ +/* + * SPDX-License-Identifier: ISC + * + * Copyright (c) 2021 Todd C. Miller <Todd.Miller@sudo.ws> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +/* + * This is an open source non-commercial project. Dear PVS-Studio, please check it. + * PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com + */ + +#include <config.h> + +#include <sys/stat.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> + +#include <errno.h> +#include <fcntl.h> +#include <limits.h> +#ifdef HAVE_STDBOOL_H +# include <stdbool.h> +#else +# include <compat/stdbool.h> +#endif /* HAVE_STDBOOL_H */ +#if defined(HAVE_STDINT_H) +# include <stdint.h> +#elif defined(HAVE_INTTYPES_H) +# include <inttypes.h> +#endif +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <dirent.h> +#include <time.h> +#include <unistd.h> + +#include <sudo_compat.h> +#include <sudo_conf.h> +#include <sudo_debug.h> +#include <sudo_event.h> +#include <sudo_eventlog.h> +#include <sudo_fatal.h> +#include <sudo_gettext.h> +#include <sudo_iolog.h> +#include <sudo_queue.h> +#include <sudo_util.h> + +#include <logsrvd.h> + +#if defined(HAVE_STRUCT_DIRENT_D_NAMLEN) && HAVE_STRUCT_DIRENT_D_NAMLEN +# define NAMLEN(dirent) (dirent)->d_namlen +#else +# define NAMLEN(dirent) strlen((dirent)->d_name) +#endif + +static struct outgoing_journal_queue outgoing_journal_queue = + TAILQ_HEAD_INITIALIZER(outgoing_journal_queue); + +static struct sudo_event *outgoing_queue_event; + +/* + * Callback that runs when the outgoing queue retry timer fires. + * Tries to relay the first entry in the outgoing queue. + */ +static void +outgoing_queue_cb(int unused, int what, void *v) +{ + struct connection_closure *closure; + struct outgoing_journal *oj, *next; + struct sudo_event_base *evbase = v; + bool success = false; + debug_decl(outgoing_queue_cb, SUDO_DEBUG_UTIL); + + /* Must have at least one relay server. */ + if (TAILQ_EMPTY(logsrvd_conf_relay_address())) + debug_return; + + /* Process first journal. */ + TAILQ_FOREACH_SAFE(oj, &outgoing_journal_queue, entries, next) { + FILE *fp; + int fd; + + fd = open(oj->journal_path, O_RDWR); + if (fd == -1) { + if (errno == ENOENT) { + TAILQ_REMOVE(&outgoing_journal_queue, oj, entries); + free(oj->journal_path); + free(oj); + } + continue; + } + if (!sudo_lock_file(fd, SUDO_TLOCK)) { + sudo_warn(U_("unable to lock %s"), oj->journal_path); + close(fd); + continue; + } + fp = fdopen(fd, "r"); + if (fp == NULL) { + sudo_warn(U_("unable to open %s"), oj->journal_path); + close(fd); + break; + } + + /* Allocate a connection closure and fill in journal vars. */ + closure = connection_closure_alloc(fd, false, true, evbase); + if (closure == NULL) { + fclose(fp); + break; + } + closure->journal = fp; + closure->journal_path = oj->journal_path; + + /* Done with oj now, closure owns journal_path. */ + TAILQ_REMOVE(&outgoing_journal_queue, oj, entries); + free(oj); + + success = connect_relay(closure); + if (!success) { + sudo_warnx("%s", U_("unable to connect to relay")); + connection_close(closure); + } + break; + } +} + +/* + * Schedule the outgoing_queue_event, creating it as necessary. + * The event will fire after the specified timeout elapses. + */ +bool +logsrvd_queue_enable(time_t timeout, struct sudo_event_base *evbase) +{ + debug_decl(logsrvd_queue_enable, SUDO_DEBUG_UTIL); + + if (!TAILQ_EMPTY(&outgoing_journal_queue)) { + struct timespec tv = { timeout, 0 }; + + if (outgoing_queue_event == NULL) { + outgoing_queue_event = sudo_ev_alloc(-1, SUDO_EV_TIMEOUT, + outgoing_queue_cb, evbase); + if (outgoing_queue_event == NULL) { + sudo_warnx(U_("%s: %s"), __func__, + U_("unable to allocate memory")); + debug_return_bool(false); + } + } + if (sudo_ev_add(evbase, outgoing_queue_event, &tv, false) == -1) { + sudo_warnx("%s", U_("unable to add event to queue")); + debug_return_bool(false); + } + } + debug_return_bool(true); +} + +/* + * Allocate a queue item based on the connection and push it on + * the outgoing queue. + * Consumes journal_path from the closure. + */ +bool +logsrvd_queue_insert(struct connection_closure *closure) +{ + struct outgoing_journal *oj; + debug_decl(logsrvd_queue_insert, SUDO_DEBUG_UTIL); + + if (closure->journal_path == NULL) { + sudo_debug_printf(SUDO_DEBUG_ERROR|SUDO_DEBUG_LINENO, + "missing journal_path for closure %p", closure); + debug_return_bool(false); + } + + if ((oj = malloc(sizeof(*oj))) == NULL) { + sudo_warnx(U_("%s: %s"), __func__, U_("unable to allocate memory")); + debug_return_bool(false); + } + oj->journal_path = closure->journal_path; + closure->journal_path = NULL; + TAILQ_INSERT_TAIL(&outgoing_journal_queue, oj, entries); + + if (!logsrvd_queue_enable(logsrvd_conf_relay_retry_interval(), + closure->evbase)) + debug_return_bool(false); + + debug_return_bool(true); +} + +/* + * Scan the outgoing queue at startup and populate the + * outgoing_journal_queue. + */ +bool +logsrvd_queue_scan(struct sudo_event_base *evbase) +{ + char path[PATH_MAX]; + struct dirent *dent; + size_t prefix_len; + int dirlen; + DIR *dirp; + debug_decl(logsrvd_queue_scan, SUDO_DEBUG_UTIL); + + /* Must have at least one relay server. */ + if (TAILQ_EMPTY(logsrvd_conf_relay_address())) + debug_return_bool(true); + + dirlen = snprintf(path, sizeof(path), "%s/outgoing/%s", + logsrvd_conf_relay_dir(), RELAY_TEMPLATE); + if (dirlen >= ssizeof(path)) { + errno = ENAMETOOLONG; + sudo_warn("%s/outgoing/%s", logsrvd_conf_relay_dir(), RELAY_TEMPLATE); + debug_return_bool(false); + } + dirlen -= (int)sizeof(RELAY_TEMPLATE) - 1; + path[dirlen] = '\0'; + + dirp = opendir(path); + if (dirp == NULL) { + sudo_warn("opendir %s", path); + debug_return_bool(false); + } + prefix_len = strcspn(RELAY_TEMPLATE, "X"); + while ((dent = readdir(dirp)) != NULL) { + struct outgoing_journal *oj; + + /* Skip anything that is not a relay temp file. */ + if (NAMLEN(dent) != sizeof(RELAY_TEMPLATE) - 1) + continue; + if (strncmp(dent->d_name, RELAY_TEMPLATE, prefix_len) != 0) + continue; + + /* Add to queue. */ + path[dirlen] = '\0'; + if (strlcat(path, dent->d_name, sizeof(path)) >= sizeof(path)) + continue; + if ((oj = malloc(sizeof(*oj))) == NULL) + goto oom; + if ((oj->journal_path = strdup(path)) == NULL) { + free(oj); + goto oom; + } + TAILQ_INSERT_TAIL(&outgoing_journal_queue, oj, entries); + } + closedir(dirp); + + /* Process the queue immediately. */ + if (!logsrvd_queue_enable(0, evbase)) + debug_return_bool(false); + + debug_return_bool(true); +oom: + sudo_warnx(U_("%s: %s"), __func__, U_("unable to allocate memory")); + closedir(dirp); + debug_return_bool(false); +} + +/* + * Dump outgoing queue in response to SIGUSR1. + */ +void +logsrvd_queue_dump(void) +{ + struct outgoing_journal *oj; + debug_decl(logsrvd_queue_dump, SUDO_DEBUG_UTIL); + + if (TAILQ_EMPTY(&outgoing_journal_queue)) + debug_return; + + sudo_debug_printf(SUDO_DEBUG_INFO, "outgoing journal queue:"); + TAILQ_FOREACH(oj, &outgoing_journal_queue, entries) { + sudo_debug_printf(SUDO_DEBUG_INFO, " %s", oj->journal_path); + } +} |