diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c | 576 |
1 files changed, 0 insertions, 576 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c deleted file mode 100644 index 584d37bc6..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c +++ /dev/null @@ -1,576 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2013, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -/** - * Tests messages are produced in order. - */ - - -#include "test.h" - -/* Typical include path would be <librdkafka/rdkafka.h>, but this program - * is built from within the librdkafka source tree and thus differs. */ -#include "rdkafka.h" /* for Kafka driver */ - - -static int msgid_next = 0; -static int fails = 0; -static int msgcounter = 0; -static int *dr_partition_count = NULL; -static const int topic_num_partitions = 4; -static int msg_partition_wo_flag = 2; -static int msg_partition_wo_flag_success = 0; - -/** - * Delivery reported callback. - * Called for each message once to signal its delivery status. - */ -static void dr_single_partition_cb(rd_kafka_t *rk, - void *payload, - size_t len, - rd_kafka_resp_err_t err, - void *opaque, - void *msg_opaque) { - int msgid = *(int *)msg_opaque; - - free(msg_opaque); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) - TEST_FAIL("Message delivery failed: %s\n", - rd_kafka_err2str(err)); - - if (msgid != msgid_next) { - fails++; - TEST_FAIL("Delivered msg %i, expected %i\n", msgid, msgid_next); - return; - } - - msgid_next = msgid + 1; - msgcounter--; -} - -/* Produce a batch of messages to a single partition. */ -static void test_single_partition(void) { - int partition = 0; - int r; - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char msg[128]; - int msgcnt = test_quick ? 100 : 100000; - int failcnt = 0; - int i; - rd_kafka_message_t *rkmessages; - - msgid_next = 0; - - test_conf_init(&conf, &topic_conf, 20); - - /* Set delivery report callback */ - rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("test_single_partition: Created kafka instance %s\n", - rd_kafka_name(rk)); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); - - /* Create messages */ - rkmessages = calloc(sizeof(*rkmessages), msgcnt); - for (i = 0; i < msgcnt; i++) { - int *msgidp = malloc(sizeof(*msgidp)); - *msgidp = i; - rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", - __FILE__, __FUNCTION__, i); - - rkmessages[i].payload = rd_strdup(msg); - rkmessages[i].len = strlen(msg); - rkmessages[i]._private = msgidp; - rkmessages[i].partition = 2; /* Will be ignored since - * RD_KAFKA_MSG_F_PARTITION - * is not supplied. */ - } - - r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, - rkmessages, msgcnt); - - /* Scan through messages to check for errors. */ - for (i = 0; i < msgcnt; i++) { - if (rkmessages[i].err) { - failcnt++; - if (failcnt < 100) - TEST_SAY("Message #%i failed: %s\n", i, - rd_kafka_err2str(rkmessages[i].err)); - } - } - - /* All messages should've been produced. */ - if (r < msgcnt) { - TEST_SAY( - "Not all messages were accepted " - "by produce_batch(): %i < %i\n", - r, msgcnt); - if (msgcnt - r != failcnt) - TEST_SAY( - "Discrepency between failed messages (%i) " - "and return value %i (%i - %i)\n", - failcnt, msgcnt - r, msgcnt, r); - TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); - } - - free(rkmessages); - TEST_SAY( - "Single partition: " - "Produced %i messages, waiting for deliveries\n", - r); - - msgcounter = msgcnt; - - /* Wait for messages to be delivered */ - test_wait_delivery(rk, &msgcounter); - - if (fails) - TEST_FAIL("%i failures, see previous errors", fails); - - if (msgid_next != msgcnt) - TEST_FAIL("Still waiting for messages: next %i != end %i\n", - msgid_next, msgcnt); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); - - return; -} - - - -/** - * Delivery reported callback. - * Called for each message once to signal its delivery status. - */ -static void dr_partitioner_cb(rd_kafka_t *rk, - void *payload, - size_t len, - rd_kafka_resp_err_t err, - void *opaque, - void *msg_opaque) { - int msgid = *(int *)msg_opaque; - - free(msg_opaque); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) - TEST_FAIL("Message delivery failed: %s\n", - rd_kafka_err2str(err)); - - if (msgcounter <= 0) - TEST_FAIL( - "Too many message dr_cb callback calls " - "(at msgid #%i)\n", - msgid); - msgcounter--; -} - -/* Produce a batch of messages using random (default) partitioner */ -static void test_partitioner(void) { - int partition = RD_KAFKA_PARTITION_UA; - int r; - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char msg[128]; - int msgcnt = test_quick ? 100 : 100000; - int failcnt = 0; - int i; - rd_kafka_message_t *rkmessages; - - test_conf_init(&conf, &topic_conf, 30); - - /* Set delivery report callback */ - rd_kafka_conf_set_dr_cb(conf, dr_partitioner_cb); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("test_partitioner: Created kafka instance %s\n", - rd_kafka_name(rk)); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); - - /* Create messages */ - rkmessages = calloc(sizeof(*rkmessages), msgcnt); - for (i = 0; i < msgcnt; i++) { - int *msgidp = malloc(sizeof(*msgidp)); - *msgidp = i; - rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", - __FILE__, __FUNCTION__, i); - - rkmessages[i].payload = rd_strdup(msg); - rkmessages[i].len = strlen(msg); - rkmessages[i]._private = msgidp; - } - - r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, - rkmessages, msgcnt); - - /* Scan through messages to check for errors. */ - for (i = 0; i < msgcnt; i++) { - if (rkmessages[i].err) { - failcnt++; - if (failcnt < 100) - TEST_SAY("Message #%i failed: %s\n", i, - rd_kafka_err2str(rkmessages[i].err)); - } - } - - /* All messages should've been produced. */ - if (r < msgcnt) { - TEST_SAY( - "Not all messages were accepted " - "by produce_batch(): %i < %i\n", - r, msgcnt); - if (msgcnt - r != failcnt) - TEST_SAY( - "Discrepency between failed messages (%i) " - "and return value %i (%i - %i)\n", - failcnt, msgcnt - r, msgcnt, r); - TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); - } - - free(rkmessages); - TEST_SAY( - "Partitioner: " - "Produced %i messages, waiting for deliveries\n", - r); - - msgcounter = msgcnt; - /* Wait for messages to be delivered */ - test_wait_delivery(rk, &msgcounter); - - if (fails) - TEST_FAIL("%i failures, see previous errors", fails); - - if (msgcounter != 0) - TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, - msgcnt); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); - - return; -} - -static void dr_per_message_partition_cb(rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, - void *opaque) { - - free(rkmessage->_private); - - if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) - TEST_FAIL("Message delivery failed: %s\n", - rd_kafka_err2str(rkmessage->err)); - - if (msgcounter <= 0) - TEST_FAIL( - "Too many message dr_cb callback calls " - "(at msg offset #%" PRId64 ")\n", - rkmessage->offset); - - TEST_ASSERT(rkmessage->partition < topic_num_partitions); - msgcounter--; - - dr_partition_count[rkmessage->partition]++; -} - -/* Produce a batch of messages using with per message partition flag */ -static void test_per_message_partition_flag(void) { - int partition = 0; - int r; - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)]; - int msgcnt = test_quick ? 100 : 1000; - int failcnt = 0; - int i; - int *rkpartition_counts; - rd_kafka_message_t *rkmessages; - const char *topic_name; - - test_conf_init(&conf, &topic_conf, 30); - - /* Set delivery report callback */ - rd_kafka_conf_set_dr_msg_cb(conf, dr_per_message_partition_cb); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("test_per_message_partition_flag: Created kafka instance %s\n", - rd_kafka_name(rk)); - topic_name = test_mk_topic_name("0011_per_message_flag", 1); - test_create_topic(rk, topic_name, topic_num_partitions, 1); - - rkt = rd_kafka_topic_new(rk, topic_name, topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); - - /* Create messages */ - rkpartition_counts = calloc(sizeof(int), topic_num_partitions); - dr_partition_count = calloc(sizeof(int), topic_num_partitions); - rkmessages = calloc(sizeof(*rkmessages), msgcnt); - for (i = 0; i < msgcnt; i++) { - int *msgidp = malloc(sizeof(*msgidp)); - *msgidp = i; - rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", - __FILE__, __FUNCTION__, i); - - rkmessages[i].payload = rd_strdup(msg); - rkmessages[i].len = strlen(msg); - rkmessages[i]._private = msgidp; - rkmessages[i].partition = jitter(0, topic_num_partitions - 1); - rkpartition_counts[rkmessages[i].partition]++; - } - - r = rd_kafka_produce_batch( - rkt, partition, RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_FREE, - rkmessages, msgcnt); - - /* Scan through messages to check for errors. */ - for (i = 0; i < msgcnt; i++) { - if (rkmessages[i].err) { - failcnt++; - if (failcnt < 100) - TEST_SAY("Message #%i failed: %s\n", i, - rd_kafka_err2str(rkmessages[i].err)); - } - } - - /* All messages should've been produced. */ - if (r < msgcnt) { - TEST_SAY( - "Not all messages were accepted " - "by produce_batch(): %i < %i\n", - r, msgcnt); - if (msgcnt - r != failcnt) - TEST_SAY( - "Discrepency between failed messages (%i) " - "and return value %i (%i - %i)\n", - failcnt, msgcnt - r, msgcnt, r); - TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); - } - - free(rkmessages); - TEST_SAY( - "Per-message partition: " - "Produced %i messages, waiting for deliveries\n", - r); - - msgcounter = msgcnt; - /* Wait for messages to be delivered */ - test_wait_delivery(rk, &msgcounter); - - if (msgcounter != 0) - TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, - msgcnt); - - for (i = 0; i < topic_num_partitions; i++) { - if (dr_partition_count[i] != rkpartition_counts[i]) { - TEST_FAIL( - "messages were not sent to designated " - "partitions expected messages %i in " - "partition %i, but only " - "%i messages were sent", - rkpartition_counts[i], i, dr_partition_count[i]); - } - } - - free(rkpartition_counts); - free(dr_partition_count); - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); - - return; -} - -static void -dr_partitioner_wo_per_message_flag_cb(rd_kafka_t *rk, - const rd_kafka_message_t *rkmessage, - void *opaque) { - free(rkmessage->_private); - - if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) - TEST_FAIL("Message delivery failed: %s\n", - rd_kafka_err2str(rkmessage->err)); - if (msgcounter <= 0) - TEST_FAIL( - "Too many message dr_cb callback calls " - "(at msg offset #%" PRId64 ")\n", - rkmessage->offset); - if (rkmessage->partition != msg_partition_wo_flag) - msg_partition_wo_flag_success = 1; - msgcounter--; -} - -/** - * @brief Produce a batch of messages using partitioner - * without per message partition flag - */ -static void test_message_partitioner_wo_per_message_flag(void) { - int partition = RD_KAFKA_PARTITION_UA; - int r; - rd_kafka_t *rk; - rd_kafka_topic_t *rkt; - rd_kafka_conf_t *conf; - rd_kafka_topic_conf_t *topic_conf; - char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)]; - int msgcnt = test_quick ? 100 : 1000; - int failcnt = 0; - int i; - rd_kafka_message_t *rkmessages; - - test_conf_init(&conf, &topic_conf, 30); - - /* Set delivery report callback */ - rd_kafka_conf_set_dr_msg_cb(conf, - dr_partitioner_wo_per_message_flag_cb); - test_conf_set(conf, "sticky.partitioning.linger.ms", "0"); - - /* Create kafka instance */ - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("test_partitioner: Created kafka instance %s\n", - rd_kafka_name(rk)); - - rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf); - if (!rkt) - TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno)); - - /* Create messages */ - rkmessages = calloc(sizeof(*rkmessages), msgcnt); - for (i = 0; i < msgcnt; i++) { - int *msgidp = malloc(sizeof(*msgidp)); - *msgidp = i; - rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i", - __FILE__, __FUNCTION__, i); - - rkmessages[i].payload = rd_strdup(msg); - rkmessages[i].len = strlen(msg); - rkmessages[i]._private = msgidp; - rkmessages[i].partition = msg_partition_wo_flag; - } - - r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE, - rkmessages, msgcnt); - - /* Scan through messages to check for errors. */ - for (i = 0; i < msgcnt; i++) { - if (rkmessages[i].err) { - failcnt++; - if (failcnt < 100) - TEST_SAY("Message #%i failed: %s\n", i, - rd_kafka_err2str(rkmessages[i].err)); - } - } - - /* All messages should've been produced. */ - if (r < msgcnt) { - TEST_SAY( - "Not all messages were accepted " - "by produce_batch(): %i < %i\n", - r, msgcnt); - if (msgcnt - r != failcnt) - TEST_SAY( - "Discrepency between failed messages (%i) " - "and return value %i (%i - %i)\n", - failcnt, msgcnt - r, msgcnt, r); - TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt); - } - - free(rkmessages); - TEST_SAY( - "Partitioner: " - "Produced %i messages, waiting for deliveries\n", - r); - - msgcounter = msgcnt; - /* Wait for messages to be delivered */ - test_wait_delivery(rk, &msgcounter); - - if (fails) - TEST_FAIL("%i failures, see previous errors", fails); - - if (msgcounter != 0) - TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter, - msgcnt); - if (msg_partition_wo_flag_success == 0) { - TEST_FAIL( - "partitioner was not used, all messages were sent to " - "message specified partition %i", - i); - } - - /* Destroy topic */ - rd_kafka_topic_destroy(rkt); - - /* Destroy rdkafka instance */ - TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk)); - rd_kafka_destroy(rk); - - return; -} - - -int main_0011_produce_batch(int argc, char **argv) { - test_message_partitioner_wo_per_message_flag(); - test_single_partition(); - test_partitioner(); - if (test_can_create_topics(1)) - test_per_message_partition_flag(); - return 0; -} |