diff options
Diffstat (limited to '')
-rw-r--r-- | database/engine/datafile.c | 117 |
1 files changed, 62 insertions, 55 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 8c413d8dc..d5c1285be 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,11 +1,15 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock) { - uv_rwlock_wrlock(&ctx->datafiles.rwlock); + if(!having_lock) + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + if(!having_lock) + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) @@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta datafile->users.available = true; - netdata_spinlock_init(&datafile->users.spinlock); - netdata_spinlock_init(&datafile->writers.spinlock); - netdata_spinlock_init(&datafile->extent_queries.spinlock); + spinlock_init(&datafile->users.spinlock); + spinlock_init(&datafile->writers.spinlock); + spinlock_init(&datafile->extent_queries.spinlock); return datafile; } @@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { bool ret; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(df->users.available) { ret = true; @@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re else ret = false; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return ret; } void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); df->users.lockers--; df->users.lockers_by_reason[reason]--; - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); } bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { bool can_be_deleted = false; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); df->users.available = false; if(!df->users.lockers) @@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers // evict any pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); pgc_open_evict_clean_pages_of_datafile(open_cache, df); - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { // there are lockers still // count the number of pages referencing this in the open cache - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); usec_t time_to_scan_ut = now_monotonic_usec(); size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; - netdata_spinlock_lock(&df->users.spinlock); + spinlock_lock(&df->users.spinlock); if(!df->users.lockers) can_be_deleted = true; @@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { time_to_scan_ut); } } - netdata_spinlock_unlock(&df->users.spinlock); + spinlock_unlock(&df->users.spinlock); return can_be_deleted; } @@ -171,7 +175,7 @@ int close_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -190,7 +194,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -211,21 +215,21 @@ int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -268,7 +272,7 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); - error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); ctx_io_error(ctx); } uv_fs_req_cleanup(&req); @@ -299,7 +303,7 @@ static int check_data_file_superblock(uv_file file) ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); uv_fs_req_cleanup(&req); goto error; } @@ -309,7 +313,7 @@ static int check_data_file_superblock(uv_file file) if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || superblock->tier != 1) { - error("DBENGINE: file has invalid superblock."); + netdata_log_error("DBENGINE: file has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -334,7 +338,7 @@ static int load_data_file(struct rrdengine_datafile *datafile) ctx_fs_error(ctx); return fd; } - info("DBENGINE: initializing data file \"%s\".", path); + netdata_log_info("DBENGINE: initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); if (ret) @@ -350,14 +354,14 @@ static int load_data_file(struct rrdengine_datafile *datafile) datafile->file = file; datafile->pos = file_size; - info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); return 0; error: error = ret; ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -390,11 +394,11 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (ret < 0) { fatal_assert(req.result < 0); uv_fs_req_cleanup(&req); - error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); + netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); ctx_fs_error(ctx); return ret; } - info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { @@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx) freez(datafiles); return 0; } - if (matched_files == MAX_DATAFILES) { - error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); - } + + if (matched_files == MAX_DATAFILES) + netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); - /* TODO: change this when tiering is implemented */ + ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { @@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) datafile = datafiles[i]; ret = load_data_file(datafile); - if (0 != ret) { + if (0 != ret) must_delete_pair = 1; - } + journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_load(ctx, journalfile, datafile); if (0 != ret) { @@ -432,19 +437,20 @@ static int scan_data_files(struct rrdengine_instance *ctx) close_data_file(datafile); must_delete_pair = 1; } + if (must_delete_pair) { char path[RRDENG_PATH_MAX]; - error("DBENGINE: deleting invalid data and journal file pair."); + netdata_log_error("DBENGINE: deleting invalid data and journal file pair."); ret = journalfile_unlink(journalfile); if (!ret) { journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: deleted journal file \"%s\".", path); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); } ret = unlink_data_file(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: deleted data file \"%s\".", path); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); } freez(journalfile); freez(datafile); @@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx) } ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, false); } + matched_files -= failed_to_load; freez(datafiles); @@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx) } /* Creates a datafile and a journalfile pair */ -int create_new_datafile_pair(struct rrdengine_instance *ctx) +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) { __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); @@ -472,14 +479,14 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx) int ret; char path[RRDENG_PATH_MAX]; - info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); datafile = datafile_alloc_and_init(ctx, 1, fileno); ret = create_data_file(datafile); if(ret) goto error_after_datafile; generate_datafilepath(datafile, path, sizeof(path)); - info("DBENGINE: created data file \"%s\".", path); + netdata_log_info("DBENGINE: created data file \"%s\".", path); journalfile = journalfile_alloc_and_init(datafile); ret = journalfile_create(journalfile, datafile); @@ -487,10 +494,10 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx) goto error_after_journalfile; journalfile_v1_generate_path(datafile, path, sizeof(path)); - info("DBENGINE: created journal file \"%s\".", path); + netdata_log_info("DBENGINE: created journal file \"%s\".", path); ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile); + datafile_list_insert(ctx, datafile, having_lock); ctx_last_fileno_increment(ctx); return 0; @@ -514,20 +521,20 @@ int init_data_files(struct rrdengine_instance *ctx) fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); ret = scan_data_files(ctx); if (ret < 0) { - error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); + netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); return ret; } else if (0 == ret) { - info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); + netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); ctx->atomic.last_fileno = 0; - ret = create_new_datafile_pair(ctx); + ret = create_new_datafile_pair(ctx, false); if (ret) { - error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); + netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); return ret; } } else { if (ctx->loading.create_new_datafile_pair) - create_new_datafile_pair(ctx); + create_new_datafile_pair(ctx, false); while(rrdeng_ctx_exceeded_disk_quota(ctx)) datafile_delete(ctx, ctx->datafiles.first, false, false); @@ -545,7 +552,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { if(!logged) { - info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); + netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -559,7 +566,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) size_t iterations = 100; while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) { if(!logged) { - info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); + netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -569,14 +576,14 @@ void finalize_data_files(struct rrdengine_instance *ctx) bool available = false; do { uv_rwlock_wrlock(&ctx->datafiles.rwlock); - netdata_spinlock_lock(&datafile->writers.spinlock); + spinlock_lock(&datafile->writers.spinlock); available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; if(!available) { - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); if(!logged) { - info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); + netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); logged = true; } sleep_usec(100 * USEC_PER_MS); @@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx) journalfile_close(journalfile, datafile); close_data_file(datafile); datafile_list_delete_unsafe(ctx, datafile); - netdata_spinlock_unlock(&datafile->writers.spinlock); + spinlock_unlock(&datafile->writers.spinlock); uv_rwlock_wrunlock(&ctx->datafiles.rwlock); freez(journalfile); |