diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0137-barrier_batch_consume.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0137-barrier_batch_consume.c | 608 |
1 files changed, 608 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0137-barrier_batch_consume.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0137-barrier_batch_consume.c new file mode 100644 index 00000000..4e3c855d --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0137-barrier_batch_consume.c @@ -0,0 +1,608 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2022, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + +typedef struct consumer_s { + const char *what; + rd_kafka_queue_t *rkq; + int timeout_ms; + int consume_msg_cnt; + int expected_msg_cnt; + rd_kafka_t *rk; + uint64_t testid; + test_msgver_t *mv; + struct test *test; +} consumer_t; + +static int consumer_batch_queue(void *arg) { + consumer_t *arguments = arg; + int msg_cnt = 0; + int i; + test_timing_t t_cons; + + rd_kafka_queue_t *rkq = arguments->rkq; + int timeout_ms = arguments->timeout_ms; + const int consume_msg_cnt = arguments->consume_msg_cnt; + rd_kafka_t *rk = arguments->rk; + uint64_t testid = arguments->testid; + rd_kafka_message_t **rkmessage = + malloc(consume_msg_cnt * sizeof(*rkmessage)); + + if (arguments->test) + test_curr = arguments->test; + + TEST_SAY( + "%s calling consume_batch_queue(timeout=%d, msgs=%d) " + "and expecting %d messages back\n", + rd_kafka_name(rk), timeout_ms, consume_msg_cnt, + arguments->expected_msg_cnt); + + TIMING_START(&t_cons, "CONSUME"); + msg_cnt = (int)rd_kafka_consume_batch_queue(rkq, timeout_ms, rkmessage, + consume_msg_cnt); + TIMING_STOP(&t_cons); + + TEST_SAY("%s consumed %d/%d/%d message(s)\n", rd_kafka_name(rk), + msg_cnt, arguments->consume_msg_cnt, + arguments->expected_msg_cnt); + TEST_ASSERT(msg_cnt == arguments->expected_msg_cnt, + "consumed %d messages, expected %d", msg_cnt, + arguments->expected_msg_cnt); + + for (i = 0; i < msg_cnt; i++) { + if (test_msgver_add_msg(rk, arguments->mv, rkmessage[i]) == 0) + TEST_FAIL( + "The message is not from testid " + "%" PRId64, + testid); + rd_kafka_message_destroy(rkmessage[i]); + } + + rd_free(rkmessage); + + return 0; +} + + +static void do_test_consume_batch_with_seek(void) { + rd_kafka_queue_t *rkq; + const char *topic; + rd_kafka_t *consumer; + int p; + uint64_t testid; + rd_kafka_conf_t *conf; + consumer_t consumer_args = RD_ZERO_INIT; + test_msgver_t mv; + thrd_t thread_id; + rd_kafka_error_t *err; + rd_kafka_topic_partition_list_t *seek_toppars; + const int partition_cnt = 2; + const int timeout_ms = 10000; + const int consume_msg_cnt = 10; + const int produce_msg_cnt = 8; + const int32_t seek_partition = 0; + const int64_t seek_offset = 1; + const int expected_msg_cnt = produce_msg_cnt - seek_offset; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + /* Produce messages */ + topic = test_mk_topic_name("0137-barrier_batch_consume", 1); + + test_create_topic(NULL, topic, partition_cnt, 1); + + for (p = 0; p < partition_cnt; p++) + test_produce_msgs_easy(topic, testid, p, + produce_msg_cnt / partition_cnt); + + /* Create consumers */ + consumer = test_create_consumer(topic, NULL, conf, NULL); + + test_consumer_subscribe(consumer, topic); + test_consumer_wait_assignment(consumer, rd_false); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(consumer); + + consumer_args.what = "CONSUMER"; + consumer_args.rkq = rkq; + consumer_args.timeout_ms = timeout_ms; + consumer_args.consume_msg_cnt = consume_msg_cnt; + consumer_args.expected_msg_cnt = expected_msg_cnt; + consumer_args.rk = consumer; + consumer_args.testid = testid; + consumer_args.mv = &mv; + consumer_args.test = test_curr; + if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) != + thrd_success) + TEST_FAIL("Failed to create thread for %s", "CONSUMER"); + + seek_toppars = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(seek_toppars, topic, seek_partition); + rd_kafka_topic_partition_list_set_offset(seek_toppars, topic, + seek_partition, seek_offset); + err = rd_kafka_seek_partitions(consumer, seek_toppars, 2000); + + TEST_ASSERT( + !err, "Failed to seek partition %d for topic %s to offset %" PRId64, + seek_partition, topic, seek_offset); + + thrd_join(thread_id, NULL); + + test_msgver_verify("CONSUME", &mv, + TEST_MSGVER_ORDER | TEST_MSGVER_DUP | + TEST_MSGVER_BY_OFFSET, + 0, expected_msg_cnt); + test_msgver_clear(&mv); + + rd_kafka_topic_partition_list_destroy(seek_toppars); + + rd_kafka_queue_destroy(rkq); + + test_consumer_close(consumer); + + rd_kafka_destroy(consumer); + + SUB_TEST_PASS(); +} + + +static void do_test_consume_batch_with_pause_and_resume_different_batch(void) { + rd_kafka_queue_t *rkq; + const char *topic; + rd_kafka_t *consumer; + int p; + uint64_t testid; + rd_kafka_conf_t *conf; + consumer_t consumer_args = RD_ZERO_INIT; + test_msgver_t mv; + thrd_t thread_id; + rd_kafka_resp_err_t err; + rd_kafka_topic_partition_list_t *pause_partition_list; + const int timeout_ms = 2000; + const int consume_msg_cnt = 10; + const int produce_msg_cnt = 8; + const int partition_cnt = 2; + const int expected_msg_cnt = 4; + int32_t pause_partition = 0; + int32_t running_partition = 1; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + /* Produce messages */ + topic = test_mk_topic_name("0137-barrier_batch_consume", 1); + + test_create_topic(NULL, topic, partition_cnt, 1); + + for (p = 0; p < partition_cnt; p++) + test_produce_msgs_easy(topic, testid, p, + produce_msg_cnt / partition_cnt); + + /* Create consumers */ + consumer = test_create_consumer(topic, NULL, conf, NULL); + + test_consumer_subscribe(consumer, topic); + test_consumer_wait_assignment(consumer, rd_false); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(consumer); + + consumer_args.what = "CONSUMER"; + consumer_args.rkq = rkq; + consumer_args.timeout_ms = timeout_ms; + consumer_args.consume_msg_cnt = consume_msg_cnt; + consumer_args.expected_msg_cnt = expected_msg_cnt; + consumer_args.rk = consumer; + consumer_args.testid = testid; + consumer_args.mv = &mv; + consumer_args.test = test_curr; + if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) != + thrd_success) + TEST_FAIL("Failed to create thread for %s", "CONSUMER"); + + pause_partition_list = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(pause_partition_list, topic, + pause_partition); + + rd_sleep(1); + err = rd_kafka_pause_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to pause partition %d for topic %s", + pause_partition, topic); + + thrd_join(thread_id, NULL); + + test_msgver_verify_part("CONSUME", &mv, + TEST_MSGVER_ORDER | TEST_MSGVER_DUP | + TEST_MSGVER_BY_OFFSET, + topic, running_partition, 0, expected_msg_cnt); + + test_msgver_clear(&mv); + test_msgver_init(&mv, testid); + consumer_args.mv = &mv; + + err = rd_kafka_resume_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to resume partition %d for topic %s", + pause_partition, topic); + + consumer_batch_queue(&consumer_args); + + test_msgver_verify_part("CONSUME", &mv, + TEST_MSGVER_ORDER | TEST_MSGVER_DUP | + TEST_MSGVER_BY_OFFSET, + topic, pause_partition, 0, expected_msg_cnt); + + rd_kafka_topic_partition_list_destroy(pause_partition_list); + + test_msgver_clear(&mv); + + rd_kafka_queue_destroy(rkq); + + test_consumer_close(consumer); + + rd_kafka_destroy(consumer); + + SUB_TEST_PASS(); +} + + +static void do_test_consume_batch_with_pause_and_resume_same_batch(void) { + rd_kafka_queue_t *rkq; + const char *topic; + rd_kafka_t *consumer; + int p; + uint64_t testid; + rd_kafka_conf_t *conf; + consumer_t consumer_args = RD_ZERO_INIT; + test_msgver_t mv; + thrd_t thread_id; + rd_kafka_resp_err_t err; + rd_kafka_topic_partition_list_t *pause_partition_list; + const int timeout_ms = 10000; + const int consume_msg_cnt = 10; + const int produce_msg_cnt = 8; + const int partition_cnt = 2; + int32_t pause_partition = 0; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + /* Produce messages */ + topic = test_mk_topic_name("0137-barrier_batch_consume", 1); + + test_create_topic(NULL, topic, partition_cnt, 1); + + for (p = 0; p < partition_cnt; p++) + test_produce_msgs_easy(topic, testid, p, + produce_msg_cnt / partition_cnt); + + /* Create consumers */ + consumer = test_create_consumer(topic, NULL, conf, NULL); + + test_consumer_subscribe(consumer, topic); + test_consumer_wait_assignment(consumer, rd_false); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(consumer); + + consumer_args.what = "CONSUMER"; + consumer_args.rkq = rkq; + consumer_args.timeout_ms = timeout_ms; + consumer_args.consume_msg_cnt = consume_msg_cnt; + consumer_args.expected_msg_cnt = produce_msg_cnt; + consumer_args.rk = consumer; + consumer_args.testid = testid; + consumer_args.mv = &mv; + consumer_args.test = test_curr; + if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) != + thrd_success) + TEST_FAIL("Failed to create thread for %s", "CONSUMER"); + + pause_partition_list = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(pause_partition_list, topic, + pause_partition); + + rd_sleep(1); + err = rd_kafka_pause_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to pause partition %d for topic %s", + pause_partition, topic); + + rd_sleep(1); + + err = rd_kafka_resume_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to resume partition %d for topic %s", + pause_partition, topic); + + thrd_join(thread_id, NULL); + + test_msgver_verify("CONSUME", &mv, + TEST_MSGVER_ORDER | TEST_MSGVER_DUP | + TEST_MSGVER_BY_OFFSET, + 0, produce_msg_cnt); + + rd_kafka_topic_partition_list_destroy(pause_partition_list); + + test_msgver_clear(&mv); + + rd_kafka_queue_destroy(rkq); + + test_consumer_close(consumer); + + rd_kafka_destroy(consumer); + + SUB_TEST_PASS(); +} + + +static void do_test_consume_batch_store_offset(void) { + rd_kafka_queue_t *rkq; + const char *topic; + rd_kafka_t *consumer; + int p; + int i; + uint64_t testid; + rd_kafka_conf_t *conf; + consumer_t consumer_args = RD_ZERO_INIT; + test_msgver_t mv; + const int partition_cnt = 1; + const int timeout_ms = 10000; + const int consume_msg_cnt = 4; + const int no_of_consume = 2; + const int produce_msg_cnt = 8; + const int expected_msg_cnt = produce_msg_cnt; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "enable.auto.commit", "false"); + test_conf_set(conf, "enable.auto.offset.store", "true"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + testid = test_id_generate(); + test_msgver_init(&mv, testid); + + /* Produce messages */ + topic = test_mk_topic_name("0137-barrier_batch_consume", 1); + + test_create_topic(NULL, topic, partition_cnt, 1); + + for (p = 0; p < partition_cnt; p++) + test_produce_msgs_easy(topic, testid, p, + produce_msg_cnt / partition_cnt); + + for (i = 0; i < no_of_consume; i++) { + + /* Create consumers */ + consumer = test_create_consumer(topic, NULL, + rd_kafka_conf_dup(conf), NULL); + test_consumer_subscribe(consumer, topic); + test_consumer_wait_assignment(consumer, rd_false); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(consumer); + + consumer_args.what = "CONSUMER"; + consumer_args.rkq = rkq; + consumer_args.timeout_ms = timeout_ms; + consumer_args.consume_msg_cnt = consume_msg_cnt; + consumer_args.expected_msg_cnt = + produce_msg_cnt / no_of_consume; + consumer_args.rk = consumer; + consumer_args.testid = testid; + consumer_args.mv = &mv; + consumer_args.test = test_curr; + + consumer_batch_queue(&consumer_args); + rd_kafka_commit(consumer, NULL, rd_false); + + rd_kafka_queue_destroy(rkq); + test_consumer_close(consumer); + rd_kafka_destroy(consumer); + } + + test_msgver_verify("CONSUME", &mv, + TEST_MSGVER_ORDER | TEST_MSGVER_DUP | + TEST_MSGVER_BY_OFFSET, + 0, expected_msg_cnt); + + test_msgver_clear(&mv); + + rd_kafka_conf_destroy(conf); + + SUB_TEST_PASS(); +} + + +static void do_test_consume_batch_control_msgs(void) { + const char *topic = test_mk_topic_name("0137-barrier_batch_consume", 1); + const int32_t partition = 0; + rd_kafka_conf_t *conf, *c_conf; + rd_kafka_t *producer, *consumer; + uint64_t testid; + const int msgcnt[2] = {2, 3}; + test_msgver_t mv; + rd_kafka_queue_t *rkq; + consumer_t consumer_args = RD_ZERO_INIT; + const int partition_cnt = 1; + const int timeout_ms = 5000; + const int consume_msg_cnt = 10; + const int expected_msg_cnt = 2; + int32_t pause_partition = 0; + int64_t expected_offset = msgcnt[0] + msgcnt[1] + 2; + rd_kafka_topic_partition_list_t *pause_partition_list; + rd_kafka_resp_err_t err; + thrd_t thread_id; + + SUB_TEST("Testing control msgs flow"); + + testid = test_id_generate(); + + test_conf_init(&conf, NULL, 30); + + test_conf_set(conf, "transactional.id", topic); + test_conf_set(conf, "batch.num.messages", "1"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + + producer = test_create_handle(RD_KAFKA_PRODUCER, conf); + + test_create_topic(producer, topic, partition_cnt, 1); + + TEST_CALL_ERROR__(rd_kafka_init_transactions(producer, 30 * 1000)); + + /* + * Transaction 1 + */ + TEST_SAY("Transaction 1: %d msgs\n", msgcnt[0]); + TEST_CALL_ERROR__(rd_kafka_begin_transaction(producer)); + test_produce_msgs2(producer, topic, testid, partition, 0, msgcnt[0], + NULL, 0); + TEST_CALL_ERROR__(rd_kafka_commit_transaction(producer, -1)); + + /* + * Transaction 2 + */ + TEST_SAY("Transaction 2: %d msgs\n", msgcnt[1]); + TEST_CALL_ERROR__(rd_kafka_begin_transaction(producer)); + test_produce_msgs2(producer, topic, testid, partition, 0, msgcnt[1], + NULL, 0); + TEST_CALL_ERROR__(rd_kafka_abort_transaction(producer, -1)); + + rd_kafka_destroy(producer); + + rd_sleep(2); + + /* + * Consumer + */ + test_conf_init(&c_conf, NULL, 0); + test_conf_set(c_conf, "enable.auto.commit", "false"); + test_conf_set(c_conf, "enable.auto.offset.store", "true"); + test_conf_set(c_conf, "auto.offset.reset", "earliest"); + consumer = test_create_consumer(topic, NULL, c_conf, NULL); + + test_consumer_subscribe(consumer, topic); + test_consumer_wait_assignment(consumer, rd_false); + + /* Create generic consume queue */ + rkq = rd_kafka_queue_get_consumer(consumer); + + test_msgver_init(&mv, testid); + test_msgver_ignore_eof(&mv); + + consumer_args.what = "CONSUMER"; + consumer_args.rkq = rkq; + consumer_args.timeout_ms = timeout_ms; + consumer_args.consume_msg_cnt = consume_msg_cnt; + consumer_args.expected_msg_cnt = expected_msg_cnt; + consumer_args.rk = consumer; + consumer_args.testid = testid; + consumer_args.mv = &mv; + consumer_args.test = test_curr; + + + if (thrd_create(&thread_id, consumer_batch_queue, &consumer_args) != + thrd_success) + TEST_FAIL("Failed to create thread for %s", "CONSUMER"); + + pause_partition_list = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(pause_partition_list, topic, + pause_partition); + + rd_sleep(1); + err = rd_kafka_pause_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to pause partition %d for topic %s", + pause_partition, topic); + + rd_sleep(1); + + err = rd_kafka_resume_partitions(consumer, pause_partition_list); + + TEST_ASSERT(!err, "Failed to resume partition %d for topic %s", + pause_partition, topic); + + thrd_join(thread_id, NULL); + + rd_kafka_commit(consumer, NULL, rd_false); + + rd_kafka_committed(consumer, pause_partition_list, timeout_ms); + + TEST_ASSERT(pause_partition_list->elems[0].offset == expected_offset, + "Expected offset should be %" PRId64 ", but it is %" PRId64, + expected_offset, pause_partition_list->elems[0].offset); + + rd_kafka_topic_partition_list_destroy(pause_partition_list); + + rd_kafka_queue_destroy(rkq); + + test_msgver_clear(&mv); + + test_consumer_close(consumer); + + rd_kafka_destroy(consumer); + + SUB_TEST_PASS(); +} + + +int main_0137_barrier_batch_consume(int argc, char **argv) { + do_test_consume_batch_with_seek(); + do_test_consume_batch_store_offset(); + do_test_consume_batch_with_pause_and_resume_different_batch(); + do_test_consume_batch_with_pause_and_resume_same_batch(); + do_test_consume_batch_control_msgs(); + + return 0; +} |