Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c | 1445
1 file changed, 1445 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c new file mode 100644 index 00000000..beb36bfa --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_writer.c @@ -0,0 +1,1445 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rd.h" +#include "rdkafka_int.h" +#include "rdkafka_msg.h" +#include "rdkafka_msgset.h" +#include "rdkafka_topic.h" +#include "rdkafka_partition.h" +#include "rdkafka_header.h" +#include "rdkafka_lz4.h" + +#if WITH_ZSTD +#include "rdkafka_zstd.h" +#endif + +#include "snappy.h" +#include "rdvarint.h" +#include "crc32c.h" + + +/** @brief The maxium ProduceRequestion ApiVersion supported by librdkafka */ +static const int16_t rd_kafka_ProduceRequest_max_version = 7; + + +typedef struct rd_kafka_msgset_writer_s { + rd_kafka_buf_t *msetw_rkbuf; /* Backing store buffer (refcounted)*/ + + int16_t msetw_ApiVersion; /* ProduceRequest ApiVersion */ + int msetw_MsgVersion; /* MsgVersion to construct */ + int msetw_features; /* Protocol features to use */ + rd_kafka_compression_t msetw_compression; /**< Compression type */ + int msetw_msgcntmax; /* Max number of messages to send + * in a batch. 
*/ + size_t msetw_messages_len; /* Total size of Messages, with Message + * framing but without + * MessageSet header */ + size_t msetw_messages_kvlen; /* Total size of Message keys + * and values */ + + size_t msetw_MessageSetSize; /* Current MessageSetSize value */ + size_t msetw_of_MessageSetSize; /* offset of MessageSetSize */ + size_t msetw_of_start; /* offset of MessageSet */ + + int msetw_relative_offsets; /* Bool: use relative offsets */ + + /* For MessageSet v2 */ + int msetw_Attributes; /* MessageSet Attributes */ + int64_t msetw_MaxTimestamp; /* Maximum timestamp in batch */ + size_t msetw_of_CRC; /* offset of MessageSet.CRC */ + + rd_kafka_msgbatch_t *msetw_batch; /**< Convenience pointer to + * rkbuf_u.Produce.batch */ + + /* First message information */ + struct { + size_t of; /* rkbuf's first message position */ + int64_t timestamp; + } msetw_firstmsg; + + rd_kafka_pid_t msetw_pid; /**< Idempotent producer's + * current Producer Id */ + rd_kafka_broker_t *msetw_rkb; /* @warning Not a refcounted + * reference! */ + rd_kafka_toppar_t *msetw_rktp; /* @warning Not a refcounted + * reference! */ + rd_kafka_msgq_t *msetw_msgq; /**< Input message queue */ +} rd_kafka_msgset_writer_t; + + + +/** + * @brief Select ApiVersion and MsgVersion to use based on broker's + * feature compatibility. + * + * @returns -1 if a MsgVersion (or ApiVersion) could not be selected, else 0. + * @locality broker thread + */ +static RD_INLINE int +rd_kafka_msgset_writer_select_MsgVersion(rd_kafka_msgset_writer_t *msetw) { + rd_kafka_broker_t *rkb = msetw->msetw_rkb; + rd_kafka_toppar_t *rktp = msetw->msetw_rktp; + const int16_t max_ApiVersion = rd_kafka_ProduceRequest_max_version; + int16_t min_ApiVersion = 0; + int feature; + /* Map compression types to required feature and ApiVersion */ + static const struct { + int feature; + int16_t ApiVersion; + } compr_req[RD_KAFKA_COMPRESSION_NUM] = { + [RD_KAFKA_COMPRESSION_LZ4] = {RD_KAFKA_FEATURE_LZ4, 0}, +#if WITH_ZSTD + [RD_KAFKA_COMPRESSION_ZSTD] = {RD_KAFKA_FEATURE_ZSTD, 7}, +#endif + }; + + if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)) { + min_ApiVersion = 3; + msetw->msetw_MsgVersion = 2; + msetw->msetw_features |= feature; + } else if ((feature = rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)) { + min_ApiVersion = 2; + msetw->msetw_MsgVersion = 1; + msetw->msetw_features |= feature; + } else { + if ((feature = + rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)) { + min_ApiVersion = 1; + msetw->msetw_features |= feature; + } else + min_ApiVersion = 0; + msetw->msetw_MsgVersion = 0; + } + + msetw->msetw_compression = rktp->rktp_rkt->rkt_conf.compression_codec; + + /* + * Check that the configured compression type is supported + * by both client and broker, else disable compression. 
+ */ + if (msetw->msetw_compression && + (rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_Produce, 0, + compr_req[msetw->msetw_compression].ApiVersion, NULL) == -1 || + (compr_req[msetw->msetw_compression].feature && + !(msetw->msetw_rkb->rkb_features & + compr_req[msetw->msetw_compression].feature)))) { + if (unlikely( + rd_interval(&rkb->rkb_suppress.unsupported_compression, + /* at most once per day */ + (rd_ts_t)86400 * 1000 * 1000, 0) > 0)) + rd_rkb_log( + rkb, LOG_NOTICE, "COMPRESSION", + "%.*s [%" PRId32 + "]: " + "Broker does not support compression " + "type %s: not compressing batch", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_compression2str(msetw->msetw_compression)); + else + rd_rkb_dbg( + rkb, MSG, "PRODUCE", + "%.*s [%" PRId32 + "]: " + "Broker does not support compression " + "type %s: not compressing batch", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_compression2str(msetw->msetw_compression)); + + msetw->msetw_compression = RD_KAFKA_COMPRESSION_NONE; + } else { + /* Broker supports this compression type. */ + msetw->msetw_features |= + compr_req[msetw->msetw_compression].feature; + + if (min_ApiVersion < + compr_req[msetw->msetw_compression].ApiVersion) + min_ApiVersion = + compr_req[msetw->msetw_compression].ApiVersion; + } + + /* MsgVersion specific setup. */ + switch (msetw->msetw_MsgVersion) { + case 2: + msetw->msetw_relative_offsets = 1; /* OffsetDelta */ + break; + case 1: + if (msetw->msetw_compression != RD_KAFKA_COMPRESSION_NONE) + msetw->msetw_relative_offsets = 1; + break; + } + + /* Set the highest ApiVersion supported by us and broker */ + msetw->msetw_ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_Produce, min_ApiVersion, max_ApiVersion, NULL); + + if (msetw->msetw_ApiVersion == -1) { + rd_kafka_msg_t *rkm; + /* This will only happen if the broker reports none, or + * no matching ProduceRequest versions, which should never + * happen. */ + rd_rkb_log(rkb, LOG_ERR, "PRODUCE", + "%.*s [%" PRId32 + "]: " + "No viable ProduceRequest ApiVersions (v%d..%d) " + "supported by broker: unable to produce", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, min_ApiVersion, + max_ApiVersion); + + /* Back off and retry in 5s */ + rkm = rd_kafka_msgq_first(msetw->msetw_msgq); + rd_assert(rkm); + rkm->rkm_u.producer.ts_backoff = rd_clock() + (5 * 1000 * 1000); + return -1; + } + + /* It should not be possible to get a lower version than requested, + * otherwise the logic in this function is buggy. */ + rd_assert(msetw->msetw_ApiVersion >= min_ApiVersion); + + return 0; +} + + +/** + * @brief Allocate buffer for messageset writer based on a previously set + * up \p msetw. + * + * Allocate iovecs to hold all headers and messages, + * and allocate enough space to allow copies of small messages. + * The allocated size is the minimum of message.max.bytes + * or queued_bytes + msgcntmax * msg_overhead + */ +static void rd_kafka_msgset_writer_alloc_buf(rd_kafka_msgset_writer_t *msetw) { + rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk; + size_t msg_overhead = 0; + size_t hdrsize = 0; + size_t msgsetsize = 0; + size_t bufsize; + + rd_kafka_assert(NULL, !msetw->msetw_rkbuf); + + /* Calculate worst-case buffer size, produce header size, + * message size, etc, this isn't critical but avoids unnecesary + * extra allocations. The buffer will grow as needed if we get + * this wrong. 
+ * + * ProduceRequest headers go in one iovec: + * ProduceRequest v0..2: + * RequiredAcks + Timeout + + * [Topic + [Partition + MessageSetSize]] + * + * ProduceRequest v3: + * TransactionalId + RequiredAcks + Timeout + + * [Topic + [Partition + MessageSetSize + MessageSet]] + */ + + /* + * ProduceRequest header sizes + */ + switch (msetw->msetw_ApiVersion) { + case 7: + case 6: + case 5: + case 4: + case 3: + /* Add TransactionalId */ + hdrsize += RD_KAFKAP_STR_SIZE(rk->rk_eos.transactional_id); + /* FALLTHRU */ + case 0: + case 1: + case 2: + hdrsize += + /* RequiredAcks + Timeout + TopicCnt */ + 2 + 4 + 4 + + /* Topic */ + RD_KAFKAP_STR_SIZE(msetw->msetw_rktp->rktp_rkt->rkt_topic) + + /* PartitionCnt + Partition + MessageSetSize */ + 4 + 4 + 4; + msgsetsize += 4; /* MessageSetSize */ + break; + + default: + RD_NOTREACHED(); + } + + /* + * MsgVersion specific sizes: + * - (Worst-case) Message overhead: message fields + * - MessageSet header size + */ + switch (msetw->msetw_MsgVersion) { + case 0: + /* MsgVer0 */ + msg_overhead = RD_KAFKAP_MESSAGE_V0_OVERHEAD; + break; + case 1: + /* MsgVer1 */ + msg_overhead = RD_KAFKAP_MESSAGE_V1_OVERHEAD; + break; + + case 2: + /* MsgVer2 uses varints, we calculate for the worst-case. */ + msg_overhead += RD_KAFKAP_MESSAGE_V2_MAX_OVERHEAD; + + /* MessageSet header fields */ + msgsetsize += 8 /* BaseOffset */ + 4 /* Length */ + + 4 /* PartitionLeaderEpoch */ + + 1 /* Magic (MsgVersion) */ + + 4 /* CRC (CRC32C) */ + 2 /* Attributes */ + + 4 /* LastOffsetDelta */ + 8 /* BaseTimestamp */ + + 8 /* MaxTimestamp */ + 8 /* ProducerId */ + + 2 /* ProducerEpoch */ + 4 /* BaseSequence */ + + 4 /* RecordCount */; + break; + + default: + RD_NOTREACHED(); + } + + /* + * Calculate total buffer size to allocate + */ + bufsize = hdrsize + msgsetsize; + + /* If copying for small payloads is enabled, allocate enough + * space for each message to be copied based on this limit. + */ + if (rk->rk_conf.msg_copy_max_size > 0) { + size_t queued_bytes = rd_kafka_msgq_size(msetw->msetw_msgq); + bufsize += + RD_MIN(queued_bytes, (size_t)rk->rk_conf.msg_copy_max_size * + msetw->msetw_msgcntmax); + } + + /* Add estimed per-message overhead */ + bufsize += msg_overhead * msetw->msetw_msgcntmax; + + /* Cap allocation at message.max.bytes */ + if (bufsize > (size_t)rk->rk_conf.max_msg_size) + bufsize = (size_t)rk->rk_conf.max_msg_size; + + /* + * Allocate iovecs to hold all headers and messages, + * and allocate auxilliery space for message headers, etc. + */ + msetw->msetw_rkbuf = + rd_kafka_buf_new_request(msetw->msetw_rkb, RD_KAFKAP_Produce, + msetw->msetw_msgcntmax / 2 + 10, bufsize); + + rd_kafka_buf_ApiVersion_set(msetw->msetw_rkbuf, msetw->msetw_ApiVersion, + msetw->msetw_features); +} + + +/** + * @brief Write the MessageSet header. + * @remark Must only be called for MsgVersion 2 + */ +static void rd_kafka_msgset_writer_write_MessageSet_v2_header( + rd_kafka_msgset_writer_t *msetw) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + + rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3); + rd_kafka_assert(NULL, msetw->msetw_MsgVersion == 2); + + /* BaseOffset (also store the offset to the start of + * the messageset header fields) */ + msetw->msetw_of_start = rd_kafka_buf_write_i64(rkbuf, 0); + + /* Length: updated later */ + rd_kafka_buf_write_i32(rkbuf, 0); + + /* PartitionLeaderEpoch (KIP-101) */ + rd_kafka_buf_write_i32(rkbuf, 0); + + /* Magic (MsgVersion) */ + rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion); + + /* CRC (CRC32C): updated later. 
+ * CRC needs to be done after the entire messageset+messages has + * been constructed and the following header fields updated. :( + * Save the offset for this position. so it can be udpated later. */ + msetw->msetw_of_CRC = rd_kafka_buf_write_i32(rkbuf, 0); + + /* Attributes: updated later */ + rd_kafka_buf_write_i16(rkbuf, 0); + + /* LastOffsetDelta: updated later */ + rd_kafka_buf_write_i32(rkbuf, 0); + + /* BaseTimestamp: updated later */ + rd_kafka_buf_write_i64(rkbuf, 0); + + /* MaxTimestamp: updated later */ + rd_kafka_buf_write_i64(rkbuf, 0); + + /* ProducerId */ + rd_kafka_buf_write_i64(rkbuf, msetw->msetw_pid.id); + + /* ProducerEpoch */ + rd_kafka_buf_write_i16(rkbuf, msetw->msetw_pid.epoch); + + /* BaseSequence: updated later in case of Idempotent Producer */ + rd_kafka_buf_write_i32(rkbuf, -1); + + /* RecordCount: udpated later */ + rd_kafka_buf_write_i32(rkbuf, 0); +} + + +/** + * @brief Write ProduceRequest headers. + * When this function returns the msgset is ready for + * writing individual messages. + * msetw_MessageSetSize will have been set to the messageset header. + */ +static void +rd_kafka_msgset_writer_write_Produce_header(rd_kafka_msgset_writer_t *msetw) { + + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk; + rd_kafka_topic_t *rkt = msetw->msetw_rktp->rktp_rkt; + + /* V3: TransactionalId */ + if (msetw->msetw_ApiVersion >= 3) + rd_kafka_buf_write_kstr(rkbuf, rk->rk_eos.transactional_id); + + /* RequiredAcks */ + rd_kafka_buf_write_i16(rkbuf, rkt->rkt_conf.required_acks); + + /* Timeout */ + rd_kafka_buf_write_i32(rkbuf, rkt->rkt_conf.request_timeout_ms); + + /* TopicArrayCnt */ + rd_kafka_buf_write_i32(rkbuf, 1); + + /* Insert topic */ + rd_kafka_buf_write_kstr(rkbuf, rkt->rkt_topic); + + /* PartitionArrayCnt */ + rd_kafka_buf_write_i32(rkbuf, 1); + + /* Partition */ + rd_kafka_buf_write_i32(rkbuf, msetw->msetw_rktp->rktp_partition); + + /* MessageSetSize: Will be finalized later*/ + msetw->msetw_of_MessageSetSize = rd_kafka_buf_write_i32(rkbuf, 0); + + if (msetw->msetw_MsgVersion == 2) { + /* MessageSet v2 header */ + rd_kafka_msgset_writer_write_MessageSet_v2_header(msetw); + msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V2_SIZE; + } else { + /* Older MessageSet */ + msetw->msetw_MessageSetSize = RD_KAFKAP_MSGSET_V0_SIZE; + } +} + + +/** + * @brief Initialize a ProduceRequest MessageSet writer for + * the given broker and partition. + * + * A new buffer will be allocated to fit the pending messages in queue. + * + * @returns the number of messages to enqueue + * + * @remark This currently constructs the entire ProduceRequest, containing + * a single outer MessageSet for a single partition. + * + * @locality broker thread + */ +static int rd_kafka_msgset_writer_init(rd_kafka_msgset_writer_t *msetw, + rd_kafka_broker_t *rkb, + rd_kafka_toppar_t *rktp, + rd_kafka_msgq_t *rkmq, + rd_kafka_pid_t pid, + uint64_t epoch_base_msgid) { + int msgcnt = rd_kafka_msgq_len(rkmq); + + if (msgcnt == 0) + return 0; + + memset(msetw, 0, sizeof(*msetw)); + + msetw->msetw_rktp = rktp; + msetw->msetw_rkb = rkb; + msetw->msetw_msgq = rkmq; + msetw->msetw_pid = pid; + + /* Max number of messages to send in a batch, + * limited by current queue size or configured batch size, + * whichever is lower. 
*/ + msetw->msetw_msgcntmax = + RD_MIN(msgcnt, rkb->rkb_rk->rk_conf.batch_num_messages); + rd_dassert(msetw->msetw_msgcntmax > 0); + + /* Select MsgVersion to use */ + if (rd_kafka_msgset_writer_select_MsgVersion(msetw) == -1) + return -1; + + /* Allocate backing buffer */ + rd_kafka_msgset_writer_alloc_buf(msetw); + + /* Construct first part of Produce header + MessageSet header */ + rd_kafka_msgset_writer_write_Produce_header(msetw); + + /* The current buffer position is now where the first message + * is located. + * Record the current buffer position so it can be rewound later + * in case of compression. */ + msetw->msetw_firstmsg.of = + rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf); + + rd_kafka_msgbatch_init(&msetw->msetw_rkbuf->rkbuf_u.Produce.batch, rktp, + pid, epoch_base_msgid); + msetw->msetw_batch = &msetw->msetw_rkbuf->rkbuf_u.Produce.batch; + + return msetw->msetw_msgcntmax; +} + + + +/** + * @brief Copy or link message payload to buffer. + */ +static RD_INLINE void +rd_kafka_msgset_writer_write_msg_payload(rd_kafka_msgset_writer_t *msetw, + const rd_kafka_msg_t *rkm, + void (*free_cb)(void *)) { + const rd_kafka_t *rk = msetw->msetw_rkb->rkb_rk; + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + + /* If payload is below the copy limit and there is still + * room in the buffer we'll copy the payload to the buffer, + * otherwise we push a reference to the memory. */ + if (rkm->rkm_len <= (size_t)rk->rk_conf.msg_copy_max_size && + rd_buf_write_remains(&rkbuf->rkbuf_buf) > rkm->rkm_len) { + rd_kafka_buf_write(rkbuf, rkm->rkm_payload, rkm->rkm_len); + if (free_cb) + free_cb(rkm->rkm_payload); + } else + rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len, + free_cb); +} + + +/** + * @brief Write message headers to buffer. + * + * @remark The enveloping HeaderCount varint must already have been written. + * @returns the number of bytes written to msetw->msetw_rkbuf + */ +static size_t +rd_kafka_msgset_writer_write_msg_headers(rd_kafka_msgset_writer_t *msetw, + const rd_kafka_headers_t *hdrs) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + const rd_kafka_header_t *hdr; + int i; + size_t start_pos = rd_buf_write_pos(&rkbuf->rkbuf_buf); + size_t written; + + RD_LIST_FOREACH(hdr, &hdrs->rkhdrs_list, i) { + rd_kafka_buf_write_varint(rkbuf, hdr->rkhdr_name_size); + rd_kafka_buf_write(rkbuf, hdr->rkhdr_name, + hdr->rkhdr_name_size); + rd_kafka_buf_write_varint( + rkbuf, + hdr->rkhdr_value ? (int64_t)hdr->rkhdr_value_size : -1); + rd_kafka_buf_write(rkbuf, hdr->rkhdr_value, + hdr->rkhdr_value_size); + } + + written = rd_buf_write_pos(&rkbuf->rkbuf_buf) - start_pos; + rd_dassert(written == hdrs->rkhdrs_ser_size); + + return written; +} + + + +/** + * @brief Write message to messageset buffer with MsgVersion 0 or 1. + * @returns the number of bytes written. + */ +static size_t +rd_kafka_msgset_writer_write_msg_v0_1(rd_kafka_msgset_writer_t *msetw, + rd_kafka_msg_t *rkm, + int64_t Offset, + int8_t MsgAttributes, + void (*free_cb)(void *)) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + size_t MessageSize; + size_t of_Crc; + + /* + * MessageSet's (v0 and v1) per-Message header. 
+ */ + + /* Offset (only relevant for compressed messages on MsgVersion v1) */ + rd_kafka_buf_write_i64(rkbuf, Offset); + + /* MessageSize */ + MessageSize = 4 + 1 + 1 + /* Crc+MagicByte+Attributes */ + 4 /* KeyLength */ + rkm->rkm_key_len + + 4 /* ValueLength */ + rkm->rkm_len; + + if (msetw->msetw_MsgVersion == 1) + MessageSize += 8; /* Timestamp i64 */ + + rd_kafka_buf_write_i32(rkbuf, (int32_t)MessageSize); + + /* + * Message + */ + /* Crc: will be updated later */ + of_Crc = rd_kafka_buf_write_i32(rkbuf, 0); + + /* Start Crc calculation of all buf writes. */ + rd_kafka_buf_crc_init(rkbuf); + + /* MagicByte */ + rd_kafka_buf_write_i8(rkbuf, msetw->msetw_MsgVersion); + + /* Attributes */ + rd_kafka_buf_write_i8(rkbuf, MsgAttributes); + + /* V1: Timestamp */ + if (msetw->msetw_MsgVersion == 1) + rd_kafka_buf_write_i64(rkbuf, rkm->rkm_timestamp); + + /* Message Key */ + rd_kafka_buf_write_bytes(rkbuf, rkm->rkm_key, rkm->rkm_key_len); + + /* Write or copy Value/payload */ + if (rkm->rkm_payload) { + rd_kafka_buf_write_i32(rkbuf, (int32_t)rkm->rkm_len); + rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb); + } else + rd_kafka_buf_write_i32(rkbuf, RD_KAFKAP_BYTES_LEN_NULL); + + /* Finalize Crc */ + rd_kafka_buf_update_u32(rkbuf, of_Crc, + rd_kafka_buf_crc_finalize(rkbuf)); + + + /* Return written message size */ + return 8 /*Offset*/ + 4 /*MessageSize*/ + MessageSize; +} + +/** + * @brief Write message to messageset buffer with MsgVersion 2. + * @returns the number of bytes written. + */ +static size_t +rd_kafka_msgset_writer_write_msg_v2(rd_kafka_msgset_writer_t *msetw, + rd_kafka_msg_t *rkm, + int64_t Offset, + int8_t MsgAttributes, + void (*free_cb)(void *)) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + size_t MessageSize = 0; + char varint_Length[RD_UVARINT_ENC_SIZEOF(int32_t)]; + char varint_TimestampDelta[RD_UVARINT_ENC_SIZEOF(int64_t)]; + char varint_OffsetDelta[RD_UVARINT_ENC_SIZEOF(int64_t)]; + char varint_KeyLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; + char varint_ValueLen[RD_UVARINT_ENC_SIZEOF(int32_t)]; + char varint_HeaderCount[RD_UVARINT_ENC_SIZEOF(int32_t)]; + size_t sz_Length; + size_t sz_TimestampDelta; + size_t sz_OffsetDelta; + size_t sz_KeyLen; + size_t sz_ValueLen; + size_t sz_HeaderCount; + int HeaderCount = 0; + size_t HeaderSize = 0; + + if (rkm->rkm_headers) { + HeaderCount = rkm->rkm_headers->rkhdrs_list.rl_cnt; + HeaderSize = rkm->rkm_headers->rkhdrs_ser_size; + } + + /* All varints, except for Length, needs to be pre-built + * so that the Length field can be set correctly and thus have + * correct varint encoded width. */ + + sz_TimestampDelta = rd_uvarint_enc_i64( + varint_TimestampDelta, sizeof(varint_TimestampDelta), + rkm->rkm_timestamp - msetw->msetw_firstmsg.timestamp); + sz_OffsetDelta = rd_uvarint_enc_i64(varint_OffsetDelta, + sizeof(varint_OffsetDelta), Offset); + sz_KeyLen = rd_uvarint_enc_i32(varint_KeyLen, sizeof(varint_KeyLen), + rkm->rkm_key + ? (int32_t)rkm->rkm_key_len + : (int32_t)RD_KAFKAP_BYTES_LEN_NULL); + sz_ValueLen = rd_uvarint_enc_i32( + varint_ValueLen, sizeof(varint_ValueLen), + rkm->rkm_payload ? (int32_t)rkm->rkm_len + : (int32_t)RD_KAFKAP_BYTES_LEN_NULL); + sz_HeaderCount = + rd_uvarint_enc_i32(varint_HeaderCount, sizeof(varint_HeaderCount), + (int32_t)HeaderCount); + + /* Calculate MessageSize without length of Length (added later) + * to store it in Length. 
*/ + MessageSize = 1 /* MsgAttributes */ + sz_TimestampDelta + + sz_OffsetDelta + sz_KeyLen + rkm->rkm_key_len + + sz_ValueLen + rkm->rkm_len + sz_HeaderCount + HeaderSize; + + /* Length */ + sz_Length = rd_uvarint_enc_i64(varint_Length, sizeof(varint_Length), + MessageSize); + rd_kafka_buf_write(rkbuf, varint_Length, sz_Length); + MessageSize += sz_Length; + + /* Attributes: The MsgAttributes argument is losely based on MsgVer0 + * which don't apply for MsgVer2 */ + rd_kafka_buf_write_i8(rkbuf, 0); + + /* TimestampDelta */ + rd_kafka_buf_write(rkbuf, varint_TimestampDelta, sz_TimestampDelta); + + /* OffsetDelta */ + rd_kafka_buf_write(rkbuf, varint_OffsetDelta, sz_OffsetDelta); + + /* KeyLen */ + rd_kafka_buf_write(rkbuf, varint_KeyLen, sz_KeyLen); + + /* Key (if any) */ + if (rkm->rkm_key) + rd_kafka_buf_write(rkbuf, rkm->rkm_key, rkm->rkm_key_len); + + /* ValueLen */ + rd_kafka_buf_write(rkbuf, varint_ValueLen, sz_ValueLen); + + /* Write or copy Value/payload */ + if (rkm->rkm_payload) + rd_kafka_msgset_writer_write_msg_payload(msetw, rkm, free_cb); + + /* HeaderCount */ + rd_kafka_buf_write(rkbuf, varint_HeaderCount, sz_HeaderCount); + + /* Headers array */ + if (rkm->rkm_headers) + rd_kafka_msgset_writer_write_msg_headers(msetw, + rkm->rkm_headers); + + /* Return written message size */ + return MessageSize; +} + + +/** + * @brief Write message to messageset buffer. + * @returns the number of bytes written. + */ +static size_t rd_kafka_msgset_writer_write_msg(rd_kafka_msgset_writer_t *msetw, + rd_kafka_msg_t *rkm, + int64_t Offset, + int8_t MsgAttributes, + void (*free_cb)(void *)) { + size_t outlen; + size_t (*writer[])(rd_kafka_msgset_writer_t *, rd_kafka_msg_t *, + int64_t, int8_t, void (*)(void *)) = { + [0] = rd_kafka_msgset_writer_write_msg_v0_1, + [1] = rd_kafka_msgset_writer_write_msg_v0_1, + [2] = rd_kafka_msgset_writer_write_msg_v2}; + size_t actual_written; + size_t pre_pos; + + if (likely(rkm->rkm_timestamp)) + MsgAttributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME; + + pre_pos = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf); + + outlen = writer[msetw->msetw_MsgVersion](msetw, rkm, Offset, + MsgAttributes, free_cb); + + actual_written = + rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) - pre_pos; + rd_assert(outlen <= + rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion)); + rd_assert(outlen == actual_written); + + return outlen; +} + +/** + * @brief Write as many messages from the given message queue to + * the messageset. + * + * May not write any messages. + * + * @returns 1 on success or 0 on error. + */ +static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw, + rd_kafka_msgq_t *rkmq) { + rd_kafka_toppar_t *rktp = msetw->msetw_rktp; + rd_kafka_broker_t *rkb = msetw->msetw_rkb; + size_t len = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf); + size_t max_msg_size = + RD_MIN((size_t)msetw->msetw_rkb->rkb_rk->rk_conf.max_msg_size, + (size_t)msetw->msetw_rkb->rkb_rk->rk_conf.batch_size); + rd_ts_t int_latency_base; + rd_ts_t MaxTimestamp = 0; + rd_kafka_msg_t *rkm; + int msgcnt = 0; + const rd_ts_t now = rd_clock(); + + /* Internal latency calculation base. + * Uses rkm_ts_timeout which is enqueue time + timeout */ + int_latency_base = + now + ((rd_ts_t)rktp->rktp_rkt->rkt_conf.message_timeout_ms * 1000); + + /* Acquire BaseTimestamp from first message. 
*/ + rkm = TAILQ_FIRST(&rkmq->rkmq_msgs); + rd_kafka_assert(NULL, rkm); + msetw->msetw_firstmsg.timestamp = rkm->rkm_timestamp; + + rd_kafka_msgbatch_set_first_msg(msetw->msetw_batch, rkm); + + /* + * Write as many messages as possible until buffer is full + * or limit reached. + */ + do { + if (unlikely(msetw->msetw_batch->last_msgid && + msetw->msetw_batch->last_msgid < + rkm->rkm_u.producer.msgid)) { + rd_rkb_dbg(rkb, MSG, "PRODUCE", + "%.*s [%" PRId32 + "]: " + "Reconstructed MessageSet " + "(%d message(s), %" PRIusz + " bytes, " + "MsgIds %" PRIu64 "..%" PRIu64 ")", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, msgcnt, len, + msetw->msetw_batch->first_msgid, + msetw->msetw_batch->last_msgid); + break; + } + + /* Check if there is enough space in the current messageset + * to add this message. + * Since calculating the total size of a request at produce() + * time is tricky (we don't know the protocol version or + * MsgVersion that will be used), we allow a messageset to + * overshoot the message.max.bytes limit by one message to + * avoid getting stuck here. + * The actual messageset size is enforced by the broker. */ + if (unlikely( + msgcnt == msetw->msetw_msgcntmax || + (msgcnt > 0 && len + rd_kafka_msg_wire_size( + rkm, msetw->msetw_MsgVersion) > + max_msg_size))) { + rd_rkb_dbg(rkb, MSG, "PRODUCE", + "%.*s [%" PRId32 + "]: " + "No more space in current MessageSet " + "(%i message(s), %" PRIusz " bytes)", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, msgcnt, len); + break; + } + + if (unlikely(rkm->rkm_u.producer.ts_backoff > now)) { + /* Stop accumulation when we've reached + * a message with a retry backoff in the future */ + break; + } + + /* Move message to buffer's queue */ + rd_kafka_msgq_deq(rkmq, rkm, 1); + rd_kafka_msgq_enq(&msetw->msetw_batch->msgq, rkm); + + msetw->msetw_messages_kvlen += rkm->rkm_len + rkm->rkm_key_len; + + /* Add internal latency metrics */ + rd_avg_add(&rkb->rkb_avg_int_latency, + int_latency_base - rkm->rkm_ts_timeout); + + /* MessageSet v2's .MaxTimestamp field */ + if (unlikely(MaxTimestamp < rkm->rkm_timestamp)) + MaxTimestamp = rkm->rkm_timestamp; + + /* Write message to buffer */ + len += rd_kafka_msgset_writer_write_msg(msetw, rkm, msgcnt, 0, + NULL); + + msgcnt++; + + } while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs))); + + msetw->msetw_MaxTimestamp = MaxTimestamp; + + /* Idempotent Producer: + * When reconstructing a batch to retry make sure + * the original message sequence span matches identically + * or we can't guarantee exactly-once delivery. + * If this check fails we raise a fatal error since + * it is unrecoverable and most likely caused by a bug + * in the client implementation. + * This should not be considered an abortable error for + * the transactional producer. 
*/ + if (msgcnt > 0 && msetw->msetw_batch->last_msgid) { + rd_kafka_msg_t *lastmsg; + + lastmsg = rd_kafka_msgq_last(&msetw->msetw_batch->msgq); + rd_assert(lastmsg); + + if (unlikely(lastmsg->rkm_u.producer.msgid != + msetw->msetw_batch->last_msgid)) { + rd_kafka_set_fatal_error( + rkb->rkb_rk, RD_KAFKA_RESP_ERR__INCONSISTENT, + "Unable to reconstruct MessageSet " + "(currently with %d message(s)) " + "with msgid range %" PRIu64 "..%" PRIu64 + ": " + "last message added has msgid %" PRIu64 + ": " + "unable to guarantee consistency", + msgcnt, msetw->msetw_batch->first_msgid, + msetw->msetw_batch->last_msgid, + lastmsg->rkm_u.producer.msgid); + return 0; + } + } + return 1; +} + + +#if WITH_ZLIB +/** + * @brief Compress messageset using gzip/zlib + */ +static int rd_kafka_msgset_writer_compress_gzip(rd_kafka_msgset_writer_t *msetw, + rd_slice_t *slice, + struct iovec *ciov) { + + rd_kafka_broker_t *rkb = msetw->msetw_rkb; + rd_kafka_toppar_t *rktp = msetw->msetw_rktp; + z_stream strm; + size_t len = rd_slice_remains(slice); + const void *p; + size_t rlen; + int r; + int comp_level = + msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level; + + memset(&strm, 0, sizeof(strm)); + r = deflateInit2(&strm, comp_level, Z_DEFLATED, 15 + 16, 8, + Z_DEFAULT_STRATEGY); + if (r != Z_OK) { + rd_rkb_log(rkb, LOG_ERR, "GZIP", + "Failed to initialize gzip for " + "compressing %" PRIusz + " bytes in " + "topic %.*s [%" PRId32 + "]: %s (%i): " + "sending uncompressed", + len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, strm.msg ? strm.msg : "", r); + return -1; + } + + /* Calculate maximum compressed size and + * allocate an output buffer accordingly, being + * prefixed with the Message header. */ + ciov->iov_len = deflateBound(&strm, (uLong)rd_slice_remains(slice)); + ciov->iov_base = rd_malloc(ciov->iov_len); + + strm.next_out = (void *)ciov->iov_base; + strm.avail_out = (uInt)ciov->iov_len; + + /* Iterate through each segment and compress it. */ + while ((rlen = rd_slice_reader(slice, &p))) { + + strm.next_in = (void *)p; + strm.avail_in = (uInt)rlen; + + /* Compress message */ + if ((r = deflate(&strm, Z_NO_FLUSH)) != Z_OK) { + rd_rkb_log(rkb, LOG_ERR, "GZIP", + "Failed to gzip-compress " + "%" PRIusz " bytes (%" PRIusz + " total) for " + "topic %.*s [%" PRId32 + "]: " + "%s (%i): " + "sending uncompressed", + rlen, len, + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + strm.msg ? strm.msg : "", r); + deflateEnd(&strm); + rd_free(ciov->iov_base); + return -1; + } + + rd_kafka_assert(rkb->rkb_rk, strm.avail_in == 0); + } + + /* Finish the compression */ + if ((r = deflate(&strm, Z_FINISH)) != Z_STREAM_END) { + rd_rkb_log(rkb, LOG_ERR, "GZIP", + "Failed to finish gzip compression " + " of %" PRIusz + " bytes for " + "topic %.*s [%" PRId32 + "]: " + "%s (%i): " + "sending uncompressed", + len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, strm.msg ? 
strm.msg : "", r); + deflateEnd(&strm); + rd_free(ciov->iov_base); + return -1; + } + + ciov->iov_len = strm.total_out; + + /* Deinitialize compression */ + deflateEnd(&strm); + + return 0; +} +#endif + + +#if WITH_SNAPPY +/** + * @brief Compress messageset using Snappy + */ +static int +rd_kafka_msgset_writer_compress_snappy(rd_kafka_msgset_writer_t *msetw, + rd_slice_t *slice, + struct iovec *ciov) { + rd_kafka_broker_t *rkb = msetw->msetw_rkb; + rd_kafka_toppar_t *rktp = msetw->msetw_rktp; + struct iovec *iov; + size_t iov_max, iov_cnt; + struct snappy_env senv; + size_t len = rd_slice_remains(slice); + int r; + + /* Initialize snappy compression environment */ + rd_kafka_snappy_init_env_sg(&senv, 1 /*iov enable*/); + + /* Calculate maximum compressed size and + * allocate an output buffer accordingly. */ + ciov->iov_len = rd_kafka_snappy_max_compressed_length(len); + ciov->iov_base = rd_malloc(ciov->iov_len); + + iov_max = slice->buf->rbuf_segment_cnt; + iov = rd_alloca(sizeof(*iov) * iov_max); + + rd_slice_get_iov(slice, iov, &iov_cnt, iov_max, len); + + /* Compress each message */ + if ((r = rd_kafka_snappy_compress_iov(&senv, iov, iov_cnt, len, + ciov)) != 0) { + rd_rkb_log(rkb, LOG_ERR, "SNAPPY", + "Failed to snappy-compress " + "%" PRIusz + " bytes for " + "topic %.*s [%" PRId32 + "]: %s: " + "sending uncompressed", + len, RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, rd_strerror(-r)); + rd_free(ciov->iov_base); + return -1; + } + + /* rd_free snappy environment */ + rd_kafka_snappy_free_env(&senv); + + return 0; +} +#endif + +/** + * @brief Compress messageset using LZ4F + */ +static int rd_kafka_msgset_writer_compress_lz4(rd_kafka_msgset_writer_t *msetw, + rd_slice_t *slice, + struct iovec *ciov) { + rd_kafka_resp_err_t err; + int comp_level = + msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level; + err = rd_kafka_lz4_compress(msetw->msetw_rkb, + /* Correct or incorrect HC */ + msetw->msetw_MsgVersion >= 1 ? 1 : 0, + comp_level, slice, &ciov->iov_base, + &ciov->iov_len); + return (err ? -1 : 0); +} + +#if WITH_ZSTD +/** + * @brief Compress messageset using ZSTD + */ +static int rd_kafka_msgset_writer_compress_zstd(rd_kafka_msgset_writer_t *msetw, + rd_slice_t *slice, + struct iovec *ciov) { + rd_kafka_resp_err_t err; + int comp_level = + msetw->msetw_rktp->rktp_rkt->rkt_conf.compression_level; + err = rd_kafka_zstd_compress(msetw->msetw_rkb, comp_level, slice, + &ciov->iov_base, &ciov->iov_len); + return (err ? -1 : 0); +} +#endif + +/** + * @brief Compress the message set. + * @param outlenp in: total uncompressed messages size, + * out (on success): returns the compressed buffer size. + * @returns 0 on success or if -1 if compression failed. + * @remark Compression failures are not critical, we'll just send the + * the messageset uncompressed. 
+ */ +static int rd_kafka_msgset_writer_compress(rd_kafka_msgset_writer_t *msetw, + size_t *outlenp) { + rd_buf_t *rbuf = &msetw->msetw_rkbuf->rkbuf_buf; + rd_slice_t slice; + size_t len = *outlenp; + struct iovec ciov = RD_ZERO_INIT; /* Compressed output buffer */ + int r = -1; + size_t outlen; + + rd_assert(rd_buf_len(rbuf) >= msetw->msetw_firstmsg.of + len); + + /* Create buffer slice from firstmsg and onwards */ + r = rd_slice_init(&slice, rbuf, msetw->msetw_firstmsg.of, len); + rd_assert(r == 0 || !*"invalid firstmsg position"); + + switch (msetw->msetw_compression) { +#if WITH_ZLIB + case RD_KAFKA_COMPRESSION_GZIP: + r = rd_kafka_msgset_writer_compress_gzip(msetw, &slice, &ciov); + break; +#endif + +#if WITH_SNAPPY + case RD_KAFKA_COMPRESSION_SNAPPY: + r = rd_kafka_msgset_writer_compress_snappy(msetw, &slice, + &ciov); + break; +#endif + + case RD_KAFKA_COMPRESSION_LZ4: + r = rd_kafka_msgset_writer_compress_lz4(msetw, &slice, &ciov); + break; + +#if WITH_ZSTD + case RD_KAFKA_COMPRESSION_ZSTD: + r = rd_kafka_msgset_writer_compress_zstd(msetw, &slice, &ciov); + break; +#endif + + default: + rd_kafka_assert(NULL, + !*"notreached: unsupported compression.codec"); + break; + } + + if (r == -1) /* Compression failed, send uncompressed */ + return -1; + + + if (unlikely(ciov.iov_len > len)) { + /* If the compressed data is larger than the uncompressed size + * then throw it away and send as uncompressed. */ + rd_free(ciov.iov_base); + return -1; + } + + /* Set compression codec in MessageSet.Attributes */ + msetw->msetw_Attributes |= msetw->msetw_compression; + + /* Rewind rkbuf to the pre-message checkpoint (firstmsg) + * and replace the original message(s) with the compressed payload, + * possibly with version dependent enveloping. */ + rd_buf_write_seek(rbuf, msetw->msetw_firstmsg.of); + + rd_kafka_assert(msetw->msetw_rkb->rkb_rk, ciov.iov_len < INT32_MAX); + + if (msetw->msetw_MsgVersion == 2) { + /* MsgVersion 2 has no inner MessageSet header or wrapping + * for compressed messages, just the messages back-to-back, + * so we can push the compressed memory directly to the + * buffer without wrapping it. */ + rd_buf_push(rbuf, ciov.iov_base, ciov.iov_len, rd_free); + outlen = ciov.iov_len; + + } else { + /* Older MessageSets envelope/wrap the compressed MessageSet + * in an outer Message. */ + rd_kafka_msg_t rkm = {.rkm_len = ciov.iov_len, + .rkm_payload = ciov.iov_base, + .rkm_timestamp = + msetw->msetw_firstmsg.timestamp}; + outlen = rd_kafka_msgset_writer_write_msg( + msetw, &rkm, 0, msetw->msetw_compression, + rd_free /*free for ciov.iov_base*/); + } + + *outlenp = outlen; + + return 0; +} + + + +/** + * @brief Calculate MessageSet v2 CRC (CRC32C) when messageset is complete. + */ +static void +rd_kafka_msgset_writer_calc_crc_v2(rd_kafka_msgset_writer_t *msetw) { + int32_t crc; + rd_slice_t slice; + int r; + + r = rd_slice_init(&slice, &msetw->msetw_rkbuf->rkbuf_buf, + msetw->msetw_of_CRC + 4, + rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) - + msetw->msetw_of_CRC - 4); + rd_assert(!r && *"slice_init failed"); + + /* CRC32C calculation */ + crc = rd_slice_crc32c(&slice); + + /* Update CRC at MessageSet v2 CRC offset */ + rd_kafka_buf_update_i32(msetw->msetw_rkbuf, msetw->msetw_of_CRC, crc); +} + +/** + * @brief Finalize MessageSet v2 header fields. 
+ */ +static void rd_kafka_msgset_writer_finalize_MessageSet_v2_header( + rd_kafka_msgset_writer_t *msetw) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + int msgcnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq); + + rd_kafka_assert(NULL, msgcnt > 0); + rd_kafka_assert(NULL, msetw->msetw_ApiVersion >= 3); + + msetw->msetw_MessageSetSize = + RD_KAFKAP_MSGSET_V2_SIZE + msetw->msetw_messages_len; + + /* MessageSet.Length is the same as + * MessageSetSize minus field widths for FirstOffset+Length */ + rd_kafka_buf_update_i32( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_Length, + (int32_t)msetw->msetw_MessageSetSize - (8 + 4)); + + msetw->msetw_Attributes |= RD_KAFKA_MSG_ATTR_CREATE_TIME; + + if (rd_kafka_is_transactional(msetw->msetw_rkb->rkb_rk)) + msetw->msetw_Attributes |= + RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL; + + rd_kafka_buf_update_i16( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_Attributes, + msetw->msetw_Attributes); + + rd_kafka_buf_update_i32(rkbuf, + msetw->msetw_of_start + + RD_KAFKAP_MSGSET_V2_OF_LastOffsetDelta, + msgcnt - 1); + + rd_kafka_buf_update_i64( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_BaseTimestamp, + msetw->msetw_firstmsg.timestamp); + + rd_kafka_buf_update_i64( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_MaxTimestamp, + msetw->msetw_MaxTimestamp); + + rd_kafka_buf_update_i32( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_BaseSequence, + msetw->msetw_batch->first_seq); + + rd_kafka_buf_update_i32( + rkbuf, msetw->msetw_of_start + RD_KAFKAP_MSGSET_V2_OF_RecordCount, + msgcnt); + + rd_kafka_msgset_writer_calc_crc_v2(msetw); +} + + + +/** + * @brief Finalize the MessageSet header, if applicable. + */ +static void +rd_kafka_msgset_writer_finalize_MessageSet(rd_kafka_msgset_writer_t *msetw) { + rd_dassert(msetw->msetw_messages_len > 0); + + if (msetw->msetw_MsgVersion == 2) + rd_kafka_msgset_writer_finalize_MessageSet_v2_header(msetw); + else + msetw->msetw_MessageSetSize = + RD_KAFKAP_MSGSET_V0_SIZE + msetw->msetw_messages_len; + + /* Update MessageSetSize */ + rd_kafka_buf_update_i32(msetw->msetw_rkbuf, + msetw->msetw_of_MessageSetSize, + (int32_t)msetw->msetw_MessageSetSize); +} + + +/** + * @brief Finalize the messageset - call when no more messages are to be + * added to the messageset. + * + * Will compress, update final values, CRCs, etc. + * + * The messageset writer is destroyed and the buffer is returned + * and ready to be transmitted. + * + * @param MessagetSetSizep will be set to the finalized MessageSetSize + * + * @returns the buffer to transmit or NULL if there were no messages + * in messageset. + */ +static rd_kafka_buf_t * +rd_kafka_msgset_writer_finalize(rd_kafka_msgset_writer_t *msetw, + size_t *MessageSetSizep) { + rd_kafka_buf_t *rkbuf = msetw->msetw_rkbuf; + rd_kafka_toppar_t *rktp = msetw->msetw_rktp; + size_t len; + int cnt; + + /* No messages added, bail out early. */ + if (unlikely((cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq)) == + 0)) { + rd_kafka_buf_destroy(rkbuf); + return NULL; + } + + /* Total size of messages */ + len = rd_buf_write_pos(&msetw->msetw_rkbuf->rkbuf_buf) - + msetw->msetw_firstmsg.of; + rd_assert(len > 0); + rd_assert(len <= (size_t)rktp->rktp_rkt->rkt_rk->rk_conf.max_msg_size); + + rd_atomic64_add(&rktp->rktp_c.tx_msgs, cnt); + rd_atomic64_add(&rktp->rktp_c.tx_msg_bytes, + msetw->msetw_messages_kvlen); + + /* Idempotent Producer: + * Store request's PID for matching on response + * if the instance PID has changed and thus made + * the request obsolete. 
*/ + msetw->msetw_rkbuf->rkbuf_u.Produce.batch.pid = msetw->msetw_pid; + + /* Compress the message set */ + if (msetw->msetw_compression) { + if (rd_kafka_msgset_writer_compress(msetw, &len) == -1) + msetw->msetw_compression = 0; + } + + msetw->msetw_messages_len = len; + + /* Finalize MessageSet header fields */ + rd_kafka_msgset_writer_finalize_MessageSet(msetw); + + /* Return final MessageSetSize */ + *MessageSetSizep = msetw->msetw_MessageSetSize; + + rd_rkb_dbg(msetw->msetw_rkb, MSG, "PRODUCE", + "%s [%" PRId32 + "]: " + "Produce MessageSet with %i message(s) (%" PRIusz + " bytes, " + "ApiVersion %d, MsgVersion %d, MsgId %" PRIu64 + ", " + "BaseSeq %" PRId32 ", %s, %s)", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, cnt, + msetw->msetw_MessageSetSize, msetw->msetw_ApiVersion, + msetw->msetw_MsgVersion, msetw->msetw_batch->first_msgid, + msetw->msetw_batch->first_seq, + rd_kafka_pid2str(msetw->msetw_pid), + msetw->msetw_compression + ? rd_kafka_compression2str(msetw->msetw_compression) + : "uncompressed"); + + rd_kafka_msgq_verify_order(rktp, &msetw->msetw_batch->msgq, + msetw->msetw_batch->first_msgid, rd_false); + + rd_kafka_msgbatch_ready_produce(msetw->msetw_batch); + + return rkbuf; +} + + +/** + * @brief Create ProduceRequest containing as many messages from + * the toppar's transmit queue as possible, limited by configuration, + * size, etc. + * + * @param rkb broker to create buffer for + * @param rktp toppar to transmit messages for + * @param MessagetSetSizep will be set to the final MessageSetSize + * + * @returns the buffer to transmit or NULL if there were no messages + * in messageset. + * + * @locality broker thread + */ +rd_kafka_buf_t *rd_kafka_msgset_create_ProduceRequest(rd_kafka_broker_t *rkb, + rd_kafka_toppar_t *rktp, + rd_kafka_msgq_t *rkmq, + const rd_kafka_pid_t pid, + uint64_t epoch_base_msgid, + size_t *MessageSetSizep) { + + rd_kafka_msgset_writer_t msetw; + + if (rd_kafka_msgset_writer_init(&msetw, rkb, rktp, rkmq, pid, + epoch_base_msgid) <= 0) + return NULL; + + if (!rd_kafka_msgset_writer_write_msgq(&msetw, msetw.msetw_msgq)) { + /* Error while writing messages to MessageSet, + * move all messages back on the xmit queue. */ + rd_kafka_msgq_insert_msgq( + rkmq, &msetw.msetw_batch->msgq, + rktp->rktp_rkt->rkt_conf.msg_order_cmp); + } + + return rd_kafka_msgset_writer_finalize(&msetw, MessageSetSizep); +} |