From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/0086-purge.c | 334 +++++++++++++++++++++ 1 file changed, 334 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c new file mode 100644 index 000000000..4dbf937f3 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c @@ -0,0 +1,334 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "../src/rdkafka_protocol.h" + +/** + * @name Test rd_kafka_purge() + * + * Local test: + * - produce 29 messages (that will be held up in queues), + * for specific partitions and UA. + * - purge(INFLIGHT) => no change in len() + * - purge(QUEUE) => len() should drop to 0, dr errs should be ERR__PURGE_QUEUE + * + * Remote test (WITH_SOCKEM): + * - Limit in-flight messages to 10 + * - Produce 20 messages to the same partition, in batches of 10. + * - First batch succeeds, then sets a 50 s delay + * - Second batch times out in flight + * - Third batch isn't completed an times out in queue + * - purge(QUEUE) => len should drop to 10, dr err ERR__PURGE_QUEUE + * - purge(INFLIGHT|QUEUE) => len should drop to 0, ERR__PURGE_INFLIGHT + */ + + +static const int msgcnt = 29; +struct waitmsgs { + rd_kafka_resp_err_t exp_err[29]; + int cnt; +}; + +static mtx_t produce_req_lock; +static cnd_t produce_req_cnd; +static int produce_req_cnt = 0; + + +#if WITH_SOCKEM + +int test_sockfd = 0; + +static rd_kafka_resp_err_t on_request_sent(rd_kafka_t *rk, + int sockfd, + const char *brokername, + int32_t brokerid, + int16_t ApiKey, + int16_t ApiVersion, + int32_t CorrId, + size_t size, + void *ic_opaque) { + + /* Save socket fd to limit ProduceRequest */ + if (ApiKey == RD_KAFKAP_ApiVersion) { + test_sockfd = sockfd; + return RD_KAFKA_RESP_ERR_NO_ERROR; + } + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +static rd_kafka_resp_err_t on_response_received(rd_kafka_t *rk, + int sockfd, + const char *brokername, + int32_t brokerid, + int16_t ApiKey, + int16_t ApiVersion, + int32_t CorrId, + size_t size, + int64_t rtt, + rd_kafka_resp_err_t err, + void *ic_opaque) { + /* Add delay to send fd after first batch is received */ + if (ApiKey == RD_KAFKAP_Produce) { + mtx_lock(&produce_req_lock); + produce_req_cnt++; + cnd_broadcast(&produce_req_cnd); + mtx_unlock(&produce_req_lock); + test_socket_sockem_set(test_sockfd, "delay", 50000); + } + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + +static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk, + const rd_kafka_conf_t *conf, + void *ic_opaque, + char *errstr, + size_t errstr_size) { + rd_kafka_resp_err_t err; + err = rd_kafka_interceptor_add_on_request_sent(rk, "catch_producer_req", + on_request_sent, NULL); + if (!err) { + rd_kafka_interceptor_add_on_response_received( + rk, "catch_api_version_resp", on_response_received, NULL); + } + return err; +} +#endif + + + +static void +dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { + int msgid; + struct waitmsgs *waitmsgs = rkmessage->_private; + + TEST_ASSERT(waitmsgs->cnt > 0, "wait_msg_cnt is zero on DR"); + + waitmsgs->cnt--; + + TEST_ASSERT(rkmessage->len == sizeof(msgid), + "invalid message size %" PRIusz ", expected sizeof(int)", + rkmessage->len); + + memcpy(&msgid, rkmessage->payload, rkmessage->len); + + TEST_ASSERT(msgid >= 0 && msgid < msgcnt, "msgid %d out of range 0..%d", + msgid, msgcnt - 1); + + TEST_ASSERT((int)waitmsgs->exp_err[msgid] != 12345, + "msgid %d delivered twice", msgid); + + TEST_SAY("DeliveryReport for msg #%d: %s\n", msgid, + rd_kafka_err2name(rkmessage->err)); + + if (rkmessage->err != waitmsgs->exp_err[msgid]) { + TEST_FAIL_LATER("Expected message #%d to fail with %s, not %s", + msgid, + rd_kafka_err2str(waitmsgs->exp_err[msgid]), + rd_kafka_err2str(rkmessage->err)); + } + + /* Indicate already seen */ + waitmsgs->exp_err[msgid] = (rd_kafka_resp_err_t)12345; +} + + + +static void purge_and_expect(const char *what, + int line, + rd_kafka_t *rk, + int purge_flags, + struct waitmsgs *waitmsgs, + int exp_remain, + const char *reason) { + test_timing_t t_purge; + rd_kafka_resp_err_t err; + + TEST_SAY( + "%s:%d: purge(0x%x): " + "expecting %d messages to remain when done\n", + what, line, purge_flags, exp_remain); + TIMING_START(&t_purge, "%s:%d: purge(0x%x)", what, line, purge_flags); + err = rd_kafka_purge(rk, purge_flags); + TIMING_STOP(&t_purge); + + TEST_ASSERT(!err, "purge(0x%x) at %d failed: %s", purge_flags, line, + rd_kafka_err2str(err)); + + rd_kafka_poll(rk, 0); + TEST_ASSERT(waitmsgs->cnt == exp_remain, + "%s:%d: expected %d messages remaining, not %d", what, line, + exp_remain, waitmsgs->cnt); +} + + +/** + * @brief Don't treat ERR__GAPLESS_GUARANTEE as a fatal error + */ +static int gapless_is_not_fatal_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + const char *reason) { + return err != RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE; +} + +static void +do_test_purge(const char *what, int remote, int idempotence, int gapless) { + const char *topic = test_mk_topic_name("0086_purge", 0); + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + int i; + rd_kafka_resp_err_t err; + struct waitmsgs waitmsgs = RD_ZERO_INIT; + +#if !WITH_SOCKEM + if (remote) { + TEST_SKIP("No sockem support\n"); + return; + } +#endif + + TEST_SAY(_C_MAG "Test rd_kafka_purge(): %s\n" _C_CLR, what); + + test_conf_init(&conf, NULL, 20); + + test_conf_set(conf, "batch.num.messages", "10"); + test_conf_set(conf, "max.in.flight", "1"); + test_conf_set(conf, "linger.ms", "5000"); + test_conf_set(conf, "enable.idempotence", + idempotence ? "true" : "false"); + test_conf_set(conf, "enable.gapless.guarantee", + gapless ? "true" : "false"); + rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); + + if (remote) { +#if WITH_SOCKEM + test_socket_enable(conf); + rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer", + on_new_producer, NULL); +#endif + + if (idempotence && !gapless) + test_curr->is_fatal_cb = gapless_is_not_fatal_cb; + + mtx_init(&produce_req_lock, mtx_plain); + cnd_init(&produce_req_cnd); + } else { + test_conf_set(conf, "bootstrap.servers", NULL); + } + + rk = test_create_handle(RD_KAFKA_PRODUCER, conf); + + TEST_SAY("Producing %d messages to topic %s\n", msgcnt, topic); + + for (i = 0; i < msgcnt; i++) { + int32_t partition; + + if (remote) { + /* We need all messages in the same partition + * so that remaining messages are queued + * up behind the first messageset */ + partition = 0; + } else { + partition = (i < 20 ? i % 3 : RD_KAFKA_PARTITION_UA); + } + + err = rd_kafka_producev( + rk, RD_KAFKA_V_TOPIC(topic), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_VALUE((void *)&i, sizeof(i)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_OPAQUE(&waitmsgs), RD_KAFKA_V_END); + TEST_ASSERT(!err, "producev(#%d) failed: %s", i, + rd_kafka_err2str(err)); + + waitmsgs.exp_err[i] = + (remote && i < 10 + ? RD_KAFKA_RESP_ERR_NO_ERROR + : remote && i < 20 ? RD_KAFKA_RESP_ERR__PURGE_INFLIGHT + : RD_KAFKA_RESP_ERR__PURGE_QUEUE); + + waitmsgs.cnt++; + } + + + if (remote) { + /* Wait for ProduceRequest to be sent */ + mtx_lock(&produce_req_lock); + cnd_timedwait_ms(&produce_req_cnd, &produce_req_lock, + 15 * 1000); + TEST_ASSERT(produce_req_cnt > 0, + "First Produce request should've been sent by now"); + mtx_unlock(&produce_req_lock); + + purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE, + &waitmsgs, 10, + "in-flight messages should not be purged"); + + purge_and_expect( + what, __LINE__, rk, + RD_KAFKA_PURGE_F_INFLIGHT | RD_KAFKA_PURGE_F_QUEUE, + &waitmsgs, 0, "all messages should have been purged"); + } else { + purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_INFLIGHT, + &waitmsgs, msgcnt, + "no messagess should have been purged"); + + purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE, + &waitmsgs, 0, + "no messagess should have been purged"); + } + + + rd_kafka_destroy(rk); + + TEST_LATER_CHECK(); +} + + +int main_0086_purge_remote(int argc, char **argv) { + const rd_bool_t has_idempotence = + test_broker_version >= TEST_BRKVER(0, 11, 0, 0); + + do_test_purge("remote", 1 /*remote*/, 0 /*idempotence*/, + 0 /*!gapless*/); + + if (has_idempotence) { + do_test_purge("remote,idempotence", 1 /*remote*/, + 1 /*idempotence*/, 0 /*!gapless*/); + do_test_purge("remote,idempotence,gapless", 1 /*remote*/, + 1 /*idempotence*/, 1 /*!gapless*/); + } + return 0; +} + + +int main_0086_purge_local(int argc, char **argv) { + do_test_purge("local", 0 /*local*/, 0, 0); + return 0; +} -- cgit v1.2.3