From 58daab21cd043e1dc37024a7f99b396788372918 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:48 +0100 Subject: Merging upstream version 1.44.3. Signed-off-by: Daniel Baumann --- database/engine/pdc.c | 119 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 84 insertions(+), 35 deletions(-) (limited to 'database/engine/pdc.c') diff --git a/database/engine/pdc.c b/database/engine/pdc.c index 7da568787..5fe205e64 100644 --- a/database/engine/pdc.c +++ b/database/engine/pdc.c @@ -629,14 +629,33 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) { } inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) { + time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC); + + time_t end_time_s; + size_t entries; + + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + end_time_s = descr->end_time_ut / USEC_PER_SEC; + entries = 0; + break; + case PAGE_GORILLA_METRICS: + end_time_s = start_time_s + descr->gorilla.delta_time_s; + entries = descr->gorilla.entries; + break; + default: + fatal("Unknown page type: %uc\n", descr->type); + } + return validate_page( (uuid_t *)descr->uuid, - (time_t) (descr->start_time_ut / USEC_PER_SEC), - (time_t) (descr->end_time_ut / USEC_PER_SEC), + start_time_s, + end_time_s, 0, descr->page_length, descr->type, - 0, + entries, now_s, overwrite_zero_update_every_s, have_read_error, @@ -666,13 +685,25 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( .is_valid = true, }; - // always calculate entries by size vd.point_size = page_type_size[vd.type]; - vd.entries = page_entries_by_size(vd.page_length, vd.point_size); - - // allow to be called without entries (when loading pages from disk) - if(!entries) - entries = vd.entries; + switch (page_type) { + case PAGE_METRICS: + case PAGE_TIER: + // always calculate entries by size + vd.entries = page_entries_by_size(vd.page_length, vd.point_size); + + // allow to be called without entries (when loading pages from disk) + if(!entries) + entries = vd.entries; + break; + case PAGE_GORILLA_METRICS: + internal_fatal(entries == 0, "0 number of entries found on gorilla page"); + vd.entries = entries; + break; + default: + // TODO: should set vd.is_valid false instead? + fatal("Unknown page type: %uc", page_type); + } // allow to be called without update every (when loading pages from disk) if(!update_every_s) { @@ -687,19 +718,26 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( bool updated = false; + size_t max_page_length = RRDENG_BLOCK_SIZE; + + // If gorilla can not compress the data we might end up needing slightly more + // than 4KiB. However, gorilla pages extend the page length by increments of + // 512 bytes. + max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE); + if( have_read_error || vd.page_length == 0 || - vd.page_length > RRDENG_BLOCK_SIZE || + vd.page_length > max_page_length || vd.start_time_s > vd.end_time_s || (now_s && vd.end_time_s > now_s) || vd.start_time_s <= 0 || vd.end_time_s <= 0 || vd.update_every_s < 0 || (vd.start_time_s == vd.end_time_s && vd.entries > 1) || - (vd.update_every_s == 0 && vd.entries > 1) - ) + (vd.update_every_s == 0 && vd.entries > 1)) + { vd.is_valid = false; - + } else { if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s)) updated = true; @@ -734,7 +772,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( if(unlikely(!vd.is_valid || updated)) { #ifndef NETDATA_INTERNAL_CHECKS - error_limit_static_global_var(erl, 1, 0); + nd_log_limit_static_global_var(erl, 1, 0); #endif char uuid_str[UUID_STR_LEN + 1]; uuid_unparse(*uuid, uuid_str); @@ -750,7 +788,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit(&erl, + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s invalid page of type %u " "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)", @@ -770,7 +808,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( #ifdef NETDATA_INTERNAL_CHECKS internal_error(true, #else - error_limit(&erl, + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s page of type %u " "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), " @@ -832,7 +870,15 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if (descr) { start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC); - end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); + switch (descr->type) { + case PAGE_METRICS: + case PAGE_TIER: + end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); + break; + case PAGE_GORILLA_METRICS: + end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s); + break; + } uuid_unparse_lower(descr->uuid, uuid); used_descr = true; } @@ -869,8 +915,8 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if(end_time_s) log_date(end_time_str, LOG_DATE_LENGTH, end_time_s); - error_limit_static_global_var(erl, 1, 0); - error_limit(&erl, + nd_log_limit_static_global_var(erl, 1, 0); + nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, "DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) " "%s from %ld (%s) to %ld (%s) %s%s: " "%s", @@ -952,7 +998,9 @@ static bool epdl_populate_pages_from_extent_data( uncompressed_payload_length = 0; for (i = 0; i < count; ++i) { size_t page_length = header->descr[i].page_length; - if(page_length > RRDENG_BLOCK_SIZE) { + if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS || + (header->descr[i].type == PAGE_GORILLA_METRICS && + (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) { have_read_error = true; break; } @@ -993,7 +1041,7 @@ static bool epdl_populate_pages_from_extent_data( if(!page_length || !start_time_s) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) is EMPTY", i, count); + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); continue; } @@ -1002,7 +1050,7 @@ static bool epdl_populate_pages_from_extent_data( Word_t metric_id = (Word_t)metric; if(!metric) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) has unknown UUID", i, count); + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); continue; } @@ -1020,32 +1068,34 @@ static bool epdl_populate_pages_from_extent_data( if(worker) worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION); - void *page_data; + PGD *pgd; if (unlikely(!vd.is_valid)) { - page_data = DBENGINE_EMPTY_PAGE; + pgd = PGD_EMPTY; stats_load_invalid_page++; } else { if (RRD_NO_COMPRESSION == header->compression_algorithm) { - page_data = dbengine_page_alloc(vd.page_length); - memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length); + pgd = pgd_create_from_disk_data(header->descr[i].type, + data + payload_offset + page_offset, + vd.page_length); stats_load_uncompressed++; } else { if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) { char log[200 + 1]; - snprintfz(log, 200, "page %u (out of %u) offset %u + page length %zu, " + snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, " "exceeds the uncompressed buffer size %u", i, count, page_offset, vd.page_length, uncompressed_payload_length); epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log); - page_data = DBENGINE_EMPTY_PAGE; + pgd = PGD_EMPTY; stats_load_invalid_page++; } else { - page_data = dbengine_page_alloc(vd.page_length); - memcpy(page_data, uncompressed_buf + page_offset, vd.page_length); + pgd = pgd_create_from_disk_data(header->descr[i].type, + uncompressed_buf + page_offset, + vd.page_length); stats_load_compressed++; } } @@ -1061,14 +1111,14 @@ static bool epdl_populate_pages_from_extent_data( .start_time_s = vd.start_time_s, .end_time_s = vd.end_time_s, .update_every_s = (uint32_t) vd.update_every_s, - .size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length), - .data = page_data + .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management + .data = pgd, }; bool added = true; PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added); if (false == added) { - dbengine_page_free(page_data, vd.page_length); + pgd_free(pgd); stats_cache_hit_while_inserting++; stats_data_from_main_cache++; } @@ -1081,8 +1131,7 @@ static bool epdl_populate_pages_from_extent_data( pgc_page_dup(main_cache, page); pd->page = page; - pd->page_length = pgc_page_data_size(main_cache, page); - pdc_page_status_set(pd, PDC_PAGE_READY | tags | ((page_data == DBENGINE_EMPTY_PAGE) ? PDC_PAGE_EMPTY : 0)); + pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0)); pd = pd->load.next; } while(pd); -- cgit v1.2.3