diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /src/spawn | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/spawn')
-rw-r--r-- | src/spawn/README.md | 0 | ||||
-rw-r--r-- | src/spawn/spawn.c | 288 | ||||
-rw-r--r-- | src/spawn/spawn.h | 109 | ||||
-rw-r--r-- | src/spawn/spawn_client.c | 250 | ||||
-rw-r--r-- | src/spawn/spawn_server.c | 386 |
5 files changed, 1033 insertions, 0 deletions
diff --git a/src/spawn/README.md b/src/spawn/README.md new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/spawn/README.md diff --git a/src/spawn/spawn.c b/src/spawn/spawn.c new file mode 100644 index 000000000..a6e53718a --- /dev/null +++ b/src/spawn/spawn.c @@ -0,0 +1,288 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "spawn.h" + +static uv_thread_t thread; +int spawn_thread_error; +int spawn_thread_shutdown; + +struct spawn_queue spawn_cmd_queue; + +static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run) +{ + struct spawn_cmd_info *cmdinfo; + + cmdinfo = mallocz(sizeof(*cmdinfo)); + fatal_assert(0 == uv_cond_init(&cmdinfo->cond)); + fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex)); + cmdinfo->serial = 0; /* invalid */ + cmdinfo->command_to_run = strdupz(command_to_run); + cmdinfo->exit_status = -1; /* invalid */ + cmdinfo->pid = -1; /* invalid */ + cmdinfo->flags = 0; + + return cmdinfo; +} + +void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo) +{ + uv_cond_destroy(&cmdinfo->cond); + uv_mutex_destroy(&cmdinfo->mutex); + + freez(cmdinfo->command_to_run); + freez(cmdinfo); +} + +int spawn_cmd_compare(void *a, void *b) +{ + struct spawn_cmd_info *cmda = a, *cmdb = b; + + /* No need for mutex, serial will never change and the entries cannot be deallocated yet */ + if (cmda->serial < cmdb->serial) return -1; + if (cmda->serial > cmdb->serial) return 1; + + return 0; +} + +static void init_spawn_cmd_queue(void) +{ + spawn_cmd_queue.cmd_tree.root = NULL; + spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare; + spawn_cmd_queue.size = 0; + spawn_cmd_queue.latest_serial = 0; + fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond)); + fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex)); +} + +/* + * Returns serial number of the enqueued command + */ +uint64_t spawn_enq_cmd(const char *command_to_run) +{ + unsigned queue_size; + uint64_t serial; + avl_t *avl_ret; + struct spawn_cmd_info *cmdinfo; + + cmdinfo = create_spawn_cmd(command_to_run); + + /* wait for free space in queue */ + uv_mutex_lock(&spawn_cmd_queue.mutex); + while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) { + uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex); + } + fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING); + spawn_cmd_queue.size = queue_size + 1; + + serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */ + cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */ + + /* enqueue command */ + avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); + fatal_assert(avl_ret == (avl_t *)cmdinfo); + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&spawn_async)); + return serial; +} + +/* + * Blocks until command with serial finishes running. Only one thread is allowed to wait per command. + */ +void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp) +{ + avl_t *avl_ret; + struct spawn_cmd_info tmp, *cmdinfo; + + tmp.serial = serial; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp); + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */ + cmdinfo = (struct spawn_cmd_info *)avl_ret; + + uv_mutex_lock(&cmdinfo->mutex); + while (!(cmdinfo->flags & SPAWN_CMD_DONE)) { + /* Only 1 thread is allowed to wait for this command to finish */ + uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex); + } + uv_mutex_unlock(&cmdinfo->mutex); + + spawn_deq_cmd(cmdinfo); + *exit_status = cmdinfo->exit_status; + *exec_run_timestamp = cmdinfo->exec_run_timestamp; + + destroy_spawn_cmd(cmdinfo); +} + +void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo) +{ + unsigned queue_size; + avl_t *avl_ret; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + queue_size = spawn_cmd_queue.size; + fatal_assert(queue_size); + /* dequeue command */ + avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); + fatal_assert(avl_ret); + + spawn_cmd_queue.size = queue_size - 1; + + /* wake up callers */ + uv_cond_signal(&spawn_cmd_queue.cond); + uv_mutex_unlock(&spawn_cmd_queue.mutex); +} + +/* + * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the + * only writer as far as struct spawn_cmd_info entries are concerned. + */ +static int find_unprocessed_spawn_cmd_cb(void *entry, void *data) +{ + struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry; + + if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) { + *cmdinfop = cmdinfo; + return -1; /* break tree traversal */ + } + return 0; /* continue traversing */ +} + +struct spawn_cmd_info *spawn_get_unprocessed_cmd(void) +{ + struct spawn_cmd_info *cmdinfo; + unsigned queue_size; + int ret; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + queue_size = spawn_cmd_queue.size; + if (queue_size == 0) { + uv_mutex_unlock(&spawn_cmd_queue.mutex); + return NULL; + } + /* find command */ + cmdinfo = NULL; + ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo); + if (-1 != ret) { /* no commands available for processing */ + uv_mutex_unlock(&spawn_cmd_queue.mutex); + return NULL; + } + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + return cmdinfo; +} + +/** + * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties. + * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD. + * The caller has to be the netdata user as configured. + * + * @param loop the libuv loop of the caller context + * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share + * @param process the spawn server libuv process context + * @return 0 on success or the libuv error code + */ +int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process) +{ + uv_process_options_t options = {0}; + char *args[3]; + int ret; +#define SPAWN_SERVER_DESCRIPTORS (3) + uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS]; + struct passwd *passwd = NULL; + char *user = NULL; + + passwd = getpwuid(getuid()); + user = (passwd && passwd->pw_name) ? passwd->pw_name : ""; + + args[0] = netdata_exe_file; + args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT; + args[2] = NULL; + + memset(&options, 0, sizeof(options)); + options.file = netdata_exe_file; + options.args = args; + options.exit_cb = NULL; //exit_cb; + options.stdio = stdio; + options.stdio_count = SPAWN_SERVER_DESCRIPTORS; + + stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; + stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */ + stdio[1].flags = UV_INHERIT_FD; + stdio[1].data.fd = 1 /* UV_STDOUT_FD */; + stdio[2].flags = UV_INHERIT_FD; + stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */; + + ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */ + if (0 != ret) { + netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret)); + fatal("Cannot start netdata without the spawn server."); + } + + return ret; +} + +#define CONCURRENT_SPAWNS 16 +#define SPAWN_ITERATIONS 10000 +#undef CONCURRENT_STRESS_TEST + +void spawn_init(void) +{ + struct completion completion; + int error; + + netdata_log_info("Initializing spawn client."); + + init_spawn_cmd_queue(); + + completion_init(&completion); + error = uv_thread_create(&thread, spawn_client, &completion); + if (error) { + netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); + goto after_error; + } + /* wait for spawn client thread to initialize */ + completion_wait_for(&completion); + completion_destroy(&completion); + + if (spawn_thread_error) { + error = uv_thread_join(&thread); + if (error) { + netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); + } + goto after_error; + } +#ifdef CONCURRENT_STRESS_TEST + signals_reset(); + signals_unblock(); + + sleep(60); + uint64_t serial[CONCURRENT_SPAWNS]; + for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) { + for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { + char cmd[64]; + sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1); + serial[i] = spawn_enq_cmd(cmd); + netdata_log_info("Queued command %s for spawning.", cmd); + } + int exit_status; + time_t exec_run_timestamp; + for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { + netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, + exec_run_timestamp); + spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp); + netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, + exec_run_timestamp); + } + } + exit(0); +#endif + return; + + after_error: + netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned."); +} diff --git a/src/spawn/spawn.h b/src/spawn/spawn.h new file mode 100644 index 000000000..6e9e51ef0 --- /dev/null +++ b/src/spawn/spawn.h @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_SPAWN_H +#define NETDATA_SPAWN_H 1 + +#include "daemon/common.h" + +#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server" + +typedef enum spawn_protocol { + SPAWN_PROT_EXEC_CMD = 0, + SPAWN_PROT_SPAWN_RESULT, + SPAWN_PROT_CMD_EXIT_STATUS +} spawn_prot_t; + +struct spawn_prot_exec_cmd { + uint16_t command_length; + char command_to_run[]; +}; + +struct spawn_prot_spawn_result { + pid_t exec_pid; /* 0 if failed to spawn */ + time_t exec_run_timestamp; /* time of successfully spawning the command */ +}; + +struct spawn_prot_cmd_exit_status { + int exec_exit_status; +}; + +struct spawn_prot_header { + spawn_prot_t opcode; + void *handle; +}; + +#undef SPAWN_DEBUG /* define to enable debug prints */ + +#define SPAWN_MAX_OUTSTANDING (32768) + +#define SPAWN_CMD_PROCESSED 0x00000001 +#define SPAWN_CMD_IN_PROGRESS 0x00000002 +#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004 +#define SPAWN_CMD_DONE 0x00000008 + +struct spawn_cmd_info { + avl_t avl; + + /* concurrency control per command */ + uv_mutex_t mutex; + uv_cond_t cond; /* users block here until command has finished */ + + uint64_t serial; + char *command_to_run; + int exit_status; + pid_t pid; + unsigned long flags; + time_t exec_run_timestamp; /* time of successfully spawning the command */ +}; + +/* spawn command queue */ +struct spawn_queue { + avl_tree_type cmd_tree; + + /* concurrency control of command queue */ + uv_mutex_t mutex; + uv_cond_t cond; + + volatile unsigned size; + uint64_t latest_serial; +}; + +struct write_context { + uv_write_t write_req; + struct spawn_prot_header header; + struct spawn_prot_cmd_exit_status exit_status; + struct spawn_prot_spawn_result spawn_result; + struct spawn_prot_exec_cmd payload; +}; + +extern int spawn_thread_error; +extern int spawn_thread_shutdown; +extern uv_async_t spawn_async; + +void spawn_init(void); +void spawn_server(void); +void spawn_client(void *arg); +void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo); +uint64_t spawn_enq_cmd(const char *command_to_run); +void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp); +void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo); +struct spawn_cmd_info *spawn_get_unprocessed_cmd(void); +int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process); + +/* + * Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It + * subtracts the amount copied from the source length. + */ +static inline void copy_to_prot_buffer(char *prot_buffer, unsigned *prot_buffer_len, unsigned max_to_copy, + char **source, unsigned *source_len) +{ + unsigned to_copy; + + to_copy = MIN(max_to_copy, *source_len); + memcpy(prot_buffer + *prot_buffer_len, *source, to_copy); + *prot_buffer_len += to_copy; + *source += to_copy; + *source_len -= to_copy; +} + +#endif //NETDATA_SPAWN_H diff --git a/src/spawn/spawn_client.c b/src/spawn/spawn_client.c new file mode 100644 index 000000000..f2af9842c --- /dev/null +++ b/src/spawn/spawn_client.c @@ -0,0 +1,250 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "spawn.h" + +static uv_process_t process; +static uv_pipe_t spawn_channel; +static uv_loop_t *loop; +uv_async_t spawn_async; + +static char prot_buffer[MAX_COMMAND_LENGTH]; +static unsigned prot_buffer_len = 0; + +static void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); +} + +static void after_pipe_write(uv_write_t* req, int status) +{ + (void)status; +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s called status=%d", __func__, status); +#endif + void **data = req->data; + freez(data[0]); + freez(data[1]); + freez(data); +} + +static void client_parse_spawn_protocol(unsigned source_len, char *source) +{ + unsigned required_len; + struct spawn_prot_header *header; + struct spawn_prot_spawn_result *spawn_result; + struct spawn_prot_cmd_exit_status *exit_status; + struct spawn_cmd_info *cmdinfo; + + while (source_len) { + required_len = sizeof(*header); + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + header = (struct spawn_prot_header *)prot_buffer; + cmdinfo = (struct spawn_cmd_info *)header->handle; + fatal_assert(NULL != cmdinfo); + + switch(header->opcode) { + case SPAWN_PROT_SPAWN_RESULT: + required_len += sizeof(*spawn_result); + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + spawn_result = (struct spawn_prot_spawn_result *)(header + 1); + uv_mutex_lock(&cmdinfo->mutex); + cmdinfo->pid = spawn_result->exec_pid; + if (0 == cmdinfo->pid) { /* Failed to spawn */ +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT failed to spawn.", __func__); +#endif + cmdinfo->flags |= SPAWN_CMD_FAILED_TO_SPAWN | SPAWN_CMD_DONE; + uv_cond_signal(&cmdinfo->cond); + } else { + cmdinfo->exec_run_timestamp = spawn_result->exec_run_timestamp; + cmdinfo->flags |= SPAWN_CMD_IN_PROGRESS; +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s SPAWN_PROT_SPAWN_RESULT in progress.", __func__); +#endif + } + uv_mutex_unlock(&cmdinfo->mutex); + prot_buffer_len = 0; + break; + case SPAWN_PROT_CMD_EXIT_STATUS: + required_len += sizeof(*exit_status); + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + exit_status = (struct spawn_prot_cmd_exit_status *)(header + 1); + uv_mutex_lock(&cmdinfo->mutex); + cmdinfo->exit_status = exit_status->exec_exit_status; +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s SPAWN_PROT_CMD_EXIT_STATUS %d.", __func__, exit_status->exec_exit_status); +#endif + cmdinfo->flags |= SPAWN_CMD_DONE; + uv_cond_signal(&cmdinfo->cond); + uv_mutex_unlock(&cmdinfo->mutex); + prot_buffer_len = 0; + break; + default: + fatal_assert(0); + break; + } + + } +} + +static void on_pipe_read(uv_stream_t* pipe, ssize_t nread, const uv_buf_t* buf) +{ + if (0 == nread) { + netdata_log_info("%s: Zero bytes read from spawn pipe.", __func__); + } else if (UV_EOF == nread) { + netdata_log_info("EOF found in spawn pipe."); + } else if (nread < 0) { + netdata_log_error("%s: %s", __func__, uv_strerror(nread)); + } + + if (nread < 0) { /* stop stream due to EOF or error */ + (void)uv_read_stop((uv_stream_t *)pipe); + } else if (nread) { +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s read %u", __func__, (unsigned)nread); +#endif + client_parse_spawn_protocol(nread, buf->base); + } + if (buf && buf->len) { + freez(buf->base); + } + + if (nread < 0) { + uv_close((uv_handle_t *)pipe, NULL); + } +} + +static void on_read_alloc(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) +{ + (void)handle; + buf->base = mallocz(suggested_size); + buf->len = suggested_size; +} + +static void spawn_process_cmd(struct spawn_cmd_info *cmdinfo) +{ + int ret; + uv_buf_t *writebuf; + struct write_context *write_ctx; + + void **data = callocz(2, sizeof(void *)); + writebuf = callocz(3, sizeof(uv_buf_t)); + write_ctx = callocz(1, sizeof(*write_ctx)); + + data[0] = write_ctx; + data[1] = writebuf; + write_ctx->write_req.data = data; + + uv_mutex_lock(&cmdinfo->mutex); + cmdinfo->flags |= SPAWN_CMD_PROCESSED; + uv_mutex_unlock(&cmdinfo->mutex); + + write_ctx->header.opcode = SPAWN_PROT_EXEC_CMD; + write_ctx->header.handle = cmdinfo; + write_ctx->payload.command_length = strlen(cmdinfo->command_to_run); + + writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header)); + writebuf[1] = uv_buf_init((char *)&write_ctx->payload, sizeof(write_ctx->payload)); + writebuf[2] = uv_buf_init((char *)cmdinfo->command_to_run, write_ctx->payload.command_length); + +#ifdef SPAWN_DEBUG + netdata_log_info("CLIENT %s SPAWN_PROT_EXEC_CMD %u", __func__, (unsigned)cmdinfo->serial); +#endif + ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&spawn_channel, writebuf, 3, after_pipe_write); + fatal_assert(ret == 0); +} + +void spawn_client(void *arg) +{ + uv_thread_set_name_np("DAEMON_SPAWN"); + + int ret; + struct completion *completion = (struct completion *)arg; + + loop = mallocz(sizeof(uv_loop_t)); + ret = uv_loop_init(loop); + if (ret) { + netdata_log_error("uv_loop_init(): %s", uv_strerror(ret)); + spawn_thread_error = ret; + goto error_after_loop_init; + } + loop->data = NULL; + + spawn_async.data = NULL; + ret = uv_async_init(loop, &spawn_async, async_cb); + if (ret) { + netdata_log_error("uv_async_init(): %s", uv_strerror(ret)); + spawn_thread_error = ret; + goto error_after_async_init; + } + + ret = uv_pipe_init(loop, &spawn_channel, 1); + if (ret) { + netdata_log_error("uv_pipe_init(): %s", uv_strerror(ret)); + spawn_thread_error = ret; + goto error_after_pipe_init; + } + fatal_assert(spawn_channel.ipc); + + ret = create_spawn_server(loop, &spawn_channel, &process); + if (ret) { + netdata_log_error("Failed to fork spawn server process."); + spawn_thread_error = ret; + goto error_after_spawn_server; + } + + spawn_thread_error = 0; + spawn_thread_shutdown = 0; + /* wake up initialization thread */ + completion_mark_complete(completion); + + prot_buffer_len = 0; + ret = uv_read_start((uv_stream_t *)&spawn_channel, on_read_alloc, on_pipe_read); + fatal_assert(ret == 0); + + while (spawn_thread_shutdown == 0) { + struct spawn_cmd_info *cmdinfo; + + uv_run(loop, UV_RUN_DEFAULT); + while (NULL != (cmdinfo = spawn_get_unprocessed_cmd())) { + spawn_process_cmd(cmdinfo); + } + } + /* cleanup operations of the event loop */ + netdata_log_info("Shutting down spawn client event loop."); + uv_close((uv_handle_t *)&spawn_channel, NULL); + uv_close((uv_handle_t *)&spawn_async, NULL); + uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */ + + netdata_log_info("Shutting down spawn client loop complete."); + fatal_assert(0 == uv_loop_close(loop)); + + return; + +error_after_spawn_server: + uv_close((uv_handle_t *)&spawn_channel, NULL); +error_after_pipe_init: + uv_close((uv_handle_t *)&spawn_async, NULL); +error_after_async_init: + uv_run(loop, UV_RUN_DEFAULT); /* flush all libuv handles */ + fatal_assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + /* wake up initialization thread */ + completion_mark_complete(completion); +} diff --git a/src/spawn/spawn_server.c b/src/spawn/spawn_server.c new file mode 100644 index 000000000..f17669368 --- /dev/null +++ b/src/spawn/spawn_server.c @@ -0,0 +1,386 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "spawn.h" + +static uv_loop_t *loop; +static uv_pipe_t server_pipe; + +static int server_shutdown = 0; + +static uv_thread_t thread; + +/* spawn outstanding execution structure */ +static avl_tree_lock spawn_outstanding_exec_tree; + +static char prot_buffer[MAX_COMMAND_LENGTH]; +static unsigned prot_buffer_len = 0; + +struct spawn_execution_info { + avl_t avl; + + void *handle; + int exit_status; + pid_t pid; + struct spawn_execution_info *next; +}; + +int spawn_exec_compare(void *a, void *b) +{ + struct spawn_execution_info *spwna = a, *spwnb = b; + + if (spwna->pid < spwnb->pid) return -1; + if (spwna->pid > spwnb->pid) return 1; + + return 0; +} + +/* wake up waiter thread to reap the spawned processes */ +static uv_mutex_t wait_children_mutex; +static uv_cond_t wait_children_cond; +static uint8_t spawned_processes; +static struct spawn_execution_info *child_waited_list; +static uv_async_t child_waited_async; + +static inline struct spawn_execution_info *dequeue_child_waited_list(void) +{ + struct spawn_execution_info *exec_info; + + uv_mutex_lock(&wait_children_mutex); + if (NULL == child_waited_list) { + exec_info = NULL; + } else { + exec_info = child_waited_list; + child_waited_list = exec_info->next; + } + uv_mutex_unlock(&wait_children_mutex); + + return exec_info; +} + +static inline void enqueue_child_waited_list(struct spawn_execution_info *exec_info) +{ + uv_mutex_lock(&wait_children_mutex); + exec_info->next = child_waited_list; + child_waited_list = exec_info; + uv_mutex_unlock(&wait_children_mutex); +} + +static void after_pipe_write(uv_write_t *req, int status) +{ + (void)status; +#ifdef SPAWN_DEBUG + fprintf(stderr, "SERVER %s called status=%d\n", __func__, status); +#endif + void **data = req->data; + freez(data[0]); + freez(data[1]); + freez(data); +} + +static void child_waited_async_cb(uv_async_t *async_handle) +{ + uv_buf_t *writebuf; + int ret; + struct spawn_execution_info *exec_info; + struct write_context *write_ctx; + + (void)async_handle; + while (NULL != (exec_info = dequeue_child_waited_list())) { + write_ctx = mallocz(sizeof(*write_ctx)); + + void **data = callocz(2, sizeof(void *)); + writebuf = callocz(2, sizeof(uv_buf_t)); + + data[0] = write_ctx; + data[1] = writebuf; + write_ctx->write_req.data = data; + + write_ctx->header.opcode = SPAWN_PROT_CMD_EXIT_STATUS; + write_ctx->header.handle = exec_info->handle; + write_ctx->exit_status.exec_exit_status = exec_info->exit_status; + writebuf[0] = uv_buf_init((char *) &write_ctx->header, sizeof(write_ctx->header)); + writebuf[1] = uv_buf_init((char *) &write_ctx->exit_status, sizeof(write_ctx->exit_status)); +#ifdef SPAWN_DEBUG + fprintf(stderr, "SERVER %s SPAWN_PROT_CMD_EXIT_STATUS\n", __func__); +#endif + ret = uv_write(&write_ctx->write_req, (uv_stream_t *) &server_pipe, writebuf, 2, after_pipe_write); + fatal_assert(ret == 0); + + freez(exec_info); + } +} + +static void wait_children(void *arg) +{ + siginfo_t i; + struct spawn_execution_info tmp, *exec_info; + avl_t *ret_avl; + + (void)arg; + while (!server_shutdown) { + uv_mutex_lock(&wait_children_mutex); + while (!spawned_processes) { + uv_cond_wait(&wait_children_cond, &wait_children_mutex); + } + spawned_processes = 0; + uv_mutex_unlock(&wait_children_mutex); + + while (!server_shutdown) { + i.si_pid = 0; + if (os_waitid(P_ALL, (id_t) 0, &i, WEXITED) == -1) { + if (errno != ECHILD) + fprintf(stderr, "SPAWN: Failed to wait: %s\n", strerror(errno)); + break; + } + if (i.si_pid == 0) { + fprintf(stderr, "SPAWN: No child exited.\n"); + break; + } +#ifdef SPAWN_DEBUG + fprintf(stderr, "SPAWN: Successfully waited for pid:%d.\n", (int) i.si_pid); +#endif + fatal_assert(CLD_EXITED == i.si_code); + tmp.pid = (pid_t)i.si_pid; + while (NULL == (ret_avl = avl_remove_lock(&spawn_outstanding_exec_tree, (avl_t *)&tmp))) { + fprintf(stderr, + "SPAWN: race condition detected, waiting for child process %d to be indexed.\n", + (int)tmp.pid); + (void)sleep_usec(10000); /* 10 msec */ + } + exec_info = (struct spawn_execution_info *)ret_avl; + exec_info->exit_status = i.si_status; + enqueue_child_waited_list(exec_info); + + /* wake up event loop */ + fatal_assert(0 == uv_async_send(&child_waited_async)); + } + } +} + +void spawn_protocol_execute_command(void *handle, char *command_to_run, uint16_t command_length) +{ + uv_buf_t *writebuf; + int ret; + avl_t *avl_ret; + struct spawn_execution_info *exec_info; + struct write_context *write_ctx; + + write_ctx = mallocz(sizeof(*write_ctx)); + void **data = callocz(2, sizeof(void *)); + writebuf = callocz(2, sizeof(uv_buf_t)); + data[0] = write_ctx; + data[1] = writebuf; + write_ctx->write_req.data = data; + + command_to_run[command_length] = '\0'; +#ifdef SPAWN_DEBUG + fprintf(stderr, "SPAWN: executing command '%s'\n", command_to_run); +#endif + if (netdata_spawn(command_to_run, &write_ctx->spawn_result.exec_pid)) { + fprintf(stderr, "SPAWN: Cannot spawn(\"%s\", \"r\").\n", command_to_run); + write_ctx->spawn_result.exec_pid = 0; + } else { /* successfully spawned command */ + write_ctx->spawn_result.exec_run_timestamp = now_realtime_sec(); + + /* record it for when the process finishes execution */ + exec_info = mallocz(sizeof(*exec_info)); + exec_info->handle = handle; + exec_info->pid = write_ctx->spawn_result.exec_pid; + avl_ret = avl_insert_lock(&spawn_outstanding_exec_tree, (avl_t *)exec_info); + fatal_assert(avl_ret == (avl_t *)exec_info); + + /* wake up the thread that blocks waiting for processes to exit */ + uv_mutex_lock(&wait_children_mutex); + spawned_processes = 1; + uv_cond_signal(&wait_children_cond); + uv_mutex_unlock(&wait_children_mutex); + } + + write_ctx->header.opcode = SPAWN_PROT_SPAWN_RESULT; + write_ctx->header.handle = handle; + writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header)); + writebuf[1] = uv_buf_init((char *)&write_ctx->spawn_result, sizeof(write_ctx->spawn_result)); +#ifdef SPAWN_DEBUG + fprintf(stderr, "SERVER %s SPAWN_PROT_SPAWN_RESULT\n", __func__); +#endif + ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&server_pipe, writebuf, 2, after_pipe_write); + fatal_assert(ret == 0); +} + +static void server_parse_spawn_protocol(unsigned source_len, char *source) +{ + unsigned required_len; + struct spawn_prot_header *header; + struct spawn_prot_exec_cmd *payload; + uint16_t command_length; + + while (source_len) { + required_len = sizeof(*header); + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + header = (struct spawn_prot_header *)prot_buffer; + fatal_assert(SPAWN_PROT_EXEC_CMD == header->opcode); + fatal_assert(NULL != header->handle); + + required_len += sizeof(*payload); + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + payload = (struct spawn_prot_exec_cmd *)(header + 1); + command_length = payload->command_length; + + required_len += command_length; + if (unlikely(required_len > MAX_COMMAND_LENGTH - 1)) { + fprintf(stderr, "SPAWN: Ran out of protocol buffer space.\n"); + command_length = (MAX_COMMAND_LENGTH - 1) - (sizeof(*header) + sizeof(*payload)); + required_len = MAX_COMMAND_LENGTH - 1; + } + if (prot_buffer_len < required_len) + copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); + if (prot_buffer_len < required_len) + return; /* Source buffer ran out */ + + spawn_protocol_execute_command(header->handle, payload->command_to_run, command_length); + prot_buffer_len = 0; + } +} + +static void on_pipe_read(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) +{ + if (0 == nread) { + fprintf(stderr, "SERVER %s: Zero bytes read from spawn pipe.\n", __func__); + } else if (UV_EOF == nread) { + fprintf(stderr, "EOF found in spawn pipe.\n"); + } else if (nread < 0) { + fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread)); + } + + if (nread < 0) { /* stop spawn server due to EOF or error */ + int error; + + uv_mutex_lock(&wait_children_mutex); + server_shutdown = 1; + spawned_processes = 1; + uv_cond_signal(&wait_children_cond); + uv_mutex_unlock(&wait_children_mutex); + + fprintf(stderr, "Shutting down spawn server event loop.\n"); + /* cleanup operations of the event loop */ + (void)uv_read_stop((uv_stream_t *) pipe); + uv_close((uv_handle_t *)&server_pipe, NULL); + + error = uv_thread_join(&thread); + if (error) { + fprintf(stderr, "uv_thread_create(): %s", uv_strerror(error)); + } + /* After joining it is safe to destroy child_waited_async */ + uv_close((uv_handle_t *)&child_waited_async, NULL); + } else if (nread) { +#ifdef SPAWN_DEBUG + fprintf(stderr, "SERVER %s nread %u\n", __func__, (unsigned)nread); +#endif + server_parse_spawn_protocol(nread, buf->base); + } + if (buf && buf->len) { + freez(buf->base); + } +} + +static void on_read_alloc(uv_handle_t *handle, + size_t suggested_size, + uv_buf_t* buf) +{ + (void)handle; + buf->base = mallocz(suggested_size); + buf->len = suggested_size; +} + +static void ignore_signal_handler(int signo) { + /* + * By having a signal handler we allow spawned processes to reset default signal dispositions. Setting SIG_IGN + * would be inherited by the spawned children which is not desirable. + */ + (void)signo; +} + +void spawn_server(void) +{ + int error; + + // initialize the system clocks + clocks_init(); + + // close all open file descriptors, except the standard ones + // the caller may have left open files (lxc-attach has this issue) + for_each_open_fd(OPEN_FD_ACTION_CLOSE, OPEN_FD_EXCLUDE_STDIN | OPEN_FD_EXCLUDE_STDOUT | OPEN_FD_EXCLUDE_STDERR); + + // Have the libuv IPC pipe be closed when forking child processes + (void) fcntl(0, F_SETFD, FD_CLOEXEC); + fprintf(stderr, "Spawn server is up.\n"); + + // Define signals we want to ignore + struct sigaction sa; + int signals_to_ignore[] = {SIGPIPE, SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2, SIGBUS, SIGCHLD}; + unsigned ignore_length = sizeof(signals_to_ignore) / sizeof(signals_to_ignore[0]); + + unsigned i; + for (i = 0; i < ignore_length ; ++i) { + sa.sa_flags = 0; + sigemptyset(&sa.sa_mask); + sa.sa_handler = ignore_signal_handler; + if(sigaction(signals_to_ignore[i], &sa, NULL) == -1) + fprintf(stderr, "SPAWN: Failed to change signal handler for signal: %d.\n", signals_to_ignore[i]); + } + + signals_unblock(); + + loop = uv_default_loop(); + loop->data = NULL; + + error = uv_pipe_init(loop, &server_pipe, 1); + if (error) { + fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(error)); + exit(error); + } + fatal_assert(server_pipe.ipc); + + error = uv_pipe_open(&server_pipe, 0 /* UV_STDIN_FD */); + if (error) { + fprintf(stderr, "uv_pipe_open(): %s\n", uv_strerror(error)); + exit(error); + } + avl_init_lock(&spawn_outstanding_exec_tree, spawn_exec_compare); + + spawned_processes = 0; + fatal_assert(0 == uv_cond_init(&wait_children_cond)); + fatal_assert(0 == uv_mutex_init(&wait_children_mutex)); + child_waited_list = NULL; + error = uv_async_init(loop, &child_waited_async, child_waited_async_cb); + if (error) { + fprintf(stderr, "uv_async_init(): %s\n", uv_strerror(error)); + exit(error); + } + + error = uv_thread_create(&thread, wait_children, NULL); + if (error) { + fprintf(stderr, "uv_thread_create(): %s\n", uv_strerror(error)); + exit(error); + } + + prot_buffer_len = 0; + error = uv_read_start((uv_stream_t *)&server_pipe, on_read_alloc, on_pipe_read); + fatal_assert(error == 0); + + while (!server_shutdown) { + uv_run(loop, UV_RUN_DEFAULT); + } + fprintf(stderr, "Shutting down spawn server loop complete.\n"); + fatal_assert(0 == uv_loop_close(loop)); + + exit(0); +} |