diff options
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r-- | database/engine/datafile.c | 611 |
1 files changed, 0 insertions, 611 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c deleted file mode 100644 index 7322039c..00000000 --- a/database/engine/datafile.c +++ /dev/null @@ -1,611 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later -#include "rrdengine.h" - -void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool having_lock) -{ - if(!having_lock) - uv_rwlock_wrlock(&ctx->datafiles.rwlock); - - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); - - if(!having_lock) - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); -} - -void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) -{ - DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); -} - - -static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) -{ - fatal_assert(tier == 1); - - struct rrdengine_datafile *datafile = callocz(1, sizeof(struct rrdengine_datafile)); - - datafile->tier = tier; - datafile->fileno = fileno; - fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock)); - datafile->ctx = ctx; - - datafile->users.available = true; - - spinlock_init(&datafile->users.spinlock); - spinlock_init(&datafile->writers.spinlock); - spinlock_init(&datafile->extent_queries.spinlock); - - return datafile; -} - -bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { - bool ret; - - spinlock_lock(&df->users.spinlock); - - if(df->users.available) { - ret = true; - df->users.lockers++; - df->users.lockers_by_reason[reason]++; - } - else - ret = false; - - spinlock_unlock(&df->users.spinlock); - - return ret; -} - -void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { - spinlock_lock(&df->users.spinlock); - if(!df->users.lockers) - fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); - - df->users.lockers--; - df->users.lockers_by_reason[reason]--; - spinlock_unlock(&df->users.spinlock); -} - -bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { - bool can_be_deleted = false; - - spinlock_lock(&df->users.spinlock); - df->users.available = false; - - if(!df->users.lockers) - can_be_deleted = true; - - else { - // there are lockers - - // evict any pages referencing this in the open cache - spinlock_unlock(&df->users.spinlock); - pgc_open_evict_clean_pages_of_datafile(open_cache, df); - spinlock_lock(&df->users.spinlock); - - if(!df->users.lockers) - can_be_deleted = true; - - else { - // there are lockers still - - // count the number of pages referencing this in the open cache - spinlock_unlock(&df->users.spinlock); - usec_t time_to_scan_ut = now_monotonic_usec(); - size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); - size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); - time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; - spinlock_lock(&df->users.spinlock); - - if(!df->users.lockers) - can_be_deleted = true; - - else if(!clean_pages_in_open_cache && !hot_pages_in_open_cache) { - // no pages in the open cache related to this datafile - - time_t now_s = now_monotonic_sec(); - - if(!df->users.time_to_evict) { - // first time we did the above - df->users.time_to_evict = now_s + 120; - internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " - "but it has %u lockers (oc:%u, pd:%u), " - "%zu clean and %zu hot open cache pages " - "- will be deleted shortly " - "(scanned open cache in %"PRIu64" usecs)", - df->fileno, df->ctx->config.tier, - df->users.lockers, - df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], - df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], - clean_pages_in_open_cache, - hot_pages_in_open_cache, - time_to_scan_ut); - } - - else if(now_s > df->users.time_to_evict) { - // time expired, lets remove it - can_be_deleted = true; - internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " - "but it has %u lockers (oc:%u, pd:%u), " - "%zu clean and %zu hot open cache pages " - "- will be deleted now " - "(scanned open cache in %"PRIu64" usecs)", - df->fileno, df->ctx->config.tier, - df->users.lockers, - df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], - df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], - clean_pages_in_open_cache, - hot_pages_in_open_cache, - time_to_scan_ut); - } - } - else - internal_error(true, "DBENGINE: datafile %u of tier %d " - "has %u lockers (oc:%u, pd:%u), " - "%zu clean and %zu hot open cache pages " - "(scanned open cache in %"PRIu64" usecs)", - df->fileno, df->ctx->config.tier, - df->users.lockers, - df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], - df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], - clean_pages_in_open_cache, - hot_pages_in_open_cache, - time_to_scan_ut); - } - } - spinlock_unlock(&df->users.spinlock); - - return can_be_deleted; -} - -void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) -{ - (void) snprintfz(str, maxlen - 1, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, - datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); -} - -int close_data_file(struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; - int ret; - char path[RRDENG_PATH_MAX]; - - generate_datafilepath(datafile, path, sizeof(path)); - - ret = uv_fs_close(NULL, &req, datafile->file, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - - return ret; -} - -int unlink_data_file(struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; - int ret; - char path[RRDENG_PATH_MAX]; - - generate_datafilepath(datafile, path, sizeof(path)); - - ret = uv_fs_unlink(NULL, &req, path, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - - __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); - - return ret; -} - -int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; - int ret; - char path[RRDENG_PATH_MAX]; - - generate_datafilepath(datafile, path, sizeof(path)); - - ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - - ret = uv_fs_close(NULL, &req, datafile->file, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - - ret = uv_fs_unlink(NULL, &req, path, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - - __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); - - return ret; -} - -int create_data_file(struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; - uv_file file; - int ret, fd; - struct rrdeng_df_sb *superblock; - uv_buf_t iov; - char path[RRDENG_PATH_MAX]; - - generate_datafilepath(datafile, path, sizeof(path)); - fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io); - if (fd < 0) { - ctx_fs_error(ctx); - return fd; - } - datafile->file = file; - __atomic_add_fetch(&ctx->stats.datafile_creations, 1, __ATOMIC_RELAXED); - - ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); - if (unlikely(ret)) { - fatal("DBENGINE: posix_memalign:%s", strerror(ret)); - } - memset(superblock, 0, sizeof(*superblock)); - (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); - (void) strncpy(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ); - superblock->tier = 1; - - iov = uv_buf_init((void *)superblock, sizeof(*superblock)); - - ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); - if (ret < 0) { - fatal_assert(req.result < 0); - netdata_log_error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); - ctx_io_error(ctx); - } - uv_fs_req_cleanup(&req); - posix_memfree(superblock); - if (ret < 0) { - destroy_data_file_unsafe(datafile); - return ret; - } - - datafile->pos = sizeof(*superblock); - ctx_io_write_op_bytes(ctx, sizeof(*superblock)); - - return 0; -} - -static int check_data_file_superblock(uv_file file) -{ - int ret; - struct rrdeng_df_sb *superblock; - uv_buf_t iov; - uv_fs_t req; - - ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); - if (unlikely(ret)) { - fatal("DBENGINE: posix_memalign:%s", strerror(ret)); - } - iov = uv_buf_init((void *)superblock, sizeof(*superblock)); - - ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); - uv_fs_req_cleanup(&req); - goto error; - } - fatal_assert(req.result >= 0); - uv_fs_req_cleanup(&req); - - if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || - strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || - superblock->tier != 1) { - netdata_log_error("DBENGINE: file has invalid superblock."); - ret = UV_EINVAL; - } else { - ret = 0; - } - error: - posix_memfree(superblock); - return ret; -} - -static int load_data_file(struct rrdengine_datafile *datafile) -{ - struct rrdengine_instance *ctx = datafile->ctx; - uv_fs_t req; - uv_file file; - int ret, fd, error; - uint64_t file_size; - char path[RRDENG_PATH_MAX]; - - generate_datafilepath(datafile, path, sizeof(path)); - fd = open_file_for_io(path, O_RDWR, &file, use_direct_io); - if (fd < 0) { - ctx_fs_error(ctx); - return fd; - } - - nd_log_daemon(NDLP_DEBUG, "DBENGINE: initializing data file \"%s\".", path); - - ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); - if (ret) - goto error; - file_size = ALIGN_BYTES_CEILING(file_size); - - ret = check_data_file_superblock(file); - if (ret) - goto error; - - ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_df_sb)); - - datafile->file = file; - datafile->pos = file_size; - - nd_log_daemon(NDLP_DEBUG, "DBENGINE: data file \"%s\" initialized (size:%" PRIu64 ").", path, file_size); - - return 0; - - error: - error = ret; - ret = uv_fs_close(NULL, &req, file, NULL); - if (ret < 0) { - netdata_log_error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); - ctx_fs_error(ctx); - } - uv_fs_req_cleanup(&req); - return error; -} - -static int scan_data_files_cmp(const void *a, const void *b) -{ - struct rrdengine_datafile *file1, *file2; - char path1[RRDENG_PATH_MAX], path2[RRDENG_PATH_MAX]; - - file1 = *(struct rrdengine_datafile **)a; - file2 = *(struct rrdengine_datafile **)b; - generate_datafilepath(file1, path1, sizeof(path1)); - generate_datafilepath(file2, path2, sizeof(path2)); - return strcmp(path1, path2); -} - -/* Returns number of datafiles that were loaded or < 0 on error */ -static int scan_data_files(struct rrdengine_instance *ctx) -{ - int ret, matched_files, failed_to_load, i; - unsigned tier, no; - uv_fs_t req; - uv_dirent_t dent; - struct rrdengine_datafile **datafiles, *datafile; - struct rrdengine_journalfile *journalfile; - - ret = uv_fs_scandir(NULL, &req, ctx->config.dbfiles_path, 0, NULL); - if (ret < 0) { - fatal_assert(req.result < 0); - uv_fs_req_cleanup(&req); - netdata_log_error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); - ctx_fs_error(ctx); - return ret; - } - netdata_log_info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); - - datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); - for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); - if (2 == ret) { - datafile = datafile_alloc_and_init(ctx, tier, no); - datafiles[matched_files++] = datafile; - } - } - uv_fs_req_cleanup(&req); - - if (0 == matched_files) { - freez(datafiles); - return 0; - } - - if (matched_files == MAX_DATAFILES) - netdata_log_error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); - - qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); - - ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; - - netdata_log_info("DBENGINE: loading %d data/journal of tier %d...", matched_files, ctx->config.tier); - for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { - uint8_t must_delete_pair = 0; - - datafile = datafiles[i]; - ret = load_data_file(datafile); - if (0 != ret) - must_delete_pair = 1; - - journalfile = journalfile_alloc_and_init(datafile); - ret = journalfile_load(ctx, journalfile, datafile); - if (0 != ret) { - if (!must_delete_pair) /* If datafile is still open close it */ - close_data_file(datafile); - must_delete_pair = 1; - } - - if (must_delete_pair) { - char path[RRDENG_PATH_MAX]; - - netdata_log_error("DBENGINE: deleting invalid data and journal file pair."); - ret = journalfile_unlink(journalfile); - if (!ret) { - journalfile_v1_generate_path(datafile, path, sizeof(path)); - netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); - } - ret = unlink_data_file(datafile); - if (!ret) { - generate_datafilepath(datafile, path, sizeof(path)); - netdata_log_info("DBENGINE: deleted data file \"%s\".", path); - } - freez(journalfile); - freez(datafile); - ++failed_to_load; - continue; - } - - ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile, false); - } - - matched_files -= failed_to_load; - freez(datafiles); - - return matched_files; -} - -/* Creates a datafile and a journalfile pair */ -int create_new_datafile_pair(struct rrdengine_instance *ctx, bool having_lock) -{ - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); - - struct rrdengine_datafile *datafile; - struct rrdengine_journalfile *journalfile; - unsigned fileno = ctx_last_fileno_get(ctx) + 1; - int ret; - char path[RRDENG_PATH_MAX]; - - nd_log(NDLS_DAEMON, NDLP_DEBUG, - "DBENGINE: creating new data and journal files in path %s", - ctx->config.dbfiles_path); - - datafile = datafile_alloc_and_init(ctx, 1, fileno); - ret = create_data_file(datafile); - if(ret) - goto error_after_datafile; - - generate_datafilepath(datafile, path, sizeof(path)); - nd_log(NDLS_DAEMON, NDLP_INFO, - "DBENGINE: created data file \"%s\".", path); - - journalfile = journalfile_alloc_and_init(datafile); - ret = journalfile_create(journalfile, datafile); - if (ret) - goto error_after_journalfile; - - journalfile_v1_generate_path(datafile, path, sizeof(path)); - nd_log(NDLS_DAEMON, NDLP_INFO, - "DBENGINE: created journal file \"%s\".", path); - - ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); - datafile_list_insert(ctx, datafile, having_lock); - ctx_last_fileno_increment(ctx); - - return 0; - -error_after_journalfile: - destroy_data_file_unsafe(datafile); - freez(journalfile); - -error_after_datafile: - freez(datafile); - return ret; -} - -/* Page cache must already be initialized. - * Return 0 on success. - */ -int init_data_files(struct rrdengine_instance *ctx) -{ - int ret; - - fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); - ret = scan_data_files(ctx); - if (ret < 0) { - netdata_log_error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); - return ret; - } else if (0 == ret) { - netdata_log_info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); - ctx->atomic.last_fileno = 0; - ret = create_new_datafile_pair(ctx, false); - if (ret) { - netdata_log_error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); - return ret; - } - } - else { - if (ctx->loading.create_new_datafile_pair) - create_new_datafile_pair(ctx, false); - - while(rrdeng_ctx_exceeded_disk_quota(ctx)) - datafile_delete(ctx, ctx->datafiles.first, false, false); - } - - pgc_reset_hot_max(open_cache); - ctx->loading.create_new_datafile_pair = false; - return 0; -} - -void finalize_data_files(struct rrdengine_instance *ctx) -{ - bool logged = false; - - logged = false; - while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { - if(!logged) { - netdata_log_info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); - logged = true; - } - sleep_usec(100 * USEC_PER_MS); - } - - do { - struct rrdengine_datafile *datafile = ctx->datafiles.first; - struct rrdengine_journalfile *journalfile = datafile->journalfile; - - logged = false; - size_t iterations = 100; - while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) { - if(!logged) { - netdata_log_info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); - logged = true; - } - sleep_usec(100 * USEC_PER_MS); - } - - logged = false; - bool available = false; - do { - uv_rwlock_wrlock(&ctx->datafiles.rwlock); - spinlock_lock(&datafile->writers.spinlock); - available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; - - if(!available) { - spinlock_unlock(&datafile->writers.spinlock); - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); - if(!logged) { - netdata_log_info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); - logged = true; - } - sleep_usec(100 * USEC_PER_MS); - } - } while(!available); - - journalfile_close(journalfile, datafile); - close_data_file(datafile); - datafile_list_delete_unsafe(ctx, datafile); - spinlock_unlock(&datafile->writers.spinlock); - uv_rwlock_wrunlock(&ctx->datafiles.rwlock); - - freez(journalfile); - freez(datafile); - - } while(ctx->datafiles.first); -} |