diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c | 334 |
1 files changed, 0 insertions, 334 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c deleted file mode 100644 index 4dbf937f..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0086-purge.c +++ /dev/null @@ -1,334 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2015, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" -#include "../src/rdkafka_protocol.h" - -/** - * @name Test rd_kafka_purge() - * - * Local test: - * - produce 29 messages (that will be held up in queues), - * for specific partitions and UA. - * - purge(INFLIGHT) => no change in len() - * - purge(QUEUE) => len() should drop to 0, dr errs should be ERR__PURGE_QUEUE - * - * Remote test (WITH_SOCKEM): - * - Limit in-flight messages to 10 - * - Produce 20 messages to the same partition, in batches of 10. - * - First batch succeeds, then sets a 50 s delay - * - Second batch times out in flight - * - Third batch isn't completed an times out in queue - * - purge(QUEUE) => len should drop to 10, dr err ERR__PURGE_QUEUE - * - purge(INFLIGHT|QUEUE) => len should drop to 0, ERR__PURGE_INFLIGHT - */ - - -static const int msgcnt = 29; -struct waitmsgs { - rd_kafka_resp_err_t exp_err[29]; - int cnt; -}; - -static mtx_t produce_req_lock; -static cnd_t produce_req_cnd; -static int produce_req_cnt = 0; - - -#if WITH_SOCKEM - -int test_sockfd = 0; - -static rd_kafka_resp_err_t on_request_sent(rd_kafka_t *rk, - int sockfd, - const char *brokername, - int32_t brokerid, - int16_t ApiKey, - int16_t ApiVersion, - int32_t CorrId, - size_t size, - void *ic_opaque) { - - /* Save socket fd to limit ProduceRequest */ - if (ApiKey == RD_KAFKAP_ApiVersion) { - test_sockfd = sockfd; - return RD_KAFKA_RESP_ERR_NO_ERROR; - } - - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -static rd_kafka_resp_err_t on_response_received(rd_kafka_t *rk, - int sockfd, - const char *brokername, - int32_t brokerid, - int16_t ApiKey, - int16_t ApiVersion, - int32_t CorrId, - size_t size, - int64_t rtt, - rd_kafka_resp_err_t err, - void *ic_opaque) { - /* Add delay to send fd after first batch is received */ - if (ApiKey == RD_KAFKAP_Produce) { - mtx_lock(&produce_req_lock); - produce_req_cnt++; - cnd_broadcast(&produce_req_cnd); - mtx_unlock(&produce_req_lock); - test_socket_sockem_set(test_sockfd, "delay", 50000); - } - return RD_KAFKA_RESP_ERR_NO_ERROR; -} - -static rd_kafka_resp_err_t on_new_producer(rd_kafka_t *rk, - const rd_kafka_conf_t *conf, - void *ic_opaque, - char *errstr, - size_t errstr_size) { - rd_kafka_resp_err_t err; - err = rd_kafka_interceptor_add_on_request_sent(rk, "catch_producer_req", - on_request_sent, NULL); - if (!err) { - rd_kafka_interceptor_add_on_response_received( - rk, "catch_api_version_resp", on_response_received, NULL); - } - return err; -} -#endif - - - -static void -dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) { - int msgid; - struct waitmsgs *waitmsgs = rkmessage->_private; - - TEST_ASSERT(waitmsgs->cnt > 0, "wait_msg_cnt is zero on DR"); - - waitmsgs->cnt--; - - TEST_ASSERT(rkmessage->len == sizeof(msgid), - "invalid message size %" PRIusz ", expected sizeof(int)", - rkmessage->len); - - memcpy(&msgid, rkmessage->payload, rkmessage->len); - - TEST_ASSERT(msgid >= 0 && msgid < msgcnt, "msgid %d out of range 0..%d", - msgid, msgcnt - 1); - - TEST_ASSERT((int)waitmsgs->exp_err[msgid] != 12345, - "msgid %d delivered twice", msgid); - - TEST_SAY("DeliveryReport for msg #%d: %s\n", msgid, - rd_kafka_err2name(rkmessage->err)); - - if (rkmessage->err != waitmsgs->exp_err[msgid]) { - TEST_FAIL_LATER("Expected message #%d to fail with %s, not %s", - msgid, - rd_kafka_err2str(waitmsgs->exp_err[msgid]), - rd_kafka_err2str(rkmessage->err)); - } - - /* Indicate already seen */ - waitmsgs->exp_err[msgid] = (rd_kafka_resp_err_t)12345; -} - - - -static void purge_and_expect(const char *what, - int line, - rd_kafka_t *rk, - int purge_flags, - struct waitmsgs *waitmsgs, - int exp_remain, - const char *reason) { - test_timing_t t_purge; - rd_kafka_resp_err_t err; - - TEST_SAY( - "%s:%d: purge(0x%x): " - "expecting %d messages to remain when done\n", - what, line, purge_flags, exp_remain); - TIMING_START(&t_purge, "%s:%d: purge(0x%x)", what, line, purge_flags); - err = rd_kafka_purge(rk, purge_flags); - TIMING_STOP(&t_purge); - - TEST_ASSERT(!err, "purge(0x%x) at %d failed: %s", purge_flags, line, - rd_kafka_err2str(err)); - - rd_kafka_poll(rk, 0); - TEST_ASSERT(waitmsgs->cnt == exp_remain, - "%s:%d: expected %d messages remaining, not %d", what, line, - exp_remain, waitmsgs->cnt); -} - - -/** - * @brief Don't treat ERR__GAPLESS_GUARANTEE as a fatal error - */ -static int gapless_is_not_fatal_cb(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - const char *reason) { - return err != RD_KAFKA_RESP_ERR__GAPLESS_GUARANTEE; -} - -static void -do_test_purge(const char *what, int remote, int idempotence, int gapless) { - const char *topic = test_mk_topic_name("0086_purge", 0); - rd_kafka_conf_t *conf; - rd_kafka_t *rk; - int i; - rd_kafka_resp_err_t err; - struct waitmsgs waitmsgs = RD_ZERO_INIT; - -#if !WITH_SOCKEM - if (remote) { - TEST_SKIP("No sockem support\n"); - return; - } -#endif - - TEST_SAY(_C_MAG "Test rd_kafka_purge(): %s\n" _C_CLR, what); - - test_conf_init(&conf, NULL, 20); - - test_conf_set(conf, "batch.num.messages", "10"); - test_conf_set(conf, "max.in.flight", "1"); - test_conf_set(conf, "linger.ms", "5000"); - test_conf_set(conf, "enable.idempotence", - idempotence ? "true" : "false"); - test_conf_set(conf, "enable.gapless.guarantee", - gapless ? "true" : "false"); - rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb); - - if (remote) { -#if WITH_SOCKEM - test_socket_enable(conf); - rd_kafka_conf_interceptor_add_on_new(conf, "on_new_producer", - on_new_producer, NULL); -#endif - - if (idempotence && !gapless) - test_curr->is_fatal_cb = gapless_is_not_fatal_cb; - - mtx_init(&produce_req_lock, mtx_plain); - cnd_init(&produce_req_cnd); - } else { - test_conf_set(conf, "bootstrap.servers", NULL); - } - - rk = test_create_handle(RD_KAFKA_PRODUCER, conf); - - TEST_SAY("Producing %d messages to topic %s\n", msgcnt, topic); - - for (i = 0; i < msgcnt; i++) { - int32_t partition; - - if (remote) { - /* We need all messages in the same partition - * so that remaining messages are queued - * up behind the first messageset */ - partition = 0; - } else { - partition = (i < 20 ? i % 3 : RD_KAFKA_PARTITION_UA); - } - - err = rd_kafka_producev( - rk, RD_KAFKA_V_TOPIC(topic), - RD_KAFKA_V_PARTITION(partition), - RD_KAFKA_V_VALUE((void *)&i, sizeof(i)), - RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), - RD_KAFKA_V_OPAQUE(&waitmsgs), RD_KAFKA_V_END); - TEST_ASSERT(!err, "producev(#%d) failed: %s", i, - rd_kafka_err2str(err)); - - waitmsgs.exp_err[i] = - (remote && i < 10 - ? RD_KAFKA_RESP_ERR_NO_ERROR - : remote && i < 20 ? RD_KAFKA_RESP_ERR__PURGE_INFLIGHT - : RD_KAFKA_RESP_ERR__PURGE_QUEUE); - - waitmsgs.cnt++; - } - - - if (remote) { - /* Wait for ProduceRequest to be sent */ - mtx_lock(&produce_req_lock); - cnd_timedwait_ms(&produce_req_cnd, &produce_req_lock, - 15 * 1000); - TEST_ASSERT(produce_req_cnt > 0, - "First Produce request should've been sent by now"); - mtx_unlock(&produce_req_lock); - - purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE, - &waitmsgs, 10, - "in-flight messages should not be purged"); - - purge_and_expect( - what, __LINE__, rk, - RD_KAFKA_PURGE_F_INFLIGHT | RD_KAFKA_PURGE_F_QUEUE, - &waitmsgs, 0, "all messages should have been purged"); - } else { - purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_INFLIGHT, - &waitmsgs, msgcnt, - "no messagess should have been purged"); - - purge_and_expect(what, __LINE__, rk, RD_KAFKA_PURGE_F_QUEUE, - &waitmsgs, 0, - "no messagess should have been purged"); - } - - - rd_kafka_destroy(rk); - - TEST_LATER_CHECK(); -} - - -int main_0086_purge_remote(int argc, char **argv) { - const rd_bool_t has_idempotence = - test_broker_version >= TEST_BRKVER(0, 11, 0, 0); - - do_test_purge("remote", 1 /*remote*/, 0 /*idempotence*/, - 0 /*!gapless*/); - - if (has_idempotence) { - do_test_purge("remote,idempotence", 1 /*remote*/, - 1 /*idempotence*/, 0 /*!gapless*/); - do_test_purge("remote,idempotence,gapless", 1 /*remote*/, - 1 /*idempotence*/, 1 /*!gapless*/); - } - return 0; -} - - -int main_0086_purge_local(int argc, char **argv) { - do_test_purge("local", 0 /*local*/, 0, 0); - return 0; -} |