summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d/pluginsd_parser.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c771
1 files changed, 521 insertions, 250 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 2e69c7da..3b47c6c0 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,8 +4,8 @@
#define LOG_FUNCTIONS false
-#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
-#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
+#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING)
+#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD)
static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
@@ -13,6 +13,11 @@ static ssize_t send_to_plugin(const char *txt, void *data) {
if(!txt || !*txt)
return 0;
+#ifdef ENABLE_H2O
+ if(parser->h2o_ctx)
+ return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt));
+#endif
+
errno = 0;
spinlock_lock(&parser->writer.spinlock);
ssize_t bytes = -1;
@@ -110,23 +115,6 @@ static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
return false;
}
-void pluginsd_rrdset_cleanup(RRDSET *st) {
- spinlock_lock(&st->pluginsd.spinlock);
-
- for(size_t i = 0; i < st->pluginsd.size ; i++) {
- rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL
- st->pluginsd.rda[i] = NULL;
- }
-
- freez(st->pluginsd.rda);
- st->pluginsd.collector_tid = 0;
- st->pluginsd.rda = NULL;
- st->pluginsd.size = 0;
- st->pluginsd.pos = 0;
-
- spinlock_unlock(&st->pluginsd.spinlock);
-}
-
static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
if(stale)
@@ -150,7 +138,12 @@ static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const ch
static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
pluginsd_unlock_previous_scope_chart(parser, keyword, true);
+
+ if(parser->user.cleanup_slots && parser->user.st)
+ rrdset_pluginsd_receive_unslot(parser->user.st);
+
parser->user.st = NULL;
+ parser->user.cleanup_slots = false;
}
static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
@@ -160,11 +153,12 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch
if(unlikely(old_collector_tid)) {
if(old_collector_tid != my_collector_tid) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
- keyword ? keyword : "UNKNOWN",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- my_collector_tid, old_collector_tid);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
+ keyword ? keyword : "UNKNOWN",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ my_collector_tid, old_collector_tid);
return false;
}
@@ -176,61 +170,141 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch
pluginsd_clear_scope_chart(parser, keyword);
- size_t dims = dictionary_entries(st->rrddim_root_index);
- if(unlikely(st->pluginsd.size < dims)) {
- st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
+ st->pluginsd.pos = 0;
+ parser->user.st = st;
+ parser->user.cleanup_slots = false;
+
+ return true;
+}
+
+static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) {
+ size_t wanted_size = st->pluginsd.size;
+
+ if(slot >= 1) {
+ st->pluginsd.dims_with_slots = true;
+ wanted_size = slot;
+ }
+ else {
+ st->pluginsd.dims_with_slots = false;
+ wanted_size = dictionary_entries(st->rrddim_root_index);
+ }
+
+ if(wanted_size > st->pluginsd.size) {
+ st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim));
// initialize the empty slots
- for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--)
- st->pluginsd.rda[i] = NULL;
+ for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) {
+ st->pluginsd.prd_array[i].rda = NULL;
+ st->pluginsd.prd_array[i].rd = NULL;
+ st->pluginsd.prd_array[i].id = NULL;
+ }
- st->pluginsd.size = dims;
+ st->pluginsd.size = wanted_size;
}
- st->pluginsd.pos = 0;
- parser->user.st = st;
+ if(st->pluginsd.dims_with_slots) {
+ struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1];
- return true;
+ if(prd->rd != rd) {
+ prd->rda = rrddim_find_and_acquire(st, string2str(rd->id));
+ prd->rd = rrddim_acquired_to_rrddim(prd->rda);
+ prd->id = string2str(prd->rd->id);
+ }
+
+ if(obsolete)
+ parser->user.cleanup_slots = true;
+ }
}
-static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
+static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) {
if (unlikely(!dimension || !*dimension)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
rrdhost_hostname(host), rrdset_id(st), cmd);
return NULL;
}
- if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
- st->pluginsd.pos = 0;
+ if (unlikely(!st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.",
+ rrdhost_hostname(host), rrdset_id(st), cmd);
+ return NULL;
+ }
+
+ struct pluginsd_rrddim *prd;
+ RRDDIM *rd;
+
+ if(likely(st->pluginsd.dims_with_slots)) {
+ // caching with slots
+
+ if(unlikely(slot < 1 || slot > st->pluginsd.size)) {
+ netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.",
+ rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size);
+ return NULL;
+ }
- RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos];
+ prd = &st->pluginsd.prd_array[slot - 1];
- if(likely(rda)) {
- RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
- if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
- // we found a cached RDA
- st->pluginsd.pos++;
+ rd = prd->rd;
+ if(likely(rd)) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(strcmp(prd->id, dimension) != 0) {
+ ssize_t t;
+ for(t = 0; t < st->pluginsd.size ;t++) {
+ if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0)
+ break;
+ }
+ if(t >= st->pluginsd.size)
+ t = -1;
+
+ internal_fatal(true,
+ "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', "
+ "the right slot is %zd",
+ dimension, slot, prd->id, t);
+ }
+#endif
return rd;
}
- else {
- // the collector is sending dimensions in a different order
- // release the previous one, to reuse this slot
- rrddim_acquired_release(rda);
- st->pluginsd.rda[st->pluginsd.pos] = NULL;
+ }
+ else {
+ // caching without slots
+
+ if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
+ st->pluginsd.pos = 0;
+
+ prd = &st->pluginsd.prd_array[st->pluginsd.pos++];
+
+ rd = prd->rd;
+ if(likely(rd)) {
+ const char *id = prd->id;
+
+ if(strcmp(id, dimension) == 0) {
+ // we found it cached
+ return rd;
+ }
+ else {
+ // the cached one is not good for us
+ rrddim_acquired_release(prd->rda);
+ prd->rda = NULL;
+ prd->rd = NULL;
+ prd->id = NULL;
+ }
}
}
- rda = rrddim_find_and_acquire(st, dimension);
+ // we need to find the dimension and set it to prd
+
+ RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension);
if (unlikely(!rda)) {
netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
- rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
+ rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
return NULL;
}
- st->pluginsd.rda[st->pluginsd.pos++] = rda;
+ prd->rda = rda;
+ prd->rd = rd = rrddim_acquired_to_rrddim(rda);
+ prd->id = string2str(rd->id);
- return rrddim_acquired_to_rrddim(rda);
+ return rd;
}
static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
@@ -248,20 +322,89 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons
return st;
}
+static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) {
+ ssize_t slot = -1;
+ char *id = get_word(words, num_words, 1);
+ if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] &&
+ id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') {
+ slot = (ssize_t) str2ull_encoded(&id[5]);
+ if(slot < 0) slot = 0; // to make the caller increment its idx of the words
+ }
+
+ return slot;
+}
+
+static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) {
+ // clean possible old cached data
+ rrdset_pluginsd_receive_unslot(st);
+
+ if(unlikely(slot < 1 || slot >= INT32_MAX))
+ return;
+
+ RRDHOST *host = st->rrdhost;
+
+ if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) {
+ spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size;
+ size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2;
+
+ if(new_slots < (size_t)slot)
+ new_slots = slot;
+
+ host->rrdpush.receive.pluginsd_chart_slots.array =
+ reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *));
+
+ for(size_t i = old_slots; i < new_slots ;i++)
+ host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL;
+
+ host->rrdpush.receive.pluginsd_chart_slots.size = new_slots;
+ spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+ }
+
+ host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st;
+ st->pluginsd.last_slot = (int32_t)slot - 1;
+ parser->user.cleanup_slots = obsolete;
+}
+
+static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) {
+ if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size))
+ return pluginsd_find_chart(host, id, keyword);
+
+ RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1];
+
+ if(!st) {
+ st = pluginsd_find_chart(host, id, keyword);
+ if(st)
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE));
+ }
+ else {
+ internal_fatal(string_strcmp(st->id, id) != 0,
+ "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'",
+ slot - 1, id, string2str(st->id));
+ }
+
+ return st;
+}
+
static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
parser->user.enabled = 0;
if(keyword && msg) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO,
+ "PLUGINSD: keyword %s: %s", keyword, msg);
}
return PARSER_RC_ERROR;
}
static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
- char *dimension = get_word(words, num_words, 1);
- char *value = get_word(words, num_words, 2);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -269,7 +412,7 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par
RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
@@ -285,13 +428,17 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par
}
static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *microseconds_txt = get_word(words, num_words, 2);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *microseconds_txt = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
@@ -332,8 +479,9 @@ static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *p
}
static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
- UNUSED(words);
- UNUSED(num_words);
+ char *tv_sec = get_word(words, num_words, 1);
+ char *tv_usec = get_word(words, num_words, 2);
+ char *pending_rrdset_next = get_word(words, num_words, 3);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -347,9 +495,15 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par
pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
parser->user.data_collections_count++;
- struct timeval now;
- now_realtime_timeval(&now);
- rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
+ struct timeval tv = {
+ .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
+ .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
+ };
+
+ if(!tv.tv_sec)
+ now_realtime_timeval(&tv);
+
+ rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
return PARSER_RC_OK;
}
@@ -419,30 +573,29 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
RRDHOST *host = rrdhost_find_or_create(
- string2str(parser->user.host_define.hostname),
- string2str(parser->user.host_define.hostname),
- parser->user.host_define.machine_guid_str,
- "Netdata Virtual Host 1.0",
- netdata_configured_timezone,
- netdata_configured_abbrev_timezone,
- netdata_configured_utc_offset,
- NULL,
- program_name,
- program_version,
- default_rrd_update_every,
- default_rrd_history_entries,
- default_rrd_memory_mode,
- default_health_enabled,
- default_rrdpush_enabled,
- default_rrdpush_destination,
- default_rrdpush_api_key,
- default_rrdpush_send_charts_matching,
- default_rrdpush_enable_replication,
- default_rrdpush_seconds_to_replicate,
- default_rrdpush_replication_step,
- rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
- false
- );
+ string2str(parser->user.host_define.hostname),
+ string2str(parser->user.host_define.hostname),
+ parser->user.host_define.machine_guid_str,
+ "Netdata Virtual Host 1.0",
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ NULL,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ default_rrd_memory_mode,
+ default_health_enabled,
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
+ false);
rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
@@ -492,18 +645,22 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- char *type = get_word(words, num_words, 1);
- char *name = get_word(words, num_words, 2);
- char *title = get_word(words, num_words, 3);
- char *units = get_word(words, num_words, 4);
- char *family = get_word(words, num_words, 5);
- char *context = get_word(words, num_words, 6);
- char *chart = get_word(words, num_words, 7);
- char *priority_s = get_word(words, num_words, 8);
- char *update_every_s = get_word(words, num_words, 9);
- char *options = get_word(words, num_words, 10);
- char *plugin = get_word(words, num_words, 11);
- char *module = get_word(words, num_words, 12);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *type = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *title = get_word(words, num_words, idx++);
+ char *units = get_word(words, num_words, idx++);
+ char *family = get_word(words, num_words, idx++);
+ char *context = get_word(words, num_words, idx++);
+ char *chart = get_word(words, num_words, idx++);
+ char *priority_s = get_word(words, num_words, idx++);
+ char *update_every_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
+ char *plugin = get_word(words, num_words, idx++);
+ char *module = get_word(words, num_words, idx++);
// parse the id from type
char *id = NULL;
@@ -570,14 +727,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
module, priority, update_every,
chart_type);
+ bool obsolete = false;
if (likely(st)) {
if (options && *options) {
if (strstr(options, "obsolete")) {
- pluginsd_rrdset_cleanup(st);
- rrdset_is_obsolete(st);
+ rrdset_is_obsolete___safe_from_collector_thread(st);
+ obsolete = true;
}
else
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
if (strstr(options, "detail"))
rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
@@ -595,13 +753,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
else {
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
}
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
+
+ pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
}
else
pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
@@ -652,12 +812,16 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w
}
static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *name = get_word(words, num_words, 2);
- char *algorithm = get_word(words, num_words, 3);
- char *multiplier_s = get_word(words, num_words, 4);
- char *divisor_s = get_word(words, num_words, 5);
- char *options = get_word(words, num_words, 6);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *name = get_word(words, num_words, idx++);
+ char *algorithm = get_word(words, num_words, idx++);
+ char *multiplier_s = get_word(words, num_words, idx++);
+ char *divisor_s = get_word(words, num_words, idx++);
+ char *options = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -696,11 +860,14 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
int unhide_dimension = 1;
rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
+ bool obsolete = false;
if (options && *options) {
- if (strstr(options, "obsolete") != NULL)
- rrddim_is_obsolete(st, rd);
+ if (strstr(options, "obsolete") != NULL) {
+ obsolete = true;
+ rrddim_is_obsolete___safe_from_collector_thread(st, rd);
+ }
else
- rrddim_isnot_obsolete(st, rd);
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
unhide_dimension = !strstr(options, "hidden");
@@ -708,8 +875,9 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
if (strstr(options, "nooverflow") != NULL)
rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
- } else
- rrddim_isnot_obsolete(st, rd);
+ }
+ else
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
bool should_update_dimension = false;
@@ -727,6 +895,8 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE
rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
}
+ pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
+
return PARSER_RC_OK;
}
@@ -759,7 +929,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void
const char *transaction = dictionary_acquired_item_name(item);
char buffer[2048 + 1];
- snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
+ snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n",
pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
transaction,
pf->timeout,
@@ -932,7 +1102,7 @@ void pluginsd_function_cancel(void *data) {
internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
char buffer[2048 + 1];
- snprintfz(buffer, 2048, "%s %s\n",
+ snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n",
PLUGINSD_KEYWORD_FUNCTION_CANCEL,
transaction);
@@ -947,7 +1117,8 @@ void pluginsd_function_cancel(void *data) {
dfe_done(t);
if(sent <= 0)
- netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
}
// this is the function that is called from
@@ -1247,6 +1418,15 @@ static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *p
if(unlikely(!(parser->user.new_host_labels)))
parser->user.new_host_labels = rrdlabels_create();
+ if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
+ int is_ephemeral = appconfig_test_boolean_value((char *) value);
+ if (is_ephemeral) {
+ RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
+ if (likely(host))
+ rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
+ }
+ }
+
rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
if (allocated_store)
@@ -1265,6 +1445,8 @@ static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t n
host->rrdlabels = rrdlabels_create();
rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
+ if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
+ rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
rrdlabels_destroy(parser->user.new_host_labels);
@@ -1311,16 +1493,21 @@ static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_metadata_updated(st);
+
parser->user.chart_rrdlabels_linked_temporarily = NULL;
return PARSER_RC_OK;
}
static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
- char *id = get_word(words, num_words, 1);
- char *start_time_str = get_word(words, num_words, 2);
- char *end_time_str = get_word(words, num_words, 3);
- char *child_now_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *start_time_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *child_now_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1329,7 +1516,7 @@ static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PA
if (likely(!id || !*id))
st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
else
- st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN);
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1444,9 +1631,13 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str
}
static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
- char *dimension = get_word(words, num_words, 1);
- char *value_str = get_word(words, num_words, 2);
- char *flags_str = get_word(words, num_words, 3);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1455,15 +1646,16 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!parser->user.replay.rset_enabled) {
- error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
- rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ nd_log_limit_static_thread_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR,
+ "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
// we have to return OK here
return PARSER_RC_OK;
}
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
@@ -1504,8 +1696,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
rd->collector.counter++;
}
else {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. "
+ "Ignoring data.",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
}
}
@@ -1517,11 +1711,15 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si
if(parser->user.replay.rset_enabled == false)
return PARSER_RC_OK;
- char *dimension = get_word(words, num_words, 1);
- char *last_collected_ut_str = get_word(words, num_words, 2);
- char *last_collected_value_str = get_word(words, num_words, 3);
- char *last_calculated_value_str = get_word(words, num_words, 4);
- char *last_stored_value_str = get_word(words, num_words, 5);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *last_collected_ut_str = get_word(words, num_words, idx++);
+ char *last_collected_value_str = get_word(words, num_words, idx++);
+ char *last_calculated_value_str = get_word(words, num_words, idx++);
+ char *last_stored_value_str = get_word(words, num_words, idx++);
RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
@@ -1535,7 +1733,7 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si
st->pluginsd.set = false;
}
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
@@ -1699,10 +1897,14 @@ static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS
static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
- char *id = get_word(words, num_words, 1);
- char *update_every_str = get_word(words, num_words, 2);
- char *end_time_str = get_word(words, num_words, 3);
- char *wall_clock_time_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *id = get_word(words, num_words, idx++);
+ char *update_every_str = get_word(words, num_words, idx++);
+ char *end_time_str = get_word(words, num_words, idx++);
+ char *wall_clock_time_str = get_word(words, num_words, idx++);
if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
@@ -1712,14 +1914,15 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
timing_step(TIMING_STEP_BEGIN2_PREPARE);
- RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
+ RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
+
if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
@@ -1759,9 +1962,12 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
- // check if receiver and sender have the same number parsing capabilities
+ // check receiver capabilities
bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
- NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
+
+ // check sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
+ NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
BUFFER *wb = parser->user.v2.stream_buffer.wb;
@@ -1770,28 +1976,35 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "' ", 2);
if(can_copy)
buffer_strcat(wb, update_every_str);
else
- buffer_print_uint64_encoded(wb, encoding, update_every);
+ buffer_print_uint64_encoded(wb, integer_encoding, update_every);
buffer_fast_strcat(wb, " ", 1);
if(can_copy)
buffer_strcat(wb, end_time_str);
else
- buffer_print_uint64_encoded(wb, encoding, end_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, end_time);
buffer_fast_strcat(wb, " ", 1);
if(can_copy)
buffer_strcat(wb, wall_clock_time_str);
else
- buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
+ buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
buffer_fast_strcat(wb, "\n", 1);
@@ -1824,10 +2037,14 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER
static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
timing_init();
- char *dimension = get_word(words, num_words, 1);
- char *collected_str = get_word(words, num_words, 2);
- char *value_str = get_word(words, num_words, 3);
- char *flags_str = get_word(words, num_words, 4);
+ int idx = 1;
+ ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
+ if(slot >= 0) idx++;
+
+ char *dimension = get_word(words, num_words, idx++);
+ char *collected_str = get_word(words, num_words, idx++);
+ char *value_str = get_word(words, num_words, idx++);
+ char *flags_str = get_word(words, num_words, idx++);
if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
@@ -1840,13 +2057,13 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *
timing_step(TIMING_STEP_SET2_PREPARE);
- RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
+ RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
st->pluginsd.set = true;
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
- rrddim_isnot_obsolete(st, rd);
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
@@ -1892,12 +2109,22 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *
if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
// check if receiver and sender have the same number parsing capabilities
bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
+
+ // check the sender capabilities
+ bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
BUFFER *wb = parser->user.v2.stream_buffer.wb;
buffer_need_bytes(wb, 1024);
- buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
+ buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
+
+ if(with_slots) {
+ buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
+ buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
+ }
+
+ buffer_fast_strcat(wb, " '", 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
if(can_copy)
@@ -1978,13 +2205,27 @@ static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_
// ------------------------------------------------------------------------
// cleanup RRDSET / RRDDIM
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- rd->collector.calculated_value = 0;
- rd->collector.collected_value = 0;
- rrddim_clear_updated(rd);
+ if(likely(st->pluginsd.dims_with_slots)) {
+ for(size_t i = 0; i < st->pluginsd.size ;i++) {
+ RRDDIM *rd = st->pluginsd.prd_array[i].rd;
+
+ if(!rd)
+ continue;
+
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ }
+ else {
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st){
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
+ rrddim_clear_updated(rd);
+ }
+ rrddim_foreach_done(rd);
}
- rrddim_foreach_done(rd);
// ------------------------------------------------------------------------
// reset state
@@ -2438,24 +2679,35 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si
}
static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
- if (atol(words[3]) < 0)
- return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags");
- dyncfg_job_flg_t flags = atol(words[3]);
+ const char *module_name = words[0];
+ const char *job_name = words[1];
+ const char *job_type_str = words[2];
+ const char *flags_str = words[3];
+
+ long f = str2l(flags_str);
+
+ if (f < 0)
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received");
+
+ dyncfg_job_flg_t flags = f;
+
if (SERVING_PLUGINSD(parser))
flags |= JOB_FLG_PLUGIN_PUSHED;
else
flags |= JOB_FLG_STREAMING_PUSHED;
- enum job_type job_type = str2job_type(words[2]);
+ enum job_type job_type = dyncfg_str2job_type(job_type_str);
if (job_type == JOB_TYPE_UNKNOWN)
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
+
if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
- if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
+ if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
- rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags);
+ rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags);
+
return PARSER_RC_OK;
}
@@ -2474,6 +2726,25 @@ static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_
return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
}
+static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
+ if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2)))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name");
+
+ if (SERVING_PLUGINSD(parser)) {
+ unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
+ rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name);
+ parser->user.cd->configuration = NULL;
+ } else {
+ const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
+ if (unlikely(di == NULL))
+ return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found");
+ unregister_plugin(parser->user.host->configurable_plugins, di);
+ rrdpush_send_dyncfg_reset(parser->user.host, words[1]);
+ }
+
+ return PARSER_RC_OK;
+}
+
static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
int state = str2i(words[3]);
@@ -2482,7 +2753,7 @@ static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_word
return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
char *message = NULL;
- if (num_words == 5)
+ if (num_words == 5 && strlen(words[4]) > 0)
message = words[4];
const DICTIONARY_ITEM *plugin_item;
@@ -2584,70 +2855,49 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR
// ----------------------------------------------------------------------------
-static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(reader->read_buffer[reader->read_len] != '\0')
- fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
-#endif
-
- ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
- if(unlikely(bytes_read <= 0))
- return false;
-
- reader->read_len += bytes_read;
- reader->read_buffer[reader->read_len] = '\0';
-
- return true;
-}
-
-static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
- errno = 0;
- struct pollfd fds[1];
+void pluginsd_process_thread_cleanup(void *ptr) {
+ PARSER *parser = (PARSER *)ptr;
- fds[0].fd = fd;
- fds[0].events = POLLIN;
+ pluginsd_cleanup_v2(parser);
+ pluginsd_host_define_cleanup(parser);
- int ret = poll(fds, 1, timeout_ms);
+ rrd_collector_finished();
- if (ret > 0) {
- /* There is data to read */
- if (fds[0].revents & POLLIN)
- return buffered_reader_read(reader, fd);
+#ifdef NETDATA_LOG_STREAM_RECEIVE
+ if(parser->user.stream_log_fp) {
+ fclose(parser->user.stream_log_fp);
+ parser->user.stream_log_fp = NULL;
+ }
+#endif
- else if(fds[0].revents & POLLERR) {
- netdata_log_error("PARSER: read failed: POLLERR.");
- return false;
- }
- else if(fds[0].revents & POLLHUP) {
- netdata_log_error("PARSER: read failed: POLLHUP.");
- return false;
- }
- else if(fds[0].revents & POLLNVAL) {
- netdata_log_error("PARSER: read failed: POLLNVAL.");
- return false;
- }
+ parser_destroy(parser);
+}
- netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
- return false;
- }
- else if (ret == 0) {
- netdata_log_error("PARSER: timeout while waiting for data.");
+bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.host)
return false;
- }
- netdata_log_error("PARSER: poll() failed with code %d.", ret);
- return false;
+ buffer_strcat(wb, rrdhost_hostname(parser->user.host));
+ return true;
}
-void pluginsd_process_thread_cleanup(void *ptr) {
- PARSER *parser = (PARSER *)ptr;
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
- pluginsd_cleanup_v2(parser);
- pluginsd_host_define_cleanup(parser);
+ buffer_strcat(wb, rrdset_name(parser->user.st));
+ return true;
+}
- rrd_collector_finished();
+bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
- parser_destroy(parser);
+ buffer_strcat(wb, string2str(parser->user.st->context));
+ return true;
}
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
@@ -2697,33 +2947,51 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- buffered_reader_init(&parser->reader);
- BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
- while(likely(service_running(SERVICE_COLLECTORS))) {
- if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
- if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
- break;
-
- continue;
- }
-
- if(unlikely(parser_action(parser, buffer->buffer)))
- break;
-
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
-
- cd->unsafe.enabled = parser->user.enabled;
- count = parser->user.data_collections_count;
-
- if (likely(count)) {
- cd->successful_collections += count;
- cd->serial_failures = 0;
- }
- else
- cd->serial_failures++;
+ {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ buffered_reader_init(&parser->reader);
+ BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+
+ if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
+ buffered_reader_ret_t ret = buffered_reader_read_timeout(
+ &parser->reader,
+ fileno((FILE *) parser->fp_input),
+ 2 * 60 * MSEC_PER_SEC, true
+ );
+
+ if(unlikely(ret != BUFFERED_READER_READ_OK))
+ break;
+
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
+ break;
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
+
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
+
+ if(likely(count)) {
+ cd->successful_collections += count;
+ cd->serial_failures = 0;
+ }
+ else
+ cd->serial_failures++;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -2855,6 +3123,9 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words,
case 103:
return pluginsd_register_job(words, num_words, parser);
+ case 104:
+ return pluginsd_dyncfg_reset(words, num_words, parser);
+
case 110:
return pluginsd_job_status(words, num_words, parser);