summaryrefslogtreecommitdiffstats
path: root/database/engine/datafile.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r--database/engine/datafile.c117
1 files changed, 62 insertions, 55 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c
index 8c413d8dc..d5c1285be 100644
--- a/database/engine/datafile.c
+++ b/database/engine/datafile.c
@@ -1,11 +1,15 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
-void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
+void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock)
{
- uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+ if(!having_lock)
+ uv_rwlock_wrlock(&ctx->datafiles.rwlock);
+
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next);
- uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
+
+ if(!having_lock)
+ uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
}
void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile)
@@ -27,9 +31,9 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
datafile->users.available = true;
- netdata_spinlock_init(&datafile->users.spinlock);
- netdata_spinlock_init(&datafile->writers.spinlock);
- netdata_spinlock_init(&datafile->extent_queries.spinlock);
+ spinlock_init(&datafile->users.spinlock);
+ spinlock_init(&datafile->writers.spinlock);
+ spinlock_init(&datafile->extent_queries.spinlock);
return datafile;
}
@@ -37,7 +41,7 @@ static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_insta
bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
bool ret;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(df->users.available) {
ret = true;
@@ -47,25 +51,25 @@ bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS re
else
ret = false;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return ret;
}
void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) {
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired");
df->users.lockers--;
df->users.lockers_by_reason[reason]--;
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
}
bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
bool can_be_deleted = false;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
df->users.available = false;
if(!df->users.lockers)
@@ -75,9 +79,9 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers
// evict any pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
pgc_open_evict_clean_pages_of_datafile(open_cache, df);
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -86,12 +90,12 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
// there are lockers still
// count the number of pages referencing this in the open cache
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
usec_t time_to_scan_ut = now_monotonic_usec();
size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df);
time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut;
- netdata_spinlock_lock(&df->users.spinlock);
+ spinlock_lock(&df->users.spinlock);
if(!df->users.lockers)
can_be_deleted = true;
@@ -149,7 +153,7 @@ bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) {
time_to_scan_ut);
}
}
- netdata_spinlock_unlock(&df->users.spinlock);
+ spinlock_unlock(&df->users.spinlock);
return can_be_deleted;
}
@@ -171,7 +175,7 @@ int close_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -190,7 +194,7 @@ int unlink_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -211,21 +215,21 @@ int destroy_data_file_unsafe(struct rrdengine_datafile *datafile)
ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_close(NULL, &req, datafile->file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
ret = uv_fs_unlink(NULL, &req, path, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -268,7 +272,7 @@ int create_data_file(struct rrdengine_datafile *datafile)
ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
fatal_assert(req.result < 0);
- error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret));
ctx_io_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -299,7 +303,7 @@ static int check_data_file_superblock(uv_file file)
ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret));
uv_fs_req_cleanup(&req);
goto error;
}
@@ -309,7 +313,7 @@ static int check_data_file_superblock(uv_file file)
if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) ||
strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) ||
superblock->tier != 1) {
- error("DBENGINE: file has invalid superblock.");
+ netdata_log_error("DBENGINE: file has invalid superblock.");
ret = UV_EINVAL;
} else {
ret = 0;
@@ -334,7 +338,7 @@ static int load_data_file(struct rrdengine_datafile *datafile)
ctx_fs_error(ctx);
return fd;
}
- info("DBENGINE: initializing data file \"%s\".", path);
+ netdata_log_info("DBENGINE: initializing data file \"%s\".", path);
ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb));
if (ret)
@@ -350,14 +354,14 @@ static int load_data_file(struct rrdengine_datafile *datafile)
datafile->file = file;
datafile->pos = file_size;
- info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
+ netdata_log_info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size);
return 0;
error:
error = ret;
ret = uv_fs_close(NULL, &req, file, NULL);
if (ret < 0) {
- error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret));
ctx_fs_error(ctx);
}
uv_fs_req_cleanup(&req);
@@ -390,11 +394,11 @@ static int scan_data_files(struct rrdengine_instance *ctx)
if (ret < 0) {
fatal_assert(req.result < 0);
uv_fs_req_cleanup(&req);
- error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
+ netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret));
ctx_fs_error(ctx);
return ret;
}
- info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path);
datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles));
for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) {
@@ -410,11 +414,12 @@ static int scan_data_files(struct rrdengine_instance *ctx)
freez(datafiles);
return 0;
}
- if (matched_files == MAX_DATAFILES) {
- error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
- }
+
+ if (matched_files == MAX_DATAFILES)
+ netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES);
+
qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp);
- /* TODO: change this when tiering is implemented */
+
ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno;
for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) {
@@ -422,9 +427,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
datafile = datafiles[i];
ret = load_data_file(datafile);
- if (0 != ret) {
+ if (0 != ret)
must_delete_pair = 1;
- }
+
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_load(ctx, journalfile, datafile);
if (0 != ret) {
@@ -432,19 +437,20 @@ static int scan_data_files(struct rrdengine_instance *ctx)
close_data_file(datafile);
must_delete_pair = 1;
}
+
if (must_delete_pair) {
char path[RRDENG_PATH_MAX];
- error("DBENGINE: deleting invalid data and journal file pair.");
+ netdata_log_error("DBENGINE: deleting invalid data and journal file pair.");
ret = journalfile_unlink(journalfile);
if (!ret) {
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: deleted journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted journal file \"%s\".", path);
}
ret = unlink_data_file(datafile);
if (!ret) {
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: deleted data file \"%s\".", path);
+ netdata_log_info("DBENGINE: deleted data file \"%s\".", path);
}
freez(journalfile);
freez(datafile);
@@ -453,8 +459,9 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, false);
}
+
matched_files -= failed_to_load;
freez(datafiles);
@@ -462,7 +469,7 @@ static int scan_data_files(struct rrdengine_instance *ctx)
}
/* Creates a datafile and a journalfile pair */
-int create_new_datafile_pair(struct rrdengine_instance *ctx)
+int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock)
{
__atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED);
@@ -472,14 +479,14 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
int ret;
char path[RRDENG_PATH_MAX];
- info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path);
datafile = datafile_alloc_and_init(ctx, 1, fileno);
ret = create_data_file(datafile);
if(ret)
goto error_after_datafile;
generate_datafilepath(datafile, path, sizeof(path));
- info("DBENGINE: created data file \"%s\".", path);
+ netdata_log_info("DBENGINE: created data file \"%s\".", path);
journalfile = journalfile_alloc_and_init(datafile);
ret = journalfile_create(journalfile, datafile);
@@ -487,10 +494,10 @@ int create_new_datafile_pair(struct rrdengine_instance *ctx)
goto error_after_journalfile;
journalfile_v1_generate_path(datafile, path, sizeof(path));
- info("DBENGINE: created journal file \"%s\".", path);
+ netdata_log_info("DBENGINE: created journal file \"%s\".", path);
ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos);
- datafile_list_insert(ctx, datafile);
+ datafile_list_insert(ctx, datafile, having_lock);
ctx_last_fileno_increment(ctx);
return 0;
@@ -514,20 +521,20 @@ int init_data_files(struct rrdengine_instance *ctx)
fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock));
ret = scan_data_files(ctx);
if (ret < 0) {
- error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path);
return ret;
} else if (0 == ret) {
- info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path);
ctx->atomic.last_fileno = 0;
- ret = create_new_datafile_pair(ctx);
+ ret = create_new_datafile_pair(ctx, false);
if (ret) {
- error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
+ netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path);
return ret;
}
}
else {
if (ctx->loading.create_new_datafile_pair)
- create_new_datafile_pair(ctx);
+ create_new_datafile_pair(ctx, false);
while(rrdeng_ctx_exceeded_disk_quota(ctx))
datafile_delete(ctx, ctx->datafiles.first, false, false);
@@ -545,7 +552,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
logged = false;
while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) {
if(!logged) {
- info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
+ netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -559,7 +566,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
size_t iterations = 100;
while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) {
if(!logged) {
- info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
+ netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -569,14 +576,14 @@ void finalize_data_files(struct rrdengine_instance *ctx)
bool available = false;
do {
uv_rwlock_wrlock(&ctx->datafiles.rwlock);
- netdata_spinlock_lock(&datafile->writers.spinlock);
+ spinlock_lock(&datafile->writers.spinlock);
available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
if(!available) {
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
if(!logged) {
- info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
+ netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier);
logged = true;
}
sleep_usec(100 * USEC_PER_MS);
@@ -586,7 +593,7 @@ void finalize_data_files(struct rrdengine_instance *ctx)
journalfile_close(journalfile, datafile);
close_data_file(datafile);
datafile_list_delete_unsafe(ctx, datafile);
- netdata_spinlock_unlock(&datafile->writers.spinlock);
+ spinlock_unlock(&datafile->writers.spinlock);
uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
freez(journalfile);