summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c5969
1 files changed, 5969 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c
new file mode 100644
index 000000000..026e93321
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_cgrp.c
@@ -0,0 +1,5969 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_broker.h"
+#include "rdkafka_request.h"
+#include "rdkafka_topic.h"
+#include "rdkafka_partition.h"
+#include "rdkafka_assignor.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_metadata.h"
+#include "rdkafka_cgrp.h"
+#include "rdkafka_interceptor.h"
+#include "rdmap.h"
+
+#include "rdunittest.h"
+
+#include <ctype.h>
+#include <stdarg.h>
+
+static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts,
+ void *arg);
+static rd_kafka_error_t *
+rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment);
+static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg);
+static rd_kafka_error_t *
+rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *partitions);
+static rd_kafka_error_t *
+rd_kafka_cgrp_incremental_unassign(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *partitions);
+
+static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque);
+
+static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg,
+ const char *reason);
+
+static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg);
+
+static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg,
+ rd_bool_t assignment_lost,
+ rd_bool_t initiating,
+ const char *reason);
+static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg,
+ rd_bool_t assignment_lost,
+ rd_bool_t initiating,
+ const char *reason);
+
+static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg);
+
+static void
+rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts,
+ void *arg);
+static rd_kafka_resp_err_t
+rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *rktparlist);
+
+static void rd_kafka_cgrp_group_assignment_set(
+ rd_kafka_cgrp_t *rkcg,
+ const rd_kafka_topic_partition_list_t *partitions);
+static void rd_kafka_cgrp_group_assignment_modify(
+ rd_kafka_cgrp_t *rkcg,
+ rd_bool_t add,
+ const rd_kafka_topic_partition_list_t *partitions);
+
+static void
+rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment);
+
+
+/**
+ * @returns true if the current assignment is lost.
+ */
+rd_bool_t rd_kafka_cgrp_assignment_is_lost(rd_kafka_cgrp_t *rkcg) {
+ return rd_atomic32_get(&rkcg->rkcg_assignment_lost) != 0;
+}
+
+
+/**
+ * @brief Call when the current assignment has been lost, with a
+ * human-readable reason.
+ */
+static void rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg,
+ char *fmt,
+ ...) RD_FORMAT(printf, 2, 3);
+static void
+rd_kafka_cgrp_assignment_set_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) {
+ va_list ap;
+ char reason[256];
+
+ if (!rkcg->rkcg_group_assignment)
+ return;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(reason, sizeof(reason), fmt, ap);
+ va_end(ap);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST",
+ "Group \"%s\": "
+ "current assignment of %d partition(s) lost: %s",
+ rkcg->rkcg_group_id->str, rkcg->rkcg_group_assignment->cnt,
+ reason);
+
+ rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_true);
+}
+
+
+/**
+ * @brief Call when the current assignment is no longer considered lost, with a
+ * human-readable reason.
+ */
+static void
+rd_kafka_cgrp_assignment_clear_lost(rd_kafka_cgrp_t *rkcg, char *fmt, ...) {
+ va_list ap;
+ char reason[256];
+
+ if (!rd_atomic32_get(&rkcg->rkcg_assignment_lost))
+ return;
+
+ va_start(ap, fmt);
+ rd_vsnprintf(reason, sizeof(reason), fmt, ap);
+ va_end(ap);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "LOST",
+ "Group \"%s\": "
+ "current assignment no longer considered lost: %s",
+ rkcg->rkcg_group_id->str, reason);
+
+ rd_atomic32_set(&rkcg->rkcg_assignment_lost, rd_false);
+}
+
+
+/**
+ * @brief The rebalance protocol currently in use. This will be
+ * RD_KAFKA_REBALANCE_PROTOCOL_NONE if the consumer has not
+ * (yet) joined a group, else it will match the rebalance
+ * protocol of the configured assignor(s).
+ *
+ * @locality main thread
+ */
+rd_kafka_rebalance_protocol_t
+rd_kafka_cgrp_rebalance_protocol(rd_kafka_cgrp_t *rkcg) {
+ if (!rkcg->rkcg_assignor)
+ return RD_KAFKA_REBALANCE_PROTOCOL_NONE;
+ return rkcg->rkcg_assignor->rkas_protocol;
+}
+
+
+
+/**
+ * @returns true if the cgrp is awaiting a protocol response. This prohibits
+ * the join-state machine to proceed before the current state
+ * is done.
+ */
+static rd_bool_t rd_kafka_cgrp_awaiting_response(rd_kafka_cgrp_t *rkcg) {
+ return rkcg->rkcg_wait_resp != -1;
+}
+
+
+/**
+ * @brief Set flag indicating we are waiting for a coordinator response
+ *        for the given request.
+ *
+ * This is used for specific requests to postpone rejoining the group if
+ * there are outstanding JoinGroup or SyncGroup requests.
+ *
+ * @param ApiKey the protocol request type we are now waiting on.
+ *
+ * @locality main thread
+ */
+static void rd_kafka_cgrp_set_wait_resp(rd_kafka_cgrp_t *rkcg, int16_t ApiKey) {
+        /* Only one outstanding response may be awaited at a time
+         * (-1 means none). */
+        rd_assert(rkcg->rkcg_wait_resp == -1);
+        rkcg->rkcg_wait_resp = ApiKey;
+}
+
+/**
+ * @brief Clear the flag that says we're waiting for a coordinator response
+ *        for the given \p request.
+ *
+ * @param ApiKey the request type that has now completed; must match the
+ *               value previously set with rd_kafka_cgrp_set_wait_resp().
+ *
+ * @locality main thread
+ */
+static void rd_kafka_cgrp_clear_wait_resp(rd_kafka_cgrp_t *rkcg,
+                                          int16_t ApiKey) {
+        /* Must only be cleared for the request type that was set. */
+        rd_assert(rkcg->rkcg_wait_resp == ApiKey);
+        rkcg->rkcg_wait_resp = -1;
+}
+
+
+
+/**
+ * @struct Auxiliary glue type used for COOPERATIVE rebalance set operations.
+ */
+typedef struct PartitionMemberInfo_s {
+        /** Borrowed pointer to the group member; not freed by
+         *  PartitionMemberInfo_free(). */
+        const rd_kafka_group_member_t *member;
+        rd_bool_t members_match;
+} PartitionMemberInfo_t;
+
+static PartitionMemberInfo_t *
+PartitionMemberInfo_new(const rd_kafka_group_member_t *member,
+ rd_bool_t members_match) {
+ PartitionMemberInfo_t *pmi;
+
+ pmi = rd_calloc(1, sizeof(*pmi));
+ pmi->member = member;
+ pmi->members_match = members_match;
+
+ return pmi;
+}
+
+static void PartitionMemberInfo_free(void *p) {
+ PartitionMemberInfo_t *pmi = p;
+ rd_free(pmi);
+}
+
+typedef RD_MAP_TYPE(const rd_kafka_topic_partition_t *,
+ PartitionMemberInfo_t *) map_toppar_member_info_t;
+
+
/**
 * @returns true if consumer has joined the group and thus requires a leave.
 *
 * @remark The \p rkcg argument is fully parenthesized so the macro
 *         expands correctly for non-trivial argument expressions.
 */
#define RD_KAFKA_CGRP_HAS_JOINED(rkcg)                                         \
        ((rkcg)->rkcg_member_id != NULL &&                                     \
         RD_KAFKAP_STR_LEN((rkcg)->rkcg_member_id) > 0)
+
+
+/**
+ * @returns true if cgrp is waiting for a rebalance_cb to be handled by
+ * the application.
+ */
+#define RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) \
+ ((rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL)
+
+/**
+ * @returns true if a rebalance is in progress.
+ *
+ * 1. In WAIT_JOIN or WAIT_METADATA state with a member-id set,
+ * this happens on rejoin.
+ * 2. In WAIT_SYNC waiting for the group to rebalance on the broker.
+ * 3. In *_WAIT_UNASSIGN_TO_COMPLETE waiting for unassigned partitions to
+ *    stop fetching, etc.
+ * 4. In _WAIT_*ASSIGN_CALL waiting for the application to handle the
+ * assignment changes in its rebalance callback and then call *assign().
+ * 5. An incremental rebalancing is in progress.
+ * 6. A rebalance-induced rejoin is in progress.
+ */
+#define RD_KAFKA_CGRP_REBALANCING(rkcg) \
+ ((RD_KAFKA_CGRP_HAS_JOINED(rkcg) && \
+ ((rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)) || \
+ (rkcg)->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL || \
+ (rkcg)->rkcg_join_state == \
+ RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL || \
+ (rkcg)->rkcg_rebalance_incr_assignment != NULL || \
+ (rkcg)->rkcg_rebalance_rejoin)
+
+
+
+/* Human-readable cgrp state names, indexed directly by rkcg_state
+ * (see rd_kafka_cgrp_set_state()); order must match the state enum
+ * declared in rdkafka_cgrp.h. */
+const char *rd_kafka_cgrp_state_names[] = {
+    "init", "term", "query-coord",
+    "wait-coord", "wait-broker", "wait-broker-transport",
+    "up"};
+
+/* Human-readable join-state names, indexed directly by rkcg_join_state
+ * (see rd_kafka_cgrp_set_join_state()); order must match the join-state
+ * enum declared in rdkafka_cgrp.h. */
+const char *rd_kafka_cgrp_join_state_names[] = {
+    "init",
+    "wait-join",
+    "wait-metadata",
+    "wait-sync",
+    "wait-assign-call",
+    "wait-unassign-call",
+    "wait-unassign-to-complete",
+    "wait-incr-unassign-to-complete",
+    "steady",
+};
+
+
+/**
+ * @brief Change the cgrp state.
+ *
+ * @returns 1 if the state was changed, else 0.
+ */
+static int rd_kafka_cgrp_set_state(rd_kafka_cgrp_t *rkcg, int state) {
+ if ((int)rkcg->rkcg_state == state)
+ return 0;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPSTATE",
+ "Group \"%.*s\" changed state %s -> %s "
+ "(join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_state_names[state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ rkcg->rkcg_state = state;
+ rkcg->rkcg_ts_statechange = rd_clock();
+
+ rd_kafka_brokers_broadcast_state_change(rkcg->rkcg_rk);
+
+ return 1;
+}
+
+
+/**
+ * @brief Change the cgrp join state, logging the transition.
+ *        No-op if already in the requested join state.
+ */
+void rd_kafka_cgrp_set_join_state(rd_kafka_cgrp_t *rkcg, int join_state) {
+        if ((int)rkcg->rkcg_join_state == join_state)
+                return;
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPJOINSTATE",
+                     "Group \"%.*s\" changed join state %s -> %s "
+                     "(state %s)",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+                     rd_kafka_cgrp_join_state_names[join_state],
+                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+        rkcg->rkcg_join_state = join_state;
+}
+
+
+/**
+ * @brief Final destruction of the consumer group handle and all
+ *        resources it owns.
+ *
+ * The caller must have cleared the subscription and group-leader state,
+ * and emptied the topic/toppar lists, before calling this.
+ */
+void rd_kafka_cgrp_destroy_final(rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_subscription);
+        rd_kafka_assert(rkcg->rkcg_rk, !rkcg->rkcg_group_leader.members);
+        rd_kafka_cgrp_set_member_id(rkcg, NULL);
+        if (rkcg->rkcg_group_instance_id)
+                rd_kafkap_str_destroy(rkcg->rkcg_group_instance_id);
+
+        /* Destroy the cgrp-owned queues. */
+        rd_kafka_q_destroy_owner(rkcg->rkcg_q);
+        rd_kafka_q_destroy_owner(rkcg->rkcg_ops);
+        rd_kafka_q_destroy_owner(rkcg->rkcg_wait_coord_q);
+        rd_kafka_assert(rkcg->rkcg_rk, TAILQ_EMPTY(&rkcg->rkcg_topics));
+        rd_kafka_assert(rkcg->rkcg_rk, rd_list_empty(&rkcg->rkcg_toppars));
+        rd_list_destroy(&rkcg->rkcg_toppars);
+        rd_list_destroy(rkcg->rkcg_subscribed_topics);
+        rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
+        /* Let the configured assignor dispose of any per-group state. */
+        if (rkcg->rkcg_assignor && rkcg->rkcg_assignor->rkas_destroy_state_cb)
+                rkcg->rkcg_assignor->rkas_destroy_state_cb(
+                    rkcg->rkcg_assignor_state);
+        rd_free(rkcg);
+}
+
+
+
+/**
+ * @brief Update the absolute session timeout following a successfull
+ * response from the coordinator.
+ * This timeout is used to enforce the session timeout in the
+ * consumer itself.
+ *
+ * @param reset if true the timeout is updated even if the session has expired.
+ */
+static RD_INLINE void
+rd_kafka_cgrp_update_session_timeout(rd_kafka_cgrp_t *rkcg, rd_bool_t reset) {
+ if (reset || rkcg->rkcg_ts_session_timeout != 0)
+ rkcg->rkcg_ts_session_timeout =
+ rd_clock() +
+ (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
+}
+
+
+
+/**
+ * @brief Create and initialize a new consumer group handle.
+ *
+ * @param group_id group to join; stored by pointer.
+ * @param client_id this client's id; stored by pointer.
+ *        NOTE(review): both appear to be borrowed references that must
+ *        outlive the cgrp — verify against caller.
+ *
+ * @returns a newly allocated cgrp handle.
+ */
+rd_kafka_cgrp_t *rd_kafka_cgrp_new(rd_kafka_t *rk,
+                                   const rd_kafkap_str_t *group_id,
+                                   const rd_kafkap_str_t *client_id) {
+        rd_kafka_cgrp_t *rkcg;
+
+        rkcg = rd_calloc(1, sizeof(*rkcg));
+
+        rkcg->rkcg_rk        = rk;
+        rkcg->rkcg_group_id  = group_id;
+        rkcg->rkcg_client_id = client_id;
+        /* -1 sentinels: no coordinator, no generation, no response
+         * currently awaited. */
+        rkcg->rkcg_coord_id      = -1;
+        rkcg->rkcg_generation_id = -1;
+        rkcg->rkcg_wait_resp     = -1;
+
+        rkcg->rkcg_ops                      = rd_kafka_q_new(rk);
+        rkcg->rkcg_ops->rkq_serve           = rd_kafka_cgrp_op_serve;
+        rkcg->rkcg_ops->rkq_opaque          = rkcg;
+        rkcg->rkcg_wait_coord_q             = rd_kafka_q_new(rk);
+        rkcg->rkcg_wait_coord_q->rkq_serve  = rkcg->rkcg_ops->rkq_serve;
+        rkcg->rkcg_wait_coord_q->rkq_opaque = rkcg->rkcg_ops->rkq_opaque;
+        rkcg->rkcg_q                        = rd_kafka_q_new(rk);
+        rkcg->rkcg_group_instance_id =
+            rd_kafkap_str_new(rk->rk_conf.group_instance_id, -1);
+
+        TAILQ_INIT(&rkcg->rkcg_topics);
+        rd_list_init(&rkcg->rkcg_toppars, 32, NULL);
+        /* Empty member id until the broker assigns one on join. */
+        rd_kafka_cgrp_set_member_id(rkcg, "");
+        rkcg->rkcg_subscribed_topics =
+            rd_list_new(0, (void *)rd_kafka_topic_info_destroy);
+        rd_interval_init(&rkcg->rkcg_coord_query_intvl);
+        rd_interval_init(&rkcg->rkcg_heartbeat_intvl);
+        rd_interval_init(&rkcg->rkcg_join_intvl);
+        rd_interval_init(&rkcg->rkcg_timeout_scan_intvl);
+        rd_atomic32_init(&rkcg->rkcg_assignment_lost, rd_false);
+        rd_atomic32_init(&rkcg->rkcg_terminated, rd_false);
+
+        rkcg->rkcg_errored_topics = rd_kafka_topic_partition_list_new(0);
+
+        /* Create a logical group coordinator broker to provide
+         * a dedicated connection for group coordination.
+         * This is needed since JoinGroup may block for up to
+         * max.poll.interval.ms, effectively blocking and timing out
+         * any other protocol requests (such as Metadata).
+         * The address for this broker will be updated when
+         * the group coordinator is assigned. */
+        rkcg->rkcg_coord = rd_kafka_broker_add_logical(rk, "GroupCoordinator");
+
+        /* Start the auto-commit timer if enabled. */
+        if (rk->rk_conf.enable_auto_commit &&
+            rk->rk_conf.auto_commit_interval_ms > 0)
+                rd_kafka_timer_start(
+                    &rk->rk_timers, &rkcg->rkcg_offset_commit_tmr,
+                    rk->rk_conf.auto_commit_interval_ms * 1000ll,
+                    rd_kafka_cgrp_offset_commit_tmr_cb, rkcg);
+
+        return rkcg;
+}
+
+
+/**
+ * @brief Set the group coordinator broker.
+ *
+ * Takes a new reference on \p rkb (released by
+ * rd_kafka_cgrp_coord_clear_broker()) and points the logical
+ * "GroupCoordinator" broker at its address.
+ */
+static void rd_kafka_cgrp_coord_set_broker(rd_kafka_cgrp_t *rkcg,
+                                           rd_kafka_broker_t *rkb) {
+
+        /* Any previous coordinator must have been cleared first. */
+        rd_assert(rkcg->rkcg_curr_coord == NULL);
+
+        rd_assert(RD_KAFKA_CGRP_BROKER_IS_COORD(rkcg, rkb));
+
+        rkcg->rkcg_curr_coord = rkb;
+        rd_kafka_broker_keep(rkb);
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDSET",
+                     "Group \"%.*s\" coordinator set to broker %s",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_broker_name(rkb));
+
+        /* Reset query interval to trigger an immediate
+         * coord query if required */
+        if (!rd_interval_disabled(&rkcg->rkcg_coord_query_intvl))
+                rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
+
+        rd_kafka_cgrp_set_state(rkcg,
+                                RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+
+        rd_kafka_broker_persistent_connection_add(
+            rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord);
+
+        /* Set the logical coordinator's nodename to the
+         * proper broker's nodename, this will trigger a (re)connect
+         * to the new address. */
+        rd_kafka_broker_set_nodename(rkcg->rkcg_coord, rkb);
+}
+
+
+/**
+ * @brief Reset/clear the group coordinator broker.
+ *
+ * Releases the reference acquired in rd_kafka_cgrp_coord_set_broker()
+ * and disconnects the logical coordinator broker.
+ */
+static void rd_kafka_cgrp_coord_clear_broker(rd_kafka_cgrp_t *rkcg) {
+        rd_kafka_broker_t *rkb = rkcg->rkcg_curr_coord;
+
+        rd_assert(rkcg->rkcg_curr_coord);
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORDCLEAR",
+                     "Group \"%.*s\" broker %s is no longer coordinator",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_broker_name(rkb));
+
+        rd_assert(rkcg->rkcg_coord);
+
+        rd_kafka_broker_persistent_connection_del(
+            rkcg->rkcg_coord, &rkcg->rkcg_coord->rkb_persistconn.coord);
+
+        /* Clear the ephemeral broker's nodename.
+         * This will also trigger a disconnect. */
+        rd_kafka_broker_set_nodename(rkcg->rkcg_coord, NULL);
+
+        rkcg->rkcg_curr_coord = NULL;
+        rd_kafka_broker_destroy(rkb); /* from set_coord_broker() */
+}
+
+
+/**
+ * @brief Update/set the group coordinator.
+ *
+ * Will do nothing if there's been no change.
+ *
+ * @param coord_id broker id of the new coordinator, or -1 if unknown.
+ *
+ * @returns 1 if the coordinator, or state, was updated, else 0.
+ */
+static int rd_kafka_cgrp_coord_update(rd_kafka_cgrp_t *rkcg, int32_t coord_id) {
+
+        /* Don't do anything while terminating */
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
+                return 0;
+
+        /* Check if coordinator changed */
+        if (rkcg->rkcg_coord_id != coord_id) {
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPCOORD",
+                             "Group \"%.*s\" changing coordinator %" PRId32
+                             " -> %" PRId32,
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                             rkcg->rkcg_coord_id, coord_id);
+
+                /* Update coord id */
+                rkcg->rkcg_coord_id = coord_id;
+
+                /* Clear previous broker handle, if any */
+                if (rkcg->rkcg_curr_coord)
+                        rd_kafka_cgrp_coord_clear_broker(rkcg);
+        }
+
+
+        if (rkcg->rkcg_curr_coord) {
+                /* There is already a known coordinator and a
+                 * corresponding broker handle. */
+                if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP)
+                        return rd_kafka_cgrp_set_state(
+                            rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT);
+
+        } else if (rkcg->rkcg_coord_id != -1) {
+                rd_kafka_broker_t *rkb;
+
+                /* Try to find the coordinator broker handle */
+                rd_kafka_rdlock(rkcg->rkcg_rk);
+                rkb = rd_kafka_broker_find_by_nodeid(rkcg->rkcg_rk, coord_id);
+                rd_kafka_rdunlock(rkcg->rkcg_rk);
+
+                /* It is possible, due to stale metadata, that the
+                 * coordinator id points to a broker we still don't know
+                 * about. In this case the client will continue
+                 * querying metadata and querying for the coordinator
+                 * until a match is found. */
+
+                if (rkb) {
+                        /* Coordinator is known and broker handle exists */
+                        rd_kafka_cgrp_coord_set_broker(rkcg, rkb);
+                        rd_kafka_broker_destroy(rkb); /*from find_by_nodeid()*/
+
+                        return 1;
+                } else {
+                        /* Coordinator is known but no corresponding
+                         * broker handle. */
+                        return rd_kafka_cgrp_set_state(
+                            rkcg, RD_KAFKA_CGRP_STATE_WAIT_BROKER);
+                }
+
+        } else {
+                /* Coordinator still not known, re-query */
+                if (rkcg->rkcg_state >= RD_KAFKA_CGRP_STATE_WAIT_COORD)
+                        return rd_kafka_cgrp_set_state(
+                            rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+        }
+
+        return 0; /* no change */
+}
+
+
+
+/**
+ * @brief Handle FindCoordinator response.
+ *
+ * Parses the coordinator id/host/port from the response, updates the
+ * broker metadata and the cgrp coordinator accordingly, or maps errors
+ * to retry/refresh actions.
+ *
+ * @param opaque the cgrp handle.
+ */
+static void rd_kafka_cgrp_handle_FindCoordinator(rd_kafka_t *rk,
+                                                 rd_kafka_broker_t *rkb,
+                                                 rd_kafka_resp_err_t err,
+                                                 rd_kafka_buf_t *rkbuf,
+                                                 rd_kafka_buf_t *request,
+                                                 void *opaque) {
+        /* Used by the rd_kafka_buf_read_*() macros for logging
+         * decode failures. */
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode           = 0;
+        int32_t CoordId;
+        rd_kafkap_str_t CoordHost = RD_ZERO_INIT;
+        int32_t CoordPort;
+        rd_kafka_cgrp_t *rkcg                = opaque;
+        struct rd_kafka_metadata_broker mdb  = RD_ZERO_INIT;
+        char *errstr                         = NULL;
+        int actions;
+
+        /* NOTE: the rd_kafka_buf_read_*() macros jump to the err_parse
+         * label below on parse failure. */
+        if (likely(!(ErrorCode = err))) {
+                if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1)
+                        rd_kafka_buf_read_throttle_time(rkbuf);
+
+                rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+                if (rkbuf->rkbuf_reqhdr.ApiVersion >= 1) {
+                        rd_kafkap_str_t ErrorMsg;
+
+                        rd_kafka_buf_read_str(rkbuf, &ErrorMsg);
+
+                        if (!RD_KAFKAP_STR_IS_NULL(&ErrorMsg))
+                                RD_KAFKAP_STR_DUPA(&errstr, &ErrorMsg);
+                }
+
+                rd_kafka_buf_read_i32(rkbuf, &CoordId);
+                rd_kafka_buf_read_str(rkbuf, &CoordHost);
+                rd_kafka_buf_read_i32(rkbuf, &CoordPort);
+        }
+
+        if (ErrorCode)
+                goto err;
+
+
+        /* Add/update the coordinator broker in the metadata cache. */
+        mdb.id = CoordId;
+        RD_KAFKAP_STR_DUPA(&mdb.host, &CoordHost);
+        mdb.port = CoordPort;
+
+        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+                   "Group \"%.*s\" coordinator is %s:%i id %" PRId32,
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), mdb.host, mdb.port,
+                   mdb.id);
+        rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto, &mdb, NULL);
+
+        rd_kafka_cgrp_coord_update(rkcg, CoordId);
+        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+        return;
+
+err_parse: /* Parse error */
+        ErrorCode = rkbuf->rkbuf_err;
+        /* FALLTHRU */
+
+err:
+        if (!errstr)
+                errstr = (char *)rd_kafka_err2str(ErrorCode);
+
+        rd_rkb_dbg(rkb, CGRP, "CGRPCOORD",
+                   "Group \"%.*s\" FindCoordinator response error: %s: %s",
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                   rd_kafka_err2name(ErrorCode), errstr);
+
+        if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+                return;
+
+        /* Map the error to a set of handling actions. */
+        actions = rd_kafka_err_action(
+            rkb, ErrorCode, request,
+
+            RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH,
+            RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
+
+            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TRANSPORT,
+
+            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT,
+
+            RD_KAFKA_ERR_ACTION_RETRY, RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE,
+
+            RD_KAFKA_ERR_ACTION_END);
+
+
+
+        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+                /* Coordinator unknown: forget it and re-query. */
+                rd_kafka_cgrp_coord_update(rkcg, -1);
+        } else {
+                if (!(actions & RD_KAFKA_ERR_ACTION_RETRY) &&
+                    rkcg->rkcg_last_err != ErrorCode) {
+                        /* Propagate non-retriable errors to the application */
+                        rd_kafka_consumer_err(
+                            rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0,
+                            NULL, NULL, RD_KAFKA_OFFSET_INVALID,
+                            "FindCoordinator response error: %s", errstr);
+
+                        /* Suppress repeated errors */
+                        rkcg->rkcg_last_err = ErrorCode;
+                }
+
+                /* Retries are performed by the timer-intervalled
+                 * coord queries, continue querying */
+                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+        }
+
+        rd_kafka_cgrp_serve(rkcg); /* Serve updated state, if possible */
+}
+
+
+/**
+ * @brief Query for coordinator.
+ *        Ask any broker in state UP.
+ *
+ * @param reason human-readable reason for the query (for logging only).
+ *
+ * @locality main thread
+ */
+void rd_kafka_cgrp_coord_query(rd_kafka_cgrp_t *rkcg, const char *reason) {
+        rd_kafka_broker_t *rkb;
+        rd_kafka_resp_err_t err;
+
+        rkb = rd_kafka_broker_any_usable(
+            rkcg->rkcg_rk, RD_POLL_NOWAIT, RD_DO_LOCK,
+            RD_KAFKA_FEATURE_BROKER_GROUP_COORD, "coordinator query");
+
+        if (!rkb) {
+                /* Reset the interval because there were no brokers. When a
+                 * broker becomes available, we want to query it immediately. */
+                rd_interval_reset(&rkcg->rkcg_coord_query_intvl);
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPQUERY",
+                             "Group \"%.*s\": "
+                             "no broker available for coordinator query: %s",
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+                return;
+        }
+
+        rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
+                   "Group \"%.*s\": querying for coordinator: %s",
+                   RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+
+        /* Response is handled by rd_kafka_cgrp_handle_FindCoordinator()
+         * on the cgrp ops queue. */
+        err = rd_kafka_FindCoordinatorRequest(
+            rkb, RD_KAFKA_COORD_GROUP, rkcg->rkcg_group_id->str,
+            RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+            rd_kafka_cgrp_handle_FindCoordinator, rkcg);
+
+        if (err) {
+                rd_rkb_dbg(rkb, CGRP, "CGRPQUERY",
+                           "Group \"%.*s\": "
+                           "unable to send coordinator query: %s",
+                           RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                           rd_kafka_err2str(err));
+                rd_kafka_broker_destroy(rkb);
+                return;
+        }
+
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_QUERY_COORD)
+                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_WAIT_COORD);
+
+        rd_kafka_broker_destroy(rkb); /* from broker_any_usable() */
+
+        /* Back off the next intervalled query since we just sent one. */
+        rd_interval_reset_to_now(&rkcg->rkcg_coord_query_intvl, 0);
+}
+
+/**
+ * @brief Mark the current coordinator as dead.
+ *
+ * Forgets the current coordinator id and immediately re-queries for a
+ * new one.
+ *
+ * @param err error that triggered the coordinator death (for logging).
+ * @param reason human-readable reason (for logging).
+ *
+ * @locality main thread
+ */
+void rd_kafka_cgrp_coord_dead(rd_kafka_cgrp_t *rkcg,
+                              rd_kafka_resp_err_t err,
+                              const char *reason) {
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COORD",
+                     "Group \"%.*s\": "
+                     "marking the coordinator (%" PRId32 ") dead: %s: %s",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), rkcg->rkcg_coord_id,
+                     rd_kafka_err2str(err), reason);
+
+        rd_kafka_cgrp_coord_update(rkcg, -1);
+
+        /* Re-query for coordinator */
+        rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+        rd_kafka_cgrp_coord_query(rkcg, reason);
+}
+
+
+/**
+ * @returns a new reference to the current coordinator, if available, else NULL.
+ *
+ * @locality rdkafka main thread
+ * @locks_required none
+ * @locks_acquired none
+ */
+rd_kafka_broker_t *rd_kafka_cgrp_get_coord(rd_kafka_cgrp_t *rkcg) {
+ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP || !rkcg->rkcg_coord)
+ return NULL;
+
+ rd_kafka_broker_keep(rkcg->rkcg_coord);
+
+ return rkcg->rkcg_coord;
+}
+
+
+/**
+ * @brief cgrp handling of LeaveGroup responses.
+ *
+ * Clears the WAIT_LEAVE flag and attempts group termination regardless
+ * of response outcome (errors are only logged).
+ *
+ * @param opaque must be the cgrp handle.
+ * @locality rdkafka main thread (unless err==ERR__DESTROY)
+ */
+static void rd_kafka_cgrp_handle_LeaveGroup(rd_kafka_t *rk,
+                                            rd_kafka_broker_t *rkb,
+                                            rd_kafka_resp_err_t err,
+                                            rd_kafka_buf_t *rkbuf,
+                                            rd_kafka_buf_t *request,
+                                            void *opaque) {
+        rd_kafka_cgrp_t *rkcg = opaque;
+        /* Used by the rd_kafka_buf_read_*() macros (which jump to
+         * err_parse on decode failure). */
+        const int log_decode_errors = LOG_ERR;
+        int16_t ErrorCode           = 0;
+
+        if (err) {
+                ErrorCode = err;
+                goto err;
+        }
+
+        if (request->rkbuf_reqhdr.ApiVersion >= 1)
+                rd_kafka_buf_read_throttle_time(rkbuf);
+
+        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+
+err:
+        if (ErrorCode)
+                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
+                             "LeaveGroup response error in state %s: %s",
+                             rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+                             rd_kafka_err2str(ErrorCode));
+        else
+                rd_kafka_dbg(rkb->rkb_rk, CGRP, "LEAVEGROUP",
+                             "LeaveGroup response received in state %s",
+                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+
+        if (ErrorCode != RD_KAFKA_RESP_ERR__DESTROY) {
+                rd_assert(thrd_is_current(rk->rk_thread));
+                rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
+                rd_kafka_cgrp_try_terminate(rkcg);
+        }
+
+
+
+        return;
+
+err_parse:
+        ErrorCode = rkbuf->rkbuf_err;
+        goto err;
+}
+
+
+/**
+ * @brief Send a LeaveGroupRequest to the coordinator (or fake a
+ *        WAIT_COORD error response if the coordinator is not up),
+ *        resetting the member id in the process.
+ *
+ *        No-op if a LeaveGroupRequest is already in-transit.
+ */
+static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) {
+        char *member_id;
+
+        /* Stack-copy the member id before it is reset below, so it can
+         * still be sent in the LeaveGroupRequest. */
+        RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);
+
+        /* Leaving the group invalidates the member id, reset it
+         * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
+        rd_kafka_cgrp_set_member_id(rkcg, "");
+
+        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE) {
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
+                             "Group \"%.*s\": leave (in state %s): "
+                             "LeaveGroupRequest already in-transit",
+                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+                return;
+        }
+
+        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "LEAVE",
+                     "Group \"%.*s\": leave (in state %s)",
+                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+                     rd_kafka_cgrp_state_names[rkcg->rkcg_state]);
+
+        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WAIT_LEAVE;
+
+        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP) {
+                rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
+                           "Leaving group");
+                rd_kafka_LeaveGroupRequest(
+                    rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id,
+                    RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+                    rd_kafka_cgrp_handle_LeaveGroup, rkcg);
+        } else
+                /* Coordinator not up: short-circuit straight to the
+                 * response handler with a WAIT_COORD error. */
+                rd_kafka_cgrp_handle_LeaveGroup(rkcg->rkcg_rk, rkcg->rkcg_coord,
+                                                RD_KAFKA_RESP_ERR__WAIT_COORD,
+                                                NULL, NULL, rkcg);
+}
+
+
+/**
+ * @brief Leave group, if desired.
+ *
+ * Clears the LEAVE_ON_UNASSIGN_DONE flag in all cases, then suppresses
+ * the actual leave for no-consumer-close destroys and for terminating
+ * static members (KIP-345).
+ *
+ * @returns true if a LeaveGroup was issued, else false.
+ */
+static rd_bool_t rd_kafka_cgrp_leave_maybe(rd_kafka_cgrp_t *rkcg) {
+
+        /* We were not instructed to leave in the first place. */
+        if (!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))
+                return rd_false;
+
+        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;
+
+        /* Don't send Leave when terminating with NO_CONSUMER_CLOSE flag */
+        if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
+                return rd_false;
+
+        /* KIP-345: Static group members must not send a LeaveGroupRequest
+         * on termination. */
+        if (RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg) &&
+            rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
+                return rd_false;
+
+        rd_kafka_cgrp_leave(rkcg);
+
+        return rd_true;
+}
+
+
+/**
+ * @brief Enqueues a rebalance op, delegating responsibility of calling
+ *        incremental_assign / incremental_unassign to the application.
+ *        If there is no rebalance handler configured, or the action
+ *        should not be delegated to the application for some other
+ *        reason, incremental_assign / incremental_unassign will be called
+ *        automatically, immediately.
+ *
+ * @param err RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS for an incremental
+ *            assign, else an incremental unassign/revoke.
+ * @param rejoin whether or not to rejoin the group following completion
+ *               of the incremental assign / unassign.
+ *
+ * @remarks does not take ownership of \p partitions.
+ */
+void rd_kafka_rebalance_op_incr(rd_kafka_cgrp_t *rkcg,
+                                rd_kafka_resp_err_t err,
+                                rd_kafka_topic_partition_list_t *partitions,
+                                rd_bool_t rejoin,
+                                const char *reason) {
+        rd_kafka_error_t *error;
+
+        /* Flag to rejoin after completion of the incr_assign or incr_unassign,
+           if required. */
+        rkcg->rkcg_rebalance_rejoin = rejoin;
+
+        /* Update rebalance statistics (guarded by the rk lock). */
+        rd_kafka_wrlock(rkcg->rkcg_rk);
+        rkcg->rkcg_c.ts_rebalance = rd_clock();
+        rkcg->rkcg_c.rebalance_cnt++;
+        rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+        if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
+            rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
+                /* Total unconditional unassign in these cases */
+                rd_kafka_cgrp_unassign(rkcg);
+
+                /* Now serve the assignment to make updates */
+                rd_kafka_assignment_serve(rkcg->rkcg_rk);
+                goto done;
+        }
+
+        rd_kafka_cgrp_set_join_state(
+            rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
+                      ? RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL
+                      : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
+
+        /* Schedule application rebalance callback/event if enabled */
+        if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
+                rd_kafka_op_t *rko;
+
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+                             "Group \"%s\": delegating incremental %s of %d "
+                             "partition(s) to application on queue %s: %s",
+                             rkcg->rkcg_group_id->str,
+                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+                                 ? "revoke"
+                                 : "assign",
+                             partitions->cnt,
+                             rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
+
+                /* Pause currently assigned partitions while waiting for
+                 * rebalance callback to get called to make sure the
+                 * application will not receive any more messages that
+                 * might block it from serving the rebalance callback
+                 * and to not process messages for partitions it
+                 * might have lost in the rebalance. */
+                rd_kafka_assignment_pause(rkcg->rkcg_rk,
+                                          "incremental rebalance");
+
+                rko          = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
+                rko->rko_err = err;
+                rko->rko_u.rebalance.partitions =
+                    rd_kafka_topic_partition_list_copy(partitions);
+
+                if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
+                        goto done; /* Rebalance op successfully enqueued */
+
+                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
+                             "Group \"%s\": ops queue is disabled, not "
+                             "delegating partition %s to application",
+                             rkcg->rkcg_group_id->str,
+                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+                                 ? "unassign"
+                                 : "assign");
+                /* FALLTHRU */
+        }
+
+        /* No application rebalance callback/event handler, or it is not
+         * available, do the assign/unassign ourselves.
+         * We need to be careful here not to trigger assignment_serve()
+         * since it may call into the cgrp code again, in which case we
+         * can't really track what the outcome state will be. */
+
+        if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+                error = rd_kafka_cgrp_incremental_assign(rkcg, partitions);
+        else
+                error = rd_kafka_cgrp_incremental_unassign(rkcg, partitions);
+
+        if (error) {
+                rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
+                             "Group \"%s\": internal incremental %s "
+                             "of %d partition(s) failed: %s: "
+                             "unassigning all partitions and rejoining",
+                             rkcg->rkcg_group_id->str,
+                             err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+                                 ? "unassign"
+                                 : "assign",
+                             partitions->cnt, rd_kafka_error_string(error));
+                rd_kafka_error_destroy(error);
+
+                rd_kafka_cgrp_set_join_state(rkcg,
+                                             /* This is a clean state for
+                                              * assignment_done() to rejoin
+                                              * from. */
+                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);
+                rd_kafka_assignment_clear(rkcg->rkcg_rk);
+        }
+
+        /* Now serve the assignment to make updates */
+        rd_kafka_assignment_serve(rkcg->rkcg_rk);
+
+done:
+        /* Update the current group assignment based on the
+         * added/removed partitions. */
+        rd_kafka_cgrp_group_assignment_modify(
+            rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, partitions);
+}
+
+
+/**
+ * @brief Enqueues a rebalance op, delegating responsibility of calling
+ * assign / unassign to the application. If there is no rebalance
+ * handler configured, or the action should not be delegated to the
+ * application for some other reason, assign / unassign will be
+ * called automatically.
+ *
+ * @remarks \p partitions is copied.
+ */
+static void rd_kafka_rebalance_op(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *assignment,
+ const char *reason) {
+ rd_kafka_error_t *error;
+
+ rd_kafka_wrlock(rkcg->rkcg_rk);
+ rkcg->rkcg_c.ts_rebalance = rd_clock();
+ rkcg->rkcg_c.rebalance_cnt++;
+ rd_kafka_wrunlock(rkcg->rkcg_rk);
+
+ if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk) ||
+ rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
+ /* Unassign */
+ rd_kafka_cgrp_unassign(rkcg);
+
+ /* Now serve the assignment to make updates */
+ rd_kafka_assignment_serve(rkcg->rkcg_rk);
+ goto done;
+ }
+
+ rd_assert(assignment != NULL);
+
+ rd_kafka_cgrp_set_join_state(
+ rkcg, err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
+ ? RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL
+ : RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL);
+
+ /* Schedule application rebalance callback/event if enabled */
+ if (rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE) {
+ rd_kafka_op_t *rko;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGN",
+ "Group \"%s\": delegating %s of %d partition(s) "
+ "to application on queue %s: %s",
+ rkcg->rkcg_group_id->str,
+ err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+ ? "revoke"
+ : "assign",
+ assignment->cnt,
+ rd_kafka_q_dest_name(rkcg->rkcg_q), reason);
+
+ /* Pause currently assigned partitions while waiting for
+ * rebalance callback to get called to make sure the
+ * application will not receive any more messages that
+ * might block it from serving the rebalance callback
+ * and to not process messages for partitions it
+ * might have lost in the rebalance. */
+ rd_kafka_assignment_pause(rkcg->rkcg_rk, "rebalance");
+
+ rko = rd_kafka_op_new(RD_KAFKA_OP_REBALANCE);
+ rko->rko_err = err;
+ rko->rko_u.rebalance.partitions =
+ rd_kafka_topic_partition_list_copy(assignment);
+
+ if (rd_kafka_q_enq(rkcg->rkcg_q, rko))
+ goto done; /* Rebalance op successfully enqueued */
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
+ "Group \"%s\": ops queue is disabled, not "
+ "delegating partition %s to application",
+ rkcg->rkcg_group_id->str,
+ err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+ ? "unassign"
+ : "assign");
+
+ /* FALLTHRU */
+ }
+
+ /* No application rebalance callback/event handler, or it is not
+ * available, do the assign/unassign ourselves.
+ * We need to be careful here not to trigger assignment_serve()
+ * since it may call into the cgrp code again, in which case we
+ * can't really track what the outcome state will be. */
+
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ error = rd_kafka_cgrp_assign(rkcg, assignment);
+ else
+ error = rd_kafka_cgrp_unassign(rkcg);
+
+ if (error) {
+ rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
+ "Group \"%s\": internal %s "
+ "of %d partition(s) failed: %s: "
+ "unassigning all partitions and rejoining",
+ rkcg->rkcg_group_id->str,
+ err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
+ ? "unassign"
+ : "assign",
+ rkcg->rkcg_group_assignment->cnt,
+ rd_kafka_error_string(error));
+ rd_kafka_error_destroy(error);
+
+ rd_kafka_cgrp_set_join_state(rkcg,
+ /* This is a clean state for
+ * assignment_done() to rejoin
+ * from. */
+ RD_KAFKA_CGRP_JOIN_STATE_STEADY);
+ rd_kafka_assignment_clear(rkcg->rkcg_rk);
+ }
+
+ /* Now serve the assignment to make updates */
+ rd_kafka_assignment_serve(rkcg->rkcg_rk);
+
+done:
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS)
+ rd_kafka_cgrp_group_assignment_set(rkcg, assignment);
+ else
+ rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
+}
+
+
+/**
+ * @brief Rejoin the group.
+ *
+ * @remark This function must not have any side-effects but setting the
+ * join state.
+ */
+static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...)
+ RD_FORMAT(printf, 2, 3);
+
+static void rd_kafka_cgrp_rejoin(rd_kafka_cgrp_t *rkcg, const char *fmt, ...) {
+ char reason[512];
+ va_list ap;
+ char astr[128];
+
+ va_start(ap, fmt);
+ rd_vsnprintf(reason, sizeof(reason), fmt, ap);
+ va_end(ap);
+
+ if (rkcg->rkcg_group_assignment)
+ rd_snprintf(astr, sizeof(astr), " with %d owned partition(s)",
+ rkcg->rkcg_group_assignment->cnt);
+ else
+ rd_snprintf(astr, sizeof(astr), " without an assignment");
+
+ if (rkcg->rkcg_subscription || rkcg->rkcg_next_subscription) {
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REJOIN",
+ "Group \"%s\": %s group%s: %s", rkcg->rkcg_group_id->str,
+ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT
+ ? "Joining"
+ : "Rejoining",
+ astr, reason);
+ } else {
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "NOREJOIN",
+ "Group \"%s\": Not %s group%s: %s: "
+ "no subscribed topics",
+ rkcg->rkcg_group_id->str,
+ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT
+ ? "joining"
+ : "rejoining",
+ astr, reason);
+
+ rd_kafka_cgrp_leave_maybe(rkcg);
+ }
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_INIT);
+}
+
+
+/**
+ * @brief Collect all assigned or owned partitions from group members.
+ * The member field of each result element is set to the associated
+ * group member. The members_match field is set to rd_false.
+ *
+ * @param members Array of group members.
+ * @param member_cnt Number of elements in members.
+ * @param par_cnt The total number of partitions expected to be collected.
+ * @param collect_owned If rd_true, rkgm_owned partitions will be collected,
+ * else rkgm_assignment partitions will be collected.
+ */
+static map_toppar_member_info_t *
+rd_kafka_collect_partitions(const rd_kafka_group_member_t *members,
+ size_t member_cnt,
+ size_t par_cnt,
+ rd_bool_t collect_owned) {
+ size_t i;
+ map_toppar_member_info_t *collected = rd_calloc(1, sizeof(*collected));
+
+ RD_MAP_INIT(collected, par_cnt, rd_kafka_topic_partition_cmp,
+ rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free,
+ PartitionMemberInfo_free);
+
+ for (i = 0; i < member_cnt; i++) {
+ size_t j;
+ const rd_kafka_group_member_t *rkgm = &members[i];
+ const rd_kafka_topic_partition_list_t *toppars =
+ collect_owned ? rkgm->rkgm_owned : rkgm->rkgm_assignment;
+
+ for (j = 0; j < (size_t)toppars->cnt; j++) {
+ rd_kafka_topic_partition_t *rktpar =
+ rd_kafka_topic_partition_copy(&toppars->elems[j]);
+ PartitionMemberInfo_t *pmi =
+ PartitionMemberInfo_new(rkgm, rd_false);
+ RD_MAP_SET(collected, rktpar, pmi);
+ }
+ }
+
+ return collected;
+}
+
+
+/**
+ * @brief Set intersection. Returns a set of all elements of \p a that
+ * are also elements of \p b. Additionally, compares the members
+ * field of matching elements from \p a and \p b and if not NULL
+ * and equal, sets the members_match field in the result element
+ * to rd_true and the member field to equal that of the elements,
+ * else sets the members_match field to rd_false and member field
+ * to NULL.
+ */
+static map_toppar_member_info_t *
+rd_kafka_member_partitions_intersect(map_toppar_member_info_t *a,
+ map_toppar_member_info_t *b) {
+ const rd_kafka_topic_partition_t *key;
+ const PartitionMemberInfo_t *a_v;
+ map_toppar_member_info_t *intersection =
+ rd_calloc(1, sizeof(*intersection));
+
+ RD_MAP_INIT(
+ intersection, RD_MIN(a ? RD_MAP_CNT(a) : 1, b ? RD_MAP_CNT(b) : 1),
+ rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ if (!a || !b)
+ return intersection;
+
+ RD_MAP_FOREACH(key, a_v, a) {
+ rd_bool_t members_match;
+ const PartitionMemberInfo_t *b_v = RD_MAP_GET(b, key);
+
+ if (b_v == NULL)
+ continue;
+
+ members_match =
+ a_v->member && b_v->member &&
+ rd_kafka_group_member_cmp(a_v->member, b_v->member) == 0;
+
+ RD_MAP_SET(intersection, rd_kafka_topic_partition_copy(key),
+ PartitionMemberInfo_new(b_v->member, members_match));
+ }
+
+ return intersection;
+}
+
+
+/**
+ * @brief Set subtraction. Returns a set of all elements of \p a
+ * that are not elements of \p b. Sets the member field in
+ * elements in the returned set to equal that of the
+ * corresponding element in \p a
+ */
+static map_toppar_member_info_t *
+rd_kafka_member_partitions_subtract(map_toppar_member_info_t *a,
+ map_toppar_member_info_t *b) {
+ const rd_kafka_topic_partition_t *key;
+ const PartitionMemberInfo_t *a_v;
+ map_toppar_member_info_t *difference =
+ rd_calloc(1, sizeof(*difference));
+
+ RD_MAP_INIT(difference, a ? RD_MAP_CNT(a) : 1,
+ rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free,
+ PartitionMemberInfo_free);
+
+ if (!a)
+ return difference;
+
+ RD_MAP_FOREACH(key, a_v, a) {
+ const PartitionMemberInfo_t *b_v =
+ b ? RD_MAP_GET(b, key) : NULL;
+
+ if (!b_v)
+ RD_MAP_SET(
+ difference, rd_kafka_topic_partition_copy(key),
+ PartitionMemberInfo_new(a_v->member, rd_false));
+ }
+
+ return difference;
+}
+
+
+/**
+ * @brief Adjust the partition assignment as provided by the assignor
+ * according to the COOPERATIVE protocol.
+ */
+static void rd_kafka_cooperative_protocol_adjust_assignment(
+ rd_kafka_cgrp_t *rkcg,
+ rd_kafka_group_member_t *members,
+ int member_cnt) {
+
+ /* https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafk\
+ a+Consumer+Incremental+Rebalance+Protocol */
+
+ int i;
+ int expected_max_assignment_size;
+ int total_assigned = 0;
+ int not_revoking = 0;
+ size_t par_cnt = 0;
+ const rd_kafka_topic_partition_t *toppar;
+ const PartitionMemberInfo_t *pmi;
+ map_toppar_member_info_t *assigned;
+ map_toppar_member_info_t *owned;
+ map_toppar_member_info_t *maybe_revoking;
+ map_toppar_member_info_t *ready_to_migrate;
+ map_toppar_member_info_t *unknown_but_owned;
+
+ for (i = 0; i < member_cnt; i++)
+ par_cnt += members[i].rkgm_owned->cnt;
+
+ assigned = rd_kafka_collect_partitions(members, member_cnt, par_cnt,
+ rd_false /*assigned*/);
+
+ owned = rd_kafka_collect_partitions(members, member_cnt, par_cnt,
+ rd_true /*owned*/);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
+ "Group \"%s\": Partitions owned by members: %d, "
+ "partitions assigned by assignor: %d",
+ rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(owned),
+ (int)RD_MAP_CNT(assigned));
+
+ /* Still owned by some members */
+ maybe_revoking = rd_kafka_member_partitions_intersect(assigned, owned);
+
+ /* Not previously owned by anyone */
+ ready_to_migrate = rd_kafka_member_partitions_subtract(assigned, owned);
+
+ /* Don't exist in assigned partitions */
+ unknown_but_owned =
+ rd_kafka_member_partitions_subtract(owned, assigned);
+
+ /* Rough guess at a size that is a bit higher than
+ * the maximum number of partitions likely to be
+ * assigned to any partition. */
+ expected_max_assignment_size =
+ (int)(RD_MAP_CNT(assigned) / member_cnt) + 4;
+
+ for (i = 0; i < member_cnt; i++) {
+ rd_kafka_group_member_t *rkgm = &members[i];
+ rd_kafka_topic_partition_list_destroy(rkgm->rkgm_assignment);
+
+ rkgm->rkgm_assignment = rd_kafka_topic_partition_list_new(
+ expected_max_assignment_size);
+ }
+
+ /* For maybe-revoking-partitions, check if the owner has
+ * changed. If yes, exclude them from the assigned-partitions
+ * list to the new owner. The old owner will realize it does
+ * not own it any more, revoke it and then trigger another
+ * rebalance for these partitions to finally be reassigned.
+ */
+ RD_MAP_FOREACH(toppar, pmi, maybe_revoking) {
+ if (!pmi->members_match)
+ /* Owner has changed. */
+ continue;
+
+ /* Owner hasn't changed. */
+ rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment,
+ toppar->topic,
+ toppar->partition);
+
+ total_assigned++;
+ not_revoking++;
+ }
+
+ /* For ready-to-migrate-partitions, it is safe to move them
+ * to the new member immediately since we know no one owns
+ * it before, and hence we can encode the owner from the
+ * newly-assigned-partitions directly.
+ */
+ RD_MAP_FOREACH(toppar, pmi, ready_to_migrate) {
+ rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment,
+ toppar->topic,
+ toppar->partition);
+ total_assigned++;
+ }
+
+ /* For unknown-but-owned-partitions, it is also safe to just
+ * give them back to whoever claimed to be their owners by
+ * encoding them directly as well. If this is due to a topic
+ * metadata update, then a later rebalance will be triggered
+ * anyway.
+ */
+ RD_MAP_FOREACH(toppar, pmi, unknown_but_owned) {
+ rd_kafka_topic_partition_list_add(pmi->member->rkgm_assignment,
+ toppar->topic,
+ toppar->partition);
+ total_assigned++;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
+ "Group \"%s\": COOPERATIVE protocol collection sizes: "
+ "maybe revoking: %d, ready to migrate: %d, unknown but "
+ "owned: %d",
+ rkcg->rkcg_group_id->str, (int)RD_MAP_CNT(maybe_revoking),
+ (int)RD_MAP_CNT(ready_to_migrate),
+ (int)RD_MAP_CNT(unknown_but_owned));
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRP",
+ "Group \"%s\": %d partitions assigned to consumers",
+ rkcg->rkcg_group_id->str, total_assigned);
+
+ RD_MAP_DESTROY_AND_FREE(maybe_revoking);
+ RD_MAP_DESTROY_AND_FREE(ready_to_migrate);
+ RD_MAP_DESTROY_AND_FREE(unknown_but_owned);
+ RD_MAP_DESTROY_AND_FREE(assigned);
+ RD_MAP_DESTROY_AND_FREE(owned);
+}
+
+
+/**
+ * @brief Parses and handles the MemberState from a SyncGroupResponse.
+ */
+static void rd_kafka_cgrp_handle_SyncGroup_memberstate(
+ rd_kafka_cgrp_t *rkcg,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ const rd_kafkap_bytes_t *member_state) {
+ rd_kafka_buf_t *rkbuf = NULL;
+ rd_kafka_topic_partition_list_t *assignment = NULL;
+ const int log_decode_errors = LOG_ERR;
+ int16_t Version;
+ rd_kafkap_bytes_t UserData;
+
+ /* Dont handle new assignments when terminating */
+ if (!err && rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
+ err = RD_KAFKA_RESP_ERR__DESTROY;
+
+ if (err)
+ goto err;
+
+ if (RD_KAFKAP_BYTES_LEN(member_state) == 0) {
+ /* Empty assignment. */
+ assignment = rd_kafka_topic_partition_list_new(0);
+ memset(&UserData, 0, sizeof(UserData));
+ goto done;
+ }
+
+ /* Parse assignment from MemberState */
+ rkbuf = rd_kafka_buf_new_shadow(
+ member_state->data, RD_KAFKAP_BYTES_LEN(member_state), NULL);
+ /* Protocol parser needs a broker handle to log errors on. */
+ if (rkb) {
+ rkbuf->rkbuf_rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+ } else
+ rkbuf->rkbuf_rkb = rd_kafka_broker_internal(rkcg->rkcg_rk);
+
+ rd_kafka_buf_read_i16(rkbuf, &Version);
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ if (!(assignment =
+ rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
+ goto err_parse;
+ rd_kafka_buf_read_bytes(rkbuf, &UserData);
+
+done:
+ rd_kafka_cgrp_update_session_timeout(rkcg, rd_true /*reset timeout*/);
+
+ rd_assert(rkcg->rkcg_assignor);
+ if (rkcg->rkcg_assignor->rkas_on_assignment_cb) {
+ char *member_id;
+ RD_KAFKAP_STR_DUPA(&member_id, rkcg->rkcg_member_id);
+ rd_kafka_consumer_group_metadata_t *cgmd =
+ rd_kafka_consumer_group_metadata_new_with_genid(
+ rkcg->rkcg_rk->rk_conf.group_id_str,
+ rkcg->rkcg_generation_id, member_id,
+ rkcg->rkcg_rk->rk_conf.group_instance_id);
+ rkcg->rkcg_assignor->rkas_on_assignment_cb(
+ rkcg->rkcg_assignor, &(rkcg->rkcg_assignor_state),
+ assignment, &UserData, cgmd);
+ rd_kafka_consumer_group_metadata_destroy(cgmd);
+ }
+
+ // FIXME: Remove when we're done debugging.
+ rd_kafka_topic_partition_list_log(rkcg->rkcg_rk, "ASSIGNMENT",
+ RD_KAFKA_DBG_CGRP, assignment);
+
+ /* Set the new assignment */
+ rd_kafka_cgrp_handle_assignment(rkcg, assignment);
+
+ rd_kafka_topic_partition_list_destroy(assignment);
+
+ if (rkbuf)
+ rd_kafka_buf_destroy(rkbuf);
+
+ return;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+
+err:
+ if (rkbuf)
+ rd_kafka_buf_destroy(rkbuf);
+
+ if (assignment)
+ rd_kafka_topic_partition_list_destroy(assignment);
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPSYNC",
+ "Group \"%s\": synchronization failed: %s: rejoining",
+ rkcg->rkcg_group_id->str, rd_kafka_err2str(err));
+
+ if (err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID)
+ rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
+ "Fatal consumer error: %s",
+ rd_kafka_err2str(err));
+ else if (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
+ rkcg->rkcg_generation_id = -1;
+ else if (err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+ rd_kafka_cgrp_set_member_id(rkcg, "");
+
+ if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
+ (err == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
+ err == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID))
+ rd_kafka_cgrp_revoke_all_rejoin(
+ rkcg, rd_true /*assignment is lost*/,
+ rd_true /*this consumer is initiating*/, "SyncGroup error");
+ else
+ rd_kafka_cgrp_rejoin(rkcg, "SyncGroup error: %s",
+ rd_kafka_err2str(err));
+}
+
+
+
+/**
+ * @brief Cgrp handler for SyncGroup responses. opaque must be the cgrp handle.
+ */
+static void rd_kafka_cgrp_handle_SyncGroup(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ rd_kafkap_bytes_t MemberState = RD_ZERO_INIT;
+ int actions;
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
+ rd_kafka_dbg(
+ rkb->rkb_rk, CGRP, "SYNCGROUP",
+ "SyncGroup response: discarding outdated request "
+ "(now in join-state %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup);
+ return;
+ }
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 1)
+ rd_kafka_buf_read_throttle_time(rkbuf);
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ rd_kafka_buf_read_bytes(rkbuf, &MemberState);
+
+err:
+ actions = rd_kafka_err_action(rkb, ErrorCode, request,
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ /* FALLTHRU */
+ }
+
+ if (actions & RD_KAFKA_ERR_ACTION_RETRY) {
+ if (rd_kafka_buf_retry(rkb, request))
+ return;
+ /* FALLTHRU */
+ }
+
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "SYNCGROUP",
+ "SyncGroup response: %s (%d bytes of MemberState data)",
+ rd_kafka_err2str(ErrorCode),
+ RD_KAFKAP_BYTES_LEN(&MemberState));
+
+ rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_SyncGroup);
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+ return; /* Termination */
+
+ rd_kafka_cgrp_handle_SyncGroup_memberstate(rkcg, rkb, ErrorCode,
+ &MemberState);
+
+ return;
+
+err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+/**
+ * @brief Run group assignment.
+ */
+static void rd_kafka_cgrp_assignor_run(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_assignor_t *rkas,
+ rd_kafka_resp_err_t err,
+ rd_kafka_metadata_t *metadata,
+ rd_kafka_group_member_t *members,
+ int member_cnt) {
+ char errstr[512];
+
+ if (err) {
+ rd_snprintf(errstr, sizeof(errstr),
+ "Failed to get cluster metadata: %s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ *errstr = '\0';
+
+ /* Run assignor */
+ err = rd_kafka_assignor_run(rkcg, rkas, metadata, members, member_cnt,
+ errstr, sizeof(errstr));
+
+ if (err) {
+ if (!*errstr)
+ rd_snprintf(errstr, sizeof(errstr), "%s",
+ rd_kafka_err2str(err));
+ goto err;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGNOR",
+ "Group \"%s\": \"%s\" assignor run for %d member(s)",
+ rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str,
+ member_cnt);
+
+ if (rkas->rkas_protocol == RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE)
+ rd_kafka_cooperative_protocol_adjust_assignment(rkcg, members,
+ member_cnt);
+
+ rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+ rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup);
+
+ /* Respond to broker with assignment set or error */
+ rd_kafka_SyncGroupRequest(
+ rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, members,
+ err ? 0 : member_cnt, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_SyncGroup, rkcg);
+ return;
+
+err:
+ rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "ASSIGNOR",
+ "Group \"%s\": failed to run assignor \"%s\" for "
+ "%d member(s): %s",
+ rkcg->rkcg_group_id->str, rkas->rkas_protocol_name->str,
+ member_cnt, errstr);
+
+ rd_kafka_cgrp_rejoin(rkcg, "%s assignor failed: %s",
+ rkas->rkas_protocol_name->str, errstr);
+}
+
+
+
+/**
+ * @brief Op callback from handle_JoinGroup
+ */
+static rd_kafka_op_res_t
+rd_kafka_cgrp_assignor_handle_Metadata_op(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+ if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+ return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA)
+ return RD_KAFKA_OP_RES_HANDLED; /* From outdated state */
+
+ if (!rkcg->rkcg_group_leader.members) {
+ rd_kafka_dbg(rk, CGRP, "GRPLEADER",
+ "Group \"%.*s\": no longer leader: "
+ "not running assignor",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+ return RD_KAFKA_OP_RES_HANDLED;
+ }
+
+ rd_kafka_cgrp_assignor_run(rkcg, rkcg->rkcg_assignor, rko->rko_err,
+ rko->rko_u.metadata.md,
+ rkcg->rkcg_group_leader.members,
+ rkcg->rkcg_group_leader.member_cnt);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * Parse single JoinGroup.Members.MemberMetadata for "consumer" ProtocolType
+ *
+ * Protocol definition:
+ * https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
+ *
+ * Returns 0 on success or -1 on error.
+ */
+static int rd_kafka_group_MemberMetadata_consumer_read(
+ rd_kafka_broker_t *rkb,
+ rd_kafka_group_member_t *rkgm,
+ const rd_kafkap_bytes_t *MemberMetadata) {
+
+ rd_kafka_buf_t *rkbuf;
+ int16_t Version;
+ int32_t subscription_cnt;
+ rd_kafkap_bytes_t UserData;
+ const int log_decode_errors = LOG_ERR;
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__BAD_MSG;
+
+ /* Create a shadow-buffer pointing to the metadata to ease parsing. */
+ rkbuf = rd_kafka_buf_new_shadow(
+ MemberMetadata->data, RD_KAFKAP_BYTES_LEN(MemberMetadata), NULL);
+
+ /* Protocol parser needs a broker handle to log errors on. */
+ rkbuf->rkbuf_rkb = rkb;
+ rd_kafka_broker_keep(rkb);
+
+ rd_kafka_buf_read_i16(rkbuf, &Version);
+ rd_kafka_buf_read_i32(rkbuf, &subscription_cnt);
+
+ if (subscription_cnt > 10000 || subscription_cnt <= 0)
+ goto err;
+
+ rkgm->rkgm_subscription =
+ rd_kafka_topic_partition_list_new(subscription_cnt);
+
+ while (subscription_cnt-- > 0) {
+ rd_kafkap_str_t Topic;
+ char *topic_name;
+ rd_kafka_buf_read_str(rkbuf, &Topic);
+ RD_KAFKAP_STR_DUPA(&topic_name, &Topic);
+ rd_kafka_topic_partition_list_add(
+ rkgm->rkgm_subscription, topic_name, RD_KAFKA_PARTITION_UA);
+ }
+
+ rd_kafka_buf_read_bytes(rkbuf, &UserData);
+ rkgm->rkgm_userdata = rd_kafkap_bytes_copy(&UserData);
+
+ const rd_kafka_topic_partition_field_t fields[] = {
+ RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION,
+ RD_KAFKA_TOPIC_PARTITION_FIELD_END};
+ if (Version >= 1 &&
+ !(rkgm->rkgm_owned =
+ rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields)))
+ goto err;
+
+ rd_kafka_buf_destroy(rkbuf);
+
+ return 0;
+
+err_parse:
+ err = rkbuf->rkbuf_err;
+
+err:
+ rd_rkb_dbg(rkb, CGRP, "MEMBERMETA",
+ "Failed to parse MemberMetadata for \"%.*s\": %s",
+ RD_KAFKAP_STR_PR(rkgm->rkgm_member_id),
+ rd_kafka_err2str(err));
+ if (rkgm->rkgm_subscription) {
+ rd_kafka_topic_partition_list_destroy(rkgm->rkgm_subscription);
+ rkgm->rkgm_subscription = NULL;
+ }
+
+ rd_kafka_buf_destroy(rkbuf);
+ return -1;
+}
+
+
+/**
+ * @brief cgrp handler for JoinGroup responses
+ * opaque must be the cgrp handle.
+ *
+ * @locality rdkafka main thread (unless ERR__DESTROY: arbitrary thread)
+ */
+static void rd_kafka_cgrp_handle_JoinGroup(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *rkbuf,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ const int log_decode_errors = LOG_ERR;
+ int16_t ErrorCode = 0;
+ int32_t GenerationId;
+ rd_kafkap_str_t Protocol, LeaderId;
+ rd_kafkap_str_t MyMemberId = RD_KAFKAP_STR_INITIALIZER;
+ int32_t member_cnt;
+ int actions;
+ int i_am_leader = 0;
+ rd_kafka_assignor_t *rkas = NULL;
+
+ rd_kafka_cgrp_clear_wait_resp(rkcg, RD_KAFKAP_JoinGroup);
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY ||
+ rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
+ return; /* Terminating */
+
+ if (rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN) {
+ rd_kafka_dbg(
+ rkb->rkb_rk, CGRP, "JOINGROUP",
+ "JoinGroup response: discarding outdated request "
+ "(now in join-state %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+ return;
+ }
+
+ if (err) {
+ ErrorCode = err;
+ goto err;
+ }
+
+ if (request->rkbuf_reqhdr.ApiVersion >= 2)
+ rd_kafka_buf_read_throttle_time(rkbuf);
+
+ rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
+ rd_kafka_buf_read_i32(rkbuf, &GenerationId);
+ rd_kafka_buf_read_str(rkbuf, &Protocol);
+ rd_kafka_buf_read_str(rkbuf, &LeaderId);
+ rd_kafka_buf_read_str(rkbuf, &MyMemberId);
+ rd_kafka_buf_read_i32(rkbuf, &member_cnt);
+
+ if (!ErrorCode && RD_KAFKAP_STR_IS_NULL(&Protocol)) {
+ /* Protocol not set, we will not be able to find
+ * a matching assignor so error out early. */
+ ErrorCode = RD_KAFKA_RESP_ERR__BAD_MSG;
+ } else if (!ErrorCode) {
+ char *protocol_name;
+ RD_KAFKAP_STR_DUPA(&protocol_name, &Protocol);
+ if (!(rkas = rd_kafka_assignor_find(rkcg->rkcg_rk,
+ protocol_name)) ||
+ !rkas->rkas_enabled) {
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "Unsupported assignment strategy \"%s\"",
+ protocol_name);
+ if (rkcg->rkcg_assignor) {
+ if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
+ rkcg->rkcg_assignor
+ ->rkas_destroy_state_cb(
+ rkcg->rkcg_assignor_state);
+ rkcg->rkcg_assignor_state = NULL;
+ rkcg->rkcg_assignor = NULL;
+ }
+ ErrorCode = RD_KAFKA_RESP_ERR__UNKNOWN_PROTOCOL;
+ }
+ }
+
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "JoinGroup response: GenerationId %" PRId32
+ ", "
+ "Protocol %.*s, LeaderId %.*s%s, my MemberId %.*s, "
+ "member metadata count "
+ "%" PRId32 ": %s",
+ GenerationId, RD_KAFKAP_STR_PR(&Protocol),
+ RD_KAFKAP_STR_PR(&LeaderId),
+ RD_KAFKAP_STR_LEN(&MyMemberId) &&
+ !rd_kafkap_str_cmp(&LeaderId, &MyMemberId)
+ ? " (me)"
+ : "",
+ RD_KAFKAP_STR_PR(&MyMemberId), member_cnt,
+ ErrorCode ? rd_kafka_err2str(ErrorCode) : "(no error)");
+
+ if (!ErrorCode) {
+ char *my_member_id;
+ RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
+ rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
+ rkcg->rkcg_generation_id = GenerationId;
+ i_am_leader = !rd_kafkap_str_cmp(&LeaderId, &MyMemberId);
+ } else {
+ rd_interval_backoff(&rkcg->rkcg_join_intvl, 1000 * 1000);
+ goto err;
+ }
+
+ if (rkcg->rkcg_assignor && rkcg->rkcg_assignor != rkas) {
+ if (rkcg->rkcg_assignor->rkas_destroy_state_cb)
+ rkcg->rkcg_assignor->rkas_destroy_state_cb(
+ rkcg->rkcg_assignor_state);
+ rkcg->rkcg_assignor_state = NULL;
+ }
+ rkcg->rkcg_assignor = rkas;
+
+ if (i_am_leader) {
+ rd_kafka_group_member_t *members;
+ int i;
+ int sub_cnt = 0;
+ rd_list_t topics;
+ rd_kafka_op_t *rko;
+ rd_kafka_dbg(rkb->rkb_rk, CGRP, "JOINGROUP",
+ "I am elected leader for group \"%s\" "
+ "with %" PRId32 " member(s)",
+ rkcg->rkcg_group_id->str, member_cnt);
+
+ if (member_cnt > 100000) {
+ err = RD_KAFKA_RESP_ERR__BAD_MSG;
+ goto err;
+ }
+
+ rd_list_init(&topics, member_cnt, rd_free);
+
+ members = rd_calloc(member_cnt, sizeof(*members));
+
+ for (i = 0; i < member_cnt; i++) {
+ rd_kafkap_str_t MemberId;
+ rd_kafkap_bytes_t MemberMetadata;
+ rd_kafka_group_member_t *rkgm;
+ rd_kafkap_str_t GroupInstanceId =
+ RD_KAFKAP_STR_INITIALIZER;
+
+ rd_kafka_buf_read_str(rkbuf, &MemberId);
+ if (request->rkbuf_reqhdr.ApiVersion >= 5)
+ rd_kafka_buf_read_str(rkbuf, &GroupInstanceId);
+ rd_kafka_buf_read_bytes(rkbuf, &MemberMetadata);
+
+ rkgm = &members[sub_cnt];
+ rkgm->rkgm_member_id = rd_kafkap_str_copy(&MemberId);
+ rkgm->rkgm_group_instance_id =
+ rd_kafkap_str_copy(&GroupInstanceId);
+ rd_list_init(&rkgm->rkgm_eligible, 0, NULL);
+ rkgm->rkgm_generation = -1;
+
+ if (rd_kafka_group_MemberMetadata_consumer_read(
+ rkb, rkgm, &MemberMetadata)) {
+ /* Failed to parse this member's metadata,
+ * ignore it. */
+ } else {
+ sub_cnt++;
+ rkgm->rkgm_assignment =
+ rd_kafka_topic_partition_list_new(
+ rkgm->rkgm_subscription->cnt);
+ rd_kafka_topic_partition_list_get_topic_names(
+ rkgm->rkgm_subscription, &topics,
+ 0 /*dont include regex*/);
+ }
+ }
+
+ /* FIXME: What to do if parsing failed for some/all members?
+ * It is a sign of incompatibility. */
+
+
+ rd_kafka_cgrp_group_leader_reset(rkcg,
+ "JoinGroup response clean-up");
+
+ rd_kafka_assert(NULL, rkcg->rkcg_group_leader.members == NULL);
+ rkcg->rkcg_group_leader.members = members;
+ rkcg->rkcg_group_leader.member_cnt = sub_cnt;
+
+ rd_kafka_cgrp_set_join_state(
+ rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
+
+ /* The assignor will need metadata so fetch it asynchronously
+ * and run the assignor when we get a reply.
+ * Create a callback op that the generic metadata code
+ * will trigger when metadata has been parsed. */
+ rko = rd_kafka_op_new_cb(
+ rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
+ rd_kafka_cgrp_assignor_handle_Metadata_op);
+ rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, NULL);
+
+ rd_kafka_MetadataRequest(
+ rkb, &topics, "partition assignor",
+ rd_false /*!allow_auto_create*/,
+ /* cgrp_update=false:
+ * Since the subscription list may not be identical
+ * across all members of the group and thus the
+ * Metadata response may not be identical to this
+ * consumer's subscription list, we want to
+ * avoid triggering a rejoin or error propagation
+ * on receiving the response since some topics
+ * may be missing. */
+ rd_false, rko);
+ rd_list_destroy(&topics);
+
+ } else {
+ rd_kafka_cgrp_set_join_state(
+ rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC);
+
+ rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_SyncGroup);
+
+ rd_kafka_SyncGroupRequest(
+ rkb, rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id, NULL, 0,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_cgrp_handle_SyncGroup, rkcg);
+ }
+
+err:
+ actions = rd_kafka_err_action(
+ rkb, ErrorCode, request, RD_KAFKA_ERR_ACTION_IGNORE,
+ RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID,
+
+ RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED,
+
+ RD_KAFKA_ERR_ACTION_IGNORE, RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION,
+
+ RD_KAFKA_ERR_ACTION_PERMANENT, RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
+
+ RD_KAFKA_ERR_ACTION_END);
+
+ if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
+ /* Re-query for coordinator */
+ rd_kafka_cgrp_op(rkcg, NULL, RD_KAFKA_NO_REPLYQ,
+ RD_KAFKA_OP_COORD_QUERY, ErrorCode);
+ }
+
+ /* No need for retries here since the join is intervalled,
+ * see rkcg_join_intvl */
+
+ if (ErrorCode) {
+ if (ErrorCode == RD_KAFKA_RESP_ERR__DESTROY)
+ return; /* Termination */
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID) {
+ rd_kafka_set_fatal_error(rkcg->rkcg_rk, ErrorCode,
+ "Fatal consumer error: %s",
+ rd_kafka_err2str(ErrorCode));
+ ErrorCode = RD_KAFKA_RESP_ERR__FATAL;
+
+ } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
+ rd_kafka_consumer_err(
+ rkcg->rkcg_q, rd_kafka_broker_id(rkb), ErrorCode, 0,
+ NULL, NULL, RD_KAFKA_OFFSET_INVALID,
+ "JoinGroup failed: %s",
+ rd_kafka_err2str(ErrorCode));
+
+ if (ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID)
+ rd_kafka_cgrp_set_member_id(rkcg, "");
+ else if (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION)
+ rkcg->rkcg_generation_id = -1;
+ else if (ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED) {
+ /* KIP-394 requires member.id on initial join
+ * group request */
+ char *my_member_id;
+ RD_KAFKAP_STR_DUPA(&my_member_id, &MyMemberId);
+ rd_kafka_cgrp_set_member_id(rkcg, my_member_id);
+ /* Skip the join backoff */
+ rd_interval_reset(&rkcg->rkcg_join_intvl);
+ }
+
+ if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
+ (ErrorCode == RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION ||
+ ErrorCode == RD_KAFKA_RESP_ERR_MEMBER_ID_REQUIRED))
+ rd_kafka_cgrp_revoke_all_rejoin(
+ rkcg, rd_true /*assignment is lost*/,
+ rd_true /*this consumer is initiating*/,
+ "JoinGroup error");
+ else
+ rd_kafka_cgrp_rejoin(rkcg, "JoinGroup error: %s",
+ rd_kafka_err2str(ErrorCode));
+ }
+
+ return;
+
+err_parse:
+ ErrorCode = rkbuf->rkbuf_err;
+ goto err;
+}
+
+
+/**
+ * @brief Check subscription against requested Metadata.
+ */
+static rd_kafka_op_res_t rd_kafka_cgrp_handle_Metadata_op(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
+
+ if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
+ return RD_KAFKA_OP_RES_HANDLED; /* Terminating */
+
+ rd_kafka_cgrp_metadata_update_check(rkcg, rd_false /*dont rejoin*/);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @brief (Async) Refresh metadata (for cgrp's needs)
+ *
+ * @returns 1 if metadata refresh was requested, or 0 if metadata is
+ * up to date, or -1 if no broker is available for metadata requests.
+ *
+ * @locks none
+ * @locality rdkafka main thread
+ */
static int rd_kafka_cgrp_metadata_refresh(rd_kafka_cgrp_t *rkcg,
                                          int *metadata_agep,
                                          const char *reason) {
        rd_kafka_t *rk = rkcg->rkcg_rk;
        rd_kafka_op_t *rko;
        rd_list_t topics; /* Topic names (char *) to query.
                           * Left empty for wildcard subscriptions,
                           * in which case all cluster topics are queried. */
        rd_kafka_resp_err_t err;

        rd_list_init(&topics, 8, rd_free);

        /* Insert all non-wildcard topics in cache. */
        rd_kafka_metadata_cache_hint_rktparlist(
            rkcg->rkcg_rk, rkcg->rkcg_subscription, NULL, 0 /*dont replace*/);

        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION) {
                /* For wildcard subscriptions make sure the
                 * cached full metadata isn't too old. */
                int metadata_age = -1;

                /* rk_ts_full_metadata == 0 means full metadata has never
                 * been received, leaving metadata_age at -1 (unknown). */
                if (rk->rk_ts_full_metadata)
                        metadata_age =
                            (int)(rd_clock() - rk->rk_ts_full_metadata) / 1000;

                *metadata_agep = metadata_age;

                if (metadata_age != -1 &&
                    metadata_age <= rk->rk_conf.metadata_max_age_ms) {
                        rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for wildcard subscription "
                                     "is up to date (%dms old)",
                                     reason, *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date */
                }

        } else {
                /* Check that all subscribed topics are in the cache. */
                int r;

                rd_kafka_topic_partition_list_get_topic_names(
                    rkcg->rkcg_subscription, &topics, 0 /*no regexps*/);

                /* Cache lookup requires the rdkafka read lock;
                 * it also reports the cache age through *metadata_agep. */
                rd_kafka_rdlock(rk);
                r = rd_kafka_metadata_cache_topics_count_exists(rk, &topics,
                                                                metadata_agep);
                rd_kafka_rdunlock(rk);

                if (r == rd_list_cnt(&topics)) {
                        rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA,
                                     "CGRPMETADATA",
                                     "%s: metadata for subscription "
                                     "is up to date (%dms old)",
                                     reason, *metadata_agep);
                        rd_list_destroy(&topics);
                        return 0; /* Up-to-date and all topics exist. */
                }

                rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA",
                             "%s: metadata for subscription "
                             "only available for %d/%d topics (%dms old)",
                             reason, r, rd_list_cnt(&topics), *metadata_agep);
        }

        /* Async request, result will be triggered from
         * rd_kafka_parse_metadata(). */
        rko = rd_kafka_op_new_cb(rkcg->rkcg_rk, RD_KAFKA_OP_METADATA,
                                 rd_kafka_cgrp_handle_Metadata_op);
        rd_kafka_op_set_replyq(rko, rkcg->rkcg_ops, 0);

        err = rd_kafka_metadata_request(rkcg->rkcg_rk, NULL, &topics,
                                        rd_false /*!allow auto create */,
                                        rd_true /*cgrp_update*/, reason, rko);
        if (err) {
                rd_kafka_dbg(rk, CGRP | RD_KAFKA_DBG_METADATA, "CGRPMETADATA",
                             "%s: need to refresh metadata (%dms old) "
                             "but no usable brokers available: %s",
                             reason, *metadata_agep, rd_kafka_err2str(err));
                /* Request failed: the op was not handed off,
                 * destroy it here to avoid a leak. */
                rd_kafka_op_destroy(rko);
        }

        rd_list_destroy(&topics);

        return err ? -1 : 1;
}
+
+
+
/**
 * @brief Attempt to (re)join the group: refresh metadata if needed,
 *        then send a JoinGroupRequest to the coordinator.
 *
 *        A no-op unless the cgrp is UP, in join-state INIT and not
 *        awaiting a previous JoinGroup/SyncGroup response.
 *
 * @locality rdkafka main thread
 */
static void rd_kafka_cgrp_join(rd_kafka_cgrp_t *rkcg) {
        int metadata_age;

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
            rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_INIT ||
            rd_kafka_cgrp_awaiting_response(rkcg))
                return;

        /* On max.poll.interval.ms failure, do not rejoin group until the
         * application has called poll. */
        if ((rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED) &&
            rd_kafka_max_poll_exceeded(rkcg->rkcg_rk))
                return;

        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "JOIN",
                     "Group \"%.*s\": join with %d subscribed topic(s)",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_list_cnt(rkcg->rkcg_subscribed_topics));


        /* See if we need to query metadata to continue:
         * - if subscription contains wildcards:
         *   * query all topics in cluster
         *
         * - if subscription does not contain wildcards but
         *   some topics are missing from the local metadata cache:
         *   * query subscribed topics (all cached ones)
         *
         * - otherwise:
         *   * rely on topic metadata cache
         */
        /* We need up-to-date full metadata to continue,
         * refresh metadata if necessary. */
        if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
                                           "consumer join") == 1) {
                /* Refresh request sent: defer the join until the
                 * Metadata response arrives (handle_Metadata_op). */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER,
                             "JOIN",
                             "Group \"%.*s\": "
                             "postponing join until up-to-date "
                             "metadata is available",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));

                rd_assert(
                    rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ||
                    /* Possible via rd_kafka_cgrp_modify_subscription */
                    rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                rd_kafka_cgrp_set_join_state(
                    rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);

                return; /* ^ async call */
        }

        /* Metadata is up to date (or no brokers available):
         * re-evaluate the matched topics before checking emptiness. */
        if (rd_list_empty(rkcg->rkcg_subscribed_topics))
                rd_kafka_cgrp_metadata_update_check(rkcg,
                                                    rd_false /*dont join*/);

        if (rd_list_empty(rkcg->rkcg_subscribed_topics)) {
                /* Nothing to join with yet: wait for the periodic
                 * metadata refresh to (possibly) match topics. */
                rd_kafka_dbg(
                    rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "JOIN",
                    "Group \"%.*s\": "
                    "no matching topics based on %dms old metadata: "
                    "next metadata refresh in %dms",
                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), metadata_age,
                    rkcg->rkcg_rk->rk_conf.metadata_refresh_interval_ms -
                        metadata_age);
                return;
        }

        rd_rkb_dbg(
            rkcg->rkcg_curr_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "JOIN",
            "Joining group \"%.*s\" with %d subscribed topic(s) and "
            "member id \"%.*s\"",
            RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
            rd_list_cnt(rkcg->rkcg_subscribed_topics),
            rkcg->rkcg_member_id ? RD_KAFKAP_STR_LEN(rkcg->rkcg_member_id) : 0,
            rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "");


        rd_kafka_cgrp_set_join_state(rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN);

        rd_kafka_cgrp_set_wait_resp(rkcg, RD_KAFKAP_JoinGroup);

        rd_kafka_JoinGroupRequest(
            rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_member_id,
            rkcg->rkcg_group_instance_id,
            rkcg->rkcg_rk->rk_conf.group_protocol_type,
            rkcg->rkcg_subscribed_topics, RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
            rd_kafka_cgrp_handle_JoinGroup, rkcg);
}
+
+/**
+ * Rejoin group on update to effective subscribed topics list
+ */
+static void rd_kafka_cgrp_revoke_rejoin(rd_kafka_cgrp_t *rkcg,
+ const char *reason) {
+ /*
+ * Clean-up group leader duties, if any.
+ */
+ rd_kafka_cgrp_group_leader_reset(rkcg, "group (re)join");
+
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CGRP, "REJOIN",
+ "Group \"%.*s\" (re)joining in join-state %s "
+ "with %d assigned partition(s): %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0,
+ reason);
+
+ rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/,
+ rd_true /*initiating*/, reason);
+}
+
+/**
+ * @brief Update the effective list of subscribed topics.
+ *
+ * Set \p tinfos to NULL to clear the list.
+ *
+ * @param tinfos rd_list_t(rd_kafka_topic_info_t *): new effective topic list
+ *
+ * @returns true on change, else false.
+ *
+ * @remark Takes ownership of \p tinfos
+ */
static rd_bool_t rd_kafka_cgrp_update_subscribed_topics(rd_kafka_cgrp_t *rkcg,
                                                        rd_list_t *tinfos) {
        rd_kafka_topic_info_t *tinfo;
        int i;

        if (!tinfos) {
                /* NULL means "clear the list": substitute an empty list
                 * so the comparison/swap logic below applies uniformly. */
                if (!rd_list_empty(rkcg->rkcg_subscribed_topics))
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
                                     "Group \"%.*s\": "
                                     "clearing subscribed topics list (%d)",
                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                                     rd_list_cnt(rkcg->rkcg_subscribed_topics));
                tinfos = rd_list_new(0, (void *)rd_kafka_topic_info_destroy);

        } else {
                if (rd_list_cnt(tinfos) == 0)
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIPTION",
                                     "Group \"%.*s\": "
                                     "no topics in metadata matched "
                                     "subscription",
                                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
        }

        /* Sort for comparison */
        rd_list_sort(tinfos, rd_kafka_topic_info_cmp);

        /* Compare to existing to see if anything changed. */
        if (!rd_list_cmp(rkcg->rkcg_subscribed_topics, tinfos,
                         rd_kafka_topic_info_cmp)) {
                /* No change: ownership of tinfos was taken,
                 * so it must be destroyed here. */
                rd_list_destroy(tinfos);
                return rd_false;
        }

        rd_kafka_dbg(
            rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA, "SUBSCRIPTION",
            "Group \"%.*s\": effective subscription list changed "
            "from %d to %d topic(s):",
            RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
            rd_list_cnt(rkcg->rkcg_subscribed_topics), rd_list_cnt(tinfos));

        RD_LIST_FOREACH(tinfo, tinfos, i)
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_METADATA,
                     "SUBSCRIPTION", " Topic %s with %d partition(s)",
                     tinfo->topic, tinfo->partition_cnt);

        /* Replace the old effective list with the new one
         * (taking ownership of tinfos, per the function contract). */
        rd_list_destroy(rkcg->rkcg_subscribed_topics);

        rkcg->rkcg_subscribed_topics = tinfos;

        return rd_true;
}
+
+
+/**
+ * @brief Handle Heartbeat response.
+ */
void rd_kafka_cgrp_handle_Heartbeat(rd_kafka_t *rk,
                                    rd_kafka_broker_t *rkb,
                                    rd_kafka_resp_err_t err,
                                    rd_kafka_buf_t *rkbuf,
                                    rd_kafka_buf_t *request,
                                    void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        const int log_decode_errors = LOG_ERR;
        int16_t ErrorCode = 0;
        int actions = 0;

        if (err == RD_KAFKA_RESP_ERR__DESTROY)
                return;

        /* Heartbeat is no longer in flight; clear the flag so the
         * next intervalled heartbeat may be sent. */
        rd_dassert(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT);
        rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;

        rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (err)
                goto err;

        if (request->rkbuf_reqhdr.ApiVersion >= 1)
                rd_kafka_buf_read_throttle_time(rkbuf);

        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);
        if (ErrorCode) {
                err = ErrorCode;
                goto err;
        }

        /* Successful heartbeat: the coordinator still considers us alive. */
        rd_kafka_cgrp_update_session_timeout(
            rkcg, rd_false /*don't update if session has expired*/);

        return;

err_parse:
        err = rkbuf->rkbuf_err;
err:
        rkcg->rkcg_last_heartbeat_err = err;

        rd_kafka_dbg(
            rkcg->rkcg_rk, CGRP, "HEARTBEAT",
            "Group \"%s\" heartbeat error response in "
            "state %s (join-state %s, %d partition(s) assigned): %s",
            rkcg->rkcg_group_id->str,
            rd_kafka_cgrp_state_names[rkcg->rkcg_state],
            rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0,
            rd_kafka_err2str(err));

        /* A rejoin is already underway: this response refers to the
         * previous generation and its error must not trigger anything. */
        if (rkcg->rkcg_join_state <= RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC) {
                rd_kafka_dbg(
                    rkcg->rkcg_rk, CGRP, "HEARTBEAT",
                    "Heartbeat response: discarding outdated "
                    "request (now in join-state %s)",
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
                return;
        }

        switch (err) {
        case RD_KAFKA_RESP_ERR__DESTROY:
                /* quick cleanup */
                return;

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR_FOR_GROUP:
        case RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER, "HEARTBEAT",
                             "Heartbeat failed due to coordinator (%s) "
                             "no longer available: %s: "
                             "re-querying for coordinator",
                             rkcg->rkcg_curr_coord
                                 ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
                                 : "none",
                             rd_kafka_err2str(err));
                /* Remain in joined state and keep querying for coordinator */
                actions = RD_KAFKA_ERR_ACTION_REFRESH;
                break;

        case RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS:
                /* The group is rebalancing but the session is still valid. */
                rd_kafka_cgrp_update_session_timeout(
                    rkcg, rd_false /*don't update if session has expired*/);
                /* No further action if already rebalancing */
                if (RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg))
                        return;
                rd_kafka_cgrp_group_is_rebalancing(rkcg);
                return;

        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Coordinator no longer knows us: clear the member id
                 * so the rejoin starts from scratch. */
                rd_kafka_cgrp_set_member_id(rkcg, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/,
                                                      rd_true /*initiating*/,
                                                      "resetting member-id");
                return;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Our generation is stale: invalidate it and rejoin. */
                rkcg->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/,
                                                      rd_true /*initiating*/,
                                                      "illegal generation");
                return;

        case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
                /* Another consumer with our group.instance.id took over:
                 * this is unrecoverable for this instance (KIP-345). */
                rd_kafka_set_fatal_error(rkcg->rkcg_rk, err,
                                         "Fatal consumer error: %s",
                                         rd_kafka_err2str(err));
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                    rkcg, rd_true, /*assignment lost*/
                    rd_true,       /*initiating*/
                    "consumer fenced by "
                    "newer instance");
                return;

        default:
                actions = rd_kafka_err_action(rkb, err, request,
                                              RD_KAFKA_ERR_ACTION_END);
                break;
        }


        if (actions & RD_KAFKA_ERR_ACTION_REFRESH) {
                /* Re-query for coordinator */
                rd_kafka_cgrp_coord_query(rkcg, rd_kafka_err2str(err));
        }

        if (actions & RD_KAFKA_ERR_ACTION_RETRY &&
            rd_kafka_buf_retry(rkb, request)) {
                /* Retry: mark the heartbeat as in transit again
                 * since the retried request reuses this handler. */
                rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
                return;
        }
}
+
+
+
+/**
+ * @brief Send Heartbeat
+ */
+static void rd_kafka_cgrp_heartbeat(rd_kafka_cgrp_t *rkcg) {
+ /* Don't send heartbeats if max.poll.interval.ms was exceeded */
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED)
+ return;
+
+ /* Skip heartbeat if we have one in transit */
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT)
+ return;
+
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_HEARTBEAT_IN_TRANSIT;
+ rd_kafka_HeartbeatRequest(
+ rkcg->rkcg_coord, rkcg->rkcg_group_id, rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id, rkcg->rkcg_group_instance_id,
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0), rd_kafka_cgrp_handle_Heartbeat,
+ NULL);
+}
+
+/**
+ * Cgrp is now terminated: decommission it and signal back to application.
+ */
static void rd_kafka_cgrp_terminated(rd_kafka_cgrp_t *rkcg) {
        if (rd_atomic32_get(&rkcg->rkcg_terminated))
                return; /* terminated() may be called multiple times,
                         * make sure to only terminate once. */

        rd_kafka_cgrp_group_assignment_set(rkcg, NULL);

        /* At this point all assignments, commits and pending ops
         * must have been finalized. */
        rd_kafka_assert(NULL, !rd_kafka_assignment_in_progress(rkcg->rkcg_rk));
        rd_kafka_assert(NULL, !rkcg->rkcg_group_assignment);
        rd_kafka_assert(NULL, rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0);
        rd_kafka_assert(NULL, rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM);

        rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_offset_commit_tmr, 1 /*lock*/);

        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);

        /* Disable and empty ops queue since there will be no
         * (broker) thread serving it anymore after the unassign_broker
         * below.
         * This prevents hang on destroy where responses are enqueued on
         * rkcg_ops without anything serving the queue. */
        rd_kafka_q_disable(rkcg->rkcg_ops);
        rd_kafka_q_purge(rkcg->rkcg_ops);

        /* Release coordinator broker references. */
        if (rkcg->rkcg_curr_coord)
                rd_kafka_cgrp_coord_clear_broker(rkcg);

        if (rkcg->rkcg_coord) {
                rd_kafka_broker_destroy(rkcg->rkcg_coord);
                rkcg->rkcg_coord = NULL;
        }

        rd_atomic32_set(&rkcg->rkcg_terminated, rd_true);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                     "Consumer group sub-system terminated%s",
                     rkcg->rkcg_reply_rko ? " (will enqueue reply)" : "");

        if (rkcg->rkcg_reply_rko) {
                /* Signal back to application. */
                rd_kafka_replyq_enq(&rkcg->rkcg_reply_rko->rko_replyq,
                                    rkcg->rkcg_reply_rko, 0);
                rkcg->rkcg_reply_rko = NULL;
        }

        /* Remove cgrp application queue forwarding, if any. */
        rd_kafka_q_fwd_set(rkcg->rkcg_q, NULL);
}
+
+
+/**
+ * If a cgrp is terminating and all outstanding ops are now finished
+ * then progress to final termination and return 1.
+ * Else returns 0.
+ */
static RD_INLINE int rd_kafka_cgrp_try_terminate(rd_kafka_cgrp_t *rkcg) {

        if (rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM)
                return 1;

        /* Not terminating: nothing to do. */
        if (likely(!(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)))
                return 0;

        /* Check if wait-coord queue has timed out.
         * Deferred (waiting-for-coordinator) ops are given
         * session.timeout.ms from termination start to complete,
         * after which they are moved back to the ops queue to fail. */
        if (rd_kafka_q_len(rkcg->rkcg_wait_coord_q) > 0 &&
            rkcg->rkcg_ts_terminate +
                    (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000) <
                rd_clock()) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
                             "Group \"%s\": timing out %d op(s) in "
                             "wait-for-coordinator queue",
                             rkcg->rkcg_group_id->str,
                             rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
                rd_kafka_q_disable(rkcg->rkcg_wait_coord_q);
                if (rd_kafka_q_concat(rkcg->rkcg_ops,
                                      rkcg->rkcg_wait_coord_q) == -1) {
                        /* ops queue shut down, purge coord queue */
                        rd_kafka_q_purge(rkcg->rkcg_wait_coord_q);
                }
        }

        /* Only progress to TERM when all outstanding work
         * (assign calls, toppars, assignment ops, commits, leave) is done. */
        if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) &&
            rd_list_empty(&rkcg->rkcg_toppars) &&
            !rd_kafka_assignment_in_progress(rkcg->rkcg_rk) &&
            rkcg->rkcg_rk->rk_consumer.wait_commit_cnt == 0 &&
            !(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)) {
                /* Since we might be deep down in a 'rko' handler
                 * called from cgrp_op_serve() we cant call terminated()
                 * directly since it will decommission the rkcg_ops queue
                 * that might be locked by intermediate functions.
                 * Instead set the TERM state and let the cgrp terminate
                 * at its own discretion. */
                rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_TERM);

                return 1;
        } else {
                rd_kafka_dbg(
                    rkcg->rkcg_rk, CGRP, "CGRPTERM",
                    "Group \"%s\": "
                    "waiting for %s%d toppar(s), "
                    "%s"
                    "%d commit(s)%s%s%s (state %s, join-state %s) "
                    "before terminating",
                    rkcg->rkcg_group_id->str,
                    RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ? "assign call, " : "",
                    rd_list_cnt(&rkcg->rkcg_toppars),
                    rd_kafka_assignment_in_progress(rkcg->rkcg_rk)
                        ? "assignment in progress, "
                        : "",
                    rkcg->rkcg_rk->rk_consumer.wait_commit_cnt,
                    (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WAIT_LEAVE)
                        ? ", wait-leave,"
                        : "",
                    rkcg->rkcg_rebalance_rejoin ? ", rebalance_rejoin," : "",
                    (rkcg->rkcg_rebalance_incr_assignment != NULL)
                        ? ", rebalance_incr_assignment,"
                        : "",
                    rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
                return 0;
        }
}
+
+
+/**
+ * @brief Add partition to this cgrp management
+ *
+ * @locks none
+ */
static void rd_kafka_cgrp_partition_add(rd_kafka_cgrp_t *rkcg,
                                        rd_kafka_toppar_t *rktp) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTADD",
                     "Group \"%s\": add %s [%" PRId32 "]",
                     rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition);

        /* Mark the toppar as managed by this cgrp;
         * it must not already be on a cgrp. */
        rd_kafka_toppar_lock(rktp);
        rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP));
        rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ON_CGRP;
        rd_kafka_toppar_unlock(rktp);

        /* The cgrp's toppar list holds its own reference,
         * released in rd_kafka_cgrp_partition_del(). */
        rd_kafka_toppar_keep(rktp);
        rd_list_add(&rkcg->rkcg_toppars, rktp);
}
+
+/**
+ * @brief Remove partition from this cgrp management
+ *
+ * @locks none
+ */
static void rd_kafka_cgrp_partition_del(rd_kafka_cgrp_t *rkcg,
                                        rd_kafka_toppar_t *rktp) {
        int cnt = 0, barrier_cnt = 0, message_cnt = 0, other_cnt = 0;
        rd_kafka_op_t *rko;
        rd_kafka_q_t *rkq;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
                     "Group \"%s\": delete %s [%" PRId32 "]",
                     rkcg->rkcg_group_id->str, rktp->rktp_rkt->rkt_topic->str,
                     rktp->rktp_partition);

        rd_kafka_toppar_lock(rktp);
        rd_assert(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ON_CGRP);
        rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ON_CGRP;

        if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_REMOVE) {
                /* Partition is being removed from the cluster and it's stopped,
                 * so rktp->rktp_fetchq->rkq_fwdq is NULL.
                 * Purge remaining operations in rktp->rktp_fetchq->rkq_q,
                 * while holding lock, to avoid circular references */
                rkq = rktp->rktp_fetchq;
                mtx_lock(&rkq->rkq_lock);
                rd_assert(!rkq->rkq_fwdq);

                /* Scan the queue first (without dequeuing) to count
                 * op types for the debug log below. */
                rko = TAILQ_FIRST(&rkq->rkq_q);
                while (rko) {
                        /* Only BARRIER and FETCH ops are expected on a
                         * stopped fetch queue; warn on anything else. */
                        if (rko->rko_type != RD_KAFKA_OP_BARRIER &&
                            rko->rko_type != RD_KAFKA_OP_FETCH) {
                                rd_kafka_log(
                                    rkcg->rkcg_rk, LOG_WARNING, "PARTDEL",
                                    "Purging toppar fetch queue buffer op"
                                    "with unexpected type: %s",
                                    rd_kafka_op2str(rko->rko_type));
                        }

                        if (rko->rko_type == RD_KAFKA_OP_BARRIER)
                                barrier_cnt++;
                        else if (rko->rko_type == RD_KAFKA_OP_FETCH)
                                message_cnt++;
                        else
                                other_cnt++;

                        rko = TAILQ_NEXT(rko, rko_link);
                        cnt++;
                }

                mtx_unlock(&rkq->rkq_lock);

                if (cnt) {
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
                                     "Purge toppar fetch queue buffer "
                                     "containing %d op(s) "
                                     "(%d barrier(s), %d message(s), %d other)"
                                     " to avoid "
                                     "circular references",
                                     cnt, barrier_cnt, message_cnt, other_cnt);
                        rd_kafka_q_purge(rktp->rktp_fetchq);
                } else {
                        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "PARTDEL",
                                     "Not purging toppar fetch queue buffer."
                                     " No ops present in the buffer.");
                }
        }

        rd_kafka_toppar_unlock(rktp);

        rd_list_remove(&rkcg->rkcg_toppars, rktp);

        rd_kafka_toppar_destroy(rktp); /* refcnt from _add above */

        /* Removing a toppar may have been the last thing blocking
         * group termination. */
        rd_kafka_cgrp_try_terminate(rkcg);
}
+
+
+
+/**
+ * @brief Defer offset commit (rko) until coordinator is available.
+ *
+ * @returns 1 if the rko was deferred or 0 if the defer queue is disabled
+ * or rko already deferred.
+ */
+static int rd_kafka_cgrp_defer_offset_commit(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_op_t *rko,
+ const char *reason) {
+
+ /* wait_coord_q is disabled session.timeout.ms after
+ * group close() has been initated. */
+ if (rko->rko_u.offset_commit.ts_timeout != 0 ||
+ !rd_kafka_q_ready(rkcg->rkcg_wait_coord_q))
+ return 0;
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COMMIT",
+ "Group \"%s\": "
+ "unable to OffsetCommit in state %s: %s: "
+ "coordinator (%s) is unavailable: "
+ "retrying later",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state], reason,
+ rkcg->rkcg_curr_coord
+ ? rd_kafka_broker_name(rkcg->rkcg_curr_coord)
+ : "none");
+
+ rko->rko_flags |= RD_KAFKA_OP_F_REPROCESS;
+ rko->rko_u.offset_commit.ts_timeout =
+ rd_clock() +
+ (rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000);
+ rd_kafka_q_enq(rkcg->rkcg_wait_coord_q, rko);
+
+ return 1;
+}
+
+
+/**
+ * @brief Update the committed offsets for the partitions in \p offsets,
+ *
+ * @remark \p offsets may be NULL if \p err is set
+ * @returns the number of partitions with errors encountered
+ */
+static int rd_kafka_cgrp_update_committed_offsets(
+ rd_kafka_cgrp_t *rkcg,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *offsets) {
+ int i;
+ int errcnt = 0;
+
+ /* Update toppars' committed offset or global error */
+ for (i = 0; offsets && i < offsets->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar = &offsets->elems[i];
+ rd_kafka_toppar_t *rktp;
+
+ /* Ignore logical offsets since they were never
+ * sent to the broker. */
+ if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset))
+ continue;
+
+ /* Propagate global error to all partitions that don't have
+ * explicit error set. */
+ if (err && !rktpar->err)
+ rktpar->err = err;
+
+ if (rktpar->err) {
+ rd_kafka_dbg(rkcg->rkcg_rk, TOPIC, "OFFSET",
+ "OffsetCommit failed for "
+ "%s [%" PRId32
+ "] at offset "
+ "%" PRId64 " in join-state %s: %s",
+ rktpar->topic, rktpar->partition,
+ rktpar->offset,
+ rd_kafka_cgrp_join_state_names
+ [rkcg->rkcg_join_state],
+ rd_kafka_err2str(rktpar->err));
+
+ errcnt++;
+ continue;
+ }
+
+ rktp = rd_kafka_topic_partition_get_toppar(rkcg->rkcg_rk,
+ rktpar, rd_false);
+ if (!rktp)
+ continue;
+
+ rd_kafka_toppar_lock(rktp);
+ rktp->rktp_committed_pos =
+ rd_kafka_topic_partition_get_fetch_pos(rktpar);
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_kafka_toppar_destroy(rktp); /* from get_toppar() */
+ }
+
+ return errcnt;
+}
+
+
+/**
+ * @brief Propagate OffsetCommit results.
+ *
+ * @param rko_orig The original rko that triggered the commit, this is used
+ * to propagate the result.
+ * @param err Is the aggregated request-level error, or ERR_NO_ERROR.
+ * @param errcnt Are the number of partitions in \p offsets that failed
+ * offset commit.
+ */
static void rd_kafka_cgrp_propagate_commit_result(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_op_t *rko_orig,
    rd_kafka_resp_err_t err,
    int errcnt,
    rd_kafka_topic_partition_list_t *offsets) {

        const rd_kafka_t *rk = rkcg->rkcg_rk;
        int offset_commit_cb_served = 0;

        /* If no special callback is set but a offset_commit_cb has
         * been set in conf then post an event for the latter. */
        if (!rko_orig->rko_u.offset_commit.cb && rk->rk_conf.offset_commit_cb) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                            rd_kafka_topic_partition_list_copy(offsets);

                rko_reply->rko_u.offset_commit.cb =
                    rk->rk_conf.offset_commit_cb;
                rko_reply->rko_u.offset_commit.opaque = rk->rk_conf.opaque;

                rd_kafka_q_enq(rk->rk_rep, rko_reply);
                offset_commit_cb_served++;
        }


        /* Enqueue reply to requester's queue, if any. */
        if (rko_orig->rko_replyq.q) {
                rd_kafka_op_t *rko_reply = rd_kafka_op_new_reply(rko_orig, err);

                rd_kafka_op_set_prio(rko_reply, RD_KAFKA_PRIO_HIGH);

                /* Copy offset & partitions & callbacks to reply op */
                rko_reply->rko_u.offset_commit = rko_orig->rko_u.offset_commit;
                /* The offset_commit union was shallow-copied above:
                 * deep-copy the pointer members (partitions, reason) so
                 * both ops can be destroyed independently. */
                if (offsets)
                        rko_reply->rko_u.offset_commit.partitions =
                            rd_kafka_topic_partition_list_copy(offsets);
                if (rko_reply->rko_u.offset_commit.reason)
                        rko_reply->rko_u.offset_commit.reason =
                            rd_strdup(rko_reply->rko_u.offset_commit.reason);

                rd_kafka_replyq_enq(&rko_orig->rko_replyq, rko_reply, 0);
                offset_commit_cb_served++;
        }

        if (!offset_commit_cb_served && offsets &&
            (errcnt > 0 || (err != RD_KAFKA_RESP_ERR_NO_ERROR &&
                            err != RD_KAFKA_RESP_ERR__NO_OFFSET))) {
                /* If there is no callback or handler for this (auto)
                 * commit then log an error (#1043) */
                char tmp[512];

                rd_kafka_topic_partition_list_str(
                    offsets, tmp, sizeof(tmp),
                    /* Print per-partition errors unless there was a
                     * request-level error. */
                    RD_KAFKA_FMT_F_OFFSET |
                        (errcnt ? RD_KAFKA_FMT_F_ONLY_ERR : 0));

                rd_kafka_log(
                    rkcg->rkcg_rk, LOG_WARNING, "COMMITFAIL",
                    "Offset commit (%s) failed "
                    "for %d/%d partition(s) in join-state %s: "
                    "%s%s%s",
                    rko_orig->rko_u.offset_commit.reason,
                    errcnt ? errcnt : offsets->cnt, offsets->cnt,
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                    errcnt ? rd_kafka_err2str(err) : "", errcnt ? ": " : "",
                    tmp);
        }
}
+
+
+
+/**
+ * @brief Handle OffsetCommitResponse
+ * Takes the original 'rko' as opaque argument.
+ * @remark \p rkb, rkbuf, and request may be NULL in a number of
+ * error cases (e.g., _NO_OFFSET, _WAIT_COORD)
+ */
static void rd_kafka_cgrp_op_handle_OffsetCommit(rd_kafka_t *rk,
                                                 rd_kafka_broker_t *rkb,
                                                 rd_kafka_resp_err_t err,
                                                 rd_kafka_buf_t *rkbuf,
                                                 rd_kafka_buf_t *request,
                                                 void *opaque) {
        rd_kafka_cgrp_t *rkcg = rk->rk_cgrp;
        rd_kafka_op_t *rko_orig = opaque; /* Original op that triggered
                                           * the commit; owned by us and
                                           * destroyed before returning
                                           * (except when deferring). */
        rd_kafka_topic_partition_list_t *offsets =
            rko_orig->rko_u.offset_commit.partitions; /* maybe NULL */
        int errcnt;

        RD_KAFKA_OP_TYPE_ASSERT(rko_orig, RD_KAFKA_OP_OFFSET_COMMIT);

        /* Parse the response; per-partition errors are written back
         * into 'offsets'. */
        err = rd_kafka_handle_OffsetCommit(rk, rkb, err, rkbuf, request,
                                           offsets, rd_false);

        /* Suppress empty commit debug logs if allowed */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET ||
            !rko_orig->rko_u.offset_commit.silent_empty) {
                if (rkb)
                        rd_rkb_dbg(rkb, CGRP, "COMMIT",
                                   "OffsetCommit for %d partition(s) in "
                                   "join-state %s: "
                                   "%s: returned: %s",
                                   offsets ? offsets->cnt : -1,
                                   rd_kafka_cgrp_join_state_names
                                       [rkcg->rkcg_join_state],
                                   rko_orig->rko_u.offset_commit.reason,
                                   rd_kafka_err2str(err));
                else
                        rd_kafka_dbg(rk, CGRP, "COMMIT",
                                     "OffsetCommit for %d partition(s) in "
                                     "join-state "
                                     "%s: %s: "
                                     "returned: %s",
                                     offsets ? offsets->cnt : -1,
                                     rd_kafka_cgrp_join_state_names
                                         [rkcg->rkcg_join_state],
                                     rko_orig->rko_u.offset_commit.reason,
                                     rd_kafka_err2str(err));
        }


        /*
         * Error handling
         */
        switch (err) {
        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
                /* Revoke assignment and rebalance on unknown member */
                rd_kafka_cgrp_set_member_id(rk->rk_cgrp, "");
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                    rkcg, rd_true /*assignment is lost*/,
                    rd_true /*this consumer is initiating*/,
                    "OffsetCommit error: Unknown member");
                break;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
                /* Revoke assignment and rebalance on illegal generation */
                rk->rk_cgrp->rkcg_generation_id = -1;
                rd_kafka_cgrp_revoke_all_rejoin_maybe(
                    rkcg, rd_true /*assignment is lost*/,
                    rd_true /*this consumer is initiating*/,
                    "OffsetCommit error: Illegal generation");
                break;

        case RD_KAFKA_RESP_ERR__IN_PROGRESS:
                return; /* Retrying */

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
                /* The coordinator is not available, defer the offset commit
                 * to when the coordinator is back up again. */

                /* Future-proofing, see timeout_scan(). */
                rd_kafka_assert(NULL, err != RD_KAFKA_RESP_ERR__WAIT_COORD);

                /* On successful deferral the wait_coord_q takes
                 * ownership of rko_orig: don't destroy it here. */
                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko_orig,
                                                      rd_kafka_err2str(err)))
                        return;
                break;

        default:
                break;
        }

        /* Call on_commit interceptors */
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET &&
            err != RD_KAFKA_RESP_ERR__DESTROY && offsets && offsets->cnt > 0)
                rd_kafka_interceptors_on_commit(rk, offsets, err);

        /* Keep track of outstanding commits */
        rd_kafka_assert(NULL, rk->rk_consumer.wait_commit_cnt > 0);
        rk->rk_consumer.wait_commit_cnt--;

        if (err == RD_KAFKA_RESP_ERR__DESTROY) {
                rd_kafka_op_destroy(rko_orig);
                return; /* Handle is terminating, this op may be handled
                         * by the op enq()ing thread rather than the
                         * rdkafka main thread, it is not safe to
                         * continue here. */
        }

        /* Update the committed offsets for each partition's rktp. */
        errcnt = rd_kafka_cgrp_update_committed_offsets(rkcg, err, offsets);

        /* NOTE: the __DESTROY check below is always true at this point
         *       since the __DESTROY case returned above. */
        if (err != RD_KAFKA_RESP_ERR__DESTROY &&
            !(err == RD_KAFKA_RESP_ERR__NO_OFFSET &&
              rko_orig->rko_u.offset_commit.silent_empty)) {
                /* Propagate commit results (success or permanent error)
                 * unless we're shutting down or commit was empty. */
                rd_kafka_cgrp_propagate_commit_result(rkcg, rko_orig, err,
                                                      errcnt, offsets);
        }

        rd_kafka_op_destroy(rko_orig);

        /* If the current state was waiting for commits to finish we'll try to
         * transition to the next state. */
        if (rk->rk_consumer.wait_commit_cnt == 0)
                rd_kafka_assignment_serve(rk);
}
+
+
+static size_t rd_kafka_topic_partition_has_absolute_offset(
+ const rd_kafka_topic_partition_t *rktpar,
+ void *opaque) {
+ return rktpar->offset >= 0 ? 1 : 0;
+}
+
+
/**
 * Commit a list of offsets.
 * Reuses the originating 'rko' for the async reply.
 * 'rko->rko_payload' should either be NULL (to commit current assignment) or
 * a proper topic_partition_list_t with offsets to commit.
 * The offset list will be altered.
 *
 * \p rko...silent_empty: if there are no offsets to commit bail out
 *                        silently without posting an op on the reply queue.
 * \p set_offsets: set offsets and epochs in
 *                 rko->rko_u.offset_commit.partitions from the rktp's
 *                 stored offset.
 *
 * @remark Takes ownership of \p rko: it is either passed to the
 *         OffsetCommitRequest handler as opaque, deferred for
 *         reprocessing, or routed through the err: reply path below.
 *
 * Locality: cgrp thread
 */
static void rd_kafka_cgrp_offsets_commit(rd_kafka_cgrp_t *rkcg,
                                         rd_kafka_op_t *rko,
                                         rd_bool_t set_offsets,
                                         const char *reason) {
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_resp_err_t err;
        int valid_offsets = 0;
        int r;
        rd_kafka_buf_t *rkbuf;
        rd_kafka_op_t *reply;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        if (!(rko->rko_flags & RD_KAFKA_OP_F_REPROCESS)) {
                /* wait_commit_cnt has already been increased for
                 * reprocessed ops. */
                rkcg->rkcg_rk->rk_consumer.wait_commit_cnt++;
        }

        /* If offsets is NULL we shall use the current assignment
         * (not the group assignment). */
        if (!rko->rko_u.offset_commit.partitions &&
            rkcg->rkcg_rk->rk_consumer.assignment.all->cnt > 0) {
                if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
                        /* Not committing assigned offsets: assignment lost */
                        err = RD_KAFKA_RESP_ERR__ASSIGNMENT_LOST;
                        goto err;
                }

                rko->rko_u.offset_commit.partitions =
                    rd_kafka_topic_partition_list_copy(
                        rkcg->rkcg_rk->rk_consumer.assignment.all);
        }

        offsets = rko->rko_u.offset_commit.partitions;

        if (offsets) {
                /* Set offsets to commits */
                if (set_offsets)
                        rd_kafka_topic_partition_list_set_offsets(
                            rkcg->rkcg_rk, rko->rko_u.offset_commit.partitions,
                            1, RD_KAFKA_OFFSET_INVALID /* def */,
                            1 /* is commit */);

                /* Check the number of valid offsets to commit. */
                valid_offsets = (int)rd_kafka_topic_partition_list_sum(
                    offsets, rd_kafka_topic_partition_has_absolute_offset,
                    NULL);
        }

        if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
                /* Commits are not allowed when a fatal error has been raised */
                err = RD_KAFKA_RESP_ERR__FATAL;
                goto err;
        }

        if (!valid_offsets) {
                /* No valid offsets */
                err = RD_KAFKA_RESP_ERR__NO_OFFSET;
                goto err;
        }

        if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP) {
                /* No coordinator yet: defer the commit until one is up,
                 * or fail with __WAIT_COORD if deferral isn't possible. */
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP,
                             "COMMIT",
                             "Deferring \"%s\" offset commit "
                             "for %d partition(s) in state %s: "
                             "no coordinator available",
                             reason, valid_offsets,
                             rd_kafka_cgrp_state_names[rkcg->rkcg_state]);

                if (rd_kafka_cgrp_defer_offset_commit(rkcg, rko, reason))
                        return;

                err = RD_KAFKA_RESP_ERR__WAIT_COORD;
                goto err;
        }


        rd_rkb_dbg(rkcg->rkcg_coord, CONSUMER | RD_KAFKA_DBG_CGRP, "COMMIT",
                   "Committing offsets for %d partition(s) with "
                   "generation-id %" PRId32 " in join-state %s: %s",
                   valid_offsets, rkcg->rkcg_generation_id,
                   rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                   reason);

        cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid(
            rkcg->rkcg_rk->rk_conf.group_id_str, rkcg->rkcg_generation_id,
            rkcg->rkcg_member_id->str,
            rkcg->rkcg_rk->rk_conf.group_instance_id);

        /* Send OffsetCommit */
        r = rd_kafka_OffsetCommitRequest(rkcg->rkcg_coord, cgmetadata, offsets,
                                         RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
                                         rd_kafka_cgrp_op_handle_OffsetCommit,
                                         rko, reason);
        rd_kafka_consumer_group_metadata_destroy(cgmetadata);

        /* Must have valid offsets to commit if we get here */
        rd_kafka_assert(NULL, r != 0);

        return;

err:
        if (err != RD_KAFKA_RESP_ERR__NO_OFFSET)
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP,
                             "COMMIT", "OffsetCommit internal error: %s",
                             rd_kafka_err2str(err));

        /* Propagate error through dummy buffer object that will
         * call the response handler from the main loop, avoiding
         * any recursive calls from op_handle_OffsetCommit ->
         * assignment_serve() and then back to cgrp_assigned_offsets_commit() */

        reply         = rd_kafka_op_new(RD_KAFKA_OP_RECV_BUF);
        reply->rko_rk = rkcg->rkcg_rk; /* Set rk since the rkbuf will not
                                        * have a rkb to reach it. */
        reply->rko_err = err;

        /* The handler is invoked via the buffer callback with \p rko
         * as opaque, mirroring the normal OffsetCommitRequest path. */
        rkbuf             = rd_kafka_buf_new(0, 0);
        rkbuf->rkbuf_cb   = rd_kafka_cgrp_op_handle_OffsetCommit;
        rkbuf->rkbuf_opaque = rko;
        reply->rko_u.xbuf.rkbuf = rkbuf;

        rd_kafka_q_enq(rkcg->rkcg_ops, reply);
}
+
+
/**
 * @brief Commit offsets for the assigned partitions.
 *
 * If \p offsets is NULL all partitions in the current assignment will be used.
 * If \p set_offsets is true the offsets to commit will be read from the
 * rktp's stored offset rather than the .offset fields in \p offsets.
 *
 * rkcg_wait_commit_cnt will be increased accordingly.
 *
 * @remark Silently does nothing if the assignment has been lost, or if
 *         there are no offsets to commit (silent_empty is set on the op).
 */
void rd_kafka_cgrp_assigned_offsets_commit(
    rd_kafka_cgrp_t *rkcg,
    const rd_kafka_topic_partition_list_t *offsets,
    rd_bool_t set_offsets,
    const char *reason) {
        rd_kafka_op_t *rko;

        if (rd_kafka_cgrp_assignment_is_lost(rkcg)) {
                /* Committing lost partitions would fail with
                 * an illegal generation error anyway. */
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "AUTOCOMMIT",
                             "Group \"%s\": not committing assigned offsets: "
                             "assignment lost",
                             rkcg->rkcg_group_id->str);
                return;
        }

        rko = rd_kafka_op_new(RD_KAFKA_OP_OFFSET_COMMIT);
        rko->rko_u.offset_commit.reason = rd_strdup(reason);
        if (rkcg->rkcg_rk->rk_conf.enabled_events &
            RD_KAFKA_EVENT_OFFSET_COMMIT) {
                /* Send results to application */
                rd_kafka_op_set_replyq(rko, rkcg->rkcg_rk->rk_rep, 0);
                rko->rko_u.offset_commit.cb =
                    rkcg->rkcg_rk->rk_conf.offset_commit_cb; /*maybe NULL*/
                rko->rko_u.offset_commit.opaque = rkcg->rkcg_rk->rk_conf.opaque;
        }
        /* NULL partitions means current assignment */
        if (offsets)
                rko->rko_u.offset_commit.partitions =
                    rd_kafka_topic_partition_list_copy(offsets);
        rko->rko_u.offset_commit.silent_empty = 1;
        /* Ownership of rko is passed to the commit machinery. */
        rd_kafka_cgrp_offsets_commit(rkcg, rko, set_offsets, reason);
}
+
+
+/**
+ * auto.commit.interval.ms commit timer callback.
+ *
+ * Trigger a group offset commit.
+ *
+ * Locality: rdkafka main thread
+ */
+static void rd_kafka_cgrp_offset_commit_tmr_cb(rd_kafka_timers_t *rkts,
+ void *arg) {
+ rd_kafka_cgrp_t *rkcg = arg;
+
+ /* Don't attempt auto commit when rebalancing or initializing since
+ * the rkcg_generation_id is most likely in flux. */
+ if (rkcg->rkcg_subscription &&
+ rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_STEADY)
+ return;
+
+ rd_kafka_cgrp_assigned_offsets_commit(
+ rkcg, NULL, rd_true /*set offsets*/, "cgrp auto commit timer");
+}
+
+
+/**
+ * @brief If rkcg_next_subscription or rkcg_next_unsubscribe are
+ * set, trigger a state change so that they are applied from the
+ * main dispatcher.
+ *
+ * @returns rd_true if a subscribe was scheduled, else false.
+ */
+static rd_bool_t
+rd_kafka_trigger_waiting_subscribe_maybe(rd_kafka_cgrp_t *rkcg) {
+
+ if (rkcg->rkcg_next_subscription || rkcg->rkcg_next_unsubscribe) {
+ /* Skip the join backoff */
+ rd_interval_reset(&rkcg->rkcg_join_intvl);
+ rd_kafka_cgrp_rejoin(rkcg, "Applying next subscription");
+ return rd_true;
+ }
+
+ return rd_false;
+}
+
+
/**
 * @brief Incrementally add to an existing partition assignment.
 *        May update \p partitions but will not hold on to it.
 *
 * Called when the application performs an incremental_assign(), either
 * from the rebalance callback or spontaneously.
 *
 * @returns an error object or NULL on success.
 */
static rd_kafka_error_t *
rd_kafka_cgrp_incremental_assign(rd_kafka_cgrp_t *rkcg,
                                 rd_kafka_topic_partition_list_t *partitions) {
        rd_kafka_error_t *error;

        error = rd_kafka_assignment_add(rkcg->rkcg_rk, partitions);
        if (error)
                return error;

        /* If this assign() was the response to an outstanding rebalance
         * op, resume the assignment and return to steady state. */
        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk,
                                           "incremental assign called");
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                if (rkcg->rkcg_subscription) {
                        /* If using subscribe(), start a timer to enforce
                         * `max.poll.interval.ms`.
                         * Instead of restarting the timer on each ...poll()
                         * call, which would be costly (once per message),
                         * set up an intervalled timer that checks a timestamp
                         * (that is updated on ..poll()).
                         * The timer interval is 2 hz. */
                        rd_kafka_timer_start(
                            &rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr,
                            500 * 1000ll /* 500ms */,
                            rd_kafka_cgrp_max_poll_interval_check_tmr_cb, rkcg);
                }
        }

        /* A successful incremental assign means the assignment is
         * no longer considered lost. */
        rd_kafka_cgrp_assignment_clear_lost(rkcg,
                                            "incremental_assign() called");

        return NULL;
}
+
+
/**
 * @brief Incrementally remove partitions from an existing partition
 *        assignment. May update \p partitions but will not hold on
 *        to it.
 *
 * @remark This method does not unmark the current assignment as lost
 *         (if lost). That happens following _incr_unassign_done and
 *         a group-rejoin initiated.
 *
 * @returns An error object or NULL on success.
 */
static rd_kafka_error_t *rd_kafka_cgrp_incremental_unassign(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_topic_partition_list_t *partitions) {
        rd_kafka_error_t *error;

        error = rd_kafka_assignment_subtract(rkcg->rkcg_rk, partitions);
        if (error)
                return error;

        /* If this unassign() answers an outstanding revoke rebalance op,
         * resume and wait for the removals to complete; completion is
         * signalled via rd_kafka_cgrp_incr_unassign_done(). */
        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk,
                                           "incremental unassign called");
                rd_kafka_cgrp_set_join_state(
                    rkcg,
                    RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE);
        }

        rd_kafka_cgrp_assignment_clear_lost(rkcg,
                                            "incremental_unassign() called");

        return NULL;
}
+
+
/**
 * @brief Call when all incremental unassign operations are done to transition
 *        to the next state.
 *
 * Depending on context this either completes a termination, triggers the
 * follow-up incremental assign of a cooperative rebalance, rejoins the
 * group, or returns to steady state.
 */
static void rd_kafka_cgrp_incr_unassign_done(rd_kafka_cgrp_t *rkcg) {

        /* If this action was underway when a terminate was initiated, it will
         * be left to complete. Now that's done, unassign all partitions */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                             "Group \"%s\" is terminating, initiating full "
                             "unassign",
                             rkcg->rkcg_group_id->str);
                rd_kafka_cgrp_unassign(rkcg);
                return;
        }

        if (rkcg->rkcg_rebalance_incr_assignment) {

                /* This incremental unassign was part of a normal rebalance
                 * (in which the revoke set was not empty). Immediately
                 * trigger the assign that follows this revoke. The protocol
                 * dictates this should occur even if the new assignment
                 * set is empty.
                 *
                 * Also, since this rebalance had some revoked partitions,
                 * a re-join should occur following the assign.
                 */

                rd_kafka_rebalance_op_incr(rkcg,
                                           RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
                                           rkcg->rkcg_rebalance_incr_assignment,
                                           rd_true /*rejoin following assign*/,
                                           "cooperative assign after revoke");

                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_rebalance_incr_assignment);
                rkcg->rkcg_rebalance_incr_assignment = NULL;

                /* Note: rkcg_rebalance_rejoin is actioned / reset in
                 * rd_kafka_cgrp_incremental_assign call */

        } else if (rkcg->rkcg_rebalance_rejoin) {
                rkcg->rkcg_rebalance_rejoin = rd_false;

                /* There are some cases (lost partitions), where a rejoin
                 * should occur immediately following the unassign (this
                 * is not the case under normal conditions), in which case
                 * the rejoin flag will be set. */

                /* Skip the join backoff */
                rd_interval_reset(&rkcg->rkcg_join_intvl);

                rd_kafka_cgrp_rejoin(rkcg, "Incremental unassignment done");

        } else if (!rd_kafka_trigger_waiting_subscribe_maybe(rkcg)) {
                /* After this incremental unassignment we're now back in
                 * a steady state. */
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);
        }
}
+
+
/**
 * @brief Call when all absolute (non-incremental) unassign operations are done
 *        to transition to the next state.
 *
 * Leaves the group if requested and, when in
 * WAIT_UNASSIGN_TO_COMPLETE, rejoins the group.
 */
static void rd_kafka_cgrp_unassign_done(rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNASSIGN",
                     "Group \"%s\": unassign done in state %s "
                     "(join-state %s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Leave group, if desired. */
        rd_kafka_cgrp_leave_maybe(rkcg);

        /* Only the WAIT_UNASSIGN_TO_COMPLETE state expects a rejoin
         * after unassign; any other join-state was a spontaneous call. */
        if (rkcg->rkcg_join_state !=
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE)
                return;

        /* All partitions are unassigned. Rejoin the group. */

        /* Skip the join backoff */
        rd_interval_reset(&rkcg->rkcg_join_intvl);

        rd_kafka_cgrp_rejoin(rkcg, "Unassignment done");
}
+
+
+
/**
 * @brief Called from assignment code when all in progress
 *        assignment/unassignment operations are done, allowing the cgrp to
 *        transition to other states if needed.
 *
 * @remark This may be called spontaneously without any need for a state
 *         change in the rkcg.
 */
void rd_kafka_cgrp_assignment_done(rd_kafka_cgrp_t *rkcg) {
        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNDONE",
                     "Group \"%s\": "
                     "assignment operations done in join-state %s "
                     "(rebalance rejoin=%s)",
                     rkcg->rkcg_group_id->str,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                     RD_STR_ToF(rkcg->rkcg_rebalance_rejoin));

        switch (rkcg->rkcg_join_state) {
        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
                /* Full unassign finished: leave and/or rejoin. */
                rd_kafka_cgrp_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
                /* Incremental unassign finished: continue the
                 * cooperative rebalance workflow. */
                rd_kafka_cgrp_incr_unassign_done(rkcg);
                break;

        case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
                /* If an updated/next subscription is available, schedule it. */
                if (rd_kafka_trigger_waiting_subscribe_maybe(rkcg))
                        break;

                if (rkcg->rkcg_rebalance_rejoin) {
                        rkcg->rkcg_rebalance_rejoin = rd_false;

                        /* Skip the join backoff */
                        rd_interval_reset(&rkcg->rkcg_join_intvl);

                        rd_kafka_cgrp_rejoin(
                            rkcg,
                            "rejoining group to redistribute "
                            "previously owned partitions to other "
                            "group members");
                        break;
                }

                /* FALLTHRU */

        case RD_KAFKA_CGRP_JOIN_STATE_INIT:
                /* Check if cgrp is trying to terminate, which is safe to do
                 * in these two states. Otherwise we'll need to wait for
                 * the current state to decommission. */
                rd_kafka_cgrp_try_terminate(rkcg);
                break;

        default:
                break;
        }
}
+
+
+
/**
 * @brief Remove existing assignment (absolute, non-incremental unassign).
 *
 * Completion is signalled asynchronously through
 * rd_kafka_cgrp_assignment_done() -> rd_kafka_cgrp_unassign_done().
 *
 * @returns NULL (never fails).
 */
static rd_kafka_error_t *rd_kafka_cgrp_unassign(rd_kafka_cgrp_t *rkcg) {

        rd_kafka_assignment_clear(rkcg->rkcg_rk);

        /* If this unassign() answers an outstanding revoke rebalance op,
         * resume the assignment and wait for the removals to complete. */
        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk, "unassign called");
                rd_kafka_cgrp_set_join_state(
                    rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE);
        }

        rd_kafka_cgrp_assignment_clear_lost(rkcg, "unassign() called");

        return NULL;
}
+
+
/**
 * @brief Set new atomic partition assignment.
 *        May update \p assignment but will not hold on to it.
 *
 * Replaces any existing assignment in full (EAGER semantics).
 *
 * @returns NULL on success or an error if a fatal error has been raised.
 */
static rd_kafka_error_t *
rd_kafka_cgrp_assign(rd_kafka_cgrp_t *rkcg,
                     rd_kafka_topic_partition_list_t *assignment) {
        rd_kafka_error_t *error;

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "ASSIGN",
                     "Group \"%s\": new assignment of %d partition(s) "
                     "in join-state %s",
                     rkcg->rkcg_group_id->str, assignment ? assignment->cnt : 0,
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        /* Clear existing assignment, if any, and serve its removals. */
        if (rd_kafka_assignment_clear(rkcg->rkcg_rk))
                rd_kafka_assignment_serve(rkcg->rkcg_rk);

        error = rd_kafka_assignment_add(rkcg->rkcg_rk, assignment);
        if (error)
                return error;

        rd_kafka_cgrp_assignment_clear_lost(rkcg, "assign() called");

        /* If this assign() was the response to an outstanding rebalance
         * op, resume the assignment and return to steady state. */
        if (rkcg->rkcg_join_state ==
            RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL) {
                rd_kafka_assignment_resume(rkcg->rkcg_rk, "assign called");
                rd_kafka_cgrp_set_join_state(rkcg,
                                             RD_KAFKA_CGRP_JOIN_STATE_STEADY);

                if (rkcg->rkcg_subscription) {
                        /* If using subscribe(), start a timer to enforce
                         * `max.poll.interval.ms`.
                         * Instead of restarting the timer on each ...poll()
                         * call, which would be costly (once per message),
                         * set up an intervalled timer that checks a timestamp
                         * (that is updated on ..poll()).
                         * The timer interval is 2 hz. */
                        rd_kafka_timer_start(
                            &rkcg->rkcg_rk->rk_timers,
                            &rkcg->rkcg_max_poll_interval_tmr,
                            500 * 1000ll /* 500ms */,
                            rd_kafka_cgrp_max_poll_interval_check_tmr_cb, rkcg);
                }
        }

        return NULL;
}
+
+
+
+/**
+ * @brief Construct a typed map from list \p rktparlist with key corresponding
+ * to each element in the list and value NULL.
+ *
+ * @remark \p rktparlist may be NULL.
+ */
+static map_toppar_member_info_t *rd_kafka_toppar_list_to_toppar_member_info_map(
+ rd_kafka_topic_partition_list_t *rktparlist) {
+ map_toppar_member_info_t *map = rd_calloc(1, sizeof(*map));
+ const rd_kafka_topic_partition_t *rktpar;
+
+ RD_MAP_INIT(map, rktparlist ? rktparlist->cnt : 0,
+ rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free,
+ PartitionMemberInfo_free);
+
+ if (!rktparlist)
+ return map;
+
+ RD_KAFKA_TPLIST_FOREACH(rktpar, rktparlist)
+ RD_MAP_SET(map, rd_kafka_topic_partition_copy(rktpar),
+ PartitionMemberInfo_new(NULL, rd_false));
+
+ return map;
+}
+
+
+/**
+ * @brief Construct a toppar list from map \p map with elements corresponding
+ * to the keys of \p map.
+ */
+static rd_kafka_topic_partition_list_t *
+rd_kafka_toppar_member_info_map_to_list(map_toppar_member_info_t *map) {
+ const rd_kafka_topic_partition_t *k;
+ rd_kafka_topic_partition_list_t *list =
+ rd_kafka_topic_partition_list_new((int)RD_MAP_CNT(map));
+
+ RD_MAP_FOREACH_KEY(k, map) {
+ rd_kafka_topic_partition_list_add(list, k->topic, k->partition);
+ }
+
+ return list;
+}
+
+
/**
 * @brief Handle a rebalance-triggered partition assignment
 *        (COOPERATIVE case).
 *
 * Diffs the new assignment against the current group assignment and
 * emits incremental revoke and/or assign rebalance ops.
 */
static void rd_kafka_cgrp_handle_assignment_cooperative(
    rd_kafka_cgrp_t *rkcg,
    rd_kafka_topic_partition_list_t *assignment) {
        map_toppar_member_info_t *new_assignment_set;
        map_toppar_member_info_t *old_assignment_set;
        map_toppar_member_info_t *newly_added_set;
        map_toppar_member_info_t *revoked_set;
        rd_kafka_topic_partition_list_t *newly_added;
        rd_kafka_topic_partition_list_t *revoked;

        new_assignment_set =
            rd_kafka_toppar_list_to_toppar_member_info_map(assignment);

        old_assignment_set = rd_kafka_toppar_list_to_toppar_member_info_map(
            rkcg->rkcg_group_assignment);

        /* newly added = new - old; revoked = old - new */
        newly_added_set = rd_kafka_member_partitions_subtract(
            new_assignment_set, old_assignment_set);
        revoked_set     = rd_kafka_member_partitions_subtract(old_assignment_set,
                                                              new_assignment_set);

        newly_added = rd_kafka_toppar_member_info_map_to_list(newly_added_set);
        revoked     = rd_kafka_toppar_member_info_map_to_list(revoked_set);

        rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "COOPASSIGN",
                     "Group \"%s\": incremental assignment: %d newly added, "
                     "%d revoked partitions based on assignment of %d "
                     "partitions",
                     rkcg->rkcg_group_id->str, newly_added->cnt, revoked->cnt,
                     assignment->cnt);

        if (revoked->cnt > 0) {
                /* Setting rkcg_incr_assignment causes a follow on incremental
                 * assign rebalance op after completion of this incremental
                 * unassign op. */

                /* Ownership of newly_added is transferred to the cgrp;
                 * NULL it so it is not destroyed below. */
                rkcg->rkcg_rebalance_incr_assignment = newly_added;
                newly_added                          = NULL;

                rd_kafka_rebalance_op_incr(rkcg,
                                           RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                                           revoked, rd_false /*no rejoin
                                           following unassign*/
                                           ,
                                           "sync group revoke");

        } else {
                /* There are no revoked partitions - trigger the assign
                 * rebalance op, and flag that the group does not need
                 * to be re-joined */

                rd_kafka_rebalance_op_incr(
                    rkcg, RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, newly_added,
                    rd_false /*no rejoin following assign*/,
                    "sync group assign");
        }

        if (newly_added)
                rd_kafka_topic_partition_list_destroy(newly_added);
        rd_kafka_topic_partition_list_destroy(revoked);
        RD_MAP_DESTROY_AND_FREE(revoked_set);
        RD_MAP_DESTROY_AND_FREE(newly_added_set);
        RD_MAP_DESTROY_AND_FREE(old_assignment_set);
        RD_MAP_DESTROY_AND_FREE(new_assignment_set);
}
+
+
/**
 * @brief Sets or clears the group's partition assignment for our consumer.
 *
 * Will replace the current group assignment, if any.
 *
 * @param partitions New assignment (copied), or NULL to clear.
 */
static void rd_kafka_cgrp_group_assignment_set(
    rd_kafka_cgrp_t *rkcg,
    const rd_kafka_topic_partition_list_t *partitions) {

        /* Destroy any previous assignment before installing the new one. */
        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_group_assignment);

        if (partitions) {
                rkcg->rkcg_group_assignment =
                    rd_kafka_topic_partition_list_copy(partitions);
                rd_kafka_topic_partition_list_sort_by_topic(
                    rkcg->rkcg_group_assignment);
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": setting group assignment to %d "
                             "partition(s)",
                             rkcg->rkcg_group_id->str,
                             rkcg->rkcg_group_assignment->cnt);

        } else {
                rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
                             "Group \"%s\": clearing group assignment",
                             rkcg->rkcg_group_id->str);
                rkcg->rkcg_group_assignment = NULL;
        }

        /* Update the statistics counter under lock. */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size =
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                    rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                    rkcg->rkcg_group_assignment);
}
+
+
/**
 * @brief Adds or removes \p partitions from the current group assignment.
 *
 * @param add Whether to add or remove the partitions.
 *
 * @remark The added partitions must not already be on the group assignment,
 *         and the removed partitions must be on the group assignment.
 *         These invariants are enforced with asserts.
 *
 * To be used with incremental rebalancing.
 *
 */
static void rd_kafka_cgrp_group_assignment_modify(
    rd_kafka_cgrp_t *rkcg,
    rd_bool_t add,
    const rd_kafka_topic_partition_list_t *partitions) {
        const rd_kafka_topic_partition_t *rktpar;
        int precnt;
        rd_kafka_dbg(
            rkcg->rkcg_rk, CGRP, "ASSIGNMENT",
            "Group \"%s\": %d partition(s) being %s group assignment "
            "of %d partition(s)",
            rkcg->rkcg_group_id->str, partitions->cnt,
            add ? "added to" : "removed from",
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0);

        if (partitions == rkcg->rkcg_group_assignment) {
                /* \p partitions is the actual assignment, which
                 * must mean it is all to be removed.
                 * Short-cut directly to set(NULL). */
                rd_assert(!add);
                rd_kafka_cgrp_group_assignment_set(rkcg, NULL);
                return;
        }

        if (add && (!rkcg->rkcg_group_assignment ||
                    rkcg->rkcg_group_assignment->cnt == 0)) {
                /* Adding to an empty assignment is a set operation. */
                rd_kafka_cgrp_group_assignment_set(rkcg, partitions);
                return;
        }

        if (!add) {
                /* Removing from an empty assignment is illegal. */
                rd_assert(rkcg->rkcg_group_assignment != NULL &&
                          rkcg->rkcg_group_assignment->cnt > 0);
        }


        /* Remember the pre-modification count for the invariant checks
         * below. */
        precnt = rkcg->rkcg_group_assignment->cnt;
        RD_KAFKA_TPLIST_FOREACH(rktpar, partitions) {
                int idx;

                idx = rd_kafka_topic_partition_list_find_idx(
                    rkcg->rkcg_group_assignment, rktpar->topic,
                    rktpar->partition);

                if (add) {
                        /* Partition must not already be assigned. */
                        rd_assert(idx == -1);

                        rd_kafka_topic_partition_list_add_copy(
                            rkcg->rkcg_group_assignment, rktpar);

                } else {
                        /* Partition must currently be assigned. */
                        rd_assert(idx != -1);

                        rd_kafka_topic_partition_list_del_by_idx(
                            rkcg->rkcg_group_assignment, idx);
                }
        }

        /* Verify the expected post-modification count. */
        if (add)
                rd_assert(precnt + partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);
        else
                rd_assert(precnt - partitions->cnt ==
                          rkcg->rkcg_group_assignment->cnt);

        if (rkcg->rkcg_group_assignment->cnt == 0) {
                /* An empty assignment is represented as NULL. */
                rd_kafka_topic_partition_list_destroy(
                    rkcg->rkcg_group_assignment);
                rkcg->rkcg_group_assignment = NULL;

        } else if (add)
                rd_kafka_topic_partition_list_sort_by_topic(
                    rkcg->rkcg_group_assignment);

        /* Update the statistics counter under lock. */
        rd_kafka_wrlock(rkcg->rkcg_rk);
        rkcg->rkcg_c.assignment_size =
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0;
        rd_kafka_wrunlock(rkcg->rkcg_rk);

        if (rkcg->rkcg_group_assignment)
                rd_kafka_topic_partition_list_log(
                    rkcg->rkcg_rk, "GRPASSIGNMENT", RD_KAFKA_DBG_CGRP,
                    rkcg->rkcg_group_assignment);
}
+
+
+/**
+ * @brief Handle a rebalance-triggered partition assignment.
+ *
+ * If a rebalance_cb has been registered we enqueue an op for the app
+ * and let the app perform the actual assign() call. Otherwise we
+ * assign() directly from here.
+ *
+ * This provides the most flexibility, allowing the app to perform any
+ * operation it seem fit (e.g., offset writes or reads) before actually
+ * updating the assign():ment.
+ */
+static void
+rd_kafka_cgrp_handle_assignment(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *assignment) {
+
+ if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {
+ rd_kafka_cgrp_handle_assignment_cooperative(rkcg, assignment);
+ } else {
+
+ rd_kafka_rebalance_op(rkcg,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS,
+ assignment, "new assignment");
+ }
+}
+
+
+/**
+ * Clean up any group-leader related resources.
+ *
+ * Locality: cgrp thread
+ */
+static void rd_kafka_cgrp_group_leader_reset(rd_kafka_cgrp_t *rkcg,
+ const char *reason) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "GRPLEADER",
+ "Group \"%.*s\": resetting group leader info: %s",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), reason);
+
+ if (rkcg->rkcg_group_leader.members) {
+ int i;
+
+ for (i = 0; i < rkcg->rkcg_group_leader.member_cnt; i++)
+ rd_kafka_group_member_clear(
+ &rkcg->rkcg_group_leader.members[i]);
+ rkcg->rkcg_group_leader.member_cnt = 0;
+ rd_free(rkcg->rkcg_group_leader.members);
+ rkcg->rkcg_group_leader.members = NULL;
+ }
+}
+
+
/**
 * @brief React to a RD_KAFKA_RESP_ERR_REBALANCE_IN_PROGRESS broker response.
 *
 * EAGER protocol: revoke the current assignment and rejoin.
 * COOPERATIVE protocol: simply rejoin (unless already rebalancing).
 */
static void rd_kafka_cgrp_group_is_rebalancing(rd_kafka_cgrp_t *rkcg) {

        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_EAGER) {
                rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_false /*lost*/,
                                                      rd_false /*initiating*/,
                                                      "rebalance in progress");
                return;
        }


        /* In the COOPERATIVE case, simply rejoin the group
         * - partitions are unassigned on SyncGroup response,
         *   not prior to JoinGroup as with the EAGER case. */

        if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
                /* A rebalance workflow is already underway; starting
                 * another one would only interfere with it. */
                rd_kafka_dbg(
                    rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE",
                    "Group \"%.*s\": skipping "
                    "COOPERATIVE rebalance in state %s "
                    "(join-state %s)%s%s%s",
                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                    rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                    RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)
                        ? " (awaiting assign call)"
                        : "",
                    (rkcg->rkcg_rebalance_incr_assignment != NULL)
                        ? " (incremental assignment pending)"
                        : "",
                    rkcg->rkcg_rebalance_rejoin ? " (rebalance rejoin)" : "");
                return;
        }

        rd_kafka_cgrp_rejoin(rkcg, "Group is rebalancing");
}
+
+
+
/**
 * @brief Triggers the application rebalance callback if required to
 *        revoke partitions, and transition to INIT state for (eventual)
 *        rejoin. Does nothing if a rebalance workflow is already in
 *        progress.
 *
 * @param assignment_lost Whether the assignment is to be marked lost.
 * @param initiating Whether this client initiates the rebalance
 *                   (only affects log wording).
 * @param reason Human-readable reason for logs/statistics.
 */
static void rd_kafka_cgrp_revoke_all_rejoin_maybe(rd_kafka_cgrp_t *rkcg,
                                                  rd_bool_t assignment_lost,
                                                  rd_bool_t initiating,
                                                  const char *reason) {
        if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
                /* Already rebalancing: log and bail out. */
                rd_kafka_dbg(
                    rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE",
                    "Group \"%.*s\": rebalance (%s) "
                    "already in progress, skipping in state %s "
                    "(join-state %s) with %d assigned partition(s)%s%s%s: "
                    "%s",
                    RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                    rd_kafka_rebalance_protocol2str(
                        rd_kafka_cgrp_rebalance_protocol(rkcg)),
                    rd_kafka_cgrp_state_names[rkcg->rkcg_state],
                    rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
                    rkcg->rkcg_group_assignment
                        ? rkcg->rkcg_group_assignment->cnt
                        : 0,
                    assignment_lost ? " (lost)" : "",
                    rkcg->rkcg_rebalance_incr_assignment
                        ? ", incremental assignment in progress"
                        : "",
                    rkcg->rkcg_rebalance_rejoin ? ", rejoin on rebalance" : "",
                    reason);
                return;
        }

        rd_kafka_cgrp_revoke_all_rejoin(rkcg, assignment_lost, initiating,
                                        reason);
}
+
+
/**
 * @brief Triggers the application rebalance callback if required to
 *        revoke partitions, and transition to INIT state for (eventual)
 *        rejoin.
 *
 * @param assignment_lost Whether to mark the assignment lost before
 *                        revoking it.
 * @param initiating Whether this client initiates the rebalance
 *                   (only affects log wording).
 * @param reason Human-readable reason, also recorded in statistics.
 */
static void rd_kafka_cgrp_revoke_all_rejoin(rd_kafka_cgrp_t *rkcg,
                                            rd_bool_t assignment_lost,
                                            rd_bool_t initiating,
                                            const char *reason) {

        rd_kafka_rebalance_protocol_t protocol =
            rd_kafka_cgrp_rebalance_protocol(rkcg);

        rd_bool_t terminating =
            unlikely(rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE);


        rd_kafka_dbg(
            rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REBALANCE",
            "Group \"%.*s\" %s (%s) in state %s (join-state %s) "
            "with %d assigned partition(s)%s: %s",
            RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
            initiating ? "initiating rebalance" : "is rebalancing",
            rd_kafka_rebalance_protocol2str(protocol),
            rd_kafka_cgrp_state_names[rkcg->rkcg_state],
            rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
            rkcg->rkcg_group_assignment ? rkcg->rkcg_group_assignment->cnt : 0,
            assignment_lost ? " (lost)" : "", reason);

        /* Record the reason for exposure in statistics. */
        rd_snprintf(rkcg->rkcg_c.rebalance_reason,
                    sizeof(rkcg->rkcg_c.rebalance_reason), "%s", reason);


        if (protocol == RD_KAFKA_REBALANCE_PROTOCOL_EAGER ||
            protocol == RD_KAFKA_REBALANCE_PROTOCOL_NONE) {
                /* EAGER case (or initial subscribe) - revoke partitions which
                 * will be followed by rejoin, if required. */

                if (assignment_lost)
                        rd_kafka_cgrp_assignment_set_lost(
                            rkcg, "%s: revoking assignment and rejoining",
                            reason);

                /* Schedule application rebalance op if there is an existing
                 * assignment (albeit perhaps empty) and there is no
                 * outstanding rebalance op in progress. */
                if (rkcg->rkcg_group_assignment &&
                    !RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg)) {
                        rd_kafka_rebalance_op(
                            rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                            rkcg->rkcg_group_assignment, reason);
                } else {
                        /* Skip the join backoff */
                        rd_interval_reset(&rkcg->rkcg_join_intvl);

                        rd_kafka_cgrp_rejoin(rkcg, "%s", reason);
                }

                return;
        }


        /* COOPERATIVE case. */

        /* All partitions should never be revoked unless terminating, leaving
         * the group, or on assignment lost. Another scenario represents a
         * logic error. Fail fast in this case. */
        if (!(terminating || assignment_lost ||
              (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE))) {
                rd_kafka_log(rkcg->rkcg_rk, LOG_ERR, "REBALANCE",
                             "Group \"%s\": unexpected instruction to revoke "
                             "current assignment and rebalance "
                             "(terminating=%d, assignment_lost=%d, "
                             "LEAVE_ON_UNASSIGN_DONE=%d)",
                             rkcg->rkcg_group_id->str, terminating,
                             assignment_lost,
                             (rkcg->rkcg_flags &
                              RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE));
                rd_dassert(!*"BUG: unexpected instruction to revoke "
                            "current assignment and rebalance");
        }

        if (rkcg->rkcg_group_assignment &&
            rkcg->rkcg_group_assignment->cnt > 0) {
                if (assignment_lost)
                        rd_kafka_cgrp_assignment_set_lost(
                            rkcg,
                            "%s: revoking incremental assignment "
                            "and rejoining",
                            reason);

                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP,
                             "REBALANCE",
                             "Group \"%.*s\": revoking "
                             "all %d partition(s)%s%s",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                             rkcg->rkcg_group_assignment->cnt,
                             terminating ? " (terminating)" : "",
                             assignment_lost ? " (assignment lost)" : "");

                /* Incrementally revoke everything; only rejoin afterwards
                 * if we are not terminating. */
                rd_kafka_rebalance_op_incr(
                    rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                    rkcg->rkcg_group_assignment,
                    terminating ? rd_false : rd_true /*rejoin*/, reason);

                return;
        }

        if (terminating) {
                /* If terminating, then don't rejoin group. */
                rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP,
                             "REBALANCE",
                             "Group \"%.*s\": consumer is "
                             "terminating, skipping rejoin",
                             RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
                return;
        }

        rd_kafka_cgrp_rejoin(rkcg, "Current assignment is empty");
}
+
+
/**
 * @brief `max.poll.interval.ms` enforcement check timer.
 *
 * Raises a __MAX_POLL_EXCEEDED consumer error, leaves the group
 * (unless a static member) and triggers a lost-assignment rebalance
 * when the application has not polled within the allowed interval.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void
rd_kafka_cgrp_max_poll_interval_check_tmr_cb(rd_kafka_timers_t *rkts,
                                             void *arg) {
        rd_kafka_cgrp_t *rkcg = arg;
        rd_kafka_t *rk        = rkcg->rkcg_rk;
        int exceeded;

        exceeded = rd_kafka_max_poll_exceeded(rk);

        if (likely(!exceeded))
                return;

        rd_kafka_log(rk, LOG_WARNING, "MAXPOLL",
                     "Application maximum poll interval (%dms) "
                     "exceeded by %dms "
                     "(adjust max.poll.interval.ms for "
                     "long-running message processing): "
                     "leaving group",
                     rk->rk_conf.max_poll_interval_ms, exceeded);

        rd_kafka_consumer_err(rkcg->rkcg_q, RD_KAFKA_NODEID_UA,
                              RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED, 0, NULL,
                              NULL, RD_KAFKA_OFFSET_INVALID,
                              "Application maximum poll interval (%dms) "
                              "exceeded by %dms",
                              rk->rk_conf.max_poll_interval_ms, exceeded);

        rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_MAX_POLL_EXCEEDED;

        /* Stop this timer; it is restarted when a new assignment is set. */
        rd_kafka_timer_stop(rkts, &rkcg->rkcg_max_poll_interval_tmr,
                            1 /*lock*/);

        /* Leave the group before calling rebalance since the standard leave
         * will be triggered first after the rebalance callback has been served.
         * But since the application is blocked still doing processing
         * that leave will be further delayed.
         *
         * KIP-345: static group members should continue to respect
         * `max.poll.interval.ms` but should not send a LeaveGroupRequest.
         */
        if (!RD_KAFKA_CGRP_IS_STATIC_MEMBER(rkcg))
                rd_kafka_cgrp_leave(rkcg);

        /* Timing out or leaving the group invalidates the member id, reset it
         * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
        rd_kafka_cgrp_set_member_id(rkcg, "");

        /* Trigger rebalance */
        rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/,
                                              rd_true /*initiating*/,
                                              "max.poll.interval.ms exceeded");
}
+
+
+/**
+ * @brief Generate consumer errors for each topic in the list.
+ *
+ * Also replaces the list of last reported topic errors so that repeated
+ * errors are silenced.
+ *
+ * @param errored Errored topics.
+ * @param error_prefix Error message prefix.
+ *
+ * @remark Assumes ownership of \p errored.
+ */
+static void rd_kafka_propagate_consumer_topic_errors(
+ rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *errored,
+ const char *error_prefix) {
+ int i;
+
+ for (i = 0; i < errored->cnt; i++) {
+ rd_kafka_topic_partition_t *topic = &errored->elems[i];
+ rd_kafka_topic_partition_t *prev;
+
+ rd_assert(topic->err);
+
+ /* Normalize error codes, unknown topic may be
+ * reported by the broker, or the lack of a topic in
+ * metadata response is figured out by the client.
+ * Make sure the application only sees one error code
+ * for both these cases. */
+ if (topic->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+ topic->err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+
+ /* Check if this topic errored previously */
+ prev = rd_kafka_topic_partition_list_find(
+ rkcg->rkcg_errored_topics, topic->topic,
+ RD_KAFKA_PARTITION_UA);
+
+ if (prev && prev->err == topic->err)
+ continue; /* This topic already reported same error */
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_TOPIC,
+ "TOPICERR", "%s: %s: %s", error_prefix,
+ topic->topic, rd_kafka_err2str(topic->err));
+
+ /* Send consumer error to application */
+ rd_kafka_consumer_err(
+ rkcg->rkcg_q, RD_KAFKA_NODEID_UA, topic->err, 0,
+ topic->topic, NULL, RD_KAFKA_OFFSET_INVALID, "%s: %s: %s",
+ error_prefix, topic->topic, rd_kafka_err2str(topic->err));
+ }
+
+ rd_kafka_topic_partition_list_destroy(rkcg->rkcg_errored_topics);
+ rkcg->rkcg_errored_topics = errored;
+}
+
+
+/**
+ * @brief Work out the topics currently subscribed to that do not
+ * match any pattern in \p subscription.
+ */
+static rd_kafka_topic_partition_list_t *rd_kafka_cgrp_get_unsubscribing_topics(
+ rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *subscription) {
+ int i;
+ rd_kafka_topic_partition_list_t *result;
+
+ result = rd_kafka_topic_partition_list_new(
+ rkcg->rkcg_subscribed_topics->rl_cnt);
+
+ /* TODO: Something that isn't O(N*M) */
+ for (i = 0; i < rkcg->rkcg_subscribed_topics->rl_cnt; i++) {
+ int j;
+ const char *topic =
+ ((rd_kafka_topic_info_t *)
+ rkcg->rkcg_subscribed_topics->rl_elems[i])
+ ->topic;
+
+ for (j = 0; j < subscription->cnt; j++) {
+ const char *pattern = subscription->elems[j].topic;
+ if (rd_kafka_topic_match(rkcg->rkcg_rk, pattern,
+ topic)) {
+ break;
+ }
+ }
+
+ if (j == subscription->cnt)
+ rd_kafka_topic_partition_list_add(
+ result, topic, RD_KAFKA_PARTITION_UA);
+ }
+
+ if (result->cnt == 0) {
+ rd_kafka_topic_partition_list_destroy(result);
+ return NULL;
+ }
+
+ return result;
+}
+
+
+/**
+ * @brief Determine the partitions to revoke, given the topics being
+ * unassigned.
+ */
+static rd_kafka_topic_partition_list_t *
+rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
+ rd_kafka_cgrp_t *rkcg,
+ const rd_kafka_topic_partition_list_t *unsubscribing) {
+ rd_kafka_topic_partition_list_t *revoking;
+ const rd_kafka_topic_partition_t *rktpar;
+
+ if (!unsubscribing)
+ return NULL;
+
+ if (!rkcg->rkcg_group_assignment ||
+ rkcg->rkcg_group_assignment->cnt == 0)
+ return NULL;
+
+ revoking =
+ rd_kafka_topic_partition_list_new(rkcg->rkcg_group_assignment->cnt);
+
+ /* TODO: Something that isn't O(N*M). */
+ RD_KAFKA_TPLIST_FOREACH(rktpar, unsubscribing) {
+ const rd_kafka_topic_partition_t *assigned;
+
+ RD_KAFKA_TPLIST_FOREACH(assigned, rkcg->rkcg_group_assignment) {
+ if (!strcmp(assigned->topic, rktpar->topic)) {
+ rd_kafka_topic_partition_list_add(
+ revoking, assigned->topic,
+ assigned->partition);
+ continue;
+ }
+ }
+ }
+
+ if (revoking->cnt == 0) {
+ rd_kafka_topic_partition_list_destroy(revoking);
+ revoking = NULL;
+ }
+
+ return revoking;
+}
+
+
+/**
+ * @brief Handle a new subscription that is modifying an existing subscription
+ * in the COOPERATIVE case.
+ *
+ * @remark Assumes ownership of \p rktparlist.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *rktparlist) {
+ rd_kafka_topic_partition_list_t *unsubscribing_topics;
+ rd_kafka_topic_partition_list_t *revoking;
+ rd_list_t *tinfos;
+ rd_kafka_topic_partition_list_t *errored;
+ int metadata_age;
+ int old_cnt = rkcg->rkcg_subscription->cnt;
+
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
+
+ if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
+
+ /* Topics in rkcg_subscribed_topics that don't match any pattern in
+ the new subscription. */
+ unsubscribing_topics =
+ rd_kafka_cgrp_get_unsubscribing_topics(rkcg, rktparlist);
+
+ /* Currently assigned topic partitions that are no longer desired. */
+ revoking = rd_kafka_cgrp_calculate_subscribe_revoking_partitions(
+ rkcg, unsubscribing_topics);
+
+ rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
+ rkcg->rkcg_subscription = rktparlist;
+
+ if (rd_kafka_cgrp_metadata_refresh(rkcg, &metadata_age,
+ "modify subscription") == 1) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER,
+ "MODSUB",
+ "Group \"%.*s\": postponing join until "
+ "up-to-date metadata is available",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id));
+
+ rd_assert(
+ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT ||
+ /* Possible via rd_kafka_cgrp_modify_subscription */
+ rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY);
+
+ rd_kafka_cgrp_set_join_state(
+ rkcg, RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
+
+
+ /* Revoke/join will occur after metadata refresh completes */
+ if (revoking)
+ rd_kafka_topic_partition_list_destroy(revoking);
+ if (unsubscribing_topics)
+ rd_kafka_topic_partition_list_destroy(
+ unsubscribing_topics);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
+ "Group \"%.*s\": modifying subscription of size %d to "
+ "new subscription of size %d, removing %d topic(s), "
+ "revoking %d partition(s) (join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), old_cnt,
+ rkcg->rkcg_subscription->cnt,
+ unsubscribing_topics ? unsubscribing_topics->cnt : 0,
+ revoking ? revoking->cnt : 0,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ if (unsubscribing_topics)
+ rd_kafka_topic_partition_list_destroy(unsubscribing_topics);
+
+ /* Create a list of the topics in metadata that matches the new
+ * subscription */
+ tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
+ (void *)rd_kafka_topic_info_destroy);
+
+ /* Unmatched topics will be added to the errored list. */
+ errored = rd_kafka_topic_partition_list_new(0);
+
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
+ rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
+ rkcg->rkcg_subscription, errored);
+ else
+ rd_kafka_metadata_topic_filter(
+ rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);
+
+ /* Propagate consumer errors for any non-existent or errored topics.
+ * The function takes ownership of errored. */
+ rd_kafka_propagate_consumer_topic_errors(
+ rkcg, errored, "Subscribed topic not available");
+
+ if (rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos) && !revoking) {
+ rd_kafka_cgrp_rejoin(rkcg, "Subscription modified");
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ if (revoking) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CONSUMER | RD_KAFKA_DBG_CGRP,
+ "REBALANCE",
+ "Group \"%.*s\" revoking "
+ "%d of %d partition(s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ revoking->cnt, rkcg->rkcg_group_assignment->cnt);
+
+ rd_kafka_rebalance_op_incr(
+ rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, revoking,
+ rd_true /*rejoin*/, "subscribe");
+
+ rd_kafka_topic_partition_list_destroy(revoking);
+ }
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * Remove existing topic subscription.
+ */
+static rd_kafka_resp_err_t rd_kafka_cgrp_unsubscribe(rd_kafka_cgrp_t *rkcg,
+ rd_bool_t leave_group) {
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "UNSUBSCRIBE",
+ "Group \"%.*s\": unsubscribe from current %ssubscription "
+ "of size %d (leave group=%s, has joined=%s, %s, "
+ "join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rkcg->rkcg_subscription ? "" : "unset ",
+ rkcg->rkcg_subscription ? rkcg->rkcg_subscription->cnt : 0,
+ RD_STR_ToF(leave_group),
+ RD_STR_ToF(RD_KAFKA_CGRP_HAS_JOINED(rkcg)),
+ rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str : "n/a",
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ rd_kafka_timer_stop(&rkcg->rkcg_rk->rk_timers,
+ &rkcg->rkcg_max_poll_interval_tmr, 1 /*lock*/);
+
+ if (rkcg->rkcg_subscription) {
+ rd_kafka_topic_partition_list_destroy(rkcg->rkcg_subscription);
+ rkcg->rkcg_subscription = NULL;
+ }
+
+ rd_kafka_cgrp_update_subscribed_topics(rkcg, NULL);
+
+ /*
+ * Clean-up group leader duties, if any.
+ */
+ rd_kafka_cgrp_group_leader_reset(rkcg, "unsubscribe");
+
+ if (leave_group && RD_KAFKA_CGRP_HAS_JOINED(rkcg))
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_LEAVE_ON_UNASSIGN_DONE;
+
+ /* FIXME: Why are we only revoking if !assignment_lost ? */
+ if (!rd_kafka_cgrp_assignment_is_lost(rkcg))
+ rd_kafka_cgrp_revoke_all_rejoin(rkcg, rd_false /*not lost*/,
+ rd_true /*initiating*/,
+ "unsubscribe");
+
+ rkcg->rkcg_flags &= ~(RD_KAFKA_CGRP_F_SUBSCRIPTION |
+ RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+/**
+ * Set new atomic topic subscription.
+ */
+static rd_kafka_resp_err_t
+rd_kafka_cgrp_subscribe(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_topic_partition_list_t *rktparlist) {
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
+ "Group \"%.*s\": subscribe to new %ssubscription "
+ "of %d topics (join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rktparlist ? "" : "unset ",
+ rktparlist ? rktparlist->cnt : 0,
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ if (rkcg->rkcg_rk->rk_conf.enabled_assignor_cnt == 0)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ /* If the consumer has raised a fatal error treat all subscribes as
+ unsubscribe */
+ if (rd_kafka_fatal_error_code(rkcg->rkcg_rk)) {
+ if (rkcg->rkcg_subscription)
+ rd_kafka_cgrp_unsubscribe(rkcg,
+ rd_true /*leave group*/);
+ return RD_KAFKA_RESP_ERR__FATAL;
+ }
+
+ /* Clear any existing postponed subscribe. */
+ if (rkcg->rkcg_next_subscription)
+ rd_kafka_topic_partition_list_destroy_free(
+ rkcg->rkcg_next_subscription);
+ rkcg->rkcg_next_subscription = NULL;
+ rkcg->rkcg_next_unsubscribe = rd_false;
+
+ if (RD_KAFKA_CGRP_REBALANCING(rkcg)) {
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER, "SUBSCRIBE",
+ "Group \"%.*s\": postponing "
+ "subscribe until previous rebalance "
+ "completes (join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ if (!rktparlist)
+ rkcg->rkcg_next_unsubscribe = rd_true;
+ else
+ rkcg->rkcg_next_subscription = rktparlist;
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
+ rktparlist && rkcg->rkcg_subscription)
+ return rd_kafka_cgrp_modify_subscription(rkcg, rktparlist);
+
+ /* Remove existing subscription first */
+ if (rkcg->rkcg_subscription)
+ rd_kafka_cgrp_unsubscribe(
+ rkcg,
+ rktparlist
+ ? rd_false /* don't leave group if new subscription */
+ : rd_true /* leave group if no new subscription */);
+
+ if (!rktparlist)
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_SUBSCRIPTION;
+
+ if (rd_kafka_topic_partition_list_regex_cnt(rktparlist) > 0)
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION;
+
+ rkcg->rkcg_subscription = rktparlist;
+
+ rd_kafka_cgrp_join(rkcg);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+
+
+/**
+ * Same as cgrp_terminate() but called from the cgrp/main thread upon receiving
+ * the op 'rko' from cgrp_terminate().
+ *
+ * NOTE: Takes ownership of 'rko'
+ *
+ * Locality: main thread
+ */
+void rd_kafka_cgrp_terminate0(rd_kafka_cgrp_t *rkcg, rd_kafka_op_t *rko) {
+
+ rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTERM",
+ "Terminating group \"%.*s\" in state %s "
+ "with %d partition(s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_list_cnt(&rkcg->rkcg_toppars));
+
+ if (unlikely(rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_TERM ||
+ (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) ||
+ rkcg->rkcg_reply_rko != NULL)) {
+ /* Already terminating or handling a previous terminate */
+ if (rko) {
+ rd_kafka_q_t *rkq = rko->rko_replyq.q;
+ rko->rko_replyq.q = NULL;
+ rd_kafka_consumer_err(
+ rkq, RD_KAFKA_NODEID_UA,
+ RD_KAFKA_RESP_ERR__IN_PROGRESS,
+ rko->rko_replyq.version, NULL, NULL,
+ RD_KAFKA_OFFSET_INVALID, "Group is %s",
+ rkcg->rkcg_reply_rko ? "terminating"
+ : "terminated");
+ rd_kafka_q_destroy(rkq);
+ rd_kafka_op_destroy(rko);
+ }
+ return;
+ }
+
+ /* Mark for stopping, the actual state transition
+ * is performed when all toppars have left. */
+ rkcg->rkcg_flags |= RD_KAFKA_CGRP_F_TERMINATE;
+ rkcg->rkcg_ts_terminate = rd_clock();
+ rkcg->rkcg_reply_rko = rko;
+
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION)
+ rd_kafka_cgrp_unsubscribe(
+ rkcg,
+ /* Leave group if this is a controlled shutdown */
+ !rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk));
+
+ /* Reset the wait-for-LeaveGroup flag if there is an outstanding
+ * LeaveGroupRequest being waited on (from a prior unsubscribe), but
+ * the destroy flags have NO_CONSUMER_CLOSE set, which calls
+ * for immediate termination. */
+ if (rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
+ rkcg->rkcg_flags &= ~RD_KAFKA_CGRP_F_WAIT_LEAVE;
+
+ /* If there's an oustanding rebalance which has not yet been
+ * served by the application it will be served from consumer_close().
+ * If the instance is being terminated with NO_CONSUMER_CLOSE we
+ * trigger unassign directly to avoid stalling on rebalance callback
+ * queues that are no longer served by the application. */
+ if (!RD_KAFKA_CGRP_WAIT_ASSIGN_CALL(rkcg) ||
+ rd_kafka_destroy_flags_no_consumer_close(rkcg->rkcg_rk))
+ rd_kafka_cgrp_unassign(rkcg);
+
+ /* Serve assignment so it can start to decommission */
+ rd_kafka_assignment_serve(rkcg->rkcg_rk);
+
+ /* Try to terminate right away if all preconditions are met. */
+ rd_kafka_cgrp_try_terminate(rkcg);
+}
+
+
+/**
+ * Terminate and decommission a cgrp asynchronously.
+ *
+ * Locality: any thread
+ */
+void rd_kafka_cgrp_terminate(rd_kafka_cgrp_t *rkcg, rd_kafka_replyq_t replyq) {
+ rd_kafka_assert(NULL, !thrd_is_current(rkcg->rkcg_rk->rk_thread));
+ rd_kafka_cgrp_op(rkcg, NULL, replyq, RD_KAFKA_OP_TERMINATE, 0);
+}
+
+
/**
 * @brief State for rd_kafka_op_offset_commit_timeout_check() while scanning
 *        a queue for expired OFFSET_COMMIT ops.
 */
struct _op_timeout_offset_commit {
        rd_ts_t now;       /**< Scan timestamp: ops whose ts_timeout is at or
                            *   before this are considered expired. */
        rd_kafka_t *rk;    /**< Client instance (not read by the filter
                            *   callback itself). */
        rd_list_t expired; /**< Accumulates expired ops for handling after
                            *   the scan, avoiding recursive queue locking. */
};
+
+/**
+ * q_filter callback for expiring OFFSET_COMMIT timeouts.
+ */
+static int rd_kafka_op_offset_commit_timeout_check(rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ void *opaque) {
+ struct _op_timeout_offset_commit *state =
+ (struct _op_timeout_offset_commit *)opaque;
+
+ if (likely(rko->rko_type != RD_KAFKA_OP_OFFSET_COMMIT ||
+ rko->rko_u.offset_commit.ts_timeout == 0 ||
+ rko->rko_u.offset_commit.ts_timeout > state->now)) {
+ return 0;
+ }
+
+ rd_kafka_q_deq0(rkq, rko);
+
+ /* Add to temporary list to avoid recursive
+ * locking of rkcg_wait_coord_q. */
+ rd_list_add(&state->expired, rko);
+ return 1;
+}
+
+
+/**
+ * Scan for various timeouts.
+ */
+static void rd_kafka_cgrp_timeout_scan(rd_kafka_cgrp_t *rkcg, rd_ts_t now) {
+ struct _op_timeout_offset_commit ofc_state;
+ int i, cnt = 0;
+ rd_kafka_op_t *rko;
+
+ ofc_state.now = now;
+ ofc_state.rk = rkcg->rkcg_rk;
+ rd_list_init(&ofc_state.expired, 0, NULL);
+
+ cnt += rd_kafka_q_apply(rkcg->rkcg_wait_coord_q,
+ rd_kafka_op_offset_commit_timeout_check,
+ &ofc_state);
+
+ RD_LIST_FOREACH(rko, &ofc_state.expired, i)
+ rd_kafka_cgrp_op_handle_OffsetCommit(rkcg->rkcg_rk, NULL,
+ RD_KAFKA_RESP_ERR__WAIT_COORD,
+ NULL, NULL, rko);
+
+ rd_list_destroy(&ofc_state.expired);
+
+ if (cnt > 0)
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "CGRPTIMEOUT",
+ "Group \"%.*s\": timed out %d op(s), %d remain",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id), cnt,
+ rd_kafka_q_len(rkcg->rkcg_wait_coord_q));
+}
+
+
+/**
+ * @brief Handle an assign op.
+ * @locality rdkafka main thread
+ * @locks none
+ */
+static void rd_kafka_cgrp_handle_assign_op(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_op_t *rko) {
+ rd_kafka_error_t *error = NULL;
+
+ if (rd_kafka_fatal_error_code(rkcg->rkcg_rk) ||
+ rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE) {
+ /* Treat all assignments as unassign when a fatal error is
+ * raised or the cgrp is terminating. */
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP | RD_KAFKA_DBG_CONSUMER,
+ "ASSIGN",
+ "Group \"%s\": Consumer %s: "
+ "treating assign as unassign",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_fatal_error_code(rkcg->rkcg_rk)
+ ? "has raised a fatal error"
+ : "is terminating");
+
+ if (rko->rko_u.assign.partitions) {
+ rd_kafka_topic_partition_list_destroy(
+ rko->rko_u.assign.partitions);
+ rko->rko_u.assign.partitions = NULL;
+ }
+ rko->rko_u.assign.method = RD_KAFKA_ASSIGN_METHOD_ASSIGN;
+
+ } else if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE &&
+ !(rko->rko_u.assign.method ==
+ RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN ||
+ rko->rko_u.assign.method ==
+ RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN))
+ error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
+ "Changes to the current assignment "
+ "must be made using "
+ "incremental_assign() or "
+ "incremental_unassign() "
+ "when rebalance protocol type is "
+ "COOPERATIVE");
+
+ else if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
+ RD_KAFKA_REBALANCE_PROTOCOL_EAGER &&
+ !(rko->rko_u.assign.method == RD_KAFKA_ASSIGN_METHOD_ASSIGN))
+ error = rd_kafka_error_new(RD_KAFKA_RESP_ERR__STATE,
+ "Changes to the current assignment "
+ "must be made using "
+ "assign() when rebalance "
+ "protocol type is EAGER");
+
+ if (!error) {
+ switch (rko->rko_u.assign.method) {
+ case RD_KAFKA_ASSIGN_METHOD_ASSIGN:
+ /* New atomic assignment (partitions != NULL),
+ * or unassignment (partitions == NULL) */
+ if (rko->rko_u.assign.partitions)
+ error = rd_kafka_cgrp_assign(
+ rkcg, rko->rko_u.assign.partitions);
+ else
+ error = rd_kafka_cgrp_unassign(rkcg);
+ break;
+ case RD_KAFKA_ASSIGN_METHOD_INCR_ASSIGN:
+ error = rd_kafka_cgrp_incremental_assign(
+ rkcg, rko->rko_u.assign.partitions);
+ break;
+ case RD_KAFKA_ASSIGN_METHOD_INCR_UNASSIGN:
+ error = rd_kafka_cgrp_incremental_unassign(
+ rkcg, rko->rko_u.assign.partitions);
+ break;
+ default:
+ RD_NOTREACHED();
+ break;
+ }
+
+ /* If call succeeded serve the assignment */
+ if (!error)
+ rd_kafka_assignment_serve(rkcg->rkcg_rk);
+ }
+
+ if (error) {
+ /* Log error since caller might not check
+ * *assign() return value. */
+ rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "ASSIGN",
+ "Group \"%s\": application *assign() call "
+ "failed: %s",
+ rkcg->rkcg_group_id->str,
+ rd_kafka_error_string(error));
+ }
+
+ rd_kafka_op_error_reply(rko, error);
+}
+
+
+/**
+ * @brief Handle cgrp queue op.
+ * @locality rdkafka main thread
+ * @locks none
+ */
+static rd_kafka_op_res_t rd_kafka_cgrp_op_serve(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko,
+ rd_kafka_q_cb_type_t cb_type,
+ void *opaque) {
+ rd_kafka_cgrp_t *rkcg = opaque;
+ rd_kafka_toppar_t *rktp;
+ rd_kafka_resp_err_t err;
+ const int silent_op = rko->rko_type == RD_KAFKA_OP_RECV_BUF;
+
+ rktp = rko->rko_rktp;
+
+ if (rktp && !silent_op)
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CGRP, "CGRPOP",
+ "Group \"%.*s\" received op %s in state %s "
+ "(join-state %s) for %.*s [%" PRId32 "]",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_op2str(rko->rko_type),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
+ rktp->rktp_partition);
+ else if (!silent_op)
+ rd_kafka_dbg(
+ rkcg->rkcg_rk, CGRP, "CGRPOP",
+ "Group \"%.*s\" received op %s in state %s "
+ "(join-state %s)",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rd_kafka_op2str(rko->rko_type),
+ rd_kafka_cgrp_state_names[rkcg->rkcg_state],
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);
+
+ switch ((int)rko->rko_type) {
+ case RD_KAFKA_OP_NAME:
+ /* Return the currently assigned member id. */
+ if (rkcg->rkcg_member_id)
+ rko->rko_u.name.str =
+ RD_KAFKAP_STR_DUP(rkcg->rkcg_member_id);
+ rd_kafka_op_reply(rko, 0);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_CG_METADATA:
+ /* Return the current consumer group metadata. */
+ rko->rko_u.cg_metadata =
+ rkcg->rkcg_member_id
+ ? rd_kafka_consumer_group_metadata_new_with_genid(
+ rkcg->rkcg_rk->rk_conf.group_id_str,
+ rkcg->rkcg_generation_id,
+ rkcg->rkcg_member_id->str,
+ rkcg->rkcg_rk->rk_conf.group_instance_id)
+ : NULL;
+ rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_OFFSET_FETCH:
+ if (rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP ||
+ (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)) {
+ rd_kafka_op_handle_OffsetFetch(
+ rkcg->rkcg_rk, NULL, RD_KAFKA_RESP_ERR__WAIT_COORD,
+ NULL, NULL, rko);
+ rko = NULL; /* rko freed by handler */
+ break;
+ }
+
+ rd_kafka_OffsetFetchRequest(
+ rkcg->rkcg_coord, rk->rk_group_id->str,
+ rko->rko_u.offset_fetch.partitions,
+ rko->rko_u.offset_fetch.require_stable_offsets,
+ 0, /* Timeout */
+ RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
+ rd_kafka_op_handle_OffsetFetch, rko);
+ rko = NULL; /* rko now owned by request */
+ break;
+
+ case RD_KAFKA_OP_PARTITION_JOIN:
+ rd_kafka_cgrp_partition_add(rkcg, rktp);
+
+ /* If terminating tell the partition to leave */
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_TERMINATE)
+ rd_kafka_toppar_op_fetch_stop(rktp, RD_KAFKA_NO_REPLYQ);
+ break;
+
+ case RD_KAFKA_OP_PARTITION_LEAVE:
+ rd_kafka_cgrp_partition_del(rkcg, rktp);
+ break;
+
+ case RD_KAFKA_OP_OFFSET_COMMIT:
+ /* Trigger offsets commit. */
+ rd_kafka_cgrp_offsets_commit(rkcg, rko,
+ /* only set offsets
+ * if no partitions were
+ * specified. */
+ rko->rko_u.offset_commit.partitions
+ ? 0
+ : 1 /* set_offsets*/,
+ rko->rko_u.offset_commit.reason);
+ rko = NULL; /* rko now owned by request */
+ break;
+
+ case RD_KAFKA_OP_COORD_QUERY:
+ rd_kafka_cgrp_coord_query(
+ rkcg,
+ rko->rko_err ? rd_kafka_err2str(rko->rko_err) : "from op");
+ break;
+
+ case RD_KAFKA_OP_SUBSCRIBE:
+ rd_kafka_app_polled(rk);
+
+ /* New atomic subscription (may be NULL) */
+ err =
+ rd_kafka_cgrp_subscribe(rkcg, rko->rko_u.subscribe.topics);
+
+ if (!err) /* now owned by rkcg */
+ rko->rko_u.subscribe.topics = NULL;
+
+ rd_kafka_op_reply(rko, err);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_ASSIGN:
+ rd_kafka_cgrp_handle_assign_op(rkcg, rko);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_GET_SUBSCRIPTION:
+ if (rkcg->rkcg_next_subscription)
+ rko->rko_u.subscribe.topics =
+ rd_kafka_topic_partition_list_copy(
+ rkcg->rkcg_next_subscription);
+ else if (rkcg->rkcg_next_unsubscribe)
+ rko->rko_u.subscribe.topics = NULL;
+ else if (rkcg->rkcg_subscription)
+ rko->rko_u.subscribe.topics =
+ rd_kafka_topic_partition_list_copy(
+ rkcg->rkcg_subscription);
+ rd_kafka_op_reply(rko, 0);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_GET_ASSIGNMENT:
+ /* This is the consumer assignment, not the group assignment. */
+ rko->rko_u.assign.partitions =
+ rd_kafka_topic_partition_list_copy(
+ rkcg->rkcg_rk->rk_consumer.assignment.all);
+
+ rd_kafka_op_reply(rko, 0);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_GET_REBALANCE_PROTOCOL:
+ rko->rko_u.rebalance_protocol.str =
+ rd_kafka_rebalance_protocol2str(
+ rd_kafka_cgrp_rebalance_protocol(rkcg));
+ rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
+ rko = NULL;
+ break;
+
+ case RD_KAFKA_OP_TERMINATE:
+ rd_kafka_cgrp_terminate0(rkcg, rko);
+ rko = NULL; /* terminate0() takes ownership */
+ break;
+
+ default:
+ rd_kafka_assert(rkcg->rkcg_rk, !*"unknown type");
+ break;
+ }
+
+ if (rko)
+ rd_kafka_op_destroy(rko);
+
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+
+/**
+ * @returns true if the session timeout has expired (due to no successful
+ * Heartbeats in session.timeout.ms) and triggers a rebalance.
+ */
+static rd_bool_t rd_kafka_cgrp_session_timeout_check(rd_kafka_cgrp_t *rkcg,
+ rd_ts_t now) {
+ rd_ts_t delta;
+ char buf[256];
+
+ if (unlikely(!rkcg->rkcg_ts_session_timeout))
+ return rd_true; /* Session has expired */
+
+ delta = now - rkcg->rkcg_ts_session_timeout;
+ if (likely(delta < 0))
+ return rd_false;
+
+ delta += rkcg->rkcg_rk->rk_conf.group_session_timeout_ms * 1000;
+
+ rd_snprintf(buf, sizeof(buf),
+ "Consumer group session timed out (in join-state %s) after "
+ "%" PRId64
+ " ms without a successful response from the "
+ "group coordinator (broker %" PRId32 ", last error was %s)",
+ rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state],
+ delta / 1000, rkcg->rkcg_coord_id,
+ rd_kafka_err2str(rkcg->rkcg_last_heartbeat_err));
+
+ rkcg->rkcg_last_heartbeat_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ rd_kafka_log(rkcg->rkcg_rk, LOG_WARNING, "SESSTMOUT",
+ "%s: revoking assignment and rejoining group", buf);
+
+ /* Prevent further rebalances */
+ rkcg->rkcg_ts_session_timeout = 0;
+
+ /* Timing out invalidates the member id, reset it
+ * now to avoid an ERR_UNKNOWN_MEMBER_ID on the next join. */
+ rd_kafka_cgrp_set_member_id(rkcg, "");
+
+ /* Revoke and rebalance */
+ rd_kafka_cgrp_revoke_all_rejoin_maybe(rkcg, rd_true /*lost*/,
+ rd_true /*initiating*/, buf);
+
+ return rd_true;
+}
+
+
+/**
+ * @brief Apply the next waiting subscribe/unsubscribe, if any.
+ */
+static void rd_kafka_cgrp_apply_next_subscribe(rd_kafka_cgrp_t *rkcg) {
+ rd_assert(rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_INIT);
+
+ if (rkcg->rkcg_next_subscription) {
+ rd_kafka_topic_partition_list_t *next_subscription =
+ rkcg->rkcg_next_subscription;
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
+ "Group \"%s\": invoking waiting postponed "
+ "subscribe",
+ rkcg->rkcg_group_id->str);
+ rkcg->rkcg_next_subscription = NULL;
+ rd_kafka_cgrp_subscribe(rkcg, next_subscription);
+
+ } else if (rkcg->rkcg_next_unsubscribe) {
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "SUBSCRIBE",
+ "Group \"%s\": invoking waiting postponed "
+ "unsubscribe",
+ rkcg->rkcg_group_id->str);
+ rkcg->rkcg_next_unsubscribe = rd_false;
+ rd_kafka_cgrp_unsubscribe(rkcg, rd_true /*Leave*/);
+ }
+}
+
+/**
+ * Client group's join state handling
+ */
+static void rd_kafka_cgrp_join_state_serve(rd_kafka_cgrp_t *rkcg) {
+ rd_ts_t now = rd_clock();
+
+ if (unlikely(rd_kafka_fatal_error_code(rkcg->rkcg_rk)))
+ return;
+
+ switch (rkcg->rkcg_join_state) {
+ case RD_KAFKA_CGRP_JOIN_STATE_INIT:
+ if (unlikely(rd_kafka_cgrp_awaiting_response(rkcg)))
+ break;
+
+ /* If there is a next subscription, apply it. */
+ rd_kafka_cgrp_apply_next_subscribe(rkcg);
+
+ /* If we have a subscription start the join process. */
+ if (!rkcg->rkcg_subscription)
+ break;
+
+ if (rd_interval_immediate(&rkcg->rkcg_join_intvl, 1000 * 1000,
+ now) > 0)
+ rd_kafka_cgrp_join(rkcg);
+ break;
+
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_JOIN:
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA:
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_SYNC:
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_TO_COMPLETE:
+ /* FIXME: I think we might have to send heartbeats in
+ * in WAIT_INCR_UNASSIGN, yes-no? */
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_INCR_UNASSIGN_TO_COMPLETE:
+ break;
+
+ case RD_KAFKA_CGRP_JOIN_STATE_STEADY:
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_ASSIGN_CALL:
+ case RD_KAFKA_CGRP_JOIN_STATE_WAIT_UNASSIGN_CALL:
+ if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_SUBSCRIPTION &&
+ rd_interval(
+ &rkcg->rkcg_heartbeat_intvl,
+ rkcg->rkcg_rk->rk_conf.group_heartbeat_intvl_ms * 1000,
+ now) > 0)
+ rd_kafka_cgrp_heartbeat(rkcg);
+ break;
+ }
+}
+/**
+ * Client group handling.
+ * Called from main thread to serve the operational aspects of a cgrp.
+ */
+void rd_kafka_cgrp_serve(rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_broker_t *rkb = rkcg->rkcg_coord;
+ int rkb_state = RD_KAFKA_BROKER_STATE_INIT;
+ rd_ts_t now;
+
+ if (rkb) {
+ rd_kafka_broker_lock(rkb);
+ rkb_state = rkb->rkb_state;
+ rd_kafka_broker_unlock(rkb);
+
+ /* Go back to querying state if we lost the current coordinator
+ * connection. */
+ if (rkb_state < RD_KAFKA_BROKER_STATE_UP &&
+ rkcg->rkcg_state == RD_KAFKA_CGRP_STATE_UP)
+ rd_kafka_cgrp_set_state(
+ rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+ }
+
+ now = rd_clock();
+
+ /* Check for cgrp termination */
+ if (unlikely(rd_kafka_cgrp_try_terminate(rkcg))) {
+ rd_kafka_cgrp_terminated(rkcg);
+ return; /* cgrp terminated */
+ }
+
+ /* Bail out if we're terminating. */
+ if (unlikely(rd_kafka_terminating(rkcg->rkcg_rk)))
+ return;
+
+ /* Check session timeout regardless of current coordinator
+ * connection state (rkcg_state) */
+ if (rkcg->rkcg_join_state == RD_KAFKA_CGRP_JOIN_STATE_STEADY)
+ rd_kafka_cgrp_session_timeout_check(rkcg, now);
+
+retry:
+ switch (rkcg->rkcg_state) {
+ case RD_KAFKA_CGRP_STATE_TERM:
+ break;
+
+ case RD_KAFKA_CGRP_STATE_INIT:
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_QUERY_COORD);
+ /* FALLTHRU */
+
+ case RD_KAFKA_CGRP_STATE_QUERY_COORD:
+ /* Query for coordinator. */
+ if (rd_interval_immediate(&rkcg->rkcg_coord_query_intvl,
+ 500 * 1000, now) > 0)
+ rd_kafka_cgrp_coord_query(rkcg,
+ "intervaled in "
+ "state query-coord");
+ break;
+
+ case RD_KAFKA_CGRP_STATE_WAIT_COORD:
+ /* Waiting for FindCoordinator response */
+ break;
+
+ case RD_KAFKA_CGRP_STATE_WAIT_BROKER:
+ /* See if the group should be reassigned to another broker. */
+ if (rd_kafka_cgrp_coord_update(rkcg, rkcg->rkcg_coord_id))
+ goto retry; /* Coordinator changed, retry state-machine
+ * to speed up next transition. */
+
+ /* Coordinator query */
+ if (rd_interval(&rkcg->rkcg_coord_query_intvl, 1000 * 1000,
+ now) > 0)
+ rd_kafka_cgrp_coord_query(rkcg,
+ "intervaled in "
+ "state wait-broker");
+ break;
+
+ case RD_KAFKA_CGRP_STATE_WAIT_BROKER_TRANSPORT:
+ /* Waiting for broker transport to come up.
+ * Also make sure broker supports groups. */
+ if (rkb_state < RD_KAFKA_BROKER_STATE_UP || !rkb ||
+ !rd_kafka_broker_supports(
+ rkb, RD_KAFKA_FEATURE_BROKER_GROUP_COORD)) {
+ /* Coordinator query */
+ if (rd_interval(&rkcg->rkcg_coord_query_intvl,
+ 1000 * 1000, now) > 0)
+ rd_kafka_cgrp_coord_query(
+ rkcg,
+ "intervaled in state "
+ "wait-broker-transport");
+
+ } else {
+ rd_kafka_cgrp_set_state(rkcg, RD_KAFKA_CGRP_STATE_UP);
+
+ /* Serve join state to trigger (re)join */
+ rd_kafka_cgrp_join_state_serve(rkcg);
+
+ /* Serve any pending partitions in the assignment */
+ rd_kafka_assignment_serve(rkcg->rkcg_rk);
+ }
+ break;
+
+ case RD_KAFKA_CGRP_STATE_UP:
+ /* Move any ops awaiting the coordinator to the ops queue
+ * for reprocessing. */
+ rd_kafka_q_concat(rkcg->rkcg_ops, rkcg->rkcg_wait_coord_q);
+
+ /* Relaxed coordinator queries. */
+ if (rd_interval(&rkcg->rkcg_coord_query_intvl,
+ rkcg->rkcg_rk->rk_conf.coord_query_intvl_ms *
+ 1000,
+ now) > 0)
+ rd_kafka_cgrp_coord_query(rkcg,
+ "intervaled in state up");
+
+ rd_kafka_cgrp_join_state_serve(rkcg);
+ break;
+ }
+
+ if (unlikely(rkcg->rkcg_state != RD_KAFKA_CGRP_STATE_UP &&
+ rd_interval(&rkcg->rkcg_timeout_scan_intvl, 1000 * 1000,
+ now) > 0))
+ rd_kafka_cgrp_timeout_scan(rkcg, now);
+}
+
+
+
+/**
+ * Send an op to a cgrp.
+ *
+ * Locality: any thread
+ */
+void rd_kafka_cgrp_op(rd_kafka_cgrp_t *rkcg,
+ rd_kafka_toppar_t *rktp,
+ rd_kafka_replyq_t replyq,
+ rd_kafka_op_type_t type,
+ rd_kafka_resp_err_t err) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new(type);
+ rko->rko_err = err;
+ rko->rko_replyq = replyq;
+
+ if (rktp)
+ rko->rko_rktp = rd_kafka_toppar_keep(rktp);
+
+ rd_kafka_q_enq(rkcg->rkcg_ops, rko);
+}
+
+
+
+void rd_kafka_cgrp_set_member_id(rd_kafka_cgrp_t *rkcg, const char *member_id) {
+ if (rkcg->rkcg_member_id && member_id &&
+ !rd_kafkap_str_cmp_str(rkcg->rkcg_member_id, member_id))
+ return; /* No change */
+
+ rd_kafka_dbg(rkcg->rkcg_rk, CGRP, "MEMBERID",
+ "Group \"%.*s\": updating member id \"%s\" -> \"%s\"",
+ RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
+ rkcg->rkcg_member_id ? rkcg->rkcg_member_id->str
+ : "(not-set)",
+ member_id ? member_id : "(not-set)");
+
+ if (rkcg->rkcg_member_id) {
+ rd_kafkap_str_destroy(rkcg->rkcg_member_id);
+ rkcg->rkcg_member_id = NULL;
+ }
+
+ if (member_id)
+ rkcg->rkcg_member_id = rd_kafkap_str_new(member_id, -1);
+}
+
+
+/**
+ * @brief Determine owned partitions that no longer exist (partitions in
+ * deleted or re-created topics).
+ */
+static rd_kafka_topic_partition_list_t *
+rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
+ rd_kafka_topic_partition_list_t *result = NULL;
+ const rd_kafka_topic_partition_t *curr;
+
+ if (!rkcg->rkcg_group_assignment)
+ return NULL;
+
+ RD_KAFKA_TPLIST_FOREACH(curr, rkcg->rkcg_group_assignment) {
+ if (rd_list_find(rkcg->rkcg_subscribed_topics, curr->topic,
+ rd_kafka_topic_info_topic_cmp))
+ continue;
+
+ if (!result)
+ result = rd_kafka_topic_partition_list_new(
+ rkcg->rkcg_group_assignment->cnt);
+
+ rd_kafka_topic_partition_list_add_copy(result, curr);
+ }
+
+ return result;
+}
+
+
/**
 * @brief Check if the latest metadata affects the current subscription:
 *        - matched topic added
 *        - matched topic removed
 *        - matched topic's partition count change
 *
 * @param do_join If true, trigger a (re)join when the effective topic
 *                list changed (or when a join is currently waiting on
 *                this metadata refresh); if false, only refresh the
 *                subscribed-topics list and propagate topic errors.
 *
 * @locks none
 * @locality rdkafka main thread
 */
void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
                                         rd_bool_t do_join) {
        rd_list_t *tinfos;
        rd_kafka_topic_partition_list_t *errored;
        rd_bool_t changed;

        /* Must only be called from the rdkafka main thread. */
        rd_kafka_assert(NULL, thrd_is_current(rkcg->rkcg_rk->rk_thread));

        /* No subscription: metadata changes are irrelevant here. */
        if (!rkcg->rkcg_subscription || rkcg->rkcg_subscription->cnt == 0)
                return;

        /*
         * Unmatched topics will be added to the errored list.
         */
        errored = rd_kafka_topic_partition_list_new(0);

        /*
         * Create a list of the topics in metadata that matches our subscription
         */
        tinfos = rd_list_new(rkcg->rkcg_subscription->cnt,
                             (void *)rd_kafka_topic_info_destroy);

        /* Regex subscriptions match metadata topics by pattern;
         * literal subscriptions are filtered by exact name. */
        if (rkcg->rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION)
                rd_kafka_metadata_topic_match(rkcg->rkcg_rk, tinfos,
                                              rkcg->rkcg_subscription, errored);
        else
                rd_kafka_metadata_topic_filter(
                    rkcg->rkcg_rk, tinfos, rkcg->rkcg_subscription, errored);


        /*
         * Propagate consumer errors for any non-existent or errored topics.
         * The function takes ownership of errored.
         */
        rd_kafka_propagate_consumer_topic_errors(
            rkcg, errored, "Subscribed topic not available");

        /*
         * Update effective list of topics (takes ownership of \c tinfos)
         */
        changed = rd_kafka_cgrp_update_subscribed_topics(rkcg, tinfos);

        if (!do_join ||
            (!changed &&
             /* If we get the same effective list of topics as last time around,
              * but the join is waiting for this metadata query to complete,
              * then we should not return here but follow through with the
              * (re)join below. */
             rkcg->rkcg_join_state != RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA))
                return;

        /* List of subscribed topics changed, trigger rejoin. */
        rd_kafka_dbg(rkcg->rkcg_rk,
                     CGRP | RD_KAFKA_DBG_METADATA | RD_KAFKA_DBG_CONSUMER,
                     "REJOIN",
                     "Group \"%.*s\": "
                     "subscription updated from metadata change: "
                     "rejoining group in state %s",
                     RD_KAFKAP_STR_PR(rkcg->rkcg_group_id),
                     rd_kafka_cgrp_join_state_names[rkcg->rkcg_join_state]);

        if (rd_kafka_cgrp_rebalance_protocol(rkcg) ==
            RD_KAFKA_REBALANCE_PROTOCOL_COOPERATIVE) {

                /* Partitions from deleted topics */
                rd_kafka_topic_partition_list_t *owned_but_not_exist =
                    rd_kafka_cgrp_owned_but_not_exist_partitions(rkcg);

                if (owned_but_not_exist) {
                        /* Owned partitions whose topics vanished are
                         * considered lost, then revoked incrementally. */
                        rd_kafka_cgrp_assignment_set_lost(
                            rkcg, "%d subscribed topic(s) no longer exist",
                            owned_but_not_exist->cnt);

                        rd_kafka_rebalance_op_incr(
                            rkcg, RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS,
                            owned_but_not_exist,
                            rkcg->rkcg_group_leader.members != NULL
                            /* Rejoin group following revoke's
                             * unassign if we are leader */
                            ,
                            "topics not available");
                        rd_kafka_topic_partition_list_destroy(
                            owned_but_not_exist);

                } else {
                        /* Nothing to revoke, rejoin group if we are the
                         * leader.
                         * The KIP says to rejoin the group on metadata
                         * change only if we're the leader. But what if a
                         * non-leader is subscribed to a regex that the others
                         * aren't?
                         * Going against the KIP and rejoining here. */
                        rd_kafka_cgrp_rejoin(
                            rkcg,
                            "Metadata for subscribed topic(s) has "
                            "changed");
                }

        } else {
                /* EAGER */
                rd_kafka_cgrp_revoke_rejoin(rkcg,
                                            "Metadata for subscribed topic(s) "
                                            "has changed");
        }

        /* We shouldn't get stuck in this state. */
        rd_dassert(rkcg->rkcg_join_state !=
                   RD_KAFKA_CGRP_JOIN_STATE_WAIT_METADATA);
}
+
+
+rd_kafka_consumer_group_metadata_t *
+rd_kafka_consumer_group_metadata_new(const char *group_id) {
+ rd_kafka_consumer_group_metadata_t *cgmetadata;
+
+ cgmetadata = rd_kafka_consumer_group_metadata_new_with_genid(
+ group_id, -1, "", NULL);
+
+ return cgmetadata;
+}
+
+rd_kafka_consumer_group_metadata_t *
+rd_kafka_consumer_group_metadata_new_with_genid(const char *group_id,
+ int32_t generation_id,
+ const char *member_id,
+ const char *group_instance_id) {
+ rd_kafka_consumer_group_metadata_t *cgmetadata;
+
+ cgmetadata = rd_calloc(1, sizeof(*cgmetadata));
+ cgmetadata->group_id = rd_strdup(group_id);
+ cgmetadata->generation_id = generation_id;
+ cgmetadata->member_id = rd_strdup(member_id);
+ if (group_instance_id)
+ cgmetadata->group_instance_id = rd_strdup(group_instance_id);
+
+ return cgmetadata;
+}
+
+rd_kafka_consumer_group_metadata_t *
+rd_kafka_consumer_group_metadata(rd_kafka_t *rk) {
+ rd_kafka_consumer_group_metadata_t *cgmetadata;
+ rd_kafka_op_t *rko;
+ rd_kafka_cgrp_t *rkcg;
+
+ if (!(rkcg = rd_kafka_cgrp_get(rk)))
+ return NULL;
+
+ rko = rd_kafka_op_req2(rkcg->rkcg_ops, RD_KAFKA_OP_CG_METADATA);
+ if (!rko)
+ return NULL;
+
+ cgmetadata = rko->rko_u.cg_metadata;
+ rko->rko_u.cg_metadata = NULL;
+ rd_kafka_op_destroy(rko);
+
+ return cgmetadata;
+}
+
+void rd_kafka_consumer_group_metadata_destroy(
+ rd_kafka_consumer_group_metadata_t *cgmetadata) {
+ rd_free(cgmetadata->group_id);
+ rd_free(cgmetadata->member_id);
+ if (cgmetadata->group_instance_id)
+ rd_free(cgmetadata->group_instance_id);
+ rd_free(cgmetadata);
+}
+
+rd_kafka_consumer_group_metadata_t *rd_kafka_consumer_group_metadata_dup(
+ const rd_kafka_consumer_group_metadata_t *cgmetadata) {
+ rd_kafka_consumer_group_metadata_t *ret;
+
+ ret = rd_calloc(1, sizeof(*cgmetadata));
+ ret->group_id = rd_strdup(cgmetadata->group_id);
+ ret->generation_id = cgmetadata->generation_id;
+ ret->member_id = rd_strdup(cgmetadata->member_id);
+ if (cgmetadata->group_instance_id)
+ ret->group_instance_id =
+ rd_strdup(cgmetadata->group_instance_id);
+
+ return ret;
+}
+
/*
 * Consumer group metadata serialization format v2:
 * "CGMDv2:"<generation_id><group_id>"\0"<member_id>"\0" \
 * <group_instance_id_is_null>[<group_instance_id>"\0"]
 * Where <group_id> is the group_id string.
 *
 * Note: the array is deliberately sized [7] so the string literal's
 * trailing NUL is dropped; sizeof() of this array (7) is used as the
 * exact on-wire magic length by the read/write functions below.
 */
static const char rd_kafka_consumer_group_metadata_magic[7] = "CGMDv2:";
+
+rd_kafka_error_t *rd_kafka_consumer_group_metadata_write(
+ const rd_kafka_consumer_group_metadata_t *cgmd,
+ void **bufferp,
+ size_t *sizep) {
+ char *buf;
+ size_t size;
+ size_t of = 0;
+ size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
+ size_t groupid_len = strlen(cgmd->group_id) + 1;
+ size_t generationid_len = sizeof(cgmd->generation_id);
+ size_t member_id_len = strlen(cgmd->member_id) + 1;
+ int8_t group_instance_id_is_null = cgmd->group_instance_id ? 0 : 1;
+ size_t group_instance_id_is_null_len =
+ sizeof(group_instance_id_is_null);
+ size_t group_instance_id_len =
+ cgmd->group_instance_id ? strlen(cgmd->group_instance_id) + 1 : 0;
+
+ size = magic_len + groupid_len + generationid_len + member_id_len +
+ group_instance_id_is_null_len + group_instance_id_len;
+
+ buf = rd_malloc(size);
+
+ memcpy(buf, rd_kafka_consumer_group_metadata_magic, magic_len);
+ of += magic_len;
+
+ memcpy(buf + of, &cgmd->generation_id, generationid_len);
+ of += generationid_len;
+
+ memcpy(buf + of, cgmd->group_id, groupid_len);
+ of += groupid_len;
+
+ memcpy(buf + of, cgmd->member_id, member_id_len);
+ of += member_id_len;
+
+ memcpy(buf + of, &group_instance_id_is_null,
+ group_instance_id_is_null_len);
+ of += group_instance_id_is_null_len;
+
+ if (!group_instance_id_is_null)
+ memcpy(buf + of, cgmd->group_instance_id,
+ group_instance_id_len);
+ of += group_instance_id_len;
+
+ rd_assert(of == size);
+
+ *bufferp = buf;
+ *sizep = size;
+
+ return NULL;
+}
+
+
+/*
+ * Check that a string is printable, returning NULL if not or
+ * a pointer immediately after the end of the string NUL
+ * terminator if so.
+ **/
+static const char *str_is_printable(const char *s, const char *end) {
+ const char *c;
+ for (c = s; *c && c != end; c++)
+ if (!isprint((int)*c))
+ return NULL;
+ return c + 1;
+}
+
+
+rd_kafka_error_t *rd_kafka_consumer_group_metadata_read(
+ rd_kafka_consumer_group_metadata_t **cgmdp,
+ const void *buffer,
+ size_t size) {
+ const char *buf = (const char *)buffer;
+ const char *end = buf + size;
+ const char *next;
+ size_t magic_len = sizeof(rd_kafka_consumer_group_metadata_magic);
+ int32_t generation_id;
+ size_t generationid_len = sizeof(generation_id);
+ const char *group_id;
+ const char *member_id;
+ int8_t group_instance_id_is_null;
+ const char *group_instance_id = NULL;
+
+ if (size < magic_len + generationid_len + 1 + 1 + 1)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer is too short");
+
+ if (memcmp(buffer, rd_kafka_consumer_group_metadata_magic, magic_len))
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer is not a serialized "
+ "consumer group metadata object");
+ memcpy(&generation_id, buf + magic_len, generationid_len);
+
+ group_id = buf + magic_len + generationid_len;
+ next = str_is_printable(group_id, end);
+ if (!next)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer group id is not safe");
+
+ member_id = next;
+ next = str_is_printable(member_id, end);
+ if (!next)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer member id is not "
+ "safe");
+
+ group_instance_id_is_null = (int8_t) * (next++);
+ if (!group_instance_id_is_null) {
+ group_instance_id = next;
+ next = str_is_printable(group_instance_id, end);
+ if (!next)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer group "
+ "instance id is not safe");
+ }
+
+ if (next != end)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__BAD_MSG,
+ "Input buffer bad length");
+
+ *cgmdp = rd_kafka_consumer_group_metadata_new_with_genid(
+ group_id, generation_id, member_id, group_instance_id);
+
+ return NULL;
+}
+
+
+static int
+unittest_consumer_group_metadata_iteration(const char *group_id,
+ int32_t generation_id,
+ const char *member_id,
+ const char *group_instance_id) {
+ rd_kafka_consumer_group_metadata_t *cgmd;
+ void *buffer, *buffer2;
+ size_t size, size2;
+ rd_kafka_error_t *error;
+
+ cgmd = rd_kafka_consumer_group_metadata_new_with_genid(
+ group_id, generation_id, member_id, group_instance_id);
+ RD_UT_ASSERT(cgmd != NULL, "failed to create metadata");
+
+ error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer, &size);
+ RD_UT_ASSERT(!error, "metadata_write failed: %s",
+ rd_kafka_error_string(error));
+
+ rd_kafka_consumer_group_metadata_destroy(cgmd);
+
+ cgmd = NULL;
+ error = rd_kafka_consumer_group_metadata_read(&cgmd, buffer, size);
+ RD_UT_ASSERT(!error, "metadata_read failed: %s",
+ rd_kafka_error_string(error));
+
+ /* Serialize again and compare buffers */
+ error = rd_kafka_consumer_group_metadata_write(cgmd, &buffer2, &size2);
+ RD_UT_ASSERT(!error, "metadata_write failed: %s",
+ rd_kafka_error_string(error));
+
+ RD_UT_ASSERT(size == size2 && !memcmp(buffer, buffer2, size),
+ "metadata_read/write size or content mismatch: "
+ "size %" PRIusz ", size2 %" PRIusz,
+ size, size2);
+
+ rd_kafka_consumer_group_metadata_destroy(cgmd);
+ rd_free(buffer);
+ rd_free(buffer2);
+
+ return 0;
+}
+
+
/**
 * @brief Exhaustively round-trip every combination of group id, member
 *        id, group instance id (the literal "NULL" entry maps to an
 *        actual NULL instance id) and generation id -1/0.
 */
static int unittest_consumer_group_metadata(void) {
        const char *ids[] = {
            "mY. random id:.",
            "0",
            "2222222222222222222222221111111111111111111111111111112222",
            "",
            "NULL",
            NULL,
        };
        int gi, mi, ii, gen_id;

        for (gi = 0; ids[gi]; gi++) {
                for (mi = 0; ids[mi]; mi++) {
                        for (ii = 0; ids[ii]; ii++) {
                                for (gen_id = -1; gen_id < 1; gen_id++) {
                                        const char *instance_id = ids[ii];
                                        int ret;

                                        if (strcmp(instance_id, "NULL") == 0)
                                                instance_id = NULL;

                                        ret =
                                            unittest_consumer_group_metadata_iteration(
                                                ids[gi], gen_id, ids[mi],
                                                instance_id);
                                        if (ret)
                                                return ret;
                                }
                        }
                }
        }

        RD_UT_PASS();
}
+
+
+static int unittest_set_intersect(void) {
+ size_t par_cnt = 10;
+ map_toppar_member_info_t *dst;
+ rd_kafka_topic_partition_t *toppar;
+ PartitionMemberInfo_t *v;
+ char *id = "id";
+ rd_kafkap_str_t id1 = RD_KAFKAP_STR_INITIALIZER;
+ rd_kafkap_str_t id2 = RD_KAFKAP_STR_INITIALIZER;
+ rd_kafka_group_member_t *gm1;
+ rd_kafka_group_member_t *gm2;
+
+ id1.len = 2;
+ id1.str = id;
+ id2.len = 2;
+ id2.str = id;
+
+ map_toppar_member_info_t a = RD_MAP_INITIALIZER(
+ par_cnt, rd_kafka_topic_partition_cmp,
+ rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ map_toppar_member_info_t b = RD_MAP_INITIALIZER(
+ par_cnt, rd_kafka_topic_partition_cmp,
+ rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ gm1 = rd_calloc(1, sizeof(*gm1));
+ gm1->rkgm_member_id = &id1;
+ gm1->rkgm_group_instance_id = &id1;
+ gm2 = rd_calloc(1, sizeof(*gm2));
+ gm2->rkgm_member_id = &id2;
+ gm2->rkgm_group_instance_id = &id2;
+
+ RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4),
+ PartitionMemberInfo_new(gm1, rd_false));
+ RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 4),
+ PartitionMemberInfo_new(gm1, rd_false));
+ RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 7),
+ PartitionMemberInfo_new(gm1, rd_false));
+
+ RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 7),
+ PartitionMemberInfo_new(gm1, rd_false));
+ RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4),
+ PartitionMemberInfo_new(gm2, rd_false));
+
+ dst = rd_kafka_member_partitions_intersect(&a, &b);
+
+ RD_UT_ASSERT(RD_MAP_CNT(&a) == 3, "expected a cnt to be 3 not %d",
+ (int)RD_MAP_CNT(&a));
+ RD_UT_ASSERT(RD_MAP_CNT(&b) == 2, "expected b cnt to be 2 not %d",
+ (int)RD_MAP_CNT(&b));
+ RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d",
+ (int)RD_MAP_CNT(dst));
+
+ toppar = rd_kafka_topic_partition_new("t1", 4);
+ RD_UT_ASSERT((v = RD_MAP_GET(dst, toppar)), "unexpected element");
+ RD_UT_ASSERT(v->members_match, "expected members to match");
+ rd_kafka_topic_partition_destroy(toppar);
+
+ RD_MAP_DESTROY(&a);
+ RD_MAP_DESTROY(&b);
+ RD_MAP_DESTROY(dst);
+ rd_free(dst);
+
+ rd_free(gm1);
+ rd_free(gm2);
+
+ RD_UT_PASS();
+}
+
+
+static int unittest_set_subtract(void) {
+ size_t par_cnt = 10;
+ rd_kafka_topic_partition_t *toppar;
+ map_toppar_member_info_t *dst;
+
+ map_toppar_member_info_t a = RD_MAP_INITIALIZER(
+ par_cnt, rd_kafka_topic_partition_cmp,
+ rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ map_toppar_member_info_t b = RD_MAP_INITIALIZER(
+ par_cnt, rd_kafka_topic_partition_cmp,
+ rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ RD_MAP_SET(&a, rd_kafka_topic_partition_new("t1", 4),
+ PartitionMemberInfo_new(NULL, rd_false));
+ RD_MAP_SET(&a, rd_kafka_topic_partition_new("t2", 7),
+ PartitionMemberInfo_new(NULL, rd_false));
+
+ RD_MAP_SET(&b, rd_kafka_topic_partition_new("t2", 4),
+ PartitionMemberInfo_new(NULL, rd_false));
+ RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 4),
+ PartitionMemberInfo_new(NULL, rd_false));
+ RD_MAP_SET(&b, rd_kafka_topic_partition_new("t1", 7),
+ PartitionMemberInfo_new(NULL, rd_false));
+
+ dst = rd_kafka_member_partitions_subtract(&a, &b);
+
+ RD_UT_ASSERT(RD_MAP_CNT(&a) == 2, "expected a cnt to be 2 not %d",
+ (int)RD_MAP_CNT(&a));
+ RD_UT_ASSERT(RD_MAP_CNT(&b) == 3, "expected b cnt to be 3 not %d",
+ (int)RD_MAP_CNT(&b));
+ RD_UT_ASSERT(RD_MAP_CNT(dst) == 1, "expected dst cnt to be 1 not %d",
+ (int)RD_MAP_CNT(dst));
+
+ toppar = rd_kafka_topic_partition_new("t2", 7);
+ RD_UT_ASSERT(RD_MAP_GET(dst, toppar), "unexpected element");
+ rd_kafka_topic_partition_destroy(toppar);
+
+ RD_MAP_DESTROY(&a);
+ RD_MAP_DESTROY(&b);
+ RD_MAP_DESTROY(dst);
+ rd_free(dst);
+
+ RD_UT_PASS();
+}
+
+
+static int unittest_map_to_list(void) {
+ rd_kafka_topic_partition_list_t *list;
+
+ map_toppar_member_info_t map = RD_MAP_INITIALIZER(
+ 10, rd_kafka_topic_partition_cmp, rd_kafka_topic_partition_hash,
+ rd_kafka_topic_partition_destroy_free, PartitionMemberInfo_free);
+
+ RD_MAP_SET(&map, rd_kafka_topic_partition_new("t1", 101),
+ PartitionMemberInfo_new(NULL, rd_false));
+
+ list = rd_kafka_toppar_member_info_map_to_list(&map);
+
+ RD_UT_ASSERT(list->cnt == 1, "expecting list size of 1 not %d.",
+ list->cnt);
+ RD_UT_ASSERT(list->elems[0].partition == 101,
+ "expecting partition 101 not %d",
+ list->elems[0].partition);
+ RD_UT_ASSERT(!strcmp(list->elems[0].topic, "t1"),
+ "expecting topic 't1', not %s", list->elems[0].topic);
+
+ rd_kafka_topic_partition_list_destroy(list);
+ RD_MAP_DESTROY(&map);
+
+ RD_UT_PASS();
+}
+
+
+static int unittest_list_to_map(void) {
+ rd_kafka_topic_partition_t *toppar;
+ map_toppar_member_info_t *map;
+ rd_kafka_topic_partition_list_t *list =
+ rd_kafka_topic_partition_list_new(1);
+
+ rd_kafka_topic_partition_list_add(list, "topic1", 201);
+ rd_kafka_topic_partition_list_add(list, "topic2", 202);
+
+ map = rd_kafka_toppar_list_to_toppar_member_info_map(list);
+
+ RD_UT_ASSERT(RD_MAP_CNT(map) == 2, "expected map cnt to be 2 not %d",
+ (int)RD_MAP_CNT(map));
+ toppar = rd_kafka_topic_partition_new("topic1", 201);
+ RD_UT_ASSERT(RD_MAP_GET(map, toppar),
+ "expected topic1 [201] to exist in map");
+ rd_kafka_topic_partition_destroy(toppar);
+ toppar = rd_kafka_topic_partition_new("topic2", 202);
+ RD_UT_ASSERT(RD_MAP_GET(map, toppar),
+ "expected topic2 [202] to exist in map");
+ rd_kafka_topic_partition_destroy(toppar);
+
+ RD_MAP_DESTROY(map);
+ rd_free(map);
+ rd_kafka_topic_partition_list_destroy(list);
+
+ RD_UT_PASS();
+}
+
+
+/**
+ * @brief Consumer group unit tests
+ */
+int unittest_cgrp(void) {
+ int fails = 0;
+
+ fails += unittest_consumer_group_metadata();
+ fails += unittest_set_intersect();
+ fails += unittest_set_subtract();
+ fails += unittest_map_to_list();
+ fails += unittest_list_to_map();
+
+ return fails;
+}