summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c1220
1 files changed, 674 insertions, 546 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 097e5ea60..cda17710c 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,90 +4,103 @@
#define LOG_FUNCTIONS false
-static int send_to_plugin(const char *txt, void *data) {
+static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
if(!txt || !*txt)
return 0;
+ errno = 0;
+ spinlock_lock(&parser->writer.spinlock);
+ ssize_t bytes = -1;
+
#ifdef ENABLE_HTTPS
NETDATA_SSL *ssl = parser->ssl_output;
if(ssl) {
+
if(SSL_connection(ssl))
- return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt));
+ bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
- error("PLUGINSD: cannot send command (SSL)");
- return -1;
+ else
+ netdata_log_error("PLUGINSD: cannot send command (SSL)");
+
+ spinlock_unlock(&parser->writer.spinlock);
+ return bytes;
}
#endif
if(parser->fp_output) {
- int bytes = fprintf(parser->fp_output, "%s", txt);
+
+ bytes = fprintf(parser->fp_output, "%s", txt);
if(bytes <= 0) {
- error("PLUGINSD: cannot send command (FILE)");
- return -2;
+ netdata_log_error("PLUGINSD: cannot send command (FILE)");
+ bytes = -2;
}
- fflush(parser->fp_output);
+ else
+ fflush(parser->fp_output);
+
+ spinlock_unlock(&parser->writer.spinlock);
return bytes;
}
if(parser->fd != -1) {
- size_t bytes = 0;
- size_t total = strlen(txt);
+ bytes = 0;
+ ssize_t total = (ssize_t)strlen(txt);
ssize_t sent;
do {
sent = write(parser->fd, &txt[bytes], total - bytes);
if(sent <= 0) {
- error("PLUGINSD: cannot send command (fd)");
+ netdata_log_error("PLUGINSD: cannot send command (fd)");
+ spinlock_unlock(&parser->writer.spinlock);
return -3;
}
bytes += sent;
}
while(bytes < total);
+ spinlock_unlock(&parser->writer.spinlock);
return (int)bytes;
}
- error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
+ spinlock_unlock(&parser->writer.spinlock);
+ netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
return -4;
}
-static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
- RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
+static inline RRDHOST *pluginsd_require_host_from_parent(PARSER *parser, const char *cmd) {
+ RRDHOST *host = parser->user.host;
if(unlikely(!host))
- error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+ netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
return host;
}
-static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
- RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
+static inline RRDSET *pluginsd_require_chart_from_parent(PARSER *parser, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = parser->user.st;
if(unlikely(!st))
- error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+ netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
return st;
}
-static inline RRDSET *pluginsd_get_chart_from_parent(void *user) {
- return ((PARSER_USER_OBJECT *) user)->st;
+static inline RRDSET *pluginsd_get_chart_from_parent(PARSER *parser) {
+ return parser->user.st;
}
-static inline void pluginsd_lock_rrdset_data_collection(void *user) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- if(u->st && !u->v2.locked_data_collection) {
- netdata_spinlock_lock(&u->st->data_collection_lock);
- u->v2.locked_data_collection = true;
+static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && !parser->user.v2.locked_data_collection) {
+ spinlock_lock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = true;
}
}
-static inline bool pluginsd_unlock_rrdset_data_collection(void *user) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- if(u->st && u->v2.locked_data_collection) {
- netdata_spinlock_unlock(&u->st->data_collection_lock);
- u->v2.locked_data_collection = false;
+static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && parser->user.v2.locked_data_collection) {
+ spinlock_unlock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = false;
return true;
}
@@ -108,29 +121,29 @@ void pluginsd_rrdset_cleanup(RRDSET *st) {
st->pluginsd.pos = 0;
}
-static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
-
- if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
+static inline void pluginsd_unlock_previous_chart(PARSER *parser, const char *keyword, bool stale) {
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
if(stale)
- error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
- rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
}
- if(unlikely(u->v2.ml_locked)) {
- ml_chart_update_end(u->st);
- u->v2.ml_locked = false;
+ if(unlikely(parser->user.v2.ml_locked)) {
+ ml_chart_update_end(parser->user.st);
+ parser->user.v2.ml_locked = false;
if(stale)
- error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
- rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
}
}
-static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
-
- pluginsd_unlock_previous_chart(user, keyword, true);
+static inline void pluginsd_set_chart_from_parent(PARSER *parser, RRDSET *st, const char *keyword) {
+ pluginsd_unlock_previous_chart(parser, keyword, true);
if(st) {
size_t dims = dictionary_entries(st->rrddim_root_index);
@@ -145,13 +158,13 @@ static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const
st->pluginsd.pos = 0;
}
- u->st = st;
+ parser->user.st = st;
}
static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
- error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
- rrdhost_hostname(host), rrdset_id(st), cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
@@ -172,8 +185,8 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons
rda = rrddim_find_and_acquire(st, dimension);
if (unlikely(!rda)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
- rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
return NULL;
}
@@ -186,21 +199,21 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
if (unlikely(!chart || !*chart)) {
- error("PLUGINSD: 'host:%s' got a %s without a chart id.",
- rrdhost_hostname(host), cmd);
+ netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
return NULL;
}
RRDSET *st = rrdset_find(host, chart);
if (unlikely(!st))
- error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
- rrdhost_hostname(host), chart, cmd);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
return st;
}
-static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) {
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
+static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
+ parser->user.enabled = 0;
if(keyword && msg) {
error_limit_static_global_var(erl, 1, 0);
@@ -210,22 +223,21 @@ static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword,
return PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
char *dimension = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
+ netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
if (value && *value)
@@ -234,18 +246,17 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *microseconds_txt = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN);
usec_t microseconds = 0;
if (microseconds_txt && *microseconds_txt) {
@@ -270,7 +281,7 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
if (likely(st->counter_done)) {
if (likely(microseconds)) {
- if (((PARSER_USER_OBJECT *)user)->trust_durations)
+ if (parser->user.trust_durations)
rrdset_next_usec_unfiltered(st, microseconds);
else
rrdset_next_usec(st, microseconds);
@@ -281,22 +292,21 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
UNUSED(words);
UNUSED(num_words);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
+ netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END);
- ((PARSER_USER_OBJECT *) user)->data_collections_count++;
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_END);
+ parser->user.data_collections_count++;
struct timeval now;
now_realtime_timeval(&now);
@@ -305,15 +315,13 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-static void pluginsd_host_define_cleanup(void *user) {
- PARSER_USER_OBJECT *u = user;
+static void pluginsd_host_define_cleanup(PARSER *parser) {
+ string_freez(parser->user.host_define.hostname);
+ dictionary_destroy(parser->user.host_define.rrdlabels);
- string_freez(u->host_define.hostname);
- dictionary_destroy(u->host_define.rrdlabels);
-
- u->host_define.hostname = NULL;
- u->host_define.rrdlabels = NULL;
- u->host_define.parsing_host = false;
+ parser->user.host_define.hostname = NULL;
+ parser->user.host_define.rrdlabels = NULL;
+ parser->user.host_define.parsing_host = false;
}
static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
@@ -325,61 +333,56 @@ static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid
return true;
}
-static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
char *guid = get_word(words, num_words, 1);
char *hostname = get_word(words, num_words, 2);
if(unlikely(!guid || !*guid || !hostname || !*hostname))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
- if(unlikely(u->host_define.parsing_host))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE,
+ if(unlikely(parser->user.host_define.parsing_host))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
"another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
- if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
+ if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
- u->host_define.hostname = string_strdupz(hostname);
- u->host_define.rrdlabels = rrdlabels_create();
- u->host_define.parsing_host = true;
+ parser->user.host_define.hostname = string_strdupz(hostname);
+ parser->user.host_define.rrdlabels = rrdlabels_create();
+ parser->user.host_define.parsing_host = true;
return PARSER_RC_OK;
}
-static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) {
char *name = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
if(!name || !*name || !value)
- return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
- if(!u->host_define.parsing_host || !dict)
- return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+ if(!parser->user.host_define.parsing_host || !dict)
+ return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
return PARSER_RC_OK;
}
-static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
- return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL);
+static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
+ return pluginsd_host_dictionary(words, num_words, parser,
+ parser->user.host_define.rrdlabels,
+ PLUGINSD_KEYWORD_HOST_LABEL);
}
-static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
- PARSER_USER_OBJECT *u = user;
-
- if(!u->host_define.parsing_host)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
+static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ if(!parser->user.host_define.parsing_host)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
RRDHOST *host = rrdhost_find_or_create(
- string2str(u->host_define.hostname),
- string2str(u->host_define.hostname),
- u->host_define.machine_guid_str,
+ string2str(parser->user.host_define.hostname),
+ string2str(parser->user.host_define.hostname),
+ parser->user.host_define.machine_guid_str,
"Netdata Virtual Host 1.0",
netdata_configured_timezone,
netdata_configured_abbrev_timezone,
@@ -398,22 +401,24 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu
default_rrdpush_enable_replication,
default_rrdpush_seconds_to_replicate,
default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(u->host_define.rrdlabels),
+ rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
false
);
+ rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
+
if(host->rrdlabels) {
- rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
}
else {
- host->rrdlabels = u->host_define.rrdlabels;
- u->host_define.rrdlabels = NULL;
+ host->rrdlabels = parser->user.host_define.rrdlabels;
+ parser->user.host_define.rrdlabels = NULL;
}
- pluginsd_host_define_cleanup(user);
+ pluginsd_host_define_cleanup(parser);
- u->host = host;
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
+ parser->user.host = host;
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
rrdcontext_host_child_connected(host);
@@ -422,34 +427,31 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu
return PARSER_RC_OK;
}
-static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) {
- PARSER_USER_OBJECT *u = user;
-
+static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
char *guid = get_word(words, num_words, 1);
if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
- u->host = localhost;
+ parser->user.host = localhost;
return PARSER_RC_OK;
}
uuid_t uuid;
char uuid_str[UUID_STR_LEN];
if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
RRDHOST *host = rrdhost_find_by_guid(uuid_str);
if(unlikely(!host))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
- u->host = host;
+ parser->user.host = host;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
char *type = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
@@ -473,7 +475,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
// make sure we have the required variables
if (unlikely((!type || !*type || !id || !*id)))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
// parse the name, and make sure it does not include 'type.'
if (unlikely(name && *name)) {
@@ -494,11 +496,11 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
if (likely(priority_s && *priority_s))
priority = str2i(priority_s);
- int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+ int update_every = parser->user.cd->update_every;
if (likely(update_every_s && *update_every_s))
update_every = str2i(update_every_s);
if (unlikely(!update_every))
- update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
+ update_every = parser->user.cd->update_every;
RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
if (unlikely(chart))
@@ -515,7 +517,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
if (unlikely(!units))
units = "unknown";
- debug(
+ netdata_log_debug(
D_PLUGINSD,
"creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
@@ -525,14 +527,16 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
st = rrdset_create(
host, type, id, name, family, context, title, units,
- (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
+ (plugin && *plugin) ? plugin : parser->user.cd->filename,
module, priority, update_every,
chart_type);
if (likely(st)) {
if (options && *options) {
- if (strstr(options, "obsolete"))
+ if (strstr(options, "obsolete")) {
+ pluginsd_rrdset_cleanup(st);
rrdset_is_obsolete(st);
+ }
else
rrdset_isnot_obsolete(st);
@@ -556,22 +560,21 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
}
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_CHART);
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
const char *first_entry_txt = get_word(words, num_words, 1);
const char *last_entry_txt = get_word(words, num_words, 2);
const char *wall_clock_time_txt = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
@@ -590,7 +593,6 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
- PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
ok = replicate_chart_request(send_to_plugin, parser, host, st,
first_entry_child, last_entry_child, child_wall_clock_time,
0, 0);
@@ -605,8 +607,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *name = get_word(words, num_words, 2);
char *algorithm = get_word(words, num_words, 3);
@@ -614,14 +615,14 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
char *divisor_s = get_word(words, num_words, 5);
char *options = get_word(words, num_words, 6);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if (unlikely(!id))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
long multiplier = 1;
if (multiplier_s && *multiplier_s) {
@@ -641,7 +642,7 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
algorithm = "absolute";
if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- debug(
+ netdata_log_debug(
D_PLUGINSD,
"creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
@@ -720,7 +721,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
pf->sent_ut = now_realtime_usec();
if(ret < 0) {
- error("FUNCTION: failed to send function to plugin, error %d", ret);
+ netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret);
rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
}
else {
@@ -734,7 +735,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
struct inflight_function *pf = new_func;
- error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
+ netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
pf->callback(pf->destination_wb, pf->code, pf->callback_data);
string_freez(pf->function);
@@ -825,8 +826,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou
return HTTP_RESP_OK;
}
-PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
bool global = false;
size_t i = 1;
if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
@@ -838,21 +838,21 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
char *timeout_s = get_word(words, num_words, i++);
char *help = get_word(words, num_words, i++);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION);
if(!host) return PARSER_RC_ERROR;
- RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
+ RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
if(!st) global = true;
if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
- error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
- rrdhost_hostname(host),
- st?rrdset_id(st):"(unset)",
- global?"yes":"no",
- name?name:"(unset)",
- timeout_s?timeout_s:"(unset)",
- help?help:"(unset)"
- );
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
+ rrdhost_hostname(host),
+ st?rrdset_id(st):"(unset)",
+ global?"yes":"no",
+ name?name:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ help?help:"(unset)"
+ );
return PARSER_RC_ERROR;
}
@@ -863,7 +863,6 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
}
- PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
return PARSER_RC_OK;
@@ -876,15 +875,14 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat
string_freez(key);
}
-PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
char *key = get_word(words, num_words, 1);
char *status = get_word(words, num_words, 2);
char *format = get_word(words, num_words, 3);
char *expires = get_word(words, num_words, 4);
if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
- error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
, key ? key : "(unset)"
, status ? status : "(unset)"
, format ? format : "(unset)"
@@ -898,15 +896,13 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
time_t expiration = (expires && *expires) ? str2l(expires) : 0;
- PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
-
struct inflight_function *pf = NULL;
if(key && *key)
pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
if(!pf) {
- error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
+ netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
}
else {
if(format && *format)
@@ -932,16 +928,15 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u
// ----------------------------------------------------------------------------
-PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
char *name = get_word(words, num_words, 1);
char *value = get_word(words, num_words, 2);
NETDATA_DOUBLE v;
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_VARIABLE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_get_chart_from_parent(user);
+ RRDSET *st = pluginsd_get_chart_from_parent(parser);
int global = (st) ? 0 : 1;
@@ -958,39 +953,39 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
}
if (unlikely(!name || !*name))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
if (unlikely(!value || !*value))
value = NULL;
if (unlikely(!value)) {
- error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- (global) ? "HOST" : "CHART",
- name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ (global) ? "HOST" : "CHART",
+ name);
return PARSER_RC_OK;
}
if (!global && !st)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
char *endptr = NULL;
v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
if (unlikely(endptr && *endptr)) {
if (endptr == value)
- error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name);
else
- error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name,
- endptr);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
+ rrdhost_hostname(host),
+ st ? rrdset_id(st):"UNSET",
+ value,
+ name,
+ endptr);
}
if (global) {
@@ -1000,9 +995,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
rrdvar_custom_host_variable_release(host, rva);
}
else
- error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
- rrdhost_hostname(host),
- name);
+ netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
+ rrdhost_hostname(host),
+ name);
} else {
const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
if (rsa) {
@@ -1010,39 +1005,36 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
rrdsetvar_custom_chart_variable_release(st, rsa);
}
else
- error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
- rrdhost_hostname(host), rrdset_id(st), name);
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
+ rrdhost_hostname(host), rrdset_id(st), name);
}
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH);
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
+static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_FLUSH);
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
-{
- info("PLUGINSD: plugin called DISABLE. Disabling it.");
- ((PARSER_USER_OBJECT *) user)->enabled = 0;
+static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
+ parser->user.enabled = 0;
return PARSER_RC_STOP;
}
-PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
const char *name = get_word(words, num_words, 1);
const char *label_source = get_word(words, num_words, 2);
const char *value = get_word(words, num_words, 3);
if (!name || !label_source || !value)
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
char *store = (char *)value;
bool allocated_store = false;
@@ -1071,13 +1063,10 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
}
}
- if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
- ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
+ if(unlikely(!(parser->user.new_host_labels)))
+ parser->user.new_host_labels = rrdlabels_create();
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
- name,
- store,
- str2l(label_source));
+ rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
if (allocated_store)
freez(store);
@@ -1085,90 +1074,84 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_OVERWRITE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- debug(D_PLUGINSD, "requested to OVERWRITE host labels");
+ netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
if(unlikely(!host->rrdlabels))
host->rrdlabels = rrdlabels_create();
- rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
+ rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
- rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
- ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
+ rrdlabels_destroy(parser->user.new_host_labels);
+ parser->user.new_host_labels = NULL;
return PARSER_RC_OK;
}
-
-PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
const char *name = get_word(words, num_words, 1);
const char *value = get_word(words, num_words, 2);
const char *label_source = get_word(words, num_words, 3);
if (!name || !value || !*label_source) {
- error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
- if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
- RRDSET *st = pluginsd_get_chart_from_parent(user);
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels;
- rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+ if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
+ RRDSET *st = pluginsd_get_chart_from_parent(parser);
+ parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
+ rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
}
- rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
- name, value, str2l(label_source));
+ rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
-{
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- debug(D_PLUGINSD, "requested to commit chart labels");
+ netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
- if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
- error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
- rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!parser->user.chart_rrdlabels_linked_temporarily) {
+ netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
- rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
+ rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
+ parser->user.chart_rrdlabels_linked_temporarily = NULL;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
char *id = get_word(words, num_words, 1);
char *start_time_str = get_word(words, num_words, 2);
char *end_time_str = get_word(words, num_words, 3);
char *child_now_str = get_word(words, num_words, 4);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDSET *st;
if (likely(!id || !*id))
- st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
else
st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(start_time_str && end_time_str) {
time_t start_time = (time_t) str2ull_encoded(start_time_str);
@@ -1216,36 +1199,37 @@ PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
st->counter_done++;
// these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->current_entry++;
- if(st->current_entry >= st->entries)
- st->current_entry -= st->entries;
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
- ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
+ parser->user.replay.start_time = start_time;
+ parser->user.replay.end_time = end_time;
+ parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
+ parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
+ parser->user.replay.wall_clock_time = wall_clock_time;
+ parser->user.replay.rset_enabled = true;
return PARSER_RC_OK;
}
- error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
- " from %ld to %ld, but timestamps are invalid "
- "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
- rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
- wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
+ netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
+ " from %ld to %ld, but timestamps are invalid "
+ "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
+ rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
+ wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
+ tolerance);
}
// the child sends an RBEGIN without any parameters initially
// setting rset_enabled to false, means the RSET should not store any metrics
// to store metrics, the RBEGIN needs to have timestamps
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
return PARSER_RC_OK;
}
@@ -1276,20 +1260,18 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str
return flags;
}
-PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
char *dimension = get_word(words, num_words, 1);
char *value_str = get_word(words, num_words, 2);
char *flags_str = get_word(words, num_words, 3);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- PARSER_USER_OBJECT *u = user;
- if(!u->replay.rset_enabled) {
+ if(!parser->user.replay.rset_enabled) {
error_limit_static_thread_var(erl, 1, 0);
error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
@@ -1299,18 +1281,18 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
}
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- if (unlikely(!u->replay.start_time || !u->replay.end_time)) {
- error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
+ if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
rrdhost_hostname(host),
rrdset_id(st),
dimension,
PLUGINSD_KEYWORD_REPLAY_SET,
- u->replay.start_time,
- u->replay.end_time,
+ parser->user.replay.start_time,
+ parser->user.replay.end_time,
PLUGINSD_KEYWORD_REPLAY_BEGIN);
- return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
}
if (unlikely(!value_str || !*value_str))
@@ -1331,10 +1313,10 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
flags = SN_EMPTY_SLOT;
}
- rrddim_store_metric(rd, u->replay.end_time_ut, value, flags);
- rd->last_collected_time.tv_sec = u->replay.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->collections_counter++;
+ rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.counter++;
}
else {
error_limit_static_global_var(erl, 1, 0);
@@ -1346,9 +1328,8 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
-{
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
char *dimension = get_word(words, num_words, 1);
@@ -1357,42 +1338,41 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words
char *last_calculated_value_str = get_word(words, num_words, 4);
char *last_stored_value_str = get_word(words, num_words, 5);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+ usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
- rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
- rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
- rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
- rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
- rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
+ rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
-{
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
+static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
+ if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
char *last_collected_ut_str = get_word(words, num_words, 1);
char *last_updated_ut_str = get_word(words, num_words, 2);
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
@@ -1414,10 +1394,9 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
-{
+static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
if (num_words < 7) { // accepts 7, but the 7th is optional
- error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
+ netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
return PARSER_RC_ERROR;
}
@@ -1441,13 +1420,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
child_world_time_txt) : now_realtime_sec();
- PARSER_USER_OBJECT *user_object = user;
-
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END);
+ if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
internal_error(true,
@@ -1460,12 +1437,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
);
#endif
- ((PARSER_USER_OBJECT *) user)->data_collections_count++;
+ parser->user.data_collections_count++;
- if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
+ if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
time_t now = now_realtime_sec();
time_t started = st->rrdhost->receiver->replication_first_time_t;
- time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
+ time_t current = parser->user.replay.end_time;
if(started && current > started) {
host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
@@ -1474,12 +1451,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
}
}
- ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
- ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
- ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
+ parser->user.replay.start_time = 0;
+ parser->user.replay.end_time = 0;
+ parser->user.replay.start_time_ut = 0;
+ parser->user.replay.end_time_ut = 0;
+ parser->user.replay.wall_clock_time = 0;
+ parser->user.replay.rset_enabled = false;
st->counter++;
st->counter_done++;
@@ -1509,7 +1486,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
rrdhost_hostname(host), rrdset_id(st));
#endif
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
host->rrdpush_receiver_replication_percent = 100.0;
worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
@@ -1517,17 +1494,17 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
return PARSER_RC_OK;
}
- pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
+ pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
rrdcontext_updated_retention_rrdset(st);
- bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
+ bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
first_entry_child, last_entry_child, child_world_time,
first_entry_requested, last_entry_requested);
return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
}
-PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
char *id = get_word(words, num_words, 1);
@@ -1536,17 +1513,17 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
char *wall_clock_time_str = get_word(words, num_words, 4);
if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
timing_step(TIMING_STEP_BEGIN2_PREPARE);
RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2);
+ pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN_V2);
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
rrdset_isnot_obsolete(st);
@@ -1573,32 +1550,31 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// prepare our state
- pluginsd_lock_rrdset_data_collection(user);
+ pluginsd_lock_rrdset_data_collection(parser);
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- u->v2.update_every = update_every;
- u->v2.end_time = end_time;
- u->v2.wall_clock_time = wall_clock_time;
- u->v2.ml_locked = ml_chart_update_begin(st);
+ parser->user.v2.update_every = update_every;
+ parser->user.v2.end_time = end_time;
+ parser->user.v2.wall_clock_time = wall_clock_time;
+ parser->user.v2.ml_locked = ml_chart_update_begin(st);
timing_step(TIMING_STEP_BEGIN2_ML);
// ------------------------------------------------------------------------
// propagate it forward in v2
- if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
- u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time);
+ if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
+ parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
- if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) {
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- BUFFER *wb = u->v2.stream_buffer.wb;
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
- if(unlikely(u->v2.stream_buffer.begin_v2_added))
+ if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
@@ -1626,8 +1602,8 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
buffer_fast_strcat(wb, "\n", 1);
- u->v2.stream_buffer.last_point_end_time_s = end_time;
- u->v2.stream_buffer.begin_v2_added = true;
+ parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
+ parser->user.v2.stream_buffer.begin_v2_added = true;
}
timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
@@ -1643,16 +1619,16 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
st->counter_done++;
// these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->current_entry++;
- if(st->current_entry >= st->entries)
- st->current_entry -= st->entries;
+ st->db.current_entry++;
+ if(st->db.current_entry >= st->db.entries)
+ st->db.current_entry -= st->db.entries;
timing_step(TIMING_STEP_BEGIN2_STORE);
return PARSER_RC_OK;
}
-PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
+static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
char *dimension = get_word(words, num_words, 1);
@@ -1661,20 +1637,18 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
char *flags_str = get_word(words, num_words, 4);
if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
- return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
-
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
timing_step(TIMING_STEP_SET2_PREPARE);
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
rrddim_isnot_obsolete(st, rd);
@@ -1703,11 +1677,11 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
value = NAN;
flags = SN_EMPTY_SLOT;
- if(u->v2.ml_locked)
- ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false);
+ if(parser->user.v2.ml_locked)
+ ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
}
- else if(u->v2.ml_locked) {
- if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) {
+ else if(parser->user.v2.ml_locked) {
+ if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
}
@@ -1720,13 +1694,13 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// propagate it forward in v2
- if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) {
+ if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
+ bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+ NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
- BUFFER *wb = u->v2.stream_buffer.wb;
+ BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
@@ -1750,51 +1724,50 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
// ------------------------------------------------------------------------
// store it
- rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags);
- rd->last_collected_time.tv_sec = u->v2.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->last_collected_value = collected_value;
- rd->last_stored_value = value;
- rd->last_calculated_value = value;
- rd->collections_counter++;
- rd->updated = true;
+ rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
+ rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.last_collected_value = collected_value;
+ rd->collector.last_stored_value = value;
+ rd->collector.last_calculated_value = value;
+ rd->collector.counter++;
+ rrddim_set_updated(rd);
timing_step(TIMING_STEP_SET2_STORE);
return PARSER_RC_OK;
}
-void pluginsd_cleanup_v2(void *user) {
+void pluginsd_cleanup_v2(PARSER *parser) {
// this is called when the thread is stopped while processing
- pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP");
+ pluginsd_set_chart_from_parent(parser, NULL, "THREAD CLEANUP");
}
-PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
+static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
timing_init();
- RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END_V2);
+ if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
+ RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
+ if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
- u->data_collections_count++;
+ parser->user.data_collections_count++;
timing_step(TIMING_STEP_END2_PREPARE);
// ------------------------------------------------------------------------
// propagate the whole chart update in v1
- if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb))
- rrdset_push_metrics_v1(&u->v2.stream_buffer, st);
+ if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
+ rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
timing_step(TIMING_STEP_END2_PUSH_V1);
// ------------------------------------------------------------------------
// unblock data collection
- pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false);
+ pluginsd_unlock_previous_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
rrdcontext_collected_rrdset(st);
store_metric_collection_completed();
@@ -1803,7 +1776,7 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
// ------------------------------------------------------------------------
// propagate it forward
- rrdset_push_metrics_finished(&u->v2.stream_buffer, st);
+ rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
timing_step(TIMING_STEP_END2_PROPAGATE);
@@ -1812,16 +1785,16 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- rd->calculated_value = 0;
- rd->collected_value = 0;
- rd->updated = false;
- }
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
rrddim_foreach_done(rd);
// ------------------------------------------------------------------------
// reset state
- u->v2 = (struct parser_user_object_v2){ 0 };
+ parser->user.v2 = (struct parser_user_object_v2){ 0 };
timing_step(TIMING_STEP_END2_STORE);
timing_report();
@@ -1829,19 +1802,126 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_
return PARSER_RC_OK;
}
+static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ netdata_log_info("PLUGINSD: plugin called EXIT.");
+ return PARSER_RC_STOP;
+}
+
+static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
+{
+ const char *host_uuid_str = get_word(words, num_words, 1);
+ const char *claim_id_str = get_word(words, num_words, 2);
+
+ if (!host_uuid_str || !claim_id_str) {
+ netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
+ host_uuid_str ? host_uuid_str : "[unset]",
+ claim_id_str ? claim_id_str : "[unset]");
+ return PARSER_RC_ERROR;
+ }
+
+ uuid_t uuid;
+ RRDHOST *host = parser->user.host;
+
+ // We don't need the parsed UUID
+ // just do it to check the format
+ if(uuid_parse(host_uuid_str, uuid)) {
+ netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
+ return PARSER_RC_ERROR;
+ }
+ if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
+ netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
+ return PARSER_RC_ERROR;
+ }
+
+ if(strcmp(host_uuid_str, host->machine_guid) != 0) {
+ netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
+ return PARSER_RC_OK; //the message is OK problem must be somewhere else
+ }
+
+ rrdhost_aclk_state_lock(host);
+
+ if (host->aclk_state.claimed_id)
+ freez(host->aclk_state.claimed_id);
+
+ host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
+
+ rrdhost_aclk_state_unlock(host);
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+
+ rrdpush_send_claimed_id(host);
+
+ return PARSER_RC_OK;
+}
+
+// ----------------------------------------------------------------------------
+
+static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(reader->read_buffer[reader->read_len] != '\0')
+ fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
+#endif
+
+ ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
+ if(unlikely(bytes_read <= 0))
+ return false;
+
+ reader->read_len += bytes_read;
+ reader->read_buffer[reader->read_len] = '\0';
+
+ return true;
+}
+
+static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
+ errno = 0;
+ struct pollfd fds[1];
+
+ fds[0].fd = fd;
+ fds[0].events = POLLIN;
+
+ int ret = poll(fds, 1, timeout_ms);
+
+ if (ret > 0) {
+ /* There is data to read */
+ if (fds[0].revents & POLLIN)
+ return buffered_reader_read(reader, fd);
+
+ else if(fds[0].revents & POLLERR) {
+ netdata_log_error("PARSER: read failed: POLLERR.");
+ return false;
+ }
+ else if(fds[0].revents & POLLHUP) {
+ netdata_log_error("PARSER: read failed: POLLHUP.");
+ return false;
+ }
+ else if(fds[0].revents & POLLNVAL) {
+ netdata_log_error("PARSER: read failed: POLLNVAL.");
+ return false;
+ }
+
+ netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
+ return false;
+ }
+ else if (ret == 0) {
+ netdata_log_error("PARSER: timeout while waiting for data.");
+ return false;
+ }
+
+ netdata_log_error("PARSER: poll() failed with code %d.", ret);
+ return false;
+}
+
void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
- pluginsd_cleanup_v2(parser->user);
- pluginsd_host_define_cleanup(parser->user);
+ pluginsd_cleanup_v2(parser);
+ pluginsd_host_define_cleanup(parser);
rrd_collector_finished();
parser_destroy(parser);
}
-// New plugins.d parser
-
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
int enabled = cd->unsafe.enabled;
@@ -1852,13 +1932,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
}
if (unlikely(fileno(fp_plugin_input) == -1)) {
- error("input file descriptor given is not a valid stream");
+ netdata_log_error("input file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
if (unlikely(fileno(fp_plugin_output) == -1)) {
- error("output file descriptor given is not a valid stream");
+ netdata_log_error("output file descriptor given is not a valid stream");
cd->serial_failures++;
return 0;
}
@@ -1866,38 +1946,42 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
clearerr(fp_plugin_input);
clearerr(fp_plugin_output);
- PARSER_USER_OBJECT user = {
- .enabled = cd->unsafe.enabled,
- .host = host,
- .cd = cd,
- .trust_durations = trust_durations
- };
+ PARSER *parser;
+ {
+ PARSER_USER_OBJECT user = {
+ .enabled = cd->unsafe.enabled,
+ .host = host,
+ .cd = cd,
+ .trust_durations = trust_durations
+ };
- // fp_plugin_output = our input; fp_plugin_input = our output
- PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1,
- PARSER_INPUT_SPLIT, NULL);
+ // fp_plugin_output = our input; fp_plugin_input = our output
+ parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
+ }
pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
rrd_collector_started();
+ size_t count = 0;
+
// this keeps the parser with its current value
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- user.parser = parser;
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
- if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
+ buffered_reader_init(&parser->reader);
+ char buffer[PLUGINSD_LINE_MAX + 2];
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+ if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
+ if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
+ break;
+ }
+ else if(unlikely(parser_action(parser, buffer)))
break;
}
- // free parser with the pop function
- netdata_thread_cleanup_pop(1);
-
- cd->unsafe.enabled = user.enabled;
- size_t count = user.data_collections_count;
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
if (likely(count)) {
cd->successful_collections += count;
@@ -1906,143 +1990,187 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
else
cd->serial_failures++;
+ // free parser with the pop function
+ netdata_thread_cleanup_pop(1);
+
return count;
}
-PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
-{
- info("PLUGINSD: plugin called EXIT.");
- return PARSER_RC_STOP;
+void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser_init_repertoire(parser, repertoire);
+
+ if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
+ inflight_functions_init(parser);
}
-static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) {
+PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
+ PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
+ PARSER *parser;
- if (types & PARSER_INIT_PLUGINSD) {
- add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
- add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
+ parser = callocz(1, sizeof(*parser));
+ if(user)
+ parser->user = *user;
+ parser->fd = fd;
+ parser->fp_input = fp_input;
+ parser->fp_output = fp_output;
+#ifdef ENABLE_HTTPS
+ parser->ssl_output = ssl;
+#endif
+ parser->flags = flags;
- add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define);
- add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end);
- add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels);
- add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host);
+ spinlock_init(&parser->writer.spinlock);
+ return parser;
+}
- add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit);
- }
+PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
+ switch(keyword->id) {
+ case 1:
+ return pluginsd_set_v2(words, num_words, parser);
- if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) {
- // plugins.d plugins and streaming
- add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
- add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
- add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
- add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
- add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
- add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
- add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
- add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
- add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
+ case 2:
+ return pluginsd_begin_v2(words, num_words, parser);
- add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
- add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
- add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
+ case 3:
+ return pluginsd_end_v2(words, num_words, parser);
- inflight_functions_init(parser);
- }
+ case 11:
+ return pluginsd_set(words, num_words, parser);
- if (types & PARSER_INIT_STREAMING) {
- add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
+ case 12:
+ return pluginsd_begin(words, num_words, parser);
- // replication
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
- add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
+ case 13:
+ return pluginsd_end(words, num_words, parser);
- // streaming metrics v2
- add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2);
- add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2);
- add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2);
- }
-}
+ case 21:
+ return pluginsd_replay_set(words, num_words, parser);
-void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) {
- pluginsd_keywords_init_internal(parser, types, parser_add_keyword);
-}
+ case 22:
+ return pluginsd_replay_begin(words, num_words, parser);
-struct pluginsd_user_unittest {
- size_t size;
- const char **hashtable;
- uint32_t (*hash)(const char *s);
- size_t collisions;
-};
+ case 23:
+ return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
+
+ case 24:
+ return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
+
+ case 25:
+ return pluginsd_replay_end(words, num_words, parser);
+
+ case 31:
+ return pluginsd_dimension(words, num_words, parser);
+
+ case 32:
+ return pluginsd_chart(words, num_words, parser);
+
+ case 33:
+ return pluginsd_chart_definition_end(words, num_words, parser);
+
+ case 34:
+ return pluginsd_clabel(words, num_words, parser);
+
+ case 35:
+ return pluginsd_clabel_commit(words, num_words, parser);
+
+ case 41:
+ return pluginsd_function(words, num_words, parser);
+
+ case 42:
+ return pluginsd_function_result_begin(words, num_words, parser);
+
+ case 51:
+ return pluginsd_label(words, num_words, parser);
+
+ case 52:
+ return pluginsd_overwrite(words, num_words, parser);
+
+ case 53:
+ return pluginsd_variable(words, num_words, parser);
+
+ case 61:
+ return streaming_claimed_id(words, num_words, parser);
+
+ case 71:
+ return pluginsd_host(words, num_words, parser);
+
+ case 72:
+ return pluginsd_host_define(words, num_words, parser);
+
+ case 73:
+ return pluginsd_host_define_end(words, num_words, parser);
-void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) {
- struct pluginsd_user_unittest *u = parser->user;
+ case 74:
+ return pluginsd_host_labels(words, num_words, parser);
- uint32_t hash = u->hash(keyword);
- uint32_t slot = hash % u->size;
+ case 97:
+ return pluginsd_flush(words, num_words, parser);
- if(u->hashtable[slot])
- u->collisions++;
+ case 98:
+ return pluginsd_disable(words, num_words, parser);
- u->hashtable[slot] = keyword;
+ case 99:
+ return pluginsd_exit(words, num_words, parser);
+
+ default:
+ fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
+ }
}
-static struct {
- const char *name;
- uint32_t (*hash)(const char *s);
- size_t slots_needed;
-} hashers[] = {
- { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, },
- { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, },
- { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, },
- { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, },
- { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, },
-
- // terminator
- { .name = NULL, NULL, .slots_needed = 0, },
-};
+#include "gperf-hashtable.h"
+
+void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
+ parser->repertoire = repertoire;
+
+ for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
+ if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
+ worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
+ }
+}
+
+void parser_destroy(PARSER *parser) {
+ if (unlikely(!parser))
+ return;
+
+ dictionary_destroy(parser->inflight.functions);
+ freez(parser);
+}
int pluginsd_parser_unittest(void) {
- PARSER *p;
- size_t slots_to_check = 1000;
- size_t i, h;
-
- // check for hashtable collisions
- for(h = 0; hashers[h].name ;h++) {
- hashers[h].slots_needed = slots_to_check * 1000000;
-
- for (i = 10; i < slots_to_check; i++) {
- struct pluginsd_user_unittest user = {
- .hash = hashers[h].hash,
- .size = i,
- .hashtable = callocz(i, sizeof(const char *)),
- .collisions = 0,
- };
-
- p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING,
- pluginsd_keyword_collision_check);
- parser_destroy(p);
-
- freez(user.hashtable);
-
- if (!user.collisions) {
- hashers[h].slots_needed = i;
- break;
- }
+ PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
+ pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
+
+ char *lines[] = {
+ "BEGIN2 abcdefghijklmnopqr 123",
+ "SET2 abcdefg 0x12345678 0 0",
+ "SET2 hijklmnoqr 0x12345678 0 0",
+ "SET2 stuvwxyz 0x12345678 0 0",
+ "END2",
+ NULL,
+ };
+
+ char *words[PLUGINSD_MAX_WORDS];
+ size_t iterations = 1000000;
+ size_t count = 0;
+ char input[PLUGINSD_LINE_MAX + 1];
+
+ usec_t started = now_realtime_usec();
+ while(--iterations) {
+ for(size_t line = 0; lines[line] ;line++) {
+ strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
+ size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(words, num_words, 0);
+ PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
+ if(unlikely(!keyword))
+ fatal("Cannot parse the line '%s'", lines[line]);
+ count++;
}
}
+ usec_t ended = now_realtime_usec();
- for(h = 0; hashers[h].name ;h++) {
- if(hashers[h].slots_needed > 1000)
- info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check);
- else
- info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed);
- }
+ netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
+ (double)(ended - started) / (double)USEC_PER_SEC,
+ (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
- p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
parser_destroy(p);
return 0;
}