diff options
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/README.md | 34 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.c | 133 | ||||
-rw-r--r-- | collectors/plugins.d/plugins_d.h | 13 | ||||
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 63 |
4 files changed, 145 insertions, 98 deletions
diff --git a/collectors/plugins.d/README.md b/collectors/plugins.d/README.md index 2ecf233f7..8ad1d3a65 100644 --- a/collectors/plugins.d/README.md +++ b/collectors/plugins.d/README.md @@ -1,6 +1,10 @@ <!-- title: "External plugins overview" -custom_edit_url: https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md +custom_edit_url: "https://github.com/netdata/netdata/edit/master/collectors/plugins.d/README.md" +sidebar_label: "External plugins overview" +learn_status: "Published" +learn_topic_type: "References" +learn_rel_path: "Developers" --> # External plugins overview @@ -12,17 +16,18 @@ from external processes, thus allowing Netdata to use **external plugins**. |plugin|language|O/S|description| |:----:|:------:|:-:|:----------| -|[apps.plugin](/collectors/apps.plugin/README.md)|`C`|linux, freebsd|monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**.| -|[charts.d.plugin](/collectors/charts.d.plugin/README.md)|`BASH`|all|a **plugin orchestrator** for data collection modules written in `BASH` v4+.| -|[cups.plugin](/collectors/cups.plugin/README.md)|`C`|all|monitors **CUPS**| -|[fping.plugin](/collectors/fping.plugin/README.md)|`C`|all|measures network latency, jitter and packet loss between the monitored node and any number of remote network end points.| -|[ioping.plugin](/collectors/ioping.plugin/README.md)|`C`|all|measures disk latency.| -|[freeipmi.plugin](/collectors/freeipmi.plugin/README.md)|`C`|linux|collects metrics from enterprise hardware sensors, on Linux servers.| -|[nfacct.plugin](/collectors/nfacct.plugin/README.md)|`C`|linux|collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`.| -|[xenstat.plugin](/collectors/xenstat.plugin/README.md)|`C`|linux|collects XenServer and XCP-ng metrics using `lxenstat`.| -|[perf.plugin](/collectors/perf.plugin/README.md)|`C`|linux|collects CPU performance metrics using performance monitoring units (PMU).| -|[python.d.plugin](/collectors/python.d.plugin/README.md)|`python`|all|a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported).| -|[slabinfo.plugin](/collectors/slabinfo.plugin/README.md)|`C`|linux|collects kernel internal cache objects (SLAB) metrics.| +|[apps.plugin](https://github.com/netdata/netdata/blob/master/collectors/apps.plugin/README.md)|`C`|linux, freebsd|monitors the whole process tree on Linux and FreeBSD and breaks down system resource usage by **process**, **user** and **user group**.| +|[charts.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md)|`BASH`|all|a **plugin orchestrator** for data collection modules written in `BASH` v4+.| +|[cups.plugin](https://github.com/netdata/netdata/blob/master/collectors/cups.plugin/README.md)|`C`|all|monitors **CUPS**| +|[ebpf.plugin](https://github.com/netdata/netdata/blob/master/collectors/ebpf.plugin/README.md)|`C`|linux|monitors different metrics on environments using kernel internal functions.| +|[go.d.plugin](https://github.com/netdata/go.d.plugin/blob/master/README.md)|`GO`|all|collects metrics from the system, applications, or third-party APIs.| +|[ioping.plugin](https://github.com/netdata/netdata/blob/master/collectors/ioping.plugin/README.md)|`C`|all|measures disk latency.| +|[freeipmi.plugin](https://github.com/netdata/netdata/blob/master/collectors/freeipmi.plugin/README.md)|`C`|linux|collects metrics from enterprise hardware sensors, on Linux servers.| +|[nfacct.plugin](https://github.com/netdata/netdata/blob/master/collectors/nfacct.plugin/README.md)|`C`|linux|collects netfilter firewall, connection tracker and accounting metrics using `libmnl` and `libnetfilter_acct`.| +|[xenstat.plugin](https://github.com/netdata/netdata/blob/master/collectors/xenstat.plugin/README.md)|`C`|linux|collects XenServer and XCP-ng metrics using `lxenstat`.| +|[perf.plugin](https://github.com/netdata/netdata/blob/master/collectors/perf.plugin/README.md)|`C`|linux|collects CPU performance metrics using performance monitoring units (PMU).| +|[python.d.plugin](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md)|`python`|all|a **plugin orchestrator** for data collection modules written in `python` v2 or v3 (both are supported).| +|[slabinfo.plugin](https://github.com/netdata/netdata/blob/master/collectors/slabinfo.plugin/README.md)|`C`|linux|collects kernel internal cache objects (SLAB) metrics.| Plugin orchestrators may also be described as **modular plugins**. They are modular since they accept custom made modules to be included. Writing modules for these plugins is easier than accessing the native Netdata API directly. You will find modules already available for each orchestrator under the directory of the particular modular plugin (e.g. under python.d.plugin for the python orchestrator). Each of these modular plugins has each own methods for defining modules. Please check the examples and their documentation. @@ -71,7 +76,6 @@ Example: # check for new plugins every = 60 # charts.d = yes - # fping = yes # ioping = yes # python.d = yes ``` @@ -504,12 +508,12 @@ or do not output the line at all. ## Modular Plugins 1. **python**, use `python.d.plugin`, there are many examples in the [python.d - directory](/collectors/python.d.plugin/README.md) + directory](https://github.com/netdata/netdata/blob/master/collectors/python.d.plugin/README.md) python is ideal for Netdata plugins. It is a simple, yet powerful way to collect data, it has a very small memory footprint, although it is not the most CPU efficient way to do it. 2. **BASH**, use `charts.d.plugin`, there are many examples in the [charts.d - directory](/collectors/charts.d.plugin/README.md) + directory](https://github.com/netdata/netdata/blob/master/collectors/charts.d.plugin/README.md) BASH is the simplest scripting language for collecting values. It is the less efficient though in terms of CPU resources. You can use it to collect data quickly, but extensive use of it might use a lot of system resources. diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c index 79abc7070..7608f3afc 100644 --- a/collectors/plugins.d/plugins_d.c +++ b/collectors/plugins.d/plugins_d.c @@ -21,23 +21,54 @@ inline size_t pluginsd_initialize_plugin_directories() return quoted_strings_splitter(plugins_dir_list, plugin_directories, PLUGINSD_MAX_DIRECTORIES, config_isspace, NULL, NULL, 0); } +static inline void plugin_set_disabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.enabled = false; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +bool plugin_is_enabled(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.enabled; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + +static inline void plugin_set_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + cd->unsafe.running = true; + netdata_spinlock_unlock(&cd->unsafe.spinlock); +} + +static inline bool plugin_is_running(struct plugind *cd) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + bool ret = cd->unsafe.running; + netdata_spinlock_unlock(&cd->unsafe.spinlock); + return ret; +} + static void pluginsd_worker_thread_cleanup(void *arg) { struct plugind *cd = (struct plugind *)arg; - if (cd->enabled && !cd->obsolete) { - cd->obsolete = 1; + netdata_spinlock_lock(&cd->unsafe.spinlock); + + cd->unsafe.running = false; + cd->unsafe.thread = 0; + pid_t pid = cd->unsafe.pid; + cd->unsafe.pid = 0; + + netdata_spinlock_unlock(&cd->unsafe.spinlock); + + if (pid) { info("data collection thread exiting"); - if (cd->pid) { - siginfo_t info; - info("killing child process pid %d", cd->pid); - if (killpid(cd->pid) != -1) { - info("waiting for child process pid %d to exit...", cd->pid); - waitid(P_PID, (id_t)cd->pid, &info, WEXITED); - } - cd->pid = 0; + siginfo_t info; + info("killing child process pid %d", pid); + if (killpid(pid) != -1) { + info("waiting for child process pid %d to exit...", pid); + waitid(P_PID, (id_t)pid, &info, WEXITED); } } } @@ -53,8 +84,8 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) if (likely(cd->serial_failures <= SERIAL_FAILURES_THRESHOLD)) { info( "'%s' (pid %d) does not generate useful output but it reports success (exits with 0). %s.", - cd->fullfilename, cd->pid, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); + cd->fullfilename, cd->unsafe.pid, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is now disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -63,35 +94,33 @@ static void pluginsd_worker_thread_handle_success(struct plugind *cd) error( "'%s' (pid %d) does not generate useful output, although it reports success (exits with 0)." "We have tried to collect something %zu times - unsuccessfully. Disabling it.", - cd->fullfilename, cd->pid, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_ret_code) { if (worker_ret_code == -1) { - info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->pid); - cd->enabled = 0; + info("'%s' (pid %d) was killed with SIGTERM. Disabling it.", cd->fullfilename, cd->unsafe.pid); + plugin_set_disabled(cd); return; } if (!cd->successful_collections) { error( "'%s' (pid %d) exited with error code %d and haven't collected any data. Disabling it.", cd->fullfilename, - cd->pid, worker_ret_code); - cd->enabled = 0; + cd->unsafe.pid, worker_ret_code); + plugin_set_disabled(cd); return; } if (cd->serial_failures <= SERIAL_FAILURES_THRESHOLD) { error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times). %s", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, - cd->enabled ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, + plugin_is_enabled(cd) ? "Waiting a bit before starting it again." : "Will not start it again - it is disabled."); sleep((unsigned int)(cd->update_every * 10)); return; } @@ -100,48 +129,47 @@ static void pluginsd_worker_thread_handle_error(struct plugind *cd, int worker_r error( "'%s' (pid %d) exited with error code %d, but has given useful output in the past (%zu times)." "We tried to restart it %zu times, but it failed to generate data. Disabling it.", - cd->fullfilename, cd->pid, worker_ret_code, cd->successful_collections, cd->serial_failures); - cd->enabled = 0; + cd->fullfilename, cd->unsafe.pid, worker_ret_code, cd->successful_collections, cd->serial_failures); + plugin_set_disabled(cd); return; } - - return; } + #undef SERIAL_FAILURES_THRESHOLD -void *pluginsd_worker_thread(void *arg) +static void *pluginsd_worker_thread(void *arg) { worker_register("PLUGINSD"); netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg); struct plugind *cd = (struct plugind *)arg; + plugin_set_running(cd); - cd->obsolete = 0; size_t count = 0; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { FILE *fp_child_input = NULL; - FILE *fp_child_output = netdata_popen(cd->cmd, &cd->pid, &fp_child_input); + FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input); if (unlikely(!fp_child_input || !fp_child_output)) { error("Cannot popen(\"%s\", \"r\").", cd->cmd); break; } - info("connected to '%s' running on pid %d", cd->fullfilename, cd->pid); + info("connected to '%s' running on pid %d", cd->fullfilename, cd->unsafe.pid); count = pluginsd_process(localhost, cd, fp_child_input, fp_child_output, 0); - error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->pid, count); - killpid(cd->pid); + error("'%s' (pid %d) disconnected after %zu successful data collections (ENDs).", cd->fullfilename, cd->unsafe.pid, count); + killpid(cd->unsafe.pid); - int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->pid); + int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid); if (likely(worker_ret_code == 0)) pluginsd_worker_thread_handle_success(cd); else pluginsd_worker_thread_handle_error(cd, worker_ret_code); - cd->pid = 0; - if (unlikely(!cd->enabled)) + cd->unsafe.pid = 0; + if (unlikely(!plugin_is_enabled(cd))) break; } worker_unregister(); @@ -158,10 +186,12 @@ static void pluginsd_main_cleanup(void *data) struct plugind *cd; for (cd = pluginsd_root; cd; cd = cd->next) { - if (cd->enabled && !cd->obsolete) { + netdata_spinlock_lock(&cd->unsafe.spinlock); + if (cd->unsafe.enabled && cd->unsafe.running && cd->unsafe.thread != 0) { info("stopping plugin thread: %s", cd->id); - netdata_thread_cancel(cd->thread); + netdata_thread_cancel(cd->unsafe.thread); } + netdata_spinlock_unlock(&cd->unsafe.spinlock); } info("cleanup completed."); @@ -186,12 +216,12 @@ void *pluginsd_main(void *ptr) // so that we don't log broken directories on each loop int directory_errors[PLUGINSD_MAX_DIRECTORIES] = { 0 }; - while (!netdata_exit) { + while (service_running(SERVICE_COLLECTORS)) { int idx; const char *directory_name; for (idx = 0; idx < PLUGINSD_MAX_DIRECTORIES && (directory_name = plugin_directories[idx]); idx++) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; errno = 0; @@ -206,7 +236,7 @@ void *pluginsd_main(void *ptr) struct dirent *file = NULL; while (likely((file = readdir(dir)))) { - if (unlikely(netdata_exit)) + if (unlikely(!service_running(SERVICE_COLLECTORS))) break; debug(D_PLUGINSD, "examining file '%s'", file->d_name); @@ -237,7 +267,7 @@ void *pluginsd_main(void *ptr) if (unlikely(strcmp(cd->filename, file->d_name) == 0)) break; - if (likely(cd && !cd->obsolete)) { + if (likely(cd && plugin_is_running(cd))) { debug(D_PLUGINSD, "plugin '%s' is already running", cd->filename); continue; } @@ -252,7 +282,9 @@ void *pluginsd_main(void *ptr) strncpyz(cd->filename, file->d_name, FILENAME_MAX); snprintfz(cd->fullfilename, FILENAME_MAX, "%s/%s", directory_name, cd->filename); - cd->enabled = enabled; + cd->unsafe.enabled = enabled; + cd->unsafe.running = false; + cd->update_every = (int)config_get_number(cd->id, "update every", localhost->rrd_update_every); cd->started_t = now_realtime_sec(); @@ -266,15 +298,16 @@ void *pluginsd_main(void *ptr) cd->next = pluginsd_root; pluginsd_root = cd; - // it is not currently running - cd->obsolete = 1; - - if (cd->enabled) { + if (plugin_is_enabled(cd)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PLUGINSD[%s]", pluginname); + snprintfz(tag, NETDATA_THREAD_TAG_MAX, "PD[%s]", pluginname); + // spawn a new thread for it - netdata_thread_create( - &cd->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, pluginsd_worker_thread, cd); + netdata_thread_create(&cd->unsafe.thread, + tag, + NETDATA_THREAD_OPTION_DEFAULT, + pluginsd_worker_thread, + cd); } } } diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h index a8acf038a..35af9fe58 100644 --- a/collectors/plugins.d/plugins_d.h +++ b/collectors/plugins.d/plugins_d.h @@ -50,9 +50,6 @@ struct plugind { char fullfilename[FILENAME_MAX+1]; // with path char cmd[PLUGINSD_CMD_MAX+1]; // the command that it executes - volatile pid_t pid; - netdata_thread_t thread; - size_t successful_collections; // the number of times we have seen // values collected from this plugin @@ -60,8 +57,14 @@ struct plugind { // without collecting values int update_every; // the plugin default data collection frequency - volatile sig_atomic_t obsolete; // do not touch this structure after setting this to 1 - volatile sig_atomic_t enabled; // if this is enabled or not + + struct { + SPINLOCK spinlock; + bool running; // do not touch this structure after setting this to 1 + bool enabled; // if this is enabled or not + netdata_thread_t thread; + pid_t pid; + } unsafe; time_t started_t; uint32_t capabilities; // follows the same principles as streaming capabilities diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 5501c12fa..2c0f2cbc6 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -148,8 +148,11 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) ((PARSER_USER_OBJECT *)user)->st = st; usec_t microseconds = 0; - if (microseconds_txt && *microseconds_txt) - microseconds = str2ull(microseconds_txt); + if (microseconds_txt && *microseconds_txt) { + long long t = str2ll(microseconds_txt, NULL); + if(t >= 0) + microseconds = t; + } #ifdef NETDATA_LOG_REPLICATION_REQUESTS if(st->replay.log_next_data_collection) { @@ -326,7 +329,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); - const char *world_time_txt = get_word(words, num_words, 3); + const char *wall_clock_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(user); @@ -336,12 +339,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; - time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); - - if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", - rrdhost_hostname(host), rrdset_id(st), - first_entry_child, last_entry_child, child_world_time); + time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec(); bool ok = true; if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { @@ -358,7 +356,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_world_time, + first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -441,19 +439,20 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) } else rrddim_isnot_obsolete(st, rd); + bool should_update_dimension = false; + if (likely(unhide_dimension)) { rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); - if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); } else { rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); - if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); + } + + if (should_update_dimension) { + rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE); + rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); } return PARSER_RC_OK; @@ -529,7 +528,7 @@ static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __may } void inflight_functions_init(PARSER *parser) { - parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0); dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser); dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser); dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser); @@ -883,7 +882,7 @@ PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __may host->rrdlabels = rrdlabels_create(); rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); - metaqueue_store_host_labels(host->machine_guid); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; @@ -991,7 +990,7 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { if (unlikely(end_time - start_time != st->update_every)) - rrdset_set_update_every(st, end_time - start_time); + rrdset_set_update_every_s(st, end_time - start_time); st->last_collected_time.tv_sec = end_time; st->last_collected_time.tv_usec = 0; @@ -1124,6 +1123,9 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) { + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) + return PARSER_RC_OK; + char *dimension = get_word(words, num_words, 1); char *last_collected_ut_str = get_word(words, num_words, 2); char *last_collected_value_str = get_word(words, num_words, 3); @@ -1156,6 +1158,9 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) { + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) + return PARSER_RC_OK; + char *last_collected_ut_str = get_word(words, num_words, 1); char *last_updated_ut_str = get_word(words, num_words, 2); @@ -1238,7 +1243,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) time_t started = st->rrdhost->receiver->replication_first_time_t; time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; - worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + if(started && current > started) + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); } @@ -1251,6 +1257,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->counter++; st->counter_done++; + store_metric_collection_completed(); #ifdef NETDATA_LOG_REPLICATION_REQUESTS st->replay.start_streaming = false; @@ -1262,7 +1269,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) if (start_streaming) { if (st->update_every != update_every_child) - rrdset_set_update_every(st, update_every_child); + rrdset_set_update_every_s(st, update_every_child); if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); @@ -1298,10 +1305,10 @@ static void pluginsd_process_thread_cleanup(void *ptr) { inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { - int enabled = cd->enabled; + int enabled = cd->unsafe.enabled; if (!fp_plugin_input || !fp_plugin_output || !enabled) { - cd->enabled = 0; + cd->unsafe.enabled = 0; return 0; } @@ -1321,7 +1328,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi clearerr(fp_plugin_output); PARSER_USER_OBJECT user = { - .enabled = cd->enabled, + .enabled = cd->unsafe.enabled, .host = host, .cd = cd, .trust_durations = trust_durations @@ -1339,14 +1346,14 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi user.parser = parser; while (likely(!parser_next(parser))) { - if (unlikely(netdata_exit || parser_action(parser, NULL))) + if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL))) break; } // free parser with the pop function netdata_thread_cleanup_pop(1); - cd->enabled = user.enabled; + cd->unsafe.enabled = user.enabled; size_t count = user.count; if (likely(count)) { |