summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c300
1 files changed, 300 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c
new file mode 100644
index 000000000..0451e4a00
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0106-cgrp_sess_timeout.c
@@ -0,0 +1,300 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+#include "../src/rdkafka_proto.h"
+
+
+/**
+ * @name Verify that the high-level consumer times out itself if
+ * heartbeats are not successful (issue #2631).
+ */
+
+static const char *commit_type;
+static int rebalance_cnt;
+static rd_kafka_resp_err_t rebalance_exp_event;
+static rd_kafka_resp_err_t commit_exp_err;
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+
+ rebalance_cnt++;
+ TEST_SAY("Rebalance #%d: %s: %d partition(s)\n", rebalance_cnt,
+ rd_kafka_err2name(err), parts->cnt);
+
+ TEST_ASSERT(
+ err == rebalance_exp_event, "Expected rebalance event %s, not %s",
+ rd_kafka_err2name(rebalance_exp_event), rd_kafka_err2name(err));
+
+ if (err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
+ test_consumer_assign("assign", rk, parts);
+ } else {
+ rd_kafka_resp_err_t commit_err;
+
+ if (strcmp(commit_type, "auto")) {
+ rd_kafka_resp_err_t perr;
+
+ TEST_SAY("Performing %s commit\n", commit_type);
+
+ perr = rd_kafka_position(rk, parts);
+ TEST_ASSERT(!perr, "Failed to acquire position: %s",
+ rd_kafka_err2str(perr));
+
+ /* Sleep a short while so the broker times out the
+ * member too. */
+ rd_sleep(1);
+
+ commit_err = rd_kafka_commit(
+ rk, parts, !strcmp(commit_type, "async"));
+
+ if (!strcmp(commit_type, "async"))
+ TEST_ASSERT(!commit_err,
+ "Async commit should not fail, "
+ "but it returned %s",
+ rd_kafka_err2name(commit_err));
+ else
+ TEST_ASSERT(
+ commit_err == commit_exp_err ||
+ (!commit_exp_err &&
+ commit_err ==
+ RD_KAFKA_RESP_ERR__NO_OFFSET),
+ "Expected %s commit to return %s, "
+ "not %s",
+ commit_type,
+ rd_kafka_err2name(commit_exp_err),
+ rd_kafka_err2name(commit_err));
+ }
+
+ test_consumer_unassign("unassign", rk);
+ }
+
+ /* Make sure only one rebalance callback is served per poll()
+ * so that expect_rebalance() returns to the test logic on each
+ * rebalance. */
+ rd_kafka_yield(rk);
+}
+
+
+/**
+ * @brief Wait for an expected rebalance event, or fail.
+ */
+static void expect_rebalance(const char *what,
+ rd_kafka_t *c,
+ rd_kafka_resp_err_t exp_event,
+ int timeout_s) {
+ int64_t tmout = test_clock() + (timeout_s * 1000000);
+ int start_cnt = rebalance_cnt;
+
+ TEST_SAY("Waiting for %s (%s) for %ds\n", what,
+ rd_kafka_err2name(exp_event), timeout_s);
+
+ rebalance_exp_event = exp_event;
+
+ while (tmout > test_clock() && rebalance_cnt == start_cnt) {
+ if (test_consumer_poll_once(c, NULL, 1000))
+ rd_sleep(1);
+ }
+
+ if (rebalance_cnt == start_cnt + 1) {
+ rebalance_exp_event = RD_KAFKA_RESP_ERR_NO_ERROR;
+ return;
+ }
+
+ TEST_FAIL("Timed out waiting for %s (%s)\n", what,
+ rd_kafka_err2name(exp_event));
+}
+
+
+/**
+ * @brief Verify that session timeouts are handled by the consumer itself.
+ *
+ * @param use_commit_type "auto", "sync" (manual), "async" (manual)
+ */
+static void do_test_session_timeout(const char *use_commit_type) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *groupid = "mygroup";
+ const char *topic = "test";
+
+ rebalance_cnt = 0;
+ commit_type = use_commit_type;
+
+ SUB_TEST0(!strcmp(use_commit_type, "sync") /*quick*/,
+ "Test session timeout with %s commit", use_commit_type);
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10", NULL);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "security.protocol", "PLAINTEXT");
+ test_conf_set(conf, "group.id", groupid);
+ test_conf_set(conf, "session.timeout.ms", "5000");
+ test_conf_set(conf, "heartbeat.interval.ms", "1000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit",
+ !strcmp(commit_type, "auto") ? "true" : "false");
+
+ c = test_create_consumer(groupid, rebalance_cb, conf, NULL);
+
+ test_consumer_subscribe(c, topic);
+
+ /* Let Heartbeats fail after a couple of successful ones */
+ rd_kafka_mock_push_request_errors(
+ mcluster, RD_KAFKAP_Heartbeat, 9, RD_KAFKA_RESP_ERR_NO_ERROR,
+ RD_KAFKA_RESP_ERR_NO_ERROR, RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR,
+ RD_KAFKA_RESP_ERR_NOT_COORDINATOR);
+
+ expect_rebalance("initial assignment", c,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5 + 2);
+
+ /* Consume a couple of messages so that we have something to commit */
+ test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
+
+ /* The commit in the rebalance callback should fail when the
+ * member has timed out from the group. */
+ commit_exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
+
+ expect_rebalance("session timeout revoke", c,
+ RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, 2 + 5 + 2);
+
+ expect_rebalance("second assignment", c,
+ RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, 5 + 2);
+
+ /* Final rebalance in close().
+ * Its commit will work. */
+ rebalance_exp_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ commit_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Attempt manual commit when assignment has been lost (#3217)
+ */
+static void do_test_commit_on_lost(void) {
+ const char *bootstraps;
+ rd_kafka_mock_cluster_t *mcluster;
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *c;
+ const char *groupid = "mygroup";
+ const char *topic = "test";
+ rd_kafka_resp_err_t err;
+
+ SUB_TEST();
+
+ test_curr->is_fatal_cb = test_error_is_not_fatal_cb;
+
+ mcluster = test_mock_cluster_new(3, &bootstraps);
+
+ rd_kafka_mock_coordinator_set(mcluster, "group", groupid, 1);
+
+ /* Seed the topic with messages */
+ test_produce_msgs_easy_v(topic, 0, 0, 0, 100, 10, "bootstrap.servers",
+ bootstraps, "batch.num.messages", "10", NULL);
+
+ test_conf_init(&conf, NULL, 30);
+ test_conf_set(conf, "bootstrap.servers", bootstraps);
+ test_conf_set(conf, "security.protocol", "PLAINTEXT");
+ test_conf_set(conf, "group.id", groupid);
+ test_conf_set(conf, "session.timeout.ms", "5000");
+ test_conf_set(conf, "heartbeat.interval.ms", "1000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "enable.auto.commit", "false");
+
+ c = test_create_consumer(groupid, test_rebalance_cb, conf, NULL);
+
+ test_consumer_subscribe(c, topic);
+
+ /* Consume a couple of messages so that we have something to commit */
+ test_consumer_poll("consume", c, 0, -1, 0, 10, NULL);
+
+ /* Make the coordinator unreachable, this will cause a local session
+ * timeout followed by a revoke and assignment lost. */
+ rd_kafka_mock_broker_set_down(mcluster, 1);
+
+ /* Wait until the assignment is lost */
+ TEST_SAY("Waiting for assignment to be lost...\n");
+ while (!rd_kafka_assignment_lost(c))
+ rd_sleep(1);
+
+ TEST_SAY("Assignment is lost, committing\n");
+ /* Perform manual commit */
+ err = rd_kafka_commit(c, NULL, 0 /*sync*/);
+ TEST_SAY("commit() returned: %s\n", rd_kafka_err2name(err));
+ TEST_ASSERT(err, "expected commit to fail");
+
+ test_consumer_close(c);
+
+ rd_kafka_destroy(c);
+
+ test_mock_cluster_destroy(mcluster);
+
+ test_curr->is_fatal_cb = NULL;
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0106_cgrp_sess_timeout(int argc, char **argv) {
+
+ if (test_needs_auth()) {
+ TEST_SKIP("Mock cluster does not support SSL/SASL\n");
+ return 0;
+ }
+
+ do_test_session_timeout("sync");
+ do_test_session_timeout("async");
+ do_test_session_timeout("auto");
+
+ do_test_commit_on_lost();
+
+ return 0;
+}