summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c1445
1 files changed, 1445 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c
new file mode 100644
index 000000000..beb36bfac
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c
@@ -0,0 +1,1445 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rd.h"
+#include "rdkafka_int.h"
+#include "rdkafka_msg.h"
+#include "rdkafka_msgset.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_header.h"
+#include "rdkafka_lz4.h"
+
+#if WITH_ZSTD
+#include "rdkafka_zstd.h"
+#endif
+
+#include "snappy.h"
+#include "rdvarint.h"
+#include "crc32c.h"
+
+
+/** @brief The maximum ProduceRequest ApiVersion supported by librdkafka */
+static const int16_t rd_kafka_ProduceRequest_max_version = 7;
+
+
/**
 * @brief State for constructing a single ProduceRequest containing one
 *        MessageSet (record batch) for one partition on one broker.
 *
 * Lifetime: stack-allocated by the caller, initialized by
 * rd_kafka_msgset_writer_init() and torn down when the backing
 * buffer (msetw_rkbuf) is finalized or destroyed.
 */
typedef struct rd_kafka_msgset_writer_s {
        rd_kafka_buf_t *msetw_rkbuf; /* Backing store buffer (refcounted)*/

        int16_t msetw_ApiVersion; /* ProduceRequest ApiVersion */
        int msetw_MsgVersion;     /* MsgVersion (magic byte) to construct:
                                   * 0, 1 or 2 */
        int msetw_features;       /* Protocol features to use */
        rd_kafka_compression_t msetw_compression; /**< Compression type */
        int msetw_msgcntmax;      /* Max number of messages to send
                                   * in a batch. */
        size_t msetw_messages_len; /* Total size of Messages, with Message
                                    * framing but without
                                    * MessageSet header */
        size_t msetw_messages_kvlen; /* Total size of Message keys
                                      * and values */

        size_t msetw_MessageSetSize;    /* Current MessageSetSize value */
        size_t msetw_of_MessageSetSize; /* offset of MessageSetSize */
        size_t msetw_of_start;          /* offset of MessageSet */

        int msetw_relative_offsets; /* Bool: use relative offsets */

        /* For MessageSet v2 */
        int msetw_Attributes;       /* MessageSet Attributes */
        int64_t msetw_MaxTimestamp; /* Maximum timestamp in batch */
        size_t msetw_of_CRC;        /* offset of MessageSet.CRC */

        rd_kafka_msgbatch_t *msetw_batch; /**< Convenience pointer to
                                           * rkbuf_u.Produce.batch */

        /* First message information */
        struct {
                size_t of; /* rkbuf's first message position */
                int64_t timestamp;
        } msetw_firstmsg;

        rd_kafka_pid_t msetw_pid;     /**< Idempotent producer's
                                       *   current Producer Id */
        rd_kafka_broker_t *msetw_rkb; /* @warning Not a refcounted
                                       *          reference! */
        rd_kafka_toppar_t *msetw_rktp; /* @warning Not a refcounted
                                        *          reference! */
        rd_kafka_msgq_t *msetw_msgq; /**< Input message queue */
} rd_kafka_msgset_writer_t;
+
+
+
/**
 * @brief Select ApiVersion and MsgVersion to use based on broker's
 *        feature compatibility.
 *
 * Also selects (or disables) the compression type depending on what
 * both the client build and the broker support, and sets
 * msetw_relative_offsets for MsgVersions that use OffsetDelta.
 *
 * @returns -1 if a MsgVersion (or ApiVersion) could not be selected, else 0.
 * @locality broker thread
 */
static RD_INLINE int
rd_kafka_msgset_writer_select_MsgVersion(rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        const int16_t max_ApiVersion = rd_kafka_ProduceRequest_max_version;
        int16_t min_ApiVersion = 0;
        int feature;
        /* Map compression types to required broker feature and the minimum
         * ProduceRequest ApiVersion each compression type requires.
         * Types not listed require no feature and ApiVersion 0. */
        static const struct {
                int feature;
                int16_t ApiVersion;
        } compr_req[RD_KAFKA_COMPRESSION_NUM] = {
            [RD_KAFKA_COMPRESSION_LZ4] = {RD_KAFKA_FEATURE_LZ4, 0},
#if WITH_ZSTD
            [RD_KAFKA_COMPRESSION_ZSTD] = {RD_KAFKA_FEATURE_ZSTD, 7},
#endif
        };

        /* Pick the highest MsgVersion the broker supports; each
         * MsgVersion implies a minimum ProduceRequest ApiVersion. */
        if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)) {
                min_ApiVersion = 3;
                msetw->msetw_MsgVersion = 2;
                msetw->msetw_features |= feature;
        } else if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)) {
                min_ApiVersion = 2;
                msetw->msetw_MsgVersion = 1;
                msetw->msetw_features |= feature;
        } else {
                if ((feature =
                         rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)) {
                        min_ApiVersion = 1;
                        msetw->msetw_features |= feature;
                } else
                        min_ApiVersion = 0;
                msetw->msetw_MsgVersion = 0;
        }

        msetw->msetw_compression = rktp->rktp_rkt->rkt_conf.compression_codec;

        /*
         * Check that the configured compression type is supported
         * by both client and broker, else disable compression.
         */
        if (msetw->msetw_compression &&
            (rd_kafka_broker_ApiVersion_supported(
                 rkb, RD_KAFKAP_Produce, 0,
                 compr_req[msetw->msetw_compression].ApiVersion, NULL) == -1 ||
             (compr_req[msetw->msetw_compression].feature &&
              !(msetw->msetw_rkb->rkb_features &
                compr_req[msetw->msetw_compression].feature)))) {
                /* Log at NOTICE level, but rate-limited to at most once
                 * per day; otherwise only emit a debug message. */
                if (unlikely(
                        rd_interval(&rkb->rkb_suppress.unsupported_compression,
                                    /* at most once per day */
                                    (rd_ts_t)86400 * 1000 * 1000, 0) > 0))
                        rd_rkb_log(
                            rkb, LOG_NOTICE, "COMPRESSION",
                            "%.*s [%" PRId32
                            "]: "
                            "Broker does not support compression "
                            "type %s: not compressing batch",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_compression2str(msetw->msetw_compression));
                else
                        rd_rkb_dbg(
                            rkb, MSG, "PRODUCE",
                            "%.*s [%" PRId32
                            "]: "
                            "Broker does not support compression "
                            "type %s: not compressing batch",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_compression2str(msetw->msetw_compression));

                msetw->msetw_compression = RD_KAFKA_COMPRESSION_NONE;
        } else {
                /* Broker supports this compression type. */
                msetw->msetw_features |=
                    compr_req[msetw->msetw_compression].feature;

                /* The compression type may require a newer ApiVersion
                 * than the MsgVersion alone does (e.g., ZSTD needs v7). */
                if (min_ApiVersion <
                    compr_req[msetw->msetw_compression].ApiVersion)
                        min_ApiVersion =
                            compr_req[msetw->msetw_compression].ApiVersion;
        }

        /* MsgVersion specific setup. */
        switch (msetw->msetw_MsgVersion) {
        case 2:
                msetw->msetw_relative_offsets = 1; /* OffsetDelta */
                break;
        case 1:
                /* MsgVer1 only uses relative offsets inside
                 * compressed messagesets. */
                if (msetw->msetw_compression != RD_KAFKA_COMPRESSION_NONE)
                        msetw->msetw_relative_offsets = 1;
                break;
        }

        /* Set the highest ApiVersion supported by us and broker */
        msetw->msetw_ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_Produce, min_ApiVersion, max_ApiVersion, NULL);

        if (msetw->msetw_ApiVersion == -1) {
                rd_kafka_msg_t *rkm;
                /* This will only happen if the broker reports none, or
                 * no matching ProduceRequest versions, which should never
                 * happen. */
                rd_rkb_log(rkb, LOG_ERR, "PRODUCE",
                           "%.*s [%" PRId32
                           "]: "
                           "No viable ProduceRequest ApiVersions (v%d..%d) "
                           "supported by broker: unable to produce",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, min_ApiVersion,
                           max_ApiVersion);

                /* Back off and retry in 5s */
                rkm = rd_kafka_msgq_first(msetw->msetw_msgq);
                rd_assert(rkm);
                rkm->rkm_u.producer.ts_backoff = rd_clock() + (5 * 1000 * 1000);
                return -1;
        }

        /* It should not be possible to get a lower version than requested,
         * otherwise the logic in this function is buggy. */
        rd_assert(msetw->msetw_ApiVersion >= min_ApiVersion);

        return 0;
}
+
+
/**
 * @brief Allocate buffer for messageset writer based on a previously set
 *        up \p msetw.
 *
 * Allocate iovecs to hold all headers and messages,
 * and allocate enough space to allow copies of small messages.
 * The allocated size is the minimum of message.max.bytes
 * or queued_bytes + msgcntmax * msg_overhead
 */
static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
        size_t msg_overhead = 0;
        size_t hdrsize = 0;
        size_t msgsetsize = 0;
        size_t bufsize;

        rd_kafka_assert(NULL, !msetw->msetw_rkbuf);

        /* Calculate worst-case buffer size, produce header size,
         * message size, etc, this isn't critical but avoids unnecessary
         * extra allocations. The buffer will grow as needed if we get
         * this wrong.
         *
         * ProduceRequest headers go in one iovec:
         *  ProduceRequest v0..2:
         *    RequiredAcks + Timeout +
         *    [Topic + [Partition + MessageSetSize]]
         *
         *  ProduceRequest v3:
         *    TransactionalId + RequiredAcks + Timeout +
         *    [Topic + [Partition + MessageSetSize + MessageSet]]
         */

        /*
         * ProduceRequest header sizes
         */
        switch (msetw->msetw_ApiVersion) {
        case 7:
        case 6:
        case 5:
        case 4:
        case 3:
                /* Add TransactionalId */
                hdrsize += RD_KAFKAP_STR_SIZE(rk->rk_eos.transactional_id);
                /* FALLTHRU */
        case 0:
        case 1:
        case 2:
                hdrsize +=
                    /* RequiredAcks + Timeout + TopicCnt */
                    2 + 4 + 4 +
                    /* Topic */
                    RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->rktp_rkt->rkt_topic) +
                    /* PartitionCnt + Partition + MessageSetSize */
                    4 + 4 + 4;
                msgsetsize += 4; /* MessageSetSize */
                break;

        default:
                RD_NOTREACHED();
        }

        /*
         * MsgVersion specific sizes:
         * - (Worst-case) Message overhead: message fields
         * - MessageSet header size
         */
        switch (msetw->msetw_MsgVersion) {
        case 0:
                /* MsgVer0 */
                msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD;
                break;
        case 1:
                /* MsgVer1 */
                msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD;
                break;

        case 2:
                /* MsgVer2 uses varints, we calculate for the worst-case. */
                msg_overhead += RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD;

                /* MessageSet header fields */
                msgsetsize += 8 /* BaseOffset */ + 4 /* Length */ +
                              4 /* PartitionLeaderEpoch */ +
                              1 /* Magic (MsgVersion) */ +
                              4 /* CRC (CRC32C) */ + 2 /* Attributes */ +
                              4 /* LastOffsetDelta */ + 8 /* BaseTimestamp */ +
                              8 /* MaxTimestamp */ + 8 /* ProducerId */ +
                              2 /* ProducerEpoch */ + 4 /* BaseSequence */ +
                              4 /* RecordCount */;
                break;

        default:
                RD_NOTREACHED();
        }

        /*
         * Calculate total buffer size to allocate
         */
        bufsize = hdrsize + msgsetsize;

        /* If copying for small payloads is enabled, allocate enough
         * space for each message to be copied based on this limit.
         */
        if (rk->rk_conf.msg_copy_max_size > 0) {
                size_t queued_bytes = rd_kafka_msgq_size(msetw->msetw_msgq);
                bufsize +=
                    RD_MIN(queued_bytes, (size_t)rk->rk_conf.msg_copy_max_size *
                                             msetw->msetw_msgcntmax);
        }

        /* Add estimated per-message overhead */
        bufsize += msg_overhead * msetw->msetw_msgcntmax;

        /* Cap allocation at message.max.bytes */
        if (bufsize > (size_t)rk->rk_conf.max_msg_size)
                bufsize = (size_t)rk->rk_conf.max_msg_size;

        /*
         * Allocate iovecs to hold all headers and messages,
         * and allocate auxiliary space for message headers, etc.
         */
        msetw->msetw_rkbuf =
            rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce,
                                     msetw->msetw_msgcntmax / 2 + 10, bufsize);

        rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf, msetw->msetw_ApiVersion,
                                    msetw->msetw_features);
}
+
+
/**
 * @brief Write the MessageSet (record batch) v2 header.
 *
 * Most fields are written as placeholders (0 or -1) and are finalized
 * once all messages have been written, since their values (Length, CRC,
 * LastOffsetDelta, timestamps, RecordCount, etc.) depend on the full
 * batch contents.
 *
 * @remark Must only be called for MsgVersion 2
 */
static void rd_kafka_msgset_writer_write_MessageSet_v2_header(
    rd_kafka_msgset_writer_t *msetw) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;

        rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
        rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2);

        /* BaseOffset (also store the offset to the start of
         * the messageset header fields) */
        msetw->msetw_of_start = rd_kafka_buf_write_i64(rkbuf, 0);

        /* Length: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* PartitionLeaderEpoch (KIP-101) */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* Magic (MsgVersion) */
        rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);

        /* CRC (CRC32C): updated later.
         * CRC needs to be done after the entire messageset+messages has
         * been constructed and the following header fields updated. :(
         * Save the offset for this position. so it can be updated later. */
        msetw->msetw_of_CRC = rd_kafka_buf_write_i32(rkbuf, 0);

        /* Attributes: updated later */
        rd_kafka_buf_write_i16(rkbuf, 0);

        /* LastOffsetDelta: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);

        /* BaseTimestamp: updated later */
        rd_kafka_buf_write_i64(rkbuf, 0);

        /* MaxTimestamp: updated later */
        rd_kafka_buf_write_i64(rkbuf, 0);

        /* ProducerId */
        rd_kafka_buf_write_i64(rkbuf, msetw->msetw_pid.id);

        /* ProducerEpoch */
        rd_kafka_buf_write_i16(rkbuf, msetw->msetw_pid.epoch);

        /* BaseSequence: updated later in case of Idempotent Producer */
        rd_kafka_buf_write_i32(rkbuf, -1);

        /* RecordCount: updated later */
        rd_kafka_buf_write_i32(rkbuf, 0);
}
+
+
/**
 * @brief Write ProduceRequest headers.
 *        When this function returns the msgset is ready for
 *        writing individual messages.
 *        msetw_MessageSetSize will have been set to the messageset header.
 *
 * @remark Field order here must exactly match the Kafka protocol's
 *         ProduceRequest layout for the selected ApiVersion.
 */
static void
rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) {

        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
        rd_kafka_topic_t *rkt = msetw->msetw_rktp->rktp_rkt;

        /* V3: TransactionalId */
        if (msetw->msetw_ApiVersion >= 3)
                rd_kafka_buf_write_kstr(rkbuf, rk->rk_eos.transactional_id);

        /* RequiredAcks */
        rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks);

        /* Timeout */
        rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms);

        /* TopicArrayCnt: this request always carries a single topic */
        rd_kafka_buf_write_i32(rkbuf, 1);

        /* Insert topic */
        rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic);

        /* PartitionArrayCnt: and a single partition */
        rd_kafka_buf_write_i32(rkbuf, 1);

        /* Partition */
        rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition);

        /* MessageSetSize: Will be finalized later*/
        msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0);

        if (msetw->msetw_MsgVersion == 2) {
                /* MessageSet v2 header */
                rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw);
                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE;
        } else {
                /* Older MessageSet */
                msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE;
        }
}
+
+
/**
 * @brief Initialize a ProduceRequest MessageSet writer for
 *        the given broker and partition.
 *
 *        A new buffer will be allocated to fit the pending messages in queue.
 *
 * @param msetw Writer state to initialize (zeroed here first).
 * @param rkb Destination broker (not refcounted by the writer).
 * @param rktp Destination partition (not refcounted by the writer).
 * @param rkmq Input message queue to drain from.
 * @param pid Idempotent producer's current PID.
 * @param epoch_base_msgid Base msgid for the current PID epoch.
 *
 * @returns the number of messages to enqueue, 0 if the queue is empty,
 *          or -1 if no usable ApiVersion/MsgVersion could be selected.
 *
 * @remark This currently constructs the entire ProduceRequest, containing
 *         a single outer MessageSet for a single partition.
 *
 * @locality broker thread
 */
static int rd_kafka_msgset_writer_init(rd_kafka_msgset_writer_t *msetw,
                                       rd_kafka_broker_t *rkb,
                                       rd_kafka_toppar_t *rktp,
                                       rd_kafka_msgq_t *rkmq,
                                       rd_kafka_pid_t pid,
                                       uint64_t epoch_base_msgid) {
        int msgcnt = rd_kafka_msgq_len(rkmq);

        if (msgcnt == 0)
                return 0;

        memset(msetw, 0, sizeof(*msetw));

        msetw->msetw_rktp = rktp;
        msetw->msetw_rkb = rkb;
        msetw->msetw_msgq = rkmq;
        msetw->msetw_pid = pid;

        /* Max number of messages to send in a batch,
         * limited by current queue size or configured batch size,
         * whichever is lower. */
        msetw->msetw_msgcntmax =
            RD_MIN(msgcnt, rkb->rkb_rk->rk_conf.batch_num_messages);
        rd_dassert(msetw->msetw_msgcntmax > 0);

        /* Select MsgVersion to use */
        if (rd_kafka_msgset_writer_select_MsgVersion(msetw) == -1)
                return -1;

        /* Allocate backing buffer */
        rd_kafka_msgset_writer_alloc_buf(msetw);

        /* Construct first part of Produce header + MessageSet header */
        rd_kafka_msgset_writer_write_Produce_header(msetw);

        /* The current buffer position is now where the first message
         * is located.
         * Record the current buffer position so it can be rewound later
         * in case of compression. */
        msetw->msetw_firstmsg.of =
            rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);

        rd_kafka_msgbatch_init(&msetw->msetw_rkbuf->rkbuf_u.Produce.batch, rktp,
                               pid, epoch_base_msgid);
        msetw->msetw_batch = &msetw->msetw_rkbuf->rkbuf_u.Produce.batch;

        return msetw->msetw_msgcntmax;
}
+
+
+
+/**
+ * @brief Copy or link message payload to buffer.
+ */
+static RD_INLINE void
+rd_kafka_msgset_writer_write_msg_payload(rd_kafka_msgset_writer_t *msetw,
+ const rd_kafka_msg_t *rkm,
+ void (*free_cb)(void *)) {
+ const rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk;
+ rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
+
+ /* If payload is below the copy limit and there is still
+ * room in the buffer we'll copy the payload to the buffer,
+ * otherwise we push a reference to the memory. */
+ if (rkm->rkm_len <= (size_t)rk->rk_conf.msg_copy_max_size &&
+ rd_buf_write_remains(&rkbuf->rkbuf_buf) > rkm->rkm_len) {
+ rd_kafka_buf_write(rkbuf, rkm->rkm_payload, rkm->rkm_len);
+ if (free_cb)
+ free_cb(rkm->rkm_payload);
+ } else
+ rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len,
+ free_cb);
+}
+
+
/**
 * @brief Write message headers to buffer.
 *
 * Each header is serialized as:
 *   NameLen(varint) + Name + ValueLen(varint, -1 for NULL) + Value.
 *
 * @remark The enveloping HeaderCount varint must already have been written.
 * @remark NOTE(review): for a NULL header value the Value bytes are still
 *         written with rkhdr_value_size — presumably 0 in that case so
 *         nothing is emitted; verify against rd_kafka_header_add().
 * @returns the number of bytes written to msetw->msetw_rkbuf
 */
static size_t
rd_kafka_msgset_writer_write_msg_headers(rd_kafka_msgset_writer_t *msetw,
                                         const rd_kafka_headers_t *hdrs) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        const rd_kafka_header_t *hdr;
        int i;
        size_t start_pos = rd_buf_write_pos(&rkbuf->rkbuf_buf);
        size_t written;

        RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) {
                rd_kafka_buf_write_varint(rkbuf, hdr->rkhdr_name_size);
                rd_kafka_buf_write(rkbuf, hdr->rkhdr_name,
                                   hdr->rkhdr_name_size);
                rd_kafka_buf_write_varint(
                    rkbuf,
                    hdr->rkhdr_value ? (int64_t)hdr->rkhdr_value_size : -1);
                rd_kafka_buf_write(rkbuf, hdr->rkhdr_value,
                                   hdr->rkhdr_value_size);
        }

        written = rd_buf_write_pos(&rkbuf->rkbuf_buf) - start_pos;
        /* Cross-check against the pre-computed serialized size. */
        rd_dassert(written == hdrs->rkhdrs_ser_size);

        return written;
}
+
+
+
/**
 * @brief Write message to messageset buffer with MsgVersion 0 or 1.
 *
 * Layout: Offset + MessageSize + Crc + MagicByte + Attributes +
 *         [Timestamp (v1)] + Key + Value.
 *
 * The CRC covers everything after the Crc field itself and is computed
 * incrementally over all buffer writes between crc_init and crc_finalize,
 * then patched back in at of_Crc.
 *
 * @returns the number of bytes written.
 */
static size_t
rd_kafka_msgset_writer_write_msg_v0_1(rd_kafka_msgset_writer_t *msetw,
                                      rd_kafka_msg_t *rkm,
                                      int64_t Offset,
                                      int8_t MsgAttributes,
                                      void (*free_cb)(void *)) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        size_t MessageSize;
        size_t of_Crc;

        /*
         * MessageSet's (v0 and v1) per-Message header.
         */

        /* Offset (only relevant for compressed messages on MsgVersion v1) */
        rd_kafka_buf_write_i64(rkbuf, Offset);

        /* MessageSize */
        MessageSize = 4 + 1 + 1 + /* Crc+MagicByte+Attributes */
                      4 /* KeyLength */ + rkm->rkm_key_len +
                      4 /* ValueLength */ + rkm->rkm_len;

        if (msetw->msetw_MsgVersion == 1)
                MessageSize += 8; /* Timestamp i64 */

        rd_kafka_buf_write_i32(rkbuf, (int32_t)MessageSize);

        /*
         * Message
         */
        /* Crc: will be updated later */
        of_Crc = rd_kafka_buf_write_i32(rkbuf, 0);

        /* Start Crc calculation of all buf writes. */
        rd_kafka_buf_crc_init(rkbuf);

        /* MagicByte */
        rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion);

        /* Attributes */
        rd_kafka_buf_write_i8(rkbuf, MsgAttributes);

        /* V1: Timestamp */
        if (msetw->msetw_MsgVersion == 1)
                rd_kafka_buf_write_i64(rkbuf, rkm->rkm_timestamp);

        /* Message Key */
        rd_kafka_buf_write_bytes(rkbuf, rkm->rkm_key, rkm->rkm_key_len);

        /* Write or copy Value/payload */
        if (rkm->rkm_payload) {
                rd_kafka_buf_write_i32(rkbuf, (int32_t)rkm->rkm_len);
                rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);
        } else
                rd_kafka_buf_write_i32(rkbuf, RD_KAFKAP_BYTES_LEN_NULL);

        /* Finalize Crc */
        rd_kafka_buf_update_u32(rkbuf, of_Crc,
                                rd_kafka_buf_crc_finalize(rkbuf));


        /* Return written message size */
        return 8 /*Offset*/ + 4 /*MessageSize*/ + MessageSize;
}
+
/**
 * @brief Write message to messageset buffer with MsgVersion 2.
 *
 * Record layout (all integers varint-encoded unless noted):
 *   Length + Attributes(i8) + TimestampDelta + OffsetDelta +
 *   KeyLen + Key + ValueLen + Value + HeaderCount + Headers.
 *
 * All varints except Length are pre-encoded into stack buffers first,
 * since Length must include their encoded widths before anything is
 * written to the buffer.
 *
 * @returns the number of bytes written.
 */
static size_t
rd_kafka_msgset_writer_write_msg_v2(rd_kafka_msgset_writer_t *msetw,
                                    rd_kafka_msg_t *rkm,
                                    int64_t Offset,
                                    int8_t MsgAttributes,
                                    void (*free_cb)(void *)) {
        rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
        size_t MessageSize = 0;
        char varint_Length[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
        char varint_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)];
        char varint_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)];
        char varint_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)];
        size_t sz_Length;
        size_t sz_TimestampDelta;
        size_t sz_OffsetDelta;
        size_t sz_KeyLen;
        size_t sz_ValueLen;
        size_t sz_HeaderCount;
        int HeaderCount = 0;
        size_t HeaderSize = 0;

        if (rkm->rkm_headers) {
                HeaderCount = rkm->rkm_headers->rkhdrs_list.rl_cnt;
                HeaderSize = rkm->rkm_headers->rkhdrs_ser_size;
        }

        /* All varints, except for Length, needs to be pre-built
         * so that the Length field can be set correctly and thus have
         * correct varint encoded width. */

        /* Timestamps are relative to the batch's first message. */
        sz_TimestampDelta = rd_uvarint_enc_i64(
            varint_TimestampDelta, sizeof(varint_TimestampDelta),
            rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp);
        sz_OffsetDelta = rd_uvarint_enc_i64(varint_OffsetDelta,
                                            sizeof(varint_OffsetDelta), Offset);
        sz_KeyLen = rd_uvarint_enc_i32(varint_KeyLen, sizeof(varint_KeyLen),
                                       rkm->rkm_key
                                           ? (int32_t)rkm->rkm_key_len
                                           : (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
        sz_ValueLen = rd_uvarint_enc_i32(
            varint_ValueLen, sizeof(varint_ValueLen),
            rkm->rkm_payload ? (int32_t)rkm->rkm_len
                             : (int32_t)RD_KAFKAP_BYTES_LEN_NULL);
        sz_HeaderCount =
            rd_uvarint_enc_i32(varint_HeaderCount, sizeof(varint_HeaderCount),
                               (int32_t)HeaderCount);

        /* Calculate MessageSize without length of Length (added later)
         * to store it in Length. */
        MessageSize = 1 /* MsgAttributes */ + sz_TimestampDelta +
                      sz_OffsetDelta + sz_KeyLen + rkm->rkm_key_len +
                      sz_ValueLen + rkm->rkm_len + sz_HeaderCount + HeaderSize;

        /* Length */
        sz_Length = rd_uvarint_enc_i64(varint_Length, sizeof(varint_Length),
                                       MessageSize);
        rd_kafka_buf_write(rkbuf, varint_Length, sz_Length);
        MessageSize += sz_Length;

        /* Attributes: The MsgAttributes argument is loosely based on MsgVer0
         * which don't apply for MsgVer2, so a zero byte is written instead. */
        rd_kafka_buf_write_i8(rkbuf, 0);

        /* TimestampDelta */
        rd_kafka_buf_write(rkbuf, varint_TimestampDelta, sz_TimestampDelta);

        /* OffsetDelta */
        rd_kafka_buf_write(rkbuf, varint_OffsetDelta, sz_OffsetDelta);

        /* KeyLen */
        rd_kafka_buf_write(rkbuf, varint_KeyLen, sz_KeyLen);

        /* Key (if any) */
        if (rkm->rkm_key)
                rd_kafka_buf_write(rkbuf, rkm->rkm_key, rkm->rkm_key_len);

        /* ValueLen */
        rd_kafka_buf_write(rkbuf, varint_ValueLen, sz_ValueLen);

        /* Write or copy Value/payload */
        if (rkm->rkm_payload)
                rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb);

        /* HeaderCount */
        rd_kafka_buf_write(rkbuf, varint_HeaderCount, sz_HeaderCount);

        /* Headers array */
        if (rkm->rkm_headers)
                rd_kafka_msgset_writer_write_msg_headers(msetw,
                                                         rkm->rkm_headers);

        /* Return written message size */
        return MessageSize;
}
+
+
/**
 * @brief Write message to messageset buffer, dispatching to the
 *        MsgVersion-specific writer (v0/v1 share one implementation).
 *
 * @param Offset Absolute or relative offset, depending on
 *        msetw_relative_offsets.
 * @param MsgAttributes Attributes byte for v0/v1 (ignored by v2's writer).
 *
 * @returns the number of bytes written.
 */
static size_t rd_kafka_msgset_writer_write_msg(rd_kafka_msgset_writer_t *msetw,
                                               rd_kafka_msg_t *rkm,
                                               int64_t Offset,
                                               int8_t MsgAttributes,
                                               void (*free_cb)(void *)) {
        size_t outlen;
        /* Dispatch table indexed by MsgVersion (0, 1 and 2). */
        size_t (*writer[])(rd_kafka_msgset_writer_t *, rd_kafka_msg_t *,
                           int64_t, int8_t, void (*)(void *)) = {
            [0] = rd_kafka_msgset_writer_write_msg_v0_1,
            [1] = rd_kafka_msgset_writer_write_msg_v0_1,
            [2] = rd_kafka_msgset_writer_write_msg_v2};
        size_t actual_written;
        size_t pre_pos;

        /* A non-zero timestamp marks the message as CreateTime. */
        if (likely(rkm->rkm_timestamp))
                MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;

        pre_pos = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf);

        outlen = writer[msetw->msetw_MsgVersion](msetw, rkm, Offset,
                                                 MsgAttributes, free_cb);

        /* Sanity-check the reported size against both the worst-case
         * wire size estimate and the actual buffer position delta. */
        actual_written =
            rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) - pre_pos;
        rd_assert(outlen <=
                  rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
        rd_assert(outlen == actual_written);

        return outlen;
}
+
/**
 * @brief Write as many messages from the given message queue to
 *        the messageset.
 *
 *        May not write any messages.
 *
 *        Messages are moved from \p rkmq to the batch's own queue as they
 *        are written. Accumulation stops when: the batch message-count or
 *        byte limit is reached, a message's retry backoff lies in the
 *        future, or (for reconstructed retry batches) the original msgid
 *        span has been fully covered.
 *
 * @returns 1 on success or 0 on error (inconsistent msgid range, which
 *          raises a fatal error for the idempotent producer).
 */
static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw,
                                             rd_kafka_msgq_t *rkmq) {
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        size_t len = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf);
        size_t max_msg_size =
            RD_MIN((size_t)msetw->msetw_rkb->rkb_rk->rk_conf.max_msg_size,
                   (size_t)msetw->msetw_rkb->rkb_rk->rk_conf.batch_size);
        rd_ts_t int_latency_base;
        rd_ts_t MaxTimestamp = 0;
        rd_kafka_msg_t *rkm;
        int msgcnt = 0;
        const rd_ts_t now = rd_clock();

        /* Internal latency calculation base.
         * Uses rkm_ts_timeout which is enqueue time + timeout */
        int_latency_base =
            now + ((rd_ts_t)rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000);

        /* Acquire BaseTimestamp from first message. */
        rkm = TAILQ_FIRST(&rkmq->rkmq_msgs);
        rd_kafka_assert(NULL, rkm);
        msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp;

        rd_kafka_msgbatch_set_first_msg(msetw->msetw_batch, rkm);

        /*
         * Write as many messages as possible until buffer is full
         * or limit reached.
         */
        do {
                /* When reconstructing a retried batch, stop once we've
                 * passed the original batch's last msgid so the retried
                 * batch is identical to the original. */
                if (unlikely(msetw->msetw_batch->last_msgid &&
                             msetw->msetw_batch->last_msgid <
                                 rkm->rkm_u.producer.msgid)) {
                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
                                   "%.*s [%" PRId32
                                   "]: "
                                   "Reconstructed MessageSet "
                                   "(%d message(s), %" PRIusz
                                   " bytes, "
                                   "MsgIds %" PRIu64 "..%" PRIu64 ")",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition, msgcnt, len,
                                   msetw->msetw_batch->first_msgid,
                                   msetw->msetw_batch->last_msgid);
                        break;
                }

                /* Check if there is enough space in the current messageset
                 * to add this message.
                 * Since calculating the total size of a request at produce()
                 * time is tricky (we don't know the protocol version or
                 * MsgVersion that will be used), we allow a messageset to
                 * overshoot the message.max.bytes limit by one message to
                 * avoid getting stuck here.
                 * The actual messageset size is enforced by the broker. */
                if (unlikely(
                        msgcnt == msetw->msetw_msgcntmax ||
                        (msgcnt > 0 && len + rd_kafka_msg_wire_size(
                                                 rkm, msetw->msetw_MsgVersion) >
                                           max_msg_size))) {
                        rd_rkb_dbg(rkb, MSG, "PRODUCE",
                                   "%.*s [%" PRId32
                                   "]: "
                                   "No more space in current MessageSet "
                                   "(%i message(s), %" PRIusz " bytes)",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition, msgcnt, len);
                        break;
                }

                if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) {
                        /* Stop accumulation when we've reached
                         * a message with a retry backoff in the future */
                        break;
                }

                /* Move message to buffer's queue */
                rd_kafka_msgq_deq(rkmq, rkm, 1);
                rd_kafka_msgq_enq(&msetw->msetw_batch->msgq, rkm);

                msetw->msetw_messages_kvlen += rkm->rkm_len + rkm->rkm_key_len;

                /* Add internal latency metrics */
                rd_avg_add(&rkb->rkb_avg_int_latency,
                           int_latency_base - rkm->rkm_ts_timeout);

                /* MessageSet v2's .MaxTimestamp field */
                if (unlikely(MaxTimestamp < rkm->rkm_timestamp))
                        MaxTimestamp = rkm->rkm_timestamp;

                /* Write message to buffer.
                 * Offset is the relative offset (= current msgcnt). */
                len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0,
                                                        NULL);

                msgcnt++;

        } while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)));

        msetw->msetw_MaxTimestamp = MaxTimestamp;

        /* Idempotent Producer:
         * When reconstructing a batch to retry make sure
         * the original message sequence span matches identically
         * or we can't guarantee exactly-once delivery.
         * If this check fails we raise a fatal error since
         * it is unrecoverable and most likely caused by a bug
         * in the client implementation.
         * This should not be considered an abortable error for
         * the transactional producer. */
        if (msgcnt > 0 && msetw->msetw_batch->last_msgid) {
                rd_kafka_msg_t *lastmsg;

                lastmsg = rd_kafka_msgq_last(&msetw->msetw_batch->msgq);
                rd_assert(lastmsg);

                if (unlikely(lastmsg->rkm_u.producer.msgid !=
                             msetw->msetw_batch->last_msgid)) {
                        rd_kafka_set_fatal_error(
                            rkb->rkb_rk, RD_KAFKA_RESP_ERR__INCONSISTENT,
                            "Unable to reconstruct MessageSet "
                            "(currently with %d message(s)) "
                            "with msgid range %" PRIu64 "..%" PRIu64
                            ": "
                            "last message added has msgid %" PRIu64
                            ": "
                            "unable to guarantee consistency",
                            msgcnt, msetw->msetw_batch->first_msgid,
                            msetw->msetw_batch->last_msgid,
                            lastmsg->rkm_u.producer.msgid);
                        return 0;
                }
        }
        return 1;
}
+
+
+#if WITH_ZLIB
/**
 * @brief Compress messageset using gzip/zlib.
 *
 * Streams each segment of \p slice through deflate() into a single
 * output buffer sized by deflateBound() (worst case, so no growth is
 * ever needed).
 *
 * @param slice Uncompressed messageset contents to read from.
 * @param ciov On success, set to the malloced compressed output
 *             (caller takes ownership of ciov->iov_base).
 *
 * @returns 0 on success, or -1 on compression failure (the caller is
 *          expected to fall back to sending uncompressed); on -1 the
 *          output buffer has already been freed.
 */
static int rd_kafka_msgset_writer_compress_gzip(rd_kafka_msgset_writer_t *msetw,
                                                rd_slice_t *slice,
                                                struct iovec *ciov) {

        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        z_stream strm;
        size_t len = rd_slice_remains(slice);
        const void *p;
        size_t rlen;
        int r;
        int comp_level =
            msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;

        memset(&strm, 0, sizeof(strm));
        /* windowBits 15+16 requests gzip framing; memLevel 8 is the
         * zlib default. */
        r = deflateInit2(&strm, comp_level, Z_DEFLATED, 15 + 16, 8,
                         Z_DEFAULT_STRATEGY);
        if (r != Z_OK) {
                rd_rkb_log(rkb, LOG_ERR, "GZIP",
                           "Failed to initialize gzip for "
                           "compressing %" PRIusz
                           " bytes in "
                           "topic %.*s [%" PRId32
                           "]: %s (%i): "
                           "sending uncompressed",
                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, strm.msg ? strm.msg : "", r);
                return -1;
        }

        /* Calculate maximum compressed size and
         * allocate an output buffer accordingly, being
         * prefixed with the Message header. */
        ciov->iov_len = deflateBound(&strm, (uLong)rd_slice_remains(slice));
        ciov->iov_base = rd_malloc(ciov->iov_len);

        strm.next_out = (void *)ciov->iov_base;
        strm.avail_out = (uInt)ciov->iov_len;

        /* Iterate through each segment and compress it. */
        while ((rlen = rd_slice_reader(slice, &p))) {

                strm.next_in = (void *)p;
                strm.avail_in = (uInt)rlen;

                /* Compress message */
                if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) {
                        rd_rkb_log(rkb, LOG_ERR, "GZIP",
                                   "Failed to gzip-compress "
                                   "%" PRIusz " bytes (%" PRIusz
                                   " total) for "
                                   "topic %.*s [%" PRId32
                                   "]: "
                                   "%s (%i): "
                                   "sending uncompressed",
                                   rlen, len,
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   strm.msg ? strm.msg : "", r);
                        deflateEnd(&strm);
                        rd_free(ciov->iov_base);
                        return -1;
                }

                /* Since the output buffer is worst-case sized,
                 * deflate must have consumed all input. */
                rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0);
        }

        /* Finish the compression */
        if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) {
                rd_rkb_log(rkb, LOG_ERR, "GZIP",
                           "Failed to finish gzip compression "
                           " of %" PRIusz
                           " bytes for "
                           "topic %.*s [%" PRId32
                           "]: "
                           "%s (%i): "
                           "sending uncompressed",
                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, strm.msg ? strm.msg : "", r);
                deflateEnd(&strm);
                rd_free(ciov->iov_base);
                return -1;
        }

        /* Shrink reported length to the actual compressed size. */
        ciov->iov_len = strm.total_out;

        /* Deinitialize compression */
        deflateEnd(&strm);

        return 0;
}
+#endif
+
+
+#if WITH_SNAPPY
/**
 * @brief Compress messageset using Snappy.
 *
 * Gathers the slice's segments into an iovec array (stack-allocated via
 * rd_alloca, so segment count is assumed small) and compresses them in
 * one call into a worst-case-sized output buffer.
 *
 * @param slice Uncompressed messageset contents to read from.
 * @param ciov On success, set to the malloced compressed output
 *             (caller takes ownership of ciov->iov_base).
 *
 * @returns 0 on success, or -1 on compression failure (output buffer
 *          already freed; caller falls back to uncompressed).
 */
static int
rd_kafka_msgset_writer_compress_snappy(rd_kafka_msgset_writer_t *msetw,
                                       rd_slice_t *slice,
                                       struct iovec *ciov) {
        rd_kafka_broker_t *rkb = msetw->msetw_rkb;
        rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
        struct iovec *iov;
        size_t iov_max, iov_cnt;
        struct snappy_env senv;
        size_t len = rd_slice_remains(slice);
        int r;

        /* Initialize snappy compression environment */
        rd_kafka_snappy_init_env_sg(&senv, 1 /*iov enable*/);

        /* Calculate maximum compressed size and
         * allocate an output buffer accordingly. */
        ciov->iov_len = rd_kafka_snappy_max_compressed_length(len);
        ciov->iov_base = rd_malloc(ciov->iov_len);

        iov_max = slice->buf->rbuf_segment_cnt;
        iov = rd_alloca(sizeof(*iov) * iov_max);

        rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len);

        /* Compress each message */
        if ((r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len,
                                              ciov)) != 0) {
                rd_rkb_log(rkb, LOG_ERR, "SNAPPY",
                           "Failed to snappy-compress "
                           "%" PRIusz
                           " bytes for "
                           "topic %.*s [%" PRId32
                           "]: %s: "
                           "sending uncompressed",
                           len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, rd_strerror(-r));
                rd_free(ciov->iov_base);
                return -1;
        }

        /* rd_free snappy environment */
        rd_kafka_snappy_free_env(&senv);

        return 0;
}
+#endif
+
+/**
+ * @brief Compress messageset using LZ4F
+ */
+static int rd_kafka_msgset_writer_compress_lz4(rd_kafka_msgset_writer_t *msetw,
+ rd_slice_t *slice,
+ struct iovec *ciov) {
+ rd_kafka_resp_err_t err;
+ int comp_level =
+ msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
+ err = rd_kafka_lz4_compress(msetw->msetw_rkb,
+ /* Correct or incorrect HC */
+ msetw->msetw_MsgVersion >= 1 ? 1 : 0,
+ comp_level, slice, &ciov->iov_base,
+ &ciov->iov_len);
+ return (err ? -1 : 0);
+}
+
+#if WITH_ZSTD
+/**
+ * @brief Compress messageset using ZSTD
+ */
+static int rd_kafka_msgset_writer_compress_zstd(rd_kafka_msgset_writer_t *msetw,
+ rd_slice_t *slice,
+ struct iovec *ciov) {
+ rd_kafka_resp_err_t err;
+ int comp_level =
+ msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level;
+ err = rd_kafka_zstd_compress(msetw->msetw_rkb, comp_level, slice,
+ &ciov->iov_base, &ciov->iov_len);
+ return (err ? -1 : 0);
+}
+#endif
+
+/**
+ * @brief Compress the message set.
+ * @param outlenp in: total uncompressed messages size,
+ * out (on success): returns the compressed buffer size.
+ * @returns 0 on success or if -1 if compression failed.
+ * @remark Compression failures are not critical, we'll just send the
+ * the messageset uncompressed.
+ */
+static int rd_kafka_msgset_writer_compress(rd_kafka_msgset_writer_t *msetw,
+ size_t *outlenp) {
+ rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf;
+ rd_slice_t slice;
+ size_t len = *outlenp;
+ struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */
+ int r = -1;
+ size_t outlen;
+
+ rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len);
+
+ /* Create buffer slice from firstmsg and onwards */
+ r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len);
+ rd_assert(r == 0 || !*"invalid firstmsg position");
+
+ switch (msetw->msetw_compression) {
+#if WITH_ZLIB
+ case RD_KAFKA_COMPRESSION_GZIP:
+ r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov);
+ break;
+#endif
+
+#if WITH_SNAPPY
+ case RD_KAFKA_COMPRESSION_SNAPPY:
+ r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice,
+ &ciov);
+ break;
+#endif
+
+ case RD_KAFKA_COMPRESSION_LZ4:
+ r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov);
+ break;
+
+#if WITH_ZSTD
+ case RD_KAFKA_COMPRESSION_ZSTD:
+ r = rd_kafka_msgset_writer_compress_zstd(msetw, &slice, &ciov);
+ break;
+#endif
+
+ default:
+ rd_kafka_assert(NULL,
+ !*"notreached: unsupported compression.codec");
+ break;
+ }
+
+ if (r == -1) /* Compression failed, send uncompressed */
+ return -1;
+
+
+ if (unlikely(ciov.iov_len > len)) {
+ /* If the compressed data is larger than the uncompressed size
+ * then throw it away and send as uncompressed. */
+ rd_free(ciov.iov_base);
+ return -1;
+ }
+
+ /* Set compression codec in MessageSet.Attributes */
+ msetw->msetw_Attributes |= msetw->msetw_compression;
+
+ /* Rewind rkbuf to the pre-message checkpoint (firstmsg)
+ * and replace the original message(s) with the compressed payload,
+ * possibly with version dependent enveloping. */
+ rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of);
+
+ rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX);
+
+ if (msetw->msetw_MsgVersion == 2) {
+ /* MsgVersion 2 has no inner MessageSet header or wrapping
+ * for compressed messages, just the messages back-to-back,
+ * so we can push the compressed memory directly to the
+ * buffer without wrapping it. */
+ rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free);
+ outlen = ciov.iov_len;
+
+ } else {
+ /* Older MessageSets envelope/wrap the compressed MessageSet
+ * in an outer Message. */
+ rd_kafka_msg_t rkm = {.rkm_len = ciov.iov_len,
+ .rkm_payload = ciov.iov_base,
+ .rkm_timestamp =
+ msetw->msetw_firstmsg.timestamp};
+ outlen = rd_kafka_msgset_writer_write_msg(
+ msetw, &rkm, 0, msetw->msetw_compression,
+ rd_free /*free for ciov.iov_base*/);
+ }
+
+ *outlenp = outlen;
+
+ return 0;
+}
+
+
+
+/**
+ * @brief Calculate MessageSet v2 CRC (CRC32C) when messageset is complete.
+ */
+static void
+rd_kafka_msgset_writer_calc_crc_v2(rd_kafka_msgset_writer_t *msetw) {
+ int32_t crc;
+ rd_slice_t slice;
+ int r;
+
+ r = rd_slice_init(&slice, &msetw->msetw_rkbuf->rkbuf_buf,
+ msetw->msetw_of_CRC + 4,
+ rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
+ msetw->msetw_of_CRC - 4);
+ rd_assert(!r && *"slice_init failed");
+
+ /* CRC32C calculation */
+ crc = rd_slice_crc32c(&slice);
+
+ /* Update CRC at MessageSet v2 CRC offset */
+ rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc);
+}
+
+/**
+ * @brief Finalize MessageSet v2 header fields.
+ */
+static void rd_kafka_msgset_writer_finalize_MessageSet_v2_header(
+ rd_kafka_msgset_writer_t *msetw) {
+ rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
+ int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq);
+
+ rd_kafka_assert(NULL, msgcnt > 0);
+ rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3);
+
+ msetw->msetw_MessageSetSize =
+ RD_KAFKAP_MSGSET_V2_SIZE + msetw->msetw_messages_len;
+
+ /* MessageSet.Length is the same as
+ * MessageSetSize minus field widths for FirstOffset+Length */
+ rd_kafka_buf_update_i32(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_Length,
+ (int32_t)msetw->msetw_MessageSetSize - (8 + 4));
+
+ msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME;
+
+ if (rd_kafka_is_transactional(msetw->msetw_rkb->rkb_rk))
+ msetw->msetw_Attributes |=
+ RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL;
+
+ rd_kafka_buf_update_i16(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_Attributes,
+ msetw->msetw_Attributes);
+
+ rd_kafka_buf_update_i32(rkbuf,
+ msetw->msetw_of_start +
+ RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta,
+ msgcnt - 1);
+
+ rd_kafka_buf_update_i64(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp,
+ msetw->msetw_firstmsg.timestamp);
+
+ rd_kafka_buf_update_i64(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp,
+ msetw->msetw_MaxTimestamp);
+
+ rd_kafka_buf_update_i32(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_BaseSequence,
+ msetw->msetw_batch->first_seq);
+
+ rd_kafka_buf_update_i32(
+ rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_RecordCount,
+ msgcnt);
+
+ rd_kafka_msgset_writer_calc_crc_v2(msetw);
+}
+
+
+
+/**
+ * @brief Finalize the MessageSet header, if applicable.
+ */
+static void
+rd_kafka_msgset_writer_finalize_MessageSet(rd_kafka_msgset_writer_t *msetw) {
+ rd_dassert(msetw->msetw_messages_len > 0);
+
+ if (msetw->msetw_MsgVersion == 2)
+ rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw);
+ else
+ msetw->msetw_MessageSetSize =
+ RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len;
+
+ /* Update MessageSetSize */
+ rd_kafka_buf_update_i32(msetw->msetw_rkbuf,
+ msetw->msetw_of_MessageSetSize,
+ (int32_t)msetw->msetw_MessageSetSize);
+}
+
+
+/**
+ * @brief Finalize the messageset - call when no more messages are to be
+ * added to the messageset.
+ *
+ * Will compress, update final values, CRCs, etc.
+ *
+ * The messageset writer is destroyed and the buffer is returned
+ * and ready to be transmitted.
+ *
+ * @param MessagetSetSizep will be set to the finalized MessageSetSize
+ *
+ * @returns the buffer to transmit or NULL if there were no messages
+ * in messageset.
+ */
+static rd_kafka_buf_t *
+rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw,
+ size_t *MessageSetSizep) {
+ rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf;
+ rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
+ size_t len;
+ int cnt;
+
+ /* No messages added, bail out early. */
+ if (unlikely((cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq)) ==
+ 0)) {
+ rd_kafka_buf_destroy(rkbuf);
+ return NULL;
+ }
+
+ /* Total size of messages */
+ len = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) -
+ msetw->msetw_firstmsg.of;
+ rd_assert(len > 0);
+ rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size);
+
+ rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt);
+ rd_atomic64_add(&rktp->rktp_c.tx_msg_bytes,
+ msetw->msetw_messages_kvlen);
+
+ /* Idempotent Producer:
+ * Store request's PID for matching on response
+ * if the instance PID has changed and thus made
+ * the request obsolete. */
+ msetw->msetw_rkbuf->rkbuf_u.Produce.batch.pid = msetw->msetw_pid;
+
+ /* Compress the message set */
+ if (msetw->msetw_compression) {
+ if (rd_kafka_msgset_writer_compress(msetw, &len) == -1)
+ msetw->msetw_compression = 0;
+ }
+
+ msetw->msetw_messages_len = len;
+
+ /* Finalize MessageSet header fields */
+ rd_kafka_msgset_writer_finalize_MessageSet(msetw);
+
+ /* Return final MessageSetSize */
+ *MessageSetSizep = msetw->msetw_MessageSetSize;
+
+ rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE",
+ "%s [%" PRId32
+ "]: "
+ "Produce MessageSet with %i message(s) (%" PRIusz
+ " bytes, "
+ "ApiVersion %d, MsgVersion %d, MsgId %" PRIu64
+ ", "
+ "BaseSeq %" PRId32 ", %s, %s)",
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, cnt,
+ msetw->msetw_MessageSetSize, msetw->msetw_ApiVersion,
+ msetw->msetw_MsgVersion, msetw->msetw_batch->first_msgid,
+ msetw->msetw_batch->first_seq,
+ rd_kafka_pid2str(msetw->msetw_pid),
+ msetw->msetw_compression
+ ? rd_kafka_compression2str(msetw->msetw_compression)
+ : "uncompressed");
+
+ rd_kafka_msgq_verify_order(rktp, &msetw->msetw_batch->msgq,
+ msetw->msetw_batch->first_msgid, rd_false);
+
+ rd_kafka_msgbatch_ready_produce(msetw->msetw_batch);
+
+ return rkbuf;
+}
+
+
+/**
+ * @brief Create ProduceRequest containing as many messages from
+ * the toppar's transmit queue as possible, limited by configuration,
+ * size, etc.
+ *
+ * @param rkb broker to create buffer for
+ * @param rktp toppar to transmit messages for
+ * @param MessagetSetSizep will be set to the final MessageSetSize
+ *
+ * @returns the buffer to transmit or NULL if there were no messages
+ * in messageset.
+ *
+ * @locality broker thread
+ */
+rd_kafka_buf_t *rd_kafka_msgset_create_ProduceRequest(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_msgq_t *rkmq,
+ const rd_kafka_pid_t pid,
+ uint64_t epoch_base_msgid,
+ size_t *MessageSetSizep) {
+
+ rd_kafka_msgset_writer_t msetw;
+
+ if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp, rkmq, pid,
+ epoch_base_msgid) <= 0)
+ return NULL;
+
+ if (!rd_kafka_msgset_writer_write_msgq(&msetw, msetw.msetw_msgq)) {
+ /* Error while writing messages to MessageSet,
+ * move all messages back on the xmit queue. */
+ rd_kafka_msgq_insert_msgq(
+ rkmq, &msetw.msetw_batch->msgq,
+ rktp->rktp_rkt->rkt_conf.msg_order_cmp);
+ }
+
+ return rd_kafka_msgset_writer_finalize(&msetw, MessageSetSizep);
+}