diff options
Diffstat (limited to 'streaming/compression_zstd.c')
-rw-r--r-- | streaming/compression_zstd.c | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/streaming/compression_zstd.c b/streaming/compression_zstd.c new file mode 100644 index 000000000..dabc044f7 --- /dev/null +++ b/streaming/compression_zstd.c @@ -0,0 +1,163 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "compression_zstd.h" + +#ifdef ENABLE_ZSTD +#include <zstd.h> + +void rrdpush_compressor_init_zstd(struct compressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createCStream(); + + if(state->level < 1) + state->level = 1; + + if(state->level > ZSTD_maxCLevel()) + state->level = ZSTD_maxCLevel(); + + size_t ret = ZSTD_initCStream(state->stream, state->level); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret)); + + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1); + // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast); + } +} + +void rrdpush_compressor_destroy_zstd(struct compressor_state *state) { + if(state->stream) { + ZSTD_freeCStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) { + if(unlikely(!state || !size || !out)) + return 0; + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = size, + .src = data, + }; + + size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize()); + simple_ring_buffer_make_room(&state->output, wanted_size); + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .size = state->output.size, + .dst = (void *)state->output.data, + }; + + // compress + size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer); + + // error handling + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) { + netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)", + inBuffer.size, inBuffer.pos); + return 0; + } + + if(outBuffer.pos == 0) { + // ZSTD needs more input to flush the output, so let's flush it manually + ret = ZSTD_flushStream(state->stream, &outBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(outBuffer.pos == 0) { + netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes " + "(source is %zu bytes, output buffer can fit %zu bytes) " + , size, outBuffer.size); + return 0; + } + } + + state->sender_locked.total_compressions++; + state->sender_locked.total_uncompressed += size; + state->sender_locked.total_compressed += outBuffer.pos; + + // return values + *out = state->output.data; + return outBuffer.pos; +} + +void rrdpush_decompressor_init_zstd(struct decompressor_state *state) { + if(!state->initialized) { + state->initialized = true; + state->stream = ZSTD_createDStream(); + + size_t ret = ZSTD_initDStream(state->stream); + if(ZSTD_isError(ret)) + netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret)); + + simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize())); + } +} + +void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) { + if (state->stream) { + ZSTD_freeDStream(state->stream); + state->stream = NULL; + } +} + +size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { + if (unlikely(!state || !compressed_data || !compressed_size)) + return 0; + + // The state.output ring buffer is always EMPTY at this point, + // meaning that (state->output.read_pos == state->output.write_pos) + // However, THEY ARE NOT ZERO. + + ZSTD_inBuffer inBuffer = { + .pos = 0, + .size = compressed_size, + .src = compressed_data, + }; + + ZSTD_outBuffer outBuffer = { + .pos = 0, + .dst = (char *)state->output.data, + .size = state->output.size, + }; + + size_t ret = ZSTD_decompressStream( + state->stream + , &outBuffer + , &inBuffer); + + if(ZSTD_isError(ret)) { + netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret)); + return 0; + } + + if(inBuffer.pos < inBuffer.size) + fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, " + "but %zu bytes of compressed data remain", + inBuffer.pos, inBuffer.size); + + size_t decompressed_size = outBuffer.pos; + + state->output.read_pos = 0; + state->output.write_pos = outBuffer.pos; + + // statistics + state->total_compressed += compressed_size; + state->total_uncompressed += decompressed_size; + state->total_compressions++; + + return decompressed_size; +} + +#endif // ENABLE_ZSTD |