diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.c | 807 |
1 files changed, 807 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.c new file mode 100644 index 00000000..3245e856 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_idempotence.c @@ -0,0 +1,807 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2018 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rd.h" +#include "rdkafka_int.h" +#include "rdkafka_idempotence.h" +#include "rdkafka_txnmgr.h" +#include "rdkafka_request.h" +#include "rdunittest.h" + +#include <stdarg.h> + +/** + * @name Idempotent Producer logic + * + * + * Unrecoverable idempotent producer errors that could jeopardize the + * idempotency guarantees if the producer was to continue operating + * are treated as fatal errors, unless the producer is transactional in which + * case the current transaction will fail (also known as an abortable error) + * but the producer will not raise a fatal error. + * + */ + +static void rd_kafka_idemp_pid_timer_restart(rd_kafka_t *rk, + rd_bool_t immediate, + const char *reason); + + +/** + * @brief Set the producer's idempotence state. + * @locks rd_kafka_wrlock() MUST be held + */ +void rd_kafka_idemp_set_state(rd_kafka_t *rk, + rd_kafka_idemp_state_t new_state) { + + if (rk->rk_eos.idemp_state == new_state) + return; + + if (rd_kafka_fatal_error_code(rk) && + new_state != RD_KAFKA_IDEMP_STATE_FATAL_ERROR && + new_state != RD_KAFKA_IDEMP_STATE_TERM && + new_state != RD_KAFKA_IDEMP_STATE_DRAIN_RESET && + new_state != RD_KAFKA_IDEMP_STATE_DRAIN_BUMP) { + rd_kafka_dbg(rk, EOS, "IDEMPSTATE", + "Denying state change %s -> %s since a " + "fatal error has been raised", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), + rd_kafka_idemp_state2str(new_state)); + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR); + return; + } + + rd_kafka_dbg(rk, EOS, "IDEMPSTATE", + "Idempotent producer state change %s -> %s", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), + rd_kafka_idemp_state2str(new_state)); + + rk->rk_eos.idemp_state = new_state; + rk->rk_eos.ts_idemp_state = rd_clock(); + + /* Inform transaction manager of state change */ + if (rd_kafka_is_transactional(rk)) + rd_kafka_txn_idemp_state_change(rk, new_state); +} + + + +/** + * @brief Find a usable broker suitable for acquiring Pid + * or Coordinator query. + * + * @locks rd_kafka_wrlock() MUST be held + * + * @returns a broker with increased refcount, or NULL on error. + */ +rd_kafka_broker_t *rd_kafka_idemp_broker_any(rd_kafka_t *rk, + rd_kafka_resp_err_t *errp, + char *errstr, + size_t errstr_size) { + rd_kafka_broker_t *rkb; + int up_cnt; + + rkb = rd_kafka_broker_any_up(rk, &up_cnt, + rd_kafka_broker_filter_non_idempotent, + NULL, "acquire ProducerID"); + if (rkb) + return rkb; + + if (up_cnt > 0) { + *errp = RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + rd_snprintf(errstr, errstr_size, + "%s not supported by " + "any of the %d connected broker(s): requires " + "Apache Kafka broker version >= 0.11.0", + rd_kafka_is_transactional(rk) + ? "Transactions" + : "Idempotent producer", + up_cnt); + } else { + *errp = RD_KAFKA_RESP_ERR__TRANSPORT; + rd_snprintf(errstr, errstr_size, + "No brokers available for %s (%d broker(s) known)", + rd_kafka_is_transactional(rk) + ? "Transactions" + : "Idempotent producer", + rd_atomic32_get(&rk->rk_broker_cnt)); + } + + rd_kafka_dbg(rk, EOS, "PIDBROKER", "%s", errstr); + + return NULL; +} + + + +/** + * @brief Check if an error needs special attention, possibly + * raising a fatal error. + * + * @param is_fatal if true, force fatal error regardless of error code. + * + * @returns rd_true if a fatal error was triggered, else rd_false. + * + * @locks rd_kafka_wrlock() MUST be held + * @locality rdkafka main thread + */ +rd_bool_t rd_kafka_idemp_check_error(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + const char *errstr, + rd_bool_t is_fatal) { + const char *preface = ""; + + switch (err) { + case RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE: + case RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT: + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + is_fatal = rd_true; + break; + + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: + is_fatal = rd_true; + /* Normalize error */ + err = RD_KAFKA_RESP_ERR__FENCED; + preface = "Producer fenced by newer instance: "; + break; + + default: + break; + } + + if (!is_fatal) + return rd_false; + + if (rd_kafka_is_transactional(rk)) + rd_kafka_txn_set_fatal_error(rk, RD_DONT_LOCK, err, "%s%s", + preface, errstr); + else + rd_kafka_set_fatal_error0(rk, RD_DONT_LOCK, err, "%s%s", + preface, errstr); + + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR); + + return rd_true; +} + + + +/** + * @brief State machine for PID acquisition for the idempotent + * and transactional producers. + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock() MUST be held. + */ +void rd_kafka_idemp_pid_fsm(rd_kafka_t *rk) { + rd_kafka_resp_err_t err; + char errstr[512]; + rd_kafka_broker_t *rkb; + rd_bool_t is_fatal = rd_false; + + /* If a fatal error has been raised we do not + * attempt to acquire a PID. */ + if (unlikely(rd_kafka_fatal_error_code(rk))) + return; + +redo: + switch (rk->rk_eos.idemp_state) { + case RD_KAFKA_IDEMP_STATE_INIT: + case RD_KAFKA_IDEMP_STATE_TERM: + case RD_KAFKA_IDEMP_STATE_FATAL_ERROR: + break; + + case RD_KAFKA_IDEMP_STATE_REQ_PID: + /* Request (new) PID */ + + /* The idempotent producer may ask any broker for a PID, + * while the transactional producer needs to ask its + * transaction coordinator for a PID. */ + if (!rd_kafka_is_transactional(rk) || + rk->rk_eos.txn_curr_coord) { + rd_kafka_idemp_set_state( + rk, RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT); + goto redo; + } + + + /* + * Look up transaction coordinator. + * When the coordinator is known this FSM will be called again. + */ + if (rd_kafka_txn_coord_query(rk, "Acquire PID")) + return; /* Fatal error */ + break; + + case RD_KAFKA_IDEMP_STATE_WAIT_TRANSPORT: + /* Waiting for broker/coordinator to become available */ + if (rd_kafka_is_transactional(rk)) { + /* Check that a proper coordinator broker has + * been assigned by inspecting txn_curr_coord + * (the real broker) rather than txn_coord + * (the logical broker). */ + if (!rk->rk_eos.txn_curr_coord) { + /* + * Can happen if the coordinator wasn't set or + * wasn't up initially and has been set to NULL + * after a COORDINATOR_NOT_AVAILABLE error in + * FindCoordinatorResponse. When the coordinator + * is known this FSM will be called again. + */ + rd_kafka_txn_coord_query( + rk, "Awaiting coordinator"); + return; + } + rkb = rk->rk_eos.txn_coord; + rd_kafka_broker_keep(rkb); + + } else { + rkb = rd_kafka_idemp_broker_any(rk, &err, errstr, + sizeof(errstr)); + + if (!rkb && rd_kafka_idemp_check_error(rk, err, errstr, + rd_false)) + return; /* Fatal error */ + } + + if (!rkb || !rd_kafka_broker_is_up(rkb)) { + /* The coordinator broker monitor will re-trigger + * the fsm sooner if txn_coord has a state change, + * else rely on the timer to retry. */ + rd_kafka_idemp_pid_timer_restart( + rk, rd_false, + rkb ? "No broker available" : "Coordinator not up"); + + if (rkb) + rd_kafka_broker_destroy(rkb); + return; + } + + if (rd_kafka_is_transactional(rk)) { + int err_of = 0; + + /* If this is a transactional producer and the + * PID-epoch needs to be bumped we'll require KIP-360 + * support on the broker, else raise a fatal error. */ + + if (rd_kafka_pid_valid(rk->rk_eos.pid)) { + rd_rkb_dbg(rkb, EOS, "GETPID", + "Requesting ProducerId bump for %s", + rd_kafka_pid2str(rk->rk_eos.pid)); + err_of = rd_snprintf(errstr, sizeof(errstr), + "Failed to request " + "ProducerId bump: "); + rd_assert(err_of < 0 || + err_of < (int)sizeof(errstr)); + } else { + rd_rkb_dbg(rkb, EOS, "GETPID", + "Acquiring ProducerId"); + } + + err = rd_kafka_InitProducerIdRequest( + rkb, rk->rk_conf.eos.transactional_id, + rk->rk_conf.eos.transaction_timeout_ms, + rd_kafka_pid_valid(rk->rk_eos.pid) ? &rk->rk_eos.pid + : NULL, + errstr + err_of, sizeof(errstr) - err_of, + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_handle_InitProducerId, NULL); + + if (err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE && + rd_kafka_pid_valid(rk->rk_eos.pid)) + is_fatal = rd_true; + } else { + rd_rkb_dbg(rkb, EOS, "GETPID", "Acquiring ProducerId"); + + err = rd_kafka_InitProducerIdRequest( + rkb, NULL, -1, NULL, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_handle_InitProducerId, NULL); + } + + if (err) { + rd_rkb_dbg(rkb, EOS, "GETPID", + "Can't acquire ProducerId from " + "this broker: %s", + errstr); + } + + rd_kafka_broker_destroy(rkb); + + if (err) { + if (rd_kafka_idemp_check_error(rk, err, errstr, + is_fatal)) + return; /* Fatal error */ + + /* The coordinator broker monitor will re-trigger + * the fsm sooner if txn_coord has a state change, + * else rely on the timer to retry. */ + rd_kafka_idemp_pid_timer_restart(rk, rd_false, errstr); + return; + } + + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_WAIT_PID); + break; + + case RD_KAFKA_IDEMP_STATE_WAIT_PID: + /* PID requested, waiting for reply */ + break; + + case RD_KAFKA_IDEMP_STATE_ASSIGNED: + /* New PID assigned */ + break; + + case RD_KAFKA_IDEMP_STATE_DRAIN_RESET: + /* Wait for outstanding ProduceRequests to finish + * before resetting and re-requesting a new PID. */ + break; + + case RD_KAFKA_IDEMP_STATE_DRAIN_BUMP: + /* Wait for outstanding ProduceRequests to finish + * before bumping the current epoch. */ + break; + + case RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT: + /* Wait for txnmgr to abort its current transaction + * and then trigger a drain & reset or bump. */ + break; + } +} + + +/** + * @brief Timed PID retrieval timer callback. + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_idemp_pid_timer_cb(rd_kafka_timers_t *rkts, void *arg) { + rd_kafka_t *rk = arg; + + rd_kafka_wrlock(rk); + rd_kafka_idemp_pid_fsm(rk); + rd_kafka_wrunlock(rk); +} + + +/** + * @brief Restart the pid retrieval timer. + * + * @param immediate If true, request a pid as soon as possible, + * else use the default interval (500ms). + * @locality any + * @locks none + */ +static void rd_kafka_idemp_pid_timer_restart(rd_kafka_t *rk, + rd_bool_t immediate, + const char *reason) { + rd_kafka_dbg(rk, EOS, "TXN", "Starting PID FSM timer%s: %s", + immediate ? " (fire immediately)" : "", reason); + rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rk_eos.pid_tmr, + rd_true, + 1000 * (immediate ? 1 : 500 /*500ms*/), + rd_kafka_idemp_pid_timer_cb, rk); +} + + +/** + * @brief Handle failure to acquire a PID from broker. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_idemp_request_pid_failed(rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err) { + rd_kafka_t *rk = rkb->rkb_rk; + char errstr[512]; + + rd_rkb_dbg(rkb, EOS, "GETPID", "Failed to acquire PID: %s", + rd_kafka_err2str(err)); + + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; /* Ignore */ + + rd_assert(thrd_is_current(rk->rk_thread)); + + rd_snprintf(errstr, sizeof(errstr), + "Failed to acquire %s PID from broker %s: %s", + rd_kafka_is_transactional(rk) ? "transactional" + : "idempotence", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err)); + + rd_kafka_wrlock(rk); + + if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) { + rd_kafka_wrunlock(rk); + return; /* Fatal error */ + } + + RD_UT_COVERAGE(0); + + if (rd_kafka_is_transactional(rk) && + (err == RD_KAFKA_RESP_ERR_NOT_COORDINATOR || + err == RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE)) + rd_kafka_txn_coord_set(rk, NULL, "%s", errstr); + + /* This error code is read by init_transactions() for propagation + * to the application. */ + rk->rk_eos.txn_init_err = err; + + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID); + + rd_kafka_wrunlock(rk); + + rd_kafka_log(rk, LOG_WARNING, "GETPID", "%s: retrying", errstr); + + /* Restart acquisition after a short wait */ + rd_kafka_idemp_pid_timer_restart(rk, rd_false, errstr); +} + + +/** + * @brief Update Producer ID from InitProducerId response. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_idemp_pid_update(rd_kafka_broker_t *rkb, + const rd_kafka_pid_t pid) { + rd_kafka_t *rk = rkb->rkb_rk; + + rd_kafka_wrlock(rk); + if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_WAIT_PID) { + rd_rkb_dbg(rkb, EOS, "GETPID", + "Ignoring InitProduceId response (%s) " + "in state %s", + rd_kafka_pid2str(pid), + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + rd_kafka_wrunlock(rk); + return; + } + + if (!rd_kafka_pid_valid(pid)) { + rd_kafka_wrunlock(rk); + rd_rkb_log(rkb, LOG_WARNING, "GETPID", + "Acquired invalid PID{%" PRId64 ",%hd}: ignoring", + pid.id, pid.epoch); + rd_kafka_idemp_request_pid_failed(rkb, + RD_KAFKA_RESP_ERR__BAD_MSG); + return; + } + + if (rd_kafka_pid_valid(rk->rk_eos.pid)) + rd_kafka_dbg(rk, EOS, "GETPID", "Acquired %s (previous %s)", + rd_kafka_pid2str(pid), + rd_kafka_pid2str(rk->rk_eos.pid)); + else + rd_kafka_dbg(rk, EOS, "GETPID", "Acquired %s", + rd_kafka_pid2str(pid)); + rk->rk_eos.pid = pid; + rk->rk_eos.epoch_cnt++; + + /* The idempotence state change will trigger the transaction manager, + * see rd_kafka_txn_idemp_state_change(). */ + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_ASSIGNED); + + rd_kafka_wrunlock(rk); + + /* Wake up all broker threads (that may have messages to send + * that were waiting for a Producer ID). */ + rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT, + "PID updated"); +} + + +/** + * @brief Call when all partition request queues + * are drained to reset and re-request a new PID. + * + * @locality any + * @locks none + */ +static void rd_kafka_idemp_drain_done(rd_kafka_t *rk) { + rd_bool_t restart_tmr = rd_false; + rd_bool_t wakeup_brokers = rd_false; + + rd_kafka_wrlock(rk); + if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_RESET) { + rd_kafka_dbg(rk, EOS, "DRAIN", "All partitions drained"); + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID); + restart_tmr = rd_true; + + } else if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_DRAIN_BUMP && + rd_kafka_pid_valid(rk->rk_eos.pid)) { + + if (rd_kafka_is_transactional(rk)) { + /* The epoch bump needs to be performed by the + * coordinator by sending it an InitPid request. */ + rd_kafka_dbg(rk, EOS, "DRAIN", + "All partitions drained, asking " + "coordinator to bump epoch (currently %s)", + rd_kafka_pid2str(rk->rk_eos.pid)); + rd_kafka_idemp_set_state(rk, + RD_KAFKA_IDEMP_STATE_REQ_PID); + restart_tmr = rd_true; + + } else { + /* The idempotent producer can bump its own epoch */ + rk->rk_eos.pid = rd_kafka_pid_bump(rk->rk_eos.pid); + rd_kafka_dbg(rk, EOS, "DRAIN", + "All partitions drained, bumped " + "epoch to %s", + rd_kafka_pid2str(rk->rk_eos.pid)); + rd_kafka_idemp_set_state(rk, + RD_KAFKA_IDEMP_STATE_ASSIGNED); + wakeup_brokers = rd_true; + } + } + rd_kafka_wrunlock(rk); + + /* Restart timer to eventually trigger a re-request */ + if (restart_tmr) + rd_kafka_idemp_pid_timer_restart(rk, rd_true, "Drain done"); + + /* Wake up all broker threads (that may have messages to send + * that were waiting for a Producer ID). */ + if (wakeup_brokers) + rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT, + "message drain done"); +} + +/** + * @brief Check if in-flight toppars drain is done, if so transition to + * next state. + * + * @locality any + * @locks none + */ +static RD_INLINE void rd_kafka_idemp_check_drain_done(rd_kafka_t *rk) { + if (rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt) == 0) + rd_kafka_idemp_drain_done(rk); +} + + +/** + * @brief Schedule a reset and re-request of PID when the + * local ProduceRequest queues have been fully drained. + * + * The PID is not reset until the queues are fully drained. + * + * @locality any + * @locks none + */ +void rd_kafka_idemp_drain_reset(rd_kafka_t *rk, const char *reason) { + rd_kafka_wrlock(rk); + rd_kafka_dbg(rk, EOS, "DRAIN", + "Beginning partition drain for %s reset " + "for %d partition(s) with in-flight requests: %s", + rd_kafka_pid2str(rk->rk_eos.pid), + rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), reason); + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_RESET); + rd_kafka_wrunlock(rk); + + /* Check right away if the drain could be done. */ + rd_kafka_idemp_check_drain_done(rk); +} + + +/** + * @brief Schedule an epoch bump when the local ProduceRequest queues + * have been fully drained. + * + * The PID is not bumped until the queues are fully drained and the current + * transaction is aborted (if any). + * + * @param allow_txn_abort If this is a transactional producer and this flag is + * true then we trigger an abortable txn error to abort + * the current transaction first. The txnmgr will later + * call us back with this flag set to false to go ahead + * with the epoch bump. + * @param fmt is a human-readable reason for the bump + * + * + * @locality any + * @locks none + */ +void rd_kafka_idemp_drain_epoch_bump0(rd_kafka_t *rk, + rd_bool_t allow_txn_abort, + rd_kafka_resp_err_t err, + const char *fmt, + ...) { + va_list ap; + char buf[256]; + rd_bool_t requires_txn_abort = + allow_txn_abort && rd_kafka_is_transactional(rk); + + va_start(ap, fmt); + rd_vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + rd_kafka_wrlock(rk); + + + if (requires_txn_abort) { + rd_kafka_dbg(rk, EOS, "DRAIN", + "Need transaction abort before beginning " + "partition drain in state %s for %s epoch bump " + "for %d partition(s) with in-flight requests: %s", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), + rd_kafka_pid2str(rk->rk_eos.pid), + rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), + buf); + rd_kafka_idemp_set_state(rk, + RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT); + + } else { + rd_kafka_dbg(rk, EOS, "DRAIN", + "Beginning partition drain in state %s " + "for %s epoch bump " + "for %d partition(s) with in-flight requests: %s", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), + rd_kafka_pid2str(rk->rk_eos.pid), + rd_atomic32_get(&rk->rk_eos.inflight_toppar_cnt), + buf); + + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_DRAIN_BUMP); + } + + rd_kafka_wrunlock(rk); + + if (requires_txn_abort) { + /* Transactions: bumping the epoch requires the current + * transaction to be aborted first. */ + rd_kafka_txn_set_abortable_error_with_bump(rk, err, "%s", buf); + + } else { + /* Idempotent producer: check right away if the drain could + * be done. */ + rd_kafka_idemp_check_drain_done(rk); + } +} + +/** + * @brief Mark partition as waiting-to-drain. + * + * @locks toppar_lock MUST be held + * @locality broker thread (leader or not) + */ +void rd_kafka_idemp_drain_toppar(rd_kafka_toppar_t *rktp, const char *reason) { + if (rktp->rktp_eos.wait_drain) + return; + + rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, EOS | RD_KAFKA_DBG_TOPIC, "DRAIN", + "%.*s [%" PRId32 "] beginning partition drain: %s", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition, reason); + rktp->rktp_eos.wait_drain = rd_true; +} + + +/** + * @brief Mark partition as no longer having a ProduceRequest in-flight. + * + * @locality any + * @locks none + */ +void rd_kafka_idemp_inflight_toppar_sub(rd_kafka_t *rk, + rd_kafka_toppar_t *rktp) { + int r = rd_atomic32_sub(&rk->rk_eos.inflight_toppar_cnt, 1); + + if (r == 0) { + /* Check if we're waiting for the partitions to drain + * before resetting the PID, and if so trigger a reset + * since this was the last drained one. */ + rd_kafka_idemp_drain_done(rk); + } else { + rd_assert(r >= 0); + } +} + + +/** + * @brief Mark partition as having a ProduceRequest in-flight. + * + * @locality toppar handler thread + * @locks none + */ +void rd_kafka_idemp_inflight_toppar_add(rd_kafka_t *rk, + rd_kafka_toppar_t *rktp) { + rd_atomic32_add(&rk->rk_eos.inflight_toppar_cnt, 1); +} + + + +/** + * @brief Start idempotent producer (asynchronously). + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_idemp_start(rd_kafka_t *rk, rd_bool_t immediate) { + + if (rd_kafka_terminating(rk)) + return; + + rd_kafka_wrlock(rk); + /* Don't restart PID acquisition if there's already an outstanding + * request. */ + if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_WAIT_PID) + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_REQ_PID); + rd_kafka_wrunlock(rk); + + /* Schedule request timer */ + rd_kafka_idemp_pid_timer_restart(rk, immediate, + "Starting idempotent producer"); +} + + +/** + * @brief Initialize the idempotent producer. + * + * @remark Must be called from rd_kafka_new() and only once. + * @locality rdkafka main thread + * @locks none / not needed from rd_kafka_new() + */ +void rd_kafka_idemp_init(rd_kafka_t *rk) { + rd_assert(thrd_is_current(rk->rk_thread)); + + rd_atomic32_init(&rk->rk_eos.inflight_toppar_cnt, 0); + rd_kafka_pid_reset(&rk->rk_eos.pid); + + /* The transactional producer acquires the PID + * from init_transactions(), for non-transactional producers + * the PID can be acquired right away. */ + if (rd_kafka_is_transactional(rk)) + rd_kafka_txns_init(rk); + else + /* There are no available brokers this early, + * so just set the state to indicate that we want to + * acquire a PID as soon as possible and start + * the timer. */ + rd_kafka_idemp_start(rk, rd_false /*non-immediate*/); +} + + +/** + * @brief Terminate and clean up idempotent producer + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock() MUST be held + */ +void rd_kafka_idemp_term(rd_kafka_t *rk) { + rd_assert(thrd_is_current(rk->rk_thread)); + + rd_kafka_wrlock(rk); + if (rd_kafka_is_transactional(rk)) + rd_kafka_txns_term(rk); + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_TERM); + rd_kafka_wrunlock(rk); + rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.pid_tmr, 1); +} |