summaryrefslogtreecommitdiffstats
path: root/src/database/engine/metric.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/database/engine/metric.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/database/engine/metric.c')
-rw-r--r--src/database/engine/metric.c946
1 files changed, 946 insertions, 0 deletions
diff --git a/src/database/engine/metric.c b/src/database/engine/metric.c
new file mode 100644
index 000000000..01eb22fbc
--- /dev/null
+++ b/src/database/engine/metric.c
@@ -0,0 +1,946 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "metric.h"
+#include "cache.h"
+#include "libnetdata/locks/locks.h"
+#include "rrddiskprotocol.h"
+
+typedef int32_t REFCOUNT;
+#define REFCOUNT_DELETING (-100)
+
+struct metric {
+ uuid_t uuid; // never changes
+ Word_t section; // never changes
+
+ time_t first_time_s; // the timestamp of the oldest point in the database
+ time_t latest_time_s_clean; // the timestamp of the newest point in the database
+ time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored)
+ uint32_t latest_update_every_s; // the latest data collection frequency
+ pid_t writer;
+ uint8_t partition;
+ REFCOUNT refcount;
+
+ // THIS IS allocated with malloc()
+ // YOU HAVE TO INITIALIZE IT YOURSELF !
+};
+
+#define set_metric_field_with_condition(field, value, condition) ({ \
+ typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \
+ typeof(field) _wanted = value; \
+ bool did_it = true; \
+ \
+ do { \
+ if((condition) && (_current != _wanted)) { \
+ ; \
+ } \
+ else { \
+ did_it = false; \
+ break; \
+ } \
+ } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \
+ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \
+ \
+ did_it; \
+})
+
+static struct aral_statistics mrg_aral_statistics;
+
+struct mrg {
+ size_t partitions;
+
+ struct mrg_partition {
+ ARAL *aral; // not protected by our spinlock - it has its own
+
+ RW_SPINLOCK rw_spinlock;
+ Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers)
+
+ struct mrg_statistics stats;
+ } index[];
+};
+
+static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.additions_duplicate++;
+}
+
+static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.entries++;
+ mrg->index[partition].stats.additions++;
+ mrg->index[partition].stats.size += sizeof(METRIC);
+}
+
+static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.entries--;
+ mrg->index[partition].stats.size -= sizeof(METRIC);
+ mrg->index[partition].stats.deletions++;
+}
+
+static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.delete_misses++;
+}
+
+#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
+
+static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
+ if(mem_after_judyl > mem_before_judyl)
+ __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ else if(mem_after_judyl < mem_before_judyl)
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) {
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
+ uint8_t *u = (uint8_t *)uuid;
+
+ size_t n;
+ memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t));
+
+ return n % mrg->partitions;
+}
+
+static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED);
+
+ if(first_time_s <= 0) {
+ first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ if(first_time_s <= 0)
+ first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+ if(first_time_s <= 0)
+ first_time_s = 0;
+ else
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
+ }
+
+ return first_time_s;
+}
+
+static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section;
+
+ char uuid[UUID_STR_LEN];
+ uuid_unparse_lower(metric->uuid, uuid);
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "METRIC: %s on %s at tier %d, refcount %d, partition %u, "
+ "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", "
+ "writer pid %d "
+ "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ",
+ msg,
+ uuid,
+ ctx->config.tier,
+ metric->refcount,
+ metric->partition,
+ metric->first_time_s,
+ metric->latest_time_s_hot,
+ metric->latest_time_s_clean,
+ metric->latest_update_every_s,
+ (int)metric->writer
+ );
+}
+
+static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+ return (!first || !last || first > last);
+}
+
+static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
+ size_t partition = metric->partition;
+
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mrg_index_write_lock(mrg, partition);
+
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+ if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+ if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ if(!*sections_judy_pptr) {
+ rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+ if(unlikely(!rc))
+ fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ }
+
+ MRG_STATS_DELETED_METRIC(mrg, partition);
+
+ mrg_index_write_unlock(mrg, partition);
+
+ aral_freez(mrg->index[partition].aral, metric);
+}
+
+static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
+
+ do {
+ if(unlikely(expected < 0))
+ return false;
+
+ desired = expected + 1;
+
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+ size_t partition = metric->partition;
+
+ if(desired == 1)
+ __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+ __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
+
+ return true;
+}
+
+static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
+ size_t partition = metric->partition;
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
+
+ do {
+ if(expected <= 0) {
+ metric_log(mrg, metric, "refcount is zero or negative during release");
+ fatal("METRIC: refcount is %d (zero or negative) during release", expected);
+ }
+
+ if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
+ desired = REFCOUNT_DELETING;
+ else
+ desired = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(desired == 0 || desired == REFCOUNT_DELETING) {
+ __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+ if(desired == REFCOUNT_DELETING)
+ acquired_for_deletion_metric_delete(mrg, metric);
+ }
+
+ __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
+
+ return desired == REFCOUNT_DELETING;
+}
+
+static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+ size_t partition = uuid_partition(mrg, entry->uuid);
+
+ METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
+ Pvoid_t *PValue;
+
+ while(1) {
+ mrg_index_write_lock(mrg, partition);
+
+ size_t mem_before_judyl, mem_after_judyl;
+
+ Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
+ if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+ fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+
+ if (unlikely(!*sections_judy_pptr))
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
+
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+ if (unlikely(!PValue || PValue == PJERR))
+ fatal("DBENGINE METRIC: corrupted section JudyL array");
+
+ if (unlikely(*PValue != NULL)) {
+ METRIC *metric = *PValue;
+
+ if(!metric_acquire(mrg, metric)) {
+ mrg_index_write_unlock(mrg, partition);
+ continue;
+ }
+
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+
+ if (ret)
+ *ret = false;
+
+ aral_freez(mrg->index[partition].aral, allocation);
+
+ return metric;
+ }
+
+ break;
+ }
+
+ METRIC *metric = allocation;
+ uuid_copy(metric->uuid, *entry->uuid);
+ metric->section = entry->section;
+ metric->first_time_s = MAX(0, entry->first_time_s);
+ metric->latest_time_s_clean = MAX(0, entry->last_time_s);
+ metric->latest_time_s_hot = 0;
+ metric->latest_update_every_s = entry->latest_update_every_s;
+ metric->writer = 0;
+ metric->refcount = 1;
+ metric->partition = partition;
+ *PValue = metric;
+
+ MRG_STATS_ADDED_METRIC(mrg, partition);
+
+ mrg_index_write_unlock(mrg, partition);
+
+ if(ret)
+ *ret = true;
+
+ return metric;
+}
+
+static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+ size_t partition = uuid_partition(mrg, uuid);
+
+ while(1) {
+ mrg_index_read_lock(mrg, partition);
+
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+ if (unlikely(!sections_judy_pptr)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
+
+ Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+ if (unlikely(!PValue)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
+
+ METRIC *metric = *PValue;
+
+ if(metric && !metric_acquire(mrg, metric))
+ metric = NULL;
+
+ mrg_index_read_unlock(mrg, partition);
+
+ if(metric) {
+ MRG_STATS_SEARCH_HIT(mrg, partition);
+ return metric;
+ }
+ }
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+inline MRG *mrg_create(ssize_t partitions) {
+ if(partitions < 1)
+ partitions = get_netdata_cpus();
+
+ MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
+ mrg->partitions = partitions;
+
+ for(size_t i = 0; i < mrg->partitions ; i++) {
+ rw_spinlock_init(&mrg->index[i].rw_spinlock);
+
+ char buf[ARAL_MAX_NAME + 1];
+ snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
+
+ mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
+ }
+
+ return mrg;
+}
+
+inline size_t mrg_aral_structures(void) {
+ return aral_structures_from_stats(&mrg_aral_statistics);
+}
+
+inline size_t mrg_aral_overhead(void) {
+ return aral_overhead_from_stats(&mrg_aral_statistics);
+}
+
+inline void mrg_destroy(MRG *mrg __maybe_unused) {
+ // no destruction possible
+ // we can't traverse the metrics list
+
+ // to delete entries, the caller needs to keep pointers to them
+ // and delete them one by one
+
+ ;
+}
+
+inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
+// "DBENGINE METRIC: metric latest time is in the future");
+
+ return metric_add_and_acquire(mrg, &entry, ret);
+}
+
+inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+ return metric_get_and_acquire(mrg, uuid, section);
+}
+
+inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+ return metric_release(mrg, metric, true);
+}
+
+inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+ metric_acquire(mrg, metric);
+ return metric;
+}
+
+inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
+ metric_release(mrg, metric, false);
+}
+
+inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+ return (Word_t)metric;
+}
+
+inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+ return &metric->uuid;
+}
+
+inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+ return metric->section;
+}
+
+inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+ internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+ if(unlikely(first_time_s < 0))
+ return false;
+
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
+
+ return true;
+}
+
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) {
+ internal_fatal(first_time_s < 0 || last_time_s < 0,
+ "DBENGINE METRIC: timestamp is negative");
+ internal_fatal(first_time_s > max_acceptable_collected_time(),
+ "DBENGINE METRIC: metric first time is in the future");
+ internal_fatal(last_time_s > max_acceptable_collected_time(),
+ "DBENGINE METRIC: metric last time is in the future");
+
+ if(first_time_s > 0)
+ set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current);
+
+ if(last_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) &&
+ update_every_s > 0)
+ // set the latest update every too
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
+ }
+ else if(update_every_s > 0)
+ // set it only if it is invalid
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
+}
+
+inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+ internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+ return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current);
+}
+
+inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ return mrg_metric_get_first_time_s_smart(mrg, metric);
+}
+
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+ *last_time_s = MAX(clean, hot);
+ *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
+ if (update_every_s)
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+}
+
+inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+ internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+// internal_fatal(latest_time_s > max_acceptable_collected_time(),
+// "DBENGINE METRIC: metric latest time is in the future");
+
+// internal_fatal(metric->latest_time_s_clean > latest_time_s,
+// "DBENGINE METRIC: metric new clean latest time is older than the previous one");
+
+ if(latest_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) {
+ set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current);
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+// returns true when metric still has retention
+inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+ Word_t section = mrg_metric_section(mrg, metric);
+ bool do_again = false;
+ size_t countdown = 5;
+
+ do {
+ time_t min_first_time_s = LONG_MAX;
+ time_t max_end_time_s = 0;
+ PGC_PAGE *page;
+ PGC_SEARCH method = PGC_SEARCH_FIRST;
+ time_t page_first_time_s = 0;
+ time_t page_end_time_s = 0;
+ while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
+ method = PGC_SEARCH_NEXT;
+
+ bool is_hot = pgc_is_page_hot(page);
+ bool is_dirty = pgc_is_page_dirty(page);
+ page_first_time_s = pgc_page_start_time_s(page);
+ page_end_time_s = pgc_page_end_time_s(page);
+
+ if ((is_hot || is_dirty) && page_first_time_s > 0 && page_first_time_s < min_first_time_s)
+ min_first_time_s = page_first_time_s;
+
+ if (is_dirty && page_end_time_s > max_end_time_s)
+ max_end_time_s = page_end_time_s;
+
+ pgc_page_release(main_cache, page);
+ }
+
+ if (min_first_time_s == LONG_MAX)
+ min_first_time_s = 0;
+
+ if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED))
+ do_again = true;
+ else {
+ internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
+
+ do_again = false;
+ set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true);
+ set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true);
+ }
+ } while(do_again);
+
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+ return (first && last && first < last);
+}
+
+inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+ internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+// internal_fatal(latest_time_s > max_acceptable_collected_time(),
+// "DBENGINE METRIC: metric latest time is in the future");
+
+ if(likely(latest_time_s > 0)) {
+ __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED);
+ return true;
+ }
+
+ return false;
+}
+
+inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ return clean;
+}
+
+inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+ return MAX(clean, hot);
+}
+
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
+
+ return false;
+}
+
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
+
+ return false;
+}
+
+inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+}
+
+inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = gettid();
+ bool done = true;
+
+ do {
+ if(expected != 0) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
+ else
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
+
+ return done;
+}
+
+inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+ // this function can be called from a different thread than the one than the writer
+
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = 0;
+ bool done = true;
+
+ do {
+ if(!expected) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
+ __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
+
+ return done;
+}
+
+inline void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ uint32_t update_every_s, time_t now_s)
+{
+ if(unlikely(last_time_s > now_s)) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
+
+ if (unlikely(first_time_s > last_time_s)) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
+
+ first_time_s = last_time_s;
+ }
+
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
+
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .uuid = uuid,
+ .section = section,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = update_every_s
+ };
+ metric = mrg_metric_add_and_acquire(mrg, entry, &added);
+ }
+
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+ if (likely(!added)) {
+ uint64_t old_samples = 0;
+
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+ mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ uint64_t new_samples = 0;
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+ __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED);
+ }
+ else {
+ // Newly added
+ if (update_every_s) {
+ uint64_t samples = (last_time_s - first_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
+ }
+
+ mrg_metric_release(mrg, metric);
+}
+
+inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
+ memset(s, 0, sizeof(struct mrg_statistics));
+
+ for(size_t i = 0; i < mrg->partitions ;i++) {
+ s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
+ s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
+ s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
+ s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
+ s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
+ s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED);
+ s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED);
+ s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED);
+ s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED);
+ s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED);
+ s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED);
+ s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED);
+ s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
+ }
+
+ s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions;
+}
+
+// ----------------------------------------------------------------------------
+// unit test
+
+struct mrg_stress_entry {
+ uuid_t uuid;
+ time_t after;
+ time_t before;
+};
+
+struct mrg_stress {
+ MRG *mrg;
+ bool stop;
+ size_t entries;
+ struct mrg_stress_entry *array;
+ size_t updates;
+};
+
+static void *mrg_stress(void *ptr) {
+ struct mrg_stress *t = ptr;
+ MRG *mrg = t->mrg;
+
+ ssize_t start = 0;
+ ssize_t end = (ssize_t)t->entries;
+ ssize_t step = 1;
+
+ if(gettid() % 2) {
+ start = (ssize_t)t->entries - 1;
+ end = -1;
+ step = -1;
+ }
+
+ while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
+ for (ssize_t i = start; i != end; i += step) {
+ struct mrg_stress_entry *e = &t->array[i];
+
+ time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
+ time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
+
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, 0x01,
+ &e->uuid,
+ after,
+ before,
+ 1,
+ before);
+
+ __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
+ }
+ }
+
+ return ptr;
+}
+
+int mrg_unittest(void) {
+ MRG *mrg = mrg_create(0);
+ METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
+ METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
+ bool ret;
+
+ uuid_t test_uuid;
+ uuid_generate(test_uuid);
+ MRG_ENTRY entry = {
+ .uuid = &test_uuid,
+ .section = 0,
+ .first_time_s = 2,
+ .last_time_s = 3,
+ .latest_update_every_s = 4,
+ };
+ m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(!ret)
+ fatal("DBENGINE METRIC: failed to add metric");
+
+ // add the same metric again
+ m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m2_t0 != m1_t0)
+ fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
+ if(ret)
+ fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+ m3_t0 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+ if(m3_t0 != m1_t0)
+ fatal("DBENGINE METRIC: cannot find the metric added");
+
+ // add the same metric again
+ m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m4_t0 != m1_t0)
+ fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
+ if(ret)
+ fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+ // add the same metric in another section
+ entry.section = 1;
+ m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(!ret)
+ fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
+
+ // add the same metric again
+ m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+ if(m2_t1 != m1_t1)
+ fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section);
+ if(ret)
+ fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)");
+
+ m3_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+ if(m3_t1 != m1_t1)
+ fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
+
+ // delete the first metric
+ mrg_metric_release(mrg, m2_t0);
+ mrg_metric_release(mrg, m3_t0);
+ mrg_metric_release(mrg, m4_t0);
+ mrg_metric_set_first_time_s(mrg, m1_t0, 0);
+ mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
+ mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
+ if(!mrg_metric_release_and_delete(mrg, m1_t0))
+ fatal("DBENGINE METRIC: cannot delete the first metric");
+
+ m4_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+ if(m4_t1 != m1_t1)
+ fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
+
+ // delete the second metric
+ mrg_metric_release(mrg, m2_t1);
+ mrg_metric_release(mrg, m3_t1);
+ mrg_metric_release(mrg, m4_t1);
+ mrg_metric_set_first_time_s(mrg, m1_t1, 0);
+ mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
+ mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
+ if(!mrg_metric_release_and_delete(mrg, m1_t1))
+ fatal("DBENGINE METRIC: cannot delete the second metric");
+
+ struct mrg_statistics s;
+ mrg_get_statistics(mrg, &s);
+ if(s.entries != 0)
+ fatal("DBENGINE METRIC: invalid entries counter");
+
+ size_t entries = 1000000;
+ size_t threads = mrg->partitions / 3 + 1;
+ size_t tiers = 3;
+ size_t run_for_secs = 5;
+ netdata_log_info("preparing stress test of %zu entries...", entries);
+ struct mrg_stress t = {
+ .mrg = mrg,
+ .entries = entries,
+ .array = callocz(entries, sizeof(struct mrg_stress_entry)),
+ };
+
+ time_t now = max_acceptable_collected_time();
+ for(size_t i = 0; i < entries ;i++) {
+ uuid_generate_random(t.array[i].uuid);
+ t.array[i].after = now / 3;
+ t.array[i].before = now / 2;
+ }
+ netdata_log_info("stress test is populating MRG with 3 tiers...");
+ for(size_t i = 0; i < entries ;i++) {
+ struct mrg_stress_entry *e = &t.array[i];
+ for(size_t tier = 1; tier <= tiers ;tier++) {
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, tier,
+ &e->uuid,
+ e->after,
+ e->before,
+ 1,
+ e->before);
+ }
+ }
+ netdata_log_info("stress test ready to run...");
+
+ usec_t started_ut = now_monotonic_usec();
+
+ pthread_t th[threads];
+ for(size_t i = 0; i < threads ; i++) {
+ char buf[15 + 1];
+ snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i);
+ netdata_thread_create(&th[i], buf,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ mrg_stress, &t);
+ }
+
+ sleep_usec(run_for_secs * USEC_PER_SEC);
+ __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED);
+
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_cancel(th[i]);
+
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_join(th[i], NULL);
+
+ usec_t ended_ut = now_monotonic_usec();
+
+ struct mrg_statistics stats;
+ mrg_get_statistics(mrg, &stats);
+
+ netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
+ "%zu deletions, %zu wrong deletions, "
+ "%zu successful searches, %zu wrong searches, "
+ "in %"PRIu64" usecs",
+ stats.additions, stats.additions_duplicate,
+ stats.deletions, stats.delete_misses,
+ stats.search_hits, stats.search_misses,
+ ended_ut - started_ut);
+
+ netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0,
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads);
+
+ mrg_destroy(mrg);
+
+ netdata_log_info("DBENGINE METRIC: all tests passed!");
+
+ return 0;
+}