diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c | 5378 |
1 files changed, 0 insertions, 5378 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c deleted file mode 100644 index 12d9eb30..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_request.c +++ /dev/null @@ -1,5378 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2015, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include <stdarg.h> - -#include "rdkafka_int.h" -#include "rdkafka_request.h" -#include "rdkafka_broker.h" -#include "rdkafka_offset.h" -#include "rdkafka_topic.h" -#include "rdkafka_partition.h" -#include "rdkafka_metadata.h" -#include "rdkafka_msgset.h" -#include "rdkafka_idempotence.h" -#include "rdkafka_txnmgr.h" -#include "rdkafka_sasl.h" - -#include "rdrand.h" -#include "rdstring.h" -#include "rdunittest.h" - - -/** - * Kafka protocol request and response handling. - * All of this code runs in the broker thread and uses op queues for - * propagating results back to the various sub-systems operating in - * other threads. - */ - - -/* RD_KAFKA_ERR_ACTION_.. to string map */ -static const char *rd_kafka_actions_descs[] = { - "Permanent", "Ignore", "Refresh", "Retry", - "Inform", "Special", "MsgNotPersisted", "MsgPossiblyPersisted", - "MsgPersisted", NULL, -}; - -const char *rd_kafka_actions2str(int actions) { - static RD_TLS char actstr[128]; - return rd_flags2str(actstr, sizeof(actstr), rd_kafka_actions_descs, - actions); -} - - -/** - * @brief Decide action(s) to take based on the returned error code. - * - * The optional var-args is a .._ACTION_END terminated list - * of action,error tuples which overrides the general behaviour. - * It is to be read as: for \p error, return \p action(s). - * - * @warning \p request, \p rkbuf and \p rkb may be NULL. - */ -int rd_kafka_err_action(rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - const rd_kafka_buf_t *request, - ...) { - va_list ap; - int actions = 0; - int exp_act; - - if (!err) - return 0; - - /* Match explicitly defined error mappings first. */ - va_start(ap, request); - while ((exp_act = va_arg(ap, int))) { - int exp_err = va_arg(ap, int); - - if (err == exp_err) - actions |= exp_act; - } - va_end(ap); - - /* Explicit error match. */ - if (actions) { - if (err && rkb && request) - rd_rkb_dbg( - rkb, BROKER, "REQERR", - "%sRequest failed: %s: explicit actions %s", - rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), - rd_kafka_err2str(err), - rd_kafka_actions2str(actions)); - - return actions; - } - - /* Default error matching */ - switch (err) { - case RD_KAFKA_RESP_ERR_NO_ERROR: - break; - case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION: - case RD_KAFKA_RESP_ERR_BROKER_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: - case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: - case RD_KAFKA_RESP_ERR__WAIT_COORD: - /* Request metadata information update */ - actions |= RD_KAFKA_ERR_ACTION_REFRESH | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR: - /* Request metadata update and retry */ - actions |= RD_KAFKA_ERR_ACTION_REFRESH | - RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR__TRANSPORT: - case RD_KAFKA_RESP_ERR__TIMED_OUT: - case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: - case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND: - actions |= RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS: - /* Client-side wait-response/in-queue timeout */ - case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: - actions |= RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR__PURGE_INFLIGHT: - actions |= RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR__BAD_MSG: - /* Buffer parse failures are typically a client-side bug, - * treat them as permanent failures. */ - actions |= RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED; - break; - - case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: - actions |= RD_KAFKA_ERR_ACTION_RETRY; - break; - - case RD_KAFKA_RESP_ERR__DESTROY: - case RD_KAFKA_RESP_ERR_INVALID_SESSION_TIMEOUT: - case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: - case RD_KAFKA_RESP_ERR__PURGE_QUEUE: - default: - actions |= RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED; - break; - } - - /* Fatal or permanent errors are not retriable */ - if (actions & - (RD_KAFKA_ERR_ACTION_FATAL | RD_KAFKA_ERR_ACTION_PERMANENT)) - actions &= ~RD_KAFKA_ERR_ACTION_RETRY; - - /* If no request buffer was specified, which might be the case - * in certain error call chains, mask out the retry action. */ - if (!request) - actions &= ~RD_KAFKA_ERR_ACTION_RETRY; - else if (request->rkbuf_reqhdr.ApiKey != RD_KAFKAP_Produce) - /* Mask out message-related bits for non-Produce requests */ - actions &= ~RD_KAFKA_ERR_ACTION_MSG_FLAGS; - - if (err && actions && rkb && request) - rd_rkb_dbg( - rkb, BROKER, "REQERR", "%sRequest failed: %s: actions %s", - rd_kafka_ApiKey2str(request->rkbuf_reqhdr.ApiKey), - rd_kafka_err2str(err), rd_kafka_actions2str(actions)); - - return actions; -} - - -/** - * @brief Read a list of topic+partitions+extra from \p rkbuf. - * - * @param rkbuf buffer to read from - * @param fields An array of fields to read from the buffer and set on - * the rktpar object, in the specified order, must end - * with RD_KAFKA_TOPIC_PARTITION_FIELD_END. - * - * @returns a newly allocated list on success, or NULL on parse error. - */ -rd_kafka_topic_partition_list_t *rd_kafka_buf_read_topic_partitions( - rd_kafka_buf_t *rkbuf, - size_t estimated_part_cnt, - const rd_kafka_topic_partition_field_t *fields) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - rd_kafka_topic_partition_list_t *parts = NULL; - - rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); - - parts = rd_kafka_topic_partition_list_new( - RD_MAX(TopicArrayCnt * 4, (int)estimated_part_cnt)); - - while (TopicArrayCnt-- > 0) { - rd_kafkap_str_t kTopic; - int32_t PartArrayCnt; - char *topic; - - rd_kafka_buf_read_str(rkbuf, &kTopic); - rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, - RD_KAFKAP_PARTITIONS_MAX); - - RD_KAFKAP_STR_DUPA(&topic, &kTopic); - - while (PartArrayCnt-- > 0) { - int32_t Partition = -1, Epoch = -1234, - CurrentLeaderEpoch = -1234; - int64_t Offset = -1234; - int16_t ErrorCode = 0; - rd_kafka_topic_partition_t *rktpar; - int fi; - - /* - * Read requested fields - */ - for (fi = 0; - fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END; - fi++) { - switch (fields[fi]) { - case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION: - rd_kafka_buf_read_i32(rkbuf, - &Partition); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET: - rd_kafka_buf_read_i64(rkbuf, &Offset); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH: - rd_kafka_buf_read_i32( - rkbuf, &CurrentLeaderEpoch); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH: - rd_kafka_buf_read_i32(rkbuf, &Epoch); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR: - rd_kafka_buf_read_i16(rkbuf, - &ErrorCode); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: - rd_assert(!*"metadata not implemented"); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_END: - break; - } - } - - rktpar = rd_kafka_topic_partition_list_add(parts, topic, - Partition); - /* Use dummy sentinel values that are unlikely to be - * seen from the broker to know if we are to set these - * fields or not. */ - if (Offset != -1234) - rktpar->offset = Offset; - if (Epoch != -1234) - rd_kafka_topic_partition_set_leader_epoch( - rktpar, Epoch); - if (CurrentLeaderEpoch != -1234) - rd_kafka_topic_partition_set_current_leader_epoch( - rktpar, CurrentLeaderEpoch); - rktpar->err = ErrorCode; - - - rd_kafka_buf_skip_tags(rkbuf); - } - - rd_kafka_buf_skip_tags(rkbuf); - } - - return parts; - -err_parse: - if (parts) - rd_kafka_topic_partition_list_destroy(parts); - - return NULL; -} - - -/** - * @brief Write a list of topic+partitions+offsets+extra to \p rkbuf - * - * @returns the number of partitions written to buffer. - * - * @remark The \p parts list MUST be sorted. - */ -int rd_kafka_buf_write_topic_partitions( - rd_kafka_buf_t *rkbuf, - const rd_kafka_topic_partition_list_t *parts, - rd_bool_t skip_invalid_offsets, - rd_bool_t only_invalid_offsets, - const rd_kafka_topic_partition_field_t *fields) { - size_t of_TopicArrayCnt; - size_t of_PartArrayCnt = 0; - int TopicArrayCnt = 0, PartArrayCnt = 0; - int i; - const char *prev_topic = NULL; - int cnt = 0; - - rd_assert(!only_invalid_offsets || - (only_invalid_offsets != skip_invalid_offsets)); - - /* TopicArrayCnt */ - of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); - - for (i = 0; i < parts->cnt; i++) { - const rd_kafka_topic_partition_t *rktpar = &parts->elems[i]; - int fi; - - if (rktpar->offset < 0) { - if (skip_invalid_offsets) - continue; - } else if (only_invalid_offsets) - continue; - - if (!prev_topic || strcmp(rktpar->topic, prev_topic)) { - /* Finish previous topic, if any. */ - if (of_PartArrayCnt > 0) { - rd_kafka_buf_finalize_arraycnt( - rkbuf, of_PartArrayCnt, PartArrayCnt); - /* Tags for previous topic struct */ - rd_kafka_buf_write_tags(rkbuf); - } - - - /* Topic */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - TopicArrayCnt++; - prev_topic = rktpar->topic; - /* New topic so reset partition count */ - PartArrayCnt = 0; - - /* PartitionArrayCnt: updated later */ - of_PartArrayCnt = - rd_kafka_buf_write_arraycnt_pos(rkbuf); - } - - - /* - * Write requested fields - */ - for (fi = 0; fields[fi] != RD_KAFKA_TOPIC_PARTITION_FIELD_END; - fi++) { - switch (fields[fi]) { - case RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION: - rd_kafka_buf_write_i32(rkbuf, - rktpar->partition); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET: - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH: - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_current_leader_epoch( - rktpar)); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH: - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_leader_epoch( - rktpar)); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_ERR: - rd_kafka_buf_write_i16(rkbuf, rktpar->err); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA: - /* Java client 0.9.0 and broker <0.10.0 can't - * parse Null metadata fields, so as a - * workaround we send an empty string if - * it's Null. */ - if (!rktpar->metadata) - rd_kafka_buf_write_str(rkbuf, "", 0); - else - rd_kafka_buf_write_str( - rkbuf, rktpar->metadata, - rktpar->metadata_size); - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP: - break; - case RD_KAFKA_TOPIC_PARTITION_FIELD_END: - break; - } - } - - - if (fi > 1) - /* If there was more than one field written - * then this was a struct and thus needs the - * struct suffix tags written. */ - rd_kafka_buf_write_tags(rkbuf); - - PartArrayCnt++; - cnt++; - } - - if (of_PartArrayCnt > 0) { - rd_kafka_buf_finalize_arraycnt(rkbuf, of_PartArrayCnt, - PartArrayCnt); - /* Tags for topic struct */ - rd_kafka_buf_write_tags(rkbuf); - } - - rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, TopicArrayCnt); - - return cnt; -} - - -/** - * @brief Send FindCoordinatorRequest. - * - * @param coordkey is the group.id for RD_KAFKA_COORD_GROUP, - * and the transactional.id for RD_KAFKA_COORD_TXN - */ -rd_kafka_resp_err_t -rd_kafka_FindCoordinatorRequest(rd_kafka_broker_t *rkb, - rd_kafka_coordtype_t coordtype, - const char *coordkey, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_FindCoordinator, 0, 2, NULL); - - if (coordtype != RD_KAFKA_COORD_GROUP && ApiVersion < 1) - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_FindCoordinator, 1, - 1 + 2 + strlen(coordkey)); - - rd_kafka_buf_write_str(rkbuf, coordkey, -1); - - if (ApiVersion >= 1) - rd_kafka_buf_write_i8(rkbuf, (int8_t)coordtype); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @brief Parses a ListOffsets reply. - * - * Returns the parsed offsets (and errors) in \p offsets which must have been - * initialized by caller. - * - * @returns 0 on success, else an error (\p offsets may be completely or - * partially updated, depending on the nature of the error, and per - * partition error codes should be checked by the caller). - */ -static rd_kafka_resp_err_t -rd_kafka_parse_ListOffsets(rd_kafka_buf_t *rkbuf, - rd_kafka_topic_partition_list_t *offsets) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int16_t api_version; - rd_kafka_resp_err_t all_err = RD_KAFKA_RESP_ERR_NO_ERROR; - - api_version = rkbuf->rkbuf_reqhdr.ApiVersion; - - if (api_version >= 2) - rd_kafka_buf_read_throttle_time(rkbuf); - - /* NOTE: - * Broker may return offsets in a different constellation than - * in the original request .*/ - - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); - while (TopicArrayCnt-- > 0) { - rd_kafkap_str_t ktopic; - int32_t PartArrayCnt; - char *topic_name; - - rd_kafka_buf_read_str(rkbuf, &ktopic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); - - RD_KAFKAP_STR_DUPA(&topic_name, &ktopic); - - while (PartArrayCnt-- > 0) { - int32_t kpartition; - int16_t ErrorCode; - int32_t OffsetArrayCnt; - int64_t Offset = -1; - int32_t LeaderEpoch = -1; - rd_kafka_topic_partition_t *rktpar; - - rd_kafka_buf_read_i32(rkbuf, &kpartition); - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - - if (api_version >= 1) { - int64_t Timestamp; - rd_kafka_buf_read_i64(rkbuf, &Timestamp); - rd_kafka_buf_read_i64(rkbuf, &Offset); - if (api_version >= 4) - rd_kafka_buf_read_i32(rkbuf, - &LeaderEpoch); - } else if (api_version == 0) { - rd_kafka_buf_read_i32(rkbuf, &OffsetArrayCnt); - /* We only request one offset so just grab - * the first one. */ - while (OffsetArrayCnt-- > 0) - rd_kafka_buf_read_i64(rkbuf, &Offset); - } else { - RD_NOTREACHED(); - } - - rktpar = rd_kafka_topic_partition_list_add( - offsets, topic_name, kpartition); - rktpar->err = ErrorCode; - rktpar->offset = Offset; - rd_kafka_topic_partition_set_leader_epoch(rktpar, - LeaderEpoch); - - if (ErrorCode && !all_err) - all_err = ErrorCode; - } - } - - return all_err; - -err_parse: - return rkbuf->rkbuf_err; -} - - - -/** - * @brief Parses and handles ListOffsets replies. - * - * Returns the parsed offsets (and errors) in \p offsets. - * \p offsets must be initialized by the caller. - * - * @returns 0 on success, else an error. \p offsets may be populated on error, - * depending on the nature of the error. - * On error \p actionsp (unless NULL) is updated with the recommended - * error actions. - */ -rd_kafka_resp_err_t -rd_kafka_handle_ListOffsets(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_topic_partition_list_t *offsets, - int *actionsp) { - - int actions; - - if (!err) - err = rd_kafka_parse_ListOffsets(rkbuf, offsets); - if (!err) - return RD_KAFKA_RESP_ERR_NO_ERROR; - - actions = rd_kafka_err_action( - rkb, err, request, RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, - - RD_KAFKA_ERR_ACTION_REFRESH, - RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, - - RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_FENCED_LEADER_EPOCH, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_UNKNOWN_LEADER_EPOCH, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, - - - RD_KAFKA_ERR_ACTION_END); - - if (actionsp) - *actionsp = actions; - - if (rkb) - rd_rkb_dbg( - rkb, TOPIC, "OFFSET", "OffsetRequest failed: %s (%s)", - rd_kafka_err2str(err), rd_kafka_actions2str(actions)); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - char tmp[256]; - /* Re-query for leader */ - rd_snprintf(tmp, sizeof(tmp), "ListOffsetsRequest failed: %s", - rd_kafka_err2str(err)); - rd_kafka_metadata_refresh_known_topics(rk, NULL, - rd_true /*force*/, tmp); - } - - if ((actions & RD_KAFKA_ERR_ACTION_RETRY) && - rd_kafka_buf_retry(rkb, request)) - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - - return err; -} - - - -/** - * @brief Async maker for ListOffsetsRequest. - */ -static rd_kafka_resp_err_t -rd_kafka_make_ListOffsetsRequest(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *rkbuf, - void *make_opaque) { - const rd_kafka_topic_partition_list_t *partitions = - (const rd_kafka_topic_partition_list_t *)make_opaque; - int i; - size_t of_TopicArrayCnt = 0, of_PartArrayCnt = 0; - const char *last_topic = ""; - int32_t topic_cnt = 0, part_cnt = 0; - int16_t ApiVersion; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_ListOffsets, 0, 5, NULL); - if (ApiVersion == -1) - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - - /* ReplicaId */ - rd_kafka_buf_write_i32(rkbuf, -1); - - /* IsolationLevel */ - if (ApiVersion >= 2) - rd_kafka_buf_write_i8(rkbuf, - rkb->rkb_rk->rk_conf.isolation_level); - - /* TopicArrayCnt */ - of_TopicArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); /* updated later */ - - for (i = 0; i < partitions->cnt; i++) { - const rd_kafka_topic_partition_t *rktpar = - &partitions->elems[i]; - - if (strcmp(rktpar->topic, last_topic)) { - /* Finish last topic, if any. */ - if (of_PartArrayCnt > 0) - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, - part_cnt); - - /* Topic */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - topic_cnt++; - last_topic = rktpar->topic; - /* New topic so reset partition count */ - part_cnt = 0; - - /* PartitionArrayCnt: updated later */ - of_PartArrayCnt = rd_kafka_buf_write_i32(rkbuf, 0); - } - - /* Partition */ - rd_kafka_buf_write_i32(rkbuf, rktpar->partition); - part_cnt++; - - if (ApiVersion >= 4) - /* CurrentLeaderEpoch */ - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_current_leader_epoch( - rktpar)); - - /* Time/Offset */ - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); - - if (ApiVersion == 0) { - /* MaxNumberOfOffsets */ - rd_kafka_buf_write_i32(rkbuf, 1); - } - } - - if (of_PartArrayCnt > 0) { - rd_kafka_buf_update_i32(rkbuf, of_PartArrayCnt, part_cnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, topic_cnt); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_rkb_dbg(rkb, TOPIC, "OFFSET", - "ListOffsetsRequest (v%hd, opv %d) " - "for %" PRId32 " topic(s) and %" PRId32 " partition(s)", - ApiVersion, rkbuf->rkbuf_replyq.version, topic_cnt, - partitions->cnt); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Send ListOffsetsRequest for partitions in \p partitions. - */ -void rd_kafka_ListOffsetsRequest(rd_kafka_broker_t *rkb, - rd_kafka_topic_partition_list_t *partitions, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - rd_kafka_topic_partition_list_t *make_parts; - - make_parts = rd_kafka_topic_partition_list_copy(partitions); - rd_kafka_topic_partition_list_sort_by_topic(make_parts); - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_ListOffsets, 1, - /* ReplicaId+IsolationLevel+TopicArrayCnt+Topic */ - 4 + 1 + 4 + 100 + - /* PartArrayCnt */ - 4 + - /* partition_cnt * Partition+Time+MaxNumOffs */ - (make_parts->cnt * (4 + 8 + 4))); - - /* Postpone creating the request contents until time to send, - * at which time the ApiVersion is known. */ - rd_kafka_buf_set_maker(rkbuf, rd_kafka_make_ListOffsetsRequest, - make_parts, - rd_kafka_topic_partition_list_destroy_free); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - -/** - * @brief OffsetForLeaderEpochResponse handler. - */ -rd_kafka_resp_err_t rd_kafka_handle_OffsetForLeaderEpoch( - rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_topic_partition_list_t **offsets) { - const int log_decode_errors = LOG_ERR; - int16_t ApiVersion; - - if (err) - goto err; - - ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; - - if (ApiVersion >= 2) - rd_kafka_buf_read_throttle_time(rkbuf); - - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - ApiVersion >= 1 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH - : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, - RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - *offsets = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields); - if (!*offsets) - goto err_parse; - - return RD_KAFKA_RESP_ERR_NO_ERROR; - -err: - return err; - -err_parse: - err = rkbuf->rkbuf_err; - goto err; -} - - -/** - * @brief Send OffsetForLeaderEpochRequest for partition(s). - * - */ -void rd_kafka_OffsetForLeaderEpochRequest( - rd_kafka_broker_t *rkb, - rd_kafka_topic_partition_list_t *parts, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetForLeaderEpoch, 2, 2, NULL); - /* If the supported ApiVersions are not yet known, - * or this broker doesn't support it, we let this request - * succeed or fail later from the broker thread where the - * version is checked again. */ - if (ApiVersion == -1) - ApiVersion = 2; - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_OffsetForLeaderEpoch, 1, 4 + (parts->cnt * 64), - ApiVersion >= 4 /*flexver*/); - - /* Sort partitions by topic */ - rd_kafka_topic_partition_list_sort_by_topic(parts); - - /* Write partition list */ - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - /* CurrentLeaderEpoch */ - RD_KAFKA_TOPIC_PARTITION_FIELD_CURRENT_EPOCH, - /* LeaderEpoch */ - RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - rd_kafka_buf_write_topic_partitions( - rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, fields); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - /* Let caller perform retries */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - - -/** - * Generic handler for OffsetFetch responses. - * Offsets for included partitions will be propagated through the passed - * 'offsets' list. - * - * @param rkbuf response buffer, may be NULL if \p err is set. - * @param update_toppar update toppar's committed_offset - * @param add_part if true add partitions from the response to \p *offsets, - * else just update the partitions that are already - * in \p *offsets. - */ -rd_kafka_resp_err_t -rd_kafka_handle_OffsetFetch(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_topic_partition_list_t **offsets, - rd_bool_t update_toppar, - rd_bool_t add_part, - rd_bool_t allow_retry) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int64_t offset = RD_KAFKA_OFFSET_INVALID; - int16_t ApiVersion; - rd_kafkap_str_t metadata; - int retry_unstable = 0; - int i; - int actions; - int seen_cnt = 0; - - if (err) - goto err; - - ApiVersion = rkbuf->rkbuf_reqhdr.ApiVersion; - - if (ApiVersion >= 3) - rd_kafka_buf_read_throttle_time(rkbuf); - - if (!*offsets) - *offsets = rd_kafka_topic_partition_list_new(16); - - /* Set default offset for all partitions. */ - rd_kafka_topic_partition_list_set_offsets(rkb->rkb_rk, *offsets, 0, - RD_KAFKA_OFFSET_INVALID, - 0 /* !is commit */); - - rd_kafka_buf_read_arraycnt(rkbuf, &TopicArrayCnt, RD_KAFKAP_TOPICS_MAX); - for (i = 0; i < TopicArrayCnt; i++) { - rd_kafkap_str_t topic; - int32_t PartArrayCnt; - char *topic_name; - int j; - - rd_kafka_buf_read_str(rkbuf, &topic); - - rd_kafka_buf_read_arraycnt(rkbuf, &PartArrayCnt, - RD_KAFKAP_PARTITIONS_MAX); - - RD_KAFKAP_STR_DUPA(&topic_name, &topic); - - for (j = 0; j < PartArrayCnt; j++) { - int32_t partition; - rd_kafka_toppar_t *rktp; - rd_kafka_topic_partition_t *rktpar; - int32_t LeaderEpoch = -1; - int16_t err2; - - rd_kafka_buf_read_i32(rkbuf, &partition); - rd_kafka_buf_read_i64(rkbuf, &offset); - if (ApiVersion >= 5) - rd_kafka_buf_read_i32(rkbuf, &LeaderEpoch); - rd_kafka_buf_read_str(rkbuf, &metadata); - rd_kafka_buf_read_i16(rkbuf, &err2); - rd_kafka_buf_skip_tags(rkbuf); - - rktpar = rd_kafka_topic_partition_list_find( - *offsets, topic_name, partition); - if (!rktpar && add_part) - rktpar = rd_kafka_topic_partition_list_add( - *offsets, topic_name, partition); - else if (!rktpar) { - rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", - "OffsetFetchResponse: %s [%" PRId32 - "] " - "not found in local list: ignoring", - topic_name, partition); - continue; - } - - seen_cnt++; - - rktp = rd_kafka_topic_partition_get_toppar( - rk, rktpar, rd_false /*no create on miss*/); - - /* broker reports invalid offset as -1 */ - if (offset == -1) - rktpar->offset = RD_KAFKA_OFFSET_INVALID; - else - rktpar->offset = offset; - - rd_kafka_topic_partition_set_leader_epoch(rktpar, - LeaderEpoch); - rktpar->err = err2; - - rd_rkb_dbg(rkb, TOPIC, "OFFSETFETCH", - "OffsetFetchResponse: %s [%" PRId32 - "] " - "offset %" PRId64 ", leader epoch %" PRId32 - ", metadata %d byte(s): %s", - topic_name, partition, offset, LeaderEpoch, - RD_KAFKAP_STR_LEN(&metadata), - rd_kafka_err2name(rktpar->err)); - - if (update_toppar && !err2 && rktp) { - /* Update toppar's committed offset */ - rd_kafka_toppar_lock(rktp); - rktp->rktp_committed_pos = - rd_kafka_topic_partition_get_fetch_pos( - rktpar); - rd_kafka_toppar_unlock(rktp); - } - - if (rktpar->err == - RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) - retry_unstable++; - - - if (rktpar->metadata) - rd_free(rktpar->metadata); - - if (RD_KAFKAP_STR_IS_NULL(&metadata)) { - rktpar->metadata = NULL; - rktpar->metadata_size = 0; - } else { - rktpar->metadata = RD_KAFKAP_STR_DUP(&metadata); - rktpar->metadata_size = - RD_KAFKAP_STR_LEN(&metadata); - } - - /* Loose ref from get_toppar() */ - if (rktp) - rd_kafka_toppar_destroy(rktp); - } - - rd_kafka_buf_skip_tags(rkbuf); - } - - if (ApiVersion >= 2) { - int16_t ErrorCode; - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - if (ErrorCode) { - err = ErrorCode; - goto err; - } - } - - -err: - if (!*offsets) - rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", "OffsetFetch returned %s", - rd_kafka_err2str(err)); - else - rd_rkb_dbg(rkb, TOPIC, "OFFFETCH", - "OffsetFetch for %d/%d partition(s) " - "(%d unstable partition(s)) returned %s", - seen_cnt, (*offsets)->cnt, retry_unstable, - rd_kafka_err2str(err)); - - actions = - rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_op(rkb->rkb_rk->rk_cgrp, NULL, RD_KAFKA_NO_REPLYQ, - RD_KAFKA_OP_COORD_QUERY, err); - } - - if (actions & RD_KAFKA_ERR_ACTION_RETRY || retry_unstable) { - if (allow_retry && rd_kafka_buf_retry(rkb, request)) - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - /* FALLTHRU */ - } - - return err; - -err_parse: - err = rkbuf->rkbuf_err; - goto err; -} - - - -/** - * @brief Handle OffsetFetch response based on an RD_KAFKA_OP_OFFSET_FETCH - * rko in \p opaque. - * - * @param opaque rko wrapper for handle_OffsetFetch. - * - * The \c rko->rko_u.offset_fetch.partitions list will be filled in with - * the fetched offsets. - * - * A reply will be sent on 'rko->rko_replyq' with type RD_KAFKA_OP_OFFSET_FETCH. - * - * @remark \p rkb, \p rkbuf and \p request are optional. - * - * @remark The \p request buffer may be retried on error. - * - * @locality cgrp's broker thread - */ -void rd_kafka_op_handle_OffsetFetch(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_op_t *rko = opaque; - rd_kafka_op_t *rko_reply; - rd_kafka_topic_partition_list_t *offsets; - - RD_KAFKA_OP_TYPE_ASSERT(rko, RD_KAFKA_OP_OFFSET_FETCH); - - if (err == RD_KAFKA_RESP_ERR__DESTROY) { - /* Termination, quick cleanup. */ - rd_kafka_op_destroy(rko); - return; - } - - offsets = rd_kafka_topic_partition_list_copy( - rko->rko_u.offset_fetch.partitions); - - /* If all partitions already had usable offsets then there - * was no request sent and thus no reply, the offsets list is - * good to go.. */ - if (rkbuf) { - /* ..else parse the response (or perror) */ - err = rd_kafka_handle_OffsetFetch( - rkb->rkb_rk, rkb, err, rkbuf, request, &offsets, - rd_false /*dont update rktp*/, rd_false /*dont add part*/, - /* Allow retries if replyq is valid */ - rd_kafka_op_replyq_is_valid(rko)); - if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { - if (offsets) - rd_kafka_topic_partition_list_destroy(offsets); - return; /* Retrying */ - } - } - - rko_reply = - rd_kafka_op_new(RD_KAFKA_OP_OFFSET_FETCH | RD_KAFKA_OP_REPLY); - rko_reply->rko_err = err; - rko_reply->rko_u.offset_fetch.partitions = offsets; - rko_reply->rko_u.offset_fetch.do_free = 1; - if (rko->rko_rktp) - rko_reply->rko_rktp = rd_kafka_toppar_keep(rko->rko_rktp); - - rd_kafka_replyq_enq(&rko->rko_replyq, rko_reply, 0); - - rd_kafka_op_destroy(rko); -} - -/** - * Send OffsetFetchRequest for a consumer group id. - * - * Any partition with a usable offset will be ignored, if all partitions - * have usable offsets then no request is sent at all but an empty - * reply is enqueued on the replyq. - * - * @param group_id Request offset for this group id. - * @param parts (optional) List of topic partitions to request, - * or NULL to return all topic partitions associated with the - * group. - * @param require_stable_offsets Whether broker should return stable offsets - * (transaction-committed). - * @param timeout Optional timeout to set to the buffer. - */ -void rd_kafka_OffsetFetchRequest(rd_kafka_broker_t *rkb, - const char *group_id, - rd_kafka_topic_partition_list_t *parts, - rd_bool_t require_stable_offsets, - int timeout, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion; - size_t parts_size = 0; - int PartCnt = -1; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetFetch, 0, 7, NULL); - - if (parts) { - parts_size = parts->cnt * 32; - } - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_OffsetFetch, 1, - /* GroupId + rd_kafka_buf_write_arraycnt_pos + - * Topics + RequireStable */ - 32 + 4 + parts_size + 1, ApiVersion >= 6 /*flexver*/); - - /* ConsumerGroup */ - rd_kafka_buf_write_str(rkbuf, group_id, -1); - - if (parts) { - /* Sort partitions by topic */ - rd_kafka_topic_partition_list_sort_by_topic(parts); - - /* Write partition list, filtering out partitions with valid - * offsets */ - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - PartCnt = rd_kafka_buf_write_topic_partitions( - rkbuf, parts, rd_false /*include invalid offsets*/, - rd_false /*skip valid offsets */, fields); - } else { - rd_kafka_buf_write_arraycnt_pos(rkbuf); - } - - if (ApiVersion >= 7) { - /* RequireStable */ - rd_kafka_buf_write_i8(rkbuf, require_stable_offsets); - } - - if (PartCnt == 0) { - /* No partitions needs OffsetFetch, enqueue empty - * response right away. */ - rkbuf->rkbuf_replyq = replyq; - rkbuf->rkbuf_cb = resp_cb; - rkbuf->rkbuf_opaque = opaque; - rd_kafka_buf_callback(rkb->rkb_rk, rkb, 0, NULL, rkbuf); - return; - } - - if (timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - if (parts) { - rd_rkb_dbg( - rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER, - "OFFSET", - "Group %s OffsetFetchRequest(v%d) for %d/%d partition(s)", - group_id, ApiVersion, PartCnt, parts->cnt); - } else { - rd_rkb_dbg( - rkb, TOPIC | RD_KAFKA_DBG_CGRP | RD_KAFKA_DBG_CONSUMER, - "OFFSET", - "Group %s OffsetFetchRequest(v%d) for all partitions", - group_id, ApiVersion); - } - - /* Let handler decide if retries should be performed */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; - - if (parts) { - rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET", - "Fetch committed offsets for %d/%d partition(s)", - PartCnt, parts->cnt); - } else { - rd_rkb_dbg(rkb, CGRP | RD_KAFKA_DBG_CONSUMER, "OFFSET", - "Fetch committed offsets all the partitions"); - } - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - - -/** - * @brief Handle per-partition OffsetCommit errors and returns actions flags. - */ -static int -rd_kafka_handle_OffsetCommit_error(rd_kafka_broker_t *rkb, - rd_kafka_buf_t *request, - const rd_kafka_topic_partition_t *rktpar) { - - /* These actions are mimicking AK's ConsumerCoordinator.java */ - - return rd_kafka_err_action( - rkb, rktpar->err, request, - - RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED, - - RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, - - - RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_OFFSET_METADATA_TOO_LARGE, - - RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_INVALID_COMMIT_OFFSET_SIZE, - - - RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS, - - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, - - - /* .._SPECIAL: mark coordinator dead, refresh and retry */ - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_SPECIAL, - RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_SPECIAL, - RD_KAFKA_RESP_ERR_NOT_COORDINATOR, - - /* Replicas possibly unavailable: - * Refresh coordinator (but don't mark as dead (!.._SPECIAL)), - * and retry */ - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT, - - - /* FIXME: There are some cases in the Java code where - * this is not treated as a fatal error. */ - RD_KAFKA_ERR_ACTION_PERMANENT | RD_KAFKA_ERR_ACTION_FATAL, - RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID, - - - RD_KAFKA_ERR_ACTION_PERMANENT, - RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS, - - - RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, - - RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, - - RD_KAFKA_ERR_ACTION_END); -} - - -/** - * @brief Handle OffsetCommit response. - * - * @remark \p offsets may be NULL if \p err is set - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if all partitions were successfully - * committed, - * RD_KAFKA_RESP_ERR__IN_PROGRESS if a retry was scheduled, - * or any other error code if the request was not retried. - */ -rd_kafka_resp_err_t -rd_kafka_handle_OffsetCommit(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - rd_kafka_topic_partition_list_t *offsets, - rd_bool_t ignore_cgrp) { - const int log_decode_errors = LOG_ERR; - int32_t TopicArrayCnt; - int errcnt = 0; - int partcnt = 0; - int i; - int actions = 0; - - if (err) - goto err; - - if (rd_kafka_buf_ApiVersion(rkbuf) >= 3) - rd_kafka_buf_read_throttle_time(rkbuf); - - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); - for (i = 0; i < TopicArrayCnt; i++) { - rd_kafkap_str_t topic; - char *topic_str; - int32_t PartArrayCnt; - int j; - - rd_kafka_buf_read_str(rkbuf, &topic); - rd_kafka_buf_read_i32(rkbuf, &PartArrayCnt); - - RD_KAFKAP_STR_DUPA(&topic_str, &topic); - - for (j = 0; j < PartArrayCnt; j++) { - int32_t partition; - int16_t ErrorCode; - rd_kafka_topic_partition_t *rktpar; - - rd_kafka_buf_read_i32(rkbuf, &partition); - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - - rktpar = rd_kafka_topic_partition_list_find( - offsets, topic_str, partition); - - if (!rktpar) { - /* Received offset for topic/partition we didn't - * ask for, this shouldn't really happen. */ - continue; - } - - rktpar->err = ErrorCode; - if (ErrorCode) { - err = ErrorCode; - errcnt++; - - /* Accumulate actions for per-partition - * errors. */ - actions |= rd_kafka_handle_OffsetCommit_error( - rkb, request, rktpar); - } - - partcnt++; - } - } - - /* If all partitions failed use error code - * from last partition as the global error. */ - if (offsets && err && errcnt == partcnt) - goto err; - - goto done; - -err_parse: - err = rkbuf->rkbuf_err; - -err: - if (!actions) /* Transport/Request-level error */ - actions = rd_kafka_err_action(rkb, err, request, - - RD_KAFKA_ERR_ACTION_REFRESH | - RD_KAFKA_ERR_ACTION_SPECIAL | - RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR__TRANSPORT, - - RD_KAFKA_ERR_ACTION_END); - - if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_FATAL)) { - rd_kafka_set_fatal_error(rk, err, "OffsetCommit failed: %s", - rd_kafka_err2str(err)); - return err; - } - - if (!ignore_cgrp && (actions & RD_KAFKA_ERR_ACTION_REFRESH) && - rk->rk_cgrp) { - /* Mark coordinator dead or re-query for coordinator. - * ..dead() will trigger a re-query. */ - if (actions & RD_KAFKA_ERR_ACTION_SPECIAL) - rd_kafka_cgrp_coord_dead(rk->rk_cgrp, err, - "OffsetCommitRequest failed"); - else - rd_kafka_cgrp_coord_query(rk->rk_cgrp, - "OffsetCommitRequest failed"); - } - - if (!ignore_cgrp && actions & RD_KAFKA_ERR_ACTION_RETRY && - !(actions & RD_KAFKA_ERR_ACTION_PERMANENT) && - rd_kafka_buf_retry(rkb, request)) - return RD_KAFKA_RESP_ERR__IN_PROGRESS; - -done: - return err; -} - -/** - * @brief Send OffsetCommitRequest for a list of partitions. - * - * @param cgmetadata consumer group metadata. - * - * @param offsets - offsets to commit for each topic-partition. - * - * @returns 0 if none of the partitions in \p offsets had valid offsets, - * else 1. - */ -int rd_kafka_OffsetCommitRequest(rd_kafka_broker_t *rkb, - rd_kafka_consumer_group_metadata_t *cgmetadata, - rd_kafka_topic_partition_list_t *offsets, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque, - const char *reason) { - rd_kafka_buf_t *rkbuf; - ssize_t of_TopicCnt = -1; - int TopicCnt = 0; - const char *last_topic = NULL; - ssize_t of_PartCnt = -1; - int PartCnt = 0; - int tot_PartCnt = 0; - int i; - int16_t ApiVersion; - int features; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetCommit, 0, 7, &features); - - rd_kafka_assert(NULL, offsets != NULL); - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_OffsetCommit, 1, - 100 + (offsets->cnt * 128)); - - /* ConsumerGroup */ - rd_kafka_buf_write_str(rkbuf, cgmetadata->group_id, -1); - - /* v1,v2 */ - if (ApiVersion >= 1) { - /* ConsumerGroupGenerationId */ - rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id); - /* ConsumerId */ - rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1); - } - - /* v7: GroupInstanceId */ - if (ApiVersion >= 7) - rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id, - -1); - - /* v2-4: RetentionTime */ - if (ApiVersion >= 2 && ApiVersion <= 4) - rd_kafka_buf_write_i64(rkbuf, -1); - - /* Sort offsets by topic */ - rd_kafka_topic_partition_list_sort_by_topic(offsets); - - /* TopicArrayCnt: Will be updated when we know the number of topics. */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - for (i = 0; i < offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; - - /* Skip partitions with invalid offset. */ - if (rktpar->offset < 0) - continue; - - if (last_topic == NULL || strcmp(last_topic, rktpar->topic)) { - /* New topic */ - - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, - PartCnt); - - /* TopicName */ - rd_kafka_buf_write_str(rkbuf, rktpar->topic, -1); - /* PartitionCnt, finalized later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); - PartCnt = 0; - last_topic = rktpar->topic; - TopicCnt++; - } - - /* Partition */ - rd_kafka_buf_write_i32(rkbuf, rktpar->partition); - PartCnt++; - tot_PartCnt++; - - /* Offset */ - rd_kafka_buf_write_i64(rkbuf, rktpar->offset); - - /* v6: KIP-101 CommittedLeaderEpoch */ - if (ApiVersion >= 6) - rd_kafka_buf_write_i32( - rkbuf, - rd_kafka_topic_partition_get_leader_epoch(rktpar)); - - /* v1: TimeStamp */ - if (ApiVersion == 1) - rd_kafka_buf_write_i64(rkbuf, -1); - - /* Metadata */ - /* Java client 0.9.0 and broker <0.10.0 can't parse - * Null metadata fields, so as a workaround we send an - * empty string if it's Null. */ - if (!rktpar->metadata) - rd_kafka_buf_write_str(rkbuf, "", 0); - else - rd_kafka_buf_write_str(rkbuf, rktpar->metadata, - rktpar->metadata_size); - } - - if (tot_PartCnt == 0) { - /* No topic+partitions had valid offsets to commit. */ - rd_kafka_replyq_destroy(&replyq); - rd_kafka_buf_destroy(rkbuf); - return 0; - } - - /* Finalize previous PartitionCnt */ - if (PartCnt > 0) - rd_kafka_buf_update_u32(rkbuf, of_PartCnt, PartCnt); - - /* Finalize TopicCnt */ - rd_kafka_buf_update_u32(rkbuf, of_TopicCnt, TopicCnt); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_rkb_dbg(rkb, TOPIC, "OFFSET", - "Enqueue OffsetCommitRequest(v%d, %d/%d partition(s))): %s", - ApiVersion, tot_PartCnt, offsets->cnt, reason); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return 1; -} - -/** - * @brief Construct and send OffsetDeleteRequest to \p rkb - * with the partitions in del_grpoffsets (DeleteConsumerGroupOffsets_t*) - * using \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @remark Only one del_grpoffsets element is supported. - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_OffsetDeleteRequest(rd_kafka_broker_t *rkb, - /** (rd_kafka_DeleteConsumerGroupOffsets_t*) */ - const rd_list_t *del_grpoffsets, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - const rd_kafka_DeleteConsumerGroupOffsets_t *grpoffsets = - rd_list_elem(del_grpoffsets, 0); - - rd_assert(rd_list_cnt(del_grpoffsets) == 1); - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_OffsetDelete, 0, 0, &features); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "OffsetDelete API (KIP-496) not supported " - "by broker, requires broker version >= 2.4.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_OffsetDelete, 1, - 2 + strlen(grpoffsets->group) + (64 * grpoffsets->partitions->cnt)); - - /* GroupId */ - rd_kafka_buf_write_str(rkbuf, grpoffsets->group, -1); - - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - rd_kafka_buf_write_topic_partitions( - rkbuf, grpoffsets->partitions, - rd_false /*dont skip invalid offsets*/, rd_false /*any offset*/, - fields); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @brief Write "consumer" protocol type MemberState for SyncGroupRequest to - * enveloping buffer \p rkbuf. - */ -static void -rd_kafka_group_MemberState_consumer_write(rd_kafka_buf_t *env_rkbuf, - const rd_kafka_group_member_t *rkgm) { - rd_kafka_buf_t *rkbuf; - rd_slice_t slice; - - rkbuf = rd_kafka_buf_new(1, 100); - rd_kafka_buf_write_i16(rkbuf, 0); /* Version */ - rd_assert(rkgm->rkgm_assignment); - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - rd_kafka_buf_write_topic_partitions( - rkbuf, rkgm->rkgm_assignment, - rd_false /*don't skip invalid offsets*/, rd_false /* any offset */, - fields); - rd_kafka_buf_write_kbytes(rkbuf, rkgm->rkgm_userdata); - - /* Get pointer to binary buffer */ - rd_slice_init_full(&slice, &rkbuf->rkbuf_buf); - - /* Write binary buffer as Kafka Bytes to enveloping buffer. */ - rd_kafka_buf_write_i32(env_rkbuf, (int32_t)rd_slice_remains(&slice)); - rd_buf_write_slice(&env_rkbuf->rkbuf_buf, &slice); - - rd_kafka_buf_destroy(rkbuf); -} - -/** - * Send SyncGroupRequest - */ -void rd_kafka_SyncGroupRequest(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *group_id, - int32_t generation_id, - const rd_kafkap_str_t *member_id, - const rd_kafkap_str_t *group_instance_id, - const rd_kafka_group_member_t *assignments, - int assignment_cnt, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int i; - int16_t ApiVersion; - int features; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_SyncGroup, 0, 3, &features); - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_SyncGroup, 1, - RD_KAFKAP_STR_SIZE(group_id) + 4 /* GenerationId */ + - RD_KAFKAP_STR_SIZE(member_id) + - RD_KAFKAP_STR_SIZE(group_instance_id) + - 4 /* array size group_assignment */ + - (assignment_cnt * 100 /*guess*/)); - rd_kafka_buf_write_kstr(rkbuf, group_id); - rd_kafka_buf_write_i32(rkbuf, generation_id); - rd_kafka_buf_write_kstr(rkbuf, member_id); - if (ApiVersion >= 3) - rd_kafka_buf_write_kstr(rkbuf, group_instance_id); - rd_kafka_buf_write_i32(rkbuf, assignment_cnt); - - for (i = 0; i < assignment_cnt; i++) { - const rd_kafka_group_member_t *rkgm = &assignments[i]; - - rd_kafka_buf_write_kstr(rkbuf, rkgm->rkgm_member_id); - rd_kafka_group_MemberState_consumer_write(rkbuf, rkgm); - } - - /* This is a blocking request */ - rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; - rd_kafka_buf_set_abs_timeout( - rkbuf, - rkb->rkb_rk->rk_conf.group_session_timeout_ms + - 3000 /* 3s grace period*/, - 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - - -/** - * Send JoinGroupRequest - */ -void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *group_id, - const rd_kafkap_str_t *member_id, - const rd_kafkap_str_t *group_instance_id, - const rd_kafkap_str_t *protocol_type, - const rd_list_t *topics, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - rd_kafka_t *rk = rkb->rkb_rk; - rd_kafka_assignor_t *rkas; - int i; - int16_t ApiVersion = 0; - int features; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_JoinGroup, 0, 5, &features); - - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_JoinGroup, 1, - RD_KAFKAP_STR_SIZE(group_id) + 4 /* sessionTimeoutMs */ + - 4 /* rebalanceTimeoutMs */ + RD_KAFKAP_STR_SIZE(member_id) + - RD_KAFKAP_STR_SIZE(group_instance_id) + - RD_KAFKAP_STR_SIZE(protocol_type) + - 4 /* array count GroupProtocols */ + - (rd_list_cnt(topics) * 100)); - rd_kafka_buf_write_kstr(rkbuf, group_id); - rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.group_session_timeout_ms); - if (ApiVersion >= 1) - rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.max_poll_interval_ms); - rd_kafka_buf_write_kstr(rkbuf, member_id); - if (ApiVersion >= 5) - rd_kafka_buf_write_kstr(rkbuf, group_instance_id); - rd_kafka_buf_write_kstr(rkbuf, protocol_type); - rd_kafka_buf_write_i32(rkbuf, rk->rk_conf.enabled_assignor_cnt); - - RD_LIST_FOREACH(rkas, &rk->rk_conf.partition_assignors, i) { - rd_kafkap_bytes_t *member_metadata; - if (!rkas->rkas_enabled) - continue; - rd_kafka_buf_write_kstr(rkbuf, rkas->rkas_protocol_name); - member_metadata = rkas->rkas_get_metadata_cb( - rkas, rk->rk_cgrp->rkcg_assignor_state, topics, - rk->rk_cgrp->rkcg_group_assignment); - rd_kafka_buf_write_kbytes(rkbuf, member_metadata); - rd_kafkap_bytes_destroy(member_metadata); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - if (ApiVersion < 1 && - rk->rk_conf.max_poll_interval_ms > - rk->rk_conf.group_session_timeout_ms && - rd_interval(&rkb->rkb_suppress.unsupported_kip62, - /* at most once per day */ - (rd_ts_t)86400 * 1000 * 1000, 0) > 0) - rd_rkb_log(rkb, LOG_NOTICE, "MAXPOLL", - "Broker does not support KIP-62 " - "(requires Apache Kafka >= v0.10.1.0): " - "consumer configuration " - "`max.poll.interval.ms` (%d) " - "is effectively limited " - "by `session.timeout.ms` (%d) " - "with this broker version", - rk->rk_conf.max_poll_interval_ms, - rk->rk_conf.group_session_timeout_ms); - - - if (ApiVersion < 5 && rk->rk_conf.group_instance_id && - rd_interval(&rkb->rkb_suppress.unsupported_kip345, - /* at most once per day */ - (rd_ts_t)86400 * 1000 * 1000, 0) > 0) - rd_rkb_log(rkb, LOG_NOTICE, "STATICMEMBER", - "Broker does not support KIP-345 " - "(requires Apache Kafka >= v2.3.0): " - "consumer configuration " - "`group.instance.id` (%s) " - "will not take effect", - rk->rk_conf.group_instance_id); - - /* Absolute timeout */ - rd_kafka_buf_set_abs_timeout_force( - rkbuf, - /* Request timeout is max.poll.interval.ms + grace - * if the broker supports it, else - * session.timeout.ms + grace. */ - (ApiVersion >= 1 ? rk->rk_conf.max_poll_interval_ms - : rk->rk_conf.group_session_timeout_ms) + - 3000 /* 3s grace period*/, - 0); - - /* This is a blocking request */ - rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_BLOCKING; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - - -/** - * Send LeaveGroupRequest - */ -void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb, - const char *group_id, - const char *member_id, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features); - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300); - - rd_kafka_buf_write_str(rkbuf, group_id, -1); - rd_kafka_buf_write_str(rkbuf, member_id, -1); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - /* LeaveGroupRequests are best-effort, the local consumer - * does not care if it succeeds or not, so the request timeout - * is shortened. - * Retries are not needed. */ - rd_kafka_buf_set_abs_timeout(rkbuf, 5000, 0); - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - -/** - * Handler for LeaveGroup responses - * opaque must be the cgrp handle. - */ -void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_cgrp_t *rkcg = opaque; - const int log_decode_errors = LOG_ERR; - int16_t ErrorCode = 0; - int actions; - - if (err) { - ErrorCode = err; - goto err; - } - - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - -err: - actions = rd_kafka_err_action(rkb, ErrorCode, request, - RD_KAFKA_ERR_ACTION_END); - - if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Re-query for coordinator */ - rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, - RD_KAFKA_OP_COORD_QUERY, ErrorCode); - } - - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - if (rd_kafka_buf_retry(rkb, request)) - return; - /* FALLTHRU */ - } - - if (ErrorCode) - rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", - "LeaveGroup response: %s", - rd_kafka_err2str(ErrorCode)); - - return; - -err_parse: - ErrorCode = rkbuf->rkbuf_err; - goto err; -} - - - -/** - * Send HeartbeatRequest - */ -void rd_kafka_HeartbeatRequest(rd_kafka_broker_t *rkb, - const rd_kafkap_str_t *group_id, - int32_t generation_id, - const rd_kafkap_str_t *member_id, - const rd_kafkap_str_t *group_instance_id, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_Heartbeat, 0, 3, &features); - - rd_rkb_dbg(rkb, CGRP, "HEARTBEAT", - "Heartbeat for group \"%s\" generation id %" PRId32, - group_id->str, generation_id); - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_Heartbeat, 1, - RD_KAFKAP_STR_SIZE(group_id) + - 4 /* GenerationId */ + - RD_KAFKAP_STR_SIZE(member_id)); - - rd_kafka_buf_write_kstr(rkbuf, group_id); - rd_kafka_buf_write_i32(rkbuf, generation_id); - rd_kafka_buf_write_kstr(rkbuf, member_id); - if (ApiVersion >= 3) - rd_kafka_buf_write_kstr(rkbuf, group_instance_id); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_buf_set_abs_timeout( - rkbuf, rkb->rkb_rk->rk_conf.group_session_timeout_ms, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); -} - - - -/** - * @brief Construct and send ListGroupsRequest to \p rkb - * with the states (const char *) in \p states. - * Uses \p max_ApiVersion as maximum API version, - * pass -1 to use the maximum available version. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @return NULL on success, a new error instance that must be - * released with rd_kafka_error_destroy() in case of error. - */ -rd_kafka_error_t *rd_kafka_ListGroupsRequest(rd_kafka_broker_t *rkb, - int16_t max_ApiVersion, - const char **states, - size_t states_cnt, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - size_t i; - - if (max_ApiVersion < 0) - max_ApiVersion = 4; - - if (max_ApiVersion > ApiVersion) { - /* Remark: don't check if max_ApiVersion is zero. - * As rd_kafka_broker_ApiVersion_supported cannot be checked - * in the application thread reliably . */ - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_ListGroups, 0, max_ApiVersion, NULL); - } - - if (ApiVersion == -1) { - return rd_kafka_error_new( - RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, - "ListGroupsRequest not supported by broker"); - } - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_ListGroups, 1, - /* rd_kafka_buf_write_arraycnt_pos + tags + StatesFilter */ - 4 + 1 + 32 * states_cnt, ApiVersion >= 3 /* is_flexver */); - - if (ApiVersion >= 4) { - size_t of_GroupsArrayCnt = - rd_kafka_buf_write_arraycnt_pos(rkbuf); - for (i = 0; i < states_cnt; i++) { - rd_kafka_buf_write_str(rkbuf, states[i], -1); - } - rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, i); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - return NULL; -} - -/** - * @brief Construct and send DescribeGroupsRequest to \p rkb - * with the groups (const char *) in \p groups. - * Uses \p max_ApiVersion as maximum API version, - * pass -1 to use the maximum available version. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @return NULL on success, a new error instance that must be - * released with rd_kafka_error_destroy() in case of error. - */ -rd_kafka_error_t *rd_kafka_DescribeGroupsRequest(rd_kafka_broker_t *rkb, - int16_t max_ApiVersion, - char **groups, - size_t group_cnt, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - size_t of_GroupsArrayCnt; - - if (max_ApiVersion < 0) - max_ApiVersion = 4; - - if (max_ApiVersion > ApiVersion) { - /* Remark: don't check if max_ApiVersion is zero. - * As rd_kafka_broker_ApiVersion_supported cannot be checked - * in the application thread reliably . */ - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DescribeGroups, 0, max_ApiVersion, NULL); - } - - if (ApiVersion == -1) { - return rd_kafka_error_new( - RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, - "DescribeGroupsRequest not supported by broker"); - } - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_DescribeGroups, 1, - 4 /* rd_kafka_buf_write_arraycnt_pos */ + - 1 /* IncludeAuthorizedOperations */ + 1 /* tags */ + - 32 * group_cnt /* Groups */, - rd_false); - - /* write Groups */ - of_GroupsArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); - rd_kafka_buf_finalize_arraycnt(rkbuf, of_GroupsArrayCnt, group_cnt); - while (group_cnt-- > 0) - rd_kafka_buf_write_str(rkbuf, groups[group_cnt], -1); - - /* write IncludeAuthorizedOperations */ - if (ApiVersion >= 3) { - /* TODO: implement KIP-430 */ - rd_kafka_buf_write_bool(rkbuf, rd_false); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - return NULL; -} - -/** - * @brief Generic handler for Metadata responses - * - * @locality rdkafka main thread - */ -static void rd_kafka_handle_Metadata(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_op_t *rko = opaque; /* Possibly NULL */ - struct rd_kafka_metadata *md = NULL; - const rd_list_t *topics = request->rkbuf_u.Metadata.topics; - int actions; - - rd_kafka_assert(NULL, err == RD_KAFKA_RESP_ERR__DESTROY || - thrd_is_current(rk->rk_thread)); - - /* Avoid metadata updates when we're terminating. */ - if (rd_kafka_terminating(rkb->rkb_rk) || - err == RD_KAFKA_RESP_ERR__DESTROY) { - /* Terminating */ - goto done; - } - - if (err) - goto err; - - if (!topics) - rd_rkb_dbg(rkb, METADATA, "METADATA", - "===== Received metadata: %s =====", - request->rkbuf_u.Metadata.reason); - else - rd_rkb_dbg(rkb, METADATA, "METADATA", - "===== Received metadata " - "(for %d requested topics): %s =====", - rd_list_cnt(topics), - request->rkbuf_u.Metadata.reason); - - err = rd_kafka_parse_Metadata(rkb, request, rkbuf, &md); - if (err) - goto err; - - if (rko && rko->rko_replyq.q) { - /* Reply to metadata requester, passing on the metadata. - * Reuse requesting rko for the reply. */ - rko->rko_err = err; - rko->rko_u.metadata.md = md; - - rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); - rko = NULL; - } else { - if (md) - rd_free(md); - } - - goto done; - -err: - actions = rd_kafka_err_action(rkb, err, request, - - RD_KAFKA_ERR_ACTION_RETRY, - RD_KAFKA_RESP_ERR__PARTIAL, - - RD_KAFKA_ERR_ACTION_END); - - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - if (rd_kafka_buf_retry(rkb, request)) - return; - /* FALLTHRU */ - } else { - rd_rkb_log(rkb, LOG_WARNING, "METADATA", - "Metadata request failed: %s: %s (%dms): %s", - request->rkbuf_u.Metadata.reason, - rd_kafka_err2str(err), - (int)(request->rkbuf_ts_sent / 1000), - rd_kafka_actions2str(actions)); - /* Respond back to caller on non-retriable errors */ - if (rko && rko->rko_replyq.q) { - rko->rko_err = err; - rko->rko_u.metadata.md = NULL; - rd_kafka_replyq_enq(&rko->rko_replyq, rko, 0); - rko = NULL; - } - } - - - - /* FALLTHRU */ - -done: - if (rko) - rd_kafka_op_destroy(rko); -} - - -/** - * @brief Construct MetadataRequest (does not send) - * - * \p topics is a list of topic names (char *) to request. - * - * !topics - only request brokers (if supported by broker, else - * all topics) - * topics.cnt==0 - all topics in cluster are requested - * topics.cnt >0 - only specified topics are requested - * - * @param reason - metadata request reason - * @param allow_auto_create_topics - allow broker-side auto topic creation. - * This is best-effort, depending on broker - * config and version. - * @param cgrp_update - Update cgrp in parse_Metadata (see comment there). - * @param rko - (optional) rko with replyq for handling response. - * Specifying an rko forces a metadata request even if - * there is already a matching one in-transit. - * - * If full metadata for all topics is requested (or all brokers, which - * results in all-topics on older brokers) and there is already a full request - * in transit then this function will return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS - * otherwise RD_KAFKA_RESP_ERR_NO_ERROR. If \p rko is non-NULL the request - * is sent regardless. - */ -rd_kafka_resp_err_t rd_kafka_MetadataRequest(rd_kafka_broker_t *rkb, - const rd_list_t *topics, - const char *reason, - rd_bool_t allow_auto_create_topics, - rd_bool_t cgrp_update, - rd_kafka_op_t *rko) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - size_t of_TopicArrayCnt; - int features; - int topic_cnt = topics ? rd_list_cnt(topics) : 0; - int *full_incr = NULL; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_Metadata, 0, 9, &features); - - rkbuf = rd_kafka_buf_new_flexver_request(rkb, RD_KAFKAP_Metadata, 1, - 4 + (50 * topic_cnt) + 1, - ApiVersion >= 9); - - if (!reason) - reason = ""; - - rkbuf->rkbuf_u.Metadata.reason = rd_strdup(reason); - rkbuf->rkbuf_u.Metadata.cgrp_update = cgrp_update; - - /* TopicArrayCnt */ - of_TopicArrayCnt = rd_kafka_buf_write_arraycnt_pos(rkbuf); - - if (!topics) { - /* v0: keep 0, brokers only not available, - * request all topics */ - /* v1-8: 0 means empty array, brokers only */ - if (ApiVersion >= 9) { - /* v9+: varint encoded empty array (1), brokers only */ - rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, - topic_cnt); - } - - rd_rkb_dbg(rkb, METADATA, "METADATA", - "Request metadata for brokers only: %s", reason); - full_incr = - &rkb->rkb_rk->rk_metadata_cache.rkmc_full_brokers_sent; - - } else if (topic_cnt == 0) { - /* v0: keep 0, request all topics */ - if (ApiVersion >= 1 && ApiVersion < 9) { - /* v1-8: update to -1, all topics */ - rd_kafka_buf_update_i32(rkbuf, of_TopicArrayCnt, -1); - } - /* v9+: keep 0, varint encoded null, all topics */ - - rkbuf->rkbuf_u.Metadata.all_topics = 1; - rd_rkb_dbg(rkb, METADATA, "METADATA", - "Request metadata for all topics: " - "%s", - reason); - - if (!rko) - full_incr = &rkb->rkb_rk->rk_metadata_cache - .rkmc_full_topics_sent; - - } else { - /* request cnt topics */ - rd_kafka_buf_finalize_arraycnt(rkbuf, of_TopicArrayCnt, - topic_cnt); - - rd_rkb_dbg(rkb, METADATA, "METADATA", - "Request metadata for %d topic(s): " - "%s", - topic_cnt, reason); - } - - if (full_incr) { - /* Avoid multiple outstanding full requests - * (since they are redundant and side-effect-less). - * Forced requests (app using metadata() API) are passed - * through regardless. */ - - mtx_lock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock); - if (*full_incr > 0 && (!rko || !rko->rko_u.metadata.force)) { - mtx_unlock( - &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock); - rd_rkb_dbg(rkb, METADATA, "METADATA", - "Skipping metadata request: %s: " - "full request already in-transit", - reason); - rd_kafka_buf_destroy(rkbuf); - return RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS; - } - - (*full_incr)++; - mtx_unlock(&rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock); - rkbuf->rkbuf_u.Metadata.decr = full_incr; - rkbuf->rkbuf_u.Metadata.decr_lock = - &rkb->rkb_rk->rk_metadata_cache.rkmc_full_lock; - } - - - if (topic_cnt > 0) { - char *topic; - int i; - - /* Maintain a copy of the topics list so we can purge - * hints from the metadata cache on error. */ - rkbuf->rkbuf_u.Metadata.topics = - rd_list_copy(topics, rd_list_string_copy, NULL); - - RD_LIST_FOREACH(topic, topics, i) { - rd_kafka_buf_write_str(rkbuf, topic, -1); - /* Tags for previous topic */ - rd_kafka_buf_write_tags(rkbuf); - } - } - - if (ApiVersion >= 4) { - /* AllowAutoTopicCreation */ - rd_kafka_buf_write_bool(rkbuf, allow_auto_create_topics); - - } else if (rkb->rkb_rk->rk_type == RD_KAFKA_CONSUMER && - !rkb->rkb_rk->rk_conf.allow_auto_create_topics && - rd_kafka_conf_is_modified(&rkb->rkb_rk->rk_conf, - "allow.auto.create.topics") && - rd_interval( - &rkb->rkb_rk->rk_suppress.allow_auto_create_topics, - 30 * 60 * 1000 /* every 30 minutes */, 0) >= 0) { - /* Let user know we can't obey allow.auto.create.topics */ - rd_rkb_log(rkb, LOG_WARNING, "AUTOCREATE", - "allow.auto.create.topics=false not supported " - "by broker: requires broker version >= 0.11.0.0: " - "requested topic(s) may be auto created depending " - "on broker auto.create.topics.enable configuration"); - } - - if (ApiVersion >= 8 && ApiVersion < 10) { - /* TODO: implement KIP-430 */ - /* IncludeClusterAuthorizedOperations */ - rd_kafka_buf_write_bool(rkbuf, rd_false); - } - - if (ApiVersion >= 8) { - /* TODO: implement KIP-430 */ - /* IncludeTopicAuthorizedOperations */ - rd_kafka_buf_write_bool(rkbuf, rd_false); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - /* Metadata requests are part of the important control plane - * and should go before most other requests (Produce, Fetch, etc). */ - rkbuf->rkbuf_prio = RD_KAFKA_PRIO_HIGH; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, - /* Handle response thru rk_ops, - * but forward parsed result to - * rko's replyq when done. */ - RD_KAFKA_REPLYQ(rkb->rkb_rk->rk_ops, 0), - rd_kafka_handle_Metadata, rko); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @brief Parses and handles ApiVersion reply. - * - * @param apis will be allocated, populated and sorted - * with broker's supported APIs, or set to NULL. - * @param api_cnt will be set to the number of elements in \p *apis - * - * @returns 0 on success, else an error. - * - * @remark A valid \p apis might be returned even if an error is returned. - */ -rd_kafka_resp_err_t -rd_kafka_handle_ApiVersion(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - struct rd_kafka_ApiVersion **apis, - size_t *api_cnt) { - const int log_decode_errors = LOG_DEBUG; - int32_t ApiArrayCnt; - int16_t ErrorCode; - int i = 0; - - *apis = NULL; - *api_cnt = 0; - - if (err) - goto err; - - rd_kafka_buf_read_i16(rkbuf, &ErrorCode); - err = ErrorCode; - - rd_kafka_buf_read_arraycnt(rkbuf, &ApiArrayCnt, 1000); - if (err && ApiArrayCnt < 1) { - /* Version >=3 returns the ApiVersions array if the error - * code is ERR_UNSUPPORTED_VERSION, previous versions don't */ - goto err; - } - - rd_rkb_dbg(rkb, FEATURE, "APIVERSION", "Broker API support:"); - - *apis = rd_malloc(sizeof(**apis) * ApiArrayCnt); - - for (i = 0; i < ApiArrayCnt; i++) { - struct rd_kafka_ApiVersion *api = &(*apis)[i]; - - rd_kafka_buf_read_i16(rkbuf, &api->ApiKey); - rd_kafka_buf_read_i16(rkbuf, &api->MinVer); - rd_kafka_buf_read_i16(rkbuf, &api->MaxVer); - - rd_rkb_dbg(rkb, FEATURE, "APIVERSION", - " ApiKey %s (%hd) Versions %hd..%hd", - rd_kafka_ApiKey2str(api->ApiKey), api->ApiKey, - api->MinVer, api->MaxVer); - - /* Discard struct tags */ - rd_kafka_buf_skip_tags(rkbuf); - } - - if (request->rkbuf_reqhdr.ApiVersion >= 1) - rd_kafka_buf_read_throttle_time(rkbuf); - - /* Discard end tags */ - rd_kafka_buf_skip_tags(rkbuf); - - *api_cnt = ApiArrayCnt; - qsort(*apis, *api_cnt, sizeof(**apis), rd_kafka_ApiVersion_key_cmp); - - goto done; - -err_parse: - /* If the broker does not support our ApiVersionRequest version it - * will respond with a version 0 response, which will most likely - * fail parsing. Instead of propagating the parse error we - * propagate the original error, unless there isn't one in which case - * we use the parse error. */ - if (!err) - err = rkbuf->rkbuf_err; -err: - /* There are no retryable errors. */ - - if (*apis) - rd_free(*apis); - - *apis = NULL; - *api_cnt = 0; - -done: - return err; -} - - - -/** - * @brief Send ApiVersionRequest (KIP-35) - * - * @param ApiVersion If -1 use the highest supported version, else use the - * specified value. - */ -void rd_kafka_ApiVersionRequest(rd_kafka_broker_t *rkb, - int16_t ApiVersion, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - - if (ApiVersion == -1) - ApiVersion = 3; - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_ApiVersion, 1, 3, ApiVersion >= 3 /*flexver*/); - - if (ApiVersion >= 3) { - /* KIP-511 adds software name and version through the optional - * protocol fields defined in KIP-482. */ - - /* ClientSoftwareName */ - rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_name, -1); - - /* ClientSoftwareVersion */ - rd_kafka_buf_write_str(rkbuf, rkb->rkb_rk->rk_conf.sw_version, - -1); - } - - /* Should be sent before any other requests since it is part of - * the initial connection handshake. */ - rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; - - /* Non-supporting brokers will tear down the connection when they - * receive an unknown API request, so dont retry request on failure. */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - /* 0.9.0.x brokers will not close the connection on unsupported - * API requests, so we minimize the timeout for the request. - * This is a regression on the broker part. */ - rd_kafka_buf_set_abs_timeout( - rkbuf, rkb->rkb_rk->rk_conf.api_version_request_timeout_ms, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - if (replyq.q) - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, - opaque); - else /* in broker thread */ - rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); -} - - -/** - * Send SaslHandshakeRequest (KIP-43) - */ -void rd_kafka_SaslHandshakeRequest(rd_kafka_broker_t *rkb, - const char *mechanism, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int mechlen = (int)strlen(mechanism); - int16_t ApiVersion; - int features; - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslHandshake, 1, - RD_KAFKAP_STR_SIZE0(mechlen)); - - /* Should be sent before any other requests since it is part of - * the initial connection handshake. */ - rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; - - rd_kafka_buf_write_str(rkbuf, mechanism, mechlen); - - /* Non-supporting brokers will tear down the conneciton when they - * receive an unknown API request or where the SASL GSSAPI - * token type is not recognized, so dont retry request on failure. */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - /* 0.9.0.x brokers will not close the connection on unsupported - * API requests, so we minimize the timeout of the request. - * This is a regression on the broker part. */ - if (!rkb->rkb_rk->rk_conf.api_version_request && - rkb->rkb_rk->rk_conf.socket_timeout_ms > 10 * 1000) - rd_kafka_buf_set_abs_timeout(rkbuf, 10 * 1000 /*10s*/, 0); - - /* ApiVersion 1 / RD_KAFKA_FEATURE_SASL_REQ enables - * the SaslAuthenticateRequest */ - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_SaslHandshake, 0, 1, &features); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - if (replyq.q) - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, - opaque); - else /* in broker thread */ - rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); -} - - -/** - * @brief Parses and handles an SaslAuthenticate reply. - * - * @returns 0 on success, else an error. - * - * @locality broker thread - * @locks none - */ -void rd_kafka_handle_SaslAuthenticate(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - const int log_decode_errors = LOG_ERR; - int16_t error_code; - rd_kafkap_str_t error_str; - rd_kafkap_bytes_t auth_data; - char errstr[512]; - - if (err) { - rd_snprintf(errstr, sizeof(errstr), - "SaslAuthenticateRequest failed: %s", - rd_kafka_err2str(err)); - goto err; - } - - rd_kafka_buf_read_i16(rkbuf, &error_code); - rd_kafka_buf_read_str(rkbuf, &error_str); - - if (error_code) { - /* Authentication failed */ - - /* For backwards compatibility translate the - * new broker-side auth error code to our local error code. */ - if (error_code == RD_KAFKA_RESP_ERR_SASL_AUTHENTICATION_FAILED) - err = RD_KAFKA_RESP_ERR__AUTHENTICATION; - else - err = error_code; - - rd_snprintf(errstr, sizeof(errstr), "%.*s", - RD_KAFKAP_STR_PR(&error_str)); - goto err; - } - - rd_kafka_buf_read_bytes(rkbuf, &auth_data); - - /* Pass SASL auth frame to SASL handler */ - if (rd_kafka_sasl_recv(rkb->rkb_transport, auth_data.data, - (size_t)RD_KAFKAP_BYTES_LEN(&auth_data), errstr, - sizeof(errstr)) == -1) { - err = RD_KAFKA_RESP_ERR__AUTHENTICATION; - goto err; - } - - return; - - -err_parse: - err = rkbuf->rkbuf_err; - rd_snprintf(errstr, sizeof(errstr), - "SaslAuthenticateResponse parsing failed: %s", - rd_kafka_err2str(err)); - -err: - rd_kafka_broker_fail(rkb, LOG_ERR, err, "SASL authentication error: %s", - errstr); -} - - -/** - * @brief Send SaslAuthenticateRequest (KIP-152) - */ -void rd_kafka_SaslAuthenticateRequest(rd_kafka_broker_t *rkb, - const void *buf, - size_t size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_SaslAuthenticate, 0, 0); - - /* Should be sent before any other requests since it is part of - * the initial connection handshake. */ - rkbuf->rkbuf_prio = RD_KAFKA_PRIO_FLASH; - - /* Broker does not support -1 (Null) for this field */ - rd_kafka_buf_write_bytes(rkbuf, buf ? buf : "", size); - - /* There are no errors that can be retried, instead - * close down the connection and reconnect on failure. */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - if (replyq.q) - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, - opaque); - else /* in broker thread */ - rd_kafka_broker_buf_enq1(rkb, rkbuf, resp_cb, opaque); -} - - - -/** - * @struct Hold temporary result and return values from ProduceResponse - */ -struct rd_kafka_Produce_result { - int64_t offset; /**< Assigned offset of first message */ - int64_t timestamp; /**< (Possibly assigned) offset of first message */ -}; - -/** - * @brief Parses a Produce reply. - * @returns 0 on success or an error code on failure. - * @locality broker thread - */ -static rd_kafka_resp_err_t -rd_kafka_handle_Produce_parse(rd_kafka_broker_t *rkb, - rd_kafka_toppar_t *rktp, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - struct rd_kafka_Produce_result *result) { - int32_t TopicArrayCnt; - int32_t PartitionArrayCnt; - struct { - int32_t Partition; - int16_t ErrorCode; - int64_t Offset; - } hdr; - const int log_decode_errors = LOG_ERR; - int64_t log_start_offset = -1; - - rd_kafka_buf_read_i32(rkbuf, &TopicArrayCnt); - if (TopicArrayCnt != 1) - goto err; - - /* Since we only produce to one single topic+partition in each - * request we assume that the reply only contains one topic+partition - * and that it is the same that we requested. - * If not the broker is buggy. */ - rd_kafka_buf_skip_str(rkbuf); - rd_kafka_buf_read_i32(rkbuf, &PartitionArrayCnt); - - if (PartitionArrayCnt != 1) - goto err; - - rd_kafka_buf_read_i32(rkbuf, &hdr.Partition); - rd_kafka_buf_read_i16(rkbuf, &hdr.ErrorCode); - rd_kafka_buf_read_i64(rkbuf, &hdr.Offset); - - result->offset = hdr.Offset; - - result->timestamp = -1; - if (request->rkbuf_reqhdr.ApiVersion >= 2) - rd_kafka_buf_read_i64(rkbuf, &result->timestamp); - - if (request->rkbuf_reqhdr.ApiVersion >= 5) - rd_kafka_buf_read_i64(rkbuf, &log_start_offset); - - if (request->rkbuf_reqhdr.ApiVersion >= 1) { - int32_t Throttle_Time; - rd_kafka_buf_read_i32(rkbuf, &Throttle_Time); - - rd_kafka_op_throttle_time(rkb, rkb->rkb_rk->rk_rep, - Throttle_Time); - } - - - return hdr.ErrorCode; - -err_parse: - return rkbuf->rkbuf_err; -err: - return RD_KAFKA_RESP_ERR__BAD_MSG; -} - - -/** - * @struct Hold temporary Produce error state - */ -struct rd_kafka_Produce_err { - rd_kafka_resp_err_t err; /**< Error code */ - int actions; /**< Actions to take */ - int incr_retry; /**< Increase per-message retry cnt */ - rd_kafka_msg_status_t status; /**< Messages persistence status */ - - /* Idempotent Producer */ - int32_t next_ack_seq; /**< Next expected sequence to ack */ - int32_t next_err_seq; /**< Next expected error sequence */ - rd_bool_t update_next_ack; /**< Update next_ack_seq */ - rd_bool_t update_next_err; /**< Update next_err_seq */ - rd_kafka_pid_t rktp_pid; /**< Partition's current PID */ - int32_t last_seq; /**< Last sequence in current batch */ -}; - - -/** - * @brief Error-handling for Idempotent Producer-specific Produce errors. - * - * May update \p errp, \p actionsp and \p incr_retryp. - * - * The resulting \p actionsp are handled by the caller. - * - * @warning May be called on the old leader thread. Lock rktp appropriately! - * - * @locality broker thread (but not necessarily the leader broker) - * @locks none - */ -static void -rd_kafka_handle_idempotent_Produce_error(rd_kafka_broker_t *rkb, - rd_kafka_msgbatch_t *batch, - struct rd_kafka_Produce_err *perr) { - rd_kafka_t *rk = rkb->rkb_rk; - rd_kafka_toppar_t *rktp = batch->rktp; - rd_kafka_msg_t *firstmsg, *lastmsg; - int r; - rd_ts_t now = rd_clock(), state_age; - struct rd_kafka_toppar_err last_err; - - rd_kafka_rdlock(rkb->rkb_rk); - state_age = now - rkb->rkb_rk->rk_eos.ts_idemp_state; - rd_kafka_rdunlock(rkb->rkb_rk); - - firstmsg = rd_kafka_msgq_first(&batch->msgq); - lastmsg = rd_kafka_msgq_last(&batch->msgq); - rd_assert(firstmsg && lastmsg); - - /* Store the last msgid of the batch - * on the first message in case we need to retry - * and thus reconstruct the entire batch. */ - if (firstmsg->rkm_u.producer.last_msgid) { - /* last_msgid already set, make sure it - * actually points to the last message. */ - rd_assert(firstmsg->rkm_u.producer.last_msgid == - lastmsg->rkm_u.producer.msgid); - } else { - firstmsg->rkm_u.producer.last_msgid = - lastmsg->rkm_u.producer.msgid; - } - - if (!rd_kafka_pid_eq(batch->pid, perr->rktp_pid)) { - /* Don't retry if PID changed since we can't - * guarantee correctness across PID sessions. */ - perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; - perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - - rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "ERRPID", - "%.*s [%" PRId32 - "] PID mismatch: " - "request %s != partition %s: " - "failing messages with error %s", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, rd_kafka_pid2str(batch->pid), - rd_kafka_pid2str(perr->rktp_pid), - rd_kafka_err2str(perr->err)); - return; - } - - /* - * Special error handling - */ - switch (perr->err) { - case RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER: - /* Compare request's sequence to expected next - * acked sequence. - * - * Example requests in flight: - * R1(base_seq:5) R2(10) R3(15) R4(20) - */ - - /* Acquire the last partition error to help - * troubleshoot this problem. */ - rd_kafka_toppar_lock(rktp); - last_err = rktp->rktp_last_err; - rd_kafka_toppar_unlock(rktp); - - r = batch->first_seq - perr->next_ack_seq; - - if (r == 0) { - /* R1 failed: - * If this was the head-of-line request in-flight it - * means there is a state desynchronization between the - * producer and broker (a bug), in which case - * we'll raise a fatal error since we can no longer - * reason about the state of messages and thus - * not guarantee ordering or once-ness for R1, - * nor give the user a chance to opt out of sending - * R2 to R4 which would be retried automatically. */ - - rd_kafka_idemp_set_fatal_error( - rk, perr->err, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to sequence desynchronization with " - "broker %" PRId32 " (%s, base seq %" PRId32 - ", " - "idemp state change %" PRId64 - "ms ago, " - "last partition error %s (actions %s, " - "base seq %" PRId32 "..%" PRId32 - ", base msgid %" PRIu64 ", %" PRId64 "ms ago)", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq, - state_age / 1000, rd_kafka_err2name(last_err.err), - rd_kafka_actions2str(last_err.actions), - last_err.base_seq, last_err.last_seq, - last_err.base_msgid, - last_err.ts ? (now - last_err.ts) / 1000 : -1); - - perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; - perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - - } else if (r > 0) { - /* R2 failed: - * With max.in.flight > 1 we can have a situation - * where the first request in-flight (R1) to the broker - * fails, which causes the sub-sequent requests - * that are in-flight to have a non-sequential - * sequence number and thus fail. - * But these sub-sequent requests (R2 to R4) are not at - * the risk of being duplicated so we bump the epoch and - * re-enqueue the messages for later retry - * (without incrementing retries). - */ - rd_rkb_dbg( - rkb, MSG | RD_KAFKA_DBG_EOS, "ERRSEQ", - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to skipped sequence numbers " - "(%s, base seq %" PRId32 - " > " - "next seq %" PRId32 - ") " - "caused by previous failed request " - "(%s, actions %s, " - "base seq %" PRId32 "..%" PRId32 - ", base msgid %" PRIu64 ", %" PRId64 - "ms ago): " - "recovering and retrying", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_pid2str(batch->pid), batch->first_seq, - perr->next_ack_seq, rd_kafka_err2name(last_err.err), - rd_kafka_actions2str(last_err.actions), - last_err.base_seq, last_err.last_seq, - last_err.base_msgid, - last_err.ts ? (now - last_err.ts) / 1000 : -1); - - perr->incr_retry = 0; - perr->actions = RD_KAFKA_ERR_ACTION_RETRY; - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - - rd_kafka_idemp_drain_epoch_bump( - rk, perr->err, "skipped sequence numbers"); - - } else { - /* Request's sequence is less than next ack, - * this should never happen unless we have - * local bug or the broker did not respond - * to the requests in order. */ - rd_kafka_idemp_set_fatal_error( - rk, perr->err, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "with rewound sequence number on " - "broker %" PRId32 - " (%s, " - "base seq %" PRId32 " < next seq %" PRId32 - "): " - "last error %s (actions %s, " - "base seq %" PRId32 "..%" PRId32 - ", base msgid %" PRIu64 ", %" PRId64 "ms ago)", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq, - perr->next_ack_seq, rd_kafka_err2name(last_err.err), - rd_kafka_actions2str(last_err.actions), - last_err.base_seq, last_err.last_seq, - last_err.base_msgid, - last_err.ts ? (now - last_err.ts) / 1000 : -1); - - perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; - perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_false; - } - break; - - case RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER: - /* This error indicates that we successfully produced - * this set of messages before but this (supposed) retry failed. - * - * Treat as success, however offset and timestamp - * will be invalid. */ - - /* Future improvement/FIXME: - * But first make sure the first message has actually - * been retried, getting this error for a non-retried message - * indicates a synchronization issue or bug. */ - rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "DUPSEQ", - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to duplicate sequence number: " - "previous send succeeded but was not acknowledged " - "(%s, base seq %" PRId32 - "): " - "marking the messages successfully delivered", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_pid2str(batch->pid), batch->first_seq); - - /* Void error, delivery succeeded */ - perr->err = RD_KAFKA_RESP_ERR_NO_ERROR; - perr->actions = 0; - perr->status = RD_KAFKA_MSG_STATUS_PERSISTED; - perr->update_next_ack = rd_true; - perr->update_next_err = rd_true; - break; - - case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: - /* The broker/cluster lost track of our PID because - * the last message we produced has now been deleted - * (by DeleteRecords, compaction, or topic retention policy). - * - * If all previous messages are accounted for and this is not - * a retry we can simply bump the epoch and reset the sequence - * number and then retry the message(s) again. - * - * If there are outstanding messages not yet acknowledged - * then there is no safe way to carry on without risking - * duplication or reordering, in which case we fail - * the producer. - * - * In case of the transactional producer and a transaction - * coordinator that supports KIP-360 (>= AK 2.5, checked from - * the txnmgr, not here) we'll raise an abortable error and - * flag that the epoch needs to be bumped on the coordinator. */ - if (rd_kafka_is_transactional(rk)) { - rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID", - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to unknown producer id " - "(%s, base seq %" PRId32 - ", %d retries): " - "failing the current transaction", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_pid2str(batch->pid), - batch->first_seq, - firstmsg->rkm_u.producer.retries); - - /* Drain outstanding requests and bump epoch. */ - rd_kafka_idemp_drain_epoch_bump(rk, perr->err, - "unknown producer id"); - - rd_kafka_txn_set_abortable_error_with_bump( - rk, RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to unknown producer id", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq)); - - perr->incr_retry = 0; - perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - break; - - } else if (!firstmsg->rkm_u.producer.retries && - perr->next_err_seq == batch->first_seq) { - rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_EOS, "UNKPID", - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to unknown producer id " - "(%s, base seq %" PRId32 - ", %d retries): " - "no risk of duplication/reordering: " - "resetting PID and retrying", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_pid2str(batch->pid), - batch->first_seq, - firstmsg->rkm_u.producer.retries); - - /* Drain outstanding requests and bump epoch. */ - rd_kafka_idemp_drain_epoch_bump(rk, perr->err, - "unknown producer id"); - - perr->incr_retry = 0; - perr->actions = RD_KAFKA_ERR_ACTION_RETRY; - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - break; - } - - rd_kafka_idemp_set_fatal_error( - rk, perr->err, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed " - "due to unknown producer id (" - "broker %" PRId32 " %s, base seq %" PRId32 - ", %d retries): " - "unable to retry without risking " - "duplication/reordering", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, rd_kafka_msgq_len(&batch->msgq), - rkb->rkb_nodeid, rd_kafka_pid2str(batch->pid), - batch->first_seq, firstmsg->rkm_u.producer.retries); - - perr->actions = RD_KAFKA_ERR_ACTION_PERMANENT; - perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - break; - - default: - /* All other errors are handled in the standard - * error Produce handler, which will set - * update_next_ack|err accordingly. */ - break; - } -} - - - -/** - * @brief Error-handling for failed ProduceRequests - * - * @param errp Is the input and output error, it may be changed - * by this function. - * - * @returns 0 if no further processing of the request should be performed, - * such as triggering delivery reports, else 1. - * - * @warning May be called on the old leader thread. Lock rktp appropriately! - * - * @warning \p request may be NULL. - * - * @locality broker thread (but not necessarily the leader broker) - * @locks none - */ -static int rd_kafka_handle_Produce_error(rd_kafka_broker_t *rkb, - const rd_kafka_buf_t *request, - rd_kafka_msgbatch_t *batch, - struct rd_kafka_Produce_err *perr) { - rd_kafka_t *rk = rkb->rkb_rk; - rd_kafka_toppar_t *rktp = batch->rktp; - int is_leader; - - if (unlikely(perr->err == RD_KAFKA_RESP_ERR__DESTROY)) - return 0; /* Terminating */ - - /* When there is a partition leader change any outstanding - * requests to the old broker will be handled by the old - * broker thread when the responses are received/timeout: - * in this case we need to be careful with locking: - * check once if we're the leader (which allows relaxed - * locking), and cache the current rktp's eos state vars. */ - rd_kafka_toppar_lock(rktp); - is_leader = rktp->rktp_broker == rkb; - perr->rktp_pid = rktp->rktp_eos.pid; - perr->next_ack_seq = rktp->rktp_eos.next_ack_seq; - perr->next_err_seq = rktp->rktp_eos.next_err_seq; - rd_kafka_toppar_unlock(rktp); - - /* All failures are initially treated as if the message - * was not persisted, but the status may be changed later - * for specific errors and actions. */ - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - - /* Set actions for known errors (may be overriden later), - * all other errors are considered permanent failures. - * (also see rd_kafka_err_action() for the default actions). */ - perr->actions = rd_kafka_err_action( - rkb, perr->err, request, - - RD_KAFKA_ERR_ACTION_REFRESH | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR__TRANSPORT, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART, - - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED, - - RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, - - RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS, - - RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS_AFTER_APPEND, - - RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, - - RD_KAFKA_ERR_ACTION_RETRY | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR__TIMED_OUT, - - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR__MSG_TIMED_OUT, - - /* All Idempotent Producer-specific errors are - * initially set as permanent errors, - * special handling may change the actions. */ - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, - - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED, - RD_KAFKA_RESP_ERR_DUPLICATE_SEQUENCE_NUMBER, - - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID, - - RD_KAFKA_ERR_ACTION_PERMANENT | - RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED, - RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH, - - /* Message was purged from out-queue due to - * Idempotent Producer Id change */ - RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__RETRY, - - RD_KAFKA_ERR_ACTION_END); - - rd_rkb_dbg(rkb, MSG, "MSGSET", - "%s [%" PRId32 - "]: MessageSet with %i message(s) " - "(MsgId %" PRIu64 ", BaseSeq %" PRId32 - ") " - "encountered error: %s (actions %s)%s", - rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), batch->first_msgid, - batch->first_seq, rd_kafka_err2str(perr->err), - rd_kafka_actions2str(perr->actions), - is_leader ? "" : " [NOT LEADER]"); - - - /* - * Special handling for Idempotent Producer - * - * Note: Idempotent Producer-specific errors received - * on a non-idempotent producer will be passed through - * directly to the application. - */ - if (rd_kafka_is_idempotent(rk)) - rd_kafka_handle_idempotent_Produce_error(rkb, batch, perr); - - /* Update message persistence status based on action flags. - * None of these are typically set after an idempotent error, - * which sets the status explicitly. */ - if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_POSSIBLY_PERSISTED) - perr->status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_NOT_PERSISTED) - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - else if (perr->actions & RD_KAFKA_ERR_ACTION_MSG_PERSISTED) - perr->status = RD_KAFKA_MSG_STATUS_PERSISTED; - - /* Save the last error for debugging sub-sequent errors, - * useful for Idempotent Producer throubleshooting. */ - rd_kafka_toppar_lock(rktp); - rktp->rktp_last_err.err = perr->err; - rktp->rktp_last_err.actions = perr->actions; - rktp->rktp_last_err.ts = rd_clock(); - rktp->rktp_last_err.base_seq = batch->first_seq; - rktp->rktp_last_err.last_seq = perr->last_seq; - rktp->rktp_last_err.base_msgid = batch->first_msgid; - rd_kafka_toppar_unlock(rktp); - - /* - * Handle actions - */ - if (perr->actions & - (RD_KAFKA_ERR_ACTION_REFRESH | RD_KAFKA_ERR_ACTION_RETRY)) { - /* Retry (refresh also implies retry) */ - - if (perr->actions & RD_KAFKA_ERR_ACTION_REFRESH) { - /* Request metadata information update. - * These errors imply that we have stale - * information and the request was - * either rejected or not sent - - * we don't need to increment the retry count - * when we perform a retry since: - * - it is a temporary error (hopefully) - * - there is no chance of duplicate delivery - */ - rd_kafka_toppar_leader_unavailable(rktp, "produce", - perr->err); - - /* We can't be certain the request wasn't - * sent in case of transport failure, - * so the ERR__TRANSPORT case will need - * the retry count to be increased, - * In case of certain other errors we want to - * avoid retrying for the duration of the - * message.timeout.ms to speed up error propagation. */ - if (perr->err != RD_KAFKA_RESP_ERR__TRANSPORT && - perr->err != RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR) - perr->incr_retry = 0; - } - - /* If message timed out in queue, not in transit, - * we will retry at a later time but not increment - * the retry count since there is no risk - * of duplicates. */ - if (!rd_kafka_buf_was_sent(request)) - perr->incr_retry = 0; - - if (!perr->incr_retry) { - /* If retries are not to be incremented then - * there is no chance of duplicates on retry, which - * means these messages were not persisted. */ - perr->status = RD_KAFKA_MSG_STATUS_NOT_PERSISTED; - } - - if (rd_kafka_is_idempotent(rk)) { - /* Any currently in-flight requests will - * fail with ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, - * which should not be treated as a fatal error - * since this request and sub-sequent requests - * will be retried and thus return to order. - * Unless the error was a timeout, or similar, - * in which case the request might have made it - * and the messages are considered possibly persisted: - * in this case we allow the next in-flight response - * to be successful, in which case we mark - * this request's messages as succesfully delivered. */ - if (perr->status & - RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED) - perr->update_next_ack = rd_true; - else - perr->update_next_ack = rd_false; - perr->update_next_err = rd_true; - - /* Drain outstanding requests so that retries - * are attempted with proper state knowledge and - * without any in-flight requests. */ - rd_kafka_toppar_lock(rktp); - rd_kafka_idemp_drain_toppar(rktp, - "drain before retrying"); - rd_kafka_toppar_unlock(rktp); - } - - /* Since requests are specific to a broker - * we move the retryable messages from the request - * back to the partition queue (prepend) and then - * let the new broker construct a new request. - * While doing this we also make sure the retry count - * for each message is honoured, any messages that - * would exceeded the retry count will not be - * moved but instead fail below. */ - rd_kafka_toppar_retry_msgq(rktp, &batch->msgq, perr->incr_retry, - perr->status); - - if (rd_kafka_msgq_len(&batch->msgq) == 0) { - /* No need do anything more with the request - * here since the request no longer has any - * messages associated with it. */ - return 0; - } - } - - if (perr->actions & RD_KAFKA_ERR_ACTION_PERMANENT && - rd_kafka_is_idempotent(rk)) { - if (rd_kafka_is_transactional(rk) && - perr->err == RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH) { - /* Producer was fenced by new transactional producer - * with the same transactional.id */ - rd_kafka_txn_set_fatal_error( - rk, RD_DO_LOCK, RD_KAFKA_RESP_ERR__FENCED, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed: %s " - "(broker %" PRId32 " %s, base seq %" PRId32 - "): " - "transactional producer fenced by newer " - "producer instance", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_err2str(perr->err), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq); - - /* Drain outstanding requests and reset PID. */ - rd_kafka_idemp_drain_reset( - rk, "fenced by new transactional producer"); - - } else if (rd_kafka_is_transactional(rk)) { - /* When transactional any permanent produce failure - * would lead to an incomplete transaction, so raise - * an abortable transaction error. */ - rd_kafka_txn_set_abortable_error( - rk, perr->err, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed: %s " - "(broker %" PRId32 " %s, base seq %" PRId32 - "): " - "current transaction must be aborted", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_err2str(perr->err), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq); - - } else if (rk->rk_conf.eos.gapless) { - /* A permanent non-idempotent error will lead to - * gaps in the message series, the next request - * will fail with ...ERR_OUT_OF_ORDER_SEQUENCE_NUMBER. - * To satisfy the gapless guarantee we need to raise - * a fatal error here. */ - rd_kafka_idemp_set_fatal_error( - rk, RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE, - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) failed: " - "%s (broker %" PRId32 " %s, base seq %" PRId32 - "): " - "unable to satisfy gap-less guarantee", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), - rd_kafka_err2str(perr->err), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq); - - /* Drain outstanding requests and reset PID. */ - rd_kafka_idemp_drain_reset( - rk, "unable to satisfy gap-less guarantee"); - - } else { - /* If gapless is not set we bump the Epoch and - * renumber the messages to send. */ - - /* Drain outstanding requests and bump the epoch .*/ - rd_kafka_idemp_drain_epoch_bump(rk, perr->err, - "message sequence gap"); - } - - perr->update_next_ack = rd_false; - /* Make sure the next error will not raise a fatal error. */ - perr->update_next_err = rd_true; - } - - if (perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT || - perr->err == RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE) { - /* Translate request-level timeout error code - * to message-level timeout error code. */ - perr->err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; - - } else if (perr->err == RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED) { - /* If we're no longer authorized to access the topic mark - * it as errored to deny further produce requests. */ - rd_kafka_topic_wrlock(rktp->rktp_rkt); - rd_kafka_topic_set_error(rktp->rktp_rkt, perr->err); - rd_kafka_topic_wrunlock(rktp->rktp_rkt); - } - - return 1; -} - -/** - * @brief Handle ProduceResponse success for idempotent producer - * - * @warning May be called on the old leader thread. Lock rktp appropriately! - * - * @locks none - * @locality broker thread (but not necessarily the leader broker thread) - */ -static void -rd_kafka_handle_idempotent_Produce_success(rd_kafka_broker_t *rkb, - rd_kafka_msgbatch_t *batch, - int32_t next_seq) { - rd_kafka_t *rk = rkb->rkb_rk; - rd_kafka_toppar_t *rktp = batch->rktp; - char fatal_err[512]; - uint64_t first_msgid, last_msgid; - - *fatal_err = '\0'; - - first_msgid = rd_kafka_msgq_first(&batch->msgq)->rkm_u.producer.msgid; - last_msgid = rd_kafka_msgq_last(&batch->msgq)->rkm_u.producer.msgid; - - rd_kafka_toppar_lock(rktp); - - /* If the last acked msgid is higher than - * the next message to (re)transmit in the message queue - * it means a previous series of R1,R2 ProduceRequests - * had R1 fail with uncertain persistence status, - * such as timeout or transport error, but R2 succeeded, - * which means the messages in R1 were in fact persisted. - * In this case trigger delivery reports for all messages - * in queue until we hit a non-acked message msgid. */ - if (unlikely(rktp->rktp_eos.acked_msgid < first_msgid - 1)) { - rd_kafka_dr_implicit_ack(rkb, rktp, last_msgid); - - } else if (unlikely(batch->first_seq != rktp->rktp_eos.next_ack_seq && - batch->first_seq == rktp->rktp_eos.next_err_seq)) { - /* Response ordering is typically not a concern - * (but will not happen with current broker versions), - * unless we're expecting an error to be returned at - * this sequence rather than a success ack, in which - * case raise a fatal error. */ - - /* Can't call set_fatal_error() while - * holding the toppar lock, so construct - * the error string here and call - * set_fatal_error() below after - * toppar lock has been released. */ - rd_snprintf(fatal_err, sizeof(fatal_err), - "ProduceRequest for %.*s [%" PRId32 - "] " - "with %d message(s) " - "succeeded when expecting failure " - "(broker %" PRId32 - " %s, " - "base seq %" PRId32 - ", " - "next ack seq %" PRId32 - ", " - "next err seq %" PRId32 - ": " - "unable to retry without risking " - "duplication/reordering", - RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), - rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), rkb->rkb_nodeid, - rd_kafka_pid2str(batch->pid), batch->first_seq, - rktp->rktp_eos.next_ack_seq, - rktp->rktp_eos.next_err_seq); - - rktp->rktp_eos.next_err_seq = next_seq; - } - - if (likely(!*fatal_err)) { - /* Advance next expected err and/or ack sequence */ - - /* Only step err seq if it hasn't diverged. */ - if (rktp->rktp_eos.next_err_seq == rktp->rktp_eos.next_ack_seq) - rktp->rktp_eos.next_err_seq = next_seq; - - rktp->rktp_eos.next_ack_seq = next_seq; - } - - /* Store the last acked message sequence, - * since retries within the broker cache window (5 requests) - * will succeed for older messages we must only update the - * acked msgid if it is higher than the last acked. */ - if (last_msgid > rktp->rktp_eos.acked_msgid) - rktp->rktp_eos.acked_msgid = last_msgid; - - rd_kafka_toppar_unlock(rktp); - - /* Must call set_fatal_error() after releasing - * the toppar lock. */ - if (unlikely(*fatal_err)) - rd_kafka_idemp_set_fatal_error( - rk, RD_KAFKA_RESP_ERR__INCONSISTENT, "%s", fatal_err); -} - - -/** - * @brief Handle ProduceRequest result for a message batch. - * - * @warning \p request may be NULL. - * - * @localiy broker thread (but not necessarily the toppar's handler thread) - * @locks none - */ -static void rd_kafka_msgbatch_handle_Produce_result( - rd_kafka_broker_t *rkb, - rd_kafka_msgbatch_t *batch, - rd_kafka_resp_err_t err, - const struct rd_kafka_Produce_result *presult, - const rd_kafka_buf_t *request) { - - rd_kafka_t *rk = rkb->rkb_rk; - rd_kafka_toppar_t *rktp = batch->rktp; - rd_kafka_msg_status_t status = RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; - rd_bool_t last_inflight; - int32_t next_seq; - - /* Decrease partition's messages in-flight counter */ - rd_assert(rd_atomic32_get(&rktp->rktp_msgs_inflight) >= - rd_kafka_msgq_len(&batch->msgq)); - last_inflight = !rd_atomic32_sub(&rktp->rktp_msgs_inflight, - rd_kafka_msgq_len(&batch->msgq)); - - /* Next expected sequence (and handle wrap) */ - next_seq = rd_kafka_seq_wrap(batch->first_seq + - rd_kafka_msgq_len(&batch->msgq)); - - if (likely(!err)) { - rd_rkb_dbg(rkb, MSG, "MSGSET", - "%s [%" PRId32 - "]: MessageSet with %i message(s) " - "(MsgId %" PRIu64 ", BaseSeq %" PRId32 ") delivered", - rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition, - rd_kafka_msgq_len(&batch->msgq), batch->first_msgid, - batch->first_seq); - - if (rktp->rktp_rkt->rkt_conf.required_acks != 0) - status = RD_KAFKA_MSG_STATUS_PERSISTED; - - if (rd_kafka_is_idempotent(rk)) - rd_kafka_handle_idempotent_Produce_success(rkb, batch, - next_seq); - } else { - /* Error handling */ - struct rd_kafka_Produce_err perr = { - .err = err, - .incr_retry = 1, - .status = status, - .update_next_ack = rd_true, - .update_next_err = rd_true, - .last_seq = (batch->first_seq + - rd_kafka_msgq_len(&batch->msgq) - 1)}; - - rd_kafka_handle_Produce_error(rkb, request, batch, &perr); - - /* Update next expected acked and/or err sequence. */ - if (perr.update_next_ack || perr.update_next_err) { - rd_kafka_toppar_lock(rktp); - if (perr.update_next_ack) - rktp->rktp_eos.next_ack_seq = next_seq; - if (perr.update_next_err) - rktp->rktp_eos.next_err_seq = next_seq; - rd_kafka_toppar_unlock(rktp); - } - - err = perr.err; - status = perr.status; - } - - - /* Messages to retry will have been removed from the request's queue */ - if (likely(rd_kafka_msgq_len(&batch->msgq) > 0)) { - /* Set offset, timestamp and status for each message. */ - rd_kafka_msgq_set_metadata(&batch->msgq, rkb->rkb_nodeid, - presult->offset, presult->timestamp, - status); - - /* Enqueue messages for delivery report. */ - rd_kafka_dr_msgq(rktp->rktp_rkt, &batch->msgq, err); - } - - if (rd_kafka_is_idempotent(rk) && last_inflight) - rd_kafka_idemp_inflight_toppar_sub(rk, rktp); -} - - -/** - * @brief Handle ProduceResponse - * - * @param reply is NULL when `acks=0` and on various local errors. - * - * @remark ProduceRequests are never retried, retriable errors are - * instead handled by re-enqueuing the request's messages back - * on the partition queue to have a new ProduceRequest constructed - * eventually. - * - * @warning May be called on the old leader thread. Lock rktp appropriately! - * - * @locality broker thread (but not necessarily the leader broker thread) - */ -static void rd_kafka_handle_Produce(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *reply, - rd_kafka_buf_t *request, - void *opaque) { - rd_kafka_msgbatch_t *batch = &request->rkbuf_batch; - rd_kafka_toppar_t *rktp = batch->rktp; - struct rd_kafka_Produce_result result = { - .offset = RD_KAFKA_OFFSET_INVALID, .timestamp = -1}; - - /* Unit test interface: inject errors */ - if (unlikely(rk->rk_conf.ut.handle_ProduceResponse != NULL)) { - err = rk->rk_conf.ut.handle_ProduceResponse( - rkb->rkb_rk, rkb->rkb_nodeid, batch->first_msgid, err); - } - - /* Parse Produce reply (unless the request errored) */ - if (!err && reply) - err = rd_kafka_handle_Produce_parse(rkb, rktp, reply, request, - &result); - - rd_kafka_msgbatch_handle_Produce_result(rkb, batch, err, &result, - request); -} - - -/** - * @brief Send ProduceRequest for messages in toppar queue. - * - * @returns the number of messages included, or 0 on error / no messages. - * - * @locality broker thread - */ -int rd_kafka_ProduceRequest(rd_kafka_broker_t *rkb, - rd_kafka_toppar_t *rktp, - const rd_kafka_pid_t pid, - uint64_t epoch_base_msgid) { - rd_kafka_buf_t *rkbuf; - rd_kafka_topic_t *rkt = rktp->rktp_rkt; - size_t MessageSetSize = 0; - int cnt; - rd_ts_t now; - int64_t first_msg_timeout; - int tmout; - - /** - * Create ProduceRequest with as many messages from the toppar - * transmit queue as possible. - */ - rkbuf = rd_kafka_msgset_create_ProduceRequest( - rkb, rktp, &rktp->rktp_xmit_msgq, pid, epoch_base_msgid, - &MessageSetSize); - if (unlikely(!rkbuf)) - return 0; - - cnt = rd_kafka_msgq_len(&rkbuf->rkbuf_batch.msgq); - rd_dassert(cnt > 0); - - rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchcnt, (int64_t)cnt); - rd_avg_add(&rktp->rktp_rkt->rkt_avg_batchsize, (int64_t)MessageSetSize); - - if (!rkt->rkt_conf.required_acks) - rkbuf->rkbuf_flags |= RD_KAFKA_OP_F_NO_RESPONSE; - - /* Use timeout from first message in batch */ - now = rd_clock(); - first_msg_timeout = - (rd_kafka_msgq_first(&rkbuf->rkbuf_batch.msgq)->rkm_ts_timeout - - now) / - 1000; - - if (unlikely(first_msg_timeout <= 0)) { - /* Message has already timed out, allow 100 ms - * to produce anyway */ - tmout = 100; - } else { - tmout = (int)RD_MIN(INT_MAX, first_msg_timeout); - } - - /* Set absolute timeout (including retries), the - * effective timeout for this specific request will be - * capped by socket.timeout.ms */ - rd_kafka_buf_set_abs_timeout(rkbuf, tmout, now); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, RD_KAFKA_NO_REPLYQ, - rd_kafka_handle_Produce, NULL); - - return cnt; -} - - -/** - * @brief Construct and send CreateTopicsRequest to \p rkb - * with the topics (NewTopic_t*) in \p new_topics, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_CreateTopicsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *new_topics /*(NewTopic_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - int i = 0; - rd_kafka_NewTopic_t *newt; - int op_timeout; - - if (rd_list_cnt(new_topics) == 0) { - rd_snprintf(errstr, errstr_size, "No topics to create"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_CreateTopics, 0, 4, &features); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "Topic Admin API (KIP-4) not supported " - "by broker, requires broker version >= 0.10.2.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - if (rd_kafka_confval_get_int(&options->validate_only) && - ApiVersion < 1) { - rd_snprintf(errstr, errstr_size, - "CreateTopics.validate_only=true not " - "supported by broker"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateTopics, 1, - 4 + (rd_list_cnt(new_topics) * 200) + - 4 + 1); - - /* #topics */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_topics)); - - while ((newt = rd_list_elem(new_topics, i++))) { - int partition; - int ei = 0; - const rd_kafka_ConfigEntry_t *entry; - - if (ApiVersion < 4) { - if (newt->num_partitions == -1) { - rd_snprintf(errstr, errstr_size, - "Default partition count (KIP-464) " - "not supported by broker, " - "requires broker version <= 2.4.0"); - rd_kafka_replyq_destroy(&replyq); - rd_kafka_buf_destroy(rkbuf); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - if (newt->replication_factor == -1 && - rd_list_empty(&newt->replicas)) { - rd_snprintf(errstr, errstr_size, - "Default replication factor " - "(KIP-464) " - "not supported by broker, " - "requires broker version <= 2.4.0"); - rd_kafka_replyq_destroy(&replyq); - rd_kafka_buf_destroy(rkbuf); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - - /* topic */ - rd_kafka_buf_write_str(rkbuf, newt->topic, -1); - - if (rd_list_cnt(&newt->replicas)) { - /* num_partitions and replication_factor must be - * set to -1 if a replica assignment is sent. */ - /* num_partitions */ - rd_kafka_buf_write_i32(rkbuf, -1); - /* replication_factor */ - rd_kafka_buf_write_i16(rkbuf, -1); - } else { - /* num_partitions */ - rd_kafka_buf_write_i32(rkbuf, newt->num_partitions); - /* replication_factor */ - rd_kafka_buf_write_i16( - rkbuf, (int16_t)newt->replication_factor); - } - - /* #replica_assignment */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->replicas)); - - /* Replicas per partition, see rdkafka_admin.[ch] - * for how these are constructed. */ - for (partition = 0; partition < rd_list_cnt(&newt->replicas); - partition++) { - const rd_list_t *replicas; - int ri = 0; - - replicas = rd_list_elem(&newt->replicas, partition); - if (!replicas) - continue; - - /* partition */ - rd_kafka_buf_write_i32(rkbuf, partition); - /* #replicas */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(replicas)); - - for (ri = 0; ri < rd_list_cnt(replicas); ri++) { - /* replica */ - rd_kafka_buf_write_i32( - rkbuf, rd_list_get_int32(replicas, ri)); - } - } - - /* #config_entries */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&newt->config)); - - RD_LIST_FOREACH(entry, &newt->config, ei) { - /* config_name */ - rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); - /* config_value (nullable) */ - rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); - } - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - rd_kafka_buf_write_i32(rkbuf, op_timeout); - - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - if (ApiVersion >= 1) { - /* validate_only */ - rd_kafka_buf_write_i8( - rkbuf, rd_kafka_confval_get_int(&options->validate_only)); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send DeleteTopicsRequest to \p rkb - * with the topics (DeleteTopic_t *) in \p del_topics, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_DeleteTopicsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *del_topics /*(DeleteTopic_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - int i = 0; - rd_kafka_DeleteTopic_t *delt; - int op_timeout; - - if (rd_list_cnt(del_topics) == 0) { - rd_snprintf(errstr, errstr_size, "No topics to delete"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DeleteTopics, 0, 1, &features); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "Topic Admin API (KIP-4) not supported " - "by broker, requires broker version >= 0.10.2.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteTopics, 1, - /* FIXME */ - 4 + (rd_list_cnt(del_topics) * 100) + 4); - - /* #topics */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_topics)); - - while ((delt = rd_list_elem(del_topics, i++))) - rd_kafka_buf_write_str(rkbuf, delt->topic, -1); - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - rd_kafka_buf_write_i32(rkbuf, op_timeout); - - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send DeleteRecordsRequest to \p rkb - * with the offsets to delete (rd_kafka_topic_partition_list_t *) in - * \p offsets_list, using \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @remark The rd_kafka_topic_partition_list_t in \p offsets_list must already - * be sorted. - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_DeleteRecordsRequest(rd_kafka_broker_t *rkb, - /*(rd_kafka_topic_partition_list_t*)*/ - const rd_list_t *offsets_list, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - const rd_kafka_topic_partition_list_t *partitions; - int op_timeout; - - partitions = rd_list_elem(offsets_list, 0); - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DeleteRecords, 0, 1, &features); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "DeleteRecords Admin API (KIP-107) not supported " - "by broker, requires broker version >= 0.11.0"); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteRecords, 1, - 4 + (partitions->cnt * 100) + 4); - - const rd_kafka_topic_partition_field_t fields[] = { - RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, - RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, - RD_KAFKA_TOPIC_PARTITION_FIELD_END}; - rd_kafka_buf_write_topic_partitions( - rkbuf, partitions, rd_false /*don't skip invalid offsets*/, - rd_false /*any offset*/, fields); - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - rd_kafka_buf_write_i32(rkbuf, op_timeout); - - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send CreatePartitionsRequest to \p rkb - * with the topics (NewPartitions_t*) in \p new_parts, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_CreatePartitionsRequest(rd_kafka_broker_t *rkb, - /*(NewPartitions_t*)*/ - const rd_list_t *new_parts, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int i = 0; - rd_kafka_NewPartitions_t *newp; - int op_timeout; - - if (rd_list_cnt(new_parts) == 0) { - rd_snprintf(errstr, errstr_size, "No partitions to create"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_CreatePartitions, 0, 0, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "CreatePartitions (KIP-195) not supported " - "by broker, requires broker version >= 1.0.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreatePartitions, 1, - 4 + (rd_list_cnt(new_parts) * 200) + - 4 + 1); - - /* #topics */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_parts)); - - while ((newp = rd_list_elem(new_parts, i++))) { - /* topic */ - rd_kafka_buf_write_str(rkbuf, newp->topic, -1); - - /* New partition count */ - rd_kafka_buf_write_i32(rkbuf, (int32_t)newp->total_cnt); - - /* #replica_assignment */ - if (rd_list_empty(&newp->replicas)) { - rd_kafka_buf_write_i32(rkbuf, -1); - } else { - const rd_list_t *replicas; - int pi = -1; - - rd_kafka_buf_write_i32(rkbuf, - rd_list_cnt(&newp->replicas)); - - while ( - (replicas = rd_list_elem(&newp->replicas, ++pi))) { - int ri = 0; - - /* replica count */ - rd_kafka_buf_write_i32(rkbuf, - rd_list_cnt(replicas)); - - /* replica */ - for (ri = 0; ri < rd_list_cnt(replicas); ri++) { - rd_kafka_buf_write_i32( - rkbuf, - rd_list_get_int32(replicas, ri)); - } - } - } - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - rd_kafka_buf_write_i32(rkbuf, op_timeout); - - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - /* validate_only */ - rd_kafka_buf_write_i8( - rkbuf, rd_kafka_confval_get_int(&options->validate_only)); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send AlterConfigsRequest to \p rkb - * with the configs (ConfigResource_t*) in \p configs, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_AlterConfigsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *configs /*(ConfigResource_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int i; - const rd_kafka_ConfigResource_t *config; - int op_timeout; - - if (rd_list_cnt(configs) == 0) { - rd_snprintf(errstr, errstr_size, - "No config resources specified"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AlterConfigs, 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "AlterConfigs (KIP-133) not supported " - "by broker, requires broker version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - /* Incremental requires IncrementalAlterConfigs */ - if (rd_kafka_confval_get_int(&options->incremental)) { - rd_snprintf(errstr, errstr_size, - "AlterConfigs.incremental=true (KIP-248) " - "not supported by broker, " - "replaced by IncrementalAlterConfigs"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_AlterConfigs, 1, - rd_list_cnt(configs) * 200); - - /* #resources */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); - - RD_LIST_FOREACH(config, configs, i) { - const rd_kafka_ConfigEntry_t *entry; - int ei; - - /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, config->restype); - - /* resource_name */ - rd_kafka_buf_write_str(rkbuf, config->name, -1); - - /* #config */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(&config->config)); - - RD_LIST_FOREACH(entry, &config->config, ei) { - /* config_name */ - rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); - /* config_value (nullable) */ - rd_kafka_buf_write_str(rkbuf, entry->kv->value, -1); - - if (entry->a.operation != RD_KAFKA_ALTER_OP_SET) { - rd_snprintf(errstr, errstr_size, - "IncrementalAlterConfigs required " - "for add/delete config " - "entries: only set supported " - "by this operation"); - rd_kafka_buf_destroy(rkbuf); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - /* validate_only */ - rd_kafka_buf_write_i8( - rkbuf, rd_kafka_confval_get_int(&options->validate_only)); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send DescribeConfigsRequest to \p rkb - * with the configs (ConfigResource_t*) in \p configs, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t rd_kafka_DescribeConfigsRequest( - rd_kafka_broker_t *rkb, - const rd_list_t *configs /*(ConfigResource_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int i; - const rd_kafka_ConfigResource_t *config; - int op_timeout; - - if (rd_list_cnt(configs) == 0) { - rd_snprintf(errstr, errstr_size, - "No config resources specified"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DescribeConfigs, 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "DescribeConfigs (KIP-133) not supported " - "by broker, requires broker version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DescribeConfigs, 1, - rd_list_cnt(configs) * 200); - - /* #resources */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(configs)); - - RD_LIST_FOREACH(config, configs, i) { - const rd_kafka_ConfigEntry_t *entry; - int ei; - - /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, config->restype); - - /* resource_name */ - rd_kafka_buf_write_str(rkbuf, config->name, -1); - - /* #config */ - if (rd_list_empty(&config->config)) { - /* Get all configs */ - rd_kafka_buf_write_i32(rkbuf, -1); - } else { - /* Get requested configs only */ - rd_kafka_buf_write_i32(rkbuf, - rd_list_cnt(&config->config)); - } - - RD_LIST_FOREACH(entry, &config->config, ei) { - /* config_name */ - rd_kafka_buf_write_str(rkbuf, entry->kv->name, -1); - } - } - - - if (ApiVersion == 1) { - /* include_synonyms */ - rd_kafka_buf_write_i8(rkbuf, 1); - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send DeleteGroupsRequest to \p rkb - * with the groups (DeleteGroup_t *) in \p del_groups, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_DeleteGroupsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *del_groups /*(DeleteGroup_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - int features; - int i = 0; - rd_kafka_DeleteGroup_t *delt; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DeleteGroups, 0, 1, &features); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "DeleteGroups Admin API (KIP-229) not supported " - "by broker, requires broker version >= 1.1.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteGroups, 1, - 4 + (rd_list_cnt(del_groups) * 100) + 4); - - /* #groups */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_groups)); - - while ((delt = rd_list_elem(del_groups, i++))) - rd_kafka_buf_write_str(rkbuf, delt->group, -1); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief Returns the request size needed to send a specific AclBinding - * specified in \p acl, using the ApiVersion provided in - * \p ApiVersion. - * - * @returns and int16_t with the request size in bytes. - */ -static RD_INLINE size_t -rd_kafka_AclBinding_request_size(const rd_kafka_AclBinding_t *acl, - int ApiVersion) { - return 1 + 2 + (acl->name ? strlen(acl->name) : 0) + 2 + - (acl->principal ? strlen(acl->principal) : 0) + 2 + - (acl->host ? strlen(acl->host) : 0) + 1 + 1 + - (ApiVersion > 0 ? 1 : 0); -} - -/** - * @brief Construct and send CreateAclsRequest to \p rkb - * with the acls (AclBinding_t*) in \p new_acls, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_CreateAclsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *new_acls /*(AclBinding_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion; - int i; - size_t len; - int op_timeout; - rd_kafka_AclBinding_t *new_acl; - - if (rd_list_cnt(new_acls) == 0) { - rd_snprintf(errstr, errstr_size, "No acls to create"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_CreateAcls, 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "ACLs Admin API (KIP-140) not supported " - "by broker, requires broker version >= 0.11.0.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - if (ApiVersion == 0) { - RD_LIST_FOREACH(new_acl, new_acls, i) { - if (new_acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_LITERAL) { - rd_snprintf(errstr, errstr_size, - "Broker only supports LITERAL " - "resource pattern types"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - } else { - RD_LIST_FOREACH(new_acl, new_acls, i) { - if (new_acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_LITERAL && - new_acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_PREFIXED) { - rd_snprintf(errstr, errstr_size, - "Only LITERAL and PREFIXED " - "resource patterns are supported " - "when creating ACLs"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - } - - len = 4; - RD_LIST_FOREACH(new_acl, new_acls, i) { - len += rd_kafka_AclBinding_request_size(new_acl, ApiVersion); - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_CreateAcls, 1, len); - - /* #acls */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(new_acls)); - - RD_LIST_FOREACH(new_acl, new_acls, i) { - rd_kafka_buf_write_i8(rkbuf, new_acl->restype); - - rd_kafka_buf_write_str(rkbuf, new_acl->name, -1); - - if (ApiVersion >= 1) { - rd_kafka_buf_write_i8(rkbuf, - new_acl->resource_pattern_type); - } - - rd_kafka_buf_write_str(rkbuf, new_acl->principal, -1); - - rd_kafka_buf_write_str(rkbuf, new_acl->host, -1); - - rd_kafka_buf_write_i8(rkbuf, new_acl->operation); - - rd_kafka_buf_write_i8(rkbuf, new_acl->permission_type); - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief Construct and send DescribeAclsRequest to \p rkb - * with the acls (AclBinding_t*) in \p acls, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t rd_kafka_DescribeAclsRequest( - rd_kafka_broker_t *rkb, - const rd_list_t *acls /*(rd_kafka_AclBindingFilter_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - const rd_kafka_AclBindingFilter_t *acl; - int op_timeout; - - if (rd_list_cnt(acls) == 0) { - rd_snprintf(errstr, errstr_size, - "No acl binding filters specified"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - if (rd_list_cnt(acls) > 1) { - rd_snprintf(errstr, errstr_size, - "Too many acl binding filters specified"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - acl = rd_list_elem(acls, 0); - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DescribeAcls, 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "ACLs Admin API (KIP-140) not supported " - "by broker, requires broker version >= 0.11.0.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - if (ApiVersion == 0) { - if (acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_LITERAL && - acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_ANY) { - rd_snprintf(errstr, errstr_size, - "Broker only supports LITERAL and ANY " - "resource pattern types"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } else { - if (acl->resource_pattern_type == - RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) { - rd_snprintf(errstr, errstr_size, - "Filter contains UNKNOWN elements"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - - rkbuf = rd_kafka_buf_new_request( - rkb, RD_KAFKAP_DescribeAcls, 1, - rd_kafka_AclBinding_request_size(acl, ApiVersion)); - - /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, acl->restype); - - /* resource_name filter */ - rd_kafka_buf_write_str(rkbuf, acl->name, -1); - - if (ApiVersion > 0) { - /* resource_pattern_type (rd_kafka_ResourcePatternType_t) */ - rd_kafka_buf_write_i8(rkbuf, acl->resource_pattern_type); - } - - /* principal filter */ - rd_kafka_buf_write_str(rkbuf, acl->principal, -1); - - /* host filter */ - rd_kafka_buf_write_str(rkbuf, acl->host, -1); - - /* operation (rd_kafka_AclOperation_t) */ - rd_kafka_buf_write_i8(rkbuf, acl->operation); - - /* permission type (rd_kafka_AclPermissionType_t) */ - rd_kafka_buf_write_i8(rkbuf, acl->permission_type); - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief Construct and send DeleteAclsRequest to \p rkb - * with the acl filters (AclBindingFilter_t*) in \p del_acls, using - * \p options. - * - * The response (unparsed) will be enqueued on \p replyq - * for handling by \p resp_cb (with \p opaque passed). - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_DeleteAclsRequest(rd_kafka_broker_t *rkb, - const rd_list_t *del_acls /*(AclBindingFilter_t*)*/, - rd_kafka_AdminOptions_t *options, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - const rd_kafka_AclBindingFilter_t *acl; - int op_timeout; - int i; - size_t len; - - if (rd_list_cnt(del_acls) == 0) { - rd_snprintf(errstr, errstr_size, - "No acl binding filters specified"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__INVALID_ARG; - } - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_DeleteAcls, 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "ACLs Admin API (KIP-140) not supported " - "by broker, requires broker version >= 0.11.0.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - len = 4; - - RD_LIST_FOREACH(acl, del_acls, i) { - if (ApiVersion == 0) { - if (acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_LITERAL && - acl->resource_pattern_type != - RD_KAFKA_RESOURCE_PATTERN_ANY) { - rd_snprintf(errstr, errstr_size, - "Broker only supports LITERAL " - "and ANY resource pattern types"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } else { - if (acl->resource_pattern_type == - RD_KAFKA_RESOURCE_PATTERN_UNKNOWN) { - rd_snprintf(errstr, errstr_size, - "Filter contains UNKNOWN elements"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - - len += rd_kafka_AclBinding_request_size(acl, ApiVersion); - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_DeleteAcls, 1, len); - - /* #acls */ - rd_kafka_buf_write_i32(rkbuf, rd_list_cnt(del_acls)); - - RD_LIST_FOREACH(acl, del_acls, i) { - /* resource_type */ - rd_kafka_buf_write_i8(rkbuf, acl->restype); - - /* resource_name filter */ - rd_kafka_buf_write_str(rkbuf, acl->name, -1); - - if (ApiVersion > 0) { - /* resource_pattern_type - * (rd_kafka_ResourcePatternType_t) */ - rd_kafka_buf_write_i8(rkbuf, - acl->resource_pattern_type); - } - - /* principal filter */ - rd_kafka_buf_write_str(rkbuf, acl->principal, -1); - - /* host filter */ - rd_kafka_buf_write_str(rkbuf, acl->host, -1); - - /* operation (rd_kafka_AclOperation_t) */ - rd_kafka_buf_write_i8(rkbuf, acl->operation); - - /* permission type (rd_kafka_AclPermissionType_t) */ - rd_kafka_buf_write_i8(rkbuf, acl->permission_type); - } - - /* timeout */ - op_timeout = rd_kafka_confval_get_int(&options->operation_timeout); - if (op_timeout > rkb->rkb_rk->rk_conf.socket_timeout_ms) - rd_kafka_buf_set_abs_timeout(rkbuf, op_timeout + 1000, 0); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -/** - * @brief Parses and handles an InitProducerId reply. - * - * @locality rdkafka main thread - * @locks none - */ -void rd_kafka_handle_InitProducerId(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - rd_kafka_resp_err_t err, - rd_kafka_buf_t *rkbuf, - rd_kafka_buf_t *request, - void *opaque) { - const int log_decode_errors = LOG_ERR; - int16_t error_code; - rd_kafka_pid_t pid; - - if (err) - goto err; - - rd_kafka_buf_read_throttle_time(rkbuf); - - rd_kafka_buf_read_i16(rkbuf, &error_code); - if ((err = error_code)) - goto err; - - rd_kafka_buf_read_i64(rkbuf, &pid.id); - rd_kafka_buf_read_i16(rkbuf, &pid.epoch); - - rd_kafka_idemp_pid_update(rkb, pid); - - return; - -err_parse: - err = rkbuf->rkbuf_err; -err: - if (err == RD_KAFKA_RESP_ERR__DESTROY) - return; - - /* Retries are performed by idempotence state handler */ - rd_kafka_idemp_request_pid_failed(rkb, err); -} - -/** - * @brief Construct and send InitProducerIdRequest to \p rkb. - * - * @param transactional_id may be NULL. - * @param transaction_timeout_ms may be set to -1. - * @param current_pid the current PID to reset, requires KIP-360. If not NULL - * and KIP-360 is not supported by the broker this function - * will return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE. - * - * The response (unparsed) will be handled by \p resp_cb served - * by queue \p replyq. - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code and errstr will be - * updated with a human readable error string. - */ -rd_kafka_resp_err_t -rd_kafka_InitProducerIdRequest(rd_kafka_broker_t *rkb, - const char *transactional_id, - int transaction_timeout_ms, - const rd_kafka_pid_t *current_pid, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion; - - if (current_pid) { - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_InitProducerId, 3, 4, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "InitProducerId (KIP-360) not supported by " - "broker, requires broker version >= 2.5.0: " - "unable to recover from previous " - "transactional error"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } else { - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_InitProducerId, 0, 4, NULL); - - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "InitProducerId (KIP-98) not supported by " - "broker, requires broker " - "version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - } - - rkbuf = rd_kafka_buf_new_flexver_request( - rkb, RD_KAFKAP_InitProducerId, 1, - 2 + (transactional_id ? strlen(transactional_id) : 0) + 4 + 8 + 4, - ApiVersion >= 2 /*flexver*/); - - /* transactional_id */ - rd_kafka_buf_write_str(rkbuf, transactional_id, -1); - - /* transaction_timeout_ms */ - rd_kafka_buf_write_i32(rkbuf, transaction_timeout_ms); - - if (ApiVersion >= 3) { - /* Current PID */ - rd_kafka_buf_write_i64(rkbuf, - current_pid ? current_pid->id : -1); - /* Current Epoch */ - rd_kafka_buf_write_i16(rkbuf, - current_pid ? current_pid->epoch : -1); - } - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - /* Let the idempotence state handler perform retries */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send AddPartitionsToTxnRequest to \p rkb. - * - * The response (unparsed) will be handled by \p resp_cb served - * by queue \p replyq. - * - * @param rktps MUST be sorted by topic name. - * - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code. - */ -rd_kafka_resp_err_t -rd_kafka_AddPartitionsToTxnRequest(rd_kafka_broker_t *rkb, - const char *transactional_id, - rd_kafka_pid_t pid, - const rd_kafka_toppar_tqhead_t *rktps, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - rd_kafka_toppar_t *rktp; - rd_kafka_topic_t *last_rkt = NULL; - size_t of_TopicCnt; - ssize_t of_PartCnt = -1; - int TopicCnt = 0, PartCnt = 0; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddPartitionsToTxn, 0, 0, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "AddPartitionsToTxnRequest (KIP-98) not supported " - "by broker, requires broker version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddPartitionsToTxn, 1, 500); - - /* transactional_id */ - rd_kafka_buf_write_str(rkbuf, transactional_id, -1); - - /* PID */ - rd_kafka_buf_write_i64(rkbuf, pid.id); - rd_kafka_buf_write_i16(rkbuf, pid.epoch); - - /* Topics/partitions array (count updated later) */ - of_TopicCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - TAILQ_FOREACH(rktp, rktps, rktp_txnlink) { - if (last_rkt != rktp->rktp_rkt) { - - if (last_rkt) { - /* Update last topic's partition count field */ - rd_kafka_buf_update_i32(rkbuf, of_PartCnt, - PartCnt); - of_PartCnt = -1; - } - - /* Topic name */ - rd_kafka_buf_write_kstr(rkbuf, - rktp->rktp_rkt->rkt_topic); - /* Partition count, updated later */ - of_PartCnt = rd_kafka_buf_write_i32(rkbuf, 0); - - PartCnt = 0; - TopicCnt++; - last_rkt = rktp->rktp_rkt; - } - - /* Partition id */ - rd_kafka_buf_write_i32(rkbuf, rktp->rktp_partition); - PartCnt++; - } - - /* Update last partition and topic count fields */ - if (of_PartCnt != -1) - rd_kafka_buf_update_i32(rkbuf, (size_t)of_PartCnt, PartCnt); - rd_kafka_buf_update_i32(rkbuf, of_TopicCnt, TopicCnt); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - /* Let the handler perform retries so that it can pick - * up more added partitions. */ - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_NO_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - -/** - * @brief Construct and send AddOffsetsToTxnRequest to \p rkb. - * - * The response (unparsed) will be handled by \p resp_cb served - * by queue \p replyq. - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code. - */ -rd_kafka_resp_err_t -rd_kafka_AddOffsetsToTxnRequest(rd_kafka_broker_t *rkb, - const char *transactional_id, - rd_kafka_pid_t pid, - const char *group_id, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - - ApiVersion = rd_kafka_broker_ApiVersion_supported( - rkb, RD_KAFKAP_AddOffsetsToTxn, 0, 0, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "AddOffsetsToTxnRequest (KIP-98) not supported " - "by broker, requires broker version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = - rd_kafka_buf_new_request(rkb, RD_KAFKAP_AddOffsetsToTxn, 1, 100); - - /* transactional_id */ - rd_kafka_buf_write_str(rkbuf, transactional_id, -1); - - /* PID */ - rd_kafka_buf_write_i64(rkbuf, pid.id); - rd_kafka_buf_write_i16(rkbuf, pid.epoch); - - /* Group Id */ - rd_kafka_buf_write_str(rkbuf, group_id, -1); - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @brief Construct and send EndTxnRequest to \p rkb. - * - * The response (unparsed) will be handled by \p resp_cb served - * by queue \p replyq. - * - * @returns RD_KAFKA_RESP_ERR_NO_ERROR if the request was enqueued for - * transmission, otherwise an error code. - */ -rd_kafka_resp_err_t rd_kafka_EndTxnRequest(rd_kafka_broker_t *rkb, - const char *transactional_id, - rd_kafka_pid_t pid, - rd_bool_t committed, - char *errstr, - size_t errstr_size, - rd_kafka_replyq_t replyq, - rd_kafka_resp_cb_t *resp_cb, - void *opaque) { - rd_kafka_buf_t *rkbuf; - int16_t ApiVersion = 0; - - ApiVersion = rd_kafka_broker_ApiVersion_supported(rkb, RD_KAFKAP_EndTxn, - 0, 1, NULL); - if (ApiVersion == -1) { - rd_snprintf(errstr, errstr_size, - "EndTxnRequest (KIP-98) not supported " - "by broker, requires broker version >= 0.11.0"); - rd_kafka_replyq_destroy(&replyq); - return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; - } - - rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_EndTxn, 1, 500); - - /* transactional_id */ - rd_kafka_buf_write_str(rkbuf, transactional_id, -1); - - /* PID */ - rd_kafka_buf_write_i64(rkbuf, pid.id); - rd_kafka_buf_write_i16(rkbuf, pid.epoch); - - /* Committed */ - rd_kafka_buf_write_bool(rkbuf, committed); - rkbuf->rkbuf_u.EndTxn.commit = committed; - - rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); - - rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; - - rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - - - -/** - * @name Unit tests - * @{ - * - * - * - * - */ - -/** - * @brief Create \p cnt messages, starting at \p msgid, and add them - * to \p rkmq. - * - * @returns the number of messages added. - */ -static int ut_create_msgs(rd_kafka_msgq_t *rkmq, uint64_t msgid, int cnt) { - int i; - - for (i = 0; i < cnt; i++) { - rd_kafka_msg_t *rkm; - - rkm = ut_rd_kafka_msg_new(0); - rkm->rkm_u.producer.msgid = msgid++; - rkm->rkm_ts_enq = rd_clock(); - rkm->rkm_ts_timeout = rkm->rkm_ts_enq + (900 * 1000 * 1000); - - rd_kafka_msgq_enq(rkmq, rkm); - } - - return cnt; -} - -/** - * @brief Idempotent Producer request/response unit tests - * - * The current test verifies proper handling of the following case: - * Batch 0 succeeds - * Batch 1 fails with temporary error - * Batch 2,3 fails with out of order sequence - * Retry Batch 1-3 should succeed. - */ -static int unittest_idempotent_producer(void) { - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - rd_kafka_broker_t *rkb; -#define _BATCH_CNT 4 -#define _MSGS_PER_BATCH 3 - const int msgcnt = _BATCH_CNT * _MSGS_PER_BATCH; - int remaining_batches; - uint64_t msgid = 1; - rd_kafka_toppar_t *rktp; - rd_kafka_pid_t pid = {.id = 1000, .epoch = 0}; - struct rd_kafka_Produce_result result = {.offset = 1, - .timestamp = 1000}; - rd_kafka_queue_t *rkqu; - rd_kafka_event_t *rkev; - rd_kafka_buf_t *request[_BATCH_CNT]; - int rcnt = 0; - int retry_msg_cnt = 0; - int drcnt = 0; - rd_kafka_msgq_t rkmq = RD_KAFKA_MSGQ_INITIALIZER(rkmq); - const char *tmp; - int i, r; - - RD_UT_SAY("Verifying idempotent producer error handling"); - - conf = rd_kafka_conf_new(); - rd_kafka_conf_set(conf, "batch.num.messages", "3", NULL, 0); - rd_kafka_conf_set(conf, "retry.backoff.ms", "1", NULL, 0); - if ((tmp = rd_getenv("TEST_DEBUG", NULL))) - rd_kafka_conf_set(conf, "debug", tmp, NULL, 0); - if (rd_kafka_conf_set(conf, "enable.idempotence", "true", NULL, 0) != - RD_KAFKA_CONF_OK) - RD_UT_FAIL("Failed to enable idempotence"); - rd_kafka_conf_set_events(conf, RD_KAFKA_EVENT_DR); - - rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); - RD_UT_ASSERT(rk, "failed to create producer"); - - rkqu = rd_kafka_queue_get_main(rk); - - /* We need a broker handle, use a logical broker to avoid - * any connection attempts. */ - rkb = rd_kafka_broker_add_logical(rk, "unittest"); - - /* Have the broker support everything so msgset_writer selects - * the most up-to-date output features. */ - rd_kafka_broker_lock(rkb); - rkb->rkb_features = RD_KAFKA_FEATURE_UNITTEST | RD_KAFKA_FEATURE_ALL; - rd_kafka_broker_unlock(rkb); - - /* Get toppar */ - rktp = rd_kafka_toppar_get2(rk, "uttopic", 0, rd_false, rd_true); - RD_UT_ASSERT(rktp, "failed to get toppar"); - - /* Set the topic as exists so messages are enqueued on - * the desired rktp away (otherwise UA partition) */ - rd_ut_kafka_topic_set_topic_exists(rktp->rktp_rkt, 1, -1); - - /* Produce messages */ - ut_create_msgs(&rkmq, 1, msgcnt); - - /* Set the pid */ - rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID); - rd_kafka_idemp_pid_update(rkb, pid); - pid = rd_kafka_idemp_get_pid(rk); - RD_UT_ASSERT(rd_kafka_pid_valid(pid), "PID is invalid"); - rd_kafka_toppar_pid_change(rktp, pid, msgid); - - remaining_batches = _BATCH_CNT; - - /* Create a ProduceRequest for each batch */ - for (rcnt = 0; rcnt < remaining_batches; rcnt++) { - size_t msize; - request[rcnt] = rd_kafka_msgset_create_ProduceRequest( - rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize); - RD_UT_ASSERT(request[rcnt], "request #%d failed", rcnt); - } - - RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == 0, - "expected input message queue to be empty, " - "but still has %d message(s)", - rd_kafka_msgq_len(&rkmq)); - - /* - * Mock handling of each request - */ - - /* Batch 0: accepted */ - i = 0; - r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); - RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); - rd_kafka_msgbatch_handle_Produce_result(rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_NO_ERROR, - &result, request[i]); - result.offset += r; - RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == 0, - "batch %d: expected no messages in rktp_msgq, not %d", i, - rd_kafka_msgq_len(&rktp->rktp_msgq)); - rd_kafka_buf_destroy(request[i]); - remaining_batches--; - - /* Batch 1: fail, triggering retry (re-enq on rktp_msgq) */ - i = 1; - r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); - RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); - rd_kafka_msgbatch_handle_Produce_result( - rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION, &result, request[i]); - retry_msg_cnt += r; - RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, - "batch %d: expected %d messages in rktp_msgq, not %d", i, - retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); - rd_kafka_buf_destroy(request[i]); - - /* Batch 2: OUT_OF_ORDER, triggering retry .. */ - i = 2; - r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); - RD_UT_ASSERT(r == _MSGS_PER_BATCH, "."); - rd_kafka_msgbatch_handle_Produce_result( - rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, - request[i]); - retry_msg_cnt += r; - RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, - "batch %d: expected %d messages in rktp_xmit_msgq, not %d", - i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); - rd_kafka_buf_destroy(request[i]); - - /* Batch 3: OUT_OF_ORDER, triggering retry .. */ - i = 3; - r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); - rd_kafka_msgbatch_handle_Produce_result( - rkb, &request[i]->rkbuf_batch, - RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER, &result, - request[i]); - retry_msg_cnt += r; - RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, - "batch %d: expected %d messages in rktp_xmit_msgq, not %d", - i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); - rd_kafka_buf_destroy(request[i]); - - - /* Retried messages will have been moved to rktp_msgq, - * move them back to our local queue. */ - rd_kafka_toppar_lock(rktp); - rd_kafka_msgq_move(&rkmq, &rktp->rktp_msgq); - rd_kafka_toppar_unlock(rktp); - - RD_UT_ASSERT(rd_kafka_msgq_len(&rkmq) == retry_msg_cnt, - "Expected %d messages in retry queue, not %d", - retry_msg_cnt, rd_kafka_msgq_len(&rkmq)); - - /* Sleep a short while to make sure the retry backoff expires. */ - rd_usleep(5 * 1000, NULL); /* 5ms */ - - /* - * Create requests for remaining batches. - */ - for (rcnt = 0; rcnt < remaining_batches; rcnt++) { - size_t msize; - request[rcnt] = rd_kafka_msgset_create_ProduceRequest( - rkb, rktp, &rkmq, rd_kafka_idemp_get_pid(rk), 0, &msize); - RD_UT_ASSERT(request[rcnt], - "Failed to create retry #%d (%d msgs in queue)", - rcnt, rd_kafka_msgq_len(&rkmq)); - } - - /* - * Mock handling of each request, they will now succeed. - */ - for (i = 0; i < rcnt; i++) { - r = rd_kafka_msgq_len(&request[i]->rkbuf_batch.msgq); - rd_kafka_msgbatch_handle_Produce_result( - rkb, &request[i]->rkbuf_batch, RD_KAFKA_RESP_ERR_NO_ERROR, - &result, request[i]); - result.offset += r; - rd_kafka_buf_destroy(request[i]); - } - - retry_msg_cnt = 0; - RD_UT_ASSERT(rd_kafka_msgq_len(&rktp->rktp_msgq) == retry_msg_cnt, - "batch %d: expected %d messages in rktp_xmit_msgq, not %d", - i, retry_msg_cnt, rd_kafka_msgq_len(&rktp->rktp_msgq)); - - /* - * Wait for delivery reports, they should all be successful. - */ - while ((rkev = rd_kafka_queue_poll(rkqu, 1000))) { - const rd_kafka_message_t *rkmessage; - - RD_UT_SAY("Got %s event with %d message(s)", - rd_kafka_event_name(rkev), - (int)rd_kafka_event_message_count(rkev)); - - while ((rkmessage = rd_kafka_event_message_next(rkev))) { - RD_UT_SAY(" DR for message: %s: (persistence=%d)", - rd_kafka_err2str(rkmessage->err), - rd_kafka_message_status(rkmessage)); - if (rkmessage->err) - RD_UT_WARN(" ^ Should not have failed"); - else - drcnt++; - } - rd_kafka_event_destroy(rkev); - } - - /* Should be no more messages in queues */ - r = rd_kafka_outq_len(rk); - RD_UT_ASSERT(r == 0, "expected outq to return 0, not %d", r); - - /* Verify the expected number of good delivery reports were seen */ - RD_UT_ASSERT(drcnt == msgcnt, "expected %d DRs, not %d", msgcnt, drcnt); - - rd_kafka_queue_destroy(rkqu); - rd_kafka_toppar_destroy(rktp); - rd_kafka_broker_destroy(rkb); - rd_kafka_destroy(rk); - - RD_UT_PASS(); - return 0; -} - -/** - * @brief Request/response unit tests - */ -int unittest_request(void) { - int fails = 0; - - fails += unittest_idempotent_producer(); - - return fails; -} - -/**@}*/ |