// SPDX-License-Identifier: GPL-3.0-or-later #include "compression_lz4.h" #ifdef ENABLE_LZ4 #include "lz4.h" // ---------------------------------------------------------------------------- // compress void rrdpush_compressor_init_lz4(struct compressor_state *state) { if(!state->initialized) { state->initialized = true; state->stream = LZ4_createStream(); // LZ4 needs access to the last 64KB of source data // so, we keep twice the size of each message simple_ring_buffer_make_room(&state->input, 65536 + COMPRESSION_MAX_CHUNK * 2); } } void rrdpush_compressor_destroy_lz4(struct compressor_state *state) { if (state->stream) { LZ4_freeStream(state->stream); state->stream = NULL; } } /* * Compress the given block of data * Compressed data will remain in the internal buffer until the next invocation * Return the size of compressed data block as result and the pointer to internal buffer using the last argument * or 0 in case of error */ size_t rrdpush_compress_lz4(struct compressor_state *state, const char *data, size_t size, const char **out) { if(unlikely(!state || !size || !out)) return 0; // we need to keep the last 64K of our previous source data // as they were in the ring buffer simple_ring_buffer_make_room(&state->output, LZ4_COMPRESSBOUND(size)); if(state->input.write_pos + size > state->input.size) // the input buffer cannot fit out data, restart from zero simple_ring_buffer_reset(&state->input); simple_ring_buffer_append_data(&state->input, data, size); long int compressed_data_size = LZ4_compress_fast_continue( state->stream, state->input.data + state->input.read_pos, (char *)state->output.data, (int)(state->input.write_pos - state->input.read_pos), (int)state->output.size, state->level); if (compressed_data_size <= 0) { netdata_log_error("STREAM: LZ4_compress_fast_continue() returned %ld " "(source is %zu bytes, output buffer can fit %zu bytes)", compressed_data_size, size, state->output.size); return 0; } state->input.read_pos = state->input.write_pos; state->sender_locked.total_compressions++; state->sender_locked.total_uncompressed += size; state->sender_locked.total_compressed += compressed_data_size; *out = state->output.data; return compressed_data_size; } // ---------------------------------------------------------------------------- // decompress void rrdpush_decompressor_init_lz4(struct decompressor_state *state) { if(!state->initialized) { state->initialized = true; state->stream = LZ4_createStreamDecode(); simple_ring_buffer_make_room(&state->output, 65536 + COMPRESSION_MAX_CHUNK * 2); } } void rrdpush_decompressor_destroy_lz4(struct decompressor_state *state) { if (state->stream) { LZ4_freeStreamDecode(state->stream); state->stream = NULL; } } /* * Decompress the compressed data in the internal buffer * Return the size of uncompressed data or 0 for error */ size_t rrdpush_decompress_lz4(struct decompressor_state *state, const char *compressed_data, size_t compressed_size) { if (unlikely(!state || !compressed_data || !compressed_size)) return 0; // The state.output ring buffer is always EMPTY at this point, // meaning that (state->output.read_pos == state->output.write_pos) // However, THEY ARE NOT ZERO. if (unlikely(state->output.write_pos + COMPRESSION_MAX_CHUNK > state->output.size)) // the input buffer cannot fit out data, restart from zero simple_ring_buffer_reset(&state->output); long int decompressed_size = LZ4_decompress_safe_continue( state->stream , compressed_data , (char *)(state->output.data + state->output.write_pos) , (int)compressed_size , (int)(state->output.size - state->output.write_pos) ); if (unlikely(decompressed_size < 0)) { netdata_log_error("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() returned negative value: %ld " "(compressed chunk is %zu bytes)" , decompressed_size, compressed_size); return 0; } if(unlikely(decompressed_size + state->output.write_pos > state->output.size)) fatal("RRDPUSH DECOMPRESS: LZ4_decompress_safe_continue() overflown the stream_buffer " "(size: %zu, pos: %zu, added: %ld, exceeding the buffer by %zu)" , state->output.size , state->output.write_pos , decompressed_size , (size_t)(state->output.write_pos + decompressed_size - state->output.size) ); state->output.write_pos += decompressed_size; // statistics state->total_compressed += compressed_size; state->total_uncompressed += decompressed_size; state->total_compressions++; return decompressed_size; } #endif // ENABLE_LZ4