summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c450
1 files changed, 450 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c
new file mode 100644
index 000000000..b52108bb1
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c
@@ -0,0 +1,450 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_lz4.h"
+
+#if WITH_LZ4_EXT
+#include <lz4frame.h>
+#else
+#include "lz4frame.h"
+#endif
+#include "rdxxhash.h"
+
+#include "rdbuf.h"
+
+/**
+ * Fix-up bad LZ4 framing caused by buggy Kafka client / broker.
+ * The LZ4F framing format is described in detail here:
+ * https://github.com/lz4/lz4/blob/master/doc/lz4_Frame_format.md
+ *
+ * NOTE: This modifies 'inbuf'.
+ *
+ * Returns an error on failure to fix (nothing modified), else NO_ERROR.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_lz4_decompress_fixup_bad_framing(rd_kafka_broker_t *rkb,
+ char *inbuf,
+ size_t inlen) {
+ static const char magic[4] = {0x04, 0x22, 0x4d, 0x18};
+ uint8_t FLG, HC, correct_HC;
+ size_t of = 4;
+
+ /* Format is:
+ * int32_t magic;
+ * int8_t_ FLG;
+ * int8_t BD;
+ * [ int64_t contentSize; ]
+ * int8_t HC;
+ */
+ if (inlen < 4 + 3 || memcmp(inbuf, magic, 4)) {
+ rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP",
+ "Unable to fix-up legacy LZ4 framing "
+ "(%" PRIusz " bytes): invalid length or magic value",
+ inlen);
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ }
+
+ of = 4; /* past magic */
+ FLG = inbuf[of++];
+ of++; /* BD */
+
+ if ((FLG >> 3) & 1) /* contentSize */
+ of += 8;
+
+ if (of >= inlen) {
+ rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP",
+ "Unable to fix-up legacy LZ4 framing "
+ "(%" PRIusz " bytes): requires %" PRIusz " bytes",
+ inlen, of);
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ }
+
+ /* Header hash code */
+ HC = inbuf[of];
+
+ /* Calculate correct header hash code */
+ correct_HC = (XXH32(inbuf + 4, of - 4, 0) >> 8) & 0xff;
+
+ if (HC != correct_HC)
+ inbuf[of] = correct_HC;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * Reverse of fix-up: break LZ4 framing caused to be compatbile with with
+ * buggy Kafka client / broker.
+ *
+ * NOTE: This modifies 'outbuf'.
+ *
+ * Returns an error on failure to recognize format (nothing modified),
+ * else NO_ERROR.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_lz4_compress_break_framing(rd_kafka_broker_t *rkb,
+ char *outbuf,
+ size_t outlen) {
+ static const char magic[4] = {0x04, 0x22, 0x4d, 0x18};
+ uint8_t FLG, HC, bad_HC;
+ size_t of = 4;
+
+ /* Format is:
+ * int32_t magic;
+ * int8_t_ FLG;
+ * int8_t BD;
+ * [ int64_t contentSize; ]
+ * int8_t HC;
+ */
+ if (outlen < 4 + 3 || memcmp(outbuf, magic, 4)) {
+ rd_rkb_dbg(rkb, BROKER, "LZ4FIXDOWN",
+ "Unable to break legacy LZ4 framing "
+ "(%" PRIusz " bytes): invalid length or magic value",
+ outlen);
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ }
+
+ of = 4; /* past magic */
+ FLG = outbuf[of++];
+ of++; /* BD */
+
+ if ((FLG >> 3) & 1) /* contentSize */
+ of += 8;
+
+ if (of >= outlen) {
+ rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP",
+ "Unable to break legacy LZ4 framing "
+ "(%" PRIusz " bytes): requires %" PRIusz " bytes",
+ outlen, of);
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ }
+
+ /* Header hash code */
+ HC = outbuf[of];
+
+ /* Calculate bad header hash code (include magic) */
+ bad_HC = (XXH32(outbuf, of, 0) >> 8) & 0xff;
+
+ if (HC != bad_HC)
+ outbuf[of] = bad_HC;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
/**
 * @brief Decompress LZ4F (framed) data.
 *        Kafka broker versions <0.10.0.0 (MsgVersion 0) breaks LZ4 framing
 *        checksum, if \p proper_hc we assume the checksum is okay
 *        (broker version >=0.10.0, MsgVersion >= 1) else we fix it up.
 *
 * @param rkb broker handle used for stats and debug logging.
 * @param proper_hc non-zero if the frame header checksum is trusted as-is;
 *                  zero to apply the legacy header checksum fix-up first.
 * @param Offset message offset, used only in log messages.
 * @param inbuf compressed input of \p inlen bytes.
 * @param outbuf on success set to a newly allocated buffer that the caller
 *               must free with rd_free(); set to NULL on entry.
 * @param outlenp on success set to the decompressed length.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error code
 *          (in which case *outbuf remains NULL and nothing is allocated
 *          for the caller to free).
 *
 * @remark May modify \p inbuf (if not \p proper_hc)
 */
rd_kafka_resp_err_t rd_kafka_lz4_decompress(rd_kafka_broker_t *rkb,
                                            int proper_hc,
                                            int64_t Offset,
                                            char *inbuf,
                                            size_t inlen,
                                            void **outbuf,
                                            size_t *outlenp) {
        LZ4F_errorCode_t code;
        LZ4F_decompressionContext_t dctx;
        LZ4F_frameInfo_t fi;
        size_t in_sz, out_sz;
        size_t in_of, out_of;
        size_t r;
        size_t estimated_uncompressed_size;
        size_t outlen;
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
        char *out = NULL;

        *outbuf = NULL;

        code = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION);
        if (LZ4F_isError(code)) {
                rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
                           "Unable to create LZ4 decompression context: %s",
                           LZ4F_getErrorName(code));
                return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
        }

        if (!proper_hc) {
                /* The original/legacy LZ4 framing in Kafka was buggy and
                 * calculated the LZ4 framing header hash code (HC) incorrectly.
                 * We do a fix-up of it here. */
                if ((err = rd_kafka_lz4_decompress_fixup_bad_framing(rkb, inbuf,
                                                                     inlen)))
                        goto done;
        }

        /* On return in_sz is updated to the number of header bytes
         * that getFrameInfo consumed; decompression continues from there. */
        in_sz = inlen;
        r = LZ4F_getFrameInfo(dctx, &fi, (const void *)inbuf, &in_sz);
        if (LZ4F_isError(r)) {
                rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
                           "Failed to gather LZ4 frame info: %s",
                           LZ4F_getErrorName(r));
                err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
                goto done;
        }

        /* If uncompressed size is unknown or out of bounds, use a sane
         * default (4x compression) and reallocate if needed
         * More info on max size: http://stackoverflow.com/a/25751871/1821055
         * More info on lz4 compression ratios seen for different data sets:
         * http://dev.ti.com/tirex/content/simplelink_msp432p4_sdk_1_50_00_12/docs/lz4/users_guide/docguide.llQpgm/benchmarking.html
         */
        if (fi.contentSize == 0 || fi.contentSize > inlen * 255) {
                estimated_uncompressed_size = RD_MIN(
                    inlen * 4, (size_t)(rkb->rkb_rk->rk_conf.max_msg_size));
        } else {
                estimated_uncompressed_size = (size_t)fi.contentSize;
        }

        /* Allocate output buffer, we increase this later if needed,
         * but hopefully not. */
        out = rd_malloc(estimated_uncompressed_size);
        if (!out) {
                rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC",
                           "Unable to allocate decompression "
                           "buffer of %" PRIusz " bytes: %s",
                           estimated_uncompressed_size, rd_strerror(errno));
                err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                goto done;
        }


        /* Decompress input buffer to output buffer until input is exhausted. */
        outlen = estimated_uncompressed_size;
        in_of  = in_sz; /* resume right after the header consumed above */
        out_of = 0;
        while (in_of < inlen) {
                /* in_sz/out_sz are in/out parameters: in = space available,
                 * out = bytes actually consumed/produced this iteration. */
                out_sz = outlen - out_of;
                in_sz  = inlen - in_of;
                r = LZ4F_decompress(dctx, out + out_of, &out_sz, inbuf + in_of,
                                    &in_sz, NULL);
                if (unlikely(LZ4F_isError(r))) {
                        rd_rkb_dbg(rkb, MSG, "LZ4DEC",
                                   "Failed to LZ4 (%s HC) decompress message "
                                   "(offset %" PRId64
                                   ") at "
                                   "payload offset %" PRIusz "/%" PRIusz ": %s",
                                   proper_hc ? "proper" : "legacy", Offset,
                                   in_of, inlen, LZ4F_getErrorName(r));
                        err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
                        goto done;
                }

                rd_kafka_assert(NULL, out_of + out_sz <= outlen &&
                                          in_of + in_sz <= inlen);
                out_of += out_sz;
                in_of += in_sz;
                /* A return value of 0 means the frame is fully decoded. */
                if (r == 0)
                        break;

                /* Need to grow output buffer, this shouldn't happen if
                 * contentSize was properly set. */
                if (unlikely(out_of == outlen)) {
                        char *tmp;
                        /* Grow exponentially with some factor > 1 (using 1.75)
                         * for amortized O(1) copying */
                        size_t extra = RD_MAX(outlen * 3 / 4, 1024);

                        rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);

                        if (!(tmp = rd_realloc(out, outlen + extra))) {
                                rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC",
                                           "Unable to grow decompression "
                                           "buffer to %" PRIusz "+%" PRIusz
                                           " bytes: %s",
                                           outlen, extra, rd_strerror(errno));
                                err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
                                goto done;
                        }
                        out = tmp;
                        outlen += extra;
                }
        }


        /* Input not fully consumed but decompression stopped:
         * the payload is truncated or trailing garbage follows the frame. */
        if (in_of < inlen) {
                rd_rkb_dbg(rkb, MSG, "LZ4DEC",
                           "Failed to LZ4 (%s HC) decompress message "
                           "(offset %" PRId64
                           "): "
                           "%" PRIusz " (out of %" PRIusz ") bytes remaining",
                           proper_hc ? "proper" : "legacy", Offset,
                           inlen - in_of, inlen);
                err = RD_KAFKA_RESP_ERR__BAD_MSG;
                goto done;
        }

        *outbuf  = out;
        *outlenp = out_of;

done:
        code = LZ4F_freeDecompressionContext(dctx);
        if (LZ4F_isError(code)) {
                rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR",
                           "Failed to close LZ4 compression context: %s",
                           LZ4F_getErrorName(code));
                err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
        }

        /* On any error path free the partial output so the caller
         * never has to clean up. */
        if (err && out)
                rd_free(out);

        return err;
}
+
+
+/**
+ * Allocate space for \p *outbuf and compress all \p iovlen buffers in \p iov.
+ * @param proper_hc generate a proper HC (checksum) (kafka >=0.10.0.0,
+ * MsgVersion >= 1)
+ * @param MessageSetSize indicates (at least) full uncompressed data size,
+ * possibly including MessageSet fields that will not
+ * be compressed.
+ *
+ * @returns allocated buffer in \p *outbuf, length in \p *outlenp.
+ */
+rd_kafka_resp_err_t rd_kafka_lz4_compress(rd_kafka_broker_t *rkb,
+ int proper_hc,
+ int comp_level,
+ rd_slice_t *slice,
+ void **outbuf,
+ size_t *outlenp) {
+ LZ4F_compressionContext_t cctx;
+ LZ4F_errorCode_t r;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ size_t len = rd_slice_remains(slice);
+ size_t out_sz;
+ size_t out_of = 0;
+ char *out;
+ const void *p;
+ size_t rlen;
+
+ /* Required by Kafka */
+ const LZ4F_preferences_t prefs = {
+ .frameInfo = {.blockMode = LZ4F_blockIndependent},
+ .compressionLevel = comp_level};
+
+ *outbuf = NULL;
+
+ out_sz = LZ4F_compressBound(len, NULL) + 1000;
+ if (LZ4F_isError(out_sz)) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "Unable to query LZ4 compressed size "
+ "(for %" PRIusz " uncompressed bytes): %s",
+ len, LZ4F_getErrorName(out_sz));
+ return RD_KAFKA_RESP_ERR__BAD_MSG;
+ }
+
+ out = rd_malloc(out_sz);
+ if (!out) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "Unable to allocate output buffer "
+ "(%" PRIusz " bytes): %s",
+ out_sz, rd_strerror(errno));
+ return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ }
+
+ r = LZ4F_createCompressionContext(&cctx, LZ4F_VERSION);
+ if (LZ4F_isError(r)) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "Unable to create LZ4 compression context: %s",
+ LZ4F_getErrorName(r));
+ rd_free(out);
+ return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ }
+
+ r = LZ4F_compressBegin(cctx, out, out_sz, &prefs);
+ if (LZ4F_isError(r)) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "Unable to begin LZ4 compression "
+ "(out buffer is %" PRIusz " bytes): %s",
+ out_sz, LZ4F_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ out_of += r;
+
+ while ((rlen = rd_slice_reader(slice, &p))) {
+ rd_assert(out_of < out_sz);
+ r = LZ4F_compressUpdate(cctx, out + out_of, out_sz - out_of, p,
+ rlen, NULL);
+ if (unlikely(LZ4F_isError(r))) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "LZ4 compression failed "
+ "(at of %" PRIusz
+ " bytes, with "
+ "%" PRIusz
+ " bytes remaining in out buffer): "
+ "%s",
+ rlen, out_sz - out_of, LZ4F_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ out_of += r;
+ }
+
+ rd_assert(rd_slice_remains(slice) == 0);
+
+ r = LZ4F_compressEnd(cctx, out + out_of, out_sz - out_of, NULL);
+ if (unlikely(LZ4F_isError(r))) {
+ rd_rkb_dbg(rkb, MSG, "LZ4COMPR",
+ "Failed to finalize LZ4 compression "
+ "of %" PRIusz " bytes: %s",
+ len, LZ4F_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ out_of += r;
+
+ /* For the broken legacy framing we need to mess up the header checksum
+ * so that the Kafka client / broker code accepts it. */
+ if (!proper_hc)
+ if ((err =
+ rd_kafka_lz4_compress_break_framing(rkb, out, out_of)))
+ goto done;
+
+
+ *outbuf = out;
+ *outlenp = out_of;
+
+done:
+ LZ4F_freeCompressionContext(cctx);
+
+ if (err)
+ rd_free(out);
+
+ return err;
+}