summaryrefslogtreecommitdiffstats
path: root/spawn/spawn.c
diff options
context:
space:
mode:
Diffstat (limited to 'spawn/spawn.c')
-rw-r--r--spawn/spawn.c290
1 files changed, 290 insertions, 0 deletions
diff --git a/spawn/spawn.c b/spawn/spawn.c
new file mode 100644
index 0000000..256c046
--- /dev/null
+++ b/spawn/spawn.c
@@ -0,0 +1,290 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "spawn.h"
+#include "../database/engine/rrdenginelib.h"
+
+static uv_thread_t thread;
+int spawn_thread_error;
+int spawn_thread_shutdown;
+
+struct spawn_queue spawn_cmd_queue;
+
+static struct spawn_cmd_info *create_spawn_cmd(char *command_to_run)
+{
+ struct spawn_cmd_info *cmdinfo;
+
+ cmdinfo = mallocz(sizeof(*cmdinfo));
+ fatal_assert(0 == uv_cond_init(&cmdinfo->cond));
+ fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex));
+ cmdinfo->serial = 0; /* invalid */
+ cmdinfo->command_to_run = strdupz(command_to_run);
+ cmdinfo->exit_status = -1; /* invalid */
+ cmdinfo->pid = -1; /* invalid */
+ cmdinfo->flags = 0;
+
+ return cmdinfo;
+}
+
+void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
+{
+ uv_cond_destroy(&cmdinfo->cond);
+ uv_mutex_destroy(&cmdinfo->mutex);
+
+ freez(cmdinfo->command_to_run);
+ freez(cmdinfo);
+}
+
+int spawn_cmd_compare(void *a, void *b)
+{
+ struct spawn_cmd_info *cmda = a, *cmdb = b;
+
+ /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
+ if (cmda->serial < cmdb->serial) return -1;
+ if (cmda->serial > cmdb->serial) return 1;
+
+ return 0;
+}
+
+static void init_spawn_cmd_queue(void)
+{
+ spawn_cmd_queue.cmd_tree.root = NULL;
+ spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
+ spawn_cmd_queue.size = 0;
+ spawn_cmd_queue.latest_serial = 0;
+ fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
+ fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
+}
+
+/*
+ * Returns serial number of the enqueued command
+ */
+uint64_t spawn_enq_cmd(char *command_to_run)
+{
+ unsigned queue_size;
+ uint64_t serial;
+ avl *avl_ret;
+ struct spawn_cmd_info *cmdinfo;
+
+ cmdinfo = create_spawn_cmd(command_to_run);
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
+ uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
+ }
+ fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
+ spawn_cmd_queue.size = queue_size + 1;
+
+ serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
+ cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
+
+ /* enqueue command */
+ avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
+ fatal_assert(avl_ret == (avl *)cmdinfo);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ /* wake up event loop */
+ fatal_assert(0 == uv_async_send(&spawn_async));
+ return serial;
+}
+
+/*
+ * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
+ */
+void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
+{
+ avl *avl_ret;
+ struct spawn_cmd_info tmp, *cmdinfo;
+
+ tmp.serial = serial;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl *)&tmp);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
+ cmdinfo = (struct spawn_cmd_info *)avl_ret;
+
+ uv_mutex_lock(&cmdinfo->mutex);
+ while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
+ /* Only 1 thread is allowed to wait for this command to finish */
+ uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
+ }
+ uv_mutex_unlock(&cmdinfo->mutex);
+
+ spawn_deq_cmd(cmdinfo);
+ *exit_status = cmdinfo->exit_status;
+ *exec_run_timestamp = cmdinfo->exec_run_timestamp;
+
+ destroy_spawn_cmd(cmdinfo);
+}
+
+void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
+{
+ unsigned queue_size;
+ avl *avl_ret;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ queue_size = spawn_cmd_queue.size;
+ fatal_assert(queue_size);
+ /* dequeue command */
+ avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
+ fatal_assert(avl_ret);
+
+ spawn_cmd_queue.size = queue_size - 1;
+
+ /* wake up callers */
+ uv_cond_signal(&spawn_cmd_queue.cond);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+}
+
+/*
+ * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
+ * only writer as far as struct spawn_cmd_info entries are concerned.
+ */
+static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
+{
+ struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
+
+ if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
+ *cmdinfop = cmdinfo;
+ return -1; /* break tree traversal */
+ }
+ return 0; /* continue traversing */
+}
+
+struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
+{
+ struct spawn_cmd_info *cmdinfo;
+ unsigned queue_size;
+ int ret;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ queue_size = spawn_cmd_queue.size;
+ if (queue_size == 0) {
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+ return NULL;
+ }
+ /* find command */
+ cmdinfo = NULL;
+ ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
+ if (-1 != ret) { /* no commands available for processing */
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+ return NULL;
+ }
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ return cmdinfo;
+}
+
+/**
+ * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
+ * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
+ * The caller has to be the netdata user as configured.
+ *
+ * @param loop the libuv loop of the caller context
+ * @param spawn_channel the birectional libuv IPC pipe that the server and the caller will share
+ * @param process the spawn server libuv process context
+ * @return 0 on success or the libuv error code
+ */
+int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
+{
+ uv_process_options_t options = {0};
+ char *args[3];
+ int ret;
+#define SPAWN_SERVER_DESCRIPTORS (3)
+ uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
+ struct passwd *passwd = NULL;
+ char *user = NULL;
+
+ passwd = getpwuid(getuid());
+ user = (passwd && passwd->pw_name) ? passwd->pw_name : "";
+
+ args[0] = exepath;
+ args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
+ args[2] = NULL;
+
+ memset(&options, 0, sizeof(options));
+ options.file = exepath;
+ options.args = args;
+ options.exit_cb = NULL; //exit_cb;
+ options.stdio = stdio;
+ options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
+
+ stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
+ stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
+ stdio[1].flags = UV_INHERIT_FD;
+ stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
+ stdio[2].flags = UV_INHERIT_FD;
+ stdio[2].data.fd = 2 /* UV_STDERR_FD */;
+
+ ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
+ if (0 != ret) {
+ error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", exepath, user, uv_strerror(ret));
+ fatal("Cannot start netdata without the spawn server.");
+ }
+
+ return ret;
+}
+
+#define CONCURRENT_SPAWNS 16
+#define SPAWN_ITERATIONS 10000
+#undef CONCURRENT_STRESS_TEST
+
+void spawn_init(void)
+{
+ struct completion completion;
+ int error;
+
+ info("Initializing spawn client.");
+
+ init_spawn_cmd_queue();
+
+ init_completion(&completion);
+ error = uv_thread_create(&thread, spawn_client, &completion);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ goto after_error;
+ }
+ /* wait for spawn client thread to initialize */
+ wait_for_completion(&completion);
+ destroy_completion(&completion);
+ uv_thread_set_name_np(thread, "DAEMON_SPAWN");
+
+ if (spawn_thread_error) {
+ error = uv_thread_join(&thread);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ }
+ goto after_error;
+ }
+#ifdef CONCURRENT_STRESS_TEST
+ signals_reset();
+ signals_unblock();
+
+ sleep(60);
+ uint64_t serial[CONCURRENT_SPAWNS];
+ for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
+ for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
+ char cmd[64];
+ sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
+ serial[i] = spawn_enq_cmd(cmd);
+ info("Queued command %s for spawning.", cmd);
+ }
+ int exit_status;
+ time_t exec_run_timestamp;
+ for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
+ info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
+ exec_run_timestamp);
+ spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
+ info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
+ exec_run_timestamp);
+ }
+ }
+ exit(0);
+#endif
+ return;
+
+ after_error:
+ error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
+}