diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:22 +0000 |
commit | c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch) | |
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /libnetdata/gorilla | |
parent | Adding upstream version 1.43.2. (diff) | |
download | netdata-upstream/1.44.3.tar.xz netdata-upstream/1.44.3.zip |
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | libnetdata/gorilla/README.md | 39 | ||||
-rwxr-xr-x | libnetdata/gorilla/fuzzer.sh | 2 | ||||
-rw-r--r-- | libnetdata/gorilla/gorilla.cc | 726 | ||||
-rw-r--r-- | libnetdata/gorilla/gorilla.h | 77 |
4 files changed, 401 insertions, 443 deletions
diff --git a/libnetdata/gorilla/README.md b/libnetdata/gorilla/README.md new file mode 100644 index 000000000..dc3718d13 --- /dev/null +++ b/libnetdata/gorilla/README.md @@ -0,0 +1,39 @@ +# Gorilla compression and decompression + +This provides an alternative way of representing values stored in database +pages. Instead of allocating and using a page of fixed size, ie. 4096 bytes, +the Gorilla implementation adds support for dynamically sized pages that +contain a variable number of Gorilla buffers. + +Each buffer takes 512 bytes and compresses incoming data using the Gorilla +compression: + +- The very first value is stored as it is. +- For each new value, Gorilla compression doesn't store the value itself. Instead, +it computes the difference (XOR) between the new value and the previous value. +- If the XOR result is zero (meaning the new value is identical to the previous +value), we store just a single bit set to `1`. +- If the XOR result is not zero (meaning the new value differs from the previous): + - We store a `0` bit to indicate the change. + - We compute the leading-zero count (LZC) of the XOR result, and compare it + with the previous LZC. If the two LZCs are equal we store a `1` bit. + - If the LZCs are different we use 5 bits to store the new LZC, and we store + the rest of the value (ie. without its LZC) in the buffer. + +A Gorilla page can have multiple Gorilla buffers. If the values of a metric +are highly compressible, just one Gorilla buffer is able to store all the values +that otherwise would require a regular 4096 byte page, ie. we can use just 512 +bytes instead. In the worst case scenario (for metrics whose values are not +compressible at all), a Gorilla page might end up having `9` Gorilla buffers, +consuming 4608 bytes. In practice, this is pretty rare and does not negate +the effect of compression for the metrics. + +When a gorilla page is full, ie. it contains 1024 slots/values, we serialize +the linked-list of gorilla buffers directly to disk. During deserialization, +eg. when performing a DBEngine query, the Gorilla page is loaded from the disk and +its linked-list entries are patched to point to the new memory allocated for +serving the query results. + +Overall, on a real-agent the Gorilla compression scheme reduces memory +consumption approximately by ~30%, which can be several GiB of RAM for parents +having hundreds, or even thousands of children streaming to them. diff --git a/libnetdata/gorilla/fuzzer.sh b/libnetdata/gorilla/fuzzer.sh index 9dfdec055..19098a615 100755 --- a/libnetdata/gorilla/fuzzer.sh +++ b/libnetdata/gorilla/fuzzer.sh @@ -11,4 +11,4 @@ clang++ \ -fsanitize=fuzzer \ -o gorilla_fuzzer gorilla.cc -./gorilla_fuzzer -workers=8 -jobs=8 +./gorilla_fuzzer -workers=12 -jobs=16 diff --git a/libnetdata/gorilla/gorilla.cc b/libnetdata/gorilla/gorilla.cc index af2f74007..e6138ce38 100644 --- a/libnetdata/gorilla/gorilla.cc +++ b/libnetdata/gorilla/gorilla.cc @@ -17,413 +17,344 @@ static constexpr size_t bit_size() noexcept return (sizeof(T) * CHAR_BIT); } -/* - * Low-level bitstream operations, allowing us to read/write individual bits. -*/ - -template<typename Word> -struct bit_stream_t { - Word *buffer; - size_t capacity; - size_t position; -}; - -template<typename Word> -static bit_stream_t<Word> bit_stream_new(Word *buffer, Word capacity) { - bit_stream_t<Word> bs; - - bs.buffer = buffer; - bs.capacity = capacity * bit_size<Word>(); - bs.position = 0; - - return bs; -} - -template<typename Word> -static bool bit_stream_write(bit_stream_t<Word> *bs, Word value, size_t nbits) { - assert(nbits > 0 && nbits <= bit_size<Word>()); - assert(bs->capacity >= (bs->position + nbits)); +static void bit_buffer_write(uint32_t *buf, size_t pos, uint32_t v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size<uint32_t>()); - if (bs->position + nbits > bs->capacity) { - return false; - } + const size_t index = pos / bit_size<uint32_t>(); + const size_t offset = pos % bit_size<uint32_t>(); - const size_t index = bs->position / bit_size<Word>(); - const size_t offset = bs->position % bit_size<Word>(); - bs->position += nbits; + pos += nbits; if (offset == 0) { - bs->buffer[index] = value; + buf[index] = v; } else { - const size_t remaining_bits = bit_size<Word>() - offset; + const size_t remaining_bits = bit_size<uint32_t>() - offset; // write the lower part of the value - const Word low_bits_mask = ((Word) 1 << remaining_bits) - 1; - const Word lowest_bits_in_value = value & low_bits_mask; - bs->buffer[index] |= (lowest_bits_in_value << offset); + const uint32_t low_bits_mask = ((uint32_t) 1 << remaining_bits) - 1; + const uint32_t lowest_bits_in_value = v & low_bits_mask; + buf[index] |= (lowest_bits_in_value << offset); if (nbits > remaining_bits) { // write the upper part of the value - const Word high_bits_mask = ~low_bits_mask; - const Word highest_bits_in_value = (value & high_bits_mask) >> (remaining_bits); - bs->buffer[index + 1] = highest_bits_in_value; + const uint32_t high_bits_mask = ~low_bits_mask; + const uint32_t highest_bits_in_value = (v & high_bits_mask) >> (remaining_bits); + buf[index + 1] = highest_bits_in_value; } } - - return true; } -template<typename Word> -static bool bit_stream_read(bit_stream_t<Word> *bs, Word *value, size_t nbits) { - assert(nbits > 0 && nbits <= bit_size<Word>()); - assert(bs->capacity >= (bs->position + nbits)); +static void bit_buffer_read(const uint32_t *buf, size_t pos, uint32_t *v, size_t nbits) +{ + assert(nbits > 0 && nbits <= bit_size<uint32_t>()); - if (bs->position + nbits > bs->capacity) { - return false; - } + const size_t index = pos / bit_size<uint32_t>(); + const size_t offset = pos % bit_size<uint32_t>(); - const size_t index = bs->position / bit_size<Word>(); - const size_t offset = bs->position % bit_size<Word>(); - bs->position += nbits; + pos += nbits; if (offset == 0) { - *value = (nbits == bit_size<Word>()) ? - bs->buffer[index] : - bs->buffer[index] & (((Word) 1 << nbits) - 1); + *v = (nbits == bit_size<uint32_t>()) ? + buf[index] : + buf[index] & (((uint32_t) 1 << nbits) - 1); } else { - const size_t remaining_bits = bit_size<Word>() - offset; + const size_t remaining_bits = bit_size<uint32_t>() - offset; // extract the lower part of the value if (nbits < remaining_bits) { - *value = (bs->buffer[index] >> offset) & (((Word) 1 << nbits) - 1); + *v = (buf[index] >> offset) & (((uint32_t) 1 << nbits) - 1); } else { - *value = (bs->buffer[index] >> offset) & (((Word) 1 << remaining_bits) - 1); + *v = (buf[index] >> offset) & (((uint32_t) 1 << remaining_bits) - 1); nbits -= remaining_bits; - *value |= (bs->buffer[index + 1] & (((Word) 1 << nbits) - 1)) << remaining_bits; + *v |= (buf[index + 1] & (((uint32_t) 1 << nbits) - 1)) << remaining_bits; } } - - return true; } -/* - * High-level Gorilla codec implementation -*/ - -template<typename Word> -struct bit_code_t { - bit_stream_t<Word> bs; - Word entries; - Word prev_number; - Word prev_xor; - Word prev_xor_lzc; -}; - -template<typename Word> -static void bit_code_init(bit_code_t<Word> *bc, Word *buffer, Word capacity) { - bc->bs = bit_stream_new(buffer, capacity); - - bc->entries = 0; - bc->prev_number = 0; - bc->prev_xor = 0; - bc->prev_xor_lzc = 0; - - // reserved two words: - // Buffer[0] -> number of entries written - // Buffer[1] -> number of bits written +gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n) +{ + gorilla_writer_t gw = gorilla_writer_t { + .head_buffer = gbuf, + .last_buffer = NULL, + .prev_number = 0, + .prev_xor_lzc = 0, + .capacity = 0 + }; - bc->bs.position += 2 * bit_size<Word>(); + gorilla_writer_add_buffer(&gw, gbuf, n); + return gw; } -template<typename Word> -static bool bit_code_read(bit_code_t<Word> *bc, Word *number) { - bit_stream_t<Word> *bs = &bc->bs; - - bc->entries++; +void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n) +{ + gbuf->header.next = NULL; + gbuf->header.entries = 0; + gbuf->header.nbits = 0; - // read the first number - if (bc->entries == 1) { - bool ok = bit_stream_read(bs, number, bit_size<Word>()); - bc->prev_number = *number; - return ok; - } + uint32_t capacity = (n * bit_size<uint32_t>()) - (sizeof(gorilla_header_t) * CHAR_BIT); - // process same-number bit - Word is_same_number; - if (!bit_stream_read(bs, &is_same_number, 1)) { - return false; - } + gw->prev_number = 0; + gw->prev_xor_lzc = 0; + gw->capacity = capacity; - if (is_same_number) { - *number = bc->prev_number; - return true; - } + if (gw->last_buffer) + gw->last_buffer->header.next = gbuf; - // proceess same-xor-lzc bit - Word xor_lzc = bc->prev_xor_lzc; - - Word same_xor_lzc; - if (!bit_stream_read(bs, &same_xor_lzc, 1)) { - return false; - } + __atomic_store_n(&gw->last_buffer, gbuf, __ATOMIC_RELAXED); +} - if (!same_xor_lzc) { - if (!bit_stream_read(bs, &xor_lzc, (bit_size<Word>() == 32) ? 5 : 6)) { - return false; - } - } +uint32_t gorilla_writer_entries(const gorilla_writer_t *gw) { + uint32_t entries = 0; - // process the non-lzc suffix - Word xor_value = 0; - if (!bit_stream_read(bs, &xor_value, bit_size<Word>() - xor_lzc)) { - return false; - } + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); - *number = (bc->prev_number ^ xor_value); + entries += __atomic_load_n(&curr_gbuf->header.entries, __ATOMIC_SEQ_CST); - bc->prev_number = *number; - bc->prev_xor_lzc = xor_lzc; - bc->prev_xor = xor_value; + curr_gbuf = next_gbuf; + } while (curr_gbuf); - return true; + return entries; } -template<typename Word> -static bool bit_code_write(bit_code_t<Word> *bc, const Word number) { - bit_stream_t<Word> *bs = &bc->bs; - Word position = bs->position; - - bc->entries++; +bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number) +{ + gorilla_header_t *hdr = &gw->last_buffer->header; + uint32_t *data = gw->last_buffer->data; // this is the first number we are writing - if (bc->entries == 1) { - bc->prev_number = number; - return bit_stream_write(bs, number, bit_size<Word>()); - } - - // write true/false based on whether we got the same number or not. - if (number == bc->prev_number) { - return bit_stream_write(bs, static_cast<Word>(1), 1); - } else { - if (bit_stream_write(bs, static_cast<Word>(0), 1) == false) { + if (hdr->entries == 0) { + if (hdr->nbits + bit_size<uint32_t>() >= gw->capacity) return false; - } - } - - // otherwise: - // - compute the non-zero xor - // - find its leading-zero count - - Word xor_value = bc->prev_number ^ number; - // FIXME: Use SFINAE - Word xor_lzc = (bit_size<Word>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value); - Word is_xor_lzc_same = (xor_lzc == bc->prev_xor_lzc) ? 1 : 0; - - if (is_xor_lzc_same) { - // xor-lzc is same - if (bit_stream_write(bs, static_cast<Word>(1), 1) == false) { - goto RET_FALSE; - } - } else { - // xor-lzc is different - if (bit_stream_write(bs, static_cast<Word>(0), 1) == false) { - goto RET_FALSE; - } - - if (bit_stream_write(bs, xor_lzc, (bit_size<Word>() == 32) ? 5 : 6) == false) { - goto RET_FALSE; - } - } + bit_buffer_write(data, hdr->nbits, number, bit_size<uint32_t>()); - // write the bits of the XOR value without the LZC prefix - if (bit_stream_write(bs, xor_value, bit_size<Word>() - xor_lzc) == false) { - goto RET_FALSE; + __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>(), __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + gw->prev_number = number; + return true; } - bc->prev_number = number; - bc->prev_xor_lzc = xor_lzc; - return true; - -RET_FALSE: - bc->bs.position = position; - return false; -} + // write true/false based on whether we got the same number or not. + if (number == gw->prev_number) { + if (hdr->nbits + 1 >= gw->capacity) + return false; -// only valid for writers -template<typename Word> -static bool bit_code_flush(bit_code_t<Word> *bc) { - bit_stream_t<Word> *bs = &bc->bs; + bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(1), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); + return true; + } - Word num_entries_written = bc->entries; - Word num_bits_written = bs->position; + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(0), 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); - // we want to write these at the beginning - bs->position = 0; + uint32_t xor_value = gw->prev_number ^ number; + uint32_t xor_lzc = (bit_size<uint32_t>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value); + uint32_t is_xor_lzc_same = (xor_lzc == gw->prev_xor_lzc) ? 1 : 0; - if (!bit_stream_write(bs, num_entries_written, bit_size<Word>())) { + if (hdr->nbits + 1 >= gw->capacity) return false; + bit_buffer_write(data, hdr->nbits, is_xor_lzc_same, 1); + __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED); + + if (!is_xor_lzc_same) { + if (hdr->nbits + 1 >= gw->capacity) + return false; + bit_buffer_write(data, hdr->nbits, xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6); + __atomic_fetch_add(&hdr->nbits, (bit_size<uint32_t>() == 32) ? 5 : 6, __ATOMIC_RELAXED); } - if (!bit_stream_write(bs, num_bits_written, bit_size<Word>())) { + // write the bits of the XOR'd value without the LZC prefix + if (hdr->nbits + (bit_size<uint32_t>() - xor_lzc) >= gw->capacity) return false; - } + bit_buffer_write(data, hdr->nbits, xor_value, bit_size<uint32_t>() - xor_lzc); + __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>() - xor_lzc, __ATOMIC_RELAXED); + __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED); - bs->position = num_bits_written; + gw->prev_number = number; + gw->prev_xor_lzc = xor_lzc; return true; } -// only valid for readers -template<typename Word> -static bool bit_code_info(bit_code_t<Word> *bc, Word *num_entries_written, - Word *num_bits_written) { - bit_stream_t<Word> *bs = &bc->bs; +gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw) { + if (!gw->head_buffer) + return NULL; - assert(bs->position == 2 * bit_size<Word>()); - if (bs->capacity < (2 * bit_size<Word>())) { - return false; - } - - if (num_entries_written) { - *num_entries_written = bs->buffer[0]; - } - if (num_bits_written) { - *num_bits_written = bs->buffer[1]; - } - - return true; + gorilla_buffer_t *curr_head = gw->head_buffer; + gorilla_buffer_t *next_head = gw->head_buffer->header.next; + __atomic_store_n(&gw->head_buffer, next_head, __ATOMIC_RELAXED); + return curr_head; } -template<typename Word> -static size_t gorilla_encode(Word *dst, Word dst_len, const Word *src, Word src_len) { - bit_code_t<Word> bcw; +uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw) +{ + uint32_t nbits = 0; - bit_code_init(&bcw, dst, dst_len); + const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); + do { + const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST); - for (size_t i = 0; i != src_len; i++) { - if (!bit_code_write(&bcw, src[i])) - return 0; - } + nbits += __atomic_load_n(&curr_gbuf->header.nbits, __ATOMIC_SEQ_CST); - if (!bit_code_flush(&bcw)) - return 0; + curr_gbuf = next_gbuf; + } while (curr_gbuf); - return src_len; + return (nbits + (CHAR_BIT - 1)) / CHAR_BIT; } -template<typename Word> -static size_t gorilla_decode(Word *dst, Word dst_len, const Word *src, Word src_len) { - bit_code_t<Word> bcr; +bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size) { + const gorilla_buffer_t *curr_gbuf = gw->head_buffer; - bit_code_init(&bcr, (Word *) src, src_len); + do { + const gorilla_buffer_t *next_gbuf = curr_gbuf->header.next; - Word num_entries; - if (!bit_code_info(&bcr, &num_entries, (Word *) NULL)) { - return 0; - } - if (num_entries > dst_len) { - return 0; - } - - for (size_t i = 0; i != num_entries; i++) { - if (!bit_code_read(&bcr, &dst[i])) - return 0; - } + size_t bytes = GORILLA_BUFFER_SIZE; + if (bytes > dst_size) + return false; - return num_entries; -} + memcpy(dst, curr_gbuf, bytes); + dst += bytes; + dst_size -= bytes; -/* - * Low-level public API -*/ - -// 32-bit API + curr_gbuf = next_gbuf; + } while (curr_gbuf); -void bit_code_writer_u32_init(bit_code_writer_u32_t *bcw, uint32_t *buffer, uint32_t capacity) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw; - bit_code_init(bc, buffer, capacity); + return true; } -bool bit_code_writer_u32_write(bit_code_writer_u32_t *bcw, const uint32_t number) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw; - return bit_code_write(bc, number); -} +uint32_t gorilla_buffer_patch(gorilla_buffer_t *gbuf) { + gorilla_buffer_t *curr_gbuf = gbuf; + uint32_t n = curr_gbuf->header.entries; -bool bit_code_writer_u32_flush(bit_code_writer_u32_t *bcw) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw; - return bit_code_flush(bc); -} + while (curr_gbuf->header.next) { + uint32_t *buf = reinterpret_cast<uint32_t *>(gbuf); + gbuf = reinterpret_cast<gorilla_buffer_t *>(&buf[GORILLA_BUFFER_SLOTS]); -void bit_code_reader_u32_init(bit_code_reader_u32_t *bcr, uint32_t *buffer, uint32_t capacity) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr; - bit_code_init(bc, buffer, capacity); -} + assert(((uintptr_t) (gbuf) % sizeof(uintptr_t)) == 0 && + "Gorilla buffer not aligned to uintptr_t"); -bool bit_code_reader_u32_read(bit_code_reader_u32_t *bcr, uint32_t *number) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr; - return bit_code_read(bc, number); -} + curr_gbuf->header.next = gbuf; + curr_gbuf = curr_gbuf->header.next; -bool bit_code_reader_u32_info(bit_code_reader_u32_t *bcr, uint32_t *num_entries_written, - uint32_t *num_bits_written) { - bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr; - return bit_code_info(bc, num_entries_written, num_bits_written); + n += curr_gbuf->header.entries; + } + + return n; } -// 64-bit API +gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw) +{ + const gorilla_buffer_t *buffer = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST); -void bit_code_writer_u64_init(bit_code_writer_u64_t *bcw, uint64_t *buffer, uint64_t capacity) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw; - bit_code_init(bc, buffer, capacity); -} + uint32_t entries = __atomic_load_n(&buffer->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&buffer->header.nbits, __ATOMIC_SEQ_CST); -bool bit_code_writer_u64_write(bit_code_writer_u64_t *bcw, const uint64_t number) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw; - return bit_code_write(bc, number); + return gorilla_reader_t { + .buffer = buffer, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; } -bool bit_code_writer_u64_flush(bit_code_writer_u64_t *bcw) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw; - return bit_code_flush(bc); -} +gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *gbuf) +{ + uint32_t entries = __atomic_load_n(&gbuf->header.entries, __ATOMIC_SEQ_CST); + uint32_t capacity = __atomic_load_n(&gbuf->header.nbits, __ATOMIC_SEQ_CST); + + return gorilla_reader_t { + .buffer = gbuf, + .entries = entries, + .index = 0, + .capacity = capacity, + .position = 0, + .prev_number = 0, + .prev_xor_lzc = 0, + .prev_xor = 0, + }; +} + +bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number) +{ + const uint32_t *data = gr->buffer->data; + + if (gr->index + 1 > gr->entries) { + // We don't have any more entries to return. However, the writer + // might have updated the buffer's entries. We need to check once + // more in case more elements were added. + gr->entries = __atomic_load_n(&gr->buffer->header.entries, __ATOMIC_SEQ_CST); + gr->capacity = __atomic_load_n(&gr->buffer->header.nbits, __ATOMIC_SEQ_CST); + + // if the reader's current buffer has not been updated, we need to + // check if it has a pointer to a next buffer. + if (gr->index + 1 > gr->entries) { + gorilla_buffer_t *next_buffer = __atomic_load_n(&gr->buffer->header.next, __ATOMIC_SEQ_CST); + + if (!next_buffer) { + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n (No more buffers to read from)", gr->length, gr->buffer); + return false; + } + + // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n", gr->length, gr->buffer); + *gr = gorilla_reader_init(next_buffer); + return gorilla_reader_read(gr, number); + } + } -void bit_code_reader_u64_init(bit_code_reader_u64_t *bcr, uint64_t *buffer, uint64_t capacity) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr; - bit_code_init(bc, buffer, capacity); -} + // read the first number + if (gr->index == 0) { + bit_buffer_read(data, gr->position, number, bit_size<uint32_t>()); -bool bit_code_reader_u64_read(bit_code_reader_u64_t *bcr, uint64_t *number) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr; - return bit_code_read(bc, number); -} + gr->index++; + gr->position += bit_size<uint32_t>(); + gr->prev_number = *number; + return true; + } -bool bit_code_reader_u64_info(bit_code_reader_u64_t *bcr, uint64_t *num_entries_written, - uint64_t *num_bits_written) { - bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr; - return bit_code_info(bc, num_entries_written, num_bits_written); -} + // process same-number bit + uint32_t is_same_number; + bit_buffer_read(data, gr->position, &is_same_number, 1); + gr->position++; -/* - * High-level public API -*/ + if (is_same_number) { + *number = gr->prev_number; + gr->index++; + return true; + } -// 32-bit API + // proceess same-xor-lzc bit + uint32_t xor_lzc = gr->prev_xor_lzc; -size_t gorilla_encode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len) { - return gorilla_encode(dst, (uint32_t) dst_len, src, (uint32_t) src_len); -} + uint32_t same_xor_lzc; + bit_buffer_read(data, gr->position, &same_xor_lzc, 1); + gr->position++; -size_t gorilla_decode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len) { - return gorilla_decode(dst, (uint32_t) dst_len, src, (uint32_t) src_len); -} + if (!same_xor_lzc) { + bit_buffer_read(data, gr->position, &xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6); + gr->position += (bit_size<uint32_t>() == 32) ? 5 : 6; + } -// 64-bit API + // process the non-lzc suffix + uint32_t xor_value = 0; + bit_buffer_read(data, gr->position, &xor_value, bit_size<uint32_t>() - xor_lzc); + gr->position += bit_size<uint32_t>() - xor_lzc; -size_t gorilla_encode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len) { - return gorilla_encode(dst, (uint64_t) dst_len, src, (uint64_t) src_len); -} + *number = (gr->prev_number ^ xor_value); + + gr->index++; + gr->prev_number = *number; + gr->prev_xor_lzc = xor_lzc; + gr->prev_xor = xor_value; -size_t gorilla_decode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len) { - return gorilla_decode(dst, (uint64_t) dst_len, src, (uint64_t) src_len); + return true; } /* @@ -451,54 +382,69 @@ static std::vector<Word> random_vector(const uint8_t *data, size_t size) { return V; } -template<typename Word> -static void check_equal_buffers(Word *lhs, Word lhs_size, Word *rhs, Word rhs_size) { - assert((lhs_size == rhs_size) && "Buffers have different size."); +class Storage { +public: + gorilla_buffer_t *alloc_buffer(size_t words) { + uint32_t *new_buffer = new uint32_t[words](); + assert(((((uintptr_t) new_buffer) % 8u) == 0) && "Unaligned buffer..."); + Buffers.push_back(new_buffer); + return reinterpret_cast<gorilla_buffer_t *>(new_buffer); + } - for (size_t i = 0; i != lhs_size; i++) { - assert((lhs[i] == rhs[i]) && "Buffers differ"); + void free_buffers() { + for (uint32_t *buffer : Buffers) { + delete[] buffer; + } } -} + +private: + std::vector<uint32_t *> Buffers; +}; extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) { - // 32-bit tests - { - if (Size < 4) - return 0; - - std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size); - std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0); - std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0); - - size_t num_entries_written = gorilla_encode_u32(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()); - size_t num_entries_read = gorilla_decode_u32(DecodedData.data(), DecodedData.size(), - EncodedData.data(), EncodedData.size()); - - assert(num_entries_written == num_entries_read); - check_equal_buffers(RandomData.data(), (uint32_t) RandomData.size(), - DecodedData.data(), (uint32_t) RandomData.size()); + if (Size < 4) + return 0; + + std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size); + + Storage S; + size_t words_per_buffer = 8; + + /* + * write data + */ + gorilla_buffer_t *first_buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_t gw = gorilla_writer_init(first_buffer, words_per_buffer); + + for (size_t i = 0; i != RandomData.size(); i++) { + bool ok = gorilla_writer_write(&gw, RandomData[i]); + if (ok) + continue; + + // add new buffer + gorilla_buffer_t *buffer = S.alloc_buffer(words_per_buffer); + gorilla_writer_add_buffer(&gw, buffer, words_per_buffer); + + ok = gorilla_writer_write(&gw, RandomData[i]); + assert(ok && "Could not write data to new buffer!!!"); } - // 64-bit tests - { - if (Size < 8) - return 0; - std::vector<uint64_t> RandomData = random_vector<uint64_t>(Data, Size); - std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0); - std::vector<uint64_t> DecodedData(10 * RandomData.capacity(), 0); + /* + * read data + */ + gorilla_reader_t gr = gorilla_writer_get_reader(&gw); - size_t num_entries_written = gorilla_encode_u64(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()); - size_t num_entries_read = gorilla_decode_u64(DecodedData.data(), DecodedData.size(), - EncodedData.data(), EncodedData.size()); + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + bool ok = gorilla_reader_read(&gr, &number); + assert(ok && "Failed to read number from gorilla buffer"); - assert(num_entries_written == num_entries_read); - check_equal_buffers(RandomData.data(), (uint64_t) RandomData.size(), - DecodedData.data(), (uint64_t) RandomData.size()); + assert((number == RandomData[i]) + && "Read wrong number from gorilla buffer"); } + S.free_buffers(); return 0; } @@ -523,17 +469,20 @@ static void BM_EncodeU32Numbers(benchmark::State& state) { std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0); for (auto _ : state) { - benchmark::DoNotOptimize( - gorilla_encode_u32(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()) - ); + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()), + EncodedData.size()); + + for (size_t i = 0; i != RandomData.size(); i++) + benchmark::DoNotOptimize(gorilla_writer_write(&gw, RandomData[i])); + benchmark::ClobberMemory(); } state.SetItemsProcessed(NumItems * state.iterations()); state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); } -BENCHMARK(BM_EncodeU32Numbers); +BENCHMARK(BM_EncodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); static void BM_DecodeU32Numbers(benchmark::State& state) { std::random_device rd; @@ -547,74 +496,27 @@ static void BM_DecodeU32Numbers(benchmark::State& state) { std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0); std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0); - gorilla_encode_u32(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()); - - for (auto _ : state) { - benchmark::DoNotOptimize( - gorilla_decode_u32(DecodedData.data(), DecodedData.size(), - EncodedData.data(), EncodedData.size()) - ); - benchmark::ClobberMemory(); - } - - state.SetItemsProcessed(NumItems * state.iterations()); - state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); -} -// Register the function as a benchmark -BENCHMARK(BM_DecodeU32Numbers); - -static void BM_EncodeU64Numbers(benchmark::State& state) { - std::random_device rd; - std::mt19937 mt(rd()); - std::uniform_int_distribution<uint64_t> dist(0x0, 0x0000FFFF); + gorilla_writer_t gw = gorilla_writer_init( + reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()), + EncodedData.size()); - std::vector<uint64_t> RandomData; - for (size_t idx = 0; idx != 1024; idx++) { - RandomData.push_back(dist(mt)); - } - std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0); + for (size_t i = 0; i != RandomData.size(); i++) + gorilla_writer_write(&gw, RandomData[i]); for (auto _ : state) { - benchmark::DoNotOptimize( - gorilla_encode_u64(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()) - ); - benchmark::ClobberMemory(); - } + gorilla_reader_t gr = gorilla_reader_init(reinterpret_cast<gorilla_buffer_t *>(EncodedData.data())); - state.SetItemsProcessed(NumItems * state.iterations()); - state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint64_t)); -} -BENCHMARK(BM_EncodeU64Numbers); - -static void BM_DecodeU64Numbers(benchmark::State& state) { - std::random_device rd; - std::mt19937 mt(rd()); - std::uniform_int_distribution<uint64_t> dist(0x0, 0xFFFFFFFF); - - std::vector<uint64_t> RandomData; - for (size_t idx = 0; idx != 1024; idx++) { - RandomData.push_back(dist(mt)); - } - std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0); - std::vector<uint64_t> DecodedData(10 * RandomData.capacity(), 0); - - gorilla_encode_u64(EncodedData.data(), EncodedData.size(), - RandomData.data(), RandomData.size()); + for (size_t i = 0; i != RandomData.size(); i++) { + uint32_t number = 0; + benchmark::DoNotOptimize(gorilla_reader_read(&gr, &number)); + } - for (auto _ : state) { - benchmark::DoNotOptimize( - gorilla_decode_u64(DecodedData.data(), DecodedData.size(), - EncodedData.data(), EncodedData.size()) - ); benchmark::ClobberMemory(); } state.SetItemsProcessed(NumItems * state.iterations()); - state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint64_t)); + state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t)); } -// Register the function as a benchmark -BENCHMARK(BM_DecodeU64Numbers); +BENCHMARK(BM_DecodeU32Numbers)->ThreadRange(1, 16)->UseRealTime(); #endif /* ENABLE_BENCHMARK */ diff --git a/libnetdata/gorilla/gorilla.h b/libnetdata/gorilla/gorilla.h index 12bec42c0..d57c07cf0 100644 --- a/libnetdata/gorilla/gorilla.h +++ b/libnetdata/gorilla/gorilla.h @@ -11,47 +11,64 @@ extern "C" { #endif -/* - * Low-level public API -*/ +struct gorilla_buffer; -// 32-bit API +typedef struct { + struct gorilla_buffer *next; + uint32_t entries; + uint32_t nbits; +} gorilla_header_t; -typedef struct bit_code_writer_u32 bit_code_writer_u32_t; -typedef struct bit_code_reader_u32 bit_code_reader_u32_t; +typedef struct gorilla_buffer { + gorilla_header_t header; + uint32_t data[]; +} gorilla_buffer_t; -void bit_code_writer_u32_init(bit_code_writer_u32_t *bcw, uint32_t *buffer, uint32_t capacity); -bool bit_code_writer_u32_write(bit_code_writer_u32_t *bcw, const uint32_t number); -bool bit_code_writer_u32_flush(bit_code_writer_u32_t *bcw); +typedef struct { + gorilla_buffer_t *head_buffer; + gorilla_buffer_t *last_buffer; -void bit_code_reader_u32_init(bit_code_reader_u32_t *bcr, uint32_t *buffer, uint32_t capacity); -bool bit_code_reader_u32_read(bit_code_reader_u32_t *bcr, uint32_t *number); -bool bit_code_reader_u32_info(bit_code_reader_u32_t *bcr, uint32_t *num_entries_written, - uint64_t *num_bits_written); + uint32_t prev_number; + uint32_t prev_xor_lzc; -// 64-bit API + // in bits + uint32_t capacity; +} gorilla_writer_t; -typedef struct bit_code_writer_u64 bit_code_writer_u64_t; -typedef struct bit_code_reader_u64 bit_code_reader_u64_t; +typedef struct { + const gorilla_buffer_t *buffer; -void bit_code_writer_u64_init(bit_code_writer_u64_t *bcw, uint64_t *buffer, uint64_t capacity); -bool bit_code_writer_u64_write(bit_code_writer_u64_t *bcw, const uint64_t number); -bool bit_code_writer_u64_flush(bit_code_writer_u64_t *bcw); + // number of values + size_t entries; + size_t index; -void bit_code_reader_u64_init(bit_code_reader_u64_t *bcr, uint64_t *buffer, uint64_t capacity); -bool bit_code_reader_u64_read(bit_code_reader_u64_t *bcr, uint64_t *number); -bool bit_code_reader_u64_info(bit_code_reader_u64_t *bcr, uint64_t *num_entries_written, - uint64_t *num_bits_written); + // in bits + size_t capacity; // FIXME: this not needed on the reader's side + size_t position; -/* - * High-level public API -*/ + uint32_t prev_number; + uint32_t prev_xor_lzc; + uint32_t prev_xor; +} gorilla_reader_t; -size_t gorilla_encode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len); -size_t gorilla_decode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len); +gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n); +void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n); +bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number); +uint32_t gorilla_writer_entries(const gorilla_writer_t *gw); -size_t gorilla_encode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len); -size_t gorilla_decode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len); +gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw); + +gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw); + +uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw); +bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size); + +uint32_t gorilla_buffer_patch(gorilla_buffer_t *buf); +gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *buf); +bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number); + +#define GORILLA_BUFFER_SLOTS 128 +#define GORILLA_BUFFER_SIZE (GORILLA_BUFFER_SLOTS * sizeof(uint32_t)) #ifdef __cplusplus } |