From d079b656b4719739b2247dcd9d46e9bec793095a Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 Feb 2023 17:11:34 +0100 Subject: Merging upstream version 1.38.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/pluginsd_parser.c | 63 +++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 28 deletions(-) (limited to 'collectors/plugins.d/pluginsd_parser.c') diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 5501c12fa..2c0f2cbc6 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -148,8 +148,11 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) ((PARSER_USER_OBJECT *)user)->st = st; usec_t microseconds = 0; - if (microseconds_txt && *microseconds_txt) - microseconds = str2ull(microseconds_txt); + if (microseconds_txt && *microseconds_txt) { + long long t = str2ll(microseconds_txt, NULL); + if(t >= 0) + microseconds = t; + } #ifdef NETDATA_LOG_REPLICATION_REQUESTS if(st->replay.log_next_data_collection) { @@ -326,7 +329,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); - const char *world_time_txt = get_word(words, num_words, 3); + const char *wall_clock_time_txt = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(user); @@ -336,12 +339,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; - time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); - - if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", - rrdhost_hostname(host), rrdset_id(st), - first_entry_child, last_entry_child, child_world_time); + time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec(); bool ok = true; if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { @@ -358,7 +356,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; ok = replicate_chart_request(send_to_plugin, parser, host, st, - first_entry_child, last_entry_child, child_world_time, + first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); } #ifdef NETDATA_LOG_REPLICATION_REQUESTS @@ -441,19 +439,20 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) } else rrddim_isnot_obsolete(st, rd); + bool should_update_dimension = false; + if (likely(unhide_dimension)) { rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); - if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); } else { rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); - if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); - metaqueue_dimension_update_flags(rd); - } + should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN); + } + + if (should_update_dimension) { + rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE); + rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); } return PARSER_RC_OK; @@ -529,7 +528,7 @@ static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __may } void inflight_functions_init(PARSER *parser) { - parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0); dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser); dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser); dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser); @@ -883,7 +882,7 @@ PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __may host->rrdlabels = rrdlabels_create(); rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); - metaqueue_store_host_labels(host->machine_guid); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; @@ -991,7 +990,7 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { if (unlikely(end_time - start_time != st->update_every)) - rrdset_set_update_every(st, end_time - start_time); + rrdset_set_update_every_s(st, end_time - start_time); st->last_collected_time.tv_sec = end_time; st->last_collected_time.tv_usec = 0; @@ -1124,6 +1123,9 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) { + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) + return PARSER_RC_OK; + char *dimension = get_word(words, num_words, 1); char *last_collected_ut_str = get_word(words, num_words, 2); char *last_collected_value_str = get_word(words, num_words, 3); @@ -1156,6 +1158,9 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) { + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) + return PARSER_RC_OK; + char *last_collected_ut_str = get_word(words, num_words, 1); char *last_updated_ut_str = get_word(words, num_words, 2); @@ -1238,7 +1243,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) time_t started = st->rrdhost->receiver->replication_first_time_t; time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; - worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + if(started && current > started) + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); } @@ -1251,6 +1257,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) st->counter++; st->counter_done++; + store_metric_collection_completed(); #ifdef NETDATA_LOG_REPLICATION_REQUESTS st->replay.start_streaming = false; @@ -1262,7 +1269,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) if (start_streaming) { if (st->update_every != update_every_child) - rrdset_set_update_every(st, update_every_child); + rrdset_set_update_every_s(st, update_every_child); if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); @@ -1298,10 +1305,10 @@ static void pluginsd_process_thread_cleanup(void *ptr) { inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { - int enabled = cd->enabled; + int enabled = cd->unsafe.enabled; if (!fp_plugin_input || !fp_plugin_output || !enabled) { - cd->enabled = 0; + cd->unsafe.enabled = 0; return 0; } @@ -1321,7 +1328,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi clearerr(fp_plugin_output); PARSER_USER_OBJECT user = { - .enabled = cd->enabled, + .enabled = cd->unsafe.enabled, .host = host, .cd = cd, .trust_durations = trust_durations @@ -1339,14 +1346,14 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi user.parser = parser; while (likely(!parser_next(parser))) { - if (unlikely(netdata_exit || parser_action(parser, NULL))) + if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL))) break; } // free parser with the pop function netdata_thread_cleanup_pop(1); - cd->enabled = user.enabled; + cd->unsafe.enabled = user.enabled; size_t count = user.count; if (likely(count)) { -- cgit v1.2.3