diff options
Diffstat (limited to 'database/engine/metric.c')
-rw-r--r-- | database/engine/metric.c | 873 |
1 files changed, 0 insertions, 873 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c deleted file mode 100644 index 2e132612e..000000000 --- a/database/engine/metric.c +++ /dev/null @@ -1,873 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include "metric.h" - -typedef int32_t REFCOUNT; -#define REFCOUNT_DELETING (-100) - -struct metric { - uuid_t uuid; // never changes - Word_t section; // never changes - - time_t first_time_s; // the timestamp of the oldest point in the database - time_t latest_time_s_clean; // the timestamp of the newest point in the database - time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored) - uint32_t latest_update_every_s; // the latest data collection frequency - pid_t writer; - uint8_t partition; - REFCOUNT refcount; - - // THIS IS allocated with malloc() - // YOU HAVE TO INITIALIZE IT YOURSELF ! -}; - -#define set_metric_field_with_condition(field, value, condition) ({ \ - typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \ - typeof(field) _wanted = value; \ - bool did_it = true; \ - \ - do { \ - if((condition) && (_current != _wanted)) { \ - ; \ - } \ - else { \ - did_it = false; \ - break; \ - } \ - } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \ - false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \ - \ - did_it; \ -}) - -static struct aral_statistics mrg_aral_statistics; - -struct mrg { - size_t partitions; - - struct mrg_partition { - ARAL *aral; // not protected by our spinlock - it has its own - - RW_SPINLOCK rw_spinlock; - Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers) - - struct mrg_statistics stats; - } index[]; -}; - -static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) { - mrg->index[partition].stats.additions_duplicate++; -} - -static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) { - mrg->index[partition].stats.entries++; - mrg->index[partition].stats.additions++; - mrg->index[partition].stats.size += sizeof(METRIC); -} - -static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) { - mrg->index[partition].stats.entries--; - mrg->index[partition].stats.size -= sizeof(METRIC); - mrg->index[partition].stats.deletions++; -} - -static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) { - __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED); -} - -static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) { - __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED); -} - -static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) { - mrg->index[partition].stats.delete_misses++; -} - -#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock) -#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock) -#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock) -#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock) - -static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) { - if(mem_after_judyl > mem_before_judyl) - __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); - else if(mem_after_judyl < mem_before_judyl) - __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED); -} - -static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) { - __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); -} - -static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) { - __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); -} - -static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { - uint8_t *u = (uint8_t *)uuid; - size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)]; - return *n % mrg->partitions; -} - -static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) { - time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED); - - if(first_time_s <= 0) { - first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); - if(first_time_s <= 0) - first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - - if(first_time_s <= 0) - first_time_s = 0; - else - __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); - } - - return first_time_s; -} - -static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) { - size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; - - do { - if(expected < 0) - fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); - - refcount = expected + 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); - - if(refcount == 1) - __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - - __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - - return refcount; -} - -static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { - size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; - - do { - if(expected <= 0) - fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); - - refcount = expected - 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); - - if(unlikely(!refcount)) - __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - - __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); - return (!first || !last || first > last); -} - -static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { - size_t partition = uuid_partition(mrg, entry->uuid); - - METRIC *allocation = aral_mallocz(mrg->index[partition].aral); - - mrg_index_write_lock(mrg, partition); - - size_t mem_before_judyl, mem_after_judyl; - - Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR)) - fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); - - if(unlikely(!*sections_judy_pptr)) - mrg_stats_size_judyhs_added_uuid(mrg, partition); - - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); - - if(unlikely(!PValue || PValue == PJERR)) - fatal("DBENGINE METRIC: corrupted section JudyL array"); - - if(unlikely(*PValue != NULL)) { - METRIC *metric = *PValue; - - metric_acquire(mrg, metric); - - MRG_STATS_DUPLICATE_ADD(mrg, partition); - - mrg_index_write_unlock(mrg, partition); - - if(ret) - *ret = false; - - aral_freez(mrg->index[partition].aral, allocation); - - return metric; - } - - METRIC *metric = allocation; - uuid_copy(metric->uuid, *entry->uuid); - metric->section = entry->section; - metric->first_time_s = MAX(0, entry->first_time_s); - metric->latest_time_s_clean = MAX(0, entry->last_time_s); - metric->latest_time_s_hot = 0; - metric->latest_update_every_s = entry->latest_update_every_s; - metric->writer = 0; - metric->refcount = 0; - metric->partition = partition; - metric_acquire(mrg, metric); - *PValue = metric; - - MRG_STATS_ADDED_METRIC(mrg, partition); - - mrg_index_write_unlock(mrg, partition); - - if(ret) - *ret = true; - - return metric; -} - -static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { - size_t partition = uuid_partition(mrg, uuid); - - mrg_index_read_lock(mrg, partition); - - Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); - if(unlikely(!PValue)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - METRIC *metric = *PValue; - - metric_acquire(mrg, metric); - - mrg_index_read_unlock(mrg, partition); - - MRG_STATS_SEARCH_HIT(mrg, partition); - return metric; -} - -static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) { - size_t partition = metric->partition; - - size_t mem_before_judyl, mem_after_judyl; - - mrg_index_write_lock(mrg, partition); - - if(!metric_release_and_can_be_deleted(mrg, metric)) { - mrg->index[partition].stats.delete_having_retention_or_referenced++; - mrg_index_write_unlock(mrg, partition); - return false; - } - - Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } - - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); - - if(unlikely(!rc)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } - - if(!*sections_judy_pptr) { - rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!rc)) - fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); - mrg_stats_size_judyhs_removed_uuid(mrg, partition); - } - - MRG_STATS_DELETED_METRIC(mrg, partition); - - mrg_index_write_unlock(mrg, partition); - - aral_freez(mrg->index[partition].aral, metric); - - return true; -} - -// ---------------------------------------------------------------------------- -// public API - -inline MRG *mrg_create(ssize_t partitions) { - if(partitions < 1) - partitions = get_netdata_cpus(); - - MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions); - mrg->partitions = partitions; - - for(size_t i = 0; i < mrg->partitions ; i++) { - rw_spinlock_init(&mrg->index[i].rw_spinlock); - - char buf[ARAL_MAX_NAME + 1]; - snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i); - - mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false); - } - - return mrg; -} - -inline size_t mrg_aral_structures(void) { - return aral_structures_from_stats(&mrg_aral_statistics); -} - -inline size_t mrg_aral_overhead(void) { - return aral_overhead_from_stats(&mrg_aral_statistics); -} - -inline void mrg_destroy(MRG *mrg __maybe_unused) { - // no destruction possible - // we can't traverse the metrics list - - // to delete entries, the caller needs to keep pointers to them - // and delete them one by one - - ; -} - -inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { -// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(), -// "DBENGINE METRIC: metric latest time is in the future"); - - return metric_add_and_acquire(mrg, &entry, ret); -} - -inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { - return metric_get_and_acquire(mrg, uuid, section); -} - -inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { - return acquired_metric_del(mrg, metric); -} - -inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { - metric_acquire(mrg, metric); - return metric; -} - -inline bool mrg_metric_release(MRG *mrg, METRIC *metric) { - return metric_release_and_can_be_deleted(mrg, metric); -} - -inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { - return (Word_t)metric; -} - -inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) { - return &metric->uuid; -} - -inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { - return metric->section; -} - -inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { - internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - - if(unlikely(first_time_s < 0)) - return false; - - __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); - - return true; -} - -inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { - internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0, - "DBENGINE METRIC: timestamp is negative"); - internal_fatal(first_time_s > max_acceptable_collected_time(), - "DBENGINE METRIC: metric first time is in the future"); - internal_fatal(last_time_s > max_acceptable_collected_time(), - "DBENGINE METRIC: metric last time is in the future"); - - if(first_time_s > 0) - set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current); - - if(last_time_s > 0) { - if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) && - update_every_s > 0) - // set the latest update every too - set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); - } - else if(update_every_s > 0) - // set it only if it is invalid - set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); -} - -inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { - internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current); -} - -inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - return mrg_metric_get_first_time_s_smart(mrg, metric); -} - -inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { - time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); - time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - - *last_time_s = MAX(clean, hot); - *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric); - *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); -} - -inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { - internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - -// internal_fatal(latest_time_s > max_acceptable_collected_time(), -// "DBENGINE METRIC: metric latest time is in the future"); - -// internal_fatal(metric->latest_time_s_clean > latest_time_s, -// "DBENGINE METRIC: metric new clean latest time is older than the previous one"); - - if(latest_time_s > 0) { - if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) { - set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current); - - return true; - } - } - - return false; -} - -// returns true when metric still has retention -inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { - Word_t section = mrg_metric_section(mrg, metric); - bool do_again = false; - size_t countdown = 5; - - do { - time_t min_first_time_s = LONG_MAX; - time_t max_end_time_s = 0; - PGC_PAGE *page; - PGC_SEARCH method = PGC_SEARCH_FIRST; - time_t page_first_time_s = 0; - time_t page_end_time_s = 0; - while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) { - method = PGC_SEARCH_NEXT; - - bool is_hot = pgc_is_page_hot(page); - bool is_dirty = pgc_is_page_dirty(page); - page_first_time_s = pgc_page_start_time_s(page); - page_end_time_s = pgc_page_end_time_s(page); - - if ((is_hot || is_dirty) && page_first_time_s > 0 && page_first_time_s < min_first_time_s) - min_first_time_s = page_first_time_s; - - if (is_dirty && page_end_time_s > max_end_time_s) - max_end_time_s = page_end_time_s; - - pgc_page_release(main_cache, page); - } - - if (min_first_time_s == LONG_MAX) - min_first_time_s = 0; - - if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED)) - do_again = true; - else { - internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention"); - - do_again = false; - set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true); - set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true); - } - } while(do_again); - - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); - return (first && last && first < last); -} - -inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { - internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - -// internal_fatal(latest_time_s > max_acceptable_collected_time(), -// "DBENGINE METRIC: metric latest time is in the future"); - - if(likely(latest_time_s > 0)) { - __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED); - return true; - } - - return false; -} - -inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); - time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - - return MAX(clean, hot); -} - -inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - - if(update_every_s > 0) - return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); - - return false; -} - -inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - - if(update_every_s > 0) - return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); - - return false; -} - -inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { - return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); -} - -inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { - pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); - pid_t wanted = gettid(); - bool done = true; - - do { - if(expected != 0) { - done = false; - break; - } - } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); - - if(done) - __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - else - __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED); - - return done; -} - -inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { - // this function can be called from a different thread than the one than the writer - - pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); - pid_t wanted = 0; - bool done = true; - - do { - if(!expected) { - done = false; - break; - } - } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); - - if(done) - __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - - return done; -} - -inline void mrg_update_metric_retention_and_granularity_by_uuid( - MRG *mrg, Word_t section, uuid_t *uuid, - time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s) -{ - if(unlikely(last_time_s > now_s)) { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, - "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " - "fixing last time to now", - first_time_s, last_time_s, now_s); - last_time_s = now_s; - } - - if (unlikely(first_time_s > last_time_s)) { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, - "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " - "fixing first time to last time", - first_time_s, last_time_s, now_s); - - first_time_s = last_time_s; - } - - if (unlikely(first_time_s == 0 || last_time_s == 0)) { - nd_log_limit_static_global_var(erl, 1, 0); - nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, - "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " - "using them as-is", - first_time_s, last_time_s, now_s); - } - - bool added = false; - METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section); - if (!metric) { - MRG_ENTRY entry = { - .uuid = uuid, - .section = section, - .first_time_s = first_time_s, - .last_time_s = last_time_s, - .latest_update_every_s = (uint32_t) update_every_s - }; - metric = mrg_metric_add_and_acquire(mrg, entry, &added); - } - - if (likely(!added)) - mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s); - - mrg_metric_release(mrg, metric); -} - -inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { - memset(s, 0, sizeof(struct mrg_statistics)); - - for(size_t i = 0; i < mrg->partitions ;i++) { - s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); - s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); - s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); - s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); - s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); - s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED); - s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED); - s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED); - s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED); - s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED); - s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED); - s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED); - s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED); - } - - s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions; -} - -// ---------------------------------------------------------------------------- -// unit test - -struct mrg_stress_entry { - uuid_t uuid; - time_t after; - time_t before; -}; - -struct mrg_stress { - MRG *mrg; - bool stop; - size_t entries; - struct mrg_stress_entry *array; - size_t updates; -}; - -static void *mrg_stress(void *ptr) { - struct mrg_stress *t = ptr; - MRG *mrg = t->mrg; - - ssize_t start = 0; - ssize_t end = (ssize_t)t->entries; - ssize_t step = 1; - - if(gettid() % 2) { - start = (ssize_t)t->entries - 1; - end = -1; - step = -1; - } - - while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) { - for (ssize_t i = start; i != end; i += step) { - struct mrg_stress_entry *e = &t->array[i]; - - time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED); - time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED); - - mrg_update_metric_retention_and_granularity_by_uuid( - mrg, 0x01, - &e->uuid, - after, - before, - 1, - before); - - __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED); - } - } - - return ptr; -} - -int mrg_unittest(void) { - MRG *mrg = mrg_create(0); - METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0; - METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1; - bool ret; - - uuid_t test_uuid; - uuid_generate(test_uuid); - MRG_ENTRY entry = { - .uuid = &test_uuid, - .section = 0, - .first_time_s = 2, - .last_time_s = 3, - .latest_update_every_s = 4, - }; - m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); - if(!ret) - fatal("DBENGINE METRIC: failed to add metric"); - - // add the same metric again - m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); - if(m2_t0 != m1_t0) - fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer"); - if(ret) - fatal("DBENGINE METRIC: managed to add the same metric twice"); - - m3_t0 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section); - if(m3_t0 != m1_t0) - fatal("DBENGINE METRIC: cannot find the metric added"); - - // add the same metric again - m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret); - if(m4_t0 != m1_t0) - fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer"); - if(ret) - fatal("DBENGINE METRIC: managed to add the same metric twice"); - - // add the same metric in another section - entry.section = 1; - m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret); - if(!ret) - fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section); - - // add the same metric again - m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret); - if(m2_t1 != m1_t1) - fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section); - if(ret) - fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)"); - - m3_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section); - if(m3_t1 != m1_t1) - fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section); - - // delete the first metric - mrg_metric_release(mrg, m2_t0); - mrg_metric_release(mrg, m3_t0); - mrg_metric_release(mrg, m4_t0); - mrg_metric_set_first_time_s(mrg, m1_t0, 0); - mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0); - mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0); - if(!mrg_metric_release_and_delete(mrg, m1_t0)) - fatal("DBENGINE METRIC: cannot delete the first metric"); - - m4_t1 = mrg_metric_get_and_acquire(mrg, entry.uuid, entry.section); - if(m4_t1 != m1_t1) - fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section); - - // delete the second metric - mrg_metric_release(mrg, m2_t1); - mrg_metric_release(mrg, m3_t1); - mrg_metric_release(mrg, m4_t1); - mrg_metric_set_first_time_s(mrg, m1_t1, 0); - mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0); - mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0); - if(!mrg_metric_release_and_delete(mrg, m1_t1)) - fatal("DBENGINE METRIC: cannot delete the second metric"); - - struct mrg_statistics s; - mrg_get_statistics(mrg, &s); - if(s.entries != 0) - fatal("DBENGINE METRIC: invalid entries counter"); - - size_t entries = 1000000; - size_t threads = mrg->partitions / 3 + 1; - size_t tiers = 3; - size_t run_for_secs = 5; - netdata_log_info("preparing stress test of %zu entries...", entries); - struct mrg_stress t = { - .mrg = mrg, - .entries = entries, - .array = callocz(entries, sizeof(struct mrg_stress_entry)), - }; - - time_t now = max_acceptable_collected_time(); - for(size_t i = 0; i < entries ;i++) { - uuid_generate_random(t.array[i].uuid); - t.array[i].after = now / 3; - t.array[i].before = now / 2; - } - netdata_log_info("stress test is populating MRG with 3 tiers..."); - for(size_t i = 0; i < entries ;i++) { - struct mrg_stress_entry *e = &t.array[i]; - for(size_t tier = 1; tier <= tiers ;tier++) { - mrg_update_metric_retention_and_granularity_by_uuid( - mrg, tier, - &e->uuid, - e->after, - e->before, - 1, - e->before); - } - } - netdata_log_info("stress test ready to run..."); - - usec_t started_ut = now_monotonic_usec(); - - pthread_t th[threads]; - for(size_t i = 0; i < threads ; i++) { - char buf[15 + 1]; - snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i); - netdata_thread_create(&th[i], buf, - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress, &t); - } - - sleep_usec(run_for_secs * USEC_PER_SEC); - __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED); - - for(size_t i = 0; i < threads ; i++) - netdata_thread_cancel(th[i]); - - for(size_t i = 0; i < threads ; i++) - netdata_thread_join(th[i], NULL); - - usec_t ended_ut = now_monotonic_usec(); - - struct mrg_statistics stats; - mrg_get_statistics(mrg, &stats); - - netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, " - "%zu deletions, %zu wrong deletions, " - "%zu successful searches, %zu wrong searches, " - "in %"PRIu64" usecs", - stats.additions, stats.additions_duplicate, - stats.deletions, stats.delete_misses, - stats.search_hits, stats.search_misses, - ended_ut - started_ut); - - netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread", - (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0, - (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads); - - mrg_destroy(mrg); - - netdata_log_info("DBENGINE METRIC: all tests passed!"); - - return 0; -} |