diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/database/engine/metric.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/database/engine/metric.c (renamed from database/engine/metric.c) | 331 |
1 files changed, 202 insertions, 129 deletions
diff --git a/database/engine/metric.c b/src/database/engine/metric.c index 2e132612e..01eb22fbc 100644 --- a/database/engine/metric.c +++ b/src/database/engine/metric.c @@ -1,5 +1,8 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "metric.h" +#include "cache.h" +#include "libnetdata/locks/locks.h" +#include "rrddiskprotocol.h" typedef int32_t REFCOUNT; #define REFCOUNT_DELETING (-100) @@ -104,8 +107,11 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { uint8_t *u = (uint8_t *)uuid; - size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)]; - return *n % mrg->partitions; + + size_t n; + memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t)); + + return n % mrg->partitions; } static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) { @@ -125,87 +131,174 @@ static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, return first_time_s; } -static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) { +static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section; + + char uuid[UUID_STR_LEN]; + uuid_unparse_lower(metric->uuid, uuid); + nd_log(NDLS_DAEMON, NDLP_ERR, + "METRIC: %s on %s at tier %d, refcount %d, partition %u, " + "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", " + "writer pid %d " + "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ", + msg, + uuid, + ctx->config.tier, + metric->refcount, + metric->partition, + metric->first_time_s, + metric->latest_time_s_hot, + metric->latest_time_s_clean, + metric->latest_update_every_s, + (int)metric->writer + ); +} + +static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) { + time_t first, last; + mrg_metric_get_retention(mrg, metric, &first, &last, NULL); + return (!first || !last || first > last); +} + +static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) { size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; + + size_t mem_before_judyl, mem_after_judyl; + + mrg_index_write_lock(mrg, partition); + + Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); + if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { + MRG_STATS_DELETE_MISS(mrg, partition); + mrg_index_write_unlock(mrg, partition); + return; + } + + mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); + int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); + mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + + if(unlikely(!rc)) { + MRG_STATS_DELETE_MISS(mrg, partition); + mrg_index_write_unlock(mrg, partition); + return; + } + + if(!*sections_judy_pptr) { + rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); + if(unlikely(!rc)) + fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); + mrg_stats_size_judyhs_removed_uuid(mrg, partition); + } + + MRG_STATS_DELETED_METRIC(mrg, partition); + + mrg_index_write_unlock(mrg, partition); + + aral_freez(mrg->index[partition].aral, metric); +} + +static inline bool metric_acquire(MRG *mrg, METRIC *metric) { + REFCOUNT expected, desired; + + expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); do { - if(expected < 0) - fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); + if(unlikely(expected < 0)) + return false; + + desired = expected + 1; - refcount = expected + 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); + + size_t partition = metric->partition; - if(refcount == 1) + if(desired == 1) __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - return refcount; + return true; } -static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { +static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) { size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; + REFCOUNT expected, desired; + + expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); do { - if(expected <= 0) - fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); + if(expected <= 0) { + metric_log(mrg, metric, "refcount is zero or negative during release"); + fatal("METRIC: refcount is %d (zero or negative) during release", expected); + } - refcount = expected - 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric)) + desired = REFCOUNT_DELETING; + else + desired = expected - 1; + + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); - if(unlikely(!refcount)) + if(desired == 0 || desired == REFCOUNT_DELETING) { __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); + if(desired == REFCOUNT_DELETING) + acquired_for_deletion_metric_delete(mrg, metric); + } + __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); - return (!first || !last || first > last); + return desired == REFCOUNT_DELETING; } static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { size_t partition = uuid_partition(mrg, entry->uuid); METRIC *allocation = aral_mallocz(mrg->index[partition].aral); + Pvoid_t *PValue; - mrg_index_write_lock(mrg, partition); + while(1) { + mrg_index_write_lock(mrg, partition); - size_t mem_before_judyl, mem_after_judyl; + size_t mem_before_judyl, mem_after_judyl; - Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR)) - fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); + Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0); + if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR)) + fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); - if(unlikely(!*sections_judy_pptr)) - mrg_stats_size_judyhs_added_uuid(mrg, partition); + if (unlikely(!*sections_judy_pptr)) + mrg_stats_size_judyhs_added_uuid(mrg, partition); - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); + PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); + mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); - if(unlikely(!PValue || PValue == PJERR)) - fatal("DBENGINE METRIC: corrupted section JudyL array"); + if (unlikely(!PValue || PValue == PJERR)) + fatal("DBENGINE METRIC: corrupted section JudyL array"); - if(unlikely(*PValue != NULL)) { - METRIC *metric = *PValue; + if (unlikely(*PValue != NULL)) { + METRIC *metric = *PValue; - metric_acquire(mrg, metric); + if(!metric_acquire(mrg, metric)) { + mrg_index_write_unlock(mrg, partition); + continue; + } - MRG_STATS_DUPLICATE_ADD(mrg, partition); + MRG_STATS_DUPLICATE_ADD(mrg, partition); + mrg_index_write_unlock(mrg, partition); - mrg_index_write_unlock(mrg, partition); + if (ret) + *ret = false; - if(ret) - *ret = false; + aral_freez(mrg->index[partition].aral, allocation); - aral_freez(mrg->index[partition].aral, allocation); + return metric; + } - return metric; + break; } METRIC *metric = allocation; @@ -216,9 +309,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r metric->latest_time_s_hot = 0; metric->latest_update_every_s = entry->latest_update_every_s; metric->writer = 0; - metric->refcount = 0; + metric->refcount = 1; metric->partition = partition; - metric_acquire(mrg, metric); *PValue = metric; MRG_STATS_ADDED_METRIC(mrg, partition); @@ -234,77 +326,35 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { size_t partition = uuid_partition(mrg, uuid); - mrg_index_read_lock(mrg, partition); + while(1) { + mrg_index_read_lock(mrg, partition); - Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); - if(unlikely(!PValue)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - METRIC *metric = *PValue; - - metric_acquire(mrg, metric); - - mrg_index_read_unlock(mrg, partition); - - MRG_STATS_SEARCH_HIT(mrg, partition); - return metric; -} - -static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) { - size_t partition = metric->partition; - - size_t mem_before_judyl, mem_after_judyl; - - mrg_index_write_lock(mrg, partition); + Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); + if (unlikely(!sections_judy_pptr)) { + mrg_index_read_unlock(mrg, partition); + MRG_STATS_SEARCH_MISS(mrg, partition); + return NULL; + } - if(!metric_release_and_can_be_deleted(mrg, metric)) { - mrg->index[partition].stats.delete_having_retention_or_referenced++; - mrg_index_write_unlock(mrg, partition); - return false; - } + Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); + if (unlikely(!PValue)) { + mrg_index_read_unlock(mrg, partition); + MRG_STATS_SEARCH_MISS(mrg, partition); + return NULL; + } - Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } + METRIC *metric = *PValue; - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + if(metric && !metric_acquire(mrg, metric)) + metric = NULL; - if(unlikely(!rc)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } + mrg_index_read_unlock(mrg, partition); - if(!*sections_judy_pptr) { - rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!rc)) - fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); - mrg_stats_size_judyhs_removed_uuid(mrg, partition); + if(metric) { + MRG_STATS_SEARCH_HIT(mrg, partition); + return metric; + } } - - MRG_STATS_DELETED_METRIC(mrg, partition); - - mrg_index_write_unlock(mrg, partition); - - aral_freez(mrg->index[partition].aral, metric); - - return true; } // ---------------------------------------------------------------------------- @@ -359,7 +409,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section } inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { - return acquired_metric_del(mrg, metric); + return metric_release(mrg, metric, true); } inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { @@ -367,8 +417,8 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { return metric; } -inline bool mrg_metric_release(MRG *mrg, METRIC *metric) { - return metric_release_and_can_be_deleted(mrg, metric); +inline void mrg_metric_release(MRG *mrg, METRIC *metric) { + metric_release(mrg, metric, false); } inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { @@ -394,8 +444,8 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, return true; } -inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { - internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0, +inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) { + internal_fatal(first_time_s < 0 || last_time_s < 0, "DBENGINE METRIC: timestamp is negative"); internal_fatal(first_time_s > max_acceptable_collected_time(), "DBENGINE METRIC: metric first time is in the future"); @@ -425,13 +475,14 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri return mrg_metric_get_first_time_s_smart(mrg, metric); } -inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { +inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) { time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); *last_time_s = MAX(clean, hot); *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric); - *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); + if (update_every_s) + *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { @@ -498,8 +549,8 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr } } while(do_again); - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + time_t first, last; + mrg_metric_get_retention(mrg, metric, &first, &last, NULL); return (first && last && first < last); } @@ -517,6 +568,11 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me return false; } +inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) { + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + return clean; +} + inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); @@ -524,25 +580,21 @@ inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metr return MAX(clean, hot); } -inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - +inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) { if(update_every_s > 0) return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); return false; } -inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - +inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) { if(update_every_s > 0) return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); return false; } -inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } @@ -589,7 +641,7 @@ inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { inline void mrg_update_metric_retention_and_granularity_by_uuid( MRG *mrg, Word_t section, uuid_t *uuid, time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s) + uint32_t update_every_s, time_t now_s) { if(unlikely(last_time_s > now_s)) { nd_log_limit_static_global_var(erl, 1, 0); @@ -626,14 +678,35 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid( .section = section, .first_time_s = first_time_s, .last_time_s = last_time_s, - .latest_update_every_s = (uint32_t) update_every_s + .latest_update_every_s = update_every_s }; metric = mrg_metric_add_and_acquire(mrg, entry, &added); } - if (likely(!added)) + struct rrdengine_instance *ctx = (struct rrdengine_instance *) section; + if (likely(!added)) { + uint64_t old_samples = 0; + + if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean) + old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s; + mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s); + uint64_t new_samples = 0; + if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean) + new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s; + + __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED); + } + else { + // Newly added + if (update_every_s) { + uint64_t samples = (last_time_s - first_time_s) / update_every_s; + __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED); + } + __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED); + } + mrg_metric_release(mrg, metric); } |