summaryrefslogtreecommitdiffstats
path: root/logsrvd/logsrvd_queue.c
diff options
context:
space:
mode:
Diffstat (limited to 'logsrvd/logsrvd_queue.c')
-rw-r--r--logsrvd/logsrvd_queue.c287
1 files changed, 287 insertions, 0 deletions
diff --git a/logsrvd/logsrvd_queue.c b/logsrvd/logsrvd_queue.c
new file mode 100644
index 0000000..f164e82
--- /dev/null
+++ b/logsrvd/logsrvd_queue.c
@@ -0,0 +1,287 @@
+/*
+ * SPDX-License-Identifier: ISC
+ *
+ * Copyright (c) 2021 Todd C. Miller <Todd.Miller@sudo.ws>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+/*
+ * This is an open source non-commercial project. Dear PVS-Studio, please check it.
+ * PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
+ */
+
+#include <config.h>
+
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <limits.h>
+#ifdef HAVE_STDBOOL_H
+# include <stdbool.h>
+#else
+# include "compat/stdbool.h"
+#endif /* HAVE_STDBOOL_H */
+#if defined(HAVE_STDINT_H)
+# include <stdint.h>
+#elif defined(HAVE_INTTYPES_H)
+# include <inttypes.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <dirent.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "sudo_compat.h"
+#include "sudo_conf.h"
+#include "sudo_debug.h"
+#include "sudo_event.h"
+#include "sudo_eventlog.h"
+#include "sudo_fatal.h"
+#include "sudo_gettext.h"
+#include "sudo_iolog.h"
+#include "sudo_queue.h"
+#include "sudo_util.h"
+
+#include "logsrvd.h"
+
+#if defined(HAVE_STRUCT_DIRENT_D_NAMLEN) && HAVE_STRUCT_DIRENT_D_NAMLEN
+# define NAMLEN(dirent) (dirent)->d_namlen
+#else
+# define NAMLEN(dirent) strlen((dirent)->d_name)
+#endif
+
+static struct outgoing_journal_queue outgoing_journal_queue =
+ TAILQ_HEAD_INITIALIZER(outgoing_journal_queue);
+
+static struct sudo_event *outgoing_queue_event;
+
+/*
+ * Callback that runs when the outgoing queue retry timer fires.
+ * Tries to relay the first entry in the outgoing queue.
+ */
+static void
+outgoing_queue_cb(int unused, int what, void *v)
+{
+ struct connection_closure *closure;
+ struct outgoing_journal *oj, *next;
+ struct sudo_event_base *evbase = v;
+ bool success = false;
+ debug_decl(outgoing_queue_cb, SUDO_DEBUG_UTIL);
+
+ /* Must have at least one relay server. */
+ if (TAILQ_EMPTY(logsrvd_conf_relay_address()))
+ debug_return;
+
+ /* Process first journal. */
+ TAILQ_FOREACH_SAFE(oj, &outgoing_journal_queue, entries, next) {
+ FILE *fp;
+ int fd;
+
+ fd = open(oj->journal_path, O_RDWR);
+ if (fd == -1) {
+ if (errno == ENOENT) {
+ TAILQ_REMOVE(&outgoing_journal_queue, oj, entries);
+ free(oj->journal_path);
+ free(oj);
+ }
+ continue;
+ }
+ if (!sudo_lock_file(fd, SUDO_TLOCK)) {
+ sudo_warn(U_("unable to lock %s"), oj->journal_path);
+ close(fd);
+ continue;
+ }
+ fp = fdopen(fd, "r");
+ if (fp == NULL) {
+ sudo_warn(U_("unable to open %s"), oj->journal_path);
+ close(fd);
+ break;
+ }
+
+ /* Allocate a connection closure and fill in journal vars. */
+ closure = connection_closure_alloc(fd, false, true, evbase);
+ if (closure == NULL) {
+ fclose(fp);
+ break;
+ }
+ closure->journal = fp;
+ closure->journal_path = oj->journal_path;
+
+ /* Done with oj now, closure owns journal_path. */
+ TAILQ_REMOVE(&outgoing_journal_queue, oj, entries);
+ free(oj);
+
+ success = connect_relay(closure);
+ if (!success) {
+ sudo_warnx("%s", U_("unable to connect to relay"));
+ connection_close(closure);
+ }
+ break;
+ }
+}
+
+/*
+ * Schedule the outgoing_queue_event, creating it as necessary.
+ * The event will fire after the specified timeout elapses.
+ */
+bool
+logsrvd_queue_enable(time_t timeout, struct sudo_event_base *evbase)
+{
+ debug_decl(logsrvd_queue_enable, SUDO_DEBUG_UTIL);
+
+ if (!TAILQ_EMPTY(&outgoing_journal_queue)) {
+ struct timespec tv = { timeout, 0 };
+
+ if (outgoing_queue_event == NULL) {
+ outgoing_queue_event = sudo_ev_alloc(-1, SUDO_EV_TIMEOUT,
+ outgoing_queue_cb, evbase);
+ if (outgoing_queue_event == NULL) {
+ sudo_warnx(U_("%s: %s"), __func__,
+ U_("unable to allocate memory"));
+ debug_return_bool(false);
+ }
+ }
+ if (sudo_ev_add(evbase, outgoing_queue_event, &tv, false) == -1) {
+ sudo_warnx("%s", U_("unable to add event to queue"));
+ debug_return_bool(false);
+ }
+ }
+ debug_return_bool(true);
+}
+
+/*
+ * Allocate a queue item based on the connection and push it on
+ * the outgoing queue.
+ * Consumes journal_path from the closure.
+ */
+bool
+logsrvd_queue_insert(struct connection_closure *closure)
+{
+ struct outgoing_journal *oj;
+ debug_decl(logsrvd_queue_insert, SUDO_DEBUG_UTIL);
+
+ if (closure->journal_path == NULL) {
+ sudo_debug_printf(SUDO_DEBUG_ERROR|SUDO_DEBUG_LINENO,
+ "missing journal_path for closure %p", closure);
+ debug_return_bool(false);
+ }
+
+ if ((oj = malloc(sizeof(*oj))) == NULL) {
+ sudo_warnx(U_("%s: %s"), __func__, U_("unable to allocate memory"));
+ debug_return_bool(false);
+ }
+ oj->journal_path = closure->journal_path;
+ closure->journal_path = NULL;
+ TAILQ_INSERT_TAIL(&outgoing_journal_queue, oj, entries);
+
+ if (!logsrvd_queue_enable(logsrvd_conf_relay_retry_interval(),
+ closure->evbase))
+ debug_return_bool(false);
+
+ debug_return_bool(true);
+}
+
+/*
+ * Scan the outgoing queue at startup and populate the
+ * outgoing_journal_queue.
+ */
+bool
+logsrvd_queue_scan(struct sudo_event_base *evbase)
+{
+ char path[PATH_MAX];
+ struct dirent *dent;
+ size_t prefix_len;
+ int dirlen;
+ DIR *dirp;
+ debug_decl(logsrvd_queue_scan, SUDO_DEBUG_UTIL);
+
+ /* Must have at least one relay server. */
+ if (TAILQ_EMPTY(logsrvd_conf_relay_address()))
+ debug_return_bool(true);
+
+ dirlen = snprintf(path, sizeof(path), "%s/outgoing/%s",
+ logsrvd_conf_relay_dir(), RELAY_TEMPLATE);
+ if (dirlen >= ssizeof(path)) {
+ errno = ENAMETOOLONG;
+ sudo_warn("%s/outgoing/%s", logsrvd_conf_relay_dir(), RELAY_TEMPLATE);
+ debug_return_bool(false);
+ }
+ dirlen -= sizeof(RELAY_TEMPLATE) - 1;
+ path[dirlen] = '\0';
+
+ dirp = opendir(path);
+ if (dirp == NULL) {
+ sudo_warn("opendir %s", path);
+ debug_return_bool(false);
+ }
+ prefix_len = strcspn(RELAY_TEMPLATE, "X");
+ while ((dent = readdir(dirp)) != NULL) {
+ struct outgoing_journal *oj;
+
+ /* Skip anything that is not a relay temp file. */
+ if (NAMLEN(dent) != sizeof(RELAY_TEMPLATE) - 1)
+ continue;
+ if (strncmp(dent->d_name, RELAY_TEMPLATE, prefix_len) != 0)
+ continue;
+
+ /* Add to queue. */
+ path[dirlen] = '\0';
+ if (strlcat(path, dent->d_name, sizeof(path)) >= sizeof(path))
+ continue;
+ if ((oj = malloc(sizeof(*oj))) == NULL)
+ goto oom;
+ if ((oj->journal_path = strdup(path)) == NULL) {
+ free(oj);
+ goto oom;
+ }
+ TAILQ_INSERT_TAIL(&outgoing_journal_queue, oj, entries);
+ }
+ closedir(dirp);
+
+ /* Process the queue immediately. */
+ if (!logsrvd_queue_enable(0, evbase))
+ debug_return_bool(false);
+
+ debug_return_bool(true);
+oom:
+ sudo_warnx(U_("%s: %s"), __func__, U_("unable to allocate memory"));
+ closedir(dirp);
+ debug_return_bool(false);
+}
+
+/*
+ * Dump outgoing queue in response to SIGUSR1.
+ */
+void
+logsrvd_queue_dump(void)
+{
+ struct outgoing_journal *oj;
+ debug_decl(logsrvd_queue_dump, SUDO_DEBUG_UTIL);
+
+ if (TAILQ_EMPTY(&outgoing_journal_queue))
+ debug_return;
+
+ sudo_debug_printf(SUDO_DEBUG_INFO, "outgoing journal queue:");
+ TAILQ_FOREACH(oj, &outgoing_journal_queue, entries) {
+ sudo_debug_printf(SUDO_DEBUG_INFO, " %s", oj->journal_path);
+ }
+}