Diffstat (limited to 'database/engine/metric.c')
-rw-r--r-- | database/engine/metric.c | 875 |
1 files changed, 875 insertions, 0 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c
new file mode 100644
index 000000000..9dc9d9ebc
--- /dev/null
+++ b/database/engine/metric.c
@@ -0,0 +1,875 @@
+#include "metric.h"
+
+typedef int32_t REFCOUNT;
+#define REFCOUNT_DELETING (-100)
+
+typedef enum __attribute__ ((__packed__)) {
+    METRIC_FLAG_HAS_RETENTION = (1 << 0),
+} METRIC_FLAGS;
+
+struct metric {
+    uuid_t uuid;                    // never changes
+    Word_t section;                 // never changes
+
+    time_t first_time_s;            //
+    time_t latest_time_s_clean;     // archived pages latest time
+    time_t latest_time_s_hot;       // latest time of the currently collected page
+    uint32_t latest_update_every_s; //
+    pid_t writer;
+    METRIC_FLAGS flags;
+    REFCOUNT refcount;
+    SPINLOCK spinlock;              // protects all variable members
+
+    // THIS IS allocated with malloc()
+    // YOU HAVE TO INITIALIZE IT YOURSELF !
+};
+
+static struct aral_statistics mrg_aral_statistics;
+
+struct mrg {
+    ARAL *aral[MRG_PARTITIONS];
+
+    struct pgc_index {
+        netdata_rwlock_t rwlock;
+        Pvoid_t uuid_judy;          // each UUID has a JudyL of sections (tiers)
+    } index[MRG_PARTITIONS];
+
+    struct mrg_statistics stats;
+
+    size_t entries_per_partition[MRG_PARTITIONS];
+};
+
+static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) {
+    __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
+    __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
+    __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED);
+    __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
+
+    __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
+    __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
+    __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
+    __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED);
+
+    __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) {
+    __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) {
+    __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED);
+}
+
+static inline void MRG_STATS_DELETE_MISS(MRG *mrg) {
+    __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED);
+}
+
+static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
+    netdata_rwlock_rdlock(&mrg->index[partition].rwlock);
+}
+static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
+    netdata_rwlock_unlock(&mrg->index[partition].rwlock);
+}
+static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
+    netdata_rwlock_wrlock(&mrg->index[partition].rwlock);
+}
+static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
+    netdata_rwlock_unlock(&mrg->index[partition].rwlock);
+}
+
+static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) {
+    if(mem_after_judyl > mem_before_judyl)
+        __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+    else if(mem_after_judyl < mem_before_judyl)
+        __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) {
+    __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) {
+    __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+}
+
+static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
+    uint8_t *u = (uint8_t *)uuid;
+    return u[UUID_SZ - 1] % MRG_PARTITIONS;
+}
+
+static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
+    bool has_retention = (metric->first_time_s || metric->latest_time_s_clean || metric->latest_time_s_hot);
+
+    if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
+        metric->flags |= METRIC_FLAG_HAS_RETENTION;
+        __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+    }
+    else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
+        metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
+        __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+    }
+
+    return has_retention;
+}
+
+static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+    REFCOUNT refcount;
+
+    if(!having_spinlock)
+        netdata_spinlock_lock(&metric->spinlock);
+
+    if(unlikely(metric->refcount < 0))
+        fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
+
+    refcount = ++metric->refcount;
+
+    // update its retention flags
+    metric_has_retention_unsafe(mrg, metric);
+
+    if(!having_spinlock)
+        netdata_spinlock_unlock(&metric->spinlock);
+
+    if(refcount == 1)
+        __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+    __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+
+    return refcount;
+}
+
+static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
+    bool ret = true;
+    REFCOUNT refcount;
+
+    netdata_spinlock_lock(&metric->spinlock);
+
+    if(unlikely(metric->refcount <= 0))
+        fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
+
+    refcount = --metric->refcount;
+
+    if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
+        ret = false;
+
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    if(unlikely(!refcount))
+        __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+
+    __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+
+    return ret;
+}
+
+static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+    size_t partition = uuid_partition(mrg, &entry->uuid);
+
+    METRIC *allocation = aral_mallocz(mrg->aral[partition]);
+
+    mrg_index_write_lock(mrg, partition);
+
+    size_t mem_before_judyl, mem_after_judyl;
+
+    Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, &entry->uuid, sizeof(uuid_t), PJE0);
+    if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+        fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+
+    if(unlikely(!*sections_judy_pptr))
+        mrg_stats_size_judyhs_added_uuid(mrg);
+
+    mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+    Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+    mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+    mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+
+    if(unlikely(!PValue || PValue == PJERR))
+        fatal("DBENGINE METRIC: corrupted section JudyL array");
+
+    if(unlikely(*PValue != NULL)) {
+        METRIC *metric = *PValue;
+
+        metric_acquire(mrg, metric, false);
+        mrg_index_write_unlock(mrg, partition);
+
+        if(ret)
+            *ret = false;
+
+        aral_freez(mrg->aral[partition], allocation);
+
+        MRG_STATS_DUPLICATE_ADD(mrg);
+        return metric;
+    }
+
+    METRIC *metric = allocation;
+    uuid_copy(metric->uuid, entry->uuid);
+    metric->section = entry->section;
+    metric->first_time_s = entry->first_time_s;
+    metric->latest_time_s_clean = entry->last_time_s;
+    metric->latest_time_s_hot = 0;
+    metric->latest_update_every_s = entry->latest_update_every_s;
+    metric->writer = 0;
+    metric->refcount = 0;
+    metric->flags = 0;
+    netdata_spinlock_init(&metric->spinlock);
+    metric_acquire(mrg, metric, true); // no spinlock use required here
+    *PValue = metric;
+
+    mrg_index_write_unlock(mrg, partition);
+
+    if(ret)
+        *ret = true;
+
+    MRG_STATS_ADDED_METRIC(mrg, partition);
+
+    return metric;
+}
+
+static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+    size_t partition = uuid_partition(mrg, uuid);
+
+    mrg_index_read_lock(mrg, partition);
+
+    Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+    if(unlikely(!sections_judy_pptr)) {
+        mrg_index_read_unlock(mrg, partition);
+        MRG_STATS_SEARCH_MISS(mrg);
+        return NULL;
+    }
+
+    Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+    if(unlikely(!PValue)) {
+        mrg_index_read_unlock(mrg, partition);
+        MRG_STATS_SEARCH_MISS(mrg);
+        return NULL;
+    }
+
+    METRIC *metric = *PValue;
+
+    metric_acquire(mrg, metric, false);
+
+    mrg_index_read_unlock(mrg, partition);
+
+    MRG_STATS_SEARCH_HIT(mrg);
+    return metric;
+}
+
+static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
+    size_t partition = uuid_partition(mrg, &metric->uuid);
+
+    size_t mem_before_judyl, mem_after_judyl;
+
+    mrg_index_write_lock(mrg, partition);
+
+    if(!metric_release_and_can_be_deleted(mrg, metric)) {
+        mrg_index_write_unlock(mrg, partition);
+        __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED);
+        return false;
+    }
+
+    Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+    if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+        mrg_index_write_unlock(mrg, partition);
+        MRG_STATS_DELETE_MISS(mrg);
+        return false;
+    }
+
+    mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+    int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+    mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+    mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+
+    if(unlikely(!rc)) {
+        mrg_index_write_unlock(mrg, partition);
+        MRG_STATS_DELETE_MISS(mrg);
+        return false;
+    }
+
+    if(!*sections_judy_pptr) {
+        rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+        if(unlikely(!rc))
+            fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+        mrg_stats_size_judyhs_removed_uuid(mrg);
+    }
+
+    mrg_index_write_unlock(mrg, partition);
+
+    aral_freez(mrg->aral[partition], metric);
+
+    MRG_STATS_DELETED_METRIC(mrg, partition);
+
+    return true;
+}
+
+// ----------------------------------------------------------------------------
+// public API
+
+MRG *mrg_create(void) {
+    MRG *mrg = callocz(1, sizeof(MRG));
+
+    for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
+        netdata_rwlock_init(&mrg->index[i].rwlock);
+
+        char buf[ARAL_MAX_NAME + 1];
+        snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
+
+        mrg->aral[i] = aral_create(buf,
+                                   sizeof(METRIC),
+                                   0,
+                                   16384,
+                                   &mrg_aral_statistics,
+                                   NULL, NULL, false,
+                                   false);
+    }
+
+    mrg->stats.size = sizeof(MRG);
+
+    return mrg;
+}
+
+size_t mrg_aral_structures(void) {
+    return aral_structures_from_stats(&mrg_aral_statistics);
+}
+
+size_t mrg_aral_overhead(void) {
+    return aral_overhead_from_stats(&mrg_aral_statistics);
+}
+
+void mrg_destroy(MRG *mrg __maybe_unused) {
+    // no destruction possible
+    // we can't traverse the metrics list
+
+    // to delete entries, the caller needs to keep pointers to them
+    // and delete them one by one
+
+    ;
+}
+
+METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+//    internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+    return metric_add_and_acquire(mrg, &entry, ret);
+}
+
+METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+    return metric_get_and_acquire(mrg, uuid, section);
+}
+
+bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+    return acquired_metric_del(mrg, metric);
+}
+
+METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+    metric_acquire(mrg, metric, false);
+    return metric;
+}
+
+bool mrg_metric_release(MRG *mrg, METRIC *metric) {
+    return metric_release_and_can_be_deleted(mrg, metric);
+}
+
+Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+    return (Word_t)metric;
+}
+
+uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+    return &metric->uuid;
+}
+
+Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+    return metric->section;
+}
+
+bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+    netdata_spinlock_lock(&metric->spinlock);
+    metric->first_time_s = first_time_s;
+    metric_has_retention_unsafe(mrg, metric);
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return true;
+}
+
+void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
+
+    internal_fatal(first_time_s > max_acceptable_collected_time(),
+                   "DBENGINE METRIC: metric first time is in the future");
+    internal_fatal(last_time_s > max_acceptable_collected_time(),
+                   "DBENGINE METRIC: metric last time is in the future");
+
+    netdata_spinlock_lock(&metric->spinlock);
+
+    if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
+        metric->first_time_s = first_time_s;
+
+    if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) {
+        metric->latest_time_s_clean = last_time_s;
+
+        if(likely(update_every_s))
+            metric->latest_update_every_s = update_every_s;
+    }
+    else if(unlikely(!metric->latest_update_every_s && update_every_s))
+        metric->latest_update_every_s = update_every_s;
+
+    metric_has_retention_unsafe(mrg, metric);
+    netdata_spinlock_unlock(&metric->spinlock);
+}
+
+bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+    bool ret = false;
+
+    netdata_spinlock_lock(&metric->spinlock);
+    if(first_time_s > metric->first_time_s) {
+        metric->first_time_s = first_time_s;
+        ret = true;
+    }
+    metric_has_retention_unsafe(mrg, metric);
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return ret;
+}
+
+time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t first_time_s;
+
+    netdata_spinlock_lock(&metric->spinlock);
+
+    if(unlikely(!metric->first_time_s)) {
+        if(metric->latest_time_s_clean)
+            metric->first_time_s = metric->latest_time_s_clean;
+
+        else if(metric->latest_time_s_hot)
+            metric->first_time_s = metric->latest_time_s_hot;
+    }
+
+    first_time_s = metric->first_time_s;
+
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return first_time_s;
+}
+
+void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+    netdata_spinlock_lock(&metric->spinlock);
+
+    if(unlikely(!metric->first_time_s)) {
+        if(metric->latest_time_s_clean)
+            metric->first_time_s = metric->latest_time_s_clean;
+
+        else if(metric->latest_time_s_hot)
+            metric->first_time_s = metric->latest_time_s_hot;
+    }
+
+    *first_time_s = metric->first_time_s;
+    *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
+    *update_every_s = metric->latest_update_every_s;
+
+    netdata_spinlock_unlock(&metric->spinlock);
+}
+
+bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+    netdata_spinlock_lock(&metric->spinlock);
+
+//    internal_fatal(latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+//    internal_fatal(metric->latest_time_s_clean > latest_time_s,
+//                   "DBENGINE METRIC: metric new clean latest time is older than the previous one");
+
+    metric->latest_time_s_clean = latest_time_s;
+
+    if(unlikely(!metric->first_time_s))
+        metric->first_time_s = latest_time_s;
+
+//    if(unlikely(metric->first_time_s > latest_time_s))
+//        metric->first_time_s = latest_time_s;
+
+    metric_has_retention_unsafe(mrg, metric);
+    netdata_spinlock_unlock(&metric->spinlock);
+    return true;
+}
+
+// returns true when metric still has retention
+bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+    Word_t section = mrg_metric_section(mrg, metric);
+    bool do_again = false;
+    size_t countdown = 5;
+    bool ret = true;
+
+    do {
+        time_t min_first_time_s = LONG_MAX;
+        time_t max_end_time_s = 0;
+        PGC_PAGE *page;
+        PGC_SEARCH method = PGC_SEARCH_FIRST;
+        time_t page_first_time_s = 0;
+        time_t page_end_time_s = 0;
+        while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
+            method = PGC_SEARCH_NEXT;
+
+            bool is_hot = pgc_is_page_hot(page);
+            bool is_dirty = pgc_is_page_dirty(page);
+            page_first_time_s = pgc_page_start_time_s(page);
+            page_end_time_s = pgc_page_end_time_s(page);
+
+            if ((is_hot || is_dirty) && page_first_time_s < min_first_time_s)
+                min_first_time_s = page_first_time_s;
+
+            if (is_dirty && page_end_time_s > max_end_time_s)
+                max_end_time_s = page_end_time_s;
+
+            pgc_page_release(main_cache, page);
+        }
+
+        if (min_first_time_s == LONG_MAX)
+            min_first_time_s = 0;
+
+        netdata_spinlock_lock(&metric->spinlock);
+        if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
+            do_again = true;
+        else {
+            internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
+
+            do_again = false;
+            metric->first_time_s = min_first_time_s;
+            metric->latest_time_s_clean = max_end_time_s;
+
+            ret = metric_has_retention_unsafe(mrg, metric);
+        }
+        netdata_spinlock_unlock(&metric->spinlock);
+    } while(do_again);
+
+    return ret;
+}
+
+bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+//    internal_fatal(latest_time_s > max_acceptable_collected_time(),
+//                   "DBENGINE METRIC: metric latest time is in the future");
+
+    netdata_spinlock_lock(&metric->spinlock);
+    metric->latest_time_s_hot = latest_time_s;
+
+    if(unlikely(!metric->first_time_s))
+        metric->first_time_s = latest_time_s;
+
+//    if(unlikely(metric->first_time_s > latest_time_s))
+//        metric->first_time_s = latest_time_s;
+
+    metric_has_retention_unsafe(mrg, metric);
+    netdata_spinlock_unlock(&metric->spinlock);
+    return true;
+}
+
+time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t max;
+    netdata_spinlock_lock(&metric->spinlock);
+    max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
+    netdata_spinlock_unlock(&metric->spinlock);
+    return max;
+}
+
+bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+    if(!update_every_s)
+        return false;
+
+    netdata_spinlock_lock(&metric->spinlock);
+    metric->latest_update_every_s = update_every_s;
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return true;
+}
+
+bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+    if(!update_every_s)
+        return false;
+
+    netdata_spinlock_lock(&metric->spinlock);
+    if(!metric->latest_update_every_s)
+        metric->latest_update_every_s = update_every_s;
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return true;
+}
+
+time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+    time_t update_every_s;
+
+    netdata_spinlock_lock(&metric->spinlock);
+    update_every_s = metric->latest_update_every_s;
+    netdata_spinlock_unlock(&metric->spinlock);
+
+    return update_every_s;
+}
+
+bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+    bool done = false;
+    netdata_spinlock_lock(&metric->spinlock);
+    if(!metric->writer) {
+        metric->writer = gettid();
+        __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+        done = true;
+    }
+    else
+        __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED);
+    netdata_spinlock_unlock(&metric->spinlock);
+    return done;
+}
+
+bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+    bool done = false;
+    netdata_spinlock_lock(&metric->spinlock);
+    if(metric->writer) {
+        metric->writer = 0;
+        __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+        done = true;
+    }
+    netdata_spinlock_unlock(&metric->spinlock);
+    return done;
+}
+
+struct mrg_statistics mrg_get_statistics(MRG *mrg) {
+    // FIXME - use atomics
+    return mrg->stats;
+}
+
+// ----------------------------------------------------------------------------
+// unit test
+
+#ifdef MRG_STRESS_TEST
+
+static void mrg_stress(MRG *mrg, size_t entries, size_t sections) {
+    bool ret;
+
+    info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections);
+
+    METRIC *array[entries][sections];
+    for(size_t i = 0; i < entries ; i++) {
+        MRG_ENTRY e = {
+            .first_time_s = (time_t)(i + 1),
+            .latest_time_s = (time_t)(i + 2),
+            .latest_update_every_s = (time_t)(i + 3),
+        };
+        uuid_generate_random(e.uuid);
+
+        for(size_t section = 0; section < sections ;section++) {
+            e.section = section;
+            array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret);
+            if(!ret)
+                fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section);
+
+            if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section])
+                fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric");
+
+            if(ret)
+                fatal("DBENGINE METRIC: adding the same metric twice, returns success");
+
+            if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section])
+                fatal("DBENGINE METRIC: cannot get back the same metric");
+
+            if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0)
+                fatal("DBENGINE METRIC: uuids do not match");
+        }
+    }
+
+    for(size_t i = 0; i < entries ; i++) {
+        for (size_t section = 0; section < sections; section++) {
+            uuid_t uuid;
+            uuid_generate_random(uuid);
+
+            if(mrg_metric_get_and_acquire(mrg, &uuid, section))
+                fatal("DBENGINE METRIC: found non-existing uuid");
+
+            if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section])
+                fatal("DBENGINE METRIC: metric id does not match");
+
+            if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1))
+                fatal("DBENGINE METRIC: wrong first time returned");
+            if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2))
+                fatal("DBENGINE METRIC: wrong latest time returned");
+            if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3))
+                fatal("DBENGINE METRIC: wrong latest time returned");
+
+            if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2)))
+                fatal("DBENGINE METRIC: cannot set first time");
+            if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3)))
+                fatal("DBENGINE METRIC: cannot set latest time");
+            if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4)))
+                fatal("DBENGINE METRIC: cannot set update every");
+
+            if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2))
+                fatal("DBENGINE METRIC: wrong first time returned");
+            if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3))
+                fatal("DBENGINE METRIC: wrong latest time returned");
+            if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4))
+                fatal("DBENGINE METRIC: wrong latest time returned");
+        }
+    }
+
+    for(size_t i = 0; i < entries ; i++) {
+        for (size_t section = 0; section < sections; section++) {
+            if(!mrg_metric_release_and_delete(mrg, array[i][section]))
+                fatal("DBENGINE METRIC: failed to delete metric");
+        }
+    }
+}
+
+static void *mrg_stress_test_thread1(void *ptr) {
+    MRG *mrg = ptr;
+
+    for(int i = 0; i < 5 ; i++)
+        mrg_stress(mrg, 10000, 5);
+
+    return ptr;
+}
+
+static void *mrg_stress_test_thread2(void *ptr) {
+    MRG *mrg = ptr;
+
+    for(int i = 0; i < 10 ; i++)
+        mrg_stress(mrg, 500, 50);
+
+    return ptr;
+}
+
+static void *mrg_stress_test_thread3(void *ptr) {
+    MRG *mrg = ptr;
+
+    for(int i = 0; i < 50 ; i++)
+        mrg_stress(mrg, 5000, 1);
+
+    return ptr;
+}
+#endif
+
+int mrg_unittest(void) {
+    MRG *mrg = mrg_create();
+    METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
+    METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
+    bool ret;
+
+    MRG_ENTRY entry = {
+        .section = 0,
+        .first_time_s = 2,
+        .last_time_s = 3,
+        .latest_update_every_s = 4,
+    };
+    uuid_generate(entry.uuid);
+    m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(!ret)
+        fatal("DBENGINE METRIC: failed to add metric");
+
+    // add the same metric again
+    m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m2_t0 != m1_t0)
+        fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+    m3_t0 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+    if(m3_t0 != m1_t0)
+        fatal("DBENGINE METRIC: cannot find the metric added");
+
+    // add the same metric again
+    m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m4_t0 != m1_t0)
+        fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice");
+
+    // add the same metric in another section
+    entry.section = 1;
+    m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(!ret)
+        fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
+
+    // add the same metric again
+    m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
+    if(m2_t1 != m1_t1)
+        fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section);
+    if(ret)
+        fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)");
+
+    m3_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+    if(m3_t1 != m1_t1)
+        fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
+
+    // delete the first metric
+    mrg_metric_release(mrg, m2_t0);
+    mrg_metric_release(mrg, m3_t0);
+    mrg_metric_release(mrg, m4_t0);
+    mrg_metric_set_first_time_s(mrg, m1_t0, 0);
+    mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
+    mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
+    if(!mrg_metric_release_and_delete(mrg, m1_t0))
+        fatal("DBENGINE METRIC: cannot delete the first metric");
+
+    m4_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
+    if(m4_t1 != m1_t1)
+        fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
+
+    // delete the second metric
+    mrg_metric_release(mrg, m2_t1);
+    mrg_metric_release(mrg, m3_t1);
+    mrg_metric_release(mrg, m4_t1);
+    mrg_metric_set_first_time_s(mrg, m1_t1, 0);
+    mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
+    mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
+    if(!mrg_metric_release_and_delete(mrg, m1_t1))
+        fatal("DBENGINE METRIC: cannot delete the second metric");
+
+    if(mrg->stats.entries != 0)
+        fatal("DBENGINE METRIC: invalid entries counter");
+
+#ifdef MRG_STRESS_TEST
+    usec_t started_ut = now_monotonic_usec();
+    pthread_t thread1;
+    netdata_thread_create(&thread1, "TH1",
+                          NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+                          mrg_stress_test_thread1, mrg);
+
+    pthread_t thread2;
+    netdata_thread_create(&thread2, "TH2",
+                          NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+                          mrg_stress_test_thread2, mrg);
+
+    pthread_t thread3;
+    netdata_thread_create(&thread3, "TH3",
+                          NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+                          mrg_stress_test_thread3, mrg);
+
+
+    sleep_usec(5 * USEC_PER_SEC);
+
+    netdata_thread_cancel(thread1);
+    netdata_thread_cancel(thread2);
+    netdata_thread_cancel(thread3);
+
+    netdata_thread_join(thread1, NULL);
+    netdata_thread_join(thread2, NULL);
+    netdata_thread_join(thread3, NULL);
+    usec_t ended_ut = now_monotonic_usec();
+
+    info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
+         "%zu deletions, %zu wrong deletions, "
+         "%zu successful searches, %zu wrong searches, "
+         "%zu successful pointer validations, %zu wrong pointer validations "
+         "in %llu usecs",
+         mrg->stats.additions, mrg->stats.additions_duplicate,
+         mrg->stats.deletions, mrg->stats.delete_misses,
+         mrg->stats.search_hits, mrg->stats.search_misses,
+         mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses,
+         ended_ut - started_ut);
+
+#endif
+
+    mrg_destroy(mrg);
+
+    info("DBENGINE METRIC: all tests passed!");
+
+    return 0;
+}
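
For orientation, below is a minimal caller-side sketch of the metrics registry (MRG) lifecycle, based only on the public API and mrg_unittest() in this diff. It assumes the declarations from metric.h (MRG, METRIC, MRG_ENTRY and the mrg_* prototypes), which are not part of this file; the function name and field values are illustrative, not part of the commit.

// minimal usage sketch, mirroring the flow of mrg_unittest() above
#include "metric.h"

void example_mrg_lifecycle(void) {
    MRG *mrg = mrg_create();

    MRG_ENTRY entry = {
        .section = 0,                   // a section maps to a dbengine tier
        .first_time_s = 2,
        .last_time_s = 3,
        .latest_update_every_s = 4,
    };
    uuid_generate(entry.uuid);

    // add (or find) the metric; 'added' reports whether it was newly created
    bool added;
    METRIC *m = mrg_metric_add_and_acquire(mrg, entry, &added);

    // every successful add/get returns a referenced pointer
    METRIC *same = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
    mrg_metric_release(mrg, same);      // drop the extra reference

    // a metric is deleted only when it has no retention and no references left
    mrg_metric_set_first_time_s(mrg, m, 0);
    mrg_metric_set_clean_latest_time_s(mrg, m, 0);
    mrg_metric_set_hot_latest_time_s(mrg, m, 0);
    mrg_metric_release_and_delete(mrg, m);

    mrg_destroy(mrg);                   // intentionally a no-op, see mrg_destroy()
}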