summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c5378
1 files changed, 0 insertions, 5378 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
deleted file mode 100644
index 12d9eb30..00000000
--- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c
+++ /dev/null
@@ -1,5378 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include <stdarg.h>
-
-#include "rdkafka_int.h"
-#include "rdkafka_request.h"
-#include "rdkafka_broker.h"
-#include "rdkafka_offset.h"
-#include "rdkafka_topic.h"
-#include "rdkafka_partition.h"
-#include "rdkafka_metadata.h"
-#include "rdkafka_msgset.h"
-#include "rdkafka_idempotence.h"
-#include "rdkafka_txnmgr.h"
-#include "rdkafka_sasl.h"
-
-#include "rdrand.h"
-#include "rdstring.h"
-#include "rdunittest.h"
-
-
-/**
- * Kafka protocol request and response handling.
- * All of this code runs in the broker thread and uses op queues for
- * propagating results back to the various sub-systems operating in
- * other threads.
- */
-
-
-/* RD_KAFKA_ERR_ACTION_.. to string map */
-static const char *rd_kafka_actions_descs[] = {
- "Permanent", "Ignore", "Refresh", "Retry",
- "Inform", "Special", "MsgNotPersisted", "MsgPossiblyPersisted",
- "MsgPersisted", NULL,
-};
-
-const char *rd_kafka_actions2str(int actions) {
- static RD_TLS char actstr[128];
- return rd_flags2str(actstr, sizeof(actstr), rd_kafka_actions_descs,
- actions);
-}
-
-
-/**
- * @brief Decide action(s) to take based on the returned error code.
- *
- * The optional var-args is a .._ACTION_END terminated list
- * of action,error tuples which overrides the general behaviour.
- * It is to be read as: for \p error, return \p action(s).
- *
- * @warning \p request, \p rkbuf and \p rkb may be NULL.
- */
-int rd_kafka_err_action(rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- const rd_kafka_buf_t *request,
- ...) {
- va_list ap;
- int actions = 0;
- int exp_act;
-
- if (!err)
- return 0;
-
- /* Match explicitly defined error mappings first. */
- va_start(ap, request);
- while ((exp_act = va_arg(ap, int))) {
- int exp_err = va_arg(ap, int);
-
- if (err == exp_err)
- actions |= exp_act;
- }
- va_end(ap);
-
- /* Explicit error match. */
- if (actions) {
- if (err && rkb && request)
- rd_rkb_dbg(
- rkb, BROKER, "REQERR",
- "%sRequest failed: %s: explicit actions %s",
- rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
- rd_kafka_err2str(err),
- rd_kafka_actions2str(actions));
-
- return actions;
- }
-
- /* Default error matching */
- switch (err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- break;
- case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
- case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION:
- case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE:
- case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE:
- case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
- case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
- case RD_KAFKA_RESP_ERR__WAIT_COORD:
- /* Request metadata information update */
- actions |= RD_KAFKA_ERR_ACTION_REFRESH |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR:
- /* Request metadata update and retry */
- actions |= RD_KAFKA_ERR_ACTION_REFRESH |
- RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR__TRANSPORT:
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
- case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND:
- actions |= RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS:
- /* Client-side wait-response/in-queue timeout */
- case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
- actions |= RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT:
- actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR__BAD_MSG:
- /* Buffer parse failures are typically a client-side bug,
- * treat them as permanent failures. */
- actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED;
- break;
-
- case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
- actions |= RD_KAFKA_ERR_ACTION_RETRY;
- break;
-
- case RD_KAFKA_RESP_ERR__DESTROY:
- case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT:
- case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE:
- case RD_KAFKA_RESP_ERR__PURGE_QUEUE:
- default:
- actions |= RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED;
- break;
- }
-
- /* Fatal or permanent errors are not retriable */
- if (actions &
- (RD_KAFKA_ERR_ACTION_FATAL | RD_KAFKA_ERR_ACTION_PERMANENT))
- actions &= ~RD_KAFKA_ERR_ACTION_RETRY;
-
- /* If no request buffer was specified, which might be the case
- * in certain error call chains, mask out the retry action. */
- if (!request)
- actions &= ~RD_KAFKA_ERR_ACTION_RETRY;
- else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce)
- /* Mask out message-related bits for non-Produce requests */
- actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS;
-
- if (err && actions && rkb && request)
- rd_rkb_dbg(
- rkb, BROKER, "REQERR", "%sRequest failed: %s: actions %s",
- rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey),
- rd_kafka_err2str(err), rd_kafka_actions2str(actions));
-
- return actions;
-}
-
-
-/**
- * @brief Read a list of topic+partitions+extra from \p rkbuf.
- *
- * @param rkbuf buffer to read from
- * @param fields An array of fields to read from the buffer and set on
- * the rktpar object, in the specified order, must end
- * with RD_KAFKA_TOPIC_PARTITION_FIELD_END.
- *
- * @returns a newly allocated list on success, or NULL on parse error.
- */
-rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions(
- rd_kafka_buf_t *rkbuf,
- size_t estimated_part_cnt,
- const rd_kafka_topic_partition_field_t *fields) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- rd_kafka_topic_partition_list_t *parts = NULL;
-
- rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);
-
- parts = rd_kafka_topic_partition_list_new(
- RD_MAX(TopicArrayCnt * 4, (int)estimated_part_cnt));
-
- while (TopicArrayCnt-- > 0) {
- rd_kafkap_str_t kTopic;
- int32_t PartArrayCnt;
- char *topic;
-
- rd_kafka_buf_read_str(rkbuf, &kTopic);
- rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt,
- RD_KAFKAP_PARTITIONS_MAX);
-
- RD_KAFKAP_STR_DUPA(&topic, &kTopic);
-
- while (PartArrayCnt-- > 0) {
- int32_t Partition = -1, Epoch = -1234,
- CurrentLeaderEpoch = -1234;
- int64_t Offset = -1234;
- int16_t ErrorCode = 0;
- rd_kafka_topic_partition_t *rktpar;
- int fi;
-
- /*
- * Read requested fields
- */
- for (fi = 0;
- fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
- fi++) {
- switch (fields[fi]) {
- case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
- rd_kafka_buf_read_i32(rkbuf,
- &Partition);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
- rd_kafka_buf_read_i64(rkbuf, &Offset);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH:
- rd_kafka_buf_read_i32(
- rkbuf, &CurrentLeaderEpoch);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
- rd_kafka_buf_read_i32(rkbuf, &Epoch);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
- rd_kafka_buf_read_i16(rkbuf,
- &ErrorCode);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
- rd_assert(!*"metadata not implemented");
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
- break;
- }
- }
-
- rktpar = rd_kafka_topic_partition_list_add(parts, topic,
- Partition);
- /* Use dummy sentinel values that are unlikely to be
- * seen from the broker to know if we are to set these
- * fields or not. */
- if (Offset != -1234)
- rktpar->offset = Offset;
- if (Epoch != -1234)
- rd_kafka_topic_partition_set_leader_epoch(
- rktpar, Epoch);
- if (CurrentLeaderEpoch != -1234)
- rd_kafka_topic_partition_set_current_leader_epoch(
- rktpar, CurrentLeaderEpoch);
- rktpar->err = ErrorCode;
-
-
- rd_kafka_buf_skip_tags(rkbuf);
- }
-
- rd_kafka_buf_skip_tags(rkbuf);
- }
-
- return parts;
-
-err_parse:
- if (parts)
- rd_kafka_topic_partition_list_destroy(parts);
-
- return NULL;
-}
-
-
-/**
- * @brief Write a list of topic+partitions+offsets+extra to \p rkbuf
- *
- * @returns the number of partitions written to buffer.
- *
- * @remark The \p parts list MUST be sorted.
- */
-int rd_kafka_buf_write_topic_partitions(
- rd_kafka_buf_t *rkbuf,
- const rd_kafka_topic_partition_list_t *parts,
- rd_bool_t skip_invalid_offsets,
- rd_bool_t only_invalid_offsets,
- const rd_kafka_topic_partition_field_t *fields) {
- size_t of_TopicArrayCnt;
- size_t of_PartArrayCnt = 0;
- int TopicArrayCnt = 0, PartArrayCnt = 0;
- int i;
- const char *prev_topic = NULL;
- int cnt = 0;
-
- rd_assert(!only_invalid_offsets ||
- (only_invalid_offsets != skip_invalid_offsets));
-
- /* TopicArrayCnt */
- of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
-
- for (i = 0; i < parts->cnt; i++) {
- const rd_kafka_topic_partition_t *rktpar = &parts->elems[i];
- int fi;
-
- if (rktpar->offset < 0) {
- if (skip_invalid_offsets)
- continue;
- } else if (only_invalid_offsets)
- continue;
-
- if (!prev_topic || strcmp(rktpar->topic, prev_topic)) {
- /* Finish previous topic, if any. */
- if (of_PartArrayCnt > 0) {
- rd_kafka_buf_finalize_arraycnt(
- rkbuf, of_PartArrayCnt, PartArrayCnt);
- /* Tags for previous topic struct */
- rd_kafka_buf_write_tags(rkbuf);
- }
-
-
- /* Topic */
- rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
- TopicArrayCnt++;
- prev_topic = rktpar->topic;
- /* New topic so reset partition count */
- PartArrayCnt = 0;
-
- /* PartitionArrayCnt: updated later */
- of_PartArrayCnt =
- rd_kafka_buf_write_arraycnt_pos(rkbuf);
- }
-
-
- /*
- * Write requested fields
- */
- for (fi = 0; fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END;
- fi++) {
- switch (fields[fi]) {
- case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION:
- rd_kafka_buf_write_i32(rkbuf,
- rktpar->partition);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET:
- rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH:
- rd_kafka_buf_write_i32(
- rkbuf,
- rd_kafka_topic_partition_get_current_leader_epoch(
- rktpar));
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH:
- rd_kafka_buf_write_i32(
- rkbuf,
- rd_kafka_topic_partition_get_leader_epoch(
- rktpar));
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR:
- rd_kafka_buf_write_i16(rkbuf, rktpar->err);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA:
- /* Java client 0.9.0 and broker <0.10.0 can't
- * parse Null metadata fields, so as a
- * workaround we send an empty string if
- * it's Null. */
- if (!rktpar->metadata)
- rd_kafka_buf_write_str(rkbuf, "", 0);
- else
- rd_kafka_buf_write_str(
- rkbuf, rktpar->metadata,
- rktpar->metadata_size);
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP:
- break;
- case RD_KAFKA_TOPIC_PARTITION_FIELD_END:
- break;
- }
- }
-
-
- if (fi > 1)
- /* If there was more than one field written
- * then this was a struct and thus needs the
- * struct suffix tags written. */
- rd_kafka_buf_write_tags(rkbuf);
-
- PartArrayCnt++;
- cnt++;
- }
-
- if (of_PartArrayCnt > 0) {
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_PartArrayCnt,
- PartArrayCnt);
- /* Tags for topic struct */
- rd_kafka_buf_write_tags(rkbuf);
- }
-
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, TopicArrayCnt);
-
- return cnt;
-}
-
-
-/**
- * @brief Send FindCoordinatorRequest.
- *
- * @param coordkey is the group.id for RD_KAFKA_COORD_GROUP,
- * and the transactional.id for RD_KAFKA_COORD_TXN
- */
-rd_kafka_resp_err_t
-rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb,
- rd_kafka_coordtype_t coordtype,
- const char *coordkey,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL);
-
- if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1)
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1,
- 1 + 2 + strlen(coordkey));
-
- rd_kafka_buf_write_str(rkbuf, coordkey, -1);
-
- if (ApiVersion >= 1)
- rd_kafka_buf_write_i8(rkbuf, (int8_t)coordtype);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-/**
- * @brief Parses a ListOffsets reply.
- *
- * Returns the parsed offsets (and errors) in \p offsets which must have been
- * initialized by caller.
- *
- * @returns 0 on success, else an error (\p offsets may be completely or
- * partially updated, depending on the nature of the error, and per
- * partition error codes should be checked by the caller).
- */
-static rd_kafka_resp_err_t
-rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf,
- rd_kafka_topic_partition_list_t *offsets) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- int16_t api_version;
- rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- api_version = rkbuf->rkbuf_reqhdr.ApiVersion;
-
- if (api_version >= 2)
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- /* NOTE:
- * Broker may return offsets in a different constellation than
- * in the original request .*/
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- while (TopicArrayCnt-- > 0) {
- rd_kafkap_str_t ktopic;
- int32_t PartArrayCnt;
- char *topic_name;
-
- rd_kafka_buf_read_str(rkbuf, &ktopic);
- rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
- RD_KAFKAP_STR_DUPA(&topic_name, &ktopic);
-
- while (PartArrayCnt-- > 0) {
- int32_t kpartition;
- int16_t ErrorCode;
- int32_t OffsetArrayCnt;
- int64_t Offset = -1;
- int32_t LeaderEpoch = -1;
- rd_kafka_topic_partition_t *rktpar;
-
- rd_kafka_buf_read_i32(rkbuf, &kpartition);
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
- if (api_version >= 1) {
- int64_t Timestamp;
- rd_kafka_buf_read_i64(rkbuf, &Timestamp);
- rd_kafka_buf_read_i64(rkbuf, &Offset);
- if (api_version >= 4)
- rd_kafka_buf_read_i32(rkbuf,
- &LeaderEpoch);
- } else if (api_version == 0) {
- rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt);
- /* We only request one offset so just grab
- * the first one. */
- while (OffsetArrayCnt-- > 0)
- rd_kafka_buf_read_i64(rkbuf, &Offset);
- } else {
- RD_NOTREACHED();
- }
-
- rktpar = rd_kafka_topic_partition_list_add(
- offsets, topic_name, kpartition);
- rktpar->err = ErrorCode;
- rktpar->offset = Offset;
- rd_kafka_topic_partition_set_leader_epoch(rktpar,
- LeaderEpoch);
-
- if (ErrorCode && !all_err)
- all_err = ErrorCode;
- }
- }
-
- return all_err;
-
-err_parse:
- return rkbuf->rkbuf_err;
-}
-
-
-
-/**
- * @brief Parses and handles ListOffsets replies.
- *
- * Returns the parsed offsets (and errors) in \p offsets.
- * \p offsets must be initialized by the caller.
- *
- * @returns 0 on success, else an error. \p offsets may be populated on error,
- * depending on the nature of the error.
- * On error \p actionsp (unless NULL) is updated with the recommended
- * error actions.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_ListOffsets(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets,
- int *actionsp) {
-
- int actions;
-
- if (!err)
- err = rd_kafka_parse_ListOffsets(rkbuf, offsets);
- if (!err)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- actions = rd_kafka_err_action(
- rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
- RD_KAFKA_ERR_ACTION_REFRESH,
- RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
-
- RD_KAFKA_ERR_ACTION_REFRESH,
- RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE,
-
- RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
-
- RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH,
-
- RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT,
-
- RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
-
-
- RD_KAFKA_ERR_ACTION_END);
-
- if (actionsp)
- *actionsp = actions;
-
- if (rkb)
- rd_rkb_dbg(
- rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)",
- rd_kafka_err2str(err), rd_kafka_actions2str(actions));
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- char tmp[256];
- /* Re-query for leader */
- rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s",
- rd_kafka_err2str(err));
- rd_kafka_metadata_refresh_known_topics(rk, NULL,
- rd_true /*force*/, tmp);
- }
-
- if ((actions & RD_KAFKA_ERR_ACTION_RETRY) &&
- rd_kafka_buf_retry(rkb, request))
- return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-
- return err;
-}
-
-
-
-/**
- * @brief Async maker for ListOffsetsRequest.
- */
-static rd_kafka_resp_err_t
-rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb,
- rd_kafka_buf_t *rkbuf,
- void *make_opaque) {
- const rd_kafka_topic_partition_list_t *partitions =
- (const rd_kafka_topic_partition_list_t *)make_opaque;
- int i;
- size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0;
- const char *last_topic = "";
- int32_t topic_cnt = 0, part_cnt = 0;
- int16_t ApiVersion;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_ListOffsets, 0, 5, NULL);
- if (ApiVersion == -1)
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
-
- /* ReplicaId */
- rd_kafka_buf_write_i32(rkbuf, -1);
-
- /* IsolationLevel */
- if (ApiVersion >= 2)
- rd_kafka_buf_write_i8(rkbuf,
- rkb->rkb_rk->rk_conf.isolation_level);
-
- /* TopicArrayCnt */
- of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */
-
- for (i = 0; i < partitions->cnt; i++) {
- const rd_kafka_topic_partition_t *rktpar =
- &partitions->elems[i];
-
- if (strcmp(rktpar->topic, last_topic)) {
- /* Finish last topic, if any. */
- if (of_PartArrayCnt > 0)
- rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt,
- part_cnt);
-
- /* Topic */
- rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
- topic_cnt++;
- last_topic = rktpar->topic;
- /* New topic so reset partition count */
- part_cnt = 0;
-
- /* PartitionArrayCnt: updated later */
- of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0);
- }
-
- /* Partition */
- rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
- part_cnt++;
-
- if (ApiVersion >= 4)
- /* CurrentLeaderEpoch */
- rd_kafka_buf_write_i32(
- rkbuf,
- rd_kafka_topic_partition_get_current_leader_epoch(
- rktpar));
-
- /* Time/Offset */
- rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
-
- if (ApiVersion == 0) {
- /* MaxNumberOfOffsets */
- rd_kafka_buf_write_i32(rkbuf, 1);
- }
- }
-
- if (of_PartArrayCnt > 0) {
- rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt);
- rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_rkb_dbg(rkb, TOPIC, "OFFSET",
- "ListOffsetsRequest (v%hd, opv %d) "
- "for %" PRId32 " topic(s) and %" PRId32 " partition(s)",
- ApiVersion, rkbuf->rkbuf_replyq.version, topic_cnt,
- partitions->cnt);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Send ListOffsetsRequest for partitions in \p partitions.
- */
-void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb,
- rd_kafka_topic_partition_list_t *partitions,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- rd_kafka_topic_partition_list_t *make_parts;
-
- make_parts = rd_kafka_topic_partition_list_copy(partitions);
- rd_kafka_topic_partition_list_sort_by_topic(make_parts);
-
- rkbuf = rd_kafka_buf_new_request(
- rkb, RD_KAFKAP_ListOffsets, 1,
- /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */
- 4 + 1 + 4 + 100 +
- /* PartArrayCnt */
- 4 +
- /* partition_cnt * Partition+Time+MaxNumOffs */
- (make_parts->cnt * (4 + 8 + 4)));
-
- /* Postpone creating the request contents until time to send,
- * at which time the ApiVersion is known. */
- rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest,
- make_parts,
- rd_kafka_topic_partition_list_destroy_free);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-/**
- * @brief OffsetForLeaderEpochResponse handler.
- */
-rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch(
- rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t **offsets) {
- const int log_decode_errors = LOG_ERR;
- int16_t ApiVersion;
-
- if (err)
- goto err;
-
- ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
-
- if (ApiVersion >= 2)
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_ERR,
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- ApiVersion >= 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH
- : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP,
- RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- *offsets = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields);
- if (!*offsets)
- goto err_parse;
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
-err:
- return err;
-
-err_parse:
- err = rkbuf->rkbuf_err;
- goto err;
-}
-
-
-/**
- * @brief Send OffsetForLeaderEpochRequest for partition(s).
- *
- */
-void rd_kafka_OffsetForLeaderEpochRequest(
- rd_kafka_broker_t *rkb,
- rd_kafka_topic_partition_list_t *parts,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL);
- /* If the supported ApiVersions are not yet known,
- * or this broker doesn't support it, we let this request
- * succeed or fail later from the broker thread where the
- * version is checked again. */
- if (ApiVersion == -1)
- ApiVersion = 2;
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_OffsetForLeaderEpoch, 1, 4 + (parts->cnt * 64),
- ApiVersion >= 4 /*flexver*/);
-
- /* Sort partitions by topic */
- rd_kafka_topic_partition_list_sort_by_topic(parts);
-
- /* Write partition list */
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- /* CurrentLeaderEpoch */
- RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH,
- /* LeaderEpoch */
- RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- rd_kafka_buf_write_topic_partitions(
- rkbuf, parts, rd_false /*include invalid offsets*/,
- rd_false /*skip valid offsets */, fields);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- /* Let caller perform retries */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-/**
- * Generic handler for OffsetFetch responses.
- * Offsets for included partitions will be propagated through the passed
- * 'offsets' list.
- *
- * @param rkbuf response buffer, may be NULL if \p err is set.
- * @param update_toppar update toppar's committed_offset
- * @param add_part if true add partitions from the response to \p *offsets,
- * else just update the partitions that are already
- * in \p *offsets.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetFetch(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t **offsets,
- rd_bool_t update_toppar,
- rd_bool_t add_part,
- rd_bool_t allow_retry) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- int64_t offset = RD_KAFKA_OFFSET_INVALID;
- int16_t ApiVersion;
- rd_kafkap_str_t metadata;
- int retry_unstable = 0;
- int i;
- int actions;
- int seen_cnt = 0;
-
- if (err)
- goto err;
-
- ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion;
-
- if (ApiVersion >= 3)
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- if (!*offsets)
- *offsets = rd_kafka_topic_partition_list_new(16);
-
- /* Set default offset for all partitions. */
- rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, *offsets, 0,
- RD_KAFKA_OFFSET_INVALID,
- 0 /* !is commit */);
-
- rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX);
- for (i = 0; i < TopicArrayCnt; i++) {
- rd_kafkap_str_t topic;
- int32_t PartArrayCnt;
- char *topic_name;
- int j;
-
- rd_kafka_buf_read_str(rkbuf, &topic);
-
- rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt,
- RD_KAFKAP_PARTITIONS_MAX);
-
- RD_KAFKAP_STR_DUPA(&topic_name, &topic);
-
- for (j = 0; j < PartArrayCnt; j++) {
- int32_t partition;
- rd_kafka_toppar_t *rktp;
- rd_kafka_topic_partition_t *rktpar;
- int32_t LeaderEpoch = -1;
- int16_t err2;
-
- rd_kafka_buf_read_i32(rkbuf, &partition);
- rd_kafka_buf_read_i64(rkbuf, &offset);
- if (ApiVersion >= 5)
- rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch);
- rd_kafka_buf_read_str(rkbuf, &metadata);
- rd_kafka_buf_read_i16(rkbuf, &err2);
- rd_kafka_buf_skip_tags(rkbuf);
-
- rktpar = rd_kafka_topic_partition_list_find(
- *offsets, topic_name, partition);
- if (!rktpar && add_part)
- rktpar = rd_kafka_topic_partition_list_add(
- *offsets, topic_name, partition);
- else if (!rktpar) {
- rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
- "OffsetFetchResponse: %s [%" PRId32
- "] "
- "not found in local list: ignoring",
- topic_name, partition);
- continue;
- }
-
- seen_cnt++;
-
- rktp = rd_kafka_topic_partition_get_toppar(
- rk, rktpar, rd_false /*no create on miss*/);
-
- /* broker reports invalid offset as -1 */
- if (offset == -1)
- rktpar->offset = RD_KAFKA_OFFSET_INVALID;
- else
- rktpar->offset = offset;
-
- rd_kafka_topic_partition_set_leader_epoch(rktpar,
- LeaderEpoch);
- rktpar->err = err2;
-
- rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH",
- "OffsetFetchResponse: %s [%" PRId32
- "] "
- "offset %" PRId64 ", leader epoch %" PRId32
- ", metadata %d byte(s): %s",
- topic_name, partition, offset, LeaderEpoch,
- RD_KAFKAP_STR_LEN(&metadata),
- rd_kafka_err2name(rktpar->err));
-
- if (update_toppar && !err2 && rktp) {
- /* Update toppar's committed offset */
- rd_kafka_toppar_lock(rktp);
- rktp->rktp_committed_pos =
- rd_kafka_topic_partition_get_fetch_pos(
- rktpar);
- rd_kafka_toppar_unlock(rktp);
- }
-
- if (rktpar->err ==
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT)
- retry_unstable++;
-
-
- if (rktpar->metadata)
- rd_free(rktpar->metadata);
-
- if (RD_KAFKAP_STR_IS_NULL(&metadata)) {
- rktpar->metadata = NULL;
- rktpar->metadata_size = 0;
- } else {
- rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata);
- rktpar->metadata_size =
- RD_KAFKAP_STR_LEN(&metadata);
- }
-
- /* Loose ref from get_toppar() */
- if (rktp)
- rd_kafka_toppar_destroy(rktp);
- }
-
- rd_kafka_buf_skip_tags(rkbuf);
- }
-
- if (ApiVersion >= 2) {
- int16_t ErrorCode;
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
- if (ErrorCode) {
- err = ErrorCode;
- goto err;
- }
- }
-
-
-err:
- if (!*offsets)
- rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", "OffsetFetch returned %s",
- rd_kafka_err2str(err));
- else
- rd_rkb_dbg(rkb, TOPIC, "OFFFETCH",
- "OffsetFetch for %d/%d partition(s) "
- "(%d unstable partition(s)) returned %s",
- seen_cnt, (*offsets)->cnt, retry_unstable,
- rd_kafka_err2str(err));
-
- actions =
- rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- /* Re-query for coordinator */
- rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, RD_KAFKA_NO_REPLYQ,
- RD_KAFKA_OP_COORD_QUERY, err);
- }
-
- if (actions & RD_KAFKA_ERR_ACTION_RETRY || retry_unstable) {
- if (allow_retry && rd_kafka_buf_retry(rkb, request))
- return RD_KAFKA_RESP_ERR__IN_PROGRESS;
- /* FALLTHRU */
- }
-
- return err;
-
-err_parse:
- err = rkbuf->rkbuf_err;
- goto err;
-}
-
-
-
-/**
- * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH
- * rko in \p opaque.
- *
- * @param opaque rko wrapper for handle_OffsetFetch.
- *
- * The \c rko->rko_u.offset_fetch.partitions list will be filled in with
- * the fetched offsets.
- *
- * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH.
- *
- * @remark \p rkb, \p rkbuf and \p request are optional.
- *
- * @remark The \p request buffer may be retried on error.
- *
- * @locality cgrp's broker thread
- */
-void rd_kafka_op_handle_OffsetFetch(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- rd_kafka_op_t *rko = opaque;
- rd_kafka_op_t *rko_reply;
- rd_kafka_topic_partition_list_t *offsets;
-
- RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH);
-
- if (err == RD_KAFKA_RESP_ERR__DESTROY) {
- /* Termination, quick cleanup. */
- rd_kafka_op_destroy(rko);
- return;
- }
-
- offsets = rd_kafka_topic_partition_list_copy(
- rko->rko_u.offset_fetch.partitions);
-
- /* If all partitions already had usable offsets then there
- * was no request sent and thus no reply, the offsets list is
- * good to go.. */
- if (rkbuf) {
- /* ..else parse the response (or perror) */
- err = rd_kafka_handle_OffsetFetch(
- rkb->rkb_rk, rkb, err, rkbuf, request, &offsets,
- rd_false /*dont update rktp*/, rd_false /*dont add part*/,
- /* Allow retries if replyq is valid */
- rd_kafka_op_replyq_is_valid(rko));
- if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
- if (offsets)
- rd_kafka_topic_partition_list_destroy(offsets);
- return; /* Retrying */
- }
- }
-
- rko_reply =
- rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY);
- rko_reply->rko_err = err;
- rko_reply->rko_u.offset_fetch.partitions = offsets;
- rko_reply->rko_u.offset_fetch.do_free = 1;
- if (rko->rko_rktp)
- rko_reply->rko_rktp = rd_kafka_toppar_keep(rko->rko_rktp);
-
- rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0);
-
- rd_kafka_op_destroy(rko);
-}
-
-/**
- * Send OffsetFetchRequest for a consumer group id.
- *
- * Any partition with a usable offset will be ignored, if all partitions
- * have usable offsets then no request is sent at all but an empty
- * reply is enqueued on the replyq.
- *
- * @param group_id Request offset for this group id.
- * @param parts (optional) List of topic partitions to request,
- * or NULL to return all topic partitions associated with the
- * group.
- * @param require_stable_offsets Whether broker should return stable offsets
- * (transaction-committed).
- * @param timeout Optional timeout to set to the buffer.
- */
-void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb,
- const char *group_id,
- rd_kafka_topic_partition_list_t *parts,
- rd_bool_t require_stable_offsets,
- int timeout,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion;
- size_t parts_size = 0;
- int PartCnt = -1;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_OffsetFetch, 0, 7, NULL);
-
- if (parts) {
- parts_size = parts->cnt * 32;
- }
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_OffsetFetch, 1,
- /* GroupId + rd_kafka_buf_write_arraycnt_pos +
- * Topics + RequireStable */
- 32 + 4 + parts_size + 1, ApiVersion >= 6 /*flexver*/);
-
- /* ConsumerGroup */
- rd_kafka_buf_write_str(rkbuf, group_id, -1);
-
- if (parts) {
- /* Sort partitions by topic */
- rd_kafka_topic_partition_list_sort_by_topic(parts);
-
- /* Write partition list, filtering out partitions with valid
- * offsets */
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- PartCnt = rd_kafka_buf_write_topic_partitions(
- rkbuf, parts, rd_false /*include invalid offsets*/,
- rd_false /*skip valid offsets */, fields);
- } else {
- rd_kafka_buf_write_arraycnt_pos(rkbuf);
- }
-
- if (ApiVersion >= 7) {
- /* RequireStable */
- rd_kafka_buf_write_i8(rkbuf, require_stable_offsets);
- }
-
- if (PartCnt == 0) {
- /* No partitions needs OffsetFetch, enqueue empty
- * response right away. */
- rkbuf->rkbuf_replyq = replyq;
- rkbuf->rkbuf_cb = resp_cb;
- rkbuf->rkbuf_opaque = opaque;
- rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
- return;
- }
-
- if (timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- if (parts) {
- rd_rkb_dbg(
- rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
- "OFFSET",
- "Group %s OffsetFetchRequest(v%d) for %d/%d partition(s)",
- group_id, ApiVersion, PartCnt, parts->cnt);
- } else {
- rd_rkb_dbg(
- rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
- "OFFSET",
- "Group %s OffsetFetchRequest(v%d) for all partitions",
- group_id, ApiVersion);
- }
-
- /* Let handler decide if retries should be performed */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
-
- if (parts) {
- rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
- "Fetch committed offsets for %d/%d partition(s)",
- PartCnt, parts->cnt);
- } else {
- rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
- "Fetch committed offsets all the partitions");
- }
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-/**
- * @brief Handle per-partition OffsetCommit errors and returns actions flags.
- */
-static int
-rd_kafka_handle_OffsetCommit_error(rd_kafka_broker_t *rkb,
- rd_kafka_buf_t *request,
- const rd_kafka_topic_partition_t *rktpar) {
-
- /* These actions are mimicking AK's ConsumerCoordinator.java */
-
- return rd_kafka_err_action(
- rkb, rktpar->err, request,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
-
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,
-
-
- RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
-
- RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
-
- /* .._SPECIAL: mark coordinator dead, refresh and retry */
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_SPECIAL,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_SPECIAL,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
-
- /* Replicas possibly unavailable:
- * Refresh coordinator (but don't mark as dead (!.._SPECIAL)),
- * and retry */
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
-
-
- /* FIXME: There are some cases in the Java code where
- * this is not treated as a fatal error. */
- RD_KAFKA_ERR_ACTION_PERMANENT | RD_KAFKA_ERR_ACTION_FATAL,
- RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
-
-
- RD_KAFKA_ERR_ACTION_PERMANENT,
- RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,
-
-
- RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
-
- RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
-
- RD_KAFKA_ERR_ACTION_END);
-}
-
-
-/**
- * @brief Handle OffsetCommit response.
- *
- * @remark \p offsets may be NULL if \p err is set
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if all partitions were successfully
- * committed,
- * RD_KAFKA_RESP_ERR__IN_PROGRESS if a retry was scheduled,
- * or any other error code if the request was not retried.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_OffsetCommit(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- rd_kafka_topic_partition_list_t *offsets,
- rd_bool_t ignore_cgrp) {
- const int log_decode_errors = LOG_ERR;
- int32_t TopicArrayCnt;
- int errcnt = 0;
- int partcnt = 0;
- int i;
- int actions = 0;
-
- if (err)
- goto err;
-
- if (rd_kafka_buf_ApiVersion(rkbuf) >= 3)
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- for (i = 0; i < TopicArrayCnt; i++) {
- rd_kafkap_str_t topic;
- char *topic_str;
- int32_t PartArrayCnt;
- int j;
-
- rd_kafka_buf_read_str(rkbuf, &topic);
- rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);
-
- RD_KAFKAP_STR_DUPA(&topic_str, &topic);
-
- for (j = 0; j < PartArrayCnt; j++) {
- int32_t partition;
- int16_t ErrorCode;
- rd_kafka_topic_partition_t *rktpar;
-
- rd_kafka_buf_read_i32(rkbuf, &partition);
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
- rktpar = rd_kafka_topic_partition_list_find(
- offsets, topic_str, partition);
-
- if (!rktpar) {
- /* Received offset for topic/partition we didn't
- * ask for, this shouldn't really happen. */
- continue;
- }
-
- rktpar->err = ErrorCode;
- if (ErrorCode) {
- err = ErrorCode;
- errcnt++;
-
- /* Accumulate actions for per-partition
- * errors. */
- actions |= rd_kafka_handle_OffsetCommit_error(
- rkb, request, rktpar);
- }
-
- partcnt++;
- }
- }
-
- /* If all partitions failed use error code
- * from last partition as the global error. */
- if (offsets && err && errcnt == partcnt)
- goto err;
-
- goto done;
-
-err_parse:
- err = rkbuf->rkbuf_err;
-
-err:
- if (!actions) /* Transport/Request-level error */
- actions = rd_kafka_err_action(rkb, err, request,
-
- RD_KAFKA_ERR_ACTION_REFRESH |
- RD_KAFKA_ERR_ACTION_SPECIAL |
- RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR__TRANSPORT,
-
- RD_KAFKA_ERR_ACTION_END);
-
- if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_FATAL)) {
- rd_kafka_set_fatal_error(rk, err, "OffsetCommit failed: %s",
- rd_kafka_err2str(err));
- return err;
- }
-
- if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_REFRESH) &&
- rk->rk_cgrp) {
- /* Mark coordinator dead or re-query for coordinator.
- * ..dead() will trigger a re-query. */
- if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
- rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
- "OffsetCommitRequest failed");
- else
- rd_kafka_cgrp_coord_query(rk->rk_cgrp,
- "OffsetCommitRequest failed");
- }
-
- if (!ignore_cgrp && actions & RD_KAFKA_ERR_ACTION_RETRY &&
- !(actions & RD_KAFKA_ERR_ACTION_PERMANENT) &&
- rd_kafka_buf_retry(rkb, request))
- return RD_KAFKA_RESP_ERR__IN_PROGRESS;
-
-done:
- return err;
-}
-
-/**
- * @brief Send OffsetCommitRequest for a list of partitions.
- *
- * @param cgmetadata consumer group metadata.
- *
- * @param offsets - offsets to commit for each topic-partition.
- *
- * @returns 0 if none of the partitions in \p offsets had valid offsets,
- * else 1.
- */
-int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb,
- rd_kafka_consumer_group_metadata_t *cgmetadata,
- rd_kafka_topic_partition_list_t *offsets,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque,
- const char *reason) {
- rd_kafka_buf_t *rkbuf;
- ssize_t of_TopicCnt = -1;
- int TopicCnt = 0;
- const char *last_topic = NULL;
- ssize_t of_PartCnt = -1;
- int PartCnt = 0;
- int tot_PartCnt = 0;
- int i;
- int16_t ApiVersion;
- int features;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features);
-
- rd_kafka_assert(NULL, offsets != NULL);
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1,
- 100 + (offsets->cnt * 128));
-
- /* ConsumerGroup */
- rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1);
-
- /* v1,v2 */
- if (ApiVersion >= 1) {
- /* ConsumerGroupGenerationId */
- rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id);
- /* ConsumerId */
- rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1);
- }
-
- /* v7: GroupInstanceId */
- if (ApiVersion >= 7)
- rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id,
- -1);
-
- /* v2-4: RetentionTime */
- if (ApiVersion >= 2 && ApiVersion <= 4)
- rd_kafka_buf_write_i64(rkbuf, -1);
-
- /* Sort offsets by topic */
- rd_kafka_topic_partition_list_sort_by_topic(offsets);
-
- /* TopicArrayCnt: Will be updated when we know the number of topics. */
- of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
- for (i = 0; i < offsets->cnt; i++) {
- rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
-
- /* Skip partitions with invalid offset. */
- if (rktpar->offset < 0)
- continue;
-
- if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
- /* New topic */
-
- /* Finalize previous PartitionCnt */
- if (PartCnt > 0)
- rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
- PartCnt);
-
- /* TopicName */
- rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
- /* PartitionCnt, finalized later */
- of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
- PartCnt = 0;
- last_topic = rktpar->topic;
- TopicCnt++;
- }
-
- /* Partition */
- rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
- PartCnt++;
- tot_PartCnt++;
-
- /* Offset */
- rd_kafka_buf_write_i64(rkbuf, rktpar->offset);
-
- /* v6: KIP-101 CommittedLeaderEpoch */
- if (ApiVersion >= 6)
- rd_kafka_buf_write_i32(
- rkbuf,
- rd_kafka_topic_partition_get_leader_epoch(rktpar));
-
- /* v1: TimeStamp */
- if (ApiVersion == 1)
- rd_kafka_buf_write_i64(rkbuf, -1);
-
- /* Metadata */
- /* Java client 0.9.0 and broker <0.10.0 can't parse
- * Null metadata fields, so as a workaround we send an
- * empty string if it's Null. */
- if (!rktpar->metadata)
- rd_kafka_buf_write_str(rkbuf, "", 0);
- else
- rd_kafka_buf_write_str(rkbuf, rktpar->metadata,
- rktpar->metadata_size);
- }
-
- if (tot_PartCnt == 0) {
- /* No topic+partitions had valid offsets to commit. */
- rd_kafka_replyq_destroy(&replyq);
- rd_kafka_buf_destroy(rkbuf);
- return 0;
- }
-
- /* Finalize previous PartitionCnt */
- if (PartCnt > 0)
- rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);
-
- /* Finalize TopicCnt */
- rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_rkb_dbg(rkb, TOPIC, "OFFSET",
- "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
- ApiVersion, tot_PartCnt, offsets->cnt, reason);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return 1;
-}
-
-/**
- * @brief Construct and send OffsetDeleteRequest to \p rkb
- * with the partitions in del_grpoffsets (DeleteConsumerGroupOffsets_t*)
- * using \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @remark Only one del_grpoffsets element is supported.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb,
- /** (rd_kafka_DeleteConsumerGroupOffsets_t*) */
- const rd_list_t *del_grpoffsets,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
- const rd_kafka_DeleteConsumerGroupOffsets_t *grpoffsets =
- rd_list_elem(del_grpoffsets, 0);
-
- rd_assert(rd_list_cnt(del_grpoffsets) == 1);
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_OffsetDelete, 0, 0, &features);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "OffsetDelete API (KIP-496) not supported "
- "by broker, requires broker version >= 2.4.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(
- rkb, RD_KAFKAP_OffsetDelete, 1,
- 2 + strlen(grpoffsets->group) + (64 * grpoffsets->partitions->cnt));
-
- /* GroupId */
- rd_kafka_buf_write_str(rkbuf, grpoffsets->group, -1);
-
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- rd_kafka_buf_write_topic_partitions(
- rkbuf, grpoffsets->partitions,
- rd_false /*dont skip invalid offsets*/, rd_false /*any offset*/,
- fields);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-/**
- * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
- * enveloping buffer \p rkbuf.
- */
-static void
-rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf,
- const rd_kafka_group_member_t *rkgm) {
- rd_kafka_buf_t *rkbuf;
- rd_slice_t slice;
-
- rkbuf = rd_kafka_buf_new(1, 100);
- rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
- rd_assert(rkgm->rkgm_assignment);
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- rd_kafka_buf_write_topic_partitions(
- rkbuf, rkgm->rkgm_assignment,
- rd_false /*don't skip invalid offsets*/, rd_false /* any offset */,
- fields);
- rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);
-
- /* Get pointer to binary buffer */
- rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);
-
- /* Write binary buffer as Kafka Bytes to enveloping buffer. */
- rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
- rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);
-
- rd_kafka_buf_destroy(rkbuf);
-}
-
-/**
- * Send SyncGroupRequest
- */
-void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- int32_t generation_id,
- const rd_kafkap_str_t *member_id,
- const rd_kafkap_str_t *group_instance_id,
- const rd_kafka_group_member_t *assignments,
- int assignment_cnt,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int i;
- int16_t ApiVersion;
- int features;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_SyncGroup, 0, 3, &features);
-
- rkbuf = rd_kafka_buf_new_request(
- rkb, RD_KAFKAP_SyncGroup, 1,
- RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ +
- RD_KAFKAP_STR_SIZE(member_id) +
- RD_KAFKAP_STR_SIZE(group_instance_id) +
- 4 /* array size group_assignment */ +
- (assignment_cnt * 100 /*guess*/));
- rd_kafka_buf_write_kstr(rkbuf, group_id);
- rd_kafka_buf_write_i32(rkbuf, generation_id);
- rd_kafka_buf_write_kstr(rkbuf, member_id);
- if (ApiVersion >= 3)
- rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
- rd_kafka_buf_write_i32(rkbuf, assignment_cnt);
-
- for (i = 0; i < assignment_cnt; i++) {
- const rd_kafka_group_member_t *rkgm = &assignments[i];
-
- rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
- rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
- }
-
- /* This is a blocking request */
- rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
- rd_kafka_buf_set_abs_timeout(
- rkbuf,
- rkb->rkb_rk->rk_conf.group_session_timeout_ms +
- 3000 /* 3s grace period*/,
- 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-/**
- * Send JoinGroupRequest
- */
-void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- const rd_kafkap_str_t *member_id,
- const rd_kafkap_str_t *group_instance_id,
- const rd_kafkap_str_t *protocol_type,
- const rd_list_t *topics,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- rd_kafka_t *rk = rkb->rkb_rk;
- rd_kafka_assignor_t *rkas;
- int i;
- int16_t ApiVersion = 0;
- int features;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_JoinGroup, 0, 5, &features);
-
-
- rkbuf = rd_kafka_buf_new_request(
- rkb, RD_KAFKAP_JoinGroup, 1,
- RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ +
- 4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) +
- RD_KAFKAP_STR_SIZE(group_instance_id) +
- RD_KAFKAP_STR_SIZE(protocol_type) +
- 4 /* array count GroupProtocols */ +
- (rd_list_cnt(topics) * 100));
- rd_kafka_buf_write_kstr(rkbuf, group_id);
- rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
- if (ApiVersion >= 1)
- rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms);
- rd_kafka_buf_write_kstr(rkbuf, member_id);
- if (ApiVersion >= 5)
- rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
- rd_kafka_buf_write_kstr(rkbuf, protocol_type);
- rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);
-
- RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
- rd_kafkap_bytes_t *member_metadata;
- if (!rkas->rkas_enabled)
- continue;
- rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
- member_metadata = rkas->rkas_get_metadata_cb(
- rkas, rk->rk_cgrp->rkcg_assignor_state, topics,
- rk->rk_cgrp->rkcg_group_assignment);
- rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
- rd_kafkap_bytes_destroy(member_metadata);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- if (ApiVersion < 1 &&
- rk->rk_conf.max_poll_interval_ms >
- rk->rk_conf.group_session_timeout_ms &&
- rd_interval(&rkb->rkb_suppress.unsupported_kip62,
- /* at most once per day */
- (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
- rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL",
- "Broker does not support KIP-62 "
- "(requires Apache Kafka >= v0.10.1.0): "
- "consumer configuration "
- "`max.poll.interval.ms` (%d) "
- "is effectively limited "
- "by `session.timeout.ms` (%d) "
- "with this broker version",
- rk->rk_conf.max_poll_interval_ms,
- rk->rk_conf.group_session_timeout_ms);
-
-
- if (ApiVersion < 5 && rk->rk_conf.group_instance_id &&
- rd_interval(&rkb->rkb_suppress.unsupported_kip345,
- /* at most once per day */
- (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
- rd_rkb_log(rkb, LOG_NOTICE, "STATICMEMBER",
- "Broker does not support KIP-345 "
- "(requires Apache Kafka >= v2.3.0): "
- "consumer configuration "
- "`group.instance.id` (%s) "
- "will not take effect",
- rk->rk_conf.group_instance_id);
-
- /* Absolute timeout */
- rd_kafka_buf_set_abs_timeout_force(
- rkbuf,
- /* Request timeout is max.poll.interval.ms + grace
- * if the broker supports it, else
- * session.timeout.ms + grace. */
- (ApiVersion >= 1 ? rk->rk_conf.max_poll_interval_ms
- : rk->rk_conf.group_session_timeout_ms) +
- 3000 /* 3s grace period*/,
- 0);
-
- /* This is a blocking request */
- rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-/**
- * Send LeaveGroupRequest
- */
-void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
- const char *group_id,
- const char *member_id,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features);
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300);
-
- rd_kafka_buf_write_str(rkbuf, group_id, -1);
- rd_kafka_buf_write_str(rkbuf, member_id, -1);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- /* LeaveGroupRequests are best-effort, the local consumer
- * does not care if it succeeds or not, so the request timeout
- * is shortened.
- * Retries are not needed. */
- rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0);
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-/**
- * Handler for LeaveGroup responses
- * opaque must be the cgrp handle.
- */
-void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- rd_kafka_cgrp_t *rkcg = opaque;
- const int log_decode_errors = LOG_ERR;
- int16_t ErrorCode = 0;
- int actions;
-
- if (err) {
- ErrorCode = err;
- goto err;
- }
-
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
-
-err:
- actions = rd_kafka_err_action(rkb, ErrorCode, request,
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- /* Re-query for coordinator */
- rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
- RD_KAFKA_OP_COORD_QUERY, ErrorCode);
- }
-
- if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
- if (rd_kafka_buf_retry(rkb, request))
- return;
- /* FALLTHRU */
- }
-
- if (ErrorCode)
- rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
- "LeaveGroup response: %s",
- rd_kafka_err2str(ErrorCode));
-
- return;
-
-err_parse:
- ErrorCode = rkbuf->rkbuf_err;
- goto err;
-}
-
-
-
-/**
- * Send HeartbeatRequest
- */
-void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb,
- const rd_kafkap_str_t *group_id,
- int32_t generation_id,
- const rd_kafkap_str_t *member_id,
- const rd_kafkap_str_t *group_instance_id,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_Heartbeat, 0, 3, &features);
-
- rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
- "Heartbeat for group \"%s\" generation id %" PRId32,
- group_id->str, generation_id);
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1,
- RD_KAFKAP_STR_SIZE(group_id) +
- 4 /* GenerationId */ +
- RD_KAFKAP_STR_SIZE(member_id));
-
- rd_kafka_buf_write_kstr(rkbuf, group_id);
- rd_kafka_buf_write_i32(rkbuf, generation_id);
- rd_kafka_buf_write_kstr(rkbuf, member_id);
- if (ApiVersion >= 3)
- rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_buf_set_abs_timeout(
- rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-}
-
-
-
-/**
- * @brief Construct and send ListGroupsRequest to \p rkb
- * with the states (const char *) in \p states.
- * Uses \p max_ApiVersion as maximum API version,
- * pass -1 to use the maximum available version.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @return NULL on success, a new error instance that must be
- * released with rd_kafka_error_destroy() in case of error.
- */
-rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
- int16_t max_ApiVersion,
- const char **states,
- size_t states_cnt,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- size_t i;
-
- if (max_ApiVersion < 0)
- max_ApiVersion = 4;
-
- if (max_ApiVersion > ApiVersion) {
- /* Remark: don't check if max_ApiVersion is zero.
- * As rd_kafka_broker_ApiVersion_supported cannot be checked
- * in the application thread reliably . */
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL);
- }
-
- if (ApiVersion == -1) {
- return rd_kafka_error_new(
- RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
- "ListGroupsRequest not supported by broker");
- }
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_ListGroups, 1,
- /* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */
- 4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */);
-
- if (ApiVersion >= 4) {
- size_t of_GroupsArrayCnt =
- rd_kafka_buf_write_arraycnt_pos(rkbuf);
- for (i = 0; i < states_cnt; i++) {
- rd_kafka_buf_write_str(rkbuf, states[i], -1);
- }
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
- return NULL;
-}
-
-/**
- * @brief Construct and send DescribeGroupsRequest to \p rkb
- * with the groups (const char *) in \p groups.
- * Uses \p max_ApiVersion as maximum API version,
- * pass -1 to use the maximum available version.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @return NULL on success, a new error instance that must be
- * released with rd_kafka_error_destroy() in case of error.
- */
-rd_kafka_error_t *rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,
- int16_t max_ApiVersion,
- char **groups,
- size_t group_cnt,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- size_t of_GroupsArrayCnt;
-
- if (max_ApiVersion < 0)
- max_ApiVersion = 4;
-
- if (max_ApiVersion > ApiVersion) {
- /* Remark: don't check if max_ApiVersion is zero.
- * As rd_kafka_broker_ApiVersion_supported cannot be checked
- * in the application thread reliably . */
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DescribeGroups, 0, max_ApiVersion, NULL);
- }
-
- if (ApiVersion == -1) {
- return rd_kafka_error_new(
- RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
- "DescribeGroupsRequest not supported by broker");
- }
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_DescribeGroups, 1,
- 4 /* rd_kafka_buf_write_arraycnt_pos */ +
- 1 /* IncludeAuthorizedOperations */ + 1 /* tags */ +
- 32 * group_cnt /* Groups */,
- rd_false);
-
- /* write Groups */
- of_GroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, group_cnt);
- while (group_cnt-- > 0)
- rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);
-
- /* write IncludeAuthorizedOperations */
- if (ApiVersion >= 3) {
- /* TODO: implement KIP-430 */
- rd_kafka_buf_write_bool(rkbuf, rd_false);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
- return NULL;
-}
-
-/**
- * @brief Generic handler for Metadata responses
- *
- * @locality rdkafka main thread
- */
-static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- rd_kafka_op_t *rko = opaque; /* Possibly NULL */
- struct rd_kafka_metadata *md = NULL;
- const rd_list_t *topics = request->rkbuf_u.Metadata.topics;
- int actions;
-
- rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
- thrd_is_current(rk->rk_thread));
-
- /* Avoid metadata updates when we're terminating. */
- if (rd_kafka_terminating(rkb->rkb_rk) ||
- err == RD_KAFKA_RESP_ERR__DESTROY) {
- /* Terminating */
- goto done;
- }
-
- if (err)
- goto err;
-
- if (!topics)
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "===== Received metadata: %s =====",
- request->rkbuf_u.Metadata.reason);
- else
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "===== Received metadata "
- "(for %d requested topics): %s =====",
- rd_list_cnt(topics),
- request->rkbuf_u.Metadata.reason);
-
- err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md);
- if (err)
- goto err;
-
- if (rko && rko->rko_replyq.q) {
- /* Reply to metadata requester, passing on the metadata.
- * Reuse requesting rko for the reply. */
- rko->rko_err = err;
- rko->rko_u.metadata.md = md;
-
- rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
- rko = NULL;
- } else {
- if (md)
- rd_free(md);
- }
-
- goto done;
-
-err:
- actions = rd_kafka_err_action(rkb, err, request,
-
- RD_KAFKA_ERR_ACTION_RETRY,
- RD_KAFKA_RESP_ERR__PARTIAL,
-
- RD_KAFKA_ERR_ACTION_END);
-
- if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
- if (rd_kafka_buf_retry(rkb, request))
- return;
- /* FALLTHRU */
- } else {
- rd_rkb_log(rkb, LOG_WARNING, "METADATA",
- "Metadata request failed: %s: %s (%dms): %s",
- request->rkbuf_u.Metadata.reason,
- rd_kafka_err2str(err),
- (int)(request->rkbuf_ts_sent / 1000),
- rd_kafka_actions2str(actions));
- /* Respond back to caller on non-retriable errors */
- if (rko && rko->rko_replyq.q) {
- rko->rko_err = err;
- rko->rko_u.metadata.md = NULL;
- rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
- rko = NULL;
- }
- }
-
-
-
- /* FALLTHRU */
-
-done:
- if (rko)
- rd_kafka_op_destroy(rko);
-}
-
-
-/**
- * @brief Construct MetadataRequest (does not send)
- *
- * \p topics is a list of topic names (char *) to request.
- *
- * !topics - only request brokers (if supported by broker, else
- * all topics)
- * topics.cnt==0 - all topics in cluster are requested
- * topics.cnt >0 - only specified topics are requested
- *
- * @param reason - metadata request reason
- * @param allow_auto_create_topics - allow broker-side auto topic creation.
- * This is best-effort, depending on broker
- * config and version.
- * @param cgrp_update - Update cgrp in parse_Metadata (see comment there).
- * @param rko - (optional) rko with replyq for handling response.
- * Specifying an rko forces a metadata request even if
- * there is already a matching one in-transit.
- *
- * If full metadata for all topics is requested (or all brokers, which
- * results in all-topics on older brokers) and there is already a full request
- * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
- * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
- * is sent regardless.
- */
-rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *topics,
- const char *reason,
- rd_bool_t allow_auto_create_topics,
- rd_bool_t cgrp_update,
- rd_kafka_op_t *rko) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- size_t of_TopicArrayCnt;
- int features;
- int topic_cnt = topics ? rd_list_cnt(topics) : 0;
- int *full_incr = NULL;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_Metadata, 0, 9, &features);
-
- rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
- 4 + (50 * topic_cnt) + 1,
- ApiVersion >= 9);
-
- if (!reason)
- reason = "";
-
- rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason);
- rkbuf->rkbuf_u.Metadata.cgrp_update = cgrp_update;
-
- /* TopicArrayCnt */
- of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
-
- if (!topics) {
- /* v0: keep 0, brokers only not available,
- * request all topics */
- /* v1-8: 0 means empty array, brokers only */
- if (ApiVersion >= 9) {
- /* v9+: varint encoded empty array (1), brokers only */
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
- topic_cnt);
- }
-
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "Request metadata for brokers only: %s", reason);
- full_incr =
- &rkb->rkb_rk->rk_metadata_cache.rkmc_full_brokers_sent;
-
- } else if (topic_cnt == 0) {
- /* v0: keep 0, request all topics */
- if (ApiVersion >= 1 && ApiVersion < 9) {
- /* v1-8: update to -1, all topics */
- rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, -1);
- }
- /* v9+: keep 0, varint encoded null, all topics */
-
- rkbuf->rkbuf_u.Metadata.all_topics = 1;
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "Request metadata for all topics: "
- "%s",
- reason);
-
- if (!rko)
- full_incr = &rkb->rkb_rk->rk_metadata_cache
- .rkmc_full_topics_sent;
-
- } else {
- /* request cnt topics */
- rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
- topic_cnt);
-
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "Request metadata for %d topic(s): "
- "%s",
- topic_cnt, reason);
- }
-
- if (full_incr) {
- /* Avoid multiple outstanding full requests
- * (since they are redundant and side-effect-less).
- * Forced requests (app using metadata() API) are passed
- * through regardless. */
-
- mtx_lock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
- if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
- mtx_unlock(
- &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
- rd_rkb_dbg(rkb, METADATA, "METADATA",
- "Skipping metadata request: %s: "
- "full request already in-transit",
- reason);
- rd_kafka_buf_destroy(rkbuf);
- return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
- }
-
- (*full_incr)++;
- mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
- rkbuf->rkbuf_u.Metadata.decr = full_incr;
- rkbuf->rkbuf_u.Metadata.decr_lock =
- &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock;
- }
-
-
- if (topic_cnt > 0) {
- char *topic;
- int i;
-
- /* Maintain a copy of the topics list so we can purge
- * hints from the metadata cache on error. */
- rkbuf->rkbuf_u.Metadata.topics =
- rd_list_copy(topics, rd_list_string_copy, NULL);
-
- RD_LIST_FOREACH(topic, topics, i) {
- rd_kafka_buf_write_str(rkbuf, topic, -1);
- /* Tags for previous topic */
- rd_kafka_buf_write_tags(rkbuf);
- }
- }
-
- if (ApiVersion >= 4) {
- /* AllowAutoTopicCreation */
- rd_kafka_buf_write_bool(rkbuf, allow_auto_create_topics);
-
- } else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER &&
- !rkb->rkb_rk->rk_conf.allow_auto_create_topics &&
- rd_kafka_conf_is_modified(&rkb->rkb_rk->rk_conf,
- "allow.auto.create.topics") &&
- rd_interval(
- &rkb->rkb_rk->rk_suppress.allow_auto_create_topics,
- 30 * 60 * 1000 /* every 30 minutes */, 0) >= 0) {
- /* Let user know we can't obey allow.auto.create.topics */
- rd_rkb_log(rkb, LOG_WARNING, "AUTOCREATE",
- "allow.auto.create.topics=false not supported "
- "by broker: requires broker version >= 0.11.0.0: "
- "requested topic(s) may be auto created depending "
- "on broker auto.create.topics.enable configuration");
- }
-
- if (ApiVersion >= 8 && ApiVersion < 10) {
- /* TODO: implement KIP-430 */
- /* IncludeClusterAuthorizedOperations */
- rd_kafka_buf_write_bool(rkbuf, rd_false);
- }
-
- if (ApiVersion >= 8) {
- /* TODO: implement KIP-430 */
- /* IncludeTopicAuthorizedOperations */
- rd_kafka_buf_write_bool(rkbuf, rd_false);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- /* Metadata requests are part of the important control plane
- * and should go before most other requests (Produce, Fetch, etc). */
- rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
- /* Handle response thru rk_ops,
- * but forward parsed result to
- * rko's replyq when done. */
- RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0),
- rd_kafka_handle_Metadata, rko);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-/**
- * @brief Parses and handles ApiVersion reply.
- *
- * @param apis will be allocated, populated and sorted
- * with broker's supported APIs, or set to NULL.
- * @param api_cnt will be set to the number of elements in \p *apis
- *
- * @returns 0 on success, else an error.
- *
- * @remark A valid \p apis might be returned even if an error is returned.
- */
-rd_kafka_resp_err_t
-rd_kafka_handle_ApiVersion(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- struct rd_kafka_ApiVersion **apis,
- size_t *api_cnt) {
- const int log_decode_errors = LOG_DEBUG;
- int32_t ApiArrayCnt;
- int16_t ErrorCode;
- int i = 0;
-
- *apis = NULL;
- *api_cnt = 0;
-
- if (err)
- goto err;
-
- rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
- err = ErrorCode;
-
- rd_kafka_buf_read_arraycnt(rkbuf, &ApiArrayCnt, 1000);
- if (err && ApiArrayCnt < 1) {
- /* Version >=3 returns the ApiVersions array if the error
- * code is ERR_UNSUPPORTED_VERSION, previous versions don't */
- goto err;
- }
-
- rd_rkb_dbg(rkb, FEATURE, "APIVERSION", "Broker API support:");
-
- *apis = rd_malloc(sizeof(**apis) * ApiArrayCnt);
-
- for (i = 0; i < ApiArrayCnt; i++) {
- struct rd_kafka_ApiVersion *api = &(*apis)[i];
-
- rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
- rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
- rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);
-
- rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
- " ApiKey %s (%hd) Versions %hd..%hd",
- rd_kafka_ApiKey2str(api->ApiKey), api->ApiKey,
- api->MinVer, api->MaxVer);
-
- /* Discard struct tags */
- rd_kafka_buf_skip_tags(rkbuf);
- }
-
- if (request->rkbuf_reqhdr.ApiVersion >= 1)
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- /* Discard end tags */
- rd_kafka_buf_skip_tags(rkbuf);
-
- *api_cnt = ApiArrayCnt;
- qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);
-
- goto done;
-
-err_parse:
- /* If the broker does not support our ApiVersionRequest version it
- * will respond with a version 0 response, which will most likely
- * fail parsing. Instead of propagating the parse error we
- * propagate the original error, unless there isn't one in which case
- * we use the parse error. */
- if (!err)
- err = rkbuf->rkbuf_err;
-err:
- /* There are no retryable errors. */
-
- if (*apis)
- rd_free(*apis);
-
- *apis = NULL;
- *api_cnt = 0;
-
-done:
- return err;
-}
-
-
-
-/**
- * @brief Send ApiVersionRequest (KIP-35)
- *
- * @param ApiVersion If -1 use the highest supported version, else use the
- * specified value.
- */
-void rd_kafka_ApiVersionRequest(rd_kafka_broker_t *rkb,
- int16_t ApiVersion,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
-
- if (ApiVersion == -1)
- ApiVersion = 3;
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_ApiVersion, 1, 3, ApiVersion >= 3 /*flexver*/);
-
- if (ApiVersion >= 3) {
- /* KIP-511 adds software name and version through the optional
- * protocol fields defined in KIP-482. */
-
- /* ClientSoftwareName */
- rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_name, -1);
-
- /* ClientSoftwareVersion */
- rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_version,
- -1);
- }
-
- /* Should be sent before any other requests since it is part of
- * the initial connection handshake. */
- rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
-
- /* Non-supporting brokers will tear down the connection when they
- * receive an unknown API request, so dont retry request on failure. */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- /* 0.9.0.x brokers will not close the connection on unsupported
- * API requests, so we minimize the timeout for the request.
- * This is a regression on the broker part. */
- rd_kafka_buf_set_abs_timeout(
- rkbuf, rkb->rkb_rk->rk_conf.api_version_request_timeout_ms, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- if (replyq.q)
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
- opaque);
- else /* in broker thread */
- rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
-}
-
-
-/**
- * Send SaslHandshakeRequest (KIP-43)
- */
-void rd_kafka_SaslHandshakeRequest(rd_kafka_broker_t *rkb,
- const char *mechanism,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int mechlen = (int)strlen(mechanism);
- int16_t ApiVersion;
- int features;
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, 1,
- RD_KAFKAP_STR_SIZE0(mechlen));
-
- /* Should be sent before any other requests since it is part of
- * the initial connection handshake. */
- rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
-
- rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
-
- /* Non-supporting brokers will tear down the conneciton when they
- * receive an unknown API request or where the SASL GSSAPI
- * token type is not recognized, so dont retry request on failure. */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- /* 0.9.0.x brokers will not close the connection on unsupported
- * API requests, so we minimize the timeout of the request.
- * This is a regression on the broker part. */
- if (!rkb->rkb_rk->rk_conf.api_version_request &&
- rkb->rkb_rk->rk_conf.socket_timeout_ms > 10 * 1000)
- rd_kafka_buf_set_abs_timeout(rkbuf, 10 * 1000 /*10s*/, 0);
-
- /* ApiVersion 1 / RD_KAFKA_FEATURE_SASL_REQ enables
- * the SaslAuthenticateRequest */
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_SaslHandshake, 0, 1, &features);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- if (replyq.q)
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
- opaque);
- else /* in broker thread */
- rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
-}
-
-
-/**
- * @brief Parses and handles an SaslAuthenticate reply.
- *
- * @returns 0 on success, else an error.
- *
- * @locality broker thread
- * @locks none
- */
-void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- const int log_decode_errors = LOG_ERR;
- int16_t error_code;
- rd_kafkap_str_t error_str;
- rd_kafkap_bytes_t auth_data;
- char errstr[512];
-
- if (err) {
- rd_snprintf(errstr, sizeof(errstr),
- "SaslAuthenticateRequest failed: %s",
- rd_kafka_err2str(err));
- goto err;
- }
-
- rd_kafka_buf_read_i16(rkbuf, &error_code);
- rd_kafka_buf_read_str(rkbuf, &error_str);
-
- if (error_code) {
- /* Authentication failed */
-
- /* For backwards compatibility translate the
- * new broker-side auth error code to our local error code. */
- if (error_code == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED)
- err = RD_KAFKA_RESP_ERR__AUTHENTICATION;
- else
- err = error_code;
-
- rd_snprintf(errstr, sizeof(errstr), "%.*s",
- RD_KAFKAP_STR_PR(&error_str));
- goto err;
- }
-
- rd_kafka_buf_read_bytes(rkbuf, &auth_data);
-
- /* Pass SASL auth frame to SASL handler */
- if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data,
- (size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr,
- sizeof(errstr)) == -1) {
- err = RD_KAFKA_RESP_ERR__AUTHENTICATION;
- goto err;
- }
-
- return;
-
-
-err_parse:
- err = rkbuf->rkbuf_err;
- rd_snprintf(errstr, sizeof(errstr),
- "SaslAuthenticateResponse parsing failed: %s",
- rd_kafka_err2str(err));
-
-err:
- rd_kafka_broker_fail(rkb, LOG_ERR, err, "SASL authentication error: %s",
- errstr);
-}
-
-
-/**
- * @brief Send SaslAuthenticateRequest (KIP-152)
- */
-void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb,
- const void *buf,
- size_t size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0);
-
- /* Should be sent before any other requests since it is part of
- * the initial connection handshake. */
- rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
-
- /* Broker does not support -1 (Null) for this field */
- rd_kafka_buf_write_bytes(rkbuf, buf ? buf : "", size);
-
- /* There are no errors that can be retried, instead
- * close down the connection and reconnect on failure. */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- if (replyq.q)
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
- opaque);
- else /* in broker thread */
- rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
-}
-
-
-
-/**
- * @struct Hold temporary result and return values from ProduceResponse
- */
-struct rd_kafka_Produce_result {
- int64_t offset; /**< Assigned offset of first message */
- int64_t timestamp; /**< (Possibly assigned) offset of first message */
-};
-
-/**
- * @brief Parses a Produce reply.
- * @returns 0 on success or an error code on failure.
- * @locality broker thread
- */
-static rd_kafka_resp_err_t
-rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
- rd_kafka_toppar_t *rktp,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- struct rd_kafka_Produce_result *result) {
- int32_t TopicArrayCnt;
- int32_t PartitionArrayCnt;
- struct {
- int32_t Partition;
- int16_t ErrorCode;
- int64_t Offset;
- } hdr;
- const int log_decode_errors = LOG_ERR;
- int64_t log_start_offset = -1;
-
- rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
- if (TopicArrayCnt != 1)
- goto err;
-
- /* Since we only produce to one single topic+partition in each
- * request we assume that the reply only contains one topic+partition
- * and that it is the same that we requested.
- * If not the broker is buggy. */
- rd_kafka_buf_skip_str(rkbuf);
- rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt);
-
- if (PartitionArrayCnt != 1)
- goto err;
-
- rd_kafka_buf_read_i32(rkbuf, &hdr.Partition);
- rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode);
- rd_kafka_buf_read_i64(rkbuf, &hdr.Offset);
-
- result->offset = hdr.Offset;
-
- result->timestamp = -1;
- if (request->rkbuf_reqhdr.ApiVersion >= 2)
- rd_kafka_buf_read_i64(rkbuf, &result->timestamp);
-
- if (request->rkbuf_reqhdr.ApiVersion >= 5)
- rd_kafka_buf_read_i64(rkbuf, &log_start_offset);
-
- if (request->rkbuf_reqhdr.ApiVersion >= 1) {
- int32_t Throttle_Time;
- rd_kafka_buf_read_i32(rkbuf, &Throttle_Time);
-
- rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep,
- Throttle_Time);
- }
-
-
- return hdr.ErrorCode;
-
-err_parse:
- return rkbuf->rkbuf_err;
-err:
- return RD_KAFKA_RESP_ERR__BAD_MSG;
-}
-
-
-/**
- * @struct Hold temporary Produce error state
- */
-struct rd_kafka_Produce_err {
- rd_kafka_resp_err_t err; /**< Error code */
- int actions; /**< Actions to take */
- int incr_retry; /**< Increase per-message retry cnt */
- rd_kafka_msg_status_t status; /**< Messages persistence status */
-
- /* Idempotent Producer */
- int32_t next_ack_seq; /**< Next expected sequence to ack */
- int32_t next_err_seq; /**< Next expected error sequence */
- rd_bool_t update_next_ack; /**< Update next_ack_seq */
- rd_bool_t update_next_err; /**< Update next_err_seq */
- rd_kafka_pid_t rktp_pid; /**< Partition's current PID */
- int32_t last_seq; /**< Last sequence in current batch */
-};
-
-
-/**
- * @brief Error-handling for Idempotent Producer-specific Produce errors.
- *
- * May update \p errp, \p actionsp and \p incr_retryp.
- *
- * The resulting \p actionsp are handled by the caller.
- *
- * @warning May be called on the old leader thread. Lock rktp appropriately!
- *
- * @locality broker thread (but not necessarily the leader broker)
- * @locks none
- */
-static void
-rd_kafka_handle_idempotent_Produce_error(rd_kafka_broker_t *rkb,
- rd_kafka_msgbatch_t *batch,
- struct rd_kafka_Produce_err *perr) {
- rd_kafka_t *rk = rkb->rkb_rk;
- rd_kafka_toppar_t *rktp = batch->rktp;
- rd_kafka_msg_t *firstmsg, *lastmsg;
- int r;
- rd_ts_t now = rd_clock(), state_age;
- struct rd_kafka_toppar_err last_err;
-
- rd_kafka_rdlock(rkb->rkb_rk);
- state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state;
- rd_kafka_rdunlock(rkb->rkb_rk);
-
- firstmsg = rd_kafka_msgq_first(&batch->msgq);
- lastmsg = rd_kafka_msgq_last(&batch->msgq);
- rd_assert(firstmsg && lastmsg);
-
- /* Store the last msgid of the batch
- * on the first message in case we need to retry
- * and thus reconstruct the entire batch. */
- if (firstmsg->rkm_u.producer.last_msgid) {
- /* last_msgid already set, make sure it
- * actually points to the last message. */
- rd_assert(firstmsg->rkm_u.producer.last_msgid ==
- lastmsg->rkm_u.producer.msgid);
- } else {
- firstmsg->rkm_u.producer.last_msgid =
- lastmsg->rkm_u.producer.msgid;
- }
-
- if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) {
- /* Don't retry if PID changed since we can't
- * guarantee correctness across PID sessions. */
- perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
- perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
-
- rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "ERRPID",
- "%.*s [%" PRId32
- "] PID mismatch: "
- "request %s != partition %s: "
- "failing messages with error %s",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition, rd_kafka_pid2str(batch->pid),
- rd_kafka_pid2str(perr->rktp_pid),
- rd_kafka_err2str(perr->err));
- return;
- }
-
- /*
- * Special error handling
- */
- switch (perr->err) {
- case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER:
- /* Compare request's sequence to expected next
- * acked sequence.
- *
- * Example requests in flight:
- * R1(base_seq:5) R2(10) R3(15) R4(20)
- */
-
- /* Acquire the last partition error to help
- * troubleshoot this problem. */
- rd_kafka_toppar_lock(rktp);
- last_err = rktp->rktp_last_err;
- rd_kafka_toppar_unlock(rktp);
-
- r = batch->first_seq - perr->next_ack_seq;
-
- if (r == 0) {
- /* R1 failed:
- * If this was the head-of-line request in-flight it
- * means there is a state desynchronization between the
- * producer and broker (a bug), in which case
- * we'll raise a fatal error since we can no longer
- * reason about the state of messages and thus
- * not guarantee ordering or once-ness for R1,
- * nor give the user a chance to opt out of sending
- * R2 to R4 which would be retried automatically. */
-
- rd_kafka_idemp_set_fatal_error(
- rk, perr->err,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to sequence desynchronization with "
- "broker %" PRId32 " (%s, base seq %" PRId32
- ", "
- "idemp state change %" PRId64
- "ms ago, "
- "last partition error %s (actions %s, "
- "base seq %" PRId32 "..%" PRId32
- ", base msgid %" PRIu64 ", %" PRId64 "ms ago)",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq,
- state_age / 1000, rd_kafka_err2name(last_err.err),
- rd_kafka_actions2str(last_err.actions),
- last_err.base_seq, last_err.last_seq,
- last_err.base_msgid,
- last_err.ts ? (now - last_err.ts) / 1000 : -1);
-
- perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
- perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
-
- } else if (r > 0) {
- /* R2 failed:
- * With max.in.flight > 1 we can have a situation
- * where the first request in-flight (R1) to the broker
- * fails, which causes the sub-sequent requests
- * that are in-flight to have a non-sequential
- * sequence number and thus fail.
- * But these sub-sequent requests (R2 to R4) are not at
- * the risk of being duplicated so we bump the epoch and
- * re-enqueue the messages for later retry
- * (without incrementing retries).
- */
- rd_rkb_dbg(
- rkb, MSG | RD_KAFKA_DBG_EOS, "ERRSEQ",
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to skipped sequence numbers "
- "(%s, base seq %" PRId32
- " > "
- "next seq %" PRId32
- ") "
- "caused by previous failed request "
- "(%s, actions %s, "
- "base seq %" PRId32 "..%" PRId32
- ", base msgid %" PRIu64 ", %" PRId64
- "ms ago): "
- "recovering and retrying",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_pid2str(batch->pid), batch->first_seq,
- perr->next_ack_seq, rd_kafka_err2name(last_err.err),
- rd_kafka_actions2str(last_err.actions),
- last_err.base_seq, last_err.last_seq,
- last_err.base_msgid,
- last_err.ts ? (now - last_err.ts) / 1000 : -1);
-
- perr->incr_retry = 0;
- perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
-
- rd_kafka_idemp_drain_epoch_bump(
- rk, perr->err, "skipped sequence numbers");
-
- } else {
- /* Request's sequence is less than next ack,
- * this should never happen unless we have
- * local bug or the broker did not respond
- * to the requests in order. */
- rd_kafka_idemp_set_fatal_error(
- rk, perr->err,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "with rewound sequence number on "
- "broker %" PRId32
- " (%s, "
- "base seq %" PRId32 " < next seq %" PRId32
- "): "
- "last error %s (actions %s, "
- "base seq %" PRId32 "..%" PRId32
- ", base msgid %" PRIu64 ", %" PRId64 "ms ago)",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq,
- perr->next_ack_seq, rd_kafka_err2name(last_err.err),
- rd_kafka_actions2str(last_err.actions),
- last_err.base_seq, last_err.last_seq,
- last_err.base_msgid,
- last_err.ts ? (now - last_err.ts) / 1000 : -1);
-
- perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
- perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_false;
- }
- break;
-
- case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER:
- /* This error indicates that we successfully produced
- * this set of messages before but this (supposed) retry failed.
- *
- * Treat as success, however offset and timestamp
- * will be invalid. */
-
- /* Future improvement/FIXME:
- * But first make sure the first message has actually
- * been retried, getting this error for a non-retried message
- * indicates a synchronization issue or bug. */
- rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "DUPSEQ",
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to duplicate sequence number: "
- "previous send succeeded but was not acknowledged "
- "(%s, base seq %" PRId32
- "): "
- "marking the messages successfully delivered",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_pid2str(batch->pid), batch->first_seq);
-
- /* Void error, delivery succeeded */
- perr->err = RD_KAFKA_RESP_ERR_NO_ERROR;
- perr->actions = 0;
- perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;
- perr->update_next_ack = rd_true;
- perr->update_next_err = rd_true;
- break;
-
- case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID:
- /* The broker/cluster lost track of our PID because
- * the last message we produced has now been deleted
- * (by DeleteRecords, compaction, or topic retention policy).
- *
- * If all previous messages are accounted for and this is not
- * a retry we can simply bump the epoch and reset the sequence
- * number and then retry the message(s) again.
- *
- * If there are outstanding messages not yet acknowledged
- * then there is no safe way to carry on without risking
- * duplication or reordering, in which case we fail
- * the producer.
- *
- * In case of the transactional producer and a transaction
- * coordinator that supports KIP-360 (>= AK 2.5, checked from
- * the txnmgr, not here) we'll raise an abortable error and
- * flag that the epoch needs to be bumped on the coordinator. */
- if (rd_kafka_is_transactional(rk)) {
- rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID",
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to unknown producer id "
- "(%s, base seq %" PRId32
- ", %d retries): "
- "failing the current transaction",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_pid2str(batch->pid),
- batch->first_seq,
- firstmsg->rkm_u.producer.retries);
-
- /* Drain outstanding requests and bump epoch. */
- rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
- "unknown producer id");
-
- rd_kafka_txn_set_abortable_error_with_bump(
- rk, RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to unknown producer id",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq));
-
- perr->incr_retry = 0;
- perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
- break;
-
- } else if (!firstmsg->rkm_u.producer.retries &&
- perr->next_err_seq == batch->first_seq) {
- rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID",
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to unknown producer id "
- "(%s, base seq %" PRId32
- ", %d retries): "
- "no risk of duplication/reordering: "
- "resetting PID and retrying",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_pid2str(batch->pid),
- batch->first_seq,
- firstmsg->rkm_u.producer.retries);
-
- /* Drain outstanding requests and bump epoch. */
- rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
- "unknown producer id");
-
- perr->incr_retry = 0;
- perr->actions = RD_KAFKA_ERR_ACTION_RETRY;
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
- break;
- }
-
- rd_kafka_idemp_set_fatal_error(
- rk, perr->err,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed "
- "due to unknown producer id ("
- "broker %" PRId32 " %s, base seq %" PRId32
- ", %d retries): "
- "unable to retry without risking "
- "duplication/reordering",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition, rd_kafka_msgq_len(&batch->msgq),
- rkb->rkb_nodeid, rd_kafka_pid2str(batch->pid),
- batch->first_seq, firstmsg->rkm_u.producer.retries);
-
- perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT;
- perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
- break;
-
- default:
- /* All other errors are handled in the standard
- * error Produce handler, which will set
- * update_next_ack|err accordingly. */
- break;
- }
-}
-
-
-
-/**
- * @brief Error-handling for failed ProduceRequests
- *
- * @param errp Is the input and output error, it may be changed
- * by this function.
- *
- * @returns 0 if no further processing of the request should be performed,
- * such as triggering delivery reports, else 1.
- *
- * @warning May be called on the old leader thread. Lock rktp appropriately!
- *
- * @warning \p request may be NULL.
- *
- * @locality broker thread (but not necessarily the leader broker)
- * @locks none
- */
-static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
- const rd_kafka_buf_t *request,
- rd_kafka_msgbatch_t *batch,
- struct rd_kafka_Produce_err *perr) {
- rd_kafka_t *rk = rkb->rkb_rk;
- rd_kafka_toppar_t *rktp = batch->rktp;
- int is_leader;
-
- if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY))
- return 0; /* Terminating */
-
- /* When there is a partition leader change any outstanding
- * requests to the old broker will be handled by the old
- * broker thread when the responses are received/timeout:
- * in this case we need to be careful with locking:
- * check once if we're the leader (which allows relaxed
- * locking), and cache the current rktp's eos state vars. */
- rd_kafka_toppar_lock(rktp);
- is_leader = rktp->rktp_broker == rkb;
- perr->rktp_pid = rktp->rktp_eos.pid;
- perr->next_ack_seq = rktp->rktp_eos.next_ack_seq;
- perr->next_err_seq = rktp->rktp_eos.next_err_seq;
- rd_kafka_toppar_unlock(rktp);
-
- /* All failures are initially treated as if the message
- * was not persisted, but the status may be changed later
- * for specific errors and actions. */
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
-
- /* Set actions for known errors (may be overriden later),
- * all other errors are considered permanent failures.
- * (also see rd_kafka_err_action() for the default actions). */
- perr->actions = rd_kafka_err_action(
- rkb, perr->err, request,
-
- RD_KAFKA_ERR_ACTION_REFRESH |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR__TRANSPORT,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,
-
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
-
- RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR,
-
- RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS,
-
- RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND,
-
- RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
-
- RD_KAFKA_ERR_ACTION_RETRY |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR__TIMED_OUT,
-
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR__MSG_TIMED_OUT,
-
- /* All Idempotent Producer-specific errors are
- * initially set as permanent errors,
- * special handling may change the actions. */
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
-
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED,
- RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER,
-
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
-
- RD_KAFKA_ERR_ACTION_PERMANENT |
- RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED,
- RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH,
-
- /* Message was purged from out-queue due to
- * Idempotent Producer Id change */
- RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__RETRY,
-
- RD_KAFKA_ERR_ACTION_END);
-
- rd_rkb_dbg(rkb, MSG, "MSGSET",
- "%s [%" PRId32
- "]: MessageSet with %i message(s) "
- "(MsgId %" PRIu64 ", BaseSeq %" PRId32
- ") "
- "encountered error: %s (actions %s)%s",
- rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq), batch->first_msgid,
- batch->first_seq, rd_kafka_err2str(perr->err),
- rd_kafka_actions2str(perr->actions),
- is_leader ? "" : " [NOT LEADER]");
-
-
- /*
- * Special handling for Idempotent Producer
- *
- * Note: Idempotent Producer-specific errors received
- * on a non-idempotent producer will be passed through
- * directly to the application.
- */
- if (rd_kafka_is_idempotent(rk))
- rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr);
-
- /* Update message persistence status based on action flags.
- * None of these are typically set after an idempotent error,
- * which sets the status explicitly. */
- if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED)
- perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
- else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED)
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
- else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED)
- perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;
-
- /* Save the last error for debugging sub-sequent errors,
- * useful for Idempotent Producer throubleshooting. */
- rd_kafka_toppar_lock(rktp);
- rktp->rktp_last_err.err = perr->err;
- rktp->rktp_last_err.actions = perr->actions;
- rktp->rktp_last_err.ts = rd_clock();
- rktp->rktp_last_err.base_seq = batch->first_seq;
- rktp->rktp_last_err.last_seq = perr->last_seq;
- rktp->rktp_last_err.base_msgid = batch->first_msgid;
- rd_kafka_toppar_unlock(rktp);
-
- /*
- * Handle actions
- */
- if (perr->actions &
- (RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY)) {
- /* Retry (refresh also implies retry) */
-
- if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) {
- /* Request metadata information update.
- * These errors imply that we have stale
- * information and the request was
- * either rejected or not sent -
- * we don't need to increment the retry count
- * when we perform a retry since:
- * - it is a temporary error (hopefully)
- * - there is no chance of duplicate delivery
- */
- rd_kafka_toppar_leader_unavailable(rktp, "produce",
- perr->err);
-
- /* We can't be certain the request wasn't
- * sent in case of transport failure,
- * so the ERR__TRANSPORT case will need
- * the retry count to be increased,
- * In case of certain other errors we want to
- * avoid retrying for the duration of the
- * message.timeout.ms to speed up error propagation. */
- if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT &&
- perr->err != RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR)
- perr->incr_retry = 0;
- }
-
- /* If message timed out in queue, not in transit,
- * we will retry at a later time but not increment
- * the retry count since there is no risk
- * of duplicates. */
- if (!rd_kafka_buf_was_sent(request))
- perr->incr_retry = 0;
-
- if (!perr->incr_retry) {
- /* If retries are not to be incremented then
- * there is no chance of duplicates on retry, which
- * means these messages were not persisted. */
- perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
- }
-
- if (rd_kafka_is_idempotent(rk)) {
- /* Any currently in-flight requests will
- * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
- * which should not be treated as a fatal error
- * since this request and sub-sequent requests
- * will be retried and thus return to order.
- * Unless the error was a timeout, or similar,
- * in which case the request might have made it
- * and the messages are considered possibly persisted:
- * in this case we allow the next in-flight response
- * to be successful, in which case we mark
- * this request's messages as succesfully delivered. */
- if (perr->status &
- RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
- perr->update_next_ack = rd_true;
- else
- perr->update_next_ack = rd_false;
- perr->update_next_err = rd_true;
-
- /* Drain outstanding requests so that retries
- * are attempted with proper state knowledge and
- * without any in-flight requests. */
- rd_kafka_toppar_lock(rktp);
- rd_kafka_idemp_drain_toppar(rktp,
- "drain before retrying");
- rd_kafka_toppar_unlock(rktp);
- }
-
- /* Since requests are specific to a broker
- * we move the retryable messages from the request
- * back to the partition queue (prepend) and then
- * let the new broker construct a new request.
- * While doing this we also make sure the retry count
- * for each message is honoured, any messages that
- * would exceeded the retry count will not be
- * moved but instead fail below. */
- rd_kafka_toppar_retry_msgq(rktp, &batch->msgq, perr->incr_retry,
- perr->status);
-
- if (rd_kafka_msgq_len(&batch->msgq) == 0) {
- /* No need do anything more with the request
- * here since the request no longer has any
- * messages associated with it. */
- return 0;
- }
- }
-
- if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT &&
- rd_kafka_is_idempotent(rk)) {
- if (rd_kafka_is_transactional(rk) &&
- perr->err == RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) {
- /* Producer was fenced by new transactional producer
- * with the same transactional.id */
- rd_kafka_txn_set_fatal_error(
- rk, RD_DO_LOCK, RD_KAFKA_RESP_ERR__FENCED,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed: %s "
- "(broker %" PRId32 " %s, base seq %" PRId32
- "): "
- "transactional producer fenced by newer "
- "producer instance",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq);
-
- /* Drain outstanding requests and reset PID. */
- rd_kafka_idemp_drain_reset(
- rk, "fenced by new transactional producer");
-
- } else if (rd_kafka_is_transactional(rk)) {
- /* When transactional any permanent produce failure
- * would lead to an incomplete transaction, so raise
- * an abortable transaction error. */
- rd_kafka_txn_set_abortable_error(
- rk, perr->err,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed: %s "
- "(broker %" PRId32 " %s, base seq %" PRId32
- "): "
- "current transaction must be aborted",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq);
-
- } else if (rk->rk_conf.eos.gapless) {
- /* A permanent non-idempotent error will lead to
- * gaps in the message series, the next request
- * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER.
- * To satisfy the gapless guarantee we need to raise
- * a fatal error here. */
- rd_kafka_idemp_set_fatal_error(
- rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE,
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) failed: "
- "%s (broker %" PRId32 " %s, base seq %" PRId32
- "): "
- "unable to satisfy gap-less guarantee",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq),
- rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq);
-
- /* Drain outstanding requests and reset PID. */
- rd_kafka_idemp_drain_reset(
- rk, "unable to satisfy gap-less guarantee");
-
- } else {
- /* If gapless is not set we bump the Epoch and
- * renumber the messages to send. */
-
- /* Drain outstanding requests and bump the epoch .*/
- rd_kafka_idemp_drain_epoch_bump(rk, perr->err,
- "message sequence gap");
- }
-
- perr->update_next_ack = rd_false;
- /* Make sure the next error will not raise a fatal error. */
- perr->update_next_err = rd_true;
- }
-
- if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT ||
- perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) {
- /* Translate request-level timeout error code
- * to message-level timeout error code. */
- perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
- } else if (perr->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) {
- /* If we're no longer authorized to access the topic mark
- * it as errored to deny further produce requests. */
- rd_kafka_topic_wrlock(rktp->rktp_rkt);
- rd_kafka_topic_set_error(rktp->rktp_rkt, perr->err);
- rd_kafka_topic_wrunlock(rktp->rktp_rkt);
- }
-
- return 1;
-}
-
-/**
- * @brief Handle ProduceResponse success for idempotent producer
- *
- * @warning May be called on the old leader thread. Lock rktp appropriately!
- *
- * @locks none
- * @locality broker thread (but not necessarily the leader broker thread)
- */
-static void
-rd_kafka_handle_idempotent_Produce_success(rd_kafka_broker_t *rkb,
- rd_kafka_msgbatch_t *batch,
- int32_t next_seq) {
- rd_kafka_t *rk = rkb->rkb_rk;
- rd_kafka_toppar_t *rktp = batch->rktp;
- char fatal_err[512];
- uint64_t first_msgid, last_msgid;
-
- *fatal_err = '\0';
-
- first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid;
- last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid;
-
- rd_kafka_toppar_lock(rktp);
-
- /* If the last acked msgid is higher than
- * the next message to (re)transmit in the message queue
- * it means a previous series of R1,R2 ProduceRequests
- * had R1 fail with uncertain persistence status,
- * such as timeout or transport error, but R2 succeeded,
- * which means the messages in R1 were in fact persisted.
- * In this case trigger delivery reports for all messages
- * in queue until we hit a non-acked message msgid. */
- if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) {
- rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid);
-
- } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq &&
- batch->first_seq == rktp->rktp_eos.next_err_seq)) {
- /* Response ordering is typically not a concern
- * (but will not happen with current broker versions),
- * unless we're expecting an error to be returned at
- * this sequence rather than a success ack, in which
- * case raise a fatal error. */
-
- /* Can't call set_fatal_error() while
- * holding the toppar lock, so construct
- * the error string here and call
- * set_fatal_error() below after
- * toppar lock has been released. */
- rd_snprintf(fatal_err, sizeof(fatal_err),
- "ProduceRequest for %.*s [%" PRId32
- "] "
- "with %d message(s) "
- "succeeded when expecting failure "
- "(broker %" PRId32
- " %s, "
- "base seq %" PRId32
- ", "
- "next ack seq %" PRId32
- ", "
- "next err seq %" PRId32
- ": "
- "unable to retry without risking "
- "duplication/reordering",
- RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
- rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid,
- rd_kafka_pid2str(batch->pid), batch->first_seq,
- rktp->rktp_eos.next_ack_seq,
- rktp->rktp_eos.next_err_seq);
-
- rktp->rktp_eos.next_err_seq = next_seq;
- }
-
- if (likely(!*fatal_err)) {
- /* Advance next expected err and/or ack sequence */
-
- /* Only step err seq if it hasn't diverged. */
- if (rktp->rktp_eos.next_err_seq == rktp->rktp_eos.next_ack_seq)
- rktp->rktp_eos.next_err_seq = next_seq;
-
- rktp->rktp_eos.next_ack_seq = next_seq;
- }
-
- /* Store the last acked message sequence,
- * since retries within the broker cache window (5 requests)
- * will succeed for older messages we must only update the
- * acked msgid if it is higher than the last acked. */
- if (last_msgid > rktp->rktp_eos.acked_msgid)
- rktp->rktp_eos.acked_msgid = last_msgid;
-
- rd_kafka_toppar_unlock(rktp);
-
- /* Must call set_fatal_error() after releasing
- * the toppar lock. */
- if (unlikely(*fatal_err))
- rd_kafka_idemp_set_fatal_error(
- rk, RD_KAFKA_RESP_ERR__INCONSISTENT, "%s", fatal_err);
-}
-
-
-/**
- * @brief Handle ProduceRequest result for a message batch.
- *
- * @warning \p request may be NULL.
- *
- * @localiy broker thread (but not necessarily the toppar's handler thread)
- * @locks none
- */
-static void rd_kafka_msgbatch_handle_Produce_result(
- rd_kafka_broker_t *rkb,
- rd_kafka_msgbatch_t *batch,
- rd_kafka_resp_err_t err,
- const struct rd_kafka_Produce_result *presult,
- const rd_kafka_buf_t *request) {
-
- rd_kafka_t *rk = rkb->rkb_rk;
- rd_kafka_toppar_t *rktp = batch->rktp;
- rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
- rd_bool_t last_inflight;
- int32_t next_seq;
-
- /* Decrease partition's messages in-flight counter */
- rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >=
- rd_kafka_msgq_len(&batch->msgq));
- last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight,
- rd_kafka_msgq_len(&batch->msgq));
-
- /* Next expected sequence (and handle wrap) */
- next_seq = rd_kafka_seq_wrap(batch->first_seq +
- rd_kafka_msgq_len(&batch->msgq));
-
- if (likely(!err)) {
- rd_rkb_dbg(rkb, MSG, "MSGSET",
- "%s [%" PRId32
- "]: MessageSet with %i message(s) "
- "(MsgId %" PRIu64 ", BaseSeq %" PRId32 ") delivered",
- rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
- rd_kafka_msgq_len(&batch->msgq), batch->first_msgid,
- batch->first_seq);
-
- if (rktp->rktp_rkt->rkt_conf.required_acks != 0)
- status = RD_KAFKA_MSG_STATUS_PERSISTED;
-
- if (rd_kafka_is_idempotent(rk))
- rd_kafka_handle_idempotent_Produce_success(rkb, batch,
- next_seq);
- } else {
- /* Error handling */
- struct rd_kafka_Produce_err perr = {
- .err = err,
- .incr_retry = 1,
- .status = status,
- .update_next_ack = rd_true,
- .update_next_err = rd_true,
- .last_seq = (batch->first_seq +
- rd_kafka_msgq_len(&batch->msgq) - 1)};
-
- rd_kafka_handle_Produce_error(rkb, request, batch, &perr);
-
- /* Update next expected acked and/or err sequence. */
- if (perr.update_next_ack || perr.update_next_err) {
- rd_kafka_toppar_lock(rktp);
- if (perr.update_next_ack)
- rktp->rktp_eos.next_ack_seq = next_seq;
- if (perr.update_next_err)
- rktp->rktp_eos.next_err_seq = next_seq;
- rd_kafka_toppar_unlock(rktp);
- }
-
- err = perr.err;
- status = perr.status;
- }
-
-
- /* Messages to retry will have been removed from the request's queue */
- if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) {
- /* Set offset, timestamp and status for each message. */
- rd_kafka_msgq_set_metadata(&batch->msgq, rkb->rkb_nodeid,
- presult->offset, presult->timestamp,
- status);
-
- /* Enqueue messages for delivery report. */
- rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err);
- }
-
- if (rd_kafka_is_idempotent(rk) && last_inflight)
- rd_kafka_idemp_inflight_toppar_sub(rk, rktp);
-}
-
-
-/**
- * @brief Handle ProduceResponse
- *
- * @param reply is NULL when `acks=0` and on various local errors.
- *
- * @remark ProduceRequests are never retried, retriable errors are
- * instead handled by re-enqueuing the request's messages back
- * on the partition queue to have a new ProduceRequest constructed
- * eventually.
- *
- * @warning May be called on the old leader thread. Lock rktp appropriately!
- *
- * @locality broker thread (but not necessarily the leader broker thread)
- */
-static void rd_kafka_handle_Produce(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *reply,
- rd_kafka_buf_t *request,
- void *opaque) {
- rd_kafka_msgbatch_t *batch = &request->rkbuf_batch;
- rd_kafka_toppar_t *rktp = batch->rktp;
- struct rd_kafka_Produce_result result = {
- .offset = RD_KAFKA_OFFSET_INVALID, .timestamp = -1};
-
- /* Unit test interface: inject errors */
- if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) {
- err = rk->rk_conf.ut.handle_ProduceResponse(
- rkb->rkb_rk, rkb->rkb_nodeid, batch->first_msgid, err);
- }
-
- /* Parse Produce reply (unless the request errored) */
- if (!err && reply)
- err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request,
- &result);
-
- rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, &result,
- request);
-}
-
-
-/**
- * @brief Send ProduceRequest for messages in toppar queue.
- *
- * @returns the number of messages included, or 0 on error / no messages.
- *
- * @locality broker thread
- */
-int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb,
- rd_kafka_toppar_t *rktp,
- const rd_kafka_pid_t pid,
- uint64_t epoch_base_msgid) {
- rd_kafka_buf_t *rkbuf;
- rd_kafka_topic_t *rkt = rktp->rktp_rkt;
- size_t MessageSetSize = 0;
- int cnt;
- rd_ts_t now;
- int64_t first_msg_timeout;
- int tmout;
-
- /**
- * Create ProduceRequest with as many messages from the toppar
- * transmit queue as possible.
- */
- rkbuf = rd_kafka_msgset_create_ProduceRequest(
- rkb, rktp, &rktp->rktp_xmit_msgq, pid, epoch_base_msgid,
- &MessageSetSize);
- if (unlikely(!rkbuf))
- return 0;
-
- cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq);
- rd_dassert(cnt > 0);
-
- rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt);
- rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize);
-
- if (!rkt->rkt_conf.required_acks)
- rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE;
-
- /* Use timeout from first message in batch */
- now = rd_clock();
- first_msg_timeout =
- (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)->rkm_ts_timeout -
- now) /
- 1000;
-
- if (unlikely(first_msg_timeout <= 0)) {
- /* Message has already timed out, allow 100 ms
- * to produce anyway */
- tmout = 100;
- } else {
- tmout = (int)RD_MIN(INT_MAX, first_msg_timeout);
- }
-
- /* Set absolute timeout (including retries), the
- * effective timeout for this specific request will be
- * capped by socket.timeout.ms */
- rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ,
- rd_kafka_handle_Produce, NULL);
-
- return cnt;
-}
-
-
-/**
- * @brief Construct and send CreateTopicsRequest to \p rkb
- * with the topics (NewTopic_t*) in \p new_topics, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *new_topics /*(NewTopic_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
- int i = 0;
- rd_kafka_NewTopic_t *newt;
- int op_timeout;
-
- if (rd_list_cnt(new_topics) == 0) {
- rd_snprintf(errstr, errstr_size, "No topics to create");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_CreateTopics, 0, 4, &features);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "Topic Admin API (KIP-4) not supported "
- "by broker, requires broker version >= 0.10.2.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- if (rd_kafka_confval_get_int(&options->validate_only) &&
- ApiVersion < 1) {
- rd_snprintf(errstr, errstr_size,
- "CreateTopics.validate_only=true not "
- "supported by broker");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1,
- 4 + (rd_list_cnt(new_topics) * 200) +
- 4 + 1);
-
- /* #topics */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics));
-
- while ((newt = rd_list_elem(new_topics, i++))) {
- int partition;
- int ei = 0;
- const rd_kafka_ConfigEntry_t *entry;
-
- if (ApiVersion < 4) {
- if (newt->num_partitions == -1) {
- rd_snprintf(errstr, errstr_size,
- "Default partition count (KIP-464) "
- "not supported by broker, "
- "requires broker version <= 2.4.0");
- rd_kafka_replyq_destroy(&replyq);
- rd_kafka_buf_destroy(rkbuf);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- if (newt->replication_factor == -1 &&
- rd_list_empty(&newt->replicas)) {
- rd_snprintf(errstr, errstr_size,
- "Default replication factor "
- "(KIP-464) "
- "not supported by broker, "
- "requires broker version <= 2.4.0");
- rd_kafka_replyq_destroy(&replyq);
- rd_kafka_buf_destroy(rkbuf);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
-
- /* topic */
- rd_kafka_buf_write_str(rkbuf, newt->topic, -1);
-
- if (rd_list_cnt(&newt->replicas)) {
- /* num_partitions and replication_factor must be
- * set to -1 if a replica assignment is sent. */
- /* num_partitions */
- rd_kafka_buf_write_i32(rkbuf, -1);
- /* replication_factor */
- rd_kafka_buf_write_i16(rkbuf, -1);
- } else {
- /* num_partitions */
- rd_kafka_buf_write_i32(rkbuf, newt->num_partitions);
- /* replication_factor */
- rd_kafka_buf_write_i16(
- rkbuf, (int16_t)newt->replication_factor);
- }
-
- /* #replica_assignment */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas));
-
- /* Replicas per partition, see rdkafka_admin.[ch]
- * for how these are constructed. */
- for (partition = 0; partition < rd_list_cnt(&newt->replicas);
- partition++) {
- const rd_list_t *replicas;
- int ri = 0;
-
- replicas = rd_list_elem(&newt->replicas, partition);
- if (!replicas)
- continue;
-
- /* partition */
- rd_kafka_buf_write_i32(rkbuf, partition);
- /* #replicas */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas));
-
- for (ri = 0; ri < rd_list_cnt(replicas); ri++) {
- /* replica */
- rd_kafka_buf_write_i32(
- rkbuf, rd_list_get_int32(replicas, ri));
- }
- }
-
- /* #config_entries */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config));
-
- RD_LIST_FOREACH(entry, &newt->config, ei) {
- /* config_name */
- rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
- /* config_value (nullable) */
- rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
- }
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- rd_kafka_buf_write_i32(rkbuf, op_timeout);
-
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- if (ApiVersion >= 1) {
- /* validate_only */
- rd_kafka_buf_write_i8(
- rkbuf, rd_kafka_confval_get_int(&options->validate_only));
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send DeleteTopicsRequest to \p rkb
- * with the topics (DeleteTopic_t *) in \p del_topics, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_DeleteTopicsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *del_topics /*(DeleteTopic_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
- int i = 0;
- rd_kafka_DeleteTopic_t *delt;
- int op_timeout;
-
- if (rd_list_cnt(del_topics) == 0) {
- rd_snprintf(errstr, errstr_size, "No topics to delete");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "Topic Admin API (KIP-4) not supported "
- "by broker, requires broker version >= 0.10.2.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf =
- rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1,
- /* FIXME */
- 4 + (rd_list_cnt(del_topics) * 100) + 4);
-
- /* #topics */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics));
-
- while ((delt = rd_list_elem(del_topics, i++)))
- rd_kafka_buf_write_str(rkbuf, delt->topic, -1);
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- rd_kafka_buf_write_i32(rkbuf, op_timeout);
-
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send DeleteRecordsRequest to \p rkb
- * with the offsets to delete (rd_kafka_topic_partition_list_t *) in
- * \p offsets_list, using \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @remark The rd_kafka_topic_partition_list_t in \p offsets_list must already
- * be sorted.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb,
- /*(rd_kafka_topic_partition_list_t*)*/
- const rd_list_t *offsets_list,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
- const rd_kafka_topic_partition_list_t *partitions;
- int op_timeout;
-
- partitions = rd_list_elem(offsets_list, 0);
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DeleteRecords, 0, 1, &features);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "DeleteRecords Admin API (KIP-107) not supported "
- "by broker, requires broker version >= 0.11.0");
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteRecords, 1,
- 4 + (partitions->cnt * 100) + 4);
-
- const rd_kafka_topic_partition_field_t fields[] = {
- RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
- RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET,
- RD_KAFKA_TOPIC_PARTITION_FIELD_END};
- rd_kafka_buf_write_topic_partitions(
- rkbuf, partitions, rd_false /*don't skip invalid offsets*/,
- rd_false /*any offset*/, fields);
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- rd_kafka_buf_write_i32(rkbuf, op_timeout);
-
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send CreatePartitionsRequest to \p rkb
- * with the topics (NewPartitions_t*) in \p new_parts, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_CreatePartitionsRequest(rd_kafka_broker_t *rkb,
- /*(NewPartitions_t*)*/
- const rd_list_t *new_parts,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int i = 0;
- rd_kafka_NewPartitions_t *newp;
- int op_timeout;
-
- if (rd_list_cnt(new_parts) == 0) {
- rd_snprintf(errstr, errstr_size, "No partitions to create");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "CreatePartitions (KIP-195) not supported "
- "by broker, requires broker version >= 1.0.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1,
- 4 + (rd_list_cnt(new_parts) * 200) +
- 4 + 1);
-
- /* #topics */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts));
-
- while ((newp = rd_list_elem(new_parts, i++))) {
- /* topic */
- rd_kafka_buf_write_str(rkbuf, newp->topic, -1);
-
- /* New partition count */
- rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt);
-
- /* #replica_assignment */
- if (rd_list_empty(&newp->replicas)) {
- rd_kafka_buf_write_i32(rkbuf, -1);
- } else {
- const rd_list_t *replicas;
- int pi = -1;
-
- rd_kafka_buf_write_i32(rkbuf,
- rd_list_cnt(&newp->replicas));
-
- while (
- (replicas = rd_list_elem(&newp->replicas, ++pi))) {
- int ri = 0;
-
- /* replica count */
- rd_kafka_buf_write_i32(rkbuf,
- rd_list_cnt(replicas));
-
- /* replica */
- for (ri = 0; ri < rd_list_cnt(replicas); ri++) {
- rd_kafka_buf_write_i32(
- rkbuf,
- rd_list_get_int32(replicas, ri));
- }
- }
- }
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- rd_kafka_buf_write_i32(rkbuf, op_timeout);
-
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- /* validate_only */
- rd_kafka_buf_write_i8(
- rkbuf, rd_kafka_confval_get_int(&options->validate_only));
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send AlterConfigsRequest to \p rkb
- * with the configs (ConfigResource_t*) in \p configs, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *configs /*(ConfigResource_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int i;
- const rd_kafka_ConfigResource_t *config;
- int op_timeout;
-
- if (rd_list_cnt(configs) == 0) {
- rd_snprintf(errstr, errstr_size,
- "No config resources specified");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_AlterConfigs, 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "AlterConfigs (KIP-133) not supported "
- "by broker, requires broker version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- /* Incremental requires IncrementalAlterConfigs */
- if (rd_kafka_confval_get_int(&options->incremental)) {
- rd_snprintf(errstr, errstr_size,
- "AlterConfigs.incremental=true (KIP-248) "
- "not supported by broker, "
- "replaced by IncrementalAlterConfigs");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1,
- rd_list_cnt(configs) * 200);
-
- /* #resources */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
-
- RD_LIST_FOREACH(config, configs, i) {
- const rd_kafka_ConfigEntry_t *entry;
- int ei;
-
- /* resource_type */
- rd_kafka_buf_write_i8(rkbuf, config->restype);
-
- /* resource_name */
- rd_kafka_buf_write_str(rkbuf, config->name, -1);
-
- /* #config */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config));
-
- RD_LIST_FOREACH(entry, &config->config, ei) {
- /* config_name */
- rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
- /* config_value (nullable) */
- rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1);
-
- if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) {
- rd_snprintf(errstr, errstr_size,
- "IncrementalAlterConfigs required "
- "for add/delete config "
- "entries: only set supported "
- "by this operation");
- rd_kafka_buf_destroy(rkbuf);
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- /* validate_only */
- rd_kafka_buf_write_i8(
- rkbuf, rd_kafka_confval_get_int(&options->validate_only));
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send DescribeConfigsRequest to \p rkb
- * with the configs (ConfigResource_t*) in \p configs, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest(
- rd_kafka_broker_t *rkb,
- const rd_list_t *configs /*(ConfigResource_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int i;
- const rd_kafka_ConfigResource_t *config;
- int op_timeout;
-
- if (rd_list_cnt(configs) == 0) {
- rd_snprintf(errstr, errstr_size,
- "No config resources specified");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "DescribeConfigs (KIP-133) not supported "
- "by broker, requires broker version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1,
- rd_list_cnt(configs) * 200);
-
- /* #resources */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs));
-
- RD_LIST_FOREACH(config, configs, i) {
- const rd_kafka_ConfigEntry_t *entry;
- int ei;
-
- /* resource_type */
- rd_kafka_buf_write_i8(rkbuf, config->restype);
-
- /* resource_name */
- rd_kafka_buf_write_str(rkbuf, config->name, -1);
-
- /* #config */
- if (rd_list_empty(&config->config)) {
- /* Get all configs */
- rd_kafka_buf_write_i32(rkbuf, -1);
- } else {
- /* Get requested configs only */
- rd_kafka_buf_write_i32(rkbuf,
- rd_list_cnt(&config->config));
- }
-
- RD_LIST_FOREACH(entry, &config->config, ei) {
- /* config_name */
- rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1);
- }
- }
-
-
- if (ApiVersion == 1) {
- /* include_synonyms */
- rd_kafka_buf_write_i8(rkbuf, 1);
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send DeleteGroupsRequest to \p rkb
- * with the groups (DeleteGroup_t *) in \p del_groups, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_DeleteGroupsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *del_groups /*(DeleteGroup_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- int features;
- int i = 0;
- rd_kafka_DeleteGroup_t *delt;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DeleteGroups, 0, 1, &features);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "DeleteGroups Admin API (KIP-229) not supported "
- "by broker, requires broker version >= 1.1.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf =
- rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteGroups, 1,
- 4 + (rd_list_cnt(del_groups) * 100) + 4);
-
- /* #groups */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_groups));
-
- while ((delt = rd_list_elem(del_groups, i++)))
- rd_kafka_buf_write_str(rkbuf, delt->group, -1);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Returns the request size needed to send a specific AclBinding
- * specified in \p acl, using the ApiVersion provided in
- * \p ApiVersion.
- *
- * @returns and int16_t with the request size in bytes.
- */
-static RD_INLINE size_t
-rd_kafka_AclBinding_request_size(const rd_kafka_AclBinding_t *acl,
- int ApiVersion) {
- return 1 + 2 + (acl->name ? strlen(acl->name) : 0) + 2 +
- (acl->principal ? strlen(acl->principal) : 0) + 2 +
- (acl->host ? strlen(acl->host) : 0) + 1 + 1 +
- (ApiVersion > 0 ? 1 : 0);
-}
-
-/**
- * @brief Construct and send CreateAclsRequest to \p rkb
- * with the acls (AclBinding_t*) in \p new_acls, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_CreateAclsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *new_acls /*(AclBinding_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion;
- int i;
- size_t len;
- int op_timeout;
- rd_kafka_AclBinding_t *new_acl;
-
- if (rd_list_cnt(new_acls) == 0) {
- rd_snprintf(errstr, errstr_size, "No acls to create");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_CreateAcls, 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "ACLs Admin API (KIP-140) not supported "
- "by broker, requires broker version >= 0.11.0.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- if (ApiVersion == 0) {
- RD_LIST_FOREACH(new_acl, new_acls, i) {
- if (new_acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_LITERAL) {
- rd_snprintf(errstr, errstr_size,
- "Broker only supports LITERAL "
- "resource pattern types");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
- } else {
- RD_LIST_FOREACH(new_acl, new_acls, i) {
- if (new_acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
- new_acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_PREFIXED) {
- rd_snprintf(errstr, errstr_size,
- "Only LITERAL and PREFIXED "
- "resource patterns are supported "
- "when creating ACLs");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
- }
-
- len = 4;
- RD_LIST_FOREACH(new_acl, new_acls, i) {
- len += rd_kafka_AclBinding_request_size(new_acl, ApiVersion);
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateAcls, 1, len);
-
- /* #acls */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_acls));
-
- RD_LIST_FOREACH(new_acl, new_acls, i) {
- rd_kafka_buf_write_i8(rkbuf, new_acl->restype);
-
- rd_kafka_buf_write_str(rkbuf, new_acl->name, -1);
-
- if (ApiVersion >= 1) {
- rd_kafka_buf_write_i8(rkbuf,
- new_acl->resource_pattern_type);
- }
-
- rd_kafka_buf_write_str(rkbuf, new_acl->principal, -1);
-
- rd_kafka_buf_write_str(rkbuf, new_acl->host, -1);
-
- rd_kafka_buf_write_i8(rkbuf, new_acl->operation);
-
- rd_kafka_buf_write_i8(rkbuf, new_acl->permission_type);
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Construct and send DescribeAclsRequest to \p rkb
- * with the acls (AclBinding_t*) in \p acls, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t rd_kafka_DescribeAclsRequest(
- rd_kafka_broker_t *rkb,
- const rd_list_t *acls /*(rd_kafka_AclBindingFilter_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- const rd_kafka_AclBindingFilter_t *acl;
- int op_timeout;
-
- if (rd_list_cnt(acls) == 0) {
- rd_snprintf(errstr, errstr_size,
- "No acl binding filters specified");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
- if (rd_list_cnt(acls) > 1) {
- rd_snprintf(errstr, errstr_size,
- "Too many acl binding filters specified");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- acl = rd_list_elem(acls, 0);
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DescribeAcls, 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "ACLs Admin API (KIP-140) not supported "
- "by broker, requires broker version >= 0.11.0.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- if (ApiVersion == 0) {
- if (acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
- acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_ANY) {
- rd_snprintf(errstr, errstr_size,
- "Broker only supports LITERAL and ANY "
- "resource pattern types");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- } else {
- if (acl->resource_pattern_type ==
- RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) {
- rd_snprintf(errstr, errstr_size,
- "Filter contains UNKNOWN elements");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
-
- rkbuf = rd_kafka_buf_new_request(
- rkb, RD_KAFKAP_DescribeAcls, 1,
- rd_kafka_AclBinding_request_size(acl, ApiVersion));
-
- /* resource_type */
- rd_kafka_buf_write_i8(rkbuf, acl->restype);
-
- /* resource_name filter */
- rd_kafka_buf_write_str(rkbuf, acl->name, -1);
-
- if (ApiVersion > 0) {
- /* resource_pattern_type (rd_kafka_ResourcePatternType_t) */
- rd_kafka_buf_write_i8(rkbuf, acl->resource_pattern_type);
- }
-
- /* principal filter */
- rd_kafka_buf_write_str(rkbuf, acl->principal, -1);
-
- /* host filter */
- rd_kafka_buf_write_str(rkbuf, acl->host, -1);
-
- /* operation (rd_kafka_AclOperation_t) */
- rd_kafka_buf_write_i8(rkbuf, acl->operation);
-
- /* permission type (rd_kafka_AclPermissionType_t) */
- rd_kafka_buf_write_i8(rkbuf, acl->permission_type);
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Construct and send DeleteAclsRequest to \p rkb
- * with the acl filters (AclBindingFilter_t*) in \p del_acls, using
- * \p options.
- *
- * The response (unparsed) will be enqueued on \p replyq
- * for handling by \p resp_cb (with \p opaque passed).
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb,
- const rd_list_t *del_acls /*(AclBindingFilter_t*)*/,
- rd_kafka_AdminOptions_t *options,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- const rd_kafka_AclBindingFilter_t *acl;
- int op_timeout;
- int i;
- size_t len;
-
- if (rd_list_cnt(del_acls) == 0) {
- rd_snprintf(errstr, errstr_size,
- "No acl binding filters specified");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__INVALID_ARG;
- }
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_DeleteAcls, 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "ACLs Admin API (KIP-140) not supported "
- "by broker, requires broker version >= 0.11.0.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- len = 4;
-
- RD_LIST_FOREACH(acl, del_acls, i) {
- if (ApiVersion == 0) {
- if (acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_LITERAL &&
- acl->resource_pattern_type !=
- RD_KAFKA_RESOURCE_PATTERN_ANY) {
- rd_snprintf(errstr, errstr_size,
- "Broker only supports LITERAL "
- "and ANY resource pattern types");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- } else {
- if (acl->resource_pattern_type ==
- RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) {
- rd_snprintf(errstr, errstr_size,
- "Filter contains UNKNOWN elements");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
-
- len += rd_kafka_AclBinding_request_size(acl, ApiVersion);
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteAcls, 1, len);
-
- /* #acls */
- rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_acls));
-
- RD_LIST_FOREACH(acl, del_acls, i) {
- /* resource_type */
- rd_kafka_buf_write_i8(rkbuf, acl->restype);
-
- /* resource_name filter */
- rd_kafka_buf_write_str(rkbuf, acl->name, -1);
-
- if (ApiVersion > 0) {
- /* resource_pattern_type
- * (rd_kafka_ResourcePatternType_t) */
- rd_kafka_buf_write_i8(rkbuf,
- acl->resource_pattern_type);
- }
-
- /* principal filter */
- rd_kafka_buf_write_str(rkbuf, acl->principal, -1);
-
- /* host filter */
- rd_kafka_buf_write_str(rkbuf, acl->host, -1);
-
- /* operation (rd_kafka_AclOperation_t) */
- rd_kafka_buf_write_i8(rkbuf, acl->operation);
-
- /* permission type (rd_kafka_AclPermissionType_t) */
- rd_kafka_buf_write_i8(rkbuf, acl->permission_type);
- }
-
- /* timeout */
- op_timeout = rd_kafka_confval_get_int(&options->operation_timeout);
- if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
- rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-/**
- * @brief Parses and handles an InitProducerId reply.
- *
- * @locality rdkafka main thread
- * @locks none
- */
-void rd_kafka_handle_InitProducerId(rd_kafka_t *rk,
- rd_kafka_broker_t *rkb,
- rd_kafka_resp_err_t err,
- rd_kafka_buf_t *rkbuf,
- rd_kafka_buf_t *request,
- void *opaque) {
- const int log_decode_errors = LOG_ERR;
- int16_t error_code;
- rd_kafka_pid_t pid;
-
- if (err)
- goto err;
-
- rd_kafka_buf_read_throttle_time(rkbuf);
-
- rd_kafka_buf_read_i16(rkbuf, &error_code);
- if ((err = error_code))
- goto err;
-
- rd_kafka_buf_read_i64(rkbuf, &pid.id);
- rd_kafka_buf_read_i16(rkbuf, &pid.epoch);
-
- rd_kafka_idemp_pid_update(rkb, pid);
-
- return;
-
-err_parse:
- err = rkbuf->rkbuf_err;
-err:
- if (err == RD_KAFKA_RESP_ERR__DESTROY)
- return;
-
- /* Retries are performed by idempotence state handler */
- rd_kafka_idemp_request_pid_failed(rkb, err);
-}
-
-/**
- * @brief Construct and send InitProducerIdRequest to \p rkb.
- *
- * @param transactional_id may be NULL.
- * @param transaction_timeout_ms may be set to -1.
- * @param current_pid the current PID to reset, requires KIP-360. If not NULL
- * and KIP-360 is not supported by the broker this function
- * will return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE.
- *
- * The response (unparsed) will be handled by \p resp_cb served
- * by queue \p replyq.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code and errstr will be
- * updated with a human readable error string.
- */
-rd_kafka_resp_err_t
-rd_kafka_InitProducerIdRequest(rd_kafka_broker_t *rkb,
- const char *transactional_id,
- int transaction_timeout_ms,
- const rd_kafka_pid_t *current_pid,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion;
-
- if (current_pid) {
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_InitProducerId, 3, 4, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "InitProducerId (KIP-360) not supported by "
- "broker, requires broker version >= 2.5.0: "
- "unable to recover from previous "
- "transactional error");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- } else {
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_InitProducerId, 0, 4, NULL);
-
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "InitProducerId (KIP-98) not supported by "
- "broker, requires broker "
- "version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
- }
-
- rkbuf = rd_kafka_buf_new_flexver_request(
- rkb, RD_KAFKAP_InitProducerId, 1,
- 2 + (transactional_id ? strlen(transactional_id) : 0) + 4 + 8 + 4,
- ApiVersion >= 2 /*flexver*/);
-
- /* transactional_id */
- rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
-
- /* transaction_timeout_ms */
- rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms);
-
- if (ApiVersion >= 3) {
- /* Current PID */
- rd_kafka_buf_write_i64(rkbuf,
- current_pid ? current_pid->id : -1);
- /* Current Epoch */
- rd_kafka_buf_write_i16(rkbuf,
- current_pid ? current_pid->epoch : -1);
- }
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- /* Let the idempotence state handler perform retries */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send AddPartitionsToTxnRequest to \p rkb.
- *
- * The response (unparsed) will be handled by \p resp_cb served
- * by queue \p replyq.
- *
- * @param rktps MUST be sorted by topic name.
- *
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code.
- */
-rd_kafka_resp_err_t
-rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb,
- const char *transactional_id,
- rd_kafka_pid_t pid,
- const rd_kafka_toppar_tqhead_t *rktps,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
- rd_kafka_toppar_t *rktp;
- rd_kafka_topic_t *last_rkt = NULL;
- size_t of_TopicCnt;
- ssize_t of_PartCnt = -1;
- int TopicCnt = 0, PartCnt = 0;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "AddPartitionsToTxnRequest (KIP-98) not supported "
- "by broker, requires broker version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf =
- rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500);
-
- /* transactional_id */
- rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
-
- /* PID */
- rd_kafka_buf_write_i64(rkbuf, pid.id);
- rd_kafka_buf_write_i16(rkbuf, pid.epoch);
-
- /* Topics/partitions array (count updated later) */
- of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
- TAILQ_FOREACH(rktp, rktps, rktp_txnlink) {
- if (last_rkt != rktp->rktp_rkt) {
-
- if (last_rkt) {
- /* Update last topic's partition count field */
- rd_kafka_buf_update_i32(rkbuf, of_PartCnt,
- PartCnt);
- of_PartCnt = -1;
- }
-
- /* Topic name */
- rd_kafka_buf_write_kstr(rkbuf,
- rktp->rktp_rkt->rkt_topic);
- /* Partition count, updated later */
- of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
-
- PartCnt = 0;
- TopicCnt++;
- last_rkt = rktp->rktp_rkt;
- }
-
- /* Partition id */
- rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition);
- PartCnt++;
- }
-
- /* Update last partition and topic count fields */
- if (of_PartCnt != -1)
- rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt);
- rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- /* Let the handler perform retries so that it can pick
- * up more added partitions. */
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief Construct and send AddOffsetsToTxnRequest to \p rkb.
- *
- * The response (unparsed) will be handled by \p resp_cb served
- * by queue \p replyq.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code.
- */
-rd_kafka_resp_err_t
-rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb,
- const char *transactional_id,
- rd_kafka_pid_t pid,
- const char *group_id,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(
- rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "AddOffsetsToTxnRequest (KIP-98) not supported "
- "by broker, requires broker version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf =
- rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100);
-
- /* transactional_id */
- rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
-
- /* PID */
- rd_kafka_buf_write_i64(rkbuf, pid.id);
- rd_kafka_buf_write_i16(rkbuf, pid.epoch);
-
- /* Group Id */
- rd_kafka_buf_write_str(rkbuf, group_id, -1);
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-/**
- * @brief Construct and send EndTxnRequest to \p rkb.
- *
- * The response (unparsed) will be handled by \p resp_cb served
- * by queue \p replyq.
- *
- * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
- * transmission, otherwise an error code.
- */
-rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb,
- const char *transactional_id,
- rd_kafka_pid_t pid,
- rd_bool_t committed,
- char *errstr,
- size_t errstr_size,
- rd_kafka_replyq_t replyq,
- rd_kafka_resp_cb_t *resp_cb,
- void *opaque) {
- rd_kafka_buf_t *rkbuf;
- int16_t ApiVersion = 0;
-
- ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn,
- 0, 1, NULL);
- if (ApiVersion == -1) {
- rd_snprintf(errstr, errstr_size,
- "EndTxnRequest (KIP-98) not supported "
- "by broker, requires broker version >= 0.11.0");
- rd_kafka_replyq_destroy(&replyq);
- return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
- }
-
- rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500);
-
- /* transactional_id */
- rd_kafka_buf_write_str(rkbuf, transactional_id, -1);
-
- /* PID */
- rd_kafka_buf_write_i64(rkbuf, pid.id);
- rd_kafka_buf_write_i16(rkbuf, pid.epoch);
-
- /* Committed */
- rd_kafka_buf_write_bool(rkbuf, committed);
- rkbuf->rkbuf_u.EndTxn.commit = committed;
-
- rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
-
- rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;
-
- rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-
-/**
- * @name Unit tests
- * @{
- *
- *
- *
- *
- */
-
-/**
- * @brief Create \p cnt messages, starting at \p msgid, and add them
- * to \p rkmq.
- *
- * @returns the number of messages added.
- */
-static int ut_create_msgs(rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) {
- int i;
-
- for (i = 0; i < cnt; i++) {
- rd_kafka_msg_t *rkm;
-
- rkm = ut_rd_kafka_msg_new(0);
- rkm->rkm_u.producer.msgid = msgid++;
- rkm->rkm_ts_enq = rd_clock();
- rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000);
-
- rd_kafka_msgq_enq(rkmq, rkm);
- }
-
- return cnt;
-}
-
-/**
- * @brief Idempotent Producer request/response unit tests
- *
- * The current test verifies proper handling of the following case:
- * Batch 0 succeeds
- * Batch 1 fails with temporary error
- * Batch 2,3 fails with out of order sequence
- * Retry Batch 1-3 should succeed.
- */
-static int unittest_idempotent_producer(void) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf;
- rd_kafka_broker_t *rkb;
-#define _BATCH_CNT 4
-#define _MSGS_PER_BATCH 3
- const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH;
- int remaining_batches;
- uint64_t msgid = 1;
- rd_kafka_toppar_t *rktp;
- rd_kafka_pid_t pid = {.id = 1000, .epoch = 0};
- struct rd_kafka_Produce_result result = {.offset = 1,
- .timestamp = 1000};
- rd_kafka_queue_t *rkqu;
- rd_kafka_event_t *rkev;
- rd_kafka_buf_t *request[_BATCH_CNT];
- int rcnt = 0;
- int retry_msg_cnt = 0;
- int drcnt = 0;
- rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq);
- const char *tmp;
- int i, r;
-
- RD_UT_SAY("Verifying idempotent producer error handling");
-
- conf = rd_kafka_conf_new();
- rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0);
- rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0);
- if ((tmp = rd_getenv("TEST_DEBUG", NULL)))
- rd_kafka_conf_set(conf, "debug", tmp, NULL, 0);
- if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) !=
- RD_KAFKA_CONF_OK)
- RD_UT_FAIL("Failed to enable idempotence");
- rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR);
-
- rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
- RD_UT_ASSERT(rk, "failed to create producer");
-
- rkqu = rd_kafka_queue_get_main(rk);
-
- /* We need a broker handle, use a logical broker to avoid
- * any connection attempts. */
- rkb = rd_kafka_broker_add_logical(rk, "unittest");
-
- /* Have the broker support everything so msgset_writer selects
- * the most up-to-date output features. */
- rd_kafka_broker_lock(rkb);
- rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL;
- rd_kafka_broker_unlock(rkb);
-
- /* Get toppar */
- rktp = rd_kafka_toppar_get2(rk, "uttopic", 0, rd_false, rd_true);
- RD_UT_ASSERT(rktp, "failed to get toppar");
-
- /* Set the topic as exists so messages are enqueued on
- * the desired rktp away (otherwise UA partition) */
- rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1);
-
- /* Produce messages */
- ut_create_msgs(&rkmq, 1, msgcnt);
-
- /* Set the pid */
- rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID);
- rd_kafka_idemp_pid_update(rkb, pid);
- pid = rd_kafka_idemp_get_pid(rk);
- RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid");
- rd_kafka_toppar_pid_change(rktp, pid, msgid);
-
- remaining_batches = _BATCH_CNT;
-
- /* Create a ProduceRequest for each batch */
- for (rcnt = 0; rcnt < remaining_batches; rcnt++) {
- size_t msize;
- request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
- rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize);
- RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt);
- }
-
- RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0,
- "expected input message queue to be empty, "
- "but still has %d message(s)",
- rd_kafka_msgq_len(&rkmq));
-
- /*
- * Mock handling of each request
- */
-
- /* Batch 0: accepted */
- i = 0;
- r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
- RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
- rd_kafka_msgbatch_handle_Produce_result(rkb, &request[i]->rkbuf_batch,
- RD_KAFKA_RESP_ERR_NO_ERROR,
- &result, request[i]);
- result.offset += r;
- RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0,
- "batch %d: expected no messages in rktp_msgq, not %d", i,
- rd_kafka_msgq_len(&rktp->rktp_msgq));
- rd_kafka_buf_destroy(request[i]);
- remaining_batches--;
-
- /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */
- i = 1;
- r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
- RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
- rd_kafka_msgbatch_handle_Produce_result(
- rkb, &request[i]->rkbuf_batch,
- RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, &result, request[i]);
- retry_msg_cnt += r;
- RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
- "batch %d: expected %d messages in rktp_msgq, not %d", i,
- retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
- rd_kafka_buf_destroy(request[i]);
-
- /* Batch 2: OUT_OF_ORDER, triggering retry .. */
- i = 2;
- r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
- RD_UT_ASSERT(r == _MSGS_PER_BATCH, ".");
- rd_kafka_msgbatch_handle_Produce_result(
- rkb, &request[i]->rkbuf_batch,
- RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result,
- request[i]);
- retry_msg_cnt += r;
- RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
- "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
- i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
- rd_kafka_buf_destroy(request[i]);
-
- /* Batch 3: OUT_OF_ORDER, triggering retry .. */
- i = 3;
- r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
- rd_kafka_msgbatch_handle_Produce_result(
- rkb, &request[i]->rkbuf_batch,
- RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result,
- request[i]);
- retry_msg_cnt += r;
- RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
- "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
- i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
- rd_kafka_buf_destroy(request[i]);
-
-
- /* Retried messages will have been moved to rktp_msgq,
- * move them back to our local queue. */
- rd_kafka_toppar_lock(rktp);
- rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq);
- rd_kafka_toppar_unlock(rktp);
-
- RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt,
- "Expected %d messages in retry queue, not %d",
- retry_msg_cnt, rd_kafka_msgq_len(&rkmq));
-
- /* Sleep a short while to make sure the retry backoff expires. */
- rd_usleep(5 * 1000, NULL); /* 5ms */
-
- /*
- * Create requests for remaining batches.
- */
- for (rcnt = 0; rcnt < remaining_batches; rcnt++) {
- size_t msize;
- request[rcnt] = rd_kafka_msgset_create_ProduceRequest(
- rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize);
- RD_UT_ASSERT(request[rcnt],
- "Failed to create retry #%d (%d msgs in queue)",
- rcnt, rd_kafka_msgq_len(&rkmq));
- }
-
- /*
- * Mock handling of each request, they will now succeed.
- */
- for (i = 0; i < rcnt; i++) {
- r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq);
- rd_kafka_msgbatch_handle_Produce_result(
- rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR,
- &result, request[i]);
- result.offset += r;
- rd_kafka_buf_destroy(request[i]);
- }
-
- retry_msg_cnt = 0;
- RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt,
- "batch %d: expected %d messages in rktp_xmit_msgq, not %d",
- i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq));
-
- /*
- * Wait for delivery reports, they should all be successful.
- */
- while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) {
- const rd_kafka_message_t *rkmessage;
-
- RD_UT_SAY("Got %s event with %d message(s)",
- rd_kafka_event_name(rkev),
- (int)rd_kafka_event_message_count(rkev));
-
- while ((rkmessage = rd_kafka_event_message_next(rkev))) {
- RD_UT_SAY(" DR for message: %s: (persistence=%d)",
- rd_kafka_err2str(rkmessage->err),
- rd_kafka_message_status(rkmessage));
- if (rkmessage->err)
- RD_UT_WARN(" ^ Should not have failed");
- else
- drcnt++;
- }
- rd_kafka_event_destroy(rkev);
- }
-
- /* Should be no more messages in queues */
- r = rd_kafka_outq_len(rk);
- RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d", r);
-
- /* Verify the expected number of good delivery reports were seen */
- RD_UT_ASSERT(drcnt == msgcnt, "expected %d DRs, not %d", msgcnt, drcnt);
-
- rd_kafka_queue_destroy(rkqu);
- rd_kafka_toppar_destroy(rktp);
- rd_kafka_broker_destroy(rkb);
- rd_kafka_destroy(rk);
-
- RD_UT_PASS();
- return 0;
-}
-
-/**
- * @brief Request/response unit tests
- */
-int unittest_request(void) {
- int fails = 0;
-
- fails += unittest_idempotent_producer();
-
- return fails;
-}
-
-/**@}*/