path: root/src/database/engine
author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 12:08:03 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 12:08:18 +0000
commit     5da14042f70711ea5cf66e034699730335462f66 (patch)
tree       0f6354ccac934ed87a2d555f45be4c831cf92f4a  /src/database/engine
parent     Releasing debian version 1.44.3-2. (diff)
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--  src/database/engine/README.md (renamed from database/engine/README.md)                      |   0
-rw-r--r--  src/database/engine/cache.c (renamed from database/engine/cache.c)                          |  93
-rw-r--r--  src/database/engine/cache.h (renamed from database/engine/cache.h)                          |   9
-rw-r--r--  src/database/engine/datafile.c (renamed from database/engine/datafile.c)                    |   4
-rw-r--r--  src/database/engine/datafile.h (renamed from database/engine/datafile.h)                    |   0
-rw-r--r--  src/database/engine/dbengine-compression.c                                                  | 159
-rw-r--r--  src/database/engine/dbengine-compression.h                                                  |  15
-rw-r--r--  src/database/engine/dbengine-diagram.xml (renamed from database/engine/dbengine-diagram.xml)|   0
-rw-r--r--  src/database/engine/dbengine-stresstest.c                                                   | 456
-rw-r--r--  src/database/engine/dbengine-unittest.c                                                     | 419
-rw-r--r--  src/database/engine/journalfile.c (renamed from database/engine/journalfile.c)              |  29
-rw-r--r--  src/database/engine/journalfile.h (renamed from database/engine/journalfile.h)              |   1
-rw-r--r--  src/database/engine/metric.c (renamed from database/engine/metric.c)                        | 331
-rw-r--r--  src/database/engine/metric.h (renamed from database/engine/metric.h)                        |  15
-rw-r--r--  src/database/engine/page.c (renamed from database/engine/page.c)                            | 106
-rw-r--r--  src/database/engine/page.h (renamed from database/engine/page.h)                            |   0
-rw-r--r--  src/database/engine/page_test.cc (renamed from database/engine/page_test.cc)                |   0
-rw-r--r--  src/database/engine/page_test.h (renamed from database/engine/page_test.h)                  |   0
-rw-r--r--  src/database/engine/pagecache.c (renamed from database/engine/pagecache.c)                  |  24
-rw-r--r--  src/database/engine/pagecache.h (renamed from database/engine/pagecache.h)                  |   6
-rw-r--r--  src/database/engine/pdc.c (renamed from database/engine/pdc.c)                              |  89
-rw-r--r--  src/database/engine/pdc.h (renamed from database/engine/pdc.h)                              |   0
-rw-r--r--  src/database/engine/rrddiskprotocol.h (renamed from database/engine/rrddiskprotocol.h)      |  16
-rw-r--r--  src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c)                  |  92
-rw-r--r--  src/database/engine/rrdengine.h (renamed from database/engine/rrdengine.h)                  |  16
-rwxr-xr-x  src/database/engine/rrdengineapi.c (renamed from database/engine/rrdengineapi.c)            | 269
-rw-r--r--  src/database/engine/rrdengineapi.h (renamed from database/engine/rrdengineapi.h)            |  45
-rw-r--r--  src/database/engine/rrdenginelib.c (renamed from database/engine/rrdenginelib.c)            |   0
-rw-r--r--  src/database/engine/rrdenginelib.h (renamed from database/engine/rrdenginelib.h)            |   0
29 files changed, 1705 insertions, 489 deletions
diff --git a/database/engine/README.md b/src/database/engine/README.md
index 890018642..890018642 100644
--- a/database/engine/README.md
+++ b/src/database/engine/README.md
diff --git a/database/engine/cache.c b/src/database/engine/cache.c
index eb1c35298..49a9b6b96 100644
--- a/database/engine/cache.c
+++ b/src/database/engine/cache.c
@@ -1325,21 +1325,8 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
return page;
}
-static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
- __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
-
- size_t *stats_hit_ptr, *stats_miss_ptr;
-
- if(method == PGC_SEARCH_CLOSEST) {
- __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED);
- stats_hit_ptr = &cache->stats.searches_closest_hits;
- stats_miss_ptr = &cache->stats.searches_closest_misses;
- }
- else {
- __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED);
- stats_hit_ptr = &cache->stats.searches_exact_hits;
- stats_miss_ptr = &cache->stats.searches_exact_misses;
- }
+static PGC_PAGE *page_find_and_acquire_once(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method, bool *retry) {
+ *retry = false;
PGC_PAGE *page = NULL;
size_t partition = pgc_indexing_partition(cache, metric_id);
@@ -1462,22 +1449,13 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric
if(!page_acquire(cache, page)) {
// this page is not good to use
+ *retry = true;
page = NULL;
}
}
cleanup:
pgc_index_read_unlock(cache, partition);
-
- if(page) {
- __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED);
- page_has_been_accessed(cache, page);
- }
- else
- __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED);
-
- __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
-
return page;
}
@@ -1882,7 +1860,7 @@ void pgc_page_release(PGC *cache, PGC_PAGE *page) {
page_release(cache, page, is_page_clean(page));
}
-void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) {
+void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush) {
__atomic_add_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
//#ifdef NETDATA_INTERNAL_CHECKS
@@ -1901,10 +1879,8 @@ void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) {
__atomic_sub_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
// flush, if we have to
- if((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)) {
- flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL,
- false, false);
- }
+ if(!never_flush && ((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)))
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false);
}
bool pgc_page_to_clean_evict_or_release(PGC *cache, PGC_PAGE *page) {
@@ -1949,13 +1925,13 @@ time_t pgc_page_end_time_s(PGC_PAGE *page) {
return page->end_time_s;
}
-time_t pgc_page_update_every_s(PGC_PAGE *page) {
+uint32_t pgc_page_update_every_s(PGC_PAGE *page) {
return page->update_every_s;
}
-time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s) {
+uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s) {
if(page->update_every_s == 0)
- page->update_every_s = (uint32_t) update_every_s;
+ page->update_every_s = update_every_s;
return page->update_every_s;
}
@@ -2050,7 +2026,46 @@ void pgc_page_hot_set_end_time_s(PGC *cache __maybe_unused, PGC_PAGE *page, time
}
PGC_PAGE *pgc_page_get_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
- return page_find_and_acquire(cache, section, metric_id, start_time_s, method);
+ static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
+
+ PGC_PAGE *page = NULL;
+
+ __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ size_t *stats_hit_ptr, *stats_miss_ptr;
+
+ if(method == PGC_SEARCH_CLOSEST) {
+ __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_closest_hits;
+ stats_miss_ptr = &cache->stats.searches_closest_misses;
+ }
+ else {
+ __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_exact_hits;
+ stats_miss_ptr = &cache->stats.searches_exact_misses;
+ }
+
+ while(1) {
+ bool retry = false;
+
+ page = page_find_and_acquire_once(cache, section, metric_id, start_time_s, method, &retry);
+
+ if(page || !retry)
+ break;
+
+ nanosleep(&ns, NULL);
+ }
+
+ if(page) {
+ __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED);
+ page_has_been_accessed(cache, page);
+ }
+ else
+ __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ return page;
}
struct pgc_statistics pgc_get_statistics(PGC *cache) {
@@ -2224,7 +2239,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
while ((PValue2 = JudyLFirstThenNext(mi->JudyL_pages_by_start_time, &start_time, &start_time_first))) {
struct jv2_page_info *pi = *PValue2;
page_transition_unlock(cache, pi->page);
- pgc_page_hot_to_dirty_and_release(cache, pi->page);
+ pgc_page_hot_to_dirty_and_release(cache, pi->page, true);
// make_acquired_page_clean_and_evict_or_page_release(cache, pi->page);
aral_freez(ar_pi, pi);
}
@@ -2251,6 +2266,8 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
aral_by_size_release(ar_mi);
__atomic_sub_fetch(&cache->stats.workers_jv2_flush, 1, __ATOMIC_RELAXED);
+
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false);
}
static bool match_page_data(PGC_PAGE *page, void *data) {
@@ -2396,7 +2413,7 @@ void *unittest_stress_test_collector(void *ptr) {
if(i % 10 == 0)
pgc_page_to_clean_evict_or_release(pgc_uts.cache, pgc_uts.metrics[i]);
else
- pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i]);
+ pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i], false);
}
}
@@ -2721,7 +2738,7 @@ int pgc_unittest(void) {
}, NULL);
pgc_page_hot_set_end_time_s(cache, page2, 2001);
- pgc_page_hot_to_dirty_and_release(cache, page2);
+ pgc_page_hot_to_dirty_and_release(cache, page2, false);
PGC_PAGE *page3 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){
.section = 3,
@@ -2734,7 +2751,7 @@ int pgc_unittest(void) {
}, NULL);
pgc_page_hot_set_end_time_s(cache, page3, 2001);
- pgc_page_hot_to_dirty_and_release(cache, page3);
+ pgc_page_hot_to_dirty_and_release(cache, page3, false);
pgc_destroy(cache);
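
The cache.c hunks above split the old page_find_and_acquire() into a single-pass page_find_and_acquire_once() plus a retry loop in pgc_page_get_and_acquire(), so that a page found but not acquirable (being evicted concurrently) is retried instead of reported as a miss. A minimal sketch of that pattern, not part of this patch and using generic stand-in names rather than netdata APIs:

#include <stdbool.h>
#include <stddef.h>
#include <time.h>

typedef struct page PAGE;

static PAGE *find_and_acquire(PAGE *(*find_once)(void *index, bool *retry), void *index) {
    static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
    PAGE *page = NULL;

    while (1) {
        bool retry = false;
        page = find_once(index, &retry);

        if (page || !retry)
            break;              // acquired it, or a definitive miss

        nanosleep(&ns, NULL);   // the page is in flux; back off briefly and search again
    }

    return page;
}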
diff --git a/database/engine/cache.h b/src/database/engine/cache.h
index 7cd7c0636..b6f81bcc2 100644
--- a/database/engine/cache.h
+++ b/src/database/engine/cache.h
@@ -2,6 +2,7 @@
#ifndef DBENGINE_CACHE_H
#define DBENGINE_CACHE_H
+#include "datafile.h"
#include "../rrd.h"
// CACHE COMPILE TIME CONFIGURATION
@@ -27,7 +28,7 @@ typedef struct pgc_entry {
time_t end_time_s; // the end time of the page
size_t size; // the size in bytes of the allocation, outside the cache
void *data; // a pointer to data outside the cache
- uint32_t update_every_s; // the update every of the page
+ uint32_t update_every_s; // the update every of the page
bool hot; // true if this entry is currently being collected
uint8_t *custom_data;
} PGC_ENTRY;
@@ -191,7 +192,7 @@ PGC_PAGE *pgc_page_dup(PGC *cache, PGC_PAGE *page);
void pgc_page_release(PGC *cache, PGC_PAGE *page);
// mark a hot page dirty, and release it
-void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page);
+void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush);
// find a page from the cache
typedef enum {
@@ -210,8 +211,8 @@ Word_t pgc_page_section(PGC_PAGE *page);
Word_t pgc_page_metric(PGC_PAGE *page);
time_t pgc_page_start_time_s(PGC_PAGE *page);
time_t pgc_page_end_time_s(PGC_PAGE *page);
-time_t pgc_page_update_every_s(PGC_PAGE *page);
-time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s);
+uint32_t pgc_page_update_every_s(PGC_PAGE *page);
+uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s);
time_t pgc_page_fix_end_time_s(PGC_PAGE *page, time_t end_time_s);
void *pgc_page_data(PGC_PAGE *page);
void *pgc_page_custom_data(PGC *cache, PGC_PAGE *page);
diff --git a/database/engine/datafile.c b/src/database/engine/datafile.c
index 7322039cd..1ec2dea79 100644
--- a/database/engine/datafile.c
+++ b/src/database/engine/datafile.c
@@ -557,7 +557,9 @@ void finalize_data_files(struct rrdengine_instance *ctx)
{
bool logged = false;
- logged = false;
+ if (!ctx->datafiles.first)
+ return;
+
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
if(!logged) {
netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
diff --git a/database/engine/datafile.h b/src/database/engine/datafile.h
index 569f1b0a2..569f1b0a2 100644
--- a/database/engine/datafile.h
+++ b/src/database/engine/datafile.h
diff --git a/src/database/engine/dbengine-compression.c b/src/database/engine/dbengine-compression.c
new file mode 100644
index 000000000..46ef2b075
--- /dev/null
+++ b/src/database/engine/dbengine-compression.c
@@ -0,0 +1,159 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "rrdengine.h"
+#include "dbengine-compression.h"
+
+#ifdef ENABLE_LZ4
+#include <lz4.h>
+#endif
+
+#ifdef ENABLE_ZSTD
+#include <zstd.h>
+#define DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL 3
+#endif
+
+uint8_t dbengine_default_compression(void) {
+
+#ifdef ENABLE_LZ4
+ return RRDENG_COMPRESSION_LZ4;
+#endif
+
+ return RRDENG_COMPRESSION_NONE;
+}
+
+bool dbengine_valid_compression_algorithm(uint8_t algorithm) {
+ switch(algorithm) {
+ case RRDENG_COMPRESSION_NONE:
+
+#ifdef ENABLE_LZ4
+ case RRDENG_COMPRESSION_LZ4:
+#endif
+
+#ifdef ENABLE_ZSTD
+ case RRDENG_COMPRESSION_ZSTD:
+#endif
+
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm) {
+ switch(algorithm) {
+#ifdef ENABLE_LZ4
+ case RRDENG_COMPRESSION_LZ4:
+ fatal_assert(uncompressed_size < LZ4_MAX_INPUT_SIZE);
+ return LZ4_compressBound((int)uncompressed_size);
+#endif
+
+#ifdef ENABLE_ZSTD
+ case RRDENG_COMPRESSION_ZSTD:
+ return ZSTD_compressBound(uncompressed_size);
+#endif
+
+ case RRDENG_COMPRESSION_NONE:
+ return uncompressed_size;
+
+ default:
+ fatal("DBENGINE: unknown compression algorithm %u", algorithm);
+ }
+}
+
+size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm) {
+ // the result should be stored in the payload
+ // the caller must have called dbengine_max_compressed_size() to make sure the
+ // payload is big enough to fit the max size needed.
+
+ switch(algorithm) {
+#ifdef ENABLE_LZ4
+ case RRDENG_COMPRESSION_LZ4: {
+ size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm);
+ struct extent_buffer *eb = extent_buffer_get(max_compressed_size);
+ void *compressed_buf = eb->data;
+
+ size_t compressed_size =
+ LZ4_compress_default(payload, compressed_buf, (int)uncompressed_size, (int)max_compressed_size);
+
+ if(compressed_size > 0 && compressed_size < uncompressed_size)
+ memcpy(payload, compressed_buf, compressed_size);
+ else
+ compressed_size = 0;
+
+ extent_buffer_release(eb);
+ return compressed_size;
+ }
+#endif
+
+#ifdef ENABLE_ZSTD
+ case RRDENG_COMPRESSION_ZSTD: {
+ size_t max_compressed_size = dbengine_max_compressed_size(uncompressed_size, algorithm);
+ struct extent_buffer *eb = extent_buffer_get(max_compressed_size);
+ void *compressed_buf = eb->data;
+
+ size_t compressed_size = ZSTD_compress(compressed_buf, max_compressed_size, payload, uncompressed_size,
+ DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL);
+
+ if (ZSTD_isError(compressed_size)) {
+ internal_fatal(true, "DBENGINE: ZSTD compression error %s", ZSTD_getErrorName(compressed_size));
+ compressed_size = 0;
+ }
+
+ if(compressed_size > 0 && compressed_size < uncompressed_size)
+ memcpy(payload, compressed_buf, compressed_size);
+ else
+ compressed_size = 0;
+
+ extent_buffer_release(eb);
+ return compressed_size;
+ }
+#endif
+
+ case RRDENG_COMPRESSION_NONE:
+ return 0;
+
+ default:
+ fatal("DBENGINE: unknown compression algorithm %u", algorithm);
+ }
+}
+
+size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm) {
+ switch(algorithm) {
+
+#ifdef ENABLE_LZ4
+ case RRDENG_COMPRESSION_LZ4: {
+ int rc = LZ4_decompress_safe(src, dst, (int)src_size, (int)dst_size);
+ if(rc < 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: LZ4 decompression error %d", rc);
+ rc = 0;
+ }
+
+ return rc;
+ }
+#endif
+
+#ifdef ENABLE_ZSTD
+ case RRDENG_COMPRESSION_ZSTD: {
+ size_t decompressed_size = ZSTD_decompress(dst, dst_size, src, src_size);
+
+ if (ZSTD_isError(decompressed_size)) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %s",
+ ZSTD_getErrorName(decompressed_size));
+
+ decompressed_size = 0;
+ }
+
+ return decompressed_size;
+ }
+#endif
+
+ case RRDENG_COMPRESSION_NONE:
+ internal_fatal(true, "DBENGINE: %s() should not be called for uncompressed pages", __FUNCTION__ );
+ return 0;
+
+ default:
+ internal_fatal(true, "DBENGINE: unknown compression algorithm %u", algorithm);
+ return 0;
+ }
+}
diff --git a/src/database/engine/dbengine-compression.h b/src/database/engine/dbengine-compression.h
new file mode 100644
index 000000000..8dd97f5d7
--- /dev/null
+++ b/src/database/engine/dbengine-compression.h
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_DBENGINE_COMPRESSION_H
+#define NETDATA_DBENGINE_COMPRESSION_H
+
+uint8_t dbengine_default_compression(void);
+
+bool dbengine_valid_compression_algorithm(uint8_t algorithm);
+
+size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm);
+size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm);
+
+size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm);
+
+#endif //NETDATA_DBENGINE_COMPRESSION_H
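
A minimal usage sketch, not part of this patch, for the helpers declared above. The wrapper names and buffers are illustrative; the return-value conventions follow the implementation in dbengine-compression.c (0 means "not compressed" or "failed"), and RRDENG_COMPRESSION_NONE comes from the dbengine page format headers.

#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include "dbengine-compression.h"

// write path: compress a page payload in place before it is written to an extent
static size_t payload_to_disk(void *payload, size_t uncompressed_size) {
    uint8_t algorithm = dbengine_default_compression();

    // the payload buffer is expected to be sized for the worst case, so the
    // caller consults dbengine_max_compressed_size() when allocating it
    size_t compressed_size = dbengine_compress(payload, uncompressed_size, algorithm);

    // 0 means the data stays uncompressed (algorithm NONE, or no size gain)
    return compressed_size ? compressed_size : uncompressed_size;
}

// read path: expand a payload read back from an extent
static size_t payload_from_disk(void *dst, size_t dst_size, void *src, size_t src_size, uint8_t algorithm) {
    if (algorithm == RRDENG_COMPRESSION_NONE) {
        memcpy(dst, src, src_size);
        return src_size;
    }

    // returns the number of decompressed bytes, or 0 on error
    return dbengine_decompress(dst, src, dst_size, src_size, algorithm);
}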
diff --git a/database/engine/dbengine-diagram.xml b/src/database/engine/dbengine-diagram.xml
index 793e8a355..793e8a355 100644
--- a/database/engine/dbengine-diagram.xml
+++ b/src/database/engine/dbengine-diagram.xml
diff --git a/src/database/engine/dbengine-stresstest.c b/src/database/engine/dbengine-stresstest.c
new file mode 100644
index 000000000..86d09c4ab
--- /dev/null
+++ b/src/database/engine/dbengine-stresstest.c
@@ -0,0 +1,456 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "../../daemon/common.h"
+
+#ifdef ENABLE_DBENGINE
+
+static RRDHOST *dbengine_rrdhost_find_or_create(char *name) {
+ /* We don't want to drop metrics when generating load,
+ * we prefer to block data generation itself */
+
+ return rrdhost_find_or_create(
+ name,
+ name,
+ name,
+ os_type,
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ RRD_MEMORY_MODE_DBENGINE,
+ health_plugin_enabled(),
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ NULL,
+ 0
+ );
+}
+
+static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number value, time_t now) {
+ rd->collector.last_collected_time.tv_sec = now;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.collected_value = value;
+ rrddim_set_updated(rd);
+
+ rd->collector.counter++;
+
+ collected_number v = (value >= 0) ? value : -value;
+ if(unlikely(v > rd->collector.collected_value_max)) rd->collector.collected_value_max = v;
+}
+
+struct dbengine_chart_thread {
+ uv_thread_t thread;
+ RRDHOST *host;
+ char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
+ unsigned dset_charts; /* number of charts */
+ unsigned dset_dims; /* dimensions per chart */
+ unsigned chart_i; /* current chart offset */
+ time_t time_present; /* current virtual time of the benchmark */
+ volatile time_t time_max; /* latest timestamp of stored values */
+ unsigned history_seconds; /* how far back in the past to go */
+
+ volatile long done; /* initialize to 0, set to 1 to stop thread */
+ struct completion charts_initialized;
+ unsigned long errors, stored_metrics_nr; /* statistics */
+
+ RRDSET *st;
+ RRDDIM *rd[]; /* dset_dims elements */
+};
+
+collected_number generate_dbengine_chart_value(int chart_i, int dim_i, time_t time_current)
+{
+ collected_number value;
+
+ value = ((collected_number)time_current) * (chart_i + 1);
+ value += ((collected_number)time_current) * (dim_i + 1);
+ value %= 1024LLU;
+
+ return value;
+}
+
+static void generate_dbengine_chart(void *arg)
+{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+ struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg;
+ RRDHOST *host = thread_info->host;
+ char *chartname = thread_info->chartname;
+ const unsigned DSET_DIMS = thread_info->dset_dims;
+ unsigned history_seconds = thread_info->history_seconds;
+ time_t time_present = thread_info->time_present;
+
+ unsigned j, update_every = 1;
+ RRDSET *st;
+ RRDDIM *rd[DSET_DIMS];
+ char name[RRD_ID_LENGTH_MAX + 1];
+ time_t time_current;
+
+ // create the chart
+ snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%u", thread_info->chart_i + 1);
+ thread_info->st = st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname,
+ chartname, NULL, 1, update_every, RRDSET_TYPE_LINE);
+ for (j = 0 ; j < DSET_DIMS ; ++j) {
+ snprintfz(name, RRD_ID_LENGTH_MAX, "%s%u", chartname, j + 1);
+
+ thread_info->rd[j] = rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+ completion_mark_complete(&thread_info->charts_initialized);
+
+ // feed it with the test data
+ time_current = time_present - history_seconds;
+ for (j = 0 ; j < DSET_DIMS ; ++j) {
+ rd[j]->collector.last_collected_time.tv_sec =
+ st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current - update_every;
+ rd[j]->collector.last_collected_time.tv_usec =
+ st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0;
+ }
+ for( ; !thread_info->done && time_current < time_present ; time_current += update_every) {
+ st->usec_since_last_update = USEC_PER_SEC * update_every;
+
+ for (j = 0; j < DSET_DIMS; ++j) {
+ collected_number value;
+
+ value = generate_dbengine_chart_value(thread_info->chart_i, j, time_current);
+ rrddim_set_by_pointer_fake_time(rd[j], value, time_current);
+ ++thread_info->stored_metrics_nr;
+ }
+ rrdset_done(st);
+ thread_info->time_max = time_current;
+ }
+ for (j = 0; j < DSET_DIMS; ++j) {
+ rrdeng_store_metric_finalize((rd[j])->tiers[0].sch);
+ }
+}
+
+void generate_dbengine_dataset(unsigned history_seconds)
+{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+ const int DSET_CHARTS = 16;
+ const int DSET_DIMS = 128;
+ const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
+ RRDHOST *host = NULL;
+ struct dbengine_chart_thread **thread_info;
+ int i;
+ time_t time_present;
+
+ default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+ default_rrdeng_page_cache_mb = 128;
+ // Worst case for uncompressible data
+ default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * history_seconds) /
+ (1024 * 1024);
+ default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
+
+ nd_log_limits_unlimited();
+ fprintf(stderr, "Initializing localhost with hostname 'dbengine-dataset'");
+
+ host = dbengine_rrdhost_find_or_create("dbengine-dataset");
+ if (NULL == host)
+ return;
+
+ thread_info = mallocz(sizeof(*thread_info) * DSET_CHARTS);
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ thread_info[i] = mallocz(sizeof(*thread_info[i]) + sizeof(RRDDIM *) * DSET_DIMS);
+ }
+ fprintf(stderr, "\nRunning DB-engine workload generator\n");
+
+ time_present = now_realtime_sec();
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ thread_info[i]->host = host;
+ thread_info[i]->chartname = "random";
+ thread_info[i]->dset_charts = DSET_CHARTS;
+ thread_info[i]->chart_i = i;
+ thread_info[i]->dset_dims = DSET_DIMS;
+ thread_info[i]->history_seconds = history_seconds;
+ thread_info[i]->time_present = time_present;
+ thread_info[i]->time_max = 0;
+ thread_info[i]->done = 0;
+ completion_init(&thread_info[i]->charts_initialized);
+ fatal_assert(0 == uv_thread_create(&thread_info[i]->thread, generate_dbengine_chart, thread_info[i]));
+ completion_wait_for(&thread_info[i]->charts_initialized);
+ completion_destroy(&thread_info[i]->charts_initialized);
+ }
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ fatal_assert(0 == uv_thread_join(&thread_info[i]->thread));
+ }
+
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ freez(thread_info[i]);
+ }
+ freez(thread_info);
+ rrd_wrlock();
+ rrdhost_free___while_having_rrd_wrlock(localhost, true);
+ rrd_unlock();
+}
+
+struct dbengine_query_thread {
+ uv_thread_t thread;
+ RRDHOST *host;
+ char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
+ unsigned dset_charts; /* number of charts */
+ unsigned dset_dims; /* dimensions per chart */
+ time_t time_present; /* current virtual time of the benchmark */
+ unsigned history_seconds; /* how far back in the past to go */
+ volatile long done; /* initialize to 0, set to 1 to stop thread */
+ unsigned long errors, queries_nr, queried_metrics_nr; /* statistics */
+ uint8_t delete_old_data; /* if non zero then data are deleted when disk space is exhausted */
+
+ struct dbengine_chart_thread *chart_threads[]; /* dset_charts elements */
+};
+
+static void query_dbengine_chart(void *arg)
+{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+ struct dbengine_query_thread *thread_info = (struct dbengine_query_thread *)arg;
+ const int DSET_CHARTS = thread_info->dset_charts;
+ const int DSET_DIMS = thread_info->dset_dims;
+ time_t time_after, time_before, time_min, time_approx_min, time_max, duration;
+ int i, j, update_every = 1;
+ RRDSET *st;
+ RRDDIM *rd;
+ uint8_t same;
+ time_t time_now, time_retrieved, end_time;
+ collected_number generatedv;
+ NETDATA_DOUBLE value, expected;
+ struct storage_engine_query_handle seqh;
+ size_t value_errors = 0, time_errors = 0;
+
+ do {
+ // pick a chart and dimension
+ i = random() % DSET_CHARTS;
+ st = thread_info->chart_threads[i]->st;
+ j = random() % DSET_DIMS;
+ rd = thread_info->chart_threads[i]->rd[j];
+
+ time_min = thread_info->time_present - thread_info->history_seconds + 1;
+ time_max = thread_info->chart_threads[i]->time_max;
+
+ if (thread_info->delete_old_data) {
+ /* A time window of twice the disk space is sufficient for compression space savings of up to 50% */
+ time_approx_min = time_max - (default_rrdeng_disk_quota_mb * 2 * 1024 * 1024) /
+ (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number));
+ time_min = MAX(time_min, time_approx_min);
+ }
+ if (!time_max) {
+ time_before = time_after = time_min;
+ } else {
+ time_after = time_min + random() % (MAX(time_max - time_min, 1));
+ duration = random() % 3600;
+ time_before = MIN(time_after + duration, time_max); /* up to 1 hour queries */
+ }
+
+ storage_engine_query_init(rd->tiers[0].seb, rd->tiers[0].smh, &seqh, time_after, time_before, STORAGE_PRIORITY_NORMAL);
+ ++thread_info->queries_nr;
+ for (time_now = time_after ; time_now <= time_before ; time_now += update_every) {
+ generatedv = generate_dbengine_chart_value(i, j, time_now);
+ expected = unpack_storage_number(pack_storage_number((NETDATA_DOUBLE) generatedv, SN_DEFAULT_FLAGS));
+
+ if (unlikely(storage_engine_query_is_finished(&seqh))) {
+ if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
+ ", found data gap, ### ERROR 12 ###\n",
+ rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected);
+ ++thread_info->errors;
+ }
+ break;
+ }
+
+ STORAGE_POINT sp = storage_engine_query_next_metric(&seqh);
+ value = sp.sum;
+ time_retrieved = sp.start_time_s;
+ end_time = sp.end_time_s;
+
+ if (!netdata_double_isnumber(value)) {
+ if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
+ ", found data gap, ### ERROR 13 ###\n",
+ rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected);
+ ++thread_info->errors;
+ }
+ break;
+ }
+ ++thread_info->queried_metrics_nr;
+
+ same = (roundndd(value) == roundndd(expected)) ? 1 : 0;
+ if (!same) {
+ if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
+ if(!value_errors)
+ fprintf(stderr, " DB-engine stresstest %s/%s: at %lu secs, expecting value " NETDATA_DOUBLE_FORMAT
+ ", found " NETDATA_DOUBLE_FORMAT ", ### ERROR 14 ###\n",
+ rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, expected, value);
+ value_errors++;
+ thread_info->errors++;
+ }
+ }
+ if (end_time != time_now) {
+ if (!thread_info->delete_old_data) { /* data validation only when we don't delete */
+ if(!time_errors)
+ fprintf(stderr,
+ " DB-engine stresstest %s/%s: at %lu secs, found timestamp %lu ### ERROR 15 ###\n",
+ rrdset_name(st), rrddim_name(rd), (unsigned long) time_now, (unsigned long) time_retrieved);
+ time_errors++;
+ thread_info->errors++;
+ }
+ }
+ }
+ storage_engine_query_finalize(&seqh);
+ } while(!thread_info->done);
+
+ if(value_errors)
+ fprintf(stderr, "%zu value errors encountered\n", value_errors);
+
+ if(time_errors)
+ fprintf(stderr, "%zu time errors encountered\n", time_errors);
+}
+
+void dbengine_stress_test(unsigned TEST_DURATION_SEC, unsigned DSET_CHARTS, unsigned QUERY_THREADS,
+ unsigned RAMP_UP_SECONDS, unsigned PAGE_CACHE_MB, unsigned DISK_SPACE_MB)
+{
+ fprintf(stderr, "%s() running...\n", __FUNCTION__ );
+ const unsigned DSET_DIMS = 128;
+ const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
+ const unsigned HISTORY_SECONDS = 3600 * 24 * 365 * 50; /* 50 year of history */
+ RRDHOST *host = NULL;
+ struct dbengine_chart_thread **chart_threads;
+ struct dbengine_query_thread **query_threads;
+ unsigned i, j;
+ time_t time_start, test_duration;
+
+ nd_log_limits_unlimited();
+
+ if (!TEST_DURATION_SEC)
+ TEST_DURATION_SEC = 10;
+ if (!DSET_CHARTS)
+ DSET_CHARTS = 1;
+ if (!QUERY_THREADS)
+ QUERY_THREADS = 1;
+ if (PAGE_CACHE_MB < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
+ PAGE_CACHE_MB = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
+
+ default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+ default_rrdeng_page_cache_mb = PAGE_CACHE_MB;
+ if (DISK_SPACE_MB) {
+ fprintf(stderr, "By setting disk space limit data are allowed to be deleted. "
+ "Data validation is turned off for this run.\n");
+ default_rrdeng_disk_quota_mb = DISK_SPACE_MB;
+ } else {
+ // Worst case for uncompressible data
+ default_rrdeng_disk_quota_mb =
+ (((uint64_t) DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * HISTORY_SECONDS) / (1024 * 1024);
+ default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
+ }
+
+ fprintf(stderr, "Initializing localhost with hostname 'dbengine-stress-test'\n");
+
+ (void)sql_init_meta_database(DB_CHECK_NONE, 1);
+ host = dbengine_rrdhost_find_or_create("dbengine-stress-test");
+ if (NULL == host)
+ return;
+
+ chart_threads = mallocz(sizeof(*chart_threads) * DSET_CHARTS);
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i] = mallocz(sizeof(*chart_threads[i]) + sizeof(RRDDIM *) * DSET_DIMS);
+ }
+ query_threads = mallocz(sizeof(*query_threads) * QUERY_THREADS);
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i] = mallocz(sizeof(*query_threads[i]) + sizeof(struct dbengine_chart_thread *) * DSET_CHARTS);
+ }
+ fprintf(stderr, "\nRunning DB-engine stress test, %u seconds writers ramp-up time,\n"
+ "%u seconds of concurrent readers and writers, %u writer threads, %u reader threads,\n"
+ "%u MiB of page cache.\n",
+ RAMP_UP_SECONDS, TEST_DURATION_SEC, DSET_CHARTS, QUERY_THREADS, PAGE_CACHE_MB);
+
+ time_start = now_realtime_sec() + HISTORY_SECONDS; /* move history to the future */
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i]->host = host;
+ chart_threads[i]->chartname = "random";
+ chart_threads[i]->dset_charts = DSET_CHARTS;
+ chart_threads[i]->chart_i = i;
+ chart_threads[i]->dset_dims = DSET_DIMS;
+ chart_threads[i]->history_seconds = HISTORY_SECONDS;
+ chart_threads[i]->time_present = time_start;
+ chart_threads[i]->time_max = 0;
+ chart_threads[i]->done = 0;
+ chart_threads[i]->errors = chart_threads[i]->stored_metrics_nr = 0;
+ completion_init(&chart_threads[i]->charts_initialized);
+ fatal_assert(0 == uv_thread_create(&chart_threads[i]->thread, generate_dbengine_chart, chart_threads[i]));
+ }
+ /* barrier so that subsequent queries can access valid chart data */
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ completion_wait_for(&chart_threads[i]->charts_initialized);
+ completion_destroy(&chart_threads[i]->charts_initialized);
+ }
+ sleep(RAMP_UP_SECONDS);
+ /* at this point data have already began being written to the database */
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i]->host = host;
+ query_threads[i]->chartname = "random";
+ query_threads[i]->dset_charts = DSET_CHARTS;
+ query_threads[i]->dset_dims = DSET_DIMS;
+ query_threads[i]->history_seconds = HISTORY_SECONDS;
+ query_threads[i]->time_present = time_start;
+ query_threads[i]->done = 0;
+ query_threads[i]->errors = query_threads[i]->queries_nr = query_threads[i]->queried_metrics_nr = 0;
+ for (j = 0 ; j < DSET_CHARTS ; ++j) {
+ query_threads[i]->chart_threads[j] = chart_threads[j];
+ }
+ query_threads[i]->delete_old_data = DISK_SPACE_MB ? 1 : 0;
+ fatal_assert(0 == uv_thread_create(&query_threads[i]->thread, query_dbengine_chart, query_threads[i]));
+ }
+ sleep(TEST_DURATION_SEC);
+ /* stop workload */
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ chart_threads[i]->done = 1;
+ }
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ query_threads[i]->done = 1;
+ }
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ assert(0 == uv_thread_join(&chart_threads[i]->thread));
+ }
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ assert(0 == uv_thread_join(&query_threads[i]->thread));
+ }
+ test_duration = now_realtime_sec() - (time_start - HISTORY_SECONDS);
+ if (!test_duration)
+ test_duration = 1;
+ fprintf(stderr, "\nDB-engine stress test finished in %lld seconds.\n", (long long)test_duration);
+ unsigned long stored_metrics_nr = 0;
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ stored_metrics_nr += chart_threads[i]->stored_metrics_nr;
+ }
+ unsigned long queried_metrics_nr = 0;
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ queried_metrics_nr += query_threads[i]->queried_metrics_nr;
+ }
+ fprintf(stderr, "%u metrics were stored (dataset size of %lu MiB) in %u charts by 1 writer thread per chart.\n",
+ DSET_CHARTS * DSET_DIMS, stored_metrics_nr * sizeof(storage_number) / (1024 * 1024), DSET_CHARTS);
+ fprintf(stderr, "Metrics were being generated per 1 emulated second and time was accelerated.\n");
+ fprintf(stderr, "%lu metric data points were queried by %u reader threads.\n", queried_metrics_nr, QUERY_THREADS);
+ fprintf(stderr, "Query starting time is randomly chosen from the beginning of the time-series up to the time of\n"
+ "the latest data point, and ending time from 1 second up to 1 hour after the starting time.\n");
+ fprintf(stderr, "Performance is %lld written data points/sec and %lld read data points/sec.\n",
+ (long long)(stored_metrics_nr / test_duration), (long long)(queried_metrics_nr / test_duration));
+
+ for (i = 0 ; i < DSET_CHARTS ; ++i) {
+ freez(chart_threads[i]);
+ }
+ freez(chart_threads);
+ for (i = 0 ; i < QUERY_THREADS ; ++i) {
+ freez(query_threads[i]);
+ }
+ freez(query_threads);
+ rrd_wrlock();
+ rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si);
+ rrdeng_exit((struct rrdengine_instance *)host->db[0].si);
+ rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
+ rrd_unlock();
+}
+
+#endif
\ No newline at end of file
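
A small sketch, not part of this patch, of the validation idea used by query_dbengine_chart() above: every stored value is a pure function of (chart, dimension, time), so a queried point can be checked by regenerating the expected value and comparing after storage_number round-tripping. check_queried_value() is a hypothetical helper built from the same calls the stress test uses.

static bool check_queried_value(int chart_i, int dim_i, time_t t, NETDATA_DOUBLE found) {
    collected_number generated = generate_dbengine_chart_value(chart_i, dim_i, t);

    // the database stores pack/unpack-rounded values, so compare against that
    NETDATA_DOUBLE expected =
        unpack_storage_number(pack_storage_number((NETDATA_DOUBLE)generated, SN_DEFAULT_FLAGS));

    return roundndd(found) == roundndd(expected);
}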
diff --git a/src/database/engine/dbengine-unittest.c b/src/database/engine/dbengine-unittest.c
new file mode 100644
index 000000000..4c4d312c0
--- /dev/null
+++ b/src/database/engine/dbengine-unittest.c
@@ -0,0 +1,419 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "../../daemon/common.h"
+
+#ifdef ENABLE_DBENGINE
+
+#define CHARTS 64
+#define DIMS 16 // CHARTS * DIMS dimensions
+#define REGIONS 11
+#define POINTS_PER_REGION 16384
+static const int REGION_UPDATE_EVERY[REGIONS] = {1, 15, 3, 20, 2, 6, 30, 12, 5, 4, 10};
+
+#define START_TIMESTAMP MAX(2 * API_RELATIVE_TIME_MAX, 200000000)
+
+static time_t region_start_time(time_t previous_region_end_time, time_t update_every) {
+ // leave a small gap between regions
+ // but keep them close together, so that cross-region queries will be fast
+
+ time_t rc = previous_region_end_time + update_every;
+ rc += update_every - (rc % update_every);
+ rc += update_every;
+ return rc;
+}
+
+static inline collected_number point_value_get(size_t region, size_t chart, size_t dim, size_t point) {
+ // calculate the value to be stored for each point in the database
+
+ collected_number r = (collected_number)region;
+ collected_number c = (collected_number)chart;
+ collected_number d = (collected_number)dim;
+ collected_number p = (collected_number)point;
+
+ return (r * CHARTS * DIMS * POINTS_PER_REGION +
+ c * DIMS * POINTS_PER_REGION +
+ d * POINTS_PER_REGION +
+ p) % 10000000;
+}
+
+static inline void storage_point_check(size_t region, size_t chart, size_t dim, size_t point, time_t now, time_t update_every, STORAGE_POINT sp, size_t *value_errors, size_t *time_errors, size_t *update_every_errors) {
+ // check the supplied STORAGE_POINT retrieved from the database
+ // against the computed timestamp, update_every and expected value
+
+ if(storage_point_is_gap(sp)) sp.min = sp.max = sp.sum = NAN;
+
+ collected_number expected = point_value_get(region, chart, dim, point);
+
+ if(roundndd(expected) != roundndd(sp.sum)) {
+ if(*value_errors < DIMS * 2) {
+ fprintf(stderr, " >>> DBENGINE: VALUE DOES NOT MATCH: "
+ "region %zu, chart %zu, dimension %zu, point %zu, time %ld: "
+ "expected %lld, found %f\n",
+ region, chart, dim, point, now, expected, sp.sum);
+ }
+
+ (*value_errors)++;
+ }
+
+ if(sp.start_time_s > now || sp.end_time_s < now) {
+ if(*time_errors < DIMS * 2) {
+ fprintf(stderr, " >>> DBENGINE: TIMESTAMP DOES NOT MATCH: "
+ "region %zu, chart %zu, dimension %zu, point %zu, timestamp %ld: "
+ "expected %ld, found %ld - %ld\n",
+ region, chart, dim, point, now, now, sp.start_time_s, sp.end_time_s);
+ }
+
+ (*time_errors)++;
+ }
+
+ if(update_every != sp.end_time_s - sp.start_time_s) {
+ if(*update_every_errors < DIMS * 2) {
+ fprintf(stderr, " >>> DBENGINE: UPDATE EVERY DOES NOT MATCH: "
+ "region %zu, chart %zu, dimension %zu, point %zu, timestamp %ld: "
+ "expected %ld, found %ld\n",
+ region, chart, dim, point, now, update_every, sp.end_time_s - sp.start_time_s);
+ }
+
+ (*update_every_errors)++;
+ }
+}
+
+static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number value, time_t now) {
+ rd->collector.last_collected_time.tv_sec = now;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.collected_value = value;
+ rrddim_set_updated(rd);
+
+ rd->collector.counter++;
+
+ collected_number v = (value >= 0) ? value : -value;
+ if(unlikely(v > rd->collector.collected_value_max)) rd->collector.collected_value_max = v;
+}
+
+static RRDHOST *dbengine_rrdhost_find_or_create(char *name) {
+ /* We don't want to drop metrics when generating load,
+ * we prefer to block data generation itself */
+
+ return rrdhost_find_or_create(
+ name,
+ name,
+ name,
+ os_type,
+ netdata_configured_timezone,
+ netdata_configured_abbrev_timezone,
+ netdata_configured_utc_offset,
+ program_name,
+ program_version,
+ default_rrd_update_every,
+ default_rrd_history_entries,
+ RRD_MEMORY_MODE_DBENGINE,
+ health_plugin_enabled(),
+ default_rrdpush_enabled,
+ default_rrdpush_destination,
+ default_rrdpush_api_key,
+ default_rrdpush_send_charts_matching,
+ default_rrdpush_enable_replication,
+ default_rrdpush_seconds_to_replicate,
+ default_rrdpush_replication_step,
+ NULL,
+ 0
+ );
+}
+
+static void test_dbengine_create_charts(RRDHOST *host, RRDSET *st[CHARTS], RRDDIM *rd[CHARTS][DIMS],
+ int update_every) {
+ fprintf(stderr, "DBENGINE Creating Test Charts...\n");
+
+ int i, j;
+ char name[101];
+
+ for (i = 0 ; i < CHARTS ; ++i) {
+ snprintfz(name, sizeof(name) - 1, "dbengine-chart-%d", i);
+
+ // create the chart
+ st[i] = rrdset_create(host, "netdata", name, name, "netdata", NULL, "Unit Testing", "a value", "unittest",
+ NULL, 1, update_every, RRDSET_TYPE_LINE);
+ rrdset_flag_set(st[i], RRDSET_FLAG_DEBUG);
+ rrdset_flag_set(st[i], RRDSET_FLAG_STORE_FIRST);
+ for (j = 0 ; j < DIMS ; ++j) {
+ snprintfz(name, sizeof(name) - 1, "dim-%d", j);
+
+ rd[i][j] = rrddim_add(st[i], name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ }
+ }
+
+ // Initialize DB with the very first entries
+ for (i = 0 ; i < CHARTS ; ++i) {
+ for (j = 0 ; j < DIMS ; ++j) {
+ rd[i][j]->collector.last_collected_time.tv_sec =
+ st[i]->last_collected_time.tv_sec = st[i]->last_updated.tv_sec = START_TIMESTAMP - 1;
+ rd[i][j]->collector.last_collected_time.tv_usec =
+ st[i]->last_collected_time.tv_usec = st[i]->last_updated.tv_usec = 0;
+ }
+ }
+ for (i = 0 ; i < CHARTS ; ++i) {
+ st[i]->usec_since_last_update = USEC_PER_SEC;
+
+ for (j = 0; j < DIMS; ++j) {
+ rrddim_set_by_pointer_fake_time(rd[i][j], 69, START_TIMESTAMP); // set first value to 69
+ }
+
+ struct timeval now;
+ now_realtime_timeval(&now);
+ rrdset_timed_done(st[i], now, false);
+ }
+ // Flush pages for subsequent real values
+ for (i = 0 ; i < CHARTS ; ++i) {
+ for (j = 0; j < DIMS; ++j) {
+ rrdeng_store_metric_flush_current_page((rd[i][j])->tiers[0].sch);
+ }
+ }
+}
+
+static time_t test_dbengine_create_metrics(
+ RRDSET *st[CHARTS],
+ RRDDIM *rd[CHARTS][DIMS],
+ size_t current_region,
+ time_t time_start) {
+
+ time_t update_every = REGION_UPDATE_EVERY[current_region];
+ fprintf(stderr, "DBENGINE Single Region Write to "
+ "region %zu, from %ld to %ld, with update every %ld...\n",
+ current_region, time_start, time_start + POINTS_PER_REGION * update_every, update_every);
+
+ // for the database to save the metrics at the right time, we need to set
+ // the last data collection time to be just before the first data collection.
+ time_t time_now = time_start;
+ for (size_t c = 0 ; c < CHARTS ; ++c) {
+ for (size_t d = 0 ; d < DIMS ; ++d) {
+ storage_engine_store_change_collection_frequency(rd[c][d]->tiers[0].sch, (int)update_every);
+
+ // setting these timestamps, to the data collection time, prevents interpolation
+ // during data collection, so that our value will be written as-is to the
+ // database.
+
+ rd[c][d]->collector.last_collected_time.tv_sec =
+ st[c]->last_collected_time.tv_sec = st[c]->last_updated.tv_sec = time_now;
+
+ rd[c][d]->collector.last_collected_time.tv_usec =
+ st[c]->last_collected_time.tv_usec = st[c]->last_updated.tv_usec = 0;
+ }
+ }
+
+ // set the samples to the database
+ for (size_t p = 0; p < POINTS_PER_REGION ; ++p) {
+ for (size_t c = 0 ; c < CHARTS ; ++c) {
+ st[c]->usec_since_last_update = USEC_PER_SEC * update_every;
+
+ for (size_t d = 0; d < DIMS; ++d)
+ rrddim_set_by_pointer_fake_time(rd[c][d], point_value_get(current_region, c, d, p), time_now);
+
+ rrdset_timed_done(st[c], (struct timeval){ .tv_sec = time_now, .tv_usec = 0 }, false);
+ }
+
+ time_now += update_every;
+ }
+
+ return time_now;
+}
+
+// Checks the metric data for the given region, returns number of errors
+static size_t test_dbengine_check_metrics(
+ RRDSET *st[CHARTS] __maybe_unused,
+ RRDDIM *rd[CHARTS][DIMS],
+ size_t current_region,
+ time_t time_start,
+ time_t time_end) {
+
+ time_t update_every = REGION_UPDATE_EVERY[current_region];
+ fprintf(stderr, "DBENGINE Single Region Read from "
+ "region %zu, from %ld to %ld, with update every %ld...\n",
+ current_region, time_start, time_end, update_every);
+
+ // initialize all queries
+ struct storage_engine_query_handle handles[CHARTS * DIMS] = { 0 };
+ for (size_t c = 0 ; c < CHARTS ; ++c) {
+ for (size_t d = 0; d < DIMS; ++d) {
+ storage_engine_query_init(rd[c][d]->tiers[0].seb,
+ rd[c][d]->tiers[0].smh,
+ &handles[c * DIMS + d],
+ time_start,
+ time_end,
+ STORAGE_PRIORITY_NORMAL);
+ }
+ }
+
+ // check the stored samples
+ size_t value_errors = 0, time_errors = 0, update_every_errors = 0;
+ time_t time_now = time_start;
+ for(size_t p = 0; p < POINTS_PER_REGION ;p++) {
+ for (size_t c = 0 ; c < CHARTS ; ++c) {
+ for (size_t d = 0; d < DIMS; ++d) {
+ STORAGE_POINT sp = storage_engine_query_next_metric(&handles[c * DIMS + d]);
+ storage_point_check(current_region, c, d, p, time_now, update_every, sp,
+ &value_errors, &time_errors, &update_every_errors);
+ }
+ }
+
+ time_now += update_every;
+ }
+
+ // finalize the queries
+ for (size_t c = 0 ; c < CHARTS ; ++c) {
+ for (size_t d = 0; d < DIMS; ++d) {
+ storage_engine_query_finalize(&handles[c * DIMS + d]);
+ }
+ }
+
+ if(value_errors)
+ fprintf(stderr, "%zu value errors encountered (out of %d checks)\n", value_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ if(time_errors)
+ fprintf(stderr, "%zu time errors encountered (out of %d checks)\n", time_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ if(update_every_errors)
+ fprintf(stderr, "%zu update every errors encountered (out of %d checks)\n", update_every_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ return value_errors + time_errors + update_every_errors;
+}
+
+static size_t dbengine_test_rrdr_single_region(
+ RRDSET *st[CHARTS],
+ RRDDIM *rd[CHARTS][DIMS],
+ size_t current_region,
+ time_t time_start,
+ time_t time_end) {
+
+ time_t update_every = REGION_UPDATE_EVERY[current_region];
+ fprintf(stderr, "RRDR Single Region Test on "
+ "region %zu, start time %lld, end time %lld, update every %ld, on %d dimensions...\n",
+ current_region, (long long)time_start, (long long)time_end, update_every, CHARTS * DIMS);
+
+ size_t errors = 0, value_errors = 0, time_errors = 0, update_every_errors = 0;
+ long points = (time_end - time_start) / update_every;
+ for(size_t c = 0; c < CHARTS ;c++) {
+ ONEWAYALLOC *owa = onewayalloc_create(0);
+ RRDR *r = rrd2rrdr_legacy(owa, st[c], points, time_start, time_end,
+ RRDR_GROUPING_AVERAGE, 0, RRDR_OPTION_NATURAL_POINTS,
+ NULL, NULL, 0, 0,
+ QUERY_SOURCE_UNITTEST, STORAGE_PRIORITY_NORMAL);
+ if (!r) {
+ fprintf(stderr, " >>> DBENGINE: %s: empty RRDR on region %zu\n", rrdset_name(st[c]), current_region);
+ onewayalloc_destroy(owa);
+ errors++;
+ continue;
+ }
+
+ if(r->internal.qt->request.st != st[c])
+ fatal("queried wrong chart");
+
+ if(rrdr_rows(r) != POINTS_PER_REGION)
+ fatal("query returned wrong number of points (expected %d, got %zu)", POINTS_PER_REGION, rrdr_rows(r));
+
+ time_t time_now = time_start;
+ for (size_t p = 0; p < rrdr_rows(r); p++) {
+ size_t d = 0;
+ RRDDIM *dim;
+ rrddim_foreach_read(dim, r->internal.qt->request.st) {
+ if(unlikely(d >= r->d))
+ fatal("got more dimensions (%zu) than expected (%zu)", d, r->d);
+
+ if(rd[c][d] != dim)
+ fatal("queried wrong dimension");
+
+ RRDR_VALUE_FLAGS *co = &r->o[ p * r->d ];
+ NETDATA_DOUBLE *cn = &r->v[ p * r->d ];
+
+ STORAGE_POINT sp = STORAGE_POINT_UNSET;
+ sp.min = sp.max = sp.sum = (co[d] & RRDR_VALUE_EMPTY) ? NAN :cn[d];
+ sp.count = 1;
+ sp.end_time_s = r->t[p];
+ sp.start_time_s = sp.end_time_s - r->view.update_every;
+
+ storage_point_check(current_region, c, d, p, time_now, update_every, sp, &value_errors, &time_errors, &update_every_errors);
+ d++;
+ }
+ rrddim_foreach_done(dim);
+ time_now += update_every;
+ }
+
+ rrdr_free(owa, r);
+ onewayalloc_destroy(owa);
+ }
+
+ if(value_errors)
+ fprintf(stderr, "%zu value errors encountered (out of %d checks)\n", value_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ if(time_errors)
+ fprintf(stderr, "%zu time errors encountered (out of %d checks)\n", time_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ if(update_every_errors)
+ fprintf(stderr, "%zu update every errors encountered (out of %d checks)\n", update_every_errors, POINTS_PER_REGION * CHARTS * DIMS);
+
+ return errors + value_errors + time_errors + update_every_errors;
+}
+
+int test_dbengine(void) {
+ // provide enough threads to dbengine
+ setenv("UV_THREADPOOL_SIZE", "48", 1);
+
+ size_t errors = 0, value_errors = 0, time_errors = 0;
+
+ nd_log_limits_unlimited();
+ fprintf(stderr, "\nRunning DB-engine test\n");
+
+ default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
+ fprintf(stderr, "Initializing localhost with hostname 'unittest-dbengine'");
+ RRDHOST *host = dbengine_rrdhost_find_or_create("unittest-dbengine");
+ if(!host)
+ fatal("Failed to initialize host");
+
+ RRDSET *st[CHARTS] = { 0 };
+ RRDDIM *rd[CHARTS][DIMS] = { 0 };
+ time_t time_start[REGIONS] = { 0 }, time_end[REGIONS] = { 0 };
+
+ // create the charts and dimensions we need
+ test_dbengine_create_charts(host, st, rd, REGION_UPDATE_EVERY[0]);
+
+ time_t now = START_TIMESTAMP;
+ time_t update_every_old = REGION_UPDATE_EVERY[0];
+ for(size_t current_region = 0; current_region < REGIONS ;current_region++) {
+ time_t update_every = REGION_UPDATE_EVERY[current_region];
+
+ if(update_every != update_every_old) {
+ for (size_t c = 0 ; c < CHARTS ; ++c)
+ rrdset_set_update_every_s(st[c], update_every);
+ }
+
+ time_start[current_region] = region_start_time(now, update_every);
+ now = time_end[current_region] = test_dbengine_create_metrics(st,rd, current_region, time_start[current_region]);
+
+ errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region], time_end[current_region]);
+ }
+
+ // check everything again
+ for(size_t current_region = 0; current_region < REGIONS ;current_region++)
+ errors += test_dbengine_check_metrics(st, rd, current_region, time_start[current_region], time_end[current_region]);
+
+ // check again in reverse order
+ for(size_t current_region = 0; current_region < REGIONS ;current_region++) {
+ size_t region = REGIONS - 1 - current_region;
+ errors += test_dbengine_check_metrics(st, rd, region, time_start[region], time_end[region]);
+ }
+
+ // check all the regions using RRDR
+ // this also checks the query planner and the query engine of Netdata
+ for (size_t current_region = 0 ; current_region < REGIONS ; current_region++) {
+ errors += dbengine_test_rrdr_single_region(st, rd, current_region, time_start[current_region], time_end[current_region]);
+ }
+
+ rrd_wrlock();
+ rrdeng_prepare_exit((struct rrdengine_instance *)host->db[0].si);
+ rrdeng_exit((struct rrdengine_instance *)host->db[0].si);
+ rrdeng_enq_cmd(NULL, RRDENG_OPCODE_SHUTDOWN_EVLOOP, NULL, NULL, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
+ rrd_unlock();
+
+ return (int)(errors + value_errors + time_errors);
+}
+
+#endif
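
A worked example, not part of this patch, of the alignment performed by region_start_time() above, using made-up inputs: the previous region ended at t=1000 and the next region samples every 15 seconds.

static time_t region_start_time_example(void) {
    time_t previous_region_end_time = 1000, update_every = 15;

    time_t rc = previous_region_end_time + update_every;  // 1015
    rc += update_every - (rc % update_every);             // 1020: aligned to the 15s grid
    rc += update_every;                                    // 1035: leave one full interval as a gap
    return rc;                                             // the next region starts at 1035
}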
diff --git a/database/engine/journalfile.c b/src/database/engine/journalfile.c
index 9005b81ca..8099d017f 100644
--- a/database/engine/journalfile.c
+++ b/src/database/engine/journalfile.c
@@ -637,9 +637,12 @@ static int journalfile_check_superblock(uv_file file)
fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
- if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
- strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- netdata_log_error("DBENGINE: File has invalid superblock.");
+
+ char jf_magic[RRDENG_MAGIC_SZ] = RRDENG_JF_MAGIC;
+ char jf_ver[RRDENG_VER_SZ] = RRDENG_JF_VER;
+ if (strncmp(superblock->magic_number, jf_magic, RRDENG_MAGIC_SZ) != 0 ||
+ strncmp(superblock->version, jf_ver, RRDENG_VER_SZ) != 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -669,7 +672,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
uuid_t *temp_id;
uint8_t page_type = jf_metric_data->descr[i].type;
- if (page_type > PAGE_TYPE_MAX) {
+ if (page_type > RRDENG_PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
@@ -700,13 +703,19 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
.section = (Word_t)ctx,
.first_time_s = vd.start_time_s,
.last_time_s = vd.end_time_s,
- .latest_update_every_s = (uint32_t) vd.update_every_s,
+ .latest_update_every_s = vd.update_every_s,
};
bool added;
metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
- if(added)
+ if(added) {
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
update_metric_time = false;
+ }
+ if (vd.update_every_s) {
+ uint64_t samples = (vd.end_time_s - vd.start_time_s) / vd.update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
}
Word_t metric_id = mrg_metric_id(main_mrg, metric);
@@ -1005,7 +1014,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
time_t end_time_s = header_start_time_s + metric->delta_end_s;
mrg_update_metric_retention_and_granularity_by_uuid(
- main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
+ main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, metric->update_every_s, now_s);
metric++;
}
@@ -1042,7 +1051,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
journal_v1_file_size = (uint32_t)statbuf.st_size;
journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
- fd = open(path_v2, O_RDONLY);
+ fd = open(path_v2, O_RDONLY | O_CLOEXEC);
if (fd < 0) {
if (errno == ENOENT)
return 1;
@@ -1226,7 +1235,7 @@ void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *
data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
data_page->extent_index = page_info->extent_index;
- data_page->update_every_s = (uint32_t) page_info->update_every_s;
+ data_page->update_every_s = page_info->update_every_s;
data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
data_page->type = 0;
@@ -1252,7 +1261,7 @@ static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_heade
page_info = *PValue;
// Write one descriptor and return the next data page location
data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
- update_every_s = (uint32_t) page_info->update_every_s;
+ update_every_s = page_info->update_every_s;
if (NULL == data_page)
break;
}
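
A small sketch, not part of this patch, of the comparison technique used in the journalfile_check_superblock() hunk above: the on-disk magic and version fields are fixed-width and not NUL-terminated, so the expected literal is first placed in an array of the same width and compared with strncmp over the whole field. The field size below is illustrative.

#include <stdbool.h>
#include <string.h>

#define FIELD_SZ 32

static bool field_matches(const char on_disk[FIELD_SZ], const char *expected) {
    char buf[FIELD_SZ] = { 0 };             // zero-padded, like the on-disk field
    strncpy(buf, expected, FIELD_SZ);       // no terminator needed for the comparison
    return strncmp(on_disk, buf, FIELD_SZ) == 0;
}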
diff --git a/database/engine/journalfile.h b/src/database/engine/journalfile.h
index 5cdf72b9d..3f881ee16 100644
--- a/database/engine/journalfile.h
+++ b/src/database/engine/journalfile.h
@@ -7,7 +7,6 @@
/* Forward declarations */
struct rrdengine_instance;
-struct rrdengine_worker_config;
struct rrdengine_datafile;
struct rrdengine_journalfile;
diff --git a/database/engine/metric.c b/src/database/engine/metric.c
index 2e132612e..01eb22fbc 100644
--- a/database/engine/metric.c
+++ b/src/database/engine/metric.c
@@ -1,5 +1,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "metric.h"
+#include "cache.h"
+#include "libnetdata/locks/locks.h"
+#include "rrddiskprotocol.h"
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
@@ -104,8 +107,11 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
- return *n % mrg->partitions;
+
+ size_t n;
+ memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t));
+
+ return n % mrg->partitions;
}
static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -125,87 +131,174 @@ static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused,
return first_time_s;
}
-static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) {
+static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section;
+
+ char uuid[UUID_STR_LEN];
+ uuid_unparse_lower(metric->uuid, uuid);
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "METRIC: %s on %s at tier %d, refcount %d, partition %u, "
+ "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", "
+ "writer pid %d "
+ "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ",
+ msg,
+ uuid,
+ ctx->config.tier,
+ metric->refcount,
+ metric->partition,
+ metric->first_time_s,
+ metric->latest_time_s_hot,
+ metric->latest_time_s_clean,
+ metric->latest_update_every_s,
+ (int)metric->writer
+ );
+}
+
+static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+ return (!first || !last || first > last);
+}
+
+static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mrg_index_write_lock(mrg, partition);
+
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+ if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+ if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ if(!*sections_judy_pptr) {
+ rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+ if(unlikely(!rc))
+ fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ }
+
+ MRG_STATS_DELETED_METRIC(mrg, partition);
+
+ mrg_index_write_unlock(mrg, partition);
+
+ aral_freez(mrg->index[partition].aral, metric);
+}
+
+static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected < 0)
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
+ if(unlikely(expected < 0))
+ return false;
+
+ desired = expected + 1;
- refcount = expected + 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+ size_t partition = metric->partition;
- if(refcount == 1)
+ if(desired == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return refcount;
+ return true;
}
-static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
+static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected <= 0)
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
+ if(expected <= 0) {
+ metric_log(mrg, metric, "refcount is zero or negative during release");
+ fatal("METRIC: refcount is %d (zero or negative) during release", expected);
+ }
- refcount = expected - 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
+ desired = REFCOUNT_DELETING;
+ else
+ desired = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
- if(unlikely(!refcount))
+ if(desired == 0 || desired == REFCOUNT_DELETING) {
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ if(desired == REFCOUNT_DELETING)
+ acquired_for_deletion_metric_delete(mrg, metric);
+ }
+
__atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
- return (!first || !last || first > last);
+ return desired == REFCOUNT_DELETING;
}
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, entry->uuid);
METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
+ Pvoid_t *PValue;
- mrg_index_write_lock(mrg, partition);
+ while(1) {
+ mrg_index_write_lock(mrg, partition);
- size_t mem_before_judyl, mem_after_judyl;
+ size_t mem_before_judyl, mem_after_judyl;
- Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
- fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+ Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
+ if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+ fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
- if(unlikely(!*sections_judy_pptr))
- mrg_stats_size_judyhs_added_uuid(mrg, partition);
+ if (unlikely(!*sections_judy_pptr))
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
- if(unlikely(!PValue || PValue == PJERR))
- fatal("DBENGINE METRIC: corrupted section JudyL array");
+ if (unlikely(!PValue || PValue == PJERR))
+ fatal("DBENGINE METRIC: corrupted section JudyL array");
- if(unlikely(*PValue != NULL)) {
- METRIC *metric = *PValue;
+ if (unlikely(*PValue != NULL)) {
+ METRIC *metric = *PValue;
- metric_acquire(mrg, metric);
+ if(!metric_acquire(mrg, metric)) {
+ mrg_index_write_unlock(mrg, partition);
+ continue;
+ }
- MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
+ if (ret)
+ *ret = false;
- if(ret)
- *ret = false;
+ aral_freez(mrg->index[partition].aral, allocation);
- aral_freez(mrg->index[partition].aral, allocation);
+ return metric;
+ }
- return metric;
+ break;
}
METRIC *metric = allocation;
@@ -216,9 +309,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
- metric->refcount = 0;
+ metric->refcount = 1;
metric->partition = partition;
- metric_acquire(mrg, metric);
*PValue = metric;
MRG_STATS_ADDED_METRIC(mrg, partition);
@@ -234,77 +326,35 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
- mrg_index_read_lock(mrg, partition);
+ while(1) {
+ mrg_index_read_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
- if(unlikely(!PValue)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- METRIC *metric = *PValue;
-
- metric_acquire(mrg, metric);
-
- mrg_index_read_unlock(mrg, partition);
-
- MRG_STATS_SEARCH_HIT(mrg, partition);
- return metric;
-}
-
-static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = metric->partition;
-
- size_t mem_before_judyl, mem_after_judyl;
-
- mrg_index_write_lock(mrg, partition);
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+ if (unlikely(!sections_judy_pptr)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- if(!metric_release_and_can_be_deleted(mrg, metric)) {
- mrg->index[partition].stats.delete_having_retention_or_referenced++;
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+ if (unlikely(!PValue)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ METRIC *metric = *PValue;
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ if(metric && !metric_acquire(mrg, metric))
+ metric = NULL;
- if(unlikely(!rc)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ mrg_index_read_unlock(mrg, partition);
- if(!*sections_judy_pptr) {
- rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!rc))
- fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
- mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ if(metric) {
+ MRG_STATS_SEARCH_HIT(mrg, partition);
+ return metric;
+ }
}
-
- MRG_STATS_DELETED_METRIC(mrg, partition);
-
- mrg_index_write_unlock(mrg, partition);
-
- aral_freez(mrg->index[partition].aral, metric);
-
- return true;
}
// ----------------------------------------------------------------------------
@@ -359,7 +409,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section
}
inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
- return acquired_metric_del(mrg, metric);
+ return metric_release(mrg, metric, true);
}
inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
@@ -367,8 +417,8 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
return metric;
}
-inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
- return metric_release_and_can_be_deleted(mrg, metric);
+inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
+ metric_release(mrg, metric, false);
}
inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -394,8 +444,8 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
return true;
}
-inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
- internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) {
+ internal_fatal(first_time_s < 0 || last_time_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric first time is in the future");
@@ -425,13 +475,14 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri
return mrg_metric_get_first_time_s_smart(mrg, metric);
}
-inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
*last_time_s = MAX(clean, hot);
*first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
- *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+ if (update_every_s)
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -498,8 +549,8 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
}
} while(do_again);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
return (first && last && first < last);
}
@@ -517,6 +568,11 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
return false;
}
+inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ return clean;
+}
+
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
@@ -524,25 +580,21 @@ inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metr
return MAX(clean, hot);
}
-inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
return false;
}
-inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
return false;
}
-inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
@@ -589,7 +641,7 @@ inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
inline void mrg_update_metric_retention_and_granularity_by_uuid(
MRG *mrg, Word_t section, uuid_t *uuid,
time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
+ uint32_t update_every_s, time_t now_s)
{
if(unlikely(last_time_s > now_s)) {
nd_log_limit_static_global_var(erl, 1, 0);
@@ -626,14 +678,35 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
.section = section,
.first_time_s = first_time_s,
.last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
+ .latest_update_every_s = update_every_s
};
metric = mrg_metric_add_and_acquire(mrg, entry, &added);
}
- if (likely(!added))
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+ if (likely(!added)) {
+ uint64_t old_samples = 0;
+
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+ uint64_t new_samples = 0;
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+ __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED);
+ }
+ else {
+ // Newly added
+ if (update_every_s) {
+ uint64_t samples = (last_time_s - first_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
+ }
+
mrg_metric_release(mrg, metric);
}
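
The refcount rework in metric.c above amounts to a small lock-free protocol: metric_acquire() refuses to take a reference once the count has gone negative, and the final metric_release() may swap the count straight to the REFCOUNT_DELETING sentinel so that exactly one thread tears the metric down. The following is a self-contained sketch of that pattern with hypothetical names; it mirrors the shape of the patch using only the GCC atomic builtins the patch itself uses, and is not the MRG code.

    #include <stdbool.h>
    #include <stdint.h>

    typedef int32_t refcount_t;
    #define RC_DELETING (-100)              /* sentinel, mirrors REFCOUNT_DELETING */

    /* Take a reference; fails once a deleter has claimed the object. */
    static bool ref_acquire(refcount_t *rc) {
        refcount_t expected = __atomic_load_n(rc, __ATOMIC_RELAXED), desired;
        do {
            if (expected < 0)
                return false;               /* object is being deleted */
            desired = expected + 1;
        } while (!__atomic_compare_exchange_n(rc, &expected, desired, false,
                                              __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
        return true;
    }

    /* Drop a reference; returns true when this caller must free the object. */
    static bool ref_release(refcount_t *rc, bool delete_if_unused) {
        refcount_t expected = __atomic_load_n(rc, __ATOMIC_RELAXED), desired;
        do {
            desired = (expected == 1 && delete_if_unused) ? RC_DELETING : expected - 1;
        } while (!__atomic_compare_exchange_n(rc, &expected, desired, false,
                                              __ATOMIC_RELEASE, __ATOMIC_RELAXED));
        return desired == RC_DELETING;
    }

The samples accounting added to mrg_update_metric_retention_and_granularity_by_uuid() uses the same arithmetic in both branches, samples ~= (latest clean time - first time) / update_every, recomputed before and after the retention is expanded so only the delta lands in ctx->atomic.samples.
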
diff --git a/database/engine/metric.h b/src/database/engine/metric.h
index dbb949301..3bace9057 100644
--- a/database/engine/metric.h
+++ b/src/database/engine/metric.h
@@ -52,7 +52,7 @@ MRG *mrg_create(ssize_t partitions);
void mrg_destroy(MRG *mrg);
METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric);
-bool mrg_metric_release(MRG *mrg, METRIC *metric);
+void mrg_metric_release(MRG *mrg, METRIC *metric);
METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret);
METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section);
@@ -69,13 +69,14 @@ time_t mrg_metric_get_first_time_s(MRG *mrg, METRIC *metric);
bool mrg_metric_set_clean_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s);
bool mrg_metric_set_hot_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s);
time_t mrg_metric_get_latest_time_s(MRG *mrg, METRIC *metric);
+time_t mrg_metric_get_latest_clean_time_s(MRG *mrg, METRIC *metric);
-bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, time_t update_every_s);
-bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s);
-time_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric);
+bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, uint32_t update_every_s);
+bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, uint32_t update_every_s);
+uint32_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric);
-void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s);
-void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s);
+void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s);
+void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s);
bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric);
bool mrg_metric_set_writer(MRG *mrg, METRIC *metric);
@@ -89,6 +90,6 @@ size_t mrg_aral_overhead(void);
void mrg_update_metric_retention_and_granularity_by_uuid(
MRG *mrg, Word_t section, uuid_t *uuid,
time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s);
+ uint32_t update_every_s, time_t now_s);
#endif // DBENGINE_METRIC_H
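
With the prototypes above, update-every values are plain uint32_t and the retention getter tolerates a NULL granularity pointer, which is what the new callers in metric.c and pagecache.c rely on. A short caller-side sketch follows, assuming mrg and metric are an already-acquired MRG/METRIC pair; it is illustrative only.

    time_t first_s = 0, last_s = 0;
    uint32_t ue_s = 0;

    /* full query: retention window plus granularity */
    mrg_metric_get_retention(mrg, metric, &first_s, &last_s, &ue_s);

    /* callers that only need the window may now pass NULL for update-every */
    mrg_metric_get_retention(mrg, metric, &first_s, &last_s, NULL);

    bool has_retention = first_s && last_s && first_s <= last_s;
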
diff --git a/database/engine/page.c b/src/database/engine/page.c
index b7a393483..13fe90f7f 100644
--- a/database/engine/page.c
+++ b/src/database/engine/page.c
@@ -111,9 +111,9 @@ void pgd_init_arals(void)
// FIXME: add stats
pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create(
buf,
- GORILLA_BUFFER_SIZE,
+ RRDENG_GORILLA_32BIT_BUFFER_SIZE,
64,
- 512 * GORILLA_BUFFER_SIZE,
+ 512 * RRDENG_GORILLA_32BIT_BUFFER_SIZE,
pgc_aral_statistics(),
NULL, NULL, false, false);
}
@@ -165,8 +165,8 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
pg->states = PGD_STATE_CREATED_FROM_COLLECTOR;
switch (type) {
- case PAGE_METRICS:
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
uint32_t size = slots * page_type_size[type];
internal_fatal(!size || slots == 1,
@@ -176,11 +176,11 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
pg->raw.data = pgd_data_aral_alloc(size);
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
internal_fatal(slots == 1,
"DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);
- pg->slots = 8 * GORILLA_BUFFER_SLOTS;
+ pg->slots = 8 * RRDENG_GORILLA_32BIT_BUFFER_SLOTS;
// allocate new gorilla writer
pg->gorilla.aral_index = gettid() % 4;
@@ -188,16 +188,19 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
// allocate new gorilla buffer
gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
- memset(gbuf, 0, GORILLA_BUFFER_SIZE);
+ memset(gbuf, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);
global_statistics_gorilla_buffer_add_hot();
- *pg->gorilla.writer = gorilla_writer_init(gbuf, GORILLA_BUFFER_SLOTS);
+ *pg->gorilla.writer = gorilla_writer_init(gbuf, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
pg->gorilla.num_buffers = 1;
break;
}
default:
- fatal("Unknown page type: %uc", type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
+ aral_freez(pgd_alloc_globals.aral_pgd, pg);
+ pg = PGD_EMPTY;
+ break;
}
return pg;
@@ -219,8 +222,8 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
switch (type)
{
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pg->raw.size = size;
pg->used = size / page_type_size[type];
pg->slots = pg->used;
@@ -228,10 +231,11 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
pg->raw.data = pgd_data_aral_alloc(size);
memcpy(pg->raw.data, base, size);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
internal_fatal(size == 0, "Asked to create page with 0 data!!!");
internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size");
- internal_fatal(size % GORILLA_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes", GORILLA_BUFFER_SIZE);
+ internal_fatal(size % RRDENG_GORILLA_32BIT_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes",
+ RRDENG_GORILLA_32BIT_BUFFER_SIZE);
pg->raw.data = mallocz(size);
pg->raw.size = size;
@@ -246,7 +250,10 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
pg->slots = pg->used;
break;
default:
- fatal("Unknown page type: %uc", type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
+ aral_freez(pgd_alloc_globals.aral_pgd, pg);
+ pg = PGD_EMPTY;
+ break;
}
return pg;
@@ -262,11 +269,11 @@ void pgd_free(PGD *pg)
switch (pg->type)
{
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pgd_data_aral_free(pg->raw.data, pg->raw.size);
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK)
{
internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data");
@@ -306,7 +313,8 @@ void pgd_free(PGD *pg)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
aral_freez(pgd_alloc_globals.aral_pgd, pg);
@@ -358,20 +366,21 @@ uint32_t pgd_memory_footprint(PGD *pg)
size_t footprint = 0;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
footprint = sizeof(PGD) + pg->raw.size;
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK)
footprint = sizeof(PGD) + pg->raw.size;
else
- footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE);
+ footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE);
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
return footprint;
@@ -385,15 +394,15 @@ uint32_t pgd_disk_footprint(PGD *pg)
size_t size = 0;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
uint32_t used_size = pg->used * page_type_size[pg->type];
internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size");
size = used_size;
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR ||
pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING ||
pg->states & PGD_STATE_FLUSHED_TO_DISK)
@@ -404,7 +413,7 @@ uint32_t pgd_disk_footprint(PGD *pg)
internal_fatal(pg->gorilla.num_buffers == 0,
"Gorilla writer does not have any buffers");
- size = pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE;
+ size = pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE;
if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) {
global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer));
@@ -419,7 +428,8 @@ uint32_t pgd_disk_footprint(PGD *pg)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK,
@@ -434,11 +444,11 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
pgd_disk_footprint(pg), dst_size);
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
memcpy(dst, pg->raw.data, dst_size);
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0)
fatal("Copying to extent is supported only for PGDs that are scheduled for flushing.");
@@ -456,7 +466,8 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
pg->states = PGD_STATE_FLUSHED_TO_DISK;
@@ -490,7 +501,7 @@ void pgd_append_point(PGD *pg,
fatal("Data collection on page already scheduled for flushing");
switch (pg->type) {
- case PAGE_METRICS: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
storage_number *tier0_metric_data = (storage_number *)pg->raw.data;
storage_number t = pack_storage_number(n, flags);
tier0_metric_data[pg->used++] = t;
@@ -500,7 +511,7 @@ void pgd_append_point(PGD *pg,
break;
}
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data;
storage_number_tier1_t t;
t.sum_value = (float) n;
@@ -515,7 +526,7 @@ void pgd_append_point(PGD *pg,
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
pg->used++;
storage_number t = pack_storage_number(n, flags);
@@ -525,9 +536,9 @@ void pgd_append_point(PGD *pg,
bool ok = gorilla_writer_write(pg->gorilla.writer, t);
if (!ok) {
gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
- memset(new_buffer, 0, GORILLA_BUFFER_SIZE);
+ memset(new_buffer, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);
- gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, GORILLA_BUFFER_SLOTS);
+ gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
pg->gorilla.num_buffers += 1;
global_statistics_gorilla_buffer_add_hot();
@@ -537,7 +548,7 @@ void pgd_append_point(PGD *pg,
break;
}
default:
- fatal("DBENGINE: unknown page type id %d", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
break;
}
}
@@ -550,11 +561,11 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position)
PGD *pg = pgdc->pgd;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pgdc->slots = pgdc->pgd->used;
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
pgdc->slots = pgdc->pgd->slots;
pgdc->gr = gorilla_reader_init((void *) pg->raw.data);
@@ -588,7 +599,7 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position)
break;
}
default:
- fatal("DBENGINE: unknown page type id %d", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
break;
}
}
@@ -612,7 +623,7 @@ void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position)
pgdc_seek(pgdc, position);
}
-bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp)
+bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position __maybe_unused, STORAGE_POINT *sp)
{
if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots)
{
@@ -624,7 +635,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
switch (pgdc->pgd->type)
{
- case PAGE_METRICS: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
storage_number *array = (storage_number *) pgdc->pgd->raw.data;
storage_number n = array[pgdc->position++];
@@ -635,7 +646,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
return true;
}
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data;
storage_number_tier1_t n = array[pgdc->position++];
@@ -648,7 +659,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
return true;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
pgdc->position++;
uint32_t n = 666666666;
@@ -668,7 +679,8 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
static bool logged = false;
if (!logged)
{
- netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", pgd_type(pgdc->pgd));
+ netdata_log_error("DBENGINE: unknown page type %"PRIu32" found. Cannot decode it. Ignoring its metrics.",
+ pgd_type(pgdc->pgd));
logged = true;
}
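
On the read side, the renamed page types are consumed through the PGDC cursor shown above: pgdc_reset() positions the cursor on a page and pgdc_get_next_point() pulls STORAGE_POINTs until it reports no more data. Below is a minimal caller sketch, assuming pgd is a page already loaded in memory; the consume_point() helper is hypothetical and the loop shape is implied by the API rather than copied from a real call site.

    PGDC cursor;
    STORAGE_POINT sp;

    pgdc_reset(&cursor, pgd, 0);                       /* start at the first slot */

    for (uint32_t pos = 0; pgdc_get_next_point(&cursor, pos, &sp); pos++) {
        /* sp holds the decoded value(s) for this slot */
        consume_point(&sp);                            /* hypothetical consumer */
    }
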
diff --git a/database/engine/page.h b/src/database/engine/page.h
index 32c87c580..32c87c580 100644
--- a/database/engine/page.h
+++ b/src/database/engine/page.h
diff --git a/database/engine/page_test.cc b/src/database/engine/page_test.cc
index d61299bc4..d61299bc4 100644
--- a/database/engine/page_test.cc
+++ b/src/database/engine/page_test.cc
diff --git a/database/engine/page_test.h b/src/database/engine/page_test.h
index 30837f0ab..30837f0ab 100644
--- a/database/engine/page_test.h
+++ b/src/database/engine/page_test.h
diff --git a/database/engine/pagecache.c b/src/database/engine/pagecache.c
index dab9cdd0d..452fdc50b 100644
--- a/database/engine/pagecache.c
+++ b/src/database/engine/pagecache.c
@@ -222,7 +222,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
Word_t metric_id = mrg_metric_id(main_mrg, metric);
time_t now_s = wanted_start_time_s;
- time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
+ uint32_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
if(!dt_s)
dt_s = default_rrd_update_every;
@@ -246,7 +246,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
- time_t page_update_every_s = pgc_page_update_every_s(page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(page);
if(!page_update_every_s)
page_update_every_s = dt_s;
@@ -282,7 +282,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
pd->metric_id = metric_id;
pd->first_time_s = page_start_time_s;
pd->last_time_s = page_end_time_s;
- pd->update_every_s = (uint32_t) page_update_every_s;
+ pd->update_every_s = page_update_every_s;
pd->page = (open_cache_mode) ? NULL : page;
pd->status |= tags;
@@ -332,8 +332,8 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) {
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
- mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+ time_t db_first_time_s, db_last_time_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL);
if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE)
return;
@@ -547,7 +547,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
if(prc == PAGE_IS_IN_THE_FUTURE)
break;
- time_t page_update_every_s = page_entry_in_journal->update_every_s;
+ uint32_t page_update_every_s = page_entry_in_journal->update_every_s;
size_t page_length = page_entry_in_journal->page_length;
if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item
@@ -567,7 +567,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
.metric_id = metric_id,
.start_time_s = page_first_time_s,
.end_time_s = page_last_time_s,
- .update_every_s = (uint32_t) page_update_every_s,
+ .update_every_s = page_update_every_s,
.data = datafile,
.size = 0,
.custom_data = (uint8_t *) &ei,
@@ -845,7 +845,7 @@ struct pgc_page *pg_cache_lookup_next(
struct rrdengine_instance *ctx,
PDC *pdc,
time_t now_s,
- time_t last_update_every_s,
+ uint32_t last_update_every_s,
size_t *entries
) {
if (unlikely(!pdc))
@@ -905,7 +905,7 @@ struct pgc_page *pg_cache_lookup_next(
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
- time_t page_update_every_s = pgc_page_update_every_s(page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(page);
if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
@@ -918,7 +918,7 @@ struct pgc_page *pg_cache_lookup_next(
if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
- pd->update_every_s = (uint32_t) page_update_every_s;
+ pd->update_every_s = page_update_every_s;
}
size_t entries_by_size = pgd_slots_used(pgc_page_data(page));
@@ -983,7 +983,7 @@ struct pgc_page *pg_cache_lookup_next(
return page;
}
-void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s,
+void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s,
struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) {
if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item
@@ -1003,7 +1003,7 @@ void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s
.metric_id = metric_id,
.start_time_s = start_time_s,
.end_time_s = end_time_s,
- .update_every_s = (uint32_t) update_every_s,
+ .update_every_s = update_every_s,
.size = 0,
.data = datafile,
.custom_data = (uint8_t *) &ext_io_data,
diff --git a/database/engine/pagecache.h b/src/database/engine/pagecache.h
index dbcbea53a..103d36484 100644
--- a/database/engine/pagecache.h
+++ b/src/database/engine/pagecache.h
@@ -14,8 +14,6 @@ extern struct pgc *extent_cache;
struct rrdengine_instance;
#define INVALID_TIME (0)
-#define MAX_PAGE_CACHE_FETCH_RETRIES (3)
-#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3)
extern struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats;
@@ -54,9 +52,9 @@ struct page_details_control;
void rrdeng_prep_wait(struct page_details_control *pdc);
void rrdeng_prep_query(struct page_details_control *pdc, bool worker);
void pg_cache_preload(struct rrdeng_query_handle *handle);
-struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries);
+struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, uint32_t last_update_every_s, size_t *entries);
void pgc_and_mrg_initialize(void);
-void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length);
+void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length);
#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/pdc.c b/src/database/engine/pdc.c
index 5fe205e64..79a424b77 100644
--- a/database/engine/pdc.c
+++ b/src/database/engine/pdc.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "pdc.h"
+#include "dbengine-compression.h"
struct extent_page_details_list {
uv_file file;
@@ -628,24 +629,25 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
buffer_strcat(wb, "STEP_UNALIGNED");
}
-inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
+inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) {
time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
- time_t end_time_s;
- size_t entries;
+ time_t end_time_s = 0;
+ size_t entries = 0;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = descr->end_time_ut / USEC_PER_SEC;
entries = 0;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = start_time_s + descr->gorilla.delta_time_s;
entries = descr->gorilla.entries;
break;
default:
- fatal("Unknown page type: %uc\n", descr->type);
+ // Nothing to do. Validate page will notify the user.
+ break;
}
return validate_page(
@@ -666,29 +668,30 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
uuid_t *uuid,
time_t start_time_s,
time_t end_time_s,
- time_t update_every_s, // can be zero, if unknown
+ uint32_t update_every_s, // can be zero, if unknown
size_t page_length,
uint8_t page_type,
size_t entries, // can be zero, if unknown
time_t now_s, // can be zero, to disable future timestamp check
- time_t overwrite_zero_update_every_s, // can be zero, if unknown
+ uint32_t overwrite_zero_update_every_s, // can be zero, if unknown
bool have_read_error,
const char *msg,
- RRDENG_COLLECT_PAGE_FLAGS flags) {
-
+ RRDENG_COLLECT_PAGE_FLAGS flags)
+{
VALIDATED_PAGE_DESCRIPTOR vd = {
.start_time_s = start_time_s,
.end_time_s = end_time_s,
.update_every_s = update_every_s,
.page_length = page_length,
+ .point_size = page_type_size[page_type],
.type = page_type,
.is_valid = true,
};
- vd.point_size = page_type_size[vd.type];
+ bool known_page_type = true;
switch (page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
// always calculate entries by size
vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
@@ -696,13 +699,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
if(!entries)
entries = vd.entries;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
internal_fatal(entries == 0, "0 number of entries found on gorilla page");
vd.entries = entries;
break;
default:
- // TODO: should set vd.is_valid false instead?
- fatal("Unknown page type: %uc", page_type);
+ known_page_type = false;
+ break;
}
// allow to be called without update every (when loading pages from disk)
@@ -723,16 +726,16 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
// If gorilla can not compress the data we might end up needing slightly more
// than 4KiB. However, gorilla pages extend the page length by increments of
// 512 bytes.
- max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
+ max_page_length += ((page_type == RRDENG_PAGE_TYPE_GORILLA_32BIT) * RRDENG_GORILLA_32BIT_BUFFER_SIZE);
- if( have_read_error ||
+ if (!known_page_type ||
+ have_read_error ||
vd.page_length == 0 ||
vd.page_length > max_page_length ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
vd.start_time_s <= 0 ||
vd.end_time_s <= 0 ||
- vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
(vd.update_every_s == 0 && vd.entries > 1))
{
@@ -791,13 +794,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s invalid page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)",
uuid_str, msg, vd.type,
vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
);
}
else {
- const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
+ const char *err_valid = "";
const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
@@ -811,9 +814,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), "
"found inconsistent - the right is "
- "from %ld to %ld, update every %ld, page length %zu, entries %zu: "
+ "from %ld to %ld, update every %u, page length %zu, entries %zu: "
"%s%s%s%s%s%s%s",
uuid_str, msg, vd.type,
start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
@@ -871,11 +874,11 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if (descr) {
start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
break;
}
@@ -938,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data(
PDC_PAGE_STATUS tags,
bool cached_extent)
{
- int ret;
unsigned i, count;
void *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
@@ -973,18 +975,17 @@ static bool epdl_populate_pages_from_extent_data(
if( !can_use_data ||
count < 1 ||
count > MAX_PAGES_PER_EXTENT ||
- (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) ||
+ !dbengine_valid_compression_algorithm(header->compression_algorithm) ||
(payload_length != trailer_offset - payload_offset) ||
(data_length != payload_offset + payload_length + sizeof(*trailer))
- ) {
+ ) {
epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID");
return false;
}
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
- if (unlikely(ret)) {
+ if (unlikely(crc32cmp(trailer->checksum, crc))) {
ctx_io_error(ctx);
have_read_error = true;
epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
@@ -993,14 +994,15 @@ static bool epdl_populate_pages_from_extent_data(
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);
- if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
+ if (likely(!have_read_error && RRDENG_COMPRESSION_NONE != header->compression_algorithm)) {
// find the uncompressed extent size
uncompressed_payload_length = 0;
for (i = 0; i < count; ++i) {
size_t page_length = header->descr[i].page_length;
- if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
- (header->descr[i].type == PAGE_GORILLA_METRICS &&
- (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
+ if (page_length > RRDENG_BLOCK_SIZE &&
+ (header->descr[i].type != RRDENG_PAGE_TYPE_GORILLA_32BIT ||
+ (header->descr[i].type == RRDENG_PAGE_TYPE_GORILLA_32BIT &&
+ (page_length - RRDENG_BLOCK_SIZE) % RRDENG_GORILLA_32BIT_BUFFER_SIZE))) {
have_read_error = true;
break;
}
@@ -1015,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data(
eb = extent_buffer_get(uncompressed_payload_length);
uncompressed_buf = eb->data;
- ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
- (int) payload_length, (int) uncompressed_payload_length);
+ size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset,
+ uncompressed_payload_length, payload_length,
+ header->compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
+ if(!bytes)
+ have_read_error = true;
+ else {
+ __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED);
+ }
}
}
@@ -1075,7 +1082,7 @@ static bool epdl_populate_pages_from_extent_data(
stats_load_invalid_page++;
}
else {
- if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ if (RRDENG_COMPRESSION_NONE == header->compression_algorithm) {
pgd = pgd_create_from_disk_data(header->descr[i].type,
data + payload_offset + page_offset,
vd.page_length);
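
validate_page() above now folds unknown page types into the same rejection path as read errors and bad timestamps instead of calling fatal(). The acceptance rule it ends up enforcing can be written as one predicate; the helper below is a paraphrase of the checks in the patch, with a hypothetical name.

    static bool page_is_acceptable(const VALIDATED_PAGE_DESCRIPTOR *vd,
                                   bool known_type, bool read_error,
                                   size_t max_page_length, time_t now_s) {
        return  known_type && !read_error &&
                vd->page_length > 0 && vd->page_length <= max_page_length &&
                vd->start_time_s > 0 && vd->end_time_s > 0 &&
                vd->start_time_s <= vd->end_time_s &&
                (!now_s || vd->end_time_s <= now_s) &&           /* no future data */
                !(vd->start_time_s == vd->end_time_s && vd->entries > 1) &&
                !(vd->update_every_s == 0 && vd->entries > 1);
    }

For gorilla pages, max_page_length already includes the one extra RRDENG_GORILLA_32BIT_BUFFER_SIZE of slack the patch grants.
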
diff --git a/database/engine/pdc.h b/src/database/engine/pdc.h
index 9bae39ade..9bae39ade 100644
--- a/database/engine/pdc.h
+++ b/src/database/engine/pdc.h
diff --git a/database/engine/rrddiskprotocol.h b/src/database/engine/rrddiskprotocol.h
index 86b41f0b3..dc1a4c980 100644
--- a/database/engine/rrddiskprotocol.h
+++ b/src/database/engine/rrddiskprotocol.h
@@ -19,13 +19,16 @@
#define UUID_SZ (16)
#define CHECKSUM_SZ (4) /* CRC32 */
-#define RRD_NO_COMPRESSION (0)
-#define RRD_LZ4 (1)
+#define RRDENG_COMPRESSION_NONE (0)
+#define RRDENG_COMPRESSION_LZ4 (1)
+#define RRDENG_COMPRESSION_ZSTD (2)
#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+
/*
* Data file persistent super-block
*/
+
struct rrdeng_df_sb {
char magic_number[RRDENG_MAGIC_SZ];
char version[RRDENG_VER_SZ];
@@ -36,10 +39,11 @@ struct rrdeng_df_sb {
/*
* Page types
*/
-#define PAGE_METRICS (0)
-#define PAGE_TIER (1)
-#define PAGE_GORILLA_METRICS (2)
-#define PAGE_TYPE_MAX 2 // Maximum page type (inclusive)
+
+#define RRDENG_PAGE_TYPE_ARRAY_32BIT (0)
+#define RRDENG_PAGE_TYPE_ARRAY_TIER1 (1)
+#define RRDENG_PAGE_TYPE_GORILLA_32BIT (2)
+#define RRDENG_PAGE_TYPE_MAX (2) // Maximum page type (inclusive)
/*
* Data file page descriptor
diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c
index b82cc1ad1..7b2137436 100644
--- a/database/engine/rrdengine.c
+++ b/src/database/engine/rrdengine.c
@@ -3,6 +3,7 @@
#include "rrdengine.h"
#include "pdc.h"
+#include "dbengine-compression.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
@@ -229,7 +230,7 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
worker_is_idle();
}
-static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) {
+static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) {
struct rrdeng_work *work_request = NULL;
internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread");
@@ -240,8 +241,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com
work_request->ctx = ctx;
work_request->data = data;
work_request->completion = completion;
- work_request->work_cb = work_cb;
- work_request->after_work_cb = after_work_cb;
+ work_request->work_cb = do_work_cb;
+ work_request->after_work_cb = do_after_work_cb;
work_request->opcode = opcode;
if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
@@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
*/
static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
int ret;
- int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
- uint32_t uncompressed_payload_length, payload_offset;
+ uint32_t uncompressed_payload_length, max_compressed_size, payload_offset;
struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *xt_io_descr;
- struct extent_buffer *eb = NULL;
- void *compressed_buf = NULL;
Word_t Index;
uint8_t compression_algorithm = ctx->config.global_compress_alg;
struct rrdengine_datafile *datafile;
@@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
xt_io_descr = extent_io_descriptor_get();
xt_io_descr->ctx = ctx;
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
- switch (compression_algorithm) {
- case RRD_NO_COMPRESSION:
- size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
- break;
-
- default: /* Compress */
- fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
- max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
- eb = extent_buffer_get(max_compressed_size);
- compressed_buf = eb->data;
- size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
- break;
- }
-
+ max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer);
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
@@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos = 0;
header = xt_io_descr->buf;
- header->compression_algorithm = compression_algorithm;
header->number_of_pages = count;
pos += sizeof(*header);
@@ -844,11 +829,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
header->descr[i].start_time_ut = descr->start_time_ut;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
header->descr[i].end_time_ut = descr->end_time_ut;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
break;
@@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos += sizeof(header->descr[i]);
}
+
+ // build the extent payload
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
- if(likely(compression_algorithm == RRD_LZ4)) {
- compressed_size = LZ4_compress_default(
- xt_io_descr->buf + payload_offset,
- compressed_buf,
- (int)uncompressed_payload_length,
- max_compressed_size);
+ // compress the payload
+    size_t compressed_size =
+        dbengine_compress(xt_io_descr->buf + payload_offset,
+                          uncompressed_payload_length,
+                          compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
+ internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed");
+ internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent");
- (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- extent_buffer_release(eb);
- size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ if(compressed_size) {
+ header->compression_algorithm = compression_algorithm;
header->payload_length = compressed_size;
}
- else { // RRD_NO_COMPRESSION
- header->payload_length = uncompressed_payload_length;
+ else {
+ // compression failed, or generated bigger pages
+ // so it didn't touch our uncompressed buffer
+ header->compression_algorithm = RRDENG_COMPRESSION_NONE;
+ header->payload_length = compressed_size = uncompressed_payload_length;
+ }
+
+ // set the correct size
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+
+ if(compression_algorithm != RRDENG_COMPRESSION_NONE) {
+ __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
}
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
@@ -1171,7 +1167,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
for (size_t index = 0; index < added; ++index) {
uuid_first_t_entry = &uuid_first_entry_list[index];
if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
- mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+
+ time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+
+ bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+ if (changed) {
+ uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) {
+ uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+ }
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
}
else {
@@ -1180,6 +1186,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
// there is no retention for this metric
bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
if (!has_retention) {
+ time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && first_time_s && last_time_s) {
+                    uint64_t remove_samples = (last_time_s - first_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+
bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
if(deleted)
deleted_metrics++;
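
The rewritten datafile_extent_build() above delegates to the new dbengine-compression helpers: the extent buffer is sized for MAX(uncompressed_payload_length, dbengine_max_compressed_size(...)), dbengine_compress() works in place, and a zero return means compression failed or would not shrink the payload, so it is stored verbatim. The helper below condenses that writer-side decision; the function name and out-parameter are hypothetical, only the calls and constants come from this patch.

    /* Hypothetical condensation of the decision made in datafile_extent_build(). */
    static uint32_t compress_extent_payload(uint8_t *payload, uint32_t uncompressed_len,
                                            uint8_t algorithm, uint8_t *used_algorithm) {
        /* dbengine_compress() compresses in place and returns 0 when it fails
           or would grow the data, leaving the buffer untouched. */
        size_t compressed = dbengine_compress(payload, uncompressed_len, algorithm);

        if (compressed) {
            *used_algorithm = algorithm;              /* RRDENG_COMPRESSION_LZ4 / _ZSTD */
            return (uint32_t)compressed;
        }

        *used_algorithm = RRDENG_COMPRESSION_NONE;    /* store the payload verbatim */
        return uncompressed_len;
    }

The on-disk extent size then becomes payload_offset plus the returned length plus sizeof(*trailer), and the before/after compression counters are only updated when a real compression algorithm was used.
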
diff --git a/database/engine/rrdengine.h b/src/database/engine/rrdengine.h
index cd3352f12..3047e0c6a 100644
--- a/database/engine/rrdengine.h
+++ b/src/database/engine/rrdengine.h
@@ -153,9 +153,9 @@ struct jv2_metrics_info {
struct jv2_page_info {
time_t start_time_s;
time_t end_time_s;
- time_t update_every_s;
- size_t page_length;
+ uint32_t update_every_s;
uint32_t extent_index;
+ size_t page_length;
void *custom_data;
// private
@@ -217,7 +217,7 @@ struct rrdeng_query_handle {
// internal data
time_t now_s;
- time_t dt_s;
+ uint32_t dt_s;
unsigned position;
unsigned entries;
@@ -387,6 +387,8 @@ struct rrdengine_instance {
unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file)
time_t first_time_s;
+ uint64_t metrics;
+ uint64_t samples;
} atomic;
struct {
@@ -482,7 +484,7 @@ struct page_descr_with_data *page_descriptor_get(void);
typedef struct validated_page_descriptor {
time_t start_time_s;
time_t end_time_s;
- time_t update_every_s;
+ uint32_t update_every_s;
size_t page_length;
size_t point_size;
size_t entries;
@@ -499,16 +501,16 @@ typedef struct validated_page_descriptor {
VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid,
time_t start_time_s,
time_t end_time_s,
- time_t update_every_s,
+ uint32_t update_every_s,
size_t page_length,
uint8_t page_type,
size_t entries,
time_t now_s,
- time_t overwrite_zero_update_every_s,
+ uint32_t overwrite_zero_update_every_s,
bool have_read_error,
const char *msg,
RRDENG_COLLECT_PAGE_FLAGS flags);
-VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error);
+VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error);
void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags);
typedef enum {
diff --git a/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c
index 1ddce5243..43fed492b 100755
--- a/database/engine/rrdengineapi.c
+++ b/src/database/engine/rrdengineapi.c
@@ -2,6 +2,7 @@
#include "database/engine/rrddiskprotocol.h"
#include "rrdengine.h"
+#include "dbengine-compression.h"
/* Default global database instance */
struct rrdengine_instance multidb_ctx_storage_tier0;
@@ -16,7 +17,12 @@ struct rrdengine_instance multidb_ctx_storage_tier4;
#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
#endif
struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
-uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};
+uint8_t tier_page_type[RRD_STORAGE_TIERS] = {
+ RRDENG_PAGE_TYPE_GORILLA_32BIT,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1};
#if defined(ENV32BIT)
size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
@@ -24,14 +30,14 @@ size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384};
#endif
-#if PAGE_TYPE_MAX != 2
+#if RRDENG_PAGE_TYPE_MAX != 2
#error PAGE_TYPE_MAX is not 2 - you need to add allocations here
#endif
size_t page_type_size[256] = {
- [PAGE_METRICS] = sizeof(storage_number),
- [PAGE_TIER] = sizeof(storage_number_tier1_t),
- [PAGE_GORILLA_METRICS] = sizeof(storage_number)
+ [RRDENG_PAGE_TYPE_ARRAY_32BIT] = sizeof(storage_number),
+ [RRDENG_PAGE_TYPE_ARRAY_TIER1] = sizeof(storage_number_tier1_t),
+ [RRDENG_PAGE_TYPE_GORILLA_32BIT] = sizeof(storage_number)
};
__attribute__((constructor)) void initialize_multidb_ctx(void) {
@@ -74,14 +80,14 @@ static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) {
}
// charts call this
-STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si __maybe_unused, uuid_t *uuid __maybe_unused) {
struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment));
rrdeng_page_alignment_acquire(pa);
return (STORAGE_METRICS_GROUP *)pa;
}
// charts call this
-void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) {
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *si __maybe_unused, STORAGE_METRICS_GROUP *smg) {
if(unlikely(!smg)) return;
struct pg_alignment *pa = (struct pg_alignment *)smg;
@@ -108,8 +114,8 @@ void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *si, const char *rd_id, const char *st_id) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
uuid_t legacy_uuid;
rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
return mrg_metric_get_and_acquire(main_mrg, &legacy_uuid, (Word_t) ctx);
@@ -118,25 +124,25 @@ static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const cha
// ----------------------------------------------------------------------------
// metric handle
-void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
mrg_metric_release(main_mrg, metric);
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
return (STORAGE_METRIC_HANDLE *) mrg_metric_dup(main_mrg, metric);
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return (STORAGE_METRIC_HANDLE *) mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
}
-static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
- internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
+static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *si, uuid_t *uuid) {
+ internal_fatal(!si, "DBENGINE: STORAGE_INSTANCE is NULL");
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
MRG_ENTRY entry = {
.uuid = uuid,
.section = (Word_t)ctx,
@@ -145,12 +151,15 @@ static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid)
.latest_update_every_s = 0,
};
- METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, NULL);
+ bool added;
+ METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ if (added)
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
return metric;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
METRIC *metric;
metric = mrg_metric_get_and_acquire(main_mrg, &rd->metric_uuid, (Word_t) ctx);
@@ -160,13 +169,13 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
// this is a single host database
// generate uuid from the chart and dimensions ids
// and overwrite the one supplied by rrddim
- metric = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
+ metric = rrdeng_metric_get_legacy(si, rrddim_id(rd), rrdset_id(rd->rrdset));
if (metric)
uuid_copy(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric));
}
if(likely(!metric))
- metric = rrdeng_metric_create(db_instance, &rd->metric_uuid);
+ metric = rrdeng_metric_create(si, &rd->metric_uuid);
}
#ifdef NETDATA_INTERNAL_CHECKS
@@ -192,14 +201,14 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
// collect ops
static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle *handle) {
- if(unlikely((time_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) {
- internal_error(true, "DBENGINE: collection handle has update every %ld, but the metric registry has %ld. Fixing it.",
- (time_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric));
+ if(unlikely((uint32_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) {
+ internal_error(true, "DBENGINE: collection handle has update every %u, but the metric registry has %u. Fixing it.",
+ (uint32_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric));
if(unlikely(!handle->update_every_ut))
handle->update_every_ut = (usec_t)mrg_metric_get_update_every_s(main_mrg, handle->metric) * USEC_PER_SEC;
else
- mrg_metric_set_update_every(main_mrg, handle->metric, (time_t)(handle->update_every_ut / USEC_PER_SEC));
+ mrg_metric_set_update_every(main_mrg, handle->metric, (uint32_t)(handle->update_every_ut / USEC_PER_SEC));
}
}
@@ -213,7 +222,7 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle
uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric);
time_t start_time_s = pgc_page_start_time_s(handle->pgc_page);
time_t end_time_s = pgc_page_end_time_s(handle->pgc_page);
- time_t update_every_s = pgc_page_update_every_s(handle->pgc_page);
+ uint32_t update_every_s = pgc_page_update_every_s(handle->pgc_page);
size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx);
size_t entries = handle->page_position;
time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
@@ -245,8 +254,8 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
- METRIC *metric = (METRIC *)db_metric_handle;
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
+ METRIC *metric = (METRIC *)smh;
struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
bool is_1st_metric_writer = true;
@@ -262,7 +271,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
struct rrdeng_collect_handle *handle;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
- handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ handle->common.seb = STORAGE_ENGINE_BACKEND_DBENGINE;
handle->metric = metric;
handle->pgc_page = NULL;
@@ -288,15 +297,15 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
// data collection may be able to go back in time and during the addition of new pages
// clean pages may be found matching ours!
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
- mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+ time_t db_first_time_s, db_last_time_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL);
handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC;
return (STORAGE_COLLECT_HANDLE *)handle;
}
-void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
if (unlikely(!handle->pgc_page))
return;
@@ -307,7 +316,17 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
else {
check_completed_page_consistency(handle);
mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->pgc_page));
- pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page);
+
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+ time_t start_time_s = pgc_page_start_time_s(handle->pgc_page);
+ time_t end_time_s = pgc_page_end_time_s(handle->pgc_page);
+ uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, handle->metric);
+ if (end_time_s && start_time_s && end_time_s > start_time_s && update_every_s) {
+ uint64_t add_samples = (end_time_s - start_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, add_samples, __ATOMIC_RELAXED);
+ }
+
+ pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page, false);
}
mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0);
@@ -336,7 +355,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
PGD *data,
size_t data_size) {
time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
- const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
+ const uint32_t update_every_s = (uint32_t)(handle->update_every_ut / USEC_PER_SEC);
PGC_ENTRY page_entry = {
.section = (Word_t) ctx,
@@ -345,7 +364,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
.end_time_s = point_in_time_s,
.size = data_size,
.data = data,
- .update_every_s = (uint32_t) update_every_s,
+ .update_every_s = update_every_s,
.hot = true
};
@@ -364,11 +383,11 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
nd_log_limit_static_global_var(erl, 1, 0);
nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
#endif
- "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache "
- "with existing %s%s page from %ld to %ld, update every %ld - "
+ "DBENGINE: metric '%s' new page from %ld to %ld, update every %u, has a conflict in main cache "
+ "with existing %s%s page from %ld to %ld, update every %u - "
"is it collected more than once?",
uuid,
- page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s,
+ page_entry.start_time_s, page_entry.end_time_s, page_entry.update_every_s,
pgc_is_page_hot(pgc_page) ? "hot" : "not-hot",
pgc_page_data(pgc_page) == PGD_EMPTY ? " gap" : "",
pgc_page_start_time_s(pgc_page), pgc_page_end_time_s(pgc_page), pgc_page_update_every_s(pgc_page)
@@ -444,14 +463,14 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz
*data_size = size;
switch (ctx->config.page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
d = pgd_create(ctx->config.page_type, slots);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
// ignore slots, and use the fixed number of slots per gorilla buffer.
// gorilla will automatically add more buffers if needed.
- d = pgd_create(ctx->config.page_type, GORILLA_BUFFER_SLOTS);
+ d = pgd_create(ctx->config.page_type, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
break;
default:
fatal("Unknown page type: %uc\n", ctx->config.page_type);
@@ -461,7 +480,7 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz
return d;
}
-static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle,
+static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *sch,
const usec_t point_in_time_ut,
const NETDATA_DOUBLE n,
const NETDATA_DOUBLE min_value,
@@ -470,7 +489,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
const uint16_t anomaly_count,
const SN_FLAGS flags)
{
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
if(unlikely(!handle->page_data))
@@ -497,7 +516,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
if(unlikely(++handle->page_position >= handle->page_entries_max)) {
internal_fatal(handle->page_position > handle->page_entries_max, "DBENGINE: exceeded page max number of points");
handle->page_flags |= RRDENG_PAGE_FULL;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
}
@@ -543,7 +562,7 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m
#endif
}
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch,
const usec_t point_in_time_ut,
const NETDATA_DOUBLE n,
const NETDATA_DOUBLE min_value,
@@ -554,7 +573,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
{
timing_step(TIMING_STEP_RRDSET_STORE_METRIC);
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(point_in_time_ut > (usec_t)max_acceptable_collected_time() * USEC_PER_SEC))
@@ -571,11 +590,11 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if(handle->pgc_page) {
if (unlikely(delta_ut < handle->update_every_ut)) {
handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else if (unlikely(delta_ut % handle->update_every_ut)) {
handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else {
size_t points_gap = delta_ut / handle->update_every_ut;
@@ -583,7 +602,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if (points_gap >= page_remaining_points) {
handle->page_flags |= RRDENG_PAGE_BIG_GAP;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else {
// loop to fill the gap
@@ -594,7 +613,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
this_ut <= stop_ut;
this_ut = handle->page_end_time_ut + handle->update_every_ut) {
rrdeng_store_metric_append_point(
- collection_handle,
+ sch,
this_ut,
NAN, NAN, NAN,
1, 0,
@@ -618,7 +637,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK);
- rrdeng_store_metric_append_point(collection_handle,
+ rrdeng_store_metric_append_point(sch,
point_in_time_ut,
n, min_value, max_value,
count, anomaly_count,
@@ -629,12 +648,12 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
* Releases the database reference from the handle for storing metrics.
* Returns 1 if it's safe to delete the dimension.
*/
-int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
handle->page_flags |= RRDENG_PAGE_COLLECT_FINALIZE;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
rrdeng_page_alignment_release(handle->alignment);
__atomic_sub_fetch(&ctx->atomic.collectors_running, 1, __ATOMIC_RELAXED);
@@ -644,8 +663,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_clear_writer(main_mrg, handle->metric))
internal_fatal(true, "DBENGINE: metric is already released");
- time_t first_time_s, last_time_s, update_every_s;
- mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s);
+ time_t first_time_s, last_time_s;
+ mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, NULL);
mrg_metric_release(main_mrg, handle->metric);
freez(handle);
@@ -656,8 +675,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
return 0;
}
-void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
check_and_fix_mrg_update_every(handle);
METRIC *metric = handle->metric;
@@ -667,7 +686,7 @@ void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *col
return;
handle->page_flags |= RRDENG_PAGE_UPDATE_EVERY_CHANGE;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
mrg_metric_set_update_every(main_mrg, metric, update_every);
handle->update_every_ut = update_every_ut;
}
@@ -704,8 +723,8 @@ static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_u
* Gets a handle for loading metrics from the database.
* The handle must be released with rrdeng_load_metric_final().
*/
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
- struct storage_engine_query_handle *rrddim_handle,
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh,
+ struct storage_engine_query_handle *seqh,
time_t start_time_s,
time_t end_time_s,
STORAGE_PRIORITY priority)
@@ -714,7 +733,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
netdata_thread_disable_cancelability();
- METRIC *metric = (METRIC *)db_metric_handle;
+ METRIC *metric = (METRIC *)smh;
struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
struct rrdeng_query_handle *handle;
@@ -736,7 +755,8 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
// is inserted into the main cache, to avoid scanning the journals
// again for pages matching the gap.
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
+ time_t db_first_time_s, db_last_time_s;
+ uint32_t db_update_every_s;
mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) {
@@ -750,11 +770,11 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every);
}
- rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
- rrddim_handle->start_time_s = handle->start_time_s;
- rrddim_handle->end_time_s = handle->end_time_s;
- rrddim_handle->priority = priority;
- rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ seqh->handle = (STORAGE_QUERY_HANDLE *) handle;
+ seqh->start_time_s = handle->start_time_s;
+ seqh->end_time_s = handle->end_time_s;
+ seqh->priority = priority;
+ seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE;
pg_cache_preload(handle);
@@ -766,16 +786,16 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
handle->now_s = start_time_s;
handle->dt_s = db_update_every_s;
- rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
- rrddim_handle->start_time_s = handle->start_time_s;
- rrddim_handle->end_time_s = 0;
- rrddim_handle->priority = priority;
- rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ seqh->handle = (STORAGE_QUERY_HANDLE *) handle;
+ seqh->start_time_s = handle->start_time_s;
+ seqh->end_time_s = 0;
+ seqh->priority = priority;
+ seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE;
}
}
-static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+static bool rrdeng_load_page_next(struct storage_engine_query_handle *seqh, bool debug_this __maybe_unused) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
if (likely(handle->page)) {
@@ -785,7 +805,7 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
pgdc_reset(&handle->pgdc, NULL, UINT32_MAX);
}
- if (unlikely(handle->now_s > rrddim_handle->end_time_s))
+ if (unlikely(handle->now_s > seqh->end_time_s))
return false;
size_t entries = 0;
@@ -799,7 +819,7 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
time_t page_start_time_s = pgc_page_start_time_s(handle->page);
time_t page_end_time_s = pgc_page_end_time_s(handle->page);
- time_t page_update_every_s = pgc_page_update_every_s(handle->page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(handle->page);
unsigned position;
if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) {
@@ -810,13 +830,13 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
}
else {
position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s);
- time_t point_end_time_s = page_start_time_s + position * page_update_every_s;
+ time_t point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s;
while(point_end_time_s < handle->now_s && position + 1 < entries) {
// https://github.com/netdata/netdata/issues/14411
// we really need a while() here, because the delta may be
// 2 points at higher tiers
position++;
- point_end_time_s = page_start_time_s + position * page_update_every_s;
+ point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s;
}
handle->now_s = point_end_time_s;
}
@@ -845,11 +865,11 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
-STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
STORAGE_POINT sp;
- if (unlikely(handle->now_s > rrddim_handle->end_time_s)) {
+ if (unlikely(handle->now_s > seqh->end_time_s)) {
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
@@ -857,8 +877,8 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
if (unlikely(!handle->page || handle->position >= handle->entries)) {
// We need to get a new page
- if (!rrdeng_load_page_next(rrddim_handle, false)) {
- handle->now_s = rrddim_handle->end_time_s;
+ if (!rrdeng_load_page_next(seqh, false)) {
+ handle->now_s = seqh->end_time_s;
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
@@ -870,7 +890,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
pgdc_get_next_point(&handle->pgdc, handle->position, &sp);
prepare_for_next_iteration:
- internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query");
+ internal_fatal(sp.end_time_s < seqh->start_time_s, "DBENGINE: this point is too old for this query");
internal_fatal(sp.end_time_s < handle->now_s, "DBENGINE: this point is too old for this point in time");
handle->now_s += handle->dt_s;
@@ -879,17 +899,17 @@ prepare_for_next_iteration:
return sp;
}
-int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- return (handle->now_s > rrddim_handle->end_time_s);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
+ return (handle->now_s > seqh->end_time_s);
}
/*
* Releases the database reference from the handle for loading metrics.
*/
-void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle)
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh)
{
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
if (handle->page) {
pgc_page_release(main_cache, handle->page);
@@ -901,24 +921,24 @@ void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_hand
unregister_query_handle(handle);
rrdeng_query_handle_release(handle);
- rrddim_handle->handle = NULL;
+ seqh->handle = NULL;
netdata_thread_enable_cancelability();
}
-time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
if(handle->pdc) {
rrdeng_prep_wait(handle->pdc);
- if (handle->pdc->optimal_end_time_s > rrddim_handle->end_time_s)
- rrddim_handle->end_time_s = handle->pdc->optimal_end_time_s;
+ if (handle->pdc->optimal_end_time_s > seqh->end_time_s)
+ seqh->end_time_s = handle->pdc->optimal_end_time_s;
}
- return rrddim_handle->end_time_s;
+ return seqh->end_time_s;
}
-time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
time_t latest_time_s = 0;
if (metric)
@@ -927,8 +947,8 @@ time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
return latest_time_s;
}
-time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
time_t oldest_time_s = 0;
if (metric)
@@ -937,9 +957,9 @@ time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
return oldest_time_s;
}
-bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s)
+bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s)
{
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
if (unlikely(!ctx)) {
netdata_log_error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
return false;
@@ -949,26 +969,35 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_
if (unlikely(!metric))
return false;
- time_t update_every_s;
- mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s);
+ mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, NULL);
mrg_metric_release(main_mrg, metric);
return true;
}
-uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return ctx->config.max_disk_space;
}
-uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return __atomic_load_n(&ctx->atomic.current_disk_space, __ATOMIC_RELAXED);
}
-time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_metrics(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
+ return __atomic_load_n(&ctx->atomic.metrics, __ATOMIC_RELAXED);
+}
+
+uint64_t rrdeng_samples(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
+ return __atomic_load_n(&ctx->atomic.samples, __ATOMIC_RELAXED);
+}
+
+time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
time_t t = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);
if(t == LONG_MAX || t < 0)
@@ -977,8 +1006,8 @@ time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) {
return t;
}
-size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED);
}
@@ -1099,8 +1128,8 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) {
netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
}
-bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+bool rrdeng_is_legacy(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return ctx->config.legacy;
}
@@ -1142,7 +1171,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
ctx->config.tier = (int)tier;
ctx->config.page_type = tier_page_type[tier];
- ctx->config.global_compress_alg = RRD_LZ4;
+ ctx->config.global_compress_alg = dbengine_default_compression();
if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
ctx->config.max_disk_space = disk_space_mb * 1048576LLU;
@@ -1154,6 +1183,8 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
rw_spinlock_init(&ctx->njfv2idx.spinlock);
ctx->atomic.first_time_s = LONG_MAX;
+ ctx->atomic.metrics = 0;
+ ctx->atomic.samples = 0;
if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) {
// success - we run this ctx too
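
Among the context lines above, the query path in rrdeng_load_page_next() locates the slot inside a page by linear interpolation over the page's time range and then walks forward while the computed point end-time still lags behind the query cursor. A minimal standalone sketch of that arithmetic, with hypothetical names and a fixed per-page step assumed:

    #include <stddef.h>
    #include <time.h>

    // Hypothetical helper: pick the slot in a page whose point covers `now`.
    // The page holds `entries` points spanning [start, end] with a fixed
    // step of update_every seconds.
    static size_t page_slot_for_time(time_t start, time_t end, size_t entries,
                                     uint32_t update_every, time_t now) {
        if (entries <= 1 || end <= start || !update_every || now <= start)
            return 0;
        if (now >= end)
            return entries - 1;

        // First guess by linear interpolation over the page's time range.
        size_t position = (size_t)((now - start) * (time_t)(entries - 1) / (end - start));

        // Walk forward while the guessed point still ends before `now`;
        // at higher tiers the delta may span more than one slot.
        time_t point_end = start + (time_t)position * (time_t)update_every;
        while (point_end < now && position + 1 < entries) {
            position++;
            point_end = start + (time_t)position * (time_t)update_every;
        }
        return position;
    }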
diff --git a/database/engine/rrdengineapi.h b/src/database/engine/rrdengineapi.h
index 7ae0e7079..fb449cd9b 100644
--- a/database/engine/rrdengineapi.h
+++ b/src/database/engine/rrdengineapi.h
@@ -26,32 +26,32 @@ extern uint8_t tier_page_type[];
void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
-void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
-STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
-
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
-void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
-void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n,
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid);
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh);
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh);
+
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch);
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every);
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch, usec_t point_in_time_ut, NETDATA_DOUBLE n,
NETDATA_DOUBLE min_value,
NETDATA_DOUBLE max_value,
uint16_t count,
uint16_t anomaly_count,
SN_FLAGS flags);
-int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch);
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle,
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh, struct storage_engine_query_handle *seqh,
time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
-STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh);
-int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle);
-void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle);
-time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh);
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh);
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh);
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh);
void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
@@ -64,10 +64,10 @@ void rrdeng_exit_mode(struct rrdengine_instance *ctx);
int rrdeng_exit(struct rrdengine_instance *ctx);
void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
-bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s);
+bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s);
-extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
-extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si, uuid_t *uuid);
+extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *si, STORAGE_METRICS_GROUP *smg);
typedef struct rrdengine_size_statistics {
size_t default_granularity_secs;
@@ -221,9 +221,6 @@ struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void);
RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
size_t rrdeng_collectors_running(struct rrdengine_instance *ctx);
-bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance);
-
-uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance);
-uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance);
+bool rrdeng_is_legacy(STORAGE_INSTANCE *si);
#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/src/database/engine/rrdenginelib.c
index dc581d98d..dc581d98d 100644
--- a/database/engine/rrdenginelib.c
+++ b/src/database/engine/rrdenginelib.c
diff --git a/database/engine/rrdenginelib.h b/src/database/engine/rrdenginelib.h
index a0febd4f4..a0febd4f4 100644
--- a/database/engine/rrdenginelib.h
+++ b/src/database/engine/rrdenginelib.h