diff options
Diffstat (limited to 'src/database/engine/datafile.c')
-rw-r--r-- | src/database/engine/datafile.c | 613 |
1 files changed, 613 insertions, 0 deletions
diff --git a/src/database/engine/datafile.c b/src/database/engine/datafile.c new file mode 100644 index 000000000..1ec2dea79 --- /dev/null +++ b/src/database/engine/datafile.c @@ -0,0 +1,613 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#include "rrdengine.h" + +void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock) +{ + if(!having_lock) + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); + + if(!having_lock) + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); +} + +void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +{ + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); +} + + +static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +{ + fatal_assert(tier == 1); + + struct rrdengine_datafile *datafile = callocz(1, sizeof(struct rrdengine_datafile)); + + datafile->tier = tier; + datafile->fileno = fileno; + fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock)); + datafile->ctx = ctx; + + datafile->users.available = true; + + spinlock_init(&datafile->users.spinlock); + spinlock_init(&datafile->writers.spinlock); + spinlock_init(&datafile->extent_queries.spinlock); + + return datafile; +} + +bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { + bool ret; + + spinlock_lock(&df->users.spinlock); + + if(df->users.available) { + ret = true; + df->users.lockers++; + df->users.lockers_by_reason[reason]++; + } + else + ret = false; + + spinlock_unlock(&df->users.spinlock); + + return ret; +} + +void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { + spinlock_lock(&df->users.spinlock); + if(!df->users.lockers) + fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); + + df->users.lockers--; + df->users.lockers_by_reason[reason]--; + spinlock_unlock(&df->users.spinlock); +} + +bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { + bool can_be_deleted = false; + + spinlock_lock(&df->users.spinlock); + df->users.available = false; + + if(!df->users.lockers) + can_be_deleted = true; + + else { + // there are lockers + + // evict any pages referencing this in the open cache + spinlock_unlock(&df->users.spinlock); + pgc_open_evict_clean_pages_of_datafile(open_cache, df); + spinlock_lock(&df->users.spinlock); + + if(!df->users.lockers) + can_be_deleted = true; + + else { + // there are lockers still + + // count the number of pages referencing this in the open cache + spinlock_unlock(&df->users.spinlock); + usec_t time_to_scan_ut = now_monotonic_usec(); + size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); + size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); + time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; + spinlock_lock(&df->users.spinlock); + + if(!df->users.lockers) + can_be_deleted = true; + + else if(!clean_pages_in_open_cache && !hot_pages_in_open_cache) { + // no pages in the open cache related to this datafile + + time_t now_s = now_monotonic_sec(); + + if(!df->users.time_to_evict) { + // first time we did the above + df->users.time_to_evict = now_s + 120; + internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " + "but it has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "- will be deleted shortly " + "(scanned open cache in %"PRIu64" usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + + else if(now_s > df->users.time_to_evict) { + // time expired, lets remove it + can_be_deleted = true; + internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " + "but it has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "- will be deleted now " + "(scanned open cache in %"PRIu64" usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + } + else + internal_error(true, "DBENGINE: datafile %u of tier %d " + "has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "(scanned open cache in %"PRIu64" usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + } + spinlock_unlock(&df->users.spinlock); + + return can_be_deleted; +} + +void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) +{ + (void) snprintfz(str, maxlen - 1, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, + datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); +} + +int close_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + return ret; +} + +int unlink_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); + + return ret; +} + +int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + int ret; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + + ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_close(NULL, &req, datafile->file, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + ret = uv_fs_unlink(NULL, &req, path, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + + __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); + + return ret; +} + +int create_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io); + if (fd < 0) { + ctx_fs_error(ctx); + return fd; + } + datafile->file = file; + __atomic_add_fetch(&ctx->stats.datafile_creations, 1, __ATOMIC_RELAXED); + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + } + memset(superblock, 0, sizeof(*superblock)); + (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); + (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); + superblock->tier = 1; + + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + ctx_io_error(ctx); + } + uv_fs_req_cleanup(&req); + posix_memfree(superblock); + if (ret < 0) { + destroy_data_file_unsafe(datafile); + return ret; + } + + datafile->pos = sizeof(*superblock); + ctx_io_write_op_bytes(ctx, sizeof(*superblock)); + + return 0; +} + +static int check_data_file_superblock(uv_file file) +{ + int ret; + struct rrdeng_df_sb *superblock; + uv_buf_t iov; + uv_fs_t req; + + ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); + if (unlikely(ret)) { + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + } + iov = uv_buf_init((void *)superblock, sizeof(*superblock)); + + ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); + uv_fs_req_cleanup(&req); + goto error; + } + fatal_assert(req.result >= 0); + uv_fs_req_cleanup(&req); + + if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || + strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || + superblock->tier != 1) { + netdata_log_error("DBENGINE: file has invalid superblock."); + ret = UV_EINVAL; + } else { + ret = 0; + } + error: + posix_memfree(superblock); + return ret; +} + +static int load_data_file(struct rrdengine_datafile *datafile) +{ + struct rrdengine_instance *ctx = datafile->ctx; + uv_fs_t req; + uv_file file; + int ret, fd, error; + uint64_t file_size; + char path[RRDENG_PATH_MAX]; + + generate_datafilepath(datafile, path, sizeof(path)); + fd = open_file_for_io(path, O_RDWR, &file, use_direct_io); + if (fd < 0) { + ctx_fs_error(ctx); + return fd; + } + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: initializing data file \"%s\".", path); + + ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); + if (ret) + goto error; + file_size = ALIGN_BYTES_CEILING(file_size); + + ret = check_data_file_superblock(file); + if (ret) + goto error; + + ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_df_sb)); + + datafile->file = file; + datafile->pos = file_size; + + nd_log_daemon(NDLP_DEBUG, "DBENGINE: data file \"%s\" initialized (size:%" PRIu64 ").", path, file_size); + + return 0; + + error: + error = ret; + ret = uv_fs_close(NULL, &req, file, NULL); + if (ret < 0) { + netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); + } + uv_fs_req_cleanup(&req); + return error; +} + +static int scan_data_files_cmp(const void *a, const void *b) +{ + struct rrdengine_datafile *file1, *file2; + char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; + + file1 = *(struct rrdengine_datafile **)a; + file2 = *(struct rrdengine_datafile **)b; + generate_datafilepath(file1, path1, sizeof(path1)); + generate_datafilepath(file2, path2, sizeof(path2)); + return strcmp(path1, path2); +} + +/* Returns number of datafiles that were loaded or < 0 on error */ +static int scan_data_files(struct rrdengine_instance *ctx) +{ + int ret, matched_files, failed_to_load, i; + unsigned tier, no; + uv_fs_t req; + uv_dirent_t dent; + struct rrdengine_datafile **datafiles, *datafile; + struct rrdengine_journalfile *journalfile; + + ret = uv_fs_scandir(NULL, &req, ctx->config.dbfiles_path, 0, NULL); + if (ret < 0) { + fatal_assert(req.result < 0); + uv_fs_req_cleanup(&req); + netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); + ctx_fs_error(ctx); + return ret; + } + netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); + + datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); + for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { + ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); + if (2 == ret) { + datafile = datafile_alloc_and_init(ctx, tier, no); + datafiles[matched_files++] = datafile; + } + } + uv_fs_req_cleanup(&req); + + if (0 == matched_files) { + freez(datafiles); + return 0; + } + + if (matched_files == MAX_DATAFILES) + netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + + qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); + + ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; + + netdata_log_info("DBENGINE: loading %d data/journal of tier %d...", matched_files, ctx->config.tier); + for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { + uint8_t must_delete_pair = 0; + + datafile = datafiles[i]; + ret = load_data_file(datafile); + if (0 != ret) + must_delete_pair = 1; + + journalfile = journalfile_alloc_and_init(datafile); + ret = journalfile_load(ctx, journalfile, datafile); + if (0 != ret) { + if (!must_delete_pair) /* If datafile is still open close it */ + close_data_file(datafile); + must_delete_pair = 1; + } + + if (must_delete_pair) { + char path[RRDENG_PATH_MAX]; + + netdata_log_error("DBENGINE: deleting invalid data and journal file pair."); + ret = journalfile_unlink(journalfile); + if (!ret) { + journalfile_v1_generate_path(datafile, path, sizeof(path)); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); + } + ret = unlink_data_file(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); + } + freez(journalfile); + freez(datafile); + ++failed_to_load; + continue; + } + + ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); + datafile_list_insert(ctx, datafile, false); + } + + matched_files -= failed_to_load; + freez(datafiles); + + return matched_files; +} + +/* Creates a datafile and a journalfile pair */ +int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) +{ + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); + + struct rrdengine_datafile *datafile; + struct rrdengine_journalfile *journalfile; + unsigned fileno = ctx_last_fileno_get(ctx) + 1; + int ret; + char path[RRDENG_PATH_MAX]; + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: creating new data and journal files in path %s", + ctx->config.dbfiles_path); + + datafile = datafile_alloc_and_init(ctx, 1, fileno); + ret = create_data_file(datafile); + if(ret) + goto error_after_datafile; + + generate_datafilepath(datafile, path, sizeof(path)); + nd_log(NDLS_DAEMON, NDLP_INFO, + "DBENGINE: created data file \"%s\".", path); + + journalfile = journalfile_alloc_and_init(datafile); + ret = journalfile_create(journalfile, datafile); + if (ret) + goto error_after_journalfile; + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + nd_log(NDLS_DAEMON, NDLP_INFO, + "DBENGINE: created journal file \"%s\".", path); + + ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); + datafile_list_insert(ctx, datafile, having_lock); + ctx_last_fileno_increment(ctx); + + return 0; + +error_after_journalfile: + destroy_data_file_unsafe(datafile); + freez(journalfile); + +error_after_datafile: + freez(datafile); + return ret; +} + +/* Page cache must already be initialized. + * Return 0 on success. + */ +int init_data_files(struct rrdengine_instance *ctx) +{ + int ret; + + fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); + ret = scan_data_files(ctx); + if (ret < 0) { + netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); + return ret; + } else if (0 == ret) { + netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); + ctx->atomic.last_fileno = 0; + ret = create_new_datafile_pair(ctx, false); + if (ret) { + netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); + return ret; + } + } + else { + if (ctx->loading.create_new_datafile_pair) + create_new_datafile_pair(ctx, false); + + while(rrdeng_ctx_exceeded_disk_quota(ctx)) + datafile_delete(ctx, ctx->datafiles.first, false, false); + } + + pgc_reset_hot_max(open_cache); + ctx->loading.create_new_datafile_pair = false; + return 0; +} + +void finalize_data_files(struct rrdengine_instance *ctx) +{ + bool logged = false; + + if (!ctx->datafiles.first) + return; + + while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { + if(!logged) { + netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); + } + + do { + struct rrdengine_datafile *datafile = ctx->datafiles.first; + struct rrdengine_journalfile *journalfile = datafile->journalfile; + + logged = false; + size_t iterations = 100; + while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) { + if(!logged) { + netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); + } + + logged = false; + bool available = false; + do { + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + spinlock_lock(&datafile->writers.spinlock); + available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; + + if(!available) { + spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + if(!logged) { + netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); + } + } while(!available); + + journalfile_close(journalfile, datafile); + close_data_file(datafile); + datafile_list_delete_unsafe(ctx, datafile); + spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + freez(journalfile); + freez(datafile); + + } while(ctx->datafiles.first); +} |