summary refs log tree commit diff stats
path: root/collectors/plugins.d/plugins_d.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/plugins_d.c')
-rw-r--r-- collectors/plugins.d/plugins_d.c 133
1 files changed, 83 insertions, 50 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 79abc7070..7608f3afc 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -21,23 +21,54 @@ inline size_t pluginsd_initialize_plugin_directories()
return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0);
}
+static inline void plugin_set_disabled(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.enabled = false;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+bool plugin_is_enabled(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.enabled;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
+static inline void plugin_set_running(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ cd->unsafe.running = true;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+}
+
+static inline bool plugin_is_running(struct plugind *cd) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ bool ret = cd->unsafe.running;
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+ return ret;
+}
+
static void pluginsd_worker_thread_cleanup(void *arg)
{
struct plugind *cd = (struct plugind *)arg;
- if (cd->enabled && !cd->obsolete) {
- cd->obsolete = 1;
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+
+ cd->unsafe.running = false;
+ cd->unsafe.thread = 0;
+ pid_t pid = cd->unsafe.pid;
+ cd->unsafe.pid = 0;
+
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
+
+ if (pid) {
info("data collection thread exiting");
- if (cd->pid) {
- siginfo_t info;
- info("killing child process pid %d", cd->pid);
- if (killpid(cd->pid) != -1) {
- info("waiting for child process pid %d to exit...", cd->pid);
- waitid(P_PID, (id_t)cd->pid, &info, WEXITED);
- }
- cd->pid = 0;
+ siginfo_t info;
+ info("killing child process pid %d", pid);
+ if (killpid(pid) != -1) {
+ info("waiting for child process pid %d to exit...", pid);
+ waitid(P_PID, (id_t)pid, &info, WEXITED);
}
}
}
@@ -53,8 +84,8 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd)
if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) {
info(
"'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.",
- cd->fullfilename, cd->pid,
- cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
+ cd->fullfilename, cd->unsafe.pid,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
@@ -63,35 +94,33 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd)
error(
"'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)."
"We have tried to collect something %zu times - unsuccessfully. Disabling it.",
- cd->fullfilename, cd->pid, cd->serial_failures);
- cd->enabled = 0;
+ cd->fullfilename, cd->unsafe.pid, cd->serial_failures);
+ plugin_set_disabled(cd);
return;
}
-
- return;
}
static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code)
{
if (worker_ret_code == -1) {
- info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid);
- cd->enabled = 0;
+ info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid);
+ plugin_set_disabled(cd);
return;
}
if (!cd->successful_collections) {
error(
"'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename,
- cd->pid, worker_ret_code);
- cd->enabled = 0;
+ cd->unsafe.pid, worker_ret_code);
+ plugin_set_disabled(cd);
return;
}
if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) {
error(
"'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s",
- cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections,
- cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
+ cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections,
+ plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled.");
sleep((unsigned int)(cd->update_every * 10));
return;
}
@@ -100,48 +129,47 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r
error(
"'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)."
"We tried to restart it %zu times, but it failed to generate data. Disabling it.",
- cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
- cd->enabled = 0;
+ cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures);
+ plugin_set_disabled(cd);
return;
}
-
- return;
}
+
#undef SERIAL_FAILURES_THRESHOLD
-void *pluginsd_worker_thread(void *arg)
+static void *pluginsd_worker_thread(void *arg)
{
worker_register("PLUGINSD");
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
struct plugind *cd = (struct plugind *)arg;
+ plugin_set_running(cd);
- cd->obsolete = 0;
size_t count = 0;
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
FILE *fp_child_input = NULL;
- FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input);
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
if (unlikely(!fp_child_input || !fp_child_output)) {
error("Cannot popen(\"%s\", \"r\").", cd->cmd);
break;
}
- info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid);
+ info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid);
count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0);
- error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count);
- killpid(cd->pid);
+ error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count);
+ killpid(cd->unsafe.pid);
- int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid);
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
if (likely(worker_ret_code == 0))
pluginsd_worker_thread_handle_success(cd);
else
pluginsd_worker_thread_handle_error(cd, worker_ret_code);
- cd->pid = 0;
- if (unlikely(!cd->enabled))
+ cd->unsafe.pid = 0;
+ if (unlikely(!plugin_is_enabled(cd)))
break;
}
worker_unregister();
@@ -158,10 +186,12 @@ static void pluginsd_main_cleanup(void *data)
struct plugind *cd;
for (cd = pluginsd_root; cd; cd = cd->next) {
- if (cd->enabled && !cd->obsolete) {
+ netdata_spinlock_lock(&cd->unsafe.spinlock);
+ if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) {
info("stopping plugin thread: %s", cd->id);
- netdata_thread_cancel(cd->thread);
+ netdata_thread_cancel(cd->unsafe.thread);
}
+ netdata_spinlock_unlock(&cd->unsafe.spinlock);
}
info("cleanup completed.");
@@ -186,12 +216,12 @@ void *pluginsd_main(void *ptr)
// so that we don't log broken directories on each loop
int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 };
- while (!netdata_exit) {
+ while (service_running(SERVICE_COLLECTORS)) {
int idx;
const char *directory_name;
for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) {
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
errno = 0;
@@ -206,7 +236,7 @@ void *pluginsd_main(void *ptr)
struct dirent *file = NULL;
while (likely((file = readdir(dir)))) {
- if (unlikely(netdata_exit))
+ if (unlikely(!service_running(SERVICE_COLLECTORS)))
break;
debug(D_PLUGINSD, "examining file '%s'", file->d_name);
@@ -237,7 +267,7 @@ void *pluginsd_main(void *ptr)
if (unlikely(strcmp(cd->filename, file->d_name) == 0))
break;
- if (likely(cd && !cd->obsolete)) {
+ if (likely(cd && plugin_is_running(cd))) {
debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename);
continue;
}
@@ -252,7 +282,9 @@ void *pluginsd_main(void *ptr)
strncpyz(cd->filename, file->d_name, FILENAME_MAX);
snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename);
- cd->enabled = enabled;
+ cd->unsafe.enabled = enabled;
+ cd->unsafe.running = false;
+
cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every);
cd->started_t = now_realtime_sec();
@@ -266,15 +298,16 @@ void *pluginsd_main(void *ptr)
cd->next = pluginsd_root;
pluginsd_root = cd;
- // it is not currently running
- cd->obsolete = 1;
-
- if (cd->enabled) {
+ if (plugin_is_enabled(cd)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PLUGINSD[%s]", pluginname);
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname);
+
// spawn a new thread for it
- netdata_thread_create(
- &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd);
+ netdata_thread_create(&cd->unsafe.thread,
+ tag,
+ NETDATA_THREAD_OPTION_DEFAULT,
+ pluginsd_worker_thread,
+ cd);
}
}
}