summaryrefslogtreecommitdiffstats
path: root/src/streaming/compression_zstd.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-11-09 08:36:07 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-11-25 11:21:20 +0000
commiteae52fdaa9298e00f14b0b6256400d200db9c373 (patch)
treea3040a19bd024295ded05370853647bab9d7c225 /src/streaming/compression_zstd.c
parentAdding upstream version 1.47.5. (diff)
downloadnetdata-upstream/2.0.3.tar.xz
netdata-upstream/2.0.3.zip
Adding upstream version 2.0.3.upstream/2.0.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/streaming/compression_zstd.c')
-rw-r--r--src/streaming/compression_zstd.c163
1 files changed, 0 insertions, 163 deletions
diff --git a/src/streaming/compression_zstd.c b/src/streaming/compression_zstd.c
deleted file mode 100644
index dabc044f7..000000000
--- a/src/streaming/compression_zstd.c
+++ /dev/null
@@ -1,163 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "compression_zstd.h"
-
-#ifdef ENABLE_ZSTD
-#include <zstd.h>
-
-void rrdpush_compressor_init_zstd(struct compressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
- state->stream = ZSTD_createCStream();
-
- if(state->level < 1)
- state->level = 1;
-
- if(state->level > ZSTD_maxCLevel())
- state->level = ZSTD_maxCLevel();
-
- size_t ret = ZSTD_initCStream(state->stream, state->level);
- if(ZSTD_isError(ret))
- netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
-
- // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
- // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
- }
-}
-
-void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
- if(state->stream) {
- ZSTD_freeCStream(state->stream);
- state->stream = NULL;
- }
-}
-
-size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
- if(unlikely(!state || !size || !out))
- return 0;
-
- ZSTD_inBuffer inBuffer = {
- .pos = 0,
- .size = size,
- .src = data,
- };
-
- size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
- simple_ring_buffer_make_room(&state->output, wanted_size);
-
- ZSTD_outBuffer outBuffer = {
- .pos = 0,
- .size = state->output.size,
- .dst = (void *)state->output.data,
- };
-
- // compress
- size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
-
- // error handling
- if(ZSTD_isError(ret)) {
- netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
- return 0;
- }
-
- if(inBuffer.pos < inBuffer.size) {
- netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
- inBuffer.size, inBuffer.pos);
- return 0;
- }
-
- if(outBuffer.pos == 0) {
- // ZSTD needs more input to flush the output, so let's flush it manually
- ret = ZSTD_flushStream(state->stream, &outBuffer);
-
- if(ZSTD_isError(ret)) {
- netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
- return 0;
- }
-
- if(outBuffer.pos == 0) {
- netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
- "(source is %zu bytes, output buffer can fit %zu bytes) "
- , size, outBuffer.size);
- return 0;
- }
- }
-
- state->sender_locked.total_compressions++;
- state->sender_locked.total_uncompressed += size;
- state->sender_locked.total_compressed += outBuffer.pos;
-
- // return values
- *out = state->output.data;
- return outBuffer.pos;
-}
-
-void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
- if(!state->initialized) {
- state->initialized = true;
- state->stream = ZSTD_createDStream();
-
- size_t ret = ZSTD_initDStream(state->stream);
- if(ZSTD_isError(ret))
- netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
-
- simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
- }
-}
-
-void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
- if (state->stream) {
- ZSTD_freeDStream(state->stream);
- state->stream = NULL;
- }
-}
-
-size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
- if (unlikely(!state || !compressed_data || !compressed_size))
- return 0;
-
- // The state.output ring buffer is always EMPTY at this point,
- // meaning that (state->output.read_pos == state->output.write_pos)
- // However, THEY ARE NOT ZERO.
-
- ZSTD_inBuffer inBuffer = {
- .pos = 0,
- .size = compressed_size,
- .src = compressed_data,
- };
-
- ZSTD_outBuffer outBuffer = {
- .pos = 0,
- .dst = (char *)state->output.data,
- .size = state->output.size,
- };
-
- size_t ret = ZSTD_decompressStream(
- state->stream
- , &outBuffer
- , &inBuffer);
-
- if(ZSTD_isError(ret)) {
- netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
- return 0;
- }
-
- if(inBuffer.pos < inBuffer.size)
- fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
- "but %zu bytes of compressed data remain",
- inBuffer.pos, inBuffer.size);
-
- size_t decompressed_size = outBuffer.pos;
-
- state->output.read_pos = 0;
- state->output.write_pos = outBuffer.pos;
-
- // statistics
- state->total_compressed += compressed_size;
- state->total_uncompressed += decompressed_size;
- state->total_compressions++;
-
- return decompressed_size;
-}
-
-#endif // ENABLE_ZSTD