summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c226
1 files changed, 226 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c
new file mode 100644
index 000000000..68b01a4e1
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_zstd.c
@@ -0,0 +1,226 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2018 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_zstd.h"
+
+#if WITH_ZSTD_STATIC
+/* Enable advanced/unstable API for initCStream_srcSize */
+#define ZSTD_STATIC_LINKING_ONLY
+#endif
+
+#include <zstd.h>
+#include <zstd_errors.h>
+
+rd_kafka_resp_err_t rd_kafka_zstd_decompress(rd_kafka_broker_t *rkb,
+ char *inbuf,
+ size_t inlen,
+ void **outbuf,
+ size_t *outlenp) {
+ unsigned long long out_bufsize = ZSTD_getFrameContentSize(inbuf, inlen);
+
+ switch (out_bufsize) {
+ case ZSTD_CONTENTSIZE_UNKNOWN:
+ /* Decompressed size cannot be determined, make a guess */
+ out_bufsize = inlen * 2;
+ break;
+ case ZSTD_CONTENTSIZE_ERROR:
+ /* Error calculating frame content size */
+ rd_rkb_dbg(rkb, MSG, "ZSTD",
+ "Unable to begin ZSTD decompression "
+ "(out buffer is %llu bytes): %s",
+ out_bufsize, "Error in determining frame size");
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ default:
+ break;
+ }
+
+ /* Increase output buffer until it can fit the entire result,
+ * capped by message.max.bytes */
+ while (out_bufsize <=
+ (unsigned long long)rkb->rkb_rk->rk_conf.recv_max_msg_size) {
+ size_t ret;
+ char *decompressed;
+
+ decompressed = rd_malloc((size_t)out_bufsize);
+ if (!decompressed) {
+ rd_rkb_dbg(rkb, MSG, "ZSTD",
+ "Unable to allocate output buffer "
+ "(%llu bytes for %" PRIusz
+ " compressed bytes): %s",
+ out_bufsize, inlen, rd_strerror(errno));
+ return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ }
+
+
+ ret = ZSTD_decompress(decompressed, (size_t)out_bufsize, inbuf,
+ inlen);
+ if (!ZSTD_isError(ret)) {
+ *outlenp = ret;
+ *outbuf = decompressed;
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ rd_free(decompressed);
+
+ /* Check if the destination size is too small */
+ if (ZSTD_getErrorCode(ret) == ZSTD_error_dstSize_tooSmall) {
+
+ /* Grow quadratically */
+ out_bufsize += RD_MAX(out_bufsize * 2, 4000);
+
+ rd_atomic64_add(&rkb->rkb_c.zbuf_grow, 1);
+
+ } else {
+ /* Fail on any other error */
+ rd_rkb_dbg(rkb, MSG, "ZSTD",
+ "Unable to begin ZSTD decompression "
+ "(out buffer is %llu bytes): %s",
+ out_bufsize, ZSTD_getErrorName(ret));
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ }
+ }
+
+ rd_rkb_dbg(rkb, MSG, "ZSTD",
+ "Unable to decompress ZSTD "
+ "(input buffer %" PRIusz
+ ", output buffer %llu): "
+ "output would exceed message.max.bytes (%d)",
+ inlen, out_bufsize, rkb->rkb_rk->rk_conf.max_msg_size);
+
+ return RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+}
+
+
+rd_kafka_resp_err_t rd_kafka_zstd_compress(rd_kafka_broker_t *rkb,
+ int comp_level,
+ rd_slice_t *slice,
+ void **outbuf,
+ size_t *outlenp) {
+ ZSTD_CStream *cctx;
+ size_t r;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ size_t len = rd_slice_remains(slice);
+ ZSTD_outBuffer out;
+ ZSTD_inBuffer in;
+
+ *outbuf = NULL;
+ out.pos = 0;
+ out.size = ZSTD_compressBound(len);
+ out.dst = rd_malloc(out.size);
+ if (!out.dst) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "Unable to allocate output buffer "
+ "(%" PRIusz " bytes): %s",
+ out.size, rd_strerror(errno));
+ return RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ }
+
+
+ cctx = ZSTD_createCStream();
+ if (!cctx) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "Unable to create ZSTD compression context");
+ err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE;
+ goto done;
+ }
+
+#if defined(WITH_ZSTD_STATIC) && \
+ ZSTD_VERSION_NUMBER >= (1 * 100 * 100 + 2 * 100 + 1) /* v1.2.1 */
+ r = ZSTD_initCStream_srcSize(cctx, comp_level, len);
+#else
+ /* libzstd not linked statically (or zstd version < 1.2.1):
+ * decompression in consumer may be more costly due to
+ * decompressed size not included in header by librdkafka producer */
+ r = ZSTD_initCStream(cctx, comp_level);
+#endif
+ if (ZSTD_isError(r)) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "Unable to begin ZSTD compression "
+ "(out buffer is %" PRIusz " bytes): %s",
+ out.size, ZSTD_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ while ((in.size = rd_slice_reader(slice, &in.src))) {
+ in.pos = 0;
+ r = ZSTD_compressStream(cctx, &out, &in);
+ if (unlikely(ZSTD_isError(r))) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "ZSTD compression failed "
+ "(at of %" PRIusz
+ " bytes, with "
+ "%" PRIusz
+ " bytes remaining in out buffer): "
+ "%s",
+ in.size, out.size - out.pos,
+ ZSTD_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ /* No space left in output buffer,
+ * but input isn't fully consumed */
+ if (in.pos < in.size) {
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+ }
+
+ if (rd_slice_remains(slice) != 0) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "Failed to finalize ZSTD compression "
+ "of %" PRIusz " bytes: %s",
+ len, "Unexpected trailing data");
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ r = ZSTD_endStream(cctx, &out);
+ if (unlikely(ZSTD_isError(r) || r > 0)) {
+ rd_rkb_dbg(rkb, MSG, "ZSTDCOMPR",
+ "Failed to finalize ZSTD compression "
+ "of %" PRIusz " bytes: %s",
+ len, ZSTD_getErrorName(r));
+ err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION;
+ goto done;
+ }
+
+ *outbuf = out.dst;
+ *outlenp = out.pos;
+
+done:
+ if (cctx)
+ ZSTD_freeCStream(cctx);
+
+ if (err)
+ rd_free(out.dst);
+
+ return err;
+}