diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c | 450 |
1 file changed, 0 insertions, 450 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c deleted file mode 100644 index b52108bb1..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_lz4.c +++ /dev/null @@ -1,450 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2017 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "rdkafka_int.h" -#include "rdkafka_lz4.h" - -#if WITH_LZ4_EXT -#include <lz4frame.h> -#else -#include "lz4frame.h" -#endif -#include "rdxxhash.h" - -#include "rdbuf.h" - -/** - * Fix-up bad LZ4 framing caused by buggy Kafka client / broker. 
- * The LZ4F framing format is described in detail here: - * https://github.com/lz4/lz4/blob/master/doc/lz4_Frame_format.md - * - * NOTE: This modifies 'inbuf'. - * - * Returns an error on failure to fix (nothing modified), else NO_ERROR. - */ -static rd_kafka_resp_err_t -rd_kafka_lz4_decompress_fixup_bad_framing(rd_kafka_broker_t *rkb, - char *inbuf, - size_t inlen) { - static const char magic[4] = {0x04, 0x22, 0x4d, 0x18}; - uint8_t FLG, HC, correct_HC; - size_t of = 4; - - /* Format is: - * int32_t magic; - * int8_t_ FLG; - * int8_t BD; - * [ int64_t contentSize; ] - * int8_t HC; - */ - if (inlen < 4 + 3 || memcmp(inbuf, magic, 4)) { - rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", - "Unable to fix-up legacy LZ4 framing " - "(%" PRIusz " bytes): invalid length or magic value", - inlen); - return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - } - - of = 4; /* past magic */ - FLG = inbuf[of++]; - of++; /* BD */ - - if ((FLG >> 3) & 1) /* contentSize */ - of += 8; - - if (of >= inlen) { - rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", - "Unable to fix-up legacy LZ4 framing " - "(%" PRIusz " bytes): requires %" PRIusz " bytes", - inlen, of); - return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - } - - /* Header hash code */ - HC = inbuf[of]; - - /* Calculate correct header hash code */ - correct_HC = (XXH32(inbuf + 4, of - 4, 0) >> 8) & 0xff; - - if (HC != correct_HC) - inbuf[of] = correct_HC; - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * Reverse of fix-up: break LZ4 framing caused to be compatbile with with - * buggy Kafka client / broker. - * - * NOTE: This modifies 'outbuf'. - * - * Returns an error on failure to recognize format (nothing modified), - * else NO_ERROR. 
- */ -static rd_kafka_resp_err_t -rd_kafka_lz4_compress_break_framing(rd_kafka_broker_t *rkb, - char *outbuf, - size_t outlen) { - static const char magic[4] = {0x04, 0x22, 0x4d, 0x18}; - uint8_t FLG, HC, bad_HC; - size_t of = 4; - - /* Format is: - * int32_t magic; - * int8_t_ FLG; - * int8_t BD; - * [ int64_t contentSize; ] - * int8_t HC; - */ - if (outlen < 4 + 3 || memcmp(outbuf, magic, 4)) { - rd_rkb_dbg(rkb, BROKER, "LZ4FIXDOWN", - "Unable to break legacy LZ4 framing " - "(%" PRIusz " bytes): invalid length or magic value", - outlen); - return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - } - - of = 4; /* past magic */ - FLG = outbuf[of++]; - of++; /* BD */ - - if ((FLG >> 3) & 1) /* contentSize */ - of += 8; - - if (of >= outlen) { - rd_rkb_dbg(rkb, BROKER, "LZ4FIXUP", - "Unable to break legacy LZ4 framing " - "(%" PRIusz " bytes): requires %" PRIusz " bytes", - outlen, of); - return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - } - - /* Header hash code */ - HC = outbuf[of]; - - /* Calculate bad header hash code (include magic) */ - bad_HC = (XXH32(outbuf, of, 0) >> 8) & 0xff; - - if (HC != bad_HC) - outbuf[of] = bad_HC; - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @brief Decompress LZ4F (framed) data. - * Kafka broker versions <0.10.0.0 (MsgVersion 0) breaks LZ4 framing - * checksum, if \p proper_hc we assume the checksum is okay - * (broker version >=0.10.0, MsgVersion >= 1) else we fix it up. 
- * - * @remark May modify \p inbuf (if not \p proper_hc) - */ -rd_kafka_resp_err_t rd_kafka_lz4_decompress(rd_kafka_broker_t *rkb, - int proper_hc, - int64_t Offset, - char *inbuf, - size_t inlen, - void **outbuf, - size_t *outlenp) { - LZ4F_errorCode_t code; - LZ4F_decompressionContext_t dctx; - LZ4F_frameInfo_t fi; - size_t in_sz, out_sz; - size_t in_of, out_of; - size_t r; - size_t estimated_uncompressed_size; - size_t outlen; - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - char *out = NULL; - - *outbuf = NULL; - - code = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION); - if (LZ4F_isError(code)) { - rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", - "Unable to create LZ4 decompression context: %s", - LZ4F_getErrorName(code)); - return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - } - - if (!proper_hc) { - /* The original/legacy LZ4 framing in Kafka was buggy and - * calculated the LZ4 framing header hash code (HC) incorrectly. - * We do a fix-up of it here. */ - if ((err = rd_kafka_lz4_decompress_fixup_bad_framing(rkb, inbuf, - inlen))) - goto done; - } - - in_sz = inlen; - r = LZ4F_getFrameInfo(dctx, &fi, (const void *)inbuf, &in_sz); - if (LZ4F_isError(r)) { - rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", - "Failed to gather LZ4 frame info: %s", - LZ4F_getErrorName(r)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - goto done; - } - - /* If uncompressed size is unknown or out of bounds, use a sane - * default (4x compression) and reallocate if needed - * More info on max size: http://stackoverflow.com/a/25751871/1821055 - * More info on lz4 compression ratios seen for different data sets: - * http://dev.ti.com/tirex/content/simplelink_msp432p4_sdk_1_50_00_12/docs/lz4/users_guide/docguide.llQpgm/benchmarking.html - */ - if (fi.contentSize == 0 || fi.contentSize > inlen * 255) { - estimated_uncompressed_size = RD_MIN( - inlen * 4, (size_t)(rkb->rkb_rk->rk_conf.max_msg_size)); - } else { - estimated_uncompressed_size = (size_t)fi.contentSize; - } - - /* Allocate output 
buffer, we increase this later if needed, - * but hopefully not. */ - out = rd_malloc(estimated_uncompressed_size); - if (!out) { - rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC", - "Unable to allocate decompression " - "buffer of %" PRIusz " bytes: %s", - estimated_uncompressed_size, rd_strerror(errno)); - err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - goto done; - } - - - /* Decompress input buffer to output buffer until input is exhausted. */ - outlen = estimated_uncompressed_size; - in_of = in_sz; - out_of = 0; - while (in_of < inlen) { - out_sz = outlen - out_of; - in_sz = inlen - in_of; - r = LZ4F_decompress(dctx, out + out_of, &out_sz, inbuf + in_of, - &in_sz, NULL); - if (unlikely(LZ4F_isError(r))) { - rd_rkb_dbg(rkb, MSG, "LZ4DEC", - "Failed to LZ4 (%s HC) decompress message " - "(offset %" PRId64 - ") at " - "payload offset %" PRIusz "/%" PRIusz ": %s", - proper_hc ? "proper" : "legacy", Offset, - in_of, inlen, LZ4F_getErrorName(r)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - goto done; - } - - rd_kafka_assert(NULL, out_of + out_sz <= outlen && - in_of + in_sz <= inlen); - out_of += out_sz; - in_of += in_sz; - if (r == 0) - break; - - /* Need to grow output buffer, this shouldn't happen if - * contentSize was properly set. 
*/ - if (unlikely(out_of == outlen)) { - char *tmp; - /* Grow exponentially with some factor > 1 (using 1.75) - * for amortized O(1) copying */ - size_t extra = RD_MAX(outlen * 3 / 4, 1024); - - rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1); - - if (!(tmp = rd_realloc(out, outlen + extra))) { - rd_rkb_log(rkb, LOG_WARNING, "LZ4DEC", - "Unable to grow decompression " - "buffer to %" PRIusz "+%" PRIusz - " bytes: %s", - outlen, extra, rd_strerror(errno)); - err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - goto done; - } - out = tmp; - outlen += extra; - } - } - - - if (in_of < inlen) { - rd_rkb_dbg(rkb, MSG, "LZ4DEC", - "Failed to LZ4 (%s HC) decompress message " - "(offset %" PRId64 - "): " - "%" PRIusz " (out of %" PRIusz ") bytes remaining", - proper_hc ? "proper" : "legacy", Offset, - inlen - in_of, inlen); - err = RD_KAFKA_RESP_ERR__BAD_MSG; - goto done; - } - - *outbuf = out; - *outlenp = out_of; - -done: - code = LZ4F_freeDecompressionContext(dctx); - if (LZ4F_isError(code)) { - rd_rkb_dbg(rkb, BROKER, "LZ4DECOMPR", - "Failed to close LZ4 compression context: %s", - LZ4F_getErrorName(code)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - } - - if (err && out) - rd_free(out); - - return err; -} - - -/** - * Allocate space for \p *outbuf and compress all \p iovlen buffers in \p iov. - * @param proper_hc generate a proper HC (checksum) (kafka >=0.10.0.0, - * MsgVersion >= 1) - * @param MessageSetSize indicates (at least) full uncompressed data size, - * possibly including MessageSet fields that will not - * be compressed. - * - * @returns allocated buffer in \p *outbuf, length in \p *outlenp. 
- */ -rd_kafka_resp_err_t rd_kafka_lz4_compress(rd_kafka_broker_t *rkb, - int proper_hc, - int comp_level, - rd_slice_t *slice, - void **outbuf, - size_t *outlenp) { - LZ4F_compressionContext_t cctx; - LZ4F_errorCode_t r; - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - size_t len = rd_slice_remains(slice); - size_t out_sz; - size_t out_of = 0; - char *out; - const void *p; - size_t rlen; - - /* Required by Kafka */ - const LZ4F_preferences_t prefs = { - .frameInfo = {.blockMode = LZ4F_blockIndependent}, - .compressionLevel = comp_level}; - - *outbuf = NULL; - - out_sz = LZ4F_compressBound(len, NULL) + 1000; - if (LZ4F_isError(out_sz)) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "Unable to query LZ4 compressed size " - "(for %" PRIusz " uncompressed bytes): %s", - len, LZ4F_getErrorName(out_sz)); - return RD_KAFKA_RESP_ERR__BAD_MSG; - } - - out = rd_malloc(out_sz); - if (!out) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "Unable to allocate output buffer " - "(%" PRIusz " bytes): %s", - out_sz, rd_strerror(errno)); - return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - } - - r = LZ4F_createCompressionContext(&cctx, LZ4F_VERSION); - if (LZ4F_isError(r)) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "Unable to create LZ4 compression context: %s", - LZ4F_getErrorName(r)); - rd_free(out); - return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; - } - - r = LZ4F_compressBegin(cctx, out, out_sz, &prefs); - if (LZ4F_isError(r)) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "Unable to begin LZ4 compression " - "(out buffer is %" PRIusz " bytes): %s", - out_sz, LZ4F_getErrorName(r)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - goto done; - } - - out_of += r; - - while ((rlen = rd_slice_reader(slice, &p))) { - rd_assert(out_of < out_sz); - r = LZ4F_compressUpdate(cctx, out + out_of, out_sz - out_of, p, - rlen, NULL); - if (unlikely(LZ4F_isError(r))) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "LZ4 compression failed " - "(at of %" PRIusz - " bytes, with " - "%" PRIusz - " bytes remaining in out buffer): " - 
"%s", - rlen, out_sz - out_of, LZ4F_getErrorName(r)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - goto done; - } - - out_of += r; - } - - rd_assert(rd_slice_remains(slice) == 0); - - r = LZ4F_compressEnd(cctx, out + out_of, out_sz - out_of, NULL); - if (unlikely(LZ4F_isError(r))) { - rd_rkb_dbg(rkb, MSG, "LZ4COMPR", - "Failed to finalize LZ4 compression " - "of %" PRIusz " bytes: %s", - len, LZ4F_getErrorName(r)); - err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; - goto done; - } - - out_of += r; - - /* For the broken legacy framing we need to mess up the header checksum - * so that the Kafka client / broker code accepts it. */ - if (!proper_hc) - if ((err = - rd_kafka_lz4_compress_break_framing(rkb, out, out_of))) - goto done; - - - *outbuf = out; - *outlenp = out_of; - -done: - LZ4F_freeCompressionContext(cctx); - - if (err) - rd_free(out); - - return err; -} |