summaryrefslogtreecommitdiffstats
path: root/streaming/compression_lz4.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/compression_lz4.c')
-rw-r--r--streaming/compression_lz4.c143
1 files changed, 143 insertions, 0 deletions
diff --git a/streaming/compression_lz4.c b/streaming/compression_lz4.c
new file mode 100644
index 000000000..f5174134e
--- /dev/null
+++ b/streaming/compression_lz4.c
@@ -0,0 +1,143 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "compression_lz4.h"
+
+#ifdef ENABLE_LZ4
+#include "lz4.h"
+
+// ----------------------------------------------------------------------------
+// compress
+
+void rrdpush_compressor_init_lz4(struct compressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = LZ4_createStream();
+
+ // LZ4 needs access to the last 64KB of source data
+ // so, we keep twice the size of each message
+ simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2);
+ }
+}
+
+void rrdpush_compressor_destroy_lz4(struct compressor_state *state) {
+ if (state->stream) {
+ LZ4_freeStream(state->stream);
+ state->stream = NULL;
+ }
+}
+
+/*
+ * Compress the given block of data
+ * Compressed data will remain in the internal buffer until the next invocation
+ * Return the size of compressed data block as result and the pointer to internal buffer using the last argument
+ * or 0 in case of error
+ */
+size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) {
+ if(unlikely(!state || !size || !out))
+ return 0;
+
+ // we need to keep the last 64K of our previous source data
+ // as they were in the ring buffer
+
+ simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size));
+
+ if(state->input.write_pos + size > state->input.size)
+ // the input buffer cannot fit out data, restart from zero
+ simple_ring_buffer_reset(&state->input);
+
+ simple_ring_buffer_append_data(&state->input, data, size);
+
+ long int compressed_data_size = LZ4_compress_fast_continue(
+ state->stream,
+ state->input.data + state->input.read_pos,
+ (char *)state->output.data,
+ (int)(state->input.write_pos - state->input.read_pos),
+ (int)state->output.size,
+ state->level);
+
+ if (compressed_data_size <= 0) {
+ netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld "
+ "(source is %zu bytes, output buffer can fit %zu bytes)",
+ compressed_data_size, size, state->output.size);
+ return 0;
+ }
+
+ state->input.read_pos = state->input.write_pos;
+
+ state->sender_locked.total_compressions++;
+ state->sender_locked.total_uncompressed += size;
+ state->sender_locked.total_compressed += compressed_data_size;
+
+ *out = state->output.data;
+ return compressed_data_size;
+}
+
+// ----------------------------------------------------------------------------
+// decompress
+
+void rrdpush_decompressor_init_lz4(struct decompressor_state *state) {
+ if(!state->initialized) {
+ state->initialized = true;
+ state->stream = LZ4_createStreamDecode();
+ simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2);
+ }
+}
+
+void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) {
+ if (state->stream) {
+ LZ4_freeStreamDecode(state->stream);
+ state->stream = NULL;
+ }
+}
+
+/*
+ * Decompress the compressed data in the internal buffer
+ * Return the size of uncompressed data or 0 for error
+ */
+size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) {
+ if (unlikely(!state || !compressed_data || !compressed_size))
+ return 0;
+
+ // The state.output ring buffer is always EMPTY at this point,
+ // meaning that (state->output.read_pos == state->output.write_pos)
+ // However, THEY ARE NOT ZERO.
+
+ if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size))
+ // the input buffer cannot fit out data, restart from zero
+ simple_ring_buffer_reset(&state->output);
+
+ long int decompressed_size = LZ4_decompress_safe_continue(
+ state->stream
+ , compressed_data
+ , (char *)(state->output.data + state->output.write_pos)
+ , (int)compressed_size
+ , (int)(state->output.size - state->output.write_pos)
+ );
+
+ if (unlikely(decompressed_size < 0)) {
+ netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld "
+ "(compressed chunk is %zu bytes)"
+ , decompressed_size, compressed_size);
+ return 0;
+ }
+
+ if(unlikely(decompressed_size + state->output.write_pos > state->output.size))
+ fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer "
+ "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)"
+ , state->output.size
+ , state->output.write_pos
+ , decompressed_size
+ , (size_t)(state->output.write_pos + decompressed_size - state->output.size)
+ );
+
+ state->output.write_pos += decompressed_size;
+
+ // statistics
+ state->total_compressed += compressed_size;
+ state->total_uncompressed += decompressed_size;
+ state->total_compressions++;
+
+ return decompressed_size;
+}
+
+#endif // ENABLE_LZ4