summaryrefslogtreecommitdiffstats
path: root/libnetdata/gorilla
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:22 +0000
commitc21c3b0befeb46a51b6bf3758ffa30813bea0ff0 (patch)
tree9754ff1ca740f6346cf8483ec915d4054bc5da2d /libnetdata/gorilla
parentAdding upstream version 1.43.2. (diff)
downloadnetdata-upstream/1.44.3.tar.xz
netdata-upstream/1.44.3.zip
Adding upstream version 1.44.3.upstream/1.44.3
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--libnetdata/gorilla/README.md39
-rwxr-xr-xlibnetdata/gorilla/fuzzer.sh2
-rw-r--r--libnetdata/gorilla/gorilla.cc726
-rw-r--r--libnetdata/gorilla/gorilla.h77
4 files changed, 401 insertions, 443 deletions
diff --git a/libnetdata/gorilla/README.md b/libnetdata/gorilla/README.md
new file mode 100644
index 000000000..dc3718d13
--- /dev/null
+++ b/libnetdata/gorilla/README.md
@@ -0,0 +1,39 @@
+# Gorilla compression and decompression
+
+This provides an alternative way of representing values stored in database
+pages. Instead of allocating and using a page of fixed size, ie. 4096 bytes,
+the Gorilla implementation adds support for dynamically sized pages that
+contain a variable number of Gorilla buffers.
+
+Each buffer takes 512 bytes and compresses incoming data using the Gorilla
+compression:
+
+- The very first value is stored as it is.
+- For each new value, Gorilla compression doesn't store the value itself. Instead,
+it computes the difference (XOR) between the new value and the previous value.
+- If the XOR result is zero (meaning the new value is identical to the previous
+value), we store just a single bit set to `1`.
+- If the XOR result is not zero (meaning the new value differs from the previous):
+ - We store a `0` bit to indicate the change.
+ - We compute the leading-zero count (LZC) of the XOR result, and compare it
+ with the previous LZC. If the two LZCs are equal we store a `1` bit.
+ - If the LZCs are different we use 5 bits to store the new LZC, and we store
+ the rest of the value (ie. without its LZC) in the buffer.
+
+A Gorilla page can have multiple Gorilla buffers. If the values of a metric
+are highly compressible, just one Gorilla buffer is able to store all the values
+that otherwise would require a regular 4096 byte page, ie. we can use just 512
+bytes instead. In the worst case scenario (for metrics whose values are not
+compressible at all), a Gorilla page might end up having `9` Gorilla buffers,
+consuming 4608 bytes. In practice, this is pretty rare and does not negate
+the effect of compression for the metrics.
+
+When a gorilla page is full, ie. it contains 1024 slots/values, we serialize
+the linked-list of gorilla buffers directly to disk. During deserialization,
+eg. when performing a DBEngine query, the Gorilla page is loaded from the disk and
+its linked-list entries are patched to point to the new memory allocated for
+serving the query results.
+
+Overall, on a real-agent the Gorilla compression scheme reduces memory
+consumption approximately by ~30%, which can be several GiB of RAM for parents
+having hundreds, or even thousands of children streaming to them.
diff --git a/libnetdata/gorilla/fuzzer.sh b/libnetdata/gorilla/fuzzer.sh
index 9dfdec055..19098a615 100755
--- a/libnetdata/gorilla/fuzzer.sh
+++ b/libnetdata/gorilla/fuzzer.sh
@@ -11,4 +11,4 @@ clang++ \
-fsanitize=fuzzer \
-o gorilla_fuzzer gorilla.cc
-./gorilla_fuzzer -workers=8 -jobs=8
+./gorilla_fuzzer -workers=12 -jobs=16
diff --git a/libnetdata/gorilla/gorilla.cc b/libnetdata/gorilla/gorilla.cc
index af2f74007..e6138ce38 100644
--- a/libnetdata/gorilla/gorilla.cc
+++ b/libnetdata/gorilla/gorilla.cc
@@ -17,413 +17,344 @@ static constexpr size_t bit_size() noexcept
return (sizeof(T) * CHAR_BIT);
}
-/*
- * Low-level bitstream operations, allowing us to read/write individual bits.
-*/
-
-template<typename Word>
-struct bit_stream_t {
- Word *buffer;
- size_t capacity;
- size_t position;
-};
-
-template<typename Word>
-static bit_stream_t<Word> bit_stream_new(Word *buffer, Word capacity) {
- bit_stream_t<Word> bs;
-
- bs.buffer = buffer;
- bs.capacity = capacity * bit_size<Word>();
- bs.position = 0;
-
- return bs;
-}
-
-template<typename Word>
-static bool bit_stream_write(bit_stream_t<Word> *bs, Word value, size_t nbits) {
- assert(nbits > 0 && nbits <= bit_size<Word>());
- assert(bs->capacity >= (bs->position + nbits));
+static void bit_buffer_write(uint32_t *buf, size_t pos, uint32_t v, size_t nbits)
+{
+ assert(nbits > 0 && nbits <= bit_size<uint32_t>());
- if (bs->position + nbits > bs->capacity) {
- return false;
- }
+ const size_t index = pos / bit_size<uint32_t>();
+ const size_t offset = pos % bit_size<uint32_t>();
- const size_t index = bs->position / bit_size<Word>();
- const size_t offset = bs->position % bit_size<Word>();
- bs->position += nbits;
+ pos += nbits;
if (offset == 0) {
- bs->buffer[index] = value;
+ buf[index] = v;
} else {
- const size_t remaining_bits = bit_size<Word>() - offset;
+ const size_t remaining_bits = bit_size<uint32_t>() - offset;
// write the lower part of the value
- const Word low_bits_mask = ((Word) 1 << remaining_bits) - 1;
- const Word lowest_bits_in_value = value & low_bits_mask;
- bs->buffer[index] |= (lowest_bits_in_value << offset);
+ const uint32_t low_bits_mask = ((uint32_t) 1 << remaining_bits) - 1;
+ const uint32_t lowest_bits_in_value = v & low_bits_mask;
+ buf[index] |= (lowest_bits_in_value << offset);
if (nbits > remaining_bits) {
// write the upper part of the value
- const Word high_bits_mask = ~low_bits_mask;
- const Word highest_bits_in_value = (value & high_bits_mask) >> (remaining_bits);
- bs->buffer[index + 1] = highest_bits_in_value;
+ const uint32_t high_bits_mask = ~low_bits_mask;
+ const uint32_t highest_bits_in_value = (v & high_bits_mask) >> (remaining_bits);
+ buf[index + 1] = highest_bits_in_value;
}
}
-
- return true;
}
-template<typename Word>
-static bool bit_stream_read(bit_stream_t<Word> *bs, Word *value, size_t nbits) {
- assert(nbits > 0 && nbits <= bit_size<Word>());
- assert(bs->capacity >= (bs->position + nbits));
+static void bit_buffer_read(const uint32_t *buf, size_t pos, uint32_t *v, size_t nbits)
+{
+ assert(nbits > 0 && nbits <= bit_size<uint32_t>());
- if (bs->position + nbits > bs->capacity) {
- return false;
- }
+ const size_t index = pos / bit_size<uint32_t>();
+ const size_t offset = pos % bit_size<uint32_t>();
- const size_t index = bs->position / bit_size<Word>();
- const size_t offset = bs->position % bit_size<Word>();
- bs->position += nbits;
+ pos += nbits;
if (offset == 0) {
- *value = (nbits == bit_size<Word>()) ?
- bs->buffer[index] :
- bs->buffer[index] & (((Word) 1 << nbits) - 1);
+ *v = (nbits == bit_size<uint32_t>()) ?
+ buf[index] :
+ buf[index] & (((uint32_t) 1 << nbits) - 1);
} else {
- const size_t remaining_bits = bit_size<Word>() - offset;
+ const size_t remaining_bits = bit_size<uint32_t>() - offset;
// extract the lower part of the value
if (nbits < remaining_bits) {
- *value = (bs->buffer[index] >> offset) & (((Word) 1 << nbits) - 1);
+ *v = (buf[index] >> offset) & (((uint32_t) 1 << nbits) - 1);
} else {
- *value = (bs->buffer[index] >> offset) & (((Word) 1 << remaining_bits) - 1);
+ *v = (buf[index] >> offset) & (((uint32_t) 1 << remaining_bits) - 1);
nbits -= remaining_bits;
- *value |= (bs->buffer[index + 1] & (((Word) 1 << nbits) - 1)) << remaining_bits;
+ *v |= (buf[index + 1] & (((uint32_t) 1 << nbits) - 1)) << remaining_bits;
}
}
-
- return true;
}
-/*
- * High-level Gorilla codec implementation
-*/
-
-template<typename Word>
-struct bit_code_t {
- bit_stream_t<Word> bs;
- Word entries;
- Word prev_number;
- Word prev_xor;
- Word prev_xor_lzc;
-};
-
-template<typename Word>
-static void bit_code_init(bit_code_t<Word> *bc, Word *buffer, Word capacity) {
- bc->bs = bit_stream_new(buffer, capacity);
-
- bc->entries = 0;
- bc->prev_number = 0;
- bc->prev_xor = 0;
- bc->prev_xor_lzc = 0;
-
- // reserved two words:
- // Buffer[0] -> number of entries written
- // Buffer[1] -> number of bits written
+gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n)
+{
+ gorilla_writer_t gw = gorilla_writer_t {
+ .head_buffer = gbuf,
+ .last_buffer = NULL,
+ .prev_number = 0,
+ .prev_xor_lzc = 0,
+ .capacity = 0
+ };
- bc->bs.position += 2 * bit_size<Word>();
+ gorilla_writer_add_buffer(&gw, gbuf, n);
+ return gw;
}
-template<typename Word>
-static bool bit_code_read(bit_code_t<Word> *bc, Word *number) {
- bit_stream_t<Word> *bs = &bc->bs;
-
- bc->entries++;
+void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n)
+{
+ gbuf->header.next = NULL;
+ gbuf->header.entries = 0;
+ gbuf->header.nbits = 0;
- // read the first number
- if (bc->entries == 1) {
- bool ok = bit_stream_read(bs, number, bit_size<Word>());
- bc->prev_number = *number;
- return ok;
- }
+ uint32_t capacity = (n * bit_size<uint32_t>()) - (sizeof(gorilla_header_t) * CHAR_BIT);
- // process same-number bit
- Word is_same_number;
- if (!bit_stream_read(bs, &is_same_number, 1)) {
- return false;
- }
+ gw->prev_number = 0;
+ gw->prev_xor_lzc = 0;
+ gw->capacity = capacity;
- if (is_same_number) {
- *number = bc->prev_number;
- return true;
- }
+ if (gw->last_buffer)
+ gw->last_buffer->header.next = gbuf;
- // proceess same-xor-lzc bit
- Word xor_lzc = bc->prev_xor_lzc;
-
- Word same_xor_lzc;
- if (!bit_stream_read(bs, &same_xor_lzc, 1)) {
- return false;
- }
+ __atomic_store_n(&gw->last_buffer, gbuf, __ATOMIC_RELAXED);
+}
- if (!same_xor_lzc) {
- if (!bit_stream_read(bs, &xor_lzc, (bit_size<Word>() == 32) ? 5 : 6)) {
- return false;
- }
- }
+uint32_t gorilla_writer_entries(const gorilla_writer_t *gw) {
+ uint32_t entries = 0;
- // process the non-lzc suffix
- Word xor_value = 0;
- if (!bit_stream_read(bs, &xor_value, bit_size<Word>() - xor_lzc)) {
- return false;
- }
+ const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
+ do {
+ const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST);
- *number = (bc->prev_number ^ xor_value);
+ entries += __atomic_load_n(&curr_gbuf->header.entries, __ATOMIC_SEQ_CST);
- bc->prev_number = *number;
- bc->prev_xor_lzc = xor_lzc;
- bc->prev_xor = xor_value;
+ curr_gbuf = next_gbuf;
+ } while (curr_gbuf);
- return true;
+ return entries;
}
-template<typename Word>
-static bool bit_code_write(bit_code_t<Word> *bc, const Word number) {
- bit_stream_t<Word> *bs = &bc->bs;
- Word position = bs->position;
-
- bc->entries++;
+bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number)
+{
+ gorilla_header_t *hdr = &gw->last_buffer->header;
+ uint32_t *data = gw->last_buffer->data;
// this is the first number we are writing
- if (bc->entries == 1) {
- bc->prev_number = number;
- return bit_stream_write(bs, number, bit_size<Word>());
- }
-
- // write true/false based on whether we got the same number or not.
- if (number == bc->prev_number) {
- return bit_stream_write(bs, static_cast<Word>(1), 1);
- } else {
- if (bit_stream_write(bs, static_cast<Word>(0), 1) == false) {
+ if (hdr->entries == 0) {
+ if (hdr->nbits + bit_size<uint32_t>() >= gw->capacity)
return false;
- }
- }
-
- // otherwise:
- // - compute the non-zero xor
- // - find its leading-zero count
-
- Word xor_value = bc->prev_number ^ number;
- // FIXME: Use SFINAE
- Word xor_lzc = (bit_size<Word>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value);
- Word is_xor_lzc_same = (xor_lzc == bc->prev_xor_lzc) ? 1 : 0;
-
- if (is_xor_lzc_same) {
- // xor-lzc is same
- if (bit_stream_write(bs, static_cast<Word>(1), 1) == false) {
- goto RET_FALSE;
- }
- } else {
- // xor-lzc is different
- if (bit_stream_write(bs, static_cast<Word>(0), 1) == false) {
- goto RET_FALSE;
- }
-
- if (bit_stream_write(bs, xor_lzc, (bit_size<Word>() == 32) ? 5 : 6) == false) {
- goto RET_FALSE;
- }
- }
+ bit_buffer_write(data, hdr->nbits, number, bit_size<uint32_t>());
- // write the bits of the XOR value without the LZC prefix
- if (bit_stream_write(bs, xor_value, bit_size<Word>() - xor_lzc) == false) {
- goto RET_FALSE;
+ __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>(), __ATOMIC_RELAXED);
+ __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
+ gw->prev_number = number;
+ return true;
}
- bc->prev_number = number;
- bc->prev_xor_lzc = xor_lzc;
- return true;
-
-RET_FALSE:
- bc->bs.position = position;
- return false;
-}
+ // write true/false based on whether we got the same number or not.
+ if (number == gw->prev_number) {
+ if (hdr->nbits + 1 >= gw->capacity)
+ return false;
-// only valid for writers
-template<typename Word>
-static bool bit_code_flush(bit_code_t<Word> *bc) {
- bit_stream_t<Word> *bs = &bc->bs;
+ bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(1), 1);
+ __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
+ return true;
+ }
- Word num_entries_written = bc->entries;
- Word num_bits_written = bs->position;
+ if (hdr->nbits + 1 >= gw->capacity)
+ return false;
+ bit_buffer_write(data, hdr->nbits, static_cast<uint32_t>(0), 1);
+ __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
- // we want to write these at the beginning
- bs->position = 0;
+ uint32_t xor_value = gw->prev_number ^ number;
+ uint32_t xor_lzc = (bit_size<uint32_t>() == 32) ? __builtin_clz(xor_value) : __builtin_clzll(xor_value);
+ uint32_t is_xor_lzc_same = (xor_lzc == gw->prev_xor_lzc) ? 1 : 0;
- if (!bit_stream_write(bs, num_entries_written, bit_size<Word>())) {
+ if (hdr->nbits + 1 >= gw->capacity)
return false;
+ bit_buffer_write(data, hdr->nbits, is_xor_lzc_same, 1);
+ __atomic_fetch_add(&hdr->nbits, 1, __ATOMIC_RELAXED);
+
+ if (!is_xor_lzc_same) {
+ if (hdr->nbits + 1 >= gw->capacity)
+ return false;
+ bit_buffer_write(data, hdr->nbits, xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6);
+ __atomic_fetch_add(&hdr->nbits, (bit_size<uint32_t>() == 32) ? 5 : 6, __ATOMIC_RELAXED);
}
- if (!bit_stream_write(bs, num_bits_written, bit_size<Word>())) {
+ // write the bits of the XOR'd value without the LZC prefix
+ if (hdr->nbits + (bit_size<uint32_t>() - xor_lzc) >= gw->capacity)
return false;
- }
+ bit_buffer_write(data, hdr->nbits, xor_value, bit_size<uint32_t>() - xor_lzc);
+ __atomic_fetch_add(&hdr->nbits, bit_size<uint32_t>() - xor_lzc, __ATOMIC_RELAXED);
+ __atomic_fetch_add(&hdr->entries, 1, __ATOMIC_RELAXED);
- bs->position = num_bits_written;
+ gw->prev_number = number;
+ gw->prev_xor_lzc = xor_lzc;
return true;
}
-// only valid for readers
-template<typename Word>
-static bool bit_code_info(bit_code_t<Word> *bc, Word *num_entries_written,
- Word *num_bits_written) {
- bit_stream_t<Word> *bs = &bc->bs;
+gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw) {
+ if (!gw->head_buffer)
+ return NULL;
- assert(bs->position == 2 * bit_size<Word>());
- if (bs->capacity < (2 * bit_size<Word>())) {
- return false;
- }
-
- if (num_entries_written) {
- *num_entries_written = bs->buffer[0];
- }
- if (num_bits_written) {
- *num_bits_written = bs->buffer[1];
- }
-
- return true;
+ gorilla_buffer_t *curr_head = gw->head_buffer;
+ gorilla_buffer_t *next_head = gw->head_buffer->header.next;
+ __atomic_store_n(&gw->head_buffer, next_head, __ATOMIC_RELAXED);
+ return curr_head;
}
-template<typename Word>
-static size_t gorilla_encode(Word *dst, Word dst_len, const Word *src, Word src_len) {
- bit_code_t<Word> bcw;
+uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw)
+{
+ uint32_t nbits = 0;
- bit_code_init(&bcw, dst, dst_len);
+ const gorilla_buffer_t *curr_gbuf = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
+ do {
+ const gorilla_buffer_t *next_gbuf = __atomic_load_n(&curr_gbuf->header.next, __ATOMIC_SEQ_CST);
- for (size_t i = 0; i != src_len; i++) {
- if (!bit_code_write(&bcw, src[i]))
- return 0;
- }
+ nbits += __atomic_load_n(&curr_gbuf->header.nbits, __ATOMIC_SEQ_CST);
- if (!bit_code_flush(&bcw))
- return 0;
+ curr_gbuf = next_gbuf;
+ } while (curr_gbuf);
- return src_len;
+ return (nbits + (CHAR_BIT - 1)) / CHAR_BIT;
}
-template<typename Word>
-static size_t gorilla_decode(Word *dst, Word dst_len, const Word *src, Word src_len) {
- bit_code_t<Word> bcr;
+bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size) {
+ const gorilla_buffer_t *curr_gbuf = gw->head_buffer;
- bit_code_init(&bcr, (Word *) src, src_len);
+ do {
+ const gorilla_buffer_t *next_gbuf = curr_gbuf->header.next;
- Word num_entries;
- if (!bit_code_info(&bcr, &num_entries, (Word *) NULL)) {
- return 0;
- }
- if (num_entries > dst_len) {
- return 0;
- }
-
- for (size_t i = 0; i != num_entries; i++) {
- if (!bit_code_read(&bcr, &dst[i]))
- return 0;
- }
+ size_t bytes = GORILLA_BUFFER_SIZE;
+ if (bytes > dst_size)
+ return false;
- return num_entries;
-}
+ memcpy(dst, curr_gbuf, bytes);
+ dst += bytes;
+ dst_size -= bytes;
-/*
- * Low-level public API
-*/
-
-// 32-bit API
+ curr_gbuf = next_gbuf;
+ } while (curr_gbuf);
-void bit_code_writer_u32_init(bit_code_writer_u32_t *bcw, uint32_t *buffer, uint32_t capacity) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw;
- bit_code_init(bc, buffer, capacity);
+ return true;
}
-bool bit_code_writer_u32_write(bit_code_writer_u32_t *bcw, const uint32_t number) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw;
- return bit_code_write(bc, number);
-}
+uint32_t gorilla_buffer_patch(gorilla_buffer_t *gbuf) {
+ gorilla_buffer_t *curr_gbuf = gbuf;
+ uint32_t n = curr_gbuf->header.entries;
-bool bit_code_writer_u32_flush(bit_code_writer_u32_t *bcw) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcw;
- return bit_code_flush(bc);
-}
+ while (curr_gbuf->header.next) {
+ uint32_t *buf = reinterpret_cast<uint32_t *>(gbuf);
+ gbuf = reinterpret_cast<gorilla_buffer_t *>(&buf[GORILLA_BUFFER_SLOTS]);
-void bit_code_reader_u32_init(bit_code_reader_u32_t *bcr, uint32_t *buffer, uint32_t capacity) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr;
- bit_code_init(bc, buffer, capacity);
-}
+ assert(((uintptr_t) (gbuf) % sizeof(uintptr_t)) == 0 &&
+ "Gorilla buffer not aligned to uintptr_t");
-bool bit_code_reader_u32_read(bit_code_reader_u32_t *bcr, uint32_t *number) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr;
- return bit_code_read(bc, number);
-}
+ curr_gbuf->header.next = gbuf;
+ curr_gbuf = curr_gbuf->header.next;
-bool bit_code_reader_u32_info(bit_code_reader_u32_t *bcr, uint32_t *num_entries_written,
- uint32_t *num_bits_written) {
- bit_code_t<uint32_t> *bc = (bit_code_t<uint32_t> *) bcr;
- return bit_code_info(bc, num_entries_written, num_bits_written);
+ n += curr_gbuf->header.entries;
+ }
+
+ return n;
}
-// 64-bit API
+gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw)
+{
+ const gorilla_buffer_t *buffer = __atomic_load_n(&gw->head_buffer, __ATOMIC_SEQ_CST);
-void bit_code_writer_u64_init(bit_code_writer_u64_t *bcw, uint64_t *buffer, uint64_t capacity) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw;
- bit_code_init(bc, buffer, capacity);
-}
+ uint32_t entries = __atomic_load_n(&buffer->header.entries, __ATOMIC_SEQ_CST);
+ uint32_t capacity = __atomic_load_n(&buffer->header.nbits, __ATOMIC_SEQ_CST);
-bool bit_code_writer_u64_write(bit_code_writer_u64_t *bcw, const uint64_t number) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw;
- return bit_code_write(bc, number);
+ return gorilla_reader_t {
+ .buffer = buffer,
+ .entries = entries,
+ .index = 0,
+ .capacity = capacity,
+ .position = 0,
+ .prev_number = 0,
+ .prev_xor_lzc = 0,
+ .prev_xor = 0,
+ };
}
-bool bit_code_writer_u64_flush(bit_code_writer_u64_t *bcw) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcw;
- return bit_code_flush(bc);
-}
+gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *gbuf)
+{
+ uint32_t entries = __atomic_load_n(&gbuf->header.entries, __ATOMIC_SEQ_CST);
+ uint32_t capacity = __atomic_load_n(&gbuf->header.nbits, __ATOMIC_SEQ_CST);
+
+ return gorilla_reader_t {
+ .buffer = gbuf,
+ .entries = entries,
+ .index = 0,
+ .capacity = capacity,
+ .position = 0,
+ .prev_number = 0,
+ .prev_xor_lzc = 0,
+ .prev_xor = 0,
+ };
+}
+
+bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number)
+{
+ const uint32_t *data = gr->buffer->data;
+
+ if (gr->index + 1 > gr->entries) {
+ // We don't have any more entries to return. However, the writer
+ // might have updated the buffer's entries. We need to check once
+ // more in case more elements were added.
+ gr->entries = __atomic_load_n(&gr->buffer->header.entries, __ATOMIC_SEQ_CST);
+ gr->capacity = __atomic_load_n(&gr->buffer->header.nbits, __ATOMIC_SEQ_CST);
+
+ // if the reader's current buffer has not been updated, we need to
+ // check if it has a pointer to a next buffer.
+ if (gr->index + 1 > gr->entries) {
+ gorilla_buffer_t *next_buffer = __atomic_load_n(&gr->buffer->header.next, __ATOMIC_SEQ_CST);
+
+ if (!next_buffer) {
+ // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n (No more buffers to read from)", gr->length, gr->buffer);
+ return false;
+ }
+
+ // fprintf(stderr, "Consumed reader with %zu entries from buffer %p\n", gr->length, gr->buffer);
+ *gr = gorilla_reader_init(next_buffer);
+ return gorilla_reader_read(gr, number);
+ }
+ }
-void bit_code_reader_u64_init(bit_code_reader_u64_t *bcr, uint64_t *buffer, uint64_t capacity) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr;
- bit_code_init(bc, buffer, capacity);
-}
+ // read the first number
+ if (gr->index == 0) {
+ bit_buffer_read(data, gr->position, number, bit_size<uint32_t>());
-bool bit_code_reader_u64_read(bit_code_reader_u64_t *bcr, uint64_t *number) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr;
- return bit_code_read(bc, number);
-}
+ gr->index++;
+ gr->position += bit_size<uint32_t>();
+ gr->prev_number = *number;
+ return true;
+ }
-bool bit_code_reader_u64_info(bit_code_reader_u64_t *bcr, uint64_t *num_entries_written,
- uint64_t *num_bits_written) {
- bit_code_t<uint64_t> *bc = (bit_code_t<uint64_t> *) bcr;
- return bit_code_info(bc, num_entries_written, num_bits_written);
-}
+ // process same-number bit
+ uint32_t is_same_number;
+ bit_buffer_read(data, gr->position, &is_same_number, 1);
+ gr->position++;
-/*
- * High-level public API
-*/
+ if (is_same_number) {
+ *number = gr->prev_number;
+ gr->index++;
+ return true;
+ }
-// 32-bit API
+ // proceess same-xor-lzc bit
+ uint32_t xor_lzc = gr->prev_xor_lzc;
-size_t gorilla_encode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len) {
- return gorilla_encode(dst, (uint32_t) dst_len, src, (uint32_t) src_len);
-}
+ uint32_t same_xor_lzc;
+ bit_buffer_read(data, gr->position, &same_xor_lzc, 1);
+ gr->position++;
-size_t gorilla_decode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len) {
- return gorilla_decode(dst, (uint32_t) dst_len, src, (uint32_t) src_len);
-}
+ if (!same_xor_lzc) {
+ bit_buffer_read(data, gr->position, &xor_lzc, (bit_size<uint32_t>() == 32) ? 5 : 6);
+ gr->position += (bit_size<uint32_t>() == 32) ? 5 : 6;
+ }
-// 64-bit API
+ // process the non-lzc suffix
+ uint32_t xor_value = 0;
+ bit_buffer_read(data, gr->position, &xor_value, bit_size<uint32_t>() - xor_lzc);
+ gr->position += bit_size<uint32_t>() - xor_lzc;
-size_t gorilla_encode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len) {
- return gorilla_encode(dst, (uint64_t) dst_len, src, (uint64_t) src_len);
-}
+ *number = (gr->prev_number ^ xor_value);
+
+ gr->index++;
+ gr->prev_number = *number;
+ gr->prev_xor_lzc = xor_lzc;
+ gr->prev_xor = xor_value;
-size_t gorilla_decode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len) {
- return gorilla_decode(dst, (uint64_t) dst_len, src, (uint64_t) src_len);
+ return true;
}
/*
@@ -451,54 +382,69 @@ static std::vector<Word> random_vector(const uint8_t *data, size_t size) {
return V;
}
-template<typename Word>
-static void check_equal_buffers(Word *lhs, Word lhs_size, Word *rhs, Word rhs_size) {
- assert((lhs_size == rhs_size) && "Buffers have different size.");
+class Storage {
+public:
+ gorilla_buffer_t *alloc_buffer(size_t words) {
+ uint32_t *new_buffer = new uint32_t[words]();
+ assert(((((uintptr_t) new_buffer) % 8u) == 0) && "Unaligned buffer...");
+ Buffers.push_back(new_buffer);
+ return reinterpret_cast<gorilla_buffer_t *>(new_buffer);
+ }
- for (size_t i = 0; i != lhs_size; i++) {
- assert((lhs[i] == rhs[i]) && "Buffers differ");
+ void free_buffers() {
+ for (uint32_t *buffer : Buffers) {
+ delete[] buffer;
+ }
}
-}
+
+private:
+ std::vector<uint32_t *> Buffers;
+};
extern "C" int LLVMFuzzerTestOneInput(const uint8_t *Data, size_t Size) {
- // 32-bit tests
- {
- if (Size < 4)
- return 0;
-
- std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size);
- std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0);
- std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0);
-
- size_t num_entries_written = gorilla_encode_u32(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size());
- size_t num_entries_read = gorilla_decode_u32(DecodedData.data(), DecodedData.size(),
- EncodedData.data(), EncodedData.size());
-
- assert(num_entries_written == num_entries_read);
- check_equal_buffers(RandomData.data(), (uint32_t) RandomData.size(),
- DecodedData.data(), (uint32_t) RandomData.size());
+ if (Size < 4)
+ return 0;
+
+ std::vector<uint32_t> RandomData = random_vector<uint32_t>(Data, Size);
+
+ Storage S;
+ size_t words_per_buffer = 8;
+
+ /*
+ * write data
+ */
+ gorilla_buffer_t *first_buffer = S.alloc_buffer(words_per_buffer);
+ gorilla_writer_t gw = gorilla_writer_init(first_buffer, words_per_buffer);
+
+ for (size_t i = 0; i != RandomData.size(); i++) {
+ bool ok = gorilla_writer_write(&gw, RandomData[i]);
+ if (ok)
+ continue;
+
+ // add new buffer
+ gorilla_buffer_t *buffer = S.alloc_buffer(words_per_buffer);
+ gorilla_writer_add_buffer(&gw, buffer, words_per_buffer);
+
+ ok = gorilla_writer_write(&gw, RandomData[i]);
+ assert(ok && "Could not write data to new buffer!!!");
}
- // 64-bit tests
- {
- if (Size < 8)
- return 0;
- std::vector<uint64_t> RandomData = random_vector<uint64_t>(Data, Size);
- std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0);
- std::vector<uint64_t> DecodedData(10 * RandomData.capacity(), 0);
+ /*
+ * read data
+ */
+ gorilla_reader_t gr = gorilla_writer_get_reader(&gw);
- size_t num_entries_written = gorilla_encode_u64(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size());
- size_t num_entries_read = gorilla_decode_u64(DecodedData.data(), DecodedData.size(),
- EncodedData.data(), EncodedData.size());
+ for (size_t i = 0; i != RandomData.size(); i++) {
+ uint32_t number = 0;
+ bool ok = gorilla_reader_read(&gr, &number);
+ assert(ok && "Failed to read number from gorilla buffer");
- assert(num_entries_written == num_entries_read);
- check_equal_buffers(RandomData.data(), (uint64_t) RandomData.size(),
- DecodedData.data(), (uint64_t) RandomData.size());
+ assert((number == RandomData[i])
+ && "Read wrong number from gorilla buffer");
}
+ S.free_buffers();
return 0;
}
@@ -523,17 +469,20 @@ static void BM_EncodeU32Numbers(benchmark::State& state) {
std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0);
for (auto _ : state) {
- benchmark::DoNotOptimize(
- gorilla_encode_u32(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size())
- );
+ gorilla_writer_t gw = gorilla_writer_init(
+ reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()),
+ EncodedData.size());
+
+ for (size_t i = 0; i != RandomData.size(); i++)
+ benchmark::DoNotOptimize(gorilla_writer_write(&gw, RandomData[i]));
+
benchmark::ClobberMemory();
}
state.SetItemsProcessed(NumItems * state.iterations());
state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t));
}
-BENCHMARK(BM_EncodeU32Numbers);
+BENCHMARK(BM_EncodeU32Numbers)->ThreadRange(1, 16)->UseRealTime();
static void BM_DecodeU32Numbers(benchmark::State& state) {
std::random_device rd;
@@ -547,74 +496,27 @@ static void BM_DecodeU32Numbers(benchmark::State& state) {
std::vector<uint32_t> EncodedData(10 * RandomData.capacity(), 0);
std::vector<uint32_t> DecodedData(10 * RandomData.capacity(), 0);
- gorilla_encode_u32(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size());
-
- for (auto _ : state) {
- benchmark::DoNotOptimize(
- gorilla_decode_u32(DecodedData.data(), DecodedData.size(),
- EncodedData.data(), EncodedData.size())
- );
- benchmark::ClobberMemory();
- }
-
- state.SetItemsProcessed(NumItems * state.iterations());
- state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t));
-}
-// Register the function as a benchmark
-BENCHMARK(BM_DecodeU32Numbers);
-
-static void BM_EncodeU64Numbers(benchmark::State& state) {
- std::random_device rd;
- std::mt19937 mt(rd());
- std::uniform_int_distribution<uint64_t> dist(0x0, 0x0000FFFF);
+ gorilla_writer_t gw = gorilla_writer_init(
+ reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()),
+ EncodedData.size());
- std::vector<uint64_t> RandomData;
- for (size_t idx = 0; idx != 1024; idx++) {
- RandomData.push_back(dist(mt));
- }
- std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0);
+ for (size_t i = 0; i != RandomData.size(); i++)
+ gorilla_writer_write(&gw, RandomData[i]);
for (auto _ : state) {
- benchmark::DoNotOptimize(
- gorilla_encode_u64(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size())
- );
- benchmark::ClobberMemory();
- }
+ gorilla_reader_t gr = gorilla_reader_init(reinterpret_cast<gorilla_buffer_t *>(EncodedData.data()));
- state.SetItemsProcessed(NumItems * state.iterations());
- state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint64_t));
-}
-BENCHMARK(BM_EncodeU64Numbers);
-
-static void BM_DecodeU64Numbers(benchmark::State& state) {
- std::random_device rd;
- std::mt19937 mt(rd());
- std::uniform_int_distribution<uint64_t> dist(0x0, 0xFFFFFFFF);
-
- std::vector<uint64_t> RandomData;
- for (size_t idx = 0; idx != 1024; idx++) {
- RandomData.push_back(dist(mt));
- }
- std::vector<uint64_t> EncodedData(10 * RandomData.capacity(), 0);
- std::vector<uint64_t> DecodedData(10 * RandomData.capacity(), 0);
-
- gorilla_encode_u64(EncodedData.data(), EncodedData.size(),
- RandomData.data(), RandomData.size());
+ for (size_t i = 0; i != RandomData.size(); i++) {
+ uint32_t number = 0;
+ benchmark::DoNotOptimize(gorilla_reader_read(&gr, &number));
+ }
- for (auto _ : state) {
- benchmark::DoNotOptimize(
- gorilla_decode_u64(DecodedData.data(), DecodedData.size(),
- EncodedData.data(), EncodedData.size())
- );
benchmark::ClobberMemory();
}
state.SetItemsProcessed(NumItems * state.iterations());
- state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint64_t));
+ state.SetBytesProcessed(NumItems * state.iterations() * sizeof(uint32_t));
}
-// Register the function as a benchmark
-BENCHMARK(BM_DecodeU64Numbers);
+BENCHMARK(BM_DecodeU32Numbers)->ThreadRange(1, 16)->UseRealTime();
#endif /* ENABLE_BENCHMARK */
diff --git a/libnetdata/gorilla/gorilla.h b/libnetdata/gorilla/gorilla.h
index 12bec42c0..d57c07cf0 100644
--- a/libnetdata/gorilla/gorilla.h
+++ b/libnetdata/gorilla/gorilla.h
@@ -11,47 +11,64 @@
extern "C" {
#endif
-/*
- * Low-level public API
-*/
+struct gorilla_buffer;
-// 32-bit API
+typedef struct {
+ struct gorilla_buffer *next;
+ uint32_t entries;
+ uint32_t nbits;
+} gorilla_header_t;
-typedef struct bit_code_writer_u32 bit_code_writer_u32_t;
-typedef struct bit_code_reader_u32 bit_code_reader_u32_t;
+typedef struct gorilla_buffer {
+ gorilla_header_t header;
+ uint32_t data[];
+} gorilla_buffer_t;
-void bit_code_writer_u32_init(bit_code_writer_u32_t *bcw, uint32_t *buffer, uint32_t capacity);
-bool bit_code_writer_u32_write(bit_code_writer_u32_t *bcw, const uint32_t number);
-bool bit_code_writer_u32_flush(bit_code_writer_u32_t *bcw);
+typedef struct {
+ gorilla_buffer_t *head_buffer;
+ gorilla_buffer_t *last_buffer;
-void bit_code_reader_u32_init(bit_code_reader_u32_t *bcr, uint32_t *buffer, uint32_t capacity);
-bool bit_code_reader_u32_read(bit_code_reader_u32_t *bcr, uint32_t *number);
-bool bit_code_reader_u32_info(bit_code_reader_u32_t *bcr, uint32_t *num_entries_written,
- uint64_t *num_bits_written);
+ uint32_t prev_number;
+ uint32_t prev_xor_lzc;
-// 64-bit API
+ // in bits
+ uint32_t capacity;
+} gorilla_writer_t;
-typedef struct bit_code_writer_u64 bit_code_writer_u64_t;
-typedef struct bit_code_reader_u64 bit_code_reader_u64_t;
+typedef struct {
+ const gorilla_buffer_t *buffer;
-void bit_code_writer_u64_init(bit_code_writer_u64_t *bcw, uint64_t *buffer, uint64_t capacity);
-bool bit_code_writer_u64_write(bit_code_writer_u64_t *bcw, const uint64_t number);
-bool bit_code_writer_u64_flush(bit_code_writer_u64_t *bcw);
+ // number of values
+ size_t entries;
+ size_t index;
-void bit_code_reader_u64_init(bit_code_reader_u64_t *bcr, uint64_t *buffer, uint64_t capacity);
-bool bit_code_reader_u64_read(bit_code_reader_u64_t *bcr, uint64_t *number);
-bool bit_code_reader_u64_info(bit_code_reader_u64_t *bcr, uint64_t *num_entries_written,
- uint64_t *num_bits_written);
+ // in bits
+ size_t capacity; // FIXME: this not needed on the reader's side
+ size_t position;
-/*
- * High-level public API
-*/
+ uint32_t prev_number;
+ uint32_t prev_xor_lzc;
+ uint32_t prev_xor;
+} gorilla_reader_t;
-size_t gorilla_encode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len);
-size_t gorilla_decode_u32(uint32_t *dst, size_t dst_len, const uint32_t *src, size_t src_len);
+gorilla_writer_t gorilla_writer_init(gorilla_buffer_t *gbuf, size_t n);
+void gorilla_writer_add_buffer(gorilla_writer_t *gw, gorilla_buffer_t *gbuf, size_t n);
+bool gorilla_writer_write(gorilla_writer_t *gw, uint32_t number);
+uint32_t gorilla_writer_entries(const gorilla_writer_t *gw);
-size_t gorilla_encode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len);
-size_t gorilla_decode_u64(uint64_t *dst, size_t dst_len, const uint64_t *src, size_t src_len);
+gorilla_reader_t gorilla_writer_get_reader(const gorilla_writer_t *gw);
+
+gorilla_buffer_t *gorilla_writer_drop_head_buffer(gorilla_writer_t *gw);
+
+uint32_t gorilla_writer_nbytes(const gorilla_writer_t *gw);
+bool gorilla_writer_serialize(const gorilla_writer_t *gw, uint8_t *dst, uint32_t dst_size);
+
+uint32_t gorilla_buffer_patch(gorilla_buffer_t *buf);
+gorilla_reader_t gorilla_reader_init(gorilla_buffer_t *buf);
+bool gorilla_reader_read(gorilla_reader_t *gr, uint32_t *number);
+
+#define GORILLA_BUFFER_SLOTS 128
+#define GORILLA_BUFFER_SIZE (GORILLA_BUFFER_SLOTS * sizeof(uint32_t))
#ifdef __cplusplus
}