Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c')
-rw-r--r--    fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c    968
1 file changed, 968 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c
new file mode 100644
index 000000000..dc4bdae94
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c
@@ -0,0 +1,968 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2020 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * @name Consumer assignment state.
+ *
+ * Responsible for managing the state of assigned partitions.
+ *
+ *
+ ******************************************************************************
+ * rd_kafka_assignment_serve()
+ * ---------------------------
+ *
+ * It is important to call rd_kafka_assignment_serve() after each change
+ * to the assignment through assignment_add, assignment_subtract or
+ * assignment_clear, as those functions only modify the assignment but do
+ * not take any action to transition partitions to or from the assignment
+ * states.
+ *
+ * The reason assignment_serve() is not automatically called from these
+ * functions is for the caller to be able to set the current state before
+ * the side-effects of serve() kick in, such as the call to
+ * rd_kafka_cgrp_assignment_done() that in turn will set the cgrp state.
+ *
+ *
+ *
+ ******************************************************************************
+ * Querying for committed offsets (.queried list)
+ * ----------------------------------------------
+ *
+ * We only allow one outstanding query (fetch committed offset); this avoids
+ * complex handling of partitions that are assigned, unassigned and reassigned
+ * all within the window of an OffsetFetch request.
+ * Consider the following case:
+ *
+ * 1. tp1 and tp2 are incrementally assigned.
+ * 2. An OffsetFetchRequest is sent for tp1 and tp2.
+ * 3. tp2 is incrementally unassigned.
+ * 4. The broker sends an OffsetFetchResponse with offsets tp1=10, tp2=20.
+ * 5. Some other consumer commits offset 30 for tp2.
+ * 6. tp2 is incrementally assigned again.
+ * 7. The OffsetFetchResponse is received.
+ * Without extra handling the consumer would start fetching tp1 at offset 10
+ * (which is correct) and tp2 at offset 20 (which is incorrect, the last
+ * committed offset is now 30).
+ *
+ * To alleviate this situation we remove unassigned partitions from the
+ * .queried list, and in the OffsetFetch response handler we only use offsets
+ * for partitions that are on the .queried list.
+ *
+ * To make sure the tp1 offset is used and not re-queried we only allow
+ * one outstanding OffsetFetch request at a time, meaning that at step 6
+ * a new OffsetFetch request will not be sent and tp2 will remain in the
+ * .pending list until the outstanding OffsetFetch response is received in
+ * step 7. At this point tp2 will transition to .queried and a new
+ * OffsetFetch request will be sent.
+ *
+ * This explanation is more verbose than the code involved.
+ *
+ ******************************************************************************
+ *
+ *
+ * @remark Try to keep any cgrp state out of this file.
+ *
+ * FIXME: There are some pretty obvious optimizations that need to be done
+ * here with regard to partition_list_t lookups. But we can do that when
+ * we know the current implementation works correctly.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_offset.h"
+#include "rdkafka_request.h"
+
+
+static void rd_kafka_assignment_dump(rd_kafka_t *rk) {
+ rd_kafka_dbg(rk, CGRP, "DUMP",
+ "Assignment dump (started_cnt=%d, wait_stop_cnt=%d)",
+ rk->rk_consumer.assignment.started_cnt,
+ rk->rk_consumer.assignment.wait_stop_cnt);
+
+ rd_kafka_topic_partition_list_log(rk, "DUMP_ALL", RD_KAFKA_DBG_CGRP,
+ rk->rk_consumer.assignment.all);
+
+ rd_kafka_topic_partition_list_log(rk, "DUMP_PND", RD_KAFKA_DBG_CGRP,
+ rk->rk_consumer.assignment.pending);
+
+ rd_kafka_topic_partition_list_log(rk, "DUMP_QRY", RD_KAFKA_DBG_CGRP,
+ rk->rk_consumer.assignment.queried);
+
+ rd_kafka_topic_partition_list_log(rk, "DUMP_REM", RD_KAFKA_DBG_CGRP,
+ rk->rk_consumer.assignment.removed);
+}
+
+/**
+ * @brief Apply the fetched committed offsets to the current assignment's
+ * queried partitions.
+ *
+ * @param err is the request-level error, if any. The caller is responsible
+ * for raising this error to the application. It is only used here
+ * to avoid taking actions.
+ *
+ * Called from the FetchOffsets response handler below.
+ */
+static void
+rd_kafka_assignment_apply_offsets(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *offsets,
+ rd_kafka_resp_err_t err) {
+ rd_kafka_topic_partition_t *rktpar;
+
+ RD_KAFKA_TPLIST_FOREACH(rktpar, offsets) {
+ /* May be NULL, borrow ref. */
+ rd_kafka_toppar_t *rktp =
+ rd_kafka_topic_partition_toppar(rk, rktpar);
+
+ if (!rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.queried, rktpar->topic,
+ rktpar->partition)) {
+ rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
+ "Ignoring OffsetFetch "
+ "response for %s [%" PRId32
+ "] which is no "
+ "longer in the queried list "
+ "(possibly unassigned?)",
+ rktpar->topic, rktpar->partition);
+ continue;
+ }
+
+ if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT ||
+ rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) {
+ /* Ongoing transactions are blocking offset retrieval.
+ * This is typically retried from the OffsetFetch
+ * handler but we can come here if the assignment
+ * (and thus the assignment.version) was changed while
+ * the OffsetFetch request was in-flight, in which case
+ * we put this partition back on the pending list for
+ * later handling by the assignment state machine. */
+
+ rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
+ "Adding %s [%" PRId32
+ "] back to pending "
+ "list because on-going transaction is "
+ "blocking offset retrieval",
+ rktpar->topic, rktpar->partition);
+
+ rd_kafka_topic_partition_list_add_copy(
+ rk->rk_consumer.assignment.pending, rktpar);
+
+ } else if (rktpar->err) {
+ /* Partition-level error */
+ rd_kafka_consumer_err(
+ rk->rk_consumer.q, RD_KAFKA_NODEID_UA, rktpar->err,
+ 0, rktpar->topic, rktp, RD_KAFKA_OFFSET_INVALID,
+ "Failed to fetch committed offset for "
+ "group \"%s\" topic %s [%" PRId32 "]: %s",
+ rk->rk_group_id->str, rktpar->topic,
+ rktpar->partition, rd_kafka_err2str(rktpar->err));
+
+ /* The partition will not be added back to .pending
+ * and thus only reside on .all until the application
+ * unassigns it and possibly re-assigns it. */
+
+ } else if (!err) {
+ /* If rktpar->offset is RD_KAFKA_OFFSET_INVALID it means
+ * there was no committed offset for this partition.
+ * serve_pending() will now start this partition
+ * since the offset is set to INVALID (rather than
+ * STORED) and the partition fetcher will employ
+ * auto.offset.reset to know what to do. */
+
+ /* Add partition to pending list where serve()
+ * will start the fetcher. */
+ rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
+ "Adding %s [%" PRId32
+ "] back to pending "
+ "list with offset %s",
+ rktpar->topic, rktpar->partition,
+ rd_kafka_offset2str(rktpar->offset));
+
+ rd_kafka_topic_partition_list_add_copy(
+ rk->rk_consumer.assignment.pending, rktpar);
+ }
+ /* Do nothing for request-level errors (err is set). */
+ }
+
+ if (offsets->cnt > 0)
+ rd_kafka_assignment_serve(rk);
+}
+
+
+
+/**
+ * @brief Reply handler for OffsetFetch queries from the assignment code.
+ *
+ * @param opaque A malloc'd int64_t* containing the assignment version at the
+ * time of the request.
+ *
+ * @locality rdkafka main thread
+ */
+static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk,
+ rd_kafka_broker_t *rkb,
+ rd_kafka_resp_err_t err,
+ rd_kafka_buf_t *reply,
+ rd_kafka_buf_t *request,
+ void *opaque) {
+ rd_kafka_topic_partition_list_t *offsets = NULL;
+ int64_t *req_assignment_version = (int64_t *)opaque;
+ /* Only allow retries if there's been no change to the assignment,
+ * otherwise rely on the assignment state machine to retry. */
+ rd_bool_t allow_retry =
+ *req_assignment_version == rk->rk_consumer.assignment.version;
+
+ if (err == RD_KAFKA_RESP_ERR__DESTROY) {
+ /* Termination, quick cleanup. */
+ rd_free(req_assignment_version);
+ return;
+ }
+
+ err = rd_kafka_handle_OffsetFetch(
+ rk, rkb, err, reply, request, &offsets,
+ rd_true /* Update toppars */, rd_true /* Add parts */, allow_retry);
+ if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) {
+ if (offsets)
+ rd_kafka_topic_partition_list_destroy(offsets);
+ return; /* retrying */
+ }
+
+ rd_free(req_assignment_version);
+
+ /* offsets may be NULL for certain errors, such
+ * as ERR__TRANSPORT. */
+ if (!offsets && !allow_retry) {
+ rd_dassert(err);
+ if (!err)
+ err = RD_KAFKA_RESP_ERR__NO_OFFSET;
+
+ rd_kafka_dbg(rk, CGRP, "OFFSET", "Offset fetch error: %s",
+ rd_kafka_err2str(err));
+ rd_kafka_consumer_err(
+ rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL,
+ NULL, RD_KAFKA_OFFSET_INVALID,
+ "Failed to fetch committed "
+ "offsets for partitions "
+ "in group \"%s\": %s",
+ rk->rk_group_id->str, rd_kafka_err2str(err));
+
+ return;
+ }
+
+
+
+ if (err) {
+ rd_kafka_dbg(rk, CGRP, "OFFSET",
+ "Offset fetch error for %d partition(s): %s",
+ offsets->cnt, rd_kafka_err2str(err));
+ rd_kafka_consumer_err(
+ rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL,
+ NULL, RD_KAFKA_OFFSET_INVALID,
+ "Failed to fetch committed offsets for "
+ "%d partition(s) in group \"%s\": %s",
+ offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err));
+ }
+
+ /* Apply the fetched offsets to the assignment */
+ rd_kafka_assignment_apply_offsets(rk, offsets, err);
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+}
+
+
+/**
+ * @brief Decommission all partitions in the removed list.
+ *
+ * @returns >0 if there are removal operations in progress, else 0.
+ */
+static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) {
+ rd_kafka_topic_partition_t *rktpar;
+ int valid_offsets = 0;
+
+ RD_KAFKA_TPLIST_FOREACH(rktpar, rk->rk_consumer.assignment.removed) {
+ rd_kafka_toppar_t *rktp =
+ rd_kafka_topic_partition_ensure_toppar(
+ rk, rktpar, rd_true); /* Borrow ref */
+ int was_pending, was_queried;
+
+ /* Remove partition from pending and querying lists,
+ * if it happens to be there.
+ * Outstanding OffsetFetch query results will be ignored
+ * for partitions that are no longer on the .queried list. */
+ was_pending = rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.pending, rktpar->topic,
+ rktpar->partition);
+ was_queried = rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.queried, rktpar->topic,
+ rktpar->partition);
+
+ if (rktp->rktp_started) {
+ /* Partition was started, stop the fetcher. */
+ rd_assert(rk->rk_consumer.assignment.started_cnt > 0);
+
+ rd_kafka_toppar_op_fetch_stop(
+ rktp, RD_KAFKA_REPLYQ(rk->rk_ops, 0));
+ rk->rk_consumer.assignment.wait_stop_cnt++;
+ }
+
+ /* Reset the (lib) pause flag which may have been set by
+ * the cgrp when scheduling the rebalance callback. */
+ rd_kafka_toppar_op_pause_resume(rktp, rd_false /*resume*/,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+ RD_KAFKA_NO_REPLYQ);
+
+ rd_kafka_toppar_lock(rktp);
+
+ /* Save the currently stored offset and epoch on .removed
+ * so it will be committed below. */
+ rd_kafka_topic_partition_set_from_fetch_pos(
+ rktpar, rktp->rktp_stored_pos);
+ valid_offsets += !RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset);
+
+ /* Reset the stored offset to invalid so that
+ * a manual offset-less commit() or the auto-committer
+ * will not commit a stored offset from a previous
+ * assignment (issue #2782). */
+ rd_kafka_offset_store0(
+ rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1),
+ rd_true, RD_DONT_LOCK);
+
+ /* Partition is no longer desired */
+ rd_kafka_toppar_desired_del(rktp);
+
+ rd_assert((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED));
+ rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ASSIGNED;
+
+ rd_kafka_toppar_unlock(rktp);
+
+ rd_kafka_dbg(rk, CGRP, "REMOVE",
+ "Removing %s [%" PRId32
+ "] from assignment "
+ "(started=%s, pending=%s, queried=%s, "
+ "stored offset=%s)",
+ rktpar->topic, rktpar->partition,
+ RD_STR_ToF(rktp->rktp_started),
+ RD_STR_ToF(was_pending), RD_STR_ToF(was_queried),
+ rd_kafka_offset2str(rktpar->offset));
+ }
+
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REMOVE",
+ "Served %d removed partition(s), "
+ "with %d offset(s) to commit",
+ rk->rk_consumer.assignment.removed->cnt, valid_offsets);
+
+ /* If enable.auto.commit=true:
+ * Commit final offsets to broker for the removed partitions,
+ * unless this is a consumer destruction without a close() call. */
+ if (valid_offsets > 0 &&
+ rk->rk_conf.offset_store_method == RD_KAFKA_OFFSET_METHOD_BROKER &&
+ rk->rk_cgrp && rk->rk_conf.enable_auto_commit &&
+ !rd_kafka_destroy_flags_no_consumer_close(rk))
+ rd_kafka_cgrp_assigned_offsets_commit(
+ rk->rk_cgrp, rk->rk_consumer.assignment.removed,
+ rd_false /* use offsets from .removed */,
+ "unassigned partitions");
+
+ rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.removed);
+
+ return rk->rk_consumer.assignment.wait_stop_cnt +
+ rk->rk_consumer.wait_commit_cnt;
+}
+
+
+/**
+ * @brief Serve all partitions in the pending list.
+ *
+ * This either (asynchronously) queries the partition's committed offset, or
+ * if the start offset is known, starts the partition fetcher.
+ *
+ * @returns >0 if there are pending operations in progress for the current
+ * assignment, else 0.
+ */
+static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) {
+ rd_kafka_topic_partition_list_t *partitions_to_query = NULL;
+ /* We can query committed offsets only if all of the following are true:
+ * - We have a group coordinator.
+ * - There are no outstanding commits (since we might need to
+ * read back those commits as our starting position).
+ * - There are no outstanding queries already (since we want to
+ * avoid using an earlier query's response for a partition that
+ * is unassigned and then assigned again).
+ */
+ rd_kafka_broker_t *coord =
+ rk->rk_cgrp ? rd_kafka_cgrp_get_coord(rk->rk_cgrp) : NULL;
+ rd_bool_t can_query_offsets =
+ coord && rk->rk_consumer.wait_commit_cnt == 0 &&
+ rk->rk_consumer.assignment.queried->cnt == 0;
+ int i;
+
+ if (can_query_offsets)
+ partitions_to_query = rd_kafka_topic_partition_list_new(
+ rk->rk_consumer.assignment.pending->cnt);
+
+ /* Scan the list backwards so removals are cheap (no array shuffle) */
+ for (i = rk->rk_consumer.assignment.pending->cnt - 1; i >= 0; i--) {
+ rd_kafka_topic_partition_t *rktpar =
+ &rk->rk_consumer.assignment.pending->elems[i];
+ /* Borrow ref */
+ rd_kafka_toppar_t *rktp =
+ rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
+
+ rd_assert(!rktp->rktp_started);
+
+ if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) ||
+ rktpar->offset == RD_KAFKA_OFFSET_BEGINNING ||
+ rktpar->offset == RD_KAFKA_OFFSET_END ||
+ rktpar->offset == RD_KAFKA_OFFSET_INVALID ||
+ rktpar->offset <= RD_KAFKA_OFFSET_TAIL_BASE) {
+ /* The partition fetcher can handle absolute
+ * as well as beginning/end/tail start offsets, so we're
+ * ready to start the fetcher now.
+ * The INVALID offset means there was no committed
+ * offset and the partition fetcher will employ
+ * auto.offset.reset.
+ *
+ * Start fetcher for partition and forward partition's
+ * fetchq to consumer group's queue. */
+
+ rd_kafka_dbg(rk, CGRP, "SRVPEND",
+ "Starting pending assigned partition "
+ "%s [%" PRId32 "] at %s",
+ rktpar->topic, rktpar->partition,
+ rd_kafka_fetch_pos2str(
+ rd_kafka_topic_partition_get_fetch_pos(
+ rktpar)));
+
+ /* Reset the (lib) pause flag which may have been set by
+ * the cgrp when scheduling the rebalance callback. */
+ rd_kafka_toppar_op_pause_resume(
+ rktp, rd_false /*resume*/,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ);
+
+ /* Start the fetcher */
+ rktp->rktp_started = rd_true;
+ rk->rk_consumer.assignment.started_cnt++;
+
+ rd_kafka_toppar_op_fetch_start(
+ rktp,
+ rd_kafka_topic_partition_get_fetch_pos(rktpar),
+ rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ);
+
+
+ } else if (can_query_offsets) {
+ /* Else use the last committed offset for partition.
+ * We can't rely on any internal cached committed offset
+ * so we'll accumulate a list of partitions that need
+ * to be queried and then send FetchOffsetsRequest
+ * to the group coordinator. */
+
+ rd_dassert(!rd_kafka_topic_partition_list_find(
+ rk->rk_consumer.assignment.queried, rktpar->topic,
+ rktpar->partition));
+
+ rd_kafka_topic_partition_list_add_copy(
+ partitions_to_query, rktpar);
+
+ rd_kafka_topic_partition_list_add_copy(
+ rk->rk_consumer.assignment.queried, rktpar);
+
+ rd_kafka_dbg(rk, CGRP, "SRVPEND",
+ "Querying committed offset for pending "
+ "assigned partition %s [%" PRId32 "]",
+ rktpar->topic, rktpar->partition);
+
+
+ } else {
+ rd_kafka_dbg(
+ rk, CGRP, "SRVPEND",
+ "Pending assignment partition "
+ "%s [%" PRId32
+ "] can't fetch committed "
+ "offset yet "
+ "(cgrp state %s, awaiting %d commits, "
+ "%d partition(s) already being queried)",
+ rktpar->topic, rktpar->partition,
+ rk->rk_cgrp
+ ? rd_kafka_cgrp_state_names[rk->rk_cgrp
+ ->rkcg_state]
+ : "n/a",
+ rk->rk_consumer.wait_commit_cnt,
+ rk->rk_consumer.assignment.queried->cnt);
+
+ continue; /* Keep rktpar on pending list */
+ }
+
+ /* Remove rktpar from the pending list */
+ rd_kafka_topic_partition_list_del_by_idx(
+ rk->rk_consumer.assignment.pending, i);
+ }
+
+
+ if (!can_query_offsets) {
+ if (coord)
+ rd_kafka_broker_destroy(coord);
+ return rk->rk_consumer.assignment.pending->cnt +
+ rk->rk_consumer.assignment.queried->cnt;
+ }
+
+
+ if (partitions_to_query->cnt > 0) {
+ int64_t *req_assignment_version = rd_malloc(sizeof(int64_t));
+ *req_assignment_version = rk->rk_consumer.assignment.version;
+
+ rd_kafka_dbg(rk, CGRP, "OFFSETFETCH",
+ "Fetching committed offsets for "
+ "%d pending partition(s) in assignment",
+ partitions_to_query->cnt);
+
+ rd_kafka_OffsetFetchRequest(
+ coord, rk->rk_group_id->str, partitions_to_query,
+ rk->rk_conf.isolation_level ==
+ RD_KAFKA_READ_COMMITTED /*require_stable_offsets*/,
+ 0, /* Timeout */
+ RD_KAFKA_REPLYQ(rk->rk_ops, 0),
+ rd_kafka_assignment_handle_OffsetFetch,
+ /* Must be freed by handler */
+ (void *)req_assignment_version);
+ }
+
+ if (coord)
+ rd_kafka_broker_destroy(coord);
+
+ rd_kafka_topic_partition_list_destroy(partitions_to_query);
+
+ return rk->rk_consumer.assignment.pending->cnt +
+ rk->rk_consumer.assignment.queried->cnt;
+}
+
+
+
+/**
+ * @brief Serve updates to the assignment.
+ *
+ * Call on:
+ * - assignment changes
+ * - wait_commit_cnt reaches 0
+ * - partition fetcher is stopped
+ */
+void rd_kafka_assignment_serve(rd_kafka_t *rk) {
+ int inp_removals = 0;
+ int inp_pending = 0;
+
+ rd_kafka_assignment_dump(rk);
+
+ /* Serve any partitions that should be removed */
+ if (rk->rk_consumer.assignment.removed->cnt > 0)
+ inp_removals = rd_kafka_assignment_serve_removals(rk);
+
+ /* Serve any partitions in the pending list that need further action,
+ * unless we're waiting for a previous assignment change (an unassign
+ * in some form) to propagate, or outstanding offset commits
+ * to finish (since we might need the committed offsets as start
+ * offsets). */
+ if (rk->rk_consumer.assignment.wait_stop_cnt == 0 &&
+ rk->rk_consumer.wait_commit_cnt == 0 && inp_removals == 0 &&
+ rk->rk_consumer.assignment.pending->cnt > 0)
+ inp_pending = rd_kafka_assignment_serve_pending(rk);
+
+ if (inp_removals + inp_pending +
+ rk->rk_consumer.assignment.queried->cnt +
+ rk->rk_consumer.assignment.wait_stop_cnt +
+ rk->rk_consumer.wait_commit_cnt ==
+ 0) {
+ /* No assignment operations in progress,
+ * signal assignment done back to cgrp to let it
+ * transition to its next state if necessary.
+ * We may emit this signalling more than necessary and it is
+ * up to the cgrp to only take action if needed, based on its
+ * state. */
+ rd_kafka_cgrp_assignment_done(rk->rk_cgrp);
+ } else {
+ rd_kafka_dbg(rk, CGRP, "ASSIGNMENT",
+ "Current assignment of %d partition(s) "
+ "with %d pending adds, %d offset queries, "
+ "%d partitions awaiting stop and "
+ "%d offset commits in progress",
+ rk->rk_consumer.assignment.all->cnt, inp_pending,
+ rk->rk_consumer.assignment.queried->cnt,
+ rk->rk_consumer.assignment.wait_stop_cnt,
+ rk->rk_consumer.wait_commit_cnt);
+ }
+}
+
+
+/**
+ * @returns true if the current or previous assignment has operations in
+ * progress, such as waiting for partition fetchers to stop.
+ */
+rd_bool_t rd_kafka_assignment_in_progress(rd_kafka_t *rk) {
+ return rk->rk_consumer.wait_commit_cnt > 0 ||
+ rk->rk_consumer.assignment.wait_stop_cnt > 0 ||
+ rk->rk_consumer.assignment.pending->cnt > 0 ||
+ rk->rk_consumer.assignment.queried->cnt > 0 ||
+ rk->rk_consumer.assignment.removed->cnt > 0;
+}
+
+
+/**
+ * @brief Clear the current assignment.
+ *
+ * @remark Make sure to call rd_kafka_assignment_serve() after successful
+ * return from this function.
+ *
+ * @returns the number of partitions removed.
+ */
+int rd_kafka_assignment_clear(rd_kafka_t *rk) {
+ int cnt = rk->rk_consumer.assignment.all->cnt;
+
+ if (cnt == 0) {
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLEARASSIGN",
+ "No current assignment to clear");
+ return 0;
+ }
+
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLEARASSIGN",
+ "Clearing current assignment of %d partition(s)",
+ rk->rk_consumer.assignment.all->cnt);
+
+ rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.pending);
+ rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.queried);
+
+ rd_kafka_topic_partition_list_add_list(
+ rk->rk_consumer.assignment.removed, rk->rk_consumer.assignment.all);
+ rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.all);
+
+ rk->rk_consumer.assignment.version++;
+
+ return cnt;
+}
+
+
+/**
+ * @brief Adds \p partitions to the current assignment.
+ *
+ * Will return error if trying to add a partition that is already in the
+ * assignment.
+ *
+ * @remark Make sure to call rd_kafka_assignment_serve() after successful
+ * return from this function.
+ */
+rd_kafka_error_t *
+rd_kafka_assignment_add(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions) {
+ rd_bool_t was_empty = rk->rk_consumer.assignment.all->cnt == 0;
+ int i;
+
+ /* Make sure there are no duplicates, invalid partitions, or
+ * invalid offsets in the input partitions. */
+ rd_kafka_topic_partition_list_sort(partitions, NULL, NULL);
+
+ for (i = 0; i < partitions->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+ const rd_kafka_topic_partition_t *prev =
+ i > 0 ? &partitions->elems[i - 1] : NULL;
+
+ if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) &&
+ rktpar->offset != RD_KAFKA_OFFSET_BEGINNING &&
+ rktpar->offset != RD_KAFKA_OFFSET_END &&
+ rktpar->offset != RD_KAFKA_OFFSET_STORED &&
+ rktpar->offset != RD_KAFKA_OFFSET_INVALID &&
+ rktpar->offset > RD_KAFKA_OFFSET_TAIL_BASE)
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "%s [%" PRId32
+ "] has invalid start offset %" PRId64,
+ rktpar->topic, rktpar->partition, rktpar->offset);
+
+ if (prev && !rd_kafka_topic_partition_cmp(rktpar, prev))
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Duplicate %s [%" PRId32 "] in input list",
+ rktpar->topic, rktpar->partition);
+
+ if (rd_kafka_topic_partition_list_find(
+ rk->rk_consumer.assignment.all, rktpar->topic,
+ rktpar->partition))
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__CONFLICT,
+ "%s [%" PRId32
+ "] is already part of the "
+ "current assignment",
+ rktpar->topic,
+ rktpar->partition);
+
+ /* Translate RD_KAFKA_OFFSET_INVALID to RD_KAFKA_OFFSET_STORED,
+ * i.e., read from committed offset, since we use INVALID
+ * internally to differentiate between querying for
+ * committed offset (STORED) and no committed offset (INVALID).
+ */
+ if (rktpar->offset == RD_KAFKA_OFFSET_INVALID)
+ rktpar->offset = RD_KAFKA_OFFSET_STORED;
+
+ /* Get toppar object for each partition.
+ * This is to make sure the rktp stays alive while unassigning
+ * any previous assignment in the call to
+ * assignment_clear() below. */
+ rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
+ }
+
+ /* Mark all partition objects as assigned and reset the stored
+ * offsets back to invalid in case it was explicitly stored during
+ * the time the partition was not assigned. */
+ for (i = 0; i < partitions->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+ rd_kafka_toppar_t *rktp =
+ rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
+
+ rd_kafka_toppar_lock(rktp);
+
+ rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED));
+ rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ASSIGNED;
+
+ /* Reset the stored offset to INVALID to avoid the race
+ * condition described in rdkafka_offset.h */
+ rd_kafka_offset_store0(
+ rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1),
+ rd_true /* force */, RD_DONT_LOCK);
+
+ rd_kafka_toppar_unlock(rktp);
+ }
+
+
+ /* Add the new list of partitions to the current assignment.
+ * Only need to sort the final assignment if it was non-empty
+ * to begin with since \p partitions is sorted above. */
+ rd_kafka_topic_partition_list_add_list(rk->rk_consumer.assignment.all,
+ partitions);
+ if (!was_empty)
+ rd_kafka_topic_partition_list_sort(
+ rk->rk_consumer.assignment.all, NULL, NULL);
+
+ /* And add to .pending for serve_pending() to handle. */
+ rd_kafka_topic_partition_list_add_list(
+ rk->rk_consumer.assignment.pending, partitions);
+
+
+ rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "ASSIGNMENT",
+ "Added %d partition(s) to assignment which "
+ "now consists of %d partition(s) where of %d are in "
+ "pending state and %d are being queried",
+ partitions->cnt, rk->rk_consumer.assignment.all->cnt,
+ rk->rk_consumer.assignment.pending->cnt,
+ rk->rk_consumer.assignment.queried->cnt);
+
+ rk->rk_consumer.assignment.version++;
+
+ return NULL;
+}
+
+
+/**
+ * @brief Remove \p partitions from the current assignment.
+ *
+ * Will return error if trying to remove a partition that is not in the
+ * assignment.
+ *
+ * @remark Make sure to call rd_kafka_assignment_serve() after successful
+ * return from this function.
+ */
+rd_kafka_error_t *
+rd_kafka_assignment_subtract(rd_kafka_t *rk,
+ rd_kafka_topic_partition_list_t *partitions) {
+ int i;
+ int matched_queried_partitions = 0;
+ int assignment_pre_cnt;
+
+ if (rk->rk_consumer.assignment.all->cnt == 0 && partitions->cnt > 0)
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Can't subtract from empty assignment");
+
+ /* Verify that all partitions in \p partitions are in the assignment
+ * before starting to modify the assignment. */
+ rd_kafka_topic_partition_list_sort(partitions, NULL, NULL);
+
+ for (i = 0; i < partitions->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar = &partitions->elems[i];
+
+ if (!rd_kafka_topic_partition_list_find(
+ rk->rk_consumer.assignment.all, rktpar->topic,
+ rktpar->partition))
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "%s [%" PRId32
+ "] can't be unassigned since "
+ "it is not in the current assignment",
+ rktpar->topic, rktpar->partition);
+
+ rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true);
+ }
+
+
+ assignment_pre_cnt = rk->rk_consumer.assignment.all->cnt;
+
+ /* Remove partitions in reverse order to avoid excessive
+ * array shuffling of .all.
+ * Add the removed partitions to .pending for serve() to handle. */
+ for (i = partitions->cnt - 1; i >= 0; i--) {
+ const rd_kafka_topic_partition_t *rktpar =
+ &partitions->elems[i];
+
+ if (!rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.all, rktpar->topic,
+ rktpar->partition))
+ RD_BUG("Removed partition %s [%" PRId32
+ "] not found "
+ "in assignment.all",
+ rktpar->topic, rktpar->partition);
+
+ if (rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.queried, rktpar->topic,
+ rktpar->partition))
+ matched_queried_partitions++;
+ else
+ rd_kafka_topic_partition_list_del(
+ rk->rk_consumer.assignment.pending, rktpar->topic,
+ rktpar->partition);
+
+ /* Add to .removed list which will be served by
+ * serve_removals(). */
+ rd_kafka_topic_partition_list_add_copy(
+ rk->rk_consumer.assignment.removed, rktpar);
+ }
+
+ rd_kafka_dbg(rk, CGRP, "REMOVEASSIGN",
+ "Removed %d partition(s) "
+ "(%d with outstanding offset queries) from assignment "
+ "of %d partition(s)",
+ partitions->cnt, matched_queried_partitions,
+ assignment_pre_cnt);
+
+ if (rk->rk_consumer.assignment.all->cnt == 0) {
+ /* Some safety checks */
+ rd_assert(rk->rk_consumer.assignment.pending->cnt == 0);
+ rd_assert(rk->rk_consumer.assignment.queried->cnt == 0);
+ }
+
+ rk->rk_consumer.assignment.version++;
+
+ return NULL;
+}
+
+
+/**
+ * @brief Call when partition fetcher has stopped.
+ */
+void rd_kafka_assignment_partition_stopped(rd_kafka_t *rk,
+ rd_kafka_toppar_t *rktp) {
+ rd_assert(rk->rk_consumer.assignment.wait_stop_cnt > 0);
+ rk->rk_consumer.assignment.wait_stop_cnt--;
+
+ rd_assert(rktp->rktp_started);
+ rktp->rktp_started = rd_false;
+
+ rd_assert(rk->rk_consumer.assignment.started_cnt > 0);
+ rk->rk_consumer.assignment.started_cnt--;
+
+ /* If this was the last partition we awaited stop for, serve the
+ * assignment to transition any existing assignment to the next state */
+ if (rk->rk_consumer.assignment.wait_stop_cnt == 0) {
+ rd_kafka_dbg(rk, CGRP, "STOPSERVE",
+ "All partitions awaiting stop are now "
+ "stopped: serving assignment");
+ rd_kafka_assignment_serve(rk);
+ }
+}
+
+
+/**
+ * @brief Pause fetching of the currently assigned partitions.
+ *
+ * Partitions will be resumed by calling rd_kafka_assignment_resume() or
+ * from either serve_removals() or serve_pending() above.
+ */
+void rd_kafka_assignment_pause(rd_kafka_t *rk, const char *reason) {
+
+ if (rk->rk_consumer.assignment.all->cnt == 0)
+ return;
+
+ rd_kafka_dbg(rk, CGRP, "PAUSE",
+ "Pausing fetchers for %d assigned partition(s): %s",
+ rk->rk_consumer.assignment.all->cnt, reason);
+
+ rd_kafka_toppars_pause_resume(rk, rd_true /*pause*/, RD_ASYNC,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+ rk->rk_consumer.assignment.all);
+}
+
+/**
+ * @brief Resume fetching of the currently assigned partitions which have
+ * previously been paused by rd_kafka_assignment_pause().
+ */
+void rd_kafka_assignment_resume(rd_kafka_t *rk, const char *reason) {
+
+ if (rk->rk_consumer.assignment.all->cnt == 0)
+ return;
+
+ rd_kafka_dbg(rk, CGRP, "PAUSE",
+ "Resuming fetchers for %d assigned partition(s): %s",
+ rk->rk_consumer.assignment.all->cnt, reason);
+
+ rd_kafka_toppars_pause_resume(rk, rd_false /*resume*/, RD_ASYNC,
+ RD_KAFKA_TOPPAR_F_LIB_PAUSE,
+ rk->rk_consumer.assignment.all);
+}
+
+
+
+/**
+ * @brief Destroy assignment state (but not \p assignment itself)
+ */
+void rd_kafka_assignment_destroy(rd_kafka_t *rk) {
+ if (!rk->rk_consumer.assignment.all)
+ return; /* rd_kafka_assignment_init() not called */
+ rd_kafka_topic_partition_list_destroy(rk->rk_consumer.assignment.all);
+ rd_kafka_topic_partition_list_destroy(
+ rk->rk_consumer.assignment.pending);
+ rd_kafka_topic_partition_list_destroy(
+ rk->rk_consumer.assignment.queried);
+ rd_kafka_topic_partition_list_destroy(
+ rk->rk_consumer.assignment.removed);
+}
+
+
+/**
+ * @brief Initialize the assignment struct.
+ */
+void rd_kafka_assignment_init(rd_kafka_t *rk) {
+ rk->rk_consumer.assignment.all = rd_kafka_topic_partition_list_new(100);
+ rk->rk_consumer.assignment.pending =
+ rd_kafka_topic_partition_list_new(100);
+ rk->rk_consumer.assignment.queried =
+ rd_kafka_topic_partition_list_new(100);
+ rk->rk_consumer.assignment.removed =
+ rd_kafka_topic_partition_list_new(100);
+}