author      Daniel Baumann <daniel.baumann@progress-linux.org>   2024-05-05 12:08:03 +0000
committer   Daniel Baumann <daniel.baumann@progress-linux.org>   2024-05-05 12:08:18 +0000
commit      5da14042f70711ea5cf66e034699730335462f66 (patch)
tree        0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/database/engine/metric.c
parent      Releasing debian version 1.44.3-2. (diff)
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/database/engine/metric.c')
-rw-r--r--  src/database/engine/metric.c | 946
1 file changed, 946 insertions(+), 0 deletions(-)
diff --git a/src/database/engine/metric.c b/src/database/engine/metric.c
new file mode 100644
index 000000000..01eb22fbc
--- /dev/null
+++ b/src/database/engine/metric.c
@@ -0,0 +1,946 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+#include "metric.h"
+#include "cache.h"
+#include "libnetdata/locks/locks.h"
+#include "rrddiskprotocol.h"
+
+typedef int32_t REFCOUNT;
+#define REFCOUNT_DELETING (-100)
+
+struct metric {
+    uuid_t uuid;                    // never changes
+    Word_t section;                 // never changes
+
+    time_t first_time_s;            // the timestamp of the oldest point in the database
+    time_t latest_time_s_clean;     // the timestamp of the newest point in the database
+    time_t latest_time_s_hot;       // the timestamp of the latest point that has been collected (not yet stored)
+    uint32_t latest_update_every_s; // the latest data collection frequency
+    pid_t writer;
+    uint8_t partition;
+    REFCOUNT refcount;
+
+    // THIS IS allocated with malloc()
+    // YOU HAVE TO INITIALIZE IT YOURSELF !
+};
+
+#define set_metric_field_with_condition(field, value, condition) ({        \
+    typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED);  \
+    typeof(field) _wanted = value;                                         \
+    bool did_it = true;                                                    \
+                                                                           \
+    do {                                                                   \
+        if(!(condition) || (_current == _wanted)) {                        \
+            did_it = false;                                                \
+            break;                                                         \
+        }                                                                  \
+    } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted,     \
+        false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));                       \
+                                                                           \
+    did_it;                                                                \
+})
+
+static struct aral_statistics mrg_aral_statistics;
+
+struct mrg {
+    size_t partitions;
+
+    struct mrg_partition {
+        ARAL *aral;                 // not protected by our spinlock - it has its own
+
+        RW_SPINLOCK rw_spinlock;
+        Pvoid_t uuid_judy;          // JudyHS: each UUID has a JudyL of sections (tiers)
+
+        struct mrg_statistics stats;
+    } index[];
+};
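+// Statistics accounting: counters that are only modified while holding the
+// partition write lock use plain arithmetic; counters touched from hot,
+// lock-free paths (references, searches, judy memory deltas) use relaxed
+// atomics.
+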
+static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
+    mrg->index[partition].stats.additions_duplicate++;
+}
+
+static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
+    mrg->index[partition].stats.entries++;
+    mrg->index[partition].stats.additions++;
+    mrg->index[partition].stats.size += sizeof(METRIC);
+}
+
+static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
+    mrg->index[partition].stats.entries--;
+    mrg->index[partition].stats.size -= sizeof(METRIC);
+    mrg->index[partition].stats.deletions++;
+}
+
+static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
+    __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) {
+    __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
+    mrg->index[partition].stats.delete_misses++;
+}
+
+#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
+
+static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
+    if(mem_after_judyl > mem_before_judyl)
+        __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+    else if(mem_after_judyl < mem_before_judyl)
+        __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) {
+    __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) {
+    __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
+    uint8_t *u = (uint8_t *)uuid;
+
+    size_t n;
+    memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t));
+
+    return n % mrg->partitions;
+}
+
+static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED);
+
+    if(first_time_s <= 0) {
+        first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+        if(first_time_s <= 0)
+            first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+        if(first_time_s <= 0)
+            first_time_s = 0;
+        else
+            __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
+    }
+
+    return first_time_s;
+}
+
+static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) {
+    struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section;
+
+    char uuid[UUID_STR_LEN];
+    uuid_unparse_lower(metric->uuid, uuid);
+    nd_log(NDLS_DAEMON, NDLP_ERR,
+           "METRIC: %s on %s at tier %d, refcount %d, partition %u, "
+           "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", "
+           "writer pid %d "
+           "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ",
+           msg,
+           uuid,
+           ctx->config.tier,
+           metric->refcount,
+           metric->partition,
+           metric->first_time_s,
+           metric->latest_time_s_hot,
+           metric->latest_time_s_clean,
+           metric->latest_update_every_s,
+           (int)metric->writer
+    );
+}
+
+static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
+    time_t first, last;
+    mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+    return (first && last && first < last);
+}
+
+static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
+    size_t partition = metric->partition;
+
+    size_t mem_before_judyl, mem_after_judyl;
+
+    mrg_index_write_lock(mrg, partition);
+
+    Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+    if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+        MRG_STATS_DELETE_MISS(mrg, partition);
+        mrg_index_write_unlock(mrg, partition);
+        return;
+    }
+
+    mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+    int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+    mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+    mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+    if(unlikely(!rc)) {
+        MRG_STATS_DELETE_MISS(mrg, partition);
+        mrg_index_write_unlock(mrg, partition);
+        return;
+    }
+
+    if(!*sections_judy_pptr) {
+        rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+        if(unlikely(!rc))
+            fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+        mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+    }
+
+    MRG_STATS_DELETED_METRIC(mrg, partition);
+
+    mrg_index_write_unlock(mrg, partition);
+
+    aral_freez(mrg->index[partition].aral, metric);
+}
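+
+// Reference counting: a metric with refcount >= 0 is alive; the sentinel
+// REFCOUNT_DELETING marks a metric that is being removed from the index,
+// so any concurrent metric_acquire() on it fails and the caller retries.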
+static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
+    REFCOUNT expected, desired;
+
+    expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
+
+    do {
+        if(unlikely(expected < 0))
+            return false;
+
+        desired = expected + 1;
+
+    } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+    size_t partition = metric->partition;
+
+    if(desired == 1)
+        __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+    __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
+
+    return true;
+}
+
+static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
+    size_t partition = metric->partition;
+    REFCOUNT expected, desired;
+
+    expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
+
+    do {
+        if(expected <= 0) {
+            metric_log(mrg, metric, "refcount is zero or negative during release");
+            fatal("METRIC: refcount is %d (zero or negative) during release", expected);
+        }
+
+        if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
+            desired = REFCOUNT_DELETING;
+        else
+            desired = expected - 1;
+
+    } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+    if(desired == 0 || desired == REFCOUNT_DELETING) {
+        __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+        if(desired == REFCOUNT_DELETING)
+            acquired_for_deletion_metric_delete(mrg, metric);
+    }
+
+    __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
+
+    return desired == REFCOUNT_DELETING;
+}
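+
+// Insert-or-get: allocate a METRIC upfront, then try to index it under the
+// partition write lock. If another thread already indexed the same
+// uuid/section pair, acquire and return the existing one (retrying if it is
+// mid-deletion) and free the unused allocation.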
+static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+    size_t partition = uuid_partition(mrg, entry->uuid);
+
+    METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
+    Pvoid_t *PValue;
+
+    while(1) {
+        mrg_index_write_lock(mrg, partition);
+
+        size_t mem_before_judyl, mem_after_judyl;
+
+        Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
+        if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+            fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+
+        if (unlikely(!*sections_judy_pptr))
+            mrg_stats_size_judyhs_added_uuid(mrg, partition);
+
+        mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+        PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+        mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+        mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+        if (unlikely(!PValue || PValue == PJERR))
+            fatal("DBENGINE METRIC: corrupted section JudyL array");
+
+        if (unlikely(*PValue != NULL)) {
+            METRIC *metric = *PValue;
+
+            if(!metric_acquire(mrg, metric)) {
+                mrg_index_write_unlock(mrg, partition);
+                continue;
+            }
+
+            MRG_STATS_DUPLICATE_ADD(mrg, partition);
+            mrg_index_write_unlock(mrg, partition);
+
+            if (ret)
+                *ret = false;
+
+            aral_freez(mrg->index[partition].aral, allocation);
+
+            return metric;
+        }
+
+        break;
+    }
+
+    METRIC *metric = allocation;
+    uuid_copy(metric->uuid, *entry->uuid);
+    metric->section = entry->section;
+    metric->first_time_s = MAX(0, entry->first_time_s);
+    metric->latest_time_s_clean = MAX(0, entry->last_time_s);
+    metric->latest_time_s_hot = 0;
+    metric->latest_update_every_s = entry->latest_update_every_s;
+    metric->writer = 0;
+    metric->refcount = 1;
+    metric->partition = partition;
+    *PValue = metric;
+
+    MRG_STATS_ADDED_METRIC(mrg, partition);
+
+    mrg_index_write_unlock(mrg, partition);
+
+    if(ret)
+        *ret = true;
+
+    return metric;
+}
+
+static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+    size_t partition = uuid_partition(mrg, uuid);
+
+    while(1) {
+        mrg_index_read_lock(mrg, partition);
+
+        Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+        if (unlikely(!sections_judy_pptr)) {
+            mrg_index_read_unlock(mrg, partition);
+            MRG_STATS_SEARCH_MISS(mrg, partition);
+            return NULL;
+        }
+
+        Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+        if (unlikely(!PValue)) {
+            mrg_index_read_unlock(mrg, partition);
+            MRG_STATS_SEARCH_MISS(mrg, partition);
+            return NULL;
+        }
+
+        METRIC *metric = *PValue;
+
+        if(metric && !metric_acquire(mrg, metric))
+            metric = NULL;
+
+        mrg_index_read_unlock(mrg, partition);
+
+        if(metric) {
+            MRG_STATS_SEARCH_HIT(mrg, partition);
+            return metric;
+        }
+    }
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+inline MRG *mrg_create(ssize_t partitions) {
+    if(partitions < 1)
+        partitions = get_netdata_cpus();
+
+    MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
+    mrg->partitions = partitions;
+
+    for(size_t i = 0; i < mrg->partitions ; i++) {
+        rw_spinlock_init(&mrg->index[i].rw_spinlock);
+
+        char buf[ARAL_MAX_NAME + 1];
+        snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
+
+        mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
+    }
+
+    return mrg;
+}
+
+inline size_t mrg_aral_structures(void) {
+    return aral_structures_from_stats(&mrg_aral_statistics);
+}
+
+inline size_t mrg_aral_overhead(void) {
+    return aral_overhead_from_stats(&mrg_aral_statistics);
+}
+
+inline void mrg_destroy(MRG *mrg __maybe_unused) {
+    // no destruction possible
+    // we can't traverse the metrics list
+
+    // to delete entries, the caller needs to keep pointers to them
+    // and delete them one by one
+
+    ;
+}
+
+inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+//    internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+    return metric_add_and_acquire(mrg, &entry, ret);
+}
+
+inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+    return metric_get_and_acquire(mrg, uuid, section);
+}
+
+inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+    return metric_release(mrg, metric, true);
+}
+
+inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+    metric_acquire(mrg, metric);
+    return metric;
+}
+
+inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
+    metric_release(mrg, metric, false);
+}
+
+inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+    return (Word_t)metric;
+}
+
+inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+    return &metric->uuid;
+}
+
+inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+    return metric->section;
+}
+
+inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+    internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+    if(unlikely(first_time_s < 0))
+        return false;
+
+    __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
+
+    return true;
+}
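+
+// Expand (never shrink) the known retention of a metric: first_time_s can
+// only move further into the past, latest_time_s_clean only further into
+// the future; update_every is refreshed when newer data arrives, or set
+// only while still unset otherwise.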
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) {
+    internal_fatal(first_time_s < 0 || last_time_s < 0,
+                   "DBENGINE METRIC: timestamp is negative");
+    internal_fatal(first_time_s > max_acceptable_collected_time(),
+                   "DBENGINE METRIC: metric first time is in the future");
+    internal_fatal(last_time_s > max_acceptable_collected_time(),
+                   "DBENGINE METRIC: metric last time is in the future");
+
+    if(first_time_s > 0)
+        set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current);
+
+    if(last_time_s > 0) {
+        if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) &&
+           update_every_s > 0)
+            // set the latest update every too
+            set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
+    }
+    else if(update_every_s > 0)
+        // set it only if it is invalid
+        set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
+}
+
+inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+    internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+    return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current);
+}
+
+inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    return mrg_metric_get_first_time_s_smart(mrg, metric);
+}
+
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) {
+    time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+    time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+    *last_time_s = MAX(clean, hot);
+    *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
+    if (update_every_s)
+        *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+}
+
+inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+    internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+//    internal_fatal(latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+//    internal_fatal(metric->latest_time_s_clean > latest_time_s,
+//                   "DBENGINE METRIC: metric new clean latest time is older than the previous one");
+
+    if(latest_time_s > 0) {
+        if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) {
+            set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current);
+
+            return true;
+        }
+    }
+
+    return false;
+}
+
+// returns true when metric still has retention
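+// Called when this metric's disk retention has gone away (e.g. datafiles
+// deleted): rescan the metric's hot/dirty pages in the main cache and
+// recompute first/last times from whatever is still in memory, retrying a
+// few times while a collector is concurrently flipping hot pages.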
+inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+    Word_t section = mrg_metric_section(mrg, metric);
+    bool do_again = false;
+    size_t countdown = 5;
+
+    do {
+        time_t min_first_time_s = LONG_MAX;
+        time_t max_end_time_s = 0;
+        PGC_PAGE *page;
+        PGC_SEARCH method = PGC_SEARCH_FIRST;
+        time_t page_first_time_s = 0;
+        time_t page_end_time_s = 0;
+        while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
+            method = PGC_SEARCH_NEXT;
+
+            bool is_hot = pgc_is_page_hot(page);
+            bool is_dirty = pgc_is_page_dirty(page);
+            page_first_time_s = pgc_page_start_time_s(page);
+            page_end_time_s = pgc_page_end_time_s(page);
+
+            if ((is_hot || is_dirty) && page_first_time_s > 0 && page_first_time_s < min_first_time_s)
+                min_first_time_s = page_first_time_s;
+
+            if (is_dirty && page_end_time_s > max_end_time_s)
+                max_end_time_s = page_end_time_s;
+
+            pgc_page_release(main_cache, page);
+        }
+
+        if (min_first_time_s == LONG_MAX)
+            min_first_time_s = 0;
+
+        if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED))
+            do_again = true;
+        else {
+            internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
+
+            do_again = false;
+            set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true);
+            set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true);
+        }
+    } while(do_again);
+
+    time_t first, last;
+    mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+    return (first && last && first < last);
+}
+
+inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+    internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
+
+//    internal_fatal(latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+    if(likely(latest_time_s > 0)) {
+        __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED);
+        return true;
+    }
+
+    return false;
+}
+
+inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+    return clean;
+}
+
+inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+    time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+    return MAX(clean, hot);
+}
+
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
+    if(update_every_s > 0)
+        return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
+
+    return false;
+}
+
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
+    if(update_every_s > 0)
+        return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
+
+    return false;
+}
+
+inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+}
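+
+// Writer registration: only one collector may write a metric at a time;
+// ownership is taken with a CAS on the writer field from 0 to the caller's
+// thread id, and failed attempts are counted as writer conflicts.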
+inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+    pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+    pid_t wanted = gettid();
+    bool done = true;
+
+    do {
+        if(expected != 0) {
+            done = false;
+            break;
+        }
+    } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+    if(done)
+        __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
+    else
+        __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
+
+    return done;
+}
+
+inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+    // this function can be called from a different thread than the writer
+
+    pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+    pid_t wanted = 0;
+    bool done = true;
+
+    do {
+        if(!expected) {
+            done = false;
+            break;
+        }
+    } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+    if(done)
+        __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
+
+    return done;
+}
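+
+// Startup path (journal v2 indexing): merge the retention and granularity
+// found on disk into the in-memory metric, sanitizing obviously wrong
+// on-disk timestamps, and keep the instance's samples/metrics counters in
+// sync with the estimated number of stored points.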
+inline void mrg_update_metric_retention_and_granularity_by_uuid(
+        MRG *mrg, Word_t section, uuid_t *uuid,
+        time_t first_time_s, time_t last_time_s,
+        uint32_t update_every_s, time_t now_s)
+{
+    if(unlikely(last_time_s > now_s)) {
+        nd_log_limit_static_global_var(erl, 1, 0);
+        nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+                     "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+                     "fixing last time to now",
+                     first_time_s, last_time_s, now_s);
+        last_time_s = now_s;
+    }
+
+    if (unlikely(first_time_s > last_time_s)) {
+        nd_log_limit_static_global_var(erl, 1, 0);
+        nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+                     "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+                     "fixing first time to last time",
+                     first_time_s, last_time_s, now_s);
+
+        first_time_s = last_time_s;
+    }
+
+    if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+        nd_log_limit_static_global_var(erl, 1, 0);
+        nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+                     "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+                     "using them as-is",
+                     first_time_s, last_time_s, now_s);
+    }
+
+    bool added = false;
+    METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
+    if (!metric) {
+        MRG_ENTRY entry = {
+            .uuid = uuid,
+            .section = section,
+            .first_time_s = first_time_s,
+            .last_time_s = last_time_s,
+            .latest_update_every_s = update_every_s
+        };
+        metric = mrg_metric_add_and_acquire(mrg, entry, &added);
+    }
+
+    struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+    if (likely(!added)) {
+        uint64_t old_samples = 0;
+
+        if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+            old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+        mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+
+        uint64_t new_samples = 0;
+        if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+            new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+        __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED);
+    }
+    else {
+        // Newly added
+        if (update_every_s) {
+            uint64_t samples = (last_time_s - first_time_s) / update_every_s;
+            __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+        }
+        __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
+    }
+
+    mrg_metric_release(mrg, metric);
+}
+
+inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
+    memset(s, 0, sizeof(struct mrg_statistics));
+
+    for(size_t i = 0; i < mrg->partitions ;i++) {
+        s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
+        s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
+        s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
+        s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
+        s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
+        s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED);
+        s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED);
+        s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED);
+        s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED);
+        s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED);
+        s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED);
+        s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED);
+        s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
+    }
+
+    s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions;
+}
+
+// ----------------------------------------------------------------------------
+// unit test
+
+struct mrg_stress_entry {
+    uuid_t uuid;
+    time_t after;
+    time_t before;
+};
+
+struct mrg_stress {
+    MRG *mrg;
+    bool stop;
+    size_t entries;
+    struct mrg_stress_entry *array;
+    size_t updates;
+};
+
+static void *mrg_stress(void *ptr) {
+    struct mrg_stress *t = ptr;
+    MRG *mrg = t->mrg;
+
+    ssize_t start = 0;
+    ssize_t end = (ssize_t)t->entries;
+    ssize_t step = 1;
+
+    if(gettid() % 2) {
+        start = (ssize_t)t->entries - 1;
+        end = -1;
+        step = -1;
+    }
+
+    while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
+        for (ssize_t i = start; i != end; i += step) {
+            struct mrg_stress_entry *e = &t->array[i];
+
+            time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
+            time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
+
+            mrg_update_metric_retention_and_granularity_by_uuid(
+                    mrg, 0x01,
+                    &e->uuid,
+                    after,
+                    before,
+                    1,
+                    before);
+
+            __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
+        }
+    }
+
+    return ptr;
+}
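+
+// The unit test first exercises add/find/duplicate/delete semantics on two
+// sections, then runs a timed multi-threaded stress test in which threads
+// walk the entries array in opposite directions (by tid parity) to maximize
+// contention on the same UUIDs.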
+int mrg_unittest(void) {
+    MRG *mrg = mrg_create(0);
+    METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
+    METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
+    bool ret;
+
+    uuid_t test_uuid;
+    uuid_generate(test_uuid);
+    MRG_ENTRY entry = {
+        .uuid = &test_uuid,
+        .section = 0,
+        .first_time_s = 2,
+        .last_time_s = 3,
+        .latest_update_every_s = 4,
+    };
+    m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(!ret)
+        fatal("DBENGINE METRIC: failed to add metric");
+
+    // add the same metric again
+    m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m2_t0 != m1_t0)
+        fatal("DBENGINE METRIC: adding the same metric twice does not return the same pointer");
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+    m3_t0 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+    if(m3_t0 != m1_t0)
+        fatal("DBENGINE METRIC: cannot find the metric added");
+
+    // add the same metric again
+    m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m4_t0 != m1_t0)
+        fatal("DBENGINE METRIC: adding the same metric twice does not return the same pointer");
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+    // add the same metric in another section
+    entry.section = 1;
+    m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(!ret)
+        fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
+
+    // add the same metric again
+    m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m2_t1 != m1_t1)
+        fatal("DBENGINE METRIC: adding the same metric twice (section %zu) does not return the same pointer", (size_t)entry.section);
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice (section %zu)", (size_t)entry.section);
+
+    m3_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+    if(m3_t1 != m1_t1)
+        fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
+
+    // delete the first metric
+    mrg_metric_release(mrg, m2_t0);
+    mrg_metric_release(mrg, m3_t0);
+    mrg_metric_release(mrg, m4_t0);
+    mrg_metric_set_first_time_s(mrg, m1_t0, 0);
+    mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
+    mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
+    if(!mrg_metric_release_and_delete(mrg, m1_t0))
+        fatal("DBENGINE METRIC: cannot delete the first metric");
+
+    m4_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section);
+    if(m4_t1 != m1_t1)
+        fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
+
+    // delete the second metric
+    mrg_metric_release(mrg, m2_t1);
+    mrg_metric_release(mrg, m3_t1);
+    mrg_metric_release(mrg, m4_t1);
+    mrg_metric_set_first_time_s(mrg, m1_t1, 0);
+    mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
+    mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
+    if(!mrg_metric_release_and_delete(mrg, m1_t1))
+        fatal("DBENGINE METRIC: cannot delete the second metric");
+
+    struct mrg_statistics s;
+    mrg_get_statistics(mrg, &s);
+    if(s.entries != 0)
+        fatal("DBENGINE METRIC: invalid entries counter");
+
+    size_t entries = 1000000;
+    size_t threads = mrg->partitions / 3 + 1;
+    size_t tiers = 3;
+    size_t run_for_secs = 5;
+    netdata_log_info("preparing stress test of %zu entries...", entries);
+    struct mrg_stress t = {
+        .mrg = mrg,
+        .entries = entries,
+        .array = callocz(entries, sizeof(struct mrg_stress_entry)),
+    };
+
+    time_t now = max_acceptable_collected_time();
+    for(size_t i = 0; i < entries ;i++) {
+        uuid_generate_random(t.array[i].uuid);
+        t.array[i].after = now / 3;
+        t.array[i].before = now / 2;
+    }
+
+    netdata_log_info("stress test is populating MRG with 3 tiers...");
+    for(size_t i = 0; i < entries ;i++) {
+        struct mrg_stress_entry *e = &t.array[i];
+        for(size_t tier = 1; tier <= tiers ;tier++) {
+            mrg_update_metric_retention_and_granularity_by_uuid(
+                    mrg, tier,
+                    &e->uuid,
+                    e->after,
+                    e->before,
+                    1,
+                    e->before);
+        }
+    }
+    netdata_log_info("stress test ready to run...");
+
+    usec_t started_ut = now_monotonic_usec();
+
+    pthread_t th[threads];
+    for(size_t i = 0; i < threads ; i++) {
+        char buf[15 + 1];
+        snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i);
+        netdata_thread_create(&th[i], buf,
+                              NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+                              mrg_stress, &t);
+    }
+
+    sleep_usec(run_for_secs * USEC_PER_SEC);
+    __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED);
+
+    for(size_t i = 0; i < threads ; i++)
+        netdata_thread_cancel(th[i]);
+
+    for(size_t i = 0; i < threads ; i++)
+        netdata_thread_join(th[i], NULL);
+
+    usec_t ended_ut = now_monotonic_usec();
+
+    struct mrg_statistics stats;
+    mrg_get_statistics(mrg, &stats);
+
+    netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
+                     "%zu deletions, %zu wrong deletions, "
+                     "%zu successful searches, %zu wrong searches, "
+                     "in %"PRIu64" usecs",
+                     stats.additions, stats.additions_duplicate,
+                     stats.deletions, stats.delete_misses,
+                     stats.search_hits, stats.search_misses,
+                     ended_ut - started_ut);
+
+    netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
+                     (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0,
+                     (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads);
+
+    mrg_destroy(mrg);
+
+    netdata_log_info("DBENGINE METRIC: all tests passed!");
+
+    return 0;
+}