/* * librdkafka - Apache Kafka C library * * Copyright (c) 2012-2015, Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include "rdkafka_int.h" #include "rdkafka_broker.h" #include "rdkafka_request.h" #include "rdkafka_topic.h" #include "rdkafka_partition.h" #include "rdkafka_assignor.h" #include "rdkafka_offset.h" #include "rdkafka_metadata.h" #include "rdkafka_cgrp.h" #include "rdkafka_interceptor.h" #include "rdmap.h" #include "rdunittest.h" #include <ctype.h> #include <stdarg.h> static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts, void *arg); static rd_kafka_error_t * rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *assignment); static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg); static rd_kafka_error_t * rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *partitions); static rd_kafka_error_t * rd_kafka_cgrp_incremental_unassign(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *partitions); static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, void *opaque); static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg, const char *reason); static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg); static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg, rd_bool_t assignment_lost, rd_bool_t initiating, const char *reason); static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg, rd_bool_t assignment_lost, rd_bool_t initiating, const char *reason); static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg); static void rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts, void *arg); static rd_kafka_resp_err_t rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *rktparlist); static void rd_kafka_cgrp_group_assignment_set( rd_kafka_cgrp_t *rkcg, const rd_kafka_topic_partition_list_t *partitions); static void rd_kafka_cgrp_group_assignment_modify( rd_kafka_cgrp_t *rkcg, rd_bool_t add, const rd_kafka_topic_partition_list_t *partitions); static void rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *assignment); /** * @returns true if the current
assignment is lost. */ rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg) { return rd_atomic32_get(&rkcg->rkcg_assignment_lost) != 0; } /** * @brief Call when the current assignment has been lost, with a * human-readable reason. */ static void rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) RD_FORMAT(printf, 2, 3); static void rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) { va_list ap; char reason[256]; if (!rkcg->rkcg_group_assignment) return; va_start(ap, fmt); rd_vsnprintf(reason, sizeof(reason), fmt, ap); va_end(ap); rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST", "Group \"%s\": " "current assignment of %d partition(s) lost: %s", rkcg->rkcg_group_id->str, rkcg->rkcg_group_assignment->cnt, reason); rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_true); } /** * @brief Call when the current assignment is no longer considered lost, with a * human-readable reason. */ static void rd_kafka_cgrp_assignment_clear_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) { va_list ap; char reason[256]; if (!rd_atomic32_get(&rkcg->rkcg_assignment_lost)) return; va_start(ap, fmt); rd_vsnprintf(reason, sizeof(reason), fmt, ap); va_end(ap); rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST", "Group \"%s\": " "current assignment no longer considered lost: %s", rkcg->rkcg_group_id->str, reason); rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_false); } /** * @brief The rebalance protocol currently in use. This will be * RD_KAFKA_REBALANCE_PROTOCOL_NONE if the consumer has not * (yet) joined a group, else it will match the rebalance * protocol of the configured assignor(s). * * @locality main thread */ rd_kafka_rebalance_protocol_t rd_kafka_cgrp_rebalance_protocol(rd_kafka_cgrp_t *rkcg) { if (!rkcg->rkcg_assignor) return RD_KAFKA_REBALANCE_PROTOCOL_NONE; return rkcg->rkcg_assignor->rkas_protocol; } /** * @returns true if the cgrp is awaiting a protocol response. This prevents * the join-state machine from proceeding until the current state * is done. */ static rd_bool_t rd_kafka_cgrp_awaiting_response(rd_kafka_cgrp_t *rkcg) { return rkcg->rkcg_wait_resp != -1; } /** * @brief Set flag indicating we are waiting for a coordinator response * for the given request. * * This is used for specific requests to postpone rejoining the group if * there are outstanding JoinGroup or SyncGroup requests. * * @locality main thread */ static void rd_kafka_cgrp_set_wait_resp(rd_kafka_cgrp_t *rkcg, int16_t ApiKey) { rd_assert(rkcg->rkcg_wait_resp == -1); rkcg->rkcg_wait_resp = ApiKey; } /** * @brief Clear the flag that says we're waiting for a coordinator response * for the given \p request. * * @param request Original request, possibly NULL (for errors). * * @locality main thread */ static void rd_kafka_cgrp_clear_wait_resp(rd_kafka_cgrp_t *rkcg, int16_t ApiKey) { rd_assert(rkcg->rkcg_wait_resp == ApiKey); rkcg->rkcg_wait_resp = -1; }
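/*
 * Illustrative sketch only (not compiled into the library, hence the
 * #if 0): how an application-level rebalance callback is expected to
 * consume the lost-assignment and rebalance-protocol state maintained
 * above, via the public rd_kafka_assignment_lost() and
 * rd_kafka_rebalance_protocol() wrappers. The callback and variable
 * names here are hypothetical.
 */
#if 0
static void example_rebalance_cb(rd_kafka_t *rk,
                                 rd_kafka_resp_err_t err,
                                 rd_kafka_topic_partition_list_t *parts,
                                 void *opaque) {
        int cooperative =
            !strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE");

        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
                if (cooperative)
                        rd_kafka_incremental_assign(rk, parts);
                else
                        rd_kafka_assign(rk, parts);
        } else { /* RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS */
                /* A lost assignment was taken away by the group rather
                 * than handed over cleanly: offsets should not be
                 * committed for it. */
                if (rd_kafka_assignment_lost(rk))
                        fprintf(stderr,
                                "%% Assignment lost, not committing\n");
                if (cooperative)
                        rd_kafka_incremental_unassign(rk, parts);
                else
                        rd_kafka_assign(rk, NULL);
        }
}
#endif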
/** * @struct Auxiliary glue type used for COOPERATIVE rebalance set operations. */ typedef struct PartitionMemberInfo_s { const rd_kafka_group_member_t *member; rd_bool_t members_match; } PartitionMemberInfo_t; static PartitionMemberInfo_t * PartitionMemberInfo_new(const rd_kafka_group_member_t *member, rd_bool_t members_match) { PartitionMemberInfo_t *pmi; pmi = rd_calloc(1, sizeof(*pmi)); pmi->member = member; pmi->members_match = members_match; return pmi; } static void PartitionMemberInfo_free(void *p) { PartitionMemberInfo_t *pmi = p; rd_free(pmi); } typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *, PartitionMemberInfo_t *) map_toppar_member_info_t; /** * @returns true if consumer has joined the group and thus requires a leave. */ #define RD_KAFKA_CGRP_HAS_JOINED(rkcg) \ (rkcg->rkcg_member_id != NULL && \ RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0) /** * @returns true if cgrp is waiting for a rebalance_cb to be handled by * the application. */ #define RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) \ ((rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) /** * @returns true if a rebalance is in progress. * * 1. In WAIT_JOIN or WAIT_METADATA state with a member-id set, * this happens on rejoin. * 2. In WAIT_SYNC waiting for the group to rebalance on the broker. * 3. In *_WAIT_UNASSIGN_TO_COMPLETE waiting for unassigned partitions to * stop fetching, etc. * 4. In _WAIT_*ASSIGN_CALL waiting for the application to handle the * assignment changes in its rebalance callback and then call *assign(). * 5. An incremental rebalancing is in progress. * 6. A rebalance-induced rejoin is in progress. */ #define RD_KAFKA_CGRP_REBALANCING(rkcg) \ ((RD_KAFKA_CGRP_HAS_JOINED(rkcg) && \ ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) || \ (rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \ (rkcg)->rkcg_join_state == \ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL || \ (rkcg)->rkcg_rebalance_incr_assignment != NULL || \ (rkcg)->rkcg_rebalance_rejoin) const char *rd_kafka_cgrp_state_names[] = { "init", "term", "query-coord", "wait-coord", "wait-broker", "wait-broker-transport", "up"}; const char *rd_kafka_cgrp_join_state_names[] = { "init", "wait-join", "wait-metadata", "wait-sync", "wait-assign-call", "wait-unassign-call", "wait-unassign-to-complete", "wait-incr-unassign-to-complete", "steady", };
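/*
 * Informal sketch (derived from the state names and macros above; error
 * paths omitted) of the typical join-state progression:
 *
 *   init -> wait-join -> wait-metadata (leader only) -> wait-sync
 *        -> wait-assign-call -> steady
 *
 * On revocation the consumer moves from steady through wait-unassign-call
 * and wait-unassign-to-complete (or wait-incr-unassign-to-complete for
 * COOPERATIVE incremental unassigns) before rejoining from init.
 */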
/** * @brief Change the cgrp state. * * @returns 1 if the state was changed, else 0. */ static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) { if ((int)rkcg->rkcg_state == state) return 0; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE", "Group \"%.*s\" changed state %s -> %s " "(join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_state_names[state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); rkcg->rkcg_state = state; rkcg->rkcg_ts_statechange = rd_clock(); rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk); return 1; } void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) { if ((int)rkcg->rkcg_join_state == join_state) return; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE", "Group \"%.*s\" changed join state %s -> %s " "(state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], rd_kafka_cgrp_join_state_names[join_state], rd_kafka_cgrp_state_names[rkcg->rkcg_state]); rkcg->rkcg_join_state = join_state; } void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) { rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription); rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members); rd_kafka_cgrp_set_member_id(rkcg, NULL); if (rkcg->rkcg_group_instance_id) rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id); rd_kafka_q_destroy_owner(rkcg->rkcg_q); rd_kafka_q_destroy_owner(rkcg->rkcg_ops); rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q); rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics)); rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars)); rd_list_destroy(&rkcg->rkcg_toppars); rd_list_destroy(rkcg->rkcg_subscribed_topics); rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb) rkcg->rkcg_assignor->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); rd_free(rkcg); } /** * @brief Update the absolute session timeout following a successful * response from the coordinator. * This timeout is used to enforce the session timeout in the * consumer itself. * * @param reset if true the timeout is updated even if the session has expired.
*/ static RD_INLINE void rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t *rkcg, rd_bool_t reset) { if (reset || rkcg->rkcg_ts_session_timeout != 0) rkcg->rkcg_ts_session_timeout = rd_clock() + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000); } rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk, const rd_kafkap_str_t *group_id, const rd_kafkap_str_t *client_id) { rd_kafka_cgrp_t *rkcg; rkcg = rd_calloc(1, sizeof(*rkcg)); rkcg->rkcg_rk = rk; rkcg->rkcg_group_id = group_id; rkcg->rkcg_client_id = client_id; rkcg->rkcg_coord_id = -1; rkcg->rkcg_generation_id = -1; rkcg->rkcg_wait_resp = -1; rkcg->rkcg_ops = rd_kafka_q_new(rk); rkcg->rkcg_ops->rkq_serve = rd_kafka_cgrp_op_serve; rkcg->rkcg_ops->rkq_opaque = rkcg; rkcg->rkcg_wait_coord_q = rd_kafka_q_new(rk); rkcg->rkcg_wait_coord_q->rkq_serve = rkcg->rkcg_ops->rkq_serve; rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque; rkcg->rkcg_q = rd_kafka_q_new(rk); rkcg->rkcg_group_instance_id = rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1); TAILQ_INIT(&rkcg->rkcg_topics); rd_list_init(&rkcg->rkcg_toppars, 32, NULL); rd_kafka_cgrp_set_member_id(rkcg, ""); rkcg->rkcg_subscribed_topics = rd_list_new(0, (void *)rd_kafka_topic_info_destroy); rd_interval_init(&rkcg->rkcg_coord_query_intvl); rd_interval_init(&rkcg->rkcg_heartbeat_intvl); rd_interval_init(&rkcg->rkcg_join_intvl); rd_interval_init(&rkcg->rkcg_timeout_scan_intvl); rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false); rd_atomic32_init(&rkcg->rkcg_terminated, rd_false); rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0); /* Create a logical group coordinator broker to provide * a dedicated connection for group coordination. * This is needed since JoinGroup may block for up to * max.poll.interval.ms, effectively blocking and timing out * any other protocol requests (such as Metadata). * The address for this broker will be updated when * the group coordinator is assigned. */ rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator"); if (rk->rk_conf.enable_auto_commit && rk->rk_conf.auto_commit_interval_ms > 0) rd_kafka_timer_start( &rk->rk_timers, &rkcg->rkcg_offset_commit_tmr, rk->rk_conf.auto_commit_interval_ms * 1000ll, rd_kafka_cgrp_offset_commit_tmr_cb, rkcg); return rkcg; } /** * @brief Set the group coordinator broker. */ static void rd_kafka_cgrp_coord_set_broker(rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb) { rd_assert(rkcg->rkcg_curr_coord == NULL); rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb)); rkcg->rkcg_curr_coord = rkb; rd_kafka_broker_keep(rkb); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET", "Group \"%.*s\" coordinator set to broker %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_broker_name(rkb)); /* Reset query interval to trigger an immediate * coord query if required */ if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl)) rd_interval_reset(&rkcg->rkcg_coord_query_intvl); rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); rd_kafka_broker_persistent_connection_add( rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord); /* Set the logical coordinator's nodename to the * proper broker's nodename, this will trigger a (re)connect * to the new address. */ rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb); } /** * @brief Reset/clear the group coordinator broker. 
*/ static void rd_kafka_cgrp_coord_clear_broker(rd_kafka_cgrp_t *rkcg) { rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord; rd_assert(rkcg->rkcg_curr_coord); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR", "Group \"%.*s\" broker %s is no longer coordinator", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_broker_name(rkb)); rd_assert(rkcg->rkcg_coord); rd_kafka_broker_persistent_connection_del( rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord); /* Clear the ephemeral broker's nodename. * This will also trigger a disconnect. */ rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL); rkcg->rkcg_curr_coord = NULL; rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */ } /** * @brief Update/set the group coordinator. * * Will do nothing if there's been no change. * * @returns 1 if the coordinator, or state, was updated, else 0. */ static int rd_kafka_cgrp_coord_update(rd_kafka_cgrp_t *rkcg, int32_t coord_id) { /* Don't do anything while terminating */ if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM) return 0; /* Check if coordinator changed */ if (rkcg->rkcg_coord_id != coord_id) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD", "Group \"%.*s\" changing coordinator %" PRId32 " -> %" PRId32, RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id, coord_id); /* Update coord id */ rkcg->rkcg_coord_id = coord_id; /* Clear previous broker handle, if any */ if (rkcg->rkcg_curr_coord) rd_kafka_cgrp_coord_clear_broker(rkcg); } if (rkcg->rkcg_curr_coord) { /* There is already a known coordinator and a * corresponding broker handle. */ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) return rd_kafka_cgrp_set_state( rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT); } else if (rkcg->rkcg_coord_id != -1) { rd_kafka_broker_t *rkb; /* Try to find the coordinator broker handle */ rd_kafka_rdlock(rkcg->rkcg_rk); rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id); rd_kafka_rdunlock(rkcg->rkcg_rk); /* It is possible, due to stale metadata, that the * coordinator id points to a broker we still don't know * about. In this case the client will continue * querying metadata and querying for the coordinator * until a match is found. */ if (rkb) { /* Coordinator is known and broker handle exists */ rd_kafka_cgrp_coord_set_broker(rkcg, rkb); rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/ return 1; } else { /* Coordinator is known but no corresponding * broker handle. 
*/ return rd_kafka_cgrp_set_state( rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER); } } else { /* Coordinator still not known, re-query */ if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD) return rd_kafka_cgrp_set_state( rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); } return 0; /* no change */ } /** * Handle FindCoordinator response */ static void rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; int32_t CoordId; rd_kafkap_str_t CoordHost = RD_ZERO_INIT; int32_t CoordPort; rd_kafka_cgrp_t *rkcg = opaque; struct rd_kafka_metadata_broker mdb = RD_ZERO_INIT; char *errstr = NULL; int actions; if (likely(!(ErrorCode = err))) { if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) { rd_kafkap_str_t ErrorMsg; rd_kafka_buf_read_str(rkbuf, &ErrorMsg); if (!RD_KAFKAP_STR_IS_NULL(&ErrorMsg)) RD_KAFKAP_STR_DUPA(&errstr, &ErrorMsg); } rd_kafka_buf_read_i32(rkbuf, &CoordId); rd_kafka_buf_read_str(rkbuf, &CoordHost); rd_kafka_buf_read_i32(rkbuf, &CoordPort); } if (ErrorCode) goto err; mdb.id = CoordId; RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost); mdb.port = CoordPort; rd_rkb_dbg(rkb, CGRP, "CGRPCOORD", "Group \"%.*s\" coordinator is %s:%i id %" PRId32, RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), mdb.host, mdb.port, mdb.id); rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb, NULL); rd_kafka_cgrp_coord_update(rkcg, CoordId); rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */ return; err_parse: /* Parse error */ ErrorCode = rkbuf->rkbuf_err; /* FALLTHRU */ err: if (!errstr) errstr = (char *)rd_kafka_err2str(ErrorCode); rd_rkb_dbg(rkb, CGRP, "CGRPCOORD", "Group \"%.*s\" FindCoordinator response error: %s: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_err2name(ErrorCode), errstr); if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) return; actions = rd_kafka_err_action( rkb, ErrorCode, request, RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH, RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE, RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT, RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT, RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE, RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { rd_kafka_cgrp_coord_update(rkcg, -1); } else { if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) && rkcg->rkcg_last_err != ErrorCode) { /* Propagate non-retriable errors to the application */ rd_kafka_consumer_err( rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0, NULL, NULL, RD_KAFKA_OFFSET_INVALID, "FindCoordinator response error: %s", errstr); /* Suppress repeated errors */ rkcg->rkcg_last_err = ErrorCode; } /* Retries are performed by the timer-intervalled * coord queries, continue querying */ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); } rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */ } /** * Query for coordinator. * Ask any broker in state UP * * Locality: main thread */ void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) { rd_kafka_broker_t *rkb; rd_kafka_resp_err_t err; rkb = rd_kafka_broker_any_usable( rkcg->rkcg_rk, RD_POLL_NOWAIT, RD_DO_LOCK, RD_KAFKA_FEATURE_BROKER_GROUP_COORD, "coordinator query"); if (!rkb) { /* Reset the interval because there were no brokers. 
When a * broker becomes available, we want to query it immediately. */ rd_interval_reset(&rkcg->rkcg_coord_query_intvl); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY", "Group \"%.*s\": " "no broker available for coordinator query: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); return; } rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", "Group \"%.*s\": querying for coordinator: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); err = rd_kafka_FindCoordinatorRequest( rkb, RD_KAFKA_COORD_GROUP, rkcg->rkcg_group_id->str, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_FindCoordinator, rkcg); if (err) { rd_rkb_dbg(rkb, CGRP, "CGRPQUERY", "Group \"%.*s\": " "unable to send coordinator query: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_err2str(err)); rd_kafka_broker_destroy(rkb); return; } if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD) rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD); rd_kafka_broker_destroy(rkb); /* Back off the next intervalled query since we just sent one. */ rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0); } /** * @brief Mark the current coordinator as dead. * * @locality main thread */ void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, const char *reason) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD", "Group \"%.*s\": " "marking the coordinator (%" PRId32 ") dead: %s: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id, rd_kafka_err2str(err), reason); rd_kafka_cgrp_coord_update(rkcg, -1); /* Re-query for coordinator */ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); rd_kafka_cgrp_coord_query(rkcg, reason); } /** * @returns a new reference to the current coordinator, if available, else NULL. * * @locality rdkafka main thread * @locks_required none * @locks_acquired none */ rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_coord) return NULL; rd_kafka_broker_keep(rkcg->rkcg_coord); return rkcg->rkcg_coord; } /** * @brief cgrp handling of LeaveGroup responses * @param opaque must be the cgrp handle. * @locality rdkafka main thread (unless err==ERR__DESTROY) */ static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_cgrp_t *rkcg = opaque; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; if (err) { ErrorCode = err; goto err; } if (request->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); err: if (ErrorCode) rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", "LeaveGroup response error in state %s: %s", rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_err2str(ErrorCode)); else rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP", "LeaveGroup response received in state %s", rd_kafka_cgrp_state_names[rkcg->rkcg_state]); if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) { rd_assert(thrd_is_current(rk->rk_thread)); rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; rd_kafka_cgrp_try_terminate(rkcg); } return; err_parse: ErrorCode = rkbuf->rkbuf_err; goto err; } static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) { char *member_id; RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); /* Leaving the group invalidates the member id, reset it * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. 
*/ rd_kafka_cgrp_set_member_id(rkcg, ""); if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", "Group \"%.*s\": leave (in state %s): " "LeaveGroupRequest already in-transit", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_state_names[rkcg->rkcg_state]); return; } rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE", "Group \"%.*s\": leave (in state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_state_names[rkcg->rkcg_state]); rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE; if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) { rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE", "Leaving group"); rd_kafka_LeaveGroupRequest( rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_LeaveGroup, rkcg); } else rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord, RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rkcg); } /** * @brief Leave group, if desired. * * @returns true if a LeaveGroup was issued, else false. */ static rd_bool_t rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t *rkcg) { /* We were not instructed to leave in the first place. */ if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE)) return rd_false; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; /* Don't send Leave when terminating with NO_CONSUMER_CLOSE flag */ if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) return rd_false; /* KIP-345: Static group members must not send a LeaveGroupRequest * on termination. */ if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) return rd_false; rd_kafka_cgrp_leave(rkcg); return rd_true; } /** * @brief Enqueues a rebalance op, delegating responsibility of calling * incremental_assign / incremental_unassign to the application. * If there is no rebalance handler configured, or the action * should not be delegated to the application for some other * reason, incremental_assign / incremental_unassign will be called * automatically, immediately. * * @param rejoin whether or not to rejoin the group following completion * of the incremental assign / unassign. * * @remarks does not take ownership of \p partitions. */ void rd_kafka_rebalance_op_incr(rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, rd_bool_t rejoin, const char *reason) { rd_kafka_error_t *error; /* Flag to rejoin after completion of the incr_assign or incr_unassign, if required. */ rkcg->rkcg_rebalance_rejoin = rejoin; rd_kafka_wrlock(rkcg->rkcg_rk); rkcg->rkcg_c.ts_rebalance = rd_clock(); rkcg->rkcg_c.rebalance_cnt++; rd_kafka_wrunlock(rkcg->rkcg_rk); if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) || rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { /* Total unconditional unassign in these cases */ rd_kafka_cgrp_unassign(rkcg); /* Now serve the assignment to make updates */ rd_kafka_assignment_serve(rkcg->rkcg_rk); goto done; } rd_kafka_cgrp_set_join_state( rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL); /* Schedule application rebalance callback/event if enabled */ if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) { rd_kafka_op_t *rko; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", "Group \"%s\": delegating incremental %s of %d " "partition(s) to application on queue %s: %s", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ?
"revoke" : "assign", partitions->cnt, rd_kafka_q_dest_name(rkcg->rkcg_q), reason); /* Pause currently assigned partitions while waiting for * rebalance callback to get called to make sure the * application will not receive any more messages that * might block it from serving the rebalance callback * and to not process messages for partitions it * might have lost in the rebalance. */ rd_kafka_assignment_pause(rkcg->rkcg_rk, "incremental rebalance"); rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE); rko->rko_err = err; rko->rko_u.rebalance.partitions = rd_kafka_topic_partition_list_copy(partitions); if (rd_kafka_q_enq(rkcg->rkcg_q, rko)) goto done; /* Rebalance op successfully enqueued */ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", "Group \"%s\": ops queue is disabled, not " "delegating partition %s to application", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ? "unassign" : "assign"); /* FALLTHRU */ } /* No application rebalance callback/event handler, or it is not * available, do the assign/unassign ourselves. * We need to be careful here not to trigger assignment_serve() * since it may call into the cgrp code again, in which case we * can't really track what the outcome state will be. */ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) error = rd_kafka_cgrp_incremental_assign(rkcg, partitions); else error = rd_kafka_cgrp_incremental_unassign(rkcg, partitions); if (error) { rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", "Group \"%s\": internal incremental %s " "of %d partition(s) failed: %s: " "unassigning all partitions and rejoining", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ? "unassign" : "assign", partitions->cnt, rd_kafka_error_string(error)); rd_kafka_error_destroy(error); rd_kafka_cgrp_set_join_state(rkcg, /* This is a clean state for * assignment_done() to rejoin * from. */ RD_KAFKA_CGRP_JOIN_STATE_STEADY); rd_kafka_assignment_clear(rkcg->rkcg_rk); } /* Now serve the assignment to make updates */ rd_kafka_assignment_serve(rkcg->rkcg_rk); done: /* Update the current group assignment based on the * added/removed partitions. */ rd_kafka_cgrp_group_assignment_modify( rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, partitions); } /** * @brief Enqueues a rebalance op, delegating responsibility of calling * assign / unassign to the application. If there is no rebalance * handler configured, or the action should not be delegated to the * application for some other reason, assign / unassign will be * called automatically. * * @remarks \p partitions is copied. */ static void rd_kafka_rebalance_op(rd_kafka_cgrp_t *rkcg, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *assignment, const char *reason) { rd_kafka_error_t *error; rd_kafka_wrlock(rkcg->rkcg_rk); rkcg->rkcg_c.ts_rebalance = rd_clock(); rkcg->rkcg_c.rebalance_cnt++; rd_kafka_wrunlock(rkcg->rkcg_rk); if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) || rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { /* Unassign */ rd_kafka_cgrp_unassign(rkcg); /* Now serve the assignment to make updates */ rd_kafka_assignment_serve(rkcg->rkcg_rk); goto done; } rd_assert(assignment != NULL); rd_kafka_cgrp_set_join_state( rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS ? 
RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL); /* Schedule application rebalance callback/event if enabled */ if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) { rd_kafka_op_t *rko; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN", "Group \"%s\": delegating %s of %d partition(s) " "to application on queue %s: %s", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ? "revoke" : "assign", assignment->cnt, rd_kafka_q_dest_name(rkcg->rkcg_q), reason); /* Pause currently assigned partitions while waiting for * rebalance callback to get called to make sure the * application will not receive any more messages that * might block it from serving the rebalance callback * and to not process messages for partitions it * might have lost in the rebalance. */ rd_kafka_assignment_pause(rkcg->rkcg_rk, "rebalance"); rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE); rko->rko_err = err; rko->rko_u.rebalance.partitions = rd_kafka_topic_partition_list_copy(assignment); if (rd_kafka_q_enq(rkcg->rkcg_q, rko)) goto done; /* Rebalance op successfully enqueued */ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", "Group \"%s\": ops queue is disabled, not " "delegating partition %s to application", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ? "unassign" : "assign"); /* FALLTHRU */ } /* No application rebalance callback/event handler, or it is not * available, do the assign/unassign ourselves. * We need to be careful here not to trigger assignment_serve() * since it may call into the cgrp code again, in which case we * can't really track what the outcome state will be. */ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) error = rd_kafka_cgrp_assign(rkcg, assignment); else error = rd_kafka_cgrp_unassign(rkcg); if (error) { rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", "Group \"%s\": internal %s " "of %d partition(s) failed: %s: " "unassigning all partitions and rejoining", rkcg->rkcg_group_id->str, err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS ? "unassign" : "assign", rkcg->rkcg_group_assignment->cnt, rd_kafka_error_string(error)); rd_kafka_error_destroy(error); rd_kafka_cgrp_set_join_state(rkcg, /* This is a clean state for * assignment_done() to rejoin * from. */ RD_KAFKA_CGRP_JOIN_STATE_STEADY); rd_kafka_assignment_clear(rkcg->rkcg_rk); } /* Now serve the assignment to make updates */ rd_kafka_assignment_serve(rkcg->rkcg_rk); done: if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) rd_kafka_cgrp_group_assignment_set(rkcg, assignment); else rd_kafka_cgrp_group_assignment_set(rkcg, NULL); } /** * @brief Rejoin the group. * * @remark This function must not have any side-effects but setting the * join state. */ static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) RD_FORMAT(printf, 2, 3); static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) { char reason[512]; va_list ap; char astr[128]; va_start(ap, fmt); rd_vsnprintf(reason, sizeof(reason), fmt, ap); va_end(ap); if (rkcg->rkcg_group_assignment) rd_snprintf(astr, sizeof(astr), " with %d owned partition(s)", rkcg->rkcg_group_assignment->cnt); else rd_snprintf(astr, sizeof(astr), " without an assignment"); if (rkcg->rkcg_subscription || rkcg->rkcg_next_subscription) { rd_kafka_dbg( rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REJOIN", "Group \"%s\": %s group%s: %s", rkcg->rkcg_group_id->str, rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ? 
"Joining" : "Rejoining", astr, reason); } else { rd_kafka_dbg( rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "NOREJOIN", "Group \"%s\": Not %s group%s: %s: " "no subscribed topics", rkcg->rkcg_group_id->str, rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ? "joining" : "rejoining", astr, reason); rd_kafka_cgrp_leave_maybe(rkcg); } rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT); } /** * @brief Collect all assigned or owned partitions from group members. * The member field of each result element is set to the associated * group member. The members_match field is set to rd_false. * * @param members Array of group members. * @param member_cnt Number of elements in members. * @param par_cnt The total number of partitions expected to be collected. * @param collect_owned If rd_true, rkgm_owned partitions will be collected, * else rkgm_assignment partitions will be collected. */ static map_toppar_member_info_t * rd_kafka_collect_partitions(const rd_kafka_group_member_t *members, size_t member_cnt, size_t par_cnt, rd_bool_t collect_owned) { size_t i; map_toppar_member_info_t *collected = rd_calloc(1, sizeof(*collected)); RD_MAP_INIT(collected, par_cnt, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); for (i = 0; i < member_cnt; i++) { size_t j; const rd_kafka_group_member_t *rkgm = &members[i]; const rd_kafka_topic_partition_list_t *toppars = collect_owned ? rkgm->rkgm_owned : rkgm->rkgm_assignment; for (j = 0; j < (size_t)toppars->cnt; j++) { rd_kafka_topic_partition_t *rktpar = rd_kafka_topic_partition_copy(&toppars->elems[j]); PartitionMemberInfo_t *pmi = PartitionMemberInfo_new(rkgm, rd_false); RD_MAP_SET(collected, rktpar, pmi); } } return collected; } /** * @brief Set intersection. Returns a set of all elements of \p a that * are also elements of \p b. Additionally, compares the members * field of matching elements from \p a and \p b and if not NULL * and equal, sets the members_match field in the result element * to rd_true and the member field to equal that of the elements, * else sets the members_match field to rd_false and member field * to NULL. */ static map_toppar_member_info_t * rd_kafka_member_partitions_intersect(map_toppar_member_info_t *a, map_toppar_member_info_t *b) { const rd_kafka_topic_partition_t *key; const PartitionMemberInfo_t *a_v; map_toppar_member_info_t *intersection = rd_calloc(1, sizeof(*intersection)); RD_MAP_INIT( intersection, RD_MIN(a ? RD_MAP_CNT(a) : 1, b ? RD_MAP_CNT(b) : 1), rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); if (!a || !b) return intersection; RD_MAP_FOREACH(key, a_v, a) { rd_bool_t members_match; const PartitionMemberInfo_t *b_v = RD_MAP_GET(b, key); if (b_v == NULL) continue; members_match = a_v->member && b_v->member && rd_kafka_group_member_cmp(a_v->member, b_v->member) == 0; RD_MAP_SET(intersection, rd_kafka_topic_partition_copy(key), PartitionMemberInfo_new(b_v->member, members_match)); } return intersection; } /** * @brief Set subtraction. Returns a set of all elements of \p a * that are not elements of \p b. 
Sets the member field in * elements in the returned set to equal that of the * corresponding element in \p a. */ static map_toppar_member_info_t * rd_kafka_member_partitions_subtract(map_toppar_member_info_t *a, map_toppar_member_info_t *b) { const rd_kafka_topic_partition_t *key; const PartitionMemberInfo_t *a_v; map_toppar_member_info_t *difference = rd_calloc(1, sizeof(*difference)); RD_MAP_INIT(difference, a ? RD_MAP_CNT(a) : 1, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); if (!a) return difference; RD_MAP_FOREACH(key, a_v, a) { const PartitionMemberInfo_t *b_v = b ? RD_MAP_GET(b, key) : NULL; if (!b_v) RD_MAP_SET( difference, rd_kafka_topic_partition_copy(key), PartitionMemberInfo_new(a_v->member, rd_false)); } return difference; } /** * @brief Adjust the partition assignment as provided by the assignor * according to the COOPERATIVE protocol. */ static void rd_kafka_cooperative_protocol_adjust_assignment( rd_kafka_cgrp_t *rkcg, rd_kafka_group_member_t *members, int member_cnt) { /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\ a+Consumer+Incremental+Rebalance+Protocol */ int i; int expected_max_assignment_size; int total_assigned = 0; int not_revoking = 0; size_t par_cnt = 0; const rd_kafka_topic_partition_t *toppar; const PartitionMemberInfo_t *pmi; map_toppar_member_info_t *assigned; map_toppar_member_info_t *owned; map_toppar_member_info_t *maybe_revoking; map_toppar_member_info_t *ready_to_migrate; map_toppar_member_info_t *unknown_but_owned; for (i = 0; i < member_cnt; i++) par_cnt += members[i].rkgm_owned->cnt; assigned = rd_kafka_collect_partitions(members, member_cnt, par_cnt, rd_false /*assigned*/); owned = rd_kafka_collect_partitions(members, member_cnt, par_cnt, rd_true /*owned*/); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", "Group \"%s\": Partitions owned by members: %d, " "partitions assigned by assignor: %d", rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(owned), (int)RD_MAP_CNT(assigned)); /* Still owned by some members */ maybe_revoking = rd_kafka_member_partitions_intersect(assigned, owned); /* Not previously owned by anyone */ ready_to_migrate = rd_kafka_member_partitions_subtract(assigned, owned); /* Don't exist in assigned partitions */ unknown_but_owned = rd_kafka_member_partitions_subtract(owned, assigned); /* Rough guess at a size that is a bit higher than * the maximum number of partitions likely to be * assigned to any single member. */ expected_max_assignment_size = (int)(RD_MAP_CNT(assigned) / member_cnt) + 4; for (i = 0; i < member_cnt; i++) { rd_kafka_group_member_t *rkgm = &members[i]; rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment); rkgm->rkgm_assignment = rd_kafka_topic_partition_list_new( expected_max_assignment_size); }
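/* Worked example of the set operations above, with hypothetical
 * partitions: if the members together own {A, B} while the assignor
 * assigned {B, C}, then maybe_revoking = intersect(assigned, owned)
 * = {B}, ready_to_migrate = subtract(assigned, owned) = {C}, and
 * unknown_but_owned = subtract(owned, assigned) = {A}. B stays with
 * its current owner until voluntarily revoked, C may be migrated to
 * its new owner immediately, and A is handed back to whoever claimed
 * to own it. */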
/* For maybe-revoking-partitions, check if the owner has * changed. If yes, exclude them from the assigned-partitions * list to the new owner. The old owner will realize it does * not own it any more, revoke it and then trigger another * rebalance for these partitions to finally be reassigned. */ RD_MAP_FOREACH(toppar, pmi, maybe_revoking) { if (!pmi->members_match) /* Owner has changed. */ continue; /* Owner hasn't changed. */ rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, toppar->topic, toppar->partition); total_assigned++; not_revoking++; } /* For ready-to-migrate-partitions, it is safe to move them * to the new member immediately since we know no one owned * them before, and hence we can encode the owner from the * newly-assigned-partitions directly. */ RD_MAP_FOREACH(toppar, pmi, ready_to_migrate) { rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, toppar->topic, toppar->partition); total_assigned++; } /* For unknown-but-owned-partitions, it is also safe to just * give them back to whoever claimed to be their owners by * encoding them directly as well. If this is due to a topic * metadata update, then a later rebalance will be triggered * anyway. */ RD_MAP_FOREACH(toppar, pmi, unknown_but_owned) { rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment, toppar->topic, toppar->partition); total_assigned++; } rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", "Group \"%s\": COOPERATIVE protocol collection sizes: " "maybe revoking: %d, ready to migrate: %d, unknown but " "owned: %d", rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(maybe_revoking), (int)RD_MAP_CNT(ready_to_migrate), (int)RD_MAP_CNT(unknown_but_owned)); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP", "Group \"%s\": %d partitions assigned to consumers", rkcg->rkcg_group_id->str, total_assigned); RD_MAP_DESTROY_AND_FREE(maybe_revoking); RD_MAP_DESTROY_AND_FREE(ready_to_migrate); RD_MAP_DESTROY_AND_FREE(unknown_but_owned); RD_MAP_DESTROY_AND_FREE(assigned); RD_MAP_DESTROY_AND_FREE(owned); } /** * @brief Parses and handles the MemberState from a SyncGroupResponse. */ static void rd_kafka_cgrp_handle_SyncGroup_memberstate( rd_kafka_cgrp_t *rkcg, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, const rd_kafkap_bytes_t *member_state) { rd_kafka_buf_t *rkbuf = NULL; rd_kafka_topic_partition_list_t *assignment = NULL; const int log_decode_errors = LOG_ERR; int16_t Version; rd_kafkap_bytes_t UserData; /* Don't handle new assignments when terminating */ if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) err = RD_KAFKA_RESP_ERR__DESTROY; if (err) goto err; if (RD_KAFKAP_BYTES_LEN(member_state) == 0) { /* Empty assignment. */ assignment = rd_kafka_topic_partition_list_new(0); memset(&UserData, 0, sizeof(UserData)); goto done; } /* Parse assignment from MemberState */ rkbuf = rd_kafka_buf_new_shadow( member_state->data, RD_KAFKAP_BYTES_LEN(member_state), NULL); /* Protocol parser needs a broker handle to log errors on.
*/ if (rkb) { rkbuf->rkbuf_rkb = rkb; rd_kafka_broker_keep(rkb); } else rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk); rd_kafka_buf_read_i16(rkbuf, &Version); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; if (!(assignment = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields))) goto err_parse; rd_kafka_buf_read_bytes(rkbuf, &UserData); done: rd_kafka_cgrp_update_session_timeout(rkcg, rd_true /*reset timeout*/); rd_assert(rkcg->rkcg_assignor); if (rkcg->rkcg_assignor->rkas_on_assignment_cb) { char *member_id; RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id); rd_kafka_consumer_group_metadata_t *cgmd = rd_kafka_consumer_group_metadata_new_with_genid( rkcg->rkcg_rk->rk_conf.group_id_str, rkcg->rkcg_generation_id, member_id, rkcg->rkcg_rk->rk_conf.group_instance_id); rkcg->rkcg_assignor->rkas_on_assignment_cb( rkcg->rkcg_assignor, &(rkcg->rkcg_assignor_state), assignment, &UserData, cgmd); rd_kafka_consumer_group_metadata_destroy(cgmd); } // FIXME: Remove when we're done debugging. rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT", RD_KAFKA_DBG_CGRP, assignment); /* Set the new assignment */ rd_kafka_cgrp_handle_assignment(rkcg, assignment); rd_kafka_topic_partition_list_destroy(assignment); if (rkbuf) rd_kafka_buf_destroy(rkbuf); return; err_parse: err = rkbuf->rkbuf_err; err: if (rkbuf) rd_kafka_buf_destroy(rkbuf); if (assignment) rd_kafka_topic_partition_list_destroy(assignment); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC", "Group \"%s\": synchronization failed: %s: rejoining", rkcg->rkcg_group_id->str, rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, "Fatal consumer error: %s", rd_kafka_err2str(err)); else if (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) rkcg->rkcg_generation_id = -1; else if (err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) rd_kafka_cgrp_set_member_id(rkcg, ""); if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION || err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)) rd_kafka_cgrp_revoke_all_rejoin( rkcg, rd_true /*assignment is lost*/, rd_true /*this consumer is initiating*/, "SyncGroup error"); else rd_kafka_cgrp_rejoin(rkcg, "SyncGroup error: %s", rd_kafka_err2str(err)); } /** * @brief Cgrp handler for SyncGroup responses. opaque must be the cgrp handle. 
*/ static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_cgrp_t *rkcg = opaque; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; rd_kafkap_bytes_t MemberState = RD_ZERO_INIT; int actions; if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { rd_kafka_dbg( rkb->rkb_rk, CGRP, "SYNCGROUP", "SyncGroup response: discarding outdated request " "(now in join-state %s)", rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup); return; } if (err) { ErrorCode = err; goto err; } if (request->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); rd_kafka_buf_read_bytes(rkbuf, &MemberState); err: actions = rd_kafka_err_action(rkb, ErrorCode, request, RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, RD_KAFKA_OP_COORD_QUERY, ErrorCode); /* FALLTHRU */ } if (actions & RD_KAFKA_ERR_ACTION_RETRY) { if (rd_kafka_buf_retry(rkb, request)) return; /* FALLTHRU */ } rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP", "SyncGroup response: %s (%d bytes of MemberState data)", rd_kafka_err2str(ErrorCode), RD_KAFKAP_BYTES_LEN(&MemberState)); rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup); if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) return; /* Termination */ rd_kafka_cgrp_handle_SyncGroup_memberstate(rkcg, rkb, ErrorCode, &MemberState); return; err_parse: ErrorCode = rkbuf->rkbuf_err; goto err; } /** * @brief Run group assignment. */ static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg, rd_kafka_assignor_t *rkas, rd_kafka_resp_err_t err, rd_kafka_metadata_t *metadata, rd_kafka_group_member_t *members, int member_cnt) { char errstr[512]; if (err) { rd_snprintf(errstr, sizeof(errstr), "Failed to get cluster metadata: %s", rd_kafka_err2str(err)); goto err; } *errstr = '\0'; /* Run assignor */ err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt, errstr, sizeof(errstr)); if (err) { if (!*errstr) rd_snprintf(errstr, sizeof(errstr), "%s", rd_kafka_err2str(err)); goto err; } rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGNOR", "Group \"%s\": \"%s\" assignor run for %d member(s)", rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str, member_cnt); if (rkas->rkas_protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) rd_kafka_cooperative_protocol_adjust_assignment(rkcg, members, member_cnt); rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC); rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup); /* Respond to broker with assignment set or error */ rd_kafka_SyncGroupRequest( rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, members, err ? 
0 : member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_SyncGroup, rkcg); return; err: rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR", "Group \"%s\": failed to run assignor \"%s\" for " "%d member(s): %s", rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str, member_cnt, errstr); rd_kafka_cgrp_rejoin(rkcg, "%s assignor failed: %s", rkas->rkas_protocol_name->str, errstr); } /** * @brief Op callback from handle_JoinGroup */ static rd_kafka_op_res_t rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA) return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */ if (!rkcg->rkcg_group_leader.members) { rd_kafka_dbg(rk, CGRP, "GRPLEADER", "Group \"%.*s\": no longer leader: " "not running assignor", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); return RD_KAFKA_OP_RES_HANDLED; } rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err, rko->rko_u.metadata.md, rkcg->rkcg_group_leader.members, rkcg->rkcg_group_leader.member_cnt); return RD_KAFKA_OP_RES_HANDLED; } /** * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType * * Protocol definition: * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal * * Returns 0 on success or -1 on error. */ static int rd_kafka_group_MemberMetadata_consumer_read( rd_kafka_broker_t *rkb, rd_kafka_group_member_t *rkgm, const rd_kafkap_bytes_t *MemberMetadata) { rd_kafka_buf_t *rkbuf; int16_t Version; int32_t subscription_cnt; rd_kafkap_bytes_t UserData; const int log_decode_errors = LOG_ERR; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG; /* Create a shadow-buffer pointing to the metadata to ease parsing. */ rkbuf = rd_kafka_buf_new_shadow( MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL); /* Protocol parser needs a broker handle to log errors on. */ rkbuf->rkbuf_rkb = rkb; rd_kafka_broker_keep(rkb); rd_kafka_buf_read_i16(rkbuf, &Version); rd_kafka_buf_read_i32(rkbuf, &subscription_cnt); if (subscription_cnt > 10000 || subscription_cnt <= 0) goto err; rkgm->rkgm_subscription = rd_kafka_topic_partition_list_new(subscription_cnt); while (subscription_cnt-- > 0) { rd_kafkap_str_t Topic; char *topic_name; rd_kafka_buf_read_str(rkbuf, &Topic); RD_KAFKAP_STR_DUPA(&topic_name, &Topic); rd_kafka_topic_partition_list_add( rkgm->rkgm_subscription, topic_name, RD_KAFKA_PARTITION_UA); } rd_kafka_buf_read_bytes(rkbuf, &UserData); rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; if (Version >= 1 && !(rkgm->rkgm_owned = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields))) goto err; rd_kafka_buf_destroy(rkbuf); return 0; err_parse: err = rkbuf->rkbuf_err; err: rd_rkb_dbg(rkb, CGRP, "MEMBERMETA", "Failed to parse MemberMetadata for \"%.*s\": %s", RD_KAFKAP_STR_PR(rkgm->rkgm_member_id), rd_kafka_err2str(err)); if (rkgm->rkgm_subscription) { rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription); rkgm->rkgm_subscription = NULL; } rd_kafka_buf_destroy(rkbuf); return -1; } /** * @brief cgrp handler for JoinGroup responses * opaque must be the cgrp handle. 
* * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread) */ static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_cgrp_t *rkcg = opaque; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; int32_t GenerationId; rd_kafkap_str_t Protocol, LeaderId; rd_kafkap_str_t MyMemberId = RD_KAFKAP_STR_INITIALIZER; int32_t member_cnt; int actions; int i_am_leader = 0; rd_kafka_assignor_t *rkas = NULL; rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_JoinGroup); if (err == RD_KAFKA_RESP_ERR__DESTROY || rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) return; /* Terminating */ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) { rd_kafka_dbg( rkb->rkb_rk, CGRP, "JOINGROUP", "JoinGroup response: discarding outdated request " "(now in join-state %s)", rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); return; } if (err) { ErrorCode = err; goto err; } if (request->rkbuf_reqhdr.ApiVersion >= 2) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); rd_kafka_buf_read_i32(rkbuf, &GenerationId); rd_kafka_buf_read_str(rkbuf, &Protocol); rd_kafka_buf_read_str(rkbuf, &LeaderId); rd_kafka_buf_read_str(rkbuf, &MyMemberId); rd_kafka_buf_read_i32(rkbuf, &member_cnt); if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) { /* Protocol not set, we will not be able to find * a matching assignor so error out early. */ ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG; } else if (!ErrorCode) { char *protocol_name; RD_KAFKAP_STR_DUPA(&protocol_name, &Protocol); if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk, protocol_name)) || !rkas->rkas_enabled) { rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", "Unsupported assignment strategy \"%s\"", protocol_name); if (rkcg->rkcg_assignor) { if (rkcg->rkcg_assignor->rkas_destroy_state_cb) rkcg->rkcg_assignor ->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); rkcg->rkcg_assignor_state = NULL; rkcg->rkcg_assignor = NULL; } ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL; } } rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", "JoinGroup response: GenerationId %" PRId32 ", " "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, " "member metadata count " "%" PRId32 ": %s", GenerationId, RD_KAFKAP_STR_PR(&Protocol), RD_KAFKAP_STR_PR(&LeaderId), RD_KAFKAP_STR_LEN(&MyMemberId) && !rd_kafkap_str_cmp(&LeaderId, &MyMemberId) ? " (me)" : "", RD_KAFKAP_STR_PR(&MyMemberId), member_cnt, ErrorCode ? 
rd_kafka_err2str(ErrorCode) : "(no error)"); if (!ErrorCode) { char *my_member_id; RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId); rd_kafka_cgrp_set_member_id(rkcg, my_member_id); rkcg->rkcg_generation_id = GenerationId; i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId); } else { rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000 * 1000); goto err; } if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) { if (rkcg->rkcg_assignor->rkas_destroy_state_cb) rkcg->rkcg_assignor->rkas_destroy_state_cb( rkcg->rkcg_assignor_state); rkcg->rkcg_assignor_state = NULL; } rkcg->rkcg_assignor = rkas; if (i_am_leader) { rd_kafka_group_member_t *members; int i; int sub_cnt = 0; rd_list_t topics; rd_kafka_op_t *rko; rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP", "I am elected leader for group \"%s\" " "with %" PRId32 " member(s)", rkcg->rkcg_group_id->str, member_cnt); if (member_cnt > 100000) { err = RD_KAFKA_RESP_ERR__BAD_MSG; goto err; } rd_list_init(&topics, member_cnt, rd_free); members = rd_calloc(member_cnt, sizeof(*members)); for (i = 0; i < member_cnt; i++) { rd_kafkap_str_t MemberId; rd_kafkap_bytes_t MemberMetadata; rd_kafka_group_member_t *rkgm; rd_kafkap_str_t GroupInstanceId = RD_KAFKAP_STR_INITIALIZER; rd_kafka_buf_read_str(rkbuf, &MemberId); if (request->rkbuf_reqhdr.ApiVersion >= 5) rd_kafka_buf_read_str(rkbuf, &GroupInstanceId); rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata); rkgm = &members[sub_cnt]; rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId); rkgm->rkgm_group_instance_id = rd_kafkap_str_copy(&GroupInstanceId); rd_list_init(&rkgm->rkgm_eligible, 0, NULL); rkgm->rkgm_generation = -1; if (rd_kafka_group_MemberMetadata_consumer_read( rkb, rkgm, &MemberMetadata)) { /* Failed to parse this member's metadata, * ignore it. */ } else { sub_cnt++; rkgm->rkgm_assignment = rd_kafka_topic_partition_list_new( rkgm->rkgm_subscription->cnt); rd_kafka_topic_partition_list_get_topic_names( rkgm->rkgm_subscription, &topics, 0 /*dont include regex*/); } } /* FIXME: What to do if parsing failed for some/all members? * It is a sign of incompatibility. */ rd_kafka_cgrp_group_leader_reset(rkcg, "JoinGroup response clean-up"); rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL); rkcg->rkcg_group_leader.members = members; rkcg->rkcg_group_leader.member_cnt = sub_cnt; rd_kafka_cgrp_set_join_state( rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); /* The assignor will need metadata so fetch it asynchronously * and run the assignor when we get a reply. * Create a callback op that the generic metadata code * will trigger when metadata has been parsed. */ rko = rd_kafka_op_new_cb( rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, rd_kafka_cgrp_assignor_handle_Metadata_op); rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL); rd_kafka_MetadataRequest( rkb, &topics, "partition assignor", rd_false /*!allow_auto_create*/, /* cgrp_update=false: * Since the subscription list may not be identical * across all members of the group and thus the * Metadata response may not be identical to this * consumer's subscription list, we want to * avoid triggering a rejoin or error propagation * on receiving the response since some topics * may be missing. 
*/ rd_false, rko); rd_list_destroy(&topics); } else { rd_kafka_cgrp_set_join_state( rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC); rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup); rd_kafka_SyncGroupRequest( rkb, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, NULL, 0, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_SyncGroup, rkcg); } err: actions = rd_kafka_err_action( rkb, ErrorCode, request, RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID, RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED, RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION, RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID, RD_KAFKA_ERR_ACTION_END); if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ, RD_KAFKA_OP_COORD_QUERY, ErrorCode); } /* No need for retries here since the join is intervalled, * see rkcg_join_intvl */ if (ErrorCode) { if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY) return; /* Termination */ if (ErrorCode == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) { rd_kafka_set_fatal_error(rkcg->rkcg_rk, ErrorCode, "Fatal consumer error: %s", rd_kafka_err2str(ErrorCode)); ErrorCode = RD_KAFKA_RESP_ERR__FATAL; } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) rd_kafka_consumer_err( rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0, NULL, NULL, RD_KAFKA_OFFSET_INVALID, "JoinGroup failed: %s", rd_kafka_err2str(ErrorCode)); if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID) rd_kafka_cgrp_set_member_id(rkcg, ""); else if (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION) rkcg->rkcg_generation_id = -1; else if (ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) { /* KIP-394 requires member.id on initial join * group request */ char *my_member_id; RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId); rd_kafka_cgrp_set_member_id(rkcg, my_member_id); /* Skip the join backoff */ rd_interval_reset(&rkcg->rkcg_join_intvl); } if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION || ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED)) rd_kafka_cgrp_revoke_all_rejoin( rkcg, rd_true /*assignment is lost*/, rd_true /*this consumer is initiating*/, "JoinGroup error"); else rd_kafka_cgrp_rejoin(rkcg, "JoinGroup error: %s", rd_kafka_err2str(ErrorCode)); } return; err_parse: ErrorCode = rkbuf->rkbuf_err; goto err; } /** * @brief Check subscription against requested Metadata. */ static rd_kafka_op_res_t rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; /* Terminating */ rd_kafka_cgrp_metadata_update_check(rkcg, rd_false /*dont rejoin*/); return RD_KAFKA_OP_RES_HANDLED; } /** * @brief (Async) Refresh metadata (for cgrp's needs) * * @returns 1 if metadata refresh was requested, or 0 if metadata is * up to date, or -1 if no broker is available for metadata requests. * * @locks none * @locality rdkafka main thread */ static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg, int *metadata_agep, const char *reason) { rd_kafka_t *rk = rkcg->rkcg_rk; rd_kafka_op_t *rko; rd_list_t topics; rd_kafka_resp_err_t err; rd_list_init(&topics, 8, rd_free); /* Insert all non-wildcard topics in cache. 
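         * (Wildcard subscriptions are instead handled below by checking
         * the age of the cached full cluster metadata; non-wildcard
         * subscriptions are checked against the per-topic cache.)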
*/ rd_kafka_metadata_cache_hint_rktparlist( rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, 0 /*dont replace*/); if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) { /* For wildcard subscriptions make sure the * cached full metadata isn't too old. */ int metadata_age = -1; if (rk->rk_ts_full_metadata) metadata_age = (int)(rd_clock() - rk->rk_ts_full_metadata) / 1000; *metadata_agep = metadata_age; if (metadata_age != -1 && metadata_age <= rk->rk_conf.metadata_max_age_ms) { rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", "%s: metadata for wildcard subscription " "is up to date (%dms old)", reason, *metadata_agep); rd_list_destroy(&topics); return 0; /* Up-to-date */ } } else { /* Check that all subscribed topics are in the cache. */ int r; rd_kafka_topic_partition_list_get_topic_names( rkcg->rkcg_subscription, &topics, 0 /*no regexps*/); rd_kafka_rdlock(rk); r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics, metadata_agep); rd_kafka_rdunlock(rk); if (r == rd_list_cnt(&topics)) { rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", "%s: metadata for subscription " "is up to date (%dms old)", reason, *metadata_agep); rd_list_destroy(&topics); return 0; /* Up-to-date and all topics exist. */ } rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", "%s: metadata for subscription " "only available for %d/%d topics (%dms old)", reason, r, rd_list_cnt(&topics), *metadata_agep); } /* Async request, result will be triggered from * rd_kafka_parse_metadata(). */ rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA, rd_kafka_cgrp_handle_Metadata_op); rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0); err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics, rd_false /*!allow auto create */, rd_true /*cgrp_update*/, reason, rko); if (err) { rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA", "%s: need to refresh metadata (%dms old) " "but no usable brokers available: %s", reason, *metadata_agep, rd_kafka_err2str(err)); rd_kafka_op_destroy(rko); } rd_list_destroy(&topics); return err ? -1 : 1; } static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg) { int metadata_age; if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT || rd_kafka_cgrp_awaiting_response(rkcg)) return; /* On max.poll.interval.ms failure, do not rejoin group until the * application has called poll. */ if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) && rd_kafka_max_poll_exceeded(rkcg->rkcg_rk)) return; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN", "Group \"%.*s\": join with %d subscribed topic(s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_list_cnt(rkcg->rkcg_subscribed_topics)); /* See if we need to query metadata to continue: * - if subscription contains wildcards: * * query all topics in cluster * * - if subscription does not contain wildcards but * some topics are missing from the local metadata cache: * * query subscribed topics (all cached ones) * * - otherwise: * * rely on topic metadata cache */ /* We need up-to-date full metadata to continue, * refresh metadata if necessary. 
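         * rd_kafka_cgrp_metadata_refresh() returns 1 if an async refresh
         * was requested, 0 if the cached metadata is already up to date,
         * or -1 if no usable broker is available; only the first case
         * postpones the join below.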
*/ if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age, "consumer join") == 1) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "JOIN", "Group \"%.*s\": " "postponing join until up-to-date " "metadata is available", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); rd_assert( rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || /* Possible via rd_kafka_cgrp_modify_subscription */ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY); rd_kafka_cgrp_set_join_state( rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); return; /* ^ async call */ } if (rd_list_empty(rkcg->rkcg_subscribed_topics)) rd_kafka_cgrp_metadata_update_check(rkcg, rd_false /*dont join*/); if (rd_list_empty(rkcg->rkcg_subscribed_topics)) { rd_kafka_dbg( rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "JOIN", "Group \"%.*s\": " "no matching topics based on %dms old metadata: " "next metadata refresh in %dms", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), metadata_age, rkcg->rkcg_rk->rk_conf.metadata_refresh_interval_ms - metadata_age); return; } rd_rkb_dbg( rkcg->rkcg_curr_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "JOIN", "Joining group \"%.*s\" with %d subscribed topic(s) and " "member id \"%.*s\"", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_list_cnt(rkcg->rkcg_subscribed_topics), rkcg->rkcg_member_id ? RD_KAFKAP_STR_LEN(rkcg->rkcg_member_id) : 0, rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : ""); rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN); rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_JoinGroup); rd_kafka_JoinGroupRequest( rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, rkcg->rkcg_rk->rk_conf.group_protocol_type, rkcg->rkcg_subscribed_topics, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_JoinGroup, rkcg); } /** * Rejoin group on update to effective subscribed topics list */ static void rd_kafka_cgrp_revoke_rejoin(rd_kafka_cgrp_t *rkcg, const char *reason) { /* * Clean-up group leader duties, if any. */ rd_kafka_cgrp_group_leader_reset(rkcg, "group (re)join"); rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "REJOIN", "Group \"%.*s\" (re)joining in join-state %s " "with %d assigned partition(s): %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, reason); rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, rd_true /*initiating*/, reason); } /** * @brief Update the effective list of subscribed topics. * * Set \p tinfos to NULL to clear the list. * * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list * * @returns true on change, else false. * * @remark Takes ownership of \p tinfos */ static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg, rd_list_t *tinfos) { rd_kafka_topic_info_t *tinfo; int i; if (!tinfos) { if (!rd_list_empty(rkcg->rkcg_subscribed_topics)) rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION", "Group \"%.*s\": " "clearing subscribed topics list (%d)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_list_cnt(rkcg->rkcg_subscribed_topics)); tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy); } else { if (rd_list_cnt(tinfos) == 0) rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION", "Group \"%.*s\": " "no topics in metadata matched " "subscription", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); } /* Sort for comparison */ rd_list_sort(tinfos, rd_kafka_topic_info_cmp); /* Compare to existing to see if anything changed. 
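         * rd_list_cmp() returns 0 when both lists are equal, in which
         * case the new list is destroyed and no rejoin is necessary.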
*/ if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos, rd_kafka_topic_info_cmp)) { /* No change */ rd_list_destroy(tinfos); return rd_false; } rd_kafka_dbg( rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA, "SUBSCRIPTION", "Group \"%.*s\": effective subscription list changed " "from %d to %d topic(s):", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_list_cnt(rkcg->rkcg_subscribed_topics), rd_list_cnt(tinfos)); RD_LIST_FOREACH(tinfo, tinfos, i) rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA, "SUBSCRIPTION", " Topic %s with %d partition(s)", tinfo->topic, tinfo->partition_cnt); rd_list_destroy(rkcg->rkcg_subscribed_topics); rkcg->rkcg_subscribed_topics = tinfos; return rd_true; } /** * @brief Handle Heartbeat response. */ void rd_kafka_cgrp_handle_Heartbeat(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; const int log_decode_errors = LOG_ERR; int16_t ErrorCode = 0; int actions = 0; if (err == RD_KAFKA_RESP_ERR__DESTROY) return; rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT); rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; if (err) goto err; if (request->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (ErrorCode) { err = ErrorCode; goto err; } rd_kafka_cgrp_update_session_timeout( rkcg, rd_false /*don't update if session has expired*/); return; err_parse: err = rkbuf->rkbuf_err; err: rkcg->rkcg_last_heartbeat_err = err; rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", "Group \"%s\" heartbeat error response in " "state %s (join-state %s, %d partition(s) assigned): %s", rkcg->rkcg_group_id->str, rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, rd_kafka_err2str(err)); if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) { rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "HEARTBEAT", "Heartbeat response: discarding outdated " "request (now in join-state %s)", rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); return; } switch (err) { case RD_KAFKA_RESP_ERR__DESTROY: /* quick cleanup */ return; case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP: case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR__TRANSPORT: rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT", "Heartbeat failed due to coordinator (%s) " "no longer available: %s: " "re-querying for coordinator", rkcg->rkcg_curr_coord ? 
rd_kafka_broker_name(rkcg->rkcg_curr_coord) : "none", rd_kafka_err2str(err)); /* Remain in joined state and keep querying for coordinator */ actions = RD_KAFKA_ERR_ACTION_REFRESH; break; case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS: rd_kafka_cgrp_update_session_timeout( rkcg, rd_false /*don't update if session has expired*/); /* No further action if already rebalancing */ if (RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) return; rd_kafka_cgrp_group_is_rebalancing(rkcg); return; case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: rd_kafka_cgrp_set_member_id(rkcg, ""); rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, rd_true /*initiating*/, "resetting member-id"); return; case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: rkcg->rkcg_generation_id = -1; rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, rd_true /*initiating*/, "illegal generation"); return; case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID: rd_kafka_set_fatal_error(rkcg->rkcg_rk, err, "Fatal consumer error: %s", rd_kafka_err2str(err)); rd_kafka_cgrp_revoke_all_rejoin_maybe( rkcg, rd_true, /*assignment lost*/ rd_true, /*initiating*/ "consumer fenced by " "newer instance"); return; default: actions = rd_kafka_err_action(rkb, err, request, RD_KAFKA_ERR_ACTION_END); break; } if (actions & RD_KAFKA_ERR_ACTION_REFRESH) { /* Re-query for coordinator */ rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err)); } if (actions & RD_KAFKA_ERR_ACTION_RETRY && rd_kafka_buf_retry(rkb, request)) { /* Retry */ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; return; } } /** * @brief Send Heartbeat */ static void rd_kafka_cgrp_heartbeat(rd_kafka_cgrp_t *rkcg) { /* Don't send heartbeats if max.poll.interval.ms was exceeded */ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) return; /* Skip heartbeat if we have one in transit */ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT) return; rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT; rd_kafka_HeartbeatRequest( rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id, rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_Heartbeat, NULL); } /** * Cgrp is now terminated: decommission it and signal back to application. */ static void rd_kafka_cgrp_terminated(rd_kafka_cgrp_t *rkcg) { if (rd_atomic32_get(&rkcg->rkcg_terminated)) return; /* terminated() may be called multiple times, * make sure to only terminate once. */ rd_kafka_cgrp_group_assignment_set(rkcg, NULL); rd_kafka_assert(NULL, !rd_kafka_assignment_in_progress(rkcg->rkcg_rk)); rd_kafka_assert(NULL, !rkcg->rkcg_group_assignment); rd_kafka_assert(NULL, rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0); rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM); rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, &rkcg->rkcg_offset_commit_tmr, 1 /*lock*/); rd_kafka_q_purge(rkcg->rkcg_wait_coord_q); /* Disable and empty ops queue since there will be no * (broker) thread serving it anymore after the unassign_broker * below. * This prevents hang on destroy where responses are enqueued on * rkcg_ops without anything serving the queue. */ rd_kafka_q_disable(rkcg->rkcg_ops); rd_kafka_q_purge(rkcg->rkcg_ops); if (rkcg->rkcg_curr_coord) rd_kafka_cgrp_coord_clear_broker(rkcg); if (rkcg->rkcg_coord) { rd_kafka_broker_destroy(rkcg->rkcg_coord); rkcg->rkcg_coord = NULL; } rd_atomic32_set(&rkcg->rkcg_terminated, rd_true); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", "Consumer group sub-system terminated%s", rkcg->rkcg_reply_rko ? 
" (will enqueue reply)" : ""); if (rkcg->rkcg_reply_rko) { /* Signal back to application. */ rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq, rkcg->rkcg_reply_rko, 0); rkcg->rkcg_reply_rko = NULL; } /* Remove cgrp application queue forwarding, if any. */ rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL); } /** * If a cgrp is terminating and all outstanding ops are now finished * then progress to final termination and return 1. * Else returns 0. */ static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM) return 1; if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE))) return 0; /* Check if wait-coord queue has timed out. */ if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 && rkcg->rkcg_ts_terminate + (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) < rd_clock()) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", "Group \"%s\": timing out %d op(s) in " "wait-for-coordinator queue", rkcg->rkcg_group_id->str, rd_kafka_q_len(rkcg->rkcg_wait_coord_q)); rd_kafka_q_disable(rkcg->rkcg_wait_coord_q); if (rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q) == -1) { /* ops queue shut down, purge coord queue */ rd_kafka_q_purge(rkcg->rkcg_wait_coord_q); } } if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) && rd_list_empty(&rkcg->rkcg_toppars) && !rd_kafka_assignment_in_progress(rkcg->rkcg_rk) && rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0 && !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)) { /* Since we might be deep down in a 'rko' handler * called from cgrp_op_serve() we cant call terminated() * directly since it will decommission the rkcg_ops queue * that might be locked by intermediate functions. * Instead set the TERM state and let the cgrp terminate * at its own discretion. */ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM); return 1; } else { rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "CGRPTERM", "Group \"%s\": " "waiting for %s%d toppar(s), " "%s" "%d commit(s)%s%s%s (state %s, join-state %s) " "before terminating", rkcg->rkcg_group_id->str, RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ? "assign call, " : "", rd_list_cnt(&rkcg->rkcg_toppars), rd_kafka_assignment_in_progress(rkcg->rkcg_rk) ? "assignment in progress, " : "", rkcg->rkcg_rk->rk_consumer.wait_commit_cnt, (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) ? ", wait-leave," : "", rkcg->rkcg_rebalance_rejoin ? ", rebalance_rejoin," : "", (rkcg->rkcg_rebalance_incr_assignment != NULL) ? 
", rebalance_incr_assignment," : "", rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); return 0; } } /** * @brief Add partition to this cgrp management * * @locks none */ static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTADD", "Group \"%s\": add %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); rd_kafka_toppar_lock(rktp); rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP)); rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_CGRP; rd_kafka_toppar_unlock(rktp); rd_kafka_toppar_keep(rktp); rd_list_add(&rkcg->rkcg_toppars, rktp); } /** * @brief Remove partition from this cgrp management * * @locks none */ static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp) { int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0; rd_kafka_op_t *rko; rd_kafka_q_t *rkq; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Group \"%s\": delete %s [%" PRId32 "]", rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition); rd_kafka_toppar_lock(rktp); rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP); rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP; if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) { /* Partition is being removed from the cluster and it's stopped, * so rktp->rktp_fetchq->rkq_fwdq is NULL. * Purge remaining operations in rktp->rktp_fetchq->rkq_q, * while holding lock, to avoid circular references */ rkq = rktp->rktp_fetchq; mtx_lock(&rkq->rkq_lock); rd_assert(!rkq->rkq_fwdq); rko = TAILQ_FIRST(&rkq->rkq_q); while (rko) { if (rko->rko_type != RD_KAFKA_OP_BARRIER && rko->rko_type != RD_KAFKA_OP_FETCH) { rd_kafka_log( rkcg->rkcg_rk, LOG_WARNING, "PARTDEL", "Purging toppar fetch queue buffer op" "with unexpected type: %s", rd_kafka_op2str(rko->rko_type)); } if (rko->rko_type == RD_KAFKA_OP_BARRIER) barrier_cnt++; else if (rko->rko_type == RD_KAFKA_OP_FETCH) message_cnt++; else other_cnt++; rko = TAILQ_NEXT(rko, rko_link); cnt++; } mtx_unlock(&rkq->rkq_lock); if (cnt) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Purge toppar fetch queue buffer " "containing %d op(s) " "(%d barrier(s), %d message(s), %d other)" " to avoid " "circular references", cnt, barrier_cnt, message_cnt, other_cnt); rd_kafka_q_purge(rktp->rktp_fetchq); } else { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL", "Not purging toppar fetch queue buffer." " No ops present in the buffer."); } } rd_kafka_toppar_unlock(rktp); rd_list_remove(&rkcg->rkcg_toppars, rktp); rd_kafka_toppar_destroy(rktp); /* refcnt from _add above */ rd_kafka_cgrp_try_terminate(rkcg); } /** * @brief Defer offset commit (rko) until coordinator is available. * * @returns 1 if the rko was deferred or 0 if the defer queue is disabled * or rko already deferred. */ static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko, const char *reason) { /* wait_coord_q is disabled session.timeout.ms after * group close() has been initated. */ if (rko->rko_u.offset_commit.ts_timeout != 0 || !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q)) return 0; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT", "Group \"%s\": " "unable to OffsetCommit in state %s: %s: " "coordinator (%s) is unavailable: " "retrying later", rkcg->rkcg_group_id->str, rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason, rkcg->rkcg_curr_coord ? 
                         rd_kafka_broker_name(rkcg->rkcg_curr_coord) : "none");

        rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
        rko->rko_u.offset_commit.ts_timeout =
            rd_clock() +
            (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
        rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);

        return 1;
}


/**
 * @brief Update the committed offsets for the partitions in \p offsets.
 *
 * @remark \p offsets may be NULL if \p err is set
 * @returns the number of partitions with errors encountered
 */
static int rd_kafka_cgrp_update_committed_offsets(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_resp_err_t err,
    rd_kafka_topic_partition_list_t *offsets) {
        int i;
        int errcnt = 0;

        /* Update toppars' committed offset or global error */
        for (i = 0; offsets && i < offsets->cnt; i++) {
                rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
                rd_kafka_toppar_t *rktp;

                /* Ignore logical offsets since they were never
                 * sent to the broker. */
                if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
                        continue;

                /* Propagate global error to all partitions that don't have
                 * explicit error set. */
                if (err && !rktpar->err)
                        rktpar->err = err;

                if (rktpar->err) {
                        rd_kafka_dbg(rkcg->rkcg_rk, TOPIC, "OFFSET",
                                     "OffsetCommit failed for "
                                     "%s [%" PRId32 "] at offset "
                                     "%" PRId64 " in join-state %s: %s",
                                     rktpar->topic, rktpar->partition,
                                     rktpar->offset,
                                     rd_kafka_cgrp_join_state_names
                                         [rkcg->rkcg_join_state],
                                     rd_kafka_err2str(rktpar->err));

                        errcnt++;
                        continue;
                }

                rktp = rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
                                                           rktpar, rd_false);
                if (!rktp)
                        continue;

                rd_kafka_toppar_lock(rktp);
                rktp->rktp_committed_pos =
                    rd_kafka_topic_partition_get_fetch_pos(rktpar);
                rd_kafka_toppar_unlock(rktp);

                rd_kafka_toppar_destroy(rktp); /* from get_toppar() */
        }

        return errcnt;
}


/**
 * @brief Propagate OffsetCommit results.
 *
 * @param rko_orig The original rko that triggered the commit, this is used
 *                 to propagate the result.
 * @param err The aggregated request-level error, or ERR_NO_ERROR.
 * @param errcnt The number of partitions in \p offsets that failed
 *               offset commit.
 */
static void rd_kafka_cgrp_propagate_commit_result(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_op_t *rko_orig,
    rd_kafka_resp_err_t err,
    int errcnt,
    rd_kafka_topic_partition_list_t *offsets) {

        const rd_kafka_t *rk = rkcg->rkcg_rk;
        int offset_commit_cb_served = 0;

        /* If no special callback is set but an offset_commit_cb has
         * been set in conf then post an event for the latter. */
        if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                            rd_kafka_topic_partition_list_copy(offsets);

                rko_reply->rko_u.offset_commit.cb =
                    rk->rk_conf.offset_commit_cb;
                rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;

                rd_kafka_q_enq(rk->rk_rep, rko_reply);
                offset_commit_cb_served++;
        }

        /* Enqueue reply to requester's queue, if any.
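         * The reply op shallow-copies the offset_commit payload from the
         * original op, which is why the reason string is duplicated below:
         * the reply needs its own copy to free.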
*/ if (rko_orig->rko_replyq.q) { rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err); rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH); /* Copy offset & partitions & callbacks to reply op */ rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit; if (offsets) rko_reply->rko_u.offset_commit.partitions = rd_kafka_topic_partition_list_copy(offsets); if (rko_reply->rko_u.offset_commit.reason) rko_reply->rko_u.offset_commit.reason = rd_strdup(rko_reply->rko_u.offset_commit.reason); rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0); offset_commit_cb_served++; } if (!offset_commit_cb_served && offsets && (errcnt > 0 || (err != RD_KAFKA_RESP_ERR_NO_ERROR && err != RD_KAFKA_RESP_ERR__NO_OFFSET))) { /* If there is no callback or handler for this (auto) * commit then log an error (#1043) */ char tmp[512]; rd_kafka_topic_partition_list_str( offsets, tmp, sizeof(tmp), /* Print per-partition errors unless there was a * request-level error. */ RD_KAFKA_FMT_F_OFFSET | (errcnt ? RD_KAFKA_FMT_F_ONLY_ERR : 0)); rd_kafka_log( rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL", "Offset commit (%s) failed " "for %d/%d partition(s) in join-state %s: " "%s%s%s", rko_orig->rko_u.offset_commit.reason, errcnt ? errcnt : offsets->cnt, offsets->cnt, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], errcnt ? rd_kafka_err2str(err) : "", errcnt ? ": " : "", tmp); } } /** * @brief Handle OffsetCommitResponse * Takes the original 'rko' as opaque argument. * @remark \p rkb, rkbuf, and request may be NULL in a number of * error cases (e.g., _NO_OFFSET, _WAIT_COORD) */ static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { rd_kafka_cgrp_t *rkcg = rk->rk_cgrp; rd_kafka_op_t *rko_orig = opaque; rd_kafka_topic_partition_list_t *offsets = rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */ int errcnt; RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT); err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf, request, offsets, rd_false); /* Suppress empty commit debug logs if allowed */ if (err != RD_KAFKA_RESP_ERR__NO_OFFSET || !rko_orig->rko_u.offset_commit.silent_empty) { if (rkb) rd_rkb_dbg(rkb, CGRP, "COMMIT", "OffsetCommit for %d partition(s) in " "join-state %s: " "%s: returned: %s", offsets ? offsets->cnt : -1, rd_kafka_cgrp_join_state_names [rkcg->rkcg_join_state], rko_orig->rko_u.offset_commit.reason, rd_kafka_err2str(err)); else rd_kafka_dbg(rk, CGRP, "COMMIT", "OffsetCommit for %d partition(s) in " "join-state " "%s: %s: " "returned: %s", offsets ? 
offsets->cnt : -1, rd_kafka_cgrp_join_state_names [rkcg->rkcg_join_state], rko_orig->rko_u.offset_commit.reason, rd_kafka_err2str(err)); } /* * Error handling */ switch (err) { case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: /* Revoke assignment and rebalance on unknown member */ rd_kafka_cgrp_set_member_id(rk->rk_cgrp, ""); rd_kafka_cgrp_revoke_all_rejoin_maybe( rkcg, rd_true /*assignment is lost*/, rd_true /*this consumer is initiating*/, "OffsetCommit error: Unknown member"); break; case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: /* Revoke assignment and rebalance on illegal generation */ rk->rk_cgrp->rkcg_generation_id = -1; rd_kafka_cgrp_revoke_all_rejoin_maybe( rkcg, rd_true /*assignment is lost*/, rd_true /*this consumer is initiating*/, "OffsetCommit error: Illegal generation"); break; case RD_KAFKA_RESP_ERR__IN_PROGRESS: return; /* Retrying */ case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR__TRANSPORT: /* The coordinator is not available, defer the offset commit * to when the coordinator is back up again. */ /* Future-proofing, see timeout_scan(). */ rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD); if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig, rd_kafka_err2str(err))) return; break; default: break; } /* Call on_commit interceptors */ if (err != RD_KAFKA_RESP_ERR__NO_OFFSET && err != RD_KAFKA_RESP_ERR__DESTROY && offsets && offsets->cnt > 0) rd_kafka_interceptors_on_commit(rk, offsets, err); /* Keep track of outstanding commits */ rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0); rk->rk_consumer.wait_commit_cnt--; if (err == RD_KAFKA_RESP_ERR__DESTROY) { rd_kafka_op_destroy(rko_orig); return; /* Handle is terminating, this op may be handled * by the op enq()ing thread rather than the * rdkafka main thread, it is not safe to * continue here. */ } /* Update the committed offsets for each partition's rktp. */ errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets); if (err != RD_KAFKA_RESP_ERR__DESTROY && !(err == RD_KAFKA_RESP_ERR__NO_OFFSET && rko_orig->rko_u.offset_commit.silent_empty)) { /* Propagate commit results (success or permanent error) * unless we're shutting down or commit was empty. */ rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig, err, errcnt, offsets); } rd_kafka_op_destroy(rko_orig); /* If the current state was waiting for commits to finish we'll try to * transition to the next state. */ if (rk->rk_consumer.wait_commit_cnt == 0) rd_kafka_assignment_serve(rk); } static size_t rd_kafka_topic_partition_has_absolute_offset( const rd_kafka_topic_partition_t *rktpar, void *opaque) { return rktpar->offset >= 0 ? 1 : 0; } /** * Commit a list of offsets. * Reuse the orignating 'rko' for the async reply. * 'rko->rko_payload' should either by NULL (to commit current assignment) or * a proper topic_partition_list_t with offsets to commit. * The offset list will be altered. * * \p rko...silent_empty: if there are no offsets to commit bail out * silently without posting an op on the reply queue. * \p set_offsets: set offsets and epochs in * rko->rko_u.offset_commit.partitions from the rktp's * stored offset. 
* * Locality: cgrp thread */ static void rd_kafka_cgrp_offsets_commit(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko, rd_bool_t set_offsets, const char *reason) { rd_kafka_topic_partition_list_t *offsets; rd_kafka_resp_err_t err; int valid_offsets = 0; int r; rd_kafka_buf_t *rkbuf; rd_kafka_op_t *reply; rd_kafka_consumer_group_metadata_t *cgmetadata; if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) { /* wait_commit_cnt has already been increased for * reprocessed ops. */ rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++; } /* If offsets is NULL we shall use the current assignment * (not the group assignment). */ if (!rko->rko_u.offset_commit.partitions && rkcg->rkcg_rk->rk_consumer.assignment.all->cnt > 0) { if (rd_kafka_cgrp_assignment_is_lost(rkcg)) { /* Not committing assigned offsets: assignment lost */ err = RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST; goto err; } rko->rko_u.offset_commit.partitions = rd_kafka_topic_partition_list_copy( rkcg->rkcg_rk->rk_consumer.assignment.all); } offsets = rko->rko_u.offset_commit.partitions; if (offsets) { /* Set offsets to commits */ if (set_offsets) rd_kafka_topic_partition_list_set_offsets( rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions, 1, RD_KAFKA_OFFSET_INVALID /* def */, 1 /* is commit */); /* Check the number of valid offsets to commit. */ valid_offsets = (int)rd_kafka_topic_partition_list_sum( offsets, rd_kafka_topic_partition_has_absolute_offset, NULL); } if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { /* Commits are not allowed when a fatal error has been raised */ err = RD_KAFKA_RESP_ERR__FATAL; goto err; } if (!valid_offsets) { /* No valid offsets */ err = RD_KAFKA_RESP_ERR__NO_OFFSET; goto err; } if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) { rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "COMMIT", "Deferring \"%s\" offset commit " "for %d partition(s) in state %s: " "no coordinator available", reason, valid_offsets, rd_kafka_cgrp_state_names[rkcg->rkcg_state]); if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason)) return; err = RD_KAFKA_RESP_ERR__WAIT_COORD; goto err; } rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "COMMIT", "Committing offsets for %d partition(s) with " "generation-id %" PRId32 " in join-state %s: %s", valid_offsets, rkcg->rkcg_generation_id, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], reason); cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid( rkcg->rkcg_rk->rk_conf.group_id_str, rkcg->rkcg_generation_id, rkcg->rkcg_member_id->str, rkcg->rkcg_rk->rk_conf.group_instance_id); /* Send OffsetCommit */ r = rd_kafka_OffsetCommitRequest(rkcg->rkcg_coord, cgmetadata, offsets, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_op_handle_OffsetCommit, rko, reason); rd_kafka_consumer_group_metadata_destroy(cgmetadata); /* Must have valid offsets to commit if we get here */ rd_kafka_assert(NULL, r != 0); return; err: if (err != RD_KAFKA_RESP_ERR__NO_OFFSET) rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "COMMIT", "OffsetCommit internal error: %s", rd_kafka_err2str(err)); /* Propagate error through dummy buffer object that will * call the response handler from the main loop, avoiding * any recursive calls from op_handle_OffsetCommit -> * assignment_serve() and then back to cgrp_assigned_offsets_commit() */ reply = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF); reply->rko_rk = rkcg->rkcg_rk; /* Set rk since the rkbuf will not * have a rkb to reach it. 
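                                       * Serving this RECV_BUF op from
                                       * the main loop will invoke
                                       * rkbuf_cb (op_handle_OffsetCommit
                                       * above) with rko_err as the error.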
*/ reply->rko_err = err; rkbuf = rd_kafka_buf_new(0, 0); rkbuf->rkbuf_cb = rd_kafka_cgrp_op_handle_OffsetCommit; rkbuf->rkbuf_opaque = rko; reply->rko_u.xbuf.rkbuf = rkbuf; rd_kafka_q_enq(rkcg->rkcg_ops, reply); } /** * @brief Commit offsets assigned partitions. * * If \p offsets is NULL all partitions in the current assignment will be used. * If \p set_offsets is true the offsets to commit will be read from the * rktp's stored offset rather than the .offset fields in \p offsets. * * rkcg_wait_commit_cnt will be increased accordingly. */ void rd_kafka_cgrp_assigned_offsets_commit( rd_kafka_cgrp_t *rkcg, const rd_kafka_topic_partition_list_t *offsets, rd_bool_t set_offsets, const char *reason) { rd_kafka_op_t *rko; if (rd_kafka_cgrp_assignment_is_lost(rkcg)) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "AUTOCOMMIT", "Group \"%s\": not committing assigned offsets: " "assignment lost", rkcg->rkcg_group_id->str); return; } rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT); rko->rko_u.offset_commit.reason = rd_strdup(reason); if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_OFFSET_COMMIT) { /* Send results to application */ rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0); rko->rko_u.offset_commit.cb = rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/ rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque; } /* NULL partitions means current assignment */ if (offsets) rko->rko_u.offset_commit.partitions = rd_kafka_topic_partition_list_copy(offsets); rko->rko_u.offset_commit.silent_empty = 1; rd_kafka_cgrp_offsets_commit(rkcg, rko, set_offsets, reason); } /** * auto.commit.interval.ms commit timer callback. * * Trigger a group offset commit. * * Locality: rdkafka main thread */ static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts, void *arg) { rd_kafka_cgrp_t *rkcg = arg; /* Don't attempt auto commit when rebalancing or initializing since * the rkcg_generation_id is most likely in flux. */ if (rkcg->rkcg_subscription && rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY) return; rd_kafka_cgrp_assigned_offsets_commit( rkcg, NULL, rd_true /*set offsets*/, "cgrp auto commit timer"); } /** * @brief If rkcg_next_subscription or rkcg_next_unsubscribe are * set, trigger a state change so that they are applied from the * main dispatcher. * * @returns rd_true if a subscribe was scheduled, else false. */ static rd_bool_t rd_kafka_trigger_waiting_subscribe_maybe(rd_kafka_cgrp_t *rkcg) { if (rkcg->rkcg_next_subscription || rkcg->rkcg_next_unsubscribe) { /* Skip the join backoff */ rd_interval_reset(&rkcg->rkcg_join_intvl); rd_kafka_cgrp_rejoin(rkcg, "Applying next subscription"); return rd_true; } return rd_false; } /** * @brief Incrementally add to an existing partition assignment * May update \p partitions but will not hold on to it. * * @returns an error object or NULL on success. */ static rd_kafka_error_t * rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *partitions) { rd_kafka_error_t *error; error = rd_kafka_assignment_add(rkcg->rkcg_rk, partitions); if (error) return error; if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) { rd_kafka_assignment_resume(rkcg->rkcg_rk, "incremental assign called"); rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_STEADY); if (rkcg->rkcg_subscription) { /* If using subscribe(), start a timer to enforce * `max.poll.interval.ms`. 
                         * Instead of restarting the timer on each ...poll()
                         * call, which would be costly (once per message),
                         * set up an intervalled timer that checks a timestamp
                         * (that is updated on ..poll()).
                         * The timer fires at 2 Hz (every 500ms). */
                        rd_kafka_timer_start(
                            &rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr,
                            500 * 1000ll /* 500ms */,
                            rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
                            rkcg);
                }
        }

        rd_kafka_cgrp_assignment_clear_lost(rkcg,
                                            "incremental_assign() called");

        return NULL;
}


/**
 * @brief Incrementally remove partitions from an existing partition
 *        assignment. May update \p partitions but will not hold on
 *        to it.
 *
 * @remark This method does not unmark the current assignment as lost
 *         (if lost). That happens after _incr_unassign_done, once a
 *         group-rejoin has been initiated.
 *
 * @returns An error object or NULL on success.
 */
static rd_kafka_error_t *rd_kafka_cgrp_incremental_unassign(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_topic_partition_list_t *partitions) {
        rd_kafka_error_t *error;

        error = rd_kafka_assignment_subtract(rkcg->rkcg_rk, partitions);
        if (error)
                return error;

        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk,
                                           "incremental unassign called");
                rd_kafka_cgrp_set_join_state(
                    rkcg,
                    RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE);
        }

        rd_kafka_cgrp_assignment_clear_lost(rkcg,
                                            "incremental_unassign() called");

        return NULL;
}


/**
 * @brief Call when all incremental unassign operations are done to transition
 *        to the next state.
 */
static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {

        /* If this action was underway when a terminate was initiated, it will
         * be left to complete. Now that's done, unassign all partitions */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                             "Group \"%s\" is terminating, initiating full "
                             "unassign",
                             rkcg->rkcg_group_id->str);
                rd_kafka_cgrp_unassign(rkcg);
                return;
        }

        if (rkcg->rkcg_rebalance_incr_assignment) {

                /* This incremental unassign was part of a normal rebalance
                 * (in which the revoke set was not empty). Immediately
                 * trigger the assign that follows this revoke. The protocol
                 * dictates this should occur even if the new assignment
                 * set is empty.
                 *
                 * Also, since this rebalance had some revoked partitions,
                 * a re-join should occur following the assign.
                 */

                rd_kafka_rebalance_op_incr(
                    rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                    rkcg->rkcg_rebalance_incr_assignment,
                    rd_true /*rejoin following assign*/,
                    "cooperative assign after revoke");

                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_rebalance_incr_assignment);
                rkcg->rkcg_rebalance_incr_assignment = NULL;

                /* Note: rkcg_rebalance_rejoin is actioned / reset in
                 * rd_kafka_cgrp_incremental_assign call */

        } else if (rkcg->rkcg_rebalance_rejoin) {
                rkcg->rkcg_rebalance_rejoin = rd_false;

                /* There are some cases (lost partitions) where a rejoin
                 * should occur immediately following the unassign (this
                 * is not the case under normal conditions), in which case
                 * the rejoin flag will be set. */

                /* Skip the join backoff */
                rd_interval_reset(&rkcg->rkcg_join_intvl);

                rd_kafka_cgrp_rejoin(rkcg, "Incremental unassignment done");

        } else if (!rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) {
                /* After this incremental unassignment we're now back in
                 * a steady state. */
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);
        }
}


/**
 * @brief Call when all absolute (non-incremental) unassign operations are done
 *        to transition to the next state.
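 *
 *        A LeaveGroup may be sent from here (see
 *        rd_kafka_cgrp_leave_maybe()), and if the unassign was part of
 *        a rebalance the group is rejoined without join backoff.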
*/ static void rd_kafka_cgrp_unassign_done(rd_kafka_cgrp_t *rkcg) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN", "Group \"%s\": unassign done in state %s " "(join-state %s)", rkcg->rkcg_group_id->str, rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); /* Leave group, if desired. */ rd_kafka_cgrp_leave_maybe(rkcg); if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE) return; /* All partitions are unassigned. Rejoin the group. */ /* Skip the join backoff */ rd_interval_reset(&rkcg->rkcg_join_intvl); rd_kafka_cgrp_rejoin(rkcg, "Unassignment done"); } /** * @brief Called from assignment code when all in progress * assignment/unassignment operations are done, allowing the cgrp to * transition to other states if needed. * * @remark This may be called spontaneously without any need for a state * change in the rkcg. */ void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE", "Group \"%s\": " "assignment operations done in join-state %s " "(rebalance rejoin=%s)", rkcg->rkcg_group_id->str, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], RD_STR_ToF(rkcg->rkcg_rebalance_rejoin)); switch (rkcg->rkcg_join_state) { case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE: rd_kafka_cgrp_unassign_done(rkcg); break; case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE: rd_kafka_cgrp_incr_unassign_done(rkcg); break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: /* If an updated/next subscription is available, schedule it. */ if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) break; if (rkcg->rkcg_rebalance_rejoin) { rkcg->rkcg_rebalance_rejoin = rd_false; /* Skip the join backoff */ rd_interval_reset(&rkcg->rkcg_join_intvl); rd_kafka_cgrp_rejoin( rkcg, "rejoining group to redistribute " "previously owned partitions to other " "group members"); break; } /* FALLTHRU */ case RD_KAFKA_CGRP_JOIN_STATE_INIT: /* Check if cgrp is trying to terminate, which is safe to do * in these two states. Otherwise we'll need to wait for * the current state to decommission. */ rd_kafka_cgrp_try_terminate(rkcg); break; default: break; } } /** * @brief Remove existing assignment. */ static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg) { rd_kafka_assignment_clear(rkcg->rkcg_rk); if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) { rd_kafka_assignment_resume(rkcg->rkcg_rk, "unassign called"); rd_kafka_cgrp_set_join_state( rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE); } rd_kafka_cgrp_assignment_clear_lost(rkcg, "unassign() called"); return NULL; } /** * @brief Set new atomic partition assignment * May update \p assignment but will not hold on to it. * * @returns NULL on success or an error if a fatal error has been raised. */ static rd_kafka_error_t * rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *assignment) { rd_kafka_error_t *error; rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGN", "Group \"%s\": new assignment of %d partition(s) " "in join-state %s", rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); /* Clear existing assignment, if any, and serve its removals. 
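         * rd_kafka_assignment_clear() returns the number of removed
         * partitions, so the assignment is only served (starting those
         * removals) when there was something to clear.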
         */
        if (rd_kafka_assignment_clear(rkcg->rkcg_rk))
                rd_kafka_assignment_serve(rkcg->rkcg_rk);

        error = rd_kafka_assignment_add(rkcg->rkcg_rk, assignment);
        if (error)
                return error;

        rd_kafka_cgrp_assignment_clear_lost(rkcg, "assign() called");

        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk, "assign called");
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                if (rkcg->rkcg_subscription) {
                        /* If using subscribe(), start a timer to enforce
                         * `max.poll.interval.ms`.
                         * Instead of restarting the timer on each ...poll()
                         * call, which would be costly (once per message),
                         * set up an intervalled timer that checks a timestamp
                         * (that is updated on ..poll()).
                         * The timer fires at 2 Hz (every 500ms). */
                        rd_kafka_timer_start(
                            &rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr,
                            500 * 1000ll /* 500ms */,
                            rd_kafka_cgrp_max_poll_interval_check_tmr_cb,
                            rkcg);
                }
        }

        return NULL;
}


/**
 * @brief Construct a typed map from list \p rktparlist with key corresponding
 *        to each element in the list and value NULL.
 *
 * @remark \p rktparlist may be NULL.
 */
static map_toppar_member_info_t *rd_kafka_toppar_list_to_toppar_member_info_map(
    rd_kafka_topic_partition_list_t *rktparlist) {
        map_toppar_member_info_t *map = rd_calloc(1, sizeof(*map));
        const rd_kafka_topic_partition_t *rktpar;

        RD_MAP_INIT(map, rktparlist ? rktparlist->cnt : 0,
                    rd_kafka_topic_partition_cmp,
                    rd_kafka_topic_partition_hash,
                    rd_kafka_topic_partition_destroy_free,
                    PartitionMemberInfo_free);

        if (!rktparlist)
                return map;

        RD_KAFKA_TPLIST_FOREACH(rktpar, rktparlist)
        RD_MAP_SET(map, rd_kafka_topic_partition_copy(rktpar),
                   PartitionMemberInfo_new(NULL, rd_false));

        return map;
}


/**
 * @brief Construct a toppar list from map \p map with elements corresponding
 *        to the keys of \p map.
 */
static rd_kafka_topic_partition_list_t *
rd_kafka_toppar_member_info_map_to_list(map_toppar_member_info_t *map) {
        const rd_kafka_topic_partition_t *k;
        rd_kafka_topic_partition_list_t *list =
            rd_kafka_topic_partition_list_new((int)RD_MAP_CNT(map));

        RD_MAP_FOREACH_KEY(k, map) {
                rd_kafka_topic_partition_list_add(list, k->topic,
                                                  k->partition);
        }

        return list;
}


/**
 * @brief Handle a rebalance-triggered partition assignment
 *        (COOPERATIVE case).
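 *
 *        The new assignment is diffed against the current group
 *        assignment: newly_added = new \ old, revoked = old \ new.
 *        A non-empty revoked set triggers an incremental unassign op
 *        first, with the incremental assign of the newly added
 *        partitions deferred (via rkcg_rebalance_incr_assignment) until
 *        that unassign completes; otherwise the incremental assign is
 *        triggered directly.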
 */
static void rd_kafka_cgrp_handle_assignment_cooperative(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_topic_partition_list_t *assignment) {
        map_toppar_member_info_t *new_assignment_set;
        map_toppar_member_info_t *old_assignment_set;
        map_toppar_member_info_t *newly_added_set;
        map_toppar_member_info_t *revoked_set;
        rd_kafka_topic_partition_list_t *newly_added;
        rd_kafka_topic_partition_list_t *revoked;

        new_assignment_set =
            rd_kafka_toppar_list_to_toppar_member_info_map(assignment);

        old_assignment_set = rd_kafka_toppar_list_to_toppar_member_info_map(
            rkcg->rkcg_group_assignment);

        newly_added_set = rd_kafka_member_partitions_subtract(
            new_assignment_set, old_assignment_set);
        revoked_set = rd_kafka_member_partitions_subtract(old_assignment_set,
                                                          new_assignment_set);

        newly_added = rd_kafka_toppar_member_info_map_to_list(newly_added_set);
        revoked     = rd_kafka_toppar_member_info_map_to_list(revoked_set);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COOPASSIGN",
                     "Group \"%s\": incremental assignment: %d newly added, "
                     "%d revoked partitions based on assignment of %d "
                     "partitions",
                     rkcg->rkcg_group_id->str, newly_added->cnt, revoked->cnt,
                     assignment->cnt);

        if (revoked->cnt > 0) {
                /* Setting rkcg_rebalance_incr_assignment causes a follow-on
                 * incremental assign rebalance op after completion of this
                 * incremental unassign op. */
                rkcg->rkcg_rebalance_incr_assignment = newly_added;
                newly_added                          = NULL;

                rd_kafka_rebalance_op_incr(
                    rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, revoked,
                    rd_false /*no rejoin following unassign*/,
                    "sync group revoke");

        } else {
                /* There are no revoked partitions - trigger the assign
                 * rebalance op, and flag that the group does not need
                 * to be re-joined */
                rd_kafka_rebalance_op_incr(
                    rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, newly_added,
                    rd_false /*no rejoin following assign*/,
                    "sync group assign");
        }

        if (newly_added)
                rd_kafka_topic_partition_list_destroy(newly_added);
        rd_kafka_topic_partition_list_destroy(revoked);
        RD_MAP_DESTROY_AND_FREE(revoked_set);
        RD_MAP_DESTROY_AND_FREE(newly_added_set);
        RD_MAP_DESTROY_AND_FREE(old_assignment_set);
        RD_MAP_DESTROY_AND_FREE(new_assignment_set);
}


/**
 * @brief Sets or clears the group's partition assignment for our consumer.
 *
 *        Will replace the current group assignment, if any.
 */
static void rd_kafka_cgrp_group_assignment_set(
    rd_kafka_cgrp_t *rkcg,
    const rd_kafka_topic_partition_list_t *partitions) {

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_group_assignment);

        if (partitions) {
                rkcg->rkcg_group_assignment =
                    rd_kafka_topic_partition_list_copy(partitions);
                rd_kafka_topic_partition_list_sort_by_topic(
                    rkcg->rkcg_group_assignment);
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": setting group assignment to %d "
                             "partition(s)",
                             rkcg->rkcg_group_id->str,
                             rkcg->rkcg_group_assignment->cnt);

        } else {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": clearing group assignment",
                             rkcg->rkcg_group_id->str);
                rkcg->rkcg_group_assignment = NULL;
        }

        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size =
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                    rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                    rkcg->rkcg_group_assignment);
}


/**
 * @brief Adds or removes \p partitions from the current group assignment.
 *
 * @param add Whether to add or remove the partitions.
 *
 * @remark The added partitions must not already be on the group assignment,
 *         and the removed partitions must be on the group assignment.
 *
 * To be used with incremental rebalancing.
 *
 */
static void rd_kafka_cgrp_group_assignment_modify(
    rd_kafka_cgrp_t *rkcg,
    rd_bool_t add,
    const rd_kafka_topic_partition_list_t *partitions) {
        const rd_kafka_topic_partition_t *rktpar;
        int precnt;
        rd_kafka_dbg(
            rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
            "Group \"%s\": %d partition(s) being %s group assignment "
            "of %d partition(s)",
            rkcg->rkcg_group_id->str, partitions->cnt,
            add ? "added to" : "removed from",
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt
                                        : 0);

        if (partitions == rkcg->rkcg_group_assignment) {
                /* \p partitions is the actual assignment, which
                 * must mean it is all to be removed.
                 * Short-cut directly to set(NULL). */
                rd_assert(!add);
                rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
                return;
        }

        if (add && (!rkcg->rkcg_group_assignment ||
                    rkcg->rkcg_group_assignment->cnt == 0)) {
                /* Adding to an empty assignment is a set operation. */
                rd_kafka_cgrp_group_assignment_set(rkcg, partitions);
                return;
        }

        if (!add) {
                /* Removing from an empty assignment is illegal. */
                rd_assert(rkcg->rkcg_group_assignment != NULL &&
                          rkcg->rkcg_group_assignment->cnt > 0);
        }

        precnt = rkcg->rkcg_group_assignment->cnt;
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                int idx;

                idx = rd_kafka_topic_partition_list_find_idx(
                    rkcg->rkcg_group_assignment, rktpar->topic,
                    rktpar->partition);

                if (add) {
                        rd_assert(idx == -1);

                        rd_kafka_topic_partition_list_add_copy(
                            rkcg->rkcg_group_assignment, rktpar);

                } else {
                        rd_assert(idx != -1);

                        rd_kafka_topic_partition_list_del_by_idx(
                            rkcg->rkcg_group_assignment, idx);
                }
        }

        if (add)
                rd_assert(precnt + partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);
        else
                rd_assert(precnt - partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);

        if (rkcg->rkcg_group_assignment->cnt == 0) {
                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_group_assignment);
                rkcg->rkcg_group_assignment = NULL;

        } else if (add)
                rd_kafka_topic_partition_list_sort_by_topic(
                    rkcg->rkcg_group_assignment);

        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size =
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                    rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                    rkcg->rkcg_group_assignment);
}


/**
 * @brief Handle a rebalance-triggered partition assignment.
 *
 *        If a rebalance_cb has been registered we enqueue an op for the app
 *        and let the app perform the actual assign() call. Otherwise we
 *        assign() directly from here.
 *
 *        This provides the most flexibility, allowing the app to perform any
 *        operation it sees fit (e.g., offset writes or reads) before actually
 *        updating the assign():ment.
 */
static void rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg,
                                            rd_kafka_topic_partition_list_t
                                            *assignment) {
        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {
                rd_kafka_cgrp_handle_assignment_cooperative(rkcg, assignment);
        } else {
                rd_kafka_rebalance_op(rkcg,
                                      RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                                      assignment, "new assignment");
        }
}


/**
 * Clean up any group-leader related resources.
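 *
 * This frees the per-member metadata that was collected from the
 * JoinGroup response while this consumer was the elected group leader.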
* * Locality: cgrp thread */ static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg, const char *reason) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER", "Group \"%.*s\": resetting group leader info: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason); if (rkcg->rkcg_group_leader.members) { int i; for (i = 0; i < rkcg->rkcg_group_leader.member_cnt; i++) rd_kafka_group_member_clear( &rkcg->rkcg_group_leader.members[i]); rkcg->rkcg_group_leader.member_cnt = 0; rd_free(rkcg->rkcg_group_leader.members); rkcg->rkcg_group_leader.members = NULL; } } /** * @brief React to a RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS broker response. */ static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg) { if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_EAGER) { rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_false /*lost*/, rd_false /*initiating*/, "rebalance in progress"); return; } /* In the COOPERATIVE case, simply rejoin the group * - partitions are unassigned on SyncGroup response, * not prior to JoinGroup as with the EAGER case. */ if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { rd_kafka_dbg( rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\": skipping " "COOPERATIVE rebalance in state %s " "(join-state %s)%s%s%s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ? " (awaiting assign call)" : "", (rkcg->rkcg_rebalance_incr_assignment != NULL) ? " (incremental assignment pending)" : "", rkcg->rkcg_rebalance_rejoin ? " (rebalance rejoin)" : ""); return; } rd_kafka_cgrp_rejoin(rkcg, "Group is rebalancing"); } /** * @brief Triggers the application rebalance callback if required to * revoke partitions, and transition to INIT state for (eventual) * rejoin. Does nothing if a rebalance workflow is already in * progress. */ static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg, rd_bool_t assignment_lost, rd_bool_t initiating, const char *reason) { if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { rd_kafka_dbg( rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\": rebalance (%s) " "already in progress, skipping in state %s " "(join-state %s) with %d assigned partition(s)%s%s%s: " "%s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_rebalance_protocol2str( rd_kafka_cgrp_rebalance_protocol(rkcg)), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, assignment_lost ? " (lost)" : "", rkcg->rkcg_rebalance_incr_assignment ? ", incremental assignment in progress" : "", rkcg->rkcg_rebalance_rejoin ? ", rejoin on rebalance" : "", reason); return; } rd_kafka_cgrp_revoke_all_rejoin(rkcg, assignment_lost, initiating, reason); } /** * @brief Triggers the application rebalance callback if required to * revoke partitions, and transition to INIT state for (eventual) * rejoin. */ static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg, rd_bool_t assignment_lost, rd_bool_t initiating, const char *reason) { rd_kafka_rebalance_protocol_t protocol = rd_kafka_cgrp_rebalance_protocol(rkcg); rd_bool_t terminating = unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE); rd_kafka_dbg( rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\" %s (%s) in state %s (join-state %s) " "with %d assigned partition(s)%s: %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), initiating ? "initiating rebalance" : "is rebalancing", rd_kafka_rebalance_protocol2str(protocol), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0, assignment_lost ? " (lost)" : "", reason); rd_snprintf(rkcg->rkcg_c.rebalance_reason, sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason); if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER || protocol == RD_KAFKA_REBALANCE_PROTOCOL_NONE) { /* EAGER case (or initial subscribe) - revoke partitions which * will be followed by rejoin, if required. */ if (assignment_lost) rd_kafka_cgrp_assignment_set_lost( rkcg, "%s: revoking assignment and rejoining", reason); /* Schedule application rebalance op if there is an existing * assignment (albeit perhaps empty) and there is no * outstanding rebalance op in progress. */ if (rkcg->rkcg_group_assignment && !RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) { rd_kafka_rebalance_op( rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, rkcg->rkcg_group_assignment, reason); } else { /* Skip the join backoff */ rd_interval_reset(&rkcg->rkcg_join_intvl); rd_kafka_cgrp_rejoin(rkcg, "%s", reason); } return; } /* COOPERATIVE case. */ /* All partitions should never be revoked unless terminating, leaving * the group, or on assignment lost. Any other scenario represents a * logic error. Fail fast in this case. */ if (!(terminating || assignment_lost || (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))) { rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE", "Group \"%s\": unexpected instruction to revoke " "current assignment and rebalance " "(terminating=%d, assignment_lost=%d, " "LEAVE_ON_UNASSIGN_DONE=%d)", rkcg->rkcg_group_id->str, terminating, assignment_lost, (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE)); rd_dassert(!*"BUG: unexpected instruction to revoke " "current assignment and rebalance"); } if (rkcg->rkcg_group_assignment && rkcg->rkcg_group_assignment->cnt > 0) { if (assignment_lost) rd_kafka_cgrp_assignment_set_lost( rkcg, "%s: revoking incremental assignment " "and rejoining", reason); rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\": revoking " "all %d partition(s)%s%s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_group_assignment->cnt, terminating ? " (terminating)" : "", assignment_lost ? " (assignment lost)" : ""); rd_kafka_rebalance_op_incr( rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, rkcg->rkcg_group_assignment, terminating ? rd_false : rd_true /*rejoin*/, reason); return; } if (terminating) { /* If terminating, then don't rejoin group. */ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\": consumer is " "terminating, skipping rejoin", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); return; } rd_kafka_cgrp_rejoin(rkcg, "Current assignment is empty"); } /** * @brief `max.poll.interval.ms` enforcement check timer. 
* * @locality rdkafka main thread * @locks none */ static void rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts, void *arg) { rd_kafka_cgrp_t *rkcg = arg; rd_kafka_t *rk = rkcg->rkcg_rk; int exceeded; exceeded = rd_kafka_max_poll_exceeded(rk); if (likely(!exceeded)) return; rd_kafka_log(rk, LOG_WARNING, "MAXPOLL", "Application maximum poll interval (%dms) " "exceeded by %dms " "(adjust max.poll.interval.ms for " "long-running message processing): " "leaving group", rk->rk_conf.max_poll_interval_ms, exceeded); rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA, RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED, 0, NULL, NULL, RD_KAFKA_OFFSET_INVALID, "Application maximum poll interval (%dms) " "exceeded by %dms", rk->rk_conf.max_poll_interval_ms, exceeded); rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED; rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/); /* Leave the group before triggering the rebalance: the standard leave * would otherwise only be sent after the rebalance callback has been * served, and since the application is still blocked doing processing, * that leave would be further delayed. * * KIP-345: static group members should continue to respect * `max.poll.interval.ms` but should not send a LeaveGroupRequest. */ if (!RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg)) rd_kafka_cgrp_leave(rkcg); /* Timing out or leaving the group invalidates the member id, reset it * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */ rd_kafka_cgrp_set_member_id(rkcg, ""); /* Trigger rebalance */ rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, rd_true /*initiating*/, "max.poll.interval.ms exceeded"); } /** * @brief Generate consumer errors for each topic in the list. * * Also replaces the list of last reported topic errors so that repeated * errors are silenced. * * @param errored Errored topics. * @param error_prefix Error message prefix. * * @remark Assumes ownership of \p errored. */ static void rd_kafka_propagate_consumer_topic_errors( rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *errored, const char *error_prefix) { int i; for (i = 0; i < errored->cnt; i++) { rd_kafka_topic_partition_t *topic = &errored->elems[i]; rd_kafka_topic_partition_t *prev; rd_assert(topic->err); /* Normalize error codes: an unknown topic may be * reported by the broker, or inferred by the client * from the topic's absence in the metadata response. * Make sure the application only sees one error code * for both these cases. */ if (topic->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) topic->err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; /* Check if this topic errored previously */ prev = rd_kafka_topic_partition_list_find( rkcg->rkcg_errored_topics, topic->topic, RD_KAFKA_PARTITION_UA); if (prev && prev->err == topic->err) continue; /* This topic already reported same error */ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_TOPIC, "TOPICERR", "%s: %s: %s", error_prefix, topic->topic, rd_kafka_err2str(topic->err)); /* Send consumer error to application */ rd_kafka_consumer_err( rkcg->rkcg_q, RD_KAFKA_NODEID_UA, topic->err, 0, topic->topic, NULL, RD_KAFKA_OFFSET_INVALID, "%s: %s: %s", error_prefix, topic->topic, rd_kafka_err2str(topic->err)); } rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics); rkcg->rkcg_errored_topics = errored; } /** * @brief Work out the topics currently subscribed to that do not * match any pattern in \p subscription. 
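 *
 * For example (illustrative): with rkcg_subscribed_topics =
 * {"logs", "metrics"} and a new \p subscription containing only the
 * pattern "^log.*", the returned list is {"metrics"}. NULL is
 * returned when every subscribed topic is still matched.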
*/ static rd_kafka_topic_partition_list_t *rd_kafka_cgrp_get_unsubscribing_topics( rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *subscription) { int i; rd_kafka_topic_partition_list_t *result; result = rd_kafka_topic_partition_list_new( rkcg->rkcg_subscribed_topics->rl_cnt); /* TODO: Something that isn't O(N*M) */ for (i = 0; i < rkcg->rkcg_subscribed_topics->rl_cnt; i++) { int j; const char *topic = ((rd_kafka_topic_info_t *) rkcg->rkcg_subscribed_topics->rl_elems[i]) ->topic; for (j = 0; j < subscription->cnt; j++) { const char *pattern = subscription->elems[j].topic; if (rd_kafka_topic_match(rkcg->rkcg_rk, pattern, topic)) { break; } } if (j == subscription->cnt) rd_kafka_topic_partition_list_add( result, topic, RD_KAFKA_PARTITION_UA); } if (result->cnt == 0) { rd_kafka_topic_partition_list_destroy(result); return NULL; } return result; } /** * @brief Determine the partitions to revoke, given the topics being * unassigned. */ static rd_kafka_topic_partition_list_t * rd_kafka_cgrp_calculate_subscribe_revoking_partitions( rd_kafka_cgrp_t *rkcg, const rd_kafka_topic_partition_list_t *unsubscribing) { rd_kafka_topic_partition_list_t *revoking; const rd_kafka_topic_partition_t *rktpar; if (!unsubscribing) return NULL; if (!rkcg->rkcg_group_assignment || rkcg->rkcg_group_assignment->cnt == 0) return NULL; revoking = rd_kafka_topic_partition_list_new(rkcg->rkcg_group_assignment->cnt); /* TODO: Something that isn't O(N*M). */ RD_KAFKA_TPLIST_FOREACH(rktpar, unsubscribing) { const rd_kafka_topic_partition_t *assigned; RD_KAFKA_TPLIST_FOREACH(assigned, rkcg->rkcg_group_assignment) { if (!strcmp(assigned->topic, rktpar->topic)) { rd_kafka_topic_partition_list_add( revoking, assigned->topic, assigned->partition); continue; } } } if (revoking->cnt == 0) { rd_kafka_topic_partition_list_destroy(revoking); revoking = NULL; } return revoking; } /** * @brief Handle a new subscription that is modifying an existing subscription * in the COOPERATIVE case. * * @remark Assumes ownership of \p rktparlist. */ static rd_kafka_resp_err_t rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *rktparlist) { rd_kafka_topic_partition_list_t *unsubscribing_topics; rd_kafka_topic_partition_list_t *revoking; rd_list_t *tinfos; rd_kafka_topic_partition_list_t *errored; int metadata_age; int old_cnt = rkcg->rkcg_subscription->cnt; rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; /* Topics in rkcg_subscribed_topics that don't match any pattern in the new subscription. */ unsubscribing_topics = rd_kafka_cgrp_get_unsubscribing_topics(rkcg, rktparlist); /* Currently assigned topic partitions that are no longer desired. 
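 * (I.e., the partitions in the current group assignment whose topic
 * appears in unsubscribing_topics.)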
*/ revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions( rkcg, unsubscribing_topics); rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); rkcg->rkcg_subscription = rktparlist; if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age, "modify subscription") == 1) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "MODSUB", "Group \"%.*s\": postponing join until " "up-to-date metadata is available", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id)); rd_assert( rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT || /* Possible via rd_kafka_cgrp_modify_subscription */ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY); rd_kafka_cgrp_set_join_state( rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); /* Revoke/join will occur after metadata refresh completes */ if (revoking) rd_kafka_topic_partition_list_destroy(revoking); if (unsubscribing_topics) rd_kafka_topic_partition_list_destroy( unsubscribing_topics); return RD_KAFKA_RESP_ERR_NO_ERROR; } rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", "Group \"%.*s\": modifying subscription of size %d to " "new subscription of size %d, removing %d topic(s), " "revoking %d partition(s) (join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), old_cnt, rkcg->rkcg_subscription->cnt, unsubscribing_topics ? unsubscribing_topics->cnt : 0, revoking ? revoking->cnt : 0, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); if (unsubscribing_topics) rd_kafka_topic_partition_list_destroy(unsubscribing_topics); /* Create a list of the topics in metadata that matches the new * subscription */ tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, (void *)rd_kafka_topic_info_destroy); /* Unmatched topics will be added to the errored list. */ errored = rd_kafka_topic_partition_list_new(0); if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); else rd_kafka_metadata_topic_filter( rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); /* Propagate consumer errors for any non-existent or errored topics. * The function takes ownership of errored. */ rd_kafka_propagate_consumer_topic_errors( rkcg, errored, "Subscribed topic not available"); if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) { rd_kafka_cgrp_rejoin(rkcg, "Subscription modified"); return RD_KAFKA_RESP_ERR_NO_ERROR; } if (revoking) { rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE", "Group \"%.*s\" revoking " "%d of %d partition(s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), revoking->cnt, rkcg->rkcg_group_assignment->cnt); rd_kafka_rebalance_op_incr( rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, revoking, rd_true /*rejoin*/, "subscribe"); rd_kafka_topic_partition_list_destroy(revoking); } return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * Remove existing topic subscription. */ static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg, rd_bool_t leave_group) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE", "Group \"%.*s\": unsubscribe from current %ssubscription " "of size %d (leave group=%s, has joined=%s, %s, " "join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_subscription ? "" : "unset ", rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0, RD_STR_ToF(leave_group), RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)), rkcg->rkcg_member_id ? 
rkcg->rkcg_member_id->str : "n/a", rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers, &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/); if (rkcg->rkcg_subscription) { rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription); rkcg->rkcg_subscription = NULL; } rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL); /* * Clean-up group leader duties, if any. */ rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe"); if (leave_group && RD_KAFKA_CGRP_HAS_JOINED(rkcg)) rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE; /* FIXME: Why are we only revoking if !assignment_lost ? */ if (!rd_kafka_cgrp_assignment_is_lost(rkcg)) rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/, rd_true /*initiating*/, "unsubscribe"); rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION | RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * Set new atomic topic subscription. */ static rd_kafka_resp_err_t rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg, rd_kafka_topic_partition_list_t *rktparlist) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", "Group \"%.*s\": subscribe to new %ssubscription " "of %d topics (join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rktparlist ? "" : "unset ", rktparlist ? rktparlist->cnt : 0, rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0) return RD_KAFKA_RESP_ERR__INVALID_ARG; /* If the consumer has raised a fatal error treat all subscribes as unsubscribe */ if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) { if (rkcg->rkcg_subscription) rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*leave group*/); return RD_KAFKA_RESP_ERR__FATAL; } /* Clear any existing postponed subscribe. */ if (rkcg->rkcg_next_subscription) rd_kafka_topic_partition_list_destroy_free( rkcg->rkcg_next_subscription); rkcg->rkcg_next_subscription = NULL; rkcg->rkcg_next_unsubscribe = rd_false; if (RD_KAFKA_CGRP_REBALANCING(rkcg)) { rd_kafka_dbg( rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE", "Group \"%.*s\": postponing " "subscribe until previous rebalance " "completes (join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); if (!rktparlist) rkcg->rkcg_next_unsubscribe = rd_true; else rkcg->rkcg_next_subscription = rktparlist; return RD_KAFKA_RESP_ERR_NO_ERROR; } if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && rktparlist && rkcg->rkcg_subscription) return rd_kafka_cgrp_modify_subscription(rkcg, rktparlist); /* Remove existing subscription first */ if (rkcg->rkcg_subscription) rd_kafka_cgrp_unsubscribe( rkcg, rktparlist ? rd_false /* don't leave group if new subscription */ : rd_true /* leave group if no new subscription */); if (!rktparlist) return RD_KAFKA_RESP_ERR_NO_ERROR; rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION; if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0) rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION; rkcg->rkcg_subscription = rktparlist; rd_kafka_cgrp_join(rkcg); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving * the op 'rko' from cgrp_terminate(). 
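 *
 * Illustrative call sequence (a sketch; rkq is assumed to be a reply
 * queue owned by the caller):
 * @code
 *   // From any thread but the main thread:
 *   rd_kafka_cgrp_terminate(rkcg, RD_KAFKA_REPLYQ(rkq, 0));
 *   // ...which enqueues an RD_KAFKA_OP_TERMINATE op that the main
 *   // thread's rd_kafka_cgrp_op_serve() dispatches to
 *   // rd_kafka_cgrp_terminate0(rkcg, rko).
 * @endcode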
* * NOTE: Takes ownership of 'rko' * * Locality: main thread */ void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) { rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM", "Terminating group \"%.*s\" in state %s " "with %d partition(s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_list_cnt(&rkcg->rkcg_toppars)); if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM || (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) || rkcg->rkcg_reply_rko != NULL)) { /* Already terminating or handling a previous terminate */ if (rko) { rd_kafka_q_t *rkq = rko->rko_replyq.q; rko->rko_replyq.q = NULL; rd_kafka_consumer_err( rkq, RD_KAFKA_NODEID_UA, RD_KAFKA_RESP_ERR__IN_PROGRESS, rko->rko_replyq.version, NULL, NULL, RD_KAFKA_OFFSET_INVALID, "Group is %s", rkcg->rkcg_reply_rko ? "terminating" : "terminated"); rd_kafka_q_destroy(rkq); rd_kafka_op_destroy(rko); } return; } /* Mark for stopping, the actual state transition * is performed when all toppars have left. */ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE; rkcg->rkcg_ts_terminate = rd_clock(); rkcg->rkcg_reply_rko = rko; if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION) rd_kafka_cgrp_unsubscribe( rkcg, /* Leave group if this is a controlled shutdown */ !rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)); /* Reset the wait-for-LeaveGroup flag if there is an outstanding * LeaveGroupRequest being waited on (from a prior unsubscribe), but * the destroy flags have NO_CONSUMER_CLOSE set, which calls * for immediate termination. */ if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE; /* If there's an outstanding rebalance which has not yet been * served by the application it will be served from consumer_close(). * If the instance is being terminated with NO_CONSUMER_CLOSE we * trigger unassign directly to avoid stalling on rebalance callback * queues that are no longer served by the application. */ if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) || rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk)) rd_kafka_cgrp_unassign(rkcg); /* Serve assignment so it can start to decommission */ rd_kafka_assignment_serve(rkcg->rkcg_rk); /* Try to terminate right away if all preconditions are met. */ rd_kafka_cgrp_try_terminate(rkcg); } /** * Terminate and decommission a cgrp asynchronously. * * Locality: any thread */ void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) { rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread)); rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0); } struct _op_timeout_offset_commit { rd_ts_t now; rd_kafka_t *rk; rd_list_t expired; }; /** * q_filter callback for expiring OFFSET_COMMIT timeouts. */ static int rd_kafka_op_offset_commit_timeout_check(rd_kafka_q_t *rkq, rd_kafka_op_t *rko, void *opaque) { struct _op_timeout_offset_commit *state = (struct _op_timeout_offset_commit *)opaque; if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT || rko->rko_u.offset_commit.ts_timeout == 0 || rko->rko_u.offset_commit.ts_timeout > state->now)) { return 0; } rd_kafka_q_deq0(rkq, rko); /* Add to temporary list to avoid recursive * locking of rkcg_wait_coord_q. */ rd_list_add(&state->expired, rko); return 1; } /** * Scan for various timeouts. 
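 *
 * Currently this only expires OFFSET_COMMIT ops that have been parked
 * on rkcg_wait_coord_q past their ts_timeout while waiting for a group
 * coordinator to become available.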
*/ static void rd_kafka_cgrp_timeout_scan(rd_kafka_cgrp_t *rkcg, rd_ts_t now) { struct _op_timeout_offset_commit ofc_state; int i, cnt = 0; rd_kafka_op_t *rko; ofc_state.now = now; ofc_state.rk = rkcg->rkcg_rk; rd_list_init(&ofc_state.expired, 0, NULL); cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q, rd_kafka_op_offset_commit_timeout_check, &ofc_state); RD_LIST_FOREACH(rko, &ofc_state.expired, i) rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL, RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rko); rd_list_destroy(&ofc_state.expired); if (cnt > 0) rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT", "Group \"%.*s\": timed out %d op(s), %d remain", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt, rd_kafka_q_len(rkcg->rkcg_wait_coord_q)); } /** * @brief Handle an assign op. * @locality rdkafka main thread * @locks none */ static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) { rd_kafka_error_t *error = NULL; if (rd_kafka_fatal_error_code(rkcg->rkcg_rk) || rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) { /* Treat all assignments as unassign when a fatal error is * raised or the cgrp is terminating. */ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGN", "Group \"%s\": Consumer %s: " "treating assign as unassign", rkcg->rkcg_group_id->str, rd_kafka_fatal_error_code(rkcg->rkcg_rk) ? "has raised a fatal error" : "is terminating"); if (rko->rko_u.assign.partitions) { rd_kafka_topic_partition_list_destroy( rko->rko_u.assign.partitions); rko->rko_u.assign.partitions = NULL; } rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN; } else if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE && !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN || rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN)) error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, "Changes to the current assignment " "must be made using " "incremental_assign() or " "incremental_unassign() " "when rebalance protocol type is " "COOPERATIVE"); else if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_EAGER && !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_ASSIGN)) error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE, "Changes to the current assignment " "must be made using " "assign() when rebalance " "protocol type is EAGER"); if (!error) { switch (rko->rko_u.assign.method) { case RD_KAFKA_ASSIGN_METHOD_ASSIGN: /* New atomic assignment (partitions != NULL), * or unassignment (partitions == NULL) */ if (rko->rko_u.assign.partitions) error = rd_kafka_cgrp_assign( rkcg, rko->rko_u.assign.partitions); else error = rd_kafka_cgrp_unassign(rkcg); break; case RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN: error = rd_kafka_cgrp_incremental_assign( rkcg, rko->rko_u.assign.partitions); break; case RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN: error = rd_kafka_cgrp_incremental_unassign( rkcg, rko->rko_u.assign.partitions); break; default: RD_NOTREACHED(); break; } /* If call succeeded serve the assignment */ if (!error) rd_kafka_assignment_serve(rkcg->rkcg_rk); } if (error) { /* Log error since caller might not check * *assign() return value. */ rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "ASSIGN", "Group \"%s\": application *assign() call " "failed: %s", rkcg->rkcg_group_id->str, rd_kafka_error_string(error)); } rd_kafka_op_error_reply(rko, error); } /** * @brief Handle cgrp queue op. 
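 *
 * Ops are typically enqueued by rd_kafka_cgrp_op() from any thread and
 * served here on the main thread. A minimal sketch (assuming a valid
 * rkcg handle):
 * @code
 *   // Fire-and-forget coordinator query, no reply queue:
 *   rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
 *                    RD_KAFKA_OP_COORD_QUERY, RD_KAFKA_RESP_ERR_NO_ERROR);
 * @endcode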
* @locality rdkafka main thread * @locks none */ static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko, rd_kafka_q_cb_type_t cb_type, void *opaque) { rd_kafka_cgrp_t *rkcg = opaque; rd_kafka_toppar_t *rktp; rd_kafka_resp_err_t err; const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF; rktp = rko->rko_rktp; if (rktp && !silent_op) rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "CGRPOP", "Group \"%.*s\" received op %s in state %s " "(join-state %s) for %.*s [%" PRId32 "]", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_op2str(rko->rko_type), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), rktp->rktp_partition); else if (!silent_op) rd_kafka_dbg( rkcg->rkcg_rk, CGRP, "CGRPOP", "Group \"%.*s\" received op %s in state %s " "(join-state %s)", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_op2str(rko->rko_type), rd_kafka_cgrp_state_names[rkcg->rkcg_state], rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); switch ((int)rko->rko_type) { case RD_KAFKA_OP_NAME: /* Return the currently assigned member id. */ if (rkcg->rkcg_member_id) rko->rko_u.name.str = RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id); rd_kafka_op_reply(rko, 0); rko = NULL; break; case RD_KAFKA_OP_CG_METADATA: /* Return the current consumer group metadata. */ rko->rko_u.cg_metadata = rkcg->rkcg_member_id ? rd_kafka_consumer_group_metadata_new_with_genid( rkcg->rkcg_rk->rk_conf.group_id_str, rkcg->rkcg_generation_id, rkcg->rkcg_member_id->str, rkcg->rkcg_rk->rk_conf.group_instance_id) : NULL; rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); rko = NULL; break; case RD_KAFKA_OP_OFFSET_FETCH: if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) { rd_kafka_op_handle_OffsetFetch( rkcg->rkcg_rk, NULL, RD_KAFKA_RESP_ERR__WAIT_COORD, NULL, NULL, rko); rko = NULL; /* rko freed by handler */ break; } rd_kafka_OffsetFetchRequest( rkcg->rkcg_coord, rk->rk_group_id->str, rko->rko_u.offset_fetch.partitions, rko->rko_u.offset_fetch.require_stable_offsets, 0, /* Timeout */ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_op_handle_OffsetFetch, rko); rko = NULL; /* rko now owned by request */ break; case RD_KAFKA_OP_PARTITION_JOIN: rd_kafka_cgrp_partition_add(rkcg, rktp); /* If terminating tell the partition to leave */ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_NO_REPLYQ); break; case RD_KAFKA_OP_PARTITION_LEAVE: rd_kafka_cgrp_partition_del(rkcg, rktp); break; case RD_KAFKA_OP_OFFSET_COMMIT: /* Trigger offsets commit. */ rd_kafka_cgrp_offsets_commit(rkcg, rko, /* only set offsets * if no partitions were * specified. */ rko->rko_u.offset_commit.partitions ? 0 : 1 /* set_offsets*/, rko->rko_u.offset_commit.reason); rko = NULL; /* rko now owned by request */ break; case RD_KAFKA_OP_COORD_QUERY: rd_kafka_cgrp_coord_query( rkcg, rko->rko_err ? 
rd_kafka_err2str(rko->rko_err) : "from op"); break; case RD_KAFKA_OP_SUBSCRIBE: rd_kafka_app_polled(rk); /* New atomic subscription (may be NULL) */ err = rd_kafka_cgrp_subscribe(rkcg, rko->rko_u.subscribe.topics); if (!err) /* now owned by rkcg */ rko->rko_u.subscribe.topics = NULL; rd_kafka_op_reply(rko, err); rko = NULL; break; case RD_KAFKA_OP_ASSIGN: rd_kafka_cgrp_handle_assign_op(rkcg, rko); rko = NULL; break; case RD_KAFKA_OP_GET_SUBSCRIPTION: if (rkcg->rkcg_next_subscription) rko->rko_u.subscribe.topics = rd_kafka_topic_partition_list_copy( rkcg->rkcg_next_subscription); else if (rkcg->rkcg_next_unsubscribe) rko->rko_u.subscribe.topics = NULL; else if (rkcg->rkcg_subscription) rko->rko_u.subscribe.topics = rd_kafka_topic_partition_list_copy( rkcg->rkcg_subscription); rd_kafka_op_reply(rko, 0); rko = NULL; break; case RD_KAFKA_OP_GET_ASSIGNMENT: /* This is the consumer assignment, not the group assignment. */ rko->rko_u.assign.partitions = rd_kafka_topic_partition_list_copy( rkcg->rkcg_rk->rk_consumer.assignment.all); rd_kafka_op_reply(rko, 0); rko = NULL; break; case RD_KAFKA_OP_GET_REBALANCE_PROTOCOL: rko->rko_u.rebalance_protocol.str = rd_kafka_rebalance_protocol2str( rd_kafka_cgrp_rebalance_protocol(rkcg)); rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR); rko = NULL; break; case RD_KAFKA_OP_TERMINATE: rd_kafka_cgrp_terminate0(rkcg, rko); rko = NULL; /* terminate0() takes ownership */ break; default: rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type"); break; } if (rko) rd_kafka_op_destroy(rko); return RD_KAFKA_OP_RES_HANDLED; } /** * @returns true if the session timeout has expired (due to no successful * Heartbeats in session.timeout.ms) and triggers a rebalance. */ static rd_bool_t rd_kafka_cgrp_session_timeout_check(rd_kafka_cgrp_t *rkcg, rd_ts_t now) { rd_ts_t delta; char buf[256]; if (unlikely(!rkcg->rkcg_ts_session_timeout)) return rd_true; /* Session has expired */ delta = now - rkcg->rkcg_ts_session_timeout; if (likely(delta < 0)) return rd_false; delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000; rd_snprintf(buf, sizeof(buf), "Consumer group session timed out (in join-state %s) after " "%" PRId64 " ms without a successful response from the " "group coordinator (broker %" PRId32 ", last error was %s)", rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state], delta / 1000, rkcg->rkcg_coord_id, rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err)); rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR; rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT", "%s: revoking assignment and rejoining group", buf); /* Prevent further rebalances */ rkcg->rkcg_ts_session_timeout = 0; /* Timing out invalidates the member id, reset it * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */ rd_kafka_cgrp_set_member_id(rkcg, ""); /* Revoke and rebalance */ rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/, rd_true /*initiating*/, buf); return rd_true; } /** * @brief Apply the next waiting subscribe/unsubscribe, if any. 
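 *
 * A subscribe or unsubscribe that arrived while a rebalance was in
 * progress is parked in rkcg_next_subscription/rkcg_next_unsubscribe by
 * rd_kafka_cgrp_subscribe() and replayed here once the join state is
 * back in INIT.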
*/ static void rd_kafka_cgrp_apply_next_subscribe(rd_kafka_cgrp_t *rkcg) { rd_assert(rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT); if (rkcg->rkcg_next_subscription) { rd_kafka_topic_partition_list_t *next_subscription = rkcg->rkcg_next_subscription; rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE", "Group \"%s\": invoking waiting postponed " "subscribe", rkcg->rkcg_group_id->str); rkcg->rkcg_next_subscription = NULL; rd_kafka_cgrp_subscribe(rkcg, next_subscription); } else if (rkcg->rkcg_next_unsubscribe) { rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE", "Group \"%s\": invoking waiting postponed " "unsubscribe", rkcg->rkcg_group_id->str); rkcg->rkcg_next_unsubscribe = rd_false; rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*Leave*/); } } /** * Client group's join state handling */ static void rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t *rkcg) { rd_ts_t now = rd_clock(); if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk))) return; switch (rkcg->rkcg_join_state) { case RD_KAFKA_CGRP_JOIN_STATE_INIT: if (unlikely(rd_kafka_cgrp_awaiting_response(rkcg))) break; /* If there is a next subscription, apply it. */ rd_kafka_cgrp_apply_next_subscribe(rkcg); /* If we have a subscription start the join process. */ if (!rkcg->rkcg_subscription) break; if (rd_interval_immediate(&rkcg->rkcg_join_intvl, 1000 * 1000, now) > 0) rd_kafka_cgrp_join(rkcg); break; case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE: /* FIXME: I think we might have to send heartbeats * in WAIT_INCR_UNASSIGN, yes-no? */ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE: break; case RD_KAFKA_CGRP_JOIN_STATE_STEADY: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL: case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL: if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION && rd_interval( &rkcg->rkcg_heartbeat_intvl, rkcg->rkcg_rk->rk_conf.group_heartbeat_intvl_ms * 1000, now) > 0) rd_kafka_cgrp_heartbeat(rkcg); break; } } /** * Client group handling. * Called from main thread to serve the operational aspects of a cgrp. */ void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg) { rd_kafka_broker_t *rkb = rkcg->rkcg_coord; int rkb_state = RD_KAFKA_BROKER_STATE_INIT; rd_ts_t now; if (rkb) { rd_kafka_broker_lock(rkb); rkb_state = rkb->rkb_state; rd_kafka_broker_unlock(rkb); /* Go back to querying state if we lost the current coordinator * connection. */ if (rkb_state < RD_KAFKA_BROKER_STATE_UP && rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) rd_kafka_cgrp_set_state( rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); } now = rd_clock(); /* Check for cgrp termination */ if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) { rd_kafka_cgrp_terminated(rkcg); return; /* cgrp terminated */ } /* Bail out if we're terminating. */ if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk))) return; /* Check session timeout regardless of current coordinator * connection state (rkcg_state) */ if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY) rd_kafka_cgrp_session_timeout_check(rkcg, now); retry: switch (rkcg->rkcg_state) { case RD_KAFKA_CGRP_STATE_TERM: break; case RD_KAFKA_CGRP_STATE_INIT: rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD); /* FALLTHRU */ case RD_KAFKA_CGRP_STATE_QUERY_COORD: /* Query for coordinator. 
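 * Queries in this state are rate-limited to one per 500ms, with
 * the first one issued immediately (rd_interval_immediate() below).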
*/ if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl, 500 * 1000, now) > 0) rd_kafka_cgrp_coord_query(rkcg, "intervaled in " "state query-coord"); break; case RD_KAFKA_CGRP_STATE_WAIT_COORD: /* Waiting for FindCoordinator response */ break; case RD_KAFKA_CGRP_STATE_WAIT_BROKER: /* See if the group should be reassigned to another broker. */ if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id)) goto retry; /* Coordinator changed, retry state-machine * to speed up next transition. */ /* Coordinator query */ if (rd_interval(&rkcg->rkcg_coord_query_intvl, 1000 * 1000, now) > 0) rd_kafka_cgrp_coord_query(rkcg, "intervaled in " "state wait-broker"); break; case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT: /* Waiting for broker transport to come up. * Also make sure broker supports groups. */ if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb || !rd_kafka_broker_supports( rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) { /* Coordinator query */ if (rd_interval(&rkcg->rkcg_coord_query_intvl, 1000 * 1000, now) > 0) rd_kafka_cgrp_coord_query( rkcg, "intervaled in state " "wait-broker-transport"); } else { rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP); /* Serve join state to trigger (re)join */ rd_kafka_cgrp_join_state_serve(rkcg); /* Serve any pending partitions in the assignment */ rd_kafka_assignment_serve(rkcg->rkcg_rk); } break; case RD_KAFKA_CGRP_STATE_UP: /* Move any ops awaiting the coordinator to the ops queue * for reprocessing. */ rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q); /* Relaxed coordinator queries. */ if (rd_interval(&rkcg->rkcg_coord_query_intvl, rkcg->rkcg_rk->rk_conf.coord_query_intvl_ms * 1000, now) > 0) rd_kafka_cgrp_coord_query(rkcg, "intervaled in state up"); rd_kafka_cgrp_join_state_serve(rkcg); break; } if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP && rd_interval(&rkcg->rkcg_timeout_scan_intvl, 1000 * 1000, now) > 0)) rd_kafka_cgrp_timeout_scan(rkcg, now); } /** * Send an op to a cgrp. * * Locality: any thread */ void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg, rd_kafka_toppar_t *rktp, rd_kafka_replyq_t replyq, rd_kafka_op_type_t type, rd_kafka_resp_err_t err) { rd_kafka_op_t *rko; rko = rd_kafka_op_new(type); rko->rko_err = err; rko->rko_replyq = replyq; if (rktp) rko->rko_rktp = rd_kafka_toppar_keep(rktp); rd_kafka_q_enq(rkcg->rkcg_ops, rko); } void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id) { if (rkcg->rkcg_member_id && member_id && !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id)) return; /* No change */ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID", "Group \"%.*s\": updating member id \"%s\" -> \"%s\"", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "(not-set)", member_id ? member_id : "(not-set)"); if (rkcg->rkcg_member_id) { rd_kafkap_str_destroy(rkcg->rkcg_member_id); rkcg->rkcg_member_id = NULL; } if (member_id) rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1); } /** * @brief Determine owned partitions that no longer exist (partitions in * deleted or re-created topics). 
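 *
 * @returns a newly allocated list of such partitions, or NULL if there
 *          is no current group assignment or every assigned partition's
 *          topic is still present in rkcg_subscribed_topics.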
*/ static rd_kafka_topic_partition_list_t * rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) { rd_kafka_topic_partition_list_t *result = NULL; const rd_kafka_topic_partition_t *curr; if (!rkcg->rkcg_group_assignment) return NULL; RD_KAFKA_TPLIST_FOREACH(curr, rkcg->rkcg_group_assignment) { if (rd_list_find(rkcg->rkcg_subscribed_topics, curr->topic, rd_kafka_topic_info_topic_cmp)) continue; if (!result) result = rd_kafka_topic_partition_list_new( rkcg->rkcg_group_assignment->cnt); rd_kafka_topic_partition_list_add_copy(result, curr); } return result; } /** * @brief Check if the latest metadata affects the current subscription: * - matched topic added * - matched topic removed * - matched topic's partition count change * * @locks none * @locality rdkafka main thread */ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg, rd_bool_t do_join) { rd_list_t *tinfos; rd_kafka_topic_partition_list_t *errored; rd_bool_t changed; rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread)); if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0) return; /* * Unmatched topics will be added to the errored list. */ errored = rd_kafka_topic_partition_list_new(0); /* * Create a list of the topics in metadata that matches our subscription */ tinfos = rd_list_new(rkcg->rkcg_subscription->cnt, (void *)rd_kafka_topic_info_destroy); if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); else rd_kafka_metadata_topic_filter( rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored); /* * Propagate consumer errors for any non-existent or errored topics. * The function takes ownership of errored. */ rd_kafka_propagate_consumer_topic_errors( rkcg, errored, "Subscribed topic not available"); /* * Update effective list of topics (takes ownership of \c tinfos) */ changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos); if (!do_join || (!changed && /* If we get the same effective list of topics as last time around, * but the join is waiting for this metadata query to complete, * then we should not return here but follow through with the * (re)join below. */ rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) return; /* List of subscribed topics changed, trigger rejoin. */ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER, "REJOIN", "Group \"%.*s\": " "subscription updated from metadata change: " "rejoining group in state %s", RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]); if (rd_kafka_cgrp_rebalance_protocol(rkcg) == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) { /* Partitions from deleted topics */ rd_kafka_topic_partition_list_t *owned_but_not_exist = rd_kafka_cgrp_owned_but_not_exist_partitions(rkcg); if (owned_but_not_exist) { rd_kafka_cgrp_assignment_set_lost( rkcg, "%d subscribed topic(s) no longer exist", owned_but_not_exist->cnt); rd_kafka_rebalance_op_incr( rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, owned_but_not_exist, rkcg->rkcg_group_leader.members != NULL /* Rejoin group following revoke's * unassign if we are leader */ , "topics not available"); rd_kafka_topic_partition_list_destroy( owned_but_not_exist); } else { /* Nothing to revoke, rejoin group if we are the * leader. * The KIP says to rejoin the group on metadata * change only if we're the leader. But what if a * non-leader is subscribed to a regex that the others * aren't? * Going against the KIP and rejoining here. 
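 * This knowingly trades an occasional unnecessary rebalance for
 * not missing regex-matched topics that only a non-leader
 * subscribes to.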
*/ rd_kafka_cgrp_rejoin( rkcg, "Metadata for subscribed topic(s) has " "changed"); } } else { /* EAGER */ rd_kafka_cgrp_revoke_rejoin(rkcg, "Metadata for subscribed topic(s) " "has changed"); } /* We shouldn't get stuck in this state. */ rd_dassert(rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA); } rd_kafka_consumer_group_metadata_t * rd_kafka_consumer_group_metadata_new(const char *group_id) { rd_kafka_consumer_group_metadata_t *cgmetadata; cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid( group_id, -1, "", NULL); return cgmetadata; } rd_kafka_consumer_group_metadata_t * rd_kafka_consumer_group_metadata_new_with_genid(const char *group_id, int32_t generation_id, const char *member_id, const char *group_instance_id) { rd_kafka_consumer_group_metadata_t *cgmetadata; cgmetadata = rd_calloc(1, sizeof(*cgmetadata)); cgmetadata->group_id = rd_strdup(group_id); cgmetadata->generation_id = generation_id; cgmetadata->member_id = rd_strdup(member_id); if (group_instance_id) cgmetadata->group_instance_id = rd_strdup(group_instance_id); return cgmetadata; } rd_kafka_consumer_group_metadata_t * rd_kafka_consumer_group_metadata(rd_kafka_t *rk) { rd_kafka_consumer_group_metadata_t *cgmetadata; rd_kafka_op_t *rko; rd_kafka_cgrp_t *rkcg; if (!(rkcg = rd_kafka_cgrp_get(rk))) return NULL; rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_CG_METADATA); if (!rko) return NULL; cgmetadata = rko->rko_u.cg_metadata; rko->rko_u.cg_metadata = NULL; rd_kafka_op_destroy(rko); return cgmetadata; } void rd_kafka_consumer_group_metadata_destroy( rd_kafka_consumer_group_metadata_t *cgmetadata) { rd_free(cgmetadata->group_id); rd_free(cgmetadata->member_id); if (cgmetadata->group_instance_id) rd_free(cgmetadata->group_instance_id); rd_free(cgmetadata); } rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup( const rd_kafka_consumer_group_metadata_t *cgmetadata) { rd_kafka_consumer_group_metadata_t *ret; ret = rd_calloc(1, sizeof(*cgmetadata)); ret->group_id = rd_strdup(cgmetadata->group_id); ret->generation_id = cgmetadata->generation_id; ret->member_id = rd_strdup(cgmetadata->member_id); if (cgmetadata->group_instance_id) ret->group_instance_id = rd_strdup(cgmetadata->group_instance_id); return ret; } /* * Consumer group metadata serialization format v2: * "CGMDv2:"<generation_id><group_id>"\0"<member_id>"\0" \ * <group_instance_id_is_null>[<group_instance_id>"\0"] * Where <group_id> is the group_id string. */ static const char rd_kafka_consumer_group_metadata_magic[7] = "CGMDv2:"; rd_kafka_error_t *rd_kafka_consumer_group_metadata_write( const rd_kafka_consumer_group_metadata_t *cgmd, void **bufferp, size_t *sizep) { char *buf; size_t size; size_t of = 0; size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic); size_t groupid_len = strlen(cgmd->group_id) + 1; size_t generationid_len = sizeof(cgmd->generation_id); size_t member_id_len = strlen(cgmd->member_id) + 1; int8_t group_instance_id_is_null = cgmd->group_instance_id ? 0 : 1; size_t group_instance_id_is_null_len = sizeof(group_instance_id_is_null); size_t group_instance_id_len = cgmd->group_instance_id ? 
strlen(cgmd->group_instance_id) + 1 : 0; size = magic_len + groupid_len + generationid_len + member_id_len + group_instance_id_is_null_len + group_instance_id_len; buf = rd_malloc(size); memcpy(buf, rd_kafka_consumer_group_metadata_magic, magic_len); of += magic_len; memcpy(buf + of, &cgmd->generation_id, generationid_len); of += generationid_len; memcpy(buf + of, cgmd->group_id, groupid_len); of += groupid_len; memcpy(buf + of, cgmd->member_id, member_id_len); of += member_id_len; memcpy(buf + of, &group_instance_id_is_null, group_instance_id_is_null_len); of += group_instance_id_is_null_len; if (!group_instance_id_is_null) memcpy(buf + of, cgmd->group_instance_id, group_instance_id_len); of += group_instance_id_len; rd_assert(of == size); *bufferp = buf; *sizep = size; return NULL; } /* * Check that a string is printable, returning NULL if not, or * a pointer to just after the string's NUL terminator if it is. */ static const char *str_is_printable(const char *s, const char *end) { const char *c; for (c = s; *c && c != end; c++) if (!isprint((int)*c)) return NULL; return c + 1; } rd_kafka_error_t *rd_kafka_consumer_group_metadata_read( rd_kafka_consumer_group_metadata_t **cgmdp, const void *buffer, size_t size) { const char *buf = (const char *)buffer; const char *end = buf + size; const char *next; size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic); int32_t generation_id; size_t generationid_len = sizeof(generation_id); const char *group_id; const char *member_id; int8_t group_instance_id_is_null; const char *group_instance_id = NULL; if (size < magic_len + generationid_len + 1 + 1 + 1) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer is too short"); if (memcmp(buffer, rd_kafka_consumer_group_metadata_magic, magic_len)) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer is not a serialized " "consumer group metadata object"); memcpy(&generation_id, buf + magic_len, generationid_len); group_id = buf + magic_len + generationid_len; next = str_is_printable(group_id, end); if (!next) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer group id is not safe"); member_id = next; next = str_is_printable(member_id, end); if (!next) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer member id is not " "safe"); group_instance_id_is_null = (int8_t) * (next++); if (!group_instance_id_is_null) { group_instance_id = next; next = str_is_printable(group_instance_id, end); if (!next) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer group " "instance id is not safe"); } if (next != end) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG, "Input buffer bad length"); *cgmdp = rd_kafka_consumer_group_metadata_new_with_genid( group_id, generation_id, member_id, group_instance_id); return NULL; } static int unittest_consumer_group_metadata_iteration(const char *group_id, int32_t generation_id, const char *member_id, const char *group_instance_id) { rd_kafka_consumer_group_metadata_t *cgmd; void *buffer, *buffer2; size_t size, size2; rd_kafka_error_t *error; cgmd = rd_kafka_consumer_group_metadata_new_with_genid( group_id, generation_id, member_id, group_instance_id); RD_UT_ASSERT(cgmd != NULL, "failed to create metadata"); error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer, &size); RD_UT_ASSERT(!error, "metadata_write failed: %s", rd_kafka_error_string(error)); rd_kafka_consumer_group_metadata_destroy(cgmd); cgmd = NULL; error = rd_kafka_consumer_group_metadata_read(&cgmd, buffer, 
size); RD_UT_ASSERT(!error, "metadata_read failed: %s", rd_kafka_error_string(error)); /* Serialize again and compare buffers */ error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer2, &size2); RD_UT_ASSERT(!error, "metadata_write failed: %s", rd_kafka_error_string(error)); RD_UT_ASSERT(size == size2 && !memcmp(buffer, buffer2, size), "metadata_read/write size or content mismatch: " "size %" PRIusz ", size2 %" PRIusz, size, size2); rd_kafka_consumer_group_metadata_destroy(cgmd); rd_free(buffer); rd_free(buffer2); return 0; } static int unittest_consumer_group_metadata(void) { const char *ids[] = { "mY. random id:.", "0", "2222222222222222222222221111111111111111111111111111112222", "", "NULL", NULL, }; int i, j, k, gen_id; int ret; const char *group_id; const char *member_id; const char *group_instance_id; for (i = 0; ids[i]; i++) { for (j = 0; ids[j]; j++) { for (k = 0; ids[k]; k++) { for (gen_id = -1; gen_id < 1; gen_id++) { group_id = ids[i]; member_id = ids[j]; group_instance_id = ids[k]; if (strcmp(group_instance_id, "NULL") == 0) group_instance_id = NULL; ret = unittest_consumer_group_metadata_iteration( group_id, gen_id, member_id, group_instance_id); if (ret) return ret; } } } } RD_UT_PASS(); } static int unittest_set_intersect(void) { size_t par_cnt = 10; map_toppar_member_info_t *dst; rd_kafka_topic_partition_t *toppar; PartitionMemberInfo_t *v; char *id = "id"; rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER; rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER; rd_kafka_group_member_t *gm1; rd_kafka_group_member_t *gm2; id1.len = 2; id1.str = id; id2.len = 2; id2.str = id; map_toppar_member_info_t a = RD_MAP_INITIALIZER( par_cnt, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); map_toppar_member_info_t b = RD_MAP_INITIALIZER( par_cnt, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); gm1 = rd_calloc(1, sizeof(*gm1)); gm1->rkgm_member_id = &id1; gm1->rkgm_group_instance_id = &id1; gm2 = rd_calloc(1, sizeof(*gm2)); gm2->rkgm_member_id = &id2; gm2->rkgm_group_instance_id = &id2; RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4), PartitionMemberInfo_new(gm1, rd_false)); RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 4), PartitionMemberInfo_new(gm1, rd_false)); RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 7), PartitionMemberInfo_new(gm1, rd_false)); RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 7), PartitionMemberInfo_new(gm1, rd_false)); RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4), PartitionMemberInfo_new(gm2, rd_false)); dst = rd_kafka_member_partitions_intersect(&a, &b); RD_UT_ASSERT(RD_MAP_CNT(&a) == 3, "expected a cnt to be 3 not %d", (int)RD_MAP_CNT(&a)); RD_UT_ASSERT(RD_MAP_CNT(&b) == 2, "expected b cnt to be 2 not %d", (int)RD_MAP_CNT(&b)); RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst)); toppar = rd_kafka_topic_partition_new("t1", 4); RD_UT_ASSERT((v = RD_MAP_GET(dst, toppar)), "unexpected element"); RD_UT_ASSERT(v->members_match, "expected members to match"); rd_kafka_topic_partition_destroy(toppar); RD_MAP_DESTROY(&a); RD_MAP_DESTROY(&b); RD_MAP_DESTROY(dst); rd_free(dst); rd_free(gm1); rd_free(gm2); RD_UT_PASS(); } static int unittest_set_subtract(void) { size_t par_cnt = 10; rd_kafka_topic_partition_t *toppar; map_toppar_member_info_t *dst; map_toppar_member_info_t a = RD_MAP_INITIALIZER( par_cnt, rd_kafka_topic_partition_cmp, 
rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); map_toppar_member_info_t b = RD_MAP_INITIALIZER( par_cnt, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4), PartitionMemberInfo_new(NULL, rd_false)); RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 7), PartitionMemberInfo_new(NULL, rd_false)); RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 4), PartitionMemberInfo_new(NULL, rd_false)); RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4), PartitionMemberInfo_new(NULL, rd_false)); RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 7), PartitionMemberInfo_new(NULL, rd_false)); dst = rd_kafka_member_partitions_subtract(&a, &b); RD_UT_ASSERT(RD_MAP_CNT(&a) == 2, "expected a cnt to be 2 not %d", (int)RD_MAP_CNT(&a)); RD_UT_ASSERT(RD_MAP_CNT(&b) == 3, "expected b cnt to be 3 not %d", (int)RD_MAP_CNT(&b)); RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d", (int)RD_MAP_CNT(dst)); toppar = rd_kafka_topic_partition_new("t2", 7); RD_UT_ASSERT(RD_MAP_GET(dst, toppar), "unexpected element"); rd_kafka_topic_partition_destroy(toppar); RD_MAP_DESTROY(&a); RD_MAP_DESTROY(&b); RD_MAP_DESTROY(dst); rd_free(dst); RD_UT_PASS(); } static int unittest_map_to_list(void) { rd_kafka_topic_partition_list_t *list; map_toppar_member_info_t map = RD_MAP_INITIALIZER( 10, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash, rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free); RD_MAP_SET(&map, rd_kafka_topic_partition_new("t1", 101), PartitionMemberInfo_new(NULL, rd_false)); list = rd_kafka_toppar_member_info_map_to_list(&map); RD_UT_ASSERT(list->cnt == 1, "expecting list size of 1 not %d.", list->cnt); RD_UT_ASSERT(list->elems[0].partition == 101, "expecting partition 101 not %d", list->elems[0].partition); RD_UT_ASSERT(!strcmp(list->elems[0].topic, "t1"), "expecting topic 't1', not %s", list->elems[0].topic); rd_kafka_topic_partition_list_destroy(list); RD_MAP_DESTROY(&map); RD_UT_PASS(); } static int unittest_list_to_map(void) { rd_kafka_topic_partition_t *toppar; map_toppar_member_info_t *map; rd_kafka_topic_partition_list_t *list = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(list, "topic1", 201); rd_kafka_topic_partition_list_add(list, "topic2", 202); map = rd_kafka_toppar_list_to_toppar_member_info_map(list); RD_UT_ASSERT(RD_MAP_CNT(map) == 2, "expected map cnt to be 2 not %d", (int)RD_MAP_CNT(map)); toppar = rd_kafka_topic_partition_new("topic1", 201); RD_UT_ASSERT(RD_MAP_GET(map, toppar), "expected topic1 [201] to exist in map"); rd_kafka_topic_partition_destroy(toppar); toppar = rd_kafka_topic_partition_new("topic2", 202); RD_UT_ASSERT(RD_MAP_GET(map, toppar), "expected topic2 [202] to exist in map"); rd_kafka_topic_partition_destroy(toppar); RD_MAP_DESTROY(map); rd_free(map); rd_kafka_topic_partition_list_destroy(list); RD_UT_PASS(); } /** * @brief Consumer group unit tests */ int unittest_cgrp(void) { int fails = 0; fails += unittest_consumer_group_metadata(); fails += unittest_set_intersect(); fails += unittest_set_subtract(); fails += unittest_map_to_list(); fails += unittest_list_to_map(); return fails; }
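

/**
 * Example round-trip through the consumer group metadata serialization
 * exercised by the unit tests above (an illustrative sketch; error
 * handling is elided and rk is assumed to be a consumer instance):
 * @code
 *   rd_kafka_consumer_group_metadata_t *cgmd, *cgmd2;
 *   void *buf;
 *   size_t size;
 *
 *   cgmd = rd_kafka_consumer_group_metadata(rk);
 *   rd_kafka_consumer_group_metadata_write(cgmd, &buf, &size);
 *   // ...transfer buf/size to another process (e.g. a transactional
 *   //    producer), then reconstruct:
 *   rd_kafka_consumer_group_metadata_read(&cgmd2, buf, size);
 *
 *   rd_free(buf);
 *   rd_kafka_consumer_group_metadata_destroy(cgmd);
 *   rd_kafka_consumer_group_metadata_destroy(cgmd2);
 * @endcode
 */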