Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c  617
1 file changed, 617 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c
new file mode 100644
index 000000000..1ecf99da3
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0104-fetch_from_follower_mock.c
@@ -0,0 +1,617 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2019, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @name Fetch from follower tests using the mock broker.
+ */
+
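+/* Error code that error_is_fatal_cb() will treat as non-fatal (ignored). */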
+static int allowed_error;
+
+/**
+ * @brief Decide which errors reported via the error_cb will cause the test to fail.
+ */
+static int
+error_is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
+ if (err == allowed_error ||
+ /* If transport errors are allowed then it is likely
+ * that we'll also see ALL_BROKERS_DOWN. */
+ (allowed_error == RD_KAFKA_RESP_ERR__TRANSPORT &&
+ err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN)) {
+ TEST_SAY("Ignoring allowed error: %s: %s\n",
+ rd_kafka_err2name(err), reason);
+ return 0;
+ }
+ return 1;
+}
+
+
+/**
+ * @brief Test offset reset when fetching from replica.
+ * Since the high-watermark is in sync with the leader,
+ * ERR_OFFSETS_OUT_OF_RANGE is trusted by the consumer and
+ * a reset is performed. See do_test_offset_reset_lag()
+ * for the case where the replica is lagging and can't be trusted.
+ */
+static void do_test_offset_reset(const char *auto_offset_reset) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 1000;
+ const size_t msgsize = 1000;
+
+ TEST_SAY(_C_MAG "[ Test FFF auto.offset.reset=%s ]\n",
+ auto_offset_reset);
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* Set partition leader to broker 1, follower to broker 2 */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", auto_offset_reset);
+ /* Make sure we don't consume the entire partition in one Fetch */
+ test_conf_set(conf, "fetch.message.max.bytes", "100");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ /* The first fetch will go to the leader which will redirect
+ * the consumer to the follower; the second and subsequent fetches
+ * will go to the follower. We want the third fetch, second one on
+ * the follower, to fail and trigger an offset reset. */
+ rd_kafka_mock_push_request_errors(
+ mcluster, 1 /*FetchRequest*/, 3,
+ RD_KAFKA_RESP_ERR_NO_ERROR /*leader*/,
+ RD_KAFKA_RESP_ERR_NO_ERROR /*follower*/,
+ RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE /*follower: fail*/);
+
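+ /* Assign with a logical (invalid) offset so the start position is
+ * resolved through auto.offset.reset. */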
+ test_consumer_assign_partition(auto_offset_reset, c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
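+ /* After the injected OFFSET_OUT_OF_RANGE the offset is reset:
+ * 'latest' points past the seeded messages so nothing is consumed,
+ * while 'earliest' restarts from offset 0 and consumes them all. */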
+ if (!strcmp(auto_offset_reset, "latest"))
+ test_consumer_poll_no_msgs(auto_offset_reset, c, 0, 5000);
+ else
+ test_consumer_poll(auto_offset_reset, c, 0, 1, 0, msgcnt, NULL);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test FFF auto.offset.reset=%s PASSED ]\n",
+ auto_offset_reset);
+}
+
+
+/**
+ * @brief Test offset reset when fetching from a lagging replica
+ * whose high-watermark is behind the leader, which means
+ * an offset reset should not be triggered.
+ */
+static void do_test_offset_reset_lag(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 10;
+ const int lag = 3;
+ const size_t msgsize = 1000;
+
+ TEST_SAY(_C_MAG "[ Test lagging FFF offset reset ]\n");
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "1", NULL);
+
+ /* Set partition leader to broker 1, follower to broker 2 */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);
+
+ /* Make follower lag by some messages
+ * ( .. -1 because offsets start at 0) */
+ rd_kafka_mock_partition_set_follower_wmarks(mcluster, topic, 0, -1,
+ msgcnt - lag - 1);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ /* Make sure we don't consume the entire partition in one Fetch */
+ test_conf_set(conf, "fetch.message.max.bytes", "100");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ test_consumer_assign_partition("lag", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ /* Should receive all messages up to the follower's hwmark */
+ test_consumer_poll("up to wmark", c, 0, 0, 0, msgcnt - lag, NULL);
+
+ /* And then nothing.. as the consumer waits for the replica to
+ * catch up. */
+ test_consumer_poll_no_msgs("no msgs", c, 0, 3000);
+
+ /* Catch up the replica, consumer should now get the
+ * remaining messages */
+ rd_kafka_mock_partition_set_follower_wmarks(mcluster, topic, 0, -1, -1);
+ test_consumer_poll("remaining", c, 0, 1, msgcnt - lag, lag, NULL);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test lagging FFF offset reset PASSED ]\n");
+}
+
+
+/**
+ * @brief Test delegating the consumer to a follower that does not exist:
+ * the consumer should not be able to consume any messages (which
+ * is questionable, but left for a later PR). Then change to a valid
+ * replica and verify messages can be consumed.
+ */
+static void do_test_unknown_follower(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 1000;
+ const size_t msgsize = 1000;
+ test_msgver_t mv;
+
+ TEST_SAY(_C_MAG "[ Test unknown follower ]\n");
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* Set partition leader to broker 1, follower
+ * to non-existent broker 19 */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 19);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ /* Make sure we don't consume the entire partition in one Fetch */
+ test_conf_set(conf, "fetch.message.max.bytes", "100");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ test_consumer_assign_partition("unknown follower", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
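+ /* The preferred replica (broker 19) does not exist, so no messages
+ * can be fetched. */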
+ test_consumer_poll_no_msgs("unknown follower", c, 0, 5000);
+
+ /* Set a valid follower (broker 3) */
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3);
+ test_msgver_init(&mv, 0);
+ test_consumer_poll("proper follower", c, 0, 1, 0, msgcnt, &mv);
+ /* Verify messages were indeed received from broker 3 */
+ test_msgver_verify0(
+ __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID,
+ (struct test_mv_vs) {
+ .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 3});
+ test_msgver_clear(&mv);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test unknown follower PASSED ]\n");
+}
+
+
+/**
+ * @brief Issue #2955: Verify that fetch does not stall until next
+ * periodic metadata timeout when leader broker is no longer
+ * a replica.
+ */
+static void do_test_replica_not_available(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 1000;
+
+ TEST_SAY(_C_MAG "[ Test REPLICA_NOT_AVAILABLE ]\n");
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* Set partition leader to broker 1. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
+ test_conf_set(conf, "fetch.error.backoff.ms", "1000");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
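+ /* Make broker 1 fail the next 10 Fetch requests with
+ * REPLICA_NOT_AVAILABLE (with no added latency), so the consumer
+ * must refresh metadata rather than stall. */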
+ rd_kafka_mock_broker_push_request_error_rtts(
+ mcluster, 1 /*Broker 1*/, 1 /*FetchRequest*/, 10,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0,
+ RD_KAFKA_RESP_ERR_REPLICA_NOT_AVAILABLE, 0);
+
+
+ test_consumer_assign_partition("REPLICA_NOT_AVAILABLE", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000);
+
+ /* Switch leader to broker 2 so that metadata is updated,
+ * causing the consumer to start fetching from the new leader. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
+
+ test_consumer_poll("Consume", c, 0, 1, 0, msgcnt, NULL);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test REPLICA_NOT_AVAILABLE PASSED ]\n");
+}
+
+/**
+ * @brief On an error \p err on a Fetch request the consumer should query
+ * for the new leader or preferred replica and refresh metadata.
+ */
+static void do_test_delegate_to_leader_on_error(rd_kafka_resp_err_t err) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 1000;
+ const char *errstr = rd_kafka_err2name(err);
+
+ TEST_SAY(_C_MAG "[ Test %s ]\n", errstr);
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 10,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* Set partition leader to broker 1. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
+ test_conf_set(conf, "fetch.error.backoff.ms", "1000");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
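+ /* Make broker 1 fail the next 10 Fetch requests with the given
+ * error (with no added latency). */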
+ rd_kafka_mock_broker_push_request_error_rtts(
+ mcluster, 1 /*Broker 1*/, 1 /*FetchRequest*/, 10, err, 0, err, 0,
+ err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0, err, 0);
+
+
+ test_consumer_assign_partition(errstr, c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ test_consumer_poll_no_msgs("Wait initial metadata", c, 0, 2000);
+
+ /* Switch leader to broker 2 so that metadata is updated,
+ * causing the consumer to start fetching from the new leader. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
+
+ test_consumer_poll_timeout("Consume", c, 0, 1, 0, msgcnt, NULL, 2000);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test %s ]\n", errstr);
+}
+
+/**
+ * @brief Test when the preferred replica is no longer a follower of the
+ * partition leader. We should try fetch from the leader instead.
+ */
+static void do_test_not_leader_or_follower(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 10;
+
+ TEST_SAY(_C_MAG "[ Test NOT_LEADER_OR_FOLLOWER ]\n");
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+ /* Set partition leader to broker 1, follower to broker 2. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
+ test_conf_set(conf, "fetch.error.backoff.ms", "1000");
+ test_conf_set(conf, "fetch.message.max.bytes", "10");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ test_consumer_assign_partition("NOT_LEADER_OR_FOLLOWER", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ /* Since there are no messages, this poll only waits for metadata, and
+ * then sets the preferred replica after the first fetch request. */
+ test_consumer_poll_no_msgs("Initial metadata and preferred replica set",
+ c, 0, 2000);
+
+ /* Change the follower, so that the preferred replica is no longer the
+ * leader or follower. */
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, -1);
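+ /* Fetches sent to broker 2 (the old preferred replica) should now
+ * fail with NOT_LEADER_OR_FOLLOWER, making the consumer fall back
+ * to the leader. */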
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* On getting a NOT_LEADER_OR_FOLLOWER error, we should change to the
+ * leader and fetch from there without timing out. */
+ test_msgver_t mv;
+ test_msgver_init(&mv, 0);
+ test_consumer_poll_timeout("from leader", c, 0, 1, 0, msgcnt, &mv,
+ 2000);
+ test_msgver_verify0(
+ __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID,
+ (struct test_mv_vs) {
+ .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 1});
+ test_msgver_clear(&mv);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test NOT_LEADER_OR_FOLLOWER PASSED ]\n");
+}
+
+
+/**
+ * @brief Test when the preferred replica broker goes down. When a broker is
+ * going down, we should delegate all its partitions to their leaders.
+ */
+static void do_test_follower_down(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 10;
+
+ TEST_SAY(_C_MAG "[ Test with follower down ]\n");
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+ /* Set partition leader to broker 1, follower to broker 2. */
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 2);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "client.rack", "myrack");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "60000");
+ test_conf_set(conf, "fetch.error.backoff.ms", "1000");
+ test_conf_set(conf, "fetch.message.max.bytes", "10");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ test_consumer_assign_partition("follower down", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ /* Since there are no messages, this poll only waits for metadata, and
+ * then sets the preferred replica after the first fetch request. */
+ test_consumer_poll_no_msgs("Initial metadata and preferred replica set",
+ c, 0, 2000);
+
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, 1000,
+ "bootstrap.servers", bootstraps,
+ "batch.num.messages", "10", NULL);
+
+ /* Bring the follower down. When a follower is down we also expect
+ * that the cluster itself knows this and does not ask us to use the
+ * downed broker as preferred replica again. To facilitate this,
+ * we simply set the follower to broker 3 instead of 2. */
+ allowed_error = RD_KAFKA_RESP_ERR__TRANSPORT;
+ test_curr->is_fatal_cb = error_is_fatal_cb;
+ rd_kafka_mock_broker_set_down(mcluster, 2);
+ rd_kafka_mock_partition_set_follower(mcluster, topic, 0, 3);
+
+ /* We should change to the new follower when the old one goes down,
+ * and fetch from there without timing out. */
+ test_msgver_t mv;
+ test_msgver_init(&mv, 0);
+ test_consumer_poll_timeout("from other follower", c, 0, 1, 0, msgcnt,
+ &mv, 2000);
+ test_msgver_verify0(
+ __FUNCTION__, __LINE__, "broker_id", &mv, TEST_MSGVER_BY_BROKER_ID,
+ (struct test_mv_vs) {
+ .msg_base = 0, .exp_cnt = msgcnt, .broker_id = 3});
+ test_msgver_clear(&mv);
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ TEST_SAY(_C_GRN "[ Test with follower down PASSED ]\n");
+}
+
+
+/**
+ * @brief When a seek is done with a leader epoch,
+ * the expected behavior is to validate it and
+ * start fetching from the end offset of that epoch if it is
+ * less than the current offset.
+ * This can happen with external group offset storage,
+ * combined with an unclean leader election.
+ */
+static void do_test_seek_to_offset_with_previous_epoch(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *topic = "test";
+ const int msgcnt = 10;
+ const size_t msgsize = 1000;
+ rd_kafka_topic_partition_list_t *rktpars;
+ rd_kafka_topic_partition_t *rktpar;
+
+ SUB_TEST_QUICK();
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize,
+ "bootstrap.servers", bootstraps, NULL);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+
+ c = test_create_consumer("mygroup", NULL, conf, NULL);
+
+ test_consumer_assign_partition("zero", c, topic, 0,
+ RD_KAFKA_OFFSET_INVALID);
+
+ test_consumer_poll("first", c, 0, 0, msgcnt, msgcnt, NULL);
+
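+ /* Change the partition leader to broker 2, which bumps the
+ * partition leader epoch. */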
+ rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, msgcnt, msgsize,
+ "bootstrap.servers", bootstraps, NULL);
+
+ test_consumer_poll("second", c, 0, 0, msgcnt, msgcnt, NULL);
+
+ rktpars = rd_kafka_topic_partition_list_new(1);
+ rktpar = rd_kafka_topic_partition_list_add(rktpars, topic, 0);
+ rktpar->offset = msgcnt * 2;
+ /* The offset will be validated against leader epoch 0 and fetching
+ * will restart from that epoch's end offset, 'msgcnt'. */
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, 0);
+ rd_kafka_seek_partitions(c, rktpars, -1);
+
+ test_consumer_poll("third", c, 0, 0, msgcnt, msgcnt, NULL);
+
+ test_consumer_close(c);
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0104_fetch_from_follower_mock(int argc, char **argv) {
+
+ if (test_needs_auth()) {
+ TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+ return 0;
+ }
+
+ do_test_offset_reset("earliest");
+ do_test_offset_reset("latest");
+
+ do_test_offset_reset_lag();
+
+ do_test_unknown_follower();
+
+ do_test_replica_not_available();
+
+ do_test_delegate_to_leader_on_error(
+ RD_KAFKA_RESP_ERR_OFFSET_NOT_AVAILABLE);
+
+ do_test_not_leader_or_follower();
+
+ do_test_follower_down();
+
+ do_test_seek_to_offset_with_previous_epoch();
+
+ return 0;
+}