From 97e01009d69b8fbebfebf68f51e3d126d0ed43fc Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Wed, 30 Nov 2022 19:47:05 +0100 Subject: Merging upstream version 1.37.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/pluginsd_parser.c | 1476 ++++++++++++++++++++++---------- 1 file changed, 1010 insertions(+), 466 deletions(-) (limited to 'collectors/plugins.d/pluginsd_parser.c') diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 88e07fab7..5501c12fa 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -2,336 +2,223 @@ #include "pluginsd_parser.h" -/* - * This is the action defined for the FLUSH command - */ -PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value) -{ - UNUSED(user); - - rrddim_set_by_pointer(st, rd, value); - return PARSER_RC_OK; -} +#define LOG_FUNCTIONS false -PARSER_RC pluginsd_flush_action(void *user, RRDSET *st) -{ - UNUSED(user); - UNUSED(st); - return PARSER_RC_OK; -} +static int send_to_plugin(const char *txt, void *data) { + PARSER *parser = data; -PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations) -{ - UNUSED(user); - if (likely(st->counter_done)) { - if (likely(microseconds)) { - if (trust_durations) - rrdset_next_usec_unfiltered(st, microseconds); - else - rrdset_next_usec(st, microseconds); - } else - rrdset_next(st); - } - return PARSER_RC_OK; -} - - -PARSER_RC pluginsd_end_action(void *user, RRDSET *st) -{ - UNUSED(user); - - rrdset_done(st); - return PARSER_RC_OK; -} + if(!txt || !*txt) + return 0; -PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin, - char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options) -{ - RRDSET *st = NULL; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; +#ifdef ENABLE_HTTPS + struct netdata_ssl *ssl = parser->ssl_output; + if(ssl) { + if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE) + return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt)); - st = rrdset_create( - host, type, id, name, family, context, title, units, - plugin, module, priority, update_every, - chart_type); + error("PLUGINSD: cannot send command (SSL)"); + return -1; + } +#endif - if (options && *options) { - if (strstr(options, "obsolete")) - rrdset_is_obsolete(st); - else - rrdset_isnot_obsolete(st); + if(parser->fp_output) { + int bytes = fprintf(parser->fp_output, "%s", txt); + if(bytes <= 0) { + error("PLUGINSD: cannot send command (FILE)"); + return -2; + } + fflush(parser->fp_output); + return bytes; + } - if (strstr(options, "detail")) - rrdset_flag_set(st, RRDSET_FLAG_DETAIL); - else - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + if(parser->fd != -1) { + size_t bytes = 0; + size_t total = strlen(txt); + ssize_t sent; - if (strstr(options, "hidden")) - rrdset_flag_set(st, RRDSET_FLAG_HIDDEN); - else - rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN); + do { + sent = write(parser->fd, &txt[bytes], total - bytes); + if(sent <= 0) { + error("PLUGINSD: cannot send command (fd)"); + return -3; + } + bytes += sent; + } + while(bytes < total); - if (strstr(options, "store_first")) - rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST); - else - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); - } else { - rrdset_isnot_obsolete(st); - rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); - rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + return (int)bytes; } - ((PARSER_USER_OBJECT *)user)->st = st; - return PARSER_RC_OK; + error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + return -4; } +static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) { + RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; -PARSER_RC pluginsd_disable_action(void *user) -{ - UNUSED(user); + if(unlikely(!host)) + error("PLUGINSD: command %s requires a host, but is not set.", cmd); - info("called DISABLE. Disabling it."); - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + return host; } +static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) { + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; -PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value) -{ - UNUSED(user); + if(unlikely(!st)) + error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); - if (global) { - RRDVAR *rv = rrdvar_custom_host_variable_create(host, name); - if (rv) - rrdvar_custom_host_variable_set(host, rv, value); - else - error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname); - } else { - RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(st, name); - if (rs) - rrdsetvar_custom_chart_variable_set(rs, value); - else - error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, host->hostname, st->id); - } - return PARSER_RC_OK; + return st; } - - -PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options, - RRD_ALGORITHM algorithm_type) -{ - UNUSED(user); - UNUSED(algorithm); - - RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, algorithm_type); - int unhide_dimension = 1; - - rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - if (options && *options) { - if (strstr(options, "obsolete") != NULL) - rrddim_is_obsolete(st, rd); - else - rrddim_isnot_obsolete(st, rd); - - unhide_dimension = !strstr(options, "hidden"); - - if (strstr(options, "noreset") != NULL) - rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - if (strstr(options, "nooverflow") != NULL) - rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS); - } else - rrddim_isnot_obsolete(st, rd); - - if (likely(unhide_dimension)) { - rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN); - if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - (void)sql_set_dimension_option(&rd->metric_uuid, NULL); - rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); - } - } else { - rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN); - if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { - (void)sql_set_dimension_option(&rd->metric_uuid, "hidden"); - rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); - } +static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { + if (unlikely(!dimension || !*dimension)) { + error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", + rrdhost_hostname(host), rrdset_id(st), cmd); + return NULL; } - return PARSER_RC_OK; -} - -PARSER_RC pluginsd_label_action(void *user, char *key, char *value, RRDLABEL_SRC source) -{ - - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) - ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, key, value, source); + RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); - return PARSER_RC_OK; -} + if (unlikely(!rda)) + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); -PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, RRDLABEL_SRC source) -{ - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_chart_labels)) - ((PARSER_USER_OBJECT *) user)->new_chart_labels = rrdlabels_create(); - - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_chart_labels, key, value, source); - - return PARSER_RC_OK; + return rda; } -PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, DICTIONARY *new_chart_labels) -{ - RRDSET *st = ((PARSER_USER_OBJECT *)user)->st; - if (unlikely(!st)) { - error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", host->hostname); - return PARSER_RC_OK; +static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { + if (unlikely(!chart || !*chart)) { + error("PLUGINSD: 'host:%s' got a %s without a chart id.", + rrdhost_hostname(host), cmd); + return NULL; } - rrdset_update_rrdlabels(st, new_chart_labels); + RRDSET *st = rrdset_find(host, chart); + if (unlikely(!st)) + error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", + rrdhost_hostname(host), chart, cmd); - return PARSER_RC_OK; + return st; } -PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, DICTIONARY *new_host_labels) -{ - UNUSED(user); - - if(!host->host_labels) - host->host_labels = rrdlabels_create(); - - rrdlabels_migrate_to_these(host->host_labels, new_host_labels); - sql_store_host_labels(host); - - return PARSER_RC_OK; +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) { + ((PARSER_USER_OBJECT *) user)->enabled = 0; + return PARSER_RC_ERROR; } -PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) { - char *dimension = words[1]; - char *value = words[2]; + char *dimension = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!dimension || !*dimension)) { - error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", st->id, host->hostname); - goto disable; - } + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!value || !*value)) - value = NULL; + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!st)) { - error( - "requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension, - value ? value : "", host->hostname); - goto disable; - } + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "is setting dimension %s/%s to %s", st->id, dimension, value ? value : ""); - - if (value) { - RRDDIM *rd = rrddim_find(st, dimension); - if (unlikely(!rd)) { - error( - "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.", - dimension, st->name, st->id, st->rrdhost->hostname); - goto disable; - } else { - if (plugins_action->set_action) { - return plugins_action->set_action( - user, st, rd, strtoll(value, NULL, 0)); - } - } - } - return PARSER_RC_OK; + debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", + rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); -disable: - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; + if (value && *value) + rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0)); + + rrddim_acquired_release(rda); + return PARSER_RC_OK; } -PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) { - char *id = words[1]; - char *microseconds_txt = words[2]; + char *id = get_word(words, num_words, 1); + char *microseconds_txt = get_word(words, num_words, 2); - RRDSET *st = NULL; - RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - if (unlikely(!id)) { - error("requested a BEGIN without a chart id for host '%s'. Disabling it.", host->hostname); - goto disable; - } + RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - st = rrdset_find(host, id); - if (unlikely(!st)) { - error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, host->hostname); - goto disable; - } ((PARSER_USER_OBJECT *)user)->st = st; usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) microseconds = str2ull(microseconds_txt); - if (plugins_action->begin_action) { - return plugins_action->begin_action(user, st, microseconds, - ((PARSER_USER_OBJECT *)user)->trust_durations); +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + if(st->replay.log_next_data_collection) { + st->replay.log_next_data_collection = false; + + internal_error(true, + "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu", + rrdhost_hostname(host), rrdset_id(st), + st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec, + st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec, + microseconds + ); + } +#endif + + if (likely(st->counter_done)) { + if (likely(microseconds)) { + if (((PARSER_USER_OBJECT *)user)->trust_durations) + rrdset_next_usec_unfiltered(st, microseconds); + else + rrdset_next_usec(st, microseconds); + } + else + rrdset_next(st); } return PARSER_RC_OK; -disable: - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; } -PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) { UNUSED(words); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; + UNUSED(num_words); - if (unlikely(!st)) { - error("requested an END, without a BEGIN on host '%s'. Disabling it.", host->hostname); - ((PARSER_USER_OBJECT *) user)->enabled = 0; - return PARSER_RC_ERROR; - } + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "requested an END on chart %s", st->id); + debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); ((PARSER_USER_OBJECT *) user)->st = NULL; ((PARSER_USER_OBJECT *) user)->count++; - if (plugins_action->end_action) { - return plugins_action->end_action(user, st); - } + + struct timeval now; + now_realtime_timeval(&now); + rrdset_timed_done(st, now, /* pending_rrdset_next = */ false); + return PARSER_RC_OK; } -PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) { - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) { - debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host."); - return PARSER_RC_OK; - } - - char *type = words[1]; - char *name = words[2]; - char *title = words[3]; - char *units = words[4]; - char *family = words[5]; - char *context = words[6]; - char *chart = words[7]; - char *priority_s = words[8]; - char *update_every_s = words[9]; - char *options = words[10]; - char *plugin = words[11]; - char *module = words[12]; - - int have_action = ((plugins_action->chart_action) != NULL); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + char *type = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *title = get_word(words, num_words, 3); + char *units = get_word(words, num_words, 4); + char *family = get_word(words, num_words, 5); + char *context = get_word(words, num_words, 6); + char *chart = get_word(words, num_words, 7); + char *priority_s = get_word(words, num_words, 8); + char *update_every_s = get_word(words, num_words, 9); + char *options = get_word(words, num_words, 10); + char *plugin = get_word(words, num_words, 11); + char *module = get_word(words, num_words, 12); // parse the id from type char *id = NULL; @@ -342,10 +229,9 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act // make sure we have the required variables if (unlikely((!type || !*type || !id || !*id))) { - if (likely(host)) - error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname); - else - error("requested a CHART, without a type.id. Disabling it."); + error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.", + rrdhost_hostname(host)); + ((PARSER_USER_OBJECT *) user)->enabled = 0; return PARSER_RC_ERROR; } @@ -396,42 +282,120 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type), priority, update_every); - if (have_action) { - return plugins_action->chart_action( - user, type, id, name, family, context, title, units, - (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, module, priority, update_every, - chart_type, options); + RRDSET *st = NULL; + + st = rrdset_create( + host, type, id, name, family, context, title, units, + (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, + module, priority, update_every, + chart_type); + + if (likely(st)) { + if (options && *options) { + if (strstr(options, "obsolete")) + rrdset_is_obsolete(st); + else + rrdset_isnot_obsolete(st); + + if (strstr(options, "detail")) + rrdset_flag_set(st, RRDSET_FLAG_DETAIL); + else + rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + + if (strstr(options, "hidden")) + rrdset_flag_set(st, RRDSET_FLAG_HIDDEN); + else + rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN); + + if (strstr(options, "store_first")) + rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST); + else + rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + } else { + rrdset_isnot_obsolete(st); + rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); + rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); + } } + ((PARSER_USER_OBJECT *)user)->st = st; return PARSER_RC_OK; } -PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user) { - char *id = words[1]; - char *name = words[2]; - char *algorithm = words[3]; - char *multiplier_s = words[4]; - char *divisor_s = words[5]; - char *options = words[6]; - - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) { - debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host."); - return PARSER_RC_OK; + const char *first_entry_txt = get_word(words, num_words, 1); + const char *last_entry_txt = get_word(words, num_words, 2); + const char *world_time_txt = get_word(words, num_words, 3); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; + time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; + time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec(); + + if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0)) + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).", + rrdhost_hostname(host), rrdset_id(st), + first_entry_child, last_entry_child, child_world_time); + + bool ok = true; + if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; +#endif + + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); + + PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; + ok = replicate_chart_request(send_to_plugin, parser, host, st, + first_entry_child, last_entry_child, child_world_time, + 0, 0); + } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + else { + internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request", + rrdhost_hostname(st->rrdhost), rrdset_id(st)); } +#endif + + return ok ? PARSER_RC_OK : PARSER_RC_ERROR; +} + +PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) +{ + char *id = get_word(words, num_words, 1); + char *name = get_word(words, num_words, 2); + char *algorithm = get_word(words, num_words, 3); + char *multiplier_s = get_word(words, num_words, 4); + char *divisor_s = get_word(words, num_words, 5); + char *options = get_word(words, num_words, 6); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); if (unlikely(!id)) { - error( - "requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", host->hostname, - st ? st->id : "UNSET"); - goto disable; + error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.", + rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET"); + return PLUGINSD_DISABLE_PLUGIN(user); } if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) { - error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname); - goto disable; + error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.", + rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(user); } long multiplier = 1; @@ -455,316 +419,906 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins debug( D_PLUGINSD, "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'", - st->id, id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, + rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, options ? options : ""); - if (plugins_action->dimension_action) { - return plugins_action->dimension_action( - user, st, id, name, algorithm, - multiplier, divisor, (options && *options)?options:NULL, rrd_algorithm_id(algorithm)); + RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm)); + int unhide_dimension = 1; + + rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + if (options && *options) { + if (strstr(options, "obsolete") != NULL) + rrddim_is_obsolete(st, rd); + else + rrddim_isnot_obsolete(st, rd); + + unhide_dimension = !strstr(options, "hidden"); + + if (strstr(options, "noreset") != NULL) + rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + if (strstr(options, "nooverflow") != NULL) + rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + } else + rrddim_isnot_obsolete(st, rd); + + if (likely(unhide_dimension)) { + rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN); + if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { + rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN); + metaqueue_dimension_update_flags(rd); + } + } + else { + rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN); + if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) { + rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN); + metaqueue_dimension_update_flags(rd); + } + } + + return PARSER_RC_OK; +} + +// ---------------------------------------------------------------------------- +// execution of functions + +struct inflight_function { + int code; + int timeout; + BUFFER *destination_wb; + STRING *function; + void (*callback)(BUFFER *wb, int code, void *callback_data); + void *callback_data; + usec_t timeout_ut; + usec_t started_ut; + usec_t sent_ut; +}; + +static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) { + struct inflight_function *pf = func; + + PARSER *parser = parser_ptr; + + // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller + pf->code = HTTP_RESP_GATEWAY_TIMEOUT; + + char buffer[2048 + 1]; + snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n", + dictionary_acquired_item_name(item), + pf->timeout, + string2str(pf->function)); + + // send the command to the plugin + int ret = send_to_plugin(buffer, parser); + + pf->sent_ut = now_realtime_usec(); + + if(ret < 0) { + error("FUNCTION: failed to send function to plugin, error %d", ret); + rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); + } + else { + internal_error(LOG_FUNCTIONS, + "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), ret, + pf->sent_ut - pf->started_ut); + } +} + +static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { + struct inflight_function *pf = new_func; + + error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); + pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); + pf->callback(pf->destination_wb, pf->code, pf->callback_data); + string_freez(pf->function); + + return false; +} + +static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) { + struct inflight_function *pf = func; + + internal_error(LOG_FUNCTIONS, + "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)", + string2str(pf->function), dictionary_acquired_item_name(item), + buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut); + + pf->callback(pf->destination_wb, pf->code, pf->callback_data); + string_freez(pf->function); +} + +void inflight_functions_init(PARSER *parser) { + parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser); + dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser); + dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser); +} + +static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) { + parser->inflight.smaller_timeout = 0; + struct inflight_function *pf; + dfe_start_write(parser->inflight.functions, pf) { + if (pf->timeout_ut < now) { + internal_error(true, + "FUNCTION '%s' removing expired transaction '%s', after %llu usec.", + string2str(pf->function), pf_dfe.name, now - pf->started_ut); + + if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK) + pf->code = rrd_call_function_error(pf->destination_wb, + "Timeout waiting for collector response.", + HTTP_RESP_GATEWAY_TIMEOUT); + + dictionary_del(parser->inflight.functions, pf_dfe.name); + } + + else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = pf->timeout_ut; + } + dfe_done(pf); +} + +// this is the function that is called from +// rrd_call_function_and_wait() and rrd_call_function_async() +static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) { + PARSER *parser = collector_data; + + usec_t now = now_realtime_usec(); + + struct inflight_function tmp = { + .started_ut = now, + .timeout_ut = now + timeout * USEC_PER_SEC, + .destination_wb = destination_wb, + .timeout = timeout, + .function = string_strdupz(function), + .callback = callback, + .callback_data = callback_data, + }; + + uuid_t uuid; + uuid_generate_time(uuid); + + char key[UUID_STR_LEN]; + uuid_unparse_lower(uuid, key); + + dictionary_write_lock(parser->inflight.functions); + + // if there is any error, our dictionary callbacks will call the caller callback to notify + // the caller about the error - no need for error handling here. + dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function)); + + if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout) + parser->inflight.smaller_timeout = tmp.timeout_ut; + + // garbage collect stale inflight functions + if(parser->inflight.smaller_timeout < now) + inflight_functions_garbage_collect(parser, now); + + dictionary_write_unlock(parser->inflight.functions); + + return HTTP_RESP_OK; +} + +PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) +{ + bool global = false; + size_t i = 1; + if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { + i++; + global = true; + } + + char *name = get_word(words, num_words, i++); + char *timeout_s = get_word(words, num_words, i++); + char *help = get_word(words, num_words, i++); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION); + if(!host) return PARSER_RC_ERROR; + + RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); + if(!st) global = true; + + if (unlikely(!timeout_s || !name || !help || (!global && !st))) { + error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", + rrdhost_hostname(host), + st?rrdset_id(st):"(unset)", + global?"yes":"no", + name?name:"(unset)", + timeout_s?timeout_s:"(unset)", + help?help:"(unset)" + ); + return PARSER_RC_ERROR; + } + + int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; + if (timeout_s && *timeout_s) { + timeout = str2i(timeout_s); + if (unlikely(timeout <= 0)) + timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } + PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; + rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); + + return PARSER_RC_OK; +} + +static void pluginsd_function_result_end(struct parser *parser, void *action_data) { + STRING *key = action_data; + if(key) + dictionary_del(parser->inflight.functions, string2str(key)); + string_freez(key); +} + +PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user) +{ + char *key = get_word(words, num_words, 1); + char *status = get_word(words, num_words, 2); + char *format = get_word(words, num_words, 3); + char *expires = get_word(words, num_words, 4); + + if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { + error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." + , key ? key : "(unset)" + , status ? status : "(unset)" + , format ? format : "(unset)" + , expires ? expires : "(unset)" + ); + } + + int code = (status && *status) ? str2i(status) : 0; + if (code <= 0) + code = HTTP_RESP_BACKEND_RESPONSE_INVALID; + + time_t expiration = (expires && *expires) ? str2l(expires) : 0; + + PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; + + struct inflight_function *pf = NULL; + + if(key && *key) + pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key); + + if(!pf) { + error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); + } + else { + if(format && *format) + pf->destination_wb->contenttype = functions_format_to_content_type(format); + + pf->code = code; + + pf->destination_wb->expires = expiration; + if(expiration <= now_realtime_sec()) + buffer_no_cacheable(pf->destination_wb); + else + buffer_cacheable(pf->destination_wb); + } + + parser->defer.response = (pf) ? pf->destination_wb : NULL; + parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END; + parser->defer.action = pluginsd_function_result_end; + parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL + parser->flags |= PARSER_DEFER_UNTIL_KEYWORD; + return PARSER_RC_OK; -disable: - ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; } -PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action) +// ---------------------------------------------------------------------------- + +PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) { - char *name = words[1]; - char *value = words[2]; + char *name = get_word(words, num_words, 1); + char *value = get_word(words, num_words, 2); NETDATA_DOUBLE v; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; int global = (st) ? 0 : 1; if (name && *name) { if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) { global = 1; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) { global = 0; - name = words[2]; - value = words[3]; + name = get_word(words, num_words, 2); + value = get_word(words, num_words, 3); } } if (unlikely(!name || !*name)) { - error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.", + rrdhost_hostname(host), st ? rrdset_id(st):"UNSET"); + ((PARSER_USER_OBJECT *)user)->enabled = 0; - return PARSER_RC_ERROR; + return PLUGINSD_DISABLE_PLUGIN(user); } if (unlikely(!value || !*value)) value = NULL; if (unlikely(!value)) { - error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name, - host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + (global) ? "HOST" : "CHART", + name); return PARSER_RC_OK; } if (!global && !st) { - error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, host->hostname); - return PARSER_RC_OK; + error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + name + ); + return PLUGINSD_DISABLE_PLUGIN(user); } char *endptr = NULL; v = (NETDATA_DOUBLE)str2ndd(value, &endptr); if (unlikely(endptr && *endptr)) { if (endptr == value) - error( - "the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name, - host->hostname); + error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name); else - error( - "the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, host->hostname, - endptr); + error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name, + endptr); } - if (plugins_action->variable_action) { - return plugins_action->variable_action(user, host, st, name, global, v); + if (global) { + const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name); + if (rva) { + rrdvar_custom_host_variable_set(host, rva, v); + rrdvar_custom_host_variable_release(host, rva); + } + else + error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", + rrdhost_hostname(host), + name); + } else { + const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name); + if (rsa) { + rrdsetvar_custom_chart_variable_set(st, rsa, v); + rrdsetvar_custom_chart_variable_release(st, rsa); + } + else + error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", + rrdhost_hostname(host), rrdset_id(st), name); } return PARSER_RC_OK; } -PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - UNUSED(words); debug(D_PLUGINSD, "requested a FLUSH"); - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; ((PARSER_USER_OBJECT *) user)->st = NULL; - if (plugins_action->flush_action) { - return plugins_action->flush_action(user, st); - } + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; return PARSER_RC_OK; } -PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) { - UNUSED(user); - UNUSED(words); - - if (plugins_action->disable_action) { - return plugins_action->disable_action(user); - } + info("PLUGINSD: plugin called DISABLE. Disabling it."); + ((PARSER_USER_OBJECT *) user)->enabled = 0; return PARSER_RC_ERROR; } -PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) { - char *store; + const char *name = get_word(words, num_words, 1); + const char *label_source = get_word(words, num_words, 2); + const char *value = get_word(words, num_words, 3); - if (!words[1] || !words[2] || !words[3]) { - error("Ignoring malformed or empty LABEL command."); - return PARSER_RC_OK; + if (!name || !label_source || !value) { + error("PLUGINSD: ignoring malformed or empty LABEL command."); + return PLUGINSD_DISABLE_PLUGIN(user); } - if (!words[4]) - store = words[3]; - else { - store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char)); + + char *store = (char *)value; + bool allocated_store = false; + + if(unlikely(num_words > 4)) { + allocated_store = true; + store = mallocz(PLUGINSD_LINE_MAX + 1); size_t remaining = PLUGINSD_LINE_MAX; char *move = store; - int i = 3; - while (i < PLUGINSD_MAX_WORDS) { - size_t length = strlen(words[i]); - if ((length + 1) >= remaining) - break; - - remaining -= (length + 1); - memcpy(move, words[i], length); - move += length; - *move++ = ' '; + char *word; + for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) { + if(i > 3) { + *move++ = ' '; + *move = '\0'; + remaining--; + } + + size_t length = strlen(word); + if (length > remaining) + length = remaining; - i++; - if (!words[i]) - break; + remaining -= length; + memcpy(move, word, length); + move += length; + *move = '\0'; } } - if (plugins_action->label_action) { - PARSER_RC rc = plugins_action->label_action(user, words[1], store, strtol(words[2], NULL, 10)); - if (store != words[3]) - freez(store); - return rc; - } + if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) + ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); + + rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, + name, + store, + str2l(label_source)); - if (store != words[3]) + if (allocated_store) freez(store); + + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) +{ + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + + if(unlikely(!host->rrdlabels)) + host->rrdlabels = rrdlabels_create(); + + rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); + metaqueue_store_host_labels(host->machine_guid); + + rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); + ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action) + +PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) { - if (!words[1] || !words[2] || !words[3]) { + const char *name = get_word(words, num_words, 1); + const char *value = get_word(words, num_words, 2); + const char *label_source = get_word(words, num_words, 3); + + if (!name || !value || !*label_source) { error("Ignoring malformed or empty CHART LABEL command."); - return PARSER_RC_OK; + return PLUGINSD_DISABLE_PLUGIN(user); } - if (plugins_action->clabel_action) { - PARSER_RC rc = plugins_action->clabel_action(user, words[1], words[2], strtol(words[3], NULL, 10)); - return rc; + if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels; + rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); } + rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, + name, value, str2l(label_source)); + return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - UNUSED(words); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; debug(D_PLUGINSD, "requested to commit chart labels"); - PARSER_RC rc = PARSER_RC_OK; + if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { + error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", + rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(user); + } - if (plugins_action->clabel_commit_action) - rc = plugins_action->clabel_commit_action(user, host, ((PARSER_USER_OBJECT *)user)->new_chart_labels); + rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_chart_labels); - ((PARSER_USER_OBJECT *)user)->new_chart_labels = NULL; + rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); + rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - return rc; + ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL; + return PARSER_RC_OK; } -PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user) { - UNUSED(words); + char *id = get_word(words, num_words, 1); + char *start_time_str = get_word(words, num_words, 2); + char *end_time_str = get_word(words, num_words, 3); + char *child_now_str = get_word(words, num_words, 4); - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; - debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); - PARSER_RC rc = PARSER_RC_OK; + RRDSET *st; + if (likely(!id || !*id)) + st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); + else + st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if (plugins_action->overwrite_action) - rc = plugins_action->overwrite_action(user, host, ((PARSER_USER_OBJECT *)user)->new_host_labels); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + ((PARSER_USER_OBJECT *) user)->st = st; - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); - ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; + if(start_time_str && end_time_str) { + time_t start_time = (time_t)str2ul(start_time_str); + time_t end_time = (time_t)str2ul(end_time_str); - return rc; -} + time_t wall_clock_time = 0, tolerance; + bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child; + if(child_now_str) { + wall_clock_time = (time_t)str2ul(child_now_str); + tolerance = st->update_every + 1; + wall_clock_comes_from_child = true; + } -PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action) -{ - char *uuid_str = words[1]; - uuid_t uuid; + if(wall_clock_time <= 0) { + wall_clock_time = now_realtime_sec(); + tolerance = st->update_every + 5; + wall_clock_comes_from_child = false; + } - if (unlikely(!uuid_str)) { - error("requested a GUID, without a uuid."); - return PARSER_RC_ERROR; - } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a GUID, without a valid uuid string."); - return PARSER_RC_ERROR; - } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error( + (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)), + "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).", + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before); + + internal_error( + true, + "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time", + st->replay.after, st->replay.before); +#endif + + if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) { + if (unlikely(end_time - start_time != st->update_every)) + rrdset_set_update_every(st, end_time - start_time); + + st->last_collected_time.tv_sec = end_time; + st->last_collected_time.tv_usec = 0; + + st->last_updated.tv_sec = end_time; + st->last_updated.tv_usec = 0; + + st->counter++; + st->counter_done++; + + // these are only needed for db mode RAM, SAVE, MAP, ALLOC + st->current_entry++; + if(st->current_entry >= st->entries) + st->current_entry -= st->entries; + + ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; + ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + + return PARSER_RC_OK; + } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->guid_action) { - return plugins_action->guid_action(user, &uuid); + error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, + wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); } + // the child sends an RBEGIN without any parameters initially + // setting rset_enabled to false, means the RSET should not store any metrics + // to store metrics, the RBEGIN needs to have timestamps + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; return PARSER_RC_OK; } -PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) { - char *uuid_str = words[1]; - uuid_t uuid; + char *dimension = get_word(words, num_words, 1); + char *value_str = get_word(words, num_words, 2); + char *flags_str = get_word(words, num_words, 3); - if (unlikely(!uuid_str)) { - error("requested a CONTEXT, without a uuid."); - return PARSER_RC_ERROR; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) { + error_limit_static_thread_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors", + rrdhost_hostname(host), rrdset_id(st)); + + // we have to return OK here + return PARSER_RC_OK; } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a CONTEXT, without a valid uuid string."); - return PARSER_RC_ERROR; + + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + + if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) { + error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.", + rrdhost_hostname(host), + rrdset_id(st), + dimension, + ((PARSER_USER_OBJECT *) user)->replay.start_time, + ((PARSER_USER_OBJECT *) user)->replay.end_time); + return PLUGINSD_DISABLE_PLUGIN(user); + } + + if (unlikely(!value_str || !*value_str)) + value_str = "NAN"; + + if(unlikely(!flags_str)) + flags_str = ""; + + if (likely(value_str)) { + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + + RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED); + + if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) { + NETDATA_DOUBLE value = strtondd(value_str, NULL); + SN_FLAGS flags = SN_FLAG_NONE; + + char c; + while ((c = *flags_str++)) { + switch (c) { + case 'R': + flags |= SN_FLAG_RESET; + break; + + case 'E': + flags |= SN_EMPTY_SLOT; + value = NAN; + break; + + default: + error("unknown flag '%c'", c); + break; + } + } + + if (!netdata_double_isnumber(value)) { + value = NAN; + flags = SN_EMPTY_SLOT; + } + + rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags); + rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time; + rd->last_collected_time.tv_usec = 0; + rd->collections_counter++; + } + else { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.", + rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); + } } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->context_action) { - return plugins_action->context_action(user, &uuid); + rrddim_acquired_release(rda); + return PARSER_RC_OK; +} + +PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) +{ + char *dimension = get_word(words, num_words, 1); + char *last_collected_ut_str = get_word(words, num_words, 2); + char *last_collected_value_str = get_word(words, num_words, 3); + char *last_calculated_value_str = get_word(words, num_words, 4); + char *last_stored_value_str = get_word(words, num_words, 5); + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!rda) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDDIM *rd = rrddim_acquired_to_rrddim(rda); + usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; + usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + if(last_collected_ut > dim_last_collected_ut) { + rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; + rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; } + rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0; + rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0; + rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0; + rrddim_acquired_release(rda); return PARSER_RC_OK; } -PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) { - char *uuid_str = words[1]; - uuid_t uuid; + char *last_collected_ut_str = get_word(words, num_words, 1); + char *last_updated_ut_str = get_word(words, num_words, 2); - if (unlikely(!uuid_str)) { - error("requested a TOMBSTONE, without a uuid."); - return PARSER_RC_ERROR; - } - if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) { - error("requested a TOMBSTONE, without a valid uuid string."); - return PARSER_RC_ERROR; + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + + usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; + usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0; + if(last_collected_ut > chart_last_collected_ut) { + st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC; + st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC; } - debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str); - if (plugins_action->tombstone_action) { - return plugins_action->tombstone_action(user, &uuid); + usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec; + usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0; + if(last_updated_ut > chart_last_updated_ut) { + st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC; + st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC; } + st->counter++; + st->counter_done++; + return PARSER_RC_OK; } -PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action) +PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) { - char *machine_guid = words[1]; - char *hostname = words[2]; - char *registry_hostname = words[3]; - char *update_every_s = words[4]; - char *os = words[5]; - char *timezone = words[6]; - char *tags = words[7]; - - int update_every = 1; - if (likely(update_every_s && *update_every_s)) - update_every = str2i(update_every_s); - if (unlikely(!update_every)) - update_every = 1; + if (num_words < 7) { // accepts 7, but the 7th is optional + error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); + return PARSER_RC_ERROR; + } - debug(D_PLUGINSD, "HOST PARSED: guid=%s, hostname=%s, reg_host=%s, update=%d, os=%s, timezone=%s, tags=%s", - machine_guid, hostname, registry_hostname, update_every, os, timezone, tags); + const char *update_every_child_txt = get_word(words, num_words, 1); + const char *first_entry_child_txt = get_word(words, num_words, 2); + const char *last_entry_child_txt = get_word(words, num_words, 3); + const char *start_streaming_txt = get_word(words, num_words, 4); + const char *first_entry_requested_txt = get_word(words, num_words, 5); + const char *last_entry_requested_txt = get_word(words, num_words, 6); + const char *child_world_time_txt = get_word(words, num_words, 7); // optional - if (plugins_action->host_action) { - return plugins_action->host_action( - user, machine_guid, hostname, registry_hostname, update_every, os, timezone, tags); + time_t update_every_child = (time_t)str2ul(update_every_child_txt); + time_t first_entry_child = (time_t)str2ul(first_entry_child_txt); + time_t last_entry_child = (time_t)str2ul(last_entry_child_txt); + + bool start_streaming = (strcmp(start_streaming_txt, "true") == 0); + time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt); + time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt); + + // the optional child world time + time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec(); + + PARSER_USER_OBJECT *user_object = user; + + RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(user); + + RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(user); + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + internal_error(true, + "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu", + rrdhost_hostname(host), rrdset_id(st), + (unsigned long long)first_entry_child, (unsigned long long)last_entry_child, + start_streaming?"true":"false", + (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested, + (unsigned long long)child_world_time + ); +#endif + + ((PARSER_USER_OBJECT *) user)->st = NULL; + ((PARSER_USER_OBJECT *) user)->count++; + + if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + time_t now = now_realtime_sec(); + time_t started = st->rrdhost->receiver->replication_first_time_t; + time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started)); } - return PARSER_RC_OK; + ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; + ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; + ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + + st->counter++; + st->counter_done++; + +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + st->replay.start_streaming = false; + st->replay.after = 0; + st->replay.before = 0; + if(start_streaming) + st->replay.log_next_data_collection = true; +#endif + + if (start_streaming) { + if (st->update_every != update_every_child) + rrdset_set_update_every(st, update_every_child); + + if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) { + rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); + rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); + rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK); + rrdhost_receiver_replicating_charts_minus_one(st->rrdhost); + } +#ifdef NETDATA_LOG_REPLICATION_REQUESTS + else + internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.", + rrdhost_hostname(host), rrdset_id(st)); +#endif + worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0); + + return PARSER_RC_OK; + } + + rrdcontext_updated_retention_rrdset(st); + + bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, + first_entry_child, last_entry_child, child_world_time, + first_entry_requested, last_entry_requested); + return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } static void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; + rrd_collector_finished(); parser_destroy(parser); } // New plugins.d parser -inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations) +inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { int enabled = cd->enabled; - if (!fp || !enabled) { + if (!fp_plugin_input || !fp_plugin_output || !enabled) { cd->enabled = 0; return 0; } - if (unlikely(fileno(fp) == -1)) { - error("file descriptor given is not a valid stream"); + if (unlikely(fileno(fp_plugin_input) == -1)) { + error("input file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } - clearerr(fp); + + if (unlikely(fileno(fp_plugin_output) == -1)) { + error("output file descriptor given is not a valid stream"); + cd->serial_failures++; + return 0; + } + + clearerr(fp_plugin_input); + clearerr(fp_plugin_output); PARSER_USER_OBJECT user = { .enabled = cd->enabled, @@ -773,25 +1327,15 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int .trust_durations = trust_durations }; - PARSER *parser = parser_init(host, &user, fp, PARSER_INPUT_SPLIT); + // fp_plugin_output = our input; fp_plugin_input = our output + PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + + rrd_collector_started(); // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - parser->plugins_action->begin_action = &pluginsd_begin_action; - parser->plugins_action->flush_action = &pluginsd_flush_action; - parser->plugins_action->end_action = &pluginsd_end_action; - parser->plugins_action->disable_action = &pluginsd_disable_action; - parser->plugins_action->variable_action = &pluginsd_variable_action; - parser->plugins_action->dimension_action = &pluginsd_dimension_action; - parser->plugins_action->label_action = &pluginsd_label_action; - parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; - parser->plugins_action->chart_action = &pluginsd_chart_action; - parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; - user.parser = parser; while (likely(!parser_next(parser))) { -- cgit v1.2.3