summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:23 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-07-24 09:54:44 +0000
commit836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch)
tree1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz
netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c3926
1 files changed, 0 insertions, 3926 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c
deleted file mode 100644
index 014642df1..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0105-transactions_mock.c
+++ /dev/null
@@ -1,3926 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2019, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "test.h"
-
-#include "rdkafka.h"
-
-#include "../src/rdkafka_proto.h"
-#include "../src/rdstring.h"
-#include "../src/rdunittest.h"
-
-#include <stdarg.h>
-
-
-/**
- * @name Producer transaction tests using the mock cluster
- *
- */
-
-
-static int allowed_error;
-
-/**
- * @brief Decide what error_cb's will cause the test to fail.
- */
-static int
-error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
- if (err == allowed_error ||
- /* If transport errors are allowed then it is likely
- * that we'll also see ALL_BROKERS_DOWN. */
- (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
- err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
- TEST_SAY("Ignoring allowed error: %s: %s\n",
- rd_kafka_err2name(err), reason);
- return 0;
- }
- return 1;
-}
-
-
-static rd_kafka_resp_err_t (*on_response_received_cb)(rd_kafka_t *rk,
- int sockfd,
- const char *brokername,
- int32_t brokerid,
- int16_t ApiKey,
- int16_t ApiVersion,
- int32_t CorrId,
- size_t size,
- int64_t rtt,
- rd_kafka_resp_err_t err,
- void *ic_opaque);
-
-/**
- * @brief Simple on_response_received interceptor that simply calls the
- * sub-test's on_response_received_cb function, if set.
- */
-static rd_kafka_resp_err_t
-on_response_received_trampoline(rd_kafka_t *rk,
- int sockfd,
- const char *brokername,
- int32_t brokerid,
- int16_t ApiKey,
- int16_t ApiVersion,
- int32_t CorrId,
- size_t size,
- int64_t rtt,
- rd_kafka_resp_err_t err,
- void *ic_opaque) {
- TEST_ASSERT(on_response_received_cb != NULL, "");
- return on_response_received_cb(rk, sockfd, brokername, brokerid, ApiKey,
- ApiVersion, CorrId, size, rtt, err,
- ic_opaque);
-}
-
-
-/**
- * @brief on_new interceptor to add an on_response_received interceptor.
- */
-static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk,
- const rd_kafka_conf_t *conf,
- void *ic_opaque,
- char *errstr,
- size_t errstr_size) {
- rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- if (on_response_received_cb)
- err = rd_kafka_interceptor_add_on_response_received(
- rk, "on_response_received", on_response_received_trampoline,
- ic_opaque);
-
- return err;
-}
-
-
-/**
- * @brief Create a transactional producer and a mock cluster.
- *
- * The var-arg list is a NULL-terminated list of
- * (const char *key, const char *value) config properties.
- *
- * Special keys:
- * "on_response_received", "" - enable the on_response_received_cb
- * interceptor,
- * which must be assigned prior to
- * calling create_tnx_producer().
- */
-static RD_SENTINEL rd_kafka_t *
-create_txn_producer(rd_kafka_mock_cluster_t **mclusterp,
- const char *transactional_id,
- int broker_cnt,
- ...) {
- rd_kafka_conf_t *conf;
- rd_kafka_t *rk;
- char numstr[8];
- va_list ap;
- const char *key;
- rd_bool_t add_interceptors = rd_false;
-
- rd_snprintf(numstr, sizeof(numstr), "%d", broker_cnt);
-
- test_conf_init(&conf, NULL, 60);
-
- test_conf_set(conf, "transactional.id", transactional_id);
- /* When mock brokers are set to down state they're still binding
- * the port, just not listening to it, which makes connection attempts
- * stall until socket.connection.setup.timeout.ms expires.
- * To speed up detection of brokers being down we reduce this timeout
- * to just a couple of seconds. */
- test_conf_set(conf, "socket.connection.setup.timeout.ms", "5000");
- /* Speed up reconnects */
- test_conf_set(conf, "reconnect.backoff.max.ms", "2000");
- test_conf_set(conf, "test.mock.num.brokers", numstr);
- rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
-
- test_curr->ignore_dr_err = rd_false;
-
- va_start(ap, broker_cnt);
- while ((key = va_arg(ap, const char *))) {
- if (!strcmp(key, "on_response_received")) {
- add_interceptors = rd_true;
- (void)va_arg(ap, const char *);
- } else {
- test_conf_set(conf, key, va_arg(ap, const char *));
- }
- }
- va_end(ap);
-
- /* Add an on_.. interceptors */
- if (add_interceptors)
- rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer",
- on_new_producer, NULL);
-
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- if (mclusterp) {
- *mclusterp = rd_kafka_handle_mock_cluster(rk);
- TEST_ASSERT(*mclusterp, "failed to create mock cluster");
-
- /* Create some of the common consumer "input" topics
- * that we must be able to commit to with
- * send_offsets_to_transaction().
- * The number depicts the number of partitions in the topic. */
- TEST_CALL_ERR__(
- rd_kafka_mock_topic_create(*mclusterp, "srctopic4", 4, 1));
- TEST_CALL_ERR__(rd_kafka_mock_topic_create(
- *mclusterp, "srctopic64", 64, 1));
- }
-
- return rk;
-}
-
-
-/**
- * @brief Test recoverable errors using mock broker error injections
- * and code coverage checks.
- */
-static void do_test_txn_recoverable_errors(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- const char *groupid = "myGroupId";
- const char *txnid = "myTxnId";
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- /* Make sure transaction and group coordinators are different.
- * This verifies that AddOffsetsToTxnRequest isn't sent to the
- * transaction coordinator but the group coordinator. */
- rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 2);
-
- /*
- * Inject som InitProducerId errors that causes retries
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_InitProducerId, 3,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- (void)RD_UT_COVERAGE_CHECK(0); /* idemp_request_pid_failed(retry) */
- (void)RD_UT_COVERAGE_CHECK(1); /* txn_idemp_state_change(READY) */
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- /* Produce a message without error first */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- rd_kafka_flush(rk, -1);
-
- /*
- * Produce a message, let it fail with a non-idempo/non-txn
- * retryable error
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_NOT_ENOUGH_REPLICAS);
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- /* Make sure messages are produced */
- rd_kafka_flush(rk, -1);
-
- /*
- * Send some arbitrary offsets, first with some failures, then
- * succeed.
- */
- offsets = rd_kafka_topic_partition_list_new(4);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 39)->offset =
- 999999111;
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
- 999;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 19)->offset =
- 123456789;
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /*
- * Commit transaction, first with som failures, then succeed.
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_EndTxn, 3,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief KIP-360: Test that fatal idempotence errors triggers abortable
- * transaction errors and that the producer can recover.
- */
-static void do_test_txn_fatal_idempo_errors(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- const char *txnid = "myTxnId";
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->is_fatal_cb = error_is_fatal_cb;
- allowed_error = RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- /* Produce a message without error first */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- /* Produce a message, let it fail with a fatal idempo error. */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- /* Commit the transaction, should fail */
- error = rd_kafka_commit_transaction(rk, -1);
- TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
-
- TEST_SAY("commit_transaction() failed (expectedly): %s\n",
- rd_kafka_error_string(error));
-
- TEST_ASSERT(!rd_kafka_error_is_fatal(error),
- "Did not expect fatal error");
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "Expected abortable error");
- rd_kafka_error_destroy(error);
-
- /* Abort the transaction */
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- /* Run a new transaction without errors to verify that the
- * producer can recover. */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief KIP-360: Test that fatal idempotence errors triggers abortable
- * transaction errors, but let the broker-side bumping of the
- * producer PID take longer than the remaining transaction timeout
- * which should raise a retriable error from abort_transaction().
- *
- * @param with_sleep After the first abort sleep longer than it takes to
- * re-init the pid so that the internal state automatically
- * transitions.
- */
-static void do_test_txn_slow_reinit(rd_bool_t with_sleep) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- int32_t txn_coord = 2;
- const char *txnid = "myTxnId";
- test_timing_t timing;
-
- SUB_TEST("%s sleep", with_sleep ? "with" : "without");
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
- txn_coord);
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->is_fatal_cb = NULL;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- /* Produce a message without error first */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- test_flush(rk, -1);
-
- /* Set transaction coordinator latency higher than
- * the abort_transaction() call timeout so that the automatic
- * re-initpid takes longer than abort_transaction(). */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1,
- RD_KAFKA_RESP_ERR_NO_ERROR, 10000 /*10s*/);
-
- /* Produce a message, let it fail with a fatal idempo error. */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
-
- /* Commit the transaction, should fail */
- TIMING_START(&timing, "commit_transaction(-1)");
- error = rd_kafka_commit_transaction(rk, -1);
- TIMING_STOP(&timing);
- TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
-
- TEST_SAY("commit_transaction() failed (expectedly): %s\n",
- rd_kafka_error_string(error));
-
- TEST_ASSERT(!rd_kafka_error_is_fatal(error),
- "Did not expect fatal error");
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "Expected abortable error");
- rd_kafka_error_destroy(error);
-
- /* Abort the transaction, should fail with retriable (timeout) error */
- TIMING_START(&timing, "abort_transaction(100)");
- error = rd_kafka_abort_transaction(rk, 100);
- TIMING_STOP(&timing);
- TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
-
- TEST_SAY("First abort_transaction() failed: %s\n",
- rd_kafka_error_string(error));
- TEST_ASSERT(!rd_kafka_error_is_fatal(error),
- "Did not expect fatal error");
- TEST_ASSERT(rd_kafka_error_is_retriable(error),
- "Expected retriable error");
- rd_kafka_error_destroy(error);
-
- if (with_sleep)
- rd_sleep(12);
-
- /* Retry abort, should now finish. */
- TEST_SAY("Retrying abort\n");
- TIMING_START(&timing, "abort_transaction(-1)");
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
- TIMING_STOP(&timing);
-
- /* Run a new transaction without errors to verify that the
- * producer can recover. */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief KIP-360: Test that fatal idempotence errors triggers abortable
- * transaction errors, but let the broker-side bumping of the
- * producer PID fail with a fencing error.
- * Should raise a fatal error.
- *
- * @param error_code Which error code InitProducerIdRequest should fail with.
- * Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
- * or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
- */
-static void do_test_txn_fenced_reinit(rd_kafka_resp_err_t error_code) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- int32_t txn_coord = 2;
- const char *txnid = "myTxnId";
- char errstr[512];
- rd_kafka_resp_err_t fatal_err;
-
- SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
- txn_coord);
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->is_fatal_cb = error_is_fatal_cb;
- allowed_error = RD_KAFKA_RESP_ERR__FENCED;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- /* Produce a message without error first */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- test_flush(rk, -1);
-
- /* Fail the PID reinit */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);
-
- /* Produce a message, let it fail with a fatal idempo error. */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- test_flush(rk, -1);
-
- /* Abort the transaction, should fail with a fatal error */
- error = rd_kafka_abort_transaction(rk, -1);
- TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
-
- TEST_SAY("abort_transaction() failed: %s\n",
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
- rd_kafka_error_destroy(error);
-
- fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
- TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
- TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test EndTxn errors.
- */
-static void do_test_txn_endtxn_errors(void) {
- rd_kafka_t *rk = NULL;
- rd_kafka_mock_cluster_t *mcluster = NULL;
- rd_kafka_resp_err_t err;
- struct {
- size_t error_cnt;
- rd_kafka_resp_err_t errors[4];
- rd_kafka_resp_err_t exp_err;
- rd_bool_t exp_retriable;
- rd_bool_t exp_abortable;
- rd_bool_t exp_fatal;
- rd_bool_t exp_successful_abort;
- } scenario[] = {
- /* This list of errors is from the EndTxnResponse handler in
- * AK clients/.../TransactionManager.java */
- {
- /* #0 */
- 2,
- {RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE},
- /* Should auto-recover */
- RD_KAFKA_RESP_ERR_NO_ERROR,
- },
- {
- /* #1 */
- 2,
- {RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR},
- /* Should auto-recover */
- RD_KAFKA_RESP_ERR_NO_ERROR,
- },
- {
- /* #2 */
- 1,
- {RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS},
- /* Should auto-recover */
- RD_KAFKA_RESP_ERR_NO_ERROR,
- },
- {
- /* #3 */
- 3,
- {RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS},
- /* Should auto-recover */
- RD_KAFKA_RESP_ERR_NO_ERROR,
- },
- {
- /* #4: the abort is auto-recovering thru epoch bump */
- 1,
- {RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID},
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID,
- rd_false /* !retriable */,
- rd_true /* abortable */,
- rd_false /* !fatal */,
- rd_true /* successful abort */
- },
- {
- /* #5: the abort is auto-recovering thru epoch bump */
- 1,
- {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING},
- RD_KAFKA_RESP_ERR_INVALID_PRODUCER_ID_MAPPING,
- rd_false /* !retriable */,
- rd_true /* abortable */,
- rd_false /* !fatal */,
- rd_true /* successful abort */
- },
- {
- /* #6 */
- 1,
- {RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH},
- /* This error is normalized */
- RD_KAFKA_RESP_ERR__FENCED,
- rd_false /* !retriable */,
- rd_false /* !abortable */,
- rd_true /* fatal */
- },
- {
- /* #7 */
- 1,
- {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
- /* This error is normalized */
- RD_KAFKA_RESP_ERR__FENCED,
- rd_false /* !retriable */,
- rd_false /* !abortable */,
- rd_true /* fatal */
- },
- {
- /* #8 */
- 1,
- {RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED},
- RD_KAFKA_RESP_ERR_TRANSACTIONAL_ID_AUTHORIZATION_FAILED,
- rd_false /* !retriable */,
- rd_false /* !abortable */,
- rd_true /* fatal */
- },
- {
- /* #9 */
- 1,
- {RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED},
- RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
- rd_false /* !retriable */,
- rd_true /* abortable */,
- rd_false /* !fatal */
- },
- {
- /* #10 */
- /* Any other error should raise a fatal error */
- 1,
- {RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE},
- RD_KAFKA_RESP_ERR_INVALID_MSG_SIZE,
- rd_false /* !retriable */,
- rd_true /* abortable */,
- rd_false /* !fatal */,
- },
- {
- /* #11 */
- 1,
- {RD_KAFKA_RESP_ERR_PRODUCER_FENCED},
- /* This error is normalized */
- RD_KAFKA_RESP_ERR__FENCED,
- rd_false /* !retriable */,
- rd_false /* !abortable */,
- rd_true /* fatal */
- },
- {0},
- };
- int i;
-
- SUB_TEST_QUICK();
-
- for (i = 0; scenario[i].error_cnt > 0; i++) {
- int j;
- /* For each scenario, test:
- * commit_transaction()
- * flush() + commit_transaction()
- * abort_transaction()
- * flush() + abort_transaction()
- */
- for (j = 0; j < (2 + 2); j++) {
- rd_bool_t commit = j < 2;
- rd_bool_t with_flush = j & 1;
- rd_bool_t exp_successful_abort =
- !commit && scenario[i].exp_successful_abort;
- const char *commit_str =
- commit ? (with_flush ? "commit&flush" : "commit")
- : (with_flush ? "abort&flush" : "abort");
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- rd_kafka_error_t *error;
- test_timing_t t_call;
-
- TEST_SAY("Testing scenario #%d %s with %" PRIusz
- " injected erorrs, expecting %s\n",
- i, commit_str, scenario[i].error_cnt,
- exp_successful_abort
- ? "successful abort"
- : rd_kafka_err2name(scenario[i].exp_err));
-
- if (!rk) {
- const char *txnid = "myTxnId";
- rk = create_txn_producer(&mcluster, txnid, 3,
- NULL);
- TEST_CALL_ERROR__(
- rd_kafka_init_transactions(rk, 5000));
- }
-
- /*
- * Start transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /* Transaction aborts will cause DR errors:
- * ignore them. */
- test_curr->ignore_dr_err = !commit;
-
- /*
- * Produce a message.
- */
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s",
- rd_kafka_err2str(err));
-
- if (with_flush)
- test_flush(rk, -1);
-
- /*
- * Send some arbitrary offsets.
- */
- offsets = rd_kafka_topic_partition_list_new(4);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4",
- 3)
- ->offset = 12;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64",
- 60)
- ->offset = 99999;
-
- cgmetadata =
- rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
- rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /*
- * Commit transaction, first with som failures,
- * then succeed.
- */
- rd_kafka_mock_push_request_errors_array(
- mcluster, RD_KAFKAP_EndTxn, scenario[i].error_cnt,
- scenario[i].errors);
-
- TIMING_START(&t_call, "%s", commit_str);
- if (commit)
- error = rd_kafka_commit_transaction(
- rk, tmout_multip(5000));
- else
- error = rd_kafka_abort_transaction(
- rk, tmout_multip(5000));
- TIMING_STOP(&t_call);
-
- if (error)
- TEST_SAY(
- "Scenario #%d %s failed: %s: %s "
- "(retriable=%s, req_abort=%s, "
- "fatal=%s)\n",
- i, commit_str, rd_kafka_error_name(error),
- rd_kafka_error_string(error),
- RD_STR_ToF(
- rd_kafka_error_is_retriable(error)),
- RD_STR_ToF(
- rd_kafka_error_txn_requires_abort(
- error)),
- RD_STR_ToF(rd_kafka_error_is_fatal(error)));
- else
- TEST_SAY("Scenario #%d %s succeeded\n", i,
- commit_str);
-
- if (!scenario[i].exp_err || exp_successful_abort) {
- TEST_ASSERT(!error,
- "Expected #%d %s to succeed, "
- "got %s",
- i, commit_str,
- rd_kafka_error_string(error));
- continue;
- }
-
-
- TEST_ASSERT(error != NULL, "Expected #%d %s to fail", i,
- commit_str);
- TEST_ASSERT(scenario[i].exp_err ==
- rd_kafka_error_code(error),
- "Scenario #%d: expected %s, not %s", i,
- rd_kafka_err2name(scenario[i].exp_err),
- rd_kafka_error_name(error));
- TEST_ASSERT(
- scenario[i].exp_retriable ==
- (rd_bool_t)rd_kafka_error_is_retriable(error),
- "Scenario #%d: retriable mismatch", i);
- TEST_ASSERT(
- scenario[i].exp_abortable ==
- (rd_bool_t)rd_kafka_error_txn_requires_abort(
- error),
- "Scenario #%d: abortable mismatch", i);
- TEST_ASSERT(
- scenario[i].exp_fatal ==
- (rd_bool_t)rd_kafka_error_is_fatal(error),
- "Scenario #%d: fatal mismatch", i);
-
- /* Handle errors according to the error flags */
- if (rd_kafka_error_is_fatal(error)) {
- TEST_SAY("Fatal error, destroying producer\n");
- rd_kafka_error_destroy(error);
- rd_kafka_destroy(rk);
- rk = NULL; /* Will be re-created on the next
- * loop iteration. */
-
- } else if (rd_kafka_error_txn_requires_abort(error)) {
- rd_kafka_error_destroy(error);
- TEST_SAY(
- "Abortable error, "
- "aborting transaction\n");
- TEST_CALL_ERROR__(
- rd_kafka_abort_transaction(rk, -1));
-
- } else if (rd_kafka_error_is_retriable(error)) {
- rd_kafka_error_destroy(error);
- TEST_SAY("Retriable error, retrying %s once\n",
- commit_str);
- if (commit)
- TEST_CALL_ERROR__(
- rd_kafka_commit_transaction(rk,
- 5000));
- else
- TEST_CALL_ERROR__(
- rd_kafka_abort_transaction(rk,
- 5000));
- } else {
- TEST_FAIL(
- "Scenario #%d %s: "
- "Permanent error without enough "
- "hints to proceed: %s\n",
- i, commit_str,
- rd_kafka_error_string(error));
- }
- }
- }
-
- /* All done */
- if (rk)
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test that the commit/abort works properly with infinite timeout.
- */
-static void do_test_txn_endtxn_infinite(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster = NULL;
- const char *txnid = "myTxnId";
- int i;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, NULL);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- for (i = 0; i < 2; i++) {
- rd_bool_t commit = i == 0;
- const char *commit_str = commit ? "commit" : "abort";
- rd_kafka_error_t *error;
- test_timing_t t_call;
-
- /* Messages will fail on as the transaction fails,
- * ignore the DR error */
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END));
-
- /*
- * Commit/abort transaction, first with som retriable failures,
- * then success.
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_EndTxn, 10,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
-
- rd_sleep(1);
-
- TIMING_START(&t_call, "%s_transaction()", commit_str);
- if (commit)
- error = rd_kafka_commit_transaction(rk, -1);
- else
- error = rd_kafka_abort_transaction(rk, -1);
- TIMING_STOP(&t_call);
-
- TEST_SAY("%s returned %s\n", commit_str,
- error ? rd_kafka_error_string(error) : "success");
-
- TEST_ASSERT(!error, "Expected %s to succeed, got %s",
- commit_str, rd_kafka_error_string(error));
- }
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Test that the commit/abort user timeout is honoured.
- */
-static void do_test_txn_endtxn_timeout(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster = NULL;
- const char *txnid = "myTxnId";
- int i;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, NULL);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- for (i = 0; i < 2; i++) {
- rd_bool_t commit = i == 0;
- const char *commit_str = commit ? "commit" : "abort";
- rd_kafka_error_t *error;
- test_timing_t t_call;
-
- /* Messages will fail as the transaction fails,
- * ignore the DR error */
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END));
-
- /*
- * Commit/abort transaction, first with some retriable failures
- * whos retries exceed the user timeout.
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_EndTxn, 10,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
-
- rd_sleep(1);
-
- TIMING_START(&t_call, "%s_transaction()", commit_str);
- if (commit)
- error = rd_kafka_commit_transaction(rk, 100);
- else
- error = rd_kafka_abort_transaction(rk, 100);
- TIMING_STOP(&t_call);
-
- TEST_SAY_ERROR(error, "%s returned: ", commit_str);
- TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
- TEST_ASSERT(
- rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Expected %s to fail with timeout, not %s: %s", commit_str,
- rd_kafka_error_name(error), rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_is_retriable(error),
- "%s failure should raise a retriable error",
- commit_str);
- rd_kafka_error_destroy(error);
-
- /* Now call it again with an infinite timeout, should work. */
- TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
- if (commit)
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
- else
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
- TIMING_STOP(&t_call);
- }
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Test commit/abort inflight timeout behaviour, which should result
- * in a retriable error.
- */
-static void do_test_txn_endtxn_timeout_inflight(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster = NULL;
- const char *txnid = "myTxnId";
- int32_t coord_id = 1;
- int i;
-
- SUB_TEST();
-
- allowed_error = RD_KAFKA_RESP_ERR__TIMED_OUT;
- test_curr->is_fatal_cb = error_is_fatal_cb;
-
- rk = create_txn_producer(&mcluster, txnid, 1, "transaction.timeout.ms",
- "5000", NULL);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- for (i = 0; i < 2; i++) {
- rd_bool_t commit = i == 0;
- const char *commit_str = commit ? "commit" : "abort";
- rd_kafka_error_t *error;
- test_timing_t t_call;
-
- /* Messages will fail as the transaction fails,
- * ignore the DR error */
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END));
-
- /* Let EndTxn & EndTxn retry timeout */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_EndTxn, 2,
- RD_KAFKA_RESP_ERR_NO_ERROR, 10000,
- RD_KAFKA_RESP_ERR_NO_ERROR, 10000);
-
- rd_sleep(1);
-
- TIMING_START(&t_call, "%s_transaction()", commit_str);
- if (commit)
- error = rd_kafka_commit_transaction(rk, 4000);
- else
- error = rd_kafka_abort_transaction(rk, 4000);
- TIMING_STOP(&t_call);
-
- TEST_SAY_ERROR(error, "%s returned: ", commit_str);
- TEST_ASSERT(error != NULL, "Expected %s to fail", commit_str);
- TEST_ASSERT(
- rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Expected %s to fail with timeout, not %s: %s", commit_str,
- rd_kafka_error_name(error), rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_is_retriable(error),
- "%s failure should raise a retriable error",
- commit_str);
- rd_kafka_error_destroy(error);
-
- /* Now call it again with an infinite timeout, should work. */
- TIMING_START(&t_call, "%s_transaction() nr 2", commit_str);
- if (commit)
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
- else
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
- TIMING_STOP(&t_call);
- }
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
- test_curr->is_fatal_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Test that EndTxn is properly sent for aborted transactions
- * even if AddOffsetsToTxnRequest was retried.
- * This is a check for a txn_req_cnt bug.
- */
-static void do_test_txn_req_cnt(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- const char *txnid = "myTxnId";
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, NULL);
-
- /* Messages will fail on abort(), ignore the DR error */
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /*
- * Send some arbitrary offsets, first with some failures, then
- * succeed.
- */
- offsets = rd_kafka_topic_partition_list_new(2);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 40)->offset =
- 999999111;
-
- rd_kafka_mock_push_request_errors(mcluster, RD_KAFKAP_AddOffsetsToTxn,
- 2,
- RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
- RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_TxnOffsetCommit, 2,
- RD_KAFKA_RESP_ERR_COORDINATOR_LOAD_IN_PROGRESS,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test abortable errors using mock broker error injections
- * and code coverage checks.
- */
-static void do_test_txn_requires_abort_errors(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- int r;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /*
- * 1. Fail on produce
- */
- TEST_SAY("1. Fail on produce\n");
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- /* Wait for messages to fail */
- test_flush(rk, 5000);
-
- /* Any other transactional API should now raise an error */
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- error =
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
- TEST_ASSERT(error, "expected error");
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "expected abortable error, not %s",
- rd_kafka_error_string(error));
- TEST_SAY("Error %s: %s\n", rd_kafka_error_name(error),
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- /*
- * 2. Restart transaction and fail on AddPartitionsToTxn
- */
- TEST_SAY("2. Fail on AddPartitionsToTxn\n");
-
- /* First refresh proper Metadata to clear the topic's auth error,
- * otherwise the produce() below will fail immediately. */
- r = test_get_partition_count(rk, "mytopic", 5000);
- TEST_ASSERT(r > 0, "Expected topic %s to exist", "mytopic");
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddPartitionsToTxn, 1,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- error = rd_kafka_commit_transaction(rk, 5000);
- TEST_ASSERT(error, "commit_transaction should have failed");
- TEST_SAY("commit_transaction() error %s: %s\n",
- rd_kafka_error_name(error), rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- /*
- * 3. Restart transaction and fail on AddOffsetsToTxn
- */
- TEST_SAY("3. Fail on AddOffsetsToTxn\n");
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddOffsetsToTxn, 1,
- RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED);
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- error =
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
- TEST_ASSERT(error, "Expected send_offsets..() to fail");
- TEST_ASSERT(rd_kafka_error_code(error) ==
- RD_KAFKA_RESP_ERR_GROUP_AUTHORIZATION_FAILED,
- "expected send_offsets_to_transaction() to fail with "
- "group auth error: not %s",
- rd_kafka_error_name(error));
- rd_kafka_error_destroy(error);
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
-
- error = rd_kafka_commit_transaction(rk, 5000);
- TEST_ASSERT(error, "commit_transaction should have failed");
- rd_kafka_error_destroy(error);
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test error handling and recover for when broker goes down during
- * an ongoing transaction.
- */
-static void do_test_txn_broker_down_in_txn(rd_bool_t down_coord) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- int32_t coord_id, leader_id, down_id;
- const char *down_what;
- rd_kafka_resp_err_t err;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int msgcnt = 1000;
- int remains = 0;
-
- /* Assign coordinator and leader to two different brokers */
- coord_id = 1;
- leader_id = 2;
- if (down_coord) {
- down_id = coord_id;
- down_what = "coordinator";
- } else {
- down_id = leader_id;
- down_what = "leader";
- }
-
- SUB_TEST_QUICK("Test %s down", down_what);
-
- rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
-
- /* Broker down is not a test-failing error */
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
- test_curr->is_fatal_cb = error_is_fatal_cb;
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
-
- /* Start transactioning */
- TEST_SAY("Starting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
- msgcnt / 2, NULL, 0, &remains);
-
- TEST_SAY("Bringing down %s %" PRId32 "\n", down_what, down_id);
- rd_kafka_mock_broker_set_down(mcluster, down_id);
-
- rd_kafka_flush(rk, 3000);
-
- /* Produce remaining messages */
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
- msgcnt / 2, msgcnt / 2, NULL, 0, &remains);
-
- rd_sleep(2);
-
- TEST_SAY("Bringing up %s %" PRId32 "\n", down_what, down_id);
- rd_kafka_mock_broker_set_up(mcluster, down_id);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
-
- rd_kafka_destroy(rk);
-
- test_curr->is_fatal_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Advance the coord_id to the next broker.
- */
-static void set_next_coord(rd_kafka_mock_cluster_t *mcluster,
- const char *transactional_id,
- int broker_cnt,
- int32_t *coord_idp) {
- int32_t new_coord_id;
-
- new_coord_id = 1 + ((*coord_idp) % (broker_cnt));
- TEST_SAY("Changing transaction coordinator from %" PRId32 " to %" PRId32
- "\n",
- *coord_idp, new_coord_id);
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- new_coord_id);
-
- *coord_idp = new_coord_id;
-}
-
-/**
- * @brief Switch coordinator during a transaction.
- *
- */
-static void do_test_txn_switch_coordinator(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- int32_t coord_id;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- const int broker_cnt = 5;
- const int iterations = 20;
- int i;
-
- test_timeout_set(iterations * 10);
-
- SUB_TEST("Test switching coordinators");
-
- rk = create_txn_producer(&mcluster, transactional_id, broker_cnt, NULL);
-
- coord_id = 1;
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
-
- /* Start transactioning */
- TEST_SAY("Starting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- for (i = 0; i < iterations; i++) {
- const int msgcnt = 100;
- int remains = 0;
-
- set_next_coord(mcluster, transactional_id, broker_cnt,
- &coord_id);
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
- msgcnt / 2, NULL, 0);
-
- if (!(i % 3))
- set_next_coord(mcluster, transactional_id, broker_cnt,
- &coord_id);
-
- /* Produce remaining messages */
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA,
- msgcnt / 2, msgcnt / 2, NULL, 0,
- &remains);
-
- if ((i & 1) || !(i % 8))
- set_next_coord(mcluster, transactional_id, broker_cnt,
- &coord_id);
-
-
- if (!(i % 5)) {
- test_curr->ignore_dr_err = rd_false;
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- } else {
- test_curr->ignore_dr_err = rd_true;
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
- }
- }
-
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Switch coordinator during a transaction when AddOffsetsToTxn
- * are sent. #3571.
- */
-static void do_test_txn_switch_coordinator_refresh(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- SUB_TEST("Test switching coordinators (refresh)");
-
- rk = create_txn_producer(&mcluster, transactional_id, 3, NULL);
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- 1);
-
- /* Start transactioning */
- TEST_SAY("Starting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /* Switch the coordinator so that AddOffsetsToTxnRequest
- * will respond with NOT_COORDINATOR. */
- TEST_SAY("Switching to coordinator 2\n");
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- 2);
-
- /*
- * Send some arbitrary offsets.
- */
- offsets = rd_kafka_topic_partition_list_new(4);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 29)->offset =
- 99999;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction(
- rk, offsets, cgmetadata, 20 * 1000));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
-
- /* Produce some messages */
- test_produce_msgs2(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10, NULL, 0);
-
- /* And commit the transaction */
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test fatal error handling when transactions are not supported
- * by the broker.
- */
-static void do_test_txns_not_supported(void) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
-
- SUB_TEST_QUICK();
-
- test_conf_init(&conf, NULL, 10);
-
- test_conf_set(conf, "transactional.id", "myxnid");
- test_conf_set(conf, "bootstrap.servers", ",");
- rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);
-
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- /* Create mock cluster */
- mcluster = rd_kafka_mock_cluster_new(rk, 3);
-
- /* Disable InitProducerId */
- rd_kafka_mock_set_apiversion(mcluster, 22 /*InitProducerId*/, -1, -1);
-
-
- rd_kafka_brokers_add(rk, rd_kafka_mock_cluster_bootstraps(mcluster));
-
-
-
- error = rd_kafka_init_transactions(rk, 5 * 1000);
- TEST_SAY("init_transactions() returned %s: %s\n",
- error ? rd_kafka_error_name(error) : "success",
- error ? rd_kafka_error_string(error) : "success");
-
- TEST_ASSERT(error, "Expected init_transactions() to fail");
- TEST_ASSERT(rd_kafka_error_code(error) ==
- RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
- "Expected init_transactions() to fail with %s, not %s: %s",
- rd_kafka_err2name(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE),
- rd_kafka_error_name(error), rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("test"),
- RD_KAFKA_V_KEY("test", 4), RD_KAFKA_V_END);
- TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
- "Expected producev() to fail with %s, not %s",
- rd_kafka_err2name(RD_KAFKA_RESP_ERR__FATAL),
- rd_kafka_err2name(err));
-
- rd_kafka_mock_cluster_destroy(mcluster);
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief CONCURRENT_TRANSACTION on AddOffsets.. should be retried.
- */
-static void do_test_txns_send_offsets_concurrent_is_retried(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- /* Wait for messages to be delivered */
- test_flush(rk, 5000);
-
-
- /*
- * Have AddOffsetsToTxn fail but eventually succeed due to
- * infinite retries.
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddOffsetsToTxn,
- 1 + 5, /* first request + some retries */
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Verify that send_offsets_to_transaction() with no eligible offsets
- * is handled properly - the call should succeed immediately and be
- * repeatable.
- */
-static void do_test_txns_send_offsets_non_eligible(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- /* Wait for messages to be delivered */
- test_flush(rk, 5000);
-
- /* Empty offsets list */
- offsets = rd_kafka_topic_partition_list_new(0);
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- /* Now call it again, should also succeed. */
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Verify that request timeouts don't cause crash (#2913).
- */
-static void do_test_txns_no_timeout_crash(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- SUB_TEST_QUICK();
-
- rk =
- create_txn_producer(&mcluster, "txnid", 3, "socket.timeout.ms",
- "1000", "transaction.timeout.ms", "5000", NULL);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- test_flush(rk, -1);
-
- /* Delay all broker connections */
- if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 2000)) ||
- (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 2000)) ||
- (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 2000)))
- TEST_FAIL("Failed to set broker RTT: %s",
- rd_kafka_err2str(err));
-
- /* send_offsets..() should now time out */
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- error =
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
- TEST_ASSERT(error, "Expected send_offsets..() to fail");
- TEST_SAY("send_offsets..() failed with %serror: %s\n",
- rd_kafka_error_is_retriable(error) ? "retriable " : "",
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
- "expected send_offsets_to_transaction() to fail with "
- "timeout, not %s",
- rd_kafka_error_name(error));
- TEST_ASSERT(rd_kafka_error_is_retriable(error),
- "expected send_offsets_to_transaction() to fail with "
- "a retriable error");
- rd_kafka_error_destroy(error);
-
- /* Reset delay and try again */
- if ((err = rd_kafka_mock_broker_set_rtt(mcluster, 1, 0)) ||
- (err = rd_kafka_mock_broker_set_rtt(mcluster, 2, 0)) ||
- (err = rd_kafka_mock_broker_set_rtt(mcluster, 3, 0)))
- TEST_FAIL("Failed to reset broker RTT: %s",
- rd_kafka_err2str(err));
-
- TEST_SAY("Retrying send_offsets..()\n");
- error =
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
- TEST_ASSERT(!error, "Expected send_offsets..() to succeed, got: %s",
- rd_kafka_error_string(error));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /* All done */
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test auth failure handling.
- */
-static void do_test_txn_auth_failure(int16_t ApiKey,
- rd_kafka_resp_err_t ErrorCode) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
-
- SUB_TEST_QUICK("ApiKey=%s ErrorCode=%s", rd_kafka_ApiKey2str(ApiKey),
- rd_kafka_err2name(ErrorCode));
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- rd_kafka_mock_push_request_errors(mcluster, ApiKey, 1, ErrorCode);
-
- error = rd_kafka_init_transactions(rk, 5000);
- TEST_ASSERT(error, "Expected init_transactions() to fail");
-
- TEST_SAY("init_transactions() failed: %s: %s\n",
- rd_kafka_err2name(rd_kafka_error_code(error)),
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_code(error) == ErrorCode,
- "Expected error %s, not %s", rd_kafka_err2name(ErrorCode),
- rd_kafka_err2name(rd_kafka_error_code(error)));
- TEST_ASSERT(rd_kafka_error_is_fatal(error),
- "Expected error to be fatal");
- TEST_ASSERT(!rd_kafka_error_is_retriable(error),
- "Expected error to not be retriable");
- rd_kafka_error_destroy(error);
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Issue #3041: Commit fails due to message flush() taking too long,
- * eventually resulting in an unabortable error and failure to
- * re-init the transactional producer.
- */
-static void do_test_txn_flush_timeout(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- rd_kafka_error_t *error;
- const char *txnid = "myTxnId";
- const char *topic = "myTopic";
- const int32_t coord_id = 2;
- int msgcounter = 0;
- rd_bool_t is_retry = rd_false;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, "message.timeout.ms",
- "10000", "transaction.timeout.ms", "10000",
- /* Speed up coordinator reconnect */
- "reconnect.backoff.max.ms", "1000", NULL);
-
-
- /* Broker down is not a test-failing error */
- test_curr->is_fatal_cb = error_is_fatal_cb;
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
-
- rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
-
- /* Set coordinator so we can disconnect it later */
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, coord_id);
-
- /*
- * Init transactions
- */
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
-retry:
- if (!is_retry) {
- /* First attempt should fail. */
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
- /* Assign invalid partition leaders for some partitions so
- * that messages will not be delivered. */
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, -1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 1, -1);
-
- } else {
- /* The retry should succeed */
- test_curr->ignore_dr_err = rd_false;
- test_curr->exp_dr_err = is_retry
- ? RD_KAFKA_RESP_ERR_NO_ERROR
- : RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
- }
-
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /*
- * Produce some messages to specific partitions and random.
- */
- test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 100, NULL, 10,
- &msgcounter);
- test_produce_msgs2_nowait(rk, topic, 1, 0, 0, 100, NULL, 10,
- &msgcounter);
- test_produce_msgs2_nowait(rk, topic, RD_KAFKA_PARTITION_UA, 0, 0, 100,
- NULL, 10, &msgcounter);
-
-
- /*
- * Send some arbitrary offsets.
- */
- offsets = rd_kafka_topic_partition_list_new(4);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 49)->offset =
- 999999111;
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset =
- 999;
- rd_kafka_topic_partition_list_add(offsets, "srctopic64", 34)->offset =
- 123456789;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- rd_sleep(2);
-
- if (!is_retry) {
- /* Now disconnect the coordinator. */
- TEST_SAY("Disconnecting transaction coordinator %" PRId32 "\n",
- coord_id);
- rd_kafka_mock_broker_set_down(mcluster, coord_id);
- }
-
- /*
- * Start committing.
- */
- error = rd_kafka_commit_transaction(rk, -1);
-
- if (!is_retry) {
- TEST_ASSERT(error != NULL, "Expected commit to fail");
- TEST_SAY("commit_transaction() failed (expectedly): %s\n",
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- } else {
- TEST_ASSERT(!error, "Expected commit to succeed, not: %s",
- rd_kafka_error_string(error));
- }
-
- if (!is_retry) {
- /*
- * Bring the coordinator back up.
- */
- rd_kafka_mock_broker_set_up(mcluster, coord_id);
- rd_sleep(2);
-
- /*
- * Abort, and try again, this time without error.
- */
- TEST_SAY("Aborting and retrying\n");
- is_retry = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 60000));
- goto retry;
- }
-
- /* All done */
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief ESC-4424: rko is reused in response handler after destroy in coord_req
- * sender due to bad state.
- *
- * This is somewhat of a race condition so we need to perform a couple of
- * iterations before it hits, usually 2 or 3, so we try at least 15 times.
- */
-static void do_test_txn_coord_req_destroy(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- int i;
- int errcnt = 0;
-
- SUB_TEST();
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- for (i = 0; i < 15; i++) {
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- test_timeout_set(10);
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /*
- * Inject errors to trigger retries
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddPartitionsToTxn,
- 2, /* first request + number of internal retries */
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS,
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_AddOffsetsToTxn,
- 1, /* first request + number of internal retries */
- RD_KAFKA_RESP_ERR_CONCURRENT_TRANSACTIONS);
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 4,
- RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
- RD_KAFKA_RESP_ERR_REQUEST_TIMED_OUT,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED,
- RD_KAFKA_RESP_ERR_TOPIC_AUTHORIZATION_FAILED);
- /* FIXME: When KIP-360 is supported, add this error:
- * RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER */
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2),
- RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
-
- /*
- * Send offsets to transaction
- */
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)
- ->offset = 12;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- error = rd_kafka_send_offsets_to_transaction(rk, offsets,
- cgmetadata, -1);
-
- TEST_SAY("send_offsets_to_transaction() #%d: %s\n", i,
- rd_kafka_error_string(error));
-
- /* As we can't control the exact timing and sequence
- * of requests this sometimes fails and sometimes succeeds,
- * but we run the test enough times to trigger at least
- * one failure. */
- if (error) {
- TEST_SAY(
- "send_offsets_to_transaction() #%d "
- "failed (expectedly): %s\n",
- i, rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "Expected abortable error for #%d", i);
- rd_kafka_error_destroy(error);
- errcnt++;
- }
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /* Allow time for internal retries */
- rd_sleep(2);
-
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, 5000));
- }
-
- TEST_ASSERT(errcnt > 0,
- "Expected at least one send_offets_to_transaction() "
- "failure");
-
- /* All done */
-
- rd_kafka_destroy(rk);
-}
-
-
-static rd_atomic32_t multi_find_req_cnt;
-
-static rd_kafka_resp_err_t
-multi_find_on_response_received_cb(rd_kafka_t *rk,
- int sockfd,
- const char *brokername,
- int32_t brokerid,
- int16_t ApiKey,
- int16_t ApiVersion,
- int32_t CorrId,
- size_t size,
- int64_t rtt,
- rd_kafka_resp_err_t err,
- void *ic_opaque) {
- rd_kafka_mock_cluster_t *mcluster = rd_kafka_handle_mock_cluster(rk);
- rd_bool_t done = rd_atomic32_get(&multi_find_req_cnt) > 10000;
-
- if (ApiKey != RD_KAFKAP_AddOffsetsToTxn || done)
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-
- TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
- ", ApiKey %hd, CorrId %d, rtt %.2fms, %s: %s\n",
- rd_kafka_name(rk), brokername, brokerid, ApiKey, CorrId,
- rtt != -1 ? (float)rtt / 1000.0 : 0.0,
- done ? "already done" : "not done yet",
- rd_kafka_err2name(err));
-
-
- if (rd_atomic32_add(&multi_find_req_cnt, 1) == 1) {
- /* Trigger a broker down/up event, which in turns
- * triggers the coord_req_fsm(). */
- rd_kafka_mock_broker_set_down(mcluster, 2);
- rd_kafka_mock_broker_set_up(mcluster, 2);
- return RD_KAFKA_RESP_ERR_NO_ERROR;
- }
-
- /* Trigger a broker down/up event, which in turns
- * triggers the coord_req_fsm(). */
- rd_kafka_mock_broker_set_down(mcluster, 3);
- rd_kafka_mock_broker_set_up(mcluster, 3);
-
- /* Clear the downed broker's latency so that it reconnects
- * quickly, otherwise the ApiVersionRequest will be delayed and
- * this will in turn delay the -> UP transition that we need to
- * trigger the coord_reqs. */
- rd_kafka_mock_broker_set_rtt(mcluster, 3, 0);
-
- /* Only do this down/up once */
- rd_atomic32_add(&multi_find_req_cnt, 10000);
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-/**
- * @brief ESC-4444: multiple FindCoordinatorRequests are sent referencing
- * the same coord_req_t, but the first one received will destroy
- * the coord_req_t object and make the subsequent FindCoordingResponses
- * reference a freed object.
- *
- * What we want to achieve is this sequence:
- * 1. AddOffsetsToTxnRequest + Response which..
- * 2. Triggers TxnOffsetCommitRequest, but the coordinator is not known, so..
- * 3. Triggers a FindCoordinatorRequest
- * 4. FindCoordinatorResponse from 3 is received ..
- * 5. A TxnOffsetCommitRequest is sent from coord_req_fsm().
- * 6. Another broker changing state to Up triggers coord reqs again, which..
- * 7. Triggers a second TxnOffsetCommitRequest from coord_req_fsm().
- * 7. FindCoordinatorResponse from 5 is received, references the destroyed rko
- * and crashes.
- */
-static void do_test_txn_coord_req_multi_find(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- const char *txnid = "txnid", *groupid = "mygroupid", *topic = "mytopic";
- int i;
-
- SUB_TEST();
-
- rd_atomic32_init(&multi_find_req_cnt, 0);
-
- on_response_received_cb = multi_find_on_response_received_cb;
- rk = create_txn_producer(&mcluster, txnid, 3,
- /* Need connections to all brokers so we
- * can trigger coord_req_fsm events
- * by toggling connections. */
- "enable.sparse.connections", "false",
- /* Set up on_response_received interceptor */
- "on_response_received", "", NULL);
-
- /* Let broker 1 be both txn and group coordinator
- * so that the group coordinator connection is up when it is time
- * send the TxnOffsetCommitRequest. */
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
- rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
-
- /* Set broker 1, 2, and 3 as leaders for a partition each and
- * later produce to both partitions so we know there's a connection
- * to all brokers. */
- rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 2);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 3);
-
- /* Broker down is not a test-failing error */
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
- test_curr->is_fatal_cb = error_is_fatal_cb;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- for (i = 0; i < 3; i++) {
- err = rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(i),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
- }
-
- test_flush(rk, 5000);
-
- /*
- * send_offsets_to_transaction() will query for the group coordinator,
- * we need to make those requests slow so that multiple requests are
- * sent.
- */
- for (i = 1; i <= 3; i++)
- rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 4000);
-
- /*
- * Send offsets to transaction
- */
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 12;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new(groupid);
-
- error =
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1);
-
- TEST_SAY("send_offsets_to_transaction() %s\n",
- rd_kafka_error_string(error));
- TEST_ASSERT(!error, "send_offsets_to_transaction() failed: %s",
- rd_kafka_error_string(error));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /* Clear delay */
- for (i = 1; i <= 3; i++)
- rd_kafka_mock_broker_set_rtt(mcluster, (int32_t)i, 0);
-
- rd_sleep(5);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
-
- /* All done */
-
- TEST_ASSERT(rd_atomic32_get(&multi_find_req_cnt) > 10000,
- "on_request_sent interceptor did not trigger properly");
-
- rd_kafka_destroy(rk);
-
- on_response_received_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief ESC-4410: adding producer partitions gradually will trigger multiple
- * AddPartitionsToTxn requests. Due to a bug the third partition to be
- * registered would hang in PEND_TXN state.
- *
- * Trigger this behaviour by having two outstanding AddPartitionsToTxn requests
- * at the same time, followed by a need for a third:
- *
- * 1. Set coordinator broker rtt high (to give us time to produce).
- * 2. Produce to partition 0, will trigger first AddPartitionsToTxn.
- * 3. Produce to partition 1, will trigger second AddPartitionsToTxn.
- * 4. Wait for second AddPartitionsToTxn response.
- * 5. Produce to partition 2, should trigger AddPartitionsToTxn, but bug
- * causes it to be stale in pending state.
- */
-
-static rd_atomic32_t multi_addparts_resp_cnt;
-static rd_kafka_resp_err_t
-multi_addparts_response_received_cb(rd_kafka_t *rk,
- int sockfd,
- const char *brokername,
- int32_t brokerid,
- int16_t ApiKey,
- int16_t ApiVersion,
- int32_t CorrId,
- size_t size,
- int64_t rtt,
- rd_kafka_resp_err_t err,
- void *ic_opaque) {
-
- if (ApiKey == RD_KAFKAP_AddPartitionsToTxn) {
- TEST_SAY("on_response_received_cb: %s: %s: brokerid %" PRId32
- ", ApiKey %hd, CorrId %d, rtt %.2fms, count %" PRId32
- ": %s\n",
- rd_kafka_name(rk), brokername, brokerid, ApiKey,
- CorrId, rtt != -1 ? (float)rtt / 1000.0 : 0.0,
- rd_atomic32_get(&multi_addparts_resp_cnt),
- rd_kafka_err2name(err));
-
- rd_atomic32_add(&multi_addparts_resp_cnt, 1);
- }
-
- return RD_KAFKA_RESP_ERR_NO_ERROR;
-}
-
-
-static void do_test_txn_addparts_req_multi(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- const char *txnid = "txnid", *topic = "mytopic";
- int32_t txn_coord = 2;
-
- SUB_TEST();
-
- rd_atomic32_init(&multi_addparts_resp_cnt, 0);
-
- on_response_received_cb = multi_addparts_response_received_cb;
- rk = create_txn_producer(&mcluster, txnid, 3, "linger.ms", "0",
- "message.timeout.ms", "9000",
- /* Set up on_response_received interceptor */
- "on_response_received", "", NULL);
-
- /* Let broker 1 be txn coordinator. */
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
- txn_coord);
-
- rd_kafka_mock_topic_create(mcluster, topic, 3, 1);
-
- /* Set partition leaders to non-txn-coord broker so they wont
- * be affected by rtt delay */
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 1, 1);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 2, 1);
-
-
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- /*
- * Run one transaction first to let the client familiarize with
- * the topic, this avoids metadata lookups, etc, when the real
- * test is run.
- */
- TEST_SAY("Running seed transaction\n");
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
- TEST_CALL_ERR__(rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic),
- RD_KAFKA_V_VALUE("seed", 4),
- RD_KAFKA_V_END));
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 5000));
-
-
- /*
- * Now perform test transaction with rtt delays
- */
- TEST_SAY("Running test transaction\n");
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /* Reset counter */
- rd_atomic32_set(&multi_addparts_resp_cnt, 0);
-
- /* Add latency to txn coordinator so we can pace our produce() calls */
- rd_kafka_mock_broker_set_rtt(mcluster, txn_coord, 1000);
-
- /* Produce to partition 0 */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- rd_usleep(500 * 1000, NULL);
-
- /* Produce to partition 1 */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(1),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- TEST_SAY("Waiting for two AddPartitionsToTxnResponse\n");
- while (rd_atomic32_get(&multi_addparts_resp_cnt) < 2)
- rd_usleep(10 * 1000, NULL);
-
- TEST_SAY("%" PRId32 " AddPartitionsToTxnResponses seen\n",
- rd_atomic32_get(&multi_addparts_resp_cnt));
-
- /* Produce to partition 2, this message will hang in
- * queue if the bug is not fixed. */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(2),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- /* Allow some extra time for things to settle before committing
- * transaction. */
- rd_usleep(1000 * 1000, NULL);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, 10 * 1000));
-
- /* All done */
- rd_kafka_destroy(rk);
-
- on_response_received_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Test handling of OffsetFetchRequest returning UNSTABLE_OFFSET_COMMIT.
- *
- * There are two things to test;
- * - OffsetFetch triggered by committed() (and similar code paths)
- * - OffsetFetch triggered by assign()
- */
-static void do_test_unstable_offset_commit(void) {
- rd_kafka_t *rk, *c;
- rd_kafka_conf_t *c_conf;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_topic_partition_list_t *offsets;
- const char *topic = "srctopic4";
- const int msgcnt = 100;
- const int64_t offset_to_commit = msgcnt / 2;
- int i;
- int remains = 0;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_conf_init(&c_conf, NULL, 0);
- test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
- test_conf_set(c_conf, "bootstrap.servers",
- rd_kafka_mock_cluster_bootstraps(mcluster));
- test_conf_set(c_conf, "enable.partition.eof", "true");
- test_conf_set(c_conf, "auto.offset.reset", "error");
- c = test_create_consumer("mygroup", NULL, c_conf, NULL);
-
- rd_kafka_mock_topic_create(mcluster, topic, 2, 3);
-
- /* Produce some messages to the topic so that the consumer has
- * something to read. */
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
- test_produce_msgs2_nowait(rk, topic, 0, 0, 0, msgcnt, NULL, 0,
- &remains);
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
-
- /* Commit offset */
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
- offset_to_commit;
- TEST_CALL_ERR__(rd_kafka_commit(c, offsets, 0 /*sync*/));
- rd_kafka_topic_partition_list_destroy(offsets);
-
- /* Retrieve offsets by calling committed().
- *
- * Have OffsetFetch fail and retry, on the first iteration
- * the API timeout is higher than the amount of time the retries will
- * take and thus succeed, and on the second iteration the timeout
- * will be lower and thus fail. */
- for (i = 0; i < 2; i++) {
- rd_kafka_resp_err_t err;
- rd_kafka_resp_err_t exp_err =
- i == 0 ? RD_KAFKA_RESP_ERR_NO_ERROR
- : RD_KAFKA_RESP_ERR__TIMED_OUT;
- int timeout_ms = exp_err ? 200 : 5 * 1000;
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_OffsetFetch,
- 1 + 5, /* first request + some retries */
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, topic, 0);
-
- err = rd_kafka_committed(c, offsets, timeout_ms);
-
- TEST_SAY("#%d: committed() returned %s (expected %s)\n", i,
- rd_kafka_err2name(err), rd_kafka_err2name(exp_err));
-
- TEST_ASSERT(err == exp_err,
- "#%d: Expected committed() to return %s, not %s", i,
- rd_kafka_err2name(exp_err), rd_kafka_err2name(err));
- TEST_ASSERT(offsets->cnt == 1,
- "Expected 1 committed offset, not %d",
- offsets->cnt);
- if (!exp_err)
- TEST_ASSERT(offsets->elems[0].offset ==
- offset_to_commit,
- "Expected committed offset %" PRId64
- ", "
- "not %" PRId64,
- offset_to_commit, offsets->elems[0].offset);
- else
- TEST_ASSERT(offsets->elems[0].offset < 0,
- "Expected no committed offset, "
- "not %" PRId64,
- offsets->elems[0].offset);
-
- rd_kafka_topic_partition_list_destroy(offsets);
- }
-
- TEST_SAY("Phase 2: OffsetFetch lookup through assignment\n");
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, topic, 0)->offset =
- RD_KAFKA_OFFSET_STORED;
-
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_OffsetFetch,
- 1 + 5, /* first request + some retries */
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT,
- RD_KAFKA_RESP_ERR_UNSTABLE_OFFSET_COMMIT);
-
- test_consumer_incremental_assign("assign", c, offsets);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- test_consumer_poll_exact("consume", c, 0, 1 /*eof*/, 0, msgcnt / 2,
- rd_true /*exact counts*/, NULL);
-
- /* All done */
- rd_kafka_destroy(c);
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief If a message times out locally before being attempted to send
- * and commit_transaction() is called, the transaction must not succeed.
- * https://github.com/confluentinc/confluent-kafka-dotnet/issues/1568
- */
-static void do_test_commit_after_msg_timeout(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- int32_t coord_id, leader_id;
- rd_kafka_resp_err_t err;
- rd_kafka_error_t *error;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int remains = 0;
-
- SUB_TEST_QUICK();
-
- /* Assign coordinator and leader to two different brokers */
- coord_id = 1;
- leader_id = 2;
-
- rk = create_txn_producer(&mcluster, transactional_id, 3,
- "message.timeout.ms", "5000",
- "transaction.timeout.ms", "10000", NULL);
-
- /* Broker down is not a test-failing error */
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
- test_curr->is_fatal_cb = error_is_fatal_cb;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 3);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, leader_id);
-
- /* Start transactioning */
- TEST_SAY("Starting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_SAY("Bringing down %" PRId32 "\n", leader_id);
- rd_kafka_mock_broker_set_down(mcluster, leader_id);
- rd_kafka_mock_broker_set_down(mcluster, coord_id);
-
- test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
-
- error = rd_kafka_commit_transaction(rk, -1);
- TEST_ASSERT(error != NULL, "expected commit_transaciton() to fail");
- TEST_SAY_ERROR(error, "commit_transaction() failed (as expected): ");
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "Expected txn_requires_abort error");
- rd_kafka_error_destroy(error);
-
- /* Bring the brokers up so the abort can complete */
- rd_kafka_mock_broker_set_up(mcluster, coord_id);
- rd_kafka_mock_broker_set_up(mcluster, leader_id);
-
- TEST_SAY("Aborting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- TEST_ASSERT(remains == 0, "%d message(s) were not flushed\n", remains);
-
- TEST_SAY("Attempting second transaction, which should succeed\n");
- test_curr->is_fatal_cb = error_is_fatal_cb;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
- test_produce_msgs2_nowait(rk, topic, 0, 0, 0, 1, NULL, 0, &remains);
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
- test_curr->is_fatal_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief #3575: Verify that OUT_OF_ORDER_SEQ does not trigger an epoch bump
- * during an ongoing transaction.
- * The transaction should instead enter the abortable state.
- */
-static void do_test_out_of_order_seq(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- int32_t txn_coord = 1, leader = 2;
- const char *txnid = "myTxnId";
- test_timing_t timing;
- rd_kafka_resp_err_t err;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
- txn_coord);
-
- rd_kafka_mock_partition_set_leader(mcluster, "mytopic", 0, leader);
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->is_fatal_cb = NULL;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
-
- /* Produce one seeding message first to get the leader up and running */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
- test_flush(rk, -1);
-
- /* Let partition leader have a latency of 2 seconds
- * so that we can have multiple messages in-flight. */
- rd_kafka_mock_broker_set_rtt(mcluster, leader, 2 * 1000);
-
- /* Produce a message, let it fail with with different errors,
- * ending with OUT_OF_ORDER which previously triggered an
- * Epoch bump. */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 3,
- RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
- RD_KAFKA_RESP_ERR_NOT_LEADER_FOR_PARTITION,
- RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER);
-
- /* Produce three messages that will be delayed
- * and have errors injected.*/
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- /* Now sleep a short while so that the messages are processed
- * by the broker and errors are returned. */
- TEST_SAY("Sleeping..\n");
- rd_sleep(5);
-
- rd_kafka_mock_broker_set_rtt(mcluster, leader, 0);
-
- /* Produce a fifth message, should fail with ERR__STATE since
- * the transaction should have entered the abortable state. */
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(err == RD_KAFKA_RESP_ERR__STATE,
- "Expected produce() to fail with ERR__STATE, not %s",
- rd_kafka_err2name(err));
- TEST_SAY("produce() failed as expected: %s\n", rd_kafka_err2str(err));
-
- /* Commit the transaction, should fail with abortable error. */
- TIMING_START(&timing, "commit_transaction(-1)");
- error = rd_kafka_commit_transaction(rk, -1);
- TIMING_STOP(&timing);
- TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
-
- TEST_SAY("commit_transaction() failed (expectedly): %s\n",
- rd_kafka_error_string(error));
-
- TEST_ASSERT(!rd_kafka_error_is_fatal(error),
- "Did not expect fatal error");
- TEST_ASSERT(rd_kafka_error_txn_requires_abort(error),
- "Expected abortable error");
- rd_kafka_error_destroy(error);
-
- /* Abort the transaction */
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
-
- /* Run a new transaction without errors to verify that the
- * producer can recover. */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Verify lossless delivery if topic disappears from Metadata for awhile.
- *
- * If a topic is removed from metadata inbetween transactions, the producer
- * will remove its partition state for the topic's partitions.
- * If later the same topic comes back (same topic instance, not a new creation)
- * then the producer must restore the previously used msgid/BaseSequence
- * in case the same Epoch is still used, or messages will be silently lost
- * as they would seem like legit duplicates to the broker.
- *
- * Reproduction:
- * 1. produce msgs to topic, commit transaction.
- * 2. remove topic from metadata
- * 3. make sure client updates its metadata, which removes the partition
- * objects.
- * 4. restore the topic in metadata
- * 5. produce new msgs to topic, commit transaction.
- * 6. consume topic. All messages should be accounted for.
- */
-static void do_test_topic_disappears_for_awhile(void) {
- rd_kafka_t *rk, *c;
- rd_kafka_conf_t *c_conf;
- rd_kafka_mock_cluster_t *mcluster;
- const char *topic = "mytopic";
- const char *txnid = "myTxnId";
- test_timing_t timing;
- int i;
- int msgcnt = 0;
- const int partition_cnt = 10;
-
- SUB_TEST_QUICK();
-
- rk = create_txn_producer(
- &mcluster, txnid, 1, "batch.num.messages", "3", "linger.ms", "100",
- "topic.metadata.refresh.interval.ms", "2000", NULL);
-
- rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- for (i = 0; i < 2; i++) {
- int cnt = 3 * 2 * partition_cnt;
- rd_bool_t remove_topic = (i % 2) == 0;
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- while (cnt-- >= 0) {
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic),
- RD_KAFKA_V_PARTITION(cnt % partition_cnt),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
- msgcnt++;
- }
-
- /* Commit the transaction */
- TIMING_START(&timing, "commit_transaction(-1)");
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
- TIMING_STOP(&timing);
-
-
-
- if (remove_topic) {
- /* Make it seem the topic is removed, refresh metadata,
- * and then make the topic available again. */
- const rd_kafka_metadata_t *md;
-
- TEST_SAY("Marking topic as non-existent\n");
-
- rd_kafka_mock_topic_set_error(
- mcluster, topic,
- RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART);
-
- TEST_CALL_ERR__(rd_kafka_metadata(rk, 0, NULL, &md,
- tmout_multip(5000)));
-
- rd_kafka_metadata_destroy(md);
-
- rd_sleep(2);
-
- TEST_SAY("Bringing topic back to life\n");
- rd_kafka_mock_topic_set_error(
- mcluster, topic, RD_KAFKA_RESP_ERR_NO_ERROR);
- }
- }
-
- TEST_SAY("Verifying messages by consumtion\n");
- test_conf_init(&c_conf, NULL, 0);
- test_conf_set(c_conf, "security.protocol", "PLAINTEXT");
- test_conf_set(c_conf, "bootstrap.servers",
- rd_kafka_mock_cluster_bootstraps(mcluster));
- test_conf_set(c_conf, "enable.partition.eof", "true");
- test_conf_set(c_conf, "auto.offset.reset", "earliest");
- c = test_create_consumer("mygroup", NULL, c_conf, NULL);
-
- test_consumer_subscribe(c, topic);
- test_consumer_poll_exact("consume", c, 0, partition_cnt, 0, msgcnt,
- rd_true /*exact*/, NULL);
- rd_kafka_destroy(c);
-
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test that group coordinator requests can handle an
- * untimely disconnect.
- *
- * The transaction manager makes use of librdkafka coord_req to commit
- * transaction offsets to the group coordinator.
- * If the connection to the given group coordinator is not up the
- * coord_req code will request a connection once, but if this connection fails
- * there will be no new attempts and the coord_req will idle until either
- * destroyed or the connection is retried for other reasons.
- * This in turn stalls the send_offsets_to_transaction() call until the
- * transaction times out.
- *
- * There are two variants to this test based on switch_coord:
- * - True - Switches the coordinator during the downtime.
- * The client should detect this and send the request to the
- * new coordinator.
- * - False - The coordinator remains on the down broker. Client will reconnect
- * when down broker comes up again.
- */
-struct some_state {
- rd_kafka_mock_cluster_t *mcluster;
- rd_bool_t switch_coord;
- int32_t broker_id;
- const char *grpid;
-};
-
-static int delayed_up_cb(void *arg) {
- struct some_state *state = arg;
- rd_sleep(3);
- if (state->switch_coord) {
- TEST_SAY("Switching group coordinator to %" PRId32 "\n",
- state->broker_id);
- rd_kafka_mock_coordinator_set(state->mcluster, "group",
- state->grpid, state->broker_id);
- } else {
- TEST_SAY("Bringing up group coordinator %" PRId32 "..\n",
- state->broker_id);
- rd_kafka_mock_broker_set_up(state->mcluster, state->broker_id);
- }
- return 0;
-}
-
-static void do_test_disconnected_group_coord(rd_bool_t switch_coord) {
- const char *topic = "mytopic";
- const char *txnid = "myTxnId";
- const char *grpid = "myGrpId";
- const int partition_cnt = 1;
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- struct some_state state = RD_ZERO_INIT;
- test_timing_t timing;
- thrd_t thrd;
- int ret;
-
- SUB_TEST_QUICK("switch_coord=%s", RD_STR_ToF(switch_coord));
-
- test_curr->is_fatal_cb = error_is_fatal_cb;
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
-
- rk = create_txn_producer(&mcluster, txnid, 3, NULL);
-
- rd_kafka_mock_topic_create(mcluster, topic, partition_cnt, 1);
-
- /* Broker 1: txn coordinator
- * Broker 2: group coordinator
- * Broker 3: partition leader & backup coord if switch_coord=true */
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid, 1);
- rd_kafka_mock_coordinator_set(mcluster, "group", grpid, 2);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 3);
-
- /* Bring down group coordinator so there are no undesired
- * connections to it. */
- rd_kafka_mock_broker_set_down(mcluster, 2);
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
- test_flush(rk, -1);
-
- rd_sleep(1);
-
- /* Run a background thread that after 3s, which should be enough
- * to perform the first failed connection attempt, makes the
- * group coordinator available again. */
- state.switch_coord = switch_coord;
- state.mcluster = mcluster;
- state.grpid = grpid;
- state.broker_id = switch_coord ? 3 : 2;
- if (thrd_create(&thrd, delayed_up_cb, &state) != thrd_success)
- TEST_FAIL("Failed to create thread");
-
- TEST_SAY("Calling send_offsets_to_transaction()\n");
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 1;
- cgmetadata = rd_kafka_consumer_group_metadata_new(grpid);
-
- TIMING_START(&timing, "send_offsets_to_transaction(-1)");
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
- TIMING_STOP(&timing);
- TIMING_ASSERT(&timing, 0, 10 * 1000 /*10s*/);
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
- thrd_join(thrd, &ret);
-
- /* Commit the transaction */
- TIMING_START(&timing, "commit_transaction(-1)");
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
- TIMING_STOP(&timing);
-
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
- test_curr->is_fatal_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test that a NULL coordinator is not fatal when
- * the transactional producer reconnects to the txn coordinator
- * and the first thing it does is a FindCoordinatorRequest that
- * fails with COORDINATOR_NOT_AVAILABLE, setting coordinator to NULL.
- */
-static void do_test_txn_coordinator_null_not_fatal(void) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- rd_kafka_resp_err_t err;
- int32_t coord_id = 1;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int msgcnt = 1;
- int remains = 0;
-
- SUB_TEST_QUICK();
-
- /* Broker down is not a test-failing error */
- allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
- test_curr->is_fatal_cb = error_is_fatal_cb;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR__MSG_TIMED_OUT;
-
- /* One second is the minimum transaction timeout */
- rk = create_txn_producer(&mcluster, transactional_id, 1,
- "transaction.timeout.ms", "1000", NULL);
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
-
- /* Start transactioning */
- TEST_SAY("Starting transaction\n");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- /* Makes the produce request timeout. */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
-
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
- msgcnt, NULL, 0, &remains);
-
- /* This value is linked to transaction.timeout.ms, needs enough time
- * so the message times out and a DrainBump sequence is started. */
- rd_kafka_flush(rk, 1000);
-
- /* To trigger the error the COORDINATOR_NOT_AVAILABLE response
- * must come AFTER idempotent state has changed to WaitTransport
- * but BEFORE it changes to WaitPID. To make it more likely
- * rd_kafka_txn_coord_timer_start timeout can be changed to 5 ms
- * in rd_kafka_txn_coord_query, when unable to query for
- * transaction coordinator.
- */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_FindCoordinator, 1,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE, 10);
-
- /* Coordinator down starts the FindCoordinatorRequest loop. */
- TEST_SAY("Bringing down coordinator %" PRId32 "\n", coord_id);
- rd_kafka_mock_broker_set_down(mcluster, coord_id);
-
- /* Coordinator down for some time. */
- rd_usleep(100 * 1000, NULL);
-
- /* When it comes up, the error is triggered, if the preconditions
- * happen. */
- TEST_SAY("Bringing up coordinator %" PRId32 "\n", coord_id);
- rd_kafka_mock_broker_set_up(mcluster, coord_id);
-
- /* Make sure DRs are received */
- rd_kafka_flush(rk, 1000);
-
- error = rd_kafka_commit_transaction(rk, -1);
-
- TEST_ASSERT(remains == 0, "%d message(s) were not produced\n", remains);
- TEST_ASSERT(error != NULL, "Expected commit_transaction() to fail");
- TEST_SAY("commit_transaction() failed (expectedly): %s\n",
- rd_kafka_error_string(error));
- rd_kafka_error_destroy(error);
-
- /* Needs to wait some time before closing to make sure it doesn't go
- * into TERMINATING state before error is triggered. */
- rd_usleep(1000 * 1000, NULL);
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
- test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR;
- test_curr->is_fatal_cb = NULL;
-
- SUB_TEST_PASS();
-}
-
-
-
-/**
- * @brief Simple test to make sure the init_transactions() timeout is honoured
- * and also not infinite.
- */
-static void do_test_txn_resumable_init(void) {
- rd_kafka_t *rk;
- const char *transactional_id = "txnid";
- rd_kafka_error_t *error;
- test_timing_t duration;
-
- SUB_TEST();
-
- rd_kafka_conf_t *conf;
-
- test_conf_init(&conf, NULL, 20);
- test_conf_set(conf, "bootstrap.servers", "");
- test_conf_set(conf, "transactional.id", transactional_id);
- test_conf_set(conf, "transaction.timeout.ms", "4000");
-
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
-
- /* First make sure a lower timeout is honoured. */
- TIMING_START(&duration, "init_transactions(1000)");
- error = rd_kafka_init_transactions(rk, 1000);
- TIMING_STOP(&duration);
-
- if (error)
- TEST_SAY("First init_transactions failed (as expected): %s\n",
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Expected _TIMED_OUT, not %s",
- error ? rd_kafka_error_string(error) : "success");
- rd_kafka_error_destroy(error);
-
- TIMING_ASSERT(&duration, 900, 1500);
-
- TEST_SAY(
- "Performing second init_transactions() call now with an "
- "infinite timeout: "
- "should time out in 2 x transaction.timeout.ms\n");
-
- TIMING_START(&duration, "init_transactions(infinite)");
- error = rd_kafka_init_transactions(rk, -1);
- TIMING_STOP(&duration);
-
- if (error)
- TEST_SAY("Second init_transactions failed (as expected): %s\n",
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT,
- "Expected _TIMED_OUT, not %s",
- error ? rd_kafka_error_string(error) : "success");
- rd_kafka_error_destroy(error);
-
- TIMING_ASSERT(&duration, 2 * 4000 - 500, 2 * 4000 + 500);
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Retries a transaction call until it succeeds or returns a
- * non-retriable error - which will cause the test to fail.
- *
- * @param intermed_calls Is a block of code that will be called after each
- * retriable failure of \p call.
- */
-#define RETRY_TXN_CALL__(call, intermed_calls) \
- do { \
- rd_kafka_error_t *_error = call; \
- if (!_error) \
- break; \
- TEST_SAY_ERROR(_error, "%s: ", "" #call); \
- TEST_ASSERT(rd_kafka_error_is_retriable(_error), \
- "Expected retriable error"); \
- TEST_SAY("%s failed, retrying in 1 second\n", "" #call); \
- rd_kafka_error_destroy(_error); \
- intermed_calls; \
- rd_sleep(1); \
- } while (1)
-
-/**
- * @brief Call \p call and expect it to fail with \p exp_err_code.
- */
-#define TXN_CALL_EXPECT_ERROR__(call, exp_err_code) \
- do { \
- rd_kafka_error_t *_error = call; \
- TEST_ASSERT(_error != NULL, \
- "%s: Expected %s error, got success", "" #call, \
- rd_kafka_err2name(exp_err_code)); \
- TEST_SAY_ERROR(_error, "%s: ", "" #call); \
- TEST_ASSERT(rd_kafka_error_code(_error) == exp_err_code, \
- "%s: Expected %s error, got %s", "" #call, \
- rd_kafka_err2name(exp_err_code), \
- rd_kafka_error_name(_error)); \
- rd_kafka_error_destroy(_error); \
- } while (0)
-
-
-/**
- * @brief Simple test to make sure short API timeouts can be safely resumed
- * by calling the same API again.
- *
- * @param do_commit Commit transaction if true, else abort transaction.
- */
-static void do_test_txn_resumable_calls_timeout(rd_bool_t do_commit) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- int32_t coord_id = 1;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int msgcnt = 1;
- int remains = 0;
-
- SUB_TEST("%s_transaction", do_commit ? "commit" : "abort");
-
- rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
-
- TEST_SAY("Starting transaction\n");
- TEST_SAY("Delaying first two InitProducerIdRequests by 500ms\n");
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_InitProducerId, 2,
- RD_KAFKA_RESP_ERR_NO_ERROR, 500, RD_KAFKA_RESP_ERR_NO_ERROR, 500);
-
- RETRY_TXN_CALL__(
- rd_kafka_init_transactions(rk, 100),
- TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
- RD_KAFKA_RESP_ERR__CONFLICT));
-
- RETRY_TXN_CALL__(rd_kafka_begin_transaction(rk), /*none*/);
-
-
- TEST_SAY("Delaying ProduceRequests by 3000ms\n");
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_NO_ERROR, 3000);
-
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
- msgcnt, NULL, 0, &remains);
-
-
- TEST_SAY("Delaying SendOffsetsToTransaction by 400ms\n");
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_AddOffsetsToTxn, 1,
- RD_KAFKA_RESP_ERR_NO_ERROR, 400);
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- /* This is not a resumable call on timeout */
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
-
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
-
- TEST_SAY("Delaying EndTxnRequests by 1200ms\n");
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_EndTxn, 1, RD_KAFKA_RESP_ERR_NO_ERROR,
- 1200);
-
- /* Committing/aborting the transaction will also be delayed by the
- * previous accumulated remaining delays. */
-
- if (do_commit) {
- TEST_SAY("Committing transaction\n");
-
- RETRY_TXN_CALL__(
- rd_kafka_commit_transaction(rk, 100),
- TXN_CALL_EXPECT_ERROR__(rd_kafka_abort_transaction(rk, -1),
- RD_KAFKA_RESP_ERR__CONFLICT));
- } else {
- TEST_SAY("Aborting transaction\n");
-
- RETRY_TXN_CALL__(
- rd_kafka_abort_transaction(rk, 100),
- TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, -1),
- RD_KAFKA_RESP_ERR__CONFLICT));
- }
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Verify that resuming timed out calls that after the timeout, but
- * before the resuming call, would error out.
- */
-static void do_test_txn_resumable_calls_timeout_error(rd_bool_t do_commit) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_resp_err_t err;
- int32_t coord_id = 1;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int msgcnt = 1;
- int remains = 0;
- rd_kafka_error_t *error;
-
- SUB_TEST_QUICK("%s_transaction", do_commit ? "commit" : "abort");
-
- rk = create_txn_producer(&mcluster, transactional_id, 1, NULL);
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", transactional_id,
- coord_id);
- rd_kafka_mock_partition_set_leader(mcluster, topic, 0, coord_id);
-
- TEST_SAY("Starting transaction\n");
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0,
- msgcnt, NULL, 0, &remains);
-
-
- TEST_SAY("Fail EndTxn fatally after 2000ms\n");
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, coord_id, RD_KAFKAP_EndTxn, 1,
- RD_KAFKA_RESP_ERR_INVALID_TXN_STATE, 2000);
-
- if (do_commit) {
- TEST_SAY("Committing transaction\n");
-
- TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
- RD_KAFKA_RESP_ERR__TIMED_OUT);
-
- /* Sleep so that the background EndTxn fails locally and sets
- * an error result. */
- rd_sleep(3);
-
- error = rd_kafka_commit_transaction(rk, -1);
-
- } else {
- TEST_SAY("Aborting transaction\n");
-
- TXN_CALL_EXPECT_ERROR__(rd_kafka_commit_transaction(rk, 500),
- RD_KAFKA_RESP_ERR__TIMED_OUT);
-
- /* Sleep so that the background EndTxn fails locally and sets
- * an error result. */
- rd_sleep(3);
-
- error = rd_kafka_commit_transaction(rk, -1);
- }
-
- TEST_ASSERT(error != NULL && rd_kafka_error_is_fatal(error),
- "Expected fatal error, not %s",
- rd_kafka_error_string(error));
- TEST_ASSERT(rd_kafka_error_code(error) ==
- RD_KAFKA_RESP_ERR_INVALID_TXN_STATE,
- "Expected error INVALID_TXN_STATE, got %s",
- rd_kafka_error_name(error));
- rd_kafka_error_destroy(error);
-
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Concurrent transaction API calls are not permitted.
- * This test makes sure they're properly enforced.
- *
- * For each transactional API, call it with a 5s timeout, and during that time
- * from another thread call transactional APIs, one by one, and verify that
- * we get an ERR__CONFLICT error back in the second thread.
- *
- * We use a mutex for synchronization, the main thread will hold the lock
- * when not calling an API but release it just prior to calling.
- * The other thread will acquire the lock, sleep, and hold the lock while
- * calling the concurrent API that should fail immediately, releasing the lock
- * when done.
- *
- */
-
-struct _txn_concurrent_state {
- const char *api;
- mtx_t lock;
- rd_kafka_t *rk;
- struct test *test;
-};
-
-static int txn_concurrent_thread_main(void *arg) {
- struct _txn_concurrent_state *state = arg;
- static const char *apis[] = {
- "init_transactions", "begin_transaction",
- "send_offsets_to_transaction", "commit_transaction",
- "abort_transaction", NULL};
- rd_kafka_t *rk = state->rk;
- const char *main_api = NULL;
- int i;
-
- /* Update TLS variable so TEST_..() macros work */
- test_curr = state->test;
-
- while (1) {
- const char *api = NULL;
- const int timeout_ms = 10000;
- rd_kafka_error_t *error = NULL;
- rd_kafka_resp_err_t exp_err;
- test_timing_t duration;
-
- /* Wait for other thread's txn call to start, then sleep a bit
- * to increase the chance of that call has really begun. */
- mtx_lock(&state->lock);
-
- if (state->api && state->api == main_api) {
- /* Main thread is still blocking on the last API call */
- TEST_SAY("Waiting for main thread to finish %s()\n",
- main_api);
- mtx_unlock(&state->lock);
- rd_sleep(1);
- continue;
- } else if (!(main_api = state->api)) {
- mtx_unlock(&state->lock);
- break;
- }
-
- rd_sleep(1);
-
- for (i = 0; (api = apis[i]) != NULL; i++) {
- TEST_SAY(
- "Triggering concurrent %s() call while "
- "main is in %s() call\n",
- api, main_api);
- TIMING_START(&duration, "%s", api);
-
- if (!strcmp(api, "init_transactions"))
- error =
- rd_kafka_init_transactions(rk, timeout_ms);
- else if (!strcmp(api, "begin_transaction"))
- error = rd_kafka_begin_transaction(rk);
- else if (!strcmp(api, "send_offsets_to_transaction")) {
- rd_kafka_topic_partition_list_t *offsets =
- rd_kafka_topic_partition_list_new(1);
- rd_kafka_consumer_group_metadata_t *cgmetadata =
- rd_kafka_consumer_group_metadata_new(
- "mygroupid");
- rd_kafka_topic_partition_list_add(
- offsets, "srctopic4", 0)
- ->offset = 12;
-
- error = rd_kafka_send_offsets_to_transaction(
- rk, offsets, cgmetadata, -1);
- rd_kafka_consumer_group_metadata_destroy(
- cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
- } else if (!strcmp(api, "commit_transaction"))
- error =
- rd_kafka_commit_transaction(rk, timeout_ms);
- else if (!strcmp(api, "abort_transaction"))
- error =
- rd_kafka_abort_transaction(rk, timeout_ms);
- else
- TEST_FAIL("Unknown API: %s", api);
-
- TIMING_STOP(&duration);
-
- TEST_SAY_ERROR(error, "Conflicting %s() call: ", api);
- TEST_ASSERT(error,
- "Expected conflicting %s() call to fail",
- api);
-
- exp_err = !strcmp(api, main_api)
- ? RD_KAFKA_RESP_ERR__PREV_IN_PROGRESS
- : RD_KAFKA_RESP_ERR__CONFLICT;
-
- TEST_ASSERT(rd_kafka_error_code(error) == exp_err,
-
- "Conflicting %s(): Expected %s, not %s",
- api, rd_kafka_err2str(exp_err),
- rd_kafka_error_name(error));
- TEST_ASSERT(
- rd_kafka_error_is_retriable(error),
- "Conflicting %s(): Expected retriable error", api);
- rd_kafka_error_destroy(error);
- /* These calls should fail immediately */
- TIMING_ASSERT(&duration, 0, 100);
- }
-
- mtx_unlock(&state->lock);
- }
-
- return 0;
-}
-
-static void do_test_txn_concurrent_operations(rd_bool_t do_commit) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- int32_t coord_id = 1;
- rd_kafka_resp_err_t err;
- const char *topic = "test";
- const char *transactional_id = "txnid";
- int remains = 0;
- thrd_t thrd;
- struct _txn_concurrent_state state = RD_ZERO_INIT;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
-
- SUB_TEST("%s", do_commit ? "commit" : "abort");
-
- test_timeout_set(90);
-
- /* We need to override the value of socket.connection.setup.timeout.ms
- * to be at least 2*RTT of the mock broker. This is because the first
- * ApiVersion request will fail, since we make the request with v3, and
- * the mock broker's MaxVersion is 2, so the request is retried with v0.
- * We use the value 3*RTT to add some buffer.
- */
- rk = create_txn_producer(&mcluster, transactional_id, 1,
- "socket.connection.setup.timeout.ms", "15000",
- NULL);
-
- /* Set broker RTT to 3.5s so that the background thread has ample
- * time to call its conflicting APIs.
- * This value must be less than socket.connection.setup.timeout.ms/2. */
- rd_kafka_mock_broker_set_rtt(mcluster, coord_id, 3500);
-
- err = rd_kafka_mock_topic_create(mcluster, topic, 1, 1);
- TEST_ASSERT(!err, "Failed to create topic: %s", rd_kafka_err2str(err));
-
- /* Set up shared state between us and the concurrent thread */
- mtx_init(&state.lock, mtx_plain);
- state.test = test_curr;
- state.rk = rk;
-
- /* We release the lock only while calling the TXN API */
- mtx_lock(&state.lock);
-
- /* Spin up concurrent thread */
- if (thrd_create(&thrd, txn_concurrent_thread_main, (void *)&state) !=
- thrd_success)
- TEST_FAIL("Failed to create thread");
-
-#define _start_call(callname) \
- do { \
- state.api = callname; \
- mtx_unlock(&state.lock); \
- } while (0)
-#define _end_call() mtx_lock(&state.lock)
-
- _start_call("init_transactions");
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
- _end_call();
-
- /* This call doesn't block, so can't really be tested concurrently. */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- test_produce_msgs2_nowait(rk, topic, 0, RD_KAFKA_PARTITION_UA, 0, 10,
- NULL, 0, &remains);
-
- _start_call("send_offsets_to_transaction");
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 0)->offset = 12;
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- TEST_CALL_ERROR__(
- rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata, -1));
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
- _end_call();
-
- if (do_commit) {
- _start_call("commit_transaction");
- TEST_CALL_ERROR__(rd_kafka_commit_transaction(rk, -1));
- _end_call();
- } else {
- _start_call("abort_transaction");
- TEST_CALL_ERROR__(rd_kafka_abort_transaction(rk, -1));
- _end_call();
- }
-
- /* Signal completion to background thread */
- state.api = NULL;
-
- mtx_unlock(&state.lock);
-
- thrd_join(thrd, NULL);
-
- rd_kafka_destroy(rk);
-
- mtx_destroy(&state.lock);
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief KIP-360: Test that fatal idempotence errors triggers abortable
- * transaction errors, but let the broker-side abort of the
- * transaction fail with a fencing error.
- * Should raise a fatal error.
- *
- * @param error_code Which error code EndTxn should fail with.
- * Either RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH (older)
- * or RD_KAFKA_RESP_ERR_PRODUCER_FENCED (newer).
- */
-static void do_test_txn_fenced_abort(rd_kafka_resp_err_t error_code) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_error_t *error;
- int32_t txn_coord = 2;
- const char *txnid = "myTxnId";
- char errstr[512];
- rd_kafka_resp_err_t fatal_err;
- size_t errors_cnt;
-
- SUB_TEST_QUICK("With error %s", rd_kafka_err2name(error_code));
-
- rk = create_txn_producer(&mcluster, txnid, 3, "batch.num.messages", "1",
- NULL);
-
- rd_kafka_mock_coordinator_set(mcluster, "transaction", txnid,
- txn_coord);
-
- test_curr->ignore_dr_err = rd_true;
- test_curr->is_fatal_cb = error_is_fatal_cb;
- allowed_error = RD_KAFKA_RESP_ERR__FENCED;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, -1));
-
- /*
- * Start a transaction
- */
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
-
- /* Produce a message without error first */
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- test_flush(rk, -1);
-
- /* Fail abort transaction */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, txn_coord, RD_KAFKAP_EndTxn, 1, error_code, 0);
-
- /* Fail the PID reinit */
- rd_kafka_mock_broker_push_request_error_rtts(
- mcluster, txn_coord, RD_KAFKAP_InitProducerId, 1, error_code, 0);
-
- /* Produce a message, let it fail with a fatal idempo error. */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_Produce, 1,
- RD_KAFKA_RESP_ERR_UNKNOWN_PRODUCER_ID);
-
- TEST_CALL_ERR__(rd_kafka_producev(
- rk, RD_KAFKA_V_TOPIC("mytopic"), RD_KAFKA_V_PARTITION(0),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END));
-
- test_flush(rk, -1);
-
- /* Abort the transaction, should fail with a fatal error */
- error = rd_kafka_abort_transaction(rk, -1);
- TEST_ASSERT(error != NULL, "Expected abort_transaction() to fail");
-
- TEST_SAY_ERROR(error, "abort_transaction() failed: ");
- TEST_ASSERT(rd_kafka_error_is_fatal(error), "Expected a fatal error");
- rd_kafka_error_destroy(error);
-
- fatal_err = rd_kafka_fatal_error(rk, errstr, sizeof(errstr));
- TEST_ASSERT(fatal_err, "Expected a fatal error to have been raised");
- TEST_SAY("Fatal error: %s: %s\n", rd_kafka_err2name(fatal_err), errstr);
-
- /* Verify that the producer sent the expected number of EndTxn requests
- * by inspecting the mock broker error stack,
- * which should now be empty. */
- if (rd_kafka_mock_broker_error_stack_cnt(
- mcluster, txn_coord, RD_KAFKAP_EndTxn, &errors_cnt)) {
- TEST_FAIL(
- "Broker error count should succeed for API %s"
- " on broker %" PRId32,
- rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), txn_coord);
- }
- /* Checks all the RD_KAFKAP_EndTxn responses have been consumed */
- TEST_ASSERT(errors_cnt == 0,
- "Expected error count 0 for API %s, found %zu",
- rd_kafka_ApiKey2str(RD_KAFKAP_EndTxn), errors_cnt);
-
- if (rd_kafka_mock_broker_error_stack_cnt(
- mcluster, txn_coord, RD_KAFKAP_InitProducerId, &errors_cnt)) {
- TEST_FAIL(
- "Broker error count should succeed for API %s"
- " on broker %" PRId32,
- rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), txn_coord);
- }
- /* Checks none of the RD_KAFKAP_InitProducerId responses have been
- * consumed
- */
- TEST_ASSERT(errors_cnt == 1,
- "Expected error count 1 for API %s, found %zu",
- rd_kafka_ApiKey2str(RD_KAFKAP_InitProducerId), errors_cnt);
-
- /* All done */
- rd_kafka_destroy(rk);
-
- allowed_error = RD_KAFKA_RESP_ERR_NO_ERROR;
-
- SUB_TEST_PASS();
-}
-
-
-/**
- * @brief Test that the TxnOffsetCommit op doesn't retry without waiting
- * if the coordinator is found but not available, causing too frequent retries.
- */
-static void
-do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_bool_t times_out) {
- rd_kafka_t *rk;
- rd_kafka_mock_cluster_t *mcluster;
- rd_kafka_resp_err_t err;
- rd_kafka_topic_partition_list_t *offsets;
- rd_kafka_consumer_group_metadata_t *cgmetadata;
- rd_kafka_error_t *error;
- int timeout;
-
- SUB_TEST_QUICK("times_out=%s", RD_STR_ToF(times_out));
-
- rk = create_txn_producer(&mcluster, "txnid", 3, NULL);
-
- test_curr->ignore_dr_err = rd_true;
-
- TEST_CALL_ERROR__(rd_kafka_init_transactions(rk, 5000));
-
- TEST_CALL_ERROR__(rd_kafka_begin_transaction(rk));
-
- err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC("mytopic"),
- RD_KAFKA_V_VALUE("hi", 2), RD_KAFKA_V_END);
- TEST_ASSERT(!err, "produce failed: %s", rd_kafka_err2str(err));
-
- /* Wait for messages to be delivered */
- test_flush(rk, 5000);
-
- /*
- * Fail TxnOffsetCommit with COORDINATOR_NOT_AVAILABLE
- * repeatedly.
- */
- rd_kafka_mock_push_request_errors(
- mcluster, RD_KAFKAP_TxnOffsetCommit, 4,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE);
-
- offsets = rd_kafka_topic_partition_list_new(1);
- rd_kafka_topic_partition_list_add(offsets, "srctopic4", 3)->offset = 1;
-
- cgmetadata = rd_kafka_consumer_group_metadata_new("mygroupid");
-
- /* The retry delay is 500ms, with 4 retries it should take at least
- * 2000ms for this call to succeed. */
- timeout = times_out ? 500 : 4000;
- error = rd_kafka_send_offsets_to_transaction(rk, offsets, cgmetadata,
- timeout);
- rd_kafka_consumer_group_metadata_destroy(cgmetadata);
- rd_kafka_topic_partition_list_destroy(offsets);
-
- if (times_out) {
- TEST_ASSERT(rd_kafka_error_code(error) ==
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE,
- "expected %s, got: %s",
- rd_kafka_err2name(
- RD_KAFKA_RESP_ERR_COORDINATOR_NOT_AVAILABLE),
- rd_kafka_err2str(rd_kafka_error_code(error)));
- } else {
- TEST_ASSERT(rd_kafka_error_code(error) ==
- RD_KAFKA_RESP_ERR_NO_ERROR,
- "expected \"Success\", found: %s",
- rd_kafka_err2str(rd_kafka_error_code(error)));
- }
- rd_kafka_error_destroy(error);
-
- /* All done */
- rd_kafka_destroy(rk);
-
- SUB_TEST_PASS();
-}
-
-
-int main_0105_transactions_mock(int argc, char **argv) {
- if (test_needs_auth()) {
- TEST_SKIP("Mock cluster does not support SSL/SASL\n");
- return 0;
- }
-
- do_test_txn_recoverable_errors();
-
- do_test_txn_fatal_idempo_errors();
-
- do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
- do_test_txn_fenced_reinit(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
-
- do_test_txn_req_cnt();
-
- do_test_txn_requires_abort_errors();
-
- do_test_txn_slow_reinit(rd_false);
- do_test_txn_slow_reinit(rd_true);
-
- /* Just do a subset of tests in quick mode */
- if (test_quick)
- return 0;
-
- do_test_txn_endtxn_errors();
-
- do_test_txn_endtxn_infinite();
-
- do_test_txn_endtxn_timeout();
-
- do_test_txn_endtxn_timeout_inflight();
-
- /* Bring down the coordinator */
- do_test_txn_broker_down_in_txn(rd_true);
-
- /* Bring down partition leader */
- do_test_txn_broker_down_in_txn(rd_false);
-
- do_test_txns_not_supported();
-
- do_test_txns_send_offsets_concurrent_is_retried();
-
- do_test_txns_send_offsets_non_eligible();
-
- do_test_txn_coord_req_destroy();
-
- do_test_txn_coord_req_multi_find();
-
- do_test_txn_addparts_req_multi();
-
- do_test_txns_no_timeout_crash();
-
- do_test_txn_auth_failure(
- RD_KAFKAP_InitProducerId,
- RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
-
- do_test_txn_auth_failure(
- RD_KAFKAP_FindCoordinator,
- RD_KAFKA_RESP_ERR_CLUSTER_AUTHORIZATION_FAILED);
-
- do_test_txn_flush_timeout();
-
- do_test_unstable_offset_commit();
-
- do_test_commit_after_msg_timeout();
-
- do_test_txn_switch_coordinator();
-
- do_test_txn_switch_coordinator_refresh();
-
- do_test_out_of_order_seq();
-
- do_test_topic_disappears_for_awhile();
-
- do_test_disconnected_group_coord(rd_false);
-
- do_test_disconnected_group_coord(rd_true);
-
- do_test_txn_coordinator_null_not_fatal();
-
- do_test_txn_resumable_calls_timeout(rd_true);
-
- do_test_txn_resumable_calls_timeout(rd_false);
-
- do_test_txn_resumable_calls_timeout_error(rd_true);
-
- do_test_txn_resumable_calls_timeout_error(rd_false);
- do_test_txn_resumable_init();
-
- do_test_txn_concurrent_operations(rd_true /*commit*/);
-
- do_test_txn_concurrent_operations(rd_false /*abort*/);
-
- do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_INVALID_PRODUCER_EPOCH);
-
- do_test_txn_fenced_abort(RD_KAFKA_RESP_ERR_PRODUCER_FENCED);
-
- do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_true);
-
- do_test_txn_offset_commit_doesnt_retry_too_quickly(rd_false);
-
- return 0;
-}