From d079b656b4719739b2247dcd9d46e9bec793095a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 Feb 2023 17:11:34 +0100 Subject: Merging upstream version 1.38.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/plugins_d.c | 133 ++++++++++++++++++++++++--------------- 1 file changed, 83 insertions(+), 50 deletions(-) (limited to 'collectors/plugins.d/plugins_d.c') diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 79abc7070..7608f3afc 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -21,23 +21,54 @@ inline size_t pluginsd_initialize_plugin_directories() return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); } +static inline void plugin_set_disabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.enabled = false; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +bool plugin_is_enabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.enabled; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + +static inline void plugin_set_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.running = true; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +static inline bool plugin_is_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.running; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; - if (cd->enabled && !cd->obsolete) { - cd->obsolete = 1; + netdata_spinlock_lock(&cd->unsafe.spinlock); + + cd->unsafe.running = false; + cd->unsafe.thread = 0; + pid_t pid = cd->unsafe.pid; + cd->unsafe.pid = 0; + + netdata_spinlock_unlock(&cd->unsafe.spinlock); + + if (pid) { info("data collection thread exiting"); - if (cd->pid) { - siginfo_t info; - info("killing child process pid %d", cd->pid); - if (killpid(cd->pid) != -1) { - info("waiting for child process pid %d to exit...", cd->pid); - waitid(P_PID, (id_t)cd->pid, &info, WEXITED); - } - cd->pid = 0; + siginfo_t info; + info("killing child process pid %d", pid); + if (killpid(pid) != -1) { + info("waiting for child process pid %d to exit...", pid); + waitid(P_PID, (id_t)pid, &info, WEXITED); } } } @@ -53,8 +84,8 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { info( "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - cd->fullfilename, cd->pid, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + cd->fullfilename, cd->unsafe.pid, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -63,35 +94,33 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) error( "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)." "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - cd->fullfilename, cd->pid, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid); - cd->enabled = 0; + info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid); + plugin_set_disabled(cd); return; } if (!cd->successful_collections) { error( "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename, - cd->pid, worker_ret_code); - cd->enabled = 0; + cd->unsafe.pid, worker_ret_code); + plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -100,48 +129,47 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } + #undef SERIAL_FAILURES_THRESHOLD -void *pluginsd_worker_thread(void *arg) +static void *pluginsd_worker_thread(void *arg) { worker_register("PLUGINSD"); netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); struct plugind *cd = (struct plugind *)arg; + plugin_set_running(cd); - cd->obsolete = 0; size_t count = 0; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; - FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input); + FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); if (unlikely(!fp_child_input || !fp_child_output)) { error("Cannot popen(\"%s\", \"r\").", cd->cmd); break; } - info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid); + info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid); count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); - error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count); - killpid(cd->pid); + error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count); + killpid(cd->unsafe.pid); - int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid); + int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); if (likely(worker_ret_code == 0)) pluginsd_worker_thread_handle_success(cd); else pluginsd_worker_thread_handle_error(cd, worker_ret_code); - cd->pid = 0; - if (unlikely(!cd->enabled)) + cd->unsafe.pid = 0; + if (unlikely(!plugin_is_enabled(cd))) break; } worker_unregister(); @@ -158,10 +186,12 @@ static void pluginsd_main_cleanup(void *data) struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { - if (cd->enabled && !cd->obsolete) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { info("stopping plugin thread: %s", cd->id); - netdata_thread_cancel(cd->thread); + netdata_thread_cancel(cd->unsafe.thread); } + netdata_spinlock_unlock(&cd->unsafe.spinlock); } info("cleanup completed."); @@ -186,12 +216,12 @@ void *pluginsd_main(void *ptr) // so that we don't log broken directories on each loop int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 }; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { int idx; const char *directory_name; for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; errno = 0; @@ -206,7 +236,7 @@ void *pluginsd_main(void *ptr) struct dirent *file = NULL; while (likely((file = readdir(dir)))) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; debug(D_PLUGINSD, "examining file '%s'", file->d_name); @@ -237,7 +267,7 @@ void *pluginsd_main(void *ptr) if (unlikely(strcmp(cd->filename, file->d_name) == 0)) break; - if (likely(cd && !cd->obsolete)) { + if (likely(cd && plugin_is_running(cd))) { debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); continue; } @@ -252,7 +282,9 @@ void *pluginsd_main(void *ptr) strncpyz(cd->filename, file->d_name, FILENAME_MAX); snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); - cd->enabled = enabled; + cd->unsafe.enabled = enabled; + cd->unsafe.running = false; + cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every); cd->started_t = now_realtime_sec(); @@ -266,15 +298,16 @@ void *pluginsd_main(void *ptr) cd->next = pluginsd_root; pluginsd_root = cd; - // it is not currently running - cd->obsolete = 1; - - if (cd->enabled) { + if (plugin_is_enabled(cd)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PLUGINSD[%s]", pluginname); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname); + // spawn a new thread for it - netdata_thread_create( - &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd); + netdata_thread_create(&cd->unsafe.thread, + tag, + NETDATA_THREAD_OPTION_DEFAULT, + pluginsd_worker_thread, + cd); } } } -- cgit v1.2.3