summaryrefslogtreecommitdiffstats
path: root/src/collectors/plugins.d/pluginsd_internals.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/collectors/plugins.d/pluginsd_internals.h
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/collectors/plugins.d/pluginsd_internals.h')
-rw-r--r--src/collectors/plugins.d/pluginsd_internals.h355
1 files changed, 355 insertions, 0 deletions
diff --git a/src/collectors/plugins.d/pluginsd_internals.h b/src/collectors/plugins.d/pluginsd_internals.h
new file mode 100644
index 000000000..31db02544
--- /dev/null
+++ b/src/collectors/plugins.d/pluginsd_internals.h
@@ -0,0 +1,355 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_PLUGINSD_INTERNALS_H
+#define NETDATA_PLUGINSD_INTERNALS_H
+
+#include "pluginsd_parser.h"
+#include "pluginsd_functions.h"
+#include "pluginsd_dyncfg.h"
+#include "pluginsd_replication.h"
+
+#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
+
+PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg);
+
+ssize_t send_to_plugin(const char *txt, void *data);
+
+static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
+ RRDHOST *host = parser->user.host;
+
+ if(unlikely(!host))
+ netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
+
+ return host;
+}
+
+static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
+ RRDSET *st = parser->user.st;
+
+ if(unlikely(!st))
+ netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
+
+ return st;
+}
+
+static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
+ return parser->user.st;
+}
+
+static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && !parser->user.v2.locked_data_collection) {
+ spinlock_lock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = true;
+ }
+}
+
+static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
+ if(parser->user.st && parser->user.v2.locked_data_collection) {
+ spinlock_unlock(&parser->user.st->data_collection_lock);
+ parser->user.v2.locked_data_collection = false;
+ return true;
+ }
+
+ return false;
+}
+
+static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
+ if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
+ if(stale)
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
+ }
+
+ if(unlikely(parser->user.v2.ml_locked)) {
+ ml_chart_update_end(parser->user.st);
+ parser->user.v2.ml_locked = false;
+
+ if(stale)
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
+ rrdhost_hostname(parser->user.st->rrdhost),
+ rrdset_id(parser->user.st),
+ keyword);
+ }
+}
+
+static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
+ pluginsd_unlock_previous_scope_chart(parser, keyword, true);
+
+ if(parser->user.cleanup_slots && parser->user.st)
+ rrdset_pluginsd_receive_unslot(parser->user.st);
+
+ parser->user.st = NULL;
+ parser->user.cleanup_slots = false;
+}
+
+static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
+ RRDSET *old_st = parser->user.st;
+ pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
+ pid_t my_collector_tid = gettid();
+
+ if(unlikely(old_collector_tid)) {
+ if(old_collector_tid != my_collector_tid) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
+ keyword ? keyword : "UNKNOWN",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ my_collector_tid, old_collector_tid);
+
+ return false;
+ }
+
+ old_st->pluginsd.collector_tid = 0;
+ }
+
+ st->pluginsd.collector_tid = my_collector_tid;
+
+ pluginsd_clear_scope_chart(parser, keyword);
+
+ st->pluginsd.pos = 0;
+ parser->user.st = st;
+ parser->user.cleanup_slots = false;
+
+ return true;
+}
+
+static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
+ size_t wanted_size = st->pluginsd.size;
+
+ if(slot >= 1) {
+ st->pluginsd.dims_with_slots = true;
+ wanted_size = slot;
+ }
+ else {
+ st->pluginsd.dims_with_slots = false;
+ wanted_size = dictionary_entries(st->rrddim_root_index);
+ }
+
+ if(wanted_size > st->pluginsd.size) {
+ st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
+
+ // initialize the empty slots
+ for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
+ st->pluginsd.prd_array[i].rda = NULL;
+ st->pluginsd.prd_array[i].rd = NULL;
+ st->pluginsd.prd_array[i].id = NULL;
+ }
+
+ st->pluginsd.size = wanted_size;
+ }
+
+ if(st->pluginsd.dims_with_slots) {
+ struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
+
+ if(prd->rd != rd) {
+ prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
+ prd->rd = rrddim_acquired_to_rrddim(prd->rda);
+ prd->id = string2str(prd->rd->id);
+ }
+
+ if(obsolete)
+ parser->user.cleanup_slots = true;
+ }
+}
+
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
+ if (unlikely(!dimension || !*dimension)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ if (unlikely(!st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ struct pluginsd_rrddim *prd;
+ RRDDIM *rd;
+
+ if(likely(st->pluginsd.dims_with_slots)) {
+ // caching with slots
+
+ if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
+ rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
+ return NULL;
+ }
+
+ prd = &st->pluginsd.prd_array[slot - 1];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(strcmp(prd->id, dimension) != 0) {
+ ssize_t t;
+ for(t = 0; t < st->pluginsd.size ;t++) {
+ if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
+ break;
+ }
+ if(t >= st->pluginsd.size)
+ t = -1;
+
+ internal_fatal(true,
+ "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
+ "the right slot is %zd",
+ dimension, slot, prd->id, t);
+ }
+#endif
+ return rd;
+ }
+ }
+ else {
+ // caching without slots
+
+ if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
+ st->pluginsd.pos = 0;
+
+ prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+ const char *id = prd->id;
+
+ if(strcmp(id, dimension) == 0) {
+ // we found it cached
+ return rd;
+ }
+ else {
+ // the cached one is not good for us
+ rrddim_acquired_release(prd->rda);
+ prd->rda = NULL;
+ prd->rd = NULL;
+ prd->id = NULL;
+ }
+ }
+ }
+
+ // we need to find the dimension and set it to prd
+
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
+ if (unlikely(!rda)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+
+ return NULL;
+ }
+
+ prd->rda = rda;
+ prd->rd = rd = rrddim_acquired_to_rrddim(rda);
+ prd->id = string2str(rd->id);
+
+ return rd;
+}
+
+static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
+ if (unlikely(!chart || !*chart)) {
+ netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
+ rrdhost_hostname(host), cmd);
+ return NULL;
+ }
+
+ RRDSET *st = rrdset_find(host, chart);
+ if (unlikely(!st))
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
+ rrdhost_hostname(host), chart, cmd);
+
+ return st;
+}
+
+static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
+ ssize_t slot = -1;
+ char *id = get_word(words, num_words, 1);
+ if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
+ id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
+ slot = (ssize_t) str2ull_encoded(&id[5]);
+ if(slot < 0) slot = 0; // to make the caller increment its idx of the words
+ }
+
+ return slot;
+}
+
+static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
+ // clean possible old cached data
+ rrdset_pluginsd_receive_unslot(st);
+
+ if(unlikely(slot < 1 || slot >= INT32_MAX))
+ return;
+
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
+ spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
+ size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
+
+ if(new_slots < (size_t)slot)
+ new_slots = slot;
+
+ host->rrdpush.receive.pluginsd_chart_slots.array =
+ reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
+
+ for(size_t i = old_slots; i < new_slots ;i++)
+ host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
+
+ host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
+ spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ }
+
+ host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
+ st->pluginsd.last_slot = (int32_t)slot - 1;
+ parser->user.cleanup_slots = obsolete;
+}
+
+static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
+ if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
+ return pluginsd_find_chart(host, id, keyword);
+
+ RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
+
+ if(!st) {
+ st = pluginsd_find_chart(host, id, keyword);
+ if(st)
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
+ }
+ else {
+ internal_fatal(string_strcmp(st->id, id) != 0,
+ "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
+ slot - 1, id, string2str(st->id));
+ }
+
+ return st;
+}
+
+static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
+ SN_FLAGS flags = SN_FLAG_NONE;
+
+ char c;
+ while ((c = *flags_str++)) {
+ switch (c) {
+ case 'A':
+ flags |= SN_FLAG_NOT_ANOMALOUS;
+ break;
+
+ case 'R':
+ flags |= SN_FLAG_RESET;
+ break;
+
+ case 'E':
+ flags = SN_EMPTY_SLOT;
+ return flags;
+
+ default:
+ internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
+ break;
+ }
+ }
+
+ return flags;
+}
+
+#endif //NETDATA_PLUGINSD_INTERNALS_H