summaryrefslogtreecommitdiffstats
path: root/streaming/compression_zstd.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/compression_zstd.c')
-rw-r--r--streaming/compression_zstd.c163
1 files changed, 163 insertions, 0 deletions
diff --git a/streaming/compression_zstd.c b/streaming/compression_zstd.c
new file mode 100644
index 000000000..dabc044f7
--- /dev/null
+++ b/streaming/compression_zstd.c
@@ -0,0 +1,163 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_zstd.h"
+
+#ifdef ENABLE_ZSTD
+#include <zstd.h>
+
+void rrdpush_compressor_init_zstd(struct compressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = ZSTD_createCStream();
+
+ if(state->level < 1)
+ state->level = 1;
+
+ if(state->level > ZSTD_maxCLevel())
+ state->level = ZSTD_maxCLevel();
+
+ size_t ret = ZSTD_initCStream(state->stream, state->level);
+ if(ZSTD_isError(ret))
+ netdata_log_error("STREAM: ZSTD_initCStream() returned error: %s", ZSTD_getErrorName(ret));
+
+ // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_compressionLevel, 1);
+ // ZSTD_CCtx_setParameter(state->stream, ZSTD_c_strategy, ZSTD_fast);
+ }
+}
+
+void rrdpush_compressor_destroy_zstd(struct compressor_state *state) {
+ if(state->stream) {
+ ZSTD_freeCStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_compress_zstd(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if(unlikely(!state || !size || !out))
+ return 0;
+
+ ZSTD_inBuffer inBuffer = {
+ .pos = 0,
+ .size = size,
+ .src = data,
+ };
+
+ size_t wanted_size = MAX(ZSTD_compressBound(inBuffer.size - inBuffer.pos), ZSTD_CStreamOutSize());
+ simple_ring_buffer_make_room(&state->output, wanted_size);
+
+ ZSTD_outBuffer outBuffer = {
+ .pos = 0,
+ .size = state->output.size,
+ .dst = (void *)state->output.data,
+ };
+
+ // compress
+ size_t ret = ZSTD_compressStream(state->stream, &outBuffer, &inBuffer);
+
+ // error handling
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_compressStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(inBuffer.pos < inBuffer.size) {
+ netdata_log_error("STREAM: ZSTD_compressStream() left unprocessed input (source payload %zu bytes, consumed %zu bytes)",
+ inBuffer.size, inBuffer.pos);
+ return 0;
+ }
+
+ if(outBuffer.pos == 0) {
+ // ZSTD needs more input to flush the output, so let's flush it manually
+ ret = ZSTD_flushStream(state->stream, &outBuffer);
+
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_flushStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(outBuffer.pos == 0) {
+ netdata_log_error("STREAM: ZSTD_compressStream() returned zero compressed bytes "
+ "(source is %zu bytes, output buffer can fit %zu bytes) "
+ , size, outBuffer.size);
+ return 0;
+ }
+ }
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size;
+ state->sender_locked.total_compressed += outBuffer.pos;
+
+ // return values
+ *out = state->output.data;
+ return outBuffer.pos;
+}
+
+void rrdpush_decompressor_init_zstd(struct decompressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = ZSTD_createDStream();
+
+ size_t ret = ZSTD_initDStream(state->stream);
+ if(ZSTD_isError(ret))
+ netdata_log_error("STREAM: ZSTD_initDStream() returned error: %s", ZSTD_getErrorName(ret));
+
+ simple_ring_buffer_make_room(&state->output, MAX(COMPRESSION_MAX_CHUNK, ZSTD_DStreamOutSize()));
+ }
+}
+
+void rrdpush_decompressor_destroy_zstd(struct decompressor_state *state) {
+ if (state->stream) {
+ ZSTD_freeDStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+size_t rrdpush_decompress_zstd(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ ZSTD_inBuffer inBuffer = {
+ .pos = 0,
+ .size = compressed_size,
+ .src = compressed_data,
+ };
+
+ ZSTD_outBuffer outBuffer = {
+ .pos = 0,
+ .dst = (char *)state->output.data,
+ .size = state->output.size,
+ };
+
+ size_t ret = ZSTD_decompressStream(
+ state->stream
+ , &outBuffer
+ , &inBuffer);
+
+ if(ZSTD_isError(ret)) {
+ netdata_log_error("STREAM: ZSTD_decompressStream() return error: %s", ZSTD_getErrorName(ret));
+ return 0;
+ }
+
+ if(inBuffer.pos < inBuffer.size)
+ fatal("RRDPUSH DECOMPRESS: ZSTD ZSTD_decompressStream() decompressed %zu bytes, "
+ "but %zu bytes of compressed data remain",
+ inBuffer.pos, inBuffer.size);
+
+ size_t decompressed_size = outBuffer.pos;
+
+ state->output.read_pos = 0;
+ state->output.write_pos = outBuffer.pos;
+
+ // statistics
+ state->total_compressed += compressed_size;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
+
+#endif // ENABLE_ZSTD