From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/0019-list_groups.c | 289 +++++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c new file mode 100644 index 000000000..02729c339 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c @@ -0,0 +1,289 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" + +/* Typical include path would be , but this program + * is built from within the librdkafka source tree and thus differs. */ +#include "rdkafka.h" /* for Kafka driver */ + + +/** + * List consumer groups + * + * Runs two consumers in two different groups and lists them. + */ + + + +/** + * Verify that all groups in 'groups' are seen, if so returns group_cnt, + * else returns -1. + */ +static int verify_groups(const struct rd_kafka_group_list *grplist, + char **groups, + int group_cnt) { + int i; + int seen = 0; + + for (i = 0; i < grplist->group_cnt; i++) { + const struct rd_kafka_group_info *gi = &grplist->groups[i]; + int j; + + for (j = 0; j < group_cnt; j++) { + if (strcmp(gi->group, groups[j])) + continue; + + if (gi->err) + TEST_SAY( + "Group %s has broker-reported " + "error: %s\n", + gi->group, rd_kafka_err2str(gi->err)); + + seen++; + } + } + + TEST_SAY("Found %d/%d desired groups in list of %d groups\n", seen, + group_cnt, grplist->group_cnt); + + if (seen != group_cnt) + return -1; + else + return seen; +} + + +/** + * List groups by: + * - List all groups, check that the groups in 'groups' are seen. + * - List each group in 'groups', one by one. + * + * Returns 'group_cnt' if all groups in 'groups' were seen by both + * methods, else 0, or -1 on error. + */ +static int +list_groups(rd_kafka_t *rk, char **groups, int group_cnt, const char *desc) { + rd_kafka_resp_err_t err = 0; + const struct rd_kafka_group_list *grplist; + int i, r; + int fails = 0; + int seen = 0; + int seen_all = 0; + int retries = 5; + + TEST_SAY("List groups (expect %d): %s\n", group_cnt, desc); + + /* FIXME: Wait for broker to come up. This should really be abstracted + * by librdkafka. */ + do { + if (err) { + TEST_SAY("Retrying group list in 1s because of: %s\n", + rd_kafka_err2str(err)); + rd_sleep(1); + } + err = rd_kafka_list_groups(rk, NULL, &grplist, + tmout_multip(5000)); + } while ((err == RD_KAFKA_RESP_ERR__TRANSPORT || + err == RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS) && + retries-- > 0); + + if (err) { + TEST_SAY("Failed to list all groups: %s\n", + rd_kafka_err2str(err)); + return -1; + } + + seen_all = verify_groups(grplist, groups, group_cnt); + rd_kafka_group_list_destroy(grplist); + + for (i = 0; i < group_cnt; i++) { + err = rd_kafka_list_groups(rk, groups[i], &grplist, 5000); + if (err) { + TEST_SAY("Failed to list group %s: %s\n", groups[i], + rd_kafka_err2str(err)); + fails++; + continue; + } + + r = verify_groups(grplist, &groups[i], 1); + if (r == 1) + seen++; + rd_kafka_group_list_destroy(grplist); + } + + + if (seen_all != seen) + return 0; + + return seen; +} + + + +static void do_test_list_groups(void) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); +#define _CONS_CNT 2 + char *groups[_CONS_CNT]; + rd_kafka_t *rk, *rk_c[_CONS_CNT]; + rd_kafka_topic_partition_list_t *topics; + rd_kafka_resp_err_t err; + test_timing_t t_grps; + int i; + int groups_seen; + rd_kafka_topic_t *rkt; + const struct rd_kafka_group_list *grplist; + + SUB_TEST(); + + /* Handle for group listings */ + rk = test_create_producer(); + + /* Produce messages so that topic is auto created */ + rkt = test_create_topic_object(rk, topic, NULL); + test_produce_msgs(rk, rkt, 0, 0, 0, 10, NULL, 64); + rd_kafka_topic_destroy(rkt); + + /* Query groups before creation, should not list our groups. */ + groups_seen = list_groups(rk, NULL, 0, "should be none"); + if (groups_seen != 0) + TEST_FAIL( + "Saw %d groups when there wasn't " + "supposed to be any\n", + groups_seen); + + /* Fill in topic subscription set */ + topics = rd_kafka_topic_partition_list_new(1); + rd_kafka_topic_partition_list_add(topics, topic, -1); + + /* Create consumers and start subscription */ + for (i = 0; i < _CONS_CNT; i++) { + groups[i] = malloc(32); + test_str_id_generate(groups[i], 32); + rk_c[i] = test_create_consumer(groups[i], NULL, NULL, NULL); + + err = rd_kafka_poll_set_consumer(rk_c[i]); + if (err) + TEST_FAIL("poll_set_consumer: %s\n", + rd_kafka_err2str(err)); + + err = rd_kafka_subscribe(rk_c[i], topics); + if (err) + TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err)); + } + + rd_kafka_topic_partition_list_destroy(topics); + + + TIMING_START(&t_grps, "WAIT.GROUPS"); + /* Query groups again until both groups are seen. */ + while (1) { + groups_seen = list_groups(rk, (char **)groups, _CONS_CNT, + "should see my groups"); + if (groups_seen == _CONS_CNT) + break; + rd_sleep(1); + } + TIMING_STOP(&t_grps); + + /* Try a list_groups with a low enough timeout to fail. */ + grplist = NULL; + TIMING_START(&t_grps, "WAIT.GROUPS.TIMEOUT0"); + err = rd_kafka_list_groups(rk, NULL, &grplist, 0); + TIMING_STOP(&t_grps); + TEST_SAY("list_groups(timeout=0) returned %d groups and status: %s\n", + grplist ? grplist->group_cnt : -1, rd_kafka_err2str(err)); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, + "expected list_groups(timeout=0) to fail " + "with timeout, got %s", + rd_kafka_err2str(err)); + + + TEST_SAY("Closing remaining consumers\n"); + for (i = 0; i < _CONS_CNT; i++) { + test_timing_t t_close; + if (!rk_c[i]) + continue; + + TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i])); + TIMING_START(&t_close, "CONSUMER.CLOSE"); + err = rd_kafka_consumer_close(rk_c[i]); + TIMING_STOP(&t_close); + if (err) + TEST_FAIL("consumer_close failed: %s\n", + rd_kafka_err2str(err)); + + rd_kafka_destroy(rk_c[i]); + rk_c[i] = NULL; + + free(groups[i]); + } + + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + + + +/** + * @brief #3705: Verify that list_groups() doesn't hang if unable to + * connect to the cluster. + */ +static void do_test_list_groups_hang(void) { + rd_kafka_conf_t *conf; + rd_kafka_t *rk; + const struct rd_kafka_group_list *grplist; + rd_kafka_resp_err_t err; + test_timing_t timing; + + SUB_TEST(); + test_conf_init(&conf, NULL, 20); + + /* An unavailable broker */ + test_conf_set(conf, "bootstrap.servers", "127.0.0.1:65531"); + + rk = test_create_handle(RD_KAFKA_CONSUMER, conf); + + TIMING_START(&timing, "list_groups"); + err = rd_kafka_list_groups(rk, NULL, &grplist, 5 * 1000); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, + "Expected ERR__TIMED_OUT, not %s", rd_kafka_err2name(err)); + TIMING_ASSERT(&timing, 5 * 1000, 7 * 1000); + + rd_kafka_destroy(rk); + + SUB_TEST_PASS(); +} + + +int main_0019_list_groups(int argc, char **argv) { + do_test_list_groups(); + do_test_list_groups_hang(); + return 0; +} -- cgit v1.2.3