summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c289
1 files changed, 289 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c
new file mode 100644
index 000000000..02729c339
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/tests/0019-list_groups.c
@@ -0,0 +1,289 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+
+/* Typical include path would be <librdkafka/rdkafka.h>, but this program
+ * is built from within the librdkafka source tree and thus differs. */
+#include "rdkafka.h" /* for Kafka driver */
+
+
+/**
+ * List consumer groups
+ *
+ * Runs two consumers in two different groups and lists them.
+ */
+
+
+
+/**
+ * Verify that all groups in 'groups' are seen, if so returns group_cnt,
+ * else returns -1.
+ */
+static int verify_groups(const struct rd_kafka_group_list *grplist,
+ char **groups,
+ int group_cnt) {
+ int i;
+ int seen = 0;
+
+ for (i = 0; i < grplist->group_cnt; i++) {
+ const struct rd_kafka_group_info *gi = &grplist->groups[i];
+ int j;
+
+ for (j = 0; j < group_cnt; j++) {
+ if (strcmp(gi->group, groups[j]))
+ continue;
+
+ if (gi->err)
+ TEST_SAY(
+ "Group %s has broker-reported "
+ "error: %s\n",
+ gi->group, rd_kafka_err2str(gi->err));
+
+ seen++;
+ }
+ }
+
+ TEST_SAY("Found %d/%d desired groups in list of %d groups\n", seen,
+ group_cnt, grplist->group_cnt);
+
+ if (seen != group_cnt)
+ return -1;
+ else
+ return seen;
+}
+
+
+/**
+ * List groups by:
+ * - List all groups, check that the groups in 'groups' are seen.
+ * - List each group in 'groups', one by one.
+ *
+ * Returns 'group_cnt' if all groups in 'groups' were seen by both
+ * methods, else 0, or -1 on error.
+ */
+static int
+list_groups(rd_kafka_t *rk, char **groups, int group_cnt, const char *desc) {
+ rd_kafka_resp_err_t err = 0;
+ const struct rd_kafka_group_list *grplist;
+ int i, r;
+ int fails = 0;
+ int seen = 0;
+ int seen_all = 0;
+ int retries = 5;
+
+ TEST_SAY("List groups (expect %d): %s\n", group_cnt, desc);
+
+ /* FIXME: Wait for broker to come up. This should really be abstracted
+ * by librdkafka. */
+ do {
+ if (err) {
+ TEST_SAY("Retrying group list in 1s because of: %s\n",
+ rd_kafka_err2str(err));
+ rd_sleep(1);
+ }
+ err = rd_kafka_list_groups(rk, NULL, &grplist,
+ tmout_multip(5000));
+ } while ((err == RD_KAFKA_RESP_ERR__TRANSPORT ||
+ err == RD_KAFKA_RESP_ERR_GROUP_LOAD_IN_PROGRESS) &&
+ retries-- > 0);
+
+ if (err) {
+ TEST_SAY("Failed to list all groups: %s\n",
+ rd_kafka_err2str(err));
+ return -1;
+ }
+
+ seen_all = verify_groups(grplist, groups, group_cnt);
+ rd_kafka_group_list_destroy(grplist);
+
+ for (i = 0; i < group_cnt; i++) {
+ err = rd_kafka_list_groups(rk, groups[i], &grplist, 5000);
+ if (err) {
+ TEST_SAY("Failed to list group %s: %s\n", groups[i],
+ rd_kafka_err2str(err));
+ fails++;
+ continue;
+ }
+
+ r = verify_groups(grplist, &groups[i], 1);
+ if (r == 1)
+ seen++;
+ rd_kafka_group_list_destroy(grplist);
+ }
+
+
+ if (seen_all != seen)
+ return 0;
+
+ return seen;
+}
+
+
+
+static void do_test_list_groups(void) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+#define _CONS_CNT 2
+ char *groups[_CONS_CNT];
+ rd_kafka_t *rk, *rk_c[_CONS_CNT];
+ rd_kafka_topic_partition_list_t *topics;
+ rd_kafka_resp_err_t err;
+ test_timing_t t_grps;
+ int i;
+ int groups_seen;
+ rd_kafka_topic_t *rkt;
+ const struct rd_kafka_group_list *grplist;
+
+ SUB_TEST();
+
+ /* Handle for group listings */
+ rk = test_create_producer();
+
+ /* Produce messages so that topic is auto created */
+ rkt = test_create_topic_object(rk, topic, NULL);
+ test_produce_msgs(rk, rkt, 0, 0, 0, 10, NULL, 64);
+ rd_kafka_topic_destroy(rkt);
+
+ /* Query groups before creation, should not list our groups. */
+ groups_seen = list_groups(rk, NULL, 0, "should be none");
+ if (groups_seen != 0)
+ TEST_FAIL(
+ "Saw %d groups when there wasn't "
+ "supposed to be any\n",
+ groups_seen);
+
+ /* Fill in topic subscription set */
+ topics = rd_kafka_topic_partition_list_new(1);
+ rd_kafka_topic_partition_list_add(topics, topic, -1);
+
+ /* Create consumers and start subscription */
+ for (i = 0; i < _CONS_CNT; i++) {
+ groups[i] = malloc(32);
+ test_str_id_generate(groups[i], 32);
+ rk_c[i] = test_create_consumer(groups[i], NULL, NULL, NULL);
+
+ err = rd_kafka_poll_set_consumer(rk_c[i]);
+ if (err)
+ TEST_FAIL("poll_set_consumer: %s\n",
+ rd_kafka_err2str(err));
+
+ err = rd_kafka_subscribe(rk_c[i], topics);
+ if (err)
+ TEST_FAIL("subscribe: %s\n", rd_kafka_err2str(err));
+ }
+
+ rd_kafka_topic_partition_list_destroy(topics);
+
+
+ TIMING_START(&t_grps, "WAIT.GROUPS");
+ /* Query groups again until both groups are seen. */
+ while (1) {
+ groups_seen = list_groups(rk, (char **)groups, _CONS_CNT,
+ "should see my groups");
+ if (groups_seen == _CONS_CNT)
+ break;
+ rd_sleep(1);
+ }
+ TIMING_STOP(&t_grps);
+
+ /* Try a list_groups with a low enough timeout to fail. */
+ grplist = NULL;
+ TIMING_START(&t_grps, "WAIT.GROUPS.TIMEOUT0");
+ err = rd_kafka_list_groups(rk, NULL, &grplist, 0);
+ TIMING_STOP(&t_grps);
+ TEST_SAY("list_groups(timeout=0) returned %d groups and status: %s\n",
+ grplist ? grplist->group_cnt : -1, rd_kafka_err2str(err));
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected list_groups(timeout=0) to fail "
+ "with timeout, got %s",
+ rd_kafka_err2str(err));
+
+
+ TEST_SAY("Closing remaining consumers\n");
+ for (i = 0; i < _CONS_CNT; i++) {
+ test_timing_t t_close;
+ if (!rk_c[i])
+ continue;
+
+ TEST_SAY("Closing %s\n", rd_kafka_name(rk_c[i]));
+ TIMING_START(&t_close, "CONSUMER.CLOSE");
+ err = rd_kafka_consumer_close(rk_c[i]);
+ TIMING_STOP(&t_close);
+ if (err)
+ TEST_FAIL("consumer_close failed: %s\n",
+ rd_kafka_err2str(err));
+
+ rd_kafka_destroy(rk_c[i]);
+ rk_c[i] = NULL;
+
+ free(groups[i]);
+ }
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief #3705: Verify that list_groups() doesn't hang if unable to
+ * connect to the cluster.
+ */
+static void do_test_list_groups_hang(void) {
+ rd_kafka_conf_t *conf;
+ rd_kafka_t *rk;
+ const struct rd_kafka_group_list *grplist;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+
+ SUB_TEST();
+ test_conf_init(&conf, NULL, 20);
+
+ /* An unavailable broker */
+ test_conf_set(conf, "bootstrap.servers", "127.0.0.1:65531");
+
+ rk = test_create_handle(RD_KAFKA_CONSUMER, conf);
+
+ TIMING_START(&timing, "list_groups");
+ err = rd_kafka_list_groups(rk, NULL, &grplist, 5 * 1000);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Expected ERR__TIMED_OUT, not %s", rd_kafka_err2name(err));
+ TIMING_ASSERT(&timing, 5 * 1000, 7 * 1000);
+
+ rd_kafka_destroy(rk);
+
+ SUB_TEST_PASS();
+}
+
+
+int main_0019_list_groups(int argc, char **argv) {
+ do_test_list_groups();
+ do_test_list_groups_hang();
+ return 0;
+}