diff options
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c')
-rw-r--r-- | src/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c | 535 |
1 files changed, 535 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c new file mode 100644 index 000000000..231a09065 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0102-static_group_rebalance.c @@ -0,0 +1,535 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2019, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + + +/** + * @name KafkaConsumer static membership tests + * + * Runs two consumers subscribing to multiple topics simulating various + * rebalance scenarios with static group membership enabled. + */ + +#define _CONSUMER_CNT 2 + +typedef struct _consumer_s { + rd_kafka_t *rk; + test_msgver_t *mv; + int64_t assigned_at; + int64_t revoked_at; + int partition_cnt; + rd_kafka_resp_err_t expected_rb_event; + int curr_line; +} _consumer_t; + + +/** + * @brief Call poll until a rebalance has been triggered + */ +static int static_member_wait_rebalance0(int line, + _consumer_t *c, + int64_t start, + int64_t *target, + int timeout_ms) { + int64_t tmout = test_clock() + (timeout_ms * 1000); + test_timing_t t_time; + + c->curr_line = line; + + TEST_SAY("line %d: %s awaiting %s event\n", line, rd_kafka_name(c->rk), + rd_kafka_err2name(c->expected_rb_event)); + + TIMING_START(&t_time, "wait_rebalance"); + while (timeout_ms < 0 ? 1 : test_clock() <= tmout) { + if (*target > start) { + c->curr_line = 0; + return 1; + } + test_consumer_poll_once(c->rk, c->mv, 1000); + } + TIMING_STOP(&t_time); + + c->curr_line = 0; + + TEST_SAY("line %d: %s timed out awaiting %s event\n", line, + rd_kafka_name(c->rk), rd_kafka_err2name(c->expected_rb_event)); + + return 0; +} + +#define static_member_expect_rebalance(C, START, TARGET, TIMEOUT_MS) \ + do { \ + if (!static_member_wait_rebalance0(__LINE__, C, START, TARGET, \ + TIMEOUT_MS)) \ + TEST_FAIL("%s: timed out waiting for %s event", \ + rd_kafka_name((C)->rk), \ + rd_kafka_err2name((C)->expected_rb_event)); \ + } while (0) + +#define static_member_wait_rebalance(C, START, TARGET, TIMEOUT_MS) \ + static_member_wait_rebalance0(__LINE__, C, START, TARGET, TIMEOUT_MS) + + +static void rebalance_cb(rd_kafka_t *rk, + rd_kafka_resp_err_t err, + rd_kafka_topic_partition_list_t *parts, + void *opaque) { + _consumer_t *c = opaque; + + TEST_ASSERT(c->expected_rb_event == err, + "line %d: %s: Expected rebalance event %s got %s\n", + c->curr_line, rd_kafka_name(rk), + rd_kafka_err2name(c->expected_rb_event), + rd_kafka_err2name(err)); + + switch (err) { + case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: + TEST_SAY("line %d: %s Assignment (%d partition(s)):\n", + c->curr_line, rd_kafka_name(rk), parts->cnt); + test_print_partition_list(parts); + + c->partition_cnt = parts->cnt; + c->assigned_at = test_clock(); + rd_kafka_assign(rk, parts); + + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + c->revoked_at = test_clock(); + rd_kafka_assign(rk, NULL); + TEST_SAY("line %d: %s revoked %d partitions\n", c->curr_line, + rd_kafka_name(c->rk), parts->cnt); + + break; + + default: + TEST_FAIL("rebalance failed: %s", rd_kafka_err2str(err)); + break; + } + + /* Reset error */ + c->expected_rb_event = RD_KAFKA_RESP_ERR_NO_ERROR; + + /* prevent poll from triggering more than one rebalance event */ + rd_kafka_yield(rk); +} + + +static void do_test_static_group_rebalance(void) { + rd_kafka_conf_t *conf; + test_msgver_t mv; + int64_t rebalance_start; + _consumer_t c[_CONSUMER_CNT] = RD_ZERO_INIT; + const int msgcnt = 100; + uint64_t testid = test_id_generate(); + const char *topic = + test_mk_topic_name("0102_static_group_rebalance", 1); + char *topics = rd_strdup(tsprintf("^%s.*", topic)); + test_timing_t t_close; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 70); + test_msgver_init(&mv, testid); + c[0].mv = &mv; + c[1].mv = &mv; + + test_create_topic(NULL, topic, 3, 1); + test_produce_msgs_easy(topic, testid, RD_KAFKA_PARTITION_UA, msgcnt); + + test_conf_set(conf, "max.poll.interval.ms", "9000"); + test_conf_set(conf, "session.timeout.ms", "6000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "topic.metadata.refresh.interval.ms", "500"); + test_conf_set(conf, "metadata.max.age.ms", "5000"); + test_conf_set(conf, "enable.partition.eof", "true"); + test_conf_set(conf, "group.instance.id", "consumer1"); + + rd_kafka_conf_set_opaque(conf, &c[0]); + c[0].rk = test_create_consumer(topic, rebalance_cb, + rd_kafka_conf_dup(conf), NULL); + + rd_kafka_conf_set_opaque(conf, &c[1]); + test_conf_set(conf, "group.instance.id", "consumer2"); + c[1].rk = test_create_consumer(topic, rebalance_cb, + rd_kafka_conf_dup(conf), NULL); + rd_kafka_conf_destroy(conf); + + test_wait_topic_exists(c[1].rk, topic, 5000); + + test_consumer_subscribe(c[0].rk, topics); + test_consumer_subscribe(c[1].rk, topics); + + /* + * Static members enforce `max.poll.interval.ms` which may prompt + * an unwanted rebalance while the other consumer awaits its assignment. + * These members remain in the member list however so we must + * interleave calls to poll while awaiting our assignment to avoid + * unexpected rebalances being triggered. + */ + rebalance_start = test_clock(); + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, 1000)) { + /* keep consumer 2 alive while consumer 1 awaits + * its assignment + */ + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + /* + * Consume all the messages so we can watch for duplicates + * after rejoin/rebalance operations. + */ + c[0].curr_line = __LINE__; + test_consumer_poll("serve.queue", c[0].rk, testid, c[0].partition_cnt, + 0, -1, &mv); + c[1].curr_line = __LINE__; + test_consumer_poll("serve.queue", c[1].rk, testid, c[1].partition_cnt, + 0, -1, &mv); + + test_msgver_verify("first.verify", &mv, TEST_MSGVER_ALL, 0, msgcnt); + + TEST_SAY("== Testing consumer restart ==\n"); + conf = rd_kafka_conf_dup(rd_kafka_conf(c[1].rk)); + + /* Only c[1] should exhibit rebalance behavior */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + TIMING_START(&t_close, "consumer restart"); + test_consumer_close(c[1].rk); + rd_kafka_destroy(c[1].rk); + + c[1].rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + rd_kafka_poll_set_consumer(c[1].rk); + + test_consumer_subscribe(c[1].rk, topics); + + /* Await assignment */ + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + rebalance_start = test_clock(); + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + } + TIMING_STOP(&t_close); + + /* Should complete before `session.timeout.ms` */ + TIMING_ASSERT(&t_close, 0, 6000); + + + TEST_SAY("== Testing subscription expansion ==\n"); + + /* + * New topics matching the subscription pattern should cause + * group rebalance + */ + test_create_topic(c->rk, tsprintf("%snew", topic), 1, 1); + + /* Await revocation */ + rebalance_start = test_clock(); + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].revoked_at, 1000)) { + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at, + -1); + + /* Await assignment */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, 1000)) { + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, -1); + + TEST_SAY("== Testing consumer unsubscribe ==\n"); + + /* Unsubscribe should send a LeaveGroupRequest invoking a rebalance */ + + /* Send LeaveGroup incrementing generation by 1 */ + rebalance_start = test_clock(); + rd_kafka_unsubscribe(c[1].rk); + + /* Await revocation */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at, + -1); + static_member_expect_rebalance(&c[0], rebalance_start, &c[0].revoked_at, + -1); + + /* New cgrp generation with 1 member, c[0] */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + static_member_expect_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, -1); + + /* Send JoinGroup bumping generation by 1 */ + rebalance_start = test_clock(); + test_consumer_subscribe(c[1].rk, topics); + + /* End previous single member generation */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + static_member_expect_rebalance(&c[0], rebalance_start, &c[0].revoked_at, + -1); + + /* Await assignment */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, -1); + + TEST_SAY("== Testing max poll violation ==\n"); + /* max.poll.interval.ms should still be enforced by the consumer */ + + /* + * Block long enough for consumer 2 to be evicted from the group + * `max.poll.interval.ms` + `session.timeout.ms` + */ + rebalance_start = test_clock(); + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + c[0].curr_line = __LINE__; + test_consumer_poll_no_msgs("wait.max.poll", c[0].rk, testid, + 6000 + 9000); + c[1].curr_line = __LINE__; + test_consumer_poll_expect_err(c[1].rk, testid, 1000, + RD_KAFKA_RESP_ERR__MAX_POLL_EXCEEDED); + + /* Await revocation */ + while (!static_member_wait_rebalance(&c[0], rebalance_start, + &c[0].revoked_at, 1000)) { + c[1].curr_line = __LINE__; + test_consumer_poll_once(c[1].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at, + -1); + + /* Await assignment */ + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + while (!static_member_wait_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 1000)) { + c[0].curr_line = __LINE__; + test_consumer_poll_once(c[0].rk, &mv, 0); + } + + static_member_expect_rebalance(&c[0], rebalance_start, + &c[0].assigned_at, -1); + + TEST_SAY("== Testing `session.timeout.ms` member eviction ==\n"); + + rebalance_start = test_clock(); + c[0].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + TIMING_START(&t_close, "consumer close"); + test_consumer_close(c[0].rk); + rd_kafka_destroy(c[0].rk); + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, &c[1].revoked_at, + 2 * 7000); + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS; + static_member_expect_rebalance(&c[1], rebalance_start, + &c[1].assigned_at, 2000); + + /* Should take at least as long as `session.timeout.ms` but less than + * `max.poll.interval.ms`, but since we can't really know when + * the last Heartbeat or SyncGroup request was sent we need to + * allow some leeway on the minimum side (4s), and also some on + * the maximum side (1s) for slow runtimes. */ + TIMING_ASSERT(&t_close, 6000 - 4000, 9000 + 1000); + + c[1].expected_rb_event = RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS; + test_consumer_close(c[1].rk); + rd_kafka_destroy(c[1].rk); + + test_msgver_verify("final.validation", &mv, TEST_MSGVER_ALL, 0, msgcnt); + test_msgver_clear(&mv); + free(topics); + + SUB_TEST_PASS(); +} + + +/** + * @brief Await a non-empty assignment for all consumers in \p c + */ +static void await_assignment_multi(const char *what, rd_kafka_t **c, int cnt) { + rd_kafka_topic_partition_list_t *parts; + int assignment_cnt; + + TEST_SAY("%s\n", what); + + do { + int i; + int timeout_ms = 1000; + + assignment_cnt = 0; + + for (i = 0; i < cnt; i++) { + test_consumer_poll_no_msgs("poll", c[i], 0, timeout_ms); + timeout_ms = 100; + + if (!rd_kafka_assignment(c[i], &parts) && parts) { + TEST_SAY("%s has %d partition(s) assigned\n", + rd_kafka_name(c[i]), parts->cnt); + if (parts->cnt > 0) + assignment_cnt++; + rd_kafka_topic_partition_list_destroy(parts); + } + } + + } while (assignment_cnt < cnt); +} + + +static const rd_kafka_t *valid_fatal_rk; +/** + * @brief Tells test harness that fatal error should not fail the current test + */ +static int +is_fatal_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, const char *reason) { + return rk != valid_fatal_rk; +} + +/** + * @brief Test that consumer fencing raises a fatal error + */ +static void do_test_fenced_member(void) { + rd_kafka_t *c[3]; /* 0: consumer2b, 1: consumer1, 2: consumer2a */ + rd_kafka_conf_t *conf; + const char *topic = + test_mk_topic_name("0102_static_group_rebalance", 1); + rd_kafka_message_t *rkm; + char errstr[512]; + rd_kafka_resp_err_t err; + + SUB_TEST(); + + test_conf_init(&conf, NULL, 30); + + test_create_topic(NULL, topic, 3, 1); + + test_conf_set(conf, "group.instance.id", "consumer1"); + c[1] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + + test_conf_set(conf, "group.instance.id", "consumer2"); + c[2] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + + test_wait_topic_exists(c[2], topic, 5000); + + test_consumer_subscribe(c[1], topic); + test_consumer_subscribe(c[2], topic); + + await_assignment_multi("Awaiting initial assignments", &c[1], 2); + + /* Create conflicting consumer */ + TEST_SAY("Creating conflicting consumer2 instance\n"); + test_conf_set(conf, "group.instance.id", "consumer2"); + c[0] = test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + rd_kafka_conf_destroy(conf); + + test_curr->is_fatal_cb = is_fatal_cb; + valid_fatal_rk = c[2]; /* consumer2a is the consumer that should fail */ + + test_consumer_subscribe(c[0], topic); + + /* consumer1 should not be affected (other than a rebalance which + * we ignore here)... */ + test_consumer_poll_no_msgs("consumer1", c[1], 0, 5000); + + /* .. but consumer2a should now have been fenced off by consumer2b */ + rkm = rd_kafka_consumer_poll(c[2], 5000); + TEST_ASSERT(rkm != NULL, "Expected error, not timeout"); + TEST_ASSERT(rkm->err == RD_KAFKA_RESP_ERR__FATAL, + "Expected ERR__FATAL, not %s: %s", + rd_kafka_err2str(rkm->err), rd_kafka_message_errstr(rkm)); + TEST_SAY("Fenced consumer returned expected: %s: %s\n", + rd_kafka_err2name(rkm->err), rd_kafka_message_errstr(rkm)); + rd_kafka_message_destroy(rkm); + + + /* Read the actual error */ + err = rd_kafka_fatal_error(c[2], errstr, sizeof(errstr)); + TEST_SAY("%s fatal error: %s: %s\n", rd_kafka_name(c[2]), + rd_kafka_err2name(err), errstr); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR_FENCED_INSTANCE_ID, + "Expected ERR_FENCED_INSTANCE_ID as fatal error, not %s", + rd_kafka_err2name(err)); + + TEST_SAY("close\n"); + /* Close consumer2a, should also return a fatal error */ + err = rd_kafka_consumer_close(c[2]); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__FATAL, + "Expected close on %s to return ERR__FATAL, not %s", + rd_kafka_name(c[2]), rd_kafka_err2name(err)); + + rd_kafka_destroy(c[2]); + + /* consumer2b and consumer1 should be fine and get their + * assignments */ + await_assignment_multi("Awaiting post-fencing assignment", c, 2); + + rd_kafka_destroy(c[0]); + rd_kafka_destroy(c[1]); + + SUB_TEST_PASS(); +} + + + +int main_0102_static_group_rebalance(int argc, char **argv) { + + do_test_static_group_rebalance(); + + do_test_fenced_member(); + + return 0; +} |