summaryrefslogtreecommitdiffstats
path: root/database/engine/pdc.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /database/engine/pdc.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/pdc.c')
-rw-r--r--database/engine/pdc.c1332
1 files changed, 0 insertions, 1332 deletions
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
deleted file mode 100644
index 5fe205e64..000000000
--- a/database/engine/pdc.c
+++ /dev/null
@@ -1,1332 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-#define NETDATA_RRD_INTERNALS
-#include "pdc.h"
-
-struct extent_page_details_list {
- uv_file file;
- uint64_t extent_offset;
- uint32_t extent_size;
- unsigned number_of_pages_in_JudyL;
- Pvoid_t page_details_by_metric_id_JudyL;
- struct page_details_control *pdc;
- struct rrdengine_datafile *datafile;
-
- struct rrdeng_cmd *cmd;
- bool head_to_datafile_extent_queries_pending_for_extent;
-
- struct {
- struct extent_page_details_list *prev;
- struct extent_page_details_list *next;
- } query;
-};
-
-typedef struct datafile_extent_offset_list {
- uv_file file;
- unsigned fileno;
- Pvoid_t extent_pd_list_by_extent_offset_JudyL;
-} DEOL;
-
-// ----------------------------------------------------------------------------
-// PDC cache
-
-static struct {
- struct {
- ARAL *ar;
- } pdc;
-
- struct {
- ARAL *ar;
- } pd;
-
- struct {
- ARAL *ar;
- } epdl;
-
- struct {
- ARAL *ar;
- } deol;
-} pdc_globals = {};
-
-void pdc_init(void) {
- pdc_globals.pdc.ar = aral_create(
- "dbengine-pdc",
- sizeof(PDC),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-PDC *pdc_get(void) {
- PDC *pdc = aral_mallocz(pdc_globals.pdc.ar);
- memset(pdc, 0, sizeof(PDC));
- return pdc;
-}
-
-static void pdc_release(PDC *pdc) {
- aral_freez(pdc_globals.pdc.ar, pdc);
-}
-
-size_t pdc_cache_size(void) {
- return aral_overhead(pdc_globals.pdc.ar) + aral_structures(pdc_globals.pdc.ar);
-}
-
-// ----------------------------------------------------------------------------
-// PD cache
-
-void page_details_init(void) {
- pdc_globals.pd.ar = aral_create(
- "dbengine-pd",
- sizeof(struct page_details),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-struct page_details *page_details_get(void) {
- struct page_details *pd = aral_mallocz(pdc_globals.pd.ar);
- memset(pd, 0, sizeof(struct page_details));
- return pd;
-}
-
-static void page_details_release(struct page_details *pd) {
- aral_freez(pdc_globals.pd.ar, pd);
-}
-
-size_t pd_cache_size(void) {
- return aral_overhead(pdc_globals.pd.ar) + aral_structures(pdc_globals.pd.ar);
-}
-
-// ----------------------------------------------------------------------------
-// epdl cache
-
-void epdl_init(void) {
- pdc_globals.epdl.ar = aral_create(
- "dbengine-epdl",
- sizeof(EPDL),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-static EPDL *epdl_get(void) {
- EPDL *epdl = aral_mallocz(pdc_globals.epdl.ar);
- memset(epdl, 0, sizeof(EPDL));
- return epdl;
-}
-
-static void epdl_release(EPDL *epdl) {
- aral_freez(pdc_globals.epdl.ar, epdl);
-}
-
-size_t epdl_cache_size(void) {
- return aral_overhead(pdc_globals.epdl.ar) + aral_structures(pdc_globals.epdl.ar);
-}
-
-// ----------------------------------------------------------------------------
-// deol cache
-
-void deol_init(void) {
- pdc_globals.deol.ar = aral_create(
- "dbengine-deol",
- sizeof(DEOL),
- 0,
- 65536,
- NULL,
- NULL, NULL, false, false
- );
-}
-
-static DEOL *deol_get(void) {
- DEOL *deol = aral_mallocz(pdc_globals.deol.ar);
- memset(deol, 0, sizeof(DEOL));
- return deol;
-}
-
-static void deol_release(DEOL *deol) {
- aral_freez(pdc_globals.deol.ar, deol);
-}
-
-size_t deol_cache_size(void) {
- return aral_overhead(pdc_globals.deol.ar) + aral_structures(pdc_globals.deol.ar);
-}
-
-// ----------------------------------------------------------------------------
-// extent with buffer cache
-
-static struct {
- struct {
- SPINLOCK spinlock;
- struct extent_buffer *available_items;
- size_t available;
- } protected;
-
- struct {
- size_t allocated;
- size_t allocated_bytes;
- } atomics;
-
- size_t max_size;
-
-} extent_buffer_globals = {
- .protected = {
- .spinlock = NETDATA_SPINLOCK_INITIALIZER,
- .available_items = NULL,
- .available = 0,
- },
- .atomics = {
- .allocated = 0,
- .allocated_bytes = 0,
- },
- .max_size = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE,
-};
-
-void extent_buffer_init(void) {
- size_t max_extent_uncompressed = MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE;
- size_t max_size = (size_t)LZ4_compressBound(MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE);
- if(max_size < max_extent_uncompressed)
- max_size = max_extent_uncompressed;
-
- extent_buffer_globals.max_size = max_size;
-}
-
-void extent_buffer_cleanup1(void) {
- struct extent_buffer *item = NULL;
-
- if(!spinlock_trylock(&extent_buffer_globals.protected.spinlock))
- return;
-
- if(extent_buffer_globals.protected.available_items && extent_buffer_globals.protected.available > 1) {
- item = extent_buffer_globals.protected.available_items;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, item, cache.prev, cache.next);
- extent_buffer_globals.protected.available--;
- }
-
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-
- if(item) {
- size_t bytes = sizeof(struct extent_buffer) + item->bytes;
- freez(item);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-}
-
-struct extent_buffer *extent_buffer_get(size_t size) {
- internal_fatal(size > extent_buffer_globals.max_size, "DBENGINE: extent size is too big");
-
- struct extent_buffer *eb = NULL;
-
- if(size < extent_buffer_globals.max_size)
- size = extent_buffer_globals.max_size;
-
- spinlock_lock(&extent_buffer_globals.protected.spinlock);
- if(likely(extent_buffer_globals.protected.available_items)) {
- eb = extent_buffer_globals.protected.available_items;
- DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
- extent_buffer_globals.protected.available--;
- }
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-
- if(unlikely(eb && eb->bytes < size)) {
- size_t bytes = sizeof(struct extent_buffer) + eb->bytes;
- freez(eb);
- eb = NULL;
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-
- if(unlikely(!eb)) {
- size_t bytes = sizeof(struct extent_buffer) + size;
- eb = mallocz(bytes);
- eb->bytes = size;
- __atomic_add_fetch(&extent_buffer_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&extent_buffer_globals.atomics.allocated_bytes, bytes, __ATOMIC_RELAXED);
- }
-
- return eb;
-}
-
-void extent_buffer_release(struct extent_buffer *eb) {
- if(unlikely(!eb)) return;
-
- spinlock_lock(&extent_buffer_globals.protected.spinlock);
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(extent_buffer_globals.protected.available_items, eb, cache.prev, cache.next);
- extent_buffer_globals.protected.available++;
- spinlock_unlock(&extent_buffer_globals.protected.spinlock);
-}
-
-size_t extent_buffer_cache_size(void) {
- return __atomic_load_n(&extent_buffer_globals.atomics.allocated_bytes, __ATOMIC_RELAXED);
-}
-
-// ----------------------------------------------------------------------------
-// epdl logic
-
-static void epdl_destroy(EPDL *epdl)
-{
- Pvoid_t *pd_by_start_time_s_JudyL;
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- while ((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(
- epdl->page_details_by_metric_id_JudyL,
- &metric_id_index, &metric_id_first)))
- PDCJudyLFreeArray(pd_by_start_time_s_JudyL, PJE0);
-
- PDCJudyLFreeArray(&epdl->page_details_by_metric_id_JudyL, PJE0);
- epdl_release(epdl);
-}
-
-static void epdl_mark_all_not_loaded_pages_as_failed(EPDL *epdl, PDC_PAGE_STATUS tags, size_t *statistics_counter)
-{
- size_t pages_matched = 0;
-
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- Pvoid_t *pd_by_start_time_s_JudyL;
- while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
-
- Word_t start_time_index = 0;
- bool start_time_first = true;
- Pvoid_t *PValue;
- while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
- struct page_details *pd = *PValue;
-
- if(!pd->page && !pdc_page_status_check(pd, PDC_PAGE_FAILED|PDC_PAGE_READY)) {
- pdc_page_status_set(pd, PDC_PAGE_FAILED | tags);
- pages_matched++;
- }
- }
- }
-
- if(pages_matched && statistics_counter)
- __atomic_add_fetch(statistics_counter, pages_matched, __ATOMIC_RELAXED);
-}
-/*
-static bool epdl_check_if_pages_are_already_in_cache(struct rrdengine_instance *ctx, EPDL *epdl, PDC_PAGE_STATUS tags)
-{
- size_t count_remaining = 0;
- size_t found = 0;
-
- Word_t metric_id_index = 0;
- bool metric_id_first = true;
- Pvoid_t *pd_by_start_time_s_JudyL;
- while((pd_by_start_time_s_JudyL = PDCJudyLFirstThenNext(epdl->page_details_by_metric_id_JudyL, &metric_id_index, &metric_id_first))) {
-
- Word_t start_time_index = 0;
- bool start_time_first = true;
- Pvoid_t *PValue;
- while ((PValue = PDCJudyLFirstThenNext(*pd_by_start_time_s_JudyL, &start_time_index, &start_time_first))) {
- struct page_details *pd = *PValue;
- if (pd->page)
- continue;
-
- pd->page = pgc_page_get_and_acquire(main_cache, (Word_t) ctx, pd->metric_id, pd->first_time_s, PGC_SEARCH_EXACT);
- if (pd->page) {
- found++;
- pdc_page_status_set(pd, PDC_PAGE_READY | tags);
- }
- else
- count_remaining++;
- }
- }
-
- if(found) {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_preloaded, found, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, found, __ATOMIC_RELAXED);
- }
-
- return count_remaining == 0;
-}
-*/
-
-// ----------------------------------------------------------------------------
-// PDC logic
-
-static void pdc_destroy(PDC *pdc) {
- mrg_metric_release(main_mrg, pdc->metric);
- completion_destroy(&pdc->prep_completion);
- completion_destroy(&pdc->page_completion);
-
- Pvoid_t *PValue;
- struct page_details *pd;
- Word_t time_index = 0;
- bool first_then_next = true;
- size_t unroutable = 0, cancelled = 0;
- while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
- pd = *PValue;
-
- // no need for atomics here - we are done...
- PDC_PAGE_STATUS status = pd->status;
-
- if(status & PDC_PAGE_DATAFILE_ACQUIRED) {
- datafile_release(pd->datafile.ptr, DATAFILE_ACQUIRE_PAGE_DETAILS);
- pd->datafile.ptr = NULL;
- }
-
- internal_fatal(pd->datafile.ptr, "DBENGINE: page details has a datafile.ptr that is not released.");
-
- if(!pd->page && !(status & (PDC_PAGE_READY | PDC_PAGE_FAILED | PDC_PAGE_RELEASED | PDC_PAGE_SKIP | PDC_PAGE_INVALID | PDC_PAGE_CANCELLED))) {
- // pdc_page_status_set(pd, PDC_PAGE_FAILED);
- unroutable++;
- }
- else if(!pd->page && (status & PDC_PAGE_CANCELLED))
- cancelled++;
-
- if(pd->page && !(status & PDC_PAGE_RELEASED)) {
- pgc_page_release(main_cache, pd->page);
- // pdc_page_status_set(pd, PDC_PAGE_RELEASED);
- }
-
- page_details_release(pd);
- }
-
- PDCJudyLFreeArray(&pdc->page_list_JudyL, PJE0);
-
- __atomic_sub_fetch(&rrdeng_cache_efficiency_stats.currently_running_queries, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&pdc->ctx->atomic.inflight_queries, 1, __ATOMIC_RELAXED);
- pdc_release(pdc);
-
- if(unroutable)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_unroutable, unroutable, __ATOMIC_RELAXED);
-
- if(cancelled)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_cancelled, cancelled, __ATOMIC_RELAXED);
-}
-
-void pdc_acquire(PDC *pdc) {
- spinlock_lock(&pdc->refcount_spinlock);
-
- if(pdc->refcount < 1)
- fatal("DBENGINE: pdc is not referenced and cannot be acquired");
-
- pdc->refcount++;
- spinlock_unlock(&pdc->refcount_spinlock);
-}
-
-bool pdc_release_and_destroy_if_unreferenced(PDC *pdc, bool worker, bool router __maybe_unused) {
- if(unlikely(!pdc))
- return true;
-
- spinlock_lock(&pdc->refcount_spinlock);
-
- if(pdc->refcount <= 0)
- fatal("DBENGINE: pdc is not referenced and cannot be released");
-
- pdc->refcount--;
-
- if (pdc->refcount <= 1 && worker) {
- // when 1 refcount is remaining, and we are a worker,
- // we can mark the job completed:
- // - if the remaining refcount is from the query caller, we will wake it up
- // - if the remaining refcount is from another worker, the query thread is already away
- completion_mark_complete(&pdc->page_completion);
- }
-
- if (pdc->refcount == 0) {
- spinlock_unlock(&pdc->refcount_spinlock);
- pdc_destroy(pdc);
- return true;
- }
-
- spinlock_unlock(&pdc->refcount_spinlock);
- return false;
-}
-
-void epdl_cmd_queued(void *epdl_ptr, struct rrdeng_cmd *cmd) {
- EPDL *epdl = epdl_ptr;
- epdl->cmd = cmd;
-}
-
-void epdl_cmd_dequeued(void *epdl_ptr) {
- EPDL *epdl = epdl_ptr;
- epdl->cmd = NULL;
-}
-
-static struct rrdeng_cmd *epdl_get_cmd(void *epdl_ptr) {
- EPDL *epdl = epdl_ptr;
- return epdl->cmd;
-}
-
-static bool epdl_pending_add(EPDL *epdl) {
- bool added_new;
-
- spinlock_lock(&epdl->datafile->extent_queries.spinlock);
- Pvoid_t *PValue = JudyLIns(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
- internal_fatal(!PValue || PValue == PJERR, "DBENGINE: corrupted pending extent judy");
-
- EPDL *base = *PValue;
-
- if(!base) {
- added_new = true;
- epdl->head_to_datafile_extent_queries_pending_for_extent = true;
- }
- else {
- added_new = false;
- epdl->head_to_datafile_extent_queries_pending_for_extent = false;
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_extent_merged, 1, __ATOMIC_RELAXED);
-
- if(base->pdc->priority > epdl->pdc->priority)
- rrdeng_req_cmd(epdl_get_cmd, base, epdl->pdc->priority);
- }
-
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(base, epdl, query.prev, query.next);
- *PValue = base;
-
- spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
-
- return added_new;
-}
-
-static void epdl_pending_del(EPDL *epdl) {
- spinlock_lock(&epdl->datafile->extent_queries.spinlock);
- if(epdl->head_to_datafile_extent_queries_pending_for_extent) {
- epdl->head_to_datafile_extent_queries_pending_for_extent = false;
- int rc = JudyLDel(&epdl->datafile->extent_queries.pending_epdl_by_extent_offset_judyL, epdl->extent_offset, PJE0);
- (void) rc;
- internal_fatal(!rc, "DBENGINE: epdl not found in pending list");
- }
- spinlock_unlock(&epdl->datafile->extent_queries.spinlock);
-}
-
-void pdc_to_epdl_router(struct rrdengine_instance *ctx, PDC *pdc, execute_extent_page_details_list_t exec_first_extent_list, execute_extent_page_details_list_t exec_rest_extent_list)
-{
- Pvoid_t *PValue;
- Pvoid_t *PValue1;
- Pvoid_t *PValue2;
- Word_t time_index = 0;
- struct page_details *pd = NULL;
-
- // this is the entire page list
- // Lets do some deduplication
- // 1. Per datafile
- // 2. Per extent
- // 3. Pages per extent will be added to the cache either as acquired or not
-
- Pvoid_t JudyL_datafile_list = NULL;
-
- DEOL *deol;
- EPDL *epdl;
-
- if (pdc->page_list_JudyL) {
- bool first_then_next = true;
- while((PValue = PDCJudyLFirstThenNext(pdc->page_list_JudyL, &time_index, &first_then_next))) {
- pd = *PValue;
-
- internal_fatal(!pd,
- "DBENGINE: pdc page list has an empty page details entry");
-
- if (!(pd->status & PDC_PAGE_DISK_PENDING))
- continue;
-
- internal_fatal(!(pd->status & PDC_PAGE_DATAFILE_ACQUIRED),
- "DBENGINE: page details has not acquired the datafile");
-
- internal_fatal((pd->status & (PDC_PAGE_READY | PDC_PAGE_FAILED)),
- "DBENGINE: page details has disk pending flag but it is ready/failed");
-
- internal_fatal(pd->page,
- "DBENGINE: page details has a page linked to it, but it is marked for loading");
-
- PValue1 = PDCJudyLIns(&JudyL_datafile_list, pd->datafile.fileno, PJE0);
- if (PValue1 && !*PValue1) {
- *PValue1 = deol = deol_get();
- deol->extent_pd_list_by_extent_offset_JudyL = NULL;
- deol->fileno = pd->datafile.fileno;
- }
- else
- deol = *PValue1;
-
- PValue2 = PDCJudyLIns(&deol->extent_pd_list_by_extent_offset_JudyL, pd->datafile.extent.pos, PJE0);
- if (PValue2 && !*PValue2) {
- *PValue2 = epdl = epdl_get();
- epdl->page_details_by_metric_id_JudyL = NULL;
- epdl->number_of_pages_in_JudyL = 0;
- epdl->file = pd->datafile.file;
- epdl->extent_offset = pd->datafile.extent.pos;
- epdl->extent_size = pd->datafile.extent.bytes;
- epdl->datafile = pd->datafile.ptr;
- }
- else
- epdl = *PValue2;
-
- epdl->number_of_pages_in_JudyL++;
-
- Pvoid_t *pd_by_first_time_s_judyL = PDCJudyLIns(&epdl->page_details_by_metric_id_JudyL, pd->metric_id, PJE0);
- Pvoid_t *pd_pptr = PDCJudyLIns(pd_by_first_time_s_judyL, pd->first_time_s, PJE0);
- *pd_pptr = pd;
- }
-
- size_t extent_list_no = 0;
- Word_t datafile_no = 0;
- first_then_next = true;
- while((PValue = PDCJudyLFirstThenNext(JudyL_datafile_list, &datafile_no, &first_then_next))) {
- deol = *PValue;
-
- bool first_then_next_extent = true;
- Word_t pos = 0;
- while ((PValue = PDCJudyLFirstThenNext(deol->extent_pd_list_by_extent_offset_JudyL, &pos, &first_then_next_extent))) {
- epdl = *PValue;
- internal_fatal(!epdl, "DBENGINE: extent_list is not populated properly");
-
- // The extent page list can be dispatched to a worker
- // It will need to populate the cache with "acquired" pages that are in the list (pd) only
- // the rest of the extent pages will be added to the cache butnot acquired
-
- pdc_acquire(pdc); // we do this for the next worker: do_read_extent_work()
- epdl->pdc = pdc;
-
- if(epdl_pending_add(epdl)) {
- if (extent_list_no++ == 0)
- exec_first_extent_list(ctx, epdl, pdc->priority);
- else
- exec_rest_extent_list(ctx, epdl, pdc->priority);
- }
- }
- PDCJudyLFreeArray(&deol->extent_pd_list_by_extent_offset_JudyL, PJE0);
- deol_release(deol);
- }
- PDCJudyLFreeArray(&JudyL_datafile_list, PJE0);
- }
-
- pdc_release_and_destroy_if_unreferenced(pdc, true, true);
-}
-
-void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
- if(flags & RRDENG_PAGE_PAST_COLLECTION)
- buffer_strcat(wb, "PAST_COLLECTION ");
- if(flags & RRDENG_PAGE_REPEATED_COLLECTION)
- buffer_strcat(wb, "REPEATED_COLLECTION ");
- if(flags & RRDENG_PAGE_BIG_GAP)
- buffer_strcat(wb, "BIG_GAP ");
- if(flags & RRDENG_PAGE_GAP)
- buffer_strcat(wb, "GAP ");
- if(flags & RRDENG_PAGE_FUTURE_POINT)
- buffer_strcat(wb, "FUTURE_POINT ");
- if(flags & RRDENG_PAGE_CREATED_IN_FUTURE)
- buffer_strcat(wb, "CREATED_IN_FUTURE ");
- if(flags & RRDENG_PAGE_COMPLETED_IN_FUTURE)
- buffer_strcat(wb, "COMPLETED_IN_FUTURE ");
- if(flags & RRDENG_PAGE_UNALIGNED)
- buffer_strcat(wb, "UNALIGNED ");
- if(flags & RRDENG_PAGE_CONFLICT)
- buffer_strcat(wb, "CONFLICT ");
- if(flags & RRDENG_PAGE_FULL)
- buffer_strcat(wb, "PAGE_FULL");
- if(flags & RRDENG_PAGE_COLLECT_FINALIZE)
- buffer_strcat(wb, "COLLECT_FINALIZE");
- if(flags & RRDENG_PAGE_UPDATE_EVERY_CHANGE)
- buffer_strcat(wb, "UPDATE_EVERY_CHANGE");
- if(flags & RRDENG_PAGE_STEP_TOO_SMALL)
- buffer_strcat(wb, "STEP_TOO_SMALL");
- if(flags & RRDENG_PAGE_STEP_UNALIGNED)
- buffer_strcat(wb, "STEP_UNALIGNED");
-}
-
-inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
- time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
-
- time_t end_time_s;
- size_t entries;
-
- switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- end_time_s = descr->end_time_ut / USEC_PER_SEC;
- entries = 0;
- break;
- case PAGE_GORILLA_METRICS:
- end_time_s = start_time_s + descr->gorilla.delta_time_s;
- entries = descr->gorilla.entries;
- break;
- default:
- fatal("Unknown page type: %uc\n", descr->type);
- }
-
- return validate_page(
- (uuid_t *)descr->uuid,
- start_time_s,
- end_time_s,
- 0,
- descr->page_length,
- descr->type,
- entries,
- now_s,
- overwrite_zero_update_every_s,
- have_read_error,
- "loaded", 0);
-}
-
-VALIDATED_PAGE_DESCRIPTOR validate_page(
- uuid_t *uuid,
- time_t start_time_s,
- time_t end_time_s,
- time_t update_every_s, // can be zero, if unknown
- size_t page_length,
- uint8_t page_type,
- size_t entries, // can be zero, if unknown
- time_t now_s, // can be zero, to disable future timestamp check
- time_t overwrite_zero_update_every_s, // can be zero, if unknown
- bool have_read_error,
- const char *msg,
- RRDENG_COLLECT_PAGE_FLAGS flags) {
-
- VALIDATED_PAGE_DESCRIPTOR vd = {
- .start_time_s = start_time_s,
- .end_time_s = end_time_s,
- .update_every_s = update_every_s,
- .page_length = page_length,
- .type = page_type,
- .is_valid = true,
- };
-
- vd.point_size = page_type_size[vd.type];
- switch (page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- // always calculate entries by size
- vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
-
- // allow to be called without entries (when loading pages from disk)
- if(!entries)
- entries = vd.entries;
- break;
- case PAGE_GORILLA_METRICS:
- internal_fatal(entries == 0, "0 number of entries found on gorilla page");
- vd.entries = entries;
- break;
- default:
- // TODO: should set vd.is_valid false instead?
- fatal("Unknown page type: %uc", page_type);
- }
-
- // allow to be called without update every (when loading pages from disk)
- if(!update_every_s) {
- vd.update_every_s = (vd.entries > 1) ? ((vd.end_time_s - vd.start_time_s) / (time_t) (vd.entries - 1))
- : overwrite_zero_update_every_s;
-
- update_every_s = vd.update_every_s;
- }
-
- // another such set of checks exists in
- // update_metric_retention_and_granularity_by_uuid()
-
- bool updated = false;
-
- size_t max_page_length = RRDENG_BLOCK_SIZE;
-
- // If gorilla can not compress the data we might end up needing slightly more
- // than 4KiB. However, gorilla pages extend the page length by increments of
- // 512 bytes.
- max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
-
- if( have_read_error ||
- vd.page_length == 0 ||
- vd.page_length > max_page_length ||
- vd.start_time_s > vd.end_time_s ||
- (now_s && vd.end_time_s > now_s) ||
- vd.start_time_s <= 0 ||
- vd.end_time_s <= 0 ||
- vd.update_every_s < 0 ||
- (vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
- (vd.update_every_s == 0 && vd.entries > 1))
- {
- vd.is_valid = false;
- }
- else {
- if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s))
- updated = true;
-
- if (likely(vd.update_every_s)) {
- size_t entries_by_time = page_entries_by_time(vd.start_time_s, vd.end_time_s, vd.update_every_s);
-
- if (vd.entries != entries_by_time) {
- if (overwrite_zero_update_every_s < vd.update_every_s)
- vd.update_every_s = overwrite_zero_update_every_s;
-
- time_t new_end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
-
- if(new_end_time_s <= vd.end_time_s) {
- // end time is wrong
- vd.end_time_s = new_end_time_s;
- }
- else {
- // update every is wrong
- vd.update_every_s = overwrite_zero_update_every_s;
- vd.end_time_s = (time_t)(vd.start_time_s + (vd.entries - 1) * vd.update_every_s);
- }
-
- updated = true;
- }
- }
- else if(overwrite_zero_update_every_s) {
- vd.update_every_s = overwrite_zero_update_every_s;
- updated = true;
- }
- }
-
- if(unlikely(!vd.is_valid || updated)) {
-#ifndef NETDATA_INTERNAL_CHECKS
- nd_log_limit_static_global_var(erl, 1, 0);
-#endif
- char uuid_str[UUID_STR_LEN + 1];
- uuid_unparse(*uuid, uuid_str);
-
- BUFFER *wb = NULL;
-
- if(flags) {
- wb = buffer_create(0, NULL);
- collect_page_flags_to_buffer(wb, flags);
- }
-
- if(!vd.is_valid) {
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(true,
-#else
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
-#endif
- "DBENGINE: metric '%s' %s invalid page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
- uuid_str, msg, vd.type,
- vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
- );
- }
- else {
- const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
- const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
- const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
- const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
- const char *err_length = (vd.page_length == page_length) ? "" : "page length updated, ";
- const char *err_entries = (vd.entries == entries) ? "" : "entries updated, ";
- const char *err_future = (now_s && vd.end_time_s <= now_s) ? "" : "future end time, ";
-
-#ifdef NETDATA_INTERNAL_CHECKS
- internal_error(true,
-#else
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
-#endif
- "DBENGINE: metric '%s' %s page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
- "found inconsistent - the right is "
- "from %ld to %ld, update every %ld, page length %zu, entries %zu: "
- "%s%s%s%s%s%s%s",
- uuid_str, msg, vd.type,
- start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
- vd.start_time_s, vd.end_time_s, vd.update_every_s, vd.page_length, vd.entries,
- err_valid, err_start, err_end, err_update, err_length, err_entries, err_future
- );
- }
-
- buffer_free(wb);
- }
-
- return vd;
-}
-
-static inline struct page_details *epdl_get_pd_load_link_list_from_metric_start_time(EPDL *epdl, Word_t metric_id, time_t start_time_s) {
-
- if(unlikely(epdl->head_to_datafile_extent_queries_pending_for_extent))
- // stop appending more pages to this epdl
- epdl_pending_del(epdl);
-
- struct page_details *pd_list = NULL;
-
- for(EPDL *ep = epdl; ep ;ep = ep->query.next) {
- Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLGet(ep->page_details_by_metric_id_JudyL, metric_id, PJE0);
- internal_fatal(pd_by_start_time_s_judyL == PJERR, "DBENGINE: corrupted extent metrics JudyL");
-
- if (unlikely(pd_by_start_time_s_judyL && *pd_by_start_time_s_judyL)) {
- Pvoid_t *pd_pptr = PDCJudyLGet(*pd_by_start_time_s_judyL, start_time_s, PJE0);
- internal_fatal(pd_pptr == PJERR, "DBENGINE: corrupted metric page details JudyHS");
-
- if(likely(pd_pptr && *pd_pptr)) {
- struct page_details *pd = *pd_pptr;
- internal_fatal(metric_id != pd->metric_id, "DBENGINE: metric ids do not match");
-
- if(likely(!pd->page)) {
- if (unlikely(__atomic_load_n(&ep->pdc->workers_should_stop, __ATOMIC_RELAXED)))
- pdc_page_status_set(pd, PDC_PAGE_FAILED | PDC_PAGE_CANCELLED);
- else
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(pd_list, pd, load.prev, load.next);
- }
- }
- }
- }
-
- return pd_list;
-}
-
-static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *epdl, struct rrdeng_extent_page_descr *descr, const char *msg) {
- char uuid[UUID_STR_LEN] = "";
- time_t start_time_s = 0;
- time_t end_time_s = 0;
- bool used_epdl = false;
- bool used_descr = false;
-
- if (descr) {
- start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
- switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
- end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
- break;
- case PAGE_GORILLA_METRICS:
- end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
- break;
- }
- uuid_unparse_lower(descr->uuid, uuid);
- used_descr = true;
- }
- else {
- struct page_details *pd = NULL;
-
- Word_t start = 0;
- Pvoid_t *pd_by_start_time_s_judyL = PDCJudyLFirst(epdl->page_details_by_metric_id_JudyL, &start, PJE0);
- if(pd_by_start_time_s_judyL) {
- start = 0;
- Pvoid_t *pd_pptr = PDCJudyLFirst(*pd_by_start_time_s_judyL, &start, PJE0);
- if(pd_pptr) {
- pd = *pd_pptr;
- start_time_s = pd->first_time_s;
- end_time_s = pd->last_time_s;
- METRIC *metric = (METRIC *)pd->metric_id;
- uuid_t *u = mrg_metric_uuid(main_mrg, metric);
- uuid_unparse_lower(*u, uuid);
- used_epdl = true;
- }
- }
- }
-
- if(!used_epdl && !used_descr && epdl->pdc) {
- start_time_s = epdl->pdc->start_time_s;
- end_time_s = epdl->pdc->end_time_s;
- }
-
- char start_time_str[LOG_DATE_LENGTH + 1] = "";
- if(start_time_s)
- log_date(start_time_str, LOG_DATE_LENGTH, start_time_s);
-
- char end_time_str[LOG_DATE_LENGTH + 1] = "";
- if(end_time_s)
- log_date(end_time_str, LOG_DATE_LENGTH, end_time_s);
-
- nd_log_limit_static_global_var(erl, 1, 0);
- nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
- "DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) "
- "%s from %ld (%s) to %ld (%s) %s%s: "
- "%s",
- epdl->datafile->fileno, ctx->config.tier,
- epdl->extent_offset, epdl->extent_size,
- used_epdl ? "to extract page (PD)" : used_descr ? "expected page (DESCR)" : "part of a query (PDC)",
- start_time_s, start_time_str, end_time_s, end_time_str,
- used_epdl || used_descr ? " of metric " : "",
- used_epdl || used_descr ? uuid : "",
- msg);
-}
-
-static bool epdl_populate_pages_from_extent_data(
- struct rrdengine_instance *ctx,
- void *data,
- size_t data_length,
- EPDL *epdl,
- bool worker,
- PDC_PAGE_STATUS tags,
- bool cached_extent)
-{
- int ret;
- unsigned i, count;
- void *uncompressed_buf = NULL;
- uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
- bool have_read_error = false;
- /* persistent structures */
- struct rrdeng_df_extent_header *header;
- struct rrdeng_df_extent_trailer *trailer;
- struct extent_buffer *eb = NULL;
- uLong crc;
-
- bool can_use_data = true;
- if(data_length < sizeof(*header) + sizeof(header->descr[0]) + sizeof(*trailer)) {
- can_use_data = false;
-
- // added to satisfy the requirements of older compilers (prevent warnings)
- payload_length = 0;
- payload_offset = 0;
- trailer_offset = 0;
- count = 0;
- header = NULL;
- trailer = NULL;
- }
- else {
- header = data;
- payload_length = header->payload_length;
- count = header->number_of_pages;
- payload_offset = sizeof(*header) + sizeof(header->descr[0]) * count;
- trailer_offset = data_length - sizeof(*trailer);
- trailer = data + trailer_offset;
- }
-
- if( !can_use_data ||
- count < 1 ||
- count > MAX_PAGES_PER_EXTENT ||
- (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) ||
- (payload_length != trailer_offset - payload_offset) ||
- (data_length != payload_offset + payload_length + sizeof(*trailer))
- ) {
- epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID");
- return false;
- }
-
- crc = crc32(0L, Z_NULL, 0);
- crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
- if (unlikely(ret)) {
- ctx_io_error(ctx);
- have_read_error = true;
- epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
- }
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);
-
- if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
- // find the uncompressed extent size
- uncompressed_payload_length = 0;
- for (i = 0; i < count; ++i) {
- size_t page_length = header->descr[i].page_length;
- if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
- (header->descr[i].type == PAGE_GORILLA_METRICS &&
- (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
- have_read_error = true;
- break;
- }
-
- uncompressed_payload_length += header->descr[i].page_length;
- }
-
- if(unlikely(uncompressed_payload_length > MAX_PAGES_PER_EXTENT * RRDENG_BLOCK_SIZE))
- have_read_error = true;
-
- if(likely(!have_read_error)) {
- eb = extent_buffer_get(uncompressed_payload_length);
- uncompressed_buf = eb->data;
-
- ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
- (int) payload_length, (int) uncompressed_payload_length);
-
- __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
- }
- }
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);
-
- size_t stats_data_from_main_cache = 0;
- size_t stats_data_from_extent = 0;
- size_t stats_load_compressed = 0;
- size_t stats_load_uncompressed = 0;
- size_t stats_load_invalid_page = 0;
- size_t stats_cache_hit_while_inserting = 0;
-
- uint32_t page_offset = 0, page_length;
- time_t now_s = max_acceptable_collected_time();
- for (i = 0; i < count; i++, page_offset += page_length) {
- page_length = header->descr[i].page_length;
- time_t start_time_s = (time_t) (header->descr[i].start_time_ut / USEC_PER_SEC);
-
- if(!page_length || !start_time_s) {
- char log[200 + 1];
- snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count);
- epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
- continue;
- }
-
- METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &header->descr[i].uuid, (Word_t)ctx);
- Word_t metric_id = (Word_t)metric;
- if(!metric) {
- char log[200 + 1];
- snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count);
- epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
- continue;
- }
- mrg_metric_release(main_mrg, metric);
-
- struct page_details *pd_list = epdl_get_pd_load_link_list_from_metric_start_time(epdl, metric_id, start_time_s);
- if(likely(!pd_list))
- continue;
-
- VALIDATED_PAGE_DESCRIPTOR vd = validate_extent_page_descr(
- &header->descr[i], now_s,
- (pd_list) ? pd_list->update_every_s : 0,
- have_read_error);
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION);
-
- PGD *pgd;
-
- if (unlikely(!vd.is_valid)) {
- pgd = PGD_EMPTY;
- stats_load_invalid_page++;
- }
- else {
- if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- pgd = pgd_create_from_disk_data(header->descr[i].type,
- data + payload_offset + page_offset,
- vd.page_length);
- stats_load_uncompressed++;
- }
- else {
- if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
- char log[200 + 1];
- snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, "
- "exceeds the uncompressed buffer size %u",
- i, count, page_offset, vd.page_length, uncompressed_payload_length);
- epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
-
- pgd = PGD_EMPTY;
- stats_load_invalid_page++;
- }
- else {
- pgd = pgd_create_from_disk_data(header->descr[i].type,
- uncompressed_buf + page_offset,
- vd.page_length);
- stats_load_compressed++;
- }
- }
- }
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_POPULATION);
-
- PGC_ENTRY page_entry = {
- .hot = false,
- .section = (Word_t)ctx,
- .metric_id = metric_id,
- .start_time_s = vd.start_time_s,
- .end_time_s = vd.end_time_s,
- .update_every_s = (uint32_t) vd.update_every_s,
- .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management
- .data = pgd,
- };
-
- bool added = true;
- PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
- if (false == added) {
- pgd_free(pgd);
- stats_cache_hit_while_inserting++;
- stats_data_from_main_cache++;
- }
- else
- stats_data_from_extent++;
-
- struct page_details *pd = pd_list;
- do {
- if(pd != pd_list)
- pgc_page_dup(main_cache, page);
-
- pd->page = page;
- pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0));
-
- pd = pd->load.next;
- } while(pd);
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_LOOKUP);
- }
-
- if(stats_data_from_main_cache)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_main_cache, stats_data_from_main_cache, __ATOMIC_RELAXED);
-
- if(cached_extent)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_extent_cache, stats_data_from_extent, __ATOMIC_RELAXED);
- else {
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_data_source_disk, stats_data_from_extent, __ATOMIC_RELAXED);
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.extents_loaded_from_disk, 1, __ATOMIC_RELAXED);
- }
-
- if(stats_cache_hit_while_inserting)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_loaded_but_cache_hit_while_inserting, stats_cache_hit_while_inserting, __ATOMIC_RELAXED);
-
- if(stats_load_compressed)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_compressed, stats_load_compressed, __ATOMIC_RELAXED);
-
- if(stats_load_uncompressed)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_ok_uncompressed, stats_load_uncompressed, __ATOMIC_RELAXED);
-
- if(stats_load_invalid_page)
- __atomic_add_fetch(&rrdeng_cache_efficiency_stats.pages_load_fail_invalid_page_in_extent, stats_load_invalid_page, __ATOMIC_RELAXED);
-
- if(worker)
- worker_is_idle();
-
- extent_buffer_release(eb);
-
- return true;
-}
-
-static inline void *datafile_extent_read(struct rrdengine_instance *ctx, uv_file file, unsigned pos, unsigned size_bytes)
-{
- void *buffer;
- uv_fs_t request;
-
- unsigned real_io_size = ALIGN_BYTES_CEILING(size_bytes);
- int ret = posix_memalign(&buffer, RRDFILE_ALIGNMENT, real_io_size);
- if (unlikely(ret))
- fatal("DBENGINE: posix_memalign(): %s", strerror(ret));
-
- uv_buf_t iov = uv_buf_init(buffer, real_io_size);
- ret = uv_fs_read(NULL, &request, file, &iov, 1, pos, NULL);
- if (unlikely(-1 == ret)) {
- ctx_io_error(ctx);
- posix_memfree(buffer);
- buffer = NULL;
- }
- else
- ctx_io_read_op_bytes(ctx, real_io_size);
-
- uv_fs_req_cleanup(&request);
-
- return buffer;
-}
-
-static inline void datafile_extent_read_free(void *buffer) {
- posix_memfree(buffer);
-}
-
-void epdl_find_extent_and_populate_pages(struct rrdengine_instance *ctx, EPDL *epdl, bool worker) {
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
-
- size_t *statistics_counter = NULL;
- PDC_PAGE_STATUS not_loaded_pages_tag = 0, loaded_pages_tag = 0;
-
- bool should_stop = __atomic_load_n(&epdl->pdc->workers_should_stop, __ATOMIC_RELAXED);
- for(EPDL *ep = epdl->query.next; ep ;ep = ep->query.next) {
- internal_fatal(ep->datafile != epdl->datafile, "DBENGINE: datafiles do not match");
- internal_fatal(ep->extent_offset != epdl->extent_offset, "DBENGINE: extent offsets do not match");
- internal_fatal(ep->extent_size != epdl->extent_size, "DBENGINE: extent sizes do not match");
- internal_fatal(ep->file != epdl->file, "DBENGINE: files do not match");
-
- if(!__atomic_load_n(&ep->pdc->workers_should_stop, __ATOMIC_RELAXED)) {
- should_stop = false;
- break;
- }
- }
-
- if(unlikely(should_stop)) {
- statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_cancelled;
- not_loaded_pages_tag = PDC_PAGE_CANCELLED;
- goto cleanup;
- }
-
- bool extent_found_in_cache = false;
-
- void *extent_compressed_data = NULL;
- PGC_PAGE *extent_cache_page = pgc_page_get_and_acquire(
- extent_cache, (Word_t)ctx,
- (Word_t)epdl->datafile->fileno, (time_t)epdl->extent_offset,
- PGC_SEARCH_EXACT);
-
- if(extent_cache_page) {
- extent_compressed_data = pgc_page_data(extent_cache_page);
- internal_fatal(epdl->extent_size != pgc_page_data_size(extent_cache, extent_cache_page),
- "DBENGINE: cache size does not match the expected size");
-
- loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_CACHE;
- not_loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_CACHE;
- extent_found_in_cache = true;
- }
- else {
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_MMAP);
-
- void *extent_data = datafile_extent_read(ctx, epdl->file, epdl->extent_offset, epdl->extent_size);
- if(extent_data != NULL) {
-
- void *copied_extent_compressed_data = dbengine_extent_alloc(epdl->extent_size);
- memcpy(copied_extent_compressed_data, extent_data, epdl->extent_size);
- datafile_extent_read_free(extent_data);
-
- if(worker)
- worker_is_busy(UV_EVENT_DBENGINE_EXTENT_CACHE_LOOKUP);
-
- bool added = false;
- extent_cache_page = pgc_page_add_and_acquire(extent_cache, (PGC_ENTRY) {
- .hot = false,
- .section = (Word_t) ctx,
- .metric_id = (Word_t) epdl->datafile->fileno,
- .start_time_s = (time_t) epdl->extent_offset,
- .size = epdl->extent_size,
- .end_time_s = 0,
- .update_every_s = 0,
- .data = copied_extent_compressed_data,
- }, &added);
-
- if (!added) {
- dbengine_extent_free(copied_extent_compressed_data, epdl->extent_size);
- internal_fatal(epdl->extent_size != pgc_page_data_size(extent_cache, extent_cache_page),
- "DBENGINE: cache size does not match the expected size");
- }
-
- extent_compressed_data = pgc_page_data(extent_cache_page);
-
- loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_DISK;
- not_loaded_pages_tag |= PDC_PAGE_EXTENT_FROM_DISK;
- }
- }
-
- if(extent_compressed_data) {
- // Need to decompress and then process the pagelist
- bool extent_used = epdl_populate_pages_from_extent_data(
- ctx, extent_compressed_data, epdl->extent_size,
- epdl, worker, loaded_pages_tag, extent_found_in_cache);
-
- if(extent_used) {
- // since the extent was used, all the pages that are not
- // loaded from this extent, were not found in the extent
- not_loaded_pages_tag |= PDC_PAGE_FAILED_NOT_IN_EXTENT;
- statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_not_found;
- }
- else {
- not_loaded_pages_tag |= PDC_PAGE_FAILED_INVALID_EXTENT;
- statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_invalid_extent;
- }
- }
- else {
- not_loaded_pages_tag |= PDC_PAGE_FAILED_TO_MAP_EXTENT;
- statistics_counter = &rrdeng_cache_efficiency_stats.pages_load_fail_cant_mmap_extent;
- }
-
- if(extent_cache_page)
- pgc_page_release(extent_cache, extent_cache_page);
-
-cleanup:
- // remove it from the datafile extent_queries
- // this can be called multiple times safely
- epdl_pending_del(epdl);
-
- // mark all pending pages as failed
- for(EPDL *ep = epdl; ep ;ep = ep->query.next) {
- epdl_mark_all_not_loaded_pages_as_failed(
- ep, not_loaded_pages_tag, statistics_counter);
- }
-
- for(EPDL *ep = epdl, *next = NULL; ep ; ep = next) {
- next = ep->query.next;
-
- completion_mark_complete_a_job(&ep->pdc->page_completion);
- pdc_release_and_destroy_if_unreferenced(ep->pdc, true, false);
-
- // Free the Judy that holds the requested pagelist and the extents
- epdl_destroy(ep);
- }
-
- if(worker)
- worker_is_idle();
-}