summaryrefslogtreecommitdiffstats
path: root/database/engine/metric.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/metric.c360
1 files changed, 146 insertions, 214 deletions
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 69b8f3116..2e132612e 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -1,30 +1,44 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#include "metric.h"
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
-typedef enum __attribute__ ((__packed__)) {
- METRIC_FLAG_HAS_RETENTION = (1 << 0),
-} METRIC_FLAGS;
-
struct metric {
uuid_t uuid; // never changes
Word_t section; // never changes
- time_t first_time_s; //
- time_t latest_time_s_clean; // archived pages latest time
- time_t latest_time_s_hot; // latest time of the currently collected page
- uint32_t latest_update_every_s; //
+ time_t first_time_s; // the timestamp of the oldest point in the database
+ time_t latest_time_s_clean; // the timestamp of the newest point in the database
+ time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored)
+ uint32_t latest_update_every_s; // the latest data collection frequency
pid_t writer;
uint8_t partition;
- METRIC_FLAGS flags;
REFCOUNT refcount;
- SPINLOCK spinlock; // protects all variable members
// THIS IS allocated with malloc()
// YOU HAVE TO INITIALIZE IT YOURSELF !
};
+#define set_metric_field_with_condition(field, value, condition) ({ \
+ typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \
+ typeof(field) _wanted = value; \
+ bool did_it = true; \
+ \
+ do { \
+ if((condition) && (_current != _wanted)) { \
+ ; \
+ } \
+ else { \
+ did_it = false; \
+ break; \
+ } \
+ } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \
+ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \
+ \
+ did_it; \
+})
+
static struct aral_statistics mrg_aral_statistics;
struct mrg {
@@ -73,9 +87,6 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
-#define metric_lock(metric) spinlock_lock(&(metric)->spinlock)
-#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock)
-
static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
if(mem_after_judyl > mem_before_judyl)
__atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
@@ -97,40 +108,34 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
return *n % mrg->partitions;
}
-static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
- size_t partition = metric->partition;
+static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED);
- bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
+ if(first_time_s <= 0) {
+ first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ if(first_time_s <= 0)
+ first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
- if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags |= METRIC_FLAG_HAS_RETENTION;
- __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
- }
- else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
- __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ if(first_time_s <= 0)
+ first_time_s = 0;
+ else
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
}
- return has_retention;
+ return first_time_s;
}
-static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) {
size_t partition = metric->partition;
+ REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
REFCOUNT refcount;
- if(!having_spinlock)
- metric_lock(metric);
-
- if(unlikely(metric->refcount < 0))
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
-
- refcount = ++metric->refcount;
-
- // update its retention flags
- metric_has_retention_unsafe(mrg, metric);
+ do {
+ if(expected < 0)
+ fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
- if(!having_spinlock)
- metric_unlock(metric);
+ refcount = expected + 1;
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
if(refcount == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
@@ -141,28 +146,25 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
}
static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
- bool ret = true;
size_t partition = metric->partition;
+ REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
REFCOUNT refcount;
- metric_lock(metric);
-
- if(unlikely(metric->refcount <= 0))
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
-
- refcount = --metric->refcount;
-
- if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
- ret = false;
+ do {
+ if(expected <= 0)
+ fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
- metric_unlock(metric);
+ refcount = expected - 1;
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
if(unlikely(!refcount))
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return ret;
+ time_t first, last, ue;
+ mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ return (!first || !last || first > last);
}
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
@@ -192,7 +194,7 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
if(unlikely(*PValue != NULL)) {
METRIC *metric = *PValue;
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
MRG_STATS_DUPLICATE_ADD(mrg, partition);
@@ -215,10 +217,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
metric->refcount = 0;
- metric->flags = 0;
metric->partition = partition;
- spinlock_init(&metric->spinlock);
- metric_acquire(mrg, metric, true); // no spinlock use required here
+ metric_acquire(mrg, metric);
*PValue = metric;
MRG_STATS_ADDED_METRIC(mrg, partition);
@@ -252,7 +252,7 @@ static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t sect
METRIC *metric = *PValue;
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
mrg_index_read_unlock(mrg, partition);
@@ -363,7 +363,7 @@ inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
}
inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
return metric;
}
@@ -389,10 +389,7 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
if(unlikely(first_time_s < 0))
return false;
- metric_lock(metric);
- metric->first_time_s = first_time_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
return true;
}
@@ -405,112 +402,56 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric,
internal_fatal(last_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric last time is in the future");
- if(unlikely(first_time_s < 0))
- first_time_s = 0;
-
- if(unlikely(last_time_s < 0))
- last_time_s = 0;
-
- if(unlikely(update_every_s < 0))
- update_every_s = 0;
-
- if(unlikely(!first_time_s && !last_time_s && !update_every_s))
- return;
+ if(first_time_s > 0)
+ set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current);
- metric_lock(metric);
-
- if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
- metric->first_time_s = first_time_s;
-
- if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) {
- metric->latest_time_s_clean = last_time_s;
-
- if(likely(update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
+ if(last_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) &&
+ update_every_s > 0)
+ // set the latest update every too
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
}
- else if(unlikely(!metric->latest_update_every_s && update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
-
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
+ else if(update_every_s > 0)
+ // set it only if it is invalid
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
}
inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
-
- bool ret = false;
-
- metric_lock(metric);
- if(first_time_s > metric->first_time_s) {
- metric->first_time_s = first_time_s;
- ret = true;
- }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
-
- return ret;
+ return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current);
}
inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t first_time_s;
-
- metric_lock(metric);
-
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
-
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
-
- first_time_s = metric->first_time_s;
-
- metric_unlock(metric);
-
- return first_time_s;
+ return mrg_metric_get_first_time_s_smart(mrg, metric);
}
inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- metric_lock(metric);
-
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
-
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
-
- *first_time_s = metric->first_time_s;
- *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- *update_every_s = metric->latest_update_every_s;
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
- metric_unlock(metric);
+ *last_time_s = MAX(clean, hot);
+ *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(unlikely(latest_time_s < 0))
- return false;
-
- metric_lock(metric);
-
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
// internal_fatal(metric->latest_time_s_clean > latest_time_s,
// "DBENGINE METRIC: metric new clean latest time is older than the previous one");
- metric->latest_time_s_clean = latest_time_s;
+ if(latest_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) {
+ set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current);
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
+ return true;
+ }
+ }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
+ return false;
}
// returns true when metric still has retention
@@ -518,7 +459,6 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
Word_t section = mrg_metric_section(mrg, metric);
bool do_again = false;
size_t countdown = 5;
- bool ret = true;
do {
time_t min_first_time_s = LONG_MAX;
@@ -547,22 +487,20 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
if (min_first_time_s == LONG_MAX)
min_first_time_s = 0;
- metric_lock(metric);
- if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
+ if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED))
do_again = true;
else {
internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
do_again = false;
- metric->first_time_s = min_first_time_s;
- metric->latest_time_s_clean = max_end_time_s;
-
- ret = metric_has_retention_unsafe(mrg, metric);
+ set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true);
+ set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true);
}
- metric_unlock(metric);
} while(do_again);
- return ret;
+ time_t first, last, ue;
+ mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ return (first && last && first < last);
}
inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -571,88 +509,80 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
- if(unlikely(latest_time_s < 0))
- return false;
-
- metric_lock(metric);
- metric->latest_time_s_hot = latest_time_s;
-
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
+ if(likely(latest_time_s > 0)) {
+ __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED);
+ return true;
+ }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
+ return false;
}
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t max;
- metric_lock(metric);
- max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- metric_unlock(metric);
- return max;
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+ return MAX(clean, hot);
}
inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
-
- metric_lock(metric);
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
- return true;
+ return false;
}
inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
-
- metric_lock(metric);
- if(!metric->latest_update_every_s)
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
- return true;
+ return false;
}
inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t update_every_s;
-
- metric_lock(metric);
- update_every_s = metric->latest_update_every_s;
- metric_unlock(metric);
-
- return update_every_s;
+ return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(!metric->writer) {
- metric->writer = gettid();
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = gettid();
+ bool done = true;
+
+ do {
+ if(expected != 0) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
else
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- metric_unlock(metric);
+
return done;
}
inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(metric->writer) {
- metric->writer = 0;
+ // this function can be called from a different thread than the one than the writer
+
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = 0;
+ bool done = true;
+
+ do {
+ if(!expected) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
__atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
- metric_unlock(metric);
+
return done;
}
@@ -662,27 +592,30 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
time_t update_every_s, time_t now_s)
{
if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
last_time_s = now_s;
}
if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
first_time_s = last_time_s;
}
if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
}
bool added = false;
@@ -710,7 +643,6 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
for(size_t i = 0; i < mrg->partitions ;i++) {
s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
- s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
@@ -900,7 +832,7 @@ int mrg_unittest(void) {
pthread_t th[threads];
for(size_t i = 0; i < threads ; i++) {
char buf[15 + 1];
- snprintfz(buf, 15, "TH[%zu]", i);
+ snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i);
netdata_thread_create(&th[i], buf,
NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
mrg_stress, &t);