diff options
Diffstat (limited to 'database/engine/datafile.c')
-rw-r--r-- | database/engine/datafile.c | 415 |
1 files changed, 281 insertions, 134 deletions
diff --git a/database/engine/datafile.c b/database/engine/datafile.c index 9c70068d9..286ae1e30 100644 --- a/database/engine/datafile.c +++ b/database/engine/datafile.c @@ -1,58 +1,174 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "rrdengine.h" -void df_extent_insert(struct extent_info *extent) -{ - struct rrdengine_datafile *datafile = extent->datafile; - - if (likely(NULL != datafile->extents.last)) { - datafile->extents.last->next = extent; - } - if (unlikely(NULL == datafile->extents.first)) { - datafile->extents.first = extent; - } - datafile->extents.last = extent; -} - void datafile_list_insert(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { - if (likely(NULL != ctx->datafiles.last)) { - ctx->datafiles.last->next = datafile; - } - if (unlikely(NULL == ctx->datafiles.first)) { - ctx->datafiles.first = datafile; - } - ctx->datafiles.last = datafile; + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); } -void datafile_list_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) +void datafile_list_delete_unsafe(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { - struct rrdengine_datafile *next; - - next = datafile->next; - fatal_assert((NULL != next) && (ctx->datafiles.first == datafile) && (ctx->datafiles.last != datafile)); - ctx->datafiles.first = next; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(ctx->datafiles.first, datafile, prev, next); } -static void datafile_init(struct rrdengine_datafile *datafile, struct rrdengine_instance *ctx, - unsigned tier, unsigned fileno) +static struct rrdengine_datafile *datafile_alloc_and_init(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) { fatal_assert(tier == 1); + + struct rrdengine_datafile *datafile = callocz(1, sizeof(struct rrdengine_datafile)); + datafile->tier = tier; datafile->fileno = fileno; - datafile->file = (uv_file)0; - datafile->pos = 0; - datafile->extents.first = datafile->extents.last = NULL; /* will be populated by journalfile */ - datafile->journalfile = NULL; - datafile->next = NULL; + fatal_assert(0 == uv_rwlock_init(&datafile->extent_rwlock)); datafile->ctx = ctx; + + datafile->users.available = true; + + netdata_spinlock_init(&datafile->users.spinlock); + netdata_spinlock_init(&datafile->writers.spinlock); + netdata_spinlock_init(&datafile->extent_queries.spinlock); + + return datafile; +} + +void datafile_acquire_dup(struct rrdengine_datafile *df) { + netdata_spinlock_lock(&df->users.spinlock); + + if(!df->users.lockers) + fatal("DBENGINE: datafile is not acquired to duplicate"); + + df->users.lockers++; + + netdata_spinlock_unlock(&df->users.spinlock); +} + +bool datafile_acquire(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { + bool ret; + + netdata_spinlock_lock(&df->users.spinlock); + + if(df->users.available) { + ret = true; + df->users.lockers++; + df->users.lockers_by_reason[reason]++; + } + else + ret = false; + + netdata_spinlock_unlock(&df->users.spinlock); + + return ret; +} + +void datafile_release(struct rrdengine_datafile *df, DATAFILE_ACQUIRE_REASONS reason) { + netdata_spinlock_lock(&df->users.spinlock); + if(!df->users.lockers) + fatal("DBENGINE DATAFILE: cannot release a datafile that is not acquired"); + + df->users.lockers--; + df->users.lockers_by_reason[reason]--; + netdata_spinlock_unlock(&df->users.spinlock); +} + +bool datafile_acquire_for_deletion(struct rrdengine_datafile *df) { + bool can_be_deleted = false; + + netdata_spinlock_lock(&df->users.spinlock); + df->users.available = false; + + if(!df->users.lockers) + can_be_deleted = true; + + else { + // there are lockers + + // evict any pages referencing this in the open cache + netdata_spinlock_unlock(&df->users.spinlock); + pgc_open_evict_clean_pages_of_datafile(open_cache, df); + netdata_spinlock_lock(&df->users.spinlock); + + if(!df->users.lockers) + can_be_deleted = true; + + else { + // there are lockers still + + // count the number of pages referencing this in the open cache + netdata_spinlock_unlock(&df->users.spinlock); + usec_t time_to_scan_ut = now_monotonic_usec(); + size_t clean_pages_in_open_cache = pgc_count_clean_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); + size_t hot_pages_in_open_cache = pgc_count_hot_pages_having_data_ptr(open_cache, (Word_t)df->ctx, df); + time_to_scan_ut = now_monotonic_usec() - time_to_scan_ut; + netdata_spinlock_lock(&df->users.spinlock); + + if(!df->users.lockers) + can_be_deleted = true; + + else if(!clean_pages_in_open_cache && !hot_pages_in_open_cache) { + // no pages in the open cache related to this datafile + + time_t now_s = now_monotonic_sec(); + + if(!df->users.time_to_evict) { + // first time we did the above + df->users.time_to_evict = now_s + 120; + internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " + "but it has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "- will be deleted shortly " + "(scanned open cache in %llu usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + + else if(now_s > df->users.time_to_evict) { + // time expired, lets remove it + can_be_deleted = true; + internal_error(true, "DBENGINE: datafile %u of tier %d is not used by any open cache pages, " + "but it has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "- will be deleted now " + "(scanned open cache in %llu usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + } + else + internal_error(true, "DBENGINE: datafile %u of tier %d " + "has %u lockers (oc:%u, pd:%u), " + "%zu clean and %zu hot open cache pages " + "(scanned open cache in %llu usecs)", + df->fileno, df->ctx->config.tier, + df->users.lockers, + df->users.lockers_by_reason[DATAFILE_ACQUIRE_OPEN_CACHE], + df->users.lockers_by_reason[DATAFILE_ACQUIRE_PAGE_DETAILS], + clean_pages_in_open_cache, + hot_pages_in_open_cache, + time_to_scan_ut); + } + } + netdata_spinlock_unlock(&df->users.spinlock); + + return can_be_deleted; } void generate_datafilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen) { (void) snprintfz(str, maxlen, "%s/" DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION, - datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno); + datafile->ctx->config.dbfiles_path, datafile->tier, datafile->fileno); } int close_data_file(struct rrdengine_datafile *datafile) @@ -66,9 +182,8 @@ int close_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); @@ -86,18 +201,17 @@ int unlink_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); - ++ctx->stats.datafile_deletions; + __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); return ret; } -int destroy_data_file(struct rrdengine_datafile *datafile) +int destroy_data_file_unsafe(struct rrdengine_datafile *datafile) { struct rrdengine_instance *ctx = datafile->ctx; uv_fs_t req; @@ -108,29 +222,26 @@ int destroy_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_ftruncate(NULL, &req, datafile->file, 0, NULL); if (ret < 0) { - error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_ftruncate(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_close(NULL, &req, datafile->file, NULL); if (ret < 0) { - error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); ret = uv_fs_unlink(NULL, &req, path, NULL); if (ret < 0) { - error("uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_fsunlink(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); - ++ctx->stats.datafile_deletions; + __atomic_add_fetch(&ctx->stats.datafile_deletions, 1, __ATOMIC_RELAXED); return ret; } @@ -146,18 +257,17 @@ int create_data_file(struct rrdengine_datafile *datafile) char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file); + fd = open_file_for_io(path, O_CREAT | O_RDWR | O_TRUNC, &file, use_direct_io); if (fd < 0) { - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + ctx_fs_error(ctx); return fd; } datafile->file = file; - ++ctx->stats.datafile_creations; + __atomic_add_fetch(&ctx->stats.datafile_creations, 1, __ATOMIC_RELAXED); ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); } memset(superblock, 0, sizeof(*superblock)); (void) strncpy(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ); @@ -169,20 +279,18 @@ int create_data_file(struct rrdengine_datafile *datafile) ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); - error("uv_fs_write: %s", uv_strerror(ret)); - ++ctx->stats.io_errors; - rrd_stat_atomic_add(&global_io_errors, 1); + error("DBENGINE: uv_fs_write: %s", uv_strerror(ret)); + ctx_io_error(ctx); } uv_fs_req_cleanup(&req); posix_memfree(superblock); if (ret < 0) { - destroy_data_file(datafile); + destroy_data_file_unsafe(datafile); return ret; } datafile->pos = sizeof(*superblock); - ctx->stats.io_write_bytes += sizeof(*superblock); - ++ctx->stats.io_write_requests; + ctx_io_write_op_bytes(ctx, sizeof(*superblock)); return 0; } @@ -196,13 +304,13 @@ static int check_data_file_superblock(uv_file file) ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock)); if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); } iov = uv_buf_init((void *)superblock, sizeof(*superblock)); ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL); if (ret < 0) { - error("uv_fs_read: %s", uv_strerror(ret)); + error("DBENGINE: uv_fs_read: %s", uv_strerror(ret)); uv_fs_req_cleanup(&req); goto error; } @@ -212,7 +320,7 @@ static int check_data_file_superblock(uv_file file) if (strncmp(superblock->magic_number, RRDENG_DF_MAGIC, RRDENG_MAGIC_SZ) || strncmp(superblock->version, RRDENG_DF_VER, RRDENG_VER_SZ) || superblock->tier != 1) { - error("File has invalid superblock."); + error("DBENGINE: file has invalid superblock."); ret = UV_EINVAL; } else { ret = 0; @@ -232,13 +340,12 @@ static int load_data_file(struct rrdengine_datafile *datafile) char path[RRDENG_PATH_MAX]; generate_datafilepath(datafile, path, sizeof(path)); - fd = open_file_direct_io(path, O_RDWR, &file); + fd = open_file_for_io(path, O_RDWR, &file, use_direct_io); if (fd < 0) { - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + ctx_fs_error(ctx); return fd; } - info("Initializing data file \"%s\".", path); + info("DBENGINE: initializing data file \"%s\".", path); ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_df_sb)); if (ret) @@ -248,22 +355,21 @@ static int load_data_file(struct rrdengine_datafile *datafile) ret = check_data_file_superblock(file); if (ret) goto error; - ctx->stats.io_read_bytes += sizeof(struct rrdeng_df_sb); - ++ctx->stats.io_read_requests; + + ctx_io_read_op_bytes(ctx, sizeof(struct rrdeng_df_sb)); datafile->file = file; datafile->pos = file_size; - info("Data file \"%s\" initialized (size:%"PRIu64").", path, file_size); + info("DBENGINE: data file \"%s\" initialized (size:%"PRIu64").", path, file_size); return 0; error: error = ret; ret = uv_fs_close(NULL, &req, file, NULL); if (ret < 0) { - error("uv_fs_close(%s): %s", path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_close(%s): %s", path, uv_strerror(ret)); + ctx_fs_error(ctx); } uv_fs_req_cleanup(&req); return error; @@ -286,30 +392,26 @@ static int scan_data_files(struct rrdengine_instance *ctx) { int ret; unsigned tier, no, matched_files, i,failed_to_load; - static uv_fs_t req; + uv_fs_t req; uv_dirent_t dent; struct rrdengine_datafile **datafiles, *datafile; struct rrdengine_journalfile *journalfile; - ret = uv_fs_scandir(NULL, &req, ctx->dbfiles_path, 0, NULL); + ret = uv_fs_scandir(NULL, &req, ctx->config.dbfiles_path, 0, NULL); if (ret < 0) { fatal_assert(req.result < 0); uv_fs_req_cleanup(&req); - error("uv_fs_scandir(%s): %s", ctx->dbfiles_path, uv_strerror(ret)); - ++ctx->stats.fs_errors; - rrd_stat_atomic_add(&global_fs_errors, 1); + error("DBENGINE: uv_fs_scandir(%s): %s", ctx->config.dbfiles_path, uv_strerror(ret)); + ctx_fs_error(ctx); return ret; } - info("Found %d files in path %s", ret, ctx->dbfiles_path); + info("DBENGINE: found %d files in path %s", ret, ctx->config.dbfiles_path); datafiles = callocz(MIN(ret, MAX_DATAFILES), sizeof(*datafiles)); for (matched_files = 0 ; UV_EOF != uv_fs_scandir_next(&req, &dent) && matched_files < MAX_DATAFILES ; ) { - info("Scanning file \"%s/%s\"", ctx->dbfiles_path, dent.name); ret = sscanf(dent.name, DATAFILE_PREFIX RRDENG_FILE_NUMBER_SCAN_TMPL DATAFILE_EXTENSION, &tier, &no); if (2 == ret) { - info("Matched file \"%s/%s\"", ctx->dbfiles_path, dent.name); - datafile = mallocz(sizeof(*datafile)); - datafile_init(datafile, ctx, tier, no); + datafile = datafile_alloc_and_init(ctx, tier, no); datafiles[matched_files++] = datafile; } } @@ -320,11 +422,11 @@ static int scan_data_files(struct rrdengine_instance *ctx) return 0; } if (matched_files == MAX_DATAFILES) { - error("Warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); + error("DBENGINE: warning: hit maximum database engine file limit of %d files", MAX_DATAFILES); } qsort(datafiles, matched_files, sizeof(*datafiles), scan_data_files_cmp); /* TODO: change this when tiering is implemented */ - ctx->last_fileno = datafiles[matched_files - 1]->fileno; + ctx->atomic.last_fileno = datafiles[matched_files - 1]->fileno; for (failed_to_load = 0, i = 0 ; i < matched_files ; ++i) { uint8_t must_delete_pair = 0; @@ -334,10 +436,8 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (0 != ret) { must_delete_pair = 1; } - journalfile = mallocz(sizeof(*journalfile)); - datafile->journalfile = journalfile; - journalfile_init(journalfile, datafile); - ret = load_journal_file(ctx, journalfile, datafile); + journalfile = journalfile_alloc_and_init(datafile); + ret = journalfile_load(ctx, journalfile, datafile); if (0 != ret) { if (!must_delete_pair) /* If datafile is still open close it */ close_data_file(datafile); @@ -346,16 +446,16 @@ static int scan_data_files(struct rrdengine_instance *ctx) if (must_delete_pair) { char path[RRDENG_PATH_MAX]; - error("Deleting invalid data and journal file pair."); - ret = unlink_journal_file(journalfile); + error("DBENGINE: deleting invalid data and journal file pair."); + ret = journalfile_unlink(journalfile); if (!ret) { - generate_journalfilepath(datafile, path, sizeof(path)); - info("Deleted journal file \"%s\".", path); + journalfile_v1_generate_path(datafile, path, sizeof(path)); + info("DBENGINE: deleted journal file \"%s\".", path); } ret = unlink_data_file(datafile); if (!ret) { generate_datafilepath(datafile, path, sizeof(path)); - info("Deleted data file \"%s\".", path); + info("DBENGINE: deleted data file \"%s\".", path); } freez(journalfile); freez(datafile); @@ -363,8 +463,8 @@ static int scan_data_files(struct rrdengine_instance *ctx) continue; } + ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); datafile_list_insert(ctx, datafile); - ctx->disk_space += datafile->pos + journalfile->pos; } matched_files -= failed_to_load; freez(datafiles); @@ -373,42 +473,43 @@ static int scan_data_files(struct rrdengine_instance *ctx) } /* Creates a datafile and a journalfile pair */ -int create_new_datafile_pair(struct rrdengine_instance *ctx, unsigned tier, unsigned fileno) +int create_new_datafile_pair(struct rrdengine_instance *ctx) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_creation_started, 1, __ATOMIC_RELAXED); + struct rrdengine_datafile *datafile; struct rrdengine_journalfile *journalfile; + unsigned fileno = ctx_last_fileno_get(ctx) + 1; int ret; char path[RRDENG_PATH_MAX]; - info("Creating new data and journal files in path %s", ctx->dbfiles_path); - datafile = mallocz(sizeof(*datafile)); - datafile_init(datafile, ctx, tier, fileno); + info("DBENGINE: creating new data and journal files in path %s", ctx->config.dbfiles_path); + datafile = datafile_alloc_and_init(ctx, 1, fileno); ret = create_data_file(datafile); - if (!ret) { - generate_datafilepath(datafile, path, sizeof(path)); - info("Created data file \"%s\".", path); - } else { + if(ret) goto error_after_datafile; - } - journalfile = mallocz(sizeof(*journalfile)); - datafile->journalfile = journalfile; - journalfile_init(journalfile, datafile); - ret = create_journal_file(journalfile, datafile); - if (!ret) { - generate_journalfilepath(datafile, path, sizeof(path)); - info("Created journal file \"%s\".", path); - } else { + generate_datafilepath(datafile, path, sizeof(path)); + info("DBENGINE: created data file \"%s\".", path); + + journalfile = journalfile_alloc_and_init(datafile); + ret = journalfile_create(journalfile, datafile); + if (ret) goto error_after_journalfile; - } + + journalfile_v1_generate_path(datafile, path, sizeof(path)); + info("DBENGINE: created journal file \"%s\".", path); + + ctx_current_disk_space_increase(ctx, datafile->pos + journalfile->unsafe.pos); datafile_list_insert(ctx, datafile); - ctx->disk_space += datafile->pos + journalfile->pos; + ctx_last_fileno_increment(ctx); return 0; error_after_journalfile: - destroy_data_file(datafile); + destroy_data_file_unsafe(datafile); freez(journalfile); + error_after_datafile: freez(datafile); return ret; @@ -421,40 +522,86 @@ int init_data_files(struct rrdengine_instance *ctx) { int ret; + fatal_assert(0 == uv_rwlock_init(&ctx->datafiles.rwlock)); ret = scan_data_files(ctx); if (ret < 0) { - error("Failed to scan path \"%s\".", ctx->dbfiles_path); + error("DBENGINE: failed to scan path \"%s\".", ctx->config.dbfiles_path); return ret; } else if (0 == ret) { - info("Data files not found, creating in path \"%s\".", ctx->dbfiles_path); - ret = create_new_datafile_pair(ctx, 1, 1); + info("DBENGINE: data files not found, creating in path \"%s\".", ctx->config.dbfiles_path); + ctx->atomic.last_fileno = 0; + ret = create_new_datafile_pair(ctx); if (ret) { - error("Failed to create data and journal files in path \"%s\".", ctx->dbfiles_path); + error("DBENGINE: failed to create data and journal files in path \"%s\".", ctx->config.dbfiles_path); return ret; } - ctx->last_fileno = 1; + } + else { + if (ctx->loading.create_new_datafile_pair) + create_new_datafile_pair(ctx); + + while(rrdeng_ctx_exceeded_disk_quota(ctx)) + datafile_delete(ctx, ctx->datafiles.first, false, false); } + pgc_reset_hot_max(open_cache); + ctx->loading.create_new_datafile_pair = false; return 0; } void finalize_data_files(struct rrdengine_instance *ctx) { - struct rrdengine_datafile *datafile, *next_datafile; - struct rrdengine_journalfile *journalfile; - struct extent_info *extent, *next_extent; + bool logged = false; + + logged = false; + while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED)) { + if(!logged) { + info("Waiting for inflight flush to finish on tier %d...", ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); + } - for (datafile = ctx->datafiles.first ; datafile != NULL ; datafile = next_datafile) { - journalfile = datafile->journalfile; - next_datafile = datafile->next; + do { + struct rrdengine_datafile *datafile = ctx->datafiles.first; + struct rrdengine_journalfile *journalfile = datafile->journalfile; - for (extent = datafile->extents.first ; extent != NULL ; extent = next_extent) { - next_extent = extent->next; - freez(extent); + logged = false; + size_t iterations = 100; + while(!datafile_acquire_for_deletion(datafile) && datafile != ctx->datafiles.first->prev && --iterations > 0) { + if(!logged) { + info("Waiting to acquire data file %u of tier %d to close it...", datafile->fileno, ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); } - close_journal_file(journalfile, datafile); + + logged = false; + bool available = false; + do { + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + netdata_spinlock_lock(&datafile->writers.spinlock); + available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; + + if(!available) { + netdata_spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + if(!logged) { + info("Waiting for writers to data file %u of tier %d to finish...", datafile->fileno, ctx->config.tier); + logged = true; + } + sleep_usec(100 * USEC_PER_MS); + } + } while(!available); + + journalfile_close(journalfile, datafile); close_data_file(datafile); + datafile_list_delete_unsafe(ctx, datafile); + netdata_spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + freez(journalfile); freez(datafile); - } + + } while(ctx->datafiles.first); } |