summary refs log tree commit diff stats
path: root/database/engine/pagecache.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/engine/pagecache.c')
-rw-r--r-- database/engine/pagecache.c 2054
1 files changed, 931 insertions, 1123 deletions
diff --git a/database/engine/pagecache.c b/database/engine/pagecache.c
index 4f5da7084..b4902d784 100644
--- a/database/engine/pagecache.c
+++ b/database/engine/pagecache.c
@@ -3,1084 +3,836 @@
#include "rrdengine.h"
-ARAL page_descr_aral = {
- .requested_element_size = sizeof(struct rrdeng_page_descr),
- .initial_elements = 20000,
- .filename = "page_descriptors",
- .cache_dir = &netdata_configured_cache_dir,
- .use_mmap = false,
- .internal.initialized = false
-};
-
-void rrdeng_page_descr_aral_go_singlethreaded(void) {
- page_descr_aral.internal.lockless = true;
-}
-void rrdeng_page_descr_aral_go_multithreaded(void) {
- page_descr_aral.internal.lockless = false;
-}
+MRG *main_mrg = NULL;
+PGC *main_cache = NULL;
+PGC *open_cache = NULL;
+PGC *extent_cache = NULL;
+struct rrdeng_cache_efficiency_stats rrdeng_cache_efficiency_stats = {};
-struct rrdeng_page_descr *rrdeng_page_descr_mallocz(void) {
- struct rrdeng_page_descr *descr;
- descr = arrayalloc_mallocz(&page_descr_aral);
- return descr;
+static void main_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
+{
+ // Release storage associated with the page
+ dbengine_page_free(entry.data, entry.size);
}
+static void main_cache_flush_dirty_page_init_callback(PGC *cache __maybe_unused, Word_t section) {
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) section;
-void rrdeng_page_descr_freez(struct rrdeng_page_descr *descr) {
- arrayalloc_freez(&page_descr_aral, descr);
+ // mark ctx as having a flush in progress
+ __atomic_add_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
}
-void rrdeng_page_descr_use_malloc(void) {
- if(page_descr_aral.internal.initialized)
- error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
- else
- page_descr_aral.use_mmap = false;
-}
+static void main_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
+{
+ if(!entries)
+ return;
-void rrdeng_page_descr_use_mmap(void) {
- if(page_descr_aral.internal.initialized)
- error("DBENGINE: cannot change ARAL allocation policy after it has been initialized.");
- else
- page_descr_aral.use_mmap = true;
-}
+ struct rrdengine_instance *ctx = (struct rrdengine_instance *) entries_array[0].section;
-bool rrdeng_page_descr_is_mmap(void) {
- return page_descr_aral.use_mmap;
-}
+ size_t bytes_per_point = CTX_POINT_SIZE_BYTES(ctx);
-/* Forward declarations */
-static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx);
+ struct page_descr_with_data *base = NULL;
-/* always inserts into tail */
-static inline void pg_cache_replaceQ_insert_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ for (size_t Index = 0 ; Index < entries; Index++) {
+ time_t start_time_s = entries_array[Index].start_time_s;
+ time_t end_time_s = entries_array[Index].end_time_s;
+ struct page_descr_with_data *descr = page_descriptor_get();
- if (likely(NULL != pg_cache->replaceQ.tail)) {
- pg_cache_descr->prev = pg_cache->replaceQ.tail;
- pg_cache->replaceQ.tail->next = pg_cache_descr;
- }
- if (unlikely(NULL == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = pg_cache_descr;
- }
- pg_cache->replaceQ.tail = pg_cache_descr;
-}
-
-static inline void pg_cache_replaceQ_delete_unsafe(struct rrdengine_instance *ctx,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr, *prev, *next;
+ descr->id = mrg_metric_uuid(main_mrg, (METRIC *) entries_array[Index].metric_id);
+ descr->metric_id = entries_array[Index].metric_id;
+ descr->start_time_ut = start_time_s * USEC_PER_SEC;
+ descr->end_time_ut = end_time_s * USEC_PER_SEC;
+ descr->update_every_s = entries_array[Index].update_every_s;
+ descr->type = ctx->config.page_type;
- prev = pg_cache_descr->prev;
- next = pg_cache_descr->next;
+ descr->page_length = (end_time_s - (start_time_s - descr->update_every_s)) / descr->update_every_s * bytes_per_point;
- if (likely(NULL != prev)) {
- prev->next = next;
- }
- if (likely(NULL != next)) {
- next->prev = prev;
- }
- if (unlikely(pg_cache_descr == pg_cache->replaceQ.head)) {
- pg_cache->replaceQ.head = next;
- }
- if (unlikely(pg_cache_descr == pg_cache->replaceQ.tail)) {
- pg_cache->replaceQ.tail = prev;
- }
- pg_cache_descr->prev = pg_cache_descr->next = NULL;
-}
+ if(descr->page_length > entries_array[Index].size) {
+ descr->page_length = entries_array[Index].size;
-void pg_cache_replaceQ_insert(struct rrdengine_instance *ctx,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
-
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_insert_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
-}
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl, "DBENGINE: page exceeds the maximum size, adjusting it to max.");
+ }
-void pg_cache_replaceQ_delete(struct rrdengine_instance *ctx,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ descr->page = pgc_page_data(pages_array[Index]);
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, descr, link.prev, link.next);
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
-}
-void pg_cache_replaceQ_set_hot(struct rrdengine_instance *ctx,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ internal_fatal(descr->page_length > RRDENG_BLOCK_SIZE, "DBENGINE: faulty page length calculation");
+ }
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
- pg_cache_replaceQ_insert_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ struct completion completion;
+ completion_init(&completion);
+ rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_WRITE, base, &completion, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
+ completion_wait_for(&completion);
+ completion_destroy(&completion);
}
-struct rrdeng_page_descr *pg_cache_create_descr(void)
+static void open_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
- struct rrdeng_page_descr *descr;
-
- descr = rrdeng_page_descr_mallocz();
- descr->page_length = 0;
- descr->start_time_ut = INVALID_TIME;
- descr->end_time_ut = INVALID_TIME;
- descr->id = NULL;
- descr->extent = NULL;
- descr->pg_cache_descr_state = 0;
- descr->pg_cache_descr = NULL;
- descr->update_every_s = 0;
-
- return descr;
+ struct rrdengine_datafile *datafile = entry.data;
+ datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
}
-/* The caller must hold page descriptor lock. */
-void pg_cache_wake_up_waiters_unsafe(struct rrdeng_page_descr *descr)
+static void open_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- if (pg_cache_descr->waiters)
- uv_cond_broadcast(&pg_cache_descr->cond);
+ ;
}
-void pg_cache_wake_up_waiters(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+static void extent_cache_free_clean_page_callback(PGC *cache __maybe_unused, PGC_ENTRY entry __maybe_unused)
{
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_wake_up_waiters_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+ dbengine_extent_free(entry.data, entry.size);
}
-/*
- * The caller must hold page descriptor lock.
- * The lock will be released and re-acquired. The descriptor is not guaranteed
- * to exist after this function returns.
- */
-void pg_cache_wait_event_unsafe(struct rrdeng_page_descr *descr)
+static void extent_cache_flush_dirty_page_callback(PGC *cache __maybe_unused, PGC_ENTRY *entries_array __maybe_unused, PGC_PAGE **pages_array __maybe_unused, size_t entries __maybe_unused)
{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
-
- ++pg_cache_descr->waiters;
- uv_cond_wait(&pg_cache_descr->cond, &pg_cache_descr->mutex);
- --pg_cache_descr->waiters;
+ ;
}
-/*
- * The caller must hold page descriptor lock.
- * The lock will be released and re-acquired. The descriptor is not guaranteed
- * to exist after this function returns.
- * Returns UV_ETIMEDOUT if timeout_sec seconds pass.
- */
-int pg_cache_timedwait_event_unsafe(struct rrdeng_page_descr *descr, uint64_t timeout_sec)
-{
- int ret;
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+inline TIME_RANGE_COMPARE is_page_in_time_range(time_t page_first_time_s, time_t page_last_time_s, time_t wanted_start_time_s, time_t wanted_end_time_s) {
+ // page_first_time_s <= wanted_end_time_s && page_last_time_s >= wanted_start_time_s
+
+ if(page_last_time_s < wanted_start_time_s)
+ return PAGE_IS_IN_THE_PAST;
- ++pg_cache_descr->waiters;
- ret = uv_cond_timedwait(&pg_cache_descr->cond, &pg_cache_descr->mutex, timeout_sec * NSEC_PER_SEC);
- --pg_cache_descr->waiters;
+ if(page_first_time_s > wanted_end_time_s)
+ return PAGE_IS_IN_THE_FUTURE;
- return ret;
+ return PAGE_IS_IN_RANGE;
}
-/*
- * Returns page flags.
- * The lock will be released and re-acquired. The descriptor is not guaranteed
- * to exist after this function returns.
- */
-unsigned long pg_cache_wait_event(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
+static int journal_metric_uuid_compare(const void *key, const void *metric)
{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
- unsigned long flags;
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_wait_event_unsafe(descr);
- flags = pg_cache_descr->flags;
- rrdeng_page_descr_mutex_unlock(ctx, descr);
-
- return flags;
+ return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
}
-/*
- * The caller must hold page descriptor lock.
- */
-int pg_cache_can_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
-{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+static inline struct page_details *pdc_find_page_for_time(
+ Pcvoid_t PArray,
+ time_t wanted_time_s,
+ size_t *gaps,
+ PDC_PAGE_STATUS mode,
+ PDC_PAGE_STATUS skip_list
+) {
+ Word_t PIndexF = wanted_time_s, PIndexL = wanted_time_s;
+ Pvoid_t *PValueF, *PValueL;
+ struct page_details *pdF = NULL, *pdL = NULL;
+ bool firstF = true, firstL = true;
+
+ PDC_PAGE_STATUS ignore_list = PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | skip_list;
+
+ while ((PValueF = PDCJudyLFirstThenNext(PArray, &PIndexF, &firstF))) {
+ pdF = *PValueF;
+
+ PDC_PAGE_STATUS status = __atomic_load_n(&pdF->status, __ATOMIC_ACQUIRE);
+ if (!(status & (ignore_list | mode)))
+ break;
- if ((pg_cache_descr->flags & (RRD_PAGE_LOCKED | RRD_PAGE_READ_PENDING)) ||
- (exclusive_access && pg_cache_descr->refcnt)) {
- return 0;
+ pdF = NULL;
}
- return 1;
-}
+ while ((PValueL = PDCJudyLLastThenPrev(PArray, &PIndexL, &firstL))) {
+ pdL = *PValueL;
-/*
- * The caller must hold page descriptor lock.
- * Gets a reference to the page descriptor.
- * Returns 1 on success and 0 on failure.
- */
-int pg_cache_try_get_unsafe(struct rrdeng_page_descr *descr, int exclusive_access)
-{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ PDC_PAGE_STATUS status = __atomic_load_n(&pdL->status, __ATOMIC_ACQUIRE);
+ if(status & mode) {
+ // don't go all the way back to the beginning
+ // stop at the last processed
+ pdL = NULL;
+ break;
+ }
- if (!pg_cache_can_get_unsafe(descr, exclusive_access))
- return 0;
+ if (!(status & ignore_list))
+ break;
- if (exclusive_access)
- pg_cache_descr->flags |= RRD_PAGE_LOCKED;
- ++pg_cache_descr->refcnt;
+ pdL = NULL;
+ }
- return 1;
-}
+ TIME_RANGE_COMPARE rcF = (pdF) ? is_page_in_time_range(pdF->first_time_s, pdF->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_FUTURE;
+ TIME_RANGE_COMPARE rcL = (pdL) ? is_page_in_time_range(pdL->first_time_s, pdL->last_time_s, wanted_time_s, wanted_time_s) : PAGE_IS_IN_THE_PAST;
-/*
- * The caller must hold the page descriptor lock.
- * This function may block doing cleanup.
- */
-void pg_cache_put_unsafe(struct rrdeng_page_descr *descr)
-{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if (!pdF || pdF == pdL) {
+ // F is missing, or they are the same
+ // return L
+ (*gaps) += (rcL == PAGE_IS_IN_RANGE) ? 0 : 1;
+ return pdL;
+ }
- pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
- if (0 == --pg_cache_descr->refcnt) {
- pg_cache_wake_up_waiters_unsafe(descr);
+ if (!pdL) {
+ // L is missing
+ // return F
+ (*gaps) += (rcF == PAGE_IS_IN_RANGE) ? 0 : 1;
+ return pdF;
}
-}
-/*
- * This function may block doing cleanup.
- */
-void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
-{
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_put_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
-}
+ if (rcF == rcL) {
+ // both are on the same side,
+ // but they are different pages
-/* The caller must hold the page cache lock */
-static void pg_cache_release_pages_unsafe(struct rrdengine_instance *ctx, unsigned number)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ switch (rcF) {
+ case PAGE_IS_IN_RANGE:
+ // pick the higher resolution
+ if (pdF->update_every_s && pdF->update_every_s < pdL->update_every_s)
+ return pdF;
- pg_cache->populated_pages -= number;
-}
+ if (pdL->update_every_s && pdL->update_every_s < pdF->update_every_s)
+ return pdL;
-static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned number)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ // same resolution - pick the one that starts earlier
+ if (pdL->first_time_s < pdF->first_time_s)
+ return pdL;
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- pg_cache_release_pages_unsafe(ctx, number);
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-}
+ return pdF;
+ break;
-/*
- * This function returns the maximum number of pages allowed in the page cache.
- */
-unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
-{
- return ctx->max_cache_pages + (unsigned long)ctx->metric_API_max_producers;
-}
+ case PAGE_IS_IN_THE_FUTURE:
+ (*gaps)++;
-/*
- * This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
- * number of pages below that number.
- */
-unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
-{
- return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
-}
+ // pick the one that starts earlier
+ if (pdL->first_time_s < pdF->first_time_s)
+ return pdL;
-/*
- * This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
- * cache.
- */
-unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
-{
- /* We remove the active pages of the producers from the calculation and only allow the extra pinned pages */
- return ctx->cache_pages_low_watermark + (unsigned long)ctx->metric_API_max_producers;
-}
+ return pdF;
+ break;
-/*
- * This function will block until it reserves #number populated pages.
- * It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
- */
-static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned failures = 0;
- const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
- unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;
-
- assert(number < ctx->max_cache_pages);
-
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- if (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1)
- debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
- number);
- while (pg_cache->populated_pages + number >= pg_cache_hard_limit(ctx) + 1) {
-
- if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
- /* failed to evict */
- struct completion compl;
- struct rrdeng_cmd cmd;
-
- ++failures;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-
- completion_init(&compl);
- cmd.opcode = RRDENG_FLUSH_PAGES;
- cmd.completion = &compl;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
- /* wait for some pages to be flushed */
- debug(D_RRDENGINE, "%s: waiting for pages to be written to disk before evicting.", __func__);
- completion_wait_for(&compl);
- completion_destroy(&compl);
-
- if (unlikely(failures > 1)) {
- unsigned long slots, usecs_to_sleep;
- /* exponential backoff */
- slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
- usecs_to_sleep = slots * exp_backoff_slot_usec;
-
- if (usecs_to_sleep >= USEC_PER_SEC)
- error("Page cache is full. Sleeping for %llu second(s).", usecs_to_sleep / USEC_PER_SEC);
-
- (void)sleep_usec(usecs_to_sleep);
- }
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
+ default:
+ case PAGE_IS_IN_THE_PAST:
+ (*gaps)++;
+ return NULL;
+ break;
}
}
- pg_cache->populated_pages += number;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-}
-/*
- * This function will attempt to reserve #number populated pages.
- * It may trigger evictions if the pg_cache_soft_limit() limit is hit.
- * Returns 0 on failure and 1 on success.
- */
-static int pg_cache_try_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned count = 0;
- int ret = 0;
-
- assert(number < ctx->max_cache_pages);
-
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- if (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1) {
- debug(D_RRDENGINE,
- "==Page cache full. Trying to reserve %u pages.==",
- number);
- do {
- if (!pg_cache_try_evict_one_page_unsafe(ctx))
- break;
- ++count;
- } while (pg_cache->populated_pages + number >= pg_cache_soft_limit(ctx) + 1);
- debug(D_RRDENGINE, "Evicted %u pages.", count);
+ if(rcF == PAGE_IS_IN_RANGE) {
+ // (*gaps) += 0;
+ return pdF;
}
- if (pg_cache->populated_pages + number < pg_cache_hard_limit(ctx) + 1) {
- pg_cache->populated_pages += number;
- ret = 1; /* success */
+ if(rcL == PAGE_IS_IN_RANGE) {
+ // (*gaps) += 0;
+ return pdL;
}
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- return ret;
-}
+ if(rcF == PAGE_IS_IN_THE_FUTURE) {
+ (*gaps)++;
+ return pdF;
+ }
-/* The caller must hold the page cache and the page descriptor locks in that order */
-static void pg_cache_evict_unsafe(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr)
-{
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
+ if(rcL == PAGE_IS_IN_THE_FUTURE) {
+ (*gaps)++;
+ return pdL;
+ }
- dbengine_page_free(pg_cache_descr->page);
- pg_cache_descr->page = NULL;
- pg_cache_descr->flags &= ~RRD_PAGE_POPULATED;
- pg_cache_release_pages_unsafe(ctx, 1);
- ++ctx->stats.pg_cache_evictions;
+ // impossible case
+ (*gaps)++;
+ return NULL;
}
-/*
- * The caller must hold the page cache lock.
- * Lock order: page cache -> replaceQ -> page descriptor
- * This function iterates all pages and tries to evict one.
- * If it fails it sets in_flight_descr to the oldest descriptor that has write-back in progress,
- * or it sets it to NULL if no write-back is in progress.
- *
- * Returns 1 on success and 0 on failure.
- */
-static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- unsigned long old_flags;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr = NULL;
+static size_t get_page_list_from_pgc(PGC *cache, METRIC *metric, struct rrdengine_instance *ctx,
+ time_t wanted_start_time_s, time_t wanted_end_time_s,
+ Pvoid_t *JudyL_page_array, size_t *cache_gaps,
+ bool open_cache_mode, PDC_PAGE_STATUS tags) {
- uv_rwlock_wrlock(&pg_cache->replaceQ.lock);
- for (pg_cache_descr = pg_cache->replaceQ.head ; NULL != pg_cache_descr ; pg_cache_descr = pg_cache_descr->next) {
- descr = pg_cache_descr->descr;
+ size_t pages_found_in_cache = 0;
+ Word_t metric_id = mrg_metric_id(main_mrg, metric);
- rrdeng_page_descr_mutex_lock(ctx, descr);
- old_flags = pg_cache_descr->flags;
- if ((old_flags & RRD_PAGE_POPULATED) && !(old_flags & RRD_PAGE_DIRTY) && pg_cache_try_get_unsafe(descr, 1)) {
- /* must evict */
- pg_cache_evict_unsafe(ctx, descr);
- pg_cache_put_unsafe(descr);
- pg_cache_replaceQ_delete_unsafe(ctx, descr);
+ time_t now_s = wanted_start_time_s;
+ time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ if(!dt_s)
+ dt_s = default_rrd_update_every;
- rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
+ time_t previous_page_end_time_s = now_s - dt_s;
+ bool first = true;
- return 1;
- }
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- }
- uv_rwlock_wrunlock(&pg_cache->replaceQ.lock);
+ do {
+ PGC_PAGE *page = pgc_page_get_and_acquire(
+ cache, (Word_t)ctx, (Word_t)metric_id, now_s,
+ (first) ? PGC_SEARCH_CLOSEST : PGC_SEARCH_NEXT);
- /* failed to evict */
- return 0;
-}
+ first = false;
-/**
- * Deletes a page from the database.
- * Callers of this function need to make sure they're not deleting the same descriptor concurrently.
- * @param ctx is the database instance.
- * @param descr is the page descriptor.
- * @param remove_dirty must be non-zero if the page to be deleted is dirty.
- * @param is_exclusive_holder must be non-zero if the caller holds an exclusive page reference.
- * @param metric_id is set to the metric the page belongs to, if it's safe to delete the metric and metric_id is not
- * NULL. Otherwise, metric_id is not set.
- * @return 1 if it's safe to delete the metric, 0 otherwise.
- */
-uint8_t pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
- uint8_t is_exclusive_holder, uuid_t *metric_id)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct page_cache_descr *pg_cache_descr = NULL;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- int ret;
- uint8_t can_delete_metric = 0;
-
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- fatal_assert(NULL != PValue);
- page_index = *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
-
- uv_rwlock_wrlock(&page_index->lock);
- ret = JudyLDel(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
- if (unlikely(0 == ret)) {
- uv_rwlock_wrunlock(&page_index->lock);
- if (unlikely(debug_flags & D_RRDENGINE)) {
- print_page_descr(descr);
- }
- goto destroy;
- }
- --page_index->page_count;
- if (!page_index->writers && !page_index->page_count) {
- can_delete_metric = 1;
- if (metric_id) {
- memcpy(metric_id, page_index->id, sizeof(uuid_t));
- }
- }
- uv_rwlock_wrunlock(&page_index->lock);
- fatal_assert(1 == ret);
-
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- ++ctx->stats.pg_cache_deletions;
- --pg_cache->page_descriptors;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- if (!is_exclusive_holder) {
- /* If we don't hold an exclusive page reference get one */
- while (!pg_cache_try_get_unsafe(descr, 1)) {
- debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- pg_cache_wait_event_unsafe(descr);
+ if(!page) {
+ if(previous_page_end_time_s < wanted_end_time_s)
+ (*cache_gaps)++;
+
+ break;
}
- }
- if (remove_dirty) {
- pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
- } else {
- /* even a locked page could be dirty */
- while (unlikely(pg_cache_descr->flags & RRD_PAGE_DIRTY)) {
- debug(D_RRDENGINE, "%s: Found dirty page, waiting for it to be flushed:", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- pg_cache_wait_event_unsafe(descr);
+
+ time_t page_start_time_s = pgc_page_start_time_s(page);
+ time_t page_end_time_s = pgc_page_end_time_s(page);
+ time_t page_update_every_s = pgc_page_update_every_s(page);
+ size_t page_length = pgc_page_data_size(cache, page);
+
+ if(!page_update_every_s)
+ page_update_every_s = dt_s;
+
+ if(is_page_in_time_range(page_start_time_s, page_end_time_s, wanted_start_time_s, wanted_end_time_s) != PAGE_IS_IN_RANGE) {
+ // not a useful page for this query
+ pgc_page_release(cache, page);
+ page = NULL;
+
+ if(previous_page_end_time_s < wanted_end_time_s)
+ (*cache_gaps)++;
+
+ break;
}
- }
- rrdeng_page_descr_mutex_unlock(ctx, descr);
-
- while (unlikely(pg_cache_descr->flags & RRD_PAGE_READ_PENDING)) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "%s: Found page with READ PENDING, waiting for read to complete", __func__);
- if (unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- pg_cache_wait_event_unsafe(descr);
- }
- if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
- /* only after locking can it be safely deleted from LRU */
- pg_cache_replaceQ_delete(ctx, descr);
+ if (page_start_time_s - previous_page_end_time_s > dt_s)
+ (*cache_gaps)++;
+
+ Pvoid_t *PValue = PDCJudyLIns(JudyL_page_array, (Word_t) page_start_time_s, PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: corrupted judy array in %s()", __FUNCTION__ );
+
+ if (unlikely(*PValue)) {
+ struct page_details *pd = *PValue;
+ UNUSED(pd);
+
+// internal_error(
+// pd->first_time_s != page_first_time_s ||
+// pd->last_time_s != page_last_time_s ||
+// pd->update_every_s != page_update_every_s,
+// "DBENGINE: duplicate page with different retention in %s cache "
+// "1st: %ld to %ld, ue %u, size %u "
+// "2nd: %ld to %ld, ue %ld size %zu "
+// "- ignoring the second",
+// cache == open_cache ? "open" : "main",
+// pd->first_time_s, pd->last_time_s, pd->update_every_s, pd->page_length,
+// page_first_time_s, page_last_time_s, page_update_every_s, page_length);
+
+ pgc_page_release(cache, page);
+ }
+ else {
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- pg_cache_evict_unsafe(ctx, descr);
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
- }
- pg_cache_put(ctx, descr);
- rrdeng_try_deallocate_pg_cache_descr(ctx, descr);
- while (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
- rrdeng_try_deallocate_pg_cache_descr(ctx, descr); /* spin */
- (void)sleep_usec(1000); /* 1 msec */
- }
-destroy:
- rrdeng_page_descr_freez(descr);
- pg_cache_update_metric_times(page_index);
+ internal_fatal(pgc_page_metric(page) != metric_id, "Wrong metric id in page found in cache");
+ internal_fatal(pgc_page_section(page) != (Word_t)ctx, "Wrong section in page found in cache");
- return can_delete_metric;
-}
+ struct page_details *pd = page_details_get();
+ pd->metric_id = metric_id;
+ pd->first_time_s = page_start_time_s;
+ pd->last_time_s = page_end_time_s;
+ pd->page_length = page_length;
+ pd->update_every_s = page_update_every_s;
+ pd->page = (open_cache_mode) ? NULL : page;
+ pd->status |= tags;
-static inline int is_page_in_time_range(struct rrdeng_page_descr *descr, usec_t start_time, usec_t end_time)
-{
- usec_t pg_start, pg_end;
+ if((pd->page)) {
+ pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED;
- pg_start = descr->start_time_ut;
- pg_end = descr->end_time_ut;
+ if(pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
+ pd->status |= PDC_PAGE_EMPTY;
+ }
- return (pg_start < start_time && pg_end >= start_time) ||
- (pg_start >= start_time && pg_start <= end_time);
-}
+ if(open_cache_mode) {
+ struct rrdengine_datafile *datafile = pgc_page_data(page);
+ if(datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) { // for pd
+ struct extent_io_data *xio = (struct extent_io_data *) pgc_page_custom_data(cache, page);
+ pd->datafile.ptr = pgc_page_data(page);
+ pd->datafile.file = xio->file;
+ pd->datafile.extent.pos = xio->pos;
+ pd->datafile.extent.bytes = xio->bytes;
+ pd->datafile.fileno = pd->datafile.ptr->fileno;
+ pd->status |= PDC_PAGE_DATAFILE_ACQUIRED | PDC_PAGE_DISK_PENDING;
+ }
+ else {
+ pd->status |= PDC_PAGE_FAILED | PDC_PAGE_FAILED_TO_ACQUIRE_DATAFILE;
+ }
+ pgc_page_release(cache, page);
+ }
-static inline int is_point_in_time_in_page(struct rrdeng_page_descr *descr, usec_t point_in_time)
-{
- return (point_in_time >= descr->start_time_ut && point_in_time <= descr->end_time_ut);
-}
+ *PValue = pd;
-/* The caller must hold the page index lock */
-static inline struct rrdeng_page_descr *
- find_first_page_in_time_range(struct pg_cache_page_index *page_index, usec_t start_time, usec_t end_time)
-{
- struct rrdeng_page_descr *descr = NULL;
- Pvoid_t *PValue;
- Word_t Index;
-
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- return descr;
+ pages_found_in_cache++;
}
- }
- Index = (Word_t)(start_time / USEC_PER_SEC);
- PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- if (is_page_in_time_range(descr, start_time, end_time)) {
- return descr;
- }
- }
+ // prepare for the next iteration
+ previous_page_end_time_s = page_end_time_s;
- return NULL;
-}
+ if(page_update_every_s > 0)
+ dt_s = page_update_every_s;
-/* Update metric oldest and latest timestamps efficiently when adding new values */
-void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr)
-{
- usec_t oldest_time = page_index->oldest_time_ut;
- usec_t latest_time = page_index->latest_time_ut;
+ // we are going to ask for the NEXT page
+ // so, set this to our first time
+ now_s = page_start_time_s;
- if (unlikely(oldest_time == INVALID_TIME || descr->start_time_ut < oldest_time)) {
- page_index->oldest_time_ut = descr->start_time_ut;
- }
- if (likely(descr->end_time_ut > latest_time || latest_time == INVALID_TIME)) {
- page_index->latest_time_ut = descr->end_time_ut;
- }
+ } while(now_s <= wanted_end_time_s);
+
+ return pages_found_in_cache;
}
-/* Update metric oldest and latest timestamps when removing old values */
-void pg_cache_update_metric_times(struct pg_cache_page_index *page_index)
-{
- Pvoid_t *firstPValue, *lastPValue;
- Word_t firstIndex, lastIndex;
- struct rrdeng_page_descr *descr;
- usec_t oldest_time = INVALID_TIME;
- usec_t latest_time = INVALID_TIME;
-
- uv_rwlock_rdlock(&page_index->lock);
- /* Find first page in range */
- firstIndex = (Word_t)0;
- firstPValue = JudyLFirst(page_index->JudyL_array, &firstIndex, PJE0);
- if (likely(NULL != firstPValue)) {
- descr = *firstPValue;
- oldest_time = descr->start_time_ut;
- }
- lastIndex = (Word_t)-1;
- lastPValue = JudyLLast(page_index->JudyL_array, &lastIndex, PJE0);
- if (likely(NULL != lastPValue)) {
- descr = *lastPValue;
- latest_time = descr->end_time_ut;
- }
- uv_rwlock_rdunlock(&page_index->lock);
+static void pgc_inject_gap(struct rrdengine_instance *ctx, METRIC *metric, time_t start_time_s, time_t end_time_s) {
- if (unlikely(NULL == firstPValue)) {
- fatal_assert(NULL == lastPValue);
- page_index->oldest_time_ut = page_index->latest_time_ut = INVALID_TIME;
+ time_t db_first_time_s, db_last_time_s, db_update_every_s;
+ mrg_metric_get_retention(main_mrg, metric, &db_first_time_s, &db_last_time_s, &db_update_every_s);
+
+ if(is_page_in_time_range(start_time_s, end_time_s, db_first_time_s, db_last_time_s) != PAGE_IS_IN_RANGE)
return;
- }
- page_index->oldest_time_ut = oldest_time;
- page_index->latest_time_ut = latest_time;
+
+ PGC_ENTRY page_entry = {
+ .hot = false,
+ .section = (Word_t)ctx,
+ .metric_id = (Word_t)metric,
+ .start_time_s = MAX(start_time_s, db_first_time_s),
+ .end_time_s = MIN(end_time_s, db_last_time_s),
+ .update_every_s = 0,
+ .size = 0,
+ .data = DBENGINE_EMPTY_PAGE,
+ };
+
+ if(page_entry.start_time_s >= page_entry.end_time_s)
+ return;
+
+ PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, NULL);
+ pgc_page_release(main_cache, page);
}
-/* If index is NULL lookup by UUID (descr->id) */
-void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
- struct rrdeng_page_descr *descr)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+static size_t list_has_time_gaps(
+ struct rrdengine_instance *ctx,
+ METRIC *metric,
+ Pvoid_t JudyL_page_array,
+ time_t wanted_start_time_s,
+ time_t wanted_end_time_s,
+ size_t *pages_total,
+ size_t *pages_found_pass4,
+ size_t *pages_pending,
+ size_t *pages_overlapping,
+ time_t *optimal_end_time_s,
+ bool populate_gaps
+) {
+ // we will recalculate these, so zero them
+ *pages_pending = 0;
+ *pages_overlapping = 0;
+ *optimal_end_time_s = 0;
+
+ bool first;
Pvoid_t *PValue;
- struct pg_cache_page_index *page_index;
- unsigned long pg_cache_descr_state = descr->pg_cache_descr_state;
-
- if (0 != pg_cache_descr_state) {
- /* there is page cache descriptor pre-allocated state */
- struct page_cache_descr *pg_cache_descr = descr->pg_cache_descr;
-
- fatal_assert(pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED);
- if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
- pg_cache_reserve_pages(ctx, 1);
- if (!(pg_cache_descr->flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_insert(ctx, descr);
- }
- }
+ Word_t this_page_start_time;
+ struct page_details *pd;
+
+ size_t gaps = 0;
+ Word_t metric_id = mrg_metric_id(main_mrg, metric);
+
+ // ------------------------------------------------------------------------
+ // PASS 1: remove the preprocessing flags from the pages in PDC
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, descr->id, sizeof(uuid_t));
- fatal_assert(NULL != PValue);
- page_index = *PValue;
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- } else {
- page_index = index;
+ first = true;
+ this_page_start_time = 0;
+ while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
+ pd = *PValue;
+ pd->status &= ~(PDC_PAGE_SKIP|PDC_PAGE_PREPROCESSED);
}
- uv_rwlock_wrlock(&page_index->lock);
- PValue = JudyLIns(&page_index->JudyL_array, (Word_t)(descr->start_time_ut / USEC_PER_SEC), PJE0);
- *PValue = descr;
- ++page_index->page_count;
- pg_cache_add_new_metric_time(page_index, descr);
- uv_rwlock_wrunlock(&page_index->lock);
-
- uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
- ++ctx->stats.pg_cache_insertions;
- ++pg_cache->page_descriptors;
- uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
-}
+ // ------------------------------------------------------------------------
+ // PASS 2: emulate processing to find the useful pages
-usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
+ time_t now_s = wanted_start_time_s;
+ time_t dt_s = mrg_metric_get_update_every_s(main_mrg, metric);
+ if(!dt_s)
+ dt_s = default_rrd_update_every;
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return INVALID_TIME;
- }
+ size_t pages_pass2 = 0, pages_pass3 = 0;
+ while((pd = pdc_find_page_for_time(
+ JudyL_page_array, now_s, &gaps,
+ PDC_PAGE_PREPROCESSED, 0))) {
- uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
- if (NULL == descr) {
- uv_rwlock_rdunlock(&page_index->lock);
- return INVALID_TIME;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- return descr->start_time_ut;
-}
+ pd->status |= PDC_PAGE_PREPROCESSED;
+ pages_pass2++;
-/**
- * Return page information for the first page before point_in_time that satisfies the filter.
- * @param ctx DB context
- * @param page_index page index of a metric
- * @param point_in_time_ut the pages that are searched must be older than this timestamp
- * @param filter decides if the page satisfies the caller's criteria
- * @param page_info the result of the search is set in this pointer
- */
-void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
- usec_t point_in_time_ut, pg_cache_page_info_filter_t *filter,
- struct rrdeng_page_info *page_info)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- Pvoid_t *PValue;
- Word_t Index;
+ if(pd->update_every_s)
+ dt_s = pd->update_every_s;
- (void)pg_cache;
- fatal_assert(NULL != page_index);
+ if(populate_gaps && pd->first_time_s > now_s)
+ pgc_inject_gap(ctx, metric, now_s, pd->first_time_s);
- Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
- uv_rwlock_rdlock(&page_index->lock);
- do {
- PValue = JudyLPrev(page_index->JudyL_array, &Index, PJE0);
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
- } while (descr != NULL && !filter(descr));
- if (unlikely(NULL == descr)) {
- page_info->page_length = 0;
- page_info->start_time_ut = INVALID_TIME;
- page_info->end_time_ut = INVALID_TIME;
- } else {
- page_info->page_length = descr->page_length;
- page_info->start_time_ut = descr->start_time_ut;
- page_info->end_time_ut = descr->end_time_ut;
+ now_s = pd->last_time_s + dt_s;
+ if(now_s > wanted_end_time_s) {
+ *optimal_end_time_s = pd->last_time_s;
+ break;
+ }
}
- uv_rwlock_rdunlock(&page_index->lock);
-}
-/**
- * Searches for an unallocated page without triggering disk I/O. Attempts to reserve the page and get a reference.
- * @param ctx DB context
- * @param id lookup by UUID
- * @param start_time_ut exact starting time in usec
- * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
- * @return the page descriptor or NULL on failure. It can fail if:
- * 1. The page is already allocated to the page cache.
- * 2. It did not succeed to get a reference.
- * 3. It did not succeed to reserve a spot in the page cache.
- */
-struct rrdeng_page_descr *pg_cache_lookup_unpopulated_and_lock(struct rrdengine_instance *ctx, uuid_t *id,
- usec_t start_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- struct page_cache_descr *pg_cache_descr = NULL;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- Word_t Index;
+ if(populate_gaps && now_s < wanted_end_time_s)
+ pgc_inject_gap(ctx, metric, now_s, wanted_end_time_s);
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
+ // ------------------------------------------------------------------------
+ // PASS 3: mark as skipped all the pages not useful
- if ((NULL == PValue) || !pg_cache_try_reserve_pages(ctx, 1)) {
- /* Failed to find page or failed to reserve a spot in the cache */
- return NULL;
- }
+ first = true;
+ this_page_start_time = 0;
+ while((PValue = PDCJudyLFirstThenNext(JudyL_page_array, &this_page_start_time, &first))) {
+ pd = *PValue;
- uv_rwlock_rdlock(&page_index->lock);
- Index = (Word_t)(start_time_ut / USEC_PER_SEC);
- PValue = JudyLGet(page_index->JudyL_array, Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- }
- if (NULL == PValue || 0 == descr->page_length) {
- /* Failed to find non-empty page */
- uv_rwlock_rdunlock(&page_index->lock);
+ internal_fatal(pd->metric_id != metric_id, "pd has wrong metric_id");
- pg_cache_release_pages(ctx, 1);
- return NULL;
- }
+ if(!(pd->status & PDC_PAGE_PREPROCESSED)) {
+ (*pages_overlapping)++;
+ pd->status |= PDC_PAGE_SKIP;
+ pd->status &= ~(PDC_PAGE_READY | PDC_PAGE_DISK_PENDING);
+ continue;
+ }
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- flags = pg_cache_descr->flags;
- uv_rwlock_rdunlock(&page_index->lock);
+ pages_pass3++;
- if ((flags & RRD_PAGE_POPULATED) || !pg_cache_try_get_unsafe(descr, 1)) {
- /* Failed to get reference or page is already populated */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+ if(!pd->page) {
+ pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, (Word_t) metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
- pg_cache_release_pages(ctx, 1);
- return NULL;
+ if(pd->page) {
+ (*pages_found_pass4)++;
+
+ pd->status &= ~PDC_PAGE_DISK_PENDING;
+ pd->status |= PDC_PAGE_READY | PDC_PAGE_PRELOADED | PDC_PAGE_PRELOADED_PASS4;
+
+ if(pgc_page_data(pd->page) == DBENGINE_EMPTY_PAGE)
+ pd->status |= PDC_PAGE_EMPTY;
+
+ }
+ else if(!(pd->status & PDC_PAGE_FAILED) && (pd->status & PDC_PAGE_DATAFILE_ACQUIRED)) {
+ (*pages_pending)++;
+
+ pd->status |= PDC_PAGE_DISK_PENDING;
+
+ internal_fatal(pd->status & PDC_PAGE_SKIP, "page is disk pending and skipped");
+ internal_fatal(!pd->datafile.ptr, "datafile is NULL");
+ internal_fatal(!pd->datafile.extent.bytes, "datafile.extent.bytes zero");
+ internal_fatal(!pd->datafile.extent.pos, "datafile.extent.pos is zero");
+ internal_fatal(!pd->datafile.fileno, "datafile.fileno is zero");
+ }
+ }
+ else {
+ pd->status &= ~PDC_PAGE_DISK_PENDING;
+ pd->status |= (PDC_PAGE_READY | PDC_PAGE_PRELOADED);
+ }
}
- /* success */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- return descr;
+ internal_fatal(pages_pass2 != pages_pass3,
+ "DBENGINE: page count does not match");
+
+ *pages_total = pages_pass2;
+
+ return gaps;
}
-/**
- * Searches for pages in a time range and triggers disk I/O if necessary and possible.
- * Does not get a reference.
- * @param ctx DB context
- * @param id UUID
- * @param start_time_ut inclusive starting time in usec
- * @param end_time_ut inclusive ending time in usec
- * @param page_info_arrayp It allocates (*page_arrayp) and populates it with information of pages that overlap
- * with the time range [start_time,end_time]. The caller must free (*page_info_arrayp) with freez().
- * If page_info_arrayp is set to NULL nothing was allocated.
- * @param ret_page_indexp Sets the page index pointer (*ret_page_indexp) for the given UUID.
- * @return the number of pages that overlap with the time range [start_time,end_time].
- */
-unsigned pg_cache_preload(struct rrdengine_instance *ctx, uuid_t *id, usec_t start_time_ut, usec_t end_time_ut,
- struct rrdeng_page_info **page_info_arrayp, struct pg_cache_page_index **ret_page_indexp)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL, *preload_array[PAGE_CACHE_MAX_PRELOAD_PAGES];
- struct page_cache_descr *pg_cache_descr = NULL;
- unsigned i, j, k, preload_count, count, page_info_array_max_size;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- Word_t Index;
- uint8_t failed_to_reserve;
+typedef void (*page_found_callback_t)(PGC_PAGE *page, void *data);
+static size_t get_page_list_from_journal_v2(struct rrdengine_instance *ctx, METRIC *metric, usec_t start_time_ut, usec_t end_time_ut, page_found_callback_t callback, void *callback_data) {
+ uuid_t *uuid = mrg_metric_uuid(main_mrg, metric);
+ Word_t metric_id = mrg_metric_id(main_mrg, metric);
- fatal_assert(NULL != ret_page_indexp);
+ time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
+ time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- *ret_page_indexp = page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- *ret_page_indexp = NULL;
- return 0;
- }
+ size_t pages_found = 0;
- uv_rwlock_rdlock(&page_index->lock);
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
- if (NULL == descr) {
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: No page was found to attempt preload.", __func__);
- *ret_page_indexp = NULL;
- return 0;
- } else {
- Index = (Word_t)(descr->start_time_ut / USEC_PER_SEC);
- }
- if (page_info_arrayp) {
- page_info_array_max_size = PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
- *page_info_arrayp = mallocz(page_info_array_max_size);
- }
+ uv_rwlock_rdlock(&ctx->datafiles.rwlock);
+ struct rrdengine_datafile *datafile;
+ for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
+ struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL,
+ wanted_start_time_s,
+ wanted_end_time_s);
+ if (unlikely(!j2_header))
+ continue;
- for (count = 0, preload_count = 0 ;
- descr != NULL && is_page_in_time_range(descr, start_time_ut, end_time_ut) ;
- PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0),
- descr = unlikely(NULL == PValue) ? NULL : *PValue) {
- /* Iterate all pages in range */
+ time_t journal_start_time_s = (time_t)(j2_header->start_time_ut / USEC_PER_SEC);
- if (unlikely(0 == descr->page_length))
+ // the datafile possibly contains useful data for this query
+
+ size_t journal_metric_count = (size_t)j2_header->metric_count;
+ struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
+ struct journal_metric_list *uuid_entry = bsearch(uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_uuid_compare);
+
+ if (unlikely(!uuid_entry)) {
+ // our UUID is not in this datafile
+ journalfile_v2_data_release(datafile->journalfile);
continue;
- if (page_info_arrayp) {
- if (unlikely(count >= page_info_array_max_size / sizeof(struct rrdeng_page_info))) {
- page_info_array_max_size += PAGE_CACHE_MAX_PRELOAD_PAGES * sizeof(struct rrdeng_page_info);
- *page_info_arrayp = reallocz(*page_info_arrayp, page_info_array_max_size);
- }
- (*page_info_arrayp)[count].start_time_ut = descr->start_time_ut;
- (*page_info_arrayp)[count].end_time_ut = descr->end_time_ut;
- (*page_info_arrayp)[count].page_length = descr->page_length;
}
- ++count;
-
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- flags = pg_cache_descr->flags;
- if (pg_cache_can_get_unsafe(descr, 0)) {
- if (flags & RRD_PAGE_POPULATED) {
- /* success */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
+
+ struct journal_page_header *page_list_header = (struct journal_page_header *) ((uint8_t *) j2_header + uuid_entry->page_offset);
+ struct journal_page_list *page_list = (struct journal_page_list *)((uint8_t *) page_list_header + sizeof(*page_list_header));
+ struct journal_extent_list *extent_list = (void *)((uint8_t *)j2_header + j2_header->extent_offset);
+ uint32_t uuid_page_entries = page_list_header->entries;
+
+ for (uint32_t index = 0; index < uuid_page_entries; index++) {
+ struct journal_page_list *page_entry_in_journal = &page_list[index];
+
+ time_t page_first_time_s = page_entry_in_journal->delta_start_s + journal_start_time_s;
+ time_t page_last_time_s = page_entry_in_journal->delta_end_s + journal_start_time_s;
+
+ TIME_RANGE_COMPARE prc = is_page_in_time_range(page_first_time_s, page_last_time_s, wanted_start_time_s, wanted_end_time_s);
+ if(prc == PAGE_IS_IN_THE_PAST)
continue;
- }
- }
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- preload_array[preload_count++] = descr;
- if (PAGE_CACHE_MAX_PRELOAD_PAGES == preload_count) {
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+
+ if(prc == PAGE_IS_IN_THE_FUTURE)
break;
+
+ time_t page_update_every_s = page_entry_in_journal->update_every_s;
+ size_t page_length = page_entry_in_journal->page_length;
+
+ if(datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) { //for open cache item
+ // add this page to open cache
+ bool added = false;
+ struct extent_io_data ei = {
+ .pos = extent_list[page_entry_in_journal->extent_index].datafile_offset,
+ .bytes = extent_list[page_entry_in_journal->extent_index].datafile_size,
+ .page_length = page_length,
+ .file = datafile->file,
+ .fileno = datafile->fileno,
+ };
+
+ PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, (PGC_ENTRY) {
+ .hot = false,
+ .section = (Word_t) ctx,
+ .metric_id = metric_id,
+ .start_time_s = page_first_time_s,
+ .end_time_s = page_last_time_s,
+ .update_every_s = page_update_every_s,
+ .data = datafile,
+ .size = 0,
+ .custom_data = (uint8_t *) &ei,
+ }, &added);
+
+ if(!added)
+ datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
+
+ callback(page, callback_data);
+
+ pgc_page_release(open_cache, page);
+
+ pages_found++;
}
}
- rrdeng_page_descr_mutex_unlock(ctx, descr);
+ journalfile_v2_data_release(datafile->journalfile);
}
- uv_rwlock_rdunlock(&page_index->lock);
+ uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
- failed_to_reserve = 0;
- for (i = 0 ; i < preload_count && !failed_to_reserve ; ++i) {
- struct rrdeng_cmd cmd;
- struct rrdeng_page_descr *next;
+ return pages_found;
+}
- descr = preload_array[i];
- if (NULL == descr) {
- continue;
- }
- if (!pg_cache_try_reserve_pages(ctx, 1)) {
- failed_to_reserve = 1;
- break;
- }
- cmd.opcode = RRDENG_READ_EXTENT;
- cmd.read_extent.page_cache_descr[0] = descr;
- /* don't use this page again */
- preload_array[i] = NULL;
- for (j = 0, k = 1 ; j < preload_count ; ++j) {
- next = preload_array[j];
- if (NULL == next) {
- continue;
- }
- if (descr->extent == next->extent) {
- /* same extent, consolidate */
- if (!pg_cache_try_reserve_pages(ctx, 1)) {
- failed_to_reserve = 1;
- break;
- }
- cmd.read_extent.page_cache_descr[k++] = next;
- /* don't use this page again */
- preload_array[j] = NULL;
- }
- }
- cmd.read_extent.page_count = k;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+void add_page_details_from_journal_v2(PGC_PAGE *page, void *JudyL_pptr) {
+ struct rrdengine_datafile *datafile = pgc_page_data(page);
+
+ if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS)) // for pd
+ return;
+
+ Pvoid_t *PValue = PDCJudyLIns(JudyL_pptr, pgc_page_start_time_s(page), PJE0);
+ if (!PValue || PValue == PJERR)
+ fatal("DBENGINE: corrupted judy array");
+
+ if (unlikely(*PValue)) {
+ datafile_release(datafile, DATAFILE_ACQUIRE_PAGE_DETAILS);
+ return;
}
- if (failed_to_reserve) {
- debug(D_RRDENGINE, "%s: Failed to reserve enough memory, canceling I/O.", __func__);
- for (i = 0 ; i < preload_count ; ++i) {
- descr = preload_array[i];
- if (NULL == descr) {
- continue;
- }
- pg_cache_put(ctx, descr);
- }
+
+ Word_t metric_id = pgc_page_metric(page);
+
+ // let's add it to the judy
+ struct extent_io_data *ei = pgc_page_custom_data(open_cache, page);
+ struct page_details *pd = page_details_get();
+ *PValue = pd;
+
+ pd->datafile.extent.pos = ei->pos;
+ pd->datafile.extent.bytes = ei->bytes;
+ pd->datafile.file = ei->file;
+ pd->datafile.fileno = ei->fileno;
+ pd->first_time_s = pgc_page_start_time_s(page);
+ pd->last_time_s = pgc_page_end_time_s(page);
+ pd->datafile.ptr = datafile;
+ pd->page_length = ei->page_length;
+ pd->update_every_s = pgc_page_update_every_s(page);
+ pd->metric_id = metric_id;
+ pd->status |= PDC_PAGE_DISK_PENDING | PDC_PAGE_SOURCE_JOURNAL_V2 | PDC_PAGE_DATAFILE_ACQUIRED;
+}
+
+// Return a JudyL with all the pages that overlap the time range start_time_ut to end_time_ut.
+// The JudyL is keyed by page start time; each PValue points to the struct page_details of that page.
+// DBENGINE2:
+#define time_delta(finish, pass) do { if(pass) { usec_t t = pass; (pass) = (finish) - (pass); (finish) = t; } } while(0)
+static Pvoid_t get_page_list(
+ struct rrdengine_instance *ctx,
+ METRIC *metric,
+ usec_t start_time_ut,
+ usec_t end_time_ut,
+ size_t *pages_to_load,
+ time_t *optimal_end_time_s
+) {
+ *optimal_end_time_s = 0;
+
+ Pvoid_t JudyL_page_array = (Pvoid_t) NULL;
+
+ time_t wanted_start_time_s = (time_t)(start_time_ut / USEC_PER_SEC);
+ time_t wanted_end_time_s = (time_t)(end_time_ut / USEC_PER_SEC);
+
+ size_t pages_found_in_main_cache = 0,
+ pages_found_in_open_cache = 0,
+ pages_found_in_journals_v2 = 0,
+ pages_found_pass4 = 0,
+ pages_pending = 0,
+ pages_overlapping = 0,
+ pages_total = 0;
+
+ size_t cache_gaps = 0, query_gaps = 0;
+ bool done_v2 = false, done_open = false;
+
+ usec_t pass1_ut = 0, pass2_ut = 0, pass3_ut = 0, pass4_ut = 0;
+
+ // --------------------------------------------------------------
+ // PASS 1: Check what the main page cache has available
+
+ pass1_ut = now_monotonic_usec();
+ size_t pages_pass1 = get_page_list_from_pgc(main_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
+ &JudyL_page_array, &cache_gaps,
+ false, PDC_PAGE_SOURCE_MAIN_CACHE);
+ query_gaps += cache_gaps;
+ pages_found_in_main_cache += pages_pass1;
+ pages_total += pages_pass1;
+
+ if(pages_found_in_main_cache && !cache_gaps) {
+ query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
+ &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
+ optimal_end_time_s, false);
+
+ if (pages_total && !query_gaps)
+ goto we_are_done;
}
- if (!preload_count) {
- /* no such page */
- debug(D_RRDENGINE, "%s: No page was eligible to attempt preload.", __func__);
+
+ // --------------------------------------------------------------
+ // PASS 2: Check what the open journal page cache has available
+ // these will be loaded from disk
+
+ pass2_ut = now_monotonic_usec();
+ size_t pages_pass2 = get_page_list_from_pgc(open_cache, metric, ctx, wanted_start_time_s, wanted_end_time_s,
+ &JudyL_page_array, &cache_gaps,
+ true, PDC_PAGE_SOURCE_OPEN_CACHE);
+ query_gaps += cache_gaps;
+ pages_found_in_open_cache += pages_pass2;
+ pages_total += pages_pass2;
+ done_open = true;
+
+ if(pages_found_in_open_cache) {
+ query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
+ &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
+ optimal_end_time_s, false);
+
+ if (pages_total && !query_gaps)
+ goto we_are_done;
}
- if (unlikely(0 == count && page_info_arrayp)) {
- freez(*page_info_arrayp);
- *page_info_arrayp = NULL;
+
+ // --------------------------------------------------------------
+ // PASS 3: Check Journal v2 to fill the gaps
+
+ pass3_ut = now_monotonic_usec();
+ size_t pages_pass3 = get_page_list_from_journal_v2(ctx, metric, start_time_ut, end_time_ut,
+ add_page_details_from_journal_v2, &JudyL_page_array);
+ pages_found_in_journals_v2 += pages_pass3;
+ pages_total += pages_pass3;
+ done_v2 = true;
+
+ // --------------------------------------------------------------
+ // PASS 4: Check the cache again
+ // and calculate the time gaps in the query
+ // THIS IS REQUIRED AFTER JOURNAL V2 LOOKUP
+
+ pass4_ut = now_monotonic_usec();
+ query_gaps = list_has_time_gaps(ctx, metric, JudyL_page_array, wanted_start_time_s, wanted_end_time_s,
+ &pages_total, &pages_found_pass4, &pages_pending, &pages_overlapping,
+ optimal_end_time_s, true);
+
+we_are_done:
+
+ if(pages_to_load)
+ *pages_to_load = pages_pending;
+
+ usec_t finish_ut = now_monotonic_usec();
+ time_delta(finish_ut, pass4_ut);
+ time_delta(finish_ut, pass3_ut);
+ time_delta(finish_ut, pass2_ut);
+ time_delta(finish_ut, pass1_ut);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_main_cache_lookup, pass1_ut, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_open_cache_lookup, pass2_ut, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_journal_v2_lookup, pass3_ut, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_in_pass4_lookup, pass4_ut, __ATOMIC_RELAXED);
+
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_planned_with_gaps, (query_gaps) ? 1 : 0, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_open, done_open ? 1 : 0, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_journal_v2, done_v2 ? 1 : 0, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_total, pages_total, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_open_cache, pages_found_in_open_cache, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_meta_source_journal_v2, pages_found_in_journals_v2, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, pages_found_in_main_cache, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache_at_pass4, pages_found_pass4, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_to_load_from_disk, pages_pending, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_overlapping_skipped, pages_overlapping, __ATOMIC_RELAXED);
+
+ return JudyL_page_array;
+}
+
+inline void rrdeng_prep_wait(PDC *pdc) {
+ if (unlikely(pdc && !pdc->prep_done)) {
+ usec_t started_ut = now_monotonic_usec();
+ completion_wait_for(&pdc->prep_completion);
+ pdc->prep_done = true;
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_wait_for_prep, now_monotonic_usec() - started_ut, __ATOMIC_RELAXED);
}
- return count;
}
-/*
- * Searches for a page and gets a reference.
- * When point_in_time is INVALID_TIME get any page.
- * If index is NULL lookup by UUID (id).
- */
-struct rrdeng_page_descr *
- pg_cache_lookup(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t point_in_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- struct page_cache_descr *pg_cache_descr = NULL;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- Word_t Index;
- uint8_t page_not_in_cache;
-
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
- }
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return NULL;
- }
- } else {
- page_index = index;
+void rrdeng_prep_query(PDC *pdc) {
+ size_t pages_to_load = 0;
+ pdc->page_list_JudyL = get_page_list(pdc->ctx, pdc->metric,
+ pdc->start_time_s * USEC_PER_SEC,
+ pdc->end_time_s * USEC_PER_SEC,
+ &pages_to_load,
+ &pdc->optimal_end_time_s);
+
+ if (pages_to_load && pdc->page_list_JudyL) {
+ pdc_acquire(pdc); // we get 1 for the 1st worker in the chain: do_read_page_list_work()
+ usec_t start_ut = now_monotonic_usec();
+// if(likely(priority == STORAGE_PRIORITY_BEST_EFFORT))
+// dbengine_load_page_list_directly(ctx, handle->pdc);
+// else
+ pdc_route_asynchronously(pdc->ctx, pdc);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.prep_time_to_route, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
}
- pg_cache_reserve_pages(ctx, 1);
-
- page_not_in_cache = 0;
- uv_rwlock_rdlock(&page_index->lock);
- while (1) {
- Index = (Word_t)(point_in_time_ut / USEC_PER_SEC);
- PValue = JudyLLast(page_index->JudyL_array, &Index, PJE0);
- if (likely(NULL != PValue)) {
- descr = *PValue;
- }
- if (NULL == PValue ||
- 0 == descr->page_length ||
- (INVALID_TIME != point_in_time_ut &&
- !is_point_in_time_in_page(descr, point_in_time_ut))) {
- /* non-empty page not found */
- uv_rwlock_rdunlock(&page_index->lock);
-
- pg_cache_release_pages(ctx, 1);
- return NULL;
- }
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- flags = pg_cache_descr->flags;
- if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
- /* success */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
- break;
- }
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- struct rrdeng_cmd cmd;
+ else
+ completion_mark_complete(&pdc->page_completion);
- uv_rwlock_rdunlock(&page_index->lock);
+ completion_mark_complete(&pdc->prep_completion);
- cmd.opcode = RRDENG_READ_PAGE;
- cmd.read_page.page_cache_descr = descr;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ pdc_release_and_destroy_if_unreferenced(pdc, true, true);
+}
- debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
- pg_cache_wait_event_unsafe(descr);
- }
- /* success */
- /* Downgrade exclusive reference to allow other readers */
- pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
- pg_cache_wake_up_waiters_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- return descr;
- }
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- if (!(flags & RRD_PAGE_POPULATED))
- page_not_in_cache = 1;
- pg_cache_wait_event_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
-
- /* reset scan to find again */
- uv_rwlock_rdlock(&page_index->lock);
- }
- uv_rwlock_rdunlock(&page_index->lock);
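+/**
+ * Prepares a query: allocates and initializes the PDC of the handle for the time range
+ * handle->start_time_s to handle->end_time_s and, if the DB context is available for queries,
+ * enqueues an RRDENG_OPCODE_QUERY command so that the pages are searched and disk I/O is
+ * triggered if necessary and possible. Otherwise the prep and page completions are marked
+ * complete immediately.
+ * Does nothing when handle or handle->metric is NULL.
+ * @param handle query handle as initialized (ctx, metric, start_time_s, end_time_s, priority)
+ */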
+void pg_cache_preload(struct rrdeng_query_handle *handle) {
+ if (unlikely(!handle || !handle->metric))
+ return;
- if (!(flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_set_hot(ctx, descr);
- pg_cache_release_pages(ctx, 1);
- if (page_not_in_cache)
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- else
- rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
- return descr;
+ __atomic_add_fetch(&handle->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
+ handle->pdc = pdc_get();
+ handle->pdc->metric = mrg_metric_dup(main_mrg, handle->metric);
+ handle->pdc->start_time_s = handle->start_time_s;
+ handle->pdc->end_time_s = handle->end_time_s;
+ handle->pdc->priority = handle->priority;
+ handle->pdc->optimal_end_time_s = handle->end_time_s;
+ handle->pdc->ctx = handle->ctx;
+ handle->pdc->refcount = 1;
+ netdata_spinlock_init(&handle->pdc->refcount_spinlock);
+ completion_init(&handle->pdc->prep_completion);
+ completion_init(&handle->pdc->page_completion);
+
+ if(ctx_is_available_for_queries(handle->ctx)) {
+ handle->pdc->refcount++; // we get 1 for the query thread and 1 for the prep thread
+ rrdeng_enq_cmd(handle->ctx, RRDENG_OPCODE_QUERY, handle->pdc, NULL, handle->priority, NULL, NULL);
+ }
+ else {
+ completion_mark_complete(&handle->pdc->prep_completion);
+ completion_mark_complete(&handle->pdc->page_completion);
+ }
}
/*
@@ -1088,226 +840,282 @@ struct rrdeng_page_descr *
* start_time and end_time are inclusive.
* If index is NULL lookup by UUID (id).
*/
-struct rrdeng_page_descr *
-pg_cache_lookup_next(struct rrdengine_instance *ctx, struct pg_cache_page_index *index, uuid_t *id,
- usec_t start_time_ut, usec_t end_time_ut)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- struct rrdeng_page_descr *descr = NULL;
- struct page_cache_descr *pg_cache_descr = NULL;
- unsigned long flags;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index = NULL;
- uint8_t page_not_in_cache;
-
- if (unlikely(NULL == index)) {
- uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
- PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, id, sizeof(uuid_t));
- if (likely(NULL != PValue)) {
- page_index = *PValue;
+/**
+ * Returns the next usable page of a query for the time now_s.
+ *
+ * Waits for the preparation of pdc (rrdeng_prep_wait()), then repeatedly asks
+ * pdc_find_page_for_time() for a page details entry, until a page passes all the checks below
+ * or no entry is left. Pages that are empty, have invalid times or size, or end before now_s
+ * are skipped. Pages with an out-of-range update_every, or with more entries by time than
+ * their size can hold, are fixed in place before being returned.
+ *
+ * @param ctx DB context; used as the main_cache section and for the point size
+ * @param pdc page details control of the query; NULL makes the function return NULL
+ * @param now_s the time the page is looked up for; pages ending before it are skipped
+ * @param last_update_every_s passed to pgc_page_fix_update_every() for pages with an invalid update_every
+ * @param entries output; receives the number of entries (by time) of the page examined
+ * @return the page found, or NULL; per the comment in the body, the caller releases it
+ */
+struct pgc_page *pg_cache_lookup_next(
+ struct rrdengine_instance *ctx,
+ PDC *pdc,
+ time_t now_s,
+ time_t last_update_every_s,
+ size_t *entries
+) {
+ if (unlikely(!pdc))
+ return NULL;
+
+ rrdeng_prep_wait(pdc);
+
+ if (unlikely(!pdc->page_list_JudyL))
+ return NULL;
+
+ // start_ut feeds the query_time_to_* statistics at the end of the function
+ usec_t start_ut = now_monotonic_usec();
+ size_t gaps = 0;
+ bool waited = false, preloaded;
+ PGC_PAGE *page = NULL;
+
+ while(!page) {
+ bool page_from_pd = false;
+ preloaded = false;
+ struct page_details *pd = pdc_find_page_for_time(
+ pdc->page_list_JudyL, now_s, &gaps,
+ PDC_PAGE_PROCESSED, PDC_PAGE_EMPTY);
+
+ if (!pd)
+ break;
+
+ page = pd->page;
+ page_from_pd = true;
+ preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
+ if(!page) {
+ // the page details entry has no page yet: while page_completion is not done,
+ // try to get the page directly from main_cache
+ if(!completion_is_done(&pdc->page_completion)) {
+ page = pgc_page_get_and_acquire(main_cache, (Word_t)ctx,
+ pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
+ page_from_pd = false;
+ preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
+ }
+
+ if(!page) {
+ // still nothing: wait for one more job to complete and re-check the entry
+ pdc->completed_jobs =
+ completion_wait_for_a_job(&pdc->page_completion, pdc->completed_jobs);
+
+ page = pd->page;
+ page_from_pd = true;
+ preloaded = pdc_page_status_check(pd, PDC_PAGE_PRELOADED);
+ waited = true;
+ }
}
- uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
- if (NULL == PValue) {
- return NULL;
+
+ if(page && pgc_page_data(page) == DBENGINE_EMPTY_PAGE)
+ pdc_page_status_set(pd, PDC_PAGE_EMPTY);
+
+ // NOTE(review): when the page was acquired directly from main_cache above
+ // (page_from_pd is false), it is dropped here without a release - confirm
+ // this cannot leak a page reference
+ if(!page || pdc_page_status_check(pd, PDC_PAGE_QUERY_GLOBAL_SKIP_LIST | PDC_PAGE_EMPTY)) {
+ page = NULL;
+ continue;
}
- } else {
- page_index = index;
- }
- pg_cache_reserve_pages(ctx, 1);
-
- page_not_in_cache = 0;
- uv_rwlock_rdlock(&page_index->lock);
- int retry_count = 0;
- while (1) {
- descr = find_first_page_in_time_range(page_index, start_time_ut, end_time_ut);
- if (NULL == descr || 0 == descr->page_length || retry_count == default_rrdeng_page_fetch_retries) {
- /* non-empty page not found */
- if (retry_count == default_rrdeng_page_fetch_retries)
- error_report("Page cache timeout while waiting for page %p : returning FAIL", descr);
- uv_rwlock_rdunlock(&page_index->lock);
-
- pg_cache_release_pages(ctx, 1);
- return NULL;
+
+ // we now have page and is not empty
+
+ time_t page_start_time_s = pgc_page_start_time_s(page);
+ time_t page_end_time_s = pgc_page_end_time_s(page);
+ time_t page_update_every_s = pgc_page_update_every_s(page);
+ size_t page_length = pgc_page_data_size(main_cache, page);
+
+ // pages with invalid times or an oversized payload are dropped from the cache and skipped
+ if(unlikely(page_start_time_s == INVALID_TIME || page_end_time_s == INVALID_TIME)) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_zero_time_skipped, 1, __ATOMIC_RELAXED);
+ pgc_page_to_clean_evict_or_release(main_cache, page);
+ pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
+ pd->page = page = NULL;
+ continue;
}
- rrdeng_page_descr_mutex_lock(ctx, descr);
- pg_cache_descr = descr->pg_cache_descr;
- flags = pg_cache_descr->flags;
- if ((flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 0)) {
- /* success */
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- debug(D_RRDENGINE, "%s: Page was found in memory.", __func__);
- break;
+ else if(page_length > RRDENG_BLOCK_SIZE) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_size_skipped, 1, __ATOMIC_RELAXED);
+ pgc_page_to_clean_evict_or_release(main_cache, page);
+ pdc_page_status_set(pd, PDC_PAGE_INVALID | PDC_PAGE_RELEASED);
+ pd->page = page = NULL;
+ continue;
}
- if (!(flags & RRD_PAGE_POPULATED) && pg_cache_try_get_unsafe(descr, 1)) {
- struct rrdeng_cmd cmd;
+ else {
+ // an update_every outside 1..86400 seconds is replaced using last_update_every_s
+ if (unlikely(page_update_every_s <= 0 || page_update_every_s > 86400)) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_update_every_fixed, 1, __ATOMIC_RELAXED);
+ pd->update_every_s = page_update_every_s = pgc_page_fix_update_every(page, last_update_every_s);
+ }
- uv_rwlock_rdunlock(&page_index->lock);
+ size_t entries_by_size = page_entries_by_size(page_length, CTX_POINT_SIZE_BYTES(ctx));
+ size_t entries_by_time = page_entries_by_time(page_start_time_s, page_end_time_s, page_update_every_s);
+ // the time range claims more points than the data can hold: shorten the end time
+ if(unlikely(entries_by_size < entries_by_time)) {
+ time_t fixed_page_end_time_s = (time_t)(page_start_time_s + (entries_by_size - 1) * page_update_every_s);
+ pd->last_time_s = page_end_time_s = pgc_page_fix_end_time_s(page, fixed_page_end_time_s);
+ entries_by_time = (page_end_time_s - (page_start_time_s - page_update_every_s)) / page_update_every_s;
- cmd.opcode = RRDENG_READ_PAGE;
- cmd.read_page.page_cache_descr = descr;
- rrdeng_enq_cmd(&ctx->worker_config, &cmd);
+ internal_fatal(entries_by_size != entries_by_time, "DBENGINE: wrong entries by time again!");
- debug(D_RRDENGINE, "%s: Waiting for page to be asynchronously read from disk:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- while (!(pg_cache_descr->flags & RRD_PAGE_POPULATED)) {
- pg_cache_wait_event_unsafe(descr);
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_invalid_entries_fixed, 1, __ATOMIC_RELAXED);
}
- /* success */
- /* Downgrade exclusive reference to allow other readers */
- pg_cache_descr->flags &= ~RRD_PAGE_LOCKED;
- pg_cache_wake_up_waiters_unsafe(descr);
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- return descr;
+ *entries = entries_by_time;
}
- uv_rwlock_rdunlock(&page_index->lock);
- debug(D_RRDENGINE, "%s: Waiting for page to be unlocked:", __func__);
- if(unlikely(debug_flags & D_RRDENGINE))
- print_page_cache_descr(descr, "", true);
- if (!(flags & RRD_PAGE_POPULATED))
- page_not_in_cache = 1;
-
- if (pg_cache_timedwait_event_unsafe(descr, default_rrdeng_page_fetch_timeout) == UV_ETIMEDOUT) {
- error_report("Page cache timeout while waiting for page %p : retry count = %d", descr, retry_count);
- ++retry_count;
+
+ // the page ends before the requested time: release it and look for the next one
+ if(unlikely(page_end_time_s < now_s)) {
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_past_time_skipped, 1, __ATOMIC_RELAXED);
+ pgc_page_release(main_cache, page);
+ pdc_page_status_set(pd, PDC_PAGE_SKIP | PDC_PAGE_RELEASED);
+ pd->page = page = NULL;
+ continue;
}
- rrdeng_page_descr_mutex_unlock(ctx, descr);
- /* reset scan to find again */
- uv_rwlock_rdlock(&page_index->lock);
+ if(page_from_pd)
+ // PDC_PAGE_RELEASED is for pdc_destroy() to not release the page twice - the caller will release it
+ pdc_page_status_set(pd, PDC_PAGE_RELEASED | PDC_PAGE_PROCESSED);
+ else
+ pdc_page_status_set(pd, PDC_PAGE_PROCESSED);
}
- uv_rwlock_rdunlock(&page_index->lock);
- if (!(flags & RRD_PAGE_DIRTY))
- pg_cache_replaceQ_set_hot(ctx, descr);
- pg_cache_release_pages(ctx, 1);
- if (page_not_in_cache)
- rrd_stat_atomic_add(&ctx->stats.pg_cache_misses, 1);
- else
- rrd_stat_atomic_add(&ctx->stats.pg_cache_hits, 1);
- return descr;
-}
+ // count the query in the global statistics only the first time gaps are seen for this pdc
+ if(gaps && !pdc->executed_with_gaps)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.queries_executed_with_gaps, 1, __ATOMIC_RELAXED);
+ // accumulate instead of overwriting: gaps is local to this call, so assigning it would
+ // reset the marker on a call without gaps and the query would be counted again later
+ pdc->executed_with_gaps += gaps;
-struct pg_cache_page_index *create_page_index(uuid_t *id, struct rrdengine_instance *ctx)
-{
- struct pg_cache_page_index *page_index;
-
- page_index = mallocz(sizeof(*page_index));
- page_index->JudyL_array = (Pvoid_t) NULL;
- uuid_copy(page_index->id, *id);
- fatal_assert(0 == uv_rwlock_init(&page_index->lock));
- page_index->oldest_time_ut = INVALID_TIME;
- page_index->latest_time_ut = INVALID_TIME;
- page_index->prev = NULL;
- page_index->page_count = 0;
- page_index->refcount = 0;
- page_index->writers = 0;
- page_index->ctx = ctx;
- page_index->latest_update_every_s = default_rrd_update_every;
-
- return page_index;
-}
+ // statistics: was a page found, and did we have to wait for it
+ if(page) {
+ if(waited)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_loaded, 1, __ATOMIC_RELAXED);
+ else
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_loaded, 1, __ATOMIC_RELAXED);
+ }
+ else {
+ if(waited)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_wait_failed, 1, __ATOMIC_RELAXED);
+ else
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.page_next_nowait_failed, 1, __ATOMIC_RELAXED);
+ }
-static void init_metrics_index(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ // statistics: time spent in this call, split by waited/not waited and preloaded/not preloaded
+ if(waited) {
+ if(preloaded)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
+ else
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_slow_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
+ }
+ else {
+ if(preloaded)
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_preload_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
+ else
+ __atomic_add_fetch(&rrdeng_cache_efficiency_stats.query_time_to_fast_disk_next_page, now_monotonic_usec() - start_ut, __ATOMIC_RELAXED);
+ }
- pg_cache->metrics_index.JudyHS_array = (Pvoid_t) NULL;
- pg_cache->metrics_index.last_page_index = NULL;
- fatal_assert(0 == uv_rwlock_init(&pg_cache->metrics_index.lock));
+ return page;
}
-static void init_replaceQ(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+/**
+ * Adds a hot page entry to open_cache that records where a page is stored inside a datafile.
+ *
+ * The datafile is acquired (DATAFILE_ACQUIRE_OPEN_CACHE) on behalf of the cache entry; failing
+ * to acquire it is fatal. When the entry is not added in the end, that acquisition is released.
+ * The page returned by the cache is always released before returning.
+ *
+ * @param section section of the cache entry
+ * @param metric_id metric id of the cache entry
+ * @param start_time_s start time of the page, in seconds
+ * @param end_time_s end time of the page, in seconds
+ * @param update_every_s update every of the page, in seconds
+ * @param datafile the datafile; stored as the data of the cache entry
+ * @param extent_offset stored as the position (pos) of the extent io data
+ * @param extent_size stored as the size (bytes) of the extent io data
+ * @param page_length stored as the page length of the extent io data
+ */
+void pgc_open_add_hot_page(Word_t section, Word_t metric_id, time_t start_time_s, time_t end_time_s, time_t update_every_s,
+ struct rrdengine_datafile *datafile, uint64_t extent_offset, unsigned extent_size, uint32_t page_length) {
+
+ if(!datafile_acquire(datafile, DATAFILE_ACQUIRE_OPEN_CACHE)) // for open cache item
+ fatal("DBENGINE: cannot acquire datafile to put page in open cache");
+
+ // passed to the cache as custom data of the entry
+ struct extent_io_data ext_io_data = {
+ .file = datafile->file,
+ .fileno = datafile->fileno,
+ .pos = extent_offset,
+ .bytes = extent_size,
+ .page_length = page_length
+ };
+
+ PGC_ENTRY page_entry = {
+ .hot = true,
+ .section = section,
+ .metric_id = metric_id,
+ .start_time_s = start_time_s,
+ .end_time_s = end_time_s,
+ .update_every_s = update_every_s,
+ .size = 0,
+ .data = datafile,
+ .custom_data = (uint8_t *) &ext_io_data,
+ };
+
+ internal_fatal(!datafile->fileno, "DBENGINE: datafile supplied does not have a number");
+
+ bool added = true;
+ PGC_PAGE *page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
+ // when an existing page with an earlier end time was returned instead, drop it and
+ // retry the insertion, up to 100 times
+ int tries = 100;
+ while(!added && page_entry.end_time_s > pgc_page_end_time_s(page) && tries--) {
+ pgc_page_to_clean_evict_or_release(open_cache, page);
+ page = pgc_page_add_and_acquire(open_cache, page_entry, &added);
+ }
- pg_cache->replaceQ.head = NULL;
- pg_cache->replaceQ.tail = NULL;
- fatal_assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
-}
+ if(!added) {
+ // our entry did not get into the cache, so the datafile acquisition made for it is undone
+ datafile_release(datafile, DATAFILE_ACQUIRE_OPEN_CACHE);
-static void init_committed_page_index(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+ internal_fatal(page_entry.end_time_s > pgc_page_end_time_s(page),
+ "DBENGINE: cannot add longer page to open cache");
+ }
- pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
- fatal_assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
- pg_cache->committed_page_index.latest_corr_id = 0;
- pg_cache->committed_page_index.nr_committed_pages = 0;
+ // release the page acquired by pgc_page_add_and_acquire()
+ pgc_page_release(open_cache, (PGC_PAGE *)page);
}
-void init_page_cache(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
+/**
+ * Computes the target size of the open cache from the wanted size of main_cache.
+ * Set as the dynamic target cache size callback of open_cache in pgc_and_mrg_initialize().
+ *
+ * @return about 5% of the wanted main cache size, but never less than 2 MiB
+ */
+size_t dynamic_open_cache_size(void) {
+ size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
+ // integer arithmetic: the division by 100 is done first
+ size_t target_size = main_cache_size / 100 * 5;
- pg_cache->page_descriptors = 0;
- pg_cache->populated_pages = 0;
- fatal_assert(0 == uv_rwlock_init(&pg_cache->pg_cache_rwlock));
+ // lower bound: 2 MiB
+ if(target_size < 2 * 1024 * 1024)
+ target_size = 2 * 1024 * 1024;
- init_metrics_index(ctx);
- init_replaceQ(ctx);
- init_committed_page_index(ctx);
+ return target_size;
}
-void free_page_cache(struct rrdengine_instance *ctx)
-{
- struct page_cache *pg_cache = &ctx->pg_cache;
- Pvoid_t *PValue;
- struct pg_cache_page_index *page_index, *prev_page_index;
- Word_t Index;
- struct rrdeng_page_descr *descr;
- struct page_cache_descr *pg_cache_descr;
-
- // if we are exiting, the OS will recover all memory so do not slow down the shutdown process
- // Do the cleanup if we are compiling with NETDATA_INTERNAL_CHECKS
- // This affects the reporting of dbengine statistics which are available in real time
- // via the /api/v1/dbengine_stats endpoint
-#ifndef NETDATA_DBENGINE_FREE
- if (netdata_exit)
- return;
-#endif
- Word_t metrics_index_bytes = 0, pages_index_bytes = 0, pages_dirty_index_bytes = 0;
-
- /* Free committed page index */
- pages_dirty_index_bytes = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
- fatal_assert(NULL == pg_cache->committed_page_index.JudyL_array);
-
- for (page_index = pg_cache->metrics_index.last_page_index ;
- page_index != NULL ;
- page_index = prev_page_index) {
+/**
+ * Computes the target size of the extent cache from the wanted size of main_cache.
+ * Set as the dynamic target cache size callback of extent_cache in pgc_and_mrg_initialize().
+ *
+ * @return about 5% of the wanted main cache size, but never less than 3 MiB
+ */
+size_t dynamic_extent_cache_size(void) {
+ size_t main_cache_size = pgc_get_wanted_cache_size(main_cache);
+ // integer arithmetic: the division by 100 is done first
+ size_t target_size = main_cache_size / 100 * 5;
- prev_page_index = page_index->prev;
+ // lower bound: 3 MiB
+ if(target_size < 3 * 1024 * 1024)
+ target_size = 3 * 1024 * 1024;
- /* Find first page in range */
- Index = (Word_t) 0;
- PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
-
- while (descr != NULL) {
- /* Iterate all page descriptors of this metric */
+ return target_size;
+}
- if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
- /* Check rrdenglocking.c */
- pg_cache_descr = descr->pg_cache_descr;
- if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
- dbengine_page_free(pg_cache_descr->page);
- }
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
- }
- rrdeng_page_descr_freez(descr);
+/**
+ * Creates the global metrics registry (main_mrg) and the three global caches
+ * (main_cache, open_cache, extent_cache).
+ *
+ * The configured page cache size (default_rrdeng_page_cache_mb, in MiB) is split 95% / 5%
+ * between the main cache and the extent cache, with a 3 MiB minimum for the extent cache.
+ * The open cache starts with size 0. The open and extent caches get a dynamic target size
+ * callback (dynamic_open_cache_size() / dynamic_extent_cache_size()).
+ */
+void pgc_and_mrg_initialize(void)
+{
+ main_mrg = mrg_create();
- if (descr->pg_cache_descr_state & PG_CACHE_DESCR_ALLOCATED) {
- /* Check rrdenglocking.c */
- pg_cache_descr = descr->pg_cache_descr;
- if (pg_cache_descr->flags & RRD_PAGE_POPULATED) {
- dbengine_page_free(pg_cache_descr->page);
- }
- rrdeng_destroy_pg_cache_descr(ctx, pg_cache_descr);
- }
- rrdeng_page_descr_freez(descr);
+ // total budget in bytes
+ size_t target_cache_size = (size_t)default_rrdeng_page_cache_mb * 1024ULL * 1024ULL;
+ size_t main_cache_size = (target_cache_size / 100) * 95;
+ size_t open_cache_size = 0;
+ size_t extent_cache_size = (target_cache_size / 100) * 5;
- PValue = JudyLNext(page_index->JudyL_array, &Index, PJE0);
- descr = unlikely(NULL == PValue) ? NULL : *PValue;
- }
+ // give the extent cache at least 3 MiB, taking the difference from the main cache
+ // NOTE(review): if target_cache_size is below 3 MiB this size_t subtraction wraps around -
+ // presumably a configured minimum prevents that; confirm
- /* Free page index */
- pages_index_bytes += JudyLFreeArray(&page_index->JudyL_array, PJE0);
- fatal_assert(NULL == page_index->JudyL_array);
- freez(page_index);
+ if(extent_cache_size < 3 * 1024 * 1024) {
+ extent_cache_size = 3 * 1024 * 1024;
+ main_cache_size = target_cache_size - extent_cache_size;
}
- /* Free metrics index */
- metrics_index_bytes = JudyHSFreeArray(&pg_cache->metrics_index.JudyHS_array, PJE0);
- fatal_assert(NULL == pg_cache->metrics_index.JudyHS_array);
- info("Freed %lu bytes of memory from page cache.", pages_dirty_index_bytes + pages_index_bytes + metrics_index_bytes);
+
+ // the meaning of the positional numeric arguments of pgc_create() is not visible here;
+ // see its declaration
+ main_cache = pgc_create(
+ "main_cache",
+ main_cache_size,
+ main_cache_free_clean_page_callback,
+ (size_t) rrdeng_pages_per_extent,
+ main_cache_flush_dirty_page_init_callback,
+ main_cache_flush_dirty_page_callback,
+ 10,
+ 10240, // if there are that many threads, evict so many at once!
+ 1000, //
+ 5, // don't delay too much other threads
+ PGC_OPTIONS_AUTOSCALE, // AUTOSCALE = 2x max hot pages
+ 0, // 0 = as many as the system cpus
+ 0
+ );
+
+ // the last argument is the size of the custom data stored by pgc_open_add_hot_page()
+ open_cache = pgc_create(
+ "open_cache",
+ open_cache_size, // the default is 1MB
+ open_cache_free_clean_page_callback,
+ 1,
+ NULL,
+ open_cache_flush_dirty_page_callback,
+ 10,
+ 10240, // if there are that many threads, evict that many at once!
+ 1000, //
+ 3, // don't delay too much other threads
+ PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
+ 0, // 0 = as many as the system cpus
+ sizeof(struct extent_io_data)
+ );
+ pgc_set_dynamic_target_cache_size_callback(open_cache, dynamic_open_cache_size);
+
+ extent_cache = pgc_create(
+ "extent_cache",
+ extent_cache_size,
+ extent_cache_free_clean_page_callback,
+ 1,
+ NULL,
+ extent_cache_flush_dirty_page_callback,
+ 5,
+ 10, // it will lose up to that extents at once!
+ 100, //
+ 2, // don't delay too much other threads
+ PGC_OPTIONS_AUTOSCALE | PGC_OPTIONS_EVICT_PAGES_INLINE | PGC_OPTIONS_FLUSH_PAGES_INLINE,
+ 0, // 0 = as many as the system cpus
+ 0
+ );
+ pgc_set_dynamic_target_cache_size_callback(extent_cache, dynamic_extent_cache_size);
}