diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-05-08 16:27:04 +0000 |
commit | a836a244a3d2bdd4da1ee2641e3e957850668cea (patch) | |
tree | cb87c75b3677fab7144f868435243f864048a1e6 /database/engine/rrdengine.c | |
parent | Adding upstream version 1.38.1. (diff) | |
download | netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.tar.xz netdata-a836a244a3d2bdd4da1ee2641e3e957850668cea.zip |
Adding upstream version 1.39.0.upstream/1.39.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 202 |
1 files changed, 137 insertions, 65 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index d64868f03..7811a5eaa 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -16,6 +16,24 @@ unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif +struct rrdeng_cmd { + struct rrdengine_instance *ctx; + enum rrdeng_opcode opcode; + void *data; + struct completion *completion; + enum storage_priority priority; + dequeue_callback_t dequeue_cb; + + struct { + struct rrdeng_cmd *prev; + struct rrdeng_cmd *next; + } queue; +}; + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker); +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker); +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker); + struct rrdeng_main { uv_thread_t thread; uv_loop_t loop; @@ -45,7 +63,6 @@ struct rrdeng_main { struct { size_t dispatched; size_t executing; - size_t pending_cb; } atomics; } work_cmd; @@ -132,8 +149,22 @@ static void work_request_init(void) { ); } -static inline bool work_request_full(void) { - return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS); +enum LIBUV_WORKERS_STATUS { + LIBUV_WORKERS_RELAXED, + LIBUV_WORKERS_STRESSED, + LIBUV_WORKERS_CRITICAL, +}; + +static inline enum LIBUV_WORKERS_STATUS work_request_full(void) { + size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED); + + if(dispatched >= (size_t)(libuv_worker_threads)) + return LIBUV_WORKERS_CRITICAL; + + else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)) + return LIBUV_WORKERS_STRESSED; + + return LIBUV_WORKERS_RELAXED; } static inline void work_done(struct rrdeng_work *work_request) { @@ -147,12 +178,38 @@ static void work_standard_worker(uv_work_t *req) { worker_is_busy(UV_EVENT_WORKER_INIT); struct rrdeng_work *work_request = req->data; + work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req); worker_is_idle(); + if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) { + internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not boosted"); + + while(1) { + struct rrdeng_cmd cmd = rrdeng_deq_cmd(true); + if (cmd.opcode == RRDENG_OPCODE_NOOP) + break; + + worker_is_busy(UV_EVENT_WORKER_INIT); + switch (cmd.opcode) { + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, true); + break; + + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, true); + break; + + default: + fatal("DBENGINE: Opcode should not be executed synchronously"); + break; + } + worker_is_idle(); + } + } + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); // signal the event loop a worker is available fatal_assert(0 == uv_async_send(&rrdeng_main.async)); @@ -167,7 +224,6 @@ static void after_work_standard_callback(uv_work_t* req, int status) { work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status); work_done(work_request); - __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); worker_is_idle(); } @@ -369,20 +425,6 @@ void wal_release(WAL *wal) { // ---------------------------------------------------------------------------- // command queue cache -struct rrdeng_cmd { - struct rrdengine_instance *ctx; - enum rrdeng_opcode opcode; - void *data; - struct completion *completion; - enum storage_priority priority; - dequeue_callback_t dequeue_cb; - - struct { - struct rrdeng_cmd *prev; - struct rrdeng_cmd *next; - } queue; -}; - static void rrdeng_cmd_queue_init(void) { rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes", sizeof(struct rrdeng_cmd), @@ -465,14 +507,33 @@ static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PR return false; } -static inline struct rrdeng_cmd rrdeng_deq_cmd(void) { +#define opcode_empty (struct rrdeng_cmd) { \ + .ctx = NULL, \ + .opcode = RRDENG_OPCODE_NOOP, \ + .priority = STORAGE_PRIORITY_BEST_EFFORT, \ + .completion = NULL, \ + .data = NULL, \ +} + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { struct rrdeng_cmd *cmd = NULL; + enum LIBUV_WORKERS_STATUS status = work_request_full(); + + STORAGE_PRIORITY min_priority, max_priority; + min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; + max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1; - STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT; + if(from_worker) { + if(status == LIBUV_WORKERS_CRITICAL) + return opcode_empty; + + min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP; + max_priority = STORAGE_PRIORITY_BEST_EFFORT; + } // find an opcode to execute from the queue netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); - for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) { + for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) { cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; if(cmd) { @@ -508,13 +569,7 @@ static inline struct rrdeng_cmd rrdeng_deq_cmd(void) { aral_freez(rrdeng_main.cmd_queue.ar, cmd); } else - ret = (struct rrdeng_cmd) { - .ctx = NULL, - .opcode = RRDENG_OPCODE_NOOP, - .priority = STORAGE_PRIORITY_BEST_EFFORT, - .completion = NULL, - .data = NULL, - }; + ret = opcode_empty; return ret; } @@ -927,11 +982,6 @@ struct uuid_first_time_s { size_t df_index_oldest; }; -static int journal_metric_compare(const void *key, const void *metric) -{ - return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid); -} - struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { uv_rwlock_rdlock(&ctx->datafiles.rwlock); @@ -987,7 +1037,10 @@ void find_uuid_first_time( if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5) continue; - struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare); + struct journal_metric_list *live_entry = + bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count, + sizeof(*uuid_list), journal_metric_uuid_compare); + if (!live_entry) { // Not found in this journal not_matching_bsearches++; @@ -1087,13 +1140,20 @@ void find_uuid_first_time( } static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { - __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED); - if(worker) worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS); struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile; struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0); + + if (unlikely(!j2_header)) { + if (worker) + worker_is_idle(); + return; + } + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED); + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); size_t count = j2_header->metric_count; @@ -1348,14 +1408,9 @@ static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused return data; } -static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { - ; -} - static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { - worker_is_busy(UV_EVENT_DBENGINE_QUERY); PDC *pdc = data; - rrdeng_prep_query(pdc); + rrdeng_prep_query(pdc, true); return data; } @@ -1435,21 +1490,28 @@ static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __mayb worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX); count = 0; while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) { + if(journalfile_v2_data_available(datafile->journalfile)) { + // journal file v2 is already there for this datafile + datafile = datafile->next; + continue; + } netdata_spinlock_lock(&datafile->writers.spinlock); bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; netdata_spinlock_unlock(&datafile->writers.spinlock); - if(!available) + if(!available) { + info("DBENGINE: journal file %u needs to be indexed, but it has writers working on it - skipping it for now", datafile->fileno); + datafile = datafile->next; continue; - - if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) { - info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); - pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, - journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); - count++; } + info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, + journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); + + count++; + datafile = datafile->next; if (unlikely(!ctx_is_available_for_queries(ctx))) @@ -1472,10 +1534,6 @@ static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, rrdeng_main.evictions_running--; } -static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { - ; -} - static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED); rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); @@ -1604,6 +1662,26 @@ bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { return true; } +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + EPDL *epdl = cmd.data; + + if(from_worker) + epdl_find_extent_and_populate_pages(ctx, epdl, true); + else + work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL); +} + +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + PDC *pdc = cmd.data; + + if(from_worker) + rrdeng_prep_query(pdc, true); + else + work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL); +} + void dbengine_event_loop(void* arg) { sanity_check(); uv_thread_set_name_np(pthread_self(), "DBENGINE"); @@ -1661,25 +1739,19 @@ void dbengine_event_loop(void* arg) { /* wait for commands */ do { worker_is_busy(RRDENG_OPCODE_MAX); - cmd = rrdeng_deq_cmd(); + cmd = rrdeng_deq_cmd(RRDENG_OPCODE_NOOP); opcode = cmd.opcode; worker_is_busy(opcode); switch (opcode) { - case RRDENG_OPCODE_EXTENT_READ: { - struct rrdengine_instance *ctx = cmd.ctx; - EPDL *epdl = cmd.data; - work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read); + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, false); break; - } - case RRDENG_OPCODE_QUERY: { - struct rrdengine_instance *ctx = cmd.ctx; - PDC *pdc = cmd.data; - work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query); + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, false); break; - } case RRDENG_OPCODE_EXTENT_WRITE: { struct rrdengine_instance *ctx = cmd.ctx; |