author     Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 12:08:03 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 12:08:18 +0000
commit     5da14042f70711ea5cf66e034699730335462f66 (patch)
tree       0f6354ccac934ed87a2d555f45be4c831cf92f4a  /src/database/engine
parent     Releasing debian version 1.44.3-2. (diff)
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--  src/database/engine/README.md (renamed from database/engine/README.md) | 0
-rw-r--r--  src/database/engine/cache.c (renamed from database/engine/cache.c) | 93
-rw-r--r--  src/database/engine/cache.h (renamed from database/engine/cache.h) | 9
-rw-r--r--  src/database/engine/datafile.c (renamed from database/engine/datafile.c) | 4
-rw-r--r--  src/database/engine/datafile.h (renamed from database/engine/datafile.h) | 0
-rw-r--r--  src/database/engine/dbengine-compression.c | 159
-rw-r--r--  src/database/engine/dbengine-compression.h | 15
-rw-r--r--  src/database/engine/dbengine-diagram.xml (renamed from database/engine/dbengine-diagram.xml) | 0
-rw-r--r--  src/database/engine/dbengine-stresstest.c | 456
-rw-r--r--  src/database/engine/dbengine-unittest.c | 419
-rw-r--r--  src/database/engine/journalfile.c (renamed from database/engine/journalfile.c) | 29
-rw-r--r--  src/database/engine/journalfile.h (renamed from database/engine/journalfile.h) | 1
-rw-r--r--  src/database/engine/metric.c (renamed from database/engine/metric.c) | 331
-rw-r--r--  src/database/engine/metric.h (renamed from database/engine/metric.h) | 15
-rw-r--r--  src/database/engine/page.c (renamed from database/engine/page.c) | 106
-rw-r--r--  src/database/engine/page.h (renamed from database/engine/page.h) | 0
-rw-r--r--  src/database/engine/page_test.cc (renamed from database/engine/page_test.cc) | 0
-rw-r--r--  src/database/engine/page_test.h (renamed from database/engine/page_test.h) | 0
-rw-r--r--  src/database/engine/pagecache.c (renamed from database/engine/pagecache.c) | 24
-rw-r--r--  src/database/engine/pagecache.h (renamed from database/engine/pagecache.h) | 6
-rw-r--r--  src/database/engine/pdc.c (renamed from database/engine/pdc.c) | 89
-rw-r--r--  src/database/engine/pdc.h (renamed from database/engine/pdc.h) | 0
-rw-r--r--  src/database/engine/rrddiskprotocol.h (renamed from database/engine/rrddiskprotocol.h) | 16
-rw-r--r--  src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c) | 92
-rw-r--r--  src/database/engine/rrdengine.h (renamed from database/engine/rrdengine.h) | 16
-rwxr-xr-x  src/database/engine/rrdengineapi.c (renamed from database/engine/rrdengineapi.c) | 269
-rw-r--r--  src/database/engine/rrdengineapi.h (renamed from database/engine/rrdengineapi.h) | 45
-rw-r--r--  src/database/engine/rrdenginelib.c (renamed from database/engine/rrdenginelib.c) | 0
-rw-r--r--  src/database/engine/rrdenginelib.h (renamed from database/engine/rrdenginelib.h) | 0
29 files changed, 1705 insertions, 489 deletions
diff --git a/database/engine/README.md b/src/database/engine/README.md index 890018642..890018642 100644 --- a/database/engine/README.md +++ b/src/database/engine/README.md diff --git a/database/engine/cache.c b/src/database/engine/cache.c index eb1c35298..49a9b6b96 100644 --- a/database/engine/cache.c +++ b/src/database/engine/cache.c @@ -1325,21 +1325,8 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) { return page; } -static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) { - __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED); - - size_t *stats_hit_ptr, *stats_miss_ptr; - - if(method == PGC_SEARCH_CLOSEST) { - __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED); - stats_hit_ptr = &cache->stats.searches_closest_hits; - stats_miss_ptr = &cache->stats.searches_closest_misses; - } - else { - __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED); - stats_hit_ptr = &cache->stats.searches_exact_hits; - stats_miss_ptr = &cache->stats.searches_exact_misses; - } +static PGC_PAGE *page_find_and_acquire_once(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method, bool *retry) { + *retry = false; PGC_PAGE *page = NULL; size_t partition = pgc_indexing_partition(cache, metric_id); @@ -1462,22 +1449,13 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric if(!page_acquire(cache, page)) { // this page is not good to use + *retry = true; page = NULL; } } cleanup: pgc_index_read_unlock(cache, partition); - - if(page) { - __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED); - page_has_been_accessed(cache, page); - } - else - __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED); - - __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED); - return page; } @@ -1882,7 +1860,7 @@ void pgc_page_release(PGC *cache, PGC_PAGE *page) { page_release(cache, page, is_page_clean(page)); } -void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) { +void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush) { __atomic_add_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED); //#ifdef NETDATA_INTERNAL_CHECKS @@ -1901,10 +1879,8 @@ void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) { __atomic_sub_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED); // flush, if we have to - if((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)) { - flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, - false, false); - } + if(!never_flush && ((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache))) + flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false); } bool pgc_page_to_clean_evict_or_release(PGC *cache, PGC_PAGE *page) { @@ -1949,13 +1925,13 @@ time_t pgc_page_end_time_s(PGC_PAGE *page) { return page->end_time_s; } -time_t pgc_page_update_every_s(PGC_PAGE *page) { +uint32_t pgc_page_update_every_s(PGC_PAGE *page) { return page->update_every_s; } -time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s) { +uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s) { if(page->update_every_s == 0) - page->update_every_s = (uint32_t) update_every_s; + page->update_every_s = update_every_s; return page->update_every_s; } @@ -2050,7 +2026,46 @@ void pgc_page_hot_set_end_time_s(PGC *cache __maybe_unused, 
PGC_PAGE *page, time } PGC_PAGE *pgc_page_get_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) { - return page_find_and_acquire(cache, section, metric_id, start_time_s, method); + static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 }; + + PGC_PAGE *page = NULL; + + __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED); + + size_t *stats_hit_ptr, *stats_miss_ptr; + + if(method == PGC_SEARCH_CLOSEST) { + __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED); + stats_hit_ptr = &cache->stats.searches_closest_hits; + stats_miss_ptr = &cache->stats.searches_closest_misses; + } + else { + __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED); + stats_hit_ptr = &cache->stats.searches_exact_hits; + stats_miss_ptr = &cache->stats.searches_exact_misses; + } + + while(1) { + bool retry = false; + + page = page_find_and_acquire_once(cache, section, metric_id, start_time_s, method, &retry); + + if(page || !retry) + break; + + nanosleep(&ns, NULL); + } + + if(page) { + __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED); + page_has_been_accessed(cache, page); + } + else + __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED); + + __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED); + + return page; } struct pgc_statistics pgc_get_statistics(PGC *cache) { @@ -2224,7 +2239,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ while ((PValue2 = JudyLFirstThenNext(mi->JudyL_pages_by_start_time, &start_time, &start_time_first))) { struct jv2_page_info *pi = *PValue2; page_transition_unlock(cache, pi->page); - pgc_page_hot_to_dirty_and_release(cache, pi->page); + pgc_page_hot_to_dirty_and_release(cache, pi->page, true); // make_acquired_page_clean_and_evict_or_page_release(cache, pi->page); aral_freez(ar_pi, pi); } @@ -2251,6 +2266,8 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ aral_by_size_release(ar_mi); __atomic_sub_fetch(&cache->stats.workers_jv2_flush, 1, __ATOMIC_RELAXED); + + flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false); } static bool match_page_data(PGC_PAGE *page, void *data) { @@ -2396,7 +2413,7 @@ void *unittest_stress_test_collector(void *ptr) { if(i % 10 == 0) pgc_page_to_clean_evict_or_release(pgc_uts.cache, pgc_uts.metrics[i]); else - pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i]); + pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i], false); } } @@ -2721,7 +2738,7 @@ int pgc_unittest(void) { }, NULL); pgc_page_hot_set_end_time_s(cache, page2, 2001); - pgc_page_hot_to_dirty_and_release(cache, page2); + pgc_page_hot_to_dirty_and_release(cache, page2, false); PGC_PAGE *page3 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){ .section = 3, @@ -2734,7 +2751,7 @@ int pgc_unittest(void) { }, NULL); pgc_page_hot_set_end_time_s(cache, page3, 2001); - pgc_page_hot_to_dirty_and_release(cache, page3); + pgc_page_hot_to_dirty_and_release(cache, page3, false); pgc_destroy(cache); diff --git a/database/engine/cache.h b/src/database/engine/cache.h index 7cd7c0636..b6f81bcc2 100644 --- a/database/engine/cache.h +++ b/src/database/engine/cache.h @@ -2,6 +2,7 @@ #ifndef DBENGINE_CACHE_H #define DBENGINE_CACHE_H +#include "datafile.h" #include "../rrd.h" // CACHE COMPILE TIME CONFIGURATION @@ -27,7 +28,7 @@ typedef struct pgc_entry { time_t end_time_s; // the end time of the page size_t size; // the size in bytes of the allocation, outside 
the cache void *data; // a pointer to data outside the cache - uint32_t update_every_s; // the update every of the page + uint32_t update_every_s; // the update every of the page bool hot; // true if this entry is currently being collected uint8_t *custom_data; } PGC_ENTRY; @@ -191,7 +192,7 @@ PGC_PAGE *pgc_page_dup(PGC *cache, PGC_PAGE *page); void pgc_page_release(PGC *cache, PGC_PAGE *page); // mark a hot page dirty, and release it -void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page); +void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush); // find a page from the cache typedef enum { @@ -210,8 +211,8 @@ Word_t pgc_page_section(PGC_PAGE *page); Word_t pgc_page_metric(PGC_PAGE *page); time_t pgc_page_start_time_s(PGC_PAGE *page); time_t pgc_page_end_time_s(PGC_PAGE *page); -time_t pgc_page_update_every_s(PGC_PAGE *page); -time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s); +uint32_t pgc_page_update_every_s(PGC_PAGE *page); +uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s); time_t pgc_page_fix_end_time_s(PGC_PAGE *page, time_t end_time_s); void *pgc_page_data(PGC_PAGE *page); void *pgc_page_custom_data(PGC *cache, PGC_PAGE *page); diff --git a/database/engine/datafile.c b/src/database/engine/datafile.c index 7322039cd..1ec2dea79 100644 --- a/database/engine/datafile.c +++ b/src/database/engine/datafile.c @@ -557,7 +557,9 @@ void finalize_data_files(struct rrdengine_instance *ctx) { bool logged = false; - logged = false; + if (!ctx->datafiles.first) + return; + while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { if(!logged) { netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); diff --git a/database/engine/datafile.h b/src/database/engine/datafile.h index 569f1b0a2..569f1b0a2 100644 --- a/database/engine/datafile.h +++ b/src/database/engine/datafile.h diff --git a/src/database/engine/dbengine-compression.c b/src/database/engine/dbengine-compression.c new file mode 100644 index 000000000..46ef2b075 --- /dev/null +++ b/src/database/engine/dbengine-compression.c @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "rrdengine.h" +#include "dbengine-compression.h" + +#ifdef ENABLE_LZ4 +#include <lz4.h> +#endif + +#ifdef ENABLE_ZSTD +#include <zstd.h> +#define DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL 3 +#endif + +uint8_t dbengine_default_compression(void) { + +#ifdef ENABLE_LZ4 + return RRDENG_COMPRESSION_LZ4; +#endif + + return RRDENG_COMPRESSION_NONE; +} + +bool dbengine_valid_compression_algorithm(uint8_t algorithm) { + switch(algorithm) { + case RRDENG_COMPRESSION_NONE: + +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: +#endif + + return true; + + default: + return false; + } +} + +size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm) { + switch(algorithm) { +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: + fatal_assert(uncompressed_size < LZ4_MAX_INPUT_SIZE); + return LZ4_compressBound((int)uncompressed_size); +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: + return ZSTD_compressBound(uncompressed_size); +#endif + + case RRDENG_COMPRESSION_NONE: + return uncompressed_size; + + default: + fatal("DBENGINE: unknown compression algorithm %u", algorithm); + } +} + +size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm) { + // the result should be stored in the payload + // 
the caller must have called dbengine_max_compressed_size() to make sure the + // payload is big enough to fit the max size needed. + + switch(algorithm) { +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: { + size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm); + struct extent_buffer *eb = extent_buffer_get(max_compressed_size); + void *compressed_buf = eb->data; + + size_t compressed_size = + LZ4_compress_default(payload, compressed_buf, (int)uncompressed_size, (int)max_compressed_size); + + if(compressed_size > 0 && compressed_size < uncompressed_size) + memcpy(payload, compressed_buf, compressed_size); + else + compressed_size = 0; + + extent_buffer_release(eb); + return compressed_size; + } +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: { + size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm); + struct extent_buffer *eb = extent_buffer_get(max_compressed_size); + void *compressed_buf = eb->data; + + size_t compressed_size = ZSTD_compress(compressed_buf, max_compressed_size, payload, uncompressed_size, + DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL); + + if (ZSTD_isError(compressed_size)) { + internal_fatal(true, "DBENGINE: ZSTD compression error %s", ZSTD_getErrorName(compressed_size)); + compressed_size = 0; + } + + if(compressed_size > 0 && compressed_size < uncompressed_size) + memcpy(payload, compressed_buf, compressed_size); + else + compressed_size = 0; + + extent_buffer_release(eb); + return compressed_size; + } +#endif + + case RRDENG_COMPRESSION_NONE: + return 0; + + default: + fatal("DBENGINE: unknown compression algorithm %u", algorithm); + } +} + +size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm) { + switch(algorithm) { + +#ifdef ENABLE_LZ4 + case RRDENG_COMPRESSION_LZ4: { + int rc = LZ4_decompress_safe(src, dst, (int)src_size, (int)dst_size); + if(rc < 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %d", rc); + rc = 0; + } + + return rc; + } +#endif + +#ifdef ENABLE_ZSTD + case RRDENG_COMPRESSION_ZSTD: { + size_t decompressed_size = ZSTD_decompress(dst, dst_size, src, src_size); + + if (ZSTD_isError(decompressed_size)) { + nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %s", + ZSTD_getErrorName(decompressed_size)); + + decompressed_size = 0; + } + + return decompressed_size; + } +#endif + + case RRDENG_COMPRESSION_NONE: + internal_fatal(true, "DBENGINE: %s() should not be called for uncompressed pages", __FUNCTION__ ); + return 0; + + default: + internal_fatal(true, "DBENGINE: unknown compression algorithm %u", algorithm); + return 0; + } +} diff --git a/src/database/engine/dbengine-compression.h b/src/database/engine/dbengine-compression.h new file mode 100644 index 000000000..8dd97f5d7 --- /dev/null +++ b/src/database/engine/dbengine-compression.h @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_DBENGINE_COMPRESSION_H +#define NETDATA_DBENGINE_COMPRESSION_H + +uint8_t dbengine_default_compression(void); + +bool dbengine_valid_compression_algorithm(uint8_t algorithm); + +size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm); +size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm); + +size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm); + +#endif //NETDATA_DBENGINE_COMPRESSION_H diff --git a/database/engine/dbengine-diagram.xml 
b/src/database/engine/dbengine-diagram.xml index 793e8a355..793e8a355 100644 --- a/database/engine/dbengine-diagram.xml +++ b/src/database/engine/dbengine-diagram.xml diff --git a/src/database/engine/dbengine-stresstest.c b/src/database/engine/dbengine-stresstest.c new file mode 100644 index 000000000..86d09c4ab --- /dev/null +++ b/src/database/engine/dbengine-stresstest.c @@ -0,0 +1,456 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "../../daemon/common.h" + +#ifdef ENABLE_DBENGINE + +static RRDHOST *dbengine_rrdhost_find_or_create(char *name) { + /* We don't want to drop metrics when generating load, + * we prefer to block data generation itself */ + + return rrdhost_find_or_create( + name, + name, + name, + os_type, + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + RRD_MEMORY_MODE_DBENGINE, + health_plugin_enabled(), + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + NULL, + 0 + ); +} + +static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number value, time_t now) { + rd->collector.last_collected_time.tv_sec = now; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.collected_value = value; + rrddim_set_updated(rd); + + rd->collector.counter++; + + collected_number v = (value >= 0) ? value : -value; + if(unlikely(v > rd->collector.collected_value_max)) rd->collector.collected_value_max = v; +} + +struct dbengine_chart_thread { + uv_thread_t thread; + RRDHOST *host; + char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." 
etc */ + unsigned dset_charts; /* number of charts */ + unsigned dset_dims; /* dimensions per chart */ + unsigned chart_i; /* current chart offset */ + time_t time_present; /* current virtual time of the benchmark */ + volatile time_t time_max; /* latest timestamp of stored values */ + unsigned history_seconds; /* how far back in the past to go */ + + volatile long done; /* initialize to 0, set to 1 to stop thread */ + struct completion charts_initialized; + unsigned long errors, stored_metrics_nr; /* statistics */ + + RRDSET *st; + RRDDIM *rd[]; /* dset_dims elements */ +}; + +collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t time_current) +{ + collected_number value; + + value = ((collected_number)time_current) * (chart_i + 1); + value += ((collected_number)time_current) * (dim_i + 1); + value %= 1024LLU; + + return value; +} + +static void generate_dbengine_chart(void *arg) +{ + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg; + RRDHOST *host = thread_info->host; + char *chartname = thread_info->chartname; + const unsigned DSET_DIMS = thread_info->dset_dims; + unsigned history_seconds = thread_info->history_seconds; + time_t time_present = thread_info->time_present; + + unsigned j, update_every = 1; + RRDSET *st; + RRDDIM *rd[DSET_DIMS]; + char name[RRD_ID_LENGTH_MAX + 1]; + time_t time_current; + + // create the chart + snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%u", thread_info->chart_i + 1); + thread_info->st = st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname, + chartname, NULL, 1, update_every, RRDSET_TYPE_LINE); + for (j = 0 ; j < DSET_DIMS ; ++j) { + snprintfz(name, RRD_ID_LENGTH_MAX, "%s%u", chartname, j + 1); + + thread_info->rd[j] = rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + completion_mark_complete(&thread_info->charts_initialized); + + // feed it with the test data + time_current = time_present - history_seconds; + for (j = 0 ; j < DSET_DIMS ; ++j) { + rd[j]->collector.last_collected_time.tv_sec = + st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current - update_every; + rd[j]->collector.last_collected_time.tv_usec = + st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0; + } + for( ; !thread_info->done && time_current < time_present ; time_current += update_every) { + st->usec_since_last_update = USEC_PER_SEC * update_every; + + for (j = 0; j < DSET_DIMS; ++j) { + collected_number value; + + value = generate_dbengine_chart_value(thread_info->chart_i, j, time_current); + rrddim_set_by_pointer_fake_time(rd[j], value, time_current); + ++thread_info->stored_metrics_nr; + } + rrdset_done(st); + thread_info->time_max = time_current; + } + for (j = 0; j < DSET_DIMS; ++j) { + rrdeng_store_metric_finalize((rd[j])->tiers[0].sch); + } +} + +void generate_dbengine_dataset(unsigned history_seconds) +{ + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + const int DSET_CHARTS = 16; + const int DSET_DIMS = 128; + const uint64_t EXPECTED_COMPRESSION_RATIO = 20; + RRDHOST *host = NULL; + struct dbengine_chart_thread **thread_info; + int i; + time_t time_present; + + default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; + default_rrdeng_page_cache_mb = 128; + // Worst case for uncompressible data + default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * history_seconds) / + (1024 * 1024); + default_rrdeng_disk_quota_mb -= 
default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100; + + nd_log_limits_unlimited(); + fprintf(stderr, "Initializing localhost with hostname 'dbengine-dataset'"); + + host = dbengine_rrdhost_find_or_create("dbengine-dataset"); + if (NULL == host) + return; + + thread_info = mallocz(sizeof(*thread_info) * DSET_CHARTS); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + thread_info[i] = mallocz(sizeof(*thread_info[i]) + sizeof(RRDDIM *) * DSET_DIMS); + } + fprintf(stderr, "\nRunning DB-engine workload generator\n"); + + time_present = now_realtime_sec(); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + thread_info[i]->host = host; + thread_info[i]->chartname = "random"; + thread_info[i]->dset_charts = DSET_CHARTS; + thread_info[i]->chart_i = i; + thread_info[i]->dset_dims = DSET_DIMS; + thread_info[i]->history_seconds = history_seconds; + thread_info[i]->time_present = time_present; + thread_info[i]->time_max = 0; + thread_info[i]->done = 0; + completion_init(&thread_info[i]->charts_initialized); + fatal_assert(0 == uv_thread_create(&thread_info[i]->thread, generate_dbengine_chart, thread_info[i])); + completion_wait_for(&thread_info[i]->charts_initialized); + completion_destroy(&thread_info[i]->charts_initialized); + } + for (i = 0 ; i < DSET_CHARTS ; ++i) { + fatal_assert(0 == uv_thread_join(&thread_info[i]->thread)); + } + + for (i = 0 ; i < DSET_CHARTS ; ++i) { + freez(thread_info[i]); + } + freez(thread_info); + rrd_wrlock(); + rrdhost_free___while_having_rrd_wrlock(localhost, true); + rrd_unlock(); +} + +struct dbengine_query_thread { + uv_thread_t thread; + RRDHOST *host; + char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */ + unsigned dset_charts; /* number of charts */ + unsigned dset_dims; /* dimensions per chart */ + time_t time_present; /* current virtual time of the benchmark */ + unsigned history_seconds; /* how far back in the past to go */ + volatile long done; /* initialize to 0, set to 1 to stop thread */ + unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */ + uint8_t delete_old_data; /* if non zero then data are deleted when disk space is exhausted */ + + struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */ +}; + +static void query_dbengine_chart(void *arg) +{ + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg; + const int DSET_CHARTS = thread_info->dset_charts; + const int DSET_DIMS = thread_info->dset_dims; + time_t time_after, time_before, time_min, time_approx_min, time_max, duration; + int i, j, update_every = 1; + RRDSET *st; + RRDDIM *rd; + uint8_t same; + time_t time_now, time_retrieved, end_time; + collected_number generatedv; + NETDATA_DOUBLE value, expected; + struct storage_engine_query_handle seqh; + size_t value_errors = 0, time_errors = 0; + + do { + // pick a chart and dimension + i = random() % DSET_CHARTS; + st = thread_info->chart_threads[i]->st; + j = random() % DSET_DIMS; + rd = thread_info->chart_threads[i]->rd[j]; + + time_min = thread_info->time_present - thread_info->history_seconds + 1; + time_max = thread_info->chart_threads[i]->time_max; + + if (thread_info->delete_old_data) { + /* A time window of twice the disk space is sufficient for compression space savings of up to 50% */ + time_approx_min = time_max - (default_rrdeng_disk_quota_mb * 2 * 1024 * 1024) / + (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number)); + time_min = MAX(time_min, time_approx_min); + } + if 
(!time_max) { + time_before = time_after = time_min; + } else { + time_after = time_min + random() % (MAX(time_max - time_min, 1)); + duration = random() % 3600; + time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */ + } + + storage_engine_query_init(rd->tiers[0].seb, rd->tiers[0].smh, &seqh, time_after, time_before, STORAGE_PRIORITY_NORMAL); + ++thread_info->queries_nr; + for (time_now = time_after ; time_now <= time_before ; time_now += update_every) { + generatedv = generate_dbengine_chart_value(i, j, time_now); + expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS)); + + if (unlikely(storage_engine_query_is_finished(&seqh))) { + if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT + ", found data gap, ### ERROR 12 ###\n", + rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected); + ++thread_info->errors; + } + break; + } + + STORAGE_POINT sp = storage_engine_query_next_metric(&seqh); + value = sp.sum; + time_retrieved = sp.start_time_s; + end_time = sp.end_time_s; + + if (!netdata_double_isnumber(value)) { + if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT + ", found data gap, ### ERROR 13 ###\n", + rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected); + ++thread_info->errors; + } + break; + } + ++thread_info->queried_metrics_nr; + + same = (roundndd(value) == roundndd(expected)) ? 1 : 0; + if (!same) { + if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ + if(!value_errors) + fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT + ", found " NETDATA_DOUBLE_FORMAT ", ### ERROR 14 ###\n", + rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected, value); + value_errors++; + thread_info->errors++; + } + } + if (end_time != time_now) { + if (!thread_info->delete_old_data) { /* data validation only when we don't delete */ + if(!time_errors) + fprintf(stderr, + " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### ERROR 15 ###\n", + rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, (unsigned long) time_retrieved); + time_errors++; + thread_info->errors++; + } + } + } + storage_engine_query_finalize(&seqh); + } while(!thread_info->done); + + if(value_errors) + fprintf(stderr, "%zu value errors encountered\n", value_errors); + + if(time_errors) + fprintf(stderr, "%zu time errors encountered\n", time_errors); +} + +void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS, + unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB) +{ + fprintf(stderr, "%s() running...\n", __FUNCTION__ ); + const unsigned DSET_DIMS = 128; + const uint64_t EXPECTED_COMPRESSION_RATIO = 20; + const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 year of history */ + RRDHOST *host = NULL; + struct dbengine_chart_thread **chart_threads; + struct dbengine_query_thread **query_threads; + unsigned i, j; + time_t time_start, test_duration; + + nd_log_limits_unlimited(); + + if (!TEST_DURATION_SEC) + TEST_DURATION_SEC = 10; + if (!DSET_CHARTS) + DSET_CHARTS = 1; + if (!QUERY_THREADS) + QUERY_THREADS = 1; + if (PAGE_CACHE_MB < RRDENG_MIN_PAGE_CACHE_SIZE_MB) + PAGE_CACHE_MB = 
RRDENG_MIN_PAGE_CACHE_SIZE_MB; + + default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; + default_rrdeng_page_cache_mb = PAGE_CACHE_MB; + if (DISK_SPACE_MB) { + fprintf(stderr, "By setting disk space limit data are allowed to be deleted. " + "Data validation is turned off for this run.\n"); + default_rrdeng_disk_quota_mb = DISK_SPACE_MB; + } else { + // Worst case for uncompressible data + default_rrdeng_disk_quota_mb = + (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / (1024 * 1024); + default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100; + } + + fprintf(stderr, "Initializing localhost with hostname 'dbengine-stress-test'\n"); + + (void)sql_init_meta_database(DB_CHECK_NONE, 1); + host = dbengine_rrdhost_find_or_create("dbengine-stress-test"); + if (NULL == host) + return; + + chart_threads = mallocz(sizeof(*chart_threads) * DSET_CHARTS); + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i] = mallocz(sizeof(*chart_threads[i]) + sizeof(RRDDIM *) * DSET_DIMS); + } + query_threads = mallocz(sizeof(*query_threads) * QUERY_THREADS); + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i] = mallocz(sizeof(*query_threads[i]) + sizeof(struct dbengine_chart_thread *) * DSET_CHARTS); + } + fprintf(stderr, "\nRunning DB-engine stress test, %u seconds writers ramp-up time,\n" + "%u seconds of concurrent readers and writers, %u writer threads, %u reader threads,\n" + "%u MiB of page cache.\n", + RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB); + + time_start = now_realtime_sec() + HISTORY_SECONDS; /* move history to the future */ + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i]->host = host; + chart_threads[i]->chartname = "random"; + chart_threads[i]->dset_charts = DSET_CHARTS; + chart_threads[i]->chart_i = i; + chart_threads[i]->dset_dims = DSET_DIMS; + chart_threads[i]->history_seconds = HISTORY_SECONDS; + chart_threads[i]->time_present = time_start; + chart_threads[i]->time_max = 0; + chart_threads[i]->done = 0; + chart_threads[i]->errors = chart_threads[i]->stored_metrics_nr = 0; + completion_init(&chart_threads[i]->charts_initialized); + fatal_assert(0 == uv_thread_create(&chart_threads[i]->thread, generate_dbengine_chart, chart_threads[i])); + } + /* barrier so that subsequent queries can access valid chart data */ + for (i = 0 ; i < DSET_CHARTS ; ++i) { + completion_wait_for(&chart_threads[i]->charts_initialized); + completion_destroy(&chart_threads[i]->charts_initialized); + } + sleep(RAMP_UP_SECONDS); + /* at this point data have already began being written to the database */ + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i]->host = host; + query_threads[i]->chartname = "random"; + query_threads[i]->dset_charts = DSET_CHARTS; + query_threads[i]->dset_dims = DSET_DIMS; + query_threads[i]->history_seconds = HISTORY_SECONDS; + query_threads[i]->time_present = time_start; + query_threads[i]->done = 0; + query_threads[i]->errors = query_threads[i]->queries_nr = query_threads[i]->queried_metrics_nr = 0; + for (j = 0 ; j < DSET_CHARTS ; ++j) { + query_threads[i]->chart_threads[j] = chart_threads[j]; + } + query_threads[i]->delete_old_data = DISK_SPACE_MB ? 
1 : 0; + fatal_assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i])); + } + sleep(TEST_DURATION_SEC); + /* stop workload */ + for (i = 0 ; i < DSET_CHARTS ; ++i) { + chart_threads[i]->done = 1; + } + for (i = 0 ; i < QUERY_THREADS ; ++i) { + query_threads[i]->done = 1; + } + for (i = 0 ; i < DSET_CHARTS ; ++i) { + assert(0 == uv_thread_join(&chart_threads[i]->thread)); + } + for (i = 0 ; i < QUERY_THREADS ; ++i) { + assert(0 == uv_thread_join(&query_threads[i]->thread)); + } + test_duration = now_realtime_sec() - (time_start - HISTORY_SECONDS); + if (!test_duration) + test_duration = 1; + fprintf(stderr, "\nDB-engine stress test finished in %lld seconds.\n", (long long)test_duration); + unsigned long stored_metrics_nr = 0; + for (i = 0 ; i < DSET_CHARTS ; ++i) { + stored_metrics_nr += chart_threads[i]->stored_metrics_nr; + } + unsigned long queried_metrics_nr = 0; + for (i = 0 ; i < QUERY_THREADS ; ++i) { + queried_metrics_nr += query_threads[i]->queried_metrics_nr; + } + fprintf(stderr, "%u metrics were stored (dataset size of %lu MiB) in %u charts by 1 writer thread per chart.\n", + DSET_CHARTS * DSET_DIMS, stored_metrics_nr * sizeof(storage_number) / (1024 * 1024), DSET_CHARTS); + fprintf(stderr, "Metrics were being generated per 1 emulated second and time was accelerated.\n"); + fprintf(stderr, "%lu metric data points were queried by %u reader threads.\n", queried_metrics_nr, QUERY_THREADS); + fprintf(stderr, "Query starting time is randomly chosen from the beginning of the time-series up to the time of\n" + "the latest data point, and ending time from 1 second up to 1 hour after the starting time.\n"); + fprintf(stderr, "Performance is %lld written data points/sec and %lld read data points/sec.\n", + (long long)(stored_metrics_nr / test_duration), (long long)(queried_metrics_nr / test_duration)); + + for (i = 0 ; i < DSET_CHARTS ; ++i) { + freez(chart_threads[i]); + } + freez(chart_threads); + for (i = 0 ; i < QUERY_THREADS ; ++i) { + freez(query_threads[i]); + } + freez(query_threads); + rrd_wrlock(); + rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); + rrd_unlock(); +} + +#endif
\ No newline at end of file diff --git a/src/database/engine/dbengine-unittest.c b/src/database/engine/dbengine-unittest.c new file mode 100644 index 000000000..4c4d312c0 --- /dev/null +++ b/src/database/engine/dbengine-unittest.c @@ -0,0 +1,419 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "../../daemon/common.h" + +#ifdef ENABLE_DBENGINE + +#define CHARTS 64 +#define DIMS 16 // CHARTS * DIMS dimensions +#define REGIONS 11 +#define POINTS_PER_REGION 16384 +static const int REGION_UPDATE_EVERY[REGIONS] = {1, 15, 3, 20, 2, 6, 30, 12, 5, 4, 10}; + +#define START_TIMESTAMP MAX(2 * API_RELATIVE_TIME_MAX, 200000000) + +static time_t region_start_time(time_t previous_region_end_time, time_t update_every) { + // leave a small gap between regions + // but keep them close together, so that cross-region queries will be fast + + time_t rc = previous_region_end_time + update_every; + rc += update_every - (rc % update_every); + rc += update_every; + return rc; +} + +static inline collected_number point_value_get(size_t region, size_t chart, size_t dim, size_t point) { + // calculate the value to be stored for each point in the database + + collected_number r = (collected_number)region; + collected_number c = (collected_number)chart; + collected_number d = (collected_number)dim; + collected_number p = (collected_number)point; + + return (r * CHARTS * DIMS * POINTS_PER_REGION + + c * DIMS * POINTS_PER_REGION + + d * POINTS_PER_REGION + + p) % 10000000; +} + +static inline void storage_point_check(size_t region, size_t chart, size_t dim, size_t point, time_t now, time_t update_every, STORAGE_POINT sp, size_t *value_errors, size_t *time_errors, size_t *update_every_errors) { + // check the supplied STORAGE_POINT retrieved from the database + // against the computed timestamp, update_every and expected value + + if(storage_point_is_gap(sp)) sp.min = sp.max = sp.sum = NAN; + + collected_number expected = point_value_get(region, chart, dim, point); + + if(roundndd(expected) != roundndd(sp.sum)) { + if(*value_errors < DIMS * 2) { + fprintf(stderr, " >>> DBENGINE: VALUE DOES NOT MATCH: " + "region %zu, chart %zu, dimension %zu, point %zu, time %ld: " + "expected %lld, found %f\n", + region, chart, dim, point, now, expected, sp.sum); + } + + (*value_errors)++; + } + + if(sp.start_time_s > now || sp.end_time_s < now) { + if(*time_errors < DIMS * 2) { + fprintf(stderr, " >>> DBENGINE: TIMESTAMP DOES NOT MATCH: " + "region %zu, chart %zu, dimension %zu, point %zu, timestamp %ld: " + "expected %ld, found %ld - %ld\n", + region, chart, dim, point, now, now, sp.start_time_s, sp.end_time_s); + } + + (*time_errors)++; + } + + if(update_every != sp.end_time_s - sp.start_time_s) { + if(*update_every_errors < DIMS * 2) { + fprintf(stderr, " >>> DBENGINE: UPDATE EVERY DOES NOT MATCH: " + "region %zu, chart %zu, dimension %zu, point %zu, timestamp %ld: " + "expected %ld, found %ld\n", + region, chart, dim, point, now, update_every, sp.end_time_s - sp.start_time_s); + } + + (*update_every_errors)++; + } +} + +static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number value, time_t now) { + rd->collector.last_collected_time.tv_sec = now; + rd->collector.last_collected_time.tv_usec = 0; + rd->collector.collected_value = value; + rrddim_set_updated(rd); + + rd->collector.counter++; + + collected_number v = (value >= 0) ? 
value : -value; + if(unlikely(v > rd->collector.collected_value_max)) rd->collector.collected_value_max = v; +} + +static RRDHOST *dbengine_rrdhost_find_or_create(char *name) { + /* We don't want to drop metrics when generating load, + * we prefer to block data generation itself */ + + return rrdhost_find_or_create( + name, + name, + name, + os_type, + netdata_configured_timezone, + netdata_configured_abbrev_timezone, + netdata_configured_utc_offset, + program_name, + program_version, + default_rrd_update_every, + default_rrd_history_entries, + RRD_MEMORY_MODE_DBENGINE, + health_plugin_enabled(), + default_rrdpush_enabled, + default_rrdpush_destination, + default_rrdpush_api_key, + default_rrdpush_send_charts_matching, + default_rrdpush_enable_replication, + default_rrdpush_seconds_to_replicate, + default_rrdpush_replication_step, + NULL, + 0 + ); +} + +static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS], + int update_every) { + fprintf(stderr, "DBENGINE Creating Test Charts...\n"); + + int i, j; + char name[101]; + + for (i = 0 ; i < CHARTS ; ++i) { + snprintfz(name, sizeof(name) - 1, "dbengine-chart-%d", i); + + // create the chart + st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest", + NULL, 1, update_every, RRDSET_TYPE_LINE); + rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG); + rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST); + for (j = 0 ; j < DIMS ; ++j) { + snprintfz(name, sizeof(name) - 1, "dim-%d", j); + + rd[i][j] = rrddim_add(st[i], name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + } + } + + // Initialize DB with the very first entries + for (i = 0 ; i < CHARTS ; ++i) { + for (j = 0 ; j < DIMS ; ++j) { + rd[i][j]->collector.last_collected_time.tv_sec = + st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = START_TIMESTAMP - 1; + rd[i][j]->collector.last_collected_time.tv_usec = + st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0; + } + } + for (i = 0 ; i < CHARTS ; ++i) { + st[i]->usec_since_last_update = USEC_PER_SEC; + + for (j = 0; j < DIMS; ++j) { + rrddim_set_by_pointer_fake_time(rd[i][j], 69, START_TIMESTAMP); // set first value to 69 + } + + struct timeval now; + now_realtime_timeval(&now); + rrdset_timed_done(st[i], now, false); + } + // Flush pages for subsequent real values + for (i = 0 ; i < CHARTS ; ++i) { + for (j = 0; j < DIMS; ++j) { + rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].sch); + } + } +} + +static time_t test_dbengine_create_metrics( + RRDSET *st[CHARTS], + RRDDIM *rd[CHARTS][DIMS], + size_t current_region, + time_t time_start) { + + time_t update_every = REGION_UPDATE_EVERY[current_region]; + fprintf(stderr, "DBENGINE Single Region Write to " + "region %zu, from %ld to %ld, with update every %ld...\n", + current_region, time_start, time_start + POINTS_PER_REGION * update_every, update_every); + + // for the database to save the metrics at the right time, we need to set + // the last data collection time to be just before the first data collection. + time_t time_now = time_start; + for (size_t c = 0 ; c < CHARTS ; ++c) { + for (size_t d = 0 ; d < DIMS ; ++d) { + storage_engine_store_change_collection_frequency(rd[c][d]->tiers[0].sch, (int)update_every); + + // setting these timestamps, to the data collection time, prevents interpolation + // during data collection, so that our value will be written as-is to the + // database. 
+ + rd[c][d]->collector.last_collected_time.tv_sec = + st[c]->last_collected_time.tv_sec = st[c]->last_updated.tv_sec = time_now; + + rd[c][d]->collector.last_collected_time.tv_usec = + st[c]->last_collected_time.tv_usec = st[c]->last_updated.tv_usec = 0; + } + } + + // set the samples to the database + for (size_t p = 0; p < POINTS_PER_REGION ; ++p) { + for (size_t c = 0 ; c < CHARTS ; ++c) { + st[c]->usec_since_last_update = USEC_PER_SEC * update_every; + + for (size_t d = 0; d < DIMS; ++d) + rrddim_set_by_pointer_fake_time(rd[c][d], point_value_get(current_region, c, d, p), time_now); + + rrdset_timed_done(st[c], (struct timeval){ .tv_sec = time_now, .tv_usec = 0 }, false); + } + + time_now += update_every; + } + + return time_now; +} + +// Checks the metric data for the given region, returns number of errors +static size_t test_dbengine_check_metrics( + RRDSET *st[CHARTS] __maybe_unused, + RRDDIM *rd[CHARTS][DIMS], + size_t current_region, + time_t time_start, + time_t time_end) { + + time_t update_every = REGION_UPDATE_EVERY[current_region]; + fprintf(stderr, "DBENGINE Single Region Read from " + "region %zu, from %ld to %ld, with update every %ld...\n", + current_region, time_start, time_end, update_every); + + // initialize all queries + struct storage_engine_query_handle handles[CHARTS * DIMS] = { 0 }; + for (size_t c = 0 ; c < CHARTS ; ++c) { + for (size_t d = 0; d < DIMS; ++d) { + storage_engine_query_init(rd[c][d]->tiers[0].seb, + rd[c][d]->tiers[0].smh, + &handles[c * DIMS + d], + time_start, + time_end, + STORAGE_PRIORITY_NORMAL); + } + } + + // check the stored samples + size_t value_errors = 0, time_errors = 0, update_every_errors = 0; + time_t time_now = time_start; + for(size_t p = 0; p < POINTS_PER_REGION ;p++) { + for (size_t c = 0 ; c < CHARTS ; ++c) { + for (size_t d = 0; d < DIMS; ++d) { + STORAGE_POINT sp = storage_engine_query_next_metric(&handles[c * DIMS + d]); + storage_point_check(current_region, c, d, p, time_now, update_every, sp, + &value_errors, &time_errors, &update_every_errors); + } + } + + time_now += update_every; + } + + // finalize the queries + for (size_t c = 0 ; c < CHARTS ; ++c) { + for (size_t d = 0; d < DIMS; ++d) { + storage_engine_query_finalize(&handles[c * DIMS + d]); + } + } + + if(value_errors) + fprintf(stderr, "%zu value errors encountered (out of %d checks)\n", value_errors, POINTS_PER_REGION * CHARTS * DIMS); + + if(time_errors) + fprintf(stderr, "%zu time errors encountered (out of %d checks)\n", time_errors, POINTS_PER_REGION * CHARTS * DIMS); + + if(update_every_errors) + fprintf(stderr, "%zu update every errors encountered (out of %d checks)\n", update_every_errors, POINTS_PER_REGION * CHARTS * DIMS); + + return value_errors + time_errors + update_every_errors; +} + +static size_t dbengine_test_rrdr_single_region( + RRDSET *st[CHARTS], + RRDDIM *rd[CHARTS][DIMS], + size_t current_region, + time_t time_start, + time_t time_end) { + + time_t update_every = REGION_UPDATE_EVERY[current_region]; + fprintf(stderr, "RRDR Single Region Test on " + "region %zu, start time %lld, end time %lld, update every %ld, on %d dimensions...\n", + current_region, (long long)time_start, (long long)time_end, update_every, CHARTS * DIMS); + + size_t errors = 0, value_errors = 0, time_errors = 0, update_every_errors = 0; + long points = (time_end - time_start) / update_every; + for(size_t c = 0; c < CHARTS ;c++) { + ONEWAYALLOC *owa = onewayalloc_create(0); + RRDR *r = rrd2rrdr_legacy(owa, st[c], points, time_start, time_end, + RRDR_GROUPING_AVERAGE, 0, 
RRDR_OPTION_NATURAL_POINTS, + NULL, NULL, 0, 0, + QUERY_SOURCE_UNITTEST, STORAGE_PRIORITY_NORMAL); + if (!r) { + fprintf(stderr, " >>> DBENGINE: %s: empty RRDR on region %zu\n", rrdset_name(st[c]), current_region); + onewayalloc_destroy(owa); + errors++; + continue; + } + + if(r->internal.qt->request.st != st[c]) + fatal("queried wrong chart"); + + if(rrdr_rows(r) != POINTS_PER_REGION) + fatal("query returned wrong number of points (expected %d, got %zu)", POINTS_PER_REGION, rrdr_rows(r)); + + time_t time_now = time_start; + for (size_t p = 0; p < rrdr_rows(r); p++) { + size_t d = 0; + RRDDIM *dim; + rrddim_foreach_read(dim, r->internal.qt->request.st) { + if(unlikely(d >= r->d)) + fatal("got more dimensions (%zu) than expected (%zu)", d, r->d); + + if(rd[c][d] != dim) + fatal("queried wrong dimension"); + + RRDR_VALUE_FLAGS *co = &r->o[ p * r->d ]; + NETDATA_DOUBLE *cn = &r->v[ p * r->d ]; + + STORAGE_POINT sp = STORAGE_POINT_UNSET; + sp.min = sp.max = sp.sum = (co[d] & RRDR_VALUE_EMPTY) ? NAN :cn[d]; + sp.count = 1; + sp.end_time_s = r->t[p]; + sp.start_time_s = sp.end_time_s - r->view.update_every; + + storage_point_check(current_region, c, d, p, time_now, update_every, sp, &value_errors, &time_errors, &update_every_errors); + d++; + } + rrddim_foreach_done(dim); + time_now += update_every; + } + + rrdr_free(owa, r); + onewayalloc_destroy(owa); + } + + if(value_errors) + fprintf(stderr, "%zu value errors encountered (out of %d checks)\n", value_errors, POINTS_PER_REGION * CHARTS * DIMS); + + if(time_errors) + fprintf(stderr, "%zu time errors encountered (out of %d checks)\n", time_errors, POINTS_PER_REGION * CHARTS * DIMS); + + if(update_every_errors) + fprintf(stderr, "%zu update every errors encountered (out of %d checks)\n", update_every_errors, POINTS_PER_REGION * CHARTS * DIMS); + + return errors + value_errors + time_errors + update_every_errors; +} + +int test_dbengine(void) { + // provide enough threads to dbengine + setenv("UV_THREADPOOL_SIZE", "48", 1); + + size_t errors = 0, value_errors = 0, time_errors = 0; + + nd_log_limits_unlimited(); + fprintf(stderr, "\nRunning DB-engine test\n"); + + default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE; + fprintf(stderr, "Initializing localhost with hostname 'unittest-dbengine'"); + RRDHOST *host = dbengine_rrdhost_find_or_create("unittest-dbengine"); + if(!host) + fatal("Failed to initialize host"); + + RRDSET *st[CHARTS] = { 0 }; + RRDDIM *rd[CHARTS][DIMS] = { 0 }; + time_t time_start[REGIONS] = { 0 }, time_end[REGIONS] = { 0 }; + + // create the charts and dimensions we need + test_dbengine_create_charts(host, st, rd, REGION_UPDATE_EVERY[0]); + + time_t now = START_TIMESTAMP; + time_t update_every_old = REGION_UPDATE_EVERY[0]; + for(size_t current_region = 0; current_region < REGIONS ;current_region++) { + time_t update_every = REGION_UPDATE_EVERY[current_region]; + + if(update_every != update_every_old) { + for (size_t c = 0 ; c < CHARTS ; ++c) + rrdset_set_update_every_s(st[c], update_every); + } + + time_start[current_region] = region_start_time(now, update_every); + now = time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]); + + errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region], time_end[current_region]); + } + + // check everything again + for(size_t current_region = 0; current_region < REGIONS ;current_region++) + errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region], time_end[current_region]); + + // 
check again in reverse order + for(size_t current_region = 0; current_region < REGIONS ;current_region++) { + size_t region = REGIONS - 1 - current_region; + errors += test_dbengine_check_metrics(st, rd, region, time_start[region], time_end[region]); + } + + // check all the regions using RRDR + // this also checks the query planner and the query engine of Netdata + for (size_t current_region = 0 ; current_region < REGIONS ; current_region++) { + errors += dbengine_test_rrdr_single_region(st, rd, current_region, time_start[current_region], time_end[current_region]); + } + + rrd_wrlock(); + rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_exit((struct rrdengine_instance *)host->db[0].si); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); + rrd_unlock(); + + return (int)(errors + value_errors + time_errors); +} + +#endif diff --git a/database/engine/journalfile.c b/src/database/engine/journalfile.c index 9005b81ca..8099d017f 100644 --- a/database/engine/journalfile.c +++ b/src/database/engine/journalfile.c @@ -637,9 +637,12 @@ static int journalfile_check_superblock(uv_file file) fatal_assert(req.result >= 0); uv_fs_req_cleanup(&req); - if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || - strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { - netdata_log_error("DBENGINE: File has invalid superblock."); + + char jf_magic[RRDENG_MAGIC_SZ] = RRDENG_JF_MAGIC; + char jf_ver[RRDENG_VER_SZ] = RRDENG_JF_VER; + if (strncmp(superblock->magic_number, jf_magic, RRDENG_MAGIC_SZ) != 0 || + strncmp(superblock->version, jf_ver, RRDENG_VER_SZ) != 0) { + nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: File has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -669,7 +672,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, uuid_t *temp_id; uint8_t page_type = jf_metric_data->descr[i].type; - if (page_type > PAGE_TYPE_MAX) { + if (page_type > RRDENG_PAGE_TYPE_MAX) { if (!bitmap256_get_bit(&page_error_map, page_type)) { netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type); bitmap256_set_bit(&page_error_map, page_type, 1); @@ -700,13 +703,19 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, .section = (Word_t)ctx, .first_time_s = vd.start_time_s, .last_time_s = vd.end_time_s, - .latest_update_every_s = (uint32_t) vd.update_every_s, + .latest_update_every_s = vd.update_every_s, }; bool added; metric = mrg_metric_add_and_acquire(main_mrg, entry, &added); - if(added) + if(added) { + __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED); update_metric_time = false; + } + if (vd.update_every_s) { + uint64_t samples = (vd.end_time_s - vd.start_time_s) / vd.update_every_s; + __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED); + } } Word_t metric_id = mrg_metric_id(main_mrg, metric); @@ -1005,7 +1014,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st time_t end_time_s = header_start_time_s + metric->delta_end_s; mrg_update_metric_retention_and_granularity_by_uuid( - main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); + main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, metric->update_every_s, now_s); metric++; } @@ -1042,7 +1051,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal journal_v1_file_size = (uint32_t)statbuf.st_size; 
journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2)); - fd = open(path_v2, O_RDONLY); + fd = open(path_v2, O_RDONLY | O_CLOEXEC); if (fd < 0) { if (errno == ENOENT) return 1; @@ -1226,7 +1235,7 @@ void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void * data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC); data_page->extent_index = page_info->extent_index; - data_page->update_every_s = (uint32_t) page_info->update_every_s; + data_page->update_every_s = page_info->update_every_s; data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length); data_page->type = 0; @@ -1252,7 +1261,7 @@ static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_heade page_info = *PValue; // Write one descriptor and return the next data page location data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info); - update_every_s = (uint32_t) page_info->update_every_s; + update_every_s = page_info->update_every_s; if (NULL == data_page) break; } diff --git a/database/engine/journalfile.h b/src/database/engine/journalfile.h index 5cdf72b9d..3f881ee16 100644 --- a/database/engine/journalfile.h +++ b/src/database/engine/journalfile.h @@ -7,7 +7,6 @@ /* Forward declarations */ struct rrdengine_instance; -struct rrdengine_worker_config; struct rrdengine_datafile; struct rrdengine_journalfile; diff --git a/database/engine/metric.c b/src/database/engine/metric.c index 2e132612e..01eb22fbc 100644 --- a/database/engine/metric.c +++ b/src/database/engine/metric.c @@ -1,5 +1,8 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "metric.h" +#include "cache.h" +#include "libnetdata/locks/locks.h" +#include "rrddiskprotocol.h" typedef int32_t REFCOUNT; #define REFCOUNT_DELETING (-100) @@ -104,8 +107,11 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { uint8_t *u = (uint8_t *)uuid; - size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)]; - return *n % mrg->partitions; + + size_t n; + memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t)); + + return n % mrg->partitions; } static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) { @@ -125,87 +131,174 @@ static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, return first_time_s; } -static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) { +static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section; + + char uuid[UUID_STR_LEN]; + uuid_unparse_lower(metric->uuid, uuid); + nd_log(NDLS_DAEMON, NDLP_ERR, + "METRIC: %s on %s at tier %d, refcount %d, partition %u, " + "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", " + "writer pid %d " + "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ", + msg, + uuid, + ctx->config.tier, + metric->refcount, + metric->partition, + metric->first_time_s, + metric->latest_time_s_hot, + metric->latest_time_s_clean, + metric->latest_update_every_s, + (int)metric->writer + ); +} + +static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) { + time_t first, last; + mrg_metric_get_retention(mrg, metric, &first, &last, NULL); + return (!first || !last || first > last); +} + +static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC 
*metric) { size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; + + size_t mem_before_judyl, mem_after_judyl; + + mrg_index_write_lock(mrg, partition); + + Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); + if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { + MRG_STATS_DELETE_MISS(mrg, partition); + mrg_index_write_unlock(mrg, partition); + return; + } + + mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); + int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); + mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + + if(unlikely(!rc)) { + MRG_STATS_DELETE_MISS(mrg, partition); + mrg_index_write_unlock(mrg, partition); + return; + } + + if(!*sections_judy_pptr) { + rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); + if(unlikely(!rc)) + fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); + mrg_stats_size_judyhs_removed_uuid(mrg, partition); + } + + MRG_STATS_DELETED_METRIC(mrg, partition); + + mrg_index_write_unlock(mrg, partition); + + aral_freez(mrg->index[partition].aral, metric); +} + +static inline bool metric_acquire(MRG *mrg, METRIC *metric) { + REFCOUNT expected, desired; + + expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); do { - if(expected < 0) - fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); + if(unlikely(expected < 0)) + return false; + + desired = expected + 1; - refcount = expected + 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); + + size_t partition = metric->partition; - if(refcount == 1) + if(desired == 1) __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - return refcount; + return true; } -static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { +static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) { size_t partition = metric->partition; - REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); - REFCOUNT refcount; + REFCOUNT expected, desired; + + expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED); do { - if(expected <= 0) - fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); + if(expected <= 0) { + metric_log(mrg, metric, "refcount is zero or negative during release"); + fatal("METRIC: refcount is %d (zero or negative) during release", expected); + } - refcount = expected - 1; - } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric)) + desired = REFCOUNT_DELETING; + else + desired = expected - 1; + + } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); - if(unlikely(!refcount)) + if(desired == 0 || desired == REFCOUNT_DELETING) { __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); + if(desired == 
REFCOUNT_DELETING) + acquired_for_deletion_metric_delete(mrg, metric); + } + __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); - return (!first || !last || first > last); + return desired == REFCOUNT_DELETING; } static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { size_t partition = uuid_partition(mrg, entry->uuid); METRIC *allocation = aral_mallocz(mrg->index[partition].aral); + Pvoid_t *PValue; - mrg_index_write_lock(mrg, partition); + while(1) { + mrg_index_write_lock(mrg, partition); - size_t mem_before_judyl, mem_after_judyl; + size_t mem_before_judyl, mem_after_judyl; - Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR)) - fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); + Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0); + if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR)) + fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); - if(unlikely(!*sections_judy_pptr)) - mrg_stats_size_judyhs_added_uuid(mrg, partition); + if (unlikely(!*sections_judy_pptr)) + mrg_stats_size_judyhs_added_uuid(mrg, partition); - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); + PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); + mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); - if(unlikely(!PValue || PValue == PJERR)) - fatal("DBENGINE METRIC: corrupted section JudyL array"); + if (unlikely(!PValue || PValue == PJERR)) + fatal("DBENGINE METRIC: corrupted section JudyL array"); - if(unlikely(*PValue != NULL)) { - METRIC *metric = *PValue; + if (unlikely(*PValue != NULL)) { + METRIC *metric = *PValue; - metric_acquire(mrg, metric); + if(!metric_acquire(mrg, metric)) { + mrg_index_write_unlock(mrg, partition); + continue; + } - MRG_STATS_DUPLICATE_ADD(mrg, partition); + MRG_STATS_DUPLICATE_ADD(mrg, partition); + mrg_index_write_unlock(mrg, partition); - mrg_index_write_unlock(mrg, partition); + if (ret) + *ret = false; - if(ret) - *ret = false; + aral_freez(mrg->index[partition].aral, allocation); - aral_freez(mrg->index[partition].aral, allocation); + return metric; + } - return metric; + break; } METRIC *metric = allocation; @@ -216,9 +309,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r metric->latest_time_s_hot = 0; metric->latest_update_every_s = entry->latest_update_every_s; metric->writer = 0; - metric->refcount = 0; + metric->refcount = 1; metric->partition = partition; - metric_acquire(mrg, metric); *PValue = metric; MRG_STATS_ADDED_METRIC(mrg, partition); @@ -234,77 +326,35 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { size_t partition = uuid_partition(mrg, uuid); - mrg_index_read_lock(mrg, partition); + while(1) { + mrg_index_read_lock(mrg, partition); - Pvoid_t *sections_judy_pptr = 
JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); - if(unlikely(!PValue)) { - mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg, partition); - return NULL; - } - - METRIC *metric = *PValue; - - metric_acquire(mrg, metric); - - mrg_index_read_unlock(mrg, partition); - - MRG_STATS_SEARCH_HIT(mrg, partition); - return metric; -} - -static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) { - size_t partition = metric->partition; - - size_t mem_before_judyl, mem_after_judyl; - - mrg_index_write_lock(mrg, partition); + Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); + if (unlikely(!sections_judy_pptr)) { + mrg_index_read_unlock(mrg, partition); + MRG_STATS_SEARCH_MISS(mrg, partition); + return NULL; + } - if(!metric_release_and_can_be_deleted(mrg, metric)) { - mrg->index[partition].stats.delete_having_retention_or_referenced++; - mrg_index_write_unlock(mrg, partition); - return false; - } + Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); + if (unlikely(!PValue)) { + mrg_index_read_unlock(mrg, partition); + MRG_STATS_SEARCH_MISS(mrg, partition); + return NULL; + } - Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); - if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } + METRIC *metric = *PValue; - mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); - int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); - mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); + if(metric && !metric_acquire(mrg, metric)) + metric = NULL; - if(unlikely(!rc)) { - MRG_STATS_DELETE_MISS(mrg, partition); - mrg_index_write_unlock(mrg, partition); - return false; - } + mrg_index_read_unlock(mrg, partition); - if(!*sections_judy_pptr) { - rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); - if(unlikely(!rc)) - fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); - mrg_stats_size_judyhs_removed_uuid(mrg, partition); + if(metric) { + MRG_STATS_SEARCH_HIT(mrg, partition); + return metric; + } } - - MRG_STATS_DELETED_METRIC(mrg, partition); - - mrg_index_write_unlock(mrg, partition); - - aral_freez(mrg->index[partition].aral, metric); - - return true; } // ---------------------------------------------------------------------------- @@ -359,7 +409,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section } inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { - return acquired_metric_del(mrg, metric); + return metric_release(mrg, metric, true); } inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { @@ -367,8 +417,8 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { return metric; } -inline bool mrg_metric_release(MRG *mrg, METRIC *metric) { - return metric_release_and_can_be_deleted(mrg, metric); +inline void mrg_metric_release(MRG *mrg, METRIC *metric) { + metric_release(mrg, metric, false); } inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { @@ -394,8 +444,8 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, return 
true; } -inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { - internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0, +inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) { + internal_fatal(first_time_s < 0 || last_time_s < 0, "DBENGINE METRIC: timestamp is negative"); internal_fatal(first_time_s > max_acceptable_collected_time(), "DBENGINE METRIC: metric first time is in the future"); @@ -425,13 +475,14 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri return mrg_metric_get_first_time_s_smart(mrg, metric); } -inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { +inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) { time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); *last_time_s = MAX(clean, hot); *first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric); - *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); + if (update_every_s) + *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { @@ -498,8 +549,8 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr } } while(do_again); - time_t first, last, ue; - mrg_metric_get_retention(mrg, metric, &first, &last, &ue); + time_t first, last; + mrg_metric_get_retention(mrg, metric, &first, &last, NULL); return (first && last && first < last); } @@ -517,6 +568,11 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me return false; } +inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) { + time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); + return clean; +} + inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED); time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED); @@ -524,25 +580,21 @@ inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metr return MAX(clean, hot); } -inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - +inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) { if(update_every_s > 0) return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true); return false; } -inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { - internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); - +inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) { if(update_every_s > 0) return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0); return false; } -inline time_t 
mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED); } @@ -589,7 +641,7 @@ inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { inline void mrg_update_metric_retention_and_granularity_by_uuid( MRG *mrg, Word_t section, uuid_t *uuid, time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s) + uint32_t update_every_s, time_t now_s) { if(unlikely(last_time_s > now_s)) { nd_log_limit_static_global_var(erl, 1, 0); @@ -626,14 +678,35 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid( .section = section, .first_time_s = first_time_s, .last_time_s = last_time_s, - .latest_update_every_s = (uint32_t) update_every_s + .latest_update_every_s = update_every_s }; metric = mrg_metric_add_and_acquire(mrg, entry, &added); } - if (likely(!added)) + struct rrdengine_instance *ctx = (struct rrdengine_instance *) section; + if (likely(!added)) { + uint64_t old_samples = 0; + + if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean) + old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s; + mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s); + uint64_t new_samples = 0; + if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean) + new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s; + + __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED); + } + else { + // Newly added + if (update_every_s) { + uint64_t samples = (last_time_s - first_time_s) / update_every_s; + __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED); + } + __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED); + } + mrg_metric_release(mrg, metric); } diff --git a/database/engine/metric.h b/src/database/engine/metric.h index dbb949301..3bace9057 100644 --- a/database/engine/metric.h +++ b/src/database/engine/metric.h @@ -52,7 +52,7 @@ MRG *mrg_create(ssize_t partitions); void mrg_destroy(MRG *mrg); METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric); -bool mrg_metric_release(MRG *mrg, METRIC *metric); +void mrg_metric_release(MRG *mrg, METRIC *metric); METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret); METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section); @@ -69,13 +69,14 @@ time_t mrg_metric_get_first_time_s(MRG *mrg, METRIC *metric); bool mrg_metric_set_clean_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); bool mrg_metric_set_hot_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s); time_t mrg_metric_get_latest_time_s(MRG *mrg, METRIC *metric); +time_t mrg_metric_get_latest_clean_time_s(MRG *mrg, METRIC *metric); -bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, time_t update_every_s); -bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s); -time_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric); +bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, uint32_t update_every_s); +bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, uint32_t update_every_s); +uint32_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric); -void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s); 
-void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s); +void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s); +void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s); bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric); bool mrg_metric_set_writer(MRG *mrg, METRIC *metric); @@ -89,6 +90,6 @@ size_t mrg_aral_overhead(void); void mrg_update_metric_retention_and_granularity_by_uuid( MRG *mrg, Word_t section, uuid_t *uuid, time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s); + uint32_t update_every_s, time_t now_s); #endif // DBENGINE_METRIC_H diff --git a/database/engine/page.c b/src/database/engine/page.c index b7a393483..13fe90f7f 100644 --- a/database/engine/page.c +++ b/src/database/engine/page.c @@ -111,9 +111,9 @@ void pgd_init_arals(void) // FIXME: add stats pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create( buf, - GORILLA_BUFFER_SIZE, + RRDENG_GORILLA_32BIT_BUFFER_SIZE, 64, - 512 * GORILLA_BUFFER_SIZE, + 512 * RRDENG_GORILLA_32BIT_BUFFER_SIZE, pgc_aral_statistics(), NULL, NULL, false, false); } @@ -165,8 +165,8 @@ PGD *pgd_create(uint8_t type, uint32_t slots) pg->states = PGD_STATE_CREATED_FROM_COLLECTOR; switch (type) { - case PAGE_METRICS: - case PAGE_TIER: { + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: { uint32_t size = slots * page_type_size[type]; internal_fatal(!size || slots == 1, @@ -176,11 +176,11 @@ PGD *pgd_create(uint8_t type, uint32_t slots) pg->raw.data = pgd_data_aral_alloc(size); break; } - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { internal_fatal(slots == 1, "DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type); - pg->slots = 8 * GORILLA_BUFFER_SLOTS; + pg->slots = 8 * RRDENG_GORILLA_32BIT_BUFFER_SLOTS; // allocate new gorilla writer pg->gorilla.aral_index = gettid() % 4; @@ -188,16 +188,19 @@ PGD *pgd_create(uint8_t type, uint32_t slots) // allocate new gorilla buffer gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]); - memset(gbuf, 0, GORILLA_BUFFER_SIZE); + memset(gbuf, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE); global_statistics_gorilla_buffer_add_hot(); - *pg->gorilla.writer = gorilla_writer_init(gbuf, GORILLA_BUFFER_SLOTS); + *pg->gorilla.writer = gorilla_writer_init(gbuf, RRDENG_GORILLA_32BIT_BUFFER_SLOTS); pg->gorilla.num_buffers = 1; break; } default: - fatal("Unknown page type: %uc", type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type); + aral_freez(pgd_alloc_globals.aral_pgd, pg); + pg = PGD_EMPTY; + break; } return pg; @@ -219,8 +222,8 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size) switch (type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: pg->raw.size = size; pg->used = size / page_type_size[type]; pg->slots = pg->used; @@ -228,10 +231,11 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size) pg->raw.data = pgd_data_aral_alloc(size); memcpy(pg->raw.data, base, size); break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: internal_fatal(size == 0, "Asked to create page with 0 data!!!"); internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size"); - internal_fatal(size % GORILLA_BUFFER_SIZE, 
"Expected size to be a multiple of %zu-bytes", GORILLA_BUFFER_SIZE); + internal_fatal(size % RRDENG_GORILLA_32BIT_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes", + RRDENG_GORILLA_32BIT_BUFFER_SIZE); pg->raw.data = mallocz(size); pg->raw.size = size; @@ -246,7 +250,10 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size) pg->slots = pg->used; break; default: - fatal("Unknown page type: %uc", type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type); + aral_freez(pgd_alloc_globals.aral_pgd, pg); + pg = PGD_EMPTY; + break; } return pg; @@ -262,11 +269,11 @@ void pgd_free(PGD *pg) switch (pg->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: pgd_data_aral_free(pg->raw.data, pg->raw.size); break; - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { if (pg->states & PGD_STATE_CREATED_FROM_DISK) { internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data"); @@ -306,7 +313,8 @@ void pgd_free(PGD *pg) break; } default: - fatal("Unknown page type: %uc", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); + break; } aral_freez(pgd_alloc_globals.aral_pgd, pg); @@ -358,20 +366,21 @@ uint32_t pgd_memory_footprint(PGD *pg) size_t footprint = 0; switch (pg->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: footprint = sizeof(PGD) + pg->raw.size; break; - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { if (pg->states & PGD_STATE_CREATED_FROM_DISK) footprint = sizeof(PGD) + pg->raw.size; else - footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE); + footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE); break; } default: - fatal("Unknown page type: %uc", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); + break; } return footprint; @@ -385,15 +394,15 @@ uint32_t pgd_disk_footprint(PGD *pg) size_t size = 0; switch (pg->type) { - case PAGE_METRICS: - case PAGE_TIER: { + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: { uint32_t used_size = pg->used * page_type_size[pg->type]; internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size"); size = used_size; break; } - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR || pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING || pg->states & PGD_STATE_FLUSHED_TO_DISK) @@ -404,7 +413,7 @@ uint32_t pgd_disk_footprint(PGD *pg) internal_fatal(pg->gorilla.num_buffers == 0, "Gorilla writer does not have any buffers"); - size = pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE; + size = pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE; if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) { global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer)); @@ -419,7 +428,8 @@ uint32_t pgd_disk_footprint(PGD *pg) break; } default: - fatal("Unknown page type: %uc", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); + break; } internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK, @@ -434,11 +444,11 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size) pgd_disk_footprint(pg), dst_size); switch (pg->type) { - case PAGE_METRICS: - case PAGE_TIER: + 
case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: memcpy(dst, pg->raw.data, dst_size); break; - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0) fatal("Copying to extent is supported only for PGDs that are scheduled for flushing."); @@ -456,7 +466,8 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size) break; } default: - fatal("Unknown page type: %uc", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); + break; } pg->states = PGD_STATE_FLUSHED_TO_DISK; @@ -490,7 +501,7 @@ void pgd_append_point(PGD *pg, fatal("Data collection on page already scheduled for flushing"); switch (pg->type) { - case PAGE_METRICS: { + case RRDENG_PAGE_TYPE_ARRAY_32BIT: { storage_number *tier0_metric_data = (storage_number *)pg->raw.data; storage_number t = pack_storage_number(n, flags); tier0_metric_data[pg->used++] = t; @@ -500,7 +511,7 @@ void pgd_append_point(PGD *pg, break; } - case PAGE_TIER: { + case RRDENG_PAGE_TYPE_ARRAY_TIER1: { storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data; storage_number_tier1_t t; t.sum_value = (float) n; @@ -515,7 +526,7 @@ void pgd_append_point(PGD *pg, break; } - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { pg->used++; storage_number t = pack_storage_number(n, flags); @@ -525,9 +536,9 @@ void pgd_append_point(PGD *pg, bool ok = gorilla_writer_write(pg->gorilla.writer, t); if (!ok) { gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]); - memset(new_buffer, 0, GORILLA_BUFFER_SIZE); + memset(new_buffer, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE); - gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, GORILLA_BUFFER_SLOTS); + gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, RRDENG_GORILLA_32BIT_BUFFER_SLOTS); pg->gorilla.num_buffers += 1; global_statistics_gorilla_buffer_add_hot(); @@ -537,7 +548,7 @@ void pgd_append_point(PGD *pg, break; } default: - fatal("DBENGINE: unknown page type id %d", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); break; } } @@ -550,11 +561,11 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position) PGD *pg = pgdc->pgd; switch (pg->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: pgdc->slots = pgdc->pgd->used; break; - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { if (pg->states & PGD_STATE_CREATED_FROM_DISK) { pgdc->slots = pgdc->pgd->slots; pgdc->gr = gorilla_reader_init((void *) pg->raw.data); @@ -588,7 +599,7 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position) break; } default: - fatal("DBENGINE: unknown page type id %d", pg->type); + netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type); break; } } @@ -612,7 +623,7 @@ void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position) pgdc_seek(pgdc, position); } -bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp) +bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position __maybe_unused, STORAGE_POINT *sp) { if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots) { @@ -624,7 +635,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT * switch (pgdc->pgd->type) { - case PAGE_METRICS: { + case RRDENG_PAGE_TYPE_ARRAY_32BIT: { storage_number *array = (storage_number *) pgdc->pgd->raw.data; 
storage_number n = array[pgdc->position++]; @@ -635,7 +646,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT * return true; } - case PAGE_TIER: { + case RRDENG_PAGE_TYPE_ARRAY_TIER1: { storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data; storage_number_tier1_t n = array[pgdc->position++]; @@ -648,7 +659,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT * return true; } - case PAGE_GORILLA_METRICS: { + case RRDENG_PAGE_TYPE_GORILLA_32BIT: { pgdc->position++; uint32_t n = 666666666; @@ -668,7 +679,8 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT * static bool logged = false; if (!logged) { - netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", pgd_type(pgdc->pgd)); + netdata_log_error("DBENGINE: unknown page type %"PRIu32" found. Cannot decode it. Ignoring its metrics.", + pgd_type(pgdc->pgd)); logged = true; } diff --git a/database/engine/page.h b/src/database/engine/page.h index 32c87c580..32c87c580 100644 --- a/database/engine/page.h +++ b/src/database/engine/page.h diff --git a/database/engine/page_test.cc b/src/database/engine/page_test.cc index d61299bc4..d61299bc4 100644 --- a/database/engine/page_test.cc +++ b/src/database/engine/page_test.cc diff --git a/database/engine/page_test.h b/src/database/engine/page_test.h index 30837f0ab..30837f0ab 100644 --- a/database/engine/page_test.h +++ b/src/database/engine/page_test.h diff --git a/database/engine/pagecache.c b/src/database/engine/pagecache.c index dab9cdd0d..452fdc50b 100644 --- a/database/engine/pagecache.c +++ b/src/database/engine/pagecache.c @@ -222,7 +222,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin Word_t metric_id = mrg_metric_id(main_mrg, metric); time_t now_s = wanted_start_time_s; - time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric); + uint32_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric); if(!dt_s) dt_s = default_rrd_update_every; @@ -246,7 +246,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin time_t page_start_time_s = pgc_page_start_time_s(page); time_t page_end_time_s = pgc_page_end_time_s(page); - time_t page_update_every_s = pgc_page_update_every_s(page); + uint32_t page_update_every_s = pgc_page_update_every_s(page); if(!page_update_every_s) page_update_every_s = dt_s; @@ -282,7 +282,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin pd->metric_id = metric_id; pd->first_time_s = page_start_time_s; pd->last_time_s = page_end_time_s; - pd->update_every_s = (uint32_t) page_update_every_s; + pd->update_every_s = page_update_every_s; pd->page = (open_cache_mode) ? 
NULL : page; pd->status |= tags; @@ -332,8 +332,8 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) { - time_t db_first_time_s, db_last_time_s, db_update_every_s; - mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + time_t db_first_time_s, db_last_time_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL); if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE) return; @@ -547,7 +547,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR if(prc == PAGE_IS_IN_THE_FUTURE) break; - time_t page_update_every_s = page_entry_in_journal->update_every_s; + uint32_t page_update_every_s = page_entry_in_journal->update_every_s; size_t page_length = page_entry_in_journal->page_length; if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item @@ -567,7 +567,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR .metric_id = metric_id, .start_time_s = page_first_time_s, .end_time_s = page_last_time_s, - .update_every_s = (uint32_t) page_update_every_s, + .update_every_s = page_update_every_s, .data = datafile, .size = 0, .custom_data = (uint8_t *) &ei, @@ -845,7 +845,7 @@ struct pgc_page *pg_cache_lookup_next( struct rrdengine_instance *ctx, PDC *pdc, time_t now_s, - time_t last_update_every_s, + uint32_t last_update_every_s, size_t *entries ) { if (unlikely(!pdc)) @@ -905,7 +905,7 @@ struct pgc_page *pg_cache_lookup_next( time_t page_start_time_s = pgc_page_start_time_s(page); time_t page_end_time_s = pgc_page_end_time_s(page); - time_t page_update_every_s = pgc_page_update_every_s(page); + uint32_t page_update_every_s = pgc_page_update_every_s(page); if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED); @@ -918,7 +918,7 @@ struct pgc_page *pg_cache_lookup_next( if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED); page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s); - pd->update_every_s = (uint32_t) page_update_every_s; + pd->update_every_s = page_update_every_s; } size_t entries_by_size = pgd_slots_used(pgc_page_data(page)); @@ -983,7 +983,7 @@ struct pgc_page *pg_cache_lookup_next( return page; } -void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s, +void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) { if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item @@ -1003,7 +1003,7 @@ void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s .metric_id = metric_id, .start_time_s = start_time_s, .end_time_s = end_time_s, - .update_every_s = (uint32_t) update_every_s, + .update_every_s = update_every_s, .size = 0, .data = datafile, .custom_data = (uint8_t *) &ext_io_data, diff --git a/database/engine/pagecache.h b/src/database/engine/pagecache.h index dbcbea53a..103d36484 
100644 --- a/database/engine/pagecache.h +++ b/src/database/engine/pagecache.h @@ -14,8 +14,6 @@ extern struct pgc *extent_cache; struct rrdengine_instance; #define INVALID_TIME (0) -#define MAX_PAGE_CACHE_FETCH_RETRIES (3) -#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3) extern struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats; @@ -54,9 +52,9 @@ struct page_details_control; void rrdeng_prep_wait(struct page_details_control *pdc); void rrdeng_prep_query(struct page_details_control *pdc, bool worker); void pg_cache_preload(struct rrdeng_query_handle *handle); -struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries); +struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, uint32_t last_update_every_s, size_t *entries); void pgc_and_mrg_initialize(void); -void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length); +void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length); #endif /* NETDATA_PAGECACHE_H */ diff --git a/database/engine/pdc.c b/src/database/engine/pdc.c index 5fe205e64..79a424b77 100644 --- a/database/engine/pdc.c +++ b/src/database/engine/pdc.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "pdc.h" +#include "dbengine-compression.h" struct extent_page_details_list { uv_file file; @@ -628,24 +629,25 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) { buffer_strcat(wb, "STEP_UNALIGNED"); } -inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) { +inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) { time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC); - time_t end_time_s; - size_t entries; + time_t end_time_s = 0; + size_t entries = 0; switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: end_time_s = descr->end_time_ut / USEC_PER_SEC; entries = 0; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: end_time_s = start_time_s + descr->gorilla.delta_time_s; entries = descr->gorilla.entries; break; default: - fatal("Unknown page type: %uc\n", descr->type); + // Nothing to do. Validate page will notify the user. 
+ break; } return validate_page( @@ -666,29 +668,30 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( uuid_t *uuid, time_t start_time_s, time_t end_time_s, - time_t update_every_s, // can be zero, if unknown + uint32_t update_every_s, // can be zero, if unknown size_t page_length, uint8_t page_type, size_t entries, // can be zero, if unknown time_t now_s, // can be zero, to disable future timestamp check - time_t overwrite_zero_update_every_s, // can be zero, if unknown + uint32_t overwrite_zero_update_every_s, // can be zero, if unknown bool have_read_error, const char *msg, - RRDENG_COLLECT_PAGE_FLAGS flags) { - + RRDENG_COLLECT_PAGE_FLAGS flags) +{ VALIDATED_PAGE_DESCRIPTOR vd = { .start_time_s = start_time_s, .end_time_s = end_time_s, .update_every_s = update_every_s, .page_length = page_length, + .point_size = page_type_size[page_type], .type = page_type, .is_valid = true, }; - vd.point_size = page_type_size[vd.type]; + bool known_page_type = true; switch (page_type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: // always calculate entries by size vd.entries = page_entries_by_size(vd.page_length, vd.point_size); @@ -696,13 +699,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( if(!entries) entries = vd.entries; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: internal_fatal(entries == 0, "0 number of entries found on gorilla page"); vd.entries = entries; break; default: - // TODO: should set vd.is_valid false instead? - fatal("Unknown page type: %uc", page_type); + known_page_type = false; + break; } // allow to be called without update every (when loading pages from disk) @@ -723,16 +726,16 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( // If gorilla can not compress the data we might end up needing slightly more // than 4KiB. However, gorilla pages extend the page length by increments of // 512 bytes. - max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE); + max_page_length += ((page_type == RRDENG_PAGE_TYPE_GORILLA_32BIT) * RRDENG_GORILLA_32BIT_BUFFER_SIZE); - if( have_read_error || + if (!known_page_type || + have_read_error || vd.page_length == 0 || vd.page_length > max_page_length || vd.start_time_s > vd.end_time_s || (now_s && vd.end_time_s > now_s) || vd.start_time_s <= 0 || vd.end_time_s <= 0 || - vd.update_every_s < 0 || (vd.start_time_s == vd.end_time_s && vd.entries > 1) || (vd.update_every_s == 0 && vd.entries > 1)) { @@ -791,13 +794,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s invalid page of type %u " - "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)", + "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)", uuid_str, msg, vd.type, vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):"" ); } else { - const char *err_valid = (vd.is_valid) ? "" : "found invalid, "; + const char *err_valid = ""; const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, "; const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, "; const char *err_update = (vd.update_every_s == update_every_s) ? 
"" : "update every updated, "; @@ -811,9 +814,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s page of type %u " - "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), " + "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), " "found inconsistent - the right is " - "from %ld to %ld, update every %ld, page length %zu, entries %zu: " + "from %ld to %ld, update every %u, page length %zu, entries %zu: " "%s%s%s%s%s%s%s", uuid_str, msg, vd.type, start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"", @@ -871,11 +874,11 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if (descr) { start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC); switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s); break; } @@ -938,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data( PDC_PAGE_STATUS tags, bool cached_extent) { - int ret; unsigned i, count; void *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0; @@ -973,18 +975,17 @@ static bool epdl_populate_pages_from_extent_data( if( !can_use_data || count < 1 || count > MAX_PAGES_PER_EXTENT || - (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) || + !dbengine_valid_compression_algorithm(header->compression_algorithm) || (payload_length != trailer_offset - payload_offset) || (data_length != payload_offset + payload_length + sizeof(*trailer)) - ) { + ) { epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID"); return false; } crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer)); - ret = crc32cmp(trailer->checksum, crc); - if (unlikely(ret)) { + if (unlikely(crc32cmp(trailer->checksum, crc))) { ctx_io_error(ctx); have_read_error = true; epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED"); @@ -993,14 +994,15 @@ static bool epdl_populate_pages_from_extent_data( if(worker) worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION); - if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) { + if (likely(!have_read_error && RRDENG_COMPRESSION_NONE != header->compression_algorithm)) { // find the uncompressed extent size uncompressed_payload_length = 0; for (i = 0; i < count; ++i) { size_t page_length = header->descr[i].page_length; - if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS || - (header->descr[i].type == PAGE_GORILLA_METRICS && - (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) { + if (page_length > RRDENG_BLOCK_SIZE && + (header->descr[i].type != RRDENG_PAGE_TYPE_GORILLA_32BIT || + (header->descr[i].type == RRDENG_PAGE_TYPE_GORILLA_32BIT && + (page_length - RRDENG_BLOCK_SIZE) % RRDENG_GORILLA_32BIT_BUFFER_SIZE))) { have_read_error = true; break; } @@ -1015,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data( eb = extent_buffer_get(uncompressed_payload_length); uncompressed_buf = eb->data; - ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf, - (int) payload_length, (int) 
uncompressed_payload_length); + size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset, + uncompressed_payload_length, payload_length, + header->compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED); + if(!bytes) + have_read_error = true; + else { + __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED); + } } } @@ -1075,7 +1082,7 @@ static bool epdl_populate_pages_from_extent_data( stats_load_invalid_page++; } else { - if (RRD_NO_COMPRESSION == header->compression_algorithm) { + if (RRDENG_COMPRESSION_NONE == header->compression_algorithm) { pgd = pgd_create_from_disk_data(header->descr[i].type, data + payload_offset + page_offset, vd.page_length); diff --git a/database/engine/pdc.h b/src/database/engine/pdc.h index 9bae39ade..9bae39ade 100644 --- a/database/engine/pdc.h +++ b/src/database/engine/pdc.h diff --git a/database/engine/rrddiskprotocol.h b/src/database/engine/rrddiskprotocol.h index 86b41f0b3..dc1a4c980 100644 --- a/database/engine/rrddiskprotocol.h +++ b/src/database/engine/rrddiskprotocol.h @@ -19,13 +19,16 @@ #define UUID_SZ (16) #define CHECKSUM_SZ (4) /* CRC32 */ -#define RRD_NO_COMPRESSION (0) -#define RRD_LZ4 (1) +#define RRDENG_COMPRESSION_NONE (0) +#define RRDENG_COMPRESSION_LZ4 (1) +#define RRDENG_COMPRESSION_ZSTD (2) #define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t))) + /* * Data file persistent super-block */ + struct rrdeng_df_sb { char magic_number[RRDENG_MAGIC_SZ]; char version[RRDENG_VER_SZ]; @@ -36,10 +39,11 @@ struct rrdeng_df_sb { /* * Page types */ -#define PAGE_METRICS (0) -#define PAGE_TIER (1) -#define PAGE_GORILLA_METRICS (2) -#define PAGE_TYPE_MAX 2 // Maximum page type (inclusive) + +#define RRDENG_PAGE_TYPE_ARRAY_32BIT (0) +#define RRDENG_PAGE_TYPE_ARRAY_TIER1 (1) +#define RRDENG_PAGE_TYPE_GORILLA_32BIT (2) +#define RRDENG_PAGE_TYPE_MAX (2) // Maximum page type (inclusive) /* * Data file page descriptor diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index b82cc1ad1..7b2137436 100644 --- a/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -3,6 +3,7 @@ #include "rrdengine.h" #include "pdc.h" +#include "dbengine-compression.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; @@ -229,7 +230,7 @@ static void after_work_standard_callback(uv_work_t* req, int status) { worker_is_idle(); } -static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) { +static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) { struct rrdeng_work *work_request = NULL; internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread"); @@ -240,8 +241,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com work_request->ctx = ctx; work_request->data = data; work_request->completion = completion; - work_request->work_cb = work_cb; - work_request->after_work_cb = after_work_cb; + work_request->work_cb = do_work_cb; + work_request->after_work_cb = do_after_work_cb; 
work_request->opcode = opcode; if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) { @@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ */ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { int ret; - int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; - uint32_t uncompressed_payload_length, payload_offset; + uint32_t uncompressed_payload_length, max_compressed_size, payload_offset; struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *xt_io_descr; - struct extent_buffer *eb = NULL; - void *compressed_buf = NULL; Word_t Index; uint8_t compression_algorithm = ctx->config.global_compress_alg; struct rrdengine_datafile *datafile; @@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta xt_io_descr = extent_io_descriptor_get(); xt_io_descr->ctx = ctx; payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); - switch (compression_algorithm) { - case RRD_NO_COMPRESSION: - size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); - break; - - default: /* Compress */ - fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); - max_compressed_size = LZ4_compressBound(uncompressed_payload_length); - eb = extent_buffer_get(max_compressed_size); - compressed_buf = eb->data; - size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); - break; - } - + max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm); + size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer); ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("DBENGINE: posix_memalign:%s", strerror(ret)); @@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos = 0; header = xt_io_descr->buf; - header->compression_algorithm = compression_algorithm; header->number_of_pages = count; pos += sizeof(*header); @@ -844,11 +829,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta header->descr[i].start_time_ut = descr->start_time_ut; switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: header->descr[i].end_time_ut = descr->end_time_ut; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); break; @@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos += sizeof(header->descr[i]); } + + // build the extent payload for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } - if(likely(compression_algorithm == RRD_LZ4)) { - compressed_size = LZ4_compress_default( - xt_io_descr->buf + payload_offset, - compressed_buf, - (int)uncompressed_payload_length, - max_compressed_size); + // compress the payload + size_t compressed_size = + 
(int)dbengine_compress(xt_io_descr->buf + payload_offset, + uncompressed_payload_length, + compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed"); + internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent"); - (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - extent_buffer_release(eb); - size_bytes = payload_offset + compressed_size + sizeof(*trailer); + if(compressed_size) { + header->compression_algorithm = compression_algorithm; header->payload_length = compressed_size; } - else { // RRD_NO_COMPRESSION - header->payload_length = uncompressed_payload_length; + else { + // compression failed, or generated bigger pages + // so it didn't touch our uncompressed buffer + header->compression_algorithm = RRDENG_COMPRESSION_NONE; + header->payload_length = compressed_size = uncompressed_payload_length; + } + + // set the correct size + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + + if(compression_algorithm != RRDENG_COMPRESSION_NONE) { + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); } real_io_size = ALIGN_BYTES_CEILING(size_bytes); @@ -1171,7 +1167,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r for (size_t index = 0; index < added; ++index) { uuid_first_t_entry = &uuid_first_entry_list[index]; if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) { - mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + + time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + + bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + if (changed) { + uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) { + uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + } mrg_metric_release(main_mrg, uuid_first_t_entry->metric); } else { @@ -1180,6 +1186,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r // there is no retention for this metric bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric); if (!has_retention) { + time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric); + time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && first_time_s && last_time_s) { + uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric); if(deleted) deleted_metrics++; diff --git 
a/database/engine/rrdengine.h b/src/database/engine/rrdengine.h index cd3352f12..3047e0c6a 100644 --- a/database/engine/rrdengine.h +++ b/src/database/engine/rrdengine.h @@ -153,9 +153,9 @@ struct jv2_metrics_info { struct jv2_page_info { time_t start_time_s; time_t end_time_s; - time_t update_every_s; - size_t page_length; + uint32_t update_every_s; uint32_t extent_index; + size_t page_length; void *custom_data; // private @@ -217,7 +217,7 @@ struct rrdeng_query_handle { // internal data time_t now_s; - time_t dt_s; + uint32_t dt_s; unsigned position; unsigned entries; @@ -387,6 +387,8 @@ struct rrdengine_instance { unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file) time_t first_time_s; + uint64_t metrics; + uint64_t samples; } atomic; struct { @@ -482,7 +484,7 @@ struct page_descr_with_data *page_descriptor_get(void); typedef struct validated_page_descriptor { time_t start_time_s; time_t end_time_s; - time_t update_every_s; + uint32_t update_every_s; size_t page_length; size_t point_size; size_t entries; @@ -499,16 +501,16 @@ typedef struct validated_page_descriptor { VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid, time_t start_time_s, time_t end_time_s, - time_t update_every_s, + uint32_t update_every_s, size_t page_length, uint8_t page_type, size_t entries, time_t now_s, - time_t overwrite_zero_update_every_s, + uint32_t overwrite_zero_update_every_s, bool have_read_error, const char *msg, RRDENG_COLLECT_PAGE_FLAGS flags); -VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error); +VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error); void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags); typedef enum { diff --git a/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c index 1ddce5243..43fed492b 100755 --- a/database/engine/rrdengineapi.c +++ b/src/database/engine/rrdengineapi.c @@ -2,6 +2,7 @@ #include "database/engine/rrddiskprotocol.h" #include "rrdengine.h" +#include "dbengine-compression.h" /* Default global database instance */ struct rrdengine_instance multidb_ctx_storage_tier0; @@ -16,7 +17,12 @@ struct rrdengine_instance multidb_ctx_storage_tier4; #error RRD_STORAGE_TIERS is not 5 - you need to add allocations here #endif struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS]; -uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER}; +uint8_t tier_page_type[RRD_STORAGE_TIERS] = { + RRDENG_PAGE_TYPE_GORILLA_32BIT, + RRDENG_PAGE_TYPE_ARRAY_TIER1, + RRDENG_PAGE_TYPE_ARRAY_TIER1, + RRDENG_PAGE_TYPE_ARRAY_TIER1, + RRDENG_PAGE_TYPE_ARRAY_TIER1}; #if defined(ENV32BIT) size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192}; @@ -24,14 +30,14 @@ size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192}; size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384}; #endif -#if PAGE_TYPE_MAX != 2 +#if RRDENG_PAGE_TYPE_MAX != 2 #error PAGE_TYPE_MAX is not 2 - you need to add allocations here #endif size_t page_type_size[256] = { - [PAGE_METRICS] = sizeof(storage_number), - [PAGE_TIER] = sizeof(storage_number_tier1_t), - [PAGE_GORILLA_METRICS] = sizeof(storage_number) + [RRDENG_PAGE_TYPE_ARRAY_32BIT] = sizeof(storage_number), + [RRDENG_PAGE_TYPE_ARRAY_TIER1] = 
sizeof(storage_number_tier1_t), + [RRDENG_PAGE_TYPE_GORILLA_32BIT] = sizeof(storage_number) }; __attribute__((constructor)) void initialize_multidb_ctx(void) { @@ -74,14 +80,14 @@ static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) { } // charts call this -STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) { +STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si __maybe_unused, uuid_t *uuid __maybe_unused) { struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment)); rrdeng_page_alignment_acquire(pa); return (STORAGE_METRICS_GROUP *)pa; } // charts call this -void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) { +void rrdeng_metrics_group_release(STORAGE_INSTANCE *si __maybe_unused, STORAGE_METRICS_GROUP *smg) { if(unlikely(!smg)) return; struct pg_alignment *pa = (struct pg_alignment *)smg; @@ -108,8 +114,8 @@ void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_ memcpy(ret_uuid, hash_value, sizeof(uuid_t)); } -static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *si, const char *rd_id, const char *st_id) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; uuid_t legacy_uuid; rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid); return mrg_metric_get_and_acquire(main_mrg, &legacy_uuid, (Word_t) ctx); @@ -118,25 +124,25 @@ static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const cha // ---------------------------------------------------------------------------- // metric handle -void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) { - METRIC *metric = (METRIC *)db_metric_handle; +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh) { + METRIC *metric = (METRIC *)smh; mrg_metric_release(main_mrg, metric); } -STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) { - METRIC *metric = (METRIC *)db_metric_handle; +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh) { + METRIC *metric = (METRIC *)smh; return (STORAGE_METRIC_HANDLE *) mrg_metric_dup(main_mrg, metric); } -STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; return (STORAGE_METRIC_HANDLE *) mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx); } -static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) { - internal_fatal(!db_instance, "DBENGINE: db_instance is NULL"); +static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *si, uuid_t *uuid) { + internal_fatal(!si, "DBENGINE: STORAGE_INSTANCE is NULL"); - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; MRG_ENTRY entry = { .uuid = uuid, .section = (Word_t)ctx, @@ -145,12 +151,15 @@ static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) .latest_update_every_s = 0, }; - METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, NULL); + bool added; + METRIC *metric = 
mrg_metric_add_and_acquire(main_mrg, entry, &added); + if (added) + __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED); return metric; } -STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; METRIC *metric; metric = mrg_metric_get_and_acquire(main_mrg, &rd->metric_uuid, (Word_t) ctx); @@ -160,13 +169,13 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE // this is a single host database // generate uuid from the chart and dimensions ids // and overwrite the one supplied by rrddim - metric = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset)); + metric = rrdeng_metric_get_legacy(si, rrddim_id(rd), rrdset_id(rd->rrdset)); if (metric) uuid_copy(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric)); } if(likely(!metric)) - metric = rrdeng_metric_create(db_instance, &rd->metric_uuid); + metric = rrdeng_metric_create(si, &rd->metric_uuid); } #ifdef NETDATA_INTERNAL_CHECKS @@ -192,14 +201,14 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE // collect ops static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle *handle) { - if(unlikely((time_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) { - internal_error(true, "DBENGINE: collection handle has update every %ld, but the metric registry has %ld. Fixing it.", - (time_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric)); + if(unlikely((uint32_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) { + internal_error(true, "DBENGINE: collection handle has update every %u, but the metric registry has %u. Fixing it.", + (uint32_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric)); if(unlikely(!handle->update_every_ut)) handle->update_every_ut = (usec_t)mrg_metric_get_update_every_s(main_mrg, handle->metric) * USEC_PER_SEC; else - mrg_metric_set_update_every(main_mrg, handle->metric, (time_t)(handle->update_every_ut / USEC_PER_SEC)); + mrg_metric_set_update_every(main_mrg, handle->metric, (uint32_t)(handle->update_every_ut / USEC_PER_SEC)); } } @@ -213,7 +222,7 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric); time_t start_time_s = pgc_page_start_time_s(handle->pgc_page); time_t end_time_s = pgc_page_end_time_s(handle->pgc_page); - time_t update_every_s = pgc_page_update_every_s(handle->pgc_page); + uint32_t update_every_s = pgc_page_update_every_s(handle->pgc_page); size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx); size_t entries = handle->page_position; time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC); @@ -245,8 +254,8 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle * Gets a handle for storing metrics to the database. * The handle must be released with rrdeng_store_metric_final(). 
*/ -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { - METRIC *metric = (METRIC *)db_metric_handle; +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg) { + METRIC *metric = (METRIC *)smh; struct rrdengine_instance *ctx = mrg_metric_ctx(metric); bool is_1st_metric_writer = true; @@ -262,7 +271,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri struct rrdeng_collect_handle *handle; handle = callocz(1, sizeof(struct rrdeng_collect_handle)); - handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE; + handle->common.seb = STORAGE_ENGINE_BACKEND_DBENGINE; handle->metric = metric; handle->pgc_page = NULL; @@ -288,15 +297,15 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri // data collection may be able to go back in time and during the addition of new pages // clean pages may be found matching ours! - time_t db_first_time_s, db_last_time_s, db_update_every_s; - mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + time_t db_first_time_s, db_last_time_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL); handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC; return (STORAGE_COLLECT_HANDLE *)handle; } -void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch; if (unlikely(!handle->pgc_page)) return; @@ -307,7 +316,17 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h else { check_completed_page_consistency(handle); mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->pgc_page)); - pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page); + + struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); + time_t start_time_s = pgc_page_start_time_s(handle->pgc_page); + time_t end_time_s = pgc_page_end_time_s(handle->pgc_page); + uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, handle->metric); + if (end_time_s && start_time_s && end_time_s > start_time_s && update_every_s) { + uint64_t add_samples = (end_time_s - start_time_s) / update_every_s; + __atomic_add_fetch(&ctx->atomic.samples, add_samples, __ATOMIC_RELAXED); + } + + pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page, false); } mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0); @@ -336,7 +355,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha PGD *data, size_t data_size) { time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC); - const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC); + const uint32_t update_every_s = (uint32_t)(handle->update_every_ut / USEC_PER_SEC); PGC_ENTRY page_entry = { .section = (Word_t) ctx, @@ -345,7 +364,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha .end_time_s = point_in_time_s, .size = data_size, .data = data, - .update_every_s = (uint32_t) update_every_s, + .update_every_s = update_every_s, .hot = true }; @@ -364,11 +383,11 @@ static void rrdeng_store_metric_create_new_page(struct 
rrdeng_collect_handle *ha nd_log_limit_static_global_var(erl, 1, 0); nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING, #endif - "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache " - "with existing %s%s page from %ld to %ld, update every %ld - " + "DBENGINE: metric '%s' new page from %ld to %ld, update every %u, has a conflict in main cache " + "with existing %s%s page from %ld to %ld, update every %u - " "is it collected more than once?", uuid, - page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s, + page_entry.start_time_s, page_entry.end_time_s, page_entry.update_every_s, pgc_is_page_hot(pgc_page) ? "hot" : "not-hot", pgc_page_data(pgc_page) == PGD_EMPTY ? " gap" : "", pgc_page_start_time_s(pgc_page), pgc_page_end_time_s(pgc_page), pgc_page_update_every_s(pgc_page) @@ -444,14 +463,14 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz *data_size = size; switch (ctx->config.page_type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: d = pgd_create(ctx->config.page_type, slots); break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: // ignore slots, and use the fixed number of slots per gorilla buffer. // gorilla will automatically add more buffers if needed. - d = pgd_create(ctx->config.page_type, GORILLA_BUFFER_SLOTS); + d = pgd_create(ctx->config.page_type, RRDENG_GORILLA_32BIT_BUFFER_SLOTS); break; default: fatal("Unknown page type: %uc\n", ctx->config.page_type); @@ -461,7 +480,7 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz return d; } -static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle, +static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *sch, const usec_t point_in_time_ut, const NETDATA_DOUBLE n, const NETDATA_DOUBLE min_value, @@ -470,7 +489,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ const uint16_t anomaly_count, const SN_FLAGS flags) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); if(unlikely(!handle->page_data)) @@ -497,7 +516,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_ if(unlikely(++handle->page_position >= handle->page_entries_max)) { internal_fatal(handle->page_position > handle->page_entries_max, "DBENGINE: exceeded page max number of points"); handle->page_flags |= RRDENG_PAGE_FULL; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); } } @@ -543,7 +562,7 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m #endif } -void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch, const usec_t point_in_time_ut, const NETDATA_DOUBLE n, const NETDATA_DOUBLE min_value, @@ -554,7 +573,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, { timing_step(TIMING_STEP_RRDSET_STORE_METRIC); - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch; #ifdef NETDATA_INTERNAL_CHECKS if(unlikely(point_in_time_ut > (usec_t)max_acceptable_collected_time() * 
USEC_PER_SEC)) @@ -571,11 +590,11 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if(handle->pgc_page) { if (unlikely(delta_ut < handle->update_every_ut)) { handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); } else if (unlikely(delta_ut % handle->update_every_ut)) { handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); } else { size_t points_gap = delta_ut / handle->update_every_ut; @@ -583,7 +602,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, if (points_gap >= page_remaining_points) { handle->page_flags |= RRDENG_PAGE_BIG_GAP; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); } else { // loop to fill the gap @@ -594,7 +613,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, this_ut <= stop_ut; this_ut = handle->page_end_time_ut + handle->update_every_ut) { rrdeng_store_metric_append_point( - collection_handle, + sch, this_ut, NAN, NAN, NAN, 1, 0, @@ -618,7 +637,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK); - rrdeng_store_metric_append_point(collection_handle, + rrdeng_store_metric_append_point(sch, point_in_time_ut, n, min_value, max_value, count, anomaly_count, @@ -629,12 +648,12 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, * Releases the database reference from the handle for storing metrics. * Returns 1 if it's safe to delete the dimension. */ -int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); handle->page_flags |= RRDENG_PAGE_COLLECT_FINALIZE; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); rrdeng_page_alignment_release(handle->alignment); __atomic_sub_fetch(&ctx->atomic.collectors_running, 1, __ATOMIC_RELAXED); @@ -644,8 +663,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_clear_writer(main_mrg, handle->metric)) internal_fatal(true, "DBENGINE: metric is already released"); - time_t first_time_s, last_time_s, update_every_s; - mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s); + time_t first_time_s, last_time_s; + mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, NULL); mrg_metric_release(main_mrg, handle->metric); freez(handle); @@ -656,8 +675,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) { return 0; } -void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) { - struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle; +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every) { + struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch; check_and_fix_mrg_update_every(handle); METRIC *metric = 
handle->metric; @@ -667,7 +686,7 @@ void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *col return; handle->page_flags |= RRDENG_PAGE_UPDATE_EVERY_CHANGE; - rrdeng_store_metric_flush_current_page(collection_handle); + rrdeng_store_metric_flush_current_page(sch); mrg_metric_set_update_every(main_mrg, metric, update_every); handle->update_every_ut = update_every_ut; } @@ -704,8 +723,8 @@ static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_u * Gets a handle for loading metrics from the database. * The handle must be released with rrdeng_load_metric_final(). */ -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, - struct storage_engine_query_handle *rrddim_handle, +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh, + struct storage_engine_query_handle *seqh, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority) @@ -714,7 +733,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, netdata_thread_disable_cancelability(); - METRIC *metric = (METRIC *)db_metric_handle; + METRIC *metric = (METRIC *)smh; struct rrdengine_instance *ctx = mrg_metric_ctx(metric); struct rrdeng_query_handle *handle; @@ -736,7 +755,8 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, // is inserted into the main cache, to avoid scanning the journals // again for pages matching the gap. - time_t db_first_time_s, db_last_time_s, db_update_every_s; + time_t db_first_time_s, db_last_time_s; + uint32_t db_update_every_s; mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) { @@ -750,11 +770,11 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every); } - rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; - rrddim_handle->start_time_s = handle->start_time_s; - rrddim_handle->end_time_s = handle->end_time_s; - rrddim_handle->priority = priority; - rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE; + seqh->handle = (STORAGE_QUERY_HANDLE *) handle; + seqh->start_time_s = handle->start_time_s; + seqh->end_time_s = handle->end_time_s; + seqh->priority = priority; + seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE; pg_cache_preload(handle); @@ -766,16 +786,16 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, handle->now_s = start_time_s; handle->dt_s = db_update_every_s; - rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle; - rrddim_handle->start_time_s = handle->start_time_s; - rrddim_handle->end_time_s = 0; - rrddim_handle->priority = priority; - rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE; + seqh->handle = (STORAGE_QUERY_HANDLE *) handle; + seqh->start_time_s = handle->start_time_s; + seqh->end_time_s = 0; + seqh->priority = priority; + seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE; } } -static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; +static bool rrdeng_load_page_next(struct storage_engine_query_handle *seqh, bool debug_this __maybe_unused) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle; struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric); if (likely(handle->page)) { @@ -785,7 +805,7 @@ static bool 
rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han pgdc_reset(&handle->pgdc, NULL, UINT32_MAX); } - if (unlikely(handle->now_s > rrddim_handle->end_time_s)) + if (unlikely(handle->now_s > seqh->end_time_s)) return false; size_t entries = 0; @@ -799,7 +819,7 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han time_t page_start_time_s = pgc_page_start_time_s(handle->page); time_t page_end_time_s = pgc_page_end_time_s(handle->page); - time_t page_update_every_s = pgc_page_update_every_s(handle->page); + uint32_t page_update_every_s = pgc_page_update_every_s(handle->page); unsigned position; if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) { @@ -810,13 +830,13 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han } else { position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s); - time_t point_end_time_s = page_start_time_s + position * page_update_every_s; + time_t point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s; while(point_end_time_s < handle->now_s && position + 1 < entries) { // https://github.com/netdata/netdata/issues/14411 // we really need a while() here, because the delta may be // 2 points at higher tiers position++; - point_end_time_s = page_start_time_s + position * page_update_every_s; + point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s; } handle->now_s = point_end_time_s; } @@ -845,11 +865,11 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han // Returns the metric and sets its timestamp into current_time // IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags) // IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES -STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle; STORAGE_POINT sp; - if (unlikely(handle->now_s > rrddim_handle->end_time_s)) { + if (unlikely(handle->now_s > seqh->end_time_s)) { storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s); goto prepare_for_next_iteration; } @@ -857,8 +877,8 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim if (unlikely(!handle->page || handle->position >= handle->entries)) { // We need to get a new page - if (!rrdeng_load_page_next(rrddim_handle, false)) { - handle->now_s = rrddim_handle->end_time_s; + if (!rrdeng_load_page_next(seqh, false)) { + handle->now_s = seqh->end_time_s; storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s); goto prepare_for_next_iteration; } @@ -870,7 +890,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim pgdc_get_next_point(&handle->pgdc, handle->position, &sp); prepare_for_next_iteration: - internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query"); + internal_fatal(sp.end_time_s < seqh->start_time_s, "DBENGINE: this point is too old for this query"); internal_fatal(sp.end_time_s < handle->now_s, "DBENGINE: this point is too old for this point in time"); handle->now_s += handle->dt_s; @@ -879,17 +899,17 @@ prepare_for_next_iteration: return sp; } -int 
rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; - return (handle->now_s > rrddim_handle->end_time_s); +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle; + return (handle->now_s > seqh->end_time_s); } /* * Releases the database reference from the handle for loading metrics. */ -void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle) +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle; if (handle->page) { pgc_page_release(main_cache, handle->page); @@ -901,24 +921,24 @@ void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_hand unregister_query_handle(handle); rrdeng_query_handle_release(handle); - rrddim_handle->handle = NULL; + seqh->handle = NULL; netdata_thread_enable_cancelability(); } -time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle) { - struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle; +time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh) { + struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle; if(handle->pdc) { rrdeng_prep_wait(handle->pdc); - if (handle->pdc->optimal_end_time_s > rrddim_handle->end_time_s) - rrddim_handle->end_time_s = handle->pdc->optimal_end_time_s; + if (handle->pdc->optimal_end_time_s > seqh->end_time_s) + seqh->end_time_s = handle->pdc->optimal_end_time_s; } - return rrddim_handle->end_time_s; + return seqh->end_time_s; } -time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - METRIC *metric = (METRIC *)db_metric_handle; +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh) { + METRIC *metric = (METRIC *)smh; time_t latest_time_s = 0; if (metric) @@ -927,8 +947,8 @@ time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { return latest_time_s; } -time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { - METRIC *metric = (METRIC *)db_metric_handle; +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh) { + METRIC *metric = (METRIC *)smh; time_t oldest_time_s = 0; if (metric) @@ -937,9 +957,9 @@ time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) { return oldest_time_s; } -bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s) +bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; if (unlikely(!ctx)) { netdata_log_error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); return false; @@ -949,26 +969,35 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_ if (unlikely(!metric)) return false; - time_t update_every_s; - mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s); + mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, NULL); mrg_metric_release(main_mrg, 
metric); return true; } -uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; return ctx->config.max_disk_space; } -uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; return __atomic_load_n(&ctx->atomic.current_disk_space, __ATOMIC_RELAXED); } -time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +uint64_t rrdeng_metrics(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; + return __atomic_load_n(&ctx->atomic.metrics, __ATOMIC_RELAXED); +} + +uint64_t rrdeng_samples(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; + return __atomic_load_n(&ctx->atomic.samples, __ATOMIC_RELAXED); +} + +time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; time_t t = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED); if(t == LONG_MAX || t < 0) @@ -977,8 +1006,8 @@ time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) { return t; } -size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED); } @@ -1099,8 +1128,8 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) { netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier); } -bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) { - struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; +bool rrdeng_is_legacy(STORAGE_INSTANCE *si) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)si; return ctx->config.legacy; } @@ -1142,7 +1171,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path, ctx->config.tier = (int)tier; ctx->config.page_type = tier_page_type[tier]; - ctx->config.global_compress_alg = RRD_LZ4; + ctx->config.global_compress_alg = dbengine_default_compression(); if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB) disk_space_mb = RRDENG_MIN_DISK_SPACE_MB; ctx->config.max_disk_space = disk_space_mb * 1048576LLU; @@ -1154,6 +1183,8 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path, rw_spinlock_init(&ctx->njfv2idx.spinlock); ctx->atomic.first_time_s = LONG_MAX; + ctx->atomic.metrics = 0; + ctx->atomic.samples = 0; if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) { // success - we run this ctx too diff --git a/database/engine/rrdengineapi.h b/src/database/engine/rrdengineapi.h index 7ae0e7079..fb449cd9b 100644 --- a/database/engine/rrdengineapi.h +++ b/src/database/engine/rrdengineapi.h @@ -26,32 +26,32 @@ extern uint8_t tier_page_type[]; void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid); -STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance); 
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); -void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle); -STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle); - -STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg); -void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle); -void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every); -void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n, +STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si); +STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid); +void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh); +STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh); + +STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg); +void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch); +void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every); +void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch, usec_t point_in_time_ut, NETDATA_DOUBLE n, NETDATA_DOUBLE min_value, NETDATA_DOUBLE max_value, uint16_t count, uint16_t anomaly_count, SN_FLAGS flags); -int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle); +int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch); -void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle, +void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh, struct storage_engine_query_handle *seqh, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority); -STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle); +STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh); -int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle); -void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle); -time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle); -time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle); -time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle); +int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh); +void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh); +time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh); +time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh); +time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh); void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array); @@ -64,10 +64,10 @@ void rrdeng_exit_mode(struct rrdengine_instance *ctx); int rrdeng_exit(struct rrdengine_instance *ctx); void rrdeng_prepare_exit(struct rrdengine_instance *ctx); -bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s); +bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s); -extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid); 
-extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si, uuid_t *uuid);
+extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *si, STORAGE_METRICS_GROUP *smg);
 
 typedef struct rrdengine_size_statistics {
     size_t default_granularity_secs;
@@ -221,9 +221,6 @@ struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void);
 RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
 size_t rrdeng_collectors_running(struct rrdengine_instance *ctx);
 
-bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance);
-
-uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance);
-uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance);
+bool rrdeng_is_legacy(STORAGE_INSTANCE *si);
 
 #endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/src/database/engine/rrdenginelib.c
index dc581d98d..dc581d98d 100644
--- a/database/engine/rrdenginelib.c
+++ b/src/database/engine/rrdenginelib.c
diff --git a/database/engine/rrdenginelib.h b/src/database/engine/rrdenginelib.h
index a0febd4f4..a0febd4f4 100644
--- a/database/engine/rrdenginelib.h
+++ b/src/database/engine/rrdenginelib.h
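
Illustrative note, not part of the diff above: the rrdengineapi.c hunks introduce two per-tier counters, ctx->atomic.metrics (incremented when mrg_metric_add_and_acquire() reports a newly added registry entry) and ctx->atomic.samples (incremented on page flush by (end_time_s - start_time_s) / update_every_s), exposed through the new rrdeng_metrics() and rrdeng_samples() accessors. The minimal sketch below reproduces that accounting in isolation; the tier_counters struct, the two helper functions and main() are stand-ins invented for this example, not Netdata APIs.

#include <stdbool.h>
#include <stdint.h>
#include <stdatomic.h>
#include <time.h>

struct tier_counters {
    _Atomic uint64_t metrics;   /* unique metrics added to this tier */
    _Atomic uint64_t samples;   /* estimated points written to this tier */
};

/* Mirrors the accounting added in rrdeng_metric_create(): only a newly
 * inserted registry entry increments the metrics counter. */
static void count_metric_if_added(struct tier_counters *tc, bool added) {
    if (added)
        atomic_fetch_add_explicit(&tc->metrics, 1, memory_order_relaxed);
}

/* Mirrors the accounting added in rrdeng_store_metric_flush_current_page():
 * when a hot page is flushed, the number of samples it covered is estimated
 * from its time span and the metric's update interval. */
static void count_page_samples(struct tier_counters *tc,
                               time_t start_time_s, time_t end_time_s,
                               uint32_t update_every_s) {
    if (start_time_s && end_time_s && end_time_s > start_time_s && update_every_s) {
        uint64_t add_samples = (uint64_t)(end_time_s - start_time_s) / update_every_s;
        atomic_fetch_add_explicit(&tc->samples, add_samples, memory_order_relaxed);
    }
}

int main(void) {
    struct tier_counters tc = { 0, 0 };

    count_metric_if_added(&tc, true);          /* first collection of a metric    */
    count_page_samples(&tc, 1000, 2000, 1);    /* a 1000 s page at 1 s resolution */

    /* in the real engine, readers would go through the new
     * rrdeng_metrics()/rrdeng_samples() accessors; here we just
     * load the counters directly */
    return atomic_load_explicit(&tc.samples, memory_order_relaxed) == 1000 ? 0 : 1;
}

The relaxed atomic increments match the __ATOMIC_RELAXED ordering used in the diff: the counters are monotonic statistics read independently of any other state, so no stronger ordering is needed.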