summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c252
1 files changed, 0 insertions, 252 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c
deleted file mode 100644
index 7e1e4f0f5..000000000
--- a/fluent-bit/lib/librdkafka-2.1.0/tests/0075-retry.c
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * librdkafka - Apache Kafka C library
- *
- * Copyright (c) 2012-2015, Magnus Edenhill
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met:
- *
- * 1. Redistributions of source code must retain the above copyright notice,
- * this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright notice,
- * this list of conditions and the following disclaimer in the documentation
- * and/or other materials provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- */
-
-#include "test.h"
-
-#if WITH_SOCKEM
-#include "rdkafka.h"
-
-#include <stdarg.h>
-#include <errno.h>
-
-/**
- * Request retry testing
- */
-
-/* Hang on to the first broker socket we see in connect_cb,
- * reject all the rest (connection refused) to make sure we're only
- * playing with one single broker for this test. */
-static struct {
- mtx_t lock;
- cnd_t cnd;
- sockem_t *skm;
- thrd_t thrd;
- struct {
- int64_t ts_at; /* to ctrl thread: at this time, set delay */
- int delay;
- int ack; /* from ctrl thread: new delay acked */
- } cmd;
- struct {
- int64_t ts_at; /* to ctrl thread: at this time, set delay */
- int delay;
-
- } next;
- int term;
-} ctrl;
-
-static int ctrl_thrd_main(void *arg) {
-
-
- mtx_lock(&ctrl.lock);
- while (!ctrl.term) {
- int64_t now;
-
- cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 10);
-
- if (ctrl.cmd.ts_at) {
- ctrl.next.ts_at = ctrl.cmd.ts_at;
- ctrl.next.delay = ctrl.cmd.delay;
- ctrl.cmd.ts_at = 0;
- ctrl.cmd.ack = 1;
- printf(_C_CYA
- "## %s: sockem: "
- "receieved command to set delay "
- "to %d in %dms\n" _C_CLR,
- __FILE__, ctrl.next.delay,
- (int)(ctrl.next.ts_at - test_clock()) / 1000);
- }
-
- now = test_clock();
- if (ctrl.next.ts_at && now > ctrl.next.ts_at) {
- assert(ctrl.skm);
- printf(_C_CYA
- "## %s: "
- "sockem: setting socket delay to %d\n" _C_CLR,
- __FILE__, ctrl.next.delay);
- sockem_set(ctrl.skm, "delay", ctrl.next.delay, NULL);
- ctrl.next.ts_at = 0;
- cnd_signal(&ctrl.cnd); /* signal back to caller */
- }
- }
- mtx_unlock(&ctrl.lock);
-
- return 0;
-}
-
-
-/**
- * @brief Sockem connect, called from **internal librdkafka thread** through
- * librdkafka's connect_cb
- */
-static int connect_cb(struct test *test, sockem_t *skm, const char *id) {
-
- mtx_lock(&ctrl.lock);
- if (ctrl.skm) {
- /* Reject all but the first connect */
- mtx_unlock(&ctrl.lock);
- return ECONNREFUSED;
- }
-
- ctrl.skm = skm;
-
- /* signal wakeup to main thread */
- cnd_broadcast(&ctrl.cnd);
- mtx_unlock(&ctrl.lock);
-
- return 0;
-}
-
-static int
-is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
- /* Ignore connectivity errors since we'll be bringing down
- * .. connectivity.
- * SASL auther will think a connection-down even in the auth
- * state means the broker doesn't support SASL PLAIN. */
- TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason);
- if (err == RD_KAFKA_RESP_ERR__TRANSPORT ||
- err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN ||
- err == RD_KAFKA_RESP_ERR__AUTHENTICATION ||
- err == RD_KAFKA_RESP_ERR__TIMED_OUT)
- return 0;
- return 1;
-}
-
-/**
- * @brief Set socket delay to kick in after \p after ms
- */
-static void set_delay(int after, int delay) {
- TEST_SAY("Set delay to %dms (after %dms)\n", delay, after);
-
- mtx_lock(&ctrl.lock);
- ctrl.cmd.ts_at = test_clock() + (after * 1000);
- ctrl.cmd.delay = delay;
- ctrl.cmd.ack = 0;
- cnd_broadcast(&ctrl.cnd);
-
- /* Wait for ack from sockem thread */
- while (!ctrl.cmd.ack) {
- TEST_SAY("Waiting for sockem control ack\n");
- cnd_timedwait_ms(&ctrl.cnd, &ctrl.lock, 1000);
- }
- mtx_unlock(&ctrl.lock);
-}
-
-/**
- * @brief Test that Metadata requests are retried properly when
- * timing out due to high broker rtt.
- */
-static void do_test_low_socket_timeout(const char *topic) {
- rd_kafka_t *rk;
- rd_kafka_conf_t *conf;
- rd_kafka_topic_t *rkt;
- rd_kafka_resp_err_t err;
- const struct rd_kafka_metadata *md;
- int res;
-
- mtx_init(&ctrl.lock, mtx_plain);
- cnd_init(&ctrl.cnd);
-
- TEST_SAY("Test Metadata request retries on timeout\n");
-
- test_conf_init(&conf, NULL, 60);
- test_conf_set(conf, "socket.timeout.ms", "1000");
- test_conf_set(conf, "socket.max.fails", "12345");
- test_conf_set(conf, "retry.backoff.ms", "5000");
- /* Avoid api version requests (with their own timeout) to get in
- * the way of our test */
- test_conf_set(conf, "api.version.request", "false");
- test_socket_enable(conf);
- test_curr->connect_cb = connect_cb;
- test_curr->is_fatal_cb = is_fatal_cb;
-
- rk = test_create_handle(RD_KAFKA_PRODUCER, conf);
- rkt = test_create_producer_topic(rk, topic, NULL);
-
- TEST_SAY("Waiting for sockem connect..\n");
- mtx_lock(&ctrl.lock);
- while (!ctrl.skm)
- cnd_wait(&ctrl.cnd, &ctrl.lock);
- mtx_unlock(&ctrl.lock);
-
- TEST_SAY(
- "Connected, fire off a undelayed metadata() to "
- "make sure connection is up\n");
-
- err = rd_kafka_metadata(rk, 0, rkt, &md, tmout_multip(2000));
- TEST_ASSERT(!err, "metadata(undelayed) failed: %s",
- rd_kafka_err2str(err));
- rd_kafka_metadata_destroy(md);
-
- if (thrd_create(&ctrl.thrd, ctrl_thrd_main, NULL) != thrd_success)
- TEST_FAIL("Failed to create sockem ctrl thread");
-
- set_delay(0, 3000); /* Takes effect immediately */
-
- /* After two retries, remove the delay, the third retry
- * should kick in and work. */
- set_delay(
- ((1000 /*socket.timeout.ms*/ + 5000 /*retry.backoff.ms*/) * 2) -
- 2000,
- 0);
-
- TEST_SAY(
- "Calling metadata() again which should succeed after "
- "3 internal retries\n");
- /* Metadata should be returned after the third retry */
- err = rd_kafka_metadata(
- rk, 0, rkt, &md,
- ((1000 /*socket.timeout.ms*/ + 5000 /*retry.backoff.ms*/) * 2) +
- 5000);
- TEST_SAY("metadata() returned %s\n", rd_kafka_err2str(err));
- TEST_ASSERT(!err, "metadata(undelayed) failed: %s",
- rd_kafka_err2str(err));
- rd_kafka_metadata_destroy(md);
-
- rd_kafka_topic_destroy(rkt);
- rd_kafka_destroy(rk);
-
- /* Join controller thread */
- mtx_lock(&ctrl.lock);
- ctrl.term = 1;
- mtx_unlock(&ctrl.lock);
- thrd_join(ctrl.thrd, &res);
-
- cnd_destroy(&ctrl.cnd);
- mtx_destroy(&ctrl.lock);
-}
-
-int main_0075_retry(int argc, char **argv) {
- const char *topic = test_mk_topic_name("0075_retry", 1);
-
- do_test_low_socket_timeout(topic);
-
- return 0;
-}
-
-
-#endif