diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c | 1297 |
1 files changed, 0 insertions, 1297 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c deleted file mode 100644 index eaab2f217..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0103-transactions.c +++ /dev/null @@ -1,1297 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2019, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" - -#include "rdkafka.h" - -/** - * @name Producer transaction tests - * - */ - - -/** - * @brief Produce messages using batch interface. - */ -void do_produce_batch(rd_kafka_t *rk, - const char *topic, - uint64_t testid, - int32_t partition, - int msg_base, - int cnt) { - rd_kafka_message_t *messages; - rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic, NULL); - int i; - int ret; - int remains = cnt; - - TEST_SAY("Batch-producing %d messages to partition %" PRId32 "\n", cnt, - partition); - - messages = rd_calloc(sizeof(*messages), cnt); - for (i = 0; i < cnt; i++) { - char key[128]; - char value[128]; - - test_prepare_msg(testid, partition, msg_base + i, value, - sizeof(value), key, sizeof(key)); - messages[i].key = rd_strdup(key); - messages[i].key_len = strlen(key); - messages[i].payload = rd_strdup(value); - messages[i].len = strlen(value); - messages[i]._private = &remains; - } - - ret = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_COPY, - messages, cnt); - - rd_kafka_topic_destroy(rkt); - - TEST_ASSERT(ret == cnt, - "Failed to batch-produce: %d/%d messages produced", ret, - cnt); - - for (i = 0; i < cnt; i++) { - TEST_ASSERT(!messages[i].err, "Failed to produce message: %s", - rd_kafka_err2str(messages[i].err)); - rd_free(messages[i].key); - rd_free(messages[i].payload); - } - rd_free(messages); - - /* Wait for deliveries */ - test_wait_delivery(rk, &remains); -} - - - -/** - * @brief Basic producer transaction testing without consumed input - * (only consumed output for verification). - * e.g., no consumer offsets to commit with transaction. - */ -static void do_test_basic_producer_txn(rd_bool_t enable_compression) { - const char *topic = test_mk_topic_name("0103_transactions", 1); - const int partition_cnt = 4; -#define _TXNCNT 6 - struct { - const char *desc; - uint64_t testid; - int msgcnt; - rd_bool_t abort; - rd_bool_t sync; - rd_bool_t batch; - rd_bool_t batch_any; - } txn[_TXNCNT] = { - {"Commit transaction, sync producing", 0, 100, rd_false, rd_true}, - {"Commit transaction, async producing", 0, 1000, rd_false, - rd_false}, - {"Commit transaction, sync batch producing to any partition", 0, - 100, rd_false, rd_true, rd_true, rd_true}, - {"Abort transaction, sync producing", 0, 500, rd_true, rd_true}, - {"Abort transaction, async producing", 0, 5000, rd_true, rd_false}, - {"Abort transaction, sync batch producing to one partition", 0, 500, - rd_true, rd_true, rd_true, rd_false}, - - }; - rd_kafka_t *p, *c; - rd_kafka_conf_t *conf, *p_conf, *c_conf; - int i; - - /* Mark one of run modes as quick so we don't run both when - * in a hurry.*/ - SUB_TEST0(enable_compression /* quick */, "with%s compression", - enable_compression ? "" : "out"); - - test_conf_init(&conf, NULL, 30); - - /* Create producer */ - p_conf = rd_kafka_conf_dup(conf); - rd_kafka_conf_set_dr_msg_cb(p_conf, test_dr_msg_cb); - test_conf_set(p_conf, "transactional.id", topic); - if (enable_compression) - test_conf_set(p_conf, "compression.type", "lz4"); - p = test_create_handle(RD_KAFKA_PRODUCER, p_conf); - - // FIXME: add testing were the txn id is reused (and thus fails) - - /* Create topic */ - test_create_topic(p, topic, partition_cnt, 3); - - /* Create consumer */ - c_conf = conf; - test_conf_set(conf, "auto.offset.reset", "earliest"); - /* Make sure default isolation.level is transaction aware */ - TEST_ASSERT( - !strcmp(test_conf_get(c_conf, "isolation.level"), "read_committed"), - "expected isolation.level=read_committed, not %s", - test_conf_get(c_conf, "isolation.level")); - - c = test_create_consumer(topic, NULL, c_conf, NULL); - - /* Wait for topic to propagate to avoid test flakyness */ - test_wait_topic_exists(c, topic, tmout_multip(5000)); - - /* Subscribe to topic */ - test_consumer_subscribe(c, topic); - - /* Wait for assignment to make sure consumer is fetching messages - * below, so we can use the poll_no_msgs() timeout to - * determine that messages were indeed aborted. */ - test_consumer_wait_assignment(c, rd_true); - - /* Init transactions */ - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); - - for (i = 0; i < _TXNCNT; i++) { - int wait_msgcnt = 0; - - TEST_SAY(_C_BLU "txn[%d]: Begin transaction: %s\n" _C_CLR, i, - txn[i].desc); - - /* Begin a transaction */ - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - /* If the transaction is aborted it is okay if - * messages fail producing, since they'll be - * purged from queues. */ - test_curr->ignore_dr_err = txn[i].abort; - - /* Produce messages */ - txn[i].testid = test_id_generate(); - TEST_SAY( - "txn[%d]: Produce %d messages %ssynchronously " - "with testid %" PRIu64 "\n", - i, txn[i].msgcnt, txn[i].sync ? "" : "a", txn[i].testid); - - if (!txn[i].batch) { - if (txn[i].sync) - test_produce_msgs2(p, topic, txn[i].testid, - RD_KAFKA_PARTITION_UA, 0, - txn[i].msgcnt, NULL, 0); - else - test_produce_msgs2_nowait( - p, topic, txn[i].testid, - RD_KAFKA_PARTITION_UA, 0, txn[i].msgcnt, - NULL, 0, &wait_msgcnt); - } else if (txn[i].batch_any) { - /* Batch: use any partition */ - do_produce_batch(p, topic, txn[i].testid, - RD_KAFKA_PARTITION_UA, 0, - txn[i].msgcnt); - } else { - /* Batch: specific partition */ - do_produce_batch(p, topic, txn[i].testid, - 1 /* partition */, 0, txn[i].msgcnt); - } - - - /* Abort or commit transaction */ - TEST_SAY("txn[%d]: %s" _C_CLR " transaction\n", i, - txn[i].abort ? _C_RED "Abort" : _C_GRN "Commit"); - if (txn[i].abort) { - test_curr->ignore_dr_err = rd_true; - TEST_CALL_ERROR__( - rd_kafka_abort_transaction(p, 30 * 1000)); - } else { - test_curr->ignore_dr_err = rd_false; - TEST_CALL_ERROR__( - rd_kafka_commit_transaction(p, 30 * 1000)); - } - - if (!txn[i].sync) - /* Wait for delivery reports */ - test_wait_delivery(p, &wait_msgcnt); - - /* Consume messages */ - if (txn[i].abort) - test_consumer_poll_no_msgs(txn[i].desc, c, - txn[i].testid, 3000); - else - test_consumer_poll(txn[i].desc, c, txn[i].testid, - partition_cnt, 0, txn[i].msgcnt, - NULL); - - TEST_SAY(_C_GRN "txn[%d]: Finished successfully: %s\n" _C_CLR, - i, txn[i].desc); - } - - rd_kafka_destroy(p); - - test_consumer_close(c); - rd_kafka_destroy(c); - - SUB_TEST_PASS(); -} - - -/** - * @brief Consumes \p cnt messages and returns them in the provided array - * which must be pre-allocated. - */ -static void -consume_messages(rd_kafka_t *c, rd_kafka_message_t **msgs, int msgcnt) { - int i = 0; - while (i < msgcnt) { - msgs[i] = rd_kafka_consumer_poll(c, 1000); - if (!msgs[i]) - continue; - - if (msgs[i]->err) { - TEST_SAY("%s consumer error: %s\n", rd_kafka_name(c), - rd_kafka_message_errstr(msgs[i])); - rd_kafka_message_destroy(msgs[i]); - continue; - } - - TEST_SAYL(3, "%s: consumed message %s [%d] @ %" PRId64 "\n", - rd_kafka_name(c), rd_kafka_topic_name(msgs[i]->rkt), - msgs[i]->partition, msgs[i]->offset); - - - i++; - } -} - -static void destroy_messages(rd_kafka_message_t **msgs, int msgcnt) { - while (msgcnt-- > 0) - rd_kafka_message_destroy(msgs[msgcnt]); -} - - -/** - * @brief Test a transactional consumer + transactional producer combo, - * mimicing a streams job. - * - * One input topic produced to by transactional producer 1, - * consumed by transactional consumer 1, which forwards messages - * to transactional producer 2 that writes messages to output topic, - * which is consumed and verified by transactional consumer 2. - * - * Every 3rd transaction is aborted. - */ -void do_test_consumer_producer_txn(void) { - char *input_topic = - rd_strdup(test_mk_topic_name("0103-transactions-input", 1)); - char *output_topic = - rd_strdup(test_mk_topic_name("0103-transactions-output", 1)); - const char *c1_groupid = input_topic; - const char *c2_groupid = output_topic; - rd_kafka_t *p1, *p2, *c1, *c2; - rd_kafka_conf_t *conf, *tmpconf; - uint64_t testid; -#define _MSGCNT (10 * 30) - const int txncnt = 10; - const int msgcnt = _MSGCNT; - int txn; - int committed_msgcnt = 0; - test_msgver_t expect_mv, actual_mv; - - SUB_TEST_QUICK("transactional test with %d transactions", txncnt); - - test_conf_init(&conf, NULL, 30); - - testid = test_id_generate(); - - /* - * - * Producer 1 - * | - * v - * input topic - * | - * v - * Consumer 1 } - * | } transactional streams job - * v } - * Producer 2 } - * | - * v - * output tpic - * | - * v - * Consumer 2 - */ - - - /* Create Producer 1 and seed input topic */ - tmpconf = rd_kafka_conf_dup(conf); - test_conf_set(tmpconf, "transactional.id", input_topic); - rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb); - p1 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf); - - /* Create input and output topics */ - test_create_topic(p1, input_topic, 4, 3); - test_create_topic(p1, output_topic, 4, 3); - - /* Seed input topic with messages */ - TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30 * 1000)); - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1)); - test_produce_msgs2(p1, input_topic, testid, RD_KAFKA_PARTITION_UA, 0, - msgcnt, NULL, 0); - TEST_CALL_ERROR__(rd_kafka_commit_transaction(p1, 30 * 1000)); - - rd_kafka_destroy(p1); - - /* Create Consumer 1: reading msgs from input_topic (Producer 1) */ - tmpconf = rd_kafka_conf_dup(conf); - test_conf_set(tmpconf, "isolation.level", "read_committed"); - test_conf_set(tmpconf, "auto.offset.reset", "earliest"); - test_conf_set(tmpconf, "enable.auto.commit", "false"); - c1 = test_create_consumer(c1_groupid, NULL, tmpconf, NULL); - test_consumer_subscribe(c1, input_topic); - - /* Create Producer 2 */ - tmpconf = rd_kafka_conf_dup(conf); - test_conf_set(tmpconf, "transactional.id", output_topic); - rd_kafka_conf_set_dr_msg_cb(tmpconf, test_dr_msg_cb); - p2 = test_create_handle(RD_KAFKA_PRODUCER, tmpconf); - TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30 * 1000)); - - /* Create Consumer 2: reading msgs from output_topic (Producer 2) */ - tmpconf = rd_kafka_conf_dup(conf); - test_conf_set(tmpconf, "isolation.level", "read_committed"); - test_conf_set(tmpconf, "auto.offset.reset", "earliest"); - c2 = test_create_consumer(c2_groupid, NULL, tmpconf, NULL); - test_consumer_subscribe(c2, output_topic); - - /* Keep track of what messages to expect on the output topic */ - test_msgver_init(&expect_mv, testid); - - for (txn = 0; txn < txncnt; txn++) { - int msgcnt2 = 10 * (1 + (txn % 3)); - rd_kafka_message_t *msgs[_MSGCNT]; - int i; - rd_bool_t do_abort = !(txn % 3); - rd_bool_t recreate_consumer = - (do_abort && txn == 3) || (!do_abort && txn == 2); - rd_kafka_topic_partition_list_t *offsets, - *expected_offsets = NULL; - rd_kafka_resp_err_t err; - rd_kafka_consumer_group_metadata_t *c1_cgmetadata; - int remains = msgcnt2; - - TEST_SAY(_C_BLU - "Begin transaction #%d/%d " - "(msgcnt=%d, do_abort=%s, recreate_consumer=%s)\n", - txn, txncnt, msgcnt2, do_abort ? "true" : "false", - recreate_consumer ? "true" : "false"); - - consume_messages(c1, msgs, msgcnt2); - - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p2)); - - for (i = 0; i < msgcnt2; i++) { - rd_kafka_message_t *msg = msgs[i]; - - if (!do_abort) { - /* The expected msgver based on the input topic - * will be compared to the actual msgver based - * on the output topic, so we need to - * override the topic name to match - * the actual msgver's output topic. */ - test_msgver_add_msg0( - __FUNCTION__, __LINE__, rd_kafka_name(p2), - &expect_mv, msg, output_topic); - committed_msgcnt++; - } - - err = rd_kafka_producev( - p2, RD_KAFKA_V_TOPIC(output_topic), - RD_KAFKA_V_KEY(msg->key, msg->key_len), - RD_KAFKA_V_VALUE(msg->payload, msg->len), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - RD_KAFKA_V_OPAQUE(&remains), RD_KAFKA_V_END); - TEST_ASSERT(!err, "produce failed: %s", - rd_kafka_err2str(err)); - - rd_kafka_poll(p2, 0); - } - - destroy_messages(msgs, msgcnt2); - - err = rd_kafka_assignment(c1, &offsets); - TEST_ASSERT(!err, "failed to get consumer assignment: %s", - rd_kafka_err2str(err)); - - err = rd_kafka_position(c1, offsets); - TEST_ASSERT(!err, "failed to get consumer position: %s", - rd_kafka_err2str(err)); - - c1_cgmetadata = rd_kafka_consumer_group_metadata(c1); - TEST_ASSERT(c1_cgmetadata != NULL, - "failed to get consumer group metadata"); - - TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction( - p2, offsets, c1_cgmetadata, -1)); - - if (recreate_consumer && !do_abort) { - expected_offsets = - rd_kafka_topic_partition_list_new(offsets->cnt); - - /* Cannot use rd_kafka_topic_partition_list_copy - * as it needs to be destroyed before closing the - * consumer, because of the _private field holding - * a reference to the internal toppar */ - for (i = 0; i < offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = - &offsets->elems[i]; - rd_kafka_topic_partition_t *rktpar_new; - rktpar_new = rd_kafka_topic_partition_list_add( - expected_offsets, rktpar->topic, - rktpar->partition); - rktpar_new->offset = rktpar->offset; - rd_kafka_topic_partition_set_leader_epoch( - rktpar_new, - rd_kafka_topic_partition_get_leader_epoch( - rktpar)); - } - } - - rd_kafka_consumer_group_metadata_destroy(c1_cgmetadata); - - rd_kafka_topic_partition_list_destroy(offsets); - - - if (do_abort) { - test_curr->ignore_dr_err = rd_true; - TEST_CALL_ERROR__( - rd_kafka_abort_transaction(p2, 30 * 1000)); - } else { - test_curr->ignore_dr_err = rd_false; - TEST_CALL_ERROR__( - rd_kafka_commit_transaction(p2, 30 * 1000)); - } - - TEST_ASSERT(remains == 0, - "expected no remaining messages " - "in-flight/in-queue, got %d", - remains); - - - if (recreate_consumer) { - /* Recreate the consumer to pick up - * on the committed offset. */ - TEST_SAY("Recreating consumer 1\n"); - rd_kafka_consumer_close(c1); - rd_kafka_destroy(c1); - - tmpconf = rd_kafka_conf_dup(conf); - test_conf_set(tmpconf, "isolation.level", - "read_committed"); - test_conf_set(tmpconf, "auto.offset.reset", "earliest"); - test_conf_set(tmpconf, "enable.auto.commit", "false"); - c1 = test_create_consumer(c1_groupid, NULL, tmpconf, - NULL); - test_consumer_subscribe(c1, input_topic); - - - if (expected_offsets) { - rd_kafka_topic_partition_list_t - *committed_offsets = - rd_kafka_topic_partition_list_copy( - expected_offsets); - /* Set committed offsets and epochs to a - * different value before requesting them. */ - for (i = 0; i < committed_offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar = - &committed_offsets->elems[i]; - rktpar->offset = -100; - rd_kafka_topic_partition_set_leader_epoch( - rktpar, -100); - } - - TEST_CALL_ERR__(rd_kafka_committed( - c1, committed_offsets, -1)); - - if (test_partition_list_and_offsets_cmp( - expected_offsets, committed_offsets)) { - TEST_SAY("expected list:\n"); - test_print_partition_list( - expected_offsets); - TEST_SAY("committed() list:\n"); - test_print_partition_list( - committed_offsets); - TEST_FAIL( - "committed offsets don't match"); - } - - rd_kafka_topic_partition_list_destroy( - committed_offsets); - - rd_kafka_topic_partition_list_destroy( - expected_offsets); - } - } - } - - rd_kafka_conf_destroy(conf); - - test_msgver_init(&actual_mv, testid); - - test_consumer_poll("Verify output topic", c2, testid, -1, 0, - committed_msgcnt, &actual_mv); - - test_msgver_verify_compare("Verify output topic", &actual_mv, - &expect_mv, TEST_MSGVER_ALL); - - test_msgver_clear(&actual_mv); - test_msgver_clear(&expect_mv); - - rd_kafka_consumer_close(c1); - rd_kafka_consumer_close(c2); - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); - rd_kafka_destroy(p2); - - rd_free(input_topic); - rd_free(output_topic); - - SUB_TEST_PASS(); -} - - -/** - * @brief Testing misuse of the transaction API. - */ -static void do_test_misuse_txn(void) { - const char *topic = test_mk_topic_name("0103-test_misuse_txn", 1); - rd_kafka_t *p; - rd_kafka_conf_t *conf; - rd_kafka_error_t *error; - rd_kafka_resp_err_t fatal_err; - char errstr[512]; - int i; - - /* - * transaction.timeout.ms out of range (from broker's point of view) - */ - SUB_TEST_QUICK(); - - test_conf_init(&conf, NULL, 10); - - test_conf_set(conf, "transactional.id", topic); - test_conf_set(conf, "transaction.timeout.ms", "2147483647"); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - error = rd_kafka_init_transactions(p, 10 * 1000); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == - RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT, - "Expected error ERR_INVALID_TRANSACTION_TIMEOUT, " - "not %s: %s", - rd_kafka_error_name(error), - error ? rd_kafka_error_string(error) : ""); - TEST_ASSERT(rd_kafka_error_is_fatal(error), - "Expected error to have is_fatal() set"); - rd_kafka_error_destroy(error); - /* Check that a fatal error is raised */ - fatal_err = rd_kafka_fatal_error(p, errstr, sizeof(errstr)); - TEST_ASSERT(fatal_err == RD_KAFKA_RESP_ERR_INVALID_TRANSACTION_TIMEOUT, - "Expected fatal error ERR_INVALID_TRANSACTION_TIMEOUT, " - "not %s: %s", - rd_kafka_err2name(fatal_err), fatal_err ? errstr : ""); - - rd_kafka_destroy(p); - - - /* - * Multiple calls to init_transactions(): finish on first. - */ - TEST_SAY("[ Test multiple init_transactions(): finish on first ]\n"); - test_conf_init(&conf, NULL, 10); - - test_conf_set(conf, "transactional.id", topic); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); - - error = rd_kafka_init_transactions(p, 1); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE, - "Expected ERR__STATE error, not %s", - rd_kafka_error_name(error)); - rd_kafka_error_destroy(error); - - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - error = rd_kafka_init_transactions(p, 3 * 1000); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE, - "Expected ERR__STATE error, not %s", - rd_kafka_error_name(error)); - rd_kafka_error_destroy(error); - - rd_kafka_destroy(p); - - - /* - * Multiple calls to init_transactions(): timeout on first. - */ - TEST_SAY("[ Test multiple init_transactions(): timeout on first ]\n"); - test_conf_init(&conf, NULL, 10); - - test_conf_set(conf, "transactional.id", topic); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - error = rd_kafka_init_transactions(p, 1); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_SAY("error: %s, %d\n", rd_kafka_error_string(error), - rd_kafka_error_is_retriable(error)); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT, - "Expected ERR__TIMED_OUT, not %s: %s", - rd_kafka_error_name(error), rd_kafka_error_string(error)); - TEST_ASSERT(rd_kafka_error_is_retriable(error), - "Expected error to be retriable"); - rd_kafka_error_destroy(error); - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); - - rd_kafka_destroy(p); - - - /* - * Multiple calls to init_transactions(): hysterical amounts - */ - TEST_SAY("[ Test multiple init_transactions(): hysterical amounts ]\n"); - test_conf_init(&conf, NULL, 10); - - test_conf_set(conf, "transactional.id", topic); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - /* Call until init succeeds */ - for (i = 0; i < 5000; i++) { - if (!(error = rd_kafka_init_transactions(p, 1))) - break; - - TEST_ASSERT(rd_kafka_error_is_retriable(error), - "Expected error to be retriable"); - rd_kafka_error_destroy(error); - - error = rd_kafka_begin_transaction(p); - TEST_ASSERT(error, "Expected begin_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == - RD_KAFKA_RESP_ERR__CONFLICT, - "Expected begin_transactions() to fail " - "with CONFLICT, not %s", - rd_kafka_error_name(error)); - - rd_kafka_error_destroy(error); - } - - TEST_ASSERT(i <= 5000, - "init_transactions() did not succeed after %d calls\n", i); - - TEST_SAY("init_transactions() succeeded after %d call(s)\n", i + 1); - - /* Make sure a sub-sequent init call fails. */ - error = rd_kafka_init_transactions(p, 5 * 1000); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__STATE, - "Expected init_transactions() to fail with STATE, not %s", - rd_kafka_error_name(error)); - rd_kafka_error_destroy(error); - - /* But begin.. should work now */ - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - rd_kafka_destroy(p); - - SUB_TEST_PASS(); -} - - -/** - * @brief is_fatal_cb for fenced_txn test. - */ -static int fenced_txn_is_fatal_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - const char *reason) { - TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason); - if (err == RD_KAFKA_RESP_ERR__FENCED) { - TEST_SAY("Saw the expected fatal error\n"); - return 0; - } - return 1; -} - - -/** - * @brief Check that transaction fencing is handled correctly. - */ -static void do_test_fenced_txn(rd_bool_t produce_after_fence) { - const char *topic = test_mk_topic_name("0103_fenced_txn", 1); - rd_kafka_conf_t *conf; - rd_kafka_t *p1, *p2; - rd_kafka_error_t *error; - uint64_t testid; - - SUB_TEST_QUICK("%sproduce after fence", - produce_after_fence ? "" : "do not "); - - if (produce_after_fence) - test_curr->is_fatal_cb = fenced_txn_is_fatal_cb; - - test_curr->ignore_dr_err = rd_false; - - testid = test_id_generate(); - - test_conf_init(&conf, NULL, 30); - - test_conf_set(conf, "transactional.id", topic); - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - p1 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); - p2 = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); - rd_kafka_conf_destroy(conf); - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p1, 30 * 1000)); - - /* Begin a transaction */ - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p1)); - - /* Produce some messages */ - test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA, 0, 10, - NULL, 0); - - /* Initialize transactions on producer 2, this should - * fence off producer 1. */ - TEST_CALL_ERROR__(rd_kafka_init_transactions(p2, 30 * 1000)); - - if (produce_after_fence) { - /* This will fail hard since the epoch was bumped. */ - TEST_SAY("Producing after producing fencing\n"); - test_curr->ignore_dr_err = rd_true; - test_produce_msgs2(p1, topic, testid, RD_KAFKA_PARTITION_UA, 0, - 10, NULL, 0); - } - - - error = rd_kafka_commit_transaction(p1, 30 * 1000); - - TEST_ASSERT(error, "Expected commit to fail"); - TEST_ASSERT(rd_kafka_fatal_error(p1, NULL, 0), - "Expected a fatal error to have been raised"); - TEST_ASSERT(error, "Expected commit_transaction() to fail"); - TEST_ASSERT(rd_kafka_error_is_fatal(error), - "Expected commit_transaction() to return a " - "fatal error"); - TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error), - "Expected commit_transaction() not to return an " - "abortable error"); - TEST_ASSERT(!rd_kafka_error_is_retriable(error), - "Expected commit_transaction() not to return a " - "retriable error"); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__FENCED, - "Expected commit_transaction() to return %s, " - "not %s: %s", - rd_kafka_err2name(RD_KAFKA_RESP_ERR__FENCED), - rd_kafka_error_name(error), rd_kafka_error_string(error)); - rd_kafka_error_destroy(error); - - rd_kafka_destroy(p1); - rd_kafka_destroy(p2); - - /* Make sure no messages were committed. */ - test_consume_txn_msgs_easy( - topic, topic, testid, - test_get_partition_count(NULL, topic, 10 * 1000), 0, NULL); - - SUB_TEST_PASS(); -} - - - -/** - * @brief Check that fatal idempotent producer errors are also fatal - * transactional errors when KIP-360 is not supported. - */ -static void do_test_fatal_idempo_error_without_kip360(void) { - const char *topic = test_mk_topic_name("0103_fatal_idempo", 1); - const int32_t partition = 0; - rd_kafka_conf_t *conf, *c_conf; - rd_kafka_t *p, *c; - rd_kafka_error_t *error; - uint64_t testid; - const int msgcnt[3] = {6, 4, 1}; - rd_kafka_topic_partition_list_t *records; - test_msgver_t expect_mv, actual_mv; - /* This test triggers UNKNOWN_PRODUCER_ID on AK <2.4 and >2.4, but - * not on AK 2.4. - * On AK <2.5 (pre KIP-360) these errors are unrecoverable, - * on AK >2.5 (with KIP-360) we can recover. - * Since 2.4 is not behaving as the other releases we skip it here. */ - rd_bool_t expect_fail = test_broker_version < TEST_BRKVER(2, 5, 0, 0); - - SUB_TEST_QUICK( - "%s", expect_fail ? "expecting failure since broker is < 2.5" - : "not expecting failure since broker is >= 2.5"); - - if (test_broker_version >= TEST_BRKVER(2, 4, 0, 0) && - test_broker_version < TEST_BRKVER(2, 5, 0, 0)) - SUB_TEST_SKIP("can't trigger UNKNOWN_PRODUCER_ID on AK 2.4"); - - if (expect_fail) - test_curr->is_fatal_cb = test_error_is_not_fatal_cb; - test_curr->ignore_dr_err = expect_fail; - - testid = test_id_generate(); - - /* Keep track of what messages to expect on the output topic */ - test_msgver_init(&expect_mv, testid); - - test_conf_init(&conf, NULL, 30); - - test_conf_set(conf, "transactional.id", topic); - test_conf_set(conf, "batch.num.messages", "1"); - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - test_create_topic(p, topic, 1, 3); - - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, 30 * 1000)); - - /* - * 3 transactions: - * 1. Produce some messages, commit. - * 2. Produce some messages, then delete the messages from txn 1 and - * then produce some more messages: UNKNOWN_PRODUCER_ID should be - * raised as a fatal error. - * 3. Start a new transaction, produce and commit some new messages. - * (this step is only performed when expect_fail is false). - */ - - /* - * Transaction 1 - */ - TEST_SAY(_C_BLU "Transaction 1: %d msgs\n", msgcnt[0]); - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[0], NULL, 0); - TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1)); - - - /* - * Transaction 2 - */ - TEST_SAY(_C_BLU "Transaction 2: %d msgs\n", msgcnt[1]); - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - /* Now delete the messages from txn1 */ - TEST_SAY("Deleting records < %s [%" PRId32 "] offset %d+1\n", topic, - partition, msgcnt[0]); - records = rd_kafka_topic_partition_list_new(1); - rd_kafka_topic_partition_list_add(records, topic, partition)->offset = - msgcnt[0]; /* include the control message too */ - - TEST_CALL_ERR__(test_DeleteRecords_simple(p, NULL, records, NULL)); - rd_kafka_topic_partition_list_destroy(records); - - /* Wait for deletes to propagate */ - rd_sleep(2); - - if (!expect_fail) - test_curr->dr_mv = &expect_mv; - - /* Produce more messages, should now fail */ - test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[1], NULL, 0); - - error = rd_kafka_commit_transaction(p, -1); - - TEST_SAY_ERROR(error, "commit_transaction() returned: "); - - if (expect_fail) { - TEST_ASSERT(error != NULL, "Expected transaction to fail"); - TEST_ASSERT(rd_kafka_error_txn_requires_abort(error), - "Expected abortable error"); - rd_kafka_error_destroy(error); - - /* Now abort transaction, which should raise the fatal error - * since it is the abort that performs the PID reinitialization. - */ - error = rd_kafka_abort_transaction(p, -1); - TEST_SAY_ERROR(error, "abort_transaction() returned: "); - TEST_ASSERT(error != NULL, "Expected abort to fail"); - TEST_ASSERT(rd_kafka_error_is_fatal(error), - "Expecting fatal error"); - TEST_ASSERT(!rd_kafka_error_is_retriable(error), - "Did not expect retriable error"); - TEST_ASSERT(!rd_kafka_error_txn_requires_abort(error), - "Did not expect abortable error"); - - rd_kafka_error_destroy(error); - - } else { - TEST_ASSERT(!error, "Did not expect commit to fail: %s", - rd_kafka_error_string(error)); - } - - - if (!expect_fail) { - /* - * Transaction 3 - */ - TEST_SAY(_C_BLU "Transaction 3: %d msgs\n", msgcnt[2]); - test_curr->dr_mv = &expect_mv; - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - test_produce_msgs2(p, topic, testid, partition, 0, msgcnt[2], - NULL, 0); - TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1)); - } - - rd_kafka_destroy(p); - - /* Consume messages. - * On AK<2.5 (expect_fail=true) we do not expect to see any messages - * since the producer will have failed with a fatal error. - * On AK>=2.5 (expect_fail=false) we should only see messages from - * txn 3 which are sent after the producer has recovered. - */ - - test_conf_init(&c_conf, NULL, 0); - test_conf_set(c_conf, "enable.partition.eof", "true"); - c = test_create_consumer(topic, NULL, c_conf, NULL); - test_consumer_assign_partition("consume", c, topic, partition, - RD_KAFKA_OFFSET_BEGINNING); - - test_msgver_init(&actual_mv, testid); - test_msgver_ignore_eof(&actual_mv); - - test_consumer_poll("Verify output topic", c, testid, 1, 0, -1, - &actual_mv); - - test_msgver_verify_compare("Verify output topic", &actual_mv, - &expect_mv, TEST_MSGVER_ALL); - - test_msgver_clear(&actual_mv); - test_msgver_clear(&expect_mv); - - rd_kafka_destroy(c); - - SUB_TEST_PASS(); -} - - -/** - * @brief Check that empty transactions, with no messages produced, work - * as expected. - */ -static void do_test_empty_txn(rd_bool_t send_offsets, rd_bool_t do_commit) { - const char *topic = test_mk_topic_name("0103_empty_txn", 1); - rd_kafka_conf_t *conf, *c_conf; - rd_kafka_t *p, *c; - uint64_t testid; - const int msgcnt = 10; - rd_kafka_topic_partition_list_t *committed; - int64_t offset; - - SUB_TEST_QUICK("%ssend offsets, %s", send_offsets ? "" : "don't ", - do_commit ? "commit" : "abort"); - - testid = test_id_generate(); - - test_conf_init(&conf, NULL, 30); - c_conf = rd_kafka_conf_dup(conf); - - test_conf_set(conf, "transactional.id", topic); - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - test_create_topic(p, topic, 1, 3); - - /* Produce some non-txnn messages for the consumer to read and commit */ - test_produce_msgs_easy(topic, testid, 0, msgcnt); - - /* Create consumer and subscribe to the topic */ - test_conf_set(c_conf, "auto.offset.reset", "earliest"); - test_conf_set(c_conf, "enable.auto.commit", "false"); - c = test_create_consumer(topic, NULL, c_conf, NULL); - test_consumer_subscribe(c, topic); - test_consumer_wait_assignment(c, rd_false); - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1)); - - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - /* send_offsets? Consume messages and send those offsets to the txn */ - if (send_offsets) { - rd_kafka_topic_partition_list_t *offsets; - rd_kafka_consumer_group_metadata_t *cgmetadata; - - test_consumer_poll("consume", c, testid, -1, 0, msgcnt, NULL); - - TEST_CALL_ERR__(rd_kafka_assignment(c, &offsets)); - TEST_CALL_ERR__(rd_kafka_position(c, offsets)); - - cgmetadata = rd_kafka_consumer_group_metadata(c); - TEST_ASSERT(cgmetadata != NULL, - "failed to get consumer group metadata"); - - TEST_CALL_ERROR__(rd_kafka_send_offsets_to_transaction( - p, offsets, cgmetadata, -1)); - - rd_kafka_consumer_group_metadata_destroy(cgmetadata); - - rd_kafka_topic_partition_list_destroy(offsets); - } - - - if (do_commit) - TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1)); - else - TEST_CALL_ERROR__(rd_kafka_abort_transaction(p, -1)); - - /* Wait before checking the committed offsets (Kafka < 2.5.0) */ - if (test_broker_version < TEST_BRKVER(2, 5, 0, 0)) - rd_usleep(tmout_multip(5000 * 1000), NULL); - - /* Get the committed offsets */ - TEST_CALL_ERR__(rd_kafka_assignment(c, &committed)); - TEST_CALL_ERR__(rd_kafka_committed(c, committed, 10 * 1000)); - - TEST_ASSERT(committed->cnt == 1, - "expected one committed offset, not %d", committed->cnt); - offset = committed->elems[0].offset; - TEST_SAY("Committed offset is %" PRId64 "\n", offset); - - if (do_commit && send_offsets) - TEST_ASSERT(offset >= msgcnt, - "expected committed offset >= %d, got %" PRId64, - msgcnt, offset); - else - TEST_ASSERT(offset < 0, - "expected no committed offset, got %" PRId64, - offset); - - rd_kafka_topic_partition_list_destroy(committed); - - rd_kafka_destroy(c); - rd_kafka_destroy(p); - - SUB_TEST_PASS(); -} - -/** - * @returns the high watermark for the given partition. - */ -int64_t -query_hi_wmark0(int line, rd_kafka_t *c, const char *topic, int32_t partition) { - rd_kafka_resp_err_t err; - int64_t lo = -1, hi = -1; - - err = rd_kafka_query_watermark_offsets(c, topic, partition, &lo, &hi, - tmout_multip(5 * 1000)); - TEST_ASSERT(!err, "%d: query_watermark_offsets(%s) failed: %s", line, - topic, rd_kafka_err2str(err)); - - return hi; -} -#define query_hi_wmark(c, topic, part) query_hi_wmark0(__LINE__, c, topic, part) - -/** - * @brief Check that isolation.level works as expected for query_watermark..(). - */ -static void do_test_wmark_isolation_level(void) { - const char *topic = test_mk_topic_name("0103_wmark_isol", 1); - rd_kafka_conf_t *conf, *c_conf; - rd_kafka_t *p, *c1, *c2; - uint64_t testid; - int64_t hw_uncommitted, hw_committed; - - SUB_TEST_QUICK(); - - testid = test_id_generate(); - - test_conf_init(&conf, NULL, 30); - c_conf = rd_kafka_conf_dup(conf); - - test_conf_set(conf, "transactional.id", topic); - rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); - p = test_create_handle(RD_KAFKA_PRODUCER, rd_kafka_conf_dup(conf)); - - test_create_topic(p, topic, 1, 3); - - /* Produce some non-txn messages to avoid 0 as the committed hwmark */ - test_produce_msgs_easy(topic, testid, 0, 100); - - /* Create consumer and subscribe to the topic */ - test_conf_set(c_conf, "isolation.level", "read_committed"); - c1 = test_create_consumer(topic, NULL, rd_kafka_conf_dup(c_conf), NULL); - test_conf_set(c_conf, "isolation.level", "read_uncommitted"); - c2 = test_create_consumer(topic, NULL, c_conf, NULL); - - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1)); - - TEST_CALL_ERROR__(rd_kafka_begin_transaction(p)); - - /* Produce some txn messages */ - test_produce_msgs2(p, topic, testid, 0, 0, 100, NULL, 0); - - test_flush(p, 10 * 1000); - - hw_committed = query_hi_wmark(c1, topic, 0); - hw_uncommitted = query_hi_wmark(c2, topic, 0); - - TEST_SAY("Pre-commit hwmarks: committed %" PRId64 - ", uncommitted %" PRId64 "\n", - hw_committed, hw_uncommitted); - - TEST_ASSERT(hw_committed > 0 && hw_committed < hw_uncommitted, - "Committed hwmark %" PRId64 - " should be lower than " - "uncommitted hwmark %" PRId64 " for %s [0]", - hw_committed, hw_uncommitted, topic); - - TEST_CALL_ERROR__(rd_kafka_commit_transaction(p, -1)); - - /* Re-create the producer and re-init transactions to make - * sure the transaction is fully committed in the cluster. */ - rd_kafka_destroy(p); - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - TEST_CALL_ERROR__(rd_kafka_init_transactions(p, -1)); - rd_kafka_destroy(p); - - - /* Now query wmarks again */ - hw_committed = query_hi_wmark(c1, topic, 0); - hw_uncommitted = query_hi_wmark(c2, topic, 0); - - TEST_SAY("Post-commit hwmarks: committed %" PRId64 - ", uncommitted %" PRId64 "\n", - hw_committed, hw_uncommitted); - - TEST_ASSERT(hw_committed == hw_uncommitted, - "Committed hwmark %" PRId64 - " should be equal to " - "uncommitted hwmark %" PRId64 " for %s [0]", - hw_committed, hw_uncommitted, topic); - - rd_kafka_destroy(c1); - rd_kafka_destroy(c2); - - SUB_TEST_PASS(); -} - - - -int main_0103_transactions(int argc, char **argv) { - - do_test_misuse_txn(); - do_test_basic_producer_txn(rd_false /* without compression */); - do_test_basic_producer_txn(rd_true /* with compression */); - do_test_consumer_producer_txn(); - do_test_fenced_txn(rd_false /* no produce after fencing */); - do_test_fenced_txn(rd_true /* produce after fencing */); - do_test_fatal_idempo_error_without_kip360(); - do_test_empty_txn(rd_false /*don't send offsets*/, rd_true /*commit*/); - do_test_empty_txn(rd_false /*don't send offsets*/, rd_false /*abort*/); - do_test_empty_txn(rd_true /*send offsets*/, rd_true /*commit*/); - do_test_empty_txn(rd_true /*send offsets*/, rd_false /*abort*/); - do_test_wmark_isolation_level(); - return 0; -} - - - -/** - * @brief Transaction tests that don't require a broker. - */ -static void do_test_txn_local(void) { - rd_kafka_conf_t *conf; - rd_kafka_t *p; - rd_kafka_error_t *error; - test_timing_t t_init; - int timeout_ms = 7 * 1000; - - SUB_TEST_QUICK(); - - /* - * No transactional.id, init_transactions() should fail. - */ - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "bootstrap.servers", NULL); - - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - error = rd_kafka_init_transactions(p, 10); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT( - rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__NOT_CONFIGURED, - "Expected ERR__NOT_CONFIGURED, not %s", rd_kafka_error_name(error)); - rd_kafka_error_destroy(error); - - rd_kafka_destroy(p); - - - /* - * No brokers, init_transactions() should time out according - * to the timeout. - */ - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "bootstrap.servers", NULL); - test_conf_set(conf, "transactional.id", "test"); - p = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("Waiting for init_transactions() timeout %d ms\n", timeout_ms); - - test_timeout_set((timeout_ms + 2000) / 1000); - - TIMING_START(&t_init, "init_transactions()"); - error = rd_kafka_init_transactions(p, timeout_ms); - TIMING_STOP(&t_init); - TEST_ASSERT(error, "Expected init_transactions() to fail"); - TEST_ASSERT(rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR__TIMED_OUT, - "Expected RD_KAFKA_RESP_ERR__TIMED_OUT, " - "not %s: %s", - rd_kafka_error_name(error), rd_kafka_error_string(error)); - - TEST_SAY("init_transactions() failed as expected: %s\n", - rd_kafka_error_string(error)); - - rd_kafka_error_destroy(error); - - TIMING_ASSERT(&t_init, timeout_ms - 2000, timeout_ms + 5000); - - rd_kafka_destroy(p); - - SUB_TEST_PASS(); -} - - -int main_0103_transactions_local(int argc, char **argv) { - - do_test_txn_local(); - - return 0; -} |