summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine')
-rw-r--r--database/engine/cache.c16
-rw-r--r--database/engine/cache.h1
-rw-r--r--database/engine/datafile.c20
-rw-r--r--database/engine/journalfile.c15
-rw-r--r--database/engine/metric.c360
-rw-r--r--database/engine/metric.h4
-rw-r--r--database/engine/page.c679
-rw-r--r--database/engine/page.h58
-rw-r--r--database/engine/page_test.cc405
-rw-r--r--database/engine/page_test.h14
-rw-r--r--database/engine/pagecache.c62
-rw-r--r--database/engine/pagecache.h2
-rw-r--r--database/engine/pdc.c119
-rw-r--r--database/engine/rrddiskprotocol.h14
-rw-r--r--database/engine/rrdengine.c117
-rw-r--r--database/engine/rrdengine.h17
-rwxr-xr-xdatabase/engine/rrdengineapi.c231
-rw-r--r--database/engine/rrdengineapi.h1
-rw-r--r--database/engine/rrdenginelib.h16
19 files changed, 1599 insertions, 552 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c
index 7a9ccf8d1..eb1c35298 100644
--- a/database/engine/cache.c
+++ b/database/engine/cache.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#include "cache.h"
/* STATES AND TRANSITIONS
@@ -1170,9 +1171,10 @@ static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evic
if(all_of_them && !filter) {
pgc_ll_lock(cache, &cache->clean);
if(cache->clean.stats->entries) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE CACHE: cannot free all clean pages, %zu are still in the clean queue",
- cache->clean.stats->entries);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE,
+ "DBENGINE CACHE: cannot free all clean pages, %zu are still in the clean queue",
+ cache->clean.stats->entries);
}
pgc_ll_unlock(cache, &cache->clean);
}
@@ -1801,7 +1803,7 @@ PGC *pgc_create(const char *name,
cache->aral = callocz(cache->config.partitions, sizeof(ARAL *));
for(size_t part = 0; part < cache->config.partitions ; part++) {
char buf[100 +1];
- snprintfz(buf, 100, "%s[%zu]", name, part);
+ snprintfz(buf, sizeof(buf) - 1, "%s[%zu]", name, part);
cache->aral[part] = aral_create(
buf,
sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page,
@@ -1860,7 +1862,7 @@ void pgc_destroy(PGC *cache) {
freez(cache->aral);
#endif
-
+ freez(cache->index);
freez(cache);
}
}
@@ -2517,7 +2519,7 @@ void unittest_stress_test(void) {
for(size_t i = 0; i < pgc_uts.collect_threads ;i++) {
collect_thread_ids[i] = i;
char buffer[100 + 1];
- snprintfz(buffer, 100, "COLLECT_%zu", i);
+ snprintfz(buffer, sizeof(buffer) - 1, "COLLECT_%zu", i);
netdata_thread_create(&collect_threads[i], buffer,
NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
unittest_stress_test_collector, &collect_thread_ids[i]);
@@ -2529,7 +2531,7 @@ void unittest_stress_test(void) {
for(size_t i = 0; i < pgc_uts.query_threads ;i++) {
query_thread_ids[i] = i;
char buffer[100 + 1];
- snprintfz(buffer, 100, "QUERY_%zu", i);
+ snprintfz(buffer, sizeof(buffer) - 1, "QUERY_%zu", i);
initstate_r(1, pgc_uts.rand_statebufs, 1024, &pgc_uts.random_data[i]);
netdata_thread_create(&queries_threads[i], buffer,
NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
diff --git a/database/engine/cache.h b/database/engine/cache.h
index c10e09928..7cd7c0636 100644
--- a/database/engine/cache.h
+++ b/database/engine/cache.h
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef DBENGINE_CACHE_H
#define DBENGINE_CACHE_H
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index fcda84bd6..7322039cd 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -160,7 +160,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
- (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
+ (void) snprintfz(str, maxlen - 1, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION,
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
@@ -338,7 +338,8 @@ static int load_data_file(struct rrdengine_datafile *datafile)
ctx_fs_error(ctx);
return fd;
}
- netdata_log_info("DBENGINE: initializing data file \"%s\".", path);
+
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
if (ret)
@@ -354,7 +355,8 @@ static int load_data_file(struct rrdengine_datafile *datafile)
datafile->file = file;
datafile->pos = file_size;
- netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: data file \"%s\" initialized (size:%" PRIu64 ").", path, file_size);
+
return 0;
error:
@@ -422,6 +424,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno;
+ netdata_log_info("DBENGINE: loading %d data/journal of tier %d...", matched_files, ctx->config.tier);
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
uint8_t must_delete_pair = 0;
@@ -479,14 +482,18 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock)
int ret;
char path[RRDENG_PATH_MAX];
- netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "DBENGINE: creating new data and journal files in path %s",
+ ctx->config.dbfiles_path);
+
datafile = datafile_alloc_and_init(ctx, 1, fileno);
ret = create_data_file(datafile);
if(ret)
goto error_after_datafile;
generate_datafilepath(datafile, path, sizeof(path));
- netdata_log_info("DBENGINE: created data file \"%s\".", path);
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "DBENGINE: created data file \"%s\".", path);
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_create(journalfile, datafile);
@@ -494,7 +501,8 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock)
goto error_after_journalfile;
journalfile_v1_generate_path(datafile, path, sizeof(path));
- netdata_log_info("DBENGINE: created journal file \"%s\".", path);
+ nd_log(NDLS_DAEMON, NDLP_INFO,
+ "DBENGINE: created journal file \"%s\".", path);
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
datafile_list_insert(ctx, datafile, having_lock);
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index abb9d2eb9..9005b81ca 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -67,7 +67,7 @@ void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str
void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
- (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
+ (void) snprintfz(str, maxlen - 1, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
@@ -169,7 +169,7 @@ static void njfv2idx_add(struct rrdengine_datafile *datafile) {
*PValue = datafile;
break;
}
- } while(0);
+ } while(1);
rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
}
@@ -1013,7 +1013,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
journalfile_v2_data_release(journalfile);
usec_t ended_ut = now_monotonic_usec();
- netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
, ctx->config.tier, journalfile->datafile->fileno
, (double)data_size / 1024 / 1024
, (double)entries / 1000
@@ -1073,7 +1073,8 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
return 1;
}
- netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2);
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: checking integrity of '%s'", path_v2);
+
usec_t validation_start_ut = now_monotonic_usec();
int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
if (unlikely(rc)) {
@@ -1104,7 +1105,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
usec_t finished_ut = now_monotonic_usec();
- netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
"mmap: %0.2f ms, validate: %0.2f ms"
, path_v2
, (double)journal_v2_file_size / 1024 / 1024
@@ -1535,13 +1536,13 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
}
ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
- netdata_log_info("DBENGINE: loading journal file '%s'", path);
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: loading journal file '%s'", path);
max_id = journalfile_iterate_transactions(ctx, journalfile);
__atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
- netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
+ nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal file '%s' loaded (size:%" PRIu64 ").", path, file_size);
bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 69b8f3116..2e132612e 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -1,30 +1,44 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#include "metric.h"
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
-typedef enum __attribute__ ((__packed__)) {
- METRIC_FLAG_HAS_RETENTION = (1 << 0),
-} METRIC_FLAGS;
-
struct metric {
uuid_t uuid; // never changes
Word_t section; // never changes
- time_t first_time_s; //
- time_t latest_time_s_clean; // archived pages latest time
- time_t latest_time_s_hot; // latest time of the currently collected page
- uint32_t latest_update_every_s; //
+ time_t first_time_s; // the timestamp of the oldest point in the database
+ time_t latest_time_s_clean; // the timestamp of the newest point in the database
+ time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored)
+ uint32_t latest_update_every_s; // the latest data collection frequency
pid_t writer;
uint8_t partition;
- METRIC_FLAGS flags;
REFCOUNT refcount;
- SPINLOCK spinlock; // protects all variable members
// THIS IS allocated with malloc()
// YOU HAVE TO INITIALIZE IT YOURSELF !
};
+#define set_metric_field_with_condition(field, value, condition) ({ \
+ typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \
+ typeof(field) _wanted = value; \
+ bool did_it = true; \
+ \
+ do { \
+ if((condition) && (_current != _wanted)) { \
+ ; \
+ } \
+ else { \
+ did_it = false; \
+ break; \
+ } \
+ } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \
+ false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \
+ \
+ did_it; \
+})
+
static struct aral_statistics mrg_aral_statistics;
struct mrg {
@@ -73,9 +87,6 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
-#define metric_lock(metric) spinlock_lock(&(metric)->spinlock)
-#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock)
-
static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
if(mem_after_judyl > mem_before_judyl)
__atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
@@ -97,40 +108,34 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
return *n % mrg->partitions;
}
-static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
- size_t partition = metric->partition;
+static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED);
- bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
+ if(first_time_s <= 0) {
+ first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ if(first_time_s <= 0)
+ first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
- if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags |= METRIC_FLAG_HAS_RETENTION;
- __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
- }
- else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
- metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
- __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ if(first_time_s <= 0)
+ first_time_s = 0;
+ else
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
}
- return has_retention;
+ return first_time_s;
}
-static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) {
size_t partition = metric->partition;
+ REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
REFCOUNT refcount;
- if(!having_spinlock)
- metric_lock(metric);
-
- if(unlikely(metric->refcount < 0))
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
-
- refcount = ++metric->refcount;
-
- // update its retention flags
- metric_has_retention_unsafe(mrg, metric);
+ do {
+ if(expected < 0)
+ fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
- if(!having_spinlock)
- metric_unlock(metric);
+ refcount = expected + 1;
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
if(refcount == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
@@ -141,28 +146,25 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
}
static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
- bool ret = true;
size_t partition = metric->partition;
+ REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
REFCOUNT refcount;
- metric_lock(metric);
-
- if(unlikely(metric->refcount <= 0))
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
-
- refcount = --metric->refcount;
-
- if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
- ret = false;
+ do {
+ if(expected <= 0)
+ fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
- metric_unlock(metric);
+ refcount = expected - 1;
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
if(unlikely(!refcount))
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return ret;
+ time_t first, last, ue;
+ mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ return (!first || !last || first > last);
}
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
@@ -192,7 +194,7 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
if(unlikely(*PValue != NULL)) {
METRIC *metric = *PValue;
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
MRG_STATS_DUPLICATE_ADD(mrg, partition);
@@ -215,10 +217,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
metric->refcount = 0;
- metric->flags = 0;
metric->partition = partition;
- spinlock_init(&metric->spinlock);
- metric_acquire(mrg, metric, true); // no spinlock use required here
+ metric_acquire(mrg, metric);
*PValue = metric;
MRG_STATS_ADDED_METRIC(mrg, partition);
@@ -252,7 +252,7 @@ static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t sect
METRIC *metric = *PValue;
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
mrg_index_read_unlock(mrg, partition);
@@ -363,7 +363,7 @@ inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
}
inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
- metric_acquire(mrg, metric, false);
+ metric_acquire(mrg, metric);
return metric;
}
@@ -389,10 +389,7 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
if(unlikely(first_time_s < 0))
return false;
- metric_lock(metric);
- metric->first_time_s = first_time_s;
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
+ __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED);
return true;
}
@@ -405,112 +402,56 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric,
internal_fatal(last_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric last time is in the future");
- if(unlikely(first_time_s < 0))
- first_time_s = 0;
-
- if(unlikely(last_time_s < 0))
- last_time_s = 0;
-
- if(unlikely(update_every_s < 0))
- update_every_s = 0;
-
- if(unlikely(!first_time_s && !last_time_s && !update_every_s))
- return;
+ if(first_time_s > 0)
+ set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current);
- metric_lock(metric);
-
- if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
- metric->first_time_s = first_time_s;
-
- if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) {
- metric->latest_time_s_clean = last_time_s;
-
- if(likely(update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
+ if(last_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) &&
+ update_every_s > 0)
+ // set the latest update every too
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
}
- else if(unlikely(!metric->latest_update_every_s && update_every_s))
- metric->latest_update_every_s = (uint32_t) update_every_s;
-
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
+ else if(update_every_s > 0)
+ // set it only if it is invalid
+ set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
}
inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
-
- bool ret = false;
-
- metric_lock(metric);
- if(first_time_s > metric->first_time_s) {
- metric->first_time_s = first_time_s;
- ret = true;
- }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
-
- return ret;
+ return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current);
}
inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t first_time_s;
-
- metric_lock(metric);
-
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
-
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
-
- first_time_s = metric->first_time_s;
-
- metric_unlock(metric);
-
- return first_time_s;
+ return mrg_metric_get_first_time_s_smart(mrg, metric);
}
inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- metric_lock(metric);
-
- if(unlikely(!metric->first_time_s)) {
- if(metric->latest_time_s_clean)
- metric->first_time_s = metric->latest_time_s_clean;
-
- else if(metric->latest_time_s_hot)
- metric->first_time_s = metric->latest_time_s_hot;
- }
-
- *first_time_s = metric->first_time_s;
- *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- *update_every_s = metric->latest_update_every_s;
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
- metric_unlock(metric);
+ *last_time_s = MAX(clean, hot);
+ *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(unlikely(latest_time_s < 0))
- return false;
-
- metric_lock(metric);
-
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
// internal_fatal(metric->latest_time_s_clean > latest_time_s,
// "DBENGINE METRIC: metric new clean latest time is older than the previous one");
- metric->latest_time_s_clean = latest_time_s;
+ if(latest_time_s > 0) {
+ if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) {
+ set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current);
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
+ return true;
+ }
+ }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
+ return false;
}
// returns true when metric still has retention
@@ -518,7 +459,6 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
Word_t section = mrg_metric_section(mrg, metric);
bool do_again = false;
size_t countdown = 5;
- bool ret = true;
do {
time_t min_first_time_s = LONG_MAX;
@@ -547,22 +487,20 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
if (min_first_time_s == LONG_MAX)
min_first_time_s = 0;
- metric_lock(metric);
- if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
+ if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED))
do_again = true;
else {
internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
do_again = false;
- metric->first_time_s = min_first_time_s;
- metric->latest_time_s_clean = max_end_time_s;
-
- ret = metric_has_retention_unsafe(mrg, metric);
+ set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true);
+ set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true);
}
- metric_unlock(metric);
} while(do_again);
- return ret;
+ time_t first, last, ue;
+ mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ return (first && last && first < last);
}
inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -571,88 +509,80 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
- if(unlikely(latest_time_s < 0))
- return false;
-
- metric_lock(metric);
- metric->latest_time_s_hot = latest_time_s;
-
- if(unlikely(!metric->first_time_s))
- metric->first_time_s = latest_time_s;
+ if(likely(latest_time_s > 0)) {
+ __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED);
+ return true;
+ }
- metric_has_retention_unsafe(mrg, metric);
- metric_unlock(metric);
- return true;
+ return false;
}
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t max;
- metric_lock(metric);
- max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- metric_unlock(metric);
- return max;
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
+
+ return MAX(clean, hot);
}
inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
-
- metric_lock(metric);
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
- return true;
+ return false;
}
inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
- if(update_every_s <= 0)
- return false;
-
- metric_lock(metric);
- if(!metric->latest_update_every_s)
- metric->latest_update_every_s = (uint32_t) update_every_s;
- metric_unlock(metric);
+ if(update_every_s > 0)
+ return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
- return true;
+ return false;
}
inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
- time_t update_every_s;
-
- metric_lock(metric);
- update_every_s = metric->latest_update_every_s;
- metric_unlock(metric);
-
- return update_every_s;
+ return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(!metric->writer) {
- metric->writer = gettid();
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = gettid();
+ bool done = true;
+
+ do {
+ if(expected != 0) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
else
__atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- metric_unlock(metric);
+
return done;
}
inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
- bool done = false;
- metric_lock(metric);
- if(metric->writer) {
- metric->writer = 0;
+ // this function can be called from a different thread than the one than the writer
+
+ pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED);
+ pid_t wanted = 0;
+ bool done = true;
+
+ do {
+ if(!expected) {
+ done = false;
+ break;
+ }
+ } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(done)
__atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
- done = true;
- }
- metric_unlock(metric);
+
return done;
}
@@ -662,27 +592,30 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
time_t update_every_s, time_t now_s)
{
if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
last_time_s = now_s;
}
if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
first_time_s = last_time_s;
}
if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
}
bool added = false;
@@ -710,7 +643,6 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
for(size_t i = 0; i < mrg->partitions ;i++) {
s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
- s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
@@ -900,7 +832,7 @@ int mrg_unittest(void) {
pthread_t th[threads];
for(size_t i = 0; i < threads ; i++) {
char buf[15 + 1];
- snprintfz(buf, 15, "TH[%zu]", i);
+ snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i);
netdata_thread_create(&th[i], buf,
NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
mrg_stress, &t);
diff --git a/database/engine/metric.h b/database/engine/metric.h
index 5d5ebd7b1..dbb949301 100644
--- a/database/engine/metric.h
+++ b/database/engine/metric.h
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef DBENGINE_METRIC_H
#define DBENGINE_METRIC_H
@@ -35,9 +36,6 @@ struct mrg_statistics {
size_t entries_referenced;
- MRG_CACHE_LINE_PADDING(1);
- size_t entries_with_retention;
-
MRG_CACHE_LINE_PADDING(2);
size_t current_references;
diff --git a/database/engine/page.c b/database/engine/page.c
new file mode 100644
index 000000000..b7a393483
--- /dev/null
+++ b/database/engine/page.c
@@ -0,0 +1,679 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "page.h"
+
+#include "libnetdata/libnetdata.h"
+
+typedef enum __attribute__((packed)) {
+ PAGE_OPTION_ALL_VALUES_EMPTY = (1 << 0),
+} PAGE_OPTIONS;
+
+typedef enum __attribute__((packed)) {
+ PGD_STATE_CREATED_FROM_COLLECTOR = (1 << 0),
+ PGD_STATE_CREATED_FROM_DISK = (1 << 1),
+ PGD_STATE_SCHEDULED_FOR_FLUSHING = (1 << 2),
+ PGD_STATE_FLUSHED_TO_DISK = (1 << 3),
+} PGD_STATES;
+
+typedef struct {
+ uint8_t *data;
+ uint32_t size;
+} page_raw_t;
+
+
+typedef struct {
+ size_t num_buffers;
+ gorilla_writer_t *writer;
+ int aral_index;
+} page_gorilla_t;
+
+struct pgd {
+ // the page type
+ uint8_t type;
+
+ // options related to the page
+ PAGE_OPTIONS options;
+
+ PGD_STATES states;
+
+ // the uses number of slots in the page
+ uint32_t used;
+
+ // the total number of slots available in the page
+ uint32_t slots;
+
+ union {
+ page_raw_t raw;
+ page_gorilla_t gorilla;
+ };
+};
+
+// ----------------------------------------------------------------------------
+// memory management
+
+struct {
+ ARAL *aral_pgd;
+ ARAL *aral_data[RRD_STORAGE_TIERS];
+ ARAL *aral_gorilla_buffer[4];
+ ARAL *aral_gorilla_writer[4];
+} pgd_alloc_globals = {};
+
+static ARAL *pgd_aral_data_lookup(size_t size)
+{
+ for (size_t tier = 0; tier < storage_tiers; tier++)
+ if (size == tier_page_size[tier])
+ return pgd_alloc_globals.aral_data[tier];
+
+ return NULL;
+}
+
+void pgd_init_arals(void)
+{
+ // pgd aral
+ {
+ char buf[20 + 1];
+ snprintfz(buf, sizeof(buf) - 1, "pgd");
+
+ // FIXME: add stats
+ pgd_alloc_globals.aral_pgd = aral_create(
+ buf,
+ sizeof(struct pgd),
+ 64,
+ 512 * (sizeof(struct pgd)),
+ pgc_aral_statistics(),
+ NULL, NULL, false, false);
+ }
+
+ // tier page aral
+ {
+ for (size_t i = storage_tiers; i > 0 ;i--)
+ {
+ size_t tier = storage_tiers - i;
+
+ char buf[20 + 1];
+ snprintfz(buf, sizeof(buf) - 1, "tier%zu-pages", tier);
+
+ pgd_alloc_globals.aral_data[tier] = aral_create(
+ buf,
+ tier_page_size[tier],
+ 64,
+ 512 * (tier_page_size[tier]),
+ pgc_aral_statistics(),
+ NULL, NULL, false, false);
+ }
+ }
+
+ // gorilla buffers aral
+ for (size_t i = 0; i != 4; i++) {
+ char buf[20 + 1];
+ snprintfz(buf, sizeof(buf) - 1, "gbuffer-%zu", i);
+
+ // FIXME: add stats
+ pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create(
+ buf,
+ GORILLA_BUFFER_SIZE,
+ 64,
+ 512 * GORILLA_BUFFER_SIZE,
+ pgc_aral_statistics(),
+ NULL, NULL, false, false);
+ }
+
+ // gorilla writers aral
+ for (size_t i = 0; i != 4; i++) {
+ char buf[20 + 1];
+ snprintfz(buf, sizeof(buf) - 1, "gwriter-%zu", i);
+
+ // FIXME: add stats
+ pgd_alloc_globals.aral_gorilla_writer[i] = aral_create(
+ buf,
+ sizeof(gorilla_writer_t),
+ 64,
+ 512 * sizeof(gorilla_writer_t),
+ pgc_aral_statistics(),
+ NULL, NULL, false, false);
+ }
+}
+
+static void *pgd_data_aral_alloc(size_t size)
+{
+ ARAL *ar = pgd_aral_data_lookup(size);
+ if (!ar)
+ return mallocz(size);
+ else
+ return aral_mallocz(ar);
+}
+
+static void pgd_data_aral_free(void *page, size_t size)
+{
+ ARAL *ar = pgd_aral_data_lookup(size);
+ if (!ar)
+ freez(page);
+ else
+ aral_freez(ar, page);
+}
+
+// ----------------------------------------------------------------------------
+// management api
+
+PGD *pgd_create(uint8_t type, uint32_t slots)
+{
+ PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd);
+ pg->type = type;
+ pg->used = 0;
+ pg->slots = slots;
+ pg->options = PAGE_OPTION_ALL_VALUES_EMPTY;
+ pg->states = PGD_STATE_CREATED_FROM_COLLECTOR;
+
+ switch (type) {
+ case PAGE_METRICS:
+ case PAGE_TIER: {
+ uint32_t size = slots * page_type_size[type];
+
+ internal_fatal(!size || slots == 1,
+ "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);
+
+ pg->raw.size = size;
+ pg->raw.data = pgd_data_aral_alloc(size);
+ break;
+ }
+ case PAGE_GORILLA_METRICS: {
+ internal_fatal(slots == 1,
+ "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);
+
+ pg->slots = 8 * GORILLA_BUFFER_SLOTS;
+
+ // allocate new gorilla writer
+ pg->gorilla.aral_index = gettid() % 4;
+ pg->gorilla.writer = aral_mallocz(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index]);
+
+ // allocate new gorilla buffer
+ gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
+ memset(gbuf, 0, GORILLA_BUFFER_SIZE);
+ global_statistics_gorilla_buffer_add_hot();
+
+ *pg->gorilla.writer = gorilla_writer_init(gbuf, GORILLA_BUFFER_SLOTS);
+ pg->gorilla.num_buffers = 1;
+
+ break;
+ }
+ default:
+ fatal("Unknown page type: %uc", type);
+ }
+
+ return pg;
+}
+
+PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
+{
+ if (!size)
+ return PGD_EMPTY;
+
+ if (size < page_type_size[type])
+ return PGD_EMPTY;
+
+ PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd);
+
+ pg->type = type;
+ pg->states = PGD_STATE_CREATED_FROM_DISK;
+ pg->options = ~PAGE_OPTION_ALL_VALUES_EMPTY;
+
+ switch (type)
+ {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ pg->raw.size = size;
+ pg->used = size / page_type_size[type];
+ pg->slots = pg->used;
+
+ pg->raw.data = pgd_data_aral_alloc(size);
+ memcpy(pg->raw.data, base, size);
+ break;
+ case PAGE_GORILLA_METRICS:
+ internal_fatal(size == 0, "Asked to create page with 0 data!!!");
+ internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size");
+ internal_fatal(size % GORILLA_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes", GORILLA_BUFFER_SIZE);
+
+ pg->raw.data = mallocz(size);
+ pg->raw.size = size;
+
+ // TODO: rm this
+ memset(pg->raw.data, 0, size);
+ memcpy(pg->raw.data, base, size);
+
+ uint32_t total_entries = gorilla_buffer_patch((void *) pg->raw.data);
+
+ pg->used = total_entries;
+ pg->slots = pg->used;
+ break;
+ default:
+ fatal("Unknown page type: %uc", type);
+ }
+
+ return pg;
+}
+
+void pgd_free(PGD *pg)
+{
+ if (!pg)
+ return;
+
+ if (pg == PGD_EMPTY)
+ return;
+
+ switch (pg->type)
+ {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ pgd_data_aral_free(pg->raw.data, pg->raw.size);
+ break;
+ case PAGE_GORILLA_METRICS: {
+ if (pg->states & PGD_STATE_CREATED_FROM_DISK)
+ {
+ internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data");
+ freez(pg->raw.data);
+ pg->raw.data = NULL;
+ }
+ else if ((pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) ||
+ (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) ||
+ (pg->states & PGD_STATE_FLUSHED_TO_DISK))
+ {
+ internal_fatal(pg->gorilla.writer == NULL,
+ "PGD does not have an active gorilla writer");
+
+ internal_fatal(pg->gorilla.num_buffers == 0,
+ "PGD does not have any gorilla buffers allocated");
+
+ while (true) {
+ gorilla_buffer_t *gbuf = gorilla_writer_drop_head_buffer(pg->gorilla.writer);
+ if (!gbuf)
+ break;
+ aral_freez(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index], gbuf);
+ pg->gorilla.num_buffers -= 1;
+ }
+
+ internal_fatal(pg->gorilla.num_buffers != 0,
+ "Could not free all gorilla writer buffers");
+
+ aral_freez(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index], pg->gorilla.writer);
+ pg->gorilla.writer = NULL;
+ } else {
+ fatal("pgd_free() called on gorilla page with unsupported state");
+ // TODO: should we support any other states?
+ // if (!(pg->states & PGD_STATE_FLUSHED_TO_DISK))
+ // fatal("pgd_free() is not supported yet for pages flushed to disk");
+ }
+
+ break;
+ }
+ default:
+ fatal("Unknown page type: %uc", pg->type);
+ }
+
+ aral_freez(pgd_alloc_globals.aral_pgd, pg);
+}
+
+// ----------------------------------------------------------------------------
+// utility functions
+
+uint32_t pgd_type(PGD *pg)
+{
+ return pg->type;
+}
+
+bool pgd_is_empty(PGD *pg)
+{
+ if (!pg)
+ return true;
+
+ if (pg == PGD_EMPTY)
+ return true;
+
+ if (pg->used == 0)
+ return true;
+
+ if (pg->options & PAGE_OPTION_ALL_VALUES_EMPTY)
+ return true;
+
+ return false;
+}
+
+uint32_t pgd_slots_used(PGD *pg)
+{
+ if (!pg)
+ return 0;
+
+ if (pg == PGD_EMPTY)
+ return 0;
+
+ return pg->used;
+}
+
+uint32_t pgd_memory_footprint(PGD *pg)
+{
+ if (!pg)
+ return 0;
+
+ if (pg == PGD_EMPTY)
+ return 0;
+
+ size_t footprint = 0;
+ switch (pg->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ footprint = sizeof(PGD) + pg->raw.size;
+ break;
+ case PAGE_GORILLA_METRICS: {
+ if (pg->states & PGD_STATE_CREATED_FROM_DISK)
+ footprint = sizeof(PGD) + pg->raw.size;
+ else
+ footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE);
+
+ break;
+ }
+ default:
+ fatal("Unknown page type: %uc", pg->type);
+ }
+
+ return footprint;
+}
+
+uint32_t pgd_disk_footprint(PGD *pg)
+{
+ if (!pgd_slots_used(pg))
+ return 0;
+
+ size_t size = 0;
+
+ switch (pg->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER: {
+ uint32_t used_size = pg->used * page_type_size[pg->type];
+ internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size");
+ size = used_size;
+
+ break;
+ }
+ case PAGE_GORILLA_METRICS: {
+ if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR ||
+ pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING ||
+ pg->states & PGD_STATE_FLUSHED_TO_DISK)
+ {
+ internal_fatal(!pg->gorilla.writer,
+ "pgd_disk_footprint() not implemented for NULL gorilla writers");
+
+ internal_fatal(pg->gorilla.num_buffers == 0,
+ "Gorilla writer does not have any buffers");
+
+ size = pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE;
+
+ if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) {
+ global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer));
+ global_statistics_tier0_disk_uncompressed_bytes(gorilla_writer_entries(pg->gorilla.writer) * sizeof(storage_number));
+ }
+ } else if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
+ size = pg->raw.size;
+ } else {
+ fatal("Asked disk footprint on unknown page state");
+ }
+
+ break;
+ }
+ default:
+ fatal("Unknown page type: %uc", pg->type);
+ }
+
+ internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK,
+ "Disk footprint asked for page created from disk.");
+ pg->states = PGD_STATE_SCHEDULED_FOR_FLUSHING;
+ return size;
+}
+
+void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
+{
+ internal_fatal(pgd_disk_footprint(pg) != dst_size, "Wrong disk footprint size requested (need %u, available %u)",
+ pgd_disk_footprint(pg), dst_size);
+
+ switch (pg->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ memcpy(dst, pg->raw.data, dst_size);
+ break;
+ case PAGE_GORILLA_METRICS: {
+ if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0)
+ fatal("Copying to extent is supported only for PGDs that are scheduled for flushing.");
+
+ internal_fatal(!pg->gorilla.writer,
+ "pgd_copy_to_extent() not implemented for NULL gorilla writers");
+
+ internal_fatal(pg->gorilla.num_buffers == 0,
+ "pgd_copy_to_extent() gorilla writer does not have any buffers");
+
+ bool ok = gorilla_writer_serialize(pg->gorilla.writer, dst, dst_size);
+ UNUSED(ok);
+ internal_fatal(!ok,
+ "pgd_copy_to_extent() tried to serialize pg=%p, gw=%p (with dst_size=%u bytes, num_buffers=%zu)",
+ pg, pg->gorilla.writer, dst_size, pg->gorilla.num_buffers);
+ break;
+ }
+ default:
+ fatal("Unknown page type: %uc", pg->type);
+ }
+
+ pg->states = PGD_STATE_FLUSHED_TO_DISK;
+}
+
+// ----------------------------------------------------------------------------
+// data collection
+
+void pgd_append_point(PGD *pg,
+ usec_t point_in_time_ut __maybe_unused,
+ NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags,
+ uint32_t expected_slot)
+{
+ if (unlikely(pg->used >= pg->slots))
+ fatal("DBENGINE: attempted to write beyond page size (page type %u, slots %u, used %u)",
+ pg->type, pg->slots, pg->used /* FIXME:, pg->size */);
+
+ if (unlikely(pg->used != expected_slot))
+ fatal("DBENGINE: page is not aligned to expected slot (used %u, expected %u)",
+ pg->used, expected_slot);
+
+ if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR))
+ fatal("DBENGINE: collection on page not created from a collector");
+
+ if (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING)
+ fatal("Data collection on page already scheduled for flushing");
+
+ switch (pg->type) {
+ case PAGE_METRICS: {
+ storage_number *tier0_metric_data = (storage_number *)pg->raw.data;
+ storage_number t = pack_storage_number(n, flags);
+ tier0_metric_data[pg->used++] = t;
+
+ if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t))
+ pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;
+
+ break;
+ }
+ case PAGE_TIER: {
+ storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data;
+ storage_number_tier1_t t;
+ t.sum_value = (float) n;
+ t.min_value = (float) min_value;
+ t.max_value = (float) max_value;
+ t.anomaly_count = anomaly_count;
+ t.count = count;
+ tier12_metric_data[pg->used++] = t;
+
+ if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && fpclassify(n) != FP_NAN)
+ pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;
+
+ break;
+ }
+ case PAGE_GORILLA_METRICS: {
+ pg->used++;
+ storage_number t = pack_storage_number(n, flags);
+
+ if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t))
+ pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY;
+
+ bool ok = gorilla_writer_write(pg->gorilla.writer, t);
+ if (!ok) {
+ gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
+ memset(new_buffer, 0, GORILLA_BUFFER_SIZE);
+
+ gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, GORILLA_BUFFER_SLOTS);
+ pg->gorilla.num_buffers += 1;
+ global_statistics_gorilla_buffer_add_hot();
+
+ ok = gorilla_writer_write(pg->gorilla.writer, t);
+ internal_fatal(ok == false, "Failed to writer value in newly allocated gorilla buffer.");
+ }
+ break;
+ }
+ default:
+ fatal("DBENGINE: unknown page type id %d", pg->type);
+ break;
+ }
+}
+
+// ----------------------------------------------------------------------------
+// querying with cursor
+
+static void pgdc_seek(PGDC *pgdc, uint32_t position)
+{
+ PGD *pg = pgdc->pgd;
+
+ switch (pg->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ pgdc->slots = pgdc->pgd->used;
+ break;
+ case PAGE_GORILLA_METRICS: {
+ if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
+ pgdc->slots = pgdc->pgd->slots;
+ pgdc->gr = gorilla_reader_init((void *) pg->raw.data);
+ } else {
+ if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) &&
+ !(pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) &&
+ !(pg->states & PGD_STATE_FLUSHED_TO_DISK))
+ fatal("pgdc_seek() currently is not supported for pages created from disk.");
+
+ if (!pg->gorilla.writer)
+ fatal("Seeking from a page without an active gorilla writer is not supported (yet).");
+
+ pgdc->slots = gorilla_writer_entries(pg->gorilla.writer);
+ pgdc->gr = gorilla_writer_get_reader(pg->gorilla.writer);
+ }
+
+ if (position > pgdc->slots)
+ position = pgdc->slots;
+
+ for (uint32_t i = 0; i != position; i++) {
+ uint32_t value;
+
+ bool ok = gorilla_reader_read(&pgdc->gr, &value);
+
+ if (!ok) {
+ // this is fine, the reader will return empty points
+ break;
+ }
+ }
+
+ break;
+ }
+ default:
+ fatal("DBENGINE: unknown page type id %d", pg->type);
+ break;
+ }
+}
+
+void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position)
+{
+ // pgd might be null and position equal to UINT32_MAX
+
+ pgdc->pgd = pgd;
+ pgdc->position = position;
+
+ if (!pgd)
+ return;
+
+ if (pgd == PGD_EMPTY)
+ return;
+
+ if (position == UINT32_MAX)
+ return;
+
+ pgdc_seek(pgdc, position);
+}
+
+bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp)
+{
+ if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots)
+ {
+ storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
+ return false;
+ }
+
+ internal_fatal(pgdc->position != expected_position, "Wrong expected cursor position");
+
+ switch (pgdc->pgd->type)
+ {
+ case PAGE_METRICS: {
+ storage_number *array = (storage_number *) pgdc->pgd->raw.data;
+ storage_number n = array[pgdc->position++];
+
+ sp->min = sp->max = sp->sum = unpack_storage_number(n);
+ sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS);
+ sp->count = 1;
+ sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
+
+ return true;
+ }
+ case PAGE_TIER: {
+ storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data;
+ storage_number_tier1_t n = array[pgdc->position++];
+
+ sp->flags = n.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
+ sp->count = n.count;
+ sp->anomaly_count = n.anomaly_count;
+ sp->min = n.min_value;
+ sp->max = n.max_value;
+ sp->sum = n.sum_value;
+
+ return true;
+ }
+ case PAGE_GORILLA_METRICS: {
+ pgdc->position++;
+
+ uint32_t n = 666666666;
+ bool ok = gorilla_reader_read(&pgdc->gr, &n);
+ if (ok) {
+ sp->min = sp->max = sp->sum = unpack_storage_number(n);
+ sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS);
+ sp->count = 1;
+ sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
+ } else {
+ storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
+ }
+
+ return ok;
+ }
+ default: {
+ static bool logged = false;
+ if (!logged)
+ {
+ netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", pgd_type(pgdc->pgd));
+ logged = true;
+ }
+
+ storage_point_empty(*sp, sp->start_time_s, sp->end_time_s);
+ return false;
+ }
+ }
+}
diff --git a/database/engine/page.h b/database/engine/page.h
new file mode 100644
index 000000000..32c87c580
--- /dev/null
+++ b/database/engine/page.h
@@ -0,0 +1,58 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef DBENGINE_PAGE_H
+#define DBENGINE_PAGE_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "libnetdata/libnetdata.h"
+
+typedef struct pgd_cursor {
+ struct pgd *pgd;
+ uint32_t position;
+ uint32_t slots;
+
+ gorilla_reader_t gr;
+} PGDC;
+
+#include "rrdengine.h"
+
+typedef struct pgd PGD;
+
+#define PGD_EMPTY (PGD *)(-1)
+
+void pgd_init_arals(void);
+
+PGD *pgd_create(uint8_t type, uint32_t slots);
+PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size);
+void pgd_free(PGD *pg);
+
+uint32_t pgd_type(PGD *pg);
+bool pgd_is_empty(PGD *pg);
+uint32_t pgd_slots_used(PGD *pg);
+
+uint32_t pgd_memory_footprint(PGD *pg);
+uint32_t pgd_disk_footprint(PGD *pg);
+
+void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size);
+
+void pgd_append_point(PGD *pg,
+ usec_t point_in_time_ut,
+ NETDATA_DOUBLE n,
+ NETDATA_DOUBLE min_value,
+ NETDATA_DOUBLE max_value,
+ uint16_t count,
+ uint16_t anomaly_count,
+ SN_FLAGS flags,
+ uint32_t expected_slot);
+
+void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position);
+bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif // DBENGINE_PAGE_H
diff --git a/database/engine/page_test.cc b/database/engine/page_test.cc
new file mode 100644
index 000000000..d61299bc4
--- /dev/null
+++ b/database/engine/page_test.cc
@@ -0,0 +1,405 @@
+#include "page.h"
+#include "page_test.h"
+
+#ifdef HAVE_GTEST
+
+#include <gtest/gtest.h>
+#include <limits>
+#include <random>
+
+bool operator==(const STORAGE_POINT lhs, const STORAGE_POINT rhs) {
+ if (lhs.min != rhs.min)
+ return false;
+
+ if (lhs.max != rhs.max)
+ return false;
+
+ if (lhs.sum != rhs.sum)
+ return false;
+
+ if (lhs.start_time_s != rhs.start_time_s)
+ return false;
+
+ if (lhs.end_time_s != rhs.end_time_s)
+ return false;
+
+ if (lhs.count != rhs.count)
+ return false;
+
+ if (lhs.flags != rhs.flags)
+ return false;
+
+ return true;
+}
+
+// TODO: use value-parameterized tests
+// http://google.github.io/googletest/advanced.html#value-parameterized-tests
+static uint8_t page_type = PAGE_GORILLA_METRICS;
+
+static size_t slots_for_page(size_t n) {
+ switch (page_type) {
+ case PAGE_METRICS:
+ return 1024;
+ case PAGE_GORILLA_METRICS:
+ return n;
+ default:
+ fatal("Slots requested for unsupported page: %uc", page_type);
+ }
+}
+
+TEST(PGD, EmptyOrNull) {
+ PGD *pg = NULL;
+
+ PGDC cursor;
+ STORAGE_POINT sp;
+
+ EXPECT_TRUE(pgd_is_empty(pg));
+ EXPECT_EQ(pgd_slots_used(pg), 0);
+ EXPECT_EQ(pgd_memory_footprint(pg), 0);
+ EXPECT_EQ(pgd_disk_footprint(pg), 0);
+
+ pgdc_reset(&cursor, pg, 0);
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp));
+
+ pgd_free(pg);
+
+ pg = PGD_EMPTY;
+
+ EXPECT_TRUE(pgd_is_empty(pg));
+ EXPECT_EQ(pgd_slots_used(pg), 0);
+ EXPECT_EQ(pgd_memory_footprint(pg), 0);
+ EXPECT_EQ(pgd_disk_footprint(pg), 0);
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp));
+
+ pgdc_reset(&cursor, pg, 0);
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp));
+
+ pgd_free(pg);
+}
+
+TEST(PGD, Create) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg = pgd_create(page_type, slots);
+
+ EXPECT_EQ(pgd_type(pg), page_type);
+ EXPECT_TRUE(pgd_is_empty(pg));
+ EXPECT_EQ(pgd_slots_used(pg), 0);
+
+ for (size_t i = 0; i != slots; i++) {
+ pgd_append_point(pg, i, i, 0, 0, 1, 1, SN_DEFAULT_FLAGS, i);
+ EXPECT_FALSE(pgd_is_empty(pg));
+ }
+ EXPECT_EQ(pgd_slots_used(pg), slots);
+
+ EXPECT_DEATH(
+ pgd_append_point(pg, slots, slots, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slots),
+ ".*"
+ );
+
+ pgd_free(pg);
+}
+
+TEST(PGD, CursorFullPage) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg = pgd_create(page_type, slots);
+
+ for (size_t slot = 0; slot != slots; slot++)
+ pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+
+ for (size_t i = 0; i != 2; i++) {
+ PGDC cursor;
+ pgdc_reset(&cursor, pg, 0);
+
+ STORAGE_POINT sp;
+ for (size_t slot = 0; slot != slots; slot++) {
+ EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp));
+
+ EXPECT_EQ(slot, static_cast<size_t>(sp.min));
+ EXPECT_EQ(sp.min, sp.max);
+ EXPECT_EQ(sp.min, sp.sum);
+ EXPECT_EQ(sp.count, 1);
+ EXPECT_EQ(sp.anomaly_count, 0);
+ }
+
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp));
+ }
+
+ for (size_t i = 0; i != 2; i++) {
+ PGDC cursor;
+ pgdc_reset(&cursor, pg, slots / 2);
+
+ STORAGE_POINT sp;
+ for (size_t slot = slots / 2; slot != slots; slot++) {
+ EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp));
+
+ EXPECT_EQ(slot, static_cast<size_t>(sp.min));
+ EXPECT_EQ(sp.min, sp.max);
+ EXPECT_EQ(sp.min, sp.sum);
+ EXPECT_EQ(sp.count, 1);
+ EXPECT_EQ(sp.anomaly_count, 0);
+ }
+
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp));
+ }
+
+ // out of bounds seek
+ {
+ PGDC cursor;
+ pgdc_reset(&cursor, pg, 2 * slots);
+
+ STORAGE_POINT sp;
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, 2 * slots, &sp));
+ }
+
+ pgd_free(pg);
+}
+
+TEST(PGD, CursorHalfPage) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg = pgd_create(page_type, slots);
+
+ PGDC cursor;
+ STORAGE_POINT sp;
+
+ // fill the 1st half of the page
+ for (size_t slot = 0; slot != slots / 2; slot++)
+ pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+
+ pgdc_reset(&cursor, pg, 0);
+
+ for (size_t slot = 0; slot != slots / 2; slot++) {
+ EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp));
+
+ EXPECT_EQ(slot, static_cast<size_t>(sp.min));
+ EXPECT_EQ(sp.min, sp.max);
+ EXPECT_EQ(sp.min, sp.sum);
+ EXPECT_EQ(sp.count, 1);
+ EXPECT_EQ(sp.anomaly_count, 0);
+ }
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, slots / 2, &sp));
+
+ // reset pgdc to the end of the page, we should not be getting more
+ // points even if the page has grown in between.
+
+ pgdc_reset(&cursor, pg, slots / 2);
+
+ for (size_t slot = slots / 2; slot != slots; slot++)
+ pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+
+ for (size_t slot = slots / 2; slot != slots; slot++)
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, slot, &sp));
+
+ EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp));
+
+ pgd_free(pg);
+}
+
+TEST(PGD, MemoryFootprint) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg = pgd_create(page_type, slots);
+
+ uint32_t footprint = 0;
+ switch (pgd_type(pg)) {
+ case PAGE_METRICS:
+ footprint = slots * sizeof(uint32_t);
+ break;
+ case PAGE_GORILLA_METRICS:
+ footprint = 128 * sizeof(uint32_t);
+ break;
+ default:
+ fatal("Uknown page type: %uc", pgd_type(pg));
+ }
+ EXPECT_NEAR(pgd_memory_footprint(pg), footprint, 128);
+
+ std::random_device rand_dev;
+ std::mt19937 gen(rand_dev());
+ std::uniform_int_distribution<uint32_t> distr(std::numeric_limits<uint32_t>::min(),
+ std::numeric_limits<uint32_t>::max()); // define the range
+
+ for (size_t slot = 0; slot != slots; slot++) {
+ uint32_t n = distr(gen);
+ pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+ }
+
+ footprint = slots * sizeof(uint32_t);
+
+ uint32_t abs_error = 0;
+ switch (pgd_type(pg)) {
+ case PAGE_METRICS:
+ abs_error = 128;
+ break;
+ case PAGE_GORILLA_METRICS:
+ abs_error = footprint / 10;
+ break;
+ default:
+ fatal("Uknown page type: %uc", pgd_type(pg));
+ }
+
+ EXPECT_NEAR(pgd_memory_footprint(pg), footprint, abs_error);
+}
+
+TEST(PGD, DiskFootprint) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg = pgd_create(page_type, slots);
+
+ std::random_device rand_dev;
+ std::mt19937 gen(rand_dev());
+ std::uniform_int_distribution<uint32_t> distr(std::numeric_limits<uint32_t>::min(),
+ std::numeric_limits<uint32_t>::max()); // define the range
+
+ size_t used_slots = 16;
+
+ for (size_t slot = 0; slot != used_slots; slot++) {
+ uint32_t n = distr(gen);
+ pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+ }
+
+ uint32_t footprint = 0;
+ switch (pgd_type(pg)) {
+ case PAGE_METRICS:
+ footprint = used_slots * sizeof(uint32_t);
+ break;
+ case PAGE_GORILLA_METRICS:
+ footprint = 128 * sizeof(uint32_t);
+ break;
+ default:
+ fatal("Uknown page type: %uc", pgd_type(pg));
+ }
+ EXPECT_EQ(pgd_disk_footprint(pg), footprint);
+
+ pgd_free(pg);
+
+ pg = pgd_create(page_type, slots);
+
+ used_slots = 128 + 64;
+
+ for (size_t slot = 0; slot != used_slots; slot++) {
+ uint32_t n = distr(gen);
+ pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot);
+ }
+
+ switch (pgd_type(pg)) {
+ case PAGE_METRICS:
+ footprint = used_slots * sizeof(uint32_t);
+ break;
+ case PAGE_GORILLA_METRICS:
+ footprint = 2 * (128 * sizeof(uint32_t));
+ break;
+ default:
+ fatal("Uknown page type: %uc", pgd_type(pg));
+ }
+ EXPECT_EQ(pgd_disk_footprint(pg), footprint);
+
+ pgd_free(pg);
+}
+
+TEST(PGD, CopyToExtent) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg_collector = pgd_create(page_type, slots);
+
+ uint32_t value = 666;
+ pgd_append_point(pg_collector, 0, value, 0, 0, 1, 0, SN_DEFAULT_FLAGS, 0);
+
+ uint32_t size_in_bytes = pgd_disk_footprint(pg_collector);
+ EXPECT_EQ(size_in_bytes, 512);
+
+ uint32_t size_in_words = size_in_bytes / sizeof(uint32_t);
+ alignas(sizeof(uintptr_t)) uint32_t disk_buffer[size_in_words];
+
+ for (size_t i = 0; i != size_in_words; i++) {
+ disk_buffer[i] = std::numeric_limits<uint32_t>::max();
+ }
+
+ pgd_copy_to_extent(pg_collector, (uint8_t *) &disk_buffer[0], size_in_bytes);
+
+ EXPECT_EQ(disk_buffer[0], NULL);
+ EXPECT_EQ(disk_buffer[1], NULL);
+ EXPECT_EQ(disk_buffer[2], 1);
+ EXPECT_EQ(disk_buffer[3], 32);
+ storage_number sn = pack_storage_number(value, SN_DEFAULT_FLAGS);
+ EXPECT_EQ(disk_buffer[4], sn);
+
+ // make sure the rest of the page is 0'ed so that it's amenable to compression
+ for (size_t i = 5; i != size_in_words; i++)
+ EXPECT_EQ(disk_buffer[i], 0);
+
+ pgd_free(pg_collector);
+}
+
+TEST(PGD, Roundtrip) {
+ size_t slots = slots_for_page(1024 * 1024);
+ PGD *pg_collector = pgd_create(page_type, slots);
+
+ for (size_t i = 0; i != slots; i++)
+ pgd_append_point(pg_collector, i, i, 0, 0, 1, 1, SN_DEFAULT_FLAGS, i);
+
+ uint32_t size_in_bytes = pgd_disk_footprint(pg_collector);
+ uint32_t size_in_words = size_in_bytes / sizeof(uint32_t);
+
+ alignas(sizeof(uintptr_t)) uint32_t disk_buffer[size_in_words];
+ for (size_t i = 0; i != size_in_words; i++)
+ disk_buffer[i] = std::numeric_limits<uint32_t>::max();
+
+ pgd_copy_to_extent(pg_collector, (uint8_t *) &disk_buffer[0], size_in_bytes);
+
+ PGD *pg_disk = pgd_create_from_disk_data(page_type, &disk_buffer[0], size_in_bytes);
+ EXPECT_EQ(pgd_slots_used(pg_disk), slots);
+
+ // Expected memory footprint is equal to the disk footprint + a couple
+ // bytes for the PGD metadata.
+ EXPECT_NEAR(pgd_memory_footprint(pg_disk), size_in_bytes, 128);
+
+ // Do not allow calling disk footprint for pages created from disk.
+ EXPECT_DEATH(pgd_disk_footprint(pg_disk), ".*");
+
+ for (size_t i = 0; i != 10; i++) {
+ PGDC cursor_collector;
+ PGDC cursor_disk;
+
+ pgdc_reset(&cursor_collector, pg_collector, i * 1024);
+ pgdc_reset(&cursor_disk, pg_disk, i * 1024);
+
+ STORAGE_POINT sp_collector = {};
+ STORAGE_POINT sp_disk = {};
+
+ for (size_t slot = i * 1024; slot != slots; slot++) {
+ EXPECT_TRUE(pgdc_get_next_point(&cursor_collector, slot, &sp_collector));
+ EXPECT_TRUE(pgdc_get_next_point(&cursor_disk, slot, &sp_disk));
+
+ EXPECT_EQ(sp_collector, sp_disk);
+ }
+
+ EXPECT_FALSE(pgdc_get_next_point(&cursor_collector, slots, &sp_collector));
+ EXPECT_FALSE(pgdc_get_next_point(&cursor_disk, slots, &sp_disk));
+ }
+
+ pgd_free(pg_disk);
+ pgd_free(pg_collector);
+}
+
+int pgd_test(int argc, char *argv[])
+{
+ // Dummy/necessary initialization stuff
+ PGC *dummy_cache = pgc_create("pgd-tests-cache", 32 * 1024 * 1024, NULL, 64, NULL, NULL,
+ 10, 10, 1000, 10, PGC_OPTIONS_NONE, 1, 11);
+ pgd_init_arals();
+
+ ::testing::InitGoogleTest(&argc, argv);
+ int rc = RUN_ALL_TESTS();
+
+ pgc_destroy(dummy_cache);
+
+ return rc;
+}
+
+#else // HAVE_GTEST
+
+int pgd_test(int argc, char *argv[])
+{
+ (void) argc;
+ (void) argv;
+ fprintf(stderr, "Can not run PGD tests because the agent was not build with support for google tests.\n");
+ return 0;
+}
+
+#endif // HAVE_GTEST
diff --git a/database/engine/page_test.h b/database/engine/page_test.h
new file mode 100644
index 000000000..30837f0ab
--- /dev/null
+++ b/database/engine/page_test.h
@@ -0,0 +1,14 @@
+#ifndef PAGE_TEST_H
+#define PAGE_TEST_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int pgd_test(int argc, char *argv[]);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PAGE_TEST_H */
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index c608c3270..dab9cdd0d 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -12,8 +12,9 @@ struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {};
static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
// Release storage associated with the page
- dbengine_page_free(entry.data, entry.size);
+ pgd_free(entry.data);
}
+
static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) {
struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
@@ -28,8 +29,6 @@ static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_
struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section;
- size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx);
-
struct page_descr_with_data *base = NULL;
for (size_t Index = 0 ; Index < entries; Index++) {
@@ -42,21 +41,15 @@ static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_
descr->start_time_ut = start_time_s * USEC_PER_SEC;
descr->end_time_ut = end_time_s * USEC_PER_SEC;
descr->update_every_s = entries_array[Index].update_every_s;
- descr->type = ctx->config.page_type;
- descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point;
+ descr->pgd = pgc_page_data(pages_array[Index]);
+ descr->type = pgd_type(descr->pgd);
+ descr->page_length = pgd_disk_footprint(descr->pgd);
- if(descr->page_length > entries_array[Index].size) {
- descr->page_length = entries_array[Index].size;
-
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max.");
- }
-
- descr->page = pgc_page_data(pages_array[Index]);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next);
- internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation");
+ // TODO: ask @stelfrag/@ktsaou about this.
+ // internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation");
}
struct completion completion;
@@ -254,7 +247,6 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
time_t page_update_every_s = pgc_page_update_every_s(page);
- size_t page_length = pgc_page_data_size(cache, page);
if(!page_update_every_s)
page_update_every_s = dt_s;
@@ -277,24 +269,10 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
if (!PValue || PValue == PJERR)
fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__ );
- if (unlikely(*PValue)) {
- struct page_details *pd = *PValue;
- UNUSED(pd);
-
-// internal_error(
-// pd->first_time_s != page_first_time_s ||
-// pd->last_time_s != page_last_time_s ||
-// pd->update_every_s != page_update_every_s,
-// "DBENGINE: duplicate page with different retention in %s cache "
-// "1st: %ld to %ld, ue %u, size %u "
-// "2nd: %ld to %ld, ue %ld size %zu "
-// "- ignoring the second",
-// cache == open_cache ? "open" : "main",
-// pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length,
-// page_first_time_s, page_last_time_s, page_update_every_s, page_length);
-
+ if (unlikely(*PValue))
+ // already exists in our list
pgc_page_release(cache, page);
- }
+
else {
internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache");
@@ -304,7 +282,6 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
pd->metric_id = metric_id;
pd->first_time_s = page_start_time_s;
pd->last_time_s = page_end_time_s;
- pd->page_length = page_length;
pd->update_every_s = (uint32_t) page_update_every_s;
pd->page = (open_cache_mode) ? NULL : page;
pd->status |= tags;
@@ -312,7 +289,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
if((pd->page)) {
pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED;
- if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
+ if(pgd_is_empty(pgc_page_data(page)))
pd->status |= PDC_PAGE_EMPTY;
}
@@ -369,7 +346,7 @@ static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_
.end_time_s = MIN(end_time_s, db_last_time_s),
.update_every_s = 0,
.size = 0,
- .data = DBENGINE_EMPTY_PAGE,
+ .data = PGD_EMPTY,
};
if(page_entry.start_time_s >= page_entry.end_time_s)
@@ -478,7 +455,7 @@ static size_t list_has_time_gaps(
pd->status &= ~PDC_PAGE_DISK_PENDING;
pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4;
- if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE)
+ if(pgd_is_empty(pgc_page_data(pd->page)))
pd->status |= PDC_PAGE_EMPTY;
}
@@ -642,7 +619,6 @@ void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) {
pd->first_time_s = pgc_page_start_time_s(page);
pd->last_time_s = pgc_page_end_time_s(page);
pd->datafile.ptr = datafile;
- pd->page_length = ei->page_length;
pd->update_every_s = (uint32_t) pgc_page_update_every_s(page);
pd->metric_id = metric_id;
pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED;
@@ -917,7 +893,7 @@ struct pgc_page *pg_cache_lookup_next(
}
}
- if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
+ if(page && pgd_is_empty(pgc_page_data(page)))
pdc_page_status_set(pd, PDC_PAGE_EMPTY);
if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) {
@@ -930,7 +906,6 @@ struct pgc_page *pg_cache_lookup_next(
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
time_t page_update_every_s = pgc_page_update_every_s(page);
- size_t page_length = pgc_page_data_size(main_cache, page);
if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
@@ -939,13 +914,6 @@ struct pgc_page *pg_cache_lookup_next(
pd->page = page = NULL;
continue;
}
- else if(page_length > RRDENG_BLOCK_SIZE) {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED);
- pgc_page_to_clean_evict_or_release(main_cache, page);
- pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
- pd->page = page = NULL;
- continue;
- }
else {
if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
@@ -953,7 +921,7 @@ struct pgc_page *pg_cache_lookup_next(
pd->update_every_s = (uint32_t) page_update_every_s;
}
- size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx));
+ size_t entries_by_size = pgd_slots_used(pgc_page_data(page));
size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s);
if(unlikely(entries_by_size < entries_by_time)) {
time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s);
diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h
index 5242db89e..dbcbea53a 100644
--- a/database/engine/pagecache.h
+++ b/database/engine/pagecache.h
@@ -27,7 +27,7 @@ struct page_descr_with_data {
uint8_t type;
uint32_t update_every_s;
uint32_t page_length;
- uint8_t *page;
+ struct pgd *pgd;
struct {
struct page_descr_with_data *prev;
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
index 7da568787..5fe205e64 100644
--- a/database/engine/pdc.c
+++ b/database/engine/pdc.c
@@ -629,14 +629,33 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
}
inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
+ time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
+
+ time_t end_time_s;
+ size_t entries;
+
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ end_time_s = descr->end_time_ut / USEC_PER_SEC;
+ entries = 0;
+ break;
+ case PAGE_GORILLA_METRICS:
+ end_time_s = start_time_s + descr->gorilla.delta_time_s;
+ entries = descr->gorilla.entries;
+ break;
+ default:
+ fatal("Unknown page type: %uc\n", descr->type);
+ }
+
return validate_page(
(uuid_t *)descr->uuid,
- (time_t) (descr->start_time_ut / USEC_PER_SEC),
- (time_t) (descr->end_time_ut / USEC_PER_SEC),
+ start_time_s,
+ end_time_s,
0,
descr->page_length,
descr->type,
- 0,
+ entries,
now_s,
overwrite_zero_update_every_s,
have_read_error,
@@ -666,13 +685,25 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
.is_valid = true,
};
- // always calculate entries by size
vd.point_size = page_type_size[vd.type];
- vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
-
- // allow to be called without entries (when loading pages from disk)
- if(!entries)
- entries = vd.entries;
+ switch (page_type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ // always calculate entries by size
+ vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
+
+ // allow to be called without entries (when loading pages from disk)
+ if(!entries)
+ entries = vd.entries;
+ break;
+ case PAGE_GORILLA_METRICS:
+ internal_fatal(entries == 0, "0 number of entries found on gorilla page");
+ vd.entries = entries;
+ break;
+ default:
+ // TODO: should set vd.is_valid false instead?
+ fatal("Unknown page type: %uc", page_type);
+ }
// allow to be called without update every (when loading pages from disk)
if(!update_every_s) {
@@ -687,19 +718,26 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
bool updated = false;
+ size_t max_page_length = RRDENG_BLOCK_SIZE;
+
+ // If gorilla can not compress the data we might end up needing slightly more
+ // than 4KiB. However, gorilla pages extend the page length by increments of
+ // 512 bytes.
+ max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
+
if( have_read_error ||
vd.page_length == 0 ||
- vd.page_length > RRDENG_BLOCK_SIZE ||
+ vd.page_length > max_page_length ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
vd.start_time_s <= 0 ||
vd.end_time_s <= 0 ||
vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
- (vd.update_every_s == 0 && vd.entries > 1)
- )
+ (vd.update_every_s == 0 && vd.entries > 1))
+ {
vd.is_valid = false;
-
+ }
else {
if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s))
updated = true;
@@ -734,7 +772,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
if(unlikely(!vd.is_valid || updated)) {
#ifndef NETDATA_INTERNAL_CHECKS
- error_limit_static_global_var(erl, 1, 0);
+ nd_log_limit_static_global_var(erl, 1, 0);
#endif
char uuid_str[UUID_STR_LEN + 1];
uuid_unparse(*uuid, uuid_str);
@@ -750,7 +788,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit(&erl,
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s invalid page of type %u "
"from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
@@ -770,7 +808,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit(&erl,
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s page of type %u "
"from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
@@ -832,7 +870,15 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if (descr) {
start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
- end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
+ break;
+ case PAGE_GORILLA_METRICS:
+ end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
+ break;
+ }
uuid_unparse_lower(descr->uuid, uuid);
used_descr = true;
}
@@ -869,8 +915,8 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if(end_time_s)
log_date(end_time_str, LOG_DATE_LENGTH, end_time_s);
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
"DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) "
"%s from %ld (%s) to %ld (%s) %s%s: "
"%s",
@@ -952,7 +998,9 @@ static bool epdl_populate_pages_from_extent_data(
uncompressed_payload_length = 0;
for (i = 0; i < count; ++i) {
size_t page_length = header->descr[i].page_length;
- if(page_length > RRDENG_BLOCK_SIZE) {
+ if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
+ (header->descr[i].type == PAGE_GORILLA_METRICS &&
+ (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
have_read_error = true;
break;
}
@@ -993,7 +1041,7 @@ static bool epdl_populate_pages_from_extent_data(
if(!page_length || !start_time_s) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) is EMPTY", i, count);
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
continue;
}
@@ -1002,7 +1050,7 @@ static bool epdl_populate_pages_from_extent_data(
Word_t metric_id = (Word_t)metric;
if(!metric) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) has unknown UUID", i, count);
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
continue;
}
@@ -1020,32 +1068,34 @@ static bool epdl_populate_pages_from_extent_data(
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION);
- void *page_data;
+ PGD *pgd;
if (unlikely(!vd.is_valid)) {
- page_data = DBENGINE_EMPTY_PAGE;
+ pgd = PGD_EMPTY;
stats_load_invalid_page++;
}
else {
if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- page_data = dbengine_page_alloc(vd.page_length);
- memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length);
+ pgd = pgd_create_from_disk_data(header->descr[i].type,
+ data + payload_offset + page_offset,
+ vd.page_length);
stats_load_uncompressed++;
}
else {
if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) offset %u + page length %zu, "
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, "
"exceeds the uncompressed buffer size %u",
i, count, page_offset, vd.page_length, uncompressed_payload_length);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
- page_data = DBENGINE_EMPTY_PAGE;
+ pgd = PGD_EMPTY;
stats_load_invalid_page++;
}
else {
- page_data = dbengine_page_alloc(vd.page_length);
- memcpy(page_data, uncompressed_buf + page_offset, vd.page_length);
+ pgd = pgd_create_from_disk_data(header->descr[i].type,
+ uncompressed_buf + page_offset,
+ vd.page_length);
stats_load_compressed++;
}
}
@@ -1061,14 +1111,14 @@ static bool epdl_populate_pages_from_extent_data(
.start_time_s = vd.start_time_s,
.end_time_s = vd.end_time_s,
.update_every_s = (uint32_t) vd.update_every_s,
- .size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length),
- .data = page_data
+ .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management
+ .data = pgd,
};
bool added = true;
PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
if (false == added) {
- dbengine_page_free(page_data, vd.page_length);
+ pgd_free(pgd);
stats_cache_hit_while_inserting++;
stats_data_from_main_cache++;
}
@@ -1081,8 +1131,7 @@ static bool epdl_populate_pages_from_extent_data(
pgc_page_dup(main_cache, page);
pd->page = page;
- pd->page_length = pgc_page_data_size(main_cache, page);
- pdc_page_status_set(pd, PDC_PAGE_READY | tags | ((page_data == DBENGINE_EMPTY_PAGE) ? PDC_PAGE_EMPTY : 0));
+ pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0));
pd = pd->load.next;
} while(pd);
diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h
index 5b4be9498..86b41f0b3 100644
--- a/database/engine/rrddiskprotocol.h
+++ b/database/engine/rrddiskprotocol.h
@@ -3,6 +3,8 @@
#ifndef NETDATA_RRDDISKPROTOCOL_H
#define NETDATA_RRDDISKPROTOCOL_H
+#include <stdint.h>
+
#define RRDENG_BLOCK_SIZE (4096)
#define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE
@@ -36,7 +38,8 @@ struct rrdeng_df_sb {
*/
#define PAGE_METRICS (0)
#define PAGE_TIER (1)
-#define PAGE_TYPE_MAX 1 // Maximum page type (inclusive)
+#define PAGE_GORILLA_METRICS (2)
+#define PAGE_TYPE_MAX 2 // Maximum page type (inclusive)
/*
* Data file page descriptor
@@ -47,7 +50,14 @@ struct rrdeng_extent_page_descr {
uint8_t uuid[UUID_SZ];
uint32_t page_length;
uint64_t start_time_ut;
- uint64_t end_time_ut;
+ union {
+ struct {
+ uint32_t entries;
+ uint32_t delta_time_s;
+ } gorilla __attribute__((packed));
+
+ uint64_t end_time_ut;
+ };
} __attribute__ ((packed));
/*
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 99257b79d..b82cc1ad1 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -40,6 +40,7 @@ struct rrdeng_main {
uv_async_t async;
uv_timer_t timer;
pid_t tid;
+ bool shutdown;
size_t flushes_running;
size_t evictions_running;
@@ -577,55 +578,6 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
// ----------------------------------------------------------------------------
-struct {
- ARAL *aral[RRD_STORAGE_TIERS];
-} dbengine_page_alloc_globals = {};
-
-static inline ARAL *page_size_lookup(size_t size) {
- for(size_t tier = 0; tier < storage_tiers ;tier++)
- if(size == tier_page_size[tier])
- return dbengine_page_alloc_globals.aral[tier];
-
- return NULL;
-}
-
-static void dbengine_page_alloc_init(void) {
- for(size_t i = storage_tiers; i > 0 ;i--) {
- size_t tier = storage_tiers - i;
-
- char buf[20 + 1];
- snprintfz(buf, 20, "tier%zu-pages", tier);
-
- dbengine_page_alloc_globals.aral[tier] = aral_create(
- buf,
- tier_page_size[tier],
- 64,
- 512 * tier_page_size[tier],
- pgc_aral_statistics(),
- NULL, NULL, false, false);
- }
-}
-
-void *dbengine_page_alloc(size_t size) {
- ARAL *ar = page_size_lookup(size);
- if(ar) return aral_mallocz(ar);
-
- return mallocz(size);
-}
-
-void dbengine_page_free(void *page, size_t size __maybe_unused) {
- if(unlikely(!page || page == DBENGINE_EMPTY_PAGE))
- return;
-
- ARAL *ar = page_size_lookup(size);
- if(ar)
- aral_freez(ar, page);
- else
- freez(page);
-}
-
-// ----------------------------------------------------------------------------
-
void *dbengine_extent_alloc(size_t size) {
void *extent = mallocz(size);
return extent;
@@ -890,12 +842,25 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
header->descr[i].page_length = descr->page_length;
header->descr[i].start_time_ut = descr->start_time_ut;
- header->descr[i].end_time_ut = descr->end_time_ut;
+
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ header->descr[i].end_time_ut = descr->end_time_ut;
+ break;
+ case PAGE_GORILLA_METRICS:
+ header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
+ header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
+ break;
+ default:
+ fatal("Unknown page type: %uc", descr->type);
+ }
+
pos += sizeof(header->descr[i]);
}
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
- (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
+ pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
@@ -1381,9 +1346,6 @@ static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, vo
static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN);
- completion_wait_for(&ctx->quiesce.completion);
- completion_destroy(&ctx->quiesce.completion);
-
bool logged = false;
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) ||
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
@@ -1436,6 +1398,14 @@ uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx)
{
+ if(!ctx->datafiles.first)
+ // no datafiles available
+ return false;
+
+ if(!ctx->datafiles.first->next)
+ // only 1 datafile available
+ return false;
+
uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) -
(ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0);
@@ -1514,12 +1484,19 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
spinlock_unlock(&datafile->writers.spinlock);
if(!available) {
- netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - "
+ "skipping it for now",
+ datafile->fileno);
+
datafile = datafile->next;
continue;
}
- netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "DBENGINE: journal file %u is ready to be indexed",
+ datafile->fileno);
+
pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
@@ -1532,7 +1509,10 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
}
errno = 0;
- internal_error(count, "DBENGINE: journal indexing done; %u files processed", count);
+ if(count)
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "DBENGINE: journal indexing done; %u files processed",
+ count);
worker_is_idle();
@@ -1628,7 +1608,7 @@ static void dbengine_initialize_structures(void) {
rrdeng_query_handle_init();
page_descriptors_init();
extent_buffer_init();
- dbengine_page_alloc_init();
+ pgd_init_arals();
extent_io_descriptor_init();
}
@@ -1715,6 +1695,7 @@ void dbengine_event_loop(void* arg) {
worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init");
worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown");
worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce");
+ worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown");
worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode");
@@ -1856,6 +1837,13 @@ void dbengine_event_loop(void* arg) {
break;
}
+ case RRDENG_OPCODE_SHUTDOWN_EVLOOP: {
+ uv_close((uv_handle_t *)&main->async, NULL);
+ (void) uv_timer_stop(&main->timer);
+ uv_close((uv_handle_t *)&main->timer, NULL);
+ shutdown = true;
+ }
+
case RRDENG_OPCODE_NOOP: {
/* the command queue was empty, do nothing */
break;
@@ -1872,18 +1860,7 @@ void dbengine_event_loop(void* arg) {
} while (opcode != RRDENG_OPCODE_NOOP);
}
- /* cleanup operations of the event loop */
- netdata_log_info("DBENGINE: shutting down dbengine thread");
-
- /*
- * uv_async_send after uv_close does not seem to crash in linux at the moment,
- * it is however undocumented behaviour and we need to be aware if this becomes
- * an issue in the future.
- */
- uv_close((uv_handle_t *)&main->async, NULL);
- uv_timer_stop(&main->timer);
- uv_close((uv_handle_t *)&main->timer, NULL);
- uv_run(&main->loop, UV_RUN_DEFAULT);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread");
uv_loop_close(&main->loop);
worker_unregister();
}
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 08eaf4128..cd3352f12 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -22,6 +22,7 @@
#include "metric.h"
#include "cache.h"
#include "pdc.h"
+#include "page.h"
extern unsigned rrdeng_pages_per_extent;
@@ -119,7 +120,6 @@ struct page_details {
time_t first_time_s;
time_t last_time_s;
uint32_t update_every_s;
- uint16_t page_length;
PDC_PAGE_STATUS status;
struct {
@@ -190,10 +190,11 @@ struct rrdeng_collect_handle {
RRDENG_COLLECT_HANDLE_OPTIONS options;
uint8_t type;
+ struct rrdengine_instance *ctx;
struct metric *metric;
- struct pgc_page *page;
- void *data;
- size_t data_size;
+ struct pgc_page *pgc_page;
+ struct pgd *page_data;
+ size_t page_data_size;
struct pg_alignment *alignment;
uint32_t page_entries_max;
uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it
@@ -206,7 +207,7 @@ struct rrdeng_query_handle {
struct metric *metric;
struct pgc_page *page;
struct rrdengine_instance *ctx;
- storage_number *metric_data;
+ struct pgd_cursor pgdc;
struct page_details_control *pdc;
// the request
@@ -246,6 +247,7 @@ enum rrdeng_opcode {
RRDENG_OPCODE_CTX_SHUTDOWN,
RRDENG_OPCODE_CTX_QUIESCE,
RRDENG_OPCODE_CTX_POPULATE_MRG,
+ RRDENG_OPCODE_SHUTDOWN_EVLOOP,
RRDENG_OPCODE_CLEANUP,
RRDENG_OPCODE_MAX
@@ -445,9 +447,6 @@ static inline void ctx_last_flush_fileno_set(struct rrdengine_instance *ctx, uns
#define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce.enabled, __ATOMIC_RELAXED) == false && __atomic_load_n(&(ctx)->quiesce.exit_mode, __ATOMIC_RELAXED) == false)
-void *dbengine_page_alloc(size_t size);
-void dbengine_page_free(void *page, size_t size);
-
void *dbengine_extent_alloc(size_t size);
void dbengine_extent_free(void *extent, size_t size);
@@ -491,8 +490,6 @@ typedef struct validated_page_descriptor {
bool is_valid;
} VALIDATED_PAGE_DESCRIPTOR;
-#define DBENGINE_EMPTY_PAGE (void *)(-1)
-
#define page_entries_by_time(start_time_s, end_time_s, update_every_s) \
((update_every_s) ? (((end_time_s) - ((start_time_s) - (update_every_s))) / (update_every_s)) : 1)
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index 318a933f1..1ddce5243 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -1,4 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "database/engine/rrddiskprotocol.h"
#include "rrdengine.h"
/* Default global database instance */
@@ -22,10 +24,15 @@ size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384};
#endif
-#if PAGE_TYPE_MAX != 1
-#error PAGE_TYPE_MAX is not 1 - you need to add allocations here
+#if PAGE_TYPE_MAX != 2
+#error PAGE_TYPE_MAX is not 2 - you need to add allocations here
#endif
-size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)};
+
+size_t page_type_size[256] = {
+ [PAGE_METRICS] = sizeof(storage_number),
+ [PAGE_TIER] = sizeof(storage_number_tier1_t),
+ [PAGE_GORILLA_METRICS] = sizeof(storage_number)
+};
__attribute__((constructor)) void initialize_multidb_ctx(void) {
multidb_ctx[0] = &multidb_ctx_storage_tier0;
@@ -198,15 +205,15 @@ static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle *
static inline bool check_completed_page_consistency(struct rrdeng_collect_handle *handle __maybe_unused) {
#ifdef NETDATA_INTERNAL_CHECKS
- if (unlikely(!handle->page || !handle->page_entries_max || !handle->page_position || !handle->page_end_time_ut))
+ if (unlikely(!handle->pgc_page || !handle->page_entries_max || !handle->page_position || !handle->page_end_time_ut))
return false;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric);
- time_t start_time_s = pgc_page_start_time_s(handle->page);
- time_t end_time_s = pgc_page_end_time_s(handle->page);
- time_t update_every_s = pgc_page_update_every_s(handle->page);
+ time_t start_time_s = pgc_page_start_time_s(handle->pgc_page);
+ time_t end_time_s = pgc_page_end_time_s(handle->pgc_page);
+ time_t update_every_s = pgc_page_update_every_s(handle->pgc_page);
size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx);
size_t entries = handle->page_position;
time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
@@ -257,9 +264,11 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE;
handle->metric = metric;
- handle->page = NULL;
- handle->data = NULL;
- handle->data_size = 0;
+
+ handle->pgc_page = NULL;
+ handle->page_data = NULL;
+ handle->page_data_size = 0;
+
handle->page_position = 0;
handle->page_entries_max = 0;
handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC;
@@ -286,65 +295,29 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
return (STORAGE_COLLECT_HANDLE *)handle;
}
-/* The page must be populated and referenced */
-static bool page_has_only_empty_metrics(struct rrdeng_collect_handle *handle) {
- switch(handle->type) {
- case PAGE_METRICS: {
- size_t slots = handle->page_position;
- storage_number *array = (storage_number *)pgc_page_data(handle->page);
- for (size_t i = 0 ; i < slots; ++i) {
- if(does_storage_number_exist(array[i]))
- return false;
- }
- }
- break;
-
- case PAGE_TIER: {
- size_t slots = handle->page_position;
- storage_number_tier1_t *array = (storage_number_tier1_t *)pgc_page_data(handle->page);
- for (size_t i = 0 ; i < slots; ++i) {
- if(fpclassify(array[i].sum_value) != FP_NAN)
- return false;
- }
- }
- break;
-
- default: {
- static bool logged = false;
- if(!logged) {
- netdata_log_error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type);
- logged = true;
- }
- return false;
- }
- }
-
- return true;
-}
-
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
- if (unlikely(!handle->page))
+ if (unlikely(!handle->pgc_page))
return;
- if(!handle->page_position || page_has_only_empty_metrics(handle))
- pgc_page_to_clean_evict_or_release(main_cache, handle->page);
+ if(pgd_is_empty(handle->page_data))
+ pgc_page_to_clean_evict_or_release(main_cache, handle->pgc_page);
else {
check_completed_page_consistency(handle);
- mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->page));
- pgc_page_hot_to_dirty_and_release(main_cache, handle->page);
+ mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->pgc_page));
+ pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page);
}
mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0);
- handle->page = NULL;
+ handle->pgc_page = NULL;
handle->page_flags = 0;
handle->page_position = 0;
handle->page_entries_max = 0;
- handle->data = NULL;
- handle->data_size = 0;
+ handle->page_data = NULL;
+ handle->page_data_size = 0;
// important!
// we should never zero page end time ut, because this will allow
@@ -358,10 +331,10 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
}
static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle,
- struct rrdengine_instance *ctx,
- usec_t point_in_time_ut,
- void *data,
- size_t data_size) {
+ struct rrdengine_instance *ctx,
+ usec_t point_in_time_ut,
+ PGD *data,
+ size_t data_size) {
time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
@@ -378,7 +351,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
size_t conflicts = 0;
bool added = true;
- PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
+ PGC_PAGE *pgc_page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
while (unlikely(!added)) {
conflicts++;
@@ -388,33 +361,33 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
#endif
- "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache "
- "with existing %s%s page from %ld to %ld, update every %ld - "
- "is it collected more than once?",
- uuid,
- page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s,
- pgc_is_page_hot(page) ? "hot" : "not-hot",
- pgc_page_data(page) == DBENGINE_EMPTY_PAGE ? " gap" : "",
- pgc_page_start_time_s(page), pgc_page_end_time_s(page), pgc_page_update_every_s(page)
+ "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache "
+ "with existing %s%s page from %ld to %ld, update every %ld - "
+ "is it collected more than once?",
+ uuid,
+ page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s,
+ pgc_is_page_hot(pgc_page) ? "hot" : "not-hot",
+ pgc_page_data(pgc_page) == PGD_EMPTY ? " gap" : "",
+ pgc_page_start_time_s(pgc_page), pgc_page_end_time_s(pgc_page), pgc_page_update_every_s(pgc_page)
);
- pgc_page_release(main_cache, page);
+ pgc_page_release(main_cache, pgc_page);
point_in_time_ut -= handle->update_every_ut;
point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
page_entry.start_time_s = point_in_time_s;
page_entry.end_time_s = point_in_time_s;
- page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
+ pgc_page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
}
handle->page_entries_max = data_size / CTX_POINT_SIZE_BYTES(ctx);
handle->page_start_time_ut = point_in_time_ut;
handle->page_end_time_ut = point_in_time_ut;
handle->page_position = 1; // zero is already in our data
- handle->page = page;
+ handle->pgc_page = pgc_page;
handle->page_flags = conflicts? RRDENG_PAGE_CONFLICT : 0;
if(point_in_time_s > max_acceptable_collected_time())
@@ -441,9 +414,11 @@ static size_t aligned_allocation_entries(size_t max_slots, size_t target_slot, t
return slots;
}
-static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
+static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) {
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+ PGD *d = NULL;
+
size_t max_size = tier_page_size[ctx->config.tier];
size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx);
@@ -467,10 +442,22 @@ static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle,
internal_fatal(size > tier_page_size[ctx->config.tier] || size < CTX_POINT_SIZE_BYTES(ctx) * 2, "ooops! wrong page size");
*data_size = size;
- void *d = dbengine_page_alloc(size);
- timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC);
+ switch (ctx->config.page_type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ d = pgd_create(ctx->config.page_type, slots);
+ break;
+ case PAGE_GORILLA_METRICS:
+ // ignore slots, and use the fixed number of slots per gorilla buffer.
+ // gorilla will automatically add more buffers if needed.
+ d = pgd_create(ctx->config.page_type, GORILLA_BUFFER_SLOTS);
+ break;
+ default:
+ fatal("Unknown page type: %uc\n", ctx->config.page_type);
+ }
+ timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC);
return d;
}
@@ -486,37 +473,25 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
- if(unlikely(!handle->data))
- handle->data = rrdeng_alloc_new_metric_data(handle, &handle->data_size, point_in_time_ut);
+ if(unlikely(!handle->page_data))
+ handle->page_data = rrdeng_alloc_new_page_data(handle, &handle->page_data_size, point_in_time_ut);
timing_step(TIMING_STEP_DBENGINE_CHECK_DATA);
- if(likely(ctx->config.page_type == PAGE_METRICS)) {
- storage_number *tier0_metric_data = handle->data;
- tier0_metric_data[handle->page_position] = pack_storage_number(n, flags);
- }
- else if(likely(ctx->config.page_type == PAGE_TIER)) {
- storage_number_tier1_t *tier12_metric_data = handle->data;
- storage_number_tier1_t number_tier1;
- number_tier1.sum_value = (float) n;
- number_tier1.min_value = (float) min_value;
- number_tier1.max_value = (float) max_value;
- number_tier1.anomaly_count = anomaly_count;
- number_tier1.count = count;
- tier12_metric_data[handle->page_position] = number_tier1;
- }
- else
- fatal("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type);
+ pgd_append_point(handle->page_data,
+ point_in_time_ut,
+ n, min_value, max_value, count, anomaly_count, flags,
+ handle->page_position);
timing_step(TIMING_STEP_DBENGINE_PACK);
- if(unlikely(!handle->page)){
- rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->data, handle->data_size);
+ if(unlikely(!handle->pgc_page)) {
+ rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->page_data, handle->page_data_size);
// handle->position is set to 1 already
}
else {
// update an existing page
- pgc_page_hot_set_end_time_s(main_cache, handle->page, (time_t) (point_in_time_ut / USEC_PER_SEC));
+ pgc_page_hot_set_end_time_s(main_cache, handle->pgc_page, (time_t) (point_in_time_ut / USEC_PER_SEC));
handle->page_end_time_ut = point_in_time_ut;
if(unlikely(++handle->page_position >= handle->page_entries_max)) {
@@ -541,13 +516,13 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m
uuid_unparse(*mrg_metric_uuid(main_mrg, handle->metric), uuid);
BUFFER *wb = NULL;
- if(handle->page && handle->page_flags) {
+ if(handle->pgc_page && handle->page_flags) {
wb = buffer_create(0, NULL);
collect_page_flags_to_buffer(wb, handle->page_flags);
}
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE,
"DBENGINE: metric '%s' collected point at %ld, %s last collection at %ld, "
"update every %ld, %s page from %ld to %ld, position %u (of %u), flags: %s",
uuid,
@@ -555,12 +530,12 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m
msg,
(time_t)(handle->page_end_time_ut / USEC_PER_SEC),
(time_t)(handle->update_every_ut / USEC_PER_SEC),
- handle->page ? "current" : "*LAST*",
+ handle->pgc_page ? "current" : "*LAST*",
(time_t)(handle->page_start_time_ut / USEC_PER_SEC),
(time_t)(handle->page_end_time_ut / USEC_PER_SEC),
handle->page_position, handle->page_entries_max,
wb ? buffer_tostring(wb) : ""
- );
+ );
buffer_free(wb);
#else
@@ -593,7 +568,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
;
}
else if(unlikely(point_in_time_ut > handle->page_end_time_ut)) {
- if(handle->page) {
+ if(handle->pgc_page) {
if (unlikely(delta_ut < handle->update_every_ut)) {
handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
rrdeng_store_metric_flush_current_page(collection_handle);
@@ -801,12 +776,13 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) {
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- struct rrdengine_instance *ctx = handle->ctx;
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
if (likely(handle->page)) {
// we have a page to release
pgc_page_release(main_cache, handle->page);
handle->page = NULL;
+ pgdc_reset(&handle->pgdc, NULL, UINT32_MAX);
}
if (unlikely(handle->now_s > rrddim_handle->end_time_s))
@@ -815,10 +791,10 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
size_t entries = 0;
handle->page = pg_cache_lookup_next(ctx, handle->pdc, handle->now_s, handle->dt_s, &entries);
- internal_fatal(handle->page && (pgc_page_data(handle->page) == DBENGINE_EMPTY_PAGE || !entries),
+ internal_fatal(handle->page && (pgc_page_data(handle->page) == PGD_EMPTY || !entries),
"A page was returned, but it is empty - pg_cache_lookup_next() should be handling this case");
- if (unlikely(!handle->page || pgc_page_data(handle->page) == DBENGINE_EMPTY_PAGE || !entries))
+ if (unlikely(!handle->page || pgc_page_data(handle->page) == PGD_EMPTY || !entries))
return false;
time_t page_start_time_s = pgc_page_start_time_s(handle->page);
@@ -859,8 +835,10 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
handle->entries = entries;
handle->position = position;
- handle->metric_data = pgc_page_data((PGC_PAGE *)handle->page);
handle->dt_s = page_update_every_s;
+
+ pgdc_reset(&handle->pgdc, pgc_page_data(handle->page), handle->position);
+
return true;
}
@@ -889,38 +867,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
sp.start_time_s = handle->now_s - handle->dt_s;
sp.end_time_s = handle->now_s;
- switch(handle->ctx->config.page_type) {
- case PAGE_METRICS: {
- storage_number n = handle->metric_data[handle->position];
- sp.min = sp.max = sp.sum = unpack_storage_number(n);
- sp.flags = n & SN_USER_FLAGS;
- sp.count = 1;
- sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
- }
- break;
-
- case PAGE_TIER: {
- storage_number_tier1_t tier1_value = ((storage_number_tier1_t *)handle->metric_data)[handle->position];
- sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
- sp.count = tier1_value.count;
- sp.anomaly_count = tier1_value.anomaly_count;
- sp.min = tier1_value.min_value;
- sp.max = tier1_value.max_value;
- sp.sum = tier1_value.sum_value;
- }
- break;
-
- // we don't know this page type
- default: {
- static bool logged = false;
- if(!logged) {
- netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type);
- logged = true;
- }
- storage_point_empty(sp, sp.start_time_s, sp.end_time_s);
- }
- break;
- }
+ pgdc_get_next_point(&handle->pgdc, handle->position, &sp);
prepare_for_next_iteration:
internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query");
@@ -944,8 +891,10 @@ void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_hand
{
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- if (handle->page)
+ if (handle->page) {
pgc_page_release(main_cache, handle->page);
+ pgdc_reset(&handle->pgdc, NULL, UINT32_MAX);
+ }
if(!pdc_release_and_destroy_if_unreferenced(handle->pdc, false, false))
__atomic_store_n(&handle->pdc->workers_should_stop, true, __ATOMIC_RELAXED);
@@ -1240,12 +1189,14 @@ int rrdeng_exit(struct rrdengine_instance *ctx) {
// 4. then wait for completion
bool logged = false;
- while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) {
+ size_t count = 10;
+ while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && count && !unittest_running) {
if(!logged) {
netdata_log_info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
+ count--;
}
netdata_log_info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 61449426f..7ae0e7079 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -20,6 +20,7 @@ extern int default_multidb_disk_quota_mb;
extern struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
extern size_t page_type_size[];
extern size_t tier_page_size[];
+extern uint8_t tier_page_type[];
#define CTX_POINT_SIZE_BYTES(ctx) page_type_size[(ctx)->config.page_type]
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index 831e48531..a0febd4f4 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -8,16 +8,9 @@
/* Forward declarations */
struct rrdengine_instance;
-#define STR_HELPER(x) #x
-#define STR(x) STR_HELPER(x)
-
-#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
-
#define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
#define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE)
-#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC)
-
typedef uintptr_t rrdeng_stats_t;
#ifdef __ATOMIC_RELAXED
@@ -58,7 +51,7 @@ static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val)
}
}
-#define RRDENG_PATH_MAX (4096)
+#define RRDENG_PATH_MAX (FILENAME_MAX + 1)
/* returns old *ptr value */
static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
@@ -74,12 +67,15 @@ static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr,
static inline int crc32cmp(void *crcp, uLong crc)
{
- return (*(uint32_t *)crcp != crc);
+ uint32_t loaded_crc;
+ memcpy(&loaded_crc, crcp, sizeof(loaded_crc));
+ return (loaded_crc != crc);
}
static inline void crc32set(void *crcp, uLong crc)
{
- *(uint32_t *)crcp = crc;
+ uint32_t store_crc = (uint32_t) crc;
+ memcpy(crcp, &store_crc, sizeof(store_crc));
}
int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size);