From ab1bb5b7f1c3c3a7b240ab7fc8661459ecd7decb Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 20 Jul 2023 06:49:55 +0200 Subject: Adding upstream version 1.41.0. Signed-off-by: Daniel Baumann --- collectors/plugins.d/pluginsd_parser.c | 1220 ++++++++++++++++++-------------- 1 file changed, 674 insertions(+), 546 deletions(-) (limited to 'collectors/plugins.d/pluginsd_parser.c') diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 097e5ea60..cda17710c 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,90 +4,103 @@ #define LOG_FUNCTIONS false -static int send_to_plugin(const char *txt, void *data) { +static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; if(!txt || !*txt) return 0; + errno = 0; + spinlock_lock(&parser->writer.spinlock); + ssize_t bytes = -1; + #ifdef ENABLE_HTTPS NETDATA_SSL *ssl = parser->ssl_output; if(ssl) { + if(SSL_connection(ssl)) - return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt)); + bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt)); - error("PLUGINSD: cannot send command (SSL)"); - return -1; + else + netdata_log_error("PLUGINSD: cannot send command (SSL)"); + + spinlock_unlock(&parser->writer.spinlock); + return bytes; } #endif if(parser->fp_output) { - int bytes = fprintf(parser->fp_output, "%s", txt); + + bytes = fprintf(parser->fp_output, "%s", txt); if(bytes <= 0) { - error("PLUGINSD: cannot send command (FILE)"); - return -2; + netdata_log_error("PLUGINSD: cannot send command (FILE)"); + bytes = -2; } - fflush(parser->fp_output); + else + fflush(parser->fp_output); + + spinlock_unlock(&parser->writer.spinlock); return bytes; } if(parser->fd != -1) { - size_t bytes = 0; - size_t total = strlen(txt); + bytes = 0; + ssize_t total = (ssize_t)strlen(txt); ssize_t sent; do { sent = write(parser->fd, &txt[bytes], total - bytes); if(sent <= 0) { - error("PLUGINSD: cannot send command (fd)"); + netdata_log_error("PLUGINSD: cannot send command (fd)"); + spinlock_unlock(&parser->writer.spinlock); return -3; } bytes += sent; } while(bytes < total); + spinlock_unlock(&parser->writer.spinlock); return (int)bytes; } - error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); + spinlock_unlock(&parser->writer.spinlock); + netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); return -4; } -static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) { - RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host; +static inline RRDHOST *pluginsd_require_host_from_parent(PARSER *parser, const char *cmd) { + RRDHOST *host = parser->user.host; if(unlikely(!host)) - error("PLUGINSD: command %s requires a host, but is not set.", cmd); + netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd); return host; } -static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) { - RRDSET *st = ((PARSER_USER_OBJECT *) user)->st; +static inline RRDSET *pluginsd_require_chart_from_parent(PARSER *parser, const char *cmd, const char *parent_cmd) { + RRDSET *st = parser->user.st; if(unlikely(!st)) - error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); + netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd); return st; } -static inline RRDSET *pluginsd_get_chart_from_parent(void *user) { - return ((PARSER_USER_OBJECT *) user)->st; +static inline RRDSET *pluginsd_get_chart_from_parent(PARSER *parser) { + return parser->user.st; } -static inline void pluginsd_lock_rrdset_data_collection(void *user) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - if(u->st && !u->v2.locked_data_collection) { - netdata_spinlock_lock(&u->st->data_collection_lock); - u->v2.locked_data_collection = true; +static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) { + if(parser->user.st && !parser->user.v2.locked_data_collection) { + spinlock_lock(&parser->user.st->data_collection_lock); + parser->user.v2.locked_data_collection = true; } } -static inline bool pluginsd_unlock_rrdset_data_collection(void *user) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - if(u->st && u->v2.locked_data_collection) { - netdata_spinlock_unlock(&u->st->data_collection_lock); - u->v2.locked_data_collection = false; +static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) { + if(parser->user.st && parser->user.v2.locked_data_collection) { + spinlock_unlock(&parser->user.st->data_collection_lock); + parser->user.v2.locked_data_collection = false; return true; } @@ -108,29 +121,29 @@ void pluginsd_rrdset_cleanup(RRDSET *st) { st->pluginsd.pos = 0; } -static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - - if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) { +static inline void pluginsd_unlock_previous_chart(PARSER *parser, const char *keyword, bool stale) { + if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) { if(stale) - error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", - rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked", + rrdhost_hostname(parser->user.st->rrdhost), + rrdset_id(parser->user.st), + keyword); } - if(unlikely(u->v2.ml_locked)) { - ml_chart_update_end(u->st); - u->v2.ml_locked = false; + if(unlikely(parser->user.v2.ml_locked)) { + ml_chart_update_end(parser->user.st); + parser->user.v2.ml_locked = false; if(stale) - error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", - rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked", + rrdhost_hostname(parser->user.st->rrdhost), + rrdset_id(parser->user.st), + keyword); } } -static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) { - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - - pluginsd_unlock_previous_chart(user, keyword, true); +static inline void pluginsd_set_chart_from_parent(PARSER *parser, RRDSET *st, const char *keyword) { + pluginsd_unlock_previous_chart(parser, keyword, true); if(st) { size_t dims = dictionary_entries(st->rrddim_root_index); @@ -145,13 +158,13 @@ static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const st->pluginsd.pos = 0; } - u->st = st; + parser->user.st = st; } static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { if (unlikely(!dimension || !*dimension)) { - error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", - rrdhost_hostname(host), rrdset_id(st), cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", + rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } @@ -172,8 +185,8 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons rda = rrddim_find_and_acquire(st, dimension); if (unlikely(!rda)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", - rrdhost_hostname(host), rrdset_id(st), dimension, cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); return NULL; } @@ -186,21 +199,21 @@ static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, cons static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { if (unlikely(!chart || !*chart)) { - error("PLUGINSD: 'host:%s' got a %s without a chart id.", - rrdhost_hostname(host), cmd); + netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.", + rrdhost_hostname(host), cmd); return NULL; } RRDSET *st = rrdset_find(host, chart); if (unlikely(!st)) - error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", - rrdhost_hostname(host), chart, cmd); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.", + rrdhost_hostname(host), chart, cmd); return st; } -static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) { - ((PARSER_USER_OBJECT *) user)->enabled = 0; +static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) { + parser->user.enabled = 0; if(keyword && msg) { error_limit_static_global_var(erl, 1, 0); @@ -210,22 +223,21 @@ static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, return PARSER_RC_ERROR; } -PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { char *dimension = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", + netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'", rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET"); if (value && *value) @@ -234,18 +246,17 @@ PARSER_RC pluginsd_set(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *microseconds_txt = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN); usec_t microseconds = 0; if (microseconds_txt && *microseconds_txt) { @@ -270,7 +281,7 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) if (likely(st->counter_done)) { if (likely(microseconds)) { - if (((PARSER_USER_OBJECT *)user)->trust_durations) + if (parser->user.trust_durations) rrdset_next_usec_unfiltered(st, microseconds); else rrdset_next_usec(st, microseconds); @@ -281,22 +292,21 @@ PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) { UNUSED(words); UNUSED(num_words); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); + netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st)); - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END); - ((PARSER_USER_OBJECT *) user)->data_collections_count++; + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_END); + parser->user.data_collections_count++; struct timeval now; now_realtime_timeval(&now); @@ -305,15 +315,13 @@ PARSER_RC pluginsd_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -static void pluginsd_host_define_cleanup(void *user) { - PARSER_USER_OBJECT *u = user; +static void pluginsd_host_define_cleanup(PARSER *parser) { + string_freez(parser->user.host_define.hostname); + dictionary_destroy(parser->user.host_define.rrdlabels); - string_freez(u->host_define.hostname); - dictionary_destroy(u->host_define.rrdlabels); - - u->host_define.hostname = NULL; - u->host_define.rrdlabels = NULL; - u->host_define.parsing_host = false; + parser->user.host_define.hostname = NULL; + parser->user.host_define.rrdlabels = NULL; + parser->user.host_define.parsing_host = false; } static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) { @@ -325,61 +333,56 @@ static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid return true; } -static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) { char *guid = get_word(words, num_words, 1); char *hostname = get_word(words, num_words, 2); if(unlikely(!guid || !*guid || !hostname || !*hostname)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters"); - if(unlikely(u->host_define.parsing_host)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, + if(unlikely(parser->user.host_define.parsing_host)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?"); - if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); + if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?"); - u->host_define.hostname = string_strdupz(hostname); - u->host_define.rrdlabels = rrdlabels_create(); - u->host_define.parsing_host = true; + parser->user.host_define.hostname = string_strdupz(hostname); + parser->user.host_define.rrdlabels = rrdlabels_create(); + parser->user.host_define.parsing_host = true; return PARSER_RC_OK; } -static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); if(!name || !*name || !value) - return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters"); - if(!u->host_define.parsing_host || !dict) - return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); + if(!parser->user.host_define.parsing_host || !dict) + return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG); return PARSER_RC_OK; } -static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL); +static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) { + return pluginsd_host_dictionary(words, num_words, parser, + parser->user.host_define.rrdlabels, + PLUGINSD_KEYWORD_HOST_LABEL); } -static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { - PARSER_USER_OBJECT *u = user; - - if(!u->host_define.parsing_host) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); +static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + if(!parser->user.host_define.parsing_host) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); RRDHOST *host = rrdhost_find_or_create( - string2str(u->host_define.hostname), - string2str(u->host_define.hostname), - u->host_define.machine_guid_str, + string2str(parser->user.host_define.hostname), + string2str(parser->user.host_define.hostname), + parser->user.host_define.machine_guid_str, "Netdata Virtual Host 1.0", netdata_configured_timezone, netdata_configured_abbrev_timezone, @@ -398,22 +401,24 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu default_rrdpush_enable_replication, default_rrdpush_seconds_to_replicate, default_rrdpush_replication_step, - rrdhost_labels_to_system_info(u->host_define.rrdlabels), + rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), false ); + rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); + if(host->rrdlabels) { - rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels); + rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels); } else { - host->rrdlabels = u->host_define.rrdlabels; - u->host_define.rrdlabels = NULL; + host->rrdlabels = parser->user.host_define.rrdlabels; + parser->user.host_define.rrdlabels = NULL; } - pluginsd_host_define_cleanup(user); + pluginsd_host_define_cleanup(parser); - u->host = host; - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); + parser->user.host = host; + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END); rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); rrdcontext_host_child_connected(host); @@ -422,34 +427,31 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu return PARSER_RC_OK; } -static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) { - PARSER_USER_OBJECT *u = user; - +static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) { char *guid = get_word(words, num_words, 1); if(!guid || !*guid || strcmp(guid, "localhost") == 0) { - u->host = localhost; + parser->user.host = localhost; return PARSER_RC_OK; } uuid_t uuid; char uuid_str[UUID_STR_LEN]; if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?"); RRDHOST *host = rrdhost_find_by_guid(uuid_str); if(unlikely(!host)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?"); - u->host = host; + parser->user.host = host; return PARSER_RC_OK; } -PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); char *type = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); @@ -473,7 +475,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) // make sure we have the required variables if (unlikely((!type || !*type || !id || !*id))) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters"); // parse the name, and make sure it does not include 'type.' if (unlikely(name && *name)) { @@ -494,11 +496,11 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) if (likely(priority_s && *priority_s)) priority = str2i(priority_s); - int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every; + int update_every = parser->user.cd->update_every; if (likely(update_every_s && *update_every_s)) update_every = str2i(update_every_s); if (unlikely(!update_every)) - update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every; + update_every = parser->user.cd->update_every; RRDSET_TYPE chart_type = RRDSET_TYPE_LINE; if (unlikely(chart)) @@ -515,7 +517,7 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) if (unlikely(!units)) units = "unknown"; - debug( + netdata_log_debug( D_PLUGINSD, "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d", type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type), @@ -525,14 +527,16 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) st = rrdset_create( host, type, id, name, family, context, title, units, - (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename, + (plugin && *plugin) ? plugin : parser->user.cd->filename, module, priority, update_every, chart_type); if (likely(st)) { if (options && *options) { - if (strstr(options, "obsolete")) + if (strstr(options, "obsolete")) { + pluginsd_rrdset_cleanup(st); rrdset_is_obsolete(st); + } else rrdset_isnot_obsolete(st); @@ -556,22 +560,21 @@ PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user) rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } } - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_CHART); return PARSER_RC_OK; } -PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) { const char *first_entry_txt = get_word(words, num_words, 1); const char *last_entry_txt = get_word(words, num_words, 2); const char *wall_clock_time_txt = get_word(words, num_words, 3); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0; time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0; @@ -590,7 +593,6 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); rrdhost_receiver_replicating_charts_plus_one(st->rrdhost); - PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser; ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_wall_clock_time, 0, 0); @@ -605,8 +607,7 @@ PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *us return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } -PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *name = get_word(words, num_words, 2); char *algorithm = get_word(words, num_words, 3); @@ -614,14 +615,14 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) char *divisor_s = get_word(words, num_words, 5); char *options = get_word(words, num_words, 6); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if (unlikely(!id)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id"); long multiplier = 1; if (multiplier_s && *multiplier_s) { @@ -641,7 +642,7 @@ PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user) algorithm = "absolute"; if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG))) - debug( + netdata_log_debug( D_PLUGINSD, "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'", rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor, @@ -720,7 +721,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void pf->sent_ut = now_realtime_usec(); if(ret < 0) { - error("FUNCTION: failed to send function to plugin, error %d", ret); + netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret); rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED); } else { @@ -734,7 +735,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) { struct inflight_function *pf = new_func; - error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); + netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function)); pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST); pf->callback(pf->destination_wb, pf->code, pf->callback_data); string_freez(pf->function); @@ -825,8 +826,7 @@ static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeou return HTTP_RESP_OK; } -PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) { bool global = false; size_t i = 1; if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) { @@ -838,21 +838,21 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) char *timeout_s = get_word(words, num_words, i++); char *help = get_word(words, num_words, i++); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION); if(!host) return PARSER_RC_ERROR; - RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); + RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART); if(!st) global = true; if (unlikely(!timeout_s || !name || !help || (!global && !st))) { - error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", - rrdhost_hostname(host), - st?rrdset_id(st):"(unset)", - global?"yes":"no", - name?name:"(unset)", - timeout_s?timeout_s:"(unset)", - help?help:"(unset)" - ); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.", + rrdhost_hostname(host), + st?rrdset_id(st):"(unset)", + global?"yes":"no", + name?name:"(unset)", + timeout_s?timeout_s:"(unset)", + help?help:"(unset)" + ); return PARSER_RC_ERROR; } @@ -863,7 +863,6 @@ PARSER_RC pluginsd_function(char **words, size_t num_words, void *user) timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT; } - PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser); return PARSER_RC_OK; @@ -876,15 +875,14 @@ static void pluginsd_function_result_end(struct parser *parser, void *action_dat string_freez(key); } -PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) { char *key = get_word(words, num_words, 1); char *status = get_word(words, num_words, 2); char *format = get_word(words, num_words, 3); char *expires = get_word(words, num_words, 4); if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) { - error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." + netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')." , key ? key : "(unset)" , status ? status : "(unset)" , format ? format : "(unset)" @@ -898,15 +896,13 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u time_t expiration = (expires && *expires) ? str2l(expires) : 0; - PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser; - struct inflight_function *pf = NULL; if(key && *key) pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key); if(!pf) { - error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); + netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)"); } else { if(format && *format) @@ -932,16 +928,15 @@ PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *u // ---------------------------------------------------------------------------- -PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) { char *name = get_word(words, num_words, 1); char *value = get_word(words, num_words, 2); NETDATA_DOUBLE v; - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_VARIABLE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_get_chart_from_parent(user); + RRDSET *st = pluginsd_get_chart_from_parent(parser); int global = (st) ? 0 : 1; @@ -958,39 +953,39 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) } if (unlikely(!name || !*name)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name"); if (unlikely(!value || !*value)) value = NULL; if (unlikely(!value)) { - error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - (global) ? "HOST" : "CHART", - name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + (global) ? "HOST" : "CHART", + name); return PARSER_RC_OK; } if (!global && !st) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given"); char *endptr = NULL; v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr); if (unlikely(endptr && *endptr)) { if (endptr == value) - error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name); else - error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", - rrdhost_hostname(host), - st ? rrdset_id(st):"UNSET", - value, - name, - endptr); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'", + rrdhost_hostname(host), + st ? rrdset_id(st):"UNSET", + value, + name, + endptr); } if (global) { @@ -1000,9 +995,9 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) rrdvar_custom_host_variable_release(host, rva); } else - error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", - rrdhost_hostname(host), - name); + netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'", + rrdhost_hostname(host), + name); } else { const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name); if (rsa) { @@ -1010,39 +1005,36 @@ PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user) rrdsetvar_custom_chart_variable_release(st, rsa); } else - error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", - rrdhost_hostname(host), rrdset_id(st), name); + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'", + rrdhost_hostname(host), rrdset_id(st), name); } return PARSER_RC_OK; } -PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH); - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; +static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_FLUSH); + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; return PARSER_RC_OK; } -PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) -{ - info("PLUGINSD: plugin called DISABLE. Disabling it."); - ((PARSER_USER_OBJECT *) user)->enabled = 0; +static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it."); + parser->user.enabled = 0; return PARSER_RC_STOP; } -PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) { const char *name = get_word(words, num_words, 1); const char *label_source = get_word(words, num_words, 2); const char *value = get_word(words, num_words, 3); if (!name || !label_source || !value) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters"); char *store = (char *)value; bool allocated_store = false; @@ -1071,13 +1063,10 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) } } - if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels)) - ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create(); + if(unlikely(!(parser->user.new_host_labels))) + parser->user.new_host_labels = rrdlabels_create(); - rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels, - name, - store, - str2l(label_source)); + rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source)); if (allocated_store) freez(store); @@ -1085,90 +1074,84 @@ PARSER_RC pluginsd_label(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_OVERWRITE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - debug(D_PLUGINSD, "requested to OVERWRITE host labels"); + netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels"); if(unlikely(!host->rrdlabels)) host->rrdlabels = rrdlabels_create(); - rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels)); + rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); - rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels); - ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL; + rrdlabels_destroy(parser->user.new_host_labels); + parser->user.new_host_labels = NULL; return PARSER_RC_OK; } - -PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) { const char *name = get_word(words, num_words, 1); const char *value = get_word(words, num_words, 2); const char *label_source = get_word(words, num_words, 3); if (!name || !value || !*label_source) { - error("Ignoring malformed or empty CHART LABEL command."); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + netdata_log_error("Ignoring malformed or empty CHART LABEL command."); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } - if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) { - RRDSET *st = pluginsd_get_chart_from_parent(user); - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels; - rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); + if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) { + RRDSET *st = pluginsd_get_chart_from_parent(parser); + parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels; + rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily); } - rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily, - name, value, str2l(label_source)); + rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source)); return PARSER_RC_OK; } -PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) -{ - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); +static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - debug(D_PLUGINSD, "requested to commit chart labels"); + netdata_log_debug(D_PLUGINSD, "requested to commit chart labels"); - if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) { - error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", - rrdhost_hostname(host)); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!parser->user.chart_rrdlabels_linked_temporarily) { + netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host)); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } - rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily); + rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily); rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL; + parser->user.chart_rrdlabels_linked_temporarily = NULL; return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { char *id = get_word(words, num_words, 1); char *start_time_str = get_word(words, num_words, 2); char *end_time_str = get_word(words, num_words, 3); char *child_now_str = get_word(words, num_words, 4); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDSET *st; if (likely(!id || !*id)) - st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); + st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); else st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(start_time_str && end_time_str) { time_t start_time = (time_t) str2ull_encoded(start_time_str); @@ -1216,36 +1199,37 @@ PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) { st->counter_done++; // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->current_entry++; - if(st->current_entry >= st->entries) - st->current_entry -= st->entries; + st->db.current_entry++; + if(st->db.current_entry >= st->db.entries) + st->db.current_entry -= st->db.entries; - ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time; - ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true; + parser->user.replay.start_time = start_time; + parser->user.replay.end_time = end_time; + parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC; + parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC; + parser->user.replay.wall_clock_time = wall_clock_time; + parser->user.replay.rset_enabled = true; return PARSER_RC_OK; } - error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN - " from %ld to %ld, but timestamps are invalid " - "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, - rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, - wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance); + netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN + " from %ld to %ld, but timestamps are invalid " + "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET, + rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, + wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", + tolerance); } // the child sends an RBEGIN without any parameters initially // setting rset_enabled to false, means the RSET should not store any metrics // to store metrics, the RBEGIN needs to have timestamps - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; + parser->user.replay.wall_clock_time = 0; + parser->user.replay.rset_enabled = false; return PARSER_RC_OK; } @@ -1276,20 +1260,18 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str return flags; } -PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) { char *dimension = get_word(words, num_words, 1); char *value_str = get_word(words, num_words, 2); char *flags_str = get_word(words, num_words, 3); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - PARSER_USER_OBJECT *u = user; - if(!u->replay.rset_enabled) { + if(!parser->user.replay.rset_enabled) { error_limit_static_thread_var(erl, 1, 0); error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); @@ -1299,18 +1281,18 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) } RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - if (unlikely(!u->replay.start_time || !u->replay.end_time)) { - error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", + if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.", rrdhost_hostname(host), rrdset_id(st), dimension, PLUGINSD_KEYWORD_REPLAY_SET, - u->replay.start_time, - u->replay.end_time, + parser->user.replay.start_time, + parser->user.replay.end_time, PLUGINSD_KEYWORD_REPLAY_BEGIN); - return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); } if (unlikely(!value_str || !*value_str)) @@ -1331,10 +1313,10 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) flags = SN_EMPTY_SLOT; } - rrddim_store_metric(rd, u->replay.end_time_ut, value, flags); - rd->last_collected_time.tv_sec = u->replay.end_time; - rd->last_collected_time.tv_usec = 0; - rd->collections_counter++; + rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags); + rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.counter++; } else { error_limit_static_global_var(erl, 1, 0); @@ -1346,9 +1328,8 @@ PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user) return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user) -{ - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) +static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) { + if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; char *dimension = get_word(words, num_words, 1); @@ -1357,42 +1338,41 @@ PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words char *last_calculated_value_str = get_word(words, num_words, 4); char *last_stored_value_str = get_word(words, num_words, 5); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); - if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec; + usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec; usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; if(last_collected_ut > dim_last_collected_ut) { - rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); - rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); + rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC); + rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC); } - rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; - rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; - rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; + rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0; + rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0; + rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0; return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user) -{ - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false) +static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) { + if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; char *last_collected_ut_str = get_word(words, num_words, 1); char *last_updated_ut_str = get_word(words, num_words, 2); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec; usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0; @@ -1414,10 +1394,9 @@ PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words return PARSER_RC_OK; } -PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) -{ +static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) { if (num_words < 7) { // accepts 7, but the 7th is optional - error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); + netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command"); return PARSER_RC_ERROR; } @@ -1441,13 +1420,11 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded( child_world_time_txt) : now_realtime_sec(); - PARSER_USER_OBJECT *user_object = user; - - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END); - if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END); + if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); - if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN); + if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); #ifdef NETDATA_LOG_REPLICATION_REQUESTS internal_error(true, @@ -1460,12 +1437,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) ); #endif - ((PARSER_USER_OBJECT *) user)->data_collections_count++; + parser->user.data_collections_count++; - if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) { + if(parser->user.replay.rset_enabled && st->rrdhost->receiver) { time_t now = now_realtime_sec(); time_t started = st->rrdhost->receiver->replication_first_time_t; - time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time; + time_t current = parser->user.replay.end_time; if(started && current > started) { host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started); @@ -1474,12 +1451,12 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) } } - ((PARSER_USER_OBJECT *) user)->replay.start_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0; - ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0; - ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false; + parser->user.replay.start_time = 0; + parser->user.replay.end_time = 0; + parser->user.replay.start_time_ut = 0; + parser->user.replay.end_time_ut = 0; + parser->user.replay.wall_clock_time = 0; + parser->user.replay.rset_enabled = false; st->counter++; st->counter_done++; @@ -1509,7 +1486,7 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) rrdhost_hostname(host), rrdset_id(st)); #endif - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END); host->rrdpush_receiver_replication_percent = 100.0; worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent); @@ -1517,17 +1494,17 @@ PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user) return PARSER_RC_OK; } - pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END); + pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END); rrdcontext_updated_retention_rrdset(st); - bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st, + bool ok = replicate_chart_request(send_to_plugin, parser, host, st, first_entry_child, last_entry_child, child_world_time, first_entry_requested, last_entry_requested); return ok ? PARSER_RC_OK : PARSER_RC_ERROR; } -PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); char *id = get_word(words, num_words, 1); @@ -1536,17 +1513,17 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { char *wall_clock_time_str = get_word(words, num_words, 4); if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); timing_step(TIMING_STEP_BEGIN2_PREPARE); RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2); + pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN_V2); if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED))) rrdset_isnot_obsolete(st); @@ -1573,32 +1550,31 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // prepare our state - pluginsd_lock_rrdset_data_collection(user); + pluginsd_lock_rrdset_data_collection(parser); - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - u->v2.update_every = update_every; - u->v2.end_time = end_time; - u->v2.wall_clock_time = wall_clock_time; - u->v2.ml_locked = ml_chart_update_begin(st); + parser->user.v2.update_every = update_every; + parser->user.v2.end_time = end_time; + parser->user.v2.wall_clock_time = wall_clock_time; + parser->user.v2.ml_locked = ml_chart_update_begin(st); timing_step(TIMING_STEP_BEGIN2_ML); // ------------------------------------------------------------------------ // propagate it forward in v2 - if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) - u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time); + if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost)) + parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time); - if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) { + if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities - bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - BUFFER *wb = u->v2.stream_buffer.wb; + BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); - if(unlikely(u->v2.stream_buffer.begin_v2_added)) + if(unlikely(parser->user.v2.stream_buffer.begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); @@ -1626,8 +1602,8 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { buffer_fast_strcat(wb, "\n", 1); - u->v2.stream_buffer.last_point_end_time_s = end_time; - u->v2.stream_buffer.begin_v2_added = true; + parser->user.v2.stream_buffer.last_point_end_time_s = end_time; + parser->user.v2.stream_buffer.begin_v2_added = true; } timing_step(TIMING_STEP_BEGIN2_PROPAGATE); @@ -1643,16 +1619,16 @@ PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) { st->counter_done++; // these are only needed for db mode RAM, SAVE, MAP, ALLOC - st->current_entry++; - if(st->current_entry >= st->entries) - st->current_entry -= st->entries; + st->db.current_entry++; + if(st->db.current_entry >= st->db.entries) + st->db.current_entry -= st->db.entries; timing_step(TIMING_STEP_BEGIN2_STORE); return PARSER_RC_OK; } -PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { +static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); char *dimension = get_word(words, num_words, 1); @@ -1661,20 +1637,18 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { char *flags_str = get_word(words, num_words, 4); if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) - return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); - - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); timing_step(TIMING_STEP_SET2_PREPARE); RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2); - if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) rrddim_isnot_obsolete(st, rd); @@ -1703,11 +1677,11 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { value = NAN; flags = SN_EMPTY_SLOT; - if(u->v2.ml_locked) - ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false); + if(parser->user.v2.ml_locked) + ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false); } - else if(u->v2.ml_locked) { - if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) { + else if(parser->user.v2.ml_locked) { + if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) { // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS); } @@ -1720,13 +1694,13 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // propagate it forward in v2 - if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) { + if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities - bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; - NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; + bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; - BUFFER *wb = u->v2.stream_buffer.wb; + BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); @@ -1750,51 +1724,50 @@ PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) { // ------------------------------------------------------------------------ // store it - rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags); - rd->last_collected_time.tv_sec = u->v2.end_time; - rd->last_collected_time.tv_usec = 0; - rd->last_collected_value = collected_value; - rd->last_stored_value = value; - rd->last_calculated_value = value; - rd->collections_counter++; - rd->updated = true; + rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags); + rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.last_collected_value = collected_value; + rd->collector.last_stored_value = value; + rd->collector.last_calculated_value = value; + rd->collector.counter++; + rrddim_set_updated(rd); timing_step(TIMING_STEP_SET2_STORE); return PARSER_RC_OK; } -void pluginsd_cleanup_v2(void *user) { +void pluginsd_cleanup_v2(PARSER *parser) { // this is called when the thread is stopped while processing - pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP"); + pluginsd_set_chart_from_parent(parser, NULL, "THREAD CLEANUP"); } -PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) { +static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) { timing_init(); - RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2); - if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END_V2); + if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); - if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL); + RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user; - u->data_collections_count++; + parser->user.data_collections_count++; timing_step(TIMING_STEP_END2_PREPARE); // ------------------------------------------------------------------------ // propagate the whole chart update in v1 - if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb)) - rrdset_push_metrics_v1(&u->v2.stream_buffer, st); + if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb)) + rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st); timing_step(TIMING_STEP_END2_PUSH_V1); // ------------------------------------------------------------------------ // unblock data collection - pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false); + pluginsd_unlock_previous_chart(parser, PLUGINSD_KEYWORD_END_V2, false); rrdcontext_collected_rrdset(st); store_metric_collection_completed(); @@ -1803,7 +1776,7 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ // ------------------------------------------------------------------------ // propagate it forward - rrdset_push_metrics_finished(&u->v2.stream_buffer, st); + rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st); timing_step(TIMING_STEP_END2_PROPAGATE); @@ -1812,16 +1785,16 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ RRDDIM *rd; rrddim_foreach_read(rd, st) { - rd->calculated_value = 0; - rd->collected_value = 0; - rd->updated = false; - } + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } rrddim_foreach_done(rd); // ------------------------------------------------------------------------ // reset state - u->v2 = (struct parser_user_object_v2){ 0 }; + parser->user.v2 = (struct parser_user_object_v2){ 0 }; timing_step(TIMING_STEP_END2_STORE); timing_report(); @@ -1829,19 +1802,126 @@ PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_ return PARSER_RC_OK; } +static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + netdata_log_info("PLUGINSD: plugin called EXIT."); + return PARSER_RC_STOP; +} + +static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser) +{ + const char *host_uuid_str = get_word(words, num_words, 1); + const char *claim_id_str = get_word(words, num_words, 2); + + if (!host_uuid_str || !claim_id_str) { + netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'", + host_uuid_str ? host_uuid_str : "[unset]", + claim_id_str ? claim_id_str : "[unset]"); + return PARSER_RC_ERROR; + } + + uuid_t uuid; + RRDHOST *host = parser->user.host; + + // We don't need the parsed UUID + // just do it to check the format + if(uuid_parse(host_uuid_str, uuid)) { + netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str); + return PARSER_RC_ERROR; + } + if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) { + netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str); + return PARSER_RC_ERROR; + } + + if(strcmp(host_uuid_str, host->machine_guid) != 0) { + netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid); + return PARSER_RC_OK; //the message is OK problem must be somewhere else + } + + rrdhost_aclk_state_lock(host); + + if (host->aclk_state.claimed_id) + freez(host->aclk_state.claimed_id); + + host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; + + rrdhost_aclk_state_unlock(host); + + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); + + rrdpush_send_claimed_id(host); + + return PARSER_RC_OK; +} + +// ---------------------------------------------------------------------------- + +static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) { +#ifdef NETDATA_INTERNAL_CHECKS + if(reader->read_buffer[reader->read_len] != '\0') + fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); +#endif + + ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1); + if(unlikely(bytes_read <= 0)) + return false; + + reader->read_len += bytes_read; + reader->read_buffer[reader->read_len] = '\0'; + + return true; +} + +static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) { + errno = 0; + struct pollfd fds[1]; + + fds[0].fd = fd; + fds[0].events = POLLIN; + + int ret = poll(fds, 1, timeout_ms); + + if (ret > 0) { + /* There is data to read */ + if (fds[0].revents & POLLIN) + return buffered_reader_read(reader, fd); + + else if(fds[0].revents & POLLERR) { + netdata_log_error("PARSER: read failed: POLLERR."); + return false; + } + else if(fds[0].revents & POLLHUP) { + netdata_log_error("PARSER: read failed: POLLHUP."); + return false; + } + else if(fds[0].revents & POLLNVAL) { + netdata_log_error("PARSER: read failed: POLLNVAL."); + return false; + } + + netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set."); + return false; + } + else if (ret == 0) { + netdata_log_error("PARSER: timeout while waiting for data."); + return false; + } + + netdata_log_error("PARSER: poll() failed with code %d.", ret); + return false; +} + void pluginsd_process_thread_cleanup(void *ptr) { PARSER *parser = (PARSER *)ptr; - pluginsd_cleanup_v2(parser->user); - pluginsd_host_define_cleanup(parser->user); + pluginsd_cleanup_v2(parser); + pluginsd_host_define_cleanup(parser); rrd_collector_finished(); parser_destroy(parser); } -// New plugins.d parser - inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) { int enabled = cd->unsafe.enabled; @@ -1852,13 +1932,13 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi } if (unlikely(fileno(fp_plugin_input) == -1)) { - error("input file descriptor given is not a valid stream"); + netdata_log_error("input file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } if (unlikely(fileno(fp_plugin_output) == -1)) { - error("output file descriptor given is not a valid stream"); + netdata_log_error("output file descriptor given is not a valid stream"); cd->serial_failures++; return 0; } @@ -1866,38 +1946,42 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi clearerr(fp_plugin_input); clearerr(fp_plugin_output); - PARSER_USER_OBJECT user = { - .enabled = cd->unsafe.enabled, - .host = host, - .cd = cd, - .trust_durations = trust_durations - }; + PARSER *parser; + { + PARSER_USER_OBJECT user = { + .enabled = cd->unsafe.enabled, + .host = host, + .cd = cd, + .trust_durations = trust_durations + }; - // fp_plugin_output = our input; fp_plugin_input = our output - PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, - PARSER_INPUT_SPLIT, NULL); + // fp_plugin_output = our input; fp_plugin_input = our output + parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL); + } pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD); rrd_collector_started(); + size_t count = 0; + // this keeps the parser with its current value // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - user.parser = parser; - char buffer[PLUGINSD_LINE_MAX + 1]; - - while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) { - if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer))) + buffered_reader_init(&parser->reader); + char buffer[PLUGINSD_LINE_MAX + 2]; + while(likely(service_running(SERVICE_COLLECTORS))) { + if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) { + if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) + break; + } + else if(unlikely(parser_action(parser, buffer))) break; } - // free parser with the pop function - netdata_thread_cleanup_pop(1); - - cd->unsafe.enabled = user.enabled; - size_t count = user.data_collections_count; + cd->unsafe.enabled = parser->user.enabled; + count = parser->user.data_collections_count; if (likely(count)) { cd->successful_collections += count; @@ -1906,143 +1990,187 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi else cd->serial_failures++; + // free parser with the pop function + netdata_thread_cleanup_pop(1); + return count; } -PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused) -{ - info("PLUGINSD: plugin called EXIT."); - return PARSER_RC_STOP; +void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) { + parser_init_repertoire(parser, repertoire); + + if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) + inflight_functions_init(parser); } -static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) { +PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, + PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) { + PARSER *parser; - if (types & PARSER_INIT_PLUGINSD) { - add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); - add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + parser = callocz(1, sizeof(*parser)); + if(user) + parser->user = *user; + parser->fd = fd; + parser->fp_input = fp_input; + parser->fp_output = fp_output; +#ifdef ENABLE_HTTPS + parser->ssl_output = ssl; +#endif + parser->flags = flags; - add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define); - add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end); - add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels); - add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host); + spinlock_init(&parser->writer.spinlock); + return parser; +} - add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit); - } +PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) { + switch(keyword->id) { + case 1: + return pluginsd_set_v2(words, num_words, parser); - if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) { - // plugins.d plugins and streaming - add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); - add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); - add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); - add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); - add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); - add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); - add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); - add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); - add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + case 2: + return pluginsd_begin_v2(words, num_words, parser); - add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); - add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); - add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + case 3: + return pluginsd_end_v2(words, num_words, parser); - inflight_functions_init(parser); - } + case 11: + return pluginsd_set(words, num_words, parser); - if (types & PARSER_INIT_STREAMING) { - add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end); + case 12: + return pluginsd_begin(words, num_words, parser); - // replication - add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state); - add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end); + case 13: + return pluginsd_end(words, num_words, parser); - // streaming metrics v2 - add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2); - add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2); - add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2); - } -} + case 21: + return pluginsd_replay_set(words, num_words, parser); -void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) { - pluginsd_keywords_init_internal(parser, types, parser_add_keyword); -} + case 22: + return pluginsd_replay_begin(words, num_words, parser); -struct pluginsd_user_unittest { - size_t size; - const char **hashtable; - uint32_t (*hash)(const char *s); - size_t collisions; -}; + case 23: + return pluginsd_replay_rrddim_collection_state(words, num_words, parser); + + case 24: + return pluginsd_replay_rrdset_collection_state(words, num_words, parser); + + case 25: + return pluginsd_replay_end(words, num_words, parser); + + case 31: + return pluginsd_dimension(words, num_words, parser); + + case 32: + return pluginsd_chart(words, num_words, parser); + + case 33: + return pluginsd_chart_definition_end(words, num_words, parser); + + case 34: + return pluginsd_clabel(words, num_words, parser); + + case 35: + return pluginsd_clabel_commit(words, num_words, parser); + + case 41: + return pluginsd_function(words, num_words, parser); + + case 42: + return pluginsd_function_result_begin(words, num_words, parser); + + case 51: + return pluginsd_label(words, num_words, parser); + + case 52: + return pluginsd_overwrite(words, num_words, parser); + + case 53: + return pluginsd_variable(words, num_words, parser); + + case 61: + return streaming_claimed_id(words, num_words, parser); + + case 71: + return pluginsd_host(words, num_words, parser); + + case 72: + return pluginsd_host_define(words, num_words, parser); + + case 73: + return pluginsd_host_define_end(words, num_words, parser); -void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) { - struct pluginsd_user_unittest *u = parser->user; + case 74: + return pluginsd_host_labels(words, num_words, parser); - uint32_t hash = u->hash(keyword); - uint32_t slot = hash % u->size; + case 97: + return pluginsd_flush(words, num_words, parser); - if(u->hashtable[slot]) - u->collisions++; + case 98: + return pluginsd_disable(words, num_words, parser); - u->hashtable[slot] = keyword; + case 99: + return pluginsd_exit(words, num_words, parser); + + default: + fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id); + } } -static struct { - const char *name; - uint32_t (*hash)(const char *s); - size_t slots_needed; -} hashers[] = { - { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, }, - { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, }, - { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, }, - { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, }, - { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, }, - - // terminator - { .name = NULL, NULL, .slots_needed = 0, }, -}; +#include "gperf-hashtable.h" + +void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) { + parser->repertoire = repertoire; + + for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) { + if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire)) + worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword); + } +} + +void parser_destroy(PARSER *parser) { + if (unlikely(!parser)) + return; + + dictionary_destroy(parser->inflight.functions); + freez(parser); +} int pluginsd_parser_unittest(void) { - PARSER *p; - size_t slots_to_check = 1000; - size_t i, h; - - // check for hashtable collisions - for(h = 0; hashers[h].name ;h++) { - hashers[h].slots_needed = slots_to_check * 1000000; - - for (i = 10; i < slots_to_check; i++) { - struct pluginsd_user_unittest user = { - .hash = hashers[h].hash, - .size = i, - .hashtable = callocz(i, sizeof(const char *)), - .collisions = 0, - }; - - p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); - pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING, - pluginsd_keyword_collision_check); - parser_destroy(p); - - freez(user.hashtable); - - if (!user.collisions) { - hashers[h].slots_needed = i; - break; - } + PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); + pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); + + char *lines[] = { + "BEGIN2 abcdefghijklmnopqr 123", + "SET2 abcdefg 0x12345678 0 0", + "SET2 hijklmnoqr 0x12345678 0 0", + "SET2 stuvwxyz 0x12345678 0 0", + "END2", + NULL, + }; + + char *words[PLUGINSD_MAX_WORDS]; + size_t iterations = 1000000; + size_t count = 0; + char input[PLUGINSD_LINE_MAX + 1]; + + usec_t started = now_realtime_usec(); + while(--iterations) { + for(size_t line = 0; lines[line] ;line++) { + strncpyz(input, lines[line], PLUGINSD_LINE_MAX); + size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS); + const char *command = get_word(words, num_words, 0); + PARSER_KEYWORD *keyword = parser_find_keyword(p, command); + if(unlikely(!keyword)) + fatal("Cannot parse the line '%s'", lines[line]); + count++; } } + usec_t ended = now_realtime_usec(); - for(h = 0; hashers[h].name ;h++) { - if(hashers[h].slots_needed > 1000) - info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check); - else - info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed); - } + netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count, + (double)(ended - started) / (double)USEC_PER_SEC, + (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0); - p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL); - pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING); parser_destroy(p); return 0; } -- cgit v1.2.3