author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-19 02:57:58 +0000
commit | be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree | 9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c
parent | Initial commit. (diff)
download | netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.tar.xz netdata-be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97.zip
Adding upstream version 1.44.3. (tags: upstream/1.44.3, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c | 1794
1 file changed, 1794 insertions(+), 0 deletions(-)
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c
new file mode 100644
index 00000000..58779f3b
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_msgset_reader.c
@@ -0,0 +1,1794 @@
+/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * @name MessageSet reader interface + * + * Parses FetchResponse for Messages + * + * + * @remark + * The broker may send partial messages; when this happens we bail out + * silently and keep the messages that we successfully parsed. + * + * "A Guide To The Kafka Protocol" states: + * "As an optimization the server is allowed to + * return a partial message at the end of the + * message set. + * Clients should handle this case." + * + * We're handling it by not passing the error upstream. + * This is why most err_parse: goto labels (that are called from buf parsing + * macros) suppress the error message and why log_decode_errors is off + * unless PROTOCOL debugging is enabled. + * + * When a FetchResponse contains multiple partitions, each partition's + * MessageSet may be partial, regardless of the other partitions. + * To make sure the next partition can be parsed, each partition parse + * uses its own sub-slice of only that partition's MessageSetSize length.
+ */ + +#include "rd.h" +#include "rdunittest.h" +#include "rdavl.h" +#include "rdlist.h" +#include "rdkafka_int.h" +#include "rdkafka_msg.h" +#include "rdkafka_msgset.h" +#include "rdkafka_topic.h" +#include "rdkafka_partition.h" +#include "rdkafka_header.h" +#include "rdkafka_lz4.h" + +#include "rdvarint.h" +#include "crc32c.h" + +#if WITH_ZLIB +#include "rdgz.h" +#endif +#if WITH_SNAPPY +#include "snappy.h" +#endif +#if WITH_ZSTD +#include "rdkafka_zstd.h" +#endif + + +static RD_INLINE int64_t +rd_kafka_aborted_txns_pop_offset(rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid, + int64_t max_offset); +static RD_INLINE int64_t +rd_kafka_aborted_txns_get_offset(const rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid); + + +struct msgset_v2_hdr { + int64_t BaseOffset; + int32_t Length; + int32_t PartitionLeaderEpoch; + int8_t MagicByte; + int32_t Crc; + int16_t Attributes; + int32_t LastOffsetDelta; + int64_t BaseTimestamp; + int64_t MaxTimestamp; + int64_t PID; + int16_t ProducerEpoch; + int32_t BaseSequence; + int32_t RecordCount; +}; + + +/** + * @struct rd_kafka_aborted_txn_start_offsets_t + * + * @brief A sorted list of aborted transaction start offsets + * (ascending) for a PID, and an offset into that list. + */ +typedef struct rd_kafka_aborted_txn_start_offsets_s { + rd_avl_node_t avl_node; + int64_t pid; + int offsets_idx; + rd_list_t offsets; +} rd_kafka_aborted_txn_start_offsets_t; + + +typedef struct rd_kafka_msgset_reader_s { + rd_kafka_buf_t *msetr_rkbuf; /**< Response read buffer */ + + int msetr_relative_offsets; /**< Bool: using relative offsets */ + + /**< Outer/wrapper Message fields. */ + struct { + int64_t offset; /**< Relative_offsets: outer message's + * Offset (last offset) */ + rd_kafka_timestamp_type_t tstype; /**< Compressed + * MessageSet's + * timestamp type. */ + int64_t timestamp; /**< ... timestamp*/ + } msetr_outer; + + struct msgset_v2_hdr *msetr_v2_hdr; /**< MessageSet v2 header */ + + /* + * Aborted Transaction Start Offsets. These are arranged in a map + * (ABORTED_TXN_OFFSETS), with PID as the key and value as follows: + * - OFFSETS: sorted list of aborted transaction start offsets + * (ascending) + * - IDX: an index into OFFSETS list, initialized to 0. + * + * The logic for processing fetched data is as follows (note: this is + * different from the Java client): + * + * 1. If the message is a transaction control message and the status is + * ABORT then increment ABORTED_TXN_OFFSETS(PID).IDX. note: sanity check + * that OFFSETS[ABORTED_TXN_OFFSETS(PID).IDX] is less than the current + * offset before incrementing. If the status is COMMIT, do nothing. + * + * 2. If the message is a normal message, find the corresponding OFFSETS + * list in ABORTED_TXN_OFFSETS. If it doesn't exist, then keep the + * message. If the PID does exist, compare ABORTED_TXN_OFFSETS(PID).IDX + * with len(OFFSETS). If it's >= then the message should be kept. If + * not, compare the message offset with + * OFFSETS[ABORTED_TXN_OFFSETS(PID).IDX]. If it's greater than or equal + * to this value, then the message should be ignored. If it's less than, + * then the message should be kept. + * + * Note: A MessageSet comprises messages from at most one transaction, + * so the logic in step 2 is done at the message set level. + */ + rd_kafka_aborted_txns_t *msetr_aborted_txns; + + const struct rd_kafka_toppar_ver *msetr_tver; /**< Toppar op version of + * request. */ + + int32_t msetr_leader_epoch; /**< Current MessageSet's partition + * leader epoch (or -1). 
*/ + + int32_t msetr_broker_id; /**< Broker id (of msetr_rkb) */ + rd_kafka_broker_t *msetr_rkb; /* @warning Not a refcounted + * reference! */ + rd_kafka_toppar_t *msetr_rktp; /* @warning Not a refcounted + * reference! */ + + int msetr_msgcnt; /**< Number of messages in rkq */ + int64_t msetr_msg_bytes; /**< Number of bytes in rkq */ + rd_kafka_q_t msetr_rkq; /**< Temp Message and error queue */ + rd_kafka_q_t *msetr_par_rkq; /**< Parent message and error queue, + * the temp msetr_rkq will be moved + * to this queue when parsing + * is done. + * Refcount is not increased. */ + + int64_t msetr_next_offset; /**< Next offset to fetch after + * this reader run is done. + * Optional: only used for special + * cases where the per-message offset + * can't be relied on for next + * fetch offset, such as with + * compacted topics. */ + + int msetr_ctrl_cnt; /**< Number of control messages + * or MessageSets received. */ + + int msetr_aborted_cnt; /**< Number of aborted MessageSets + * encountered. */ + + const char *msetr_srcname; /**< Optional message source string, + * used in debug logging to + * indicate messages were + * from an inner compressed + * message set. + * Not freed (use const memory). + * Add trailing space. */ + + rd_kafka_compression_t msetr_compression; /**< Compression codec */ +} rd_kafka_msgset_reader_t; + + + +/* Forward declarations */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr); +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_msgs_v2(rd_kafka_msgset_reader_t *msetr); + + +/** + * @brief Set up a MessageSet reader but don't start reading messages. + */ +static void rd_kafka_msgset_reader_init(rd_kafka_msgset_reader_t *msetr, + rd_kafka_buf_t *rkbuf, + rd_kafka_toppar_t *rktp, + const struct rd_kafka_toppar_ver *tver, + rd_kafka_aborted_txns_t *aborted_txns, + rd_kafka_q_t *par_rkq) { + + memset(msetr, 0, sizeof(*msetr)); + + msetr->msetr_rkb = rkbuf->rkbuf_rkb; + msetr->msetr_leader_epoch = -1; + msetr->msetr_broker_id = rd_kafka_broker_id(msetr->msetr_rkb); + msetr->msetr_rktp = rktp; + msetr->msetr_aborted_txns = aborted_txns; + msetr->msetr_tver = tver; + msetr->msetr_rkbuf = rkbuf; + msetr->msetr_srcname = ""; + + rkbuf->rkbuf_uflow_mitigation = "truncated response from broker (ok)"; + + /* All parsed messages are put on this temporary op + * queue first and then moved in one go to the real op queue. */ + rd_kafka_q_init(&msetr->msetr_rkq, msetr->msetr_rkb->rkb_rk); + + /* Make sure enqueued ops get the correct serve/opaque reflecting the + * original queue. */ + msetr->msetr_rkq.rkq_serve = par_rkq->rkq_serve; + msetr->msetr_rkq.rkq_opaque = par_rkq->rkq_opaque; + + /* Keep (non-refcounted) reference to parent queue for + * moving the messages and events in msetr_rkq to when + * parsing is done. */ + msetr->msetr_par_rkq = par_rkq; +} + + + +/** + * @brief Decompress MessageSet, pass the uncompressed MessageSet to + * the MessageSet reader. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_decompress(rd_kafka_msgset_reader_t *msetr, + int MsgVersion, + int Attributes, + int64_t Timestamp, + int64_t Offset, + const void *compressed, + size_t compressed_size) { + struct iovec iov = {.iov_base = NULL, .iov_len = 0}; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + int codec = Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_buf_t *rkbufz; + + msetr->msetr_compression = codec; + + switch (codec) { +#if WITH_ZLIB + case RD_KAFKA_COMPRESSION_GZIP: { + uint64_t outlenx = 0; + + /* Decompress Message payload */ + iov.iov_base = rd_gz_decompress(compressed, + (int)compressed_size, &outlenx); + if (unlikely(!iov.iov_base)) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "GZIP", + "Failed to decompress Gzip " + "message at offset %" PRId64 " of %" PRIusz + " bytes: " + "ignoring message", + Offset, compressed_size); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto err; + } + + iov.iov_len = (size_t)outlenx; + } break; +#endif + +#if WITH_SNAPPY + case RD_KAFKA_COMPRESSION_SNAPPY: { + const char *inbuf = compressed; + size_t inlen = compressed_size; + int r; + static const unsigned char snappy_java_magic[] = { + 0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0}; + static const size_t snappy_java_hdrlen = 8 + 4 + 4; + + /* snappy-java adds its own header (SnappyCodec) + * which is not compatible with the official Snappy + * implementation. + * 8: magic, 4: version, 4: compatible + * followed by any number of chunks: + * 4: length + * ...: snappy-compressed data. */ + if (likely(inlen > snappy_java_hdrlen + 4 && + !memcmp(inbuf, snappy_java_magic, 8))) { + /* snappy-java framing */ + char errstr[128]; + + inbuf = inbuf + snappy_java_hdrlen; + inlen -= snappy_java_hdrlen; + iov.iov_base = rd_kafka_snappy_java_uncompress( + inbuf, inlen, &iov.iov_len, errstr, sizeof(errstr)); + + if (unlikely(!iov.iov_base)) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY", + "%s [%" PRId32 + "]: " + "Snappy decompression for message " + "at offset %" PRId64 + " failed: %s: " + "ignoring message", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, Offset, + errstr); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto err; + } + + + } else { + /* No framing */ + + /* Acquire uncompressed length */ + if (unlikely(!rd_kafka_snappy_uncompressed_length( + inbuf, inlen, &iov.iov_len))) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY", + "Failed to get length of Snappy " + "compressed payload " + "for message at offset %" PRId64 + " (%" PRIusz + " bytes): " + "ignoring message", + Offset, inlen); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; + goto err; + } + + /* Allocate output buffer for uncompressed data */ + iov.iov_base = rd_malloc(iov.iov_len); + if (unlikely(!iov.iov_base)) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY", + "Failed to allocate Snappy " + "decompress buffer of size %" PRIusz + " for message at offset %" PRId64 + " (%" PRIusz + " bytes): %s: " + "ignoring message", + iov.iov_len, Offset, inlen, + rd_strerror(errno)); + err = RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE; + goto err; + } + + /* Uncompress to outbuf */ + if (unlikely((r = rd_kafka_snappy_uncompress( + inbuf, inlen, iov.iov_base)))) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "SNAPPY", + "Failed to decompress Snappy " + "payload for message at offset " + "%" PRId64 " (%" PRIusz + " bytes): %s: " + "ignoring message", + Offset, inlen, + rd_strerror(-r /*negative errno*/)); + rd_free(iov.iov_base); + err = RD_KAFKA_RESP_ERR__BAD_COMPRESSION; +
goto err; + } + } + + } break; +#endif + + case RD_KAFKA_COMPRESSION_LZ4: { + err = + rd_kafka_lz4_decompress(msetr->msetr_rkb, + /* Proper HC? */ + MsgVersion >= 1 ? 1 : 0, Offset, + /* @warning Will modify compressed + * if no proper HC */ + (char *)compressed, compressed_size, + &iov.iov_base, &iov.iov_len); + if (err) + goto err; + } break; + +#if WITH_ZSTD + case RD_KAFKA_COMPRESSION_ZSTD: { + err = rd_kafka_zstd_decompress( + msetr->msetr_rkb, (char *)compressed, compressed_size, + &iov.iov_base, &iov.iov_len); + if (err) + goto err; + } break; +#endif + + default: + rd_rkb_dbg(msetr->msetr_rkb, MSG, "CODEC", + "%s [%" PRId32 "]: Message at offset %" PRId64 + " with unsupported " + "compression codec 0x%x: message ignored", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + Offset, (int)codec); + + err = RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; + goto err; + } + + + rd_assert(iov.iov_base); + + /* + * Decompression successful + */ + + /* Create a new buffer pointing to the uncompressed + * allocated buffer (outbuf) and let messages keep a reference to + * this new buffer. */ + rkbufz = rd_kafka_buf_new_shadow(iov.iov_base, iov.iov_len, rd_free); + rkbufz->rkbuf_rkb = msetr->msetr_rkbuf->rkbuf_rkb; + rd_kafka_broker_keep(rkbufz->rkbuf_rkb); + + + /* In MsgVersion v0..1 the decompressed data contains + * an inner MessageSet, pass it to a new MessageSet reader. + * + * For MsgVersion v2 the decompressed data are the list of messages. + */ + + if (MsgVersion <= 1) { + /* Pass decompressed data (inner MessageSet) + * to new instance of the MessageSet parser. */ + rd_kafka_msgset_reader_t inner_msetr; + rd_kafka_msgset_reader_init( + &inner_msetr, rkbufz, msetr->msetr_rktp, msetr->msetr_tver, + /* there is no aborted transaction + * support for MsgVersion < 2 */ + NULL, &msetr->msetr_rkq); + + inner_msetr.msetr_srcname = "compressed "; + + if (MsgVersion == 1) { + /* postproc() will convert relative to + * absolute offsets */ + inner_msetr.msetr_relative_offsets = 1; + inner_msetr.msetr_outer.offset = Offset; + + /* Apply single LogAppendTime timestamp for + * all messages. */ + if (Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) { + inner_msetr.msetr_outer.tstype = + RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME; + inner_msetr.msetr_outer.timestamp = Timestamp; + } + } + + /* Parse the inner MessageSet */ + err = rd_kafka_msgset_reader_run(&inner_msetr); + + /* Transfer message count from inner to outer */ + msetr->msetr_msgcnt += inner_msetr.msetr_msgcnt; + msetr->msetr_msg_bytes += inner_msetr.msetr_msg_bytes; + + + } else { + /* MsgVersion 2 */ + rd_kafka_buf_t *orig_rkbuf = msetr->msetr_rkbuf; + + rkbufz->rkbuf_uflow_mitigation = + "truncated response from broker (ok)"; + + /* Temporarily replace read buffer with uncompressed buffer */ + msetr->msetr_rkbuf = rkbufz; + + /* Read messages */ + err = rd_kafka_msgset_reader_msgs_v2(msetr); + + /* Restore original buffer */ + msetr->msetr_rkbuf = orig_rkbuf; + } + + /* Lose our refcnt of the uncompressed rkbuf. + * Individual messages/rko's will have their own reference. */ + rd_kafka_buf_destroy(rkbufz); + + return err; + +err: + /* Enqueue error message: + * Create op and push on temporary queue.
*/ + rd_kafka_consumer_err( + &msetr->msetr_rkq, msetr->msetr_broker_id, err, + msetr->msetr_tver->version, NULL, rktp, Offset, + "Decompression (codec 0x%x) of message at %" PRIu64 " of %" PRIusz + " bytes failed: %s", + codec, Offset, compressed_size, rd_kafka_err2str(err)); + + return err; +} + + + +/** + * @brief Message parser for MsgVersion v0..1 + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success or on single-message errors, + * or any other error code when the MessageSet parser should stop + * parsing (such as for partial Messages). + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_msg_v0_1(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + rd_kafka_broker_t *rkb = msetr->msetr_rkb; + struct { + int64_t Offset; /* MessageSet header */ + int32_t MessageSize; /* MessageSet header */ + int32_t Crc; + int8_t MagicByte; /* MsgVersion */ + int8_t Attributes; + int64_t Timestamp; /* v1 */ + } hdr; /* Message header */ + rd_kafkap_bytes_t Key; + rd_kafkap_bytes_t Value; + int32_t Value_len; + rd_kafka_op_t *rko; + size_t hdrsize = 6; /* Header size following MessageSize */ + rd_slice_t crc_slice; + rd_kafka_msg_t *rkm; + int relative_offsets = 0; + const char *reloff_str = ""; + /* Only log decoding errors if protocol debugging enabled. */ + int log_decode_errors = + (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_PROTOCOL) + ? LOG_DEBUG + : 0; + size_t message_end; + + rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); + rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSize); + message_end = rd_slice_offset(&rkbuf->rkbuf_reader) + hdr.MessageSize; + + rd_kafka_buf_read_i32(rkbuf, &hdr.Crc); + if (!rd_slice_narrow_copy_relative(&rkbuf->rkbuf_reader, &crc_slice, + hdr.MessageSize - 4)) + rd_kafka_buf_check_len(rkbuf, hdr.MessageSize - 4); + + rd_kafka_buf_read_i8(rkbuf, &hdr.MagicByte); + rd_kafka_buf_read_i8(rkbuf, &hdr.Attributes); + + if (hdr.MagicByte == 1) { /* MsgVersion */ + rd_kafka_buf_read_i64(rkbuf, &hdr.Timestamp); + hdrsize += 8; + /* MsgVersion 1 has relative offsets for compressed + * MessageSets*/ + if (!(hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) && + msetr->msetr_relative_offsets) { + relative_offsets = 1; + reloff_str = "relative "; + } + } else + hdr.Timestamp = 0; + + /* Verify MessageSize */ + if (unlikely(hdr.MessageSize < (ssize_t)hdrsize)) + rd_kafka_buf_parse_fail( + rkbuf, + "Message at %soffset %" PRId64 " MessageSize %" PRId32 + " < hdrsize %" PRIusz, + reloff_str, hdr.Offset, hdr.MessageSize, hdrsize); + + /* Early check for partial messages */ + rd_kafka_buf_check_len(rkbuf, hdr.MessageSize - hdrsize); + + if (rkb->rkb_rk->rk_conf.check_crcs) { + /* Verify CRC32 if desired. */ + uint32_t calc_crc; + + calc_crc = rd_slice_crc32(&crc_slice); + rd_dassert(rd_slice_remains(&crc_slice) == 0); + + if (unlikely(hdr.Crc != (int32_t)calc_crc)) { + /* Propagate CRC error to application and + * continue with next message. 
*/ + rd_kafka_consumer_err( + &msetr->msetr_rkq, msetr->msetr_broker_id, + RD_KAFKA_RESP_ERR__BAD_MSG, + msetr->msetr_tver->version, NULL, rktp, hdr.Offset, + "Message at %soffset %" PRId64 " (%" PRId32 + " bytes) " + "failed CRC32 check " + "(original 0x%" PRIx32 + " != " + "calculated 0x%" PRIx32 ")", + reloff_str, hdr.Offset, hdr.MessageSize, hdr.Crc, + calc_crc); + rd_kafka_buf_skip_to(rkbuf, message_end); + rd_atomic64_add(&rkb->rkb_c.rx_err, 1); + /* Continue with next message */ + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + + /* Extract key */ + rd_kafka_buf_read_bytes(rkbuf, &Key); + + /* Extract Value */ + rd_kafka_buf_read_bytes(rkbuf, &Value); + Value_len = RD_KAFKAP_BYTES_LEN(&Value); + + /* MessageSets may contain offsets earlier than we + * requested (compressed MessageSets in particular), + * drop the earlier messages. + * Note: the inner offset may only be trusted for + * absolute offsets. KIP-31 introduced + * ApiVersion 2 that maintains relative offsets + * of compressed messages and the base offset + * in the outer message is the offset of + * the *LAST* message in the MessageSet. + * This requires us to assign offsets + * after all messages have been read from + * the MessageSet, and it also means + * we can't perform this offset check here + * in that case. */ + if (!relative_offsets && + hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) + return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ + + /* Handle compressed MessageSet */ + if (unlikely(hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK)) + return rd_kafka_msgset_reader_decompress( + msetr, hdr.MagicByte, hdr.Attributes, hdr.Timestamp, + hdr.Offset, Value.data, Value_len); + + + /* Pure uncompressed message, this is the innermost + * handler after all compression and cascaded + * MessageSets have been peeled off. */ + + /* Create op/message container for message. */ + rko = rd_kafka_op_new_fetch_msg( + &rkm, rktp, msetr->msetr_tver->version, rkbuf, hdr.Offset, + (size_t)RD_KAFKAP_BYTES_LEN(&Key), + RD_KAFKAP_BYTES_IS_NULL(&Key) ? NULL : Key.data, + (size_t)RD_KAFKAP_BYTES_LEN(&Value), + RD_KAFKAP_BYTES_IS_NULL(&Value) ? NULL : Value.data); + + rkm->rkm_u.consumer.leader_epoch = msetr->msetr_leader_epoch; + rkm->rkm_broker_id = msetr->msetr_broker_id; + + /* Assign message timestamp. + * If message was in a compressed MessageSet and the outer/wrapper + * Message.Attribute had a LOG_APPEND_TIME set, use the + * outer timestamp */ + if (msetr->msetr_outer.tstype == RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME) { + rkm->rkm_timestamp = msetr->msetr_outer.timestamp; + rkm->rkm_tstype = msetr->msetr_outer.tstype; + + } else if (hdr.MagicByte >= 1 && hdr.Timestamp) { + rkm->rkm_timestamp = hdr.Timestamp; + if (hdr.Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) + rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME; + else + rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME; + } + + /* Enqueue message on temporary queue */ + rd_kafka_q_enq(&msetr->msetr_rkq, rko); + msetr->msetr_msgcnt++; + msetr->msetr_msg_bytes += rkm->rkm_key_len + rkm->rkm_len; + + return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue */ + +err_parse: + /* Count all parse errors as partial message errors.
*/ + rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1); + return rkbuf->rkbuf_err; +} + + + +/** + * @brief Message parser for MsgVersion v2 + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_msg_v2(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + struct { + int64_t Length; + int8_t MsgAttributes; + int64_t TimestampDelta; + int64_t OffsetDelta; + int64_t Offset; /* Absolute offset */ + rd_kafkap_bytes_t Key; + rd_kafkap_bytes_t Value; + rd_kafkap_bytes_t Headers; + } hdr; + rd_kafka_op_t *rko; + rd_kafka_msg_t *rkm; + /* Only log decoding errors if protocol debugging enabled. */ + int log_decode_errors = + (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_PROTOCOL) + ? LOG_DEBUG + : 0; + size_t message_end; + + rd_kafka_buf_read_varint(rkbuf, &hdr.Length); + message_end = + rd_slice_offset(&rkbuf->rkbuf_reader) + (size_t)hdr.Length; + rd_kafka_buf_read_i8(rkbuf, &hdr.MsgAttributes); + + rd_kafka_buf_read_varint(rkbuf, &hdr.TimestampDelta); + rd_kafka_buf_read_varint(rkbuf, &hdr.OffsetDelta); + hdr.Offset = msetr->msetr_v2_hdr->BaseOffset + hdr.OffsetDelta; + + /* Skip message if outdated */ + if (hdr.Offset < rktp->rktp_offsets.fetch_pos.offset) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG", + "%s [%" PRId32 + "]: " + "Skip offset %" PRId64 " < fetch_offset %" PRId64, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + hdr.Offset, rktp->rktp_offsets.fetch_pos.offset); + rd_kafka_buf_skip_to(rkbuf, message_end); + return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next msg */ + } + + /* Handle control messages */ + if (msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL) { + struct { + int64_t KeySize; + int16_t Version; + int16_t Type; + } ctrl_data; + int64_t aborted_txn_start_offset; + + rd_kafka_buf_read_varint(rkbuf, &ctrl_data.KeySize); + + if (unlikely(ctrl_data.KeySize < 2)) + rd_kafka_buf_parse_fail( + rkbuf, + "%s [%" PRId32 + "]: " + "Ctrl message at offset %" PRId64 + " has invalid key size %" PRId64, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, hdr.Offset, + ctrl_data.KeySize); + + rd_kafka_buf_read_i16(rkbuf, &ctrl_data.Version); + + if (ctrl_data.Version != 0) { + rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG", + "%s [%" PRId32 + "]: " + "Skipping ctrl msg with " + "unsupported version %" PRId16 + " at offset %" PRId64, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, ctrl_data.Version, + hdr.Offset); + rd_kafka_buf_skip_to(rkbuf, message_end); + return RD_KAFKA_RESP_ERR_NO_ERROR; /* Continue with next + msg */ + } + + if (unlikely(ctrl_data.KeySize != 4)) + rd_kafka_buf_parse_fail( + rkbuf, + "%s [%" PRId32 + "]: " + "Ctrl message at offset %" PRId64 + " has invalid key size %" PRId64, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, hdr.Offset, + ctrl_data.KeySize); + + rd_kafka_buf_read_i16(rkbuf, &ctrl_data.Type); + + /* Client is uninterested in value of commit marker */ + rd_kafka_buf_skip( + rkbuf, (int32_t)(message_end - + rd_slice_offset(&rkbuf->rkbuf_reader))); + + switch (ctrl_data.Type) { + case RD_KAFKA_CTRL_MSG_COMMIT: + /* always ignore. 
*/ + break; + + case RD_KAFKA_CTRL_MSG_ABORT: + if (msetr->msetr_rkb->rkb_rk->rk_conf.isolation_level != + RD_KAFKA_READ_COMMITTED) + break; + + if (unlikely(!msetr->msetr_aborted_txns)) { + rd_rkb_dbg(msetr->msetr_rkb, + MSG | RD_KAFKA_DBG_EOS, "TXN", + "%s [%" PRId32 + "] received abort txn " + "ctrl msg at offset %" PRId64 + " for " + "PID %" PRId64 + ", but there are no " + "known aborted transactions: " + "ignoring", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, hdr.Offset, + msetr->msetr_v2_hdr->PID); + break; + } + + /* This marks the end of this (aborted) transaction, + * advance to next aborted transaction in list */ + aborted_txn_start_offset = + rd_kafka_aborted_txns_pop_offset( + msetr->msetr_aborted_txns, + msetr->msetr_v2_hdr->PID, hdr.Offset); + + if (unlikely(aborted_txn_start_offset == -1)) { + rd_rkb_dbg(msetr->msetr_rkb, + MSG | RD_KAFKA_DBG_EOS, "TXN", + "%s [%" PRId32 + "] received abort txn " + "ctrl msg at offset %" PRId64 + " for " + "PID %" PRId64 + ", but this offset is " + "not listed as an aborted " + "transaction: aborted transaction " + "was possibly empty: ignoring", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, hdr.Offset, + msetr->msetr_v2_hdr->PID); + break; + } + break; + + + default: + rd_rkb_dbg(msetr->msetr_rkb, MSG, + "TXN", + "%s [%" PRId32 + "]: " + "Unsupported ctrl message " + "type %" PRId16 + " at offset" + " %" PRId64 ": ignoring", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, ctrl_data.Type, + hdr.Offset); + break; + } + + rko = rd_kafka_op_new_ctrl_msg(rktp, msetr->msetr_tver->version, + rkbuf, hdr.Offset); + rd_kafka_q_enq(&msetr->msetr_rkq, rko); + msetr->msetr_msgcnt++; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + /* Regular message */ + + /* Note: messages in aborted transactions are skipped at the MessageSet + * level */ + + rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Key); + rd_kafka_buf_read_bytes_varint(rkbuf, &hdr.Value); + + /* We parse the Headers later, just store the size (possibly truncated) + * and pointer to the headers. */ + hdr.Headers.len = + (int32_t)(message_end - rd_slice_offset(&rkbuf->rkbuf_reader)); + rd_kafka_buf_read_ptr(rkbuf, &hdr.Headers.data, hdr.Headers.len); + + /* Create op/message container for message. */ + rko = rd_kafka_op_new_fetch_msg( + &rkm, rktp, msetr->msetr_tver->version, rkbuf, hdr.Offset, + (size_t)RD_KAFKAP_BYTES_LEN(&hdr.Key), + RD_KAFKAP_BYTES_IS_NULL(&hdr.Key) ? NULL : hdr.Key.data, + (size_t)RD_KAFKAP_BYTES_LEN(&hdr.Value), + RD_KAFKAP_BYTES_IS_NULL(&hdr.Value) ? NULL : hdr.Value.data); + + rkm->rkm_u.consumer.leader_epoch = msetr->msetr_leader_epoch; + rkm->rkm_broker_id = msetr->msetr_broker_id; + + /* Store pointer to unparsed message headers, they will + * be parsed on the first access. + * This pointer points to the rkbuf payload. + * Note: can't perform struct copy here due to const fields (MSVC) */ + rkm->rkm_u.consumer.binhdrs.len = hdr.Headers.len; + rkm->rkm_u.consumer.binhdrs.data = hdr.Headers.data; + + /* Set timestamp. + * + * When broker assigns the timestamps (LOG_APPEND_TIME) it will + * assign the same timestamp for all messages in a MessageSet + * using MaxTimestamp.
+ */ + if ((msetr->msetr_v2_hdr->Attributes & + RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME) || + (hdr.MsgAttributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)) { + rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME; + rkm->rkm_timestamp = msetr->msetr_v2_hdr->MaxTimestamp; + } else { + rkm->rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME; + rkm->rkm_timestamp = + msetr->msetr_v2_hdr->BaseTimestamp + hdr.TimestampDelta; + } + + + /* Enqueue message on temporary queue */ + rd_kafka_q_enq(&msetr->msetr_rkq, rko); + msetr->msetr_msgcnt++; + msetr->msetr_msg_bytes += rkm->rkm_key_len + rkm->rkm_len; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + /* Count all parse errors as partial message errors. */ + rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1); + return rkbuf->rkbuf_err; +} + + +/** + * @brief Read v2 messages from current buffer position. + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_msgs_v2(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + /* Only log decoding errors if protocol debugging enabled. */ + int log_decode_errors = + (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_PROTOCOL) + ? LOG_DEBUG + : 0; + + if (msetr->msetr_aborted_txns != NULL && + (msetr->msetr_v2_hdr->Attributes & + (RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL | + RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) == + RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL) { + /* Transactional non-control MessageSet: + * check if it is part of an aborted transaction. */ + int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset( + msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID); + + if (txn_start_offset != -1 && + msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) { + /* MessageSet is part of aborted transaction */ + rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG", + "%s [%" PRId32 + "]: " + "Skipping %" PRId32 + " message(s) " + "in aborted transaction " + "at offset %" PRId64 " for PID %" PRId64, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + msetr->msetr_v2_hdr->RecordCount, + txn_start_offset, msetr->msetr_v2_hdr->PID); + rd_kafka_buf_skip( + msetr->msetr_rkbuf, + rd_slice_remains( + &msetr->msetr_rkbuf->rkbuf_reader)); + msetr->msetr_aborted_cnt++; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + while (rd_kafka_buf_read_remain(msetr->msetr_rkbuf)) { + rd_kafka_resp_err_t err; + err = rd_kafka_msgset_reader_msg_v2(msetr); + if (unlikely(err)) + return err; + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + /* Count all parse errors as partial message errors. */ + rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1); + msetr->msetr_v2_hdr = NULL; + return rkbuf->rkbuf_err; +} + + + +/** + * @brief MessageSet reader for MsgVersion v2 (FetchRequest v4) + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_v2(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + struct msgset_v2_hdr hdr; + rd_slice_t save_slice; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + size_t len_start; + size_t payload_size; + int64_t LastOffset; /* Last absolute Offset in MessageSet header */ + /* Only log decoding errors if protocol debugging enabled. */ + int log_decode_errors = + (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_PROTOCOL) + ? 
LOG_DEBUG + : 0; + + rd_kafka_buf_read_i64(rkbuf, &hdr.BaseOffset); + rd_kafka_buf_read_i32(rkbuf, &hdr.Length); + len_start = rd_slice_offset(&rkbuf->rkbuf_reader); + + if (unlikely(hdr.Length < RD_KAFKAP_MSGSET_V2_SIZE - 8 - 4)) + rd_kafka_buf_parse_fail(rkbuf, + "%s [%" PRId32 + "] " + "MessageSet at offset %" PRId64 + " length %" PRId32 " < header size %d", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, hdr.BaseOffset, + hdr.Length, + RD_KAFKAP_MSGSET_V2_SIZE - 8 - 4); + + rd_kafka_buf_read_i32(rkbuf, &hdr.PartitionLeaderEpoch); + msetr->msetr_leader_epoch = hdr.PartitionLeaderEpoch; + + rd_kafka_buf_read_i8(rkbuf, &hdr.MagicByte); + rd_kafka_buf_read_i32(rkbuf, &hdr.Crc); + + if (msetr->msetr_rkb->rkb_rk->rk_conf.check_crcs) { + /* Verify CRC32C if desired. */ + uint32_t calc_crc; + rd_slice_t crc_slice; + size_t crc_len = hdr.Length - 4 - 1 - 4; + + if (!rd_slice_narrow_copy_relative(&rkbuf->rkbuf_reader, + &crc_slice, crc_len)) + rd_kafka_buf_check_len(rkbuf, crc_len); + + calc_crc = rd_slice_crc32c(&crc_slice); + + if (unlikely((uint32_t)hdr.Crc != calc_crc)) { + /* Propagate CRC error to application and + * continue with next message. */ + rd_kafka_consumer_err( + &msetr->msetr_rkq, msetr->msetr_broker_id, + RD_KAFKA_RESP_ERR__BAD_MSG, + msetr->msetr_tver->version, NULL, rktp, + hdr.BaseOffset, + "MessageSet at offset %" PRId64 " (%" PRId32 + " bytes) " + "failed CRC32C check " + "(original 0x%" PRIx32 + " != " + "calculated 0x%" PRIx32 ")", + hdr.BaseOffset, hdr.Length, hdr.Crc, calc_crc); + rd_kafka_buf_skip_to(rkbuf, crc_len); + rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_err, 1); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + } + + rd_kafka_buf_read_i16(rkbuf, &hdr.Attributes); + rd_kafka_buf_read_i32(rkbuf, &hdr.LastOffsetDelta); + LastOffset = hdr.BaseOffset + hdr.LastOffsetDelta; + rd_kafka_buf_read_i64(rkbuf, &hdr.BaseTimestamp); + rd_kafka_buf_read_i64(rkbuf, &hdr.MaxTimestamp); + rd_kafka_buf_read_i64(rkbuf, &hdr.PID); + rd_kafka_buf_read_i16(rkbuf, &hdr.ProducerEpoch); + rd_kafka_buf_read_i32(rkbuf, &hdr.BaseSequence); + rd_kafka_buf_read_i32(rkbuf, &hdr.RecordCount); + + /* Payload size is hdr.Length - MessageSet headers */ + payload_size = + hdr.Length - (rd_slice_offset(&rkbuf->rkbuf_reader) - len_start); + + if (unlikely(payload_size > rd_kafka_buf_read_remain(rkbuf))) + rd_kafka_buf_underflow_fail( + rkbuf, payload_size, + "%s [%" PRId32 + "] " + "MessageSet at offset %" PRId64 " payload size %" PRIusz, + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + hdr.BaseOffset, payload_size); + + /* If entire MessageSet contains old outdated offsets, skip it. */ + if (LastOffset < rktp->rktp_offsets.fetch_pos.offset) { + rd_kafka_buf_skip(rkbuf, payload_size); + goto done; + } + + if (hdr.Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL) + msetr->msetr_ctrl_cnt++; + + msetr->msetr_v2_hdr = &hdr; + + /* Handle compressed MessageSet */ + if (hdr.Attributes & RD_KAFKA_MSG_ATTR_COMPRESSION_MASK) { + const void *compressed; + + compressed = + rd_slice_ensure_contig(&rkbuf->rkbuf_reader, payload_size); + rd_assert(compressed); + + err = rd_kafka_msgset_reader_decompress( + msetr, 2 /*MsgVersion v2*/, hdr.Attributes, + hdr.BaseTimestamp, hdr.BaseOffset, compressed, + payload_size); + if (err) + goto err; + + } else { + /* Read uncompressed messages */ + + /* Save original slice, reduce size of the current one to + * be limited by the MessageSet.Length, and then start reading + * messages until the lesser slice is exhausted. 
*/ + if (!rd_slice_narrow_relative(&rkbuf->rkbuf_reader, &save_slice, + payload_size)) + rd_kafka_buf_check_len(rkbuf, payload_size); + + /* Read messages */ + err = rd_kafka_msgset_reader_msgs_v2(msetr); + + /* Restore wider slice */ + rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); + + if (unlikely(err)) + goto err; + } + + +done: + /* Set the next fetch offset to the MessageSet header's last offset + 1 + * to avoid getting stuck on compacted MessageSets where the last + * Message in the MessageSet has an Offset < MessageSet header's + * last offset. See KAFKA-5443 */ + msetr->msetr_next_offset = LastOffset + 1; + + msetr->msetr_v2_hdr = NULL; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + /* Count all parse errors as partial message errors. */ + rd_atomic64_add(&msetr->msetr_rkb->rkb_c.rx_partial, 1); + err = rkbuf->rkbuf_err; + /* FALLTHRU */ +err: + msetr->msetr_v2_hdr = NULL; + return err; +} + + +/** + * @brief Peek into the next MessageSet to find the MsgVersion. + * + * @param MagicBytep the MsgVersion is returned here on success. + * + * @returns an error on read underflow or if the MsgVersion is + * unsupported. + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_peek_msg_version(rd_kafka_msgset_reader_t *msetr, + int8_t *MagicBytep) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + /* Only log decoding errors if protocol debugging enabled. */ + int log_decode_errors = + (rkbuf->rkbuf_rkb->rkb_rk->rk_conf.debug & RD_KAFKA_DBG_PROTOCOL) + ? LOG_DEBUG + : 0; + size_t read_offset = rd_slice_offset(&rkbuf->rkbuf_reader); + + rd_kafka_buf_peek_i8(rkbuf, read_offset + 8 + 4 + 4, MagicBytep); + + if (unlikely(*MagicBytep < 0 || *MagicBytep > 2)) { + int64_t Offset; /* For error logging */ + int32_t Length; + + rd_kafka_buf_read_i64(rkbuf, &Offset); + + rd_rkb_dbg(msetr->msetr_rkb, + MSG | RD_KAFKA_DBG_PROTOCOL | RD_KAFKA_DBG_FETCH, + "MAGICBYTE", + "%s [%" PRId32 + "]: " + "Unsupported Message(Set) MagicByte %d at " + "offset %" PRId64 + " " + "(buffer position %" PRIusz "/%" PRIusz + "): skipping", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + (int)*MagicBytep, Offset, read_offset, + rd_slice_size(&rkbuf->rkbuf_reader)); + + if (Offset >= + msetr->msetr_rktp->rktp_offsets.fetch_pos.offset) { + rd_kafka_consumer_err( + &msetr->msetr_rkq, msetr->msetr_broker_id, + RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED, + msetr->msetr_tver->version, NULL, rktp, Offset, + "Unsupported Message(Set) MagicByte %d " + "at offset %" PRId64, + (int)*MagicBytep, Offset); + /* Skip message(set) */ + msetr->msetr_rktp->rktp_offsets.fetch_pos.offset = + Offset + 1; + } + + /* Skip this Message(Set). + * If the message is malformed, the skip may trigger err_parse + * and return ERR__BAD_MSG. */ + rd_kafka_buf_read_i32(rkbuf, &Length); + rd_kafka_buf_skip(rkbuf, Length); + + return RD_KAFKA_RESP_ERR__NOT_IMPLEMENTED; + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + return RD_KAFKA_RESP_ERR__BAD_MSG; +} + + +/** + * @brief Parse and read messages from msgset reader buffer. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_buf_t *rkbuf = msetr->msetr_rkbuf; + rd_kafka_resp_err_t (*reader[])(rd_kafka_msgset_reader_t *) = { + /* Indexed by MsgVersion/MagicByte, pointing to + * a Msg(Set)Version reader */ + [0] = rd_kafka_msgset_reader_msg_v0_1, + [1] = rd_kafka_msgset_reader_msg_v0_1, + [2] = rd_kafka_msgset_reader_v2}; + rd_kafka_resp_err_t err; + + /* Parse MessageSets until the slice is exhausted or an + * error occurs (typically a partial message). */ + do { + int8_t MagicByte; + + /* We dont know the MsgVersion at this point, peek where the + * MagicByte resides both in MsgVersion v0..1 and v2 to + * know which MessageSet reader to use. */ + err = + rd_kafka_msgset_reader_peek_msg_version(msetr, &MagicByte); + if (unlikely(err)) { + if (err == RD_KAFKA_RESP_ERR__BAD_MSG) + /* Read underflow, not an error. + * Broker may return a partial Fetch response + * due to its use of sendfile(2). */ + return RD_KAFKA_RESP_ERR_NO_ERROR; + + /* Continue on unsupported MsgVersions, the + * MessageSet will be skipped. */ + continue; + } + + /* Use MsgVersion-specific reader */ + err = reader[(int)MagicByte](msetr); + + } while (!err && rd_slice_remains(&rkbuf->rkbuf_reader) > 0); + + return err; +} + + + +/** + * @brief MessageSet post-processing. + * + * @param last_offsetp will be set to the offset of the last message in the set, + * or -1 if not applicable. + */ +static void rd_kafka_msgset_reader_postproc(rd_kafka_msgset_reader_t *msetr, + int64_t *last_offsetp) { + rd_kafka_op_t *rko; + + rko = rd_kafka_q_last(&msetr->msetr_rkq, RD_KAFKA_OP_FETCH, + 0 /* no error ops */); + if (rko) { + *last_offsetp = rko->rko_u.fetch.rkm.rkm_offset; + + if (*last_offsetp != -1 && msetr->msetr_relative_offsets) { + /* Update messages to absolute offsets + * and purge any messages older than the current + * fetch offset. */ + rd_kafka_q_fix_offsets( + &msetr->msetr_rkq, + msetr->msetr_rktp->rktp_offsets.fetch_pos.offset, + msetr->msetr_outer.offset - *last_offsetp); + } + } +} + + + +/** + * @brief Run the MessageSet reader, read messages until buffer is + * exhausted (or error encountered), enqueue parsed messages on + * partition queue. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if MessageSet was successfully + * or partially parsed. When other error codes are returned it + * indicates a semi-permanent error (such as unsupported MsgVersion) + * and the fetcher should back off this partition to avoid + * busy-looping. + */ +static rd_kafka_resp_err_t +rd_kafka_msgset_reader_run(rd_kafka_msgset_reader_t *msetr) { + rd_kafka_toppar_t *rktp = msetr->msetr_rktp; + rd_kafka_resp_err_t err; + int64_t last_offset = -1; + + /* Parse MessageSets and messages */ + err = rd_kafka_msgset_reader(msetr); + + if (unlikely(rd_kafka_q_len(&msetr->msetr_rkq) == 0)) { + /* The message set didn't contain at least one full message + * or no error was posted on the response queue. + * This means the size limit perhaps was too tight, + * increase it automatically. + * If there was at least one control message there + * is probably not a size limit and nothing is done. + * If there were aborted messagesets and no underflow then + * there is no error either (#2993). + * + * Also; avoid propagating underflow errors, which cause + * backoffs, since we'll want to continue fetching the + * remaining truncated messages as soon as possible. 
*/ + if (msetr->msetr_ctrl_cnt > 0) { + /* Noop */ + if (err == RD_KAFKA_RESP_ERR__UNDERFLOW) + err = RD_KAFKA_RESP_ERR_NO_ERROR; + + } else if (rktp->rktp_fetch_msg_max_bytes < (1 << 30)) { + rktp->rktp_fetch_msg_max_bytes *= 2; + rd_rkb_dbg(msetr->msetr_rkb, FETCH, "CONSUME", + "Topic %s [%" PRId32 + "]: Increasing " + "max fetch bytes to %" PRId32, + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + rktp->rktp_fetch_msg_max_bytes); + + if (err == RD_KAFKA_RESP_ERR__UNDERFLOW) + err = RD_KAFKA_RESP_ERR_NO_ERROR; + + } else if (!err && msetr->msetr_aborted_cnt == 0) { + rd_kafka_consumer_err( + &msetr->msetr_rkq, msetr->msetr_broker_id, + RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE, + msetr->msetr_tver->version, NULL, rktp, + rktp->rktp_offsets.fetch_pos.offset, + "Message at offset %" PRId64 + " might be too large to fetch, try increasing " + "receive.message.max.bytes", + rktp->rktp_offsets.fetch_pos.offset); + + } else if (msetr->msetr_aborted_cnt > 0) { + /* Noop */ + if (err == RD_KAFKA_RESP_ERR__UNDERFLOW) + err = RD_KAFKA_RESP_ERR_NO_ERROR; + } + + } else { + /* MessageSet post-processing. */ + rd_kafka_msgset_reader_postproc(msetr, &last_offset); + + /* Ignore parse errors if there was at least one + * good message since it probably indicates a + * partial response rather than an erroneous one. */ + if (err == RD_KAFKA_RESP_ERR__UNDERFLOW && + msetr->msetr_msgcnt > 0) + err = RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_rkb_dbg(msetr->msetr_rkb, MSG | RD_KAFKA_DBG_FETCH, "CONSUME", + "Enqueue %i %smessage(s) (%" PRId64 + " bytes, %d ops) on %s [%" PRId32 + "] fetch queue (qlen %d, v%d, last_offset %" PRId64 + ", %d ctrl msgs, %d aborted msgsets, %s)", + msetr->msetr_msgcnt, msetr->msetr_srcname, + msetr->msetr_msg_bytes, rd_kafka_q_len(&msetr->msetr_rkq), + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_q_len(msetr->msetr_par_rkq), + msetr->msetr_tver->version, last_offset, + msetr->msetr_ctrl_cnt, msetr->msetr_aborted_cnt, + msetr->msetr_compression + ? rd_kafka_compression2str(msetr->msetr_compression) + : "uncompressed"); + + /* Concat all messages & errors onto the parent's queue + * (the partition's fetch queue) */ + if (rd_kafka_q_concat(msetr->msetr_par_rkq, &msetr->msetr_rkq) != -1) { + /* Update partition's fetch offset based on + * last message's offset. */ + if (likely(last_offset != -1)) + rktp->rktp_offsets.fetch_pos.offset = last_offset + 1; + } + + /* Adjust next fetch offset if outlier code has indicated + * an even later next offset. */ + if (msetr->msetr_next_offset > rktp->rktp_offsets.fetch_pos.offset) + rktp->rktp_offsets.fetch_pos.offset = msetr->msetr_next_offset; + + rktp->rktp_offsets.fetch_pos.leader_epoch = msetr->msetr_leader_epoch; + + rd_kafka_q_destroy_owner(&msetr->msetr_rkq); + + /* Skip remaining part of slice so caller can continue + * with next partition. */ + rd_slice_read(&msetr->msetr_rkbuf->rkbuf_reader, NULL, + rd_slice_remains(&msetr->msetr_rkbuf->rkbuf_reader)); + return err; +} + + + +/** + * @brief Parse one MessageSet at the current buffer read position, + * enqueueing messages, propagating errors, etc.
+ * @remark The current rkbuf_reader slice must be limited to the MessageSet size + * + * @returns see rd_kafka_msgset_reader_run() + */ +rd_kafka_resp_err_t +rd_kafka_msgset_parse(rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + rd_kafka_toppar_t *rktp, + rd_kafka_aborted_txns_t *aborted_txns, + const struct rd_kafka_toppar_ver *tver) { + rd_kafka_msgset_reader_t msetr; + rd_kafka_resp_err_t err; + + rd_kafka_msgset_reader_init(&msetr, rkbuf, rktp, tver, aborted_txns, + rktp->rktp_fetchq); + + /* Parse and handle the message set */ + err = rd_kafka_msgset_reader_run(&msetr); + + rd_atomic64_add(&rktp->rktp_c.rx_msgs, msetr.msetr_msgcnt); + rd_atomic64_add(&rktp->rktp_c.rx_msg_bytes, msetr.msetr_msg_bytes); + + rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, + (int64_t)msetr.msetr_msgcnt); + rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, + (int64_t)msetr.msetr_msg_bytes); + + return err; +} + + +/** + * @brief Offset comparator + */ +static int rd_kafka_offset_cmp(const void *_a, const void *_b) { + const int64_t *a = _a, *b = _b; + return (*a > *b) - (*a < *b); +} + + +/** + * @brief Pid comparator for rd_kafka_aborted_txn_start_offsets_t + */ +static int rd_kafka_aborted_txn_cmp_by_pid(const void *_a, const void *_b) { + const rd_kafka_aborted_txn_start_offsets_t *a = _a, *b = _b; + return (a->pid > b->pid) - (a->pid < b->pid); +} + + +/** + * @brief Free resources associated with an AVL tree node. + */ +static void rd_kafka_aborted_txn_node_destroy(void *_node_ptr) { + rd_kafka_aborted_txn_start_offsets_t *node_ptr = _node_ptr; + rd_list_destroy(&node_ptr->offsets); + rd_free(node_ptr); +} + + +/** + * @brief Allocate memory for, and initialize a new + * rd_kafka_aborted_txns_t struct. + */ +rd_kafka_aborted_txns_t *rd_kafka_aborted_txns_new(int32_t txn_cnt) { + rd_kafka_aborted_txns_t *aborted_txns; + aborted_txns = rd_malloc(sizeof(*aborted_txns)); + rd_avl_init(&aborted_txns->avl, rd_kafka_aborted_txn_cmp_by_pid, 0); + rd_list_init(&aborted_txns->list, txn_cnt, + rd_kafka_aborted_txn_node_destroy); + aborted_txns->cnt = txn_cnt; + return aborted_txns; +} + + +/** + * @brief Free all resources associated with a + * rd_kafka_aborted_txns_t struct. + */ +void rd_kafka_aborted_txns_destroy(rd_kafka_aborted_txns_t *aborted_txns) { + rd_list_destroy(&aborted_txns->list); + rd_avl_destroy(&aborted_txns->avl); + rd_free(aborted_txns); +} + + +/** + * @brief Get the abort txn start offsets corresponding to + * the specified pid. + */ +static RD_INLINE rd_kafka_aborted_txn_start_offsets_t * +rd_kafka_aborted_txns_offsets_for_pid(rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid) { + rd_kafka_aborted_txn_start_offsets_t node; + node.pid = pid; + return RD_AVL_FIND(&aborted_txns->avl, &node); +} + + +/** + * @brief Get the next aborted transaction start + * offset for the specified pid. + * + * @param increment_idx if true, the offset index will be incremented. + * @param max_offset If the next aborted offset is greater than \p max_offset + * then the index is not incremented (regardless of + * \p increment_idx) and the function returns -1. + * This may be the case for empty aborted transactions + * that have an ABORT marker but are not listed in the + * AbortedTxns list. + * + * + * @returns the start offset or -1 if there is none. 
+ */ +static int64_t +rd_kafka_aborted_txns_next_offset(rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid, + rd_bool_t increment_idx, + int64_t max_offset) { + int64_t abort_start_offset; + rd_kafka_aborted_txn_start_offsets_t *node_ptr = + rd_kafka_aborted_txns_offsets_for_pid(aborted_txns, pid); + + if (node_ptr == NULL) + return -1; + + if (unlikely(node_ptr->offsets_idx >= rd_list_cnt(&node_ptr->offsets))) + return -1; + + abort_start_offset = *( + (int64_t *)rd_list_elem(&node_ptr->offsets, node_ptr->offsets_idx)); + + if (unlikely(abort_start_offset > max_offset)) + return -1; + + if (increment_idx) + node_ptr->offsets_idx++; + + return abort_start_offset; +} + + +/** + * @brief Get the next aborted transaction start + * offset for the specified pid and progress the + * current index to the next one. + * + * @param max_offset If the next aborted offset is greater than \p max_offset + * then no offset is popped and the function returns -1. + * This may be the case for empty aborted transactions + * that have an ABORT marker but are not listed in the + * AbortedTxns list. + * + * @returns the start offset or -1 if there is none. + */ +static RD_INLINE int64_t +rd_kafka_aborted_txns_pop_offset(rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid, + int64_t max_offset) { + return rd_kafka_aborted_txns_next_offset(aborted_txns, pid, rd_true, + max_offset); +} + + +/** + * @brief Get the next aborted transaction start + * offset for the specified pid. + * + * @returns the start offset or -1 if there is none. + */ +static RD_INLINE int64_t +rd_kafka_aborted_txns_get_offset(const rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid) { + return rd_kafka_aborted_txns_next_offset( + (rd_kafka_aborted_txns_t *)aborted_txns, pid, rd_false, INT64_MAX); +} + + +/** + * @brief Add a transaction start offset corresponding + * to the specified pid to the aborted_txns collection. + */ +void rd_kafka_aborted_txns_add(rd_kafka_aborted_txns_t *aborted_txns, + int64_t pid, + int64_t first_offset) { + int64_t *v; + rd_kafka_aborted_txn_start_offsets_t *node_ptr = + rd_kafka_aborted_txns_offsets_for_pid(aborted_txns, pid); + + if (!node_ptr) { + node_ptr = rd_malloc(sizeof(*node_ptr)); + node_ptr->pid = pid; + node_ptr->offsets_idx = 0; + rd_list_init(&node_ptr->offsets, 0, NULL); + /* Each PID list has no more than AbortedTxnCnt elements */ + rd_list_prealloc_elems(&node_ptr->offsets, sizeof(int64_t), + aborted_txns->cnt, 0); + RD_AVL_INSERT(&aborted_txns->avl, node_ptr, avl_node); + rd_list_add(&aborted_txns->list, node_ptr); + } + + v = rd_list_add(&node_ptr->offsets, NULL); + *v = first_offset; +} + + +/** + * @brief Sort each of the abort transaction start + * offset lists for each pid. 
+ */ +void rd_kafka_aborted_txns_sort(rd_kafka_aborted_txns_t *aborted_txns) { + int k; + for (k = 0; k < rd_list_cnt(&aborted_txns->list); k++) { + rd_kafka_aborted_txn_start_offsets_t *el = + rd_list_elem(&aborted_txns->list, k); + rd_list_sort(&el->offsets, rd_kafka_offset_cmp); + } +} + + +/** + * @brief Unit tests for all functions that operate on + * rd_kafka_aborted_txns_t + */ +int unittest_aborted_txns(void) { + rd_kafka_aborted_txns_t *aborted_txns = NULL; + int64_t start_offset; + + aborted_txns = rd_kafka_aborted_txns_new(7); + rd_kafka_aborted_txns_add(aborted_txns, 1, 42); + rd_kafka_aborted_txns_add(aborted_txns, 1, 44); + rd_kafka_aborted_txns_add(aborted_txns, 1, 10); + rd_kafka_aborted_txns_add(aborted_txns, 1, 100); + rd_kafka_aborted_txns_add(aborted_txns, 2, 11); + rd_kafka_aborted_txns_add(aborted_txns, 2, 7); + rd_kafka_aborted_txns_add(aborted_txns, 1, 3); + rd_kafka_aborted_txns_sort(aborted_txns); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(3 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 3", + start_offset); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(3 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 3", + start_offset); + + start_offset = + rd_kafka_aborted_txns_pop_offset(aborted_txns, 1, INT64_MAX); + RD_UT_ASSERT(3 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 3", + start_offset); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(10 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 10", + start_offset); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 2); + RD_UT_ASSERT(7 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 7", + start_offset); + + rd_kafka_aborted_txns_pop_offset(aborted_txns, 1, INT64_MAX); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(42 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 42", + start_offset); + + rd_kafka_aborted_txns_pop_offset(aborted_txns, 1, INT64_MAX); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(44 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 44", + start_offset); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 2); + RD_UT_ASSERT(7 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 7", + start_offset); + + rd_kafka_aborted_txns_pop_offset(aborted_txns, 2, INT64_MAX); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 2); + RD_UT_ASSERT(11 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected 11", + start_offset); + + /* error cases */ + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 3); + RD_UT_ASSERT(-1 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected -1", + start_offset); + + rd_kafka_aborted_txns_pop_offset(aborted_txns, 1, INT64_MAX); + rd_kafka_aborted_txns_pop_offset(aborted_txns, 1, INT64_MAX); + rd_kafka_aborted_txns_pop_offset(aborted_txns, 2, INT64_MAX); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 1); + RD_UT_ASSERT(-1 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected -1", + start_offset); + + start_offset = rd_kafka_aborted_txns_get_offset(aborted_txns, 2); + RD_UT_ASSERT(-1 == start_offset, + "queried start offset was %" PRId64 + ", " + "expected -1", + 
start_offset); + + rd_kafka_aborted_txns_destroy(aborted_txns); + + RD_UT_PASS(); +}
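
The Snappy branch of rd_kafka_msgset_reader_decompress() recognizes snappy-java's SnappyCodec framing by its 8-byte magic and 16-byte header, after which the payload is a sequence of length-prefixed chunks of raw Snappy data. A minimal standalone sketch of that detection and chunk walk (hypothetical helper names; big-endian chunk lengths assumed, as the Java codec writes them):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdint.h>
    #include <string.h>

    #define SNAPPY_JAVA_HDRLEN (8 + 4 + 4) /* magic + version + compatible */

    static const unsigned char snappy_java_magic[8] = {0x82, 'S', 'N', 'A',
                                                       'P',  'P', 'Y', 0};

    /* Same test as the likely() branch in the decompressor above. */
    static bool is_snappy_java_framed(const unsigned char *buf, size_t len) {
            return len > SNAPPY_JAVA_HDRLEN + 4 &&
                   !memcmp(buf, snappy_java_magic, sizeof(snappy_java_magic));
    }

    /* Walk the <int32 length><snappy block> chunks following the header. */
    static void snappy_java_for_each_chunk(
        const unsigned char *buf, size_t len,
        void (*cb)(const unsigned char *chunk, uint32_t chunk_len)) {
            size_t of = SNAPPY_JAVA_HDRLEN;

            while (of + 4 <= len) {
                    uint32_t clen = ((uint32_t)buf[of] << 24) |
                                    ((uint32_t)buf[of + 1] << 16) |
                                    ((uint32_t)buf[of + 2] << 8) |
                                    (uint32_t)buf[of + 3];
                    of += 4;
                    if (clen > len - of)
                            break; /* truncated chunk: stop silently */
                    cb(buf + of, clen);
                    of += clen;
            }
    }

In librdkafka itself the chunk iteration is internal to rd_kafka_snappy_java_uncompress(); the sketch only makes the framing explicit.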
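
For MsgVersion 1 compressed MessageSets (KIP-31) the inner messages carry relative offsets 0..N-1 while the wrapper Message's Offset is that of the last inner message, which is why rd_kafka_msgset_reader_postproc() shifts the queued messages by msetr_outer.offset - last_offset. A worked example with made-up numbers:

    #include <inttypes.h>
    #include <stdio.h>

    int main(void) {
            int64_t outer_offset = 105;                /* wrapper Message Offset */
            int64_t inner[]      = {0, 1, 2, 3, 4, 5}; /* relative inner offsets */
            int64_t shift        = outer_offset - inner[5]; /* 105 - 5 = 100 */
            int i;

            for (i = 0; i < 6; i++) /* prints absolute offsets 100..105 */
                    printf("%" PRId64 "\n", inner[i] + shift);
            return 0;
    }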
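
rd_kafka_buf_read_varint(), used for the v2 record Length, TimestampDelta, OffsetDelta and the Key/Value/Headers sizes, reads a protobuf-style zigzag varint. A self-contained sketch of that decoding (hypothetical helpers, not librdkafka's rdvarint API):

    #include <inttypes.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Base-128 unsigned varint; returns bytes consumed, 0 on underflow. */
    static size_t uvarint_decode(const uint8_t *buf, size_t len, uint64_t *out) {
            uint64_t v = 0;
            int shift;
            size_t i = 0;

            for (shift = 0; i < len && shift <= 63; shift += 7, i++) {
                    v |= (uint64_t)(buf[i] & 0x7f) << shift;
                    if (!(buf[i] & 0x80)) {
                            *out = v;
                            return i + 1;
                    }
            }
            return 0; /* truncated (or overlong) varint */
    }

    /* Signed values are zigzag-encoded: 0,-1,1,-2,... -> 0,1,2,3,... */
    static size_t varint_decode(const uint8_t *buf, size_t len, int64_t *out) {
            uint64_t u;
            size_t r = uvarint_decode(buf, len, &u);

            if (r)
                    *out = (int64_t)(u >> 1) ^ -(int64_t)(u & 1);
            return r;
    }

    int main(void) {
            const uint8_t buf[] = {0x05}; /* zigzag(-3) == 5 */
            int64_t v;

            if (varint_decode(buf, sizeof(buf), &v))
                    printf("%" PRId64 "\n", v); /* prints -3 */
            return 0;
    }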
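
A condensed restatement of how rd_kafka_msgset_reader_msg_v2() turns per-record deltas into absolute values (field names as in struct msgset_v2_hdr above): the record offset is BaseOffset plus OffsetDelta, and when LOG_APPEND_TIME is set (the reader checks the bit in both the batch Attributes and the per-record MsgAttributes) every record receives the header's broker-assigned MaxTimestamp instead of BaseTimestamp plus its delta:

    int64_t offset = v2_hdr->BaseOffset + OffsetDelta;
    int64_t timestamp;

    if (v2_hdr->Attributes & RD_KAFKA_MSG_ATTR_LOG_APPEND_TIME)
            timestamp = v2_hdr->MaxTimestamp; /* batch-wide broker timestamp */
    else
            timestamp = v2_hdr->BaseTimestamp + TimestampDelta; /* CreateTime */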
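
The v2 Crc is a CRC32C (Castagnoli), hence rd_slice_crc32c() in rd_kafka_msgset_reader_v2(), and it covers everything from Attributes to the end of the batch. Since the header's Length field counts the bytes that follow it, the checked span is Length minus the three fields that sit between Length and Attributes:

    /* PartitionLeaderEpoch (4) + MagicByte (1) + Crc (4) are excluded: */
    size_t crc_len = hdr.Length - 4 - 1 - 4;
    /* e.g. a batch with Length == 61 is checksummed over its last 52 bytes */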
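
Control batches (RD_KAFKA_MSGSET_V2_ATTR_CONTROL) carry their meaning in the record key, which is why the parser insists on KeySize == 4 once Version is known to be 0: the key is two big-endian int16s, Version then Type. A standalone decoding sketch (the 0 = abort, 1 = commit values follow the Kafka protocol's ControlRecordType):

    #include <stdint.h>

    struct ctrl_key {
            int16_t version; /* only version 0 is understood by the reader */
            int16_t type;    /* 0 = abort marker, 1 = commit marker */
    };

    static struct ctrl_key ctrl_key_parse(const uint8_t k[4]) {
            struct ctrl_key ck;

            ck.version = (int16_t)(((uint16_t)k[0] << 8) | k[1]);
            ck.type    = (int16_t)(((uint16_t)k[2] << 8) | k[3]);
            return ck;
    }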
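
Putting the aborted-transaction bookkeeping together: under isolation.level=read_committed a transactional, non-control batch is dropped wholesale once its BaseOffset reaches the PID's next aborted-transaction start offset, and the ABORT control record that ends the transaction pops that offset from the PID's sorted list. A condensed restatement of the two call sites in this file (variable names illustrative):

    /* In rd_kafka_msgset_reader_msgs_v2(), on a transactional data batch: */
    int64_t txn_start =
        rd_kafka_aborted_txns_get_offset(aborted_txns, v2_hdr->PID);
    if (txn_start != -1 && v2_hdr->BaseOffset >= txn_start) {
            /* entire batch (RecordCount records) was aborted: skip it */
    }

    /* In rd_kafka_msgset_reader_msg_v2(), on the matching ABORT marker: */
    rd_kafka_aborted_txns_pop_offset(aborted_txns, v2_hdr->PID, ctrl_offset);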
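
Finally, rd_kafka_msgset_reader_peek_msg_version() can locate MagicByte with a single peek at read_offset + 8 + 4 + 4 because both wire formats place it 16 bytes in:

    #include <stdint.h>

    /* v0/v1 MessageSet:  Offset (8) + MessageSize (4) + Crc (4)  -> MagicByte
     * v2 RecordBatch: BaseOffset (8) + Length (4)
     *                                + PartitionLeaderEpoch (4)  -> MagicByte */
    static int8_t peek_magic_byte(const uint8_t *msgset_start) {
            return (int8_t)msgset_start[8 + 4 + 4]; /* byte 16 in both layouts */
    }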