diff options
Diffstat (limited to '')
-rw-r--r-- | src/spawn/spawn.c | 288 |
1 files changed, 0 insertions, 288 deletions
diff --git a/src/spawn/spawn.c b/src/spawn/spawn.c deleted file mode 100644 index a6e53718..00000000 --- a/src/spawn/spawn.c +++ /dev/null @@ -1,288 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "spawn.h" - -static uv_thread_t thread; -int spawn_thread_error; -int spawn_thread_shutdown; - -struct spawn_queue spawn_cmd_queue; - -static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run) -{ - struct spawn_cmd_info *cmdinfo; - - cmdinfo = mallocz(sizeof(*cmdinfo)); - fatal_assert(0 == uv_cond_init(&cmdinfo->cond)); - fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex)); - cmdinfo->serial = 0; /* invalid */ - cmdinfo->command_to_run = strdupz(command_to_run); - cmdinfo->exit_status = -1; /* invalid */ - cmdinfo->pid = -1; /* invalid */ - cmdinfo->flags = 0; - - return cmdinfo; -} - -void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo) -{ - uv_cond_destroy(&cmdinfo->cond); - uv_mutex_destroy(&cmdinfo->mutex); - - freez(cmdinfo->command_to_run); - freez(cmdinfo); -} - -int spawn_cmd_compare(void *a, void *b) -{ - struct spawn_cmd_info *cmda = a, *cmdb = b; - - /* No need for mutex, serial will never change and the entries cannot be deallocated yet */ - if (cmda->serial < cmdb->serial) return -1; - if (cmda->serial > cmdb->serial) return 1; - - return 0; -} - -static void init_spawn_cmd_queue(void) -{ - spawn_cmd_queue.cmd_tree.root = NULL; - spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare; - spawn_cmd_queue.size = 0; - spawn_cmd_queue.latest_serial = 0; - fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond)); - fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex)); -} - -/* - * Returns serial number of the enqueued command - */ -uint64_t spawn_enq_cmd(const char *command_to_run) -{ - unsigned queue_size; - uint64_t serial; - avl_t *avl_ret; - struct spawn_cmd_info *cmdinfo; - - cmdinfo = create_spawn_cmd(command_to_run); - - /* wait for free space in queue */ - uv_mutex_lock(&spawn_cmd_queue.mutex); - while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) { - uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex); - } - fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING); - spawn_cmd_queue.size = queue_size + 1; - - serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */ - cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */ - - /* enqueue command */ - avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); - fatal_assert(avl_ret == (avl_t *)cmdinfo); - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&spawn_async)); - return serial; -} - -/* - * Blocks until command with serial finishes running. Only one thread is allowed to wait per command. - */ -void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp) -{ - avl_t *avl_ret; - struct spawn_cmd_info tmp, *cmdinfo; - - tmp.serial = serial; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp); - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */ - cmdinfo = (struct spawn_cmd_info *)avl_ret; - - uv_mutex_lock(&cmdinfo->mutex); - while (!(cmdinfo->flags & SPAWN_CMD_DONE)) { - /* Only 1 thread is allowed to wait for this command to finish */ - uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex); - } - uv_mutex_unlock(&cmdinfo->mutex); - - spawn_deq_cmd(cmdinfo); - *exit_status = cmdinfo->exit_status; - *exec_run_timestamp = cmdinfo->exec_run_timestamp; - - destroy_spawn_cmd(cmdinfo); -} - -void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo) -{ - unsigned queue_size; - avl_t *avl_ret; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - queue_size = spawn_cmd_queue.size; - fatal_assert(queue_size); - /* dequeue command */ - avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); - fatal_assert(avl_ret); - - spawn_cmd_queue.size = queue_size - 1; - - /* wake up callers */ - uv_cond_signal(&spawn_cmd_queue.cond); - uv_mutex_unlock(&spawn_cmd_queue.mutex); -} - -/* - * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the - * only writer as far as struct spawn_cmd_info entries are concerned. - */ -static int find_unprocessed_spawn_cmd_cb(void *entry, void *data) -{ - struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry; - - if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) { - *cmdinfop = cmdinfo; - return -1; /* break tree traversal */ - } - return 0; /* continue traversing */ -} - -struct spawn_cmd_info *spawn_get_unprocessed_cmd(void) -{ - struct spawn_cmd_info *cmdinfo; - unsigned queue_size; - int ret; - - uv_mutex_lock(&spawn_cmd_queue.mutex); - queue_size = spawn_cmd_queue.size; - if (queue_size == 0) { - uv_mutex_unlock(&spawn_cmd_queue.mutex); - return NULL; - } - /* find command */ - cmdinfo = NULL; - ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo); - if (-1 != ret) { /* no commands available for processing */ - uv_mutex_unlock(&spawn_cmd_queue.mutex); - return NULL; - } - uv_mutex_unlock(&spawn_cmd_queue.mutex); - - return cmdinfo; -} - -/** - * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties. - * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD. - * The caller has to be the netdata user as configured. - * - * @param loop the libuv loop of the caller context - * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share - * @param process the spawn server libuv process context - * @return 0 on success or the libuv error code - */ -int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process) -{ - uv_process_options_t options = {0}; - char *args[3]; - int ret; -#define SPAWN_SERVER_DESCRIPTORS (3) - uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS]; - struct passwd *passwd = NULL; - char *user = NULL; - - passwd = getpwuid(getuid()); - user = (passwd && passwd->pw_name) ? passwd->pw_name : ""; - - args[0] = netdata_exe_file; - args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT; - args[2] = NULL; - - memset(&options, 0, sizeof(options)); - options.file = netdata_exe_file; - options.args = args; - options.exit_cb = NULL; //exit_cb; - options.stdio = stdio; - options.stdio_count = SPAWN_SERVER_DESCRIPTORS; - - stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; - stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */ - stdio[1].flags = UV_INHERIT_FD; - stdio[1].data.fd = 1 /* UV_STDOUT_FD */; - stdio[2].flags = UV_INHERIT_FD; - stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */; - - ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */ - if (0 != ret) { - netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret)); - fatal("Cannot start netdata without the spawn server."); - } - - return ret; -} - -#define CONCURRENT_SPAWNS 16 -#define SPAWN_ITERATIONS 10000 -#undef CONCURRENT_STRESS_TEST - -void spawn_init(void) -{ - struct completion completion; - int error; - - netdata_log_info("Initializing spawn client."); - - init_spawn_cmd_queue(); - - completion_init(&completion); - error = uv_thread_create(&thread, spawn_client, &completion); - if (error) { - netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); - goto after_error; - } - /* wait for spawn client thread to initialize */ - completion_wait_for(&completion); - completion_destroy(&completion); - - if (spawn_thread_error) { - error = uv_thread_join(&thread); - if (error) { - netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); - } - goto after_error; - } -#ifdef CONCURRENT_STRESS_TEST - signals_reset(); - signals_unblock(); - - sleep(60); - uint64_t serial[CONCURRENT_SPAWNS]; - for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) { - for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { - char cmd[64]; - sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1); - serial[i] = spawn_enq_cmd(cmd); - netdata_log_info("Queued command %s for spawning.", cmd); - } - int exit_status; - time_t exec_run_timestamp; - for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { - netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, - exec_run_timestamp); - spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp); - netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, - exec_run_timestamp); - } - } - exit(0); -#endif - return; - - after_error: - netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned."); -} |