summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /database/engine
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--database/engine/Makefile.am11
-rw-r--r--database/engine/datafile.ksy74
-rw-r--r--database/engine/journalfile_v2.ksy.in150
-rw-r--r--src/database/engine/README.md (renamed from database/engine/README.md)0
-rw-r--r--src/database/engine/cache.c (renamed from database/engine/cache.c)93
-rw-r--r--src/database/engine/cache.h (renamed from database/engine/cache.h)9
-rw-r--r--src/database/engine/datafile.c (renamed from database/engine/datafile.c)4
-rw-r--r--src/database/engine/datafile.h (renamed from database/engine/datafile.h)0
-rw-r--r--src/database/engine/dbengine-diagram.xml (renamed from database/engine/dbengine-diagram.xml)0
-rw-r--r--src/database/engine/journalfile.c (renamed from database/engine/journalfile.c)29
-rw-r--r--src/database/engine/journalfile.h (renamed from database/engine/journalfile.h)1
-rw-r--r--src/database/engine/metric.c (renamed from database/engine/metric.c)331
-rw-r--r--src/database/engine/metric.h (renamed from database/engine/metric.h)15
-rw-r--r--src/database/engine/page.c (renamed from database/engine/page.c)106
-rw-r--r--src/database/engine/page.h (renamed from database/engine/page.h)0
-rw-r--r--src/database/engine/page_test.cc (renamed from database/engine/page_test.cc)0
-rw-r--r--src/database/engine/page_test.h (renamed from database/engine/page_test.h)0
-rw-r--r--src/database/engine/pagecache.c (renamed from database/engine/pagecache.c)24
-rw-r--r--src/database/engine/pagecache.h (renamed from database/engine/pagecache.h)6
-rw-r--r--src/database/engine/pdc.c (renamed from database/engine/pdc.c)89
-rw-r--r--src/database/engine/pdc.h (renamed from database/engine/pdc.h)0
-rw-r--r--src/database/engine/rrddiskprotocol.h (renamed from database/engine/rrddiskprotocol.h)16
-rw-r--r--src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c)92
-rw-r--r--src/database/engine/rrdengine.h (renamed from database/engine/rrdengine.h)16
-rwxr-xr-xsrc/database/engine/rrdengineapi.c (renamed from database/engine/rrdengineapi.c)269
-rw-r--r--src/database/engine/rrdengineapi.h (renamed from database/engine/rrdengineapi.h)45
-rw-r--r--src/database/engine/rrdenginelib.c (renamed from database/engine/rrdenginelib.c)0
-rw-r--r--src/database/engine/rrdenginelib.h (renamed from database/engine/rrdenginelib.h)0
-rw-r--r--src/go/collectors/go.d.plugin/agent/testdata/agent-empty.conf (renamed from database/engine/metadata_log/README.md)0
29 files changed, 656 insertions, 724 deletions
diff --git a/database/engine/Makefile.am b/database/engine/Makefile.am
deleted file mode 100644
index 59250a997..000000000
--- a/database/engine/Makefile.am
+++ /dev/null
@@ -1,11 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-SUBDIRS = \
- $(NULL)
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
diff --git a/database/engine/datafile.ksy b/database/engine/datafile.ksy
deleted file mode 100644
index 28d4b3935..000000000
--- a/database/engine/datafile.ksy
+++ /dev/null
@@ -1,74 +0,0 @@
-meta:
- id: netdata_datafile
- endian: le
-
-seq:
- - id: hdr
- type: header
- size: 4096
- - id: extents
- type: extent
- repeat: eos
-
-types:
- header:
- seq:
- - id: magic
- contents: "netdata-data-file"
- - id: reserved
- size: 15
- - id: version
- contents: "1.0"
- - id: reserved1
- size: 13
- - id: tier
- type: u1
- extent_page_descr:
- seq:
- - id: type
- type: u1
- enum: page_type
- - id: uuid
- size: 16
- - id: page_len
- type: u4
- - id: start_time_ut
- type: u8
- - id: end_time_ut
- type: u8
- enums:
- page_type:
- 0: metrics
- 1: tier
- extent_header:
- seq:
- - id: payload_length
- type: u4
- - id: compression_algorithm
- type: u1
- enum: compression_algos
- - id: number_of_pages
- type: u1
- - id: page_descriptors
- type: extent_page_descr
- repeat: expr
- repeat-expr: number_of_pages
- enums:
- compression_algos:
- 0: rrd_no_compression
- 1: rrd_lz4
- extent_trailer:
- seq:
- - id: crc32_checksum
- type: u4
- extent:
- seq:
- - id: header
- type: extent_header
- - id: payload
- size: header.payload_length
- - id: trailer
- type: extent_trailer
- - id: padding
- size: (((_io.pos + 4095) / 4096) * 4096) - _io.pos
- # the extent size is made to always be a multiple of 4096
diff --git a/database/engine/journalfile_v2.ksy.in b/database/engine/journalfile_v2.ksy.in
deleted file mode 100644
index 6a656bc45..000000000
--- a/database/engine/journalfile_v2.ksy.in
+++ /dev/null
@@ -1,150 +0,0 @@
-meta:
- id: journalfile_v2`'ifdef(`VIRT_MEMBERS',`_virtmemb')
- endian: le
- application: netdata
- file-extension: njfv2
- license: GPL-3.0-or-later
-
-seq:
- - id: journal_v2_header
- type: journal_v2_header
- size: 4096
- - id: extent_list
- type: journal_v2_extent_list
- repeat: expr
- repeat-expr: journal_v2_header.extent_count
- - id: extent_trailer
- type: journal_v2_block_trailer
- - id: metric_list
- type: journal_v2_metric_list
- repeat: expr
- repeat-expr: journal_v2_header.metric_count
- - id: metric_trailer
- type: journal_v2_block_trailer
- - id: page_blocs
- type: journal_v2_page_block
- repeat: expr
- repeat-expr: _root.journal_v2_header.metric_count
- - id: padding
- size: _root._io.size - _root._io.pos - 4
- - id: journal_file_trailer
- type: journal_v2_block_trailer
-
-types:
- journal_v2_metric_list:
- seq:
- - id: uuid
- size: 16
- - id: entries
- type: u4
- - id: page_offset
- type: u4
- - id: delta_start_s
- type: u4
- - id: delta_end_s
- type: u4
-ifdef(`VIRT_MEMBERS',
-` instances:
- page_block:
- type: journal_v2_page_block
- io: _root._io
- pos: page_offset
-')dnl
- journal_v2_page_hdr:
- seq:
- - id: crc
- type: u4
- - id: uuid_offset
- type: u4
- - id: entries
- type: u4
- - id: uuid
- size: 16
- journal_v2_page_list:
- seq:
- - id: delta_start_s
- type: u4
- - id: delta_end_s
- type: u4
- - id: extent_idx
- type: u4
- - id: update_every_s
- type: u4
- - id: page_len
- type: u2
- - id: type
- type: u1
- - id: reserved
- type: u1
-ifdef(`VIRT_MEMBERS',
-` instances:
- extent:
- io: _root._io
- type: journal_v2_extent_list
- pos: _root.journal_v2_header.extent_offset + (extent_idx * 16)
-')dnl
- journal_v2_header:
- seq:
- - id: magic
- contents: [ 0x19, 0x10, 0x22, 0x01 ] #0x01221019
- - id: reserved
- type: u4
- - id: start_time_ut
- type: u8
- - id: end_time_ut
- type: u8
- - id: extent_count
- type: u4
- - id: extent_offset
- type: u4
- - id: metric_count
- type: u4
- - id: metric_offset
- type: u4
- - id: page_count
- type: u4
- - id: page_offset
- type: u4
- - id: extent_trailer_offset
- type: u4
- - id: metric_trailer_offset
- type: u4
- - id: original_file_size
- type: u4
- - id: total_file_size
- type: u4
- - id: data
- type: u8
-ifdef(`VIRT_MEMBERS',
-` instances:
- trailer:
- io: _root._io
- type: journal_v2_block_trailer
- pos: _root._io.size - 4
-')dnl
- journal_v2_block_trailer:
- seq:
- - id: checksum
- type: u4
- journal_v2_extent_list:
- seq:
- - id: datafile_offset
- type: u8
- - id: datafile_size
- type: u4
- - id: file_idx
- type: u2
- - id: page_cnt
- type: u1
- - id: padding
- type: u1
- journal_v2_page_block:
- seq:
- - id: hdr
- type: journal_v2_page_hdr
- - id: page_list
- type: journal_v2_page_list
- repeat: expr
- repeat-expr: hdr.entries
- - id: block_trailer
- type: journal_v2_block_trailer
diff --git a/database/engine/README.md b/src/database/engine/README.md
index 890018642..890018642 100644
--- a/database/engine/README.md
+++ b/src/database/engine/README.md
diff --git a/database/engine/cache.c b/src/database/engine/cache.c
index eb1c35298..49a9b6b96 100644
--- a/database/engine/cache.c
+++ b/src/database/engine/cache.c
@@ -1325,21 +1325,8 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
return page;
}
-static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
- __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
-
- size_t *stats_hit_ptr, *stats_miss_ptr;
-
- if(method == PGC_SEARCH_CLOSEST) {
- __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED);
- stats_hit_ptr = &cache->stats.searches_closest_hits;
- stats_miss_ptr = &cache->stats.searches_closest_misses;
- }
- else {
- __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED);
- stats_hit_ptr = &cache->stats.searches_exact_hits;
- stats_miss_ptr = &cache->stats.searches_exact_misses;
- }
+static PGC_PAGE *page_find_and_acquire_once(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method, bool *retry) {
+ *retry = false;
PGC_PAGE *page = NULL;
size_t partition = pgc_indexing_partition(cache, metric_id);
@@ -1462,22 +1449,13 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric
if(!page_acquire(cache, page)) {
// this page is not good to use
+ *retry = true;
page = NULL;
}
}
cleanup:
pgc_index_read_unlock(cache, partition);
-
- if(page) {
- __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED);
- page_has_been_accessed(cache, page);
- }
- else
- __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED);
-
- __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
-
return page;
}
@@ -1882,7 +1860,7 @@ void pgc_page_release(PGC *cache, PGC_PAGE *page) {
page_release(cache, page, is_page_clean(page));
}
-void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) {
+void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush) {
__atomic_add_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
//#ifdef NETDATA_INTERNAL_CHECKS
@@ -1901,10 +1879,8 @@ void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page) {
__atomic_sub_fetch(&cache->stats.workers_hot2dirty, 1, __ATOMIC_RELAXED);
// flush, if we have to
- if((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)) {
- flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL,
- false, false);
- }
+ if(!never_flush && ((cache->config.options & PGC_OPTIONS_FLUSH_PAGES_INLINE) || flushing_critical(cache)))
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false);
}
bool pgc_page_to_clean_evict_or_release(PGC *cache, PGC_PAGE *page) {
@@ -1949,13 +1925,13 @@ time_t pgc_page_end_time_s(PGC_PAGE *page) {
return page->end_time_s;
}
-time_t pgc_page_update_every_s(PGC_PAGE *page) {
+uint32_t pgc_page_update_every_s(PGC_PAGE *page) {
return page->update_every_s;
}
-time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s) {
+uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s) {
if(page->update_every_s == 0)
- page->update_every_s = (uint32_t) update_every_s;
+ page->update_every_s = update_every_s;
return page->update_every_s;
}
@@ -2050,7 +2026,46 @@ void pgc_page_hot_set_end_time_s(PGC *cache __maybe_unused, PGC_PAGE *page, time
}
PGC_PAGE *pgc_page_get_and_acquire(PGC *cache, Word_t section, Word_t metric_id, time_t start_time_s, PGC_SEARCH method) {
- return page_find_and_acquire(cache, section, metric_id, start_time_s, method);
+ static const struct timespec ns = { .tv_sec = 0, .tv_nsec = 1 };
+
+ PGC_PAGE *page = NULL;
+
+ __atomic_add_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ size_t *stats_hit_ptr, *stats_miss_ptr;
+
+ if(method == PGC_SEARCH_CLOSEST) {
+ __atomic_add_fetch(&cache->stats.searches_closest, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_closest_hits;
+ stats_miss_ptr = &cache->stats.searches_closest_misses;
+ }
+ else {
+ __atomic_add_fetch(&cache->stats.searches_exact, 1, __ATOMIC_RELAXED);
+ stats_hit_ptr = &cache->stats.searches_exact_hits;
+ stats_miss_ptr = &cache->stats.searches_exact_misses;
+ }
+
+ while(1) {
+ bool retry = false;
+
+ page = page_find_and_acquire_once(cache, section, metric_id, start_time_s, method, &retry);
+
+ if(page || !retry)
+ break;
+
+ nanosleep(&ns, NULL);
+ }
+
+ if(page) {
+ __atomic_add_fetch(stats_hit_ptr, 1, __ATOMIC_RELAXED);
+ page_has_been_accessed(cache, page);
+ }
+ else
+ __atomic_add_fetch(stats_miss_ptr, 1, __ATOMIC_RELAXED);
+
+ __atomic_sub_fetch(&cache->stats.workers_search, 1, __ATOMIC_RELAXED);
+
+ return page;
}
struct pgc_statistics pgc_get_statistics(PGC *cache) {
@@ -2224,7 +2239,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
while ((PValue2 = JudyLFirstThenNext(mi->JudyL_pages_by_start_time, &start_time, &start_time_first))) {
struct jv2_page_info *pi = *PValue2;
page_transition_unlock(cache, pi->page);
- pgc_page_hot_to_dirty_and_release(cache, pi->page);
+ pgc_page_hot_to_dirty_and_release(cache, pi->page, true);
// make_acquired_page_clean_and_evict_or_page_release(cache, pi->page);
aral_freez(ar_pi, pi);
}
@@ -2251,6 +2266,8 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
aral_by_size_release(ar_mi);
__atomic_sub_fetch(&cache->stats.workers_jv2_flush, 1, __ATOMIC_RELAXED);
+
+ flush_pages(cache, cache->config.max_flushes_inline, PGC_SECTION_ALL, false, false);
}
static bool match_page_data(PGC_PAGE *page, void *data) {
@@ -2396,7 +2413,7 @@ void *unittest_stress_test_collector(void *ptr) {
if(i % 10 == 0)
pgc_page_to_clean_evict_or_release(pgc_uts.cache, pgc_uts.metrics[i]);
else
- pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i]);
+ pgc_page_hot_to_dirty_and_release(pgc_uts.cache, pgc_uts.metrics[i], false);
}
}
@@ -2721,7 +2738,7 @@ int pgc_unittest(void) {
}, NULL);
pgc_page_hot_set_end_time_s(cache, page2, 2001);
- pgc_page_hot_to_dirty_and_release(cache, page2);
+ pgc_page_hot_to_dirty_and_release(cache, page2, false);
PGC_PAGE *page3 = pgc_page_add_and_acquire(cache, (PGC_ENTRY){
.section = 3,
@@ -2734,7 +2751,7 @@ int pgc_unittest(void) {
}, NULL);
pgc_page_hot_set_end_time_s(cache, page3, 2001);
- pgc_page_hot_to_dirty_and_release(cache, page3);
+ pgc_page_hot_to_dirty_and_release(cache, page3, false);
pgc_destroy(cache);
diff --git a/database/engine/cache.h b/src/database/engine/cache.h
index 7cd7c0636..b6f81bcc2 100644
--- a/database/engine/cache.h
+++ b/src/database/engine/cache.h
@@ -2,6 +2,7 @@
#ifndef DBENGINE_CACHE_H
#define DBENGINE_CACHE_H
+#include "datafile.h"
#include "../rrd.h"
// CACHE COMPILE TIME CONFIGURATION
@@ -27,7 +28,7 @@ typedef struct pgc_entry {
time_t end_time_s; // the end time of the page
size_t size; // the size in bytes of the allocation, outside the cache
void *data; // a pointer to data outside the cache
- uint32_t update_every_s; // the update every of the page
+ uint32_t update_every_s; // the update every of the page
bool hot; // true if this entry is currently being collected
uint8_t *custom_data;
} PGC_ENTRY;
@@ -191,7 +192,7 @@ PGC_PAGE *pgc_page_dup(PGC *cache, PGC_PAGE *page);
void pgc_page_release(PGC *cache, PGC_PAGE *page);
// mark a hot page dirty, and release it
-void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page);
+void pgc_page_hot_to_dirty_and_release(PGC *cache, PGC_PAGE *page, bool never_flush);
// find a page from the cache
typedef enum {
@@ -210,8 +211,8 @@ Word_t pgc_page_section(PGC_PAGE *page);
Word_t pgc_page_metric(PGC_PAGE *page);
time_t pgc_page_start_time_s(PGC_PAGE *page);
time_t pgc_page_end_time_s(PGC_PAGE *page);
-time_t pgc_page_update_every_s(PGC_PAGE *page);
-time_t pgc_page_fix_update_every(PGC_PAGE *page, time_t update_every_s);
+uint32_t pgc_page_update_every_s(PGC_PAGE *page);
+uint32_t pgc_page_fix_update_every(PGC_PAGE *page, uint32_t update_every_s);
time_t pgc_page_fix_end_time_s(PGC_PAGE *page, time_t end_time_s);
void *pgc_page_data(PGC_PAGE *page);
void *pgc_page_custom_data(PGC *cache, PGC_PAGE *page);
diff --git a/database/engine/datafile.c b/src/database/engine/datafile.c
index 7322039cd..1ec2dea79 100644
--- a/database/engine/datafile.c
+++ b/src/database/engine/datafile.c
@@ -557,7 +557,9 @@ void finalize_data_files(struct rrdengine_instance *ctx)
{
bool logged = false;
- logged = false;
+ if (!ctx->datafiles.first)
+ return;
+
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
if(!logged) {
netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
diff --git a/database/engine/datafile.h b/src/database/engine/datafile.h
index 569f1b0a2..569f1b0a2 100644
--- a/database/engine/datafile.h
+++ b/src/database/engine/datafile.h
diff --git a/database/engine/dbengine-diagram.xml b/src/database/engine/dbengine-diagram.xml
index 793e8a355..793e8a355 100644
--- a/database/engine/dbengine-diagram.xml
+++ b/src/database/engine/dbengine-diagram.xml
diff --git a/database/engine/journalfile.c b/src/database/engine/journalfile.c
index 9005b81ca..8099d017f 100644
--- a/database/engine/journalfile.c
+++ b/src/database/engine/journalfile.c
@@ -637,9 +637,12 @@ static int journalfile_check_superblock(uv_file file)
fatal_assert(req.result >= 0);
uv_fs_req_cleanup(&req);
- if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
- strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- netdata_log_error("DBENGINE: File has invalid superblock.");
+
+ char jf_magic[RRDENG_MAGIC_SZ] = RRDENG_JF_MAGIC;
+ char jf_ver[RRDENG_VER_SZ] = RRDENG_JF_VER;
+ if (strncmp(superblock->magic_number, jf_magic, RRDENG_MAGIC_SZ) != 0 ||
+ strncmp(superblock->version, jf_ver, RRDENG_VER_SZ) != 0) {
+ nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -669,7 +672,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
uuid_t *temp_id;
uint8_t page_type = jf_metric_data->descr[i].type;
- if (page_type > PAGE_TYPE_MAX) {
+ if (page_type > RRDENG_PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
@@ -700,13 +703,19 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
.section = (Word_t)ctx,
.first_time_s = vd.start_time_s,
.last_time_s = vd.end_time_s,
- .latest_update_every_s = (uint32_t) vd.update_every_s,
+ .latest_update_every_s = vd.update_every_s,
};
bool added;
metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
- if(added)
+ if(added) {
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
update_metric_time = false;
+ }
+ if (vd.update_every_s) {
+ uint64_t samples = (vd.end_time_s - vd.start_time_s) / vd.update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
}
Word_t metric_id = mrg_metric_id(main_mrg, metric);
@@ -1005,7 +1014,7 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
time_t end_time_s = header_start_time_s + metric->delta_end_s;
mrg_update_metric_retention_and_granularity_by_uuid(
- main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
+ main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, metric->update_every_s, now_s);
metric++;
}
@@ -1042,7 +1051,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
journal_v1_file_size = (uint32_t)statbuf.st_size;
journalfile_v2_generate_path(datafile, path_v2, sizeof(path_v2));
- fd = open(path_v2, O_RDONLY);
+ fd = open(path_v2, O_RDONLY | O_CLOEXEC);
if (fd < 0) {
if (errno == ENOENT)
return 1;
@@ -1226,7 +1235,7 @@ void *journalfile_v2_write_data_page(struct journal_v2_header *j2_header, void *
data_page->delta_end_s = (uint32_t) (page_info->end_time_s - (time_t) (j2_header->start_time_ut) / USEC_PER_SEC);
data_page->extent_index = page_info->extent_index;
- data_page->update_every_s = (uint32_t) page_info->update_every_s;
+ data_page->update_every_s = page_info->update_every_s;
data_page->page_length = (uint16_t) (ei ? ei->page_length : page_info->page_length);
data_page->type = 0;
@@ -1252,7 +1261,7 @@ static void *journalfile_v2_write_descriptors(struct journal_v2_header *j2_heade
page_info = *PValue;
// Write one descriptor and return the next data page location
data_page = journalfile_v2_write_data_page(j2_header, (void *) data_page, page_info);
- update_every_s = (uint32_t) page_info->update_every_s;
+ update_every_s = page_info->update_every_s;
if (NULL == data_page)
break;
}
diff --git a/database/engine/journalfile.h b/src/database/engine/journalfile.h
index 5cdf72b9d..3f881ee16 100644
--- a/database/engine/journalfile.h
+++ b/src/database/engine/journalfile.h
@@ -7,7 +7,6 @@
/* Forward declarations */
struct rrdengine_instance;
-struct rrdengine_worker_config;
struct rrdengine_datafile;
struct rrdengine_journalfile;
diff --git a/database/engine/metric.c b/src/database/engine/metric.c
index 2e132612e..01eb22fbc 100644
--- a/database/engine/metric.c
+++ b/src/database/engine/metric.c
@@ -1,5 +1,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "metric.h"
+#include "cache.h"
+#include "libnetdata/locks/locks.h"
+#include "rrddiskprotocol.h"
typedef int32_t REFCOUNT;
#define REFCOUNT_DELETING (-100)
@@ -104,8 +107,11 @@ static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
- return *n % mrg->partitions;
+
+ size_t n;
+ memcpy(&n, &u[UUID_SZ - sizeof(size_t)], sizeof(size_t));
+
+ return n % mrg->partitions;
}
static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -125,87 +131,174 @@ static inline time_t mrg_metric_get_first_time_s_smart(MRG *mrg __maybe_unused,
return first_time_s;
}
-static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric) {
+static void metric_log(MRG *mrg __maybe_unused, METRIC *metric, const char *msg) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)metric->section;
+
+ char uuid[UUID_STR_LEN];
+ uuid_unparse_lower(metric->uuid, uuid);
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "METRIC: %s on %s at tier %d, refcount %d, partition %u, "
+ "retention [%ld - %ld (hot), %ld (clean)], update every %"PRIu32", "
+ "writer pid %d "
+ "--- PLEASE OPEN A GITHUB ISSUE TO REPORT THIS LOG LINE TO NETDATA --- ",
+ msg,
+ uuid,
+ ctx->config.tier,
+ metric->refcount,
+ metric->partition,
+ metric->first_time_s,
+ metric->latest_time_s_hot,
+ metric->latest_time_s_clean,
+ metric->latest_update_every_s,
+ (int)metric->writer
+ );
+}
+
+static inline bool acquired_metric_has_retention(MRG *mrg, METRIC *metric) {
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
+ return (!first || !last || first > last);
+}
+
+static inline void acquired_for_deletion_metric_delete(MRG *mrg, METRIC *metric) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+
+ size_t mem_before_judyl, mem_after_judyl;
+
+ mrg_index_write_lock(mrg, partition);
+
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
+ if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+
+ if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
+ return;
+ }
+
+ if(!*sections_judy_pptr) {
+ rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
+ if(unlikely(!rc))
+ fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ }
+
+ MRG_STATS_DELETED_METRIC(mrg, partition);
+
+ mrg_index_write_unlock(mrg, partition);
+
+ aral_freez(mrg->index[partition].aral, metric);
+}
+
+static inline bool metric_acquire(MRG *mrg, METRIC *metric) {
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected < 0)
- fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
+ if(unlikely(expected < 0))
+ return false;
+
+ desired = expected + 1;
- refcount = expected + 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
+ size_t partition = metric->partition;
- if(refcount == 1)
+ if(desired == 1)
__atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- return refcount;
+ return true;
}
-static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
+static inline bool metric_release(MRG *mrg, METRIC *metric, bool delete_if_last_without_retention) {
size_t partition = metric->partition;
- REFCOUNT expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
- REFCOUNT refcount;
+ REFCOUNT expected, desired;
+
+ expected = __atomic_load_n(&metric->refcount, __ATOMIC_RELAXED);
do {
- if(expected <= 0)
- fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
+ if(expected <= 0) {
+ metric_log(mrg, metric, "refcount is zero or negative during release");
+ fatal("METRIC: refcount is %d (zero or negative) during release", expected);
+ }
- refcount = expected - 1;
- } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, refcount, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+ if(expected == 1 && delete_if_last_without_retention && !acquired_metric_has_retention(mrg, metric))
+ desired = REFCOUNT_DELETING;
+ else
+ desired = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&metric->refcount, &expected, desired, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
- if(unlikely(!refcount))
+ if(desired == 0 || desired == REFCOUNT_DELETING) {
__atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ if(desired == REFCOUNT_DELETING)
+ acquired_for_deletion_metric_delete(mrg, metric);
+ }
+
__atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
- return (!first || !last || first > last);
+ return desired == REFCOUNT_DELETING;
}
static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, entry->uuid);
METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
+ Pvoid_t *PValue;
- mrg_index_write_lock(mrg, partition);
+ while(1) {
+ mrg_index_write_lock(mrg, partition);
- size_t mem_before_judyl, mem_after_judyl;
+ size_t mem_before_judyl, mem_after_judyl;
- Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
- fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
+ Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, entry->uuid, sizeof(uuid_t), PJE0);
+ if (unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
+ fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
- if(unlikely(!*sections_judy_pptr))
- mrg_stats_size_judyhs_added_uuid(mrg, partition);
+ if (unlikely(!*sections_judy_pptr))
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
+ PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
+ mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
- if(unlikely(!PValue || PValue == PJERR))
- fatal("DBENGINE METRIC: corrupted section JudyL array");
+ if (unlikely(!PValue || PValue == PJERR))
+ fatal("DBENGINE METRIC: corrupted section JudyL array");
- if(unlikely(*PValue != NULL)) {
- METRIC *metric = *PValue;
+ if (unlikely(*PValue != NULL)) {
+ METRIC *metric = *PValue;
- metric_acquire(mrg, metric);
+ if(!metric_acquire(mrg, metric)) {
+ mrg_index_write_unlock(mrg, partition);
+ continue;
+ }
- MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+ mrg_index_write_unlock(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
+ if (ret)
+ *ret = false;
- if(ret)
- *ret = false;
+ aral_freez(mrg->index[partition].aral, allocation);
- aral_freez(mrg->index[partition].aral, allocation);
+ return metric;
+ }
- return metric;
+ break;
}
METRIC *metric = allocation;
@@ -216,9 +309,8 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
metric->latest_time_s_hot = 0;
metric->latest_update_every_s = entry->latest_update_every_s;
metric->writer = 0;
- metric->refcount = 0;
+ metric->refcount = 1;
metric->partition = partition;
- metric_acquire(mrg, metric);
*PValue = metric;
MRG_STATS_ADDED_METRIC(mrg, partition);
@@ -234,77 +326,35 @@ static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *r
static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
- mrg_index_read_lock(mrg, partition);
+ while(1) {
+ mrg_index_read_lock(mrg, partition);
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
- if(unlikely(!PValue)) {
- mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg, partition);
- return NULL;
- }
-
- METRIC *metric = *PValue;
-
- metric_acquire(mrg, metric);
-
- mrg_index_read_unlock(mrg, partition);
-
- MRG_STATS_SEARCH_HIT(mrg, partition);
- return metric;
-}
-
-static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = metric->partition;
-
- size_t mem_before_judyl, mem_after_judyl;
-
- mrg_index_write_lock(mrg, partition);
+ Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
+ if (unlikely(!sections_judy_pptr)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- if(!metric_release_and_can_be_deleted(mrg, metric)) {
- mrg->index[partition].stats.delete_having_retention_or_referenced++;
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
+ if (unlikely(!PValue)) {
+ mrg_index_read_unlock(mrg, partition);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
+ return NULL;
+ }
- Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
- if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ METRIC *metric = *PValue;
- mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
- int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
- mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
+ if(metric && !metric_acquire(mrg, metric))
+ metric = NULL;
- if(unlikely(!rc)) {
- MRG_STATS_DELETE_MISS(mrg, partition);
- mrg_index_write_unlock(mrg, partition);
- return false;
- }
+ mrg_index_read_unlock(mrg, partition);
- if(!*sections_judy_pptr) {
- rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
- if(unlikely(!rc))
- fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
- mrg_stats_size_judyhs_removed_uuid(mrg, partition);
+ if(metric) {
+ MRG_STATS_SEARCH_HIT(mrg, partition);
+ return metric;
+ }
}
-
- MRG_STATS_DELETED_METRIC(mrg, partition);
-
- mrg_index_write_unlock(mrg, partition);
-
- aral_freez(mrg->index[partition].aral, metric);
-
- return true;
}
// ----------------------------------------------------------------------------
@@ -359,7 +409,7 @@ inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section
}
inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
- return acquired_metric_del(mrg, metric);
+ return metric_release(mrg, metric, true);
}
inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
@@ -367,8 +417,8 @@ inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
return metric;
}
-inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
- return metric_release_and_can_be_deleted(mrg, metric);
+inline void mrg_metric_release(MRG *mrg, METRIC *metric) {
+ metric_release(mrg, metric, false);
}
inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
@@ -394,8 +444,8 @@ inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric,
return true;
}
-inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
- internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s) {
+ internal_fatal(first_time_s < 0 || last_time_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
"DBENGINE METRIC: metric first time is in the future");
@@ -425,13 +475,14 @@ inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metri
return mrg_metric_get_first_time_s_smart(mrg, metric);
}
-inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
*last_time_s = MAX(clean, hot);
*first_time_s = mrg_metric_get_first_time_s_smart(mrg, metric);
- *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
+ if (update_every_s)
+ *update_every_s = __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
@@ -498,8 +549,8 @@ inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metr
}
} while(do_again);
- time_t first, last, ue;
- mrg_metric_get_retention(mrg, metric, &first, &last, &ue);
+ time_t first, last;
+ mrg_metric_get_retention(mrg, metric, &first, &last, NULL);
return (first && last && first < last);
}
@@ -517,6 +568,11 @@ inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *me
return false;
}
+inline time_t mrg_metric_get_latest_clean_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+ time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
+ return clean;
+}
+
inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t clean = __atomic_load_n(&metric->latest_time_s_clean, __ATOMIC_RELAXED);
time_t hot = __atomic_load_n(&metric->latest_time_s_hot, __ATOMIC_RELAXED);
@@ -524,25 +580,21 @@ inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metr
return MAX(clean, hot);
}
-inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, true);
return false;
}
-inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
- internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
-
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, uint32_t update_every_s) {
if(update_every_s > 0)
return set_metric_field_with_condition(metric->latest_update_every_s, update_every_s, _current <= 0);
return false;
}
-inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uint32_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
return __atomic_load_n(&metric->latest_update_every_s, __ATOMIC_RELAXED);
}
@@ -589,7 +641,7 @@ inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
inline void mrg_update_metric_retention_and_granularity_by_uuid(
MRG *mrg, Word_t section, uuid_t *uuid,
time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
+ uint32_t update_every_s, time_t now_s)
{
if(unlikely(last_time_s > now_s)) {
nd_log_limit_static_global_var(erl, 1, 0);
@@ -626,14 +678,35 @@ inline void mrg_update_metric_retention_and_granularity_by_uuid(
.section = section,
.first_time_s = first_time_s,
.last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
+ .latest_update_every_s = update_every_s
};
metric = mrg_metric_add_and_acquire(mrg, entry, &added);
}
- if (likely(!added))
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
+ if (likely(!added)) {
+ uint64_t old_samples = 0;
+
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ old_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+ uint64_t new_samples = 0;
+ if (update_every_s && metric->latest_update_every_s && metric->latest_time_s_clean)
+ new_samples = (metric->latest_time_s_clean - metric->first_time_s) / metric->latest_update_every_s;
+
+ __atomic_add_fetch(&ctx->atomic.samples, new_samples - old_samples, __ATOMIC_RELAXED);
+ }
+ else {
+ // Newly added
+ if (update_every_s) {
+ uint64_t samples = (last_time_s - first_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, samples, __ATOMIC_RELAXED);
+ }
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
+ }
+
mrg_metric_release(mrg, metric);
}
diff --git a/database/engine/metric.h b/src/database/engine/metric.h
index dbb949301..3bace9057 100644
--- a/database/engine/metric.h
+++ b/src/database/engine/metric.h
@@ -52,7 +52,7 @@ MRG *mrg_create(ssize_t partitions);
void mrg_destroy(MRG *mrg);
METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric);
-bool mrg_metric_release(MRG *mrg, METRIC *metric);
+void mrg_metric_release(MRG *mrg, METRIC *metric);
METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret);
METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section);
@@ -69,13 +69,14 @@ time_t mrg_metric_get_first_time_s(MRG *mrg, METRIC *metric);
bool mrg_metric_set_clean_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s);
bool mrg_metric_set_hot_latest_time_s(MRG *mrg, METRIC *metric, time_t latest_time_s);
time_t mrg_metric_get_latest_time_s(MRG *mrg, METRIC *metric);
+time_t mrg_metric_get_latest_clean_time_s(MRG *mrg, METRIC *metric);
-bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, time_t update_every_s);
-bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, time_t update_every_s);
-time_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric);
+bool mrg_metric_set_update_every(MRG *mrg, METRIC *metric, uint32_t update_every_s);
+bool mrg_metric_set_update_every_s_if_zero(MRG *mrg, METRIC *metric, uint32_t update_every_s);
+uint32_t mrg_metric_get_update_every_s(MRG *mrg, METRIC *metric);
-void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s);
-void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s);
+void mrg_metric_expand_retention(MRG *mrg, METRIC *metric, time_t first_time_s, time_t last_time_s, uint32_t update_every_s);
+void mrg_metric_get_retention(MRG *mrg, METRIC *metric, time_t *first_time_s, time_t *last_time_s, uint32_t *update_every_s);
bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric);
bool mrg_metric_set_writer(MRG *mrg, METRIC *metric);
@@ -89,6 +90,6 @@ size_t mrg_aral_overhead(void);
void mrg_update_metric_retention_and_granularity_by_uuid(
MRG *mrg, Word_t section, uuid_t *uuid,
time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s);
+ uint32_t update_every_s, time_t now_s);
#endif // DBENGINE_METRIC_H
diff --git a/database/engine/page.c b/src/database/engine/page.c
index b7a393483..13fe90f7f 100644
--- a/database/engine/page.c
+++ b/src/database/engine/page.c
@@ -111,9 +111,9 @@ void pgd_init_arals(void)
// FIXME: add stats
pgd_alloc_globals.aral_gorilla_buffer[i] = aral_create(
buf,
- GORILLA_BUFFER_SIZE,
+ RRDENG_GORILLA_32BIT_BUFFER_SIZE,
64,
- 512 * GORILLA_BUFFER_SIZE,
+ 512 * RRDENG_GORILLA_32BIT_BUFFER_SIZE,
pgc_aral_statistics(),
NULL, NULL, false, false);
}
@@ -165,8 +165,8 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
pg->states = PGD_STATE_CREATED_FROM_COLLECTOR;
switch (type) {
- case PAGE_METRICS:
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
uint32_t size = slots * page_type_size[type];
internal_fatal(!size || slots == 1,
@@ -176,11 +176,11 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
pg->raw.data = pgd_data_aral_alloc(size);
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
internal_fatal(slots == 1,
"DBENGINE: invalid number of slots (%u) or page type (%u)", slots, type);
- pg->slots = 8 * GORILLA_BUFFER_SLOTS;
+ pg->slots = 8 * RRDENG_GORILLA_32BIT_BUFFER_SLOTS;
// allocate new gorilla writer
pg->gorilla.aral_index = gettid() % 4;
@@ -188,16 +188,19 @@ PGD *pgd_create(uint8_t type, uint32_t slots)
// allocate new gorilla buffer
gorilla_buffer_t *gbuf = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
- memset(gbuf, 0, GORILLA_BUFFER_SIZE);
+ memset(gbuf, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);
global_statistics_gorilla_buffer_add_hot();
- *pg->gorilla.writer = gorilla_writer_init(gbuf, GORILLA_BUFFER_SLOTS);
+ *pg->gorilla.writer = gorilla_writer_init(gbuf, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
pg->gorilla.num_buffers = 1;
break;
}
default:
- fatal("Unknown page type: %uc", type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
+ aral_freez(pgd_alloc_globals.aral_pgd, pg);
+ pg = PGD_EMPTY;
+ break;
}
return pg;
@@ -219,8 +222,8 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
switch (type)
{
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pg->raw.size = size;
pg->used = size / page_type_size[type];
pg->slots = pg->used;
@@ -228,10 +231,11 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
pg->raw.data = pgd_data_aral_alloc(size);
memcpy(pg->raw.data, base, size);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
internal_fatal(size == 0, "Asked to create page with 0 data!!!");
internal_fatal(size % sizeof(uint32_t), "Unaligned gorilla buffer size");
- internal_fatal(size % GORILLA_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes", GORILLA_BUFFER_SIZE);
+ internal_fatal(size % RRDENG_GORILLA_32BIT_BUFFER_SIZE, "Expected size to be a multiple of %zu-bytes",
+ RRDENG_GORILLA_32BIT_BUFFER_SIZE);
pg->raw.data = mallocz(size);
pg->raw.size = size;
@@ -246,7 +250,10 @@ PGD *pgd_create_from_disk_data(uint8_t type, void *base, uint32_t size)
pg->slots = pg->used;
break;
default:
- fatal("Unknown page type: %uc", type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, type);
+ aral_freez(pgd_alloc_globals.aral_pgd, pg);
+ pg = PGD_EMPTY;
+ break;
}
return pg;
@@ -262,11 +269,11 @@ void pgd_free(PGD *pg)
switch (pg->type)
{
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pgd_data_aral_free(pg->raw.data, pg->raw.size);
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK)
{
internal_fatal(pg->raw.data == NULL, "Tried to free gorilla PGD loaded from disk with NULL data");
@@ -306,7 +313,8 @@ void pgd_free(PGD *pg)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
aral_freez(pgd_alloc_globals.aral_pgd, pg);
@@ -358,20 +366,21 @@ uint32_t pgd_memory_footprint(PGD *pg)
size_t footprint = 0;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
footprint = sizeof(PGD) + pg->raw.size;
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK)
footprint = sizeof(PGD) + pg->raw.size;
else
- footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE);
+ footprint = sizeof(PGD) + sizeof(gorilla_writer_t) + (pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE);
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
return footprint;
@@ -385,15 +394,15 @@ uint32_t pgd_disk_footprint(PGD *pg)
size_t size = 0;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
uint32_t used_size = pg->used * page_type_size[pg->type];
internal_fatal(used_size > pg->raw.size, "Wrong disk footprint page size");
size = used_size;
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR ||
pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING ||
pg->states & PGD_STATE_FLUSHED_TO_DISK)
@@ -404,7 +413,7 @@ uint32_t pgd_disk_footprint(PGD *pg)
internal_fatal(pg->gorilla.num_buffers == 0,
"Gorilla writer does not have any buffers");
- size = pg->gorilla.num_buffers * GORILLA_BUFFER_SIZE;
+ size = pg->gorilla.num_buffers * RRDENG_GORILLA_32BIT_BUFFER_SIZE;
if (pg->states & PGD_STATE_CREATED_FROM_COLLECTOR) {
global_statistics_tier0_disk_compressed_bytes(gorilla_writer_nbytes(pg->gorilla.writer));
@@ -419,7 +428,8 @@ uint32_t pgd_disk_footprint(PGD *pg)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
internal_fatal(pg->states & PGD_STATE_CREATED_FROM_DISK,
@@ -434,11 +444,11 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
pgd_disk_footprint(pg), dst_size);
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
memcpy(dst, pg->raw.data, dst_size);
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if ((pg->states & PGD_STATE_SCHEDULED_FOR_FLUSHING) == 0)
fatal("Copying to extent is supported only for PGDs that are scheduled for flushing.");
@@ -456,7 +466,8 @@ void pgd_copy_to_extent(PGD *pg, uint8_t *dst, uint32_t dst_size)
break;
}
default:
- fatal("Unknown page type: %uc", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
+ break;
}
pg->states = PGD_STATE_FLUSHED_TO_DISK;
@@ -490,7 +501,7 @@ void pgd_append_point(PGD *pg,
fatal("Data collection on page already scheduled for flushing");
switch (pg->type) {
- case PAGE_METRICS: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
storage_number *tier0_metric_data = (storage_number *)pg->raw.data;
storage_number t = pack_storage_number(n, flags);
tier0_metric_data[pg->used++] = t;
@@ -500,7 +511,7 @@ void pgd_append_point(PGD *pg,
break;
}
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
storage_number_tier1_t *tier12_metric_data = (storage_number_tier1_t *)pg->raw.data;
storage_number_tier1_t t;
t.sum_value = (float) n;
@@ -515,7 +526,7 @@ void pgd_append_point(PGD *pg,
break;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
pg->used++;
storage_number t = pack_storage_number(n, flags);
@@ -525,9 +536,9 @@ void pgd_append_point(PGD *pg,
bool ok = gorilla_writer_write(pg->gorilla.writer, t);
if (!ok) {
gorilla_buffer_t *new_buffer = aral_mallocz(pgd_alloc_globals.aral_gorilla_buffer[pg->gorilla.aral_index]);
- memset(new_buffer, 0, GORILLA_BUFFER_SIZE);
+ memset(new_buffer, 0, RRDENG_GORILLA_32BIT_BUFFER_SIZE);
- gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, GORILLA_BUFFER_SLOTS);
+ gorilla_writer_add_buffer(pg->gorilla.writer, new_buffer, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
pg->gorilla.num_buffers += 1;
global_statistics_gorilla_buffer_add_hot();
@@ -537,7 +548,7 @@ void pgd_append_point(PGD *pg,
break;
}
default:
- fatal("DBENGINE: unknown page type id %d", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
break;
}
}
@@ -550,11 +561,11 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position)
PGD *pg = pgdc->pgd;
switch (pg->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
pgdc->slots = pgdc->pgd->used;
break;
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
if (pg->states & PGD_STATE_CREATED_FROM_DISK) {
pgdc->slots = pgdc->pgd->slots;
pgdc->gr = gorilla_reader_init((void *) pg->raw.data);
@@ -588,7 +599,7 @@ static void pgdc_seek(PGDC *pgdc, uint32_t position)
break;
}
default:
- fatal("DBENGINE: unknown page type id %d", pg->type);
+ netdata_log_error("%s() - Unknown page type: %uc", __FUNCTION__, pg->type);
break;
}
}
@@ -612,7 +623,7 @@ void pgdc_reset(PGDC *pgdc, PGD *pgd, uint32_t position)
pgdc_seek(pgdc, position);
}
-bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *sp)
+bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position __maybe_unused, STORAGE_POINT *sp)
{
if (!pgdc->pgd || pgdc->pgd == PGD_EMPTY || pgdc->position >= pgdc->slots)
{
@@ -624,7 +635,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
switch (pgdc->pgd->type)
{
- case PAGE_METRICS: {
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT: {
storage_number *array = (storage_number *) pgdc->pgd->raw.data;
storage_number n = array[pgdc->position++];
@@ -635,7 +646,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
return true;
}
- case PAGE_TIER: {
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1: {
storage_number_tier1_t *array = (storage_number_tier1_t *) pgdc->pgd->raw.data;
storage_number_tier1_t n = array[pgdc->position++];
@@ -648,7 +659,7 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
return true;
}
- case PAGE_GORILLA_METRICS: {
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT: {
pgdc->position++;
uint32_t n = 666666666;
@@ -668,7 +679,8 @@ bool pgdc_get_next_point(PGDC *pgdc, uint32_t expected_position, STORAGE_POINT *
static bool logged = false;
if (!logged)
{
- netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", pgd_type(pgdc->pgd));
+ netdata_log_error("DBENGINE: unknown page type %"PRIu32" found. Cannot decode it. Ignoring its metrics.",
+ pgd_type(pgdc->pgd));
logged = true;
}
diff --git a/database/engine/page.h b/src/database/engine/page.h
index 32c87c580..32c87c580 100644
--- a/database/engine/page.h
+++ b/src/database/engine/page.h
diff --git a/database/engine/page_test.cc b/src/database/engine/page_test.cc
index d61299bc4..d61299bc4 100644
--- a/database/engine/page_test.cc
+++ b/src/database/engine/page_test.cc
diff --git a/database/engine/page_test.h b/src/database/engine/page_test.h
index 30837f0ab..30837f0ab 100644
--- a/database/engine/page_test.h
+++ b/src/database/engine/page_test.h
diff --git a/database/engine/pagecache.c b/src/database/engine/pagecache.c
index dab9cdd0d..452fdc50b 100644
--- a/database/engine/pagecache.c
+++ b/src/database/engine/pagecache.c
@@ -222,7 +222,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
Word_t metric_id = mrg_metric_id(main_mrg, metric);
time_t now_s = wanted_start_time_s;
- time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
+ uint32_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
if(!dt_s)
dt_s = default_rrd_update_every;
@@ -246,7 +246,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
- time_t page_update_every_s = pgc_page_update_every_s(page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(page);
if(!page_update_every_s)
page_update_every_s = dt_s;
@@ -282,7 +282,7 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
pd->metric_id = metric_id;
pd->first_time_s = page_start_time_s;
pd->last_time_s = page_end_time_s;
- pd->update_every_s = (uint32_t) page_update_every_s;
+ pd->update_every_s = page_update_every_s;
pd->page = (open_cache_mode) ? NULL : page;
pd->status |= tags;
@@ -332,8 +332,8 @@ static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengin
static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) {
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
- mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+ time_t db_first_time_s, db_last_time_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL);
if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE)
return;
@@ -547,7 +547,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
if(prc == PAGE_IS_IN_THE_FUTURE)
break;
- time_t page_update_every_s = page_entry_in_journal->update_every_s;
+ uint32_t page_update_every_s = page_entry_in_journal->update_every_s;
size_t page_length = page_entry_in_journal->page_length;
if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item
@@ -567,7 +567,7 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
.metric_id = metric_id,
.start_time_s = page_first_time_s,
.end_time_s = page_last_time_s,
- .update_every_s = (uint32_t) page_update_every_s,
+ .update_every_s = page_update_every_s,
.data = datafile,
.size = 0,
.custom_data = (uint8_t *) &ei,
@@ -845,7 +845,7 @@ struct pgc_page *pg_cache_lookup_next(
struct rrdengine_instance *ctx,
PDC *pdc,
time_t now_s,
- time_t last_update_every_s,
+ uint32_t last_update_every_s,
size_t *entries
) {
if (unlikely(!pdc))
@@ -905,7 +905,7 @@ struct pgc_page *pg_cache_lookup_next(
time_t page_start_time_s = pgc_page_start_time_s(page);
time_t page_end_time_s = pgc_page_end_time_s(page);
- time_t page_update_every_s = pgc_page_update_every_s(page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(page);
if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
@@ -918,7 +918,7 @@ struct pgc_page *pg_cache_lookup_next(
if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
- pd->update_every_s = (uint32_t) page_update_every_s;
+ pd->update_every_s = page_update_every_s;
}
size_t entries_by_size = pgd_slots_used(pgc_page_data(page));
@@ -983,7 +983,7 @@ struct pgc_page *pg_cache_lookup_next(
return page;
}
-void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s,
+void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s,
struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) {
if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item
@@ -1003,7 +1003,7 @@ void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s
.metric_id = metric_id,
.start_time_s = start_time_s,
.end_time_s = end_time_s,
- .update_every_s = (uint32_t) update_every_s,
+ .update_every_s = update_every_s,
.size = 0,
.data = datafile,
.custom_data = (uint8_t *) &ext_io_data,
diff --git a/database/engine/pagecache.h b/src/database/engine/pagecache.h
index dbcbea53a..103d36484 100644
--- a/database/engine/pagecache.h
+++ b/src/database/engine/pagecache.h
@@ -14,8 +14,6 @@ extern struct pgc *extent_cache;
struct rrdengine_instance;
#define INVALID_TIME (0)
-#define MAX_PAGE_CACHE_FETCH_RETRIES (3)
-#define PAGE_CACHE_FETCH_WAIT_TIMEOUT (3)
extern struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats;
@@ -54,9 +52,9 @@ struct page_details_control;
void rrdeng_prep_wait(struct page_details_control *pdc);
void rrdeng_prep_query(struct page_details_control *pdc, bool worker);
void pg_cache_preload(struct rrdeng_query_handle *handle);
-struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, time_t last_update_every_s, size_t *entries);
+struct pgc_page *pg_cache_lookup_next(struct rrdengine_instance *ctx, struct page_details_control *pdc, time_t now_s, uint32_t last_update_every_s, size_t *entries);
void pgc_and_mrg_initialize(void);
-void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length);
+void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, uint32_t update_every_s, struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length);
#endif /* NETDATA_PAGECACHE_H */
diff --git a/database/engine/pdc.c b/src/database/engine/pdc.c
index 5fe205e64..79a424b77 100644
--- a/database/engine/pdc.c
+++ b/src/database/engine/pdc.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "pdc.h"
+#include "dbengine-compression.h"
struct extent_page_details_list {
uv_file file;
@@ -628,24 +629,25 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
buffer_strcat(wb, "STEP_UNALIGNED");
}
-inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
+inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) {
time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
- time_t end_time_s;
- size_t entries;
+ time_t end_time_s = 0;
+ size_t entries = 0;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = descr->end_time_ut / USEC_PER_SEC;
entries = 0;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = start_time_s + descr->gorilla.delta_time_s;
entries = descr->gorilla.entries;
break;
default:
- fatal("Unknown page type: %uc\n", descr->type);
+ // Nothing to do. Validate page will notify the user.
+ break;
}
return validate_page(
@@ -666,29 +668,30 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
uuid_t *uuid,
time_t start_time_s,
time_t end_time_s,
- time_t update_every_s, // can be zero, if unknown
+ uint32_t update_every_s, // can be zero, if unknown
size_t page_length,
uint8_t page_type,
size_t entries, // can be zero, if unknown
time_t now_s, // can be zero, to disable future timestamp check
- time_t overwrite_zero_update_every_s, // can be zero, if unknown
+ uint32_t overwrite_zero_update_every_s, // can be zero, if unknown
bool have_read_error,
const char *msg,
- RRDENG_COLLECT_PAGE_FLAGS flags) {
-
+ RRDENG_COLLECT_PAGE_FLAGS flags)
+{
VALIDATED_PAGE_DESCRIPTOR vd = {
.start_time_s = start_time_s,
.end_time_s = end_time_s,
.update_every_s = update_every_s,
.page_length = page_length,
+ .point_size = page_type_size[page_type],
.type = page_type,
.is_valid = true,
};
- vd.point_size = page_type_size[vd.type];
+ bool known_page_type = true;
switch (page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
// always calculate entries by size
vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
@@ -696,13 +699,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
if(!entries)
entries = vd.entries;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
internal_fatal(entries == 0, "0 number of entries found on gorilla page");
vd.entries = entries;
break;
default:
- // TODO: should set vd.is_valid false instead?
- fatal("Unknown page type: %uc", page_type);
+ known_page_type = false;
+ break;
}
// allow to be called without update every (when loading pages from disk)
@@ -723,16 +726,16 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
// If gorilla can not compress the data we might end up needing slightly more
// than 4KiB. However, gorilla pages extend the page length by increments of
// 512 bytes.
- max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
+ max_page_length += ((page_type == RRDENG_PAGE_TYPE_GORILLA_32BIT) * RRDENG_GORILLA_32BIT_BUFFER_SIZE);
- if( have_read_error ||
+ if (!known_page_type ||
+ have_read_error ||
vd.page_length == 0 ||
vd.page_length > max_page_length ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
vd.start_time_s <= 0 ||
vd.end_time_s <= 0 ||
- vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
(vd.update_every_s == 0 && vd.entries > 1))
{
@@ -791,13 +794,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s invalid page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)",
uuid_str, msg, vd.type,
vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
);
}
else {
- const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
+ const char *err_valid = "";
const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
@@ -811,9 +814,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), "
"found inconsistent - the right is "
- "from %ld to %ld, update every %ld, page length %zu, entries %zu: "
+ "from %ld to %ld, update every %u, page length %zu, entries %zu: "
"%s%s%s%s%s%s%s",
uuid_str, msg, vd.type,
start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
@@ -871,11 +874,11 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if (descr) {
start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
break;
}
@@ -938,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data(
PDC_PAGE_STATUS tags,
bool cached_extent)
{
- int ret;
unsigned i, count;
void *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
@@ -973,18 +975,17 @@ static bool epdl_populate_pages_from_extent_data(
if( !can_use_data ||
count < 1 ||
count > MAX_PAGES_PER_EXTENT ||
- (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) ||
+ !dbengine_valid_compression_algorithm(header->compression_algorithm) ||
(payload_length != trailer_offset - payload_offset) ||
(data_length != payload_offset + payload_length + sizeof(*trailer))
- ) {
+ ) {
epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID");
return false;
}
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
- if (unlikely(ret)) {
+ if (unlikely(crc32cmp(trailer->checksum, crc))) {
ctx_io_error(ctx);
have_read_error = true;
epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
@@ -993,14 +994,15 @@ static bool epdl_populate_pages_from_extent_data(
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);
- if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
+ if (likely(!have_read_error && RRDENG_COMPRESSION_NONE != header->compression_algorithm)) {
// find the uncompressed extent size
uncompressed_payload_length = 0;
for (i = 0; i < count; ++i) {
size_t page_length = header->descr[i].page_length;
- if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
- (header->descr[i].type == PAGE_GORILLA_METRICS &&
- (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
+ if (page_length > RRDENG_BLOCK_SIZE &&
+ (header->descr[i].type != RRDENG_PAGE_TYPE_GORILLA_32BIT ||
+ (header->descr[i].type == RRDENG_PAGE_TYPE_GORILLA_32BIT &&
+ (page_length - RRDENG_BLOCK_SIZE) % RRDENG_GORILLA_32BIT_BUFFER_SIZE))) {
have_read_error = true;
break;
}
@@ -1015,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data(
eb = extent_buffer_get(uncompressed_payload_length);
uncompressed_buf = eb->data;
- ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
- (int) payload_length, (int) uncompressed_payload_length);
+ size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset,
+ uncompressed_payload_length, payload_length,
+ header->compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
+ if(!bytes)
+ have_read_error = true;
+ else {
+ __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED);
+ }
}
}
@@ -1075,7 +1082,7 @@ static bool epdl_populate_pages_from_extent_data(
stats_load_invalid_page++;
}
else {
- if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ if (RRDENG_COMPRESSION_NONE == header->compression_algorithm) {
pgd = pgd_create_from_disk_data(header->descr[i].type,
data + payload_offset + page_offset,
vd.page_length);
diff --git a/database/engine/pdc.h b/src/database/engine/pdc.h
index 9bae39ade..9bae39ade 100644
--- a/database/engine/pdc.h
+++ b/src/database/engine/pdc.h
diff --git a/database/engine/rrddiskprotocol.h b/src/database/engine/rrddiskprotocol.h
index 86b41f0b3..dc1a4c980 100644
--- a/database/engine/rrddiskprotocol.h
+++ b/src/database/engine/rrddiskprotocol.h
@@ -19,13 +19,16 @@
#define UUID_SZ (16)
#define CHECKSUM_SZ (4) /* CRC32 */
-#define RRD_NO_COMPRESSION (0)
-#define RRD_LZ4 (1)
+#define RRDENG_COMPRESSION_NONE (0)
+#define RRDENG_COMPRESSION_LZ4 (1)
+#define RRDENG_COMPRESSION_ZSTD (2)
#define RRDENG_DF_SB_PADDING_SZ (RRDENG_BLOCK_SIZE - (RRDENG_MAGIC_SZ + RRDENG_VER_SZ + sizeof(uint8_t)))
+
/*
* Data file persistent super-block
*/
+
struct rrdeng_df_sb {
char magic_number[RRDENG_MAGIC_SZ];
char version[RRDENG_VER_SZ];
@@ -36,10 +39,11 @@ struct rrdeng_df_sb {
/*
* Page types
*/
-#define PAGE_METRICS (0)
-#define PAGE_TIER (1)
-#define PAGE_GORILLA_METRICS (2)
-#define PAGE_TYPE_MAX 2 // Maximum page type (inclusive)
+
+#define RRDENG_PAGE_TYPE_ARRAY_32BIT (0)
+#define RRDENG_PAGE_TYPE_ARRAY_TIER1 (1)
+#define RRDENG_PAGE_TYPE_GORILLA_32BIT (2)
+#define RRDENG_PAGE_TYPE_MAX (2) // Maximum page type (inclusive)
/*
* Data file page descriptor
diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c
index b82cc1ad1..7b2137436 100644
--- a/database/engine/rrdengine.c
+++ b/src/database/engine/rrdengine.c
@@ -3,6 +3,7 @@
#include "rrdengine.h"
#include "pdc.h"
+#include "dbengine-compression.h"
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
@@ -229,7 +230,7 @@ static void after_work_standard_callback(uv_work_t* req, int status) {
worker_is_idle();
}
-static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) {
+static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) {
struct rrdeng_work *work_request = NULL;
internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread");
@@ -240,8 +241,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com
work_request->ctx = ctx;
work_request->data = data;
work_request->completion = completion;
- work_request->work_cb = work_cb;
- work_request->after_work_cb = after_work_cb;
+ work_request->work_cb = do_work_cb;
+ work_request->after_work_cb = do_after_work_cb;
work_request->opcode = opcode;
if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
@@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
*/
static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
int ret;
- int compressed_size, max_compressed_size = 0;
unsigned i, count, size_bytes, pos, real_io_size;
- uint32_t uncompressed_payload_length, payload_offset;
+ uint32_t uncompressed_payload_length, max_compressed_size, payload_offset;
struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
struct extent_io_descriptor *xt_io_descr;
- struct extent_buffer *eb = NULL;
- void *compressed_buf = NULL;
Word_t Index;
uint8_t compression_algorithm = ctx->config.global_compress_alg;
struct rrdengine_datafile *datafile;
@@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
xt_io_descr = extent_io_descriptor_get();
xt_io_descr->ctx = ctx;
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
- switch (compression_algorithm) {
- case RRD_NO_COMPRESSION:
- size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
- break;
-
- default: /* Compress */
- fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
- max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
- eb = extent_buffer_get(max_compressed_size);
- compressed_buf = eb->data;
- size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
- break;
- }
-
+ max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm);
+ size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer);
ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
if (unlikely(ret)) {
fatal("DBENGINE: posix_memalign:%s", strerror(ret));
@@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos = 0;
header = xt_io_descr->buf;
- header->compression_algorithm = compression_algorithm;
header->number_of_pages = count;
pos += sizeof(*header);
@@ -844,11 +829,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
header->descr[i].start_time_ut = descr->start_time_ut;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
header->descr[i].end_time_ut = descr->end_time_ut;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC);
header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd);
break;
@@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
pos += sizeof(header->descr[i]);
}
+
+ // build the extent payload
for (i = 0 ; i < count ; ++i) {
descr = xt_io_descr->descr_array[i];
pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length);
pos += descr->page_length;
}
- if(likely(compression_algorithm == RRD_LZ4)) {
- compressed_size = LZ4_compress_default(
- xt_io_descr->buf + payload_offset,
- compressed_buf,
- (int)uncompressed_payload_length,
- max_compressed_size);
+ // compress the payload
+ size_t compressed_size =
+ (int)dbengine_compress(xt_io_descr->buf + payload_offset,
+ uncompressed_payload_length,
+ compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
+ internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed");
+ internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent");
- (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
- extent_buffer_release(eb);
- size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+ if(compressed_size) {
+ header->compression_algorithm = compression_algorithm;
header->payload_length = compressed_size;
}
- else { // RRD_NO_COMPRESSION
- header->payload_length = uncompressed_payload_length;
+ else {
+ // compression failed, or generated bigger pages
+ // so it didn't touch our uncompressed buffer
+ header->compression_algorithm = RRDENG_COMPRESSION_NONE;
+ header->payload_length = compressed_size = uncompressed_payload_length;
+ }
+
+ // set the correct size
+ size_bytes = payload_offset + compressed_size + sizeof(*trailer);
+
+ if(compression_algorithm != RRDENG_COMPRESSION_NONE) {
+ __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
}
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
@@ -1171,7 +1167,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
for (size_t index = 0; index < added; ++index) {
uuid_first_t_entry = &uuid_first_entry_list[index];
if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
- mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+
+ time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+
+ bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
+ if (changed) {
+ uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) {
+ uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+ }
mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
}
else {
@@ -1180,6 +1186,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
// there is no retention for this metric
bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
if (!has_retention) {
+ time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric);
+ time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric);
+ if (update_every_s && first_time_s && last_time_s) {
+ uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s;
+ __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED);
+ }
+
bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
if(deleted)
deleted_metrics++;
diff --git a/database/engine/rrdengine.h b/src/database/engine/rrdengine.h
index cd3352f12..3047e0c6a 100644
--- a/database/engine/rrdengine.h
+++ b/src/database/engine/rrdengine.h
@@ -153,9 +153,9 @@ struct jv2_metrics_info {
struct jv2_page_info {
time_t start_time_s;
time_t end_time_s;
- time_t update_every_s;
- size_t page_length;
+ uint32_t update_every_s;
uint32_t extent_index;
+ size_t page_length;
void *custom_data;
// private
@@ -217,7 +217,7 @@ struct rrdeng_query_handle {
// internal data
time_t now_s;
- time_t dt_s;
+ uint32_t dt_s;
unsigned position;
unsigned entries;
@@ -387,6 +387,8 @@ struct rrdengine_instance {
unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file)
time_t first_time_s;
+ uint64_t metrics;
+ uint64_t samples;
} atomic;
struct {
@@ -482,7 +484,7 @@ struct page_descr_with_data *page_descriptor_get(void);
typedef struct validated_page_descriptor {
time_t start_time_s;
time_t end_time_s;
- time_t update_every_s;
+ uint32_t update_every_s;
size_t page_length;
size_t point_size;
size_t entries;
@@ -499,16 +501,16 @@ typedef struct validated_page_descriptor {
VALIDATED_PAGE_DESCRIPTOR validate_page(uuid_t *uuid,
time_t start_time_s,
time_t end_time_s,
- time_t update_every_s,
+ uint32_t update_every_s,
size_t page_length,
uint8_t page_type,
size_t entries,
time_t now_s,
- time_t overwrite_zero_update_every_s,
+ uint32_t overwrite_zero_update_every_s,
bool have_read_error,
const char *msg,
RRDENG_COLLECT_PAGE_FLAGS flags);
-VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error);
+VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error);
void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags);
typedef enum {
diff --git a/database/engine/rrdengineapi.c b/src/database/engine/rrdengineapi.c
index 1ddce5243..43fed492b 100755
--- a/database/engine/rrdengineapi.c
+++ b/src/database/engine/rrdengineapi.c
@@ -2,6 +2,7 @@
#include "database/engine/rrddiskprotocol.h"
#include "rrdengine.h"
+#include "dbengine-compression.h"
/* Default global database instance */
struct rrdengine_instance multidb_ctx_storage_tier0;
@@ -16,7 +17,12 @@ struct rrdengine_instance multidb_ctx_storage_tier4;
#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
#endif
struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];
-uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};
+uint8_t tier_page_type[RRD_STORAGE_TIERS] = {
+ RRDENG_PAGE_TYPE_GORILLA_32BIT,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1,
+ RRDENG_PAGE_TYPE_ARRAY_TIER1};
#if defined(ENV32BIT)
size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
@@ -24,14 +30,14 @@ size_t tier_page_size[RRD_STORAGE_TIERS] = {2048, 1024, 192, 192, 192};
size_t tier_page_size[RRD_STORAGE_TIERS] = {4096, 2048, 384, 384, 384};
#endif
-#if PAGE_TYPE_MAX != 2
+#if RRDENG_PAGE_TYPE_MAX != 2
#error PAGE_TYPE_MAX is not 2 - you need to add allocations here
#endif
size_t page_type_size[256] = {
- [PAGE_METRICS] = sizeof(storage_number),
- [PAGE_TIER] = sizeof(storage_number_tier1_t),
- [PAGE_GORILLA_METRICS] = sizeof(storage_number)
+ [RRDENG_PAGE_TYPE_ARRAY_32BIT] = sizeof(storage_number),
+ [RRDENG_PAGE_TYPE_ARRAY_TIER1] = sizeof(storage_number_tier1_t),
+ [RRDENG_PAGE_TYPE_GORILLA_32BIT] = sizeof(storage_number)
};
__attribute__((constructor)) void initialize_multidb_ctx(void) {
@@ -74,14 +80,14 @@ static inline bool rrdeng_page_alignment_release(struct pg_alignment *pa) {
}
// charts call this
-STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
+STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si __maybe_unused, uuid_t *uuid __maybe_unused) {
struct pg_alignment *pa = callocz(1, sizeof(struct pg_alignment));
rrdeng_page_alignment_acquire(pa);
return (STORAGE_METRICS_GROUP *)pa;
}
// charts call this
-void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg) {
+void rrdeng_metrics_group_release(STORAGE_INSTANCE *si __maybe_unused, STORAGE_METRICS_GROUP *smg) {
if(unlikely(!smg)) return;
struct pg_alignment *pa = (struct pg_alignment *)smg;
@@ -108,8 +114,8 @@ void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_
memcpy(ret_uuid, hash_value, sizeof(uuid_t));
}
-static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *si, const char *rd_id, const char *st_id) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
uuid_t legacy_uuid;
rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
return mrg_metric_get_and_acquire(main_mrg, &legacy_uuid, (Word_t) ctx);
@@ -118,25 +124,25 @@ static METRIC *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const cha
// ----------------------------------------------------------------------------
// metric handle
-void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
mrg_metric_release(main_mrg, metric);
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
return (STORAGE_METRIC_HANDLE *) mrg_metric_dup(main_mrg, metric);
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return (STORAGE_METRIC_HANDLE *) mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
}
-static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid) {
- internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
+static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *si, uuid_t *uuid) {
+ internal_fatal(!si, "DBENGINE: STORAGE_INSTANCE is NULL");
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
MRG_ENTRY entry = {
.uuid = uuid,
.section = (Word_t)ctx,
@@ -145,12 +151,15 @@ static METRIC *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid)
.latest_update_every_s = 0,
};
- METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, NULL);
+ bool added;
+ METRIC *metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
+ if (added)
+ __atomic_add_fetch(&ctx->atomic.metrics, 1, __ATOMIC_RELAXED);
return metric;
}
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
METRIC *metric;
metric = mrg_metric_get_and_acquire(main_mrg, &rd->metric_uuid, (Word_t) ctx);
@@ -160,13 +169,13 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
// this is a single host database
// generate uuid from the chart and dimensions ids
// and overwrite the one supplied by rrddim
- metric = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset));
+ metric = rrdeng_metric_get_legacy(si, rrddim_id(rd), rrdset_id(rd->rrdset));
if (metric)
uuid_copy(rd->metric_uuid, *mrg_metric_uuid(main_mrg, metric));
}
if(likely(!metric))
- metric = rrdeng_metric_create(db_instance, &rd->metric_uuid);
+ metric = rrdeng_metric_create(si, &rd->metric_uuid);
}
#ifdef NETDATA_INTERNAL_CHECKS
@@ -192,14 +201,14 @@ STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE
// collect ops
static inline void check_and_fix_mrg_update_every(struct rrdeng_collect_handle *handle) {
- if(unlikely((time_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) {
- internal_error(true, "DBENGINE: collection handle has update every %ld, but the metric registry has %ld. Fixing it.",
- (time_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric));
+ if(unlikely((uint32_t)(handle->update_every_ut / USEC_PER_SEC) != mrg_metric_get_update_every_s(main_mrg, handle->metric))) {
+ internal_error(true, "DBENGINE: collection handle has update every %u, but the metric registry has %u. Fixing it.",
+ (uint32_t)(handle->update_every_ut / USEC_PER_SEC), mrg_metric_get_update_every_s(main_mrg, handle->metric));
if(unlikely(!handle->update_every_ut))
handle->update_every_ut = (usec_t)mrg_metric_get_update_every_s(main_mrg, handle->metric) * USEC_PER_SEC;
else
- mrg_metric_set_update_every(main_mrg, handle->metric, (time_t)(handle->update_every_ut / USEC_PER_SEC));
+ mrg_metric_set_update_every(main_mrg, handle->metric, (uint32_t)(handle->update_every_ut / USEC_PER_SEC));
}
}
@@ -213,7 +222,7 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle
uuid_t *uuid = mrg_metric_uuid(main_mrg, handle->metric);
time_t start_time_s = pgc_page_start_time_s(handle->pgc_page);
time_t end_time_s = pgc_page_end_time_s(handle->pgc_page);
- time_t update_every_s = pgc_page_update_every_s(handle->pgc_page);
+ uint32_t update_every_s = pgc_page_update_every_s(handle->pgc_page);
size_t page_length = handle->page_position * CTX_POINT_SIZE_BYTES(ctx);
size_t entries = handle->page_position;
time_t overwrite_zero_update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
@@ -245,8 +254,8 @@ static inline bool check_completed_page_consistency(struct rrdeng_collect_handle
* Gets a handle for storing metrics to the database.
* The handle must be released with rrdeng_store_metric_final().
*/
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
- METRIC *metric = (METRIC *)db_metric_handle;
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg) {
+ METRIC *metric = (METRIC *)smh;
struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
bool is_1st_metric_writer = true;
@@ -262,7 +271,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
struct rrdeng_collect_handle *handle;
handle = callocz(1, sizeof(struct rrdeng_collect_handle));
- handle->common.backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ handle->common.seb = STORAGE_ENGINE_BACKEND_DBENGINE;
handle->metric = metric;
handle->pgc_page = NULL;
@@ -288,15 +297,15 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
// data collection may be able to go back in time and during the addition of new pages
// clean pages may be found matching ours!
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
- mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+ time_t db_first_time_s, db_last_time_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, NULL);
handle->page_end_time_ut = (usec_t)db_last_time_s * USEC_PER_SEC;
return (STORAGE_COLLECT_HANDLE *)handle;
}
-void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
if (unlikely(!handle->pgc_page))
return;
@@ -307,7 +316,17 @@ void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_h
else {
check_completed_page_consistency(handle);
mrg_metric_set_clean_latest_time_s(main_mrg, handle->metric, pgc_page_end_time_s(handle->pgc_page));
- pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page);
+
+ struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
+ time_t start_time_s = pgc_page_start_time_s(handle->pgc_page);
+ time_t end_time_s = pgc_page_end_time_s(handle->pgc_page);
+ uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, handle->metric);
+ if (end_time_s && start_time_s && end_time_s > start_time_s && update_every_s) {
+ uint64_t add_samples = (end_time_s - start_time_s) / update_every_s;
+ __atomic_add_fetch(&ctx->atomic.samples, add_samples, __ATOMIC_RELAXED);
+ }
+
+ pgc_page_hot_to_dirty_and_release(main_cache, handle->pgc_page, false);
}
mrg_metric_set_hot_latest_time_s(main_mrg, handle->metric, 0);
@@ -336,7 +355,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
PGD *data,
size_t data_size) {
time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
- const time_t update_every_s = (time_t)(handle->update_every_ut / USEC_PER_SEC);
+ const uint32_t update_every_s = (uint32_t)(handle->update_every_ut / USEC_PER_SEC);
PGC_ENTRY page_entry = {
.section = (Word_t) ctx,
@@ -345,7 +364,7 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
.end_time_s = point_in_time_s,
.size = data_size,
.data = data,
- .update_every_s = (uint32_t) update_every_s,
+ .update_every_s = update_every_s,
.hot = true
};
@@ -364,11 +383,11 @@ static void rrdeng_store_metric_create_new_page(struct rrdeng_collect_handle *ha
nd_log_limit_static_global_var(erl, 1, 0);
nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
#endif
- "DBENGINE: metric '%s' new page from %ld to %ld, update every %ld, has a conflict in main cache "
- "with existing %s%s page from %ld to %ld, update every %ld - "
+ "DBENGINE: metric '%s' new page from %ld to %ld, update every %u, has a conflict in main cache "
+ "with existing %s%s page from %ld to %ld, update every %u - "
"is it collected more than once?",
uuid,
- page_entry.start_time_s, page_entry.end_time_s, (time_t)page_entry.update_every_s,
+ page_entry.start_time_s, page_entry.end_time_s, page_entry.update_every_s,
pgc_is_page_hot(pgc_page) ? "hot" : "not-hot",
pgc_page_data(pgc_page) == PGD_EMPTY ? " gap" : "",
pgc_page_start_time_s(pgc_page), pgc_page_end_time_s(pgc_page), pgc_page_update_every_s(pgc_page)
@@ -444,14 +463,14 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz
*data_size = size;
switch (ctx->config.page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
d = pgd_create(ctx->config.page_type, slots);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
// ignore slots, and use the fixed number of slots per gorilla buffer.
// gorilla will automatically add more buffers if needed.
- d = pgd_create(ctx->config.page_type, GORILLA_BUFFER_SLOTS);
+ d = pgd_create(ctx->config.page_type, RRDENG_GORILLA_32BIT_BUFFER_SLOTS);
break;
default:
fatal("Unknown page type: %uc\n", ctx->config.page_type);
@@ -461,7 +480,7 @@ static PGD *rrdeng_alloc_new_page_data(struct rrdeng_collect_handle *handle, siz
return d;
}
-static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_handle,
+static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *sch,
const usec_t point_in_time_ut,
const NETDATA_DOUBLE n,
const NETDATA_DOUBLE min_value,
@@ -470,7 +489,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
const uint16_t anomaly_count,
const SN_FLAGS flags)
{
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
if(unlikely(!handle->page_data))
@@ -497,7 +516,7 @@ static void rrdeng_store_metric_append_point(STORAGE_COLLECT_HANDLE *collection_
if(unlikely(++handle->page_position >= handle->page_entries_max)) {
internal_fatal(handle->page_position > handle->page_entries_max, "DBENGINE: exceeded page max number of points");
handle->page_flags |= RRDENG_PAGE_FULL;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
}
@@ -543,7 +562,7 @@ static void store_metric_next_error_log(struct rrdeng_collect_handle *handle __m
#endif
}
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch,
const usec_t point_in_time_ut,
const NETDATA_DOUBLE n,
const NETDATA_DOUBLE min_value,
@@ -554,7 +573,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
{
timing_step(TIMING_STEP_RRDSET_STORE_METRIC);
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(point_in_time_ut > (usec_t)max_acceptable_collected_time() * USEC_PER_SEC))
@@ -571,11 +590,11 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if(handle->pgc_page) {
if (unlikely(delta_ut < handle->update_every_ut)) {
handle->page_flags |= RRDENG_PAGE_STEP_TOO_SMALL;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else if (unlikely(delta_ut % handle->update_every_ut)) {
handle->page_flags |= RRDENG_PAGE_STEP_UNALIGNED;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else {
size_t points_gap = delta_ut / handle->update_every_ut;
@@ -583,7 +602,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
if (points_gap >= page_remaining_points) {
handle->page_flags |= RRDENG_PAGE_BIG_GAP;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
}
else {
// loop to fill the gap
@@ -594,7 +613,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
this_ut <= stop_ut;
this_ut = handle->page_end_time_ut + handle->update_every_ut) {
rrdeng_store_metric_append_point(
- collection_handle,
+ sch,
this_ut,
NAN, NAN, NAN,
1, 0,
@@ -618,7 +637,7 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
timing_step(TIMING_STEP_DBENGINE_FIRST_CHECK);
- rrdeng_store_metric_append_point(collection_handle,
+ rrdeng_store_metric_append_point(sch,
point_in_time_ut,
n, min_value, max_value,
count, anomaly_count,
@@ -629,12 +648,12 @@ void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
* Releases the database reference from the handle for storing metrics.
* Returns 1 if it's safe to delete the dimension.
*/
-int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
handle->page_flags |= RRDENG_PAGE_COLLECT_FINALIZE;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
rrdeng_page_alignment_release(handle->alignment);
__atomic_sub_fetch(&ctx->atomic.collectors_running, 1, __ATOMIC_RELAXED);
@@ -644,8 +663,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
if((handle->options & RRDENG_1ST_METRIC_WRITER) && !mrg_metric_clear_writer(main_mrg, handle->metric))
internal_fatal(true, "DBENGINE: metric is already released");
- time_t first_time_s, last_time_s, update_every_s;
- mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, &update_every_s);
+ time_t first_time_s, last_time_s;
+ mrg_metric_get_retention(main_mrg, handle->metric, &first_time_s, &last_time_s, NULL);
mrg_metric_release(main_mrg, handle->metric);
freez(handle);
@@ -656,8 +675,8 @@ int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
return 0;
}
-void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
- struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every) {
+ struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)sch;
check_and_fix_mrg_update_every(handle);
METRIC *metric = handle->metric;
@@ -667,7 +686,7 @@ void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *col
return;
handle->page_flags |= RRDENG_PAGE_UPDATE_EVERY_CHANGE;
- rrdeng_store_metric_flush_current_page(collection_handle);
+ rrdeng_store_metric_flush_current_page(sch);
mrg_metric_set_update_every(main_mrg, metric, update_every);
handle->update_every_ut = update_every_ut;
}
@@ -704,8 +723,8 @@ static void unregister_query_handle(struct rrdeng_query_handle *handle __maybe_u
* Gets a handle for loading metrics from the database.
* The handle must be released with rrdeng_load_metric_final().
*/
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
- struct storage_engine_query_handle *rrddim_handle,
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh,
+ struct storage_engine_query_handle *seqh,
time_t start_time_s,
time_t end_time_s,
STORAGE_PRIORITY priority)
@@ -714,7 +733,7 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
netdata_thread_disable_cancelability();
- METRIC *metric = (METRIC *)db_metric_handle;
+ METRIC *metric = (METRIC *)smh;
struct rrdengine_instance *ctx = mrg_metric_ctx(metric);
struct rrdeng_query_handle *handle;
@@ -736,7 +755,8 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
// is inserted into the main cache, to avoid scanning the journals
// again for pages matching the gap.
- time_t db_first_time_s, db_last_time_s, db_update_every_s;
+ time_t db_first_time_s, db_last_time_s;
+ uint32_t db_update_every_s;
mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) == PAGE_IS_IN_RANGE) {
@@ -750,11 +770,11 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
mrg_metric_set_update_every_s_if_zero(main_mrg, metric, default_rrd_update_every);
}
- rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
- rrddim_handle->start_time_s = handle->start_time_s;
- rrddim_handle->end_time_s = handle->end_time_s;
- rrddim_handle->priority = priority;
- rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ seqh->handle = (STORAGE_QUERY_HANDLE *) handle;
+ seqh->start_time_s = handle->start_time_s;
+ seqh->end_time_s = handle->end_time_s;
+ seqh->priority = priority;
+ seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE;
pg_cache_preload(handle);
@@ -766,16 +786,16 @@ void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle,
handle->now_s = start_time_s;
handle->dt_s = db_update_every_s;
- rrddim_handle->handle = (STORAGE_QUERY_HANDLE *) handle;
- rrddim_handle->start_time_s = handle->start_time_s;
- rrddim_handle->end_time_s = 0;
- rrddim_handle->priority = priority;
- rrddim_handle->backend = STORAGE_ENGINE_BACKEND_DBENGINE;
+ seqh->handle = (STORAGE_QUERY_HANDLE *) handle;
+ seqh->start_time_s = handle->start_time_s;
+ seqh->end_time_s = 0;
+ seqh->priority = priority;
+ seqh->seb = STORAGE_ENGINE_BACKEND_DBENGINE;
}
}
-static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_handle, bool debug_this __maybe_unused) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+static bool rrdeng_load_page_next(struct storage_engine_query_handle *seqh, bool debug_this __maybe_unused) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
struct rrdengine_instance *ctx = mrg_metric_ctx(handle->metric);
if (likely(handle->page)) {
@@ -785,7 +805,7 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
pgdc_reset(&handle->pgdc, NULL, UINT32_MAX);
}
- if (unlikely(handle->now_s > rrddim_handle->end_time_s))
+ if (unlikely(handle->now_s > seqh->end_time_s))
return false;
size_t entries = 0;
@@ -799,7 +819,7 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
time_t page_start_time_s = pgc_page_start_time_s(handle->page);
time_t page_end_time_s = pgc_page_end_time_s(handle->page);
- time_t page_update_every_s = pgc_page_update_every_s(handle->page);
+ uint32_t page_update_every_s = pgc_page_update_every_s(handle->page);
unsigned position;
if(likely(handle->now_s >= page_start_time_s && handle->now_s <= page_end_time_s)) {
@@ -810,13 +830,13 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
}
else {
position = (handle->now_s - page_start_time_s) * (entries - 1) / (page_end_time_s - page_start_time_s);
- time_t point_end_time_s = page_start_time_s + position * page_update_every_s;
+ time_t point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s;
while(point_end_time_s < handle->now_s && position + 1 < entries) {
// https://github.com/netdata/netdata/issues/14411
// we really need a while() here, because the delta may be
// 2 points at higher tiers
position++;
- point_end_time_s = page_start_time_s + position * page_update_every_s;
+ point_end_time_s = page_start_time_s + position * (time_t) page_update_every_s;
}
handle->now_s = point_end_time_s;
}
@@ -845,11 +865,11 @@ static bool rrdeng_load_page_next(struct storage_engine_query_handle *rrddim_han
// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
-STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
STORAGE_POINT sp;
- if (unlikely(handle->now_s > rrddim_handle->end_time_s)) {
+ if (unlikely(handle->now_s > seqh->end_time_s)) {
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
@@ -857,8 +877,8 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
if (unlikely(!handle->page || handle->position >= handle->entries)) {
// We need to get a new page
- if (!rrdeng_load_page_next(rrddim_handle, false)) {
- handle->now_s = rrddim_handle->end_time_s;
+ if (!rrdeng_load_page_next(seqh, false)) {
+ handle->now_s = seqh->end_time_s;
storage_point_empty(sp, handle->now_s - handle->dt_s, handle->now_s);
goto prepare_for_next_iteration;
}
@@ -870,7 +890,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
pgdc_get_next_point(&handle->pgdc, handle->position, &sp);
prepare_for_next_iteration:
- internal_fatal(sp.end_time_s < rrddim_handle->start_time_s, "DBENGINE: this point is too old for this query");
+ internal_fatal(sp.end_time_s < seqh->start_time_s, "DBENGINE: this point is too old for this query");
internal_fatal(sp.end_time_s < handle->now_s, "DBENGINE: this point is too old for this point in time");
handle->now_s += handle->dt_s;
@@ -879,17 +899,17 @@ prepare_for_next_iteration:
return sp;
}
-int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
- return (handle->now_s > rrddim_handle->end_time_s);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
+ return (handle->now_s > seqh->end_time_s);
}
/*
* Releases the database reference from the handle for loading metrics.
*/
-void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle)
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh)
{
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
if (handle->page) {
pgc_page_release(main_cache, handle->page);
@@ -901,24 +921,24 @@ void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_hand
unregister_query_handle(handle);
rrdeng_query_handle_release(handle);
- rrddim_handle->handle = NULL;
+ seqh->handle = NULL;
netdata_thread_enable_cancelability();
}
-time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle) {
- struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh) {
+ struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)seqh->handle;
if(handle->pdc) {
rrdeng_prep_wait(handle->pdc);
- if (handle->pdc->optimal_end_time_s > rrddim_handle->end_time_s)
- rrddim_handle->end_time_s = handle->pdc->optimal_end_time_s;
+ if (handle->pdc->optimal_end_time_s > seqh->end_time_s)
+ seqh->end_time_s = handle->pdc->optimal_end_time_s;
}
- return rrddim_handle->end_time_s;
+ return seqh->end_time_s;
}
-time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
time_t latest_time_s = 0;
if (metric)
@@ -927,8 +947,8 @@ time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
return latest_time_s;
}
-time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
- METRIC *metric = (METRIC *)db_metric_handle;
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh) {
+ METRIC *metric = (METRIC *)smh;
time_t oldest_time_s = 0;
if (metric)
@@ -937,9 +957,9 @@ time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
return oldest_time_s;
}
-bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s)
+bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s)
{
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
if (unlikely(!ctx)) {
netdata_log_error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
return false;
@@ -949,26 +969,35 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_
if (unlikely(!metric))
return false;
- time_t update_every_s;
- mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, &update_every_s);
+ mrg_metric_get_retention(main_mrg, metric, first_entry_s, last_entry_s, NULL);
mrg_metric_release(main_mrg, metric);
return true;
}
-uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return ctx->config.max_disk_space;
}
-uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return __atomic_load_n(&ctx->atomic.current_disk_space, __ATOMIC_RELAXED);
}
-time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+uint64_t rrdeng_metrics(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
+ return __atomic_load_n(&ctx->atomic.metrics, __ATOMIC_RELAXED);
+}
+
+uint64_t rrdeng_samples(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
+ return __atomic_load_n(&ctx->atomic.samples, __ATOMIC_RELAXED);
+}
+
+time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
time_t t = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);
if(t == LONG_MAX || t < 0)
@@ -977,8 +1006,8 @@ time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) {
return t;
}
-size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED);
}
@@ -1099,8 +1128,8 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) {
netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
}
-bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) {
- struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+bool rrdeng_is_legacy(STORAGE_INSTANCE *si) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)si;
return ctx->config.legacy;
}
@@ -1142,7 +1171,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
ctx->config.tier = (int)tier;
ctx->config.page_type = tier_page_type[tier];
- ctx->config.global_compress_alg = RRD_LZ4;
+ ctx->config.global_compress_alg = dbengine_default_compression();
if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
ctx->config.max_disk_space = disk_space_mb * 1048576LLU;
@@ -1154,6 +1183,8 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
rw_spinlock_init(&ctx->njfv2idx.spinlock);
ctx->atomic.first_time_s = LONG_MAX;
+ ctx->atomic.metrics = 0;
+ ctx->atomic.samples = 0;
if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) {
// success - we run this ctx too
diff --git a/database/engine/rrdengineapi.h b/src/database/engine/rrdengineapi.h
index 7ae0e7079..fb449cd9b 100644
--- a/database/engine/rrdengineapi.h
+++ b/src/database/engine/rrdengineapi.h
@@ -26,32 +26,32 @@ extern uint8_t tier_page_type[];
void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance);
-STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
-void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle);
-STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle);
-
-STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
-void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle);
-void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every);
-void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time_ut, NETDATA_DOUBLE n,
+STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *si);
+STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *si, uuid_t *uuid);
+void rrdeng_metric_release(STORAGE_METRIC_HANDLE *smh);
+STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *smh);
+
+STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *smh, uint32_t update_every, STORAGE_METRICS_GROUP *smg);
+void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *sch);
+void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *sch, int update_every);
+void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *sch, usec_t point_in_time_ut, NETDATA_DOUBLE n,
NETDATA_DOUBLE min_value,
NETDATA_DOUBLE max_value,
uint16_t count,
uint16_t anomaly_count,
SN_FLAGS flags);
-int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle);
+int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *sch);
-void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrddim_handle,
+void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *smh, struct storage_engine_query_handle *seqh,
time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority);
-STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle);
+STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *seqh);
-int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrddim_handle);
-void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrddim_handle);
-time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle);
-time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle);
+int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *seqh);
+void rrdeng_load_metric_finalize(struct storage_engine_query_handle *seqh);
+time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *smh);
+time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *smh);
+time_t rrdeng_load_align_to_optimal_before(struct storage_engine_query_handle *seqh);
void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
@@ -64,10 +64,10 @@ void rrdeng_exit_mode(struct rrdengine_instance *ctx);
int rrdeng_exit(struct rrdengine_instance *ctx);
void rrdeng_prepare_exit(struct rrdengine_instance *ctx);
-bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s);
+bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_s, time_t *last_entry_s);
-extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid);
-extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg);
+extern STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *si, uuid_t *uuid);
+extern void rrdeng_metrics_group_release(STORAGE_INSTANCE *si, STORAGE_METRICS_GROUP *smg);
typedef struct rrdengine_size_statistics {
size_t default_granularity_secs;
@@ -221,9 +221,6 @@ struct rrdeng_cache_efficiency_stats rrdeng_get_cache_efficiency_stats(void);
RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
size_t rrdeng_collectors_running(struct rrdengine_instance *ctx);
-bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance);
-
-uint64_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance);
-uint64_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance);
+bool rrdeng_is_legacy(STORAGE_INSTANCE *si);
#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/src/database/engine/rrdenginelib.c
index dc581d98d..dc581d98d 100644
--- a/database/engine/rrdenginelib.c
+++ b/src/database/engine/rrdenginelib.c
diff --git a/database/engine/rrdenginelib.h b/src/database/engine/rrdenginelib.h
index a0febd4f4..a0febd4f4 100644
--- a/database/engine/rrdenginelib.h
+++ b/src/database/engine/rrdenginelib.h
diff --git a/database/engine/metadata_log/README.md b/src/go/collectors/go.d.plugin/agent/testdata/agent-empty.conf
index e69de29bb..e69de29bb 100644
--- a/database/engine/metadata_log/README.md
+++ b/src/go/collectors/go.d.plugin/agent/testdata/agent-empty.conf