diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/database/engine/rrdengine.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/database/engine/rrdengine.c (renamed from database/engine/rrdengine.c) | 92 |
1 files changed, 53 insertions, 39 deletions
diff --git a/database/engine/rrdengine.c b/src/database/engine/rrdengine.c index b82cc1ad1..7b2137436 100644 --- a/database/engine/rrdengine.c +++ b/src/database/engine/rrdengine.c @@ -3,6 +3,7 @@ #include "rrdengine.h" #include "pdc.h" +#include "dbengine-compression.h" rrdeng_stats_t global_io_errors = 0; rrdeng_stats_t global_fs_errors = 0; @@ -229,7 +230,7 @@ static void after_work_standard_callback(uv_work_t* req, int status) { worker_is_idle(); } -static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) { +static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb do_work_cb, after_work_cb do_after_work_cb) { struct rrdeng_work *work_request = NULL; internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread"); @@ -240,8 +241,8 @@ static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct com work_request->ctx = ctx; work_request->data = data; work_request->completion = completion; - work_request->work_cb = work_cb; - work_request->after_work_cb = after_work_cb; + work_request->work_cb = do_work_cb; + work_request->after_work_cb = do_after_work_cb; work_request->opcode = opcode; if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) { @@ -772,13 +773,10 @@ static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_ */ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) { int ret; - int compressed_size, max_compressed_size = 0; unsigned i, count, size_bytes, pos, real_io_size; - uint32_t uncompressed_payload_length, payload_offset; + uint32_t uncompressed_payload_length, max_compressed_size, payload_offset; struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT]; struct extent_io_descriptor *xt_io_descr; - struct extent_buffer *eb = NULL; - void *compressed_buf = NULL; Word_t Index; uint8_t compression_algorithm = ctx->config.global_compress_alg; struct rrdengine_datafile *datafile; @@ -807,20 +805,8 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta xt_io_descr = extent_io_descriptor_get(); xt_io_descr->ctx = ctx; payload_offset = sizeof(*header) + count * sizeof(header->descr[0]); - switch (compression_algorithm) { - case RRD_NO_COMPRESSION: - size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer); - break; - - default: /* Compress */ - fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE); - max_compressed_size = LZ4_compressBound(uncompressed_payload_length); - eb = extent_buffer_get(max_compressed_size); - compressed_buf = eb->data; - size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer); - break; - } - + max_compressed_size = dbengine_max_compressed_size(uncompressed_payload_length, compression_algorithm); + size_bytes = payload_offset + MAX(uncompressed_payload_length, max_compressed_size) + sizeof(*trailer); ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes)); if (unlikely(ret)) { fatal("DBENGINE: posix_memalign:%s", strerror(ret)); @@ -832,7 +818,6 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos = 0; header = xt_io_descr->buf; - header->compression_algorithm = compression_algorithm; header->number_of_pages = count; pos += sizeof(*header); @@ -844,11 +829,11 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta header->descr[i].start_time_ut = descr->start_time_ut; switch (descr->type) { - case PAGE_METRICS: - case PAGE_TIER: + case RRDENG_PAGE_TYPE_ARRAY_32BIT: + case RRDENG_PAGE_TYPE_ARRAY_TIER1: header->descr[i].end_time_ut = descr->end_time_ut; break; - case PAGE_GORILLA_METRICS: + case RRDENG_PAGE_TYPE_GORILLA_32BIT: header->descr[i].gorilla.delta_time_s = (uint32_t) ((descr->end_time_ut - descr->start_time_ut) / USEC_PER_SEC); header->descr[i].gorilla.entries = pgd_slots_used(descr->pgd); break; @@ -858,29 +843,40 @@ static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_insta pos += sizeof(header->descr[i]); } + + // build the extent payload for (i = 0 ; i < count ; ++i) { descr = xt_io_descr->descr_array[i]; pgd_copy_to_extent(descr->pgd, xt_io_descr->buf + pos, descr->page_length); pos += descr->page_length; } - if(likely(compression_algorithm == RRD_LZ4)) { - compressed_size = LZ4_compress_default( - xt_io_descr->buf + payload_offset, - compressed_buf, - (int)uncompressed_payload_length, - max_compressed_size); + // compress the payload + size_t compressed_size = + (int)dbengine_compress(xt_io_descr->buf + payload_offset, + uncompressed_payload_length, + compression_algorithm); - __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); - __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); + internal_fatal(compressed_size > max_compressed_size, "DBENGINE: compression returned more data than the max allowed"); + internal_fatal(compressed_size > uncompressed_payload_length, "DBENGINE: compression returned more data than the uncompressed extent"); - (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size); - extent_buffer_release(eb); - size_bytes = payload_offset + compressed_size + sizeof(*trailer); + if(compressed_size) { + header->compression_algorithm = compression_algorithm; header->payload_length = compressed_size; } - else { // RRD_NO_COMPRESSION - header->payload_length = uncompressed_payload_length; + else { + // compression failed, or generated bigger pages + // so it didn't touch our uncompressed buffer + header->compression_algorithm = RRDENG_COMPRESSION_NONE; + header->payload_length = compressed_size = uncompressed_payload_length; + } + + // set the correct size + size_bytes = payload_offset + compressed_size + sizeof(*trailer); + + if(compression_algorithm != RRDENG_COMPRESSION_NONE) { + __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED); + __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED); } real_io_size = ALIGN_BYTES_CEILING(size_bytes); @@ -1171,7 +1167,17 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r for (size_t index = 0; index < added; ++index) { uuid_first_t_entry = &uuid_first_entry_list[index]; if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) { - mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + + time_t old_first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + + bool changed = mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s); + if (changed) { + uint32_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && old_first_time_s && uuid_first_t_entry->first_time_s > old_first_time_s) { + uint64_t remove_samples = (uuid_first_t_entry->first_time_s - old_first_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + } mrg_metric_release(main_mrg, uuid_first_t_entry->metric); } else { @@ -1180,6 +1186,14 @@ static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct r // there is no retention for this metric bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric); if (!has_retention) { + time_t first_time_s = mrg_metric_get_first_time_s(main_mrg, uuid_first_t_entry->metric); + time_t last_time_s = mrg_metric_get_latest_time_s(main_mrg, uuid_first_t_entry->metric); + time_t update_every_s = mrg_metric_get_update_every_s(main_mrg, uuid_first_t_entry->metric); + if (update_every_s && first_time_s && last_time_s) { + uint64_t remove_samples = (first_time_s - last_time_s) / update_every_s; + __atomic_sub_fetch(&ctx->atomic.samples, remove_samples, __ATOMIC_RELAXED); + } + bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric); if(deleted) deleted_metrics++; |