summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c1476
1 files changed, 1010 insertions, 466 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 88e07fab7..5501c12fa 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -2,336 +2,223 @@
#include "pluginsd_parser.h"
-/*
- * This is the action defined for the FLUSH command
- */
-PARSER_RC pluginsd_set_action(void *user, RRDSET *st, RRDDIM *rd, long long int value)
-{
- UNUSED(user);
-
- rrddim_set_by_pointer(st, rd, value);
- return PARSER_RC_OK;
-}
+#define LOG_FUNCTIONS false
-PARSER_RC pluginsd_flush_action(void *user, RRDSET *st)
-{
- UNUSED(user);
- UNUSED(st);
- return PARSER_RC_OK;
-}
+static int send_to_plugin(const char *txt, void *data) {
+ PARSER *parser = data;
-PARSER_RC pluginsd_begin_action(void *user, RRDSET *st, usec_t microseconds, int trust_durations)
-{
- UNUSED(user);
- if (likely(st->counter_done)) {
- if (likely(microseconds)) {
- if (trust_durations)
- rrdset_next_usec_unfiltered(st, microseconds);
- else
- rrdset_next_usec(st, microseconds);
- } else
- rrdset_next(st);
- }
- return PARSER_RC_OK;
-}
-
-
-PARSER_RC pluginsd_end_action(void *user, RRDSET *st)
-{
- UNUSED(user);
-
- rrdset_done(st);
- return PARSER_RC_OK;
-}
+ if(!txt || !*txt)
+ return 0;
-PARSER_RC pluginsd_chart_action(void *user, char *type, char *id, char *name, char *family, char *context, char *title, char *units, char *plugin,
- char *module, int priority, int update_every, RRDSET_TYPE chart_type, char *options)
-{
- RRDSET *st = NULL;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+#ifdef ENABLE_HTTPS
+ struct netdata_ssl *ssl = parser->ssl_output;
+ if(ssl) {
+ if(ssl->conn && ssl->flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
+ return (int)netdata_ssl_write(ssl->conn, (void *)txt, strlen(txt));
- st = rrdset_create(
- host, type, id, name, family, context, title, units,
- plugin, module, priority, update_every,
- chart_type);
+ error("PLUGINSD: cannot send command (SSL)");
+ return -1;
+ }
+#endif
- if (options && *options) {
- if (strstr(options, "obsolete"))
- rrdset_is_obsolete(st);
- else
- rrdset_isnot_obsolete(st);
+ if(parser->fp_output) {
+ int bytes = fprintf(parser->fp_output, "%s", txt);
+ if(bytes <= 0) {
+ error("PLUGINSD: cannot send command (FILE)");
+ return -2;
+ }
+ fflush(parser->fp_output);
+ return bytes;
+ }
- if (strstr(options, "detail"))
- rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+ if(parser->fd != -1) {
+ size_t bytes = 0;
+ size_t total = strlen(txt);
+ ssize_t sent;
- if (strstr(options, "hidden"))
- rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
+ do {
+ sent = write(parser->fd, &txt[bytes], total - bytes);
+ if(sent <= 0) {
+ error("PLUGINSD: cannot send command (fd)");
+ return -3;
+ }
+ bytes += sent;
+ }
+ while(bytes < total);
- if (strstr(options, "store_first"))
- rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
- } else {
- rrdset_isnot_obsolete(st);
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ return (int)bytes;
}
- ((PARSER_USER_OBJECT *)user)->st = st;
- return PARSER_RC_OK;
+ error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ return -4;
}
+static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
+ RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
-PARSER_RC pluginsd_disable_action(void *user)
-{
- UNUSED(user);
+ if(unlikely(!host))
+ error("PLUGINSD: command %s requires a host, but is not set.", cmd);
- info("called DISABLE. Disabling it.");
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
+ return host;
}
+static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
-PARSER_RC pluginsd_variable_action(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value)
-{
- UNUSED(user);
+ if(unlikely(!st))
+ error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
- if (global) {
- RRDVAR *rv = rrdvar_custom_host_variable_create(host, name);
- if (rv)
- rrdvar_custom_host_variable_set(host, rv, value);
- else
- error("cannot find/create HOST VARIABLE '%s' on host '%s'", name, host->hostname);
- } else {
- RRDSETVAR *rs = rrdsetvar_custom_chart_variable_create(st, name);
- if (rs)
- rrdsetvar_custom_chart_variable_set(rs, value);
- else
- error("cannot find/create CHART VARIABLE '%s' on host '%s', chart '%s'", name, host->hostname, st->id);
- }
- return PARSER_RC_OK;
+ return st;
}
-
-
-PARSER_RC pluginsd_dimension_action(void *user, RRDSET *st, char *id, char *name, char *algorithm, long multiplier, long divisor, char *options,
- RRD_ALGORITHM algorithm_type)
-{
- UNUSED(user);
- UNUSED(algorithm);
-
- RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, algorithm_type);
- int unhide_dimension = 1;
-
- rrddim_flag_clear(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
- if (options && *options) {
- if (strstr(options, "obsolete") != NULL)
- rrddim_is_obsolete(st, rd);
- else
- rrddim_isnot_obsolete(st, rd);
-
- unhide_dimension = !strstr(options, "hidden");
-
- if (strstr(options, "noreset") != NULL)
- rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
- if (strstr(options, "nooverflow") != NULL)
- rrddim_flag_set(rd, RRDDIM_FLAG_DONT_DETECT_RESETS_OR_OVERFLOWS);
- } else
- rrddim_isnot_obsolete(st, rd);
-
- if (likely(unhide_dimension)) {
- rrddim_flag_clear(rd, RRDDIM_FLAG_HIDDEN);
- if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- (void)sql_set_dimension_option(&rd->metric_uuid, NULL);
- rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
- }
- } else {
- rrddim_flag_set(rd, RRDDIM_FLAG_HIDDEN);
- if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
- (void)sql_set_dimension_option(&rd->metric_uuid, "hidden");
- rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
- }
+static inline RRDDIM_ACQUIRED *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+ if (unlikely(!dimension || !*dimension)) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
}
- return PARSER_RC_OK;
-}
-
-PARSER_RC pluginsd_label_action(void *user, char *key, char *value, RRDLABEL_SRC source)
-{
-
- if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
- ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, key, value, source);
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
- return PARSER_RC_OK;
-}
+ if (unlikely(!rda))
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
-PARSER_RC pluginsd_clabel_action(void *user, char *key, char *value, RRDLABEL_SRC source)
-{
- if(unlikely(!((PARSER_USER_OBJECT *) user)->new_chart_labels))
- ((PARSER_USER_OBJECT *) user)->new_chart_labels = rrdlabels_create();
-
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_chart_labels, key, value, source);
-
- return PARSER_RC_OK;
+ return rda;
}
-PARSER_RC pluginsd_clabel_commit_action(void *user, RRDHOST *host, DICTIONARY *new_chart_labels)
-{
- RRDSET *st = ((PARSER_USER_OBJECT *)user)->st;
- if (unlikely(!st)) {
- error("requested CLABEL_COMMIT on host '%s', without a BEGIN, ignoring it.", host->hostname);
- return PARSER_RC_OK;
+static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
+ if (unlikely(!chart || !*chart)) {
+ error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
+ return NULL;
}
- rrdset_update_rrdlabels(st, new_chart_labels);
+ RRDSET *st = rrdset_find(host, chart);
+ if (unlikely(!st))
+ error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
- return PARSER_RC_OK;
+ return st;
}
-PARSER_RC pluginsd_overwrite_action(void *user, RRDHOST *host, DICTIONARY *new_host_labels)
-{
- UNUSED(user);
-
- if(!host->host_labels)
- host->host_labels = rrdlabels_create();
-
- rrdlabels_migrate_to_these(host->host_labels, new_host_labels);
- sql_store_host_labels(host);
-
- return PARSER_RC_OK;
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user) {
+ ((PARSER_USER_OBJECT *) user)->enabled = 0;
+ return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
{
- char *dimension = words[1];
- char *value = words[2];
+ char *dimension = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
- if (unlikely(!dimension || !*dimension)) {
- error("requested a SET on chart '%s' of host '%s', without a dimension. Disabling it.", st->id, host->hostname);
- goto disable;
- }
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- if (unlikely(!value || !*value))
- value = NULL;
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
- if (unlikely(!st)) {
- error(
- "requested a SET on dimension %s with value %s on host '%s', without a BEGIN. Disabling it.", dimension,
- value ? value : "<nothing>", host->hostname);
- goto disable;
- }
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "is setting dimension %s/%s to %s", st->id, dimension, value ? value : "<nothing>");
-
- if (value) {
- RRDDIM *rd = rrddim_find(st, dimension);
- if (unlikely(!rd)) {
- error(
- "requested a SET to dimension with id '%s' on stats '%s' (%s) on host '%s', which does not exist. Disabling it.",
- dimension, st->name, st->id, st->rrdhost->hostname);
- goto disable;
- } else {
- if (plugins_action->set_action) {
- return plugins_action->set_action(
- user, st, rd, strtoll(value, NULL, 0));
- }
- }
- }
- return PARSER_RC_OK;
+ debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+ rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
-disable:
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
+ if (value && *value)
+ rrddim_set_by_pointer(st, rd, strtoll(value, NULL, 0));
+
+ rrddim_acquired_release(rda);
+ return PARSER_RC_OK;
}
-PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
{
- char *id = words[1];
- char *microseconds_txt = words[2];
+ char *id = get_word(words, num_words, 1);
+ char *microseconds_txt = get_word(words, num_words, 2);
- RRDSET *st = NULL;
- RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
- if (unlikely(!id)) {
- error("requested a BEGIN without a chart id for host '%s'. Disabling it.", host->hostname);
- goto disable;
- }
+ RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- st = rrdset_find(host, id);
- if (unlikely(!st)) {
- error("requested a BEGIN on chart '%s', which does not exist on host '%s'. Disabling it.", id, host->hostname);
- goto disable;
- }
((PARSER_USER_OBJECT *)user)->st = st;
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt)
microseconds = str2ull(microseconds_txt);
- if (plugins_action->begin_action) {
- return plugins_action->begin_action(user, st, microseconds,
- ((PARSER_USER_OBJECT *)user)->trust_durations);
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if(st->replay.log_next_data_collection) {
+ st->replay.log_next_data_collection = false;
+
+ internal_error(true,
+ "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
+ st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
+ microseconds
+ );
+ }
+#endif
+
+ if (likely(st->counter_done)) {
+ if (likely(microseconds)) {
+ if (((PARSER_USER_OBJECT *)user)->trust_durations)
+ rrdset_next_usec_unfiltered(st, microseconds);
+ else
+ rrdset_next_usec(st, microseconds);
+ }
+ else
+ rrdset_next(st);
}
return PARSER_RC_OK;
-disable:
- ((PARSER_USER_OBJECT *)user)->enabled = 0;
- return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_end(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
{
UNUSED(words);
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+ UNUSED(num_words);
- if (unlikely(!st)) {
- error("requested an END, without a BEGIN on host '%s'. Disabling it.", host->hostname);
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
- return PARSER_RC_ERROR;
- }
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "requested an END on chart %s", st->id);
+ debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
((PARSER_USER_OBJECT *) user)->st = NULL;
((PARSER_USER_OBJECT *) user)->count++;
- if (plugins_action->end_action) {
- return plugins_action->end_action(user, st);
- }
+
+ struct timeval now;
+ now_realtime_timeval(&now);
+ rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
+
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
{
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
- if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
- debug(D_PLUGINSD, "Ignoring chart belonging to missing or ignored host.");
- return PARSER_RC_OK;
- }
-
- char *type = words[1];
- char *name = words[2];
- char *title = words[3];
- char *units = words[4];
- char *family = words[5];
- char *context = words[6];
- char *chart = words[7];
- char *priority_s = words[8];
- char *update_every_s = words[9];
- char *options = words[10];
- char *plugin = words[11];
- char *module = words[12];
-
- int have_action = ((plugins_action->chart_action) != NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ char *type = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *title = get_word(words, num_words, 3);
+ char *units = get_word(words, num_words, 4);
+ char *family = get_word(words, num_words, 5);
+ char *context = get_word(words, num_words, 6);
+ char *chart = get_word(words, num_words, 7);
+ char *priority_s = get_word(words, num_words, 8);
+ char *update_every_s = get_word(words, num_words, 9);
+ char *options = get_word(words, num_words, 10);
+ char *plugin = get_word(words, num_words, 11);
+ char *module = get_word(words, num_words, 12);
// parse the id from type
char *id = NULL;
@@ -342,10 +229,9 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act
// make sure we have the required variables
if (unlikely((!type || !*type || !id || !*id))) {
- if (likely(host))
- error("requested a CHART, without a type.id, on host '%s'. Disabling it.", host->hostname);
- else
- error("requested a CHART, without a type.id. Disabling it.");
+ error("PLUGINSD: 'host:%s' requested a CHART, without a type.id. Disabling it.",
+ rrdhost_hostname(host));
+
((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
@@ -396,42 +282,120 @@ PARSER_RC pluginsd_chart(char **words, void *user, PLUGINSD_ACTION *plugins_act
type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
priority, update_every);
- if (have_action) {
- return plugins_action->chart_action(
- user, type, id, name, family, context, title, units,
- (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, module, priority, update_every,
- chart_type, options);
+ RRDSET *st = NULL;
+
+ st = rrdset_create(
+ host, type, id, name, family, context, title, units,
+ (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
+ module, priority, update_every,
+ chart_type);
+
+ if (likely(st)) {
+ if (options && *options) {
+ if (strstr(options, "obsolete"))
+ rrdset_is_obsolete(st);
+ else
+ rrdset_isnot_obsolete(st);
+
+ if (strstr(options, "detail"))
+ rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+
+ if (strstr(options, "hidden"))
+ rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
+
+ if (strstr(options, "store_first"))
+ rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
+ else
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ } else {
+ rrdset_isnot_obsolete(st);
+ rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
+ rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
+ }
}
+ ((PARSER_USER_OBJECT *)user)->st = st;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
{
- char *id = words[1];
- char *name = words[2];
- char *algorithm = words[3];
- char *multiplier_s = words[4];
- char *divisor_s = words[5];
- char *options = words[6];
-
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
- if (unlikely(!host && !((PARSER_USER_OBJECT *) user)->host_exists)) {
- debug(D_PLUGINSD, "Ignoring dimension belonging to missing or ignored host.");
- return PARSER_RC_OK;
+ const char *first_entry_txt = get_word(words, num_words, 1);
+ const char *last_entry_txt = get_word(words, num_words, 2);
+ const char *world_time_txt = get_word(words, num_words, 3);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
+ time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
+ time_t child_world_time = (world_time_txt && *world_time_txt) ? (time_t)str2ul(world_time_txt) : now_realtime_sec();
+
+ if((first_entry_child != 0 || last_entry_child != 0) && (first_entry_child == 0 || last_entry_child == 0))
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_CHART_DEFINITION_END " with malformed timings (first time %ld, last time %ld, world time %ld).",
+ rrdhost_hostname(host), rrdset_id(st),
+ first_entry_child, last_entry_child, child_world_time);
+
+ bool ok = true;
+ if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+#endif
+
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
+
+ PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
+ ok = replicate_chart_request(send_to_plugin, parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ 0, 0);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else {
+ internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
}
+#endif
+
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
+}
+
+PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
+{
+ char *id = get_word(words, num_words, 1);
+ char *name = get_word(words, num_words, 2);
+ char *algorithm = get_word(words, num_words, 3);
+ char *multiplier_s = get_word(words, num_words, 4);
+ char *divisor_s = get_word(words, num_words, 5);
+ char *options = get_word(words, num_words, 6);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
if (unlikely(!id)) {
- error(
- "requested a DIMENSION, without an id, host '%s' and chart '%s'. Disabling it.", host->hostname,
- st ? st->id : "UNSET");
- goto disable;
+ error("PLUGINSD: 'host:%s/chart:%s' got a DIMENSION, without an id. Disabling it.",
+ rrdhost_hostname(host), st ? rrdset_id(st) : "UNSET");
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
if (unlikely(!st && !((PARSER_USER_OBJECT *) user)->st_exists)) {
- error("requested a DIMENSION, without a CHART, on host '%s'. Disabling it.", host->hostname);
- goto disable;
+ error("PLUGINSD: 'host:%s' got a DIMENSION, without a CHART. Disabling it.",
+ rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
long multiplier = 1;
@@ -455,316 +419,906 @@ PARSER_RC pluginsd_dimension(char **words, void *user, PLUGINSD_ACTION *plugins
debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
- st->id, id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
+ rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
options ? options : "");
- if (plugins_action->dimension_action) {
- return plugins_action->dimension_action(
- user, st, id, name, algorithm,
- multiplier, divisor, (options && *options)?options:NULL, rrd_algorithm_id(algorithm));
+ RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
+ int unhide_dimension = 1;
+
+ rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ if (options && *options) {
+ if (strstr(options, "obsolete") != NULL)
+ rrddim_is_obsolete(st, rd);
+ else
+ rrddim_isnot_obsolete(st, rd);
+
+ unhide_dimension = !strstr(options, "hidden");
+
+ if (strstr(options, "noreset") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ if (strstr(options, "nooverflow") != NULL)
+ rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ } else
+ rrddim_isnot_obsolete(st, rd);
+
+ if (likely(unhide_dimension)) {
+ rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
+ if (rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
+ rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
+ metaqueue_dimension_update_flags(rd);
+ }
+ }
+ else {
+ rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
+ if (!rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN)) {
+ rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
+ metaqueue_dimension_update_flags(rd);
+ }
+ }
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+// execution of functions
+
+struct inflight_function {
+ int code;
+ int timeout;
+ BUFFER *destination_wb;
+ STRING *function;
+ void (*callback)(BUFFER *wb, int code, void *callback_data);
+ void *callback_data;
+ usec_t timeout_ut;
+ usec_t started_ut;
+ usec_t sent_ut;
+};
+
+static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
+ struct inflight_function *pf = func;
+
+ PARSER *parser = parser_ptr;
+
+ // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
+ pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
+
+ char buffer[2048 + 1];
+ snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
+ dictionary_acquired_item_name(item),
+ pf->timeout,
+ string2str(pf->function));
+
+ // send the command to the plugin
+ int ret = send_to_plugin(buffer, parser);
+
+ pf->sent_ut = now_realtime_usec();
+
+ if(ret < 0) {
+ error("FUNCTION: failed to send function to plugin, error %d", ret);
+ rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
+ }
+ else {
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item), ret,
+ pf->sent_ut - pf->started_ut);
+ }
+}
+
+static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
+ struct inflight_function *pf = new_func;
+
+ error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
+ pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
+ pf->callback(pf->destination_wb, pf->code, pf->callback_data);
+ string_freez(pf->function);
+
+ return false;
+}
+
+static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
+ struct inflight_function *pf = func;
+
+ internal_error(LOG_FUNCTIONS,
+ "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
+ string2str(pf->function), dictionary_acquired_item_name(item),
+ buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
+
+ pf->callback(pf->destination_wb, pf->code, pf->callback_data);
+ string_freez(pf->function);
+}
+
+void inflight_functions_init(PARSER *parser) {
+ parser->inflight.functions = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
+ dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
+ dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
+}
+
+static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
+ parser->inflight.smaller_timeout = 0;
+ struct inflight_function *pf;
+ dfe_start_write(parser->inflight.functions, pf) {
+ if (pf->timeout_ut < now) {
+ internal_error(true,
+ "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
+ string2str(pf->function), pf_dfe.name, now - pf->started_ut);
+
+ if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
+ pf->code = rrd_call_function_error(pf->destination_wb,
+ "Timeout waiting for collector response.",
+ HTTP_RESP_GATEWAY_TIMEOUT);
+
+ dictionary_del(parser->inflight.functions, pf_dfe.name);
+ }
+
+ else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = pf->timeout_ut;
+ }
+ dfe_done(pf);
+}
+
+// this is the function that is called from
+// rrd_call_function_and_wait() and rrd_call_function_async()
+static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
+ PARSER *parser = collector_data;
+
+ usec_t now = now_realtime_usec();
+
+ struct inflight_function tmp = {
+ .started_ut = now,
+ .timeout_ut = now + timeout * USEC_PER_SEC,
+ .destination_wb = destination_wb,
+ .timeout = timeout,
+ .function = string_strdupz(function),
+ .callback = callback,
+ .callback_data = callback_data,
+ };
+
+ uuid_t uuid;
+ uuid_generate_time(uuid);
+
+ char key[UUID_STR_LEN];
+ uuid_unparse_lower(uuid, key);
+
+ dictionary_write_lock(parser->inflight.functions);
+
+ // if there is any error, our dictionary callbacks will call the caller callback to notify
+ // the caller about the error - no need for error handling here.
+ dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
+
+ if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
+ parser->inflight.smaller_timeout = tmp.timeout_ut;
+
+ // garbage collect stale inflight functions
+ if(parser->inflight.smaller_timeout < now)
+ inflight_functions_garbage_collect(parser, now);
+
+ dictionary_write_unlock(parser->inflight.functions);
+
+ return HTTP_RESP_OK;
+}
+
+PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
+{
+ bool global = false;
+ size_t i = 1;
+ if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
+ i++;
+ global = true;
+ }
+
+ char *name = get_word(words, num_words, i++);
+ char *timeout_s = get_word(words, num_words, i++);
+ char *help = get_word(words, num_words, i++);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
+ if(!host) return PARSER_RC_ERROR;
+
+ RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+ if(!st) global = true;
+
+ if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
+ error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+ rrdhost_hostname(host),
+ st?rrdset_id(st):"(unset)",
+ global?"yes":"no",
+ name?name:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ help?help:"(unset)"
+ );
+ return PARSER_RC_ERROR;
+ }
+
+ int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
+ if (timeout_s && *timeout_s) {
+ timeout = str2i(timeout_s);
+ if (unlikely(timeout <= 0))
+ timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
}
+ PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
+ rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
+
+ return PARSER_RC_OK;
+}
+
+static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
+ STRING *key = action_data;
+ if(key)
+ dictionary_del(parser->inflight.functions, string2str(key));
+ string_freez(key);
+}
+
+PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user)
+{
+ char *key = get_word(words, num_words, 1);
+ char *status = get_word(words, num_words, 2);
+ char *format = get_word(words, num_words, 3);
+ char *expires = get_word(words, num_words, 4);
+
+ if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
+ error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
+ , key ? key : "(unset)"
+ , status ? status : "(unset)"
+ , format ? format : "(unset)"
+ , expires ? expires : "(unset)"
+ );
+ }
+
+ int code = (status && *status) ? str2i(status) : 0;
+ if (code <= 0)
+ code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
+
+ time_t expiration = (expires && *expires) ? str2l(expires) : 0;
+
+ PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
+
+ struct inflight_function *pf = NULL;
+
+ if(key && *key)
+ pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
+
+ if(!pf) {
+ error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
+ }
+ else {
+ if(format && *format)
+ pf->destination_wb->contenttype = functions_format_to_content_type(format);
+
+ pf->code = code;
+
+ pf->destination_wb->expires = expiration;
+ if(expiration <= now_realtime_sec())
+ buffer_no_cacheable(pf->destination_wb);
+ else
+ buffer_cacheable(pf->destination_wb);
+ }
+
+ parser->defer.response = (pf) ? pf->destination_wb : NULL;
+ parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
+ parser->defer.action = pluginsd_function_result_end;
+ parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
+ parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
+
return PARSER_RC_OK;
-disable:
- ((PARSER_USER_OBJECT *)user)->enabled = 0;
- return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_variable(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+// ----------------------------------------------------------------------------
+
+PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
{
- char *name = words[1];
- char *value = words[2];
+ char *name = get_word(words, num_words, 1);
+ char *value = get_word(words, num_words, 2);
NETDATA_DOUBLE v;
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
int global = (st) ? 0 : 1;
if (name && *name) {
if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
global = 1;
- name = words[2];
- value = words[3];
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
} else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
global = 0;
- name = words[2];
- value = words[3];
+ name = get_word(words, num_words, 2);
+ value = get_word(words, num_words, 3);
}
}
if (unlikely(!name || !*name)) {
- error("requested a VARIABLE on host '%s', without a variable name. Disabling it.", host->hostname);
+ error("PLUGINSD: 'host:%s/chart:%s' got a VARIABLE without a variable name. Disabling it.",
+ rrdhost_hostname(host), st ? rrdset_id(st):"UNSET");
+
((PARSER_USER_OBJECT *)user)->enabled = 0;
- return PARSER_RC_ERROR;
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!value)) {
- error("cannot set %s VARIABLE '%s' on host '%s' to an empty value", (global) ? "HOST" : "CHART", name,
- host->hostname);
+ error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ (global) ? "HOST" : "CHART",
+ name);
return PARSER_RC_OK;
}
if (!global && !st) {
- error("cannot find/create CHART VARIABLE '%s' on host '%s' without a chart", name, host->hostname);
- return PARSER_RC_OK;
+ error("PLUGINSD: 'host:%s/chart:%s' cannot update CHART VARIABLE '%s' without a chart",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ name
+ );
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
char *endptr = NULL;
v = (NETDATA_DOUBLE)str2ndd(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
- error(
- "the value '%s' of VARIABLE '%s' on host '%s' cannot be parsed as a number", value, name,
- host->hostname);
+ error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name);
else
- error(
- "the value '%s' of VARIABLE '%s' on host '%s' has leftovers: '%s'", value, name, host->hostname,
- endptr);
+ error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name,
+ endptr);
}
- if (plugins_action->variable_action) {
- return plugins_action->variable_action(user, host, st, name, global, v);
+ if (global) {
+ const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
+ if (rva) {
+ rrdvar_custom_host_variable_set(host, rva, v);
+ rrdvar_custom_host_variable_release(host, rva);
+ }
+ else
+ error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+ rrdhost_hostname(host),
+ name);
+ } else {
+ const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
+ if (rsa) {
+ rrdsetvar_custom_chart_variable_set(st, rsa, v);
+ rrdsetvar_custom_chart_variable_release(st, rsa);
+ }
+ else
+ error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+ rrdhost_hostname(host), rrdset_id(st), name);
}
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_flush(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
- UNUSED(words);
debug(D_PLUGINSD, "requested a FLUSH");
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
((PARSER_USER_OBJECT *) user)->st = NULL;
- if (plugins_action->flush_action) {
- return plugins_action->flush_action(user, st);
- }
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_disable(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
{
- UNUSED(user);
- UNUSED(words);
-
- if (plugins_action->disable_action) {
- return plugins_action->disable_action(user);
- }
+ info("PLUGINSD: plugin called DISABLE. Disabling it.");
+ ((PARSER_USER_OBJECT *) user)->enabled = 0;
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_label(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
{
- char *store;
+ const char *name = get_word(words, num_words, 1);
+ const char *label_source = get_word(words, num_words, 2);
+ const char *value = get_word(words, num_words, 3);
- if (!words[1] || !words[2] || !words[3]) {
- error("Ignoring malformed or empty LABEL command.");
- return PARSER_RC_OK;
+ if (!name || !label_source || !value) {
+ error("PLUGINSD: ignoring malformed or empty LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
- if (!words[4])
- store = words[3];
- else {
- store = callocz(PLUGINSD_LINE_MAX + 1, sizeof(char));
+
+ char *store = (char *)value;
+ bool allocated_store = false;
+
+ if(unlikely(num_words > 4)) {
+ allocated_store = true;
+ store = mallocz(PLUGINSD_LINE_MAX + 1);
size_t remaining = PLUGINSD_LINE_MAX;
char *move = store;
- int i = 3;
- while (i < PLUGINSD_MAX_WORDS) {
- size_t length = strlen(words[i]);
- if ((length + 1) >= remaining)
- break;
-
- remaining -= (length + 1);
- memcpy(move, words[i], length);
- move += length;
- *move++ = ' ';
+ char *word;
+ for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
+ if(i > 3) {
+ *move++ = ' ';
+ *move = '\0';
+ remaining--;
+ }
+
+ size_t length = strlen(word);
+ if (length > remaining)
+ length = remaining;
- i++;
- if (!words[i])
- break;
+ remaining -= length;
+ memcpy(move, word, length);
+ move += length;
+ *move = '\0';
}
}
- if (plugins_action->label_action) {
- PARSER_RC rc = plugins_action->label_action(user, words[1], store, strtol(words[2], NULL, 10));
- if (store != words[3])
- freez(store);
- return rc;
- }
+ if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
+ ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
+
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
+ name,
+ store,
+ str2l(label_source));
- if (store != words[3])
+ if (allocated_store)
freez(store);
+
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
+{
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+
+ if(unlikely(!host->rrdlabels))
+ host->rrdlabels = rrdlabels_create();
+
+ rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
+ metaqueue_store_host_labels(host->machine_guid);
+
+ rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
+ ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_clabel(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+
+PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
{
- if (!words[1] || !words[2] || !words[3]) {
+ const char *name = get_word(words, num_words, 1);
+ const char *value = get_word(words, num_words, 2);
+ const char *label_source = get_word(words, num_words, 3);
+
+ if (!name || !value || !*label_source) {
error("Ignoring malformed or empty CHART LABEL command.");
- return PARSER_RC_OK;
+ return PLUGINSD_DISABLE_PLUGIN(user);
}
- if (plugins_action->clabel_action) {
- PARSER_RC rc = plugins_action->clabel_action(user, words[1], words[2], strtol(words[3], NULL, 10));
- return rc;
+ if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = ((PARSER_USER_OBJECT *)user)->st->rrdlabels;
+ rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
}
+ rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
+ name, value, str2l(label_source));
+
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_clabel_commit(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
{
- UNUSED(words);
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
debug(D_PLUGINSD, "requested to commit chart labels");
- PARSER_RC rc = PARSER_RC_OK;
+ if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
+ error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
+ rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
- if (plugins_action->clabel_commit_action)
- rc = plugins_action->clabel_commit_action(user, host, ((PARSER_USER_OBJECT *)user)->new_chart_labels);
+ rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
- rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_chart_labels);
- ((PARSER_USER_OBJECT *)user)->new_chart_labels = NULL;
+ rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
+ rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- return rc;
+ ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
+ return PARSER_RC_OK;
}
-PARSER_RC pluginsd_overwrite(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_replay_rrdset_begin(char **words, size_t num_words, void *user)
{
- UNUSED(words);
+ char *id = get_word(words, num_words, 1);
+ char *start_time_str = get_word(words, num_words, 2);
+ char *end_time_str = get_word(words, num_words, 3);
+ char *child_now_str = get_word(words, num_words, 4);
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
- debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
- PARSER_RC rc = PARSER_RC_OK;
+ RRDSET *st;
+ if (likely(!id || !*id))
+ st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ else
+ st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if (plugins_action->overwrite_action)
- rc = plugins_action->overwrite_action(user, host, ((PARSER_USER_OBJECT *)user)->new_host_labels);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+ ((PARSER_USER_OBJECT *) user)->st = st;
- rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
- ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
+ if(start_time_str && end_time_str) {
+ time_t start_time = (time_t)str2ul(start_time_str);
+ time_t end_time = (time_t)str2ul(end_time_str);
- return rc;
-}
+ time_t wall_clock_time = 0, tolerance;
+ bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
+ if(child_now_str) {
+ wall_clock_time = (time_t)str2ul(child_now_str);
+ tolerance = st->update_every + 1;
+ wall_clock_comes_from_child = true;
+ }
-PARSER_RC pluginsd_guid(char **words, void *user, PLUGINSD_ACTION *plugins_action)
-{
- char *uuid_str = words[1];
- uuid_t uuid;
+ if(wall_clock_time <= 0) {
+ wall_clock_time = now_realtime_sec();
+ tolerance = st->update_every + 5;
+ wall_clock_comes_from_child = false;
+ }
- if (unlikely(!uuid_str)) {
- error("requested a GUID, without a uuid.");
- return PARSER_RC_ERROR;
- }
- if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
- error("requested a GUID, without a valid uuid string.");
- return PARSER_RC_ERROR;
- }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(
+ (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
+ "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
+
+ internal_error(
+ true,
+ "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
+ st->replay.after, st->replay.before);
+#endif
+
+ if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
+ if (unlikely(end_time - start_time != st->update_every))
+ rrdset_set_update_every(st, end_time - start_time);
+
+ st->last_collected_time.tv_sec = end_time;
+ st->last_collected_time.tv_usec = 0;
+
+ st->last_updated.tv_sec = end_time;
+ st->last_updated.tv_usec = 0;
+
+ st->counter++;
+ st->counter_done++;
+
+ // these are only needed for db mode RAM, SAVE, MAP, ALLOC
+ st->current_entry++;
+ if(st->current_entry >= st->entries)
+ st->current_entry -= st->entries;
+
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+
+ return PARSER_RC_OK;
+ }
- debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
- if (plugins_action->guid_action) {
- return plugins_action->guid_action(user, &uuid);
+ error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, but timestamps are invalid (now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
}
+ // the child sends an RBEGIN without any parameters initially
+ // setting rset_enabled to false, means the RSET should not store any metrics
+ // to store metrics, the RBEGIN needs to have timestamps
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_context(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
{
- char *uuid_str = words[1];
- uuid_t uuid;
+ char *dimension = get_word(words, num_words, 1);
+ char *value_str = get_word(words, num_words, 2);
+ char *flags_str = get_word(words, num_words, 3);
- if (unlikely(!uuid_str)) {
- error("requested a CONTEXT, without a uuid.");
- return PARSER_RC_ERROR;
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if(!((PARSER_USER_OBJECT *) user)->replay.rset_enabled) {
+ error_limit_static_thread_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " but it is disabled by " PLUGINSD_KEYWORD_REPLAY_BEGIN " errors",
+ rrdhost_hostname(host), rrdset_id(st));
+
+ // we have to return OK here
+ return PARSER_RC_OK;
}
- if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
- error("requested a CONTEXT, without a valid uuid string.");
- return PARSER_RC_ERROR;
+
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ if (unlikely(!((PARSER_USER_OBJECT *) user)->replay.start_time || !((PARSER_USER_OBJECT *) user)->replay.end_time)) {
+ error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a " PLUGINSD_KEYWORD_REPLAY_SET " with invalid timestamps %ld to %ld from a " PLUGINSD_KEYWORD_REPLAY_BEGIN ". Disabling it.",
+ rrdhost_hostname(host),
+ rrdset_id(st),
+ dimension,
+ ((PARSER_USER_OBJECT *) user)->replay.start_time,
+ ((PARSER_USER_OBJECT *) user)->replay.end_time);
+ return PLUGINSD_DISABLE_PLUGIN(user);
+ }
+
+ if (unlikely(!value_str || !*value_str))
+ value_str = "NAN";
+
+ if(unlikely(!flags_str))
+ flags_str = "";
+
+ if (likely(value_str)) {
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+
+ RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
+
+ if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
+ NETDATA_DOUBLE value = strtondd(value_str, NULL);
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags |= SN_EMPTY_SLOT;
+ value = NAN;
+ break;
+
+ default:
+ error("unknown flag '%c'", c);
+ break;
+ }
+ }
+
+ if (!netdata_double_isnumber(value)) {
+ value = NAN;
+ flags = SN_EMPTY_SLOT;
+ }
+
+ rrddim_store_metric(rd, ((PARSER_USER_OBJECT *) user)->replay.end_time_ut, value, flags);
+ rd->last_collected_time.tv_sec = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ rd->last_collected_time.tv_usec = 0;
+ rd->collections_counter++;
+ }
+ else {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
+ }
}
- debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
- if (plugins_action->context_action) {
- return plugins_action->context_action(user, &uuid);
+ rrddim_acquired_release(rda);
+ return PARSER_RC_OK;
+}
+
+PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
+{
+ char *dimension = get_word(words, num_words, 1);
+ char *last_collected_ut_str = get_word(words, num_words, 2);
+ char *last_collected_value_str = get_word(words, num_words, 3);
+ char *last_calculated_value_str = get_word(words, num_words, 4);
+ char *last_stored_value_str = get_word(words, num_words, 5);
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM_ACQUIRED *rda = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!rda) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
+ usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ if(last_collected_ut > dim_last_collected_ut) {
+ rd->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+ rd->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
}
+ rd->last_collected_value = last_collected_value_str ? str2ll(last_collected_value_str, NULL) : 0;
+ rd->last_calculated_value = last_calculated_value_str ? str2ndd(last_calculated_value_str, NULL) : 0;
+ rd->last_stored_value = last_stored_value_str ? str2ndd(last_stored_value_str, NULL) : 0.0;
+ rrddim_acquired_release(rda);
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_tombstone(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
{
- char *uuid_str = words[1];
- uuid_t uuid;
+ char *last_collected_ut_str = get_word(words, num_words, 1);
+ char *last_updated_ut_str = get_word(words, num_words, 2);
- if (unlikely(!uuid_str)) {
- error("requested a TOMBSTONE, without a uuid.");
- return PARSER_RC_ERROR;
- }
- if (unlikely(strlen(uuid_str) != GUID_LEN || uuid_parse(uuid_str, uuid) == -1)) {
- error("requested a TOMBSTONE, without a valid uuid string.");
- return PARSER_RC_ERROR;
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
+ usec_t last_collected_ut = last_collected_ut_str ? str2ull(last_collected_ut_str) : 0;
+ if(last_collected_ut > chart_last_collected_ut) {
+ st->last_collected_time.tv_sec = last_collected_ut / USEC_PER_SEC;
+ st->last_collected_time.tv_usec = last_collected_ut % USEC_PER_SEC;
}
- debug(D_PLUGINSD, "Parsed uuid=%s", uuid_str);
- if (plugins_action->tombstone_action) {
- return plugins_action->tombstone_action(user, &uuid);
+ usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
+ usec_t last_updated_ut = last_updated_ut_str ? str2ull(last_updated_ut_str) : 0;
+ if(last_updated_ut > chart_last_updated_ut) {
+ st->last_updated.tv_sec = last_updated_ut / USEC_PER_SEC;
+ st->last_updated.tv_usec = last_updated_ut % USEC_PER_SEC;
}
+ st->counter++;
+ st->counter_done++;
+
return PARSER_RC_OK;
}
-PARSER_RC metalog_pluginsd_host(char **words, void *user, PLUGINSD_ACTION *plugins_action)
+PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
{
- char *machine_guid = words[1];
- char *hostname = words[2];
- char *registry_hostname = words[3];
- char *update_every_s = words[4];
- char *os = words[5];
- char *timezone = words[6];
- char *tags = words[7];
-
- int update_every = 1;
- if (likely(update_every_s && *update_every_s))
- update_every = str2i(update_every_s);
- if (unlikely(!update_every))
- update_every = 1;
+ if (num_words < 7) { // accepts 7, but the 7th is optional
+ error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+ return PARSER_RC_ERROR;
+ }
- debug(D_PLUGINSD, "HOST PARSED: guid=%s, hostname=%s, reg_host=%s, update=%d, os=%s, timezone=%s, tags=%s",
- machine_guid, hostname, registry_hostname, update_every, os, timezone, tags);
+ const char *update_every_child_txt = get_word(words, num_words, 1);
+ const char *first_entry_child_txt = get_word(words, num_words, 2);
+ const char *last_entry_child_txt = get_word(words, num_words, 3);
+ const char *start_streaming_txt = get_word(words, num_words, 4);
+ const char *first_entry_requested_txt = get_word(words, num_words, 5);
+ const char *last_entry_requested_txt = get_word(words, num_words, 6);
+ const char *child_world_time_txt = get_word(words, num_words, 7); // optional
- if (plugins_action->host_action) {
- return plugins_action->host_action(
- user, machine_guid, hostname, registry_hostname, update_every, os, timezone, tags);
+ time_t update_every_child = (time_t)str2ul(update_every_child_txt);
+ time_t first_entry_child = (time_t)str2ul(first_entry_child_txt);
+ time_t last_entry_child = (time_t)str2ul(last_entry_child_txt);
+
+ bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
+ time_t first_entry_requested = (time_t)str2ul(first_entry_requested_txt);
+ time_t last_entry_requested = (time_t)str2ul(last_entry_requested_txt);
+
+ // the optional child world time
+ time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t)str2ul(child_world_time_txt) : now_realtime_sec();
+
+ PARSER_USER_OBJECT *user_object = user;
+
+ RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(user);
+
+ RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(user);
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ internal_error(true,
+ "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
+ rrdhost_hostname(host), rrdset_id(st),
+ (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
+ start_streaming?"true":"false",
+ (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
+ (unsigned long long)child_world_time
+ );
+#endif
+
+ ((PARSER_USER_OBJECT *) user)->st = NULL;
+ ((PARSER_USER_OBJECT *) user)->count++;
+
+ if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ time_t now = now_realtime_sec();
+ time_t started = st->rrdhost->receiver->replication_first_time_t;
+ time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ (NETDATA_DOUBLE)(current - started) * 100.0 / (NETDATA_DOUBLE)(now - started));
}
- return PARSER_RC_OK;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
+ ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+
+ st->counter++;
+ st->counter_done++;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ st->replay.start_streaming = false;
+ st->replay.after = 0;
+ st->replay.before = 0;
+ if(start_streaming)
+ st->replay.log_next_data_collection = true;
+#endif
+
+ if (start_streaming) {
+ if (st->update_every != update_every_child)
+ rrdset_set_update_every(st, update_every_child);
+
+ if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
+ rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
+ }
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ else
+ internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
+ rrdhost_hostname(host), rrdset_id(st));
+#endif
+ worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, 100.0);
+
+ return PARSER_RC_OK;
+ }
+
+ rrdcontext_updated_retention_rrdset(st);
+
+ bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+ first_entry_child, last_entry_child, child_world_time,
+ first_entry_requested, last_entry_requested);
+ return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
static void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
+ rrd_collector_finished();
parser_destroy(parser);
}
// New plugins.d parser
-inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations)
+inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
int enabled = cd->enabled;
- if (!fp || !enabled) {
+ if (!fp_plugin_input || !fp_plugin_output || !enabled) {
cd->enabled = 0;
return 0;
}
- if (unlikely(fileno(fp) == -1)) {
- error("file descriptor given is not a valid stream");
+ if (unlikely(fileno(fp_plugin_input) == -1)) {
+ error("input file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
- clearerr(fp);
+
+ if (unlikely(fileno(fp_plugin_output) == -1)) {
+ error("output file descriptor given is not a valid stream");
+ cd->serial_failures++;
+ return 0;
+ }
+
+ clearerr(fp_plugin_input);
+ clearerr(fp_plugin_output);
PARSER_USER_OBJECT user = {
.enabled = cd->enabled,
@@ -773,25 +1327,15 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int
.trust_durations = trust_durations
};
- PARSER *parser = parser_init(host, &user, fp, PARSER_INPUT_SPLIT);
+ // fp_plugin_output = our input; fp_plugin_input = our output
+ PARSER *parser = parser_init(host, &user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+
+ rrd_collector_started();
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- parser->plugins_action->begin_action = &pluginsd_begin_action;
- parser->plugins_action->flush_action = &pluginsd_flush_action;
- parser->plugins_action->end_action = &pluginsd_end_action;
- parser->plugins_action->disable_action = &pluginsd_disable_action;
- parser->plugins_action->variable_action = &pluginsd_variable_action;
- parser->plugins_action->dimension_action = &pluginsd_dimension_action;
- parser->plugins_action->label_action = &pluginsd_label_action;
- parser->plugins_action->overwrite_action = &pluginsd_overwrite_action;
- parser->plugins_action->chart_action = &pluginsd_chart_action;
- parser->plugins_action->set_action = &pluginsd_set_action;
- parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action;
- parser->plugins_action->clabel_action = &pluginsd_clabel_action;
-
user.parser = parser;
while (likely(!parser_next(parser))) {