From 58daab21cd043e1dc37024a7f99b396788372918 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:48 +0100 Subject: Merging upstream version 1.44.3. Signed-off-by: Daniel Baumann --- database/engine/rrdengine.c | 117 ++++++++++++++++++-------------------------- 1 file changed, 47 insertions(+), 70 deletions(-) (limited to 'database/engine/rrdengine.c') diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index 99257b79d..b82cc1ad1 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -40,6 +40,7 @@ struct rrdeng_main { uv_async_t async; uv_timer_t timer; pid_t tid; + bool shutdown; size_t flushes_running; size_t evictions_running; @@ -575,55 +576,6 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { } -// ---------------------------------------------------------------------------- - -struct { - ARAL *aral[RRD_STORAGE_TIERS]; -} dbengine_page_alloc_globals = {}; - -static inline ARAL *page_size_lookup(size_t size) { - for(size_t tier = 0; tier < storage_tiers ;tier++) - if(size == tier_page_size[tier]) - return dbengine_page_alloc_globals.aral[tier]; - - return NULL; -} - -static void dbengine_page_alloc_init(void) { - for(size_t i = storage_tiers; i > 0 ;i--) { - size_t tier = storage_tiers - i; - - char buf[20 + 1]; - snprintfz(buf, 20, "tier%zu-pages", tier); - - dbengine_page_alloc_globals.aral[tier] = aral_create( - buf, - tier_page_size[tier], - 64, - 512 * tier_page_size[tier], - pgc_aral_statistics(), - NULL, NULL, false, false); - } -} - -void *dbengine_page_alloc(size_t size) { - ARAL *ar = page_size_lookup(size); - if(ar) return aral_mallocz(ar); - - return mallocz(size); -} - -void dbengine_page_free(void *page, size_t size __maybe_unused) { - if(unlikely(!page || page == DBENGINE_EMPTY_PAGE)) - return; - - ARAL *ar = page_size_lookup(size); - if(ar) - aral_freez(ar, page); - else - freez(page); -} - // ---------------------------------------------------------------------------- void *dbengine_extent_alloc(size_t size) { @@ -890,12 +842,25 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); header->descr[i].page_length = descr->page_length; header->descr[i].start_time_ut = descr->start_time_ut; - header->descr[i].end_time_ut = descr->end_time_ut; + + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + header->descr[i].end_time_ut = descr->end_time_ut; + break; + case PAGE_GORILLA_METRICS: + header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); + header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); + break; + default: + fatal("Unknown page type: %uc", descr->type); + } + pos += sizeof(header->descr[i]); } for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; - (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); + pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } @@ -1381,9 +1346,6 @@ static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, vo static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN); - completion_wait_for(&ctx->quiesce.completion); - completion_destroy(&ctx->quiesce.completion); - bool logged = false; while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { @@ -1436,6 +1398,14 @@ uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) { bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx) { + if(!ctx->datafiles.first) + // no datafiles available + return false; + + if(!ctx->datafiles.first->next) + // only 1 datafile available + return false; + uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0); @@ -1514,12 +1484,19 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb spinlock_unlock(&datafile->writers.spinlock); if(!available) { - netdata_log_info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); + nd_log(NDLS_DAEMON, NDLP_NOTICE, + "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - " + "skipping it for now", + datafile->fileno); + datafile = datafile->next; continue; } - netdata_log_info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal file %u is ready to be indexed", + datafile->fileno); + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); @@ -1532,7 +1509,10 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb } errno = 0; - internal_error(count, "DBENGINE: journal indexing done; %u files processed", count); + if(count) + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal indexing done; %u files processed", + count); worker_is_idle(); @@ -1628,7 +1608,7 @@ static void dbengine_initialize_structures(void) { rrdeng_query_handle_init(); page_descriptors_init(); extent_buffer_init(); - dbengine_page_alloc_init(); + pgd_init_arals(); extent_io_descriptor_init(); } @@ -1715,6 +1695,7 @@ void dbengine_event_loop(void* arg) { worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init"); worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown"); worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce"); + worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown"); worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode"); @@ -1856,6 +1837,13 @@ void dbengine_event_loop(void* arg) { break; } + case RRDENG_OPCODE_SHUTDOWN_EVLOOP: { + uv_close((uv_handle_t *)&main->async, NULL); + (void) uv_timer_stop(&main->timer); + uv_close((uv_handle_t *)&main->timer, NULL); + shutdown = true; + } + case RRDENG_OPCODE_NOOP: { /* the command queue was empty, do nothing */ break; @@ -1872,18 +1860,7 @@ void dbengine_event_loop(void* arg) { } while (opcode != RRDENG_OPCODE_NOOP); } - /* cleanup operations of the event loop */ - netdata_log_info("DBENGINE: shutting down dbengine thread"); - - /* - * uv_async_send after uv_close does not seem to crash in linux at the moment, - * it is however undocumented behaviour and we need to be aware if this becomes - * an issue in the future. - */ - uv_close((uv_handle_t *)&main->async, NULL); - uv_timer_stop(&main->timer); - uv_close((uv_handle_t *)&main->timer, NULL); - uv_run(&main->loop, UV_RUN_DEFAULT); + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread"); uv_loop_close(&main->loop); worker_unregister(); } -- cgit v1.2.3