diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /spawn/spawn_server.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'spawn/spawn_server.c')
-rw-r--r-- | spawn/spawn_server.c | 386 |
1 files changed, 0 insertions, 386 deletions
diff --git a/spawn/spawn_server.c b/spawn/spawn_server.c deleted file mode 100644 index 1d79ef15d..000000000 --- a/spawn/spawn_server.c +++ /dev/null @@ -1,386 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "spawn.h" - -static uv_loop_t *loop; -static uv_pipe_t server_pipe; - -static int server_shutdown = 0; - -static uv_thread_t thread; - -/* spawn outstanding execution structure */ -static avl_tree_lock spawn_outstanding_exec_tree; - -static char prot_buffer[MAX_COMMAND_LENGTH]; -static unsigned prot_buffer_len = 0; - -struct spawn_execution_info { - avl_t avl; - - void *handle; - int exit_status; - pid_t pid; - struct spawn_execution_info *next; -}; - -int spawn_exec_compare(void *a, void *b) -{ - struct spawn_execution_info *spwna = a, *spwnb = b; - - if (spwna->pid < spwnb->pid) return -1; - if (spwna->pid > spwnb->pid) return 1; - - return 0; -} - -/* wake up waiter thread to reap the spawned processes */ -static uv_mutex_t wait_children_mutex; -static uv_cond_t wait_children_cond; -static uint8_t spawned_processes; -static struct spawn_execution_info *child_waited_list; -static uv_async_t child_waited_async; - -static inline struct spawn_execution_info *dequeue_child_waited_list(void) -{ - struct spawn_execution_info *exec_info; - - uv_mutex_lock(&wait_children_mutex); - if (NULL == child_waited_list) { - exec_info = NULL; - } else { - exec_info = child_waited_list; - child_waited_list = exec_info->next; - } - uv_mutex_unlock(&wait_children_mutex); - - return exec_info; -} - -static inline void enqueue_child_waited_list(struct spawn_execution_info *exec_info) -{ - uv_mutex_lock(&wait_children_mutex); - exec_info->next = child_waited_list; - child_waited_list = exec_info; - uv_mutex_unlock(&wait_children_mutex); -} - -static void after_pipe_write(uv_write_t *req, int status) -{ - (void)status; -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s called status=%d\n", __func__, status); -#endif - void **data = req->data; - freez(data[0]); - freez(data[1]); - freez(data); -} - -static void child_waited_async_cb(uv_async_t *async_handle) -{ - uv_buf_t *writebuf; - int ret; - struct spawn_execution_info *exec_info; - struct write_context *write_ctx; - - (void)async_handle; - while (NULL != (exec_info = dequeue_child_waited_list())) { - write_ctx = mallocz(sizeof(*write_ctx)); - - void **data = callocz(2, sizeof(void *)); - writebuf = callocz(2, sizeof(uv_buf_t)); - - data[0] = write_ctx; - data[1] = writebuf; - write_ctx->write_req.data = data; - - write_ctx->header.opcode = SPAWN_PROT_CMD_EXIT_STATUS; - write_ctx->header.handle = exec_info->handle; - write_ctx->exit_status.exec_exit_status = exec_info->exit_status; - writebuf[0] = uv_buf_init((char *) &write_ctx->header, sizeof(write_ctx->header)); - writebuf[1] = uv_buf_init((char *) &write_ctx->exit_status, sizeof(write_ctx->exit_status)); -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s SPAWN_PROT_CMD_EXIT_STATUS\n", __func__); -#endif - ret = uv_write(&write_ctx->write_req, (uv_stream_t *) &server_pipe, writebuf, 2, after_pipe_write); - fatal_assert(ret == 0); - - freez(exec_info); - } -} - -static void wait_children(void *arg) -{ - siginfo_t i; - struct spawn_execution_info tmp, *exec_info; - avl_t *ret_avl; - - (void)arg; - while (!server_shutdown) { - uv_mutex_lock(&wait_children_mutex); - while (!spawned_processes) { - uv_cond_wait(&wait_children_cond, &wait_children_mutex); - } - spawned_processes = 0; - uv_mutex_unlock(&wait_children_mutex); - - while (!server_shutdown) { - i.si_pid = 0; - if (waitid(P_ALL, (id_t) 0, &i, WEXITED) == -1) { - if (errno != ECHILD) - fprintf(stderr, "SPAWN: Failed to wait: %s\n", strerror(errno)); - break; - } - if (i.si_pid == 0) { - fprintf(stderr, "SPAWN: No child exited.\n"); - break; - } -#ifdef SPAWN_DEBUG - fprintf(stderr, "SPAWN: Successfully waited for pid:%d.\n", (int) i.si_pid); -#endif - fatal_assert(CLD_EXITED == i.si_code); - tmp.pid = (pid_t)i.si_pid; - while (NULL == (ret_avl = avl_remove_lock(&spawn_outstanding_exec_tree, (avl_t *)&tmp))) { - fprintf(stderr, - "SPAWN: race condition detected, waiting for child process %d to be indexed.\n", - (int)tmp.pid); - (void)sleep_usec(10000); /* 10 msec */ - } - exec_info = (struct spawn_execution_info *)ret_avl; - exec_info->exit_status = i.si_status; - enqueue_child_waited_list(exec_info); - - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&child_waited_async)); - } - } -} - -void spawn_protocol_execute_command(void *handle, char *command_to_run, uint16_t command_length) -{ - uv_buf_t *writebuf; - int ret; - avl_t *avl_ret; - struct spawn_execution_info *exec_info; - struct write_context *write_ctx; - - write_ctx = mallocz(sizeof(*write_ctx)); - void **data = callocz(2, sizeof(void *)); - writebuf = callocz(2, sizeof(uv_buf_t)); - data[0] = write_ctx; - data[1] = writebuf; - write_ctx->write_req.data = data; - - command_to_run[command_length] = '\0'; -#ifdef SPAWN_DEBUG - fprintf(stderr, "SPAWN: executing command '%s'\n", command_to_run); -#endif - if (netdata_spawn(command_to_run, &write_ctx->spawn_result.exec_pid)) { - fprintf(stderr, "SPAWN: Cannot spawn(\"%s\", \"r\").\n", command_to_run); - write_ctx->spawn_result.exec_pid = 0; - } else { /* successfully spawned command */ - write_ctx->spawn_result.exec_run_timestamp = now_realtime_sec(); - - /* record it for when the process finishes execution */ - exec_info = mallocz(sizeof(*exec_info)); - exec_info->handle = handle; - exec_info->pid = write_ctx->spawn_result.exec_pid; - avl_ret = avl_insert_lock(&spawn_outstanding_exec_tree, (avl_t *)exec_info); - fatal_assert(avl_ret == (avl_t *)exec_info); - - /* wake up the thread that blocks waiting for processes to exit */ - uv_mutex_lock(&wait_children_mutex); - spawned_processes = 1; - uv_cond_signal(&wait_children_cond); - uv_mutex_unlock(&wait_children_mutex); - } - - write_ctx->header.opcode = SPAWN_PROT_SPAWN_RESULT; - write_ctx->header.handle = handle; - writebuf[0] = uv_buf_init((char *)&write_ctx->header, sizeof(write_ctx->header)); - writebuf[1] = uv_buf_init((char *)&write_ctx->spawn_result, sizeof(write_ctx->spawn_result)); -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s SPAWN_PROT_SPAWN_RESULT\n", __func__); -#endif - ret = uv_write(&write_ctx->write_req, (uv_stream_t *)&server_pipe, writebuf, 2, after_pipe_write); - fatal_assert(ret == 0); -} - -static void server_parse_spawn_protocol(unsigned source_len, char *source) -{ - unsigned required_len; - struct spawn_prot_header *header; - struct spawn_prot_exec_cmd *payload; - uint16_t command_length; - - while (source_len) { - required_len = sizeof(*header); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - header = (struct spawn_prot_header *)prot_buffer; - fatal_assert(SPAWN_PROT_EXEC_CMD == header->opcode); - fatal_assert(NULL != header->handle); - - required_len += sizeof(*payload); - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - payload = (struct spawn_prot_exec_cmd *)(header + 1); - command_length = payload->command_length; - - required_len += command_length; - if (unlikely(required_len > MAX_COMMAND_LENGTH - 1)) { - fprintf(stderr, "SPAWN: Ran out of protocol buffer space.\n"); - command_length = (MAX_COMMAND_LENGTH - 1) - (sizeof(*header) + sizeof(*payload)); - required_len = MAX_COMMAND_LENGTH - 1; - } - if (prot_buffer_len < required_len) - copy_to_prot_buffer(prot_buffer, &prot_buffer_len, required_len - prot_buffer_len, &source, &source_len); - if (prot_buffer_len < required_len) - return; /* Source buffer ran out */ - - spawn_protocol_execute_command(header->handle, payload->command_to_run, command_length); - prot_buffer_len = 0; - } -} - -static void on_pipe_read(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) -{ - if (0 == nread) { - fprintf(stderr, "SERVER %s: Zero bytes read from spawn pipe.\n", __func__); - } else if (UV_EOF == nread) { - fprintf(stderr, "EOF found in spawn pipe.\n"); - } else if (nread < 0) { - fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread)); - } - - if (nread < 0) { /* stop spawn server due to EOF or error */ - int error; - - uv_mutex_lock(&wait_children_mutex); - server_shutdown = 1; - spawned_processes = 1; - uv_cond_signal(&wait_children_cond); - uv_mutex_unlock(&wait_children_mutex); - - fprintf(stderr, "Shutting down spawn server event loop.\n"); - /* cleanup operations of the event loop */ - (void)uv_read_stop((uv_stream_t *) pipe); - uv_close((uv_handle_t *)&server_pipe, NULL); - - error = uv_thread_join(&thread); - if (error) { - fprintf(stderr, "uv_thread_create(): %s", uv_strerror(error)); - } - /* After joining it is safe to destroy child_waited_async */ - uv_close((uv_handle_t *)&child_waited_async, NULL); - } else if (nread) { -#ifdef SPAWN_DEBUG - fprintf(stderr, "SERVER %s nread %u\n", __func__, (unsigned)nread); -#endif - server_parse_spawn_protocol(nread, buf->base); - } - if (buf && buf->len) { - freez(buf->base); - } -} - -static void on_read_alloc(uv_handle_t *handle, - size_t suggested_size, - uv_buf_t* buf) -{ - (void)handle; - buf->base = mallocz(suggested_size); - buf->len = suggested_size; -} - -static void ignore_signal_handler(int signo) { - /* - * By having a signal handler we allow spawned processes to reset default signal dispositions. Setting SIG_IGN - * would be inherited by the spawned children which is not desirable. - */ - (void)signo; -} - -void spawn_server(void) -{ - int error; - - // initialize the system clocks - clocks_init(); - - // close all open file descriptors, except the standard ones - // the caller may have left open files (lxc-attach has this issue) - for_each_open_fd(OPEN_FD_ACTION_CLOSE, OPEN_FD_EXCLUDE_STDIN | OPEN_FD_EXCLUDE_STDOUT | OPEN_FD_EXCLUDE_STDERR); - - // Have the libuv IPC pipe be closed when forking child processes - (void) fcntl(0, F_SETFD, FD_CLOEXEC); - fprintf(stderr, "Spawn server is up.\n"); - - // Define signals we want to ignore - struct sigaction sa; - int signals_to_ignore[] = {SIGPIPE, SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGUSR1, SIGUSR2, SIGBUS, SIGCHLD}; - unsigned ignore_length = sizeof(signals_to_ignore) / sizeof(signals_to_ignore[0]); - - unsigned i; - for (i = 0; i < ignore_length ; ++i) { - sa.sa_flags = 0; - sigemptyset(&sa.sa_mask); - sa.sa_handler = ignore_signal_handler; - if(sigaction(signals_to_ignore[i], &sa, NULL) == -1) - fprintf(stderr, "SPAWN: Failed to change signal handler for signal: %d.\n", signals_to_ignore[i]); - } - - signals_unblock(); - - loop = uv_default_loop(); - loop->data = NULL; - - error = uv_pipe_init(loop, &server_pipe, 1); - if (error) { - fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(error)); - exit(error); - } - fatal_assert(server_pipe.ipc); - - error = uv_pipe_open(&server_pipe, 0 /* UV_STDIN_FD */); - if (error) { - fprintf(stderr, "uv_pipe_open(): %s\n", uv_strerror(error)); - exit(error); - } - avl_init_lock(&spawn_outstanding_exec_tree, spawn_exec_compare); - - spawned_processes = 0; - fatal_assert(0 == uv_cond_init(&wait_children_cond)); - fatal_assert(0 == uv_mutex_init(&wait_children_mutex)); - child_waited_list = NULL; - error = uv_async_init(loop, &child_waited_async, child_waited_async_cb); - if (error) { - fprintf(stderr, "uv_async_init(): %s\n", uv_strerror(error)); - exit(error); - } - - error = uv_thread_create(&thread, wait_children, NULL); - if (error) { - fprintf(stderr, "uv_thread_create(): %s\n", uv_strerror(error)); - exit(error); - } - - prot_buffer_len = 0; - error = uv_read_start((uv_stream_t *)&server_pipe, on_read_alloc, on_pipe_read); - fatal_assert(error == 0); - - while (!server_shutdown) { - uv_run(loop, UV_RUN_DEFAULT); - } - fprintf(stderr, "Shutting down spawn server loop complete.\n"); - fatal_assert(0 == uv_loop_close(loop)); - - exit(0); -} |