From 310edf444908b09ea6d00c03baceb7925f3bb7a2 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 21 Mar 2024 18:19:04 +0100 Subject: Merging upstream version 1.45.0. Signed-off-by: Daniel Baumann --- spawn/Makefile.am | 9 -- spawn/README.md | 0 spawn/spawn.c | 289 -------------------------------------- spawn/spawn.h | 109 --------------- spawn/spawn_client.c | 248 --------------------------------- spawn/spawn_server.c | 386 --------------------------------------------------- 6 files changed, 1041 deletions(-) delete mode 100644 spawn/Makefile.am delete mode 100644 spawn/README.md delete mode 100644 spawn/spawn.c delete mode 100644 spawn/spawn.h delete mode 100644 spawn/spawn_client.c delete mode 100644 spawn/spawn_server.c (limited to 'spawn') diff --git a/spawn/Makefile.am b/spawn/Makefile.am deleted file mode 100644 index 02fe3a314..000000000 --- a/spawn/Makefile.am +++ /dev/null @@ -1,9 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -dist_noinst_DATA = \ - README.md \ - $(NULL) - diff --git a/spawn/README.md b/spawn/README.md deleted file mode 100644 index e69de29bb..000000000 diff --git a/spawn/spawn.c b/spawn/spawn.c deleted file mode 100644 index 3d62df796..000000000 --- a/spawn/spawn.c +++ /dev/null @@ -1,289 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "spawn.h" - -static uv_thread_t thread; -int spawn_thread_error; -int spawn_thread_shutdown; - -struct spawn_queue spawn_cmd_queue; - -static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run) -{ - struct spawn_cmd_info *cmdinfo; - - cmdinfo = mallocz(sizeof(*cmdinfo)); - fatal_assert(0 == uv_cond_init(&cmdinfo->cond)); - fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex)); - cmdinfo->serial = 0; /* invalid */ - cmdinfo->command_to_run = strdupz(command_to_run); - cmdinfo->exit_status = -1; /* invalid */ - cmdinfo->pid = -1; /* invalid */ - cmdinfo->flags = 0; - - return cmdinfo; -} - -void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo) -{ - uv_cond_destroy(&cmdinfo->cond); - uv_mutex_destroy(&cmdinfo->mutex); - - freez(cmdinfo->command_to_run); - freez(cmdinfo); -} - -int spawn_cmd_compare(void *a, void *b) -{ - struct spawn_cmd_info *cmda = a, *cmdb = b; - - /* No need for mutex, serial will never change and the entries cannot be deallocated yet */ - if (cmda->serial < cmdb->serial) return -1; - if (cmda->serial > cmdb->serial) return 1; - - return 0; -} - -static void init_spawn_cmd_queue(void) -{ - spawn_cmd_queue.cmd_tree.root = NULL; - spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare; - spawn_cmd_queue.size = 0; - spawn_cmd_queue.latest_serial = 0; - fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond)); - fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex)); -} - -/* - * Returns serial number of the enqueued command - */ -uint64_t spawn_enq_cmd(const char *command_to_run) -{ - unsigned queue_size; - uint64_t serial; - avl_t *avl_ret; - struct spawn_cmd_info *cmdinfo; - - cmdinfo = create_spawn_cmd(command_to_run); - - /* wait for free space in queue */ - uv_mutex_lock(&spawn_cmd_queue.mutex); - while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) { - uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex); - } - fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING); - spawn_cmd_queue.size = queue_size + 1; - - serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */ - cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */ - - /* enqueue command */ - avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); - fatal_assert(avl_ret == (avl_t *)cmdinfo); - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&spawn_async)); - return serial; -} - -/* - * Blocks until command with serial finishes running. Only one thread is allowed to wait per command. - */ -void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp) -{ - avl_t *avl_ret; - struct spawn_cmd_info tmp, *cmdinfo; - - tmp.serial = serial; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp); - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */ - cmdinfo = (struct spawn_cmd_info *)avl_ret; - - uv_mutex_lock(&cmdinfo->mutex); - while (!(cmdinfo->flags & SPAWN_CMD_DONE)) { - /* Only 1 thread is allowed to wait for this command to finish */ - uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex); - } - uv_mutex_unlock(&cmdinfo->mutex); - - spawn_deq_cmd(cmdinfo); - *exit_status = cmdinfo->exit_status; - *exec_run_timestamp = cmdinfo->exec_run_timestamp; - - destroy_spawn_cmd(cmdinfo); -} - -void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo) -{ - unsigned queue_size; - avl_t *avl_ret; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - queue_size = spawn_cmd_queue.size; - fatal_assert(queue_size); - /* dequeue command */ - avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); - fatal_assert(avl_ret); - - spawn_cmd_queue.size = queue_size - 1; - - /* wake up callers */ - uv_cond_signal(&spawn_cmd_queue.cond); - uv_mutex_unlock(&spawn_cmd_queue.mutex); -} - -/* - * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the - * only writer as far as struct spawn_cmd_info entries are concerned. - */ -static int find_unprocessed_spawn_cmd_cb(void *entry, void *data) -{ - struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry; - - if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) { - *cmdinfop = cmdinfo; - return -1; /* break tree traversal */ - } - return 0; /* continue traversing */ -} - -struct spawn_cmd_info *spawn_get_unprocessed_cmd(void) -{ - struct spawn_cmd_info *cmdinfo; - unsigned queue_size; - int ret; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - queue_size = spawn_cmd_queue.size; - if (queue_size == 0) { - uv_mutex_unlock(&spawn_cmd_queue.mutex); - return NULL; - } - /* find command */ - cmdinfo = NULL; - ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo); - if (-1 != ret) { /* no commands available for processing */ - uv_mutex_unlock(&spawn_cmd_queue.mutex); - return NULL; - } - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - return cmdinfo; -} - -/** - * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties. - * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD. - * The caller has to be the netdata user as configured. - * - * @param loop the libuv loop of the caller context - * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share - * @param process the spawn server libuv process context - * @return 0 on success or the libuv error code - */ -int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process) -{ - uv_process_options_t options = {0}; - char *args[3]; - int ret; -#define SPAWN_SERVER_DESCRIPTORS (3) - uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS]; - struct passwd *passwd = NULL; - char *user = NULL; - - passwd = getpwuid(getuid()); - user = (passwd && passwd->pw_name) ? passwd->pw_name : ""; - - args[0] = netdata_exe_file; - args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT; - args[2] = NULL; - - memset(&options, 0, sizeof(options)); - options.file = netdata_exe_file; - options.args = args; - options.exit_cb = NULL; //exit_cb; - options.stdio = stdio; - options.stdio_count = SPAWN_SERVER_DESCRIPTORS; - - stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; - stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */ - stdio[1].flags = UV_INHERIT_FD; - stdio[1].data.fd = 1 /* UV_STDOUT_FD */; - stdio[2].flags = UV_INHERIT_FD; - stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */; - - ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */ - if (0 != ret) { - netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret)); - fatal("Cannot start netdata without the spawn server."); - } - - return ret; -} - -#define CONCURRENT_SPAWNS 16 -#define SPAWN_ITERATIONS 10000 -#undef CONCURRENT_STRESS_TEST - -void spawn_init(void) -{ - struct completion completion; - int error; - - netdata_log_info("Initializing spawn client."); - - init_spawn_cmd_queue(); - - completion_init(&completion); - error = uv_thread_create(&thread, spawn_client, &completion); - if (error) { - netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); - goto after_error; - } - /* wait for spawn client thread to initialize */ - completion_wait_for(&completion); - completion_destroy(&completion); - uv_thread_set_name_np(thread, "DAEMON_SPAWN"); - - if (spawn_thread_error) { - error = uv_thread_join(&thread); - if (error) { - netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); - } - goto after_error; - } -#ifdef CONCURRENT_STRESS_TEST - signals_reset(); - signals_unblock(); - - sleep(60); - uint64_t serial[CONCURRENT_SPAWNS]; - for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) { - for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { - char cmd[64]; - sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1); - serial[i] = spawn_enq_cmd(cmd); - netdata_log_info("Queued command %s for spawning.", cmd); - } - int exit_status; - time_t exec_run_timestamp; - for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { - netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, - exec_run_timestamp); - spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp); - netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, - exec_run_timestamp); - } - } - exit(0); -#endif - return; - - after_error: - netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned."); -} diff --git a/spawn/spawn.h b/spawn/spawn.h deleted file mode 100644 index 6e9e51ef0..000000000 --- a/spawn/spawn.h +++ /dev/null @@ -1,109 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_SPAWN_H -#define NETDATA_SPAWN_H 1 - -#include "daemon/common.h" - -#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server" - -typedef enum spawn_protocol { - SPAWN_PROT_EXEC_CMD = 0, - SPAWN_PROT_SPAWN_RESULT, - SPAWN_PROT_CMD_EXIT_STATUS -} spawn_prot_t; - -struct spawn_prot_exec_cmd { - uint16_t command_length; - char command_to_run[]; -}; - -struct spawn_prot_spawn_result { - pid_t exec_pid; /* 0 if failed to spawn */ - time_t exec_run_timestamp; /* time of successfully spawning the command */ -}; - -struct spawn_prot_cmd_exit_status { - int exec_exit_status; -}; - -struct spawn_prot_header { - spawn_prot_t opcode; - void *handle; -}; - -#undef SPAWN_DEBUG /* define to enable debug prints */ - -#define SPAWN_MAX_OUTSTANDING (32768) - -#define SPAWN_CMD_PROCESSED 0x00000001 -#define SPAWN_CMD_IN_PROGRESS 0x00000002 -#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004 -#define SPAWN_CMD_DONE 0x00000008 - -struct spawn_cmd_info { - avl_t avl; - - /* concurrency control per command */ - uv_mutex_t mutex; - uv_cond_t cond; /* users block here until command has finished */ - - uint64_t serial; - char *command_to_run; - int exit_status; - pid_t pid; - unsigned long flags; - time_t exec_run_timestamp; /* time of successfully spawning the command */ -}; - -/* spawn command queue */ -struct spawn_queue { - avl_tree_type cmd_tree; - - /* concurrency control of command queue */ - uv_mutex_t mutex; - uv_cond_t cond; - - volatile unsigned size; - uint64_t latest_serial; -}; - -struct write_context { - uv_write_t write_req; - struct spawn_prot_header header; - struct spawn_prot_cmd_exit_status exit_status; - struct spawn_prot_spawn_result spawn_result; - struct spawn_prot_exec_cmd payload; -}; - -extern int spawn_thread_error; -extern int spawn_thread_shutdown; -extern uv_async_t spawn_async; - -void spawn_init(void); -void spawn_server(void); -void spawn_client(void *arg); -void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo); -uint64_t spawn_enq_cmd(const char *command_to_run); -void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp); -void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo); -struct spawn_cmd_info *spawn_get_unprocessed_cmd(void); -int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process); - -/* - * Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It - * subtracts the amount copied from the source length. - */ -static inline void copy_to_prot_buffer(char *prot_buffer, unsigned *prot_buffer_len, unsigned max_to_copy, - char **source, unsigned *source_len) -{ - unsigned to_copy; - - to_copy = MIN(max_to_copy, *source_len); - memcpy(prot_buffer + *prot_buffer_len, *source, to_copy); - *prot_buffer_len += to_copy; - *source += to_copy; - *source_len -= to_copy; -} - -#endif //NETDATA_SPAWN_H diff --git a/spawn/spawn_client.c b/spawn/spawn_client.c deleted file mode 100644 index 8928a468c..000000000 --- a/spawn/spawn_client.c +++ /dev/null @@ -1,248 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "spawn.h" - -static uv_process_t process; -static uv_pipe_t spawn_channel; -static uv_loop_t *loop; -uv_async_t spawn_async; - -static char prot_buffer[MAX_COMMAND_LENGTH]; -static unsigned prot_buffer_len = 0; - -static void async_cb(uv_async_t *handle) -{ - uv_stop(handle->loop); -} - -static void after_pipe_write(uv_write_t* req, int status) -{ - (void)status; -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s called status=%d", __func__, status); -#endif - void **data = req->data; - freez(data[0]); - freez(data[1]); - freez(data); -} - -static void client_parse_spawn_protocol(unsigned source_len, char *source) -{ - unsigned required_len; - struct spawn_prot_header *header; - struct spawn_prot_spawn_result *spawn_result; - struct spawn_prot_cmd_exit_status *exit_status; - struct spawn_cmd_info *cmdinfo; - - while (source_len) { - required_len = sizeof(*header); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - header = (struct spawn_prot_header *)prot_buffer; - cmdinfo = (struct spawn_cmd_info *)header->handle; - fatal_assert(NULL != cmdinfo); - - switch(header->opcode) { - case SPAWN_PROT_SPAWN_RESULT: - required_len += sizeof(*spawn_result); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - spawn_result = (struct spawn_prot_spawn_result *)(header + 1); - uv_mutex_lock(&cmdinfo->mutex); - cmdinfo->pid = spawn_result->exec_pid; - if (0 == cmdinfo->pid) { /* Failed to spawn */ -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__); -#endif - cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE; - uv_cond_signal(&cmdinfo->cond); - } else { - cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp; - cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS; -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__); -#endif - } - uv_mutex_unlock(&cmdinfo->mutex); - prot_buffer_len = 0; - break; - case SPAWN_PROT_CMD_EXIT_STATUS: - required_len += sizeof(*exit_status); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1); - uv_mutex_lock(&cmdinfo->mutex); - cmdinfo->exit_status = exit_status->exec_exit_status; -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status); -#endif - cmdinfo->flags |= SPAWN_CMD_DONE; - uv_cond_signal(&cmdinfo->cond); - uv_mutex_unlock(&cmdinfo->mutex); - prot_buffer_len = 0; - break; - default: - fatal_assert(0); - break; - } - - } -} - -static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf) -{ - if (0 == nread) { - netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__); - } else if (UV_EOF == nread) { - netdata_log_info("EOF found in spawn pipe."); - } else if (nread < 0) { - netdata_log_error("%s: %s", __func__, uv_strerror(nread)); - } - - if (nread < 0) { /* stop stream due to EOF or error */ - (void)uv_read_stop((uv_stream_t *)pipe); - } else if (nread) { -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread); -#endif - client_parse_spawn_protocol(nread, buf->base); - } - if (buf && buf->len) { - freez(buf->base); - } - - if (nread < 0) { - uv_close((uv_handle_t *)pipe, NULL); - } -} - -static void on_read_alloc(uv_handle_t* handle, - size_t suggested_size, - uv_buf_t* buf) -{ - (void)handle; - buf->base = mallocz(suggested_size); - buf->len = suggested_size; -} - -static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo) -{ - int ret; - uv_buf_t *writebuf; - struct write_context *write_ctx; - - void **data = callocz(2, sizeof(void *)); - writebuf = callocz(3, sizeof(uv_buf_t)); - write_ctx = callocz(1, sizeof(*write_ctx)); - - data[0] = write_ctx; - data[1] = writebuf; - write_ctx->write_req.data = data; - - uv_mutex_lock(&cmdinfo->mutex); - cmdinfo->flags |= SPAWN_CMD_PROCESSED; - uv_mutex_unlock(&cmdinfo->mutex); - - write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD; - write_ctx->header.handle = cmdinfo; - write_ctx->payload.command_length = strlen(cmdinfo->command_to_run); - - writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header)); - writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload)); - writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length); - -#ifdef SPAWN_DEBUG - netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial); -#endif - ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write); - fatal_assert(ret == 0); -} - -void spawn_client(void *arg) -{ - int ret; - struct completion *completion = (struct completion *)arg; - - loop = mallocz(sizeof(uv_loop_t)); - ret = uv_loop_init(loop); - if (ret) { - netdata_log_error("uv_loop_init(): %s", uv_strerror(ret)); - spawn_thread_error = ret; - goto error_after_loop_init; - } - loop->data = NULL; - - spawn_async.data = NULL; - ret = uv_async_init(loop, &spawn_async, async_cb); - if (ret) { - netdata_log_error("uv_async_init(): %s", uv_strerror(ret)); - spawn_thread_error = ret; - goto error_after_async_init; - } - - ret = uv_pipe_init(loop, &spawn_channel, 1); - if (ret) { - netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret)); - spawn_thread_error = ret; - goto error_after_pipe_init; - } - fatal_assert(spawn_channel.ipc); - - ret = create_spawn_server(loop, &spawn_channel, &process); - if (ret) { - netdata_log_error("Failed to fork spawn server process."); - spawn_thread_error = ret; - goto error_after_spawn_server; - } - - spawn_thread_error = 0; - spawn_thread_shutdown = 0; - /* wake up initialization thread */ - completion_mark_complete(completion); - - prot_buffer_len = 0; - ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read); - fatal_assert(ret == 0); - - while (spawn_thread_shutdown == 0) { - struct spawn_cmd_info *cmdinfo; - - uv_run(loop, UV_RUN_DEFAULT); - while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) { - spawn_process_cmd(cmdinfo); - } - } - /* cleanup operations of the event loop */ - netdata_log_info("Shutting down spawn client event loop."); - uv_close((uv_handle_t *)&spawn_channel, NULL); - uv_close((uv_handle_t *)&spawn_async, NULL); - uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */ - - netdata_log_info("Shutting down spawn client loop complete."); - fatal_assert(0 == uv_loop_close(loop)); - - return; - -error_after_spawn_server: - uv_close((uv_handle_t *)&spawn_channel, NULL); -error_after_pipe_init: - uv_close((uv_handle_t *)&spawn_async, NULL); -error_after_async_init: - uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */ - fatal_assert(0 == uv_loop_close(loop)); -error_after_loop_init: - freez(loop); - - /* wake up initialization thread */ - completion_mark_complete(completion); -} diff --git a/spawn/spawn_server.c b/spawn/spawn_server.c deleted file mode 100644 index 1d79ef15d..000000000 --- a/spawn/spawn_server.c +++ /dev/null @@ -1,386 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "spawn.h" - -static uv_loop_t *loop; -static uv_pipe_t server_pipe; - -static int server_shutdown = 0; - -static uv_thread_t thread; - -/* spawn outstanding execution structure */ -static avl_tree_lock spawn_outstanding_exec_tree; - -static char prot_buffer[MAX_COMMAND_LENGTH]; -static unsigned prot_buffer_len = 0; - -struct spawn_execution_info { - avl_t avl; - - void *handle; - int exit_status; - pid_t pid; - struct spawn_execution_info *next; -}; - -int spawn_exec_compare(void *a, void *b) -{ - struct spawn_execution_info *spwna = a, *spwnb = b; - - if (spwna->pid < spwnb->pid) return -1; - if (spwna->pid > spwnb->pid) return 1; - - return 0; -} - -/* wake up waiter thread to reap the spawned processes */ -static uv_mutex_t wait_children_mutex; -static uv_cond_t wait_children_cond; -static uint8_t spawned_processes; -static struct spawn_execution_info *child_waited_list; -static uv_async_t child_waited_async; - -static inline struct spawn_execution_info *dequeue_child_waited_list(void) -{ - struct spawn_execution_info *exec_info; - - uv_mutex_lock(&wait_children_mutex); - if (NULL == child_waited_list) { - exec_info = NULL; - } else { - exec_info = child_waited_list; - child_waited_list = exec_info->next; - } - uv_mutex_unlock(&wait_children_mutex); - - return exec_info; -} - -static inline void enqueue_child_waited_list(struct spawn_execution_info *exec_info) -{ - uv_mutex_lock(&wait_children_mutex); - exec_info->next = child_waited_list; - child_waited_list = exec_info; - uv_mutex_unlock(&wait_children_mutex); -} - -static void after_pipe_write(uv_write_t *req, int status) -{ - (void)status; -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s called status=%d\n", __func__, status); -#endif - void **data = req->data; - freez(data[0]); - freez(data[1]); - freez(data); -} - -static void child_waited_async_cb(uv_async_t *async_handle) -{ - uv_buf_t *writebuf; - int ret; - struct spawn_execution_info *exec_info; - struct write_context *write_ctx; - - (void)async_handle; - while (NULL != (exec_info = dequeue_child_waited_list())) { - write_ctx = mallocz(sizeof(*write_ctx)); - - void **data = callocz(2, sizeof(void *)); - writebuf = callocz(2, sizeof(uv_buf_t)); - - data[0] = write_ctx; - data[1] = writebuf; - write_ctx->write_req.data = data; - - write_ctx->header.opcode = SPAWN_PROT_CMD_EXIT_STATUS; - write_ctx->header.handle = exec_info->handle; - write_ctx->exit_status.exec_exit_status = exec_info->exit_status; - writebuf[0] = uv_buf_init((char *) &write_ctx->header, sizeof(write_ctx->header)); - writebuf[1] = uv_buf_init((char *) &write_ctx->exit_status, sizeof(write_ctx->exit_status)); -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s SPAWN_PROT_CMD_EXIT_STATUS\n", __func__); -#endif - ret = uv_write(&write_ctx->write_req, (uv_stream_t *) &server_pipe, writebuf, 2, after_pipe_write); - fatal_assert(ret == 0); - - freez(exec_info); - } -} - -static void wait_children(void *arg) -{ - siginfo_t i; - struct spawn_execution_info tmp, *exec_info; - avl_t *ret_avl; - - (void)arg; - while (!server_shutdown) { - uv_mutex_lock(&wait_children_mutex); - while (!spawned_processes) { - uv_cond_wait(&wait_children_cond, &wait_children_mutex); - } - spawned_processes = 0; - uv_mutex_unlock(&wait_children_mutex); - - while (!server_shutdown) { - i.si_pid = 0; - if (waitid(P_ALL, (id_t) 0, &i, WEXITED) == -1) { - if (errno != ECHILD) - fprintf(stderr, "SPAWN: Failed to wait: %s\n", strerror(errno)); - break; - } - if (i.si_pid == 0) { - fprintf(stderr, "SPAWN: No child exited.\n"); - break; - } -#ifdef SPAWN_DEBUG - fprintf(stderr, "SPAWN: Successfully waited for pid:%d.\n", (int) i.si_pid); -#endif - fatal_assert(CLD_EXITED == i.si_code); - tmp.pid = (pid_t)i.si_pid; - while (NULL == (ret_avl = avl_remove_lock(&spawn_outstanding_exec_tree, (avl_t *)&tmp))) { - fprintf(stderr, - "SPAWN: race condition detected, waiting for child process %d to be indexed.\n", - (int)tmp.pid); - (void)sleep_usec(10000); /* 10 msec */ - } - exec_info = (struct spawn_execution_info *)ret_avl; - exec_info->exit_status = i.si_status; - enqueue_child_waited_list(exec_info); - - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&child_waited_async)); - } - } -} - -void spawn_protocol_execute_command(void *handle, char *command_to_run, uint16_t command_length) -{ - uv_buf_t *writebuf; - int ret; - avl_t *avl_ret; - struct spawn_execution_info *exec_info; - struct write_context *write_ctx; - - write_ctx = mallocz(sizeof(*write_ctx)); - void **data = callocz(2, sizeof(void *)); - writebuf = callocz(2, sizeof(uv_buf_t)); - data[0] = write_ctx; - data[1] = writebuf; - write_ctx->write_req.data = data; - - command_to_run[command_length] = '\0'; -#ifdef SPAWN_DEBUG - fprintf(stderr, "SPAWN: executing command '%s'\n", command_to_run); -#endif - if (netdata_spawn(command_to_run, &write_ctx->spawn_result.exec_pid)) { - fprintf(stderr, "SPAWN: Cannot spawn(\"%s\", \"r\").\n", command_to_run); - write_ctx->spawn_result.exec_pid = 0; - } else { /* successfully spawned command */ - write_ctx->spawn_result.exec_run_timestamp = now_realtime_sec(); - - /* record it for when the process finishes execution */ - exec_info = mallocz(sizeof(*exec_info)); - exec_info->handle = handle; - exec_info->pid = write_ctx->spawn_result.exec_pid; - avl_ret = avl_insert_lock(&spawn_outstanding_exec_tree, (avl_t *)exec_info); - fatal_assert(avl_ret == (avl_t *)exec_info); - - /* wake up the thread that blocks waiting for processes to exit */ - uv_mutex_lock(&wait_children_mutex); - spawned_processes = 1; - uv_cond_signal(&wait_children_cond); - uv_mutex_unlock(&wait_children_mutex); - } - - write_ctx->header.opcode = SPAWN_PROT_SPAWN_RESULT; - write_ctx->header.handle = handle; - writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header)); - writebuf[1] = uv_buf_init((char *)&write_ctx->spawn_result, sizeof(write_ctx->spawn_result)); -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s SPAWN_PROT_SPAWN_RESULT\n", __func__); -#endif - ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&server_pipe, writebuf, 2, after_pipe_write); - fatal_assert(ret == 0); -} - -static void server_parse_spawn_protocol(unsigned source_len, char *source) -{ - unsigned required_len; - struct spawn_prot_header *header; - struct spawn_prot_exec_cmd *payload; - uint16_t command_length; - - while (source_len) { - required_len = sizeof(*header); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - header = (struct spawn_prot_header *)prot_buffer; - fatal_assert(SPAWN_PROT_EXEC_CMD == header->opcode); - fatal_assert(NULL != header->handle); - - required_len += sizeof(*payload); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - payload = (struct spawn_prot_exec_cmd *)(header + 1); - command_length = payload->command_length; - - required_len += command_length; - if (unlikely(required_len > MAX_COMMAND_LENGTH - 1)) { - fprintf(stderr, "SPAWN: Ran out of protocol buffer space.\n"); - command_length = (MAX_COMMAND_LENGTH - 1) - (sizeof(*header) + sizeof(*payload)); - required_len = MAX_COMMAND_LENGTH - 1; - } - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - spawn_protocol_execute_command(header->handle, payload->command_to_run, command_length); - prot_buffer_len = 0; - } -} - -static void on_pipe_read(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) -{ - if (0 == nread) { - fprintf(stderr, "SERVER %s: Zero bytes read from spawn pipe.\n", __func__); - } else if (UV_EOF == nread) { - fprintf(stderr, "EOF found in spawn pipe.\n"); - } else if (nread < 0) { - fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread)); - } - - if (nread < 0) { /* stop spawn server due to EOF or error */ - int error; - - uv_mutex_lock(&wait_children_mutex); - server_shutdown = 1; - spawned_processes = 1; - uv_cond_signal(&wait_children_cond); - uv_mutex_unlock(&wait_children_mutex); - - fprintf(stderr, "Shutting down spawn server event loop.\n"); - /* cleanup operations of the event loop */ - (void)uv_read_stop((uv_stream_t *) pipe); - uv_close((uv_handle_t *)&server_pipe, NULL); - - error = uv_thread_join(&thread); - if (error) { - fprintf(stderr, "uv_thread_create(): %s", uv_strerror(error)); - } - /* After joining it is safe to destroy child_waited_async */ - uv_close((uv_handle_t *)&child_waited_async, NULL); - } else if (nread) { -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s nread %u\n", __func__, (unsigned)nread); -#endif - server_parse_spawn_protocol(nread, buf->base); - } - if (buf && buf->len) { - freez(buf->base); - } -} - -static void on_read_alloc(uv_handle_t *handle, - size_t suggested_size, - uv_buf_t* buf) -{ - (void)handle; - buf->base = mallocz(suggested_size); - buf->len = suggested_size; -} - -static void ignore_signal_handler(int signo) { - /* - * By having a signal handler we allow spawned processes to reset default signal dispositions. Setting SIG_IGN - * would be inherited by the spawned children which is not desirable. - */ - (void)signo; -} - -void spawn_server(void) -{ - int error; - - // initialize the system clocks - clocks_init(); - - // close all open file descriptors, except the standard ones - // the caller may have left open files (lxc-attach has this issue) - for_each_open_fd(OPEN_FD_ACTION_CLOSE, OPEN_FD_EXCLUDE_STDIN | OPEN_FD_EXCLUDE_STDOUT | OPEN_FD_EXCLUDE_STDERR); - - // Have the libuv IPC pipe be closed when forking child processes - (void) fcntl(0, F_SETFD, FD_CLOEXEC); - fprintf(stderr, "Spawn server is up.\n"); - - // Define signals we want to ignore - struct sigaction sa; - int signals_to_ignore[] = {SIGPIPE, SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2, SIGBUS, SIGCHLD}; - unsigned ignore_length = sizeof(signals_to_ignore) / sizeof(signals_to_ignore[0]); - - unsigned i; - for (i = 0; i < ignore_length ; ++i) { - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - sa.sa_handler = ignore_signal_handler; - if(sigaction(signals_to_ignore[i], &sa, NULL) == -1) - fprintf(stderr, "SPAWN: Failed to change signal handler for signal: %d.\n", signals_to_ignore[i]); - } - - signals_unblock(); - - loop = uv_default_loop(); - loop->data = NULL; - - error = uv_pipe_init(loop, &server_pipe, 1); - if (error) { - fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(error)); - exit(error); - } - fatal_assert(server_pipe.ipc); - - error = uv_pipe_open(&server_pipe, 0 /* UV_STDIN_FD */); - if (error) { - fprintf(stderr, "uv_pipe_open(): %s\n", uv_strerror(error)); - exit(error); - } - avl_init_lock(&spawn_outstanding_exec_tree, spawn_exec_compare); - - spawned_processes = 0; - fatal_assert(0 == uv_cond_init(&wait_children_cond)); - fatal_assert(0 == uv_mutex_init(&wait_children_mutex)); - child_waited_list = NULL; - error = uv_async_init(loop, &child_waited_async, child_waited_async_cb); - if (error) { - fprintf(stderr, "uv_async_init(): %s\n", uv_strerror(error)); - exit(error); - } - - error = uv_thread_create(&thread, wait_children, NULL); - if (error) { - fprintf(stderr, "uv_thread_create(): %s\n", uv_strerror(error)); - exit(error); - } - - prot_buffer_len = 0; - error = uv_read_start((uv_stream_t *)&server_pipe, on_read_alloc, on_pipe_read); - fatal_assert(error == 0); - - while (!server_shutdown) { - uv_run(loop, UV_RUN_DEFAULT); - } - fprintf(stderr, "Shutting down spawn server loop complete.\n"); - fatal_assert(0 == uv_loop_close(loop)); - - exit(0); -} -- cgit v1.2.3