summaryrefslogtreecommitdiffstats
path: root/streaming/compression.h
blob: a67f65b83a7cffd3aeffb6f2da6a76d6dc23269c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// SPDX-License-Identifier: GPL-3.0-or-later

#include "rrdpush.h"

#ifndef NETDATA_RRDPUSH_COMPRESSION_H
#define NETDATA_RRDPUSH_COMPRESSION_H 1

// signature MUST end with a newline

// Build-time guard: compilation fails unless COMPRESSION_MAX_MSG_SIZE is strictly
// smaller than COMPRESSION_MAX_CHUNK minus COMPRESSION_MAX_OVERHEAD.
// None of the three constants is defined in this header (presumably they come from rrdpush.h).
#if COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)
#error "COMPRESSION_MAX_MSG_SIZE >= (COMPRESSION_MAX_CHUNK - COMPRESSION_MAX_OVERHEAD)"
#endif

// The signature is a 32-bit value laid out as follows (bit 0 = least significant):
//
//   bits  0..7  : 'z' | 0x80
//   bits  8..15 : 0x80 | <7 length bits>   (filled in by rrdpush_compress_encode_signature())
//   bits 16..23 : 0x80 | <7 length bits>   (filled in by rrdpush_compress_encode_signature())
//   bits 24..31 : '\n'
//
// RRDPUSH_COMPRESSION_SIGNATURE is that value with all length bits cleared.
// RRDPUSH_COMPRESSION_SIGNATURE_MASK selects every bit that is NOT a length bit, so that
// (value & MASK) == SIGNATURE identifies a signature regardless of the encoded length.
//
// Every shift is performed on rrdpush_signature_t (unsigned): shifting the plain int 0xff
// left by 24 would move a bit into the sign position, which is undefined behaviour in C.
typedef uint32_t rrdpush_signature_t;
#define RRDPUSH_COMPRESSION_SIGNATURE ((rrdpush_signature_t)('z' | 0x80) | ((rrdpush_signature_t)0x80 << 8) | ((rrdpush_signature_t)0x80 << 16) | ((rrdpush_signature_t)'\n' << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_MASK ((rrdpush_signature_t)0xff | ((rrdpush_signature_t)0x80 << 8) | ((rrdpush_signature_t)0x80 << 16) | ((rrdpush_signature_t)0xff << 24))
#define RRDPUSH_COMPRESSION_SIGNATURE_SIZE sizeof(rrdpush_signature_t)

// Builds the signature value for a compressed chunk of compressed_data_size bytes.
//
// Only the 14 least significant bits of the size are encoded: bits 0..6 go into the
// second byte of the signature value and bits 7..13 into the third byte, each with its
// top bit set. Any higher bits of compressed_data_size are dropped.
static inline rrdpush_signature_t rrdpush_compress_encode_signature(size_t compressed_data_size) {
    // bits 0..6 of the size, with the marker bit (0x80) on top
    size_t low_part = (compressed_data_size & 0x7f) | 0x80;

    // bits 7..13 of the size, moved up by one to step over the marker bit of the
    // low part, with their own marker bit (0x8000) on top
    size_t high_part = ((compressed_data_size & (0x7f << 7)) << 1) | 0x8000;

    // place both parts above the first byte of the signature
    rrdpush_signature_t encoded = (low_part | high_part) << 8;

    return RRDPUSH_COMPRESSION_SIGNATURE | encoded;
}

// Identifiers of the compression algorithms.
// The enumerators are numbered sequentially starting at 0, so COMPRESSION_ALGORITHM_MAX
// equals the number of enumerators that precede it (COMPRESSION_ALGORITHM_NONE included).
typedef enum {
    COMPRESSION_ALGORITHM_NONE  = 0,
    COMPRESSION_ALGORITHM_ZSTD,
    COMPRESSION_ALGORITHM_LZ4,
    COMPRESSION_ALGORITHM_GZIP,
    COMPRESSION_ALGORITHM_BROTLI,

    // terminator - not an algorithm; it is the array size of rrdpush_compression_levels[]
    COMPRESSION_ALGORITHM_MAX,
} compression_algorithm_t;

// One int per compression_algorithm_t value; defined outside this header.
// NOTE(review): presumably the configured compression level of each algorithm,
// indexed by compression_algorithm_t - confirm at the definition.
extern int rrdpush_compression_levels[COMPRESSION_ALGORITHM_MAX];

// this defines the order the algorithms will be selected by the receiver (parent)
// (note: this order is not the same as the enumerator order of compression_algorithm_t)
#define RRDPUSH_COMPRESSION_ALGORITHMS_ORDER "zstd lz4 brotli gzip"

// ----------------------------------------------------------------------------

// A growable byte buffer with independent read and write offsets.
// The helpers in this header only ever append at write_pos and grow the allocation;
// no wrap-around is implemented here, despite the name.
typedef struct simple_ring_buffer {
    const char *data;   // storage, (re)allocated by simple_ring_buffer_make_room();
                        // declared const, the helpers cast the const away to write to it
    size_t size;        // number of bytes currently allocated for data
    size_t read_pos;    // offset in data of the next byte to be read
    size_t write_pos;   // offset in data where the next append will be written
} SIMPLE_RING_BUFFER;

// Marks the buffer as empty by rewinding both offsets to the start.
// The allocation (data, size) is left untouched so it can be reused.
static inline void simple_ring_buffer_reset(SIMPLE_RING_BUFFER *b) {
    b->write_pos = 0;
    b->read_pos = 0;
}

// Ensures that `size` more bytes can be written at write_pos, growing the allocation
// when they do not fit.
//
// Growth policy: an unallocated buffer starts at COMPRESSION_MAX_CHUNK bytes, an
// allocated one doubles; if that is still too small, `size` extra bytes are added on top.
static inline void simple_ring_buffer_make_room(SIMPLE_RING_BUFFER *b, size_t size) {
    const size_t required = b->write_pos + size;

    // enough space already - nothing to do
    if(required <= b->size)
        return;

    size_t new_size = b->size ? b->size * 2 : COMPRESSION_MAX_CHUNK;

    if(required > new_size)
        new_size += size;

    b->size = new_size;
    b->data = (const char *)reallocz((void *)b->data, b->size);
}

// Appends `size` bytes from `data` at the current write offset, growing the buffer
// first if needed, and advances the write offset past the copied bytes.
static inline void simple_ring_buffer_append_data(SIMPLE_RING_BUFFER *b, const void *data, size_t size) {
    simple_ring_buffer_make_room(b, size);

    // the storage is declared const in the struct; drop the qualifier to write into it
    char *destination = (char *)b->data + b->write_pos;
    memcpy(destination, data, size);

    b->write_pos += size;
}

// Releases the storage of the buffer and returns every field to its empty state,
// so the structure can be reused or destroyed again safely.
static inline void simple_ring_buffer_destroy(SIMPLE_RING_BUFFER *b) {
    freez((void *)b->data);

    b->data = NULL;
    b->size = 0;
    b->write_pos = 0;
    b->read_pos = 0;
}

// ----------------------------------------------------------------------------

// State kept by the compressing side. The functions operating on it
// (rrdpush_compressor_init/destroy, rrdpush_compress) are implemented outside this header,
// so the notes below marked "presumably" are to be confirmed against that implementation.
struct compressor_state {
    bool initialized;                   // presumably true once rrdpush_compressor_init() has run
    compression_algorithm_t algorithm;  // the algorithm this state is used with

    SIMPLE_RING_BUFFER input;           // presumably staging for the uncompressed input
    SIMPLE_RING_BUFFER output;          // presumably holds the compressed output

    int level;                          // presumably the compression level for `algorithm`
    void *stream;                       // opaque pointer; presumably the algorithm-specific context

    // counters; NOTE(review): the name suggests they are accessed while holding the
    // sender lock - confirm with the callers
    struct {
        size_t total_compressed;
        size_t total_uncompressed;
        size_t total_compressions;
    } sender_locked;
};

// Compressor API - implemented outside this header.
// NOTE(review): the contract of rrdpush_compress() (meaning of the returned size_t and
// the lifetime of the pointer stored in *out) is not visible here - confirm in the implementation.
void rrdpush_compressor_init(struct compressor_state *state);
void rrdpush_compressor_destroy(struct compressor_state *state);
size_t rrdpush_compress(struct compressor_state *state, const char *data, size_t size, const char **out);

// ----------------------------------------------------------------------------

// State kept by the decompressing side.
struct decompressor_state {
    bool initialized;                   // presumably true once rrdpush_decompressor_init() has run - confirm
    compression_algorithm_t algorithm;  // the algorithm this state is used with
    size_t signature_size;              // NOTE(review): not used in this header; presumably
                                        // RRDPUSH_COMPRESSION_SIGNATURE_SIZE - confirm

    // counters; not updated by anything in this header
    size_t total_compressed;
    size_t total_uncompressed;
    size_t total_compressions;

    // decompressed data; consumed through rrdpush_decompressor_get(), which copies from
    // output.read_pos and advances it towards output.write_pos
    SIMPLE_RING_BUFFER output;

    void *stream;                       // opaque pointer; presumably the algorithm-specific context
};

// Decompressor API - implemented outside this header.
// NOTE(review): presumably rrdpush_decompress() leaves its result in state->output and
// returns the number of decompressed bytes - confirm in the implementation.
void rrdpush_decompressor_destroy(struct decompressor_state *state);
void rrdpush_decompressor_init(struct decompressor_state *state);
size_t rrdpush_decompress(struct decompressor_state *state, const char *compressed_data, size_t compressed_size);

// Validates a signature and extracts the length encoded in it.
//
// data      - pointer to the signature bytes; may be NULL and needs no particular alignment
// data_size - number of bytes at `data`; must be exactly RRDPUSH_COMPRESSION_SIGNATURE_SIZE
//
// Returns the 14-bit length stored in the signature, or 0 when `data` is NULL, data_size
// is not the signature size, or the fixed bits do not match RRDPUSH_COMPRESSION_SIGNATURE.
// A valid signature that encodes length 0 is therefore indistinguishable from an invalid one.
static inline size_t rrdpush_decompress_decode_signature(const char *data, size_t data_size) {
    if (unlikely(!data || !data_size))
        return 0;

    if (unlikely(data_size != RRDPUSH_COMPRESSION_SIGNATURE_SIZE))
        return 0;

    // Copy the bytes instead of dereferencing a casted pointer: `data` is a char buffer
    // that may be unaligned for a 32-bit load, and reading it through an
    // rrdpush_signature_t lvalue would violate the C aliasing rules.
    rrdpush_signature_t sign;
    memcpy(&sign, data, sizeof(sign));

    if (unlikely((sign & RRDPUSH_COMPRESSION_SIGNATURE_MASK) != RRDPUSH_COMPRESSION_SIGNATURE))
        return 0;

    // undo rrdpush_compress_encode_signature(): bits 0..6 of the length sit just above the
    // first byte, bits 7..13 sit one position higher (above the marker bit of the low part)
    size_t length = ((sign >> 8) & 0x7f) | ((sign >> 9) & (0x7f << 7));
    return length;
}

// Begins the processing of a new compressed chunk.
//
// Aborts via fatal() when the output buffer still holds decompressed data that has not
// been read. Otherwise decodes `header` and returns the length it carries
// (0 when the header is not a valid signature).
static inline size_t rrdpush_decompressor_start(struct decompressor_state *state, const char *header, size_t header_size) {
    const bool has_unread_data = (state->output.read_pos != state->output.write_pos);

    if(unlikely(has_unread_data))
        fatal("RRDPUSH DECOMPRESS: asked to decompress new data, while there are unread data in the decompression buffer!");

    size_t encoded_length = rrdpush_decompress_decode_signature(header, header_size);
    return encoded_length;
}

// Returns how many decompressed bytes are still waiting to be read from the output
// buffer. Aborts via fatal() when the read offset has moved past the write offset.
static inline size_t rrdpush_decompressed_bytes_in_buffer(struct decompressor_state *state) {
    const size_t consumed = state->output.read_pos;
    const size_t produced = state->output.write_pos;

    if(unlikely(consumed > produced))
        fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");

    return produced - consumed;
}

// Copies decompressed bytes out of the output buffer.
//
// state - the decompressor to read from
// dst   - destination with room for at least `size` bytes
// size  - maximum number of bytes to copy
//
// Returns the number of bytes copied, which is the smaller of `size` and the bytes
// available; 0 when nothing is available or when state/dst is NULL or size is 0.
// The read offset of the output buffer is advanced by the returned amount.
static inline size_t rrdpush_decompressor_get(struct decompressor_state *state, char *dst, size_t size) {
    if (unlikely(!state || !size || !dst))
        return 0;

    size_t available = rrdpush_decompressed_bytes_in_buffer(state);
    if(unlikely(!available))
        return 0;

    // never hand out more than what is buffered
    size_t count = (size < available) ? size : available;

    const char *source = state->output.data + state->output.read_pos;
    memcpy(dst, source, count);
    state->output.read_pos += count;

    // sanity check of the offsets after the advance
    if(unlikely(state->output.read_pos > state->output.write_pos))
        fatal("RRDPUSH DECOMPRESS: invalid read/write stream positions");

    return count;
}

// ----------------------------------------------------------------------------

#endif // NETDATA_RRDPUSH_COMPRESSION_H 1