summaryrefslogtreecommitdiffstats
path: root/src/database/engine/metric.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/database/engine/metric.c (renamed from database/engine/metric.c)331
1 files changed, 202 insertions, 129 deletions
diff --git a/database/engine/metric.c b/src/database/engine/metric.c
index 2e132612e..01eb22fbc 100644
--- a/database/engine/metric.c
+++ b/src/database/engine/metric.c
@@ -1,5 +1,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "metric.h"
+#include "cache.h"
+#include "libnetdata/locks/locks.h"
+#include "rrddiskprotocol.h"
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
@@ -104,8 +107,11 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
- return *n % mrg->partitions;
+
+ size_t n;
+ memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t));
+
+ return n % mrg->partitions;
}
static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -125,87 +131,174 @@ static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused,
return first_time_s;
}
-static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) {
+static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section;
+
+ char uuid[UUID_STR_LEN];
+ uuid_unparse_lower(metric->uuid, uuid);
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "METRIC: %s on %s at tier %d, refcount %d, partition %u, "
+ "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", "
+ "writer pid %d "
+ "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ",
+ msg,
+ uuid,
+ ctx->config.tier,
+ metric->refcount,
+ metric->partition,
+ metric->first_time_s,
+ metric->latest_time_s_hot,
+ metric->latest_time_s_clean,
+ metric->latest_update_every_s,
+ (int)metric->writer
+ );
+}
+
+static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+ return (!first || !last || first > last);
+}
+
+static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mrg_index_write_lock(mrg, partition);
+
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+ if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+ if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ if(!*sections_judy_pptr) {
+ rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+ if(unlikely(!rc))
+ fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ }
+
+ MRG_STATS_DELETED_METRIC(mrg, partition);
+
+ mrg_index_write_unlock(mrg, partition);
+
+ aral_freez(mrg->index[partition].aral, metric);
+}
+
+static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected < 0)
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
+ if(unlikely(expected < 0))
+ return false;
+
+ desired = expected + 1;
- refcount = expected + 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+ size_t partition = metric->partition;
- if(refcount == 1)
+ if(desired == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return refcount;
+ return true;
}
-static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
+static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected <= 0)
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
+ if(expected <= 0) {
+ metric_log(mrg, metric, "refcount is zero or negative during release");
+ fatal("METRIC: refcount is %d (zero or negative) during release", expected);
+ }
- refcount = expected - 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
+ desired = REFCOUNT_DELETING;
+ else
+ desired = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
- if(unlikely(!refcount))
+ if(desired == 0 || desired == REFCOUNT_DELETING) {
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ if(desired == REFCOUNT_DELETING)
+ acquired_for_deletion_metric_delete(mrg, metric);
+ }
+
__atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
- return (!first || !last || first > last);
+ return desired == REFCOUNT_DELETING;
}
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, entry->uuid);
METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
+ Pvoid_t *PValue;
- mrg_index_write_lock(mrg, partition);
+ while(1) {
+ mrg_index_write_lock(mrg, partition);
- size_t mem_before_judyl, mem_after_judyl;
+ size_t mem_before_judyl, mem_after_judyl;
- Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
- fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+ Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
+ if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+ fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
- if(unlikely(!*sections_judy_pptr))
- mrg_stats_size_judyhs_added_uuid(mrg, partition);
+ if (unlikely(!*sections_judy_pptr))
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
- if(unlikely(!PValue || PValue == PJERR))
- fatal("DBENGINE METRIC: corrupted section JudyL array");
+ if (unlikely(!PValue || PValue == PJERR))
+ fatal("DBENGINE METRIC: corrupted section JudyL array");
- if(unlikely(*PValue != NULL)) {
- METRIC *metric = *PValue;
+ if (unlikely(*PValue != NULL)) {
+ METRIC *metric = *PValue;
- metric_acquire(mrg, metric);
+ if(!metric_acquire(mrg, metric)) {
+ mrg_index_write_unlock(mrg, partition);
+ continue;
+ }
- MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
+ if (ret)
+ *ret = false;
- if(ret)
- *ret = false;
+ aral_freez(mrg->index[partition].aral, allocation);
- aral_freez(mrg->index[partition].aral, allocation);
+ return metric;
+ }
- return metric;
+ break;
}
METRIC *metric = allocation;
@@ -216,9 +309,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
- metric->refcount = 0;
+ metric->refcount = 1;
metric->partition = partition;
- metric_acquire(mrg, metric);
*PValue = metric;
MRG_STATS_ADDED_METRIC(mrg, partition);
@@ -234,77 +326,35 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
- mrg_index_read_lock(mrg, partition);
+ while(1) {
+ mrg_index_read_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
- if(unlikely(!PValue)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- METRIC *metric = *PValue;
-
- metric_acquire(mrg, metric);
-
- mrg_index_read_unlock(mrg, partition);
-
- MRG_STATS_SEARCH_HIT(mrg, partition);
- return metric;
-}
-
-static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = metric->partition;
-
- size_t mem_before_judyl, mem_after_judyl;
-
- mrg_index_write_lock(mrg, partition);
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+ if (unlikely(!sections_judy_pptr)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- if(!metric_release_and_can_be_deleted(mrg, metric)) {
- mrg->index[partition].stats.delete_having_retention_or_referenced++;
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+ if (unlikely(!PValue)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ METRIC *metric = *PValue;
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ if(metric && !metric_acquire(mrg, metric))
+ metric = NULL;
- if(unlikely(!rc)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ mrg_index_read_unlock(mrg, partition);
- if(!*sections_judy_pptr) {
- rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!rc))
- fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
- mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ if(metric) {
+ MRG_STATS_SEARCH_HIT(mrg, partition);
+ return metric;
+ }
}
-
- MRG_STATS_DELETED_METRIC(mrg, partition);
-
- mrg_index_write_unlock(mrg, partition);
-
- aral_freez(mrg->index[partition].aral, metric);
-
- return true;
}
// ----------------------------------------------------------------------------
@@ -359,7 +409,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section
}
inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
- return acquired_metric_del(mrg, metric);
+ return metric_release(mrg, metric, true);
}
inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
@@ -367,8 +417,8 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
return metric;
}
-inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
- return metric_release_and_can_be_deleted(mrg, metric);
+inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
+ metric_release(mrg, metric, false);
}
inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -394,8 +444,8 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
return true;
}
-inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
- internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) {
+ internal_fatal(first_time_s < 0 || last_time_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric first time is in the future");
@@ -425,13 +475,14 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri
return mrg_metric_get_first_time_s_smart(mrg, metric);
}
-inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
*last_time_s = MAX(clean, hot);
*first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
- *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+ if (update_every_s)
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -498,8 +549,8 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
}
} while(do_again);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
return (first && last && first < last);
}
@@ -517,6 +568,11 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
return false;
}
+inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ return clean;
+}
+
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
@@ -524,25 +580,21 @@ inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metr
return MAX(clean, hot);
}
-inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
return false;
}
-inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
return false;
}
-inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
@@ -589,7 +641,7 @@ inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
inline void mrg_update_metric_retention_and_granularity_by_uuid(
MRG *mrg, Word_t section, uuid_t *uuid,
time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
+ uint32_t update_every_s, time_t now_s)
{
if(unlikely(last_time_s > now_s)) {
nd_log_limit_static_global_var(erl, 1, 0);
@@ -626,14 +678,35 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
.section = section,
.first_time_s = first_time_s,
.last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
+ .latest_update_every_s = update_every_s
};
metric = mrg_metric_add_and_acquire(mrg, entry, &added);
}
- if (likely(!added))
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+ if (likely(!added)) {
+ uint64_t old_samples = 0;
+
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+ uint64_t new_samples = 0;
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+ __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED);
+ }
+ else {
+ // Newly added
+ if (update_every_s) {
+ uint64_t samples = (last_time_s - first_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
+ }
+
mrg_metric_release(mrg, metric);
}