diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
commit | c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /collectors/plugins.d/pluginsd_parser.c | |
parent | Adding upstream version 1.43.2. (diff) | |
download | netdata-upstream/1.44.3.tar.xz netdata-upstream/1.44.3.zip |
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'collectors/plugins.d/pluginsd_parser.c')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 771 |
1 files changed, 521 insertions, 250 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 2e69c7da..3b47c6c0 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,8 +4,8 @@ #define LOG_FUNCTIONS false -#define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING) -#define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD) +#define SERVING_STREAMING(parser) ((parser)->repertoire == PARSER_INIT_STREAMING) +#define SERVING_PLUGINSD(parser) ((parser)->repertoire == PARSER_INIT_PLUGINSD) static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; @@ -13,6 +13,11 @@ static ssize_t send_to_plugin(const char *txt, void *data) { if(!txt || !*txt) return 0; +#ifdef ENABLE_H2O + if(parser->h2o_ctx) + return h2o_stream_write(parser->h2o_ctx, txt, strlen(txt)); +#endif + errno = 0; spinlock_lock(&parser->writer.spinlock); ssize_t bytes = -1; @@ -110,23 +115,6 @@ static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) { return false; } -void pluginsd_rrdset_cleanup(RRDSET *st) { - spinlock_lock(&st->pluginsd.spinlock); - - for(size_t i = 0; i < st->pluginsd.size ; i++) { - rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL - st->pluginsd.rda[i] = NULL; - } - - freez(st->pluginsd.rda); - st->pluginsd.collector_tid = 0; - st->pluginsd.rda = NULL; - st->pluginsd.size = 0; - st->pluginsd.pos = 0; - - spinlock_unlock(&st->pluginsd.spinlock); -} - static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) { if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) { if(stale) @@ -150,7 +138,12 @@ static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const ch static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) { pluginsd_unlock_previous_scope_chart(parser, keyword, true); + + if(parser->user.cleanup_slots && parser->user.st) + rrdset_pluginsd_receive_unslot(parser->user.st); + parser->user.st = NULL; + parser->user.cleanup_slots = false; } static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) { @@ -160,11 +153,12 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch if(unlikely(old_collector_tid)) { if(old_collector_tid != my_collector_tid) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)", - keyword ? keyword : "UNKNOWN", - rrdhost_hostname(st->rrdhost), rrdset_id(st), - my_collector_tid, old_collector_tid); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, + "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)", + keyword ? keyword : "UNKNOWN", + rrdhost_hostname(st->rrdhost), rrdset_id(st), + my_collector_tid, old_collector_tid); return false; } @@ -176,61 +170,141 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch pluginsd_clear_scope_chart(parser, keyword); - size_t dims = dictionary_entries(st->rrddim_root_index); - if(unlikely(st->pluginsd.size < dims)) { - st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *)); + st->pluginsd.pos = 0; + parser->user.st = st; + parser->user.cleanup_slots = false; + + return true; +} + +static inline void pluginsd_rrddim_put_to_slot(PARSER *parser, RRDSET *st, RRDDIM *rd, ssize_t slot, bool obsolete) { + size_t wanted_size = st->pluginsd.size; + + if(slot >= 1) { + st->pluginsd.dims_with_slots = true; + wanted_size = slot; + } + else { + st->pluginsd.dims_with_slots = false; + wanted_size = dictionary_entries(st->rrddim_root_index); + } + + if(wanted_size > st->pluginsd.size) { + st->pluginsd.prd_array = reallocz(st->pluginsd.prd_array, wanted_size * sizeof(struct pluginsd_rrddim)); // initialize the empty slots - for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--) - st->pluginsd.rda[i] = NULL; + for(ssize_t i = (ssize_t) wanted_size - 1; i >= (ssize_t) st->pluginsd.size; i--) { + st->pluginsd.prd_array[i].rda = NULL; + st->pluginsd.prd_array[i].rd = NULL; + st->pluginsd.prd_array[i].id = NULL; + } - st->pluginsd.size = dims; + st->pluginsd.size = wanted_size; } - st->pluginsd.pos = 0; - parser->user.st = st; + if(st->pluginsd.dims_with_slots) { + struct pluginsd_rrddim *prd = &st->pluginsd.prd_array[slot - 1]; - return true; + if(prd->rd != rd) { + prd->rda = rrddim_find_and_acquire(st, string2str(rd->id)); + prd->rd = rrddim_acquired_to_rrddim(prd->rda); + prd->id = string2str(prd->rd->id); + } + + if(obsolete) + parser->user.cleanup_slots = true; + } } -static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) { +static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, ssize_t slot, const char *cmd) { if (unlikely(!dimension || !*dimension)) { netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.", rrdhost_hostname(host), rrdset_id(st), cmd); return NULL; } - if(unlikely(st->pluginsd.pos >= st->pluginsd.size)) - st->pluginsd.pos = 0; + if (unlikely(!st->pluginsd.size)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, but the chart has no dimensions.", + rrdhost_hostname(host), rrdset_id(st), cmd); + return NULL; + } + + struct pluginsd_rrddim *prd; + RRDDIM *rd; + + if(likely(st->pluginsd.dims_with_slots)) { + // caching with slots + + if(unlikely(slot < 1 || slot > st->pluginsd.size)) { + netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s with slot %zd, but slots in the range [1 - %u] are expected.", + rrdhost_hostname(host), rrdset_id(st), cmd, slot, st->pluginsd.size); + return NULL; + } - RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos]; + prd = &st->pluginsd.prd_array[slot - 1]; - if(likely(rda)) { - RRDDIM *rd = rrddim_acquired_to_rrddim(rda); - if (likely(rd && string_strcmp(rd->id, dimension) == 0)) { - // we found a cached RDA - st->pluginsd.pos++; + rd = prd->rd; + if(likely(rd)) { +#ifdef NETDATA_INTERNAL_CHECKS + if(strcmp(prd->id, dimension) != 0) { + ssize_t t; + for(t = 0; t < st->pluginsd.size ;t++) { + if (strcmp(st->pluginsd.prd_array[t].id, dimension) == 0) + break; + } + if(t >= st->pluginsd.size) + t = -1; + + internal_fatal(true, + "PLUGINSD: expected to find dimension '%s' on slot %zd, but found '%s', " + "the right slot is %zd", + dimension, slot, prd->id, t); + } +#endif return rd; } - else { - // the collector is sending dimensions in a different order - // release the previous one, to reuse this slot - rrddim_acquired_release(rda); - st->pluginsd.rda[st->pluginsd.pos] = NULL; + } + else { + // caching without slots + + if(unlikely(st->pluginsd.pos >= st->pluginsd.size)) + st->pluginsd.pos = 0; + + prd = &st->pluginsd.prd_array[st->pluginsd.pos++]; + + rd = prd->rd; + if(likely(rd)) { + const char *id = prd->id; + + if(strcmp(id, dimension) == 0) { + // we found it cached + return rd; + } + else { + // the cached one is not good for us + rrddim_acquired_release(prd->rda); + prd->rda = NULL; + prd->rd = NULL; + prd->id = NULL; + } } } - rda = rrddim_find_and_acquire(st, dimension); + // we need to find the dimension and set it to prd + + RRDDIM_ACQUIRED *rda = rrddim_find_and_acquire(st, dimension); if (unlikely(!rda)) { netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.", - rrdhost_hostname(host), rrdset_id(st), dimension, cmd); + rrdhost_hostname(host), rrdset_id(st), dimension, cmd); return NULL; } - st->pluginsd.rda[st->pluginsd.pos++] = rda; + prd->rda = rda; + prd->rd = rd = rrddim_acquired_to_rrddim(rda); + prd->id = string2str(rd->id); - return rrddim_acquired_to_rrddim(rda); + return rd; } static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) { @@ -248,20 +322,89 @@ static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, cons return st; } +static inline ssize_t pluginsd_parse_rrd_slot(char **words, size_t num_words) { + ssize_t slot = -1; + char *id = get_word(words, num_words, 1); + if(id && id[0] == PLUGINSD_KEYWORD_SLOT[0] && id[1] == PLUGINSD_KEYWORD_SLOT[1] && + id[2] == PLUGINSD_KEYWORD_SLOT[2] && id[3] == PLUGINSD_KEYWORD_SLOT[3] && id[4] == ':') { + slot = (ssize_t) str2ull_encoded(&id[5]); + if(slot < 0) slot = 0; // to make the caller increment its idx of the words + } + + return slot; +} + +static inline void pluginsd_rrdset_cache_put_to_slot(PARSER *parser, RRDSET *st, ssize_t slot, bool obsolete) { + // clean possible old cached data + rrdset_pluginsd_receive_unslot(st); + + if(unlikely(slot < 1 || slot >= INT32_MAX)) + return; + + RRDHOST *host = st->rrdhost; + + if(unlikely((size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) { + spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); + size_t old_slots = host->rrdpush.receive.pluginsd_chart_slots.size; + size_t new_slots = (old_slots < PLUGINSD_MIN_RRDSET_POINTERS_CACHE) ? PLUGINSD_MIN_RRDSET_POINTERS_CACHE : old_slots * 2; + + if(new_slots < (size_t)slot) + new_slots = slot; + + host->rrdpush.receive.pluginsd_chart_slots.array = + reallocz(host->rrdpush.receive.pluginsd_chart_slots.array, new_slots * sizeof(RRDSET *)); + + for(size_t i = old_slots; i < new_slots ;i++) + host->rrdpush.receive.pluginsd_chart_slots.array[i] = NULL; + + host->rrdpush.receive.pluginsd_chart_slots.size = new_slots; + spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock); + } + + host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1] = st; + st->pluginsd.last_slot = (int32_t)slot - 1; + parser->user.cleanup_slots = obsolete; +} + +static inline RRDSET *pluginsd_rrdset_cache_get_from_slot(PARSER *parser, RRDHOST *host, const char *id, ssize_t slot, const char *keyword) { + if(unlikely(slot < 1 || (size_t)slot > host->rrdpush.receive.pluginsd_chart_slots.size)) + return pluginsd_find_chart(host, id, keyword); + + RRDSET *st = host->rrdpush.receive.pluginsd_chart_slots.array[slot - 1]; + + if(!st) { + st = pluginsd_find_chart(host, id, keyword); + if(st) + pluginsd_rrdset_cache_put_to_slot(parser, st, slot, rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)); + } + else { + internal_fatal(string_strcmp(st->id, id) != 0, + "PLUGINSD: wrong chart in slot %zd, expected '%s', found '%s'", + slot - 1, id, string2str(st->id)); + } + + return st; +} + static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) { parser->user.enabled = 0; if(keyword && msg) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO, + "PLUGINSD: keyword %s: %s", keyword, msg); } return PARSER_RC_ERROR; } static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) { - char *dimension = get_word(words, num_words, 1); - char *value = get_word(words, num_words, 2); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *value = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -269,7 +412,7 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; @@ -285,13 +428,17 @@ static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *par } static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *microseconds_txt = get_word(words, num_words, 2); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *microseconds_txt = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN); + RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN)) @@ -332,8 +479,9 @@ static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *p } static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) { - UNUSED(words); - UNUSED(num_words); + char *tv_sec = get_word(words, num_words, 1); + char *tv_usec = get_word(words, num_words, 2); + char *pending_rrdset_next = get_word(words, num_words, 3); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -347,9 +495,15 @@ static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *par pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END); parser->user.data_collections_count++; - struct timeval now; - now_realtime_timeval(&now); - rrdset_timed_done(st, now, /* pending_rrdset_next = */ false); + struct timeval tv = { + .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0, + .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0 + }; + + if(!tv.tv_sec) + now_realtime_timeval(&tv); + + rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false); return PARSER_RC_OK; } @@ -419,30 +573,29 @@ static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, si return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this"); RRDHOST *host = rrdhost_find_or_create( - string2str(parser->user.host_define.hostname), - string2str(parser->user.host_define.hostname), - parser->user.host_define.machine_guid_str, - "Netdata Virtual Host 1.0", - netdata_configured_timezone, - netdata_configured_abbrev_timezone, - netdata_configured_utc_offset, - NULL, - program_name, - program_version, - default_rrd_update_every, - default_rrd_history_entries, - default_rrd_memory_mode, - default_health_enabled, - default_rrdpush_enabled, - default_rrdpush_destination, - default_rrdpush_api_key, - default_rrdpush_send_charts_matching, - default_rrdpush_enable_replication, - default_rrdpush_seconds_to_replicate, - default_rrdpush_replication_step, - rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), - false - ); + string2str(parser->user.host_define.hostname), + string2str(parser->user.host_define.hostname), + parser->user.host_define.machine_guid_str, + "Netdata Virtual Host 1.0", + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + NULL, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + default_rrd_memory_mode, + default_health_enabled, + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels), + false); rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); @@ -492,18 +645,22 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); - char *type = get_word(words, num_words, 1); - char *name = get_word(words, num_words, 2); - char *title = get_word(words, num_words, 3); - char *units = get_word(words, num_words, 4); - char *family = get_word(words, num_words, 5); - char *context = get_word(words, num_words, 6); - char *chart = get_word(words, num_words, 7); - char *priority_s = get_word(words, num_words, 8); - char *update_every_s = get_word(words, num_words, 9); - char *options = get_word(words, num_words, 10); - char *plugin = get_word(words, num_words, 11); - char *module = get_word(words, num_words, 12); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *type = get_word(words, num_words, idx++); + char *name = get_word(words, num_words, idx++); + char *title = get_word(words, num_words, idx++); + char *units = get_word(words, num_words, idx++); + char *family = get_word(words, num_words, idx++); + char *context = get_word(words, num_words, idx++); + char *chart = get_word(words, num_words, idx++); + char *priority_s = get_word(words, num_words, idx++); + char *update_every_s = get_word(words, num_words, idx++); + char *options = get_word(words, num_words, idx++); + char *plugin = get_word(words, num_words, idx++); + char *module = get_word(words, num_words, idx++); // parse the id from type char *id = NULL; @@ -570,14 +727,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p module, priority, update_every, chart_type); + bool obsolete = false; if (likely(st)) { if (options && *options) { if (strstr(options, "obsolete")) { - pluginsd_rrdset_cleanup(st); - rrdset_is_obsolete(st); + rrdset_is_obsolete___safe_from_collector_thread(st); + obsolete = true; } else - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); if (strstr(options, "detail")) rrdset_flag_set(st, RRDSET_FLAG_DETAIL); @@ -595,13 +753,15 @@ static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *p rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } else { - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); rrdset_flag_clear(st, RRDSET_FLAG_DETAIL); rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST); } if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); + + pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete); } else pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART); @@ -652,12 +812,16 @@ static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_w } static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *name = get_word(words, num_words, 2); - char *algorithm = get_word(words, num_words, 3); - char *multiplier_s = get_word(words, num_words, 4); - char *divisor_s = get_word(words, num_words, 5); - char *options = get_word(words, num_words, 6); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *name = get_word(words, num_words, idx++); + char *algorithm = get_word(words, num_words, idx++); + char *multiplier_s = get_word(words, num_words, idx++); + char *divisor_s = get_word(words, num_words, idx++); + char *options = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -696,11 +860,14 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE int unhide_dimension = 1; rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); + bool obsolete = false; if (options && *options) { - if (strstr(options, "obsolete") != NULL) - rrddim_is_obsolete(st, rd); + if (strstr(options, "obsolete") != NULL) { + obsolete = true; + rrddim_is_obsolete___safe_from_collector_thread(st, rd); + } else - rrddim_isnot_obsolete(st, rd); + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); unhide_dimension = !strstr(options, "hidden"); @@ -708,8 +875,9 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); if (strstr(options, "nooverflow") != NULL) rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS); - } else - rrddim_isnot_obsolete(st, rd); + } + else + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); bool should_update_dimension = false; @@ -727,6 +895,8 @@ static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSE rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); } + pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete); + return PARSER_RC_OK; } @@ -759,7 +929,7 @@ static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void const char *transaction = dictionary_acquired_item_name(item); char buffer[2048 + 1]; - snprintfz(buffer, 2048, "%s %s %d \"%s\"\n", + snprintfz(buffer, sizeof(buffer) - 1, "%s %s %d \"%s\"\n", pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION", transaction, pf->timeout, @@ -932,7 +1102,7 @@ void pluginsd_function_cancel(void *data) { internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction); char buffer[2048 + 1]; - snprintfz(buffer, 2048, "%s %s\n", + snprintfz(buffer, sizeof(buffer) - 1, "%s %s\n", PLUGINSD_KEYWORD_FUNCTION_CANCEL, transaction); @@ -947,7 +1117,8 @@ void pluginsd_function_cancel(void *data) { dfe_done(t); if(sent <= 0) - netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d."); } // this is the function that is called from @@ -1247,6 +1418,15 @@ static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *p if(unlikely(!(parser->user.new_host_labels))) parser->user.new_host_labels = rrdlabels_create(); + if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) { + int is_ephemeral = appconfig_test_boolean_value((char *) value); + if (is_ephemeral) { + RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL); + if (likely(host)) + rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST); + } + } + rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source)); if (allocated_store) @@ -1265,6 +1445,8 @@ static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t n host->rrdlabels = rrdlabels_create(); rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels); + if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST)) + rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG); rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE); rrdlabels_destroy(parser->user.new_host_labels); @@ -1311,16 +1493,21 @@ static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE); rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); + rrdset_metadata_updated(st); + parser->user.chart_rrdlabels_linked_temporarily = NULL; return PARSER_RC_OK; } static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) { - char *id = get_word(words, num_words, 1); - char *start_time_str = get_word(words, num_words, 2); - char *end_time_str = get_word(words, num_words, 3); - char *child_now_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *start_time_str = get_word(words, num_words, idx++); + char *end_time_str = get_word(words, num_words, idx++); + char *child_now_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1329,7 +1516,7 @@ static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PA if (likely(!id || !*id)) st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN); else - st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN); + st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_REPLAY_BEGIN); if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1444,9 +1631,13 @@ static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str } static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) { - char *dimension = get_word(words, num_words, 1); - char *value_str = get_word(words, num_words, 2); - char *flags_str = get_word(words, num_words, 3); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *value_str = get_word(words, num_words, idx++); + char *flags_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1455,15 +1646,16 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!parser->user.replay.rset_enabled) { - error_limit_static_thread_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", - rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); + nd_log_limit_static_thread_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR, + "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors", + rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN); // we have to return OK here return PARSER_RC_OK; } - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_SET); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; @@ -1504,8 +1696,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS rd->collector.counter++; } else { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.", + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING, + "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. " + "Ignoring data.", rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd)); } } @@ -1517,11 +1711,15 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si if(parser->user.replay.rset_enabled == false) return PARSER_RC_OK; - char *dimension = get_word(words, num_words, 1); - char *last_collected_ut_str = get_word(words, num_words, 2); - char *last_collected_value_str = get_word(words, num_words, 3); - char *last_calculated_value_str = get_word(words, num_words, 4); - char *last_stored_value_str = get_word(words, num_words, 5); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *last_collected_ut_str = get_word(words, num_words, idx++); + char *last_collected_value_str = get_word(words, num_words, idx++); + char *last_calculated_value_str = get_word(words, num_words, idx++); + char *last_stored_value_str = get_word(words, num_words, idx++); RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); @@ -1535,7 +1733,7 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si st->pluginsd.set = false; } - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE); if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec; @@ -1699,10 +1897,14 @@ static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARS static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); - char *id = get_word(words, num_words, 1); - char *update_every_str = get_word(words, num_words, 2); - char *end_time_str = get_word(words, num_words, 3); - char *wall_clock_time_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *id = get_word(words, num_words, idx++); + char *update_every_str = get_word(words, num_words, idx++); + char *end_time_str = get_word(words, num_words, idx++); + char *wall_clock_time_str = get_word(words, num_words, idx++); if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str)) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters"); @@ -1712,14 +1914,15 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER timing_step(TIMING_STEP_BEGIN2_PREPARE); - RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2); + RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2); + if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE))) - rrdset_isnot_obsolete(st); + rrdset_isnot_obsolete___safe_from_collector_thread(st); timing_step(TIMING_STEP_BEGIN2_FIND_CHART); @@ -1759,9 +1962,12 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time); if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) { - // check if receiver and sender have the same number parsing capabilities + // check receiver capabilities bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); - NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; + + // check sender capabilities + bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; + NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; BUFFER *wb = parser->user.v2.stream_buffer.wb; @@ -1770,28 +1976,35 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER if(unlikely(parser->user.v2.stream_buffer.begin_v2_added)) buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id)); buffer_fast_strcat(wb, "' ", 2); if(can_copy) buffer_strcat(wb, update_every_str); else - buffer_print_uint64_encoded(wb, encoding, update_every); + buffer_print_uint64_encoded(wb, integer_encoding, update_every); buffer_fast_strcat(wb, " ", 1); if(can_copy) buffer_strcat(wb, end_time_str); else - buffer_print_uint64_encoded(wb, encoding, end_time); + buffer_print_uint64_encoded(wb, integer_encoding, end_time); buffer_fast_strcat(wb, " ", 1); if(can_copy) buffer_strcat(wb, wall_clock_time_str); else - buffer_print_uint64_encoded(wb, encoding, wall_clock_time); + buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time); buffer_fast_strcat(wb, "\n", 1); @@ -1824,10 +2037,14 @@ static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) { timing_init(); - char *dimension = get_word(words, num_words, 1); - char *collected_str = get_word(words, num_words, 2); - char *value_str = get_word(words, num_words, 3); - char *flags_str = get_word(words, num_words, 4); + int idx = 1; + ssize_t slot = pluginsd_parse_rrd_slot(words, num_words); + if(slot >= 0) idx++; + + char *dimension = get_word(words, num_words, idx++); + char *collected_str = get_word(words, num_words, idx++); + char *value_str = get_word(words, num_words, idx++); + char *flags_str = get_word(words, num_words, idx++); if(unlikely(!dimension || !collected_str || !value_str || !flags_str)) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters"); @@ -1840,13 +2057,13 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER * timing_step(TIMING_STEP_SET2_PREPARE); - RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2); + RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2); if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL); st->pluginsd.set = true; if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED))) - rrddim_isnot_obsolete(st, rd); + rrddim_isnot_obsolete___safe_from_collector_thread(st, rd); timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION); @@ -1892,12 +2109,22 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER * if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) { // check if receiver and sender have the same number parsing capabilities bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754); + + // check the sender capabilities + bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false; NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX; NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; BUFFER *wb = parser->user.v2.stream_buffer.wb; buffer_need_bytes(wb, 1024); - buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2); + buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1); + + if(with_slots) { + buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2); + buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot); + } + + buffer_fast_strcat(wb, " '", 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); if(can_copy) @@ -1978,13 +2205,27 @@ static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_ // ------------------------------------------------------------------------ // cleanup RRDSET / RRDDIM - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - rd->collector.calculated_value = 0; - rd->collector.collected_value = 0; - rrddim_clear_updated(rd); + if(likely(st->pluginsd.dims_with_slots)) { + for(size_t i = 0; i < st->pluginsd.size ;i++) { + RRDDIM *rd = st->pluginsd.prd_array[i].rd; + + if(!rd) + continue; + + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } + } + else { + RRDDIM *rd; + rrddim_foreach_read(rd, st){ + rd->collector.calculated_value = 0; + rd->collector.collected_value = 0; + rrddim_clear_updated(rd); + } + rrddim_foreach_done(rd); } - rrddim_foreach_done(rd); // ------------------------------------------------------------------------ // reset state @@ -2438,24 +2679,35 @@ static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, si } static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) { - if (atol(words[3]) < 0) - return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags"); - dyncfg_job_flg_t flags = atol(words[3]); + const char *module_name = words[0]; + const char *job_name = words[1]; + const char *job_type_str = words[2]; + const char *flags_str = words[3]; + + long f = str2l(flags_str); + + if (f < 0) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags received"); + + dyncfg_job_flg_t flags = f; + if (SERVING_PLUGINSD(parser)) flags |= JOB_FLG_PLUGIN_PUSHED; else flags |= JOB_FLG_STREAMING_PUSHED; - enum job_type job_type = str2job_type(words[2]); + enum job_type job_type = dyncfg_str2job_type(job_type_str); if (job_type == JOB_TYPE_UNKNOWN) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type"); + if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER) return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)"); - if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job + if (register_job(parser->user.host->configurable_plugins, plugin_name, module_name, job_name, job_type, flags, 0)) // ignore existing is off as this is explicitly called register job return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job"); - rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags); + rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, module_name, job_name, job_type, flags); + return PARSER_RC_OK; } @@ -2474,6 +2726,25 @@ static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_ return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]); } +static inline PARSER_RC pluginsd_dyncfg_reset(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) { + if (unlikely(num_words != (SERVING_PLUGINSD(parser) ? 1 : 2))) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, SERVING_PLUGINSD(parser) ? "expected 0 parameters" : "expected 1 parameter: plugin_name"); + + if (SERVING_PLUGINSD(parser)) { + unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item); + rrdpush_send_dyncfg_reset(parser->user.host, parser->user.cd->configuration->name); + parser->user.cd->configuration = NULL; + } else { + const DICTIONARY_ITEM *di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]); + if (unlikely(di == NULL)) + return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_RESET, "plugin not found"); + unregister_plugin(parser->user.host->configurable_plugins, di); + rrdpush_send_dyncfg_reset(parser->user.host, words[1]); + } + + return PARSER_RC_OK; +} + static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) { int state = str2i(words[3]); @@ -2482,7 +2753,7 @@ static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_word return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status"); char *message = NULL; - if (num_words == 5) + if (num_words == 5 && strlen(words[4]) > 0) message = words[4]; const DICTIONARY_ITEM *plugin_item; @@ -2584,70 +2855,49 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR // ---------------------------------------------------------------------------- -static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) { -#ifdef NETDATA_INTERNAL_CHECKS - if(reader->read_buffer[reader->read_len] != '\0') - fatal("%s(): read_buffer does not start with zero", __FUNCTION__ ); -#endif - - ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1); - if(unlikely(bytes_read <= 0)) - return false; - - reader->read_len += bytes_read; - reader->read_buffer[reader->read_len] = '\0'; - - return true; -} - -static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) { - errno = 0; - struct pollfd fds[1]; +void pluginsd_process_thread_cleanup(void *ptr) { + PARSER *parser = (PARSER *)ptr; - fds[0].fd = fd; - fds[0].events = POLLIN; + pluginsd_cleanup_v2(parser); + pluginsd_host_define_cleanup(parser); - int ret = poll(fds, 1, timeout_ms); + rrd_collector_finished(); - if (ret > 0) { - /* There is data to read */ - if (fds[0].revents & POLLIN) - return buffered_reader_read(reader, fd); +#ifdef NETDATA_LOG_STREAM_RECEIVE + if(parser->user.stream_log_fp) { + fclose(parser->user.stream_log_fp); + parser->user.stream_log_fp = NULL; + } +#endif - else if(fds[0].revents & POLLERR) { - netdata_log_error("PARSER: read failed: POLLERR."); - return false; - } - else if(fds[0].revents & POLLHUP) { - netdata_log_error("PARSER: read failed: POLLHUP."); - return false; - } - else if(fds[0].revents & POLLNVAL) { - netdata_log_error("PARSER: read failed: POLLNVAL."); - return false; - } + parser_destroy(parser); +} - netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set."); - return false; - } - else if (ret == 0) { - netdata_log_error("PARSER: timeout while waiting for data."); +bool parser_reconstruct_node(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.host) return false; - } - netdata_log_error("PARSER: poll() failed with code %d.", ret); - return false; + buffer_strcat(wb, rrdhost_hostname(parser->user.host)); + return true; } -void pluginsd_process_thread_cleanup(void *ptr) { - PARSER *parser = (PARSER *)ptr; +bool parser_reconstruct_instance(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.st) + return false; - pluginsd_cleanup_v2(parser); - pluginsd_host_define_cleanup(parser); + buffer_strcat(wb, rrdset_name(parser->user.st)); + return true; +} - rrd_collector_finished(); +bool parser_reconstruct_context(BUFFER *wb, void *ptr) { + PARSER *parser = ptr; + if(!parser || !parser->user.st) + return false; - parser_destroy(parser); + buffer_strcat(wb, string2str(parser->user.st->context)); + return true; } inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations) @@ -2697,33 +2947,51 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - buffered_reader_init(&parser->reader); - BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); - while(likely(service_running(SERVICE_COLLECTORS))) { - if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { - if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC))) - break; - - continue; - } - - if(unlikely(parser_action(parser, buffer->buffer))) - break; - - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - - cd->unsafe.enabled = parser->user.enabled; - count = parser->user.data_collections_count; - - if (likely(count)) { - cd->successful_collections += count; - cd->serial_failures = 0; - } - else - cd->serial_failures++; + { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + buffered_reader_init(&parser->reader); + BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL); + while(likely(service_running(SERVICE_COLLECTORS))) { + + if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) { + buffered_reader_ret_t ret = buffered_reader_read_timeout( + &parser->reader, + fileno((FILE *) parser->fp_input), + 2 * 60 * MSEC_PER_SEC, true + ); + + if(unlikely(ret != BUFFERED_READER_READ_OK)) + break; + + continue; + } + + if(unlikely(parser_action(parser, buffer->buffer))) + break; + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + buffer_free(buffer); + + cd->unsafe.enabled = parser->user.enabled; + count = parser->user.data_collections_count; + + if(likely(count)) { + cd->successful_collections += count; + cd->serial_failures = 0; + } + else + cd->serial_failures++; + } // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -2855,6 +3123,9 @@ PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, case 103: return pluginsd_register_job(words, num_words, parser); + case 104: + return pluginsd_dyncfg_reset(words, num_words, parser); + case 110: return pluginsd_job_status(words, num_words, parser); |