summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:19:48 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-03-09 13:20:02 +0000
commit58daab21cd043e1dc37024a7f99b396788372918 (patch)
tree96771e43bb69f7c1c2b0b4f7374cb74d7866d0cb /fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c
parentReleasing debian version 1.43.2-1. (diff)
downloadnetdata-58daab21cd043e1dc37024a7f99b396788372918.tar.xz
netdata-58daab21cd043e1dc37024a7f99b396788372918.zip
Merging upstream version 1.44.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c576
1 files changed, 576 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c
new file mode 100644
index 000000000..584d37bc6
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0011-produce_batch.c
@@ -0,0 +1,576 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2013, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Tests messages are produced in order.
+ */
+
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+static int msgid_next = 0;
+static int fails = 0;
+static int msgcounter = 0;
+static int *dr_partition_count = NULL;
+static const int topic_num_partitions = 4;
+static int msg_partition_wo_flag = 2;
+static int msg_partition_wo_flag_success = 0;
+
+/**
+ * Delivery reported callback.
+ * Called for each message once to signal its delivery status.
+ */
+static void dr_single_partition_cb(rd_kafka_t *rk,
+ void *payload,
+ size_t len,
+ rd_kafka_resp_err_t err,
+ void *opaque,
+ void *msg_opaque) {
+ int msgid = *(int *)msg_opaque;
+
+ free(msg_opaque);
+
+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+ TEST_FAIL("Message delivery failed: %s\n",
+ rd_kafka_err2str(err));
+
+ if (msgid != msgid_next) {
+ fails++;
+ TEST_FAIL("Delivered msg %i, expected %i\n", msgid, msgid_next);
+ return;
+ }
+
+ msgid_next = msgid + 1;
+ msgcounter--;
+}
+
+/* Produce a batch of messages to a single partition. */
+static void test_single_partition(void) {
+ int partition = 0;
+ int r;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *topic_conf;
+ char msg[128];
+ int msgcnt = test_quick ? 100 : 100000;
+ int failcnt = 0;
+ int i;
+ rd_kafka_message_t *rkmessages;
+
+ msgid_next = 0;
+
+ test_conf_init(&conf, &topic_conf, 20);
+
+ /* Set delivery report callback */
+ rd_kafka_conf_set_dr_cb(conf, dr_single_partition_cb);
+
+ /* Create kafka instance */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ TEST_SAY("test_single_partition: Created kafka instance %s\n",
+ rd_kafka_name(rk));
+
+ rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+
+ /* Create messages */
+ rkmessages = calloc(sizeof(*rkmessages), msgcnt);
+ for (i = 0; i < msgcnt; i++) {
+ int *msgidp = malloc(sizeof(*msgidp));
+ *msgidp = i;
+ rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
+ __FILE__, __FUNCTION__, i);
+
+ rkmessages[i].payload = rd_strdup(msg);
+ rkmessages[i].len = strlen(msg);
+ rkmessages[i]._private = msgidp;
+ rkmessages[i].partition = 2; /* Will be ignored since
+ * RD_KAFKA_MSG_F_PARTITION
+ * is not supplied. */
+ }
+
+ r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
+ rkmessages, msgcnt);
+
+ /* Scan through messages to check for errors. */
+ for (i = 0; i < msgcnt; i++) {
+ if (rkmessages[i].err) {
+ failcnt++;
+ if (failcnt < 100)
+ TEST_SAY("Message #%i failed: %s\n", i,
+ rd_kafka_err2str(rkmessages[i].err));
+ }
+ }
+
+ /* All messages should've been produced. */
+ if (r < msgcnt) {
+ TEST_SAY(
+ "Not all messages were accepted "
+ "by produce_batch(): %i < %i\n",
+ r, msgcnt);
+ if (msgcnt - r != failcnt)
+ TEST_SAY(
+ "Discrepency between failed messages (%i) "
+ "and return value %i (%i - %i)\n",
+ failcnt, msgcnt - r, msgcnt, r);
+ TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
+ }
+
+ free(rkmessages);
+ TEST_SAY(
+ "Single partition: "
+ "Produced %i messages, waiting for deliveries\n",
+ r);
+
+ msgcounter = msgcnt;
+
+ /* Wait for messages to be delivered */
+ test_wait_delivery(rk, &msgcounter);
+
+ if (fails)
+ TEST_FAIL("%i failures, see previous errors", fails);
+
+ if (msgid_next != msgcnt)
+ TEST_FAIL("Still waiting for messages: next %i != end %i\n",
+ msgid_next, msgcnt);
+
+ /* Destroy topic */
+ rd_kafka_topic_destroy(rkt);
+
+ /* Destroy rdkafka instance */
+ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
+ rd_kafka_destroy(rk);
+
+ return;
+}
+
+
+
+/**
+ * Delivery reported callback.
+ * Called for each message once to signal its delivery status.
+ */
+static void dr_partitioner_cb(rd_kafka_t *rk,
+ void *payload,
+ size_t len,
+ rd_kafka_resp_err_t err,
+ void *opaque,
+ void *msg_opaque) {
+ int msgid = *(int *)msg_opaque;
+
+ free(msg_opaque);
+
+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
+ TEST_FAIL("Message delivery failed: %s\n",
+ rd_kafka_err2str(err));
+
+ if (msgcounter <= 0)
+ TEST_FAIL(
+ "Too many message dr_cb callback calls "
+ "(at msgid #%i)\n",
+ msgid);
+ msgcounter--;
+}
+
+/* Produce a batch of messages using random (default) partitioner */
+static void test_partitioner(void) {
+ int partition = RD_KAFKA_PARTITION_UA;
+ int r;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *topic_conf;
+ char msg[128];
+ int msgcnt = test_quick ? 100 : 100000;
+ int failcnt = 0;
+ int i;
+ rd_kafka_message_t *rkmessages;
+
+ test_conf_init(&conf, &topic_conf, 30);
+
+ /* Set delivery report callback */
+ rd_kafka_conf_set_dr_cb(conf, dr_partitioner_cb);
+
+ /* Create kafka instance */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ TEST_SAY("test_partitioner: Created kafka instance %s\n",
+ rd_kafka_name(rk));
+
+ rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+
+ /* Create messages */
+ rkmessages = calloc(sizeof(*rkmessages), msgcnt);
+ for (i = 0; i < msgcnt; i++) {
+ int *msgidp = malloc(sizeof(*msgidp));
+ *msgidp = i;
+ rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
+ __FILE__, __FUNCTION__, i);
+
+ rkmessages[i].payload = rd_strdup(msg);
+ rkmessages[i].len = strlen(msg);
+ rkmessages[i]._private = msgidp;
+ }
+
+ r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
+ rkmessages, msgcnt);
+
+ /* Scan through messages to check for errors. */
+ for (i = 0; i < msgcnt; i++) {
+ if (rkmessages[i].err) {
+ failcnt++;
+ if (failcnt < 100)
+ TEST_SAY("Message #%i failed: %s\n", i,
+ rd_kafka_err2str(rkmessages[i].err));
+ }
+ }
+
+ /* All messages should've been produced. */
+ if (r < msgcnt) {
+ TEST_SAY(
+ "Not all messages were accepted "
+ "by produce_batch(): %i < %i\n",
+ r, msgcnt);
+ if (msgcnt - r != failcnt)
+ TEST_SAY(
+ "Discrepency between failed messages (%i) "
+ "and return value %i (%i - %i)\n",
+ failcnt, msgcnt - r, msgcnt, r);
+ TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
+ }
+
+ free(rkmessages);
+ TEST_SAY(
+ "Partitioner: "
+ "Produced %i messages, waiting for deliveries\n",
+ r);
+
+ msgcounter = msgcnt;
+ /* Wait for messages to be delivered */
+ test_wait_delivery(rk, &msgcounter);
+
+ if (fails)
+ TEST_FAIL("%i failures, see previous errors", fails);
+
+ if (msgcounter != 0)
+ TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter,
+ msgcnt);
+
+ /* Destroy topic */
+ rd_kafka_topic_destroy(rkt);
+
+ /* Destroy rdkafka instance */
+ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
+ rd_kafka_destroy(rk);
+
+ return;
+}
+
+static void dr_per_message_partition_cb(rd_kafka_t *rk,
+ const rd_kafka_message_t *rkmessage,
+ void *opaque) {
+
+ free(rkmessage->_private);
+
+ if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
+ TEST_FAIL("Message delivery failed: %s\n",
+ rd_kafka_err2str(rkmessage->err));
+
+ if (msgcounter <= 0)
+ TEST_FAIL(
+ "Too many message dr_cb callback calls "
+ "(at msg offset #%" PRId64 ")\n",
+ rkmessage->offset);
+
+ TEST_ASSERT(rkmessage->partition < topic_num_partitions);
+ msgcounter--;
+
+ dr_partition_count[rkmessage->partition]++;
+}
+
+/* Produce a batch of messages using with per message partition flag */
+static void test_per_message_partition_flag(void) {
+ int partition = 0;
+ int r;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *topic_conf;
+ char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)];
+ int msgcnt = test_quick ? 100 : 1000;
+ int failcnt = 0;
+ int i;
+ int *rkpartition_counts;
+ rd_kafka_message_t *rkmessages;
+ const char *topic_name;
+
+ test_conf_init(&conf, &topic_conf, 30);
+
+ /* Set delivery report callback */
+ rd_kafka_conf_set_dr_msg_cb(conf, dr_per_message_partition_cb);
+
+ /* Create kafka instance */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ TEST_SAY("test_per_message_partition_flag: Created kafka instance %s\n",
+ rd_kafka_name(rk));
+ topic_name = test_mk_topic_name("0011_per_message_flag", 1);
+ test_create_topic(rk, topic_name, topic_num_partitions, 1);
+
+ rkt = rd_kafka_topic_new(rk, topic_name, topic_conf);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+
+ /* Create messages */
+ rkpartition_counts = calloc(sizeof(int), topic_num_partitions);
+ dr_partition_count = calloc(sizeof(int), topic_num_partitions);
+ rkmessages = calloc(sizeof(*rkmessages), msgcnt);
+ for (i = 0; i < msgcnt; i++) {
+ int *msgidp = malloc(sizeof(*msgidp));
+ *msgidp = i;
+ rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
+ __FILE__, __FUNCTION__, i);
+
+ rkmessages[i].payload = rd_strdup(msg);
+ rkmessages[i].len = strlen(msg);
+ rkmessages[i]._private = msgidp;
+ rkmessages[i].partition = jitter(0, topic_num_partitions - 1);
+ rkpartition_counts[rkmessages[i].partition]++;
+ }
+
+ r = rd_kafka_produce_batch(
+ rkt, partition, RD_KAFKA_MSG_F_PARTITION | RD_KAFKA_MSG_F_FREE,
+ rkmessages, msgcnt);
+
+ /* Scan through messages to check for errors. */
+ for (i = 0; i < msgcnt; i++) {
+ if (rkmessages[i].err) {
+ failcnt++;
+ if (failcnt < 100)
+ TEST_SAY("Message #%i failed: %s\n", i,
+ rd_kafka_err2str(rkmessages[i].err));
+ }
+ }
+
+ /* All messages should've been produced. */
+ if (r < msgcnt) {
+ TEST_SAY(
+ "Not all messages were accepted "
+ "by produce_batch(): %i < %i\n",
+ r, msgcnt);
+ if (msgcnt - r != failcnt)
+ TEST_SAY(
+ "Discrepency between failed messages (%i) "
+ "and return value %i (%i - %i)\n",
+ failcnt, msgcnt - r, msgcnt, r);
+ TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
+ }
+
+ free(rkmessages);
+ TEST_SAY(
+ "Per-message partition: "
+ "Produced %i messages, waiting for deliveries\n",
+ r);
+
+ msgcounter = msgcnt;
+ /* Wait for messages to be delivered */
+ test_wait_delivery(rk, &msgcounter);
+
+ if (msgcounter != 0)
+ TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter,
+ msgcnt);
+
+ for (i = 0; i < topic_num_partitions; i++) {
+ if (dr_partition_count[i] != rkpartition_counts[i]) {
+ TEST_FAIL(
+ "messages were not sent to designated "
+ "partitions expected messages %i in "
+ "partition %i, but only "
+ "%i messages were sent",
+ rkpartition_counts[i], i, dr_partition_count[i]);
+ }
+ }
+
+ free(rkpartition_counts);
+ free(dr_partition_count);
+
+ /* Destroy topic */
+ rd_kafka_topic_destroy(rkt);
+
+ /* Destroy rdkafka instance */
+ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
+ rd_kafka_destroy(rk);
+
+ return;
+}
+
+static void
+dr_partitioner_wo_per_message_flag_cb(rd_kafka_t *rk,
+ const rd_kafka_message_t *rkmessage,
+ void *opaque) {
+ free(rkmessage->_private);
+
+ if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
+ TEST_FAIL("Message delivery failed: %s\n",
+ rd_kafka_err2str(rkmessage->err));
+ if (msgcounter <= 0)
+ TEST_FAIL(
+ "Too many message dr_cb callback calls "
+ "(at msg offset #%" PRId64 ")\n",
+ rkmessage->offset);
+ if (rkmessage->partition != msg_partition_wo_flag)
+ msg_partition_wo_flag_success = 1;
+ msgcounter--;
+}
+
+/**
+ * @brief Produce a batch of messages using partitioner
+ * without per message partition flag
+ */
+static void test_message_partitioner_wo_per_message_flag(void) {
+ int partition = RD_KAFKA_PARTITION_UA;
+ int r;
+ rd_kafka_t *rk;
+ rd_kafka_topic_t *rkt;
+ rd_kafka_conf_t *conf;
+ rd_kafka_topic_conf_t *topic_conf;
+ char msg[128 + sizeof(__FILE__) + sizeof(__FUNCTION__)];
+ int msgcnt = test_quick ? 100 : 1000;
+ int failcnt = 0;
+ int i;
+ rd_kafka_message_t *rkmessages;
+
+ test_conf_init(&conf, &topic_conf, 30);
+
+ /* Set delivery report callback */
+ rd_kafka_conf_set_dr_msg_cb(conf,
+ dr_partitioner_wo_per_message_flag_cb);
+ test_conf_set(conf, "sticky.partitioning.linger.ms", "0");
+
+ /* Create kafka instance */
+ rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
+
+ TEST_SAY("test_partitioner: Created kafka instance %s\n",
+ rd_kafka_name(rk));
+
+ rkt = rd_kafka_topic_new(rk, test_mk_topic_name("0011", 0), topic_conf);
+ if (!rkt)
+ TEST_FAIL("Failed to create topic: %s\n", rd_strerror(errno));
+
+ /* Create messages */
+ rkmessages = calloc(sizeof(*rkmessages), msgcnt);
+ for (i = 0; i < msgcnt; i++) {
+ int *msgidp = malloc(sizeof(*msgidp));
+ *msgidp = i;
+ rd_snprintf(msg, sizeof(msg), "%s:%s test message #%i",
+ __FILE__, __FUNCTION__, i);
+
+ rkmessages[i].payload = rd_strdup(msg);
+ rkmessages[i].len = strlen(msg);
+ rkmessages[i]._private = msgidp;
+ rkmessages[i].partition = msg_partition_wo_flag;
+ }
+
+ r = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_FREE,
+ rkmessages, msgcnt);
+
+ /* Scan through messages to check for errors. */
+ for (i = 0; i < msgcnt; i++) {
+ if (rkmessages[i].err) {
+ failcnt++;
+ if (failcnt < 100)
+ TEST_SAY("Message #%i failed: %s\n", i,
+ rd_kafka_err2str(rkmessages[i].err));
+ }
+ }
+
+ /* All messages should've been produced. */
+ if (r < msgcnt) {
+ TEST_SAY(
+ "Not all messages were accepted "
+ "by produce_batch(): %i < %i\n",
+ r, msgcnt);
+ if (msgcnt - r != failcnt)
+ TEST_SAY(
+ "Discrepency between failed messages (%i) "
+ "and return value %i (%i - %i)\n",
+ failcnt, msgcnt - r, msgcnt, r);
+ TEST_FAIL("%i/%i messages failed\n", msgcnt - r, msgcnt);
+ }
+
+ free(rkmessages);
+ TEST_SAY(
+ "Partitioner: "
+ "Produced %i messages, waiting for deliveries\n",
+ r);
+
+ msgcounter = msgcnt;
+ /* Wait for messages to be delivered */
+ test_wait_delivery(rk, &msgcounter);
+
+ if (fails)
+ TEST_FAIL("%i failures, see previous errors", fails);
+
+ if (msgcounter != 0)
+ TEST_FAIL("Still waiting for %i/%i messages\n", msgcounter,
+ msgcnt);
+ if (msg_partition_wo_flag_success == 0) {
+ TEST_FAIL(
+ "partitioner was not used, all messages were sent to "
+ "message specified partition %i",
+ i);
+ }
+
+ /* Destroy topic */
+ rd_kafka_topic_destroy(rkt);
+
+ /* Destroy rdkafka instance */
+ TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
+ rd_kafka_destroy(rk);
+
+ return;
+}
+
+
+int main_0011_produce_batch(int argc, char **argv) {
+ test_message_partitioner_wo_per_message_flag();
+ test_single_partition();
+ test_partitioner();
+ if (test_can_create_topics(1))
+ test_per_message_partition_flag();
+ return 0;
+}