summaryrefslogtreecommitdiffstats
path: root/database/rrdset.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /database/rrdset.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/rrdset.c')
-rw-r--r--database/rrdset.c179
1 files changed, 161 insertions, 18 deletions
diff --git a/database/rrdset.c b/database/rrdset.c
index 92386f45e..f4bb48aa7 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -5,6 +5,129 @@
#include <sched.h>
#include "storage_engine.h"
+
+void rrdset_metadata_updated(RRDSET *st) {
+ __atomic_add_fetch(&st->version, 1, __ATOMIC_RELAXED);
+ rrdcontext_updated_rrdset(st);
+}
+
+// ----------------------------------------------------------------------------
+// RRDSET rrdpush send chart_slots
+
+static void rrdset_rrdpush_send_chart_slot_assign(RRDSET *st) {
+ RRDHOST *host = st->rrdhost;
+ spinlock_lock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+
+ if(host->rrdpush.send.pluginsd_chart_slots.available.used > 0)
+ st->rrdpush.sender.chart_slot =
+ host->rrdpush.send.pluginsd_chart_slots.available.array[--host->rrdpush.send.pluginsd_chart_slots.available.used];
+ else
+ st->rrdpush.sender.chart_slot = ++host->rrdpush.send.pluginsd_chart_slots.last_used;
+
+ spinlock_unlock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+}
+
+static void rrdset_rrdpush_send_chart_slot_release(RRDSET *st) {
+ if(!st->rrdpush.sender.chart_slot || st->rrdhost->rrdpush.send.pluginsd_chart_slots.available.ignore)
+ return;
+
+ RRDHOST *host = st->rrdhost;
+ spinlock_lock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+
+ if(host->rrdpush.send.pluginsd_chart_slots.available.used >= host->rrdpush.send.pluginsd_chart_slots.available.size) {
+ uint32_t old_size = host->rrdpush.send.pluginsd_chart_slots.available.size;
+ uint32_t new_size = (old_size > 0) ? (old_size * 2) : 1024;
+
+ host->rrdpush.send.pluginsd_chart_slots.available.array =
+ reallocz(host->rrdpush.send.pluginsd_chart_slots.available.array, new_size * sizeof(uint32_t));
+
+ host->rrdpush.send.pluginsd_chart_slots.available.size = new_size;
+ }
+
+ host->rrdpush.send.pluginsd_chart_slots.available.array[host->rrdpush.send.pluginsd_chart_slots.available.used++] =
+ st->rrdpush.sender.chart_slot;
+
+ st->rrdpush.sender.chart_slot = 0;
+ spinlock_unlock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+}
+
+void rrdhost_pluginsd_send_chart_slots_free(RRDHOST *host) {
+ spinlock_lock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+ host->rrdpush.send.pluginsd_chart_slots.available.ignore = true;
+ freez(host->rrdpush.send.pluginsd_chart_slots.available.array);
+ host->rrdpush.send.pluginsd_chart_slots.available.array = NULL;
+ host->rrdpush.send.pluginsd_chart_slots.available.used = 0;
+ host->rrdpush.send.pluginsd_chart_slots.available.size = 0;
+ spinlock_unlock(&host->rrdpush.send.pluginsd_chart_slots.available.spinlock);
+
+ // zero all the slots on all charts, so that they will not attempt to access the array
+ RRDSET *st;
+ rrdset_foreach_read(st, host) {
+ st->rrdpush.sender.chart_slot = 0;
+ }
+ rrdset_foreach_done(st);
+}
+
+void rrdset_pluginsd_receive_unslot(RRDSET *st) {
+ for(size_t i = 0; i < st->pluginsd.size ;i++) {
+ rrddim_acquired_release(st->pluginsd.prd_array[i].rda); // can be NULL
+ st->pluginsd.prd_array[i].rda = NULL;
+ st->pluginsd.prd_array[i].rd = NULL;
+ st->pluginsd.prd_array[i].id = NULL;
+ }
+
+ RRDHOST *host = st->rrdhost;
+
+ if(st->pluginsd.last_slot >= 0 &&
+ (uint32_t)st->pluginsd.last_slot < host->rrdpush.receive.pluginsd_chart_slots.size &&
+ host->rrdpush.receive.pluginsd_chart_slots.array[st->pluginsd.last_slot] == st) {
+ host->rrdpush.receive.pluginsd_chart_slots.array[st->pluginsd.last_slot] = NULL;
+ }
+
+ st->pluginsd.last_slot = -1;
+ st->pluginsd.dims_with_slots = false;
+}
+
+void rrdset_pluginsd_receive_unslot_and_cleanup(RRDSET *st) {
+ if(!st)
+ return;
+
+ spinlock_lock(&st->pluginsd.spinlock);
+
+ rrdset_pluginsd_receive_unslot(st);
+
+ freez(st->pluginsd.prd_array);
+ st->pluginsd.prd_array = NULL;
+ st->pluginsd.size = 0;
+ st->pluginsd.pos = 0;
+ st->pluginsd.set = false;
+ st->pluginsd.last_slot = -1;
+ st->pluginsd.dims_with_slots = false;
+ st->pluginsd.collector_tid = 0;
+
+ spinlock_unlock(&st->pluginsd.spinlock);
+}
+
+static void rrdset_pluginsd_receive_slots_initialize(RRDSET *st) {
+ spinlock_init(&st->pluginsd.spinlock);
+ st->pluginsd.last_slot = -1;
+}
+
+void rrdhost_pluginsd_receive_chart_slots_free(RRDHOST *host) {
+ spinlock_lock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+
+ if(host->rrdpush.receive.pluginsd_chart_slots.array) {
+ for (size_t s = 0; s < host->rrdpush.receive.pluginsd_chart_slots.size; s++)
+ rrdset_pluginsd_receive_unslot_and_cleanup(host->rrdpush.receive.pluginsd_chart_slots.array[s]);
+
+ freez(host->rrdpush.receive.pluginsd_chart_slots.array);
+ host->rrdpush.receive.pluginsd_chart_slots.array = NULL;
+ host->rrdpush.receive.pluginsd_chart_slots.size = 0;
+ }
+
+ spinlock_unlock(&host->rrdpush.receive.pluginsd_chart_slots.spinlock);
+}
+
// ----------------------------------------------------------------------------
// RRDSET name index
@@ -39,8 +162,8 @@ static inline RRDSET *rrdset_index_find_name(RRDHOST *host, const char *name) {
static inline void rrdset_update_permanent_labels(RRDSET *st) {
if(!st->rrdlabels) return;
- rrdlabels_add(st->rrdlabels, "_collect_plugin", rrdset_plugin_name(st), RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT);
- rrdlabels_add(st->rrdlabels, "_collect_module", rrdset_module_name(st), RRDLABEL_SRC_AUTO| RRDLABEL_FLAG_PERMANENT);
+ rrdlabels_add(st->rrdlabels, "_collect_plugin", rrdset_plugin_name(st), RRDLABEL_SRC_AUTO | RRDLABEL_FLAG_DONT_DELETE);
+ rrdlabels_add(st->rrdlabels, "_collect_module", rrdset_module_name(st), RRDLABEL_SRC_AUTO | RRDLABEL_FLAG_DONT_DELETE);
}
static STRING *rrdset_fix_name(RRDHOST *host, const char *chart_full_id, const char *type, const char *current_name, const char *name) {
@@ -64,7 +187,7 @@ static STRING *rrdset_fix_name(RRDHOST *host, const char *chart_full_id, const c
i++;
} while (rrdset_index_find_name(host, new_name));
- netdata_log_info("RRDSET: using name '%s' for chart '%s' on host '%s'.", new_name, full_name, rrdhost_hostname(host));
+// netdata_log_info("RRDSET: using name '%s' for chart '%s' on host '%s'.", new_name, full_name, rrdhost_hostname(host));
}
else
return NULL;
@@ -135,6 +258,8 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
st->chart_type = ctr->chart_type;
st->rrdhost = host;
+ rrdset_rrdpush_send_chart_slot_assign(st);
+
spinlock_init(&st->data_collection_lock);
st->flags = RRDSET_FLAG_SYNC_CLOCK
@@ -179,13 +304,13 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v
st->green = NAN;
st->red = NAN;
+ rrdset_pluginsd_receive_slots_initialize(st);
+
ctr->react_action = RRDSET_REACT_NEW;
ml_chart_new(st);
}
-void pluginsd_rrdset_cleanup(RRDSET *st);
-
void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
RRDHOST *host = st->rrdhost;
@@ -208,7 +333,7 @@ void rrdset_finalize_collection(RRDSET *st, bool dimensions_too) {
}
}
- pluginsd_rrdset_cleanup(st);
+ rrdset_pluginsd_receive_unslot_and_cleanup(st);
}
// the destructor - the dictionary is write locked while this runs
@@ -220,6 +345,8 @@ static void rrdset_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, v
rrdset_finalize_collection(st, false);
+ rrdset_rrdpush_send_chart_slot_release(st);
+
// remove it from the name index
rrdset_index_del_name(host, st);
@@ -288,7 +415,7 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
struct rrdset_constructor *ctr = constructor_data;
RRDSET *st = rrdset;
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
ctr->react_action = RRDSET_REACT_NONE;
@@ -363,7 +490,6 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused,
rrdset_update_permanent_labels(st);
rrdset_flag_set(st, RRDSET_FLAG_SYNC_CLOCK);
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
return ctr->react_action != RRDSET_REACT_NONE;
}
@@ -389,10 +515,9 @@ static void rrdset_react_callback(const DICTIONARY_ITEM *item __maybe_unused, vo
}
rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
}
- rrdcontext_updated_rrdset(st);
+ rrdset_metadata_updated(st);
}
void rrdset_index_init(RRDHOST *host) {
@@ -543,7 +668,7 @@ int rrdset_reset_name(RRDSET *st, const char *name) {
rrdset_flag_clear(st, RRDSET_FLAG_EXPORTING_IGNORE);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_SEND);
rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_IGNORE);
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_metadata_updated(st);
rrdcontext_updated_rrdset_name(st);
return 2;
@@ -652,14 +777,19 @@ void rrdset_get_retention_of_tier_for_collected_chart(RRDSET *st, time_t *first_
*last_time_s = db_last_entry_s;
}
-inline void rrdset_is_obsolete(RRDSET *st) {
+inline void rrdset_is_obsolete___safe_from_collector_thread(RRDSET *st) {
+ rrdset_pluginsd_receive_unslot(st);
+
if(unlikely(!(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
+// netdata_log_info("Setting obsolete flag on chart 'host:%s/chart:%s'",
+// rrdhost_hostname(st->rrdhost), rrdset_id(st));
+
rrdset_flag_set(st, RRDSET_FLAG_OBSOLETE);
rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_PENDING_OBSOLETE_CHARTS);
st->last_accessed_time_s = now_realtime_sec();
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_metadata_updated(st);
// the chart will not get more updates (data collection)
// so, we have to push its definition now
@@ -668,12 +798,16 @@ inline void rrdset_is_obsolete(RRDSET *st) {
}
}
-inline void rrdset_isnot_obsolete(RRDSET *st) {
+inline void rrdset_isnot_obsolete___safe_from_collector_thread(RRDSET *st) {
if(unlikely((rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))) {
+
+// netdata_log_info("Clearing obsolete flag on chart 'host:%s/chart:%s'",
+// rrdhost_hostname(st->rrdhost), rrdset_id(st));
+
rrdset_flag_clear(st, RRDSET_FLAG_OBSOLETE);
st->last_accessed_time_s = now_realtime_sec();
- rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
+ rrdset_metadata_updated(st);
// the chart will be pushed upstream automatically
// due to data collection
@@ -1189,6 +1323,14 @@ void rrddim_store_metric_with_trace(RRDDIM *rd, usec_t point_end_time_ut, NETDAT
#else // !NETDATA_LOG_COLLECTION_ERRORS
void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
#endif // !NETDATA_LOG_COLLECTION_ERRORS
+
+ static __thread struct log_stack_entry lgs[] = {
+ [0] = ND_LOG_FIELD_STR(NDF_NIDL_DIMENSION, NULL),
+ [1] = ND_LOG_FIELD_END(),
+ };
+ lgs[0].str = rd->id;
+ log_stack_push(lgs);
+
#ifdef NETDATA_LOG_COLLECTION_ERRORS
rd->rrddim_store_metric_count++;
@@ -1250,6 +1392,7 @@ void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n,
}
rrdcontext_collected_rrddim(rd);
+ log_stack_pop(&lgs);
}
void store_metric_collection_completed() {
@@ -1528,12 +1671,12 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
if (unlikely(rrdset_flags & RRDSET_FLAG_OBSOLETE)) {
netdata_log_error("Chart '%s' has the OBSOLETE flag set, but it is collected.", rrdset_id(st));
- rrdset_isnot_obsolete(st);
+ rrdset_isnot_obsolete___safe_from_collector_thread(st);
}
// check if the chart has a long time to be updated
if(unlikely(st->usec_since_last_update > MAX(st->db.entries, 60) * update_every_ut)) {
- netdata_log_info("host '%s', chart '%s': took too long to be updated (counter #%u, update #%u, %0.3" NETDATA_DOUBLE_MODIFIER
+ nd_log_daemon(NDLP_DEBUG, "host '%s', chart '%s': took too long to be updated (counter #%u, update #%u, %0.3" NETDATA_DOUBLE_MODIFIER
" secs). Resetting it.", rrdhost_hostname(st->rrdhost), rrdset_id(st), st->counter, st->counter_done,
(NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC);
rrdset_reset(st);
@@ -1675,7 +1818,7 @@ void rrdset_timed_done(RRDSET *st, struct timeval now, bool pending_rrdset_next)
if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE))) {
netdata_log_error("Dimension %s in chart '%s' has the OBSOLETE flag set, but it is collected.", rrddim_name(rd), rrdset_id(st));
- rrddim_isnot_obsolete(st, rd);
+ rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
}
}
}