summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c63
1 files changed, 35 insertions, 28 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 5501c12fa..2c0f2cbc6 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -148,8 +148,11 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
((PARSER_USER_OBJECT *)user)->st = st;
usec_t microseconds = 0;
- if (microseconds_txt && *microseconds_txt)
- microseconds = str2ull(microseconds_txt);
+ if (microseconds_txt && *microseconds_txt) {
+ long long t = str2ll(microseconds_txt, NULL);
+ if(t >= 0)
+ microseconds = t;
+ }
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
if(st->replay.log_next_data_collection) {
@@ -326,7 +329,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
{
const char *first_entry_txt = get_word(words, num_words, 1);
const char *last_entry_txt = get_word(words, num_words, 2);
- const char *world_time_txt = get_word(words, num_words, 3);
+ const char *wall_clock_time_txt = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
@@ -336,12 +339,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
- time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
-
- if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
- rrdhost_hostname(host), rrdset_id(st),
- first_entry_child, last_entry_child, child_world_time);
+ time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
bool ok = true;
if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
@@ -358,7 +356,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
ok = replicate_chart_request(send_to_plugin, parser, host, st,
- first_entry_child, last_entry_child, child_world_time,
+ first_entry_child, last_entry_child, child_wall_clock_time,
0, 0);
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -441,19 +439,20 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
} else
rrddim_isnot_obsolete(st, rd);
+ bool should_update_dimension = false;
+
if (likely(unhide_dimension)) {
rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
- if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
- metaqueue_dimension_update_flags(rd);
- }
+ should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
}
else {
rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
- if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
- metaqueue_dimension_update_flags(rd);
- }
+ should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
+ }
+
+ if (should_update_dimension) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
}
return PARSER_RC_OK;
@@ -529,7 +528,7 @@ static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __may
}
void inflight_functions_init(PARSER *parser) {
- parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
@@ -883,7 +882,7 @@ PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __may
host->rrdlabels = rrdlabels_create();
rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
- metaqueue_store_host_labels(host->machine_guid);
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
@@ -991,7 +990,7 @@ PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *use
if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
if (unlikely(end_time - start_time != st->update_every))
- rrdset_set_update_every(st, end_time - start_time);
+ rrdset_set_update_every_s(st, end_time - start_time);
st->last_collected_time.tv_sec = end_time;
st->last_collected_time.tv_usec = 0;
@@ -1124,6 +1123,9 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
{
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+ return PARSER_RC_OK;
+
char *dimension = get_word(words, num_words, 1);
char *last_collected_ut_str = get_word(words, num_words, 2);
char *last_collected_value_str = get_word(words, num_words, 3);
@@ -1156,6 +1158,9 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
{
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+ return PARSER_RC_OK;
+
char *last_collected_ut_str = get_word(words, num_words, 1);
char *last_updated_ut_str = get_word(words, num_words, 2);
@@ -1238,7 +1243,8 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
time_t started = st->rrdhost->receiver->replication_first_time_t;
time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
- worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ if(started && current > started)
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
(NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
}
@@ -1251,6 +1257,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
st->counter++;
st->counter_done++;
+ store_metric_collection_completed();
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
st->replay.start_streaming = false;
@@ -1262,7 +1269,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
if (start_streaming) {
if (st->update_every != update_every_child)
- rrdset_set_update_every(st, update_every_child);
+ rrdset_set_update_every_s(st, update_every_child);
if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
@@ -1298,10 +1305,10 @@ static void pluginsd_process_thread_cleanup(void *ptr) {
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
- int enabled = cd->enabled;
+ int enabled = cd->unsafe.enabled;
if (!fp_plugin_input || !fp_plugin_output || !enabled) {
- cd->enabled = 0;
+ cd->unsafe.enabled = 0;
return 0;
}
@@ -1321,7 +1328,7 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
clearerr(fp_plugin_output);
PARSER_USER_OBJECT user = {
- .enabled = cd->enabled,
+ .enabled = cd->unsafe.enabled,
.host = host,
.cd = cd,
.trust_durations = trust_durations
@@ -1339,14 +1346,14 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
user.parser = parser;
while (likely(!parser_next(parser))) {
- if (unlikely(netdata_exit || parser_action(parser, NULL)))
+ if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, NULL)))
break;
}
// free parser with the pop function
netdata_thread_cleanup_pop(1);
- cd->enabled = user.enabled;
+ cd->unsafe.enabled = user.enabled;
size_t count = user.count;
if (likely(count)) {