diff options
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0076-produce_retry.c')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0076-produce_retry.c | 350 |
1 files changed, 350 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0076-produce_retry.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0076-produce_retry.c new file mode 100644 index 000000000..16d6f602c --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0076-produce_retry.c @@ -0,0 +1,350 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" + +#include <stdarg.h> +#include <errno.h> + +static int +is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) { + /* Ignore connectivity errors since we'll be bringing down + * .. connectivity. + * SASL auther will think a connection-down even in the auth + * state means the broker doesn't support SASL PLAIN. */ + TEST_SAY("is_fatal?: %s: %s\n", rd_kafka_err2str(err), reason); + if (err == RD_KAFKA_RESP_ERR__TRANSPORT || + err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN || + err == RD_KAFKA_RESP_ERR__AUTHENTICATION || + err == RD_KAFKA_RESP_ERR__TIMED_OUT) + return 0; + return 1; +} + + +#if WITH_SOCKEM +/** + * Producer message retry testing + */ + +/* Hang on to the first broker socket we see in connect_cb, + * reject all the rest (connection refused) to make sure we're only + * playing with one single broker for this test. */ + +#include "sockem_ctrl.h" + + +/** + * @brief Test produce retries. + * + * @param should_fail If true, do negative testing which should fail. + */ +static void do_test_produce_retries(const char *topic, + int idempotence, + int try_fail, + int should_fail) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + uint64_t testid; + rd_kafka_resp_err_t err; + int msgcnt = 1; + sockem_ctrl_t ctrl; + + TEST_SAY(_C_BLU + "Test produce retries " + "(idempotence=%d,try_fail=%d,should_fail=%d)\n", + idempotence, try_fail, should_fail); + + testid = test_id_generate(); + + test_conf_init(&conf, NULL, 60); + + if (should_fail && + !strcmp(test_conf_get(conf, "enable.sparse.connections"), "true")) { + rd_kafka_conf_destroy(conf); + TEST_SAY(_C_YEL + "Sparse connections enabled: " + "skipping connection-timing related test\n"); + return; + } + + sockem_ctrl_init(&ctrl); + + test_conf_set(conf, "socket.timeout.ms", "1000"); + /* Avoid disconnects on request timeouts */ + test_conf_set(conf, "socket.max.fails", "100"); + test_conf_set(conf, "enable.idempotence", + idempotence ? "true" : "false"); + test_curr->exp_dr_err = RD_KAFKA_RESP_ERR_NO_ERROR; + test_curr->exp_dr_status = RD_KAFKA_MSG_STATUS_PERSISTED; + if (!try_fail) { + test_conf_set(conf, "retries", "5"); + } else { + /* enable.idempotence=true request retries >= 1 which + * makes the test pass. Adjust expected error accordingly. */ + if (idempotence) + test_conf_set(conf, "retries", "5"); + else + test_conf_set(conf, "retries", "0"); + if (should_fail) { + test_curr->exp_dr_err = + RD_KAFKA_RESP_ERR__MSG_TIMED_OUT; + test_curr->exp_dr_status = + RD_KAFKA_MSG_STATUS_POSSIBLY_PERSISTED; + } + } + test_conf_set(conf, "retry.backoff.ms", "5000"); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + test_socket_enable(conf); + test_curr->is_fatal_cb = is_fatal_cb; + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(rk, topic, NULL); + + /* Create the topic to make sure connections are up and ready. */ + err = test_auto_create_topic_rkt(rk, rkt, tmout_multip(5000)); + TEST_ASSERT(!err, "topic creation failed: %s", rd_kafka_err2str(err)); + + /* Set initial delay to 3s */ + sockem_ctrl_set_delay(&ctrl, 0, 3000); /* Takes effect immediately */ + + /* After two retries, remove the delay, the third retry + * should kick in and work. */ + sockem_ctrl_set_delay( + &ctrl, + ((1000 /*socket.timeout.ms*/ + 5000 /*retry.backoff.ms*/) * 2) - + 2000, + 0); + + test_produce_msgs(rk, rkt, testid, RD_KAFKA_PARTITION_UA, 0, msgcnt, + NULL, 0); + + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + + if (!should_fail) { + TEST_SAY("Verifying messages with consumer\n"); + test_consume_msgs_easy(NULL, topic, testid, -1, msgcnt, NULL); + } + + sockem_ctrl_term(&ctrl); + + TEST_SAY(_C_GRN + "Test produce retries " + "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n", + idempotence, try_fail, should_fail); +} +#endif + + + +/** + * @brief Simple on_request_sent interceptor that simply disconnects + * the socket when first ProduceRequest is seen. + * Sub-sequent ProduceRequests will not trigger a disconnect, to allow + * for retries. + */ +static mtx_t produce_disconnect_lock; +static int produce_disconnects = 0; +static rd_kafka_resp_err_t on_request_sent(rd_kafka_t *rk, + int sockfd, + const char *brokername, + int32_t brokerid, + int16_t ApiKey, + int16_t ApiVersion, + int32_t CorrId, + size_t size, + void *ic_opaque) { + + /* Ignore if not a ProduceRequest */ + if (ApiKey != 0) + return RD_KAFKA_RESP_ERR_NO_ERROR; + + mtx_lock(&produce_disconnect_lock); + if (produce_disconnects == 0) { + char buf[512]; + ssize_t r; + printf(_C_CYA "%s:%d: shutting down socket %d (%s)\n" _C_CLR, + __FILE__, __LINE__, sockfd, brokername); +#ifdef _WIN32 + closesocket(sockfd); +#else + close(sockfd); +#endif + /* There is a chance the broker responded in the + * time it took us to get here, so purge the + * socket recv buffer to make sure librdkafka does not see + * the response. */ + while ((r = recv(sockfd, buf, sizeof(buf), 0)) > 0) + printf(_C_CYA + "%s:%d: " + "purged %" PRIdsz " bytes from socket\n", + __FILE__, __LINE__, r); + produce_disconnects = 1; + } + mtx_unlock(&produce_disconnect_lock); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + + +static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk, + const rd_kafka_conf_t *conf, + void *ic_opaque, + char *errstr, + size_t errstr_size) { + return rd_kafka_interceptor_add_on_request_sent( + rk, "disconnect_on_send", on_request_sent, NULL); +} + +/** + * @brief Test produce retries by disconnecting right after ProduceRequest + * has been sent. + * + * @param should_fail If true, do negative testing which should fail. + */ +static void do_test_produce_retries_disconnect(const char *topic, + int idempotence, + int try_fail, + int should_fail) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_topic_t *rkt; + uint64_t testid; + rd_kafka_resp_err_t err; + int msgcnt = 1; + int partition_cnt; + + TEST_SAY(_C_BLU + "Test produce retries by disconnect " + "(idempotence=%d,try_fail=%d,should_fail=%d)\n", + idempotence, try_fail, should_fail); + + test_curr->is_fatal_cb = is_fatal_cb; + + testid = test_id_generate(); + + test_conf_init(&conf, NULL, 60); + rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb); + test_conf_set(conf, "socket.timeout.ms", test_quick ? "3000" : "10000"); + test_conf_set(conf, "message.timeout.ms", + test_quick ? "9000" : "30000"); + test_conf_set(conf, "enable.idempotence", + idempotence ? "true" : "false"); + if (!try_fail) { + test_conf_set(conf, "retries", "1"); + } else { + /* enable.idempotence=true request retries >= 1 which + * makes the test pass. */ + if (!idempotence) + test_conf_set(conf, "retries", "0"); + } + + mtx_init(&produce_disconnect_lock, mtx_plain); + produce_disconnects = 0; + + rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer", + on_new_producer, NULL); + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + rkt = test_create_producer_topic(rk, topic, NULL); + + err = test_produce_sync(rk, rkt, testid, 0); + + if (should_fail) { + if (!err) + TEST_FAIL("Expected produce to fail\n"); + else + TEST_SAY("Produced message failed as expected: %s\n", + rd_kafka_err2str(err)); + } else { + if (err) + TEST_FAIL("Produced message failed: %s\n", + rd_kafka_err2str(err)); + else + TEST_SAY("Produced message delivered\n"); + } + + mtx_lock(&produce_disconnect_lock); + TEST_ASSERT(produce_disconnects == 1, "expected %d disconnects, not %d", + 1, produce_disconnects); + mtx_unlock(&produce_disconnect_lock); + + + partition_cnt = test_get_partition_count(rk, topic, tmout_multip(5000)); + + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(rk); + + TEST_SAY("Verifying messages with consumer\n"); + test_consume_msgs_easy(NULL, topic, testid, partition_cnt, + /* Since we don't know the number of + * messages that got thru on the socket + * before disconnect we can't let the + * expected message count be 0 in case of + * should_fail, so instead ignore the message + * count (-1). */ + should_fail ? -1 : msgcnt, NULL); + + TEST_SAY(_C_GRN + "Test produce retries by disconnect " + "(idempotence=%d,try_fail=%d,should_fail=%d): PASS\n", + idempotence, try_fail, should_fail); +} + + +int main_0076_produce_retry(int argc, char **argv) { + const char *topic = test_mk_topic_name("0076_produce_retry", 1); + const rd_bool_t has_idempotence = + test_broker_version >= TEST_BRKVER(0, 11, 0, 0); + +#if WITH_SOCKEM + if (has_idempotence) { + /* Idempotence, no try fail, should succeed. */ + do_test_produce_retries(topic, 1, 0, 0); + /* Idempotence, try fail, should succeed. */ + do_test_produce_retries(topic, 1, 1, 0); + } + /* No idempotence, try fail, should fail. */ + do_test_produce_retries(topic, 0, 1, 1); +#endif + + if (has_idempotence) { + /* Idempotence, no try fail, should succeed. */ + do_test_produce_retries_disconnect(topic, 1, 0, 0); + /* Idempotence, try fail, should succeed. */ + do_test_produce_retries_disconnect(topic, 1, 1, 0); + } + /* No idempotence, try fail, should fail. */ + do_test_produce_retries_disconnect(topic, 0, 1, 1); + + return 0; +} |