summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /collectors/plugins.d/pluginsd_parser.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c3208
1 files changed, 0 insertions, 3208 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
deleted file mode 100644
index 3b47c6c0f..000000000
--- a/collectors/plugins.d/pluginsd_parser.c
+++ /dev/null
@@ -1,3208 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "pluginsd_parser.h"
-
-#define LOG_FUNCTIONS false
-
-#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
-#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
-
-static ssize_t send_to_plugin(const char *txt, void *data) {
- PARSER *parser = data;
-
- if(!txt || !*txt)
- return 0;
-
-#ifdef ENABLE_H2O
- if(parser->h2o_ctx)
- return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt));
-#endif
-
- errno = 0;
- spinlock_lock(&parser->writer.spinlock);
- ssize_t bytes = -1;
-
-#ifdef ENABLE_HTTPS
- NETDATA_SSL *ssl = parser->ssl_output;
- if(ssl) {
-
- if(SSL_connection(ssl))
- bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
-
- else
- netdata_log_error("PLUGINSD: cannot send command (SSL)");
-
- spinlock_unlock(&parser->writer.spinlock);
- return bytes;
- }
-#endif
-
- if(parser->fp_output) {
-
- bytes = fprintf(parser->fp_output, "%s", txt);
- if(bytes <= 0) {
- netdata_log_error("PLUGINSD: cannot send command (FILE)");
- bytes = -2;
- }
- else
- fflush(parser->fp_output);
-
- spinlock_unlock(&parser->writer.spinlock);
- return bytes;
- }
-
- if(parser->fd != -1) {
- bytes = 0;
- ssize_t total = (ssize_t)strlen(txt);
- ssize_t sent;
-
- do {
- sent = write(parser->fd, &txt[bytes], total - bytes);
- if(sent <= 0) {
- netdata_log_error("PLUGINSD: cannot send command (fd)");
- spinlock_unlock(&parser->writer.spinlock);
- return -3;
- }
- bytes += sent;
- }
- while(bytes < total);
-
- spinlock_unlock(&parser->writer.spinlock);
- return (int)bytes;
- }
-
- spinlock_unlock(&parser->writer.spinlock);
- netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
- return -4;
-}
-
-static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
- RRDHOST *host = parser->user.host;
-
- if(unlikely(!host))
- netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
-
- return host;
-}
-
-static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
- RRDSET *st = parser->user.st;
-
- if(unlikely(!st))
- netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
-
- return st;
-}
-
-static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
- return parser->user.st;
-}
-
-static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
- if(parser->user.st && !parser->user.v2.locked_data_collection) {
- spinlock_lock(&parser->user.st->data_collection_lock);
- parser->user.v2.locked_data_collection = true;
- }
-}
-
-static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
- if(parser->user.st && parser->user.v2.locked_data_collection) {
- spinlock_unlock(&parser->user.st->data_collection_lock);
- parser->user.v2.locked_data_collection = false;
- return true;
- }
-
- return false;
-}
-
-static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
- if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
- if(stale)
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
- rrdhost_hostname(parser->user.st->rrdhost),
- rrdset_id(parser->user.st),
- keyword);
- }
-
- if(unlikely(parser->user.v2.ml_locked)) {
- ml_chart_update_end(parser->user.st);
- parser->user.v2.ml_locked = false;
-
- if(stale)
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
- rrdhost_hostname(parser->user.st->rrdhost),
- rrdset_id(parser->user.st),
- keyword);
- }
-}
-
-static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
- pluginsd_unlock_previous_scope_chart(parser, keyword, true);
-
- if(parser->user.cleanup_slots && parser->user.st)
- rrdset_pluginsd_receive_unslot(parser->user.st);
-
- parser->user.st = NULL;
- parser->user.cleanup_slots = false;
-}
-
-static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
- RRDSET *old_st = parser->user.st;
- pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
- pid_t my_collector_tid = gettid();
-
- if(unlikely(old_collector_tid)) {
- if(old_collector_tid != my_collector_tid) {
- nd_log_limit_static_global_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
- "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
- keyword ? keyword : "UNKNOWN",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- my_collector_tid, old_collector_tid);
-
- return false;
- }
-
- old_st->pluginsd.collector_tid = 0;
- }
-
- st->pluginsd.collector_tid = my_collector_tid;
-
- pluginsd_clear_scope_chart(parser, keyword);
-
- st->pluginsd.pos = 0;
- parser->user.st = st;
- parser->user.cleanup_slots = false;
-
- return true;
-}
-
-static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
- size_t wanted_size = st->pluginsd.size;
-
- if(slot >= 1) {
- st->pluginsd.dims_with_slots = true;
- wanted_size = slot;
- }
- else {
- st->pluginsd.dims_with_slots = false;
- wanted_size = dictionary_entries(st->rrddim_root_index);
- }
-
- if(wanted_size > st->pluginsd.size) {
- st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
-
- // initialize the empty slots
- for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
- st->pluginsd.prd_array[i].rda = NULL;
- st->pluginsd.prd_array[i].rd = NULL;
- st->pluginsd.prd_array[i].id = NULL;
- }
-
- st->pluginsd.size = wanted_size;
- }
-
- if(st->pluginsd.dims_with_slots) {
- struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
-
- if(prd->rd != rd) {
- prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
- prd->rd = rrddim_acquired_to_rrddim(prd->rda);
- prd->id = string2str(prd->rd->id);
- }
-
- if(obsolete)
- parser->user.cleanup_slots = true;
- }
-}
-
-static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
- if (unlikely(!dimension || !*dimension)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
- rrdhost_hostname(host), rrdset_id(st), cmd);
- return NULL;
- }
-
- if (unlikely(!st->pluginsd.size)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
- rrdhost_hostname(host), rrdset_id(st), cmd);
- return NULL;
- }
-
- struct pluginsd_rrddim *prd;
- RRDDIM *rd;
-
- if(likely(st->pluginsd.dims_with_slots)) {
- // caching with slots
-
- if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
- rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
- return NULL;
- }
-
- prd = &st->pluginsd.prd_array[slot - 1];
-
- rd = prd->rd;
- if(likely(rd)) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(strcmp(prd->id, dimension) != 0) {
- ssize_t t;
- for(t = 0; t < st->pluginsd.size ;t++) {
- if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
- break;
- }
- if(t >= st->pluginsd.size)
- t = -1;
-
- internal_fatal(true,
- "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
- "the right slot is %zd",
- dimension, slot, prd->id, t);
- }
-#endif
- return rd;
- }
- }
- else {
- // caching without slots
-
- if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
- st->pluginsd.pos = 0;
-
- prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
-
- rd = prd->rd;
- if(likely(rd)) {
- const char *id = prd->id;
-
- if(strcmp(id, dimension) == 0) {
- // we found it cached
- return rd;
- }
- else {
- // the cached one is not good for us
- rrddim_acquired_release(prd->rda);
- prd->rda = NULL;
- prd->rd = NULL;
- prd->id = NULL;
- }
- }
- }
-
- // we need to find the dimension and set it to prd
-
- RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
- if (unlikely(!rda)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
- rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
-
- return NULL;
- }
-
- prd->rda = rda;
- prd->rd = rd = rrddim_acquired_to_rrddim(rda);
- prd->id = string2str(rd->id);
-
- return rd;
-}
-
-static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
- if (unlikely(!chart || !*chart)) {
- netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
- rrdhost_hostname(host), cmd);
- return NULL;
- }
-
- RRDSET *st = rrdset_find(host, chart);
- if (unlikely(!st))
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
- rrdhost_hostname(host), chart, cmd);
-
- return st;
-}
-
-static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
- ssize_t slot = -1;
- char *id = get_word(words, num_words, 1);
- if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
- id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
- slot = (ssize_t) str2ull_encoded(&id[5]);
- if(slot < 0) slot = 0; // to make the caller increment its idx of the words
- }
-
- return slot;
-}
-
-static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
- // clean possible old cached data
- rrdset_pluginsd_receive_unslot(st);
-
- if(unlikely(slot < 1 || slot >= INT32_MAX))
- return;
-
- RRDHOST *host = st->rrdhost;
-
- if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
- spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
- size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
- size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
-
- if(new_slots < (size_t)slot)
- new_slots = slot;
-
- host->rrdpush.receive.pluginsd_chart_slots.array =
- reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
-
- for(size_t i = old_slots; i < new_slots ;i++)
- host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
-
- host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
- spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
- }
-
- host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
- st->pluginsd.last_slot = (int32_t)slot - 1;
- parser->user.cleanup_slots = obsolete;
-}
-
-static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
- if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
- return pluginsd_find_chart(host, id, keyword);
-
- RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
-
- if(!st) {
- st = pluginsd_find_chart(host, id, keyword);
- if(st)
- pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
- }
- else {
- internal_fatal(string_strcmp(st->id, id) != 0,
- "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
- slot - 1, id, string2str(st->id));
- }
-
- return st;
-}
-
-static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
- parser->user.enabled = 0;
-
- if(keyword && msg) {
- nd_log_limit_static_global_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO,
- "PLUGINSD: keyword %s: %s", keyword, msg);
- }
-
- return PARSER_RC_ERROR;
-}
-
-static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *dimension = get_word(words, num_words, idx++);
- char *value = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- st->pluginsd.set = true;
-
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
- rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
-
- if (value && *value)
- rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *id = get_word(words, num_words, idx++);
- char *microseconds_txt = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- usec_t microseconds = 0;
- if (microseconds_txt && *microseconds_txt) {
- long long t = str2ll(microseconds_txt, NULL);
- if(t >= 0)
- microseconds = t;
- }
-
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- if(st->replay.log_next_data_collection) {
- st->replay.log_next_data_collection = false;
-
- internal_error(true,
- "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
- rrdhost_hostname(host), rrdset_id(st),
- st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
- st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
- microseconds
- );
- }
-#endif
-
- if (likely(st->counter_done)) {
- if (likely(microseconds)) {
- if (parser->user.trust_durations)
- rrdset_next_usec_unfiltered(st, microseconds);
- else
- rrdset_next_usec(st, microseconds);
- }
- else
- rrdset_next(st);
- }
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
- char *tv_sec = get_word(words, num_words, 1);
- char *tv_usec = get_word(words, num_words, 2);
- char *pending_rrdset_next = get_word(words, num_words, 3);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
-
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
- parser->user.data_collections_count++;
-
- struct timeval tv = {
- .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
- .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
- };
-
- if(!tv.tv_sec)
- now_realtime_timeval(&tv);
-
- rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
-
- return PARSER_RC_OK;
-}
-
-static void pluginsd_host_define_cleanup(PARSER *parser) {
- string_freez(parser->user.host_define.hostname);
- rrdlabels_destroy(parser->user.host_define.rrdlabels);
-
- parser->user.host_define.hostname = NULL;
- parser->user.host_define.rrdlabels = NULL;
- parser->user.host_define.parsing_host = false;
-}
-
-static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
- if(uuid_parse(guid, *uuid))
- return false;
-
- uuid_unparse_lower(*uuid, output);
-
- return true;
-}
-
-static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
- char *guid = get_word(words, num_words, 1);
- char *hostname = get_word(words, num_words, 2);
-
- if(unlikely(!guid || !*guid || !hostname || !*hostname))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
-
- if(unlikely(parser->user.host_define.parsing_host))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
- "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
-
- if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
-
- parser->user.host_define.hostname = string_strdupz(hostname);
- parser->user.host_define.rrdlabels = rrdlabels_create();
- parser->user.host_define.parsing_host = true;
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
- char *name = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
-
- if(!name || !*name || !value)
- return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
-
- if(!parser->user.host_define.parsing_host || !labels)
- return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
-
- rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
- return pluginsd_host_dictionary(words, num_words, parser,
- parser->user.host_define.rrdlabels,
- PLUGINSD_KEYWORD_HOST_LABEL);
-}
-
-static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- if(!parser->user.host_define.parsing_host)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
-
- RRDHOST *host = rrdhost_find_or_create(
- string2str(parser->user.host_define.hostname),
- string2str(parser->user.host_define.hostname),
- parser->user.host_define.machine_guid_str,
- "Netdata Virtual Host 1.0",
- netdata_configured_timezone,
- netdata_configured_abbrev_timezone,
- netdata_configured_utc_offset,
- NULL,
- program_name,
- program_version,
- default_rrd_update_every,
- default_rrd_history_entries,
- default_rrd_memory_mode,
- default_health_enabled,
- default_rrdpush_enabled,
- default_rrdpush_destination,
- default_rrdpush_api_key,
- default_rrdpush_send_charts_matching,
- default_rrdpush_enable_replication,
- default_rrdpush_seconds_to_replicate,
- default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
- false);
-
- rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
-
- if(host->rrdlabels) {
- rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
- }
- else {
- host->rrdlabels = parser->user.host_define.rrdlabels;
- parser->user.host_define.rrdlabels = NULL;
- }
-
- pluginsd_host_define_cleanup(parser);
-
- parser->user.host = host;
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
-
- rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
- rrdcontext_host_child_connected(host);
- schedule_node_info_update(host);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
- char *guid = get_word(words, num_words, 1);
-
- if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
- parser->user.host = localhost;
- return PARSER_RC_OK;
- }
-
- uuid_t uuid;
- char uuid_str[UUID_STR_LEN];
- if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
-
- RRDHOST *host = rrdhost_find_by_guid(uuid_str);
- if(unlikely(!host))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
-
- parser->user.host = host;
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *type = get_word(words, num_words, idx++);
- char *name = get_word(words, num_words, idx++);
- char *title = get_word(words, num_words, idx++);
- char *units = get_word(words, num_words, idx++);
- char *family = get_word(words, num_words, idx++);
- char *context = get_word(words, num_words, idx++);
- char *chart = get_word(words, num_words, idx++);
- char *priority_s = get_word(words, num_words, idx++);
- char *update_every_s = get_word(words, num_words, idx++);
- char *options = get_word(words, num_words, idx++);
- char *plugin = get_word(words, num_words, idx++);
- char *module = get_word(words, num_words, idx++);
-
- // parse the id from type
- char *id = NULL;
- if (likely(type && (id = strchr(type, '.')))) {
- *id = '\0';
- id++;
- }
-
- // make sure we have the required variables
- if (unlikely((!type || !*type || !id || !*id)))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
-
- // parse the name, and make sure it does not include 'type.'
- if (unlikely(name && *name)) {
- // when data are streamed from child nodes
- // name will be type.name
- // so, we have to remove 'type.' from name too
- size_t len = strlen(type);
- if (strncmp(type, name, len) == 0 && name[len] == '.')
- name = &name[len + 1];
-
- // if the name is the same with the id,
- // or is just 'NULL', clear it.
- if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
- name = NULL;
- }
-
- int priority = 1000;
- if (likely(priority_s && *priority_s))
- priority = str2i(priority_s);
-
- int update_every = parser->user.cd->update_every;
- if (likely(update_every_s && *update_every_s))
- update_every = str2i(update_every_s);
- if (unlikely(!update_every))
- update_every = parser->user.cd->update_every;
-
- RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
- if (unlikely(chart))
- chart_type = rrdset_type_id(chart);
-
- if (unlikely(name && !*name))
- name = NULL;
- if (unlikely(family && !*family))
- family = NULL;
- if (unlikely(context && !*context))
- context = NULL;
- if (unlikely(!title))
- title = "";
- if (unlikely(!units))
- units = "unknown";
-
- netdata_log_debug(
- D_PLUGINSD,
- "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
- type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
- priority, update_every);
-
- RRDSET *st = NULL;
-
- st = rrdset_create(
- host, type, id, name, family, context, title, units,
- (plugin && *plugin) ? plugin : parser->user.cd->filename,
- module, priority, update_every,
- chart_type);
-
- bool obsolete = false;
- if (likely(st)) {
- if (options && *options) {
- if (strstr(options, "obsolete")) {
- rrdset_is_obsolete___safe_from_collector_thread(st);
- obsolete = true;
- }
- else
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
-
- if (strstr(options, "detail"))
- rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
-
- if (strstr(options, "hidden"))
- rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
-
- if (strstr(options, "store_first"))
- rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
- else
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
- }
- else {
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
- rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
- rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
- }
-
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
- }
- else
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
- const char *first_entry_txt = get_word(words, num_words, 1);
- const char *last_entry_txt = get_word(words, num_words, 2);
- const char *wall_clock_time_txt = get_word(words, num_words, 3);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
- time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
- time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
-
- bool ok = true;
- if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
-
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- st->replay.start_streaming = false;
- st->replay.after = 0;
- st->replay.before = 0;
-#endif
-
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
-
- ok = replicate_chart_request(send_to_plugin, parser, host, st,
- first_entry_child, last_entry_child, child_wall_clock_time,
- 0, 0);
- }
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- else {
- internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- }
-#endif
-
- return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
-}
-
-static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *id = get_word(words, num_words, idx++);
- char *name = get_word(words, num_words, idx++);
- char *algorithm = get_word(words, num_words, idx++);
- char *multiplier_s = get_word(words, num_words, idx++);
- char *divisor_s = get_word(words, num_words, idx++);
- char *options = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if (unlikely(!id))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
-
- long multiplier = 1;
- if (multiplier_s && *multiplier_s) {
- multiplier = str2ll_encoded(multiplier_s);
- if (unlikely(!multiplier))
- multiplier = 1;
- }
-
- long divisor = 1;
- if (likely(divisor_s && *divisor_s)) {
- divisor = str2ll_encoded(divisor_s);
- if (unlikely(!divisor))
- divisor = 1;
- }
-
- if (unlikely(!algorithm || !*algorithm))
- algorithm = "absolute";
-
- if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
- netdata_log_debug(
- D_PLUGINSD,
- "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
- rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
- options ? options : "");
-
- RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
- int unhide_dimension = 1;
-
- rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- bool obsolete = false;
- if (options && *options) {
- if (strstr(options, "obsolete") != NULL) {
- obsolete = true;
- rrddim_is_obsolete___safe_from_collector_thread(st, rd);
- }
- else
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
-
- unhide_dimension = !strstr(options, "hidden");
-
- if (strstr(options, "noreset") != NULL)
- rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- if (strstr(options, "nooverflow") != NULL)
- rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- }
- else
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
-
- bool should_update_dimension = false;
-
- if (likely(unhide_dimension)) {
- rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
- should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
- }
- else {
- rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
- should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
- }
-
- if (should_update_dimension) {
- rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
- rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- }
-
- pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
-
- return PARSER_RC_OK;
-}
-
-// ----------------------------------------------------------------------------
-// execution of functions
-
-struct inflight_function {
- int code;
- int timeout;
- STRING *function;
- BUFFER *result_body_wb;
- rrd_function_result_callback_t result_cb;
- void *result_cb_data;
- usec_t timeout_ut;
- usec_t started_ut;
- usec_t sent_ut;
- const char *payload;
- PARSER *parser;
- bool virtual;
-};
-
-static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
- struct inflight_function *pf = func;
-
- PARSER *parser = parser_ptr;
-
- // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
- pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
-
- const char *transaction = dictionary_acquired_item_name(item);
-
- char buffer[2048 + 1];
- snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n",
- pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
- transaction,
- pf->timeout,
- string2str(pf->function));
-
- // send the command to the plugin
- ssize_t ret = send_to_plugin(buffer, parser);
-
- pf->sent_ut = now_realtime_usec();
-
- if(ret < 0) {
- netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
- rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
- }
- else {
- internal_error(LOG_FUNCTIONS,
- "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
- string2str(pf->function), dictionary_acquired_item_name(item), ret,
- pf->sent_ut - pf->started_ut);
- }
-
- if (!pf->payload)
- return;
-
- // send the payload to the plugin
- ret = send_to_plugin(pf->payload, parser);
-
- if(ret < 0) {
- netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret);
- rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
- }
- else {
- internal_error(LOG_FUNCTIONS,
- "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
- string2str(pf->function), dictionary_acquired_item_name(item), ret,
- pf->sent_ut - pf->started_ut);
- }
-
- send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser);
-}
-
-static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
- struct inflight_function *pf = new_func;
-
- netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
- pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
- pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
- string_freez(pf->function);
-
- return false;
-}
-
-void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) {
- if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return;
-
- char *params_local = strdupz(fnc_sig);
- char *words[DYNCFG_MAX_WORDS];
- size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
-
- if (words_c != 3) {
- netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job");
- freez(params_local);
- return;
- }
-
- const char *module = words[1];
- const char *job = words[2];
-
- delete_job(plug, module, job);
-
- unlink_job(plug->name, module, job);
-
- rrdpush_send_job_deleted(localhost, plug->name, module, job);
-
- freez(params_local);
-}
-
-void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) {
- if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return;
-
- char *params_local = strdupz(fnc_sig);
- char *words[DYNCFG_MAX_WORDS];
- size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
-
- if (words_c != 3) {
- netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config");
- freez(params_local);
- return;
- }
-
- const char *module_name = get_word(words, words_c, 1);
- const char *job_name = get_word(words, words_c, 2);
-
- if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) {
- freez(params_local);
- return;
- }
-
- // only send this if it is not existing already (register_job cares for that)
- rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED);
-
- freez(params_local);
-}
-
-static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
- struct inflight_function *pf = func;
- struct parser *parser = (struct parser *)parser_ptr;
-
- internal_error(LOG_FUNCTIONS,
- "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
- string2str(pf->function), dictionary_acquired_item_name(item),
- buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
-
- if (pf->virtual && SERVING_PLUGINSD(parser)) {
- if (pf->payload) {
- if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
- set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
- dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration);
- } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) {
- delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
- }
- }
-
- pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
-
- string_freez(pf->function);
- freez((void *)pf->payload);
-}
-
-void inflight_functions_init(PARSER *parser) {
- parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
- dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
- dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
- dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
-}
-
-static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
- parser->inflight.smaller_timeout = 0;
- struct inflight_function *pf;
- dfe_start_write(parser->inflight.functions, pf) {
- if (pf->timeout_ut < now) {
- internal_error(true,
- "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
- string2str(pf->function), pf_dfe.name, now - pf->started_ut);
-
- if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
- pf->code = rrd_call_function_error(pf->result_body_wb,
- "Timeout waiting for collector response.",
- HTTP_RESP_GATEWAY_TIMEOUT);
-
- dictionary_del(parser->inflight.functions, pf_dfe.name);
- }
-
- else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
- parser->inflight.smaller_timeout = pf->timeout_ut;
- }
- dfe_done(pf);
-}
-
-void pluginsd_function_cancel(void *data) {
- struct inflight_function *look_for = data, *t;
-
- bool sent = false;
- dfe_start_read(look_for->parser->inflight.functions, t) {
- if(look_for == t) {
- const char *transaction = t_dfe.name;
-
- internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
-
- char buffer[2048 + 1];
- snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n",
- PLUGINSD_KEYWORD_FUNCTION_CANCEL,
- transaction);
-
- // send the command to the plugin
- ssize_t ret = send_to_plugin(buffer, t->parser);
- if(ret < 0)
- sent = true;
-
- break;
- }
- }
- dfe_done(t);
-
- if(sent <= 0)
- nd_log(NDLS_DAEMON, NDLP_DEBUG,
- "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
-}
-
-// this is the function that is called from
-// rrd_call_function_and_wait() and rrd_call_function_async()
-static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function,
- void *execute_cb_data,
- rrd_function_result_callback_t result_cb, void *result_cb_data,
- rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused,
- void *is_cancelled_cb_data __maybe_unused,
- rrd_function_register_canceller_cb_t register_canceller_cb,
- void *register_canceller_db_data) {
- PARSER *parser = execute_cb_data;
-
- usec_t now = now_realtime_usec();
-
- struct inflight_function tmp = {
- .started_ut = now,
- .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT,
- .result_body_wb = result_body_wb,
- .timeout = timeout,
- .function = string_strdupz(function),
- .result_cb = result_cb,
- .result_cb_data = result_cb_data,
- .payload = NULL,
- .parser = parser,
- };
-
- uuid_t uuid;
- uuid_generate_random(uuid);
-
- char transaction[UUID_STR_LEN];
- uuid_unparse_lower(uuid, transaction);
-
- dictionary_write_lock(parser->inflight.functions);
-
- // if there is any error, our dictionary callbacks will call the caller callback to notify
- // the caller about the error - no need for error handling here.
- void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function));
- if(register_canceller_cb)
- register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t);
-
- if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
- parser->inflight.smaller_timeout = tmp.timeout_ut;
-
- // garbage collect stale inflight functions
- if(parser->inflight.smaller_timeout < now)
- inflight_functions_garbage_collect(parser, now);
-
- dictionary_write_unlock(parser->inflight.functions);
-
- return HTTP_RESP_OK;
-}
-
-static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
- // a plugin or a child is registering a function
-
- bool global = false;
- size_t i = 1;
- if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
- i++;
- global = true;
- }
-
- char *name = get_word(words, num_words, i++);
- char *timeout_s = get_word(words, num_words, i++);
- char *help = get_word(words, num_words, i++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
- if(!host) return PARSER_RC_ERROR;
-
- RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
- if(!st) global = true;
-
- if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
- rrdhost_hostname(host),
- st?rrdset_id(st):"(unset)",
- global?"yes":"no",
- name?name:"(unset)",
- timeout_s?timeout_s:"(unset)",
- help?help:"(unset)"
- );
- return PARSER_RC_ERROR;
- }
-
- int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
- if (timeout_s && *timeout_s) {
- timeout = str2i(timeout_s);
- if (unlikely(timeout <= 0))
- timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
- }
-
- rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser);
-
- parser->user.data_collections_count++;
-
- return PARSER_RC_OK;
-}
-
-static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
- STRING *key = action_data;
- if(key)
- dictionary_del(parser->inflight.functions, string2str(key));
- string_freez(key);
-
- parser->user.data_collections_count++;
-}
-
-static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
- char *key = get_word(words, num_words, 1);
- char *status = get_word(words, num_words, 2);
- char *format = get_word(words, num_words, 3);
- char *expires = get_word(words, num_words, 4);
-
- if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
- netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
- , key ? key : "(unset)"
- , status ? status : "(unset)"
- , format ? format : "(unset)"
- , expires ? expires : "(unset)"
- );
- }
-
- int code = (status && *status) ? str2i(status) : 0;
- if (code <= 0)
- code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
-
- time_t expiration = (expires && *expires) ? str2l(expires) : 0;
-
- struct inflight_function *pf = NULL;
-
- if(key && *key)
- pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
-
- if(!pf) {
- netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
- }
- else {
- if(format && *format)
- pf->result_body_wb->content_type = functions_format_to_content_type(format);
-
- pf->code = code;
-
- pf->result_body_wb->expires = expiration;
- if(expiration <= now_realtime_sec())
- buffer_no_cacheable(pf->result_body_wb);
- else
- buffer_cacheable(pf->result_body_wb);
- }
-
- parser->defer.response = (pf) ? pf->result_body_wb : NULL;
- parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
- parser->defer.action = pluginsd_function_result_end;
- parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
- parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
-
- return PARSER_RC_OK;
-}
-
-// ----------------------------------------------------------------------------
-
-static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
- char *name = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
- NETDATA_DOUBLE v;
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_get_scope_chart(parser);
-
- int global = (st) ? 0 : 1;
-
- if (name && *name) {
- if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
- global = 1;
- name = get_word(words, num_words, 2);
- value = get_word(words, num_words, 3);
- } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
- global = 0;
- name = get_word(words, num_words, 2);
- value = get_word(words, num_words, 3);
- }
- }
-
- if (unlikely(!name || !*name))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
-
- if (unlikely(!value || !*value))
- value = NULL;
-
- if (unlikely(!value)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- (global) ? "HOST" : "CHART",
- name);
- return PARSER_RC_OK;
- }
-
- if (!global && !st)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
-
- char *endptr = NULL;
- v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
- if (unlikely(endptr && *endptr)) {
- if (endptr == value)
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name);
- else
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
- rrdhost_hostname(host),
- st ? rrdset_id(st):"UNSET",
- value,
- name,
- endptr);
- }
-
- if (global) {
- const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
- if (rva) {
- rrdvar_custom_host_variable_set(host, rva, v);
- rrdvar_custom_host_variable_release(host, rva);
- }
- else
- netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
- rrdhost_hostname(host),
- name);
- } else {
- const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
- if (rsa) {
- rrdsetvar_custom_chart_variable_set(st, rsa, v);
- rrdsetvar_custom_chart_variable_release(st, rsa);
- }
- else
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
- rrdhost_hostname(host), rrdset_id(st), name);
- }
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
- parser->user.replay.start_time = 0;
- parser->user.replay.end_time = 0;
- parser->user.replay.start_time_ut = 0;
- parser->user.replay.end_time_ut = 0;
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
- parser->user.enabled = 0;
- return PARSER_RC_STOP;
-}
-
-static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
- const char *name = get_word(words, num_words, 1);
- const char *label_source = get_word(words, num_words, 2);
- const char *value = get_word(words, num_words, 3);
-
- if (!name || !label_source || !value)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
-
- char *store = (char *)value;
- bool allocated_store = false;
-
- if(unlikely(num_words > 4)) {
- allocated_store = true;
- store = mallocz(PLUGINSD_LINE_MAX + 1);
- size_t remaining = PLUGINSD_LINE_MAX;
- char *move = store;
- char *word;
- for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
- if(i > 3) {
- *move++ = ' ';
- *move = '\0';
- remaining--;
- }
-
- size_t length = strlen(word);
- if (length > remaining)
- length = remaining;
-
- remaining -= length;
- memcpy(move, word, length);
- move += length;
- *move = '\0';
- }
- }
-
- if(unlikely(!(parser->user.new_host_labels)))
- parser->user.new_host_labels = rrdlabels_create();
-
- if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
- int is_ephemeral = appconfig_test_boolean_value((char *) value);
- if (is_ephemeral) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
- if (likely(host))
- rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
- }
- }
-
- rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
-
- if (allocated_store)
- freez(store);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
-
- if(unlikely(!host->rrdlabels))
- host->rrdlabels = rrdlabels_create();
-
- rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
- if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
- rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
- rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
-
- rrdlabels_destroy(parser->user.new_host_labels);
- parser->user.new_host_labels = NULL;
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
- const char *name = get_word(words, num_words, 1);
- const char *value = get_word(words, num_words, 2);
- const char *label_source = get_word(words, num_words, 3);
-
- if (!name || !value || !label_source) {
- netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- }
-
- if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
- RRDSET *st = pluginsd_get_scope_chart(parser);
- parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
- rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
- }
-
- rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
-
- if(!parser->user.chart_rrdlabels_linked_temporarily) {
- netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- }
-
- rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
-
- rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
- rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- rrdset_metadata_updated(st);
-
- parser->user.chart_rrdlabels_linked_temporarily = NULL;
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *id = get_word(words, num_words, idx++);
- char *start_time_str = get_word(words, num_words, idx++);
- char *end_time_str = get_word(words, num_words, idx++);
- char *child_now_str = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st;
- if (likely(!id || !*id))
- st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- else
- st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN);
-
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(start_time_str && end_time_str) {
- time_t start_time = (time_t) str2ull_encoded(start_time_str);
- time_t end_time = (time_t) str2ull_encoded(end_time_str);
-
- time_t wall_clock_time = 0, tolerance;
- bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
- if(child_now_str) {
- wall_clock_time = (time_t) str2ull_encoded(child_now_str);
- tolerance = st->update_every + 1;
- wall_clock_comes_from_child = true;
- }
-
- if(wall_clock_time <= 0) {
- wall_clock_time = now_realtime_sec();
- tolerance = st->update_every + 5;
- wall_clock_comes_from_child = false;
- }
-
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(
- (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
- "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
-
- internal_error(
- true,
- "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
- st->replay.after, st->replay.before);
-#endif
-
- if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
- if (unlikely(end_time - start_time != st->update_every))
- rrdset_set_update_every_s(st, end_time - start_time);
-
- st->last_collected_time.tv_sec = end_time;
- st->last_collected_time.tv_usec = 0;
-
- st->last_updated.tv_sec = end_time;
- st->last_updated.tv_usec = 0;
-
- st->counter++;
- st->counter_done++;
-
- // these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->db.current_entry++;
- if(st->db.current_entry >= st->db.entries)
- st->db.current_entry -= st->db.entries;
-
- parser->user.replay.start_time = start_time;
- parser->user.replay.end_time = end_time;
- parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
- parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
- parser->user.replay.wall_clock_time = wall_clock_time;
- parser->user.replay.rset_enabled = true;
-
- return PARSER_RC_OK;
- }
-
- netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
- " from %ld to %ld, but timestamps are invalid "
- "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
- rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
- wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
- tolerance);
- }
-
- // the child sends an RBEGIN without any parameters initially
- // setting rset_enabled to false, means the RSET should not store any metrics
- // to store metrics, the RBEGIN needs to have timestamps
- parser->user.replay.start_time = 0;
- parser->user.replay.end_time = 0;
- parser->user.replay.start_time_ut = 0;
- parser->user.replay.end_time_ut = 0;
- parser->user.replay.wall_clock_time = 0;
- parser->user.replay.rset_enabled = false;
- return PARSER_RC_OK;
-}
-
-static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
- SN_FLAGS flags = SN_FLAG_NONE;
-
- char c;
- while ((c = *flags_str++)) {
- switch (c) {
- case 'A':
- flags |= SN_FLAG_NOT_ANOMALOUS;
- break;
-
- case 'R':
- flags |= SN_FLAG_RESET;
- break;
-
- case 'E':
- flags = SN_EMPTY_SLOT;
- return flags;
-
- default:
- internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
- break;
- }
- }
-
- return flags;
-}
-
-static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *dimension = get_word(words, num_words, idx++);
- char *value_str = get_word(words, num_words, idx++);
- char *flags_str = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(!parser->user.replay.rset_enabled) {
- nd_log_limit_static_thread_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR,
- "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
- rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
-
- // we have to return OK here
- return PARSER_RC_OK;
- }
-
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- st->pluginsd.set = true;
-
- if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
- netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
- rrdhost_hostname(host),
- rrdset_id(st),
- dimension,
- PLUGINSD_KEYWORD_REPLAY_SET,
- parser->user.replay.start_time,
- parser->user.replay.end_time,
- PLUGINSD_KEYWORD_REPLAY_BEGIN);
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- }
-
- if (unlikely(!value_str || !*value_str))
- value_str = "NAN";
-
- if(unlikely(!flags_str))
- flags_str = "";
-
- if (likely(value_str)) {
- RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
-
- if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
- NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
- SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
-
- if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
- value = NAN;
- flags = SN_EMPTY_SLOT;
- }
-
- rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
- rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
- rd->collector.last_collected_time.tv_usec = 0;
- rd->collector.counter++;
- }
- else {
- nd_log_limit_static_global_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
- "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. "
- "Ignoring data.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
- }
- }
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
- if(parser->user.replay.rset_enabled == false)
- return PARSER_RC_OK;
-
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *dimension = get_word(words, num_words, idx++);
- char *last_collected_ut_str = get_word(words, num_words, idx++);
- char *last_collected_value_str = get_word(words, num_words, idx++);
- char *last_calculated_value_str = get_word(words, num_words, idx++);
- char *last_stored_value_str = get_word(words, num_words, idx++);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(st->pluginsd.set) {
- // reset pos to reuse the same RDAs
- st->pluginsd.pos = 0;
- st->pluginsd.set = false;
- }
-
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
- if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
- if(last_collected_ut > dim_last_collected_ut) {
- rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
- rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
- }
-
- rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
- rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
- rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
- if(parser->user.replay.rset_enabled == false)
- return PARSER_RC_OK;
-
- char *last_collected_ut_str = get_word(words, num_words, 1);
- char *last_updated_ut_str = get_word(words, num_words, 2);
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE,
- PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
- usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
- if(last_collected_ut > chart_last_collected_ut) {
- st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
- st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
- }
-
- usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
- usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
- if(last_updated_ut > chart_last_updated_ut) {
- st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
- st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
- }
-
- st->counter++;
- st->counter_done++;
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
- if (num_words < 7) { // accepts 7, but the 7th is optional
- netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
- return PARSER_RC_ERROR;
- }
-
- const char *update_every_child_txt = get_word(words, num_words, 1);
- const char *first_entry_child_txt = get_word(words, num_words, 2);
- const char *last_entry_child_txt = get_word(words, num_words, 3);
- const char *start_streaming_txt = get_word(words, num_words, 4);
- const char *first_entry_requested_txt = get_word(words, num_words, 5);
- const char *last_entry_requested_txt = get_word(words, num_words, 6);
- const char *child_world_time_txt = get_word(words, num_words, 7); // optional
-
- time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
- time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
- time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
-
- bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
- time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
- time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
-
- // the optional child world time
- time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
- child_world_time_txt) : now_realtime_sec();
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_END);
- if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
- if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true,
- "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
- rrdhost_hostname(host), rrdset_id(st),
- (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
- start_streaming?"true":"false",
- (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
- (unsigned long long)child_world_time
- );
-#endif
-
- parser->user.data_collections_count++;
-
- if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
- time_t now = now_realtime_sec();
- time_t started = st->rrdhost->receiver->replication_first_time_t;
- time_t current = parser->user.replay.end_time;
-
- if(started && current > started) {
- host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
- worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
- host->rrdpush_receiver_replication_percent);
- }
- }
-
- parser->user.replay.start_time = 0;
- parser->user.replay.end_time = 0;
- parser->user.replay.start_time_ut = 0;
- parser->user.replay.end_time_ut = 0;
- parser->user.replay.wall_clock_time = 0;
- parser->user.replay.rset_enabled = false;
-
- st->counter++;
- st->counter_done++;
- store_metric_collection_completed();
-
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- st->replay.start_streaming = false;
- st->replay.after = 0;
- st->replay.before = 0;
- if(start_streaming)
- st->replay.log_next_data_collection = true;
-#endif
-
- if (start_streaming) {
- if (st->update_every != update_every_child)
- rrdset_set_update_every_s(st, update_every_child);
-
- if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
- rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
- }
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- else
- internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
- rrdhost_hostname(host), rrdset_id(st));
-#endif
-
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
-
- host->rrdpush_receiver_replication_percent = 100.0;
- worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
-
- return PARSER_RC_OK;
- }
-
- pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
-
- rrdcontext_updated_retention_rrdset(st);
-
- bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
- first_entry_child, last_entry_child, child_world_time,
- first_entry_requested, last_entry_requested);
- return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
-}
-
-static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
- timing_init();
-
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *id = get_word(words, num_words, idx++);
- char *update_every_str = get_word(words, num_words, idx++);
- char *end_time_str = get_word(words, num_words, idx++);
- char *wall_clock_time_str = get_word(words, num_words, idx++);
-
- if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- timing_step(TIMING_STEP_BEGIN2_PREPARE);
-
- RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
-
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
- return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
- rrdset_isnot_obsolete___safe_from_collector_thread(st);
-
- timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
-
- // ------------------------------------------------------------------------
- // parse the parameters
-
- time_t update_every = (time_t) str2ull_encoded(update_every_str);
- time_t end_time = (time_t) str2ull_encoded(end_time_str);
-
- time_t wall_clock_time;
- if(likely(*wall_clock_time_str == '#'))
- wall_clock_time = end_time;
- else
- wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
-
- if (unlikely(update_every != st->update_every))
- rrdset_set_update_every_s(st, update_every);
-
- timing_step(TIMING_STEP_BEGIN2_PARSE);
-
- // ------------------------------------------------------------------------
- // prepare our state
-
- pluginsd_lock_rrdset_data_collection(parser);
-
- parser->user.v2.update_every = update_every;
- parser->user.v2.end_time = end_time;
- parser->user.v2.wall_clock_time = wall_clock_time;
- parser->user.v2.ml_locked = ml_chart_update_begin(st);
-
- timing_step(TIMING_STEP_BEGIN2_ML);
-
- // ------------------------------------------------------------------------
- // propagate it forward in v2
-
- if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
- parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
-
- if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
- // check receiver capabilities
- bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
-
- // check sender capabilities
- bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
- NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
-
- BUFFER *wb = parser->user.v2.stream_buffer.wb;
-
- buffer_need_bytes(wb, 1024);
-
- if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
-
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
-
- if(with_slots) {
- buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
- buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
- }
-
- buffer_fast_strcat(wb, " '", 2);
- buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
- buffer_fast_strcat(wb, "' ", 2);
-
- if(can_copy)
- buffer_strcat(wb, update_every_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, update_every);
-
- buffer_fast_strcat(wb, " ", 1);
-
- if(can_copy)
- buffer_strcat(wb, end_time_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, end_time);
-
- buffer_fast_strcat(wb, " ", 1);
-
- if(can_copy)
- buffer_strcat(wb, wall_clock_time_str);
- else
- buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
-
- buffer_fast_strcat(wb, "\n", 1);
-
- parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
- parser->user.v2.stream_buffer.begin_v2_added = true;
- }
-
- timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
-
- // ------------------------------------------------------------------------
- // store it
-
- st->last_collected_time.tv_sec = end_time;
- st->last_collected_time.tv_usec = 0;
- st->last_updated.tv_sec = end_time;
- st->last_updated.tv_usec = 0;
- st->counter++;
- st->counter_done++;
-
- // these are only needed for db mode RAM, SAVE, MAP, ALLOC
- st->db.current_entry++;
- if(st->db.current_entry >= st->db.entries)
- st->db.current_entry -= st->db.entries;
-
- timing_step(TIMING_STEP_BEGIN2_STORE);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
- timing_init();
-
- int idx = 1;
- ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
- if(slot >= 0) idx++;
-
- char *dimension = get_word(words, num_words, idx++);
- char *collected_str = get_word(words, num_words, idx++);
- char *value_str = get_word(words, num_words, idx++);
- char *flags_str = get_word(words, num_words, idx++);
-
- if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- timing_step(TIMING_STEP_SET2_PREPARE);
-
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
- if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- st->pluginsd.set = true;
-
- if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
- rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
-
- timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
-
- // ------------------------------------------------------------------------
- // parse the parameters
-
- collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
-
- NETDATA_DOUBLE value;
- if(*value_str == '#')
- value = (NETDATA_DOUBLE)collected_value;
- else
- value = str2ndd_encoded(value_str, NULL);
-
- SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
-
- timing_step(TIMING_STEP_SET2_PARSE);
-
- // ------------------------------------------------------------------------
- // check value and ML
-
- if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
- value = NAN;
- flags = SN_EMPTY_SLOT;
-
- if(parser->user.v2.ml_locked)
- ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
- }
- else if(parser->user.v2.ml_locked) {
- if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
- // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
- flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
- }
- else
- flags |= SN_FLAG_NOT_ANOMALOUS;
- }
-
- timing_step(TIMING_STEP_SET2_ML);
-
- // ------------------------------------------------------------------------
- // propagate it forward in v2
-
- if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
- // check if receiver and sender have the same number parsing capabilities
- bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
-
- // check the sender capabilities
- bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
- NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
- NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
-
- BUFFER *wb = parser->user.v2.stream_buffer.wb;
- buffer_need_bytes(wb, 1024);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
-
- if(with_slots) {
- buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
- buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
- }
-
- buffer_fast_strcat(wb, " '", 2);
- buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
- buffer_fast_strcat(wb, "' ", 2);
- if(can_copy)
- buffer_strcat(wb, collected_str);
- else
- buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
- buffer_fast_strcat(wb, " ", 1);
- if(can_copy)
- buffer_strcat(wb, value_str);
- else
- buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
- buffer_fast_strcat(wb, " ", 1);
- buffer_print_sn_flags(wb, flags, true);
- buffer_fast_strcat(wb, "\n", 1);
- }
-
- timing_step(TIMING_STEP_SET2_PROPAGATE);
-
- // ------------------------------------------------------------------------
- // store it
-
- rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
- rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
- rd->collector.last_collected_time.tv_usec = 0;
- rd->collector.last_collected_value = collected_value;
- rd->collector.last_stored_value = value;
- rd->collector.last_calculated_value = value;
- rd->collector.counter++;
- rrddim_set_updated(rd);
-
- timing_step(TIMING_STEP_SET2_STORE);
-
- return PARSER_RC_OK;
-}
-
-void pluginsd_cleanup_v2(PARSER *parser) {
- // this is called when the thread is stopped while processing
- pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
-}
-
-static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
- timing_init();
-
- RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
- if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
- if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
-
- parser->user.data_collections_count++;
-
- timing_step(TIMING_STEP_END2_PREPARE);
-
- // ------------------------------------------------------------------------
- // propagate the whole chart update in v1
-
- if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
- rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
-
- timing_step(TIMING_STEP_END2_PUSH_V1);
-
- // ------------------------------------------------------------------------
- // unblock data collection
-
- pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
- rrdcontext_collected_rrdset(st);
- store_metric_collection_completed();
-
- timing_step(TIMING_STEP_END2_RRDSET);
-
- // ------------------------------------------------------------------------
- // propagate it forward
-
- rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
-
- timing_step(TIMING_STEP_END2_PROPAGATE);
-
- // ------------------------------------------------------------------------
- // cleanup RRDSET / RRDDIM
-
- if(likely(st->pluginsd.dims_with_slots)) {
- for(size_t i = 0; i < st->pluginsd.size ;i++) {
- RRDDIM *rd = st->pluginsd.prd_array[i].rd;
-
- if(!rd)
- continue;
-
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
- }
- }
- else {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st){
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
- }
- rrddim_foreach_done(rd);
- }
-
- // ------------------------------------------------------------------------
- // reset state
-
- parser->user.v2 = (struct parser_user_object_v2){ 0 };
-
- timing_step(TIMING_STEP_END2_STORE);
- timing_report();
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- netdata_log_info("PLUGINSD: plugin called EXIT.");
- return PARSER_RC_STOP;
-}
-
-struct mutex_cond {
- pthread_mutex_t lock;
- pthread_cond_t cond;
- int rc;
-};
-
-static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data)
-{
- struct mutex_cond *ctx = callback_data;
- pthread_mutex_lock(&ctx->lock);
- ctx->rc = code;
- pthread_cond_broadcast(&ctx->cond);
- pthread_mutex_unlock(&ctx->lock);
-}
-
-#define VIRT_FNC_TIMEOUT 1
-#define VIRT_FNC_BUF_SIZE (4096)
-void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) {
- PARSER *parser = NULL;
-
- //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all)
- char *words[PLUGINSD_MAX_WORDS];
- char *function_with_params = strdupz(name);
- size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd);
-
- if (num_words < 2) {
- netdata_log_error("PLUGINSD: virtual function name is empty.");
- freez(function_with_params);
- return;
- }
-
- const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1));
- if (unlikely(cpi == NULL)) {
- netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name);
- freez(function_with_params);
- return;
- }
- struct configurable_plugin *cp = dictionary_acquired_item_value(cpi);
- parser = (PARSER *)cp->cb_usr_ctx;
-
- BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
- // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name)
- buffer_strcat(function_out, get_word(words, num_words, 0));
- for (size_t i = 1; i < num_words; i++) {
- if (i == 1 && SERVING_PLUGINSD(parser))
- continue;
- buffer_sprintf(function_out, " %s", get_word(words, num_words, i));
- }
- freez(function_with_params);
-
- usec_t now = now_realtime_usec();
-
- struct inflight_function tmp = {
- .started_ut = now,
- .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
- .result_body_wb = wb,
- .timeout = VIRT_FNC_TIMEOUT * 10,
- .function = string_strdupz(buffer_tostring(function_out)),
- .result_cb = callback,
- .result_cb_data = callback_data,
- .payload = payload != NULL ? strdupz(payload) : NULL,
- .virtual = true,
- };
- buffer_free(function_out);
-
- uuid_t uuid;
- uuid_generate_time(uuid);
-
- char key[UUID_STR_LEN];
- uuid_unparse_lower(uuid, key);
-
- dictionary_write_lock(parser->inflight.functions);
-
- // if there is any error, our dictionary callbacks will call the caller callback to notify
- // the caller about the error - no need for error handling here.
- dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
-
- if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
- parser->inflight.smaller_timeout = tmp.timeout_ut;
-
- // garbage collect stale inflight functions
- if(parser->inflight.smaller_timeout < now)
- inflight_functions_garbage_collect(parser, now);
-
- dictionary_write_unlock(parser->inflight.functions);
-}
-
-
-dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
- usec_t now = now_realtime_usec();
- BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
-
- struct mutex_cond cond = {
- .lock = PTHREAD_MUTEX_INITIALIZER,
- .cond = PTHREAD_COND_INITIALIZER
- };
-
- struct inflight_function tmp = {
- .started_ut = now,
- .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
- .result_body_wb = wb,
- .timeout = VIRT_FNC_TIMEOUT,
- .function = string_strdupz(name),
- .result_cb = virt_fnc_got_data_cb,
- .result_cb_data = &cond,
- .payload = payload != NULL ? strdupz(payload) : NULL,
- .virtual = true,
- };
-
- uuid_t uuid;
- uuid_generate_time(uuid);
-
- char key[UUID_STR_LEN];
- uuid_unparse_lower(uuid, key);
-
- dictionary_write_lock(parser->inflight.functions);
-
- // if there is any error, our dictionary callbacks will call the caller callback to notify
- // the caller about the error - no need for error handling here.
- dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
-
- if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
- parser->inflight.smaller_timeout = tmp.timeout_ut;
-
- // garbage collect stale inflight functions
- if(parser->inflight.smaller_timeout < now)
- inflight_functions_garbage_collect(parser, now);
-
- dictionary_write_unlock(parser->inflight.functions);
-
- struct timespec tp;
- clock_gettime(CLOCK_REALTIME, &tp);
- tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT;
-
- pthread_mutex_lock(&cond.lock);
-
- int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp);
- if (ret == ETIMEDOUT)
- netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name);
-
- pthread_mutex_unlock(&cond.lock);
-
- dyncfg_config_t cfg;
- cfg.data = strdupz(buffer_tostring(wb));
- cfg.data_size = buffer_strlen(wb);
-
- if (rc != NULL)
- *rc = cond.rc;
-
- buffer_free(wb);
- return cfg;
-}
-
-#define CVF_MAX_LEN (1024)
-static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name)
-{
- PARSER *parser = usr_ctx;
-
- if (SERVING_STREAMING(parser)) {
- char buf[CVF_MAX_LEN + 1];
- snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
- }
-
- return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL);
-}
-
-static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name)
-{
- PARSER *parser = usr_ctx;
-
- if (SERVING_STREAMING(parser)) {
- char buf[CVF_MAX_LEN + 1];
- snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name);
- return call_virtual_function_blocking(parser, buf, NULL, NULL);
- }
-
- return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
-}
-
-static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG);
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s", module_name);
-
- dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
-
- buffer_free(wb);
-
- return ret;
-}
-
-static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA);
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s", module_name);
-
- dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
-
- buffer_free(wb);
-
- return ret;
-}
-
-static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s", module_name);
-
- dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
-
- buffer_free(wb);
-
- return ret;
-}
-
-static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s %s", module_name, job_name);
-
- dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
-
- buffer_free(wb);
-
- return ret;
-}
-
-enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- int rc;
- call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
-
- buffer_free(wb);
- if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return SET_CONFIG_REJECTED;
- return SET_CONFIG_ACCEPTED;
-}
-
-enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s", module_name);
-
- int rc;
- call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
-
- buffer_free(wb);
-
- if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return SET_CONFIG_REJECTED;
- return SET_CONFIG_ACCEPTED;
-}
-
-enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s %s", module_name, job_name);
-
- int rc;
- call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
-
- buffer_free(wb);
-
- if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return SET_CONFIG_REJECTED;
- return SET_CONFIG_ACCEPTED;
-}
-
-enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name)
-{
- PARSER *parser = usr_ctx;
- BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
-
- buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB);
-
- if (SERVING_STREAMING(parser))
- buffer_sprintf(wb, " %s", plugin_name);
-
- buffer_sprintf(wb, " %s %s", module_name, job_name);
-
- int rc;
- call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL);
-
- buffer_free(wb);
-
- if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
- return SET_CONFIG_REJECTED;
- return SET_CONFIG_ACCEPTED;
-}
-
-
-static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- netdata_log_info("PLUGINSD: DYNCFG_ENABLE");
-
- if (unlikely (num_words != 2))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter");
-
- struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin));
-
- cfg->name = strdupz(words[1]);
- cfg->set_config_cb = set_plugin_config_cb;
- cfg->get_config_cb = get_plugin_config_cb;
- cfg->get_config_schema_cb = get_plugin_config_schema_cb;
- cfg->cb_usr_ctx = parser;
-
- const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser));
- if (unlikely(di == NULL)) {
- freez(cfg->name);
- freez(cfg);
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
- }
-
- if (SERVING_PLUGINSD(parser)) {
- // this is optimization for pluginsd to avoid extra dictionary lookup
- // as we know which plugin is comunicating with us
- parser->user.cd->cfg_dict_item = di;
- parser->user.cd->configuration = cfg;
- } else {
- // register_plugin keeps the item acquired, so we need to release it
- dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
- }
-
- rrdpush_send_dyncfg_enable(parser->user.host, cfg->name);
-
- return PARSER_RC_OK;
-}
-
-#define LOG_MSG_SIZE (1024)
-#define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2)
-#define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3)
-static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
-
- size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4;
-
- if (unlikely(num_words != expected_num_words)) {
- char log[LOG_MSG_SIZE + 1];
- snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log);
- }
-
- struct configurable_plugin *plug_cfg;
- const DICTIONARY_ITEM *di = NULL;
- if (SERVING_PLUGINSD(parser)) {
- plug_cfg = parser->user.cd->configuration;
- if (unlikely(plug_cfg == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
- } else {
- di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
- if (unlikely(di == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found");
-
- plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di);
- }
-
- struct module *mod = callocz(1, sizeof(struct module));
-
- mod->type = str2_module_type(words[MODULE_TYPE_IDX]);
- if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
- freez(mod);
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
- }
-
- mod->name = strdupz(words[MODULE_NAME_IDX]);
-
- mod->set_config_cb = set_module_config_cb;
- mod->get_config_cb = get_module_config_cb;
- mod->get_config_schema_cb = get_module_config_schema_cb;
- mod->config_cb_usr_ctx = parser;
-
- mod->get_job_config_cb = get_job_config_cb;
- mod->get_job_config_schema_cb = get_job_config_schema_cb;
- mod->set_job_config_cb = set_job_config_cb;
- mod->delete_job_cb = delete_job_cb;
- mod->job_config_cb_usr_ctx = parser;
-
- register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser));
-
- if (di != NULL)
- dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
-
- rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
- const char *module_name = words[0];
- const char *job_name = words[1];
- const char *job_type_str = words[2];
- const char *flags_str = words[3];
-
- long f = str2l(flags_str);
-
- if (f < 0)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received");
-
- dyncfg_job_flg_t flags = f;
-
- if (SERVING_PLUGINSD(parser))
- flags |= JOB_FLG_PLUGIN_PUSHED;
- else
- flags |= JOB_FLG_STREAMING_PUSHED;
-
- enum job_type job_type = dyncfg_str2job_type(job_type_str);
- if (job_type == JOB_TYPE_UNKNOWN)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
-
- if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
-
- if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
-
- rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags);
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6;
-
- if (unlikely(num_words != expected_num_words)) {
- char log[LOG_MSG_SIZE + 1];
- snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log);
- }
-
- if (SERVING_PLUGINSD(parser)) {
- return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
- }
- return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
-}
-
-static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
- if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2)))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name");
-
- if (SERVING_PLUGINSD(parser)) {
- unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
- rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name);
- parser->user.cd->configuration = NULL;
- } else {
- const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
- if (unlikely(di == NULL))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found");
- unregister_plugin(parser->user.host->configurable_plugins, di);
- rrdpush_send_dyncfg_reset(parser->user.host, words[1]);
- }
-
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
- int state = str2i(words[3]);
-
- enum job_status status = str2job_state(words[2]);
- if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
-
- char *message = NULL;
- if (num_words == 5 && strlen(words[4]) > 0)
- message = words[4];
-
- const DICTIONARY_ITEM *plugin_item;
- DICTIONARY *job_dict;
- const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message);
-
- if (job_item != NULL) {
- struct job *job = dictionary_acquired_item_value(job_item);
- rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job);
-
- pthread_mutex_unlock(&job->lock);
- dictionary_acquired_item_release(job_dict, job_item);
- dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item);
- }
-
- return PARSER_RC_OK;
-}
-
-// job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message]
-static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) {
- if (SERVING_PLUGINSD(parser)) {
- if (unlikely(num_words != 5 && num_words != 6))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
- } else {
- if (unlikely(num_words != 6 && num_words != 7))
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]");
- }
-
- if (SERVING_PLUGINSD(parser)) {
- return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
- }
- return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]);
-}
-
-static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) {
- // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function
- // they are of opossite direction
- if (num_words != 4)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name");
-
- const char *plugin_name = get_word(words, num_words, 1);
- const char *module_name = get_word(words, num_words, 2);
- const char *job_name = get_word(words, num_words, 3);
-
- if (SERVING_STREAMING(parser))
- delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name);
-
- // forward to parent if any
- rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name);
- return PARSER_RC_OK;
-}
-
-static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
-{
- const char *host_uuid_str = get_word(words, num_words, 1);
- const char *claim_id_str = get_word(words, num_words, 2);
-
- if (!host_uuid_str || !claim_id_str) {
- netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
- host_uuid_str ? host_uuid_str : "[unset]",
- claim_id_str ? claim_id_str : "[unset]");
- return PARSER_RC_ERROR;
- }
-
- uuid_t uuid;
- RRDHOST *host = parser->user.host;
-
- // We don't need the parsed UUID
- // just do it to check the format
- if(uuid_parse(host_uuid_str, uuid)) {
- netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
- return PARSER_RC_ERROR;
- }
- if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
- netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
- return PARSER_RC_ERROR;
- }
-
- if(strcmp(host_uuid_str, host->machine_guid) != 0) {
- netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
- return PARSER_RC_OK; //the message is OK problem must be somewhere else
- }
-
- rrdhost_aclk_state_lock(host);
-
- if (host->aclk_state.claimed_id)
- freez(host->aclk_state.claimed_id);
-
- host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
-
- rrdhost_aclk_state_unlock(host);
-
- rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
-
- rrdpush_send_claimed_id(host);
-
- return PARSER_RC_OK;
-}
-
-// ----------------------------------------------------------------------------
-
-void pluginsd_process_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
-
- pluginsd_cleanup_v2(parser);
- pluginsd_host_define_cleanup(parser);
-
- rrd_collector_finished();
-
-#ifdef NETDATA_LOG_STREAM_RECEIVE
- if(parser->user.stream_log_fp) {
- fclose(parser->user.stream_log_fp);
- parser->user.stream_log_fp = NULL;
- }
-#endif
-
- parser_destroy(parser);
-}
-
-bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.host)
- return false;
-
- buffer_strcat(wb, rrdhost_hostname(parser->user.host));
- return true;
-}
-
-bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.st)
- return false;
-
- buffer_strcat(wb, rrdset_name(parser->user.st));
- return true;
-}
-
-bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
- PARSER *parser = ptr;
- if(!parser || !parser->user.st)
- return false;
-
- buffer_strcat(wb, string2str(parser->user.st->context));
- return true;
-}
-
-inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
-{
- int enabled = cd->unsafe.enabled;
-
- if (!fp_plugin_input || !fp_plugin_output || !enabled) {
- cd->unsafe.enabled = 0;
- return 0;
- }
-
- if (unlikely(fileno(fp_plugin_input) == -1)) {
- netdata_log_error("input file descriptor given is not a valid stream");
- cd->serial_failures++;
- return 0;
- }
-
- if (unlikely(fileno(fp_plugin_output) == -1)) {
- netdata_log_error("output file descriptor given is not a valid stream");
- cd->serial_failures++;
- return 0;
- }
-
- clearerr(fp_plugin_input);
- clearerr(fp_plugin_output);
-
- PARSER *parser;
- {
- PARSER_USER_OBJECT user = {
- .enabled = cd->unsafe.enabled,
- .host = host,
- .cd = cd,
- .trust_durations = trust_durations
- };
-
- // fp_plugin_output = our input; fp_plugin_input = our output
- parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
- }
-
- pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
-
- rrd_collector_started();
-
- size_t count = 0;
-
- // this keeps the parser with its current value
- // so, parser needs to be allocated before pushing it
- netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
-
- {
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
- ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
- ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
-
- buffered_reader_init(&parser->reader);
- BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
- while(likely(service_running(SERVICE_COLLECTORS))) {
-
- if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
- buffered_reader_ret_t ret = buffered_reader_read_timeout(
- &parser->reader,
- fileno((FILE *) parser->fp_input),
- 2 * 60 * MSEC_PER_SEC, true
- );
-
- if(unlikely(ret != BUFFERED_READER_READ_OK))
- break;
-
- continue;
- }
-
- if(unlikely(parser_action(parser, buffer->buffer)))
- break;
-
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
-
- cd->unsafe.enabled = parser->user.enabled;
- count = parser->user.data_collections_count;
-
- if(likely(count)) {
- cd->successful_collections += count;
- cd->serial_failures = 0;
- }
- else
- cd->serial_failures++;
- }
-
- // free parser with the pop function
- netdata_thread_cleanup_pop(1);
-
- return count;
-}
-
-void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
- parser_init_repertoire(parser, repertoire);
-
- if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
- inflight_functions_init(parser);
-}
-
-PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
- PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
- PARSER *parser;
-
- parser = callocz(1, sizeof(*parser));
- if(user)
- parser->user = *user;
- parser->fd = fd;
- parser->fp_input = fp_input;
- parser->fp_output = fp_output;
-#ifdef ENABLE_HTTPS
- parser->ssl_output = ssl;
-#endif
- parser->flags = flags;
-
- spinlock_init(&parser->writer.spinlock);
- return parser;
-}
-
-PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
- switch(keyword->id) {
- case 1:
- return pluginsd_set_v2(words, num_words, parser);
-
- case 2:
- return pluginsd_begin_v2(words, num_words, parser);
-
- case 3:
- return pluginsd_end_v2(words, num_words, parser);
-
- case 11:
- return pluginsd_set(words, num_words, parser);
-
- case 12:
- return pluginsd_begin(words, num_words, parser);
-
- case 13:
- return pluginsd_end(words, num_words, parser);
-
- case 21:
- return pluginsd_replay_set(words, num_words, parser);
-
- case 22:
- return pluginsd_replay_begin(words, num_words, parser);
-
- case 23:
- return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
-
- case 24:
- return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
-
- case 25:
- return pluginsd_replay_end(words, num_words, parser);
-
- case 31:
- return pluginsd_dimension(words, num_words, parser);
-
- case 32:
- return pluginsd_chart(words, num_words, parser);
-
- case 33:
- return pluginsd_chart_definition_end(words, num_words, parser);
-
- case 34:
- return pluginsd_clabel(words, num_words, parser);
-
- case 35:
- return pluginsd_clabel_commit(words, num_words, parser);
-
- case 41:
- return pluginsd_function(words, num_words, parser);
-
- case 42:
- return pluginsd_function_result_begin(words, num_words, parser);
-
- case 51:
- return pluginsd_label(words, num_words, parser);
-
- case 52:
- return pluginsd_overwrite(words, num_words, parser);
-
- case 53:
- return pluginsd_variable(words, num_words, parser);
-
- case 61:
- return streaming_claimed_id(words, num_words, parser);
-
- case 71:
- return pluginsd_host(words, num_words, parser);
-
- case 72:
- return pluginsd_host_define(words, num_words, parser);
-
- case 73:
- return pluginsd_host_define_end(words, num_words, parser);
-
- case 74:
- return pluginsd_host_labels(words, num_words, parser);
-
- case 97:
- return pluginsd_flush(words, num_words, parser);
-
- case 98:
- return pluginsd_disable(words, num_words, parser);
-
- case 99:
- return pluginsd_exit(words, num_words, parser);
-
- case 101:
- return pluginsd_register_plugin(words, num_words, parser);
-
- case 102:
- return pluginsd_register_module(words, num_words, parser);
-
- case 103:
- return pluginsd_register_job(words, num_words, parser);
-
- case 104:
- return pluginsd_dyncfg_reset(words, num_words, parser);
-
- case 110:
- return pluginsd_job_status(words, num_words, parser);
-
- case 111:
- return pluginsd_delete_job(words, num_words, parser);
-
- default:
- fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
- }
-}
-
-#include "gperf-hashtable.h"
-
-void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
- parser->repertoire = repertoire;
-
- for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
- if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
- worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
- }
-}
-
-static void parser_destroy_dyncfg(PARSER *parser) {
- if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
- unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
- parser->user.cd->configuration = NULL;
- } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){
- dictionary_flush(parser->user.host->configurable_plugins);
- }
-}
-
-void parser_destroy(PARSER *parser) {
- if (unlikely(!parser))
- return;
-
- parser_destroy_dyncfg(parser);
-
- dictionary_destroy(parser->inflight.functions);
- freez(parser);
-}
-
-int pluginsd_parser_unittest(void) {
- PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
- pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
-
- char *lines[] = {
- "BEGIN2 abcdefghijklmnopqr 123",
- "SET2 abcdefg 0x12345678 0 0",
- "SET2 hijklmnoqr 0x12345678 0 0",
- "SET2 stuvwxyz 0x12345678 0 0",
- "END2",
- NULL,
- };
-
- char *words[PLUGINSD_MAX_WORDS];
- size_t iterations = 1000000;
- size_t count = 0;
- char input[PLUGINSD_LINE_MAX + 1];
-
- usec_t started = now_realtime_usec();
- while(--iterations) {
- for(size_t line = 0; lines[line] ;line++) {
- strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
- size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
- const char *command = get_word(words, num_words, 0);
- PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
- if(unlikely(!keyword))
- fatal("Cannot parse the line '%s'", lines[line]);
- count++;
- }
- }
- usec_t ended = now_realtime_usec();
-
- netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
- (double)(ended - started) / (double)USEC_PER_SEC,
- (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
-
- parser_destroy(p);
- return 0;
-}