From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/0075-retry.c | 252 +++++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c new file mode 100644 index 000000000..7e1e4f0f5 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c @@ -0,0 +1,252 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +#if WITH_SOCKEM +#include "rdkafka.h" + +#include +#include + +/** + * Request retry testing + */ + +/* Hang on to the first broker socket we see in connect_cb, + * reject all the rest (connection refused) to make sure we're only + * playing with one single broker for this test. */ +static struct { + mtx_t lock; + cnd_t cnd; + sockem_t *skm; + thrd_t thrd; + struct { + int64_t ts_at; /* to ctrl thread: at this time, set delay */ + int delay; + int ack; /* from ctrl thread: new delay acked */ + } cmd; + struct { + int64_t ts_at; /* to ctrl thread: at this time, set delay */ + int delay; + + } next; + int term; +} ctrl; + +static int ctrl_thrd_main(void *arg) { + + + mtx_lock(&ctrl.lock); + while (!ctrl.term) { + int64_t now; + + cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 10); + + if (ctrl.cmd.ts_at) { + ctrl.next.ts_at = ctrl.cmd.ts_at; + ctrl.next.delay = ctrl.cmd.delay; + ctrl.cmd.ts_at = 0; + ctrl.cmd.ack = 1; + printf(_C_CYA + "## %s: sockem: " + "receieved command to set delay " + "to %d in %dms\n" _C_CLR, + __FILE__, ctrl.next.delay, + (int)(ctrl.next.ts_at - test_clock()) / 1000); + } + + now = test_clock(); + if (ctrl.next.ts_at && now > ctrl.next.ts_at) { + assert(ctrl.skm); + printf(_C_CYA + "## %s: " + "sockem: setting socket delay to %d\n" _C_CLR, + __FILE__, ctrl.next.delay); + sockem_set(ctrl.skm, "delay", ctrl.next.delay, NULL); + ctrl.next.ts_at = 0; + cnd_signal(&ctrl.cnd); /* signal back to caller */ + } + } + mtx_unlock(&ctrl.lock); + + return 0; +} + + +/** + * @brief Sockem connect, called from **internal librdkafka thread** through + * librdkafka's connect_cb + */ +static int connect_cb(struct test *test, sockem_t *skm, const char *id) { + + mtx_lock(&ctrl.lock); + if (ctrl.skm) { + /* Reject all but the first connect */ + mtx_unlock(&ctrl.lock); + return ECONNREFUSED; + } + + ctrl.skm = skm; + + /* signal wakeup to main thread */ + cnd_broadcast(&ctrl.cnd); + mtx_unlock(&ctrl.lock); + + return 0; +} + +static int +is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) { + /* Ignore connectivity errors since we'll be bringing down + * .. connectivity. + * SASL auther will think a connection-down even in the auth + * state means the broker doesn't support SASL PLAIN. */ + TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason); + if (err == RD_KAFKA_RESP_ERR__TRANSPORT || + err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN || + err == RD_KAFKA_RESP_ERR__AUTHENTICATION || + err == RD_KAFKA_RESP_ERR__TIMED_OUT) + return 0; + return 1; +} + +/** + * @brief Set socket delay to kick in after \p after ms + */ +static void set_delay(int after, int delay) { + TEST_SAY("Set delay to %dms (after %dms)\n", delay, after); + + mtx_lock(&ctrl.lock); + ctrl.cmd.ts_at = test_clock() + (after * 1000); + ctrl.cmd.delay = delay; + ctrl.cmd.ack = 0; + cnd_broadcast(&ctrl.cnd); + + /* Wait for ack from sockem thread */ + while (!ctrl.cmd.ack) { + TEST_SAY("Waiting for sockem control ack\n"); + cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 1000); + } + mtx_unlock(&ctrl.lock); +} + +/** + * @brief Test that Metadata requests are retried properly when + * timing out due to high broker rtt. + */ +static void do_test_low_socket_timeout(const char *topic) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + rd_kafka_resp_err_t err; + const struct rd_kafka_metadata *md; + int res; + + mtx_init(&ctrl.lock, mtx_plain); + cnd_init(&ctrl.cnd); + + TEST_SAY("Test Metadata request retries on timeout\n"); + + test_conf_init(&conf, NULL, 60); + test_conf_set(conf, "socket.timeout.ms", "1000"); + test_conf_set(conf, "socket.max.fails", "12345"); + test_conf_set(conf, "retry.backoff.ms", "5000"); + /* Avoid api version requests (with their own timeout) to get in + * the way of our test */ + test_conf_set(conf, "api.version.request", "false"); + test_socket_enable(conf); + test_curr->connect_cb = connect_cb; + test_curr->is_fatal_cb = is_fatal_cb; + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(rk, topic, NULL); + + TEST_SAY("Waiting for sockem connect..\n"); + mtx_lock(&ctrl.lock); + while (!ctrl.skm) + cnd_wait(&ctrl.cnd, &ctrl.lock); + mtx_unlock(&ctrl.lock); + + TEST_SAY( + "Connected, fire off a undelayed metadata() to " + "make sure connection is up\n"); + + err = rd_kafka_metadata(rk, 0, rkt, &md, tmout_multip(2000)); + TEST_ASSERT(!err, "metadata(undelayed) failed: %s", + rd_kafka_err2str(err)); + rd_kafka_metadata_destroy(md); + + if (thrd_create(&ctrl.thrd, ctrl_thrd_main, NULL) != thrd_success) + TEST_FAIL("Failed to create sockem ctrl thread"); + + set_delay(0, 3000); /* Takes effect immediately */ + + /* After two retries, remove the delay, the third retry + * should kick in and work. */ + set_delay( + ((1000 /*socket.timeout.ms*/ + 5000 /*retry.backoff.ms*/) * 2) - + 2000, + 0); + + TEST_SAY( + "Calling metadata() again which should succeed after " + "3 internal retries\n"); + /* Metadata should be returned after the third retry */ + err = rd_kafka_metadata( + rk, 0, rkt, &md, + ((1000 /*socket.timeout.ms*/ + 5000 /*retry.backoff.ms*/) * 2) + + 5000); + TEST_SAY("metadata() returned %s\n", rd_kafka_err2str(err)); + TEST_ASSERT(!err, "metadata(undelayed) failed: %s", + rd_kafka_err2str(err)); + rd_kafka_metadata_destroy(md); + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + + /* Join controller thread */ + mtx_lock(&ctrl.lock); + ctrl.term = 1; + mtx_unlock(&ctrl.lock); + thrd_join(ctrl.thrd, &res); + + cnd_destroy(&ctrl.cnd); + mtx_destroy(&ctrl.lock); +} + +int main_0075_retry(int argc, char **argv) { + const char *topic = test_mk_topic_name("0075_retry", 1); + + do_test_low_socket_timeout(topic); + + return 0; +} + + +#endif -- cgit v1.2.3