summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c5378
1 files changed, 5378 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
new file mode 100644
index 000000000..12d9eb30e
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
@@ -0,0 +1,5378 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdarg.h>
+
+#include "rdkafka_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_msgset.h"
+#include "rdkafka_idempotence.h"
+#include "rdkafka_txnmgr.h"
+#include "rdkafka_sasl.h"
+
+#include "rdrand.h"
+#include "rdstring.h"
+#include "rdunittest.h"
+
+
+/**
+ * Kafka protocol request and response handling.
+ * All of this code runs in the broker thread and uses op queues for
+ * propagating results back to the various sub-systems operating in
+ * other threads.
+ */
+
+
/* RD_KAFKA_ERR_ACTION_.. to string map.
 * Consumed by rd_kafka_actions2str() through rd_flags2str();
 * each entry presumably corresponds to one action flag bit in
 * ascending bit order (TODO confirm against rd_flags2str()),
 * so the order of entries must not be changed.
 * The list is NULL-terminated. */
static const char *rd_kafka_actions_descs[] = {
    "Permanent", "Ignore", "Refresh", "Retry",
    "Inform", "Special", "MsgNotPersisted", "MsgPossiblyPersisted",
    "MsgPersisted", NULL,
};
+
+const char *rd_kafka_actions2str(int actions) {
+ static RD_TLS char actstr[128];
+ return rd_flags2str(actstr, sizeof(actstr), rd_kafka_actions_descs,
+ actions);
+}
+
+
/**
 * @brief Decide action(s) to take based on the returned error code.
 *
 * The optional var-args is a .._ACTION_END terminated list
 * of action,error tuples which overrides the general behaviour.
 * It is to be read as: for \p error, return \p action(s).
 *
 * @param rkb Broker the request was sent to; only used for debug
 *            logging and may be NULL.
 * @param err The error to map to actions; if no error, 0 is returned.
 * @param request The failed request buffer; if NULL the RETRY action
 *                is masked out (there is nothing to retry).
 *
 * @returns a bitmask of RD_KAFKA_ERR_ACTION_.. flags.
 *
 * @warning \p request, \p rkbuf and \p rkb may be NULL.
 */
int rd_kafka_err_action(rd_kafka_broker_t *rkb,
                        rd_kafka_resp_err_t err,
                        const rd_kafka_buf_t *request,
                        ...) {
        va_list ap;
        int actions = 0;
        int exp_act;

        if (!err)
                return 0;

        /* Match explicitly defined error mappings first.
         * The va-list is a flat sequence of (action, error) int pairs,
         * terminated by a 0 action (RD_KAFKA_ERR_ACTION_END). */
        va_start(ap, request);
        while ((exp_act = va_arg(ap, int))) {
                int exp_err = va_arg(ap, int);

                if (err == exp_err)
                        actions |= exp_act;
        }
        va_end(ap);

        /* Explicit error match: return it as-is, skipping the
         * default mapping below. */
        if (actions) {
                if (err && rkb && request)
                        rd_rkb_dbg(
                            rkb, BROKER, "REQERR",
                            "%sRequest failed: %s: explicit actions %s",
                            rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
                            rd_kafka_err2str(err),
                            rd_kafka_actions2str(actions));

                return actions;
        }

        /* Default error matching */
        switch (err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;
        case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
        case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
        case RD_KAFKA_RESP_ERR__WAIT_COORD:
                /* Request metadata information update */
                actions |= RD_KAFKA_ERR_ACTION_REFRESH |
                           RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
                /* Request metadata update and retry */
                actions |= RD_KAFKA_ERR_ACTION_REFRESH |
                           RD_KAFKA_ERR_ACTION_RETRY |
                           RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
                /* The request may have been (partially) processed by
                 * the broker before the connection/request failed,
                 * so the message delivery status is unknown. */
                actions |= RD_KAFKA_ERR_ACTION_RETRY |
                           RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS:
        /* Client-side wait-response/in-queue timeout */
        case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
                actions |= RD_KAFKA_ERR_ACTION_RETRY |
                           RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT:
                /* Request was purged after being sent:
                 * outcome on the broker is unknown. */
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
                           RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR__BAD_MSG:
                /* Buffer parse failures are typically a client-side bug,
                 * treat them as permanent failures. */
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
                           RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
                break;

        case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
                /* Coordinator is loading: transient, just retry. */
                actions |= RD_KAFKA_ERR_ACTION_RETRY;
                break;

        case RD_KAFKA_RESP_ERR__DESTROY:
        case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
        case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
        case RD_KAFKA_RESP_ERR__PURGE_QUEUE:
        default:
                /* Unknown errors are treated as permanent. */
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
                           RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
                break;
        }

        /* Fatal or permanent errors are not retriable */
        if (actions &
            (RD_KAFKA_ERR_ACTION_FATAL | RD_KAFKA_ERR_ACTION_PERMANENT))
                actions &= ~RD_KAFKA_ERR_ACTION_RETRY;

        /* If no request buffer was specified, which might be the case
         * in certain error call chains, mask out the retry action. */
        if (!request)
                actions &= ~RD_KAFKA_ERR_ACTION_RETRY;
        else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce)
                /* Mask out message-related bits for non-Produce requests */
                actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS;

        if (err && actions && rkb && request)
                rd_rkb_dbg(
                    rkb, BROKER, "REQERR", "%sRequest failed: %s: actions %s",
                    rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
                    rd_kafka_err2str(err), rd_kafka_actions2str(actions));

        return actions;
}
+
+
/**
 * @brief Read a list of topic+partitions+extra from \p rkbuf.
 *
 * @param rkbuf buffer to read from
 * @param estimated_part_cnt hint for pre-allocating the returned list.
 * @param fields An array of fields to read from the buffer and set on
 *               the rktpar object, in the specified order, must end
 *               with RD_KAFKA_TOPIC_PARTITION_FIELD_END.
 *
 * @returns a newly allocated list on success, or NULL on parse error.
 *
 * @remark The rd_kafka_buf_read_*() calls are macros that jump to the
 *         \c err_parse label below on buffer underflow/parse failure;
 *         \c log_decode_errors is consumed by those macros.
 */
rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
    rd_kafka_buf_t *rkbuf,
    size_t estimated_part_cnt,
    const rd_kafka_topic_partition_field_t *fields) {
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        rd_kafka_topic_partition_list_t *parts = NULL;

        rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);

        /* Pre-allocate for an assumed 4 partitions per topic, or the
         * caller's estimate, whichever is larger. */
        parts = rd_kafka_topic_partition_list_new(
            RD_MAX(TopicArrayCnt * 4, (int)estimated_part_cnt));

        while (TopicArrayCnt-- > 0) {
                rd_kafkap_str_t kTopic;
                int32_t PartArrayCnt;
                char *topic;

                rd_kafka_buf_read_str(rkbuf, &kTopic);
                rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt,
                                           RD_KAFKAP_PARTITIONS_MAX);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic, &kTopic);

                while (PartArrayCnt-- > 0) {
                        int32_t Partition = -1, Epoch = -1234,
                                CurrentLeaderEpoch = -1234;
                        int64_t Offset       = -1234;
                        int16_t ErrorCode    = 0;
                        rd_kafka_topic_partition_t *rktpar;
                        int fi;

                        /*
                         * Read requested fields
                         */
                        for (fi = 0;
                             fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
                             fi++) {
                                switch (fields[fi]) {
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
                                        rd_kafka_buf_read_i32(rkbuf,
                                                              &Partition);
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH:
                                        rd_kafka_buf_read_i32(
                                            rkbuf, &CurrentLeaderEpoch);
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
                                        rd_kafka_buf_read_i32(rkbuf, &Epoch);
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
                                        rd_kafka_buf_read_i16(rkbuf,
                                                              &ErrorCode);
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
                                        /* Not supported when reading,
                                         * assert so it is caught during
                                         * development. */
                                        rd_assert(!*"metadata not implemented");
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
                                        break;
                                case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
                                        break;
                                }
                        }

                        rktpar = rd_kafka_topic_partition_list_add(parts, topic,
                                                                   Partition);
                        /* Use dummy sentinel values that are unlikely to be
                         * seen from the broker to know if we are to set these
                         * fields or not. */
                        if (Offset != -1234)
                                rktpar->offset = Offset;
                        if (Epoch != -1234)
                                rd_kafka_topic_partition_set_leader_epoch(
                                    rktpar, Epoch);
                        if (CurrentLeaderEpoch != -1234)
                                rd_kafka_topic_partition_set_current_leader_epoch(
                                    rktpar, CurrentLeaderEpoch);
                        rktpar->err = ErrorCode;

                        /* Skip tags of the partition struct (flexver) */
                        rd_kafka_buf_skip_tags(rkbuf);
                }

                /* Skip tags of the topic struct (flexver) */
                rd_kafka_buf_skip_tags(rkbuf);
        }

        return parts;

err_parse:
        /* Reached via the rd_kafka_buf_read_*() macros on parse error. */
        if (parts)
                rd_kafka_topic_partition_list_destroy(parts);

        return NULL;
}
+
+
/**
 * @brief Write a list of topic+partitions+offsets+extra to \p rkbuf
 *
 * @param rkbuf buffer to write to.
 * @param parts sorted list of partitions to write.
 * @param skip_invalid_offsets if true, omit partitions with offset < 0.
 * @param only_invalid_offsets if true, write only partitions with
 *                             offset < 0. Mutually exclusive with
 *                             \p skip_invalid_offsets.
 * @param fields which fields to write for each partition, terminated
 *               by RD_KAFKA_TOPIC_PARTITION_FIELD_END.
 *
 * @returns the number of partitions written to buffer.
 *
 * @remark The \p parts list MUST be sorted (by topic) so that all
 *         partitions of a topic are written under a single topic entry.
 */
int rd_kafka_buf_write_topic_partitions(
    rd_kafka_buf_t *rkbuf,
    const rd_kafka_topic_partition_list_t *parts,
    rd_bool_t skip_invalid_offsets,
    rd_bool_t only_invalid_offsets,
    const rd_kafka_topic_partition_field_t *fields) {
        size_t of_TopicArrayCnt;
        size_t of_PartArrayCnt = 0;
        int TopicArrayCnt = 0, PartArrayCnt = 0;
        int i;
        const char *prev_topic = NULL;
        int cnt                = 0;

        /* The two filter flags must not both be set. */
        rd_assert(!only_invalid_offsets ||
                  (only_invalid_offsets != skip_invalid_offsets));

        /* TopicArrayCnt: reserved now, backpatched at the end. */
        of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);

        for (i = 0; i < parts->cnt; i++) {
                const rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
                int fi;

                /* Filter on offset validity as requested by caller. */
                if (rktpar->offset < 0) {
                        if (skip_invalid_offsets)
                                continue;
                } else if (only_invalid_offsets)
                        continue;

                if (!prev_topic || strcmp(rktpar->topic, prev_topic)) {
                        /* Finish previous topic, if any. */
                        if (of_PartArrayCnt > 0) {
                                rd_kafka_buf_finalize_arraycnt(
                                    rkbuf, of_PartArrayCnt, PartArrayCnt);
                                /* Tags for previous topic struct */
                                rd_kafka_buf_write_tags(rkbuf);
                        }


                        /* Topic */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        TopicArrayCnt++;
                        prev_topic = rktpar->topic;
                        /* New topic so reset partition count */
                        PartArrayCnt = 0;

                        /* PartitionArrayCnt: updated later */
                        of_PartArrayCnt =
                            rd_kafka_buf_write_arraycnt_pos(rkbuf);
                }


                /*
                 * Write requested fields
                 */
                for (fi = 0; fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
                     fi++) {
                        switch (fields[fi]) {
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
                                rd_kafka_buf_write_i32(rkbuf,
                                                       rktpar->partition);
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
                                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH:
                                rd_kafka_buf_write_i32(
                                    rkbuf,
                                    rd_kafka_topic_partition_get_current_leader_epoch(
                                        rktpar));
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
                                rd_kafka_buf_write_i32(
                                    rkbuf,
                                    rd_kafka_topic_partition_get_leader_epoch(
                                        rktpar));
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
                                rd_kafka_buf_write_i16(rkbuf, rktpar->err);
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
                                /* Java client 0.9.0 and broker <0.10.0 can't
                                 * parse Null metadata fields, so as a
                                 * workaround we send an empty string if
                                 * it's Null. */
                                if (!rktpar->metadata)
                                        rd_kafka_buf_write_str(rkbuf, "", 0);
                                else
                                        rd_kafka_buf_write_str(
                                            rkbuf, rktpar->metadata,
                                            rktpar->metadata_size);
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
                                break;
                        case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
                                break;
                        }
                }


                if (fi > 1)
                        /* If there was more than one field written
                         * then this was a struct and thus needs the
                         * struct suffix tags written. */
                        rd_kafka_buf_write_tags(rkbuf);

                PartArrayCnt++;
                cnt++;
        }

        /* Finalize the last topic, if any partition was written. */
        if (of_PartArrayCnt > 0) {
                rd_kafka_buf_finalize_arraycnt(rkbuf, of_PartArrayCnt,
                                               PartArrayCnt);
                /* Tags for topic struct */
                rd_kafka_buf_write_tags(rkbuf);
        }

        /* Backpatch the total topic count. */
        rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, TopicArrayCnt);

        return cnt;
}
+
+
+/**
+ * @brief Send FindCoordinatorRequest.
+ *
+ * @param coordkey is the group.id for RD_KAFKA_COORD_GROUP,
+ * and the transactional.id for RD_KAFKA_COORD_TXN
+ */
+rd_kafka_resp_err_t
+rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
+ rd_kafka_coordtype_t coordtype,
+ const char *coordkey,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL);
+
+ if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1)
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1,
+ 1 + 2 + strlen(coordkey));
+
+ rd_kafka_buf_write_str(rkbuf, coordkey, -1);
+
+ if (ApiVersion >= 1)
+ rd_kafka_buf_write_i8(rkbuf, (int8_t)coordtype);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
/**
 * @brief Parses a ListOffsets reply.
 *
 * Returns the parsed offsets (and errors) in \p offsets which must have been
 * initialized by caller.
 *
 * @returns 0 on success, else an error (\p offsets may be completely or
 *          partially updated, depending on the nature of the error, and per
 *          partition error codes should be checked by the caller).
 *
 * @remark The rd_kafka_buf_read_*() calls are macros that jump to
 *         \c err_parse on parse failure; \c log_decode_errors is
 *         consumed by those macros.
 */
static rd_kafka_resp_err_t
rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf,
                           rd_kafka_topic_partition_list_t *offsets) {
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int16_t api_version;
        rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Response is parsed according to the version of the
         * original request. */
        api_version = rkbuf->rkbuf_reqhdr.ApiVersion;

        if (api_version >= 2)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* NOTE:
         * Broker may return offsets in a different constellation than
         * in the original request .*/

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        while (TopicArrayCnt-- > 0) {
                rd_kafkap_str_t ktopic;
                int32_t PartArrayCnt;
                char *topic_name;

                rd_kafka_buf_read_str(rkbuf, &ktopic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);

                while (PartArrayCnt-- > 0) {
                        int32_t kpartition;
                        int16_t ErrorCode;
                        int32_t OffsetArrayCnt;
                        int64_t Offset      = -1;
                        int32_t LeaderEpoch = -1;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &kpartition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        if (api_version >= 1) {
                                /* v1+: single Timestamp+Offset pair */
                                int64_t Timestamp;
                                rd_kafka_buf_read_i64(rkbuf, &Timestamp);
                                rd_kafka_buf_read_i64(rkbuf, &Offset);
                                if (api_version >= 4)
                                        rd_kafka_buf_read_i32(rkbuf,
                                                              &LeaderEpoch);
                        } else if (api_version == 0) {
                                /* v0: array of offsets */
                                rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
                                /* We only request one offset so just grab
                                 * the first one. */
                                while (OffsetArrayCnt-- > 0)
                                        rd_kafka_buf_read_i64(rkbuf, &Offset);
                        } else {
                                RD_NOTREACHED();
                        }

                        rktpar = rd_kafka_topic_partition_list_add(
                            offsets, topic_name, kpartition);
                        rktpar->err    = ErrorCode;
                        rktpar->offset = Offset;
                        rd_kafka_topic_partition_set_leader_epoch(rktpar,
                                                                  LeaderEpoch);

                        /* Remember the first per-partition error as the
                         * overall return value. */
                        if (ErrorCode && !all_err)
                                all_err = ErrorCode;
                }
        }

        return all_err;

err_parse:
        /* Reached via the rd_kafka_buf_read_*() macros on parse error. */
        return rkbuf->rkbuf_err;
}
+
+
+
/**
 * @brief Parses and handles ListOffsets replies.
 *
 * Returns the parsed offsets (and errors) in \p offsets.
 * \p offsets must be initialized by the caller.
 *
 * @param err Incoming error from the transport layer; if set, parsing
 *            is skipped and the error is mapped to actions directly.
 * @param actionsp (optional) Updated with the recommended error actions
 *                 on error.
 *
 * @returns 0 on success, else an error. \p offsets may be populated on error,
 *          depending on the nature of the error.
 *          On error \p actionsp (unless NULL) is updated with the recommended
 *          error actions.
 *          Returns RD_KAFKA_RESP_ERR__IN_PROGRESS if the request was
 *          retried; the caller must then not use the reply.
 */
rd_kafka_resp_err_t
rd_kafka_handle_ListOffsets(rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb,
                            rd_kafka_resp_err_t err,
                            rd_kafka_buf_t *rkbuf,
                            rd_kafka_buf_t *request,
                            rd_kafka_topic_partition_list_t *offsets,
                            int *actionsp) {

        int actions;

        if (!err)
                err = rd_kafka_parse_ListOffsets(rkbuf, offsets);
        if (!err)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        /* Map the error to recommended actions, with explicit
         * per-error overrides. */
        actions = rd_kafka_err_action(
            rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,

            RD_KAFKA_ERR_ACTION_REFRESH,
            RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,

            RD_KAFKA_ERR_ACTION_REFRESH,
            RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,

            RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,

            RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,

            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,

            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,

            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,

            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT,

            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,


            RD_KAFKA_ERR_ACTION_END);

        if (actionsp)
                *actionsp = actions;

        if (rkb)
                rd_rkb_dbg(
                    rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)",
                    rd_kafka_err2str(err), rd_kafka_actions2str(actions));

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                char tmp[256];
                /* Re-query for leader */
                rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s",
                            rd_kafka_err2str(err));
                rd_kafka_metadata_refresh_known_topics(rk, NULL,
                                                       rd_true /*force*/, tmp);
        }

        /* Retry the request if the action allows it and the retry
         * count has not been exceeded. */
        if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
            rd_kafka_buf_retry(rkb, request))
                return RD_KAFKA_RESP_ERR__IN_PROGRESS;

        return err;
}
+
+
+
/**
 * @brief Async maker for ListOffsetsRequest.
 *
 * Writes the request payload into \p rkbuf just prior to transmission,
 * when the broker's supported ApiVersion is known.
 *
 * @param make_opaque a topic-sorted rd_kafka_topic_partition_list_t
 *                    (see rd_kafka_ListOffsetsRequest()).
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, or
 *          RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE if no usable
 *          ApiVersion (0..5) is supported by the broker.
 */
static rd_kafka_resp_err_t
rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb,
                                 rd_kafka_buf_t *rkbuf,
                                 void *make_opaque) {
        const rd_kafka_topic_partition_list_t *partitions =
            (const rd_kafka_topic_partition_list_t *)make_opaque;
        int i;
        size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
        const char *last_topic = "";
        int32_t topic_cnt = 0, part_cnt = 0;
        int16_t ApiVersion;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_ListOffsets, 0, 5, NULL);
        if (ApiVersion == -1)
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;

        /* ReplicaId: -1 identifies us as a regular client (consumer) */
        rd_kafka_buf_write_i32(rkbuf, -1);

        /* IsolationLevel */
        if (ApiVersion >= 2)
                rd_kafka_buf_write_i8(rkbuf,
                                      rkb->rkb_rk->rk_conf.isolation_level);

        /* TopicArrayCnt */
        of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */

        /* \p partitions is sorted by topic: emit one topic entry per
         * run of equal topic names. */
        for (i = 0; i < partitions->cnt; i++) {
                const rd_kafka_topic_partition_t *rktpar =
                    &partitions->elems[i];

                if (strcmp(rktpar->topic, last_topic)) {
                        /* Finish last topic, if any. */
                        if (of_PartArrayCnt > 0)
                                rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt,
                                                        part_cnt);

                        /* Topic */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        topic_cnt++;
                        last_topic = rktpar->topic;
                        /* New topic so reset partition count */
                        part_cnt = 0;

                        /* PartitionArrayCnt: updated later */
                        of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                part_cnt++;

                if (ApiVersion >= 4)
                        /* CurrentLeaderEpoch */
                        rd_kafka_buf_write_i32(
                            rkbuf,
                            rd_kafka_topic_partition_get_current_leader_epoch(
                                rktpar));

                /* Time/Offset */
                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);

                if (ApiVersion == 0) {
                        /* MaxNumberOfOffsets */
                        rd_kafka_buf_write_i32(rkbuf, 1);
                }
        }

        /* Backpatch counts for the last topic. If the partition list
         * was empty the initial 0 counts remain correct. */
        if (of_PartArrayCnt > 0) {
                rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
                rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "ListOffsetsRequest (v%hd, opv %d) "
                   "for %" PRId32 " topic(s) and %" PRId32 " partition(s)",
                   ApiVersion, rkbuf->rkbuf_replyq.version, topic_cnt,
                   partitions->cnt);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
+
+
+/**
+ * @brief Send ListOffsetsRequest for partitions in \p partitions.
+ */
+void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb,
+ rd_kafka_topic_partition_list_t *partitions,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ rd_kafka_topic_partition_list_t *make_parts;
+
+ make_parts = rd_kafka_topic_partition_list_copy(partitions);
+ rd_kafka_topic_partition_list_sort_by_topic(make_parts);
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_ListOffsets, 1,
+ /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */
+ 4 + 1 + 4 + 100 +
+ /* PartArrayCnt */
+ 4 +
+ /* partition_cnt * Partition+Time+MaxNumOffs */
+ (make_parts->cnt * (4 + 8 + 4)));
+
+ /* Postpone creating the request contents until time to send,
+ * at which time the ApiVersion is known. */
+ rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest,
+ make_parts,
+ rd_kafka_topic_partition_list_destroy_free);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
/**
 * @brief OffsetForLeaderEpochResponse handler.
 *
 * @param err Incoming error from the transport layer; if set, parsing
 *            is skipped and the error is returned as-is.
 * @param offsets On success, set to a newly allocated partition list
 *                parsed from the response; owned by the caller.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, else an error.
 *
 * @remark The rd_kafka_buf_read_*() calls are macros that jump to
 *         \c err_parse on parse failure; \c log_decode_errors is
 *         consumed by those macros.
 */
rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(
    rd_kafka_t *rk,
    rd_kafka_broker_t *rkb,
    rd_kafka_resp_err_t err,
    rd_kafka_buf_t *rkbuf,
    rd_kafka_buf_t *request,
    rd_kafka_topic_partition_list_t **offsets) {
        const int log_decode_errors = LOG_ERR;
        int16_t ApiVersion;

        if (err)
                goto err;

        /* Response is parsed according to the version of the
         * original request. */
        ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;

        if (ApiVersion >= 2)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* Per-partition response fields; LeaderEpoch only on v1+. */
        const rd_kafka_topic_partition_field_t fields[] = {
            RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
            RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
            ApiVersion >= 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH
                            : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
            RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
            RD_KAFKA_TOPIC_PARTITION_FIELD_END};
        *offsets = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields);
        if (!*offsets)
                goto err_parse;

        return RD_KAFKA_RESP_ERR_NO_ERROR;

err:
        return err;

err_parse:
        /* Reached via the rd_kafka_buf_read_*() macros on parse error. */
        err = rkbuf->rkbuf_err;
        goto err;
}
+
+
+/**
+ * @brief Send OffsetForLeaderEpochRequest for partition(s).
+ *
+ */
+void rd_kafka_OffsetForLeaderEpochRequest(
+ rd_kafka_broker_t *rkb,
+ rd_kafka_topic_partition_list_t *parts,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL);
+ /* If the supported ApiVersions are not yet known,
+ * or this broker doesn't support it, we let this request
+ * succeed or fail later from the broker thread where the
+ * version is checked again. */
+ if (ApiVersion == -1)
+ ApiVersion = 2;
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_OffsetForLeaderEpoch, 1, 4 + (parts->cnt * 64),
+ ApiVersion >= 4 /*flexver*/);
+
+ /* Sort partitions by topic */
+ rd_kafka_topic_partition_list_sort_by_topic(parts);
+
+ /* Write partition list */
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ /* CurrentLeaderEpoch */
+ RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH,
+ /* LeaderEpoch */
+ RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ rd_kafka_buf_write_topic_partitions(
+ rkbuf, parts, rd_false /*include invalid offsets*/,
+ rd_false /*skip valid offsets */, fields);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ /* Let caller perform retries */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
/**
 * Generic handler for OffsetFetch responses.
 * Offsets for included partitions will be propagated through the passed
 * 'offsets' list.
 *
 * @param rkbuf response buffer, may be NULL if \p err is set.
 * @param offsets In/out partition list; created here if \p *offsets is
 *                NULL. All pre-existing entries get their offset reset
 *                to INVALID before the response is applied.
 * @param update_toppar update toppar's committed_offset
 * @param add_part if true add partitions from the response to \p *offsets,
 *                 else just update the partitions that are already
 *                 in \p *offsets.
 * @param allow_retry whether the request may be retried on
 *                    retriable/unstable errors.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR on success, an error otherwise,
 *          or RD_KAFKA_RESP_ERR__IN_PROGRESS if the request was retried
 *          (in which case the caller must not use the reply).
 *
 * @remark The rd_kafka_buf_read_*() calls are macros that jump to
 *         \c err_parse on parse failure; \c log_decode_errors is
 *         consumed by those macros.
 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetFetch(rd_kafka_t *rk,
                            rd_kafka_broker_t *rkb,
                            rd_kafka_resp_err_t err,
                            rd_kafka_buf_t *rkbuf,
                            rd_kafka_buf_t *request,
                            rd_kafka_topic_partition_list_t **offsets,
                            rd_bool_t update_toppar,
                            rd_bool_t add_part,
                            rd_bool_t allow_retry) {
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int64_t offset = RD_KAFKA_OFFSET_INVALID;
        int16_t ApiVersion;
        rd_kafkap_str_t metadata;
        int retry_unstable = 0;
        int i;
        int actions;
        int seen_cnt = 0;  /* Response partitions matched in *offsets */

        if (err)
                goto err;

        /* Response is parsed according to the version of the
         * original request. */
        ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;

        if (ApiVersion >= 3)
                rd_kafka_buf_read_throttle_time(rkbuf);

        if (!*offsets)
                *offsets = rd_kafka_topic_partition_list_new(16);

        /* Set default offset for all partitions. */
        rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, *offsets, 0,
                                                  RD_KAFKA_OFFSET_INVALID,
                                                  0 /* !is commit */);

        rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);
        for (i = 0; i < TopicArrayCnt; i++) {
                rd_kafkap_str_t topic;
                int32_t PartArrayCnt;
                char *topic_name;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);

                rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt,
                                           RD_KAFKAP_PARTITIONS_MAX);

                /* Stack-allocated nul-terminated copy of the topic name */
                RD_KAFKAP_STR_DUPA(&topic_name, &topic);

                for (j = 0; j < PartArrayCnt; j++) {
                        int32_t partition;
                        rd_kafka_toppar_t *rktp;
                        rd_kafka_topic_partition_t *rktpar;
                        int32_t LeaderEpoch = -1;
                        int16_t err2;

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i64(rkbuf, &offset);
                        if (ApiVersion >= 5)
                                rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch);
                        rd_kafka_buf_read_str(rkbuf, &metadata);
                        rd_kafka_buf_read_i16(rkbuf, &err2);
                        rd_kafka_buf_skip_tags(rkbuf);

                        rktpar = rd_kafka_topic_partition_list_find(
                            *offsets, topic_name, partition);
                        if (!rktpar && add_part)
                                rktpar = rd_kafka_topic_partition_list_add(
                                    *offsets, topic_name, partition);
                        else if (!rktpar) {
                                /* Partition not requested by us:
                                 * skip it. */
                                rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                                           "OffsetFetchResponse: %s [%" PRId32
                                           "] "
                                           "not found in local list: ignoring",
                                           topic_name, partition);
                                continue;
                        }

                        seen_cnt++;

                        /* Holds a reference that must be released
                         * below (may be NULL if unknown locally). */
                        rktp = rd_kafka_topic_partition_get_toppar(
                            rk, rktpar, rd_false /*no create on miss*/);

                        /* broker reports invalid offset as -1 */
                        if (offset == -1)
                                rktpar->offset = RD_KAFKA_OFFSET_INVALID;
                        else
                                rktpar->offset = offset;

                        rd_kafka_topic_partition_set_leader_epoch(rktpar,
                                                                  LeaderEpoch);
                        rktpar->err = err2;

                        rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
                                   "OffsetFetchResponse: %s [%" PRId32
                                   "] "
                                   "offset %" PRId64 ", leader epoch %" PRId32
                                   ", metadata %d byte(s): %s",
                                   topic_name, partition, offset, LeaderEpoch,
                                   RD_KAFKAP_STR_LEN(&metadata),
                                   rd_kafka_err2name(rktpar->err));

                        if (update_toppar && !err2 && rktp) {
                                /* Update toppar's committed offset */
                                rd_kafka_toppar_lock(rktp);
                                rktp->rktp_committed_pos =
                                    rd_kafka_topic_partition_get_fetch_pos(
                                        rktpar);
                                rd_kafka_toppar_unlock(rktp);
                        }

                        /* Unstable (transactional) offsets are counted
                         * so the request can be retried below. */
                        if (rktpar->err ==
                            RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
                                retry_unstable++;


                        /* Replace any previous metadata with the
                         * broker-provided value. */
                        if (rktpar->metadata)
                                rd_free(rktpar->metadata);

                        if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
                                rktpar->metadata      = NULL;
                                rktpar->metadata_size = 0;
                        } else {
                                rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
                                rktpar->metadata_size =
                                    RD_KAFKAP_STR_LEN(&metadata);
                        }

                        /* Lose ref from get_toppar() */
                        if (rktp)
                                rd_kafka_toppar_destroy(rktp);
                }

                /* Skip tags of the topic struct (flexver) */
                rd_kafka_buf_skip_tags(rkbuf);
        }

        /* Response-level ErrorCode (v2+) overrides any partition
         * level results. */
        if (ApiVersion >= 2) {
                int16_t ErrorCode;
                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
                if (ErrorCode) {
                        err = ErrorCode;
                        goto err;
                }
        }


err:
        if (!*offsets)
                rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", "OffsetFetch returned %s",
                           rd_kafka_err2str(err));
        else
                rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
                           "OffsetFetch for %d/%d partition(s) "
                           "(%d unstable partition(s)) returned %s",
                           seen_cnt, (*offsets)->cnt, retry_unstable,
                           rd_kafka_err2str(err));

        actions =
            rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY, err);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY || retry_unstable) {
                if (allow_retry && rd_kafka_buf_retry(rkb, request))
                        return RD_KAFKA_RESP_ERR__IN_PROGRESS;
                /* FALLTHRU */
        }

        return err;

err_parse:
        /* Reached via the rd_kafka_buf_read_*() macros on parse error. */
        err = rkbuf->rkbuf_err;
        goto err;
}
+
+
+
/**
 * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH
 *        rko in \p opaque.
 *
 * @param opaque rko wrapper for handle_OffsetFetch.
 *
 * The \c rko->rko_u.offset_fetch.partitions list will be filled in with
 * the fetched offsets.
 *
 * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH.
 *
 * @remark \p rkb, \p rkbuf and \p request are optional.
 *
 * @remark The \p request buffer may be retried on error.
 *
 * @locality cgrp's broker thread
 */
void rd_kafka_op_handle_OffsetFetch(rd_kafka_t *rk,
                                    rd_kafka_broker_t *rkb,
                                    rd_kafka_resp_err_t err,
                                    rd_kafka_buf_t *rkbuf,
                                    rd_kafka_buf_t *request,
                                    void *opaque) {
        rd_kafka_op_t *rko = opaque;
        rd_kafka_op_t *rko_reply;
        rd_kafka_topic_partition_list_t *offsets;

        RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Termination, quick cleanup. */
                rd_kafka_op_destroy(rko);
                return;
        }

        /* Start from a copy of the requested partitions; the response
         * (if any) overwrites the offsets below. */
        offsets = rd_kafka_topic_partition_list_copy(
            rko->rko_u.offset_fetch.partitions);

        /* If all partitions already had usable offsets then there
         * was no request sent and thus no reply, the offsets list is
         * good to go.. */
        if (rkbuf) {
                /* ..else parse the response (or perror) */
                err = rd_kafka_handle_OffsetFetch(
                    rkb->rkb_rk, rkb, err, rkbuf, request, &offsets,
                    rd_false /*dont update rktp*/, rd_false /*dont add part*/,
                    /* Allow retries if replyq is valid */
                    rd_kafka_op_replyq_is_valid(rko));
                if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
                        /* Request was retried: keep the original rko
                         * for the eventual final response. */
                        if (offsets)
                                rd_kafka_topic_partition_list_destroy(offsets);
                        return; /* Retrying */
                }
        }

        /* Enqueue the result (ownership of \c offsets is transferred
         * to the reply op) on the requester's replyq. */
        rko_reply =
            rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY);
        rko_reply->rko_err                       = err;
        rko_reply->rko_u.offset_fetch.partitions = offsets;
        rko_reply->rko_u.offset_fetch.do_free    = 1;
        if (rko->rko_rktp)
                rko_reply->rko_rktp = rd_kafka_toppar_keep(rko->rko_rktp);

        rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0);

        rd_kafka_op_destroy(rko);
}
+
+/**
+ * Send OffsetFetchRequest for a consumer group id.
+ *
+ * Any partition with a usable offset will be ignored, if all partitions
+ * have usable offsets then no request is sent at all but an empty
+ * reply is enqueued on the replyq.
+ *
+ * @param group_id Request offset for this group id.
+ * @param parts (optional) List of topic partitions to request,
+ * or NULL to return all topic partitions associated with the
+ * group.
+ * @param require_stable_offsets Whether broker should return stable offsets
+ * (transaction-committed).
+ * @param timeout Optional timeout to set to the buffer.
+ */
+void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb,
+ const char *group_id,
+ rd_kafka_topic_partition_list_t *parts,
+ rd_bool_t require_stable_offsets,
+ int timeout,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion;
+ size_t parts_size = 0;
+ int PartCnt = -1;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_OffsetFetch, 0, 7, NULL);
+
+ if (parts) {
+ parts_size = parts->cnt * 32;
+ }
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_OffsetFetch, 1,
+ /* GroupId + rd_kafka_buf_write_arraycnt_pos +
+ * Topics + RequireStable */
+ 32 + 4 + parts_size + 1, ApiVersion >= 6 /*flexver*/);
+
+ /* ConsumerGroup */
+ rd_kafka_buf_write_str(rkbuf, group_id, -1);
+
+ if (parts) {
+ /* Sort partitions by topic */
+ rd_kafka_topic_partition_list_sort_by_topic(parts);
+
+ /* Write partition list, filtering out partitions with valid
+ * offsets */
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ PartCnt = rd_kafka_buf_write_topic_partitions(
+ rkbuf, parts, rd_false /*include invalid offsets*/,
+ rd_false /*skip valid offsets */, fields);
+ } else {
+ rd_kafka_buf_write_arraycnt_pos(rkbuf);
+ }
+
+ if (ApiVersion >= 7) {
+ /* RequireStable */
+ rd_kafka_buf_write_i8(rkbuf, require_stable_offsets);
+ }
+
+ if (PartCnt == 0) {
+ /* No partitions needs OffsetFetch, enqueue empty
+ * response right away. */
+ rkbuf->rkbuf_replyq = replyq;
+ rkbuf->rkbuf_cb = resp_cb;
+ rkbuf->rkbuf_opaque = opaque;
+ rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
+ return;
+ }
+
+ if (timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ if (parts) {
+ rd_rkb_dbg(
+ rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
+ "OFFSET",
+ "Group %s OffsetFetchRequest(v%d) for %d/%d partition(s)",
+ group_id, ApiVersion, PartCnt, parts->cnt);
+ } else {
+ rd_rkb_dbg(
+ rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
+ "OFFSET",
+ "Group %s OffsetFetchRequest(v%d) for all partitions",
+ group_id, ApiVersion);
+ }
+
+ /* Let handler decide if retries should be performed */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
+
+ if (parts) {
+ rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
+ "Fetch committed offsets for %d/%d partition(s)",
+ PartCnt, parts->cnt);
+ } else {
+ rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
+ "Fetch committed offsets all the partitions");
+ }
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+/**
+ * @brief Handle per-partition OffsetCommit errors and returns actions flags.
+ */
/* Maps a per-partition OffsetCommit ErrorCode (rktpar->err) to
 * RD_KAFKA_ERR_ACTION_.. flags via rd_kafka_err_action(). */
static int
rd_kafka_handle_OffsetCommit_error(rd_kafka_broker_t *rkb,
                                   rd_kafka_buf_t *request,
                                   const rd_kafka_topic_partition_t *rktpar) {

        /* These actions are mimicking AK's ConsumerCoordinator.java */

        return rd_kafka_err_action(
            rkb, rktpar->err, request,

            /* Authorization failures: permanent, not retried. */
            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,

            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,


            /* Commit payload problems: permanent. */
            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,

            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,


            /* Transient broker-side conditions: retry. */
            RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,

            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,


            /* .._SPECIAL: mark coordinator dead, refresh and retry */
            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
                RD_KAFKA_ERR_ACTION_SPECIAL,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,

            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
                RD_KAFKA_ERR_ACTION_SPECIAL,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,

            /* Replicas possibly unavailable:
             * Refresh coordinator (but don't mark as dead (!.._SPECIAL)),
             * and retry */
            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,


            /* FIXME: There are some cases in the Java code where
             * this is not treated as a fatal error. */
            RD_KAFKA_ERR_ACTION_PERMANENT | RD_KAFKA_ERR_ACTION_FATAL,
            RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,


            /* Group-membership related errors: permanent for this
             * commit attempt. */
            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,


            RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,

            RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,

            RD_KAFKA_ERR_ACTION_END);
}
+
+
+/**
+ * @brief Handle OffsetCommit response.
+ *
+ * @remark \p offsets may be NULL if \p err is set
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if all partitions were successfully
+ * committed,
+ * RD_KAFKA_RESP_ERR__IN_PROGRESS if a retry was scheduled,
+ * or any other error code if the request was not retried.
+ */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetCommit(rd_kafka_t *rk,
                             rd_kafka_broker_t *rkb,
                             rd_kafka_resp_err_t err,
                             rd_kafka_buf_t *rkbuf,
                             rd_kafka_buf_t *request,
                             rd_kafka_topic_partition_list_t *offsets,
                             rd_bool_t ignore_cgrp) {
        /* Required by the rd_kafka_buf_read_*() macros, which jump to
         * the err_parse: label on a short/invalid read. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int errcnt = 0;  /* Number of partitions that returned an error */
        int partcnt = 0; /* Number of partitions found in the response */
        int i;
        int actions = 0; /* Accumulated RD_KAFKA_ERR_ACTION_.. flags */

        if (err)
                goto err;

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 3)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* Response body: [Topic [Partition ErrorCode]] */
        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0; i < TopicArrayCnt; i++) {
                rd_kafkap_str_t topic;
                char *topic_str;
                int32_t PartArrayCnt;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy for list lookup. */
                RD_KAFKAP_STR_DUPA(&topic_str, &topic);

                for (j = 0; j < PartArrayCnt; j++) {
                        int32_t partition;
                        int16_t ErrorCode;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        rktpar = rd_kafka_topic_partition_list_find(
                            offsets, topic_str, partition);

                        if (!rktpar) {
                                /* Received offset for topic/partition we didn't
                                 * ask for, this shouldn't really happen. */
                                continue;
                        }

                        /* Propagate the per-partition error to the caller's
                         * offsets list. */
                        rktpar->err = ErrorCode;
                        if (ErrorCode) {
                                err = ErrorCode;
                                errcnt++;

                                /* Accumulate actions for per-partition
                                 * errors. */
                                actions |= rd_kafka_handle_OffsetCommit_error(
                                    rkb, request, rktpar);
                        }

                        partcnt++;
                }
        }

        /* If all partitions failed use error code
         * from last partition as the global error. */
        if (offsets && err && errcnt == partcnt)
                goto err;

        goto done;

err_parse:
        err = rkbuf->rkbuf_err;

err:
        /* No per-partition actions accumulated: map the request-level
         * error to actions instead. */
        if (!actions) /* Transport/Request-level error */
                actions = rd_kafka_err_action(rkb, err, request,

                                              RD_KAFKA_ERR_ACTION_REFRESH |
                                                  RD_KAFKA_ERR_ACTION_SPECIAL |
                                                  RD_KAFKA_ERR_ACTION_RETRY,
                                              RD_KAFKA_RESP_ERR__TRANSPORT,

                                              RD_KAFKA_ERR_ACTION_END);

        /* The following consumer-group reactions (fatal error, coordinator
         * refresh, retry) are skipped when ignore_cgrp is set. */
        if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_FATAL)) {
                rd_kafka_set_fatal_error(rk, err, "OffsetCommit failed: %s",
                                         rd_kafka_err2str(err));
                return err;
        }

        if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_REFRESH) &&
            rk->rk_cgrp) {
                /* Mark coordinator dead or re-query for coordinator.
                 * ..dead() will trigger a re-query. */
                if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
                        rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
                                                 "OffsetCommitRequest failed");
                else
                        rd_kafka_cgrp_coord_query(rk->rk_cgrp,
                                                  "OffsetCommitRequest failed");
        }

        /* Retry only if retriable, not permanently failed, and the
         * buf retry was actually scheduled. */
        if (!ignore_cgrp && actions & RD_KAFKA_ERR_ACTION_RETRY &&
            !(actions & RD_KAFKA_ERR_ACTION_PERMANENT) &&
            rd_kafka_buf_retry(rkb, request))
                return RD_KAFKA_RESP_ERR__IN_PROGRESS;

done:
        return err;
}
+
+/**
+ * @brief Send OffsetCommitRequest for a list of partitions.
+ *
+ * @param cgmetadata consumer group metadata.
+ *
+ * @param offsets - offsets to commit for each topic-partition.
+ *
+ * @returns 0 if none of the partitions in \p offsets had valid offsets,
+ * else 1.
+ */
int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb,
                                 rd_kafka_consumer_group_metadata_t *cgmetadata,
                                 rd_kafka_topic_partition_list_t *offsets,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque,
                                 const char *reason) {
        rd_kafka_buf_t *rkbuf;
        ssize_t of_TopicCnt = -1;  /* Buffer offset of TopicArrayCnt field */
        int TopicCnt = 0;
        const char *last_topic = NULL; /* For detecting topic boundaries in
                                        * the (sorted) offsets list */
        ssize_t of_PartCnt = -1;   /* Buffer offset of current PartitionCnt */
        int PartCnt = 0;           /* Partitions in current topic */
        int tot_PartCnt = 0;       /* Total partitions written */
        int i;
        int16_t ApiVersion;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features);

        rd_kafka_assert(NULL, offsets != NULL);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1,
                                         100 + (offsets->cnt * 128));

        /* ConsumerGroup */
        rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1);

        /* v1,v2 */
        if (ApiVersion >= 1) {
                /* ConsumerGroupGenerationId */
                rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id);
                /* ConsumerId */
                rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1);
        }

        /* v7: GroupInstanceId */
        if (ApiVersion >= 7)
                rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id,
                                       -1);

        /* v2-4: RetentionTime */
        if (ApiVersion >= 2 && ApiVersion <= 4)
                rd_kafka_buf_write_i64(rkbuf, -1);

        /* Sort offsets by topic so that all partitions of a topic are
         * consecutive and can be written under a single TopicName entry. */
        rd_kafka_topic_partition_list_sort_by_topic(offsets);

        /* TopicArrayCnt: Will be updated when we know the number of topics. */
        of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);

        for (i = 0; i < offsets->cnt; i++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];

                /* Skip partitions with invalid offset. */
                if (rktpar->offset < 0)
                        continue;

                if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
                        /* New topic */

                        /* Finalize previous PartitionCnt */
                        if (PartCnt > 0)
                                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
                                                        PartCnt);

                        /* TopicName */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        /* PartitionCnt, finalized later */
                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                        PartCnt = 0;
                        last_topic = rktpar->topic;
                        TopicCnt++;
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                PartCnt++;
                tot_PartCnt++;

                /* Offset */
                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);

                /* v6: KIP-101 CommittedLeaderEpoch */
                if (ApiVersion >= 6)
                        rd_kafka_buf_write_i32(
                            rkbuf,
                            rd_kafka_topic_partition_get_leader_epoch(rktpar));

                /* v1: TimeStamp */
                if (ApiVersion == 1)
                        rd_kafka_buf_write_i64(rkbuf, -1);

                /* Metadata */
                /* Java client 0.9.0 and broker <0.10.0 can't parse
                 * Null metadata fields, so as a workaround we send an
                 * empty string if it's Null. */
                if (!rktpar->metadata)
                        rd_kafka_buf_write_str(rkbuf, "", 0);
                else
                        rd_kafka_buf_write_str(rkbuf, rktpar->metadata,
                                               rktpar->metadata_size);
        }

        if (tot_PartCnt == 0) {
                /* No topic+partitions had valid offsets to commit. */
                rd_kafka_replyq_destroy(&replyq);
                rd_kafka_buf_destroy(rkbuf);
                return 0;
        }

        /* Finalize previous PartitionCnt */
        if (PartCnt > 0)
                rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);

        /* Finalize TopicCnt */
        rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
                   ApiVersion, tot_PartCnt, offsets->cnt, reason);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return 1;
}
+
+/**
+ * @brief Construct and send OffsetDeleteRequest to \p rkb
+ * with the partitions in del_grpoffsets (DeleteConsumerGroupOffsets_t*)
+ * using \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @remark Only one del_grpoffsets element is supported.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb,
+ /** (rd_kafka_DeleteConsumerGroupOffsets_t*) */
+ const rd_list_t *del_grpoffsets,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ const rd_kafka_DeleteConsumerGroupOffsets_t *grpoffsets =
+ rd_list_elem(del_grpoffsets, 0);
+
+ rd_assert(rd_list_cnt(del_grpoffsets) == 1);
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_OffsetDelete, 0, 0, &features);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "OffsetDelete API (KIP-496) not supported "
+ "by broker, requires broker version >= 2.4.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_OffsetDelete, 1,
+ 2 + strlen(grpoffsets->group) + (64 * grpoffsets->partitions->cnt));
+
+ /* GroupId */
+ rd_kafka_buf_write_str(rkbuf, grpoffsets->group, -1);
+
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ rd_kafka_buf_write_topic_partitions(
+ rkbuf, grpoffsets->partitions,
+ rd_false /*dont skip invalid offsets*/, rd_false /*any offset*/,
+ fields);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
+ * enveloping buffer \p rkbuf.
+ */
+static void
+rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf,
+ const rd_kafka_group_member_t *rkgm) {
+ rd_kafka_buf_t *rkbuf;
+ rd_slice_t slice;
+
+ rkbuf = rd_kafka_buf_new(1, 100);
+ rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
+ rd_assert(rkgm->rkgm_assignment);
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ rd_kafka_buf_write_topic_partitions(
+ rkbuf, rkgm->rkgm_assignment,
+ rd_false /*don't skip invalid offsets*/, rd_false /* any offset */,
+ fields);
+ rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);
+
+ /* Get pointer to binary buffer */
+ rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);
+
+ /* Write binary buffer as Kafka Bytes to enveloping buffer. */
+ rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
+ rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);
+
+ rd_kafka_buf_destroy(rkbuf);
+}
+
+/**
+ * Send SyncGroupRequest
+ */
+void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafkap_str_t *group_instance_id,
+ const rd_kafka_group_member_t *assignments,
+ int assignment_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int i;
+ int16_t ApiVersion;
+ int features;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_SyncGroup, 0, 3, &features);
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_SyncGroup, 1,
+ RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ +
+ RD_KAFKAP_STR_SIZE(member_id) +
+ RD_KAFKAP_STR_SIZE(group_instance_id) +
+ 4 /* array size group_assignment */ +
+ (assignment_cnt * 100 /*guess*/));
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_i32(rkbuf, generation_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+ if (ApiVersion >= 3)
+ rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
+ rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
+
+ for (i = 0; i < assignment_cnt; i++) {
+ const rd_kafka_group_member_t *rkgm = &assignments[i];
+
+ rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
+ rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
+ }
+
+ /* This is a blocking request */
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
+ rd_kafka_buf_set_abs_timeout(
+ rkbuf,
+ rkb->rkb_rk->rk_conf.group_session_timeout_ms +
+ 3000 /* 3s grace period*/,
+ 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+/**
+ * Send JoinGroupRequest
+ */
void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
                               const rd_kafkap_str_t *group_id,
                               const rd_kafkap_str_t *member_id,
                               const rd_kafkap_str_t *group_instance_id,
                               const rd_kafkap_str_t *protocol_type,
                               const rd_list_t *topics,
                               rd_kafka_replyq_t replyq,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque) {
        rd_kafka_buf_t *rkbuf;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_assignor_t *rkas;
        int i;
        int16_t ApiVersion = 0;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_JoinGroup, 0, 5, &features);


        rkbuf = rd_kafka_buf_new_request(
            rkb, RD_KAFKAP_JoinGroup, 1,
            RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ +
                4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) +
                RD_KAFKAP_STR_SIZE(group_instance_id) +
                RD_KAFKAP_STR_SIZE(protocol_type) +
                4 /* array count GroupProtocols */ +
                (rd_list_cnt(topics) * 100));
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
        /* v1+: RebalanceTimeoutMs (KIP-62, see notice below) */
        if (ApiVersion >= 1)
                rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        /* v5+: GroupInstanceId (KIP-345, see notice below) */
        if (ApiVersion >= 5)
                rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
        rd_kafka_buf_write_kstr(rkbuf, protocol_type);
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);

        /* GroupProtocols: one (name, metadata) entry per enabled assignor,
         * where the metadata is produced by the assignor's callback. */
        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
                rd_kafkap_bytes_t *member_metadata;
                if (!rkas->rkas_enabled)
                        continue;
                rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
                member_metadata = rkas->rkas_get_metadata_cb(
                    rkas, rk->rk_cgrp->rkcg_assignor_state, topics,
                    rk->rk_cgrp->rkcg_group_assignment);
                rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
                rd_kafkap_bytes_destroy(member_metadata);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Warn (at most once per day) if the broker is too old for
         * KIP-62 and max.poll.interval.ms therefore cannot be honored. */
        if (ApiVersion < 1 &&
            rk->rk_conf.max_poll_interval_ms >
                rk->rk_conf.group_session_timeout_ms &&
            rd_interval(&rkb->rkb_suppress.unsupported_kip62,
                        /* at most once per day */
                        (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
                rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL",
                           "Broker does not support KIP-62 "
                           "(requires Apache Kafka >= v0.10.1.0): "
                           "consumer configuration "
                           "`max.poll.interval.ms` (%d) "
                           "is effectively limited "
                           "by `session.timeout.ms` (%d) "
                           "with this broker version",
                           rk->rk_conf.max_poll_interval_ms,
                           rk->rk_conf.group_session_timeout_ms);


        /* Warn (at most once per day) if the broker is too old for
         * KIP-345 static group membership. */
        if (ApiVersion < 5 && rk->rk_conf.group_instance_id &&
            rd_interval(&rkb->rkb_suppress.unsupported_kip345,
                        /* at most once per day */
                        (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
                rd_rkb_log(rkb, LOG_NOTICE, "STATICMEMBER",
                           "Broker does not support KIP-345 "
                           "(requires Apache Kafka >= v2.3.0): "
                           "consumer configuration "
                           "`group.instance.id` (%s) "
                           "will not take effect",
                           rk->rk_conf.group_instance_id);

        /* Absolute timeout */
        rd_kafka_buf_set_abs_timeout_force(
            rkbuf,
            /* Request timeout is max.poll.interval.ms + grace
             * if the broker supports it, else
             * session.timeout.ms + grace. */
            (ApiVersion >= 1 ? rk->rk_conf.max_poll_interval_ms
                             : rk->rk_conf.group_session_timeout_ms) +
                3000 /* 3s grace period*/,
            0);

        /* This is a blocking request */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}
+
+
+
+/**
+ * Send LeaveGroupRequest
+ */
+void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
+ const char *group_id,
+ const char *member_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300);
+
+ rd_kafka_buf_write_str(rkbuf, group_id, -1);
+ rd_kafka_buf_write_str(rkbuf, member_id, -1);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ /* LeaveGroupRequests are best-effort, the local consumer
+ * does not care if it succeeds or not, so the request timeout
+ * is shortened.
+ * Retries are not needed. */
+ rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0);
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+/**
+ * Handler for LeaveGroup responses
+ * opaque must be the cgrp handle.
+ */
+void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int actions;
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return;
+ /* FALLTHRU */
+ }
+
+ if (ErrorCode)
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
+ "LeaveGroup response: %s",
+ rd_kafka_err2str(ErrorCode));
+
+ return;
+
+err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+
+/**
+ * Send HeartbeatRequest
+ */
+void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb,
+ const rd_kafkap_str_t *group_id,
+ int32_t generation_id,
+ const rd_kafkap_str_t *member_id,
+ const rd_kafkap_str_t *group_instance_id,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_Heartbeat, 0, 3, &features);
+
+ rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
+ "Heartbeat for group \"%s\" generation id %" PRId32,
+ group_id->str, generation_id);
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1,
+ RD_KAFKAP_STR_SIZE(group_id) +
+ 4 /* GenerationId */ +
+ RD_KAFKAP_STR_SIZE(member_id));
+
+ rd_kafka_buf_write_kstr(rkbuf, group_id);
+ rd_kafka_buf_write_i32(rkbuf, generation_id);
+ rd_kafka_buf_write_kstr(rkbuf, member_id);
+ if (ApiVersion >= 3)
+ rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_buf_set_abs_timeout(
+ rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+}
+
+
+
+/**
+ * @brief Construct and send ListGroupsRequest to \p rkb
+ * with the states (const char *) in \p states.
+ * Uses \p max_ApiVersion as maximum API version,
+ * pass -1 to use the maximum available version.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @return NULL on success, a new error instance that must be
+ * released with rd_kafka_error_destroy() in case of error.
+ */
+rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
+ int16_t max_ApiVersion,
+ const char **states,
+ size_t states_cnt,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ size_t i;
+
+ if (max_ApiVersion < 0)
+ max_ApiVersion = 4;
+
+ if (max_ApiVersion > ApiVersion) {
+ /* Remark: don't check if max_ApiVersion is zero.
+ * As rd_kafka_broker_ApiVersion_supported cannot be checked
+ * in the application thread reliably . */
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL);
+ }
+
+ if (ApiVersion == -1) {
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "ListGroupsRequest not supported by broker");
+ }
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_ListGroups, 1,
+ /* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */
+ 4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */);
+
+ if (ApiVersion >= 4) {
+ size_t of_GroupsArrayCnt =
+ rd_kafka_buf_write_arraycnt_pos(rkbuf);
+ for (i = 0; i < states_cnt; i++) {
+ rd_kafka_buf_write_str(rkbuf, states[i], -1);
+ }
+ rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i);
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+ return NULL;
+}
+
+/**
+ * @brief Construct and send DescribeGroupsRequest to \p rkb
+ * with the groups (const char *) in \p groups.
+ * Uses \p max_ApiVersion as maximum API version,
+ * pass -1 to use the maximum available version.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @return NULL on success, a new error instance that must be
+ * released with rd_kafka_error_destroy() in case of error.
+ */
rd_kafka_error_t *rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,
                                                 int16_t max_ApiVersion,
                                                 char **groups,
                                                 size_t group_cnt,
                                                 rd_kafka_replyq_t replyq,
                                                 rd_kafka_resp_cb_t *resp_cb,
                                                 void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t of_GroupsArrayCnt;

        /* -1 means: use the highest version this client implements. */
        if (max_ApiVersion < 0)
                max_ApiVersion = 4;

        if (max_ApiVersion > ApiVersion) {
                /* Remark: don't check if max_ApiVersion is zero.
                 * As rd_kafka_broker_ApiVersion_supported cannot be checked
                 * in the application thread reliably . */
                ApiVersion = rd_kafka_broker_ApiVersion_supported(
                    rkb, RD_KAFKAP_DescribeGroups, 0, max_ApiVersion, NULL);
        }

        if (ApiVersion == -1) {
                return rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "DescribeGroupsRequest not supported by broker");
        }

        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_DescribeGroups, 1,
            4 /* rd_kafka_buf_write_arraycnt_pos */ +
                1 /* IncludeAuthorizedOperations */ + 1 /* tags */ +
                32 * group_cnt /* Groups */,
            rd_false);

        /* write Groups */
        /* NOTE(review): the array count is finalized BEFORE the group
         * strings are written, and the strings are written in REVERSE list
         * order (group_cnt counts down). Presumably safe here since the
         * buffer is non-flexver (fixed-width count updated in place) and
         * the broker does not care about group ordering — verify against
         * rd_kafka_buf_finalize_arraycnt(). */
        of_GroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
        rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, group_cnt);
        while (group_cnt-- > 0)
                rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);

        /* write IncludeAuthorizedOperations */
        if (ApiVersion >= 3) {
                /* TODO: implement KIP-430 */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
        return NULL;
}
+
+/**
+ * @brief Generic handler for Metadata responses
+ *
+ * @locality rdkafka main thread
+ */
static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        rd_kafka_op_t *rko = opaque; /* Possibly NULL */
        struct rd_kafka_metadata *md = NULL;
        const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
        int actions;

        /* Must run on the main thread (except during termination). */
        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
                                  thrd_is_current(rk->rk_thread));

        /* Avoid metadata updates when we're terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk) ||
            err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Terminating */
                goto done;
        }

        if (err)
                goto err;

        if (!topics)
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata: %s =====",
                           request->rkbuf_u.Metadata.reason);
        else
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata "
                           "(for %d requested topics): %s =====",
                           rd_list_cnt(topics),
                           request->rkbuf_u.Metadata.reason);

        err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md);
        if (err)
                goto err;

        if (rko && rko->rko_replyq.q) {
                /* Reply to metadata requester, passing on the metadata.
                 * Reuse requesting rko for the reply.
                 * Ownership of both rko and md transfers to the replyq. */
                rko->rko_err = err;
                rko->rko_u.metadata.md = md;

                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                rko = NULL;
        } else {
                /* No requester waiting: discard the parsed metadata. */
                if (md)
                        rd_free(md);
        }

        goto done;

err:
        /* __PARTIAL (incomplete response) is the only retriable error;
         * everything else falls through to the warning below. */
        actions = rd_kafka_err_action(rkb, err, request,

                                      RD_KAFKA_ERR_ACTION_RETRY,
                                      RD_KAFKA_RESP_ERR__PARTIAL,

                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        } else {
                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
                           "Metadata request failed: %s: %s (%dms): %s",
                           request->rkbuf_u.Metadata.reason,
                           rd_kafka_err2str(err),
                           (int)(request->rkbuf_ts_sent / 1000),
                           rd_kafka_actions2str(actions));
                /* Respond back to caller on non-retriable errors */
                if (rko && rko->rko_replyq.q) {
                        rko->rko_err = err;
                        rko->rko_u.metadata.md = NULL;
                        rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                        rko = NULL;
                }
        }



        /* FALLTHRU */

done:
        /* Destroy the request rko unless ownership was handed to a replyq
         * above (rko set to NULL). */
        if (rko)
                rd_kafka_op_destroy(rko);
}
+
+
+/**
+ * @brief Construct MetadataRequest (does not send)
+ *
+ * \p topics is a list of topic names (char *) to request.
+ *
+ * !topics - only request brokers (if supported by broker, else
+ * all topics)
+ * topics.cnt==0 - all topics in cluster are requested
+ * topics.cnt >0 - only specified topics are requested
+ *
+ * @param reason - metadata request reason
+ * @param allow_auto_create_topics - allow broker-side auto topic creation.
+ * This is best-effort, depending on broker
+ * config and version.
+ * @param cgrp_update - Update cgrp in parse_Metadata (see comment there).
+ * @param rko - (optional) rko with replyq for handling response.
+ * Specifying an rko forces a metadata request even if
+ * there is already a matching one in-transit.
+ *
+ * If full metadata for all topics is requested (or all brokers, which
+ * results in all-topics on older brokers) and there is already a full request
+ * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
+ * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
+ * is sent regardless.
+ */
rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
                                             const rd_list_t *topics,
                                             const char *reason,
                                             rd_bool_t allow_auto_create_topics,
                                             rd_bool_t cgrp_update,
                                             rd_kafka_op_t *rko) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t of_TopicArrayCnt; /* buffer offset of TopicArrayCnt field,
                                  * patched after the array is written */
        int features;
        int topic_cnt = topics ? rd_list_cnt(topics) : 0;
        int *full_incr = NULL; /* points to the cache's outstanding
                                * full-request counter, if this is a
                                * full (brokers-only or all-topics) request */

        /* Pick the highest Metadata version (v0..v9) supported by both
         * client and broker. */
        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_Metadata, 0, 9, &features);

        /* v9+ uses the flexible-version (compact/tagged) encoding. */
        rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
                                                 4 + (50 * topic_cnt) + 1,
                                                 ApiVersion >= 9);

        if (!reason)
                reason = "";

        rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
        rkbuf->rkbuf_u.Metadata.cgrp_update = cgrp_update;

        /* TopicArrayCnt */
        of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);

        if (!topics) {
                /* v0: keep 0, brokers only not available,
                 * request all topics */
                /* v1-8: 0 means empty array, brokers only */
                if (ApiVersion >= 9) {
                        /* v9+: varint encoded empty array (1), brokers only */
                        rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
                                                       topic_cnt);
                }

                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for brokers only: %s", reason);
                full_incr =
                    &rkb->rkb_rk->rk_metadata_cache.rkmc_full_brokers_sent;

        } else if (topic_cnt == 0) {
                /* v0: keep 0, request all topics */
                if (ApiVersion >= 1 && ApiVersion < 9) {
                        /* v1-8: update to -1, all topics */
                        rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, -1);
                }
                /* v9+: keep 0, varint encoded null, all topics */

                rkbuf->rkbuf_u.Metadata.all_topics = 1;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for all topics: "
                           "%s",
                           reason);

                /* Forced (rko-triggered) requests bypass the outstanding
                 * full-request deduplication below. */
                if (!rko)
                        full_incr = &rkb->rkb_rk->rk_metadata_cache
                                         .rkmc_full_topics_sent;

        } else {
                /* request cnt topics */
                rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
                                               topic_cnt);

                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for %d topic(s): "
                           "%s",
                           topic_cnt, reason);
        }

        if (full_incr) {
                /* Avoid multiple outstanding full requests
                 * (since they are redundant and side-effect-less).
                 * Forced requests (app using metadata() API) are passed
                 * through regardless. */

                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
                        mtx_unlock(
                            &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Skipping metadata request: %s: "
                                   "full request already in-transit",
                                   reason);
                        rd_kafka_buf_destroy(rkbuf);
                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
                }

                (*full_incr)++;
                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                /* Response handler will decrement the counter again
                 * (under decr_lock) when the request completes. */
                rkbuf->rkbuf_u.Metadata.decr = full_incr;
                rkbuf->rkbuf_u.Metadata.decr_lock =
                    &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock;
        }


        if (topic_cnt > 0) {
                char *topic;
                int i;

                /* Maintain a copy of the topics list so we can purge
                 * hints from the metadata cache on error. */
                rkbuf->rkbuf_u.Metadata.topics =
                    rd_list_copy(topics, rd_list_string_copy, NULL);

                RD_LIST_FOREACH(topic, topics, i) {
                        rd_kafka_buf_write_str(rkbuf, topic, -1);
                        /* Tags for previous topic */
                        rd_kafka_buf_write_tags(rkbuf);
                }
        }

        if (ApiVersion >= 4) {
                /* AllowAutoTopicCreation */
                rd_kafka_buf_write_bool(rkbuf, allow_auto_create_topics);

        } else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER &&
                   !rkb->rkb_rk->rk_conf.allow_auto_create_topics &&
                   rd_kafka_conf_is_modified(&rkb->rkb_rk->rk_conf,
                                             "allow.auto.create.topics") &&
                   rd_interval(
                       &rkb->rkb_rk->rk_suppress.allow_auto_create_topics,
                       30 * 60 * 1000 /* every 30 minutes */, 0) >= 0) {
                /* Let user know we can't obey allow.auto.create.topics */
                rd_rkb_log(rkb, LOG_WARNING, "AUTOCREATE",
                           "allow.auto.create.topics=false not supported "
                           "by broker: requires broker version >= 0.11.0.0: "
                           "requested topic(s) may be auto created depending "
                           "on broker auto.create.topics.enable configuration");
        }

        if (ApiVersion >= 8 && ApiVersion < 10) {
                /* TODO: implement KIP-430 */
                /* IncludeClusterAuthorizedOperations */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        if (ApiVersion >= 8) {
                /* TODO: implement KIP-430 */
                /* IncludeTopicAuthorizedOperations */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Metadata requests are part of the important control plane
         * and should go before most other requests (Produce, Fetch, etc). */
        rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
                                       /* Handle response thru rk_ops,
                                        * but forward parsed result to
                                        * rko's replyq when done. */
                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0),
                                       rd_kafka_handle_Metadata, rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}
+
+
+
+/**
+ * @brief Parses and handles ApiVersion reply.
+ *
+ * @param apis will be allocated, populated and sorted
+ * with broker's supported APIs, or set to NULL.
+ * @param api_cnt will be set to the number of elements in \p *apis
+ *
+ * @returns 0 on success, else an error.
+ *
+ * @remark A valid \p apis might be returned even if an error is returned.
+ */
rd_kafka_resp_err_t
rd_kafka_handle_ApiVersion(rd_kafka_t *rk,
                           rd_kafka_broker_t *rkb,
                           rd_kafka_resp_err_t err,
                           rd_kafka_buf_t *rkbuf,
                           rd_kafka_buf_t *request,
                           struct rd_kafka_ApiVersion **apis,
                           size_t *api_cnt) {
        const int log_decode_errors = LOG_DEBUG;
        int32_t ApiArrayCnt;
        int16_t ErrorCode;
        int i = 0;

        /* Initialize out-parameters so the caller always gets
         * well-defined values, even on early error return. */
        *apis = NULL;
        *api_cnt = 0;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        err = ErrorCode;

        /* ApiKeys array, sanity-capped at 1000 entries. */
        rd_kafka_buf_read_arraycnt(rkbuf, &ApiArrayCnt, 1000);
        if (err && ApiArrayCnt < 1) {
                /* Version >=3 returns the ApiVersions array if the error
                 * code is ERR_UNSUPPORTED_VERSION, previous versions don't */
                goto err;
        }

        rd_rkb_dbg(rkb, FEATURE, "APIVERSION", "Broker API support:");

        *apis = rd_malloc(sizeof(**apis) * ApiArrayCnt);

        for (i = 0; i < ApiArrayCnt; i++) {
                struct rd_kafka_ApiVersion *api = &(*apis)[i];

                rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
                rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
                rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);

                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                           "  ApiKey %s (%hd) Versions %hd..%hd",
                           rd_kafka_ApiKey2str(api->ApiKey), api->ApiKey,
                           api->MinVer, api->MaxVer);

                /* Discard struct tags */
                rd_kafka_buf_skip_tags(rkbuf);
        }

        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* Discard end tags */
        rd_kafka_buf_skip_tags(rkbuf);

        /* Sort by ApiKey so the array can be searched with
         * rd_kafka_ApiVersion_key_cmp() later. */
        *api_cnt = ApiArrayCnt;
        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);

        goto done;

err_parse:
        /* If the broker does not support our ApiVersionRequest version it
         * will respond with a version 0 response, which will most likely
         * fail parsing. Instead of propagating the parse error we
         * propagate the original error, unless there isn't one in which case
         * we use the parse error. */
        if (!err)
                err = rkbuf->rkbuf_err;
err:
        /* There are no retryable errors. */

        /* Discard any partially parsed results on error. */
        if (*apis)
                rd_free(*apis);

        *apis = NULL;
        *api_cnt = 0;

done:
        return err;
}
+
+
+
+/**
+ * @brief Send ApiVersionRequest (KIP-35)
+ *
+ * @param ApiVersion If -1 use the highest supported version, else use the
+ * specified value.
+ */
+void rd_kafka_ApiVersionRequest(rd_kafka_broker_t *rkb,
+ int16_t ApiVersion,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ if (ApiVersion == -1)
+ ApiVersion = 3;
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_ApiVersion, 1, 3, ApiVersion >= 3 /*flexver*/);
+
+ if (ApiVersion >= 3) {
+ /* KIP-511 adds software name and version through the optional
+ * protocol fields defined in KIP-482. */
+
+ /* ClientSoftwareName */
+ rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_name, -1);
+
+ /* ClientSoftwareVersion */
+ rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_version,
+ -1);
+ }
+
+ /* Should be sent before any other requests since it is part of
+ * the initial connection handshake. */
+ rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
+
+ /* Non-supporting brokers will tear down the connection when they
+ * receive an unknown API request, so dont retry request on failure. */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ /* 0.9.0.x brokers will not close the connection on unsupported
+ * API requests, so we minimize the timeout for the request.
+ * This is a regression on the broker part. */
+ rd_kafka_buf_set_abs_timeout(
+ rkbuf, rkb->rkb_rk->rk_conf.api_version_request_timeout_ms, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ if (replyq.q)
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
+ opaque);
+ else /* in broker thread */
+ rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+/**
+ * Send SaslHandshakeRequest (KIP-43)
+ */
+void rd_kafka_SaslHandshakeRequest(rd_kafka_broker_t *rkb,
+ const char *mechanism,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int mechlen = (int)strlen(mechanism);
+ int16_t ApiVersion;
+ int features;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, 1,
+ RD_KAFKAP_STR_SIZE0(mechlen));
+
+ /* Should be sent before any other requests since it is part of
+ * the initial connection handshake. */
+ rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
+
+ rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
+
+ /* Non-supporting brokers will tear down the conneciton when they
+ * receive an unknown API request or where the SASL GSSAPI
+ * token type is not recognized, so dont retry request on failure. */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ /* 0.9.0.x brokers will not close the connection on unsupported
+ * API requests, so we minimize the timeout of the request.
+ * This is a regression on the broker part. */
+ if (!rkb->rkb_rk->rk_conf.api_version_request &&
+ rkb->rkb_rk->rk_conf.socket_timeout_ms > 10 * 1000)
+ rd_kafka_buf_set_abs_timeout(rkbuf, 10 * 1000 /*10s*/, 0);
+
+ /* ApiVersion 1 / RD_KAFKA_FEATURE_SASL_REQ enables
+ * the SaslAuthenticateRequest */
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_SaslHandshake, 0, 1, &features);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ if (replyq.q)
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
+ opaque);
+ else /* in broker thread */
+ rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+/**
+ * @brief Parses and handles an SaslAuthenticate reply.
+ *
+ * @returns 0 on success, else an error.
+ *
+ * @locality broker thread
+ * @locks none
+ */
void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,
                                      rd_kafka_broker_t *rkb,
                                      rd_kafka_resp_err_t err,
                                      rd_kafka_buf_t *rkbuf,
                                      rd_kafka_buf_t *request,
                                      void *opaque) {
        const int log_decode_errors = LOG_ERR;
        int16_t error_code;
        rd_kafkap_str_t error_str;
        rd_kafkap_bytes_t auth_data;
        char errstr[512];

        /* Transport-level (or prior) error: fail the connection. */
        if (err) {
                rd_snprintf(errstr, sizeof(errstr),
                            "SaslAuthenticateRequest failed: %s",
                            rd_kafka_err2str(err));
                goto err;
        }

        rd_kafka_buf_read_i16(rkbuf, &error_code);
        rd_kafka_buf_read_str(rkbuf, &error_str);

        if (error_code) {
                /* Authentication failed */

                /* For backwards compatibility translate the
                 * new broker-side auth error code to our local error code. */
                if (error_code == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED)
                        err = RD_KAFKA_RESP_ERR__AUTHENTICATION;
                else
                        err = error_code;

                /* Use the broker-provided error string as failure reason. */
                rd_snprintf(errstr, sizeof(errstr), "%.*s",
                            RD_KAFKAP_STR_PR(&error_str));
                goto err;
        }

        rd_kafka_buf_read_bytes(rkbuf, &auth_data);

        /* Pass SASL auth frame to SASL handler */
        if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data,
                               (size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr,
                               sizeof(errstr)) == -1) {
                err = RD_KAFKA_RESP_ERR__AUTHENTICATION;
                goto err;
        }

        /* Success: the SASL handler drives any remaining handshake steps. */
        return;


err_parse:
        /* Jump target for the rd_kafka_buf_read_*() macros above on a
         * malformed response. */
        err = rkbuf->rkbuf_err;
        rd_snprintf(errstr, sizeof(errstr),
                    "SaslAuthenticateResponse parsing failed: %s",
                    rd_kafka_err2str(err));

err:
        /* Any error at this stage is terminal for the broker connection. */
        rd_kafka_broker_fail(rkb, LOG_ERR, err, "SASL authentication error: %s",
                             errstr);
}
+
+
+/**
+ * @brief Send SaslAuthenticateRequest (KIP-152)
+ */
+void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
+ const void *buf,
+ size_t size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0);
+
+ /* Should be sent before any other requests since it is part of
+ * the initial connection handshake. */
+ rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
+
+ /* Broker does not support -1 (Null) for this field */
+ rd_kafka_buf_write_bytes(rkbuf, buf ? buf : "", size);
+
+ /* There are no errors that can be retried, instead
+ * close down the connection and reconnect on failure. */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ if (replyq.q)
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
+ opaque);
+ else /* in broker thread */
+ rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+
+/**
+ * @struct Hold temporary result and return values from ProduceResponse
+ */
struct rd_kafka_Produce_result {
        int64_t offset;    /**< Assigned offset of first message */
        int64_t timestamp; /**< (Possibly assigned) timestamp of
                            *   first message */
};
+
+/**
+ * @brief Parses a Produce reply.
+ * @returns 0 on success or an error code on failure.
+ * @locality broker thread
+ */
static rd_kafka_resp_err_t
rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
                              rd_kafka_toppar_t *rktp,
                              rd_kafka_buf_t *rkbuf,
                              rd_kafka_buf_t *request,
                              struct rd_kafka_Produce_result *result) {
        int32_t TopicArrayCnt;
        int32_t PartitionArrayCnt;
        struct {
                int32_t Partition;
                int16_t ErrorCode;
                int64_t Offset;
        } hdr; /* partition response header */
        const int log_decode_errors = LOG_ERR;
        int64_t log_start_offset = -1;

        /* Each ProduceRequest targets exactly one topic+partition,
         * so the response must contain exactly one of each. */
        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        if (TopicArrayCnt != 1)
                goto err;

        /* Since we only produce to one single topic+partition in each
         * request we assume that the reply only contains one topic+partition
         * and that it is the same that we requested.
         * If not the broker is buggy. */
        rd_kafka_buf_skip_str(rkbuf);
        rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);

        if (PartitionArrayCnt != 1)
                goto err;

        rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
        rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
        rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);

        result->offset = hdr.Offset;

        /* LogAppendTime (v2+), -1 if CreateTime is used. */
        result->timestamp = -1;
        if (request->rkbuf_reqhdr.ApiVersion >= 2)
                rd_kafka_buf_read_i64(rkbuf, &result->timestamp);

        /* LogStartOffset (v5+), currently only read, not used. */
        if (request->rkbuf_reqhdr.ApiVersion >= 5)
                rd_kafka_buf_read_i64(rkbuf, &log_start_offset);

        if (request->rkbuf_reqhdr.ApiVersion >= 1) {
                int32_t Throttle_Time;
                rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);

                /* Propagate broker throttling to the application. */
                rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
                                          Throttle_Time);
        }


        return hdr.ErrorCode;

err_parse:
        /* Malformed response (rd_kafka_buf_read_*() macros jump here). */
        return rkbuf->rkbuf_err;
err:
        /* Unexpected topic/partition counts: broker is misbehaving. */
        return RD_KAFKA_RESP_ERR__BAD_MSG;
}
+
+
+/**
+ * @struct Hold temporary Produce error state
+ */
struct rd_kafka_Produce_err {
        rd_kafka_resp_err_t err; /**< Error code */
        int actions;             /**< Actions to take */
        int incr_retry;          /**< Increase per-message retry cnt */
        rd_kafka_msg_status_t status; /**< Messages persistence status */

        /* Idempotent Producer */
        int32_t next_ack_seq;      /**< Next expected sequence to ack */
        int32_t next_err_seq;      /**< Next expected error sequence */
        rd_bool_t update_next_ack; /**< Update next_ack_seq */
        rd_bool_t update_next_err; /**< Update next_err_seq */
        rd_kafka_pid_t rktp_pid;   /**< Partition's current PID */
        int32_t last_seq;          /**< Last sequence in current batch */
};
+
+
+/**
+ * @brief Error-handling for Idempotent Producer-specific Produce errors.
+ *
+ * May update \p errp, \p actionsp and \p incr_retryp.
+ *
+ * The resulting \p actionsp are handled by the caller.
+ *
+ * @warning May be called on the old leader thread. Lock rktp appropriately!
+ *
+ * @locality broker thread (but not necessarily the leader broker)
+ * @locks none
+ */
static void
rd_kafka_handle_idempotent_Produce_error(rd_kafka_broker_t *rkb,
                                         rd_kafka_msgbatch_t *batch,
                                         struct rd_kafka_Produce_err *perr) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_toppar_t *rktp = batch->rktp;
        rd_kafka_msg_t *firstmsg, *lastmsg;
        int r;
        rd_ts_t now = rd_clock(), state_age;
        struct rd_kafka_toppar_err last_err;

        /* Snapshot the idempotence state age for diagnostics below. */
        rd_kafka_rdlock(rkb->rkb_rk);
        state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state;
        rd_kafka_rdunlock(rkb->rkb_rk);

        firstmsg = rd_kafka_msgq_first(&batch->msgq);
        lastmsg = rd_kafka_msgq_last(&batch->msgq);
        rd_assert(firstmsg && lastmsg);

        /* Store the last msgid of the batch
         * on the first message in case we need to retry
         * and thus reconstruct the entire batch. */
        if (firstmsg->rkm_u.producer.last_msgid) {
                /* last_msgid already set, make sure it
                 * actually points to the last message. */
                rd_assert(firstmsg->rkm_u.producer.last_msgid ==
                          lastmsg->rkm_u.producer.msgid);
        } else {
                firstmsg->rkm_u.producer.last_msgid =
                    lastmsg->rkm_u.producer.msgid;
        }

        if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) {
                /* Don't retry if PID changed since we can't
                 * guarantee correctness across PID sessions. */
                perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;

                rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "ERRPID",
                           "%.*s [%" PRId32
                           "] PID mismatch: "
                           "request %s != partition %s: "
                           "failing messages with error %s",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition, rd_kafka_pid2str(batch->pid),
                           rd_kafka_pid2str(perr->rktp_pid),
                           rd_kafka_err2str(perr->err));
                return;
        }

        /*
         * Special error handling
         */
        switch (perr->err) {
        case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER:
                /* Compare request's sequence to expected next
                 * acked sequence.
                 *
                 * Example requests in flight:
                 *   R1(base_seq:5) R2(10) R3(15) R4(20)
                 */

                /* Acquire the last partition error to help
                 * troubleshoot this problem. */
                rd_kafka_toppar_lock(rktp);
                last_err = rktp->rktp_last_err;
                rd_kafka_toppar_unlock(rktp);

                /* r == 0: head-of-line request failed,
                 * r >  0: a later in-flight request failed,
                 * r <  0: sequence rewound (should not happen). */
                r = batch->first_seq - perr->next_ack_seq;

                if (r == 0) {
                        /* R1 failed:
                         * If this was the head-of-line request in-flight it
                         * means there is a state desynchronization between the
                         * producer and broker (a bug), in which case
                         * we'll raise a fatal error since we can no longer
                         * reason about the state of messages and thus
                         * not guarantee ordering or once-ness for R1,
                         * nor give the user a chance to opt out of sending
                         * R2 to R4 which would be retried automatically. */

                        rd_kafka_idemp_set_fatal_error(
                            rk, perr->err,
                            "ProduceRequest for %.*s [%" PRId32
                            "] "
                            "with %d message(s) failed "
                            "due to sequence desynchronization with "
                            "broker %" PRId32 " (%s, base seq %" PRId32
                            ", "
                            "idemp state change %" PRId64
                            "ms ago, "
                            "last partition error %s (actions %s, "
                            "base seq %" PRId32 "..%" PRId32
                            ", base msgid %" PRIu64 ", %" PRId64 "ms ago)",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
                            rd_kafka_pid2str(batch->pid), batch->first_seq,
                            state_age / 1000, rd_kafka_err2name(last_err.err),
                            rd_kafka_actions2str(last_err.actions),
                            last_err.base_seq, last_err.last_seq,
                            last_err.base_msgid,
                            last_err.ts ? (now - last_err.ts) / 1000 : -1);

                        perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                        perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;

                } else if (r > 0) {
                        /* R2 failed:
                         * With max.in.flight > 1 we can have a situation
                         * where the first request in-flight (R1) to the broker
                         * fails, which causes the sub-sequent requests
                         * that are in-flight to have a non-sequential
                         * sequence number and thus fail.
                         * But these sub-sequent requests (R2 to R4) are not at
                         * the risk of being duplicated so we bump the epoch and
                         * re-enqueue the messages for later retry
                         * (without incrementing retries).
                         */
                        rd_rkb_dbg(
                            rkb, MSG | RD_KAFKA_DBG_EOS, "ERRSEQ",
                            "ProduceRequest for %.*s [%" PRId32
                            "] "
                            "with %d message(s) failed "
                            "due to skipped sequence numbers "
                            "(%s, base seq %" PRId32
                            " > "
                            "next seq %" PRId32
                            ") "
                            "caused by previous failed request "
                            "(%s, actions %s, "
                            "base seq %" PRId32 "..%" PRId32
                            ", base msgid %" PRIu64 ", %" PRId64
                            "ms ago): "
                            "recovering and retrying",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_msgq_len(&batch->msgq),
                            rd_kafka_pid2str(batch->pid), batch->first_seq,
                            perr->next_ack_seq, rd_kafka_err2name(last_err.err),
                            rd_kafka_actions2str(last_err.actions),
                            last_err.base_seq, last_err.last_seq,
                            last_err.base_msgid,
                            last_err.ts ? (now - last_err.ts) / 1000 : -1);

                        perr->incr_retry = 0;
                        perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
                        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;

                        rd_kafka_idemp_drain_epoch_bump(
                            rk, perr->err, "skipped sequence numbers");

                } else {
                        /* Request's sequence is less than next ack,
                         * this should never happen unless we have
                         * local bug or the broker did not respond
                         * to the requests in order. */
                        rd_kafka_idemp_set_fatal_error(
                            rk, perr->err,
                            "ProduceRequest for %.*s [%" PRId32
                            "] "
                            "with %d message(s) failed "
                            "with rewound sequence number on "
                            "broker %" PRId32
                            " (%s, "
                            "base seq %" PRId32 " < next seq %" PRId32
                            "): "
                            "last error %s (actions %s, "
                            "base seq %" PRId32 "..%" PRId32
                            ", base msgid %" PRIu64 ", %" PRId64 "ms ago)",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
                            rd_kafka_pid2str(batch->pid), batch->first_seq,
                            perr->next_ack_seq, rd_kafka_err2name(last_err.err),
                            rd_kafka_actions2str(last_err.actions),
                            last_err.base_seq, last_err.last_seq,
                            last_err.base_msgid,
                            last_err.ts ? (now - last_err.ts) / 1000 : -1);

                        perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                        perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_false;
                }
                break;

        case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER:
                /* This error indicates that we successfully produced
                 * this set of messages before but this (supposed) retry failed.
                 *
                 * Treat as success, however offset and timestamp
                 * will be invalid. */

                /* Future improvement/FIXME:
                 * But first make sure the first message has actually
                 * been retried, getting this error for a non-retried message
                 * indicates a synchronization issue or bug. */
                rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "DUPSEQ",
                           "ProduceRequest for %.*s [%" PRId32
                           "] "
                           "with %d message(s) failed "
                           "due to duplicate sequence number: "
                           "previous send succeeded but was not acknowledged "
                           "(%s, base seq %" PRId32
                           "): "
                           "marking the messages successfully delivered",
                           RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                           rktp->rktp_partition,
                           rd_kafka_msgq_len(&batch->msgq),
                           rd_kafka_pid2str(batch->pid), batch->first_seq);

                /* Void error, delivery succeeded */
                perr->err = RD_KAFKA_RESP_ERR_NO_ERROR;
                perr->actions = 0;
                perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;
                perr->update_next_ack = rd_true;
                perr->update_next_err = rd_true;
                break;

        case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID:
                /* The broker/cluster lost track of our PID because
                 * the last message we produced has now been deleted
                 * (by DeleteRecords, compaction, or topic retention policy).
                 *
                 * If all previous messages are accounted for and this is not
                 * a retry we can simply bump the epoch and reset the sequence
                 * number and then retry the message(s) again.
                 *
                 * If there are outstanding messages not yet acknowledged
                 * then there is no safe way to carry on without risking
                 * duplication or reordering, in which case we fail
                 * the producer.
                 *
                 * In case of the transactional producer and a transaction
                 * coordinator that supports KIP-360 (>= AK 2.5, checked from
                 * the txnmgr, not here) we'll raise an abortable error and
                 * flag that the epoch needs to be bumped on the coordinator. */
                if (rd_kafka_is_transactional(rk)) {
                        rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID",
                                   "ProduceRequest for %.*s [%" PRId32
                                   "] "
                                   "with %d message(s) failed "
                                   "due to unknown producer id "
                                   "(%s, base seq %" PRId32
                                   ", %d retries): "
                                   "failing the current transaction",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   rd_kafka_msgq_len(&batch->msgq),
                                   rd_kafka_pid2str(batch->pid),
                                   batch->first_seq,
                                   firstmsg->rkm_u.producer.retries);

                        /* Drain outstanding requests and bump epoch. */
                        rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
                                                        "unknown producer id");

                        rd_kafka_txn_set_abortable_error_with_bump(
                            rk, RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                            "ProduceRequest for %.*s [%" PRId32
                            "] "
                            "with %d message(s) failed "
                            "due to unknown producer id",
                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                            rktp->rktp_partition,
                            rd_kafka_msgq_len(&batch->msgq));

                        perr->incr_retry = 0;
                        perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;
                        break;

                } else if (!firstmsg->rkm_u.producer.retries &&
                           perr->next_err_seq == batch->first_seq) {
                        /* Not a retry and all previous messages accounted
                         * for: safe to bump epoch and retry. */
                        rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID",
                                   "ProduceRequest for %.*s [%" PRId32
                                   "] "
                                   "with %d message(s) failed "
                                   "due to unknown producer id "
                                   "(%s, base seq %" PRId32
                                   ", %d retries): "
                                   "no risk of duplication/reordering: "
                                   "resetting PID and retrying",
                                   RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                                   rktp->rktp_partition,
                                   rd_kafka_msgq_len(&batch->msgq),
                                   rd_kafka_pid2str(batch->pid),
                                   batch->first_seq,
                                   firstmsg->rkm_u.producer.retries);

                        /* Drain outstanding requests and bump epoch. */
                        rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
                                                        "unknown producer id");

                        perr->incr_retry = 0;
                        perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
                        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
                        perr->update_next_ack = rd_false;
                        perr->update_next_err = rd_true;
                        break;
                }

                /* Outstanding unacknowledged messages: cannot retry
                 * safely, raise a fatal error. */
                rd_kafka_idemp_set_fatal_error(
                    rk, perr->err,
                    "ProduceRequest for %.*s [%" PRId32
                    "] "
                    "with %d message(s) failed "
                    "due to unknown producer id ("
                    "broker %" PRId32 " %s, base seq %" PRId32
                    ", %d retries): "
                    "unable to retry without risking "
                    "duplication/reordering",
                    RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                    rktp->rktp_partition, rd_kafka_msgq_len(&batch->msgq),
                    rkb->rkb_nodeid, rd_kafka_pid2str(batch->pid),
                    batch->first_seq, firstmsg->rkm_u.producer.retries);

                perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
                perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
                perr->update_next_ack = rd_false;
                perr->update_next_err = rd_true;
                break;

        default:
                /* All other errors are handled in the standard
                 * error Produce handler, which will set
                 * update_next_ack|err accordingly. */
                break;
        }
}
+
+
+
+/**
+ * @brief Error-handling for failed ProduceRequests
+ *
+ * @param errp Is the input and output error, it may be changed
+ * by this function.
+ *
+ * @returns 0 if no further processing of the request should be performed,
+ * such as triggering delivery reports, else 1.
+ *
+ * @warning May be called on the old leader thread. Lock rktp appropriately!
+ *
+ * @warning \p request may be NULL.
+ *
+ * @locality broker thread (but not necessarily the leader broker)
+ * @locks none
+ */
+static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
+ const rd_kafka_buf_t *request,
+ rd_kafka_msgbatch_t *batch,
+ struct rd_kafka_Produce_err *perr) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_toppar_t *rktp = batch->rktp;
+ int is_leader;
+
+ if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY))
+ return 0; /* Terminating */
+
+ /* When there is a partition leader change any outstanding
+ * requests to the old broker will be handled by the old
+ * broker thread when the responses are received/timeout:
+ * in this case we need to be careful with locking:
+ * check once if we're the leader (which allows relaxed
+ * locking), and cache the current rktp's eos state vars. */
+ rd_kafka_toppar_lock(rktp);
+ is_leader = rktp->rktp_broker == rkb;
+ perr->rktp_pid = rktp->rktp_eos.pid;
+ perr->next_ack_seq = rktp->rktp_eos.next_ack_seq;
+ perr->next_err_seq = rktp->rktp_eos.next_err_seq;
+ rd_kafka_toppar_unlock(rktp);
+
+ /* All failures are initially treated as if the message
+ * was not persisted, but the status may be changed later
+ * for specific errors and actions. */
+ perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+
+ /* Set actions for known errors (may be overriden later),
+ * all other errors are considered permanent failures.
+ * (also see rd_kafka_err_action() for the default actions). */
+ perr->actions = rd_kafka_err_action(
+ rkb, perr->err, request,
+
+ RD_KAFKA_ERR_ACTION_REFRESH |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR__TRANSPORT,
+
+ RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
+
+ RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
+ RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
+
+ RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
+
+ RD_KAFKA_ERR_ACTION_RETRY |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
+
+ RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
+
+ RD_KAFKA_ERR_ACTION_RETRY |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR__TIMED_OUT,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
+
+ /* All Idempotent Producer-specific errors are
+ * initially set as permanent errors,
+ * special handling may change the actions. */
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
+ RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT |
+ RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
+ RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
+
+ /* Message was purged from out-queue due to
+ * Idempotent Producer Id change */
+ RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__RETRY,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ rd_rkb_dbg(rkb, MSG, "MSGSET",
+ "%s [%" PRId32
+ "]: MessageSet with %i message(s) "
+ "(MsgId %" PRIu64 ", BaseSeq %" PRId32
+ ") "
+ "encountered error: %s (actions %s)%s",
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq), batch->first_msgid,
+ batch->first_seq, rd_kafka_err2str(perr->err),
+ rd_kafka_actions2str(perr->actions),
+ is_leader ? "" : " [NOT LEADER]");
+
+
+ /*
+ * Special handling for Idempotent Producer
+ *
+ * Note: Idempotent Producer-specific errors received
+ * on a non-idempotent producer will be passed through
+ * directly to the application.
+ */
+ if (rd_kafka_is_idempotent(rk))
+ rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr);
+
+ /* Update message persistence status based on action flags.
+ * None of these are typically set after an idempotent error,
+ * which sets the status explicitly. */
+ if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED)
+ perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
+ else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED)
+ perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+ else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED)
+ perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+ /* Save the last error for debugging sub-sequent errors,
+ * useful for Idempotent Producer throubleshooting. */
+ rd_kafka_toppar_lock(rktp);
+ rktp->rktp_last_err.err = perr->err;
+ rktp->rktp_last_err.actions = perr->actions;
+ rktp->rktp_last_err.ts = rd_clock();
+ rktp->rktp_last_err.base_seq = batch->first_seq;
+ rktp->rktp_last_err.last_seq = perr->last_seq;
+ rktp->rktp_last_err.base_msgid = batch->first_msgid;
+ rd_kafka_toppar_unlock(rktp);
+
+ /*
+ * Handle actions
+ */
+ if (perr->actions &
+ (RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY)) {
+ /* Retry (refresh also implies retry) */
+
+ if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Request metadata information update.
+ * These errors imply that we have stale
+ * information and the request was
+ * either rejected or not sent -
+ * we don't need to increment the retry count
+ * when we perform a retry since:
+ * - it is a temporary error (hopefully)
+ * - there is no chance of duplicate delivery
+ */
+ rd_kafka_toppar_leader_unavailable(rktp, "produce",
+ perr->err);
+
+ /* We can't be certain the request wasn't
+ * sent in case of transport failure,
+ * so the ERR__TRANSPORT case will need
+ * the retry count to be increased,
+ * In case of certain other errors we want to
+ * avoid retrying for the duration of the
+ * message.timeout.ms to speed up error propagation. */
+ if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT &&
+ perr->err != RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR)
+ perr->incr_retry = 0;
+ }
+
+ /* If message timed out in queue, not in transit,
+ * we will retry at a later time but not increment
+ * the retry count since there is no risk
+ * of duplicates. */
+ if (!rd_kafka_buf_was_sent(request))
+ perr->incr_retry = 0;
+
+ if (!perr->incr_retry) {
+ /* If retries are not to be incremented then
+ * there is no chance of duplicates on retry, which
+ * means these messages were not persisted. */
+ perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+ }
+
+ if (rd_kafka_is_idempotent(rk)) {
+ /* Any currently in-flight requests will
+ * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+ * which should not be treated as a fatal error
+ * since this request and sub-sequent requests
+ * will be retried and thus return to order.
+ * Unless the error was a timeout, or similar,
+ * in which case the request might have made it
+ * and the messages are considered possibly persisted:
+ * in this case we allow the next in-flight response
+ * to be successful, in which case we mark
                         * this request's messages as successfully delivered. */
+ if (perr->status &
+ RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
+ perr->update_next_ack = rd_true;
+ else
+ perr->update_next_ack = rd_false;
+ perr->update_next_err = rd_true;
+
+ /* Drain outstanding requests so that retries
+ * are attempted with proper state knowledge and
+ * without any in-flight requests. */
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_idemp_drain_toppar(rktp,
+ "drain before retrying");
+ rd_kafka_toppar_unlock(rktp);
+ }
+
+ /* Since requests are specific to a broker
+ * we move the retryable messages from the request
+ * back to the partition queue (prepend) and then
+ * let the new broker construct a new request.
                 * While doing this we also make sure the retry count
                 * for each message is honoured: any message that
                 * would exceed the retry count will not be
                 * moved but will instead fail below. */
+ rd_kafka_toppar_retry_msgq(rktp, &batch->msgq, perr->incr_retry,
+ perr->status);
+
+ if (rd_kafka_msgq_len(&batch->msgq) == 0) {
+ /* No need do anything more with the request
+ * here since the request no longer has any
+ * messages associated with it. */
+ return 0;
+ }
+ }
+
+ if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT &&
+ rd_kafka_is_idempotent(rk)) {
+ if (rd_kafka_is_transactional(rk) &&
+ perr->err == RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) {
+ /* Producer was fenced by new transactional producer
+ * with the same transactional.id */
+ rd_kafka_txn_set_fatal_error(
+ rk, RD_DO_LOCK, RD_KAFKA_RESP_ERR__FENCED,
+ "ProduceRequest for %.*s [%" PRId32
+ "] "
+ "with %d message(s) failed: %s "
+ "(broker %" PRId32 " %s, base seq %" PRId32
+ "): "
+ "transactional producer fenced by newer "
+ "producer instance",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq),
+ rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
+ rd_kafka_pid2str(batch->pid), batch->first_seq);
+
+ /* Drain outstanding requests and reset PID. */
+ rd_kafka_idemp_drain_reset(
+ rk, "fenced by new transactional producer");
+
+ } else if (rd_kafka_is_transactional(rk)) {
+ /* When transactional any permanent produce failure
+ * would lead to an incomplete transaction, so raise
+ * an abortable transaction error. */
+ rd_kafka_txn_set_abortable_error(
+ rk, perr->err,
+ "ProduceRequest for %.*s [%" PRId32
+ "] "
+ "with %d message(s) failed: %s "
+ "(broker %" PRId32 " %s, base seq %" PRId32
+ "): "
+ "current transaction must be aborted",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq),
+ rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
+ rd_kafka_pid2str(batch->pid), batch->first_seq);
+
+ } else if (rk->rk_conf.eos.gapless) {
+ /* A permanent non-idempotent error will lead to
+ * gaps in the message series, the next request
+ * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER.
+ * To satisfy the gapless guarantee we need to raise
+ * a fatal error here. */
+ rd_kafka_idemp_set_fatal_error(
+ rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
+ "ProduceRequest for %.*s [%" PRId32
+ "] "
+ "with %d message(s) failed: "
+ "%s (broker %" PRId32 " %s, base seq %" PRId32
+ "): "
+ "unable to satisfy gap-less guarantee",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq),
+ rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
+ rd_kafka_pid2str(batch->pid), batch->first_seq);
+
+ /* Drain outstanding requests and reset PID. */
+ rd_kafka_idemp_drain_reset(
+ rk, "unable to satisfy gap-less guarantee");
+
+ } else {
+ /* If gapless is not set we bump the Epoch and
+ * renumber the messages to send. */
+
+ /* Drain outstanding requests and bump the epoch .*/
+ rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
+ "message sequence gap");
+ }
+
+ perr->update_next_ack = rd_false;
+ /* Make sure the next error will not raise a fatal error. */
+ perr->update_next_err = rd_true;
+ }
+
+ if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT ||
+ perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) {
+ /* Translate request-level timeout error code
+ * to message-level timeout error code. */
+ perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
+
+ } else if (perr->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) {
+ /* If we're no longer authorized to access the topic mark
+ * it as errored to deny further produce requests. */
+ rd_kafka_topic_wrlock(rktp->rktp_rkt);
+ rd_kafka_topic_set_error(rktp->rktp_rkt, perr->err);
+ rd_kafka_topic_wrunlock(rktp->rktp_rkt);
+ }
+
+ return 1;
+}
+
+/**
+ * @brief Handle ProduceResponse success for idempotent producer
+ *
+ * @warning May be called on the old leader thread. Lock rktp appropriately!
+ *
+ * @locks none
+ * @locality broker thread (but not necessarily the leader broker thread)
+ */
+static void
+rd_kafka_handle_idempotent_Produce_success(rd_kafka_broker_t *rkb,
+ rd_kafka_msgbatch_t *batch,
+ int32_t next_seq) {
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_toppar_t *rktp = batch->rktp;
+ char fatal_err[512];
+ uint64_t first_msgid, last_msgid;
+
+ *fatal_err = '\0';
+
+ first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid;
+ last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid;
+
+ rd_kafka_toppar_lock(rktp);
+
+ /* If the last acked msgid is higher than
+ * the next message to (re)transmit in the message queue
+ * it means a previous series of R1,R2 ProduceRequests
+ * had R1 fail with uncertain persistence status,
+ * such as timeout or transport error, but R2 succeeded,
+ * which means the messages in R1 were in fact persisted.
+ * In this case trigger delivery reports for all messages
+ * in queue until we hit a non-acked message msgid. */
+ if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) {
+ rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid);
+
+ } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq &&
+ batch->first_seq == rktp->rktp_eos.next_err_seq)) {
+ /* Response ordering is typically not a concern
+ * (but will not happen with current broker versions),
+ * unless we're expecting an error to be returned at
+ * this sequence rather than a success ack, in which
+ * case raise a fatal error. */
+
+ /* Can't call set_fatal_error() while
+ * holding the toppar lock, so construct
+ * the error string here and call
+ * set_fatal_error() below after
+ * toppar lock has been released. */
+ rd_snprintf(fatal_err, sizeof(fatal_err),
+ "ProduceRequest for %.*s [%" PRId32
+ "] "
+ "with %d message(s) "
+ "succeeded when expecting failure "
+ "(broker %" PRId32
+ " %s, "
+ "base seq %" PRId32
+ ", "
+ "next ack seq %" PRId32
+ ", "
+ "next err seq %" PRId32
+ ": "
+ "unable to retry without risking "
+ "duplication/reordering",
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
+ rd_kafka_pid2str(batch->pid), batch->first_seq,
+ rktp->rktp_eos.next_ack_seq,
+ rktp->rktp_eos.next_err_seq);
+
+ rktp->rktp_eos.next_err_seq = next_seq;
+ }
+
+ if (likely(!*fatal_err)) {
+ /* Advance next expected err and/or ack sequence */
+
+ /* Only step err seq if it hasn't diverged. */
+ if (rktp->rktp_eos.next_err_seq == rktp->rktp_eos.next_ack_seq)
+ rktp->rktp_eos.next_err_seq = next_seq;
+
+ rktp->rktp_eos.next_ack_seq = next_seq;
+ }
+
+ /* Store the last acked message sequence,
+ * since retries within the broker cache window (5 requests)
+ * will succeed for older messages we must only update the
+ * acked msgid if it is higher than the last acked. */
+ if (last_msgid > rktp->rktp_eos.acked_msgid)
+ rktp->rktp_eos.acked_msgid = last_msgid;
+
+ rd_kafka_toppar_unlock(rktp);
+
+ /* Must call set_fatal_error() after releasing
+ * the toppar lock. */
+ if (unlikely(*fatal_err))
+ rd_kafka_idemp_set_fatal_error(
+ rk, RD_KAFKA_RESP_ERR__INCONSISTENT, "%s", fatal_err);
+}
+
+
+/**
+ * @brief Handle ProduceRequest result for a message batch.
+ *
+ * @warning \p request may be NULL.
+ *
+ * @localiy broker thread (but not necessarily the toppar's handler thread)
+ * @locks none
+ */
+static void rd_kafka_msgbatch_handle_Produce_result(
+ rd_kafka_broker_t *rkb,
+ rd_kafka_msgbatch_t *batch,
+ rd_kafka_resp_err_t err,
+ const struct rd_kafka_Produce_result *presult,
+ const rd_kafka_buf_t *request) {
+
+ rd_kafka_t *rk = rkb->rkb_rk;
+ rd_kafka_toppar_t *rktp = batch->rktp;
+ rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
+ rd_bool_t last_inflight;
+ int32_t next_seq;
+
+ /* Decrease partition's messages in-flight counter */
+ rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >=
+ rd_kafka_msgq_len(&batch->msgq));
+ last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight,
+ rd_kafka_msgq_len(&batch->msgq));
+
+ /* Next expected sequence (and handle wrap) */
+ next_seq = rd_kafka_seq_wrap(batch->first_seq +
+ rd_kafka_msgq_len(&batch->msgq));
+
+ if (likely(!err)) {
+ rd_rkb_dbg(rkb, MSG, "MSGSET",
+ "%s [%" PRId32
+ "]: MessageSet with %i message(s) "
+ "(MsgId %" PRIu64 ", BaseSeq %" PRId32 ") delivered",
+ rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
+ rd_kafka_msgq_len(&batch->msgq), batch->first_msgid,
+ batch->first_seq);
+
+ if (rktp->rktp_rkt->rkt_conf.required_acks != 0)
+ status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+ if (rd_kafka_is_idempotent(rk))
+ rd_kafka_handle_idempotent_Produce_success(rkb, batch,
+ next_seq);
+ } else {
+ /* Error handling */
+ struct rd_kafka_Produce_err perr = {
+ .err = err,
+ .incr_retry = 1,
+ .status = status,
+ .update_next_ack = rd_true,
+ .update_next_err = rd_true,
+ .last_seq = (batch->first_seq +
+ rd_kafka_msgq_len(&batch->msgq) - 1)};
+
+ rd_kafka_handle_Produce_error(rkb, request, batch, &perr);
+
+ /* Update next expected acked and/or err sequence. */
+ if (perr.update_next_ack || perr.update_next_err) {
+ rd_kafka_toppar_lock(rktp);
+ if (perr.update_next_ack)
+ rktp->rktp_eos.next_ack_seq = next_seq;
+ if (perr.update_next_err)
+ rktp->rktp_eos.next_err_seq = next_seq;
+ rd_kafka_toppar_unlock(rktp);
+ }
+
+ err = perr.err;
+ status = perr.status;
+ }
+
+
+ /* Messages to retry will have been removed from the request's queue */
+ if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) {
+ /* Set offset, timestamp and status for each message. */
+ rd_kafka_msgq_set_metadata(&batch->msgq, rkb->rkb_nodeid,
+ presult->offset, presult->timestamp,
+ status);
+
+ /* Enqueue messages for delivery report. */
+ rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err);
+ }
+
+ if (rd_kafka_is_idempotent(rk) && last_inflight)
+ rd_kafka_idemp_inflight_toppar_sub(rk, rktp);
+}
+
+
+/**
+ * @brief Handle ProduceResponse
+ *
+ * @param reply is NULL when `acks=0` and on various local errors.
+ *
+ * @remark ProduceRequests are never retried, retriable errors are
+ * instead handled by re-enqueuing the request's messages back
+ * on the partition queue to have a new ProduceRequest constructed
+ * eventually.
+ *
+ * @warning May be called on the old leader thread. Lock rktp appropriately!
+ *
+ * @locality broker thread (but not necessarily the leader broker thread)
+ */
+static void rd_kafka_handle_Produce(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_msgbatch_t *batch = &request->rkbuf_batch;
+ rd_kafka_toppar_t *rktp = batch->rktp;
+ struct rd_kafka_Produce_result result = {
+ .offset = RD_KAFKA_OFFSET_INVALID, .timestamp = -1};
+
+ /* Unit test interface: inject errors */
+ if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) {
+ err = rk->rk_conf.ut.handle_ProduceResponse(
+ rkb->rkb_rk, rkb->rkb_nodeid, batch->first_msgid, err);
+ }
+
+ /* Parse Produce reply (unless the request errored) */
+ if (!err && reply)
+ err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request,
+ &result);
+
+ rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, &result,
+ request);
+}
+
+
+/**
+ * @brief Send ProduceRequest for messages in toppar queue.
+ *
+ * @returns the number of messages included, or 0 on error / no messages.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb,
+ rd_kafka_toppar_t *rktp,
+ const rd_kafka_pid_t pid,
+ uint64_t epoch_base_msgid) {
+ rd_kafka_buf_t *rkbuf;
+ rd_kafka_topic_t *rkt = rktp->rktp_rkt;
+ size_t MessageSetSize = 0;
+ int cnt;
+ rd_ts_t now;
+ int64_t first_msg_timeout;
+ int tmout;
+
+ /**
+ * Create ProduceRequest with as many messages from the toppar
+ * transmit queue as possible.
+ */
+ rkbuf = rd_kafka_msgset_create_ProduceRequest(
+ rkb, rktp, &rktp->rktp_xmit_msgq, pid, epoch_base_msgid,
+ &MessageSetSize);
+ if (unlikely(!rkbuf))
+ return 0;
+
+ cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq);
+ rd_dassert(cnt > 0);
+
+ rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt);
+ rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize);
+
+ if (!rkt->rkt_conf.required_acks)
+ rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
+
+ /* Use timeout from first message in batch */
+ now = rd_clock();
+ first_msg_timeout =
+ (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)->rkm_ts_timeout -
+ now) /
+ 1000;
+
+ if (unlikely(first_msg_timeout <= 0)) {
+ /* Message has already timed out, allow 100 ms
+ * to produce anyway */
+ tmout = 100;
+ } else {
+ tmout = (int)RD_MIN(INT_MAX, first_msg_timeout);
+ }
+
+ /* Set absolute timeout (including retries), the
+ * effective timeout for this specific request will be
+ * capped by socket.timeout.ms */
+ rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ,
+ rd_kafka_handle_Produce, NULL);
+
+ return cnt;
+}
+
+
+/**
+ * @brief Construct and send CreateTopicsRequest to \p rkb
+ * with the topics (NewTopic_t*) in \p new_topics, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *new_topics /*(NewTopic_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ int i = 0;
+ rd_kafka_NewTopic_t *newt;
+ int op_timeout;
+
+ if (rd_list_cnt(new_topics) == 0) {
+ rd_snprintf(errstr, errstr_size, "No topics to create");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_CreateTopics, 0, 4, &features);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Topic Admin API (KIP-4) not supported "
+ "by broker, requires broker version >= 0.10.2.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ if (rd_kafka_confval_get_int(&options->validate_only) &&
+ ApiVersion < 1) {
+ rd_snprintf(errstr, errstr_size,
+ "CreateTopics.validate_only=true not "
+ "supported by broker");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1,
+ 4 + (rd_list_cnt(new_topics) * 200) +
+ 4 + 1);
+
+ /* #topics */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics));
+
+ while ((newt = rd_list_elem(new_topics, i++))) {
+ int partition;
+ int ei = 0;
+ const rd_kafka_ConfigEntry_t *entry;
+
+ if (ApiVersion < 4) {
+ if (newt->num_partitions == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Default partition count (KIP-464) "
+ "not supported by broker, "
+ "requires broker version <= 2.4.0");
+ rd_kafka_replyq_destroy(&replyq);
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ if (newt->replication_factor == -1 &&
+ rd_list_empty(&newt->replicas)) {
+ rd_snprintf(errstr, errstr_size,
+ "Default replication factor "
+ "(KIP-464) "
+ "not supported by broker, "
+ "requires broker version <= 2.4.0");
+ rd_kafka_replyq_destroy(&replyq);
+ rd_kafka_buf_destroy(rkbuf);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+
+ /* topic */
+ rd_kafka_buf_write_str(rkbuf, newt->topic, -1);
+
+ if (rd_list_cnt(&newt->replicas)) {
+ /* num_partitions and replication_factor must be
+ * set to -1 if a replica assignment is sent. */
+ /* num_partitions */
+ rd_kafka_buf_write_i32(rkbuf, -1);
+ /* replication_factor */
+ rd_kafka_buf_write_i16(rkbuf, -1);
+ } else {
+ /* num_partitions */
+ rd_kafka_buf_write_i32(rkbuf, newt->num_partitions);
+ /* replication_factor */
+ rd_kafka_buf_write_i16(
+ rkbuf, (int16_t)newt->replication_factor);
+ }
+
+ /* #replica_assignment */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas));
+
+ /* Replicas per partition, see rdkafka_admin.[ch]
+ * for how these are constructed. */
+ for (partition = 0; partition < rd_list_cnt(&newt->replicas);
+ partition++) {
+ const rd_list_t *replicas;
+ int ri = 0;
+
+ replicas = rd_list_elem(&newt->replicas, partition);
+ if (!replicas)
+ continue;
+
+ /* partition */
+ rd_kafka_buf_write_i32(rkbuf, partition);
+ /* #replicas */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas));
+
+ for (ri = 0; ri < rd_list_cnt(replicas); ri++) {
+ /* replica */
+ rd_kafka_buf_write_i32(
+ rkbuf, rd_list_get_int32(replicas, ri));
+ }
+ }
+
+ /* #config_entries */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config));
+
+ RD_LIST_FOREACH(entry, &newt->config, ei) {
+ /* config_name */
+ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
+ /* config_value (nullable) */
+ rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
+ }
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ rd_kafka_buf_write_i32(rkbuf, op_timeout);
+
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ if (ApiVersion >= 1) {
+ /* validate_only */
+ rd_kafka_buf_write_i8(
+ rkbuf, rd_kafka_confval_get_int(&options->validate_only));
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send DeleteTopicsRequest to \p rkb
+ * with the topics (DeleteTopic_t *) in \p del_topics, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_DeleteTopicsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *del_topics /*(DeleteTopic_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ int i = 0;
+ rd_kafka_DeleteTopic_t *delt;
+ int op_timeout;
+
+ if (rd_list_cnt(del_topics) == 0) {
+ rd_snprintf(errstr, errstr_size, "No topics to delete");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "Topic Admin API (KIP-4) not supported "
+ "by broker, requires broker version >= 0.10.2.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf =
+ rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1,
+ /* FIXME */
+ 4 + (rd_list_cnt(del_topics) * 100) + 4);
+
+ /* #topics */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics));
+
+ while ((delt = rd_list_elem(del_topics, i++)))
+ rd_kafka_buf_write_str(rkbuf, delt->topic, -1);
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ rd_kafka_buf_write_i32(rkbuf, op_timeout);
+
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send DeleteRecordsRequest to \p rkb
+ * with the offsets to delete (rd_kafka_topic_partition_list_t *) in
+ * \p offsets_list, using \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @remark The rd_kafka_topic_partition_list_t in \p offsets_list must already
+ * be sorted.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb,
+ /*(rd_kafka_topic_partition_list_t*)*/
+ const rd_list_t *offsets_list,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ const rd_kafka_topic_partition_list_t *partitions;
+ int op_timeout;
+
+ partitions = rd_list_elem(offsets_list, 0);
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DeleteRecords, 0, 1, &features);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "DeleteRecords Admin API (KIP-107) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteRecords, 1,
+ 4 + (partitions->cnt * 100) + 4);
+
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ rd_kafka_buf_write_topic_partitions(
+ rkbuf, partitions, rd_false /*don't skip invalid offsets*/,
+ rd_false /*any offset*/, fields);
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ rd_kafka_buf_write_i32(rkbuf, op_timeout);
+
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send CreatePartitionsRequest to \p rkb
+ * with the topics (NewPartitions_t*) in \p new_parts, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_CreatePartitionsRequest(rd_kafka_broker_t *rkb,
+ /*(NewPartitions_t*)*/
+ const rd_list_t *new_parts,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int i = 0;
+ rd_kafka_NewPartitions_t *newp;
+ int op_timeout;
+
+ if (rd_list_cnt(new_parts) == 0) {
+ rd_snprintf(errstr, errstr_size, "No partitions to create");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "CreatePartitions (KIP-195) not supported "
+ "by broker, requires broker version >= 1.0.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1,
+ 4 + (rd_list_cnt(new_parts) * 200) +
+ 4 + 1);
+
+ /* #topics */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts));
+
+ while ((newp = rd_list_elem(new_parts, i++))) {
+ /* topic */
+ rd_kafka_buf_write_str(rkbuf, newp->topic, -1);
+
+ /* New partition count */
+ rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt);
+
+ /* #replica_assignment */
+ if (rd_list_empty(&newp->replicas)) {
+ rd_kafka_buf_write_i32(rkbuf, -1);
+ } else {
+ const rd_list_t *replicas;
+ int pi = -1;
+
+ rd_kafka_buf_write_i32(rkbuf,
+ rd_list_cnt(&newp->replicas));
+
+ while (
+ (replicas = rd_list_elem(&newp->replicas, ++pi))) {
+ int ri = 0;
+
+ /* replica count */
+ rd_kafka_buf_write_i32(rkbuf,
+ rd_list_cnt(replicas));
+
+ /* replica */
+ for (ri = 0; ri < rd_list_cnt(replicas); ri++) {
+ rd_kafka_buf_write_i32(
+ rkbuf,
+ rd_list_get_int32(replicas, ri));
+ }
+ }
+ }
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ rd_kafka_buf_write_i32(rkbuf, op_timeout);
+
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ /* validate_only */
+ rd_kafka_buf_write_i8(
+ rkbuf, rd_kafka_confval_get_int(&options->validate_only));
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send AlterConfigsRequest to \p rkb
+ * with the configs (ConfigResource_t*) in \p configs, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *configs /*(ConfigResource_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int i;
+ const rd_kafka_ConfigResource_t *config;
+ int op_timeout;
+
+ if (rd_list_cnt(configs) == 0) {
+ rd_snprintf(errstr, errstr_size,
+ "No config resources specified");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_AlterConfigs, 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "AlterConfigs (KIP-133) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ /* Incremental requires IncrementalAlterConfigs */
+ if (rd_kafka_confval_get_int(&options->incremental)) {
+ rd_snprintf(errstr, errstr_size,
+ "AlterConfigs.incremental=true (KIP-248) "
+ "not supported by broker, "
+ "replaced by IncrementalAlterConfigs");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1,
+ rd_list_cnt(configs) * 200);
+
+ /* #resources */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
+
+ RD_LIST_FOREACH(config, configs, i) {
+ const rd_kafka_ConfigEntry_t *entry;
+ int ei;
+
+ /* resource_type */
+ rd_kafka_buf_write_i8(rkbuf, config->restype);
+
+ /* resource_name */
+ rd_kafka_buf_write_str(rkbuf, config->name, -1);
+
+ /* #config */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config));
+
+ RD_LIST_FOREACH(entry, &config->config, ei) {
+ /* config_name */
+ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
+ /* config_value (nullable) */
+ rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
+
+ if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) {
+ rd_snprintf(errstr, errstr_size,
+ "IncrementalAlterConfigs required "
+ "for add/delete config "
+ "entries: only set supported "
+ "by this operation");
+ rd_kafka_buf_destroy(rkbuf);
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ /* validate_only */
+ rd_kafka_buf_write_i8(
+ rkbuf, rd_kafka_confval_get_int(&options->validate_only));
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send DescribeConfigsRequest to \p rkb
+ * with the configs (ConfigResource_t*) in \p configs, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
+ rd_kafka_broker_t *rkb,
+ const rd_list_t *configs /*(ConfigResource_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int i;
+ const rd_kafka_ConfigResource_t *config;
+ int op_timeout;
+
+ if (rd_list_cnt(configs) == 0) {
+ rd_snprintf(errstr, errstr_size,
+ "No config resources specified");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "DescribeConfigs (KIP-133) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1,
+ rd_list_cnt(configs) * 200);
+
+ /* #resources */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
+
+ RD_LIST_FOREACH(config, configs, i) {
+ const rd_kafka_ConfigEntry_t *entry;
+ int ei;
+
+ /* resource_type */
+ rd_kafka_buf_write_i8(rkbuf, config->restype);
+
+ /* resource_name */
+ rd_kafka_buf_write_str(rkbuf, config->name, -1);
+
+ /* #config */
+ if (rd_list_empty(&config->config)) {
+ /* Get all configs */
+ rd_kafka_buf_write_i32(rkbuf, -1);
+ } else {
+ /* Get requested configs only */
+ rd_kafka_buf_write_i32(rkbuf,
+ rd_list_cnt(&config->config));
+ }
+
+ RD_LIST_FOREACH(entry, &config->config, ei) {
+ /* config_name */
+ rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
+ }
+ }
+
+
+ if (ApiVersion == 1) {
+ /* include_synonyms */
+ rd_kafka_buf_write_i8(rkbuf, 1);
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send DeleteGroupsRequest to \p rkb
+ * with the groups (DeleteGroup_t *) in \p del_groups, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_DeleteGroupsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *del_groups /*(DeleteGroup_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ int features;
+ int i = 0;
+ rd_kafka_DeleteGroup_t *delt;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DeleteGroups, 0, 1, &features);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "DeleteGroups Admin API (KIP-229) not supported "
+ "by broker, requires broker version >= 1.1.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf =
+ rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteGroups, 1,
+ 4 + (rd_list_cnt(del_groups) * 100) + 4);
+
+ /* #groups */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_groups));
+
+ while ((delt = rd_list_elem(del_groups, i++)))
+ rd_kafka_buf_write_str(rkbuf, delt->group, -1);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Returns the request size needed to send a specific AclBinding
+ * specified in \p acl, using the ApiVersion provided in
+ * \p ApiVersion.
+ *
+ * @returns and int16_t with the request size in bytes.
+ */
+static RD_INLINE size_t
+rd_kafka_AclBinding_request_size(const rd_kafka_AclBinding_t *acl,
+ int ApiVersion) {
+ return 1 + 2 + (acl->name ? strlen(acl->name) : 0) + 2 +
+ (acl->principal ? strlen(acl->principal) : 0) + 2 +
+ (acl->host ? strlen(acl->host) : 0) + 1 + 1 +
+ (ApiVersion > 0 ? 1 : 0);
+}
+
+/**
+ * @brief Construct and send CreateAclsRequest to \p rkb
+ * with the acls (AclBinding_t*) in \p new_acls, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_CreateAclsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *new_acls /*(AclBinding_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion;
+ int i;
+ size_t len;
+ int op_timeout;
+ rd_kafka_AclBinding_t *new_acl;
+
+ if (rd_list_cnt(new_acls) == 0) {
+ rd_snprintf(errstr, errstr_size, "No acls to create");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_CreateAcls, 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "ACLs Admin API (KIP-140) not supported "
+ "by broker, requires broker version >= 0.11.0.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ if (ApiVersion == 0) {
+ RD_LIST_FOREACH(new_acl, new_acls, i) {
+ if (new_acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL) {
+ rd_snprintf(errstr, errstr_size,
+ "Broker only supports LITERAL "
+ "resource pattern types");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+ } else {
+ RD_LIST_FOREACH(new_acl, new_acls, i) {
+ if (new_acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
+ new_acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_PREFIXED) {
+ rd_snprintf(errstr, errstr_size,
+ "Only LITERAL and PREFIXED "
+ "resource patterns are supported "
+ "when creating ACLs");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+ }
+
+ len = 4;
+ RD_LIST_FOREACH(new_acl, new_acls, i) {
+ len += rd_kafka_AclBinding_request_size(new_acl, ApiVersion);
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateAcls, 1, len);
+
+ /* #acls */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_acls));
+
+ RD_LIST_FOREACH(new_acl, new_acls, i) {
+ rd_kafka_buf_write_i8(rkbuf, new_acl->restype);
+
+ rd_kafka_buf_write_str(rkbuf, new_acl->name, -1);
+
+ if (ApiVersion >= 1) {
+ rd_kafka_buf_write_i8(rkbuf,
+ new_acl->resource_pattern_type);
+ }
+
+ rd_kafka_buf_write_str(rkbuf, new_acl->principal, -1);
+
+ rd_kafka_buf_write_str(rkbuf, new_acl->host, -1);
+
+ rd_kafka_buf_write_i8(rkbuf, new_acl->operation);
+
+ rd_kafka_buf_write_i8(rkbuf, new_acl->permission_type);
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Construct and send DescribeAclsRequest to \p rkb
+ * with the acls (AclBinding_t*) in \p acls, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t rd_kafka_DescribeAclsRequest(
+ rd_kafka_broker_t *rkb,
+ const rd_list_t *acls /*(rd_kafka_AclBindingFilter_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ const rd_kafka_AclBindingFilter_t *acl;
+ int op_timeout;
+
+ if (rd_list_cnt(acls) == 0) {
+ rd_snprintf(errstr, errstr_size,
+ "No acl binding filters specified");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+ if (rd_list_cnt(acls) > 1) {
+ rd_snprintf(errstr, errstr_size,
+ "Too many acl binding filters specified");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ acl = rd_list_elem(acls, 0);
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DescribeAcls, 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "ACLs Admin API (KIP-140) not supported "
+ "by broker, requires broker version >= 0.11.0.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ if (ApiVersion == 0) {
+ if (acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
+ acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_ANY) {
+ rd_snprintf(errstr, errstr_size,
+ "Broker only supports LITERAL and ANY "
+ "resource pattern types");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ } else {
+ if (acl->resource_pattern_type ==
+ RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) {
+ rd_snprintf(errstr, errstr_size,
+ "Filter contains UNKNOWN elements");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+
+ rkbuf = rd_kafka_buf_new_request(
+ rkb, RD_KAFKAP_DescribeAcls, 1,
+ rd_kafka_AclBinding_request_size(acl, ApiVersion));
+
+ /* resource_type */
+ rd_kafka_buf_write_i8(rkbuf, acl->restype);
+
+ /* resource_name filter */
+ rd_kafka_buf_write_str(rkbuf, acl->name, -1);
+
+ if (ApiVersion > 0) {
+ /* resource_pattern_type (rd_kafka_ResourcePatternType_t) */
+ rd_kafka_buf_write_i8(rkbuf, acl->resource_pattern_type);
+ }
+
+ /* principal filter */
+ rd_kafka_buf_write_str(rkbuf, acl->principal, -1);
+
+ /* host filter */
+ rd_kafka_buf_write_str(rkbuf, acl->host, -1);
+
+ /* operation (rd_kafka_AclOperation_t) */
+ rd_kafka_buf_write_i8(rkbuf, acl->operation);
+
+ /* permission type (rd_kafka_AclPermissionType_t) */
+ rd_kafka_buf_write_i8(rkbuf, acl->permission_type);
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Construct and send DeleteAclsRequest to \p rkb
+ * with the acl filters (AclBindingFilter_t*) in \p del_acls, using
+ * \p options.
+ *
+ * The response (unparsed) will be enqueued on \p replyq
+ * for handling by \p resp_cb (with \p opaque passed).
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb,
+ const rd_list_t *del_acls /*(AclBindingFilter_t*)*/,
+ rd_kafka_AdminOptions_t *options,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ const rd_kafka_AclBindingFilter_t *acl;
+ int op_timeout;
+ int i;
+ size_t len;
+
+ if (rd_list_cnt(del_acls) == 0) {
+ rd_snprintf(errstr, errstr_size,
+ "No acl binding filters specified");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_DeleteAcls, 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "ACLs Admin API (KIP-140) not supported "
+ "by broker, requires broker version >= 0.11.0.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ len = 4;
+
+ RD_LIST_FOREACH(acl, del_acls, i) {
+ if (ApiVersion == 0) {
+ if (acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
+ acl->resource_pattern_type !=
+ RD_KAFKA_RESOURCE_PATTERN_ANY) {
+ rd_snprintf(errstr, errstr_size,
+ "Broker only supports LITERAL "
+ "and ANY resource pattern types");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ } else {
+ if (acl->resource_pattern_type ==
+ RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) {
+ rd_snprintf(errstr, errstr_size,
+ "Filter contains UNKNOWN elements");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+
+ len += rd_kafka_AclBinding_request_size(acl, ApiVersion);
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteAcls, 1, len);
+
+ /* #acls */
+ rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_acls));
+
+ RD_LIST_FOREACH(acl, del_acls, i) {
+ /* resource_type */
+ rd_kafka_buf_write_i8(rkbuf, acl->restype);
+
+ /* resource_name filter */
+ rd_kafka_buf_write_str(rkbuf, acl->name, -1);
+
+ if (ApiVersion > 0) {
+ /* resource_pattern_type
+ * (rd_kafka_ResourcePatternType_t) */
+ rd_kafka_buf_write_i8(rkbuf,
+ acl->resource_pattern_type);
+ }
+
+ /* principal filter */
+ rd_kafka_buf_write_str(rkbuf, acl->principal, -1);
+
+ /* host filter */
+ rd_kafka_buf_write_str(rkbuf, acl->host, -1);
+
+ /* operation (rd_kafka_AclOperation_t) */
+ rd_kafka_buf_write_i8(rkbuf, acl->operation);
+
+ /* permission type (rd_kafka_AclPermissionType_t) */
+ rd_kafka_buf_write_i8(rkbuf, acl->permission_type);
+ }
+
+ /* timeout */
+ op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
+ if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
+ rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Parses and handles an InitProducerId reply.
+ *
+ * @locality rdkafka main thread
+ * @locks none
+ */
+void rd_kafka_handle_InitProducerId(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ const int log_decode_errors = LOG_ERR;
+ int16_t error_code;
+ rd_kafka_pid_t pid;
+
+ if (err)
+ goto err;
+
+ rd_kafka_buf_read_throttle_time(rkbuf);
+
+ rd_kafka_buf_read_i16(rkbuf, &error_code);
+ if ((err = error_code))
+ goto err;
+
+ rd_kafka_buf_read_i64(rkbuf, &pid.id);
+ rd_kafka_buf_read_i16(rkbuf, &pid.epoch);
+
+ rd_kafka_idemp_pid_update(rkb, pid);
+
+ return;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+err:
+ if (err == RD_KAFKA_RESP_ERR__DESTROY)
+ return;
+
+ /* Retries are performed by idempotence state handler */
+ rd_kafka_idemp_request_pid_failed(rkb, err);
+}
+
+/**
+ * @brief Construct and send InitProducerIdRequest to \p rkb.
+ *
+ * @param transactional_id may be NULL.
+ * @param transaction_timeout_ms may be set to -1.
+ * @param current_pid the current PID to reset, requires KIP-360. If not NULL
+ * and KIP-360 is not supported by the broker this function
+ * will return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE.
+ *
+ * The response (unparsed) will be handled by \p resp_cb served
+ * by queue \p replyq.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code and errstr will be
+ * updated with a human readable error string.
+ */
+rd_kafka_resp_err_t
+rd_kafka_InitProducerIdRequest(rd_kafka_broker_t *rkb,
+ const char *transactional_id,
+ int transaction_timeout_ms,
+ const rd_kafka_pid_t *current_pid,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion;
+
+ if (current_pid) {
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_InitProducerId, 3, 4, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "InitProducerId (KIP-360) not supported by "
+ "broker, requires broker version >= 2.5.0: "
+ "unable to recover from previous "
+ "transactional error");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ } else {
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_InitProducerId, 0, 4, NULL);
+
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "InitProducerId (KIP-98) not supported by "
+ "broker, requires broker "
+ "version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+ }
+
+ rkbuf = rd_kafka_buf_new_flexver_request(
+ rkb, RD_KAFKAP_InitProducerId, 1,
+ 2 + (transactional_id ? strlen(transactional_id) : 0) + 4 + 8 + 4,
+ ApiVersion >= 2 /*flexver*/);
+
+ /* transactional_id */
+ rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
+
+ /* transaction_timeout_ms */
+ rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms);
+
+ if (ApiVersion >= 3) {
+ /* Current PID */
+ rd_kafka_buf_write_i64(rkbuf,
+ current_pid ? current_pid->id : -1);
+ /* Current Epoch */
+ rd_kafka_buf_write_i16(rkbuf,
+ current_pid ? current_pid->epoch : -1);
+ }
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ /* Let the idempotence state handler perform retries */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send AddPartitionsToTxnRequest to \p rkb.
+ *
+ * The response (unparsed) will be handled by \p resp_cb served
+ * by queue \p replyq.
+ *
+ * @param rktps MUST be sorted by topic name.
+ *
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code.
+ */
+rd_kafka_resp_err_t
+rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb,
+ const char *transactional_id,
+ rd_kafka_pid_t pid,
+ const rd_kafka_toppar_tqhead_t *rktps,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_topic_t *last_rkt = NULL;
+ size_t of_TopicCnt;
+ ssize_t of_PartCnt = -1;
+ int TopicCnt = 0, PartCnt = 0;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "AddPartitionsToTxnRequest (KIP-98) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf =
+ rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500);
+
+ /* transactional_id */
+ rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
+
+ /* PID */
+ rd_kafka_buf_write_i64(rkbuf, pid.id);
+ rd_kafka_buf_write_i16(rkbuf, pid.epoch);
+
+ /* Topics/partitions array (count updated later) */
+ of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+
+ TAILQ_FOREACH(rktp, rktps, rktp_txnlink) {
+ if (last_rkt != rktp->rktp_rkt) {
+
+ if (last_rkt) {
+ /* Update last topic's partition count field */
+ rd_kafka_buf_update_i32(rkbuf, of_PartCnt,
+ PartCnt);
+ of_PartCnt = -1;
+ }
+
+ /* Topic name */
+ rd_kafka_buf_write_kstr(rkbuf,
+ rktp->rktp_rkt->rkt_topic);
+ /* Partition count, updated later */
+ of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
+
+ PartCnt = 0;
+ TopicCnt++;
+ last_rkt = rktp->rktp_rkt;
+ }
+
+ /* Partition id */
+ rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);
+ PartCnt++;
+ }
+
+ /* Update last partition and topic count fields */
+ if (of_PartCnt != -1)
+ rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt);
+ rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ /* Let the handler perform retries so that it can pick
+ * up more added partitions. */
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * @brief Construct and send AddOffsetsToTxnRequest to \p rkb.
+ *
+ * The response (unparsed) will be handled by \p resp_cb served
+ * by queue \p replyq.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code.
+ */
+rd_kafka_resp_err_t
+rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb,
+ const char *transactional_id,
+ rd_kafka_pid_t pid,
+ const char *group_id,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(
+ rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "AddOffsetsToTxnRequest (KIP-98) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf =
+ rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100);
+
+ /* transactional_id */
+ rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
+
+ /* PID */
+ rd_kafka_buf_write_i64(rkbuf, pid.id);
+ rd_kafka_buf_write_i16(rkbuf, pid.epoch);
+
+ /* Group Id */
+ rd_kafka_buf_write_str(rkbuf, group_id, -1);
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @brief Construct and send EndTxnRequest to \p rkb.
+ *
+ * The response (unparsed) will be handled by \p resp_cb served
+ * by queue \p replyq.
+ *
+ * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
+ * transmission, otherwise an error code.
+ */
+rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb,
+ const char *transactional_id,
+ rd_kafka_pid_t pid,
+ rd_bool_t committed,
+ char *errstr,
+ size_t errstr_size,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_resp_cb_t *resp_cb,
+ void *opaque) {
+ rd_kafka_buf_t *rkbuf;
+ int16_t ApiVersion = 0;
+
+ ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn,
+ 0, 1, NULL);
+ if (ApiVersion == -1) {
+ rd_snprintf(errstr, errstr_size,
+ "EndTxnRequest (KIP-98) not supported "
+ "by broker, requires broker version >= 0.11.0");
+ rd_kafka_replyq_destroy(&replyq);
+ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
+ }
+
+ rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500);
+
+ /* transactional_id */
+ rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
+
+ /* PID */
+ rd_kafka_buf_write_i64(rkbuf, pid.id);
+ rd_kafka_buf_write_i16(rkbuf, pid.epoch);
+
+ /* Committed */
+ rd_kafka_buf_write_bool(rkbuf, committed);
+ rkbuf->rkbuf_u.EndTxn.commit = committed;
+
+ rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+ rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
+
+ rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * @name Unit tests
+ * @{
+ *
+ *
+ *
+ *
+ */
+
+/**
+ * @brief Create \p cnt messages, starting at \p msgid, and add them
+ * to \p rkmq.
+ *
+ * @returns the number of messages added.
+ */
+static int ut_create_msgs(rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) {
+ int i;
+
+ for (i = 0; i < cnt; i++) {
+ rd_kafka_msg_t *rkm;
+
+ rkm = ut_rd_kafka_msg_new(0);
+ rkm->rkm_u.producer.msgid = msgid++;
+ rkm->rkm_ts_enq = rd_clock();
+ rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000);
+
+ rd_kafka_msgq_enq(rkmq, rkm);
+ }
+
+ return cnt;
+}
+
+/**
+ * @brief Idempotent Producer request/response unit tests
+ *
+ * The current test verifies proper handling of the following case:
+ * Batch 0 succeeds
+ * Batch 1 fails with temporary error
+ * Batch 2,3 fails with out of order sequence
+ * Retry Batch 1-3 should succeed.
+ */
+static int unittest_idempotent_producer(void) {
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_broker_t *rkb;
+#define _BATCH_CNT 4
+#define _MSGS_PER_BATCH 3
+ const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH;
+ int remaining_batches;
+ uint64_t msgid = 1;
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_pid_t pid = {.id = 1000, .epoch = 0};
+ struct rd_kafka_Produce_result result = {.offset = 1,
+ .timestamp = 1000};
+ rd_kafka_queue_t *rkqu;
+ rd_kafka_event_t *rkev;
+ rd_kafka_buf_t *request[_BATCH_CNT];
+ int rcnt = 0;
+ int retry_msg_cnt = 0;
+ int drcnt = 0;
+ rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
+ const char *tmp;
+ int i, r;
+
+ RD_UT_SAY("Verifying idempotent producer error handling");
+
+ conf = rd_kafka_conf_new();
+ rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0);
+ rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0);
+ if ((tmp = rd_getenv("TEST_DEBUG", NULL)))
+ rd_kafka_conf_set(conf, "debug", tmp, NULL, 0);
+ if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) !=
+ RD_KAFKA_CONF_OK)
+ RD_UT_FAIL("Failed to enable idempotence");
+ rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR);
+
+ rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
+ RD_UT_ASSERT(rk, "failed to create producer");
+
+ rkqu = rd_kafka_queue_get_main(rk);
+
+ /* We need a broker handle, use a logical broker to avoid
+ * any connection attempts. */
+ rkb = rd_kafka_broker_add_logical(rk, "unittest");
+
+ /* Have the broker support everything so msgset_writer selects
+ * the most up-to-date output features. */
+ rd_kafka_broker_lock(rkb);
+ rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL;
+ rd_kafka_broker_unlock(rkb);
+
+ /* Get toppar */
+ rktp = rd_kafka_toppar_get2(rk, "uttopic", 0, rd_false, rd_true);
+ RD_UT_ASSERT(rktp, "failed to get toppar");
+
+ /* Set the topic as exists so messages are enqueued on
+ * the desired rktp away (otherwise UA partition) */
+ rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1);
+
+ /* Produce messages */
+ ut_create_msgs(&rkmq, 1, msgcnt);
+
+ /* Set the pid */
+ rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID);
+ rd_kafka_idemp_pid_update(rkb, pid);
+ pid = rd_kafka_idemp_get_pid(rk);
+ RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid");
+ rd_kafka_toppar_pid_change(rktp, pid, msgid);
+
+ remaining_batches = _BATCH_CNT;
+
+ /* Create a ProduceRequest for each batch */
+ for (rcnt = 0; rcnt < remaining_batches; rcnt++) {
+ size_t msize;
+ request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
+ rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize);
+ RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt);
+ }
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0,
+ "expected input message queue to be empty, "
+ "but still has %d message(s)",
+ rd_kafka_msgq_len(&rkmq));
+
+ /*
+ * Mock handling of each request
+ */
+
+ /* Batch 0: accepted */
+ i = 0;
+ r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
+ RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
+ rd_kafka_msgbatch_handle_Produce_result(rkb, &request[i]->rkbuf_batch,
+ RD_KAFKA_RESP_ERR_NO_ERROR,
+ &result, request[i]);
+ result.offset += r;
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0,
+ "batch %d: expected no messages in rktp_msgq, not %d", i,
+ rd_kafka_msgq_len(&rktp->rktp_msgq));
+ rd_kafka_buf_destroy(request[i]);
+ remaining_batches--;
+
+ /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */
+ i = 1;
+ r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
+ RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
+ rd_kafka_msgbatch_handle_Produce_result(
+ rkb, &request[i]->rkbuf_batch,
+ RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, &result, request[i]);
+ retry_msg_cnt += r;
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
+ "batch %d: expected %d messages in rktp_msgq, not %d", i,
+ retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
+ rd_kafka_buf_destroy(request[i]);
+
+ /* Batch 2: OUT_OF_ORDER, triggering retry .. */
+ i = 2;
+ r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
+ RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
+ rd_kafka_msgbatch_handle_Produce_result(
+ rkb, &request[i]->rkbuf_batch,
+ RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result,
+ request[i]);
+ retry_msg_cnt += r;
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
+ "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
+ i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
+ rd_kafka_buf_destroy(request[i]);
+
+ /* Batch 3: OUT_OF_ORDER, triggering retry .. */
+ i = 3;
+ r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
+ rd_kafka_msgbatch_handle_Produce_result(
+ rkb, &request[i]->rkbuf_batch,
+ RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result,
+ request[i]);
+ retry_msg_cnt += r;
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
+ "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
+ i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
+ rd_kafka_buf_destroy(request[i]);
+
+
+ /* Retried messages will have been moved to rktp_msgq,
+ * move them back to our local queue. */
+ rd_kafka_toppar_lock(rktp);
+ rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq);
+ rd_kafka_toppar_unlock(rktp);
+
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt,
+ "Expected %d messages in retry queue, not %d",
+ retry_msg_cnt, rd_kafka_msgq_len(&rkmq));
+
+ /* Sleep a short while to make sure the retry backoff expires. */
+ rd_usleep(5 * 1000, NULL); /* 5ms */
+
+ /*
+ * Create requests for remaining batches.
+ */
+ for (rcnt = 0; rcnt < remaining_batches; rcnt++) {
+ size_t msize;
+ request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
+ rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize);
+ RD_UT_ASSERT(request[rcnt],
+ "Failed to create retry #%d (%d msgs in queue)",
+ rcnt, rd_kafka_msgq_len(&rkmq));
+ }
+
+ /*
+ * Mock handling of each request, they will now succeed.
+ */
+ for (i = 0; i < rcnt; i++) {
+ r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
+ rd_kafka_msgbatch_handle_Produce_result(
+ rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR,
+ &result, request[i]);
+ result.offset += r;
+ rd_kafka_buf_destroy(request[i]);
+ }
+
+ retry_msg_cnt = 0;
+ RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
+ "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
+ i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
+
+ /*
+ * Wait for delivery reports, they should all be successful.
+ */
+ while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) {
+ const rd_kafka_message_t *rkmessage;
+
+ RD_UT_SAY("Got %s event with %d message(s)",
+ rd_kafka_event_name(rkev),
+ (int)rd_kafka_event_message_count(rkev));
+
+ while ((rkmessage = rd_kafka_event_message_next(rkev))) {
+ RD_UT_SAY(" DR for message: %s: (persistence=%d)",
+ rd_kafka_err2str(rkmessage->err),
+ rd_kafka_message_status(rkmessage));
+ if (rkmessage->err)
+ RD_UT_WARN(" ^ Should not have failed");
+ else
+ drcnt++;
+ }
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Should be no more messages in queues */
+ r = rd_kafka_outq_len(rk);
+ RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d", r);
+
+ /* Verify the expected number of good delivery reports were seen */
+ RD_UT_ASSERT(drcnt == msgcnt, "expected %d DRs, not %d", msgcnt, drcnt);
+
+ rd_kafka_queue_destroy(rkqu);
+ rd_kafka_toppar_destroy(rktp);
+ rd_kafka_broker_destroy(rkb);
+ rd_kafka_destroy(rk);
+
+ RD_UT_PASS();
+ return 0;
+}
+
+/**
+ * @brief Request/response unit tests
+ */
+int unittest_request(void) {
+ int fails = 0;
+
+ fails += unittest_idempotent_producer();
+
+ return fails;
+}
+
+/**@}*/