diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:30 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2023-02-06 16:11:30 +0000 |
commit | aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 (patch) | |
tree | 941cbdd387b41c1a81587c20a6df9f0e5e0ff7ab /database/engine/rrdengine.c | |
parent | Adding upstream version 1.37.1. (diff) | |
download | netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.tar.xz netdata-aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7.zip |
Adding upstream version 1.38.0.upstream/1.38.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/rrdengine.c')
-rw-r--r-- | database/engine/rrdengine.c | 2634 |
1 files changed, 1465 insertions, 1169 deletions
diff --git a/database/engine/rrdengine.c b/database/engine/rrdengine.c index a6840f38c..d64868f03 100644 --- a/database/engine/rrdengine.c +++ b/database/engine/rrdengine.c @@ -2,6 +2,7 @@ #define NETDATA_RRD_INTERNALS #include "rrdengine.h" +#include "pdc.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; @@ -11,31 +12,74 @@ rrdeng_stats_t global_flushing_pressure_page_deletions = 0; unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT; -#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2) +#if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2) #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_MAX_OPCODE + 2) #endif -void *dbengine_page_alloc() { - void *page = NULL; - if (unlikely(db_engine_use_malloc)) - page = mallocz(RRDENG_BLOCK_SIZE); - else { - page = netdata_mmap(NULL, RRDENG_BLOCK_SIZE, MAP_PRIVATE, enable_ksm); - if(!page) fatal("Cannot allocate dbengine page cache page, with mmap()"); - } - return page; -} - -void dbengine_page_free(void *page) { - if (unlikely(db_engine_use_malloc)) - freez(page); - else - netdata_munmap(page, RRDENG_BLOCK_SIZE); -} +struct rrdeng_main { + uv_thread_t thread; + uv_loop_t loop; + uv_async_t async; + uv_timer_t timer; + pid_t tid; + + size_t flushes_running; + size_t evictions_running; + size_t cleanup_running; + + struct { + ARAL *ar; + + struct { + SPINLOCK spinlock; + + size_t waiting; + struct rrdeng_cmd *waiting_items_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE]; + size_t executed_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE]; + } unsafe; + } cmd_queue; + + struct { + ARAL *ar; + + struct { + size_t dispatched; + size_t executing; + size_t pending_cb; + } atomics; + } work_cmd; + + struct { + ARAL *ar; + } handles; + + struct { + ARAL *ar; + } descriptors; + + struct { + ARAL *ar; + } xt_io_descr; + +} rrdeng_main = { + .thread = 0, + .loop = {}, + .async = {}, + .timer = {}, + .flushes_running = 0, + .evictions_running = 0, + .cleanup_running = 0, + + .cmd_queue = { + .unsafe = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + }, + } +}; static void sanity_check(void) { - BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_MAX_OPCODE + 2)); + BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)); /* Magic numbers must fit in the super-blocks */ BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ); @@ -54,519 +98,489 @@ static void sanity_check(void) BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255); /* extent cache count must fit in 32 bits */ - BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); +// BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32); /* page info scratch space must be able to hold 2 32-bit integers */ BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t)); } -/* always inserts into tail */ -static inline void xt_cache_replaceQ_insert(struct rrdengine_worker_config* wc, - struct extent_cache_element *xt_cache_elem) -{ - struct extent_cache *xt_cache = &wc->xt_cache; +// ---------------------------------------------------------------------------- +// work request cache - xt_cache_elem->prev = NULL; - xt_cache_elem->next = NULL; +typedef void *(*work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req); +typedef void (*after_work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req, int status); - if (likely(NULL != xt_cache->replaceQ_tail)) { - xt_cache_elem->prev = xt_cache->replaceQ_tail; - xt_cache->replaceQ_tail->next = xt_cache_elem; - } - if (unlikely(NULL == xt_cache->replaceQ_head)) { - xt_cache->replaceQ_head = xt_cache_elem; - } - xt_cache->replaceQ_tail = xt_cache_elem; +struct rrdeng_work { + uv_work_t req; + + struct rrdengine_instance *ctx; + void *data; + struct completion *completion; + + work_cb work_cb; + after_work_cb after_work_cb; + enum rrdeng_opcode opcode; +}; + +static void work_request_init(void) { + rrdeng_main.work_cmd.ar = aral_create( + "dbengine-work-cmd", + sizeof(struct rrdeng_work), + 0, + 65536, NULL, + NULL, NULL, false, false + ); } -static inline void xt_cache_replaceQ_delete(struct rrdengine_worker_config* wc, - struct extent_cache_element *xt_cache_elem) -{ - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *prev, *next; +static inline bool work_request_full(void) { + return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS); +} - prev = xt_cache_elem->prev; - next = xt_cache_elem->next; +static inline void work_done(struct rrdeng_work *work_request) { + aral_freez(rrdeng_main.work_cmd.ar, work_request); +} - if (likely(NULL != prev)) { - prev->next = next; - } - if (likely(NULL != next)) { - next->prev = prev; - } - if (unlikely(xt_cache_elem == xt_cache->replaceQ_head)) { - xt_cache->replaceQ_head = next; - } - if (unlikely(xt_cache_elem == xt_cache->replaceQ_tail)) { - xt_cache->replaceQ_tail = prev; - } - xt_cache_elem->prev = xt_cache_elem->next = NULL; +static void work_standard_worker(uv_work_t *req) { + __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); + + register_libuv_worker_jobs(); + worker_is_busy(UV_EVENT_WORKER_INIT); + + struct rrdeng_work *work_request = req->data; + work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req); + worker_is_idle(); + + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); + + // signal the event loop a worker is available + fatal_assert(0 == uv_async_send(&rrdeng_main.async)); } -static inline void xt_cache_replaceQ_set_hot(struct rrdengine_worker_config* wc, - struct extent_cache_element *xt_cache_elem) -{ - xt_cache_replaceQ_delete(wc, xt_cache_elem); - xt_cache_replaceQ_insert(wc, xt_cache_elem); +static void after_work_standard_callback(uv_work_t* req, int status) { + struct rrdeng_work *work_request = req->data; + + worker_is_busy(RRDENG_OPCODE_MAX + work_request->opcode); + + if(work_request->after_work_cb) + work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status); + + work_done(work_request); + __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED); + + worker_is_idle(); } -/* Returns the index of the cached extent if it was successfully inserted in the extent cache, otherwise -1 */ -static int try_insert_into_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent) -{ - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *xt_cache_elem; - unsigned idx; - int ret; +static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) { + struct rrdeng_work *work_request = NULL; - ret = find_first_zero(xt_cache->allocation_bitmap); - if (-1 == ret || ret >= MAX_CACHED_EXTENTS) { - for (xt_cache_elem = xt_cache->replaceQ_head ; NULL != xt_cache_elem ; xt_cache_elem = xt_cache_elem->next) { - idx = xt_cache_elem - xt_cache->extent_array; - if (!check_bit(xt_cache->inflight_bitmap, idx)) { - xt_cache_replaceQ_delete(wc, xt_cache_elem); - break; - } - } - if (NULL == xt_cache_elem) - return -1; - } else { - idx = (unsigned)ret; - xt_cache_elem = &xt_cache->extent_array[idx]; + internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread"); + + work_request = aral_mallocz(rrdeng_main.work_cmd.ar); + memset(work_request, 0, sizeof(struct rrdeng_work)); + work_request->req.data = work_request; + work_request->ctx = ctx; + work_request->data = data; + work_request->completion = completion; + work_request->work_cb = work_cb; + work_request->after_work_cb = after_work_cb; + work_request->opcode = opcode; + + if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) { + internal_fatal(true, "DBENGINE: cannot queue work"); + work_done(work_request); + return false; } - xt_cache_elem->extent = extent; - xt_cache_elem->fileno = extent->datafile->fileno; - xt_cache_elem->inflight_io_descr = NULL; - xt_cache_replaceQ_insert(wc, xt_cache_elem); - modify_bit(&xt_cache->allocation_bitmap, idx, 1); - return (int)idx; + __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED); + + return true; } -/** - * Returns 0 if the cached extent was found in the extent cache, 1 otherwise. - * Sets *idx to point to the position of the extent inside the cache. - **/ -static uint8_t lookup_in_xt_cache(struct rrdengine_worker_config* wc, struct extent_info *extent, unsigned *idx) -{ - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *xt_cache_elem; - unsigned i; +// ---------------------------------------------------------------------------- +// page descriptor cache + +void page_descriptors_init(void) { + rrdeng_main.descriptors.ar = aral_create( + "dbengine-descriptors", + sizeof(struct page_descr_with_data), + 0, + 65536 * 4, + NULL, + NULL, NULL, false, false); +} - for (i = 0 ; i < MAX_CACHED_EXTENTS ; ++i) { - xt_cache_elem = &xt_cache->extent_array[i]; - if (check_bit(xt_cache->allocation_bitmap, i) && xt_cache_elem->extent == extent && - xt_cache_elem->fileno == extent->datafile->fileno) { - *idx = i; - return 0; - } - } - return 1; +struct page_descr_with_data *page_descriptor_get(void) { + struct page_descr_with_data *descr = aral_mallocz(rrdeng_main.descriptors.ar); + memset(descr, 0, sizeof(struct page_descr_with_data)); + return descr; } -#if 0 /* disabled code */ -static void delete_from_xt_cache(struct rrdengine_worker_config* wc, unsigned idx) -{ - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *xt_cache_elem; +static inline void page_descriptor_release(struct page_descr_with_data *descr) { + aral_freez(rrdeng_main.descriptors.ar, descr); +} - xt_cache_elem = &xt_cache->extent_array[idx]; - xt_cache_replaceQ_delete(wc, xt_cache_elem); - xt_cache_elem->extent = NULL; - modify_bit(&wc->xt_cache.allocation_bitmap, idx, 0); /* invalidate it */ - modify_bit(&wc->xt_cache.inflight_bitmap, idx, 0); /* not in-flight anymore */ +// ---------------------------------------------------------------------------- +// extent io descriptor cache + +static void extent_io_descriptor_init(void) { + rrdeng_main.xt_io_descr.ar = aral_create( + "dbengine-extent-io", + sizeof(struct extent_io_descriptor), + 0, + 65536, + NULL, + NULL, NULL, false, false + ); } -#endif -void enqueue_inflight_read_to_xt_cache(struct rrdengine_worker_config* wc, unsigned idx, - struct extent_io_descriptor *xt_io_descr) -{ - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *xt_cache_elem; - struct extent_io_descriptor *old_next; +static struct extent_io_descriptor *extent_io_descriptor_get(void) { + struct extent_io_descriptor *xt_io_descr = aral_mallocz(rrdeng_main.xt_io_descr.ar); + memset(xt_io_descr, 0, sizeof(struct extent_io_descriptor)); + return xt_io_descr; +} - xt_cache_elem = &xt_cache->extent_array[idx]; - old_next = xt_cache_elem->inflight_io_descr->next; - xt_cache_elem->inflight_io_descr->next = xt_io_descr; - xt_io_descr->next = old_next; +static inline void extent_io_descriptor_release(struct extent_io_descriptor *xt_io_descr) { + aral_freez(rrdeng_main.xt_io_descr.ar, xt_io_descr); } -void read_cached_extent_cb(struct rrdengine_worker_config* wc, unsigned idx, struct extent_io_descriptor *xt_io_descr) -{ - unsigned i, j, page_offset; - struct rrdengine_instance *ctx = wc->ctx; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr; - void *page; - struct extent_info *extent = xt_io_descr->descr_array[0]->extent; - - for (i = 0 ; i < xt_io_descr->descr_count; ++i) { - page = dbengine_page_alloc(); - descr = xt_io_descr->descr_array[i]; - for (j = 0, page_offset = 0 ; j < extent->number_of_pages ; ++j) { - /* care, we don't hold the descriptor mutex */ - if (!uuid_compare(*extent->pages[j]->id, *descr->id) && - extent->pages[j]->page_length == descr->page_length && - extent->pages[j]->start_time_ut == descr->start_time_ut && - extent->pages[j]->end_time_ut == descr->end_time_ut) { - break; - } - page_offset += extent->pages[j]->page_length; +// ---------------------------------------------------------------------------- +// query handle cache + +void rrdeng_query_handle_init(void) { + rrdeng_main.handles.ar = aral_create( + "dbengine-query-handles", + sizeof(struct rrdeng_query_handle), + 0, + 65536, + NULL, + NULL, NULL, false, false); +} - } - /* care, we don't hold the descriptor mutex */ - (void) memcpy(page, wc->xt_cache.extent_array[idx].pages + page_offset, descr->page_length); - - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - pg_cache_descr->page = page; - pg_cache_descr->flags |= RRD_PAGE_POPULATED; - pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; - rrdeng_page_descr_mutex_unlock(ctx, descr); - pg_cache_replaceQ_insert(ctx, descr); - if (xt_io_descr->release_descr) { - pg_cache_put(ctx, descr); - } else { - debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); - pg_cache_wake_up_waiters(ctx, descr); - } +struct rrdeng_query_handle *rrdeng_query_handle_get(void) { + struct rrdeng_query_handle *handle = aral_mallocz(rrdeng_main.handles.ar); + memset(handle, 0, sizeof(struct rrdeng_query_handle)); + return handle; +} + +void rrdeng_query_handle_release(struct rrdeng_query_handle *handle) { + aral_freez(rrdeng_main.handles.ar, handle); +} + +// ---------------------------------------------------------------------------- +// WAL cache + +static struct { + struct { + SPINLOCK spinlock; + WAL *available_items; + size_t available; + } protected; + + struct { + size_t allocated; + } atomics; +} wal_globals = { + .protected = { + .spinlock = NETDATA_SPINLOCK_INITIALIZER, + .available_items = NULL, + .available = 0, + }, + .atomics = { + .allocated = 0, + }, +}; + +static void wal_cleanup1(void) { + WAL *wal = NULL; + + if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock)) + return; + + if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) { + wal = wal_globals.protected.available_items; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available--; } - if (xt_io_descr->completion) - completion_mark_complete(xt_io_descr->completion); - freez(xt_io_descr); -} - -static void fill_page_with_nulls(void *page, uint32_t page_length, uint8_t type) { - switch(type) { - case PAGE_METRICS: { - storage_number n = pack_storage_number(NAN, SN_FLAG_NONE); - storage_number *array = (storage_number *)page; - size_t slots = page_length / sizeof(n); - for(size_t i = 0; i < slots ; i++) - array[i] = n; - } - break; - - case PAGE_TIER: { - storage_number_tier1_t n = { - .min_value = NAN, - .max_value = NAN, - .sum_value = NAN, - .count = 1, - .anomaly_count = 0, - }; - storage_number_tier1_t *array = (storage_number_tier1_t *)page; - size_t slots = page_length / sizeof(n); - for(size_t i = 0; i < slots ; i++) - array[i] = n; - } - break; - default: { - static bool logged = false; - if(!logged) { - error("DBENGINE: cannot fill page with nulls on unknown page type id %d", type); - logged = true; - } - memset(page, 0, page_length); - } + netdata_spinlock_unlock(&wal_globals.protected.spinlock); + + if(wal) { + posix_memfree(wal->buf); + freez(wal); + __atomic_sub_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED); } } -struct rrdeng_page_descr *get_descriptor(struct pg_cache_page_index *page_index, time_t start_time_s) -{ - uv_rwlock_rdlock(&page_index->lock); - Pvoid_t *PValue = JudyLGet(page_index->JudyL_array, start_time_s, PJE0); - struct rrdeng_page_descr *descr = unlikely(NULL == PValue) ? NULL : *PValue; - uv_rwlock_rdunlock(&page_index->lock); - return descr; -}; +WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) { + if(!size || size > RRDENG_BLOCK_SIZE) + fatal("DBENGINE: invalid WAL size requested"); -static void do_extent_processing (struct rrdengine_worker_config *wc, struct extent_io_descriptor *xt_io_descr, bool read_failed) -{ - struct rrdengine_instance *ctx = wc->ctx; - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr; - int ret; - unsigned i, j, count; - void *page, *uncompressed_buf = NULL; - uint32_t payload_length, payload_offset, page_offset, uncompressed_payload_length = 0; - uint8_t have_read_error = 0; - /* persistent structures */ - struct rrdeng_df_extent_header *header; - struct rrdeng_df_extent_trailer *trailer; - uLong crc; + WAL *wal = NULL; - header = xt_io_descr->buf; - payload_length = header->payload_length; - count = header->number_of_pages; - payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count; - trailer = xt_io_descr->buf + xt_io_descr->bytes - sizeof(*trailer); - - if (unlikely(read_failed)) { - struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; - - ++ctx->stats.io_errors; - rrd_stat_atomic_add(&global_io_errors, 1); - have_read_error = 1; - error("%s: uv_fs_read - extent at offset %"PRIu64"(%u) in datafile %u-%u.", __func__, xt_io_descr->pos, - xt_io_descr->bytes, datafile->tier, datafile->fileno); - goto after_crc_check; + netdata_spinlock_lock(&wal_globals.protected.spinlock); + + if(likely(wal_globals.protected.available_items)) { + wal = wal_globals.protected.available_items; + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available--; } - crc = crc32(0L, Z_NULL, 0); - crc = crc32(crc, xt_io_descr->buf, xt_io_descr->bytes - sizeof(*trailer)); - ret = crc32cmp(trailer->checksum, crc); -#ifdef NETDATA_INTERNAL_CHECKS - { - struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: %s", __func__, - xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno, ret ? "FAILED" : "SUCCEEDED"); + + uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED); + netdata_spinlock_unlock(&wal_globals.protected.spinlock); + + if(unlikely(!wal)) { + wal = mallocz(sizeof(WAL)); + wal->buf_size = RRDENG_BLOCK_SIZE; + int ret = posix_memalign((void *)&wal->buf, RRDFILE_ALIGNMENT, wal->buf_size); + if (unlikely(ret)) + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); + __atomic_add_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED); } -#endif - if (unlikely(ret)) { - struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; - ++ctx->stats.io_errors; - rrd_stat_atomic_add(&global_io_errors, 1); - have_read_error = 1; - error("%s: Extent at offset %"PRIu64"(%u) was read from datafile %u-%u. CRC32 check: FAILED", __func__, - xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); + // these need to survive + unsigned buf_size = wal->buf_size; + void *buf = wal->buf; + + memset(wal, 0, sizeof(WAL)); + + // put them back + wal->buf_size = buf_size; + wal->buf = buf; + + memset(wal->buf, 0, wal->buf_size); + + wal->transaction_id = transaction_id; + wal->size = size; + + return wal; +} + +void wal_release(WAL *wal) { + if(unlikely(!wal)) return; + + netdata_spinlock_lock(&wal_globals.protected.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next); + wal_globals.protected.available++; + netdata_spinlock_unlock(&wal_globals.protected.spinlock); +} + +// ---------------------------------------------------------------------------- +// command queue cache + +struct rrdeng_cmd { + struct rrdengine_instance *ctx; + enum rrdeng_opcode opcode; + void *data; + struct completion *completion; + enum storage_priority priority; + dequeue_callback_t dequeue_cb; + + struct { + struct rrdeng_cmd *prev; + struct rrdeng_cmd *next; + } queue; +}; + +static void rrdeng_cmd_queue_init(void) { + rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes", + sizeof(struct rrdeng_cmd), + 0, + 65536, + NULL, + NULL, NULL, false, false); +} + +static inline STORAGE_PRIORITY rrdeng_enq_cmd_map_opcode_to_priority(enum rrdeng_opcode opcode, STORAGE_PRIORITY priority) { + if(unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE)) + priority = STORAGE_PRIORITY_BEST_EFFORT; + + switch(opcode) { + case RRDENG_OPCODE_QUERY: + priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP; + break; + + default: + break; } -after_crc_check: - if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { - uncompressed_payload_length = 0; - for (i = 0 ; i < count ; ++i) { - uncompressed_payload_length += header->descr[i].page_length; + return priority; +} + +void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd) { + epdl_cmd_queued(cmd->data, cmd); +} + +void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) { + epdl_cmd_dequeued(cmd->data); +} + +void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) { + netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + struct rrdeng_cmd *cmd = get_cmd_cb(data); + if(cmd) { + priority = rrdeng_enq_cmd_map_opcode_to_priority(cmd->opcode, priority); + + if (cmd->priority > priority) { + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[cmd->priority], cmd, queue.prev, queue.next); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + cmd->priority = priority; } - uncompressed_buf = mallocz(uncompressed_payload_length); - ret = LZ4_decompress_safe(xt_io_descr->buf + payload_offset, uncompressed_buf, - payload_length, uncompressed_payload_length); - ctx->stats.before_decompress_bytes += payload_length; - ctx->stats.after_decompress_bytes += ret; - debug(D_RRDENGINE, "LZ4 decompressed %u bytes to %d bytes.", payload_length, ret); - /* care, we don't hold the descriptor mutex */ } - { - uint8_t xt_is_cached = 0; - unsigned xt_idx; - struct extent_info *extent = xt_io_descr->descr_array[0]->extent; - - xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); - if (xt_is_cached && check_bit(wc->xt_cache.inflight_bitmap, xt_idx)) { - struct extent_cache *xt_cache = &wc->xt_cache; - struct extent_cache_element *xt_cache_elem = &xt_cache->extent_array[xt_idx]; - struct extent_io_descriptor *curr, *next; - - if (have_read_error) { - memset(xt_cache_elem->pages, 0, sizeof(xt_cache_elem->pages)); - } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { - (void)memcpy(xt_cache_elem->pages, xt_io_descr->buf + payload_offset, payload_length); - } else { - (void)memcpy(xt_cache_elem->pages, uncompressed_buf, uncompressed_payload_length); - } - /* complete all connected in-flight read requests */ - for (curr = xt_cache_elem->inflight_io_descr->next ; curr ; curr = next) { - next = curr->next; - read_cached_extent_cb(wc, xt_idx, curr); + + netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); +} + +void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion, + enum storage_priority priority, enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb) { + + priority = rrdeng_enq_cmd_map_opcode_to_priority(opcode, priority); + + struct rrdeng_cmd *cmd = aral_mallocz(rrdeng_main.cmd_queue.ar); + memset(cmd, 0, sizeof(struct rrdeng_cmd)); + cmd->ctx = ctx; + cmd->opcode = opcode; + cmd->data = data; + cmd->completion = completion; + cmd->priority = priority; + cmd->dequeue_cb = dequeue_cb; + + netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + rrdeng_main.cmd_queue.unsafe.waiting++; + if(enqueue_cb) + enqueue_cb(cmd); + netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + fatal_assert(0 == uv_async_send(&rrdeng_main.async)); +} + +static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PRIORITY priority, STORAGE_PRIORITY max_priority) { + for(; priority <= max_priority ; priority++) + if(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]) + return true; + + return false; +} + +static inline struct rrdeng_cmd rrdeng_deq_cmd(void) { + struct rrdeng_cmd *cmd = NULL; + + STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT; + + // find an opcode to execute from the queue + netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock); + for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) { + cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority]; + if(cmd) { + + // avoid starvation of lower priorities + if(unlikely(priority >= STORAGE_PRIORITY_HIGH && + priority < STORAGE_PRIORITY_BEST_EFFORT && + ++rrdeng_main.cmd_queue.unsafe.executed_by_priority[priority] % 50 == 0 && + rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(priority + 1, max_priority))) { + // let the others run 2% of the requests + cmd = NULL; + continue; } - xt_cache_elem->inflight_io_descr = NULL; - modify_bit(&xt_cache->inflight_bitmap, xt_idx, 0); /* not in-flight anymore */ + + // remove it from the queue + DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next); + rrdeng_main.cmd_queue.unsafe.waiting--; + break; } } - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, xt_io_descr->descr_array[0]->id, sizeof(uuid_t)); - struct pg_cache_page_index *page_index = likely( NULL != PValue) ? *PValue : NULL; - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - - - for (i = 0, page_offset = 0; i < count; page_offset += header->descr[i++].page_length) { - uint8_t is_prefetched_page; - descr = NULL; - for (j = 0 ; j < xt_io_descr->descr_count; ++j) { - struct rrdeng_page_descr descrj; - - descrj = xt_io_descr->descr_read_array[j]; - /* care, we don't hold the descriptor mutex */ - if (!uuid_compare(*(uuid_t *) header->descr[i].uuid, *descrj.id) && - header->descr[i].page_length == descrj.page_length && - header->descr[i].start_time_ut == descrj.start_time_ut && - header->descr[i].end_time_ut == descrj.end_time_ut) { - //descr = descrj; - descr = get_descriptor(page_index, (time_t) (descrj.start_time_ut / USEC_PER_SEC)); - if (unlikely(!descr)) { - error_limit_static_thread_var(erl, 1, 0); - error_limit(&erl, "%s: Required descriptor is not in the page index anymore", __FUNCTION__); - } - break; - } - } - is_prefetched_page = 0; - if (!descr) { /* This extent page has not been requested. Try populating it for locality (best effort). */ - descr = pg_cache_lookup_unpopulated_and_lock(ctx, (uuid_t *)header->descr[i].uuid, - header->descr[i].start_time_ut); - if (!descr) - continue; /* Failed to reserve a suitable page */ - is_prefetched_page = 1; - } - page = dbengine_page_alloc(); - - /* care, we don't hold the descriptor mutex */ - if (have_read_error) { - fill_page_with_nulls(page, descr->page_length, descr->type); - } else if (RRD_NO_COMPRESSION == header->compression_algorithm) { - (void) memcpy(page, xt_io_descr->buf + payload_offset + page_offset, descr->page_length); - } else { - (void) memcpy(page, uncompressed_buf + page_offset, descr->page_length); - } - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - pg_cache_descr->page = page; - pg_cache_descr->flags |= RRD_PAGE_POPULATED; - pg_cache_descr->flags &= ~RRD_PAGE_READ_PENDING; - rrdeng_page_descr_mutex_unlock(ctx, descr); - pg_cache_replaceQ_insert(ctx, descr); - if (xt_io_descr->release_descr || is_prefetched_page) { - pg_cache_put(ctx, descr); - } else { - debug(D_RRDENGINE, "%s: Waking up waiters.", __func__); - pg_cache_wake_up_waiters(ctx, descr); - } + if(cmd && cmd->dequeue_cb) { + cmd->dequeue_cb(cmd); + cmd->dequeue_cb = NULL; } - if (!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm) { - freez(uncompressed_buf); + + netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock); + + struct rrdeng_cmd ret; + if(cmd) { + // copy it, to return it + ret = *cmd; + + aral_freez(rrdeng_main.cmd_queue.ar, cmd); } - if (xt_io_descr->completion) - completion_mark_complete(xt_io_descr->completion); + else + ret = (struct rrdeng_cmd) { + .ctx = NULL, + .opcode = RRDENG_OPCODE_NOOP, + .priority = STORAGE_PRIORITY_BEST_EFFORT, + .completion = NULL, + .data = NULL, + }; + + return ret; } -static void read_extent_cb(uv_fs_t *req) -{ - struct rrdengine_worker_config *wc = req->loop->data; - struct extent_io_descriptor *xt_io_descr; - xt_io_descr = req->data; - do_extent_processing(wc, xt_io_descr, req->result < 0); - uv_fs_req_cleanup(req); - posix_memfree(xt_io_descr->buf); - freez(xt_io_descr); +// ---------------------------------------------------------------------------- + +struct { + ARAL *aral[RRD_STORAGE_TIERS]; +} dbengine_page_alloc_globals = {}; + +static inline ARAL *page_size_lookup(size_t size) { + for(size_t tier = 0; tier < storage_tiers ;tier++) + if(size == tier_page_size[tier]) + return dbengine_page_alloc_globals.aral[tier]; + + return NULL; } -static void read_mmap_extent_cb(uv_work_t *req, int status __maybe_unused) -{ - struct rrdengine_worker_config *wc = req->loop->data; - struct rrdengine_instance *ctx = wc->ctx; - struct extent_io_descriptor *xt_io_descr; - xt_io_descr = req->data; +static void dbengine_page_alloc_init(void) { + for(size_t i = storage_tiers; i > 0 ;i--) { + size_t tier = storage_tiers - i; - if (likely(xt_io_descr->map_base)) { - do_extent_processing(wc, xt_io_descr, false); - munmap(xt_io_descr->map_base, xt_io_descr->map_length); - freez(xt_io_descr); - return; - } + char buf[20 + 1]; + snprintfz(buf, 20, "tier%zu-pages", tier); - // MMAP failed, so do uv_fs_read - int ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(xt_io_descr->bytes)); - if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); + dbengine_page_alloc_globals.aral[tier] = aral_create( + buf, + tier_page_size[tier], + 64, + 512 * tier_page_size[tier], + pgc_aral_statistics(), + NULL, NULL, false, false); } - unsigned real_io_size = ALIGN_BYTES_CEILING( xt_io_descr->bytes); - xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); - xt_io_descr->req.data = xt_io_descr; - ret = uv_fs_read(req->loop, &xt_io_descr->req, xt_io_descr->file, &xt_io_descr->iov, 1, (unsigned) xt_io_descr->pos, read_extent_cb); - fatal_assert(-1 != ret); - ctx->stats.io_read_bytes += real_io_size; - ctx->stats.io_read_extent_bytes += real_io_size; } -static void do_mmap_read_extent(uv_work_t *req) -{ - struct extent_io_descriptor *xt_io_descr = (struct extent_io_descriptor * )req->data; - struct rrdengine_worker_config *wc = req->loop->data; - struct rrdengine_instance *ctx = wc->ctx; - - off_t map_start = ALIGN_BYTES_FLOOR(xt_io_descr->pos); - size_t length = ALIGN_BYTES_CEILING(xt_io_descr->pos + xt_io_descr->bytes) - map_start; - unsigned real_io_size = xt_io_descr->bytes; - - void *data = mmap(NULL, length, PROT_READ, MAP_SHARED, xt_io_descr->file, map_start); - if (likely(data != MAP_FAILED)) { - xt_io_descr->map_base = data; - xt_io_descr->map_length = length; - xt_io_descr->buf = data + (xt_io_descr->pos - map_start); - ctx->stats.io_read_bytes += real_io_size; - ctx->stats.io_read_extent_bytes += real_io_size; - } +void *dbengine_page_alloc(size_t size) { + ARAL *ar = page_size_lookup(size); + if(ar) return aral_mallocz(ar); + + return mallocz(size); } -static void do_read_extent(struct rrdengine_worker_config* wc, - struct rrdeng_page_descr **descr, - unsigned count, - uint8_t release_descr) -{ - struct rrdengine_instance *ctx = wc->ctx; - struct page_cache_descr *pg_cache_descr; - int ret; - unsigned i, size_bytes, pos; - struct extent_io_descriptor *xt_io_descr; - struct rrdengine_datafile *datafile; - struct extent_info *extent = descr[0]->extent; - uint8_t xt_is_cached = 0, xt_is_inflight = 0; - unsigned xt_idx; - - datafile = extent->datafile; - pos = extent->offset; - size_bytes = extent->size; - - xt_io_descr = callocz(1, sizeof(*xt_io_descr)); - for (i = 0 ; i < count; ++i) { - rrdeng_page_descr_mutex_lock(ctx, descr[i]); - pg_cache_descr = descr[i]->pg_cache_descr; - pg_cache_descr->flags |= RRD_PAGE_READ_PENDING; - rrdeng_page_descr_mutex_unlock(ctx, descr[i]); - xt_io_descr->descr_array[i] = descr[i]; - xt_io_descr->descr_read_array[i] = *(descr[i]); - } - xt_io_descr->descr_count = count; - xt_io_descr->file = datafile->file; - xt_io_descr->bytes = size_bytes; - xt_io_descr->pos = pos; - xt_io_descr->req_worker.data = xt_io_descr; - xt_io_descr->completion = NULL; - xt_io_descr->release_descr = release_descr; - xt_io_descr->buf = NULL; - - xt_is_cached = !lookup_in_xt_cache(wc, extent, &xt_idx); - if (xt_is_cached) { - xt_cache_replaceQ_set_hot(wc, &wc->xt_cache.extent_array[xt_idx]); - xt_is_inflight = check_bit(wc->xt_cache.inflight_bitmap, xt_idx); - if (xt_is_inflight) { - enqueue_inflight_read_to_xt_cache(wc, xt_idx, xt_io_descr); - return; - } - return read_cached_extent_cb(wc, xt_idx, xt_io_descr); - } else { - ret = try_insert_into_xt_cache(wc, extent); - if (-1 != ret) { - xt_idx = (unsigned)ret; - modify_bit(&wc->xt_cache.inflight_bitmap, xt_idx, 1); - wc->xt_cache.extent_array[xt_idx].inflight_io_descr = xt_io_descr; - } - } +void dbengine_page_free(void *page, size_t size __maybe_unused) { + if(unlikely(!page || page == DBENGINE_EMPTY_PAGE)) + return; - ret = uv_queue_work(wc->loop, &xt_io_descr->req_worker, do_mmap_read_extent, read_mmap_extent_cb); - fatal_assert(-1 != ret); + ARAL *ar = page_size_lookup(size); + if(ar) + aral_freez(ar, page); + else + freez(page); +} - ++ctx->stats.io_read_requests; - ++ctx->stats.io_read_extents; - ctx->stats.pg_cache_backfills += count; +// ---------------------------------------------------------------------------- + +void *dbengine_extent_alloc(size_t size) { + void *extent = mallocz(size); + return extent; } -static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent_io_descriptor *xt_io_descr) -{ - struct rrdengine_instance *ctx = wc->ctx; +void dbengine_extent_free(void *extent, size_t size __maybe_unused) { + freez(extent); +} + +static void journalfile_extent_build(struct rrdengine_instance *ctx, struct extent_io_descriptor *xt_io_descr) { unsigned count, payload_length, descr_size, size_bytes; void *buf; /* persistent structures */ @@ -582,12 +596,13 @@ static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent payload_length = sizeof(*jf_metric_data) + descr_size; size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer); - buf = wal_get_transaction_buffer(wc, size_bytes); + xt_io_descr->wal = wal_get(ctx, size_bytes); + buf = xt_io_descr->wal->buf; jf_header = buf; jf_header->type = STORE_DATA; jf_header->reserved = 0; - jf_header->id = ctx->commit_log.transaction_id++; + jf_header->id = xt_io_descr->wal->transaction_id; jf_header->payload_length = payload_length; jf_metric_data = buf + sizeof(*jf_header); @@ -602,265 +617,210 @@ static void commit_data_extent(struct rrdengine_worker_config* wc, struct extent crc32set(jf_trailer->checksum, crc); } -static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t type, void *data) -{ - switch (type) { - case STORE_DATA: - commit_data_extent(wc, (struct extent_io_descriptor *)data); - break; - default: - fatal_assert(type == STORE_DATA); - break; - } -} +static void after_extent_flushed_to_open(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + if(completion) + completion_mark_complete(completion); -static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc) -{ - int error; - - error = uv_thread_join(wc->now_invalidating_dirty_pages); - if (error) { - error("uv_thread_join(): %s", uv_strerror(error)); - } - freez(wc->now_invalidating_dirty_pages); - wc->now_invalidating_dirty_pages = NULL; - wc->cleanup_thread_invalidating_dirty_pages = 0; + if(ctx_is_available_for_queries(ctx)) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); } -static void invalidate_oldest_committed(void *arg) -{ - struct rrdengine_instance *ctx = arg; - struct rrdengine_worker_config *wc = &ctx->worker_config; - struct page_cache *pg_cache = &ctx->pg_cache; - int ret; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr; - Pvoid_t *PValue; - Word_t Index; - unsigned nr_committed_pages; +static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_FLUSHED_TO_OPEN); - do { - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - for (Index = 0, - PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue; + uv_fs_t *uv_fs_request = data; + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct page_descr_with_data *descr; + struct rrdengine_datafile *datafile; + unsigned i; - descr != NULL; + datafile = xt_io_descr->datafile; - PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - fatal_assert(0 != descr->page_length); + bool still_running = ctx_is_available_for_queries(ctx); - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) { - rrdeng_page_descr_mutex_unlock(ctx, descr); + for (i = 0 ; i < xt_io_descr->descr_count ; ++i) { + descr = xt_io_descr->descr_array[i]; - ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); - fatal_assert(1 == ret); - break; - } - rrdeng_page_descr_mutex_unlock(ctx, descr); - } - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); + if (likely(still_running)) + pgc_open_add_hot_page( + (Word_t)ctx, descr->metric_id, + (time_t) (descr->start_time_ut / USEC_PER_SEC), + (time_t) (descr->end_time_ut / USEC_PER_SEC), + descr->update_every_s, + datafile, + xt_io_descr->pos, xt_io_descr->bytes, descr->page_length); - if (!descr) { - info("Failed to invalidate any dirty pages to relieve page cache pressure."); + page_descriptor_release(descr); + } - goto out; - } - pg_cache_punch_hole(ctx, descr, 1, 1, NULL); + uv_fs_req_cleanup(uv_fs_request); + posix_memfree(xt_io_descr->buf); + extent_io_descriptor_release(xt_io_descr); - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1); - rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1); + netdata_spinlock_lock(&datafile->writers.spinlock); + datafile->writers.flushed_to_open_running--; + netdata_spinlock_unlock(&datafile->writers.spinlock); - } while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)); -out: - wc->cleanup_thread_invalidating_dirty_pages = 1; - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&wc->async)); -} + if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running) + // we just finished a flushing on a datafile that is not the active one + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); -void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc) -{ - struct rrdengine_instance *ctx = wc->ctx; - struct page_cache *pg_cache = &ctx->pg_cache; - unsigned nr_committed_pages; - int error; + return data; +} - if (unlikely(ctx->quiesce != NO_QUIESCE)) /* Shutting down */ - return; +// Main event loop callback +static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) { + worker_is_busy(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE); - uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); - nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct rrdengine_datafile *datafile = xt_io_descr->datafile; + struct rrdengine_instance *ctx = datafile->ctx; - if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) { - /* delete the oldest page in memory */ - if (wc->now_invalidating_dirty_pages) { - /* already deleting a page */ - return; - } - errno = 0; - error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". " - "Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path); - - wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages)); - wc->cleanup_thread_invalidating_dirty_pages = 0; - - error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx); - if (error) { - error("uv_thread_create(): %s", uv_strerror(error)); - freez(wc->now_invalidating_dirty_pages); - wc->now_invalidating_dirty_pages = NULL; - } + if (uv_fs_request->result < 0) { + ctx_io_error(ctx); + error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result)); } + + journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop); + + netdata_spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running--; + datafile->writers.flushed_to_open_running++; + netdata_spinlock_unlock(&datafile->writers.spinlock); + + rrdeng_enq_cmd(xt_io_descr->ctx, + RRDENG_OPCODE_FLUSHED_TO_OPEN, + uv_fs_request, + xt_io_descr->completion, + STORAGE_PRIORITY_INTERNAL_DBENGINE, + NULL, + NULL); + + worker_is_idle(); } -void flush_pages_cb(uv_fs_t* req) -{ - struct rrdengine_worker_config* wc = req->loop->data; - struct rrdengine_instance *ctx = wc->ctx; - struct page_cache *pg_cache = &ctx->pg_cache; - struct extent_io_descriptor *xt_io_descr; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr; - unsigned i, count; - - xt_io_descr = req->data; - if (req->result < 0) { - ++ctx->stats.io_errors; - rrd_stat_atomic_add(&global_io_errors, 1); - error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result)); - } -#ifdef NETDATA_INTERNAL_CHECKS - { - struct rrdengine_datafile *datafile = xt_io_descr->descr_array[0]->extent->datafile; - debug(D_RRDENGINE, "%s: Extent at offset %"PRIu64"(%u) was written to datafile %u-%u. Waking up waiters.", - __func__, xt_io_descr->pos, xt_io_descr->bytes, datafile->tier, datafile->fileno); - } -#endif - count = xt_io_descr->descr_count; - for (i = 0 ; i < count ; ++i) { - /* care, we don't hold the descriptor mutex */ - descr = xt_io_descr->descr_array[i]; +static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { + bool ret = false; + netdata_spinlock_lock(&datafile->writers.spinlock); - pg_cache_replaceQ_insert(ctx, descr); + if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx)) + ret = true; - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - pg_cache_descr->flags &= ~(RRD_PAGE_DIRTY | RRD_PAGE_WRITE_PENDING); - /* wake up waiters, care no reference being held */ - pg_cache_wake_up_waiters_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - } - if (xt_io_descr->completion) - completion_mark_complete(xt_io_descr->completion); - uv_fs_req_cleanup(req); - posix_memfree(xt_io_descr->buf); - freez(xt_io_descr); + netdata_spinlock_unlock(&datafile->writers.spinlock); + + return ret; +} + +static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_instance *ctx) { + struct rrdengine_datafile *datafile; - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - pg_cache->committed_page_index.nr_committed_pages -= count; - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); - wc->inflight_dirty_pages -= count; + // get the latest datafile + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + // become a writer on this datafile, to prevent it from vanishing + netdata_spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running++; + netdata_spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(datafile_is_full(ctx, datafile)) { + // remember the datafile we have become writers to + struct rrdengine_datafile *old_datafile = datafile; + + // only 1 datafile creation at a time + static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; + netdata_mutex_lock(&mutex); + + // take the latest datafile again - without this, multiple threads may create multiple files + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, + NULL); + + netdata_mutex_unlock(&mutex); + + // get the new latest datafile again, like above + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + datafile = ctx->datafiles.first->prev; + // become a writer on this datafile, to prevent it from vanishing + netdata_spinlock_lock(&datafile->writers.spinlock); + datafile->writers.running++; + netdata_spinlock_unlock(&datafile->writers.spinlock); + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + // release the writers on the old datafile + netdata_spinlock_lock(&old_datafile->writers.spinlock); + old_datafile->writers.running--; + netdata_spinlock_unlock(&old_datafile->writers.spinlock); + } + + return datafile; } /* - * completion must be NULL or valid. - * Returns 0 when no flushing can take place. - * Returns datafile bytes to be written on successful flushing initiation. + * Take a page list in a judy array and write them */ -static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct completion *completion) -{ - struct rrdengine_instance *ctx = wc->ctx; - struct page_cache *pg_cache = &ctx->pg_cache; +static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { int ret; int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; uint32_t uncompressed_payload_length, payload_offset; - struct rrdeng_page_descr *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; - struct page_cache_descr *pg_cache_descr; + struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *xt_io_descr; + struct extent_buffer *eb = NULL; void *compressed_buf = NULL; - Word_t descr_commit_idx_array[MAX_PAGES_PER_EXTENT]; - Pvoid_t *PValue; Word_t Index; - uint8_t compression_algorithm = ctx->global_compress_alg; - struct extent_info *extent; + uint8_t compression_algorithm = ctx->config.global_compress_alg; struct rrdengine_datafile *datafile; /* persistent structures */ struct rrdeng_df_extent_header *header; struct rrdeng_df_extent_trailer *trailer; uLong crc; - if (force) { - debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure."); - } - uv_rwlock_wrlock(&pg_cache->committed_page_index.lock); - for (Index = 0, count = 0, uncompressed_payload_length = 0, - PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue ; - - descr != NULL && count != rrdeng_pages_per_extent; - - PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - uint8_t page_write_pending; - - fatal_assert(0 != descr->page_length); - page_write_pending = 0; - - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING)) { - page_write_pending = 1; - /* care, no reference being held */ - pg_cache_descr->flags |= RRD_PAGE_WRITE_PENDING; - uncompressed_payload_length += descr->page_length; - descr_commit_idx_array[count] = Index; - eligible_pages[count++] = descr; - } - rrdeng_page_descr_mutex_unlock(ctx, descr); + for(descr = base, Index = 0, count = 0, uncompressed_payload_length = 0; + descr && count != rrdeng_pages_per_extent; + descr = descr->link.next, Index++) { + + uncompressed_payload_length += descr->page_length; + eligible_pages[count++] = descr; - if (page_write_pending) { - ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0); - fatal_assert(1 == ret); - } } - uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock); if (!count) { - debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__); if (completion) completion_mark_complete(completion); - return 0; + + __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED); + return NULL; } - wc->inflight_dirty_pages += count; - xt_io_descr = mallocz(sizeof(*xt_io_descr)); + xt_io_descr = extent_io_descriptor_get(); + xt_io_descr->ctx = ctx; payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); switch (compression_algorithm) { - case RRD_NO_COMPRESSION: - size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); - break; - default: /* Compress */ - fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); - max_compressed_size = LZ4_compressBound(uncompressed_payload_length); - compressed_buf = mallocz(max_compressed_size); - size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); - break; + case RRD_NO_COMPRESSION: + size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); + break; + + default: /* Compress */ + fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); + max_compressed_size = LZ4_compressBound(uncompressed_payload_length); + eb = extent_buffer_get(max_compressed_size); + compressed_buf = eb->data; + size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); + break; } + ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { - fatal("posix_memalign:%s", strerror(ret)); + fatal("DBENGINE: posix_memalign:%s", strerror(ret)); /* freez(xt_io_descr);*/ } memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes)); - (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct rrdeng_page_descr *) * count); + (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct page_descr_with_data *) * count); xt_io_descr->descr_count = count; pos = 0; @@ -869,17 +829,7 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct header->number_of_pages = count; pos += sizeof(*header); - extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0])); - datafile = ctx->datafiles.last; /* TODO: check for exceeded size quota */ - extent->offset = datafile->pos; - extent->number_of_pages = count; - extent->datafile = datafile; - extent->next = NULL; - for (i = 0 ; i < count ; ++i) { - /* This is here for performance reasons */ - xt_io_descr->descr_commit_idx_array[i] = descr_commit_idx_array[i]; - descr = xt_io_descr->descr_array[i]; header->descr[i].type = descr->type; uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id); @@ -890,35 +840,40 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct } for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; - /* care, we don't hold the descriptor mutex */ - (void) memcpy(xt_io_descr->buf + pos, descr->pg_cache_descr->page, descr->page_length); - descr->extent = extent; - extent->pages[i] = descr; - + (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length); pos += descr->page_length; } - df_extent_insert(extent); - switch (compression_algorithm) { - case RRD_NO_COMPRESSION: - header->payload_length = uncompressed_payload_length; - break; - default: /* Compress */ - compressed_size = LZ4_compress_default(xt_io_descr->buf + payload_offset, compressed_buf, - uncompressed_payload_length, max_compressed_size); - ctx->stats.before_compress_bytes += uncompressed_payload_length; - ctx->stats.after_compress_bytes += compressed_size; - debug(D_RRDENGINE, "LZ4 compressed %"PRIu32" bytes to %d bytes.", uncompressed_payload_length, compressed_size); + if(likely(compression_algorithm == RRD_LZ4)) { + compressed_size = LZ4_compress_default( + xt_io_descr->buf + payload_offset, + compressed_buf, + (int)uncompressed_payload_length, + max_compressed_size); + + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - freez(compressed_buf); + extent_buffer_release(eb); size_bytes = payload_offset + compressed_size + sizeof(*trailer); header->payload_length = compressed_size; - break; } - extent->size = size_bytes; - xt_io_descr->bytes = size_bytes; + else { // RRD_NO_COMPRESSION + header->payload_length = uncompressed_payload_length; + } + + real_io_size = ALIGN_BYTES_CEILING(size_bytes); + + datafile = get_datafile_to_write_extent(ctx); + netdata_spinlock_lock(&datafile->writers.spinlock); + xt_io_descr->datafile = datafile; xt_io_descr->pos = datafile->pos; - xt_io_descr->req.data = xt_io_descr; + datafile->pos += real_io_size; + netdata_spinlock_unlock(&datafile->writers.spinlock); + + xt_io_descr->bytes = size_bytes; + xt_io_descr->uv_fs_request.data = xt_io_descr; xt_io_descr->completion = completion; trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer); @@ -926,324 +881,508 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer)); crc32set(trailer->checksum, crc); - real_io_size = ALIGN_BYTES_CEILING(size_bytes); xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size); - ret = uv_fs_write(wc->loop, &xt_io_descr->req, datafile->file, &xt_io_descr->iov, 1, datafile->pos, flush_pages_cb); - fatal_assert(-1 != ret); - ctx->stats.io_write_bytes += real_io_size; - ++ctx->stats.io_write_requests; - ctx->stats.io_write_extent_bytes += real_io_size; - ++ctx->stats.io_write_extents; - do_commit_transaction(wc, STORE_DATA, xt_io_descr); - datafile->pos += ALIGN_BYTES_CEILING(size_bytes); - ctx->disk_space += ALIGN_BYTES_CEILING(size_bytes); - rrdeng_test_quota(wc); + journalfile_extent_build(ctx, xt_io_descr); + + ctx_last_flush_fileno_set(ctx, datafile->fileno); + ctx_current_disk_space_increase(ctx, real_io_size); + ctx_io_write_op_bytes(ctx, real_io_size); - return ALIGN_BYTES_CEILING(size_bytes); + return xt_io_descr; } -static void after_delete_old_data(struct rrdengine_worker_config* wc) -{ - struct rrdengine_instance *ctx = wc->ctx; - struct rrdengine_datafile *datafile; - struct rrdengine_journalfile *journalfile; - unsigned deleted_bytes, journalfile_bytes, datafile_bytes; - int ret, error; - char path[RRDENG_PATH_MAX]; +static void after_extent_write(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* uv_work_req __maybe_unused, int status __maybe_unused) { + struct extent_io_descriptor *xt_io_descr = data; - datafile = ctx->datafiles.first; - journalfile = datafile->journalfile; - datafile_bytes = datafile->pos; - journalfile_bytes = journalfile->pos; - deleted_bytes = 0; + if(xt_io_descr) { + int ret = uv_fs_write(&rrdeng_main.loop, + &xt_io_descr->uv_fs_request, + xt_io_descr->datafile->file, + &xt_io_descr->iov, + 1, + (int64_t) xt_io_descr->pos, + after_extent_write_datafile_io); - info("Deleting data and journal file pair."); - datafile_list_delete(ctx, datafile); - ret = destroy_journal_file(journalfile, datafile); - if (!ret) { - generate_journalfilepath(datafile, path, sizeof(path)); - info("Deleted journal file \"%s\".", path); - deleted_bytes += journalfile_bytes; - } - ret = destroy_data_file(datafile); - if (!ret) { - generate_datafilepath(datafile, path, sizeof(path)); - info("Deleted data file \"%s\".", path); - deleted_bytes += datafile_bytes; + fatal_assert(-1 != ret); } - freez(journalfile); - freez(datafile); +} - ctx->disk_space -= deleted_bytes; - info("Reclaimed %u bytes of disk space.", deleted_bytes); +static void *extent_write_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_EXTENT_WRITE); + struct page_descr_with_data *base = data; + struct extent_io_descriptor *xt_io_descr = datafile_extent_build(ctx, base, completion); + return xt_io_descr; +} - error = uv_thread_join(wc->now_deleting_files); - if (error) { - error("uv_thread_join(): %s", uv_strerror(error)); - } - freez(wc->now_deleting_files); - /* unfreeze command processing */ - wc->now_deleting_files = NULL; +static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + __atomic_store_n(&ctx->atomic.now_deleting_files, false, __ATOMIC_RELAXED); +} - wc->cleanup_thread_deleting_files = 0; - rrdcontext_db_rotation(); +struct uuid_first_time_s { + uuid_t *uuid; + time_t first_time_s; + METRIC *metric; + size_t pages_found; + size_t df_matched; + size_t df_index_oldest; +}; - /* interrupt event loop */ - uv_stop(wc->loop); +static int journal_metric_compare(const void *key, const void *metric) +{ + return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid); } -static void delete_old_data(void *arg) -{ - struct rrdengine_instance *ctx = arg; - struct rrdengine_worker_config* wc = &ctx->worker_config; - struct rrdengine_datafile *datafile; - struct extent_info *extent, *next; - struct rrdeng_page_descr *descr; - unsigned count, i; - uint8_t can_delete_metric; - uuid_t metric_id; - - /* Safe to use since it will be deleted after we are done */ - datafile = ctx->datafiles.first; - - for (extent = datafile->extents.first ; extent != NULL ; extent = next) { - count = extent->number_of_pages; - for (i = 0 ; i < count ; ++i) { - descr = extent->pages[i]; - can_delete_metric = pg_cache_punch_hole(ctx, descr, 0, 0, &metric_id); - if (unlikely(can_delete_metric)) { - /* - * If the metric is empty, has no active writers and if the metadata log has been initialized then - * attempt to delete the corresponding netdata dimension. - */ - metaqueue_delete_dimension_uuid(&metric_id); - } - } - next = extent->next; - freez(extent); - } - wc->cleanup_thread_deleting_files = 1; - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&wc->async)); +struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) { + + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + + struct rrdengine_datafile *next_datafile = datafile->next; + + while(next_datafile && !datafile_acquire(next_datafile, DATAFILE_ACQUIRE_RETENTION)) + next_datafile = next_datafile->next; + + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + datafile_release(datafile, DATAFILE_ACQUIRE_RETENTION); + + return next_datafile; } -void rrdeng_test_quota(struct rrdengine_worker_config* wc) +void find_uuid_first_time( + struct rrdengine_instance *ctx, + struct rrdengine_datafile *datafile, + struct uuid_first_time_s *uuid_first_entry_list, + size_t count) { - struct rrdengine_instance *ctx = wc->ctx; - struct rrdengine_datafile *datafile; - unsigned current_size, target_size; - uint8_t out_of_space, only_one_datafile; - int ret, error; - - out_of_space = 0; - /* Do not allow the pinned pages to exceed the disk space quota to avoid deadlocks */ - if (unlikely(ctx->disk_space > MAX(ctx->max_disk_space, 2 * ctx->metric_API_max_producers * RRDENG_BLOCK_SIZE))) { - out_of_space = 1; - } - datafile = ctx->datafiles.last; - current_size = datafile->pos; - target_size = ctx->max_disk_space / TARGET_DATAFILES; - target_size = MIN(target_size, MAX_DATAFILE_SIZE); - target_size = MAX(target_size, MIN_DATAFILE_SIZE); - only_one_datafile = (datafile == ctx->datafiles.first) ? 1 : 0; - if (unlikely(current_size >= target_size || (out_of_space && only_one_datafile))) { - /* Finalize data and journal file and create a new pair */ - wal_flush_transaction_buffer(wc); - ret = create_new_datafile_pair(ctx, 1, ctx->last_fileno + 1); - if (likely(!ret)) { - ++ctx->last_fileno; + // acquire the datafile to work with it + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION)) + datafile = datafile->next; + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if (unlikely(!datafile)) + return; + + unsigned journalfile_count = 0; + size_t binary_match = 0; + size_t not_matching_bsearches = 0; + + while (datafile) { + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0); + if (!j2_header) { + datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile); + continue; + } + + time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC); + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + struct uuid_first_time_s *uuid_original_entry; + + size_t journal_metric_count = j2_header->metric_count; + + for (size_t index = 0; index < count; ++index) { + uuid_original_entry = &uuid_first_entry_list[index]; + + // Check here if we should skip this + if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5) + continue; + + struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare); + if (!live_entry) { + // Not found in this journal + not_matching_bsearches++; + continue; + } + + uuid_original_entry->pages_found += live_entry->entries; + uuid_original_entry->df_matched++; + + time_t old_first_time_s = uuid_original_entry->first_time_s; + + // Calculate first / last for this match + time_t first_time_s = live_entry->delta_start_s + journal_start_time_s; + uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s); + + if (uuid_original_entry->first_time_s != old_first_time_s) + uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched; + + binary_match++; } + + journalfile_count++; + journalfile_v2_data_release(datafile->journalfile); + datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile); } - if (unlikely(out_of_space && NO_QUIESCE == ctx->quiesce)) { - /* delete old data */ - if (wc->now_deleting_files) { - /* already deleting data */ - return; + + // Let's scan the open cache for almost exact match + size_t open_cache_count = 0; + + size_t df_index[10] = { 0 }; + size_t without_metric = 0; + size_t open_cache_gave_first_time_s = 0; + size_t metric_count = 0; + size_t without_retention = 0; + size_t not_needed_bsearches = 0; + + for (size_t index = 0; index < count; ++index) { + struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index]; + + metric_count++; + + size_t idx = uuid_first_t_entry->df_index_oldest; + if(idx >= 10) + idx = 9; + + df_index[idx]++; + + not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest; + + if (unlikely(!uuid_first_t_entry->metric)) { + without_metric++; + continue; } - if (NULL == ctx->datafiles.first->next) { - error("Cannot delete data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\"" - " to reclaim space, there are no other file pairs left.", - ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); - return; + + PGC_PAGE *page = pgc_page_get_and_acquire( + open_cache, (Word_t)ctx, + (Word_t)uuid_first_t_entry->metric, 0, + PGC_SEARCH_FIRST); + + if (page) { + time_t old_first_time_s = uuid_first_t_entry->first_time_s; + + time_t first_time_s = pgc_page_start_time_s(page); + uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s); + pgc_page_release(open_cache, page); + open_cache_count++; + + if(uuid_first_t_entry->first_time_s != old_first_time_s) { + open_cache_gave_first_time_s++; + } } - info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".", - ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); - wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files)); - wc->cleanup_thread_deleting_files = 0; - - error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx); - if (error) { - error("uv_thread_create(): %s", uv_strerror(error)); - freez(wc->now_deleting_files); - wc->now_deleting_files = NULL; + else { + if(!uuid_first_t_entry->df_index_oldest) + without_retention++; } } + internal_error(true, + "DBENGINE: analyzed the retention of %zu rotated metrics of tier %d, " + "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, " + "%zu metrics with entries in open cache, " + "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), " + "open cache found first time %zu, " + "metrics without any remaining retention %zu, " + "metrics not in MRG %zu", + metric_count, + ctx->config.tier, + binary_match, + not_matching_bsearches, + not_needed_bsearches, + journalfile_count, + open_cache_count, + df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9], + open_cache_gave_first_time_s, + without_retention, + without_metric + ); } -static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc) -{ - if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) { - return 1; +static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED); + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS); + + struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile; + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0); + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + + size_t count = j2_header->metric_count; + struct uuid_first_time_s *uuid_first_t_entry; + struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s)); + + size_t added = 0; + for (size_t index = 0; index < count; ++index) { + METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx); + if (!metric) + continue; + + uuid_first_entry_list[added].metric = metric; + uuid_first_entry_list[added].first_time_s = LONG_MAX; + uuid_first_entry_list[added].df_matched = 0; + uuid_first_entry_list[added].df_index_oldest = 0; + uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric); + added++; + } + + info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u", + ctx->config.tier, count, first_datafile_remaining->fileno); + + journalfile_v2_data_release(journalfile); + + // Update the first time / last time for all metrics we plan to delete + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION); + + find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added); + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); + + info("DBENGINE: updating tier %d metrics registry retention for %zu metrics", + ctx->config.tier, added); + + size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0; + for (size_t index = 0; index < added; ++index) { + uuid_first_t_entry = &uuid_first_entry_list[index]; + if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) { + mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + mrg_metric_release(main_mrg, uuid_first_t_entry->metric); + } + else { + zero_disk_retention++; + + // there is no retention for this metric + bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric); + if (!has_retention) { + bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric); + if(deleted) + deleted_metrics++; + else + zero_retention_referenced++; + } + else { + zero_disk_but_live++; + mrg_metric_release(main_mrg, uuid_first_t_entry->metric); + } + } } - return 0; + freez(uuid_first_entry_list); + + internal_error(zero_disk_retention, + "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry", + deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier); + + if(worker) + worker_is_idle(); } -static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc) -{ - struct rrdengine_instance *ctx = wc->ctx; +void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT); + + bool datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); + + if (update_retention) + update_metrics_first_time_s(ctx, datafile, datafile->next, worker); - if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) { - after_invalidate_oldest_committed(wc); + while (!datafile_got_for_deletion) { + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT); + + datafile_got_for_deletion = datafile_acquire_for_deletion(datafile); + + if (!datafile_got_for_deletion) { + info("DBENGINE: waiting for data file '%s/" + DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION + "' to be available for deletion, " + "it is in use currently by %u users.", + ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno, datafile->users.lockers); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_spin, 1, __ATOMIC_RELAXED); + sleep_usec(1 * USEC_PER_SEC); + } } - if (unlikely(wc->cleanup_thread_deleting_files)) { - after_delete_old_data(wc); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED); + info("DBENGINE: deleting data file '%s/" + DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION + "'.", + ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno); + + if(worker) + worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE); + + struct rrdengine_journalfile *journal_file; + unsigned deleted_bytes, journal_file_bytes, datafile_bytes; + int ret; + char path[RRDENG_PATH_MAX]; + + uv_rwlock_wrlock(&ctx->datafiles.rwlock); + datafile_list_delete_unsafe(ctx, datafile); + uv_rwlock_wrunlock(&ctx->datafiles.rwlock); + + journal_file = datafile->journalfile; + datafile_bytes = datafile->pos; + journal_file_bytes = journalfile_current_size(journal_file); + deleted_bytes = journalfile_v2_data_size_get(journal_file); + + info("DBENGINE: deleting data and journal files to maintain disk quota"); + ret = journalfile_destroy_unsafe(journal_file, datafile); + if (!ret) { + journalfile_v1_generate_path(datafile, path, sizeof(path)); + info("DBENGINE: deleted journal file \"%s\".", path); + journalfile_v2_generate_path(datafile, path, sizeof(path)); + info("DBENGINE: deleted journal file \"%s\".", path); + deleted_bytes += journal_file_bytes; } - if (unlikely(SET_QUIESCE == ctx->quiesce && !rrdeng_threads_alive(wc))) { - ctx->quiesce = QUIESCED; - completion_mark_complete(&ctx->rrdengine_completion); + ret = destroy_data_file_unsafe(datafile); + if (!ret) { + generate_datafilepath(datafile, path, sizeof(path)); + info("DBENGINE: deleted data file \"%s\".", path); + deleted_bytes += datafile_bytes; } + freez(journal_file); + freez(datafile); + + ctx_current_disk_space_decrease(ctx, deleted_bytes); + info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes); } -/* return 0 on success */ -int init_rrd_files(struct rrdengine_instance *ctx) -{ - int ret = init_data_files(ctx); - - BUFFER *wb = buffer_create(1000); - size_t all_errors = 0; - usec_t now = now_realtime_usec(); - - if(ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter) { - buffer_sprintf(wb, "%s%zu pages had start time > end time (latest: %llu secs ago)" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter - , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].latest_end_time_ut) / USEC_PER_SEC - ); - all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLIPPED_TIME].counter; - } +static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true); - if(ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter) { - buffer_sprintf(wb, "%s%zu pages had start time = end time with more than 1 entries (latest: %llu secs ago)" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter - , (now - ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].latest_end_time_ut) / USEC_PER_SEC - ); - all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_EQUAL_TIME].counter; - } + if (rrdeng_ctx_exceeded_disk_quota(ctx)) + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); - if(ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter) { - buffer_sprintf(wb, "%s%zu pages had zero points (latest: %llu secs ago)" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter - , (now - ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].latest_end_time_ut) / USEC_PER_SEC - ); - all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_ZERO_ENTRIES].counter; - } + rrdcontext_db_rotation(); - if(ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter) { - buffer_sprintf(wb, "%s%zu pages had update every == 0 with entries > 1 (latest: %llu secs ago)" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter - , (now - ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].latest_end_time_ut) / USEC_PER_SEC - ); - all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_UPDATE_ZERO].counter; - } + return data; +} - if(ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter) { - buffer_sprintf(wb, "%s%zu pages had a different number of points compared to their timestamps (latest: %llu secs ago; these page have been loaded)" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter - , (now - ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].latest_end_time_ut) / USEC_PER_SEC - ); - all_errors += ctx->load_errors[LOAD_ERRORS_PAGE_FLEXY_TIME].counter; - } +static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_QUIESCE); + pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx); + completion_mark_complete(&ctx->quiesce.completion); + return data; +} + +static void after_populate_mrg(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG); + + do { + struct rrdengine_datafile *datafile = NULL; + + // find a datafile to work + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { + if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock)) + continue; + + if(datafile->populate_mrg.populated) { + netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); + continue; + } + + // we have the spinlock and it is not populated + break; + } + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); + + if(!datafile) + break; + + journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile); + datafile->populate_mrg.populated = true; + netdata_spinlock_unlock(&datafile->populate_mrg.spinlock); - if(ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter) { - buffer_sprintf(wb, "%s%zu extents have been dropped because they didn't have any valid pages" - , (all_errors)?", ":"" - , ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter - ); - all_errors += ctx->load_errors[LOAD_ERRORS_DROPPED_EXTENT].counter; + } while(1); + + completion_mark_complete(completion); + + return data; +} + +static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN); + + completion_wait_for(&ctx->quiesce.completion); + completion_destroy(&ctx->quiesce.completion); + + bool logged = false; + while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) || + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) { + if(!logged) { + logged = true; + info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...", + __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED), + (ctx->config.legacy) ? -1 : ctx->config.tier); + } + sleep_usec(1 * USEC_PER_MS); } - if(all_errors) - info("DBENGINE: tier %d: %s", ctx->tier, buffer_tostring(wb)); + completion_mark_complete(completion); - buffer_free(wb); - return ret; + return data; } -void finalize_rrd_files(struct rrdengine_instance *ctx) -{ - return finalize_data_files(ctx); +static void *cache_flush_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + if (!main_cache) + return data; + + worker_is_busy(UV_EVENT_DBENGINE_FLUSH_MAIN_CACHE); + pgc_flush_pages(main_cache, 0); + + return data; } -void rrdeng_init_cmd_queue(struct rrdengine_worker_config* wc) -{ - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - wc->queue_size = 0; - fatal_assert(0 == uv_cond_init(&wc->cmd_cond)); - fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex)); +static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { + if (!main_cache) + return data; + + worker_is_busy(UV_EVENT_DBENGINE_EVICT_MAIN_CACHE); + pgc_evict_pages(main_cache, 0, 0); + + return data; } -void rrdeng_enq_cmd(struct rrdengine_worker_config* wc, struct rrdeng_cmd *cmd) -{ - unsigned queue_size; +static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} - /* wait for free space in queue */ - uv_mutex_lock(&wc->cmd_mutex); - while ((queue_size = wc->queue_size) == RRDENG_CMD_Q_MAX_SIZE) { - uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex); - } - fatal_assert(queue_size < RRDENG_CMD_Q_MAX_SIZE); - /* enqueue command */ - wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd; - wc->cmd_queue.tail = wc->cmd_queue.tail != RRDENG_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.tail + 1 : 0; - wc->queue_size = queue_size + 1; - uv_mutex_unlock(&wc->cmd_mutex); +static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_QUERY); + PDC *pdc = data; + rrdeng_prep_query(pdc); + return data; +} - /* wake up event loop */ - fatal_assert(0 == uv_async_send(&wc->async)); +unsigned rrdeng_target_data_file_size(struct rrdengine_instance *ctx) { + unsigned target_size = ctx->config.max_disk_space / TARGET_DATAFILES; + target_size = MIN(target_size, MAX_DATAFILE_SIZE); + target_size = MAX(target_size, MIN_DATAFILE_SIZE); + return target_size; } -struct rrdeng_cmd rrdeng_deq_cmd(struct rrdengine_worker_config* wc) +bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx) { - struct rrdeng_cmd ret; - unsigned queue_size; - - uv_mutex_lock(&wc->cmd_mutex); - queue_size = wc->queue_size; - if (queue_size == 0) { - ret.opcode = RRDENG_NOOP; - } else { - /* dequeue command */ - ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head]; - if (queue_size == 1) { - wc->cmd_queue.head = wc->cmd_queue.tail = 0; - } else { - wc->cmd_queue.head = wc->cmd_queue.head != RRDENG_CMD_Q_MAX_SIZE - 1 ? - wc->cmd_queue.head + 1 : 0; - } - wc->queue_size = queue_size - 1; + uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) - + (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0); - /* wake up producers */ - uv_cond_signal(&wc->cmd_cond); - } - uv_mutex_unlock(&wc->cmd_mutex); + return estimated_disk_space > ctx->config.max_disk_space; +} - return ret; +/* return 0 on success */ +int init_rrd_files(struct rrdengine_instance *ctx) +{ + return init_data_files(ctx); } -static void load_configuration_dynamic(void) +void finalize_rrd_files(struct rrdengine_instance *ctx) { - unsigned read_num = (unsigned)config_get_number(CONFIG_SECTION_DB, "dbengine pages per extent", MAX_PAGES_PER_EXTENT); - if (read_num > 0 && read_num <= MAX_PAGES_PER_EXTENT) - rrdeng_pages_per_extent = read_num; - else { - error("Invalid dbengine pages per extent %u given. Using %u.", read_num, rrdeng_pages_per_extent); - config_set_number(CONFIG_SECTION_DB, "dbengine pages per extent", rrdeng_pages_per_extent); - } + return finalize_data_files(ctx); } void async_cb(uv_async_t *handle) @@ -1253,256 +1392,413 @@ void async_cb(uv_async_t *handle) debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle)); } -/* Flushes dirty pages when timer expires */ #define TIMER_PERIOD_MS (1000) -void timer_cb(uv_timer_t* handle) -{ - worker_is_busy(RRDENG_MAX_OPCODE + 1); - struct rrdengine_worker_config* wc = handle->data; - struct rrdengine_instance *ctx = wc->ctx; +static void *extent_read_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + EPDL *epdl = data; + epdl_find_extent_and_populate_pages(ctx, epdl, true); + return data; +} - uv_stop(handle->loop); - uv_update_time(handle->loop); - rrdeng_test_quota(wc); - debug(D_RRDENGINE, "%s: timeout reached.", __func__); - if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) { - /* There is free space so we can write to disk and we are not actively deleting dirty buffers */ - struct page_cache *pg_cache = &ctx->pg_cache; - unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark, - high_watermark; - - uv_rwlock_rdlock(&pg_cache->committed_page_index.lock); - nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages; - uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock); - producers = ctx->metric_API_max_producers; - /* are flushable pages more than 25% of the maximum page cache size */ - high_watermark = (ctx->max_cache_pages * 25LLU) / 100; - low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */ - - /* Flush more pages only if disk can keep up */ - if (wc->inflight_dirty_pages < high_watermark + producers) { - if (nr_committed_pages > producers && - /* committed to be written pages are more than the produced number */ - nr_committed_pages - producers > high_watermark) { - /* Flushing speed must increase to stop page cache from filling with dirty pages */ - bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE; - } - bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write); +static void epdl_populate_pages_asynchronously(struct rrdengine_instance *ctx, EPDL *epdl, STORAGE_PRIORITY priority) { + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_READ, epdl, NULL, priority, + rrdeng_enqueue_epdl_cmd, rrdeng_dequeue_epdl_cmd); +} - debug(D_RRDENGINE, "Flushing pages to disk."); - for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL); - bytes_written && (total_bytes < bytes_to_write); - total_bytes += bytes_written) { - bytes_written = do_flush_pages(wc, 0, NULL); - } +void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) { + pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_asynchronously, epdl_populate_pages_asynchronously); +} + +void epdl_populate_pages_synchronously(struct rrdengine_instance *ctx, EPDL *epdl, enum storage_priority priority __maybe_unused) { + epdl_find_extent_and_populate_pages(ctx, epdl, false); +} + +void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) { + pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously); +} + +#define MAX_RETRIES_TO_START_INDEX (100) +static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + unsigned count = 0; + worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX_WAIT); + + while (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && count++ < MAX_RETRIES_TO_START_INDEX) + sleep_usec(100 * USEC_PER_MS); + + if (count == MAX_RETRIES_TO_START_INDEX) { + worker_is_idle(); + return data; + } + + struct rrdengine_datafile *datafile = ctx->datafiles.first; + worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX); + count = 0; + while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) { + + netdata_spinlock_lock(&datafile->writers.spinlock); + bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true; + netdata_spinlock_unlock(&datafile->writers.spinlock); + + if(!available) + continue; + + if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) { + info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno); + pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type, + journalfile_migrate_to_v2_callback, (void *) datafile->journalfile); + count++; } + + datafile = datafile->next; + + if (unlikely(!ctx_is_available_for_queries(ctx))) + break; } - load_configuration_dynamic(); -#ifdef NETDATA_INTERNAL_CHECKS + + errno = 0; + internal_error(count, "DBENGINE: journal indexing done; %u files processed", count); + + worker_is_idle(); + + return data; +} + +static void after_do_cache_flush(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.flushes_running--; +} + +static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.evictions_running--; +} + +static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + ; +} + +static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED); + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); +} + +struct rrdeng_buffer_sizes rrdeng_get_buffer_sizes(void) { + return (struct rrdeng_buffer_sizes) { + .pgc = pgc_aral_overhead() + pgc_aral_structures(), + .mrg = mrg_aral_overhead() + mrg_aral_structures(), + .opcodes = aral_overhead(rrdeng_main.cmd_queue.ar) + aral_structures(rrdeng_main.cmd_queue.ar), + .handles = aral_overhead(rrdeng_main.handles.ar) + aral_structures(rrdeng_main.handles.ar), + .descriptors = aral_overhead(rrdeng_main.descriptors.ar) + aral_structures(rrdeng_main.descriptors.ar), + .wal = __atomic_load_n(&wal_globals.atomics.allocated, __ATOMIC_RELAXED) * (sizeof(WAL) + RRDENG_BLOCK_SIZE), + .workers = aral_overhead(rrdeng_main.work_cmd.ar), + .pdc = pdc_cache_size(), + .xt_io = aral_overhead(rrdeng_main.xt_io_descr.ar) + aral_structures(rrdeng_main.xt_io_descr.ar), + .xt_buf = extent_buffer_cache_size(), + .epdl = epdl_cache_size(), + .deol = deol_cache_size(), + .pd = pd_cache_size(), + +#ifdef PDC_USE_JULYL + .julyl = julyl_cache_size(), +#endif + }; +} + +static void after_cleanup(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) { + rrdeng_main.cleanup_running--; +} + +static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) { + worker_is_busy(UV_EVENT_DBENGINE_BUFFERS_CLEANUP); + + wal_cleanup1(); + extent_buffer_cleanup1(); + { - char buf[4096]; - debug(D_RRDENGINE, "%s", get_rrdeng_statistics(wc->ctx, buf, sizeof(buf))); + static time_t last_run_s = 0; + time_t now_s = now_monotonic_sec(); + if(now_s - last_run_s >= 10) { + last_run_s = now_s; + journalfile_v2_data_unmount_cleanup(now_s); + } } + +#ifdef PDC_USE_JULYL + julyl_cleanup1(); #endif + return data; +} + +void timer_cb(uv_timer_t* handle) { + worker_is_busy(RRDENG_TIMER_CB); + uv_stop(handle->loop); + uv_update_time(handle->loop); + + worker_set_metric(RRDENG_OPCODES_WAITING, (NETDATA_DOUBLE)rrdeng_main.cmd_queue.unsafe.waiting); + worker_set_metric(RRDENG_WORKS_DISPATCHED, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED)); + worker_set_metric(RRDENG_WORKS_EXECUTING, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.executing, __ATOMIC_RELAXED)); + + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + rrdeng_enq_cmd(NULL, RRDENG_OPCODE_CLEANUP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + worker_is_idle(); } -#define MAX_CMD_BATCH_SIZE (256) +static void dbengine_initialize_structures(void) { + pgc_and_mrg_initialize(); + + pdc_init(); + page_details_init(); + epdl_init(); + deol_init(); + rrdeng_cmd_queue_init(); + work_request_init(); + rrdeng_query_handle_init(); + page_descriptors_init(); + extent_buffer_init(); + dbengine_page_alloc_init(); + extent_io_descriptor_init(); +} -void rrdeng_worker(void* arg) -{ - worker_register("DBENGINE"); - worker_register_job_name(RRDENG_NOOP, "noop"); - worker_register_job_name(RRDENG_READ_PAGE, "page read"); - worker_register_job_name(RRDENG_READ_EXTENT, "extent read"); - worker_register_job_name(RRDENG_COMMIT_PAGE, "commit"); - worker_register_job_name(RRDENG_FLUSH_PAGES, "flush"); - worker_register_job_name(RRDENG_SHUTDOWN, "shutdown"); - worker_register_job_name(RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE, "page lru"); - worker_register_job_name(RRDENG_QUIESCE, "quiesce"); - worker_register_job_name(RRDENG_MAX_OPCODE, "cleanup"); - worker_register_job_name(RRDENG_MAX_OPCODE + 1, "timer"); - - struct rrdengine_worker_config* wc = arg; - struct rrdengine_instance *ctx = wc->ctx; - uv_loop_t* loop; - int shutdown, ret; - enum rrdeng_opcode opcode; - uv_timer_t timer_req; - struct rrdeng_cmd cmd; - unsigned cmd_batch_size; +bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) { + static bool spawned = false; + static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER; - rrdeng_init_cmd_queue(wc); + netdata_spinlock_lock(&spinlock); - loop = wc->loop = mallocz(sizeof(uv_loop_t)); - ret = uv_loop_init(loop); - if (ret) { - error("uv_loop_init(): %s", uv_strerror(ret)); - goto error_after_loop_init; - } - loop->data = wc; + if(!spawned) { + int ret; - ret = uv_async_init(wc->loop, &wc->async, async_cb); - if (ret) { - error("uv_async_init(): %s", uv_strerror(ret)); - goto error_after_async_init; - } - wc->async.data = wc; + ret = uv_loop_init(&rrdeng_main.loop); + if (ret) { + error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret)); + return false; + } + rrdeng_main.loop.data = &rrdeng_main; - wc->now_deleting_files = NULL; - wc->cleanup_thread_deleting_files = 0; + ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb); + if (ret) { + error("DBENGINE: uv_async_init(): %s", uv_strerror(ret)); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + rrdeng_main.async.data = &rrdeng_main; + + ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer); + if (ret) { + error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)&rrdeng_main.async, NULL); + fatal_assert(0 == uv_loop_close(&rrdeng_main.loop)); + return false; + } + rrdeng_main.timer.data = &rrdeng_main; - wc->now_invalidating_dirty_pages = NULL; - wc->cleanup_thread_invalidating_dirty_pages = 0; - wc->inflight_dirty_pages = 0; + dbengine_initialize_structures(); - /* dirty page flushing timer */ - ret = uv_timer_init(loop, &timer_req); - if (ret) { - error("uv_timer_init(): %s", uv_strerror(ret)); - goto error_after_timer_init; + fatal_assert(0 == uv_thread_create(&rrdeng_main.thread, dbengine_event_loop, &rrdeng_main)); + spawned = true; } - timer_req.data = wc; - wc->error = 0; - /* wake up initialization thread */ - completion_mark_complete(&ctx->rrdengine_completion); + netdata_spinlock_unlock(&spinlock); + return true; +} - fatal_assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); - shutdown = 0; - int set_name = 0; - while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) { +void dbengine_event_loop(void* arg) { + sanity_check(); + uv_thread_set_name_np(pthread_self(), "DBENGINE"); + service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true); + + worker_register("DBENGINE"); + + // opcode jobs + worker_register_job_name(RRDENG_OPCODE_NOOP, "noop"); + + worker_register_job_name(RRDENG_OPCODE_QUERY, "query"); + worker_register_job_name(RRDENG_OPCODE_EXTENT_WRITE, "extent write"); + worker_register_job_name(RRDENG_OPCODE_EXTENT_READ, "extent read"); + worker_register_job_name(RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open"); + worker_register_job_name(RRDENG_OPCODE_DATABASE_ROTATE, "db rotate"); + worker_register_job_name(RRDENG_OPCODE_JOURNAL_INDEX, "journal index"); + worker_register_job_name(RRDENG_OPCODE_FLUSH_INIT, "flush init"); + worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init"); + worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown"); + worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce"); + + worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode"); + + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_QUERY, "query cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE, "extent write cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_READ, "extent read cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_DATABASE_ROTATE, "db rotate cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_JOURNAL_INDEX, "journal index cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSH_INIT, "flush init cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EVICT_INIT, "evict init cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown cb"); + worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce cb"); + + // special jobs + worker_register_job_name(RRDENG_TIMER_CB, "timer"); + worker_register_job_name(RRDENG_FLUSH_TRANSACTION_BUFFER_CB, "transaction buffer flush cb"); + + worker_register_job_custom_metric(RRDENG_OPCODES_WAITING, "opcodes waiting", "opcodes", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(RRDENG_WORKS_DISPATCHED, "works dispatched", "works", WORKER_METRIC_ABSOLUTE); + worker_register_job_custom_metric(RRDENG_WORKS_EXECUTING, "works executing", "works", WORKER_METRIC_ABSOLUTE); + + struct rrdeng_main *main = arg; + enum rrdeng_opcode opcode; + struct rrdeng_cmd cmd; + main->tid = gettid(); + + fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS)); + + bool shutdown = false; + while (likely(!shutdown)) { worker_is_idle(); - uv_run(loop, UV_RUN_DEFAULT); - worker_is_busy(RRDENG_MAX_OPCODE); - rrdeng_cleanup_finished_threads(wc); + uv_run(&main->loop, UV_RUN_DEFAULT); /* wait for commands */ - cmd_batch_size = 0; do { - /* - * Avoid starving the loop when there are too many commands coming in. - * timer_cb will interrupt the loop again to allow serving more commands. - */ - if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE)) - break; - - cmd = rrdeng_deq_cmd(wc); + worker_is_busy(RRDENG_OPCODE_MAX); + cmd = rrdeng_deq_cmd(); opcode = cmd.opcode; - ++cmd_batch_size; - if(likely(opcode != RRDENG_NOOP)) - worker_is_busy(opcode); + worker_is_busy(opcode); switch (opcode) { - case RRDENG_NOOP: - /* the command queue was empty, do nothing */ - break; - case RRDENG_SHUTDOWN: - shutdown = 1; - break; - case RRDENG_QUIESCE: - ctx->drop_metrics_under_page_cache_pressure = 0; - ctx->quiesce = SET_QUIESCE; - fatal_assert(0 == uv_timer_stop(&timer_req)); - uv_close((uv_handle_t *)&timer_req, NULL); - while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all committed pages. */ + case RRDENG_OPCODE_EXTENT_READ: { + struct rrdengine_instance *ctx = cmd.ctx; + EPDL *epdl = cmd.data; + work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read); + break; } - wal_flush_transaction_buffer(wc); - if (!rrdeng_threads_alive(wc)) { - ctx->quiesce = QUIESCED; - completion_mark_complete(&ctx->rrdengine_completion); + + case RRDENG_OPCODE_QUERY: { + struct rrdengine_instance *ctx = cmd.ctx; + PDC *pdc = cmd.data; + work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query); + break; } - break; - case RRDENG_READ_PAGE: - do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0); - break; - case RRDENG_READ_EXTENT: - do_read_extent(wc, cmd.read_extent.page_cache_descr, cmd.read_extent.page_count, 1); - if (unlikely(!set_name)) { - set_name = 1; - uv_thread_set_name_np(ctx->worker_config.thread, "DBENGINE"); + + case RRDENG_OPCODE_EXTENT_WRITE: { + struct rrdengine_instance *ctx = cmd.ctx; + struct page_descr_with_data *base = cmd.data; + struct completion *completion = cmd.completion; // optional + work_dispatch(ctx, base, completion, opcode, extent_write_tp_worker, after_extent_write); + break; } - break; - case RRDENG_COMMIT_PAGE: - do_commit_transaction(wc, STORE_DATA, NULL); - break; - case RRDENG_FLUSH_PAGES: { - if (wc->now_invalidating_dirty_pages) { - /* Do not flush if the disk cannot keep up */ - completion_mark_complete(cmd.completion); - } else { - (void)do_flush_pages(wc, 1, cmd.completion); + + case RRDENG_OPCODE_FLUSHED_TO_OPEN: { + struct rrdengine_instance *ctx = cmd.ctx; + uv_fs_t *uv_fs_request = cmd.data; + struct extent_io_descriptor *xt_io_descr = uv_fs_request->data; + struct completion *completion = xt_io_descr->completion; + work_dispatch(ctx, uv_fs_request, completion, opcode, extent_flushed_to_open_tp_worker, after_extent_flushed_to_open); + break; + } + + case RRDENG_OPCODE_FLUSH_INIT: { + if(rrdeng_main.flushes_running < (size_t)(libuv_worker_threads / 4)) { + rrdeng_main.flushes_running++; + work_dispatch(NULL, NULL, NULL, opcode, cache_flush_tp_worker, after_do_cache_flush); + } + break; + } + + case RRDENG_OPCODE_EVICT_INIT: { + if(!rrdeng_main.evictions_running) { + rrdeng_main.evictions_running++; + work_dispatch(NULL, NULL, NULL, opcode, cache_evict_tp_worker, after_do_cache_evict); + } + break; + } + + case RRDENG_OPCODE_CLEANUP: { + if(!rrdeng_main.cleanup_running) { + rrdeng_main.cleanup_running++; + work_dispatch(NULL, NULL, NULL, opcode, cleanup_tp_worker, after_cleanup); + } + break; + } + + case RRDENG_OPCODE_JOURNAL_INDEX: { + struct rrdengine_instance *ctx = cmd.ctx; + struct rrdengine_datafile *datafile = cmd.data; + if(!__atomic_load_n(&ctx->atomic.migration_to_v2_running, __ATOMIC_RELAXED)) { + + __atomic_store_n(&ctx->atomic.migration_to_v2_running, true, __ATOMIC_RELAXED); + work_dispatch(ctx, datafile, NULL, opcode, journal_v2_indexing_tp_worker, after_journal_v2_indexing); + } + break; + } + + case RRDENG_OPCODE_DATABASE_ROTATE: { + struct rrdengine_instance *ctx = cmd.ctx; + if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && + ctx->datafiles.first->next != NULL && + ctx->datafiles.first->next->next != NULL && + rrdeng_ctx_exceeded_disk_quota(ctx)) { + + __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED); + work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate); + } + break; + } + + case RRDENG_OPCODE_CTX_POPULATE_MRG: { + struct rrdengine_instance *ctx = cmd.ctx; + struct completion *completion = cmd.completion; + work_dispatch(ctx, NULL, completion, opcode, populate_mrg_tp_worker, after_populate_mrg); + break; + } + + case RRDENG_OPCODE_CTX_QUIESCE: { + // a ctx will shutdown shortly + struct rrdengine_instance *ctx = cmd.ctx; + __atomic_store_n(&ctx->quiesce.enabled, true, __ATOMIC_RELEASE); + work_dispatch(ctx, NULL, NULL, opcode, + flush_all_hot_and_dirty_pages_of_section_tp_worker, + after_flush_all_hot_and_dirty_pages_of_section); + break; + } + + case RRDENG_OPCODE_CTX_SHUTDOWN: { + // a ctx is shutting down + struct rrdengine_instance *ctx = cmd.ctx; + struct completion *completion = cmd.completion; + work_dispatch(ctx, NULL, completion, opcode, ctx_shutdown_tp_worker, after_ctx_shutdown); + break; + } + + case RRDENG_OPCODE_NOOP: { + /* the command queue was empty, do nothing */ + break; + } + + // not opcodes + case RRDENG_OPCODE_MAX: + default: { + internal_fatal(true, "DBENGINE: unknown opcode"); + break; } - break; - case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE: - rrdeng_invalidate_oldest_committed(wc); - break; - } - default: - debug(D_RRDENGINE, "%s: default.", __func__); - break; } - } while (opcode != RRDENG_NOOP); + + } while (opcode != RRDENG_OPCODE_NOOP); } /* cleanup operations of the event loop */ - info("Shutting down RRD engine event loop for tier %d", ctx->tier); + info("DBENGINE: shutting down dbengine thread"); /* * uv_async_send after uv_close does not seem to crash in linux at the moment, * it is however undocumented behaviour and we need to be aware if this becomes * an issue in the future. */ - uv_close((uv_handle_t *)&wc->async, NULL); - - while (do_flush_pages(wc, 1, NULL)) { - ; /* Force flushing of all committed pages. */ - } - wal_flush_transaction_buffer(wc); - uv_run(loop, UV_RUN_DEFAULT); - - info("Shutting down RRD engine event loop for tier %d complete", ctx->tier); - /* TODO: don't let the API block by waiting to enqueue commands */ - uv_cond_destroy(&wc->cmd_cond); -/* uv_mutex_destroy(&wc->cmd_mutex); */ - fatal_assert(0 == uv_loop_close(loop)); - freez(loop); - + uv_close((uv_handle_t *)&main->async, NULL); + uv_timer_stop(&main->timer); + uv_close((uv_handle_t *)&main->timer, NULL); + uv_run(&main->loop, UV_RUN_DEFAULT); + uv_loop_close(&main->loop); worker_unregister(); - return; - -error_after_timer_init: - uv_close((uv_handle_t *)&wc->async, NULL); -error_after_async_init: - fatal_assert(0 == uv_loop_close(loop)); -error_after_loop_init: - freez(loop); - - wc->error = UV_EAGAIN; - /* wake up initialization thread */ - completion_mark_complete(&ctx->rrdengine_completion); - worker_unregister(); -} - -/* C entry point for development purposes - * make "LDFLAGS=-errdengine_main" - */ -void rrdengine_main(void) -{ - int ret; - struct rrdengine_instance *ctx; - - sanity_check(); - ret = rrdeng_init(NULL, &ctx, "/tmp", RRDENG_MIN_PAGE_CACHE_SIZE_MB, RRDENG_MIN_DISK_SPACE_MB, 0); - if (ret) { - exit(ret); - } - rrdeng_exit(ctx); - fprintf(stderr, "Hello world!"); - exit(0); } |