summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c259
1 files changed, 259 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c
new file mode 100644
index 000000000..1f91e2a84
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0107-topic_recreate.c
@@ -0,0 +1,259 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#include "../src/rdkafka_proto.h"
+
+
+/**
+ * @name Verify that producer and consumer resumes operation after
+ * a topic has been deleted and recreated.
+ */
+
+/**
+ * The message value to produce, one of:
+ * "before" - before topic deletion
+ * "during" - during topic deletion
+ * "after" - after topic has been re-created
+ * "end" - stop producing
+ */
+static mtx_t value_mtx;
+static char *value;
+
+static const int msg_rate = 10; /**< Messages produced per second */
+
+static struct test *this_test; /**< Exposes current test struct (in TLS) to
+ * producer thread. */
+
+
+/**
+ * @brief Treat all error_cb as non-test-fatal.
+ */
+static int
+is_error_fatal(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
+ return rd_false;
+}
+
+/**
+ * @brief Producing thread
+ */
+static int run_producer(void *arg) {
+ const char *topic = arg;
+ rd_kafka_t *producer = test_create_producer();
+ int ret = 0;
+
+ test_curr = this_test;
+
+ /* Don't check message status */
+ test_curr->exp_dr_status = (rd_kafka_msg_status_t)-1;
+
+ while (1) {
+ rd_kafka_resp_err_t err;
+
+ mtx_lock(&value_mtx);
+ if (!strcmp(value, "end")) {
+ mtx_unlock(&value_mtx);
+ break;
+ } else if (strcmp(value, "before")) {
+ /* Ignore Delivery report errors after topic
+ * has been deleted and eventually re-created,
+ * we rely on the consumer to verify that
+ * messages are produced. */
+ test_curr->ignore_dr_err = rd_true;
+ }
+
+ err = rd_kafka_producev(
+ producer, RD_KAFKA_V_TOPIC(topic),
+ RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
+ RD_KAFKA_V_VALUE(value, strlen(value)), RD_KAFKA_V_END);
+
+ if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART ||
+ err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
+ TEST_SAY("Produce failed (expectedly): %s\n",
+ rd_kafka_err2name(err));
+ else
+ TEST_ASSERT(!err, "producev() failed: %s",
+ rd_kafka_err2name(err));
+
+ mtx_unlock(&value_mtx);
+
+ rd_usleep(1000000 / msg_rate, NULL);
+
+ rd_kafka_poll(producer, 0);
+ }
+
+ if (rd_kafka_flush(producer, 5000)) {
+ TEST_WARN("Failed to flush all message(s), %d remain\n",
+ rd_kafka_outq_len(producer));
+ /* Purge the messages to see which partition they were for */
+ rd_kafka_purge(producer, RD_KAFKA_PURGE_F_QUEUE |
+ RD_KAFKA_PURGE_F_INFLIGHT);
+ rd_kafka_flush(producer, 5000);
+ TEST_SAY("%d message(s) in queue after purge\n",
+ rd_kafka_outq_len(producer));
+
+ ret = 1; /* Fail test from main thread */
+ }
+
+ rd_kafka_destroy(producer);
+
+ return ret;
+}
+
+
+/**
+ * @brief Expect at least \p cnt messages with value matching \p exp_value,
+ * else fail the current test.
+ */
+static void
+expect_messages(rd_kafka_t *consumer, int cnt, const char *exp_value) {
+ int match_cnt = 0, other_cnt = 0, err_cnt = 0;
+ size_t exp_len = strlen(exp_value);
+
+ TEST_SAY("Expecting >= %d messages with value \"%s\"...\n", cnt,
+ exp_value);
+
+ while (match_cnt < cnt) {
+ rd_kafka_message_t *rkmessage;
+
+ rkmessage = rd_kafka_consumer_poll(consumer, 1000);
+ if (!rkmessage)
+ continue;
+
+ if (rkmessage->err) {
+ TEST_SAY("Consume error: %s\n",
+ rd_kafka_message_errstr(rkmessage));
+ err_cnt++;
+ } else if (rkmessage->len == exp_len &&
+ !memcmp(rkmessage->payload, exp_value, exp_len)) {
+ match_cnt++;
+ } else {
+ TEST_SAYL(3,
+ "Received \"%.*s\", expected \"%s\": "
+ "ignored\n",
+ (int)rkmessage->len,
+ (const char *)rkmessage->payload, exp_value);
+ other_cnt++;
+ }
+
+ rd_kafka_message_destroy(rkmessage);
+ }
+
+ TEST_SAY(
+ "Consumed %d messages matching \"%s\", "
+ "ignored %d others, saw %d error(s)\n",
+ match_cnt, exp_value, other_cnt, err_cnt);
+}
+
+
+/**
+ * @brief Test topic create + delete + create with first topic having
+ * \p part_cnt_1 partitions and second topic having \p part_cnt_2 .
+ */
+static void do_test_create_delete_create(int part_cnt_1, int part_cnt_2) {
+ rd_kafka_t *consumer;
+ thrd_t producer_thread;
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ int ret = 0;
+
+ TEST_SAY(_C_MAG
+ "[ Test topic create(%d parts)+delete+create(%d parts) ]\n",
+ part_cnt_1, part_cnt_2);
+
+ consumer = test_create_consumer(topic, NULL, NULL, NULL);
+
+ /* Create topic */
+ test_create_topic(consumer, topic, part_cnt_1, 3);
+
+ /* Start consumer */
+ test_consumer_subscribe(consumer, topic);
+ test_consumer_wait_assignment(consumer, rd_true);
+
+ mtx_lock(&value_mtx);
+ value = "before";
+ mtx_unlock(&value_mtx);
+
+ /* Create producer thread */
+ if (thrd_create(&producer_thread, run_producer, (void *)topic) !=
+ thrd_success)
+ TEST_FAIL("thrd_create failed");
+
+ /* Consume messages for 5s */
+ expect_messages(consumer, msg_rate * 5, value);
+
+ /* Delete topic */
+ mtx_lock(&value_mtx);
+ value = "during";
+ mtx_unlock(&value_mtx);
+
+ test_delete_topic(consumer, topic);
+ rd_sleep(5);
+
+ /* Re-create topic */
+ test_create_topic(consumer, topic, part_cnt_2, 3);
+
+ mtx_lock(&value_mtx);
+ value = "after";
+ mtx_unlock(&value_mtx);
+
+ /* Consume for 5 more seconds, should see new messages */
+ expect_messages(consumer, msg_rate * 5, value);
+
+ rd_kafka_destroy(consumer);
+
+ /* Wait for producer to exit */
+ mtx_lock(&value_mtx);
+ value = "end";
+ mtx_unlock(&value_mtx);
+
+ if (thrd_join(producer_thread, &ret) != thrd_success || ret != 0)
+ TEST_FAIL("Producer failed: see previous errors");
+
+ TEST_SAY(_C_GRN
+ "[ Test topic create(%d parts)+delete+create(%d parts): "
+ "PASS ]\n",
+ part_cnt_1, part_cnt_2);
+}
+
+
+int main_0107_topic_recreate(int argc, char **argv) {
+ this_test = test_curr; /* Need to expose current test struct (in TLS)
+ * to producer thread. */
+
+ this_test->is_fatal_cb = is_error_fatal;
+
+ mtx_init(&value_mtx, mtx_plain);
+
+ test_conf_init(NULL, NULL, 60);
+
+ do_test_create_delete_create(10, 3);
+ do_test_create_delete_create(3, 6);
+
+ return 0;
+}