diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c | 450 |
1 file changed, 450 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c new file mode 100644 index 000000000..b52108bb1 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c @@ -0,0 +1,450 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_lz4.h" + +#if WITH_LZ4_EXT +#include <lz4frame.h> +#else +#include "lz4frame.h" +#endif +#include "rdxxhash.h" + +#include "rdbuf.h" + +/** + * Fix-up bad LZ4 framing caused by buggy Kafka client / broker. 
+ * The LZ4F framing format is described in detail here: + * https://github.com/lz4/lz4/blob/master/doc/lz4_Frame_format.md + * + * NOTE: This modifies 'inbuf'. + * + * Returns an error on failure to fix (nothing modified), else NO_ERROR. + */ +static rd_kafka_resp_err_t +rd_kafka_lz4_decompress_fixup_bad_framing(rd_kafka_broker_t *rkb, + char *inbuf, + size_t inlen) { + static const char magic[4] = {0x04, 0x22, 0x4d, 0x18}; + uint8_t FLG, HC, correct_HC; + size_t of = 4; + + /* Format is: + * int32_t magic; + * int8_t_ FLG; + * int8_t BD; + * [ int64_t contentSize; ] + * int8_t HC; + */ + if (inlen < 4 + 3 || memcmp(inbuf, magic, 4)) { + rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", + "Unable to fix-up legacy LZ4 framing " + "(%" PRIusz " bytes): invalid length or magic value", + inlen); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + + of = 4; /* past magic */ + FLG = inbuf[of++]; + of++; /* BD */ + + if ((FLG >> 3) & 1) /* contentSize */ + of += 8; + + if (of >= inlen) { + rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", + "Unable to fix-up legacy LZ4 framing " + "(%" PRIusz " bytes): requires %" PRIusz " bytes", + inlen, of); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + + /* Header hash code */ + HC = inbuf[of]; + + /* Calculate correct header hash code */ + correct_HC = (XXH32(inbuf + 4, of - 4, 0) >> 8) & 0xff; + + if (HC != correct_HC) + inbuf[of] = correct_HC; + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * Reverse of fix-up: break LZ4 framing caused to be compatbile with with + * buggy Kafka client / broker. + * + * NOTE: This modifies 'outbuf'. + * + * Returns an error on failure to recognize format (nothing modified), + * else NO_ERROR. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_lz4_compress_break_framing(rd_kafka_broker_t *rkb, + char *outbuf, + size_t outlen) { + static const char magic[4] = {0x04, 0x22, 0x4d, 0x18}; + uint8_t FLG, HC, bad_HC; + size_t of = 4; + + /* Format is: + * int32_t magic; + * int8_t_ FLG; + * int8_t BD; + * [ int64_t contentSize; ] + * int8_t HC; + */ + if (outlen < 4 + 3 || memcmp(outbuf, magic, 4)) { + rd_rkb_dbg(rkb, BROKER, "LZ4FIXDOWN", + "Unable to break legacy LZ4 framing " + "(%" PRIusz " bytes): invalid length or magic value", + outlen); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + + of = 4; /* past magic */ + FLG = outbuf[of++]; + of++; /* BD */ + + if ((FLG >> 3) & 1) /* contentSize */ + of += 8; + + if (of >= outlen) { + rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", + "Unable to break legacy LZ4 framing " + "(%" PRIusz " bytes): requires %" PRIusz " bytes", + outlen, of); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + + /* Header hash code */ + HC = outbuf[of]; + + /* Calculate bad header hash code (include magic) */ + bad_HC = (XXH32(outbuf, of, 0) >> 8) & 0xff; + + if (HC != bad_HC) + outbuf[of] = bad_HC; + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * @brief Decompress LZ4F (framed) data. + * Kafka broker versions <0.10.0.0 (MsgVersion 0) breaks LZ4 framing + * checksum, if \p proper_hc we assume the checksum is okay + * (broker version >=0.10.0, MsgVersion >= 1) else we fix it up. 
+ * + * @remark May modify \p inbuf (if not \p proper_hc) + */ +rd_kafka_resp_err_t rd_kafka_lz4_decompress(rd_kafka_broker_t *rkb, + int proper_hc, + int64_t Offset, + char *inbuf, + size_t inlen, + void **outbuf, + size_t *outlenp) { + LZ4F_errorCode_t code; + LZ4F_decompressionContext_t dctx; + LZ4F_frameInfo_t fi; + size_t in_sz, out_sz; + size_t in_of, out_of; + size_t r; + size_t estimated_uncompressed_size; + size_t outlen; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + char *out = NULL; + + *outbuf = NULL; + + code = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION); + if (LZ4F_isError(code)) { + rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", + "Unable to create LZ4 decompression context: %s", + LZ4F_getErrorName(code)); + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + if (!proper_hc) { + /* The original/legacy LZ4 framing in Kafka was buggy and + * calculated the LZ4 framing header hash code (HC) incorrectly. + * We do a fix-up of it here. */ + if ((err = rd_kafka_lz4_decompress_fixup_bad_framing(rkb, inbuf, + inlen))) + goto done; + } + + in_sz = inlen; + r = LZ4F_getFrameInfo(dctx, &fi, (const void *)inbuf, &in_sz); + if (LZ4F_isError(r)) { + rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", + "Failed to gather LZ4 frame info: %s", + LZ4F_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + /* If uncompressed size is unknown or out of bounds, use a sane + * default (4x compression) and reallocate if needed + * More info on max size: http://stackoverflow.com/a/25751871/1821055 + * More info on lz4 compression ratios seen for different data sets: + * http://dev.ti.com/tirex/content/simplelink_msp432p4_sdk_1_50_00_12/docs/lz4/users_guide/docguide.llQpgm/benchmarking.html + */ + if (fi.contentSize == 0 || fi.contentSize > inlen * 255) { + estimated_uncompressed_size = RD_MIN( + inlen * 4, (size_t)(rkb->rkb_rk->rk_conf.max_msg_size)); + } else { + estimated_uncompressed_size = (size_t)fi.contentSize; + } + + /* Allocate output 
buffer, we increase this later if needed, + * but hopefully not. */ + out = rd_malloc(estimated_uncompressed_size); + if (!out) { + rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC", + "Unable to allocate decompression " + "buffer of %" PRIusz " bytes: %s", + estimated_uncompressed_size, rd_strerror(errno)); + err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + goto done; + } + + + /* Decompress input buffer to output buffer until input is exhausted. */ + outlen = estimated_uncompressed_size; + in_of = in_sz; + out_of = 0; + while (in_of < inlen) { + out_sz = outlen - out_of; + in_sz = inlen - in_of; + r = LZ4F_decompress(dctx, out + out_of, &out_sz, inbuf + in_of, + &in_sz, NULL); + if (unlikely(LZ4F_isError(r))) { + rd_rkb_dbg(rkb, MSG, "LZ4DEC", + "Failed to LZ4 (%s HC) decompress message " + "(offset %" PRId64 + ") at " + "payload offset %" PRIusz "/%" PRIusz ": %s", + proper_hc ? "proper" : "legacy", Offset, + in_of, inlen, LZ4F_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + rd_kafka_assert(NULL, out_of + out_sz <= outlen && + in_of + in_sz <= inlen); + out_of += out_sz; + in_of += in_sz; + if (r == 0) + break; + + /* Need to grow output buffer, this shouldn't happen if + * contentSize was properly set. 
*/ + if (unlikely(out_of == outlen)) { + char *tmp; + /* Grow exponentially with some factor > 1 (using 1.75) + * for amortized O(1) copying */ + size_t extra = RD_MAX(outlen * 3 / 4, 1024); + + rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1); + + if (!(tmp = rd_realloc(out, outlen + extra))) { + rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC", + "Unable to grow decompression " + "buffer to %" PRIusz "+%" PRIusz + " bytes: %s", + outlen, extra, rd_strerror(errno)); + err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + goto done; + } + out = tmp; + outlen += extra; + } + } + + + if (in_of < inlen) { + rd_rkb_dbg(rkb, MSG, "LZ4DEC", + "Failed to LZ4 (%s HC) decompress message " + "(offset %" PRId64 + "): " + "%" PRIusz " (out of %" PRIusz ") bytes remaining", + proper_hc ? "proper" : "legacy", Offset, + inlen - in_of, inlen); + err = RD_KAFKA_RESP_ERR__BAD_MSG; + goto done; + } + + *outbuf = out; + *outlenp = out_of; + +done: + code = LZ4F_freeDecompressionContext(dctx); + if (LZ4F_isError(code)) { + rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", + "Failed to close LZ4 compression context: %s", + LZ4F_getErrorName(code)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + + if (err && out) + rd_free(out); + + return err; +} + + +/** + * Allocate space for \p *outbuf and compress all \p iovlen buffers in \p iov. + * @param proper_hc generate a proper HC (checksum) (kafka >=0.10.0.0, + * MsgVersion >= 1) + * @param MessageSetSize indicates (at least) full uncompressed data size, + * possibly including MessageSet fields that will not + * be compressed. + * + * @returns allocated buffer in \p *outbuf, length in \p *outlenp. 
+ */ +rd_kafka_resp_err_t rd_kafka_lz4_compress(rd_kafka_broker_t *rkb, + int proper_hc, + int comp_level, + rd_slice_t *slice, + void **outbuf, + size_t *outlenp) { + LZ4F_compressionContext_t cctx; + LZ4F_errorCode_t r; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + size_t len = rd_slice_remains(slice); + size_t out_sz; + size_t out_of = 0; + char *out; + const void *p; + size_t rlen; + + /* Required by Kafka */ + const LZ4F_preferences_t prefs = { + .frameInfo = {.blockMode = LZ4F_blockIndependent}, + .compressionLevel = comp_level}; + + *outbuf = NULL; + + out_sz = LZ4F_compressBound(len, NULL) + 1000; + if (LZ4F_isError(out_sz)) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "Unable to query LZ4 compressed size " + "(for %" PRIusz " uncompressed bytes): %s", + len, LZ4F_getErrorName(out_sz)); + return RD_KAFKA_RESP_ERR__BAD_MSG; + } + + out = rd_malloc(out_sz); + if (!out) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "Unable to allocate output buffer " + "(%" PRIusz " bytes): %s", + out_sz, rd_strerror(errno)); + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + r = LZ4F_createCompressionContext(&cctx, LZ4F_VERSION); + if (LZ4F_isError(r)) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "Unable to create LZ4 compression context: %s", + LZ4F_getErrorName(r)); + rd_free(out); + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + r = LZ4F_compressBegin(cctx, out, out_sz, &prefs); + if (LZ4F_isError(r)) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "Unable to begin LZ4 compression " + "(out buffer is %" PRIusz " bytes): %s", + out_sz, LZ4F_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + out_of += r; + + while ((rlen = rd_slice_reader(slice, &p))) { + rd_assert(out_of < out_sz); + r = LZ4F_compressUpdate(cctx, out + out_of, out_sz - out_of, p, + rlen, NULL); + if (unlikely(LZ4F_isError(r))) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "LZ4 compression failed " + "(at of %" PRIusz + " bytes, with " + "%" PRIusz + " bytes remaining in out buffer): " + 
"%s", + rlen, out_sz - out_of, LZ4F_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + out_of += r; + } + + rd_assert(rd_slice_remains(slice) == 0); + + r = LZ4F_compressEnd(cctx, out + out_of, out_sz - out_of, NULL); + if (unlikely(LZ4F_isError(r))) { + rd_rkb_dbg(rkb, MSG, "LZ4COMPR", + "Failed to finalize LZ4 compression " + "of %" PRIusz " bytes: %s", + len, LZ4F_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + out_of += r; + + /* For the broken legacy framing we need to mess up the header checksum + * so that the Kafka client / broker code accepts it. */ + if (!proper_hc) + if ((err = + rd_kafka_lz4_compress_break_framing(rkb, out, out_of))) + goto done; + + + *outbuf = out; + *outlenp = out_of; + +done: + LZ4F_freeCompressionContext(cctx); + + if (err) + rd_free(out); + + return err; +} |