diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:53:24 +0000 |
commit | b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch) | |
tree | d4d31289c39fc00da064a825df13a0b98ce95b10 /src/database/engine/rrdengine.c | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-upstream.tar.xz netdata-upstream.zip |
Adding upstream version 1.46.3.upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c) | 348 |
1 files changed, 281 insertions, 67 deletions
diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index b82cc1ad1..2d6583ead 100644 --- a/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -3,6 +3,7 @@ #include "rrdengine.h" #include "pdc.h" +#include "dbengine-compression.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; @@ -10,7 +11,7 @@ rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; rrdeng_stats_t global_flushing_pressure_page_deletions = 0; -unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; +unsigned rrdeng_pages_per_extent = DEFAULT_PAGES_PER_EXTENT; #if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2) #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) @@ -39,6 +40,7 @@ struct rrdeng_main { uv_loop_t loop; uv_async_t async; uv_timer_t timer; + uv_timer_t retention_timer; pid_t tid; bool shutdown; @@ -110,16 +112,10 @@ static void sanity_check(void) /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); - BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */ + BUILD_BUG_ON(sizeof(nd_uuid_t) != UUID_SZ); /* check UUID size */ /* page count must fit in 8 bits */ BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); - - /* extent cache count must fit in 32 bits */ -// BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); - - /* page info scratch space must be able to hold 2 32-bit integers */ - BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } // ---------------------------------------------------------------------------- @@ -229,10 +225,10 @@ static void after_work_standard_callback(uv_work_t* req, int status) { worker_is_idle(); } -static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) { +static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) { struct rrdeng_work *work_request = NULL; - internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread"); + internal_fatal(rrdeng_main.tid != gettid_cached(), "work_dispatch() can only be run from the event loop thread"); work_request = aral_mallocz(rrdeng_main.work_cmd.ar); memset(work_request, 0, sizeof(struct rrdeng_work)); @@ -240,8 +236,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com work_request->ctx = ctx; work_request->data = data; work_request->completion = completion; - work_request->work_cb = work_cb; - work_request->after_work_cb = after_work_cb; + work_request->work_cb = do_work_cb; + work_request->after_work_cb = do_after_work_cb; work_request->opcode = opcode; if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) { @@ -772,13 +768,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ */ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { int ret; - int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; - uint32_t uncompressed_payload_length, payload_offset; + uint32_t uncompressed_payload_length, max_compressed_size, payload_offset; struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *xt_io_descr; - struct extent_buffer *eb = NULL; - void *compressed_buf = NULL; Word_t Index; uint8_t compression_algorithm = ctx->config.global_compress_alg; struct rrdengine_datafile *datafile; @@ -807,20 +800,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta xt_io_descr = extent_io_descriptor_get(); xt_io_descr->ctx = ctx; payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); - switch (compression_algorithm) { - case RRD_NO_COMPRESSION: - size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); - break; - - default: /* Compress */ - fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); - max_compressed_size = LZ4_compressBound(uncompressed_payload_length); - eb = extent_buffer_get(max_compressed_size); - compressed_buf = eb->data; - size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); - break; - } - + max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm); + size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer); ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("DBENGINE: posix_memalign:%s", strerror(ret)); @@ -832,23 +813,22 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos = 0; header = xt_io_descr->buf; - header->compression_algorithm = compression_algorithm; header->number_of_pages = count; pos += sizeof(*header); for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; header->descr[i].type = descr->type; - uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); + uuid_copy(*(nd_uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time_ut = descr->start_time_ut; switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: header->descr[i].end_time_ut = descr->end_time_ut; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); break; @@ -858,29 +838,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos += sizeof(header->descr[i]); } + + // build the extent payload for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } - if(likely(compression_algorithm == RRD_LZ4)) { - compressed_size = LZ4_compress_default( - xt_io_descr->buf + payload_offset, - compressed_buf, - (int)uncompressed_payload_length, - max_compressed_size); + // compress the payload + size_t compressed_size = + (int)dbengine_compress(xt_io_descr->buf + payload_offset, + uncompressed_payload_length, + compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed"); + internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent"); - (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - extent_buffer_release(eb); - size_bytes = payload_offset + compressed_size + sizeof(*trailer); + if(compressed_size) { + header->compression_algorithm = compression_algorithm; header->payload_length = compressed_size; } - else { // RRD_NO_COMPRESSION - header->payload_length = uncompressed_payload_length; + else { + // compression failed, or generated bigger pages + // so it didn't touch our uncompressed buffer + header->compression_algorithm = RRDENG_COMPRESSION_NONE; + header->payload_length = compressed_size = uncompressed_payload_length; + } + + // set the correct size + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + + if(compression_algorithm != RRDENG_COMPRESSION_NONE) { + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); } real_io_size = ALIGN_BYTES_CEILING(size_bytes); @@ -939,7 +930,7 @@ static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, } struct uuid_first_time_s { - uuid_t *uuid; + nd_uuid_t *uuid; time_t first_time_s; METRIC *metric; size_t pages_found; @@ -1171,7 +1162,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r for (size_t index = 0; index < added; ++index) { uuid_first_t_entry = &uuid_first_entry_list[index]; if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) { - mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + + time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + + bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + if (changed) { + uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) { + uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + } mrg_metric_release(main_mrg, uuid_first_t_entry->metric); } else { @@ -1180,6 +1181,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r // there is no retention for this metric bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric); if (!has_retention) { + time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric); + time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && first_time_s && last_time_s) { + uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric); if(deleted) deleted_metrics++; @@ -1280,7 +1289,7 @@ void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile * static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true); - if (rrdeng_ctx_exceeded_disk_quota(ctx)) + if (rrdeng_ctx_tier_cap_exceeded(ctx)) rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); rrdcontext_db_rotation(); @@ -1352,8 +1361,7 @@ static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unuse if(!logged) { logged = true; netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", - __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), - (ctx->config.legacy) ? -1 : ctx->config.tier); + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), ctx->config.tier); } sleep_usec(1 * USEC_PER_MS); } @@ -1390,26 +1398,27 @@ static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, } uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) { - uint64_t target_size = ctx->config.max_disk_space / TARGET_DATAFILES; + uint64_t target_size = ctx->config.max_disk_space ? ctx->config.max_disk_space / TARGET_DATAFILES : MAX_DATAFILE_SIZE; target_size = MIN(target_size, MAX_DATAFILE_SIZE); target_size = MAX(target_size, MIN_DATAFILE_SIZE); return target_size; } -bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx) +time_t get_datafile_end_time(struct rrdengine_instance *ctx) { - if(!ctx->datafiles.first) - // no datafiles available - return false; + time_t last_time_s = 0; - if(!ctx->datafiles.first->next) - // only 1 datafile available - return false; + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + struct rrdengine_datafile *datafile = ctx->datafiles.first; - uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - - (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0); + if (datafile) { + last_time_s = datafile->journalfile->v2.last_time_s; + if (!last_time_s) + last_time_s = datafile->journalfile->v2.first_time_s; + } - return estimated_disk_space > ctx->config.max_disk_space; + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + return last_time_s; } /* return 0 on success */ @@ -1580,6 +1589,80 @@ static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, vo return data; } +uint64_t get_used_disk_space(struct rrdengine_instance *ctx) +{ + uint64_t active_space = 0; + + if (ctx->datafiles.first && ctx->datafiles.first->prev) + active_space = ctx->datafiles.first->prev->pos; + + uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - active_space; + + uint64_t database_space = get_total_database_space(); + uint64_t adjusted_database_space = database_space * ctx->config.disk_percentage / 100 ; + estimated_disk_space += adjusted_database_space; + + return estimated_disk_space; +} + +static time_t get_tier_retention(struct rrdengine_instance *ctx) +{ + time_t retention = 0; + if (localhost) { + STORAGE_ENGINE *eng = localhost->db[ctx->config.tier].eng; + if (eng) { + time_t first_time_s = get_datafile_end_time(ctx); + if (first_time_s) + retention = now_realtime_sec() - first_time_s; + } + } + return retention; +} + +// Check if disk or retention time cap reached +bool rrdeng_ctx_tier_cap_exceeded(struct rrdengine_instance *ctx) +{ + if(!ctx->datafiles.first) + // no datafiles available + return false; + + if(!ctx->datafiles.first->next) + // only 1 datafile available + return false; + + uint64_t estimated_disk_space = get_used_disk_space(ctx); + time_t retention = get_tier_retention(ctx); + + if (ctx->config.max_retention_s && retention > ctx->config.max_retention_s) + return true; + + if (ctx->config.max_disk_space && estimated_disk_space > ctx->config.max_disk_space) + return true; + + return false; +} + +void retention_timer_cb(uv_timer_t *handle) +{ + if (!localhost) + return; + + worker_is_busy(RRDENG_TIMER_CB); + uv_stop(handle->loop); + uv_update_time(handle->loop); + + for (size_t tier = 0; tier < storage_tiers; tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) + continue; + bool cleanup = rrdeng_ctx_tier_cap_exceeded(multidb_ctx[tier]); + if (cleanup) + rrdeng_enq_cmd(multidb_ctx[tier], RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + } + + worker_is_idle(); +} + void timer_cb(uv_timer_t* handle) { worker_is_busy(RRDENG_TIMER_CB); uv_stop(handle->loop); @@ -1643,7 +1726,17 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); return false; } + + ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.retention_timer); + if (ret) { + netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)&rrdeng_main.async, NULL); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + rrdeng_main.timer.data = &rrdeng_main; + rrdeng_main.retention_timer.data = &rrdeng_main; dbengine_initialize_structures(); @@ -1675,9 +1768,125 @@ static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_w work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL); } +uint64_t get_directory_free_bytes_space(struct rrdengine_instance *ctx) +{ + uint64_t free_bytes = 0; + struct statvfs buff_statvfs; + if (statvfs(ctx->config.dbfiles_path, &buff_statvfs) == 0) + free_bytes = buff_statvfs.f_bavail * buff_statvfs.f_bsize; + + return (free_bytes - (free_bytes * 5 / 100)); +} + +void calculate_tier_disk_space_percentage(void) +{ + uint64_t tier_space[RRD_STORAGE_TIERS]; + + if (!localhost) + return; + + uint64_t total_diskspace = 0; + for(size_t tier = 0; tier < storage_tiers ;tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) { + tier_space[tier] = 0; + continue; + } + uint64_t tier_disk_space = multidb_ctx[tier]->config.max_disk_space ? + multidb_ctx[tier]->config.max_disk_space : + get_directory_free_bytes_space(multidb_ctx[tier]); + total_diskspace += tier_disk_space; + tier_space[tier] = tier_disk_space; + } + + if (total_diskspace) { + for (size_t tier = 0; tier < storage_tiers; tier++) { + multidb_ctx[tier]->config.disk_percentage = (100 * tier_space[tier] / total_diskspace); + } + } +} + +void dbengine_retention_statistics(void) +{ + static bool init = false; + static DBENGINE_TIER_STATS stats[RRD_STORAGE_TIERS]; + + if (!localhost) + return; + + calculate_tier_disk_space_percentage(); + + for (size_t tier = 0; tier < storage_tiers; tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) + continue; + + if (init == false) { + char id[200]; + snprintfz(id, sizeof(id) - 1, "dbengine_retention_tier%zu", tier); + stats[tier].st = rrdset_create_localhost( + "netdata", + id, + NULL, + "dbengine retention", + "netdata.dbengine_tier_retention", + "dbengine space and time retention", + "%", + "netdata", + "stats", + 134900, // before "dbengine memory" (dbengine2_statistics_charts) + 10, + RRDSET_TYPE_LINE); + + stats[tier].rd_space = rrddim_add(stats[tier].st, "space", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + stats[tier].rd_time = rrddim_add(stats[tier].st, "time", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + + char tier_str[5]; + snprintfz(tier_str, 4, "%zu", tier); + rrdlabels_add(stats[tier].st->rrdlabels, "tier", tier_str, RRDLABEL_SRC_AUTO); + + rrdset_flag_set(stats[tier].st, RRDSET_FLAG_METADATA_UPDATE); + rrdhost_flag_set(stats[tier].st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); + rrdset_metadata_updated(stats[tier].st); + } + + time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si); + time_t retention = first_time_s ? now_realtime_sec() - first_time_s : 0; + + // + // Note: storage_engine_disk_space_used is the exact diskspace (as reported by api/v2/node_instances + // get_used_disk_space is used to determine if database cleanup (file rotation should happen) + // and adds to the disk space used the desired file size of the active + // datafile + uint64_t disk_space = get_used_disk_space(multidb_ctx[tier]); + //uint64_t disk_space = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si); + + uint64_t config_disk_space = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si); + if (!config_disk_space) { + config_disk_space = get_directory_free_bytes_space(multidb_ctx[tier]); + config_disk_space += disk_space; + } + + collected_number disk_percentage = (collected_number) (config_disk_space ? 100 * disk_space / config_disk_space : 0); + + collected_number retention_percentage = (collected_number)multidb_ctx[tier]->config.max_retention_s ? + 100 * retention / multidb_ctx[tier]->config.max_retention_s : + 0; + + if (retention_percentage > 100) + retention_percentage = 100; + + rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_space, (collected_number) disk_percentage); + rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_time, (collected_number) retention_percentage); + + rrdset_done(stats[tier].st); + } + init = true; +} + void dbengine_event_loop(void* arg) { sanity_check(); - uv_thread_set_name_np(pthread_self(), "DBENGINE"); + uv_thread_set_name_np("DBENGINE"); service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); worker_register("DBENGINE"); @@ -1721,9 +1930,10 @@ void dbengine_event_loop(void* arg) { struct rrdeng_main *main = arg; enum rrdeng_opcode opcode; struct rrdeng_cmd cmd; - main->tid = gettid(); + main->tid = gettid_cached(); fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + fatal_assert(0 == uv_timer_start(&main->retention_timer, retention_timer_cb, TIMER_PERIOD_MS * 60, TIMER_PERIOD_MS * 60)); bool shutdown = false; while (likely(!shutdown)) { @@ -1804,7 +2014,7 @@ void dbengine_event_loop(void* arg) { if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && ctx->datafiles.first->next != NULL && ctx->datafiles.first->next->next != NULL && - rrdeng_ctx_exceeded_disk_quota(ctx)) { + rrdeng_ctx_tier_cap_exceeded(ctx)) { __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED); work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate); @@ -1841,7 +2051,11 @@ void dbengine_event_loop(void* arg) { uv_close((uv_handle_t *)&main->async, NULL); (void) uv_timer_stop(&main->timer); uv_close((uv_handle_t *)&main->timer, NULL); + + (void) uv_timer_stop(&main->retention_timer); + uv_close((uv_handle_t *)&main->retention_timer, NULL); shutdown = true; + break; } case RRDENG_OPCODE_NOOP: { |