summary refs log tree commit diff stats
path: root/database/engine/metric.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r-- database/engine/metric.c 578
1 files changed, 308 insertions, 270 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 6b65df9bb..1370f9d7a 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -16,6 +16,7 @@ struct metric {
time_t latest_time_s_hot; // latest time of the currently collected page
uint32_t latest_update_every_s; //
pid_t writer;
+ uint8_t partition;
METRIC_FLAGS flags;
REFCOUNT refcount;
SPINLOCK spinlock; // protects all variable members
@@ -27,103 +28,98 @@ struct metric {
static struct aral_statistics mrg_aral_statistics;
struct mrg {
- ARAL *aral[MRG_PARTITIONS];
+ size_t partitions;
- struct pgc_index {
- netdata_rwlock_t rwlock;
- Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
- } index[MRG_PARTITIONS];
+ struct mrg_partition {
+ ARAL *aral; // not protected by our spinlock - it has its own
- struct mrg_statistics stats;
+ RW_SPINLOCK rw_spinlock;
+ Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers)
- size_t entries_per_partition[MRG_PARTITIONS];
+ struct mrg_statistics stats;
+ } index[];
};
-static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.additions_duplicate++; // plain (non-atomic) increment: metric_add_and_acquire() calls this before mrg_index_write_unlock()
}
static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
- __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
-
- __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+ mrg->index[partition].stats.entries++; // plain (non-atomic) updates: metric_add_and_acquire() calls this before mrg_index_write_unlock()
+ mrg->index[partition].stats.additions++; // lifetime counter, never decremented in this file's visible code
+ mrg->index[partition].stats.size += sizeof(METRIC); // account the METRIC allocation to this partition
}
static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
- __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED);
-
- __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+ mrg->index[partition].stats.entries--; // plain (non-atomic) updates: acquired_metric_del() calls this before mrg_index_write_unlock()
+ mrg->index[partition].stats.size -= sizeof(METRIC); // mirror of the sizeof(METRIC) added by MRG_STATS_ADDED_METRIC()
+ mrg->index[partition].stats.deletions++; // counts successful deletions only
}
-static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_DELETE_MISS(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.delete_misses++; // plain (non-atomic) increment: acquired_metric_del() calls this before mrg_index_write_unlock()
}
-static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
- netdata_rwlock_rdlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
- netdata_rwlock_unlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
- netdata_rwlock_wrlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
- netdata_rwlock_unlock(&mrg->index[partition].rwlock);
-}
+#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
+
+#define metric_lock(metric) spinlock_lock(&(metric)->spinlock)
+#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock)
-static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) {
+static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
if(mem_after_judyl > mem_before_judyl)
- __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
else if(mem_after_judyl < mem_before_judyl)
- __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
}
-static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
}
-static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) {
- __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) {
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
}
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- return u[UUID_SZ - 1] % MRG_PARTITIONS;
+ size_t n; memcpy(&n, &u[UUID_SZ - sizeof(n)], sizeof(n)); // byte-copy the last sizeof(size_t) bytes of the UUID: a (size_t *) cast would be a type-punned, possibly misaligned load
+ return n % mrg->partitions; // map the UUID tail onto [0, mrg->partitions)
}
static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
+ size_t partition = metric->partition;
+
bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
metric->flags |= METRIC_FLAG_HAS_RETENTION;
- __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
}
else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
- __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
}
return has_retention;
}
static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+ size_t partition = metric->partition;
REFCOUNT refcount;
if(!having_spinlock)
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount < 0))
fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
@@ -134,21 +130,22 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
metric_has_retention_unsafe(mrg, metric);
if(!having_spinlock)
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(refcount == 1)
- __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
return refcount;
}
static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
bool ret = true;
+ size_t partition = metric->partition;
REFCOUNT refcount;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount <= 0))
fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
@@ -158,20 +155,20 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
ret = false;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(unlikely(!refcount))
- __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
return ret;
}
-static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, &entry->uuid);
- METRIC *allocation = aral_mallocz(mrg->aral[partition]);
+ METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
mrg_index_write_lock(mrg, partition);
@@ -182,12 +179,12 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
if(unlikely(!*sections_judy_pptr))
- mrg_stats_size_judyhs_added_uuid(mrg);
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
if(unlikely(!PValue || PValue == PJERR))
fatal("DBENGINE METRIC: corrupted section JudyL array");
@@ -196,18 +193,21 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
METRIC *metric = *PValue;
metric_acquire(mrg, metric, false);
+
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+
mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = false;
- aral_freez(mrg->aral[partition], allocation);
+ aral_freez(mrg->index[partition].aral, allocation);
- MRG_STATS_DUPLICATE_ADD(mrg);
return metric;
}
METRIC *metric = allocation;
+ // memcpy(metric->uuid, entry->uuid, sizeof(uuid_t));
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
metric->first_time_s = MAX(0, entry->first_time_s);
@@ -217,21 +217,22 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
metric->writer = 0;
metric->refcount = 0;
metric->flags = 0;
- netdata_spinlock_init(&metric->spinlock);
+ metric->partition = partition;
+ spinlock_init(&metric->spinlock);
metric_acquire(mrg, metric, true); // no spinlock use required here
*PValue = metric;
+ MRG_STATS_ADDED_METRIC(mrg, partition);
+
mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = true;
- MRG_STATS_ADDED_METRIC(mrg, partition);
-
return metric;
}
-static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
mrg_index_read_lock(mrg, partition);
@@ -239,14 +240,14 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr)) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
return NULL;
}
Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
if(unlikely(!PValue)) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
return NULL;
}
@@ -256,38 +257,38 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_HIT(mrg);
+ MRG_STATS_SEARCH_HIT(mrg, partition);
return metric;
}
-static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = uuid_partition(mrg, &metric->uuid);
+static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
+ size_t partition = metric->partition;
size_t mem_before_judyl, mem_after_judyl;
mrg_index_write_lock(mrg, partition);
if(!metric_release_and_can_be_deleted(mrg, metric)) {
+ mrg->index[partition].stats.delete_having_retention_or_referenced++;
mrg_index_write_unlock(mrg, partition);
- __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED);
return false;
}
Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETE_MISS(mrg);
return false;
}
mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETE_MISS(mrg);
return false;
}
@@ -295,14 +296,14 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
if(unlikely(!rc))
fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
- mrg_stats_size_judyhs_removed_uuid(mrg);
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
}
- mrg_index_write_unlock(mrg, partition);
+ MRG_STATS_DELETED_METRIC(mrg, partition);
- aral_freez(mrg->aral[partition], metric);
+ mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETED_METRIC(mrg, partition);
+ aral_freez(mrg->index[partition].aral, metric);
return true;
}
@@ -310,38 +311,34 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
// ----------------------------------------------------------------------------
// public API
-MRG *mrg_create(void) {
- MRG *mrg = callocz(1, sizeof(MRG));
+inline MRG *mrg_create(ssize_t partitions) {
+ if(partitions < 1) partitions = get_netdata_cpus(); // non-positive request: fall back to get_netdata_cpus()
+ if(partitions > UINT8_MAX + 1) partitions = UINT8_MAX + 1; // METRIC.partition is uint8_t: a larger index would be truncated and address the wrong partition
+
+ MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
+ mrg->partitions = partitions;
- for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
- netdata_rwlock_init(&mrg->index[i].rwlock);
+ for(size_t i = 0; i < mrg->partitions ; i++) {
+ rw_spinlock_init(&mrg->index[i].rw_spinlock);
char buf[ARAL_MAX_NAME + 1];
snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
- mrg->aral[i] = aral_create(buf,
- sizeof(METRIC),
- 0,
- 16384,
- &mrg_aral_statistics,
- NULL, NULL, false,
- false);
+ mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
}
- mrg->stats.size = sizeof(MRG);
-
return mrg;
}
-size_t mrg_aral_structures(void) {
+inline size_t mrg_aral_structures(void) {
return aral_structures_from_stats(&mrg_aral_statistics);
}
-size_t mrg_aral_overhead(void) {
+inline size_t mrg_aral_overhead(void) {
return aral_overhead_from_stats(&mrg_aral_statistics);
}
-void mrg_destroy(MRG *mrg __maybe_unused) {
+inline void mrg_destroy(MRG *mrg __maybe_unused) {
// no destruction possible
// we can't traverse the metrics list
@@ -351,57 +348,57 @@ void mrg_destroy(MRG *mrg __maybe_unused) {
;
}
-METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
return metric_add_and_acquire(mrg, &entry, ret);
}
-METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
return metric_get_and_acquire(mrg, uuid, section);
}
-bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
return acquired_metric_del(mrg, metric);
}
-METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
metric_acquire(mrg, metric, false);
return metric;
}
-bool mrg_metric_release(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
return metric_release_and_can_be_deleted(mrg, metric);
}
-Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
return (Word_t)metric;
}
-uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
return &metric->uuid;
}
-Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
return metric->section;
}
-bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(first_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->first_time_s = first_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
@@ -421,7 +418,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
if(unlikely(!first_time_s && !last_time_s && !update_every_s))
return;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
metric->first_time_s = first_time_s;
@@ -436,29 +433,29 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
metric->latest_update_every_s = (uint32_t) update_every_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
-bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
bool ret = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(first_time_s > metric->first_time_s) {
metric->first_time_s = first_time_s;
ret = true;
}
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return ret;
}
-time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t first_time_s;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -470,13 +467,13 @@ time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
first_time_s = metric->first_time_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return first_time_s;
}
-void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- netdata_spinlock_lock(&metric->spinlock);
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -490,16 +487,16 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f
*last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
*update_every_s = metric->latest_update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
-bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(latest_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
@@ -513,12 +510,12 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric,
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
// returns true when metric still has retention
-bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
Word_t section = mrg_metric_section(mrg, metric);
bool do_again = false;
size_t countdown = 5;
@@ -551,7 +548,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
if (min_first_time_s == LONG_MAX)
min_first_time_s = 0;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
do_again = true;
else {
@@ -563,13 +560,13 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
ret = metric_has_retention_unsafe(mrg, metric);
}
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
} while(do_again);
return ret;
}
-bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
@@ -578,204 +575,215 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t
if(unlikely(latest_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_time_s_hot = latest_time_s;
if(unlikely(!metric->first_time_s))
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t max;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return max;
}
-bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_update_every_s = (uint32_t) update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->latest_update_every_s)
metric->latest_update_every_s = (uint32_t) update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t update_every_s;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
update_every_s = metric->latest_update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return update_every_s;
}
-bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->writer) {
metric->writer = gettid();
- __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
else
- __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&metric->spinlock);
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
+ metric_unlock(metric);
return done;
}
-bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(metric->writer) {
metric->writer = 0;
- __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return done;
}
-struct mrg_statistics mrg_get_statistics(MRG *mrg) {
- // FIXME - use atomics
- return mrg->stats;
-}
-
-// ----------------------------------------------------------------------------
-// unit test
+inline void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s)
+{
+ if(unlikely(last_time_s > now_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
-#ifdef MRG_STRESS_TEST
+ if (unlikely(first_time_s > last_time_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
-static void mrg_stress(MRG *mrg, size_t entries, size_t sections) {
- bool ret;
+ first_time_s = last_time_s;
+ }
- info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections);
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
- METRIC *array[entries][sections];
- for(size_t i = 0; i < entries ; i++) {
- MRG_ENTRY e = {
- .first_time_s = (time_t)(i + 1),
- .latest_time_s = (time_t)(i + 2),
- .latest_update_every_s = (time_t)(i + 3),
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = section,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = (uint32_t) update_every_s
};
- uuid_generate_random(e.uuid);
-
- for(size_t section = 0; section < sections ;section++) {
- e.section = section;
- array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret);
- if(!ret)
- fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section);
-
- if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section])
- fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric");
-
- if(ret)
- fatal("DBENGINE METRIC: adding the same metric twice, returns success");
-
- if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section])
- fatal("DBENGINE METRIC: cannot get back the same metric");
-
- if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0)
- fatal("DBENGINE METRIC: uuids do not match");
- }
+ // memcpy(entry.uuid, *uuid, sizeof(uuid_t));
+ uuid_copy(entry.uuid, *uuid);
+ metric = mrg_metric_add_and_acquire(mrg, entry, &added);
}
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- uuid_t uuid;
- uuid_generate_random(uuid);
-
- if(mrg_metric_get_and_acquire(mrg, &uuid, section))
- fatal("DBENGINE METRIC: found non-existing uuid");
-
- if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section])
- fatal("DBENGINE METRIC: metric id does not match");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
-
- if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2)))
- fatal("DBENGINE METRIC: cannot set first time");
- if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3)))
- fatal("DBENGINE METRIC: cannot set latest time");
- if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4)))
- fatal("DBENGINE METRIC: cannot set update every");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4))
- fatal("DBENGINE METRIC: wrong latest time returned");
- }
+ if (likely(!added))
+ mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ mrg_metric_release(mrg, metric);
+}
+
+inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { // fills *s with the sum of every partition's counters
+ memset(s, 0, sizeof(struct mrg_statistics)); // start from zero: every field below is accumulated with +=
+
+ for(size_t i = 0; i < mrg->partitions ;i++) {
+ s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
+ s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
+ s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
+ s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
+ s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
+ s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
+ s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED);
+ s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED);
+ s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED);
+ s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED);
+ s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED);
+ s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED);
+ s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED);
+ s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
 }
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- if(!mrg_metric_release_and_delete(mrg, array[i][section]))
- fatal("DBENGINE METRIC: failed to delete metric");
- }
- }
+ s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions; // add the MRG header and its index[] array, the same amount mrg_create() allocates
}
-static void *mrg_stress_test_thread1(void *ptr) {
- MRG *mrg = ptr;
+// ----------------------------------------------------------------------------
+// unit test
- for(int i = 0; i < 5 ; i++)
- mrg_stress(mrg, 10000, 5);
+struct mrg_stress_entry {
+ uuid_t uuid;
+ time_t after;
+ time_t before;
+};
- return ptr;
-}
+struct mrg_stress {
+ MRG *mrg;
+ bool stop;
+ size_t entries;
+ struct mrg_stress_entry *array;
+ size_t updates;
+};
-static void *mrg_stress_test_thread2(void *ptr) {
- MRG *mrg = ptr;
+static void *mrg_stress(void *ptr) {
+ struct mrg_stress *t = ptr;
+ MRG *mrg = t->mrg;
- for(int i = 0; i < 10 ; i++)
- mrg_stress(mrg, 500, 50);
+ ssize_t start = 0;
+ ssize_t end = (ssize_t)t->entries;
+ ssize_t step = 1;
- return ptr;
-}
+ if(gettid() % 2) {
+ start = (ssize_t)t->entries - 1;
+ end = -1;
+ step = -1;
+ }
+
+ while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
+ for (ssize_t i = start; i != end; i += step) {
+ struct mrg_stress_entry *e = &t->array[i];
-static void *mrg_stress_test_thread3(void *ptr) {
- MRG *mrg = ptr;
+ time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
+ time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
- for(int i = 0; i < 50 ; i++)
- mrg_stress(mrg, 5000, 1);
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, 0x01,
+ &e->uuid,
+ after,
+ before,
+ 1,
+ before);
+
+ __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
+ }
+ }
return ptr;
}
-#endif
int mrg_unittest(void) {
- MRG *mrg = mrg_create();
+ MRG *mrg = mrg_create(0);
METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
bool ret;
@@ -850,54 +858,90 @@ int mrg_unittest(void) {
 if(!mrg_metric_release_and_delete(mrg, m1_t1))
 fatal("DBENGINE METRIC: cannot delete the second metric");
- if(mrg->stats.entries != 0)
+ struct mrg_statistics s;
+ mrg_get_statistics(mrg, &s); // statistics are kept per partition; this sums them up
+ if(s.entries != 0)
 fatal("DBENGINE METRIC: invalid entries counter");
-#ifdef MRG_STRESS_TEST
- usec_t started_ut = now_monotonic_usec();
- pthread_t thread1;
- netdata_thread_create(&thread1, "TH1",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread1, mrg);
+ // stress test: several threads concurrently update the retention of the same set of metrics
+ size_t entries = 1000000;
+ size_t threads = mrg->partitions / 3 + 1;
+ size_t tiers = 3;
+ size_t run_for_secs = 5;
+ netdata_log_info("preparing stress test of %zu entries...", entries);
+ struct mrg_stress t = {
+ .mrg = mrg,
+ .entries = entries,
+ .array = callocz(entries, sizeof(struct mrg_stress_entry)), // released with freez() at the end of the test
+ };
- pthread_t thread2;
- netdata_thread_create(&thread2, "TH2",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread2, mrg);
+ time_t now = max_acceptable_collected_time();
+ for(size_t i = 0; i < entries ;i++) {
+ uuid_generate_random(t.array[i].uuid);
+ t.array[i].after = now / 3;
+ t.array[i].before = now / 2;
+ }
+ netdata_log_info("stress test is populating MRG with 3 tiers...");
+ for(size_t i = 0; i < entries ;i++) {
+ struct mrg_stress_entry *e = &t.array[i];
+ for(size_t tier = 1; tier <= tiers ;tier++) {
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, tier,
+ &e->uuid,
+ e->after,
+ e->before,
+ 1,
+ e->before);
+ }
+ }
+ netdata_log_info("stress test ready to run...");
- pthread_t thread3;
- netdata_thread_create(&thread3, "TH3",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread3, mrg);
+ usec_t started_ut = now_monotonic_usec();
+ pthread_t th[threads];
+ for(size_t i = 0; i < threads ; i++) {
+ char buf[15 + 1];
+ snprintfz(buf, 15, "TH[%zu]", i);
+ netdata_thread_create(&th[i], buf,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ mrg_stress, &t); // every worker shares the same state
+ }
- sleep_usec(5 * USEC_PER_SEC);
+ sleep_usec(run_for_secs * USEC_PER_SEC);
+ __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED); // ask the workers to leave their loop
- netdata_thread_cancel(thread1);
- netdata_thread_cancel(thread2);
- netdata_thread_cancel(thread3);
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_cancel(th[i]);
+
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_join(th[i], NULL);
- netdata_thread_join(thread1, NULL);
- netdata_thread_join(thread2, NULL);
- netdata_thread_join(thread3, NULL);
 usec_t ended_ut = now_monotonic_usec();
- info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
+ struct mrg_statistics stats;
+ mrg_get_statistics(mrg, &stats);
+
+ netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
 "%zu deletions, %zu wrong deletions, "
 "%zu successful searches, %zu wrong searches, "
- "%zu successful pointer validations, %zu wrong pointer validations "
 "in %llu usecs",
- mrg->stats.additions, mrg->stats.additions_duplicate,
- mrg->stats.deletions, mrg->stats.delete_misses,
- mrg->stats.search_hits, mrg->stats.search_misses,
- mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses,
+ stats.additions, stats.additions_duplicate,
+ stats.deletions, stats.delete_misses,
+ stats.search_hits, stats.search_misses,
 ended_ut - started_ut);
-#endif
+ // keep the elapsed time fractional: an integer division by USEC_PER_SEC would truncate it to whole seconds
+ double elapsed_secs = (double)(ended_ut - started_ut) / (double)USEC_PER_SEC;
+ double kupdates_per_sec = (double)t.updates / elapsed_secs / 1000.0;
+ netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
+ kupdates_per_sec,
+ kupdates_per_sec / (double)threads);
+ // all workers have been joined, so nothing references the stress entries any more
+ freez(t.array);
 mrg_destroy(mrg);
- info("DBENGINE METRIC: all tests passed!");
+ netdata_log_info("DBENGINE METRIC: all tests passed!");
 return 0;
 }