diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_fetcher.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_fetcher.c | 1145 |
1 files changed, 1145 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_fetcher.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_fetcher.c new file mode 100644 index 00000000..8ee67a42 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_fetcher.c @@ -0,0 +1,1145 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2022 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +/** + * @name Fetcher + * + */ + +#include "rdkafka_int.h" +#include "rdkafka_offset.h" +#include "rdkafka_msgset.h" +#include "rdkafka_fetcher.h" + + +/** + * Backoff the next Fetch request (due to error). 
+ */ +static void rd_kafka_broker_fetch_backoff(rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err) { + int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms; + rkb->rkb_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000); + rd_rkb_dbg(rkb, FETCH, "BACKOFF", "Fetch backoff for %dms: %s", + backoff_ms, rd_kafka_err2str(err)); +} + +/** + * @brief Backoff the next Fetch for specific partition + */ +static void rd_kafka_toppar_fetch_backoff(rd_kafka_broker_t *rkb, + rd_kafka_toppar_t *rktp, + rd_kafka_resp_err_t err) { + int backoff_ms = rkb->rkb_rk->rk_conf.fetch_error_backoff_ms; + + /* Don't back off on reaching end of partition */ + if (err == RD_KAFKA_RESP_ERR__PARTITION_EOF) + return; + + /* Certain errors that may require manual intervention should have + * a longer backoff time. */ + if (err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) + backoff_ms = RD_MAX(1000, backoff_ms * 10); + + rktp->rktp_ts_fetch_backoff = rd_clock() + (backoff_ms * 1000); + + rd_rkb_dbg(rkb, FETCH, "BACKOFF", + "%s [%" PRId32 "]: Fetch backoff for %dms%s%s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + backoff_ms, err ? ": " : "", + err ? rd_kafka_err2str(err) : ""); +} + + +/** + * @brief Handle preferred replica in fetch response. + * + * @locks rd_kafka_toppar_lock(rktp) and + * rd_kafka_rdlock(rk) must NOT be held. 
+ * + * @locality broker thread + */ +static void rd_kafka_fetch_preferred_replica_handle(rd_kafka_toppar_t *rktp, + rd_kafka_buf_t *rkbuf, + rd_kafka_broker_t *rkb, + int32_t preferred_id) { + const rd_ts_t one_minute = 60 * 1000 * 1000; + const rd_ts_t five_seconds = 5 * 1000 * 1000; + rd_kafka_broker_t *preferred_rkb; + rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; + rd_ts_t new_intvl = + rd_interval_immediate(&rktp->rktp_new_lease_intvl, one_minute, 0); + + if (new_intvl < 0) { + /* In lieu of KIP-320, the toppar is delegated back to + * the leader in the event of an offset out-of-range + * error (KIP-392 error case #4) because this scenario + * implies the preferred replica is out-of-sync. + * + * If program execution reaches here, the leader has + * relatively quickly instructed the client back to + * a preferred replica, quite possibly the same one + * as before (possibly resulting from stale metadata), + * so we back off the toppar to slow down potential + * back-and-forth. + */ + + if (rd_interval_immediate(&rktp->rktp_new_lease_log_intvl, + one_minute, 0) > 0) + rd_rkb_log(rkb, LOG_NOTICE, "FETCH", + "%.*s [%" PRId32 + "]: preferred replica " + "(%" PRId32 + ") lease changing too quickly " + "(%" PRId64 + "s < 60s): possibly due to " + "unavailable replica or stale cluster " + "state: backing off next fetch", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, preferred_id, + (one_minute - -new_intvl) / (1000 * 1000)); + + rd_kafka_toppar_fetch_backoff(rkb, rktp, + RD_KAFKA_RESP_ERR_NO_ERROR); + } + + rd_kafka_rdlock(rk); + preferred_rkb = rd_kafka_broker_find_by_nodeid(rk, preferred_id); + rd_kafka_rdunlock(rk); + + if (preferred_rkb) { + rd_interval_reset_to_now(&rktp->rktp_lease_intvl, 0); + rd_kafka_toppar_lock(rktp); + rd_kafka_toppar_broker_update(rktp, preferred_id, preferred_rkb, + "preferred replica updated"); + rd_kafka_toppar_unlock(rktp); + rd_kafka_broker_destroy(preferred_rkb); + return; + } + + if 
(rd_interval_immediate(&rktp->rktp_metadata_intvl, five_seconds, 0) > + 0) { + rd_rkb_log(rkb, LOG_NOTICE, "FETCH", + "%.*s [%" PRId32 "]: preferred replica (%" PRId32 + ") " + "is unknown: refreshing metadata", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, preferred_id); + + rd_kafka_metadata_refresh_brokers( + rktp->rktp_rkt->rkt_rk, NULL, + "preferred replica unavailable"); + } + + rd_kafka_toppar_fetch_backoff(rkb, rktp, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE); +} + + +/** + * @brief Handle partition-specific Fetch error. + */ +static void rd_kafka_fetch_reply_handle_partition_error( + rd_kafka_broker_t *rkb, + rd_kafka_toppar_t *rktp, + const struct rd_kafka_toppar_ver *tver, + rd_kafka_resp_err_t err, + int64_t HighwaterMarkOffset) { + + rd_rkb_dbg(rkb, FETCH, "FETCHERR", + "%.*s [%" PRId32 "]: Fetch failed at %s: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos), + rd_kafka_err2name(err)); + + /* Some errors should be passed to the + * application while some handled by rdkafka */ + switch (err) { + /* Errors handled by rdkafka */ + case RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: + case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_LEADER_OR_FOLLOWER: + case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR: + case RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH: + case RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH: + if (err == RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE) { + /* Occurs when: + * - Msg exists on broker but + * offset > HWM, or: + * - HWM is >= offset, but msg not + * yet available at that offset + * (replica is out of sync). + * - partition leader is out of sync. + * + * Handle by requesting metadata update, changing back + * to the leader, and then retrying FETCH + * (with backoff). 
+ */ + rd_rkb_dbg(rkb, MSG, "FETCH", + "Topic %s [%" PRId32 + "]: %s not " + "available on broker %" PRId32 + " (leader %" PRId32 + "): updating metadata and retrying", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + rd_kafka_fetch_pos2str( + rktp->rktp_offsets.fetch_pos), + rktp->rktp_broker_id, rktp->rktp_leader_id); + } + + if (err == RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH) { + rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_CONSUMER, "FETCH", + "Topic %s [%" PRId32 + "]: Fetch failed at %s: %s: broker %" PRId32 + "has not yet caught up on latest metadata: " + "retrying", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + rd_kafka_fetch_pos2str( + rktp->rktp_offsets.fetch_pos), + rd_kafka_err2str(err), rktp->rktp_broker_id); + } + + if (rktp->rktp_broker_id != rktp->rktp_leader_id) { + rd_kafka_toppar_delegate_to_leader(rktp); + } + /* Request metadata information update*/ + rd_kafka_toppar_leader_unavailable(rktp, "fetch", err); + break; + + case RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE: { + rd_kafka_fetch_pos_t err_pos; + + if (rktp->rktp_broker_id != rktp->rktp_leader_id && + rktp->rktp_offsets.fetch_pos.offset > HighwaterMarkOffset) { + rd_kafka_log(rkb->rkb_rk, LOG_WARNING, "FETCH", + "Topic %s [%" PRId32 + "]: %s " + " out of range (HighwaterMark %" PRId64 + " fetching from " + "broker %" PRId32 " (leader %" PRId32 + "): reverting to leader", + rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition, + rd_kafka_fetch_pos2str( + rktp->rktp_offsets.fetch_pos), + HighwaterMarkOffset, rktp->rktp_broker_id, + rktp->rktp_leader_id); + + /* Out of range error cannot be taken as definitive + * when fetching from follower. + * Revert back to the leader in lieu of KIP-320. 
+ */ + rd_kafka_toppar_delegate_to_leader(rktp); + break; + } + + /* Application error */ + err_pos = rktp->rktp_offsets.fetch_pos; + rktp->rktp_offsets.fetch_pos.offset = RD_KAFKA_OFFSET_INVALID; + rktp->rktp_offsets.fetch_pos.leader_epoch = -1; + rd_kafka_offset_reset(rktp, rd_kafka_broker_id(rkb), err_pos, + err, + "fetch failed due to requested offset " + "not available on the broker"); + } break; + + case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED: + /* If we're not authorized to access the + * topic mark it as errored to deny + * further Fetch requests. */ + if (rktp->rktp_last_error != err) { + rd_kafka_consumer_err( + rktp->rktp_fetchq, rd_kafka_broker_id(rkb), err, + tver->version, NULL, rktp, + rktp->rktp_offsets.fetch_pos.offset, + "Fetch from broker %" PRId32 " failed: %s", + rd_kafka_broker_id(rkb), rd_kafka_err2str(err)); + rktp->rktp_last_error = err; + } + break; + + + /* Application errors */ + case RD_KAFKA_RESP_ERR__PARTITION_EOF: + if (rkb->rkb_rk->rk_conf.enable_partition_eof) + rd_kafka_consumer_err( + rktp->rktp_fetchq, rd_kafka_broker_id(rkb), err, + tver->version, NULL, rktp, + rktp->rktp_offsets.fetch_pos.offset, + "Fetch from broker %" PRId32 + " reached end of " + "partition at offset %" PRId64 + " (HighwaterMark %" PRId64 ")", + rd_kafka_broker_id(rkb), + rktp->rktp_offsets.fetch_pos.offset, + HighwaterMarkOffset); + break; + + case RD_KAFKA_RESP_ERR_MSG_SIZE_TOO_LARGE: + default: /* and all other errors */ + rd_dassert(tver->version > 0); + rd_kafka_consumer_err( + rktp->rktp_fetchq, rd_kafka_broker_id(rkb), err, + tver->version, NULL, rktp, + rktp->rktp_offsets.fetch_pos.offset, + "Fetch from broker %" PRId32 " failed at %s: %s", + rd_kafka_broker_id(rkb), + rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos), + rd_kafka_err2str(err)); + break; + } + + /* Back off the next fetch for this partition */ + rd_kafka_toppar_fetch_backoff(rkb, rktp, err); +} + + + +/** + * @brief Per-partition FetchResponse parsing and handling. 
+ * + * @returns an error on buffer parse failure, else RD_KAFKA_RESP_ERR_NO_ERROR. + */ +static rd_kafka_resp_err_t +rd_kafka_fetch_reply_handle_partition(rd_kafka_broker_t *rkb, + const rd_kafkap_str_t *topic, + rd_kafka_topic_t *rkt /*possibly NULL*/, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + int16_t ErrorCode) { + const int log_decode_errors = LOG_ERR; + struct rd_kafka_toppar_ver *tver, tver_skel; + rd_kafka_toppar_t *rktp = NULL; + rd_kafka_aborted_txns_t *aborted_txns = NULL; + rd_slice_t save_slice; + int32_t fetch_version; + struct { + int32_t Partition; + int16_t ErrorCode; + int64_t HighwaterMarkOffset; + int64_t LastStableOffset; /* v4 */ + int64_t LogStartOffset; /* v5 */ + int32_t MessageSetSize; + int32_t PreferredReadReplica; /* v11 */ + } hdr; + rd_kafka_resp_err_t err; + int64_t end_offset; + + rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); + rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); + if (ErrorCode) + hdr.ErrorCode = ErrorCode; + rd_kafka_buf_read_i64(rkbuf, &hdr.HighwaterMarkOffset); + + end_offset = hdr.HighwaterMarkOffset; + + hdr.LastStableOffset = RD_KAFKA_OFFSET_INVALID; + hdr.LogStartOffset = RD_KAFKA_OFFSET_INVALID; + if (rd_kafka_buf_ApiVersion(request) >= 4) { + int32_t AbortedTxnCnt; + rd_kafka_buf_read_i64(rkbuf, &hdr.LastStableOffset); + if (rd_kafka_buf_ApiVersion(request) >= 5) + rd_kafka_buf_read_i64(rkbuf, &hdr.LogStartOffset); + + rd_kafka_buf_read_i32(rkbuf, &AbortedTxnCnt); + + if (rkb->rkb_rk->rk_conf.isolation_level == + RD_KAFKA_READ_UNCOMMITTED) { + + if (unlikely(AbortedTxnCnt > 0)) { + rd_rkb_log(rkb, LOG_ERR, "FETCH", + "%.*s [%" PRId32 + "]: " + "%" PRId32 + " aborted transaction(s) " + "encountered in READ_UNCOMMITTED " + "fetch response: ignoring.", + RD_KAFKAP_STR_PR(topic), + hdr.Partition, AbortedTxnCnt); + + rd_kafka_buf_skip(rkbuf, + AbortedTxnCnt * (8 + 8)); + } + } else { + /* Older brokers may return LSO -1, + * in which case we use the HWM. 
*/ + if (hdr.LastStableOffset >= 0) + end_offset = hdr.LastStableOffset; + + if (AbortedTxnCnt > 0) { + int k; + + if (unlikely(AbortedTxnCnt > 1000000)) + rd_kafka_buf_parse_fail( + rkbuf, + "%.*s [%" PRId32 + "]: " + "invalid AbortedTxnCnt %" PRId32, + RD_KAFKAP_STR_PR(topic), + hdr.Partition, AbortedTxnCnt); + + aborted_txns = + rd_kafka_aborted_txns_new(AbortedTxnCnt); + for (k = 0; k < AbortedTxnCnt; k++) { + int64_t PID; + int64_t FirstOffset; + rd_kafka_buf_read_i64(rkbuf, &PID); + rd_kafka_buf_read_i64(rkbuf, + &FirstOffset); + rd_kafka_aborted_txns_add( + aborted_txns, PID, FirstOffset); + } + rd_kafka_aborted_txns_sort(aborted_txns); + } + } + } + + if (rd_kafka_buf_ApiVersion(request) >= 11) + rd_kafka_buf_read_i32(rkbuf, &hdr.PreferredReadReplica); + else + hdr.PreferredReadReplica = -1; + + rd_kafka_buf_read_i32(rkbuf, &hdr.MessageSetSize); + + if (unlikely(hdr.MessageSetSize < 0)) + rd_kafka_buf_parse_fail( + rkbuf, + "%.*s [%" PRId32 "]: invalid MessageSetSize %" PRId32, + RD_KAFKAP_STR_PR(topic), hdr.Partition, hdr.MessageSetSize); + + /* Look up topic+partition */ + if (likely(rkt != NULL)) { + rd_kafka_topic_rdlock(rkt); + rktp = rd_kafka_toppar_get(rkt, hdr.Partition, + 0 /*no ua-on-miss*/); + rd_kafka_topic_rdunlock(rkt); + } + + if (unlikely(!rkt || !rktp)) { + rd_rkb_dbg(rkb, TOPIC, "UNKTOPIC", + "Received Fetch response (error %hu) for unknown " + "topic %.*s [%" PRId32 "]: ignoring", + hdr.ErrorCode, RD_KAFKAP_STR_PR(topic), + hdr.Partition); + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_kafka_toppar_lock(rktp); + rktp->rktp_lo_offset = hdr.LogStartOffset; + rktp->rktp_hi_offset = hdr.HighwaterMarkOffset; + /* Let the LastStable offset be the effective + * end_offset based on protocol version, that is: + * if connected to a broker that does not support + * LastStableOffset we use the HighwaterMarkOffset. 
*/ + rktp->rktp_ls_offset = end_offset; + rd_kafka_toppar_unlock(rktp); + + if (hdr.PreferredReadReplica != -1) { + + rd_kafka_fetch_preferred_replica_handle( + rktp, rkbuf, rkb, hdr.PreferredReadReplica); + + if (unlikely(hdr.MessageSetSize != 0)) { + rd_rkb_log(rkb, LOG_WARNING, "FETCH", + "%.*s [%" PRId32 + "]: Fetch response has both preferred read " + "replica and non-zero message set size: " + "%" PRId32 ": skipping messages", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, hdr.MessageSetSize); + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + } + + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + rd_kafka_toppar_destroy(rktp); /* from get */ + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_kafka_toppar_lock(rktp); + + /* Make sure toppar hasn't moved to another broker + * during the lifetime of the request. */ + if (unlikely(rktp->rktp_broker != rkb)) { + rd_kafka_toppar_unlock(rktp); + rd_rkb_dbg(rkb, MSG, "FETCH", + "%.*s [%" PRId32 + "]: partition broker has changed: " + "discarding fetch response", + RD_KAFKAP_STR_PR(topic), hdr.Partition); + rd_kafka_toppar_destroy(rktp); /* from get */ + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + fetch_version = rktp->rktp_fetch_version; + rd_kafka_toppar_unlock(rktp); + + /* Check if this Fetch is for an outdated fetch version, + * or the original rktp was removed and a new one + * created (due to partition count decreasing and + * then increasing again, which can happen in + * desynchronized clusters): if so ignore it. 
*/ + tver_skel.rktp = rktp; + tver = rd_list_find(request->rkbuf_rktp_vers, &tver_skel, + rd_kafka_toppar_ver_cmp); + rd_kafka_assert(NULL, tver); + if (tver->rktp != rktp || tver->version < fetch_version) { + rd_rkb_dbg(rkb, MSG, "DROP", + "%s [%" PRId32 + "]: dropping outdated fetch response " + "(v%d < %d or old rktp)", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + tver->version, fetch_version); + rd_atomic64_add(&rktp->rktp_c.rx_ver_drops, 1); + rd_kafka_toppar_destroy(rktp); /* from get */ + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_rkb_dbg(rkb, MSG, "FETCH", + "Topic %.*s [%" PRId32 "] MessageSet size %" PRId32 + ", error \"%s\", MaxOffset %" PRId64 ", LSO %" PRId64 + ", Ver %" PRId32 "/%" PRId32, + RD_KAFKAP_STR_PR(topic), hdr.Partition, hdr.MessageSetSize, + rd_kafka_err2str(hdr.ErrorCode), hdr.HighwaterMarkOffset, + hdr.LastStableOffset, tver->version, fetch_version); + + /* If this is the last message of the queue, + * signal EOF back to the application. */ + if (end_offset == rktp->rktp_offsets.fetch_pos.offset && + rktp->rktp_offsets.eof_offset != end_offset) { + hdr.ErrorCode = RD_KAFKA_RESP_ERR__PARTITION_EOF; + rktp->rktp_offsets.eof_offset = end_offset; + } + + if (unlikely(hdr.ErrorCode != RD_KAFKA_RESP_ERR_NO_ERROR)) { + /* Handle partition-level errors. */ + rd_kafka_fetch_reply_handle_partition_error( + rkb, rktp, tver, hdr.ErrorCode, hdr.HighwaterMarkOffset); + + rd_kafka_toppar_destroy(rktp); /* from get()*/ + + rd_kafka_buf_skip(rkbuf, hdr.MessageSetSize); + + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + /* No error, clear any previous fetch error. 
*/ + rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (unlikely(hdr.MessageSetSize <= 0)) { + rd_kafka_toppar_destroy(rktp); /*from get()*/ + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + /** + * Parse MessageSet + */ + if (!rd_slice_narrow_relative(&rkbuf->rkbuf_reader, &save_slice, + (size_t)hdr.MessageSetSize)) + rd_kafka_buf_check_len(rkbuf, hdr.MessageSetSize); + + /* Parse messages */ + err = rd_kafka_msgset_parse(rkbuf, request, rktp, aborted_txns, tver); + + if (aborted_txns) + rd_kafka_aborted_txns_destroy(aborted_txns); + + rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice); + /* Continue with next partition regardless of + * parse errors (which are partition-specific) */ + + /* On error: back off the fetcher for this partition */ + if (unlikely(err)) + rd_kafka_toppar_fetch_backoff(rkb, rktp, err); + + rd_kafka_toppar_destroy(rktp); /*from get()*/ + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rktp) + rd_kafka_toppar_destroy(rktp); /*from get()*/ + + return rkbuf->rkbuf_err; +} + +/** + * Parses and handles a Fetch reply. + * Returns 0 on success or an error code on failure. 
+ */ +static rd_kafka_resp_err_t +rd_kafka_fetch_reply_handle(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request) { + int32_t TopicArrayCnt; + int i; + const int log_decode_errors = LOG_ERR; + rd_kafka_topic_t *rkt = NULL; + int16_t ErrorCode = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (rd_kafka_buf_ApiVersion(request) >= 1) { + int32_t Throttle_Time; + rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); + + rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, + Throttle_Time); + } + + if (rd_kafka_buf_ApiVersion(request) >= 7) { + int32_t SessionId; + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_read_i32(rkbuf, &SessionId); + } + + rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + /* Verify that TopicArrayCnt seems to be in line with remaining size */ + rd_kafka_buf_check_len(rkbuf, + TopicArrayCnt * (3 /*topic min size*/ + + 4 /*PartitionArrayCnt*/ + 4 + + 2 + 8 + 4 /*inner header*/)); + + for (i = 0; i < TopicArrayCnt; i++) { + rd_kafkap_str_t topic; + int32_t PartitionArrayCnt; + int j; + + rd_kafka_buf_read_str(rkbuf, &topic); + rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); + + rkt = rd_kafka_topic_find0(rkb->rkb_rk, &topic); + + for (j = 0; j < PartitionArrayCnt; j++) { + if (rd_kafka_fetch_reply_handle_partition( + rkb, &topic, rkt, rkbuf, request, ErrorCode)) + goto err_parse; + } + + if (rkt) { + rd_kafka_topic_destroy0(rkt); + rkt = NULL; + } + } + + if (rd_kafka_buf_read_remain(rkbuf) != 0) { + rd_kafka_buf_parse_fail(rkbuf, + "Remaining data after message set " + "parse: %" PRIusz " bytes", + rd_kafka_buf_read_remain(rkbuf)); + RD_NOTREACHED(); + } + + return 0; + +err_parse: + if (rkt) + rd_kafka_topic_destroy0(rkt); + rd_rkb_dbg(rkb, MSG, "BADMSG", + "Bad message (Fetch v%d): " + "is broker.version.fallback incorrectly set?", + (int)request->rkbuf_reqhdr.ApiVersion); + return rkbuf->rkbuf_err; +} + + + +/** + * @broker FetchResponse handling. + * + * @locality broker thread (or any thread if err == __DESTROY). 
+ */ +static void rd_kafka_broker_fetch_reply(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *reply, + rd_kafka_buf_t *request, + void *opaque) { + + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; /* Terminating */ + + rd_kafka_assert(rkb->rkb_rk, rkb->rkb_fetching > 0); + rkb->rkb_fetching = 0; + + /* Parse and handle the messages (unless the request errored) */ + if (!err && reply) + err = rd_kafka_fetch_reply_handle(rkb, reply, request); + + if (unlikely(err)) { + char tmp[128]; + + rd_rkb_dbg(rkb, MSG, "FETCH", "Fetch reply: %s", + rd_kafka_err2str(err)); + switch (err) { + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: + case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: + case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: + /* Request metadata information update */ + rd_snprintf(tmp, sizeof(tmp), "FetchRequest failed: %s", + rd_kafka_err2str(err)); + rd_kafka_metadata_refresh_known_topics( + rkb->rkb_rk, NULL, rd_true /*force*/, tmp); + /* FALLTHRU */ + + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: + case RD_KAFKA_RESP_ERR__MSG_TIMED_OUT: + /* The fetch is already intervalled from + * consumer_serve() so dont retry. */ + break; + + default: + break; + } + + rd_kafka_broker_fetch_backoff(rkb, err); + /* FALLTHRU */ + } +} + + + +/** + * @brief Build and send a Fetch request message for all underflowed toppars + * for a specific broker. + * + * @returns the number of partitions included in the FetchRequest, if any. 
 *          Returns 0 without building a request when the broker has no
 *          active toppars.
 *
 * @param rkb Broker to send the FetchRequest to; its active toppar list
 *            is walked round-robin starting at rkb_active_toppar_next.
 * @param now Current time, used as the base for the request timeout.
 *
 * @remark Sets rkb->rkb_fetching to 1 when a request is enqueued.
 *
 * @locality broker thread
 */
int rd_kafka_broker_fetch_toppars(rd_kafka_broker_t *rkb, rd_ts_t now) {
        rd_kafka_toppar_t *rktp;
        rd_kafka_buf_t *rkbuf;
        int cnt                     = 0;
        size_t of_TopicArrayCnt     = 0;
        int TopicArrayCnt           = 0;
        size_t of_PartitionArrayCnt = 0;
        int PartitionArrayCnt       = 0;
        rd_kafka_topic_t *rkt_last  = NULL;
        int16_t ApiVersion          = 0;

        /* Create buffer and segments:
         *   1 x ReplicaId MaxWaitTime MinBytes TopicArrayCnt
         *   N x topic name
         *   N x PartitionArrayCnt Partition FetchOffset MaxBytes
         * where N = number of toppars.
         * Since we don't keep track of the number of topics served by
         * this broker, only the partition count, we do a worst-case calc
         * when allocating and assume each partition is on its own topic
         */

        if (unlikely(rkb->rkb_active_toppar_cnt == 0))
                return 0;

        rkbuf = rd_kafka_buf_new_request(
            rkb, RD_KAFKAP_Fetch, 1,
            /* ReplicaId+MaxWaitTime+MinBytes+MaxBytes+IsolationLevel+
             *   SessionId+Epoch+TopicCnt */
            4 + 4 + 4 + 4 + 1 + 4 + 4 + 4 +
                /* N x PartCnt+Partition+CurrentLeaderEpoch+FetchOffset+
                 *       LogStartOffset+MaxBytes+?TopicNameLen?*/
                (rkb->rkb_active_toppar_cnt * (4 + 4 + 4 + 8 + 8 + 4 + 40)) +
                /* ForgottenTopicsCnt */
                4 +
                /* N x ForgottenTopicsData */
                0);

        /* Pick the highest Fetch version (0..11) supported by the broker */
        ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_Fetch,
                                                          0, 11, NULL);

        /* Tag the request with the ApiVersion and the first matching
         * broker feature (MSGVER2, then MSGVER1, then THROTTLETIME).
         * If none of the features is set the buffer's ApiVersion is left
         * as created. */
        if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER2)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_MSGVER2);
        else if (rkb->rkb_features & RD_KAFKA_FEATURE_MSGVER1)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_MSGVER1);
        else if (rkb->rkb_features & RD_KAFKA_FEATURE_THROTTLETIME)
                rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion,
                                            RD_KAFKA_FEATURE_THROTTLETIME);


        /* FetchRequest header */
        /* ReplicaId */
        rd_kafka_buf_write_i32(rkbuf, -1);
        /* MaxWaitTime */
        rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_wait_max_ms);
        /* MinBytes */
        rd_kafka_buf_write_i32(rkbuf, rkb->rkb_rk->rk_conf.fetch_min_bytes);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 3)
                /* MaxBytes */
                rd_kafka_buf_write_i32(rkbuf,
                                       rkb->rkb_rk->rk_conf.fetch_max_bytes);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 4)
                /* IsolationLevel */
                rd_kafka_buf_write_i8(rkbuf,
                                      rkb->rkb_rk->rk_conf.isolation_level);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 7) {
                /* SessionId */
                rd_kafka_buf_write_i32(rkbuf, 0);
                /* Epoch */
                rd_kafka_buf_write_i32(rkbuf, -1);
        }

        /* Write zero TopicArrayCnt but store pointer for later update */
        of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);

        /* Prepare map for storing the fetch version for each partition,
         * this will later be checked in Fetch response to purge outdated
         * responses (e.g., after a seek). */
        rkbuf->rkbuf_rktp_vers =
            rd_list_new(0, (void *)rd_kafka_toppar_ver_destroy);
        rd_list_prealloc_elems(rkbuf->rkbuf_rktp_vers,
                               sizeof(struct rd_kafka_toppar_ver),
                               rkb->rkb_active_toppar_cnt, 0);

        /* Round-robin start of the list. */
        rktp = rkb->rkb_active_toppar_next;
        do {
                struct rd_kafka_toppar_ver *tver;

                /* A new topic section is started whenever the topic differs
                 * from the previous partition's topic; the previous topic's
                 * PartitionArrayCnt placeholder is finalized first. */
                if (rkt_last != rktp->rktp_rkt) {
                        if (rkt_last != NULL) {
                                /* Update PartitionArrayCnt */
                                rd_kafka_buf_update_i32(rkbuf,
                                                        of_PartitionArrayCnt,
                                                        PartitionArrayCnt);
                        }

                        /* Topic name */
                        rd_kafka_buf_write_kstr(rkbuf,
                                                rktp->rktp_rkt->rkt_topic);
                        TopicArrayCnt++;
                        rkt_last = rktp->rktp_rkt;
                        /* Partition count */
                        of_PartitionArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                        PartitionArrayCnt    = 0;
                }

                PartitionArrayCnt++;

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);

                if (rd_kafka_buf_ApiVersion(rkbuf) >= 9) {
                        /* CurrentLeaderEpoch */
                        if (rktp->rktp_leader_epoch < 0 &&
                            rd_kafka_has_reliable_leader_epochs(rkb)) {
                                /* If current leader epoch is set to -1 and
                                 * the broker has reliable leader epochs,
                                 * send 0 instead, so that epoch is checked
                                 * and optionally metadata is refreshed.
                                 * This can happen if metadata is read initially
                                 * without an existing topic (see
                                 * rd_kafka_topic_metadata_update2).
                                 * TODO: have a private metadata struct that
                                 *       stores leader epochs before topic creation.
                                 */
                                rd_kafka_buf_write_i32(rkbuf, 0);
                        } else {
                                rd_kafka_buf_write_i32(rkbuf,
                                                       rktp->rktp_leader_epoch);
                        }
                }

                /* FetchOffset */
                rd_kafka_buf_write_i64(rkbuf,
                                       rktp->rktp_offsets.fetch_pos.offset);

                if (rd_kafka_buf_ApiVersion(rkbuf) >= 5)
                        /* LogStartOffset - only used by follower replica */
                        rd_kafka_buf_write_i64(rkbuf, -1);

                /* MaxBytes */
                rd_kafka_buf_write_i32(rkbuf, rktp->rktp_fetch_msg_max_bytes);

                rd_rkb_dbg(rkb, FETCH, "FETCH",
                           "Fetch topic %.*s [%" PRId32 "] at offset %" PRId64
                           " (leader epoch %" PRId32
                           ", current leader epoch %" PRId32 ", v%d)",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rktp->rktp_offsets.fetch_pos.offset,
                           rktp->rktp_offsets.fetch_pos.leader_epoch,
                           rktp->rktp_leader_epoch, rktp->rktp_fetch_version);

                /* We must have a valid fetch offset when we get here */
                rd_dassert(rktp->rktp_offsets.fetch_pos.offset >= 0);

                /* Add toppar + op version mapping.
                 * The mapping holds its own reference to the toppar. */
                tver          = rd_list_add(rkbuf->rkbuf_rktp_vers, NULL);
                tver->rktp    = rd_kafka_toppar_keep(rktp);
                tver->version = rktp->rktp_fetch_version;

                cnt++;
        } while ((rktp = CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
                                           rktp_activelink)) !=
                 rkb->rkb_active_toppar_next);

        /* Update next toppar to fetch in round-robin list. */
        rd_kafka_broker_active_toppar_next(
            rkb, rktp ? CIRCLEQ_LOOP_NEXT(&rkb->rkb_active_toppars, rktp,
                                          rktp_activelink)
                      : NULL);

        rd_rkb_dbg(rkb, FETCH, "FETCH", "Fetch %i/%i/%i toppar(s)", cnt,
                   rkb->rkb_active_toppar_cnt, rkb->rkb_toppar_cnt);
        /* NOTE(review): the do-while above always runs at least once, so
         * cnt looks to be >= 1 here and this branch defensive only. */
        if (!cnt) {
                rd_kafka_buf_destroy(rkbuf);
                return cnt;
        }

        if (rkt_last != NULL) {
                /* Update last topic's PartitionArrayCnt */
                rd_kafka_buf_update_i32(rkbuf, of_PartitionArrayCnt,
                                        PartitionArrayCnt);
        }

        /* Update TopicArrayCnt */
        rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, TopicArrayCnt);


        if (rd_kafka_buf_ApiVersion(rkbuf) >= 7)
                /* Length of the ForgottenTopics list (KIP-227). Broker
                 * use only - not used by the consumer. */
                rd_kafka_buf_write_i32(rkbuf, 0);

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 11)
                /* RackId */
                rd_kafka_buf_write_kstr(rkbuf,
                                        rkb->rkb_rk->rk_conf.client_rack);

        /* Consider Fetch requests blocking if fetch.wait.max.ms >= 1s */
        if (rkb->rkb_rk->rk_conf.fetch_wait_max_ms >= 1000)
                rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

        /* Use configured timeout: the socket timeout plus the time the
         * broker is allowed to hold the request (fetch.wait.max.ms). */
        rd_kafka_buf_set_timeout(rkbuf,
                                 rkb->rkb_rk->rk_conf.socket_timeout_ms +
                                     rkb->rkb_rk->rk_conf.fetch_wait_max_ms,
                                 now);

        /* Sort toppar versions for quicker lookups in Fetch response. */
        rd_list_sort(rkbuf->rkbuf_rktp_vers, rd_kafka_toppar_ver_cmp);

        /* Mark a Fetch as in flight; cleared again in the reply handler. */
        rkb->rkb_fetching = 1;
        rd_kafka_broker_buf_enq1(rkb, rkbuf, rd_kafka_broker_fetch_reply, NULL);

        return cnt;
}



/**
 * @brief Decide whether this toppar should be on the fetch list or not.
 *
 * Also:
 *  - update toppar's op version (for broker thread's copy)
 *  - finalize statistics (move rktp_offsets to rktp_offsets_fin)
 *
 * @returns the partition's Fetch backoff timestamp, or 0 if no backoff.
 *          When the partition is not fetchable and no explicit backoff
 *          timestamp applies, RD_TS_MAX is returned instead.
 *
 * @param rktp         Partition to evaluate.
 * @param rkb          Broker whose active fetch list is updated.
 * @param force_remove If non-zero the partition is removed from the fetch
 *                     list regardless of its state.
 *
 * @locality broker thread
 * @locks none
 */
rd_ts_t rd_kafka_toppar_fetch_decide(rd_kafka_toppar_t *rktp,
                                     rd_kafka_broker_t *rkb,
                                     int force_remove) {
        int should_fetch   = 1;
        const char *reason = "";
        int32_t version;
        rd_ts_t ts_backoff      = 0;
        rd_bool_t lease_expired = rd_false;

        rd_kafka_toppar_lock(rktp);

        /* Check for preferred replica lease expiry:
         * only relevant when currently fetching from a non-leader broker. */
        lease_expired = rktp->rktp_leader_id != rktp->rktp_broker_id &&
                        rd_interval(&rktp->rktp_lease_intvl,
                                    5 * 60 * 1000 * 1000 /*5 minutes*/, 0) > 0;
        if (lease_expired) {
                /* delegate_to_leader() requires no locks to be held */
                rd_kafka_toppar_unlock(rktp);
                rd_kafka_toppar_delegate_to_leader(rktp);
                rd_kafka_toppar_lock(rktp);

                reason       = "preferred replica lease expired";
                should_fetch = 0;
                goto done;
        }

        /* Forced removal from fetch list */
        if (unlikely(force_remove)) {
                reason       = "forced removal";
                should_fetch = 0;
                goto done;
        }

        if (unlikely((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) != 0)) {
                reason       = "partition removed";
                should_fetch = 0;
                goto done;
        }

        /* Skip toppars not in active fetch state */
        if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_ACTIVE) {
                reason       = "not in active fetch state";
                should_fetch = 0;
                goto done;
        }

        /* Update broker thread's fetch op version */
        version = rktp->rktp_op_version;
        if (version > rktp->rktp_fetch_version ||
            rd_kafka_fetch_pos_cmp(&rktp->rktp_next_fetch_start,
                                   &rktp->rktp_last_next_fetch_start) ||
            rktp->rktp_offsets.fetch_pos.offset == RD_KAFKA_OFFSET_INVALID) {
                /* New version barrier, something was modified from the
                 * control plane. Reset and start over.
                 * Alternatively only the next_offset changed but not the
                 * barrier, which is the case when automatically triggering
                 * offset.reset (such as on PARTITION_EOF or
                 * OFFSET_OUT_OF_RANGE). */

                rd_kafka_dbg(
                    rktp->rktp_rkt->rkt_rk, TOPIC, "FETCHDEC",
                    "Topic %s [%" PRId32
                    "]: fetch decide: "
                    "updating to version %d (was %d) at %s "
                    "(was %s)",
                    rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                    version, rktp->rktp_fetch_version,
                    rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start),
                    rd_kafka_fetch_pos2str(rktp->rktp_offsets.fetch_pos));

                rd_kafka_offset_stats_reset(&rktp->rktp_offsets);

                /* New start offset */
                rktp->rktp_offsets.fetch_pos     = rktp->rktp_next_fetch_start;
                rktp->rktp_last_next_fetch_start = rktp->rktp_next_fetch_start;

                rktp->rktp_fetch_version = version;

                /* Clear last error to propagate new fetch
                 * errors if encountered. */
                rktp->rktp_last_error = RD_KAFKA_RESP_ERR_NO_ERROR;

                /* Drop queued ops that belong to older versions */
                rd_kafka_q_purge_toppar_version(rktp->rktp_fetchq, rktp,
                                                version);
        }


        /* The first matching condition below decides why the partition is
         * not fetchable; only the backoff case yields a wake-up timestamp. */
        if (RD_KAFKA_TOPPAR_IS_PAUSED(rktp)) {
                should_fetch = 0;
                reason       = "paused";

        } else if (RD_KAFKA_OFFSET_IS_LOGICAL(
                       rktp->rktp_next_fetch_start.offset)) {
                should_fetch = 0;
                reason       = "no concrete offset";

        } else if (rd_kafka_q_len(rktp->rktp_fetchq) >=
                   rkb->rkb_rk->rk_conf.queued_min_msgs) {
                /* Skip toppars whose local message queue is already above
                 * the lower threshold. */
                reason       = "queued.min.messages exceeded";
                should_fetch = 0;

        } else if ((int64_t)rd_kafka_q_size(rktp->rktp_fetchq) >=
                   rkb->rkb_rk->rk_conf.queued_max_msg_bytes) {
                reason       = "queued.max.messages.kbytes exceeded";
                should_fetch = 0;

        } else if (rktp->rktp_ts_fetch_backoff > rd_clock()) {
                reason       = "fetch backed off";
                ts_backoff   = rktp->rktp_ts_fetch_backoff;
                should_fetch = 0;
        }

done:
        /* Copy offset stats to finalized place holder. */
        rktp->rktp_offsets_fin = rktp->rktp_offsets;

        /* Only touch the broker's active list (and log) when the
         * fetchable state actually changes. */
        if (rktp->rktp_fetch != should_fetch) {
                rd_rkb_dbg(
                    rkb, FETCH, "FETCH",
                    "Topic %s [%" PRId32
                    "] in state %s at %s "
                    "(%d/%d msgs, %" PRId64
                    "/%d kb queued, "
                    "opv %" PRId32 ") is %s%s",
                    rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
                    rd_kafka_fetch_states[rktp->rktp_fetch_state],
                    rd_kafka_fetch_pos2str(rktp->rktp_next_fetch_start),
                    rd_kafka_q_len(rktp->rktp_fetchq),
                    rkb->rkb_rk->rk_conf.queued_min_msgs,
                    rd_kafka_q_size(rktp->rktp_fetchq) / 1024,
                    rkb->rkb_rk->rk_conf.queued_max_msg_kbytes,
                    rktp->rktp_fetch_version,
                    should_fetch ? "fetchable" : "not fetchable: ", reason);

                if (should_fetch) {
                        rd_dassert(rktp->rktp_fetch_version > 0);
                        rd_kafka_broker_active_toppar_add(
                            rkb, rktp, *reason ? reason : "fetchable");
                } else {
                        rd_kafka_broker_active_toppar_del(rkb, rktp, reason);
                }
        }

        rd_kafka_toppar_unlock(rktp);

        /* Non-fetching partitions will have an
         * indefinite backoff, unless explicitly specified. */
        if (!should_fetch && !ts_backoff)
                ts_backoff = RD_TS_MAX;

        return ts_backoff;
}