diff options
Diffstat (limited to 'database/engine/metric.c')
-rw-r--r-- | database/engine/metric.c | 578 |
1 files changed, 308 insertions, 270 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c index 6b65df9b..1370f9d7 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -16,6 +16,7 @@ struct metric { time_t latest_time_s_hot; // latest time of the currently collected page uint32_t latest_update_every_s; // pid_t writer; + uint8_t partition; METRIC_FLAGS flags; REFCOUNT refcount; SPINLOCK spinlock; // protects all variable members @@ -27,103 +28,98 @@ struct metric { static struct aral_statistics mrg_aral_statistics; struct mrg { - ARAL *aral[MRG_PARTITIONS]; + size_t partitions; - struct pgc_index { - netdata_rwlock_t rwlock; - Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers) - } index[MRG_PARTITIONS]; + struct mrg_partition { + ARAL *aral; // not protected by our spinlock - it has its own - struct mrg_statistics stats; + RW_SPINLOCK rw_spinlock; + Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers) - size_t entries_per_partition[MRG_PARTITIONS]; + struct mrg_statistics stats; + } index[]; }; -static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) { + mrg->index[partition].stats.additions_duplicate++; } static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) { - __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED); - - __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED); + mrg->index[partition].stats.entries++; + mrg->index[partition].stats.additions++; + mrg->index[partition].stats.size += sizeof(METRIC); } static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) { - __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED); - __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED); - - __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED); + mrg->index[partition].stats.entries--; + mrg->index[partition].stats.size -= sizeof(METRIC); + mrg->index[partition].stats.deletions++; } -static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED); } -static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED); } -static inline void MRG_STATS_DELETE_MISS(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) { + mrg->index[partition].stats.delete_misses++; } -static inline void mrg_index_read_lock(MRG *mrg, size_t partition) { - netdata_rwlock_rdlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) { - netdata_rwlock_unlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_write_lock(MRG *mrg, size_t partition) { - netdata_rwlock_wrlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) { - netdata_rwlock_unlock(&mrg->index[partition].rwlock); -} +#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock) + +#define metric_lock(metric) spinlock_lock(&(metric)->spinlock) +#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock) -static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) { +static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) { if(mem_after_judyl > mem_before_judyl) - __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); else if(mem_after_judyl < mem_before_judyl) - __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED); } -static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); +static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); } -static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) { - __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); +static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) { + __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); } static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { uint8_t *u = (uint8_t *)uuid; - return u[UUID_SZ - 1] % MRG_PARTITIONS; + size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)]; + return *n % mrg->partitions; } static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) { + size_t partition = metric->partition; + bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0); if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) { metric->flags |= METRIC_FLAG_HAS_RETENTION; - __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); } else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) { metric->flags &= ~METRIC_FLAG_HAS_RETENTION; - __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); } return has_retention; } static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) { + size_t partition = metric->partition; REFCOUNT refcount; if(!having_spinlock) - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(metric->refcount < 0)) fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); @@ -134,21 +130,22 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b metric_has_retention_unsafe(mrg, metric); if(!having_spinlock) - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); if(refcount == 1) - __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); return refcount; } static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { bool ret = true; + size_t partition = metric->partition; REFCOUNT refcount; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(metric->refcount <= 0)) fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); @@ -158,20 +155,20 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0)) ret = false; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); if(unlikely(!refcount)) - __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); return ret; } -static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { +static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { size_t partition = uuid_partition(mrg, &entry->uuid); - METRIC *allocation = aral_mallocz(mrg->aral[partition]); + METRIC *allocation = aral_mallocz(mrg->index[partition].aral); mrg_index_write_lock(mrg, partition); @@ -182,12 +179,12 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); if(unlikely(!*sections_judy_pptr)) - mrg_stats_size_judyhs_added_uuid(mrg); + mrg_stats_size_judyhs_added_uuid(mrg, partition); mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); if(unlikely(!PValue || PValue == PJERR)) fatal("DBENGINE METRIC: corrupted section JudyL array"); @@ -196,18 +193,21 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { METRIC *metric = *PValue; metric_acquire(mrg, metric, false); + + MRG_STATS_DUPLICATE_ADD(mrg, partition); + mrg_index_write_unlock(mrg, partition); if(ret) *ret = false; - aral_freez(mrg->aral[partition], allocation); + aral_freez(mrg->index[partition].aral, allocation); - MRG_STATS_DUPLICATE_ADD(mrg); return metric; } METRIC *metric = allocation; + // memcpy(metric->uuid, entry->uuid, sizeof(uuid_t)); uuid_copy(metric->uuid, entry->uuid); metric->section = entry->section; metric->first_time_s = MAX(0, entry->first_time_s); @@ -217,21 +217,22 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { metric->writer = 0; metric->refcount = 0; metric->flags = 0; - netdata_spinlock_init(&metric->spinlock); + metric->partition = partition; + spinlock_init(&metric->spinlock); metric_acquire(mrg, metric, true); // no spinlock use required here *PValue = metric; + MRG_STATS_ADDED_METRIC(mrg, partition); + mrg_index_write_unlock(mrg, partition); if(ret) *ret = true; - MRG_STATS_ADDED_METRIC(mrg, partition); - return metric; } -static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { +static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { size_t partition = uuid_partition(mrg, uuid); mrg_index_read_lock(mrg, partition); @@ -239,14 +240,14 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); if(unlikely(!sections_judy_pptr)) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg); + MRG_STATS_SEARCH_MISS(mrg, partition); return NULL; } Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); if(unlikely(!PValue)) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg); + MRG_STATS_SEARCH_MISS(mrg, partition); return NULL; } @@ -256,38 +257,38 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_HIT(mrg); + MRG_STATS_SEARCH_HIT(mrg, partition); return metric; } -static bool acquired_metric_del(MRG *mrg, METRIC *metric) { - size_t partition = uuid_partition(mrg, &metric->uuid); +static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) { + size_t partition = metric->partition; size_t mem_before_judyl, mem_after_judyl; mrg_index_write_lock(mrg, partition); if(!metric_release_and_can_be_deleted(mrg, metric)) { + mrg->index[partition].stats.delete_having_retention_or_referenced++; mrg_index_write_unlock(mrg, partition); - __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED); return false; } Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { + MRG_STATS_DELETE_MISS(mrg, partition); mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETE_MISS(mrg); return false; } mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); if(unlikely(!rc)) { + MRG_STATS_DELETE_MISS(mrg, partition); mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETE_MISS(mrg); return false; } @@ -295,14 +296,14 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) { rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); if(unlikely(!rc)) fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); - mrg_stats_size_judyhs_removed_uuid(mrg); + mrg_stats_size_judyhs_removed_uuid(mrg, partition); } - mrg_index_write_unlock(mrg, partition); + MRG_STATS_DELETED_METRIC(mrg, partition); - aral_freez(mrg->aral[partition], metric); + mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETED_METRIC(mrg, partition); + aral_freez(mrg->index[partition].aral, metric); return true; } @@ -310,38 +311,34 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) { // ---------------------------------------------------------------------------- // public API -MRG *mrg_create(void) { - MRG *mrg = callocz(1, sizeof(MRG)); +inline MRG *mrg_create(ssize_t partitions) { + if(partitions < 1) + partitions = get_netdata_cpus(); + + MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions); + mrg->partitions = partitions; - for(size_t i = 0; i < MRG_PARTITIONS ; i++) { - netdata_rwlock_init(&mrg->index[i].rwlock); + for(size_t i = 0; i < mrg->partitions ; i++) { + rw_spinlock_init(&mrg->index[i].rw_spinlock); char buf[ARAL_MAX_NAME + 1]; snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i); - mrg->aral[i] = aral_create(buf, - sizeof(METRIC), - 0, - 16384, - &mrg_aral_statistics, - NULL, NULL, false, - false); + mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false); } - mrg->stats.size = sizeof(MRG); - return mrg; } -size_t mrg_aral_structures(void) { +inline size_t mrg_aral_structures(void) { return aral_structures_from_stats(&mrg_aral_statistics); } -size_t mrg_aral_overhead(void) { +inline size_t mrg_aral_overhead(void) { return aral_overhead_from_stats(&mrg_aral_statistics); } -void mrg_destroy(MRG *mrg __maybe_unused) { +inline void mrg_destroy(MRG *mrg __maybe_unused) { // no destruction possible // we can't traverse the metrics list @@ -351,57 +348,57 @@ void mrg_destroy(MRG *mrg __maybe_unused) { ; } -METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { +inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { // internal_fatal(entry.latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); return metric_add_and_acquire(mrg, &entry, ret); } -METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { +inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { return metric_get_and_acquire(mrg, uuid, section); } -bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { return acquired_metric_del(mrg, metric); } -METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { +inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { metric_acquire(mrg, metric, false); return metric; } -bool mrg_metric_release(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_release(MRG *mrg, METRIC *metric) { return metric_release_and_can_be_deleted(mrg, metric); } -Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { +inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { return (Word_t)metric; } -uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) { +inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) { return &metric->uuid; } -Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { +inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { return metric->section; } -bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { +inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); if(unlikely(first_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->first_time_s = first_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { +inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); internal_fatal(first_time_s > max_acceptable_collected_time(), @@ -421,7 +418,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t if(unlikely(!first_time_s && !last_time_s && !update_every_s)) return; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s))) metric->first_time_s = first_time_s; @@ -436,29 +433,29 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t metric->latest_update_every_s = (uint32_t) update_every_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } -bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { +inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); bool ret = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(first_time_s > metric->first_time_s) { metric->first_time_s = first_time_s; ret = true; } metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return ret; } -time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t first_time_s; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(!metric->first_time_s)) { if(metric->latest_time_s_clean) @@ -470,13 +467,13 @@ time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { first_time_s = metric->first_time_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return first_time_s; } -void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { - netdata_spinlock_lock(&metric->spinlock); +inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { + metric_lock(metric); if(unlikely(!metric->first_time_s)) { if(metric->latest_time_s_clean) @@ -490,16 +487,16 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); *update_every_s = metric->latest_update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } -bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { +inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); if(unlikely(latest_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); @@ -513,12 +510,12 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, metric->first_time_s = latest_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } // returns true when metric still has retention -bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { +inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { Word_t section = mrg_metric_section(mrg, metric); bool do_again = false; size_t countdown = 5; @@ -551,7 +548,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { if (min_first_time_s == LONG_MAX) min_first_time_s = 0; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if (--countdown && !min_first_time_s && metric->latest_time_s_hot) do_again = true; else { @@ -563,13 +560,13 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { ret = metric_has_retention_unsafe(mrg, metric); } - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } while(do_again); return ret; } -bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { +inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); // internal_fatal(latest_time_s > max_acceptable_collected_time(), @@ -578,204 +575,215 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t if(unlikely(latest_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->latest_time_s_hot = latest_time_s; if(unlikely(!metric->first_time_s)) metric->first_time_s = latest_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t max; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return max; } -bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { +inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); if(update_every_s <= 0) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->latest_update_every_s = (uint32_t) update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { +inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); if(update_every_s <= 0) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(!metric->latest_update_every_s) metric->latest_update_every_s = (uint32_t) update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t update_every_s; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); update_every_s = metric->latest_update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return update_every_s; } -bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { bool done = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(!metric->writer) { metric->writer = gettid(); - __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); done = true; } else - __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&metric->spinlock); + __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED); + metric_unlock(metric); return done; } -bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { bool done = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(metric->writer) { metric->writer = 0; - __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); done = true; } - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return done; } -struct mrg_statistics mrg_get_statistics(MRG *mrg) { - // FIXME - use atomics - return mrg->stats; -} - -// ---------------------------------------------------------------------------- -// unit test +inline void mrg_update_metric_retention_and_granularity_by_uuid( + MRG *mrg, Word_t section, uuid_t *uuid, + time_t first_time_s, time_t last_time_s, + time_t update_every_s, time_t now_s) +{ + if(unlikely(last_time_s > now_s)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " + "fixing last time to now", + first_time_s, last_time_s, now_s); + last_time_s = now_s; + } -#ifdef MRG_STRESS_TEST + if (unlikely(first_time_s > last_time_s)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " + "fixing first time to last time", + first_time_s, last_time_s, now_s); -static void mrg_stress(MRG *mrg, size_t entries, size_t sections) { - bool ret; + first_time_s = last_time_s; + } - info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections); + if (unlikely(first_time_s == 0 || last_time_s == 0)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " + "using them as-is", + first_time_s, last_time_s, now_s); + } - METRIC *array[entries][sections]; - for(size_t i = 0; i < entries ; i++) { - MRG_ENTRY e = { - .first_time_s = (time_t)(i + 1), - .latest_time_s = (time_t)(i + 2), - .latest_update_every_s = (time_t)(i + 3), + bool added = false; + METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section); + if (!metric) { + MRG_ENTRY entry = { + .section = section, + .first_time_s = first_time_s, + .last_time_s = last_time_s, + .latest_update_every_s = (uint32_t) update_every_s }; - uuid_generate_random(e.uuid); - - for(size_t section = 0; section < sections ;section++) { - e.section = section; - array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret); - if(!ret) - fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section); - - if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section]) - fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric"); - - if(ret) - fatal("DBENGINE METRIC: adding the same metric twice, returns success"); - - if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section]) - fatal("DBENGINE METRIC: cannot get back the same metric"); - - if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0) - fatal("DBENGINE METRIC: uuids do not match"); - } + // memcpy(entry.uuid, *uuid, sizeof(uuid_t)); + uuid_copy(entry.uuid, *uuid); + metric = mrg_metric_add_and_acquire(mrg, entry, &added); } - for(size_t i = 0; i < entries ; i++) { - for (size_t section = 0; section < sections; section++) { - uuid_t uuid; - uuid_generate_random(uuid); - - if(mrg_metric_get_and_acquire(mrg, &uuid, section)) - fatal("DBENGINE METRIC: found non-existing uuid"); - - if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section]) - fatal("DBENGINE METRIC: metric id does not match"); - - if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1)) - fatal("DBENGINE METRIC: wrong first time returned"); - if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2)) - fatal("DBENGINE METRIC: wrong latest time returned"); - if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3)) - fatal("DBENGINE METRIC: wrong latest time returned"); - - if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2))) - fatal("DBENGINE METRIC: cannot set first time"); - if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3))) - fatal("DBENGINE METRIC: cannot set latest time"); - if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4))) - fatal("DBENGINE METRIC: cannot set update every"); - - if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2)) - fatal("DBENGINE METRIC: wrong first time returned"); - if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3)) - fatal("DBENGINE METRIC: wrong latest time returned"); - if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4)) - fatal("DBENGINE METRIC: wrong latest time returned"); - } + if (likely(!added)) + mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s); + + mrg_metric_release(mrg, metric); +} + +inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { + memset(s, 0, sizeof(struct mrg_statistics)); + + for(size_t i = 0; i < mrg->partitions ;i++) { + s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); + s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); + s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED); + s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); + s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); + s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); + s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED); + s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED); + s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED); + s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED); + s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED); + s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED); + s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED); + s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED); } - for(size_t i = 0; i < entries ; i++) { - for (size_t section = 0; section < sections; section++) { - if(!mrg_metric_release_and_delete(mrg, array[i][section])) - fatal("DBENGINE METRIC: failed to delete metric"); - } - } + s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions; } -static void *mrg_stress_test_thread1(void *ptr) { - MRG *mrg = ptr; +// ---------------------------------------------------------------------------- +// unit test - for(int i = 0; i < 5 ; i++) - mrg_stress(mrg, 10000, 5); +struct mrg_stress_entry { + uuid_t uuid; + time_t after; + time_t before; +}; - return ptr; -} +struct mrg_stress { + MRG *mrg; + bool stop; + size_t entries; + struct mrg_stress_entry *array; + size_t updates; +}; -static void *mrg_stress_test_thread2(void *ptr) { - MRG *mrg = ptr; +static void *mrg_stress(void *ptr) { + struct mrg_stress *t = ptr; + MRG *mrg = t->mrg; - for(int i = 0; i < 10 ; i++) - mrg_stress(mrg, 500, 50); + ssize_t start = 0; + ssize_t end = (ssize_t)t->entries; + ssize_t step = 1; - return ptr; -} + if(gettid() % 2) { + start = (ssize_t)t->entries - 1; + end = -1; + step = -1; + } + + while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) { + for (ssize_t i = start; i != end; i += step) { + struct mrg_stress_entry *e = &t->array[i]; -static void *mrg_stress_test_thread3(void *ptr) { - MRG *mrg = ptr; + time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED); + time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED); - for(int i = 0; i < 50 ; i++) - mrg_stress(mrg, 5000, 1); + mrg_update_metric_retention_and_granularity_by_uuid( + mrg, 0x01, + &e->uuid, + after, + before, + 1, + before); + + __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED); + } + } return ptr; } -#endif int mrg_unittest(void) { - MRG *mrg = mrg_create(); + MRG *mrg = mrg_create(0); METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0; METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1; bool ret; @@ -850,54 +858,84 @@ int mrg_unittest(void) { if(!mrg_metric_release_and_delete(mrg, m1_t1)) fatal("DBENGINE METRIC: cannot delete the second metric"); - if(mrg->stats.entries != 0) + struct mrg_statistics s; + mrg_get_statistics(mrg, &s); + if(s.entries != 0) fatal("DBENGINE METRIC: invalid entries counter"); -#ifdef MRG_STRESS_TEST - usec_t started_ut = now_monotonic_usec(); - pthread_t thread1; - netdata_thread_create(&thread1, "TH1", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread1, mrg); + size_t entries = 1000000; + size_t threads = mrg->partitions / 3 + 1; + size_t tiers = 3; + size_t run_for_secs = 5; + netdata_log_info("preparing stress test of %zu entries...", entries); + struct mrg_stress t = { + .mrg = mrg, + .entries = entries, + .array = callocz(entries, sizeof(struct mrg_stress_entry)), + }; - pthread_t thread2; - netdata_thread_create(&thread2, "TH2", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread2, mrg); + time_t now = max_acceptable_collected_time(); + for(size_t i = 0; i < entries ;i++) { + uuid_generate_random(t.array[i].uuid); + t.array[i].after = now / 3; + t.array[i].before = now / 2; + } + netdata_log_info("stress test is populating MRG with 3 tiers..."); + for(size_t i = 0; i < entries ;i++) { + struct mrg_stress_entry *e = &t.array[i]; + for(size_t tier = 1; tier <= tiers ;tier++) { + mrg_update_metric_retention_and_granularity_by_uuid( + mrg, tier, + &e->uuid, + e->after, + e->before, + 1, + e->before); + } + } + netdata_log_info("stress test ready to run..."); - pthread_t thread3; - netdata_thread_create(&thread3, "TH3", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread3, mrg); + usec_t started_ut = now_monotonic_usec(); + pthread_t th[threads]; + for(size_t i = 0; i < threads ; i++) { + char buf[15 + 1]; + snprintfz(buf, 15, "TH[%zu]", i); + netdata_thread_create(&th[i], buf, + NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, + mrg_stress, &t); + } - sleep_usec(5 * USEC_PER_SEC); + sleep_usec(run_for_secs * USEC_PER_SEC); + __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED); - netdata_thread_cancel(thread1); - netdata_thread_cancel(thread2); - netdata_thread_cancel(thread3); + for(size_t i = 0; i < threads ; i++) + netdata_thread_cancel(th[i]); + + for(size_t i = 0; i < threads ; i++) + netdata_thread_join(th[i], NULL); - netdata_thread_join(thread1, NULL); - netdata_thread_join(thread2, NULL); - netdata_thread_join(thread3, NULL); usec_t ended_ut = now_monotonic_usec(); - info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, " + struct mrg_statistics stats; + mrg_get_statistics(mrg, &stats); + + netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, " "%zu deletions, %zu wrong deletions, " "%zu successful searches, %zu wrong searches, " - "%zu successful pointer validations, %zu wrong pointer validations " "in %llu usecs", - mrg->stats.additions, mrg->stats.additions_duplicate, - mrg->stats.deletions, mrg->stats.delete_misses, - mrg->stats.search_hits, mrg->stats.search_misses, - mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses, + stats.additions, stats.additions_duplicate, + stats.deletions, stats.delete_misses, + stats.search_hits, stats.search_misses, ended_ut - started_ut); -#endif + netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread", + (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0, + (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads); mrg_destroy(mrg); - info("DBENGINE METRIC: all tests passed!"); + netdata_log_info("DBENGINE METRIC: all tests passed!"); return 0; } |