summaryrefslogtreecommitdiffstats
path: root/spawn
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-21 17:19:04 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-21 17:19:04 +0000
commit310edf444908b09ea6d00c03baceb7925f3bb7a2 (patch)
tree7064577c7fa7a851e2e930beb606ea8237b0bbd2 /spawn
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-310edf444908b09ea6d00c03baceb7925f3bb7a2.tar.xz
netdata-310edf444908b09ea6d00c03baceb7925f3bb7a2.zip
Merging upstream version 1.45.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'spawn')
-rw-r--r--spawn/Makefile.am9
-rw-r--r--spawn/README.md0
-rw-r--r--spawn/spawn.c289
-rw-r--r--spawn/spawn.h109
-rw-r--r--spawn/spawn_client.c248
-rw-r--r--spawn/spawn_server.c386
6 files changed, 0 insertions, 1041 deletions
diff --git a/spawn/Makefile.am b/spawn/Makefile.am
deleted file mode 100644
index 02fe3a31..00000000
--- a/spawn/Makefile.am
+++ /dev/null
@@ -1,9 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
-
diff --git a/spawn/README.md b/spawn/README.md
deleted file mode 100644
index e69de29b..00000000
--- a/spawn/README.md
+++ /dev/null
diff --git a/spawn/spawn.c b/spawn/spawn.c
deleted file mode 100644
index 3d62df79..00000000
--- a/spawn/spawn.c
+++ /dev/null
@@ -1,289 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "spawn.h"
-
-static uv_thread_t thread;
-int spawn_thread_error;
-int spawn_thread_shutdown;
-
-struct spawn_queue spawn_cmd_queue;
-
-static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run)
-{
- struct spawn_cmd_info *cmdinfo;
-
- cmdinfo = mallocz(sizeof(*cmdinfo));
- fatal_assert(0 == uv_cond_init(&cmdinfo->cond));
- fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex));
- cmdinfo->serial = 0; /* invalid */
- cmdinfo->command_to_run = strdupz(command_to_run);
- cmdinfo->exit_status = -1; /* invalid */
- cmdinfo->pid = -1; /* invalid */
- cmdinfo->flags = 0;
-
- return cmdinfo;
-}
-
-void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
-{
- uv_cond_destroy(&cmdinfo->cond);
- uv_mutex_destroy(&cmdinfo->mutex);
-
- freez(cmdinfo->command_to_run);
- freez(cmdinfo);
-}
-
-int spawn_cmd_compare(void *a, void *b)
-{
- struct spawn_cmd_info *cmda = a, *cmdb = b;
-
- /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
- if (cmda->serial < cmdb->serial) return -1;
- if (cmda->serial > cmdb->serial) return 1;
-
- return 0;
-}
-
-static void init_spawn_cmd_queue(void)
-{
- spawn_cmd_queue.cmd_tree.root = NULL;
- spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
- spawn_cmd_queue.size = 0;
- spawn_cmd_queue.latest_serial = 0;
- fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
- fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
-}
-
-/*
- * Returns serial number of the enqueued command
- */
-uint64_t spawn_enq_cmd(const char *command_to_run)
-{
- unsigned queue_size;
- uint64_t serial;
- avl_t *avl_ret;
- struct spawn_cmd_info *cmdinfo;
-
- cmdinfo = create_spawn_cmd(command_to_run);
-
- /* wait for free space in queue */
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
- uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
- }
- fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING);
- spawn_cmd_queue.size = queue_size + 1;
-
- serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
- cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
-
- /* enqueue command */
- avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
- fatal_assert(avl_ret == (avl_t *)cmdinfo);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
-
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&spawn_async));
- return serial;
-}
-
-/*
- * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
- */
-void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
-{
- avl_t *avl_ret;
- struct spawn_cmd_info tmp, *cmdinfo;
-
- tmp.serial = serial;
-
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
-
- fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
- cmdinfo = (struct spawn_cmd_info *)avl_ret;
-
- uv_mutex_lock(&cmdinfo->mutex);
- while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
- /* Only 1 thread is allowed to wait for this command to finish */
- uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
- }
- uv_mutex_unlock(&cmdinfo->mutex);
-
- spawn_deq_cmd(cmdinfo);
- *exit_status = cmdinfo->exit_status;
- *exec_run_timestamp = cmdinfo->exec_run_timestamp;
-
- destroy_spawn_cmd(cmdinfo);
-}
-
-void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
-{
- unsigned queue_size;
- avl_t *avl_ret;
-
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- queue_size = spawn_cmd_queue.size;
- fatal_assert(queue_size);
- /* dequeue command */
- avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo);
- fatal_assert(avl_ret);
-
- spawn_cmd_queue.size = queue_size - 1;
-
- /* wake up callers */
- uv_cond_signal(&spawn_cmd_queue.cond);
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
-}
-
-/*
- * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
- * only writer as far as struct spawn_cmd_info entries are concerned.
- */
-static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
-{
- struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
-
- if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
- *cmdinfop = cmdinfo;
- return -1; /* break tree traversal */
- }
- return 0; /* continue traversing */
-}
-
-struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
-{
- struct spawn_cmd_info *cmdinfo;
- unsigned queue_size;
- int ret;
-
- uv_mutex_lock(&spawn_cmd_queue.mutex);
- queue_size = spawn_cmd_queue.size;
- if (queue_size == 0) {
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- return NULL;
- }
- /* find command */
- cmdinfo = NULL;
- ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
- if (-1 != ret) { /* no commands available for processing */
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
- return NULL;
- }
- uv_mutex_unlock(&spawn_cmd_queue.mutex);
-
- return cmdinfo;
-}
-
-/**
- * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
- * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
- * The caller has to be the netdata user as configured.
- *
- * @param loop the libuv loop of the caller context
- * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share
- * @param process the spawn server libuv process context
- * @return 0 on success or the libuv error code
- */
-int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
-{
- uv_process_options_t options = {0};
- char *args[3];
- int ret;
-#define SPAWN_SERVER_DESCRIPTORS (3)
- uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
- struct passwd *passwd = NULL;
- char *user = NULL;
-
- passwd = getpwuid(getuid());
- user = (passwd && passwd->pw_name) ? passwd->pw_name : "";
-
- args[0] = netdata_exe_file;
- args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
- args[2] = NULL;
-
- memset(&options, 0, sizeof(options));
- options.file = netdata_exe_file;
- options.args = args;
- options.exit_cb = NULL; //exit_cb;
- options.stdio = stdio;
- options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
-
- stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
- stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
- stdio[1].flags = UV_INHERIT_FD;
- stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
- stdio[2].flags = UV_INHERIT_FD;
- stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */;
-
- ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
- if (0 != ret) {
- netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret));
- fatal("Cannot start netdata without the spawn server.");
- }
-
- return ret;
-}
-
-#define CONCURRENT_SPAWNS 16
-#define SPAWN_ITERATIONS 10000
-#undef CONCURRENT_STRESS_TEST
-
-void spawn_init(void)
-{
- struct completion completion;
- int error;
-
- netdata_log_info("Initializing spawn client.");
-
- init_spawn_cmd_queue();
-
- completion_init(&completion);
- error = uv_thread_create(&thread, spawn_client, &completion);
- if (error) {
- netdata_log_error("uv_thread_create(): %s", uv_strerror(error));
- goto after_error;
- }
- /* wait for spawn client thread to initialize */
- completion_wait_for(&completion);
- completion_destroy(&completion);
- uv_thread_set_name_np(thread, "DAEMON_SPAWN");
-
- if (spawn_thread_error) {
- error = uv_thread_join(&thread);
- if (error) {
- netdata_log_error("uv_thread_create(): %s", uv_strerror(error));
- }
- goto after_error;
- }
-#ifdef CONCURRENT_STRESS_TEST
- signals_reset();
- signals_unblock();
-
- sleep(60);
- uint64_t serial[CONCURRENT_SPAWNS];
- for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
- for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
- char cmd[64];
- sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
- serial[i] = spawn_enq_cmd(cmd);
- netdata_log_info("Queued command %s for spawning.", cmd);
- }
- int exit_status;
- time_t exec_run_timestamp;
- for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
- netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
- exec_run_timestamp);
- spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
- netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
- exec_run_timestamp);
- }
- }
- exit(0);
-#endif
- return;
-
- after_error:
- netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
-}
diff --git a/spawn/spawn.h b/spawn/spawn.h
deleted file mode 100644
index 6e9e51ef..00000000
--- a/spawn/spawn.h
+++ /dev/null
@@ -1,109 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_SPAWN_H
-#define NETDATA_SPAWN_H 1
-
-#include "daemon/common.h"
-
-#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server"
-
-typedef enum spawn_protocol {
- SPAWN_PROT_EXEC_CMD = 0,
- SPAWN_PROT_SPAWN_RESULT,
- SPAWN_PROT_CMD_EXIT_STATUS
-} spawn_prot_t;
-
-struct spawn_prot_exec_cmd {
- uint16_t command_length;
- char command_to_run[];
-};
-
-struct spawn_prot_spawn_result {
- pid_t exec_pid; /* 0 if failed to spawn */
- time_t exec_run_timestamp; /* time of successfully spawning the command */
-};
-
-struct spawn_prot_cmd_exit_status {
- int exec_exit_status;
-};
-
-struct spawn_prot_header {
- spawn_prot_t opcode;
- void *handle;
-};
-
-#undef SPAWN_DEBUG /* define to enable debug prints */
-
-#define SPAWN_MAX_OUTSTANDING (32768)
-
-#define SPAWN_CMD_PROCESSED 0x00000001
-#define SPAWN_CMD_IN_PROGRESS 0x00000002
-#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004
-#define SPAWN_CMD_DONE 0x00000008
-
-struct spawn_cmd_info {
- avl_t avl;
-
- /* concurrency control per command */
- uv_mutex_t mutex;
- uv_cond_t cond; /* users block here until command has finished */
-
- uint64_t serial;
- char *command_to_run;
- int exit_status;
- pid_t pid;
- unsigned long flags;
- time_t exec_run_timestamp; /* time of successfully spawning the command */
-};
-
-/* spawn command queue */
-struct spawn_queue {
- avl_tree_type cmd_tree;
-
- /* concurrency control of command queue */
- uv_mutex_t mutex;
- uv_cond_t cond;
-
- volatile unsigned size;
- uint64_t latest_serial;
-};
-
-struct write_context {
- uv_write_t write_req;
- struct spawn_prot_header header;
- struct spawn_prot_cmd_exit_status exit_status;
- struct spawn_prot_spawn_result spawn_result;
- struct spawn_prot_exec_cmd payload;
-};
-
-extern int spawn_thread_error;
-extern int spawn_thread_shutdown;
-extern uv_async_t spawn_async;
-
-void spawn_init(void);
-void spawn_server(void);
-void spawn_client(void *arg);
-void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo);
-uint64_t spawn_enq_cmd(const char *command_to_run);
-void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp);
-void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo);
-struct spawn_cmd_info *spawn_get_unprocessed_cmd(void);
-int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process);
-
-/*
- * Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It
- * subtracts the amount copied from the source length.
- */
-static inline void copy_to_prot_buffer(char *prot_buffer, unsigned *prot_buffer_len, unsigned max_to_copy,
- char **source, unsigned *source_len)
-{
- unsigned to_copy;
-
- to_copy = MIN(max_to_copy, *source_len);
- memcpy(prot_buffer + *prot_buffer_len, *source, to_copy);
- *prot_buffer_len += to_copy;
- *source += to_copy;
- *source_len -= to_copy;
-}
-
-#endif //NETDATA_SPAWN_H
diff --git a/spawn/spawn_client.c b/spawn/spawn_client.c
deleted file mode 100644
index 8928a468..00000000
--- a/spawn/spawn_client.c
+++ /dev/null
@@ -1,248 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "spawn.h"
-
-static uv_process_t process;
-static uv_pipe_t spawn_channel;
-static uv_loop_t *loop;
-uv_async_t spawn_async;
-
-static char prot_buffer[MAX_COMMAND_LENGTH];
-static unsigned prot_buffer_len = 0;
-
-static void async_cb(uv_async_t *handle)
-{
- uv_stop(handle->loop);
-}
-
-static void after_pipe_write(uv_write_t* req, int status)
-{
- (void)status;
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s called status=%d", __func__, status);
-#endif
- void **data = req->data;
- freez(data[0]);
- freez(data[1]);
- freez(data);
-}
-
-static void client_parse_spawn_protocol(unsigned source_len, char *source)
-{
- unsigned required_len;
- struct spawn_prot_header *header;
- struct spawn_prot_spawn_result *spawn_result;
- struct spawn_prot_cmd_exit_status *exit_status;
- struct spawn_cmd_info *cmdinfo;
-
- while (source_len) {
- required_len = sizeof(*header);
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- header = (struct spawn_prot_header *)prot_buffer;
- cmdinfo = (struct spawn_cmd_info *)header->handle;
- fatal_assert(NULL != cmdinfo);
-
- switch(header->opcode) {
- case SPAWN_PROT_SPAWN_RESULT:
- required_len += sizeof(*spawn_result);
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- spawn_result = (struct spawn_prot_spawn_result *)(header + 1);
- uv_mutex_lock(&cmdinfo->mutex);
- cmdinfo->pid = spawn_result->exec_pid;
- if (0 == cmdinfo->pid) { /* Failed to spawn */
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__);
-#endif
- cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE;
- uv_cond_signal(&cmdinfo->cond);
- } else {
- cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp;
- cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS;
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__);
-#endif
- }
- uv_mutex_unlock(&cmdinfo->mutex);
- prot_buffer_len = 0;
- break;
- case SPAWN_PROT_CMD_EXIT_STATUS:
- required_len += sizeof(*exit_status);
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1);
- uv_mutex_lock(&cmdinfo->mutex);
- cmdinfo->exit_status = exit_status->exec_exit_status;
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status);
-#endif
- cmdinfo->flags |= SPAWN_CMD_DONE;
- uv_cond_signal(&cmdinfo->cond);
- uv_mutex_unlock(&cmdinfo->mutex);
- prot_buffer_len = 0;
- break;
- default:
- fatal_assert(0);
- break;
- }
-
- }
-}
-
-static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf)
-{
- if (0 == nread) {
- netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__);
- } else if (UV_EOF == nread) {
- netdata_log_info("EOF found in spawn pipe.");
- } else if (nread < 0) {
- netdata_log_error("%s: %s", __func__, uv_strerror(nread));
- }
-
- if (nread < 0) { /* stop stream due to EOF or error */
- (void)uv_read_stop((uv_stream_t *)pipe);
- } else if (nread) {
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread);
-#endif
- client_parse_spawn_protocol(nread, buf->base);
- }
- if (buf && buf->len) {
- freez(buf->base);
- }
-
- if (nread < 0) {
- uv_close((uv_handle_t *)pipe, NULL);
- }
-}
-
-static void on_read_alloc(uv_handle_t* handle,
- size_t suggested_size,
- uv_buf_t* buf)
-{
- (void)handle;
- buf->base = mallocz(suggested_size);
- buf->len = suggested_size;
-}
-
-static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo)
-{
- int ret;
- uv_buf_t *writebuf;
- struct write_context *write_ctx;
-
- void **data = callocz(2, sizeof(void *));
- writebuf = callocz(3, sizeof(uv_buf_t));
- write_ctx = callocz(1, sizeof(*write_ctx));
-
- data[0] = write_ctx;
- data[1] = writebuf;
- write_ctx->write_req.data = data;
-
- uv_mutex_lock(&cmdinfo->mutex);
- cmdinfo->flags |= SPAWN_CMD_PROCESSED;
- uv_mutex_unlock(&cmdinfo->mutex);
-
- write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD;
- write_ctx->header.handle = cmdinfo;
- write_ctx->payload.command_length = strlen(cmdinfo->command_to_run);
-
- writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
- writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload));
- writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length);
-
-#ifdef SPAWN_DEBUG
- netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial);
-#endif
- ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write);
- fatal_assert(ret == 0);
-}
-
-void spawn_client(void *arg)
-{
- int ret;
- struct completion *completion = (struct completion *)arg;
-
- loop = mallocz(sizeof(uv_loop_t));
- ret = uv_loop_init(loop);
- if (ret) {
- netdata_log_error("uv_loop_init(): %s", uv_strerror(ret));
- spawn_thread_error = ret;
- goto error_after_loop_init;
- }
- loop->data = NULL;
-
- spawn_async.data = NULL;
- ret = uv_async_init(loop, &spawn_async, async_cb);
- if (ret) {
- netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
- spawn_thread_error = ret;
- goto error_after_async_init;
- }
-
- ret = uv_pipe_init(loop, &spawn_channel, 1);
- if (ret) {
- netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret));
- spawn_thread_error = ret;
- goto error_after_pipe_init;
- }
- fatal_assert(spawn_channel.ipc);
-
- ret = create_spawn_server(loop, &spawn_channel, &process);
- if (ret) {
- netdata_log_error("Failed to fork spawn server process.");
- spawn_thread_error = ret;
- goto error_after_spawn_server;
- }
-
- spawn_thread_error = 0;
- spawn_thread_shutdown = 0;
- /* wake up initialization thread */
- completion_mark_complete(completion);
-
- prot_buffer_len = 0;
- ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read);
- fatal_assert(ret == 0);
-
- while (spawn_thread_shutdown == 0) {
- struct spawn_cmd_info *cmdinfo;
-
- uv_run(loop, UV_RUN_DEFAULT);
- while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) {
- spawn_process_cmd(cmdinfo);
- }
- }
- /* cleanup operations of the event loop */
- netdata_log_info("Shutting down spawn client event loop.");
- uv_close((uv_handle_t *)&spawn_channel, NULL);
- uv_close((uv_handle_t *)&spawn_async, NULL);
- uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
-
- netdata_log_info("Shutting down spawn client loop complete.");
- fatal_assert(0 == uv_loop_close(loop));
-
- return;
-
-error_after_spawn_server:
- uv_close((uv_handle_t *)&spawn_channel, NULL);
-error_after_pipe_init:
- uv_close((uv_handle_t *)&spawn_async, NULL);
-error_after_async_init:
- uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */
- fatal_assert(0 == uv_loop_close(loop));
-error_after_loop_init:
- freez(loop);
-
- /* wake up initialization thread */
- completion_mark_complete(completion);
-}
diff --git a/spawn/spawn_server.c b/spawn/spawn_server.c
deleted file mode 100644
index 1d79ef15..00000000
--- a/spawn/spawn_server.c
+++ /dev/null
@@ -1,386 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "spawn.h"
-
-static uv_loop_t *loop;
-static uv_pipe_t server_pipe;
-
-static int server_shutdown = 0;
-
-static uv_thread_t thread;
-
-/* spawn outstanding execution structure */
-static avl_tree_lock spawn_outstanding_exec_tree;
-
-static char prot_buffer[MAX_COMMAND_LENGTH];
-static unsigned prot_buffer_len = 0;
-
-struct spawn_execution_info {
- avl_t avl;
-
- void *handle;
- int exit_status;
- pid_t pid;
- struct spawn_execution_info *next;
-};
-
-int spawn_exec_compare(void *a, void *b)
-{
- struct spawn_execution_info *spwna = a, *spwnb = b;
-
- if (spwna->pid < spwnb->pid) return -1;
- if (spwna->pid > spwnb->pid) return 1;
-
- return 0;
-}
-
-/* wake up waiter thread to reap the spawned processes */
-static uv_mutex_t wait_children_mutex;
-static uv_cond_t wait_children_cond;
-static uint8_t spawned_processes;
-static struct spawn_execution_info *child_waited_list;
-static uv_async_t child_waited_async;
-
-static inline struct spawn_execution_info *dequeue_child_waited_list(void)
-{
- struct spawn_execution_info *exec_info;
-
- uv_mutex_lock(&wait_children_mutex);
- if (NULL == child_waited_list) {
- exec_info = NULL;
- } else {
- exec_info = child_waited_list;
- child_waited_list = exec_info->next;
- }
- uv_mutex_unlock(&wait_children_mutex);
-
- return exec_info;
-}
-
-static inline void enqueue_child_waited_list(struct spawn_execution_info *exec_info)
-{
- uv_mutex_lock(&wait_children_mutex);
- exec_info->next = child_waited_list;
- child_waited_list = exec_info;
- uv_mutex_unlock(&wait_children_mutex);
-}
-
-static void after_pipe_write(uv_write_t *req, int status)
-{
- (void)status;
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SERVER %s called status=%d\n", __func__, status);
-#endif
- void **data = req->data;
- freez(data[0]);
- freez(data[1]);
- freez(data);
-}
-
-static void child_waited_async_cb(uv_async_t *async_handle)
-{
- uv_buf_t *writebuf;
- int ret;
- struct spawn_execution_info *exec_info;
- struct write_context *write_ctx;
-
- (void)async_handle;
- while (NULL != (exec_info = dequeue_child_waited_list())) {
- write_ctx = mallocz(sizeof(*write_ctx));
-
- void **data = callocz(2, sizeof(void *));
- writebuf = callocz(2, sizeof(uv_buf_t));
-
- data[0] = write_ctx;
- data[1] = writebuf;
- write_ctx->write_req.data = data;
-
- write_ctx->header.opcode = SPAWN_PROT_CMD_EXIT_STATUS;
- write_ctx->header.handle = exec_info->handle;
- write_ctx->exit_status.exec_exit_status = exec_info->exit_status;
- writebuf[0] = uv_buf_init((char *) &write_ctx->header, sizeof(write_ctx->header));
- writebuf[1] = uv_buf_init((char *) &write_ctx->exit_status, sizeof(write_ctx->exit_status));
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SERVER %s SPAWN_PROT_CMD_EXIT_STATUS\n", __func__);
-#endif
- ret = uv_write(&write_ctx->write_req, (uv_stream_t *) &server_pipe, writebuf, 2, after_pipe_write);
- fatal_assert(ret == 0);
-
- freez(exec_info);
- }
-}
-
-static void wait_children(void *arg)
-{
- siginfo_t i;
- struct spawn_execution_info tmp, *exec_info;
- avl_t *ret_avl;
-
- (void)arg;
- while (!server_shutdown) {
- uv_mutex_lock(&wait_children_mutex);
- while (!spawned_processes) {
- uv_cond_wait(&wait_children_cond, &wait_children_mutex);
- }
- spawned_processes = 0;
- uv_mutex_unlock(&wait_children_mutex);
-
- while (!server_shutdown) {
- i.si_pid = 0;
- if (waitid(P_ALL, (id_t) 0, &i, WEXITED) == -1) {
- if (errno != ECHILD)
- fprintf(stderr, "SPAWN: Failed to wait: %s\n", strerror(errno));
- break;
- }
- if (i.si_pid == 0) {
- fprintf(stderr, "SPAWN: No child exited.\n");
- break;
- }
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SPAWN: Successfully waited for pid:%d.\n", (int) i.si_pid);
-#endif
- fatal_assert(CLD_EXITED == i.si_code);
- tmp.pid = (pid_t)i.si_pid;
- while (NULL == (ret_avl = avl_remove_lock(&spawn_outstanding_exec_tree, (avl_t *)&tmp))) {
- fprintf(stderr,
- "SPAWN: race condition detected, waiting for child process %d to be indexed.\n",
- (int)tmp.pid);
- (void)sleep_usec(10000); /* 10 msec */
- }
- exec_info = (struct spawn_execution_info *)ret_avl;
- exec_info->exit_status = i.si_status;
- enqueue_child_waited_list(exec_info);
-
- /* wake up event loop */
- fatal_assert(0 == uv_async_send(&child_waited_async));
- }
- }
-}
-
-void spawn_protocol_execute_command(void *handle, char *command_to_run, uint16_t command_length)
-{
- uv_buf_t *writebuf;
- int ret;
- avl_t *avl_ret;
- struct spawn_execution_info *exec_info;
- struct write_context *write_ctx;
-
- write_ctx = mallocz(sizeof(*write_ctx));
- void **data = callocz(2, sizeof(void *));
- writebuf = callocz(2, sizeof(uv_buf_t));
- data[0] = write_ctx;
- data[1] = writebuf;
- write_ctx->write_req.data = data;
-
- command_to_run[command_length] = '\0';
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SPAWN: executing command '%s'\n", command_to_run);
-#endif
- if (netdata_spawn(command_to_run, &write_ctx->spawn_result.exec_pid)) {
- fprintf(stderr, "SPAWN: Cannot spawn(\"%s\", \"r\").\n", command_to_run);
- write_ctx->spawn_result.exec_pid = 0;
- } else { /* successfully spawned command */
- write_ctx->spawn_result.exec_run_timestamp = now_realtime_sec();
-
- /* record it for when the process finishes execution */
- exec_info = mallocz(sizeof(*exec_info));
- exec_info->handle = handle;
- exec_info->pid = write_ctx->spawn_result.exec_pid;
- avl_ret = avl_insert_lock(&spawn_outstanding_exec_tree, (avl_t *)exec_info);
- fatal_assert(avl_ret == (avl_t *)exec_info);
-
- /* wake up the thread that blocks waiting for processes to exit */
- uv_mutex_lock(&wait_children_mutex);
- spawned_processes = 1;
- uv_cond_signal(&wait_children_cond);
- uv_mutex_unlock(&wait_children_mutex);
- }
-
- write_ctx->header.opcode = SPAWN_PROT_SPAWN_RESULT;
- write_ctx->header.handle = handle;
- writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header));
- writebuf[1] = uv_buf_init((char *)&write_ctx->spawn_result, sizeof(write_ctx->spawn_result));
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SERVER %s SPAWN_PROT_SPAWN_RESULT\n", __func__);
-#endif
- ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&server_pipe, writebuf, 2, after_pipe_write);
- fatal_assert(ret == 0);
-}
-
-static void server_parse_spawn_protocol(unsigned source_len, char *source)
-{
- unsigned required_len;
- struct spawn_prot_header *header;
- struct spawn_prot_exec_cmd *payload;
- uint16_t command_length;
-
- while (source_len) {
- required_len = sizeof(*header);
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- header = (struct spawn_prot_header *)prot_buffer;
- fatal_assert(SPAWN_PROT_EXEC_CMD == header->opcode);
- fatal_assert(NULL != header->handle);
-
- required_len += sizeof(*payload);
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- payload = (struct spawn_prot_exec_cmd *)(header + 1);
- command_length = payload->command_length;
-
- required_len += command_length;
- if (unlikely(required_len > MAX_COMMAND_LENGTH - 1)) {
- fprintf(stderr, "SPAWN: Ran out of protocol buffer space.\n");
- command_length = (MAX_COMMAND_LENGTH - 1) - (sizeof(*header) + sizeof(*payload));
- required_len = MAX_COMMAND_LENGTH - 1;
- }
- if (prot_buffer_len < required_len)
- copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len);
- if (prot_buffer_len < required_len)
- return; /* Source buffer ran out */
-
- spawn_protocol_execute_command(header->handle, payload->command_to_run, command_length);
- prot_buffer_len = 0;
- }
-}
-
-static void on_pipe_read(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf)
-{
- if (0 == nread) {
- fprintf(stderr, "SERVER %s: Zero bytes read from spawn pipe.\n", __func__);
- } else if (UV_EOF == nread) {
- fprintf(stderr, "EOF found in spawn pipe.\n");
- } else if (nread < 0) {
- fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread));
- }
-
- if (nread < 0) { /* stop spawn server due to EOF or error */
- int error;
-
- uv_mutex_lock(&wait_children_mutex);
- server_shutdown = 1;
- spawned_processes = 1;
- uv_cond_signal(&wait_children_cond);
- uv_mutex_unlock(&wait_children_mutex);
-
- fprintf(stderr, "Shutting down spawn server event loop.\n");
- /* cleanup operations of the event loop */
- (void)uv_read_stop((uv_stream_t *) pipe);
- uv_close((uv_handle_t *)&server_pipe, NULL);
-
- error = uv_thread_join(&thread);
- if (error) {
- fprintf(stderr, "uv_thread_create(): %s", uv_strerror(error));
- }
- /* After joining it is safe to destroy child_waited_async */
- uv_close((uv_handle_t *)&child_waited_async, NULL);
- } else if (nread) {
-#ifdef SPAWN_DEBUG
- fprintf(stderr, "SERVER %s nread %u\n", __func__, (unsigned)nread);
-#endif
- server_parse_spawn_protocol(nread, buf->base);
- }
- if (buf && buf->len) {
- freez(buf->base);
- }
-}
-
-static void on_read_alloc(uv_handle_t *handle,
- size_t suggested_size,
- uv_buf_t* buf)
-{
- (void)handle;
- buf->base = mallocz(suggested_size);
- buf->len = suggested_size;
-}
-
-static void ignore_signal_handler(int signo) {
- /*
- * By having a signal handler we allow spawned processes to reset default signal dispositions. Setting SIG_IGN
- * would be inherited by the spawned children which is not desirable.
- */
- (void)signo;
-}
-
-void spawn_server(void)
-{
- int error;
-
- // initialize the system clocks
- clocks_init();
-
- // close all open file descriptors, except the standard ones
- // the caller may have left open files (lxc-attach has this issue)
- for_each_open_fd(OPEN_FD_ACTION_CLOSE, OPEN_FD_EXCLUDE_STDIN | OPEN_FD_EXCLUDE_STDOUT | OPEN_FD_EXCLUDE_STDERR);
-
- // Have the libuv IPC pipe be closed when forking child processes
- (void) fcntl(0, F_SETFD, FD_CLOEXEC);
- fprintf(stderr, "Spawn server is up.\n");
-
- // Define signals we want to ignore
- struct sigaction sa;
- int signals_to_ignore[] = {SIGPIPE, SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2, SIGBUS, SIGCHLD};
- unsigned ignore_length = sizeof(signals_to_ignore) / sizeof(signals_to_ignore[0]);
-
- unsigned i;
- for (i = 0; i < ignore_length ; ++i) {
- sa.sa_flags = 0;
- sigemptyset(&sa.sa_mask);
- sa.sa_handler = ignore_signal_handler;
- if(sigaction(signals_to_ignore[i], &sa, NULL) == -1)
- fprintf(stderr, "SPAWN: Failed to change signal handler for signal: %d.\n", signals_to_ignore[i]);
- }
-
- signals_unblock();
-
- loop = uv_default_loop();
- loop->data = NULL;
-
- error = uv_pipe_init(loop, &server_pipe, 1);
- if (error) {
- fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(error));
- exit(error);
- }
- fatal_assert(server_pipe.ipc);
-
- error = uv_pipe_open(&server_pipe, 0 /* UV_STDIN_FD */);
- if (error) {
- fprintf(stderr, "uv_pipe_open(): %s\n", uv_strerror(error));
- exit(error);
- }
- avl_init_lock(&spawn_outstanding_exec_tree, spawn_exec_compare);
-
- spawned_processes = 0;
- fatal_assert(0 == uv_cond_init(&wait_children_cond));
- fatal_assert(0 == uv_mutex_init(&wait_children_mutex));
- child_waited_list = NULL;
- error = uv_async_init(loop, &child_waited_async, child_waited_async_cb);
- if (error) {
- fprintf(stderr, "uv_async_init(): %s\n", uv_strerror(error));
- exit(error);
- }
-
- error = uv_thread_create(&thread, wait_children, NULL);
- if (error) {
- fprintf(stderr, "uv_thread_create(): %s\n", uv_strerror(error));
- exit(error);
- }
-
- prot_buffer_len = 0;
- error = uv_read_start((uv_stream_t *)&server_pipe, on_read_alloc, on_pipe_read);
- fatal_assert(error == 0);
-
- while (!server_shutdown) {
- uv_run(loop, UV_RUN_DEFAULT);
- }
- fprintf(stderr, "Shutting down spawn server loop complete.\n");
- fatal_assert(0 == uv_loop_close(loop));
-
- exit(0);
-}