From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/src/rdkafka_request.c | 5378 ++++++++++++++++++++ 1 file changed, 5378 insertions(+) create mode 100644 fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c') diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c new file mode 100644 index 000000000..12d9eb30e --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c @@ -0,0 +1,5378 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include + +#include "rdkafka_int.h" +#include "rdkafka_request.h" +#include "rdkafka_broker.h" +#include "rdkafka_offset.h" +#include "rdkafka_topic.h" +#include "rdkafka_partition.h" +#include "rdkafka_metadata.h" +#include "rdkafka_msgset.h" +#include "rdkafka_idempotence.h" +#include "rdkafka_txnmgr.h" +#include "rdkafka_sasl.h" + +#include "rdrand.h" +#include "rdstring.h" +#include "rdunittest.h" + + +/** + * Kafka protocol request and response handling. + * All of this code runs in the broker thread and uses op queues for + * propagating results back to the various sub-systems operating in + * other threads. + */ + + +/* RD_KAFKA_ERR_ACTION_.. to string map */ +static const char *rd_kafka_actions_descs[] = { + "Permanent", "Ignore", "Refresh", "Retry", + "Inform", "Special", "MsgNotPersisted", "MsgPossiblyPersisted", + "MsgPersisted", NULL, +}; + +const char *rd_kafka_actions2str(int actions) { + static RD_TLS char actstr[128]; + return rd_flags2str(actstr, sizeof(actstr), rd_kafka_actions_descs, + actions); +} + + +/** + * @brief Decide action(s) to take based on the returned error code. + * + * The optional var-args is a .._ACTION_END terminated list + * of action,error tuples which overrides the general behaviour. + * It is to be read as: for \p error, return \p action(s). + * + * @warning \p request, \p rkbuf and \p rkb may be NULL. 
+ */ +int rd_kafka_err_action(rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + const rd_kafka_buf_t *request, + ...) { + va_list ap; + int actions = 0; + int exp_act; + + if (!err) + return 0; + + /* Match explicitly defined error mappings first. */ + va_start(ap, request); + while ((exp_act = va_arg(ap, int))) { + int exp_err = va_arg(ap, int); + + if (err == exp_err) + actions |= exp_act; + } + va_end(ap); + + /* Explicit error match. */ + if (actions) { + if (err && rkb && request) + rd_rkb_dbg( + rkb, BROKER, "REQERR", + "%sRequest failed: %s: explicit actions %s", + rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), + rd_kafka_err2str(err), + rd_kafka_actions2str(actions)); + + return actions; + } + + /* Default error matching */ + switch (err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: + case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + case RD_KAFKA_RESP_ERR__WAIT_COORD: + /* Request metadata information update */ + actions |= RD_KAFKA_ERR_ACTION_REFRESH | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; + break; + + case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR: + /* Request metadata update and retry */ + actions |= RD_KAFKA_ERR_ACTION_REFRESH | + RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; + break; + + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: + case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND: + actions |= RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; + break; + + case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS: + /* Client-side wait-response/in-queue timeout */ + case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: + actions |= RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; + break; + + 
case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; + break; + + case RD_KAFKA_RESP_ERR__BAD_MSG: + /* Buffer parse failures are typically a client-side bug, + * treat them as permanent failures. */ + actions |= RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; + break; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR__DESTROY: + case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT: + case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: + case RD_KAFKA_RESP_ERR__PURGE_QUEUE: + default: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; + break; + } + + /* Fatal or permanent errors are not retriable */ + if (actions & + (RD_KAFKA_ERR_ACTION_FATAL | RD_KAFKA_ERR_ACTION_PERMANENT)) + actions &= ~RD_KAFKA_ERR_ACTION_RETRY; + + /* If no request buffer was specified, which might be the case + * in certain error call chains, mask out the retry action. */ + if (!request) + actions &= ~RD_KAFKA_ERR_ACTION_RETRY; + else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce) + /* Mask out message-related bits for non-Produce requests */ + actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS; + + if (err && actions && rkb && request) + rd_rkb_dbg( + rkb, BROKER, "REQERR", "%sRequest failed: %s: actions %s", + rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), + rd_kafka_err2str(err), rd_kafka_actions2str(actions)); + + return actions; +} + + +/** + * @brief Read a list of topic+partitions+extra from \p rkbuf. + * + * @param rkbuf buffer to read from + * @param fields An array of fields to read from the buffer and set on + * the rktpar object, in the specified order, must end + * with RD_KAFKA_TOPIC_PARTITION_FIELD_END. + * + * @returns a newly allocated list on success, or NULL on parse error. 
+ */ +rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( + rd_kafka_buf_t *rkbuf, + size_t estimated_part_cnt, + const rd_kafka_topic_partition_field_t *fields) { + const int log_decode_errors = LOG_ERR; + int32_t TopicArrayCnt; + rd_kafka_topic_partition_list_t *parts = NULL; + + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); + + parts = rd_kafka_topic_partition_list_new( + RD_MAX(TopicArrayCnt * 4, (int)estimated_part_cnt)); + + while (TopicArrayCnt-- > 0) { + rd_kafkap_str_t kTopic; + int32_t PartArrayCnt; + char *topic; + + rd_kafka_buf_read_str(rkbuf, &kTopic); + rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); + + RD_KAFKAP_STR_DUPA(&topic, &kTopic); + + while (PartArrayCnt-- > 0) { + int32_t Partition = -1, Epoch = -1234, + CurrentLeaderEpoch = -1234; + int64_t Offset = -1234; + int16_t ErrorCode = 0; + rd_kafka_topic_partition_t *rktpar; + int fi; + + /* + * Read requested fields + */ + for (fi = 0; + fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END; + fi++) { + switch (fields[fi]) { + case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION: + rd_kafka_buf_read_i32(rkbuf, + &Partition); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET: + rd_kafka_buf_read_i64(rkbuf, &Offset); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH: + rd_kafka_buf_read_i32( + rkbuf, &CurrentLeaderEpoch); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH: + rd_kafka_buf_read_i32(rkbuf, &Epoch); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR: + rd_kafka_buf_read_i16(rkbuf, + &ErrorCode); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: + rd_assert(!*"metadata not implemented"); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_END: + break; + } + } + + rktpar = rd_kafka_topic_partition_list_add(parts, topic, + Partition); + /* Use dummy sentinel values that are unlikely to be + * seen from the broker to know if we are to set these + * fields 
or not. */ + if (Offset != -1234) + rktpar->offset = Offset; + if (Epoch != -1234) + rd_kafka_topic_partition_set_leader_epoch( + rktpar, Epoch); + if (CurrentLeaderEpoch != -1234) + rd_kafka_topic_partition_set_current_leader_epoch( + rktpar, CurrentLeaderEpoch); + rktpar->err = ErrorCode; + + + rd_kafka_buf_skip_tags(rkbuf); + } + + rd_kafka_buf_skip_tags(rkbuf); + } + + return parts; + +err_parse: + if (parts) + rd_kafka_topic_partition_list_destroy(parts); + + return NULL; +} + + +/** + * @brief Write a list of topic+partitions+offsets+extra to \p rkbuf + * + * @returns the number of partitions written to buffer. + * + * @remark The \p parts list MUST be sorted. + */ +int rd_kafka_buf_write_topic_partitions( + rd_kafka_buf_t *rkbuf, + const rd_kafka_topic_partition_list_t *parts, + rd_bool_t skip_invalid_offsets, + rd_bool_t only_invalid_offsets, + const rd_kafka_topic_partition_field_t *fields) { + size_t of_TopicArrayCnt; + size_t of_PartArrayCnt = 0; + int TopicArrayCnt = 0, PartArrayCnt = 0; + int i; + const char *prev_topic = NULL; + int cnt = 0; + + rd_assert(!only_invalid_offsets || + (only_invalid_offsets != skip_invalid_offsets)); + + /* TopicArrayCnt */ + of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); + + for (i = 0; i < parts->cnt; i++) { + const rd_kafka_topic_partition_t *rktpar = &parts->elems[i]; + int fi; + + if (rktpar->offset < 0) { + if (skip_invalid_offsets) + continue; + } else if (only_invalid_offsets) + continue; + + if (!prev_topic || strcmp(rktpar->topic, prev_topic)) { + /* Finish previous topic, if any. 
*/ + if (of_PartArrayCnt > 0) { + rd_kafka_buf_finalize_arraycnt( + rkbuf, of_PartArrayCnt, PartArrayCnt); + /* Tags for previous topic struct */ + rd_kafka_buf_write_tags(rkbuf); + } + + + /* Topic */ + rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); + TopicArrayCnt++; + prev_topic = rktpar->topic; + /* New topic so reset partition count */ + PartArrayCnt = 0; + + /* PartitionArrayCnt: updated later */ + of_PartArrayCnt = + rd_kafka_buf_write_arraycnt_pos(rkbuf); + } + + + /* + * Write requested fields + */ + for (fi = 0; fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END; + fi++) { + switch (fields[fi]) { + case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION: + rd_kafka_buf_write_i32(rkbuf, + rktpar->partition); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET: + rd_kafka_buf_write_i64(rkbuf, rktpar->offset); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH: + rd_kafka_buf_write_i32( + rkbuf, + rd_kafka_topic_partition_get_current_leader_epoch( + rktpar)); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH: + rd_kafka_buf_write_i32( + rkbuf, + rd_kafka_topic_partition_get_leader_epoch( + rktpar)); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR: + rd_kafka_buf_write_i16(rkbuf, rktpar->err); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: + /* Java client 0.9.0 and broker <0.10.0 can't + * parse Null metadata fields, so as a + * workaround we send an empty string if + * it's Null. */ + if (!rktpar->metadata) + rd_kafka_buf_write_str(rkbuf, "", 0); + else + rd_kafka_buf_write_str( + rkbuf, rktpar->metadata, + rktpar->metadata_size); + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: + break; + case RD_KAFKA_TOPIC_PARTITION_FIELD_END: + break; + } + } + + + if (fi > 1) + /* If there was more than one field written + * then this was a struct and thus needs the + * struct suffix tags written. 
*/ + rd_kafka_buf_write_tags(rkbuf); + + PartArrayCnt++; + cnt++; + } + + if (of_PartArrayCnt > 0) { + rd_kafka_buf_finalize_arraycnt(rkbuf, of_PartArrayCnt, + PartArrayCnt); + /* Tags for topic struct */ + rd_kafka_buf_write_tags(rkbuf); + } + + rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, TopicArrayCnt); + + return cnt; +} + + +/** + * @brief Send FindCoordinatorRequest. + * + * @param coordkey is the group.id for RD_KAFKA_COORD_GROUP, + * and the transactional.id for RD_KAFKA_COORD_TXN + */ +rd_kafka_resp_err_t +rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, + rd_kafka_coordtype_t coordtype, + const char *coordkey, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL); + + if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1) + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1, + 1 + 2 + strlen(coordkey)); + + rd_kafka_buf_write_str(rkbuf, coordkey, -1); + + if (ApiVersion >= 1) + rd_kafka_buf_write_i8(rkbuf, (int8_t)coordtype); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * @brief Parses a ListOffsets reply. + * + * Returns the parsed offsets (and errors) in \p offsets which must have been + * initialized by caller. + * + * @returns 0 on success, else an error (\p offsets may be completely or + * partially updated, depending on the nature of the error, and per + * partition error codes should be checked by the caller). 
+ */ +static rd_kafka_resp_err_t +rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, + rd_kafka_topic_partition_list_t *offsets) { + const int log_decode_errors = LOG_ERR; + int32_t TopicArrayCnt; + int16_t api_version; + rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + api_version = rkbuf->rkbuf_reqhdr.ApiVersion; + + if (api_version >= 2) + rd_kafka_buf_read_throttle_time(rkbuf); + + /* NOTE: + * Broker may return offsets in a different constellation than + * in the original request .*/ + + rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); + while (TopicArrayCnt-- > 0) { + rd_kafkap_str_t ktopic; + int32_t PartArrayCnt; + char *topic_name; + + rd_kafka_buf_read_str(rkbuf, &ktopic); + rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); + + RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); + + while (PartArrayCnt-- > 0) { + int32_t kpartition; + int16_t ErrorCode; + int32_t OffsetArrayCnt; + int64_t Offset = -1; + int32_t LeaderEpoch = -1; + rd_kafka_topic_partition_t *rktpar; + + rd_kafka_buf_read_i32(rkbuf, &kpartition); + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + if (api_version >= 1) { + int64_t Timestamp; + rd_kafka_buf_read_i64(rkbuf, &Timestamp); + rd_kafka_buf_read_i64(rkbuf, &Offset); + if (api_version >= 4) + rd_kafka_buf_read_i32(rkbuf, + &LeaderEpoch); + } else if (api_version == 0) { + rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); + /* We only request one offset so just grab + * the first one. */ + while (OffsetArrayCnt-- > 0) + rd_kafka_buf_read_i64(rkbuf, &Offset); + } else { + RD_NOTREACHED(); + } + + rktpar = rd_kafka_topic_partition_list_add( + offsets, topic_name, kpartition); + rktpar->err = ErrorCode; + rktpar->offset = Offset; + rd_kafka_topic_partition_set_leader_epoch(rktpar, + LeaderEpoch); + + if (ErrorCode && !all_err) + all_err = ErrorCode; + } + } + + return all_err; + +err_parse: + return rkbuf->rkbuf_err; +} + + + +/** + * @brief Parses and handles ListOffsets replies. + * + * Returns the parsed offsets (and errors) in \p offsets. 
+ * \p offsets must be initialized by the caller. + * + * @returns 0 on success, else an error. \p offsets may be populated on error, + * depending on the nature of the error. + * On error \p actionsp (unless NULL) is updated with the recommended + * error actions. + */ +rd_kafka_resp_err_t +rd_kafka_handle_ListOffsets(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + rd_kafka_topic_partition_list_t *offsets, + int *actionsp) { + + int actions; + + if (!err) + err = rd_kafka_parse_ListOffsets(rkbuf, offsets); + if (!err) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + actions = rd_kafka_err_action( + rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, + + RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, + + RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, + RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, + + + RD_KAFKA_ERR_ACTION_END); + + if (actionsp) + *actionsp = actions; + + if (rkb) + rd_rkb_dbg( + rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)", + rd_kafka_err2str(err), rd_kafka_actions2str(actions)); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + char tmp[256]; + /* Re-query for leader */ + rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s", + rd_kafka_err2str(err)); + rd_kafka_metadata_refresh_known_topics(rk, NULL, + rd_true /*force*/, tmp); + } + + if 
((actions & RD_KAFKA_ERR_ACTION_RETRY) && + rd_kafka_buf_retry(rkb, request)) + return RD_KAFKA_RESP_ERR__IN_PROGRESS; + + return err; +} + + + +/** + * @brief Async maker for ListOffsetsRequest. + */ +static rd_kafka_resp_err_t +rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, + rd_kafka_buf_t *rkbuf, + void *make_opaque) { + const rd_kafka_topic_partition_list_t *partitions = + (const rd_kafka_topic_partition_list_t *)make_opaque; + int i; + size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; + const char *last_topic = ""; + int32_t topic_cnt = 0, part_cnt = 0; + int16_t ApiVersion; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_ListOffsets, 0, 5, NULL); + if (ApiVersion == -1) + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + + /* ReplicaId */ + rd_kafka_buf_write_i32(rkbuf, -1); + + /* IsolationLevel */ + if (ApiVersion >= 2) + rd_kafka_buf_write_i8(rkbuf, + rkb->rkb_rk->rk_conf.isolation_level); + + /* TopicArrayCnt */ + of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ + + for (i = 0; i < partitions->cnt; i++) { + const rd_kafka_topic_partition_t *rktpar = + &partitions->elems[i]; + + if (strcmp(rktpar->topic, last_topic)) { + /* Finish last topic, if any. 
*/ + if (of_PartArrayCnt > 0) + rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, + part_cnt); + + /* Topic */ + rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); + topic_cnt++; + last_topic = rktpar->topic; + /* New topic so reset partition count */ + part_cnt = 0; + + /* PartitionArrayCnt: updated later */ + of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); + } + + /* Partition */ + rd_kafka_buf_write_i32(rkbuf, rktpar->partition); + part_cnt++; + + if (ApiVersion >= 4) + /* CurrentLeaderEpoch */ + rd_kafka_buf_write_i32( + rkbuf, + rd_kafka_topic_partition_get_current_leader_epoch( + rktpar)); + + /* Time/Offset */ + rd_kafka_buf_write_i64(rkbuf, rktpar->offset); + + if (ApiVersion == 0) { + /* MaxNumberOfOffsets */ + rd_kafka_buf_write_i32(rkbuf, 1); + } + } + + if (of_PartArrayCnt > 0) { + rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); + rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_rkb_dbg(rkb, TOPIC, "OFFSET", + "ListOffsetsRequest (v%hd, opv %d) " + "for %" PRId32 " topic(s) and %" PRId32 " partition(s)", + ApiVersion, rkbuf->rkbuf_replyq.version, topic_cnt, + partitions->cnt); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Send ListOffsetsRequest for partitions in \p partitions. 
+ */ +void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb, + rd_kafka_topic_partition_list_t *partitions, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + rd_kafka_topic_partition_list_t *make_parts; + + make_parts = rd_kafka_topic_partition_list_copy(partitions); + rd_kafka_topic_partition_list_sort_by_topic(make_parts); + + rkbuf = rd_kafka_buf_new_request( + rkb, RD_KAFKAP_ListOffsets, 1, + /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */ + 4 + 1 + 4 + 100 + + /* PartArrayCnt */ + 4 + + /* partition_cnt * Partition+Time+MaxNumOffs */ + (make_parts->cnt * (4 + 8 + 4))); + + /* Postpone creating the request contents until time to send, + * at which time the ApiVersion is known. */ + rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest, + make_parts, + rd_kafka_topic_partition_list_destroy_free); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); +} + + +/** + * @brief OffsetForLeaderEpochResponse handler. + */ +rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( + rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + rd_kafka_topic_partition_list_t **offsets) { + const int log_decode_errors = LOG_ERR; + int16_t ApiVersion; + + if (err) + goto err; + + ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; + + if (ApiVersion >= 2) + rd_kafka_buf_read_throttle_time(rkbuf); + + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + ApiVersion >= 1 ? 
RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + *offsets = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields); + if (!*offsets) + goto err_parse; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err: + return err; + +err_parse: + err = rkbuf->rkbuf_err; + goto err; +} + + +/** + * @brief Send OffsetForLeaderEpochRequest for partition(s). + * + */ +void rd_kafka_OffsetForLeaderEpochRequest( + rd_kafka_broker_t *rkb, + rd_kafka_topic_partition_list_t *parts, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL); + /* If the supported ApiVersions are not yet known, + * or this broker doesn't support it, we let this request + * succeed or fail later from the broker thread where the + * version is checked again. */ + if (ApiVersion == -1) + ApiVersion = 2; + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_OffsetForLeaderEpoch, 1, 4 + (parts->cnt * 64), + ApiVersion >= 4 /*flexver*/); + + /* Sort partitions by topic */ + rd_kafka_topic_partition_list_sort_by_topic(parts); + + /* Write partition list */ + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + /* CurrentLeaderEpoch */ + RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH, + /* LeaderEpoch */ + RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + rd_kafka_buf_write_topic_partitions( + rkbuf, parts, rd_false /*include invalid offsets*/, + rd_false /*skip valid offsets */, fields); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + /* Let caller perform retries */ + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); +} + + + +/** + * Generic handler for OffsetFetch 
responses. + * Offsets for included partitions will be propagated through the passed + * 'offsets' list. + * + * @param rkbuf response buffer, may be NULL if \p err is set. + * @param update_toppar update toppar's committed_offset + * @param add_part if true add partitions from the response to \p *offsets, + * else just update the partitions that are already + * in \p *offsets. + */ +rd_kafka_resp_err_t +rd_kafka_handle_OffsetFetch(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + rd_kafka_topic_partition_list_t **offsets, + rd_bool_t update_toppar, + rd_bool_t add_part, + rd_bool_t allow_retry) { + const int log_decode_errors = LOG_ERR; + int32_t TopicArrayCnt; + int64_t offset = RD_KAFKA_OFFSET_INVALID; + int16_t ApiVersion; + rd_kafkap_str_t metadata; + int retry_unstable = 0; + int i; + int actions; + int seen_cnt = 0; + + if (err) + goto err; + + ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; + + if (ApiVersion >= 3) + rd_kafka_buf_read_throttle_time(rkbuf); + + if (!*offsets) + *offsets = rd_kafka_topic_partition_list_new(16); + + /* Set default offset for all partitions. 
*/ + rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, *offsets, 0, + RD_KAFKA_OFFSET_INVALID, + 0 /* !is commit */); + + rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); + for (i = 0; i < TopicArrayCnt; i++) { + rd_kafkap_str_t topic; + int32_t PartArrayCnt; + char *topic_name; + int j; + + rd_kafka_buf_read_str(rkbuf, &topic); + + rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); + + RD_KAFKAP_STR_DUPA(&topic_name, &topic); + + for (j = 0; j < PartArrayCnt; j++) { + int32_t partition; + rd_kafka_toppar_t *rktp; + rd_kafka_topic_partition_t *rktpar; + int32_t LeaderEpoch = -1; + int16_t err2; + + rd_kafka_buf_read_i32(rkbuf, &partition); + rd_kafka_buf_read_i64(rkbuf, &offset); + if (ApiVersion >= 5) + rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch); + rd_kafka_buf_read_str(rkbuf, &metadata); + rd_kafka_buf_read_i16(rkbuf, &err2); + rd_kafka_buf_skip_tags(rkbuf); + + rktpar = rd_kafka_topic_partition_list_find( + *offsets, topic_name, partition); + if (!rktpar && add_part) + rktpar = rd_kafka_topic_partition_list_add( + *offsets, topic_name, partition); + else if (!rktpar) { + rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", + "OffsetFetchResponse: %s [%" PRId32 + "] " + "not found in local list: ignoring", + topic_name, partition); + continue; + } + + seen_cnt++; + + rktp = rd_kafka_topic_partition_get_toppar( + rk, rktpar, rd_false /*no create on miss*/); + + /* broker reports invalid offset as -1 */ + if (offset == -1) + rktpar->offset = RD_KAFKA_OFFSET_INVALID; + else + rktpar->offset = offset; + + rd_kafka_topic_partition_set_leader_epoch(rktpar, + LeaderEpoch); + rktpar->err = err2; + + rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", + "OffsetFetchResponse: %s [%" PRId32 + "] " + "offset %" PRId64 ", leader epoch %" PRId32 + ", metadata %d byte(s): %s", + topic_name, partition, offset, LeaderEpoch, + RD_KAFKAP_STR_LEN(&metadata), + rd_kafka_err2name(rktpar->err)); + + if (update_toppar && !err2 && rktp) { + /* Update toppar's 
committed offset */ + rd_kafka_toppar_lock(rktp); + rktp->rktp_committed_pos = + rd_kafka_topic_partition_get_fetch_pos( + rktpar); + rd_kafka_toppar_unlock(rktp); + } + + if (rktpar->err == + RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) + retry_unstable++; + + + if (rktpar->metadata) + rd_free(rktpar->metadata); + + if (RD_KAFKAP_STR_IS_NULL(&metadata)) { + rktpar->metadata = NULL; + rktpar->metadata_size = 0; + } else { + rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata); + rktpar->metadata_size = + RD_KAFKAP_STR_LEN(&metadata); + } + + /* Loose ref from get_toppar() */ + if (rktp) + rd_kafka_toppar_destroy(rktp); + } + + rd_kafka_buf_skip_tags(rkbuf); + } + + if (ApiVersion >= 2) { + int16_t ErrorCode; + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (ErrorCode) { + err = ErrorCode; + goto err; + } + } + + +err: + if (!*offsets) + rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", "OffsetFetch returned %s", + rd_kafka_err2str(err)); + else + rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", + "OffsetFetch for %d/%d partition(s) " + "(%d unstable partition(s)) returned %s", + seen_cnt, (*offsets)->cnt, retry_unstable, + rd_kafka_err2str(err)); + + actions = + rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, RD_KAFKA_NO_REPLYQ, + RD_KAFKA_OP_COORD_QUERY, err); + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY || retry_unstable) { + if (allow_retry && rd_kafka_buf_retry(rkb, request)) + return RD_KAFKA_RESP_ERR__IN_PROGRESS; + /* FALLTHRU */ + } + + return err; + +err_parse: + err = rkbuf->rkbuf_err; + goto err; +} + + + +/** + * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH + * rko in \p opaque. + * + * @param opaque rko wrapper for handle_OffsetFetch. + * + * The \c rko->rko_u.offset_fetch.partitions list will be filled in with + * the fetched offsets. 
+ * + * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH. + * + * @remark \p rkb, \p rkbuf and \p request are optional. + * + * @remark The \p request buffer may be retried on error. + * + * @locality cgrp's broker thread + */ +void rd_kafka_op_handle_OffsetFetch(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_op_t *rko = opaque; + rd_kafka_op_t *rko_reply; + rd_kafka_topic_partition_list_t *offsets; + + RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH); + + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + /* Termination, quick cleanup. */ + rd_kafka_op_destroy(rko); + return; + } + + offsets = rd_kafka_topic_partition_list_copy( + rko->rko_u.offset_fetch.partitions); + + /* If all partitions already had usable offsets then there + * was no request sent and thus no reply, the offsets list is + * good to go.. */ + if (rkbuf) { + /* ..else parse the response (or perror) */ + err = rd_kafka_handle_OffsetFetch( + rkb->rkb_rk, rkb, err, rkbuf, request, &offsets, + rd_false /*dont update rktp*/, rd_false /*dont add part*/, + /* Allow retries if replyq is valid */ + rd_kafka_op_replyq_is_valid(rko)); + if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { + if (offsets) + rd_kafka_topic_partition_list_destroy(offsets); + return; /* Retrying */ + } + } + + rko_reply = + rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY); + rko_reply->rko_err = err; + rko_reply->rko_u.offset_fetch.partitions = offsets; + rko_reply->rko_u.offset_fetch.do_free = 1; + if (rko->rko_rktp) + rko_reply->rko_rktp = rd_kafka_toppar_keep(rko->rko_rktp); + + rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0); + + rd_kafka_op_destroy(rko); +} + +/** + * Send OffsetFetchRequest for a consumer group id. 
 *
 * Any partition with a usable offset will be ignored, if all partitions
 * have usable offsets then no request is sent at all but an empty
 * reply is enqueued on the replyq.
 *
 * @param group_id Request offset for this group id.
 * @param parts (optional) List of topic partitions to request,
 *        or NULL to return all topic partitions associated with the
 *        group.
 * @param require_stable_offsets Whether broker should return stable offsets
 *        (transaction-committed). Only sent on the wire for
 *        ApiVersion >= 7.
 * @param timeout Optional timeout to set to the buffer.
 */
void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb,
                                 const char *group_id,
                                 rd_kafka_topic_partition_list_t *parts,
                                 rd_bool_t require_stable_offsets,
                                 int timeout,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion;
        size_t parts_size = 0;
        /* -1 = "all partitions" (parts==NULL) so the PartCnt==0
         * short-circuit below is not taken in that case. */
        int PartCnt = -1;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_OffsetFetch, 0, 7, NULL);

        if (parts) {
                /* Rough per-partition size estimate for the buffer
                 * pre-allocation below. */
                parts_size = parts->cnt * 32;
        }

        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_OffsetFetch, 1,
            /* GroupId + rd_kafka_buf_write_arraycnt_pos +
             * Topics + RequireStable */
            32 + 4 + parts_size + 1, ApiVersion >= 6 /*flexver*/);

        /* ConsumerGroup */
        rd_kafka_buf_write_str(rkbuf, group_id, -1);

        if (parts) {
                /* Sort partitions by topic */
                rd_kafka_topic_partition_list_sort_by_topic(parts);

                /* Write partition list, filtering out partitions with valid
                 * offsets */
                const rd_kafka_topic_partition_field_t fields[] = {
                    RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
                    RD_KAFKA_TOPIC_PARTITION_FIELD_END};
                PartCnt = rd_kafka_buf_write_topic_partitions(
                    rkbuf, parts, rd_false /*include invalid offsets*/,
                    rd_false /*skip valid offsets */, fields);
        } else {
                /* NULL parts: empty/null Topics array = all partitions
                 * for the group. */
                rd_kafka_buf_write_arraycnt_pos(rkbuf);
        }

        if (ApiVersion >= 7) {
                /* RequireStable */
                rd_kafka_buf_write_i8(rkbuf, require_stable_offsets);
        }

        if (PartCnt == 0) {
                /* No partitions needs OffsetFetch, enqueue empty
                 * response right away.
                 * rd_kafka_buf_callback() takes ownership of rkbuf. */
                rkbuf->rkbuf_replyq = replyq;
                rkbuf->rkbuf_cb     = resp_cb;
                rkbuf->rkbuf_opaque = opaque;
                rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf);
                return;
        }

        /* Only extend the timeout if the caller's timeout exceeds the
         * default socket timeout; +1000ms grace for broker processing. */
        if (timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms)
                rd_kafka_buf_set_abs_timeout(rkbuf, timeout + 1000, 0);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        if (parts) {
                rd_rkb_dbg(
                    rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
                    "OFFSET",
                    "Group %s OffsetFetchRequest(v%d) for %d/%d partition(s)",
                    group_id, ApiVersion, PartCnt, parts->cnt);
        } else {
                rd_rkb_dbg(
                    rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER,
                    "OFFSET",
                    "Group %s OffsetFetchRequest(v%d) for all partitions",
                    group_id, ApiVersion);
        }

        /* Let handler decide if retries should be performed */
        rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES;

        /* NOTE(review): this second debug block largely duplicates the
         * one above (same information, different wording) — looks like
         * leftover from a merge; harmless, logging only. */
        if (parts) {
                rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
                           "Fetch committed offsets for %d/%d partition(s)",
                           PartCnt, parts->cnt);
        } else {
                rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET",
                           "Fetch committed offsets all the partitions");
        }

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}



/**
 * @brief Handle per-partition OffsetCommit errors and returns actions flags.
 */
static int
rd_kafka_handle_OffsetCommit_error(rd_kafka_broker_t *rkb,
                                   rd_kafka_buf_t *request,
                                   const rd_kafka_topic_partition_t *rktpar) {

        /* These actions are mimicking AK's ConsumerCoordinator.java */

        /* Var-arg list is (action, error) pairs terminated by
         * RD_KAFKA_ERR_ACTION_END; first matching error wins. */
        return rd_kafka_err_action(
            rkb, rktpar->err, request,

            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,

            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,


            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE,

            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE,


            RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,

            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART,


            /* .._SPECIAL: mark coordinator dead, refresh and retry */
            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
                RD_KAFKA_ERR_ACTION_SPECIAL,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,

            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY |
                RD_KAFKA_ERR_ACTION_SPECIAL,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,

            /* Replicas possibly unavailable:
             * Refresh coordinator (but don't mark as dead (!.._SPECIAL)),
             * and retry */
            RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY,
            RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,


            /* FIXME: There are some cases in the Java code where
             * this is not treated as a fatal error. */
            RD_KAFKA_ERR_ACTION_PERMANENT | RD_KAFKA_ERR_ACTION_FATAL,
            RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,


            RD_KAFKA_ERR_ACTION_PERMANENT,
            RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS,


            RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,

            RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,

            RD_KAFKA_ERR_ACTION_END);
}


/**
 * @brief Handle OffsetCommit response.
 *
 * @remark \p offsets may be NULL if \p err is set
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if all partitions were successfully
 *          committed,
 *          RD_KAFKA_RESP_ERR__IN_PROGRESS if a retry was scheduled,
 *          or any other error code if the request was not retried.
 */
rd_kafka_resp_err_t
rd_kafka_handle_OffsetCommit(rd_kafka_t *rk,
                             rd_kafka_broker_t *rkb,
                             rd_kafka_resp_err_t err,
                             rd_kafka_buf_t *rkbuf,
                             rd_kafka_buf_t *request,
                             rd_kafka_topic_partition_list_t *offsets,
                             rd_bool_t ignore_cgrp) {
        /* Used by the rd_kafka_buf_read_*() macros: on a short read they
         * log at this level and jump to the err_parse label below. */
        const int log_decode_errors = LOG_ERR;
        int32_t TopicArrayCnt;
        int errcnt  = 0; /* Partitions that returned an error */
        int partcnt = 0; /* Partitions found in the response */
        int i;
        int actions = 0;

        if (err)
                goto err;

        if (rd_kafka_buf_ApiVersion(rkbuf) >= 3)
                rd_kafka_buf_read_throttle_time(rkbuf);

        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
        for (i = 0; i < TopicArrayCnt; i++) {
                rd_kafkap_str_t topic;
                char *topic_str;
                int32_t PartArrayCnt;
                int j;

                rd_kafka_buf_read_str(rkbuf, &topic);
                rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt);

                /* Stack-allocated nul-terminated copy for the
                 * partition-list lookup below. */
                RD_KAFKAP_STR_DUPA(&topic_str, &topic);

                for (j = 0; j < PartArrayCnt; j++) {
                        int32_t partition;
                        int16_t ErrorCode;
                        rd_kafka_topic_partition_t *rktpar;

                        rd_kafka_buf_read_i32(rkbuf, &partition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        rktpar = rd_kafka_topic_partition_list_find(
                            offsets, topic_str, partition);

                        if (!rktpar) {
                                /* Received offset for topic/partition we didn't
                                 * ask for, this shouldn't really happen. */
                                continue;
                        }

                        rktpar->err = ErrorCode;
                        if (ErrorCode) {
                                err = ErrorCode;
                                errcnt++;

                                /* Accumulate actions for per-partition
                                 * errors. */
                                actions |= rd_kafka_handle_OffsetCommit_error(
                                    rkb, request, rktpar);
                        }

                        partcnt++;
                }
        }

        /* If all partitions failed use error code
         * from last partition as the global error. */
        if (offsets && err && errcnt == partcnt)
                goto err;

        goto done;

err_parse:
        err = rkbuf->rkbuf_err;

err:
        if (!actions) /* Transport/Request-level error */
                actions = rd_kafka_err_action(rkb, err, request,

                                              RD_KAFKA_ERR_ACTION_REFRESH |
                                                  RD_KAFKA_ERR_ACTION_SPECIAL |
                                                  RD_KAFKA_ERR_ACTION_RETRY,
                                              RD_KAFKA_RESP_ERR__TRANSPORT,

                                              RD_KAFKA_ERR_ACTION_END);

        if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_FATAL)) {
                rd_kafka_set_fatal_error(rk, err, "OffsetCommit failed: %s",
                                         rd_kafka_err2str(err));
                return err;
        }

        if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_REFRESH) &&
            rk->rk_cgrp) {
                /* Mark coordinator dead or re-query for coordinator.
                 * ..dead() will trigger a re-query. */
                if (actions & RD_KAFKA_ERR_ACTION_SPECIAL)
                        rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err,
                                                 "OffsetCommitRequest failed");
                else
                        rd_kafka_cgrp_coord_query(rk->rk_cgrp,
                                                  "OffsetCommitRequest failed");
        }

        /* Retry only when no partition reported a permanent error. */
        if (!ignore_cgrp && actions & RD_KAFKA_ERR_ACTION_RETRY &&
            !(actions & RD_KAFKA_ERR_ACTION_PERMANENT) &&
            rd_kafka_buf_retry(rkb, request))
                return RD_KAFKA_RESP_ERR__IN_PROGRESS;

done:
        return err;
}

/**
 * @brief Send OffsetCommitRequest for a list of partitions.
 *
 * @param cgmetadata consumer group metadata.
 *
 * @param offsets - offsets to commit for each topic-partition.
 *
 * @returns 0 if none of the partitions in \p offsets had valid offsets,
 *          else 1.
 */
int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb,
                                 rd_kafka_consumer_group_metadata_t *cgmetadata,
                                 rd_kafka_topic_partition_list_t *offsets,
                                 rd_kafka_replyq_t replyq,
                                 rd_kafka_resp_cb_t *resp_cb,
                                 void *opaque,
                                 const char *reason) {
        rd_kafka_buf_t *rkbuf;
        /* Buffer offsets of the array-count placeholders that are
         * back-patched once the real counts are known. */
        ssize_t of_TopicCnt    = -1;
        int TopicCnt           = 0;
        const char *last_topic = NULL;
        ssize_t of_PartCnt     = -1;
        int PartCnt            = 0;
        int tot_PartCnt        = 0; /* Partitions with a valid offset */
        int i;
        int16_t ApiVersion;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features);

        rd_kafka_assert(NULL, offsets != NULL);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1,
                                         100 + (offsets->cnt * 128));

        /* ConsumerGroup */
        rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1);

        /* v1,v2 */
        if (ApiVersion >= 1) {
                /* ConsumerGroupGenerationId */
                rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id);
                /* ConsumerId */
                rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1);
        }

        /* v7: GroupInstanceId */
        if (ApiVersion >= 7)
                rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id,
                                       -1);

        /* v2-4: RetentionTime */
        if (ApiVersion >= 2 && ApiVersion <= 4)
                rd_kafka_buf_write_i64(rkbuf, -1);

        /* Sort offsets by topic so partitions of the same topic are
         * grouped into one Topics array element below. */
        rd_kafka_topic_partition_list_sort_by_topic(offsets);

        /* TopicArrayCnt: Will be updated when we know the number of topics. */
        of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0);

        for (i = 0; i < offsets->cnt; i++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];

                /* Skip partitions with invalid offset. */
                if (rktpar->offset < 0)
                        continue;

                if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) {
                        /* New topic */

                        /* Finalize previous PartitionCnt */
                        if (PartCnt > 0)
                                rd_kafka_buf_update_u32(rkbuf, of_PartCnt,
                                                        PartCnt);

                        /* TopicName */
                        rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1);
                        /* PartitionCnt, finalized later */
                        of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0);
                        PartCnt    = 0;
                        last_topic = rktpar->topic;
                        TopicCnt++;
                }

                /* Partition */
                rd_kafka_buf_write_i32(rkbuf, rktpar->partition);
                PartCnt++;
                tot_PartCnt++;

                /* Offset */
                rd_kafka_buf_write_i64(rkbuf, rktpar->offset);

                /* v6: KIP-101 CommittedLeaderEpoch */
                if (ApiVersion >= 6)
                        rd_kafka_buf_write_i32(
                            rkbuf,
                            rd_kafka_topic_partition_get_leader_epoch(rktpar));

                /* v1: TimeStamp */
                if (ApiVersion == 1)
                        rd_kafka_buf_write_i64(rkbuf, -1);

                /* Metadata */
                /* Java client 0.9.0 and broker <0.10.0 can't parse
                 * Null metadata fields, so as a workaround we send an
                 * empty string if it's Null. */
                if (!rktpar->metadata)
                        rd_kafka_buf_write_str(rkbuf, "", 0);
                else
                        rd_kafka_buf_write_str(rkbuf, rktpar->metadata,
                                               rktpar->metadata_size);
        }

        if (tot_PartCnt == 0) {
                /* No topic+partitions had valid offsets to commit.
                 * Nothing to send: release the reply queue and buffer. */
                rd_kafka_replyq_destroy(&replyq);
                rd_kafka_buf_destroy(rkbuf);
                return 0;
        }

        /* Finalize previous PartitionCnt */
        if (PartCnt > 0)
                rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt);

        /* Finalize TopicCnt */
        rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                   "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s",
                   ApiVersion, tot_PartCnt, offsets->cnt, reason);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return 1;
}

/**
 * @brief Construct and send OffsetDeleteRequest to \p rkb
 *        with the partitions in del_grpoffsets (DeleteConsumerGroupOffsets_t*)
 *        using \p options.
 *
 *        The response (unparsed) will be enqueued on \p replyq
 *        for handling by \p resp_cb (with \p opaque passed).
 *
 * @remark Only one del_grpoffsets element is supported.
 *
 * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for
 *          transmission, otherwise an error code and errstr will be
 *          updated with a human readable error string.
 */
rd_kafka_resp_err_t
rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb,
                             /** (rd_kafka_DeleteConsumerGroupOffsets_t*) */
                             const rd_list_t *del_grpoffsets,
                             rd_kafka_AdminOptions_t *options,
                             char *errstr,
                             size_t errstr_size,
                             rd_kafka_replyq_t replyq,
                             rd_kafka_resp_cb_t *resp_cb,
                             void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features;
        const rd_kafka_DeleteConsumerGroupOffsets_t *grpoffsets =
            rd_list_elem(del_grpoffsets, 0);

        /* Only a single group's offsets may be deleted per request. */
        rd_assert(rd_list_cnt(del_grpoffsets) == 1);

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_OffsetDelete, 0, 0, &features);
        if (ApiVersion == -1) {
                rd_snprintf(errstr, errstr_size,
                            "OffsetDelete API (KIP-496) not supported "
                            "by broker, requires broker version >= 2.4.0");
                rd_kafka_replyq_destroy(&replyq);
                return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE;
        }

        rkbuf = rd_kafka_buf_new_request(
            rkb, RD_KAFKAP_OffsetDelete, 1,
            2 + strlen(grpoffsets->group) + (64 * grpoffsets->partitions->cnt));

        /* GroupId */
        rd_kafka_buf_write_str(rkbuf, grpoffsets->group, -1);

        const rd_kafka_topic_partition_field_t fields[] = {
            RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
            RD_KAFKA_TOPIC_PARTITION_FIELD_END};
        rd_kafka_buf_write_topic_partitions(
            rkbuf, grpoffsets->partitions,
            rd_false /*dont skip invalid offsets*/, rd_false /*any offset*/,
            fields);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
 * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to
 *        enveloping buffer \p rkbuf.
 */
static void
rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf,
                                          const rd_kafka_group_member_t *rkgm) {
        rd_kafka_buf_t *rkbuf;
        rd_slice_t slice;

        /* The MemberState is serialized into a temporary buffer first,
         * then embedded into the enveloping buffer as Kafka Bytes
         * (length-prefixed). */
        rkbuf = rd_kafka_buf_new(1, 100);
        rd_kafka_buf_write_i16(rkbuf, 0); /* Version */
        rd_assert(rkgm->rkgm_assignment);
        const rd_kafka_topic_partition_field_t fields[] = {
            RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
            RD_KAFKA_TOPIC_PARTITION_FIELD_END};
        rd_kafka_buf_write_topic_partitions(
            rkbuf, rkgm->rkgm_assignment,
            rd_false /*don't skip invalid offsets*/, rd_false /* any offset */,
            fields);
        rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata);

        /* Get pointer to binary buffer */
        rd_slice_init_full(&slice, &rkbuf->rkbuf_buf);

        /* Write binary buffer as Kafka Bytes to enveloping buffer. */
        rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice));
        rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice);

        rd_kafka_buf_destroy(rkbuf);
}

/**
 * Send SyncGroupRequest
 */
void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb,
                               const rd_kafkap_str_t *group_id,
                               int32_t generation_id,
                               const rd_kafkap_str_t *member_id,
                               const rd_kafkap_str_t *group_instance_id,
                               const rd_kafka_group_member_t *assignments,
                               int assignment_cnt,
                               rd_kafka_replyq_t replyq,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int i;
        int16_t ApiVersion;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_SyncGroup, 0, 3, &features);

        rkbuf = rd_kafka_buf_new_request(
            rkb, RD_KAFKAP_SyncGroup, 1,
            RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ +
                RD_KAFKAP_STR_SIZE(member_id) +
                RD_KAFKAP_STR_SIZE(group_instance_id) +
                4 /* array size group_assignment */ +
                (assignment_cnt * 100 /*guess*/));
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, generation_id);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        if (ApiVersion >= 3)
                rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
        rd_kafka_buf_write_i32(rkbuf, assignment_cnt);

        for (i = 0; i < assignment_cnt; i++) {
                const rd_kafka_group_member_t *rkgm = &assignments[i];

                rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id);
                rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm);
        }

        /* This is a blocking request */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;
        rd_kafka_buf_set_abs_timeout(
            rkbuf,
            rkb->rkb_rk->rk_conf.group_session_timeout_ms +
                3000 /* 3s grace period*/,
            0);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}



/**
 * Send JoinGroupRequest
 */
void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
                               const rd_kafkap_str_t *group_id,
                               const rd_kafkap_str_t *member_id,
                               const rd_kafkap_str_t *group_instance_id,
                               const rd_kafkap_str_t *protocol_type,
                               const rd_list_t *topics,
                               rd_kafka_replyq_t replyq,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque) {
        rd_kafka_buf_t *rkbuf;
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_assignor_t *rkas;
        int i;
        int16_t ApiVersion = 0;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_JoinGroup, 0, 5, &features);


        rkbuf = rd_kafka_buf_new_request(
            rkb, RD_KAFKAP_JoinGroup, 1,
            RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ +
                4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) +
                RD_KAFKAP_STR_SIZE(group_instance_id) +
                RD_KAFKAP_STR_SIZE(protocol_type) +
                4 /* array count GroupProtocols */ +
                (rd_list_cnt(topics) * 100));
        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms);
        if (ApiVersion >= 1)
                rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        if (ApiVersion >= 5)
                rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
        rd_kafka_buf_write_kstr(rkbuf, protocol_type);
        /* GroupProtocols array: one entry per enabled assignor. */
        rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt);

        RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) {
                rd_kafkap_bytes_t *member_metadata;
                if (!rkas->rkas_enabled)
                        continue;
                rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name);
                member_metadata = rkas->rkas_get_metadata_cb(
                    rkas, rk->rk_cgrp->rkcg_assignor_state, topics,
                    rk->rk_cgrp->rkcg_group_assignment);
                rd_kafka_buf_write_kbytes(rkbuf, member_metadata);
                rd_kafkap_bytes_destroy(member_metadata);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Warn (rate-limited to once per day) about configuration that
         * the broker is too old to honor. */
        if (ApiVersion < 1 &&
            rk->rk_conf.max_poll_interval_ms >
                rk->rk_conf.group_session_timeout_ms &&
            rd_interval(&rkb->rkb_suppress.unsupported_kip62,
                        /* at most once per day */
                        (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
                rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL",
                           "Broker does not support KIP-62 "
                           "(requires Apache Kafka >= v0.10.1.0): "
                           "consumer configuration "
                           "`max.poll.interval.ms` (%d) "
                           "is effectively limited "
                           "by `session.timeout.ms` (%d) "
                           "with this broker version",
                           rk->rk_conf.max_poll_interval_ms,
                           rk->rk_conf.group_session_timeout_ms);


        if (ApiVersion < 5 && rk->rk_conf.group_instance_id &&
            rd_interval(&rkb->rkb_suppress.unsupported_kip345,
                        /* at most once per day */
                        (rd_ts_t)86400 * 1000 * 1000, 0) > 0)
                rd_rkb_log(rkb, LOG_NOTICE, "STATICMEMBER",
                           "Broker does not support KIP-345 "
                           "(requires Apache Kafka >= v2.3.0): "
                           "consumer configuration "
                           "`group.instance.id` (%s) "
                           "will not take effect",
                           rk->rk_conf.group_instance_id);

        /* Absolute timeout */
        rd_kafka_buf_set_abs_timeout_force(
            rkbuf,
            /* Request timeout is max.poll.interval.ms + grace
             * if the broker supports it, else
             * session.timeout.ms + grace. */
            (ApiVersion >= 1 ? rk->rk_conf.max_poll_interval_ms
                             : rk->rk_conf.group_session_timeout_ms) +
                3000 /* 3s grace period*/,
            0);

        /* This is a blocking request */
        rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}



/**
 * Send LeaveGroupRequest
 */
void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
                                const char *group_id,
                                const char *member_id,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300);

        rd_kafka_buf_write_str(rkbuf, group_id, -1);
        rd_kafka_buf_write_str(rkbuf, member_id, -1);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* LeaveGroupRequests are best-effort, the local consumer
         * does not care if it succeeds or not, so the request timeout
         * is shortened.
         * Retries are not needed. */
        rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0);
        rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}


/**
 * Handler for LeaveGroup responses
 * opaque must be the cgrp handle.
 */
void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk,
                                rd_kafka_broker_t *rkb,
                                rd_kafka_resp_err_t err,
                                rd_kafka_buf_t *rkbuf,
                                rd_kafka_buf_t *request,
                                void *opaque) {
        rd_kafka_cgrp_t *rkcg = opaque;
        /* Used by rd_kafka_buf_read_*() macros on parse failure. */
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode           = 0;
        int actions;

        if (err) {
                ErrorCode = err;
                goto err;
        }

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

err:
        actions = rd_kafka_err_action(rkb, ErrorCode, request,
                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
                                 RD_KAFKA_OP_COORD_QUERY, ErrorCode);
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        }

        /* Errors are only logged: leaving the group is best-effort. */
        if (ErrorCode)
                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
                             "LeaveGroup response: %s",
                             rd_kafka_err2str(ErrorCode));

        return;

err_parse:
        ErrorCode = rkbuf->rkbuf_err;
        goto err;
}



/**
 * Send HeartbeatRequest
 */
void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb,
                               const rd_kafkap_str_t *group_id,
                               int32_t generation_id,
                               const rd_kafkap_str_t *member_id,
                               const rd_kafkap_str_t *group_instance_id,
                               rd_kafka_replyq_t replyq,
                               rd_kafka_resp_cb_t *resp_cb,
                               void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        int features;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_Heartbeat, 0, 3, &features);

        rd_rkb_dbg(rkb, CGRP, "HEARTBEAT",
                   "Heartbeat for group \"%s\" generation id %" PRId32,
                   group_id->str, generation_id);

        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1,
                                         RD_KAFKAP_STR_SIZE(group_id) +
                                             4 /* GenerationId */ +
                                             RD_KAFKAP_STR_SIZE(member_id));

        rd_kafka_buf_write_kstr(rkbuf, group_id);
        rd_kafka_buf_write_i32(rkbuf, generation_id);
        rd_kafka_buf_write_kstr(rkbuf, member_id);
        if (ApiVersion >= 3)
                rd_kafka_buf_write_kstr(rkbuf, group_instance_id);

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        rd_kafka_buf_set_abs_timeout(
            rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0);

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
}



/**
 * @brief Construct and send ListGroupsRequest to \p rkb
 *        with the states (const char *) in \p states.
 *        Uses \p max_ApiVersion as maximum API version,
 *        pass -1 to use the maximum available version.
 *
 *        The response (unparsed) will be enqueued on \p replyq
 *        for handling by \p resp_cb (with \p opaque passed).
 *
 * @return NULL on success, a new error instance that must be
 *         released with rd_kafka_error_destroy() in case of error.
 */
rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb,
                                             int16_t max_ApiVersion,
                                             const char **states,
                                             size_t states_cnt,
                                             rd_kafka_replyq_t replyq,
                                             rd_kafka_resp_cb_t *resp_cb,
                                             void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t i;

        if (max_ApiVersion < 0)
                max_ApiVersion = 4;

        if (max_ApiVersion > ApiVersion) {
                /* Remark: don't check if max_ApiVersion is zero.
                 * As rd_kafka_broker_ApiVersion_supported cannot be checked
                 * in the application thread reliably . */
                ApiVersion = rd_kafka_broker_ApiVersion_supported(
                    rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL);
        }

        if (ApiVersion == -1) {
                return rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "ListGroupsRequest not supported by broker");
        }

        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_ListGroups, 1,
            /* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */
            4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */);

        if (ApiVersion >= 4) {
                /* v4 (KIP-518): StatesFilter array. */
                size_t of_GroupsArrayCnt =
                    rd_kafka_buf_write_arraycnt_pos(rkbuf);
                for (i = 0; i < states_cnt; i++) {
                        rd_kafka_buf_write_str(rkbuf, states[i], -1);
                }
                /* i == states_cnt after the loop above. */
                rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
        return NULL;
}

/**
 * @brief Construct and send DescribeGroupsRequest to \p rkb
 *        with the groups (const char *) in \p groups.
 *        Uses \p max_ApiVersion as maximum API version,
 *        pass -1 to use the maximum available version.
 *
 *        The response (unparsed) will be enqueued on \p replyq
 *        for handling by \p resp_cb (with \p opaque passed).
 *
 * @return NULL on success, a new error instance that must be
 *         released with rd_kafka_error_destroy() in case of error.
 */
rd_kafka_error_t *rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb,
                                                 int16_t max_ApiVersion,
                                                 char **groups,
                                                 size_t group_cnt,
                                                 rd_kafka_replyq_t replyq,
                                                 rd_kafka_resp_cb_t *resp_cb,
                                                 void *opaque) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t of_GroupsArrayCnt;

        if (max_ApiVersion < 0)
                max_ApiVersion = 4;

        if (max_ApiVersion > ApiVersion) {
                /* Remark: don't check if max_ApiVersion is zero.
                 * As rd_kafka_broker_ApiVersion_supported cannot be checked
                 * in the application thread reliably . */
                ApiVersion = rd_kafka_broker_ApiVersion_supported(
                    rkb, RD_KAFKAP_DescribeGroups, 0, max_ApiVersion, NULL);
        }

        if (ApiVersion == -1) {
                return rd_kafka_error_new(
                    RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "DescribeGroupsRequest not supported by broker");
        }

        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_DescribeGroups, 1,
            4 /* rd_kafka_buf_write_arraycnt_pos */ +
                1 /* IncludeAuthorizedOperations */ + 1 /* tags */ +
                32 * group_cnt /* Groups */,
            rd_false);

        /* write Groups */
        of_GroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);
        rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, group_cnt);
        /* NOTE(review): groups are written in reverse order; the broker
         * does not care about Groups ordering so this is harmless. */
        while (group_cnt-- > 0)
                rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1);

        /* write IncludeAuthorizedOperations */
        if (ApiVersion >= 3) {
                /* TODO: implement KIP-430 */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque);
        return NULL;
}

/**
 * @brief Generic handler for Metadata responses
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_handle_Metadata(rd_kafka_t *rk,
                                     rd_kafka_broker_t *rkb,
                                     rd_kafka_resp_err_t err,
                                     rd_kafka_buf_t *rkbuf,
                                     rd_kafka_buf_t *request,
                                     void *opaque) {
        rd_kafka_op_t *rko            = opaque; /* Possibly NULL */
        struct rd_kafka_metadata *md  = NULL;
        const rd_list_t *topics       = request->rkbuf_u.Metadata.topics;
        int actions;

        rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY ||
                                  thrd_is_current(rk->rk_thread));

        /* Avoid metadata updates when we're terminating. */
        if (rd_kafka_terminating(rkb->rkb_rk) ||
            err == RD_KAFKA_RESP_ERR__DESTROY) {
                /* Terminating */
                goto done;
        }

        if (err)
                goto err;

        if (!topics)
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata: %s =====",
                           request->rkbuf_u.Metadata.reason);
        else
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "===== Received metadata "
                           "(for %d requested topics): %s =====",
                           rd_list_cnt(topics),
                           request->rkbuf_u.Metadata.reason);

        err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md);
        if (err)
                goto err;

        if (rko && rko->rko_replyq.q) {
                /* Reply to metadata requester, passing on the metadata.
                 * Reuse requesting rko for the reply. */
                rko->rko_err           = err;
                rko->rko_u.metadata.md = md;

                rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                rko = NULL;
        } else {
                /* No requester: the parsed metadata has already been
                 * applied to the cache, free our copy. */
                if (md)
                        rd_free(md);
        }

        goto done;

err:
        actions = rd_kafka_err_action(rkb, err, request,

                                      RD_KAFKA_ERR_ACTION_RETRY,
                                      RD_KAFKA_RESP_ERR__PARTIAL,

                                      RD_KAFKA_ERR_ACTION_END);

        if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
                if (rd_kafka_buf_retry(rkb, request))
                        return;
                /* FALLTHRU */
        } else {
                rd_rkb_log(rkb, LOG_WARNING, "METADATA",
                           "Metadata request failed: %s: %s (%dms): %s",
                           request->rkbuf_u.Metadata.reason,
                           rd_kafka_err2str(err),
                           (int)(request->rkbuf_ts_sent / 1000),
                           rd_kafka_actions2str(actions));
                /* Respond back to caller on non-retriable errors */
                if (rko && rko->rko_replyq.q) {
                        rko->rko_err           = err;
                        rko->rko_u.metadata.md = NULL;
                        rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0);
                        rko = NULL;
                }
        }



        /* FALLTHRU */

done:
        if (rko)
                rd_kafka_op_destroy(rko);
}


/**
 * @brief Construct MetadataRequest (does not send)
 *
 * \p topics is a list of topic names (char *) to request.
 *
 * !topics          - only request brokers (if supported by broker, else
 *                    all topics)
 *  topics.cnt==0   - all topics in cluster are requested
 *  topics.cnt >0   - only specified topics are requested
 *
 * @param reason    - metadata request reason
 * @param allow_auto_create_topics - allow broker-side auto topic creation.
 *                                   This is best-effort, depending on broker
 *                                   config and version.
 * @param cgrp_update - Update cgrp in parse_Metadata (see comment there).
 * @param rko       - (optional) rko with replyq for handling response.
 *                    Specifying an rko forces a metadata request even if
 *                    there is already a matching one in-transit.
 *
 * If full metadata for all topics is requested (or all brokers, which
 * results in all-topics on older brokers) and there is already a full request
 * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
 * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request
 * is sent regardless.
 */
rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb,
                                             const rd_list_t *topics,
                                             const char *reason,
                                             rd_bool_t allow_auto_create_topics,
                                             rd_bool_t cgrp_update,
                                             rd_kafka_op_t *rko) {
        rd_kafka_buf_t *rkbuf;
        int16_t ApiVersion = 0;
        size_t of_TopicArrayCnt;
        int features;
        int topic_cnt  = topics ? rd_list_cnt(topics) : 0;
        /* Points to the "full request sent" counter in the metadata
         * cache when this is a full (all brokers/all topics) request,
         * used to suppress redundant concurrent full requests. */
        int *full_incr = NULL;

        ApiVersion = rd_kafka_broker_ApiVersion_supported(
            rkb, RD_KAFKAP_Metadata, 0, 9, &features);

        rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1,
                                                 4 + (50 * topic_cnt) + 1,
                                                 ApiVersion >= 9);

        if (!reason)
                reason = "";

        rkbuf->rkbuf_u.Metadata.reason      = rd_strdup(reason);
        rkbuf->rkbuf_u.Metadata.cgrp_update = cgrp_update;

        /* TopicArrayCnt */
        of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf);

        if (!topics) {
                /* v0: keep 0, brokers only not available,
                 *     request all topics */
                /* v1-8: 0 means empty array, brokers only */
                if (ApiVersion >= 9) {
                        /* v9+: varint encoded empty array (1), brokers only */
                        rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
                                                       topic_cnt);
                }

                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for brokers only: %s", reason);
                full_incr =
                    &rkb->rkb_rk->rk_metadata_cache.rkmc_full_brokers_sent;

        } else if (topic_cnt == 0) {
                /* v0: keep 0, request all topics */
                if (ApiVersion >= 1 && ApiVersion < 9) {
                        /* v1-8: update to -1, all topics */
                        rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, -1);
                }
                /* v9+: keep 0, varint encoded null, all topics */

                rkbuf->rkbuf_u.Metadata.all_topics = 1;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for all topics: "
                           "%s",
                           reason);

                if (!rko)
                        full_incr = &rkb->rkb_rk->rk_metadata_cache
                                         .rkmc_full_topics_sent;

        } else {
                /* request cnt topics */
                rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt,
                                               topic_cnt);

                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Request metadata for %d topic(s): "
                           "%s",
                           topic_cnt, reason);
        }

        if (full_incr) {
                /* Avoid multiple outstanding full requests
                 * (since they are redundant and side-effect-less).
                 * Forced requests (app using metadata() API) are passed
                 * through regardless. */

                mtx_lock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) {
                        mtx_unlock(
                            &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                        rd_rkb_dbg(rkb, METADATA, "METADATA",
                                   "Skipping metadata request: %s: "
                                   "full request already in-transit",
                                   reason);
                        rd_kafka_buf_destroy(rkbuf);
                        return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS;
                }

                (*full_incr)++;
                mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock);
                /* Counter is decremented (under decr_lock) when the
                 * request buffer is destroyed. */
                rkbuf->rkbuf_u.Metadata.decr = full_incr;
                rkbuf->rkbuf_u.Metadata.decr_lock =
                    &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock;
        }


        if (topic_cnt > 0) {
                char *topic;
                int i;

                /* Maintain a copy of the topics list so we can purge
                 * hints from the metadata cache on error. */
                rkbuf->rkbuf_u.Metadata.topics =
                    rd_list_copy(topics, rd_list_string_copy, NULL);

                RD_LIST_FOREACH(topic, topics, i) {
                        rd_kafka_buf_write_str(rkbuf, topic, -1);
                        /* Tags for previous topic */
                        rd_kafka_buf_write_tags(rkbuf);
                }
        }

        if (ApiVersion >= 4) {
                /* AllowAutoTopicCreation */
                rd_kafka_buf_write_bool(rkbuf, allow_auto_create_topics);

        } else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER &&
                   !rkb->rkb_rk->rk_conf.allow_auto_create_topics &&
                   rd_kafka_conf_is_modified(&rkb->rkb_rk->rk_conf,
                                             "allow.auto.create.topics") &&
                   rd_interval(
                       &rkb->rkb_rk->rk_suppress.allow_auto_create_topics,
                       30 * 60 * 1000 /* every 30 minutes */, 0) >= 0) {
                /* Let user know we can't obey allow.auto.create.topics */
                rd_rkb_log(rkb, LOG_WARNING, "AUTOCREATE",
                           "allow.auto.create.topics=false not supported "
                           "by broker: requires broker version >= 0.11.0.0: "
                           "requested topic(s) may be auto created depending "
                           "on broker auto.create.topics.enable configuration");
        }

        if (ApiVersion >= 8 && ApiVersion < 10) {
                /* TODO: implement KIP-430 */
                /* IncludeClusterAuthorizedOperations */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        if (ApiVersion >= 8) {
                /* TODO: implement KIP-430 */
                /* IncludeTopicAuthorizedOperations */
                rd_kafka_buf_write_bool(rkbuf, rd_false);
        }

        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

        /* Metadata requests are part of the important control plane
         * and should go before most other requests (Produce, Fetch, etc). */
        rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH;

        rd_kafka_broker_buf_enq_replyq(rkb, rkbuf,
                                       /* Handle response thru rk_ops,
                                        * but forward parsed result to
                                        * rko's replyq when done. */
                                       RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0),
                                       rd_kafka_handle_Metadata, rko);

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}



/**
 * @brief Parses and handles ApiVersion reply.
 *
 * @param apis will be allocated, populated and sorted
 *             with broker's supported APIs, or set to NULL.
 * @param api_cnt will be set to the number of elements in \p *apis
 *
 * @returns 0 on success, else an error.
 *
 * @remark A valid \p apis might be returned even if an error is returned.
 */
rd_kafka_resp_err_t
rd_kafka_handle_ApiVersion(rd_kafka_t *rk,
                           rd_kafka_broker_t *rkb,
                           rd_kafka_resp_err_t err,
                           rd_kafka_buf_t *rkbuf,
                           rd_kafka_buf_t *request,
                           struct rd_kafka_ApiVersion **apis,
                           size_t *api_cnt) {
        /* Parse failures are expected when probing older brokers, so
         * only log decode errors at debug level. */
        const int log_decode_errors = LOG_DEBUG;
        int32_t ApiArrayCnt;
        int16_t ErrorCode;
        int i = 0;

        *apis    = NULL;
        *api_cnt = 0;

        if (err)
                goto err;

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        err = ErrorCode;

        rd_kafka_buf_read_arraycnt(rkbuf, &ApiArrayCnt, 1000);
        if (err && ApiArrayCnt < 1) {
                /* Version >=3 returns the ApiVersions array if the error
                 * code is ERR_UNSUPPORTED_VERSION, previous versions don't */
                goto err;
        }

        rd_rkb_dbg(rkb, FEATURE, "APIVERSION", "Broker API support:");

        *apis = rd_malloc(sizeof(**apis) * ApiArrayCnt);

        for (i = 0; i < ApiArrayCnt; i++) {
                struct rd_kafka_ApiVersion *api = &(*apis)[i];

                rd_kafka_buf_read_i16(rkbuf, &api->ApiKey);
                rd_kafka_buf_read_i16(rkbuf, &api->MinVer);
                rd_kafka_buf_read_i16(rkbuf, &api->MaxVer);

                rd_rkb_dbg(rkb, FEATURE, "APIVERSION",
                           "  ApiKey %s (%hd) Versions %hd..%hd",
                           rd_kafka_ApiKey2str(api->ApiKey), api->ApiKey,
                           api->MinVer, api->MaxVer);

                /* Discard struct tags */
                rd_kafka_buf_skip_tags(rkbuf);
        }

        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        /* Discard end tags */
        rd_kafka_buf_skip_tags(rkbuf);

        *api_cnt = ApiArrayCnt;
        /* Sort by ApiKey so lookups can use bsearch(). */
        qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp);

        goto done;

err_parse:
        /* If the broker does not support our ApiVersionRequest version it
         * will respond with a version 0 response, which will most likely
         * fail parsing. Instead of propagating the parse error we
         * propagate the original error, unless there isn't one in which case
         * we use the parse error. */
        if (!err)
                err = rkbuf->rkbuf_err;
err:
        /* There are no retryable errors. */

        if (*apis)
                rd_free(*apis);

        *apis    = NULL;
        *api_cnt = 0;

done:
        return err;
}



/**
 * @brief Send ApiVersionRequest (KIP-35)
 *
 * @param ApiVersion If -1 use the highest supported version, else use the
 *                   specified value.
 */
void rd_kafka_ApiVersionRequest(rd_kafka_broker_t *rkb,
                                int16_t ApiVersion,
                                rd_kafka_replyq_t replyq,
                                rd_kafka_resp_cb_t *resp_cb,
                                void *opaque) {
        rd_kafka_buf_t *rkbuf;

        if (ApiVersion == -1)
                ApiVersion = 3;

        rkbuf = rd_kafka_buf_new_flexver_request(
            rkb, RD_KAFKAP_ApiVersion, 1, 3, ApiVersion >= 3 /*flexver*/);

        if (ApiVersion >= 3) {
                /* KIP-511 adds software name and version through the optional
                 * protocol fields defined in KIP-482. */

                /* ClientSoftwareName */
                rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_name, -1);

                /* ClientSoftwareVersion */
                rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_version,
                                       -1);
        }

        /* Should be sent before any other requests since it is part of
         * the initial connection handshake.
 */
+        rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
+
+        /* Non-supporting brokers will tear down the connection when they
+         * receive an unknown API request, so don't retry request on failure. */
+        rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+        /* 0.9.0.x brokers will not close the connection on unsupported
+         * API requests, so we minimize the timeout for the request.
+         * This is a regression on the broker part. */
+        rd_kafka_buf_set_abs_timeout(
+            rkbuf, rkb->rkb_rk->rk_conf.api_version_request_timeout_ms, 0);
+
+        rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);
+
+        if (replyq.q)
+                rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
+                                               opaque);
+        else /* in broker thread */
+                rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+/**
+ * Send SaslHandshakeRequest (KIP-43)
+ */
+void rd_kafka_SaslHandshakeRequest(rd_kafka_broker_t *rkb,
+                                   const char *mechanism,
+                                   rd_kafka_replyq_t replyq,
+                                   rd_kafka_resp_cb_t *resp_cb,
+                                   void *opaque) {
+        rd_kafka_buf_t *rkbuf;
+        int mechlen = (int)strlen(mechanism);
+        int16_t ApiVersion;
+        int features;
+
+        rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, 1,
+                                         RD_KAFKAP_STR_SIZE0(mechlen));
+
+        /* Should be sent before any other requests since it is part of
+         * the initial connection handshake. */
+        rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH;
+
+        rd_kafka_buf_write_str(rkbuf, mechanism, mechlen);
+
+        /* Non-supporting brokers will tear down the connection when they
+         * receive an unknown API request or where the SASL GSSAPI
+         * token type is not recognized, so don't retry request on failure. */
+        rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+        /* 0.9.0.x brokers will not close the connection on unsupported
+         * API requests, so we minimize the timeout of the request.
+         * This is a regression on the broker part.
*/ + if (!rkb->rkb_rk->rk_conf.api_version_request && + rkb->rkb_rk->rk_conf.socket_timeout_ms > 10 * 1000) + rd_kafka_buf_set_abs_timeout(rkbuf, 10 * 1000 /*10s*/, 0); + + /* ApiVersion 1 / RD_KAFKA_FEATURE_SASL_REQ enables + * the SaslAuthenticateRequest */ + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_SaslHandshake, 0, 1, &features); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + if (replyq.q) + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, + opaque); + else /* in broker thread */ + rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); +} + + +/** + * @brief Parses and handles an SaslAuthenticate reply. + * + * @returns 0 on success, else an error. + * + * @locality broker thread + * @locks none + */ +void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int16_t error_code; + rd_kafkap_str_t error_str; + rd_kafkap_bytes_t auth_data; + char errstr[512]; + + if (err) { + rd_snprintf(errstr, sizeof(errstr), + "SaslAuthenticateRequest failed: %s", + rd_kafka_err2str(err)); + goto err; + } + + rd_kafka_buf_read_i16(rkbuf, &error_code); + rd_kafka_buf_read_str(rkbuf, &error_str); + + if (error_code) { + /* Authentication failed */ + + /* For backwards compatibility translate the + * new broker-side auth error code to our local error code. 
*/ + if (error_code == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) + err = RD_KAFKA_RESP_ERR__AUTHENTICATION; + else + err = error_code; + + rd_snprintf(errstr, sizeof(errstr), "%.*s", + RD_KAFKAP_STR_PR(&error_str)); + goto err; + } + + rd_kafka_buf_read_bytes(rkbuf, &auth_data); + + /* Pass SASL auth frame to SASL handler */ + if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data, + (size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr, + sizeof(errstr)) == -1) { + err = RD_KAFKA_RESP_ERR__AUTHENTICATION; + goto err; + } + + return; + + +err_parse: + err = rkbuf->rkbuf_err; + rd_snprintf(errstr, sizeof(errstr), + "SaslAuthenticateResponse parsing failed: %s", + rd_kafka_err2str(err)); + +err: + rd_kafka_broker_fail(rkb, LOG_ERR, err, "SASL authentication error: %s", + errstr); +} + + +/** + * @brief Send SaslAuthenticateRequest (KIP-152) + */ +void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, + const void *buf, + size_t size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0); + + /* Should be sent before any other requests since it is part of + * the initial connection handshake. */ + rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; + + /* Broker does not support -1 (Null) for this field */ + rd_kafka_buf_write_bytes(rkbuf, buf ? buf : "", size); + + /* There are no errors that can be retried, instead + * close down the connection and reconnect on failure. 
 */
+        rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES;
+
+        if (replyq.q)
+                rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb,
+                                               opaque);
+        else /* in broker thread */
+                rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque);
+}
+
+
+
+/**
+ * @struct Hold temporary result and return values from ProduceResponse
+ */
+struct rd_kafka_Produce_result {
+        int64_t offset;    /**< Assigned offset of first message */
+        int64_t timestamp; /**< (Possibly assigned) timestamp of
+                            *   first message */
+};
+
+/**
+ * @brief Parses a Produce reply.
+ * @param result Populated with the assigned offset and, for ApiVersion >= 2,
+ *               the broker-assigned timestamp (-1 otherwise).
+ * @returns 0 on success or an error code on failure.
+ * @locality broker thread
+ */
+static rd_kafka_resp_err_t
+rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb,
+                              rd_kafka_toppar_t *rktp,
+                              rd_kafka_buf_t *rkbuf,
+                              rd_kafka_buf_t *request,
+                              struct rd_kafka_Produce_result *result) {
+        int32_t TopicArrayCnt;
+        int32_t PartitionArrayCnt;
+        struct {
+                int32_t Partition;
+                int16_t ErrorCode;
+                int64_t Offset;
+        } hdr;
+        const int log_decode_errors = LOG_ERR;
+        int64_t log_start_offset    = -1;
+
+        rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt);
+        if (TopicArrayCnt != 1)
+                goto err;
+
+        /* Since we only produce to one single topic+partition in each
+         * request we assume that the reply only contains one topic+partition
+         * and that it is the same that we requested.
+         * If not the broker is buggy.
*/ + rd_kafka_buf_skip_str(rkbuf); + rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); + + if (PartitionArrayCnt != 1) + goto err; + + rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); + rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); + rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); + + result->offset = hdr.Offset; + + result->timestamp = -1; + if (request->rkbuf_reqhdr.ApiVersion >= 2) + rd_kafka_buf_read_i64(rkbuf, &result->timestamp); + + if (request->rkbuf_reqhdr.ApiVersion >= 5) + rd_kafka_buf_read_i64(rkbuf, &log_start_offset); + + if (request->rkbuf_reqhdr.ApiVersion >= 1) { + int32_t Throttle_Time; + rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); + + rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, + Throttle_Time); + } + + + return hdr.ErrorCode; + +err_parse: + return rkbuf->rkbuf_err; +err: + return RD_KAFKA_RESP_ERR__BAD_MSG; +} + + +/** + * @struct Hold temporary Produce error state + */ +struct rd_kafka_Produce_err { + rd_kafka_resp_err_t err; /**< Error code */ + int actions; /**< Actions to take */ + int incr_retry; /**< Increase per-message retry cnt */ + rd_kafka_msg_status_t status; /**< Messages persistence status */ + + /* Idempotent Producer */ + int32_t next_ack_seq; /**< Next expected sequence to ack */ + int32_t next_err_seq; /**< Next expected error sequence */ + rd_bool_t update_next_ack; /**< Update next_ack_seq */ + rd_bool_t update_next_err; /**< Update next_err_seq */ + rd_kafka_pid_t rktp_pid; /**< Partition's current PID */ + int32_t last_seq; /**< Last sequence in current batch */ +}; + + +/** + * @brief Error-handling for Idempotent Producer-specific Produce errors. + * + * May update \p errp, \p actionsp and \p incr_retryp. + * + * The resulting \p actionsp are handled by the caller. + * + * @warning May be called on the old leader thread. Lock rktp appropriately! 
+ * + * @locality broker thread (but not necessarily the leader broker) + * @locks none + */ +static void +rd_kafka_handle_idempotent_Produce_error(rd_kafka_broker_t *rkb, + rd_kafka_msgbatch_t *batch, + struct rd_kafka_Produce_err *perr) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_toppar_t *rktp = batch->rktp; + rd_kafka_msg_t *firstmsg, *lastmsg; + int r; + rd_ts_t now = rd_clock(), state_age; + struct rd_kafka_toppar_err last_err; + + rd_kafka_rdlock(rkb->rkb_rk); + state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state; + rd_kafka_rdunlock(rkb->rkb_rk); + + firstmsg = rd_kafka_msgq_first(&batch->msgq); + lastmsg = rd_kafka_msgq_last(&batch->msgq); + rd_assert(firstmsg && lastmsg); + + /* Store the last msgid of the batch + * on the first message in case we need to retry + * and thus reconstruct the entire batch. */ + if (firstmsg->rkm_u.producer.last_msgid) { + /* last_msgid already set, make sure it + * actually points to the last message. */ + rd_assert(firstmsg->rkm_u.producer.last_msgid == + lastmsg->rkm_u.producer.msgid); + } else { + firstmsg->rkm_u.producer.last_msgid = + lastmsg->rkm_u.producer.msgid; + } + + if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) { + /* Don't retry if PID changed since we can't + * guarantee correctness across PID sessions. */ + perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; + perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + + rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "ERRPID", + "%.*s [%" PRId32 + "] PID mismatch: " + "request %s != partition %s: " + "failing messages with error %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, rd_kafka_pid2str(batch->pid), + rd_kafka_pid2str(perr->rktp_pid), + rd_kafka_err2str(perr->err)); + return; + } + + /* + * Special error handling + */ + switch (perr->err) { + case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER: + /* Compare request's sequence to expected next + * acked sequence. 
+ * + * Example requests in flight: + * R1(base_seq:5) R2(10) R3(15) R4(20) + */ + + /* Acquire the last partition error to help + * troubleshoot this problem. */ + rd_kafka_toppar_lock(rktp); + last_err = rktp->rktp_last_err; + rd_kafka_toppar_unlock(rktp); + + r = batch->first_seq - perr->next_ack_seq; + + if (r == 0) { + /* R1 failed: + * If this was the head-of-line request in-flight it + * means there is a state desynchronization between the + * producer and broker (a bug), in which case + * we'll raise a fatal error since we can no longer + * reason about the state of messages and thus + * not guarantee ordering or once-ness for R1, + * nor give the user a chance to opt out of sending + * R2 to R4 which would be retried automatically. */ + + rd_kafka_idemp_set_fatal_error( + rk, perr->err, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to sequence desynchronization with " + "broker %" PRId32 " (%s, base seq %" PRId32 + ", " + "idemp state change %" PRId64 + "ms ago, " + "last partition error %s (actions %s, " + "base seq %" PRId32 "..%" PRId32 + ", base msgid %" PRIu64 ", %" PRId64 "ms ago)", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, + rd_kafka_pid2str(batch->pid), batch->first_seq, + state_age / 1000, rd_kafka_err2name(last_err.err), + rd_kafka_actions2str(last_err.actions), + last_err.base_seq, last_err.last_seq, + last_err.base_msgid, + last_err.ts ? (now - last_err.ts) / 1000 : -1); + + perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; + perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_true; + + } else if (r > 0) { + /* R2 failed: + * With max.in.flight > 1 we can have a situation + * where the first request in-flight (R1) to the broker + * fails, which causes the sub-sequent requests + * that are in-flight to have a non-sequential + * sequence number and thus fail. 
+ * But these sub-sequent requests (R2 to R4) are not at + * the risk of being duplicated so we bump the epoch and + * re-enqueue the messages for later retry + * (without incrementing retries). + */ + rd_rkb_dbg( + rkb, MSG | RD_KAFKA_DBG_EOS, "ERRSEQ", + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to skipped sequence numbers " + "(%s, base seq %" PRId32 + " > " + "next seq %" PRId32 + ") " + "caused by previous failed request " + "(%s, actions %s, " + "base seq %" PRId32 "..%" PRId32 + ", base msgid %" PRIu64 ", %" PRId64 + "ms ago): " + "recovering and retrying", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_pid2str(batch->pid), batch->first_seq, + perr->next_ack_seq, rd_kafka_err2name(last_err.err), + rd_kafka_actions2str(last_err.actions), + last_err.base_seq, last_err.last_seq, + last_err.base_msgid, + last_err.ts ? (now - last_err.ts) / 1000 : -1); + + perr->incr_retry = 0; + perr->actions = RD_KAFKA_ERR_ACTION_RETRY; + perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_true; + + rd_kafka_idemp_drain_epoch_bump( + rk, perr->err, "skipped sequence numbers"); + + } else { + /* Request's sequence is less than next ack, + * this should never happen unless we have + * local bug or the broker did not respond + * to the requests in order. 
*/ + rd_kafka_idemp_set_fatal_error( + rk, perr->err, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "with rewound sequence number on " + "broker %" PRId32 + " (%s, " + "base seq %" PRId32 " < next seq %" PRId32 + "): " + "last error %s (actions %s, " + "base seq %" PRId32 "..%" PRId32 + ", base msgid %" PRIu64 ", %" PRId64 "ms ago)", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, + rd_kafka_pid2str(batch->pid), batch->first_seq, + perr->next_ack_seq, rd_kafka_err2name(last_err.err), + rd_kafka_actions2str(last_err.actions), + last_err.base_seq, last_err.last_seq, + last_err.base_msgid, + last_err.ts ? (now - last_err.ts) / 1000 : -1); + + perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; + perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_false; + } + break; + + case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER: + /* This error indicates that we successfully produced + * this set of messages before but this (supposed) retry failed. + * + * Treat as success, however offset and timestamp + * will be invalid. */ + + /* Future improvement/FIXME: + * But first make sure the first message has actually + * been retried, getting this error for a non-retried message + * indicates a synchronization issue or bug. 
*/ + rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "DUPSEQ", + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to duplicate sequence number: " + "previous send succeeded but was not acknowledged " + "(%s, base seq %" PRId32 + "): " + "marking the messages successfully delivered", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_pid2str(batch->pid), batch->first_seq); + + /* Void error, delivery succeeded */ + perr->err = RD_KAFKA_RESP_ERR_NO_ERROR; + perr->actions = 0; + perr->status = RD_KAFKA_MSG_STATUS_PERSISTED; + perr->update_next_ack = rd_true; + perr->update_next_err = rd_true; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: + /* The broker/cluster lost track of our PID because + * the last message we produced has now been deleted + * (by DeleteRecords, compaction, or topic retention policy). + * + * If all previous messages are accounted for and this is not + * a retry we can simply bump the epoch and reset the sequence + * number and then retry the message(s) again. + * + * If there are outstanding messages not yet acknowledged + * then there is no safe way to carry on without risking + * duplication or reordering, in which case we fail + * the producer. + * + * In case of the transactional producer and a transaction + * coordinator that supports KIP-360 (>= AK 2.5, checked from + * the txnmgr, not here) we'll raise an abortable error and + * flag that the epoch needs to be bumped on the coordinator. 
*/ + if (rd_kafka_is_transactional(rk)) { + rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID", + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to unknown producer id " + "(%s, base seq %" PRId32 + ", %d retries): " + "failing the current transaction", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_pid2str(batch->pid), + batch->first_seq, + firstmsg->rkm_u.producer.retries); + + /* Drain outstanding requests and bump epoch. */ + rd_kafka_idemp_drain_epoch_bump(rk, perr->err, + "unknown producer id"); + + rd_kafka_txn_set_abortable_error_with_bump( + rk, RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to unknown producer id", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq)); + + perr->incr_retry = 0; + perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; + perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_true; + break; + + } else if (!firstmsg->rkm_u.producer.retries && + perr->next_err_seq == batch->first_seq) { + rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID", + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to unknown producer id " + "(%s, base seq %" PRId32 + ", %d retries): " + "no risk of duplication/reordering: " + "resetting PID and retrying", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_pid2str(batch->pid), + batch->first_seq, + firstmsg->rkm_u.producer.retries); + + /* Drain outstanding requests and bump epoch. 
*/ + rd_kafka_idemp_drain_epoch_bump(rk, perr->err, + "unknown producer id"); + + perr->incr_retry = 0; + perr->actions = RD_KAFKA_ERR_ACTION_RETRY; + perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_true; + break; + } + + rd_kafka_idemp_set_fatal_error( + rk, perr->err, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed " + "due to unknown producer id (" + "broker %" PRId32 " %s, base seq %" PRId32 + ", %d retries): " + "unable to retry without risking " + "duplication/reordering", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, rd_kafka_msgq_len(&batch->msgq), + rkb->rkb_nodeid, rd_kafka_pid2str(batch->pid), + batch->first_seq, firstmsg->rkm_u.producer.retries); + + perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; + perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + perr->update_next_ack = rd_false; + perr->update_next_err = rd_true; + break; + + default: + /* All other errors are handled in the standard + * error Produce handler, which will set + * update_next_ack|err accordingly. */ + break; + } +} + + + +/** + * @brief Error-handling for failed ProduceRequests + * + * @param errp Is the input and output error, it may be changed + * by this function. + * + * @returns 0 if no further processing of the request should be performed, + * such as triggering delivery reports, else 1. + * + * @warning May be called on the old leader thread. Lock rktp appropriately! + * + * @warning \p request may be NULL. 
+ *
+ * @locality broker thread (but not necessarily the leader broker)
+ * @locks none
+ */
+static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb,
+                                         const rd_kafka_buf_t *request,
+                                         rd_kafka_msgbatch_t *batch,
+                                         struct rd_kafka_Produce_err *perr) {
+        rd_kafka_t *rk          = rkb->rkb_rk;
+        rd_kafka_toppar_t *rktp = batch->rktp;
+        int is_leader;
+
+        if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY))
+                return 0; /* Terminating */
+
+        /* When there is a partition leader change any outstanding
+         * requests to the old broker will be handled by the old
+         * broker thread when the responses are received/timeout:
+         * in this case we need to be careful with locking:
+         * check once if we're the leader (which allows relaxed
+         * locking), and cache the current rktp's eos state vars. */
+        rd_kafka_toppar_lock(rktp);
+        is_leader          = rktp->rktp_broker == rkb;
+        perr->rktp_pid     = rktp->rktp_eos.pid;
+        perr->next_ack_seq = rktp->rktp_eos.next_ack_seq;
+        perr->next_err_seq = rktp->rktp_eos.next_err_seq;
+        rd_kafka_toppar_unlock(rktp);
+
+        /* All failures are initially treated as if the message
+         * was not persisted, but the status may be changed later
+         * for specific errors and actions. */
+        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+
+        /* Set actions for known errors (may be overridden later),
+         * all other errors are considered permanent failures.
+         * (also see rd_kafka_err_action() for the default actions).
*/ + perr->actions = rd_kafka_err_action( + rkb, perr->err, request, + + RD_KAFKA_ERR_ACTION_REFRESH | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, + + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, + + RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, + + RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, + + RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND, + + RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, + + RD_KAFKA_ERR_ACTION_RETRY | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR__TIMED_OUT, + + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, + + /* All Idempotent Producer-specific errors are + * initially set as permanent errors, + * special handling may change the actions. 
*/ + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, + + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, + RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER, + + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID, + + RD_KAFKA_ERR_ACTION_PERMANENT | + RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, + RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, + + /* Message was purged from out-queue due to + * Idempotent Producer Id change */ + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__RETRY, + + RD_KAFKA_ERR_ACTION_END); + + rd_rkb_dbg(rkb, MSG, "MSGSET", + "%s [%" PRId32 + "]: MessageSet with %i message(s) " + "(MsgId %" PRIu64 ", BaseSeq %" PRId32 + ") " + "encountered error: %s (actions %s)%s", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), batch->first_msgid, + batch->first_seq, rd_kafka_err2str(perr->err), + rd_kafka_actions2str(perr->actions), + is_leader ? "" : " [NOT LEADER]"); + + + /* + * Special handling for Idempotent Producer + * + * Note: Idempotent Producer-specific errors received + * on a non-idempotent producer will be passed through + * directly to the application. + */ + if (rd_kafka_is_idempotent(rk)) + rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr); + + /* Update message persistence status based on action flags. + * None of these are typically set after an idempotent error, + * which sets the status explicitly. 
 */
+        if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED)
+                perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED;
+        else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED)
+                perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+        else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED)
+                perr->status = RD_KAFKA_MSG_STATUS_PERSISTED;
+
+        /* Save the last error for debugging sub-sequent errors,
+         * useful for Idempotent Producer troubleshooting. */
+        rd_kafka_toppar_lock(rktp);
+        rktp->rktp_last_err.err        = perr->err;
+        rktp->rktp_last_err.actions    = perr->actions;
+        rktp->rktp_last_err.ts         = rd_clock();
+        rktp->rktp_last_err.base_seq   = batch->first_seq;
+        rktp->rktp_last_err.last_seq   = perr->last_seq;
+        rktp->rktp_last_err.base_msgid = batch->first_msgid;
+        rd_kafka_toppar_unlock(rktp);
+
+        /*
+         * Handle actions
+         */
+        if (perr->actions &
+            (RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY)) {
+                /* Retry (refresh also implies retry) */
+
+                if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                        /* Request metadata information update.
+                         * These errors imply that we have stale
+                         * information and the request was
+                         * either rejected or not sent -
+                         * we don't need to increment the retry count
+                         * when we perform a retry since:
+                         *   - it is a temporary error (hopefully)
+                         *   - there is no chance of duplicate delivery
+                         */
+                        rd_kafka_toppar_leader_unavailable(rktp, "produce",
+                                                           perr->err);
+
+                        /* We can't be certain the request wasn't
+                         * sent in case of transport failure,
+                         * so the ERR__TRANSPORT case will need
+                         * the retry count to be increased,
+                         * In case of certain other errors we want to
+                         * avoid retrying for the duration of the
+                         * message.timeout.ms to speed up error propagation.
 */
+                        if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT &&
+                            perr->err != RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR)
+                                perr->incr_retry = 0;
+                }
+
+                /* If message timed out in queue, not in transit,
+                 * we will retry at a later time but not increment
+                 * the retry count since there is no risk
+                 * of duplicates. */
+                if (!rd_kafka_buf_was_sent(request))
+                        perr->incr_retry = 0;
+
+                if (!perr->incr_retry) {
+                        /* If retries are not to be incremented then
+                         * there is no chance of duplicates on retry, which
+                         * means these messages were not persisted. */
+                        perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED;
+                }
+
+                if (rd_kafka_is_idempotent(rk)) {
+                        /* Any currently in-flight requests will
+                         * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
+                         * which should not be treated as a fatal error
+                         * since this request and sub-sequent requests
+                         * will be retried and thus return to order.
+                         * Unless the error was a timeout, or similar,
+                         * in which case the request might have made it
+                         * and the messages are considered possibly persisted:
+                         * in this case we allow the next in-flight response
+                         * to be successful, in which case we mark
+                         * this request's messages as successfully delivered. */
+                        if (perr->status &
+                            RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED)
+                                perr->update_next_ack = rd_true;
+                        else
+                                perr->update_next_ack = rd_false;
+                        perr->update_next_err = rd_true;
+
+                        /* Drain outstanding requests so that retries
+                         * are attempted with proper state knowledge and
+                         * without any in-flight requests. */
+                        rd_kafka_toppar_lock(rktp);
+                        rd_kafka_idemp_drain_toppar(rktp,
+                                                    "drain before retrying");
+                        rd_kafka_toppar_unlock(rktp);
+                }
+
+                /* Since requests are specific to a broker
+                 * we move the retryable messages from the request
+                 * back to the partition queue (prepend) and then
+                 * let the new broker construct a new request.
+                 * While doing this we also make sure the retry count
+                 * for each message is honoured, any messages that
+                 * would exceed the retry count will not be
+                 * moved but instead fail below. */
+                rd_kafka_toppar_retry_msgq(rktp, &batch->msgq, perr->incr_retry,
+                                           perr->status);
+
+                if (rd_kafka_msgq_len(&batch->msgq) == 0) {
+                        /* No need to do anything more with the request
+                         * here since the request no longer has any
+                         * messages associated with it. */
+                        return 0;
+                }
+        }
+
+        if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT &&
+            rd_kafka_is_idempotent(rk)) {
+                if (rd_kafka_is_transactional(rk) &&
+                    perr->err == RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) {
+                        /* Producer was fenced by new transactional producer
+                         * with the same transactional.id */
+                        rd_kafka_txn_set_fatal_error(
+                            rk, RD_DO_LOCK, RD_KAFKA_RESP_ERR__FENCED,
+                            "ProduceRequest for %.*s [%" PRId32
+                            "] "
+                            "with %d message(s) failed: %s "
+                            "(broker %" PRId32 " %s, base seq %" PRId32
+                            "): "
+                            "transactional producer fenced by newer "
+                            "producer instance",
+                            RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+                            rktp->rktp_partition,
+                            rd_kafka_msgq_len(&batch->msgq),
+                            rd_kafka_err2str(perr->err), rkb->rkb_nodeid,
+                            rd_kafka_pid2str(batch->pid), batch->first_seq);
+
+                        /* Drain outstanding requests and reset PID. */
+                        rd_kafka_idemp_drain_reset(
+                            rk, "fenced by new transactional producer");
+
+                } else if (rd_kafka_is_transactional(rk)) {
+                        /* When transactional any permanent produce failure
+                         * would lead to an incomplete transaction, so raise
+                         * an abortable transaction error.
*/ + rd_kafka_txn_set_abortable_error( + rk, perr->err, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed: %s " + "(broker %" PRId32 " %s, base seq %" PRId32 + "): " + "current transaction must be aborted", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_err2str(perr->err), rkb->rkb_nodeid, + rd_kafka_pid2str(batch->pid), batch->first_seq); + + } else if (rk->rk_conf.eos.gapless) { + /* A permanent non-idempotent error will lead to + * gaps in the message series, the next request + * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER. + * To satisfy the gapless guarantee we need to raise + * a fatal error here. */ + rd_kafka_idemp_set_fatal_error( + rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE, + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) failed: " + "%s (broker %" PRId32 " %s, base seq %" PRId32 + "): " + "unable to satisfy gap-less guarantee", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), + rd_kafka_err2str(perr->err), rkb->rkb_nodeid, + rd_kafka_pid2str(batch->pid), batch->first_seq); + + /* Drain outstanding requests and reset PID. */ + rd_kafka_idemp_drain_reset( + rk, "unable to satisfy gap-less guarantee"); + + } else { + /* If gapless is not set we bump the Epoch and + * renumber the messages to send. */ + + /* Drain outstanding requests and bump the epoch .*/ + rd_kafka_idemp_drain_epoch_bump(rk, perr->err, + "message sequence gap"); + } + + perr->update_next_ack = rd_false; + /* Make sure the next error will not raise a fatal error. */ + perr->update_next_err = rd_true; + } + + if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT || + perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) { + /* Translate request-level timeout error code + * to message-level timeout error code. 
*/ + perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; + + } else if (perr->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) { + /* If we're no longer authorized to access the topic mark + * it as errored to deny further produce requests. */ + rd_kafka_topic_wrlock(rktp->rktp_rkt); + rd_kafka_topic_set_error(rktp->rktp_rkt, perr->err); + rd_kafka_topic_wrunlock(rktp->rktp_rkt); + } + + return 1; +} + +/** + * @brief Handle ProduceResponse success for idempotent producer + * + * @warning May be called on the old leader thread. Lock rktp appropriately! + * + * @locks none + * @locality broker thread (but not necessarily the leader broker thread) + */ +static void +rd_kafka_handle_idempotent_Produce_success(rd_kafka_broker_t *rkb, + rd_kafka_msgbatch_t *batch, + int32_t next_seq) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_toppar_t *rktp = batch->rktp; + char fatal_err[512]; + uint64_t first_msgid, last_msgid; + + *fatal_err = '\0'; + + first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid; + last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid; + + rd_kafka_toppar_lock(rktp); + + /* If the last acked msgid is higher than + * the next message to (re)transmit in the message queue + * it means a previous series of R1,R2 ProduceRequests + * had R1 fail with uncertain persistence status, + * such as timeout or transport error, but R2 succeeded, + * which means the messages in R1 were in fact persisted. + * In this case trigger delivery reports for all messages + * in queue until we hit a non-acked message msgid. 
*/ + if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) { + rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid); + + } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq && + batch->first_seq == rktp->rktp_eos.next_err_seq)) { + /* Response ordering is typically not a concern + * (but will not happen with current broker versions), + * unless we're expecting an error to be returned at + * this sequence rather than a success ack, in which + * case raise a fatal error. */ + + /* Can't call set_fatal_error() while + * holding the toppar lock, so construct + * the error string here and call + * set_fatal_error() below after + * toppar lock has been released. */ + rd_snprintf(fatal_err, sizeof(fatal_err), + "ProduceRequest for %.*s [%" PRId32 + "] " + "with %d message(s) " + "succeeded when expecting failure " + "(broker %" PRId32 + " %s, " + "base seq %" PRId32 + ", " + "next ack seq %" PRId32 + ", " + "next err seq %" PRId32 + ": " + "unable to retry without risking " + "duplication/reordering", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, + rd_kafka_pid2str(batch->pid), batch->first_seq, + rktp->rktp_eos.next_ack_seq, + rktp->rktp_eos.next_err_seq); + + rktp->rktp_eos.next_err_seq = next_seq; + } + + if (likely(!*fatal_err)) { + /* Advance next expected err and/or ack sequence */ + + /* Only step err seq if it hasn't diverged. */ + if (rktp->rktp_eos.next_err_seq == rktp->rktp_eos.next_ack_seq) + rktp->rktp_eos.next_err_seq = next_seq; + + rktp->rktp_eos.next_ack_seq = next_seq; + } + + /* Store the last acked message sequence, + * since retries within the broker cache window (5 requests) + * will succeed for older messages we must only update the + * acked msgid if it is higher than the last acked. 
*/ + if (last_msgid > rktp->rktp_eos.acked_msgid) + rktp->rktp_eos.acked_msgid = last_msgid; + + rd_kafka_toppar_unlock(rktp); + + /* Must call set_fatal_error() after releasing + * the toppar lock. */ + if (unlikely(*fatal_err)) + rd_kafka_idemp_set_fatal_error( + rk, RD_KAFKA_RESP_ERR__INCONSISTENT, "%s", fatal_err); +} + + +/** + * @brief Handle ProduceRequest result for a message batch. + * + * @warning \p request may be NULL. + * + * @localiy broker thread (but not necessarily the toppar's handler thread) + * @locks none + */ +static void rd_kafka_msgbatch_handle_Produce_result( + rd_kafka_broker_t *rkb, + rd_kafka_msgbatch_t *batch, + rd_kafka_resp_err_t err, + const struct rd_kafka_Produce_result *presult, + const rd_kafka_buf_t *request) { + + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_toppar_t *rktp = batch->rktp; + rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + rd_bool_t last_inflight; + int32_t next_seq; + + /* Decrease partition's messages in-flight counter */ + rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >= + rd_kafka_msgq_len(&batch->msgq)); + last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight, + rd_kafka_msgq_len(&batch->msgq)); + + /* Next expected sequence (and handle wrap) */ + next_seq = rd_kafka_seq_wrap(batch->first_seq + + rd_kafka_msgq_len(&batch->msgq)); + + if (likely(!err)) { + rd_rkb_dbg(rkb, MSG, "MSGSET", + "%s [%" PRId32 + "]: MessageSet with %i message(s) " + "(MsgId %" PRIu64 ", BaseSeq %" PRId32 ") delivered", + rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, + rd_kafka_msgq_len(&batch->msgq), batch->first_msgid, + batch->first_seq); + + if (rktp->rktp_rkt->rkt_conf.required_acks != 0) + status = RD_KAFKA_MSG_STATUS_PERSISTED; + + if (rd_kafka_is_idempotent(rk)) + rd_kafka_handle_idempotent_Produce_success(rkb, batch, + next_seq); + } else { + /* Error handling */ + struct rd_kafka_Produce_err perr = { + .err = err, + .incr_retry = 1, + .status = status, + .update_next_ack = rd_true, + 
.update_next_err = rd_true, + .last_seq = (batch->first_seq + + rd_kafka_msgq_len(&batch->msgq) - 1)}; + + rd_kafka_handle_Produce_error(rkb, request, batch, &perr); + + /* Update next expected acked and/or err sequence. */ + if (perr.update_next_ack || perr.update_next_err) { + rd_kafka_toppar_lock(rktp); + if (perr.update_next_ack) + rktp->rktp_eos.next_ack_seq = next_seq; + if (perr.update_next_err) + rktp->rktp_eos.next_err_seq = next_seq; + rd_kafka_toppar_unlock(rktp); + } + + err = perr.err; + status = perr.status; + } + + + /* Messages to retry will have been removed from the request's queue */ + if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) { + /* Set offset, timestamp and status for each message. */ + rd_kafka_msgq_set_metadata(&batch->msgq, rkb->rkb_nodeid, + presult->offset, presult->timestamp, + status); + + /* Enqueue messages for delivery report. */ + rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err); + } + + if (rd_kafka_is_idempotent(rk) && last_inflight) + rd_kafka_idemp_inflight_toppar_sub(rk, rktp); +} + + +/** + * @brief Handle ProduceResponse + * + * @param reply is NULL when `acks=0` and on various local errors. + * + * @remark ProduceRequests are never retried, retriable errors are + * instead handled by re-enqueuing the request's messages back + * on the partition queue to have a new ProduceRequest constructed + * eventually. + * + * @warning May be called on the old leader thread. Lock rktp appropriately! 
+ * + * @locality broker thread (but not necessarily the leader broker thread) + */ +static void rd_kafka_handle_Produce(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *reply, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_msgbatch_t *batch = &request->rkbuf_batch; + rd_kafka_toppar_t *rktp = batch->rktp; + struct rd_kafka_Produce_result result = { + .offset = RD_KAFKA_OFFSET_INVALID, .timestamp = -1}; + + /* Unit test interface: inject errors */ + if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) { + err = rk->rk_conf.ut.handle_ProduceResponse( + rkb->rkb_rk, rkb->rkb_nodeid, batch->first_msgid, err); + } + + /* Parse Produce reply (unless the request errored) */ + if (!err && reply) + err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request, + &result); + + rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, &result, + request); +} + + +/** + * @brief Send ProduceRequest for messages in toppar queue. + * + * @returns the number of messages included, or 0 on error / no messages. + * + * @locality broker thread + */ +int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb, + rd_kafka_toppar_t *rktp, + const rd_kafka_pid_t pid, + uint64_t epoch_base_msgid) { + rd_kafka_buf_t *rkbuf; + rd_kafka_topic_t *rkt = rktp->rktp_rkt; + size_t MessageSetSize = 0; + int cnt; + rd_ts_t now; + int64_t first_msg_timeout; + int tmout; + + /** + * Create ProduceRequest with as many messages from the toppar + * transmit queue as possible. 
+ */ + rkbuf = rd_kafka_msgset_create_ProduceRequest( + rkb, rktp, &rktp->rktp_xmit_msgq, pid, epoch_base_msgid, + &MessageSetSize); + if (unlikely(!rkbuf)) + return 0; + + cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq); + rd_dassert(cnt > 0); + + rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt); + rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize); + + if (!rkt->rkt_conf.required_acks) + rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE; + + /* Use timeout from first message in batch */ + now = rd_clock(); + first_msg_timeout = + (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)->rkm_ts_timeout - + now) / + 1000; + + if (unlikely(first_msg_timeout <= 0)) { + /* Message has already timed out, allow 100 ms + * to produce anyway */ + tmout = 100; + } else { + tmout = (int)RD_MIN(INT_MAX, first_msg_timeout); + } + + /* Set absolute timeout (including retries), the + * effective timeout for this specific request will be + * capped by socket.timeout.ms */ + rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ, + rd_kafka_handle_Produce, NULL); + + return cnt; +} + + +/** + * @brief Construct and send CreateTopicsRequest to \p rkb + * with the topics (NewTopic_t*) in \p new_topics, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. 
+ */ +rd_kafka_resp_err_t +rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *new_topics /*(NewTopic_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + int i = 0; + rd_kafka_NewTopic_t *newt; + int op_timeout; + + if (rd_list_cnt(new_topics) == 0) { + rd_snprintf(errstr, errstr_size, "No topics to create"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_CreateTopics, 0, 4, &features); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "Topic Admin API (KIP-4) not supported " + "by broker, requires broker version >= 0.10.2.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (rd_kafka_confval_get_int(&options->validate_only) && + ApiVersion < 1) { + rd_snprintf(errstr, errstr_size, + "CreateTopics.validate_only=true not " + "supported by broker"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1, + 4 + (rd_list_cnt(new_topics) * 200) + + 4 + 1); + + /* #topics */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics)); + + while ((newt = rd_list_elem(new_topics, i++))) { + int partition; + int ei = 0; + const rd_kafka_ConfigEntry_t *entry; + + if (ApiVersion < 4) { + if (newt->num_partitions == -1) { + rd_snprintf(errstr, errstr_size, + "Default partition count (KIP-464) " + "not supported by broker, " + "requires broker version <= 2.4.0"); + rd_kafka_replyq_destroy(&replyq); + rd_kafka_buf_destroy(rkbuf); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (newt->replication_factor == -1 && + rd_list_empty(&newt->replicas)) { + rd_snprintf(errstr, errstr_size, + "Default replication factor " + 
"(KIP-464) " + "not supported by broker, " + "requires broker version <= 2.4.0"); + rd_kafka_replyq_destroy(&replyq); + rd_kafka_buf_destroy(rkbuf); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + + /* topic */ + rd_kafka_buf_write_str(rkbuf, newt->topic, -1); + + if (rd_list_cnt(&newt->replicas)) { + /* num_partitions and replication_factor must be + * set to -1 if a replica assignment is sent. */ + /* num_partitions */ + rd_kafka_buf_write_i32(rkbuf, -1); + /* replication_factor */ + rd_kafka_buf_write_i16(rkbuf, -1); + } else { + /* num_partitions */ + rd_kafka_buf_write_i32(rkbuf, newt->num_partitions); + /* replication_factor */ + rd_kafka_buf_write_i16( + rkbuf, (int16_t)newt->replication_factor); + } + + /* #replica_assignment */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas)); + + /* Replicas per partition, see rdkafka_admin.[ch] + * for how these are constructed. */ + for (partition = 0; partition < rd_list_cnt(&newt->replicas); + partition++) { + const rd_list_t *replicas; + int ri = 0; + + replicas = rd_list_elem(&newt->replicas, partition); + if (!replicas) + continue; + + /* partition */ + rd_kafka_buf_write_i32(rkbuf, partition); + /* #replicas */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas)); + + for (ri = 0; ri < rd_list_cnt(replicas); ri++) { + /* replica */ + rd_kafka_buf_write_i32( + rkbuf, rd_list_get_int32(replicas, ri)); + } + } + + /* #config_entries */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config)); + + RD_LIST_FOREACH(entry, &newt->config, ei) { + /* config_name */ + rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); + /* config_value (nullable) */ + rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); + } + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + rd_kafka_buf_write_i32(rkbuf, op_timeout); + + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + if (ApiVersion >= 1) { + /* 
validate_only */ + rd_kafka_buf_write_i8( + rkbuf, rd_kafka_confval_get_int(&options->validate_only)); + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send DeleteTopicsRequest to \p rkb + * with the topics (DeleteTopic_t *) in \p del_topics, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t +rd_kafka_DeleteTopicsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *del_topics /*(DeleteTopic_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + int i = 0; + rd_kafka_DeleteTopic_t *delt; + int op_timeout; + + if (rd_list_cnt(del_topics) == 0) { + rd_snprintf(errstr, errstr_size, "No topics to delete"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "Topic Admin API (KIP-4) not supported " + "by broker, requires broker version >= 0.10.2.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = + rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1, + /* FIXME */ + 4 + (rd_list_cnt(del_topics) * 100) + 4); + + /* #topics */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics)); + + while ((delt = rd_list_elem(del_topics, i++))) + rd_kafka_buf_write_str(rkbuf, delt->topic, -1); + + /* timeout */ + 
op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + rd_kafka_buf_write_i32(rkbuf, op_timeout); + + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send DeleteRecordsRequest to \p rkb + * with the offsets to delete (rd_kafka_topic_partition_list_t *) in + * \p offsets_list, using \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @remark The rd_kafka_topic_partition_list_t in \p offsets_list must already + * be sorted. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t +rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb, + /*(rd_kafka_topic_partition_list_t*)*/ + const rd_list_t *offsets_list, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + const rd_kafka_topic_partition_list_t *partitions; + int op_timeout; + + partitions = rd_list_elem(offsets_list, 0); + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DeleteRecords, 0, 1, &features); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "DeleteRecords Admin API (KIP-107) not supported " + "by broker, requires broker version >= 0.11.0"); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteRecords, 1, + 4 + (partitions->cnt * 100) + 4); + + const rd_kafka_topic_partition_field_t fields[] = { + 
RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + rd_kafka_buf_write_topic_partitions( + rkbuf, partitions, rd_false /*don't skip invalid offsets*/, + rd_false /*any offset*/, fields); + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + rd_kafka_buf_write_i32(rkbuf, op_timeout); + + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send CreatePartitionsRequest to \p rkb + * with the topics (NewPartitions_t*) in \p new_parts, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. 
+ */ +rd_kafka_resp_err_t +rd_kafka_CreatePartitionsRequest(rd_kafka_broker_t *rkb, + /*(NewPartitions_t*)*/ + const rd_list_t *new_parts, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int i = 0; + rd_kafka_NewPartitions_t *newp; + int op_timeout; + + if (rd_list_cnt(new_parts) == 0) { + rd_snprintf(errstr, errstr_size, "No partitions to create"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "CreatePartitions (KIP-195) not supported " + "by broker, requires broker version >= 1.0.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1, + 4 + (rd_list_cnt(new_parts) * 200) + + 4 + 1); + + /* #topics */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts)); + + while ((newp = rd_list_elem(new_parts, i++))) { + /* topic */ + rd_kafka_buf_write_str(rkbuf, newp->topic, -1); + + /* New partition count */ + rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt); + + /* #replica_assignment */ + if (rd_list_empty(&newp->replicas)) { + rd_kafka_buf_write_i32(rkbuf, -1); + } else { + const rd_list_t *replicas; + int pi = -1; + + rd_kafka_buf_write_i32(rkbuf, + rd_list_cnt(&newp->replicas)); + + while ( + (replicas = rd_list_elem(&newp->replicas, ++pi))) { + int ri = 0; + + /* replica count */ + rd_kafka_buf_write_i32(rkbuf, + rd_list_cnt(replicas)); + + /* replica */ + for (ri = 0; ri < rd_list_cnt(replicas); ri++) { + rd_kafka_buf_write_i32( + rkbuf, + rd_list_get_int32(replicas, ri)); + } + } + } + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + 
rd_kafka_buf_write_i32(rkbuf, op_timeout); + + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + /* validate_only */ + rd_kafka_buf_write_i8( + rkbuf, rd_kafka_confval_get_int(&options->validate_only)); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send AlterConfigsRequest to \p rkb + * with the configs (ConfigResource_t*) in \p configs, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t +rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *configs /*(ConfigResource_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int i; + const rd_kafka_ConfigResource_t *config; + int op_timeout; + + if (rd_list_cnt(configs) == 0) { + rd_snprintf(errstr, errstr_size, + "No config resources specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_AlterConfigs, 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "AlterConfigs (KIP-133) not supported " + "by broker, requires broker version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + /* Incremental requires IncrementalAlterConfigs */ + if (rd_kafka_confval_get_int(&options->incremental)) { + rd_snprintf(errstr, errstr_size, + 
"AlterConfigs.incremental=true (KIP-248) " + "not supported by broker, " + "replaced by IncrementalAlterConfigs"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1, + rd_list_cnt(configs) * 200); + + /* #resources */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); + + RD_LIST_FOREACH(config, configs, i) { + const rd_kafka_ConfigEntry_t *entry; + int ei; + + /* resource_type */ + rd_kafka_buf_write_i8(rkbuf, config->restype); + + /* resource_name */ + rd_kafka_buf_write_str(rkbuf, config->name, -1); + + /* #config */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config)); + + RD_LIST_FOREACH(entry, &config->config, ei) { + /* config_name */ + rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); + /* config_value (nullable) */ + rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); + + if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) { + rd_snprintf(errstr, errstr_size, + "IncrementalAlterConfigs required " + "for add/delete config " + "entries: only set supported " + "by this operation"); + rd_kafka_buf_destroy(rkbuf); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + /* validate_only */ + rd_kafka_buf_write_i8( + rkbuf, rd_kafka_confval_get_int(&options->validate_only)); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send DescribeConfigsRequest to \p rkb + * with the configs (ConfigResource_t*) in \p configs, using + * \p options. 
+ * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *configs /*(ConfigResource_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int i; + const rd_kafka_ConfigResource_t *config; + int op_timeout; + + if (rd_list_cnt(configs) == 0) { + rd_snprintf(errstr, errstr_size, + "No config resources specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "DescribeConfigs (KIP-133) not supported " + "by broker, requires broker version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1, + rd_list_cnt(configs) * 200); + + /* #resources */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); + + RD_LIST_FOREACH(config, configs, i) { + const rd_kafka_ConfigEntry_t *entry; + int ei; + + /* resource_type */ + rd_kafka_buf_write_i8(rkbuf, config->restype); + + /* resource_name */ + rd_kafka_buf_write_str(rkbuf, config->name, -1); + + /* #config */ + if (rd_list_empty(&config->config)) { + /* Get all configs */ + rd_kafka_buf_write_i32(rkbuf, -1); + } else { + /* Get requested configs only */ + rd_kafka_buf_write_i32(rkbuf, + rd_list_cnt(&config->config)); + } + + RD_LIST_FOREACH(entry, &config->config, ei) { + /* config_name */ + 
rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); + } + } + + + if (ApiVersion == 1) { + /* include_synonyms */ + rd_kafka_buf_write_i8(rkbuf, 1); + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send DeleteGroupsRequest to \p rkb + * with the groups (DeleteGroup_t *) in \p del_groups, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t +rd_kafka_DeleteGroupsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *del_groups /*(DeleteGroup_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + int features; + int i = 0; + rd_kafka_DeleteGroup_t *delt; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DeleteGroups, 0, 1, &features); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "DeleteGroups Admin API (KIP-229) not supported " + "by broker, requires broker version >= 1.1.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = + rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteGroups, 1, + 4 + (rd_list_cnt(del_groups) * 100) + 4); + + /* #groups */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_groups)); + + while ((delt = rd_list_elem(del_groups, i++))) + 
rd_kafka_buf_write_str(rkbuf, delt->group, -1); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Returns the request size needed to send a specific AclBinding + * specified in \p acl, using the ApiVersion provided in + * \p ApiVersion. + * + * @returns and int16_t with the request size in bytes. + */ +static RD_INLINE size_t +rd_kafka_AclBinding_request_size(const rd_kafka_AclBinding_t *acl, + int ApiVersion) { + return 1 + 2 + (acl->name ? strlen(acl->name) : 0) + 2 + + (acl->principal ? strlen(acl->principal) : 0) + 2 + + (acl->host ? strlen(acl->host) : 0) + 1 + 1 + + (ApiVersion > 0 ? 1 : 0); +} + +/** + * @brief Construct and send CreateAclsRequest to \p rkb + * with the acls (AclBinding_t*) in \p new_acls, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. 
+ */ +rd_kafka_resp_err_t +rd_kafka_CreateAclsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *new_acls /*(AclBinding_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + int i; + size_t len; + int op_timeout; + rd_kafka_AclBinding_t *new_acl; + + if (rd_list_cnt(new_acls) == 0) { + rd_snprintf(errstr, errstr_size, "No acls to create"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_CreateAcls, 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "ACLs Admin API (KIP-140) not supported " + "by broker, requires broker version >= 0.11.0.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (ApiVersion == 0) { + RD_LIST_FOREACH(new_acl, new_acls, i) { + if (new_acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_LITERAL) { + rd_snprintf(errstr, errstr_size, + "Broker only supports LITERAL " + "resource pattern types"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + } else { + RD_LIST_FOREACH(new_acl, new_acls, i) { + if (new_acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_LITERAL && + new_acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_PREFIXED) { + rd_snprintf(errstr, errstr_size, + "Only LITERAL and PREFIXED " + "resource patterns are supported " + "when creating ACLs"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + } + + len = 4; + RD_LIST_FOREACH(new_acl, new_acls, i) { + len += rd_kafka_AclBinding_request_size(new_acl, ApiVersion); + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateAcls, 1, len); + + /* #acls */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_acls)); + + RD_LIST_FOREACH(new_acl, new_acls, 
i) { + rd_kafka_buf_write_i8(rkbuf, new_acl->restype); + + rd_kafka_buf_write_str(rkbuf, new_acl->name, -1); + + if (ApiVersion >= 1) { + rd_kafka_buf_write_i8(rkbuf, + new_acl->resource_pattern_type); + } + + rd_kafka_buf_write_str(rkbuf, new_acl->principal, -1); + + rd_kafka_buf_write_str(rkbuf, new_acl->host, -1); + + rd_kafka_buf_write_i8(rkbuf, new_acl->operation); + + rd_kafka_buf_write_i8(rkbuf, new_acl->permission_type); + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Construct and send DescribeAclsRequest to \p rkb + * with the acls (AclBinding_t*) in \p acls, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. 
+ */ +rd_kafka_resp_err_t rd_kafka_DescribeAclsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *acls /*(rd_kafka_AclBindingFilter_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + const rd_kafka_AclBindingFilter_t *acl; + int op_timeout; + + if (rd_list_cnt(acls) == 0) { + rd_snprintf(errstr, errstr_size, + "No acl binding filters specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + if (rd_list_cnt(acls) > 1) { + rd_snprintf(errstr, errstr_size, + "Too many acl binding filters specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + acl = rd_list_elem(acls, 0); + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DescribeAcls, 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "ACLs Admin API (KIP-140) not supported " + "by broker, requires broker version >= 0.11.0.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (ApiVersion == 0) { + if (acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_LITERAL && + acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_ANY) { + rd_snprintf(errstr, errstr_size, + "Broker only supports LITERAL and ANY " + "resource pattern types"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } else { + if (acl->resource_pattern_type == + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) { + rd_snprintf(errstr, errstr_size, + "Filter contains UNKNOWN elements"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + + rkbuf = rd_kafka_buf_new_request( + rkb, RD_KAFKAP_DescribeAcls, 1, + rd_kafka_AclBinding_request_size(acl, ApiVersion)); + + /* resource_type */ + rd_kafka_buf_write_i8(rkbuf, acl->restype); + + /* resource_name 
filter */ + rd_kafka_buf_write_str(rkbuf, acl->name, -1); + + if (ApiVersion > 0) { + /* resource_pattern_type (rd_kafka_ResourcePatternType_t) */ + rd_kafka_buf_write_i8(rkbuf, acl->resource_pattern_type); + } + + /* principal filter */ + rd_kafka_buf_write_str(rkbuf, acl->principal, -1); + + /* host filter */ + rd_kafka_buf_write_str(rkbuf, acl->host, -1); + + /* operation (rd_kafka_AclOperation_t) */ + rd_kafka_buf_write_i8(rkbuf, acl->operation); + + /* permission type (rd_kafka_AclPermissionType_t) */ + rd_kafka_buf_write_i8(rkbuf, acl->permission_type); + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Construct and send DeleteAclsRequest to \p rkb + * with the acl filters (AclBindingFilter_t*) in \p del_acls, using + * \p options. + * + * The response (unparsed) will be enqueued on \p replyq + * for handling by \p resp_cb (with \p opaque passed). + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. 
+ */ +rd_kafka_resp_err_t +rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, + const rd_list_t *del_acls /*(AclBindingFilter_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + const rd_kafka_AclBindingFilter_t *acl; + int op_timeout; + int i; + size_t len; + + if (rd_list_cnt(del_acls) == 0) { + rd_snprintf(errstr, errstr_size, + "No acl binding filters specified"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__INVALID_ARG; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DeleteAcls, 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "ACLs Admin API (KIP-140) not supported " + "by broker, requires broker version >= 0.11.0.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + len = 4; + + RD_LIST_FOREACH(acl, del_acls, i) { + if (ApiVersion == 0) { + if (acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_LITERAL && + acl->resource_pattern_type != + RD_KAFKA_RESOURCE_PATTERN_ANY) { + rd_snprintf(errstr, errstr_size, + "Broker only supports LITERAL " + "and ANY resource pattern types"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } else { + if (acl->resource_pattern_type == + RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) { + rd_snprintf(errstr, errstr_size, + "Filter contains UNKNOWN elements"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + + len += rd_kafka_AclBinding_request_size(acl, ApiVersion); + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteAcls, 1, len); + + /* #acls */ + rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_acls)); + + RD_LIST_FOREACH(acl, del_acls, i) { + /* resource_type */ + rd_kafka_buf_write_i8(rkbuf, acl->restype); + + /* resource_name filter */ + 
rd_kafka_buf_write_str(rkbuf, acl->name, -1); + + if (ApiVersion > 0) { + /* resource_pattern_type + * (rd_kafka_ResourcePatternType_t) */ + rd_kafka_buf_write_i8(rkbuf, + acl->resource_pattern_type); + } + + /* principal filter */ + rd_kafka_buf_write_str(rkbuf, acl->principal, -1); + + /* host filter */ + rd_kafka_buf_write_str(rkbuf, acl->host, -1); + + /* operation (rd_kafka_AclOperation_t) */ + rd_kafka_buf_write_i8(rkbuf, acl->operation); + + /* permission type (rd_kafka_AclPermissionType_t) */ + rd_kafka_buf_write_i8(rkbuf, acl->permission_type); + } + + /* timeout */ + op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); + if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) + rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +/** + * @brief Parses and handles an InitProducerId reply. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_handle_InitProducerId(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int16_t error_code; + rd_kafka_pid_t pid; + + if (err) + goto err; + + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &error_code); + if ((err = error_code)) + goto err; + + rd_kafka_buf_read_i64(rkbuf, &pid.id); + rd_kafka_buf_read_i16(rkbuf, &pid.epoch); + + rd_kafka_idemp_pid_update(rkb, pid); + + return; + +err_parse: + err = rkbuf->rkbuf_err; +err: + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; + + /* Retries are performed by idempotence state handler */ + rd_kafka_idemp_request_pid_failed(rkb, err); +} + +/** + * @brief Construct and send InitProducerIdRequest to \p rkb. + * + * @param transactional_id may be NULL. + * @param transaction_timeout_ms may be set to -1. 
+ * @param current_pid the current PID to reset, requires KIP-360. If not NULL + * and KIP-360 is not supported by the broker this function + * will return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE. + * + * The response (unparsed) will be handled by \p resp_cb served + * by queue \p replyq. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code and errstr will be + * updated with a human readable error string. + */ +rd_kafka_resp_err_t +rd_kafka_InitProducerIdRequest(rd_kafka_broker_t *rkb, + const char *transactional_id, + int transaction_timeout_ms, + const rd_kafka_pid_t *current_pid, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + + if (current_pid) { + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_InitProducerId, 3, 4, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "InitProducerId (KIP-360) not supported by " + "broker, requires broker version >= 2.5.0: " + "unable to recover from previous " + "transactional error"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } else { + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_InitProducerId, 0, 4, NULL); + + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "InitProducerId (KIP-98) not supported by " + "broker, requires broker " + "version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + } + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_InitProducerId, 1, + 2 + (transactional_id ? 
strlen(transactional_id) : 0) + 4 + 8 + 4, + ApiVersion >= 2 /*flexver*/); + + /* transactional_id */ + rd_kafka_buf_write_str(rkbuf, transactional_id, -1); + + /* transaction_timeout_ms */ + rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms); + + if (ApiVersion >= 3) { + /* Current PID */ + rd_kafka_buf_write_i64(rkbuf, + current_pid ? current_pid->id : -1); + /* Current Epoch */ + rd_kafka_buf_write_i16(rkbuf, + current_pid ? current_pid->epoch : -1); + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + /* Let the idempotence state handler perform retries */ + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send AddPartitionsToTxnRequest to \p rkb. + * + * The response (unparsed) will be handled by \p resp_cb served + * by queue \p replyq. + * + * @param rktps MUST be sorted by topic name. + * + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code. 
+ */ +rd_kafka_resp_err_t +rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, + const char *transactional_id, + rd_kafka_pid_t pid, + const rd_kafka_toppar_tqhead_t *rktps, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + rd_kafka_toppar_t *rktp; + rd_kafka_topic_t *last_rkt = NULL; + size_t of_TopicCnt; + ssize_t of_PartCnt = -1; + int TopicCnt = 0, PartCnt = 0; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "AddPartitionsToTxnRequest (KIP-98) not supported " + "by broker, requires broker version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = + rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500); + + /* transactional_id */ + rd_kafka_buf_write_str(rkbuf, transactional_id, -1); + + /* PID */ + rd_kafka_buf_write_i64(rkbuf, pid.id); + rd_kafka_buf_write_i16(rkbuf, pid.epoch); + + /* Topics/partitions array (count updated later) */ + of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); + + TAILQ_FOREACH(rktp, rktps, rktp_txnlink) { + if (last_rkt != rktp->rktp_rkt) { + + if (last_rkt) { + /* Update last topic's partition count field */ + rd_kafka_buf_update_i32(rkbuf, of_PartCnt, + PartCnt); + of_PartCnt = -1; + } + + /* Topic name */ + rd_kafka_buf_write_kstr(rkbuf, + rktp->rktp_rkt->rkt_topic); + /* Partition count, updated later */ + of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); + + PartCnt = 0; + TopicCnt++; + last_rkt = rktp->rktp_rkt; + } + + /* Partition id */ + rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition); + PartCnt++; + } + + /* Update last partition and topic count fields */ + if (of_PartCnt != -1) + rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt); + rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); + + 
rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + /* Let the handler perform retries so that it can pick + * up more added partitions. */ + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Construct and send AddOffsetsToTxnRequest to \p rkb. + * + * The response (unparsed) will be handled by \p resp_cb served + * by queue \p replyq. + * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code. + */ +rd_kafka_resp_err_t +rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, + const char *transactional_id, + rd_kafka_pid_t pid, + const char *group_id, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "AddOffsetsToTxnRequest (KIP-98) not supported " + "by broker, requires broker version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = + rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100); + + /* transactional_id */ + rd_kafka_buf_write_str(rkbuf, transactional_id, -1); + + /* PID */ + rd_kafka_buf_write_i64(rkbuf, pid.id); + rd_kafka_buf_write_i16(rkbuf, pid.epoch); + + /* Group Id */ + rd_kafka_buf_write_str(rkbuf, group_id, -1); + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * @brief Construct and send EndTxnRequest to \p rkb. + * + * The response (unparsed) will be handled by \p resp_cb served + * by queue \p replyq. 
+ * + * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for + * transmission, otherwise an error code. + */ +rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, + const char *transactional_id, + rd_kafka_pid_t pid, + rd_bool_t committed, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion = 0; + + ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn, + 0, 1, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "EndTxnRequest (KIP-98) not supported " + "by broker, requires broker version >= 0.11.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500); + + /* transactional_id */ + rd_kafka_buf_write_str(rkbuf, transactional_id, -1); + + /* PID */ + rd_kafka_buf_write_i64(rkbuf, pid.id); + rd_kafka_buf_write_i16(rkbuf, pid.epoch); + + /* Committed */ + rd_kafka_buf_write_bool(rkbuf, committed); + rkbuf->rkbuf_u.EndTxn.commit = committed; + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * @name Unit tests + * @{ + * + * + * + * + */ + +/** + * @brief Create \p cnt messages, starting at \p msgid, and add them + * to \p rkmq. + * + * @returns the number of messages added. 
+ */ +static int ut_create_msgs(rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) { + int i; + + for (i = 0; i < cnt; i++) { + rd_kafka_msg_t *rkm; + + rkm = ut_rd_kafka_msg_new(0); + rkm->rkm_u.producer.msgid = msgid++; + rkm->rkm_ts_enq = rd_clock(); + rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000); + + rd_kafka_msgq_enq(rkmq, rkm); + } + + return cnt; +} + +/** + * @brief Idempotent Producer request/response unit tests + * + * The current test verifies proper handling of the following case: + * Batch 0 succeeds + * Batch 1 fails with temporary error + * Batch 2,3 fails with out of order sequence + * Retry Batch 1-3 should succeed. + */ +static int unittest_idempotent_producer(void) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_broker_t *rkb; +#define _BATCH_CNT 4 +#define _MSGS_PER_BATCH 3 + const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH; + int remaining_batches; + uint64_t msgid = 1; + rd_kafka_toppar_t *rktp; + rd_kafka_pid_t pid = {.id = 1000, .epoch = 0}; + struct rd_kafka_Produce_result result = {.offset = 1, + .timestamp = 1000}; + rd_kafka_queue_t *rkqu; + rd_kafka_event_t *rkev; + rd_kafka_buf_t *request[_BATCH_CNT]; + int rcnt = 0; + int retry_msg_cnt = 0; + int drcnt = 0; + rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); + const char *tmp; + int i, r; + + RD_UT_SAY("Verifying idempotent producer error handling"); + + conf = rd_kafka_conf_new(); + rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0); + rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0); + if ((tmp = rd_getenv("TEST_DEBUG", NULL))) + rd_kafka_conf_set(conf, "debug", tmp, NULL, 0); + if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) != + RD_KAFKA_CONF_OK) + RD_UT_FAIL("Failed to enable idempotence"); + rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR); + + rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); + RD_UT_ASSERT(rk, "failed to create producer"); + + rkqu = rd_kafka_queue_get_main(rk); + + /* We need a broker handle, 
use a logical broker to avoid + * any connection attempts. */ + rkb = rd_kafka_broker_add_logical(rk, "unittest"); + + /* Have the broker support everything so msgset_writer selects + * the most up-to-date output features. */ + rd_kafka_broker_lock(rkb); + rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL; + rd_kafka_broker_unlock(rkb); + + /* Get toppar */ + rktp = rd_kafka_toppar_get2(rk, "uttopic", 0, rd_false, rd_true); + RD_UT_ASSERT(rktp, "failed to get toppar"); + + /* Set the topic as exists so messages are enqueued on + * the desired rktp away (otherwise UA partition) */ + rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1); + + /* Produce messages */ + ut_create_msgs(&rkmq, 1, msgcnt); + + /* Set the pid */ + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID); + rd_kafka_idemp_pid_update(rkb, pid); + pid = rd_kafka_idemp_get_pid(rk); + RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid"); + rd_kafka_toppar_pid_change(rktp, pid, msgid); + + remaining_batches = _BATCH_CNT; + + /* Create a ProduceRequest for each batch */ + for (rcnt = 0; rcnt < remaining_batches; rcnt++) { + size_t msize; + request[rcnt] = rd_kafka_msgset_create_ProduceRequest( + rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize); + RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt); + } + + RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0, + "expected input message queue to be empty, " + "but still has %d message(s)", + rd_kafka_msgq_len(&rkmq)); + + /* + * Mock handling of each request + */ + + /* Batch 0: accepted */ + i = 0; + r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); + RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); + rd_kafka_msgbatch_handle_Produce_result(rkb, &request[i]->rkbuf_batch, + RD_KAFKA_RESP_ERR_NO_ERROR, + &result, request[i]); + result.offset += r; + RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0, + "batch %d: expected no messages in rktp_msgq, not %d", i, + rd_kafka_msgq_len(&rktp->rktp_msgq)); + 
rd_kafka_buf_destroy(request[i]); + remaining_batches--; + + /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */ + i = 1; + r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); + RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); + rd_kafka_msgbatch_handle_Produce_result( + rkb, &request[i]->rkbuf_batch, + RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, &result, request[i]); + retry_msg_cnt += r; + RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, + "batch %d: expected %d messages in rktp_msgq, not %d", i, + retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); + rd_kafka_buf_destroy(request[i]); + + /* Batch 2: OUT_OF_ORDER, triggering retry .. */ + i = 2; + r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); + RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); + rd_kafka_msgbatch_handle_Produce_result( + rkb, &request[i]->rkbuf_batch, + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, + request[i]); + retry_msg_cnt += r; + RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, + "batch %d: expected %d messages in rktp_xmit_msgq, not %d", + i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); + rd_kafka_buf_destroy(request[i]); + + /* Batch 3: OUT_OF_ORDER, triggering retry .. */ + i = 3; + r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); + rd_kafka_msgbatch_handle_Produce_result( + rkb, &request[i]->rkbuf_batch, + RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, + request[i]); + retry_msg_cnt += r; + RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, + "batch %d: expected %d messages in rktp_xmit_msgq, not %d", + i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); + rd_kafka_buf_destroy(request[i]); + + + /* Retried messages will have been moved to rktp_msgq, + * move them back to our local queue. 
*/ + rd_kafka_toppar_lock(rktp); + rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq); + rd_kafka_toppar_unlock(rktp); + + RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt, + "Expected %d messages in retry queue, not %d", + retry_msg_cnt, rd_kafka_msgq_len(&rkmq)); + + /* Sleep a short while to make sure the retry backoff expires. */ + rd_usleep(5 * 1000, NULL); /* 5ms */ + + /* + * Create requests for remaining batches. + */ + for (rcnt = 0; rcnt < remaining_batches; rcnt++) { + size_t msize; + request[rcnt] = rd_kafka_msgset_create_ProduceRequest( + rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize); + RD_UT_ASSERT(request[rcnt], + "Failed to create retry #%d (%d msgs in queue)", + rcnt, rd_kafka_msgq_len(&rkmq)); + } + + /* + * Mock handling of each request, they will now succeed. + */ + for (i = 0; i < rcnt; i++) { + r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); + rd_kafka_msgbatch_handle_Produce_result( + rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR, + &result, request[i]); + result.offset += r; + rd_kafka_buf_destroy(request[i]); + } + + retry_msg_cnt = 0; + RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, + "batch %d: expected %d messages in rktp_xmit_msgq, not %d", + i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); + + /* + * Wait for delivery reports, they should all be successful. 
+ */ + while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) { + const rd_kafka_message_t *rkmessage; + + RD_UT_SAY("Got %s event with %d message(s)", + rd_kafka_event_name(rkev), + (int)rd_kafka_event_message_count(rkev)); + + while ((rkmessage = rd_kafka_event_message_next(rkev))) { + RD_UT_SAY(" DR for message: %s: (persistence=%d)", + rd_kafka_err2str(rkmessage->err), + rd_kafka_message_status(rkmessage)); + if (rkmessage->err) + RD_UT_WARN(" ^ Should not have failed"); + else + drcnt++; + } + rd_kafka_event_destroy(rkev); + } + + /* Should be no more messages in queues */ + r = rd_kafka_outq_len(rk); + RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d", r); + + /* Verify the expected number of good delivery reports were seen */ + RD_UT_ASSERT(drcnt == msgcnt, "expected %d DRs, not %d", msgcnt, drcnt); + + rd_kafka_queue_destroy(rkqu); + rd_kafka_toppar_destroy(rktp); + rd_kafka_broker_destroy(rkb); + rd_kafka_destroy(rk); + + RD_UT_PASS(); + return 0; +} + +/** + * @brief Request/response unit tests + */ +int unittest_request(void) { + int fails = 0; + + fails += unittest_idempotent_producer(); + + return fails; +} + +/**@}*/ -- cgit v1.2.3