diff options
Diffstat (limited to 'database/engine')
-rw-r--r-- | database/engine/cache.c | 68 | ||||
-rw-r--r-- | database/engine/cache.h | 2 | ||||
-rw-r--r-- | database/engine/datafile.c | 117 | ||||
-rw-r--r-- | database/engine/datafile.h | 6 | ||||
-rw-r--r-- | database/engine/journalfile.c | 379 | ||||
-rw-r--r-- | database/engine/journalfile.h | 21 | ||||
-rw-r--r-- | database/engine/metric.c | 578 | ||||
-rw-r--r-- | database/engine/metric.h | 35 | ||||
-rw-r--r-- | database/engine/pagecache.c | 84 | ||||
-rw-r--r-- | database/engine/pdc.c | 30 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 123 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 60 | ||||
-rwxr-xr-x | database/engine/rrdengineapi.c | 69 | ||||
-rw-r--r-- | database/engine/rrdengineapi.h | 3 | ||||
-rw-r--r-- | database/engine/rrdenginelib.c | 22 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 2 |
16 files changed, 923 insertions, 676 deletions
diff --git a/database/engine/cache.c b/database/engine/cache.c index bc3ba6b6..7a9ccf8d 100644 --- a/database/engine/cache.c +++ b/database/engine/cache.c @@ -112,8 +112,9 @@ struct pgc { PGC_CACHE_LINE_PADDING(0); struct pgc_index { - netdata_rwlock_t rwlock; + RW_SPINLOCK rw_spinlock; Pvoid_t sections_judy; + PGC_CACHE_LINE_PADDING(0); } *index; PGC_CACHE_LINE_PADDING(1); @@ -222,43 +223,40 @@ static inline size_t pgc_indexing_partition(PGC *cache, Word_t metric_id) { } static inline void pgc_index_read_lock(PGC *cache, size_t partition) { - netdata_rwlock_rdlock(&cache->index[partition].rwlock); + rw_spinlock_read_lock(&cache->index[partition].rw_spinlock); } static inline void pgc_index_read_unlock(PGC *cache, size_t partition) { - netdata_rwlock_unlock(&cache->index[partition].rwlock); + rw_spinlock_read_unlock(&cache->index[partition].rw_spinlock); } -//static inline bool pgc_index_write_trylock(PGC *cache, size_t partition) { -// return !netdata_rwlock_trywrlock(&cache->index[partition].rwlock); -//} static inline void pgc_index_write_lock(PGC *cache, size_t partition) { - netdata_rwlock_wrlock(&cache->index[partition].rwlock); + rw_spinlock_write_lock(&cache->index[partition].rw_spinlock); } static inline void pgc_index_write_unlock(PGC *cache, size_t partition) { - netdata_rwlock_unlock(&cache->index[partition].rwlock); + rw_spinlock_write_unlock(&cache->index[partition].rw_spinlock); } static inline bool pgc_ll_trylock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - return netdata_spinlock_trylock(&ll->spinlock); + return spinlock_trylock(&ll->spinlock); } static inline void pgc_ll_lock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - netdata_spinlock_lock(&ll->spinlock); + spinlock_lock(&ll->spinlock); } static inline void pgc_ll_unlock(PGC *cache __maybe_unused, struct pgc_linked_list *ll) { - netdata_spinlock_unlock(&ll->spinlock); + spinlock_unlock(&ll->spinlock); } static inline bool page_transition_trylock(PGC *cache __maybe_unused, PGC_PAGE *page) { - return netdata_spinlock_trylock(&page->transition_spinlock); + return spinlock_trylock(&page->transition_spinlock); } static inline void page_transition_lock(PGC *cache __maybe_unused, PGC_PAGE *page) { - netdata_spinlock_lock(&page->transition_spinlock); + spinlock_lock(&page->transition_spinlock); } static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *page) { - netdata_spinlock_unlock(&page->transition_spinlock); + spinlock_unlock(&page->transition_spinlock); } // ---------------------------------------------------------------------------- @@ -267,9 +265,9 @@ static inline void page_transition_unlock(PGC *cache __maybe_unused, PGC_PAGE *p static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) { if(size_to_evict) - netdata_spinlock_lock(&cache->usage.spinlock); + spinlock_lock(&cache->usage.spinlock); - else if(!netdata_spinlock_trylock(&cache->usage.spinlock)) + else if(!spinlock_trylock(&cache->usage.spinlock)) return __atomic_load_n(&cache->usage.per1000, __ATOMIC_RELAXED); size_t current_cache_size; @@ -319,7 +317,7 @@ static inline size_t cache_usage_per1000(PGC *cache, size_t *size_to_evict) { __atomic_store_n(&cache->stats.wanted_cache_size, wanted_cache_size, __ATOMIC_RELAXED); __atomic_store_n(&cache->stats.current_cache_size, current_cache_size, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&cache->usage.spinlock); + spinlock_unlock(&cache->usage.spinlock); if(size_to_evict) { size_t target = (size_t)((unsigned long long)wanted_cache_size * (unsigned long long)cache->config.evict_low_threshold_per1000 / 1000ULL); @@ -422,7 +420,7 @@ static void pgc_section_pages_static_aral_init(void) { static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; if(unlikely(!pgc_section_pages_aral)) { - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); // we have to check again if(!pgc_section_pages_aral) @@ -433,7 +431,7 @@ static void pgc_section_pages_static_aral_init(void) { 65536, NULL, NULL, NULL, false, false); - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); } } @@ -1255,7 +1253,7 @@ static PGC_PAGE *page_add(PGC *cache, PGC_ENTRY *entry, bool *added) { page->update_every_s = entry->update_every_s, page->data = entry->data; page->assumed_size = page_assumed_size(cache, entry->size); - netdata_spinlock_init(&page->transition_spinlock); + spinlock_init(&page->transition_spinlock); page->link.prev = NULL; page->link.next = NULL; @@ -1378,7 +1376,7 @@ static PGC_PAGE *page_find_and_acquire(PGC *cache, Word_t section, Word_t metric Word_t time = start_time_s; // find the previous page - page_ptr = JudyLLast(*pages_judy_pptr, &time, PJE0); + page_ptr = JudyLPrev(*pages_judy_pptr, &time, PJE0); if(unlikely(page_ptr == PJERR)) fatal("DBENGINE CACHE: corrupted page in pages judy array #2"); @@ -1779,11 +1777,11 @@ PGC *pgc_create(const char *name, cache->index = callocz(cache->config.partitions, sizeof(struct pgc_index)); for(size_t part = 0; part < cache->config.partitions ; part++) - netdata_rwlock_init(&cache->index[part].rwlock); + rw_spinlock_init(&cache->index[part].rw_spinlock); - netdata_spinlock_init(&cache->hot.spinlock); - netdata_spinlock_init(&cache->dirty.spinlock); - netdata_spinlock_init(&cache->clean.spinlock); + spinlock_init(&cache->hot.spinlock); + spinlock_init(&cache->dirty.spinlock); + spinlock_init(&cache->clean.spinlock); cache->hot.flags = PGC_PAGE_HOT; cache->hot.linked_list_in_sections_judy = true; @@ -1849,12 +1847,12 @@ void pgc_destroy(PGC *cache) { free_all_unreferenced_clean_pages(cache); if(PGC_REFERENCED_PAGES(cache)) - error("DBENGINE CACHE: there are %zu referenced cache pages - leaving the cache allocated", PGC_REFERENCED_PAGES(cache)); + netdata_log_error("DBENGINE CACHE: there are %zu referenced cache pages - leaving the cache allocated", PGC_REFERENCED_PAGES(cache)); else { pointer_destroy_index(cache); - for(size_t part = 0; part < cache->config.partitions ; part++) - netdata_rwlock_destroy(&cache->index[part].rwlock); +// for(size_t part = 0; part < cache->config.partitions ; part++) +// netdata_rwlock_destroy(&cache->index[part].rw_spinlock); #ifdef PGC_WITH_ARAL for(size_t part = 0; part < cache->config.partitions ; part++) @@ -2091,8 +2089,8 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ } struct section_pages *sp = *section_pages_pptr; - if(!netdata_spinlock_trylock(&sp->migration_to_v2_spinlock)) { - info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno); + if(!spinlock_trylock(&sp->migration_to_v2_spinlock)) { + netdata_log_info("DBENGINE: migration to journal v2 for datafile %u is postponed, another jv2 indexer is already running for this section", datafile_fileno); pgc_ll_unlock(cache, &cache->hot); return; } @@ -2205,7 +2203,7 @@ void pgc_open_cache_to_journal_v2(PGC *cache, Word_t section, unsigned datafile_ pgc_ll_lock(cache, &cache->hot); } - netdata_spinlock_unlock(&sp->migration_to_v2_spinlock); + spinlock_unlock(&sp->migration_to_v2_spinlock); pgc_ll_unlock(cache, &cache->hot); // callback @@ -2355,7 +2353,7 @@ void *unittest_stress_test_collector(void *ptr) { heartbeat_init(&hb); while(!__atomic_load_n(&pgc_uts.stop, __ATOMIC_RELAXED)) { - // info("COLLECTOR %zu: collecting metrics %zu to %zu, from %ld to %lu", id, metric_start, metric_end, start_time_t, start_time_t + pgc_uts.points_per_page); + // netdata_log_info("COLLECTOR %zu: collecting metrics %zu to %zu, from %ld to %lu", id, metric_start, metric_end, start_time_t, start_time_t + pgc_uts.points_per_page); netdata_thread_disable_cancelability(); @@ -2485,7 +2483,7 @@ void *unittest_stress_test_service(void *ptr) { } static void unittest_stress_test_save_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) { - // info("SAVE %zu pages", entries); + // netdata_log_info("SAVE %zu pages", entries); if(!pgc_uts.stop) { usec_t t = pgc_uts.time_per_flush_ut; @@ -2625,7 +2623,7 @@ void unittest_stress_test(void) { if(stats.events_flush_critical > old_stats.events_flush_critical) flushing_status = "F"; - info("PGS %5zuk +%4zuk/-%4zuk " + netdata_log_info("PGS %5zuk +%4zuk/-%4zuk " "| RF %5zuk " "| HOT %5zuk +%4zuk -%4zuk " "| DRT %s %5zuk +%4zuk -%4zuk " @@ -2651,7 +2649,7 @@ void unittest_stress_test(void) { #endif ); } - info("Waiting for threads to stop..."); + netdata_log_info("Waiting for threads to stop..."); __atomic_store_n(&pgc_uts.stop, true, __ATOMIC_RELAXED); netdata_thread_join(service_thread, NULL); diff --git a/database/engine/cache.h b/database/engine/cache.h index 65e6a613..1486fdc1 100644 --- a/database/engine/cache.h +++ b/database/engine/cache.h @@ -31,7 +31,7 @@ typedef struct pgc_entry { uint8_t *custom_data; } PGC_ENTRY; -#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[128] +#define PGC_CACHE_LINE_PADDING(x) uint8_t padding##x[64] struct pgc_queue_statistics { size_t entries; diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 8c413d8d..d5c1285b 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,11 +1,15 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock) { - uv_rwlock_wrlock(&ctx->datafiles.rwlock); + if(!having_lock) + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + if(!having_lock) + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) @@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta datafile->users.available = true; - netdata_spinlock_init(&datafile->users.spinlock); - netdata_spinlock_init(&datafile->writers.spinlock); - netdata_spinlock_init(&datafile->extent_queries.spinlock); + spinlock_init(&datafile->users.spinlock); + spinlock_init(&datafile->writers.spinlock); + spinlock_init(&datafile->extent_queries.spinlock); return datafile; } @@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { bool ret; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(df->users.available) { ret = true; @@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re else ret = false; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return ret; } void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); df->users.lockers--; df->users.lockers_by_reason[reason]--; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); } bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { bool can_be_deleted = false; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); df->users.available = false; if(!df->users.lockers) @@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers // evict any pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); pgc_open_evict_clean_pages_of_datafile(open_cache, df); - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers still // count the number of pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); usec_t time_to_scan_ut = now_monotonic_usec(); size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { time_to_scan_ut); } } - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return can_be_deleted; } @@ -171,7 +175,7 @@ int close_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -190,7 +194,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -211,21 +215,21 @@ int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -268,7 +272,7 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); - error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); ctx_io_error(ctx); } uv_fs_req_cleanup(&req); @@ -299,7 +303,7 @@ static int check_data_file_superblock(uv_file file) ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); uv_fs_req_cleanup(&req); goto error; } @@ -309,7 +313,7 @@ static int check_data_file_superblock(uv_file file) if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || superblock->tier != 1) { - error("DBENGINE: file has invalid superblock."); + netdata_log_error("DBENGINE: file has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -334,7 +338,7 @@ static int load_data_file(struct rrdengine_datafile *datafile) ctx_fs_error(ctx); return fd; } - info("DBENGINE: initializing data file \"%s\".", path); + netdata_log_info("DBENGINE: initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); if (ret) @@ -350,14 +354,14 @@ static int load_data_file(struct rrdengine_datafile *datafile) datafile->file = file; datafile->pos = file_size; - info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); return 0; error: error = ret; ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -390,11 +394,11 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (ret < 0) { fatal_assert(req.result < 0); uv_fs_req_cleanup(&req); - error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); ctx_fs_error(ctx); return ret; } - info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { @@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx) freez(datafiles); return 0; } - if (matched_files == MAX_DATAFILES) { - error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); - } + + if (matched_files == MAX_DATAFILES) + netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); - /* TODO: change this when tiering is implemented */ + ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { @@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) datafile = datafiles[i]; ret = load_data_file(datafile); - if (0 != ret) { + if (0 != ret) must_delete_pair = 1; - } + journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_load(ctx, journalfile, datafile); if (0 != ret) { @@ -432,19 +437,20 @@ static int scan_data_files(struct rrdengine_instance *ctx) close_data_file(datafile); must_delete_pair = 1; } + if (must_delete_pair) { char path[RRDENG_PATH_MAX]; - error("DBENGINE: deleting invalid data and journal file pair."); + netdata_log_error("DBENGINE: deleting invalid data and journal file pair."); ret = journalfile_unlink(journalfile); if (!ret) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); } ret = unlink_data_file(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: deleted data file \"%s\".", path); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); } freez(journalfile); freez(datafile); @@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) } ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, false); } + matched_files -= failed_to_load; freez(datafiles); @@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) } /* Creates a datafile and a journalfile pair */ -int create_new_datafile_pair(struct rrdengine_instance *ctx) +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); @@ -472,14 +479,14 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx) int ret; char path[RRDENG_PATH_MAX]; - info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); datafile = datafile_alloc_and_init(ctx, 1, fileno); ret = create_data_file(datafile); if(ret) goto error_after_datafile; generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: created data file \"%s\".", path); + netdata_log_info("DBENGINE: created data file \"%s\".", path); journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_create(journalfile, datafile); @@ -487,10 +494,10 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx) goto error_after_journalfile; journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: created journal file \"%s\".", path); + netdata_log_info("DBENGINE: created journal file \"%s\".", path); ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, having_lock); ctx_last_fileno_increment(ctx); return 0; @@ -514,20 +521,20 @@ int init_data_files(struct rrdengine_instance *ctx) fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); ret = scan_data_files(ctx); if (ret < 0) { - error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); + netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); return ret; } else if (0 == ret) { - info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); ctx->atomic.last_fileno = 0; - ret = create_new_datafile_pair(ctx); + ret = create_new_datafile_pair(ctx, false); if (ret) { - error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); + netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); return ret; } } else { if (ctx->loading.create_new_datafile_pair) - create_new_datafile_pair(ctx); + create_new_datafile_pair(ctx, false); while(rrdeng_ctx_exceeded_disk_quota(ctx)) datafile_delete(ctx, ctx->datafiles.first, false, false); @@ -545,7 +552,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { if(!logged) { - info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); + netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -559,7 +566,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) size_t iterations = 100; while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) { if(!logged) { - info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); + netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -569,14 +576,14 @@ void finalize_data_files(struct rrdengine_instance *ctx) bool available = false; do { uv_rwlock_wrlock(&ctx->datafiles.rwlock); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; if(!available) { - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); if(!logged) { - info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); + netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) journalfile_close(journalfile, datafile); close_data_file(datafile); datafile_list_delete_unsafe(ctx, datafile); - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); freez(journalfile); diff --git a/database/engine/datafile.h b/database/engine/datafile.h index a08f3ae0..569f1b0a 100644 --- a/database/engine/datafile.h +++ b/database/engine/datafile.h @@ -21,7 +21,7 @@ struct rrdengine_instance; #endif #define MIN_DATAFILE_SIZE (4LU * 1024LU * 1024LU) -#define MAX_DATAFILES (65536) /* Supports up to 64TiB for now */ +#define MAX_DATAFILES (65536 * 4) /* Supports up to 64TiB for now */ #define TARGET_DATAFILES (50) typedef enum __attribute__ ((__packed__)) { @@ -74,14 +74,14 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason); bool datafile_acquire_for_deletion(struct rrdengine_datafile *df); -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock); void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile); void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen); int close_data_file(struct rrdengine_datafile *datafile); int unlink_data_file(struct rrdengine_datafile *datafile); int destroy_data_file_unsafe(struct rrdengine_datafile *datafile); int create_data_file(struct rrdengine_datafile *datafile); -int create_new_datafile_pair(struct rrdengine_instance *ctx); +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock); int init_data_files(struct rrdengine_instance *ctx); void finalize_data_files(struct rrdengine_instance *ctx); diff --git a/database/engine/journalfile.c b/database/engine/journalfile.c index 9998ee54..24d3c1c6 100644 --- a/database/engine/journalfile.c +++ b/database/engine/journalfile.c @@ -1,57 +1,6 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" - -// DBENGINE2: Helper - -static void update_metric_retention_and_granularity_by_uuid( - struct rrdengine_instance *ctx, uuid_t *uuid, - time_t first_time_s, time_t last_time_s, - time_t update_every_s, time_t now_s) -{ - if(unlikely(last_time_s > now_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " - "fixing last time to now", - first_time_s, last_time_s, now_s); - last_time_s = now_s; - } - - if (unlikely(first_time_s > last_time_s)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " - "fixing first time to last time", - first_time_s, last_time_s, now_s); - - first_time_s = last_time_s; - } - - if (unlikely(first_time_s == 0 || last_time_s == 0)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " - "using them as-is", - first_time_s, last_time_s, now_s); - } - - bool added = false; - METRIC *metric = mrg_metric_get_and_acquire(main_mrg, uuid, (Word_t) ctx); - if (!metric) { - MRG_ENTRY entry = { - .section = (Word_t) ctx, - .first_time_s = first_time_s, - .last_time_s = last_time_s, - .latest_update_every_s = (uint32_t) update_every_s - }; - uuid_copy(entry.uuid, *uuid); - metric = mrg_metric_add_and_acquire(main_mrg, entry, &added); - } - - if (likely(!added)) - mrg_metric_expand_retention(main_mrg, metric, first_time_s, last_time_s, update_every_s); - - mrg_metric_release(main_mrg, metric); -} - static void after_extent_write_journalfile_v1_io(uv_fs_t* req) { worker_is_busy(RRDENG_FLUSH_TRANSACTION_BUFFER_CB); @@ -60,12 +9,12 @@ static void after_extent_write_journalfile_v1_io(uv_fs_t* req) struct generic_io_descriptor *io_descr = &wal->io_descr; struct rrdengine_instance *ctx = io_descr->ctx; - debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); if (req->result < 0) { ctx_io_error(ctx); - error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); + netdata_log_error("DBENGINE: %s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); } else { - debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); + netdata_log_debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__); } uv_fs_req_cleanup(req); @@ -92,10 +41,10 @@ void journalfile_v1_extent_write(struct rrdengine_instance *ctx, struct rrdengin io_descr->buf = wal->buf; io_descr->bytes = wal->buf_size; - netdata_spinlock_lock(&journalfile->unsafe.spinlock); + spinlock_lock(&journalfile->unsafe.spinlock); io_descr->pos = journalfile->unsafe.pos; journalfile->unsafe.pos += wal->buf_size; - netdata_spinlock_unlock(&journalfile->unsafe.spinlock); + spinlock_unlock(&journalfile->unsafe.spinlock); io_descr->req.data = wal; io_descr->data = journalfile; @@ -122,10 +71,129 @@ void journalfile_v1_generate_path(struct rrdengine_datafile *datafile, char *str datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } +// ---------------------------------------------------------------------------- + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s) { + struct rrdengine_datafile *datafile = NULL; + + rw_spinlock_read_lock(&s->ctx->njfv2idx.spinlock); + + Pvoid_t *PValue = NULL; + + if(unlikely(!s->init)) { + s->init = true; + s->last = s->wanted_start_time_s; + + PValue = JudyLPrev(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + s->last = 0; + PValue = JudyLFirst(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) + s->last = s->wanted_start_time_s; + } + } + + while(1) { + if (likely(!PValue)) { + PValue = JudyLNext(s->ctx->njfv2idx.JudyL, &s->last, PJE0); + if (unlikely(PValue == PJERR)) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if(!PValue) { + // cannot find anything after that point + datafile = NULL; + break; + } + } + + datafile = *PValue; + TIME_RANGE_COMPARE rc = is_page_in_time_range(datafile->journalfile->v2.first_time_s, + datafile->journalfile->v2.last_time_s, + s->wanted_start_time_s, + s->wanted_end_time_s); + + if(rc == PAGE_IS_IN_RANGE) { + // this is good to return + break; + } + else if(rc == PAGE_IS_IN_THE_PAST) { + // continue to get the next + datafile = NULL; + PValue = NULL; + continue; + } + else /* PAGE_IS_IN_THE_FUTURE */ { + // we finished - no more datafiles + datafile = NULL; + PValue = NULL; + break; + } + } + + if(datafile) + s->j2_header_acquired = journalfile_v2_data_acquire(datafile->journalfile, NULL, + s->wanted_start_time_s, + s->wanted_end_time_s); + else + s->j2_header_acquired = NULL; + + rw_spinlock_read_unlock(&s->ctx->njfv2idx.spinlock); + + return datafile; +} + +static void njfv2idx_add(struct rrdengine_datafile *datafile) { + internal_fatal(datafile->journalfile->v2.last_time_s <= 0, "DBENGINE: NJFV2IDX trying to index a journal file with invalid first_time_s"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + datafile->journalfile->njfv2idx.indexed_as = datafile->journalfile->v2.last_time_s; + + do { + internal_fatal(datafile->journalfile->njfv2idx.indexed_as <= 0, "DBENGINE: NJFV2IDX journalfile is already indexed"); + + Pvoid_t *PValue = JudyLIns(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: NJFV2IDX corrupted judy array"); + + if (unlikely(*PValue)) { + // already there + datafile->journalfile->njfv2idx.indexed_as++; + } + else { + *PValue = datafile; + break; + } + } while(0); + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +static void njfv2idx_remove(struct rrdengine_datafile *datafile) { + internal_fatal(!datafile->journalfile->njfv2idx.indexed_as, "DBENGINE: NJFV2IDX journalfile to remove is not indexed"); + + rw_spinlock_write_lock(&datafile->ctx->njfv2idx.spinlock); + + int rc = JudyLDel(&datafile->ctx->njfv2idx.JudyL, datafile->journalfile->njfv2idx.indexed_as, PJE0); + (void)rc; + internal_fatal(!rc, "DBENGINE: NJFV2IDX cannot remove entry"); + + datafile->journalfile->njfv2idx.indexed_as = 0; + + rw_spinlock_write_unlock(&datafile->ctx->njfv2idx.spinlock); +} + +// ---------------------------------------------------------------------------- + static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengine_journalfile *journalfile, size_t *data_size) { struct journal_v2_header *j2_header = NULL; - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!journalfile->mmap.data) { journalfile->mmap.data = mmap(NULL, journalfile->mmap.size, PROT_READ, MAP_SHARED, journalfile->mmap.fd, 0); @@ -136,9 +204,9 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin journalfile->mmap.data = NULL; journalfile->mmap.size = 0; - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags &= ~(JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); ctx_fs_error(journalfile->datafile->ctx); } @@ -147,12 +215,21 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin madvise_dontfork(journalfile->mmap.data, journalfile->mmap.size); madvise_dontdump(journalfile->mmap.data, journalfile->mmap.size); - madvise_random(journalfile->mmap.data, journalfile->mmap.size); - madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); journalfile->v2.flags |= JOURNALFILE_FLAG_IS_AVAILABLE | JOURNALFILE_FLAG_IS_MOUNTED; - netdata_spinlock_unlock(&journalfile->v2.spinlock); + JOURNALFILE_FLAGS flags = journalfile->v2.flags; + spinlock_unlock(&journalfile->v2.spinlock); + + if(flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) { + // we need the entire metrics directory into memory to process it + madvise_willneed(journalfile->mmap.data, journalfile->v2.size_of_directory); + } + else { + // let the kernel know that we don't want read-ahead on this file + madvise_random(journalfile->mmap.data, journalfile->mmap.size); + // madvise_dontneed(journalfile->mmap.data, journalfile->mmap.size); + } } } @@ -163,7 +240,7 @@ static struct journal_v2_header *journalfile_v2_mounted_data_get(struct rrdengin *data_size = journalfile->mmap.size; } - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return j2_header; } @@ -173,20 +250,20 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if(!have_locks) { if(!wait) { - if (!netdata_spinlock_trylock(&journalfile->mmap.spinlock)) + if (!spinlock_trylock(&journalfile->mmap.spinlock)) return false; } else - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); if(!wait) { - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) { - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + if(!spinlock_trylock(&journalfile->v2.spinlock)) { + spinlock_unlock(&journalfile->mmap.spinlock); return false; } } else - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); } if(!journalfile->v2.refcount) { @@ -194,7 +271,7 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo if (munmap(journalfile->mmap.data, journalfile->mmap.size)) { char path[RRDENG_PATH_MAX]; journalfile_v2_generate_path(journalfile->datafile, path, sizeof(path)); - error("DBENGINE: failed to unmap index file '%s'", path); + netdata_log_error("DBENGINE: failed to unmap index file '%s'", path); internal_fatal(true, "DBENGINE: failed to unmap file '%s'", path); ctx_fs_error(journalfile->datafile->ctx); } @@ -209,8 +286,8 @@ static bool journalfile_v2_mounted_data_unmount(struct rrdengine_journalfile *jo } if(!have_locks) { - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } return unmounted; @@ -230,7 +307,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { for (datafile = ctx->datafiles.first; datafile; datafile = datafile->next) { struct rrdengine_journalfile *journalfile = datafile->journalfile; - if(!netdata_spinlock_trylock(&journalfile->v2.spinlock)) + if(!spinlock_trylock(&journalfile->v2.spinlock)) continue; bool unmount = false; @@ -244,7 +321,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { // 2 minutes have passed since last use unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if (unmount) journalfile_v2_mounted_data_unmount(journalfile, false, false); @@ -254,7 +331,7 @@ void journalfile_v2_data_unmount_cleanup(time_t now_s) { } struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfile *journalfile, size_t *data_size, time_t wanted_first_time_s, time_t wanted_last_time_s) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); bool is_mounted = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_MOUNTED); @@ -276,7 +353,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(do_we_need_it) return journalfile_v2_mounted_data_get(journalfile, data_size); @@ -285,7 +362,7 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi } void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(!journalfile->mmap.data, "trying to release a journalfile without data"); internal_fatal(journalfile->v2.refcount < 1, "trying to release a non-acquired journalfile"); @@ -300,7 +377,7 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { if(journalfile->v2.flags & JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION) unmount = true; } - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); if(unmount) journalfile_v2_mounted_data_unmount(journalfile, false, true); @@ -308,25 +385,25 @@ void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile) { bool journalfile_v2_data_available(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->v2.spinlock); bool has_data = (journalfile->v2.flags & JOURNALFILE_FLAG_IS_AVAILABLE); - netdata_spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); return has_data; } size_t journalfile_v2_data_size_get(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); size_t data_size = journalfile->mmap.size; - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); return data_size; } void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, void *journal_data, uint32_t journal_data_size) { - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); internal_fatal(journalfile->mmap.fd != -1, "DBENGINE JOURNALFILE: trying to re-set journal fd"); internal_fatal(journalfile->mmap.data, "DBENGINE JOURNALFILE: trying to re-set journal_data"); @@ -341,22 +418,27 @@ void journalfile_v2_data_set(struct rrdengine_journalfile *journalfile, int fd, struct journal_v2_header *j2_header = journalfile->mmap.data; journalfile->v2.first_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC); journalfile->v2.last_time_s = (time_t)(j2_header->end_time_ut / USEC_PER_SEC); + journalfile->v2.size_of_directory = j2_header->metric_offset + j2_header->metric_count * sizeof(struct journal_metric_list); journalfile_v2_mounted_data_unmount(journalfile, true, true); - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); + + njfv2idx_add(journalfile->datafile); } static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile *journalfile) { + njfv2idx_remove(journalfile->datafile); + bool has_references = false; do { if (has_references) sleep_usec(10 * USEC_PER_MS); - netdata_spinlock_lock(&journalfile->mmap.spinlock); - netdata_spinlock_lock(&journalfile->v2.spinlock); + spinlock_lock(&journalfile->mmap.spinlock); + spinlock_lock(&journalfile->v2.spinlock); if(journalfile_v2_mounted_data_unmount(journalfile, true, true)) { if(journalfile->mmap.fd != -1) @@ -374,8 +456,8 @@ static void journalfile_v2_data_unmap_permanently(struct rrdengine_journalfile * internal_error(true, "DBENGINE JOURNALFILE: waiting for journalfile to be available to unmap..."); } - netdata_spinlock_unlock(&journalfile->v2.spinlock); - netdata_spinlock_unlock(&journalfile->mmap.spinlock); + spinlock_unlock(&journalfile->v2.spinlock); + spinlock_unlock(&journalfile->mmap.spinlock); } while(has_references); } @@ -384,9 +466,9 @@ struct rrdengine_journalfile *journalfile_alloc_and_init(struct rrdengine_datafi { struct rrdengine_journalfile *journalfile = callocz(1, sizeof(struct rrdengine_journalfile)); journalfile->datafile = datafile; - netdata_spinlock_init(&journalfile->mmap.spinlock); - netdata_spinlock_init(&journalfile->v2.spinlock); - netdata_spinlock_init(&journalfile->unsafe.spinlock); + spinlock_init(&journalfile->mmap.spinlock); + spinlock_init(&journalfile->v2.spinlock); + spinlock_init(&journalfile->unsafe.spinlock); journalfile->mmap.fd = -1; datafile->journalfile = journalfile; return journalfile; @@ -401,7 +483,7 @@ static int close_uv_file(struct rrdengine_datafile *datafile, uv_file file) ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(datafile->ctx); } uv_fs_req_cleanup(&req); @@ -430,7 +512,7 @@ int journalfile_unlink(struct rrdengine_journalfile *journalfile) ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -454,7 +536,7 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct if (journalfile->file) { ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -464,14 +546,14 @@ int journalfile_destroy_unsafe(struct rrdengine_journalfile *journalfile, struct // This is the new journal v2 index file ret = uv_fs_unlink(NULL, &req, path_v2, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -516,7 +598,7 @@ int journalfile_create(struct rrdengine_journalfile *journalfile, struct rrdengi ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); - error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); ctx_io_error(ctx); } uv_fs_req_cleanup(&req); @@ -548,7 +630,7 @@ static int journalfile_check_superblock(uv_file file) ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); uv_fs_req_cleanup(&req); goto error; } @@ -557,7 +639,7 @@ static int journalfile_check_superblock(uv_file file) if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) || strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) { - error("DBENGINE: File has invalid superblock."); + netdata_log_error("DBENGINE: File has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -569,7 +651,7 @@ static int journalfile_check_superblock(uv_file file) static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, void *buf, unsigned max_size) { - static BITMAP256 page_error_map; + static BITMAP256 page_error_map = BITMAP256_INITIALIZER; unsigned i, count, payload_length, descr_size; struct rrdeng_jf_store_data *jf_metric_data; @@ -578,7 +660,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, descr_size = sizeof(*jf_metric_data->descr) * count; payload_length = sizeof(*jf_metric_data) + descr_size; if (payload_length > max_size) { - error("DBENGINE: corrupted transaction payload."); + netdata_log_error("DBENGINE: corrupted transaction payload."); return; } @@ -589,7 +671,7 @@ static void journalfile_restore_extent_metadata(struct rrdengine_instance *ctx, if (page_type > PAGE_TYPE_MAX) { if (!bitmap256_get_bit(&page_error_map, page_type)) { - error("DBENGINE: unknown page type %d encountered.", page_type); + netdata_log_error("DBENGINE: unknown page type %d encountered.", page_type); bitmap256_set_bit(&page_error_map, page_type, 1); } continue; @@ -658,36 +740,36 @@ static unsigned journalfile_replay_transaction(struct rrdengine_instance *ctx, s *id = 0; jf_header = buf; if (STORE_PADDING == jf_header->type) { - debug(D_RRDENGINE, "Skipping padding."); + netdata_log_debug(D_RRDENGINE, "Skipping padding."); return 0; } if (sizeof(*jf_header) > max_size) { - error("DBENGINE: corrupted transaction record, skipping."); + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); return 0; } *id = jf_header->id; payload_length = jf_header->payload_length; size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); if (size_bytes > max_size) { - error("DBENGINE: corrupted transaction record, skipping."); + netdata_log_error("DBENGINE: corrupted transaction record, skipping."); return 0; } jf_trailer = buf + sizeof(*jf_header) + payload_length; crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); ret = crc32cmp(jf_trailer->checksum, crc); - debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); + netdata_log_debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED"); if (unlikely(ret)) { - error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); + netdata_log_error("DBENGINE: transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id); return size_bytes; } switch (jf_header->type) { case STORE_DATA: - debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); + netdata_log_debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id); journalfile_restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length); break; default: - error("DBENGINE: unknown transaction type, skipping record."); + netdata_log_error("DBENGINE: unknown transaction type, skipping record."); break; } @@ -725,7 +807,7 @@ static uint64_t journalfile_iterate_transactions(struct rrdengine_instance *ctx, iov = uv_buf_init(buf, size_bytes); ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret)); uv_fs_req_cleanup(&req); goto skip_file; } @@ -764,7 +846,7 @@ static int journalfile_check_v2_extent_list (void *data_start, size_t file_size) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, (uint8_t *) data_start + j2_header->extent_offset, j2_header->extent_count * sizeof(struct journal_extent_list)); if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("DBENGINE: extent list CRC32 check: FAILED"); + netdata_log_error("DBENGINE: extent list CRC32 check: FAILED"); return 1; } @@ -784,7 +866,7 @@ static int journalfile_check_v2_metric_list(void *data_start, size_t file_size) crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, (uint8_t *) data_start + j2_header->metric_offset, j2_header->metric_count * sizeof(struct journal_metric_list)); if (unlikely(crc32cmp(journal_v2_trailer->checksum, crc))) { - error("DBENGINE: metric list CRC32 check: FAILED"); + netdata_log_error("DBENGINE: metric list CRC32 check: FAILED"); return 1; } return 0; @@ -828,19 +910,19 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size rc = crc32cmp(journal_v2_trailer->checksum, crc); if (unlikely(rc)) { - error("DBENGINE: file CRC32 check: FAILED"); + netdata_log_error("DBENGINE: file CRC32 check: FAILED"); return 1; } rc = journalfile_check_v2_extent_list(data_start, journal_v2_file_size); if (rc) return 1; - rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size); - if (rc) return 1; - if (!db_engine_journal_check) return 0; + rc = journalfile_check_v2_metric_list(data_start, journal_v2_file_size); + if (rc) return 1; + // Verify complete UUID chain struct journal_metric_list *metric = (void *) (data_start + j2_header->metric_offset); @@ -849,7 +931,7 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size unsigned entries; unsigned total_pages = 0; - info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count); + netdata_log_info("DBENGINE: checking %u metrics that exist in the journal", j2_header->metric_count); for (entries = 0; entries < j2_header->metric_count; entries++) { char uuid_str[UUID_STR_LEN]; @@ -880,16 +962,16 @@ static int journalfile_v2_validate(void *data_start, size_t journal_v2_file_size metric++; if ((uint32_t)((uint8_t *) metric - (uint8_t *) data_start) > (uint32_t) journal_v2_file_size) { - info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified); + netdata_log_info("DBENGINE: verification failed EOF reached -- total entries %u, verified %u", entries, verified); return 1; } } if (entries != verified) { - info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified); + netdata_log_info("DBENGINE: verification failed -- total entries %u, verified %u", entries, verified); return 1; } - info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); + netdata_log_info("DBENGINE: verification succeeded -- total entries %u, verified %u (%u total pages)", entries, verified, total_pages); return 0; } @@ -905,15 +987,25 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st uint8_t *data_start = (uint8_t *)j2_header; uint32_t entries = j2_header->metric_count; + if (journalfile->v2.flags & JOURNALFILE_FLAG_METRIC_CRC_CHECK) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_METRIC_CRC_CHECK; + if (journalfile_check_v2_metric_list(data_start, j2_header->journal_v2_file_size)) { + journalfile->v2.flags &= ~JOURNALFILE_FLAG_IS_AVAILABLE; + // needs rebuild + return; + } + } + struct journal_metric_list *metric = (struct journal_metric_list *) (data_start + j2_header->metric_offset); time_t header_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + time_t global_first_time_s = header_start_time_s; time_t now_s = max_acceptable_collected_time(); for (size_t i=0; i < entries; i++) { time_t start_time_s = header_start_time_s + metric->delta_start_s; time_t end_time_s = header_start_time_s + metric->delta_end_s; - update_metric_retention_and_granularity_by_uuid( - ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); + mrg_update_metric_retention_and_granularity_by_uuid( + main_mrg, (Word_t)ctx, &metric->uuid, start_time_s, end_time_s, (time_t) metric->update_every_s, now_s); metric++; } @@ -921,12 +1013,18 @@ void journalfile_v2_populate_retention_to_mrg(struct rrdengine_instance *ctx, st journalfile_v2_data_release(journalfile); usec_t ended_ut = now_monotonic_usec(); - info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" + netdata_log_info("DBENGINE: journal v2 of tier %d, datafile %u populated, size: %0.2f MiB, metrics: %0.2f k, %0.2f ms" , ctx->config.tier, journalfile->datafile->fileno , (double)data_size / 1024 / 1024 , (double)entries / 1000 , ((double)(ended_ut - started_ut) / USEC_PER_MS) ); + + time_t old = __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED);; + do { + if(old <= global_first_time_s) + break; + } while(!__atomic_compare_exchange_n(&ctx->atomic.first_time_s, &old, global_first_time_s, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); } int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile) @@ -949,13 +1047,13 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal if (errno == ENOENT) return 1; ctx_fs_error(ctx); - error("DBENGINE: failed to open '%s'", path_v2); + netdata_log_error("DBENGINE: failed to open '%s'", path_v2); return 1; } ret = fstat(fd, &statbuf); if (ret) { - error("DBENGINE: failed to get file information for '%s'", path_v2); + netdata_log_error("DBENGINE: failed to get file information for '%s'", path_v2); close(fd); return 1; } @@ -975,7 +1073,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal return 1; } - info("DBENGINE: checking integrity of '%s'", path_v2); + netdata_log_info("DBENGINE: checking integrity of '%s'", path_v2); usec_t validation_start_ut = now_monotonic_usec(); int rc = journalfile_v2_validate(data_start, journal_v2_file_size, journal_v1_file_size); if (unlikely(rc)) { @@ -987,7 +1085,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal error_report("File %s is invalid and it will be rebuilt", path_v2); if (unlikely(munmap(data_start, journal_v2_file_size))) - error("DBENGINE: failed to unmap '%s'", path_v2); + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); close(fd); return rc; @@ -998,7 +1096,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal if (unlikely(!entries)) { if (unlikely(munmap(data_start, journal_v2_file_size))) - error("DBENGINE: failed to unmap '%s'", path_v2); + netdata_log_error("DBENGINE: failed to unmap '%s'", path_v2); close(fd); return 1; @@ -1006,7 +1104,7 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal usec_t finished_ut = now_monotonic_usec(); - info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " + netdata_log_info("DBENGINE: journal v2 '%s' loaded, size: %0.2f MiB, metrics: %0.2f k, " "mmap: %0.2f ms, validate: %0.2f ms" , path_v2 , (double)journal_v2_file_size / 1024 / 1024 @@ -1016,6 +1114,9 @@ int journalfile_v2_load(struct rrdengine_instance *ctx, struct rrdengine_journal ); // Initialize the journal file to be able to access the data + + if (!db_engine_journal_check) + journalfile->v2.flags |= JOURNALFILE_FLAG_METRIC_CRC_CHECK; journalfile_v2_data_set(journalfile, fd, data_start, journal_v2_file_size); ctx_current_disk_space_increase(ctx, journal_v2_file_size); @@ -1179,7 +1280,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno journalfile_v2_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", + netdata_log_info("DBENGINE: indexing file '%s': extents %zu, metrics %zu, pages %zu", path, number_of_extents, number_of_metrics, @@ -1350,7 +1451,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno internal_error(true, "DBENGINE: FILE COMPLETED --------> %llu", (now_monotonic_usec() - start_loading) / USEC_PER_MS); - info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size); + netdata_log_info("DBENGINE: migrated journal file '%s', file size %zu", path, total_file_size); // msync(data_start, total_file_size, MS_SYNC); journalfile_v2_data_set(journalfile, fd_v2, data_start, total_file_size); @@ -1361,7 +1462,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno return; } else { - info("DBENGINE: failed to build index '%s', file will be skipped", path); + netdata_log_info("DBENGINE: failed to build index '%s', file will be skipped", path); j2_header.data = NULL; j2_header.magic = JOURVAL_V2_SKIP_MAGIC; memcpy(data_start, &j2_header, sizeof(j2_header)); @@ -1378,7 +1479,7 @@ void journalfile_migrate_to_v2_callback(Word_t section, unsigned datafile_fileno if (ret < 0) { ctx_current_disk_space_increase(ctx, total_file_size); ctx_fs_error(ctx); - error("DBENGINE: failed to resize file '%s'", path); + netdata_log_error("DBENGINE: failed to resize file '%s'", path); } else ctx_current_disk_space_increase(ctx, resize_file_to); @@ -1428,19 +1529,19 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil ret = journalfile_check_superblock(file); if (ret) { - info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path); + netdata_log_info("DBENGINE: invalid journal file '%s' ; superblock check failed.", path); error = ret; goto cleanup; } ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_jf_sb)); - info("DBENGINE: loading journal file '%s'", path); + netdata_log_info("DBENGINE: loading journal file '%s'", path); max_id = journalfile_iterate_transactions(ctx, journalfile); __atomic_store_n(&ctx->atomic.transaction_id, MAX(__atomic_load_n(&ctx->atomic.transaction_id, __ATOMIC_RELAXED), max_id + 1), __ATOMIC_RELAXED); - info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size); + netdata_log_info("DBENGINE: journal file '%s' loaded (size:%"PRIu64").", path, file_size); bool is_last_file = (ctx_last_fileno_get(ctx) == journalfile->datafile->fileno); if (is_last_file && journalfile->datafile->pos <= rrdeng_target_data_file_size(ctx) / 3) { @@ -1459,7 +1560,7 @@ int journalfile_load(struct rrdengine_instance *ctx, struct rrdengine_journalfil cleanup: ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); diff --git a/database/engine/journalfile.h b/database/engine/journalfile.h index f6be6bcd..5cdf72b9 100644 --- a/database/engine/journalfile.h +++ b/database/engine/journalfile.h @@ -21,6 +21,7 @@ typedef enum __attribute__ ((__packed__)) { JOURNALFILE_FLAG_IS_AVAILABLE = (1 << 0), JOURNALFILE_FLAG_IS_MOUNTED = (1 << 1), JOURNALFILE_FLAG_MOUNTED_FOR_RETENTION = (1 << 2), + JOURNALFILE_FLAG_METRIC_CRC_CHECK = (1 << 3), } JOURNALFILE_FLAGS; /* only one event loop is supported for now */ @@ -39,9 +40,14 @@ struct rrdengine_journalfile { time_t first_time_s; time_t last_time_s; time_t not_needed_since_s; + uint32_t size_of_directory; } v2; struct { + Word_t indexed_as; + } njfv2idx; + + struct { SPINLOCK spinlock; uint64_t pos; } unsafe; @@ -51,9 +57,9 @@ struct rrdengine_journalfile { }; static inline uint64_t journalfile_current_size(struct rrdengine_journalfile *journalfile) { - netdata_spinlock_lock(&journalfile->unsafe.spinlock); + spinlock_lock(&journalfile->unsafe.spinlock); uint64_t size = journalfile->unsafe.pos; - netdata_spinlock_unlock(&journalfile->unsafe.spinlock); + spinlock_unlock(&journalfile->unsafe.spinlock); return size; } @@ -157,4 +163,15 @@ struct journal_v2_header *journalfile_v2_data_acquire(struct rrdengine_journalfi void journalfile_v2_data_release(struct rrdengine_journalfile *journalfile); void journalfile_v2_data_unmount_cleanup(time_t now_s); +typedef struct { + bool init; + Word_t last; + time_t wanted_start_time_s; + time_t wanted_end_time_s; + struct rrdengine_instance *ctx; + struct journal_v2_header *j2_header_acquired; +} NJFV2IDX_FIND_STATE; + +struct rrdengine_datafile *njfv2idx_find_and_acquire_j2_header(NJFV2IDX_FIND_STATE *s); + #endif /* NETDATA_JOURNALFILE_H */
\ No newline at end of file diff --git a/database/engine/metric.c b/database/engine/metric.c index 6b65df9b..1370f9d7 100644 --- a/database/engine/metric.c +++ b/database/engine/metric.c @@ -16,6 +16,7 @@ struct metric { time_t latest_time_s_hot; // latest time of the currently collected page uint32_t latest_update_every_s; // pid_t writer; + uint8_t partition; METRIC_FLAGS flags; REFCOUNT refcount; SPINLOCK spinlock; // protects all variable members @@ -27,103 +28,98 @@ struct metric { static struct aral_statistics mrg_aral_statistics; struct mrg { - ARAL *aral[MRG_PARTITIONS]; + size_t partitions; - struct pgc_index { - netdata_rwlock_t rwlock; - Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers) - } index[MRG_PARTITIONS]; + struct mrg_partition { + ARAL *aral; // not protected by our spinlock - it has its own - struct mrg_statistics stats; + RW_SPINLOCK rw_spinlock; + Pvoid_t uuid_judy; // JudyHS: each UUID has a JudyL of sections (tiers) - size_t entries_per_partition[MRG_PARTITIONS]; + struct mrg_statistics stats; + } index[]; }; -static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg, size_t partition) { + mrg->index[partition].stats.additions_duplicate++; } static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) { - __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED); - - __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED); + mrg->index[partition].stats.entries++; + mrg->index[partition].stats.additions++; + mrg->index[partition].stats.size += sizeof(METRIC); } static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) { - __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED); - __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED); - - __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED); + mrg->index[partition].stats.entries--; + mrg->index[partition].stats.size -= sizeof(METRIC); + mrg->index[partition].stats.deletions++; } -static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_SEARCH_HIT(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.search_hits, 1, __ATOMIC_RELAXED); } -static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_SEARCH_MISS(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.search_misses, 1, __ATOMIC_RELAXED); } -static inline void MRG_STATS_DELETE_MISS(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED); +static inline void MRG_STATS_DELETE_MISS(MRG *mrg, size_t partition) { + mrg->index[partition].stats.delete_misses++; } -static inline void mrg_index_read_lock(MRG *mrg, size_t partition) { - netdata_rwlock_rdlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) { - netdata_rwlock_unlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_write_lock(MRG *mrg, size_t partition) { - netdata_rwlock_wrlock(&mrg->index[partition].rwlock); -} -static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) { - netdata_rwlock_unlock(&mrg->index[partition].rwlock); -} +#define mrg_index_read_lock(mrg, partition) rw_spinlock_read_lock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_read_unlock(mrg, partition) rw_spinlock_read_unlock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_write_lock(mrg, partition) rw_spinlock_write_lock(&(mrg)->index[partition].rw_spinlock) +#define mrg_index_write_unlock(mrg, partition) rw_spinlock_write_unlock(&(mrg)->index[partition].rw_spinlock) + +#define metric_lock(metric) spinlock_lock(&(metric)->spinlock) +#define metric_unlock(metric) spinlock_unlock(&(metric)->spinlock) -static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) { +static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl, size_t partition) { if(mem_after_judyl > mem_before_judyl) - __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED); else if(mem_after_judyl < mem_before_judyl) - __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED); } -static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) { - __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); +static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg, size_t partition) { + __atomic_add_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); } -static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) { - __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); +static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg, size_t partition) { + __atomic_sub_fetch(&mrg->index[partition].stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED); } static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) { uint8_t *u = (uint8_t *)uuid; - return u[UUID_SZ - 1] % MRG_PARTITIONS; + size_t *n = (size_t *)&u[UUID_SZ - sizeof(size_t)]; + return *n % mrg->partitions; } static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) { + size_t partition = metric->partition; + bool has_retention = (metric->first_time_s > 0 || metric->latest_time_s_clean > 0 || metric->latest_time_s_hot > 0); if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) { metric->flags |= METRIC_FLAG_HAS_RETENTION; - __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); } else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) { metric->flags &= ~METRIC_FLAG_HAS_RETENTION; - __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.entries_with_retention, 1, __ATOMIC_RELAXED); } return has_retention; } static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) { + size_t partition = metric->partition; REFCOUNT refcount; if(!having_spinlock) - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(metric->refcount < 0)) fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount); @@ -134,21 +130,22 @@ static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, b metric_has_retention_unsafe(mrg, metric); if(!having_spinlock) - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); if(refcount == 1) - __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); return refcount; } static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) { bool ret = true; + size_t partition = metric->partition; REFCOUNT refcount; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(metric->refcount <= 0)) fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount); @@ -158,20 +155,20 @@ static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, ME if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0)) ret = false; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); if(unlikely(!refcount)) - __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.entries_referenced, 1, __ATOMIC_RELAXED); - __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[partition].stats.current_references, 1, __ATOMIC_RELAXED); return ret; } -static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { +static inline METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { size_t partition = uuid_partition(mrg, &entry->uuid); - METRIC *allocation = aral_mallocz(mrg->aral[partition]); + METRIC *allocation = aral_mallocz(mrg->index[partition].aral); mrg_index_write_lock(mrg, partition); @@ -182,12 +179,12 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array"); if(unlikely(!*sections_judy_pptr)) - mrg_stats_size_judyhs_added_uuid(mrg); + mrg_stats_size_judyhs_added_uuid(mrg, partition); mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0); mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); if(unlikely(!PValue || PValue == PJERR)) fatal("DBENGINE METRIC: corrupted section JudyL array"); @@ -196,18 +193,21 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { METRIC *metric = *PValue; metric_acquire(mrg, metric, false); + + MRG_STATS_DUPLICATE_ADD(mrg, partition); + mrg_index_write_unlock(mrg, partition); if(ret) *ret = false; - aral_freez(mrg->aral[partition], allocation); + aral_freez(mrg->index[partition].aral, allocation); - MRG_STATS_DUPLICATE_ADD(mrg); return metric; } METRIC *metric = allocation; + // memcpy(metric->uuid, entry->uuid, sizeof(uuid_t)); uuid_copy(metric->uuid, entry->uuid); metric->section = entry->section; metric->first_time_s = MAX(0, entry->first_time_s); @@ -217,21 +217,22 @@ static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) { metric->writer = 0; metric->refcount = 0; metric->flags = 0; - netdata_spinlock_init(&metric->spinlock); + metric->partition = partition; + spinlock_init(&metric->spinlock); metric_acquire(mrg, metric, true); // no spinlock use required here *PValue = metric; + MRG_STATS_ADDED_METRIC(mrg, partition); + mrg_index_write_unlock(mrg, partition); if(ret) *ret = true; - MRG_STATS_ADDED_METRIC(mrg, partition); - return metric; } -static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { +static inline METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { size_t partition = uuid_partition(mrg, uuid); mrg_index_read_lock(mrg, partition); @@ -239,14 +240,14 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t)); if(unlikely(!sections_judy_pptr)) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg); + MRG_STATS_SEARCH_MISS(mrg, partition); return NULL; } Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0); if(unlikely(!PValue)) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_MISS(mrg); + MRG_STATS_SEARCH_MISS(mrg, partition); return NULL; } @@ -256,38 +257,38 @@ static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { mrg_index_read_unlock(mrg, partition); - MRG_STATS_SEARCH_HIT(mrg); + MRG_STATS_SEARCH_HIT(mrg, partition); return metric; } -static bool acquired_metric_del(MRG *mrg, METRIC *metric) { - size_t partition = uuid_partition(mrg, &metric->uuid); +static inline bool acquired_metric_del(MRG *mrg, METRIC *metric) { + size_t partition = metric->partition; size_t mem_before_judyl, mem_after_judyl; mrg_index_write_lock(mrg, partition); if(!metric_release_and_can_be_deleted(mrg, metric)) { + mrg->index[partition].stats.delete_having_retention_or_referenced++; mrg_index_write_unlock(mrg, partition); - __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED); return false; } Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t)); if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) { + MRG_STATS_DELETE_MISS(mrg, partition); mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETE_MISS(mrg); return false; } mem_before_judyl = JudyLMemUsed(*sections_judy_pptr); int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0); mem_after_judyl = JudyLMemUsed(*sections_judy_pptr); - mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl); + mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl, partition); if(unlikely(!rc)) { + MRG_STATS_DELETE_MISS(mrg, partition); mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETE_MISS(mrg); return false; } @@ -295,14 +296,14 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) { rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0); if(unlikely(!rc)) fatal("DBENGINE METRIC: cannot delete UUID from JudyHS"); - mrg_stats_size_judyhs_removed_uuid(mrg); + mrg_stats_size_judyhs_removed_uuid(mrg, partition); } - mrg_index_write_unlock(mrg, partition); + MRG_STATS_DELETED_METRIC(mrg, partition); - aral_freez(mrg->aral[partition], metric); + mrg_index_write_unlock(mrg, partition); - MRG_STATS_DELETED_METRIC(mrg, partition); + aral_freez(mrg->index[partition].aral, metric); return true; } @@ -310,38 +311,34 @@ static bool acquired_metric_del(MRG *mrg, METRIC *metric) { // ---------------------------------------------------------------------------- // public API -MRG *mrg_create(void) { - MRG *mrg = callocz(1, sizeof(MRG)); +inline MRG *mrg_create(ssize_t partitions) { + if(partitions < 1) + partitions = get_netdata_cpus(); + + MRG *mrg = callocz(1, sizeof(MRG) + sizeof(struct mrg_partition) * partitions); + mrg->partitions = partitions; - for(size_t i = 0; i < MRG_PARTITIONS ; i++) { - netdata_rwlock_init(&mrg->index[i].rwlock); + for(size_t i = 0; i < mrg->partitions ; i++) { + rw_spinlock_init(&mrg->index[i].rw_spinlock); char buf[ARAL_MAX_NAME + 1]; snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i); - mrg->aral[i] = aral_create(buf, - sizeof(METRIC), - 0, - 16384, - &mrg_aral_statistics, - NULL, NULL, false, - false); + mrg->index[i].aral = aral_create(buf, sizeof(METRIC), 0, 16384, &mrg_aral_statistics, NULL, NULL, false, false); } - mrg->stats.size = sizeof(MRG); - return mrg; } -size_t mrg_aral_structures(void) { +inline size_t mrg_aral_structures(void) { return aral_structures_from_stats(&mrg_aral_statistics); } -size_t mrg_aral_overhead(void) { +inline size_t mrg_aral_overhead(void) { return aral_overhead_from_stats(&mrg_aral_statistics); } -void mrg_destroy(MRG *mrg __maybe_unused) { +inline void mrg_destroy(MRG *mrg __maybe_unused) { // no destruction possible // we can't traverse the metrics list @@ -351,57 +348,57 @@ void mrg_destroy(MRG *mrg __maybe_unused) { ; } -METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { +inline METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) { // internal_fatal(entry.latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); return metric_add_and_acquire(mrg, &entry, ret); } -METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { +inline METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) { return metric_get_and_acquire(mrg, uuid, section); } -bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) { return acquired_metric_del(mrg, metric); } -METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { +inline METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) { metric_acquire(mrg, metric, false); return metric; } -bool mrg_metric_release(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_release(MRG *mrg, METRIC *metric) { return metric_release_and_can_be_deleted(mrg, metric); } -Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { +inline Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) { return (Word_t)metric; } -uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) { +inline uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) { return &metric->uuid; } -Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { +inline Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) { return metric->section; } -bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { +inline bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); if(unlikely(first_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->first_time_s = first_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { +inline void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) { internal_fatal(first_time_s < 0 || last_time_s < 0 || update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); internal_fatal(first_time_s > max_acceptable_collected_time(), @@ -421,7 +418,7 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t if(unlikely(!first_time_s && !last_time_s && !update_every_s)) return; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s))) metric->first_time_s = first_time_s; @@ -436,29 +433,29 @@ void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t metric->latest_update_every_s = (uint32_t) update_every_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } -bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { +inline bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) { internal_fatal(first_time_s < 0, "DBENGINE METRIC: timestamp is negative"); bool ret = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(first_time_s > metric->first_time_s) { metric->first_time_s = first_time_s; ret = true; } metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return ret; } -time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t first_time_s; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(unlikely(!metric->first_time_s)) { if(metric->latest_time_s_clean) @@ -470,13 +467,13 @@ time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) { first_time_s = metric->first_time_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return first_time_s; } -void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { - netdata_spinlock_lock(&metric->spinlock); +inline void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) { + metric_lock(metric); if(unlikely(!metric->first_time_s)) { if(metric->latest_time_s_clean) @@ -490,16 +487,16 @@ void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *f *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); *update_every_s = metric->latest_update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } -bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { +inline bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); if(unlikely(latest_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); // internal_fatal(latest_time_s > max_acceptable_collected_time(), // "DBENGINE METRIC: metric latest time is in the future"); @@ -513,12 +510,12 @@ bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, metric->first_time_s = latest_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } // returns true when metric still has retention -bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { +inline bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { Word_t section = mrg_metric_section(mrg, metric); bool do_again = false; size_t countdown = 5; @@ -551,7 +548,7 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { if (min_first_time_s == LONG_MAX) min_first_time_s = 0; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if (--countdown && !min_first_time_s && metric->latest_time_s_hot) do_again = true; else { @@ -563,13 +560,13 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) { ret = metric_has_retention_unsafe(mrg, metric); } - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); } while(do_again); return ret; } -bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { +inline bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) { internal_fatal(latest_time_s < 0, "DBENGINE METRIC: timestamp is negative"); // internal_fatal(latest_time_s > max_acceptable_collected_time(), @@ -578,204 +575,215 @@ bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, t if(unlikely(latest_time_s < 0)) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->latest_time_s_hot = latest_time_s; if(unlikely(!metric->first_time_s)) metric->first_time_s = latest_time_s; metric_has_retention_unsafe(mrg, metric); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t max; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot); - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return max; } -bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { +inline bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); if(update_every_s <= 0) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); metric->latest_update_every_s = (uint32_t) update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { +inline bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) { internal_fatal(update_every_s < 0, "DBENGINE METRIC: timestamp is negative"); if(update_every_s <= 0) return false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(!metric->latest_update_every_s) metric->latest_update_every_s = (uint32_t) update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return true; } -time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { +inline time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) { time_t update_every_s; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); update_every_s = metric->latest_update_every_s; - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return update_every_s; } -bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) { bool done = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(!metric->writer) { metric->writer = gettid(); - __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); done = true; } else - __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&metric->spinlock); + __atomic_add_fetch(&mrg->index[metric->partition].stats.writers_conflicts, 1, __ATOMIC_RELAXED); + metric_unlock(metric); return done; } -bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { +inline bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) { bool done = false; - netdata_spinlock_lock(&metric->spinlock); + metric_lock(metric); if(metric->writer) { metric->writer = 0; - __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&mrg->index[metric->partition].stats.writers, 1, __ATOMIC_RELAXED); done = true; } - netdata_spinlock_unlock(&metric->spinlock); + metric_unlock(metric); return done; } -struct mrg_statistics mrg_get_statistics(MRG *mrg) { - // FIXME - use atomics - return mrg->stats; -} - -// ---------------------------------------------------------------------------- -// unit test +inline void mrg_update_metric_retention_and_granularity_by_uuid( + MRG *mrg, Word_t section, uuid_t *uuid, + time_t first_time_s, time_t last_time_s, + time_t update_every_s, time_t now_s) +{ + if(unlikely(last_time_s > now_s)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: wrong last time on-disk (%ld - %ld, now %ld), " + "fixing last time to now", + first_time_s, last_time_s, now_s); + last_time_s = now_s; + } -#ifdef MRG_STRESS_TEST + if (unlikely(first_time_s > last_time_s)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: wrong first time on-disk (%ld - %ld, now %ld), " + "fixing first time to last time", + first_time_s, last_time_s, now_s); -static void mrg_stress(MRG *mrg, size_t entries, size_t sections) { - bool ret; + first_time_s = last_time_s; + } - info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections); + if (unlikely(first_time_s == 0 || last_time_s == 0)) { + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE JV2: zero on-disk timestamps (%ld - %ld, now %ld), " + "using them as-is", + first_time_s, last_time_s, now_s); + } - METRIC *array[entries][sections]; - for(size_t i = 0; i < entries ; i++) { - MRG_ENTRY e = { - .first_time_s = (time_t)(i + 1), - .latest_time_s = (time_t)(i + 2), - .latest_update_every_s = (time_t)(i + 3), + bool added = false; + METRIC *metric = mrg_metric_get_and_acquire(mrg, uuid, section); + if (!metric) { + MRG_ENTRY entry = { + .section = section, + .first_time_s = first_time_s, + .last_time_s = last_time_s, + .latest_update_every_s = (uint32_t) update_every_s }; - uuid_generate_random(e.uuid); - - for(size_t section = 0; section < sections ;section++) { - e.section = section; - array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret); - if(!ret) - fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section); - - if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section]) - fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric"); - - if(ret) - fatal("DBENGINE METRIC: adding the same metric twice, returns success"); - - if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section]) - fatal("DBENGINE METRIC: cannot get back the same metric"); - - if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0) - fatal("DBENGINE METRIC: uuids do not match"); - } + // memcpy(entry.uuid, *uuid, sizeof(uuid_t)); + uuid_copy(entry.uuid, *uuid); + metric = mrg_metric_add_and_acquire(mrg, entry, &added); } - for(size_t i = 0; i < entries ; i++) { - for (size_t section = 0; section < sections; section++) { - uuid_t uuid; - uuid_generate_random(uuid); - - if(mrg_metric_get_and_acquire(mrg, &uuid, section)) - fatal("DBENGINE METRIC: found non-existing uuid"); - - if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section]) - fatal("DBENGINE METRIC: metric id does not match"); - - if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1)) - fatal("DBENGINE METRIC: wrong first time returned"); - if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2)) - fatal("DBENGINE METRIC: wrong latest time returned"); - if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3)) - fatal("DBENGINE METRIC: wrong latest time returned"); - - if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2))) - fatal("DBENGINE METRIC: cannot set first time"); - if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3))) - fatal("DBENGINE METRIC: cannot set latest time"); - if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4))) - fatal("DBENGINE METRIC: cannot set update every"); - - if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2)) - fatal("DBENGINE METRIC: wrong first time returned"); - if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3)) - fatal("DBENGINE METRIC: wrong latest time returned"); - if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4)) - fatal("DBENGINE METRIC: wrong latest time returned"); - } + if (likely(!added)) + mrg_metric_expand_retention(mrg, metric, first_time_s, last_time_s, update_every_s); + + mrg_metric_release(mrg, metric); +} + +inline void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s) { + memset(s, 0, sizeof(struct mrg_statistics)); + + for(size_t i = 0; i < mrg->partitions ;i++) { + s->entries += __atomic_load_n(&mrg->index[i].stats.entries, __ATOMIC_RELAXED); + s->entries_referenced += __atomic_load_n(&mrg->index[i].stats.entries_referenced, __ATOMIC_RELAXED); + s->entries_with_retention += __atomic_load_n(&mrg->index[i].stats.entries_with_retention, __ATOMIC_RELAXED); + s->size += __atomic_load_n(&mrg->index[i].stats.size, __ATOMIC_RELAXED); + s->current_references += __atomic_load_n(&mrg->index[i].stats.current_references, __ATOMIC_RELAXED); + s->additions += __atomic_load_n(&mrg->index[i].stats.additions, __ATOMIC_RELAXED); + s->additions_duplicate += __atomic_load_n(&mrg->index[i].stats.additions_duplicate, __ATOMIC_RELAXED); + s->deletions += __atomic_load_n(&mrg->index[i].stats.deletions, __ATOMIC_RELAXED); + s->delete_having_retention_or_referenced += __atomic_load_n(&mrg->index[i].stats.delete_having_retention_or_referenced, __ATOMIC_RELAXED); + s->delete_misses += __atomic_load_n(&mrg->index[i].stats.delete_misses, __ATOMIC_RELAXED); + s->search_hits += __atomic_load_n(&mrg->index[i].stats.search_hits, __ATOMIC_RELAXED); + s->search_misses += __atomic_load_n(&mrg->index[i].stats.search_misses, __ATOMIC_RELAXED); + s->writers += __atomic_load_n(&mrg->index[i].stats.writers, __ATOMIC_RELAXED); + s->writers_conflicts += __atomic_load_n(&mrg->index[i].stats.writers_conflicts, __ATOMIC_RELAXED); } - for(size_t i = 0; i < entries ; i++) { - for (size_t section = 0; section < sections; section++) { - if(!mrg_metric_release_and_delete(mrg, array[i][section])) - fatal("DBENGINE METRIC: failed to delete metric"); - } - } + s->size += sizeof(MRG) + sizeof(struct mrg_partition) * mrg->partitions; } -static void *mrg_stress_test_thread1(void *ptr) { - MRG *mrg = ptr; +// ---------------------------------------------------------------------------- +// unit test - for(int i = 0; i < 5 ; i++) - mrg_stress(mrg, 10000, 5); +struct mrg_stress_entry { + uuid_t uuid; + time_t after; + time_t before; +}; - return ptr; -} +struct mrg_stress { + MRG *mrg; + bool stop; + size_t entries; + struct mrg_stress_entry *array; + size_t updates; +}; -static void *mrg_stress_test_thread2(void *ptr) { - MRG *mrg = ptr; +static void *mrg_stress(void *ptr) { + struct mrg_stress *t = ptr; + MRG *mrg = t->mrg; - for(int i = 0; i < 10 ; i++) - mrg_stress(mrg, 500, 50); + ssize_t start = 0; + ssize_t end = (ssize_t)t->entries; + ssize_t step = 1; - return ptr; -} + if(gettid() % 2) { + start = (ssize_t)t->entries - 1; + end = -1; + step = -1; + } + + while(!__atomic_load_n(&t->stop, __ATOMIC_RELAXED)) { + for (ssize_t i = start; i != end; i += step) { + struct mrg_stress_entry *e = &t->array[i]; -static void *mrg_stress_test_thread3(void *ptr) { - MRG *mrg = ptr; + time_t after = __atomic_sub_fetch(&e->after, 1, __ATOMIC_RELAXED); + time_t before = __atomic_add_fetch(&e->before, 1, __ATOMIC_RELAXED); - for(int i = 0; i < 50 ; i++) - mrg_stress(mrg, 5000, 1); + mrg_update_metric_retention_and_granularity_by_uuid( + mrg, 0x01, + &e->uuid, + after, + before, + 1, + before); + + __atomic_add_fetch(&t->updates, 1, __ATOMIC_RELAXED); + } + } return ptr; } -#endif int mrg_unittest(void) { - MRG *mrg = mrg_create(); + MRG *mrg = mrg_create(0); METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0; METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1; bool ret; @@ -850,54 +858,84 @@ int mrg_unittest(void) { if(!mrg_metric_release_and_delete(mrg, m1_t1)) fatal("DBENGINE METRIC: cannot delete the second metric"); - if(mrg->stats.entries != 0) + struct mrg_statistics s; + mrg_get_statistics(mrg, &s); + if(s.entries != 0) fatal("DBENGINE METRIC: invalid entries counter"); -#ifdef MRG_STRESS_TEST - usec_t started_ut = now_monotonic_usec(); - pthread_t thread1; - netdata_thread_create(&thread1, "TH1", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread1, mrg); + size_t entries = 1000000; + size_t threads = mrg->partitions / 3 + 1; + size_t tiers = 3; + size_t run_for_secs = 5; + netdata_log_info("preparing stress test of %zu entries...", entries); + struct mrg_stress t = { + .mrg = mrg, + .entries = entries, + .array = callocz(entries, sizeof(struct mrg_stress_entry)), + }; - pthread_t thread2; - netdata_thread_create(&thread2, "TH2", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread2, mrg); + time_t now = max_acceptable_collected_time(); + for(size_t i = 0; i < entries ;i++) { + uuid_generate_random(t.array[i].uuid); + t.array[i].after = now / 3; + t.array[i].before = now / 2; + } + netdata_log_info("stress test is populating MRG with 3 tiers..."); + for(size_t i = 0; i < entries ;i++) { + struct mrg_stress_entry *e = &t.array[i]; + for(size_t tier = 1; tier <= tiers ;tier++) { + mrg_update_metric_retention_and_granularity_by_uuid( + mrg, tier, + &e->uuid, + e->after, + e->before, + 1, + e->before); + } + } + netdata_log_info("stress test ready to run..."); - pthread_t thread3; - netdata_thread_create(&thread3, "TH3", - NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, - mrg_stress_test_thread3, mrg); + usec_t started_ut = now_monotonic_usec(); + pthread_t th[threads]; + for(size_t i = 0; i < threads ; i++) { + char buf[15 + 1]; + snprintfz(buf, 15, "TH[%zu]", i); + netdata_thread_create(&th[i], buf, + NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG, + mrg_stress, &t); + } - sleep_usec(5 * USEC_PER_SEC); + sleep_usec(run_for_secs * USEC_PER_SEC); + __atomic_store_n(&t.stop, true, __ATOMIC_RELAXED); - netdata_thread_cancel(thread1); - netdata_thread_cancel(thread2); - netdata_thread_cancel(thread3); + for(size_t i = 0; i < threads ; i++) + netdata_thread_cancel(th[i]); + + for(size_t i = 0; i < threads ; i++) + netdata_thread_join(th[i], NULL); - netdata_thread_join(thread1, NULL); - netdata_thread_join(thread2, NULL); - netdata_thread_join(thread3, NULL); usec_t ended_ut = now_monotonic_usec(); - info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, " + struct mrg_statistics stats; + mrg_get_statistics(mrg, &stats); + + netdata_log_info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, " "%zu deletions, %zu wrong deletions, " "%zu successful searches, %zu wrong searches, " - "%zu successful pointer validations, %zu wrong pointer validations " "in %llu usecs", - mrg->stats.additions, mrg->stats.additions_duplicate, - mrg->stats.deletions, mrg->stats.delete_misses, - mrg->stats.search_hits, mrg->stats.search_misses, - mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses, + stats.additions, stats.additions_duplicate, + stats.deletions, stats.delete_misses, + stats.search_hits, stats.search_misses, ended_ut - started_ut); -#endif + netdata_log_info("DBENGINE METRIC: updates performance: %0.2fk/sec total, %0.2fk/sec/thread", + (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0, + (double)t.updates / (double)((ended_ut - started_ut) / USEC_PER_SEC) / 1000.0 / threads); mrg_destroy(mrg); - info("DBENGINE METRIC: all tests passed!"); + netdata_log_info("DBENGINE METRIC: all tests passed!"); return 0; } diff --git a/database/engine/metric.h b/database/engine/metric.h index 82aff903..5cb5b045 100644 --- a/database/engine/metric.h +++ b/database/engine/metric.h @@ -3,7 +3,7 @@ #include "../rrd.h" -#define MRG_PARTITIONS 10 +#define MRG_CACHE_LINE_PADDING(x) uint8_t padding##x[64] typedef struct metric METRIC; typedef struct mrg MRG; @@ -17,13 +17,10 @@ typedef struct mrg_entry { } MRG_ENTRY; struct mrg_statistics { - size_t entries; - size_t entries_referenced; - size_t entries_with_retention; - - size_t size; // total memory used, with indexing + // --- non-atomic --- under a write lock - size_t current_references; + size_t entries; + size_t size; // total memory used, with indexing size_t additions; size_t additions_duplicate; @@ -32,14 +29,28 @@ struct mrg_statistics { size_t delete_having_retention_or_referenced; size_t delete_misses; + MRG_CACHE_LINE_PADDING(0); + + // --- atomic --- multiple readers / writers + + size_t entries_referenced; + + MRG_CACHE_LINE_PADDING(1); + size_t entries_with_retention; + + MRG_CACHE_LINE_PADDING(2); + size_t current_references; + + MRG_CACHE_LINE_PADDING(3); size_t search_hits; size_t search_misses; + MRG_CACHE_LINE_PADDING(4); size_t writers; size_t writers_conflicts; }; -MRG *mrg_create(void); +MRG *mrg_create(ssize_t partitions); void mrg_destroy(MRG *mrg); METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric); @@ -72,8 +83,14 @@ bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric); bool mrg_metric_set_writer(MRG *mrg, METRIC *metric); bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric); -struct mrg_statistics mrg_get_statistics(MRG *mrg); +void mrg_get_statistics(MRG *mrg, struct mrg_statistics *s); size_t mrg_aral_structures(void); size_t mrg_aral_overhead(void); + +void mrg_update_metric_retention_and_granularity_by_uuid( + MRG *mrg, Word_t section, uuid_t *uuid, + time_t first_time_s, time_t last_time_s, + time_t update_every_s, time_t now_s); + #endif // DBENGINE_METRIC_H diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 02d08a16..c608c327 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -387,15 +387,17 @@ static size_t list_has_time_gaps( time_t wanted_end_time_s, size_t *pages_total, size_t *pages_found_pass4, - size_t *pages_pending, + size_t *pages_to_load_from_disk, size_t *pages_overlapping, time_t *optimal_end_time_s, - bool populate_gaps + bool populate_gaps, + PDC_PAGE_STATUS *common_status ) { // we will recalculate these, so zero them - *pages_pending = 0; + *pages_to_load_from_disk = 0; *pages_overlapping = 0; *optimal_end_time_s = 0; + *common_status = 0; bool first; Pvoid_t *PValue; @@ -461,6 +463,7 @@ static size_t list_has_time_gaps( (*pages_overlapping)++; pd->status |= PDC_PAGE_SKIP; pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING); + *common_status |= pd->status; continue; } @@ -480,7 +483,7 @@ static size_t list_has_time_gaps( } else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) { - (*pages_pending)++; + (*pages_to_load_from_disk)++; pd->status |= PDC_PAGE_DISK_PENDING; @@ -495,6 +498,8 @@ static size_t list_has_time_gaps( pd->status &= ~PDC_PAGE_DISK_PENDING; pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED); } + + *common_status |= pd->status; } internal_fatal(pages_pass2 != pages_pass3, @@ -505,6 +510,8 @@ static size_t list_has_time_gaps( return gaps; } +// ---------------------------------------------------------------------------- + typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data); static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) { uuid_t *uuid = mrg_metric_uuid(main_mrg, metric); @@ -515,12 +522,19 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR size_t pages_found = 0; - uv_rwlock_rdlock(&ctx->datafiles.rwlock); + NJFV2IDX_FIND_STATE state = { + .init = false, + .last = 0, + .ctx = ctx, + .wanted_start_time_s = wanted_start_time_s, + .wanted_end_time_s = wanted_end_time_s, + .j2_header_acquired = NULL, + }; + struct rrdengine_datafile *datafile; - for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { - struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, - wanted_start_time_s, - wanted_end_time_s); + while((datafile = njfv2idx_find_and_acquire_j2_header(&state))) { + struct journal_v2_header *j2_header = state.j2_header_acquired; + if (unlikely(!j2_header)) continue; @@ -595,7 +609,6 @@ static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METR journalfile_v2_data_release(datafile->journalfile); } - uv_rwlock_rdunlock(&ctx->datafiles.rwlock); return pages_found; } @@ -644,10 +657,13 @@ static Pvoid_t get_page_list( METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, - size_t *pages_to_load, - time_t *optimal_end_time_s + time_t *optimal_end_time_s, + size_t *pages_to_load_from_disk, + PDC_PAGE_STATUS *common_status ) { *optimal_end_time_s = 0; + *pages_to_load_from_disk = 0; + *common_status = 0; Pvoid_t JudyL_page_array = (Pvoid_t) NULL; @@ -658,14 +674,13 @@ static Pvoid_t get_page_list( pages_found_in_open_cache = 0, pages_found_in_journals_v2 = 0, pages_found_pass4 = 0, - pages_pending = 0, pages_overlapping = 0, pages_total = 0; size_t cache_gaps = 0, query_gaps = 0; bool done_v2 = false, done_open = false; - usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0; + usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0, finish_ut = 0; // -------------------------------------------------------------- // PASS 1: Check what the main page cache has available @@ -680,8 +695,8 @@ static Pvoid_t get_page_list( if(pages_found_in_main_cache && !cache_gaps) { query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, - &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, - optimal_end_time_s, false); + &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping, + optimal_end_time_s, false, common_status); if (pages_total && !query_gaps) goto we_are_done; @@ -702,8 +717,8 @@ static Pvoid_t get_page_list( if(pages_found_in_open_cache) { query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, - &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, - optimal_end_time_s, false); + &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping, + optimal_end_time_s, false, common_status); if (pages_total && !query_gaps) goto we_are_done; @@ -726,15 +741,11 @@ static Pvoid_t get_page_list( pass4_ut = now_monotonic_usec(); query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, - &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, - optimal_end_time_s, true); + &pages_total, &pages_found_pass4, pages_to_load_from_disk, &pages_overlapping, + optimal_end_time_s, true, common_status); we_are_done: - - if(pages_to_load) - *pages_to_load = pages_pending; - - usec_t finish_ut = now_monotonic_usec(); + finish_ut = now_monotonic_usec(); time_delta(finish_ut, pass4_ut); time_delta(finish_ut, pass3_ut); time_delta(finish_ut, pass2_ut); @@ -754,7 +765,7 @@ we_are_done: __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED); __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED); __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED); - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, pages_pending, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, *pages_to_load_from_disk, __ATOMIC_RELAXED); __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED); return JudyL_page_array; @@ -773,14 +784,23 @@ void rrdeng_prep_query(struct page_details_control *pdc, bool worker) { if(worker) worker_is_busy(UV_EVENT_DBENGINE_QUERY); - size_t pages_to_load = 0; pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric, pdc->start_time_s * USEC_PER_SEC, pdc->end_time_s * USEC_PER_SEC, - &pages_to_load, - &pdc->optimal_end_time_s); + &pdc->optimal_end_time_s, + &pdc->pages_to_load_from_disk, + &pdc->common_status); + + internal_fatal(pdc->pages_to_load_from_disk && !(pdc->common_status & PDC_PAGE_DISK_PENDING), + "DBENGINE: PDC reports there are %zu pages to load from disk, " + "but none of the pages has the PDC_PAGE_DISK_PENDING flag", + pdc->pages_to_load_from_disk); + + internal_fatal(!pdc->pages_to_load_from_disk && (pdc->common_status & PDC_PAGE_DISK_PENDING), + "DBENGINE: PDC reports there are no pages to load from disk, " + "but one or more pages have the PDC_PAGE_DISK_PENDING flag"); - if (pages_to_load && pdc->page_list_JudyL) { + if (pdc->pages_to_load_from_disk && pdc->page_list_JudyL) { pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work() usec_t start_ut = now_monotonic_usec(); if(likely(pdc->priority == STORAGE_PRIORITY_SYNCHRONOUS)) @@ -822,7 +842,7 @@ void pg_cache_preload(struct rrdeng_query_handle *handle) { handle->pdc->optimal_end_time_s = handle->end_time_s; handle->pdc->ctx = handle->ctx; handle->pdc->refcount = 1; - netdata_spinlock_init(&handle->pdc->refcount_spinlock); + spinlock_init(&handle->pdc->refcount_spinlock); completion_init(&handle->pdc->prep_completion); completion_init(&handle->pdc->page_completion); @@ -1063,7 +1083,7 @@ size_t dynamic_extent_cache_size(void) { void pgc_and_mrg_initialize(void) { - main_mrg = mrg_create(); + main_mrg = mrg_create(0); size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL; size_t main_cache_size = (target_cache_size / 100) * 95; diff --git a/database/engine/pdc.c b/database/engine/pdc.c index 42fb2f6d..7da56878 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -198,7 +198,7 @@ void extent_buffer_init(void) { void extent_buffer_cleanup1(void) { struct extent_buffer *item = NULL; - if(!netdata_spinlock_trylock(&extent_buffer_globals.protected.spinlock)) + if(!spinlock_trylock(&extent_buffer_globals.protected.spinlock)) return; if(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) { @@ -207,7 +207,7 @@ void extent_buffer_cleanup1(void) { extent_buffer_globals.protected.available--; } - netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock); + spinlock_unlock(&extent_buffer_globals.protected.spinlock); if(item) { size_t bytes = sizeof(struct extent_buffer) + item->bytes; @@ -225,13 +225,13 @@ struct extent_buffer *extent_buffer_get(size_t size) { if(size < extent_buffer_globals.max_size) size = extent_buffer_globals.max_size; - netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock); + spinlock_lock(&extent_buffer_globals.protected.spinlock); if(likely(extent_buffer_globals.protected.available_items)) { eb = extent_buffer_globals.protected.available_items; DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next); extent_buffer_globals.protected.available--; } - netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock); + spinlock_unlock(&extent_buffer_globals.protected.spinlock); if(unlikely(eb && eb->bytes < size)) { size_t bytes = sizeof(struct extent_buffer) + eb->bytes; @@ -255,10 +255,10 @@ struct extent_buffer *extent_buffer_get(size_t size) { void extent_buffer_release(struct extent_buffer *eb) { if(unlikely(!eb)) return; - netdata_spinlock_lock(&extent_buffer_globals.protected.spinlock); + spinlock_lock(&extent_buffer_globals.protected.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next); extent_buffer_globals.protected.available++; - netdata_spinlock_unlock(&extent_buffer_globals.protected.spinlock); + spinlock_unlock(&extent_buffer_globals.protected.spinlock); } size_t extent_buffer_cache_size(void) { @@ -400,20 +400,20 @@ static void pdc_destroy(PDC *pdc) { } void pdc_acquire(PDC *pdc) { - netdata_spinlock_lock(&pdc->refcount_spinlock); + spinlock_lock(&pdc->refcount_spinlock); if(pdc->refcount < 1) fatal("DBENGINE: pdc is not referenced and cannot be acquired"); pdc->refcount++; - netdata_spinlock_unlock(&pdc->refcount_spinlock); + spinlock_unlock(&pdc->refcount_spinlock); } bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) { if(unlikely(!pdc)) return true; - netdata_spinlock_lock(&pdc->refcount_spinlock); + spinlock_lock(&pdc->refcount_spinlock); if(pdc->refcount <= 0) fatal("DBENGINE: pdc is not referenced and cannot be released"); @@ -429,12 +429,12 @@ bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router } if (pdc->refcount == 0) { - netdata_spinlock_unlock(&pdc->refcount_spinlock); + spinlock_unlock(&pdc->refcount_spinlock); pdc_destroy(pdc); return true; } - netdata_spinlock_unlock(&pdc->refcount_spinlock); + spinlock_unlock(&pdc->refcount_spinlock); return false; } @@ -456,7 +456,7 @@ static struct rrdeng_cmd *epdl_get_cmd(void *epdl_ptr) { static bool epdl_pending_add(EPDL *epdl) { bool added_new; - netdata_spinlock_lock(&epdl->datafile->extent_queries.spinlock); + spinlock_lock(&epdl->datafile->extent_queries.spinlock); Pvoid_t *PValue = JudyLIns(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0); internal_fatal(!PValue || PValue == PJERR, "DBENGINE: corrupted pending extent judy"); @@ -478,20 +478,20 @@ static bool epdl_pending_add(EPDL *epdl) { DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, epdl, query.prev, query.next); *PValue = base; - netdata_spinlock_unlock(&epdl->datafile->extent_queries.spinlock); + spinlock_unlock(&epdl->datafile->extent_queries.spinlock); return added_new; } static void epdl_pending_del(EPDL *epdl) { - netdata_spinlock_lock(&epdl->datafile->extent_queries.spinlock); + spinlock_lock(&epdl->datafile->extent_queries.spinlock); if(epdl->head_to_datafile_extent_queries_pending_for_extent) { epdl->head_to_datafile_extent_queries_pending_for_extent = false; int rc = JudyLDel(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0); (void) rc; internal_fatal(!rc, "DBENGINE: epdl not found in pending list"); } - netdata_spinlock_unlock(&epdl->datafile->extent_queries.spinlock); + spinlock_unlock(&epdl->datafile->extent_queries.spinlock); } void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list) diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 7811a5ea..ce363183 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -351,7 +351,7 @@ static struct { static void wal_cleanup1(void) { WAL *wal = NULL; - if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock)) + if(!spinlock_trylock(&wal_globals.protected.spinlock)) return; if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) { @@ -360,7 +360,7 @@ static void wal_cleanup1(void) { wal_globals.protected.available--; } - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); if(wal) { posix_memfree(wal->buf); @@ -375,7 +375,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { WAL *wal = NULL; - netdata_spinlock_lock(&wal_globals.protected.spinlock); + spinlock_lock(&wal_globals.protected.spinlock); if(likely(wal_globals.protected.available_items)) { wal = wal_globals.protected.available_items; @@ -384,7 +384,7 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { } uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED); - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); if(unlikely(!wal)) { wal = mallocz(sizeof(WAL)); @@ -416,10 +416,10 @@ WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { void wal_release(WAL *wal) { if(unlikely(!wal)) return; - netdata_spinlock_lock(&wal_globals.protected.spinlock); + spinlock_lock(&wal_globals.protected.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); wal_globals.protected.available++; - netdata_spinlock_unlock(&wal_globals.protected.spinlock); + spinlock_unlock(&wal_globals.protected.spinlock); } // ---------------------------------------------------------------------------- @@ -459,7 +459,7 @@ void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) { } void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) { - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); struct rrdeng_cmd *cmd = get_cmd_cb(data); if(cmd) { @@ -472,7 +472,7 @@ void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY } } - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); } void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion, @@ -489,12 +489,12 @@ void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, v cmd->priority = priority; cmd->dequeue_cb = dequeue_cb; - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); rrdeng_main.cmd_queue.unsafe.waiting++; if(enqueue_cb) enqueue_cb(cmd); - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); fatal_assert(0 == uv_async_send(&rrdeng_main.async)); } @@ -532,7 +532,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { } // find an opcode to execute from the queue - netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) { cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; if(cmd) { @@ -559,7 +559,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { cmd->dequeue_cb = NULL; } - netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); struct rrdeng_cmd ret; if(cmd) { @@ -712,9 +712,9 @@ static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __m posix_memfree(xt_io_descr->buf); extent_io_descriptor_release(xt_io_descr); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.flushed_to_open_running--; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running) // we just finished a flushing on a datafile that is not the active one @@ -733,15 +733,15 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { if (uv_fs_request->result < 0) { ctx_io_error(ctx); - error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); + netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); } journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running--; datafile->writers.flushed_to_open_running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); rrdeng_enq_cmd(xt_io_descr->ctx, RRDENG_OPCODE_FLUSHED_TO_OPEN, @@ -756,12 +756,12 @@ static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { bool ret = false; - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx)) ret = true; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); return ret; } @@ -773,9 +773,9 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ uv_rwlock_rdlock(&ctx->datafiles.rwlock); datafile = ctx->datafiles.first->prev; // become a writer on this datafile, to prevent it from vanishing - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_rdunlock(&ctx->datafiles.rwlock); if(datafile_is_full(ctx, datafile)) { @@ -791,7 +791,7 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ datafile = ctx->datafiles.first->prev; uv_rwlock_rdunlock(&ctx->datafiles.rwlock); - if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0) + if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0) rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); @@ -801,15 +801,15 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ uv_rwlock_rdlock(&ctx->datafiles.rwlock); datafile = ctx->datafiles.first->prev; // become a writer on this datafile, to prevent it from vanishing - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); datafile->writers.running++; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_rdunlock(&ctx->datafiles.rwlock); // release the writers on the old datafile - netdata_spinlock_lock(&old_datafile->writers.spinlock); + spinlock_lock(&old_datafile->writers.spinlock); old_datafile->writers.running--; - netdata_spinlock_unlock(&old_datafile->writers.spinlock); + spinlock_unlock(&old_datafile->writers.spinlock); } return datafile; @@ -921,11 +921,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta real_io_size = ALIGN_BYTES_CEILING(size_bytes); datafile = get_datafile_to_write_extent(ctx); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); xt_io_descr->datafile = datafile; xt_io_descr->pos = datafile->pos; datafile->pos += real_io_size; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); xt_io_descr->bytes = size_bytes; xt_io_descr->uv_fs_request.data = xt_io_descr; @@ -998,12 +998,14 @@ struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struc return next_datafile; } -void find_uuid_first_time( +time_t find_uuid_first_time( struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, struct uuid_first_time_s *uuid_first_entry_list, size_t count) { + time_t global_first_time_s = LONG_MAX; + // acquire the datafile to work with it uv_rwlock_rdlock(&ctx->datafiles.rwlock); while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION)) @@ -1011,7 +1013,7 @@ void find_uuid_first_time( uv_rwlock_rdunlock(&ctx->datafiles.rwlock); if (unlikely(!datafile)) - return; + return global_first_time_s; unsigned journalfile_count = 0; size_t binary_match = 0; @@ -1025,6 +1027,10 @@ void find_uuid_first_time( } time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + + if(journal_start_time_s < global_first_time_s) + global_first_time_s = journal_start_time_s; + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); struct uuid_first_time_s *uuid_original_entry; @@ -1137,9 +1143,13 @@ void find_uuid_first_time( without_retention, without_metric ); + + return global_first_time_s; } static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { + time_t global_first_time_s = LONG_MAX; + if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS); @@ -1174,7 +1184,7 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r added++; } - info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", + netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", ctx->config.tier, count, first_datafile_remaining->fileno); journalfile_v2_data_release(journalfile); @@ -1184,12 +1194,12 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION); - find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); + global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); if(worker) worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); - info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", + netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", ctx->config.tier, added); size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0; @@ -1223,6 +1233,9 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry", deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier); + if(global_first_time_s != LONG_MAX) + __atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED); + if(worker) worker_is_idle(); } @@ -1243,7 +1256,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); if (!datafile_got_for_deletion) { - info("DBENGINE: waiting for data file '%s/" + netdata_log_info("DBENGINE: waiting for data file '%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION "' to be available for deletion, " "it is in use currently by %u users.", @@ -1255,7 +1268,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * } __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED); - info("DBENGINE: deleting data file '%s/" + netdata_log_info("DBENGINE: deleting data file '%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION "'.", ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); @@ -1277,26 +1290,26 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * journal_file_bytes = journalfile_current_size(journal_file); deleted_bytes = journalfile_v2_data_size_get(journal_file); - info("DBENGINE: deleting data and journal files to maintain disk quota"); + netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota"); ret = journalfile_destroy_unsafe(journal_file, datafile); if (!ret) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); journalfile_v2_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); deleted_bytes += journal_file_bytes; } ret = destroy_data_file_unsafe(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: deleted data file \"%s\".", path); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); deleted_bytes += datafile_bytes; } freez(journal_file); freez(datafile); ctx_current_disk_space_decrease(ctx, deleted_bytes); - info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); + netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); } static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { @@ -1334,11 +1347,11 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse // find a datafile to work uv_rwlock_rdlock(&ctx->datafiles.rwlock); for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { - if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock)) + if(!spinlock_trylock(&datafile->populate_mrg.spinlock)) continue; if(datafile->populate_mrg.populated) { - netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); + spinlock_unlock(&datafile->populate_mrg.spinlock); continue; } @@ -1352,7 +1365,7 @@ static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unuse journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile); datafile->populate_mrg.populated = true; - netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); + spinlock_unlock(&datafile->populate_mrg.spinlock); } while(1); @@ -1376,7 +1389,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { if(!logged) { logged = true; - info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), (ctx->config.legacy) ? -1 : ctx->config.tier); } @@ -1444,7 +1457,7 @@ void async_cb(uv_async_t *handle) { uv_stop(handle->loop); uv_update_time(handle->loop); - debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); + netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } #define TIMER_PERIOD_MS (1000) @@ -1496,17 +1509,17 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb continue; } - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); if(!available) { - info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); + netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); datafile = datafile->next; continue; } - info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); @@ -1623,21 +1636,21 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { static bool spawned = false; static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; - netdata_spinlock_lock(&spinlock); + spinlock_lock(&spinlock); if(!spawned) { int ret; ret = uv_loop_init(&rrdeng_main.loop); if (ret) { - error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); return false; } rrdeng_main.loop.data = &rrdeng_main; ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb); if (ret) { - error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); return false; } @@ -1645,7 +1658,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer); if (ret) { - error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); uv_close((uv_handle_t *)&rrdeng_main.async, NULL); fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); return false; @@ -1658,7 +1671,7 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { spawned = true; } - netdata_spinlock_unlock(&spinlock); + spinlock_unlock(&spinlock); return true; } @@ -1860,7 +1873,7 @@ void dbengine_event_loop(void* arg) { } /* cleanup operations of the event loop */ - info("DBENGINE: shutting down dbengine thread"); + netdata_log_info("DBENGINE: shutting down dbengine thread"); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, diff --git a/database/engine/rrdengine.h b/database/engine/rrdengine.h index 69e41235..b5476930 100644 --- a/database/engine/rrdengine.h +++ b/database/engine/rrdengine.h @@ -34,31 +34,6 @@ struct rrdeng_cmd; #define RRDENG_FILE_NUMBER_SCAN_TMPL "%1u-%10u" #define RRDENG_FILE_NUMBER_PRINT_TMPL "%1.1u-%10.10u" -typedef struct page_details_control { - struct rrdengine_instance *ctx; - struct metric *metric; - - struct completion prep_completion; - struct completion page_completion; // sync between the query thread and the workers - - Pvoid_t page_list_JudyL; // the list of page details - unsigned completed_jobs; // the number of jobs completed last time the query thread checked - bool workers_should_stop; // true when the query thread left and the workers should stop - bool prep_done; - - SPINLOCK refcount_spinlock; // spinlock to protect refcount - int32_t refcount; // the number of workers currently working on this request + 1 for the query thread - size_t executed_with_gaps; - - time_t start_time_s; - time_t end_time_s; - STORAGE_PRIORITY priority; - - time_t optimal_end_time_s; -} PDC; - -PDC *pdc_get(void); - typedef enum __attribute__ ((__packed__)) { // final status for all pages // if a page does not have one of these, it is considered unroutable @@ -99,6 +74,34 @@ typedef enum __attribute__ ((__packed__)) { #define PDC_PAGE_QUERY_GLOBAL_SKIP_LIST (PDC_PAGE_FAILED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_RELEASED) +typedef struct page_details_control { + struct rrdengine_instance *ctx; + struct metric *metric; + + struct completion prep_completion; + struct completion page_completion; // sync between the query thread and the workers + + Pvoid_t page_list_JudyL; // the list of page details + unsigned completed_jobs; // the number of jobs completed last time the query thread checked + bool workers_should_stop; // true when the query thread left and the workers should stop + bool prep_done; + + PDC_PAGE_STATUS common_status; + size_t pages_to_load_from_disk; + + SPINLOCK refcount_spinlock; // spinlock to protect refcount + int32_t refcount; // the number of workers currently working on this request + 1 for the query thread + size_t executed_with_gaps; + + time_t start_time_s; + time_t end_time_s; + STORAGE_PRIORITY priority; + + time_t optimal_end_time_s; +} PDC; + +PDC *pdc_get(void); + struct page_details { struct { struct rrdengine_datafile *ptr; @@ -362,6 +365,11 @@ struct rrdengine_instance { } datafiles; struct { + RW_SPINLOCK spinlock; + Pvoid_t JudyL; + } njfv2idx; + + struct { unsigned last_fileno; // newest index of datafile and journalfile unsigned last_flush_fileno; // newest index of datafile received data @@ -375,6 +383,8 @@ struct rrdengine_instance { bool migration_to_v2_running; bool now_deleting_files; unsigned extents_currently_being_flushed; // non-zero until we commit data to disk (both datafile and journal file) + + time_t first_time_s; } atomic; struct { diff --git a/database/engine/rrdengineapi.c b/database/engine/rrdengineapi.c index ddc306ed..49df5c81 100755 --- a/database/engine/rrdengineapi.c +++ b/database/engine/rrdengineapi.c @@ -247,7 +247,7 @@ STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metri is_1st_metric_writer = false; char uuid[UUID_STR_LEN + 1]; uuid_unparse(*mrg_metric_uuid(main_mrg, metric), uuid); - error("DBENGINE: metric '%s' is already collected and should not be collected twice - expect gaps on the charts", uuid); + netdata_log_error("DBENGINE: metric '%s' is already collected and should not be collected twice - expect gaps on the charts", uuid); } metric = mrg_metric_dup(main_mrg, metric); @@ -312,7 +312,7 @@ static bool page_has_only_empty_metrics(struct rrdeng_collect_handle *handle) { default: { static bool logged = false; if(!logged) { - error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type); + netdata_log_error("DBENGINE: cannot check page for nulls on unknown page type id %d", (mrg_metric_ctx(handle->metric))->config.page_type); logged = true; } return false; @@ -703,14 +703,14 @@ static void register_query_handle(struct rrdeng_query_handle *handle) { handle->query_pid = gettid(); handle->started_time_s = now_realtime_sec(); - netdata_spinlock_lock(&global_query_handle_spinlock); + spinlock_lock(&global_query_handle_spinlock); DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next); - netdata_spinlock_unlock(&global_query_handle_spinlock); + spinlock_unlock(&global_query_handle_spinlock); } static void unregister_query_handle(struct rrdeng_query_handle *handle) { - netdata_spinlock_lock(&global_query_handle_spinlock); + spinlock_lock(&global_query_handle_spinlock); DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(global_query_handle_ll, handle, prev, next); - netdata_spinlock_unlock(&global_query_handle_spinlock); + spinlock_unlock(&global_query_handle_spinlock); } #else static void register_query_handle(struct rrdeng_query_handle *handle __maybe_unused) { @@ -908,7 +908,7 @@ STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim default: { static bool logged = false; if(!logged) { - error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type); + netdata_log_error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", handle->ctx->config.page_type); logged = true; } storage_point_empty(sp, sp.start_time_s, sp.end_time_s); @@ -986,7 +986,7 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_ { struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; if (unlikely(!ctx)) { - error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); + netdata_log_error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__); return false; } @@ -1002,6 +1002,26 @@ bool rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance, uuid_t *dim_ return true; } +size_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + return ctx->config.max_disk_space; +} + +size_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + return __atomic_load_n(&ctx->atomic.current_disk_space, __ATOMIC_RELAXED); +} + +time_t rrdeng_global_first_time_s(STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + return __atomic_load_n(&ctx->atomic.first_time_s, __ATOMIC_RELAXED); +} + +size_t rrdeng_currently_collected_metrics(STORAGE_INSTANCE *db_instance) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance; + return __atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED); +} + /* * Gathers Database Engine statistics. * Careful when modifying this function. @@ -1062,20 +1082,20 @@ static void rrdeng_populate_mrg(struct rrdengine_instance *ctx) { datafiles++; uv_rwlock_rdunlock(&ctx->datafiles.rwlock); - size_t cpus = get_netdata_cpus() / storage_tiers; - if(cpus > datafiles) - cpus = datafiles; + ssize_t cpus = (ssize_t)get_netdata_cpus() / (ssize_t)storage_tiers; + if(cpus > (ssize_t)datafiles) + cpus = (ssize_t)datafiles; - if(cpus < 1) - cpus = 1; + if(cpus > (ssize_t)libuv_worker_threads) + cpus = (ssize_t)libuv_worker_threads; - if(cpus > (size_t)libuv_worker_threads) - cpus = (size_t)libuv_worker_threads; + if(cpus >= (ssize_t)get_netdata_cpus() / 2) + cpus = (ssize_t)(get_netdata_cpus() / 2 - 1); - if(cpus > MRG_PARTITIONS) - cpus = MRG_PARTITIONS; + if(cpus < 1) + cpus = 1; - info("DBENGINE: populating retention to MRG from %zu journal files of tier %d, using %zu threads...", datafiles, ctx->config.tier, cpus); + netdata_log_info("DBENGINE: populating retention to MRG from %zu journal files of tier %d, using %zd threads...", datafiles, ctx->config.tier, cpus); if(datafiles > 2) { struct rrdengine_datafile *datafile; @@ -1116,7 +1136,7 @@ void rrdeng_readiness_wait(struct rrdengine_instance *ctx) { ctx->loading.populate_mrg.array = NULL; ctx->loading.populate_mrg.size = 0; - info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier); + netdata_log_info("DBENGINE: tier %d is ready for data collection and queries", ctx->config.tier); } bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance) { @@ -1140,7 +1160,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path, /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */ rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE); if (rrdeng_reserved_file_descriptors > max_open_files) { - error( + netdata_log_error( "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.", (unsigned)rrdeng_reserved_file_descriptors, (unsigned)max_open_files); @@ -1172,6 +1192,9 @@ int rrdeng_init(struct rrdengine_instance **ctxp, const char *dbfiles_path, ctx->atomic.transaction_id = 1; ctx->quiesce.enabled = false; + rw_spinlock_init(&ctx->njfv2idx.spinlock); + ctx->atomic.first_time_s = LONG_MAX; + if (rrdeng_dbengine_spawn(ctx) && !init_rrd_files(ctx)) { // success - we run this ctx too rrdeng_populate_mrg(ctx); @@ -1208,16 +1231,16 @@ int rrdeng_exit(struct rrdengine_instance *ctx) { bool logged = false; while(__atomic_load_n(&ctx->atomic.collectors_running, __ATOMIC_RELAXED) && !unittest_running) { if(!logged) { - info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier); + netdata_log_info("DBENGINE: waiting for collectors to finish on tier %d...", (ctx->config.legacy) ? -1 : ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); } - info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); + netdata_log_info("DBENGINE: flushing main cache for tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); - info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); + netdata_log_info("DBENGINE: shutting down tier %d", (ctx->config.legacy) ? -1 : ctx->config.tier); struct completion completion = {}; completion_init(&completion); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_CTX_SHUTDOWN, NULL, &completion, STORAGE_PRIORITY_BEST_EFFORT, NULL, NULL); diff --git a/database/engine/rrdengineapi.h b/database/engine/rrdengineapi.h index 514954af..12f1becd 100644 --- a/database/engine/rrdengineapi.h +++ b/database/engine/rrdengineapi.h @@ -222,4 +222,7 @@ RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx); size_t rrdeng_collectors_running(struct rrdengine_instance *ctx); bool rrdeng_is_legacy(STORAGE_INSTANCE *db_instance); +size_t rrdeng_disk_space_max(STORAGE_INSTANCE *db_instance); +size_t rrdeng_disk_space_used(STORAGE_INSTANCE *db_instance); + #endif /* NETDATA_RRDENGINEAPI_H */ diff --git a/database/engine/rrdenginelib.c b/database/engine/rrdenginelib.c index 984a591e..dc581d98 100644 --- a/database/engine/rrdenginelib.c +++ b/database/engine/rrdenginelib.c @@ -14,12 +14,12 @@ int check_file_properties(uv_file file, uint64_t *file_size, size_t min_size) fatal_assert(req.result == 0); s = req.ptr; if (!(s->st_mode & S_IFREG)) { - error("Not a regular file.\n"); + netdata_log_error("Not a regular file.\n"); uv_fs_req_cleanup(&req); return UV_EINVAL; } if (s->st_size < min_size) { - error("File length is too short.\n"); + netdata_log_error("File length is too short.\n"); uv_fs_req_cleanup(&req); return UV_EINVAL; } @@ -56,16 +56,16 @@ int open_file_for_io(char *path, int flags, uv_file *file, int direct) fd = uv_fs_open(NULL, &req, path, current_flags, S_IRUSR | S_IWUSR, NULL); if (fd < 0) { if ((direct) && (UV_EINVAL == fd)) { - error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path); + netdata_log_error("File \"%s\" does not support direct I/O, falling back to buffered I/O.", path); } else { - error("Failed to open file \"%s\".", path); + netdata_log_error("Failed to open file \"%s\".", path); --direct; /* break the loop */ } } else { fatal_assert(req.result >= 0); *file = req.result; #ifdef __APPLE__ - info("Disabling OS X caching for file \"%s\".", path); + netdata_log_info("Disabling OS X caching for file \"%s\".", path); fcntl(fd, F_NOCACHE, 1); #endif --direct; /* break the loop */ @@ -90,7 +90,7 @@ int is_legacy_child(const char *machine_guid) snprintfz(dbengine_file, FILENAME_MAX, "%s/%s/dbengine", netdata_configured_cache_dir, machine_guid); int rc = uv_fs_stat(NULL, &stat_req, dbengine_file, NULL); if (likely(rc == 0 && ((stat_req.statbuf.st_mode & S_IFMT) == S_IFDIR))) { - //info("Found legacy engine folder \"%s\"", dbengine_file); + //netdata_log_info("Found legacy engine folder \"%s\"", dbengine_file); return 1; } } @@ -107,7 +107,7 @@ int count_legacy_children(char *dbfiles_path) ret = uv_fs_scandir(NULL, &req, dbfiles_path, 0, NULL); if (ret < 0) { uv_fs_req_cleanup(&req); - error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); + netdata_log_error("uv_fs_scandir(%s): %s", dbfiles_path, uv_strerror(ret)); return ret; } @@ -134,7 +134,7 @@ int compute_multidb_diskspace() fclose(fp); if (unlikely(rc != 1 || computed_multidb_disk_quota_mb < RRDENG_MIN_DISK_SPACE_MB)) { errno = 0; - error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file); + netdata_log_error("File '%s' contains invalid input, it will be rebuild", multidb_disk_space_file); computed_multidb_disk_quota_mb = -1; } } @@ -143,15 +143,15 @@ int compute_multidb_diskspace() int rc = count_legacy_children(netdata_configured_cache_dir); if (likely(rc >= 0)) { computed_multidb_disk_quota_mb = (rc + 1) * default_rrdeng_disk_quota_mb; - info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb); + netdata_log_info("Found %d legacy dbengines, setting multidb diskspace to %dMB", rc, computed_multidb_disk_quota_mb); fp = fopen(multidb_disk_space_file, "w"); if (likely(fp)) { fprintf(fp, "%d", computed_multidb_disk_quota_mb); - info("Created file '%s' to store the computed value", multidb_disk_space_file); + netdata_log_info("Created file '%s' to store the computed value", multidb_disk_space_file); fclose(fp); } else - error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file); + netdata_log_error("Failed to store the default multidb disk quota size on '%s'", multidb_disk_space_file); } else computed_multidb_disk_quota_mb = default_rrdeng_disk_quota_mb; diff --git a/database/engine/rrdenginelib.h b/database/engine/rrdenginelib.h index ca8eacae..831e4853 100644 --- a/database/engine/rrdenginelib.h +++ b/database/engine/rrdenginelib.h @@ -53,7 +53,7 @@ static inline void modify_bit(unsigned *x, unsigned pos, uint8_t val) *x |= 1U << pos; break; default: - error("modify_bit() called with invalid argument."); + netdata_log_error("modify_bit() called with invalid argument."); break; } } |