From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c | 226 +++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c') diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c new file mode 100644 index 000000000..68b01a4e1 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c @@ -0,0 +1,226 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2018 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_zstd.h" + +#if WITH_ZSTD_STATIC +/* Enable advanced/unstable API for initCStream_srcSize */ +#define ZSTD_STATIC_LINKING_ONLY +#endif + +#include +#include + +rd_kafka_resp_err_t rd_kafka_zstd_decompress(rd_kafka_broker_t *rkb, + char *inbuf, + size_t inlen, + void **outbuf, + size_t *outlenp) { + unsigned long long out_bufsize = ZSTD_getFrameContentSize(inbuf, inlen); + + switch (out_bufsize) { + case ZSTD_CONTENTSIZE_UNKNOWN: + /* Decompressed size cannot be determined, make a guess */ + out_bufsize = inlen * 2; + break; + case ZSTD_CONTENTSIZE_ERROR: + /* Error calculating frame content size */ + rd_rkb_dbg(rkb, MSG, "ZSTD", + "Unable to begin ZSTD decompression " + "(out buffer is %llu bytes): %s", + out_bufsize, "Error in determining frame size"); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + default: + break; + } + + /* Increase output buffer until it can fit the entire result, + * capped by message.max.bytes */ + while (out_bufsize <= + (unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size) { + size_t ret; + char *decompressed; + + decompressed = rd_malloc((size_t)out_bufsize); + if (!decompressed) { + rd_rkb_dbg(rkb, MSG, "ZSTD", + "Unable to allocate output buffer " + "(%llu bytes for %" PRIusz + " compressed bytes): %s", + out_bufsize, inlen, rd_strerror(errno)); + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + + ret = ZSTD_decompress(decompressed, (size_t)out_bufsize, inbuf, + inlen); + if (!ZSTD_isError(ret)) { + *outlenp = ret; + *outbuf = decompressed; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_free(decompressed); + + /* Check if the destination size is too small */ + if (ZSTD_getErrorCode(ret) == ZSTD_error_dstSize_tooSmall) { + + /* Grow quadratically */ + out_bufsize += RD_MAX(out_bufsize * 2, 4000); + + rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1); + + } else { + /* Fail on any other error */ + rd_rkb_dbg(rkb, MSG, "ZSTD", + "Unable to begin ZSTD decompression " + "(out buffer is %llu bytes): %s", + out_bufsize, ZSTD_getErrorName(ret)); + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + } + } + + rd_rkb_dbg(rkb, MSG, "ZSTD", + "Unable to decompress ZSTD " + "(input buffer %" PRIusz + ", output buffer %llu): " + "output would exceed message.max.bytes (%d)", + inlen, out_bufsize, rkb->rkb_rk->rk_conf.max_msg_size); + + return RD_KAFKA_RESP_ERR__BAD_COMPRESSION; +} + + +rd_kafka_resp_err_t rd_kafka_zstd_compress(rd_kafka_broker_t *rkb, + int comp_level, + rd_slice_t *slice, + void **outbuf, + size_t *outlenp) { + ZSTD_CStream *cctx; + size_t r; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + size_t len = rd_slice_remains(slice); + ZSTD_outBuffer out; + ZSTD_inBuffer in; + + *outbuf = NULL; + out.pos = 0; + out.size = ZSTD_compressBound(len); + out.dst = rd_malloc(out.size); + if (!out.dst) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "Unable to allocate output buffer " + "(%" PRIusz " bytes): %s", + out.size, rd_strerror(errno)); + return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + } + + + cctx = ZSTD_createCStream(); + if (!cctx) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "Unable to create ZSTD compression context"); + err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + goto done; + } + +#if defined(WITH_ZSTD_STATIC) && \ + ZSTD_VERSION_NUMBER >= (1 * 100 * 100 + 2 * 100 + 1) /* v1.2.1 */ + r = ZSTD_initCStream_srcSize(cctx, comp_level, len); +#else + /* libzstd not linked statically (or zstd version < 1.2.1): + * decompression in consumer may be more costly due to + * decompressed size not included in header by librdkafka producer */ + r = ZSTD_initCStream(cctx, comp_level); +#endif + if (ZSTD_isError(r)) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "Unable to begin ZSTD compression " + "(out buffer is %" PRIusz " bytes): %s", + out.size, ZSTD_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + while ((in.size = rd_slice_reader(slice, &in.src))) { + in.pos = 0; + r = ZSTD_compressStream(cctx, &out, &in); + if (unlikely(ZSTD_isError(r))) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "ZSTD compression failed " + "(at of %" PRIusz + " bytes, with " + "%" PRIusz + " bytes remaining in out buffer): " + "%s", + in.size, out.size - out.pos, + ZSTD_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + /* No space left in output buffer, + * but input isn't fully consumed */ + if (in.pos < in.size) { + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + } + + if (rd_slice_remains(slice) != 0) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "Failed to finalize ZSTD compression " + "of %" PRIusz " bytes: %s", + len, "Unexpected trailing data"); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + r = ZSTD_endStream(cctx, &out); + if (unlikely(ZSTD_isError(r) || r > 0)) { + rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR", + "Failed to finalize ZSTD compression " + "of %" PRIusz " bytes: %s", + len, ZSTD_getErrorName(r)); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto done; + } + + *outbuf = out.dst; + *outlenp = out.pos; + +done: + if (cctx) + ZSTD_freeCStream(cctx); + + if (err) + rd_free(out.dst); + + return err; +} -- cgit v1.2.3