diff options
Diffstat (limited to '')
-rw-r--r-- | src/database/engine/pdc.c (renamed from database/engine/pdc.c) | 97 |
1 files changed, 52 insertions, 45 deletions
diff --git a/database/engine/pdc.c b/src/database/engine/pdc.c index 5fe205e64..28a83e2bc 100644 --- a/database/engine/pdc.c +++ b/src/database/engine/pdc.c @@ -1,6 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "pdc.h" +#include "dbengine-compression.h" struct extent_page_details_list { uv_file file; @@ -628,28 +629,29 @@ void collect_page_flags_to_buffer(BUFFER *wb, RRDENG_COLLECT_PAGE_FLAGS flags) { buffer_strcat(wb, "STEP_UNALIGNED"); } -inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, time_t overwrite_zero_update_every_s, bool have_read_error) { +inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_extent_page_descr *descr, time_t now_s, uint32_t overwrite_zero_update_every_s, bool have_read_error) { time_t start_time_s = (time_t) (descr->start_time_ut / USEC_PER_SEC); - time_t end_time_s; - size_t entries; + time_t end_time_s = 0; + size_t entries = 0; switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: end_time_s = descr->end_time_ut / USEC_PER_SEC; entries = 0; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: end_time_s = start_time_s + descr->gorilla.delta_time_s; entries = descr->gorilla.entries; break; default: - fatal("Unknown page type: %uc\n", descr->type); + // Nothing to do. Validate page will notify the user. + break; } return validate_page( - (uuid_t *)descr->uuid, + (nd_uuid_t *)descr->uuid, start_time_s, end_time_s, 0, @@ -663,32 +665,33 @@ inline VALIDATED_PAGE_DESCRIPTOR validate_extent_page_descr(const struct rrdeng_ } VALIDATED_PAGE_DESCRIPTOR validate_page( - uuid_t *uuid, + nd_uuid_t *uuid, time_t start_time_s, time_t end_time_s, - time_t update_every_s, // can be zero, if unknown + uint32_t update_every_s, // can be zero, if unknown size_t page_length, uint8_t page_type, size_t entries, // can be zero, if unknown time_t now_s, // can be zero, to disable future timestamp check - time_t overwrite_zero_update_every_s, // can be zero, if unknown + uint32_t overwrite_zero_update_every_s, // can be zero, if unknown bool have_read_error, const char *msg, - RRDENG_COLLECT_PAGE_FLAGS flags) { - + RRDENG_COLLECT_PAGE_FLAGS flags) +{ VALIDATED_PAGE_DESCRIPTOR vd = { .start_time_s = start_time_s, .end_time_s = end_time_s, .update_every_s = update_every_s, .page_length = page_length, + .point_size = page_type_size[page_type], .type = page_type, .is_valid = true, }; - vd.point_size = page_type_size[vd.type]; + bool known_page_type = true; switch (page_type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: // always calculate entries by size vd.entries = page_entries_by_size(vd.page_length, vd.point_size); @@ -696,13 +699,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( if(!entries) entries = vd.entries; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: internal_fatal(entries == 0, "0 number of entries found on gorilla page"); vd.entries = entries; break; default: - // TODO: should set vd.is_valid false instead? - fatal("Unknown page type: %uc", page_type); + known_page_type = false; + break; } // allow to be called without update every (when loading pages from disk) @@ -723,16 +726,16 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( // If gorilla can not compress the data we might end up needing slightly more // than 4KiB. However, gorilla pages extend the page length by increments of // 512 bytes. - max_page_length += ((page_type == PAGE_GORILLA_METRICS) * GORILLA_BUFFER_SIZE); + max_page_length += ((page_type == RRDENG_PAGE_TYPE_GORILLA_32BIT) * RRDENG_GORILLA_32BIT_BUFFER_SIZE); - if( have_read_error || + if (!known_page_type || + have_read_error || vd.page_length == 0 || vd.page_length > max_page_length || vd.start_time_s > vd.end_time_s || (now_s && vd.end_time_s > now_s) || vd.start_time_s <= 0 || vd.end_time_s <= 0 || - vd.update_every_s < 0 || (vd.start_time_s == vd.end_time_s && vd.entries > 1) || (vd.update_every_s == 0 && vd.entries > 1)) { @@ -791,13 +794,13 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s invalid page of type %u " - "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s)", + "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s)", uuid_str, msg, vd.type, vd.start_time_s, vd.end_time_s, now_s, vd.update_every_s, vd.page_length, vd.entries, wb?buffer_tostring(wb):"" ); } else { - const char *err_valid = (vd.is_valid) ? "" : "found invalid, "; + const char *err_valid = ""; const char *err_start = (vd.start_time_s == start_time_s) ? "" : "start time updated, "; const char *err_end = (vd.end_time_s == end_time_s) ? "" : "end time updated, "; const char *err_update = (vd.update_every_s == update_every_s) ? "" : "update every updated, "; @@ -811,9 +814,9 @@ VALIDATED_PAGE_DESCRIPTOR validate_page( nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR, #endif "DBENGINE: metric '%s' %s page of type %u " - "from %ld to %ld (now %ld), update every %ld, page length %zu, entries %zu (flags: %s), " + "from %ld to %ld (now %ld), update every %u, page length %zu, entries %zu (flags: %s), " "found inconsistent - the right is " - "from %ld to %ld, update every %ld, page length %zu, entries %zu: " + "from %ld to %ld, update every %u, page length %zu, entries %zu: " "%s%s%s%s%s%s%s", uuid_str, msg, vd.type, start_time_s, end_time_s, now_s, update_every_s, page_length, entries, wb?buffer_tostring(wb):"", @@ -871,11 +874,11 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * if (descr) { start_time_s = (time_t)(descr->start_time_ut / USEC_PER_SEC); switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: end_time_s = (time_t)(descr->end_time_ut / USEC_PER_SEC); break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: end_time_s = (time_t) start_time_s + (descr->gorilla.delta_time_s); break; } @@ -895,7 +898,7 @@ static void epdl_extent_loading_error_log(struct rrdengine_instance *ctx, EPDL * start_time_s = pd->first_time_s; end_time_s = pd->last_time_s; METRIC *metric = (METRIC *)pd->metric_id; - uuid_t *u = mrg_metric_uuid(main_mrg, metric); + nd_uuid_t *u = mrg_metric_uuid(main_mrg, metric); uuid_unparse_lower(*u, uuid); used_epdl = true; } @@ -938,7 +941,6 @@ static bool epdl_populate_pages_from_extent_data( PDC_PAGE_STATUS tags, bool cached_extent) { - int ret; unsigned i, count; void *uncompressed_buf = NULL; uint32_t payload_length, payload_offset, trailer_offset, uncompressed_payload_length = 0; @@ -973,18 +975,17 @@ static bool epdl_populate_pages_from_extent_data( if( !can_use_data || count < 1 || count > MAX_PAGES_PER_EXTENT || - (header->compression_algorithm != RRD_NO_COMPRESSION && header->compression_algorithm != RRD_LZ4) || + !dbengine_valid_compression_algorithm(header->compression_algorithm) || (payload_length != trailer_offset - payload_offset) || (data_length != payload_offset + payload_length + sizeof(*trailer)) - ) { + ) { epdl_extent_loading_error_log(ctx, epdl, NULL, "header is INVALID"); return false; } crc = crc32(0L, Z_NULL, 0); crc = crc32(crc, data, epdl->extent_size - sizeof(*trailer)); - ret = crc32cmp(trailer->checksum, crc); - if (unlikely(ret)) { + if (unlikely(crc32cmp(trailer->checksum, crc))) { ctx_io_error(ctx); have_read_error = true; epdl_extent_loading_error_log(ctx, epdl, NULL, "CRC32 checksum FAILED"); @@ -993,14 +994,15 @@ static bool epdl_populate_pages_from_extent_data( if(worker) worker_is_busy(UV_EVENT_DBENGINE_EXTENT_DECOMPRESSION); - if (likely(!have_read_error && RRD_NO_COMPRESSION != header->compression_algorithm)) { + if (likely(!have_read_error && RRDENG_COMPRESSION_NONE != header->compression_algorithm)) { // find the uncompressed extent size uncompressed_payload_length = 0; for (i = 0; i < count; ++i) { size_t page_length = header->descr[i].page_length; - if (page_length > RRDENG_BLOCK_SIZE && (header->descr[i].type != PAGE_GORILLA_METRICS || - (header->descr[i].type == PAGE_GORILLA_METRICS && - (page_length - RRDENG_BLOCK_SIZE) % GORILLA_BUFFER_SIZE))) { + if (page_length > RRDENG_BLOCK_SIZE && + (header->descr[i].type != RRDENG_PAGE_TYPE_GORILLA_32BIT || + (header->descr[i].type == RRDENG_PAGE_TYPE_GORILLA_32BIT && + (page_length - RRDENG_BLOCK_SIZE) % RRDENG_GORILLA_32BIT_BUFFER_SIZE))) { have_read_error = true; break; } @@ -1015,11 +1017,16 @@ static bool epdl_populate_pages_from_extent_data( eb = extent_buffer_get(uncompressed_payload_length); uncompressed_buf = eb->data; - ret = LZ4_decompress_safe(data + payload_offset, uncompressed_buf, - (int) payload_length, (int) uncompressed_payload_length); + size_t bytes = dbengine_decompress(uncompressed_buf, data + payload_offset, + uncompressed_payload_length, payload_length, + header->compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_decompress_bytes, ret, __ATOMIC_RELAXED); + if(!bytes) + have_read_error = true; + else { + __atomic_add_fetch(&ctx->stats.before_decompress_bytes, payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_decompress_bytes, bytes, __ATOMIC_RELAXED); + } } } @@ -1075,7 +1082,7 @@ static bool epdl_populate_pages_from_extent_data( stats_load_invalid_page++; } else { - if (RRD_NO_COMPRESSION == header->compression_algorithm) { + if (RRDENG_COMPRESSION_NONE == header->compression_algorithm) { pgd = pgd_create_from_disk_data(header->descr[i].type, data + payload_offset + page_offset, vd.page_length); @@ -1172,7 +1179,7 @@ static bool epdl_populate_pages_from_extent_data( static inline void *datafile_extent_read(struct rrdengine_instance *ctx, uv_file file, unsigned pos, unsigned size_bytes) { - void *buffer; + void *buffer = NULL; uv_fs_t request; unsigned real_io_size = ALIGN_BYTES_CEILING(size_bytes); |