diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c | 5969 |
1 files changed, 5969 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c new file mode 100644 index 00000000..026e9332 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c @@ -0,0 +1,5969 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rdkafka_int.h" +#include "rdkafka_broker.h" +#include "rdkafka_request.h" +#include "rdkafka_topic.h" +#include "rdkafka_partition.h" +#include "rdkafka_assignor.h" +#include "rdkafka_offset.h" +#include "rdkafka_metadata.h" +#include "rdkafka_cgrp.h" +#include "rdkafka_interceptor.h" +#include "rdmap.h" + +#include "rdunittest.h" + +#include <ctype.h> +#include <stdarg.h> + +static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts, + void *arg); +static rd_kafka_error_t * +rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment); +static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg); +static rd_kafka_error_t * +rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *partitions); +static rd_kafka_error_t * +rd_kafka_cgrp_incremental_unassign(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *partitions); + +static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + rd_kafka_q_cb_type_t cb_type, + void *opaque); + +static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg, + const char *reason); + +static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg); + +static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg, + rd_bool_t assignment_lost, + rd_bool_t initiating, + const char *reason); +static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg, + rd_bool_t assignment_lost, + rd_bool_t initiating, + const char *reason); + +static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg); + +static void +rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts, + void *arg); +static rd_kafka_resp_err_t +rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *rktparlist); + +static void rd_kafka_cgrp_group_assignment_set( + rd_kafka_cgrp_t *rkcg, + const rd_kafka_topic_partition_list_t *partitions); +static void rd_kafka_cgrp_group_assignment_modify( + rd_kafka_cgrp_t *rkcg, + rd_bool_t add, + const rd_kafka_topic_partition_list_t *partitions); + +static void +rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment); + + +/** + * @returns true if the current assignment is lost. + */ +rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg) { + return rd_atomic32_get(&rkcg->rkcg_assignment_lost) != 0; +} + + +/** + * @brief Call when the current assignment has been lost, with a + * human-readable reason. + */ +static void rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg, + char *fmt, + ...) RD_FORMAT(printf, 2, 3); +static void +rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) { + va_list ap; + char reason[256]; + + if (!rkcg->rkcg_group_assignment) + return; + + va_start(ap, fmt); + rd_vsnprintf(reason, sizeof(reason), fmt, ap); + va_end(ap); + + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST", + "Group \"%s\": " + "current assignment of %d partition(s) lost: %s", + rkcg->rkcg_group_id->str, rkcg->rkcg_group_assignment->cnt, + reason); + + rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_true); +} + + +/** + * @brief Call when the current assignment is no longer considered lost, with a + * human-readable reason. + */ +static void +rd_kafka_cgrp_assignment_clear_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) { + va_list ap; + char reason[256]; + + if (!rd_atomic32_get(&rkcg->rkcg_assignment_lost)) + return; + + va_start(ap, fmt); + rd_vsnprintf(reason, sizeof(reason), fmt, ap); + va_end(ap); + + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST", + "Group \"%s\": " + "current assignment no longer considered lost: %s", + rkcg->rkcg_group_id->str, reason); + + rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_false); +} + + +/** + * @brief The rebalance protocol currently in use. This will be + * RD_KAFKA_REBALANCE_PROTOCOL_NONE if the consumer has not + * (yet) joined a group, else it will match the rebalance + * protocol of the configured assignor(s). + * + * @locality main thread + */ +rd_kafka_rebalance_protocol_t +rd_kafka_cgrp_rebalance_protocol(rd_kafka_cgrp_t *rkcg) { + if (!rkcg->rkcg_assignor) + return RD_KAFKA_REBALANCE_PROTOCOL_NONE; + return rkcg->rkcg_assignor->rkas_protocol; +} + + + +/** + * @returns true if the cgrp is awaiting a protocol response. This prohibits + * the join-state machine to proceed before the current state + * is done. + */ +static rd_bool_t rd_kafka_cgrp_awaiting_response(rd_kafka_cgrp_t *rkcg) { + return rkcg->rkcg_wait_resp != -1; +} + + +/** + * @brief Set flag indicating we are waiting for a coordinator response + * for the given request. + * + * This is used for specific requests to postpone rejoining the group if + * there are outstanding JoinGroup or SyncGroup requests. + * + * @locality main thread + */ +static void rd_kafka_cgrp_set_wait_resp(rd_kafka_cgrp_t *rkcg, int16_t ApiKey) { + rd_assert(rkcg->rkcg_wait_resp == -1); + rkcg->rkcg_wait_resp = ApiKey; +} + +/** + * @brief Clear the flag that says we're waiting for a coordinator response + * for the given \p request. + * + * @param request Original request, possibly NULL (for errors). + * + * @locality main thread + */ +static void rd_kafka_cgrp_clear_wait_resp(rd_kafka_cgrp_t *rkcg, + int16_t ApiKey) { + rd_assert(rkcg->rkcg_wait_resp == ApiKey); + rkcg->rkcg_wait_resp = -1; +} + + + +/** + * @struct Auxillary glue type used for COOPERATIVE rebalance set operations. + */ +typedef struct PartitionMemberInfo_s { + const rd_kafka_group_member_t *member; + rd_bool_t members_match; +} PartitionMemberInfo_t; + +static PartitionMemberInfo_t * +PartitionMemberInfo_new(const rd_kafka_group_member_t *member, + rd_bool_t members_match) { + PartitionMemberInfo_t *pmi; + + pmi = rd_calloc(1, sizeof(*pmi)); + pmi->member = member; + pmi->members_match = members_match; + + return pmi; +} + +static void PartitionMemberInfo_free(void *p) { + PartitionMemberInfo_t *pmi = p; + rd_free(pmi); +} + +typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *, + PartitionMemberInfo_t *) map_toppar_member_info_t; + + +/** + * @returns true if consumer has joined the group and thus requires a leave. + */ +#define RD_KAFKA_CGRP_HAS_JOINED(rkcg) \ + (rkcg->rkcg_member_id != NULL && \ + RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0) + + +/** + * @returns true if cgrp is waiting for a rebalance_cb to be handled by + * the application. + */ +#define RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) \ + ((rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) + +/** + * @returns true if a rebalance is in progress. + * + * 1. In WAIT_JOIN or WAIT_METADATA state with a member-id set, + * this happens on rejoin. + * 2. In WAIT_SYNC waiting for the group to rebalance on the broker. + * 3. in *_WAIT_UNASSIGN_TO_COMPLETE waiting for unassigned partitions to + * stop fetching, et.al. + * 4. In _WAIT_*ASSIGN_CALL waiting for the application to handle the + * assignment changes in its rebalance callback and then call *assign(). + * 5. An incremental rebalancing is in progress. + * 6. A rebalance-induced rejoin is in progress. + */ +#define RD_KAFKA_CGRP_REBALANCING(rkcg) \ + ((RD_KAFKA_CGRP_HAS_JOINED(rkcg) && \ + ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) || \ + (rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \ + (rkcg)->rkcg_join_state == \ + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL || \ + (rkcg)->rkcg_rebalance_incr_assignment != NULL || \ + (rkcg)->rkcg_rebalance_rejoin) + + + +const char *rd_kafka_cgrp_state_names[] = { + "init", "term", "query-coord", + "wait-coord", "wait-broker", "wait-broker-transport", + "up"}; + +const char *rd_kafka_cgrp_join_state_names[] = { + "init", + "wait-join", + "wait-metadata", + "wait-sync", + "wait-assign-call", + "wait-unassign-call", + "wait-unassign-to-complete", + "wait-incr-unassign-to-complete", + "steady", +}; + + +/** + * @brief Change the cgrp state. + * + * @returns 1 if the state was changed, else 0. + */ +static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) { + if ((int)rkcg->rkcg_state == state) + return 0; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE", + "Group \"%.*s\" changed state %s -> %s " + "(join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_state_names[state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + rkcg->rkcg_state = state; + rkcg->rkcg_ts_statechange = rd_clock(); + + rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk); + + return 1; +} + + +void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) { + if ((int)rkcg->rkcg_join_state == join_state) + return; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE", + "Group \"%.*s\" changed join state %s -> %s " + "(state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rd_kafka_cgrp_join_state_names[join_state], + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + rkcg->rkcg_join_state = join_state; +} + + +void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) { + rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription); + rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members); + rd_kafka_cgrp_set_member_id(rkcg, NULL); + if (rkcg->rkcg_group_instance_id) + rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id); + + rd_kafka_q_destroy_owner(rkcg->rkcg_q); + rd_kafka_q_destroy_owner(rkcg->rkcg_ops); + rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q); + rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics)); + rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars)); + rd_list_destroy(&rkcg->rkcg_toppars); + rd_list_destroy(rkcg->rkcg_subscribed_topics); + rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); + if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb) + rkcg->rkcg_assignor->rkas_destroy_state_cb( + rkcg->rkcg_assignor_state); + rd_free(rkcg); +} + + + +/** + * @brief Update the absolute session timeout following a successfull + * response from the coordinator. + * This timeout is used to enforce the session timeout in the + * consumer itself. + * + * @param reset if true the timeout is updated even if the session has expired. + */ +static RD_INLINE void +rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t *rkcg, rd_bool_t reset) { + if (reset || rkcg->rkcg_ts_session_timeout != 0) + rkcg->rkcg_ts_session_timeout = + rd_clock() + + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000); +} + + + +rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, + const rd_kafkap_str_t *group_id, + const rd_kafkap_str_t *client_id) { + rd_kafka_cgrp_t *rkcg; + + rkcg = rd_calloc(1, sizeof(*rkcg)); + + rkcg->rkcg_rk = rk; + rkcg->rkcg_group_id = group_id; + rkcg->rkcg_client_id = client_id; + rkcg->rkcg_coord_id = -1; + rkcg->rkcg_generation_id = -1; + rkcg->rkcg_wait_resp = -1; + + rkcg->rkcg_ops = rd_kafka_q_new(rk); + rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve; + rkcg->rkcg_ops->rkq_opaque = rkcg; + rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk); + rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve; + rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque; + rkcg->rkcg_q = rd_kafka_q_new(rk); + rkcg->rkcg_group_instance_id = + rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1); + + TAILQ_INIT(&rkcg->rkcg_topics); + rd_list_init(&rkcg->rkcg_toppars, 32, NULL); + rd_kafka_cgrp_set_member_id(rkcg, ""); + rkcg->rkcg_subscribed_topics = + rd_list_new(0, (void *)rd_kafka_topic_info_destroy); + rd_interval_init(&rkcg->rkcg_coord_query_intvl); + rd_interval_init(&rkcg->rkcg_heartbeat_intvl); + rd_interval_init(&rkcg->rkcg_join_intvl); + rd_interval_init(&rkcg->rkcg_timeout_scan_intvl); + rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false); + rd_atomic32_init(&rkcg->rkcg_terminated, rd_false); + + rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0); + + /* Create a logical group coordinator broker to provide + * a dedicated connection for group coordination. + * This is needed since JoinGroup may block for up to + * max.poll.interval.ms, effectively blocking and timing out + * any other protocol requests (such as Metadata). + * The address for this broker will be updated when + * the group coordinator is assigned. */ + rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator"); + + if (rk->rk_conf.enable_auto_commit && + rk->rk_conf.auto_commit_interval_ms > 0) + rd_kafka_timer_start( + &rk->rk_timers, &rkcg->rkcg_offset_commit_tmr, + rk->rk_conf.auto_commit_interval_ms * 1000ll, + rd_kafka_cgrp_offset_commit_tmr_cb, rkcg); + + return rkcg; +} + + +/** + * @brief Set the group coordinator broker. + */ +static void rd_kafka_cgrp_coord_set_broker(rd_kafka_cgrp_t *rkcg, + rd_kafka_broker_t *rkb) { + + rd_assert(rkcg->rkcg_curr_coord == NULL); + + rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb)); + + rkcg->rkcg_curr_coord = rkb; + rd_kafka_broker_keep(rkb); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET", + "Group \"%.*s\" coordinator set to broker %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_broker_name(rkb)); + + /* Reset query interval to trigger an immediate + * coord query if required */ + if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl)) + rd_interval_reset(&rkcg->rkcg_coord_query_intvl); + + rd_kafka_cgrp_set_state(rkcg, + RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); + + rd_kafka_broker_persistent_connection_add( + rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord); + + /* Set the logical coordinator's nodename to the + * proper broker's nodename, this will trigger a (re)connect + * to the new address. */ + rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb); +} + + +/** + * @brief Reset/clear the group coordinator broker. + */ +static void rd_kafka_cgrp_coord_clear_broker(rd_kafka_cgrp_t *rkcg) { + rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord; + + rd_assert(rkcg->rkcg_curr_coord); + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR", + "Group \"%.*s\" broker %s is no longer coordinator", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_broker_name(rkb)); + + rd_assert(rkcg->rkcg_coord); + + rd_kafka_broker_persistent_connection_del( + rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord); + + /* Clear the ephemeral broker's nodename. + * This will also trigger a disconnect. */ + rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL); + + rkcg->rkcg_curr_coord = NULL; + rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */ +} + + +/** + * @brief Update/set the group coordinator. + * + * Will do nothing if there's been no change. + * + * @returns 1 if the coordinator, or state, was updated, else 0. + */ +static int rd_kafka_cgrp_coord_update(rd_kafka_cgrp_t *rkcg, int32_t coord_id) { + + /* Don't do anything while terminating */ + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM) + return 0; + + /* Check if coordinator changed */ + if (rkcg->rkcg_coord_id != coord_id) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD", + "Group \"%.*s\" changing coordinator %" PRId32 + " -> %" PRId32, + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rkcg->rkcg_coord_id, coord_id); + + /* Update coord id */ + rkcg->rkcg_coord_id = coord_id; + + /* Clear previous broker handle, if any */ + if (rkcg->rkcg_curr_coord) + rd_kafka_cgrp_coord_clear_broker(rkcg); + } + + + if (rkcg->rkcg_curr_coord) { + /* There is already a known coordinator and a + * corresponding broker handle. */ + if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) + return rd_kafka_cgrp_set_state( + rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); + + } else if (rkcg->rkcg_coord_id != -1) { + rd_kafka_broker_t *rkb; + + /* Try to find the coordinator broker handle */ + rd_kafka_rdlock(rkcg->rkcg_rk); + rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id); + rd_kafka_rdunlock(rkcg->rkcg_rk); + + /* It is possible, due to stale metadata, that the + * coordinator id points to a broker we still don't know + * about. In this case the client will continue + * querying metadata and querying for the coordinator + * until a match is found. */ + + if (rkb) { + /* Coordinator is known and broker handle exists */ + rd_kafka_cgrp_coord_set_broker(rkcg, rkb); + rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/ + + return 1; + } else { + /* Coordinator is known but no corresponding + * broker handle. */ + return rd_kafka_cgrp_set_state( + rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER); + } + + } else { + /* Coordinator still not known, re-query */ + if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD) + return rd_kafka_cgrp_set_state( + rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + } + + return 0; /* no change */ +} + + + +/** + * Handle FindCoordinator response + */ +static void rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + int32_t CoordId; + rd_kafkap_str_t CoordHost = RD_ZERO_INIT; + int32_t CoordPort; + rd_kafka_cgrp_t *rkcg = opaque; + struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT; + char *errstr = NULL; + int actions; + + if (likely(!(ErrorCode = err))) { + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { + rd_kafkap_str_t ErrorMsg; + + rd_kafka_buf_read_str(rkbuf, &ErrorMsg); + + if (!RD_KAFKAP_STR_IS_NULL(&ErrorMsg)) + RD_KAFKAP_STR_DUPA(&errstr, &ErrorMsg); + } + + rd_kafka_buf_read_i32(rkbuf, &CoordId); + rd_kafka_buf_read_str(rkbuf, &CoordHost); + rd_kafka_buf_read_i32(rkbuf, &CoordPort); + } + + if (ErrorCode) + goto err; + + + mdb.id = CoordId; + RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost); + mdb.port = CoordPort; + + rd_rkb_dbg(rkb, CGRP, "CGRPCOORD", + "Group \"%.*s\" coordinator is %s:%i id %" PRId32, + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), mdb.host, mdb.port, + mdb.id); + rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb, NULL); + + rd_kafka_cgrp_coord_update(rkcg, CoordId); + rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */ + return; + +err_parse: /* Parse error */ + ErrorCode = rkbuf->rkbuf_err; + /* FALLTHRU */ + +err: + if (!errstr) + errstr = (char *)rd_kafka_err2str(ErrorCode); + + rd_rkb_dbg(rkb, CGRP, "CGRPCOORD", + "Group \"%.*s\" FindCoordinator response error: %s: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_err2name(ErrorCode), errstr); + + if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) + return; + + actions = rd_kafka_err_action( + rkb, ErrorCode, request, + + RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH, + RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT, + + RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, + + RD_KAFKA_ERR_ACTION_END); + + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + rd_kafka_cgrp_coord_update(rkcg, -1); + } else { + if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) && + rkcg->rkcg_last_err != ErrorCode) { + /* Propagate non-retriable errors to the application */ + rd_kafka_consumer_err( + rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0, + NULL, NULL, RD_KAFKA_OFFSET_INVALID, + "FindCoordinator response error: %s", errstr); + + /* Suppress repeated errors */ + rkcg->rkcg_last_err = ErrorCode; + } + + /* Retries are performed by the timer-intervalled + * coord queries, continue querying */ + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + } + + rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */ +} + + +/** + * Query for coordinator. + * Ask any broker in state UP + * + * Locality: main thread + */ +void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) { + rd_kafka_broker_t *rkb; + rd_kafka_resp_err_t err; + + rkb = rd_kafka_broker_any_usable( + rkcg->rkcg_rk, RD_POLL_NOWAIT, RD_DO_LOCK, + RD_KAFKA_FEATURE_BROKER_GROUP_COORD, "coordinator query"); + + if (!rkb) { + /* Reset the interval because there were no brokers. When a + * broker becomes available, we want to query it immediately. */ + rd_interval_reset(&rkcg->rkcg_coord_query_intvl); + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY", + "Group \"%.*s\": " + "no broker available for coordinator query: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); + return; + } + + rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", + "Group \"%.*s\": querying for coordinator: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); + + err = rd_kafka_FindCoordinatorRequest( + rkb, RD_KAFKA_COORD_GROUP, rkcg->rkcg_group_id->str, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_FindCoordinator, rkcg); + + if (err) { + rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", + "Group \"%.*s\": " + "unable to send coordinator query: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_err2str(err)); + rd_kafka_broker_destroy(rkb); + return; + } + + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD); + + rd_kafka_broker_destroy(rkb); + + /* Back off the next intervalled query since we just sent one. */ + rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0); +} + +/** + * @brief Mark the current coordinator as dead. + * + * @locality main thread + */ +void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg, + rd_kafka_resp_err_t err, + const char *reason) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD", + "Group \"%.*s\": " + "marking the coordinator (%" PRId32 ") dead: %s: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id, + rd_kafka_err2str(err), reason); + + rd_kafka_cgrp_coord_update(rkcg, -1); + + /* Re-query for coordinator */ + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + rd_kafka_cgrp_coord_query(rkcg, reason); +} + + +/** + * @returns a new reference to the current coordinator, if available, else NULL. + * + * @locality rdkafka main thread + * @locks_required none + * @locks_acquired none + */ +rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg) { + if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_coord) + return NULL; + + rd_kafka_broker_keep(rkcg->rkcg_coord); + + return rkcg->rkcg_coord; +} + + +/** + * @brief cgrp handling of LeaveGroup responses + * @param opaque must be the cgrp handle. + * @locality rdkafka main thread (unless err==ERR__DESTROY) + */ +static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = opaque; + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + + if (err) { + ErrorCode = err; + goto err; + } + + if (request->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + +err: + if (ErrorCode) + rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", + "LeaveGroup response error in state %s: %s", + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_err2str(ErrorCode)); + else + rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", + "LeaveGroup response received in state %s", + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + + if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) { + rd_assert(thrd_is_current(rk->rk_thread)); + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; + rd_kafka_cgrp_try_terminate(rkcg); + } + + + + return; + +err_parse: + ErrorCode = rkbuf->rkbuf_err; + goto err; +} + + +static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { + char *member_id; + + RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); + + /* Leaving the group invalidates the member id, reset it + * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */ + rd_kafka_cgrp_set_member_id(rkcg, ""); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", + "Group \"%.*s\": leave (in state %s): " + "LeaveGroupRequest already in-transit", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + return; + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", + "Group \"%.*s\": leave (in state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; + + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { + rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", + "Leaving group"); + rd_kafka_LeaveGroupRequest( + rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_LeaveGroup, rkcg); + } else + rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord, + RD_KAFKA_RESP_ERR__WAIT_COORD, + NULL, NULL, rkcg); +} + + +/** + * @brief Leave group, if desired. + * + * @returns true if a LeaveGroup was issued, else false. + */ +static rd_bool_t rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t *rkcg) { + + /* We were not instructed to leave in the first place. */ + if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE)) + return rd_false; + + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; + + /* Don't send Leave when termating with NO_CONSUMER_CLOSE flag */ + if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) + return rd_false; + + /* KIP-345: Static group members must not send a LeaveGroupRequest + * on termination. */ + if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) && + rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) + return rd_false; + + rd_kafka_cgrp_leave(rkcg); + + return rd_true; +} + + +/** + * @brief Enqueues a rebalance op, delegating responsibility of calling + * incremental_assign / incremental_unassign to the application. + * If there is no rebalance handler configured, or the action + * should not be delegated to the application for some other + * reason, incremental_assign / incremental_unassign will be called + * automatically, immediately. + * + * @param rejoin whether or not to rejoin the group following completion + * of the incremental assign / unassign. + * + * @remarks does not take ownership of \p partitions. + */ +void rd_kafka_rebalance_op_incr(rd_kafka_cgrp_t *rkcg, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *partitions, + rd_bool_t rejoin, + const char *reason) { + rd_kafka_error_t *error; + + /* Flag to rejoin after completion of the incr_assign or incr_unassign, + if required. */ + rkcg->rkcg_rebalance_rejoin = rejoin; + + rd_kafka_wrlock(rkcg->rkcg_rk); + rkcg->rkcg_c.ts_rebalance = rd_clock(); + rkcg->rkcg_c.rebalance_cnt++; + rd_kafka_wrunlock(rkcg->rkcg_rk); + + if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) || + rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { + /* Total unconditional unassign in these cases */ + rd_kafka_cgrp_unassign(rkcg); + + /* Now serve the assignment to make updates */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + goto done; + } + + rd_kafka_cgrp_set_join_state( + rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS + ? RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL + : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL); + + /* Schedule application rebalance callback/event if enabled */ + if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) { + rd_kafka_op_t *rko; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", + "Group \"%s\": delegating incremental %s of %d " + "partition(s) to application on queue %s: %s", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "revoke" + : "assign", + partitions->cnt, + rd_kafka_q_dest_name(rkcg->rkcg_q), reason); + + /* Pause currently assigned partitions while waiting for + * rebalance callback to get called to make sure the + * application will not receive any more messages that + * might block it from serving the rebalance callback + * and to not process messages for partitions it + * might have lost in the rebalance. */ + rd_kafka_assignment_pause(rkcg->rkcg_rk, + "incremental rebalance"); + + rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE); + rko->rko_err = err; + rko->rko_u.rebalance.partitions = + rd_kafka_topic_partition_list_copy(partitions); + + if (rd_kafka_q_enq(rkcg->rkcg_q, rko)) + goto done; /* Rebalance op successfully enqueued */ + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", + "Group \"%s\": ops queue is disabled, not " + "delegating partition %s to application", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "unassign" + : "assign"); + /* FALLTHRU */ + } + + /* No application rebalance callback/event handler, or it is not + * available, do the assign/unassign ourselves. + * We need to be careful here not to trigger assignment_serve() + * since it may call into the cgrp code again, in which case we + * can't really track what the outcome state will be. */ + + if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) + error = rd_kafka_cgrp_incremental_assign(rkcg, partitions); + else + error = rd_kafka_cgrp_incremental_unassign(rkcg, partitions); + + if (error) { + rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", + "Group \"%s\": internal incremental %s " + "of %d partition(s) failed: %s: " + "unassigning all partitions and rejoining", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "unassign" + : "assign", + partitions->cnt, rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + + rd_kafka_cgrp_set_join_state(rkcg, + /* This is a clean state for + * assignment_done() to rejoin + * from. */ + RD_KAFKA_CGRP_JOIN_STATE_STEADY); + rd_kafka_assignment_clear(rkcg->rkcg_rk); + } + + /* Now serve the assignment to make updates */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + +done: + /* Update the current group assignment based on the + * added/removed partitions. */ + rd_kafka_cgrp_group_assignment_modify( + rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, partitions); +} + + +/** + * @brief Enqueues a rebalance op, delegating responsibility of calling + * assign / unassign to the application. If there is no rebalance + * handler configured, or the action should not be delegated to the + * application for some other reason, assign / unassign will be + * called automatically. + * + * @remarks \p partitions is copied. + */ +static void rd_kafka_rebalance_op(rd_kafka_cgrp_t *rkcg, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *assignment, + const char *reason) { + rd_kafka_error_t *error; + + rd_kafka_wrlock(rkcg->rkcg_rk); + rkcg->rkcg_c.ts_rebalance = rd_clock(); + rkcg->rkcg_c.rebalance_cnt++; + rd_kafka_wrunlock(rkcg->rkcg_rk); + + if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) || + rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { + /* Unassign */ + rd_kafka_cgrp_unassign(rkcg); + + /* Now serve the assignment to make updates */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + goto done; + } + + rd_assert(assignment != NULL); + + rd_kafka_cgrp_set_join_state( + rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS + ? RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL + : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL); + + /* Schedule application rebalance callback/event if enabled */ + if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) { + rd_kafka_op_t *rko; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", + "Group \"%s\": delegating %s of %d partition(s) " + "to application on queue %s: %s", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "revoke" + : "assign", + assignment->cnt, + rd_kafka_q_dest_name(rkcg->rkcg_q), reason); + + /* Pause currently assigned partitions while waiting for + * rebalance callback to get called to make sure the + * application will not receive any more messages that + * might block it from serving the rebalance callback + * and to not process messages for partitions it + * might have lost in the rebalance. */ + rd_kafka_assignment_pause(rkcg->rkcg_rk, "rebalance"); + + rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE); + rko->rko_err = err; + rko->rko_u.rebalance.partitions = + rd_kafka_topic_partition_list_copy(assignment); + + if (rd_kafka_q_enq(rkcg->rkcg_q, rko)) + goto done; /* Rebalance op successfully enqueued */ + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", + "Group \"%s\": ops queue is disabled, not " + "delegating partition %s to application", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "unassign" + : "assign"); + + /* FALLTHRU */ + } + + /* No application rebalance callback/event handler, or it is not + * available, do the assign/unassign ourselves. + * We need to be careful here not to trigger assignment_serve() + * since it may call into the cgrp code again, in which case we + * can't really track what the outcome state will be. */ + + if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) + error = rd_kafka_cgrp_assign(rkcg, assignment); + else + error = rd_kafka_cgrp_unassign(rkcg); + + if (error) { + rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", + "Group \"%s\": internal %s " + "of %d partition(s) failed: %s: " + "unassigning all partitions and rejoining", + rkcg->rkcg_group_id->str, + err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS + ? "unassign" + : "assign", + rkcg->rkcg_group_assignment->cnt, + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + + rd_kafka_cgrp_set_join_state(rkcg, + /* This is a clean state for + * assignment_done() to rejoin + * from. */ + RD_KAFKA_CGRP_JOIN_STATE_STEADY); + rd_kafka_assignment_clear(rkcg->rkcg_rk); + } + + /* Now serve the assignment to make updates */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + +done: + if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) + rd_kafka_cgrp_group_assignment_set(rkcg, assignment); + else + rd_kafka_cgrp_group_assignment_set(rkcg, NULL); +} + + +/** + * @brief Rejoin the group. + * + * @remark This function must not have any side-effects but setting the + * join state. + */ +static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) + RD_FORMAT(printf, 2, 3); + +static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) { + char reason[512]; + va_list ap; + char astr[128]; + + va_start(ap, fmt); + rd_vsnprintf(reason, sizeof(reason), fmt, ap); + va_end(ap); + + if (rkcg->rkcg_group_assignment) + rd_snprintf(astr, sizeof(astr), " with %d owned partition(s)", + rkcg->rkcg_group_assignment->cnt); + else + rd_snprintf(astr, sizeof(astr), " without an assignment"); + + if (rkcg->rkcg_subscription || rkcg->rkcg_next_subscription) { + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REJOIN", + "Group \"%s\": %s group%s: %s", rkcg->rkcg_group_id->str, + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT + ? "Joining" + : "Rejoining", + astr, reason); + } else { + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "NOREJOIN", + "Group \"%s\": Not %s group%s: %s: " + "no subscribed topics", + rkcg->rkcg_group_id->str, + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT + ? "joining" + : "rejoining", + astr, reason); + + rd_kafka_cgrp_leave_maybe(rkcg); + } + + rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT); +} + + +/** + * @brief Collect all assigned or owned partitions from group members. + * The member field of each result element is set to the associated + * group member. The members_match field is set to rd_false. + * + * @param members Array of group members. + * @param member_cnt Number of elements in members. + * @param par_cnt The total number of partitions expected to be collected. + * @param collect_owned If rd_true, rkgm_owned partitions will be collected, + * else rkgm_assignment partitions will be collected. + */ +static map_toppar_member_info_t * +rd_kafka_collect_partitions(const rd_kafka_group_member_t *members, + size_t member_cnt, + size_t par_cnt, + rd_bool_t collect_owned) { + size_t i; + map_toppar_member_info_t *collected = rd_calloc(1, sizeof(*collected)); + + RD_MAP_INIT(collected, par_cnt, rd_kafka_topic_partition_cmp, + rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, + PartitionMemberInfo_free); + + for (i = 0; i < member_cnt; i++) { + size_t j; + const rd_kafka_group_member_t *rkgm = &members[i]; + const rd_kafka_topic_partition_list_t *toppars = + collect_owned ? rkgm->rkgm_owned : rkgm->rkgm_assignment; + + for (j = 0; j < (size_t)toppars->cnt; j++) { + rd_kafka_topic_partition_t *rktpar = + rd_kafka_topic_partition_copy(&toppars->elems[j]); + PartitionMemberInfo_t *pmi = + PartitionMemberInfo_new(rkgm, rd_false); + RD_MAP_SET(collected, rktpar, pmi); + } + } + + return collected; +} + + +/** + * @brief Set intersection. Returns a set of all elements of \p a that + * are also elements of \p b. Additionally, compares the members + * field of matching elements from \p a and \p b and if not NULL + * and equal, sets the members_match field in the result element + * to rd_true and the member field to equal that of the elements, + * else sets the members_match field to rd_false and member field + * to NULL. + */ +static map_toppar_member_info_t * +rd_kafka_member_partitions_intersect(map_toppar_member_info_t *a, + map_toppar_member_info_t *b) { + const rd_kafka_topic_partition_t *key; + const PartitionMemberInfo_t *a_v; + map_toppar_member_info_t *intersection = + rd_calloc(1, sizeof(*intersection)); + + RD_MAP_INIT( + intersection, RD_MIN(a ? RD_MAP_CNT(a) : 1, b ? RD_MAP_CNT(b) : 1), + rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + if (!a || !b) + return intersection; + + RD_MAP_FOREACH(key, a_v, a) { + rd_bool_t members_match; + const PartitionMemberInfo_t *b_v = RD_MAP_GET(b, key); + + if (b_v == NULL) + continue; + + members_match = + a_v->member && b_v->member && + rd_kafka_group_member_cmp(a_v->member, b_v->member) == 0; + + RD_MAP_SET(intersection, rd_kafka_topic_partition_copy(key), + PartitionMemberInfo_new(b_v->member, members_match)); + } + + return intersection; +} + + +/** + * @brief Set subtraction. Returns a set of all elements of \p a + * that are not elements of \p b. Sets the member field in + * elements in the returned set to equal that of the + * corresponding element in \p a + */ +static map_toppar_member_info_t * +rd_kafka_member_partitions_subtract(map_toppar_member_info_t *a, + map_toppar_member_info_t *b) { + const rd_kafka_topic_partition_t *key; + const PartitionMemberInfo_t *a_v; + map_toppar_member_info_t *difference = + rd_calloc(1, sizeof(*difference)); + + RD_MAP_INIT(difference, a ? RD_MAP_CNT(a) : 1, + rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, + PartitionMemberInfo_free); + + if (!a) + return difference; + + RD_MAP_FOREACH(key, a_v, a) { + const PartitionMemberInfo_t *b_v = + b ? RD_MAP_GET(b, key) : NULL; + + if (!b_v) + RD_MAP_SET( + difference, rd_kafka_topic_partition_copy(key), + PartitionMemberInfo_new(a_v->member, rd_false)); + } + + return difference; +} + + +/** + * @brief Adjust the partition assignment as provided by the assignor + * according to the COOPERATIVE protocol. + */ +static void rd_kafka_cooperative_protocol_adjust_assignment( + rd_kafka_cgrp_t *rkcg, + rd_kafka_group_member_t *members, + int member_cnt) { + + /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\ + a+Consumer+Incremental+Rebalance+Protocol */ + + int i; + int expected_max_assignment_size; + int total_assigned = 0; + int not_revoking = 0; + size_t par_cnt = 0; + const rd_kafka_topic_partition_t *toppar; + const PartitionMemberInfo_t *pmi; + map_toppar_member_info_t *assigned; + map_toppar_member_info_t *owned; + map_toppar_member_info_t *maybe_revoking; + map_toppar_member_info_t *ready_to_migrate; + map_toppar_member_info_t *unknown_but_owned; + + for (i = 0; i < member_cnt; i++) + par_cnt += members[i].rkgm_owned->cnt; + + assigned = rd_kafka_collect_partitions(members, member_cnt, par_cnt, + rd_false /*assigned*/); + + owned = rd_kafka_collect_partitions(members, member_cnt, par_cnt, + rd_true /*owned*/); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", + "Group \"%s\": Partitions owned by members: %d, " + "partitions assigned by assignor: %d", + rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(owned), + (int)RD_MAP_CNT(assigned)); + + /* Still owned by some members */ + maybe_revoking = rd_kafka_member_partitions_intersect(assigned, owned); + + /* Not previously owned by anyone */ + ready_to_migrate = rd_kafka_member_partitions_subtract(assigned, owned); + + /* Don't exist in assigned partitions */ + unknown_but_owned = + rd_kafka_member_partitions_subtract(owned, assigned); + + /* Rough guess at a size that is a bit higher than + * the maximum number of partitions likely to be + * assigned to any partition. */ + expected_max_assignment_size = + (int)(RD_MAP_CNT(assigned) / member_cnt) + 4; + + for (i = 0; i < member_cnt; i++) { + rd_kafka_group_member_t *rkgm = &members[i]; + rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment); + + rkgm->rkgm_assignment = rd_kafka_topic_partition_list_new( + expected_max_assignment_size); + } + + /* For maybe-revoking-partitions, check if the owner has + * changed. If yes, exclude them from the assigned-partitions + * list to the new owner. The old owner will realize it does + * not own it any more, revoke it and then trigger another + * rebalance for these partitions to finally be reassigned. + */ + RD_MAP_FOREACH(toppar, pmi, maybe_revoking) { + if (!pmi->members_match) + /* Owner has changed. */ + continue; + + /* Owner hasn't changed. */ + rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, + toppar->topic, + toppar->partition); + + total_assigned++; + not_revoking++; + } + + /* For ready-to-migrate-partitions, it is safe to move them + * to the new member immediately since we know no one owns + * it before, and hence we can encode the owner from the + * newly-assigned-partitions directly. + */ + RD_MAP_FOREACH(toppar, pmi, ready_to_migrate) { + rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, + toppar->topic, + toppar->partition); + total_assigned++; + } + + /* For unknown-but-owned-partitions, it is also safe to just + * give them back to whoever claimed to be their owners by + * encoding them directly as well. If this is due to a topic + * metadata update, then a later rebalance will be triggered + * anyway. + */ + RD_MAP_FOREACH(toppar, pmi, unknown_but_owned) { + rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, + toppar->topic, + toppar->partition); + total_assigned++; + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", + "Group \"%s\": COOPERATIVE protocol collection sizes: " + "maybe revoking: %d, ready to migrate: %d, unknown but " + "owned: %d", + rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(maybe_revoking), + (int)RD_MAP_CNT(ready_to_migrate), + (int)RD_MAP_CNT(unknown_but_owned)); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", + "Group \"%s\": %d partitions assigned to consumers", + rkcg->rkcg_group_id->str, total_assigned); + + RD_MAP_DESTROY_AND_FREE(maybe_revoking); + RD_MAP_DESTROY_AND_FREE(ready_to_migrate); + RD_MAP_DESTROY_AND_FREE(unknown_but_owned); + RD_MAP_DESTROY_AND_FREE(assigned); + RD_MAP_DESTROY_AND_FREE(owned); +} + + +/** + * @brief Parses and handles the MemberState from a SyncGroupResponse. + */ +static void rd_kafka_cgrp_handle_SyncGroup_memberstate( + rd_kafka_cgrp_t *rkcg, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + const rd_kafkap_bytes_t *member_state) { + rd_kafka_buf_t *rkbuf = NULL; + rd_kafka_topic_partition_list_t *assignment = NULL; + const int log_decode_errors = LOG_ERR; + int16_t Version; + rd_kafkap_bytes_t UserData; + + /* Dont handle new assignments when terminating */ + if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) + err = RD_KAFKA_RESP_ERR__DESTROY; + + if (err) + goto err; + + if (RD_KAFKAP_BYTES_LEN(member_state) == 0) { + /* Empty assignment. */ + assignment = rd_kafka_topic_partition_list_new(0); + memset(&UserData, 0, sizeof(UserData)); + goto done; + } + + /* Parse assignment from MemberState */ + rkbuf = rd_kafka_buf_new_shadow( + member_state->data, RD_KAFKAP_BYTES_LEN(member_state), NULL); + /* Protocol parser needs a broker handle to log errors on. */ + if (rkb) { + rkbuf->rkbuf_rkb = rkb; + rd_kafka_broker_keep(rkb); + } else + rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk); + + rd_kafka_buf_read_i16(rkbuf, &Version); + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + if (!(assignment = + rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields))) + goto err_parse; + rd_kafka_buf_read_bytes(rkbuf, &UserData); + +done: + rd_kafka_cgrp_update_session_timeout(rkcg, rd_true /*reset timeout*/); + + rd_assert(rkcg->rkcg_assignor); + if (rkcg->rkcg_assignor->rkas_on_assignment_cb) { + char *member_id; + RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); + rd_kafka_consumer_group_metadata_t *cgmd = + rd_kafka_consumer_group_metadata_new_with_genid( + rkcg->rkcg_rk->rk_conf.group_id_str, + rkcg->rkcg_generation_id, member_id, + rkcg->rkcg_rk->rk_conf.group_instance_id); + rkcg->rkcg_assignor->rkas_on_assignment_cb( + rkcg->rkcg_assignor, &(rkcg->rkcg_assignor_state), + assignment, &UserData, cgmd); + rd_kafka_consumer_group_metadata_destroy(cgmd); + } + + // FIXME: Remove when we're done debugging. + rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT", + RD_KAFKA_DBG_CGRP, assignment); + + /* Set the new assignment */ + rd_kafka_cgrp_handle_assignment(rkcg, assignment); + + rd_kafka_topic_partition_list_destroy(assignment); + + if (rkbuf) + rd_kafka_buf_destroy(rkbuf); + + return; + +err_parse: + err = rkbuf->rkbuf_err; + +err: + if (rkbuf) + rd_kafka_buf_destroy(rkbuf); + + if (assignment) + rd_kafka_topic_partition_list_destroy(assignment); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC", + "Group \"%s\": synchronization failed: %s: rejoining", + rkcg->rkcg_group_id->str, rd_kafka_err2str(err)); + + if (err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) + rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, + "Fatal consumer error: %s", + rd_kafka_err2str(err)); + else if (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) + rkcg->rkcg_generation_id = -1; + else if (err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) + rd_kafka_cgrp_set_member_id(rkcg, ""); + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && + (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION || + err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)) + rd_kafka_cgrp_revoke_all_rejoin( + rkcg, rd_true /*assignment is lost*/, + rd_true /*this consumer is initiating*/, "SyncGroup error"); + else + rd_kafka_cgrp_rejoin(rkcg, "SyncGroup error: %s", + rd_kafka_err2str(err)); +} + + + +/** + * @brief Cgrp handler for SyncGroup responses. opaque must be the cgrp handle. + */ +static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = opaque; + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + rd_kafkap_bytes_t MemberState = RD_ZERO_INIT; + int actions; + + if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { + rd_kafka_dbg( + rkb->rkb_rk, CGRP, "SYNCGROUP", + "SyncGroup response: discarding outdated request " + "(now in join-state %s)", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup); + return; + } + + if (err) { + ErrorCode = err; + goto err; + } + + if (request->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_read_bytes(rkbuf, &MemberState); + +err: + actions = rd_kafka_err_action(rkb, ErrorCode, request, + RD_KAFKA_ERR_ACTION_END); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, + RD_KAFKA_OP_COORD_QUERY, ErrorCode); + /* FALLTHRU */ + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + if (rd_kafka_buf_retry(rkb, request)) + return; + /* FALLTHRU */ + } + + rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP", + "SyncGroup response: %s (%d bytes of MemberState data)", + rd_kafka_err2str(ErrorCode), + RD_KAFKAP_BYTES_LEN(&MemberState)); + + rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup); + + if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) + return; /* Termination */ + + rd_kafka_cgrp_handle_SyncGroup_memberstate(rkcg, rkb, ErrorCode, + &MemberState); + + return; + +err_parse: + ErrorCode = rkbuf->rkbuf_err; + goto err; +} + + +/** + * @brief Run group assignment. + */ +static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, + rd_kafka_assignor_t *rkas, + rd_kafka_resp_err_t err, + rd_kafka_metadata_t *metadata, + rd_kafka_group_member_t *members, + int member_cnt) { + char errstr[512]; + + if (err) { + rd_snprintf(errstr, sizeof(errstr), + "Failed to get cluster metadata: %s", + rd_kafka_err2str(err)); + goto err; + } + + *errstr = '\0'; + + /* Run assignor */ + err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt, + errstr, sizeof(errstr)); + + if (err) { + if (!*errstr) + rd_snprintf(errstr, sizeof(errstr), "%s", + rd_kafka_err2str(err)); + goto err; + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGNOR", + "Group \"%s\": \"%s\" assignor run for %d member(s)", + rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str, + member_cnt); + + if (rkas->rkas_protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) + rd_kafka_cooperative_protocol_adjust_assignment(rkcg, members, + member_cnt); + + rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC); + + rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup); + + /* Respond to broker with assignment set or error */ + rd_kafka_SyncGroupRequest( + rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, + rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, members, + err ? 0 : member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_SyncGroup, rkcg); + return; + +err: + rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR", + "Group \"%s\": failed to run assignor \"%s\" for " + "%d member(s): %s", + rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str, + member_cnt, errstr); + + rd_kafka_cgrp_rejoin(rkcg, "%s assignor failed: %s", + rkas->rkas_protocol_name->str, errstr); +} + + + +/** + * @brief Op callback from handle_JoinGroup + */ +static rd_kafka_op_res_t +rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ + + if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA) + return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */ + + if (!rkcg->rkcg_group_leader.members) { + rd_kafka_dbg(rk, CGRP, "GRPLEADER", + "Group \"%.*s\": no longer leader: " + "not running assignor", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); + return RD_KAFKA_OP_RES_HANDLED; + } + + rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err, + rko->rko_u.metadata.md, + rkcg->rkcg_group_leader.members, + rkcg->rkcg_group_leader.member_cnt); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType + * + * Protocol definition: + * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal + * + * Returns 0 on success or -1 on error. + */ +static int rd_kafka_group_MemberMetadata_consumer_read( + rd_kafka_broker_t *rkb, + rd_kafka_group_member_t *rkgm, + const rd_kafkap_bytes_t *MemberMetadata) { + + rd_kafka_buf_t *rkbuf; + int16_t Version; + int32_t subscription_cnt; + rd_kafkap_bytes_t UserData; + const int log_decode_errors = LOG_ERR; + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG; + + /* Create a shadow-buffer pointing to the metadata to ease parsing. */ + rkbuf = rd_kafka_buf_new_shadow( + MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL); + + /* Protocol parser needs a broker handle to log errors on. */ + rkbuf->rkbuf_rkb = rkb; + rd_kafka_broker_keep(rkb); + + rd_kafka_buf_read_i16(rkbuf, &Version); + rd_kafka_buf_read_i32(rkbuf, &subscription_cnt); + + if (subscription_cnt > 10000 || subscription_cnt <= 0) + goto err; + + rkgm->rkgm_subscription = + rd_kafka_topic_partition_list_new(subscription_cnt); + + while (subscription_cnt-- > 0) { + rd_kafkap_str_t Topic; + char *topic_name; + rd_kafka_buf_read_str(rkbuf, &Topic); + RD_KAFKAP_STR_DUPA(&topic_name, &Topic); + rd_kafka_topic_partition_list_add( + rkgm->rkgm_subscription, topic_name, RD_KAFKA_PARTITION_UA); + } + + rd_kafka_buf_read_bytes(rkbuf, &UserData); + rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData); + + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + if (Version >= 1 && + !(rkgm->rkgm_owned = + rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields))) + goto err; + + rd_kafka_buf_destroy(rkbuf); + + return 0; + +err_parse: + err = rkbuf->rkbuf_err; + +err: + rd_rkb_dbg(rkb, CGRP, "MEMBERMETA", + "Failed to parse MemberMetadata for \"%.*s\": %s", + RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), + rd_kafka_err2str(err)); + if (rkgm->rkgm_subscription) { + rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription); + rkgm->rkgm_subscription = NULL; + } + + rd_kafka_buf_destroy(rkbuf); + return -1; +} + + +/** + * @brief cgrp handler for JoinGroup responses + * opaque must be the cgrp handle. + * + * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread) + */ +static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = opaque; + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + int32_t GenerationId; + rd_kafkap_str_t Protocol, LeaderId; + rd_kafkap_str_t MyMemberId = RD_KAFKAP_STR_INITIALIZER; + int32_t member_cnt; + int actions; + int i_am_leader = 0; + rd_kafka_assignor_t *rkas = NULL; + + rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_JoinGroup); + + if (err == RD_KAFKA_RESP_ERR__DESTROY || + rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) + return; /* Terminating */ + + if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) { + rd_kafka_dbg( + rkb->rkb_rk, CGRP, "JOINGROUP", + "JoinGroup response: discarding outdated request " + "(now in join-state %s)", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + return; + } + + if (err) { + ErrorCode = err; + goto err; + } + + if (request->rkbuf_reqhdr.ApiVersion >= 2) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + rd_kafka_buf_read_i32(rkbuf, &GenerationId); + rd_kafka_buf_read_str(rkbuf, &Protocol); + rd_kafka_buf_read_str(rkbuf, &LeaderId); + rd_kafka_buf_read_str(rkbuf, &MyMemberId); + rd_kafka_buf_read_i32(rkbuf, &member_cnt); + + if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) { + /* Protocol not set, we will not be able to find + * a matching assignor so error out early. */ + ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG; + } else if (!ErrorCode) { + char *protocol_name; + RD_KAFKAP_STR_DUPA(&protocol_name, &Protocol); + if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk, + protocol_name)) || + !rkas->rkas_enabled) { + rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", + "Unsupported assignment strategy \"%s\"", + protocol_name); + if (rkcg->rkcg_assignor) { + if (rkcg->rkcg_assignor->rkas_destroy_state_cb) + rkcg->rkcg_assignor + ->rkas_destroy_state_cb( + rkcg->rkcg_assignor_state); + rkcg->rkcg_assignor_state = NULL; + rkcg->rkcg_assignor = NULL; + } + ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL; + } + } + + rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", + "JoinGroup response: GenerationId %" PRId32 + ", " + "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, " + "member metadata count " + "%" PRId32 ": %s", + GenerationId, RD_KAFKAP_STR_PR(&Protocol), + RD_KAFKAP_STR_PR(&LeaderId), + RD_KAFKAP_STR_LEN(&MyMemberId) && + !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) + ? " (me)" + : "", + RD_KAFKAP_STR_PR(&MyMemberId), member_cnt, + ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)"); + + if (!ErrorCode) { + char *my_member_id; + RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId); + rd_kafka_cgrp_set_member_id(rkcg, my_member_id); + rkcg->rkcg_generation_id = GenerationId; + i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId); + } else { + rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000 * 1000); + goto err; + } + + if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) { + if (rkcg->rkcg_assignor->rkas_destroy_state_cb) + rkcg->rkcg_assignor->rkas_destroy_state_cb( + rkcg->rkcg_assignor_state); + rkcg->rkcg_assignor_state = NULL; + } + rkcg->rkcg_assignor = rkas; + + if (i_am_leader) { + rd_kafka_group_member_t *members; + int i; + int sub_cnt = 0; + rd_list_t topics; + rd_kafka_op_t *rko; + rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", + "I am elected leader for group \"%s\" " + "with %" PRId32 " member(s)", + rkcg->rkcg_group_id->str, member_cnt); + + if (member_cnt > 100000) { + err = RD_KAFKA_RESP_ERR__BAD_MSG; + goto err; + } + + rd_list_init(&topics, member_cnt, rd_free); + + members = rd_calloc(member_cnt, sizeof(*members)); + + for (i = 0; i < member_cnt; i++) { + rd_kafkap_str_t MemberId; + rd_kafkap_bytes_t MemberMetadata; + rd_kafka_group_member_t *rkgm; + rd_kafkap_str_t GroupInstanceId = + RD_KAFKAP_STR_INITIALIZER; + + rd_kafka_buf_read_str(rkbuf, &MemberId); + if (request->rkbuf_reqhdr.ApiVersion >= 5) + rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); + rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata); + + rkgm = &members[sub_cnt]; + rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId); + rkgm->rkgm_group_instance_id = + rd_kafkap_str_copy(&GroupInstanceId); + rd_list_init(&rkgm->rkgm_eligible, 0, NULL); + rkgm->rkgm_generation = -1; + + if (rd_kafka_group_MemberMetadata_consumer_read( + rkb, rkgm, &MemberMetadata)) { + /* Failed to parse this member's metadata, + * ignore it. */ + } else { + sub_cnt++; + rkgm->rkgm_assignment = + rd_kafka_topic_partition_list_new( + rkgm->rkgm_subscription->cnt); + rd_kafka_topic_partition_list_get_topic_names( + rkgm->rkgm_subscription, &topics, + 0 /*dont include regex*/); + } + } + + /* FIXME: What to do if parsing failed for some/all members? + * It is a sign of incompatibility. */ + + + rd_kafka_cgrp_group_leader_reset(rkcg, + "JoinGroup response clean-up"); + + rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL); + rkcg->rkcg_group_leader.members = members; + rkcg->rkcg_group_leader.member_cnt = sub_cnt; + + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); + + /* The assignor will need metadata so fetch it asynchronously + * and run the assignor when we get a reply. + * Create a callback op that the generic metadata code + * will trigger when metadata has been parsed. */ + rko = rd_kafka_op_new_cb( + rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, + rd_kafka_cgrp_assignor_handle_Metadata_op); + rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL); + + rd_kafka_MetadataRequest( + rkb, &topics, "partition assignor", + rd_false /*!allow_auto_create*/, + /* cgrp_update=false: + * Since the subscription list may not be identical + * across all members of the group and thus the + * Metadata response may not be identical to this + * consumer's subscription list, we want to + * avoid triggering a rejoin or error propagation + * on receiving the response since some topics + * may be missing. */ + rd_false, rko); + rd_list_destroy(&topics); + + } else { + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC); + + rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup); + + rd_kafka_SyncGroupRequest( + rkb, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, + rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, NULL, 0, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_SyncGroup, rkcg); + } + +err: + actions = rd_kafka_err_action( + rkb, ErrorCode, request, RD_KAFKA_ERR_ACTION_IGNORE, + RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, + + RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED, + + RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, + + RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID, + + RD_KAFKA_ERR_ACTION_END); + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, + RD_KAFKA_OP_COORD_QUERY, ErrorCode); + } + + /* No need for retries here since the join is intervalled, + * see rkcg_join_intvl */ + + if (ErrorCode) { + if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) + return; /* Termination */ + + if (ErrorCode == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) { + rd_kafka_set_fatal_error(rkcg->rkcg_rk, ErrorCode, + "Fatal consumer error: %s", + rd_kafka_err2str(ErrorCode)); + ErrorCode = RD_KAFKA_RESP_ERR__FATAL; + + } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) + rd_kafka_consumer_err( + rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0, + NULL, NULL, RD_KAFKA_OFFSET_INVALID, + "JoinGroup failed: %s", + rd_kafka_err2str(ErrorCode)); + + if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) + rd_kafka_cgrp_set_member_id(rkcg, ""); + else if (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) + rkcg->rkcg_generation_id = -1; + else if (ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) { + /* KIP-394 requires member.id on initial join + * group request */ + char *my_member_id; + RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId); + rd_kafka_cgrp_set_member_id(rkcg, my_member_id); + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + } + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && + (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION || + ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED)) + rd_kafka_cgrp_revoke_all_rejoin( + rkcg, rd_true /*assignment is lost*/, + rd_true /*this consumer is initiating*/, + "JoinGroup error"); + else + rd_kafka_cgrp_rejoin(rkcg, "JoinGroup error: %s", + rd_kafka_err2str(ErrorCode)); + } + + return; + +err_parse: + ErrorCode = rkbuf->rkbuf_err; + goto err; +} + + +/** + * @brief Check subscription against requested Metadata. + */ +static rd_kafka_op_res_t rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ + + rd_kafka_cgrp_metadata_update_check(rkcg, rd_false /*dont rejoin*/); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief (Async) Refresh metadata (for cgrp's needs) + * + * @returns 1 if metadata refresh was requested, or 0 if metadata is + * up to date, or -1 if no broker is available for metadata requests. + * + * @locks none + * @locality rdkafka main thread + */ +static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg, + int *metadata_agep, + const char *reason) { + rd_kafka_t *rk = rkcg->rkcg_rk; + rd_kafka_op_t *rko; + rd_list_t topics; + rd_kafka_resp_err_t err; + + rd_list_init(&topics, 8, rd_free); + + /* Insert all non-wildcard topics in cache. */ + rd_kafka_metadata_cache_hint_rktparlist( + rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, 0 /*dont replace*/); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) { + /* For wildcard subscriptions make sure the + * cached full metadata isn't too old. */ + int metadata_age = -1; + + if (rk->rk_ts_full_metadata) + metadata_age = + (int)(rd_clock() - rk->rk_ts_full_metadata) / 1000; + + *metadata_agep = metadata_age; + + if (metadata_age != -1 && + metadata_age <= rk->rk_conf.metadata_max_age_ms) { + rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, + "CGRPMETADATA", + "%s: metadata for wildcard subscription " + "is up to date (%dms old)", + reason, *metadata_agep); + rd_list_destroy(&topics); + return 0; /* Up-to-date */ + } + + } else { + /* Check that all subscribed topics are in the cache. */ + int r; + + rd_kafka_topic_partition_list_get_topic_names( + rkcg->rkcg_subscription, &topics, 0 /*no regexps*/); + + rd_kafka_rdlock(rk); + r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics, + metadata_agep); + rd_kafka_rdunlock(rk); + + if (r == rd_list_cnt(&topics)) { + rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, + "CGRPMETADATA", + "%s: metadata for subscription " + "is up to date (%dms old)", + reason, *metadata_agep); + rd_list_destroy(&topics); + return 0; /* Up-to-date and all topics exist. */ + } + + rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", + "%s: metadata for subscription " + "only available for %d/%d topics (%dms old)", + reason, r, rd_list_cnt(&topics), *metadata_agep); + } + + /* Async request, result will be triggered from + * rd_kafka_parse_metadata(). */ + rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, + rd_kafka_cgrp_handle_Metadata_op); + rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0); + + err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics, + rd_false /*!allow auto create */, + rd_true /*cgrp_update*/, reason, rko); + if (err) { + rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", + "%s: need to refresh metadata (%dms old) " + "but no usable brokers available: %s", + reason, *metadata_agep, rd_kafka_err2str(err)); + rd_kafka_op_destroy(rko); + } + + rd_list_destroy(&topics); + + return err ? -1 : 1; +} + + + +static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg) { + int metadata_age; + + if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || + rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT || + rd_kafka_cgrp_awaiting_response(rkcg)) + return; + + /* On max.poll.interval.ms failure, do not rejoin group until the + * application has called poll. */ + if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) && + rd_kafka_max_poll_exceeded(rkcg->rkcg_rk)) + return; + + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN", + "Group \"%.*s\": join with %d subscribed topic(s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_list_cnt(rkcg->rkcg_subscribed_topics)); + + + /* See if we need to query metadata to continue: + * - if subscription contains wildcards: + * * query all topics in cluster + * + * - if subscription does not contain wildcards but + * some topics are missing from the local metadata cache: + * * query subscribed topics (all cached ones) + * + * - otherwise: + * * rely on topic metadata cache + */ + /* We need up-to-date full metadata to continue, + * refresh metadata if necessary. */ + if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age, + "consumer join") == 1) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, + "JOIN", + "Group \"%.*s\": " + "postponing join until up-to-date " + "metadata is available", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); + + rd_assert( + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || + /* Possible via rd_kafka_cgrp_modify_subscription */ + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY); + + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); + + return; /* ^ async call */ + } + + if (rd_list_empty(rkcg->rkcg_subscribed_topics)) + rd_kafka_cgrp_metadata_update_check(rkcg, + rd_false /*dont join*/); + + if (rd_list_empty(rkcg->rkcg_subscribed_topics)) { + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "JOIN", + "Group \"%.*s\": " + "no matching topics based on %dms old metadata: " + "next metadata refresh in %dms", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), metadata_age, + rkcg->rkcg_rk->rk_conf.metadata_refresh_interval_ms - + metadata_age); + return; + } + + rd_rkb_dbg( + rkcg->rkcg_curr_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "JOIN", + "Joining group \"%.*s\" with %d subscribed topic(s) and " + "member id \"%.*s\"", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_list_cnt(rkcg->rkcg_subscribed_topics), + rkcg->rkcg_member_id ? RD_KAFKAP_STR_LEN(rkcg->rkcg_member_id) : 0, + rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : ""); + + + rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN); + + rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_JoinGroup); + + rd_kafka_JoinGroupRequest( + rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id, + rkcg->rkcg_group_instance_id, + rkcg->rkcg_rk->rk_conf.group_protocol_type, + rkcg->rkcg_subscribed_topics, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_handle_JoinGroup, rkcg); +} + +/** + * Rejoin group on update to effective subscribed topics list + */ +static void rd_kafka_cgrp_revoke_rejoin(rd_kafka_cgrp_t *rkcg, + const char *reason) { + /* + * Clean-up group leader duties, if any. + */ + rd_kafka_cgrp_group_leader_reset(rkcg, "group (re)join"); + + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "REJOIN", + "Group \"%.*s\" (re)joining in join-state %s " + "with %d assigned partition(s): %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, + reason); + + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, + rd_true /*initiating*/, reason); +} + +/** + * @brief Update the effective list of subscribed topics. + * + * Set \p tinfos to NULL to clear the list. + * + * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list + * + * @returns true on change, else false. + * + * @remark Takes ownership of \p tinfos + */ +static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, + rd_list_t *tinfos) { + rd_kafka_topic_info_t *tinfo; + int i; + + if (!tinfos) { + if (!rd_list_empty(rkcg->rkcg_subscribed_topics)) + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION", + "Group \"%.*s\": " + "clearing subscribed topics list (%d)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_list_cnt(rkcg->rkcg_subscribed_topics)); + tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy); + + } else { + if (rd_list_cnt(tinfos) == 0) + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION", + "Group \"%.*s\": " + "no topics in metadata matched " + "subscription", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); + } + + /* Sort for comparison */ + rd_list_sort(tinfos, rd_kafka_topic_info_cmp); + + /* Compare to existing to see if anything changed. */ + if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos, + rd_kafka_topic_info_cmp)) { + /* No change */ + rd_list_destroy(tinfos); + return rd_false; + } + + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA, "SUBSCRIPTION", + "Group \"%.*s\": effective subscription list changed " + "from %d to %d topic(s):", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_list_cnt(rkcg->rkcg_subscribed_topics), rd_list_cnt(tinfos)); + + RD_LIST_FOREACH(tinfo, tinfos, i) + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA, + "SUBSCRIPTION", " Topic %s with %d partition(s)", + tinfo->topic, tinfo->partition_cnt); + + rd_list_destroy(rkcg->rkcg_subscribed_topics); + + rkcg->rkcg_subscribed_topics = tinfos; + + return rd_true; +} + + +/** + * @brief Handle Heartbeat response. + */ +void rd_kafka_cgrp_handle_Heartbeat(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode = 0; + int actions = 0; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; + + rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + if (err) + goto err; + + if (request->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + if (ErrorCode) { + err = ErrorCode; + goto err; + } + + rd_kafka_cgrp_update_session_timeout( + rkcg, rd_false /*don't update if session has expired*/); + + return; + +err_parse: + err = rkbuf->rkbuf_err; +err: + rkcg->rkcg_last_heartbeat_err = err; + + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Group \"%s\" heartbeat error response in " + "state %s (join-state %s, %d partition(s) assigned): %s", + rkcg->rkcg_group_id->str, + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, + rd_kafka_err2str(err)); + + if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "HEARTBEAT", + "Heartbeat response: discarding outdated " + "request (now in join-state %s)", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + return; + } + + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + /* quick cleanup */ + return; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: + case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR__TRANSPORT: + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", + "Heartbeat failed due to coordinator (%s) " + "no longer available: %s: " + "re-querying for coordinator", + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none", + rd_kafka_err2str(err)); + /* Remain in joined state and keep querying for coordinator */ + actions = RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: + rd_kafka_cgrp_update_session_timeout( + rkcg, rd_false /*don't update if session has expired*/); + /* No further action if already rebalancing */ + if (RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) + return; + rd_kafka_cgrp_group_is_rebalancing(rkcg); + return; + + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + rd_kafka_cgrp_set_member_id(rkcg, ""); + rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, + rd_true /*initiating*/, + "resetting member-id"); + return; + + case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: + rkcg->rkcg_generation_id = -1; + rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, + rd_true /*initiating*/, + "illegal generation"); + return; + + case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID: + rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, + "Fatal consumer error: %s", + rd_kafka_err2str(err)); + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true, /*assignment lost*/ + rd_true, /*initiating*/ + "consumer fenced by " + "newer instance"); + return; + + default: + actions = rd_kafka_err_action(rkb, err, request, + RD_KAFKA_ERR_ACTION_END); + break; + } + + + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { + /* Re-query for coordinator */ + rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); + } + + if (actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) { + /* Retry */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + return; + } +} + + + +/** + * @brief Send Heartbeat + */ +static void rd_kafka_cgrp_heartbeat(rd_kafka_cgrp_t *rkcg) { + /* Don't send heartbeats if max.poll.interval.ms was exceeded */ + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) + return; + + /* Skip heartbeat if we have one in transit */ + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT) + return; + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; + rd_kafka_HeartbeatRequest( + rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, + rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_Heartbeat, + NULL); +} + +/** + * Cgrp is now terminated: decommission it and signal back to application. + */ +static void rd_kafka_cgrp_terminated(rd_kafka_cgrp_t *rkcg) { + if (rd_atomic32_get(&rkcg->rkcg_terminated)) + return; /* terminated() may be called multiple times, + * make sure to only terminate once. */ + + rd_kafka_cgrp_group_assignment_set(rkcg, NULL); + + rd_kafka_assert(NULL, !rd_kafka_assignment_in_progress(rkcg->rkcg_rk)); + rd_kafka_assert(NULL, !rkcg->rkcg_group_assignment); + rd_kafka_assert(NULL, rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0); + rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM); + + rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, + &rkcg->rkcg_offset_commit_tmr, 1 /*lock*/); + + rd_kafka_q_purge(rkcg->rkcg_wait_coord_q); + + /* Disable and empty ops queue since there will be no + * (broker) thread serving it anymore after the unassign_broker + * below. + * This prevents hang on destroy where responses are enqueued on + * rkcg_ops without anything serving the queue. */ + rd_kafka_q_disable(rkcg->rkcg_ops); + rd_kafka_q_purge(rkcg->rkcg_ops); + + if (rkcg->rkcg_curr_coord) + rd_kafka_cgrp_coord_clear_broker(rkcg); + + if (rkcg->rkcg_coord) { + rd_kafka_broker_destroy(rkcg->rkcg_coord); + rkcg->rkcg_coord = NULL; + } + + rd_atomic32_set(&rkcg->rkcg_terminated, rd_true); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", + "Consumer group sub-system terminated%s", + rkcg->rkcg_reply_rko ? " (will enqueue reply)" : ""); + + if (rkcg->rkcg_reply_rko) { + /* Signal back to application. */ + rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq, + rkcg->rkcg_reply_rko, 0); + rkcg->rkcg_reply_rko = NULL; + } + + /* Remove cgrp application queue forwarding, if any. */ + rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL); +} + + +/** + * If a cgrp is terminating and all outstanding ops are now finished + * then progress to final termination and return 1. + * Else returns 0. + */ +static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) { + + if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM) + return 1; + + if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE))) + return 0; + + /* Check if wait-coord queue has timed out. */ + if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 && + rkcg->rkcg_ts_terminate + + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) < + rd_clock()) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", + "Group \"%s\": timing out %d op(s) in " + "wait-for-coordinator queue", + rkcg->rkcg_group_id->str, + rd_kafka_q_len(rkcg->rkcg_wait_coord_q)); + rd_kafka_q_disable(rkcg->rkcg_wait_coord_q); + if (rd_kafka_q_concat(rkcg->rkcg_ops, + rkcg->rkcg_wait_coord_q) == -1) { + /* ops queue shut down, purge coord queue */ + rd_kafka_q_purge(rkcg->rkcg_wait_coord_q); + } + } + + if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) && + rd_list_empty(&rkcg->rkcg_toppars) && + !rd_kafka_assignment_in_progress(rkcg->rkcg_rk) && + rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0 && + !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)) { + /* Since we might be deep down in a 'rko' handler + * called from cgrp_op_serve() we cant call terminated() + * directly since it will decommission the rkcg_ops queue + * that might be locked by intermediate functions. + * Instead set the TERM state and let the cgrp terminate + * at its own discretion. */ + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM); + + return 1; + } else { + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "CGRPTERM", + "Group \"%s\": " + "waiting for %s%d toppar(s), " + "%s" + "%d commit(s)%s%s%s (state %s, join-state %s) " + "before terminating", + rkcg->rkcg_group_id->str, + RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ? "assign call, " : "", + rd_list_cnt(&rkcg->rkcg_toppars), + rd_kafka_assignment_in_progress(rkcg->rkcg_rk) + ? "assignment in progress, " + : "", + rkcg->rkcg_rk->rk_consumer.wait_commit_cnt, + (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) + ? ", wait-leave," + : "", + rkcg->rkcg_rebalance_rejoin ? ", rebalance_rejoin," : "", + (rkcg->rkcg_rebalance_incr_assignment != NULL) + ? ", rebalance_incr_assignment," + : "", + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + return 0; + } +} + + +/** + * @brief Add partition to this cgrp management + * + * @locks none + */ +static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, + rd_kafka_toppar_t *rktp) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTADD", + "Group \"%s\": add %s [%" PRId32 "]", + rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition); + + rd_kafka_toppar_lock(rktp); + rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP)); + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_CGRP; + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_keep(rktp); + rd_list_add(&rkcg->rkcg_toppars, rktp); +} + +/** + * @brief Remove partition from this cgrp management + * + * @locks none + */ +static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, + rd_kafka_toppar_t *rktp) { + int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; + rd_kafka_op_t *rko; + rd_kafka_q_t *rkq; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", + "Group \"%s\": delete %s [%" PRId32 "]", + rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, + rktp->rktp_partition); + + rd_kafka_toppar_lock(rktp); + rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); + rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; + + if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { + /* Partition is being removed from the cluster and it's stopped, + * so rktp->rktp_fetchq->rkq_fwdq is NULL. + * Purge remaining operations in rktp->rktp_fetchq->rkq_q, + * while holding lock, to avoid circular references */ + rkq = rktp->rktp_fetchq; + mtx_lock(&rkq->rkq_lock); + rd_assert(!rkq->rkq_fwdq); + + rko = TAILQ_FIRST(&rkq->rkq_q); + while (rko) { + if (rko->rko_type != RD_KAFKA_OP_BARRIER && + rko->rko_type != RD_KAFKA_OP_FETCH) { + rd_kafka_log( + rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", + "Purging toppar fetch queue buffer op" + "with unexpected type: %s", + rd_kafka_op2str(rko->rko_type)); + } + + if (rko->rko_type == RD_KAFKA_OP_BARRIER) + barrier_cnt++; + else if (rko->rko_type == RD_KAFKA_OP_FETCH) + message_cnt++; + else + other_cnt++; + + rko = TAILQ_NEXT(rko, rko_link); + cnt++; + } + + mtx_unlock(&rkq->rkq_lock); + + if (cnt) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", + "Purge toppar fetch queue buffer " + "containing %d op(s) " + "(%d barrier(s), %d message(s), %d other)" + " to avoid " + "circular references", + cnt, barrier_cnt, message_cnt, other_cnt); + rd_kafka_q_purge(rktp->rktp_fetchq); + } else { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", + "Not purging toppar fetch queue buffer." + " No ops present in the buffer."); + } + } + + rd_kafka_toppar_unlock(rktp); + + rd_list_remove(&rkcg->rkcg_toppars, rktp); + + rd_kafka_toppar_destroy(rktp); /* refcnt from _add above */ + + rd_kafka_cgrp_try_terminate(rkcg); +} + + + +/** + * @brief Defer offset commit (rko) until coordinator is available. + * + * @returns 1 if the rko was deferred or 0 if the defer queue is disabled + * or rko already deferred. + */ +static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg, + rd_kafka_op_t *rko, + const char *reason) { + + /* wait_coord_q is disabled session.timeout.ms after + * group close() has been initated. */ + if (rko->rko_u.offset_commit.ts_timeout != 0 || + !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q)) + return 0; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT", + "Group \"%s\": " + "unable to OffsetCommit in state %s: %s: " + "coordinator (%s) is unavailable: " + "retrying later", + rkcg->rkcg_group_id->str, + rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason, + rkcg->rkcg_curr_coord + ? rd_kafka_broker_name(rkcg->rkcg_curr_coord) + : "none"); + + rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS; + rko->rko_u.offset_commit.ts_timeout = + rd_clock() + + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000); + rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko); + + return 1; +} + + +/** + * @brief Update the committed offsets for the partitions in \p offsets, + * + * @remark \p offsets may be NULL if \p err is set + * @returns the number of partitions with errors encountered + */ +static int rd_kafka_cgrp_update_committed_offsets( + rd_kafka_cgrp_t *rkcg, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *offsets) { + int i; + int errcnt = 0; + + /* Update toppars' committed offset or global error */ + for (i = 0; offsets && i < offsets->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &offsets->elems[i]; + rd_kafka_toppar_t *rktp; + + /* Ignore logical offsets since they were never + * sent to the broker. */ + if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset)) + continue; + + /* Propagate global error to all partitions that don't have + * explicit error set. */ + if (err && !rktpar->err) + rktpar->err = err; + + if (rktpar->err) { + rd_kafka_dbg(rkcg->rkcg_rk, TOPIC, "OFFSET", + "OffsetCommit failed for " + "%s [%" PRId32 + "] at offset " + "%" PRId64 " in join-state %s: %s", + rktpar->topic, rktpar->partition, + rktpar->offset, + rd_kafka_cgrp_join_state_names + [rkcg->rkcg_join_state], + rd_kafka_err2str(rktpar->err)); + + errcnt++; + continue; + } + + rktp = rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk, + rktpar, rd_false); + if (!rktp) + continue; + + rd_kafka_toppar_lock(rktp); + rktp->rktp_committed_pos = + rd_kafka_topic_partition_get_fetch_pos(rktpar); + rd_kafka_toppar_unlock(rktp); + + rd_kafka_toppar_destroy(rktp); /* from get_toppar() */ + } + + return errcnt; +} + + +/** + * @brief Propagate OffsetCommit results. + * + * @param rko_orig The original rko that triggered the commit, this is used + * to propagate the result. + * @param err Is the aggregated request-level error, or ERR_NO_ERROR. + * @param errcnt Are the number of partitions in \p offsets that failed + * offset commit. + */ +static void rd_kafka_cgrp_propagate_commit_result( + rd_kafka_cgrp_t *rkcg, + rd_kafka_op_t *rko_orig, + rd_kafka_resp_err_t err, + int errcnt, + rd_kafka_topic_partition_list_t *offsets) { + + const rd_kafka_t *rk = rkcg->rkcg_rk; + int offset_commit_cb_served = 0; + + /* If no special callback is set but a offset_commit_cb has + * been set in conf then post an event for the latter. */ + if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) { + rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err); + + rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH); + + if (offsets) + rko_reply->rko_u.offset_commit.partitions = + rd_kafka_topic_partition_list_copy(offsets); + + rko_reply->rko_u.offset_commit.cb = + rk->rk_conf.offset_commit_cb; + rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque; + + rd_kafka_q_enq(rk->rk_rep, rko_reply); + offset_commit_cb_served++; + } + + + /* Enqueue reply to requester's queue, if any. */ + if (rko_orig->rko_replyq.q) { + rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err); + + rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH); + + /* Copy offset & partitions & callbacks to reply op */ + rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit; + if (offsets) + rko_reply->rko_u.offset_commit.partitions = + rd_kafka_topic_partition_list_copy(offsets); + if (rko_reply->rko_u.offset_commit.reason) + rko_reply->rko_u.offset_commit.reason = + rd_strdup(rko_reply->rko_u.offset_commit.reason); + + rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0); + offset_commit_cb_served++; + } + + if (!offset_commit_cb_served && offsets && + (errcnt > 0 || (err != RD_KAFKA_RESP_ERR_NO_ERROR && + err != RD_KAFKA_RESP_ERR__NO_OFFSET))) { + /* If there is no callback or handler for this (auto) + * commit then log an error (#1043) */ + char tmp[512]; + + rd_kafka_topic_partition_list_str( + offsets, tmp, sizeof(tmp), + /* Print per-partition errors unless there was a + * request-level error. */ + RD_KAFKA_FMT_F_OFFSET | + (errcnt ? RD_KAFKA_FMT_F_ONLY_ERR : 0)); + + rd_kafka_log( + rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL", + "Offset commit (%s) failed " + "for %d/%d partition(s) in join-state %s: " + "%s%s%s", + rko_orig->rko_u.offset_commit.reason, + errcnt ? errcnt : offsets->cnt, offsets->cnt, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + errcnt ? rd_kafka_err2str(err) : "", errcnt ? ": " : "", + tmp); + } +} + + + +/** + * @brief Handle OffsetCommitResponse + * Takes the original 'rko' as opaque argument. + * @remark \p rkb, rkbuf, and request may be NULL in a number of + * error cases (e.g., _NO_OFFSET, _WAIT_COORD) + */ +static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; + rd_kafka_op_t *rko_orig = opaque; + rd_kafka_topic_partition_list_t *offsets = + rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */ + int errcnt; + + RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT); + + err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf, request, + offsets, rd_false); + + /* Suppress empty commit debug logs if allowed */ + if (err != RD_KAFKA_RESP_ERR__NO_OFFSET || + !rko_orig->rko_u.offset_commit.silent_empty) { + if (rkb) + rd_rkb_dbg(rkb, CGRP, "COMMIT", + "OffsetCommit for %d partition(s) in " + "join-state %s: " + "%s: returned: %s", + offsets ? offsets->cnt : -1, + rd_kafka_cgrp_join_state_names + [rkcg->rkcg_join_state], + rko_orig->rko_u.offset_commit.reason, + rd_kafka_err2str(err)); + else + rd_kafka_dbg(rk, CGRP, "COMMIT", + "OffsetCommit for %d partition(s) in " + "join-state " + "%s: %s: " + "returned: %s", + offsets ? offsets->cnt : -1, + rd_kafka_cgrp_join_state_names + [rkcg->rkcg_join_state], + rko_orig->rko_u.offset_commit.reason, + rd_kafka_err2str(err)); + } + + + /* + * Error handling + */ + switch (err) { + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + /* Revoke assignment and rebalance on unknown member */ + rd_kafka_cgrp_set_member_id(rk->rk_cgrp, ""); + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true /*assignment is lost*/, + rd_true /*this consumer is initiating*/, + "OffsetCommit error: Unknown member"); + break; + + case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: + /* Revoke assignment and rebalance on illegal generation */ + rk->rk_cgrp->rkcg_generation_id = -1; + rd_kafka_cgrp_revoke_all_rejoin_maybe( + rkcg, rd_true /*assignment is lost*/, + rd_true /*this consumer is initiating*/, + "OffsetCommit error: Illegal generation"); + break; + + case RD_KAFKA_RESP_ERR__IN_PROGRESS: + return; /* Retrying */ + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR__TRANSPORT: + /* The coordinator is not available, defer the offset commit + * to when the coordinator is back up again. */ + + /* Future-proofing, see timeout_scan(). */ + rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD); + + if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig, + rd_kafka_err2str(err))) + return; + break; + + default: + break; + } + + /* Call on_commit interceptors */ + if (err != RD_KAFKA_RESP_ERR__NO_OFFSET && + err != RD_KAFKA_RESP_ERR__DESTROY && offsets && offsets->cnt > 0) + rd_kafka_interceptors_on_commit(rk, offsets, err); + + /* Keep track of outstanding commits */ + rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0); + rk->rk_consumer.wait_commit_cnt--; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + rd_kafka_op_destroy(rko_orig); + return; /* Handle is terminating, this op may be handled + * by the op enq()ing thread rather than the + * rdkafka main thread, it is not safe to + * continue here. */ + } + + /* Update the committed offsets for each partition's rktp. */ + errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets); + + if (err != RD_KAFKA_RESP_ERR__DESTROY && + !(err == RD_KAFKA_RESP_ERR__NO_OFFSET && + rko_orig->rko_u.offset_commit.silent_empty)) { + /* Propagate commit results (success or permanent error) + * unless we're shutting down or commit was empty. */ + rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig, err, + errcnt, offsets); + } + + rd_kafka_op_destroy(rko_orig); + + /* If the current state was waiting for commits to finish we'll try to + * transition to the next state. */ + if (rk->rk_consumer.wait_commit_cnt == 0) + rd_kafka_assignment_serve(rk); +} + + +static size_t rd_kafka_topic_partition_has_absolute_offset( + const rd_kafka_topic_partition_t *rktpar, + void *opaque) { + return rktpar->offset >= 0 ? 1 : 0; +} + + +/** + * Commit a list of offsets. + * Reuse the orignating 'rko' for the async reply. + * 'rko->rko_payload' should either by NULL (to commit current assignment) or + * a proper topic_partition_list_t with offsets to commit. + * The offset list will be altered. + * + * \p rko...silent_empty: if there are no offsets to commit bail out + * silently without posting an op on the reply queue. + * \p set_offsets: set offsets and epochs in + * rko->rko_u.offset_commit.partitions from the rktp's + * stored offset. + * + * Locality: cgrp thread + */ +static void rd_kafka_cgrp_offsets_commit(rd_kafka_cgrp_t *rkcg, + rd_kafka_op_t *rko, + rd_bool_t set_offsets, + const char *reason) { + rd_kafka_topic_partition_list_t *offsets; + rd_kafka_resp_err_t err; + int valid_offsets = 0; + int r; + rd_kafka_buf_t *rkbuf; + rd_kafka_op_t *reply; + rd_kafka_consumer_group_metadata_t *cgmetadata; + + if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) { + /* wait_commit_cnt has already been increased for + * reprocessed ops. */ + rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++; + } + + /* If offsets is NULL we shall use the current assignment + * (not the group assignment). */ + if (!rko->rko_u.offset_commit.partitions && + rkcg->rkcg_rk->rk_consumer.assignment.all->cnt > 0) { + if (rd_kafka_cgrp_assignment_is_lost(rkcg)) { + /* Not committing assigned offsets: assignment lost */ + err = RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST; + goto err; + } + + rko->rko_u.offset_commit.partitions = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_rk->rk_consumer.assignment.all); + } + + offsets = rko->rko_u.offset_commit.partitions; + + if (offsets) { + /* Set offsets to commits */ + if (set_offsets) + rd_kafka_topic_partition_list_set_offsets( + rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, + 1, RD_KAFKA_OFFSET_INVALID /* def */, + 1 /* is commit */); + + /* Check the number of valid offsets to commit. */ + valid_offsets = (int)rd_kafka_topic_partition_list_sum( + offsets, rd_kafka_topic_partition_has_absolute_offset, + NULL); + } + + if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { + /* Commits are not allowed when a fatal error has been raised */ + err = RD_KAFKA_RESP_ERR__FATAL; + goto err; + } + + if (!valid_offsets) { + /* No valid offsets */ + err = RD_KAFKA_RESP_ERR__NO_OFFSET; + goto err; + } + + if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) { + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, + "COMMIT", + "Deferring \"%s\" offset commit " + "for %d partition(s) in state %s: " + "no coordinator available", + reason, valid_offsets, + rd_kafka_cgrp_state_names[rkcg->rkcg_state]); + + if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason)) + return; + + err = RD_KAFKA_RESP_ERR__WAIT_COORD; + goto err; + } + + + rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "COMMIT", + "Committing offsets for %d partition(s) with " + "generation-id %" PRId32 " in join-state %s: %s", + valid_offsets, rkcg->rkcg_generation_id, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + reason); + + cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid( + rkcg->rkcg_rk->rk_conf.group_id_str, rkcg->rkcg_generation_id, + rkcg->rkcg_member_id->str, + rkcg->rkcg_rk->rk_conf.group_instance_id); + + /* Send OffsetCommit */ + r = rd_kafka_OffsetCommitRequest(rkcg->rkcg_coord, cgmetadata, offsets, + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_cgrp_op_handle_OffsetCommit, + rko, reason); + rd_kafka_consumer_group_metadata_destroy(cgmetadata); + + /* Must have valid offsets to commit if we get here */ + rd_kafka_assert(NULL, r != 0); + + return; + +err: + if (err != RD_KAFKA_RESP_ERR__NO_OFFSET) + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, + "COMMIT", "OffsetCommit internal error: %s", + rd_kafka_err2str(err)); + + /* Propagate error through dummy buffer object that will + * call the response handler from the main loop, avoiding + * any recursive calls from op_handle_OffsetCommit -> + * assignment_serve() and then back to cgrp_assigned_offsets_commit() */ + + reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF); + reply->rko_rk = rkcg->rkcg_rk; /* Set rk since the rkbuf will not + * have a rkb to reach it. */ + reply->rko_err = err; + + rkbuf = rd_kafka_buf_new(0, 0); + rkbuf->rkbuf_cb = rd_kafka_cgrp_op_handle_OffsetCommit; + rkbuf->rkbuf_opaque = rko; + reply->rko_u.xbuf.rkbuf = rkbuf; + + rd_kafka_q_enq(rkcg->rkcg_ops, reply); +} + + +/** + * @brief Commit offsets assigned partitions. + * + * If \p offsets is NULL all partitions in the current assignment will be used. + * If \p set_offsets is true the offsets to commit will be read from the + * rktp's stored offset rather than the .offset fields in \p offsets. + * + * rkcg_wait_commit_cnt will be increased accordingly. + */ +void rd_kafka_cgrp_assigned_offsets_commit( + rd_kafka_cgrp_t *rkcg, + const rd_kafka_topic_partition_list_t *offsets, + rd_bool_t set_offsets, + const char *reason) { + rd_kafka_op_t *rko; + + if (rd_kafka_cgrp_assignment_is_lost(rkcg)) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "AUTOCOMMIT", + "Group \"%s\": not committing assigned offsets: " + "assignment lost", + rkcg->rkcg_group_id->str); + return; + } + + rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT); + rko->rko_u.offset_commit.reason = rd_strdup(reason); + if (rkcg->rkcg_rk->rk_conf.enabled_events & + RD_KAFKA_EVENT_OFFSET_COMMIT) { + /* Send results to application */ + rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0); + rko->rko_u.offset_commit.cb = + rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/ + rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque; + } + /* NULL partitions means current assignment */ + if (offsets) + rko->rko_u.offset_commit.partitions = + rd_kafka_topic_partition_list_copy(offsets); + rko->rko_u.offset_commit.silent_empty = 1; + rd_kafka_cgrp_offsets_commit(rkcg, rko, set_offsets, reason); +} + + +/** + * auto.commit.interval.ms commit timer callback. + * + * Trigger a group offset commit. + * + * Locality: rdkafka main thread + */ +static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_cgrp_t *rkcg = arg; + + /* Don't attempt auto commit when rebalancing or initializing since + * the rkcg_generation_id is most likely in flux. */ + if (rkcg->rkcg_subscription && + rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) + return; + + rd_kafka_cgrp_assigned_offsets_commit( + rkcg, NULL, rd_true /*set offsets*/, "cgrp auto commit timer"); +} + + +/** + * @brief If rkcg_next_subscription or rkcg_next_unsubscribe are + * set, trigger a state change so that they are applied from the + * main dispatcher. + * + * @returns rd_true if a subscribe was scheduled, else false. + */ +static rd_bool_t +rd_kafka_trigger_waiting_subscribe_maybe(rd_kafka_cgrp_t *rkcg) { + + if (rkcg->rkcg_next_subscription || rkcg->rkcg_next_unsubscribe) { + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + rd_kafka_cgrp_rejoin(rkcg, "Applying next subscription"); + return rd_true; + } + + return rd_false; +} + + +/** + * @brief Incrementally add to an existing partition assignment + * May update \p partitions but will not hold on to it. + * + * @returns an error object or NULL on success. + */ +static rd_kafka_error_t * +rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *partitions) { + rd_kafka_error_t *error; + + error = rd_kafka_assignment_add(rkcg->rkcg_rk, partitions); + if (error) + return error; + + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) { + rd_kafka_assignment_resume(rkcg->rkcg_rk, + "incremental assign called"); + rd_kafka_cgrp_set_join_state(rkcg, + RD_KAFKA_CGRP_JOIN_STATE_STEADY); + + if (rkcg->rkcg_subscription) { + /* If using subscribe(), start a timer to enforce + * `max.poll.interval.ms`. + * Instead of restarting the timer on each ...poll() + * call, which would be costly (once per message), + * set up an intervalled timer that checks a timestamp + * (that is updated on ..poll()). + * The timer interval is 2 hz. */ + rd_kafka_timer_start( + &rkcg->rkcg_rk->rk_timers, + &rkcg->rkcg_max_poll_interval_tmr, + 500 * 1000ll /* 500ms */, + rd_kafka_cgrp_max_poll_interval_check_tmr_cb, rkcg); + } + } + + rd_kafka_cgrp_assignment_clear_lost(rkcg, + "incremental_assign() called"); + + return NULL; +} + + +/** + * @brief Incrementally remove partitions from an existing partition + * assignment. May update \p partitions but will not hold on + * to it. + * + * @remark This method does not unmark the current assignment as lost + * (if lost). That happens following _incr_unassign_done and + * a group-rejoin initiated. + * + * @returns An error object or NULL on success. + */ +static rd_kafka_error_t *rd_kafka_cgrp_incremental_unassign( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *partitions) { + rd_kafka_error_t *error; + + error = rd_kafka_assignment_subtract(rkcg->rkcg_rk, partitions); + if (error) + return error; + + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) { + rd_kafka_assignment_resume(rkcg->rkcg_rk, + "incremental unassign called"); + rd_kafka_cgrp_set_join_state( + rkcg, + RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE); + } + + rd_kafka_cgrp_assignment_clear_lost(rkcg, + "incremental_unassign() called"); + + return NULL; +} + + +/** + * @brief Call when all incremental unassign operations are done to transition + * to the next state. + */ +static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) { + + /* If this action was underway when a terminate was initiated, it will + * be left to complete. Now that's done, unassign all partitions */ + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", + "Group \"%s\" is terminating, initiating full " + "unassign", + rkcg->rkcg_group_id->str); + rd_kafka_cgrp_unassign(rkcg); + return; + } + + if (rkcg->rkcg_rebalance_incr_assignment) { + + /* This incremental unassign was part of a normal rebalance + * (in which the revoke set was not empty). Immediately + * trigger the assign that follows this revoke. The protocol + * dictates this should occur even if the new assignment + * set is empty. + * + * Also, since this rebalance had some revoked partitions, + * a re-join should occur following the assign. + */ + + rd_kafka_rebalance_op_incr(rkcg, + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, + rkcg->rkcg_rebalance_incr_assignment, + rd_true /*rejoin following assign*/, + "cooperative assign after revoke"); + + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_rebalance_incr_assignment); + rkcg->rkcg_rebalance_incr_assignment = NULL; + + /* Note: rkcg_rebalance_rejoin is actioned / reset in + * rd_kafka_cgrp_incremental_assign call */ + + } else if (rkcg->rkcg_rebalance_rejoin) { + rkcg->rkcg_rebalance_rejoin = rd_false; + + /* There are some cases (lost partitions), where a rejoin + * should occur immediately following the unassign (this + * is not the case under normal conditions), in which case + * the rejoin flag will be set. */ + + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + + rd_kafka_cgrp_rejoin(rkcg, "Incremental unassignment done"); + + } else if (!rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) { + /* After this incremental unassignment we're now back in + * a steady state. */ + rd_kafka_cgrp_set_join_state(rkcg, + RD_KAFKA_CGRP_JOIN_STATE_STEADY); + } +} + + +/** + * @brief Call when all absolute (non-incremental) unassign operations are done + * to transition to the next state. + */ +static void rd_kafka_cgrp_unassign_done(rd_kafka_cgrp_t *rkcg) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", + "Group \"%s\": unassign done in state %s " + "(join-state %s)", + rkcg->rkcg_group_id->str, + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + /* Leave group, if desired. */ + rd_kafka_cgrp_leave_maybe(rkcg); + + if (rkcg->rkcg_join_state != + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE) + return; + + /* All partitions are unassigned. Rejoin the group. */ + + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + + rd_kafka_cgrp_rejoin(rkcg, "Unassignment done"); +} + + + +/** + * @brief Called from assignment code when all in progress + * assignment/unassignment operations are done, allowing the cgrp to + * transition to other states if needed. + * + * @remark This may be called spontaneously without any need for a state + * change in the rkcg. + */ +void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE", + "Group \"%s\": " + "assignment operations done in join-state %s " + "(rebalance rejoin=%s)", + rkcg->rkcg_group_id->str, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + RD_STR_ToF(rkcg->rkcg_rebalance_rejoin)); + + switch (rkcg->rkcg_join_state) { + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE: + rd_kafka_cgrp_unassign_done(rkcg); + break; + + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE: + rd_kafka_cgrp_incr_unassign_done(rkcg); + break; + + case RD_KAFKA_CGRP_JOIN_STATE_STEADY: + /* If an updated/next subscription is available, schedule it. */ + if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) + break; + + if (rkcg->rkcg_rebalance_rejoin) { + rkcg->rkcg_rebalance_rejoin = rd_false; + + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + + rd_kafka_cgrp_rejoin( + rkcg, + "rejoining group to redistribute " + "previously owned partitions to other " + "group members"); + break; + } + + /* FALLTHRU */ + + case RD_KAFKA_CGRP_JOIN_STATE_INIT: + /* Check if cgrp is trying to terminate, which is safe to do + * in these two states. Otherwise we'll need to wait for + * the current state to decommission. */ + rd_kafka_cgrp_try_terminate(rkcg); + break; + + default: + break; + } +} + + + +/** + * @brief Remove existing assignment. + */ +static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg) { + + rd_kafka_assignment_clear(rkcg->rkcg_rk); + + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) { + rd_kafka_assignment_resume(rkcg->rkcg_rk, "unassign called"); + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE); + } + + rd_kafka_cgrp_assignment_clear_lost(rkcg, "unassign() called"); + + return NULL; +} + + +/** + * @brief Set new atomic partition assignment + * May update \p assignment but will not hold on to it. + * + * @returns NULL on success or an error if a fatal error has been raised. + */ +static rd_kafka_error_t * +rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment) { + rd_kafka_error_t *error; + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGN", + "Group \"%s\": new assignment of %d partition(s) " + "in join-state %s", + rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + /* Clear existing assignment, if any, and serve its removals. */ + if (rd_kafka_assignment_clear(rkcg->rkcg_rk)) + rd_kafka_assignment_serve(rkcg->rkcg_rk); + + error = rd_kafka_assignment_add(rkcg->rkcg_rk, assignment); + if (error) + return error; + + rd_kafka_cgrp_assignment_clear_lost(rkcg, "assign() called"); + + if (rkcg->rkcg_join_state == + RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) { + rd_kafka_assignment_resume(rkcg->rkcg_rk, "assign called"); + rd_kafka_cgrp_set_join_state(rkcg, + RD_KAFKA_CGRP_JOIN_STATE_STEADY); + + if (rkcg->rkcg_subscription) { + /* If using subscribe(), start a timer to enforce + * `max.poll.interval.ms`. + * Instead of restarting the timer on each ...poll() + * call, which would be costly (once per message), + * set up an intervalled timer that checks a timestamp + * (that is updated on ..poll()). + * The timer interval is 2 hz. */ + rd_kafka_timer_start( + &rkcg->rkcg_rk->rk_timers, + &rkcg->rkcg_max_poll_interval_tmr, + 500 * 1000ll /* 500ms */, + rd_kafka_cgrp_max_poll_interval_check_tmr_cb, rkcg); + } + } + + return NULL; +} + + + +/** + * @brief Construct a typed map from list \p rktparlist with key corresponding + * to each element in the list and value NULL. + * + * @remark \p rktparlist may be NULL. + */ +static map_toppar_member_info_t *rd_kafka_toppar_list_to_toppar_member_info_map( + rd_kafka_topic_partition_list_t *rktparlist) { + map_toppar_member_info_t *map = rd_calloc(1, sizeof(*map)); + const rd_kafka_topic_partition_t *rktpar; + + RD_MAP_INIT(map, rktparlist ? rktparlist->cnt : 0, + rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, + PartitionMemberInfo_free); + + if (!rktparlist) + return map; + + RD_KAFKA_TPLIST_FOREACH(rktpar, rktparlist) + RD_MAP_SET(map, rd_kafka_topic_partition_copy(rktpar), + PartitionMemberInfo_new(NULL, rd_false)); + + return map; +} + + +/** + * @brief Construct a toppar list from map \p map with elements corresponding + * to the keys of \p map. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_toppar_member_info_map_to_list(map_toppar_member_info_t *map) { + const rd_kafka_topic_partition_t *k; + rd_kafka_topic_partition_list_t *list = + rd_kafka_topic_partition_list_new((int)RD_MAP_CNT(map)); + + RD_MAP_FOREACH_KEY(k, map) { + rd_kafka_topic_partition_list_add(list, k->topic, k->partition); + } + + return list; +} + + +/** + * @brief Handle a rebalance-triggered partition assignment + * (COOPERATIVE case). + */ +static void rd_kafka_cgrp_handle_assignment_cooperative( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment) { + map_toppar_member_info_t *new_assignment_set; + map_toppar_member_info_t *old_assignment_set; + map_toppar_member_info_t *newly_added_set; + map_toppar_member_info_t *revoked_set; + rd_kafka_topic_partition_list_t *newly_added; + rd_kafka_topic_partition_list_t *revoked; + + new_assignment_set = + rd_kafka_toppar_list_to_toppar_member_info_map(assignment); + + old_assignment_set = rd_kafka_toppar_list_to_toppar_member_info_map( + rkcg->rkcg_group_assignment); + + newly_added_set = rd_kafka_member_partitions_subtract( + new_assignment_set, old_assignment_set); + revoked_set = rd_kafka_member_partitions_subtract(old_assignment_set, + new_assignment_set); + + newly_added = rd_kafka_toppar_member_info_map_to_list(newly_added_set); + revoked = rd_kafka_toppar_member_info_map_to_list(revoked_set); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COOPASSIGN", + "Group \"%s\": incremental assignment: %d newly added, " + "%d revoked partitions based on assignment of %d " + "partitions", + rkcg->rkcg_group_id->str, newly_added->cnt, revoked->cnt, + assignment->cnt); + + if (revoked->cnt > 0) { + /* Setting rkcg_incr_assignment causes a follow on incremental + * assign rebalance op after completion of this incremental + * unassign op. */ + + rkcg->rkcg_rebalance_incr_assignment = newly_added; + newly_added = NULL; + + rd_kafka_rebalance_op_incr(rkcg, + RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, + revoked, rd_false /*no rejoin + following unassign*/ + , + "sync group revoke"); + + } else { + /* There are no revoked partitions - trigger the assign + * rebalance op, and flag that the group does not need + * to be re-joined */ + + rd_kafka_rebalance_op_incr( + rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, newly_added, + rd_false /*no rejoin following assign*/, + "sync group assign"); + } + + if (newly_added) + rd_kafka_topic_partition_list_destroy(newly_added); + rd_kafka_topic_partition_list_destroy(revoked); + RD_MAP_DESTROY_AND_FREE(revoked_set); + RD_MAP_DESTROY_AND_FREE(newly_added_set); + RD_MAP_DESTROY_AND_FREE(old_assignment_set); + RD_MAP_DESTROY_AND_FREE(new_assignment_set); +} + + +/** + * @brief Sets or clears the group's partition assignment for our consumer. + * + * Will replace the current group assignment, if any. + */ +static void rd_kafka_cgrp_group_assignment_set( + rd_kafka_cgrp_t *rkcg, + const rd_kafka_topic_partition_list_t *partitions) { + + if (rkcg->rkcg_group_assignment) + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_group_assignment); + + if (partitions) { + rkcg->rkcg_group_assignment = + rd_kafka_topic_partition_list_copy(partitions); + rd_kafka_topic_partition_list_sort_by_topic( + rkcg->rkcg_group_assignment); + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT", + "Group \"%s\": setting group assignment to %d " + "partition(s)", + rkcg->rkcg_group_id->str, + rkcg->rkcg_group_assignment->cnt); + + } else { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT", + "Group \"%s\": clearing group assignment", + rkcg->rkcg_group_id->str); + rkcg->rkcg_group_assignment = NULL; + } + + rd_kafka_wrlock(rkcg->rkcg_rk); + rkcg->rkcg_c.assignment_size = + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0; + rd_kafka_wrunlock(rkcg->rkcg_rk); + + if (rkcg->rkcg_group_assignment) + rd_kafka_topic_partition_list_log( + rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP, + rkcg->rkcg_group_assignment); +} + + +/** + * @brief Adds or removes \p partitions from the current group assignment. + * + * @param add Whether to add or remove the partitions. + * + * @remark The added partitions must not already be on the group assignment, + * and the removed partitions must be on the group assignment. + * + * To be used with incremental rebalancing. + * + */ +static void rd_kafka_cgrp_group_assignment_modify( + rd_kafka_cgrp_t *rkcg, + rd_bool_t add, + const rd_kafka_topic_partition_list_t *partitions) { + const rd_kafka_topic_partition_t *rktpar; + int precnt; + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "ASSIGNMENT", + "Group \"%s\": %d partition(s) being %s group assignment " + "of %d partition(s)", + rkcg->rkcg_group_id->str, partitions->cnt, + add ? "added to" : "removed from", + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0); + + if (partitions == rkcg->rkcg_group_assignment) { + /* \p partitions is the actual assignment, which + * must mean it is all to be removed. + * Short-cut directly to set(NULL). */ + rd_assert(!add); + rd_kafka_cgrp_group_assignment_set(rkcg, NULL); + return; + } + + if (add && (!rkcg->rkcg_group_assignment || + rkcg->rkcg_group_assignment->cnt == 0)) { + /* Adding to an empty assignment is a set operation. */ + rd_kafka_cgrp_group_assignment_set(rkcg, partitions); + return; + } + + if (!add) { + /* Removing from an empty assignment is illegal. */ + rd_assert(rkcg->rkcg_group_assignment != NULL && + rkcg->rkcg_group_assignment->cnt > 0); + } + + + precnt = rkcg->rkcg_group_assignment->cnt; + RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) { + int idx; + + idx = rd_kafka_topic_partition_list_find_idx( + rkcg->rkcg_group_assignment, rktpar->topic, + rktpar->partition); + + if (add) { + rd_assert(idx == -1); + + rd_kafka_topic_partition_list_add_copy( + rkcg->rkcg_group_assignment, rktpar); + + } else { + rd_assert(idx != -1); + + rd_kafka_topic_partition_list_del_by_idx( + rkcg->rkcg_group_assignment, idx); + } + } + + if (add) + rd_assert(precnt + partitions->cnt == + rkcg->rkcg_group_assignment->cnt); + else + rd_assert(precnt - partitions->cnt == + rkcg->rkcg_group_assignment->cnt); + + if (rkcg->rkcg_group_assignment->cnt == 0) { + rd_kafka_topic_partition_list_destroy( + rkcg->rkcg_group_assignment); + rkcg->rkcg_group_assignment = NULL; + + } else if (add) + rd_kafka_topic_partition_list_sort_by_topic( + rkcg->rkcg_group_assignment); + + rd_kafka_wrlock(rkcg->rkcg_rk); + rkcg->rkcg_c.assignment_size = + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0; + rd_kafka_wrunlock(rkcg->rkcg_rk); + + if (rkcg->rkcg_group_assignment) + rd_kafka_topic_partition_list_log( + rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP, + rkcg->rkcg_group_assignment); +} + + +/** + * @brief Handle a rebalance-triggered partition assignment. + * + * If a rebalance_cb has been registered we enqueue an op for the app + * and let the app perform the actual assign() call. Otherwise we + * assign() directly from here. + * + * This provides the most flexibility, allowing the app to perform any + * operation it seem fit (e.g., offset writes or reads) before actually + * updating the assign():ment. + */ +static void +rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *assignment) { + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) { + rd_kafka_cgrp_handle_assignment_cooperative(rkcg, assignment); + } else { + + rd_kafka_rebalance_op(rkcg, + RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, + assignment, "new assignment"); + } +} + + +/** + * Clean up any group-leader related resources. + * + * Locality: cgrp thread + */ +static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg, + const char *reason) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER", + "Group \"%.*s\": resetting group leader info: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); + + if (rkcg->rkcg_group_leader.members) { + int i; + + for (i = 0; i < rkcg->rkcg_group_leader.member_cnt; i++) + rd_kafka_group_member_clear( + &rkcg->rkcg_group_leader.members[i]); + rkcg->rkcg_group_leader.member_cnt = 0; + rd_free(rkcg->rkcg_group_leader.members); + rkcg->rkcg_group_leader.members = NULL; + } +} + + +/** + * @brief React to a RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS broker response. + */ +static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg) { + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_EAGER) { + rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_false /*lost*/, + rd_false /*initiating*/, + "rebalance in progress"); + return; + } + + + /* In the COOPERATIVE case, simply rejoin the group + * - partitions are unassigned on SyncGroup response, + * not prior to JoinGroup as with the EAGER case. */ + + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", + "Group \"%.*s\": skipping " + "COOPERATIVE rebalance in state %s " + "(join-state %s)%s%s%s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) + ? " (awaiting assign call)" + : "", + (rkcg->rkcg_rebalance_incr_assignment != NULL) + ? " (incremental assignment pending)" + : "", + rkcg->rkcg_rebalance_rejoin ? " (rebalance rejoin)" : ""); + return; + } + + rd_kafka_cgrp_rejoin(rkcg, "Group is rebalancing"); +} + + + +/** + * @brief Triggers the application rebalance callback if required to + * revoke partitions, and transition to INIT state for (eventual) + * rejoin. Does nothing if a rebalance workflow is already in + * progress + */ +static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg, + rd_bool_t assignment_lost, + rd_bool_t initiating, + const char *reason) { + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", + "Group \"%.*s\": rebalance (%s) " + "already in progress, skipping in state %s " + "(join-state %s) with %d assigned partition(s)%s%s%s: " + "%s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_rebalance_protocol2str( + rd_kafka_cgrp_rebalance_protocol(rkcg)), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rkcg->rkcg_group_assignment + ? rkcg->rkcg_group_assignment->cnt + : 0, + assignment_lost ? " (lost)" : "", + rkcg->rkcg_rebalance_incr_assignment + ? ", incremental assignment in progress" + : "", + rkcg->rkcg_rebalance_rejoin ? ", rejoin on rebalance" : "", + reason); + return; + } + + rd_kafka_cgrp_revoke_all_rejoin(rkcg, assignment_lost, initiating, + reason); +} + + +/** + * @brief Triggers the application rebalance callback if required to + * revoke partitions, and transition to INIT state for (eventual) + * rejoin. + */ +static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg, + rd_bool_t assignment_lost, + rd_bool_t initiating, + const char *reason) { + + rd_kafka_rebalance_protocol_t protocol = + rd_kafka_cgrp_rebalance_protocol(rkcg); + + rd_bool_t terminating = + unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE); + + + rd_kafka_dbg( + rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", + "Group \"%.*s\" %s (%s) in state %s (join-state %s) " + "with %d assigned partition(s)%s: %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + initiating ? "initiating rebalance" : "is rebalancing", + rd_kafka_rebalance_protocol2str(protocol), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, + assignment_lost ? " (lost)" : "", reason); + + rd_snprintf(rkcg->rkcg_c.rebalance_reason, + sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason); + + + if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER || + protocol == RD_KAFKA_REBALANCE_PROTOCOL_NONE) { + /* EAGER case (or initial subscribe) - revoke partitions which + * will be followed by rejoin, if required. */ + + if (assignment_lost) + rd_kafka_cgrp_assignment_set_lost( + rkcg, "%s: revoking assignment and rejoining", + reason); + + /* Schedule application rebalance op if there is an existing + * assignment (albeit perhaps empty) and there is no + * outstanding rebalance op in progress. */ + if (rkcg->rkcg_group_assignment && + !RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) { + rd_kafka_rebalance_op( + rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, + rkcg->rkcg_group_assignment, reason); + } else { + /* Skip the join backoff */ + rd_interval_reset(&rkcg->rkcg_join_intvl); + + rd_kafka_cgrp_rejoin(rkcg, "%s", reason); + } + + return; + } + + + /* COOPERATIVE case. */ + + /* All partitions should never be revoked unless terminating, leaving + * the group, or on assignment lost. Another scenario represents a + * logic error. Fail fast in this case. */ + if (!(terminating || assignment_lost || + (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))) { + rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", + "Group \"%s\": unexpected instruction to revoke " + "current assignment and rebalance " + "(terminating=%d, assignment_lost=%d, " + "LEAVE_ON_UNASSIGN_DONE=%d)", + rkcg->rkcg_group_id->str, terminating, + assignment_lost, + (rkcg->rkcg_flags & + RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE)); + rd_dassert(!*"BUG: unexpected instruction to revoke " + "current assignment and rebalance"); + } + + if (rkcg->rkcg_group_assignment && + rkcg->rkcg_group_assignment->cnt > 0) { + if (assignment_lost) + rd_kafka_cgrp_assignment_set_lost( + rkcg, + "%s: revoking incremental assignment " + "and rejoining", + reason); + + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, + "REBALANCE", + "Group \"%.*s\": revoking " + "all %d partition(s)%s%s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rkcg->rkcg_group_assignment->cnt, + terminating ? " (terminating)" : "", + assignment_lost ? " (assignment lost)" : ""); + + rd_kafka_rebalance_op_incr( + rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, + rkcg->rkcg_group_assignment, + terminating ? rd_false : rd_true /*rejoin*/, reason); + + return; + } + + if (terminating) { + /* If terminating, then don't rejoin group. */ + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, + "REBALANCE", + "Group \"%.*s\": consumer is " + "terminating, skipping rejoin", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); + return; + } + + rd_kafka_cgrp_rejoin(rkcg, "Current assignment is empty"); +} + + +/** + * @brief `max.poll.interval.ms` enforcement check timer. + * + * @locality rdkafka main thread + * @locks none + */ +static void +rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_cgrp_t *rkcg = arg; + rd_kafka_t *rk = rkcg->rkcg_rk; + int exceeded; + + exceeded = rd_kafka_max_poll_exceeded(rk); + + if (likely(!exceeded)) + return; + + rd_kafka_log(rk, LOG_WARNING, "MAXPOLL", + "Application maximum poll interval (%dms) " + "exceeded by %dms " + "(adjust max.poll.interval.ms for " + "long-running message processing): " + "leaving group", + rk->rk_conf.max_poll_interval_ms, exceeded); + + rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA, + RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED, 0, NULL, + NULL, RD_KAFKA_OFFSET_INVALID, + "Application maximum poll interval (%dms) " + "exceeded by %dms", + rk->rk_conf.max_poll_interval_ms, exceeded); + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; + + rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr, + 1 /*lock*/); + + /* Leave the group before calling rebalance since the standard leave + * will be triggered first after the rebalance callback has been served. + * But since the application is blocked still doing processing + * that leave will be further delayed. + * + * KIP-345: static group members should continue to respect + * `max.poll.interval.ms` but should not send a LeaveGroupRequest. + */ + if (!RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg)) + rd_kafka_cgrp_leave(rkcg); + + /* Timing out or leaving the group invalidates the member id, reset it + * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */ + rd_kafka_cgrp_set_member_id(rkcg, ""); + + /* Trigger rebalance */ + rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, + rd_true /*initiating*/, + "max.poll.interval.ms exceeded"); +} + + +/** + * @brief Generate consumer errors for each topic in the list. + * + * Also replaces the list of last reported topic errors so that repeated + * errors are silenced. + * + * @param errored Errored topics. + * @param error_prefix Error message prefix. + * + * @remark Assumes ownership of \p errored. + */ +static void rd_kafka_propagate_consumer_topic_errors( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *errored, + const char *error_prefix) { + int i; + + for (i = 0; i < errored->cnt; i++) { + rd_kafka_topic_partition_t *topic = &errored->elems[i]; + rd_kafka_topic_partition_t *prev; + + rd_assert(topic->err); + + /* Normalize error codes, unknown topic may be + * reported by the broker, or the lack of a topic in + * metadata response is figured out by the client. + * Make sure the application only sees one error code + * for both these cases. */ + if (topic->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) + topic->err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + + /* Check if this topic errored previously */ + prev = rd_kafka_topic_partition_list_find( + rkcg->rkcg_errored_topics, topic->topic, + RD_KAFKA_PARTITION_UA); + + if (prev && prev->err == topic->err) + continue; /* This topic already reported same error */ + + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_TOPIC, + "TOPICERR", "%s: %s: %s", error_prefix, + topic->topic, rd_kafka_err2str(topic->err)); + + /* Send consumer error to application */ + rd_kafka_consumer_err( + rkcg->rkcg_q, RD_KAFKA_NODEID_UA, topic->err, 0, + topic->topic, NULL, RD_KAFKA_OFFSET_INVALID, "%s: %s: %s", + error_prefix, topic->topic, rd_kafka_err2str(topic->err)); + } + + rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); + rkcg->rkcg_errored_topics = errored; +} + + +/** + * @brief Work out the topics currently subscribed to that do not + * match any pattern in \p subscription. + */ +static rd_kafka_topic_partition_list_t *rd_kafka_cgrp_get_unsubscribing_topics( + rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *subscription) { + int i; + rd_kafka_topic_partition_list_t *result; + + result = rd_kafka_topic_partition_list_new( + rkcg->rkcg_subscribed_topics->rl_cnt); + + /* TODO: Something that isn't O(N*M) */ + for (i = 0; i < rkcg->rkcg_subscribed_topics->rl_cnt; i++) { + int j; + const char *topic = + ((rd_kafka_topic_info_t *) + rkcg->rkcg_subscribed_topics->rl_elems[i]) + ->topic; + + for (j = 0; j < subscription->cnt; j++) { + const char *pattern = subscription->elems[j].topic; + if (rd_kafka_topic_match(rkcg->rkcg_rk, pattern, + topic)) { + break; + } + } + + if (j == subscription->cnt) + rd_kafka_topic_partition_list_add( + result, topic, RD_KAFKA_PARTITION_UA); + } + + if (result->cnt == 0) { + rd_kafka_topic_partition_list_destroy(result); + return NULL; + } + + return result; +} + + +/** + * @brief Determine the partitions to revoke, given the topics being + * unassigned. + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_cgrp_calculate_subscribe_revoking_partitions( + rd_kafka_cgrp_t *rkcg, + const rd_kafka_topic_partition_list_t *unsubscribing) { + rd_kafka_topic_partition_list_t *revoking; + const rd_kafka_topic_partition_t *rktpar; + + if (!unsubscribing) + return NULL; + + if (!rkcg->rkcg_group_assignment || + rkcg->rkcg_group_assignment->cnt == 0) + return NULL; + + revoking = + rd_kafka_topic_partition_list_new(rkcg->rkcg_group_assignment->cnt); + + /* TODO: Something that isn't O(N*M). */ + RD_KAFKA_TPLIST_FOREACH(rktpar, unsubscribing) { + const rd_kafka_topic_partition_t *assigned; + + RD_KAFKA_TPLIST_FOREACH(assigned, rkcg->rkcg_group_assignment) { + if (!strcmp(assigned->topic, rktpar->topic)) { + rd_kafka_topic_partition_list_add( + revoking, assigned->topic, + assigned->partition); + continue; + } + } + } + + if (revoking->cnt == 0) { + rd_kafka_topic_partition_list_destroy(revoking); + revoking = NULL; + } + + return revoking; +} + + +/** + * @brief Handle a new subscription that is modifying an existing subscription + * in the COOPERATIVE case. + * + * @remark Assumes ownership of \p rktparlist. + */ +static rd_kafka_resp_err_t +rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *rktparlist) { + rd_kafka_topic_partition_list_t *unsubscribing_topics; + rd_kafka_topic_partition_list_t *revoking; + rd_list_t *tinfos; + rd_kafka_topic_partition_list_t *errored; + int metadata_age; + int old_cnt = rkcg->rkcg_subscription->cnt; + + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; + + if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; + + /* Topics in rkcg_subscribed_topics that don't match any pattern in + the new subscription. */ + unsubscribing_topics = + rd_kafka_cgrp_get_unsubscribing_topics(rkcg, rktparlist); + + /* Currently assigned topic partitions that are no longer desired. */ + revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions( + rkcg, unsubscribing_topics); + + rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); + rkcg->rkcg_subscription = rktparlist; + + if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age, + "modify subscription") == 1) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, + "MODSUB", + "Group \"%.*s\": postponing join until " + "up-to-date metadata is available", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); + + rd_assert( + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || + /* Possible via rd_kafka_cgrp_modify_subscription */ + rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY); + + rd_kafka_cgrp_set_join_state( + rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); + + + /* Revoke/join will occur after metadata refresh completes */ + if (revoking) + rd_kafka_topic_partition_list_destroy(revoking); + if (unsubscribing_topics) + rd_kafka_topic_partition_list_destroy( + unsubscribing_topics); + + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", + "Group \"%.*s\": modifying subscription of size %d to " + "new subscription of size %d, removing %d topic(s), " + "revoking %d partition(s) (join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), old_cnt, + rkcg->rkcg_subscription->cnt, + unsubscribing_topics ? unsubscribing_topics->cnt : 0, + revoking ? revoking->cnt : 0, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + if (unsubscribing_topics) + rd_kafka_topic_partition_list_destroy(unsubscribing_topics); + + /* Create a list of the topics in metadata that matches the new + * subscription */ + tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, + (void *)rd_kafka_topic_info_destroy); + + /* Unmatched topics will be added to the errored list. */ + errored = rd_kafka_topic_partition_list_new(0); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) + rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, + rkcg->rkcg_subscription, errored); + else + rd_kafka_metadata_topic_filter( + rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); + + /* Propagate consumer errors for any non-existent or errored topics. + * The function takes ownership of errored. */ + rd_kafka_propagate_consumer_topic_errors( + rkcg, errored, "Subscribed topic not available"); + + if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) { + rd_kafka_cgrp_rejoin(rkcg, "Subscription modified"); + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + if (revoking) { + rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, + "REBALANCE", + "Group \"%.*s\" revoking " + "%d of %d partition(s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + revoking->cnt, rkcg->rkcg_group_assignment->cnt); + + rd_kafka_rebalance_op_incr( + rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, revoking, + rd_true /*rejoin*/, "subscribe"); + + rd_kafka_topic_partition_list_destroy(revoking); + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * Remove existing topic subscription. + */ +static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg, + rd_bool_t leave_group) { + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE", + "Group \"%.*s\": unsubscribe from current %ssubscription " + "of size %d (leave group=%s, has joined=%s, %s, " + "join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rkcg->rkcg_subscription ? "" : "unset ", + rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0, + RD_STR_ToF(leave_group), + RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)), + rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "n/a", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, + &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/); + + if (rkcg->rkcg_subscription) { + rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); + rkcg->rkcg_subscription = NULL; + } + + rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); + + /* + * Clean-up group leader duties, if any. + */ + rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe"); + + if (leave_group && RD_KAFKA_CGRP_HAS_JOINED(rkcg)) + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; + + /* FIXME: Why are we only revoking if !assignment_lost ? */ + if (!rd_kafka_cgrp_assignment_is_lost(rkcg)) + rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, + rd_true /*initiating*/, + "unsubscribe"); + + rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION | + RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * Set new atomic topic subscription. + */ +static rd_kafka_resp_err_t +rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg, + rd_kafka_topic_partition_list_t *rktparlist) { + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", + "Group \"%.*s\": subscribe to new %ssubscription " + "of %d topics (join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rktparlist ? "" : "unset ", + rktparlist ? rktparlist->cnt : 0, + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0) + return RD_KAFKA_RESP_ERR__INVALID_ARG; + + /* If the consumer has raised a fatal error treat all subscribes as + unsubscribe */ + if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { + if (rkcg->rkcg_subscription) + rd_kafka_cgrp_unsubscribe(rkcg, + rd_true /*leave group*/); + return RD_KAFKA_RESP_ERR__FATAL; + } + + /* Clear any existing postponed subscribe. */ + if (rkcg->rkcg_next_subscription) + rd_kafka_topic_partition_list_destroy_free( + rkcg->rkcg_next_subscription); + rkcg->rkcg_next_subscription = NULL; + rkcg->rkcg_next_unsubscribe = rd_false; + + if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", + "Group \"%.*s\": postponing " + "subscribe until previous rebalance " + "completes (join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + if (!rktparlist) + rkcg->rkcg_next_unsubscribe = rd_true; + else + rkcg->rkcg_next_subscription = rktparlist; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && + rktparlist && rkcg->rkcg_subscription) + return rd_kafka_cgrp_modify_subscription(rkcg, rktparlist); + + /* Remove existing subscription first */ + if (rkcg->rkcg_subscription) + rd_kafka_cgrp_unsubscribe( + rkcg, + rktparlist + ? rd_false /* don't leave group if new subscription */ + : rd_true /* leave group if no new subscription */); + + if (!rktparlist) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION; + + if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; + + rkcg->rkcg_subscription = rktparlist; + + rd_kafka_cgrp_join(rkcg); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + + +/** + * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving + * the op 'rko' from cgrp_terminate(). + * + * NOTE: Takes ownership of 'rko' + * + * Locality: main thread + */ +void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) { + + rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", + "Terminating group \"%.*s\" in state %s " + "with %d partition(s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_list_cnt(&rkcg->rkcg_toppars)); + + if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM || + (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) || + rkcg->rkcg_reply_rko != NULL)) { + /* Already terminating or handling a previous terminate */ + if (rko) { + rd_kafka_q_t *rkq = rko->rko_replyq.q; + rko->rko_replyq.q = NULL; + rd_kafka_consumer_err( + rkq, RD_KAFKA_NODEID_UA, + RD_KAFKA_RESP_ERR__IN_PROGRESS, + rko->rko_replyq.version, NULL, NULL, + RD_KAFKA_OFFSET_INVALID, "Group is %s", + rkcg->rkcg_reply_rko ? "terminating" + : "terminated"); + rd_kafka_q_destroy(rkq); + rd_kafka_op_destroy(rko); + } + return; + } + + /* Mark for stopping, the actual state transition + * is performed when all toppars have left. */ + rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE; + rkcg->rkcg_ts_terminate = rd_clock(); + rkcg->rkcg_reply_rko = rko; + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) + rd_kafka_cgrp_unsubscribe( + rkcg, + /* Leave group if this is a controlled shutdown */ + !rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)); + + /* Reset the wait-for-LeaveGroup flag if there is an outstanding + * LeaveGroupRequest being waited on (from a prior unsubscribe), but + * the destroy flags have NO_CONSUMER_CLOSE set, which calls + * for immediate termination. */ + if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) + rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; + + /* If there's an oustanding rebalance which has not yet been + * served by the application it will be served from consumer_close(). + * If the instance is being terminated with NO_CONSUMER_CLOSE we + * trigger unassign directly to avoid stalling on rebalance callback + * queues that are no longer served by the application. */ + if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) || + rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) + rd_kafka_cgrp_unassign(rkcg); + + /* Serve assignment so it can start to decommission */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + + /* Try to terminate right away if all preconditions are met. */ + rd_kafka_cgrp_try_terminate(rkcg); +} + + +/** + * Terminate and decommission a cgrp asynchronously. + * + * Locality: any thread + */ +void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) { + rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread)); + rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0); +} + + +struct _op_timeout_offset_commit { + rd_ts_t now; + rd_kafka_t *rk; + rd_list_t expired; +}; + +/** + * q_filter callback for expiring OFFSET_COMMIT timeouts. + */ +static int rd_kafka_op_offset_commit_timeout_check(rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + void *opaque) { + struct _op_timeout_offset_commit *state = + (struct _op_timeout_offset_commit *)opaque; + + if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT || + rko->rko_u.offset_commit.ts_timeout == 0 || + rko->rko_u.offset_commit.ts_timeout > state->now)) { + return 0; + } + + rd_kafka_q_deq0(rkq, rko); + + /* Add to temporary list to avoid recursive + * locking of rkcg_wait_coord_q. */ + rd_list_add(&state->expired, rko); + return 1; +} + + +/** + * Scan for various timeouts. + */ +static void rd_kafka_cgrp_timeout_scan(rd_kafka_cgrp_t *rkcg, rd_ts_t now) { + struct _op_timeout_offset_commit ofc_state; + int i, cnt = 0; + rd_kafka_op_t *rko; + + ofc_state.now = now; + ofc_state.rk = rkcg->rkcg_rk; + rd_list_init(&ofc_state.expired, 0, NULL); + + cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q, + rd_kafka_op_offset_commit_timeout_check, + &ofc_state); + + RD_LIST_FOREACH(rko, &ofc_state.expired, i) + rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, + RD_KAFKA_RESP_ERR__WAIT_COORD, + NULL, NULL, rko); + + rd_list_destroy(&ofc_state.expired); + + if (cnt > 0) + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT", + "Group \"%.*s\": timed out %d op(s), %d remain", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt, + rd_kafka_q_len(rkcg->rkcg_wait_coord_q)); +} + + +/** + * @brief Handle an assign op. + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error = NULL; + + if (rd_kafka_fatal_error_code(rkcg->rkcg_rk) || + rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) { + /* Treat all assignments as unassign when a fatal error is + * raised or the cgrp is terminating. */ + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, + "ASSIGN", + "Group \"%s\": Consumer %s: " + "treating assign as unassign", + rkcg->rkcg_group_id->str, + rd_kafka_fatal_error_code(rkcg->rkcg_rk) + ? "has raised a fatal error" + : "is terminating"); + + if (rko->rko_u.assign.partitions) { + rd_kafka_topic_partition_list_destroy( + rko->rko_u.assign.partitions); + rko->rko_u.assign.partitions = NULL; + } + rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN; + + } else if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && + !(rko->rko_u.assign.method == + RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN || + rko->rko_u.assign.method == + RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN)) + error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, + "Changes to the current assignment " + "must be made using " + "incremental_assign() or " + "incremental_unassign() " + "when rebalance protocol type is " + "COOPERATIVE"); + + else if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_EAGER && + !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_ASSIGN)) + error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, + "Changes to the current assignment " + "must be made using " + "assign() when rebalance " + "protocol type is EAGER"); + + if (!error) { + switch (rko->rko_u.assign.method) { + case RD_KAFKA_ASSIGN_METHOD_ASSIGN: + /* New atomic assignment (partitions != NULL), + * or unassignment (partitions == NULL) */ + if (rko->rko_u.assign.partitions) + error = rd_kafka_cgrp_assign( + rkcg, rko->rko_u.assign.partitions); + else + error = rd_kafka_cgrp_unassign(rkcg); + break; + case RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN: + error = rd_kafka_cgrp_incremental_assign( + rkcg, rko->rko_u.assign.partitions); + break; + case RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN: + error = rd_kafka_cgrp_incremental_unassign( + rkcg, rko->rko_u.assign.partitions); + break; + default: + RD_NOTREACHED(); + break; + } + + /* If call succeeded serve the assignment */ + if (!error) + rd_kafka_assignment_serve(rkcg->rkcg_rk); + } + + if (error) { + /* Log error since caller might not check + * *assign() return value. */ + rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "ASSIGN", + "Group \"%s\": application *assign() call " + "failed: %s", + rkcg->rkcg_group_id->str, + rd_kafka_error_string(error)); + } + + rd_kafka_op_error_reply(rko, error); +} + + +/** + * @brief Handle cgrp queue op. + * @locality rdkafka main thread + * @locks none + */ +static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko, + rd_kafka_q_cb_type_t cb_type, + void *opaque) { + rd_kafka_cgrp_t *rkcg = opaque; + rd_kafka_toppar_t *rktp; + rd_kafka_resp_err_t err; + const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF; + + rktp = rko->rko_rktp; + + if (rktp && !silent_op) + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "CGRPOP", + "Group \"%.*s\" received op %s in state %s " + "(join-state %s) for %.*s [%" PRId32 "]", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_op2str(rko->rko_type), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition); + else if (!silent_op) + rd_kafka_dbg( + rkcg->rkcg_rk, CGRP, "CGRPOP", + "Group \"%.*s\" received op %s in state %s " + "(join-state %s)", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_op2str(rko->rko_type), + rd_kafka_cgrp_state_names[rkcg->rkcg_state], + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + switch ((int)rko->rko_type) { + case RD_KAFKA_OP_NAME: + /* Return the currently assigned member id. */ + if (rkcg->rkcg_member_id) + rko->rko_u.name.str = + RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id); + rd_kafka_op_reply(rko, 0); + rko = NULL; + break; + + case RD_KAFKA_OP_CG_METADATA: + /* Return the current consumer group metadata. */ + rko->rko_u.cg_metadata = + rkcg->rkcg_member_id + ? rd_kafka_consumer_group_metadata_new_with_genid( + rkcg->rkcg_rk->rk_conf.group_id_str, + rkcg->rkcg_generation_id, + rkcg->rkcg_member_id->str, + rkcg->rkcg_rk->rk_conf.group_instance_id) + : NULL; + rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); + rko = NULL; + break; + + case RD_KAFKA_OP_OFFSET_FETCH: + if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || + (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) { + rd_kafka_op_handle_OffsetFetch( + rkcg->rkcg_rk, NULL, RD_KAFKA_RESP_ERR__WAIT_COORD, + NULL, NULL, rko); + rko = NULL; /* rko freed by handler */ + break; + } + + rd_kafka_OffsetFetchRequest( + rkcg->rkcg_coord, rk->rk_group_id->str, + rko->rko_u.offset_fetch.partitions, + rko->rko_u.offset_fetch.require_stable_offsets, + 0, /* Timeout */ + RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), + rd_kafka_op_handle_OffsetFetch, rko); + rko = NULL; /* rko now owned by request */ + break; + + case RD_KAFKA_OP_PARTITION_JOIN: + rd_kafka_cgrp_partition_add(rkcg, rktp); + + /* If terminating tell the partition to leave */ + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) + rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_NO_REPLYQ); + break; + + case RD_KAFKA_OP_PARTITION_LEAVE: + rd_kafka_cgrp_partition_del(rkcg, rktp); + break; + + case RD_KAFKA_OP_OFFSET_COMMIT: + /* Trigger offsets commit. */ + rd_kafka_cgrp_offsets_commit(rkcg, rko, + /* only set offsets + * if no partitions were + * specified. */ + rko->rko_u.offset_commit.partitions + ? 0 + : 1 /* set_offsets*/, + rko->rko_u.offset_commit.reason); + rko = NULL; /* rko now owned by request */ + break; + + case RD_KAFKA_OP_COORD_QUERY: + rd_kafka_cgrp_coord_query( + rkcg, + rko->rko_err ? rd_kafka_err2str(rko->rko_err) : "from op"); + break; + + case RD_KAFKA_OP_SUBSCRIBE: + rd_kafka_app_polled(rk); + + /* New atomic subscription (may be NULL) */ + err = + rd_kafka_cgrp_subscribe(rkcg, rko->rko_u.subscribe.topics); + + if (!err) /* now owned by rkcg */ + rko->rko_u.subscribe.topics = NULL; + + rd_kafka_op_reply(rko, err); + rko = NULL; + break; + + case RD_KAFKA_OP_ASSIGN: + rd_kafka_cgrp_handle_assign_op(rkcg, rko); + rko = NULL; + break; + + case RD_KAFKA_OP_GET_SUBSCRIPTION: + if (rkcg->rkcg_next_subscription) + rko->rko_u.subscribe.topics = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_next_subscription); + else if (rkcg->rkcg_next_unsubscribe) + rko->rko_u.subscribe.topics = NULL; + else if (rkcg->rkcg_subscription) + rko->rko_u.subscribe.topics = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_subscription); + rd_kafka_op_reply(rko, 0); + rko = NULL; + break; + + case RD_KAFKA_OP_GET_ASSIGNMENT: + /* This is the consumer assignment, not the group assignment. */ + rko->rko_u.assign.partitions = + rd_kafka_topic_partition_list_copy( + rkcg->rkcg_rk->rk_consumer.assignment.all); + + rd_kafka_op_reply(rko, 0); + rko = NULL; + break; + + case RD_KAFKA_OP_GET_REBALANCE_PROTOCOL: + rko->rko_u.rebalance_protocol.str = + rd_kafka_rebalance_protocol2str( + rd_kafka_cgrp_rebalance_protocol(rkcg)); + rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); + rko = NULL; + break; + + case RD_KAFKA_OP_TERMINATE: + rd_kafka_cgrp_terminate0(rkcg, rko); + rko = NULL; /* terminate0() takes ownership */ + break; + + default: + rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type"); + break; + } + + if (rko) + rd_kafka_op_destroy(rko); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @returns true if the session timeout has expired (due to no successful + * Heartbeats in session.timeout.ms) and triggers a rebalance. + */ +static rd_bool_t rd_kafka_cgrp_session_timeout_check(rd_kafka_cgrp_t *rkcg, + rd_ts_t now) { + rd_ts_t delta; + char buf[256]; + + if (unlikely(!rkcg->rkcg_ts_session_timeout)) + return rd_true; /* Session has expired */ + + delta = now - rkcg->rkcg_ts_session_timeout; + if (likely(delta < 0)) + return rd_false; + + delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000; + + rd_snprintf(buf, sizeof(buf), + "Consumer group session timed out (in join-state %s) after " + "%" PRId64 + " ms without a successful response from the " + "group coordinator (broker %" PRId32 ", last error was %s)", + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], + delta / 1000, rkcg->rkcg_coord_id, + rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err)); + + rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT", + "%s: revoking assignment and rejoining group", buf); + + /* Prevent further rebalances */ + rkcg->rkcg_ts_session_timeout = 0; + + /* Timing out invalidates the member id, reset it + * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */ + rd_kafka_cgrp_set_member_id(rkcg, ""); + + /* Revoke and rebalance */ + rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, + rd_true /*initiating*/, buf); + + return rd_true; +} + + +/** + * @brief Apply the next waiting subscribe/unsubscribe, if any. + */ +static void rd_kafka_cgrp_apply_next_subscribe(rd_kafka_cgrp_t *rkcg) { + rd_assert(rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT); + + if (rkcg->rkcg_next_subscription) { + rd_kafka_topic_partition_list_t *next_subscription = + rkcg->rkcg_next_subscription; + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE", + "Group \"%s\": invoking waiting postponed " + "subscribe", + rkcg->rkcg_group_id->str); + rkcg->rkcg_next_subscription = NULL; + rd_kafka_cgrp_subscribe(rkcg, next_subscription); + + } else if (rkcg->rkcg_next_unsubscribe) { + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE", + "Group \"%s\": invoking waiting postponed " + "unsubscribe", + rkcg->rkcg_group_id->str); + rkcg->rkcg_next_unsubscribe = rd_false; + rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*Leave*/); + } +} + +/** + * Client group's join state handling + */ +static void rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t *rkcg) { + rd_ts_t now = rd_clock(); + + if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk))) + return; + + switch (rkcg->rkcg_join_state) { + case RD_KAFKA_CGRP_JOIN_STATE_INIT: + if (unlikely(rd_kafka_cgrp_awaiting_response(rkcg))) + break; + + /* If there is a next subscription, apply it. */ + rd_kafka_cgrp_apply_next_subscribe(rkcg); + + /* If we have a subscription start the join process. */ + if (!rkcg->rkcg_subscription) + break; + + if (rd_interval_immediate(&rkcg->rkcg_join_intvl, 1000 * 1000, + now) > 0) + rd_kafka_cgrp_join(rkcg); + break; + + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE: + /* FIXME: I think we might have to send heartbeats in + * in WAIT_INCR_UNASSIGN, yes-no? */ + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE: + break; + + case RD_KAFKA_CGRP_JOIN_STATE_STEADY: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL: + case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL: + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && + rd_interval( + &rkcg->rkcg_heartbeat_intvl, + rkcg->rkcg_rk->rk_conf.group_heartbeat_intvl_ms * 1000, + now) > 0) + rd_kafka_cgrp_heartbeat(rkcg); + break; + } +} +/** + * Client group handling. + * Called from main thread to serve the operational aspects of a cgrp. + */ +void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg) { + rd_kafka_broker_t *rkb = rkcg->rkcg_coord; + int rkb_state = RD_KAFKA_BROKER_STATE_INIT; + rd_ts_t now; + + if (rkb) { + rd_kafka_broker_lock(rkb); + rkb_state = rkb->rkb_state; + rd_kafka_broker_unlock(rkb); + + /* Go back to querying state if we lost the current coordinator + * connection. */ + if (rkb_state < RD_KAFKA_BROKER_STATE_UP && + rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) + rd_kafka_cgrp_set_state( + rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + } + + now = rd_clock(); + + /* Check for cgrp termination */ + if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) { + rd_kafka_cgrp_terminated(rkcg); + return; /* cgrp terminated */ + } + + /* Bail out if we're terminating. */ + if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk))) + return; + + /* Check session timeout regardless of current coordinator + * connection state (rkcg_state) */ + if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) + rd_kafka_cgrp_session_timeout_check(rkcg, now); + +retry: + switch (rkcg->rkcg_state) { + case RD_KAFKA_CGRP_STATE_TERM: + break; + + case RD_KAFKA_CGRP_STATE_INIT: + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); + /* FALLTHRU */ + + case RD_KAFKA_CGRP_STATE_QUERY_COORD: + /* Query for coordinator. */ + if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl, + 500 * 1000, now) > 0) + rd_kafka_cgrp_coord_query(rkcg, + "intervaled in " + "state query-coord"); + break; + + case RD_KAFKA_CGRP_STATE_WAIT_COORD: + /* Waiting for FindCoordinator response */ + break; + + case RD_KAFKA_CGRP_STATE_WAIT_BROKER: + /* See if the group should be reassigned to another broker. */ + if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id)) + goto retry; /* Coordinator changed, retry state-machine + * to speed up next transition. */ + + /* Coordinator query */ + if (rd_interval(&rkcg->rkcg_coord_query_intvl, 1000 * 1000, + now) > 0) + rd_kafka_cgrp_coord_query(rkcg, + "intervaled in " + "state wait-broker"); + break; + + case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT: + /* Waiting for broker transport to come up. + * Also make sure broker supports groups. */ + if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb || + !rd_kafka_broker_supports( + rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) { + /* Coordinator query */ + if (rd_interval(&rkcg->rkcg_coord_query_intvl, + 1000 * 1000, now) > 0) + rd_kafka_cgrp_coord_query( + rkcg, + "intervaled in state " + "wait-broker-transport"); + + } else { + rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP); + + /* Serve join state to trigger (re)join */ + rd_kafka_cgrp_join_state_serve(rkcg); + + /* Serve any pending partitions in the assignment */ + rd_kafka_assignment_serve(rkcg->rkcg_rk); + } + break; + + case RD_KAFKA_CGRP_STATE_UP: + /* Move any ops awaiting the coordinator to the ops queue + * for reprocessing. */ + rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q); + + /* Relaxed coordinator queries. */ + if (rd_interval(&rkcg->rkcg_coord_query_intvl, + rkcg->rkcg_rk->rk_conf.coord_query_intvl_ms * + 1000, + now) > 0) + rd_kafka_cgrp_coord_query(rkcg, + "intervaled in state up"); + + rd_kafka_cgrp_join_state_serve(rkcg); + break; + } + + if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP && + rd_interval(&rkcg->rkcg_timeout_scan_intvl, 1000 * 1000, + now) > 0)) + rd_kafka_cgrp_timeout_scan(rkcg, now); +} + + + +/** + * Send an op to a cgrp. + * + * Locality: any thread + */ +void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg, + rd_kafka_toppar_t *rktp, + rd_kafka_replyq_t replyq, + rd_kafka_op_type_t type, + rd_kafka_resp_err_t err) { + rd_kafka_op_t *rko; + + rko = rd_kafka_op_new(type); + rko->rko_err = err; + rko->rko_replyq = replyq; + + if (rktp) + rko->rko_rktp = rd_kafka_toppar_keep(rktp); + + rd_kafka_q_enq(rkcg->rkcg_ops, rko); +} + + + +void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id) { + if (rkcg->rkcg_member_id && member_id && + !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id)) + return; /* No change */ + + rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID", + "Group \"%.*s\": updating member id \"%s\" -> \"%s\"", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str + : "(not-set)", + member_id ? member_id : "(not-set)"); + + if (rkcg->rkcg_member_id) { + rd_kafkap_str_destroy(rkcg->rkcg_member_id); + rkcg->rkcg_member_id = NULL; + } + + if (member_id) + rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1); +} + + +/** + * @brief Determine owned partitions that no longer exist (partitions in + * deleted or re-created topics). + */ +static rd_kafka_topic_partition_list_t * +rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) { + rd_kafka_topic_partition_list_t *result = NULL; + const rd_kafka_topic_partition_t *curr; + + if (!rkcg->rkcg_group_assignment) + return NULL; + + RD_KAFKA_TPLIST_FOREACH(curr, rkcg->rkcg_group_assignment) { + if (rd_list_find(rkcg->rkcg_subscribed_topics, curr->topic, + rd_kafka_topic_info_topic_cmp)) + continue; + + if (!result) + result = rd_kafka_topic_partition_list_new( + rkcg->rkcg_group_assignment->cnt); + + rd_kafka_topic_partition_list_add_copy(result, curr); + } + + return result; +} + + +/** + * @brief Check if the latest metadata affects the current subscription: + * - matched topic added + * - matched topic removed + * - matched topic's partition count change + * + * @locks none + * @locality rdkafka main thread + */ +void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, + rd_bool_t do_join) { + rd_list_t *tinfos; + rd_kafka_topic_partition_list_t *errored; + rd_bool_t changed; + + rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); + + if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) + return; + + /* + * Unmatched topics will be added to the errored list. + */ + errored = rd_kafka_topic_partition_list_new(0); + + /* + * Create a list of the topics in metadata that matches our subscription + */ + tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, + (void *)rd_kafka_topic_info_destroy); + + if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) + rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, + rkcg->rkcg_subscription, errored); + else + rd_kafka_metadata_topic_filter( + rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); + + + /* + * Propagate consumer errors for any non-existent or errored topics. + * The function takes ownership of errored. + */ + rd_kafka_propagate_consumer_topic_errors( + rkcg, errored, "Subscribed topic not available"); + + /* + * Update effective list of topics (takes ownership of \c tinfos) + */ + changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos); + + if (!do_join || + (!changed && + /* If we get the same effective list of topics as last time around, + * but the join is waiting for this metadata query to complete, + * then we should not return here but follow through with the + * (re)join below. */ + rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) + return; + + /* List of subscribed topics changed, trigger rejoin. */ + rd_kafka_dbg(rkcg->rkcg_rk, + CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER, + "REJOIN", + "Group \"%.*s\": " + "subscription updated from metadata change: " + "rejoining group in state %s", + RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), + rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); + + if (rd_kafka_cgrp_rebalance_protocol(rkcg) == + RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) { + + /* Partitions from deleted topics */ + rd_kafka_topic_partition_list_t *owned_but_not_exist = + rd_kafka_cgrp_owned_but_not_exist_partitions(rkcg); + + if (owned_but_not_exist) { + rd_kafka_cgrp_assignment_set_lost( + rkcg, "%d subscribed topic(s) no longer exist", + owned_but_not_exist->cnt); + + rd_kafka_rebalance_op_incr( + rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, + owned_but_not_exist, + rkcg->rkcg_group_leader.members != NULL + /* Rejoin group following revoke's + * unassign if we are leader */ + , + "topics not available"); + rd_kafka_topic_partition_list_destroy( + owned_but_not_exist); + + } else { + /* Nothing to revoke, rejoin group if we are the + * leader. + * The KIP says to rejoin the group on metadata + * change only if we're the leader. But what if a + * non-leader is subscribed to a regex that the others + * aren't? + * Going against the KIP and rejoining here. */ + rd_kafka_cgrp_rejoin( + rkcg, + "Metadata for subscribed topic(s) has " + "changed"); + } + + } else { + /* EAGER */ + rd_kafka_cgrp_revoke_rejoin(rkcg, + "Metadata for subscribed topic(s) " + "has changed"); + } + + /* We shouldn't get stuck in this state. */ + rd_dassert(rkcg->rkcg_join_state != + RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); +} + + +rd_kafka_consumer_group_metadata_t * +rd_kafka_consumer_group_metadata_new(const char *group_id) { + rd_kafka_consumer_group_metadata_t *cgmetadata; + + cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid( + group_id, -1, "", NULL); + + return cgmetadata; +} + +rd_kafka_consumer_group_metadata_t * +rd_kafka_consumer_group_metadata_new_with_genid(const char *group_id, + int32_t generation_id, + const char *member_id, + const char *group_instance_id) { + rd_kafka_consumer_group_metadata_t *cgmetadata; + + cgmetadata = rd_calloc(1, sizeof(*cgmetadata)); + cgmetadata->group_id = rd_strdup(group_id); + cgmetadata->generation_id = generation_id; + cgmetadata->member_id = rd_strdup(member_id); + if (group_instance_id) + cgmetadata->group_instance_id = rd_strdup(group_instance_id); + + return cgmetadata; +} + +rd_kafka_consumer_group_metadata_t * +rd_kafka_consumer_group_metadata(rd_kafka_t *rk) { + rd_kafka_consumer_group_metadata_t *cgmetadata; + rd_kafka_op_t *rko; + rd_kafka_cgrp_t *rkcg; + + if (!(rkcg = rd_kafka_cgrp_get(rk))) + return NULL; + + rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_CG_METADATA); + if (!rko) + return NULL; + + cgmetadata = rko->rko_u.cg_metadata; + rko->rko_u.cg_metadata = NULL; + rd_kafka_op_destroy(rko); + + return cgmetadata; +} + +void rd_kafka_consumer_group_metadata_destroy( + rd_kafka_consumer_group_metadata_t *cgmetadata) { + rd_free(cgmetadata->group_id); + rd_free(cgmetadata->member_id); + if (cgmetadata->group_instance_id) + rd_free(cgmetadata->group_instance_id); + rd_free(cgmetadata); +} + +rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup( + const rd_kafka_consumer_group_metadata_t *cgmetadata) { + rd_kafka_consumer_group_metadata_t *ret; + + ret = rd_calloc(1, sizeof(*cgmetadata)); + ret->group_id = rd_strdup(cgmetadata->group_id); + ret->generation_id = cgmetadata->generation_id; + ret->member_id = rd_strdup(cgmetadata->member_id); + if (cgmetadata->group_instance_id) + ret->group_instance_id = + rd_strdup(cgmetadata->group_instance_id); + + return ret; +} + +/* + * Consumer group metadata serialization format v2: + * "CGMDv2:"<generation_id><group_id>"\0"<member_id>"\0" \ + * <group_instance_id_is_null>[<group_instance_id>"\0"] + * Where <group_id> is the group_id string. + */ +static const char rd_kafka_consumer_group_metadata_magic[7] = "CGMDv2:"; + +rd_kafka_error_t *rd_kafka_consumer_group_metadata_write( + const rd_kafka_consumer_group_metadata_t *cgmd, + void **bufferp, + size_t *sizep) { + char *buf; + size_t size; + size_t of = 0; + size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic); + size_t groupid_len = strlen(cgmd->group_id) + 1; + size_t generationid_len = sizeof(cgmd->generation_id); + size_t member_id_len = strlen(cgmd->member_id) + 1; + int8_t group_instance_id_is_null = cgmd->group_instance_id ? 0 : 1; + size_t group_instance_id_is_null_len = + sizeof(group_instance_id_is_null); + size_t group_instance_id_len = + cgmd->group_instance_id ? strlen(cgmd->group_instance_id) + 1 : 0; + + size = magic_len + groupid_len + generationid_len + member_id_len + + group_instance_id_is_null_len + group_instance_id_len; + + buf = rd_malloc(size); + + memcpy(buf, rd_kafka_consumer_group_metadata_magic, magic_len); + of += magic_len; + + memcpy(buf + of, &cgmd->generation_id, generationid_len); + of += generationid_len; + + memcpy(buf + of, cgmd->group_id, groupid_len); + of += groupid_len; + + memcpy(buf + of, cgmd->member_id, member_id_len); + of += member_id_len; + + memcpy(buf + of, &group_instance_id_is_null, + group_instance_id_is_null_len); + of += group_instance_id_is_null_len; + + if (!group_instance_id_is_null) + memcpy(buf + of, cgmd->group_instance_id, + group_instance_id_len); + of += group_instance_id_len; + + rd_assert(of == size); + + *bufferp = buf; + *sizep = size; + + return NULL; +} + + +/* + * Check that a string is printable, returning NULL if not or + * a pointer immediately after the end of the string NUL + * terminator if so. + **/ +static const char *str_is_printable(const char *s, const char *end) { + const char *c; + for (c = s; *c && c != end; c++) + if (!isprint((int)*c)) + return NULL; + return c + 1; +} + + +rd_kafka_error_t *rd_kafka_consumer_group_metadata_read( + rd_kafka_consumer_group_metadata_t **cgmdp, + const void *buffer, + size_t size) { + const char *buf = (const char *)buffer; + const char *end = buf + size; + const char *next; + size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic); + int32_t generation_id; + size_t generationid_len = sizeof(generation_id); + const char *group_id; + const char *member_id; + int8_t group_instance_id_is_null; + const char *group_instance_id = NULL; + + if (size < magic_len + generationid_len + 1 + 1 + 1) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer is too short"); + + if (memcmp(buffer, rd_kafka_consumer_group_metadata_magic, magic_len)) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer is not a serialized " + "consumer group metadata object"); + memcpy(&generation_id, buf + magic_len, generationid_len); + + group_id = buf + magic_len + generationid_len; + next = str_is_printable(group_id, end); + if (!next) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer group id is not safe"); + + member_id = next; + next = str_is_printable(member_id, end); + if (!next) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer member id is not " + "safe"); + + group_instance_id_is_null = (int8_t) * (next++); + if (!group_instance_id_is_null) { + group_instance_id = next; + next = str_is_printable(group_instance_id, end); + if (!next) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer group " + "instance id is not safe"); + } + + if (next != end) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, + "Input buffer bad length"); + + *cgmdp = rd_kafka_consumer_group_metadata_new_with_genid( + group_id, generation_id, member_id, group_instance_id); + + return NULL; +} + + +static int +unittest_consumer_group_metadata_iteration(const char *group_id, + int32_t generation_id, + const char *member_id, + const char *group_instance_id) { + rd_kafka_consumer_group_metadata_t *cgmd; + void *buffer, *buffer2; + size_t size, size2; + rd_kafka_error_t *error; + + cgmd = rd_kafka_consumer_group_metadata_new_with_genid( + group_id, generation_id, member_id, group_instance_id); + RD_UT_ASSERT(cgmd != NULL, "failed to create metadata"); + + error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer, &size); + RD_UT_ASSERT(!error, "metadata_write failed: %s", + rd_kafka_error_string(error)); + + rd_kafka_consumer_group_metadata_destroy(cgmd); + + cgmd = NULL; + error = rd_kafka_consumer_group_metadata_read(&cgmd, buffer, size); + RD_UT_ASSERT(!error, "metadata_read failed: %s", + rd_kafka_error_string(error)); + + /* Serialize again and compare buffers */ + error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer2, &size2); + RD_UT_ASSERT(!error, "metadata_write failed: %s", + rd_kafka_error_string(error)); + + RD_UT_ASSERT(size == size2 && !memcmp(buffer, buffer2, size), + "metadata_read/write size or content mismatch: " + "size %" PRIusz ", size2 %" PRIusz, + size, size2); + + rd_kafka_consumer_group_metadata_destroy(cgmd); + rd_free(buffer); + rd_free(buffer2); + + return 0; +} + + +static int unittest_consumer_group_metadata(void) { + const char *ids[] = { + "mY. random id:.", + "0", + "2222222222222222222222221111111111111111111111111111112222", + "", + "NULL", + NULL, + }; + int i, j, k, gen_id; + int ret; + const char *group_id; + const char *member_id; + const char *group_instance_id; + + for (i = 0; ids[i]; i++) { + for (j = 0; ids[j]; j++) { + for (k = 0; ids[k]; k++) { + for (gen_id = -1; gen_id < 1; gen_id++) { + group_id = ids[i]; + member_id = ids[j]; + group_instance_id = ids[k]; + if (strcmp(group_instance_id, "NULL") == + 0) + group_instance_id = NULL; + ret = + unittest_consumer_group_metadata_iteration( + group_id, gen_id, member_id, + group_instance_id); + if (ret) + return ret; + } + } + } + } + + RD_UT_PASS(); +} + + +static int unittest_set_intersect(void) { + size_t par_cnt = 10; + map_toppar_member_info_t *dst; + rd_kafka_topic_partition_t *toppar; + PartitionMemberInfo_t *v; + char *id = "id"; + rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER; + rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER; + rd_kafka_group_member_t *gm1; + rd_kafka_group_member_t *gm2; + + id1.len = 2; + id1.str = id; + id2.len = 2; + id2.str = id; + + map_toppar_member_info_t a = RD_MAP_INITIALIZER( + par_cnt, rd_kafka_topic_partition_cmp, + rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + map_toppar_member_info_t b = RD_MAP_INITIALIZER( + par_cnt, rd_kafka_topic_partition_cmp, + rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + gm1 = rd_calloc(1, sizeof(*gm1)); + gm1->rkgm_member_id = &id1; + gm1->rkgm_group_instance_id = &id1; + gm2 = rd_calloc(1, sizeof(*gm2)); + gm2->rkgm_member_id = &id2; + gm2->rkgm_group_instance_id = &id2; + + RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4), + PartitionMemberInfo_new(gm1, rd_false)); + RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 4), + PartitionMemberInfo_new(gm1, rd_false)); + RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 7), + PartitionMemberInfo_new(gm1, rd_false)); + + RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 7), + PartitionMemberInfo_new(gm1, rd_false)); + RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4), + PartitionMemberInfo_new(gm2, rd_false)); + + dst = rd_kafka_member_partitions_intersect(&a, &b); + + RD_UT_ASSERT(RD_MAP_CNT(&a) == 3, "expected a cnt to be 3 not %d", + (int)RD_MAP_CNT(&a)); + RD_UT_ASSERT(RD_MAP_CNT(&b) == 2, "expected b cnt to be 2 not %d", + (int)RD_MAP_CNT(&b)); + RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d", + (int)RD_MAP_CNT(dst)); + + toppar = rd_kafka_topic_partition_new("t1", 4); + RD_UT_ASSERT((v = RD_MAP_GET(dst, toppar)), "unexpected element"); + RD_UT_ASSERT(v->members_match, "expected members to match"); + rd_kafka_topic_partition_destroy(toppar); + + RD_MAP_DESTROY(&a); + RD_MAP_DESTROY(&b); + RD_MAP_DESTROY(dst); + rd_free(dst); + + rd_free(gm1); + rd_free(gm2); + + RD_UT_PASS(); +} + + +static int unittest_set_subtract(void) { + size_t par_cnt = 10; + rd_kafka_topic_partition_t *toppar; + map_toppar_member_info_t *dst; + + map_toppar_member_info_t a = RD_MAP_INITIALIZER( + par_cnt, rd_kafka_topic_partition_cmp, + rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + map_toppar_member_info_t b = RD_MAP_INITIALIZER( + par_cnt, rd_kafka_topic_partition_cmp, + rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4), + PartitionMemberInfo_new(NULL, rd_false)); + RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 7), + PartitionMemberInfo_new(NULL, rd_false)); + + RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 4), + PartitionMemberInfo_new(NULL, rd_false)); + RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4), + PartitionMemberInfo_new(NULL, rd_false)); + RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 7), + PartitionMemberInfo_new(NULL, rd_false)); + + dst = rd_kafka_member_partitions_subtract(&a, &b); + + RD_UT_ASSERT(RD_MAP_CNT(&a) == 2, "expected a cnt to be 2 not %d", + (int)RD_MAP_CNT(&a)); + RD_UT_ASSERT(RD_MAP_CNT(&b) == 3, "expected b cnt to be 3 not %d", + (int)RD_MAP_CNT(&b)); + RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d", + (int)RD_MAP_CNT(dst)); + + toppar = rd_kafka_topic_partition_new("t2", 7); + RD_UT_ASSERT(RD_MAP_GET(dst, toppar), "unexpected element"); + rd_kafka_topic_partition_destroy(toppar); + + RD_MAP_DESTROY(&a); + RD_MAP_DESTROY(&b); + RD_MAP_DESTROY(dst); + rd_free(dst); + + RD_UT_PASS(); +} + + +static int unittest_map_to_list(void) { + rd_kafka_topic_partition_list_t *list; + + map_toppar_member_info_t map = RD_MAP_INITIALIZER( + 10, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, + rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); + + RD_MAP_SET(&map, rd_kafka_topic_partition_new("t1", 101), + PartitionMemberInfo_new(NULL, rd_false)); + + list = rd_kafka_toppar_member_info_map_to_list(&map); + + RD_UT_ASSERT(list->cnt == 1, "expecting list size of 1 not %d.", + list->cnt); + RD_UT_ASSERT(list->elems[0].partition == 101, + "expecting partition 101 not %d", + list->elems[0].partition); + RD_UT_ASSERT(!strcmp(list->elems[0].topic, "t1"), + "expecting topic 't1', not %s", list->elems[0].topic); + + rd_kafka_topic_partition_list_destroy(list); + RD_MAP_DESTROY(&map); + + RD_UT_PASS(); +} + + +static int unittest_list_to_map(void) { + rd_kafka_topic_partition_t *toppar; + map_toppar_member_info_t *map; + rd_kafka_topic_partition_list_t *list = + rd_kafka_topic_partition_list_new(1); + + rd_kafka_topic_partition_list_add(list, "topic1", 201); + rd_kafka_topic_partition_list_add(list, "topic2", 202); + + map = rd_kafka_toppar_list_to_toppar_member_info_map(list); + + RD_UT_ASSERT(RD_MAP_CNT(map) == 2, "expected map cnt to be 2 not %d", + (int)RD_MAP_CNT(map)); + toppar = rd_kafka_topic_partition_new("topic1", 201); + RD_UT_ASSERT(RD_MAP_GET(map, toppar), + "expected topic1 [201] to exist in map"); + rd_kafka_topic_partition_destroy(toppar); + toppar = rd_kafka_topic_partition_new("topic2", 202); + RD_UT_ASSERT(RD_MAP_GET(map, toppar), + "expected topic2 [202] to exist in map"); + rd_kafka_topic_partition_destroy(toppar); + + RD_MAP_DESTROY(map); + rd_free(map); + rd_kafka_topic_partition_list_destroy(list); + + RD_UT_PASS(); +} + + +/** + * @brief Consumer group unit tests + */ +int unittest_cgrp(void) { + int fails = 0; + + fails += unittest_consumer_group_metadata(); + fails += unittest_set_intersect(); + fails += unittest_set_subtract(); + fails += unittest_map_to_list(); + fails += unittest_list_to_map(); + + return fails; +} |