diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c | 968 |
1 files changed, 968 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c new file mode 100644 index 000000000..dc4bdae94 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_assignment.c @@ -0,0 +1,968 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2020 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +/** + * @name Consumer assignment state. + * + * Responsible for managing the state of assigned partitions. 
+ * + * + ****************************************************************************** + * rd_kafka_assignment_serve() + * --------------------------- + * + * It is important to call rd_kafka_assignment_serve() after each change + * to the assignment through assignment_add, assignment_subtract or + * assignment_clear as those functions only modify the assignment but do + * not take any action to transition partitions to or from the assignment + * states. + * + * The reason assignment_serve() is not automatically called from these + * functions is for the caller to be able to set the current state before + * the side-effects of serve() kick in, such as the call to + * rd_kafka_cgrp_assignment_done() that in turn will set the cgrp state. + * + * + * + ****************************************************************************** + * Querying for committed offsets (.queried list) + * ---------------------------------------------- + * + * We only allow one outstanding query (fetch committed offset), this avoids + * complex handling of partitions that are assigned, unassigned and reassigned + * all within the window of an OffsetFetch request. + * Consider the following case: + * + * 1. tp1 and tp2 are incrementally assigned. + * 2. An OffsetFetchRequest is sent for tp1 and tp2. + * 3. tp2 is incrementally unassigned. + * 4. Broker sends OffsetFetchResponse with offsets tp1=10, tp2=20. + * 4b. Some other consumer commits offsets 30 for tp2. + * 5. tp2 is incrementally assigned again. + * 6. The OffsetFetchResponse is received. + * + * Without extra handling the consumer would start fetching tp1 at offset 10 + * (which is correct) and tp2 at offset 20 (which is incorrect, the last + * committed offset is now 30). + * + * To alleviate this situation we remove unassigned partitions from the + * .queried list, and in the OffsetFetch response handler we only use offsets + * for partitions that are on the .queried list. 
+ * + * To make sure the tp1 offset is used and not re-queried we only allow + * one outstanding OffsetFetch request at the time, meaning that at step 5 + * a new OffsetFetch request will not be sent and tp2 will remain in the + * .pending list until the outstanding OffsetFetch response is received in + * step 6. At this point tp2 will transition to .queried and a new + * OffsetFetch request will be sent. + * + * This explanation is more verbose than the code involved. + * + ****************************************************************************** + * + * + * @remark Try to keep any cgrp state out of this file. + * + * FIXME: There are some pretty obvious optimizations that needs to be done here + * with regards to partition_list_t lookups. But we can do that when + * we know the current implementation works correctly. + */ + +#include "rdkafka_int.h" +#include "rdkafka_offset.h" +#include "rdkafka_request.h" + + +static void rd_kafka_assignment_dump(rd_kafka_t *rk) { + rd_kafka_dbg(rk, CGRP, "DUMP", + "Assignment dump (started_cnt=%d, wait_stop_cnt=%d)", + rk->rk_consumer.assignment.started_cnt, + rk->rk_consumer.assignment.wait_stop_cnt); + + rd_kafka_topic_partition_list_log(rk, "DUMP_ALL", RD_KAFKA_DBG_CGRP, + rk->rk_consumer.assignment.all); + + rd_kafka_topic_partition_list_log(rk, "DUMP_PND", RD_KAFKA_DBG_CGRP, + rk->rk_consumer.assignment.pending); + + rd_kafka_topic_partition_list_log(rk, "DUMP_QRY", RD_KAFKA_DBG_CGRP, + rk->rk_consumer.assignment.queried); + + rd_kafka_topic_partition_list_log(rk, "DUMP_REM", RD_KAFKA_DBG_CGRP, + rk->rk_consumer.assignment.removed); +} + +/** + * @brief Apply the fetched committed offsets to the current assignment's + * queried partitions. + * + * @param err is the request-level error, if any. The caller is responsible + * for raising this error to the application. It is only used here + * to avoid taking actions. + * + * Called from the FetchOffsets response handler below. 
+ */ +static void +rd_kafka_assignment_apply_offsets(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *offsets, + rd_kafka_resp_err_t err) { + rd_kafka_topic_partition_t *rktpar; + + RD_KAFKA_TPLIST_FOREACH(rktpar, offsets) { + /* May be NULL, borrow ref. */ + rd_kafka_toppar_t *rktp = + rd_kafka_topic_partition_toppar(rk, rktpar); + + if (!rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.queried, rktpar->topic, + rktpar->partition)) { + rd_kafka_dbg(rk, CGRP, "OFFSETFETCH", + "Ignoring OffsetFetch " + "response for %s [%" PRId32 + "] which is no " + "longer in the queried list " + "(possibly unassigned?)", + rktpar->topic, rktpar->partition); + continue; + } + + if (err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT || + rktpar->err == RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT) { + /* Ongoing transactions are blocking offset retrieval. + * This is typically retried from the OffsetFetch + * handler but we can come here if the assignment + * (and thus the assignment.version) was changed while + * the OffsetFetch request was in-flight, in which case + * we put this partition back on the pending list for + * later handling by the assignment state machine. 
*/ + + rd_kafka_dbg(rk, CGRP, "OFFSETFETCH", + "Adding %s [%" PRId32 + "] back to pending " + "list because on-going transaction is " + "blocking offset retrieval", + rktpar->topic, rktpar->partition); + + rd_kafka_topic_partition_list_add_copy( + rk->rk_consumer.assignment.pending, rktpar); + + } else if (rktpar->err) { + /* Partition-level error */ + rd_kafka_consumer_err( + rk->rk_consumer.q, RD_KAFKA_NODEID_UA, rktpar->err, + 0, rktpar->topic, rktp, RD_KAFKA_OFFSET_INVALID, + "Failed to fetch committed offset for " + "group \"%s\" topic %s [%" PRId32 "]: %s", + rk->rk_group_id->str, rktpar->topic, + rktpar->partition, rd_kafka_err2str(rktpar->err)); + + /* The partition will not be added back to .pending + * and thus only reside on .all until the application + * unassigns it and possible re-assigns it. */ + + } else if (!err) { + /* If rktpar->offset is RD_KAFKA_OFFSET_INVALID it means + * there was no committed offset for this partition. + * serve_pending() will now start this partition + * since the offset is set to INVALID (rather than + * STORED) and the partition fetcher will employ + * auto.offset.reset to know what to do. */ + + /* Add partition to pending list where serve() + * will start the fetcher. */ + rd_kafka_dbg(rk, CGRP, "OFFSETFETCH", + "Adding %s [%" PRId32 + "] back to pending " + "list with offset %s", + rktpar->topic, rktpar->partition, + rd_kafka_offset2str(rktpar->offset)); + + rd_kafka_topic_partition_list_add_copy( + rk->rk_consumer.assignment.pending, rktpar); + } + /* Do nothing for request-level errors (err is set). */ + } + + if (offsets->cnt > 0) + rd_kafka_assignment_serve(rk); +} + + + +/** + * @brief Reply handler for OffsetFetch queries from the assignment code. + * + * @param opaque Is a malloced int64_t* containing the assignment version at the + * time of the request. 
+ * + * @locality rdkafka main thread + */ +static void rd_kafka_assignment_handle_OffsetFetch(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *reply, + rd_kafka_buf_t *request, + void *opaque) { + rd_kafka_topic_partition_list_t *offsets = NULL; + int64_t *req_assignment_version = (int64_t *)opaque; + /* Only allow retries if there's been no change to the assignment, + * otherwise rely on assignment state machine to retry. */ + rd_bool_t allow_retry = + *req_assignment_version == rk->rk_consumer.assignment.version; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + /* Termination, quick cleanup. */ + rd_free(req_assignment_version); + return; + } + + err = rd_kafka_handle_OffsetFetch( + rk, rkb, err, reply, request, &offsets, + rd_true /* Update toppars */, rd_true /* Add parts */, allow_retry); + if (err == RD_KAFKA_RESP_ERR__IN_PROGRESS) { + if (offsets) + rd_kafka_topic_partition_list_destroy(offsets); + return; /* retrying */ + } + + rd_free(req_assignment_version); + + /* offsets may be NULL for certain errors, such + * as ERR__TRANSPORT. 
*/ + if (!offsets && !allow_retry) { + rd_dassert(err); + if (!err) + err = RD_KAFKA_RESP_ERR__NO_OFFSET; + + rd_kafka_dbg(rk, CGRP, "OFFSET", "Offset fetch error: %s", + rd_kafka_err2str(err)); + rd_kafka_consumer_err( + rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL, + NULL, RD_KAFKA_OFFSET_INVALID, + "Failed to fetch committed " + "offsets for partitions " + "in group \"%s\": %s", + rk->rk_group_id->str, rd_kafka_err2str(err)); + + return; + } + + + + if (err) { + rd_kafka_dbg(rk, CGRP, "OFFSET", + "Offset fetch error for %d partition(s): %s", + offsets->cnt, rd_kafka_err2str(err)); + rd_kafka_consumer_err( + rk->rk_consumer.q, rd_kafka_broker_id(rkb), err, 0, NULL, + NULL, RD_KAFKA_OFFSET_INVALID, + "Failed to fetch committed offsets for " + "%d partition(s) in group \"%s\": %s", + offsets->cnt, rk->rk_group_id->str, rd_kafka_err2str(err)); + } + + /* Apply the fetched offsets to the assignment */ + rd_kafka_assignment_apply_offsets(rk, offsets, err); + + rd_kafka_topic_partition_list_destroy(offsets); +} + + +/** + * @brief Decommission all partitions in the removed list. + * + * @returns >0 if there are removal operations in progress, else 0. + */ +static int rd_kafka_assignment_serve_removals(rd_kafka_t *rk) { + rd_kafka_topic_partition_t *rktpar; + int valid_offsets = 0; + + RD_KAFKA_TPLIST_FOREACH(rktpar, rk->rk_consumer.assignment.removed) { + rd_kafka_toppar_t *rktp = + rd_kafka_topic_partition_ensure_toppar( + rk, rktpar, rd_true); /* Borrow ref */ + int was_pending, was_queried; + + /* Remove partition from pending and querying lists, + * if it happens to be there. + * Outstanding OffsetFetch query results will be ignored + * for partitions that are no longer on the .queried list. 
*/ + was_pending = rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.pending, rktpar->topic, + rktpar->partition); + was_queried = rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.queried, rktpar->topic, + rktpar->partition); + + if (rktp->rktp_started) { + /* Partition was started, stop the fetcher. */ + rd_assert(rk->rk_consumer.assignment.started_cnt > 0); + + rd_kafka_toppar_op_fetch_stop( + rktp, RD_KAFKA_REPLYQ(rk->rk_ops, 0)); + rk->rk_consumer.assignment.wait_stop_cnt++; + } + + /* Reset the (lib) pause flag which may have been set by + * the cgrp when scheduling the rebalance callback. */ + rd_kafka_toppar_op_pause_resume(rktp, rd_false /*resume*/, + RD_KAFKA_TOPPAR_F_LIB_PAUSE, + RD_KAFKA_NO_REPLYQ); + + rd_kafka_toppar_lock(rktp); + + /* Save the currently stored offset and epoch on .removed + * so it will be committed below. */ + rd_kafka_topic_partition_set_from_fetch_pos( + rktpar, rktp->rktp_stored_pos); + valid_offsets += !RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset); + + /* Reset the stored offset to invalid so that + * a manual offset-less commit() or the auto-committer + * will not commit a stored offset from a previous + * assignment (issue #2782). 
*/ + rd_kafka_offset_store0( + rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1), + rd_true, RD_DONT_LOCK); + + /* Partition is no longer desired */ + rd_kafka_toppar_desired_del(rktp); + + rd_assert((rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED)); + rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_ASSIGNED; + + rd_kafka_toppar_unlock(rktp); + + rd_kafka_dbg(rk, CGRP, "REMOVE", + "Removing %s [%" PRId32 + "] from assignment " + "(started=%s, pending=%s, queried=%s, " + "stored offset=%s)", + rktpar->topic, rktpar->partition, + RD_STR_ToF(rktp->rktp_started), + RD_STR_ToF(was_pending), RD_STR_ToF(was_queried), + rd_kafka_offset2str(rktpar->offset)); + } + + rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "REMOVE", + "Served %d removed partition(s), " + "with %d offset(s) to commit", + rk->rk_consumer.assignment.removed->cnt, valid_offsets); + + /* If enable.auto.commit=true: + * Commit final offsets to broker for the removed partitions, + * unless this is a consumer destruction with a close() call. */ + if (valid_offsets > 0 && + rk->rk_conf.offset_store_method == RD_KAFKA_OFFSET_METHOD_BROKER && + rk->rk_cgrp && rk->rk_conf.enable_auto_commit && + !rd_kafka_destroy_flags_no_consumer_close(rk)) + rd_kafka_cgrp_assigned_offsets_commit( + rk->rk_cgrp, rk->rk_consumer.assignment.removed, + rd_false /* use offsets from .removed */, + "unassigned partitions"); + + rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.removed); + + return rk->rk_consumer.assignment.wait_stop_cnt + + rk->rk_consumer.wait_commit_cnt; +} + + +/** + * @brief Serve all partitions in the pending list. + * + * This either (asynchronously) queries the partition's committed offset, or + * if the start offset is known, starts the partition fetcher. + * + * @returns >0 if there are pending operations in progress for the current + * assignment, else 0. 
+ */ +static int rd_kafka_assignment_serve_pending(rd_kafka_t *rk) { + rd_kafka_topic_partition_list_t *partitions_to_query = NULL; + /* We can query committed offsets only if all of the following are true: + * - We have a group coordinator. + * - There are no outstanding commits (since we might need to + * read back those commits as our starting position). + * - There are no outstanding queries already (since we want to + * avoid using a earlier queries response for a partition that + * is unassigned and then assigned again). + */ + rd_kafka_broker_t *coord = + rk->rk_cgrp ? rd_kafka_cgrp_get_coord(rk->rk_cgrp) : NULL; + rd_bool_t can_query_offsets = + coord && rk->rk_consumer.wait_commit_cnt == 0 && + rk->rk_consumer.assignment.queried->cnt == 0; + int i; + + if (can_query_offsets) + partitions_to_query = rd_kafka_topic_partition_list_new( + rk->rk_consumer.assignment.pending->cnt); + + /* Scan the list backwards so removals are cheap (no array shuffle) */ + for (i = rk->rk_consumer.assignment.pending->cnt - 1; i >= 0; i--) { + rd_kafka_topic_partition_t *rktpar = + &rk->rk_consumer.assignment.pending->elems[i]; + /* Borrow ref */ + rd_kafka_toppar_t *rktp = + rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true); + + rd_assert(!rktp->rktp_started); + + if (!RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) || + rktpar->offset == RD_KAFKA_OFFSET_BEGINNING || + rktpar->offset == RD_KAFKA_OFFSET_END || + rktpar->offset == RD_KAFKA_OFFSET_INVALID || + rktpar->offset <= RD_KAFKA_OFFSET_TAIL_BASE) { + /* The partition fetcher can handle absolute + * as well as beginning/end/tail start offsets, so we're + * ready to start the fetcher now. + * The INVALID offset means there was no committed + * offset and the partition fetcher will employ + * auto.offset.reset. + * + * Start fetcher for partition and forward partition's + * fetchq to consumer group's queue. 
*/ + + rd_kafka_dbg(rk, CGRP, "SRVPEND", + "Starting pending assigned partition " + "%s [%" PRId32 "] at %s", + rktpar->topic, rktpar->partition, + rd_kafka_fetch_pos2str( + rd_kafka_topic_partition_get_fetch_pos( + rktpar))); + + /* Reset the (lib) pause flag which may have been set by + * the cgrp when scheduling the rebalance callback. */ + rd_kafka_toppar_op_pause_resume( + rktp, rd_false /*resume*/, + RD_KAFKA_TOPPAR_F_LIB_PAUSE, RD_KAFKA_NO_REPLYQ); + + /* Start the fetcher */ + rktp->rktp_started = rd_true; + rk->rk_consumer.assignment.started_cnt++; + + rd_kafka_toppar_op_fetch_start( + rktp, + rd_kafka_topic_partition_get_fetch_pos(rktpar), + rk->rk_consumer.q, RD_KAFKA_NO_REPLYQ); + + + } else if (can_query_offsets) { + /* Else use the last committed offset for partition. + * We can't rely on any internal cached committed offset + * so we'll accumulate a list of partitions that need + * to be queried and then send FetchOffsetsRequest + * to the group coordinator. */ + + rd_dassert(!rd_kafka_topic_partition_list_find( + rk->rk_consumer.assignment.queried, rktpar->topic, + rktpar->partition)); + + rd_kafka_topic_partition_list_add_copy( + partitions_to_query, rktpar); + + rd_kafka_topic_partition_list_add_copy( + rk->rk_consumer.assignment.queried, rktpar); + + rd_kafka_dbg(rk, CGRP, "SRVPEND", + "Querying committed offset for pending " + "assigned partition %s [%" PRId32 "]", + rktpar->topic, rktpar->partition); + + + } else { + rd_kafka_dbg( + rk, CGRP, "SRVPEND", + "Pending assignment partition " + "%s [%" PRId32 + "] can't fetch committed " + "offset yet " + "(cgrp state %s, awaiting %d commits, " + "%d partition(s) already being queried)", + rktpar->topic, rktpar->partition, + rk->rk_cgrp + ? 
rd_kafka_cgrp_state_names[rk->rk_cgrp + ->rkcg_state] + : "n/a", + rk->rk_consumer.wait_commit_cnt, + rk->rk_consumer.assignment.queried->cnt); + + continue; /* Keep rktpar on pending list */ + } + + /* Remove rktpar from the pending list */ + rd_kafka_topic_partition_list_del_by_idx( + rk->rk_consumer.assignment.pending, i); + } + + + if (!can_query_offsets) { + if (coord) + rd_kafka_broker_destroy(coord); + return rk->rk_consumer.assignment.pending->cnt + + rk->rk_consumer.assignment.queried->cnt; + } + + + if (partitions_to_query->cnt > 0) { + int64_t *req_assignment_version = rd_malloc(sizeof(int64_t)); + *req_assignment_version = rk->rk_consumer.assignment.version; + + rd_kafka_dbg(rk, CGRP, "OFFSETFETCH", + "Fetching committed offsets for " + "%d pending partition(s) in assignment", + partitions_to_query->cnt); + + rd_kafka_OffsetFetchRequest( + coord, rk->rk_group_id->str, partitions_to_query, + rk->rk_conf.isolation_level == + RD_KAFKA_READ_COMMITTED /*require_stable_offsets*/, + 0, /* Timeout */ + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_assignment_handle_OffsetFetch, + /* Must be freed by handler */ + (void *)req_assignment_version); + } + + if (coord) + rd_kafka_broker_destroy(coord); + + rd_kafka_topic_partition_list_destroy(partitions_to_query); + + return rk->rk_consumer.assignment.pending->cnt + + rk->rk_consumer.assignment.queried->cnt; +} + + + +/** + * @brief Serve updates to the assignment. 
+ * + * Call on: + * - assignment changes + * - wait_commit_cnt reaches 0 + * - partition fetcher is stopped + */ +void rd_kafka_assignment_serve(rd_kafka_t *rk) { + int inp_removals = 0; + int inp_pending = 0; + + rd_kafka_assignment_dump(rk); + + /* Serve any partitions that should be removed */ + if (rk->rk_consumer.assignment.removed->cnt > 0) + inp_removals = rd_kafka_assignment_serve_removals(rk); + + /* Serve any partitions in the pending list that need further action, + * unless we're waiting for a previous assignment change (an unassign + * in some form) to propagate, or outstanding offset commits + * to finish (since we might need the committed offsets as start + * offsets). */ + if (rk->rk_consumer.assignment.wait_stop_cnt == 0 && + rk->rk_consumer.wait_commit_cnt == 0 && inp_removals == 0 && + rk->rk_consumer.assignment.pending->cnt > 0) + inp_pending = rd_kafka_assignment_serve_pending(rk); + + if (inp_removals + inp_pending + + rk->rk_consumer.assignment.queried->cnt + + rk->rk_consumer.assignment.wait_stop_cnt + + rk->rk_consumer.wait_commit_cnt == + 0) { + /* No assignment operations in progress, + * signal assignment done back to cgrp to let it + * transition to its next state if necessary. + * We may emit this signalling more than necessary and it is + * up to the cgrp to only take action if needed, based on its + * state. */ + rd_kafka_cgrp_assignment_done(rk->rk_cgrp); + } else { + rd_kafka_dbg(rk, CGRP, "ASSIGNMENT", + "Current assignment of %d partition(s) " + "with %d pending adds, %d offset queries, " + "%d partitions awaiting stop and " + "%d offset commits in progress", + rk->rk_consumer.assignment.all->cnt, inp_pending, + rk->rk_consumer.assignment.queried->cnt, + rk->rk_consumer.assignment.wait_stop_cnt, + rk->rk_consumer.wait_commit_cnt); + } +} + + +/** + * @returns true if the current or previous assignment has operations in + * progress, such as waiting for partition fetchers to stop. 
+ */ +rd_bool_t rd_kafka_assignment_in_progress(rd_kafka_t *rk) { + return rk->rk_consumer.wait_commit_cnt > 0 || + rk->rk_consumer.assignment.wait_stop_cnt > 0 || + rk->rk_consumer.assignment.pending->cnt > 0 || + rk->rk_consumer.assignment.queried->cnt > 0 || + rk->rk_consumer.assignment.removed->cnt > 0; +} + + +/** + * @brief Clear the current assignment. + * + * @remark Make sure to call rd_kafka_assignment_serve() after successful + * return from this function. + * + * @returns the number of partitions removed. + */ +int rd_kafka_assignment_clear(rd_kafka_t *rk) { + int cnt = rk->rk_consumer.assignment.all->cnt; + + if (cnt == 0) { + rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLEARASSIGN", + "No current assignment to clear"); + return 0; + } + + rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "CLEARASSIGN", + "Clearing current assignment of %d partition(s)", + rk->rk_consumer.assignment.all->cnt); + + rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.pending); + rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.queried); + + rd_kafka_topic_partition_list_add_list( + rk->rk_consumer.assignment.removed, rk->rk_consumer.assignment.all); + rd_kafka_topic_partition_list_clear(rk->rk_consumer.assignment.all); + + rk->rk_consumer.assignment.version++; + + return cnt; +} + + +/** + * @brief Adds \p partitions to the current assignment. + * + * Will return error if trying to add a partition that is already in the + * assignment. + * + * @remark Make sure to call rd_kafka_assignment_serve() after successful + * return from this function. + */ +rd_kafka_error_t * +rd_kafka_assignment_add(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *partitions) { + rd_bool_t was_empty = rk->rk_consumer.assignment.all->cnt == 0; + int i; + + /* Make sure there are no duplicates, invalid partitions, or + * invalid offsets in the input partitions. 
*/ + rd_kafka_topic_partition_list_sort(partitions, NULL, NULL); + + for (i = 0; i < partitions->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; + const rd_kafka_topic_partition_t *prev = + i > 0 ? &partitions->elems[i - 1] : NULL; + + if (RD_KAFKA_OFFSET_IS_LOGICAL(rktpar->offset) && + rktpar->offset != RD_KAFKA_OFFSET_BEGINNING && + rktpar->offset != RD_KAFKA_OFFSET_END && + rktpar->offset != RD_KAFKA_OFFSET_STORED && + rktpar->offset != RD_KAFKA_OFFSET_INVALID && + rktpar->offset > RD_KAFKA_OFFSET_TAIL_BASE) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s [%" PRId32 + "] has invalid start offset %" PRId64, + rktpar->topic, rktpar->partition, rktpar->offset); + + if (prev && !rd_kafka_topic_partition_cmp(rktpar, prev)) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Duplicate %s [%" PRId32 "] in input list", + rktpar->topic, rktpar->partition); + + if (rd_kafka_topic_partition_list_find( + rk->rk_consumer.assignment.all, rktpar->topic, + rktpar->partition)) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__CONFLICT, + "%s [%" PRId32 + "] is already part of the " + "current assignment", + rktpar->topic, + rktpar->partition); + + /* Translate RD_KAFKA_OFFSET_INVALID to RD_KAFKA_OFFSET_STORED, + * i.e., read from committed offset, since we use INVALID + * internally to differentiate between querying for + * committed offset (STORED) and no committed offset (INVALID). + */ + if (rktpar->offset == RD_KAFKA_OFFSET_INVALID) + rktpar->offset = RD_KAFKA_OFFSET_STORED; + + /* Get toppar object for each partition. + * This is to make sure the rktp stays alive while unassigning + * any previous assignment in the call to + * assignment_clear() below. */ + rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true); + } + + /* Mark all partition objects as assigned and reset the stored + * offsets back to invalid in case it was explicitly stored during + * the time the partition was not assigned. 
*/ + for (i = 0; i < partitions->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; + rd_kafka_toppar_t *rktp = + rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true); + + rd_kafka_toppar_lock(rktp); + + rd_assert(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_ASSIGNED)); + rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_ASSIGNED; + + /* Reset the stored offset to INVALID to avoid the race + * condition described in rdkafka_offset.h */ + rd_kafka_offset_store0( + rktp, RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, -1), + rd_true /* force */, RD_DONT_LOCK); + + rd_kafka_toppar_unlock(rktp); + } + + + /* Add the new list of partitions to the current assignment. + * Only need to sort the final assignment if it was non-empty + * to begin with since \p partitions is sorted above. */ + rd_kafka_topic_partition_list_add_list(rk->rk_consumer.assignment.all, + partitions); + if (!was_empty) + rd_kafka_topic_partition_list_sort( + rk->rk_consumer.assignment.all, NULL, NULL); + + /* And add to .pending for serve_pending() to handle. */ + rd_kafka_topic_partition_list_add_list( + rk->rk_consumer.assignment.pending, partitions); + + + rd_kafka_dbg(rk, CONSUMER | RD_KAFKA_DBG_CGRP, "ASSIGNMENT", + "Added %d partition(s) to assignment which " + "now consists of %d partition(s) where of %d are in " + "pending state and %d are being queried", + partitions->cnt, rk->rk_consumer.assignment.all->cnt, + rk->rk_consumer.assignment.pending->cnt, + rk->rk_consumer.assignment.queried->cnt); + + rk->rk_consumer.assignment.version++; + + return NULL; +} + + +/** + * @brief Remove \p partitions from the current assignment. + * + * Will return error if trying to remove a partition that is not in the + * assignment. + * + * @remark Make sure to call rd_kafka_assignment_serve() after successful + * return from this function. 
+ */ +rd_kafka_error_t * +rd_kafka_assignment_subtract(rd_kafka_t *rk, + rd_kafka_topic_partition_list_t *partitions) { + int i; + int matched_queried_partitions = 0; + int assignment_pre_cnt; + + if (rk->rk_consumer.assignment.all->cnt == 0 && partitions->cnt > 0) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "Can't subtract from empty assignment"); + + /* Verify that all partitions in \p partitions are in the assignment + * before starting to modify the assignment. */ + rd_kafka_topic_partition_list_sort(partitions, NULL, NULL); + + for (i = 0; i < partitions->cnt; i++) { + rd_kafka_topic_partition_t *rktpar = &partitions->elems[i]; + + if (!rd_kafka_topic_partition_list_find( + rk->rk_consumer.assignment.all, rktpar->topic, + rktpar->partition)) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "%s [%" PRId32 + "] can't be unassigned since " + "it is not in the current assignment", + rktpar->topic, rktpar->partition); + + rd_kafka_topic_partition_ensure_toppar(rk, rktpar, rd_true); + } + + + assignment_pre_cnt = rk->rk_consumer.assignment.all->cnt; + + /* Remove partitions in reverse order to avoid excessive + * array shuffling of .all. + * Add the removed partitions to .pending for serve() to handle. */ + for (i = partitions->cnt - 1; i >= 0; i--) { + const rd_kafka_topic_partition_t *rktpar = + &partitions->elems[i]; + + if (!rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.all, rktpar->topic, + rktpar->partition)) + RD_BUG("Removed partition %s [%" PRId32 + "] not found " + "in assignment.all", + rktpar->topic, rktpar->partition); + + if (rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.queried, rktpar->topic, + rktpar->partition)) + matched_queried_partitions++; + else + rd_kafka_topic_partition_list_del( + rk->rk_consumer.assignment.pending, rktpar->topic, + rktpar->partition); + + /* Add to .removed list which will be served by + * serve_removals(). 
*/ + rd_kafka_topic_partition_list_add_copy( + rk->rk_consumer.assignment.removed, rktpar); + } + + rd_kafka_dbg(rk, CGRP, "REMOVEASSIGN", + "Removed %d partition(s) " + "(%d with outstanding offset queries) from assignment " + "of %d partition(s)", + partitions->cnt, matched_queried_partitions, + assignment_pre_cnt); + + if (rk->rk_consumer.assignment.all->cnt == 0) { + /* Some safe checking */ + rd_assert(rk->rk_consumer.assignment.pending->cnt == 0); + rd_assert(rk->rk_consumer.assignment.queried->cnt == 0); + } + + rk->rk_consumer.assignment.version++; + + return NULL; +} + + +/** + * @brief Call when partition fetcher has stopped. + */ +void rd_kafka_assignment_partition_stopped(rd_kafka_t *rk, + rd_kafka_toppar_t *rktp) { + rd_assert(rk->rk_consumer.assignment.wait_stop_cnt > 0); + rk->rk_consumer.assignment.wait_stop_cnt--; + + rd_assert(rktp->rktp_started); + rktp->rktp_started = rd_false; + + rd_assert(rk->rk_consumer.assignment.started_cnt > 0); + rk->rk_consumer.assignment.started_cnt--; + + /* If this was the last partition we awaited stop for, serve the + * assignment to transition any existing assignment to the next state */ + if (rk->rk_consumer.assignment.wait_stop_cnt == 0) { + rd_kafka_dbg(rk, CGRP, "STOPSERVE", + "All partitions awaiting stop are now " + "stopped: serving assignment"); + rd_kafka_assignment_serve(rk); + } +} + + +/** + * @brief Pause fetching of the currently assigned partitions. + * + * Partitions will be resumed by calling rd_kafka_assignment_resume() or + * from either serve_removals() or serve_pending() above. 
+ */ +void rd_kafka_assignment_pause(rd_kafka_t *rk, const char *reason) { + + if (rk->rk_consumer.assignment.all->cnt == 0) + return; + + rd_kafka_dbg(rk, CGRP, "PAUSE", + "Pausing fetchers for %d assigned partition(s): %s", + rk->rk_consumer.assignment.all->cnt, reason); + + rd_kafka_toppars_pause_resume(rk, rd_true /*pause*/, RD_ASYNC, + RD_KAFKA_TOPPAR_F_LIB_PAUSE, + rk->rk_consumer.assignment.all); +} + +/** + * @brief Resume fetching of the currently assigned partitions which have + * previously been paused by rd_kafka_assignment_pause(). + */ +void rd_kafka_assignment_resume(rd_kafka_t *rk, const char *reason) { + + if (rk->rk_consumer.assignment.all->cnt == 0) + return; + + rd_kafka_dbg(rk, CGRP, "PAUSE", + "Resuming fetchers for %d assigned partition(s): %s", + rk->rk_consumer.assignment.all->cnt, reason); + + rd_kafka_toppars_pause_resume(rk, rd_false /*resume*/, RD_ASYNC, + RD_KAFKA_TOPPAR_F_LIB_PAUSE, + rk->rk_consumer.assignment.all); +} + + + +/** + * @brief Destroy assignment state (but not \p assignment itself) + */ +void rd_kafka_assignment_destroy(rd_kafka_t *rk) { + if (!rk->rk_consumer.assignment.all) + return; /* rd_kafka_assignment_init() not called */ + rd_kafka_topic_partition_list_destroy(rk->rk_consumer.assignment.all); + rd_kafka_topic_partition_list_destroy( + rk->rk_consumer.assignment.pending); + rd_kafka_topic_partition_list_destroy( + rk->rk_consumer.assignment.queried); + rd_kafka_topic_partition_list_destroy( + rk->rk_consumer.assignment.removed); +} + + +/** + * @brief Initialize the assignment struct. + */ +void rd_kafka_assignment_init(rd_kafka_t *rk) { + rk->rk_consumer.assignment.all = rd_kafka_topic_partition_list_new(100); + rk->rk_consumer.assignment.pending = + rd_kafka_topic_partition_list_new(100); + rk->rk_consumer.assignment.queried = + rd_kafka_topic_partition_list_new(100); + rk->rk_consumer.assignment.removed = + rd_kafka_topic_partition_list_new(100); +} |