diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /src/database/engine/rrdengine.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/database/engine/rrdengine.c')
-rw-r--r-- | src/database/engine/rrdengine.c | 2080 |
1 files changed, 2080 insertions, 0 deletions
diff --git a/src/database/engine/rrdengine.c b/src/database/engine/rrdengine.c new file mode 100644 index 000000000..2d6583ead --- /dev/null +++ b/src/database/engine/rrdengine.c @@ -0,0 +1,2080 @@ +// SPDX-License-Identifier: GPL-3.0-or-later +#define NETDATA_RRD_INTERNALS + +#include "rrdengine.h" +#include "pdc.h" +#include "dbengine-compression.h" + +rrdeng_stats_t global_io_errors = 0; +rrdeng_stats_t global_fs_errors = 0; +rrdeng_stats_t rrdeng_reserved_file_descriptors = 0; +rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0; +rrdeng_stats_t global_flushing_pressure_page_deletions = 0; + +unsigned rrdeng_pages_per_extent = DEFAULT_PAGES_PER_EXTENT; + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2) +#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) +#endif + +struct rrdeng_cmd { + struct rrdengine_instance *ctx; + enum rrdeng_opcode opcode; + void *data; + struct completion *completion; + enum storage_priority priority; + dequeue_callback_t dequeue_cb; + + struct { + struct rrdeng_cmd *prev; + struct rrdeng_cmd *next; + } queue; +}; + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker); +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker); +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker); + +struct rrdeng_main { + uv_thread_t thread; + uv_loop_t loop; + uv_async_t async; + uv_timer_t timer; + uv_timer_t retention_timer; + pid_t tid; + bool shutdown; + + size_t flushes_running; + size_t evictions_running; + size_t cleanup_running; + + struct { + ARAL *ar; + + struct { + SPINLOCK spinlock; + + size_t waiting; + struct rrdeng_cmd *waiting_items_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE]; + size_t executed_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE]; + } unsafe; + } cmd_queue; + + struct { + ARAL *ar; + + struct { + size_t dispatched; + size_t executing; + } atomics; + } work_cmd; + + struct { + ARAL *ar; + } handles; + + struct { + ARAL *ar; + } descriptors; + + struct { + ARAL *ar; + } xt_io_descr; + +} rrdeng_main = { + .thread = 0, + .loop = {}, + .async = {}, + .timer = {}, + .flushes_running = 0, + .evictions_running = 0, + .cleanup_running = 0, + + .cmd_queue = { + .unsafe = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + }, + } +}; + +static void sanity_check(void) +{ + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)); + + /* Magic numbers must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ); + + /* Version strings must fit in the super-blocks */ + BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ); + BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ); + + /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */ + BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0); + + BUILD_BUG_ON(sizeof(nd_uuid_t) != UUID_SZ); /* check UUID size */ + + /* page count must fit in 8 bits */ + BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); +} + +// ---------------------------------------------------------------------------- +// work request cache + +typedef void *(*work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req); +typedef void (*after_work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req, int status); + +struct rrdeng_work { + uv_work_t req; + + struct rrdengine_instance *ctx; + void *data; + struct completion *completion; + + work_cb work_cb; + after_work_cb after_work_cb; + enum rrdeng_opcode opcode; +}; + +static void work_request_init(void) { + rrdeng_main.work_cmd.ar = aral_create( + "dbengine-work-cmd", + sizeof(struct rrdeng_work), + 0, + 65536, NULL, + NULL, NULL, false, false + ); +} + +enum LIBUV_WORKERS_STATUS { + LIBUV_WORKERS_RELAXED, + LIBUV_WORKERS_STRESSED, + LIBUV_WORKERS_CRITICAL, +}; + +static inline enum LIBUV_WORKERS_STATUS work_request_full(void) { + size_t dispatched = __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED); + + if(dispatched >= (size_t)(libuv_worker_threads)) + return LIBUV_WORKERS_CRITICAL; + + else if(dispatched >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS)) + return LIBUV_WORKERS_STRESSED; + + return LIBUV_WORKERS_RELAXED; +} + +static inline void work_done(struct rrdeng_work *work_request) { + aral_freez(rrdeng_main.work_cmd.ar, work_request); +} + +static void work_standard_worker(uv_work_t *req) { + __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); + + register_libuv_worker_jobs(); + worker_is_busy(UV_EVENT_WORKER_INIT); + + struct rrdeng_work *work_request = req->data; + + work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req); + worker_is_idle(); + + if(work_request->opcode == RRDENG_OPCODE_EXTENT_READ || work_request->opcode == RRDENG_OPCODE_QUERY) { + internal_fatal(work_request->after_work_cb != NULL, "DBENGINE: opcodes with a callback should not boosted"); + + while(1) { + struct rrdeng_cmd cmd = rrdeng_deq_cmd(true); + if (cmd.opcode == RRDENG_OPCODE_NOOP) + break; + + worker_is_busy(UV_EVENT_WORKER_INIT); + switch (cmd.opcode) { + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, true); + break; + + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, true); + break; + + default: + fatal("DBENGINE: Opcode should not be executed synchronously"); + break; + } + worker_is_idle(); + } + } + + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); + + // signal the event loop a worker is available + fatal_assert(0 == uv_async_send(&rrdeng_main.async)); +} + +static void after_work_standard_callback(uv_work_t* req, int status) { + struct rrdeng_work *work_request = req->data; + + worker_is_busy(RRDENG_OPCODE_MAX + work_request->opcode); + + if(work_request->after_work_cb) + work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status); + + work_done(work_request); + + worker_is_idle(); +} + +static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) { + struct rrdeng_work *work_request = NULL; + + internal_fatal(rrdeng_main.tid != gettid_cached(), "work_dispatch() can only be run from the event loop thread"); + + work_request = aral_mallocz(rrdeng_main.work_cmd.ar); + memset(work_request, 0, sizeof(struct rrdeng_work)); + work_request->req.data = work_request; + work_request->ctx = ctx; + work_request->data = data; + work_request->completion = completion; + work_request->work_cb = do_work_cb; + work_request->after_work_cb = do_after_work_cb; + work_request->opcode = opcode; + + if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) { + internal_fatal(true, "DBENGINE: cannot queue work"); + work_done(work_request); + return false; + } + + __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); + + return true; +} + +// ---------------------------------------------------------------------------- +// page descriptor cache + +void page_descriptors_init(void) { + rrdeng_main.descriptors.ar = aral_create( + "dbengine-descriptors", + sizeof(struct page_descr_with_data), + 0, + 65536 * 4, + NULL, + NULL, NULL, false, false); +} + +struct page_descr_with_data *page_descriptor_get(void) { + struct page_descr_with_data *descr = aral_mallocz(rrdeng_main.descriptors.ar); + memset(descr, 0, sizeof(struct page_descr_with_data)); + return descr; +} + +static inline void page_descriptor_release(struct page_descr_with_data *descr) { + aral_freez(rrdeng_main.descriptors.ar, descr); +} + +// ---------------------------------------------------------------------------- +// extent io descriptor cache + +static void extent_io_descriptor_init(void) { + rrdeng_main.xt_io_descr.ar = aral_create( + "dbengine-extent-io", + sizeof(struct extent_io_descriptor), + 0, + 65536, + NULL, + NULL, NULL, false, false + ); +} + +static struct extent_io_descriptor *extent_io_descriptor_get(void) { + struct extent_io_descriptor *xt_io_descr = aral_mallocz(rrdeng_main.xt_io_descr.ar); + memset(xt_io_descr, 0, sizeof(struct extent_io_descriptor)); + return xt_io_descr; +} + +static inline void extent_io_descriptor_release(struct extent_io_descriptor *xt_io_descr) { + aral_freez(rrdeng_main.xt_io_descr.ar, xt_io_descr); +} + +// ---------------------------------------------------------------------------- +// query handle cache + +void rrdeng_query_handle_init(void) { + rrdeng_main.handles.ar = aral_create( + "dbengine-query-handles", + sizeof(struct rrdeng_query_handle), + 0, + 65536, + NULL, + NULL, NULL, false, false); +} + +struct rrdeng_query_handle *rrdeng_query_handle_get(void) { + struct rrdeng_query_handle *handle = aral_mallocz(rrdeng_main.handles.ar); + memset(handle, 0, sizeof(struct rrdeng_query_handle)); + return handle; +} + +void rrdeng_query_handle_release(struct rrdeng_query_handle *handle) { + aral_freez(rrdeng_main.handles.ar, handle); +} + +// ---------------------------------------------------------------------------- +// WAL cache + +static struct { + struct { + SPINLOCK spinlock; + WAL *available_items; + size_t available; + } protected; + + struct { + size_t allocated; + } atomics; +} wal_globals = { + .protected = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .available_items = NULL, + .available = 0, + }, + .atomics = { + .allocated = 0, + }, +}; + +static void wal_cleanup1(void) { + WAL *wal = NULL; + + if(!spinlock_trylock(&wal_globals.protected.spinlock)) + return; + + if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) { + wal = wal_globals.protected.available_items; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available--; + } + + spinlock_unlock(&wal_globals.protected.spinlock); + + if(wal) { + posix_memfree(wal->buf); + freez(wal); + __atomic_sub_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED); + } +} + +WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { + if(!size || size > RRDENG_BLOCK_SIZE) + fatal("DBENGINE: invalid WAL size requested"); + + WAL *wal = NULL; + + spinlock_lock(&wal_globals.protected.spinlock); + + if(likely(wal_globals.protected.available_items)) { + wal = wal_globals.protected.available_items; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available--; + } + + uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED); + spinlock_unlock(&wal_globals.protected.spinlock); + + if(unlikely(!wal)) { + wal = mallocz(sizeof(WAL)); + wal->buf_size = RRDENG_BLOCK_SIZE; + int ret = posix_memalign((void *)&wal->buf, RRDFILE_ALIGNMENT, wal->buf_size); + if (unlikely(ret)) + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + __atomic_add_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED); + } + + // these need to survive + unsigned buf_size = wal->buf_size; + void *buf = wal->buf; + + memset(wal, 0, sizeof(WAL)); + + // put them back + wal->buf_size = buf_size; + wal->buf = buf; + + memset(wal->buf, 0, wal->buf_size); + + wal->transaction_id = transaction_id; + wal->size = size; + + return wal; +} + +void wal_release(WAL *wal) { + if(unlikely(!wal)) return; + + spinlock_lock(&wal_globals.protected.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available++; + spinlock_unlock(&wal_globals.protected.spinlock); +} + +// ---------------------------------------------------------------------------- +// command queue cache + +static void rrdeng_cmd_queue_init(void) { + rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes", + sizeof(struct rrdeng_cmd), + 0, + 65536, + NULL, + NULL, NULL, false, false); +} + +static inline STORAGE_PRIORITY rrdeng_enq_cmd_map_opcode_to_priority(enum rrdeng_opcode opcode, STORAGE_PRIORITY priority) { + if(unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE)) + priority = STORAGE_PRIORITY_BEST_EFFORT; + + switch(opcode) { + case RRDENG_OPCODE_QUERY: + priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP; + break; + + default: + break; + } + + return priority; +} + +void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd) { + epdl_cmd_queued(cmd->data, cmd); +} + +void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) { + epdl_cmd_dequeued(cmd->data); +} + +void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) { + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + struct rrdeng_cmd *cmd = get_cmd_cb(data); + if(cmd) { + priority = rrdeng_enq_cmd_map_opcode_to_priority(cmd->opcode, priority); + + if (cmd->priority > priority) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[cmd->priority], cmd, queue.prev, queue.next); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + cmd->priority = priority; + } + } + + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); +} + +void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion, + enum storage_priority priority, enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb) { + + priority = rrdeng_enq_cmd_map_opcode_to_priority(opcode, priority); + + struct rrdeng_cmd *cmd = aral_mallocz(rrdeng_main.cmd_queue.ar); + memset(cmd, 0, sizeof(struct rrdeng_cmd)); + cmd->ctx = ctx; + cmd->opcode = opcode; + cmd->data = data; + cmd->completion = completion; + cmd->priority = priority; + cmd->dequeue_cb = dequeue_cb; + + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + rrdeng_main.cmd_queue.unsafe.waiting++; + if(enqueue_cb) + enqueue_cb(cmd); + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + fatal_assert(0 == uv_async_send(&rrdeng_main.async)); +} + +static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PRIORITY priority, STORAGE_PRIORITY max_priority) { + for(; priority <= max_priority ; priority++) + if(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]) + return true; + + return false; +} + +#define opcode_empty (struct rrdeng_cmd) { \ + .ctx = NULL, \ + .opcode = RRDENG_OPCODE_NOOP, \ + .priority = STORAGE_PRIORITY_BEST_EFFORT, \ + .completion = NULL, \ + .data = NULL, \ +} + +static inline struct rrdeng_cmd rrdeng_deq_cmd(bool from_worker) { + struct rrdeng_cmd *cmd = NULL; + enum LIBUV_WORKERS_STATUS status = work_request_full(); + + STORAGE_PRIORITY min_priority, max_priority; + min_priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; + max_priority = (status != LIBUV_WORKERS_RELAXED) ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE - 1; + + if(from_worker) { + if(status == LIBUV_WORKERS_CRITICAL) + return opcode_empty; + + min_priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP; + max_priority = STORAGE_PRIORITY_BEST_EFFORT; + } + + // find an opcode to execute from the queue + spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + for(STORAGE_PRIORITY priority = min_priority; priority <= max_priority ; priority++) { + cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; + if(cmd) { + + // avoid starvation of lower priorities + if(unlikely(priority >= STORAGE_PRIORITY_HIGH && + priority < STORAGE_PRIORITY_BEST_EFFORT && + ++rrdeng_main.cmd_queue.unsafe.executed_by_priority[priority] % 50 == 0 && + rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(priority + 1, max_priority))) { + // let the others run 2% of the requests + cmd = NULL; + continue; + } + + // remove it from the queue + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + rrdeng_main.cmd_queue.unsafe.waiting--; + break; + } + } + + if(cmd && cmd->dequeue_cb) { + cmd->dequeue_cb(cmd); + cmd->dequeue_cb = NULL; + } + + spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + struct rrdeng_cmd ret; + if(cmd) { + // copy it, to return it + ret = *cmd; + + aral_freez(rrdeng_main.cmd_queue.ar, cmd); + } + else + ret = opcode_empty; + + return ret; +} + + +// ---------------------------------------------------------------------------- + +void *dbengine_extent_alloc(size_t size) { + void *extent = mallocz(size); + return extent; +} + +void dbengine_extent_free(void *extent, size_t size __maybe_unused) { + freez(extent); +} + +static void journalfile_extent_build(struct rrdengine_instance *ctx, struct extent_io_descriptor *xt_io_descr) { + unsigned count, payload_length, descr_size, size_bytes; + void *buf; + /* persistent structures */ + struct rrdeng_df_extent_header *df_header; + struct rrdeng_jf_transaction_header *jf_header; + struct rrdeng_jf_store_data *jf_metric_data; + struct rrdeng_jf_transaction_trailer *jf_trailer; + uLong crc; + + df_header = xt_io_descr->buf; + count = df_header->number_of_pages; + descr_size = sizeof(*jf_metric_data->descr) * count; + payload_length = sizeof(*jf_metric_data) + descr_size; + size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); + + xt_io_descr->wal = wal_get(ctx, size_bytes); + buf = xt_io_descr->wal->buf; + + jf_header = buf; + jf_header->type = STORE_DATA; + jf_header->reserved = 0; + jf_header->id = xt_io_descr->wal->transaction_id; + jf_header->payload_length = payload_length; + + jf_metric_data = buf + sizeof(*jf_header); + jf_metric_data->extent_offset = xt_io_descr->pos; + jf_metric_data->extent_size = xt_io_descr->bytes; + jf_metric_data->number_of_pages = count; + memcpy(jf_metric_data->descr, df_header->descr, descr_size); + + jf_trailer = buf + sizeof(*jf_header) + payload_length; + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, buf, sizeof(*jf_header) + payload_length); + crc32set(jf_trailer->checksum, crc); +} + +static void after_extent_flushed_to_open(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + if(completion) + completion_mark_complete(completion); + + if(ctx_is_available_for_queries(ctx)) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); +} + +static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_FLUSHED_TO_OPEN); + + uv_fs_t *uv_fs_request = data; + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct page_descr_with_data *descr; + struct rrdengine_datafile *datafile; + unsigned i; + + datafile = xt_io_descr->datafile; + + bool still_running = ctx_is_available_for_queries(ctx); + + for (i = 0 ; i < xt_io_descr->descr_count ; ++i) { + descr = xt_io_descr->descr_array[i]; + + if (likely(still_running)) + pgc_open_add_hot_page( + (Word_t)ctx, descr->metric_id, + (time_t) (descr->start_time_ut / USEC_PER_SEC), + (time_t) (descr->end_time_ut / USEC_PER_SEC), + descr->update_every_s, + datafile, + xt_io_descr->pos, xt_io_descr->bytes, descr->page_length); + + page_descriptor_release(descr); + } + + uv_fs_req_cleanup(uv_fs_request); + posix_memfree(xt_io_descr->buf); + extent_io_descriptor_release(xt_io_descr); + + spinlock_lock(&datafile->writers.spinlock); + datafile->writers.flushed_to_open_running--; + spinlock_unlock(&datafile->writers.spinlock); + + if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running) + // we just finished a flushing on a datafile that is not the active one + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + + return data; +} + +// Main event loop callback +static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { + worker_is_busy(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE); + + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct rrdengine_datafile *datafile = xt_io_descr->datafile; + struct rrdengine_instance *ctx = datafile->ctx; + + if (uv_fs_request->result < 0) { + ctx_io_error(ctx); + netdata_log_error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); + } + + journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop); + + spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running--; + datafile->writers.flushed_to_open_running++; + spinlock_unlock(&datafile->writers.spinlock); + + rrdeng_enq_cmd(xt_io_descr->ctx, + RRDENG_OPCODE_FLUSHED_TO_OPEN, + uv_fs_request, + xt_io_descr->completion, + STORAGE_PRIORITY_INTERNAL_DBENGINE, + NULL, + NULL); + + worker_is_idle(); +} + +static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { + bool ret = false; + spinlock_lock(&datafile->writers.spinlock); + + if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx)) + ret = true; + + spinlock_unlock(&datafile->writers.spinlock); + + return ret; +} + +static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_instance *ctx) { + struct rrdengine_datafile *datafile; + + // get the latest datafile + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + // become a writer on this datafile, to prevent it from vanishing + spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running++; + spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(datafile_is_full(ctx, datafile)) { + // remember the datafile we have become writers to + struct rrdengine_datafile *old_datafile = datafile; + + // only 1 datafile creation at a time + static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; + netdata_mutex_lock(&mutex); + + // take the latest datafile again - without this, multiple threads may create multiple files + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx, true) == 0) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, + NULL); + + netdata_mutex_unlock(&mutex); + + // get the new latest datafile again, like above + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + // become a writer on this datafile, to prevent it from vanishing + spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running++; + spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + // release the writers on the old datafile + spinlock_lock(&old_datafile->writers.spinlock); + old_datafile->writers.running--; + spinlock_unlock(&old_datafile->writers.spinlock); + } + + return datafile; +} + +/* + * Take a page list in a judy array and write them + */ +static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { + int ret; + unsigned i, count, size_bytes, pos, real_io_size; + uint32_t uncompressed_payload_length, max_compressed_size, payload_offset; + struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; + struct extent_io_descriptor *xt_io_descr; + Word_t Index; + uint8_t compression_algorithm = ctx->config.global_compress_alg; + struct rrdengine_datafile *datafile; + /* persistent structures */ + struct rrdeng_df_extent_header *header; + struct rrdeng_df_extent_trailer *trailer; + uLong crc; + + for(descr = base, Index = 0, count = 0, uncompressed_payload_length = 0; + descr && count != rrdeng_pages_per_extent; + descr = descr->link.next, Index++) { + + uncompressed_payload_length += descr->page_length; + eligible_pages[count++] = descr; + + } + + if (!count) { + if (completion) + completion_mark_complete(completion); + + __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED); + return NULL; + } + + xt_io_descr = extent_io_descriptor_get(); + xt_io_descr->ctx = ctx; + payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); + max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm); + size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer); + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); + if (unlikely(ret)) { + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + /* freez(xt_io_descr);*/ + } + memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes)); + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct page_descr_with_data *) * count); + xt_io_descr->descr_count = count; + + pos = 0; + header = xt_io_descr->buf; + header->number_of_pages = count; + pos += sizeof(*header); + + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + header->descr[i].type = descr->type; + uuid_copy(*(nd_uuid_t *)header->descr[i].uuid, *descr->id); + header->descr[i].page_length = descr->page_length; + header->descr[i].start_time_ut = descr->start_time_ut; + + switch (descr->type) { + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: + header->descr[i].end_time_ut = descr->end_time_ut; + break; + case RRDENG_PAGE_TYPE_GORILLA_32BIT: + header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); + header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); + break; + default: + fatal("Unknown page type: %uc", descr->type); + } + + pos += sizeof(header->descr[i]); + } + + // build the extent payload + for (i = 0 ; i < count ; ++i) { + descr = xt_io_descr->descr_array[i]; + pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); + pos += descr->page_length; + } + + // compress the payload + size_t compressed_size = + (int)dbengine_compress(xt_io_descr->buf + payload_offset, + uncompressed_payload_length, + compression_algorithm); + + internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed"); + internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent"); + + if(compressed_size) { + header->compression_algorithm = compression_algorithm; + header->payload_length = compressed_size; + } + else { + // compression failed, or generated bigger pages + // so it didn't touch our uncompressed buffer + header->compression_algorithm = RRDENG_COMPRESSION_NONE; + header->payload_length = compressed_size = uncompressed_payload_length; + } + + // set the correct size + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + + if(compression_algorithm != RRDENG_COMPRESSION_NONE) { + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + } + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + + datafile = get_datafile_to_write_extent(ctx); + spinlock_lock(&datafile->writers.spinlock); + xt_io_descr->datafile = datafile; + xt_io_descr->pos = datafile->pos; + datafile->pos += real_io_size; + spinlock_unlock(&datafile->writers.spinlock); + + xt_io_descr->bytes = size_bytes; + xt_io_descr->uv_fs_request.data = xt_io_descr; + xt_io_descr->completion = completion; + + trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); + crc = crc32(0L, Z_NULL, 0); + crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); + crc32set(trailer->checksum, crc); + + xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); + journalfile_extent_build(ctx, xt_io_descr); + + ctx_last_flush_fileno_set(ctx, datafile->fileno); + ctx_current_disk_space_increase(ctx, real_io_size); + ctx_io_write_op_bytes(ctx, real_io_size); + + return xt_io_descr; +} + +static void after_extent_write(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* uv_work_req __maybe_unused, int status __maybe_unused) { + struct extent_io_descriptor *xt_io_descr = data; + + if(xt_io_descr) { + int ret = uv_fs_write(&rrdeng_main.loop, + &xt_io_descr->uv_fs_request, + xt_io_descr->datafile->file, + &xt_io_descr->iov, + 1, + (int64_t) xt_io_descr->pos, + after_extent_write_datafile_io); + + fatal_assert(-1 != ret); + } +} + +static void *extent_write_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_EXTENT_WRITE); + struct page_descr_with_data *base = data; + struct extent_io_descriptor *xt_io_descr = datafile_extent_build(ctx, base, completion); + return xt_io_descr; +} + +static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + __atomic_store_n(&ctx->atomic.now_deleting_files, false, __ATOMIC_RELAXED); +} + +struct uuid_first_time_s { + nd_uuid_t *uuid; + time_t first_time_s; + METRIC *metric; + size_t pages_found; + size_t df_matched; + size_t df_index_oldest; +}; + +struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { + + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + + struct rrdengine_datafile *next_datafile = datafile->next; + + while(next_datafile && !datafile_acquire(next_datafile, DATAFILE_ACQUIRE_RETENTION)) + next_datafile = next_datafile->next; + + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + datafile_release(datafile, DATAFILE_ACQUIRE_RETENTION); + + return next_datafile; +} + +time_t find_uuid_first_time( + struct rrdengine_instance *ctx, + struct rrdengine_datafile *datafile, + struct uuid_first_time_s *uuid_first_entry_list, + size_t count) +{ + time_t global_first_time_s = LONG_MAX; + + // acquire the datafile to work with it + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION)) + datafile = datafile->next; + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if (unlikely(!datafile)) + return global_first_time_s; + + unsigned journalfile_count = 0; + size_t binary_match = 0; + size_t not_matching_bsearches = 0; + + while (datafile) { + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0); + if (!j2_header) { + datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile); + continue; + } + + time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + + if(journal_start_time_s < global_first_time_s) + global_first_time_s = journal_start_time_s; + + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + struct uuid_first_time_s *uuid_original_entry; + + size_t journal_metric_count = j2_header->metric_count; + + for (size_t index = 0; index < count; ++index) { + uuid_original_entry = &uuid_first_entry_list[index]; + + // Check here if we should skip this + if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5) + continue; + + struct journal_metric_list *live_entry = + bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count, + sizeof(*uuid_list), journal_metric_uuid_compare); + + if (!live_entry) { + // Not found in this journal + not_matching_bsearches++; + continue; + } + + uuid_original_entry->pages_found += live_entry->entries; + uuid_original_entry->df_matched++; + + time_t old_first_time_s = uuid_original_entry->first_time_s; + + // Calculate first / last for this match + time_t first_time_s = live_entry->delta_start_s + journal_start_time_s; + uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s); + + if (uuid_original_entry->first_time_s != old_first_time_s) + uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched; + + binary_match++; + } + + journalfile_count++; + journalfile_v2_data_release(datafile->journalfile); + datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile); + } + + // Let's scan the open cache for almost exact match + size_t open_cache_count = 0; + + size_t df_index[10] = { 0 }; + size_t without_metric = 0; + size_t open_cache_gave_first_time_s = 0; + size_t metric_count = 0; + size_t without_retention = 0; + size_t not_needed_bsearches = 0; + + for (size_t index = 0; index < count; ++index) { + struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index]; + + metric_count++; + + size_t idx = uuid_first_t_entry->df_index_oldest; + if(idx >= 10) + idx = 9; + + df_index[idx]++; + + not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest; + + if (unlikely(!uuid_first_t_entry->metric)) { + without_metric++; + continue; + } + + PGC_PAGE *page = pgc_page_get_and_acquire( + open_cache, (Word_t)ctx, + (Word_t)uuid_first_t_entry->metric, 0, + PGC_SEARCH_FIRST); + + if (page) { + time_t old_first_time_s = uuid_first_t_entry->first_time_s; + + time_t first_time_s = pgc_page_start_time_s(page); + uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s); + pgc_page_release(open_cache, page); + open_cache_count++; + + if(uuid_first_t_entry->first_time_s != old_first_time_s) { + open_cache_gave_first_time_s++; + } + } + else { + if(!uuid_first_t_entry->df_index_oldest) + without_retention++; + } + } + internal_error(true, + "DBENGINE: analyzed the retention of %zu rotated metrics of tier %d, " + "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, " + "%zu metrics with entries in open cache, " + "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), " + "open cache found first time %zu, " + "metrics without any remaining retention %zu, " + "metrics not in MRG %zu", + metric_count, + ctx->config.tier, + binary_match, + not_matching_bsearches, + not_needed_bsearches, + journalfile_count, + open_cache_count, + df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9], + open_cache_gave_first_time_s, + without_retention, + without_metric + ); + + return global_first_time_s; +} + +static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { + time_t global_first_time_s = LONG_MAX; + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS); + + struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile; + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0); + + if (unlikely(!j2_header)) { + if (worker) + worker_is_idle(); + return; + } + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED); + + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + + size_t count = j2_header->metric_count; + struct uuid_first_time_s *uuid_first_t_entry; + struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s)); + + size_t added = 0; + for (size_t index = 0; index < count; ++index) { + METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx); + if (!metric) + continue; + + uuid_first_entry_list[added].metric = metric; + uuid_first_entry_list[added].first_time_s = LONG_MAX; + uuid_first_entry_list[added].df_matched = 0; + uuid_first_entry_list[added].df_index_oldest = 0; + uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric); + added++; + } + + netdata_log_info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", + ctx->config.tier, count, first_datafile_remaining->fileno); + + journalfile_v2_data_release(journalfile); + + // Update the first time / last time for all metrics we plan to delete + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION); + + global_first_time_s = find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); + + netdata_log_info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", + ctx->config.tier, added); + + size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0; + for (size_t index = 0; index < added; ++index) { + uuid_first_t_entry = &uuid_first_entry_list[index]; + if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) { + + time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + + bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + if (changed) { + uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) { + uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + } + mrg_metric_release(main_mrg, uuid_first_t_entry->metric); + } + else { + zero_disk_retention++; + + // there is no retention for this metric + bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric); + if (!has_retention) { + time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric); + time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && first_time_s && last_time_s) { + uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + + bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric); + if(deleted) + deleted_metrics++; + else + zero_retention_referenced++; + } + else { + zero_disk_but_live++; + mrg_metric_release(main_mrg, uuid_first_t_entry->metric); + } + } + } + freez(uuid_first_entry_list); + + internal_error(zero_disk_retention, + "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry", + deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier); + + if(global_first_time_s != LONG_MAX) + __atomic_store_n(&ctx->atomic.first_time_s, global_first_time_s, __ATOMIC_RELAXED); + + if(worker) + worker_is_idle(); +} + +void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT); + + bool datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); + + if (update_retention) + update_metrics_first_time_s(ctx, datafile, datafile->next, worker); + + while (!datafile_got_for_deletion) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT); + + datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); + + if (!datafile_got_for_deletion) { + netdata_log_info("DBENGINE: waiting for data file '%s/" + DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION + "' to be available for deletion, " + "it is in use currently by %u users.", + ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno, datafile->users.lockers); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_spin, 1, __ATOMIC_RELAXED); + sleep_usec(1 * USEC_PER_SEC); + } + } + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED); + netdata_log_info("DBENGINE: deleting data file '%s/" + DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION + "'.", + ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE); + + struct rrdengine_journalfile *journal_file; + unsigned deleted_bytes, journal_file_bytes, datafile_bytes; + int ret; + char path[RRDENG_PATH_MAX]; + + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + datafile_list_delete_unsafe(ctx, datafile); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + journal_file = datafile->journalfile; + datafile_bytes = datafile->pos; + journal_file_bytes = journalfile_current_size(journal_file); + deleted_bytes = journalfile_v2_data_size_get(journal_file); + + netdata_log_info("DBENGINE: deleting data and journal files to maintain disk quota"); + ret = journalfile_destroy_unsafe(journal_file, datafile); + if (!ret) { + journalfile_v1_generate_path(datafile, path, sizeof(path)); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); + journalfile_v2_generate_path(datafile, path, sizeof(path)); + netdata_log_info("DBENGINE: deleted journal file \"%s\".", path); + deleted_bytes += journal_file_bytes; + } + ret = destroy_data_file_unsafe(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + netdata_log_info("DBENGINE: deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; + } + freez(journal_file); + freez(datafile); + + ctx_current_disk_space_decrease(ctx, deleted_bytes); + netdata_log_info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); +} + +static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true); + + if (rrdeng_ctx_tier_cap_exceeded(ctx)) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + + rrdcontext_db_rotation(); + + return data; +} + +static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_QUIESCE); + pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); + completion_mark_complete(&ctx->quiesce.completion); + return data; +} + +static void after_populate_mrg(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); + + do { + struct rrdengine_datafile *datafile = NULL; + + // find a datafile to work + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { + if(!spinlock_trylock(&datafile->populate_mrg.spinlock)) + continue; + + if(datafile->populate_mrg.populated) { + spinlock_unlock(&datafile->populate_mrg.spinlock); + continue; + } + + // we have the spinlock and it is not populated + break; + } + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(!datafile) + break; + + journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile); + datafile->populate_mrg.populated = true; + spinlock_unlock(&datafile->populate_mrg.spinlock); + + } while(1); + + completion_mark_complete(completion); + + return data; +} + +static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN); + + bool logged = false; + while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { + if(!logged) { + logged = true; + netdata_log_info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), ctx->config.tier); + } + sleep_usec(1 * USEC_PER_MS); + } + + completion_mark_complete(completion); + + return data; +} + +static void *cache_flush_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + if (!main_cache) + return data; + + worker_is_busy(UV_EVENT_DBENGINE_FLUSH_MAIN_CACHE); + pgc_flush_pages(main_cache, 0); + + return data; +} + +static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { + if (!main_cache) + return data; + + worker_is_busy(UV_EVENT_DBENGINE_EVICT_MAIN_CACHE); + pgc_evict_pages(main_cache, 0, 0); + + return data; +} + +static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { + PDC *pdc = data; + rrdeng_prep_query(pdc, true); + return data; +} + +uint64_t rrdeng_target_data_file_size(struct rrdengine_instance *ctx) { + uint64_t target_size = ctx->config.max_disk_space ? ctx->config.max_disk_space / TARGET_DATAFILES : MAX_DATAFILE_SIZE; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + return target_size; +} + +time_t get_datafile_end_time(struct rrdengine_instance *ctx) +{ + time_t last_time_s = 0; + + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + struct rrdengine_datafile *datafile = ctx->datafiles.first; + + if (datafile) { + last_time_s = datafile->journalfile->v2.last_time_s; + if (!last_time_s) + last_time_s = datafile->journalfile->v2.first_time_s; + } + + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + return last_time_s; +} + +/* return 0 on success */ +int init_rrd_files(struct rrdengine_instance *ctx) +{ + return init_data_files(ctx); +} + +void finalize_rrd_files(struct rrdengine_instance *ctx) +{ + return finalize_data_files(ctx); +} + +void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); + uv_update_time(handle->loop); + netdata_log_debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); +} + +#define TIMER_PERIOD_MS (1000) + + +static void *extent_read_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + EPDL *epdl = data; + epdl_find_extent_and_populate_pages(ctx, epdl, true); + return data; +} + +static void epdl_populate_pages_asynchronously(struct rrdengine_instance *ctx, EPDL *epdl, STORAGE_PRIORITY priority) { + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_READ, epdl, NULL, priority, + rrdeng_enqueue_epdl_cmd, rrdeng_dequeue_epdl_cmd); +} + +void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) { + pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_asynchronously, epdl_populate_pages_asynchronously); +} + +void epdl_populate_pages_synchronously(struct rrdengine_instance *ctx, EPDL *epdl, enum storage_priority priority __maybe_unused) { + epdl_find_extent_and_populate_pages(ctx, epdl, false); +} + +void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) { + pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously); +} + +#define MAX_RETRIES_TO_START_INDEX (100) +static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + unsigned count = 0; + worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX_WAIT); + + while (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && count++ < MAX_RETRIES_TO_START_INDEX) + sleep_usec(100 * USEC_PER_MS); + + if (count == MAX_RETRIES_TO_START_INDEX) { + worker_is_idle(); + return data; + } + + struct rrdengine_datafile *datafile = ctx->datafiles.first; + worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX); + count = 0; + while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) { + if(journalfile_v2_data_available(datafile->journalfile)) { + // journal file v2 is already there for this datafile + datafile = datafile->next; + continue; + } + + spinlock_lock(&datafile->writers.spinlock); + bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; + spinlock_unlock(&datafile->writers.spinlock); + + if(!available) { + nd_log(NDLS_DAEMON, NDLP_NOTICE, + "DBENGINE: journal file %u needs to be indexed, but it has writers working on it - " + "skipping it for now", + datafile->fileno); + + datafile = datafile->next; + continue; + } + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal file %u is ready to be indexed", + datafile->fileno); + + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, + journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); + + count++; + + datafile = datafile->next; + + if (unlikely(!ctx_is_available_for_queries(ctx))) + break; + } + + errno = 0; + if(count) + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "DBENGINE: journal indexing done; %u files processed", + count); + + worker_is_idle(); + + return data; +} + +static void after_do_cache_flush(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.flushes_running--; +} + +static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.evictions_running--; +} + +static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED); + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); +} + +struct rrdeng_buffer_sizes rrdeng_get_buffer_sizes(void) { + return (struct rrdeng_buffer_sizes) { + .pgc = pgc_aral_overhead() + pgc_aral_structures(), + .mrg = mrg_aral_overhead() + mrg_aral_structures(), + .opcodes = aral_overhead(rrdeng_main.cmd_queue.ar) + aral_structures(rrdeng_main.cmd_queue.ar), + .handles = aral_overhead(rrdeng_main.handles.ar) + aral_structures(rrdeng_main.handles.ar), + .descriptors = aral_overhead(rrdeng_main.descriptors.ar) + aral_structures(rrdeng_main.descriptors.ar), + .wal = __atomic_load_n(&wal_globals.atomics.allocated, __ATOMIC_RELAXED) * (sizeof(WAL) + RRDENG_BLOCK_SIZE), + .workers = aral_overhead(rrdeng_main.work_cmd.ar), + .pdc = pdc_cache_size(), + .xt_io = aral_overhead(rrdeng_main.xt_io_descr.ar) + aral_structures(rrdeng_main.xt_io_descr.ar), + .xt_buf = extent_buffer_cache_size(), + .epdl = epdl_cache_size(), + .deol = deol_cache_size(), + .pd = pd_cache_size(), + +#ifdef PDC_USE_JULYL + .julyl = julyl_cache_size(), +#endif + }; +} + +static void after_cleanup(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.cleanup_running--; +} + +static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_BUFFERS_CLEANUP); + + wal_cleanup1(); + extent_buffer_cleanup1(); + + { + static time_t last_run_s = 0; + time_t now_s = now_monotonic_sec(); + if(now_s - last_run_s >= 10) { + last_run_s = now_s; + journalfile_v2_data_unmount_cleanup(now_s); + } + } + +#ifdef PDC_USE_JULYL + julyl_cleanup1(); +#endif + + return data; +} + +uint64_t get_used_disk_space(struct rrdengine_instance *ctx) +{ + uint64_t active_space = 0; + + if (ctx->datafiles.first && ctx->datafiles.first->prev) + active_space = ctx->datafiles.first->prev->pos; + + uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - active_space; + + uint64_t database_space = get_total_database_space(); + uint64_t adjusted_database_space = database_space * ctx->config.disk_percentage / 100 ; + estimated_disk_space += adjusted_database_space; + + return estimated_disk_space; +} + +static time_t get_tier_retention(struct rrdengine_instance *ctx) +{ + time_t retention = 0; + if (localhost) { + STORAGE_ENGINE *eng = localhost->db[ctx->config.tier].eng; + if (eng) { + time_t first_time_s = get_datafile_end_time(ctx); + if (first_time_s) + retention = now_realtime_sec() - first_time_s; + } + } + return retention; +} + +// Check if disk or retention time cap reached +bool rrdeng_ctx_tier_cap_exceeded(struct rrdengine_instance *ctx) +{ + if(!ctx->datafiles.first) + // no datafiles available + return false; + + if(!ctx->datafiles.first->next) + // only 1 datafile available + return false; + + uint64_t estimated_disk_space = get_used_disk_space(ctx); + time_t retention = get_tier_retention(ctx); + + if (ctx->config.max_retention_s && retention > ctx->config.max_retention_s) + return true; + + if (ctx->config.max_disk_space && estimated_disk_space > ctx->config.max_disk_space) + return true; + + return false; +} + +void retention_timer_cb(uv_timer_t *handle) +{ + if (!localhost) + return; + + worker_is_busy(RRDENG_TIMER_CB); + uv_stop(handle->loop); + uv_update_time(handle->loop); + + for (size_t tier = 0; tier < storage_tiers; tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) + continue; + bool cleanup = rrdeng_ctx_tier_cap_exceeded(multidb_ctx[tier]); + if (cleanup) + rrdeng_enq_cmd(multidb_ctx[tier], RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + } + + worker_is_idle(); +} + +void timer_cb(uv_timer_t* handle) { + worker_is_busy(RRDENG_TIMER_CB); + uv_stop(handle->loop); + uv_update_time(handle->loop); + + worker_set_metric(RRDENG_OPCODES_WAITING, (NETDATA_DOUBLE)rrdeng_main.cmd_queue.unsafe.waiting); + worker_set_metric(RRDENG_WORKS_DISPATCHED, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED)); + worker_set_metric(RRDENG_WORKS_EXECUTING, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.executing, __ATOMIC_RELAXED)); + + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_CLEANUP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + + worker_is_idle(); +} + +static void dbengine_initialize_structures(void) { + pgc_and_mrg_initialize(); + + pdc_init(); + page_details_init(); + epdl_init(); + deol_init(); + rrdeng_cmd_queue_init(); + work_request_init(); + rrdeng_query_handle_init(); + page_descriptors_init(); + extent_buffer_init(); + pgd_init_arals(); + extent_io_descriptor_init(); +} + +bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { + static bool spawned = false; + static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; + + spinlock_lock(&spinlock); + + if(!spawned) { + int ret; + + ret = uv_loop_init(&rrdeng_main.loop); + if (ret) { + netdata_log_error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); + return false; + } + rrdeng_main.loop.data = &rrdeng_main; + + ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb); + if (ret) { + netdata_log_error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + rrdeng_main.async.data = &rrdeng_main; + + ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer); + if (ret) { + netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)&rrdeng_main.async, NULL); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + + ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.retention_timer); + if (ret) { + netdata_log_error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)&rrdeng_main.async, NULL); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + + rrdeng_main.timer.data = &rrdeng_main; + rrdeng_main.retention_timer.data = &rrdeng_main; + + dbengine_initialize_structures(); + + fatal_assert(0 == uv_thread_create(&rrdeng_main.thread, dbengine_event_loop, &rrdeng_main)); + spawned = true; + } + + spinlock_unlock(&spinlock); + return true; +} + +static inline void worker_dispatch_extent_read(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + EPDL *epdl = cmd.data; + + if(from_worker) + epdl_find_extent_and_populate_pages(ctx, epdl, true); + else + work_dispatch(ctx, epdl, NULL, cmd.opcode, extent_read_tp_worker, NULL); +} + +static inline void worker_dispatch_query_prep(struct rrdeng_cmd cmd, bool from_worker) { + struct rrdengine_instance *ctx = cmd.ctx; + PDC *pdc = cmd.data; + + if(from_worker) + rrdeng_prep_query(pdc, true); + else + work_dispatch(ctx, pdc, NULL, cmd.opcode, query_prep_tp_worker, NULL); +} + +uint64_t get_directory_free_bytes_space(struct rrdengine_instance *ctx) +{ + uint64_t free_bytes = 0; + struct statvfs buff_statvfs; + if (statvfs(ctx->config.dbfiles_path, &buff_statvfs) == 0) + free_bytes = buff_statvfs.f_bavail * buff_statvfs.f_bsize; + + return (free_bytes - (free_bytes * 5 / 100)); +} + +void calculate_tier_disk_space_percentage(void) +{ + uint64_t tier_space[RRD_STORAGE_TIERS]; + + if (!localhost) + return; + + uint64_t total_diskspace = 0; + for(size_t tier = 0; tier < storage_tiers ;tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) { + tier_space[tier] = 0; + continue; + } + uint64_t tier_disk_space = multidb_ctx[tier]->config.max_disk_space ? + multidb_ctx[tier]->config.max_disk_space : + get_directory_free_bytes_space(multidb_ctx[tier]); + total_diskspace += tier_disk_space; + tier_space[tier] = tier_disk_space; + } + + if (total_diskspace) { + for (size_t tier = 0; tier < storage_tiers; tier++) { + multidb_ctx[tier]->config.disk_percentage = (100 * tier_space[tier] / total_diskspace); + } + } +} + +void dbengine_retention_statistics(void) +{ + static bool init = false; + static DBENGINE_TIER_STATS stats[RRD_STORAGE_TIERS]; + + if (!localhost) + return; + + calculate_tier_disk_space_percentage(); + + for (size_t tier = 0; tier < storage_tiers; tier++) { + STORAGE_ENGINE *eng = localhost->db[tier].eng; + if (!eng || eng->seb != STORAGE_ENGINE_BACKEND_DBENGINE) + continue; + + if (init == false) { + char id[200]; + snprintfz(id, sizeof(id) - 1, "dbengine_retention_tier%zu", tier); + stats[tier].st = rrdset_create_localhost( + "netdata", + id, + NULL, + "dbengine retention", + "netdata.dbengine_tier_retention", + "dbengine space and time retention", + "%", + "netdata", + "stats", + 134900, // before "dbengine memory" (dbengine2_statistics_charts) + 10, + RRDSET_TYPE_LINE); + + stats[tier].rd_space = rrddim_add(stats[tier].st, "space", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + stats[tier].rd_time = rrddim_add(stats[tier].st, "time", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE); + + char tier_str[5]; + snprintfz(tier_str, 4, "%zu", tier); + rrdlabels_add(stats[tier].st->rrdlabels, "tier", tier_str, RRDLABEL_SRC_AUTO); + + rrdset_flag_set(stats[tier].st, RRDSET_FLAG_METADATA_UPDATE); + rrdhost_flag_set(stats[tier].st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE); + rrdset_metadata_updated(stats[tier].st); + } + + time_t first_time_s = storage_engine_global_first_time_s(eng->seb, localhost->db[tier].si); + time_t retention = first_time_s ? now_realtime_sec() - first_time_s : 0; + + // + // Note: storage_engine_disk_space_used is the exact diskspace (as reported by api/v2/node_instances + // get_used_disk_space is used to determine if database cleanup (file rotation should happen) + // and adds to the disk space used the desired file size of the active + // datafile + uint64_t disk_space = get_used_disk_space(multidb_ctx[tier]); + //uint64_t disk_space = storage_engine_disk_space_used(eng->seb, localhost->db[tier].si); + + uint64_t config_disk_space = storage_engine_disk_space_max(eng->seb, localhost->db[tier].si); + if (!config_disk_space) { + config_disk_space = get_directory_free_bytes_space(multidb_ctx[tier]); + config_disk_space += disk_space; + } + + collected_number disk_percentage = (collected_number) (config_disk_space ? 100 * disk_space / config_disk_space : 0); + + collected_number retention_percentage = (collected_number)multidb_ctx[tier]->config.max_retention_s ? + 100 * retention / multidb_ctx[tier]->config.max_retention_s : + 0; + + if (retention_percentage > 100) + retention_percentage = 100; + + rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_space, (collected_number) disk_percentage); + rrddim_set_by_pointer(stats[tier].st, stats[tier].rd_time, (collected_number) retention_percentage); + + rrdset_done(stats[tier].st); + } + init = true; +} + +void dbengine_event_loop(void* arg) { + sanity_check(); + uv_thread_set_name_np("DBENGINE"); + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + + worker_register("DBENGINE"); + + // opcode jobs + worker_register_job_name(RRDENG_OPCODE_NOOP, "noop"); + + worker_register_job_name(RRDENG_OPCODE_QUERY, "query"); + worker_register_job_name(RRDENG_OPCODE_EXTENT_WRITE, "extent write"); + worker_register_job_name(RRDENG_OPCODE_EXTENT_READ, "extent read"); + worker_register_job_name(RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open"); + worker_register_job_name(RRDENG_OPCODE_DATABASE_ROTATE, "db rotate"); + worker_register_job_name(RRDENG_OPCODE_JOURNAL_INDEX, "journal index"); + worker_register_job_name(RRDENG_OPCODE_FLUSH_INIT, "flush init"); + worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init"); + worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown"); + worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce"); + worker_register_job_name(RRDENG_OPCODE_SHUTDOWN_EVLOOP, "dbengine shutdown"); + + worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode"); + + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_QUERY, "query cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE, "extent write cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_READ, "extent read cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_DATABASE_ROTATE, "db rotate cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_JOURNAL_INDEX, "journal index cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSH_INIT, "flush init cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EVICT_INIT, "evict init cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce cb"); + + // special jobs + worker_register_job_name(RRDENG_TIMER_CB, "timer"); + worker_register_job_name(RRDENG_FLUSH_TRANSACTION_BUFFER_CB, "transaction buffer flush cb"); + + worker_register_job_custom_metric(RRDENG_OPCODES_WAITING, "opcodes waiting", "opcodes", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(RRDENG_WORKS_DISPATCHED, "works dispatched", "works", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(RRDENG_WORKS_EXECUTING, "works executing", "works", WORKER_METRIC_ABSOLUTE); + + struct rrdeng_main *main = arg; + enum rrdeng_opcode opcode; + struct rrdeng_cmd cmd; + main->tid = gettid_cached(); + + fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + fatal_assert(0 == uv_timer_start(&main->retention_timer, retention_timer_cb, TIMER_PERIOD_MS * 60, TIMER_PERIOD_MS * 60)); + + bool shutdown = false; + while (likely(!shutdown)) { + worker_is_idle(); + uv_run(&main->loop, UV_RUN_DEFAULT); + + /* wait for commands */ + do { + worker_is_busy(RRDENG_OPCODE_MAX); + cmd = rrdeng_deq_cmd(RRDENG_OPCODE_NOOP); + opcode = cmd.opcode; + + worker_is_busy(opcode); + + switch (opcode) { + case RRDENG_OPCODE_EXTENT_READ: + worker_dispatch_extent_read(cmd, false); + break; + + case RRDENG_OPCODE_QUERY: + worker_dispatch_query_prep(cmd, false); + break; + + case RRDENG_OPCODE_EXTENT_WRITE: { + struct rrdengine_instance *ctx = cmd.ctx; + struct page_descr_with_data *base = cmd.data; + struct completion *completion = cmd.completion; // optional + work_dispatch(ctx, base, completion, opcode, extent_write_tp_worker, after_extent_write); + break; + } + + case RRDENG_OPCODE_FLUSHED_TO_OPEN: { + struct rrdengine_instance *ctx = cmd.ctx; + uv_fs_t *uv_fs_request = cmd.data; + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct completion *completion = xt_io_descr->completion; + work_dispatch(ctx, uv_fs_request, completion, opcode, extent_flushed_to_open_tp_worker, after_extent_flushed_to_open); + break; + } + + case RRDENG_OPCODE_FLUSH_INIT: { + if(rrdeng_main.flushes_running < (size_t)(libuv_worker_threads / 4)) { + rrdeng_main.flushes_running++; + work_dispatch(NULL, NULL, NULL, opcode, cache_flush_tp_worker, after_do_cache_flush); + } + break; + } + + case RRDENG_OPCODE_EVICT_INIT: { + if(!rrdeng_main.evictions_running) { + rrdeng_main.evictions_running++; + work_dispatch(NULL, NULL, NULL, opcode, cache_evict_tp_worker, after_do_cache_evict); + } + break; + } + + case RRDENG_OPCODE_CLEANUP: { + if(!rrdeng_main.cleanup_running) { + rrdeng_main.cleanup_running++; + work_dispatch(NULL, NULL, NULL, opcode, cleanup_tp_worker, after_cleanup); + } + break; + } + + case RRDENG_OPCODE_JOURNAL_INDEX: { + struct rrdengine_instance *ctx = cmd.ctx; + struct rrdengine_datafile *datafile = cmd.data; + if(!__atomic_load_n(&ctx->atomic.migration_to_v2_running, __ATOMIC_RELAXED)) { + + __atomic_store_n(&ctx->atomic.migration_to_v2_running, true, __ATOMIC_RELAXED); + work_dispatch(ctx, datafile, NULL, opcode, journal_v2_indexing_tp_worker, after_journal_v2_indexing); + } + break; + } + + case RRDENG_OPCODE_DATABASE_ROTATE: { + struct rrdengine_instance *ctx = cmd.ctx; + if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && + ctx->datafiles.first->next != NULL && + ctx->datafiles.first->next->next != NULL && + rrdeng_ctx_tier_cap_exceeded(ctx)) { + + __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED); + work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate); + } + break; + } + + case RRDENG_OPCODE_CTX_POPULATE_MRG: { + struct rrdengine_instance *ctx = cmd.ctx; + struct completion *completion = cmd.completion; + work_dispatch(ctx, NULL, completion, opcode, populate_mrg_tp_worker, after_populate_mrg); + break; + } + + case RRDENG_OPCODE_CTX_QUIESCE: { + // a ctx will shutdown shortly + struct rrdengine_instance *ctx = cmd.ctx; + __atomic_store_n(&ctx->quiesce.enabled, true, __ATOMIC_RELEASE); + work_dispatch(ctx, NULL, NULL, opcode, + flush_all_hot_and_dirty_pages_of_section_tp_worker, + after_flush_all_hot_and_dirty_pages_of_section); + break; + } + + case RRDENG_OPCODE_CTX_SHUTDOWN: { + // a ctx is shutting down + struct rrdengine_instance *ctx = cmd.ctx; + struct completion *completion = cmd.completion; + work_dispatch(ctx, NULL, completion, opcode, ctx_shutdown_tp_worker, after_ctx_shutdown); + break; + } + + case RRDENG_OPCODE_SHUTDOWN_EVLOOP: { + uv_close((uv_handle_t *)&main->async, NULL); + (void) uv_timer_stop(&main->timer); + uv_close((uv_handle_t *)&main->timer, NULL); + + (void) uv_timer_stop(&main->retention_timer); + uv_close((uv_handle_t *)&main->retention_timer, NULL); + shutdown = true; + break; + } + + case RRDENG_OPCODE_NOOP: { + /* the command queue was empty, do nothing */ + break; + } + + // not opcodes + case RRDENG_OPCODE_MAX: + default: { + internal_fatal(true, "DBENGINE: unknown opcode"); + break; + } + } + + } while (opcode != RRDENG_OPCODE_NOOP); + } + + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Shutting down dbengine thread"); + uv_loop_close(&main->loop); + worker_unregister(); +} |