diff options
Diffstat (limited to 'database/engine/metric.c')
-rw-r--r-- | database/engine/metric.c | 360 |
1 files changed, 146 insertions, 214 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c index 69b8f3116..2e132612e 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -1,30 +1,44 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #include "metric.h" typedef int32_t REFCOUNT; #define REFCOUNT_DELETING (-100) -typedef enum __attribute__ ((__packed__)) { - METRIC_FLAG_HAS_RETENTION = (1 << 0), -} METRIC_FLAGS; - struct metric { uuid_t uuid; // never changes Word_t section; // never changes - time_t first_time_s; // - time_t latest_time_s_clean; // archived pages latest time - time_t latest_time_s_hot; // latest time of the currently collected page - uint32_t latest_update_every_s; // + time_t first_time_s; // the timestamp of the oldest point in the database + time_t latest_time_s_clean; // the timestamp of the newest point in the database + time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored) + uint32_t latest_update_every_s; // the latest data collection frequency pid_t writer; uint8_t partition; - METRIC_FLAGS flags; REFCOUNT refcount; - SPINLOCK spinlock; // protects all variable members // THIS IS allocated with malloc() // YOU HAVE TO INITIALIZE IT YOURSELF ! }; +#define set_metric_field_with_condition(field, value, condition) ({ \ + typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \ + typeof(field) _wanted = value; \ + bool did_it = true; \ + \ + do { \ + if((condition) && (_current != _wanted)) { \ + ; \ + } \ + else { \ + did_it = false; \ + break; \ + } \ + } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \ + false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \ + \ + did_it; \ +}) + static struct aral_statistics mrg_aral_statistics; struct mrg { @@ -73,9 +87,6 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) { #define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock) #define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock) -#define metric_lock(metric) spinlock_lock(&(metric)->spinlock) -#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock) - static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) { if(mem_after_judyl > mem_before_judyl) __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); @@ -97,40 +108,34 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { return *n % mrg->partitions; } -static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) { - size_t partition = metric->partition; +static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) { + time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED); - bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0); + if(first_time_s <= 0) { + first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + if(first_time_s <= 0) + first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) { - metric->flags |= METRIC_FLAG_HAS_RETENTION; - __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); - } - else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) { - metric->flags &= ~METRIC_FLAG_HAS_RETENTION; - __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); + if(first_time_s <= 0) + first_time_s = 0; + else + __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); } - return has_retention; + return first_time_s; } -static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) { +static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) { size_t partition = metric->partition; + REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); REFCOUNT refcount; - if(!having_spinlock) - metric_lock(metric); - - if(unlikely(metric->refcount < 0)) - fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); - - refcount = ++metric->refcount; - - // update its retention flags - metric_has_retention_unsafe(mrg, metric); + do { + if(expected < 0) + fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); - if(!having_spinlock) - metric_unlock(metric); + refcount = expected + 1; + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); if(refcount == 1) __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); @@ -141,28 +146,25 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b } static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { - bool ret = true; size_t partition = metric->partition; + REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); REFCOUNT refcount; - metric_lock(metric); - - if(unlikely(metric->refcount <= 0)) - fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); - - refcount = --metric->refcount; - - if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0)) - ret = false; + do { + if(expected <= 0) + fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); - metric_unlock(metric); + refcount = expected - 1; + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); if(unlikely(!refcount)) __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - return ret; + time_t first, last, ue; + mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + return (!first || !last || first > last); } static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { @@ -192,7 +194,7 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r if(unlikely(*PValue != NULL)) { METRIC *metric = *PValue; - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); MRG_STATS_DUPLICATE_ADD(mrg, partition); @@ -215,10 +217,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r metric->latest_update_every_s = entry->latest_update_every_s; metric->writer = 0; metric->refcount = 0; - metric->flags = 0; metric->partition = partition; - spinlock_init(&metric->spinlock); - metric_acquire(mrg, metric, true); // no spinlock use required here + metric_acquire(mrg, metric); *PValue = metric; MRG_STATS_ADDED_METRIC(mrg, partition); @@ -252,7 +252,7 @@ static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t sect METRIC *metric = *PValue; - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); mrg_index_read_unlock(mrg, partition); @@ -363,7 +363,7 @@ inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { } inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); return metric; } @@ -389,10 +389,7 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, if(unlikely(first_time_s < 0)) return false; - metric_lock(metric); - metric->first_time_s = first_time_s; - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); + __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); return true; } @@ -405,112 +402,56 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, internal_fatal(last_time_s > max_acceptable_collected_time(), "DBENGINE METRIC: metric last time is in the future"); - if(unlikely(first_time_s < 0)) - first_time_s = 0; - - if(unlikely(last_time_s < 0)) - last_time_s = 0; - - if(unlikely(update_every_s < 0)) - update_every_s = 0; - - if(unlikely(!first_time_s && !last_time_s && !update_every_s)) - return; + if(first_time_s > 0) + set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current); - metric_lock(metric); - - if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s))) - metric->first_time_s = first_time_s; - - if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) { - metric->latest_time_s_clean = last_time_s; - - if(likely(update_every_s)) - metric->latest_update_every_s = (uint32_t) update_every_s; + if(last_time_s > 0) { + if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) && + update_every_s > 0) + // set the latest update every too + set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); } - else if(unlikely(!metric->latest_update_every_s && update_every_s)) - metric->latest_update_every_s = (uint32_t) update_every_s; - - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); + else if(update_every_s > 0) + // set it only if it is invalid + set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); } inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - - bool ret = false; - - metric_lock(metric); - if(first_time_s > metric->first_time_s) { - metric->first_time_s = first_time_s; - ret = true; - } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - - return ret; + return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current); } inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t first_time_s; - - metric_lock(metric); - - if(unlikely(!metric->first_time_s)) { - if(metric->latest_time_s_clean) - metric->first_time_s = metric->latest_time_s_clean; - - else if(metric->latest_time_s_hot) - metric->first_time_s = metric->latest_time_s_hot; - } - - first_time_s = metric->first_time_s; - - metric_unlock(metric); - - return first_time_s; + return mrg_metric_get_first_time_s_smart(mrg, metric); } inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { - metric_lock(metric); - - if(unlikely(!metric->first_time_s)) { - if(metric->latest_time_s_clean) - metric->first_time_s = metric->latest_time_s_clean; - - else if(metric->latest_time_s_hot) - metric->first_time_s = metric->latest_time_s_hot; - } - - *first_time_s = metric->first_time_s; - *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - *update_every_s = metric->latest_update_every_s; + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - metric_unlock(metric); + *last_time_s = MAX(clean, hot); + *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric); + *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(unlikely(latest_time_s < 0)) - return false; - - metric_lock(metric); - // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); // internal_fatal(metric->latest_time_s_clean > latest_time_s, // "DBENGINE METRIC: metric new clean latest time is older than the previous one"); - metric->latest_time_s_clean = latest_time_s; + if(latest_time_s > 0) { + if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) { + set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current); - if(unlikely(!metric->first_time_s)) - metric->first_time_s = latest_time_s; + return true; + } + } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - return true; + return false; } // returns true when metric still has retention @@ -518,7 +459,6 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr Word_t section = mrg_metric_section(mrg, metric); bool do_again = false; size_t countdown = 5; - bool ret = true; do { time_t min_first_time_s = LONG_MAX; @@ -547,22 +487,20 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr if (min_first_time_s == LONG_MAX) min_first_time_s = 0; - metric_lock(metric); - if (--countdown && !min_first_time_s && metric->latest_time_s_hot) + if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED)) do_again = true; else { internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention"); do_again = false; - metric->first_time_s = min_first_time_s; - metric->latest_time_s_clean = max_end_time_s; - - ret = metric_has_retention_unsafe(mrg, metric); + set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true); + set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true); } - metric_unlock(metric); } while(do_again); - return ret; + time_t first, last, ue; + mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + return (first && last && first < last); } inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { @@ -571,88 +509,80 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); - if(unlikely(latest_time_s < 0)) - return false; - - metric_lock(metric); - metric->latest_time_s_hot = latest_time_s; - - if(unlikely(!metric->first_time_s)) - metric->first_time_s = latest_time_s; + if(likely(latest_time_s > 0)) { + __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED); + return true; + } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - return true; + return false; } inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t max; - metric_lock(metric); - max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - metric_unlock(metric); - return max; + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); + + return MAX(clean, hot); } inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(update_every_s <= 0) - return false; - - metric_lock(metric); - metric->latest_update_every_s = (uint32_t) update_every_s; - metric_unlock(metric); + if(update_every_s > 0) + return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); - return true; + return false; } inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(update_every_s <= 0) - return false; - - metric_lock(metric); - if(!metric->latest_update_every_s) - metric->latest_update_every_s = (uint32_t) update_every_s; - metric_unlock(metric); + if(update_every_s > 0) + return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); - return true; + return false; } inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t update_every_s; - - metric_lock(metric); - update_every_s = metric->latest_update_every_s; - metric_unlock(metric); - - return update_every_s; + return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { - bool done = false; - metric_lock(metric); - if(!metric->writer) { - metric->writer = gettid(); + pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); + pid_t wanted = gettid(); + bool done = true; + + do { + if(expected != 0) { + done = false; + break; + } + } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(done) __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - done = true; - } else __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED); - metric_unlock(metric); + return done; } inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { - bool done = false; - metric_lock(metric); - if(metric->writer) { - metric->writer = 0; + // this function can be called from a different thread than the one than the writer + + pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); + pid_t wanted = 0; + bool done = true; + + do { + if(!expected) { + done = false; + break; + } + } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(done) __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - done = true; - } - metric_unlock(metric); + return done; } @@ -662,27 +592,30 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid( time_t update_every_s, time_t now_s) { if(unlikely(last_time_s > now_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " - "fixing last time to now", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " + "fixing last time to now", + first_time_s, last_time_s, now_s); last_time_s = now_s; } if (unlikely(first_time_s > last_time_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " - "fixing first time to last time", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " + "fixing first time to last time", + first_time_s, last_time_s, now_s); first_time_s = last_time_s; } if (unlikely(first_time_s == 0 || last_time_s == 0)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " - "using them as-is", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " + "using them as-is", + first_time_s, last_time_s, now_s); } bool added = false; @@ -710,7 +643,6 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { for(size_t i = 0; i < mrg->partitions ;i++) { s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); - s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED); s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); @@ -900,7 +832,7 @@ int mrg_unittest(void) { pthread_t th[threads]; for(size_t i = 0; i < threads ; i++) { char buf[15 + 1]; - snprintfz(buf, 15, "TH[%zu]", i); + snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i); netdata_thread_create(&th[i], buf, NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, mrg_stress, &t); |