From aa2fe8ccbfcb117efa207d10229eeeac5d0f97c7 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 6 Feb 2023 17:11:30 +0100 Subject: Adding upstream version 1.38.0. Signed-off-by: Daniel Baumann --- database/engine/pagecache.c | 2054 ++++++++++++++++++++----------------------- 1 file changed, 931 insertions(+), 1123 deletions(-) (limited to 'database/engine/pagecache.c') diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c index 4f5da7084..b4902d784 100644 --- a/database/engine/pagecache.c +++ b/database/engine/pagecache.c @@ -3,1084 +3,836 @@ #include "rrdengine.h" -ARAL page_descr_aral = { - .requested_element_size = sizeof(struct rrdeng_page_descr), - .initial_elements = 20000, - .filename = "page_descriptors", - .cache_dir = &netdata_configured_cache_dir, - .use_mmap = false, - .internal.initialized = false -}; - -void rrdeng_page_descr_aral_go_singlethreaded(void) { - page_descr_aral.internal.lockless = true; -} -void rrdeng_page_descr_aral_go_multithreaded(void) { - page_descr_aral.internal.lockless = false; -} +MRG *main_mrg = NULL; +PGC *main_cache = NULL; +PGC *open_cache = NULL; +PGC *extent_cache = NULL; +struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {}; -struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) { - struct rrdeng_page_descr *descr; - descr = arrayalloc_mallocz(&page_descr_aral); - return descr; +static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused) +{ + // Release storage associated with the page + dbengine_page_free(entry.data, entry.size); } +static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) { + struct rrdengine_instance *ctx = (struct rrdengine_instance *) section; -void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) { - arrayalloc_freez(&page_descr_aral, descr); + // mark ctx as having flushing in progress + __atomic_add_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED); } -void rrdeng_page_descr_use_malloc(void) { - if(page_descr_aral.internal.initialized) - error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); - else - page_descr_aral.use_mmap = false; -} +static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) +{ + if(!entries) + return; -void rrdeng_page_descr_use_mmap(void) { - if(page_descr_aral.internal.initialized) - error("DBENGINE: cannot change ARAL allocation policy after it has been initialized."); - else - page_descr_aral.use_mmap = true; -} + struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section; -bool rrdeng_page_descr_is_mmap(void) { - return page_descr_aral.use_mmap; -} + size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx); -/* Forward declarations */ -static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx); + struct page_descr_with_data *base = NULL; -/* always inserts into tail */ -static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + for (size_t Index = 0 ; Index < entries; Index++) { + time_t start_time_s = entries_array[Index].start_time_s; + time_t end_time_s = entries_array[Index].end_time_s; + struct page_descr_with_data *descr = page_descriptor_get(); - if (likely(NULL != pg_cache->replaceQ.tail)) { - pg_cache_descr->prev = pg_cache->replaceQ.tail; - pg_cache->replaceQ.tail->next = pg_cache_descr; - } - if (unlikely(NULL == pg_cache->replaceQ.head)) { - pg_cache->replaceQ.head = pg_cache_descr; - } - pg_cache->replaceQ.tail = pg_cache_descr; -} - -static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next; + descr->id = mrg_metric_uuid(main_mrg, (METRIC *) entries_array[Index].metric_id); + descr->metric_id = entries_array[Index].metric_id; + descr->start_time_ut = start_time_s * USEC_PER_SEC; + descr->end_time_ut = end_time_s * USEC_PER_SEC; + descr->update_every_s = entries_array[Index].update_every_s; + descr->type = ctx->config.page_type; - prev = pg_cache_descr->prev; - next = pg_cache_descr->next; + descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point; - if (likely(NULL != prev)) { - prev->next = next; - } - if (likely(NULL != next)) { - next->prev = prev; - } - if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) { - pg_cache->replaceQ.head = next; - } - if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) { - pg_cache->replaceQ.tail = prev; - } - pg_cache_descr->prev = pg_cache_descr->next = NULL; -} + if(descr->page_length > entries_array[Index].size) { + descr->page_length = entries_array[Index].size; -void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - - uv_rwlock_wrlock(&pg_cache->replaceQ.lock); - pg_cache_replaceQ_insert_unsafe(ctx, descr); - uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); -} + error_limit_static_global_var(erl, 1, 0); + error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max."); + } -void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + descr->page = pgc_page_data(pages_array[Index]); + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next); - uv_rwlock_wrlock(&pg_cache->replaceQ.lock); - pg_cache_replaceQ_delete_unsafe(ctx, descr); - uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); -} -void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation"); + } - uv_rwlock_wrlock(&pg_cache->replaceQ.lock); - pg_cache_replaceQ_delete_unsafe(ctx, descr); - pg_cache_replaceQ_insert_unsafe(ctx, descr); - uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + struct completion completion; + completion_init(&completion); + rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_WRITE, base, &completion, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL); + completion_wait_for(&completion); + completion_destroy(&completion); } -struct rrdeng_page_descr *pg_cache_create_descr(void) +static void open_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused) { - struct rrdeng_page_descr *descr; - - descr = rrdeng_page_descr_mallocz(); - descr->page_length = 0; - descr->start_time_ut = INVALID_TIME; - descr->end_time_ut = INVALID_TIME; - descr->id = NULL; - descr->extent = NULL; - descr->pg_cache_descr_state = 0; - descr->pg_cache_descr = NULL; - descr->update_every_s = 0; - - return descr; + struct rrdengine_datafile *datafile = entry.data; + datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE); } -/* The caller must hold page descriptor lock. */ -void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr) +static void open_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) { - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - if (pg_cache_descr->waiters) - uv_cond_broadcast(&pg_cache_descr->cond); + ; } -void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +static void extent_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused) { - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_wake_up_waiters_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); + dbengine_extent_free(entry.data, entry.size); } -/* - * The caller must hold page descriptor lock. - * The lock will be released and re-acquired. The descriptor is not guaranteed - * to exist after this function returns. - */ -void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr) +static void extent_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused) { - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - - ++pg_cache_descr->waiters; - uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex); - --pg_cache_descr->waiters; + ; } -/* - * The caller must hold page descriptor lock. - * The lock will be released and re-acquired. The descriptor is not guaranteed - * to exist after this function returns. - * Returns UV_ETIMEDOUT if timeout_sec seconds pass. - */ -int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec) -{ - int ret; - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; +inline TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s) { + // page_first_time_s <= wanted_end_time_s && page_last_time_s >= wanted_start_time_s + + if(page_last_time_s < wanted_start_time_s) + return PAGE_IS_IN_THE_PAST; - ++pg_cache_descr->waiters; - ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC); - --pg_cache_descr->waiters; + if(page_first_time_s > wanted_end_time_s) + return PAGE_IS_IN_THE_FUTURE; - return ret; + return PAGE_IS_IN_RANGE; } -/* - * Returns page flags. - * The lock will be released and re-acquired. The descriptor is not guaranteed - * to exist after this function returns. - */ -unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) +static int journal_metric_uuid_compare(const void *key, const void *metric) { - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - unsigned long flags; - - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_wait_event_unsafe(descr); - flags = pg_cache_descr->flags; - rrdeng_page_descr_mutex_unlock(ctx, descr); - - return flags; + return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid); } -/* - * The caller must hold page descriptor lock. - */ -int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) -{ - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; +static inline struct page_details *pdc_find_page_for_time( + Pcvoid_t PArray, + time_t wanted_time_s, + size_t *gaps, + PDC_PAGE_STATUS mode, + PDC_PAGE_STATUS skip_list +) { + Word_t PIndexF = wanted_time_s, PIndexL = wanted_time_s; + Pvoid_t *PValueF, *PValueL; + struct page_details *pdF = NULL, *pdL = NULL; + bool firstF = true, firstL = true; + + PDC_PAGE_STATUS ignore_list = PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | skip_list; + + while ((PValueF = PDCJudyLFirstThenNext(PArray, &PIndexF, &firstF))) { + pdF = *PValueF; + + PDC_PAGE_STATUS status = __atomic_load_n(&pdF->status, __ATOMIC_ACQUIRE); + if (!(status & (ignore_list | mode))) + break; - if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) || - (exclusive_access && pg_cache_descr->refcnt)) { - return 0; + pdF = NULL; } - return 1; -} + while ((PValueL = PDCJudyLLastThenPrev(PArray, &PIndexL, &firstL))) { + pdL = *PValueL; -/* - * The caller must hold page descriptor lock. - * Gets a reference to the page descriptor. - * Returns 1 on success and 0 on failure. - */ -int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access) -{ - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + PDC_PAGE_STATUS status = __atomic_load_n(&pdL->status, __ATOMIC_ACQUIRE); + if(status & mode) { + // don't go all the way back to the beginning + // stop at the last processed + pdL = NULL; + break; + } - if (!pg_cache_can_get_unsafe(descr, exclusive_access)) - return 0; + if (!(status & ignore_list)) + break; - if (exclusive_access) - pg_cache_descr->flags |= RRD_PAGE_LOCKED; - ++pg_cache_descr->refcnt; + pdL = NULL; + } - return 1; -} + TIME_RANGE_COMPARE rcF = (pdF) ? is_page_in_time_range(pdF->first_time_s, pdF->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_FUTURE; + TIME_RANGE_COMPARE rcL = (pdL) ? is_page_in_time_range(pdL->first_time_s, pdL->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_PAST; -/* - * The caller must hold the page descriptor lock. - * This function may block doing cleanup. - */ -void pg_cache_put_unsafe(struct rrdeng_page_descr *descr) -{ - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + if (!pdF || pdF == pdL) { + // F is missing, or they are the same + // return L + (*gaps) += (rcL == PAGE_IS_IN_RANGE) ? 0 : 1; + return pdL; + } - pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; - if (0 == --pg_cache_descr->refcnt) { - pg_cache_wake_up_waiters_unsafe(descr); + if (!pdL) { + // L is missing + // return F + (*gaps) += (rcF == PAGE_IS_IN_RANGE) ? 0 : 1; + return pdF; } -} -/* - * This function may block doing cleanup. - */ -void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) -{ - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_put_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); -} + if (rcF == rcL) { + // both are on the same side, + // but they are different pages -/* The caller must hold the page cache lock */ -static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + switch (rcF) { + case PAGE_IS_IN_RANGE: + // pick the higher resolution + if (pdF->update_every_s && pdF->update_every_s < pdL->update_every_s) + return pdF; - pg_cache->populated_pages -= number; -} + if (pdL->update_every_s && pdL->update_every_s < pdF->update_every_s) + return pdL; -static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + // same resolution - pick the one that starts earlier + if (pdL->first_time_s < pdF->first_time_s) + return pdL; - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - pg_cache_release_pages_unsafe(ctx, number); - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); -} + return pdF; + break; -/* - * This function returns the maximum number of pages allowed in the page cache. - */ -unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx) -{ - return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers; -} + case PAGE_IS_IN_THE_FUTURE: + (*gaps)++; -/* - * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the - * number of pages below that number. - */ -unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx) -{ - return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; -} + // pick the one that starts earlier + if (pdL->first_time_s < pdF->first_time_s) + return pdL; -/* - * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page - * cache. - */ -unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx) -{ - /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */ - return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers; -} + return pdF; + break; -/* - * This function will block until it reserves #number populated pages. - * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit. - */ -static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - unsigned failures = 0; - const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */ - unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10; - - assert(number < ctx->max_cache_pages); - - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) - debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==", - number); - while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) { - - if (!pg_cache_try_evict_one_page_unsafe(ctx)) { - /* failed to evict */ - struct completion compl; - struct rrdeng_cmd cmd; - - ++failures; - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - - completion_init(&compl); - cmd.opcode = RRDENG_FLUSH_PAGES; - cmd.completion = &compl; - rrdeng_enq_cmd(&ctx->worker_config, &cmd); - /* wait for some pages to be flushed */ - debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__); - completion_wait_for(&compl); - completion_destroy(&compl); - - if (unlikely(failures > 1)) { - unsigned long slots, usecs_to_sleep; - /* exponential backoff */ - slots = random() % (2LU << MIN(failures, FAILURES_CEILING)); - usecs_to_sleep = slots * exp_backoff_slot_usec; - - if (usecs_to_sleep >= USEC_PER_SEC) - error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC); - - (void)sleep_usec(usecs_to_sleep); - } - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); + default: + case PAGE_IS_IN_THE_PAST: + (*gaps)++; + return NULL; + break; } } - pg_cache->populated_pages += number; - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); -} -/* - * This function will attempt to reserve #number populated pages. - * It may trigger evictions if the pg_cache_soft_limit() limit is hit. - * Returns 0 on failure and 1 on success. - */ -static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - unsigned count = 0; - int ret = 0; - - assert(number < ctx->max_cache_pages); - - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) { - debug(D_RRDENGINE, - "==Page cache full. Trying to reserve %u pages.==", - number); - do { - if (!pg_cache_try_evict_one_page_unsafe(ctx)) - break; - ++count; - } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1); - debug(D_RRDENGINE, "Evicted %u pages.", count); + if(rcF == PAGE_IS_IN_RANGE) { + // (*gaps) += 0; + return pdF; } - if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) { - pg_cache->populated_pages += number; - ret = 1; /* success */ + if(rcL == PAGE_IS_IN_RANGE) { + // (*gaps) += 0; + return pdL; } - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - return ret; -} + if(rcF == PAGE_IS_IN_THE_FUTURE) { + (*gaps)++; + return pdF; + } -/* The caller must hold the page cache and the page descriptor locks in that order */ -static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr) -{ - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; + if(rcL == PAGE_IS_IN_THE_FUTURE) { + (*gaps)++; + return pdL; + } - dbengine_page_free(pg_cache_descr->page); - pg_cache_descr->page = NULL; - pg_cache_descr->flags &= ~RRD_PAGE_POPULATED; - pg_cache_release_pages_unsafe(ctx, 1); - ++ctx->stats.pg_cache_evictions; + // impossible case + (*gaps)++; + return NULL; } -/* - * The caller must hold the page cache lock. - * Lock order: page cache -> replaceQ -> page descriptor - * This function iterates all pages and tries to evict one. - * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress, - * or it sets it to NULL if no write-back is in progress. - * - * Returns 1 on success and 0 on failure. - */ -static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - unsigned long old_flags; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr = NULL; +static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengine_instance *ctx, + time_t wanted_start_time_s, time_t wanted_end_time_s, + Pvoid_t *JudyL_page_array, size_t *cache_gaps, + bool open_cache_mode, PDC_PAGE_STATUS tags) { - uv_rwlock_wrlock(&pg_cache->replaceQ.lock); - for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) { - descr = pg_cache_descr->descr; + size_t pages_found_in_cache = 0; + Word_t metric_id = mrg_metric_id(main_mrg, metric); - rrdeng_page_descr_mutex_lock(ctx, descr); - old_flags = pg_cache_descr->flags; - if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) { - /* must evict */ - pg_cache_evict_unsafe(ctx, descr); - pg_cache_put_unsafe(descr); - pg_cache_replaceQ_delete_unsafe(ctx, descr); + time_t now_s = wanted_start_time_s; + time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric); - rrdeng_page_descr_mutex_unlock(ctx, descr); - uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + if(!dt_s) + dt_s = default_rrd_update_every; - rrdeng_try_deallocate_pg_cache_descr(ctx, descr); + time_t previous_page_end_time_s = now_s - dt_s; + bool first = true; - return 1; - } - rrdeng_page_descr_mutex_unlock(ctx, descr); - } - uv_rwlock_wrunlock(&pg_cache->replaceQ.lock); + do { + PGC_PAGE *page = pgc_page_get_and_acquire( + cache, (Word_t)ctx, (Word_t)metric_id, now_s, + (first) ? PGC_SEARCH_CLOSEST : PGC_SEARCH_NEXT); - /* failed to evict */ - return 0; -} + first = false; -/** - * Deletes a page from the database. - * Callers of this function need to make sure they're not deleting the same descriptor concurrently. - * @param ctx is the database instance. - * @param descr is the page descriptor. - * @param remove_dirty must be non-zero if the page to be deleted is dirty. - * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference. - * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not - * NULL. Otherwise, metric_id is not set. - * @return 1 if it's safe to delete the metric, 0 otherwise. - */ -uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty, - uint8_t is_exclusive_holder, uuid_t *metric_id) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct page_cache_descr *pg_cache_descr = NULL; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - int ret; - uint8_t can_delete_metric = 0; - - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - fatal_assert(NULL != PValue); - page_index = *PValue; - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - - uv_rwlock_wrlock(&page_index->lock); - ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); - if (unlikely(0 == ret)) { - uv_rwlock_wrunlock(&page_index->lock); - if (unlikely(debug_flags & D_RRDENGINE)) { - print_page_descr(descr); - } - goto destroy; - } - --page_index->page_count; - if (!page_index->writers && !page_index->page_count) { - can_delete_metric = 1; - if (metric_id) { - memcpy(metric_id, page_index->id, sizeof(uuid_t)); - } - } - uv_rwlock_wrunlock(&page_index->lock); - fatal_assert(1 == ret); - - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - ++ctx->stats.pg_cache_deletions; - --pg_cache->page_descriptors; - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - if (!is_exclusive_holder) { - /* If we don't hold an exclusive page reference get one */ - while (!pg_cache_try_get_unsafe(descr, 1)) { - debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - pg_cache_wait_event_unsafe(descr); + if(!page) { + if(previous_page_end_time_s < wanted_end_time_s) + (*cache_gaps)++; + + break; } - } - if (remove_dirty) { - pg_cache_descr->flags &= ~RRD_PAGE_DIRTY; - } else { - /* even a locked page could be dirty */ - while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) { - debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - pg_cache_wait_event_unsafe(descr); + + time_t page_start_time_s = pgc_page_start_time_s(page); + time_t page_end_time_s = pgc_page_end_time_s(page); + time_t page_update_every_s = pgc_page_update_every_s(page); + size_t page_length = pgc_page_data_size(cache, page); + + if(!page_update_every_s) + page_update_every_s = dt_s; + + if(is_page_in_time_range(page_start_time_s, page_end_time_s, wanted_start_time_s, wanted_end_time_s) != PAGE_IS_IN_RANGE) { + // not a useful page for this query + pgc_page_release(cache, page); + page = NULL; + + if(previous_page_end_time_s < wanted_end_time_s) + (*cache_gaps)++; + + break; } - } - rrdeng_page_descr_mutex_unlock(ctx, descr); - - while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) { - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__); - if (unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - pg_cache_wait_event_unsafe(descr); - } - if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { - /* only after locking can it be safely deleted from LRU */ - pg_cache_replaceQ_delete(ctx, descr); + if (page_start_time_s - previous_page_end_time_s > dt_s) + (*cache_gaps)++; + + Pvoid_t *PValue = PDCJudyLIns(JudyL_page_array, (Word_t) page_start_time_s, PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__ ); + + if (unlikely(*PValue)) { + struct page_details *pd = *PValue; + UNUSED(pd); + +// internal_error( +// pd->first_time_s != page_first_time_s || +// pd->last_time_s != page_last_time_s || +// pd->update_every_s != page_update_every_s, +// "DBENGINE: duplicate page with different retention in %s cache " +// "1st: %ld to %ld, ue %u, size %u " +// "2nd: %ld to %ld, ue %ld size %zu " +// "- ignoring the second", +// cache == open_cache ? "open" : "main", +// pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length, +// page_first_time_s, page_last_time_s, page_update_every_s, page_length); + + pgc_page_release(cache, page); + } + else { - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - pg_cache_evict_unsafe(ctx, descr); - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); - } - pg_cache_put(ctx, descr); - rrdeng_try_deallocate_pg_cache_descr(ctx, descr); - while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { - rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */ - (void)sleep_usec(1000); /* 1 msec */ - } -destroy: - rrdeng_page_descr_freez(descr); - pg_cache_update_metric_times(page_index); + internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache"); + internal_fatal(pgc_page_section(page) != (Word_t)ctx, "Wrong section in page found in cache"); - return can_delete_metric; -} + struct page_details *pd = page_details_get(); + pd->metric_id = metric_id; + pd->first_time_s = page_start_time_s; + pd->last_time_s = page_end_time_s; + pd->page_length = page_length; + pd->update_every_s = page_update_every_s; + pd->page = (open_cache_mode) ? NULL : page; + pd->status |= tags; -static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time) -{ - usec_t pg_start, pg_end; + if((pd->page)) { + pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED; - pg_start = descr->start_time_ut; - pg_end = descr->end_time_ut; + if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE) + pd->status |= PDC_PAGE_EMPTY; + } - return (pg_start < start_time && pg_end >= start_time) || - (pg_start >= start_time && pg_start <= end_time); -} + if(open_cache_mode) { + struct rrdengine_datafile *datafile = pgc_page_data(page); + if(datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) { // for pd + struct extent_io_data *xio = (struct extent_io_data *) pgc_page_custom_data(cache, page); + pd->datafile.ptr = pgc_page_data(page); + pd->datafile.file = xio->file; + pd->datafile.extent.pos = xio->pos; + pd->datafile.extent.bytes = xio->bytes; + pd->datafile.fileno = pd->datafile.ptr->fileno; + pd->status |= PDC_PAGE_DATAFILE_ACQUIRED | PDC_PAGE_DISK_PENDING; + } + else { + pd->status |= PDC_PAGE_FAILED | PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE; + } + pgc_page_release(cache, page); + } -static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time) -{ - return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut); -} + *PValue = pd; -/* The caller must hold the page index lock */ -static inline struct rrdeng_page_descr * - find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time) -{ - struct rrdeng_page_descr *descr = NULL; - Pvoid_t *PValue; - Word_t Index; - - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - return descr; + pages_found_in_cache++; } - } - Index = (Word_t)(start_time / USEC_PER_SEC); - PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - if (is_page_in_time_range(descr, start_time, end_time)) { - return descr; - } - } + // prepare for the next iteration + previous_page_end_time_s = page_end_time_s; - return NULL; -} + if(page_update_every_s > 0) + dt_s = page_update_every_s; -/* Update metric oldest and latest timestamps efficiently when adding new values */ -void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr) -{ - usec_t oldest_time = page_index->oldest_time_ut; - usec_t latest_time = page_index->latest_time_ut; + // we are going to as for the NEXT page + // so, set this to our first time + now_s = page_start_time_s; - if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) { - page_index->oldest_time_ut = descr->start_time_ut; - } - if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) { - page_index->latest_time_ut = descr->end_time_ut; - } + } while(now_s <= wanted_end_time_s); + + return pages_found_in_cache; } -/* Update metric oldest and latest timestamps when removing old values */ -void pg_cache_update_metric_times(struct pg_cache_page_index *page_index) -{ - Pvoid_t *firstPValue, *lastPValue; - Word_t firstIndex, lastIndex; - struct rrdeng_page_descr *descr; - usec_t oldest_time = INVALID_TIME; - usec_t latest_time = INVALID_TIME; - - uv_rwlock_rdlock(&page_index->lock); - /* Find first page in range */ - firstIndex = (Word_t)0; - firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0); - if (likely(NULL != firstPValue)) { - descr = *firstPValue; - oldest_time = descr->start_time_ut; - } - lastIndex = (Word_t)-1; - lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0); - if (likely(NULL != lastPValue)) { - descr = *lastPValue; - latest_time = descr->end_time_ut; - } - uv_rwlock_rdunlock(&page_index->lock); +static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) { - if (unlikely(NULL == firstPValue)) { - fatal_assert(NULL == lastPValue); - page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME; + time_t db_first_time_s, db_last_time_s, db_update_every_s; + mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s); + + if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE) return; - } - page_index->oldest_time_ut = oldest_time; - page_index->latest_time_ut = latest_time; + + PGC_ENTRY page_entry = { + .hot = false, + .section = (Word_t)ctx, + .metric_id = (Word_t)metric, + .start_time_s = MAX(start_time_s, db_first_time_s), + .end_time_s = MIN(end_time_s, db_last_time_s), + .update_every_s = 0, + .size = 0, + .data = DBENGINE_EMPTY_PAGE, + }; + + if(page_entry.start_time_s >= page_entry.end_time_s) + return; + + PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL); + pgc_page_release(main_cache, page); } -/* If index is NULL lookup by UUID (descr->id) */ -void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, - struct rrdeng_page_descr *descr) -{ - struct page_cache *pg_cache = &ctx->pg_cache; +static size_t list_has_time_gaps( + struct rrdengine_instance *ctx, + METRIC *metric, + Pvoid_t JudyL_page_array, + time_t wanted_start_time_s, + time_t wanted_end_time_s, + size_t *pages_total, + size_t *pages_found_pass4, + size_t *pages_pending, + size_t *pages_overlapping, + time_t *optimal_end_time_s, + bool populate_gaps +) { + // we will recalculate these, so zero them + *pages_pending = 0; + *pages_overlapping = 0; + *optimal_end_time_s = 0; + + bool first; Pvoid_t *PValue; - struct pg_cache_page_index *page_index; - unsigned long pg_cache_descr_state = descr->pg_cache_descr_state; - - if (0 != pg_cache_descr_state) { - /* there is page cache descriptor pre-allocated state */ - struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr; - - fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED); - if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { - pg_cache_reserve_pages(ctx, 1); - if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY)) - pg_cache_replaceQ_insert(ctx, descr); - } - } + Word_t this_page_start_time; + struct page_details *pd; + + size_t gaps = 0; + Word_t metric_id = mrg_metric_id(main_mrg, metric); + + // ------------------------------------------------------------------------ + // PASS 1: remove the preprocessing flags from the pages in PDC - if (unlikely(NULL == index)) { - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t)); - fatal_assert(NULL != PValue); - page_index = *PValue; - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - } else { - page_index = index; + first = true; + this_page_start_time = 0; + while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) { + pd = *PValue; + pd->status &= ~(PDC_PAGE_SKIP|PDC_PAGE_PREPROCESSED); } - uv_rwlock_wrlock(&page_index->lock); - PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0); - *PValue = descr; - ++page_index->page_count; - pg_cache_add_new_metric_time(page_index, descr); - uv_rwlock_wrunlock(&page_index->lock); - - uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock); - ++ctx->stats.pg_cache_insertions; - ++pg_cache->page_descriptors; - uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock); -} + // ------------------------------------------------------------------------ + // PASS 2: emulate processing to find the useful pages -usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; + time_t now_s = wanted_start_time_s; + time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric); + if(!dt_s) + dt_s = default_rrd_update_every; - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - return INVALID_TIME; - } + size_t pages_pass2 = 0, pages_pass3 = 0; + while((pd = pdc_find_page_for_time( + JudyL_page_array, now_s, &gaps, + PDC_PAGE_PREPROCESSED, 0))) { - uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); - if (NULL == descr) { - uv_rwlock_rdunlock(&page_index->lock); - return INVALID_TIME; - } - uv_rwlock_rdunlock(&page_index->lock); - return descr->start_time_ut; -} + pd->status |= PDC_PAGE_PREPROCESSED; + pages_pass2++; -/** - * Return page information for the first page before point_in_time that satisfies the filter. - * @param ctx DB context - * @param page_index page index of a metric - * @param point_in_time_ut the pages that are searched must be older than this timestamp - * @param filter decides if the page satisfies the caller's criteria - * @param page_info the result of the search is set in this pointer - */ -void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index, - usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter, - struct rrdeng_page_info *page_info) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - Pvoid_t *PValue; - Word_t Index; + if(pd->update_every_s) + dt_s = pd->update_every_s; - (void)pg_cache; - fatal_assert(NULL != page_index); + if(populate_gaps && pd->first_time_s > now_s) + pgc_inject_gap(ctx, metric, now_s, pd->first_time_s); - Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); - uv_rwlock_rdlock(&page_index->lock); - do { - PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0); - descr = unlikely(NULL == PValue) ? NULL : *PValue; - } while (descr != NULL && !filter(descr)); - if (unlikely(NULL == descr)) { - page_info->page_length = 0; - page_info->start_time_ut = INVALID_TIME; - page_info->end_time_ut = INVALID_TIME; - } else { - page_info->page_length = descr->page_length; - page_info->start_time_ut = descr->start_time_ut; - page_info->end_time_ut = descr->end_time_ut; + now_s = pd->last_time_s + dt_s; + if(now_s > wanted_end_time_s) { + *optimal_end_time_s = pd->last_time_s; + break; + } } - uv_rwlock_rdunlock(&page_index->lock); -} -/** - * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference. - * @param ctx DB context - * @param id lookup by UUID - * @param start_time_ut exact starting time in usec - * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. - * @return the page descriptor or NULL on failure. It can fail if: - * 1. The page is already allocated to the page cache. - * 2. It did not succeed to get a reference. - * 3. It did not succeed to reserve a spot in the page cache. - */ -struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id, - usec_t start_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - struct page_cache_descr *pg_cache_descr = NULL; - unsigned long flags; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - Word_t Index; + if(populate_gaps && now_s < wanted_end_time_s) + pgc_inject_gap(ctx, metric, now_s, wanted_end_time_s); - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); + // ------------------------------------------------------------------------ + // PASS 3: mark as skipped all the pages not useful - if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) { - /* Failed to find page or failed to reserve a spot in the cache */ - return NULL; - } + first = true; + this_page_start_time = 0; + while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) { + pd = *PValue; - uv_rwlock_rdlock(&page_index->lock); - Index = (Word_t)(start_time_ut / USEC_PER_SEC); - PValue = JudyLGet(page_index->JudyL_array, Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - } - if (NULL == PValue || 0 == descr->page_length) { - /* Failed to find non-empty page */ - uv_rwlock_rdunlock(&page_index->lock); + internal_fatal(pd->metric_id != metric_id, "pd has wrong metric_id"); - pg_cache_release_pages(ctx, 1); - return NULL; - } + if(!(pd->status & PDC_PAGE_PREPROCESSED)) { + (*pages_overlapping)++; + pd->status |= PDC_PAGE_SKIP; + pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING); + continue; + } - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - flags = pg_cache_descr->flags; - uv_rwlock_rdunlock(&page_index->lock); + pages_pass3++; - if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) { - /* Failed to get reference or page is already populated */ - rrdeng_page_descr_mutex_unlock(ctx, descr); + if(!pd->page) { + pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, (Word_t) metric_id, pd->first_time_s, PGC_SEARCH_EXACT); - pg_cache_release_pages(ctx, 1); - return NULL; + if(pd->page) { + (*pages_found_pass4)++; + + pd->status &= ~PDC_PAGE_DISK_PENDING; + pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4; + + if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE) + pd->status |= PDC_PAGE_EMPTY; + + } + else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) { + (*pages_pending)++; + + pd->status |= PDC_PAGE_DISK_PENDING; + + internal_fatal(pd->status & PDC_PAGE_SKIP, "page is disk pending and skipped"); + internal_fatal(!pd->datafile.ptr, "datafile is NULL"); + internal_fatal(!pd->datafile.extent.bytes, "datafile.extent.bytes zero"); + internal_fatal(!pd->datafile.extent.pos, "datafile.extent.pos is zero"); + internal_fatal(!pd->datafile.fileno, "datafile.fileno is zero"); + } + } + else { + pd->status &= ~PDC_PAGE_DISK_PENDING; + pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED); + } } - /* success */ - rrdeng_page_descr_mutex_unlock(ctx, descr); - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - return descr; + internal_fatal(pages_pass2 != pages_pass3, + "DBENGINE: page count does not match"); + + *pages_total = pages_pass2; + + return gaps; } -/** - * Searches for pages in a time range and triggers disk I/O if necessary and possible. - * Does not get a reference. - * @param ctx DB context - * @param id UUID - * @param start_time_ut inclusive starting time in usec - * @param end_time_ut inclusive ending time in usec - * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap - * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez(). - * If page_info_arrayp is set to NULL nothing was allocated. - * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID. - * @return the number of pages that overlap with the time range [start_time,end_time]. - */ -unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut, - struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES]; - struct page_cache_descr *pg_cache_descr = NULL; - unsigned i, j, k, preload_count, count, page_info_array_max_size; - unsigned long flags; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - Word_t Index; - uint8_t failed_to_reserve; +typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data); +static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) { + uuid_t *uuid = mrg_metric_uuid(main_mrg, metric); + Word_t metric_id = mrg_metric_id(main_mrg, metric); - fatal_assert(NULL != ret_page_indexp); + time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC); + time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC); - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - *ret_page_indexp = page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - *ret_page_indexp = NULL; - return 0; - } + size_t pages_found = 0; - uv_rwlock_rdlock(&page_index->lock); - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); - if (NULL == descr) { - uv_rwlock_rdunlock(&page_index->lock); - debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__); - *ret_page_indexp = NULL; - return 0; - } else { - Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC); - } - if (page_info_arrayp) { - page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); - *page_info_arrayp = mallocz(page_info_array_max_size); - } + uv_rwlock_rdlock(&ctx->datafiles.rwlock); + struct rrdengine_datafile *datafile; + for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) { + struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, + wanted_start_time_s, + wanted_end_time_s); + if (unlikely(!j2_header)) + continue; - for (count = 0, preload_count = 0 ; - descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ; - PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0), - descr = unlikely(NULL == PValue) ? NULL : *PValue) { - /* Iterate all pages in range */ + time_t journal_start_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC); - if (unlikely(0 == descr->page_length)) + // the datafile possibly contains useful data for this query + + size_t journal_metric_count = (size_t)j2_header->metric_count; + struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset); + struct journal_metric_list *uuid_entry = bsearch(uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_uuid_compare); + + if (unlikely(!uuid_entry)) { + // our UUID is not in this datafile + journalfile_v2_data_release(datafile->journalfile); continue; - if (page_info_arrayp) { - if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) { - page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info); - *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size); - } - (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut; - (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut; - (*page_info_arrayp)[count].page_length = descr->page_length; } - ++count; - - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - flags = pg_cache_descr->flags; - if (pg_cache_can_get_unsafe(descr, 0)) { - if (flags & RRD_PAGE_POPULATED) { - /* success */ - rrdeng_page_descr_mutex_unlock(ctx, descr); - debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); + + struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) j2_header + uuid_entry->page_offset); + struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header)); + struct journal_extent_list *extent_list = (void *)((uint8_t *)j2_header + j2_header->extent_offset); + uint32_t uuid_page_entries = page_list_header->entries; + + for (uint32_t index = 0; index < uuid_page_entries; index++) { + struct journal_page_list *page_entry_in_journal = &page_list[index]; + + time_t page_first_time_s = page_entry_in_journal->delta_start_s + journal_start_time_s; + time_t page_last_time_s = page_entry_in_journal->delta_end_s + journal_start_time_s; + + TIME_RANGE_COMPARE prc = is_page_in_time_range(page_first_time_s, page_last_time_s, wanted_start_time_s, wanted_end_time_s); + if(prc == PAGE_IS_IN_THE_PAST) continue; - } - } - if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - preload_array[preload_count++] = descr; - if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) { - rrdeng_page_descr_mutex_unlock(ctx, descr); + + if(prc == PAGE_IS_IN_THE_FUTURE) break; + + time_t page_update_every_s = page_entry_in_journal->update_every_s; + size_t page_length = page_entry_in_journal->page_length; + + if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item + // add this page to open cache + bool added = false; + struct extent_io_data ei = { + .pos = extent_list[page_entry_in_journal->extent_index].datafile_offset, + .bytes = extent_list[page_entry_in_journal->extent_index].datafile_size, + .page_length = page_length, + .file = datafile->file, + .fileno = datafile->fileno, + }; + + PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, (PGC_ENTRY) { + .hot = false, + .section = (Word_t) ctx, + .metric_id = metric_id, + .start_time_s = page_first_time_s, + .end_time_s = page_last_time_s, + .update_every_s = page_update_every_s, + .data = datafile, + .size = 0, + .custom_data = (uint8_t *) &ei, + }, &added); + + if(!added) + datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE); + + callback(page, callback_data); + + pgc_page_release(open_cache, page); + + pages_found++; } } - rrdeng_page_descr_mutex_unlock(ctx, descr); + journalfile_v2_data_release(datafile->journalfile); } - uv_rwlock_rdunlock(&page_index->lock); + uv_rwlock_rdunlock(&ctx->datafiles.rwlock); - failed_to_reserve = 0; - for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) { - struct rrdeng_cmd cmd; - struct rrdeng_page_descr *next; + return pages_found; +} - descr = preload_array[i]; - if (NULL == descr) { - continue; - } - if (!pg_cache_try_reserve_pages(ctx, 1)) { - failed_to_reserve = 1; - break; - } - cmd.opcode = RRDENG_READ_EXTENT; - cmd.read_extent.page_cache_descr[0] = descr; - /* don't use this page again */ - preload_array[i] = NULL; - for (j = 0, k = 1 ; j < preload_count ; ++j) { - next = preload_array[j]; - if (NULL == next) { - continue; - } - if (descr->extent == next->extent) { - /* same extent, consolidate */ - if (!pg_cache_try_reserve_pages(ctx, 1)) { - failed_to_reserve = 1; - break; - } - cmd.read_extent.page_cache_descr[k++] = next; - /* don't use this page again */ - preload_array[j] = NULL; - } - } - cmd.read_extent.page_count = k; - rrdeng_enq_cmd(&ctx->worker_config, &cmd); +void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) { + struct rrdengine_datafile *datafile = pgc_page_data(page); + + if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) // for pd + return; + + Pvoid_t *PValue = PDCJudyLIns(JudyL_pptr, pgc_page_start_time_s(page), PJE0); + if (!PValue || PValue == PJERR) + fatal("DBENGINE: corrupted judy array"); + + if (unlikely(*PValue)) { + datafile_release(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS); + return; } - if (failed_to_reserve) { - debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__); - for (i = 0 ; i < preload_count ; ++i) { - descr = preload_array[i]; - if (NULL == descr) { - continue; - } - pg_cache_put(ctx, descr); - } + + Word_t metric_id = pgc_page_metric(page); + + // let's add it to the judy + struct extent_io_data *ei = pgc_page_custom_data(open_cache, page); + struct page_details *pd = page_details_get(); + *PValue = pd; + + pd->datafile.extent.pos = ei->pos; + pd->datafile.extent.bytes = ei->bytes; + pd->datafile.file = ei->file; + pd->datafile.fileno = ei->fileno; + pd->first_time_s = pgc_page_start_time_s(page); + pd->last_time_s = pgc_page_end_time_s(page); + pd->datafile.ptr = datafile; + pd->page_length = ei->page_length; + pd->update_every_s = pgc_page_update_every_s(page); + pd->metric_id = metric_id; + pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED; +} + +// Return a judyL will all pages that have start_time_ut and end_time_ut +// Pvalue of the judy will be the end time for that page +// DBENGINE2: +#define time_delta(finish, pass) do { if(pass) { usec_t t = pass; (pass) = (finish) - (pass); (finish) = t; } } while(0) +static Pvoid_t get_page_list( + struct rrdengine_instance *ctx, + METRIC *metric, + usec_t start_time_ut, + usec_t end_time_ut, + size_t *pages_to_load, + time_t *optimal_end_time_s +) { + *optimal_end_time_s = 0; + + Pvoid_t JudyL_page_array = (Pvoid_t) NULL; + + time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC); + time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC); + + size_t pages_found_in_main_cache = 0, + pages_found_in_open_cache = 0, + pages_found_in_journals_v2 = 0, + pages_found_pass4 = 0, + pages_pending = 0, + pages_overlapping = 0, + pages_total = 0; + + size_t cache_gaps = 0, query_gaps = 0; + bool done_v2 = false, done_open = false; + + usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0; + + // -------------------------------------------------------------- + // PASS 1: Check what the main page cache has available + + pass1_ut = now_monotonic_usec(); + size_t pages_pass1 = get_page_list_from_pgc(main_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s, + &JudyL_page_array, &cache_gaps, + false, PDC_PAGE_SOURCE_MAIN_CACHE); + query_gaps += cache_gaps; + pages_found_in_main_cache += pages_pass1; + pages_total += pages_pass1; + + if(pages_found_in_main_cache && !cache_gaps) { + query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, + &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, + optimal_end_time_s, false); + + if (pages_total && !query_gaps) + goto we_are_done; } - if (!preload_count) { - /* no such page */ - debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__); + + // -------------------------------------------------------------- + // PASS 2: Check what the open journal page cache has available + // these will be loaded from disk + + pass2_ut = now_monotonic_usec(); + size_t pages_pass2 = get_page_list_from_pgc(open_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s, + &JudyL_page_array, &cache_gaps, + true, PDC_PAGE_SOURCE_OPEN_CACHE); + query_gaps += cache_gaps; + pages_found_in_open_cache += pages_pass2; + pages_total += pages_pass2; + done_open = true; + + if(pages_found_in_open_cache) { + query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, + &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, + optimal_end_time_s, false); + + if (pages_total && !query_gaps) + goto we_are_done; } - if (unlikely(0 == count && page_info_arrayp)) { - freez(*page_info_arrayp); - *page_info_arrayp = NULL; + + // -------------------------------------------------------------- + // PASS 3: Check Journal v2 to fill the gaps + + pass3_ut = now_monotonic_usec(); + size_t pages_pass3 = get_page_list_from_journal_v2(ctx, metric, start_time_ut, end_time_ut, + add_page_details_from_journal_v2, &JudyL_page_array); + pages_found_in_journals_v2 += pages_pass3; + pages_total += pages_pass3; + done_v2 = true; + + // -------------------------------------------------------------- + // PASS 4: Check the cache again + // and calculate the time gaps in the query + // THIS IS REQUIRED AFTER JOURNAL V2 LOOKUP + + pass4_ut = now_monotonic_usec(); + query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s, + &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping, + optimal_end_time_s, true); + +we_are_done: + + if(pages_to_load) + *pages_to_load = pages_pending; + + usec_t finish_ut = now_monotonic_usec(); + time_delta(finish_ut, pass4_ut); + time_delta(finish_ut, pass3_ut); + time_delta(finish_ut, pass2_ut); + time_delta(finish_ut, pass1_ut); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_main_cache_lookup, pass1_ut, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_open_cache_lookup, pass2_ut, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_journal_v2_lookup, pass3_ut, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_pass4_lookup, pass4_ut, __ATOMIC_RELAXED); + + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_planned_with_gaps, (query_gaps) ? 1 : 0, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_open, done_open ? 1 : 0, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_journal_v2, done_v2 ? 1 : 0, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_total, pages_total, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_open_cache, pages_found_in_open_cache, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, pages_pending, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED); + + return JudyL_page_array; +} + +inline void rrdeng_prep_wait(PDC *pdc) { + if (unlikely(pdc && !pdc->prep_done)) { + usec_t started_ut = now_monotonic_usec(); + completion_wait_for(&pdc->prep_completion); + pdc->prep_done = true; + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_wait_for_prep, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED); } - return count; } -/* - * Searches for a page and gets a reference. - * When point_in_time is INVALID_TIME get any page. - * If index is NULL lookup by UUID (id). - */ -struct rrdeng_page_descr * - pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t point_in_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - struct page_cache_descr *pg_cache_descr = NULL; - unsigned long flags; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - Word_t Index; - uint8_t page_not_in_cache; - - if (unlikely(NULL == index)) { - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; - } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - return NULL; - } - } else { - page_index = index; +void rrdeng_prep_query(PDC *pdc) { + size_t pages_to_load = 0; + pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric, + pdc->start_time_s * USEC_PER_SEC, + pdc->end_time_s * USEC_PER_SEC, + &pages_to_load, + &pdc->optimal_end_time_s); + + if (pages_to_load && pdc->page_list_JudyL) { + pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work() + usec_t start_ut = now_monotonic_usec(); +// if(likely(priority == STORAGE_PRIORITY_BEST_EFFORT)) +// dbengine_load_page_list_directly(ctx, handle->pdc); +// else + pdc_route_asynchronously(pdc->ctx, pdc); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); } - pg_cache_reserve_pages(ctx, 1); - - page_not_in_cache = 0; - uv_rwlock_rdlock(&page_index->lock); - while (1) { - Index = (Word_t)(point_in_time_ut / USEC_PER_SEC); - PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0); - if (likely(NULL != PValue)) { - descr = *PValue; - } - if (NULL == PValue || - 0 == descr->page_length || - (INVALID_TIME != point_in_time_ut && - !is_point_in_time_in_page(descr, point_in_time_ut))) { - /* non-empty page not found */ - uv_rwlock_rdunlock(&page_index->lock); - - pg_cache_release_pages(ctx, 1); - return NULL; - } - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - flags = pg_cache_descr->flags; - if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { - /* success */ - rrdeng_page_descr_mutex_unlock(ctx, descr); - debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); - break; - } - if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - struct rrdeng_cmd cmd; + else + completion_mark_complete(&pdc->page_completion); - uv_rwlock_rdunlock(&page_index->lock); + completion_mark_complete(&pdc->prep_completion); - cmd.opcode = RRDENG_READ_PAGE; - cmd.read_page.page_cache_descr = descr; - rrdeng_enq_cmd(&ctx->worker_config, &cmd); + pdc_release_and_destroy_if_unreferenced(pdc, true, true); +} - debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { - pg_cache_wait_event_unsafe(descr); - } - /* success */ - /* Downgrade exclusive reference to allow other readers */ - pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; - pg_cache_wake_up_waiters_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - return descr; - } - uv_rwlock_rdunlock(&page_index->lock); - debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - if (!(flags & RRD_PAGE_POPULATED)) - page_not_in_cache = 1; - pg_cache_wait_event_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - - /* reset scan to find again */ - uv_rwlock_rdlock(&page_index->lock); - } - uv_rwlock_rdunlock(&page_index->lock); +/** + * Searches for pages in a time range and triggers disk I/O if necessary and possible. + * @param ctx DB context + * @param handle query handle as initialized + * @param start_time_ut inclusive starting time in usec + * @param end_time_ut inclusive ending time in usec + * @return 1 / 0 (pages found or not found) + */ +void pg_cache_preload(struct rrdeng_query_handle *handle) { + if (unlikely(!handle || !handle->metric)) + return; - if (!(flags & RRD_PAGE_DIRTY)) - pg_cache_replaceQ_set_hot(ctx, descr); - pg_cache_release_pages(ctx, 1); - if (page_not_in_cache) - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - else - rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); - return descr; + __atomic_add_fetch(&handle->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED); + handle->pdc = pdc_get(); + handle->pdc->metric = mrg_metric_dup(main_mrg, handle->metric); + handle->pdc->start_time_s = handle->start_time_s; + handle->pdc->end_time_s = handle->end_time_s; + handle->pdc->priority = handle->priority; + handle->pdc->optimal_end_time_s = handle->end_time_s; + handle->pdc->ctx = handle->ctx; + handle->pdc->refcount = 1; + netdata_spinlock_init(&handle->pdc->refcount_spinlock); + completion_init(&handle->pdc->prep_completion); + completion_init(&handle->pdc->page_completion); + + if(ctx_is_available_for_queries(handle->ctx)) { + handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread + rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL); + } + else { + completion_mark_complete(&handle->pdc->prep_completion); + completion_mark_complete(&handle->pdc->page_completion); + } } /* @@ -1088,226 +840,282 @@ struct rrdeng_page_descr * * start_time and end_time are inclusive. * If index is NULL lookup by UUID (id). */ -struct rrdeng_page_descr * -pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id, - usec_t start_time_ut, usec_t end_time_ut) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - struct rrdeng_page_descr *descr = NULL; - struct page_cache_descr *pg_cache_descr = NULL; - unsigned long flags; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index = NULL; - uint8_t page_not_in_cache; - - if (unlikely(NULL == index)) { - uv_rwlock_rdlock(&pg_cache->metrics_index.lock); - PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t)); - if (likely(NULL != PValue)) { - page_index = *PValue; +struct pgc_page *pg_cache_lookup_next( + struct rrdengine_instance *ctx, + PDC *pdc, + time_t now_s, + time_t last_update_every_s, + size_t *entries +) { + if (unlikely(!pdc)) + return NULL; + + rrdeng_prep_wait(pdc); + + if (unlikely(!pdc->page_list_JudyL)) + return NULL; + + usec_t start_ut = now_monotonic_usec(); + size_t gaps = 0; + bool waited = false, preloaded; + PGC_PAGE *page = NULL; + + while(!page) { + bool page_from_pd = false; + preloaded = false; + struct page_details *pd = pdc_find_page_for_time( + pdc->page_list_JudyL, now_s, &gaps, + PDC_PAGE_PROCESSED, PDC_PAGE_EMPTY); + + if (!pd) + break; + + page = pd->page; + page_from_pd = true; + preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED); + if(!page) { + if(!completion_is_done(&pdc->page_completion)) { + page = pgc_page_get_and_acquire(main_cache, (Word_t)ctx, + pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT); + page_from_pd = false; + preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED); + } + + if(!page) { + pdc->completed_jobs = + completion_wait_for_a_job(&pdc->page_completion, pdc->completed_jobs); + + page = pd->page; + page_from_pd = true; + preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED); + waited = true; + } } - uv_rwlock_rdunlock(&pg_cache->metrics_index.lock); - if (NULL == PValue) { - return NULL; + + if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE) + pdc_page_status_set(pd, PDC_PAGE_EMPTY); + + if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) { + page = NULL; + continue; } - } else { - page_index = index; - } - pg_cache_reserve_pages(ctx, 1); - - page_not_in_cache = 0; - uv_rwlock_rdlock(&page_index->lock); - int retry_count = 0; - while (1) { - descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut); - if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) { - /* non-empty page not found */ - if (retry_count == default_rrdeng_page_fetch_retries) - error_report("Page cache timeout while waiting for page %p : returning FAIL", descr); - uv_rwlock_rdunlock(&page_index->lock); - - pg_cache_release_pages(ctx, 1); - return NULL; + + // we now have page and is not empty + + time_t page_start_time_s = pgc_page_start_time_s(page); + time_t page_end_time_s = pgc_page_end_time_s(page); + time_t page_update_every_s = pgc_page_update_every_s(page); + size_t page_length = pgc_page_data_size(main_cache, page); + + if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED); + pgc_page_to_clean_evict_or_release(main_cache, page); + pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED); + pd->page = page = NULL; + continue; } - rrdeng_page_descr_mutex_lock(ctx, descr); - pg_cache_descr = descr->pg_cache_descr; - flags = pg_cache_descr->flags; - if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) { - /* success */ - rrdeng_page_descr_mutex_unlock(ctx, descr); - debug(D_RRDENGINE, "%s: Page was found in memory.", __func__); - break; + else if(page_length > RRDENG_BLOCK_SIZE) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED); + pgc_page_to_clean_evict_or_release(main_cache, page); + pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED); + pd->page = page = NULL; + continue; } - if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) { - struct rrdeng_cmd cmd; + else { + if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED); + pd->update_every_s = page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s); + } - uv_rwlock_rdunlock(&page_index->lock); + size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx)); + size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s); + if(unlikely(entries_by_size < entries_by_time)) { + time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s); + pd->last_time_s = page_end_time_s = pgc_page_fix_end_time_s(page, fixed_page_end_time_s); + entries_by_time = (page_end_time_s - (page_start_time_s - page_update_every_s)) / page_update_every_s; - cmd.opcode = RRDENG_READ_PAGE; - cmd.read_page.page_cache_descr = descr; - rrdeng_enq_cmd(&ctx->worker_config, &cmd); + internal_fatal(entries_by_size != entries_by_time, "DBENGINE: wrong entries by time again!"); - debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) { - pg_cache_wait_event_unsafe(descr); + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_entries_fixed, 1, __ATOMIC_RELAXED); } - /* success */ - /* Downgrade exclusive reference to allow other readers */ - pg_cache_descr->flags &= ~RRD_PAGE_LOCKED; - pg_cache_wake_up_waiters_unsafe(descr); - rrdeng_page_descr_mutex_unlock(ctx, descr); - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - return descr; + *entries = entries_by_time; } - uv_rwlock_rdunlock(&page_index->lock); - debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__); - if(unlikely(debug_flags & D_RRDENGINE)) - print_page_cache_descr(descr, "", true); - if (!(flags & RRD_PAGE_POPULATED)) - page_not_in_cache = 1; - - if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) { - error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count); - ++retry_count; + + if(unlikely(page_end_time_s < now_s)) { + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_past_time_skipped, 1, __ATOMIC_RELAXED); + pgc_page_release(main_cache, page); + pdc_page_status_set(pd, PDC_PAGE_SKIP | PDC_PAGE_RELEASED); + pd->page = page = NULL; + continue; } - rrdeng_page_descr_mutex_unlock(ctx, descr); - /* reset scan to find again */ - uv_rwlock_rdlock(&page_index->lock); + if(page_from_pd) + // PDC_PAGE_RELEASED is for pdc_destroy() to not release the page twice - the caller will release it + pdc_page_status_set(pd, PDC_PAGE_RELEASED | PDC_PAGE_PROCESSED); + else + pdc_page_status_set(pd, PDC_PAGE_PROCESSED); } - uv_rwlock_rdunlock(&page_index->lock); - if (!(flags & RRD_PAGE_DIRTY)) - pg_cache_replaceQ_set_hot(ctx, descr); - pg_cache_release_pages(ctx, 1); - if (page_not_in_cache) - rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1); - else - rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1); - return descr; -} + if(gaps && !pdc->executed_with_gaps) + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_executed_with_gaps, 1, __ATOMIC_RELAXED); + pdc->executed_with_gaps = +gaps; -struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx) -{ - struct pg_cache_page_index *page_index; - - page_index = mallocz(sizeof(*page_index)); - page_index->JudyL_array = (Pvoid_t) NULL; - uuid_copy(page_index->id, *id); - fatal_assert(0 == uv_rwlock_init(&page_index->lock)); - page_index->oldest_time_ut = INVALID_TIME; - page_index->latest_time_ut = INVALID_TIME; - page_index->prev = NULL; - page_index->page_count = 0; - page_index->refcount = 0; - page_index->writers = 0; - page_index->ctx = ctx; - page_index->latest_update_every_s = default_rrd_update_every; - - return page_index; -} + if(page) { + if(waited) + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_loaded, 1, __ATOMIC_RELAXED); + else + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_loaded, 1, __ATOMIC_RELAXED); + } + else { + if(waited) + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_failed, 1, __ATOMIC_RELAXED); + else + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_failed, 1, __ATOMIC_RELAXED); + } -static void init_metrics_index(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + if(waited) { + if(preloaded) + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); + else + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); + } + else { + if(preloaded) + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); + else + __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED); + } - pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL; - pg_cache->metrics_index.last_page_index = NULL; - fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock)); + return page; } -static void init_replaceQ(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; +void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s, + struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) { + + if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item + fatal("DBENGINE: cannot acquire datafile to put page in open cache"); + + struct extent_io_data ext_io_data = { + .file = datafile->file, + .fileno = datafile->fileno, + .pos = extent_offset, + .bytes = extent_size, + .page_length = page_length + }; + + PGC_ENTRY page_entry = { + .hot = true, + .section = section, + .metric_id = metric_id, + .start_time_s = start_time_s, + .end_time_s = end_time_s, + .update_every_s = update_every_s, + .size = 0, + .data = datafile, + .custom_data = (uint8_t *) &ext_io_data, + }; + + internal_fatal(!datafile->fileno, "DBENGINE: datafile supplied does not have a number"); + + bool added = true; + PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, page_entry, &added); + int tries = 100; + while(!added && page_entry.end_time_s > pgc_page_end_time_s(page) && tries--) { + pgc_page_to_clean_evict_or_release(open_cache, page); + page = pgc_page_add_and_acquire(open_cache, page_entry, &added); + } - pg_cache->replaceQ.head = NULL; - pg_cache->replaceQ.tail = NULL; - fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock)); -} + if(!added) { + datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE); -static void init_committed_page_index(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; + internal_fatal(page_entry.end_time_s > pgc_page_end_time_s(page), + "DBENGINE: cannot add longer page to open cache"); + } - pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL; - fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock)); - pg_cache->committed_page_index.latest_corr_id = 0; - pg_cache->committed_page_index.nr_committed_pages = 0; + pgc_page_release(open_cache, (PGC_PAGE *)page); } -void init_page_cache(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; +size_t dynamic_open_cache_size(void) { + size_t main_cache_size = pgc_get_wanted_cache_size(main_cache); + size_t target_size = main_cache_size / 100 * 5; - pg_cache->page_descriptors = 0; - pg_cache->populated_pages = 0; - fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock)); + if(target_size < 2 * 1024 * 1024) + target_size = 2 * 1024 * 1024; - init_metrics_index(ctx); - init_replaceQ(ctx); - init_committed_page_index(ctx); + return target_size; } -void free_page_cache(struct rrdengine_instance *ctx) -{ - struct page_cache *pg_cache = &ctx->pg_cache; - Pvoid_t *PValue; - struct pg_cache_page_index *page_index, *prev_page_index; - Word_t Index; - struct rrdeng_page_descr *descr; - struct page_cache_descr *pg_cache_descr; - - // if we are exiting, the OS will recover all memory so do not slow down the shutdown process - // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS - // This affects the reporting of dbengine statistics which are available in real time - // via the /api/v1/dbengine_stats endpoint -#ifndef NETDATA_DBENGINE_FREE - if (netdata_exit) - return; -#endif - Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0; - - /* Free committed page index */ - pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0); - fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array); - - for (page_index = pg_cache->metrics_index.last_page_index ; - page_index != NULL ; - page_index = prev_page_index) { +size_t dynamic_extent_cache_size(void) { + size_t main_cache_size = pgc_get_wanted_cache_size(main_cache); + size_t target_size = main_cache_size / 100 * 5; - prev_page_index = page_index->prev; + if(target_size < 3 * 1024 * 1024) + target_size = 3 * 1024 * 1024; - /* Find first page in range */ - Index = (Word_t) 0; - PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0); - descr = unlikely(NULL == PValue) ? NULL : *PValue; - - while (descr != NULL) { - /* Iterate all page descriptors of this metric */ + return target_size; +} - if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) { - /* Check rrdenglocking.c */ - pg_cache_descr = descr->pg_cache_descr; - if (pg_cache_descr->flags & RRD_PAGE_POPULATED) { - dbengine_page_free(pg_cache_descr->page); - } - rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr); - } - rrdeng_page_descr_freez(descr); +void pgc_and_mrg_initialize(void) +{ + main_mrg = mrg_create(); - PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0); - descr = unlikely(NULL == PValue) ? NULL : *PValue; - } + size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL; + size_t main_cache_size = (target_cache_size / 100) * 95; + size_t open_cache_size = 0; + size_t extent_cache_size = (target_cache_size / 100) * 5; - /* Free page index */ - pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0); - fatal_assert(NULL == page_index->JudyL_array); - freez(page_index); + if(extent_cache_size < 3 * 1024 * 1024) { + extent_cache_size = 3 * 1024 * 1024; + main_cache_size = target_cache_size - extent_cache_size; } - /* Free metrics index */ - metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0); - fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array); - info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes); + + main_cache = pgc_create( + "main_cache", + main_cache_size, + main_cache_free_clean_page_callback, + (size_t) rrdeng_pages_per_extent, + main_cache_flush_dirty_page_init_callback, + main_cache_flush_dirty_page_callback, + 10, + 10240, // if there are that many threads, evict so many at once! + 1000, // + 5, // don't delay too much other threads + PGC_OPTIONS_AUTOSCALE, // AUTOSCALE = 2x max hot pages + 0, // 0 = as many as the system cpus + 0 + ); + + open_cache = pgc_create( + "open_cache", + open_cache_size, // the default is 1MB + open_cache_free_clean_page_callback, + 1, + NULL, + open_cache_flush_dirty_page_callback, + 10, + 10240, // if there are that many threads, evict that many at once! + 1000, // + 3, // don't delay too much other threads + PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE, + 0, // 0 = as many as the system cpus + sizeof(struct extent_io_data) + ); + pgc_set_dynamic_target_cache_size_callback(open_cache, dynamic_open_cache_size); + + extent_cache = pgc_create( + "extent_cache", + extent_cache_size, + extent_cache_free_clean_page_callback, + 1, + NULL, + extent_cache_flush_dirty_page_callback, + 5, + 10, // it will lose up to that extents at once! + 100, // + 2, // don't delay too much other threads + PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE, + 0, // 0 = as many as the system cpus + 0 + ); + pgc_set_dynamic_target_cache_size_callback(extent_cache, dynamic_extent_cache_size); } -- cgit v1.2.3