summaryrefslogtreecommitdiffstats
path: root/database/engine
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--database/engine/cache.c68
-rw-r--r--database/engine/cache.h2
-rw-r--r--database/engine/datafile.c117
-rw-r--r--database/engine/datafile.h6
-rw-r--r--database/engine/journalfile.c379
-rw-r--r--database/engine/journalfile.h21
-rw-r--r--database/engine/metric.c578
-rw-r--r--database/engine/metric.h35
-rw-r--r--database/engine/pagecache.c84
-rw-r--r--database/engine/pdc.c30
-rw-r--r--database/engine/rrdengine.c123
-rw-r--r--database/engine/rrdengine.h60
-rwxr-xr-xdatabase/engine/rrdengineapi.c69
-rw-r--r--database/engine/rrdengineapi.h3
-rw-r--r--database/engine/rrdenginelib.c22
-rw-r--r--database/engine/rrdenginelib.h2
16 files changed, 923 insertions, 676 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c
index bc3ba6b6a..7a9ccf8d1 100644
--- a/database/engine/cache.c
+++ b/database/engine/cache.c
@@ -112,8 +112,9 @@ struct pgc {
PGC_CACHE_LINE_PADDING(0);
struct pgc_index {
- netdata_rwlock_t rwlock;
+ RW_SPINLOCK rw_spinlock;
Pvoid_t sections_judy;
+ PGC_CACHE_LINE_PADDING(0);
} *index;
PGC_CACHE_LINE_PADDING(1);
@@ -222,43 +223,40 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) {
}
static inline void pgc_index_read_lock(PGC *cache, size_t partition) {
- netdata_rwlock_rdlock(&cache->index[partition].rwlock);
+ rw_spinlock_read_lock(&cache->index[partition].rw_spinlock);
}
static inline void pgc_index_read_unlock(PGC *cache, size_t partition) {
- netdata_rwlock_unlock(&cache->index[partition].rwlock);
+ rw_spinlock_read_unlock(&cache->index[partition].rw_spinlock);
}
-//static inline bool pgc_index_write_trylock(PGC *cache, size_t partition) {
-// return !netdata_rwlock_trywrlock(&cache->index[partition].rwlock);
-//}
static inline void pgc_index_write_lock(PGC *cache, size_t partition) {
- netdata_rwlock_wrlock(&cache->index[partition].rwlock);
+ rw_spinlock_write_lock(&cache->index[partition].rw_spinlock);
}
static inline void pgc_index_write_unlock(PGC *cache, size_t partition) {
- netdata_rwlock_unlock(&cache->index[partition].rwlock);
+ rw_spinlock_write_unlock(&cache->index[partition].rw_spinlock);
}
static inline bool pgc_ll_trylock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- return netdata_spinlock_trylock(&ll->spinlock);
+ return spinlock_trylock(&ll->spinlock);
}
static inline void pgc_ll_lock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- netdata_spinlock_lock(&ll->spinlock);
+ spinlock_lock(&ll->spinlock);
}
static inline void pgc_ll_unlock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) {
- netdata_spinlock_unlock(&ll->spinlock);
+ spinlock_unlock(&ll->spinlock);
}
static inline bool page_transition_trylock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- return netdata_spinlock_trylock(&page->transition_spinlock);
+ return spinlock_trylock(&page->transition_spinlock);
}
static inline void page_transition_lock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- netdata_spinlock_lock(&page->transition_spinlock);
+ spinlock_lock(&page->transition_spinlock);
}
static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *page) {
- netdata_spinlock_unlock(&page->transition_spinlock);
+ spinlock_unlock(&page->transition_spinlock);
}
// ----------------------------------------------------------------------------
@@ -267,9 +265,9 @@ static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *p
static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
if(size_to_evict)
- netdata_spinlock_lock(&cache->usage.spinlock);
+ spinlock_lock(&cache->usage.spinlock);
- else if(!netdata_spinlock_trylock(&cache->usage.spinlock))
+ else if(!spinlock_trylock(&cache->usage.spinlock))
return __atomic_load_n(&cache->usage.per1000, __ATOMIC_RELAXED);
size_t current_cache_size;
@@ -319,7 +317,7 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) {
__atomic_store_n(&cache->stats.wanted_cache_size, wanted_cache_size, __ATOMIC_RELAXED);
__atomic_store_n(&cache->stats.current_cache_size, current_cache_size, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&cache->usage.spinlock);
+ spinlock_unlock(&cache->usage.spinlock);
if(size_to_evict) {
size_t target = (size_t)((unsigned long long)wanted_cache_size * (unsigned long long)cache->config.evict_low_threshold_per1000 / 1000ULL);
@@ -422,7 +420,7 @@ static void pgc_section_pages_static_aral_init(void) {
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
if(unlikely(!pgc_section_pages_aral)) {
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
// we have to check again
if(!pgc_section_pages_aral)
@@ -433,7 +431,7 @@ static void pgc_section_pages_static_aral_init(void) {
65536, NULL,
NULL, NULL, false, false);
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
}
}
@@ -1255,7 +1253,7 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) {
page->update_every_s = entry->update_every_s,
page->data = entry->data;
page->assumed_size = page_assumed_size(cache, entry->size);
- netdata_spinlock_init(&page->transition_spinlock);
+ spinlock_init(&page->transition_spinlock);
page->link.prev = NULL;
page->link.next = NULL;
@@ -1378,7 +1376,7 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric
Word_t time = start_time_s;
// find the previous page
- page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0);
+ page_ptr = JudyLPrev(*pages_judy_pptr, &time, PJE0);
if(unlikely(page_ptr == PJERR))
fatal("DBENGINE CACHE: corrupted page in pages judy array #2");
@@ -1779,11 +1777,11 @@ PGC *pgc_create(const char *name,
cache->index = callocz(cache->config.partitions, sizeof(struct pgc_index));
for(size_t part = 0; part < cache->config.partitions ; part++)
- netdata_rwlock_init(&cache->index[part].rwlock);
+ rw_spinlock_init(&cache->index[part].rw_spinlock);
- netdata_spinlock_init(&cache->hot.spinlock);
- netdata_spinlock_init(&cache->dirty.spinlock);
- netdata_spinlock_init(&cache->clean.spinlock);
+ spinlock_init(&cache->hot.spinlock);
+ spinlock_init(&cache->dirty.spinlock);
+ spinlock_init(&cache->clean.spinlock);
cache->hot.flags = PGC_PAGE_HOT;
cache->hot.linked_list_in_sections_judy = true;
@@ -1849,12 +1847,12 @@ void pgc_destroy(PGC *cache) {
free_all_unreferenced_clean_pages(cache);
if(PGC_REFERENCED_PAGES(cache))
- error("DBENGINE CACHE: there are %zu referenced cache pages - leaving the cache allocated", PGC_REFERENCED_PAGES(cache));
+ netdata_log_error("DBENGINE CACHE: there are %zu referenced cache pages - leaving the cache allocated", PGC_REFERENCED_PAGES(cache));
else {
pointer_destroy_index(cache);
- for(size_t part = 0; part < cache->config.partitions ; part++)
- netdata_rwlock_destroy(&cache->index[part].rwlock);
+// for(size_t part = 0; part < cache->config.partitions ; part++)
+// netdata_rwlock_destroy(&cache->index[part].rw_spinlock);
#ifdef PGC_WITH_ARAL
for(size_t part = 0; part < cache->config.partitions ; part++)
@@ -2091,8 +2089,8 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
}
struct section_pages *sp = *section_pages_pptr;
- if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) {
- info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno);
+ if(!spinlock_trylock(&sp->migration_to_v2_spinlock)) {
+ netdata_log_info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno);
pgc_ll_unlock(cache, &cache->hot);
return;
}
@@ -2205,7 +2203,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_
pgc_ll_lock(cache, &cache->hot);
}
- netdata_spinlock_unlock(&sp->migration_to_v2_spinlock);
+ spinlock_unlock(&sp->migration_to_v2_spinlock);
pgc_ll_unlock(cache, &cache->hot);
// callback
@@ -2355,7 +2353,7 @@ void *unittest_stress_test_collector(void *ptr) {
heartbeat_init(&hb);
while(!__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) {
- // info("COLLECTOR %zu: collecting metrics %zu to %zu, from %ld to %lu", id, metric_start, metric_end, start_time_t, start_time_t + pgc_uts.points_per_page);
+ // netdata_log_info("COLLECTOR %zu: collecting metrics %zu to %zu, from %ld to %lu", id, metric_start, metric_end, start_time_t, start_time_t + pgc_uts.points_per_page);
netdata_thread_disable_cancelability();
@@ -2485,7 +2483,7 @@ void *unittest_stress_test_service(void *ptr) {
}
static void unittest_stress_test_save_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) {
- // info("SAVE %zu pages", entries);
+ // netdata_log_info("SAVE %zu pages", entries);
if(!pgc_uts.stop) {
usec_t t = pgc_uts.time_per_flush_ut;
@@ -2625,7 +2623,7 @@ void unittest_stress_test(void) {
if(stats.events_flush_critical > old_stats.events_flush_critical)
flushing_status = "F";
- info("PGS %5zuk +%4zuk/-%4zuk "
+ netdata_log_info("PGS %5zuk +%4zuk/-%4zuk "
"| RF %5zuk "
"| HOT %5zuk +%4zuk -%4zuk "
"| DRT %s %5zuk +%4zuk -%4zuk "
@@ -2651,7 +2649,7 @@ void unittest_stress_test(void) {
#endif
);
}
- info("Waiting for threads to stop...");
+ netdata_log_info("Waiting for threads to stop...");
__atomic_store_n(&pgc_uts.stop, true, __ATOMIC_RELAXED);
netdata_thread_join(service_thread, NULL);
diff --git a/database/engine/cache.h b/database/engine/cache.h
index 65e6a6137..1486fdc16 100644
--- a/database/engine/cache.h
+++ b/database/engine/cache.h
@@ -31,7 +31,7 @@ typedef struct pgc_entry {
uint8_t *custom_data;
} PGC_ENTRY;
-#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[128]
+#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[64]
struct pgc_queue_statistics {
size_t entries;
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 8c413d8dc..d5c1285be 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,11 +1,15 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock)
{
- uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+ if(!having_lock)
+ uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next);
- uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
+
+ if(!having_lock)
+ uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
@@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
datafile->users.available = true;
- netdata_spinlock_init(&datafile->users.spinlock);
- netdata_spinlock_init(&datafile->writers.spinlock);
- netdata_spinlock_init(&datafile->extent_queries.spinlock);
+ spinlock_init(&datafile->users.spinlock);
+ spinlock_init(&datafile->writers.spinlock);
+ spinlock_init(&datafile->extent_queries.spinlock);
return datafile;
}
@@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
bool ret;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(df->users.available) {
ret = true;
@@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
else
ret = false;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return ret;
}
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired");
df->users.lockers--;
df->users.lockers_by_reason[reason]--;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
}
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
bool can_be_deleted = false;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
df->users.available = false;
if(!df->users.lockers)
@@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers
// evict any pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
pgc_open_evict_clean_pages_of_datafile(open_cache, df);
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers still
// count the number of pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
usec_t time_to_scan_ut = now_monotonic_usec();
size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
time_to_scan_ut);
}
}
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return can_be_deleted;
}
@@ -171,7 +175,7 @@ int close_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -190,7 +194,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -211,21 +215,21 @@ int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -268,7 +272,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -299,7 +303,7 @@ static int check_data_file_superblock(uv_file file)
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -309,7 +313,7 @@ static int check_data_file_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
superblock->tier != 1) {
- error("DBENGINE: file has invalid superblock.");
+ netdata_log_error("DBENGINE: file has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -334,7 +338,7 @@ static int load_data_file(struct rrdengine_datafile *datafile)
ctx_fs_error(ctx);
return fd;
}
- info("DBENGINE: initializing data file \"%s\".", path);
+ netdata_log_info("DBENGINE: initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
if (ret)
@@ -350,14 +354,14 @@ static int load_data_file(struct rrdengine_datafile *datafile)
datafile->file = file;
datafile->pos = file_size;
- info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
return 0;
error:
error = ret;
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -390,11 +394,11 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (ret < 0) {
fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
- error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
ctx_fs_error(ctx);
return ret;
}
- info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
@@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx)
freez(datafiles);
return 0;
}
- if (matched_files == MAX_DATAFILES) {
- error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
- }
+
+ if (matched_files == MAX_DATAFILES)
+ netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
- /* TODO: change this when tiering is implemented */
+
ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
@@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafile = datafiles[i];
ret = load_data_file(datafile);
- if (0 != ret) {
+ if (0 != ret)
must_delete_pair = 1;
- }
+
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_load(ctx, journalfile, datafile);
if (0 != ret) {
@@ -432,19 +437,20 @@ static int scan_data_files(struct rrdengine_instance *ctx)
close_data_file(datafile);
must_delete_pair = 1;
}
+
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
- error("DBENGINE: deleting invalid data and journal file pair.");
+ netdata_log_error("DBENGINE: deleting invalid data and journal file pair.");
ret = journalfile_unlink(journalfile);
if (!ret) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
}
ret = unlink_data_file(datafile);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: deleted data file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted data file \"%s\".", path);
}
freez(journalfile);
freez(datafile);
@@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, false);
}
+
matched_files -= failed_to_load;
freez(datafiles);
@@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
/* Creates a datafile and a journalfile pair */
-int create_new_datafile_pair(struct rrdengine_instance *ctx)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock)
{
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED);
@@ -472,14 +479,14 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
int ret;
char path[RRDENG_PATH_MAX];
- info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
datafile = datafile_alloc_and_init(ctx, 1, fileno);
ret = create_data_file(datafile);
if(ret)
goto error_after_datafile;
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: created data file \"%s\".", path);
+ netdata_log_info("DBENGINE: created data file \"%s\".", path);
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_create(journalfile, datafile);
@@ -487,10 +494,10 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
goto error_after_journalfile;
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: created journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: created journal file \"%s\".", path);
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, having_lock);
ctx_last_fileno_increment(ctx);
return 0;
@@ -514,20 +521,20 @@ int init_data_files(struct rrdengine_instance *ctx)
fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
- error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path);
return ret;
} else if (0 == ret) {
- info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
ctx->atomic.last_fileno = 0;
- ret = create_new_datafile_pair(ctx);
+ ret = create_new_datafile_pair(ctx, false);
if (ret) {
- error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
return ret;
}
}
else {
if (ctx->loading.create_new_datafile_pair)
- create_new_datafile_pair(ctx);
+ create_new_datafile_pair(ctx, false);
while(rrdeng_ctx_exceeded_disk_quota(ctx))
datafile_delete(ctx, ctx->datafiles.first, false, false);
@@ -545,7 +552,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
logged = false;
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
if(!logged) {
- info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
+ netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -559,7 +566,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
size_t iterations = 100;
while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) {
if(!logged) {
- info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
+ netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -569,14 +576,14 @@ void finalize_data_files(struct rrdengine_instance *ctx)
bool available = false;
do {
uv_rwlock_wrlock(&ctx->datafiles.rwlock);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
if(!available) {
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
if(!logged) {
- info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
+ netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
journalfile_close(journalfile, datafile);
close_data_file(datafile);
datafile_list_delete_unsafe(ctx, datafile);
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
freez(journalfile);
diff --git a/database/engine/datafile.h b/database/engine/datafile.h
index a08f3ae04..569f1b0a2 100644
--- a/database/engine/datafile.h
+++ b/database/engine/datafile.h
@@ -21,7 +21,7 @@ struct rrdengine_instance;
#endif
#define MIN_DATAFILE_SIZE (4LU * 1024LU * 1024LU)
-#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */
+#define MAX_DATAFILES (65536 * 4) /* Supports up to 64TiB for now */
#define TARGET_DATAFILES (50)
typedef enum __attribute__ ((__packed__)) {
@@ -74,14 +74,14 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason);
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df);
-void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock);
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile);
void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen);
int close_data_file(struct rrdengine_datafile *datafile);
int unlink_data_file(struct rrdengine_datafile *datafile);
int destroy_data_file_unsafe(struct rrdengine_datafile *datafile);
int create_data_file(struct rrdengine_datafile *datafile);
-int create_new_datafile_pair(struct rrdengine_instance *ctx);
+int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock);
int init_data_files(struct rrdengine_instance *ctx);
void finalize_data_files(struct rrdengine_instance *ctx);
diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c
index 9998ee540..24d3c1c6d 100644
--- a/database/engine/journalfile.c
+++ b/database/engine/journalfile.c
@@ -1,57 +1,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-
-// DBENGINE2: Helper
-
-static void update_metric_retention_and_granularity_by_uuid(
- struct rrdengine_instance *ctx, uuid_t *uuid,
- time_t first_time_s, time_t last_time_s,
- time_t update_every_s, time_t now_s)
-{
- if(unlikely(last_time_s > now_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
- "fixing last time to now",
- first_time_s, last_time_s, now_s);
- last_time_s = now_s;
- }
-
- if (unlikely(first_time_s > last_time_s)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
- "fixing first time to last time",
- first_time_s, last_time_s, now_s);
-
- first_time_s = last_time_s;
- }
-
- if (unlikely(first_time_s == 0 || last_time_s == 0)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
- "using them as-is",
- first_time_s, last_time_s, now_s);
- }
-
- bool added = false;
- METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx);
- if (!metric) {
- MRG_ENTRY entry = {
- .section = (Word_t) ctx,
- .first_time_s = first_time_s,
- .last_time_s = last_time_s,
- .latest_update_every_s = (uint32_t) update_every_s
- };
- uuid_copy(entry.uuid, *uuid);
- metric = mrg_metric_add_and_acquire(main_mrg, entry, &added);
- }
-
- if (likely(!added))
- mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s);
-
- mrg_metric_release(main_mrg, metric);
-}
-
static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
{
worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB);
@@ -60,12 +9,12 @@ static void after_extent_write_journalfile_v1_io(uv_fs_t* req)
struct generic_io_descriptor *io_descr = &wal->io_descr;
struct rrdengine_instance *ctx = io_descr->ctx;
- debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
if (req->result < 0) {
ctx_io_error(ctx);
- error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
+ netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
} else {
- debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
+ netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
}
uv_fs_req_cleanup(req);
@@ -92,10 +41,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin
io_descr->buf = wal->buf;
io_descr->bytes = wal->buf_size;
- netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ spinlock_lock(&journalfile->unsafe.spinlock);
io_descr->pos = journalfile->unsafe.pos;
journalfile->unsafe.pos += wal->buf_size;
- netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ spinlock_unlock(&journalfile->unsafe.spinlock);
io_descr->req.data = wal;
io_descr->data = journalfile;
@@ -122,10 +71,129 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str
datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno);
}
+// ----------------------------------------------------------------------------
+
+struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) {
+ struct rrdengine_datafile *datafile = NULL;
+
+ rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock);
+
+ Pvoid_t *PValue = NULL;
+
+ if(unlikely(!s->init)) {
+ s->init = true;
+ s->last = s->wanted_start_time_s;
+
+ PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ s->last = 0;
+ PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue)
+ s->last = s->wanted_start_time_s;
+ }
+ }
+
+ while(1) {
+ if (likely(!PValue)) {
+ PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0);
+ if (unlikely(PValue == PJERR))
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if(!PValue) {
+ // cannot find anything after that point
+ datafile = NULL;
+ break;
+ }
+ }
+
+ datafile = *PValue;
+ TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s,
+ datafile->journalfile->v2.last_time_s,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+
+ if(rc == PAGE_IS_IN_RANGE) {
+ // this is good to return
+ break;
+ }
+ else if(rc == PAGE_IS_IN_THE_PAST) {
+ // continue to get the next
+ datafile = NULL;
+ PValue = NULL;
+ continue;
+ }
+ else /* PAGE_IS_IN_THE_FUTURE */ {
+ // we finished - no more datafiles
+ datafile = NULL;
+ PValue = NULL;
+ break;
+ }
+ }
+
+ if(datafile)
+ s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL,
+ s->wanted_start_time_s,
+ s->wanted_end_time_s);
+ else
+ s->j2_header_acquired = NULL;
+
+ rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock);
+
+ return datafile;
+}
+
+static void njfv2idx_add(struct rrdengine_datafile *datafile) {
+ internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+ datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s;
+
+ do {
+ internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed");
+
+ Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: NJFV2IDX corrupted judy array");
+
+ if (unlikely(*PValue)) {
+ // already there
+ datafile->journalfile->njfv2idx.indexed_as++;
+ }
+ else {
+ *PValue = datafile;
+ break;
+ }
+ } while(0);
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+static void njfv2idx_remove(struct rrdengine_datafile *datafile) {
+ internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed");
+
+ rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock);
+
+ int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0);
+ (void)rc;
+ internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry");
+
+ datafile->journalfile->njfv2idx.indexed_as = 0;
+
+ rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock);
+}
+
+// ----------------------------------------------------------------------------
+
static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) {
struct journal_v2_header *j2_header = NULL;
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!journalfile->mmap.data) {
journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0);
@@ -136,9 +204,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
journalfile->mmap.data = NULL;
journalfile->mmap.size = 0;
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -147,12 +215,21 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size);
madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size);
- madvise_random(journalfile->mmap.data, journalfile->mmap.size);
- madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED;
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ JOURNALFILE_FLAGS flags = journalfile->v2.flags;
+ spinlock_unlock(&journalfile->v2.spinlock);
+
+ if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) {
+ // we need the entire metrics directory into memory to process it
+ madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory);
+ }
+ else {
+ // let the kernel know that we don't want read-ahead on this file
+ madvise_random(journalfile->mmap.data, journalfile->mmap.size);
+ // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size);
+ }
}
}
@@ -163,7 +240,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin
*data_size = journalfile->mmap.size;
}
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return j2_header;
}
@@ -173,20 +250,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if(!have_locks) {
if(!wait) {
- if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock))
+ if (!spinlock_trylock(&journalfile->mmap.spinlock))
return false;
}
else
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
if(!wait) {
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) {
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ if(!spinlock_trylock(&journalfile->v2.spinlock)) {
+ spinlock_unlock(&journalfile->mmap.spinlock);
return false;
}
}
else
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
}
if(!journalfile->v2.refcount) {
@@ -194,7 +271,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
if (munmap(journalfile->mmap.data, journalfile->mmap.size)) {
char path[RRDENG_PATH_MAX];
journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path));
- error("DBENGINE: failed to unmap index file '%s'", path);
+ netdata_log_error("DBENGINE: failed to unmap index file '%s'", path);
internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path);
ctx_fs_error(journalfile->datafile->ctx);
}
@@ -209,8 +286,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo
}
if(!have_locks) {
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
}
return unmounted;
@@ -230,7 +307,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) {
struct rrdengine_journalfile *journalfile = datafile->journalfile;
- if(!netdata_spinlock_trylock(&journalfile->v2.spinlock))
+ if(!spinlock_trylock(&journalfile->v2.spinlock))
continue;
bool unmount = false;
@@ -244,7 +321,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
// 2 minutes have passed since last use
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if (unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, false);
@@ -254,7 +331,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) {
}
struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED);
@@ -276,7 +353,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(do_we_need_it)
return journalfile_v2_mounted_data_get(journalfile, data_size);
@@ -285,7 +362,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
}
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data");
internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile");
@@ -300,7 +377,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION)
unmount = true;
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
if(unmount)
journalfile_v2_mounted_data_unmount(journalfile, false, true);
@@ -308,25 +385,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) {
bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
return has_data;
}
size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
size_t data_size = journalfile->mmap.size;
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
return data_size;
}
void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) {
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd");
internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data");
@@ -341,22 +418,27 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd,
struct journal_v2_header *j2_header = journalfile->mmap.data;
journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC);
+ journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list);
journalfile_v2_mounted_data_unmount(journalfile, true, true);
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
+
+ njfv2idx_add(journalfile->datafile);
}
static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) {
+ njfv2idx_remove(journalfile->datafile);
+
bool has_references = false;
do {
if (has_references)
sleep_usec(10 * USEC_PER_MS);
- netdata_spinlock_lock(&journalfile->mmap.spinlock);
- netdata_spinlock_lock(&journalfile->v2.spinlock);
+ spinlock_lock(&journalfile->mmap.spinlock);
+ spinlock_lock(&journalfile->v2.spinlock);
if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) {
if(journalfile->mmap.fd != -1)
@@ -374,8 +456,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *
internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap...");
}
- netdata_spinlock_unlock(&journalfile->v2.spinlock);
- netdata_spinlock_unlock(&journalfile->mmap.spinlock);
+ spinlock_unlock(&journalfile->v2.spinlock);
+ spinlock_unlock(&journalfile->mmap.spinlock);
} while(has_references);
}
@@ -384,9 +466,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi
{
struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile));
journalfile->datafile = datafile;
- netdata_spinlock_init(&journalfile->mmap.spinlock);
- netdata_spinlock_init(&journalfile->v2.spinlock);
- netdata_spinlock_init(&journalfile->unsafe.spinlock);
+ spinlock_init(&journalfile->mmap.spinlock);
+ spinlock_init(&journalfile->v2.spinlock);
+ spinlock_init(&journalfile->unsafe.spinlock);
journalfile->mmap.fd = -1;
datafile->journalfile = journalfile;
return journalfile;
@@ -401,7 +483,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file)
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(datafile->ctx);
}
uv_fs_req_cleanup(&req);
@@ -430,7 +512,7 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -454,7 +536,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
if (journalfile->file) {
ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -464,14 +546,14 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct
// This is the new journal v2 index file
ret = uv_fs_unlink(NULL, &req, path_v2, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -516,7 +598,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -548,7 +630,7 @@ static int journalfile_check_superblock(uv_file file)
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -557,7 +639,7 @@ static int journalfile_check_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
- error("DBENGINE: File has invalid superblock.");
+ netdata_log_error("DBENGINE: File has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -569,7 +651,7 @@ static int journalfile_check_superblock(uv_file file)
static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size)
{
- static BITMAP256 page_error_map;
+ static BITMAP256 page_error_map = BITMAP256_INITIALIZER;
unsigned i, count, payload_length, descr_size;
struct rrdeng_jf_store_data *jf_metric_data;
@@ -578,7 +660,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
descr_size = sizeof(*jf_metric_data->descr) * count;
payload_length = sizeof(*jf_metric_data) + descr_size;
if (payload_length > max_size) {
- error("DBENGINE: corrupted transaction payload.");
+ netdata_log_error("DBENGINE: corrupted transaction payload.");
return;
}
@@ -589,7 +671,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx,
if (page_type > PAGE_TYPE_MAX) {
if (!bitmap256_get_bit(&page_error_map, page_type)) {
- error("DBENGINE: unknown page type %d encountered.", page_type);
+ netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type);
bitmap256_set_bit(&page_error_map, page_type, 1);
}
continue;
@@ -658,36 +740,36 @@ static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, s
*id = 0;
jf_header = buf;
if (STORE_PADDING == jf_header->type) {
- debug(D_RRDENGINE, "Skipping padding.");
+ netdata_log_debug(D_RRDENGINE, "Skipping padding.");
return 0;
}
if (sizeof(*jf_header) > max_size) {
- error("DBENGINE: corrupted transaction record, skipping.");
+ netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
*id = jf_header->id;
payload_length = jf_header->payload_length;
size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
if (size_bytes > max_size) {
- error("DBENGINE: corrupted transaction record, skipping.");
+ netdata_log_error("DBENGINE: corrupted transaction record, skipping.");
return 0;
}
jf_trailer = buf + sizeof(*jf_header) + payload_length;
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
ret = crc32cmp(jf_trailer->checksum, crc);
- debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
+ netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
if (unlikely(ret)) {
- error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
+ netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
return size_bytes;
}
switch (jf_header->type) {
case STORE_DATA:
- debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
+ netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
break;
default:
- error("DBENGINE: unknown transaction type, skipping record.");
+ netdata_log_error("DBENGINE: unknown transaction type, skipping record.");
break;
}
@@ -725,7 +807,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx,
iov = uv_buf_init(buf, size_bytes);
ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto skip_file;
}
@@ -764,7 +846,7 @@ static int journalfile_check_v2_extent_list (void *data_start, size_t file_size)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list));
if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("DBENGINE: extent list CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: extent list CRC32 check: FAILED");
return 1;
}
@@ -784,7 +866,7 @@ static int journalfile_check_v2_metric_list(void *data_start, size_t file_size)
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list));
if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) {
- error("DBENGINE: metric list CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: metric list CRC32 check: FAILED");
return 1;
}
return 0;
@@ -828,19 +910,19 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
rc = crc32cmp(journal_v2_trailer->checksum, crc);
if (unlikely(rc)) {
- error("DBENGINE: file CRC32 check: FAILED");
+ netdata_log_error("DBENGINE: file CRC32 check: FAILED");
return 1;
}
rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size);
if (rc) return 1;
- rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
- if (rc) return 1;
-
if (!db_engine_journal_check)
return 0;
+ rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size);
+ if (rc) return 1;
+
// Verify complete UUID chain
struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset);
@@ -849,7 +931,7 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
unsigned entries;
unsigned total_pages = 0;
- info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
+ netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count);
for (entries = 0; entries < j2_header->metric_count; entries++) {
char uuid_str[UUID_STR_LEN];
@@ -880,16 +962,16 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size
metric++;
if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) {
- info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
+ netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified);
return 1;
}
}
if (entries != verified) {
- info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
+ netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified);
return 1;
}
- info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
+ netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages);
return 0;
}
@@ -905,15 +987,25 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
uint8_t *data_start = (uint8_t *)j2_header;
uint32_t entries = j2_header->metric_count;
+ if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK;
+ if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) {
+ journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE;
+ // needs rebuild
+ return;
+ }
+ }
+
struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset);
time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+ time_t global_first_time_s = header_start_time_s;
time_t now_s = max_acceptable_collected_time();
for (size_t i=0; i < entries; i++) {
time_t start_time_s = header_start_time_s + metric->delta_start_s;
time_t end_time_s = header_start_time_s + metric->delta_end_s;
- update_metric_retention_and_granularity_by_uuid(
- ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s);
metric++;
}
@@ -921,12 +1013,18 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st
journalfile_v2_data_release(journalfile);
usec_t ended_ut = now_monotonic_usec();
- info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
+ netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms"
, ctx->config.tier, journalfile->datafile->fileno
, (double)data_size / 1024 / 1024
, (double)entries / 1000
, ((double)(ended_ut - started_ut) / USEC_PER_MS)
);
+
+ time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);;
+ do {
+ if(old <= global_first_time_s)
+ break;
+ } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}
int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
@@ -949,13 +1047,13 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (errno == ENOENT)
return 1;
ctx_fs_error(ctx);
- error("DBENGINE: failed to open '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to open '%s'", path_v2);
return 1;
}
ret = fstat(fd, &statbuf);
if (ret) {
- error("DBENGINE: failed to get file information for '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2);
close(fd);
return 1;
}
@@ -975,7 +1073,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
return 1;
}
- info("DBENGINE: checking integrity of '%s'", path_v2);
+ netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2);
usec_t validation_start_ut = now_monotonic_usec();
int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size);
if (unlikely(rc)) {
@@ -987,7 +1085,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
error_report("File %s is invalid and it will be rebuilt", path_v2);
if (unlikely(munmap(data_start, journal_v2_file_size)))
- error("DBENGINE: failed to unmap '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return rc;
@@ -998,7 +1096,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
if (unlikely(!entries)) {
if (unlikely(munmap(data_start, journal_v2_file_size)))
- error("DBENGINE: failed to unmap '%s'", path_v2);
+ netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2);
close(fd);
return 1;
@@ -1006,7 +1104,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
usec_t finished_ut = now_monotonic_usec();
- info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
+ netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, "
"mmap: %0.2f ms, validate: %0.2f ms"
, path_v2
, (double)journal_v2_file_size / 1024 / 1024
@@ -1016,6 +1114,9 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal
);
// Initialize the journal file to be able to access the data
+
+ if (!db_engine_journal_check)
+ journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK;
journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size);
ctx_current_disk_space_increase(ctx, journal_v2_file_size);
@@ -1179,7 +1280,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
journalfile_v2_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
+ netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu",
path,
number_of_extents,
number_of_metrics,
@@ -1350,7 +1451,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS);
- info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
+ netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size);
// msync(data_start, total_file_size, MS_SYNC);
journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size);
@@ -1361,7 +1462,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
return;
}
else {
- info("DBENGINE: failed to build index '%s', file will be skipped", path);
+ netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path);
j2_header.data = NULL;
j2_header.magic = JOURVAL_V2_SKIP_MAGIC;
memcpy(data_start, &j2_header, sizeof(j2_header));
@@ -1378,7 +1479,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno
if (ret < 0) {
ctx_current_disk_space_increase(ctx, total_file_size);
ctx_fs_error(ctx);
- error("DBENGINE: failed to resize file '%s'", path);
+ netdata_log_error("DBENGINE: failed to resize file '%s'", path);
}
else
ctx_current_disk_space_increase(ctx, resize_file_to);
@@ -1428,19 +1529,19 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
ret = journalfile_check_superblock(file);
if (ret) {
- info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
+ netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path);
error = ret;
goto cleanup;
}
ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb));
- info("DBENGINE: loading journal file '%s'", path);
+ netdata_log_info("DBENGINE: loading journal file '%s'", path);
max_id = journalfile_iterate_transactions(ctx, journalfile);
__atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED);
- info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
+ netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size);
bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno);
if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) {
@@ -1459,7 +1560,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil
cleanup:
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h
index f6be6bcd9..5cdf72b9d 100644
--- a/database/engine/journalfile.h
+++ b/database/engine/journalfile.h
@@ -21,6 +21,7 @@ typedef enum __attribute__ ((__packed__)) {
JOURNALFILE_FLAG_IS_AVAILABLE = (1 << 0),
JOURNALFILE_FLAG_IS_MOUNTED = (1 << 1),
JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION = (1 << 2),
+ JOURNALFILE_FLAG_METRIC_CRC_CHECK = (1 << 3),
} JOURNALFILE_FLAGS;
/* only one event loop is supported for now */
@@ -39,9 +40,14 @@ struct rrdengine_journalfile {
time_t first_time_s;
time_t last_time_s;
time_t not_needed_since_s;
+ uint32_t size_of_directory;
} v2;
struct {
+ Word_t indexed_as;
+ } njfv2idx;
+
+ struct {
SPINLOCK spinlock;
uint64_t pos;
} unsafe;
@@ -51,9 +57,9 @@ struct rrdengine_journalfile {
};
static inline uint64_t journalfile_current_size(struct rrdengine_journalfile *journalfile) {
- netdata_spinlock_lock(&journalfile->unsafe.spinlock);
+ spinlock_lock(&journalfile->unsafe.spinlock);
uint64_t size = journalfile->unsafe.pos;
- netdata_spinlock_unlock(&journalfile->unsafe.spinlock);
+ spinlock_unlock(&journalfile->unsafe.spinlock);
return size;
}
@@ -157,4 +163,15 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi
void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile);
void journalfile_v2_data_unmount_cleanup(time_t now_s);
+typedef struct {
+ bool init;
+ Word_t last;
+ time_t wanted_start_time_s;
+ time_t wanted_end_time_s;
+ struct rrdengine_instance *ctx;
+ struct journal_v2_header *j2_header_acquired;
+} NJFV2IDX_FIND_STATE;
+
+struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s);
+
#endif /* NETDATA_JOURNALFILE_H */ \ No newline at end of file
diff --git a/database/engine/metric.c b/database/engine/metric.c
index 6b65df9bb..1370f9d7a 100644
--- a/database/engine/metric.c
+++ b/database/engine/metric.c
@@ -16,6 +16,7 @@ struct metric {
time_t latest_time_s_hot; // latest time of the currently collected page
uint32_t latest_update_every_s; //
pid_t writer;
+ uint8_t partition;
METRIC_FLAGS flags;
REFCOUNT refcount;
SPINLOCK spinlock; // protects all variable members
@@ -27,103 +28,98 @@ struct metric {
static struct aral_statistics mrg_aral_statistics;
struct mrg {
- ARAL *aral[MRG_PARTITIONS];
+ size_t partitions;
- struct pgc_index {
- netdata_rwlock_t rwlock;
- Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
- } index[MRG_PARTITIONS];
+ struct mrg_partition {
+ ARAL *aral; // not protected by our spinlock - it has its own
- struct mrg_statistics stats;
+ RW_SPINLOCK rw_spinlock;
+ Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers)
- size_t entries_per_partition[MRG_PARTITIONS];
+ struct mrg_statistics stats;
+ } index[];
};
-static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.additions_duplicate++;
}
static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
- __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
-
- __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+ mrg->index[partition].stats.entries++;
+ mrg->index[partition].stats.additions++;
+ mrg->index[partition].stats.size += sizeof(METRIC);
}
static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
- __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED);
-
- __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
+ mrg->index[partition].stats.entries--;
+ mrg->index[partition].stats.size -= sizeof(METRIC);
+ mrg->index[partition].stats.deletions++;
}
-static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED);
}
-static inline void MRG_STATS_DELETE_MISS(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED);
+static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) {
+ mrg->index[partition].stats.delete_misses++;
}
-static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
- netdata_rwlock_rdlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
- netdata_rwlock_unlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
- netdata_rwlock_wrlock(&mrg->index[partition].rwlock);
-}
-static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
- netdata_rwlock_unlock(&mrg->index[partition].rwlock);
-}
+#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock)
+#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock)
+
+#define metric_lock(metric) spinlock_lock(&(metric)->spinlock)
+#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock)
-static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) {
+static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) {
if(mem_after_judyl > mem_before_judyl)
- __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
else if(mem_after_judyl < mem_before_judyl)
- __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
}
-static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) {
- __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) {
+ __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
}
-static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) {
- __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
+static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) {
+ __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
}
static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
uint8_t *u = (uint8_t *)uuid;
- return u[UUID_SZ - 1] % MRG_PARTITIONS;
+ size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)];
+ return *n % mrg->partitions;
}
static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
+ size_t partition = metric->partition;
+
bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0);
if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
metric->flags |= METRIC_FLAG_HAS_RETENTION;
- __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
}
else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
- __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED);
}
return has_retention;
}
static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
+ size_t partition = metric->partition;
REFCOUNT refcount;
if(!having_spinlock)
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount < 0))
fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
@@ -134,21 +130,22 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b
metric_has_retention_unsafe(mrg, metric);
if(!having_spinlock)
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(refcount == 1)
- __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
return refcount;
}
static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
bool ret = true;
+ size_t partition = metric->partition;
REFCOUNT refcount;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(metric->refcount <= 0))
fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
@@ -158,20 +155,20 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME
if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
ret = false;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
if(unlikely(!refcount))
- __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED);
return ret;
}
-static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
+static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
size_t partition = uuid_partition(mrg, &entry->uuid);
- METRIC *allocation = aral_mallocz(mrg->aral[partition]);
+ METRIC *allocation = aral_mallocz(mrg->index[partition].aral);
mrg_index_write_lock(mrg, partition);
@@ -182,12 +179,12 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
if(unlikely(!*sections_judy_pptr))
- mrg_stats_size_judyhs_added_uuid(mrg);
+ mrg_stats_size_judyhs_added_uuid(mrg, partition);
mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
if(unlikely(!PValue || PValue == PJERR))
fatal("DBENGINE METRIC: corrupted section JudyL array");
@@ -196,18 +193,21 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
METRIC *metric = *PValue;
metric_acquire(mrg, metric, false);
+
+ MRG_STATS_DUPLICATE_ADD(mrg, partition);
+
mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = false;
- aral_freez(mrg->aral[partition], allocation);
+ aral_freez(mrg->index[partition].aral, allocation);
- MRG_STATS_DUPLICATE_ADD(mrg);
return metric;
}
METRIC *metric = allocation;
+ // memcpy(metric->uuid, entry->uuid, sizeof(uuid_t));
uuid_copy(metric->uuid, entry->uuid);
metric->section = entry->section;
metric->first_time_s = MAX(0, entry->first_time_s);
@@ -217,21 +217,22 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
metric->writer = 0;
metric->refcount = 0;
metric->flags = 0;
- netdata_spinlock_init(&metric->spinlock);
+ metric->partition = partition;
+ spinlock_init(&metric->spinlock);
metric_acquire(mrg, metric, true); // no spinlock use required here
*PValue = metric;
+ MRG_STATS_ADDED_METRIC(mrg, partition);
+
mrg_index_write_unlock(mrg, partition);
if(ret)
*ret = true;
- MRG_STATS_ADDED_METRIC(mrg, partition);
-
return metric;
}
-static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
size_t partition = uuid_partition(mrg, uuid);
mrg_index_read_lock(mrg, partition);
@@ -239,14 +240,14 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr)) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
return NULL;
}
Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
if(unlikely(!PValue)) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_MISS(mrg);
+ MRG_STATS_SEARCH_MISS(mrg, partition);
return NULL;
}
@@ -256,38 +257,38 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
mrg_index_read_unlock(mrg, partition);
- MRG_STATS_SEARCH_HIT(mrg);
+ MRG_STATS_SEARCH_HIT(mrg, partition);
return metric;
}
-static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
- size_t partition = uuid_partition(mrg, &metric->uuid);
+static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) {
+ size_t partition = metric->partition;
size_t mem_before_judyl, mem_after_judyl;
mrg_index_write_lock(mrg, partition);
if(!metric_release_and_can_be_deleted(mrg, metric)) {
+ mrg->index[partition].stats.delete_having_retention_or_referenced++;
mrg_index_write_unlock(mrg, partition);
- __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED);
return false;
}
Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETE_MISS(mrg);
return false;
}
mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
- mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
+ mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition);
if(unlikely(!rc)) {
+ MRG_STATS_DELETE_MISS(mrg, partition);
mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETE_MISS(mrg);
return false;
}
@@ -295,14 +296,14 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
if(unlikely(!rc))
fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
- mrg_stats_size_judyhs_removed_uuid(mrg);
+ mrg_stats_size_judyhs_removed_uuid(mrg, partition);
}
- mrg_index_write_unlock(mrg, partition);
+ MRG_STATS_DELETED_METRIC(mrg, partition);
- aral_freez(mrg->aral[partition], metric);
+ mrg_index_write_unlock(mrg, partition);
- MRG_STATS_DELETED_METRIC(mrg, partition);
+ aral_freez(mrg->index[partition].aral, metric);
return true;
}
@@ -310,38 +311,34 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
// ----------------------------------------------------------------------------
// public API
-MRG *mrg_create(void) {
- MRG *mrg = callocz(1, sizeof(MRG));
+inline MRG *mrg_create(ssize_t partitions) {
+ if(partitions < 1)
+ partitions = get_netdata_cpus();
+
+ MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions);
+ mrg->partitions = partitions;
- for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
- netdata_rwlock_init(&mrg->index[i].rwlock);
+ for(size_t i = 0; i < mrg->partitions ; i++) {
+ rw_spinlock_init(&mrg->index[i].rw_spinlock);
char buf[ARAL_MAX_NAME + 1];
snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
- mrg->aral[i] = aral_create(buf,
- sizeof(METRIC),
- 0,
- 16384,
- &mrg_aral_statistics,
- NULL, NULL, false,
- false);
+ mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false);
}
- mrg->stats.size = sizeof(MRG);
-
return mrg;
}
-size_t mrg_aral_structures(void) {
+inline size_t mrg_aral_structures(void) {
return aral_structures_from_stats(&mrg_aral_statistics);
}
-size_t mrg_aral_overhead(void) {
+inline size_t mrg_aral_overhead(void) {
return aral_overhead_from_stats(&mrg_aral_statistics);
}
-void mrg_destroy(MRG *mrg __maybe_unused) {
+inline void mrg_destroy(MRG *mrg __maybe_unused) {
// no destruction possible
// we can't traverse the metrics list
@@ -351,57 +348,57 @@ void mrg_destroy(MRG *mrg __maybe_unused) {
;
}
-METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
+inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
// internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
return metric_add_and_acquire(mrg, &entry, ret);
}
-METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
+inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
return metric_get_and_acquire(mrg, uuid, section);
}
-bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
return acquired_metric_del(mrg, metric);
}
-METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
+inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
metric_acquire(mrg, metric, false);
return metric;
}
-bool mrg_metric_release(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_release(MRG *mrg, METRIC *metric) {
return metric_release_and_can_be_deleted(mrg, metric);
}
-Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
return (Word_t)metric;
}
-uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
+inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
return &metric->uuid;
}
-Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
+inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
return metric->section;
}
-bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(first_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->first_time_s = first_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
+inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0,
"DBENGINE METRIC: timestamp is negative");
internal_fatal(first_time_s > max_acceptable_collected_time(),
@@ -421,7 +418,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
if(unlikely(!first_time_s && !last_time_s && !update_every_s))
return;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
metric->first_time_s = first_time_s;
@@ -436,29 +433,29 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t
metric->latest_update_every_s = (uint32_t) update_every_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
-bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
+inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative");
bool ret = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(first_time_s > metric->first_time_s) {
metric->first_time_s = first_time_s;
ret = true;
}
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return ret;
}
-time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t first_time_s;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -470,13 +467,13 @@ time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
first_time_s = metric->first_time_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return first_time_s;
}
-void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
- netdata_spinlock_lock(&metric->spinlock);
+inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
+ metric_lock(metric);
if(unlikely(!metric->first_time_s)) {
if(metric->latest_time_s_clean)
@@ -490,16 +487,16 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f
*last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
*update_every_s = metric->latest_update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
}
-bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
if(unlikely(latest_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
// "DBENGINE METRIC: metric latest time is in the future");
@@ -513,12 +510,12 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric,
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
// returns true when metric still has retention
-bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
+inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
Word_t section = mrg_metric_section(mrg, metric);
bool do_again = false;
size_t countdown = 5;
@@ -551,7 +548,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
if (min_first_time_s == LONG_MAX)
min_first_time_s = 0;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
do_again = true;
else {
@@ -563,13 +560,13 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
ret = metric_has_retention_unsafe(mrg, metric);
}
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
} while(do_again);
return ret;
}
-bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
+inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative");
// internal_fatal(latest_time_s > max_acceptable_collected_time(),
@@ -578,204 +575,215 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t
if(unlikely(latest_time_s < 0))
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_time_s_hot = latest_time_s;
if(unlikely(!metric->first_time_s))
metric->first_time_s = latest_time_s;
metric_has_retention_unsafe(mrg, metric);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t max;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return max;
}
-bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
metric->latest_update_every_s = (uint32_t) update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
+inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative");
if(update_every_s <= 0)
return false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->latest_update_every_s)
metric->latest_update_every_s = (uint32_t) update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return true;
}
-time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
+inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
time_t update_every_s;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
update_every_s = metric->latest_update_every_s;
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return update_every_s;
}
-bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(!metric->writer) {
metric->writer = gettid();
- __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
else
- __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&metric->spinlock);
+ __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED);
+ metric_unlock(metric);
return done;
}
-bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
+inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
bool done = false;
- netdata_spinlock_lock(&metric->spinlock);
+ metric_lock(metric);
if(metric->writer) {
metric->writer = 0;
- __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
+ __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED);
done = true;
}
- netdata_spinlock_unlock(&metric->spinlock);
+ metric_unlock(metric);
return done;
}
-struct mrg_statistics mrg_get_statistics(MRG *mrg) {
- // FIXME - use atomics
- return mrg->stats;
-}
-
-// ----------------------------------------------------------------------------
-// unit test
+inline void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s)
+{
+ if(unlikely(last_time_s > now_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), "
+ "fixing last time to now",
+ first_time_s, last_time_s, now_s);
+ last_time_s = now_s;
+ }
-#ifdef MRG_STRESS_TEST
+ if (unlikely(first_time_s > last_time_s)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), "
+ "fixing first time to last time",
+ first_time_s, last_time_s, now_s);
-static void mrg_stress(MRG *mrg, size_t entries, size_t sections) {
- bool ret;
+ first_time_s = last_time_s;
+ }
- info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections);
+ if (unlikely(first_time_s == 0 || last_time_s == 0)) {
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), "
+ "using them as-is",
+ first_time_s, last_time_s, now_s);
+ }
- METRIC *array[entries][sections];
- for(size_t i = 0; i < entries ; i++) {
- MRG_ENTRY e = {
- .first_time_s = (time_t)(i + 1),
- .latest_time_s = (time_t)(i + 2),
- .latest_update_every_s = (time_t)(i + 3),
+ bool added = false;
+ METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section);
+ if (!metric) {
+ MRG_ENTRY entry = {
+ .section = section,
+ .first_time_s = first_time_s,
+ .last_time_s = last_time_s,
+ .latest_update_every_s = (uint32_t) update_every_s
};
- uuid_generate_random(e.uuid);
-
- for(size_t section = 0; section < sections ;section++) {
- e.section = section;
- array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret);
- if(!ret)
- fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section);
-
- if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section])
- fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric");
-
- if(ret)
- fatal("DBENGINE METRIC: adding the same metric twice, returns success");
-
- if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section])
- fatal("DBENGINE METRIC: cannot get back the same metric");
-
- if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0)
- fatal("DBENGINE METRIC: uuids do not match");
- }
+ // memcpy(entry.uuid, *uuid, sizeof(uuid_t));
+ uuid_copy(entry.uuid, *uuid);
+ metric = mrg_metric_add_and_acquire(mrg, entry, &added);
}
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- uuid_t uuid;
- uuid_generate_random(uuid);
-
- if(mrg_metric_get_and_acquire(mrg, &uuid, section))
- fatal("DBENGINE METRIC: found non-existing uuid");
-
- if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section])
- fatal("DBENGINE METRIC: metric id does not match");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
-
- if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2)))
- fatal("DBENGINE METRIC: cannot set first time");
- if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3)))
- fatal("DBENGINE METRIC: cannot set latest time");
- if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4)))
- fatal("DBENGINE METRIC: cannot set update every");
-
- if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2))
- fatal("DBENGINE METRIC: wrong first time returned");
- if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3))
- fatal("DBENGINE METRIC: wrong latest time returned");
- if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4))
- fatal("DBENGINE METRIC: wrong latest time returned");
- }
+ if (likely(!added))
+ mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s);
+
+ mrg_metric_release(mrg, metric);
+}
+
+inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) {
+ memset(s, 0, sizeof(struct mrg_statistics));
+
+ for(size_t i = 0; i < mrg->partitions ;i++) {
+ s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED);
+ s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED);
+ s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED);
+ s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED);
+ s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED);
+ s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED);
+ s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED);
+ s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED);
+ s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED);
+ s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED);
+ s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED);
+ s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED);
+ s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED);
+ s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED);
}
- for(size_t i = 0; i < entries ; i++) {
- for (size_t section = 0; section < sections; section++) {
- if(!mrg_metric_release_and_delete(mrg, array[i][section]))
- fatal("DBENGINE METRIC: failed to delete metric");
- }
- }
+ s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions;
}
-static void *mrg_stress_test_thread1(void *ptr) {
- MRG *mrg = ptr;
+// ----------------------------------------------------------------------------
+// unit test
- for(int i = 0; i < 5 ; i++)
- mrg_stress(mrg, 10000, 5);
+struct mrg_stress_entry {
+ uuid_t uuid;
+ time_t after;
+ time_t before;
+};
- return ptr;
-}
+struct mrg_stress {
+ MRG *mrg;
+ bool stop;
+ size_t entries;
+ struct mrg_stress_entry *array;
+ size_t updates;
+};
-static void *mrg_stress_test_thread2(void *ptr) {
- MRG *mrg = ptr;
+static void *mrg_stress(void *ptr) {
+ struct mrg_stress *t = ptr;
+ MRG *mrg = t->mrg;
- for(int i = 0; i < 10 ; i++)
- mrg_stress(mrg, 500, 50);
+ ssize_t start = 0;
+ ssize_t end = (ssize_t)t->entries;
+ ssize_t step = 1;
- return ptr;
-}
+ if(gettid() % 2) {
+ start = (ssize_t)t->entries - 1;
+ end = -1;
+ step = -1;
+ }
+
+ while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) {
+ for (ssize_t i = start; i != end; i += step) {
+ struct mrg_stress_entry *e = &t->array[i];
-static void *mrg_stress_test_thread3(void *ptr) {
- MRG *mrg = ptr;
+ time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED);
+ time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED);
- for(int i = 0; i < 50 ; i++)
- mrg_stress(mrg, 5000, 1);
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, 0x01,
+ &e->uuid,
+ after,
+ before,
+ 1,
+ before);
+
+ __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED);
+ }
+ }
return ptr;
}
-#endif
int mrg_unittest(void) {
- MRG *mrg = mrg_create();
+ MRG *mrg = mrg_create(0);
METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
bool ret;
@@ -850,54 +858,84 @@ int mrg_unittest(void) {
if(!mrg_metric_release_and_delete(mrg, m1_t1))
fatal("DBENGINE METRIC: cannot delete the second metric");
- if(mrg->stats.entries != 0)
+ struct mrg_statistics s;
+ mrg_get_statistics(mrg, &s);
+ if(s.entries != 0)
fatal("DBENGINE METRIC: invalid entries counter");
-#ifdef MRG_STRESS_TEST
- usec_t started_ut = now_monotonic_usec();
- pthread_t thread1;
- netdata_thread_create(&thread1, "TH1",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread1, mrg);
+ size_t entries = 1000000;
+ size_t threads = mrg->partitions / 3 + 1;
+ size_t tiers = 3;
+ size_t run_for_secs = 5;
+ netdata_log_info("preparing stress test of %zu entries...", entries);
+ struct mrg_stress t = {
+ .mrg = mrg,
+ .entries = entries,
+ .array = callocz(entries, sizeof(struct mrg_stress_entry)),
+ };
- pthread_t thread2;
- netdata_thread_create(&thread2, "TH2",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread2, mrg);
+ time_t now = max_acceptable_collected_time();
+ for(size_t i = 0; i < entries ;i++) {
+ uuid_generate_random(t.array[i].uuid);
+ t.array[i].after = now / 3;
+ t.array[i].before = now / 2;
+ }
+ netdata_log_info("stress test is populating MRG with 3 tiers...");
+ for(size_t i = 0; i < entries ;i++) {
+ struct mrg_stress_entry *e = &t.array[i];
+ for(size_t tier = 1; tier <= tiers ;tier++) {
+ mrg_update_metric_retention_and_granularity_by_uuid(
+ mrg, tier,
+ &e->uuid,
+ e->after,
+ e->before,
+ 1,
+ e->before);
+ }
+ }
+ netdata_log_info("stress test ready to run...");
- pthread_t thread3;
- netdata_thread_create(&thread3, "TH3",
- NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
- mrg_stress_test_thread3, mrg);
+ usec_t started_ut = now_monotonic_usec();
+ pthread_t th[threads];
+ for(size_t i = 0; i < threads ; i++) {
+ char buf[15 + 1];
+ snprintfz(buf, 15, "TH[%zu]", i);
+ netdata_thread_create(&th[i], buf,
+ NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
+ mrg_stress, &t);
+ }
- sleep_usec(5 * USEC_PER_SEC);
+ sleep_usec(run_for_secs * USEC_PER_SEC);
+ __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED);
- netdata_thread_cancel(thread1);
- netdata_thread_cancel(thread2);
- netdata_thread_cancel(thread3);
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_cancel(th[i]);
+
+ for(size_t i = 0; i < threads ; i++)
+ netdata_thread_join(th[i], NULL);
- netdata_thread_join(thread1, NULL);
- netdata_thread_join(thread2, NULL);
- netdata_thread_join(thread3, NULL);
usec_t ended_ut = now_monotonic_usec();
- info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
+ struct mrg_statistics stats;
+ mrg_get_statistics(mrg, &stats);
+
+ netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
"%zu deletions, %zu wrong deletions, "
"%zu successful searches, %zu wrong searches, "
- "%zu successful pointer validations, %zu wrong pointer validations "
"in %llu usecs",
- mrg->stats.additions, mrg->stats.additions_duplicate,
- mrg->stats.deletions, mrg->stats.delete_misses,
- mrg->stats.search_hits, mrg->stats.search_misses,
- mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses,
+ stats.additions, stats.additions_duplicate,
+ stats.deletions, stats.delete_misses,
+ stats.search_hits, stats.search_misses,
ended_ut - started_ut);
-#endif
+ netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread",
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0,
+ (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads);
mrg_destroy(mrg);
- info("DBENGINE METRIC: all tests passed!");
+ netdata_log_info("DBENGINE METRIC: all tests passed!");
return 0;
}
diff --git a/database/engine/metric.h b/database/engine/metric.h
index 82aff903a..5cb5b045e 100644
--- a/database/engine/metric.h
+++ b/database/engine/metric.h
@@ -3,7 +3,7 @@
#include "../rrd.h"
-#define MRG_PARTITIONS 10
+#define MRG_CACHE_LINE_PADDING(x) uint8_t padding##x[64]
typedef struct metric METRIC;
typedef struct mrg MRG;
@@ -17,13 +17,10 @@ typedef struct mrg_entry {
} MRG_ENTRY;
struct mrg_statistics {
- size_t entries;
- size_t entries_referenced;
- size_t entries_with_retention;
-
- size_t size; // total memory used, with indexing
+ // --- non-atomic --- under a write lock
- size_t current_references;
+ size_t entries;
+ size_t size; // total memory used, with indexing
size_t additions;
size_t additions_duplicate;
@@ -32,14 +29,28 @@ struct mrg_statistics {
size_t delete_having_retention_or_referenced;
size_t delete_misses;
+ MRG_CACHE_LINE_PADDING(0);
+
+ // --- atomic --- multiple readers / writers
+
+ size_t entries_referenced;
+
+ MRG_CACHE_LINE_PADDING(1);
+ size_t entries_with_retention;
+
+ MRG_CACHE_LINE_PADDING(2);
+ size_t current_references;
+
+ MRG_CACHE_LINE_PADDING(3);
size_t search_hits;
size_t search_misses;
+ MRG_CACHE_LINE_PADDING(4);
size_t writers;
size_t writers_conflicts;
};
-MRG *mrg_create(void);
+MRG *mrg_create(ssize_t partitions);
void mrg_destroy(MRG *mrg);
METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric);
@@ -72,8 +83,14 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric);
bool mrg_metric_set_writer(MRG *mrg, METRIC *metric);
bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric);
-struct mrg_statistics mrg_get_statistics(MRG *mrg);
+void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s);
size_t mrg_aral_structures(void);
size_t mrg_aral_overhead(void);
+
+void mrg_update_metric_retention_and_granularity_by_uuid(
+ MRG *mrg, Word_t section, uuid_t *uuid,
+ time_t first_time_s, time_t last_time_s,
+ time_t update_every_s, time_t now_s);
+
#endif // DBENGINE_METRIC_H
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 02d08a164..c608c3270 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -387,15 +387,17 @@ static size_t list_has_time_gaps(
time_t wanted_end_time_s,
size_t *pages_total,
size_t *pages_found_pass4,
- size_t *pages_pending,
+ size_t *pages_to_load_from_disk,
size_t *pages_overlapping,
time_t *optimal_end_time_s,
- bool populate_gaps
+ bool populate_gaps,
+ PDC_PAGE_STATUS *common_status
) {
// we will recalculate these, so zero them
- *pages_pending = 0;
+ *pages_to_load_from_disk = 0;
*pages_overlapping = 0;
*optimal_end_time_s = 0;
+ *common_status = 0;
bool first;
Pvoid_t *PValue;
@@ -461,6 +463,7 @@ static size_t list_has_time_gaps(
(*pages_overlapping)++;
pd->status |= PDC_PAGE_SKIP;
pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING);
+ *common_status |= pd->status;
continue;
}
@@ -480,7 +483,7 @@ static size_t list_has_time_gaps(
}
else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) {
- (*pages_pending)++;
+ (*pages_to_load_from_disk)++;
pd->status |= PDC_PAGE_DISK_PENDING;
@@ -495,6 +498,8 @@ static size_t list_has_time_gaps(
pd->status &= ~PDC_PAGE_DISK_PENDING;
pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED);
}
+
+ *common_status |= pd->status;
}
internal_fatal(pages_pass2 != pages_pass3,
@@ -505,6 +510,8 @@ static size_t list_has_time_gaps(
return gaps;
}
+// ----------------------------------------------------------------------------
+
typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data);
static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) {
uuid_t *uuid = mrg_metric_uuid(main_mrg, metric);
@@ -515,12 +522,19 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
size_t pages_found = 0;
- uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ NJFV2IDX_FIND_STATE state = {
+ .init = false,
+ .last = 0,
+ .ctx = ctx,
+ .wanted_start_time_s = wanted_start_time_s,
+ .wanted_end_time_s = wanted_end_time_s,
+ .j2_header_acquired = NULL,
+ };
+
struct rrdengine_datafile *datafile;
- for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
- struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL,
- wanted_start_time_s,
- wanted_end_time_s);
+ while((datafile = njfv2idx_find_and_acquire_j2_header(&state))) {
+ struct journal_v2_header *j2_header = state.j2_header_acquired;
+
if (unlikely(!j2_header))
continue;
@@ -595,7 +609,6 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR
journalfile_v2_data_release(datafile->journalfile);
}
- uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
return pages_found;
}
@@ -644,10 +657,13 @@ static Pvoid_t get_page_list(
METRIC *metric,
usec_t start_time_ut,
usec_t end_time_ut,
- size_t *pages_to_load,
- time_t *optimal_end_time_s
+ time_t *optimal_end_time_s,
+ size_t *pages_to_load_from_disk,
+ PDC_PAGE_STATUS *common_status
) {
*optimal_end_time_s = 0;
+ *pages_to_load_from_disk = 0;
+ *common_status = 0;
Pvoid_t JudyL_page_array = (Pvoid_t) NULL;
@@ -658,14 +674,13 @@ static Pvoid_t get_page_list(
pages_found_in_open_cache = 0,
pages_found_in_journals_v2 = 0,
pages_found_pass4 = 0,
- pages_pending = 0,
pages_overlapping = 0,
pages_total = 0;
size_t cache_gaps = 0, query_gaps = 0;
bool done_v2 = false, done_open = false;
- usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0;
+ usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0, finish_ut = 0;
// --------------------------------------------------------------
// PASS 1: Check what the main page cache has available
@@ -680,8 +695,8 @@ static Pvoid_t get_page_list(
if(pages_found_in_main_cache && !cache_gaps) {
query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
- &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
- optimal_end_time_s, false);
+ &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
+ optimal_end_time_s, false, common_status);
if (pages_total && !query_gaps)
goto we_are_done;
@@ -702,8 +717,8 @@ static Pvoid_t get_page_list(
if(pages_found_in_open_cache) {
query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
- &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
- optimal_end_time_s, false);
+ &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
+ optimal_end_time_s, false, common_status);
if (pages_total && !query_gaps)
goto we_are_done;
@@ -726,15 +741,11 @@ static Pvoid_t get_page_list(
pass4_ut = now_monotonic_usec();
query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
- &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
- optimal_end_time_s, true);
+ &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping,
+ optimal_end_time_s, true, common_status);
we_are_done:
-
- if(pages_to_load)
- *pages_to_load = pages_pending;
-
- usec_t finish_ut = now_monotonic_usec();
+ finish_ut = now_monotonic_usec();
time_delta(finish_ut, pass4_ut);
time_delta(finish_ut, pass3_ut);
time_delta(finish_ut, pass2_ut);
@@ -754,7 +765,7 @@ we_are_done:
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED);
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, pages_pending, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, *pages_to_load_from_disk, __ATOMIC_RELAXED);
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED);
return JudyL_page_array;
@@ -773,14 +784,23 @@ void rrdeng_prep_query(struct page_details_control *pdc, bool worker) {
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_QUERY);
- size_t pages_to_load = 0;
pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric,
pdc->start_time_s * USEC_PER_SEC,
pdc->end_time_s * USEC_PER_SEC,
- &pages_to_load,
- &pdc->optimal_end_time_s);
+ &pdc->optimal_end_time_s,
+ &pdc->pages_to_load_from_disk,
+ &pdc->common_status);
+
+ internal_fatal(pdc->pages_to_load_from_disk && !(pdc->common_status & PDC_PAGE_DISK_PENDING),
+ "DBENGINE: PDC reports there are %zu pages to load from disk, "
+ "but none of the pages has the PDC_PAGE_DISK_PENDING flag",
+ pdc->pages_to_load_from_disk);
+
+ internal_fatal(!pdc->pages_to_load_from_disk && (pdc->common_status & PDC_PAGE_DISK_PENDING),
+ "DBENGINE: PDC reports there are no pages to load from disk, "
+ "but one or more pages have the PDC_PAGE_DISK_PENDING flag");
- if (pages_to_load && pdc->page_list_JudyL) {
+ if (pdc->pages_to_load_from_disk && pdc->page_list_JudyL) {
pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work()
usec_t start_ut = now_monotonic_usec();
if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS))
@@ -822,7 +842,7 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) {
handle->pdc->optimal_end_time_s = handle->end_time_s;
handle->pdc->ctx = handle->ctx;
handle->pdc->refcount = 1;
- netdata_spinlock_init(&handle->pdc->refcount_spinlock);
+ spinlock_init(&handle->pdc->refcount_spinlock);
completion_init(&handle->pdc->prep_completion);
completion_init(&handle->pdc->page_completion);
@@ -1063,7 +1083,7 @@ size_t dynamic_extent_cache_size(void) {
void pgc_and_mrg_initialize(void)
{
- main_mrg = mrg_create();
+ main_mrg = mrg_create(0);
size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL;
size_t main_cache_size = (target_cache_size / 100) * 95;
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
index 42fb2f6de..7da568787 100644
--- a/database/engine/pdc.c
+++ b/database/engine/pdc.c
@@ -198,7 +198,7 @@ void extent_buffer_init(void) {
void extent_buffer_cleanup1(void) {
struct extent_buffer *item = NULL;
- if(!netdata_spinlock_trylock(&extent_buffer_globals.protected.spinlock))
+ if(!spinlock_trylock(&extent_buffer_globals.protected.spinlock))
return;
if(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) {
@@ -207,7 +207,7 @@ void extent_buffer_cleanup1(void) {
extent_buffer_globals.protected.available--;
}
- netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+ spinlock_unlock(&extent_buffer_globals.protected.spinlock);
if(item) {
size_t bytes = sizeof(struct extent_buffer) + item->bytes;
@@ -225,13 +225,13 @@ struct extent_buffer *extent_buffer_get(size_t size) {
if(size < extent_buffer_globals.max_size)
size = extent_buffer_globals.max_size;
- netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock);
+ spinlock_lock(&extent_buffer_globals.protected.spinlock);
if(likely(extent_buffer_globals.protected.available_items)) {
eb = extent_buffer_globals.protected.available_items;
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
extent_buffer_globals.protected.available--;
}
- netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+ spinlock_unlock(&extent_buffer_globals.protected.spinlock);
if(unlikely(eb && eb->bytes < size)) {
size_t bytes = sizeof(struct extent_buffer) + eb->bytes;
@@ -255,10 +255,10 @@ struct extent_buffer *extent_buffer_get(size_t size) {
void extent_buffer_release(struct extent_buffer *eb) {
if(unlikely(!eb)) return;
- netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock);
+ spinlock_lock(&extent_buffer_globals.protected.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
extent_buffer_globals.protected.available++;
- netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock);
+ spinlock_unlock(&extent_buffer_globals.protected.spinlock);
}
size_t extent_buffer_cache_size(void) {
@@ -400,20 +400,20 @@ static void pdc_destroy(PDC *pdc) {
}
void pdc_acquire(PDC *pdc) {
- netdata_spinlock_lock(&pdc->refcount_spinlock);
+ spinlock_lock(&pdc->refcount_spinlock);
if(pdc->refcount < 1)
fatal("DBENGINE: pdc is not referenced and cannot be acquired");
pdc->refcount++;
- netdata_spinlock_unlock(&pdc->refcount_spinlock);
+ spinlock_unlock(&pdc->refcount_spinlock);
}
bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) {
if(unlikely(!pdc))
return true;
- netdata_spinlock_lock(&pdc->refcount_spinlock);
+ spinlock_lock(&pdc->refcount_spinlock);
if(pdc->refcount <= 0)
fatal("DBENGINE: pdc is not referenced and cannot be released");
@@ -429,12 +429,12 @@ bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router
}
if (pdc->refcount == 0) {
- netdata_spinlock_unlock(&pdc->refcount_spinlock);
+ spinlock_unlock(&pdc->refcount_spinlock);
pdc_destroy(pdc);
return true;
}
- netdata_spinlock_unlock(&pdc->refcount_spinlock);
+ spinlock_unlock(&pdc->refcount_spinlock);
return false;
}
@@ -456,7 +456,7 @@ static struct rrdeng_cmd *epdl_get_cmd(void *epdl_ptr) {
static bool epdl_pending_add(EPDL *epdl) {
bool added_new;
- netdata_spinlock_lock(&epdl->datafile->extent_queries.spinlock);
+ spinlock_lock(&epdl->datafile->extent_queries.spinlock);
Pvoid_t *PValue = JudyLIns(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
internal_fatal(!PValue || PValue == PJERR, "DBENGINE: corrupted pending extent judy");
@@ -478,20 +478,20 @@ static bool epdl_pending_add(EPDL *epdl) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, epdl, query.prev, query.next);
*PValue = base;
- netdata_spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
+ spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
return added_new;
}
static void epdl_pending_del(EPDL *epdl) {
- netdata_spinlock_lock(&epdl->datafile->extent_queries.spinlock);
+ spinlock_lock(&epdl->datafile->extent_queries.spinlock);
if(epdl->head_to_datafile_extent_queries_pending_for_extent) {
epdl->head_to_datafile_extent_queries_pending_for_extent = false;
int rc = JudyLDel(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
(void) rc;
internal_fatal(!rc, "DBENGINE: epdl not found in pending list");
}
- netdata_spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
+ spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
}
void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list)
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c
index 7811a5eaa..ce363183d 100644
--- a/database/engine/rrdengine.c
+++ b/database/engine/rrdengine.c
@@ -351,7 +351,7 @@ static struct {
static void wal_cleanup1(void) {
WAL *wal = NULL;
- if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock))
+ if(!spinlock_trylock(&wal_globals.protected.spinlock))
return;
if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) {
@@ -360,7 +360,7 @@ static void wal_cleanup1(void) {
wal_globals.protected.available--;
}
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
if(wal) {
posix_memfree(wal->buf);
@@ -375,7 +375,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
WAL *wal = NULL;
- netdata_spinlock_lock(&wal_globals.protected.spinlock);
+ spinlock_lock(&wal_globals.protected.spinlock);
if(likely(wal_globals.protected.available_items)) {
wal = wal_globals.protected.available_items;
@@ -384,7 +384,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
}
uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED);
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
if(unlikely(!wal)) {
wal = mallocz(sizeof(WAL));
@@ -416,10 +416,10 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
void wal_release(WAL *wal) {
if(unlikely(!wal)) return;
- netdata_spinlock_lock(&wal_globals.protected.spinlock);
+ spinlock_lock(&wal_globals.protected.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
wal_globals.protected.available++;
- netdata_spinlock_unlock(&wal_globals.protected.spinlock);
+ spinlock_unlock(&wal_globals.protected.spinlock);
}
// ----------------------------------------------------------------------------
@@ -459,7 +459,7 @@ void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) {
}
void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) {
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
struct rrdeng_cmd *cmd = get_cmd_cb(data);
if(cmd) {
@@ -472,7 +472,7 @@ void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY
}
}
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
}
void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion,
@@ -489,12 +489,12 @@ void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, v
cmd->priority = priority;
cmd->dequeue_cb = dequeue_cb;
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
rrdeng_main.cmd_queue.unsafe.waiting++;
if(enqueue_cb)
enqueue_cb(cmd);
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
fatal_assert(0 == uv_async_send(&rrdeng_main.async));
}
@@ -532,7 +532,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
}
// find an opcode to execute from the queue
- netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) {
cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
if(cmd) {
@@ -559,7 +559,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) {
cmd->dequeue_cb = NULL;
}
- netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
+ spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
struct rrdeng_cmd ret;
if(cmd) {
@@ -712,9 +712,9 @@ static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __m
posix_memfree(xt_io_descr->buf);
extent_io_descriptor_release(xt_io_descr);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.flushed_to_open_running--;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running)
// we just finished a flushing on a datafile that is not the active one
@@ -733,15 +733,15 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
if (uv_fs_request->result < 0) {
ctx_io_error(ctx);
- error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
+ netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
}
journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running--;
datafile->writers.flushed_to_open_running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
rrdeng_enq_cmd(xt_io_descr->ctx,
RRDENG_OPCODE_FLUSHED_TO_OPEN,
@@ -756,12 +756,12 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
bool ret = false;
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx))
ret = true;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
return ret;
}
@@ -773,9 +773,9 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
datafile = ctx->datafiles.first->prev;
// become a writer on this datafile, to prevent it from vanishing
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
if(datafile_is_full(ctx, datafile)) {
@@ -791,7 +791,7 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
datafile = ctx->datafiles.first->prev;
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
- if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0)
+ if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0)
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL,
NULL);
@@ -801,15 +801,15 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
datafile = ctx->datafiles.first->prev;
// become a writer on this datafile, to prevent it from vanishing
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
datafile->writers.running++;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
// release the writers on the old datafile
- netdata_spinlock_lock(&old_datafile->writers.spinlock);
+ spinlock_lock(&old_datafile->writers.spinlock);
old_datafile->writers.running--;
- netdata_spinlock_unlock(&old_datafile->writers.spinlock);
+ spinlock_unlock(&old_datafile->writers.spinlock);
}
return datafile;
@@ -921,11 +921,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta
real_io_size = ALIGN_BYTES_CEILING(size_bytes);
datafile = get_datafile_to_write_extent(ctx);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
xt_io_descr->datafile = datafile;
xt_io_descr->pos = datafile->pos;
datafile->pos += real_io_size;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
xt_io_descr->bytes = size_bytes;
xt_io_descr->uv_fs_request.data = xt_io_descr;
@@ -998,12 +998,14 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc
return next_datafile;
}
-void find_uuid_first_time(
+time_t find_uuid_first_time(
struct rrdengine_instance *ctx,
struct rrdengine_datafile *datafile,
struct uuid_first_time_s *uuid_first_entry_list,
size_t count)
{
+ time_t global_first_time_s = LONG_MAX;
+
// acquire the datafile to work with it
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
@@ -1011,7 +1013,7 @@ void find_uuid_first_time(
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
if (unlikely(!datafile))
- return;
+ return global_first_time_s;
unsigned journalfile_count = 0;
size_t binary_match = 0;
@@ -1025,6 +1027,10 @@ void find_uuid_first_time(
}
time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
+
+ if(journal_start_time_s < global_first_time_s)
+ global_first_time_s = journal_start_time_s;
+
struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
struct uuid_first_time_s *uuid_original_entry;
@@ -1137,9 +1143,13 @@ void find_uuid_first_time(
without_retention,
without_metric
);
+
+ return global_first_time_s;
}
static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
+ time_t global_first_time_s = LONG_MAX;
+
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
@@ -1174,7 +1184,7 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
added++;
}
- info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
+ netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
ctx->config.tier, count, first_datafile_remaining->fileno);
journalfile_v2_data_release(journalfile);
@@ -1184,12 +1194,12 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
- find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
+ global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
- info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
+ netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
ctx->config.tier, added);
size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0;
@@ -1223,6 +1233,9 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r
"DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry",
deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier);
+ if(global_first_time_s != LONG_MAX)
+ __atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED);
+
if(worker)
worker_is_idle();
}
@@ -1243,7 +1256,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
if (!datafile_got_for_deletion) {
- info("DBENGINE: waiting for data file '%s/"
+ netdata_log_info("DBENGINE: waiting for data file '%s/"
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
"' to be available for deletion, "
"it is in use currently by %u users.",
@@ -1255,7 +1268,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
}
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED);
- info("DBENGINE: deleting data file '%s/"
+ netdata_log_info("DBENGINE: deleting data file '%s/"
DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
"'.",
ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
@@ -1277,26 +1290,26 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *
journal_file_bytes = journalfile_current_size(journal_file);
deleted_bytes = journalfile_v2_data_size_get(journal_file);
- info("DBENGINE: deleting data and journal files to maintain disk quota");
+ netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota");
ret = journalfile_destroy_unsafe(journal_file, datafile);
if (!ret) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
journalfile_v2_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
deleted_bytes += journal_file_bytes;
}
ret = destroy_data_file_unsafe(datafile);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: deleted data file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted data file \"%s\".", path);
deleted_bytes += datafile_bytes;
}
freez(journal_file);
freez(datafile);
ctx_current_disk_space_decrease(ctx, deleted_bytes);
- info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
+ netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
}
static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
@@ -1334,11 +1347,11 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
// find a datafile to work
uv_rwlock_rdlock(&ctx->datafiles.rwlock);
for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
- if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock))
+ if(!spinlock_trylock(&datafile->populate_mrg.spinlock))
continue;
if(datafile->populate_mrg.populated) {
- netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
+ spinlock_unlock(&datafile->populate_mrg.spinlock);
continue;
}
@@ -1352,7 +1365,7 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
datafile->populate_mrg.populated = true;
- netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
+ spinlock_unlock(&datafile->populate_mrg.spinlock);
} while(1);
@@ -1376,7 +1389,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
if(!logged) {
logged = true;
- info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
+ netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
__atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED),
(ctx->config.legacy) ? -1 : ctx->config.tier);
}
@@ -1444,7 +1457,7 @@ void async_cb(uv_async_t *handle)
{
uv_stop(handle->loop);
uv_update_time(handle->loop);
- debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
+ netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
#define TIMER_PERIOD_MS (1000)
@@ -1496,17 +1509,17 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb
continue;
}
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
if(!available) {
- info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
+ netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno);
datafile = datafile->next;
continue;
}
- info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
+ netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
@@ -1623,21 +1636,21 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
static bool spawned = false;
static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
- netdata_spinlock_lock(&spinlock);
+ spinlock_lock(&spinlock);
if(!spawned) {
int ret;
ret = uv_loop_init(&rrdeng_main.loop);
if (ret) {
- error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
return false;
}
rrdeng_main.loop.data = &rrdeng_main;
ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb);
if (ret) {
- error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
return false;
}
@@ -1645,7 +1658,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer);
if (ret) {
- error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
return false;
@@ -1658,7 +1671,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
spawned = true;
}
- netdata_spinlock_unlock(&spinlock);
+ spinlock_unlock(&spinlock);
return true;
}
@@ -1860,7 +1873,7 @@ void dbengine_event_loop(void* arg) {
}
/* cleanup operations of the event loop */
- info("DBENGINE: shutting down dbengine thread");
+ netdata_log_info("DBENGINE: shutting down dbengine thread");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h
index 69e412354..b5476930a 100644
--- a/database/engine/rrdengine.h
+++ b/database/engine/rrdengine.h
@@ -34,31 +34,6 @@ struct rrdeng_cmd;
#define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u"
#define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u"
-typedef struct page_details_control {
- struct rrdengine_instance *ctx;
- struct metric *metric;
-
- struct completion prep_completion;
- struct completion page_completion; // sync between the query thread and the workers
-
- Pvoid_t page_list_JudyL; // the list of page details
- unsigned completed_jobs; // the number of jobs completed last time the query thread checked
- bool workers_should_stop; // true when the query thread left and the workers should stop
- bool prep_done;
-
- SPINLOCK refcount_spinlock; // spinlock to protect refcount
- int32_t refcount; // the number of workers currently working on this request + 1 for the query thread
- size_t executed_with_gaps;
-
- time_t start_time_s;
- time_t end_time_s;
- STORAGE_PRIORITY priority;
-
- time_t optimal_end_time_s;
-} PDC;
-
-PDC *pdc_get(void);
-
typedef enum __attribute__ ((__packed__)) {
// final status for all pages
// if a page does not have one of these, it is considered unroutable
@@ -99,6 +74,34 @@ typedef enum __attribute__ ((__packed__)) {
#define PDC_PAGE_QUERY_GLOBAL_SKIP_LIST (PDC_PAGE_FAILED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_RELEASED)
+typedef struct page_details_control {
+ struct rrdengine_instance *ctx;
+ struct metric *metric;
+
+ struct completion prep_completion;
+ struct completion page_completion; // sync between the query thread and the workers
+
+ Pvoid_t page_list_JudyL; // the list of page details
+ unsigned completed_jobs; // the number of jobs completed last time the query thread checked
+ bool workers_should_stop; // true when the query thread left and the workers should stop
+ bool prep_done;
+
+ PDC_PAGE_STATUS common_status;
+ size_t pages_to_load_from_disk;
+
+ SPINLOCK refcount_spinlock; // spinlock to protect refcount
+ int32_t refcount; // the number of workers currently working on this request + 1 for the query thread
+ size_t executed_with_gaps;
+
+ time_t start_time_s;
+ time_t end_time_s;
+ STORAGE_PRIORITY priority;
+
+ time_t optimal_end_time_s;
+} PDC;
+
+PDC *pdc_get(void);
+
struct page_details {
struct {
struct rrdengine_datafile *ptr;
@@ -362,6 +365,11 @@ struct rrdengine_instance {
} datafiles;
struct {
+ RW_SPINLOCK spinlock;
+ Pvoid_t JudyL;
+ } njfv2idx;
+
+ struct {
unsigned last_fileno; // newest index of datafile and journalfile
unsigned last_flush_fileno; // newest index of datafile received data
@@ -375,6 +383,8 @@ struct rrdengine_instance {
bool migration_to_v2_running;
bool now_deleting_files;
unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file)
+
+ time_t first_time_s;
} atomic;
struct {
diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c
index ddc306ed7..49df5c814 100755
--- a/database/engine/rrdengineapi.c
+++ b/database/engine/rrdengineapi.c
@@ -247,7 +247,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri
is_1st_metric_writer = false;
char uuid[UUID_STR_LEN + 1];
uuid_unparse(*mrg_metric_uuid(main_mrg, metric), uuid);
- error("DBENGINE: metric '%s' is already collected and should not be collected twice - expect gaps on the charts", uuid);
+ netdata_log_error("DBENGINE: metric '%s' is already collected and should not be collected twice - expect gaps on the charts", uuid);
}
metric = mrg_metric_dup(main_mrg, metric);
@@ -312,7 +312,7 @@ static bool page_has_only_empty_metrics(struct rrdeng_collect_handle *handle) {
default: {
static bool logged = false;
if(!logged) {
- error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type);
+ netdata_log_error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type);
logged = true;
}
return false;
@@ -703,14 +703,14 @@ static void register_query_handle(struct rrdeng_query_handle *handle) {
handle->query_pid = gettid();
handle->started_time_s = now_realtime_sec();
- netdata_spinlock_lock(&global_query_handle_spinlock);
+ spinlock_lock(&global_query_handle_spinlock);
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next);
- netdata_spinlock_unlock(&global_query_handle_spinlock);
+ spinlock_unlock(&global_query_handle_spinlock);
}
static void unregister_query_handle(struct rrdeng_query_handle *handle) {
- netdata_spinlock_lock(&global_query_handle_spinlock);
+ spinlock_lock(&global_query_handle_spinlock);
DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next);
- netdata_spinlock_unlock(&global_query_handle_spinlock);
+ spinlock_unlock(&global_query_handle_spinlock);
}
#else
static void register_query_handle(struct rrdeng_query_handle *handle __maybe_unused) {
@@ -908,7 +908,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim
default: {
static bool logged = false;
if(!logged) {
- error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type);
+ netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type);
logged = true;
}
storage_point_empty(sp, sp.start_time_s, sp.end_time_s);
@@ -986,7 +986,7 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_
{
struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
if (unlikely(!ctx)) {
- error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
+ netdata_log_error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
return false;
}
@@ -1002,6 +1002,26 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_
return true;
}
+size_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return ctx->config.max_disk_space;
+}
+
+size_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return __atomic_load_n(&ctx->atomic.current_disk_space, __ATOMIC_RELAXED);
+}
+
+time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);
+}
+
+size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *db_instance) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
+ return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED);
+}
+
/*
* Gathers Database Engine statistics.
* Careful when modifying this function.
@@ -1062,20 +1082,20 @@ static void rrdeng_populate_mrg(struct rrdengine_instance *ctx) {
datafiles++;
uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
- size_t cpus = get_netdata_cpus() / storage_tiers;
- if(cpus > datafiles)
- cpus = datafiles;
+ ssize_t cpus = (ssize_t)get_netdata_cpus() / (ssize_t)storage_tiers;
+ if(cpus > (ssize_t)datafiles)
+ cpus = (ssize_t)datafiles;
- if(cpus < 1)
- cpus = 1;
+ if(cpus > (ssize_t)libuv_worker_threads)
+ cpus = (ssize_t)libuv_worker_threads;
- if(cpus > (size_t)libuv_worker_threads)
- cpus = (size_t)libuv_worker_threads;
+ if(cpus >= (ssize_t)get_netdata_cpus() / 2)
+ cpus = (ssize_t)(get_netdata_cpus() / 2 - 1);
- if(cpus > MRG_PARTITIONS)
- cpus = MRG_PARTITIONS;
+ if(cpus < 1)
+ cpus = 1;
- info("DBENGINE: populating retention to MRG from %zu journal files of tier %d, using %zu threads...", datafiles, ctx->config.tier, cpus);
+ netdata_log_info("DBENGINE: populating retention to MRG from %zu journal files of tier %d, using %zd threads...", datafiles, ctx->config.tier, cpus);
if(datafiles > 2) {
struct rrdengine_datafile *datafile;
@@ -1116,7 +1136,7 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) {
ctx->loading.populate_mrg.array = NULL;
ctx->loading.populate_mrg.size = 0;
- info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
+ netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier);
}
bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) {
@@ -1140,7 +1160,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
/* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
if (rrdeng_reserved_file_descriptors > max_open_files) {
- error(
+ netdata_log_error(
"Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
(unsigned)rrdeng_reserved_file_descriptors,
(unsigned)max_open_files);
@@ -1172,6 +1192,9 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path,
ctx->atomic.transaction_id = 1;
ctx->quiesce.enabled = false;
+ rw_spinlock_init(&ctx->njfv2idx.spinlock);
+ ctx->atomic.first_time_s = LONG_MAX;
+
if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) {
// success - we run this ctx too
rrdeng_populate_mrg(ctx);
@@ -1208,16 +1231,16 @@ int rrdeng_exit(struct rrdengine_instance *ctx) {
bool logged = false;
while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) {
if(!logged) {
- info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ netdata_log_info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
}
- info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ netdata_log_info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
- info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
+ netdata_log_info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier);
struct completion completion = {};
completion_init(&completion);
rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL);
diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h
index 514954af7..12f1becd1 100644
--- a/database/engine/rrdengineapi.h
+++ b/database/engine/rrdengineapi.h
@@ -222,4 +222,7 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx);
size_t rrdeng_collectors_running(struct rrdengine_instance *ctx);
bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance);
+size_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance);
+size_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance);
+
#endif /* NETDATA_RRDENGINEAPI_H */
diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c
index 984a591e8..dc581d98d 100644
--- a/database/engine/rrdenginelib.c
+++ b/database/engine/rrdenginelib.c
@@ -14,12 +14,12 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size)
fatal_assert(req.result == 0);
s = req.ptr;
if (!(s->st_mode & S_IFREG)) {
- error("Not a regular file.\n");
+ netdata_log_error("Not a regular file.\n");
uv_fs_req_cleanup(&req);
return UV_EINVAL;
}
if (s->st_size < min_size) {
- error("File length is too short.\n");
+ netdata_log_error("File length is too short.\n");
uv_fs_req_cleanup(&req);
return UV_EINVAL;
}
@@ -56,16 +56,16 @@ int open_file_for_io(char *path, int flags, uv_file *file, int direct)
fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL);
if (fd < 0) {
if ((direct) && (UV_EINVAL == fd)) {
- error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
+ netdata_log_error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path);
} else {
- error("Failed to open file \"%s\".", path);
+ netdata_log_error("Failed to open file \"%s\".", path);
--direct; /* break the loop */
}
} else {
fatal_assert(req.result >= 0);
*file = req.result;
#ifdef __APPLE__
- info("Disabling OS X caching for file \"%s\".", path);
+ netdata_log_info("Disabling OS X caching for file \"%s\".", path);
fcntl(fd, F_NOCACHE, 1);
#endif
--direct; /* break the loop */
@@ -90,7 +90,7 @@ int is_legacy_child(const char *machine_guid)
snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid);
int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL);
if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) {
- //info("Found legacy engine folder \"%s\"", dbengine_file);
+ //netdata_log_info("Found legacy engine folder \"%s\"", dbengine_file);
return 1;
}
}
@@ -107,7 +107,7 @@ int count_legacy_children(char *dbfiles_path)
ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL);
if (ret < 0) {
uv_fs_req_cleanup(&req);
- error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
+ netdata_log_error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret));
return ret;
}
@@ -134,7 +134,7 @@ int compute_multidb_diskspace()
fclose(fp);
if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) {
errno = 0;
- error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file);
+ netdata_log_error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file);
computed_multidb_disk_quota_mb = -1;
}
}
@@ -143,15 +143,15 @@ int compute_multidb_diskspace()
int rc = count_legacy_children(netdata_configured_cache_dir);
if (likely(rc >= 0)) {
computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb;
- info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb);
+ netdata_log_info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb);
fp = fopen(multidb_disk_space_file, "w");
if (likely(fp)) {
fprintf(fp, "%d", computed_multidb_disk_quota_mb);
- info("Created file '%s' to store the computed value", multidb_disk_space_file);
+ netdata_log_info("Created file '%s' to store the computed value", multidb_disk_space_file);
fclose(fp);
} else
- error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
+ netdata_log_error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file);
}
else
computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb;
diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h
index ca8eacae4..831e48531 100644
--- a/database/engine/rrdenginelib.h
+++ b/database/engine/rrdenginelib.h
@@ -53,7 +53,7 @@ static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val)
*x |= 1U << pos;
break;
default:
- error("modify_bit() called with invalid argument.");
+ netdata_log_error("modify_bit() called with invalid argument.");
break;
}
}