/* * librdkafka - Apache Kafka C library * * Copyright (c) 2019 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. 
*/ /** * @name Transaction Manager * */ #include #include "rd.h" #include "rdkafka_int.h" #include "rdkafka_txnmgr.h" #include "rdkafka_idempotence.h" #include "rdkafka_request.h" #include "rdkafka_error.h" #include "rdunittest.h" #include "rdrand.h" static void rd_kafka_txn_coord_timer_start(rd_kafka_t *rk, int timeout_ms); #define rd_kafka_txn_curr_api_set_result(rk, actions, error) \ rd_kafka_txn_curr_api_set_result0(__FUNCTION__, __LINE__, rk, actions, \ error) static void rd_kafka_txn_curr_api_set_result0(const char *func, int line, rd_kafka_t *rk, int actions, rd_kafka_error_t *error); /** * @return a normalized error code, this for instance abstracts different * fencing errors to return one single fencing error to the application. */ static rd_kafka_resp_err_t rd_kafka_txn_normalize_err(rd_kafka_resp_err_t err) { switch (err) { case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: return RD_KAFKA_RESP_ERR__FENCED; case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: return RD_KAFKA_RESP_ERR__TIMED_OUT; default: return err; } } /** * @brief Ensure client is configured as a transactional producer, * else return error. * * @locality application thread * @locks none */ static RD_INLINE rd_kafka_error_t * rd_kafka_ensure_transactional(const rd_kafka_t *rk) { if (unlikely(rk->rk_type != RD_KAFKA_PRODUCER)) return rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "The Transactional API can only be used " "on producer instances"); if (unlikely(!rk->rk_conf.eos.transactional_id)) return rd_kafka_error_new(RD_KAFKA_RESP_ERR__NOT_CONFIGURED, "The Transactional API requires " "transactional.id to be configured"); return NULL; } /** * @brief Ensure transaction state is one of \p states. * * @param the required states, ended by a -1 sentinel. 
* * @locks_required rd_kafka_*lock(rk) MUST be held * @locality any */ static RD_INLINE rd_kafka_error_t * rd_kafka_txn_require_states0(rd_kafka_t *rk, rd_kafka_txn_state_t states[]) { rd_kafka_error_t *error; size_t i; if (unlikely((error = rd_kafka_ensure_transactional(rk)) != NULL)) return error; for (i = 0; (int)states[i] != -1; i++) if (rk->rk_eos.txn_state == states[i]) return NULL; /* For fatal and abortable states return the last transactional * error, for all other states just return a state error. */ if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_FATAL_ERROR) error = rd_kafka_error_new_fatal(rk->rk_eos.txn_err, "%s", rk->rk_eos.txn_errstr); else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR) { error = rd_kafka_error_new(rk->rk_eos.txn_err, "%s", rk->rk_eos.txn_errstr); rd_kafka_error_set_txn_requires_abort(error); } else error = rd_kafka_error_new( RD_KAFKA_RESP_ERR__STATE, "Operation not valid in state %s", rd_kafka_txn_state2str(rk->rk_eos.txn_state)); return error; } /** @brief \p ... is a list of states */ #define rd_kafka_txn_require_state(rk, ...) \ rd_kafka_txn_require_states0( \ rk, (rd_kafka_txn_state_t[]) {__VA_ARGS__, -1}) /** * @param ignore Will be set to true if the state transition should be * completely ignored. * @returns true if the state transition is valid, else false. */ static rd_bool_t rd_kafka_txn_state_transition_is_valid(rd_kafka_txn_state_t curr, rd_kafka_txn_state_t new_state, rd_bool_t *ignore) { *ignore = rd_false; switch (new_state) { case RD_KAFKA_TXN_STATE_INIT: /* This is the initialized value and this transition will * never happen. 
*/ return rd_false; case RD_KAFKA_TXN_STATE_WAIT_PID: return curr == RD_KAFKA_TXN_STATE_INIT; case RD_KAFKA_TXN_STATE_READY_NOT_ACKED: return curr == RD_KAFKA_TXN_STATE_WAIT_PID; case RD_KAFKA_TXN_STATE_READY: return curr == RD_KAFKA_TXN_STATE_READY_NOT_ACKED || curr == RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED || curr == RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED; case RD_KAFKA_TXN_STATE_IN_TRANSACTION: return curr == RD_KAFKA_TXN_STATE_READY; case RD_KAFKA_TXN_STATE_BEGIN_COMMIT: return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION; case RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION: return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT; case RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED: return curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT || curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION; case RD_KAFKA_TXN_STATE_BEGIN_ABORT: return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION || curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION || curr == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR; case RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION: return curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT; case RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED: return curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT || curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION; case RD_KAFKA_TXN_STATE_ABORTABLE_ERROR: if (curr == RD_KAFKA_TXN_STATE_BEGIN_ABORT || curr == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION || curr == RD_KAFKA_TXN_STATE_FATAL_ERROR) { /* Ignore sub-sequent abortable errors in * these states. */ *ignore = rd_true; return 1; } return curr == RD_KAFKA_TXN_STATE_IN_TRANSACTION || curr == RD_KAFKA_TXN_STATE_BEGIN_COMMIT || curr == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION; case RD_KAFKA_TXN_STATE_FATAL_ERROR: /* Any state can transition to a fatal error */ return rd_true; default: RD_BUG("Invalid txn state transition: %s -> %s", rd_kafka_txn_state2str(curr), rd_kafka_txn_state2str(new_state)); return rd_false; } } /** * @brief Transition the transaction state to \p new_state. * * @returns 0 on success or an error code if the state transition * was invalid. 
 *
 * @locality rdkafka main thread
 * @locks_required rd_kafka_wrlock MUST be held
 */
static void rd_kafka_txn_set_state(rd_kafka_t *rk,
                                   rd_kafka_txn_state_t new_state) {
        rd_bool_t ignore;

        /* No-op if already in the target state. */
        if (rk->rk_eos.txn_state == new_state)
                return;

        /* Check if state transition is valid */
        if (!rd_kafka_txn_state_transition_is_valid(rk->rk_eos.txn_state,
                                                    new_state, &ignore)) {
                rd_kafka_log(rk, LOG_CRIT, "TXNSTATE",
                             "BUG: Invalid transaction state transition "
                             "attempted: %s -> %s",
                             rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                             rd_kafka_txn_state2str(new_state));

                rd_assert(!*"BUG: Invalid transaction state transition");
        }

        if (ignore) {
                /* Ignore this state change */
                return;
        }

        rd_kafka_dbg(rk, EOS, "TXNSTATE", "Transaction state change %s -> %s",
                     rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                     rd_kafka_txn_state2str(new_state));

        /* If transitioning from IN_TRANSACTION, the app is no longer
         * allowed to enqueue (produce) messages. */
        if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
                rd_atomic32_set(&rk->rk_eos.txn_may_enq, 0);
        else if (new_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION)
                rd_atomic32_set(&rk->rk_eos.txn_may_enq, 1);

        rk->rk_eos.txn_state = new_state;
}


/**
 * @returns the current transaction timeout, i.e., the time remaining in
 *          the current transaction.
 *
 * @remark The remaining timeout is currently not tracked, so this function
 *         will always return the remaining time based on
 *         transaction.timeout.ms and we rely on the broker to enforce the
 *         actual remaining timeout.
 *         This is still better than not having a timeout cap at all, which
 *         used to be the case.
 *         It's also tricky knowing exactly what the controller thinks the
 *         remaining transaction time is.
 *
 * @locks_required rd_kafka_*lock(rk) MUST be held.
 */
static RD_INLINE rd_ts_t rd_kafka_txn_current_timeout(const rd_kafka_t *rk) {
        return rd_timeout_init(rk->rk_conf.eos.transaction_timeout_ms);
}


/**
 * @brief An unrecoverable transactional error has occurred.
 *
 * @param do_lock RD_DO_LOCK: rd_kafka_wrlock(rk) will be acquired and
 *                released,
 *                RD_DONT_LOCK: rd_kafka_wrlock(rk) MUST be held by the caller.
 * @locality any
 * @locks rd_kafka_wrlock MUST NOT be held when \p do_lock is RD_DO_LOCK
 */
void rd_kafka_txn_set_fatal_error(rd_kafka_t *rk,
                                  rd_dolock_t do_lock,
                                  rd_kafka_resp_err_t err,
                                  const char *fmt,
                                  ...) {
        char errstr[512];
        va_list ap;

        va_start(ap, fmt);
        vsnprintf(errstr, sizeof(errstr), fmt, ap);
        va_end(ap);

        rd_kafka_log(rk, LOG_ALERT, "TXNERR",
                     "Fatal transaction error: %s (%s)", errstr,
                     rd_kafka_err2name(err));

        if (do_lock)
                rd_kafka_wrlock(rk);
        /* Raise the instance-wide fatal error first (lock already held). */
        rd_kafka_set_fatal_error0(rk, RD_DONT_LOCK, err, "%s", errstr);

        rk->rk_eos.txn_err = err;
        if (rk->rk_eos.txn_errstr)
                rd_free(rk->rk_eos.txn_errstr);
        rk->rk_eos.txn_errstr = rd_strdup(errstr);

        rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);

        if (do_lock)
                rd_kafka_wrunlock(rk);

        /* If application has called a transactional API and
         * it has now failed, reply to the app.
         * If there is no currently called API then this is a no-op. */
        rd_kafka_txn_curr_api_set_result(
            rk, 0, rd_kafka_error_new_fatal(err, "%s", errstr));
}


/**
 * @brief An abortable/recoverable transactional error has occurred.
 *
 * @param requires_epoch_bump If true; abort_transaction() will bump the epoch
 *                            on the coordinator (KIP-360).
 * @locality rdkafka main thread
 * @locks rd_kafka_wrlock MUST NOT be held
 */
void rd_kafka_txn_set_abortable_error0(rd_kafka_t *rk,
                                       rd_kafka_resp_err_t err,
                                       rd_bool_t requires_epoch_bump,
                                       const char *fmt,
                                       ...) {
        char errstr[512];
        va_list ap;

        /* A previously raised fatal error supersedes any abortable error. */
        if (rd_kafka_fatal_error(rk, NULL, 0)) {
                rd_kafka_dbg(rk, EOS, "FATAL",
                             "Not propagating abortable transactional "
                             "error (%s) "
                             "since previous fatal error already raised",
                             rd_kafka_err2name(err));
                return;
        }

        va_start(ap, fmt);
        vsnprintf(errstr, sizeof(errstr), fmt, ap);
        va_end(ap);

        rd_kafka_wrlock(rk);

        if (requires_epoch_bump)
                rk->rk_eos.txn_requires_epoch_bump = requires_epoch_bump;

        if (rk->rk_eos.txn_err) {
                /* First abortable error wins; later ones are only logged. */
                rd_kafka_dbg(rk, EOS, "TXNERR",
                             "Ignoring sub-sequent abortable transaction "
                             "error: %s (%s): "
                             "previous error (%s) already raised",
                             errstr, rd_kafka_err2name(err),
                             rd_kafka_err2name(rk->rk_eos.txn_err));
                rd_kafka_wrunlock(rk);
                return;
        }

        rk->rk_eos.txn_err = err;
        if (rk->rk_eos.txn_errstr)
                rd_free(rk->rk_eos.txn_errstr);
        rk->rk_eos.txn_errstr = rd_strdup(errstr);

        rd_kafka_log(rk, LOG_ERR, "TXNERR",
                     "Current transaction failed in state %s: %s (%s%s)",
                     rd_kafka_txn_state2str(rk->rk_eos.txn_state), errstr,
                     rd_kafka_err2name(err),
                     requires_epoch_bump ? ", requires epoch bump" : "");

        rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTABLE_ERROR);
        rd_kafka_wrunlock(rk);

        /* Purge all messages in queue/flight */
        rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_ABORT_TXN |
                               RD_KAFKA_PURGE_F_NON_BLOCKING);
}


/**
 * @brief Send request-reply op to txnmgr callback, waits for a reply
 *        or timeout, and returns an error object or NULL on success.
 *
 * @remark Does not alter the current API state.
 *
 * @returns an error object on failure, else NULL.
 *
 * @locality application thread
 *
 * @locks_acquired rk->rk_eos.txn_curr_api.lock
 */
#define rd_kafka_txn_op_req(rk, op_cb, abs_timeout)                            \
        rd_kafka_txn_op_req0(__FUNCTION__, __LINE__, rk,                       \
                             rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN, op_cb),   \
                             abs_timeout)
#define rd_kafka_txn_op_req1(rk, rko, abs_timeout)                             \
        rd_kafka_txn_op_req0(__FUNCTION__, __LINE__, rk, rko, abs_timeout)

static rd_kafka_error_t *rd_kafka_txn_op_req0(const char *func,
                                              int line,
                                              rd_kafka_t *rk,
                                              rd_kafka_op_t *rko,
                                              rd_ts_t abs_timeout) {
        rd_kafka_error_t *error = NULL;
        rd_bool_t has_result    = rd_false;

        mtx_lock(&rk->rk_eos.txn_curr_api.lock);

        /* See if there's already a result, if so return that immediately. */
        if (rk->rk_eos.txn_curr_api.has_result) {
                error                              = rk->rk_eos.txn_curr_api.error;
                rk->rk_eos.txn_curr_api.error      = NULL;
                rk->rk_eos.txn_curr_api.has_result = rd_false;
                mtx_unlock(&rk->rk_eos.txn_curr_api.lock);
                rd_kafka_op_destroy(rko);
                rd_kafka_dbg(rk, EOS, "OPREQ",
                             "%s:%d: %s: returning already set result: %s",
                             func, line, rk->rk_eos.txn_curr_api.name,
                             error ? rd_kafka_error_string(error) : "Success");
                return error;
        }

        /* Send one-way op to txnmgr */
        if (!rd_kafka_q_enq(rk->rk_ops, rko))
                RD_BUG("rk_ops queue disabled");

        /* Wait for result to be set, or timeout */
        do {
                if (cnd_timedwait_ms(&rk->rk_eos.txn_curr_api.cnd,
                                     &rk->rk_eos.txn_curr_api.lock,
                                     rd_timeout_remains(abs_timeout)) ==
                    thrd_timedout)
                        break;
        } while (!rk->rk_eos.txn_curr_api.has_result);

        if ((has_result = rk->rk_eos.txn_curr_api.has_result)) {
                rk->rk_eos.txn_curr_api.has_result = rd_false;
                error                         = rk->rk_eos.txn_curr_api.error;
                rk->rk_eos.txn_curr_api.error = NULL;
        }

        mtx_unlock(&rk->rk_eos.txn_curr_api.lock);

        /* If there was no reply it means the background operation is still
         * in progress and its result will be set later, so the application
         * should call this API again to resume. */
        if (!has_result) {
                error = rd_kafka_error_new_retriable(
                    RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Timed out waiting for operation to finish, "
                    "retry call to resume");
        }

        return error;
}


/**
 * @brief Begin (or resume) a public API call.
 *
 * This function will prevent conflicting calls.
 *
 * @returns an error on failure, or NULL on success.
 *
 * @locality application thread
 *
 * @locks_acquired rk->rk_eos.txn_curr_api.lock
 */
static rd_kafka_error_t *rd_kafka_txn_curr_api_begin(rd_kafka_t *rk,
                                                     const char *api_name,
                                                     rd_bool_t cap_timeout,
                                                     int timeout_ms,
                                                     rd_ts_t *abs_timeoutp) {
        rd_kafka_error_t *error = NULL;

        if ((error = rd_kafka_ensure_transactional(rk)))
                return error;

        rd_kafka_rdlock(rk); /* Need lock for retrieving the states */
        rd_kafka_dbg(rk, EOS, "TXNAPI",
                     "Transactional API called: %s "
                     "(in txn state %s, idemp state %s, API timeout %d)",
                     api_name, rd_kafka_txn_state2str(rk->rk_eos.txn_state),
                     rd_kafka_idemp_state2str(rk->rk_eos.idemp_state),
                     timeout_ms);
        rd_kafka_rdunlock(rk);

        mtx_lock(&rk->rk_eos.txn_curr_api.lock);

        /* Make sure there is no other conflicting in-progress API call,
         * and that this same call is not currently under way in another
         * thread. */
        if (unlikely(*rk->rk_eos.txn_curr_api.name &&
                     strcmp(rk->rk_eos.txn_curr_api.name, api_name))) {
                /* Another API is being called. */
                error = rd_kafka_error_new_retriable(
                    RD_KAFKA_RESP_ERR__CONFLICT,
                    "Conflicting %s API call is already in progress",
                    rk->rk_eos.txn_curr_api.name);

        } else if (unlikely(rk->rk_eos.txn_curr_api.calling)) {
                /* There is an active call to this same API
                 * from another thread. */
                error = rd_kafka_error_new_retriable(
                    RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS,
                    "Simultaneous %s API calls not allowed",
                    rk->rk_eos.txn_curr_api.name);

        } else if (*rk->rk_eos.txn_curr_api.name) {
                /* Resumed call */
                rk->rk_eos.txn_curr_api.calling = rd_true;

        } else {
                /* New call */
                rd_snprintf(rk->rk_eos.txn_curr_api.name,
                            sizeof(rk->rk_eos.txn_curr_api.name), "%s",
                            api_name);
                rk->rk_eos.txn_curr_api.calling = rd_true;
                rd_assert(!rk->rk_eos.txn_curr_api.error);
        }

        if (!error && abs_timeoutp) {
                rd_ts_t abs_timeout = rd_timeout_init(timeout_ms);

                if (cap_timeout) {
                        /* Cap API timeout to remaining transaction timeout */
                        rd_ts_t abs_txn_timeout =
                            rd_kafka_txn_current_timeout(rk);
                        if (abs_timeout > abs_txn_timeout ||
                            abs_timeout == RD_POLL_INFINITE)
                                abs_timeout = abs_txn_timeout;
                }

                *abs_timeoutp = abs_timeout;
        }

        mtx_unlock(&rk->rk_eos.txn_curr_api.lock);

        return error;
}


/**
 * @brief Return from public API.
 *
 * This function updates the current API state and must be used in
 * all return statements from the public txn API.
 *
 * @param resumable If true and the error is retriable, the current API state
 *                  will be maintained to allow a future call to the same API
 *                  to resume the background operation that is in progress.
 * @param error The error object, if not NULL, is simply inspected and
 *              returned.
 *
 * @returns the \p error object as-is.
 *
 * @locality application thread
 * @locks_acquired rk->rk_eos.txn_curr_api.lock
 */
#define rd_kafka_txn_curr_api_return(rk, resumable, error)                     \
        rd_kafka_txn_curr_api_return0(__FUNCTION__, __LINE__, rk, resumable,   \
                                      error)
static rd_kafka_error_t *
rd_kafka_txn_curr_api_return0(const char *func,
                              int line,
                              rd_kafka_t *rk,
                              rd_bool_t resumable,
                              rd_kafka_error_t *error) {
        mtx_lock(&rk->rk_eos.txn_curr_api.lock);

        rd_kafka_dbg(
            rk, EOS, "TXNAPI", "Transactional API %s return%s at %s:%d: %s",
            rk->rk_eos.txn_curr_api.name,
            resumable && rd_kafka_error_is_retriable(error) ? " resumable" : "",
            func, line, error ? rd_kafka_error_string(error) : "Success");

        rd_assert(*rk->rk_eos.txn_curr_api.name);
        rd_assert(rk->rk_eos.txn_curr_api.calling);

        rk->rk_eos.txn_curr_api.calling = rd_false;

        /* Reset the current API call so that other APIs may be called,
         * unless this is a resumable API and the error is retriable. */
        if (!resumable || (error && !rd_kafka_error_is_retriable(error))) {
                *rk->rk_eos.txn_curr_api.name = '\0';
                /* It is possible for another error to have been set,
                 * typically when a fatal error is raised, so make sure
                 * we're not destroying the error we're supposed to return. */
                if (rk->rk_eos.txn_curr_api.error != error)
                        rd_kafka_error_destroy(rk->rk_eos.txn_curr_api.error);
                rk->rk_eos.txn_curr_api.error = NULL;
        }

        mtx_unlock(&rk->rk_eos.txn_curr_api.lock);

        return error;
}


/**
 * @brief Set the (possibly intermediary) result for the current API call.
 *
 * The result is \p error NULL for success or \p error object on failure.
 * If the application is actively blocked on the call the result will be
 * sent on its replyq, otherwise the result will be stored for future
 * retrieval the next time the application calls the API again.
 *
 * @locality rdkafka main thread
 * @locks_acquired rk->rk_eos.txn_curr_api.lock
 */
static void rd_kafka_txn_curr_api_set_result0(const char *func,
                                              int line,
                                              rd_kafka_t *rk,
                                              int actions,
                                              rd_kafka_error_t *error) {
        mtx_lock(&rk->rk_eos.txn_curr_api.lock);

        if (!*rk->rk_eos.txn_curr_api.name) {
                /* No current API being called, this could happen
                 * if the application thread API deemed the API was done,
                 * or for fatal errors that attempt to set the result
                 * regardless of current API state.
                 * In this case we simply throw away this result. */
                if (error)
                        rd_kafka_error_destroy(error);
                mtx_unlock(&rk->rk_eos.txn_curr_api.lock);
                return;
        }

        rd_kafka_dbg(rk, EOS, "APIRESULT",
                     "Transactional API %s (intermediary%s) result set "
                     "at %s:%d: %s (%sprevious result%s%s)",
                     rk->rk_eos.txn_curr_api.name,
                     rk->rk_eos.txn_curr_api.calling ? ", calling" : "", func,
                     line, error ? rd_kafka_error_string(error) : "Success",
                     rk->rk_eos.txn_curr_api.has_result ? "" : "no ",
                     rk->rk_eos.txn_curr_api.error ? ": " : "",
                     rd_kafka_error_string(rk->rk_eos.txn_curr_api.error));

        rk->rk_eos.txn_curr_api.has_result = rd_true;


        if (rk->rk_eos.txn_curr_api.error) {
                /* If there's already an error it typically means
                 * a fatal error has been raised, so nothing more to do
                 * here. */
                rd_kafka_dbg(
                    rk, EOS, "APIRESULT",
                    "Transactional API %s error "
                    "already set: %s",
                    rk->rk_eos.txn_curr_api.name,
                    rd_kafka_error_string(rk->rk_eos.txn_curr_api.error));

                mtx_unlock(&rk->rk_eos.txn_curr_api.lock);

                if (error)
                        rd_kafka_error_destroy(error);

                return;
        }

        if (error) {
                /* Tag the error object according to the response action
                 * flags so the application can query how to react. */
                if (actions & RD_KAFKA_ERR_ACTION_FATAL)
                        rd_kafka_error_set_fatal(error);
                else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT)
                        rd_kafka_error_set_txn_requires_abort(error);
                else if (actions & RD_KAFKA_ERR_ACTION_RETRY)
                        rd_kafka_error_set_retriable(error);
        }

        rk->rk_eos.txn_curr_api.error = error;
        error                         = NULL;
        /* Wake up any application thread blocked in rd_kafka_txn_op_req0(). */
        cnd_broadcast(&rk->rk_eos.txn_curr_api.cnd);

        mtx_unlock(&rk->rk_eos.txn_curr_api.lock);
}



/**
 * @brief The underlying idempotent producer state changed,
 *        see if this affects the transactional operations.
 *
 * @locality any thread
 * @locks rd_kafka_wrlock(rk) MUST be held
 */
void rd_kafka_txn_idemp_state_change(rd_kafka_t *rk,
                                     rd_kafka_idemp_state_t idemp_state) {
        rd_bool_t set_result = rd_false;

        if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED &&
            rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_WAIT_PID) {
                /* Application is calling (or has called) init_transactions() */
                RD_UT_COVERAGE(1);
                rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED);
                set_result = rd_true;

        } else if (idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED &&
                   (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_BEGIN_ABORT ||
                    rk->rk_eos.txn_state ==
                        RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION)) {
                /* Application is calling abort_transaction() as we're
                 * recovering from a fatal idempotence error. */
                rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED);
                set_result = rd_true;

        } else if (idemp_state == RD_KAFKA_IDEMP_STATE_FATAL_ERROR &&
                   rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_FATAL_ERROR) {
                /* A fatal error has been raised. */
                rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_FATAL_ERROR);
        }

        if (set_result) {
                /* Application has called init_transactions() or
                 * abort_transaction() and it is now complete,
                 * reply to the app. */
                rd_kafka_txn_curr_api_set_result(rk, 0, NULL);
        }
}


/**
 * @brief Moves a partition from the pending list to the proper list.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_txn_partition_registered(rd_kafka_toppar_t *rktp) {
        rd_kafka_t *rk = rktp->rktp_rkt->rkt_rk;

        rd_kafka_toppar_lock(rktp);

        if (unlikely(!(rktp->rktp_flags & RD_KAFKA_TOPPAR_F_PEND_TXN))) {
                rd_kafka_dbg(rk, EOS | RD_KAFKA_DBG_PROTOCOL, "ADDPARTS",
                             "\"%.*s\" [%" PRId32
                             "] is not in pending "
                             "list but returned in AddPartitionsToTxn "
                             "response: ignoring",
                             RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                             rktp->rktp_partition);
                rd_kafka_toppar_unlock(rktp);
                return;
        }

        rd_kafka_dbg(rk, EOS | RD_KAFKA_DBG_TOPIC, "ADDPARTS",
                     "%.*s [%" PRId32 "] registered with transaction",
                     RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
                     rktp->rktp_partition);

        rd_assert((rktp->rktp_flags &
                   (RD_KAFKA_TOPPAR_F_PEND_TXN | RD_KAFKA_TOPPAR_F_IN_TXN)) ==
                  RD_KAFKA_TOPPAR_F_PEND_TXN);

        rktp->rktp_flags = (rktp->rktp_flags & ~RD_KAFKA_TOPPAR_F_PEND_TXN) |
                           RD_KAFKA_TOPPAR_F_IN_TXN;
        rd_kafka_toppar_unlock(rktp);

        mtx_lock(&rk->rk_eos.txn_pending_lock);
        TAILQ_REMOVE(&rk->rk_eos.txn_waitresp_rktps, rktp, rktp_txnlink);
        mtx_unlock(&rk->rk_eos.txn_pending_lock);

        /* Not destroy()/keep():ing rktp since it just changes tailq. */

        TAILQ_INSERT_TAIL(&rk->rk_eos.txn_rktps, rktp, rktp_txnlink);
}



/**
 * @brief Handle AddPartitionsToTxnResponse
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_txn_handle_AddPartitionsToTxn(rd_kafka_t *rk,
                                                   rd_kafka_broker_t *rkb,
                                                   rd_kafka_resp_err_t err,
                                                   rd_kafka_buf_t *rkbuf,
                                                   rd_kafka_buf_t *request,
                                                   void *opaque) {
        const int log_decode_errors = LOG_ERR;
        int32_t TopicCnt;
        int actions          = 0;
        int retry_backoff_ms = 500; /* retry backoff */
        rd_kafka_resp_err_t reset_coord_err = RD_KAFKA_RESP_ERR_NO_ERROR;
        rd_bool_t require_bump              = rd_false;

        if (err)
                goto done;

        rd_kafka_rdlock(rk);
        rd_assert(rk->rk_eos.txn_state !=
                  RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION);

        if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION &&
            rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_BEGIN_COMMIT) {
                /* Response received after aborting transaction */
                rd_rkb_dbg(rkb, EOS, "ADDPARTS",
                           "Ignoring outdated AddPartitionsToTxn response in "
                           "state %s",
                           rd_kafka_txn_state2str(rk->rk_eos.txn_state));
                rd_kafka_rdunlock(rk);
                err = RD_KAFKA_RESP_ERR__OUTDATED;
                goto done;
        }
        rd_kafka_rdunlock(rk);

        /* The rd_kafka_buf_read_*() macros jump to err_parse on
         * malformed/short responses. */
        rd_kafka_buf_read_throttle_time(rkbuf);

        rd_kafka_buf_read_i32(rkbuf, &TopicCnt);

        while (TopicCnt-- > 0) {
                rd_kafkap_str_t Topic;
                rd_kafka_topic_t *rkt;
                int32_t PartCnt;
                rd_bool_t request_error = rd_false;

                rd_kafka_buf_read_str(rkbuf, &Topic);
                rd_kafka_buf_read_i32(rkbuf, &PartCnt);

                rkt = rd_kafka_topic_find0(rk, &Topic);
                if (rkt)
                        rd_kafka_topic_rdlock(rkt); /* for toppar_get() */

                while (PartCnt-- > 0) {
                        rd_kafka_toppar_t *rktp = NULL;
                        int32_t Partition;
                        int16_t ErrorCode;
                        int p_actions = 0;

                        rd_kafka_buf_read_i32(rkbuf, &Partition);
                        rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

                        if (rkt)
                                rktp = rd_kafka_toppar_get(rkt, Partition,
                                                           rd_false);

                        if (!rktp) {
                                rd_rkb_dbg(rkb, EOS | RD_KAFKA_DBG_PROTOCOL,
                                           "ADDPARTS",
                                           "Unknown partition \"%.*s\" "
                                           "[%" PRId32
                                           "] in AddPartitionsToTxn "
                                           "response: ignoring",
                                           RD_KAFKAP_STR_PR(&Topic),
                                           Partition);
                                continue;
                        }

                        switch (ErrorCode) {
                        case RD_KAFKA_RESP_ERR_NO_ERROR:
                                /* Move rktp from pending to proper list */
                                rd_kafka_txn_partition_registered(rktp);
                                break;

                                /* Request-level errors.
                                 * As soon as any of these errors are seen
                                 * the rest of the partitions are ignored
                                 * since they will have the same error. */
                        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
                        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
                                reset_coord_err = ErrorCode;
                                p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
                                err           = ErrorCode;
                                request_error = rd_true;
                                break;

                        case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
                                retry_backoff_ms = 20;
                                /* FALLTHRU */
                        case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
                        case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
                                p_actions |= RD_KAFKA_ERR_ACTION_RETRY;
                                err           = ErrorCode;
                                request_error = rd_true;
                                break;

                        case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
                        case RD_KAFKA_RESP_ERR_PRODUCER_FENCED:
                        case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
                        case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
                        case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
                                p_actions |= RD_KAFKA_ERR_ACTION_FATAL;
                                err           = ErrorCode;
                                request_error = rd_true;
                                break;

                        case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID:
                        case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
                                require_bump = rd_true;
                                p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                                err           = ErrorCode;
                                request_error = rd_true;
                                break;

                                /* Partition-level errors.
                                 * Continue with rest of partitions. */
                        case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
                                p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                                err = ErrorCode;
                                break;

                        case RD_KAFKA_RESP_ERR_OPERATION_NOT_ATTEMPTED:
                                /* Partition skipped due to other partition's
                                 * error. */
                                p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                                if (!err)
                                        err = ErrorCode;
                                break;

                        default:
                                /* Other partition error */
                                p_actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                                err = ErrorCode;
                                break;
                        }

                        if (ErrorCode) {
                                actions |= p_actions;

                                if (!(p_actions &
                                      (RD_KAFKA_ERR_ACTION_FATAL |
                                       RD_KAFKA_ERR_ACTION_PERMANENT)))
                                        rd_rkb_dbg(
                                            rkb, EOS, "ADDPARTS",
                                            "AddPartitionsToTxn response: "
                                            "partition \"%.*s\": "
                                            "[%" PRId32 "]: %s",
                                            RD_KAFKAP_STR_PR(&Topic),
                                            Partition,
                                            rd_kafka_err2str(ErrorCode));
                                else
                                        rd_rkb_log(
                                            rkb, LOG_ERR, "ADDPARTS",
                                            "Failed to add partition "
                                            "\"%.*s\" [%" PRId32
                                            "] to "
                                            "transaction: %s",
                                            RD_KAFKAP_STR_PR(&Topic),
                                            Partition,
                                            rd_kafka_err2str(ErrorCode));
                        }

                        rd_kafka_toppar_destroy(rktp);

                        if (request_error)
                                break; /* Request-level error seen, bail out */
                }

                if (rkt) {
                        rd_kafka_topic_rdunlock(rkt);
                        rd_kafka_topic_destroy0(rkt);
                }

                if (request_error)
                        break; /* Request-level error seen, bail out */
        }

        if (actions) /* Actions set from encountered errors */
                goto done;

        /* Since these partitions are now allowed to produce
         * we wake up all broker threads. */
        rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT,
                                    "partitions added to transaction");

        goto done;

err_parse:
        err = rkbuf->rkbuf_err;
        actions |= RD_KAFKA_ERR_ACTION_PERMANENT;

done:
        if (err) {
                rd_assert(rk->rk_eos.txn_req_cnt > 0);
                rk->rk_eos.txn_req_cnt--;
        }

        /* Handle local request-level errors */
        switch (err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;

        case RD_KAFKA_RESP_ERR__DESTROY:
        case RD_KAFKA_RESP_ERR__OUTDATED:
                /* Terminating or outdated, ignore response */
                return;

        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        default:
                /* For these errors we can't be sure if the
                 * request was received by the broker or not,
                 * so increase the txn_req_cnt back up as if
                 * they were received so that an EndTxnRequest
                 * is sent on abort_transaction(). */
                rk->rk_eos.txn_req_cnt++;
                actions |= RD_KAFKA_ERR_ACTION_RETRY;
                break;
        }

        if (reset_coord_err) {
                rd_kafka_wrlock(rk);
                rd_kafka_txn_coord_set(rk, NULL,
                                       "AddPartitionsToTxn failed: %s",
                                       rd_kafka_err2str(reset_coord_err));
                rd_kafka_wrunlock(rk);
        }

        /* Partitions that failed will still be on the waitresp list
         * and are moved back to the pending list for the next scheduled
         * AddPartitionsToTxn request.
         * If this request was successful there will be no remaining
         * partitions on the waitresp list. */
        mtx_lock(&rk->rk_eos.txn_pending_lock);
        TAILQ_CONCAT_SORTED(&rk->rk_eos.txn_pending_rktps,
                            &rk->rk_eos.txn_waitresp_rktps,
                            rd_kafka_toppar_t *, rktp_txnlink,
                            rd_kafka_toppar_topic_cmp);
        mtx_unlock(&rk->rk_eos.txn_pending_lock);

        err = rd_kafka_txn_normalize_err(err);

        if (actions & RD_KAFKA_ERR_ACTION_FATAL) {
                rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err,
                                             "Failed to add partitions to "
                                             "transaction: %s",
                                             rd_kafka_err2str(err));

        } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) {
                /* Treat all other permanent errors as abortable errors.
                 * If an epoch bump is required let idempo sort it out. */
                if (require_bump)
                        rd_kafka_idemp_drain_epoch_bump(
                            rk, err,
                            "Failed to add partition(s) to transaction "
                            "on broker %s: %s (after %d ms)",
                            rd_kafka_broker_name(rkb), rd_kafka_err2str(err),
                            (int)(request->rkbuf_ts_sent / 1000));
                else
                        rd_kafka_txn_set_abortable_error(
                            rk, err,
                            "Failed to add partition(s) to transaction "
                            "on broker %s: %s (after %d ms)",
                            rd_kafka_broker_name(rkb), rd_kafka_err2str(err),
                            (int)(request->rkbuf_ts_sent / 1000));

        } else {
                /* Schedule registration of any new or remaining partitions */
                rd_kafka_txn_schedule_register_partitions(
                    rk, (actions & RD_KAFKA_ERR_ACTION_RETRY)
                            ? retry_backoff_ms
                            : 1 /*immediate*/);
        }
}


/**
 * @brief Send AddPartitionsToTxnRequest to the transaction coordinator.
 *
 * @locality rdkafka main thread
 * @locks none
 */
static void rd_kafka_txn_register_partitions(rd_kafka_t *rk) {
        char errstr[512];
        rd_kafka_resp_err_t err;
        rd_kafka_error_t *error;
        rd_kafka_pid_t pid;

        /* Require operational state */
        rd_kafka_rdlock(rk);
        error =
            rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION,
                                       RD_KAFKA_TXN_STATE_BEGIN_COMMIT);

        if (unlikely(error != NULL)) {
                rd_kafka_rdunlock(rk);
                rd_kafka_dbg(rk, EOS, "ADDPARTS",
                             "Not registering partitions: %s",
                             rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);
                return;
        }

        /* Get pid, checked later */
        pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false);

        rd_kafka_rdunlock(rk);

        /* Transaction coordinator needs to be up */
        if (!rd_kafka_broker_is_up(rk->rk_eos.txn_coord)) {
                rd_kafka_dbg(rk, EOS, "ADDPARTS",
                             "Not registering partitions: "
                             "coordinator is not available");
                return;
        }

        mtx_lock(&rk->rk_eos.txn_pending_lock);
        if (TAILQ_EMPTY(&rk->rk_eos.txn_pending_rktps)) {
                /* No pending partitions to register */
                mtx_unlock(&rk->rk_eos.txn_pending_lock);
                return;
        }

        if (!TAILQ_EMPTY(&rk->rk_eos.txn_waitresp_rktps)) {
                /* Only allow one outstanding AddPartitionsToTxnRequest */
                mtx_unlock(&rk->rk_eos.txn_pending_lock);
                rd_kafka_dbg(rk, EOS, "ADDPARTS",
                             "Not registering partitions: waiting for "
                             "previous AddPartitionsToTxn request to complete");
                return;
        }

        /* Require valid pid */
        if (unlikely(!rd_kafka_pid_valid(pid))) {
                mtx_unlock(&rk->rk_eos.txn_pending_lock);
                rd_kafka_dbg(rk, EOS, "ADDPARTS",
                             "Not registering partitions: "
                             "No PID available (idempotence state %s)",
                             rd_kafka_idemp_state2str(rk->rk_eos.idemp_state));
                rd_dassert(!*"BUG: No PID despite proper transaction state");
                return;
        }

        /* Send request to coordinator.
         * Note: txn_pending_lock is deliberately held across the request
         * construction since the request reads the pending list. */
        err = rd_kafka_AddPartitionsToTxnRequest(
            rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid,
            &rk->rk_eos.txn_pending_rktps, errstr, sizeof(errstr),
            RD_KAFKA_REPLYQ(rk->rk_ops, 0),
            rd_kafka_txn_handle_AddPartitionsToTxn, NULL);
        if (err) {
                mtx_unlock(&rk->rk_eos.txn_pending_lock);
                rd_kafka_dbg(rk, EOS, "ADDPARTS",
                             "Not registering partitions: %s", errstr);
                return;
        }

        /* Move all pending partitions to wait-response list.
         * No need to keep waitresp sorted. */
        TAILQ_CONCAT(&rk->rk_eos.txn_waitresp_rktps,
                     &rk->rk_eos.txn_pending_rktps, rktp_txnlink);

        mtx_unlock(&rk->rk_eos.txn_pending_lock);

        rk->rk_eos.txn_req_cnt++;

        rd_rkb_dbg(rk->rk_eos.txn_coord, EOS, "ADDPARTS",
                   "Registering partitions with transaction");
}


/** Timer callback: attempt (re)registration of pending partitions. */
static void rd_kafka_txn_register_partitions_tmr_cb(rd_kafka_timers_t *rkts,
                                                    void *arg) {
        rd_kafka_t *rk = arg;
        rd_kafka_txn_register_partitions(rk);
}


/**
 * @brief Schedule register_partitions() as soon as possible.
 *
 * @locality any
 * @locks any
 */
void rd_kafka_txn_schedule_register_partitions(rd_kafka_t *rk,
                                               int backoff_ms) {
        rd_kafka_timer_start_oneshot(
            &rk->rk_timers, &rk->rk_eos.txn_register_parts_tmr,
            rd_false /*dont-restart*/,
            backoff_ms ? backoff_ms * 1000 : 1 /* immediate */,
            rd_kafka_txn_register_partitions_tmr_cb, rk);
}



/**
 * @brief Clears \p flag from all rktps and destroys them, emptying
 *        and reinitializing the \p tqh.
 */
static void rd_kafka_txn_clear_partitions_flag(rd_kafka_toppar_tqhead_t *tqh,
                                               int flag) {
        rd_kafka_toppar_t *rktp, *tmp;

        TAILQ_FOREACH_SAFE(rktp, tqh, rktp_txnlink, tmp) {
                rd_kafka_toppar_lock(rktp);
                rd_dassert(rktp->rktp_flags & flag);
                rktp->rktp_flags &= ~flag;
                rd_kafka_toppar_unlock(rktp);

                /* Drop the list's reference to the toppar. */
                rd_kafka_toppar_destroy(rktp);
        }

        TAILQ_INIT(tqh);
}


/**
 * @brief Clear all pending partitions.
 *
 * @locks txn_pending_lock MUST be held
 */
static void rd_kafka_txn_clear_pending_partitions(rd_kafka_t *rk) {
        rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_pending_rktps,
                                           RD_KAFKA_TOPPAR_F_PEND_TXN);
        rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_waitresp_rktps,
                                           RD_KAFKA_TOPPAR_F_PEND_TXN);
}


/**
 * @brief Clear all added partitions.
*
 * @locks rd_kafka_wrlock(rk) MUST be held
 */
static void rd_kafka_txn_clear_partitions(rd_kafka_t *rk) {
        rd_kafka_txn_clear_partitions_flag(&rk->rk_eos.txn_rktps,
                                           RD_KAFKA_TOPPAR_F_IN_TXN);
}


/**
 * @brief Async handler for init_transactions()
 *
 * @locks none
 * @locality rdkafka main thread
 */
static rd_kafka_op_res_t rd_kafka_txn_op_init_transactions(rd_kafka_t *rk,
                                                           rd_kafka_q_t *rkq,
                                                           rd_kafka_op_t *rko) {
        rd_kafka_error_t *error;

        /* Op queue is being purged on termination: nothing to do. */
        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
                return RD_KAFKA_OP_RES_HANDLED;

        rd_kafka_wrlock(rk);

        if ((error = rd_kafka_txn_require_state(
                 rk, RD_KAFKA_TXN_STATE_INIT, RD_KAFKA_TXN_STATE_WAIT_PID,
                 RD_KAFKA_TXN_STATE_READY_NOT_ACKED))) {
                rd_kafka_wrunlock(rk);
                rd_kafka_txn_curr_api_set_result(rk, 0, error);
        } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_READY_NOT_ACKED) {
                /* A previous init_transactions() call finished successfully
                 * after timeout, the application has called init_transactions()
                 * again, we do nothing here, ack_init_transactions() will
                 * transition the state from READY_NOT_ACKED to READY. */
                rd_kafka_wrunlock(rk);
        } else {
                /* Possibly a no-op if already in WAIT_PID state */
                rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_WAIT_PID);

                rk->rk_eos.txn_init_err = RD_KAFKA_RESP_ERR_NO_ERROR;

                rd_kafka_wrunlock(rk);

                /* Start idempotent producer to acquire PID */
                rd_kafka_idemp_start(rk, rd_true /*immediately*/);

                /* Do not call curr_api_set_result, it will be triggered from
                 * idemp_state_change() when the PID has been retrieved. */
        }

        return RD_KAFKA_OP_RES_HANDLED;
}


/**
 * @brief Async handler for the application to acknowledge
 *        successful background completion of init_transactions().
*
 * @locks none
 * @locality rdkafka main thread
 */
static rd_kafka_op_res_t
rd_kafka_txn_op_ack_init_transactions(rd_kafka_t *rk,
                                      rd_kafka_q_t *rkq,
                                      rd_kafka_op_t *rko) {
        rd_kafka_error_t *error;

        /* Op queue is being purged on termination: nothing to do. */
        if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY)
                return RD_KAFKA_OP_RES_HANDLED;

        rd_kafka_wrlock(rk);

        /* Transition READY_NOT_ACKED -> READY now that the application
         * has observed the successful init_transactions() outcome. */
        if (!(error = rd_kafka_txn_require_state(
                  rk, RD_KAFKA_TXN_STATE_READY_NOT_ACKED)))
                rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY);

        rd_kafka_wrunlock(rk);

        rd_kafka_txn_curr_api_set_result(rk, 0, error);

        return RD_KAFKA_OP_RES_HANDLED;
}


rd_kafka_error_t *rd_kafka_init_transactions(rd_kafka_t *rk, int timeout_ms) {
        rd_kafka_error_t *error;
        rd_ts_t abs_timeout;

        /* Cap actual timeout to transaction.timeout.ms * 2 when an infinite
         * timeout is provided, this is to make sure the call doesn't block
         * indefinitely in case a coordinator is not available.
         * This is only needed for init_transactions() since there is no
         * coordinator to time us out yet. */
        if (timeout_ms == RD_POLL_INFINITE &&
            /* Avoid overflow */
            rk->rk_conf.eos.transaction_timeout_ms < INT_MAX / 2)
                timeout_ms = rk->rk_conf.eos.transaction_timeout_ms * 2;

        if ((error = rd_kafka_txn_curr_api_begin(rk, "init_transactions",
                                                 rd_false /* no cap */,
                                                 timeout_ms, &abs_timeout)))
                return error;

        /* init_transactions() will continue to operate in the background
         * if the timeout expires, and the application may call
         * init_transactions() again to resume the initialization
         * process.
         * For this reason we need two states:
         *  - TXN_STATE_READY_NOT_ACKED for when initialization is done
         *    but the API call timed out prior to success, meaning the
         *    application does not know initialization finished and
         *    is thus not allowed to call sub-sequent txn APIs, e.g. begin..()
         *  - TXN_STATE_READY for when initialization is done and this
         *    function has returned successfully to the application.
         *
         * And due to the two states we need two calls to the rdkafka main
         * thread (to keep txn_state synchronization in one place).
*/ /* First call is to trigger initialization */ if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_init_transactions, abs_timeout))) { if (rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT) { /* See if there's a more meaningful txn_init_err set * by idempo that we can return. */ rd_kafka_resp_err_t err; rd_kafka_rdlock(rk); err = rd_kafka_txn_normalize_err(rk->rk_eos.txn_init_err); rd_kafka_rdunlock(rk); if (err && err != RD_KAFKA_RESP_ERR__TIMED_OUT) { rd_kafka_error_destroy(error); error = rd_kafka_error_new_retriable( err, "Failed to initialize Producer ID: %s", rd_kafka_err2str(err)); } } return rd_kafka_txn_curr_api_return(rk, rd_true, error); } /* Second call is to transition from READY_NOT_ACKED -> READY, * if necessary. */ error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_ack_init_transactions, /* Timeout must be infinite since this is * a synchronization point. * The call is immediate though, so this * will not block. */ RD_POLL_INFINITE); return rd_kafka_txn_curr_api_return(rk, /* not resumable at this point */ rd_false, error); } /** * @brief Handler for begin_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_begin_transaction(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; rd_bool_t wakeup_brokers = rd_false; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); if (!(error = rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_READY))) { rd_assert(TAILQ_EMPTY(&rk->rk_eos.txn_rktps)); rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION); rd_assert(rk->rk_eos.txn_req_cnt == 0); rd_atomic64_set(&rk->rk_eos.txn_dr_fails, 0); rk->rk_eos.txn_err = RD_KAFKA_RESP_ERR_NO_ERROR; RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free); rk->rk_eos.txn_errstr = NULL; /* Wake up all broker threads (that may have messages to send * that were waiting for this transaction state. * But needs to be done below with no lock held. 
*/ wakeup_brokers = rd_true; } rd_kafka_wrunlock(rk); if (wakeup_brokers) rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT, "begin transaction"); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } rd_kafka_error_t *rd_kafka_begin_transaction(rd_kafka_t *rk) { rd_kafka_error_t *error; if ((error = rd_kafka_txn_curr_api_begin(rk, "begin_transaction", rd_false, 0, NULL))) return error; error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_transaction, RD_POLL_INFINITE); return rd_kafka_txn_curr_api_return(rk, rd_false /*not resumable*/, error); } static rd_kafka_resp_err_t rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *reply_opaque); /** * @brief Handle TxnOffsetCommitResponse * * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_handle_TxnOffsetCommit(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { const int log_decode_errors = LOG_ERR; rd_kafka_op_t *rko = opaque; int actions = 0; rd_kafka_topic_partition_list_t *partitions = NULL; char errstr[512]; *errstr = '\0'; if (err) goto done; rd_kafka_buf_read_throttle_time(rkbuf); const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_ERR, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; partitions = rd_kafka_buf_read_topic_partitions(rkbuf, 0, fields); if (!partitions) goto err_parse; err = rd_kafka_topic_partition_list_get_err(partitions); if (err) { char errparts[256]; rd_kafka_topic_partition_list_str(partitions, errparts, sizeof(errparts), RD_KAFKA_FMT_F_ONLY_ERR); rd_snprintf(errstr, sizeof(errstr), "Failed to commit offsets to transaction on " "broker %s: %s " "(after %dms)", rd_kafka_broker_name(rkb), errparts, (int)(request->rkbuf_ts_sent / 1000)); } goto done; err_parse: err = rkbuf->rkbuf_err; done: if (err) { if 
(!*errstr) {
                        rd_snprintf(errstr, sizeof(errstr),
                                    "Failed to commit offsets to "
                                    "transaction on broker %s: %s "
                                    "(after %d ms)",
                                    rkb ? rd_kafka_broker_name(rkb) : "(none)",
                                    rd_kafka_err2str(err),
                                    (int)(request->rkbuf_ts_sent / 1000));
                }
        }

        if (partitions)
                rd_kafka_topic_partition_list_destroy(partitions);

        /* Map the (response or local) error to follow-up actions. */
        switch (err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                break;

        case RD_KAFKA_RESP_ERR__DESTROY:
                /* Producer is being terminated, ignore the response. */
        case RD_KAFKA_RESP_ERR__OUTDATED:
                /* Set a non-actionable actions flag so that
                 * curr_api_set_result() is called below, without
                 * other side-effects. */
                actions = RD_KAFKA_ERR_ACTION_SPECIAL;
                /* FIX: was `return;`, which skipped the
                 * curr_api_set_result() call below (contradicting the
                 * comment above) and leaked the rko since
                 * rd_kafka_op_destroy() was never reached.
                 * The sibling handler rd_kafka_txn_handle_AddOffsetsToTxn
                 * uses `break` for this same case. */
                break;

        case RD_KAFKA_RESP_ERR_NOT_COORDINATOR:
        case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE:
        case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT:
        case RD_KAFKA_RESP_ERR__TRANSPORT:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
        case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE:
                /* Note: this is the group coordinator, not the
                 * transaction coordinator. */
                rd_kafka_coord_cache_evict(&rk->rk_coord_cache, rkb);
                actions |= RD_KAFKA_ERR_ACTION_RETRY;
                break;

        case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS:
        case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS:
        case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART:
                actions |= RD_KAFKA_ERR_ACTION_RETRY;
                break;

        case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED:
        case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED:
        case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING:
        case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH:
        case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE:
        case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT:
                actions |= RD_KAFKA_ERR_ACTION_FATAL;
                break;

        case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED:
        case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED:
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                break;

        case RD_KAFKA_RESP_ERR_ILLEGAL_GENERATION:
        case RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID:
        case RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID:
                actions |= RD_KAFKA_ERR_ACTION_PERMANENT;
                break;

        default:
                /* Unhandled error, fail transaction */
actions |= RD_KAFKA_ERR_ACTION_PERMANENT; break; } err = rd_kafka_txn_normalize_err(err); if (actions & RD_KAFKA_ERR_ACTION_FATAL) { rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, "%s", errstr); } else if (actions & RD_KAFKA_ERR_ACTION_RETRY) { int remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout); if (!rd_timeout_expired(remains_ms)) { rd_kafka_coord_req( rk, RD_KAFKA_COORD_GROUP, rko->rko_u.txn.cgmetadata->group_id, rd_kafka_txn_send_TxnOffsetCommitRequest, rko, 500 /* 500ms delay before retrying */, rd_timeout_remains_limit0( remains_ms, rk->rk_conf.socket_timeout_ms), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_TxnOffsetCommit, rko); return; } else if (!err) err = RD_KAFKA_RESP_ERR__TIMED_OUT; actions |= RD_KAFKA_ERR_ACTION_PERMANENT; } if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) rd_kafka_txn_set_abortable_error(rk, err, "%s", errstr); if (err) rd_kafka_txn_curr_api_set_result( rk, actions, rd_kafka_error_new(err, "%s", errstr)); else rd_kafka_txn_curr_api_set_result(rk, 0, NULL); rd_kafka_op_destroy(rko); } /** * @brief Construct and send TxnOffsetCommitRequest. * * @locality rdkafka main thread * @locks none */ static rd_kafka_resp_err_t rd_kafka_txn_send_TxnOffsetCommitRequest(rd_kafka_broker_t *rkb, rd_kafka_op_t *rko, rd_kafka_replyq_t replyq, rd_kafka_resp_cb_t *resp_cb, void *reply_opaque) { rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_buf_t *rkbuf; int16_t ApiVersion; rd_kafka_pid_t pid; const rd_kafka_consumer_group_metadata_t *cgmetadata = rko->rko_u.txn.cgmetadata; int cnt; rd_kafka_rdlock(rk); if (rk->rk_eos.txn_state != RD_KAFKA_TXN_STATE_IN_TRANSACTION) { rd_kafka_rdunlock(rk); /* Do not free the rko, it is passed as the reply_opaque * on the reply queue by coord_req_fsm() when we return * an error here. 
*/ return RD_KAFKA_RESP_ERR__STATE; } pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); rd_kafka_rdunlock(rk); if (!rd_kafka_pid_valid(pid)) { /* Do not free the rko, it is passed as the reply_opaque * on the reply queue by coord_req_fsm() when we return * an error here. */ return RD_KAFKA_RESP_ERR__STATE; } ApiVersion = rd_kafka_broker_ApiVersion_supported( rkb, RD_KAFKAP_TxnOffsetCommit, 0, 3, NULL); if (ApiVersion == -1) { /* Do not free the rko, it is passed as the reply_opaque * on the reply queue by coord_req_fsm() when we return * an error here. */ return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; } rkbuf = rd_kafka_buf_new_flexver_request( rkb, RD_KAFKAP_TxnOffsetCommit, 1, rko->rko_u.txn.offsets->cnt * 50, ApiVersion >= 3); /* transactional_id */ rd_kafka_buf_write_str(rkbuf, rk->rk_conf.eos.transactional_id, -1); /* group_id */ rd_kafka_buf_write_str(rkbuf, rko->rko_u.txn.cgmetadata->group_id, -1); /* PID */ rd_kafka_buf_write_i64(rkbuf, pid.id); rd_kafka_buf_write_i16(rkbuf, pid.epoch); if (ApiVersion >= 3) { /* GenerationId */ rd_kafka_buf_write_i32(rkbuf, cgmetadata->generation_id); /* MemberId */ rd_kafka_buf_write_str(rkbuf, cgmetadata->member_id, -1); /* GroupInstanceId */ rd_kafka_buf_write_str(rkbuf, cgmetadata->group_instance_id, -1); } /* Write per-partition offsets list */ const rd_kafka_topic_partition_field_t fields[] = { RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, RD_KAFKA_TOPIC_PARTITION_FIELD_OFFSET, ApiVersion >= 2 ? RD_KAFKA_TOPIC_PARTITION_FIELD_EPOCH : RD_KAFKA_TOPIC_PARTITION_FIELD_NOOP, RD_KAFKA_TOPIC_PARTITION_FIELD_METADATA, RD_KAFKA_TOPIC_PARTITION_FIELD_END}; cnt = rd_kafka_buf_write_topic_partitions( rkbuf, rko->rko_u.txn.offsets, rd_true /*skip invalid offsets*/, rd_false /*any offset*/, fields); if (!cnt) { /* No valid partition offsets, don't commit. */ rd_kafka_buf_destroy(rkbuf); /* Do not free the rko, it is passed as the reply_opaque * on the reply queue by coord_req_fsm() when we return * an error here. 
*/ return RD_KAFKA_RESP_ERR__NO_OFFSET; } rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); rkbuf->rkbuf_max_retries = RD_KAFKA_REQUEST_MAX_RETRIES; rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, reply_opaque); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief Handle AddOffsetsToTxnResponse * * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_handle_AddOffsetsToTxn(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { const int log_decode_errors = LOG_ERR; rd_kafka_op_t *rko = opaque; int16_t ErrorCode; int actions = 0; int remains_ms; if (err == RD_KAFKA_RESP_ERR__DESTROY) { rd_kafka_op_destroy(rko); return; } if (err) goto done; rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); err = ErrorCode; goto done; err_parse: err = rkbuf->rkbuf_err; done: if (err) { rd_assert(rk->rk_eos.txn_req_cnt > 0); rk->rk_eos.txn_req_cnt--; } remains_ms = rd_timeout_remains(rko->rko_u.txn.abs_timeout); if (rd_timeout_expired(remains_ms) && !err) err = RD_KAFKA_RESP_ERR__TIMED_OUT; switch (err) { case RD_KAFKA_RESP_ERR_NO_ERROR: break; case RD_KAFKA_RESP_ERR__DESTROY: /* Producer is being terminated, ignore the response. */ case RD_KAFKA_RESP_ERR__OUTDATED: /* Set a non-actionable actions flag so that * curr_api_set_result() is called below, without * other side-effects. */ actions = RD_KAFKA_ERR_ACTION_SPECIAL; break; case RD_KAFKA_RESP_ERR__TRANSPORT: case RD_KAFKA_RESP_ERR__TIMED_OUT: /* For these errors we can't be sure if the * request was received by the broker or not, * so increase the txn_req_cnt back up as if * they were received so that and EndTxnRequest * is sent on abort_transaction(). 
*/ rk->rk_eos.txn_req_cnt++; /* FALLTHRU */ case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: case RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT: actions |= RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH; break; case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: case RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT: actions |= RD_KAFKA_ERR_ACTION_FATAL; break; case RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED: actions |= RD_KAFKA_ERR_ACTION_PERMANENT; break; case RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART: case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: actions |= RD_KAFKA_ERR_ACTION_RETRY; break; default: /* All unhandled errors are permanent */ actions |= RD_KAFKA_ERR_ACTION_PERMANENT; break; } err = rd_kafka_txn_normalize_err(err); rd_kafka_dbg(rk, EOS, "ADDOFFSETS", "AddOffsetsToTxn response from %s: %s (%s)", rkb ? 
rd_kafka_broker_name(rkb) : "(none)", rd_kafka_err2name(err), rd_kafka_actions2str(actions)); /* All unhandled errors are considered permanent */ if (err && !actions) actions |= RD_KAFKA_ERR_ACTION_PERMANENT; if (actions & RD_KAFKA_ERR_ACTION_FATAL) { rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, "Failed to add offsets to " "transaction: %s", rd_kafka_err2str(err)); } else { if (actions & RD_KAFKA_ERR_ACTION_REFRESH) rd_kafka_txn_coord_timer_start(rk, 50); if (actions & RD_KAFKA_ERR_ACTION_RETRY) { rd_rkb_dbg( rkb, EOS, "ADDOFFSETS", "Failed to add offsets to transaction on " "broker %s: %s (after %dms, %dms remains): " "error is retriable", rd_kafka_broker_name(rkb), rd_kafka_err2str(err), (int)(request->rkbuf_ts_sent / 1000), remains_ms); if (!rd_timeout_expired(remains_ms) && rd_kafka_buf_retry(rk->rk_eos.txn_coord, request)) { rk->rk_eos.txn_req_cnt++; return; } /* Propagate as retriable error through * api_reply() below */ } } if (err) rd_rkb_log(rkb, LOG_ERR, "ADDOFFSETS", "Failed to add offsets to transaction on broker %s: " "%s", rkb ? rd_kafka_broker_name(rkb) : "(none)", rd_kafka_err2str(err)); if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) rd_kafka_txn_set_abortable_error( rk, err, "Failed to add offsets to " "transaction on broker %s: " "%s (after %dms)", rd_kafka_broker_name(rkb), rd_kafka_err2str(err), (int)(request->rkbuf_ts_sent / 1000)); if (!err) { /* Step 2: Commit offsets to transaction on the * group coordinator. 
*/ rd_kafka_coord_req( rk, RD_KAFKA_COORD_GROUP, rko->rko_u.txn.cgmetadata->group_id, rd_kafka_txn_send_TxnOffsetCommitRequest, rko, 0 /* no delay */, rd_timeout_remains_limit0(remains_ms, rk->rk_conf.socket_timeout_ms), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_TxnOffsetCommit, rko); } else { rd_kafka_txn_curr_api_set_result( rk, actions, rd_kafka_error_new( err, "Failed to add offsets to transaction on " "broker %s: %s (after %dms)", rd_kafka_broker_name(rkb), rd_kafka_err2str(err), (int)(request->rkbuf_ts_sent / 1000))); rd_kafka_op_destroy(rko); } } /** * @brief Async handler for send_offsets_to_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_send_offsets_to_transaction(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; char errstr[512]; rd_kafka_error_t *error; rd_kafka_pid_t pid; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; *errstr = '\0'; rd_kafka_wrlock(rk); if ((error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION))) { rd_kafka_wrunlock(rk); goto err; } rd_kafka_wrunlock(rk); pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); if (!rd_kafka_pid_valid(pid)) { rd_dassert(!*"BUG: No PID despite proper transaction state"); error = rd_kafka_error_new_retriable( RD_KAFKA_RESP_ERR__STATE, "No PID available (idempotence state %s)", rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); goto err; } /* This is a multi-stage operation, consisting of: * 1) send AddOffsetsToTxnRequest to transaction coordinator. * 2) send TxnOffsetCommitRequest to group coordinator. 
*/ err = rd_kafka_AddOffsetsToTxnRequest( rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, rko->rko_u.txn.cgmetadata->group_id, errstr, sizeof(errstr), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_AddOffsetsToTxn, rko); if (err) { error = rd_kafka_error_new_retriable(err, "%s", errstr); goto err; } rk->rk_eos.txn_req_cnt++; return RD_KAFKA_OP_RES_KEEP; /* the rko is passed to AddOffsetsToTxn */ err: rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } /** * error returns: * ERR__TRANSPORT - retryable */ rd_kafka_error_t *rd_kafka_send_offsets_to_transaction( rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *offsets, const rd_kafka_consumer_group_metadata_t *cgmetadata, int timeout_ms) { rd_kafka_error_t *error; rd_kafka_op_t *rko; rd_kafka_topic_partition_list_t *valid_offsets; rd_ts_t abs_timeout; if (!cgmetadata || !offsets) return rd_kafka_error_new( RD_KAFKA_RESP_ERR__INVALID_ARG, "cgmetadata and offsets are required parameters"); if ((error = rd_kafka_txn_curr_api_begin( rk, "send_offsets_to_transaction", /* Cap timeout to txn timeout */ rd_true, timeout_ms, &abs_timeout))) return error; valid_offsets = rd_kafka_topic_partition_list_match( offsets, rd_kafka_topic_partition_match_valid_offset, NULL); if (valid_offsets->cnt == 0) { /* No valid offsets, e.g., nothing was consumed, * this is not an error, do nothing. 
*/ rd_kafka_topic_partition_list_destroy(valid_offsets); return rd_kafka_txn_curr_api_return(rk, rd_false, NULL); } rd_kafka_topic_partition_list_sort_by_topic(valid_offsets); rko = rd_kafka_op_new_cb(rk, RD_KAFKA_OP_TXN, rd_kafka_txn_op_send_offsets_to_transaction); rko->rko_u.txn.offsets = valid_offsets; rko->rko_u.txn.cgmetadata = rd_kafka_consumer_group_metadata_dup(cgmetadata); rko->rko_u.txn.abs_timeout = abs_timeout; /* Timeout is enforced by op_send_offsets_to_transaction() */ error = rd_kafka_txn_op_req1(rk, rko, RD_POLL_INFINITE); return rd_kafka_txn_curr_api_return(rk, rd_false, error); } /** * @brief Successfully complete the transaction. * * Current state must be either COMMIT_NOT_ACKED or ABORT_NOT_ACKED. * * @locality rdkafka main thread * @locks rd_kafka_wrlock(rk) MUST be held */ static void rd_kafka_txn_complete(rd_kafka_t *rk, rd_bool_t is_commit) { rd_kafka_dbg(rk, EOS, "TXNCOMPLETE", "Transaction successfully %s", is_commit ? "committed" : "aborted"); /* Clear all transaction partition state */ rd_kafka_txn_clear_pending_partitions(rk); rd_kafka_txn_clear_partitions(rk); rk->rk_eos.txn_requires_epoch_bump = rd_false; rk->rk_eos.txn_req_cnt = 0; rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_READY); } /** * @brief EndTxn (commit or abort of transaction on the coordinator) is done, * or was skipped. * Continue with next steps (if any) before completing the local * transaction state. * * @locality rdkafka main thread * @locks_acquired rd_kafka_wrlock(rk), rk->rk_eos.txn_curr_api.lock */ static void rd_kafka_txn_endtxn_complete(rd_kafka_t *rk) { rd_bool_t is_commit; mtx_lock(&rk->rk_eos.txn_curr_api.lock); is_commit = !strcmp(rk->rk_eos.txn_curr_api.name, "commit_transaction"); mtx_unlock(&rk->rk_eos.txn_curr_api.lock); rd_kafka_wrlock(rk); /* If an epoch bump is required, let idempo handle it. 
* When the bump is finished we'll be notified through * idemp_state_change() and we can complete the local transaction state * and set the final API call result. * If the bumping fails a fatal error will be raised. */ if (rk->rk_eos.txn_requires_epoch_bump) { rd_kafka_resp_err_t bump_err = rk->rk_eos.txn_err; rd_dassert(!is_commit); rd_kafka_wrunlock(rk); /* After the epoch bump is done we'll be transitioned * to the next state. */ rd_kafka_idemp_drain_epoch_bump0( rk, rd_false /* don't allow txn abort */, bump_err, "Transaction aborted: %s", rd_kafka_err2str(bump_err)); return; } if (is_commit) rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED); else rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED); rd_kafka_wrunlock(rk); rd_kafka_txn_curr_api_set_result(rk, 0, NULL); } /** * @brief Handle EndTxnResponse (commit or abort) * * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_handle_EndTxn(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { const int log_decode_errors = LOG_ERR; int16_t ErrorCode; int actions = 0; rd_bool_t is_commit, may_retry = rd_false, require_bump = rd_false; if (err == RD_KAFKA_RESP_ERR__DESTROY) return; is_commit = request->rkbuf_u.EndTxn.commit; if (err) goto err; rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); err = ErrorCode; goto err; err_parse: err = rkbuf->rkbuf_err; /* FALLTHRU */ err: rd_kafka_wrlock(rk); if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) { may_retry = rd_true; } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) { may_retry = rd_true; } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR) { /* Transaction has failed locally, typically due to timeout. * Get the transaction error and return that instead of * this error. 
* This is a tricky state since the transaction will have * failed locally but the EndTxn(commit) may have succeeded. */ if (err) { rd_kafka_txn_curr_api_set_result( rk, RD_KAFKA_ERR_ACTION_PERMANENT, rd_kafka_error_new( rk->rk_eos.txn_err, "EndTxn failed with %s but transaction " "had already failed due to: %s", rd_kafka_err2name(err), rk->rk_eos.txn_errstr)); } else { /* If the transaction has failed locally but * this EndTxn commit succeeded we'll raise * a fatal error. */ if (is_commit) rd_kafka_txn_curr_api_set_result( rk, RD_KAFKA_ERR_ACTION_FATAL, rd_kafka_error_new( rk->rk_eos.txn_err, "Transaction commit succeeded on the " "broker but the transaction " "had already failed locally due to: %s", rk->rk_eos.txn_errstr)); else rd_kafka_txn_curr_api_set_result( rk, RD_KAFKA_ERR_ACTION_PERMANENT, rd_kafka_error_new( rk->rk_eos.txn_err, "Transaction abort succeeded on the " "broker but the transaction" "had already failed locally due to: %s", rk->rk_eos.txn_errstr)); } rd_kafka_wrunlock(rk); return; } else if (!err) { /* Request is outdated */ err = RD_KAFKA_RESP_ERR__OUTDATED; } rd_kafka_dbg(rk, EOS, "ENDTXN", "EndTxn returned %s in state %s (may_retry=%s)", rd_kafka_err2name(err), rd_kafka_txn_state2str(rk->rk_eos.txn_state), RD_STR_ToF(may_retry)); rd_kafka_wrunlock(rk); switch (err) { case RD_KAFKA_RESP_ERR_NO_ERROR: break; case RD_KAFKA_RESP_ERR__DESTROY: /* Producer is being terminated, ignore the response. */ case RD_KAFKA_RESP_ERR__OUTDATED: /* Transactional state no longer relevant for this * outdated response. 
*/ break; case RD_KAFKA_RESP_ERR__TIMED_OUT: case RD_KAFKA_RESP_ERR__TIMED_OUT_QUEUE: /* Request timeout */ /* FALLTHRU */ case RD_KAFKA_RESP_ERR__TRANSPORT: actions |= RD_KAFKA_ERR_ACTION_RETRY | RD_KAFKA_ERR_ACTION_REFRESH; break; case RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE: case RD_KAFKA_RESP_ERR_NOT_COORDINATOR: rd_kafka_wrlock(rk); rd_kafka_txn_coord_set(rk, NULL, "EndTxn failed: %s", rd_kafka_err2str(err)); rd_kafka_wrunlock(rk); actions |= RD_KAFKA_ERR_ACTION_RETRY; break; case RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS: case RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS: actions |= RD_KAFKA_ERR_ACTION_RETRY; break; case RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID: case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING: actions |= RD_KAFKA_ERR_ACTION_PERMANENT; require_bump = rd_true; break; case RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH: case RD_KAFKA_RESP_ERR_PRODUCER_FENCED: case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_INVALID_TXN_STATE: actions |= RD_KAFKA_ERR_ACTION_FATAL; break; default: /* All unhandled errors are permanent */ actions |= RD_KAFKA_ERR_ACTION_PERMANENT; } err = rd_kafka_txn_normalize_err(err); if (actions & RD_KAFKA_ERR_ACTION_FATAL) { rd_kafka_txn_set_fatal_error(rk, RD_DO_LOCK, err, "Failed to end transaction: %s", rd_kafka_err2str(err)); } else { if (actions & RD_KAFKA_ERR_ACTION_REFRESH) rd_kafka_txn_coord_timer_start(rk, 50); if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { if (require_bump && !is_commit) { /* Abort failed to due invalid PID, starting * with KIP-360 we can have idempo sort out * epoch bumping. * When the epoch has been bumped we'll detect * the idemp_state_change and complete the * current API call. */ rd_kafka_idemp_drain_epoch_bump0( rk, /* don't allow txn abort */ rd_false, err, "EndTxn %s failed: %s", is_commit ? 
"commit" : "abort", rd_kafka_err2str(err)); return; } /* For aborts we need to revert the state back to * BEGIN_ABORT so that the abort can be retried from * the beginning in op_abort_transaction(). */ rd_kafka_wrlock(rk); if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) rd_kafka_txn_set_state( rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT); rd_kafka_wrunlock(rk); rd_kafka_txn_set_abortable_error0( rk, err, require_bump, "Failed to end transaction: " "%s", rd_kafka_err2str(err)); } else if (may_retry && actions & RD_KAFKA_ERR_ACTION_RETRY && rd_kafka_buf_retry(rkb, request)) return; } if (err) rd_kafka_txn_curr_api_set_result( rk, actions, rd_kafka_error_new(err, "EndTxn %s failed: %s", is_commit ? "commit" : "abort", rd_kafka_err2str(err))); else rd_kafka_txn_endtxn_complete(rk); } /** * @brief Handler for commit_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_commit_transaction(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_pid_t pid; int64_t dr_fails; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); if ((error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED))) goto done; if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED) { /* A previous call to commit_transaction() timed out but the * commit completed since then, we still * need to wait for the application to call commit_transaction() * again to resume the call, and it just did. */ goto done; } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION) { /* A previous call to commit_transaction() timed out but the * commit is still in progress, we still * need to wait for the application to call commit_transaction() * again to resume the call, and it just did. 
*/ rd_kafka_wrunlock(rk); return RD_KAFKA_OP_RES_HANDLED; } /* If any messages failed delivery the transaction must be aborted. */ dr_fails = rd_atomic64_get(&rk->rk_eos.txn_dr_fails); if (unlikely(dr_fails > 0)) { error = rd_kafka_error_new_txn_requires_abort( RD_KAFKA_RESP_ERR__INCONSISTENT, "%" PRId64 " message(s) failed delivery " "(see individual delivery reports)", dr_fails); goto done; } if (!rk->rk_eos.txn_req_cnt) { /* If there were no messages produced, or no send_offsets, * in this transaction, simply complete the transaction * without sending anything to the transaction coordinator * (since it will not have any txn state). */ rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "No partitions registered: not sending EndTxn"); rd_kafka_wrunlock(rk); rd_kafka_txn_endtxn_complete(rk); return RD_KAFKA_OP_RES_HANDLED; } pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_false); if (!rd_kafka_pid_valid(pid)) { rd_dassert(!*"BUG: No PID despite proper transaction state"); error = rd_kafka_error_new_retriable( RD_KAFKA_RESP_ERR__STATE, "No PID available (idempotence state %s)", rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); goto done; } err = rd_kafka_EndTxnRequest( rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, rd_true /* commit */, errstr, sizeof(errstr), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_EndTxn, NULL); if (err) { error = rd_kafka_error_new_retriable(err, "%s", errstr); goto done; } rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION); rd_kafka_wrunlock(rk); return RD_KAFKA_OP_RES_HANDLED; done: rd_kafka_wrunlock(rk); /* If the returned error is an abortable error * also set the current transaction state accordingly. 
*/ if (rd_kafka_error_txn_requires_abort(error)) rd_kafka_txn_set_abortable_error(rk, rd_kafka_error_code(error), "%s", rd_kafka_error_string(error)); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } /** * @brief Handler for commit_transaction()'s first phase: begin commit * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_begin_commit(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION, RD_KAFKA_TXN_STATE_BEGIN_COMMIT, RD_KAFKA_TXN_STATE_COMMITTING_TRANSACTION, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED); if (!error && rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION) { /* Transition to BEGIN_COMMIT state if no error and commit not * already started. */ rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_COMMIT); } rd_kafka_wrunlock(rk); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } /** * @brief Handler for last ack of commit_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_commit_transaction_ack(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); if (!(error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_COMMIT_NOT_ACKED))) { rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "Committed transaction now acked by application"); rd_kafka_txn_complete(rk, rd_true /*is commit*/); } rd_kafka_wrunlock(rk); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } rd_kafka_error_t *rd_kafka_commit_transaction(rd_kafka_t *rk, int timeout_ms) { rd_kafka_error_t *error; rd_kafka_resp_err_t err; rd_ts_t abs_timeout; /* The commit is in three phases: * - begin 
commit: wait for outstanding messages to be produced, * disallow new messages from being produced * by application. * - commit: commit transaction. * - commit not acked: commit done, but waiting for application * to acknowledge by completing this API call. */ if ((error = rd_kafka_txn_curr_api_begin(rk, "commit_transaction", rd_false /* no cap */, timeout_ms, &abs_timeout))) return error; /* Begin commit */ if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_commit, abs_timeout))) return rd_kafka_txn_curr_api_return(rk, /* not resumable yet */ rd_false, error); rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "Flushing %d outstanding message(s) prior to commit", rd_kafka_outq_len(rk)); /* Wait for queued messages to be delivered, limited by * the remaining transaction lifetime. */ if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) { rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "Flush failed (with %d messages remaining): %s", rd_kafka_outq_len(rk), rd_kafka_err2str(err)); if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) error = rd_kafka_error_new_retriable( err, "Failed to flush all outstanding messages " "within the API timeout: " "%d message(s) remaining%s", rd_kafka_outq_len(rk), /* In case event queue delivery reports * are enabled and there is no dr callback * we instruct the developer to poll * the event queue separately, since we * can't do it for them. */ ((rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) && !rk->rk_conf.dr_msg_cb && !rk->rk_conf.dr_cb) ? ": the event queue must be polled " "for delivery report events in a separate " "thread or prior to calling commit" : ""); else error = rd_kafka_error_new_retriable( err, "Failed to flush outstanding messages: %s", rd_kafka_err2str(err)); /* The commit operation is in progress in the background * and the application will need to call this API again * to resume. 
*/ return rd_kafka_txn_curr_api_return(rk, rd_true, error); } rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "Transaction commit message flush complete"); /* Commit transaction */ error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_commit_transaction, abs_timeout); if (error) return rd_kafka_txn_curr_api_return(rk, rd_true, error); /* Last call is to transition from COMMIT_NOT_ACKED to READY */ error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_commit_transaction_ack, /* Timeout must be infinite since this is * a synchronization point. * The call is immediate though, so this * will not block. */ RD_POLL_INFINITE); return rd_kafka_txn_curr_api_return(rk, /* not resumable at this point */ rd_false, error); } /** * @brief Handler for abort_transaction()'s first phase: begin abort * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_begin_abort(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; rd_bool_t clear_pending = rd_false; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); error = rd_kafka_txn_require_state(rk, RD_KAFKA_TXN_STATE_IN_TRANSACTION, RD_KAFKA_TXN_STATE_BEGIN_ABORT, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION, RD_KAFKA_TXN_STATE_ABORTABLE_ERROR, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED); if (!error && (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_IN_TRANSACTION || rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTABLE_ERROR)) { /* Transition to ABORTING_TRANSACTION state if no error and * abort not already started. 
*/ rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT); clear_pending = rd_true; } rd_kafka_wrunlock(rk); if (clear_pending) { mtx_lock(&rk->rk_eos.txn_pending_lock); rd_kafka_txn_clear_pending_partitions(rk); mtx_unlock(&rk->rk_eos.txn_pending_lock); } rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } /** * @brief Handler for abort_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_abort_transaction(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_pid_t pid; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); if ((error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_BEGIN_ABORT, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED))) goto done; if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED) { /* A previous call to abort_transaction() timed out but * the aborting completed since then, we still need to wait * for the application to call abort_transaction() again * to synchronize state, and it just did. */ goto done; } else if (rk->rk_eos.txn_state == RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION) { /* A previous call to abort_transaction() timed out but * the abort is still in progress, we still need to wait * for the application to call abort_transaction() again * to synchronize state, and it just did. 
*/ rd_kafka_wrunlock(rk); return RD_KAFKA_OP_RES_HANDLED; } if (!rk->rk_eos.txn_req_cnt) { rd_kafka_dbg(rk, EOS, "TXNABORT", "No partitions registered: not sending EndTxn"); rd_kafka_wrunlock(rk); rd_kafka_txn_endtxn_complete(rk); return RD_KAFKA_OP_RES_HANDLED; } /* If the underlying idempotent producer's state indicates it * is re-acquiring its PID we need to wait for that to finish * before allowing a new begin_transaction(), and since that is * not a blocking call we need to perform that wait in this * state instead. * To recover we need to request an epoch bump from the * transaction coordinator. This is handled automatically * by the idempotent producer, so we just need to wait for * the new pid to be assigned. */ if (rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_ASSIGNED && rk->rk_eos.idemp_state != RD_KAFKA_IDEMP_STATE_WAIT_TXN_ABORT) { rd_kafka_dbg(rk, EOS, "TXNABORT", "Waiting for transaction coordinator " "PID bump to complete before aborting " "transaction (idempotent producer state %s)", rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); rd_kafka_wrunlock(rk); return RD_KAFKA_OP_RES_HANDLED; } pid = rd_kafka_idemp_get_pid0(rk, RD_DONT_LOCK, rd_true); if (!rd_kafka_pid_valid(pid)) { rd_dassert(!*"BUG: No PID despite proper transaction state"); error = rd_kafka_error_new_retriable( RD_KAFKA_RESP_ERR__STATE, "No PID available (idempotence state %s)", rd_kafka_idemp_state2str(rk->rk_eos.idemp_state)); goto done; } err = rd_kafka_EndTxnRequest( rk->rk_eos.txn_coord, rk->rk_conf.eos.transactional_id, pid, rd_false /* abort */, errstr, sizeof(errstr), RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_EndTxn, NULL); if (err) { error = rd_kafka_error_new_retriable(err, "%s", errstr); goto done; } rd_kafka_txn_set_state(rk, RD_KAFKA_TXN_STATE_ABORTING_TRANSACTION); rd_kafka_wrunlock(rk); return RD_KAFKA_OP_RES_HANDLED; done: rd_kafka_wrunlock(rk); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } /** * @brief Handler for last 
ack of abort_transaction() * * @locks none * @locality rdkafka main thread */ static rd_kafka_op_res_t rd_kafka_txn_op_abort_transaction_ack(rd_kafka_t *rk, rd_kafka_q_t *rkq, rd_kafka_op_t *rko) { rd_kafka_error_t *error; if (rko->rko_err == RD_KAFKA_RESP_ERR__DESTROY) return RD_KAFKA_OP_RES_HANDLED; rd_kafka_wrlock(rk); if (!(error = rd_kafka_txn_require_state( rk, RD_KAFKA_TXN_STATE_ABORT_NOT_ACKED))) { rd_kafka_dbg(rk, EOS, "TXNABORT", "Aborted transaction now acked by application"); rd_kafka_txn_complete(rk, rd_false /*is abort*/); } rd_kafka_wrunlock(rk); rd_kafka_txn_curr_api_set_result(rk, 0, error); return RD_KAFKA_OP_RES_HANDLED; } rd_kafka_error_t *rd_kafka_abort_transaction(rd_kafka_t *rk, int timeout_ms) { rd_kafka_error_t *error; rd_kafka_resp_err_t err; rd_ts_t abs_timeout; if ((error = rd_kafka_txn_curr_api_begin(rk, "abort_transaction", rd_false /* no cap */, timeout_ms, &abs_timeout))) return error; /* The abort is multi-phase: * - set state to BEGIN_ABORT * - flush() outstanding messages * - send EndTxn */ /* Begin abort */ if ((error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_begin_abort, abs_timeout))) return rd_kafka_txn_curr_api_return(rk, /* not resumable yet */ rd_false, error); rd_kafka_dbg(rk, EOS, "TXNABORT", "Purging and flushing %d outstanding message(s) prior " "to abort", rd_kafka_outq_len(rk)); /* Purge all queued messages. * Will need to wait for messages in-flight since purging these * messages may lead to gaps in the idempotent producer sequences. */ err = rd_kafka_purge(rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_ABORT_TXN); /* Serve delivery reports for the purged messages. 
*/ if ((err = rd_kafka_flush(rk, rd_timeout_remains(abs_timeout)))) { /* FIXME: Not sure these errors matter that much */ if (err == RD_KAFKA_RESP_ERR__TIMED_OUT) error = rd_kafka_error_new_retriable( err, "Failed to flush all outstanding messages " "within the API timeout: " "%d message(s) remaining%s", rd_kafka_outq_len(rk), (rk->rk_conf.enabled_events & RD_KAFKA_EVENT_DR) ? ": the event queue must be polled " "for delivery report events in a separate " "thread or prior to calling abort" : ""); else error = rd_kafka_error_new_retriable( err, "Failed to flush outstanding messages: %s", rd_kafka_err2str(err)); /* The abort operation is in progress in the background * and the application will need to call this API again * to resume. */ return rd_kafka_txn_curr_api_return(rk, rd_true, error); } rd_kafka_dbg(rk, EOS, "TXNCOMMIT", "Transaction abort message purge and flush complete"); error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_abort_transaction, abs_timeout); if (error) return rd_kafka_txn_curr_api_return(rk, rd_true, error); /* Last call is to transition from ABORT_NOT_ACKED to READY. */ error = rd_kafka_txn_op_req(rk, rd_kafka_txn_op_abort_transaction_ack, /* Timeout must be infinite since this is * a synchronization point. * The call is immediate though, so this * will not block. */ RD_POLL_INFINITE); return rd_kafka_txn_curr_api_return(rk, /* not resumable at this point */ rd_false, error); } /** * @brief Coordinator query timer * * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_coord_timer_cb(rd_kafka_timers_t *rkts, void *arg) { rd_kafka_t *rk = arg; rd_kafka_wrlock(rk); rd_kafka_txn_coord_query(rk, "Coordinator query timer"); rd_kafka_wrunlock(rk); } /** * @brief Start coord query timer if not already started. 
* * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_coord_timer_start(rd_kafka_t *rk, int timeout_ms) { rd_assert(rd_kafka_is_transactional(rk)); rd_kafka_timer_start_oneshot(&rk->rk_timers, &rk->rk_eos.txn_coord_tmr, /* don't restart if already started */ rd_false, 1000 * timeout_ms, rd_kafka_txn_coord_timer_cb, rk); } /** * @brief Parses and handles a FindCoordinator response. * * @locality rdkafka main thread * @locks none */ static void rd_kafka_txn_handle_FindCoordinator(rd_kafka_t *rk, rd_kafka_broker_t *rkb, rd_kafka_resp_err_t err, rd_kafka_buf_t *rkbuf, rd_kafka_buf_t *request, void *opaque) { const int log_decode_errors = LOG_ERR; int16_t ErrorCode; rd_kafkap_str_t Host; int32_t NodeId, Port; char errstr[512]; *errstr = '\0'; rk->rk_eos.txn_wait_coord = rd_false; if (err) goto err; if (request->rkbuf_reqhdr.ApiVersion >= 1) rd_kafka_buf_read_throttle_time(rkbuf); rd_kafka_buf_read_i16(rkbuf, &ErrorCode); if (request->rkbuf_reqhdr.ApiVersion >= 1) { rd_kafkap_str_t ErrorMsg; rd_kafka_buf_read_str(rkbuf, &ErrorMsg); if (ErrorCode) rd_snprintf(errstr, sizeof(errstr), "%.*s", RD_KAFKAP_STR_PR(&ErrorMsg)); } if ((err = ErrorCode)) goto err; rd_kafka_buf_read_i32(rkbuf, &NodeId); rd_kafka_buf_read_str(rkbuf, &Host); rd_kafka_buf_read_i32(rkbuf, &Port); rd_rkb_dbg(rkb, EOS, "TXNCOORD", "FindCoordinator response: " "Transaction coordinator is broker %" PRId32 " (%.*s:%d)", NodeId, RD_KAFKAP_STR_PR(&Host), (int)Port); rd_kafka_rdlock(rk); if (NodeId == -1) err = RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE; else if (!(rkb = rd_kafka_broker_find_by_nodeid(rk, NodeId))) { rd_snprintf(errstr, sizeof(errstr), "Transaction coordinator %" PRId32 " is unknown", NodeId); err = RD_KAFKA_RESP_ERR__UNKNOWN_BROKER; } rd_kafka_rdunlock(rk); if (err) goto err; rd_kafka_wrlock(rk); rd_kafka_txn_coord_set(rk, rkb, "FindCoordinator response"); rd_kafka_wrunlock(rk); rd_kafka_broker_destroy(rkb); return; err_parse: err = rkbuf->rkbuf_err; err: switch (err) { 
case RD_KAFKA_RESP_ERR__DESTROY: return; case RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED: case RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED: rd_kafka_wrlock(rk); rd_kafka_txn_set_fatal_error( rkb->rkb_rk, RD_DONT_LOCK, err, "Failed to find transaction coordinator: %s: %s%s%s", rd_kafka_broker_name(rkb), rd_kafka_err2str(err), *errstr ? ": " : "", errstr); rd_kafka_idemp_set_state(rk, RD_KAFKA_IDEMP_STATE_FATAL_ERROR); rd_kafka_wrunlock(rk); return; case RD_KAFKA_RESP_ERR__UNKNOWN_BROKER: rd_kafka_metadata_refresh_brokers(rk, NULL, errstr); break; default: break; } rd_kafka_wrlock(rk); rd_kafka_txn_coord_set( rk, NULL, "Failed to find transaction coordinator: %s: %s", rd_kafka_err2name(err), *errstr ? errstr : rd_kafka_err2str(err)); rd_kafka_wrunlock(rk); } /** * @brief Query for the transaction coordinator. * * @returns true if a fatal error was raised, else false. * * @locality rdkafka main thread * @locks rd_kafka_wrlock(rk) MUST be held. */ rd_bool_t rd_kafka_txn_coord_query(rd_kafka_t *rk, const char *reason) { rd_kafka_resp_err_t err; char errstr[512]; rd_kafka_broker_t *rkb; rd_assert(rd_kafka_is_transactional(rk)); if (rk->rk_eos.txn_wait_coord) { rd_kafka_dbg(rk, EOS, "TXNCOORD", "Not sending coordinator query (%s): " "waiting for previous query to finish", reason); return rd_false; } /* Find usable broker to query for the txn coordinator */ rkb = rd_kafka_idemp_broker_any(rk, &err, errstr, sizeof(errstr)); if (!rkb) { rd_kafka_dbg(rk, EOS, "TXNCOORD", "Unable to query for transaction coordinator: " "%s: %s", reason, errstr); if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) return rd_true; rd_kafka_txn_coord_timer_start(rk, 500); return rd_false; } rd_kafka_dbg(rk, EOS, "TXNCOORD", "Querying for transaction coordinator: %s", reason); /* Send FindCoordinator request */ err = rd_kafka_FindCoordinatorRequest( rkb, RD_KAFKA_COORD_TXN, rk->rk_conf.eos.transactional_id, RD_KAFKA_REPLYQ(rk->rk_ops, 0), rd_kafka_txn_handle_FindCoordinator, 
NULL); if (err) { rd_snprintf(errstr, sizeof(errstr), "Failed to send coordinator query to %s: " "%s", rd_kafka_broker_name(rkb), rd_kafka_err2str(err)); rd_kafka_broker_destroy(rkb); if (rd_kafka_idemp_check_error(rk, err, errstr, rd_false)) return rd_true; /* Fatal error */ rd_kafka_txn_coord_timer_start(rk, 500); return rd_false; } rd_kafka_broker_destroy(rkb); rk->rk_eos.txn_wait_coord = rd_true; return rd_false; } /** * @brief Sets or clears the current coordinator address. * * @returns true if the coordinator was changed, else false. * * @locality rdkafka main thread * @locks rd_kafka_wrlock(rk) MUST be held */ rd_bool_t rd_kafka_txn_coord_set(rd_kafka_t *rk, rd_kafka_broker_t *rkb, const char *fmt, ...) { char buf[256]; va_list ap; va_start(ap, fmt); vsnprintf(buf, sizeof(buf), fmt, ap); va_end(ap); if (rk->rk_eos.txn_curr_coord == rkb) { if (!rkb) { rd_kafka_dbg(rk, EOS, "TXNCOORD", "%s", buf); /* Keep querying for the coordinator */ rd_kafka_txn_coord_timer_start(rk, 500); } return rd_false; } rd_kafka_dbg(rk, EOS, "TXNCOORD", "Transaction coordinator changed from %s -> %s: %s", rk->rk_eos.txn_curr_coord ? rd_kafka_broker_name(rk->rk_eos.txn_curr_coord) : "(none)", rkb ? rd_kafka_broker_name(rkb) : "(none)", buf); if (rk->rk_eos.txn_curr_coord) rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord); rk->rk_eos.txn_curr_coord = rkb; if (rkb) rd_kafka_broker_keep(rkb); rd_kafka_broker_set_nodename(rk->rk_eos.txn_coord, rk->rk_eos.txn_curr_coord); if (!rkb) { /* Lost the current coordinator, query for new coordinator */ rd_kafka_txn_coord_timer_start(rk, 500); } else { /* Trigger PID state machine */ rd_kafka_idemp_pid_fsm(rk); } return rd_true; } /** * @brief Coordinator state monitor callback. 
* * @locality rdkafka main thread * @locks none */ void rd_kafka_txn_coord_monitor_cb(rd_kafka_broker_t *rkb) { rd_kafka_t *rk = rkb->rkb_rk; rd_kafka_broker_state_t state = rd_kafka_broker_get_state(rkb); rd_bool_t is_up; rd_assert(rk->rk_eos.txn_coord == rkb); is_up = rd_kafka_broker_state_is_up(state); rd_rkb_dbg(rkb, EOS, "COORD", "Transaction coordinator is now %s", is_up ? "up" : "down"); if (!is_up) { /* Coordinator is down, the connection will be re-established * automatically, but we also trigger a coordinator query * to pick up on coordinator change. */ rd_kafka_txn_coord_timer_start(rk, 500); } else { /* Coordinator is up. */ rd_kafka_wrlock(rk); if (rk->rk_eos.idemp_state < RD_KAFKA_IDEMP_STATE_ASSIGNED) { /* See if a idempotence state change is warranted. */ rd_kafka_idemp_pid_fsm(rk); } else if (rk->rk_eos.idemp_state == RD_KAFKA_IDEMP_STATE_ASSIGNED) { /* PID is already valid, continue transactional * operations by checking for partitions to register */ rd_kafka_txn_schedule_register_partitions(rk, 1 /*ASAP*/); } rd_kafka_wrunlock(rk); } } /** * @brief Transactions manager destructor * * @locality rdkafka main thread * @locks none */ void rd_kafka_txns_term(rd_kafka_t *rk) { RD_IF_FREE(rk->rk_eos.txn_errstr, rd_free); RD_IF_FREE(rk->rk_eos.txn_curr_api.error, rd_kafka_error_destroy); mtx_destroy(&rk->rk_eos.txn_curr_api.lock); cnd_destroy(&rk->rk_eos.txn_curr_api.cnd); rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.txn_coord_tmr, 1); rd_kafka_timer_stop(&rk->rk_timers, &rk->rk_eos.txn_register_parts_tmr, 1); if (rk->rk_eos.txn_curr_coord) rd_kafka_broker_destroy(rk->rk_eos.txn_curr_coord); /* Logical coordinator */ rd_kafka_broker_persistent_connection_del( rk->rk_eos.txn_coord, &rk->rk_eos.txn_coord->rkb_persistconn.coord); rd_kafka_broker_monitor_del(&rk->rk_eos.txn_coord_mon); rd_kafka_broker_destroy(rk->rk_eos.txn_coord); rk->rk_eos.txn_coord = NULL; mtx_lock(&rk->rk_eos.txn_pending_lock); rd_kafka_txn_clear_pending_partitions(rk); 
mtx_unlock(&rk->rk_eos.txn_pending_lock); mtx_destroy(&rk->rk_eos.txn_pending_lock); rd_kafka_txn_clear_partitions(rk); } /** * @brief Initialize transactions manager. * * @locality application thread * @locks none */ void rd_kafka_txns_init(rd_kafka_t *rk) { rd_atomic32_init(&rk->rk_eos.txn_may_enq, 0); mtx_init(&rk->rk_eos.txn_pending_lock, mtx_plain); TAILQ_INIT(&rk->rk_eos.txn_pending_rktps); TAILQ_INIT(&rk->rk_eos.txn_waitresp_rktps); TAILQ_INIT(&rk->rk_eos.txn_rktps); mtx_init(&rk->rk_eos.txn_curr_api.lock, mtx_plain); cnd_init(&rk->rk_eos.txn_curr_api.cnd); /* Logical coordinator */ rk->rk_eos.txn_coord = rd_kafka_broker_add_logical(rk, "TxnCoordinator"); rd_kafka_broker_monitor_add(&rk->rk_eos.txn_coord_mon, rk->rk_eos.txn_coord, rk->rk_ops, rd_kafka_txn_coord_monitor_cb); rd_kafka_broker_persistent_connection_add( rk->rk_eos.txn_coord, &rk->rk_eos.txn_coord->rkb_persistconn.coord); rd_atomic64_init(&rk->rk_eos.txn_dr_fails, 0); }