diff options
Diffstat (limited to 'src/libnetdata/gorilla/gorilla.cc')
-rw-r--r-- | src/libnetdata/gorilla/gorilla.cc | 522 |
1 files changed, 522 insertions, 0 deletions
diff --git a/src/libnetdata/gorilla/gorilla.cc b/src/libnetdata/gorilla/gorilla.cc new file mode 100644 index 000000000..c76018365 --- /dev/null +++ b/src/libnetdata/gorilla/gorilla.cc @@ -0,0 +1,522 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "gorilla.h" + +#include <cassert> +#include <climits> +#include <cstdio> +#include <cstring> + +using std::size_t; + +template <typename T> +static constexpr size_t bit_size() noexcept +{ + static_assert((sizeof(T) * CHAR_BIT) == 32 || (sizeof(T) * CHAR_BIT) == 64, + "Word size should be 32 or 64 bits."); + return (sizeof(T) * CHAR_BIT); +} + +static void bit_buffer_write(uint32_t *buf, size_t pos, uint32_t v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size<uint32_t>()); + + const size_t index = pos / bit_size<uint32_t>(); + const size_t offset = pos % bit_size<uint32_t>(); + + pos += nbits; + + if (offset == 0) { + buf[index] = v; + } else { + const size_t remaining_bits = bit_size<uint32_t>() - offset; + + // write the lower part of the value + const uint32_t low_bits_mask = ((uint32_t) 1 << remaining_bits) - 1; + const uint32_t lowest_bits_in_value = v & low_bits_mask; + buf[index] |= (lowest_bits_in_value << offset); + + if (nbits > remaining_bits) { + // write the upper part of the value + const uint32_t high_bits_mask = ~low_bits_mask; + const uint32_t highest_bits_in_value = (v & high_bits_mask) >> (remaining_bits); + buf[index + 1] = highest_bits_in_value; + } + } +} + +static void bit_buffer_read(const uint32_t *buf, size_t pos, uint32_t *v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size<uint32_t>()); + + const size_t index = pos / bit_size<uint32_t>(); + const size_t offset = pos % bit_size<uint32_t>(); + + pos += nbits; + + if (offset == 0) { + *v = (nbits == bit_size<uint32_t>()) ? + buf[index] : + buf[index] & (((uint32_t) 1 << nbits) - 1); + } else { + const size_t remaining_bits = bit_size<uint32_t>() - offset; + + // extract the lower part of the value + if (nbits < remaining_bits) { + *v = (buf[index] >> offset) & (((uint32_t) 1 << nbits) - 1); + } else { + *v = (buf[index] >> offset) & (((uint32_t) 1 << remaining_bits) - 1); + nbits -= remaining_bits; + *v |= (buf[index + 1] & (((uint32_t) 1 << nbits) - 1)) << remaining_bits; + } + } +} + +gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n) +{ + gorilla_writer_t gw = gorilla_writer_t { + .head_buffer = gbuf, + .last_buffer = NULL, + .prev_number = 0, + .prev_xor_lzc = 0, + .capacity = 0 + }; + + gorilla_writer_add_buffer(&gw, gbuf, n); + return gw; +} + +void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n) +{ + gbuf->header.next = NULL; + gbuf->header.entries = 0; + gbuf->header.nbits = 0; + + uint32_t capacity = (n * bit_size<uint32_t>()) - (sizeof(gorilla_header_t) * CHAR_BIT); + + gw->prev_number = 0; + gw->prev_xor_lzc = 0; + gw->capacity = capacity; + + if (gw->last_buffer) + gw->last_buffer->header.next = gbuf; + + __atomic_store_n(&gw->last_buffer, gbuf, __ATOMIC_RELAXED); +} + +uint32_t gorilla_writer_entries(const gorilla_writer_t *gw) { + uint32_t entries = 0; + + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); + + entries += __atomic_load_n(&curr_gbuf->header.entries, __ATOMIC_SEQ_CST); + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return entries; +} + +bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number) +{ + gorilla_header_t *hdr = &gw->last_buffer->header; + uint32_t *data = gw->last_buffer->data; + + // this is the first number we are writing + if (hdr->entries == 0) { + if (hdr->nbits + bit_size<uint32_t>() >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, number, bit_size<uint32_t>()); + + __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>(), __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + gw->prev_number = number; + return true; + } + + // write true/false based on whether we got the same number or not. + if (number == gw->prev_number) { + if (hdr->nbits + 1 >= gw->capacity) + return false; + + bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(1), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + return true; + } + + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(0), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + + uint32_t xor_value = gw->prev_number ^ number; + uint32_t xor_lzc = (bit_size<uint32_t>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value); + uint32_t is_xor_lzc_same = (xor_lzc == gw->prev_xor_lzc) ? 1 : 0; + + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, is_xor_lzc_same, 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + + if (!is_xor_lzc_same) { + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6); + __atomic_fetch_add(&hdr->nbits, (bit_size<uint32_t>() == 32) ? 5 : 6, __ATOMIC_RELAXED); + } + + // write the bits of the XOR'd value without the LZC prefix + if (hdr->nbits + (bit_size<uint32_t>() - xor_lzc) >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, xor_value, bit_size<uint32_t>() - xor_lzc); + __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>() - xor_lzc, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + + gw->prev_number = number; + gw->prev_xor_lzc = xor_lzc; + return true; +} + +gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw) { + if (!gw->head_buffer) + return NULL; + + gorilla_buffer_t *curr_head = gw->head_buffer; + gorilla_buffer_t *next_head = gw->head_buffer->header.next; + __atomic_store_n(&gw->head_buffer, next_head, __ATOMIC_RELAXED); + return curr_head; +} + +uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw) +{ + uint32_t nbits = 0; + + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); + + nbits += __atomic_load_n(&curr_gbuf->header.nbits, __ATOMIC_SEQ_CST); + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return (nbits + (CHAR_BIT - 1)) / CHAR_BIT; +} + +bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size) { + const gorilla_buffer_t *curr_gbuf = gw->head_buffer; + + do { + const gorilla_buffer_t *next_gbuf = curr_gbuf->header.next; + + size_t bytes = RRDENG_GORILLA_32BIT_BUFFER_SIZE; + if (bytes > dst_size) + return false; + + memcpy(dst, curr_gbuf, bytes); + dst += bytes; + dst_size -= bytes; + + curr_gbuf = next_gbuf; + } while (curr_gbuf); + + return true; +} + +uint32_t gorilla_buffer_patch(gorilla_buffer_t *gbuf) { + gorilla_buffer_t *curr_gbuf = gbuf; + uint32_t n = curr_gbuf->header.entries; + + while (curr_gbuf->header.next) { + uint32_t *buf = reinterpret_cast<uint32_t *>(gbuf); + gbuf = reinterpret_cast<gorilla_buffer_t *>(&buf[RRDENG_GORILLA_32BIT_BUFFER_SLOTS]); + + assert(((uintptr_t) (gbuf) % sizeof(uintptr_t)) == 0 && + "Gorilla buffer not aligned to uintptr_t"); + + curr_gbuf->header.next = gbuf; + curr_gbuf = curr_gbuf->header.next; + + n += curr_gbuf->header.entries; + } + + return n; +} + +gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw) +{ + const gorilla_buffer_t *buffer = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + + uint32_t entries = __atomic_load_n(&buffer->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&buffer->header.nbits, __ATOMIC_SEQ_CST); + + return gorilla_reader_t { + .buffer = buffer, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; +} + +gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *gbuf) +{ + uint32_t entries = __atomic_load_n(&gbuf->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&gbuf->header.nbits, __ATOMIC_SEQ_CST); + + return gorilla_reader_t { + .buffer = gbuf, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; +} + +bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number) +{ + const uint32_t *data = gr->buffer->data; + + if (gr->index + 1 > gr->entries) { + // We don't have any more entries to return. However, the writer + // might have updated the buffer's entries. We need to check once + // more in case more elements were added. + gr->entries = __atomic_load_n(&gr->buffer->header.entries, __ATOMIC_SEQ_CST); + gr->capacity = __atomic_load_n(&gr->buffer->header.nbits, __ATOMIC_SEQ_CST); + + // if the reader's current buffer has not been updated, we need to + // check if it has a pointer to a next buffer. + if (gr->index + 1 > gr->entries) { + gorilla_buffer_t *next_buffer = __atomic_load_n(&gr->buffer->header.next, __ATOMIC_SEQ_CST); + + if (!next_buffer) { + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n (No more buffers to read from)", gr->length, gr->buffer); + return false; + } + + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n", gr->length, gr->buffer); + *gr = gorilla_reader_init(next_buffer); + return gorilla_reader_read(gr, number); + } + } + + // read the first number + if (gr->index == 0) { + bit_buffer_read(data, gr->position, number, bit_size<uint32_t>()); + + gr->index++; + gr->position += bit_size<uint32_t>(); + gr->prev_number = *number; + return true; + } + + // process same-number bit + uint32_t is_same_number; + bit_buffer_read(data, gr->position, &is_same_number, 1); + gr->position++; + + if (is_same_number) { + *number = gr->prev_number; + gr->index++; + return true; + } + + // proceess same-xor-lzc bit + uint32_t xor_lzc = gr->prev_xor_lzc; + + uint32_t same_xor_lzc; + bit_buffer_read(data, gr->position, &same_xor_lzc, 1); + gr->position++; + + if (!same_xor_lzc) { + bit_buffer_read(data, gr->position, &xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6); + gr->position += (bit_size<uint32_t>() == 32) ? 5 : 6; + } + + // process the non-lzc suffix + uint32_t xor_value = 0; + bit_buffer_read(data, gr->position, &xor_value, bit_size<uint32_t>() - xor_lzc); + gr->position += bit_size<uint32_t>() - xor_lzc; + + *number = (gr->prev_number ^ xor_value); + + gr->index++; + gr->prev_number = *number; + gr->prev_xor_lzc = xor_lzc; + gr->prev_xor = xor_value; + + return true; +} + +/* + * Internal code used for fuzzing the library +*/ + +#ifdef ENABLE_FUZZER + +#include <vector> + +template<typename Word> +static std::vector<Word> random_vector(const uint8_t *data, size_t size) { + std::vector<Word> V; + + V.reserve(1024); + + while (size >= sizeof(Word)) { + size -= sizeof(Word); + + Word w; + memcpy(&w, &data[size], sizeof(Word)); + V.push_back(w); + } + + return V; +} + +class Storage { +public: + gorilla_buffer_t *alloc_buffer(size_t words) { + uint32_t *new_buffer = new uint32_t[words](); + assert(((((uintptr_t) new_buffer) % 8u) == 0) && "Unaligned buffer..."); + Buffers.push_back(new_buffer); + return reinterpret_cast<gorilla_buffer_t *>(new_buffer); + } + + void free_buffers() { + for (uint32_t *buffer : Buffers) { + delete[] buffer; + } + } + +private: + std::vector<uint32_t *> Buffers; +}; + +extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) { + if (Size < 4) + return 0; + + std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size); + + Storage S; + size_t words_per_buffer = 8; + + /* + * write data + */ + gorilla_buffer_t *first_buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_t gw = gorilla_writer_init(first_buffer, words_per_buffer); + + for (size_t i = 0; i != RandomData.size(); i++) { + bool ok = gorilla_writer_write(&gw, RandomData[i]); + if (ok) + continue; + + // add new buffer + gorilla_buffer_t *buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_add_buffer(&gw, buffer, words_per_buffer); + + ok = gorilla_writer_write(&gw, RandomData[i]); + assert(ok && "Could not write data to new buffer!!!"); + } + + + /* + * read data + */ + gorilla_reader_t gr = gorilla_writer_get_reader(&gw); + + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + bool ok = gorilla_reader_read(&gr, &number); + assert(ok && "Failed to read number from gorilla buffer"); + + assert((number == RandomData[i]) + && "Read wrong number from gorilla buffer"); + } + + S.free_buffers(); + return 0; +} + +#endif /* ENABLE_FUZZER */ + +#ifdef ENABLE_BENCHMARK + +#include <benchmark/benchmark.h> +#include <random> + +static size_t NumItems = 1024; + +static void BM_EncodeU32Numbers(benchmark::State& state) { + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution<uint32_t> dist(0x0, 0x0000FFFF); + + std::vector<uint32_t> RandomData; + for (size_t idx = 0; idx != NumItems; idx++) { + RandomData.push_back(dist(mt)); + } + std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0); + + for (auto _ : state) { + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()), + EncodedData.size()); + + for (size_t i = 0; i != RandomData.size(); i++) + benchmark::DoNotOptimize(gorilla_writer_write(&gw, RandomData[i])); + + benchmark::ClobberMemory(); + } + + state.SetItemsProcessed(NumItems * state.iterations()); + state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); +} +BENCHMARK(BM_EncodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); + +static void BM_DecodeU32Numbers(benchmark::State& state) { + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution<uint32_t> dist(0x0, 0xFFFFFFFF); + + std::vector<uint32_t> RandomData; + for (size_t idx = 0; idx != NumItems; idx++) { + RandomData.push_back(dist(mt)); + } + std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0); + std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0); + + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()), + EncodedData.size()); + + for (size_t i = 0; i != RandomData.size(); i++) + gorilla_writer_write(&gw, RandomData[i]); + + for (auto _ : state) { + gorilla_reader_t gr = gorilla_reader_init(reinterpret_cast<gorilla_buffer_t *>(EncodedData.data())); + + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + benchmark::DoNotOptimize(gorilla_reader_read(&gr, &number)); + } + + benchmark::ClobberMemory(); + } + + state.SetItemsProcessed(NumItems * state.iterations()); + state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); +} +BENCHMARK(BM_DecodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); + +#endif /* ENABLE_BENCHMARK */ |