author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 02:57:58 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-19 02:57:58 +0000
commit     be1c7e50e1e8809ea56f2c9d472eccd8ffd73a97 (patch)
tree       9754ff1ca740f6346cf8483ec915d4054bc5da2d /fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c
parent     Initial commit. (diff)
Adding upstream version 1.44.3. (upstream/1.44.3, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c  3249
1 file changed, 3249 insertions(+), 0 deletions(-)
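The file added by this commit implements librdkafka's public transactional producer API (rd_kafka_init_transactions(), rd_kafka_begin_transaction(), rd_kafka_send_offsets_to_transaction() and the commit/abort calls). For orientation, here is a minimal sketch of how an application drives that API; it is not part of the commit, and the topic name, payload and timeouts are illustrative placeholders:

#include <librdkafka/rdkafka.h>

/* rk must be a producer created with transactional.id set in its config. */
static void run_one_transaction(rd_kafka_t *rk) {
        rd_kafka_error_t *error;

        /* Acquire the PID and fence off older instances;
         * resumable if it times out. */
        if ((error = rd_kafka_init_transactions(rk, 60 * 1000)))
                goto fail;

        if ((error = rd_kafka_begin_transaction(rk)))
                goto fail;

        /* Produce into the transaction ("mytopic" is a placeholder). */
        rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                          RD_KAFKA_V_VALUE("hello", 5),
                          RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                          RD_KAFKA_V_END);

        if ((error = rd_kafka_commit_transaction(rk, 60 * 1000)))
                goto fail;
        return;

fail:
        /* Abortable errors roll the transaction back; fatal errors
         * require recreating the producer. */
        if (rd_kafka_error_txn_requires_abort(error)) {
                rd_kafka_error_t *abort_error =
                    rd_kafka_abort_transaction(rk, 60 * 1000);
                if (abort_error)
                        rd_kafka_error_destroy(abort_error);
        }
        rd_kafka_error_destroy(error);
}

Each of these calls is serviced by the state machine in the file below: the application thread enqueues an op on the rdkafka main thread and waits for the result (rd_kafka_txn_op_req()), which is why the API calls are resumable after a timeout.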
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c new file mode 100644 index 00000000..afbc28b7 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_txnmgr.c @@ -0,0 +1,3249 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * @name Transaction Manager + * + */ + +#include <stdarg.h> + +#include "rd.h" +#include "rdkafka_int.h" +#include "rdkafka_txnmgr.h" +#include "rdkafka_idempotence.h" +#include "rdkafka_request.h" +#include "rdkafka_error.h" +#include "rdunittest.h" +#include "rdrand.h" + + +static void rd_kafka_txn_coord_timer_start(rd_kafka_t *rk, int timeout_ms); + +#define rd_kafka_txn_curr_api_set_result(rk, actions, error) \ + rd_kafka_txn_curr_api_set_result0(__FUNCTION__, __LINE__, rk, actions, \ + error) +static void rd_kafka_txn_curr_api_set_result0(const char *func, + int line, + rd_kafka_t *rk, + int actions, + rd_kafka_error_t *error); + + + +/** + * @return a normalized error code, this for instance abstracts different + * fencing errors to return one single fencing error to the application. + */ +static rd_kafka_resp_err_t rd_kafka_txn_normalize_err(rd_kafka_resp_err_t err) { + + switch (err) { + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: + return RD_KAFKA_RESP_ERR__FENCED; + case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: + return RD_KAFKA_RESP_ERR__TIMED_OUT; + default: + return err; + } +} + + +/** + * @brief Ensure client is configured as a transactional producer, + * else return error. 
+ * + * @locality application thread + * @locks none + */ +static RD_INLINE rd_kafka_error_t * +rd_kafka_ensure_transactional(const rd_kafka_t *rk) { + if (unlikely(rk->rk_type != RD_KAFKA_PRODUCER)) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "The Transactional API can only be used " + "on producer instances"); + + if (unlikely(!rk->rk_conf.eos.transactional_id)) + return rd_kafka_error_new(RD_KAFKA_RESP_ERR__NOT_CONFIGURED, + "The Transactional API requires " + "transactional.id to be configured"); + + return NULL; +} + + + +/** + * @brief Ensure transaction state is one of \p states. + * + * @param states The required states, ended by a -1 sentinel. + * + * @locks_required rd_kafka_*lock(rk) MUST be held + * @locality any + */ +static RD_INLINE rd_kafka_error_t * +rd_kafka_txn_require_states0(rd_kafka_t *rk, rd_kafka_txn_state_t states[]) { + rd_kafka_error_t *error; + size_t i; + + if (unlikely((error = rd_kafka_ensure_transactional(rk)) != NULL)) + return error; + + for (i = 0; (int)states[i] != -1; i++) + if (rk->rk_eos.txn_state == states[i]) + return NULL; + + /* For fatal and abortable states return the last transactional + * error, for all other states just return a state error. */ + if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_FATAL_ERROR) + error = rd_kafka_error_new_fatal(rk->rk_eos.txn_err, "%s", + rk->rk_eos.txn_errstr); + else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR) { + error = rd_kafka_error_new(rk->rk_eos.txn_err, "%s", + rk->rk_eos.txn_errstr); + rd_kafka_error_set_txn_requires_abort(error); + } else + error = rd_kafka_error_new( + RD_KAFKA_RESP_ERR__STATE, "Operation not valid in state %s", + rd_kafka_txn_state2str(rk->rk_eos.txn_state)); + + + return error; +} + +/** @brief \p ... is a list of states */ +#define rd_kafka_txn_require_state(rk, ...) \ + rd_kafka_txn_require_states0( \ + rk, (rd_kafka_txn_state_t[]) {__VA_ARGS__, -1}) + + + +/** + * @param ignore Will be set to true if the state transition should be + * completely ignored. + * @returns true if the state transition is valid, else false. + */ +static rd_bool_t +rd_kafka_txn_state_transition_is_valid(rd_kafka_txn_state_t curr, + rd_kafka_txn_state_t new_state, + rd_bool_t *ignore) { + + *ignore = rd_false; + + switch (new_state) { + case RD_KAFKA_TXN_STATE_INIT: + /* This is the initialized value and this transition will + * never happen. 
*/ + return rd_false; + + case RD_KAFKA_TXN_STATE_WAIT_PID: + return curr == RD_KAFKA_TXN_STATE_INIT; + + case RD_KAFKA_TXN_STATE_READY_NOT_ACKED: + return curr == RD_KAFKA_TXN_STATE_WAIT_PID; + + case RD_KAFKA_TXN_STATE_READY: + return curr == RD_KAFKA_TXN_STATE_READY_NOT_ACKED || + curr == RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED || + curr == RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED; + + case RD_KAFKA_TXN_STATE_IN_TRANSACTION: + return curr == RD_KAFKA_TXN_STATE_READY; + + case RD_KAFKA_TXN_STATE_BEGIN_COMMIT: + return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION; + + case RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION: + return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT; + + case RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED: + return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT || + curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION; + + case RD_KAFKA_TXN_STATE_BEGIN_ABORT: + return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION || + curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION || + curr == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR; + + case RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION: + return curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT; + + case RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED: + return curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT || + curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION; + + case RD_KAFKA_TXN_STATE_ABORTABLE_ERROR: + if (curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT || + curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION || + curr == RD_KAFKA_TXN_STATE_FATAL_ERROR) { + /* Ignore subsequent abortable errors in + * these states. */ + *ignore = rd_true; + return rd_true; + } + + return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION || + curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT || + curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION; + + case RD_KAFKA_TXN_STATE_FATAL_ERROR: + /* Any state can transition to a fatal error */ + return rd_true; + + default: + RD_BUG("Invalid txn state transition: %s -> %s", + rd_kafka_txn_state2str(curr), + rd_kafka_txn_state2str(new_state)); + return rd_false; + } +} + + +/** + * @brief Transition the transaction state to \p new_state. + * + * @remark An invalid state transition is a bug and will trigger + * a fatal assertion. + * + * @locality rdkafka main thread + * @locks_required rd_kafka_wrlock MUST be held + */ +static void rd_kafka_txn_set_state(rd_kafka_t *rk, + rd_kafka_txn_state_t new_state) { + rd_bool_t ignore; + + if (rk->rk_eos.txn_state == new_state) + return; + + /* Check if state transition is valid */ + if (!rd_kafka_txn_state_transition_is_valid(rk->rk_eos.txn_state, + new_state, &ignore)) { + rd_kafka_log(rk, LOG_CRIT, "TXNSTATE", + "BUG: Invalid transaction state transition " + "attempted: %s -> %s", + rd_kafka_txn_state2str(rk->rk_eos.txn_state), + rd_kafka_txn_state2str(new_state)); + + rd_assert(!*"BUG: Invalid transaction state transition"); + } + + if (ignore) { + /* Ignore this state change */ + return; + } + + rd_kafka_dbg(rk, EOS, "TXNSTATE", "Transaction state change %s -> %s", + rd_kafka_txn_state2str(rk->rk_eos.txn_state), + rd_kafka_txn_state2str(new_state)); + + /* If transitioning from IN_TRANSACTION, the app is no longer + * allowed to enqueue (produce) messages. */ + if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION) + rd_atomic32_set(&rk->rk_eos.txn_may_enq, 0); + else if (new_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION) + rd_atomic32_set(&rk->rk_eos.txn_may_enq, 1); + + rk->rk_eos.txn_state = new_state; +} + + +/** + * @returns the current transaction timeout, i.e., the time remaining in + * the current transaction. 
+ * + * @remark The remaining timeout is currently not tracked, so this function + * will always return the remaining time based on transaction.timeout.ms + * and we rely on the broker to enforce the actual remaining timeout. + * This is still better than not having a timeout cap at all, which + * used to be the case. + * It's also tricky knowing exactly what the controller thinks the + * remaining transaction time is. + * + * @locks_required rd_kafka_*lock(rk) MUST be held. + */ +static RD_INLINE rd_ts_t rd_kafka_txn_current_timeout(const rd_kafka_t *rk) { + return rd_timeout_init(rk->rk_conf.eos.transaction_timeout_ms); +} + + +/** + * @brief An unrecoverable transactional error has occurred. + * + * @param do_lock RD_DO_LOCK: rd_kafka_wrlock(rk) will be acquired and released, + * RD_DONT_LOCK: rd_kafka_wrlock(rk) MUST be held by the caller. + * @locality any + * @locks rd_kafka_wrlock MUST NOT be held + */ +void rd_kafka_txn_set_fatal_error(rd_kafka_t *rk, + rd_dolock_t do_lock, + rd_kafka_resp_err_t err, + const char *fmt, + ...) { + char errstr[512]; + va_list ap; + + va_start(ap, fmt); + vsnprintf(errstr, sizeof(errstr), fmt, ap); + va_end(ap); + + rd_kafka_log(rk, LOG_ALERT, "TXNERR", + "Fatal transaction error: %s (%s)", errstr, + rd_kafka_err2name(err)); + + if (do_lock) + rd_kafka_wrlock(rk); + rd_kafka_set_fatal_error0(rk, RD_DONT_LOCK, err, "%s", errstr); + + rk->rk_eos.txn_err = err; + if (rk->rk_eos.txn_errstr) + rd_free(rk->rk_eos.txn_errstr); + rk->rk_eos.txn_errstr = rd_strdup(errstr); + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR); + + if (do_lock) + rd_kafka_wrunlock(rk); + + /* If application has called a transactional API and + * it has now failed, reply to the app. + * If there is no currently called API then this is a no-op. */ + rd_kafka_txn_curr_api_set_result( + rk, 0, rd_kafka_error_new_fatal(err, "%s", errstr)); +} + + +/** + * @brief An abortable/recoverable transactional error has occurred. + * + * @param requires_epoch_bump If true, abort_transaction() will bump the epoch + * on the coordinator (KIP-360). + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock MUST NOT be held + */ +void rd_kafka_txn_set_abortable_error0(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_bool_t requires_epoch_bump, + const char *fmt, + ...) { + char errstr[512]; + va_list ap; + + if (rd_kafka_fatal_error(rk, NULL, 0)) { + rd_kafka_dbg(rk, EOS, "FATAL", + "Not propagating abortable transactional " + "error (%s) " + "since previous fatal error already raised", + rd_kafka_err2name(err)); + return; + } + + va_start(ap, fmt); + vsnprintf(errstr, sizeof(errstr), fmt, ap); + va_end(ap); + + rd_kafka_wrlock(rk); + + if (requires_epoch_bump) + rk->rk_eos.txn_requires_epoch_bump = requires_epoch_bump; + + if (rk->rk_eos.txn_err) { + rd_kafka_dbg(rk, EOS, "TXNERR", + "Ignoring subsequent abortable transaction " + "error: %s (%s): " + "previous error (%s) already raised", + errstr, rd_kafka_err2name(err), + rd_kafka_err2name(rk->rk_eos.txn_err)); + rd_kafka_wrunlock(rk); + return; + } + + rk->rk_eos.txn_err = err; + if (rk->rk_eos.txn_errstr) + rd_free(rk->rk_eos.txn_errstr); + rk->rk_eos.txn_errstr = rd_strdup(errstr); + + rd_kafka_log(rk, LOG_ERR, "TXNERR", + "Current transaction failed in state %s: %s (%s%s)", + rd_kafka_txn_state2str(rk->rk_eos.txn_state), errstr, + rd_kafka_err2name(err), + requires_epoch_bump ? 
", requires epoch bump" : ""); + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTABLE_ERROR); + rd_kafka_wrunlock(rk); + + /* Purge all messages in queue/flight */ + rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_ABORT_TXN | + RD_KAFKA_PURGE_F_NON_BLOCKING); +} + + + +/** + * @brief Send request-reply op to txnmgr callback, waits for a reply + * or timeout, and returns an error object or NULL on success. + * + * @remark Does not alter the current API state. + * + * @returns an error object on failure, else NULL. + * + * @locality application thread + * + * @locks_acquired rk->rk_eos.txn_curr_api.lock + */ +#define rd_kafka_txn_op_req(rk, op_cb, abs_timeout) \ + rd_kafka_txn_op_req0(__FUNCTION__, __LINE__, rk, \ + rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN, op_cb), \ + abs_timeout) +#define rd_kafka_txn_op_req1(rk, rko, abs_timeout) \ + rd_kafka_txn_op_req0(__FUNCTION__, __LINE__, rk, rko, abs_timeout) +static rd_kafka_error_t *rd_kafka_txn_op_req0(const char *func, + int line, + rd_kafka_t *rk, + rd_kafka_op_t *rko, + rd_ts_t abs_timeout) { + rd_kafka_error_t *error = NULL; + rd_bool_t has_result = rd_false; + + mtx_lock(&rk->rk_eos.txn_curr_api.lock); + + /* See if there's already a result, if so return that immediately. */ + if (rk->rk_eos.txn_curr_api.has_result) { + error = rk->rk_eos.txn_curr_api.error; + rk->rk_eos.txn_curr_api.error = NULL; + rk->rk_eos.txn_curr_api.has_result = rd_false; + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + rd_kafka_op_destroy(rko); + rd_kafka_dbg(rk, EOS, "OPREQ", + "%s:%d: %s: returning already set result: %s", + func, line, rk->rk_eos.txn_curr_api.name, + error ? rd_kafka_error_string(error) : "Success"); + return error; + } + + /* Send one-way op to txnmgr */ + if (!rd_kafka_q_enq(rk->rk_ops, rko)) + RD_BUG("rk_ops queue disabled"); + + /* Wait for result to be set, or timeout */ + do { + if (cnd_timedwait_ms(&rk->rk_eos.txn_curr_api.cnd, + &rk->rk_eos.txn_curr_api.lock, + rd_timeout_remains(abs_timeout)) == + thrd_timedout) + break; + } while (!rk->rk_eos.txn_curr_api.has_result); + + + + if ((has_result = rk->rk_eos.txn_curr_api.has_result)) { + rk->rk_eos.txn_curr_api.has_result = rd_false; + error = rk->rk_eos.txn_curr_api.error; + rk->rk_eos.txn_curr_api.error = NULL; + } + + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + + /* If there was no reply it means the background operation is still + * in progress and its result will be set later, so the application + * should call this API again to resume. */ + if (!has_result) { + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__TIMED_OUT, + "Timed out waiting for operation to finish, " + "retry call to resume"); + } + + return error; +} + + +/** + * @brief Begin (or resume) a public API call. + * + * This function will prevent conflicting calls. + * + * @returns an error on failure, or NULL on success. 
+ * + * @locality application thread + * + * @locks_acquired rk->rk_eos.txn_curr_api.lock + */ +static rd_kafka_error_t *rd_kafka_txn_curr_api_begin(rd_kafka_t *rk, + const char *api_name, + rd_bool_t cap_timeout, + int timeout_ms, + rd_ts_t *abs_timeoutp) { + rd_kafka_error_t *error = NULL; + + if ((error = rd_kafka_ensure_transactional(rk))) + return error; + + rd_kafka_rdlock(rk); /* Need lock for retrieving the states */ + rd_kafka_dbg(rk, EOS, "TXNAPI", + "Transactional API called: %s " + "(in txn state %s, idemp state %s, API timeout %d)", + api_name, rd_kafka_txn_state2str(rk->rk_eos.txn_state), + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state), + timeout_ms); + rd_kafka_rdunlock(rk); + + mtx_lock(&rk->rk_eos.txn_curr_api.lock); + + + /* Make sure there is no other conflicting in-progress API call, + * and that this same call is not currently under way in another thread. + */ + if (unlikely(*rk->rk_eos.txn_curr_api.name && + strcmp(rk->rk_eos.txn_curr_api.name, api_name))) { + /* Another API is being called. */ + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__CONFLICT, + "Conflicting %s API call is already in progress", + rk->rk_eos.txn_curr_api.name); + + } else if (unlikely(rk->rk_eos.txn_curr_api.calling)) { + /* There is an active call to this same API + * from another thread. */ + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS, + "Simultaneous %s API calls not allowed", + rk->rk_eos.txn_curr_api.name); + + } else if (*rk->rk_eos.txn_curr_api.name) { + /* Resumed call */ + rk->rk_eos.txn_curr_api.calling = rd_true; + + } else { + /* New call */ + rd_snprintf(rk->rk_eos.txn_curr_api.name, + sizeof(rk->rk_eos.txn_curr_api.name), "%s", + api_name); + rk->rk_eos.txn_curr_api.calling = rd_true; + rd_assert(!rk->rk_eos.txn_curr_api.error); + } + + if (!error && abs_timeoutp) { + rd_ts_t abs_timeout = rd_timeout_init(timeout_ms); + + if (cap_timeout) { + /* Cap API timeout to remaining transaction timeout */ + rd_ts_t abs_txn_timeout = + rd_kafka_txn_current_timeout(rk); + if (abs_timeout > abs_txn_timeout || + abs_timeout == RD_POLL_INFINITE) + abs_timeout = abs_txn_timeout; + } + + *abs_timeoutp = abs_timeout; + } + + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + + return error; +} + + + +/** + * @brief Return from public API. + * + * This function updates the current API state and must be used in + * all return statements from the public txn API. + * + * @param resumable If true and the error is retriable, the current API state + * will be maintained to allow a future call to the same API + * to resume the background operation that is in progress. + * @param error The error object, if not NULL, is simply inspected and returned. + * + * @returns the \p error object as-is. + * + * @locality application thread + * @locks_acquired rk->rk_eos.txn_curr_api.lock + */ +#define rd_kafka_txn_curr_api_return(rk, resumable, error) \ + rd_kafka_txn_curr_api_return0(__FUNCTION__, __LINE__, rk, resumable, \ + error) +static rd_kafka_error_t * +rd_kafka_txn_curr_api_return0(const char *func, + int line, + rd_kafka_t *rk, + rd_bool_t resumable, + rd_kafka_error_t *error) { + + mtx_lock(&rk->rk_eos.txn_curr_api.lock); + + rd_kafka_dbg( + rk, EOS, "TXNAPI", "Transactional API %s return%s at %s:%d: %s", + rk->rk_eos.txn_curr_api.name, + resumable && rd_kafka_error_is_retriable(error) ? " resumable" : "", + func, line, error ? 
rd_kafka_error_string(error) : "Success"); + + rd_assert(*rk->rk_eos.txn_curr_api.name); + rd_assert(rk->rk_eos.txn_curr_api.calling); + + rk->rk_eos.txn_curr_api.calling = rd_false; + + /* Reset the current API call so that other APIs may be called, + * unless this is a resumable API and the error is retriable. */ + if (!resumable || (error && !rd_kafka_error_is_retriable(error))) { + *rk->rk_eos.txn_curr_api.name = '\0'; + /* It is possible for another error to have been set, + * typically when a fatal error is raised, so make sure + * we're not destroying the error we're supposed to return. */ + if (rk->rk_eos.txn_curr_api.error != error) + rd_kafka_error_destroy(rk->rk_eos.txn_curr_api.error); + rk->rk_eos.txn_curr_api.error = NULL; + } + + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + + return error; +} + + + +/** + * @brief Set the (possibly intermediary) result for the current API call. + * + * The result is \p error NULL for success or \p error object on failure. + * If the application is actively blocked on the call the result will be + * sent on its replyq, otherwise the result will be stored for future retrieval + * the next time the application calls the API again. + * + * @locality rdkafka main thread + * @locks_acquired rk->rk_eos.txn_curr_api.lock + */ +static void rd_kafka_txn_curr_api_set_result0(const char *func, + int line, + rd_kafka_t *rk, + int actions, + rd_kafka_error_t *error) { + + mtx_lock(&rk->rk_eos.txn_curr_api.lock); + + if (!*rk->rk_eos.txn_curr_api.name) { + /* No current API being called, this could happen + * if the application thread API deemed the API was done, + * or for fatal errors that attempt to set the result + * regardless of current API state. + * In this case we simply throw away this result. */ + if (error) + rd_kafka_error_destroy(error); + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + return; + } + + rd_kafka_dbg(rk, EOS, "APIRESULT", + "Transactional API %s (intermediary%s) result set " + "at %s:%d: %s (%sprevious result%s%s)", + rk->rk_eos.txn_curr_api.name, + rk->rk_eos.txn_curr_api.calling ? ", calling" : "", func, + line, error ? rd_kafka_error_string(error) : "Success", + rk->rk_eos.txn_curr_api.has_result ? "" : "no ", + rk->rk_eos.txn_curr_api.error ? ": " : "", + rd_kafka_error_string(rk->rk_eos.txn_curr_api.error)); + + rk->rk_eos.txn_curr_api.has_result = rd_true; + + + if (rk->rk_eos.txn_curr_api.error) { + /* If there's already an error it typically means + * a fatal error has been raised, so nothing more to do here. */ + rd_kafka_dbg( + rk, EOS, "APIRESULT", + "Transactional API %s error " + "already set: %s", + rk->rk_eos.txn_curr_api.name, + rd_kafka_error_string(rk->rk_eos.txn_curr_api.error)); + + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + + if (error) + rd_kafka_error_destroy(error); + + return; + } + + if (error) { + if (actions & RD_KAFKA_ERR_ACTION_FATAL) + rd_kafka_error_set_fatal(error); + else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) + rd_kafka_error_set_txn_requires_abort(error); + else if (actions & RD_KAFKA_ERR_ACTION_RETRY) + rd_kafka_error_set_retriable(error); + } + + rk->rk_eos.txn_curr_api.error = error; + error = NULL; + cnd_broadcast(&rk->rk_eos.txn_curr_api.cnd); + + + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); +} + + + +/** + * @brief The underlying idempotent producer state changed, + * see if this affects the transactional operations. 
+ * + * @locality any thread + * @locks rd_kafka_wrlock(rk) MUST be held + */ +void rd_kafka_txn_idemp_state_change(rd_kafka_t *rk, + rd_kafka_idemp_state_t idemp_state) { + rd_bool_t set_result = rd_false; + + if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED && + rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_WAIT_PID) { + /* Application is calling (or has called) init_transactions() */ + RD_UT_COVERAGE(1); + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED); + set_result = rd_true; + + } else if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED && + (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_BEGIN_ABORT || + rk->rk_eos.txn_state == + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)) { + /* Application is calling abort_transaction() as we're + * recovering from a fatal idempotence error. */ + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED); + set_result = rd_true; + + } else if (idemp_state == RD_KAFKA_IDEMP_STATE_FATAL_ERROR && + rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_FATAL_ERROR) { + /* A fatal error has been raised. */ + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR); + } + + if (set_result) { + /* Application has called init_transactions() or + * abort_transaction() and it is now complete, + * reply to the app. */ + rd_kafka_txn_curr_api_set_result(rk, 0, NULL); + } +} + + +/** + * @brief Moves a partition from the pending list to the proper list. + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_partition_registered(rd_kafka_toppar_t *rktp) { + rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk; + + rd_kafka_toppar_lock(rktp); + + if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_PEND_TXN))) { + rd_kafka_dbg(rk, EOS | RD_KAFKA_DBG_PROTOCOL, "ADDPARTS", + "\"%.*s\" [%" PRId32 + "] is not in pending " + "list but returned in AddPartitionsToTxn " + "response: ignoring", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition); + rd_kafka_toppar_unlock(rktp); + return; + } + + rd_kafka_dbg(rk, EOS | RD_KAFKA_DBG_TOPIC, "ADDPARTS", + "%.*s [%" PRId32 "] registered with transaction", + RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic), + rktp->rktp_partition); + + rd_assert((rktp->rktp_flags & + (RD_KAFKA_TOPPAR_F_PEND_TXN | RD_KAFKA_TOPPAR_F_IN_TXN)) == + RD_KAFKA_TOPPAR_F_PEND_TXN); + + rktp->rktp_flags = (rktp->rktp_flags & ~RD_KAFKA_TOPPAR_F_PEND_TXN) | + RD_KAFKA_TOPPAR_F_IN_TXN; + + rd_kafka_toppar_unlock(rktp); + + mtx_lock(&rk->rk_eos.txn_pending_lock); + TAILQ_REMOVE(&rk->rk_eos.txn_waitresp_rktps, rktp, rktp_txnlink); + mtx_unlock(&rk->rk_eos.txn_pending_lock); + + /* Not destroy()/keep():ing rktp since it just changes tailq. 
*/ + + TAILQ_INSERT_TAIL(&rk->rk_eos.txn_rktps, rktp, rktp_txnlink); +} + + + +/** + * @brief Handle AddPartitionsToTxnResponse + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int32_t TopicCnt; + int actions = 0; + int retry_backoff_ms = 500; /* retry backoff */ + rd_kafka_resp_err_t reset_coord_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_bool_t require_bump = rd_false; + + if (err) + goto done; + + rd_kafka_rdlock(rk); + rd_assert(rk->rk_eos.txn_state != + RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION); + + if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION && + rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_BEGIN_COMMIT) { + /* Response received after aborting transaction */ + rd_rkb_dbg(rkb, EOS, "ADDPARTS", + "Ignoring outdated AddPartitionsToTxn response in " + "state %s", + rd_kafka_txn_state2str(rk->rk_eos.txn_state)); + rd_kafka_rdunlock(rk); + err = RD_KAFKA_RESP_ERR__OUTDATED; + goto done; + } + rd_kafka_rdunlock(rk); + + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i32(rkbuf, &TopicCnt); + + while (TopicCnt-- > 0) { + rd_kafkap_str_t Topic; + rd_kafka_topic_t *rkt; + int32_t PartCnt; + rd_bool_t request_error = rd_false; + + rd_kafka_buf_read_str(rkbuf, &Topic); + rd_kafka_buf_read_i32(rkbuf, &PartCnt); + + rkt = rd_kafka_topic_find0(rk, &Topic); + if (rkt) + rd_kafka_topic_rdlock(rkt); /* for toppar_get() */ + + while (PartCnt-- > 0) { + rd_kafka_toppar_t *rktp = NULL; + int32_t Partition; + int16_t ErrorCode; + int p_actions = 0; + + rd_kafka_buf_read_i32(rkbuf, &Partition); + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + if (rkt) + rktp = rd_kafka_toppar_get(rkt, Partition, + rd_false); + + if (!rktp) { + rd_rkb_dbg(rkb, EOS | RD_KAFKA_DBG_PROTOCOL, + "ADDPARTS", + "Unknown partition \"%.*s\" " + "[%" PRId32 + "] in AddPartitionsToTxn " + "response: ignoring", + RD_KAFKAP_STR_PR(&Topic), Partition); + continue; + } + + switch (ErrorCode) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + /* Move rktp from pending to proper list */ + rd_kafka_txn_partition_registered(rktp); + break; + + /* Request-level errors. + * As soon as any of these errors are seen + * the rest of the partitions are ignored + * since they will have the same error. 
*/ + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + reset_coord_err = ErrorCode; + p_actions |= RD_KAFKA_ERR_ACTION_RETRY; + err = ErrorCode; + request_error = rd_true; + break; + + case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: + retry_backoff_ms = 20; + /* FALLTHRU */ + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: + p_actions |= RD_KAFKA_ERR_ACTION_RETRY; + err = ErrorCode; + request_error = rd_true; + break; + + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + p_actions |= RD_KAFKA_ERR_ACTION_FATAL; + err = ErrorCode; + request_error = rd_true; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING: + require_bump = rd_true; + p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + err = ErrorCode; + request_error = rd_true; + break; + + /* Partition-level errors. + * Continue with rest of partitions. */ + case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED: + p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + err = ErrorCode; + break; + + case RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED: + /* Partition skipped due to other partition's + * error. */ + p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + if (!err) + err = ErrorCode; + break; + + default: + /* Other partition error */ + p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + err = ErrorCode; + break; + } + + if (ErrorCode) { + actions |= p_actions; + + if (!(p_actions & + (RD_KAFKA_ERR_ACTION_FATAL | + RD_KAFKA_ERR_ACTION_PERMANENT))) + rd_rkb_dbg( + rkb, EOS, "ADDPARTS", + "AddPartitionsToTxn response: " + "partition \"%.*s\": " + "[%" PRId32 "]: %s", + RD_KAFKAP_STR_PR(&Topic), Partition, + rd_kafka_err2str(ErrorCode)); + else + rd_rkb_log(rkb, LOG_ERR, "ADDPARTS", + "Failed to add partition " + "\"%.*s\" [%" PRId32 + "] to " + "transaction: %s", + RD_KAFKAP_STR_PR(&Topic), + Partition, + rd_kafka_err2str(ErrorCode)); + } + + rd_kafka_toppar_destroy(rktp); + + if (request_error) + break; /* Request-level error seen, bail out */ + } + + if (rkt) { + rd_kafka_topic_rdunlock(rkt); + rd_kafka_topic_destroy0(rkt); + } + + if (request_error) + break; /* Request-level error seen, bail out */ + } + + if (actions) /* Actions set from encountered errors */ + goto done; + + /* Since these partitions are now allowed to produce + * we wake up all broker threads. */ + rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT, + "partitions added to transaction"); + + goto done; + +err_parse: + err = rkbuf->rkbuf_err; + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + +done: + if (err) { + rd_assert(rk->rk_eos.txn_req_cnt > 0); + rk->rk_eos.txn_req_cnt--; + } + + /* Handle local request-level errors */ + switch (err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + + case RD_KAFKA_RESP_ERR__DESTROY: + case RD_KAFKA_RESP_ERR__OUTDATED: + /* Terminating or outdated, ignore response */ + return; + + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + default: + /* For these errors we can't be sure if the + * request was received by the broker or not, + * so increase the txn_req_cnt back up as if + * they were received so that an EndTxnRequest + * is sent on abort_transaction(). 
*/ + rk->rk_eos.txn_req_cnt++; + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + } + + if (reset_coord_err) { + rd_kafka_wrlock(rk); + rd_kafka_txn_coord_set(rk, NULL, + "AddPartitionsToTxn failed: %s", + rd_kafka_err2str(reset_coord_err)); + rd_kafka_wrunlock(rk); + } + + /* Partitions that failed will still be on the waitresp list + * and are moved back to the pending list for the next scheduled + * AddPartitionsToTxn request. + * If this request was successful there will be no remaining partitions + * on the waitresp list. + */ + mtx_lock(&rk->rk_eos.txn_pending_lock); + TAILQ_CONCAT_SORTED(&rk->rk_eos.txn_pending_rktps, + &rk->rk_eos.txn_waitresp_rktps, rd_kafka_toppar_t *, + rktp_txnlink, rd_kafka_toppar_topic_cmp); + mtx_unlock(&rk->rk_eos.txn_pending_lock); + + err = rd_kafka_txn_normalize_err(err); + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, + "Failed to add partitions to " + "transaction: %s", + rd_kafka_err2str(err)); + + } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { + /* Treat all other permanent errors as abortable errors. + * If an epoch bump is required let idempo sort it out. */ + if (require_bump) + rd_kafka_idemp_drain_epoch_bump( + rk, err, + "Failed to add partition(s) to transaction " + "on broker %s: %s (after %d ms)", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000)); + else + rd_kafka_txn_set_abortable_error( + rk, err, + "Failed to add partition(s) to transaction " + "on broker %s: %s (after %d ms)", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000)); + + } else { + /* Schedule registration of any new or remaining partitions */ + rd_kafka_txn_schedule_register_partitions( + rk, (actions & RD_KAFKA_ERR_ACTION_RETRY) + ? retry_backoff_ms + : 1 /*immediate*/); + } +} + + +/** + * @brief Send AddPartitionsToTxnRequest to the transaction coordinator. 
+ * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_register_partitions(rd_kafka_t *rk) { + char errstr[512]; + rd_kafka_resp_err_t err; + rd_kafka_error_t *error; + rd_kafka_pid_t pid; + + /* Require operational state */ + rd_kafka_rdlock(rk); + error = + rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION, + RD_KAFKA_TXN_STATE_BEGIN_COMMIT); + + if (unlikely(error != NULL)) { + rd_kafka_rdunlock(rk); + rd_kafka_dbg(rk, EOS, "ADDPARTS", + "Not registering partitions: %s", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + return; + } + + /* Get pid, checked later */ + pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); + + rd_kafka_rdunlock(rk); + + /* Transaction coordinator needs to be up */ + if (!rd_kafka_broker_is_up(rk->rk_eos.txn_coord)) { + rd_kafka_dbg(rk, EOS, "ADDPARTS", + "Not registering partitions: " + "coordinator is not available"); + return; + } + + mtx_lock(&rk->rk_eos.txn_pending_lock); + if (TAILQ_EMPTY(&rk->rk_eos.txn_pending_rktps)) { + /* No pending partitions to register */ + mtx_unlock(&rk->rk_eos.txn_pending_lock); + return; + } + + if (!TAILQ_EMPTY(&rk->rk_eos.txn_waitresp_rktps)) { + /* Only allow one outstanding AddPartitionsToTxnRequest */ + mtx_unlock(&rk->rk_eos.txn_pending_lock); + rd_kafka_dbg(rk, EOS, "ADDPARTS", + "Not registering partitions: waiting for " + "previous AddPartitionsToTxn request to complete"); + return; + } + + /* Require valid pid */ + if (unlikely(!rd_kafka_pid_valid(pid))) { + mtx_unlock(&rk->rk_eos.txn_pending_lock); + rd_kafka_dbg(rk, EOS, "ADDPARTS", + "Not registering partitions: " + "No PID available (idempotence state %s)", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + rd_dassert(!*"BUG: No PID despite proper transaction state"); + return; + } + + + /* Send request to coordinator */ + err = rd_kafka_AddPartitionsToTxnRequest( + rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, + &rk->rk_eos.txn_pending_rktps, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_txn_handle_AddPartitionsToTxn, NULL); + if (err) { + mtx_unlock(&rk->rk_eos.txn_pending_lock); + rd_kafka_dbg(rk, EOS, "ADDPARTS", + "Not registering partitions: %s", errstr); + return; + } + + /* Move all pending partitions to wait-response list. + * No need to keep waitresp sorted. */ + TAILQ_CONCAT(&rk->rk_eos.txn_waitresp_rktps, + &rk->rk_eos.txn_pending_rktps, rktp_txnlink); + + mtx_unlock(&rk->rk_eos.txn_pending_lock); + + rk->rk_eos.txn_req_cnt++; + + rd_rkb_dbg(rk->rk_eos.txn_coord, EOS, "ADDPARTS", + "Registering partitions with transaction"); +} + + +static void rd_kafka_txn_register_partitions_tmr_cb(rd_kafka_timers_t *rkts, + void *arg) { + rd_kafka_t *rk = arg; + rd_kafka_txn_register_partitions(rk); +} + + +/** + * @brief Schedule register_partitions() as soon as possible. + * + * @locality any + * @locks any + */ +void rd_kafka_txn_schedule_register_partitions(rd_kafka_t *rk, int backoff_ms) { + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rk->rk_eos.txn_register_parts_tmr, + rd_false /*dont-restart*/, + backoff_ms ? backoff_ms * 1000 : 1 /* immediate */, + rd_kafka_txn_register_partitions_tmr_cb, rk); +} + + + +/** + * @brief Clears \p flag from all rktps and destroys them, emptying + * and reinitializing the \p tqh. 
*/ +static void rd_kafka_txn_clear_partitions_flag(rd_kafka_toppar_tqhead_t *tqh, + int flag) { + rd_kafka_toppar_t *rktp, *tmp; + + TAILQ_FOREACH_SAFE(rktp, tqh, rktp_txnlink, tmp) { + rd_kafka_toppar_lock(rktp); + rd_dassert(rktp->rktp_flags & flag); + rktp->rktp_flags &= ~flag; + rd_kafka_toppar_unlock(rktp); + rd_kafka_toppar_destroy(rktp); + } + + TAILQ_INIT(tqh); +} + + +/** + * @brief Clear all pending partitions. + * + * @locks txn_pending_lock MUST be held + */ +static void rd_kafka_txn_clear_pending_partitions(rd_kafka_t *rk) { + rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_pending_rktps, + RD_KAFKA_TOPPAR_F_PEND_TXN); + rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_waitresp_rktps, + RD_KAFKA_TOPPAR_F_PEND_TXN); +} + +/** + * @brief Clear all added partitions. + * + * @locks rd_kafka_wrlock(rk) MUST be held + */ +static void rd_kafka_txn_clear_partitions(rd_kafka_t *rk) { + rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_rktps, + RD_KAFKA_TOPPAR_F_IN_TXN); +} + + + +/** + * @brief Async handler for init_transactions() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t rd_kafka_txn_op_init_transactions(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if ((error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_INIT, RD_KAFKA_TXN_STATE_WAIT_PID, + RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) { + rd_kafka_wrunlock(rk); + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_READY_NOT_ACKED) { + /* A previous init_transactions() call finished successfully + * after timeout, and the application has called + * init_transactions() again; we do nothing here, + * ack_init_transactions() will + * transition the state from READY_NOT_ACKED to READY. */ + rd_kafka_wrunlock(rk); + + } else { + + /* Possibly a no-op if already in WAIT_PID state */ + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_WAIT_PID); + + rk->rk_eos.txn_init_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + rd_kafka_wrunlock(rk); + + /* Start idempotent producer to acquire PID */ + rd_kafka_idemp_start(rk, rd_true /*immediately*/); + + /* Do not call curr_api_set_result, it will be triggered from + * idemp_state_change() when the PID has been retrieved. */ + } + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Async handler for the application to acknowledge + * successful background completion of init_transactions(). + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t +rd_kafka_txn_op_ack_init_transactions(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if (!(error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY); + + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + + +rd_kafka_error_t *rd_kafka_init_transactions(rd_kafka_t *rk, int timeout_ms) { + rd_kafka_error_t *error; + rd_ts_t abs_timeout; + + /* Cap actual timeout to transaction.timeout.ms * 2 when an infinite + * timeout is provided; this is to make sure the call doesn't block + * indefinitely in case a coordinator is not available. 
+ * This is only needed for init_transactions() since there is no + * coordinator to time us out yet. */ + if (timeout_ms == RD_POLL_INFINITE && + /* Avoid overflow */ + rk->rk_conf.eos.transaction_timeout_ms < INT_MAX / 2) + timeout_ms = rk->rk_conf.eos.transaction_timeout_ms * 2; + + if ((error = rd_kafka_txn_curr_api_begin(rk, "init_transactions", + rd_false /* no cap */, + timeout_ms, &abs_timeout))) + return error; + + /* init_transactions() will continue to operate in the background + * if the timeout expires, and the application may call + * init_transactions() again to resume the initialization + * process. + * For this reason we need two states: + * - TXN_STATE_READY_NOT_ACKED for when initialization is done + * but the API call timed out prior to success, meaning the + * application does not know initialization finished and + * is thus not allowed to call subsequent txn APIs, e.g. begin..() + * - TXN_STATE_READY for when initialization is done and this + * function has returned successfully to the application. + * + * And due to the two states we need two calls to the rdkafka main + * thread (to keep txn_state synchronization in one place). */ + + /* First call is to trigger initialization */ + if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_init_transactions, + abs_timeout))) { + if (rd_kafka_error_code(error) == + RD_KAFKA_RESP_ERR__TIMED_OUT) { + /* See if there's a more meaningful txn_init_err set + * by idempo that we can return. */ + rd_kafka_resp_err_t err; + rd_kafka_rdlock(rk); + err = + rd_kafka_txn_normalize_err(rk->rk_eos.txn_init_err); + rd_kafka_rdunlock(rk); + + if (err && err != RD_KAFKA_RESP_ERR__TIMED_OUT) { + rd_kafka_error_destroy(error); + error = rd_kafka_error_new_retriable( + err, "Failed to initialize Producer ID: %s", + rd_kafka_err2str(err)); + } + } + + return rd_kafka_txn_curr_api_return(rk, rd_true, error); + } + + + /* Second call is to transition from READY_NOT_ACKED -> READY, + * if necessary. */ + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_ack_init_transactions, + /* Timeout must be infinite since this is + * a synchronization point. + * The call is immediate though, so this + * will not block. */ + RD_POLL_INFINITE); + + return rd_kafka_txn_curr_api_return(rk, + /* not resumable at this point */ + rd_false, error); +} + + + +/** + * @brief Handler for begin_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t rd_kafka_txn_op_begin_transaction(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + rd_bool_t wakeup_brokers = rd_false; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + if (!(error = + rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_READY))) { + rd_assert(TAILQ_EMPTY(&rk->rk_eos.txn_rktps)); + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION); + + rd_assert(rk->rk_eos.txn_req_cnt == 0); + rd_atomic64_set(&rk->rk_eos.txn_dr_fails, 0); + rk->rk_eos.txn_err = RD_KAFKA_RESP_ERR_NO_ERROR; + RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free); + rk->rk_eos.txn_errstr = NULL; + + /* Wake up all broker threads (that may have messages to send + * that were waiting for this transaction state), + * but this needs to be done below with no lock held. 
*/ + wakeup_brokers = rd_true; + } + rd_kafka_wrunlock(rk); + + if (wakeup_brokers) + rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT, + "begin transaction"); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +rd_kafka_error_t *rd_kafka_begin_transaction(rd_kafka_t *rk) { + rd_kafka_error_t *error; + + if ((error = rd_kafka_txn_curr_api_begin(rk, "begin_transaction", + rd_false, 0, NULL))) + return error; + + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_transaction, + RD_POLL_INFINITE); + + return rd_kafka_txn_curr_api_return(rk, rd_false /*not resumable*/, + error); +} + + +static rd_kafka_resp_err_t +rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb, + rd_kafka_op_t *rko, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *reply_opaque); + +/** + * @brief Handle TxnOffsetCommitResponse + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko = opaque; + int actions = 0; + rd_kafka_topic_partition_list_t *partitions = NULL; + char errstr[512]; + + *errstr = '\0'; + + if (err) + goto done; + + rd_kafka_buf_read_throttle_time(rkbuf); + + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + partitions = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields); + if (!partitions) + goto err_parse; + + err = rd_kafka_topic_partition_list_get_err(partitions); + if (err) { + char errparts[256]; + rd_kafka_topic_partition_list_str(partitions, errparts, + sizeof(errparts), + RD_KAFKA_FMT_F_ONLY_ERR); + rd_snprintf(errstr, sizeof(errstr), + "Failed to commit offsets to transaction on " + "broker %s: %s " + "(after %dms)", + rd_kafka_broker_name(rkb), errparts, + (int)(request->rkbuf_ts_sent / 1000)); + } + + goto done; + +err_parse: + err = rkbuf->rkbuf_err; + +done: + if (err) { + if (!*errstr) { + rd_snprintf(errstr, sizeof(errstr), + "Failed to commit offsets to " + "transaction on broker %s: %s " + "(after %d ms)", + rkb ? rd_kafka_broker_name(rkb) : "(none)", + rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000)); + } + } + + + if (partitions) + rd_kafka_topic_partition_list_destroy(partitions); + + switch (err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + + case RD_KAFKA_RESP_ERR__DESTROY: + /* Producer is being terminated, ignore the response. */ + case RD_KAFKA_RESP_ERR__OUTDATED: + /* Set a non-actionable actions flag so that + * curr_api_set_result() is called below, without + * other side-effects. */ + actions = RD_KAFKA_ERR_ACTION_SPECIAL; + return; + + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: + /* Note: this is the group coordinator, not the + * transaction coordinator. 
*/ + rd_kafka_coord_cache_evict(&rk->rk_coord_cache, rkb); + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING: + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT: + actions |= RD_KAFKA_ERR_ACTION_FATAL; + break; + + case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + break; + + case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION: + case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID: + case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + break; + + default: + /* Unhandled error, fail transaction */ + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + break; + } + + err = rd_kafka_txn_normalize_err(err); + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, "%s", errstr); + + } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + int remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout); + + if (!rd_timeout_expired(remains_ms)) { + rd_kafka_coord_req( + rk, RD_KAFKA_COORD_GROUP, + rko->rko_u.txn.cgmetadata->group_id, + rd_kafka_txn_send_TxnOffsetCommitRequest, rko, + 500 /* 500ms delay before retrying */, + rd_timeout_remains_limit0( + remains_ms, rk->rk_conf.socket_timeout_ms), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_txn_handle_TxnOffsetCommit, rko); + return; + } else if (!err) + err = RD_KAFKA_RESP_ERR__TIMED_OUT; + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + } + + if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) + rd_kafka_txn_set_abortable_error(rk, err, "%s", errstr); + + if (err) + rd_kafka_txn_curr_api_set_result( + rk, actions, rd_kafka_error_new(err, "%s", errstr)); + else + rd_kafka_txn_curr_api_set_result(rk, 0, NULL); + + rd_kafka_op_destroy(rko); +} + + + +/** + * @brief Construct and send TxnOffsetCommitRequest. + * + * @locality rdkafka main thread + * @locks none + */ +static rd_kafka_resp_err_t +rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb, + rd_kafka_op_t *rko, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *reply_opaque) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + rd_kafka_pid_t pid; + const rd_kafka_consumer_group_metadata_t *cgmetadata = + rko->rko_u.txn.cgmetadata; + int cnt; + + rd_kafka_rdlock(rk); + if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION) { + rd_kafka_rdunlock(rk); + /* Do not free the rko, it is passed as the reply_opaque + * on the reply queue by coord_req_fsm() when we return + * an error here. */ + return RD_KAFKA_RESP_ERR__STATE; + } + + pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); + rd_kafka_rdunlock(rk); + if (!rd_kafka_pid_valid(pid)) { + /* Do not free the rko, it is passed as the reply_opaque + * on the reply queue by coord_req_fsm() when we return + * an error here. 
*/ + return RD_KAFKA_RESP_ERR__STATE; + } + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_TxnOffsetCommit, 0, 3, NULL); + if (ApiVersion == -1) { + /* Do not free the rko, it is passed as the reply_opaque + * on the reply queue by coord_req_fsm() when we return + * an error here. */ + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_TxnOffsetCommit, 1, rko->rko_u.txn.offsets->cnt * 50, + ApiVersion >= 3); + + /* transactional_id */ + rd_kafka_buf_write_str(rkbuf, rk->rk_conf.eos.transactional_id, -1); + + /* group_id */ + rd_kafka_buf_write_str(rkbuf, rko->rko_u.txn.cgmetadata->group_id, -1); + + /* PID */ + rd_kafka_buf_write_i64(rkbuf, pid.id); + rd_kafka_buf_write_i16(rkbuf, pid.epoch); + + if (ApiVersion >= 3) { + /* GenerationId */ + rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id); + /* MemberId */ + rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1); + /* GroupInstanceId */ + rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id, + -1); + } + + /* Write per-partition offsets list */ + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, + ApiVersion >= 2 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH + : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, + RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + cnt = rd_kafka_buf_write_topic_partitions( + rkbuf, rko->rko_u.txn.offsets, rd_true /*skip invalid offsets*/, + rd_false /*any offset*/, fields); + if (!cnt) { + /* No valid partition offsets, don't commit. */ + rd_kafka_buf_destroy(rkbuf); + /* Do not free the rko, it is passed as the reply_opaque + * on the reply queue by coord_req_fsm() when we return + * an error here. */ + return RD_KAFKA_RESP_ERR__NO_OFFSET; + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, + reply_opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +/** + * @brief Handle AddOffsetsToTxnResponse + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_handle_AddOffsetsToTxn(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko = opaque; + int16_t ErrorCode; + int actions = 0; + int remains_ms; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) { + rd_kafka_op_destroy(rko); + return; + } + + if (err) + goto done; + + rd_kafka_buf_read_throttle_time(rkbuf); + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + err = ErrorCode; + goto done; + +err_parse: + err = rkbuf->rkbuf_err; + +done: + if (err) { + rd_assert(rk->rk_eos.txn_req_cnt > 0); + rk->rk_eos.txn_req_cnt--; + } + + remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout); + if (rd_timeout_expired(remains_ms) && !err) + err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + switch (err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + + case RD_KAFKA_RESP_ERR__DESTROY: + /* Producer is being terminated, ignore the response. */ + case RD_KAFKA_RESP_ERR__OUTDATED: + /* Set a non-actionable actions flag so that + * curr_api_set_result() is called below, without + * other side-effects. 
*/ + actions = RD_KAFKA_ERR_ACTION_SPECIAL; + break; + + case RD_KAFKA_RESP_ERR__TRANSPORT: + case RD_KAFKA_RESP_ERR__TIMED_OUT: + /* For these errors we can't be sure if the + * request was received by the broker or not, + * so increase the txn_req_cnt back up as if + * they were received so that an EndTxnRequest + * is sent on abort_transaction(). */ + rk->rk_eos.txn_req_cnt++; + /* FALLTHRU */ + case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: + actions |= + RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: + case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT: + actions |= RD_KAFKA_ERR_ACTION_FATAL; + break; + + case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + default: + /* All unhandled errors are permanent */ + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + break; + } + + err = rd_kafka_txn_normalize_err(err); + + rd_kafka_dbg(rk, EOS, "ADDOFFSETS", + "AddOffsetsToTxn response from %s: %s (%s)", + rkb ? rd_kafka_broker_name(rkb) : "(none)", + rd_kafka_err2name(err), rd_kafka_actions2str(actions)); + + /* All unhandled errors are considered permanent */ + if (err && !actions) + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, + "Failed to add offsets to " + "transaction: %s", + rd_kafka_err2str(err)); + } else { + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) + rd_kafka_txn_coord_timer_start(rk, 50); + + if (actions & RD_KAFKA_ERR_ACTION_RETRY) { + rd_rkb_dbg( + rkb, EOS, "ADDOFFSETS", + "Failed to add offsets to transaction on " + "broker %s: %s (after %dms, %dms remains): " + "error is retriable", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000), remains_ms); + + if (!rd_timeout_expired(remains_ms) && + rd_kafka_buf_retry(rk->rk_eos.txn_coord, request)) { + rk->rk_eos.txn_req_cnt++; + return; + } + + /* Propagate as retriable error through + * api_reply() below */ + } + } + + if (err) + rd_rkb_log(rkb, LOG_ERR, "ADDOFFSETS", + "Failed to add offsets to transaction on broker %s: " + "%s", + rkb ? rd_kafka_broker_name(rkb) : "(none)", + rd_kafka_err2str(err)); + + if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) + rd_kafka_txn_set_abortable_error( + rk, err, + "Failed to add offsets to " + "transaction on broker %s: " + "%s (after %dms)", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000)); + + if (!err) { + /* Step 2: Commit offsets to transaction on the + * group coordinator. 
*/ + + rd_kafka_coord_req( + rk, RD_KAFKA_COORD_GROUP, + rko->rko_u.txn.cgmetadata->group_id, + rd_kafka_txn_send_TxnOffsetCommitRequest, rko, + 0 /* no delay */, + rd_timeout_remains_limit0(remains_ms, + rk->rk_conf.socket_timeout_ms), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), + rd_kafka_txn_handle_TxnOffsetCommit, rko); + + } else { + + rd_kafka_txn_curr_api_set_result( + rk, actions, + rd_kafka_error_new( + err, + "Failed to add offsets to transaction on " + "broker %s: %s (after %dms)", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + (int)(request->rkbuf_ts_sent / 1000))); + + rd_kafka_op_destroy(rko); + } +} + + +/** + * @brief Async handler for send_offsets_to_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t +rd_kafka_txn_op_send_offsets_to_transaction(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; + char errstr[512]; + rd_kafka_error_t *error; + rd_kafka_pid_t pid; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + *errstr = '\0'; + + rd_kafka_wrlock(rk); + + if ((error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION))) { + rd_kafka_wrunlock(rk); + goto err; + } + + rd_kafka_wrunlock(rk); + + pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); + if (!rd_kafka_pid_valid(pid)) { + rd_dassert(!*"BUG: No PID despite proper transaction state"); + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__STATE, + "No PID available (idempotence state %s)", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + goto err; + } + + /* This is a multi-stage operation, consisting of: + * 1) send AddOffsetsToTxnRequest to transaction coordinator. + * 2) send TxnOffsetCommitRequest to group coordinator. */ + + err = rd_kafka_AddOffsetsToTxnRequest( + rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, + rko->rko_u.txn.cgmetadata->group_id, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_AddOffsetsToTxn, + rko); + + if (err) { + error = rd_kafka_error_new_retriable(err, "%s", errstr); + goto err; + } + + rk->rk_eos.txn_req_cnt++; + + return RD_KAFKA_OP_RES_KEEP; /* the rko is passed to AddOffsetsToTxn */ + +err: + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + +/** + * error returns: + * ERR__TRANSPORT - retryable + */ +rd_kafka_error_t *rd_kafka_send_offsets_to_transaction( + rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *offsets, + const rd_kafka_consumer_group_metadata_t *cgmetadata, + int timeout_ms) { + rd_kafka_error_t *error; + rd_kafka_op_t *rko; + rd_kafka_topic_partition_list_t *valid_offsets; + rd_ts_t abs_timeout; + + if (!cgmetadata || !offsets) + return rd_kafka_error_new( + RD_KAFKA_RESP_ERR__INVALID_ARG, + "cgmetadata and offsets are required parameters"); + + if ((error = rd_kafka_txn_curr_api_begin( + rk, "send_offsets_to_transaction", + /* Cap timeout to txn timeout */ + rd_true, timeout_ms, &abs_timeout))) + return error; + + + valid_offsets = rd_kafka_topic_partition_list_match( + offsets, rd_kafka_topic_partition_match_valid_offset, NULL); + + if (valid_offsets->cnt == 0) { + /* No valid offsets, e.g., nothing was consumed, + * this is not an error, do nothing. 
*/ + rd_kafka_topic_partition_list_destroy(valid_offsets); + return rd_kafka_txn_curr_api_return(rk, rd_false, NULL); + } + + rd_kafka_topic_partition_list_sort_by_topic(valid_offsets); + + rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN, + rd_kafka_txn_op_send_offsets_to_transaction); + rko->rko_u.txn.offsets = valid_offsets; + rko->rko_u.txn.cgmetadata = + rd_kafka_consumer_group_metadata_dup(cgmetadata); + rko->rko_u.txn.abs_timeout = abs_timeout; + + /* Timeout is enforced by op_send_offsets_to_transaction() */ + error = rd_kafka_txn_op_req1(rk, rko, RD_POLL_INFINITE); + + return rd_kafka_txn_curr_api_return(rk, rd_false, error); +} + + + +/** + * @brief Successfully complete the transaction. + * + * Current state must be either COMMIT_NOT_ACKED or ABORT_NOT_ACKED. + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock(rk) MUST be held + */ +static void rd_kafka_txn_complete(rd_kafka_t *rk, rd_bool_t is_commit) { + rd_kafka_dbg(rk, EOS, "TXNCOMPLETE", "Transaction successfully %s", + is_commit ? "committed" : "aborted"); + + /* Clear all transaction partition state */ + rd_kafka_txn_clear_pending_partitions(rk); + rd_kafka_txn_clear_partitions(rk); + + rk->rk_eos.txn_requires_epoch_bump = rd_false; + rk->rk_eos.txn_req_cnt = 0; + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY); +} + + +/** + * @brief EndTxn (commit or abort of transaction on the coordinator) is done, + * or was skipped. + * Continue with next steps (if any) before completing the local + * transaction state. + * + * @locality rdkafka main thread + * @locks_acquired rd_kafka_wrlock(rk), rk->rk_eos.txn_curr_api.lock + */ +static void rd_kafka_txn_endtxn_complete(rd_kafka_t *rk) { + rd_bool_t is_commit; + + mtx_lock(&rk->rk_eos.txn_curr_api.lock); + is_commit = !strcmp(rk->rk_eos.txn_curr_api.name, "commit_transaction"); + mtx_unlock(&rk->rk_eos.txn_curr_api.lock); + + rd_kafka_wrlock(rk); + + /* If an epoch bump is required, let idempo handle it. + * When the bump is finished we'll be notified through + * idemp_state_change() and we can complete the local transaction state + * and set the final API call result. + * If the bumping fails a fatal error will be raised. */ + if (rk->rk_eos.txn_requires_epoch_bump) { + rd_kafka_resp_err_t bump_err = rk->rk_eos.txn_err; + rd_dassert(!is_commit); + + rd_kafka_wrunlock(rk); + + /* After the epoch bump is done we'll be transitioned + * to the next state. 
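+ * (The idemp_state_change() notification mentioned above then + * completes the local transaction state and sets the final API + * result.)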
*/ + rd_kafka_idemp_drain_epoch_bump0( + rk, rd_false /* don't allow txn abort */, bump_err, + "Transaction aborted: %s", rd_kafka_err2str(bump_err)); + return; + } + + if (is_commit) + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED); + else + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED); + + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, NULL); +} + + +/** + * @brief Handle EndTxnResponse (commit or abort) + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_handle_EndTxn(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode; + int actions = 0; + rd_bool_t is_commit, may_retry = rd_false, require_bump = rd_false; + + if (err == RD_KAFKA_RESP_ERR__DESTROY) + return; + + is_commit = request->rkbuf_u.EndTxn.commit; + + if (err) + goto err; + + rd_kafka_buf_read_throttle_time(rkbuf); + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + err = ErrorCode; + goto err; + +err_parse: + err = rkbuf->rkbuf_err; + /* FALLTHRU */ + +err: + rd_kafka_wrlock(rk); + + if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) { + may_retry = rd_true; + + } else if (rk->rk_eos.txn_state == + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) { + may_retry = rd_true; + + } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR) { + /* Transaction has failed locally, typically due to timeout. + * Get the transaction error and return that instead of + * this error. + * This is a tricky state since the transaction will have + * failed locally but the EndTxn(commit) may have succeeded. */ + + + if (err) { + rd_kafka_txn_curr_api_set_result( + rk, RD_KAFKA_ERR_ACTION_PERMANENT, + rd_kafka_error_new( + rk->rk_eos.txn_err, + "EndTxn failed with %s but transaction " + "had already failed due to: %s", + rd_kafka_err2name(err), rk->rk_eos.txn_errstr)); + } else { + /* If the transaction has failed locally but + * this EndTxn commit succeeded we'll raise + * a fatal error. */ + if (is_commit) + rd_kafka_txn_curr_api_set_result( + rk, RD_KAFKA_ERR_ACTION_FATAL, + rd_kafka_error_new( + rk->rk_eos.txn_err, + "Transaction commit succeeded on the " + "broker but the transaction " + "had already failed locally due to: %s", + rk->rk_eos.txn_errstr)); + + else + rd_kafka_txn_curr_api_set_result( + rk, RD_KAFKA_ERR_ACTION_PERMANENT, + rd_kafka_error_new( + rk->rk_eos.txn_err, + "Transaction abort succeeded on the " + "broker but the transaction " + "had already failed locally due to: %s", + rk->rk_eos.txn_errstr)); + } + + rd_kafka_wrunlock(rk); + + + return; + + } else if (!err) { + /* Request is outdated */ + err = RD_KAFKA_RESP_ERR__OUTDATED; + } + + + rd_kafka_dbg(rk, EOS, "ENDTXN", + "EndTxn returned %s in state %s (may_retry=%s)", + rd_kafka_err2name(err), + rd_kafka_txn_state2str(rk->rk_eos.txn_state), + RD_STR_ToF(may_retry)); + + rd_kafka_wrunlock(rk); + + switch (err) { + case RD_KAFKA_RESP_ERR_NO_ERROR: + break; + + case RD_KAFKA_RESP_ERR__DESTROY: + /* Producer is being terminated, ignore the response. */ + case RD_KAFKA_RESP_ERR__OUTDATED: + /* Transactional state no longer relevant for this + * outdated response. 
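+ * (E.g., a late EndTxn response arriving after the local transaction + * state has already moved on.)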
*/ + break; + case RD_KAFKA_RESP_ERR__TIMED_OUT: + case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: + /* Request timeout */ + /* FALLTHRU */ + case RD_KAFKA_RESP_ERR__TRANSPORT: + actions |= + RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH; + break; + + case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: + case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: + rd_kafka_wrlock(rk); + rd_kafka_txn_coord_set(rk, NULL, "EndTxn failed: %s", + rd_kafka_err2str(err)); + rd_kafka_wrunlock(rk); + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: + case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: + actions |= RD_KAFKA_ERR_ACTION_RETRY; + break; + + case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING: + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + require_bump = rd_true; + break; + + case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: + case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: + actions |= RD_KAFKA_ERR_ACTION_FATAL; + break; + + default: + /* All unhandled errors are permanent */ + actions |= RD_KAFKA_ERR_ACTION_PERMANENT; + } + + err = rd_kafka_txn_normalize_err(err); + + if (actions & RD_KAFKA_ERR_ACTION_FATAL) { + rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, + "Failed to end transaction: %s", + rd_kafka_err2str(err)); + } else { + if (actions & RD_KAFKA_ERR_ACTION_REFRESH) + rd_kafka_txn_coord_timer_start(rk, 50); + + if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { + if (require_bump && !is_commit) { + /* Abort failed due to an invalid PID; starting + * with KIP-360 we can have idempo sort out + * epoch bumping. + * When the epoch has been bumped we'll detect + * the idemp_state_change and complete the + * current API call. */ + rd_kafka_idemp_drain_epoch_bump0( + rk, + /* don't allow txn abort */ + rd_false, err, "EndTxn %s failed: %s", + is_commit ? "commit" : "abort", + rd_kafka_err2str(err)); + return; + } + + /* For aborts we need to revert the state back to + * BEGIN_ABORT so that the abort can be retried from + * the beginning in op_abort_transaction(). */ + rd_kafka_wrlock(rk); + if (rk->rk_eos.txn_state == + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) + rd_kafka_txn_set_state( + rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT); + rd_kafka_wrunlock(rk); + + rd_kafka_txn_set_abortable_error0( + rk, err, require_bump, + "Failed to end transaction: " + "%s", + rd_kafka_err2str(err)); + + } else if (may_retry && actions & RD_KAFKA_ERR_ACTION_RETRY && + rd_kafka_buf_retry(rkb, request)) + return; + } + + if (err) + rd_kafka_txn_curr_api_set_result( + rk, actions, + rd_kafka_error_new(err, "EndTxn %s failed: %s", + is_commit ? 
"commit" : "abort", + rd_kafka_err2str(err))); + else + rd_kafka_txn_endtxn_complete(rk); +} + + + +/** + * @brief Handler for commit_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t +rd_kafka_txn_op_commit_transaction(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + rd_kafka_resp_err_t err; + char errstr[512]; + rd_kafka_pid_t pid; + int64_t dr_fails; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if ((error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT, + RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION, + RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED))) + goto done; + + if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED) { + /* A previous call to commit_transaction() timed out but the + * commit completed since then, we still + * need to wait for the application to call commit_transaction() + * again to resume the call, and it just did. */ + goto done; + } else if (rk->rk_eos.txn_state == + RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) { + /* A previous call to commit_transaction() timed out but the + * commit is still in progress, we still + * need to wait for the application to call commit_transaction() + * again to resume the call, and it just did. */ + rd_kafka_wrunlock(rk); + return RD_KAFKA_OP_RES_HANDLED; + } + + /* If any messages failed delivery the transaction must be aborted. */ + dr_fails = rd_atomic64_get(&rk->rk_eos.txn_dr_fails); + if (unlikely(dr_fails > 0)) { + error = rd_kafka_error_new_txn_requires_abort( + RD_KAFKA_RESP_ERR__INCONSISTENT, + "%" PRId64 + " message(s) failed delivery " + "(see individual delivery reports)", + dr_fails); + goto done; + } + + if (!rk->rk_eos.txn_req_cnt) { + /* If there were no messages produced, or no send_offsets, + * in this transaction, simply complete the transaction + * without sending anything to the transaction coordinator + * (since it will not have any txn state). */ + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "No partitions registered: not sending EndTxn"); + rd_kafka_wrunlock(rk); + rd_kafka_txn_endtxn_complete(rk); + return RD_KAFKA_OP_RES_HANDLED; + } + + pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); + if (!rd_kafka_pid_valid(pid)) { + rd_dassert(!*"BUG: No PID despite proper transaction state"); + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__STATE, + "No PID available (idempotence state %s)", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + goto done; + } + + err = rd_kafka_EndTxnRequest( + rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, + rd_true /* commit */, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_EndTxn, NULL); + if (err) { + error = rd_kafka_error_new_retriable(err, "%s", errstr); + goto done; + } + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION); + + rd_kafka_wrunlock(rk); + + return RD_KAFKA_OP_RES_HANDLED; + +done: + rd_kafka_wrunlock(rk); + + /* If the returned error is an abortable error + * also set the current transaction state accordingly. 
*/ + if (rd_kafka_error_txn_requires_abort(error)) + rd_kafka_txn_set_abortable_error(rk, rd_kafka_error_code(error), + "%s", + rd_kafka_error_string(error)); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Handler for commit_transaction()'s first phase: begin commit + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t rd_kafka_txn_op_begin_commit(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + + rd_kafka_wrlock(rk); + + error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION, + RD_KAFKA_TXN_STATE_BEGIN_COMMIT, + RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION, + RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED); + + if (!error && + rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION) { + /* Transition to BEGIN_COMMIT state if no error and commit not + * already started. */ + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT); + } + + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Handler for last ack of commit_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t +rd_kafka_txn_op_commit_transaction_ack(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if (!(error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED))) { + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "Committed transaction now acked by application"); + rd_kafka_txn_complete(rk, rd_true /*is commit*/); + } + + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + + +rd_kafka_error_t *rd_kafka_commit_transaction(rd_kafka_t *rk, int timeout_ms) { + rd_kafka_error_t *error; + rd_kafka_resp_err_t err; + rd_ts_t abs_timeout; + + /* The commit is in three phases: + * - begin commit: wait for outstanding messages to be produced, + * disallow new messages from being produced + * by application. + * - commit: commit transaction. + * - commit not acked: commit done, but waiting for application + * to acknowledge by completing this API call. + */ + + if ((error = rd_kafka_txn_curr_api_begin(rk, "commit_transaction", + rd_false /* no cap */, + timeout_ms, &abs_timeout))) + return error; + + /* Begin commit */ + if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_commit, + abs_timeout))) + return rd_kafka_txn_curr_api_return(rk, + /* not resumable yet */ + rd_false, error); + + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "Flushing %d outstanding message(s) prior to commit", + rd_kafka_outq_len(rk)); + + /* Wait for queued messages to be delivered, limited by + * the remaining transaction lifetime. 
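+ * (rd_kafka_flush() also serves delivery reports, so the txn_dr_fails + * counter checked by op_commit_transaction() accounts for all + * outstanding messages before EndTxn is sent.)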
*/ + if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) { + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "Flush failed (with %d messages remaining): %s", + rd_kafka_outq_len(rk), rd_kafka_err2str(err)); + + if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) + error = rd_kafka_error_new_retriable( + err, + "Failed to flush all outstanding messages " + "within the API timeout: " + "%d message(s) remaining%s", + rd_kafka_outq_len(rk), + /* In case event queue delivery reports + * are enabled and there is no dr callback + * we instruct the developer to poll + * the event queue separately, since we + * can't do it for them. */ + ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) && + !rk->rk_conf.dr_msg_cb && !rk->rk_conf.dr_cb) + ? ": the event queue must be polled " + "for delivery report events in a separate " + "thread or prior to calling commit" + : ""); + else + error = rd_kafka_error_new_retriable( + err, "Failed to flush outstanding messages: %s", + rd_kafka_err2str(err)); + + /* The commit operation is in progress in the background + * and the application will need to call this API again + * to resume. */ + return rd_kafka_txn_curr_api_return(rk, rd_true, error); + } + + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "Transaction commit message flush complete"); + + /* Commit transaction */ + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_commit_transaction, + abs_timeout); + if (error) + return rd_kafka_txn_curr_api_return(rk, rd_true, error); + + /* Last call is to transition from COMMIT_NOT_ACKED to READY */ + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_commit_transaction_ack, + /* Timeout must be infinite since this is + * a synchronization point. + * The call is immediate though, so this + * will not block. */ + RD_POLL_INFINITE); + + return rd_kafka_txn_curr_api_return(rk, + /* not resumable at this point */ + rd_false, error); +} + + + +/** + * @brief Handler for abort_transaction()'s first phase: begin abort + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t rd_kafka_txn_op_begin_abort(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + rd_bool_t clear_pending = rd_false; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + error = + rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION, + RD_KAFKA_TXN_STATE_BEGIN_ABORT, + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION, + RD_KAFKA_TXN_STATE_ABORTABLE_ERROR, + RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED); + + if (!error && + (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION || + rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)) { + /* Transition to BEGIN_ABORT state if no error and + * abort not already started. 
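+ * (clear_pending below makes this op flush the pending partition + * list once the write lock has been dropped.)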
*/ + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT); + clear_pending = rd_true; + } + + rd_kafka_wrunlock(rk); + + if (clear_pending) { + mtx_lock(&rk->rk_eos.txn_pending_lock); + rd_kafka_txn_clear_pending_partitions(rk); + mtx_unlock(&rk->rk_eos.txn_pending_lock); + } + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Handler for abort_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t rd_kafka_txn_op_abort_transaction(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + rd_kafka_resp_err_t err; + char errstr[512]; + rd_kafka_pid_t pid; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if ((error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT, + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION, + RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED))) + goto done; + + if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED) { + /* A previous call to abort_transaction() timed out but + * the aborting completed since then, we still need to wait + * for the application to call abort_transaction() again + * to synchronize state, and it just did. */ + goto done; + } else if (rk->rk_eos.txn_state == + RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) { + /* A previous call to abort_transaction() timed out but + * the abort is still in progress, we still need to wait + * for the application to call abort_transaction() again + * to synchronize state, and it just did. */ + rd_kafka_wrunlock(rk); + return RD_KAFKA_OP_RES_HANDLED; + } + + if (!rk->rk_eos.txn_req_cnt) { + rd_kafka_dbg(rk, EOS, "TXNABORT", + "No partitions registered: not sending EndTxn"); + rd_kafka_wrunlock(rk); + rd_kafka_txn_endtxn_complete(rk); + return RD_KAFKA_OP_RES_HANDLED; + } + + /* If the underlying idempotent producer's state indicates it + * is re-acquiring its PID we need to wait for that to finish + * before allowing a new begin_transaction(), and since that is + * not a blocking call we need to perform that wait in this + * state instead. + * To recover we need to request an epoch bump from the + * transaction coordinator. This is handled automatically + * by the idempotent producer, so we just need to wait for + * the new pid to be assigned. 
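+ * (Until then this handler returns without sending EndTxn; the abort + * is resumed once the idempotence state machine reaches ASSIGNED.)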
+ */ + if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_ASSIGNED && + rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT) { + rd_kafka_dbg(rk, EOS, "TXNABORT", + "Waiting for transaction coordinator " + "PID bump to complete before aborting " + "transaction (idempotent producer state %s)", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + + rd_kafka_wrunlock(rk); + + return RD_KAFKA_OP_RES_HANDLED; + } + + pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_true); + if (!rd_kafka_pid_valid(pid)) { + rd_dassert(!*"BUG: No PID despite proper transaction state"); + error = rd_kafka_error_new_retriable( + RD_KAFKA_RESP_ERR__STATE, + "No PID available (idempotence state %s)", + rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); + goto done; + } + + err = rd_kafka_EndTxnRequest( + rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, + rd_false /* abort */, errstr, sizeof(errstr), + RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_EndTxn, NULL); + if (err) { + error = rd_kafka_error_new_retriable(err, "%s", errstr); + goto done; + } + + rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION); + + rd_kafka_wrunlock(rk); + + return RD_KAFKA_OP_RES_HANDLED; + +done: + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + +/** + * @brief Handler for last ack of abort_transaction() + * + * @locks none + * @locality rdkafka main thread + */ +static rd_kafka_op_res_t +rd_kafka_txn_op_abort_transaction_ack(rd_kafka_t *rk, + rd_kafka_q_t *rkq, + rd_kafka_op_t *rko) { + rd_kafka_error_t *error; + + if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) + return RD_KAFKA_OP_RES_HANDLED; + + rd_kafka_wrlock(rk); + + if (!(error = rd_kafka_txn_require_state( + rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED))) { + rd_kafka_dbg(rk, EOS, "TXNABORT", + "Aborted transaction now acked by application"); + rd_kafka_txn_complete(rk, rd_false /*is abort*/); + } + + rd_kafka_wrunlock(rk); + + rd_kafka_txn_curr_api_set_result(rk, 0, error); + + return RD_KAFKA_OP_RES_HANDLED; +} + + + +rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms) { + rd_kafka_error_t *error; + rd_kafka_resp_err_t err; + rd_ts_t abs_timeout; + + if ((error = rd_kafka_txn_curr_api_begin(rk, "abort_transaction", + rd_false /* no cap */, + timeout_ms, &abs_timeout))) + return error; + + /* The abort is multi-phase: + * - set state to BEGIN_ABORT + * - flush() outstanding messages + * - send EndTxn + */ + + /* Begin abort */ + if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_abort, + abs_timeout))) + return rd_kafka_txn_curr_api_return(rk, + /* not resumable yet */ + rd_false, error); + + rd_kafka_dbg(rk, EOS, "TXNABORT", + "Purging and flushing %d outstanding message(s) prior " + "to abort", + rd_kafka_outq_len(rk)); + + /* Purge all queued messages. + * Will need to wait for messages in-flight since purging these + * messages may lead to gaps in the idempotent producer sequences. */ + err = rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | + RD_KAFKA_PURGE_F_ABORT_TXN); + + /* Serve delivery reports for the purged messages. */ + if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) { + /* FIXME: Not sure these errors matter that much */ + if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) + error = rd_kafka_error_new_retriable( + err, + "Failed to flush all outstanding messages " + "within the API timeout: " + "%d message(s) remaining%s", + rd_kafka_outq_len(rk), + (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) + ? 
": the event queue must be polled " + "for delivery report events in a separate " + "thread or prior to calling abort" + : ""); + + else + error = rd_kafka_error_new_retriable( + err, "Failed to flush outstanding messages: %s", + rd_kafka_err2str(err)); + + /* The abort operation is in progress in the background + * and the application will need to call this API again + * to resume. */ + return rd_kafka_txn_curr_api_return(rk, rd_true, error); + } + + rd_kafka_dbg(rk, EOS, "TXNCOMMIT", + "Transaction abort message purge and flush complete"); + + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_abort_transaction, + abs_timeout); + if (error) + return rd_kafka_txn_curr_api_return(rk, rd_true, error); + + /* Last call is to transition from ABORT_NOT_ACKED to READY. */ + error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_abort_transaction_ack, + /* Timeout must be infinite since this is + * a synchronization point. + * The call is immediate though, so this + * will not block. */ + RD_POLL_INFINITE); + + return rd_kafka_txn_curr_api_return(rk, + /* not resumable at this point */ + rd_false, error); +} + + + +/** + * @brief Coordinator query timer + * + * @locality rdkafka main thread + * @locks none + */ + +static void rd_kafka_txn_coord_timer_cb(rd_kafka_timers_t *rkts, void *arg) { + rd_kafka_t *rk = arg; + + rd_kafka_wrlock(rk); + rd_kafka_txn_coord_query(rk, "Coordinator query timer"); + rd_kafka_wrunlock(rk); +} + +/** + * @brief Start coord query timer if not already started. + * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_coord_timer_start(rd_kafka_t *rk, int timeout_ms) { + rd_assert(rd_kafka_is_transactional(rk)); + rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rk_eos.txn_coord_tmr, + /* don't restart if already started */ + rd_false, 1000 * timeout_ms, + rd_kafka_txn_coord_timer_cb, rk); +} + + +/** + * @brief Parses and handles a FindCoordinator response. 
+ * + * @locality rdkafka main thread + * @locks none + */ +static void rd_kafka_txn_handle_FindCoordinator(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + rd_kafka_resp_err_t err, + rd_kafka_buf_t *rkbuf, + rd_kafka_buf_t *request, + void *opaque) { + const int log_decode_errors = LOG_ERR; + int16_t ErrorCode; + rd_kafkap_str_t Host; + int32_t NodeId, Port; + char errstr[512]; + + *errstr = '\0'; + + rk->rk_eos.txn_wait_coord = rd_false; + + if (err) + goto err; + + if (request->rkbuf_reqhdr.ApiVersion >= 1) + rd_kafka_buf_read_throttle_time(rkbuf); + + rd_kafka_buf_read_i16(rkbuf, &ErrorCode); + + if (request->rkbuf_reqhdr.ApiVersion >= 1) { + rd_kafkap_str_t ErrorMsg; + rd_kafka_buf_read_str(rkbuf, &ErrorMsg); + if (ErrorCode) + rd_snprintf(errstr, sizeof(errstr), "%.*s", + RD_KAFKAP_STR_PR(&ErrorMsg)); + } + + if ((err = ErrorCode)) + goto err; + + rd_kafka_buf_read_i32(rkbuf, &NodeId); + rd_kafka_buf_read_str(rkbuf, &Host); + rd_kafka_buf_read_i32(rkbuf, &Port); + + rd_rkb_dbg(rkb, EOS, "TXNCOORD", + "FindCoordinator response: " + "Transaction coordinator is broker %" PRId32 " (%.*s:%d)", + NodeId, RD_KAFKAP_STR_PR(&Host), (int)Port); + + rd_kafka_rdlock(rk); + if (NodeId == -1) + err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE; + else if (!(rkb = rd_kafka_broker_find_by_nodeid(rk, NodeId))) { + rd_snprintf(errstr, sizeof(errstr), + "Transaction coordinator %" PRId32 " is unknown", + NodeId); + err = RD_KAFKA_RESP_ERR__UNKNOWN_BROKER; + } + rd_kafka_rdunlock(rk); + + if (err) + goto err; + + rd_kafka_wrlock(rk); + rd_kafka_txn_coord_set(rk, rkb, "FindCoordinator response"); + rd_kafka_wrunlock(rk); + + rd_kafka_broker_destroy(rkb); + + return; + +err_parse: + err = rkbuf->rkbuf_err; +err: + + switch (err) { + case RD_KAFKA_RESP_ERR__DESTROY: + return; + + case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: + case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: + rd_kafka_wrlock(rk); + rd_kafka_txn_set_fatal_error( + rkb->rkb_rk, RD_DONT_LOCK, err, + "Failed to find transaction coordinator: %s: %s%s%s", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err), + *errstr ? ": " : "", errstr); + + rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR); + rd_kafka_wrunlock(rk); + return; + + case RD_KAFKA_RESP_ERR__UNKNOWN_BROKER: + rd_kafka_metadata_refresh_brokers(rk, NULL, errstr); + break; + + default: + break; + } + + rd_kafka_wrlock(rk); + rd_kafka_txn_coord_set( + rk, NULL, "Failed to find transaction coordinator: %s: %s", + rd_kafka_err2name(err), *errstr ? errstr : rd_kafka_err2str(err)); + rd_kafka_wrunlock(rk); +} + + + +/** + * @brief Query for the transaction coordinator. + * + * @returns true if a fatal error was raised, else false. + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock(rk) MUST be held. 
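+ * (Invoked from the coordinator query timer callback and whenever the + * current coordinator is lost; cf. rd_kafka_txn_coord_set().)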
+ */ +rd_bool_t rd_kafka_txn_coord_query(rd_kafka_t *rk, const char *reason) { + rd_kafka_resp_err_t err; + char errstr[512]; + rd_kafka_broker_t *rkb; + + rd_assert(rd_kafka_is_transactional(rk)); + + if (rk->rk_eos.txn_wait_coord) { + rd_kafka_dbg(rk, EOS, "TXNCOORD", + "Not sending coordinator query (%s): " + "waiting for previous query to finish", + reason); + return rd_false; + } + + /* Find usable broker to query for the txn coordinator */ + rkb = rd_kafka_idemp_broker_any(rk, &err, errstr, sizeof(errstr)); + if (!rkb) { + rd_kafka_dbg(rk, EOS, "TXNCOORD", + "Unable to query for transaction coordinator: " + "%s: %s", + reason, errstr); + + if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) + return rd_true; + + rd_kafka_txn_coord_timer_start(rk, 500); + + return rd_false; + } + + rd_kafka_dbg(rk, EOS, "TXNCOORD", + "Querying for transaction coordinator: %s", reason); + + /* Send FindCoordinator request */ + err = rd_kafka_FindCoordinatorRequest( + rkb, RD_KAFKA_COORD_TXN, rk->rk_conf.eos.transactional_id, + RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_FindCoordinator, + NULL); + + if (err) { + rd_snprintf(errstr, sizeof(errstr), + "Failed to send coordinator query to %s: " + "%s", + rd_kafka_broker_name(rkb), rd_kafka_err2str(err)); + + rd_kafka_broker_destroy(rkb); + + if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) + return rd_true; /* Fatal error */ + + rd_kafka_txn_coord_timer_start(rk, 500); + + return rd_false; + } + + rd_kafka_broker_destroy(rkb); + + rk->rk_eos.txn_wait_coord = rd_true; + + return rd_false; +} + +/** + * @brief Sets or clears the current coordinator address. + * + * @returns true if the coordinator was changed, else false. + * + * @locality rdkafka main thread + * @locks rd_kafka_wrlock(rk) MUST be held + */ +rd_bool_t rd_kafka_txn_coord_set(rd_kafka_t *rk, + rd_kafka_broker_t *rkb, + const char *fmt, + ...) { + char buf[256]; + va_list ap; + + va_start(ap, fmt); + vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + + if (rk->rk_eos.txn_curr_coord == rkb) { + if (!rkb) { + rd_kafka_dbg(rk, EOS, "TXNCOORD", "%s", buf); + /* Keep querying for the coordinator */ + rd_kafka_txn_coord_timer_start(rk, 500); + } + return rd_false; + } + + rd_kafka_dbg(rk, EOS, "TXNCOORD", + "Transaction coordinator changed from %s -> %s: %s", + rk->rk_eos.txn_curr_coord + ? rd_kafka_broker_name(rk->rk_eos.txn_curr_coord) + : "(none)", + rkb ? rd_kafka_broker_name(rkb) : "(none)", buf); + + if (rk->rk_eos.txn_curr_coord) + rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord); + + rk->rk_eos.txn_curr_coord = rkb; + if (rkb) + rd_kafka_broker_keep(rkb); + + rd_kafka_broker_set_nodename(rk->rk_eos.txn_coord, + rk->rk_eos.txn_curr_coord); + + if (!rkb) { + /* Lost the current coordinator, query for new coordinator */ + rd_kafka_txn_coord_timer_start(rk, 500); + } else { + /* Trigger PID state machine */ + rd_kafka_idemp_pid_fsm(rk); + } + + return rd_true; +} + + +/** + * @brief Coordinator state monitor callback. + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_txn_coord_monitor_cb(rd_kafka_broker_t *rkb) { + rd_kafka_t *rk = rkb->rkb_rk; + rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb); + rd_bool_t is_up; + + rd_assert(rk->rk_eos.txn_coord == rkb); + + is_up = rd_kafka_broker_state_is_up(state); + rd_rkb_dbg(rkb, EOS, "COORD", "Transaction coordinator is now %s", + is_up ? 
"up" : "down"); + + if (!is_up) { + /* Coordinator is down, the connection will be re-established + * automatically, but we also trigger a coordinator query + * to pick up on coordinator change. */ + rd_kafka_txn_coord_timer_start(rk, 500); + + } else { + /* Coordinator is up. */ + + rd_kafka_wrlock(rk); + if (rk->rk_eos.idemp_state < RD_KAFKA_IDEMP_STATE_ASSIGNED) { + /* See if a idempotence state change is warranted. */ + rd_kafka_idemp_pid_fsm(rk); + + } else if (rk->rk_eos.idemp_state == + RD_KAFKA_IDEMP_STATE_ASSIGNED) { + /* PID is already valid, continue transactional + * operations by checking for partitions to register */ + rd_kafka_txn_schedule_register_partitions(rk, + 1 /*ASAP*/); + } + + rd_kafka_wrunlock(rk); + } +} + + + +/** + * @brief Transactions manager destructor + * + * @locality rdkafka main thread + * @locks none + */ +void rd_kafka_txns_term(rd_kafka_t *rk) { + + RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free); + RD_IF_FREE(rk->rk_eos.txn_curr_api.error, rd_kafka_error_destroy); + + mtx_destroy(&rk->rk_eos.txn_curr_api.lock); + cnd_destroy(&rk->rk_eos.txn_curr_api.cnd); + + rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.txn_coord_tmr, 1); + rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.txn_register_parts_tmr, + 1); + + if (rk->rk_eos.txn_curr_coord) + rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord); + + /* Logical coordinator */ + rd_kafka_broker_persistent_connection_del( + rk->rk_eos.txn_coord, &rk->rk_eos.txn_coord->rkb_persistconn.coord); + rd_kafka_broker_monitor_del(&rk->rk_eos.txn_coord_mon); + rd_kafka_broker_destroy(rk->rk_eos.txn_coord); + rk->rk_eos.txn_coord = NULL; + + mtx_lock(&rk->rk_eos.txn_pending_lock); + rd_kafka_txn_clear_pending_partitions(rk); + mtx_unlock(&rk->rk_eos.txn_pending_lock); + mtx_destroy(&rk->rk_eos.txn_pending_lock); + + rd_kafka_txn_clear_partitions(rk); +} + + +/** + * @brief Initialize transactions manager. + * + * @locality application thread + * @locks none + */ +void rd_kafka_txns_init(rd_kafka_t *rk) { + rd_atomic32_init(&rk->rk_eos.txn_may_enq, 0); + mtx_init(&rk->rk_eos.txn_pending_lock, mtx_plain); + TAILQ_INIT(&rk->rk_eos.txn_pending_rktps); + TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps); + TAILQ_INIT(&rk->rk_eos.txn_rktps); + + mtx_init(&rk->rk_eos.txn_curr_api.lock, mtx_plain); + cnd_init(&rk->rk_eos.txn_curr_api.cnd); + + /* Logical coordinator */ + rk->rk_eos.txn_coord = + rd_kafka_broker_add_logical(rk, "TxnCoordinator"); + + rd_kafka_broker_monitor_add(&rk->rk_eos.txn_coord_mon, + rk->rk_eos.txn_coord, rk->rk_ops, + rd_kafka_txn_coord_monitor_cb); + + rd_kafka_broker_persistent_connection_add( + rk->rk_eos.txn_coord, &rk->rk_eos.txn_coord->rkb_persistconn.coord); + + rd_atomic64_init(&rk->rk_eos.txn_dr_fails, 0); +} |