summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c535
1 files changed, 535 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c
new file mode 100644
index 000000000..231a09065
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c
@@ -0,0 +1,535 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2019, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+
+/**
+ * @name KafkaConsumer static membership tests
+ *
+ * Runs two consumers subscribing to multiple topics simulating various
+ * rebalance scenarios with static group membership enabled.
+ */
+
+#define _CONSUMER_CNT 2
+
+typedef struct _consumer_s {
+ rd_kafka_t *rk;
+ test_msgver_t *mv;
+ int64_t assigned_at;
+ int64_t revoked_at;
+ int partition_cnt;
+ rd_kafka_resp_err_t expected_rb_event;
+ int curr_line;
+} _consumer_t;
+
+
+/**
+ * @brief Call poll until a rebalance has been triggered
+ */
+static int static_member_wait_rebalance0(int line,
+ _consumer_t *c,
+ int64_t start,
+ int64_t *target,
+ int timeout_ms) {
+ int64_t tmout = test_clock() + (timeout_ms * 1000);
+ test_timing_t t_time;
+
+ c->curr_line = line;
+
+ TEST_SAY("line %d: %s awaiting %s event\n", line, rd_kafka_name(c->rk),
+ rd_kafka_err2name(c->expected_rb_event));
+
+ TIMING_START(&t_time, "wait_rebalance");
+ while (timeout_ms < 0 ? 1 : test_clock() <= tmout) {
+ if (*target > start) {
+ c->curr_line = 0;
+ return 1;
+ }
+ test_consumer_poll_once(c->rk, c->mv, 1000);
+ }
+ TIMING_STOP(&t_time);
+
+ c->curr_line = 0;
+
+ TEST_SAY("line %d: %s timed out awaiting %s event\n", line,
+ rd_kafka_name(c->rk), rd_kafka_err2name(c->expected_rb_event));
+
+ return 0;
+}
+
+#define static_member_expect_rebalance(C, START, TARGET, TIMEOUT_MS) \
+ do { \
+ if (!static_member_wait_rebalance0(__LINE__, C, START, TARGET, \
+ TIMEOUT_MS)) \
+ TEST_FAIL("%s: timed out waiting for %s event", \
+ rd_kafka_name((C)->rk), \
+ rd_kafka_err2name((C)->expected_rb_event)); \
+ } while (0)
+
+#define static_member_wait_rebalance(C, START, TARGET, TIMEOUT_MS) \
+ static_member_wait_rebalance0(__LINE__, C, START, TARGET, TIMEOUT_MS)
+
+
+static void rebalance_cb(rd_kafka_t *rk,
+ rd_kafka_resp_err_t err,
+ rd_kafka_topic_partition_list_t *parts,
+ void *opaque) {
+ _consumer_t *c = opaque;
+
+ TEST_ASSERT(c->expected_rb_event == err,
+ "line %d: %s: Expected rebalance event %s got %s\n",
+ c->curr_line, rd_kafka_name(rk),
+ rd_kafka_err2name(c->expected_rb_event),
+ rd_kafka_err2name(err));
+
+ switch (err) {
+ case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+ TEST_SAY("line %d: %s Assignment (%d partition(s)):\n",
+ c->curr_line, rd_kafka_name(rk), parts->cnt);
+ test_print_partition_list(parts);
+
+ c->partition_cnt = parts->cnt;
+ c->assigned_at = test_clock();
+ rd_kafka_assign(rk, parts);
+
+ break;
+
+ case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+ c->revoked_at = test_clock();
+ rd_kafka_assign(rk, NULL);
+ TEST_SAY("line %d: %s revoked %d partitions\n", c->curr_line,
+ rd_kafka_name(c->rk), parts->cnt);
+
+ break;
+
+ default:
+ TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err));
+ break;
+ }
+
+ /* Reset error */
+ c->expected_rb_event = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ /* prevent poll from triggering more than one rebalance event */
+ rd_kafka_yield(rk);
+}
+
+
+static void do_test_static_group_rebalance(void) {
+ rd_kafka_conf_t *conf;
+ test_msgver_t mv;
+ int64_t rebalance_start;
+ _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT;
+ const int msgcnt = 100;
+ uint64_t testid = test_id_generate();
+ const char *topic =
+ test_mk_topic_name("0102_static_group_rebalance", 1);
+ char *topics = rd_strdup(tsprintf("^%s.*", topic));
+ test_timing_t t_close;
+
+ SUB_TEST();
+
+ test_conf_init(&conf, NULL, 70);
+ test_msgver_init(&mv, testid);
+ c[0].mv = &mv;
+ c[1].mv = &mv;
+
+ test_create_topic(NULL, topic, 3, 1);
+ test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt);
+
+ test_conf_set(conf, "max.poll.interval.ms", "9000");
+ test_conf_set(conf, "session.timeout.ms", "6000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500");
+ test_conf_set(conf, "metadata.max.age.ms", "5000");
+ test_conf_set(conf, "enable.partition.eof", "true");
+ test_conf_set(conf, "group.instance.id", "consumer1");
+
+ rd_kafka_conf_set_opaque(conf, &c[0]);
+ c[0].rk = test_create_consumer(topic, rebalance_cb,
+ rd_kafka_conf_dup(conf), NULL);
+
+ rd_kafka_conf_set_opaque(conf, &c[1]);
+ test_conf_set(conf, "group.instance.id", "consumer2");
+ c[1].rk = test_create_consumer(topic, rebalance_cb,
+ rd_kafka_conf_dup(conf), NULL);
+ rd_kafka_conf_destroy(conf);
+
+ test_wait_topic_exists(c[1].rk, topic, 5000);
+
+ test_consumer_subscribe(c[0].rk, topics);
+ test_consumer_subscribe(c[1].rk, topics);
+
+ /*
+ * Static members enforce `max.poll.interval.ms` which may prompt
+ * an unwanted rebalance while the other consumer awaits its assignment.
+ * These members remain in the member list however so we must
+ * interleave calls to poll while awaiting our assignment to avoid
+ * unexpected rebalances being triggered.
+ */
+ rebalance_start = test_clock();
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ while (!static_member_wait_rebalance(&c[0], rebalance_start,
+ &c[0].assigned_at, 1000)) {
+ /* keep consumer 2 alive while consumer 1 awaits
+ * its assignment
+ */
+ c[1].curr_line = __LINE__;
+ test_consumer_poll_once(c[1].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, -1);
+
+ /*
+ * Consume all the messages so we can watch for duplicates
+ * after rejoin/rebalance operations.
+ */
+ c[0].curr_line = __LINE__;
+ test_consumer_poll("serve.queue", c[0].rk, testid, c[0].partition_cnt,
+ 0, -1, &mv);
+ c[1].curr_line = __LINE__;
+ test_consumer_poll("serve.queue", c[1].rk, testid, c[1].partition_cnt,
+ 0, -1, &mv);
+
+ test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt);
+
+ TEST_SAY("== Testing consumer restart ==\n");
+ conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk));
+
+ /* Only c[1] should exhibit rebalance behavior */
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ TIMING_START(&t_close, "consumer restart");
+ test_consumer_close(c[1].rk);
+ rd_kafka_destroy(c[1].rk);
+
+ c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+ rd_kafka_poll_set_consumer(c[1].rk);
+
+ test_consumer_subscribe(c[1].rk, topics);
+
+ /* Await assignment */
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ rebalance_start = test_clock();
+ while (!static_member_wait_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, 1000)) {
+ c[0].curr_line = __LINE__;
+ test_consumer_poll_once(c[0].rk, &mv, 0);
+ }
+ TIMING_STOP(&t_close);
+
+ /* Should complete before `session.timeout.ms` */
+ TIMING_ASSERT(&t_close, 0, 6000);
+
+
+ TEST_SAY("== Testing subscription expansion ==\n");
+
+ /*
+ * New topics matching the subscription pattern should cause
+ * group rebalance
+ */
+ test_create_topic(c->rk, tsprintf("%snew", topic), 1, 1);
+
+ /* Await revocation */
+ rebalance_start = test_clock();
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ while (!static_member_wait_rebalance(&c[0], rebalance_start,
+ &c[0].revoked_at, 1000)) {
+ c[1].curr_line = __LINE__;
+ test_consumer_poll_once(c[1].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
+ -1);
+
+ /* Await assignment */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ while (!static_member_wait_rebalance(&c[0], rebalance_start,
+ &c[0].assigned_at, 1000)) {
+ c[1].curr_line = __LINE__;
+ test_consumer_poll_once(c[1].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, -1);
+
+ TEST_SAY("== Testing consumer unsubscribe ==\n");
+
+ /* Unsubscribe should send a LeaveGroupRequest invoking a rebalance */
+
+ /* Send LeaveGroup incrementing generation by 1 */
+ rebalance_start = test_clock();
+ rd_kafka_unsubscribe(c[1].rk);
+
+ /* Await revocation */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
+ -1);
+ static_member_expect_rebalance(&c[0], rebalance_start, &c[0].revoked_at,
+ -1);
+
+ /* New cgrp generation with 1 member, c[0] */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ static_member_expect_rebalance(&c[0], rebalance_start,
+ &c[0].assigned_at, -1);
+
+ /* Send JoinGroup bumping generation by 1 */
+ rebalance_start = test_clock();
+ test_consumer_subscribe(c[1].rk, topics);
+
+ /* End previous single member generation */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ static_member_expect_rebalance(&c[0], rebalance_start, &c[0].revoked_at,
+ -1);
+
+ /* Await assignment */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ while (!static_member_wait_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, 1000)) {
+ c[0].curr_line = __LINE__;
+ test_consumer_poll_once(c[0].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[0], rebalance_start,
+ &c[0].assigned_at, -1);
+
+ TEST_SAY("== Testing max poll violation ==\n");
+ /* max.poll.interval.ms should still be enforced by the consumer */
+
+ /*
+ * Block long enough for consumer 2 to be evicted from the group
+ * `max.poll.interval.ms` + `session.timeout.ms`
+ */
+ rebalance_start = test_clock();
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ c[0].curr_line = __LINE__;
+ test_consumer_poll_no_msgs("wait.max.poll", c[0].rk, testid,
+ 6000 + 9000);
+ c[1].curr_line = __LINE__;
+ test_consumer_poll_expect_err(c[1].rk, testid, 1000,
+ RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED);
+
+ /* Await revocation */
+ while (!static_member_wait_rebalance(&c[0], rebalance_start,
+ &c[0].revoked_at, 1000)) {
+ c[1].curr_line = __LINE__;
+ test_consumer_poll_once(c[1].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
+ -1);
+
+ /* Await assignment */
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ while (!static_member_wait_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, 1000)) {
+ c[0].curr_line = __LINE__;
+ test_consumer_poll_once(c[0].rk, &mv, 0);
+ }
+
+ static_member_expect_rebalance(&c[0], rebalance_start,
+ &c[0].assigned_at, -1);
+
+ TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n");
+
+ rebalance_start = test_clock();
+ c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ TIMING_START(&t_close, "consumer close");
+ test_consumer_close(c[0].rk);
+ rd_kafka_destroy(c[0].rk);
+
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at,
+ 2 * 7000);
+
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
+ static_member_expect_rebalance(&c[1], rebalance_start,
+ &c[1].assigned_at, 2000);
+
+ /* Should take at least as long as `session.timeout.ms` but less than
+ * `max.poll.interval.ms`, but since we can't really know when
+ * the last Heartbeat or SyncGroup request was sent we need to
+ * allow some leeway on the minimum side (4s), and also some on
+ * the maximum side (1s) for slow runtimes. */
+ TIMING_ASSERT(&t_close, 6000 - 4000, 9000 + 1000);
+
+ c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
+ test_consumer_close(c[1].rk);
+ rd_kafka_destroy(c[1].rk);
+
+ test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0, msgcnt);
+ test_msgver_clear(&mv);
+ free(topics);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Await a non-empty assignment for all consumers in \p c
+ */
+static void await_assignment_multi(const char *what, rd_kafka_t **c, int cnt) {
+ rd_kafka_topic_partition_list_t *parts;
+ int assignment_cnt;
+
+ TEST_SAY("%s\n", what);
+
+ do {
+ int i;
+ int timeout_ms = 1000;
+
+ assignment_cnt = 0;
+
+ for (i = 0; i < cnt; i++) {
+ test_consumer_poll_no_msgs("poll", c[i], 0, timeout_ms);
+ timeout_ms = 100;
+
+ if (!rd_kafka_assignment(c[i], &parts) && parts) {
+ TEST_SAY("%s has %d partition(s) assigned\n",
+ rd_kafka_name(c[i]), parts->cnt);
+ if (parts->cnt > 0)
+ assignment_cnt++;
+ rd_kafka_topic_partition_list_destroy(parts);
+ }
+ }
+
+ } while (assignment_cnt < cnt);
+}
+
+
+static const rd_kafka_t *valid_fatal_rk;
+/**
+ * @brief Tells test harness that fatal error should not fail the current test
+ */
+static int
+is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) {
+ return rk != valid_fatal_rk;
+}
+
+/**
+ * @brief Test that consumer fencing raises a fatal error
+ */
+static void do_test_fenced_member(void) {
+ rd_kafka_t *c[3]; /* 0: consumer2b, 1: consumer1, 2: consumer2a */
+ rd_kafka_conf_t *conf;
+ const char *topic =
+ test_mk_topic_name("0102_static_group_rebalance", 1);
+ rd_kafka_message_t *rkm;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+
+ SUB_TEST();
+
+ test_conf_init(&conf, NULL, 30);
+
+ test_create_topic(NULL, topic, 3, 1);
+
+ test_conf_set(conf, "group.instance.id", "consumer1");
+ c[1] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+
+ test_conf_set(conf, "group.instance.id", "consumer2");
+ c[2] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+
+ test_wait_topic_exists(c[2], topic, 5000);
+
+ test_consumer_subscribe(c[1], topic);
+ test_consumer_subscribe(c[2], topic);
+
+ await_assignment_multi("Awaiting initial assignments", &c[1], 2);
+
+ /* Create conflicting consumer */
+ TEST_SAY("Creating conflicting consumer2 instance\n");
+ test_conf_set(conf, "group.instance.id", "consumer2");
+ c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL);
+ rd_kafka_conf_destroy(conf);
+
+ test_curr->is_fatal_cb = is_fatal_cb;
+ valid_fatal_rk = c[2]; /* consumer2a is the consumer that should fail */
+
+ test_consumer_subscribe(c[0], topic);
+
+ /* consumer1 should not be affected (other than a rebalance which
+ * we ignore here)... */
+ test_consumer_poll_no_msgs("consumer1", c[1], 0, 5000);
+
+ /* .. but consumer2a should now have been fenced off by consumer2b */
+ rkm = rd_kafka_consumer_poll(c[2], 5000);
+ TEST_ASSERT(rkm != NULL, "Expected error, not timeout");
+ TEST_ASSERT(rkm->err == RD_KAFKA_RESP_ERR__FATAL,
+ "Expected ERR__FATAL, not %s: %s",
+ rd_kafka_err2str(rkm->err), rd_kafka_message_errstr(rkm));
+ TEST_SAY("Fenced consumer returned expected: %s: %s\n",
+ rd_kafka_err2name(rkm->err), rd_kafka_message_errstr(rkm));
+ rd_kafka_message_destroy(rkm);
+
+
+ /* Read the actual error */
+ err = rd_kafka_fatal_error(c[2], errstr, sizeof(errstr));
+ TEST_SAY("%s fatal error: %s: %s\n", rd_kafka_name(c[2]),
+ rd_kafka_err2name(err), errstr);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID,
+ "Expected ERR_FENCED_INSTANCE_ID as fatal error, not %s",
+ rd_kafka_err2name(err));
+
+ TEST_SAY("close\n");
+ /* Close consumer2a, should also return a fatal error */
+ err = rd_kafka_consumer_close(c[2]);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL,
+ "Expected close on %s to return ERR__FATAL, not %s",
+ rd_kafka_name(c[2]), rd_kafka_err2name(err));
+
+ rd_kafka_destroy(c[2]);
+
+ /* consumer2b and consumer1 should be fine and get their
+ * assignments */
+ await_assignment_multi("Awaiting post-fencing assignment", c, 2);
+
+ rd_kafka_destroy(c[0]);
+ rd_kafka_destroy(c[1]);
+
+ SUB_TEST_PASS();
+}
+
+
+
+int main_0102_static_group_rebalance(int argc, char **argv) {
+
+ do_test_static_group_rebalance();
+
+ do_test_fenced_member();
+
+ return 0;
+}