summaryrefslogtreecommitdiffstats
path: root/src/database/engine/dbengine-compression.c
blob: 46ef2b075f21bde27d7c55c86eec073969f6e51d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// SPDX-License-Identifier: GPL-3.0-or-later

#include "rrdengine.h"
#include "dbengine-compression.h"

#ifdef ENABLE_LZ4
#include <lz4.h>
#endif

#ifdef ENABLE_ZSTD
#include <zstd.h>
#define DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL 3
#endif

// Pick the compression algorithm used by default.
// Builds with ENABLE_LZ4 get RRDENG_COMPRESSION_LZ4; every other build
// gets RRDENG_COMPRESSION_NONE. ZSTD is never chosen here, even when
// ENABLE_ZSTD is defined.
uint8_t dbengine_default_compression(void) {
    uint8_t chosen = RRDENG_COMPRESSION_NONE;

#ifdef ENABLE_LZ4
    chosen = RRDENG_COMPRESSION_LZ4;
#endif

    return chosen;
}

// Tell whether `algorithm` is usable in this build.
// RRDENG_COMPRESSION_NONE is always accepted; LZ4 and ZSTD are accepted
// only when the matching ENABLE_LZ4 / ENABLE_ZSTD macro is defined at
// compile time. Any other value yields false.
bool dbengine_valid_compression_algorithm(uint8_t algorithm) {
    if(algorithm == RRDENG_COMPRESSION_NONE)
        return true;

#ifdef ENABLE_LZ4
    if(algorithm == RRDENG_COMPRESSION_LZ4)
        return true;
#endif

#ifdef ENABLE_ZSTD
    if(algorithm == RRDENG_COMPRESSION_ZSTD)
        return true;
#endif

    return false;
}

// Upper bound on the number of bytes `algorithm` may produce when
// compressing `uncompressed_size` bytes.
//
//  - NONE: the input size itself.
//  - LZ4 : LZ4_compressBound(); the input must be smaller than
//          LZ4_MAX_INPUT_SIZE, enforced with fatal_assert().
//  - ZSTD: ZSTD_compressBound().
//
// An algorithm that is unknown, or not compiled in, ends in fatal().
// There is no return statement after that call, so this function relies
// on fatal() not returning.
size_t dbengine_max_compressed_size(size_t uncompressed_size, uint8_t algorithm) {
    if(algorithm == RRDENG_COMPRESSION_NONE)
        return uncompressed_size;

#ifdef ENABLE_LZ4
    if(algorithm == RRDENG_COMPRESSION_LZ4) {
        fatal_assert(uncompressed_size < LZ4_MAX_INPUT_SIZE);
        return LZ4_compressBound((int)uncompressed_size);
    }
#endif

#ifdef ENABLE_ZSTD
    if(algorithm == RRDENG_COMPRESSION_ZSTD)
        return ZSTD_compressBound(uncompressed_size);
#endif

    fatal("DBENGINE: unknown compression algorithm %u", algorithm);
}

// Compress `payload` in place with `algorithm`.
//
// The data is first compressed into a scratch extent buffer sized by
// dbengine_max_compressed_size(). It is copied back over `payload` only
// when the compressor succeeded AND the output is strictly smaller than
// the input.
//
// Returns the compressed size, or 0 when the payload was left untouched
// (algorithm is NONE, the compressor failed, or nothing was gained).
//
// As noted by the original author, the caller is expected to have used
// dbengine_max_compressed_size() to make sure the payload is big enough.
//
// An algorithm that is unknown, or not compiled in, ends in fatal().
size_t dbengine_compress(void *payload, size_t uncompressed_size, uint8_t algorithm) {
    switch(algorithm) {
        case RRDENG_COMPRESSION_NONE:
            return 0;

#ifdef ENABLE_LZ4
        case RRDENG_COMPRESSION_LZ4: {
            size_t bound = dbengine_max_compressed_size(uncompressed_size, algorithm);
            struct extent_buffer *scratch = extent_buffer_get(bound);
            void *out = scratch->data;

            size_t written = LZ4_compress_default(payload, out, (int)uncompressed_size, (int)bound);

            // keep the result only if it is a real, smaller encoding
            if(written == 0 || written >= uncompressed_size)
                written = 0;
            else
                memcpy(payload, out, written);

            extent_buffer_release(scratch);
            return written;
        }
#endif

#ifdef ENABLE_ZSTD
        case RRDENG_COMPRESSION_ZSTD: {
            size_t bound = dbengine_max_compressed_size(uncompressed_size, algorithm);
            struct extent_buffer *scratch = extent_buffer_get(bound);
            void *out = scratch->data;

            size_t written = ZSTD_compress(out, bound, payload, uncompressed_size,
                                           DBENGINE_ZSTD_DEFAULT_COMPRESSION_LEVEL);

            // ZSTD reports failures as special size_t values; treat them as "not compressed"
            if(ZSTD_isError(written)) {
                internal_fatal(true, "DBENGINE: ZSTD compression error %s", ZSTD_getErrorName(written));
                written = 0;
            }

            // keep the result only if it is a real, smaller encoding
            if(written == 0 || written >= uncompressed_size)
                written = 0;
            else
                memcpy(payload, out, written);

            extent_buffer_release(scratch);
            return written;
        }
#endif

        default:
            fatal("DBENGINE: unknown compression algorithm %u", algorithm);
    }
}

// Decompress `src_size` bytes from `src` into `dst`, which can hold up to
// `dst_size` bytes, using `algorithm`.
//
// Returns the number of bytes written to `dst`, or 0 on any failure:
//  - the LZ4 / ZSTD decoder reported an error (logged via nd_log()),
//  - the algorithm is RRDENG_COMPRESSION_NONE (callers should not ask to
//    decompress uncompressed pages),
//  - the algorithm is unknown or not compiled into this build.
// The last two cases also go through internal_fatal() before returning 0.
size_t dbengine_decompress(void *dst, void *src, size_t dst_size, size_t src_size, uint8_t algorithm) {
    switch(algorithm) {

#ifdef ENABLE_LZ4
        case RRDENG_COMPRESSION_LZ4: {
            // LZ4_decompress_safe() returns a negative value on malformed input
            int rc = LZ4_decompress_safe(src, dst, (int)src_size, (int)dst_size);
            if(rc < 0) {
                // name the codec that actually failed (this used to say ZSTD)
                nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: LZ4 decompression error %d", rc);
                rc = 0;
            }

            // rc is non-negative here, so the conversion is lossless
            return (size_t)rc;
        }
#endif

#ifdef ENABLE_ZSTD
        case RRDENG_COMPRESSION_ZSTD: {
            size_t decompressed_size = ZSTD_decompress(dst, dst_size, src, src_size);

            // ZSTD reports failures as special size_t values
            if (ZSTD_isError(decompressed_size)) {
                nd_log(NDLS_DAEMON, NDLP_ERR, "DBENGINE: ZSTD decompression error %s",
                       ZSTD_getErrorName(decompressed_size));

                decompressed_size = 0;
            }

            return decompressed_size;
        }
#endif

        case RRDENG_COMPRESSION_NONE:
            internal_fatal(true, "DBENGINE: %s() should not be called for uncompressed pages", __FUNCTION__ );
            return 0;

        default:
            internal_fatal(true, "DBENGINE: unknown compression algorithm %u", algorithm);
            return 0;
    }
}