summaryrefslogtreecommitdiffstats
path: root/src/database/engine/pdc.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/database/engine/pdc.c (renamed from database/engine/pdc.c)97
1 files changed, 52 insertions, 45 deletions
diff --git a/database/engine/pdc.c b/src/database/engine/pdc.c
index 5fe205e64..28a83e2bc 100644
--- a/database/engine/pdc.c
+++ b/src/database/engine/pdc.c
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "pdc.h"
+#include "dbengine-compression.h"
struct extent_page_details_list {
uv_file file;
@@ -628,28 +629,29 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) {
buffer_strcat(wb, "STEP_UNALIGNED");
}
-inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) {
+inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) {
time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC);
- time_t end_time_s;
- size_t entries;
+ time_t end_time_s = 0;
+ size_t entries = 0;
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = descr->end_time_ut / USEC_PER_SEC;
entries = 0;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = start_time_s + descr->gorilla.delta_time_s;
entries = descr->gorilla.entries;
break;
default:
- fatal("Unknown page type: %uc\n", descr->type);
+ // Nothing to do. Validate page will notify the user.
+ break;
}
return validate_page(
- (uuid_t *)descr->uuid,
+ (nd_uuid_t *)descr->uuid,
start_time_s,
end_time_s,
0,
@@ -663,32 +665,33 @@ inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_
}
VALIDATED_PAGE_DESCRIPTOR validate_page(
- uuid_t *uuid,
+ nd_uuid_t *uuid,
time_t start_time_s,
time_t end_time_s,
- time_t update_every_s, // can be zero, if unknown
+ uint32_t update_every_s, // can be zero, if unknown
size_t page_length,
uint8_t page_type,
size_t entries, // can be zero, if unknown
time_t now_s, // can be zero, to disable future timestamp check
- time_t overwrite_zero_update_every_s, // can be zero, if unknown
+ uint32_t overwrite_zero_update_every_s, // can be zero, if unknown
bool have_read_error,
const char *msg,
- RRDENG_COLLECT_PAGE_FLAGS flags) {
-
+ RRDENG_COLLECT_PAGE_FLAGS flags)
+{
VALIDATED_PAGE_DESCRIPTOR vd = {
.start_time_s = start_time_s,
.end_time_s = end_time_s,
.update_every_s = update_every_s,
.page_length = page_length,
+ .point_size = page_type_size[page_type],
.type = page_type,
.is_valid = true,
};
- vd.point_size = page_type_size[vd.type];
+ bool known_page_type = true;
switch (page_type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
// always calculate entries by size
vd.entries = page_entries_by_size(vd.page_length, vd.point_size);
@@ -696,13 +699,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
if(!entries)
entries = vd.entries;
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
internal_fatal(entries == 0, "0 number of entries found on gorilla page");
vd.entries = entries;
break;
default:
- // TODO: should set vd.is_valid false instead?
- fatal("Unknown page type: %uc", page_type);
+ known_page_type = false;
+ break;
}
// allow to be called without update every (when loading pages from disk)
@@ -723,16 +726,16 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
// If gorilla can not compress the data we might end up needing slightly more
// than 4KiB. However, gorilla pages extend the page length by increments of
// 512 bytes.
- max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE);
+ max_page_length += ((page_type == RRDENG_PAGE_TYPE_GORILLA_32BIT) * RRDENG_GORILLA_32BIT_BUFFER_SIZE);
- if( have_read_error ||
+ if (!known_page_type ||
+ have_read_error ||
vd.page_length == 0 ||
vd.page_length > max_page_length ||
vd.start_time_s > vd.end_time_s ||
(now_s && vd.end_time_s > now_s) ||
vd.start_time_s <= 0 ||
vd.end_time_s <= 0 ||
- vd.update_every_s < 0 ||
(vd.start_time_s == vd.end_time_s && vd.entries > 1) ||
(vd.update_every_s == 0 && vd.entries > 1))
{
@@ -791,13 +794,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s invalid page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)",
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)",
uuid_str, msg, vd.type,
vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):""
);
}
else {
- const char *err_valid = (vd.is_valid) ? "" : "found invalid, ";
+ const char *err_valid = "";
const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, ";
const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, ";
const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, ";
@@ -811,9 +814,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page(
nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"DBENGINE: metric '%s' %s page of type %u "
- "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), "
+ "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), "
"found inconsistent - the right is "
- "from %ld to %ld, update every %ld, page length %zu, entries %zu: "
+ "from %ld to %ld, update every %u, page length %zu, entries %zu: "
"%s%s%s%s%s%s%s",
uuid_str, msg, vd.type,
start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"",
@@ -871,11 +874,11 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
if (descr) {
start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC);
switch (descr->type) {
- case PAGE_METRICS:
- case PAGE_TIER:
+ case RRDENG_PAGE_TYPE_ARRAY_32BIT:
+ case RRDENG_PAGE_TYPE_ARRAY_TIER1:
end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC);
break;
- case PAGE_GORILLA_METRICS:
+ case RRDENG_PAGE_TYPE_GORILLA_32BIT:
end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s);
break;
}
@@ -895,7 +898,7 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL *
start_time_s = pd->first_time_s;
end_time_s = pd->last_time_s;
METRIC *metric = (METRIC *)pd->metric_id;
- uuid_t *u = mrg_metric_uuid(main_mrg, metric);
+ nd_uuid_t *u = mrg_metric_uuid(main_mrg, metric);
uuid_unparse_lower(*u, uuid);
used_epdl = true;
}
@@ -938,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data(
PDC_PAGE_STATUS tags,
bool cached_extent)
{
- int ret;
unsigned i, count;
void *uncompressed_buf = NULL;
uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0;
@@ -973,18 +975,17 @@ static bool epdl_populate_pages_from_extent_data(
if( !can_use_data ||
count < 1 ||
count > MAX_PAGES_PER_EXTENT ||
- (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) ||
+ !dbengine_valid_compression_algorithm(header->compression_algorithm) ||
(payload_length != trailer_offset - payload_offset) ||
(data_length != payload_offset + payload_length + sizeof(*trailer))
- ) {
+ ) {
epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID");
return false;
}
crc = crc32(0L, Z_NULL, 0);
crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer));
- ret = crc32cmp(trailer->checksum, crc);
- if (unlikely(ret)) {
+ if (unlikely(crc32cmp(trailer->checksum, crc))) {
ctx_io_error(ctx);
have_read_error = true;
epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED");
@@ -993,14 +994,15 @@ static bool epdl_populate_pages_from_extent_data(
if(worker)
worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION);
- if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) {
+ if (likely(!have_read_error && RRDENG_COMPRESSION_NONE != header->compression_algorithm)) {
// find the uncompressed extent size
uncompressed_payload_length = 0;
for (i = 0; i < count; ++i) {
size_t page_length = header->descr[i].page_length;
- if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS ||
- (header->descr[i].type == PAGE_GORILLA_METRICS &&
- (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) {
+ if (page_length > RRDENG_BLOCK_SIZE &&
+ (header->descr[i].type != RRDENG_PAGE_TYPE_GORILLA_32BIT ||
+ (header->descr[i].type == RRDENG_PAGE_TYPE_GORILLA_32BIT &&
+ (page_length - RRDENG_BLOCK_SIZE) % RRDENG_GORILLA_32BIT_BUFFER_SIZE))) {
have_read_error = true;
break;
}
@@ -1015,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data(
eb = extent_buffer_get(uncompressed_payload_length);
uncompressed_buf = eb->data;
- ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf,
- (int) payload_length, (int) uncompressed_payload_length);
+ size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset,
+ uncompressed_payload_length, payload_length,
+ header->compression_algorithm);
- __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
- __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED);
+ if(!bytes)
+ have_read_error = true;
+ else {
+ __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED);
+ __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED);
+ }
}
}
@@ -1075,7 +1082,7 @@ static bool epdl_populate_pages_from_extent_data(
stats_load_invalid_page++;
}
else {
- if (RRD_NO_COMPRESSION == header->compression_algorithm) {
+ if (RRDENG_COMPRESSION_NONE == header->compression_algorithm) {
pgd = pgd_create_from_disk_data(header->descr[i].type,
data + payload_offset + page_offset,
vd.page_length);
@@ -1172,7 +1179,7 @@ static bool epdl_populate_pages_from_extent_data(
static inline void *datafile_extent_read(struct rrdengine_instance *ctx, uv_file file, unsigned pos, unsigned size_bytes)
{
- void *buffer;
+ void *buffer = NULL;
uv_fs_t request;
unsigned real_io_size = ALIGN_BYTES_CEILING(size_bytes);