diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
commit | c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /database/engine | |
parent | Adding upstream version 1.43.2. (diff) | |
download | netdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.tar.xz netdata-c21c3b0befeb46a51b6bf3758ffa30813bea0ff0.zip |
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/cache.c | 16 | ||||
-rw-r--r-- | database/engine/cache.h | 1 | ||||
-rw-r--r-- | database/engine/datafile.c | 20 | ||||
-rw-r--r-- | database/engine/journalfile.c | 15 | ||||
-rw-r--r-- | database/engine/metric.c | 360 | ||||
-rw-r--r-- | database/engine/metric.h | 4 | ||||
-rw-r--r-- | database/engine/page.c | 679 | ||||
-rw-r--r-- | database/engine/page.h | 58 | ||||
-rw-r--r-- | database/engine/page_test.cc | 405 | ||||
-rw-r--r-- | database/engine/page_test.h | 14 | ||||
-rw-r--r-- | database/engine/pagecache.c | 62 | ||||
-rw-r--r-- | database/engine/pagecache.h | 2 | ||||
-rw-r--r-- | database/engine/pdc.c | 119 | ||||
-rw-r--r-- | database/engine/rrddiskprotocol.h | 14 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 117 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 17 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 231 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 1 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 16 |
19 files changed, 1599 insertions, 552 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c index 7a9ccf8d1..eb1c35298 100644 --- a/database/engine/cache.c +++ b/database/engine/cache.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #include "cache.h" /* STATES AND TRANSITIONS @@ -1170,9 +1171,10 @@ static bool evict_pages_with_filter(PGC *cache, size_t max_skip, size_t max_evic if(all_of_them && !filter) { pgc_ll_lock(cache, &cache->clean); if(cache->clean.stats->entries) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE CACHE: cannot free all clean pages, %zu are still in the clean queue", - cache->clean.stats->entries); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE, + "DBENGINE CACHE: cannot free all clean pages, %zu are still in the clean queue", + cache->clean.stats->entries); } pgc_ll_unlock(cache, &cache->clean); } @@ -1801,7 +1803,7 @@ PGC *pgc_create(const char *name, cache->aral = callocz(cache->config.partitions, sizeof(ARAL *)); for(size_t part = 0; part < cache->config.partitions ; part++) { char buf[100 +1]; - snprintfz(buf, 100, "%s[%zu]", name, part); + snprintfz(buf, sizeof(buf) - 1, "%s[%zu]", name, part); cache->aral[part] = aral_create( buf, sizeof(PGC_PAGE) + cache->config.additional_bytes_per_page, @@ -1860,7 +1862,7 @@ void pgc_destroy(PGC *cache) { freez(cache->aral); #endif - + freez(cache->index); freez(cache); } } @@ -2517,7 +2519,7 @@ void unittest_stress_test(void) { for(size_t i = 0; i < pgc_uts.collect_threads ;i++) { collect_thread_ids[i] = i; char buffer[100 + 1]; - snprintfz(buffer, 100, "COLLECT_%zu", i); + snprintfz(buffer, sizeof(buffer) - 1, "COLLECT_%zu", i); netdata_thread_create(&collect_threads[i], buffer, NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, unittest_stress_test_collector, &collect_thread_ids[i]); @@ -2529,7 +2531,7 @@ void unittest_stress_test(void) { for(size_t i = 0; i < pgc_uts.query_threads ;i++) { query_thread_ids[i] = i; char buffer[100 + 1]; - snprintfz(buffer, 100, "QUERY_%zu", i); + snprintfz(buffer, sizeof(buffer) - 1, "QUERY_%zu", i); initstate_r(1, pgc_uts.rand_statebufs, 1024, &pgc_uts.random_data[i]); netdata_thread_create(&queries_threads[i], buffer, NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, diff --git a/database/engine/cache.h b/database/engine/cache.h index c10e09928..7cd7c0636 100644 --- a/database/engine/cache.h +++ b/database/engine/cache.h @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef DBENGINE_CACHE_H #define DBENGINE_CACHE_H diff --git a/database/engine/datafile.c b/database/engine/datafile.c index fcda84bd6..7322039cd 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -160,7 +160,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { - (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + (void) snprintfz(str, maxlen - 1, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } @@ -338,7 +338,8 @@ static int load_data_file(struct rrdengine_datafile *datafile) ctx_fs_error(ctx); return fd; } - netdata_log_info("DBENGINE: initializing data file \"%s\".", path); + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); if (ret) @@ -354,7 +355,8 @@ static int load_data_file(struct rrdengine_datafile *datafile) datafile->file = file; datafile->pos = file_size; - netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + nd_log_daemon(NDLP_DEBUG, "DBENGINE: data file \"%s\" initialized (size:%" PRIu64 ").", path, file_size); + return 0; error: @@ -422,6 +424,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; + netdata_log_info("DBENGINE: loading %d data/journal of tier %d...", matched_files, ctx->config.tier); for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { uint8_t must_delete_pair = 0; @@ -479,14 +482,18 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) int ret; char path[RRDENG_PATH_MAX]; - netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: creating new data and journal files in path %s", + ctx->config.dbfiles_path); + datafile = datafile_alloc_and_init(ctx, 1, fileno); ret = create_data_file(datafile); if(ret) goto error_after_datafile; generate_datafilepath(datafile, path, sizeof(path)); - netdata_log_info("DBENGINE: created data file \"%s\".", path); + nd_log(NDLS_DAEMON, NDLP_INFO, + "DBENGINE: created data file \"%s\".", path); journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_create(journalfile, datafile); @@ -494,7 +501,8 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) goto error_after_journalfile; journalfile_v1_generate_path(datafile, path, sizeof(path)); - netdata_log_info("DBENGINE: created journal file \"%s\".", path); + nd_log(NDLS_DAEMON, NDLP_INFO, + "DBENGINE: created journal file \"%s\".", path); ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); datafile_list_insert(ctx, datafile, having_lock); diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index abb9d2eb9..9005b81ca 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -67,7 +67,7 @@ void journalfile_v2_generate_path(struct rrdengine_datafile *datafile, char *str void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { - (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, + (void) snprintfz(str, maxlen - 1, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION, datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } @@ -169,7 +169,7 @@ static void njfv2idx_add(struct rrdengine_datafile *datafile) { *PValue = datafile; break; } - } while(0); + } while(1); rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); } @@ -1013,7 +1013,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st journalfile_v2_data_release(journalfile); usec_t ended_ut = now_monotonic_usec(); - netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" , ctx->config.tier, journalfile->datafile->fileno , (double)data_size / 1024 / 1024 , (double)entries / 1000 @@ -1073,7 +1073,8 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal return 1; } - netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2); + nd_log_daemon(NDLP_DEBUG, "DBENGINE: checking integrity of '%s'", path_v2); + usec_t validation_start_ut = now_monotonic_usec(); int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size); if (unlikely(rc)) { @@ -1104,7 +1105,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal usec_t finished_ut = now_monotonic_usec(); - netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " "mmap: %0.2f ms, validate: %0.2f ms" , path_v2 , (double)journal_v2_file_size / 1024 / 1024 @@ -1535,13 +1536,13 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil } ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb)); - netdata_log_info("DBENGINE: loading journal file '%s'", path); + nd_log_daemon(NDLP_DEBUG, "DBENGINE: loading journal file '%s'", path); max_id = journalfile_iterate_transactions(ctx, journalfile); __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED); - netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size); + nd_log_daemon(NDLP_DEBUG, "DBENGINE: journal file '%s' loaded (size:%" PRIu64 ").", path, file_size); bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno); if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) { diff --git a/database/engine/metric.c b/database/engine/metric.c index 69b8f3116..2e132612e 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -1,30 +1,44 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #include "metric.h" typedef int32_t REFCOUNT; #define REFCOUNT_DELETING (-100) -typedef enum __attribute__ ((__packed__)) { - METRIC_FLAG_HAS_RETENTION = (1 << 0), -} METRIC_FLAGS; - struct metric { uuid_t uuid; // never changes Word_t section; // never changes - time_t first_time_s; // - time_t latest_time_s_clean; // archived pages latest time - time_t latest_time_s_hot; // latest time of the currently collected page - uint32_t latest_update_every_s; // + time_t first_time_s; // the timestamp of the oldest point in the database + time_t latest_time_s_clean; // the timestamp of the newest point in the database + time_t latest_time_s_hot; // the timestamp of the latest point that has been collected (not yet stored) + uint32_t latest_update_every_s; // the latest data collection frequency pid_t writer; uint8_t partition; - METRIC_FLAGS flags; REFCOUNT refcount; - SPINLOCK spinlock; // protects all variable members // THIS IS allocated with malloc() // YOU HAVE TO INITIALIZE IT YOURSELF ! }; +#define set_metric_field_with_condition(field, value, condition) ({ \ + typeof(field) _current = __atomic_load_n(&(field), __ATOMIC_RELAXED); \ + typeof(field) _wanted = value; \ + bool did_it = true; \ + \ + do { \ + if((condition) && (_current != _wanted)) { \ + ; \ + } \ + else { \ + did_it = false; \ + break; \ + } \ + } while(!__atomic_compare_exchange_n(&(field), &_current, _wanted, \ + false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); \ + \ + did_it; \ +}) + static struct aral_statistics mrg_aral_statistics; struct mrg { @@ -73,9 +87,6 @@ static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) { #define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock) #define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock) -#define metric_lock(metric) spinlock_lock(&(metric)->spinlock) -#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock) - static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) { if(mem_after_judyl > mem_before_judyl) __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); @@ -97,40 +108,34 @@ static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { return *n % mrg->partitions; } -static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) { - size_t partition = metric->partition; +static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) { + time_t first_time_s = __atomic_load_n(&metric->first_time_s, __ATOMIC_RELAXED); - bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0); + if(first_time_s <= 0) { + first_time_s = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + if(first_time_s <= 0) + first_time_s = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) { - metric->flags |= METRIC_FLAG_HAS_RETENTION; - __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); - } - else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) { - metric->flags &= ~METRIC_FLAG_HAS_RETENTION; - __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); + if(first_time_s <= 0) + first_time_s = 0; + else + __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); } - return has_retention; + return first_time_s; } -static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) { +static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) { size_t partition = metric->partition; + REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); REFCOUNT refcount; - if(!having_spinlock) - metric_lock(metric); - - if(unlikely(metric->refcount < 0)) - fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); - - refcount = ++metric->refcount; - - // update its retention flags - metric_has_retention_unsafe(mrg, metric); + do { + if(expected < 0) + fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); - if(!having_spinlock) - metric_unlock(metric); + refcount = expected + 1; + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); if(refcount == 1) __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); @@ -141,28 +146,25 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b } static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { - bool ret = true; size_t partition = metric->partition; + REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); REFCOUNT refcount; - metric_lock(metric); - - if(unlikely(metric->refcount <= 0)) - fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); - - refcount = --metric->refcount; - - if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0)) - ret = false; + do { + if(expected <= 0) + fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); - metric_unlock(metric); + refcount = expected - 1; + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); if(unlikely(!refcount)) __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - return ret; + time_t first, last, ue; + mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + return (!first || !last || first > last); } static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { @@ -192,7 +194,7 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r if(unlikely(*PValue != NULL)) { METRIC *metric = *PValue; - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); MRG_STATS_DUPLICATE_ADD(mrg, partition); @@ -215,10 +217,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r metric->latest_update_every_s = entry->latest_update_every_s; metric->writer = 0; metric->refcount = 0; - metric->flags = 0; metric->partition = partition; - spinlock_init(&metric->spinlock); - metric_acquire(mrg, metric, true); // no spinlock use required here + metric_acquire(mrg, metric); *PValue = metric; MRG_STATS_ADDED_METRIC(mrg, partition); @@ -252,7 +252,7 @@ static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t sect METRIC *metric = *PValue; - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); mrg_index_read_unlock(mrg, partition); @@ -363,7 +363,7 @@ inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { } inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { - metric_acquire(mrg, metric, false); + metric_acquire(mrg, metric); return metric; } @@ -389,10 +389,7 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, if(unlikely(first_time_s < 0)) return false; - metric_lock(metric); - metric->first_time_s = first_time_s; - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); + __atomic_store_n(&metric->first_time_s, first_time_s, __ATOMIC_RELAXED); return true; } @@ -405,112 +402,56 @@ inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, internal_fatal(last_time_s > max_acceptable_collected_time(), "DBENGINE METRIC: metric last time is in the future"); - if(unlikely(first_time_s < 0)) - first_time_s = 0; - - if(unlikely(last_time_s < 0)) - last_time_s = 0; - - if(unlikely(update_every_s < 0)) - update_every_s = 0; - - if(unlikely(!first_time_s && !last_time_s && !update_every_s)) - return; + if(first_time_s > 0) + set_metric_field_with_condition(metric->first_time_s, first_time_s, _current <= 0 || _wanted < _current); - metric_lock(metric); - - if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s))) - metric->first_time_s = first_time_s; - - if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) { - metric->latest_time_s_clean = last_time_s; - - if(likely(update_every_s)) - metric->latest_update_every_s = (uint32_t) update_every_s; + if(last_time_s > 0) { + if(set_metric_field_with_condition(metric->latest_time_s_clean, last_time_s, _current <= 0 || _wanted > _current) && + update_every_s > 0) + // set the latest update every too + set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); } - else if(unlikely(!metric->latest_update_every_s && update_every_s)) - metric->latest_update_every_s = (uint32_t) update_every_s; - - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); + else if(update_every_s > 0) + // set it only if it is invalid + set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); } inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - - bool ret = false; - - metric_lock(metric); - if(first_time_s > metric->first_time_s) { - metric->first_time_s = first_time_s; - ret = true; - } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - - return ret; + return set_metric_field_with_condition(metric->first_time_s, first_time_s, _wanted > _current); } inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t first_time_s; - - metric_lock(metric); - - if(unlikely(!metric->first_time_s)) { - if(metric->latest_time_s_clean) - metric->first_time_s = metric->latest_time_s_clean; - - else if(metric->latest_time_s_hot) - metric->first_time_s = metric->latest_time_s_hot; - } - - first_time_s = metric->first_time_s; - - metric_unlock(metric); - - return first_time_s; + return mrg_metric_get_first_time_s_smart(mrg, metric); } inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { - metric_lock(metric); - - if(unlikely(!metric->first_time_s)) { - if(metric->latest_time_s_clean) - metric->first_time_s = metric->latest_time_s_clean; - - else if(metric->latest_time_s_hot) - metric->first_time_s = metric->latest_time_s_hot; - } - - *first_time_s = metric->first_time_s; - *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - *update_every_s = metric->latest_update_every_s; + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); - metric_unlock(metric); + *last_time_s = MAX(clean, hot); + *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric); + *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(unlikely(latest_time_s < 0)) - return false; - - metric_lock(metric); - // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); // internal_fatal(metric->latest_time_s_clean > latest_time_s, // "DBENGINE METRIC: metric new clean latest time is older than the previous one"); - metric->latest_time_s_clean = latest_time_s; + if(latest_time_s > 0) { + if(set_metric_field_with_condition(metric->latest_time_s_clean, latest_time_s, true)) { + set_metric_field_with_condition(metric->first_time_s, latest_time_s, _current <= 0 || _wanted < _current); - if(unlikely(!metric->first_time_s)) - metric->first_time_s = latest_time_s; + return true; + } + } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - return true; + return false; } // returns true when metric still has retention @@ -518,7 +459,6 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr Word_t section = mrg_metric_section(mrg, metric); bool do_again = false; size_t countdown = 5; - bool ret = true; do { time_t min_first_time_s = LONG_MAX; @@ -547,22 +487,20 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr if (min_first_time_s == LONG_MAX) min_first_time_s = 0; - metric_lock(metric); - if (--countdown && !min_first_time_s && metric->latest_time_s_hot) + if (--countdown && !min_first_time_s && __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED)) do_again = true; else { internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention"); do_again = false; - metric->first_time_s = min_first_time_s; - metric->latest_time_s_clean = max_end_time_s; - - ret = metric_has_retention_unsafe(mrg, metric); + set_metric_field_with_condition(metric->first_time_s, min_first_time_s, true); + set_metric_field_with_condition(metric->latest_time_s_clean, max_end_time_s, true); } - metric_unlock(metric); } while(do_again); - return ret; + time_t first, last, ue; + mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + return (first && last && first < last); } inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { @@ -571,88 +509,80 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); - if(unlikely(latest_time_s < 0)) - return false; - - metric_lock(metric); - metric->latest_time_s_hot = latest_time_s; - - if(unlikely(!metric->first_time_s)) - metric->first_time_s = latest_time_s; + if(likely(latest_time_s > 0)) { + __atomic_store_n(&metric->latest_time_s_hot, latest_time_s, __ATOMIC_RELAXED); + return true; + } - metric_has_retention_unsafe(mrg, metric); - metric_unlock(metric); - return true; + return false; } inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t max; - metric_lock(metric); - max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - metric_unlock(metric); - return max; + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); + + return MAX(clean, hot); } inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(update_every_s <= 0) - return false; - - metric_lock(metric); - metric->latest_update_every_s = (uint32_t) update_every_s; - metric_unlock(metric); + if(update_every_s > 0) + return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); - return true; + return false; } inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - if(update_every_s <= 0) - return false; - - metric_lock(metric); - if(!metric->latest_update_every_s) - metric->latest_update_every_s = (uint32_t) update_every_s; - metric_unlock(metric); + if(update_every_s > 0) + return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); - return true; + return false; } inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { - time_t update_every_s; - - metric_lock(metric); - update_every_s = metric->latest_update_every_s; - metric_unlock(metric); - - return update_every_s; + return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { - bool done = false; - metric_lock(metric); - if(!metric->writer) { - metric->writer = gettid(); + pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); + pid_t wanted = gettid(); + bool done = true; + + do { + if(expected != 0) { + done = false; + break; + } + } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(done) __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - done = true; - } else __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED); - metric_unlock(metric); + return done; } inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { - bool done = false; - metric_lock(metric); - if(metric->writer) { - metric->writer = 0; + // this function can be called from a different thread than the one than the writer + + pid_t expected = __atomic_load_n(&metric->writer, __ATOMIC_RELAXED); + pid_t wanted = 0; + bool done = true; + + do { + if(!expected) { + done = false; + break; + } + } while(!__atomic_compare_exchange_n(&metric->writer, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(done) __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); - done = true; - } - metric_unlock(metric); + return done; } @@ -662,27 +592,30 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid( time_t update_every_s, time_t now_s) { if(unlikely(last_time_s > now_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " - "fixing last time to now", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " + "fixing last time to now", + first_time_s, last_time_s, now_s); last_time_s = now_s; } if (unlikely(first_time_s > last_time_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " - "fixing first time to last time", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " + "fixing first time to last time", + first_time_s, last_time_s, now_s); first_time_s = last_time_s; } if (unlikely(first_time_s == 0 || last_time_s == 0)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " - "using them as-is", - first_time_s, last_time_s, now_s); + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, + "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " + "using them as-is", + first_time_s, last_time_s, now_s); } bool added = false; @@ -710,7 +643,6 @@ inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { for(size_t i = 0; i < mrg->partitions ;i++) { s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); - s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED); s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); @@ -900,7 +832,7 @@ int mrg_unittest(void) { pthread_t th[threads]; for(size_t i = 0; i < threads ; i++) { char buf[15 + 1]; - snprintfz(buf, 15, "TH[%zu]", i); + snprintfz(buf, sizeof(buf) - 1, "TH[%zu]", i); netdata_thread_create(&th[i], buf, NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, mrg_stress, &t); diff --git a/database/engine/metric.h b/database/engine/metric.h index 5d5ebd7b1..dbb949301 100644 --- a/database/engine/metric.h +++ b/database/engine/metric.h @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #ifndef DBENGINE_METRIC_H #define DBENGINE_METRIC_H @@ -35,9 +36,6 @@ struct mrg_statistics { size_t entries_referenced; - MRG_CACHE_LINE_PADDING(1); - size_t entries_with_retention; - MRG_CACHE_LINE_PADDING(2); size_t current_references; diff --git a/database/engine/page.c b/database/engine/page.c new file mode 100644 index 000000000..b7a393483 --- /dev/null +++ b/database/engine/page.c @@ -0,0 +1,679 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "page.h" + +#include "libnetdata/libnetdata.h" + +typedef enum __attribute__((packed)) { + PAGE_OPTION_ALL_VALUES_EMPTY = (1 << 0), +} PAGE_OPTIONS; + +typedef enum __attribute__((packed)) { + PGD_STATE_CREATED_FROM_COLLECTOR = (1 << 0), + PGD_STATE_CREATED_FROM_DISK = (1 << 1), + PGD_STATE_SCHEDULED_FOR_FLUSHING = (1 << 2), + PGD_STATE_FLUSHED_TO_DISK = (1 << 3), +} PGD_STATES; + +typedef struct { + uint8_t *data; + uint32_t size; +} page_raw_t; + + +typedef struct { + size_t num_buffers; + gorilla_writer_t *writer; + int aral_index; +} page_gorilla_t; + +struct pgd { + // the page type + uint8_t type; + + // options related to the page + PAGE_OPTIONS options; + + PGD_STATES states; + + // the uses number of slots in the page + uint32_t used; + + // the total number of slots available in the page + uint32_t slots; + + union { + page_raw_t raw; + page_gorilla_t gorilla; + }; +}; + +// ---------------------------------------------------------------------------- +// memory management + +struct { + ARAL *aral_pgd; + ARAL *aral_data[RRD_STORAGE_TIERS]; + ARAL *aral_gorilla_buffer[4]; + ARAL *aral_gorilla_writer[4]; +} pgd_alloc_globals = {}; + +static ARAL *pgd_aral_data_lookup(size_t size) +{ + for (size_t tier = 0; tier < storage_tiers; tier++) + if (size == tier_page_size[tier]) + return pgd_alloc_globals.aral_data[tier]; + + return NULL; +} + +void pgd_init_arals(void) +{ + // pgd aral + { + char buf[20 + 1]; + snprintfz(buf, sizeof(buf) - 1, "pgd"); + + // FIXME: add stats + pgd_alloc_globals.aral_pgd = aral_create( + buf, + sizeof(struct pgd), + 64, + 512 * (sizeof(struct pgd)), + pgc_aral_statistics(), + NULL, NULL, false, false); + } + + // tier page aral + { + for (size_t i = storage_tiers; i > 0 ;i--) + { + size_t tier = storage_tiers - i; + + char buf[20 + 1]; + snprintfz(buf, sizeof(buf) - 1, "tier%zu-pages", tier); + + pgd_alloc_globals.aral_data[tier] = aral_create( + buf, + tier_page_size[tier], + 64, + 512 * (tier_page_size[tier]), + pgc_aral_statistics(), + NULL, NULL, false, false); + } + } + + // gorilla buffers aral + for (size_t i = 0; i != 4; i++) { + char buf[20 + 1]; + snprintfz(buf, sizeof(buf) - 1, "gbuffer-%zu", i); + + // FIXME: add stats + pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create( + buf, + GORILLA_BUFFER_SIZE, + 64, + 512 * GORILLA_BUFFER_SIZE, + pgc_aral_statistics(), + NULL, NULL, false, false); + } + + // gorilla writers aral + for (size_t i = 0; i != 4; i++) { + char buf[20 + 1]; + snprintfz(buf, sizeof(buf) - 1, "gwriter-%zu", i); + + // FIXME: add stats + pgd_alloc_globals.aral_gorilla_writer[i] = aral_create( + buf, + sizeof(gorilla_writer_t), + 64, + 512 * sizeof(gorilla_writer_t), + pgc_aral_statistics(), + NULL, NULL, false, false); + } +} + +static void *pgd_data_aral_alloc(size_t size) +{ + ARAL *ar = pgd_aral_data_lookup(size); + if (!ar) + return mallocz(size); + else + return aral_mallocz(ar); +} + +static void pgd_data_aral_free(void *page, size_t size) +{ + ARAL *ar = pgd_aral_data_lookup(size); + if (!ar) + freez(page); + else + aral_freez(ar, page); +} + +// ---------------------------------------------------------------------------- +// management api + +PGD *pgd_create(uint8_t type, uint32_t slots) +{ + PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd); + pg->type = type; + pg->used = 0; + pg->slots = slots; + pg->options = PAGE_OPTION_ALL_VALUES_EMPTY; + pg->states = PGD_STATE_CREATED_FROM_COLLECTOR; + + switch (type) { + case PAGE_METRICS: + case PAGE_TIER: { + uint32_t size = slots * page_type_size[type]; + + internal_fatal(!size || slots == 1, + "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type); + + pg->raw.size = size; + pg->raw.data = pgd_data_aral_alloc(size); + break; + } + case PAGE_GORILLA_METRICS: { + internal_fatal(slots == 1, + "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type); + + pg->slots = 8 * GORILLA_BUFFER_SLOTS; + + // allocate new gorilla writer + pg->gorilla.aral_index = gettid() % 4; + pg->gorilla.writer = aral_mallocz(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index]); + + // allocate new gorilla buffer + gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]); + memset(gbuf, 0, GORILLA_BUFFER_SIZE); + global_statistics_gorilla_buffer_add_hot(); + + *pg->gorilla.writer = gorilla_writer_init(gbuf, GORILLA_BUFFER_SLOTS); + pg->gorilla.num_buffers = 1; + + break; + } + default: + fatal("Unknown page type: %uc", type); + } + + return pg; +} + +PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size) +{ + if (!size) + return PGD_EMPTY; + + if (size < page_type_size[type]) + return PGD_EMPTY; + + PGD *pg = aral_mallocz(pgd_alloc_globals.aral_pgd); + + pg->type = type; + pg->states = PGD_STATE_CREATED_FROM_DISK; + pg->options = ~PAGE_OPTION_ALL_VALUES_EMPTY; + + switch (type) + { + case PAGE_METRICS: + case PAGE_TIER: + pg->raw.size = size; + pg->used = size / page_type_size[type]; + pg->slots = pg->used; + + pg->raw.data = pgd_data_aral_alloc(size); + memcpy(pg->raw.data, base, size); + break; + case PAGE_GORILLA_METRICS: + internal_fatal(size == 0, "Asked to create page with 0 data!!!"); + internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size"); + internal_fatal(size % GORILLA_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes", GORILLA_BUFFER_SIZE); + + pg->raw.data = mallocz(size); + pg->raw.size = size; + + // TODO: rm this + memset(pg->raw.data, 0, size); + memcpy(pg->raw.data, base, size); + + uint32_t total_entries = gorilla_buffer_patch((void *) pg->raw.data); + + pg->used = total_entries; + pg->slots = pg->used; + break; + default: + fatal("Unknown page type: %uc", type); + } + + return pg; +} + +void pgd_free(PGD *pg) +{ + if (!pg) + return; + + if (pg == PGD_EMPTY) + return; + + switch (pg->type) + { + case PAGE_METRICS: + case PAGE_TIER: + pgd_data_aral_free(pg->raw.data, pg->raw.size); + break; + case PAGE_GORILLA_METRICS: { + if (pg->states & PGD_STATE_CREATED_FROM_DISK) + { + internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data"); + freez(pg->raw.data); + pg->raw.data = NULL; + } + else if ((pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) || + (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) || + (pg->states & PGD_STATE_FLUSHED_TO_DISK)) + { + internal_fatal(pg->gorilla.writer == NULL, + "PGD does not have an active gorilla writer"); + + internal_fatal(pg->gorilla.num_buffers == 0, + "PGD does not have any gorilla buffers allocated"); + + while (true) { + gorilla_buffer_t *gbuf = gorilla_writer_drop_head_buffer(pg->gorilla.writer); + if (!gbuf) + break; + aral_freez(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index], gbuf); + pg->gorilla.num_buffers -= 1; + } + + internal_fatal(pg->gorilla.num_buffers != 0, + "Could not free all gorilla writer buffers"); + + aral_freez(pgd_alloc_globals.aral_gorilla_writer[pg->gorilla.aral_index], pg->gorilla.writer); + pg->gorilla.writer = NULL; + } else { + fatal("pgd_free() called on gorilla page with unsupported state"); + // TODO: should we support any other states? + // if (!(pg->states & PGD_STATE_FLUSHED_TO_DISK)) + // fatal("pgd_free() is not supported yet for pages flushed to disk"); + } + + break; + } + default: + fatal("Unknown page type: %uc", pg->type); + } + + aral_freez(pgd_alloc_globals.aral_pgd, pg); +} + +// ---------------------------------------------------------------------------- +// utility functions + +uint32_t pgd_type(PGD *pg) +{ + return pg->type; +} + +bool pgd_is_empty(PGD *pg) +{ + if (!pg) + return true; + + if (pg == PGD_EMPTY) + return true; + + if (pg->used == 0) + return true; + + if (pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) + return true; + + return false; +} + +uint32_t pgd_slots_used(PGD *pg) +{ + if (!pg) + return 0; + + if (pg == PGD_EMPTY) + return 0; + + return pg->used; +} + +uint32_t pgd_memory_footprint(PGD *pg) +{ + if (!pg) + return 0; + + if (pg == PGD_EMPTY) + return 0; + + size_t footprint = 0; + switch (pg->type) { + case PAGE_METRICS: + case PAGE_TIER: + footprint = sizeof(PGD) + pg->raw.size; + break; + case PAGE_GORILLA_METRICS: { + if (pg->states & PGD_STATE_CREATED_FROM_DISK) + footprint = sizeof(PGD) + pg->raw.size; + else + footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE); + + break; + } + default: + fatal("Unknown page type: %uc", pg->type); + } + + return footprint; +} + +uint32_t pgd_disk_footprint(PGD *pg) +{ + if (!pgd_slots_used(pg)) + return 0; + + size_t size = 0; + + switch (pg->type) { + case PAGE_METRICS: + case PAGE_TIER: { + uint32_t used_size = pg->used * page_type_size[pg->type]; + internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size"); + size = used_size; + + break; + } + case PAGE_GORILLA_METRICS: { + if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR || + pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING || + pg->states & PGD_STATE_FLUSHED_TO_DISK) + { + internal_fatal(!pg->gorilla.writer, + "pgd_disk_footprint() not implemented for NULL gorilla writers"); + + internal_fatal(pg->gorilla.num_buffers == 0, + "Gorilla writer does not have any buffers"); + + size = pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE; + + if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) { + global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer)); + global_statistics_tier0_disk_uncompressed_bytes(gorilla_writer_entries(pg->gorilla.writer) * sizeof(storage_number)); + } + } else if (pg->states & PGD_STATE_CREATED_FROM_DISK) { + size = pg->raw.size; + } else { + fatal("Asked disk footprint on unknown page state"); + } + + break; + } + default: + fatal("Unknown page type: %uc", pg->type); + } + + internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK, + "Disk footprint asked for page created from disk."); + pg->states = PGD_STATE_SCHEDULED_FOR_FLUSHING; + return size; +} + +void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size) +{ + internal_fatal(pgd_disk_footprint(pg) != dst_size, "Wrong disk footprint size requested (need %u, available %u)", + pgd_disk_footprint(pg), dst_size); + + switch (pg->type) { + case PAGE_METRICS: + case PAGE_TIER: + memcpy(dst, pg->raw.data, dst_size); + break; + case PAGE_GORILLA_METRICS: { + if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0) + fatal("Copying to extent is supported only for PGDs that are scheduled for flushing."); + + internal_fatal(!pg->gorilla.writer, + "pgd_copy_to_extent() not implemented for NULL gorilla writers"); + + internal_fatal(pg->gorilla.num_buffers == 0, + "pgd_copy_to_extent() gorilla writer does not have any buffers"); + + bool ok = gorilla_writer_serialize(pg->gorilla.writer, dst, dst_size); + UNUSED(ok); + internal_fatal(!ok, + "pgd_copy_to_extent() tried to serialize pg=%p, gw=%p (with dst_size=%u bytes, num_buffers=%zu)", + pg, pg->gorilla.writer, dst_size, pg->gorilla.num_buffers); + break; + } + default: + fatal("Unknown page type: %uc", pg->type); + } + + pg->states = PGD_STATE_FLUSHED_TO_DISK; +} + +// ---------------------------------------------------------------------------- +// data collection + +void pgd_append_point(PGD *pg, + usec_t point_in_time_ut __maybe_unused, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags, + uint32_t expected_slot) +{ + if (unlikely(pg->used >= pg->slots)) + fatal("DBENGINE: attempted to write beyond page size (page type %u, slots %u, used %u)", + pg->type, pg->slots, pg->used /* FIXME:, pg->size */); + + if (unlikely(pg->used != expected_slot)) + fatal("DBENGINE: page is not aligned to expected slot (used %u, expected %u)", + pg->used, expected_slot); + + if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR)) + fatal("DBENGINE: collection on page not created from a collector"); + + if (pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) + fatal("Data collection on page already scheduled for flushing"); + + switch (pg->type) { + case PAGE_METRICS: { + storage_number *tier0_metric_data = (storage_number *)pg->raw.data; + storage_number t = pack_storage_number(n, flags); + tier0_metric_data[pg->used++] = t; + + if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t)) + pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY; + + break; + } + case PAGE_TIER: { + storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data; + storage_number_tier1_t t; + t.sum_value = (float) n; + t.min_value = (float) min_value; + t.max_value = (float) max_value; + t.anomaly_count = anomaly_count; + t.count = count; + tier12_metric_data[pg->used++] = t; + + if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && fpclassify(n) != FP_NAN) + pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY; + + break; + } + case PAGE_GORILLA_METRICS: { + pg->used++; + storage_number t = pack_storage_number(n, flags); + + if ((pg->options & PAGE_OPTION_ALL_VALUES_EMPTY) && does_storage_number_exist(t)) + pg->options &= ~PAGE_OPTION_ALL_VALUES_EMPTY; + + bool ok = gorilla_writer_write(pg->gorilla.writer, t); + if (!ok) { + gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]); + memset(new_buffer, 0, GORILLA_BUFFER_SIZE); + + gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, GORILLA_BUFFER_SLOTS); + pg->gorilla.num_buffers += 1; + global_statistics_gorilla_buffer_add_hot(); + + ok = gorilla_writer_write(pg->gorilla.writer, t); + internal_fatal(ok == false, "Failed to writer value in newly allocated gorilla buffer."); + } + break; + } + default: + fatal("DBENGINE: unknown page type id %d", pg->type); + break; + } +} + +// ---------------------------------------------------------------------------- +// querying with cursor + +static void pgdc_seek(PGDC *pgdc, uint32_t position) +{ + PGD *pg = pgdc->pgd; + + switch (pg->type) { + case PAGE_METRICS: + case PAGE_TIER: + pgdc->slots = pgdc->pgd->used; + break; + case PAGE_GORILLA_METRICS: { + if (pg->states & PGD_STATE_CREATED_FROM_DISK) { + pgdc->slots = pgdc->pgd->slots; + pgdc->gr = gorilla_reader_init((void *) pg->raw.data); + } else { + if (!(pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) && + !(pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) && + !(pg->states & PGD_STATE_FLUSHED_TO_DISK)) + fatal("pgdc_seek() currently is not supported for pages created from disk."); + + if (!pg->gorilla.writer) + fatal("Seeking from a page without an active gorilla writer is not supported (yet)."); + + pgdc->slots = gorilla_writer_entries(pg->gorilla.writer); + pgdc->gr = gorilla_writer_get_reader(pg->gorilla.writer); + } + + if (position > pgdc->slots) + position = pgdc->slots; + + for (uint32_t i = 0; i != position; i++) { + uint32_t value; + + bool ok = gorilla_reader_read(&pgdc->gr, &value); + + if (!ok) { + // this is fine, the reader will return empty points + break; + } + } + + break; + } + default: + fatal("DBENGINE: unknown page type id %d", pg->type); + break; + } +} + +void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position) +{ + // pgd might be null and position equal to UINT32_MAX + + pgdc->pgd = pgd; + pgdc->position = position; + + if (!pgd) + return; + + if (pgd == PGD_EMPTY) + return; + + if (position == UINT32_MAX) + return; + + pgdc_seek(pgdc, position); +} + +bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp) +{ + if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots) + { + storage_point_empty(*sp, sp->start_time_s, sp->end_time_s); + return false; + } + + internal_fatal(pgdc->position != expected_position, "Wrong expected cursor position"); + + switch (pgdc->pgd->type) + { + case PAGE_METRICS: { + storage_number *array = (storage_number *) pgdc->pgd->raw.data; + storage_number n = array[pgdc->position++]; + + sp->min = sp->max = sp->sum = unpack_storage_number(n); + sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS); + sp->count = 1; + sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + + return true; + } + case PAGE_TIER: { + storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data; + storage_number_tier1_t n = array[pgdc->position++]; + + sp->flags = n.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; + sp->count = n.count; + sp->anomaly_count = n.anomaly_count; + sp->min = n.min_value; + sp->max = n.max_value; + sp->sum = n.sum_value; + + return true; + } + case PAGE_GORILLA_METRICS: { + pgdc->position++; + + uint32_t n = 666666666; + bool ok = gorilla_reader_read(&pgdc->gr, &n); + if (ok) { + sp->min = sp->max = sp->sum = unpack_storage_number(n); + sp->flags = (SN_FLAGS)(n & SN_USER_FLAGS); + sp->count = 1; + sp->anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; + } else { + storage_point_empty(*sp, sp->start_time_s, sp->end_time_s); + } + + return ok; + } + default: { + static bool logged = false; + if (!logged) + { + netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", pgd_type(pgdc->pgd)); + logged = true; + } + + storage_point_empty(*sp, sp->start_time_s, sp->end_time_s); + return false; + } + } +} diff --git a/database/engine/page.h b/database/engine/page.h new file mode 100644 index 000000000..32c87c580 --- /dev/null +++ b/database/engine/page.h @@ -0,0 +1,58 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef DBENGINE_PAGE_H +#define DBENGINE_PAGE_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "libnetdata/libnetdata.h" + +typedef struct pgd_cursor { + struct pgd *pgd; + uint32_t position; + uint32_t slots; + + gorilla_reader_t gr; +} PGDC; + +#include "rrdengine.h" + +typedef struct pgd PGD; + +#define PGD_EMPTY (PGD *)(-1) + +void pgd_init_arals(void); + +PGD *pgd_create(uint8_t type, uint32_t slots); +PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size); +void pgd_free(PGD *pg); + +uint32_t pgd_type(PGD *pg); +bool pgd_is_empty(PGD *pg); +uint32_t pgd_slots_used(PGD *pg); + +uint32_t pgd_memory_footprint(PGD *pg); +uint32_t pgd_disk_footprint(PGD *pg); + +void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size); + +void pgd_append_point(PGD *pg, + usec_t point_in_time_ut, + NETDATA_DOUBLE n, + NETDATA_DOUBLE min_value, + NETDATA_DOUBLE max_value, + uint16_t count, + uint16_t anomaly_count, + SN_FLAGS flags, + uint32_t expected_slot); + +void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position); +bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp); + +#ifdef __cplusplus +} +#endif + +#endif // DBENGINE_PAGE_H diff --git a/database/engine/page_test.cc b/database/engine/page_test.cc new file mode 100644 index 000000000..d61299bc4 --- /dev/null +++ b/database/engine/page_test.cc @@ -0,0 +1,405 @@ +#include "page.h" +#include "page_test.h" + +#ifdef HAVE_GTEST + +#include <gtest/gtest.h> +#include <limits> +#include <random> + +bool operator==(const STORAGE_POINT lhs, const STORAGE_POINT rhs) { + if (lhs.min != rhs.min) + return false; + + if (lhs.max != rhs.max) + return false; + + if (lhs.sum != rhs.sum) + return false; + + if (lhs.start_time_s != rhs.start_time_s) + return false; + + if (lhs.end_time_s != rhs.end_time_s) + return false; + + if (lhs.count != rhs.count) + return false; + + if (lhs.flags != rhs.flags) + return false; + + return true; +} + +// TODO: use value-parameterized tests +// http://google.github.io/googletest/advanced.html#value-parameterized-tests +static uint8_t page_type = PAGE_GORILLA_METRICS; + +static size_t slots_for_page(size_t n) { + switch (page_type) { + case PAGE_METRICS: + return 1024; + case PAGE_GORILLA_METRICS: + return n; + default: + fatal("Slots requested for unsupported page: %uc", page_type); + } +} + +TEST(PGD, EmptyOrNull) { + PGD *pg = NULL; + + PGDC cursor; + STORAGE_POINT sp; + + EXPECT_TRUE(pgd_is_empty(pg)); + EXPECT_EQ(pgd_slots_used(pg), 0); + EXPECT_EQ(pgd_memory_footprint(pg), 0); + EXPECT_EQ(pgd_disk_footprint(pg), 0); + + pgdc_reset(&cursor, pg, 0); + EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp)); + + pgd_free(pg); + + pg = PGD_EMPTY; + + EXPECT_TRUE(pgd_is_empty(pg)); + EXPECT_EQ(pgd_slots_used(pg), 0); + EXPECT_EQ(pgd_memory_footprint(pg), 0); + EXPECT_EQ(pgd_disk_footprint(pg), 0); + EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp)); + + pgdc_reset(&cursor, pg, 0); + EXPECT_FALSE(pgdc_get_next_point(&cursor, 0, &sp)); + + pgd_free(pg); +} + +TEST(PGD, Create) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg = pgd_create(page_type, slots); + + EXPECT_EQ(pgd_type(pg), page_type); + EXPECT_TRUE(pgd_is_empty(pg)); + EXPECT_EQ(pgd_slots_used(pg), 0); + + for (size_t i = 0; i != slots; i++) { + pgd_append_point(pg, i, i, 0, 0, 1, 1, SN_DEFAULT_FLAGS, i); + EXPECT_FALSE(pgd_is_empty(pg)); + } + EXPECT_EQ(pgd_slots_used(pg), slots); + + EXPECT_DEATH( + pgd_append_point(pg, slots, slots, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slots), + ".*" + ); + + pgd_free(pg); +} + +TEST(PGD, CursorFullPage) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg = pgd_create(page_type, slots); + + for (size_t slot = 0; slot != slots; slot++) + pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + + for (size_t i = 0; i != 2; i++) { + PGDC cursor; + pgdc_reset(&cursor, pg, 0); + + STORAGE_POINT sp; + for (size_t slot = 0; slot != slots; slot++) { + EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp)); + + EXPECT_EQ(slot, static_cast<size_t>(sp.min)); + EXPECT_EQ(sp.min, sp.max); + EXPECT_EQ(sp.min, sp.sum); + EXPECT_EQ(sp.count, 1); + EXPECT_EQ(sp.anomaly_count, 0); + } + + EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp)); + } + + for (size_t i = 0; i != 2; i++) { + PGDC cursor; + pgdc_reset(&cursor, pg, slots / 2); + + STORAGE_POINT sp; + for (size_t slot = slots / 2; slot != slots; slot++) { + EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp)); + + EXPECT_EQ(slot, static_cast<size_t>(sp.min)); + EXPECT_EQ(sp.min, sp.max); + EXPECT_EQ(sp.min, sp.sum); + EXPECT_EQ(sp.count, 1); + EXPECT_EQ(sp.anomaly_count, 0); + } + + EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp)); + } + + // out of bounds seek + { + PGDC cursor; + pgdc_reset(&cursor, pg, 2 * slots); + + STORAGE_POINT sp; + EXPECT_FALSE(pgdc_get_next_point(&cursor, 2 * slots, &sp)); + } + + pgd_free(pg); +} + +TEST(PGD, CursorHalfPage) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg = pgd_create(page_type, slots); + + PGDC cursor; + STORAGE_POINT sp; + + // fill the 1st half of the page + for (size_t slot = 0; slot != slots / 2; slot++) + pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + + pgdc_reset(&cursor, pg, 0); + + for (size_t slot = 0; slot != slots / 2; slot++) { + EXPECT_TRUE(pgdc_get_next_point(&cursor, slot, &sp)); + + EXPECT_EQ(slot, static_cast<size_t>(sp.min)); + EXPECT_EQ(sp.min, sp.max); + EXPECT_EQ(sp.min, sp.sum); + EXPECT_EQ(sp.count, 1); + EXPECT_EQ(sp.anomaly_count, 0); + } + EXPECT_FALSE(pgdc_get_next_point(&cursor, slots / 2, &sp)); + + // reset pgdc to the end of the page, we should not be getting more + // points even if the page has grown in between. + + pgdc_reset(&cursor, pg, slots / 2); + + for (size_t slot = slots / 2; slot != slots; slot++) + pgd_append_point(pg, slot, slot, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + + for (size_t slot = slots / 2; slot != slots; slot++) + EXPECT_FALSE(pgdc_get_next_point(&cursor, slot, &sp)); + + EXPECT_FALSE(pgdc_get_next_point(&cursor, slots, &sp)); + + pgd_free(pg); +} + +TEST(PGD, MemoryFootprint) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg = pgd_create(page_type, slots); + + uint32_t footprint = 0; + switch (pgd_type(pg)) { + case PAGE_METRICS: + footprint = slots * sizeof(uint32_t); + break; + case PAGE_GORILLA_METRICS: + footprint = 128 * sizeof(uint32_t); + break; + default: + fatal("Uknown page type: %uc", pgd_type(pg)); + } + EXPECT_NEAR(pgd_memory_footprint(pg), footprint, 128); + + std::random_device rand_dev; + std::mt19937 gen(rand_dev()); + std::uniform_int_distribution<uint32_t> distr(std::numeric_limits<uint32_t>::min(), + std::numeric_limits<uint32_t>::max()); // define the range + + for (size_t slot = 0; slot != slots; slot++) { + uint32_t n = distr(gen); + pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + } + + footprint = slots * sizeof(uint32_t); + + uint32_t abs_error = 0; + switch (pgd_type(pg)) { + case PAGE_METRICS: + abs_error = 128; + break; + case PAGE_GORILLA_METRICS: + abs_error = footprint / 10; + break; + default: + fatal("Uknown page type: %uc", pgd_type(pg)); + } + + EXPECT_NEAR(pgd_memory_footprint(pg), footprint, abs_error); +} + +TEST(PGD, DiskFootprint) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg = pgd_create(page_type, slots); + + std::random_device rand_dev; + std::mt19937 gen(rand_dev()); + std::uniform_int_distribution<uint32_t> distr(std::numeric_limits<uint32_t>::min(), + std::numeric_limits<uint32_t>::max()); // define the range + + size_t used_slots = 16; + + for (size_t slot = 0; slot != used_slots; slot++) { + uint32_t n = distr(gen); + pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + } + + uint32_t footprint = 0; + switch (pgd_type(pg)) { + case PAGE_METRICS: + footprint = used_slots * sizeof(uint32_t); + break; + case PAGE_GORILLA_METRICS: + footprint = 128 * sizeof(uint32_t); + break; + default: + fatal("Uknown page type: %uc", pgd_type(pg)); + } + EXPECT_EQ(pgd_disk_footprint(pg), footprint); + + pgd_free(pg); + + pg = pgd_create(page_type, slots); + + used_slots = 128 + 64; + + for (size_t slot = 0; slot != used_slots; slot++) { + uint32_t n = distr(gen); + pgd_append_point(pg, slot, n, 0, 0, 1, 1, SN_DEFAULT_FLAGS, slot); + } + + switch (pgd_type(pg)) { + case PAGE_METRICS: + footprint = used_slots * sizeof(uint32_t); + break; + case PAGE_GORILLA_METRICS: + footprint = 2 * (128 * sizeof(uint32_t)); + break; + default: + fatal("Uknown page type: %uc", pgd_type(pg)); + } + EXPECT_EQ(pgd_disk_footprint(pg), footprint); + + pgd_free(pg); +} + +TEST(PGD, CopyToExtent) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg_collector = pgd_create(page_type, slots); + + uint32_t value = 666; + pgd_append_point(pg_collector, 0, value, 0, 0, 1, 0, SN_DEFAULT_FLAGS, 0); + + uint32_t size_in_bytes = pgd_disk_footprint(pg_collector); + EXPECT_EQ(size_in_bytes, 512); + + uint32_t size_in_words = size_in_bytes / sizeof(uint32_t); + alignas(sizeof(uintptr_t)) uint32_t disk_buffer[size_in_words]; + + for (size_t i = 0; i != size_in_words; i++) { + disk_buffer[i] = std::numeric_limits<uint32_t>::max(); + } + + pgd_copy_to_extent(pg_collector, (uint8_t *) &disk_buffer[0], size_in_bytes); + + EXPECT_EQ(disk_buffer[0], NULL); + EXPECT_EQ(disk_buffer[1], NULL); + EXPECT_EQ(disk_buffer[2], 1); + EXPECT_EQ(disk_buffer[3], 32); + storage_number sn = pack_storage_number(value, SN_DEFAULT_FLAGS); + EXPECT_EQ(disk_buffer[4], sn); + + // make sure the rest of the page is 0'ed so that it's amenable to compression + for (size_t i = 5; i != size_in_words; i++) + EXPECT_EQ(disk_buffer[i], 0); + + pgd_free(pg_collector); +} + +TEST(PGD, Roundtrip) { + size_t slots = slots_for_page(1024 * 1024); + PGD *pg_collector = pgd_create(page_type, slots); + + for (size_t i = 0; i != slots; i++) + pgd_append_point(pg_collector, i, i, 0, 0, 1, 1, SN_DEFAULT_FLAGS, i); + + uint32_t size_in_bytes = pgd_disk_footprint(pg_collector); + uint32_t size_in_words = size_in_bytes / sizeof(uint32_t); + + alignas(sizeof(uintptr_t)) uint32_t disk_buffer[size_in_words]; + for (size_t i = 0; i != size_in_words; i++) + disk_buffer[i] = std::numeric_limits<uint32_t>::max(); + + pgd_copy_to_extent(pg_collector, (uint8_t *) &disk_buffer[0], size_in_bytes); + + PGD *pg_disk = pgd_create_from_disk_data(page_type, &disk_buffer[0], size_in_bytes); + EXPECT_EQ(pgd_slots_used(pg_disk), slots); + + // Expected memory footprint is equal to the disk footprint + a couple + // bytes for the PGD metadata. + EXPECT_NEAR(pgd_memory_footprint(pg_disk), size_in_bytes, 128); + + // Do not allow calling disk footprint for pages created from disk. + EXPECT_DEATH(pgd_disk_footprint(pg_disk), ".*"); + + for (size_t i = 0; i != 10; i++) { + PGDC cursor_collector; + PGDC cursor_disk; + + pgdc_reset(&cursor_collector, pg_collector, i * 1024); + pgdc_reset(&cursor_disk, pg_disk, i * 1024); + + STORAGE_POINT sp_collector = {}; + STORAGE_POINT sp_disk = {}; + + for (size_t slot = i * 1024; slot != slots; slot++) { + EXPECT_TRUE(pgdc_get_next_point(&cursor_collector, slot, &sp_collector)); + EXPECT_TRUE(pgdc_get_next_point(&cursor_disk, slot, &sp_disk)); + + EXPECT_EQ(sp_collector, sp_disk); + } + + EXPECT_FALSE(pgdc_get_next_point(&cursor_collector, slots, &sp_collector)); + EXPECT_FALSE(pgdc_get_next_point(&cursor_disk, slots, &sp_disk)); + } + + pgd_free(pg_disk); + pgd_free(pg_collector); +} + +int pgd_test(int argc, char *argv[]) +{ + // Dummy/necessary initialization stuff + PGC *dummy_cache = pgc_create("pgd-tests-cache", 32 * 1024 * 1024, NULL, 64, NULL, NULL, + 10, 10, 1000, 10, PGC_OPTIONS_NONE, 1, 11); + pgd_init_arals(); + + ::testing::InitGoogleTest(&argc, argv); + int rc = RUN_ALL_TESTS(); + + pgc_destroy(dummy_cache); + + return rc; +} + +#else // HAVE_GTEST + +int pgd_test(int argc, char *argv[]) +{ + (void) argc; + (void) argv; + fprintf(stderr, "Can not run PGD tests because the agent was not build with support for google tests.\n"); + return 0; +} + +#endif // HAVE_GTEST diff --git a/database/engine/page_test.h b/database/engine/page_test.h new file mode 100644 index 000000000..30837f0ab --- /dev/null +++ b/database/engine/page_test.h @@ -0,0 +1,14 @@ +#ifndef PAGE_TEST_H +#define PAGE_TEST_H + +#ifdef __cplusplus +extern "C" { +#endif + +int pgd_test(int argc, char *argv[]); + +#ifdef __cplusplus +} +#endif + +#endif /* PAGE_TEST_H */ diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index c608c3270..dab9cdd0d 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -12,8 +12,9 @@ struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {}; static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused) { // Release storage associated with the page - dbengine_page_free(entry.data, entry.size); + pgd_free(entry.data); } + static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) { struct rrdengine_instance *ctx = (struct rrdengine_instance *) section; @@ -28,8 +29,6 @@ static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section; - size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx); - struct page_descr_with_data *base = NULL; for (size_t Index = 0 ; Index < entries; Index++) { @@ -42,21 +41,15 @@ static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ descr->start_time_ut = start_time_s * USEC_PER_SEC; descr->end_time_ut = end_time_s * USEC_PER_SEC; descr->update_every_s = entries_array[Index].update_every_s; - descr->type = ctx->config.page_type; - descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point; + descr->pgd = pgc_page_data(pages_array[Index]); + descr->type = pgd_type(descr->pgd); + descr->page_length = pgd_disk_footprint(descr->pgd); - if(descr->page_length > entries_array[Index].size) { - descr->page_length = entries_array[Index].size; - - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max."); - } - - descr->page = pgc_page_data(pages_array[Index]); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next); - internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation"); + // TODO: ask @stelfrag/@ktsaou about this. + // internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation"); } struct completion completion; @@ -254,7 +247,6 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin time_t page_start_time_s = pgc_page_start_time_s(page); time_t page_end_time_s = pgc_page_end_time_s(page); time_t page_update_every_s = pgc_page_update_every_s(page); - size_t page_length = pgc_page_data_size(cache, page); if(!page_update_every_s) page_update_every_s = dt_s; @@ -277,24 +269,10 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin if (!PValue || PValue == PJERR) fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__ ); - if (unlikely(*PValue)) { - struct page_details *pd = *PValue; - UNUSED(pd); - -// internal_error( -// pd->first_time_s != page_first_time_s || -// pd->last_time_s != page_last_time_s || -// pd->update_every_s != page_update_every_s, -// "DBENGINE: duplicate page with different retention in %s cache " -// "1st: %ld to %ld, ue %u, size %u " -// "2nd: %ld to %ld, ue %ld size %zu " -// "- ignoring the second", -// cache == open_cache ? "open" : "main", -// pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length, -// page_first_time_s, page_last_time_s, page_update_every_s, page_length); - + if (unlikely(*PValue)) + // already exists in our list pgc_page_release(cache, page); - } + else { internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache"); @@ -304,7 +282,6 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin pd->metric_id = metric_id; pd->first_time_s = page_start_time_s; pd->last_time_s = page_end_time_s; - pd->page_length = page_length; pd->update_every_s = (uint32_t) page_update_every_s; pd->page = (open_cache_mode) ? NULL : page; pd->status |= tags; @@ -312,7 +289,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin if((pd->page)) { pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED; - if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE) + if(pgd_is_empty(pgc_page_data(page))) pd->status |= PDC_PAGE_EMPTY; } @@ -369,7 +346,7 @@ static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_ .end_time_s = MIN(end_time_s, db_last_time_s), .update_every_s = 0, .size = 0, - .data = DBENGINE_EMPTY_PAGE, + .data = PGD_EMPTY, }; if(page_entry.start_time_s >= page_entry.end_time_s) @@ -478,7 +455,7 @@ static size_t list_has_time_gaps( pd->status &= ~PDC_PAGE_DISK_PENDING; pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4; - if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE) + if(pgd_is_empty(pgc_page_data(pd->page))) pd->status |= PDC_PAGE_EMPTY; } @@ -642,7 +619,6 @@ void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) { pd->first_time_s = pgc_page_start_time_s(page); pd->last_time_s = pgc_page_end_time_s(page); pd->datafile.ptr = datafile; - pd->page_length = ei->page_length; pd->update_every_s = (uint32_t) pgc_page_update_every_s(page); pd->metric_id = metric_id; pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED; @@ -917,7 +893,7 @@ struct pgc_page *pg_cache_lookup_next( } } - if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE) + if(page && pgd_is_empty(pgc_page_data(page))) pdc_page_status_set(pd, PDC_PAGE_EMPTY); if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) { @@ -930,7 +906,6 @@ struct pgc_page *pg_cache_lookup_next( time_t page_start_time_s = pgc_page_start_time_s(page); time_t page_end_time_s = pgc_page_end_time_s(page); time_t page_update_every_s = pgc_page_update_every_s(page); - size_t page_length = pgc_page_data_size(main_cache, page); if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED); @@ -939,13 +914,6 @@ struct pgc_page *pg_cache_lookup_next( pd->page = page = NULL; continue; } - else if(page_length > RRDENG_BLOCK_SIZE) { - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED); - pgc_page_to_clean_evict_or_release(main_cache, page); - pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED); - pd->page = page = NULL; - continue; - } else { if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED); @@ -953,7 +921,7 @@ struct pgc_page *pg_cache_lookup_next( pd->update_every_s = (uint32_t) page_update_every_s; } - size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx)); + size_t entries_by_size = pgd_slots_used(pgc_page_data(page)); size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s); if(unlikely(entries_by_size < entries_by_time)) { time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s); diff --git a/database/engine/pagecache.h b/database/engine/pagecache.h index 5242db89e..dbcbea53a 100644 --- a/database/engine/pagecache.h +++ b/database/engine/pagecache.h @@ -27,7 +27,7 @@ struct page_descr_with_data { uint8_t type; uint32_t update_every_s; uint32_t page_length; - uint8_t *page; + struct pgd *pgd; struct { struct page_descr_with_data *prev; diff --git a/database/engine/pdc.c b/database/engine/pdc.c index 7da568787..5fe205e64 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -629,14 +629,33 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) { } inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) { + time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC); + + time_t end_time_s; + size_t entries; + + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + end_time_s = descr->end_time_ut / USEC_PER_SEC; + entries = 0; + break; + case PAGE_GORILLA_METRICS: + end_time_s = start_time_s + descr->gorilla.delta_time_s; + entries = descr->gorilla.entries; + break; + default: + fatal("Unknown page type: %uc\n", descr->type); + } + return validate_page( (uuid_t *)descr->uuid, - (time_t) (descr->start_time_ut / USEC_PER_SEC), - (time_t) (descr->end_time_ut / USEC_PER_SEC), + start_time_s, + end_time_s, 0, descr->page_length, descr->type, - 0, + entries, now_s, overwrite_zero_update_every_s, have_read_error, @@ -666,13 +685,25 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( .is_valid = true, }; - // always calculate entries by size vd.point_size = page_type_size[vd.type]; - vd.entries = page_entries_by_size(vd.page_length, vd.point_size); - - // allow to be called without entries (when loading pages from disk) - if(!entries) - entries = vd.entries; + switch (page_type) { + case PAGE_METRICS: + case PAGE_TIER: + // always calculate entries by size + vd.entries = page_entries_by_size(vd.page_length, vd.point_size); + + // allow to be called without entries (when loading pages from disk) + if(!entries) + entries = vd.entries; + break; + case PAGE_GORILLA_METRICS: + internal_fatal(entries == 0, "0 number of entries found on gorilla page"); + vd.entries = entries; + break; + default: + // TODO: should set vd.is_valid false instead? + fatal("Unknown page type: %uc", page_type); + } // allow to be called without update every (when loading pages from disk) if(!update_every_s) { @@ -687,19 +718,26 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( bool updated = false; + size_t max_page_length = RRDENG_BLOCK_SIZE; + + // If gorilla can not compress the data we might end up needing slightly more + // than 4KiB. However, gorilla pages extend the page length by increments of + // 512 bytes. + max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE); + if( have_read_error || vd.page_length == 0 || - vd.page_length > RRDENG_BLOCK_SIZE || + vd.page_length > max_page_length || vd.start_time_s > vd.end_time_s || (now_s && vd.end_time_s > now_s) || vd.start_time_s <= 0 || vd.end_time_s <= 0 || vd.update_every_s < 0 || (vd.start_time_s == vd.end_time_s && vd.entries > 1) || - (vd.update_every_s == 0 && vd.entries > 1) - ) + (vd.update_every_s == 0 && vd.entries > 1)) + { vd.is_valid = false; - + } else { if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s)) updated = true; @@ -734,7 +772,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( if(unlikely(!vd.is_valid || updated)) { #ifndef NETDATA_INTERNAL_CHECKS - error_limit_static_global_var(erl, 1, 0); + nd_log_limit_static_global_var(erl, 1, 0); #endif char uuid_str[UUID_STR_LEN + 1]; uuid_unparse(*uuid, uuid_str); @@ -750,7 +788,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit(&erl, + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s invalid page of type %u " "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)", @@ -770,7 +808,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit(&erl, + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s page of type %u " "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), " @@ -832,7 +870,15 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if (descr) { start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC); - end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); + break; + case PAGE_GORILLA_METRICS: + end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s); + break; + } uuid_unparse_lower(descr->uuid, uuid); used_descr = true; } @@ -869,8 +915,8 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if(end_time_s) log_date(end_time_str, LOG_DATE_LENGTH, end_time_s); - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) " "%s from %ld (%s) to %ld (%s) %s%s: " "%s", @@ -952,7 +998,9 @@ static bool epdl_populate_pages_from_extent_data( uncompressed_payload_length = 0; for (i = 0; i < count; ++i) { size_t page_length = header->descr[i].page_length; - if(page_length > RRDENG_BLOCK_SIZE) { + if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS || + (header->descr[i].type == PAGE_GORILLA_METRICS && + (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) { have_read_error = true; break; } @@ -993,7 +1041,7 @@ static bool epdl_populate_pages_from_extent_data( if(!page_length || !start_time_s) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) is EMPTY", i, count); + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); continue; } @@ -1002,7 +1050,7 @@ static bool epdl_populate_pages_from_extent_data( Word_t metric_id = (Word_t)metric; if(!metric) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) has unknown UUID", i, count); + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); continue; } @@ -1020,32 +1068,34 @@ static bool epdl_populate_pages_from_extent_data( if(worker) worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION); - void *page_data; + PGD *pgd; if (unlikely(!vd.is_valid)) { - page_data = DBENGINE_EMPTY_PAGE; + pgd = PGD_EMPTY; stats_load_invalid_page++; } else { if (RRD_NO_COMPRESSION == header->compression_algorithm) { - page_data = dbengine_page_alloc(vd.page_length); - memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length); + pgd = pgd_create_from_disk_data(header->descr[i].type, + data + payload_offset + page_offset, + vd.page_length); stats_load_uncompressed++; } else { if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) offset %u + page length %zu, " + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, " "exceeds the uncompressed buffer size %u", i, count, page_offset, vd.page_length, uncompressed_payload_length); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); - page_data = DBENGINE_EMPTY_PAGE; + pgd = PGD_EMPTY; stats_load_invalid_page++; } else { - page_data = dbengine_page_alloc(vd.page_length); - memcpy(page_data, uncompressed_buf + page_offset, vd.page_length); + pgd = pgd_create_from_disk_data(header->descr[i].type, + uncompressed_buf + page_offset, + vd.page_length); stats_load_compressed++; } } @@ -1061,14 +1111,14 @@ static bool epdl_populate_pages_from_extent_data( .start_time_s = vd.start_time_s, .end_time_s = vd.end_time_s, .update_every_s = (uint32_t) vd.update_every_s, - .size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length), - .data = page_data + .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management + .data = pgd, }; bool added = true; PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added); if (false == added) { - dbengine_page_free(page_data, vd.page_length); + pgd_free(pgd); stats_cache_hit_while_inserting++; stats_data_from_main_cache++; } @@ -1081,8 +1131,7 @@ static bool epdl_populate_pages_from_extent_data( pgc_page_dup(main_cache, page); pd->page = page; - pd->page_length = pgc_page_data_size(main_cache, page); - pdc_page_status_set(pd, PDC_PAGE_READY | tags | ((page_data == DBENGINE_EMPTY_PAGE) ? PDC_PAGE_EMPTY : 0)); + pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0)); pd = pd->load.next; } while(pd); diff --git a/database/engine/rrddiskprotocol.h b/database/engine/rrddiskprotocol.h index 5b4be9498..86b41f0b3 100644 --- a/database/engine/rrddiskprotocol.h +++ b/database/engine/rrddiskprotocol.h @@ -3,6 +3,8 @@ #ifndef NETDATA_RRDDISKPROTOCOL_H #define NETDATA_RRDDISKPROTOCOL_H +#include <stdint.h> + #define RRDENG_BLOCK_SIZE (4096) #define RRDFILE_ALIGNMENT RRDENG_BLOCK_SIZE @@ -36,7 +38,8 @@ struct rrdeng_df_sb { */ #define PAGE_METRICS (0) #define PAGE_TIER (1) -#define PAGE_TYPE_MAX 1 // Maximum page type (inclusive) +#define PAGE_GORILLA_METRICS (2) +#define PAGE_TYPE_MAX 2 // Maximum page type (inclusive) /* * Data file page descriptor @@ -47,7 +50,14 @@ struct rrdeng_extent_page_descr { uint8_t uuid[UUID_SZ]; uint32_t page_length; uint64_t start_time_ut; - uint64_t end_time_ut; + union { + struct { + uint32_t entries; + uint32_t delta_time_s; + } gorilla __attribute__((packed)); + + uint64_t end_time_ut; + }; } __attribute__ ((packed)); /* diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 99257b79d..b82cc1ad1 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -40,6 +40,7 @@ struct rrdeng_main { uv_async_t async; uv_timer_t timer; pid_t tid; + bool shutdown; size_t flushes_running; size_t evictions_running; @@ -577,55 +578,6 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { // ---------------------------------------------------------------------------- -struct { - ARAL *aral[RRD_STORAGE_TIERS]; -} dbengine_page_alloc_globals = {}; - -static inline ARAL *page_size_lookup(size_t size) { - for(size_t tier = 0; tier < storage_tiers ;tier++) - if(size == tier_page_size[tier]) - return dbengine_page_alloc_globals.aral[tier]; - - return NULL; -} - -static void dbengine_page_alloc_init(void) { - for(size_t i = storage_tiers; i > 0 ;i--) { - size_t tier = storage_tiers - i; - - char buf[20 + 1]; - snprintfz(buf, 20, "tier%zu-pages", tier); - - dbengine_page_alloc_globals.aral[tier] = aral_create( - buf, - tier_page_size[tier], - 64, - 512 * tier_page_size[tier], - pgc_aral_statistics(), - NULL, NULL, false, false); - } -} - -void *dbengine_page_alloc(size_t size) { - ARAL *ar = page_size_lookup(size); - if(ar) return aral_mallocz(ar); - - return mallocz(size); -} - -void dbengine_page_free(void *page, size_t size __maybe_unused) { - if(unlikely(!page || page == DBENGINE_EMPTY_PAGE)) - return; - - ARAL *ar = page_size_lookup(size); - if(ar) - aral_freez(ar, page); - else - freez(page); -} - -// ---------------------------------------------------------------------------- - void *dbengine_extent_alloc(size_t size) { void *extent = mallocz(size); return extent; @@ -890,12 +842,25 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time_ut = descr->start_time_ut; - header->descr[i].end_time_ut = descr->end_time_ut; + + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + header->descr[i].end_time_ut = descr->end_time_ut; + break; + case PAGE_GORILLA_METRICS: + header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); + header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); + break; + default: + fatal("Unknown page type: %uc", descr->type); + } + pos += sizeof(header->descr[i]); } for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; - (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } @@ -1381,9 +1346,6 @@ static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, vo static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN); - completion_wait_for(&ctx->quiesce.completion); - completion_destroy(&ctx->quiesce.completion); - bool logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { @@ -1436,6 +1398,14 @@ uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) { bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx) { + if(!ctx->datafiles.first) + // no datafiles available + return false; + + if(!ctx->datafiles.first->next) + // only 1 datafile available + return false; + uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0); @@ -1514,12 +1484,19 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb spinlock_unlock(&datafile->writers.spinlock); if(!available) { - netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); + nd_log(NDLS_DAEMON, NDLP_NOTICE, + "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - " + "skipping it for now", + datafile->fileno); + datafile = datafile->next; continue; } - netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal file %u is ready to be indexed", + datafile->fileno); + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); @@ -1532,7 +1509,10 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb } errno = 0; - internal_error(count, "DBENGINE: journal indexing done; %u files processed", count); + if(count) + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal indexing done; %u files processed", + count); worker_is_idle(); @@ -1628,7 +1608,7 @@ static void dbengine_initialize_structures(void) { rrdeng_query_handle_init(); page_descriptors_init(); extent_buffer_init(); - dbengine_page_alloc_init(); + pgd_init_arals(); extent_io_descriptor_init(); } @@ -1715,6 +1695,7 @@ void dbengine_event_loop(void* arg) { worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init"); worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown"); worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce"); + worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown"); worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode"); @@ -1856,6 +1837,13 @@ void dbengine_event_loop(void* arg) { break; } + case RRDENG_OPCODE_SHUTDOWN_EVLOOP: { + uv_close((uv_handle_t *)&main->async, NULL); + (void) uv_timer_stop(&main->timer); + uv_close((uv_handle_t *)&main->timer, NULL); + shutdown = true; + } + case RRDENG_OPCODE_NOOP: { /* the command queue was empty, do nothing */ break; @@ -1872,18 +1860,7 @@ void dbengine_event_loop(void* arg) { } while (opcode != RRDENG_OPCODE_NOOP); } - /* cleanup operations of the event loop */ - netdata_log_info("DBENGINE: shutting down dbengine thread"); - - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&main->async, NULL); - uv_timer_stop(&main->timer); - uv_close((uv_handle_t *)&main->timer, NULL); - uv_run(&main->loop, UV_RUN_DEFAULT); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread"); uv_loop_close(&main->loop); worker_unregister(); } diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 08eaf4128..cd3352f12 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -22,6 +22,7 @@ #include "metric.h" #include "cache.h" #include "pdc.h" +#include "page.h" extern unsigned rrdeng_pages_per_extent; @@ -119,7 +120,6 @@ struct page_details { time_t first_time_s; time_t last_time_s; uint32_t update_every_s; - uint16_t page_length; PDC_PAGE_STATUS status; struct { @@ -190,10 +190,11 @@ struct rrdeng_collect_handle { RRDENG_COLLECT_HANDLE_OPTIONS options; uint8_t type; + struct rrdengine_instance *ctx; struct metric *metric; - struct pgc_page *page; - void *data; - size_t data_size; + struct pgc_page *pgc_page; + struct pgd *page_data; + size_t page_data_size; struct pg_alignment *alignment; uint32_t page_entries_max; uint32_t page_position; // keep track of the current page size, to make sure we don't exceed it @@ -206,7 +207,7 @@ struct rrdeng_query_handle { struct metric *metric; struct pgc_page *page; struct rrdengine_instance *ctx; - storage_number *metric_data; + struct pgd_cursor pgdc; struct page_details_control *pdc; // the request @@ -246,6 +247,7 @@ enum rrdeng_opcode { RRDENG_OPCODE_CTX_SHUTDOWN, RRDENG_OPCODE_CTX_QUIESCE, RRDENG_OPCODE_CTX_POPULATE_MRG, + RRDENG_OPCODE_SHUTDOWN_EVLOOP, RRDENG_OPCODE_CLEANUP, RRDENG_OPCODE_MAX @@ -445,9 +447,6 @@ static inline void ctx_last_flush_fileno_set(struct rrdengine_instance *ctx, uns #define ctx_is_available_for_queries(ctx) (__atomic_load_n(&(ctx)->quiesce.enabled, __ATOMIC_RELAXED) == false && __atomic_load_n(&(ctx)->quiesce.exit_mode, __ATOMIC_RELAXED) == false) -void *dbengine_page_alloc(size_t size); -void dbengine_page_free(void *page, size_t size); - void *dbengine_extent_alloc(size_t size); void dbengine_extent_free(void *extent, size_t size); @@ -491,8 +490,6 @@ typedef struct validated_page_descriptor { bool is_valid; } VALIDATED_PAGE_DESCRIPTOR; -#define DBENGINE_EMPTY_PAGE (void *)(-1) - #define page_entries_by_time(start_time_s, end_time_s, update_every_s) \ ((update_every_s) ? (((end_time_s) - ((start_time_s) - (update_every_s))) / (update_every_s)) : 1) diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index 318a933f1..1ddce5243 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -1,4 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later + +#include "database/engine/rrddiskprotocol.h" #include "rrdengine.h" /* Default global database instance */ @@ -22,10 +24,15 @@ size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192}; size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384}; #endif -#if PAGE_TYPE_MAX != 1 -#error PAGE_TYPE_MAX is not 1 - you need to add allocations here +#if PAGE_TYPE_MAX != 2 +#error PAGE_TYPE_MAX is not 2 - you need to add allocations here #endif -size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)}; + +size_t page_type_size[256] = { + [PAGE_METRICS] = sizeof(storage_number), + [PAGE_TIER] = sizeof(storage_number_tier1_t), + [PAGE_GORILLA_METRICS] = sizeof(storage_number) +}; __attribute__((constructor)) void initialize_multidb_ctx(void) { multidb_ctx[0] = &multidb_ctx_storage_tier0; @@ -198,15 +205,15 @@ static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle * static inline bool check_completed_page_consistency(struct rrdeng_collect_handle *handle __maybe_unused) { #ifdef NETDATA_INTERNAL_CHECKS - if (unlikely(!handle->page || !handle->page_entries_max || !handle->page_position || !handle->page_end_time_ut)) + if (unlikely(!handle->pgc_page || !handle->page_entries_max || !handle->page_position || !handle->page_end_time_ut)) return false; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric); - time_t start_time_s = pgc_page_start_time_s(handle->page); - time_t end_time_s = pgc_page_end_time_s(handle->page); - time_t update_every_s = pgc_page_update_every_s(handle->page); + time_t start_time_s = pgc_page_start_time_s(handle->pgc_page); + time_t end_time_s = pgc_page_end_time_s(handle->pgc_page); + time_t update_every_s = pgc_page_update_every_s(handle->pgc_page); size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx); size_t entries = handle->page_position; time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC); @@ -257,9 +264,11 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri handle = callocz(1, sizeof(struct rrdeng_collect_handle)); handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE; handle->metric = metric; - handle->page = NULL; - handle->data = NULL; - handle->data_size = 0; + + handle->pgc_page = NULL; + handle->page_data = NULL; + handle->page_data_size = 0; + handle->page_position = 0; handle->page_entries_max = 0; handle->update_every_ut = (usec_t)update_every * USEC_PER_SEC; @@ -286,65 +295,29 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri return (STORAGE_COLLECT_HANDLE *)handle; } -/* The page must be populated and referenced */ -static bool page_has_only_empty_metrics(struct rrdeng_collect_handle *handle) { - switch(handle->type) { - case PAGE_METRICS: { - size_t slots = handle->page_position; - storage_number *array = (storage_number *)pgc_page_data(handle->page); - for (size_t i = 0 ; i < slots; ++i) { - if(does_storage_number_exist(array[i])) - return false; - } - } - break; - - case PAGE_TIER: { - size_t slots = handle->page_position; - storage_number_tier1_t *array = (storage_number_tier1_t *)pgc_page_data(handle->page); - for (size_t i = 0 ; i < slots; ++i) { - if(fpclassify(array[i].sum_value) != FP_NAN) - return false; - } - } - break; - - default: { - static bool logged = false; - if(!logged) { - netdata_log_error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type); - logged = true; - } - return false; - } - } - - return true; -} - void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; - if (unlikely(!handle->page)) + if (unlikely(!handle->pgc_page)) return; - if(!handle->page_position || page_has_only_empty_metrics(handle)) - pgc_page_to_clean_evict_or_release(main_cache, handle->page); + if(pgd_is_empty(handle->page_data)) + pgc_page_to_clean_evict_or_release(main_cache, handle->pgc_page); else { check_completed_page_consistency(handle); - mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->page)); - pgc_page_hot_to_dirty_and_release(main_cache, handle->page); + mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->pgc_page)); + pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page); } mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0); - handle->page = NULL; + handle->pgc_page = NULL; handle->page_flags = 0; handle->page_position = 0; handle->page_entries_max = 0; - handle->data = NULL; - handle->data_size = 0; + handle->page_data = NULL; + handle->page_data_size = 0; // important! // we should never zero page end time ut, because this will allow @@ -358,10 +331,10 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h } static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *handle, - struct rrdengine_instance *ctx, - usec_t point_in_time_ut, - void *data, - size_t data_size) { + struct rrdengine_instance *ctx, + usec_t point_in_time_ut, + PGD *data, + size_t data_size) { time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC); const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC); @@ -378,7 +351,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha size_t conflicts = 0; bool added = true; - PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added); + PGC_PAGE *pgc_page = pgc_page_add_and_acquire(main_cache, page_entry, &added); while (unlikely(!added)) { conflicts++; @@ -388,33 +361,33 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, #endif - "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache " - "with existing %s%s page from %ld to %ld, update every %ld - " - "is it collected more than once?", - uuid, - page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s, - pgc_is_page_hot(page) ? "hot" : "not-hot", - pgc_page_data(page) == DBENGINE_EMPTY_PAGE ? " gap" : "", - pgc_page_start_time_s(page), pgc_page_end_time_s(page), pgc_page_update_every_s(page) + "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache " + "with existing %s%s page from %ld to %ld, update every %ld - " + "is it collected more than once?", + uuid, + page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s, + pgc_is_page_hot(pgc_page) ? "hot" : "not-hot", + pgc_page_data(pgc_page) == PGD_EMPTY ? " gap" : "", + pgc_page_start_time_s(pgc_page), pgc_page_end_time_s(pgc_page), pgc_page_update_every_s(pgc_page) ); - pgc_page_release(main_cache, page); + pgc_page_release(main_cache, pgc_page); point_in_time_ut -= handle->update_every_ut; point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC); page_entry.start_time_s = point_in_time_s; page_entry.end_time_s = point_in_time_s; - page = pgc_page_add_and_acquire(main_cache, page_entry, &added); + pgc_page = pgc_page_add_and_acquire(main_cache, page_entry, &added); } handle->page_entries_max = data_size / CTX_POINT_SIZE_BYTES(ctx); handle->page_start_time_ut = point_in_time_ut; handle->page_end_time_ut = point_in_time_ut; handle->page_position = 1; // zero is already in our data - handle->page = page; + handle->pgc_page = pgc_page; handle->page_flags = conflicts? RRDENG_PAGE_CONFLICT : 0; if(point_in_time_s > max_acceptable_collected_time()) @@ -441,9 +414,11 @@ static size_t aligned_allocation_entries(size_t max_slots, size_t target_slot, t return slots; } -static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) { +static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, size_t *data_size, usec_t point_in_time_ut) { struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); + PGD *d = NULL; + size_t max_size = tier_page_size[ctx->config.tier]; size_t max_slots = max_size / CTX_POINT_SIZE_BYTES(ctx); @@ -467,10 +442,22 @@ static void *rrdeng_alloc_new_metric_data(struct rrdeng_collect_handle *handle, internal_fatal(size > tier_page_size[ctx->config.tier] || size < CTX_POINT_SIZE_BYTES(ctx) * 2, "ooops! wrong page size"); *data_size = size; - void *d = dbengine_page_alloc(size); - timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC); + switch (ctx->config.page_type) { + case PAGE_METRICS: + case PAGE_TIER: + d = pgd_create(ctx->config.page_type, slots); + break; + case PAGE_GORILLA_METRICS: + // ignore slots, and use the fixed number of slots per gorilla buffer. + // gorilla will automatically add more buffers if needed. + d = pgd_create(ctx->config.page_type, GORILLA_BUFFER_SLOTS); + break; + default: + fatal("Unknown page type: %uc\n", ctx->config.page_type); + } + timing_step(TIMING_STEP_DBENGINE_PAGE_ALLOC); return d; } @@ -486,37 +473,25 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); - if(unlikely(!handle->data)) - handle->data = rrdeng_alloc_new_metric_data(handle, &handle->data_size, point_in_time_ut); + if(unlikely(!handle->page_data)) + handle->page_data = rrdeng_alloc_new_page_data(handle, &handle->page_data_size, point_in_time_ut); timing_step(TIMING_STEP_DBENGINE_CHECK_DATA); - if(likely(ctx->config.page_type == PAGE_METRICS)) { - storage_number *tier0_metric_data = handle->data; - tier0_metric_data[handle->page_position] = pack_storage_number(n, flags); - } - else if(likely(ctx->config.page_type == PAGE_TIER)) { - storage_number_tier1_t *tier12_metric_data = handle->data; - storage_number_tier1_t number_tier1; - number_tier1.sum_value = (float) n; - number_tier1.min_value = (float) min_value; - number_tier1.max_value = (float) max_value; - number_tier1.anomaly_count = anomaly_count; - number_tier1.count = count; - tier12_metric_data[handle->page_position] = number_tier1; - } - else - fatal("DBENGINE: cannot store metric on unknown page type id %d", ctx->config.page_type); + pgd_append_point(handle->page_data, + point_in_time_ut, + n, min_value, max_value, count, anomaly_count, flags, + handle->page_position); timing_step(TIMING_STEP_DBENGINE_PACK); - if(unlikely(!handle->page)){ - rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->data, handle->data_size); + if(unlikely(!handle->pgc_page)) { + rrdeng_store_metric_create_new_page(handle, ctx, point_in_time_ut, handle->page_data, handle->page_data_size); // handle->position is set to 1 already } else { // update an existing page - pgc_page_hot_set_end_time_s(main_cache, handle->page, (time_t) (point_in_time_ut / USEC_PER_SEC)); + pgc_page_hot_set_end_time_s(main_cache, handle->pgc_page, (time_t) (point_in_time_ut / USEC_PER_SEC)); handle->page_end_time_ut = point_in_time_ut; if(unlikely(++handle->page_position >= handle->page_entries_max)) { @@ -541,13 +516,13 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m uuid_unparse(*mrg_metric_uuid(main_mrg, handle->metric), uuid); BUFFER *wb = NULL; - if(handle->page && handle->page_flags) { + if(handle->pgc_page && handle->page_flags) { wb = buffer_create(0, NULL); collect_page_flags_to_buffer(wb, handle->page_flags); } - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_NOTICE, "DBENGINE: metric '%s' collected point at %ld, %s last collection at %ld, " "update every %ld, %s page from %ld to %ld, position %u (of %u), flags: %s", uuid, @@ -555,12 +530,12 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m msg, (time_t)(handle->page_end_time_ut / USEC_PER_SEC), (time_t)(handle->update_every_ut / USEC_PER_SEC), - handle->page ? "current" : "*LAST*", + handle->pgc_page ? "current" : "*LAST*", (time_t)(handle->page_start_time_ut / USEC_PER_SEC), (time_t)(handle->page_end_time_ut / USEC_PER_SEC), handle->page_position, handle->page_entries_max, wb ? buffer_tostring(wb) : "" - ); + ); buffer_free(wb); #else @@ -593,7 +568,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, ; } else if(unlikely(point_in_time_ut > handle->page_end_time_ut)) { - if(handle->page) { + if(handle->pgc_page) { if (unlikely(delta_ut < handle->update_every_ut)) { handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL; rrdeng_store_metric_flush_current_page(collection_handle); @@ -801,12 +776,13 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; - struct rrdengine_instance *ctx = handle->ctx; + struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); if (likely(handle->page)) { // we have a page to release pgc_page_release(main_cache, handle->page); handle->page = NULL; + pgdc_reset(&handle->pgdc, NULL, UINT32_MAX); } if (unlikely(handle->now_s > rrddim_handle->end_time_s)) @@ -815,10 +791,10 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han size_t entries = 0; handle->page = pg_cache_lookup_next(ctx, handle->pdc, handle->now_s, handle->dt_s, &entries); - internal_fatal(handle->page && (pgc_page_data(handle->page) == DBENGINE_EMPTY_PAGE || !entries), + internal_fatal(handle->page && (pgc_page_data(handle->page) == PGD_EMPTY || !entries), "A page was returned, but it is empty - pg_cache_lookup_next() should be handling this case"); - if (unlikely(!handle->page || pgc_page_data(handle->page) == DBENGINE_EMPTY_PAGE || !entries)) + if (unlikely(!handle->page || pgc_page_data(handle->page) == PGD_EMPTY || !entries)) return false; time_t page_start_time_s = pgc_page_start_time_s(handle->page); @@ -859,8 +835,10 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han handle->entries = entries; handle->position = position; - handle->metric_data = pgc_page_data((PGC_PAGE *)handle->page); handle->dt_s = page_update_every_s; + + pgdc_reset(&handle->pgdc, pgc_page_data(handle->page), handle->position); + return true; } @@ -889,38 +867,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim sp.start_time_s = handle->now_s - handle->dt_s; sp.end_time_s = handle->now_s; - switch(handle->ctx->config.page_type) { - case PAGE_METRICS: { - storage_number n = handle->metric_data[handle->position]; - sp.min = sp.max = sp.sum = unpack_storage_number(n); - sp.flags = n & SN_USER_FLAGS; - sp.count = 1; - sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0; - } - break; - - case PAGE_TIER: { - storage_number_tier1_t tier1_value = ((storage_number_tier1_t *)handle->metric_data)[handle->position]; - sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS; - sp.count = tier1_value.count; - sp.anomaly_count = tier1_value.anomaly_count; - sp.min = tier1_value.min_value; - sp.max = tier1_value.max_value; - sp.sum = tier1_value.sum_value; - } - break; - - // we don't know this page type - default: { - static bool logged = false; - if(!logged) { - netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type); - logged = true; - } - storage_point_empty(sp, sp.start_time_s, sp.end_time_s); - } - break; - } + pgdc_get_next_point(&handle->pgdc, handle->position, &sp); prepare_for_next_iteration: internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query"); @@ -944,8 +891,10 @@ void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_hand { struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; - if (handle->page) + if (handle->page) { pgc_page_release(main_cache, handle->page); + pgdc_reset(&handle->pgdc, NULL, UINT32_MAX); + } if(!pdc_release_and_destroy_if_unreferenced(handle->pdc, false, false)) __atomic_store_n(&handle->pdc->workers_should_stop, true, __ATOMIC_RELAXED); @@ -1240,12 +1189,14 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { // 4. then wait for completion bool logged = false; - while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) { + size_t count = 10; + while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && count && !unittest_running) { if(!logged) { netdata_log_info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); + count--; } netdata_log_info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 61449426f..7ae0e7079 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -20,6 +20,7 @@ extern int default_multidb_disk_quota_mb; extern struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; extern size_t page_type_size[]; extern size_t tier_page_size[]; +extern uint8_t tier_page_type[]; #define CTX_POINT_SIZE_BYTES(ctx) page_type_size[(ctx)->config.page_type] diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index 831e48531..a0febd4f4 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -8,16 +8,9 @@ /* Forward declarations */ struct rrdengine_instance; -#define STR_HELPER(x) #x -#define STR(x) STR_HELPER(x) - -#define BITS_PER_ULONG (sizeof(unsigned long) * 8) - #define ALIGN_BYTES_FLOOR(x) (((x) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) #define ALIGN_BYTES_CEILING(x) ((((x) + RRDENG_BLOCK_SIZE - 1) / RRDENG_BLOCK_SIZE) * RRDENG_BLOCK_SIZE) -#define ROUND_USEC_TO_SEC(x) (((x) + USEC_PER_SEC / 2 - 1) / USEC_PER_SEC) - typedef uintptr_t rrdeng_stats_t; #ifdef __ATOMIC_RELAXED @@ -58,7 +51,7 @@ static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val) } } -#define RRDENG_PATH_MAX (4096) +#define RRDENG_PATH_MAX (FILENAME_MAX + 1) /* returns old *ptr value */ static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr, @@ -74,12 +67,15 @@ static inline unsigned long ulong_compare_and_swap(volatile unsigned long *ptr, static inline int crc32cmp(void *crcp, uLong crc) { - return (*(uint32_t *)crcp != crc); + uint32_t loaded_crc; + memcpy(&loaded_crc, crcp, sizeof(loaded_crc)); + return (loaded_crc != crc); } static inline void crc32set(void *crcp, uLong crc) { - *(uint32_t *)crcp = crc; + uint32_t store_crc = (uint32_t) crc; + memcpy(crcp, &store_crc, sizeof(store_crc)); } int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size); |