/*
 * librdkafka - Apache Kafka C library
 *
 * Copyright (c) 2019, Magnus Edenhill
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "test.h"

#include "rdkafka.h"

#include "../src/rdkafka_proto.h"
#include "../src/rdstring.h"
#include "../src/rdunittest.h"

#include <stdarg.h>


/**
 * @name Producer transaction tests using the mock cluster
 *
 */


static int allowed_error;

/**
 * @brief Decide which error_cb's will cause the test to fail.
 */
static int error_is_fatal_cb(rd_kafka_t *rk,
                             rd_kafka_resp_err_t err,
                             const char *reason) {
        if (err == allowed_error ||
            /* If transport errors are allowed then it is likely
             * that we'll also see ALL_BROKERS_DOWN. */
            (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
             err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
                TEST_SAY("Ignoring allowed error: %s: %s\n",
                         rd_kafka_err2name(err), reason);
                return 0;
        }
        return 1;
}


static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
                                                      int sockfd,
                                                      const char *brokername,
                                                      int32_t brokerid,
                                                      int16_t ApiKey,
                                                      int16_t ApiVersion,
                                                      int32_t CorrId,
                                                      size_t size,
                                                      int64_t rtt,
                                                      rd_kafka_resp_err_t err,
                                                      void *ic_opaque);

/**
 * @brief Simple on_response_received interceptor that calls the
 *        sub-test's on_response_received_cb function, if set.
 */
static rd_kafka_resp_err_t
on_response_received_trampoline(rd_kafka_t *rk,
                                int sockfd,
                                const char *brokername,
                                int32_t brokerid,
                                int16_t ApiKey,
                                int16_t ApiVersion,
                                int32_t CorrId,
                                size_t size,
                                int64_t rtt,
                                rd_kafka_resp_err_t err,
                                void *ic_opaque) {
        TEST_ASSERT(on_response_received_cb != NULL, "");
        return on_response_received_cb(rk, sockfd, brokername, brokerid,
                                       ApiKey, ApiVersion, CorrId, size, rtt,
                                       err, ic_opaque);
}


/**
 * @brief on_new interceptor to add an on_response_received interceptor.
 */
static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
                                           const rd_kafka_conf_t *conf,
                                           void *ic_opaque,
                                           char *errstr,
                                           size_t errstr_size) {
        rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

        if (on_response_received_cb)
                err = rd_kafka_interceptor_add_on_response_received(
                    rk, "on_response_received",
                    on_response_received_trampoline, ic_opaque);

        return err;
}


/**
 * @brief Create a transactional producer and a mock cluster.
 *
 * The var-arg list is a NULL-terminated list of
 * (const char *key, const char *value) config properties.
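 *
 * Illustrative usage (a sketch; the txn id, broker count and property
 * values are arbitrary test inputs, not requirements):
 * @code
 *   rk = create_txn_producer(&mcluster, "myTxnId", 3,
 *                            "batch.num.messages", "1", NULL);
 * @endcode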
 *
 * Special keys:
 *   "on_response_received", ""  - enable the on_response_received_cb
 *                                 interceptor,
 *                                 which must be assigned prior to
 *                                 calling create_txn_producer().
 */
static RD_SENTINEL rd_kafka_t *
create_txn_producer(rd_kafka_mock_cluster_t **mclusterp,
                    const char *transactional_id,
                    int broker_cnt,
                    ...) {
        rd_kafka_conf_t *conf;
        rd_kafka_t *rk;
        char numstr[8];
        va_list ap;
        const char *key;
        rd_bool_t add_interceptors = rd_false;

        rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);

        test_conf_init(&conf, NULL, 60);

        test_conf_set(conf, "transactional.id", transactional_id);
        /* When mock brokers are set to down state they still bind the port,
         * they just don't listen on it, which makes connection attempts
         * stall until socket.connection.setup.timeout.ms expires.
         * To speed up detection of brokers being down we reduce this timeout
         * to just a couple of seconds. */
        test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");
        /* Speed up reconnects */
        test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
        test_conf_set(conf, "test.mock.num.brokers", numstr);
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        test_curr->ignore_dr_err = rd_false;

        va_start(ap, broker_cnt);
        while ((key = va_arg(ap, const char *))) {
                if (!strcmp(key, "on_response_received")) {
                        add_interceptors = rd_true;
                        (void)va_arg(ap, const char *);
                } else {
                        test_conf_set(conf, key, va_arg(ap, const char *));
                }
        }
        va_end(ap);

        /* Add on_.. interceptors */
        if (add_interceptors)
                rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
                                                     on_new_producer, NULL);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        if (mclusterp) {
                *mclusterp = rd_kafka_handle_mock_cluster(rk);
                TEST_ASSERT(*mclusterp, "failed to create mock cluster");

                /* Create some of the common consumer "input" topics
                 * that we must be able to commit to with
                 * send_offsets_to_transaction().
                 * The number in the topic name denotes the number of
                 * partitions in the topic. */
                TEST_CALL_ERR__(
                    rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
                TEST_CALL_ERR__(rd_kafka_mock_topic_create(
                    *mclusterp, "srctopic64", 64, 1));
        }

        return rk;
}


/**
 * @brief Test recoverable errors using mock broker error injections
 *        and code coverage checks.
 */
static void do_test_txn_recoverable_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *groupid = "myGroupId";
        const char *txnid   = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages",
                                 "1", NULL);

        /* Make sure transaction and group coordinators are different.
         * This verifies that TxnOffsetCommitRequest is sent to the
         * group coordinator and not the transaction coordinator.
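         * (AddOffsetsToTxn and EndTxn go to the transaction coordinator,
         * while TxnOffsetCommit goes to the group coordinator; pinning
         * them to different brokers below makes any misrouting visible.)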
         */
        rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);

        /*
         * Inject some InitProducerId errors that cause retries
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_InitProducerId, 3,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
        (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        rd_kafka_flush(rk, -1);

        /*
         * Produce a message, let it fail with a non-idempo/non-txn
         * retryable error
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Make sure messages are produced */
        rd_kafka_flush(rk, -1);

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 39)->offset =
            999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
            999;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 19)->offset =
            123456789;

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /*
         * Commit transaction, first with some failures, then succeed.
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_EndTxn, 3,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
            RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors and that the producer can recover.
 */
static void do_test_txn_fatal_idempo_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages",
                                 "1", NULL);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Produce a message, let it fail with a fatal idempo error.
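         * UNKNOWN_PRODUCER_ID is fatal to a plain idempotent producer;
         * per KIP-360 the transactional producer should instead surface
         * an abortable transaction error and recover via an epoch bump.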
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Commit the transaction, should fail */
        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID take longer than the remaining transaction timeout,
 *        which should raise a retriable error from abort_transaction().
 *
 * @param with_sleep After the first abort, sleep longer than it takes to
 *                   re-init the pid so that the internal state automatically
 *                   transitions.
 */
static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;

        SUB_TEST("%s sleep", with_sleep ? "with" : "without");

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages",
                                 "1", NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Set transaction coordinator latency higher than
         * the abort_transaction() call timeout so that the automatic
         * re-initpid takes longer than abort_transaction(). */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 10000 /*10s*/);

        /* Produce a message, let it fail with a fatal idempo error.
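         * The recovery path (epoch bump through InitProducerId) has been
         * slowed to 10s above, while the first abort_transaction() below
         * is given only a 100 ms timeout, so it should return a retriable
         * timeout error rather than complete the abort.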
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Commit the transaction, should fail */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction, should fail with retriable (timeout) error */
        TIMING_START(&timing, "abort_transaction(100)");
        error = rd_kafka_abort_transaction(rk, 100);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("First abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_is_retriable(error),
                    "Expected retriable error");
        rd_kafka_error_destroy(error);

        if (with_sleep)
                rd_sleep(12);

        /* Retry abort, should now finish. */
        TEST_SAY("Retrying abort\n");
        TIMING_START(&timing, "abort_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
        TIMING_STOP(&timing);

        /* Run a new transaction without errors to verify that the
         * producer can recover. */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief KIP-360: Test that fatal idempotence errors trigger abortable
 *        transaction errors, but let the broker-side bumping of the
 *        producer PID fail with a fencing error.
 *        Should raise a fatal error.
 *
 * @param error_code Which error code InitProducerIdRequest should fail with.
 *                   Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
 *                   or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
 */
static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 2;
        const char *txnid = "myTxnId";
        char errstr[512];
        rd_kafka_resp_err_t fatal_err;

        SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages",
                                 "1", NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = error_is_fatal_cb;
        allowed_error            = RD_KAFKA_RESP_ERR__FENCED;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Produce a message without error first */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Fail the PID reinit */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);

        /* Produce a message, let it fail with a fatal idempo error.
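         * A fencing error on the epoch bump means another producer instance
         * with the same transactional.id has taken over, so recovery is not
         * possible and the error must be raised as fatal.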
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        test_flush(rk, -1);

        /* Abort the transaction, should fail with a fatal error */
        error = rd_kafka_abort_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");

        TEST_SAY("abort_transaction() failed: %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
        rd_kafka_error_destroy(error);

        fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
        TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
        TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err),
                 errstr);

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;

        SUB_TEST_PASS();
}


/**
 * @brief Test EndTxn errors.
 */
static void do_test_txn_endtxn_errors(void) {
        rd_kafka_t *rk                    = NULL;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        rd_kafka_resp_err_t err;
        struct {
                size_t error_cnt;
                rd_kafka_resp_err_t errors[4];
                rd_kafka_resp_err_t exp_err;
                rd_bool_t exp_retriable;
                rd_bool_t exp_abortable;
                rd_bool_t exp_fatal;
                rd_bool_t exp_successful_abort;
        } scenario[] = {
            /* This list of errors is from the EndTxnResponse handler in
             * AK clients/.../TransactionManager.java */
            {
                /* #0 */
                2,
                {RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                 RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #1 */
                2,
                {RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                 RD_KAFKA_RESP_ERR_NOT_COORDINATOR},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #2 */
                1,
                {RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #3 */
                3,
                {RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                 RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS},
                /* Should auto-recover */
                RD_KAFKA_RESP_ERR_NO_ERROR,
            },
            {
                /* #4: the abort is auto-recovering thru epoch bump */
                1,
                {RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID},
                RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
                rd_true /* successful abort */
            },
            {
                /* #5: the abort is auto-recovering thru epoch bump */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING},
                RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
                rd_true /* successful abort */
            },
            {
                /* #6 */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #7 */
                1,
                {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #8 */
                1,
                {RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED},
                RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {
                /* #9 */
                1,
                {RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED},
                RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */
            },
            {
                /* #10 */
                /* Any other error should raise an abortable error */
                1,
                {RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE},
                RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
                rd_false /* !retriable */,
                rd_true /* abortable */,
                rd_false /* !fatal */,
            },
            {
                /* #11 */
                1,
                {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
                /* This error is normalized */
                RD_KAFKA_RESP_ERR__FENCED,
                rd_false /* !retriable */,
                rd_false /* !abortable */,
                rd_true /* fatal */
            },
            {0},
        };
        int i;

        SUB_TEST_QUICK();

        for (i = 0; scenario[i].error_cnt > 0; i++) {
                int j;
                /* For each scenario, test:
                 *   commit_transaction()
                 *   flush() + commit_transaction()
                 *   abort_transaction()
                 *   flush() + abort_transaction()
                 */
                for (j = 0; j < (2 + 2); j++) {
                        rd_bool_t commit     = j < 2;
                        rd_bool_t with_flush = j & 1;
                        rd_bool_t exp_successful_abort =
                            !commit && scenario[i].exp_successful_abort;
                        const char *commit_str =
                            commit ? (with_flush ? "commit&flush" : "commit")
                                   : (with_flush ? "abort&flush" : "abort");
                        rd_kafka_topic_partition_list_t *offsets;
                        rd_kafka_consumer_group_metadata_t *cgmetadata;
                        rd_kafka_error_t *error;
                        test_timing_t t_call;

                        TEST_SAY("Testing scenario #%d %s with %" PRIusz
                                 " injected errors, expecting %s\n",
                                 i, commit_str, scenario[i].error_cnt,
                                 exp_successful_abort
                                     ? "successful abort"
                                     : rd_kafka_err2name(scenario[i].exp_err));

                        if (!rk) {
                                const char *txnid = "myTxnId";
                                rk = create_txn_producer(&mcluster, txnid, 3,
                                                         NULL);
                                TEST_CALL_ERROR__(
                                    rd_kafka_init_transactions(rk, 5000));
                        }

                        /*
                         * Start transaction
                         */
                        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                        /* Transaction aborts will cause DR errors:
                         * ignore them. */
                        test_curr->ignore_dr_err = !commit;

                        /*
                         * Produce a message.
                         */
                        err = rd_kafka_producev(rk,
                                                RD_KAFKA_V_TOPIC("mytopic"),
                                                RD_KAFKA_V_VALUE("hi", 2),
                                                RD_KAFKA_V_END);
                        TEST_ASSERT(!err, "produce failed: %s",
                                    rd_kafka_err2str(err));

                        if (with_flush)
                                test_flush(rk, -1);

                        /*
                         * Send some arbitrary offsets.
                         */
                        offsets = rd_kafka_topic_partition_list_new(4);
                        rd_kafka_topic_partition_list_add(offsets, "srctopic4",
                                                          3)
                            ->offset = 12;
                        rd_kafka_topic_partition_list_add(offsets,
                                                          "srctopic64", 60)
                            ->offset = 99999;

                        cgmetadata =
                            rd_kafka_consumer_group_metadata_new("mygroupid");

                        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
                            rk, offsets, cgmetadata, -1));

                        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
                        rd_kafka_topic_partition_list_destroy(offsets);

                        /*
                         * Commit transaction, first with some failures,
                         * then succeed.
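                         * (The scenario's error sequence below is consumed
                         * by successive EndTxn attempts; whether the call
                         * ultimately succeeds is governed by the scenario's
                         * expectation flags.)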
                         */
                        rd_kafka_mock_push_request_errors_array(
                            mcluster, RD_KAFKAP_EndTxn, scenario[i].error_cnt,
                            scenario[i].errors);

                        TIMING_START(&t_call, "%s", commit_str);
                        if (commit)
                                error = rd_kafka_commit_transaction(
                                    rk, tmout_multip(5000));
                        else
                                error = rd_kafka_abort_transaction(
                                    rk, tmout_multip(5000));
                        TIMING_STOP(&t_call);

                        if (error)
                                TEST_SAY(
                                    "Scenario #%d %s failed: %s: %s "
                                    "(retriable=%s, req_abort=%s, "
                                    "fatal=%s)\n",
                                    i, commit_str, rd_kafka_error_name(error),
                                    rd_kafka_error_string(error),
                                    RD_STR_ToF(
                                        rd_kafka_error_is_retriable(error)),
                                    RD_STR_ToF(
                                        rd_kafka_error_txn_requires_abort(
                                            error)),
                                    RD_STR_ToF(
                                        rd_kafka_error_is_fatal(error)));
                        else
                                TEST_SAY("Scenario #%d %s succeeded\n", i,
                                         commit_str);

                        if (!scenario[i].exp_err || exp_successful_abort) {
                                TEST_ASSERT(!error,
                                            "Expected #%d %s to succeed, "
                                            "got %s",
                                            i, commit_str,
                                            rd_kafka_error_string(error));
                                continue;
                        }


                        TEST_ASSERT(error != NULL, "Expected #%d %s to fail",
                                    i, commit_str);
                        TEST_ASSERT(scenario[i].exp_err ==
                                        rd_kafka_error_code(error),
                                    "Scenario #%d: expected %s, not %s", i,
                                    rd_kafka_err2name(scenario[i].exp_err),
                                    rd_kafka_error_name(error));
                        TEST_ASSERT(
                            scenario[i].exp_retriable ==
                                (rd_bool_t)rd_kafka_error_is_retriable(error),
                            "Scenario #%d: retriable mismatch", i);
                        TEST_ASSERT(
                            scenario[i].exp_abortable ==
                                (rd_bool_t)rd_kafka_error_txn_requires_abort(
                                    error),
                            "Scenario #%d: abortable mismatch", i);
                        TEST_ASSERT(
                            scenario[i].exp_fatal ==
                                (rd_bool_t)rd_kafka_error_is_fatal(error),
                            "Scenario #%d: fatal mismatch", i);

                        /* Handle errors according to the error flags */
                        if (rd_kafka_error_is_fatal(error)) {
                                TEST_SAY("Fatal error, destroying producer\n");
                                rd_kafka_error_destroy(error);
                                rd_kafka_destroy(rk);
                                rk = NULL; /* Will be re-created on the next
                                            * loop iteration. */

                        } else if (rd_kafka_error_txn_requires_abort(error)) {
                                rd_kafka_error_destroy(error);
                                TEST_SAY(
                                    "Abortable error, "
                                    "aborting transaction\n");
                                TEST_CALL_ERROR__(
                                    rd_kafka_abort_transaction(rk, -1));

                        } else if (rd_kafka_error_is_retriable(error)) {
                                rd_kafka_error_destroy(error);
                                TEST_SAY("Retriable error, retrying %s once\n",
                                         commit_str);
                                if (commit)
                                        TEST_CALL_ERROR__(
                                            rd_kafka_commit_transaction(
                                                rk, 5000));
                                else
                                        TEST_CALL_ERROR__(
                                            rd_kafka_abort_transaction(
                                                rk, 5000));
                        } else {
                                TEST_FAIL(
                                    "Scenario #%d %s: "
                                    "Permanent error without enough "
                                    "hints to proceed: %s\n",
                                    i, commit_str,
                                    rd_kafka_error_string(error));
                        }
                }
        }

        /* All done */
        if (rk)
                rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test that commit/abort work properly with an infinite timeout.
 */
static void do_test_txn_endtxn_infinite(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"),
                    RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with some retriable
                 * failures, then success.
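                 * (With the infinite (-1) timeout used below, the call is
                 * expected to block through all ten injected errors and
                 * then succeed.)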
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_EndTxn, 10,
                    RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, -1);
                else
                        error = rd_kafka_abort_transaction(rk, -1);
                TIMING_STOP(&t_call);

                TEST_SAY("%s returned %s\n", commit_str,
                         error ? rd_kafka_error_string(error) : "success");

                TEST_ASSERT(!error, "Expected %s to succeed, got %s",
                            commit_str, rd_kafka_error_string(error));
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test that the commit/abort user timeout is honoured.
 */
static void do_test_txn_endtxn_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int i;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"),
                    RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

                /*
                 * Commit/abort transaction, first with some retriable
                 * failures whose retries exceed the user timeout.
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_EndTxn, 10,
                    RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
                    RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, 100);
                else
                        error = rd_kafka_abort_transaction(rk, 100);
                TIMING_STOP(&t_call);

                TEST_SAY_ERROR(error, "%s returned: ", commit_str);
                TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);

                TEST_ASSERT(
                    rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected %s to fail with timeout, not %s: %s", commit_str,
                    rd_kafka_error_name(error), rd_kafka_error_string(error));

                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "%s failure should raise a retriable error",
                            commit_str);
                rd_kafka_error_destroy(error);

                /* Now call it again with an infinite timeout, should work. */
                TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
                if (commit)
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                else
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                TIMING_STOP(&t_call);
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test commit/abort inflight timeout behaviour, which should result
 *        in a retriable error.
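 *
 *        "Inflight" here means the EndTxn request has already been sent
 *        but its response is mock-delayed (10s below) past the call
 *        timeout, as opposed to the error-injection retries tested above.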
 */
static void do_test_txn_endtxn_timeout_inflight(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster = NULL;
        const char *txnid                 = "myTxnId";
        int32_t coord_id                  = 1;
        int i;

        SUB_TEST();

        allowed_error          = RD_KAFKA_RESP_ERR__TIMED_OUT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        rk = create_txn_producer(&mcluster, txnid, 1,
                                 "transaction.timeout.ms", "5000", NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        for (i = 0; i < 2; i++) {
                rd_bool_t commit       = i == 0;
                const char *commit_str = commit ? "commit" : "abort";
                rd_kafka_error_t *error;
                test_timing_t t_call;

                /* Messages will fail as the transaction fails,
                 * ignore the DR error */
                test_curr->ignore_dr_err = rd_true;

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                TEST_CALL_ERR__(rd_kafka_producev(
                    rk, RD_KAFKA_V_TOPIC("mytopic"),
                    RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

                /* Let EndTxn & EndTxn retry timeout */
                rd_kafka_mock_broker_push_request_error_rtts(
                    mcluster, coord_id, RD_KAFKAP_EndTxn, 2,
                    RD_KAFKA_RESP_ERR_NO_ERROR, 10000,
                    RD_KAFKA_RESP_ERR_NO_ERROR, 10000);

                rd_sleep(1);

                TIMING_START(&t_call, "%s_transaction()", commit_str);
                if (commit)
                        error = rd_kafka_commit_transaction(rk, 4000);
                else
                        error = rd_kafka_abort_transaction(rk, 4000);
                TIMING_STOP(&t_call);

                TEST_SAY_ERROR(error, "%s returned: ", commit_str);
                TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);

                TEST_ASSERT(
                    rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
                    "Expected %s to fail with timeout, not %s: %s", commit_str,
                    rd_kafka_error_name(error), rd_kafka_error_string(error));

                TEST_ASSERT(rd_kafka_error_is_retriable(error),
                            "%s failure should raise a retriable error",
                            commit_str);
                rd_kafka_error_destroy(error);

                /* Now call it again with an infinite timeout, should work. */
                TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
                if (commit)
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                else
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                TIMING_STOP(&t_call);
        }

        /* All done */

        rd_kafka_destroy(rk);

        allowed_error          = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief Test that EndTxn is properly sent for aborted transactions
 *        even if AddOffsetsToTxnRequest was retried.
 *        This is a check for a txn_req_cnt bug.
 */
static void do_test_txn_req_cnt(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        const char *txnid = "myTxnId";

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, NULL);

        /* Messages will fail on abort(), ignore the DR error */
        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Send some arbitrary offsets, first with some failures, then
         * succeed.
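         * (If retried AddOffsetsToTxn requests were miscounted in
         * txn_req_cnt, the final EndTxn for the abort could be skipped;
         * this exercises that code path.)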
         */
        offsets = rd_kafka_topic_partition_list_new(2);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 40)->offset =
            999999111;

        rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_AddOffsetsToTxn,
                                          2,
                                          RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                                          RD_KAFKA_RESP_ERR_NOT_COORDINATOR);

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
            RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test abortable errors using mock broker error injections
 *        and code coverage checks.
 */
static void do_test_txn_requires_abort_errors(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;
        int r;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * 1. Fail on produce
         */
        TEST_SAY("1. Fail on produce\n");
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to fail */
        test_flush(rk, 5000);

        /* Any other transactional API should now raise an error */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
                                                     -1);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_ASSERT(error, "expected error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "expected abortable error, not %s",
                    rd_kafka_error_string(error));
        TEST_SAY("Error %s: %s\n", rd_kafka_error_name(error),
                 rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 2. Restart transaction and fail on AddPartitionsToTxn
         */
        TEST_SAY("2. Fail on AddPartitionsToTxn\n");

        /* First refresh proper Metadata to clear the topic's auth error,
         * otherwise the produce() below will fail immediately.
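         * (The refresh happens implicitly through the partition count
         * query that follows, which triggers a Metadata request and,
         * presumably, clears the cached authorization error state.)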
         */
        r = test_get_partition_count(rk, "mytopic", 5000);
        TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
            RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        TEST_SAY("commit_transaction() error %s: %s\n",
                 rd_kafka_error_name(error), rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /*
         * 3. Restart transaction and fail on AddOffsetsToTxn
         */
        TEST_SAY("3. Fail on AddOffsetsToTxn\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddOffsetsToTxn, 1,
            RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
                                                     -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
                    "expected send_offsets_to_transaction() to fail with "
                    "group auth error: not %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        error = rd_kafka_commit_transaction(rk, 5000);
        TEST_ASSERT(error, "commit_transaction should have failed");
        rd_kafka_error_destroy(error);

        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test error handling and recovery for when a broker goes down
 *        during an ongoing transaction.
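 *
 * @param down_coord If true, take down the transaction coordinator broker,
 *                   otherwise the partition leader; in both cases the
 *                   transaction should complete once the broker is back up.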
 */
static void do_test_txn_broker_down_in_txn(rd_bool_t down_coord) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id, down_id;
        const char *down_what;
        rd_kafka_resp_err_t err;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1000;
        int remains                  = 0;

        /* Assign coordinator and leader to two different brokers */
        coord_id  = 1;
        leader_id = 2;
        if (down_coord) {
                down_id   = coord_id;
                down_what = "coordinator";
        } else {
                down_id   = leader_id;
                down_what = "leader";
        }

        SUB_TEST_QUICK("Test %s down", down_what);

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        /* Broker down is not a test-failing error */
        allowed_error          = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt / 2, NULL, 0, &remains);

        TEST_SAY("Bringing down %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_down(mcluster, down_id);

        rd_kafka_flush(rk, 3000);

        /* Produce remaining messages */
        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                  msgcnt / 2, msgcnt / 2, NULL, 0, &remains);

        rd_sleep(2);

        TEST_SAY("Bringing up %s %" PRId32 "\n", down_what, down_id);
        rd_kafka_mock_broker_set_up(mcluster, down_id);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n",
                    remains);

        rd_kafka_destroy(rk);

        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief Advance the coord_id to the next broker.
 */
static void set_next_coord(rd_kafka_mock_cluster_t *mcluster,
                           const char *transactional_id,
                           int broker_cnt,
                           int32_t *coord_idp) {
        int32_t new_coord_id;

        new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
        TEST_SAY("Changing transaction coordinator from %" PRId32
                 " to %" PRId32 "\n",
                 *coord_idp, new_coord_id);
        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, new_coord_id);

        *coord_idp = new_coord_id;
}

/**
 * @brief Switch coordinator during a transaction.
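 *
 *        Each switch should be followed transparently by the producer:
 *        the old coordinator replies NOT_COORDINATOR, which triggers a
 *        new FindCoordinator round against the changed mock mapping.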
 *
 */
static void do_test_txn_switch_coordinator(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        const int broker_cnt         = 5;
        const int iterations         = 20;
        int i;

        test_timeout_set(iterations * 10);

        SUB_TEST("Test switching coordinators");

        rk = create_txn_producer(&mcluster, transactional_id, broker_cnt,
                                 NULL);

        coord_id = 1;
        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < iterations; i++) {
                const int msgcnt = 100;
                int remains      = 0;

                set_next_coord(mcluster, transactional_id, broker_cnt,
                               &coord_id);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                   msgcnt / 2, NULL, 0);

                if (!(i % 3))
                        set_next_coord(mcluster, transactional_id, broker_cnt,
                                       &coord_id);

                /* Produce remaining messages */
                test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
                                          msgcnt / 2, msgcnt / 2, NULL, 0,
                                          &remains);

                if ((i & 1) || !(i % 8))
                        set_next_coord(mcluster, transactional_id, broker_cnt,
                                       &coord_id);

                if (!(i % 5)) {
                        test_curr->ignore_dr_err = rd_false;
                        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

                } else {
                        test_curr->ignore_dr_err = rd_true;
                        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
                }
        }

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Switch coordinator during a transaction when AddOffsetsToTxn
 *        is sent. #3571.
 */
static void do_test_txn_switch_coordinator_refresh(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("Test switching coordinators (refresh)");

        rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, 1);

        /* Start transactioning */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Switch the coordinator so that AddOffsetsToTxnRequest
         * will respond with NOT_COORDINATOR. */
        TEST_SAY("Switching to coordinator 2\n");
        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, 2);

        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 29)->offset =
            99999;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
            rk, offsets, cgmetadata, 20 * 1000));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Produce some messages */
        test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL,
                           0);

        /* And commit the transaction */
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test fatal error handling when transactions are not supported
 *        by the broker.
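 *
 *        Transaction support is inferred from the broker's advertised
 *        ApiVersions: disabling InitProducerId (ApiKey 22) below makes
 *        transactions appear unsupported, so init_transactions() should
 *        fail with __UNSUPPORTED_FEATURE and the producer enters a fatal
 *        state.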
 */
static void do_test_txns_not_supported(void) {
        rd_kafka_t *rk;
        rd_kafka_conf_t *conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        test_conf_init(&conf, NULL, 10);

        test_conf_set(conf, "transactional.id", "myxnid");
        test_conf_set(conf, "bootstrap.servers", ",");
        rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

        rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

        /* Create mock cluster */
        mcluster = rd_kafka_mock_cluster_new(rk, 3);

        /* Disable InitProducerId */
        rd_kafka_mock_set_apiversion(mcluster, 22 /*InitProducerId*/, -1, -1);

        rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));

        error = rd_kafka_init_transactions(rk, 5 * 1000);
        TEST_SAY("init_transactions() returned %s: %s\n",
                 error ? rd_kafka_error_name(error) : "success",
                 error ? rd_kafka_error_string(error) : "success");

        TEST_ASSERT(error, "Expected init_transactions() to fail");
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
                    "Expected init_transactions() to fail with %s, not %s: %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
                    rd_kafka_error_name(error), rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
                                RD_KAFKA_V_KEY("test", 4), RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
                    "Expected producev() to fail with %s, not %s",
                    rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
                    rd_kafka_err2name(err));

        rd_kafka_mock_cluster_destroy(mcluster);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief CONCURRENT_TRANSACTIONS on AddOffsets.. should be retried.
 */
static void do_test_txns_send_offsets_concurrent_is_retried(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);

        /*
         * Have AddOffsetsToTxn fail but eventually succeed due to
         * infinite retries.
         */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_AddOffsetsToTxn,
            1 + 5, /* first request + some retries */
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
            RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that send_offsets_to_transaction() with no eligible offsets
 *        is handled properly - the call should succeed immediately and be
 *        repeatable.
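 *
 *        ("No eligible offsets" here means an empty offset list: with
 *        nothing to commit, the call is expected to short-circuit without
 *        sending AddOffsetsToTxn/TxnOffsetCommit requests.)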
 */
static void do_test_txns_send_offsets_non_eligible(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        /* Wait for messages to be delivered */
        test_flush(rk, 5000);

        /* Empty offsets list */
        offsets = rd_kafka_topic_partition_list_new(0);

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        /* Now call it again, should also succeed. */
        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that request timeouts don't cause a crash (#2913).
 */
static void do_test_txns_no_timeout_crash(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, "socket.timeout.ms",
                                 "1000", "transaction.timeout.ms", "5000",
                                 NULL);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));

        test_flush(rk, -1);

        /* Delay all broker connections */
        if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
            (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
                TEST_FAIL("Failed to set broker RTT: %s",
                          rd_kafka_err2str(err));

        /* send_offsets..() should now time out */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
                                                     -1);
        TEST_ASSERT(error, "Expected send_offsets..() to fail");
        TEST_SAY("send_offsets..() failed with %serror: %s\n",
                 rd_kafka_error_is_retriable(error) ?
"retriable " : "", rd_kafka_error_string(error)); TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT, "expected send_offsets_to_transaction() to fail with " "timeout, not %s", rd_kafka_error_name(error)); TEST_ASSERT(rd_kafka_error_is_retriable(error), "expected send_offsets_to_transaction() to fail with " "a retriable error"); rd_kafka_error_destroy(error); /* Reset delay and try again */ if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) || (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) || (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0))) TEST_FAIL("Failed to reset broker RTT: %s", rd_kafka_err2str(err)); TEST_SAY("Retrying send_offsets..()\n"); error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1); TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s", rd_kafka_error_string(error)); rd_kafka_consumer_group_metadata_destroy(cgmetadata); rd_kafka_topic_partition_list_destroy(offsets); /* All done */ rd_kafka_destroy(rk); SUB_TEST_PASS(); } /** * @brief Test auth failure handling. */ static void do_test_txn_auth_failure(int16_t ApiKey, rd_kafka_resp_err_t ErrorCode) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_error_t *error; SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s", rd_kafka_ApiKey2str(ApiKey), rd_kafka_err2name(ErrorCode)); rk = create_txn_producer(&mcluster, "txnid", 3, NULL); rd_kafka_mock_push_request_errors(mcluster, ApiKey, 1, ErrorCode); error = rd_kafka_init_transactions(rk, 5000); TEST_ASSERT(error, "Expected init_transactions() to fail"); TEST_SAY("init_transactions() failed: %s: %s\n", rd_kafka_err2name(rd_kafka_error_code(error)), rd_kafka_error_string(error)); TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode, "Expected error %s, not %s", rd_kafka_err2name(ErrorCode), rd_kafka_err2name(rd_kafka_error_code(error))); TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected error to be fatal"); TEST_ASSERT(!rd_kafka_error_is_retriable(error), "Expected error to not be retriable"); rd_kafka_error_destroy(error); /* All done */ rd_kafka_destroy(rk); SUB_TEST_PASS(); } /** * @brief Issue #3041: Commit fails due to message flush() taking too long, * eventually resulting in an unabortable error and failure to * re-init the transactional producer. */ static void do_test_txn_flush_timeout(void) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_topic_partition_list_t *offsets; rd_kafka_consumer_group_metadata_t *cgmetadata; rd_kafka_error_t *error; const char *txnid = "myTxnId"; const char *topic = "myTopic"; const int32_t coord_id = 2; int msgcounter = 0; rd_bool_t is_retry = rd_false; SUB_TEST_QUICK(); rk = create_txn_producer(&mcluster, txnid, 3, "message.timeout.ms", "10000", "transaction.timeout.ms", "10000", /* Speed up coordinator reconnect */ "reconnect.backoff.max.ms", "1000", NULL); /* Broker down is not a test-failing error */ test_curr->is_fatal_cb = error_is_fatal_cb; allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT; rd_kafka_mock_topic_create(mcluster, topic, 2, 3); /* Set coordinator so we can disconnect it later */ rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id); /* * Init transactions */ TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000)); retry: if (!is_retry) { /* First attempt should fail. */ test_curr->ignore_dr_err = rd_true; test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; /* Assign invalid partition leaders for some partitions so * that messages will not be delivered. 
                 */
                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
        } else {
                /* The retry should succeed */
                test_curr->ignore_dr_err = rd_false;
                test_curr->exp_dr_err =
                    is_retry ? RD_KAFKA_RESP_ERR_NO_ERROR
                             : RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

                rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
                rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        }


        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /*
         * Produce some messages to specific partitions and some randomly
         * assigned.
         */
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
                                  &msgcounter);
        test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA, 0, 0, 100,
                                  NULL, 10, &msgcounter);


        /*
         * Send some arbitrary offsets.
         */
        offsets = rd_kafka_topic_partition_list_new(4);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 49)->offset =
            999999111;
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
            999;
        rd_kafka_topic_partition_list_add(offsets, "srctopic64", 34)->offset =
            123456789;

        cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");

        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        rd_sleep(2);

        if (!is_retry) {
                /* Now disconnect the coordinator. */
                TEST_SAY("Disconnecting transaction coordinator %" PRId32
                         "\n",
                         coord_id);
                rd_kafka_mock_broker_set_down(mcluster, coord_id);
        }

        /*
         * Start committing.
         */
        error = rd_kafka_commit_transaction(rk, -1);

        if (!is_retry) {
                TEST_ASSERT(error != NULL, "Expected commit to fail");
                TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                         rd_kafka_error_string(error));
                rd_kafka_error_destroy(error);

        } else {
                TEST_ASSERT(!error, "Expected commit to succeed, not: %s",
                            rd_kafka_error_string(error));
        }

        if (!is_retry) {
                /*
                 * Bring the coordinator back up.
                 */
                rd_kafka_mock_broker_set_up(mcluster, coord_id);
                rd_sleep(2);

                /*
                 * Abort, and try again, this time without error.
                 */
                TEST_SAY("Aborting and retrying\n");
                is_retry = rd_true;

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
                goto retry;
        }

        /* All done */

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief ESC-4424: rko is reused in response handler after destroy in
 *        coord_req sender due to bad state.
 *
 * This is somewhat of a race condition so we need to perform a couple of
 * iterations before it hits, usually 2 or 3, so we try at least 15 times.
 */
static void do_test_txn_coord_req_destroy(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int i;
        int errcnt = 0;

        SUB_TEST();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_curr->ignore_dr_err = rd_true;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        for (i = 0; i < 15; i++) {
                rd_kafka_error_t *error;
                rd_kafka_resp_err_t err;
                rd_kafka_topic_partition_list_t *offsets;
                rd_kafka_consumer_group_metadata_t *cgmetadata;

                test_timeout_set(10);

                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

                /*
                 * Inject errors to trigger retries
                 */
                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_AddPartitionsToTxn,
                    2, /* first request + number of internal retries */
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_AddOffsetsToTxn,
                    1, /* first request + number of internal retries */
                    RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);

                err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s",
                            rd_kafka_err2str(err));

                rd_kafka_mock_push_request_errors(
                    mcluster, RD_KAFKAP_Produce, 4,
                    RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                    RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
                    RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
                    RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
                /* FIXME: When KIP-360 is supported, add this error:
                 *        RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */

                err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s",
                            rd_kafka_err2str(err));

                /*
                 * Send offsets to transaction
                 */

                offsets = rd_kafka_topic_partition_list_new(1);
                rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)
                    ->offset = 12;

                cgmetadata =
                    rd_kafka_consumer_group_metadata_new("mygroupid");

                error = rd_kafka_send_offsets_to_transaction(rk, offsets,
                                                             cgmetadata, -1);

                TEST_SAY("send_offsets_to_transaction() #%d: %s\n", i,
                         rd_kafka_error_string(error));

                /* As we can't control the exact timing and sequence
                 * of requests this sometimes fails and sometimes succeeds,
                 * but we run the test enough times to trigger at least
                 * one failure. */
                if (error) {
                        TEST_SAY(
                            "send_offsets_to_transaction() #%d "
                            "failed (expectedly): %s\n",
                            i, rd_kafka_error_string(error));
                        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                                    "Expected abortable error for #%d", i);
                        rd_kafka_error_destroy(error);
                        errcnt++;
                }

                rd_kafka_consumer_group_metadata_destroy(cgmetadata);
                rd_kafka_topic_partition_list_destroy(offsets);

                /* Allow time for internal retries */
                rd_sleep(2);

                TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
        }

        TEST_ASSERT(errcnt > 0,
                    "Expected at least one send_offsets_to_transaction() "
                    "failure");

        /* All done */

        rd_kafka_destroy(rk);
}


static rd_atomic32_t multi_find_req_cnt;

static rd_kafka_resp_err_t
multi_find_on_response_received_cb(rd_kafka_t *rk,
                                   int sockfd,
                                   const char *brokername,
                                   int32_t brokerid,
                                   int16_t ApiKey,
                                   int16_t ApiVersion,
                                   int32_t CorrId,
                                   size_t size,
                                   int64_t rtt,
                                   rd_kafka_resp_err_t err,
                                   void *ic_opaque) {
        rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
        rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;

        if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
                return RD_KAFKA_RESP_ERR_NO_ERROR;

        TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                 ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
                 rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
                 rtt != -1 ? (float)rtt / 1000.0 : 0.0,
                 done ?
"already done" : "not done yet", rd_kafka_err2name(err)); if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) { /* Trigger a broker down/up event, which in turns * triggers the coord_req_fsm(). */ rd_kafka_mock_broker_set_down(mcluster, 2); rd_kafka_mock_broker_set_up(mcluster, 2); return RD_KAFKA_RESP_ERR_NO_ERROR; } /* Trigger a broker down/up event, which in turns * triggers the coord_req_fsm(). */ rd_kafka_mock_broker_set_down(mcluster, 3); rd_kafka_mock_broker_set_up(mcluster, 3); /* Clear the downed broker's latency so that it reconnects * quickly, otherwise the ApiVersionRequest will be delayed and * this will in turn delay the -> UP transition that we need to * trigger the coord_reqs. */ rd_kafka_mock_broker_set_rtt(mcluster, 3, 0); /* Only do this down/up once */ rd_atomic32_add(&multi_find_req_cnt, 10000); return RD_KAFKA_RESP_ERR_NO_ERROR; } /** * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing * the same coord_req_t, but the first one received will destroy * the coord_req_t object and make the subsequent FindCoordingResponses * reference a freed object. * * What we want to achieve is this sequence: * 1. AddOffsetsToTxnRequest + Response which.. * 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so.. * 3. Triggers a FindCoordinatorRequest * 4. FindCoordinatorResponse from 3 is received .. * 5. A TxnOffsetCommitRequest is sent from coord_req_fsm(). * 6. Another broker changing state to Up triggers coord reqs again, which.. * 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm(). * 7. FindCoordinatorResponse from 5 is received, references the destroyed rko * and crashes. */ static void do_test_txn_coord_req_multi_find(void) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_error_t *error; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *offsets; rd_kafka_consumer_group_metadata_t *cgmetadata; const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic"; int i; SUB_TEST(); rd_atomic32_init(&multi_find_req_cnt, 0); on_response_received_cb = multi_find_on_response_received_cb; rk = create_txn_producer(&mcluster, txnid, 3, /* Need connections to all brokers so we * can trigger coord_req_fsm events * by toggling connections. */ "enable.sparse.connections", "false", /* Set up on_response_received interceptor */ "on_response_received", "", NULL); /* Let broker 1 be both txn and group coordinator * so that the group coordinator connection is up when it is time * send the TxnOffsetCommitRequest. */ rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1); rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1); /* Set broker 1, 2, and 3 as leaders for a partition each and * later produce to both partitions so we know there's a connection * to all brokers. 
*/
        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);

        /* Broker down is not a test-failing error */
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        for (i = 0; i < 3; i++) {
                err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
                                        RD_KAFKA_V_PARTITION(i),
                                        RD_KAFKA_V_VALUE("hi", 2),
                                        RD_KAFKA_V_END);
                TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
        }

        test_flush(rk, 5000);

        /*
         * send_offsets_to_transaction() will query for the group coordinator,
         * we need to make those requests slow so that multiple requests are
         * sent.
         */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);

        /*
         * Send offsets to transaction
         */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset =
            12;

        cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);

        error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
                                                     -1);

        TEST_SAY("send_offsets_to_transaction() %s\n",
                 rd_kafka_error_string(error));
        TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
                    rd_kafka_error_string(error));

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Clear delay */
        for (i = 1; i <= 3; i++)
                rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);

        rd_sleep(5);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /* All done */

        TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
                    "on_response_received interceptor did not trigger "
                    "properly");

        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief ESC-4410: adding producer partitions gradually will trigger multiple
 *        AddPartitionsToTxn requests. Due to a bug the third partition to be
 *        registered would hang in PEND_TXN state.
 *
 * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
 * at the same time, followed by a need for a third:
 *
 *  1. Set coordinator broker rtt high (to give us time to produce).
 *  2. Produce to partition 0, will trigger first AddPartitionsToTxn.
 *  3. Produce to partition 1, will trigger second AddPartitionsToTxn.
 *  4. Wait for second AddPartitionsToTxn response.
 *  5. Produce to partition 2, should trigger AddPartitionsToTxn, but the bug
 *     causes it to stall in pending state.
 */

static rd_atomic32_t multi_addparts_resp_cnt;

static rd_kafka_resp_err_t
multi_addparts_response_received_cb(rd_kafka_t *rk,
                                    int sockfd,
                                    const char *brokername,
                                    int32_t brokerid,
                                    int16_t ApiKey,
                                    int16_t ApiVersion,
                                    int32_t CorrId,
                                    size_t size,
                                    int64_t rtt,
                                    rd_kafka_resp_err_t err,
                                    void *ic_opaque) {
        if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
                TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
                         ", ApiKey %hd, CorrId %d, rtt %.2fms, count %" PRId32
                         ": %s\n",
                         rd_kafka_name(rk), brokername, brokerid, ApiKey,
                         CorrId, rtt != -1 ?
(float)rtt / 1000.0 : 0.0,
                         rd_atomic32_get(&multi_addparts_resp_cnt),
                         rd_kafka_err2name(err));

                rd_atomic32_add(&multi_addparts_resp_cnt, 1);
        }

        return RD_KAFKA_RESP_ERR_NO_ERROR;
}

static void do_test_txn_addparts_req_multi(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        const char *txnid = "txnid", *topic = "mytopic";
        int32_t txn_coord = 2;

        SUB_TEST();

        rd_atomic32_init(&multi_addparts_resp_cnt, 0);

        on_response_received_cb = multi_addparts_response_received_cb;
        rk = create_txn_producer(&mcluster, txnid, 3, "linger.ms", "0",
                                 "message.timeout.ms", "9000",
                                 /* Set up on_response_received interceptor */
                                 "on_response_received", "", NULL);

        /* Let broker 2 be the txn coordinator. */
        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_topic_create(mcluster, topic, 3, 1);

        /* Set partition leaders to a non-txn-coord broker so they won't
         * be affected by the rtt delay */
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);


        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        /*
         * Run one transaction first to let the client familiarize with
         * the topic, this avoids metadata lookups, etc, when the real
         * test is run.
         */
        TEST_SAY("Running seed transaction\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        TEST_CALL_ERR__(rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
                                          RD_KAFKA_V_VALUE("seed", 4),
                                          RD_KAFKA_V_END));
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));

        /*
         * Now perform the test transaction with rtt delays
         */
        TEST_SAY("Running test transaction\n");
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Reset counter */
        rd_atomic32_set(&multi_addparts_resp_cnt, 0);

        /* Add latency to the txn coordinator so we can pace our produce()
         * calls */
        rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);

        /* Produce to partition 0 */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        rd_usleep(500 * 1000, NULL);

        /* Produce to partition 1 */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(1),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_SAY("Waiting for two AddPartitionsToTxnResponses\n");
        while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
                rd_usleep(10 * 1000, NULL);

        TEST_SAY("%" PRId32 " AddPartitionsToTxnResponses seen\n",
                 rd_atomic32_get(&multi_addparts_resp_cnt));

        /* Produce to partition 2, this message will hang in the
         * queue if the bug is not fixed. */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(2),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Allow some extra time for things to settle before committing
         * the transaction. */
        rd_usleep(1000 * 1000, NULL);

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10 * 1000));

        /* All done */
        rd_kafka_destroy(rk);

        on_response_received_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
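*
 * (UNSTABLE_OFFSET_COMMIT is a retriable error, returned while there are
 * transactional offsets still pending; the client is expected to keep
 * retrying the OffsetFetch until its timeout is exhausted. A sketch of the
 * check performed below:
 *
 *   err = rd_kafka_committed(c, offsets, timeout_ms);
 *   // NO_ERROR if the retries complete within timeout_ms,
 *   // else ERR__TIMED_OUT )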
*
 * There are two things to test:
 *  - OffsetFetch triggered by committed() (and similar code paths)
 *  - OffsetFetch triggered by assign()
 */
static void do_test_unstable_offset_commit(void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_topic_partition_list_t *offsets;
        const char *topic = "srctopic4";
        const int msgcnt  = 100;
        const int64_t offset_to_commit = msgcnt / 2;
        int i;
        int remains = 0;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, "txnid", 3, NULL);

        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "error");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        rd_kafka_mock_topic_create(mcluster, topic, 2, 3);

        /* Produce some messages to the topic so that the consumer has
         * something to read. */
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt, NULL, 0,
                                  &remains);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));


        /* Commit offset */
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
            offset_to_commit;
        TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0 /*sync*/));
        rd_kafka_topic_partition_list_destroy(offsets);

        /* Retrieve offsets by calling committed().
         *
         * Have OffsetFetch fail and retry; on the first iteration
         * the API timeout is higher than the amount of time the retries
         * will take, so the call succeeds, and on the second iteration the
         * timeout is lower, so the call fails. */
        for (i = 0; i < 2; i++) {
                rd_kafka_resp_err_t err;
                rd_kafka_resp_err_t exp_err =
                    i == 0 ? RD_KAFKA_RESP_ERR_NO_ERROR
                           : RD_KAFKA_RESP_ERR__TIMED_OUT;
                int timeout_ms = exp_err ?
200 : 5 * 1000; rd_kafka_mock_push_request_errors( mcluster, RD_KAFKAP_OffsetFetch, 1 + 5, /* first request + some retries */ RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT); offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(offsets, topic, 0); err = rd_kafka_committed(c, offsets, timeout_ms); TEST_SAY("#%d: committed() returned %s (expected %s)\n", i, rd_kafka_err2name(err), rd_kafka_err2name(exp_err)); TEST_ASSERT(err == exp_err, "#%d: Expected committed() to return %s, not %s", i, rd_kafka_err2name(exp_err), rd_kafka_err2name(err)); TEST_ASSERT(offsets->cnt == 1, "Expected 1 committed offset, not %d", offsets->cnt); if (!exp_err) TEST_ASSERT(offsets->elems[0].offset == offset_to_commit, "Expected committed offset %" PRId64 ", " "not %" PRId64, offset_to_commit, offsets->elems[0].offset); else TEST_ASSERT(offsets->elems[0].offset < 0, "Expected no committed offset, " "not %" PRId64, offsets->elems[0].offset); rd_kafka_topic_partition_list_destroy(offsets); } TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n"); offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset = RD_KAFKA_OFFSET_STORED; rd_kafka_mock_push_request_errors( mcluster, RD_KAFKAP_OffsetFetch, 1 + 5, /* first request + some retries */ RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT, RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT); test_consumer_incremental_assign("assign", c, offsets); rd_kafka_topic_partition_list_destroy(offsets); test_consumer_poll_exact("consume", c, 0, 1 /*eof*/, 0, msgcnt / 2, rd_true /*exact counts*/, NULL); /* All done */ rd_kafka_destroy(c); rd_kafka_destroy(rk); SUB_TEST_PASS(); } /** * @brief If a message times out locally before being attempted to send * and commit_transaction() is called, the transaction must not succeed. 
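*
 * The application-level handling exercised below is the usual
 * requires-abort pattern (a sketch, not normative):
 *
 *   error = rd_kafka_commit_transaction(rk, -1);
 *   if (error && rd_kafka_error_txn_requires_abort(error)) {
 *           rd_kafka_error_destroy(error);
 *           rd_kafka_abort_transaction(rk, -1);
 *   }
 *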
* https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
 */
static void do_test_commit_after_msg_timeout(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id, leader_id;
        rd_kafka_resp_err_t err;
        rd_kafka_error_t *error;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;

        SUB_TEST_QUICK();

        /* Assign coordinator and leader to two different brokers */
        coord_id  = 1;
        leader_id = 2;

        rk = create_txn_producer(&mcluster, transactional_id, 3,
                                 "message.timeout.ms", "5000",
                                 "transaction.timeout.ms", "10000", NULL);

        /* Broker down is not a test-failing error */
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);

        /* Start the transaction */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_SAY("Bringing down leader %" PRId32 " and coordinator %" PRId32
                 "\n",
                 leader_id, coord_id);
        rd_kafka_mock_broker_set_down(mcluster, leader_id);
        rd_kafka_mock_broker_set_down(mcluster, coord_id);

        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);

        error = rd_kafka_commit_transaction(rk, -1);
        TEST_ASSERT(error != NULL, "expected commit_transaction() to fail");
        TEST_SAY_ERROR(error, "commit_transaction() failed (as expected): ");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected txn_requires_abort error");
        rd_kafka_error_destroy(error);

        /* Bring the brokers up so the abort can complete */
        rd_kafka_mock_broker_set_up(mcluster, coord_id);
        rd_kafka_mock_broker_set_up(mcluster, leader_id);

        TEST_SAY("Aborting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not flushed\n", remains);

        TEST_SAY("Attempting second transaction, which should succeed\n");
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR_NO_ERROR;
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
        test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        TEST_ASSERT(remains == 0, "%d message(s) were not produced\n",
                    remains);

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}

/**
 * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
 *        during an ongoing transaction.
 *        The transaction should instead enter the abortable state.
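*
 *        Once the transaction is in the abortable state any further
 *        produce() is expected to be rejected immediately, e.g. (a sketch
 *        of the check performed below):
 *
 *          err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
 *                                  RD_KAFKA_V_VALUE("hi", 2),
 *                                  RD_KAFKA_V_END);
 *          // expected: err == RD_KAFKA_RESP_ERR__STATE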
*/
static void do_test_out_of_order_seq(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        int32_t txn_coord = 1, leader = 2;
        const char *txnid = "myTxnId";
        test_timing_t timing;
        rd_kafka_resp_err_t err;

        SUB_TEST_QUICK();

        rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages",
                                 "1", NULL);

        rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
                                      txn_coord);

        rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);

        test_curr->ignore_dr_err = rd_true;
        test_curr->is_fatal_cb   = NULL;

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        /*
         * Start a transaction
         */
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


        /* Produce one seeding message first to get the leader up and
         * running */
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        test_flush(rk, -1);

        /* Let the partition leader have a latency of 2 seconds
         * so that we can have multiple messages in-flight. */
        rd_kafka_mock_broker_set_rtt(mcluster, leader, 2 * 1000);

        /* Produce messages and let them fail with different errors,
         * ending with OUT_OF_ORDER which previously triggered an
         * Epoch bump. */
        rd_kafka_mock_push_request_errors(
            mcluster, RD_KAFKAP_Produce, 3,
            RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
            RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
            RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);

        /* Produce three messages that will be delayed
         * and have errors injected.*/
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        /* Now sleep a short while so that the messages are processed
         * by the broker and errors are returned. */
        TEST_SAY("Sleeping..\n");
        rd_sleep(5);

        rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);

        /* Produce a fifth message, should fail with ERR__STATE since
         * the transaction should have entered the abortable state. */
        err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
                                RD_KAFKA_V_PARTITION(0),
                                RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
        TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
                    "Expected produce() to fail with ERR__STATE, not %s",
                    rd_kafka_err2name(err));
        TEST_SAY("produce() failed as expected: %s\n", rd_kafka_err2str(err));

        /* Commit the transaction, should fail with abortable error. */
        TIMING_START(&timing, "commit_transaction(-1)");
        error = rd_kafka_commit_transaction(rk, -1);
        TIMING_STOP(&timing);
        TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");

        TEST_SAY("commit_transaction() failed (expectedly): %s\n",
                 rd_kafka_error_string(error));

        TEST_ASSERT(!rd_kafka_error_is_fatal(error),
                    "Did not expect fatal error");
        TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
                    "Expected abortable error");
        rd_kafka_error_destroy(error);

        /* Abort the transaction */
        TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));

        /* Run a new transaction without errors to verify that the
         * producer can recover.
*/
        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        TEST_CALL_ERR__(rd_kafka_producev(
            rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));

        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify lossless delivery if topic disappears from Metadata for a
 *        while.
 *
 * If a topic is removed from metadata in between transactions, the producer
 * will remove its partition state for the topic's partitions.
 * If later the same topic comes back (same topic instance, not a new creation)
 * then the producer must restore the previously used msgid/BaseSequence
 * in case the same Epoch is still used, or messages will be silently lost
 * as they would seem like legit duplicates to the broker.
 *
 * Reproduction:
 *  1. produce msgs to topic, commit transaction.
 *  2. remove topic from metadata
 *  3. make sure client updates its metadata, which removes the partition
 *     objects.
 *  4. restore the topic in metadata
 *  5. produce new msgs to topic, commit transaction.
 *  6. consume topic. All messages should be accounted for.
 */
static void do_test_topic_disappears_for_awhile(void) {
        rd_kafka_t *rk, *c;
        rd_kafka_conf_t *c_conf;
        rd_kafka_mock_cluster_t *mcluster;
        const char *topic = "mytopic";
        const char *txnid = "myTxnId";
        test_timing_t timing;
        int i;
        int msgcnt              = 0;
        const int partition_cnt = 10;

        SUB_TEST_QUICK();

        rk = create_txn_producer(
            &mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
            "topic.metadata.refresh.interval.ms", "2000", NULL);

        rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);

        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        for (i = 0; i < 2; i++) {
                int cnt                = 3 * 2 * partition_cnt;
                rd_bool_t remove_topic = (i % 2) == 0;
                /*
                 * Start a transaction
                 */
                TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));


                while (cnt-- >= 0) {
                        TEST_CALL_ERR__(rd_kafka_producev(
                            rk, RD_KAFKA_V_TOPIC(topic),
                            RD_KAFKA_V_PARTITION(cnt % partition_cnt),
                            RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
                        msgcnt++;
                }

                /* Commit the transaction */
                TIMING_START(&timing, "commit_transaction(-1)");
                TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
                TIMING_STOP(&timing);

                if (remove_topic) {
                        /* Make it seem the topic is removed, refresh metadata,
                         * and then make the topic available again. */
                        const rd_kafka_metadata_t *md;

                        TEST_SAY("Marking topic as non-existent\n");

                        rd_kafka_mock_topic_set_error(
                            mcluster, topic,
                            RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);

                        TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, NULL, &md,
                                                          tmout_multip(5000)));

                        rd_kafka_metadata_destroy(md);

                        rd_sleep(2);

                        TEST_SAY("Bringing topic back to life\n");
                        rd_kafka_mock_topic_set_error(
                            mcluster, topic, RD_KAFKA_RESP_ERR_NO_ERROR);
                }
        }

        TEST_SAY("Verifying messages by consumption\n");
        test_conf_init(&c_conf, NULL, 0);
        test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
        test_conf_set(c_conf, "bootstrap.servers",
                      rd_kafka_mock_cluster_bootstraps(mcluster));
        test_conf_set(c_conf, "enable.partition.eof", "true");
        test_conf_set(c_conf, "auto.offset.reset", "earliest");
        c = test_create_consumer("mygroup", NULL, c_conf, NULL);

        test_consumer_subscribe(c, topic);
        test_consumer_poll_exact("consume", c, 0, partition_cnt, 0, msgcnt,
                                 rd_true /*exact*/, NULL);

        rd_kafka_destroy(c);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Test that group coordinator requests can handle an
 *        untimely disconnect.
 *
 * The transaction manager makes use of librdkafka coord_req to commit
 * transaction offsets to the group coordinator.
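*
 * (Coordinator placement is steered with the mock cluster API, e.g.
 *  rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2); the
 *  switch_coord variant below moves it with the same call while the
 *  original coordinator is down, see delayed_up_cb().)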
* If the connection to the given group coordinator is not up the * coord_req code will request a connection once, but if this connection fails * there will be no new attempts and the coord_req will idle until either * destroyed or the connection is retried for other reasons. * This in turn stalls the send_offsets_to_transaction() call until the * transaction times out. * * There are two variants to this test based on switch_coord: * - True - Switches the coordinator during the downtime. * The client should detect this and send the request to the * new coordinator. * - False - The coordinator remains on the down broker. Client will reconnect * when down broker comes up again. */ struct some_state { rd_kafka_mock_cluster_t *mcluster; rd_bool_t switch_coord; int32_t broker_id; const char *grpid; }; static int delayed_up_cb(void *arg) { struct some_state *state = arg; rd_sleep(3); if (state->switch_coord) { TEST_SAY("Switching group coordinator to %" PRId32 "\n", state->broker_id); rd_kafka_mock_coordinator_set(state->mcluster, "group", state->grpid, state->broker_id); } else { TEST_SAY("Bringing up group coordinator %" PRId32 "..\n", state->broker_id); rd_kafka_mock_broker_set_up(state->mcluster, state->broker_id); } return 0; } static void do_test_disconnected_group_coord(rd_bool_t switch_coord) { const char *topic = "mytopic"; const char *txnid = "myTxnId"; const char *grpid = "myGrpId"; const int partition_cnt = 1; rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_topic_partition_list_t *offsets; rd_kafka_consumer_group_metadata_t *cgmetadata; struct some_state state = RD_ZERO_INIT; test_timing_t timing; thrd_t thrd; int ret; SUB_TEST_QUICK("switch_coord=%s", RD_STR_ToF(switch_coord)); test_curr->is_fatal_cb = error_is_fatal_cb; allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT; rk = create_txn_producer(&mcluster, txnid, 3, NULL); rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1); /* Broker 1: txn coordinator * Broker 2: group coordinator * Broker 3: partition leader & backup coord if switch_coord=true */ rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1); rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2); rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3); /* Bring down group coordinator so there are no undesired * connections to it. */ rd_kafka_mock_broker_set_down(mcluster, 2); TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1)); TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); TEST_CALL_ERR__(rd_kafka_producev( rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END)); test_flush(rk, -1); rd_sleep(1); /* Run a background thread that after 3s, which should be enough * to perform the first failed connection attempt, makes the * group coordinator available again. */ state.switch_coord = switch_coord; state.mcluster = mcluster; state.grpid = grpid; state.broker_id = switch_coord ? 
3 : 2;
        if (thrd_create(&thrd, delayed_up_cb, &state) != thrd_success)
                TEST_FAIL("Failed to create thread");

        TEST_SAY("Calling send_offsets_to_transaction()\n");
        offsets = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 1;
        cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);

        TIMING_START(&timing, "send_offsets_to_transaction(-1)");
        TEST_CALL_ERROR__(
            rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
        TIMING_STOP(&timing);
        TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);

        rd_kafka_consumer_group_metadata_destroy(cgmetadata);
        rd_kafka_topic_partition_list_destroy(offsets);
        thrd_join(thrd, &ret);

        /* Commit the transaction */
        TIMING_START(&timing, "commit_transaction(-1)");
        TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
        TIMING_STOP(&timing);

        rd_kafka_destroy(rk);

        allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
        test_curr->is_fatal_cb = NULL;

        SUB_TEST_PASS();
}


/**
 * @brief Test that a NULL coordinator is not fatal when
 *        the transactional producer reconnects to the txn coordinator
 *        and the first thing it does is a FindCoordinatorRequest that
 *        fails with COORDINATOR_NOT_AVAILABLE, setting coordinator to NULL.
 */
static void do_test_txn_coordinator_null_not_fatal(void) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_error_t *error;
        rd_kafka_resp_err_t err;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;

        SUB_TEST_QUICK();

        /* Broker down is not a test-failing error */
        allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
        test_curr->is_fatal_cb = error_is_fatal_cb;
        test_curr->exp_dr_err  = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;

        /* One second is the minimum transaction timeout */
        rk = create_txn_producer(&mcluster, transactional_id, 1,
                                 "transaction.timeout.ms", "1000", NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        /* Start the transaction */
        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        /* Make the produce request time out. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_Produce, 1,
            RD_KAFKA_RESP_ERR_NO_ERROR, 3000);

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);

        /* This value is linked to transaction.timeout.ms; it needs to be
         * long enough for the message to time out and a DrainBump sequence
         * to be started. */
        rd_kafka_flush(rk, 1000);

        /* To trigger the error the COORDINATOR_NOT_AVAILABLE response
         * must come AFTER idempotent state has changed to WaitTransport
         * but BEFORE it changes to WaitPID. To make it more likely
         * rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
         * in rd_kafka_txn_coord_query, when unable to query for
         * the transaction coordinator. */
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
            RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);

        /* Coordinator down starts the FindCoordinatorRequest loop. */
        TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
        rd_kafka_mock_broker_set_down(mcluster, coord_id);

        /* Coordinator down for some time. */
        rd_usleep(100 * 1000, NULL);

        /* When it comes up, the error is triggered if the preconditions
         * hold.
*/ TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id); rd_kafka_mock_broker_set_up(mcluster, coord_id); /* Make sure DRs are received */ rd_kafka_flush(rk, 1000); error = rd_kafka_commit_transaction(rk, -1); TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains); TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail"); TEST_SAY("commit_transaction() failed (expectedly): %s\n", rd_kafka_error_string(error)); rd_kafka_error_destroy(error); /* Needs to wait some time before closing to make sure it doesn't go * into TERMINATING state before error is triggered. */ rd_usleep(1000 * 1000, NULL); rd_kafka_destroy(rk); allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR; test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR; test_curr->is_fatal_cb = NULL; SUB_TEST_PASS(); } /** * @brief Simple test to make sure the init_transactions() timeout is honoured * and also not infinite. */ static void do_test_txn_resumable_init(void) { rd_kafka_t *rk; const char *transactional_id = "txnid"; rd_kafka_error_t *error; test_timing_t duration; SUB_TEST(); rd_kafka_conf_t *conf; test_conf_init(&conf, NULL, 20); test_conf_set(conf, "bootstrap.servers", ""); test_conf_set(conf, "transactional.id", transactional_id); test_conf_set(conf, "transaction.timeout.ms", "4000"); rk = test_create_handle(RD_KAFKA_PRODUCER, conf); /* First make sure a lower timeout is honoured. */ TIMING_START(&duration, "init_transactions(1000)"); error = rd_kafka_init_transactions(rk, 1000); TIMING_STOP(&duration); if (error) TEST_SAY("First init_transactions failed (as expected): %s\n", rd_kafka_error_string(error)); TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT, "Expected _TIMED_OUT, not %s", error ? rd_kafka_error_string(error) : "success"); rd_kafka_error_destroy(error); TIMING_ASSERT(&duration, 900, 1500); TEST_SAY( "Performing second init_transactions() call now with an " "infinite timeout: " "should time out in 2 x transaction.timeout.ms\n"); TIMING_START(&duration, "init_transactions(infinite)"); error = rd_kafka_init_transactions(rk, -1); TIMING_STOP(&duration); if (error) TEST_SAY("Second init_transactions failed (as expected): %s\n", rd_kafka_error_string(error)); TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT, "Expected _TIMED_OUT, not %s", error ? rd_kafka_error_string(error) : "success"); rd_kafka_error_destroy(error); TIMING_ASSERT(&duration, 2 * 4000 - 500, 2 * 4000 + 500); rd_kafka_destroy(rk); SUB_TEST_PASS(); } /** * @brief Retries a transaction call until it succeeds or returns a * non-retriable error - which will cause the test to fail. * * @param intermed_calls Is a block of code that will be called after each * retriable failure of \p call. */ #define RETRY_TXN_CALL__(call, intermed_calls) \ do { \ rd_kafka_error_t *_error = call; \ if (!_error) \ break; \ TEST_SAY_ERROR(_error, "%s: ", "" #call); \ TEST_ASSERT(rd_kafka_error_is_retriable(_error), \ "Expected retriable error"); \ TEST_SAY("%s failed, retrying in 1 second\n", "" #call); \ rd_kafka_error_destroy(_error); \ intermed_calls; \ rd_sleep(1); \ } while (1) /** * @brief Call \p call and expect it to fail with \p exp_err_code. 
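*
 * Example usage (as in the tests below):
 *
 *   TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
 *                           RD_KAFKA_RESP_ERR__CONFLICT);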
*/ #define TXN_CALL_EXPECT_ERROR__(call, exp_err_code) \ do { \ rd_kafka_error_t *_error = call; \ TEST_ASSERT(_error != NULL, \ "%s: Expected %s error, got success", "" #call, \ rd_kafka_err2name(exp_err_code)); \ TEST_SAY_ERROR(_error, "%s: ", "" #call); \ TEST_ASSERT(rd_kafka_error_code(_error) == exp_err_code, \ "%s: Expected %s error, got %s", "" #call, \ rd_kafka_err2name(exp_err_code), \ rd_kafka_error_name(_error)); \ rd_kafka_error_destroy(_error); \ } while (0) /** * @brief Simple test to make sure short API timeouts can be safely resumed * by calling the same API again. * * @param do_commit Commit transaction if true, else abort transaction. */ static void do_test_txn_resumable_calls_timeout(rd_bool_t do_commit) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *offsets; rd_kafka_consumer_group_metadata_t *cgmetadata; int32_t coord_id = 1; const char *topic = "test"; const char *transactional_id = "txnid"; int msgcnt = 1; int remains = 0; SUB_TEST("%s_transaction", do_commit ? "commit" : "abort"); rk = create_txn_producer(&mcluster, transactional_id, 1, NULL); err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1); TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err)); rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id, coord_id); rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id); TEST_SAY("Starting transaction\n"); TEST_SAY("Delaying first two InitProducerIdRequests by 500ms\n"); rd_kafka_mock_broker_push_request_error_rtts( mcluster, coord_id, RD_KAFKAP_InitProducerId, 2, RD_KAFKA_RESP_ERR_NO_ERROR, 500, RD_KAFKA_RESP_ERR_NO_ERROR, 500); RETRY_TXN_CALL__( rd_kafka_init_transactions(rk, 100), TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1), RD_KAFKA_RESP_ERR__CONFLICT)); RETRY_TXN_CALL__(rd_kafka_begin_transaction(rk), /*none*/); TEST_SAY("Delaying ProduceRequests by 3000ms\n"); rd_kafka_mock_broker_push_request_error_rtts( mcluster, coord_id, RD_KAFKAP_Produce, 1, RD_KAFKA_RESP_ERR_NO_ERROR, 3000); test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, msgcnt, NULL, 0, &remains); TEST_SAY("Delaying SendOffsetsToTransaction by 400ms\n"); rd_kafka_mock_broker_push_request_error_rtts( mcluster, coord_id, RD_KAFKAP_AddOffsetsToTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR, 400); offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12; cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid"); /* This is not a resumable call on timeout */ TEST_CALL_ERROR__( rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1)); rd_kafka_consumer_group_metadata_destroy(cgmetadata); rd_kafka_topic_partition_list_destroy(offsets); TEST_SAY("Delaying EndTxnRequests by 1200ms\n"); rd_kafka_mock_broker_push_request_error_rtts( mcluster, coord_id, RD_KAFKAP_EndTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR, 1200); /* Committing/aborting the transaction will also be delayed by the * previous accumulated remaining delays. 
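*
         * The resumable pattern being exercised here: a call that fails
         * with a retriable ERR__TIMED_OUT is simply issued again to resume
         * waiting, which is what RETRY_TXN_CALL__() above does, roughly:
         *
         *   error = rd_kafka_commit_transaction(rk, 100);
         *   while (error && rd_kafka_error_is_retriable(error)) {
         *           rd_kafka_error_destroy(error);
         *           error = rd_kafka_commit_transaction(rk, 100);
         *   }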
*/
        if (do_commit) {
                TEST_SAY("Committing transaction\n");

                RETRY_TXN_CALL__(
                    rd_kafka_commit_transaction(rk, 100),
                    TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
                                            RD_KAFKA_RESP_ERR__CONFLICT));
        } else {
                TEST_SAY("Aborting transaction\n");

                RETRY_TXN_CALL__(
                    rd_kafka_abort_transaction(rk, 100),
                    TXN_CALL_EXPECT_ERROR__(
                        rd_kafka_commit_transaction(rk, -1),
                        RD_KAFKA_RESP_ERR__CONFLICT));
        }

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Verify that a call that timed out, and whose underlying operation
 *        then failed after the timeout but before the call was resumed,
 *        returns that error when resumed.
 */
static void do_test_txn_resumable_calls_timeout_error(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        rd_kafka_resp_err_t err;
        int32_t coord_id             = 1;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int msgcnt                   = 1;
        int remains                  = 0;
        rd_kafka_error_t *error;

        SUB_TEST_QUICK("%s_transaction", do_commit ? "commit" : "abort");

        rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);

        err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
        TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));

        rd_kafka_mock_coordinator_set(mcluster, "transaction",
                                      transactional_id, coord_id);
        rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);

        TEST_SAY("Starting transaction\n");
        TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));

        TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));

        test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
                                  msgcnt, NULL, 0, &remains);

        TEST_SAY("Fail EndTxn fatally after 2000ms\n");
        rd_kafka_mock_broker_push_request_error_rtts(
            mcluster, coord_id, RD_KAFKAP_EndTxn, 1,
            RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, 2000);

        if (do_commit) {
                TEST_SAY("Committing transaction\n");

                TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_commit_transaction(rk, -1);
        } else {
                TEST_SAY("Aborting transaction\n");

                TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, 500),
                                        RD_KAFKA_RESP_ERR__TIMED_OUT);

                /* Sleep so that the background EndTxn fails locally and sets
                 * an error result. */
                rd_sleep(3);

                error = rd_kafka_abort_transaction(rk, -1);
        }

        TEST_ASSERT(error != NULL && rd_kafka_error_is_fatal(error),
                    "Expected fatal error, not %s",
                    rd_kafka_error_string(error));
        TEST_ASSERT(rd_kafka_error_code(error) ==
                        RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
                    "Expected error INVALID_TXN_STATE, got %s",
                    rd_kafka_error_name(error));
        rd_kafka_error_destroy(error);

        rd_kafka_destroy(rk);

        SUB_TEST_PASS();
}


/**
 * @brief Concurrent transaction API calls are not permitted.
 *        This test makes sure they're properly enforced.
 *
 * For each transactional API, call it with a 5s timeout, and during that time
 * from another thread call transactional APIs, one by one, and verify that
 * we get an ERR__CONFLICT error back in the second thread.
 *
 * We use a mutex for synchronization, the main thread will hold the lock
 * when not calling an API but release it just prior to calling.
 * The other thread will acquire the lock, sleep, and hold the lock while
 * calling the concurrent API that should fail immediately, releasing the lock
 * when done.
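*
 * Synchronization sketch (as implemented below):
 *
 *   main thread:                      helper thread:
 *     state.api = "x";
 *     mtx_unlock(&lock);                mtx_lock(&lock);
 *     x(rk, ..);  // blocks             rd_sleep(..);
 *                                       y(rk, ..); // -> ERR__CONFLICT
 *     mtx_lock(&lock);                  mtx_unlock(&lock);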
*
 */
struct _txn_concurrent_state {
        const char *api;
        mtx_t lock;
        rd_kafka_t *rk;
        struct test *test;
};

static int txn_concurrent_thread_main(void *arg) {
        struct _txn_concurrent_state *state = arg;
        static const char *apis[]           = {
            "init_transactions", "begin_transaction",
            "send_offsets_to_transaction", "commit_transaction",
            "abort_transaction", NULL};
        rd_kafka_t *rk       = state->rk;
        const char *main_api = NULL;
        int i;

        /* Update TLS variable so TEST_..() macros work */
        test_curr = state->test;

        while (1) {
                const char *api         = NULL;
                const int timeout_ms    = 10000;
                rd_kafka_error_t *error = NULL;
                rd_kafka_resp_err_t exp_err;
                test_timing_t duration;

                /* Wait for other thread's txn call to start, then sleep a bit
                 * to increase the chance that that call has really begun. */
                mtx_lock(&state->lock);

                if (state->api && state->api == main_api) {
                        /* Main thread is still blocking on the last API call */
                        TEST_SAY("Waiting for main thread to finish %s()\n",
                                 main_api);
                        mtx_unlock(&state->lock);
                        rd_sleep(1);
                        continue;
                } else if (!(main_api = state->api)) {
                        mtx_unlock(&state->lock);
                        break;
                }

                rd_sleep(1);

                for (i = 0; (api = apis[i]) != NULL; i++) {
                        TEST_SAY(
                            "Triggering concurrent %s() call while "
                            "main is in %s() call\n",
                            api, main_api);
                        TIMING_START(&duration, "%s", api);

                        if (!strcmp(api, "init_transactions"))
                                error =
                                    rd_kafka_init_transactions(rk, timeout_ms);
                        else if (!strcmp(api, "begin_transaction"))
                                error = rd_kafka_begin_transaction(rk);
                        else if (!strcmp(api, "send_offsets_to_transaction")) {
                                rd_kafka_topic_partition_list_t *offsets =
                                    rd_kafka_topic_partition_list_new(1);
                                rd_kafka_consumer_group_metadata_t *cgmetadata =
                                    rd_kafka_consumer_group_metadata_new(
                                        "mygroupid");
                                rd_kafka_topic_partition_list_add(
                                    offsets, "srctopic4", 0)
                                    ->offset = 12;

                                error = rd_kafka_send_offsets_to_transaction(
                                    rk, offsets, cgmetadata, -1);
                                rd_kafka_consumer_group_metadata_destroy(
                                    cgmetadata);
                                rd_kafka_topic_partition_list_destroy(offsets);
                        } else if (!strcmp(api, "commit_transaction"))
                                error = rd_kafka_commit_transaction(
                                    rk, timeout_ms);
                        else if (!strcmp(api, "abort_transaction"))
                                error =
                                    rd_kafka_abort_transaction(rk, timeout_ms);
                        else
                                TEST_FAIL("Unknown API: %s", api);

                        TIMING_STOP(&duration);

                        TEST_SAY_ERROR(error, "Conflicting %s() call: ", api);
                        TEST_ASSERT(error,
                                    "Expected conflicting %s() call to fail",
                                    api);
                        exp_err = !strcmp(api, main_api)
                                      ? RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
                                      : RD_KAFKA_RESP_ERR__CONFLICT;

                        TEST_ASSERT(rd_kafka_error_code(error) == exp_err,
                                    "Conflicting %s(): Expected %s, not %s",
                                    api, rd_kafka_err2str(exp_err),
                                    rd_kafka_error_name(error));
                        TEST_ASSERT(
                            rd_kafka_error_is_retriable(error),
                            "Conflicting %s(): Expected retriable error", api);
                        rd_kafka_error_destroy(error);
                        /* These calls should fail immediately */
                        TIMING_ASSERT(&duration, 0, 100);
                }
                mtx_unlock(&state->lock);
        }

        return 0;
}

static void do_test_txn_concurrent_operations(rd_bool_t do_commit) {
        rd_kafka_t *rk;
        rd_kafka_mock_cluster_t *mcluster;
        int32_t coord_id = 1;
        rd_kafka_resp_err_t err;
        const char *topic            = "test";
        const char *transactional_id = "txnid";
        int remains                  = 0;
        thrd_t thrd;
        struct _txn_concurrent_state state = RD_ZERO_INIT;
        rd_kafka_topic_partition_list_t *offsets;
        rd_kafka_consumer_group_metadata_t *cgmetadata;

        SUB_TEST("%s", do_commit ? "commit" : "abort");

        test_timeout_set(90);

        /* We need to override the value of socket.connection.setup.timeout.ms
         * to be at least 2*RTT of the mock broker.
This is because the first * ApiVersion request will fail, since we make the request with v3, and * the mock broker's MaxVersion is 2, so the request is retried with v0. * We use the value 3*RTT to add some buffer. */ rk = create_txn_producer(&mcluster, transactional_id, 1, "socket.connection.setup.timeout.ms", "15000", NULL); /* Set broker RTT to 3.5s so that the background thread has ample * time to call its conflicting APIs. * This value must be less than socket.connection.setup.timeout.ms/2. */ rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 3500); err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1); TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err)); /* Set up shared state between us and the concurrent thread */ mtx_init(&state.lock, mtx_plain); state.test = test_curr; state.rk = rk; /* We release the lock only while calling the TXN API */ mtx_lock(&state.lock); /* Spin up concurrent thread */ if (thrd_create(&thrd, txn_concurrent_thread_main, (void *)&state) != thrd_success) TEST_FAIL("Failed to create thread"); #define _start_call(callname) \ do { \ state.api = callname; \ mtx_unlock(&state.lock); \ } while (0) #define _end_call() mtx_lock(&state.lock) _start_call("init_transactions"); TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1)); _end_call(); /* This call doesn't block, so can't really be tested concurrently. */ TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0, &remains); _start_call("send_offsets_to_transaction"); offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12; cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid"); TEST_CALL_ERROR__( rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1)); rd_kafka_consumer_group_metadata_destroy(cgmetadata); rd_kafka_topic_partition_list_destroy(offsets); _end_call(); if (do_commit) { _start_call("commit_transaction"); TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1)); _end_call(); } else { _start_call("abort_transaction"); TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1)); _end_call(); } /* Signal completion to background thread */ state.api = NULL; mtx_unlock(&state.lock); thrd_join(thrd, NULL); rd_kafka_destroy(rk); mtx_destroy(&state.lock); SUB_TEST_PASS(); } /** * @brief KIP-360: Test that fatal idempotence errors triggers abortable * transaction errors, but let the broker-side abort of the * transaction fail with a fencing error. * Should raise a fatal error. * * @param error_code Which error code EndTxn should fail with. * Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older) * or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer). 
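*
 * After the fenced abort the producer is expected to have raised a fatal
 * error, which an application would observe roughly like this (a sketch):
 *
 *   char errstr[512];
 *   rd_kafka_resp_err_t fatal_err =
 *       rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
 *   // fatal_err is non-zero once the producer has been fenced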
*/ static void do_test_txn_fenced_abort(rd_kafka_resp_err_t error_code) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_error_t *error; int32_t txn_coord = 2; const char *txnid = "myTxnId"; char errstr[512]; rd_kafka_resp_err_t fatal_err; size_t errors_cnt; SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code)); rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1", NULL); rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, txn_coord); test_curr->ignore_dr_err = rd_true; test_curr->is_fatal_cb = error_is_fatal_cb; allowed_error = RD_KAFKA_RESP_ERR__FENCED; TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1)); /* * Start a transaction */ TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); /* Produce a message without error first */ TEST_CALL_ERR__(rd_kafka_producev( rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END)); test_flush(rk, -1); /* Fail abort transaction */ rd_kafka_mock_broker_push_request_error_rtts( mcluster, txn_coord, RD_KAFKAP_EndTxn, 1, error_code, 0); /* Fail the PID reinit */ rd_kafka_mock_broker_push_request_error_rtts( mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0); /* Produce a message, let it fail with a fatal idempo error. */ rd_kafka_mock_push_request_errors( mcluster, RD_KAFKAP_Produce, 1, RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID); TEST_CALL_ERR__(rd_kafka_producev( rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0), RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END)); test_flush(rk, -1); /* Abort the transaction, should fail with a fatal error */ error = rd_kafka_abort_transaction(rk, -1); TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail"); TEST_SAY_ERROR(error, "abort_transaction() failed: "); TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error"); rd_kafka_error_destroy(error); fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr)); TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised"); TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr); /* Verify that the producer sent the expected number of EndTxn requests * by inspecting the mock broker error stack, * which should now be empty. */ if (rd_kafka_mock_broker_error_stack_cnt( mcluster, txn_coord, RD_KAFKAP_EndTxn, &errors_cnt)) { TEST_FAIL( "Broker error count should succeed for API %s" " on broker %" PRId32, rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), txn_coord); } /* Checks all the RD_KAFKAP_EndTxn responses have been consumed */ TEST_ASSERT(errors_cnt == 0, "Expected error count 0 for API %s, found %zu", rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), errors_cnt); if (rd_kafka_mock_broker_error_stack_cnt( mcluster, txn_coord, RD_KAFKAP_InitProducerId, &errors_cnt)) { TEST_FAIL( "Broker error count should succeed for API %s" " on broker %" PRId32, rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), txn_coord); } /* Checks none of the RD_KAFKAP_InitProducerId responses have been * consumed */ TEST_ASSERT(errors_cnt == 1, "Expected error count 1 for API %s, found %zu", rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), errors_cnt); /* All done */ rd_kafka_destroy(rk); allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR; SUB_TEST_PASS(); } /** * @brief Test that the TxnOffsetCommit op doesn't retry without waiting * if the coordinator is found but not available, causing too frequent retries. 
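*
 * Expected timing (the arithmetic behind the timeouts chosen below):
 * the retry delay is 500ms and four COORDINATOR_NOT_AVAILABLE failures are
 * injected, so the call needs at least 4 x 500ms = 2000ms to succeed; a
 * 500ms timeout must therefore fail with COORDINATOR_NOT_AVAILABLE while a
 * 4000ms timeout must succeed.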
*/ static void do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_bool_t times_out) { rd_kafka_t *rk; rd_kafka_mock_cluster_t *mcluster; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *offsets; rd_kafka_consumer_group_metadata_t *cgmetadata; rd_kafka_error_t *error; int timeout; SUB_TEST_QUICK("times_out=%s", RD_STR_ToF(times_out)); rk = create_txn_producer(&mcluster, "txnid", 3, NULL); test_curr->ignore_dr_err = rd_true; TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000)); TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk)); err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END); TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err)); /* Wait for messages to be delivered */ test_flush(rk, 5000); /* * Fail TxnOffsetCommit with COORDINATOR_NOT_AVAILABLE * repeatedly. */ rd_kafka_mock_push_request_errors( mcluster, RD_KAFKAP_TxnOffsetCommit, 4, RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE); offsets = rd_kafka_topic_partition_list_new(1); rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 1; cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid"); /* The retry delay is 500ms, with 4 retries it should take at least * 2000ms for this call to succeed. */ timeout = times_out ? 500 : 4000; error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, timeout); rd_kafka_consumer_group_metadata_destroy(cgmetadata); rd_kafka_topic_partition_list_destroy(offsets); if (times_out) { TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, "expected %s, got: %s", rd_kafka_err2name( RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE), rd_kafka_err2str(rd_kafka_error_code(error))); } else { TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR_NO_ERROR, "expected \"Success\", found: %s", rd_kafka_err2str(rd_kafka_error_code(error))); } rd_kafka_error_destroy(error); /* All done */ rd_kafka_destroy(rk); SUB_TEST_PASS(); } int main_0105_transactions_mock(int argc, char **argv) { if (test_needs_auth()) { TEST_SKIP("Mock cluster does not support SSL/SASL\n"); return 0; } do_test_txn_recoverable_errors(); do_test_txn_fatal_idempo_errors(); do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH); do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED); do_test_txn_req_cnt(); do_test_txn_requires_abort_errors(); do_test_txn_slow_reinit(rd_false); do_test_txn_slow_reinit(rd_true); /* Just do a subset of tests in quick mode */ if (test_quick) return 0; do_test_txn_endtxn_errors(); do_test_txn_endtxn_infinite(); do_test_txn_endtxn_timeout(); do_test_txn_endtxn_timeout_inflight(); /* Bring down the coordinator */ do_test_txn_broker_down_in_txn(rd_true); /* Bring down partition leader */ do_test_txn_broker_down_in_txn(rd_false); do_test_txns_not_supported(); do_test_txns_send_offsets_concurrent_is_retried(); do_test_txns_send_offsets_non_eligible(); do_test_txn_coord_req_destroy(); do_test_txn_coord_req_multi_find(); do_test_txn_addparts_req_multi(); do_test_txns_no_timeout_crash(); do_test_txn_auth_failure( RD_KAFKAP_InitProducerId, RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED); do_test_txn_auth_failure( RD_KAFKAP_FindCoordinator, RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED); do_test_txn_flush_timeout(); do_test_unstable_offset_commit(); do_test_commit_after_msg_timeout(); do_test_txn_switch_coordinator(); 
do_test_txn_switch_coordinator_refresh(); do_test_out_of_order_seq(); do_test_topic_disappears_for_awhile(); do_test_disconnected_group_coord(rd_false); do_test_disconnected_group_coord(rd_true); do_test_txn_coordinator_null_not_fatal(); do_test_txn_resumable_calls_timeout(rd_true); do_test_txn_resumable_calls_timeout(rd_false); do_test_txn_resumable_calls_timeout_error(rd_true); do_test_txn_resumable_calls_timeout_error(rd_false); do_test_txn_resumable_init(); do_test_txn_concurrent_operations(rd_true /*commit*/); do_test_txn_concurrent_operations(rd_false /*abort*/); do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH); do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_PRODUCER_FENCED); do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_true); do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_false); return 0; }