// SPDX-License-Identifier: GPL-3.0-or-later #include "spawn.h" static uv_thread_t thread; int spawn_thread_error; int spawn_thread_shutdown; struct spawn_queue spawn_cmd_queue; static struct spawn_cmd_info *create_spawn_cmd(const char *command_to_run) { struct spawn_cmd_info *cmdinfo; cmdinfo = mallocz(sizeof(*cmdinfo)); fatal_assert(0 == uv_cond_init(&cmdinfo->cond)); fatal_assert(0 == uv_mutex_init(&cmdinfo->mutex)); cmdinfo->serial = 0; /* invalid */ cmdinfo->command_to_run = strdupz(command_to_run); cmdinfo->exit_status = -1; /* invalid */ cmdinfo->pid = -1; /* invalid */ cmdinfo->flags = 0; return cmdinfo; } void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo) { uv_cond_destroy(&cmdinfo->cond); uv_mutex_destroy(&cmdinfo->mutex); freez(cmdinfo->command_to_run); freez(cmdinfo); } int spawn_cmd_compare(void *a, void *b) { struct spawn_cmd_info *cmda = a, *cmdb = b; /* No need for mutex, serial will never change and the entries cannot be deallocated yet */ if (cmda->serial < cmdb->serial) return -1; if (cmda->serial > cmdb->serial) return 1; return 0; } static void init_spawn_cmd_queue(void) { spawn_cmd_queue.cmd_tree.root = NULL; spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare; spawn_cmd_queue.size = 0; spawn_cmd_queue.latest_serial = 0; fatal_assert(0 == uv_cond_init(&spawn_cmd_queue.cond)); fatal_assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex)); } /* * Returns serial number of the enqueued command */ uint64_t spawn_enq_cmd(const char *command_to_run) { unsigned queue_size; uint64_t serial; avl_t *avl_ret; struct spawn_cmd_info *cmdinfo; cmdinfo = create_spawn_cmd(command_to_run); /* wait for free space in queue */ uv_mutex_lock(&spawn_cmd_queue.mutex); while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) { uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex); } fatal_assert(queue_size < SPAWN_MAX_OUTSTANDING); spawn_cmd_queue.size = queue_size + 1; serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */ cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */ /* enqueue command */ avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); fatal_assert(avl_ret == (avl_t *)cmdinfo); uv_mutex_unlock(&spawn_cmd_queue.mutex); /* wake up event loop */ fatal_assert(0 == uv_async_send(&spawn_async)); return serial; } /* * Blocks until command with serial finishes running. Only one thread is allowed to wait per command. */ void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp) { avl_t *avl_ret; struct spawn_cmd_info tmp, *cmdinfo; tmp.serial = serial; uv_mutex_lock(&spawn_cmd_queue.mutex); avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl_t *)&tmp); uv_mutex_unlock(&spawn_cmd_queue.mutex); fatal_assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */ cmdinfo = (struct spawn_cmd_info *)avl_ret; uv_mutex_lock(&cmdinfo->mutex); while (!(cmdinfo->flags & SPAWN_CMD_DONE)) { /* Only 1 thread is allowed to wait for this command to finish */ uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex); } uv_mutex_unlock(&cmdinfo->mutex); spawn_deq_cmd(cmdinfo); *exit_status = cmdinfo->exit_status; *exec_run_timestamp = cmdinfo->exec_run_timestamp; destroy_spawn_cmd(cmdinfo); } void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo) { unsigned queue_size; avl_t *avl_ret; uv_mutex_lock(&spawn_cmd_queue.mutex); queue_size = spawn_cmd_queue.size; fatal_assert(queue_size); /* dequeue command */ avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl_t *)cmdinfo); fatal_assert(avl_ret); spawn_cmd_queue.size = queue_size - 1; /* wake up callers */ uv_cond_signal(&spawn_cmd_queue.cond); uv_mutex_unlock(&spawn_cmd_queue.mutex); } /* * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the * only writer as far as struct spawn_cmd_info entries are concerned. */ static int find_unprocessed_spawn_cmd_cb(void *entry, void *data) { struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry; if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) { *cmdinfop = cmdinfo; return -1; /* break tree traversal */ } return 0; /* continue traversing */ } struct spawn_cmd_info *spawn_get_unprocessed_cmd(void) { struct spawn_cmd_info *cmdinfo; unsigned queue_size; int ret; uv_mutex_lock(&spawn_cmd_queue.mutex); queue_size = spawn_cmd_queue.size; if (queue_size == 0) { uv_mutex_unlock(&spawn_cmd_queue.mutex); return NULL; } /* find command */ cmdinfo = NULL; ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo); if (-1 != ret) { /* no commands available for processing */ uv_mutex_unlock(&spawn_cmd_queue.mutex); return NULL; } uv_mutex_unlock(&spawn_cmd_queue.mutex); return cmdinfo; } /** * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties. * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD. * The caller has to be the netdata user as configured. * * @param loop the libuv loop of the caller context * @param spawn_channel the bidirectional libuv IPC pipe that the server and the caller will share * @param process the spawn server libuv process context * @return 0 on success or the libuv error code */ int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process) { uv_process_options_t options = {0}; char *args[3]; int ret; #define SPAWN_SERVER_DESCRIPTORS (3) uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS]; struct passwd *passwd = NULL; char *user = NULL; passwd = getpwuid(getuid()); user = (passwd && passwd->pw_name) ? passwd->pw_name : ""; args[0] = netdata_exe_file; args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT; args[2] = NULL; memset(&options, 0, sizeof(options)); options.file = netdata_exe_file; options.args = args; options.exit_cb = NULL; //exit_cb; options.stdio = stdio; options.stdio_count = SPAWN_SERVER_DESCRIPTORS; stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */ stdio[1].flags = UV_INHERIT_FD; stdio[1].data.fd = 1 /* UV_STDOUT_FD */; stdio[2].flags = UV_INHERIT_FD; stdio[2].data.fd = nd_log_health_fd() /* UV_STDERR_FD */; ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */ if (0 != ret) { netdata_log_error("uv_spawn (process: \"%s\") (user: %s) failed (%s).", netdata_exe_file, user, uv_strerror(ret)); fatal("Cannot start netdata without the spawn server."); } return ret; } #define CONCURRENT_SPAWNS 16 #define SPAWN_ITERATIONS 10000 #undef CONCURRENT_STRESS_TEST void spawn_init(void) { struct completion completion; int error; netdata_log_info("Initializing spawn client."); init_spawn_cmd_queue(); completion_init(&completion); error = uv_thread_create(&thread, spawn_client, &completion); if (error) { netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); goto after_error; } /* wait for spawn client thread to initialize */ completion_wait_for(&completion); completion_destroy(&completion); uv_thread_set_name_np(thread, "DAEMON_SPAWN"); if (spawn_thread_error) { error = uv_thread_join(&thread); if (error) { netdata_log_error("uv_thread_create(): %s", uv_strerror(error)); } goto after_error; } #ifdef CONCURRENT_STRESS_TEST signals_reset(); signals_unblock(); sleep(60); uint64_t serial[CONCURRENT_SPAWNS]; for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) { for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { char cmd[64]; sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1); serial[i] = spawn_enq_cmd(cmd); netdata_log_info("Queued command %s for spawning.", cmd); } int exit_status; time_t exec_run_timestamp; for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { netdata_log_info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, exec_run_timestamp); spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp); netdata_log_info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, exec_run_timestamp); } } exit(0); #endif return; after_error: netdata_log_error("Failed to initialize spawn service. The alarms notifications will not be spawned."); }