From be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Fri, 19 Apr 2024 04:57:58 +0200 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- libnetdata/gorilla/README.md | 39 +++ libnetdata/gorilla/benchmark.sh | 14 ++ libnetdata/gorilla/fuzzer.sh | 14 ++ libnetdata/gorilla/gorilla.cc | 522 ++++++++++++++++++++++++++++++++++++++++ libnetdata/gorilla/gorilla.h | 77 ++++++ 5 files changed, 666 insertions(+) create mode 100644 libnetdata/gorilla/README.md create mode 100755 libnetdata/gorilla/benchmark.sh create mode 100755 libnetdata/gorilla/fuzzer.sh create mode 100644 libnetdata/gorilla/gorilla.cc create mode 100644 libnetdata/gorilla/gorilla.h (limited to 'libnetdata/gorilla') diff --git a/libnetdata/gorilla/README.md b/libnetdata/gorilla/README.md new file mode 100644 index 00000000..dc3718d1 --- /dev/null +++ b/libnetdata/gorilla/README.md @@ -0,0 +1,39 @@ +# Gorilla compression and decompression + +This provides an alternative way of representing values stored in database +pages. Instead of allocating and using a page of fixed size, ie. 4096 bytes, +the Gorilla implementation adds support for dynamically sized pages that +contain a variable number of Gorilla buffers. + +Each buffer takes 512 bytes and compresses incoming data using the Gorilla +compression: + +- The very first value is stored as it is. +- For each new value, Gorilla compression doesn't store the value itself. Instead, +it computes the difference (XOR) between the new value and the previous value. +- If the XOR result is zero (meaning the new value is identical to the previous +value), we store just a single bit set to `1`. +- If the XOR result is not zero (meaning the new value differs from the previous): + - We store a `0` bit to indicate the change. + - We compute the leading-zero count (LZC) of the XOR result, and compare it + with the previous LZC. If the two LZCs are equal we store a `1` bit. + - If the LZCs are different we use 5 bits to store the new LZC, and we store + the rest of the value (ie. without its LZC) in the buffer. + +A Gorilla page can have multiple Gorilla buffers. If the values of a metric +are highly compressible, just one Gorilla buffer is able to store all the values +that otherwise would require a regular 4096 byte page, ie. we can use just 512 +bytes instead. In the worst case scenario (for metrics whose values are not +compressible at all), a Gorilla page might end up having `9` Gorilla buffers, +consuming 4608 bytes. In practice, this is pretty rare and does not negate +the effect of compression for the metrics. + +When a gorilla page is full, ie. it contains 1024 slots/values, we serialize +the linked-list of gorilla buffers directly to disk. During deserialization, +eg. when performing a DBEngine query, the Gorilla page is loaded from the disk and +its linked-list entries are patched to point to the new memory allocated for +serving the query results. + +Overall, on a real-agent the Gorilla compression scheme reduces memory +consumption approximately by ~30%, which can be several GiB of RAM for parents +having hundreds, or even thousands of children streaming to them. diff --git a/libnetdata/gorilla/benchmark.sh b/libnetdata/gorilla/benchmark.sh new file mode 100755 index 00000000..a5d11143 --- /dev/null +++ b/libnetdata/gorilla/benchmark.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +set -exu -o pipefail + +clang++ \ + -std=c++11 -Wall -Wextra \ + -DENABLE_BENCHMARK -O2 -g \ + -lbenchmark -lbenchmark_main \ + -o gorilla_benchmark gorilla.cc + +./gorilla_benchmark diff --git a/libnetdata/gorilla/fuzzer.sh b/libnetdata/gorilla/fuzzer.sh new file mode 100755 index 00000000..19098a61 --- /dev/null +++ b/libnetdata/gorilla/fuzzer.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +set -exu -o pipefail + +clang++ \ + -std=c++11 -Wall -Wextra \ + -DENABLE_FUZZER -O2 -g \ + -fsanitize=fuzzer \ + -o gorilla_fuzzer gorilla.cc + +./gorilla_fuzzer -workers=12 -jobs=16 diff --git a/libnetdata/gorilla/gorilla.cc b/libnetdata/gorilla/gorilla.cc new file mode 100644 index 00000000..e6138ce3 --- /dev/null +++ b/libnetdata/gorilla/gorilla.cc @@ -0,0 +1,522 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "gorilla.h" + +#include +#include +#include +#include + +using std::size_t; + +template +static constexpr size_t bit_size() noexcept +{ + static_assert((sizeof(T) * CHAR_BIT) == 32 || (sizeof(T) * CHAR_BIT) == 64, + "Word size should be 32 or 64 bits."); + return (sizeof(T) * CHAR_BIT); +} + +static void bit_buffer_write(uint32_t *buf, size_t pos, uint32_t v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size()); + + const size_t index = pos / bit_size(); + const size_t offset = pos % bit_size(); + + pos += nbits; + + if (offset == 0) { + buf[index] = v; + } else { + const size_t remaining_bits = bit_size() - offset; + + // write the lower part of the value + const uint32_t low_bits_mask = ((uint32_t) 1 << remaining_bits) - 1; + const uint32_t lowest_bits_in_value = v & low_bits_mask; + buf[index] |= (lowest_bits_in_value << offset); + + if (nbits > remaining_bits) { + // write the upper part of the value + const uint32_t high_bits_mask = ~low_bits_mask; + const uint32_t highest_bits_in_value = (v & high_bits_mask) >> (remaining_bits); + buf[index + 1] = highest_bits_in_value; + } + } +} + +static void bit_buffer_read(const uint32_t *buf, size_t pos, uint32_t *v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size()); + + const size_t index = pos / bit_size(); + const size_t offset = pos % bit_size(); + + pos += nbits; + + if (offset == 0) { + *v = (nbits == bit_size()) ? + buf[index] : + buf[index] & (((uint32_t) 1 << nbits) - 1); + } else { + const size_t remaining_bits = bit_size() - offset; + + // extract the lower part of the value + if (nbits < remaining_bits) { + *v = (buf[index] >> offset) & (((uint32_t) 1 << nbits) - 1); + } else { + *v = (buf[index] >> offset) & (((uint32_t) 1 << remaining_bits) - 1); + nbits -= remaining_bits; + *v |= (buf[index + 1] & (((uint32_t) 1 << nbits) - 1)) << remaining_bits; + } + } +} + +gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n) +{ + gorilla_writer_t gw = gorilla_writer_t { + .head_buffer = gbuf, + .last_buffer = NULL, + .prev_number = 0, + .prev_xor_lzc = 0, + .capacity = 0 + }; + + gorilla_writer_add_buffer(&gw, gbuf, n); + return gw; +} + +void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n) +{ + gbuf->header.next = NULL; + gbuf->header.entries = 0; + gbuf->header.nbits = 0; + + uint32_t capacity = (n * bit_size()) - (sizeof(gorilla_header_t) * CHAR_BIT); + + gw->prev_number = 0; + gw->prev_xor_lzc = 0; + gw->capacity = capacity; + + if (gw->last_buffer) + gw->last_buffer->header.next = gbuf; + + __atomic_store_n(&gw->last_buffer, gbuf, __ATOMIC_RELAXED); +} + +uint32_t gorilla_writer_entries(const gorilla_writer_t *gw) { + uint32_t entries = 0; + + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); + + entries += __atomic_load_n(&curr_gbuf->header.entries, __ATOMIC_SEQ_CST); + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return entries; +} + +bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number) +{ + gorilla_header_t *hdr = &gw->last_buffer->header; + uint32_t *data = gw->last_buffer->data; + + // this is the first number we are writing + if (hdr->entries == 0) { + if (hdr->nbits + bit_size() >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, number, bit_size()); + + __atomic_fetch_add(&hdr->nbits, bit_size(), __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + gw->prev_number = number; + return true; + } + + // write true/false based on whether we got the same number or not. + if (number == gw->prev_number) { + if (hdr->nbits + 1 >= gw->capacity) + return false; + + bit_buffer_write(data, hdr->nbits, static_cast(1), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + return true; + } + + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, static_cast(0), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + + uint32_t xor_value = gw->prev_number ^ number; + uint32_t xor_lzc = (bit_size() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value); + uint32_t is_xor_lzc_same = (xor_lzc == gw->prev_xor_lzc) ? 1 : 0; + + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, is_xor_lzc_same, 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + + if (!is_xor_lzc_same) { + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, xor_lzc, (bit_size() == 32) ? 5 : 6); + __atomic_fetch_add(&hdr->nbits, (bit_size() == 32) ? 5 : 6, __ATOMIC_RELAXED); + } + + // write the bits of the XOR'd value without the LZC prefix + if (hdr->nbits + (bit_size() - xor_lzc) >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, xor_value, bit_size() - xor_lzc); + __atomic_fetch_add(&hdr->nbits, bit_size() - xor_lzc, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + + gw->prev_number = number; + gw->prev_xor_lzc = xor_lzc; + return true; +} + +gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw) { + if (!gw->head_buffer) + return NULL; + + gorilla_buffer_t *curr_head = gw->head_buffer; + gorilla_buffer_t *next_head = gw->head_buffer->header.next; + __atomic_store_n(&gw->head_buffer, next_head, __ATOMIC_RELAXED); + return curr_head; +} + +uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw) +{ + uint32_t nbits = 0; + + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); + + nbits += __atomic_load_n(&curr_gbuf->header.nbits, __ATOMIC_SEQ_CST); + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return (nbits + (CHAR_BIT - 1)) / CHAR_BIT; +} + +bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size) { + const gorilla_buffer_t *curr_gbuf = gw->head_buffer; + + do { + const gorilla_buffer_t *next_gbuf = curr_gbuf->header.next; + + size_t bytes = GORILLA_BUFFER_SIZE; + if (bytes > dst_size) + return false; + + memcpy(dst, curr_gbuf, bytes); + dst += bytes; + dst_size -= bytes; + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return true; +} + +uint32_t gorilla_buffer_patch(gorilla_buffer_t *gbuf) { + gorilla_buffer_t *curr_gbuf = gbuf; + uint32_t n = curr_gbuf->header.entries; + + while (curr_gbuf->header.next) { + uint32_t *buf = reinterpret_cast(gbuf); + gbuf = reinterpret_cast(&buf[GORILLA_BUFFER_SLOTS]); + + assert(((uintptr_t) (gbuf) % sizeof(uintptr_t)) == 0 && + "Gorilla buffer not aligned to uintptr_t"); + + curr_gbuf->header.next = gbuf; + curr_gbuf = curr_gbuf->header.next; + + n += curr_gbuf->header.entries; + } + + return n; +} + +gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw) +{ + const gorilla_buffer_t *buffer = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + + uint32_t entries = __atomic_load_n(&buffer->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&buffer->header.nbits, __ATOMIC_SEQ_CST); + + return gorilla_reader_t { + .buffer = buffer, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; +} + +gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *gbuf) +{ + uint32_t entries = __atomic_load_n(&gbuf->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&gbuf->header.nbits, __ATOMIC_SEQ_CST); + + return gorilla_reader_t { + .buffer = gbuf, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; +} + +bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number) +{ + const uint32_t *data = gr->buffer->data; + + if (gr->index + 1 > gr->entries) { + // We don't have any more entries to return. However, the writer + // might have updated the buffer's entries. We need to check once + // more in case more elements were added. + gr->entries = __atomic_load_n(&gr->buffer->header.entries, __ATOMIC_SEQ_CST); + gr->capacity = __atomic_load_n(&gr->buffer->header.nbits, __ATOMIC_SEQ_CST); + + // if the reader's current buffer has not been updated, we need to + // check if it has a pointer to a next buffer. + if (gr->index + 1 > gr->entries) { + gorilla_buffer_t *next_buffer = __atomic_load_n(&gr->buffer->header.next, __ATOMIC_SEQ_CST); + + if (!next_buffer) { + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n (No more buffers to read from)", gr->length, gr->buffer); + return false; + } + + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n", gr->length, gr->buffer); + *gr = gorilla_reader_init(next_buffer); + return gorilla_reader_read(gr, number); + } + } + + // read the first number + if (gr->index == 0) { + bit_buffer_read(data, gr->position, number, bit_size()); + + gr->index++; + gr->position += bit_size(); + gr->prev_number = *number; + return true; + } + + // process same-number bit + uint32_t is_same_number; + bit_buffer_read(data, gr->position, &is_same_number, 1); + gr->position++; + + if (is_same_number) { + *number = gr->prev_number; + gr->index++; + return true; + } + + // proceess same-xor-lzc bit + uint32_t xor_lzc = gr->prev_xor_lzc; + + uint32_t same_xor_lzc; + bit_buffer_read(data, gr->position, &same_xor_lzc, 1); + gr->position++; + + if (!same_xor_lzc) { + bit_buffer_read(data, gr->position, &xor_lzc, (bit_size() == 32) ? 5 : 6); + gr->position += (bit_size() == 32) ? 5 : 6; + } + + // process the non-lzc suffix + uint32_t xor_value = 0; + bit_buffer_read(data, gr->position, &xor_value, bit_size() - xor_lzc); + gr->position += bit_size() - xor_lzc; + + *number = (gr->prev_number ^ xor_value); + + gr->index++; + gr->prev_number = *number; + gr->prev_xor_lzc = xor_lzc; + gr->prev_xor = xor_value; + + return true; +} + +/* + * Internal code used for fuzzing the library +*/ + +#ifdef ENABLE_FUZZER + +#include + +template +static std::vector random_vector(const uint8_t *data, size_t size) { + std::vector V; + + V.reserve(1024); + + while (size >= sizeof(Word)) { + size -= sizeof(Word); + + Word w; + memcpy(&w, &data[size], sizeof(Word)); + V.push_back(w); + } + + return V; +} + +class Storage { +public: + gorilla_buffer_t *alloc_buffer(size_t words) { + uint32_t *new_buffer = new uint32_t[words](); + assert(((((uintptr_t) new_buffer) % 8u) == 0) && "Unaligned buffer..."); + Buffers.push_back(new_buffer); + return reinterpret_cast(new_buffer); + } + + void free_buffers() { + for (uint32_t *buffer : Buffers) { + delete[] buffer; + } + } + +private: + std::vector Buffers; +}; + +extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) { + if (Size < 4) + return 0; + + std::vector RandomData = random_vector(Data, Size); + + Storage S; + size_t words_per_buffer = 8; + + /* + * write data + */ + gorilla_buffer_t *first_buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_t gw = gorilla_writer_init(first_buffer, words_per_buffer); + + for (size_t i = 0; i != RandomData.size(); i++) { + bool ok = gorilla_writer_write(&gw, RandomData[i]); + if (ok) + continue; + + // add new buffer + gorilla_buffer_t *buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_add_buffer(&gw, buffer, words_per_buffer); + + ok = gorilla_writer_write(&gw, RandomData[i]); + assert(ok && "Could not write data to new buffer!!!"); + } + + + /* + * read data + */ + gorilla_reader_t gr = gorilla_writer_get_reader(&gw); + + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + bool ok = gorilla_reader_read(&gr, &number); + assert(ok && "Failed to read number from gorilla buffer"); + + assert((number == RandomData[i]) + && "Read wrong number from gorilla buffer"); + } + + S.free_buffers(); + return 0; +} + +#endif /* ENABLE_FUZZER */ + +#ifdef ENABLE_BENCHMARK + +#include +#include + +static size_t NumItems = 1024; + +static void BM_EncodeU32Numbers(benchmark::State& state) { + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0x0, 0x0000FFFF); + + std::vector RandomData; + for (size_t idx = 0; idx != NumItems; idx++) { + RandomData.push_back(dist(mt)); + } + std::vector EncodedData(10 * RandomData.capacity(), 0); + + for (auto _ : state) { + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast(EncodedData.data()), + EncodedData.size()); + + for (size_t i = 0; i != RandomData.size(); i++) + benchmark::DoNotOptimize(gorilla_writer_write(&gw, RandomData[i])); + + benchmark::ClobberMemory(); + } + + state.SetItemsProcessed(NumItems * state.iterations()); + state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); +} +BENCHMARK(BM_EncodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); + +static void BM_DecodeU32Numbers(benchmark::State& state) { + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0x0, 0xFFFFFFFF); + + std::vector RandomData; + for (size_t idx = 0; idx != NumItems; idx++) { + RandomData.push_back(dist(mt)); + } + std::vector EncodedData(10 * RandomData.capacity(), 0); + std::vector DecodedData(10 * RandomData.capacity(), 0); + + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast(EncodedData.data()), + EncodedData.size()); + + for (size_t i = 0; i != RandomData.size(); i++) + gorilla_writer_write(&gw, RandomData[i]); + + for (auto _ : state) { + gorilla_reader_t gr = gorilla_reader_init(reinterpret_cast(EncodedData.data())); + + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + benchmark::DoNotOptimize(gorilla_reader_read(&gr, &number)); + } + + benchmark::ClobberMemory(); + } + + state.SetItemsProcessed(NumItems * state.iterations()); + state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); +} +BENCHMARK(BM_DecodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); + +#endif /* ENABLE_BENCHMARK */ diff --git a/libnetdata/gorilla/gorilla.h b/libnetdata/gorilla/gorilla.h new file mode 100644 index 00000000..d57c07cf --- /dev/null +++ b/libnetdata/gorilla/gorilla.h @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef GORILLA_H +#define GORILLA_H + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct gorilla_buffer; + +typedef struct { + struct gorilla_buffer *next; + uint32_t entries; + uint32_t nbits; +} gorilla_header_t; + +typedef struct gorilla_buffer { + gorilla_header_t header; + uint32_t data[]; +} gorilla_buffer_t; + +typedef struct { + gorilla_buffer_t *head_buffer; + gorilla_buffer_t *last_buffer; + + uint32_t prev_number; + uint32_t prev_xor_lzc; + + // in bits + uint32_t capacity; +} gorilla_writer_t; + +typedef struct { + const gorilla_buffer_t *buffer; + + // number of values + size_t entries; + size_t index; + + // in bits + size_t capacity; // FIXME: this not needed on the reader's side + size_t position; + + uint32_t prev_number; + uint32_t prev_xor_lzc; + uint32_t prev_xor; +} gorilla_reader_t; + +gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n); +void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n); +bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number); +uint32_t gorilla_writer_entries(const gorilla_writer_t *gw); + +gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw); + +gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw); + +uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw); +bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size); + +uint32_t gorilla_buffer_patch(gorilla_buffer_t *buf); +gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *buf); +bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number); + +#define GORILLA_BUFFER_SLOTS 128 +#define GORILLA_BUFFER_SIZE (GORILLA_BUFFER_SLOTS * sizeof(uint32_t)) + +#ifdef __cplusplus +} +#endif + +#endif /* GORILLA_H */ -- cgit v1.2.3