diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:19:48 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-03-09 13:20:02 +0000 |
commit | 58daab21cd043e1dc37024a7f99b396788372918 (patch) | |
tree | 96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c | |
parent | Releasing debian version 1.43.2-1. (diff) | |
download | netdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz netdata-58daab21cd043e1dc37024a7f99b396788372918.zip |
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c | 576 |
1 files changed, 576 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c new file mode 100644 index 000000000..584d37bc6 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c @@ -0,0 +1,576 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2013, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * Tests messages are produced in order. + */ + + +#include "test.h" + +/* Typical include path would be <librdkafka/rdkafka.h>, but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + + +static int msgid_next = 0; +static int fails = 0; +static int msgcounter = 0; +static int *dr_partition_count = NULL; +static const int topic_num_partitions = 4; +static int msg_partition_wo_flag = 2; +static int msg_partition_wo_flag_success = 0; + +/** + * Delivery reported callback. + * Called for each message once to signal its delivery status. + */ +static void dr_single_partition_cb(rd_kafka_t *rk, + void *payload, + size_t len, + rd_kafka_resp_err_t err, + void *opaque, + void *msg_opaque) { + int msgid = *(int *)msg_opaque; + + free(msg_opaque); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) + TEST_FAIL("Message delivery failed: %s\n", + rd_kafka_err2str(err)); + + if (msgid != msgid_next) { + fails++; + TEST_FAIL("Delivered msg %i, expected %i\n", msgid, msgid_next); + return; + } + + msgid_next = msgid + 1; + msgcounter--; +} + +/* Produce a batch of messages to a single partition. */ +static void test_single_partition(void) { + int partition = 0; + int r; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char msg[128]; + int msgcnt = test_quick ? 100 : 100000; + int failcnt = 0; + int i; + rd_kafka_message_t *rkmessages; + + msgid_next = 0; + + test_conf_init(&conf, &topic_conf, 20); + + /* Set delivery report callback */ + rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); + + /* Create kafka instance */ + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("test_single_partition: Created kafka instance %s\n", + rd_kafka_name(rk)); + + rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + if (!rkt) + TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + + /* Create messages */ + rkmessages = calloc(sizeof(*rkmessages), msgcnt); + for (i = 0; i < msgcnt; i++) { + int *msgidp = malloc(sizeof(*msgidp)); + *msgidp = i; + rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", + __FILE__, __FUNCTION__, i); + + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + rkmessages[i]._private = msgidp; + rkmessages[i].partition = 2; /* Will be ignored since + * RD_KAFKA_MSG_F_PARTITION + * is not supplied. */ + } + + r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, + rkmessages, msgcnt); + + /* Scan through messages to check for errors. */ + for (i = 0; i < msgcnt; i++) { + if (rkmessages[i].err) { + failcnt++; + if (failcnt < 100) + TEST_SAY("Message #%i failed: %s\n", i, + rd_kafka_err2str(rkmessages[i].err)); + } + } + + /* All messages should've been produced. */ + if (r < msgcnt) { + TEST_SAY( + "Not all messages were accepted " + "by produce_batch(): %i < %i\n", + r, msgcnt); + if (msgcnt - r != failcnt) + TEST_SAY( + "Discrepency between failed messages (%i) " + "and return value %i (%i - %i)\n", + failcnt, msgcnt - r, msgcnt, r); + TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); + } + + free(rkmessages); + TEST_SAY( + "Single partition: " + "Produced %i messages, waiting for deliveries\n", + r); + + msgcounter = msgcnt; + + /* Wait for messages to be delivered */ + test_wait_delivery(rk, &msgcounter); + + if (fails) + TEST_FAIL("%i failures, see previous errors", fails); + + if (msgid_next != msgcnt) + TEST_FAIL("Still waiting for messages: next %i != end %i\n", + msgid_next, msgcnt); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy rdkafka instance */ + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); + rd_kafka_destroy(rk); + + return; +} + + + +/** + * Delivery reported callback. + * Called for each message once to signal its delivery status. + */ +static void dr_partitioner_cb(rd_kafka_t *rk, + void *payload, + size_t len, + rd_kafka_resp_err_t err, + void *opaque, + void *msg_opaque) { + int msgid = *(int *)msg_opaque; + + free(msg_opaque); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) + TEST_FAIL("Message delivery failed: %s\n", + rd_kafka_err2str(err)); + + if (msgcounter <= 0) + TEST_FAIL( + "Too many message dr_cb callback calls " + "(at msgid #%i)\n", + msgid); + msgcounter--; +} + +/* Produce a batch of messages using random (default) partitioner */ +static void test_partitioner(void) { + int partition = RD_KAFKA_PARTITION_UA; + int r; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char msg[128]; + int msgcnt = test_quick ? 100 : 100000; + int failcnt = 0; + int i; + rd_kafka_message_t *rkmessages; + + test_conf_init(&conf, &topic_conf, 30); + + /* Set delivery report callback */ + rd_kafka_conf_set_dr_cb(conf, dr_partitioner_cb); + + /* Create kafka instance */ + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("test_partitioner: Created kafka instance %s\n", + rd_kafka_name(rk)); + + rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + if (!rkt) + TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + + /* Create messages */ + rkmessages = calloc(sizeof(*rkmessages), msgcnt); + for (i = 0; i < msgcnt; i++) { + int *msgidp = malloc(sizeof(*msgidp)); + *msgidp = i; + rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", + __FILE__, __FUNCTION__, i); + + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + rkmessages[i]._private = msgidp; + } + + r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, + rkmessages, msgcnt); + + /* Scan through messages to check for errors. */ + for (i = 0; i < msgcnt; i++) { + if (rkmessages[i].err) { + failcnt++; + if (failcnt < 100) + TEST_SAY("Message #%i failed: %s\n", i, + rd_kafka_err2str(rkmessages[i].err)); + } + } + + /* All messages should've been produced. */ + if (r < msgcnt) { + TEST_SAY( + "Not all messages were accepted " + "by produce_batch(): %i < %i\n", + r, msgcnt); + if (msgcnt - r != failcnt) + TEST_SAY( + "Discrepency between failed messages (%i) " + "and return value %i (%i - %i)\n", + failcnt, msgcnt - r, msgcnt, r); + TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); + } + + free(rkmessages); + TEST_SAY( + "Partitioner: " + "Produced %i messages, waiting for deliveries\n", + r); + + msgcounter = msgcnt; + /* Wait for messages to be delivered */ + test_wait_delivery(rk, &msgcounter); + + if (fails) + TEST_FAIL("%i failures, see previous errors", fails); + + if (msgcounter != 0) + TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, + msgcnt); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy rdkafka instance */ + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); + rd_kafka_destroy(rk); + + return; +} + +static void dr_per_message_partition_cb(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) { + + free(rkmessage->_private); + + if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) + TEST_FAIL("Message delivery failed: %s\n", + rd_kafka_err2str(rkmessage->err)); + + if (msgcounter <= 0) + TEST_FAIL( + "Too many message dr_cb callback calls " + "(at msg offset #%" PRId64 ")\n", + rkmessage->offset); + + TEST_ASSERT(rkmessage->partition < topic_num_partitions); + msgcounter--; + + dr_partition_count[rkmessage->partition]++; +} + +/* Produce a batch of messages using with per message partition flag */ +static void test_per_message_partition_flag(void) { + int partition = 0; + int r; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)]; + int msgcnt = test_quick ? 100 : 1000; + int failcnt = 0; + int i; + int *rkpartition_counts; + rd_kafka_message_t *rkmessages; + const char *topic_name; + + test_conf_init(&conf, &topic_conf, 30); + + /* Set delivery report callback */ + rd_kafka_conf_set_dr_msg_cb(conf, dr_per_message_partition_cb); + + /* Create kafka instance */ + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("test_per_message_partition_flag: Created kafka instance %s\n", + rd_kafka_name(rk)); + topic_name = test_mk_topic_name("0011_per_message_flag", 1); + test_create_topic(rk, topic_name, topic_num_partitions, 1); + + rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); + if (!rkt) + TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + + /* Create messages */ + rkpartition_counts = calloc(sizeof(int), topic_num_partitions); + dr_partition_count = calloc(sizeof(int), topic_num_partitions); + rkmessages = calloc(sizeof(*rkmessages), msgcnt); + for (i = 0; i < msgcnt; i++) { + int *msgidp = malloc(sizeof(*msgidp)); + *msgidp = i; + rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", + __FILE__, __FUNCTION__, i); + + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + rkmessages[i]._private = msgidp; + rkmessages[i].partition = jitter(0, topic_num_partitions - 1); + rkpartition_counts[rkmessages[i].partition]++; + } + + r = rd_kafka_produce_batch( + rkt, partition, RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_FREE, + rkmessages, msgcnt); + + /* Scan through messages to check for errors. */ + for (i = 0; i < msgcnt; i++) { + if (rkmessages[i].err) { + failcnt++; + if (failcnt < 100) + TEST_SAY("Message #%i failed: %s\n", i, + rd_kafka_err2str(rkmessages[i].err)); + } + } + + /* All messages should've been produced. */ + if (r < msgcnt) { + TEST_SAY( + "Not all messages were accepted " + "by produce_batch(): %i < %i\n", + r, msgcnt); + if (msgcnt - r != failcnt) + TEST_SAY( + "Discrepency between failed messages (%i) " + "and return value %i (%i - %i)\n", + failcnt, msgcnt - r, msgcnt, r); + TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); + } + + free(rkmessages); + TEST_SAY( + "Per-message partition: " + "Produced %i messages, waiting for deliveries\n", + r); + + msgcounter = msgcnt; + /* Wait for messages to be delivered */ + test_wait_delivery(rk, &msgcounter); + + if (msgcounter != 0) + TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, + msgcnt); + + for (i = 0; i < topic_num_partitions; i++) { + if (dr_partition_count[i] != rkpartition_counts[i]) { + TEST_FAIL( + "messages were not sent to designated " + "partitions expected messages %i in " + "partition %i, but only " + "%i messages were sent", + rkpartition_counts[i], i, dr_partition_count[i]); + } + } + + free(rkpartition_counts); + free(dr_partition_count); + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy rdkafka instance */ + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); + rd_kafka_destroy(rk); + + return; +} + +static void +dr_partitioner_wo_per_message_flag_cb(rd_kafka_t *rk, + const rd_kafka_message_t *rkmessage, + void *opaque) { + free(rkmessage->_private); + + if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) + TEST_FAIL("Message delivery failed: %s\n", + rd_kafka_err2str(rkmessage->err)); + if (msgcounter <= 0) + TEST_FAIL( + "Too many message dr_cb callback calls " + "(at msg offset #%" PRId64 ")\n", + rkmessage->offset); + if (rkmessage->partition != msg_partition_wo_flag) + msg_partition_wo_flag_success = 1; + msgcounter--; +} + +/** + * @brief Produce a batch of messages using partitioner + * without per message partition flag + */ +static void test_message_partitioner_wo_per_message_flag(void) { + int partition = RD_KAFKA_PARTITION_UA; + int r; + rd_kafka_t *rk; + rd_kafka_topic_t *rkt; + rd_kafka_conf_t *conf; + rd_kafka_topic_conf_t *topic_conf; + char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)]; + int msgcnt = test_quick ? 100 : 1000; + int failcnt = 0; + int i; + rd_kafka_message_t *rkmessages; + + test_conf_init(&conf, &topic_conf, 30); + + /* Set delivery report callback */ + rd_kafka_conf_set_dr_msg_cb(conf, + dr_partitioner_wo_per_message_flag_cb); + test_conf_set(conf, "sticky.partitioning.linger.ms", "0"); + + /* Create kafka instance */ + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("test_partitioner: Created kafka instance %s\n", + rd_kafka_name(rk)); + + rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); + if (!rkt) + TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); + + /* Create messages */ + rkmessages = calloc(sizeof(*rkmessages), msgcnt); + for (i = 0; i < msgcnt; i++) { + int *msgidp = malloc(sizeof(*msgidp)); + *msgidp = i; + rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", + __FILE__, __FUNCTION__, i); + + rkmessages[i].payload = rd_strdup(msg); + rkmessages[i].len = strlen(msg); + rkmessages[i]._private = msgidp; + rkmessages[i].partition = msg_partition_wo_flag; + } + + r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, + rkmessages, msgcnt); + + /* Scan through messages to check for errors. */ + for (i = 0; i < msgcnt; i++) { + if (rkmessages[i].err) { + failcnt++; + if (failcnt < 100) + TEST_SAY("Message #%i failed: %s\n", i, + rd_kafka_err2str(rkmessages[i].err)); + } + } + + /* All messages should've been produced. */ + if (r < msgcnt) { + TEST_SAY( + "Not all messages were accepted " + "by produce_batch(): %i < %i\n", + r, msgcnt); + if (msgcnt - r != failcnt) + TEST_SAY( + "Discrepency between failed messages (%i) " + "and return value %i (%i - %i)\n", + failcnt, msgcnt - r, msgcnt, r); + TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); + } + + free(rkmessages); + TEST_SAY( + "Partitioner: " + "Produced %i messages, waiting for deliveries\n", + r); + + msgcounter = msgcnt; + /* Wait for messages to be delivered */ + test_wait_delivery(rk, &msgcounter); + + if (fails) + TEST_FAIL("%i failures, see previous errors", fails); + + if (msgcounter != 0) + TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, + msgcnt); + if (msg_partition_wo_flag_success == 0) { + TEST_FAIL( + "partitioner was not used, all messages were sent to " + "message specified partition %i", + i); + } + + /* Destroy topic */ + rd_kafka_topic_destroy(rkt); + + /* Destroy rdkafka instance */ + TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); + rd_kafka_destroy(rk); + + return; +} + + +int main_0011_produce_batch(int argc, char **argv) { + test_message_partitioner_wo_per_message_flag(); + test_single_partition(); + test_partitioner(); + if (test_can_create_topics(1)) + test_per_message_partition_flag(); + return 0; +} |