author     Daniel Baumann <daniel.baumann@progress-linux.org>    2024-03-09 13:19:48 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>    2024-03-09 13:20:02 +0000
commit     58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree       96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /database/engine/pdc.c
parent     Releasing debian version 1.43.2-1. (diff)
download   netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
           netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'database/engine/pdc.c')
-rw-r--r--   database/engine/pdc.c   119
1 file changed, 84 insertions(+), 35 deletions(-)
diff --git a/database/engine/pdc.c b/database/engine/pdc.c
index 7da568787..5fe205e64 100644
--- a/database/engine/pdc.c
+++ b/database/engine/pdc.c
@@ -629,14 +629,33 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
}
inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
+ time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
+
+ time_t end_time_s;
+ size_t entries;
+
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ end_time_s = descr->end_time_ut / USEC_PER_SEC;
+ entries = 0;
+ break;
+ case PAGE_GORILLA_METRICS:
+ end_time_s = start_time_s + descr->gorilla.delta_time_s;
+ entries = descr->gorilla.entries;
+ break;
+ default:
+ fatal("Unknown page type: %uc\n", descr->type);
+ }
+
return validate_page(
(uuid_t *)descr->uuid,
- (time_t) (descr->start_time_ut / USEC_PER_SEC),
- (time_t) (descr->end_time_ut / USEC_PER_SEC),
+ start_time_s,
+ end_time_s,
0,
descr->page_length,
descr->type,
- 0,
+ entries,
now_s,
overwrite_zero_update_every_s,
have_read_error,
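
Note on the hunk above: gorilla page descriptors carry a time delta rather than an absolute end time, and their entry count is stored in the descriptor itself. A minimal sketch of the end-time derivation, assuming only the descriptor fields referenced in the hunk (the full struct layout is not part of this diff):

static inline time_t descr_end_time_s_sketch(const struct rrdeng_extent_page_descr *descr) {
    time_t start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);

    if (descr->type == PAGE_GORILLA_METRICS)
        return start_time_s + descr->gorilla.delta_time_s;      // end time is start + delta

    return (time_t)(descr->end_time_ut / USEC_PER_SEC);         // PAGE_METRICS / PAGE_TIER keep an absolute end time
}
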
@@ -666,13 +685,25 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
.is_valid = true,
};
- // always calculate entries by size
vd.point_size = page_type_size[vd.type];
- vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
-
- // allow to be called without entries (when loading pages from disk)
- if(!entries)
- entries = vd.entries;
+ switch (page_type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ // always calculate entries by size
+ vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
+
+ // allow to be called without entries (when loading pages from disk)
+ if(!entries)
+ entries = vd.entries;
+ break;
+ case PAGE_GORILLA_METRICS:
+ internal_fatal(entries == 0, "0 number of entries found on gorilla page");
+ vd.entries = entries;
+ break;
+ default:
+ // TODO: should set vd.is_valid false instead?
+ fatal("Unknown page type: %uc", page_type);
+ }
// allow to be called without update every (when loading pages from disk)
if(!update_every_s) {
@@ -687,19 +718,26 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
bool updated = false;
+ size_t max_page_length = RRDENG_BLOCK_SIZE;
+
+ // If gorilla can not compress the data we might end up needing slightly more
+ // than 4KiB. However, gorilla pages extend the page length by increments of
+ // 512 bytes.
+ max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
+
if( have_read_error ||
vd.page_length == 0 ||
- vd.page_length > RRDENG_BLOCK_SIZE ||
+ vd.page_length > max_page_length ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
vd.start_time_s <= 0 ||
vd.end_time_s <= 0 ||
vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
- (vd.update_every_s == 0 && vd.entries > 1)
- )
+ (vd.update_every_s == 0 && vd.entries > 1))
+ {
vd.is_valid = false;
-
+ }
else {
if(unlikely(vd.entries != entries || vd.update_every_s != update_every_s))
updated = true;
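
Note on the hunk above: gorilla pages are allowed to exceed one block by at most one extra gorilla buffer, because an incompressible page can spill over in 512-byte increments. A branched, worked-out equivalent of the limit (sketch; the concrete sizes of 4 KiB and 512 bytes are taken from the comment above, not from this diff):

size_t max_page_length;
if (page_type == PAGE_GORILLA_METRICS)
    max_page_length = RRDENG_BLOCK_SIZE + GORILLA_BUFFER_SIZE;  // e.g. 4096 + 512 = 4608 bytes
else
    max_page_length = RRDENG_BLOCK_SIZE;                        // e.g. 4096 bytes
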
@@ -734,7 +772,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
if(unlikely(!vd.is_valid || updated)) {
#ifndef NETDATA_INTERNAL_CHECKS
- error_limit_static_global_var(erl, 1, 0);
+ nd_log_limit_static_global_var(erl, 1, 0);
#endif
char uuid_str[UUID_STR_LEN + 1];
uuid_unparse(*uuid, uuid_str);
@@ -750,7 +788,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit(&erl,
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s invalid page of type %u "
"from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
@@ -770,7 +808,7 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit(&erl,
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s page of type %u "
"from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
@@ -832,7 +870,15 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if (descr) {
start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
- end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
+ switch (descr->type) {
+ case PAGE_METRICS:
+ case PAGE_TIER:
+ end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
+ break;
+ case PAGE_GORILLA_METRICS:
+ end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
+ break;
+ }
uuid_unparse_lower(descr->uuid, uuid);
used_descr = true;
}
@@ -869,8 +915,8 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if(end_time_s)
log_date(end_time_str, LOG_DATE_LENGTH, end_time_s);
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
"DBENGINE: error while reading extent from datafile %u of tier %d, at offset %" PRIu64 " (%u bytes) "
"%s from %ld (%s) to %ld (%s) %s%s: "
"%s",
@@ -952,7 +998,9 @@ static bool epdl_populate_pages_from_extent_data(
uncompressed_payload_length = 0;
for (i = 0; i < count; ++i) {
size_t page_length = header->descr[i].page_length;
- if(page_length > RRDENG_BLOCK_SIZE) {
+ if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
+ (header->descr[i].type == PAGE_GORILLA_METRICS &&
+ (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
have_read_error = true;
break;
}
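
Note on the hunk above: the extended condition reads more clearly as a predicate. A page length is acceptable if it fits in one block or, for gorilla pages only, if the overflow beyond the block is a whole number of gorilla buffers. An equivalent, untangled sketch (assuming pdc.c's usual headers):

static inline bool page_length_is_valid_sketch(unsigned type, size_t page_length) {
    if (page_length <= RRDENG_BLOCK_SIZE)
        return true;                                             // fits in a single block

    if (type != PAGE_GORILLA_METRICS)
        return false;                                            // only gorilla pages may overflow

    return ((page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE) == 0;  // overflow must be whole buffers
}
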
@@ -993,7 +1041,7 @@ static bool epdl_populate_pages_from_extent_data(
if(!page_length || !start_time_s) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) is EMPTY", i, count);
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) is EMPTY", i, count);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
continue;
}
@@ -1002,7 +1050,7 @@ static bool epdl_populate_pages_from_extent_data(
Word_t metric_id = (Word_t)metric;
if(!metric) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) has unknown UUID", i, count);
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) has unknown UUID", i, count);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
continue;
}
@@ -1020,32 +1068,34 @@ static bool epdl_populate_pages_from_extent_data(
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_PAGE_ALLOCATION);
- void *page_data;
+ PGD *pgd;
if (unlikely(!vd.is_valid)) {
- page_data = DBENGINE_EMPTY_PAGE;
+ pgd = PGD_EMPTY;
stats_load_invalid_page++;
}
else {
if (RRD_NO_COMPRESSION == header->compression_algorithm) {
- page_data = dbengine_page_alloc(vd.page_length);
- memcpy(page_data, data + payload_offset + page_offset, (size_t) vd.page_length);
+ pgd = pgd_create_from_disk_data(header->descr[i].type,
+ data + payload_offset + page_offset,
+ vd.page_length);
stats_load_uncompressed++;
}
else {
if (unlikely(page_offset + vd.page_length > uncompressed_payload_length)) {
char log[200 + 1];
- snprintfz(log, 200, "page %u (out of %u) offset %u + page length %zu, "
+ snprintfz(log, sizeof(log) - 1, "page %u (out of %u) offset %u + page length %zu, "
"exceeds the uncompressed buffer size %u",
i, count, page_offset, vd.page_length, uncompressed_payload_length);
epdl_extent_loading_error_log(ctx, epdl, &header->descr[i], log);
- page_data = DBENGINE_EMPTY_PAGE;
+ pgd = PGD_EMPTY;
stats_load_invalid_page++;
}
else {
- page_data = dbengine_page_alloc(vd.page_length);
- memcpy(page_data, uncompressed_buf + page_offset, vd.page_length);
+ pgd = pgd_create_from_disk_data(header->descr[i].type,
+ uncompressed_buf + page_offset,
+ vd.page_length);
stats_load_compressed++;
}
}
@@ -1061,14 +1111,14 @@ static bool epdl_populate_pages_from_extent_data(
.start_time_s = vd.start_time_s,
.end_time_s = vd.end_time_s,
.update_every_s = (uint32_t) vd.update_every_s,
- .size = (size_t) ((page_data == DBENGINE_EMPTY_PAGE) ? 0 : vd.page_length),
- .data = page_data
+ .size = pgd_memory_footprint(pgd), // the footprint of the entire PGD, for accurate memory management
+ .data = pgd,
};
bool added = true;
PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
if (false == added) {
- dbengine_page_free(page_data, vd.page_length);
+ pgd_free(pgd);
stats_cache_hit_while_inserting++;
stats_data_from_main_cache++;
}
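
Note on the hunks above: pages are now handed to the main cache as PGD objects instead of raw buffers. pgd_create_from_disk_data() builds the PGD from the on-disk payload, PGD_EMPTY stands in for invalid pages, pgd_memory_footprint() provides the size accounted to the cache, and pgd_free() releases the PGD when the cache already holds that page. A condensed sketch of the lifecycle, using only the calls visible in this diff (`payload` is a hypothetical stand-in for the compressed or uncompressed source pointer; stats and error handling omitted):

PGD *pgd = vd.is_valid
         ? pgd_create_from_disk_data(header->descr[i].type, payload, vd.page_length)
         : PGD_EMPTY;                                    // invalid pages become the shared empty PGD

PGC_ENTRY page_entry = {
    // ... metric id and time range as in the hunk above ...
    .size = pgd_memory_footprint(pgd),                   // whole-PGD footprint for cache accounting
    .data = pgd,
};

bool added = true;
PGC_PAGE *page = pgc_page_add_and_acquire(main_cache, page_entry, &added);
if (!added)
    pgd_free(pgd);                                        // the cache already had this page
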
@@ -1081,8 +1131,7 @@ static bool epdl_populate_pages_from_extent_data(
pgc_page_dup(main_cache, page);
pd->page = page;
- pd->page_length = pgc_page_data_size(main_cache, page);
- pdc_page_status_set(pd, PDC_PAGE_READY | tags | ((page_data == DBENGINE_EMPTY_PAGE) ? PDC_PAGE_EMPTY : 0));
+ pdc_page_status_set(pd, PDC_PAGE_READY | tags | (pgd_is_empty(pgd) ? PDC_PAGE_EMPTY : 0));
pd = pd->load.next;
} while(pd);