diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:07:37 +0000 |
commit | b485aab7e71c1625cfc27e0f92c9509f42378458 (patch) | |
tree | ae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip |
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c new file mode 100644 index 000000000..1ecf99da3 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c @@ -0,0 +1,617 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + + +/** + * @name Fetch from follower tests using the mock broker. + */ + +static int allowed_error; + +/** + * @brief Decide what error_cb's will cause the test to fail. + */ +static int +error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) { + if (err == allowed_error || + /* If transport errors are allowed then it is likely + * that we'll also see ALL_BROKERS_DOWN. */ + (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT && + err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) { + TEST_SAY("Ignoring allowed error: %s: %s\n", + rd_kafka_err2name(err), reason); + return 0; + } + return 1; +} + + +/** + * @brief Test offset reset when fetching from replica. + * Since the highwatermark is in sync with the leader the + * ERR_OFFSETS_OUT_OF_RANGE is trusted by the consumer and + * a reset is performed. See do_test_offset_reset_lag() + * for the case where the replica is lagging and can't be trusted. + */ +static void do_test_offset_reset(const char *auto_offset_reset) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 1000; + const size_t msgsize = 1000; + + TEST_SAY(_C_MAG "[ Test FFF auto.offset.reset=%s ]\n", + auto_offset_reset); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* Set partition leader to broker 1, follower to broker 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", auto_offset_reset); + /* Make sure we don't consume the entire partition in one Fetch */ + test_conf_set(conf, "fetch.message.max.bytes", "100"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + /* The first fetch will go to the leader which will redirect + * the consumer to the follower, the second and sub-sequent fetches + * will go to the follower. We want the third fetch, second one on + * the follower, to fail and trigger an offset reset. */ + rd_kafka_mock_push_request_errors( + mcluster, 1 /*FetchRequest*/, 3, + RD_KAFKA_RESP_ERR_NO_ERROR /*leader*/, + RD_KAFKA_RESP_ERR_NO_ERROR /*follower*/, + RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE /*follower: fail*/); + + test_consumer_assign_partition(auto_offset_reset, c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + if (!strcmp(auto_offset_reset, "latest")) + test_consumer_poll_no_msgs(auto_offset_reset, c, 0, 5000); + else + test_consumer_poll(auto_offset_reset, c, 0, 1, 0, msgcnt, NULL); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test FFF auto.offset.reset=%s PASSED ]\n", + auto_offset_reset); +} + + +/** + * @brief Test offset reset when fetching from a lagging replica + * who's high-watermark is behind the leader, which means + * an offset reset should not be triggered. + */ +static void do_test_offset_reset_lag(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 10; + const int lag = 3; + const size_t msgsize = 1000; + + TEST_SAY(_C_MAG "[ Test lagging FFF offset reset ]\n"); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + /* Set broker rack */ + /* Set partition leader to broker 1, follower to broker 2 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2); + + /* Make follower lag by some messages + * ( .. -1 because offsets start at 0) */ + rd_kafka_mock_partition_set_follower_wmarks(mcluster, topic, 0, -1, + msgcnt - lag - 1); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + /* Make sure we don't consume the entire partition in one Fetch */ + test_conf_set(conf, "fetch.message.max.bytes", "100"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition("lag", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + /* Should receive all messages up to the followers hwmark */ + test_consumer_poll("up to wmark", c, 0, 0, 0, msgcnt - lag, NULL); + + /* And then nothing.. as the consumer waits for the replica to + * catch up. */ + test_consumer_poll_no_msgs("no msgs", c, 0, 3000); + + /* Catch up the replica, consumer should now get the + * remaining messages */ + rd_kafka_mock_partition_set_follower_wmarks(mcluster, topic, 0, -1, -1); + test_consumer_poll("remaining", c, 0, 1, msgcnt - lag, lag, NULL); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test lagging FFF offset reset PASSED ]\n"); +} + + +/** + * @brief Test delegating consumer to a follower that does not exist, + * the consumer should not be able to consume any messages (which + * is questionable but for a later PR). Then change to a valid + * replica and verify messages can be consumed. + */ +static void do_test_unknown_follower(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 1000; + const size_t msgsize = 1000; + test_msgver_t mv; + + TEST_SAY(_C_MAG "[ Test unknown follower ]\n"); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* Set partition leader to broker 1, follower + * to non-existent broker 19 */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 19); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + /* Make sure we don't consume the entire partition in one Fetch */ + test_conf_set(conf, "fetch.message.max.bytes", "100"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition("unknown follower", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + test_consumer_poll_no_msgs("unknown follower", c, 0, 5000); + + /* Set a valid follower (broker 3) */ + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3); + test_msgver_init(&mv, 0); + test_consumer_poll("proper follower", c, 0, 1, 0, msgcnt, &mv); + /* Verify messages were indeed received from broker 3 */ + test_msgver_verify0( + __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID, + (struct test_mv_vs) { + .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 3}); + test_msgver_clear(&mv); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test unknown follower PASSED ]\n"); +} + + +/** + * @brief Issue #2955: Verify that fetch does not stall until next + * periodic metadata timeout when leader broker is no longer + * a replica. + */ +static void do_test_replica_not_available(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 1000; + + TEST_SAY(_C_MAG "[ Test REPLICA_NOT_AVAILABLE ]\n"); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* Set partition leader to broker 1. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000"); + test_conf_set(conf, "fetch.error.backoff.ms", "1000"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1 /*Broker 1*/, 1 /*FetchRequest*/, 10, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0, + RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0); + + + test_consumer_assign_partition("REPLICA_NOT_AVAILABLE", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000); + + /* Switch leader to broker 2 so that metadata is updated, + * causing the consumer to start fetching from the new leader. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + test_consumer_poll("Consume", c, 0, 1, 0, msgcnt, NULL); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test REPLICA_NOT_AVAILABLE PASSED ]\n"); +} + +/** + * @brief With an error \p err on a Fetch request should query for the new + * leader or preferred replica and refresh metadata. + */ +static void do_test_delegate_to_leader_on_error(rd_kafka_resp_err_t err) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 1000; + const char *errstr = rd_kafka_err2name(err); + + TEST_SAY(_C_MAG "[ Test %s ]\n", errstr); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* Set partition leader to broker 1. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000"); + test_conf_set(conf, "fetch.error.backoff.ms", "1000"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 1 /*Broker 1*/, 1 /*FetchRequest*/, 10, err, 0, err, 0, + err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0); + + + test_consumer_assign_partition(errstr, c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000); + + /* Switch leader to broker 2 so that metadata is updated, + * causing the consumer to start fetching from the new leader. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + test_consumer_poll_timeout("Consume", c, 0, 1, 0, msgcnt, NULL, 2000); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test %s ]\n", errstr); +} + +/** + * @brief Test when the preferred replica is no longer a follower of the + * partition leader. We should try fetch from the leader instead. + */ +static void do_test_not_leader_or_follower(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 10; + + TEST_SAY(_C_MAG "[ Test NOT_LEADER_OR_FOLLOWER ]\n"); + + mcluster = test_mock_cluster_new(3, &bootstraps); + /* Set partition leader to broker 1. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000"); + test_conf_set(conf, "fetch.error.backoff.ms", "1000"); + test_conf_set(conf, "fetch.message.max.bytes", "10"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition("NOT_LEADER_OR_FOLLOWER", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + /* Since there are no messages, this poll only waits for metadata, and + * then sets the preferred replica after the first fetch request. */ + test_consumer_poll_no_msgs("Initial metadata and preferred replica set", + c, 0, 2000); + + /* Change the follower, so that the preferred replica is no longer the + * leader or follower. */ + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, -1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* On getting a NOT_LEADER_OR_FOLLOWER error, we should change to the + * leader and fetch from there without timing out. */ + test_msgver_t mv; + test_msgver_init(&mv, 0); + test_consumer_poll_timeout("from leader", c, 0, 1, 0, msgcnt, &mv, + 2000); + test_msgver_verify0( + __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID, + (struct test_mv_vs) { + .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 1}); + test_msgver_clear(&mv); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test NOT_LEADER_OR_FOLLOWER PASSED ]\n"); +} + + +/** + * @brief Test when the preferred replica broker goes down. When a broker is + * going down, we should delegate all its partitions to their leaders. + */ +static void do_test_follower_down(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 10; + + TEST_SAY(_C_MAG "[ Test with follower down ]\n"); + + mcluster = test_mock_cluster_new(3, &bootstraps); + /* Set partition leader to broker 1. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "client.rack", "myrack"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000"); + test_conf_set(conf, "fetch.error.backoff.ms", "1000"); + test_conf_set(conf, "fetch.message.max.bytes", "10"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition("follower down", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + /* Since there are no messages, this poll only waits for metadata, and + * then sets the preferred replica after the first fetch request. */ + test_consumer_poll_no_msgs("Initial metadata and preferred replica set", + c, 0, 2000); + + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000, + "bootstrap.servers", bootstraps, + "batch.num.messages", "10", NULL); + + /* Set follower down. When follower is set as DOWN, we also expect + * that the cluster itself knows and does not ask us to change our + * preferred replica to the broker which is down. To facilitate this, + * we just set the follower to 3 instead of 2. */ + allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT; + test_curr->is_fatal_cb = error_is_fatal_cb; + rd_kafka_mock_broker_set_down(mcluster, 2); + rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3); + + /* Wee should change to the new follower when the old one goes down, + * and fetch from there without timing out. */ + test_msgver_t mv; + test_msgver_init(&mv, 0); + test_consumer_poll_timeout("from other follower", c, 0, 1, 0, msgcnt, + &mv, 2000); + test_msgver_verify0( + __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID, + (struct test_mv_vs) { + .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 3}); + test_msgver_clear(&mv); + + test_consumer_close(c); + + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + TEST_SAY(_C_GRN "[ Test with follower down PASSED ]\n"); +} + + +/** + * @brief When a seek is done with a leader epoch, + * the expected behavior is to validate it and + * start fetching from the end offset of that epoch if + * less than current offset. + * This is possible in case of external group offsets storage, + * associated with an unclean leader election. + */ +static void do_test_seek_to_offset_with_previous_epoch(void) { + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + rd_kafka_conf_t *conf; + rd_kafka_t *c; + const char *topic = "test"; + const int msgcnt = 10; + const size_t msgsize = 1000; + rd_kafka_topic_partition_list_t *rktpars; + rd_kafka_topic_partition_t *rktpar; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(3, &bootstraps); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize, + "bootstrap.servers", bootstraps, NULL); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + + c = test_create_consumer("mygroup", NULL, conf, NULL); + + test_consumer_assign_partition("zero", c, topic, 0, + RD_KAFKA_OFFSET_INVALID); + + test_consumer_poll("first", c, 0, 0, msgcnt, msgcnt, NULL); + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize, + "bootstrap.servers", bootstraps, NULL); + + test_consumer_poll("second", c, 0, 0, msgcnt, msgcnt, NULL); + + rktpars = rd_kafka_topic_partition_list_new(1); + rktpar = rd_kafka_topic_partition_list_add(rktpars, topic, 0); + rktpar->offset = msgcnt * 2; + /* Will validate the offset at start fetching again + * from offset 'msgcnt'. */ + rd_kafka_topic_partition_set_leader_epoch(rktpar, 0); + rd_kafka_seek_partitions(c, rktpars, -1); + + test_consumer_poll("third", c, 0, 0, msgcnt, msgcnt, NULL); + + test_consumer_close(c); + rd_kafka_destroy(c); + + test_mock_cluster_destroy(mcluster); + + SUB_TEST_PASS(); +} + + +int main_0104_fetch_from_follower_mock(int argc, char **argv) { + + if (test_needs_auth()) { + TEST_SKIP("Mock cluster does not support SSL/SASL\n"); + return 0; + } + + do_test_offset_reset("earliest"); + do_test_offset_reset("latest"); + + do_test_offset_reset_lag(); + + do_test_unknown_follower(); + + do_test_replica_not_available(); + + do_test_delegate_to_leader_on_error( + RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE); + + do_test_not_leader_or_follower(); + + do_test_follower_down(); + + do_test_seek_to_offset_with_previous_epoch(); + + return 0; +} |