summaryrefslogtreecommitdiffstats
path: root/streaming/compression.h
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /streaming/compression.h
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'streaming/compression.h')
-rw-r--r--streaming/compression.h175
1 files changed, 0 insertions, 175 deletions
diff --git a/streaming/compression.h b/streaming/compression.h
deleted file mode 100644
index a67f65b83..000000000
--- a/streaming/compression.h
+++ /dev/null
@@ -1,175 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "rrdpush.h"
-
-#ifndef NETDATA_RRDPUSH_COMPRESSION_H
-#define NETDATA_RRDPUSH_COMPRESSION_H 1
-
-// signature MUST end with a newline
-
-#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)
-#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)"
-#endif
-
-typedef uint32_t rrdpush_signature_t;
-#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
-#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t)
-
-static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) {
- rrdpush_signature_t len = ((compressed_data_size & 0x7f) | 0x80 | (((compressed_data_size & (0x7f << 7)) << 1) | 0x8000)) << 8;
- return len | RRDPUSH_COMPRESSION_SIGNATURE;
-}
-
-typedef enum {
- COMPRESSION_ALGORITHM_NONE = 0,
- COMPRESSION_ALGORITHM_ZSTD,
- COMPRESSION_ALGORITHM_LZ4,
- COMPRESSION_ALGORITHM_GZIP,
- COMPRESSION_ALGORITHM_BROTLI,
-
- // terminator
- COMPRESSION_ALGORITHM_MAX,
-} compression_algorithm_t;
-
-extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX];
-
-// this defines the order the algorithms will be selected by the receiver (parent)
-#define RRDPUSH_COMPRESSION_ALGORITHMS_ORDER "zstd lz4 brotli gzip"
-
-// ----------------------------------------------------------------------------
-
-typedef struct simple_ring_buffer {
- const char *data;
- size_t size;
- size_t read_pos;
- size_t write_pos;
-} SIMPLE_RING_BUFFER;
-
-static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) {
- b->read_pos = b->write_pos = 0;
-}
-
-static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) {
- if(b->write_pos + size > b->size) {
- if(!b->size)
- b->size = COMPRESSION_MAX_CHUNK;
- else
- b->size *= 2;
-
- if(b->write_pos + size > b->size)
- b->size += size;
-
- b->data = (const char *)reallocz((void *)b->data, b->size);
- }
-}
-
-static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) {
- simple_ring_buffer_make_room(b, size);
- memcpy((void *)(b->data + b->write_pos), data, size);
- b->write_pos += size;
-}
-
-static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) {
- freez((void *)b->data);
- b->data = NULL;
- b->read_pos = b->write_pos = b->size = 0;
-}
-
-// ----------------------------------------------------------------------------
-
-struct compressor_state {
- bool initialized;
- compression_algorithm_t algorithm;
-
- SIMPLE_RING_BUFFER input;
- SIMPLE_RING_BUFFER output;
-
- int level;
- void *stream;
-
- struct {
- size_t total_compressed;
- size_t total_uncompressed;
- size_t total_compressions;
- } sender_locked;
-};
-
-void rrdpush_compressor_init(struct compressor_state *state);
-void rrdpush_compressor_destroy(struct compressor_state *state);
-size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out);
-
-// ----------------------------------------------------------------------------
-
-struct decompressor_state {
- bool initialized;
- compression_algorithm_t algorithm;
- size_t signature_size;
-
- size_t total_compressed;
- size_t total_uncompressed;
- size_t total_compressions;
-
- SIMPLE_RING_BUFFER output;
-
- void *stream;
-};
-
-void rrdpush_decompressor_destroy(struct decompressor_state *state);
-void rrdpush_decompressor_init(struct decompressor_state *state);
-size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);
-
-static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) {
- if (unlikely(!data || !data_size))
- return 0;
-
- if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
- return 0;
-
- rrdpush_signature_t sign = *(rrdpush_signature_t *)data;
- if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
- return 0;
-
- size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
- return length;
-}
-
-static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
- if(unlikely(state->output.read_pos != state->output.write_pos))
- fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");
-
- return rrdpush_decompress_decode_signature(header, header_size);
-}
-
-static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
- if(unlikely(state->output.read_pos > state->output.write_pos))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return state->output.write_pos - state->output.read_pos;
-}
-
-static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
- if (unlikely(!state || !size || !dst))
- return 0;
-
- size_t remaining = rrdpush_decompressed_bytes_in_buffer(state);
-
- if(unlikely(!remaining))
- return 0;
-
- size_t bytes_to_return = size;
- if(bytes_to_return > remaining)
- bytes_to_return = remaining;
-
- memcpy(dst, state->output.data + state->output.read_pos, bytes_to_return);
- state->output.read_pos += bytes_to_return;
-
- if(unlikely(state->output.read_pos > state->output.write_pos))
- fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");
-
- return bytes_to_return;
-}
-
-// ----------------------------------------------------------------------------
-
-#endif // NETDATA_RRDPUSH_COMPRESSION_H 1