author    Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 11:19:16 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>    2024-05-05 12:07:37 +0000
commit    b485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
tree      ae9abe108601079d1679194de237c9a435ae5b55    /src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c
parent    Adding upstream version 1.44.3. (diff)
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c')
-rw-r--r--    src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c    3797
1 file changed, 3797 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c
new file mode 100644
index 000000000..7da2dff15
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c
@@ -0,0 +1,3797 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+#include "../src/rdstring.h"
+
+/**
+ * @brief Admin API integration tests.
+ */
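+
+/*
+ * All sub-tests below follow the same event-based flow: build an array of
+ * request objects, fire off the Admin API call with a result queue, poll
+ * that queue until the matching *_RESULT event arrives, then verify the
+ * per-item results. A minimal sketch of that flow, assuming an
+ * already-configured rd_kafka_t *rk (the topic name is illustrative):
+ *
+ * rd_kafka_queue_t *q = rd_kafka_queue_new(rk);
+ * rd_kafka_NewTopic_t *nt =
+ * rd_kafka_NewTopic_new("mytopic", 3, 1, NULL, 0);
+ * rd_kafka_CreateTopics(rk, &nt, 1, NULL, q);
+ * rd_kafka_event_t *ev = rd_kafka_queue_poll(q, 10000);
+ * if (rd_kafka_event_type(ev) == RD_KAFKA_EVENT_CREATETOPICS_RESULT)
+ * ; /+ inspect rd_kafka_event_CreateTopics_result(ev) here +/
+ * rd_kafka_event_destroy(ev);
+ * rd_kafka_NewTopic_destroy(nt);
+ * rd_kafka_queue_destroy(q);
+ */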
+
+
+static int32_t *avail_brokers;
+static size_t avail_broker_cnt;
+
+
+
+static void do_test_CreateTopics(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int op_timeout,
+ rd_bool_t validate_only) {
+ rd_kafka_queue_t *q;
+#define MY_NEW_TOPICS_CNT 7
+ char *topics[MY_NEW_TOPICS_CNT];
+ rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_resp_err_t exp_topicerr[MY_NEW_TOPICS_CNT] = {0};
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ /* Expected topics in metadata */
+ rd_kafka_metadata_topic_t exp_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ /* Not expected topics in metadata */
+ rd_kafka_metadata_topic_t exp_not_mdtopics[MY_NEW_TOPICS_CNT] = {{0}};
+ int exp_not_mdtopic_cnt = 0;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_CreateTopics_result_t *res;
+ const rd_kafka_topic_result_t **restopics;
+ size_t restopic_cnt;
+ int metadata_tmout;
+ int num_replicas = (int)avail_broker_cnt;
+ int32_t *replicas;
+
+ SUB_TEST_QUICK(
+ "%s CreateTopics with %s, "
+ "op_timeout %d, validate_only %d",
+ rd_kafka_name(rk), what, op_timeout, validate_only);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /* Set up replicas */
+ replicas = rd_alloca(sizeof(*replicas) * num_replicas);
+ for (i = 0; i < num_replicas; i++)
+ replicas[i] = avail_brokers[i];
+
+ /**
+ * Construct NewTopic array with different properties for
+ * different topics.
+ */
+ for (i = 0; i < MY_NEW_TOPICS_CNT; i++) {
+ char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ int use_defaults =
+ i == 6 && test_broker_version >= TEST_BRKVER(2, 4, 0, 0);
+ int num_parts = !use_defaults ? (i * 7 + 1) : -1;
+ int set_config = (i & 1);
+ int add_invalid_config = (i == 1);
+ int set_replicas = !use_defaults && !(i % 3);
+ rd_kafka_resp_err_t this_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ topics[i] = topic;
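+ /* A num_parts of -1 requests the broker-default partition count
+ * (brokers >= 2.4). The replication factor is passed as -1 whenever
+ * an explicit per-partition replica assignment is set below, which
+ * librdkafka requires when using set_replica_assignment(). */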
+ new_topics[i] = rd_kafka_NewTopic_new(
+ topic, num_parts, set_replicas ? -1 : num_replicas, NULL,
+ 0);
+
+ if (set_config) {
+ /*
+ * Add various configuration properties
+ */
+ err = rd_kafka_NewTopic_set_config(
+ new_topics[i], "compression.type", "lz4");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ err = rd_kafka_NewTopic_set_config(
+ new_topics[i], "delete.retention.ms", "900");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+ if (add_invalid_config) {
+ /* Add invalid config property */
+ err = rd_kafka_NewTopic_set_config(
+ new_topics[i], "dummy.doesntexist",
+ "broker is verifying this");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ this_exp_err = RD_KAFKA_RESP_ERR_INVALID_CONFIG;
+ }
+
+ TEST_SAY(
+ "Expecting result for topic #%d: %s "
+ "(set_config=%d, add_invalid_config=%d, "
+ "set_replicas=%d, use_defaults=%d)\n",
+ i, rd_kafka_err2name(this_exp_err), set_config,
+ add_invalid_config, set_replicas, use_defaults);
+
+ if (set_replicas) {
+ int32_t p;
+
+ /*
+ * Set valid replica assignments
+ */
+ for (p = 0; p < num_parts; p++) {
+ err = rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], p, replicas, num_replicas,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+ }
+ }
+
+ if (this_exp_err || validate_only) {
+ exp_topicerr[i] = this_exp_err;
+ exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;
+
+ } else {
+ exp_mdtopics[exp_mdtopic_cnt].topic = topic;
+ exp_mdtopics[exp_mdtopic_cnt].partition_cnt = num_parts;
+ exp_mdtopic_cnt++;
+ }
+ }
+
+ if (op_timeout != -1 || validate_only) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_CREATETOPICS);
+
+ if (op_timeout != -1) {
+ err = rd_kafka_AdminOptions_set_operation_timeout(
+ options, op_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+ if (validate_only) {
+ err = rd_kafka_AdminOptions_set_validate_only(
+ options, validate_only, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+ }
+
+ TIMING_START(&timing, "CreateTopics");
+ TEST_SAY("Call CreateTopics\n");
+ rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /* Poll result queue for CreateTopics result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ TIMING_START(&timing, "CreateTopics.queue_poll");
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("CreateTopics: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_CREATETOPICS_RESULT)
+ break;
+
+ /* Destroy non-result events so they are not leaked. */
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_CreateTopics_result(rkev);
+ TEST_ASSERT(res, "expected CreateTopics_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected CreateTopics to return %s, not %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("CreateTopics: returned %s (%s)\n", rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ /* Extract topics */
+ restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
+
+
+ /* Scan topics for proper fields and expected failures. */
+ for (i = 0; i < (int)restopic_cnt; i++) {
+ const rd_kafka_topic_result_t *terr = restopics[i];
+
+ /* Verify that topic order matches our request. */
+ if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
+ TEST_FAIL_LATER(
+ "Topic result order mismatch at #%d: "
+ "expected %s, got %s",
+ i, topics[i], rd_kafka_topic_result_name(terr));
+
+ TEST_SAY("CreateTopics result: #%d: %s: %s: %s\n", i,
+ rd_kafka_topic_result_name(terr),
+ rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
+ rd_kafka_topic_result_error_string(terr));
+ if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
+ TEST_FAIL_LATER("Expected %s, not %d: %s",
+ rd_kafka_err2name(exp_topicerr[i]),
+ rd_kafka_topic_result_error(terr),
+ rd_kafka_err2name(
+ rd_kafka_topic_result_error(terr)));
+ }
+
+ /**
+ * Verify that the expected topics are created and the non-expected
+ * are not. Allow it some time to propagate.
+ */
+ if (validate_only) {
+ /* No topics should have been created, give it some time
+ * before checking. */
+ rd_sleep(2);
+ metadata_tmout = 5 * 1000;
+ } else {
+ if (op_timeout > 0)
+ metadata_tmout = op_timeout + 1000;
+ else
+ metadata_tmout = 10 * 1000;
+ }
+
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt,
+ exp_not_mdtopics, exp_not_mdtopic_cnt,
+ metadata_tmout);
+
+ rd_kafka_event_destroy(rkev);
+
+ for (i = 0; i < MY_NEW_TOPICS_CNT; i++) {
+ rd_kafka_NewTopic_destroy(new_topics[i]);
+ rd_free(topics[i]);
+ }
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_NEW_TOPICS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Test deletion of topics
+ *
+ *
+ */
+static void do_test_DeleteTopics(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int op_timeout) {
+ rd_kafka_queue_t *q;
+ const int skip_topic_cnt = 2;
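+ /* The last skip_topic_cnt topics are never created, so deleting
+ * them is expected to fail with UNKNOWN_TOPIC_OR_PART. */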
+#define MY_DEL_TOPICS_CNT 9
+ char *topics[MY_DEL_TOPICS_CNT];
+ rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_resp_err_t exp_topicerr[MY_DEL_TOPICS_CNT] = {0};
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ /* Expected topics in metadata */
+ rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ /* Not expected topics in metadata */
+ rd_kafka_metadata_topic_t exp_not_mdtopics[MY_DEL_TOPICS_CNT] = {{0}};
+ int exp_not_mdtopic_cnt = 0;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DeleteTopics_result_t *res;
+ const rd_kafka_topic_result_t **restopics;
+ size_t restopic_cnt;
+ int metadata_tmout;
+
+ SUB_TEST_QUICK("%s DeleteTopics with %s, op_timeout %d",
+ rd_kafka_name(rk), what, op_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /**
+ * Construct DeleteTopic array
+ */
+ for (i = 0; i < MY_DEL_TOPICS_CNT; i++) {
+ char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ int notexist_topic = i >= MY_DEL_TOPICS_CNT - skip_topic_cnt;
+
+ topics[i] = topic;
+
+ del_topics[i] = rd_kafka_DeleteTopic_new(topic);
+
+ if (notexist_topic)
+ exp_topicerr[i] =
+ RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+ else {
+ exp_topicerr[i] = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
+ }
+
+ exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic;
+ }
+
+ if (op_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ err = rd_kafka_AdminOptions_set_operation_timeout(
+ options, op_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ /* Create the topics first, minus the skip count. */
+ test_CreateTopics_simple(rk, NULL, topics,
+ MY_DEL_TOPICS_CNT - skip_topic_cnt,
+ 2 /*num_partitions*/, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0,
+ 15 * 1000);
+
+ TIMING_START(&timing, "DeleteTopics");
+ TEST_SAY("Call DeleteTopics\n");
+ rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /* Poll result queue for DeleteTopics result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ TIMING_START(&timing, "DeleteTopics.queue_poll");
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("DeleteTopics: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_DELETETOPICS_RESULT)
+ break;
+
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteTopics_result(rkev);
+ TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected DeleteTopics to return %s, not %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("DeleteTopics: returned %s (%s)\n", rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ /* Extract topics */
+ restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
+
+
+ /* Scan topics for proper fields and expected failures. */
+ for (i = 0; i < (int)restopic_cnt; i++) {
+ const rd_kafka_topic_result_t *terr = restopics[i];
+
+ /* Verify that topic order matches our request. */
+ if (strcmp(rd_kafka_topic_result_name(terr), topics[i]))
+ TEST_FAIL_LATER(
+ "Topic result order mismatch at #%d: "
+ "expected %s, got %s",
+ i, topics[i], rd_kafka_topic_result_name(terr));
+
+ TEST_SAY("DeleteTopics result: #%d: %s: %s: %s\n", i,
+ rd_kafka_topic_result_name(terr),
+ rd_kafka_err2name(rd_kafka_topic_result_error(terr)),
+ rd_kafka_topic_result_error_string(terr));
+ if (rd_kafka_topic_result_error(terr) != exp_topicerr[i])
+ TEST_FAIL_LATER("Expected %s, not %d: %s",
+ rd_kafka_err2name(exp_topicerr[i]),
+ rd_kafka_topic_result_error(terr),
+ rd_kafka_err2name(
+ rd_kafka_topic_result_error(terr)));
+ }
+
+ /**
+ * Verify that the expected topics are deleted and the non-expected
+ * are not. Allow it some time to propagate.
+ */
+ if (op_timeout > 0)
+ metadata_tmout = op_timeout + 1000;
+ else
+ metadata_tmout = 10 * 1000;
+
+ test_wait_metadata_update(rk, NULL, 0, exp_not_mdtopics,
+ exp_not_mdtopic_cnt, metadata_tmout);
+
+ rd_kafka_event_destroy(rkev);
+
+ for (i = 0; i < MY_DEL_TOPICS_CNT; i++) {
+ rd_kafka_DeleteTopic_destroy(del_topics[i]);
+ rd_free(topics[i]);
+ }
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_DEL_TOPICS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Test creation of partitions
+ *
+ *
+ */
+static void do_test_CreatePartitions(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int op_timeout) {
+ rd_kafka_queue_t *q;
+#define MY_CRP_TOPICS_CNT 9
+ char *topics[MY_CRP_TOPICS_CNT];
+ rd_kafka_NewTopic_t *new_topics[MY_CRP_TOPICS_CNT];
+ rd_kafka_NewPartitions_t *crp_topics[MY_CRP_TOPICS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ /* Expected topics in metadata */
+ rd_kafka_metadata_topic_t exp_mdtopics[MY_CRP_TOPICS_CNT] = {{0}};
+ rd_kafka_metadata_partition_t exp_mdparts[2] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ int i;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ int metadata_tmout;
+ int num_replicas = (int)avail_broker_cnt;
+
+ SUB_TEST_QUICK("%s CreatePartitions with %s, op_timeout %d",
+ rd_kafka_name(rk), what, op_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /* Set up two expected partitions with different replication sets
+ * so they can be matched by the metadata checker later.
+ * Even partitions use exp_mdparts[0] while odd partitions
+ * use exp_mdparts[1]. */
+
+ /* Set valid replica assignments (even, and odd (reverse) ) */
+ exp_mdparts[0].replicas =
+ rd_alloca(sizeof(*exp_mdparts[0].replicas) * num_replicas);
+ exp_mdparts[1].replicas =
+ rd_alloca(sizeof(*exp_mdparts[1].replicas) * num_replicas);
+ exp_mdparts[0].replica_cnt = num_replicas;
+ exp_mdparts[1].replica_cnt = num_replicas;
+ for (i = 0; i < num_replicas; i++) {
+ exp_mdparts[0].replicas[i] = avail_brokers[i];
+ exp_mdparts[1].replicas[i] =
+ avail_brokers[num_replicas - i - 1];
+ }
+
+ /**
+ * Construct CreatePartitions array
+ */
+ for (i = 0; i < MY_CRP_TOPICS_CNT; i++) {
+ char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ int initial_part_cnt = 1 + (i * 2);
+ int new_part_cnt = 1 + (i / 2);
+ int final_part_cnt = initial_part_cnt + new_part_cnt;
+ int set_replicas = !(i % 2);
+ int pi;
+
+ topics[i] = topic;
+
+ /* Topic to create with initial partition count */
+ new_topics[i] = rd_kafka_NewTopic_new(
+ topic, initial_part_cnt, set_replicas ? -1 : num_replicas,
+ NULL, 0);
+
+ /* .. and later add more partitions to it */
+ crp_topics[i] = rd_kafka_NewPartitions_new(
+ topic, final_part_cnt, errstr, sizeof(errstr));
+
+ if (set_replicas) {
+ exp_mdtopics[exp_mdtopic_cnt].partitions = rd_alloca(
+ final_part_cnt *
+ sizeof(*exp_mdtopics[exp_mdtopic_cnt].partitions));
+
+ for (pi = 0; pi < final_part_cnt; pi++) {
+ const rd_kafka_metadata_partition_t *exp_mdp =
+ &exp_mdparts[pi & 1];
+
+ exp_mdtopics[exp_mdtopic_cnt].partitions[pi] =
+ *exp_mdp; /* copy */
+
+ exp_mdtopics[exp_mdtopic_cnt]
+ .partitions[pi]
+ .id = pi;
+
+ if (pi < initial_part_cnt) {
+ /* Set replica assignment
+ * for initial partitions */
+ err =
+ rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], pi,
+ exp_mdp->replicas,
+ (size_t)exp_mdp->replica_cnt,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err,
+ "NewTopic_set_replica_"
+ "assignment: %s",
+ errstr);
+ } else {
+ /* Set replica assignment for new
+ * partitions */
+ err =
+ rd_kafka_NewPartitions_set_replica_assignment(
+ crp_topics[i],
+ pi - initial_part_cnt,
+ exp_mdp->replicas,
+ (size_t)exp_mdp->replica_cnt,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err,
+ "NewPartitions_set_replica_"
+ "assignment: %s",
+ errstr);
+ }
+ }
+ }
+
+ TEST_SAY(_C_YEL
+ "Topic %s with %d initial partitions will grow "
+ "by %d to %d total partitions with%s replicas set\n",
+ topics[i], initial_part_cnt, new_part_cnt,
+ final_part_cnt, set_replicas ? "" : "out");
+
+ exp_mdtopics[exp_mdtopic_cnt].topic = topic;
+ exp_mdtopics[exp_mdtopic_cnt].partition_cnt = final_part_cnt;
+
+ exp_mdtopic_cnt++;
+ }
+
+ if (op_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ err = rd_kafka_AdminOptions_set_operation_timeout(
+ options, op_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+ /*
+ * Create topics with initial partition count
+ */
+ TIMING_START(&timing, "CreateTopics");
+ TEST_SAY("Creating topics with initial partition counts\n");
+ rd_kafka_CreateTopics(rk, new_topics, MY_CRP_TOPICS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ err = test_wait_topic_admin_result(
+ q, RD_KAFKA_EVENT_CREATETOPICS_RESULT, NULL, 15000);
+ TEST_ASSERT(!err, "CreateTopics failed: %s", rd_kafka_err2str(err));
+
+ rd_kafka_NewTopic_destroy_array(new_topics, MY_CRP_TOPICS_CNT);
+
+
+ /*
+ * Create new partitions
+ */
+ TIMING_START(&timing, "CreatePartitions");
+ TEST_SAY("Creating partitions\n");
+ rd_kafka_CreatePartitions(rk, crp_topics, MY_CRP_TOPICS_CNT, options,
+ q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ err = test_wait_topic_admin_result(
+ q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, 15000);
+ TEST_ASSERT(!err, "CreatePartitions failed: %s", rd_kafka_err2str(err));
+
+ rd_kafka_NewPartitions_destroy_array(crp_topics, MY_CRP_TOPICS_CNT);
+
+
+ /**
+ * Verify that all topics now report their final partition counts.
+ * Allow the change some time to propagate.
+ */
+ if (op_timeout > 0)
+ metadata_tmout = op_timeout + 1000;
+ else
+ metadata_tmout = 10 * 1000;
+
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0,
+ metadata_tmout);
+
+ for (i = 0; i < MY_CRP_TOPICS_CNT; i++)
+ rd_free(topics[i]);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_CRP_TOPICS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Print the ConfigEntrys in the provided array.
+ */
+static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries,
+ size_t entry_cnt,
+ unsigned int depth) {
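+ /* Pick 0..4 leading spaces from a constant string: a greater depth
+ * selects an earlier offset and thus a longer run of spaces. */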
+ const char *indent = &"    "[4 - (depth > 4 ? 4 : depth)];
+ size_t ei;
+
+ for (ei = 0; ei < entry_cnt; ei++) {
+ const rd_kafka_ConfigEntry_t *e = entries[ei];
+ const rd_kafka_ConfigEntry_t **syns;
+ size_t syn_cnt;
+
+ syns = rd_kafka_ConfigEntry_synonyms(e, &syn_cnt);
+
+#define YN(v) ((v) ? "y" : "n")
+ TEST_SAYL(
+ 3,
+ "%s#%" PRIusz "/%" PRIusz
+ ": Source %s (%d): \"%s\"=\"%s\" "
+ "[is read-only=%s, default=%s, sensitive=%s, "
+ "synonym=%s] with %" PRIusz " synonym(s)\n",
+ indent, ei, entry_cnt,
+ rd_kafka_ConfigSource_name(rd_kafka_ConfigEntry_source(e)),
+ rd_kafka_ConfigEntry_source(e),
+ rd_kafka_ConfigEntry_name(e),
+ rd_kafka_ConfigEntry_value(e)
+ ? rd_kafka_ConfigEntry_value(e)
+ : "(NULL)",
+ YN(rd_kafka_ConfigEntry_is_read_only(e)),
+ YN(rd_kafka_ConfigEntry_is_default(e)),
+ YN(rd_kafka_ConfigEntry_is_sensitive(e)),
+ YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt);
+#undef YN
+
+ if (syn_cnt > 0)
+ test_print_ConfigEntry_array(syns, syn_cnt, depth + 1);
+ }
+}
+
+
+/**
+ * @brief Test AlterConfigs
+ */
+static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
+#define MY_CONFRES_CNT 3
+ char *topics[MY_CONFRES_CNT];
+ rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
+ rd_kafka_AdminOptions_t *options;
+ rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
+ rd_kafka_event_t *rkev;
+ rd_kafka_resp_err_t err;
+ const rd_kafka_AlterConfigs_result_t *res;
+ const rd_kafka_ConfigResource_t **rconfigs;
+ size_t rconfig_cnt;
+ char errstr[128];
+ const char *errstr2;
+ int ci = 0;
+ int i;
+ int fails = 0;
+
+ SUB_TEST_QUICK();
+
+ /*
+ * Only create one topic, the others will be non-existent.
+ */
+ for (i = 0; i < MY_CONFRES_CNT; i++)
+ rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1));
+
+ test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);
+
+ test_wait_topic_exists(rk, topics[0], 10000);
+
+ /*
+ * ConfigResource #0: valid topic config
+ */
+ configs[ci] =
+ rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
+
+ err = rd_kafka_ConfigResource_set_config(configs[ci],
+ "compression.type", "gzip");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ err = rd_kafka_ConfigResource_set_config(configs[ci], "flush.ms",
+ "12345678");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
+ ci++;
+
+
+ if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) {
+ /*
+ * ConfigResource #1: valid broker config
+ */
+ configs[ci] = rd_kafka_ConfigResource_new(
+ RD_KAFKA_RESOURCE_BROKER,
+ tsprintf("%" PRId32, avail_brokers[0]));
+
+ err = rd_kafka_ConfigResource_set_config(
+ configs[ci], "sasl.kerberos.min.time.before.relogin",
+ "58000");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
+ ci++;
+ } else {
+ TEST_WARN(
+ "Skipping RESOURCE_BROKER test on unsupported "
+ "broker version\n");
+ }
+
+ /*
+ * ConfigResource #2: valid topic config, non-existent topic
+ */
+ configs[ci] =
+ rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
+
+ err = rd_kafka_ConfigResource_set_config(configs[ci],
+ "compression.type", "lz4");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ err = rd_kafka_ConfigResource_set_config(
+ configs[ci], "offset.metadata.max.bytes", "12345");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
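+ /* Newer brokers (>= 2.7) return UNKNOWN_TOPIC_OR_PART when altering
+ * configs of a missing topic; older brokers return generic UNKNOWN. */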
+ if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0))
+ exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+ else
+ exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN;
+ ci++;
+
+
+ /*
+ * Timeout options
+ */
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ALTERCONFIGS);
+ err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+
+
+ /*
+ * Fire off request
+ */
+ rd_kafka_AlterConfigs(rk, configs, ci, options, rkqu);
+
+ rd_kafka_AdminOptions_destroy(options);
+
+ /*
+ * Wait for result
+ */
+ rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
+ 10000 + 1000);
+
+ /*
+ * Extract result
+ */
+ res = rd_kafka_event_AlterConfigs_result(rkev);
+ TEST_ASSERT(res, "Expected AlterConfigs result, not %s",
+ rd_kafka_event_name(rkev));
+
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!err, "Expected success, not %s: %s",
+ rd_kafka_err2name(err), errstr2);
+
+ rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
+ TEST_ASSERT((int)rconfig_cnt == ci,
+ "Expected %d result resources, got %" PRIusz "\n", ci,
+ rconfig_cnt);
+
+ /*
+ * Verify status per resource
+ */
+ for (i = 0; i < (int)rconfig_cnt; i++) {
+ const rd_kafka_ConfigEntry_t **entries;
+ size_t entry_cnt;
+
+ err = rd_kafka_ConfigResource_error(rconfigs[i]);
+ errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);
+
+ entries =
+ rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt);
+
+ TEST_SAY(
+ "ConfigResource #%d: type %s (%d), \"%s\": "
+ "%" PRIusz " ConfigEntries, error %s (%s)\n",
+ i,
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(rconfigs[i])),
+ rd_kafka_ConfigResource_type(rconfigs[i]),
+ rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt,
+ rd_kafka_err2name(err), errstr2 ? errstr2 : "");
+
+ test_print_ConfigEntry_array(entries, entry_cnt, 1);
+
+ if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
+ rd_kafka_ConfigResource_type(configs[i]) ||
+ strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
+ rd_kafka_ConfigResource_name(configs[i]))) {
+ TEST_FAIL_LATER(
+ "ConfigResource #%d: "
+ "expected type %s name %s, "
+ "got type %s name %s",
+ i,
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(configs[i])),
+ rd_kafka_ConfigResource_name(configs[i]),
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(rconfigs[i])),
+ rd_kafka_ConfigResource_name(rconfigs[i]));
+ fails++;
+ continue;
+ }
+
+
+ if (err != exp_err[i]) {
+ TEST_FAIL_LATER(
+ "ConfigResource #%d: "
+ "expected %s (%d), got %s (%s)",
+ i, rd_kafka_err2name(exp_err[i]), exp_err[i],
+ rd_kafka_err2name(err), errstr2 ? errstr2 : "");
+ fails++;
+ }
+ }
+
+ TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_ConfigResource_destroy_array(configs, ci);
+
+ TEST_LATER_CHECK();
+#undef MY_CONFRES_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief Test DescribeConfigs
+ */
+static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
+#define MY_CONFRES_CNT 3
+ char *topics[MY_CONFRES_CNT];
+ rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
+ rd_kafka_AdminOptions_t *options;
+ rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT];
+ rd_kafka_event_t *rkev;
+ rd_kafka_resp_err_t err;
+ const rd_kafka_DescribeConfigs_result_t *res;
+ const rd_kafka_ConfigResource_t **rconfigs;
+ size_t rconfig_cnt;
+ char errstr[128];
+ const char *errstr2;
+ int ci = 0;
+ int i;
+ int fails = 0;
+ int max_retry_describe = 3;
+
+ SUB_TEST_QUICK();
+
+ /*
+ * Only create one topic, the others will be non-existent.
+ */
+ rd_strdupa(&topics[0], test_mk_topic_name("DescribeConfigs_exist", 1));
+ for (i = 1; i < MY_CONFRES_CNT; i++)
+ rd_strdupa(&topics[i],
+ test_mk_topic_name("DescribeConfigs_notexist", 1));
+
+ test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL);
+
+ /*
+ * ConfigResource #0: topic config, no config entries.
+ */
+ configs[ci] =
+ rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
+ exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
+ ci++;
+
+ /*
+ * ConfigResource #1: broker config, no config entries
+ */
+ configs[ci] = rd_kafka_ConfigResource_new(
+ RD_KAFKA_RESOURCE_BROKER, tsprintf("%" PRId32, avail_brokers[0]));
+
+ exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
+ ci++;
+
+ /*
+ * ConfigResource #2: topic config, non-existent topic, no config entries.
+ */
+ configs[ci] =
+ rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]);
+ /* FIXME: This is a bug in the broker (<v2.0.0), it returns a full
+ * response for unknown topics.
+ * https://issues.apache.org/jira/browse/KAFKA-6778
+ */
+ if (test_broker_version < TEST_BRKVER(2, 0, 0, 0))
+ exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR;
+ else
+ exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+ ci++;
+
+
+retry_describe:
+ /*
+ * Timeout options
+ */
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+ err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+
+
+ /*
+ * Fire off request
+ */
+ rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu);
+
+ rd_kafka_AdminOptions_destroy(options);
+
+ /*
+ * Wait for result
+ */
+ rkev = test_wait_admin_result(
+ rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000);
+
+ /*
+ * Extract result
+ */
+ res = rd_kafka_event_DescribeConfigs_result(rkev);
+ TEST_ASSERT(res, "Expected DescribeConfigs result, not %s",
+ rd_kafka_event_name(rkev));
+
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!err, "Expected success, not %s: %s",
+ rd_kafka_err2name(err), errstr2);
+
+ rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
+ TEST_ASSERT((int)rconfig_cnt == ci,
+ "Expected %d result resources, got %" PRIusz "\n", ci,
+ rconfig_cnt);
+
+ /*
+ * Verify status per resource
+ */
+ for (i = 0; i < (int)rconfig_cnt; i++) {
+ const rd_kafka_ConfigEntry_t **entries;
+ size_t entry_cnt;
+
+ err = rd_kafka_ConfigResource_error(rconfigs[i]);
+ errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]);
+
+ entries =
+ rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt);
+
+ TEST_SAY(
+ "ConfigResource #%d: type %s (%d), \"%s\": "
+ "%" PRIusz " ConfigEntries, error %s (%s)\n",
+ i,
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(rconfigs[i])),
+ rd_kafka_ConfigResource_type(rconfigs[i]),
+ rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt,
+ rd_kafka_err2name(err), errstr2 ? errstr2 : "");
+
+ test_print_ConfigEntry_array(entries, entry_cnt, 1);
+
+ if (rd_kafka_ConfigResource_type(rconfigs[i]) !=
+ rd_kafka_ConfigResource_type(configs[i]) ||
+ strcmp(rd_kafka_ConfigResource_name(rconfigs[i]),
+ rd_kafka_ConfigResource_name(configs[i]))) {
+ TEST_FAIL_LATER(
+ "ConfigResource #%d: "
+ "expected type %s name %s, "
+ "got type %s name %s",
+ i,
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(configs[i])),
+ rd_kafka_ConfigResource_name(configs[i]),
+ rd_kafka_ResourceType_name(
+ rd_kafka_ConfigResource_type(rconfigs[i])),
+ rd_kafka_ConfigResource_name(rconfigs[i]));
+ fails++;
+ continue;
+ }
+
+
+ if (err != exp_err[i]) {
+ if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART &&
+ max_retry_describe-- > 0) {
+ TEST_WARN(
+ "ConfigResource #%d: "
+ "expected %s (%d), got %s (%s): "
+ "this is typically a temporary "
+ "error while the new resource "
+ "is propagating: retrying",
+ i, rd_kafka_err2name(exp_err[i]),
+ exp_err[i], rd_kafka_err2name(err),
+ errstr2 ? errstr2 : "");
+ rd_kafka_event_destroy(rkev);
+ rd_sleep(1);
+ goto retry_describe;
+ }
+
+ TEST_FAIL_LATER(
+ "ConfigResource #%d: "
+ "expected %s (%d), got %s (%s)",
+ i, rd_kafka_err2name(exp_err[i]), exp_err[i],
+ rd_kafka_err2name(err), errstr2 ? errstr2 : "");
+ fails++;
+ }
+ }
+
+ TEST_ASSERT(!fails, "See %d previous failure(s)", fails);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_ConfigResource_destroy_array(configs, ci);
+
+ TEST_LATER_CHECK();
+#undef MY_CONFRES_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test CreateAcls
+ */
+static void
+do_test_CreateAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) {
+ rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
+ size_t resacl_cnt;
+ test_timing_t timing;
+ rd_kafka_resp_err_t err;
+ char errstr[128];
+ const char *errstr2;
+ const char *user_test1 = "User:test1";
+ const char *user_test2 = "User:test2";
+ const char *base_topic_name;
+ char topic1_name[512];
+ char topic2_name[512];
+ rd_kafka_AclBinding_t *acl_bindings[2];
+ rd_kafka_ResourcePatternType_t pattern_type_first_topic =
+ RD_KAFKA_RESOURCE_PATTERN_PREFIXED;
+ rd_kafka_AdminOptions_t *admin_options;
+ rd_kafka_event_t *rkev_acl_create;
+ const rd_kafka_CreateAcls_result_t *acl_res;
+ const rd_kafka_acl_result_t **acl_res_acls;
+ unsigned int i;
+
+ SUB_TEST_QUICK();
+
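+ /* version 0 sticks to LITERAL resource patterns so the test can run
+ * against brokers < 2.0.0, which lack PREFIXED pattern support. */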
+ if (version == 0)
+ pattern_type_first_topic = RD_KAFKA_RESOURCE_PATTERN_LITERAL;
+
+ base_topic_name = test_mk_topic_name(__FUNCTION__, 1);
+
+ rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name);
+ rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name);
+
+
+ acl_bindings[0] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_first_topic,
+ user_test1, "*", RD_KAFKA_ACL_OPERATION_READ,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, NULL, 0);
+ acl_bindings[1] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic2_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, "*",
+ RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+
+
+ admin_options =
+ rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS);
+ err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+
+ TIMING_START(&timing, "CreateAcls");
+ TEST_SAY("Call CreateAcls\n");
+ rd_kafka_CreateAcls(rk, acl_bindings, 2, admin_options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /*
+ * Wait for result
+ */
+ rkev_acl_create = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_CREATEACLS_RESULT, 10000 + 1000);
+
+ err = rd_kafka_event_error(rkev_acl_create);
+ errstr2 = rd_kafka_event_error_string(rkev_acl_create);
+
+ if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) {
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "Expected unsupported feature, not: %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(!strcmp(errstr2,
+ "ACLs Admin API (KIP-140) not supported "
+ "by broker, requires broker "
+ "version >= 0.11.0.0"),
+ "Expected a different message, not: %s", errstr2);
+ TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err));
+ }
+
+ if (version > 0 && test_broker_version < TEST_BRKVER(2, 0, 0, 0)) {
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "Expected unsupported feature, not: %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(!strcmp(errstr2,
+ "Broker only supports LITERAL "
+ "resource pattern types"),
+ "Expected a different message, not: %s", errstr2);
+ TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err));
+ }
+
+ TEST_ASSERT(!err, "Expected success, not %s: %s",
+ rd_kafka_err2name(err), errstr2);
+
+ /*
+ * Extract result
+ */
+ acl_res = rd_kafka_event_CreateAcls_result(rkev_acl_create);
+ TEST_ASSERT(acl_res, "Expected CreateAcls result, not %s",
+ rd_kafka_event_name(rkev_acl_create));
+
+ acl_res_acls = rd_kafka_CreateAcls_result_acls(acl_res, &resacl_cnt);
+ TEST_ASSERT(resacl_cnt == 2, "Expected 2, not %zu", resacl_cnt);
+
+ for (i = 0; i < resacl_cnt; i++) {
+ const rd_kafka_acl_result_t *acl_res_acl = *(acl_res_acls + i);
+ const rd_kafka_error_t *error =
+ rd_kafka_acl_result_error(acl_res_acl);
+
+ TEST_ASSERT(!error,
+ "Expected RD_KAFKA_RESP_ERR_NO_ERROR, not %s",
+ rd_kafka_error_string(error));
+ }
+
+ rd_kafka_AdminOptions_destroy(admin_options);
+ rd_kafka_event_destroy(rkev_acl_create);
+ rd_kafka_AclBinding_destroy_array(acl_bindings, 2);
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test DescribeAcls
+ */
+static void
+do_test_DescribeAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) {
+ rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
+ size_t acl_binding_results_cntp;
+ test_timing_t timing;
+ rd_kafka_resp_err_t err;
+ uint32_t i;
+ char errstr[128];
+ const char *errstr2;
+ const char *user_test1 = "User:test1";
+ const char *user_test2 = "User:test2";
+ const char *any_host = "*";
+ const char *topic_name;
+ rd_kafka_AclBinding_t *acl_bindings_create[2];
+ rd_kafka_AclBinding_t *acl_bindings_describe;
+ rd_kafka_AclBinding_t *acl;
+ const rd_kafka_DescribeAcls_result_t *acl_describe_result;
+ const rd_kafka_AclBinding_t **acl_binding_results;
+ rd_kafka_ResourcePatternType_t pattern_type_first_topic_create;
+ rd_bool_t broker_version1 =
+ test_broker_version >= TEST_BRKVER(2, 0, 0, 0);
+ rd_kafka_resp_err_t create_err;
+ rd_kafka_AdminOptions_t *admin_options;
+ rd_kafka_event_t *rkev_acl_describe;
+ const rd_kafka_error_t *error;
+
+ SUB_TEST_QUICK();
+
+ if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) {
+ SUB_TEST_SKIP(
+ "Skipping DESCRIBE_ACLS test on unsupported "
+ "broker version\n");
+ return;
+ }
+
+ pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED;
+ if (!broker_version1)
+ pattern_type_first_topic_create =
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL;
+
+ topic_name = test_mk_topic_name(__FUNCTION__, 1);
+
+ acl_bindings_create[0] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic_name,
+ pattern_type_first_topic_create, user_test1, any_host,
+ RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+ acl_bindings_create[1] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host,
+ RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+
+ create_err =
+ test_CreateAcls_simple(rk, NULL, acl_bindings_create, 2, NULL);
+
+ TEST_ASSERT(!create_err, "create error: %s",
+ rd_kafka_err2str(create_err));
+
+ acl_bindings_describe = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic_name,
+ RD_KAFKA_RESOURCE_PATTERN_MATCH, NULL, NULL,
+ RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL,
+ 0);
+
+ admin_options =
+ rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS);
+ err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+
+ TIMING_START(&timing, "DescribeAcls");
+ TEST_SAY("Call DescribeAcls\n");
+ rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /*
+ * Wait for result
+ */
+ rkev_acl_describe = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000);
+
+ err = rd_kafka_event_error(rkev_acl_describe);
+ errstr2 = rd_kafka_event_error_string(rkev_acl_describe);
+
+ if (!broker_version1) {
+ TEST_ASSERT(
+ err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,
+ "expected RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, not %s",
+ rd_kafka_err2str(err));
+ TEST_ASSERT(strcmp(errstr2,
+ "Broker only supports LITERAL and ANY "
+ "resource pattern types") == 0,
+ "expected another message, not %s", errstr2);
+ } else {
+ TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s",
+ errstr2);
+ }
+
+ if (!err) {
+
+ acl_describe_result =
+ rd_kafka_event_DescribeAcls_result(rkev_acl_describe);
+
+ TEST_ASSERT(acl_describe_result,
+ "acl_describe_result should not be NULL");
+
+ acl_binding_results_cntp = 0;
+ acl_binding_results = rd_kafka_DescribeAcls_result_acls(
+ acl_describe_result, &acl_binding_results_cntp);
+
+ TEST_ASSERT(acl_binding_results_cntp == 2,
+ "acl_binding_results_cntp should be 2, not %zu",
+ acl_binding_results_cntp);
+
+ for (i = 0; i < acl_binding_results_cntp; i++) {
+ acl = (rd_kafka_AclBinding_t *)acl_binding_results[i];
+
+ if (strcmp(rd_kafka_AclBinding_principal(acl),
+ user_test1) == 0) {
+ TEST_ASSERT(
+ rd_kafka_AclBinding_restype(acl) ==
+ RD_KAFKA_RESOURCE_TOPIC,
+ "acl->restype should be "
+ "RD_KAFKA_RESOURCE_TOPIC, not %s",
+ rd_kafka_ResourceType_name(
+ rd_kafka_AclBinding_restype(acl)));
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_name(acl),
+ topic_name) == 0,
+ "acl->name should be %s, not %s",
+ topic_name, rd_kafka_AclBinding_name(acl));
+ TEST_ASSERT(
+ rd_kafka_AclBinding_resource_pattern_type(
+ acl) == pattern_type_first_topic_create,
+ "acl->resource_pattern_type should be %s, "
+ "not %s",
+ rd_kafka_ResourcePatternType_name(
+ pattern_type_first_topic_create),
+ rd_kafka_ResourcePatternType_name(
+ rd_kafka_AclBinding_resource_pattern_type(
+ acl)));
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_principal(acl),
+ user_test1) == 0,
+ "acl->principal should be %s, not %s",
+ user_test1,
+ rd_kafka_AclBinding_principal(acl));
+
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_host(acl),
+ any_host) == 0,
+ "acl->host should be %s, not %s", any_host,
+ rd_kafka_AclBinding_host(acl));
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_operation(acl) ==
+ RD_KAFKA_ACL_OPERATION_READ,
+ "acl->operation should be %s, not %s",
+ rd_kafka_AclOperation_name(
+ RD_KAFKA_ACL_OPERATION_READ),
+ rd_kafka_AclOperation_name(
+ rd_kafka_AclBinding_operation(acl)));
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_permission_type(acl) ==
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ "acl->permission_type should be %s, not %s",
+ rd_kafka_AclPermissionType_name(
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW),
+ rd_kafka_AclPermissionType_name(
+ rd_kafka_AclBinding_permission_type(
+ acl)));
+
+ error = rd_kafka_AclBinding_error(acl);
+ TEST_ASSERT(!error,
+ "acl->error should be NULL, not %s",
+ rd_kafka_error_string(error));
+
+ } else {
+ TEST_ASSERT(
+ rd_kafka_AclBinding_restype(acl) ==
+ RD_KAFKA_RESOURCE_TOPIC,
+ "acl->restype should be "
+ "RD_KAFKA_RESOURCE_TOPIC, not %s",
+ rd_kafka_ResourceType_name(
+ rd_kafka_AclBinding_restype(acl)));
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_name(acl),
+ topic_name) == 0,
+ "acl->name should be %s, not %s",
+ topic_name, rd_kafka_AclBinding_name(acl));
+ TEST_ASSERT(
+ rd_kafka_AclBinding_resource_pattern_type(
+ acl) ==
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ "acl->resource_pattern_type should be %s, "
+ "not %s",
+ rd_kafka_ResourcePatternType_name(
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL),
+ rd_kafka_ResourcePatternType_name(
+ rd_kafka_AclBinding_resource_pattern_type(
+ acl)));
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_principal(acl),
+ user_test2) == 0,
+ "acl->principal should be %s, not %s",
+ user_test2,
+ rd_kafka_AclBinding_principal(acl));
+
+ TEST_ASSERT(
+ strcmp(rd_kafka_AclBinding_host(acl),
+ any_host) == 0,
+ "acl->host should be %s, not %s", any_host,
+ rd_kafka_AclBinding_host(acl));
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_operation(acl) ==
+ RD_KAFKA_ACL_OPERATION_WRITE,
+ "acl->operation should be %s, not %s",
+ rd_kafka_AclOperation_name(
+ RD_KAFKA_ACL_OPERATION_WRITE),
+ rd_kafka_AclOperation_name(
+ rd_kafka_AclBinding_operation(acl)));
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_permission_type(acl) ==
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ "acl->permission_type should be %s, not %s",
+ rd_kafka_AclPermissionType_name(
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW),
+ rd_kafka_AclPermissionType_name(
+ rd_kafka_AclBinding_permission_type(
+ acl)));
+
+
+ error = rd_kafka_AclBinding_error(acl);
+ TEST_ASSERT(!error,
+ "acl->error should be NULL, not %s",
+ rd_kafka_error_string(error));
+ }
+ }
+ }
+
+ rd_kafka_AclBinding_destroy(acl_bindings_describe);
+ rd_kafka_event_destroy(rkev_acl_describe);
+
+ acl_bindings_describe = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL,
+ RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ANY,
+ NULL, 0);
+
+ TIMING_START(&timing, "DescribeAcls");
+ rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /*
+ * Wait for result
+ */
+ rkev_acl_describe = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000);
+
+ err = rd_kafka_event_error(rkev_acl_describe);
+ errstr2 = rd_kafka_event_error_string(rkev_acl_describe);
+
+ TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s",
+ errstr2);
+
+ acl_describe_result =
+ rd_kafka_event_DescribeAcls_result(rkev_acl_describe);
+
+ TEST_ASSERT(acl_describe_result,
+ "acl_describe_result should not be NULL");
+
+ acl_binding_results_cntp = 0;
+ acl_binding_results = rd_kafka_DescribeAcls_result_acls(
+ acl_describe_result, &acl_binding_results_cntp);
+
+ TEST_ASSERT(acl_binding_results_cntp == 1,
+ "acl_binding_results_cntp should be 1, not %zu",
+ acl_binding_results_cntp);
+
+ acl = (rd_kafka_AclBinding_t *)acl_binding_results[0];
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_restype(acl) == RD_KAFKA_RESOURCE_TOPIC,
+ "acl->restype should be RD_KAFKA_RESOURCE_TOPIC, not %s",
+ rd_kafka_ResourceType_name(rd_kafka_AclBinding_restype(acl)));
+ TEST_ASSERT(strcmp(rd_kafka_AclBinding_name(acl), topic_name) == 0,
+ "acl->name should be %s, not %s", topic_name,
+ rd_kafka_AclBinding_name(acl));
+ TEST_ASSERT(rd_kafka_AclBinding_resource_pattern_type(acl) ==
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ "acl->resource_pattern_type should be %s, not %s",
+ rd_kafka_ResourcePatternType_name(
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL),
+ rd_kafka_ResourcePatternType_name(
+ rd_kafka_AclBinding_resource_pattern_type(acl)));
+ TEST_ASSERT(strcmp(rd_kafka_AclBinding_principal(acl), user_test2) == 0,
+ "acl->principal should be %s, not %s", user_test2,
+ rd_kafka_AclBinding_principal(acl));
+
+ TEST_ASSERT(strcmp(rd_kafka_AclBinding_host(acl), any_host) == 0,
+ "acl->host should be %s, not %s", any_host,
+ rd_kafka_AclBinding_host(acl));
+
+ TEST_ASSERT(
+ rd_kafka_AclBinding_permission_type(acl) ==
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ "acl->permission_type should be %s, not %s",
+ rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW),
+ rd_kafka_AclPermissionType_name(
+ rd_kafka_AclBinding_permission_type(acl)));
+
+ error = rd_kafka_AclBinding_error(acl);
+ TEST_ASSERT(!error, "acl->error should be NULL, not %s",
+ rd_kafka_error_string(error));
+
+ rd_kafka_AclBinding_destroy(acl_bindings_describe);
+ rd_kafka_event_destroy(rkev_acl_describe);
+ rd_kafka_AdminOptions_destroy(admin_options);
+ rd_kafka_AclBinding_destroy_array(acl_bindings_create, 2);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Count acls by acl filter
+ */
+static size_t
+do_test_acls_count(rd_kafka_t *rk,
+ rd_kafka_AclBindingFilter_t *acl_bindings_describe,
+ rd_kafka_queue_t *q) {
+ char errstr[128];
+ rd_kafka_resp_err_t err;
+ rd_kafka_AdminOptions_t *admin_options_describe;
+ rd_kafka_event_t *rkev_acl_describe;
+ const rd_kafka_DescribeAcls_result_t *acl_describe_result;
+ const char *errstr2;
+ size_t acl_binding_results_cntp;
+
+ admin_options_describe =
+ rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS);
+ rd_kafka_AdminOptions_set_request_timeout(admin_options_describe, 10000,
+ errstr, sizeof(errstr));
+
+ rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options_describe,
+ q);
+ /*
+ * Wait for result
+ */
+ rkev_acl_describe = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000);
+
+ err = rd_kafka_event_error(rkev_acl_describe);
+ errstr2 = rd_kafka_event_error_string(rkev_acl_describe);
+
+ TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s",
+ errstr2);
+
+ acl_describe_result =
+ rd_kafka_event_DescribeAcls_result(rkev_acl_describe);
+
+ TEST_ASSERT(acl_describe_result,
+ "acl_describe_result should not be NULL");
+
+ acl_binding_results_cntp = 0;
+ rd_kafka_DescribeAcls_result_acls(acl_describe_result,
+ &acl_binding_results_cntp);
+ rd_kafka_event_destroy(rkev_acl_describe);
+ rd_kafka_AdminOptions_destroy(admin_options_describe);
+
+ return acl_binding_results_cntp;
+}
+
+/**
+ * @brief Test DeleteAcls
+ */
+static void
+do_test_DeleteAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) {
+ rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk);
+ test_timing_t timing;
+ uint32_t i;
+ char errstr[128];
+ const char *user_test1 = "User:test1";
+ const char *user_test2 = "User:test2";
+ const char *any_host = "*";
+ const char *base_topic_name;
+ char topic1_name[512];
+ char topic2_name[512];
+ size_t acl_binding_results_cntp;
+ size_t DeleteAcls_result_responses_cntp;
+ size_t matching_acls_cntp;
+ rd_kafka_AclBinding_t *acl_bindings_create[3];
+ rd_kafka_AclBindingFilter_t *acl_bindings_describe;
+ rd_kafka_AclBindingFilter_t *acl_bindings_delete;
+ rd_kafka_event_t *rkev_acl_delete;
+ rd_kafka_AdminOptions_t *admin_options_delete;
+ const rd_kafka_DeleteAcls_result_t *acl_delete_result;
+ const rd_kafka_DeleteAcls_result_response_t *
+ *DeleteAcls_result_responses;
+ const rd_kafka_DeleteAcls_result_response_t *DeleteAcls_result_response;
+ const rd_kafka_AclBinding_t **matching_acls;
+ const rd_kafka_AclBinding_t *matching_acl;
+ rd_kafka_ResourcePatternType_t pattern_type_first_topic_create;
+ rd_kafka_ResourcePatternType_t pattern_type_delete;
+ rd_bool_t broker_version1 =
+ test_broker_version >= TEST_BRKVER(2, 0, 0, 0);
+ rd_kafka_resp_err_t create_err;
+ rd_kafka_ResourceType_t restype;
+ rd_kafka_ResourcePatternType_t resource_pattern_type;
+ rd_kafka_AclOperation_t operation;
+ rd_kafka_AclPermissionType_t permission_type;
+ const char *name;
+ const char *principal;
+ const rd_kafka_error_t *error;
+
+ SUB_TEST_QUICK();
+
+ if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) {
+ SUB_TEST_SKIP(
+ "Skipping DELETE_ACLS test on unsupported "
+ "broker version\n");
+ return;
+ }
+
+ pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED;
+ pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_MATCH;
+ if (!broker_version1) {
+ pattern_type_first_topic_create =
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL;
+ pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_LITERAL;
+ }
+
+ base_topic_name = test_mk_topic_name(__FUNCTION__, 1);
+
+ rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name);
+ rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name);
+
+ acl_bindings_create[0] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic1_name,
+ pattern_type_first_topic_create, user_test1, any_host,
+ RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+ acl_bindings_create[1] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic1_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host,
+ RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+ acl_bindings_create[2] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic2_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host,
+ RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ NULL, 0);
+
+ acl_bindings_delete = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_delete, NULL,
+ NULL, RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY,
+ NULL, 0);
+
+ acl_bindings_describe = acl_bindings_delete;
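+ /* The delete filter doubles as the pre/post-delete describe filter,
+ * so the ACL counts below refer to exactly the set being deleted. */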
+
+ create_err =
+ test_CreateAcls_simple(rk, NULL, acl_bindings_create, 3, NULL);
+
+ TEST_ASSERT(!create_err, "create error: %s",
+ rd_kafka_err2str(create_err));
+
+ admin_options_delete =
+ rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEACLS);
+ rd_kafka_AdminOptions_set_request_timeout(admin_options_delete, 10000,
+ errstr, sizeof(errstr));
+
+ acl_binding_results_cntp =
+ do_test_acls_count(rk, acl_bindings_describe, q);
+ TEST_ASSERT(acl_binding_results_cntp == 2,
+ "acl_binding_results_cntp should not be 2, not %zu\n",
+ acl_binding_results_cntp);
+
+ TIMING_START(&timing, "DeleteAcls");
+ rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete,
+ q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /*
+ * Wait for result
+ */
+ rkev_acl_delete = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000);
+
+ acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete);
+
+ TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL");
+
+ DeleteAcls_result_responses_cntp = 0;
+ DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses(
+ acl_delete_result, &DeleteAcls_result_responses_cntp);
+
+ TEST_ASSERT(DeleteAcls_result_responses_cntp == 1,
+ "DeleteAcls_result_responses_cntp should be 1, not %zu\n",
+ DeleteAcls_result_responses_cntp);
+
+ DeleteAcls_result_response = DeleteAcls_result_responses[0];
+
+ TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error(
+ DeleteAcls_result_response));
+
+ matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls(
+ DeleteAcls_result_response, &matching_acls_cntp);
+
+ TEST_ASSERT(matching_acls_cntp == 2,
+ "matching_acls_cntp should be 2, not %zu\n",
+ matching_acls_cntp);
+
+ for (i = 0; i < matching_acls_cntp; i++) {
+ rd_kafka_ResourceType_t restype;
+ rd_kafka_ResourcePatternType_t resource_pattern_type;
+ rd_kafka_AclOperation_t operation;
+ rd_kafka_AclPermissionType_t permission_type;
+ const char *name;
+ const char *principal;
+
+ matching_acl = matching_acls[i];
+ error = rd_kafka_AclBinding_error(matching_acl);
+ restype = rd_kafka_AclBinding_restype(matching_acl);
+ name = rd_kafka_AclBinding_name(matching_acl);
+ resource_pattern_type =
+ rd_kafka_AclBinding_resource_pattern_type(matching_acl);
+ principal = rd_kafka_AclBinding_principal(matching_acl);
+ operation = rd_kafka_AclBinding_operation(matching_acl);
+ permission_type =
+ rd_kafka_AclBinding_permission_type(matching_acl);
+
+ TEST_ASSERT(!error, "expected success, not %s",
+ rd_kafka_error_string(error));
+ TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC,
+ "expected RD_KAFKA_RESOURCE_TOPIC not %s",
+ rd_kafka_ResourceType_name(restype));
+ TEST_ASSERT(strcmp(name, topic1_name) == 0,
+ "expected %s not %s", topic1_name, name);
+ TEST_ASSERT(permission_type ==
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ "expected %s not %s",
+ rd_kafka_AclPermissionType_name(
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW),
+ rd_kafka_AclPermissionType_name(permission_type));
+
+ if (strcmp(user_test1, principal) == 0) {
+ TEST_ASSERT(resource_pattern_type ==
+ pattern_type_first_topic_create,
+ "expected %s not %s",
+ rd_kafka_ResourcePatternType_name(
+ pattern_type_first_topic_create),
+ rd_kafka_ResourcePatternType_name(
+ resource_pattern_type));
+
+ TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_READ,
+ "expected %s not %s",
+ rd_kafka_AclOperation_name(
+ RD_KAFKA_ACL_OPERATION_READ),
+ rd_kafka_AclOperation_name(operation));
+
+ } else {
+ TEST_ASSERT(resource_pattern_type ==
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ "expected %s not %s",
+ rd_kafka_ResourcePatternType_name(
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL),
+ rd_kafka_ResourcePatternType_name(
+ resource_pattern_type));
+
+ TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE,
+ "expected %s not %s",
+ rd_kafka_AclOperation_name(
+ RD_KAFKA_ACL_OPERATION_WRITE),
+ rd_kafka_AclOperation_name(operation));
+ }
+ }
+
+ acl_binding_results_cntp =
+ do_test_acls_count(rk, acl_bindings_describe, q);
+ TEST_ASSERT(acl_binding_results_cntp == 0,
+ "acl_binding_results_cntp should be 0, not %zu\n",
+ acl_binding_results_cntp);
+
+ rd_kafka_event_destroy(rkev_acl_delete);
+ rd_kafka_AclBinding_destroy(acl_bindings_delete);
+
+ acl_bindings_delete = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic2_name,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL,
+ RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL,
+ 0);
+ acl_bindings_describe = acl_bindings_delete;
+
+ TIMING_START(&timing, "DeleteAcls");
+ rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete,
+ q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /*
+ * Wait for result
+ */
+ rkev_acl_delete = test_wait_admin_result(
+ q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000);
+
+ acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete);
+
+ TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL");
+
+ DeleteAcls_result_responses_cntp = 0;
+ DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses(
+ acl_delete_result, &DeleteAcls_result_responses_cntp);
+
+ TEST_ASSERT(DeleteAcls_result_responses_cntp == 1,
+ "DeleteAcls_result_responses_cntp should be 1, not %zu\n",
+ DeleteAcls_result_responses_cntp);
+
+ DeleteAcls_result_response = DeleteAcls_result_responses[0];
+
+ TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error(
+ DeleteAcls_result_response));
+
+ matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls(
+ DeleteAcls_result_response, &matching_acls_cntp);
+
+ TEST_ASSERT(matching_acls_cntp == 1,
+ "matching_acls_cntp should be 1, not %zu\n",
+ matching_acls_cntp);
+
+ matching_acl = matching_acls[0];
+ error = rd_kafka_AclBinding_error(matching_acl);
+ restype = rd_kafka_AclBinding_restype(matching_acl);
+ name = rd_kafka_AclBinding_name(matching_acl);
+ resource_pattern_type =
+ rd_kafka_AclBinding_resource_pattern_type(matching_acl);
+ principal = rd_kafka_AclBinding_principal(matching_acl);
+ operation = rd_kafka_AclBinding_operation(matching_acl);
+ permission_type = rd_kafka_AclBinding_permission_type(matching_acl);
+
+ TEST_ASSERT(!error, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s",
+ rd_kafka_error_string(error));
+ TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC,
+ "expected RD_KAFKA_RESOURCE_TOPIC not %s",
+ rd_kafka_ResourceType_name(restype));
+ TEST_ASSERT(strcmp(name, topic2_name) == 0, "expected %s not %s",
+ topic2_name, name);
+ TEST_ASSERT(
+ permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW,
+ "expected %s not %s",
+ rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW),
+ rd_kafka_AclPermissionType_name(permission_type));
+ TEST_ASSERT(strcmp(user_test2, principal) == 0, "expected %s not %s",
+ user_test2, principal);
+ TEST_ASSERT(resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ "expected %s not %s",
+ rd_kafka_ResourcePatternType_name(
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL),
+ rd_kafka_ResourcePatternType_name(resource_pattern_type));
+
+ TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE,
+ "expected %s not %s",
+ rd_kafka_AclOperation_name(RD_KAFKA_ACL_OPERATION_WRITE),
+ rd_kafka_AclOperation_name(operation));
+
+ acl_binding_results_cntp =
+ do_test_acls_count(rk, acl_bindings_describe, q);
+ TEST_ASSERT(acl_binding_results_cntp == 0,
+ "acl_binding_results_cntp should be 0, not %zu\n",
+ acl_binding_results_cntp);
+
+ rd_kafka_AclBinding_destroy(acl_bindings_delete);
+ rd_kafka_event_destroy(rkev_acl_delete);
+ rd_kafka_AdminOptions_destroy(admin_options_delete);
+
+ rd_kafka_AclBinding_destroy_array(acl_bindings_create, 3);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ SUB_TEST_PASS();
+}
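+
+/*
+ * For reference, a DeleteAcls round-trip consists of building an ACL
+ * binding filter, issuing the request and polling the queue for the
+ * result event. A minimal sketch, assuming an admin client rk and a
+ * result queue q ("mytopic" is a placeholder name):
+ *
+ *   rd_kafka_AclBindingFilter_t *filter = rd_kafka_AclBindingFilter_new(
+ *       RD_KAFKA_RESOURCE_TOPIC, "mytopic",
+ *       RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL,
+ *       RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY,
+ *       NULL, 0);
+ *   rd_kafka_DeleteAcls(rk, &filter, 1, NULL, q);
+ *   ...wait for an RD_KAFKA_EVENT_DELETEACLS_RESULT event on q...
+ *   rd_kafka_AclBinding_destroy(filter);
+ *
+ * NULL principal and host fields in a filter match any value.
+ */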
+
+/**
+ * @brief Verify that an unclean rd_kafka_destroy(), i.e. one performed
+ *        while an Admin API request is still in flight, does not hang.
+ */
+static void do_test_unclean_destroy(rd_kafka_type_t cltype, int with_mainq) {
+ rd_kafka_t *rk;
+ char errstr[512];
+ rd_kafka_conf_t *conf;
+ rd_kafka_queue_t *q;
+ rd_kafka_NewTopic_t *topic;
+ test_timing_t t_destroy;
+
+ SUB_TEST_QUICK("Test unclean destroy using %s",
+ with_mainq ? "mainq" : "tempq");
+
+ test_conf_init(&conf, NULL, 0);
+
+ rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
+
+ if (with_mainq)
+ q = rd_kafka_queue_get_main(rk);
+ else
+ q = rd_kafka_queue_new(rk);
+
+ topic = rd_kafka_NewTopic_new(test_mk_topic_name(__FUNCTION__, 1), 3, 1,
+ NULL, 0);
+ rd_kafka_CreateTopics(rk, &topic, 1, NULL, q);
+ rd_kafka_NewTopic_destroy(topic);
+
+ rd_kafka_queue_destroy(q);
+
+ TEST_SAY(
+ "Giving rd_kafka_destroy() 5s to finish, "
+ "despite an Admin API request still being in flight\n");
+ test_timeout_set(5);
+ TIMING_START(&t_destroy, "rd_kafka_destroy()");
+ rd_kafka_destroy(rk);
+ TIMING_STOP(&t_destroy);
+
+ SUB_TEST_PASS();
+
+ /* Restore timeout */
+ test_timeout_set(60);
+}
+
+
+
+/**
+ * @brief Test deletion of records (DeleteRecords).
+ */
+static void do_test_DeleteRecords(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int op_timeout) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_topic_partition_list_t *offsets = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define MY_DEL_RECORDS_CNT 3
+ rd_kafka_topic_partition_list_t *results = NULL;
+ int i;
+ const int partitions_cnt = 3;
+ const int msgs_cnt = 100;
+ char *topics[MY_DEL_RECORDS_CNT];
+ rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_RECORDS_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_DeleteRecords_t *del_records;
+ const rd_kafka_DeleteRecords_result_t *res;
+
+ SUB_TEST_QUICK("%s DeleteRecords with %s, op_timeout %d",
+ rd_kafka_name(rk), what, op_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (op_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ err = rd_kafka_AdminOptions_set_operation_timeout(
+ options, op_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
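+
+ /* Note: the operation timeout is how long the broker itself waits
+ * for the operation to complete before returning, as opposed to
+ * the request timeout, which bounds the client-side request. With
+ * an operation timeout of 0 the broker returns immediately and the
+ * deletion completes asynchronously. */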
+
+
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++) {
+ char pfx[32];
+ char *topic;
+
+ rd_snprintf(pfx, sizeof(pfx), "DeleteRecords-topic%d", i);
+ topic = rd_strdup(test_mk_topic_name(pfx, 1));
+
+ topics[i] = topic;
+ exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
+ }
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, topics, MY_DEL_RECORDS_CNT,
+ partitions_cnt /*num_partitions*/, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0,
+ 15 * 1000);
+
+ /* Produce 100 msgs / partition */
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++) {
+ int32_t partition;
+ for (partition = 0; partition < partitions_cnt; partition++) {
+ test_produce_msgs_easy(topics[i], 0, partition,
+ msgs_cnt);
+ }
+ }
+
+ offsets = rd_kafka_topic_partition_list_new(10);
+
+ /* Wipe all data from topic 0 */
+ for (i = 0; i < partitions_cnt; i++)
+ rd_kafka_topic_partition_list_add(offsets, topics[0], i)
+ ->offset = RD_KAFKA_OFFSET_END;
+
+ /* Wipe all data from partition 0 in topic 1 */
+ rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset =
+ RD_KAFKA_OFFSET_END;
+
+ /* Wipe some data from partition 2 in topic 1 */
+ rd_kafka_topic_partition_list_add(offsets, topics[1], 2)->offset =
+ msgs_cnt / 2;
+
+ /* Leave topic 2 partition 0 at the default offset
+ * (RD_KAFKA_OFFSET_INVALID), which is out of range */
+ rd_kafka_topic_partition_list_add(offsets, topics[2], 0);
+
+ /* Offset out of range for topic 2 partition 1 */
+ rd_kafka_topic_partition_list_add(offsets, topics[2], 1)->offset =
+ msgs_cnt + 1;
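+
+ /* Expected outcome per partition (verified below):
+ * topic 0, partitions 0-2: everything deleted, low watermark
+ * becomes msgs_cnt.
+ * topic 1, partition 0: everything deleted.
+ * topic 1, partition 2: first half deleted, low watermark
+ * becomes msgs_cnt / 2.
+ * topic 2, partitions 0-1: OFFSET_OUT_OF_RANGE, nothing
+ * deleted. */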
+
+ del_records = rd_kafka_DeleteRecords_new(offsets);
+
+ TIMING_START(&timing, "DeleteRecords");
+ TEST_SAY("Call DeleteRecords\n");
+ rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ rd_kafka_DeleteRecords_destroy(del_records);
+
+ TIMING_START(&timing, "DeleteRecords.queue_poll");
+
+ /* Poll result queue for DeleteRecords result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("DeleteRecords: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_DELETERECORDS_RESULT) {
+ break;
+ }
+
+ rd_kafka_event_destroy(rkev);
+ }
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteRecords_result(rkev);
+ TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Verify the expected error code */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected DeleteRecords to return %s, not %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("DeleteRecords: returned %s (%s)\n", rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ results = rd_kafka_topic_partition_list_copy(
+ rd_kafka_DeleteRecords_result_offsets(res));
+
+ /* Sort both input and output list */
+ rd_kafka_topic_partition_list_sort(offsets, NULL, NULL);
+ rd_kafka_topic_partition_list_sort(results, NULL, NULL);
+
+ TEST_SAY("Input partitions:\n");
+ test_print_partition_list(offsets);
+ TEST_SAY("Result partitions:\n");
+ test_print_partition_list(results);
+
+ TEST_ASSERT(offsets->cnt == results->cnt,
+ "expected DeleteRecords_result_offsets to return %d items, "
+ "not %d",
+ offsets->cnt, results->cnt);
+
+ for (i = 0; i < results->cnt; i++) {
+ const rd_kafka_topic_partition_t *input = &offsets->elems[i];
+ const rd_kafka_topic_partition_t *output = &results->elems[i];
+ int64_t expected_offset = input->offset;
+ rd_kafka_resp_err_t expected_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+ if (expected_offset == RD_KAFKA_OFFSET_END)
+ expected_offset = msgs_cnt;
+
+ /* Expect an offset-out-of-range error */
+ if (input->offset < RD_KAFKA_OFFSET_END ||
+ input->offset > msgs_cnt)
+ expected_err =
+ RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE;
+
+ TEST_SAY("DeleteRecords Returned %s for %s [%" PRId32
+ "] "
+ "low-watermark = %d\n",
+ rd_kafka_err2name(output->err), output->topic,
+ output->partition, (int)output->offset);
+
+ if (strcmp(output->topic, input->topic))
+ TEST_FAIL_LATER(
+ "Result order mismatch at #%d: "
+ "expected topic %s, got %s",
+ i, input->topic, output->topic);
+
+ if (output->partition != input->partition)
+ TEST_FAIL_LATER(
+ "Result order mismatch at #%d: "
+ "expected partition %d, got %d",
+ i, input->partition, output->partition);
+
+ if (output->err != expected_err)
+ TEST_FAIL_LATER(
+ "%s [%" PRId32
+ "]: "
+ "expected error code %d (%s), "
+ "got %d (%s)",
+ output->topic, output->partition, expected_err,
+ rd_kafka_err2str(expected_err), output->err,
+ rd_kafka_err2str(output->err));
+
+ if (output->err == 0 && output->offset != expected_offset)
+ TEST_FAIL_LATER("%s [%" PRId32
+ "]: "
+ "expected offset %" PRId64
+ ", "
+ "got %" PRId64,
+ output->topic, output->partition,
+ expected_offset, output->offset);
+ }
+
+ /* Check watermarks for partitions */
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++) {
+ int32_t partition;
+ for (partition = 0; partition < partitions_cnt; partition++) {
+ const rd_kafka_topic_partition_t *del =
+ rd_kafka_topic_partition_list_find(
+ results, topics[i], partition);
+ int64_t expected_low = 0;
+ int64_t expected_high = msgs_cnt;
+ int64_t low, high;
+
+ if (del && del->err == 0) {
+ expected_low = del->offset;
+ }
+
+ err = rd_kafka_query_watermark_offsets(
+ rk, topics[i], partition, &low, &high,
+ tmout_multip(10000));
+ if (err)
+ TEST_FAIL(
+ "query_watermark_offsets failed: "
+ "%s\n",
+ rd_kafka_err2str(err));
+
+ if (low != expected_low)
+ TEST_FAIL_LATER("For %s [%" PRId32
+ "] expected "
+ "a low watermark of %" PRId64
+ ", got %" PRId64,
+ topics[i], partition,
+ expected_low, low);
+
+ if (high != expected_high)
+ TEST_FAIL_LATER("For %s [%" PRId32
+ "] expected "
+ "a high watermark of %" PRId64
+ ", got %" PRId64,
+ topics[i], partition,
+ expected_high, high);
+ }
+ }
+
+ rd_kafka_event_destroy(rkev);
+
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++)
+ rd_free(topics[i]);
+
+ if (results)
+ rd_kafka_topic_partition_list_destroy(results);
+
+ if (offsets)
+ rd_kafka_topic_partition_list_destroy(offsets);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_DEL_RECORDS_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test deletion of groups (DeleteGroups).
+ */
+
+typedef struct expected_group_result {
+ char *group;
+ rd_kafka_resp_err_t err;
+} expected_group_result_t;
+
+static void do_test_DeleteGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int request_timeout) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define MY_DEL_GROUPS_CNT 4
+ int known_groups = MY_DEL_GROUPS_CNT - 1;
+ int i;
+ const int partitions_cnt = 1;
+ const int msgs_cnt = 100;
+ char *topic;
+ rd_kafka_metadata_topic_t exp_mdtopic = {0};
+ int64_t testid = test_id_generate();
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ const rd_kafka_group_result_t **results = NULL;
+ expected_group_result_t expected[MY_DEL_GROUPS_CNT] = {{0}};
+ rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
+ const rd_kafka_DeleteGroups_result_t *res;
+
+ SUB_TEST_QUICK("%s DeleteGroups with %s, request_timeout %d",
+ rd_kafka_name(rk), what, request_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (request_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, request_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ exp_mdtopic.topic = topic;
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000);
+
+ /* Produce 100 msgs */
+ test_produce_msgs_easy(topic, testid, 0, msgs_cnt);
+
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ if (i < known_groups) {
+ test_consume_msgs_easy(group, topic, testid, -1,
+ msgs_cnt, NULL);
+ expected[i].group = group;
+ expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ } else {
+ expected[i].group = group;
+ expected[i].err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND;
+ }
+ del_groups[i] = rd_kafka_DeleteGroup_new(group);
+ }
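+
+ /* The first known_groups groups exist on the broker at this point
+ * (their consumers have committed offsets and left), while the last
+ * group was never joined by any consumer, so deleting it is expected
+ * to fail with GROUP_ID_NOT_FOUND. */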
+
+ TIMING_START(&timing, "DeleteGroups");
+ TEST_SAY("Call DeleteGroups\n");
+ rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ TIMING_START(&timing, "DeleteGroups.queue_poll");
+
+ /* Poll result queue for DeleteGroups result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("DeleteGroups: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_DELETEGROUPS_RESULT) {
+ break;
+ }
+
+ rd_kafka_event_destroy(rkev);
+ }
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteGroups_result(rkev);
+ TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Verify the expected error code */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected DeleteGroups to return %s, not %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("DeleteGroups: returned %s (%s)\n", rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ size_t cnt = 0;
+ results = rd_kafka_DeleteGroups_result_groups(res, &cnt);
+
+ TEST_ASSERT(MY_DEL_GROUPS_CNT == cnt,
+ "expected DeleteGroups_result_groups to return %d items, "
+ "not %" PRIusz,
+ MY_DEL_GROUPS_CNT, cnt);
+
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ const expected_group_result_t *exp = &expected[i];
+ rd_kafka_resp_err_t exp_err = exp->err;
+ const rd_kafka_group_result_t *act = results[i];
+ rd_kafka_resp_err_t act_err =
+ rd_kafka_error_code(rd_kafka_group_result_error(act));
+ TEST_ASSERT(
+ strcmp(exp->group, rd_kafka_group_result_name(act)) == 0,
+ "Result order mismatch at #%d: expected group name to be "
+ "%s, not %s",
+ i, exp->group, rd_kafka_group_result_name(act));
+ TEST_ASSERT(exp_err == act_err,
+ "expected err=%d for group %s, not %d (%s)",
+ exp_err, exp->group, act_err,
+ rd_kafka_err2str(act_err));
+ }
+
+ rd_kafka_event_destroy(rkev);
+
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ rd_kafka_DeleteGroup_destroy(del_groups[i]);
+ rd_free(expected[i].group);
+ }
+
+ rd_free(topic);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_DEL_GROUPS_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test ListConsumerGroups: create consumers for a set of groups,
+ * list the groups, and delete them at the end.
+ */
+static void do_test_ListConsumerGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int request_timeout,
+ rd_bool_t match_states) {
+#define TEST_LIST_CONSUMER_GROUPS_CNT 4
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ size_t valid_cnt, error_cnt;
+ rd_bool_t is_simple_consumer_group;
+ rd_kafka_consumer_group_state_t state;
+ char errstr[512];
+ const char *errstr2, *group_id;
+ char *list_consumer_groups[TEST_LIST_CONSUMER_GROUPS_CNT];
+ const int partitions_cnt = 1;
+ const int msgs_cnt = 100;
+ size_t i, found;
+ char *topic;
+ rd_kafka_metadata_topic_t exp_mdtopic = {0};
+ int64_t testid = test_id_generate();
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ const rd_kafka_ListConsumerGroups_result_t *res;
+ const rd_kafka_ConsumerGroupListing_t **groups;
+ rd_bool_t has_match_states =
+ test_broker_version >= TEST_BRKVER(2, 7, 0, 0);
+
+ SUB_TEST_QUICK(
+ "%s ListConsumerGroups with %s, request_timeout %d"
+ ", match_states %s",
+ rd_kafka_name(rk), what, request_timeout, RD_STR_ToF(match_states));
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (request_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
+
+ if (match_states) {
+ rd_kafka_consumer_group_state_t empty =
+ RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY;
+
+ TEST_CALL_ERROR__(
+ rd_kafka_AdminOptions_set_match_consumer_group_states(
+ options, &empty, 1));
+ }
+
+ TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout(
+ options, request_timeout, errstr, sizeof(errstr)));
+ }
+
+
+ topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ exp_mdtopic.topic = topic;
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000);
+
+ /* Produce 100 msgs */
+ test_produce_msgs_easy(topic, testid, 0, msgs_cnt);
+
+ for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) {
+ char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt,
+ NULL);
+ list_consumer_groups[i] = group;
+ }
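+
+ /* Each test_consume_msgs_easy() call tears down its consumer before
+ * returning, so every group above is left in Empty state; this is
+ * why the Empty state match filter set up earlier still returns all
+ * of them. */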
+
+ TIMING_START(&timing, "ListConsumerGroups");
+ TEST_SAY("Call ListConsumerGroups\n");
+ rd_kafka_ListConsumerGroups(rk, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ TIMING_START(&timing, "ListConsumerGroups.queue_poll");
+
+ /* Poll result queue for ListConsumerGroups result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("ListConsumerGroups: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) {
+ break;
+ }
+
+ rd_kafka_event_destroy(rkev);
+ }
+ /* Convert event to proper result */
+ res = rd_kafka_event_ListConsumerGroups_result(rkev);
+ TEST_ASSERT(res, "expected ListConsumerGroups_result, got %s",
+ rd_kafka_event_name(rkev));
+
+ /* Verify the expected error code */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected ListConsumerGroups to return %s, got %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("ListConsumerGroups: returned %s (%s)\n",
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ groups = rd_kafka_ListConsumerGroups_result_valid(res, &valid_cnt);
+ rd_kafka_ListConsumerGroups_result_errors(res, &error_cnt);
+
+ /* Other tests may be running in parallel, so more groups than
+ * ours may be returned */
+ TEST_ASSERT(valid_cnt >= TEST_LIST_CONSUMER_GROUPS_CNT,
+ "expected ListConsumerGroups to return at least %" PRId32
+ " valid groups,"
+ " got %zu",
+ TEST_LIST_CONSUMER_GROUPS_CNT, valid_cnt);
+
+ TEST_ASSERT(error_cnt == 0,
+ "expected ListConsumerGroups to return 0 errors,"
+ " got %zu",
+ error_cnt);
+
+ found = 0;
+ for (i = 0; i < valid_cnt; i++) {
+ int j;
+ const rd_kafka_ConsumerGroupListing_t *group;
+ group = groups[i];
+ group_id = rd_kafka_ConsumerGroupListing_group_id(group);
+ is_simple_consumer_group =
+ rd_kafka_ConsumerGroupListing_is_simple_consumer_group(
+ group);
+ state = rd_kafka_ConsumerGroupListing_state(group);
+ for (j = 0; j < TEST_LIST_CONSUMER_GROUPS_CNT; j++) {
+ if (!strcmp(list_consumer_groups[j], group_id)) {
+ found++;
+ TEST_ASSERT(!is_simple_consumer_group,
+ "expected a normal group,"
+ " got a simple group");
+
+ if (!has_match_states)
+ break;
+
+ TEST_ASSERT(
+ state ==
+ RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY,
+ "expected an Empty state,"
+ " got state %s",
+ rd_kafka_consumer_group_state_name(state));
+ break;
+ }
+ }
+ }
+ TEST_ASSERT(found == TEST_LIST_CONSUMER_GROUPS_CNT,
+ "expected to find all %d"
+ " created groups,"
+ " got %" PRIusz,
+ TEST_LIST_CONSUMER_GROUPS_CNT, found);
+
+ rd_kafka_event_destroy(rkev);
+
+ test_DeleteGroups_simple(rk, NULL, (char **)list_consumer_groups,
+ TEST_LIST_CONSUMER_GROUPS_CNT, NULL);
+
+ for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) {
+ rd_free(list_consumer_groups[i]);
+ }
+
+ rd_free(topic);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef TEST_LIST_CONSUMER_GROUPS_CNT
+
+ SUB_TEST_PASS();
+}
+
+typedef struct expected_DescribeConsumerGroups_result {
+ char *group_id;
+ rd_kafka_resp_err_t err;
+} expected_DescribeConsumerGroups_result_t;
+
+
+/**
+ * @brief Test DescribeConsumerGroups: create consumers for a set of
+ * groups, describe the groups, and delete them at the end.
+ */
+static void do_test_DescribeConsumerGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int request_timeout) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4
+ int known_groups = TEST_DESCRIBE_CONSUMER_GROUPS_CNT - 1;
+ int i;
+ const int partitions_cnt = 1;
+ const int msgs_cnt = 100;
+ char *topic;
+ rd_kafka_metadata_topic_t exp_mdtopic = {0};
+ int64_t testid = test_id_generate();
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ const rd_kafka_ConsumerGroupDescription_t **results = NULL;
+ expected_DescribeConsumerGroups_result_t
+ expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT;
+ const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT];
+ char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512];
+ char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512];
+ rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT];
+ const rd_kafka_DescribeConsumerGroups_result_t *res;
+ rd_bool_t has_group_instance_id =
+ test_broker_version >= TEST_BRKVER(2, 4, 0, 0);
+
+ SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, request_timeout %d",
+ rd_kafka_name(rk), what, request_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (request_timeout != -1) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, request_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ exp_mdtopic.topic = topic;
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000);
+
+ /* Produce 100 msgs */
+ test_produce_msgs_easy(topic, testid, 0, msgs_cnt);
+
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ rd_kafka_conf_t *conf;
+ char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ if (i < known_groups) {
+ rd_snprintf(group_instance_ids[i],
+ sizeof(group_instance_ids[i]),
+ "group_instance_id_%" PRId32, i);
+ rd_snprintf(client_ids[i], sizeof(client_ids[i]),
+ "client_id_%" PRId32, i);
+
+ test_conf_init(&conf, NULL, 0);
+ test_conf_set(conf, "client.id", client_ids[i]);
+ test_conf_set(conf, "group.instance.id",
+ group_instance_ids[i]);
+ test_conf_set(conf, "session.timeout.ms", "5000");
+ test_conf_set(conf, "auto.offset.reset", "earliest");
+ rks[i] =
+ test_create_consumer(group_id, NULL, conf, NULL);
+ test_consumer_subscribe(rks[i], topic);
+ /* Consume messages */
+ test_consumer_poll("consumer", rks[i], testid, -1, -1,
+ msgs_cnt, NULL);
+ }
+ expected[i].group_id = group_id;
+ expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ describe_groups[i] = group_id;
+ }
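+
+ /* At this point the first known_groups groups each have one live
+ * static member (client.id and group.instance.id set above) and are
+ * expected to be Stable, while the last group was never created and
+ * should be reported as Dead with no members. */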
+
+ TIMING_START(&timing, "DescribeConsumerGroups");
+ TEST_SAY("Call DescribeConsumerGroups\n");
+ rd_kafka_DescribeConsumerGroups(
+ rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ TIMING_START(&timing, "DescribeConsumerGroups.queue_poll");
+
+ /* Poll result queue for DescribeConsumerGroups result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000));
+ TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) {
+ break;
+ }
+
+ rd_kafka_event_destroy(rkev);
+ }
+ /* Convert event to proper result */
+ res = rd_kafka_event_DescribeConsumerGroups_result(rkev);
+ TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s",
+ rd_kafka_event_name(rkev));
+
+ /* Verify the expected error code */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == exp_err,
+ "expected DescribeConsumerGroups to return %s, got %s (%s)",
+ rd_kafka_err2str(exp_err), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n",
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ size_t cnt = 0;
+ results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt);
+
+ TEST_ASSERT(
+ TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt,
+ "expected DescribeConsumerGroups_result_groups to return %d items, "
+ "got %" PRIusz,
+ TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt);
+
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ expected_DescribeConsumerGroups_result_t *exp = &expected[i];
+ rd_kafka_resp_err_t exp_err = exp->err;
+ const rd_kafka_ConsumerGroupDescription_t *act = results[i];
+ rd_kafka_resp_err_t act_err = rd_kafka_error_code(
+ rd_kafka_ConsumerGroupDescription_error(act));
+ rd_kafka_consumer_group_state_t state =
+ rd_kafka_ConsumerGroupDescription_state(act);
+ TEST_ASSERT(
+ strcmp(exp->group_id,
+ rd_kafka_ConsumerGroupDescription_group_id(act)) ==
+ 0,
+ "Result order mismatch at #%d: expected group id to be "
+ "%s, got %s",
+ i, exp->group_id,
+ rd_kafka_ConsumerGroupDescription_group_id(act));
+ if (i < known_groups) {
+ int member_count;
+ const rd_kafka_MemberDescription_t *member;
+ const rd_kafka_MemberAssignment_t *assignment;
+ const char *client_id;
+ const char *group_instance_id;
+ const rd_kafka_topic_partition_list_t *partitions;
+
+ TEST_ASSERT(state ==
+ RD_KAFKA_CONSUMER_GROUP_STATE_STABLE,
+ "Expected Stable state, got %s.",
+ rd_kafka_consumer_group_state_name(state));
+
+ TEST_ASSERT(
+ !rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(
+ act),
+ "Expected a normal consumer group, got a simple "
+ "one.");
+
+ member_count =
+ rd_kafka_ConsumerGroupDescription_member_count(act);
+ TEST_ASSERT(member_count == 1,
+ "Expected one member, got %d.",
+ member_count);
+
+ member =
+ rd_kafka_ConsumerGroupDescription_member(act, 0);
+
+ client_id =
+ rd_kafka_MemberDescription_client_id(member);
+ TEST_ASSERT(!strcmp(client_id, client_ids[i]),
+ "Expected client id \"%s\","
+ " got \"%s\".",
+ client_ids[i], client_id);
+
+ if (has_group_instance_id) {
+ group_instance_id =
+ rd_kafka_MemberDescription_group_instance_id(
+ member);
+ TEST_ASSERT(!strcmp(group_instance_id,
+ group_instance_ids[i]),
+ "Expected group instance id \"%s\","
+ " got \"%s\".",
+ group_instance_ids[i],
+ group_instance_id);
+ }
+
+ assignment =
+ rd_kafka_MemberDescription_assignment(member);
+ TEST_ASSERT(assignment != NULL,
+ "Expected non-NULL member assignment");
+
+ partitions =
+ rd_kafka_MemberAssignment_partitions(assignment);
+ TEST_ASSERT(partitions != NULL,
+ "Expected non-NULL member partitions");
+
+ TEST_SAY(
+ "Member client.id=\"%s\", "
+ "group.instance.id=\"%s\", "
+ "consumer_id=\"%s\", "
+ "host=\"%s\", assignment:\n",
+ rd_kafka_MemberDescription_client_id(member),
+ rd_kafka_MemberDescription_group_instance_id(
+ member),
+ rd_kafka_MemberDescription_consumer_id(member),
+ rd_kafka_MemberDescription_host(member));
+ /* This is just to make sure the returned memory
+ * is valid. */
+ test_print_partition_list(partitions);
+ } else {
+ TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_DEAD,
+ "Expected Dead state, got %s.",
+ rd_kafka_consumer_group_state_name(state));
+ }
+ TEST_ASSERT(exp_err == act_err,
+ "expected err=%d for group %s, got %d (%s)",
+ exp_err, exp->group_id, act_err,
+ rd_kafka_err2str(act_err));
+ }
+
+ rd_kafka_event_destroy(rkev);
+
+ for (i = 0; i < known_groups; i++) {
+ test_consumer_close(rks[i]);
+ rd_kafka_destroy(rks[i]);
+ }
+
+ /* Wait for the session timeout (5s) plus 1s: with static group
+ * membership the members are not removed from their groups until
+ * the session times out, and the groups can't be deleted before
+ * then. */
+ rd_sleep(6);
+
+ test_DeleteGroups_simple(rk, NULL, (char **)describe_groups,
+ known_groups, NULL);
+
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ rd_free(expected[i].group_id);
+ }
+
+ rd_free(topic);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test deletion of committed offsets (DeleteConsumerGroupOffsets).
+ */
+static void do_test_DeleteConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int req_timeout_ms,
+ rd_bool_t sub_consumer) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_delete,
+ *committed, *deleted, *subscription = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define MY_TOPIC_CNT 3
+ int i;
+ const int partitions_cnt = 3;
+ char *topics[MY_TOPIC_CNT];
+ rd_kafka_metadata_topic_t exp_mdtopics[MY_TOPIC_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets;
+ const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
+ const rd_kafka_group_result_t **gres;
+ size_t gres_cnt;
+ rd_kafka_t *consumer;
+ char *groupid;
+
+ SUB_TEST_QUICK(
+ "%s DeleteConsumerGroupOffsets with %s, req_timeout_ms %d%s",
+ rd_kafka_name(rk), what, req_timeout_ms,
+ sub_consumer ? ", with subscribing consumer" : "");
+
+ if (sub_consumer)
+ exp_err = RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC;
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (req_timeout_ms != -1) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, req_timeout_ms, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ subscription = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT);
+
+ for (i = 0; i < MY_TOPIC_CNT; i++) {
+ char pfx[64];
+ char *topic;
+
+ rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i);
+ topic = rd_strdup(test_mk_topic_name(pfx, 1));
+
+ topics[i] = topic;
+ exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
+
+ rd_kafka_topic_partition_list_add(subscription, topic,
+ RD_KAFKA_PARTITION_UA);
+ }
+
+ groupid = topics[0];
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, topics, MY_TOPIC_CNT, partitions_cnt,
+ NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0,
+ 15 * 1000);
+
+ rd_sleep(1); /* Additional wait time for cluster propagation */
+
+ consumer = test_create_consumer(groupid, NULL, NULL, NULL);
+
+ if (sub_consumer) {
+ TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
+ test_consumer_wait_assignment(consumer, rd_true);
+ }
+
+ /* Commit some offsets */
+ orig_offsets = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT * 2);
+ for (i = 0; i < MY_TOPIC_CNT * 2; i++)
+ rd_kafka_topic_partition_list_add(orig_offsets, topics[i / 2],
+ i % MY_TOPIC_CNT)
+ ->offset = (i + 1) * 10;
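+
+ /* The loop above commits offsets for two distinct partitions per
+ * topic: i / 2 selects the topic and i % MY_TOPIC_CNT (== 3) the
+ * partition number. */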
+
+ TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/));
+
+ /* Verify committed offsets match */
+ committed = rd_kafka_topic_partition_list_copy(orig_offsets);
+ TEST_CALL_ERR__(
+ rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000)));
+
+ if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) {
+ TEST_SAY("commit() list:\n");
+ test_print_partition_list(orig_offsets);
+ TEST_SAY("committed() list:\n");
+ test_print_partition_list(committed);
+ TEST_FAIL("committed offsets don't match");
+ }
+
+ rd_kafka_topic_partition_list_destroy(committed);
+
+ /* Now delete second half of the commits */
+ offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
+ to_delete = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
+ for (i = 0; i < orig_offsets->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ if (i < orig_offsets->cnt / 2) {
+ rktpar = rd_kafka_topic_partition_list_add(
+ offsets, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = orig_offsets->elems[i].offset;
+ } else {
+ rktpar = rd_kafka_topic_partition_list_add(
+ to_delete, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+ rktpar = rd_kafka_topic_partition_list_add(
+ offsets, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = RD_KAFKA_OFFSET_INVALID;
+ }
+ }
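+
+ /* to_delete now holds the second half of the commits (the ones to
+ * delete), while offsets holds the expected committed() view after
+ * the deletion: the first half unchanged and the second half reset
+ * to RD_KAFKA_OFFSET_INVALID (no committed offset). */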
+
+ cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(groupid, to_delete);
+
+ TIMING_START(&timing, "DeleteConsumerGroupOffsets");
+ TEST_SAY("Call DeleteConsumerGroupOffsets\n");
+ rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, 1, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets);
+
+ TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
+ /* Poll result queue for DeleteConsumerGroupOffsets result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000));
+ TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT)
+ break;
+
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting success */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!err,
+ "expected DeleteConsumerGroupOffsets to succeed, "
+ "got %s (%s)",
+ rd_kafka_err2name(err), err ? errstr2 : "n/a");
+
+ TEST_SAY("DeleteConsumerGroupOffsets: returned %s (%s)\n",
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ gres =
+ rd_kafka_DeleteConsumerGroupOffsets_result_groups(res, &gres_cnt);
+ TEST_ASSERT(gres && gres_cnt == 1,
+ "expected gres_cnt == 1, not %" PRIusz, gres_cnt);
+
+ deleted = rd_kafka_topic_partition_list_copy(
+ rd_kafka_group_result_partitions(gres[0]));
+
+ if (test_partition_list_and_offsets_cmp(deleted, to_delete)) {
+ TEST_SAY("Result list:\n");
+ test_print_partition_list(deleted);
+ TEST_SAY("Partitions passed to DeleteConsumerGroupOffsets:\n");
+ test_print_partition_list(to_delete);
+ TEST_FAIL("deleted/requested offsets don't match");
+ }
+
+ /* Verify expected errors */
+ for (i = 0; i < deleted->cnt; i++) {
+ TEST_ASSERT_LATER(deleted->elems[i].err == exp_err,
+ "Result %s [%" PRId32
+ "] has error %s, "
+ "expected %s",
+ deleted->elems[i].topic,
+ deleted->elems[i].partition,
+ rd_kafka_err2name(deleted->elems[i].err),
+ rd_kafka_err2name(exp_err));
+ }
+
+ TEST_LATER_CHECK();
+
+ rd_kafka_topic_partition_list_destroy(deleted);
+ rd_kafka_topic_partition_list_destroy(to_delete);
+
+ rd_kafka_event_destroy(rkev);
+
+
+ /* Verify committed offsets match */
+ committed = rd_kafka_topic_partition_list_copy(orig_offsets);
+ TEST_CALL_ERR__(
+ rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000)));
+
+ TEST_SAY("Original committed offsets:\n");
+ test_print_partition_list(orig_offsets);
+
+ TEST_SAY("Committed offsets after delete:\n");
+ test_print_partition_list(committed);
+
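+ /* If the consumer was subscribed the deletion failed with
+ * GROUP_SUBSCRIBED_TO_TOPIC, so all original offsets remain
+ * committed; otherwise the second half were deleted. */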
+ rd_kafka_topic_partition_list_t *expected = offsets;
+ if (sub_consumer)
+ expected = orig_offsets;
+
+ if (test_partition_list_and_offsets_cmp(committed, expected)) {
+ TEST_SAY("expected list:\n");
+ test_print_partition_list(expected);
+ TEST_SAY("committed() list:\n");
+ test_print_partition_list(committed);
+ TEST_FAIL("committed offsets don't match");
+ }
+
+ rd_kafka_topic_partition_list_destroy(committed);
+ rd_kafka_topic_partition_list_destroy(offsets);
+ rd_kafka_topic_partition_list_destroy(orig_offsets);
+ rd_kafka_topic_partition_list_destroy(subscription);
+
+ for (i = 0; i < MY_TOPIC_CNT; i++)
+ rd_free(topics[i]);
+
+ rd_kafka_destroy(consumer);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef MY_TOPIC_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Test altering of committed offsets (AlterConsumerGroupOffsets).
+ */
+static void do_test_AlterConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int req_timeout_ms,
+ rd_bool_t sub_consumer,
+ rd_bool_t create_topics) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_alter,
+ *committed, *alterd, *subscription = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3
+ int i;
+ const int partitions_cnt = 3;
+ char *topics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT];
+ rd_kafka_metadata_topic_t
+ exp_mdtopics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_AlterConsumerGroupOffsets_t *cgoffsets;
+ const rd_kafka_AlterConsumerGroupOffsets_result_t *res;
+ const rd_kafka_group_result_t **gres;
+ size_t gres_cnt;
+ rd_kafka_t *consumer = NULL;
+ char *group_id;
+
+ SUB_TEST_QUICK(
+ "%s AlterConsumerGroupOffsets with %s, "
+ "request_timeout %d%s",
+ rd_kafka_name(rk), what, req_timeout_ms,
+ sub_consumer ? ", with subscribing consumer" : "");
+
+ if (!create_topics)
+ exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART;
+ else if (sub_consumer)
+ exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID;
+
+ if (sub_consumer && !create_topics)
+ TEST_FAIL(
+ "Can't use set sub_consumer and unset create_topics at the "
+ "same time");
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (req_timeout_ms != -1) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS);
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, req_timeout_ms, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ subscription = rd_kafka_topic_partition_list_new(
+ TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT);
+
+ for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) {
+ char pfx[64];
+ char *topic;
+
+ rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i);
+ topic = rd_strdup(test_mk_topic_name(pfx, 1));
+
+ topics[i] = topic;
+ exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
+
+ rd_kafka_topic_partition_list_add(subscription, topic,
+ RD_KAFKA_PARTITION_UA);
+ }
+
+ group_id = topics[0];
+
+ /* Create the topics first if needed. */
+ if (create_topics) {
+ test_CreateTopics_simple(
+ rk, NULL, topics,
+ TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT, partitions_cnt,
+ NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt,
+ NULL, 0, 15 * 1000);
+
+ rd_sleep(1); /* Additional wait time for cluster propagation */
+
+ consumer = test_create_consumer(group_id, NULL, NULL, NULL);
+
+ if (sub_consumer) {
+ TEST_CALL_ERR__(
+ rd_kafka_subscribe(consumer, subscription));
+ test_consumer_wait_assignment(consumer, rd_true);
+ }
+ }
+
+ orig_offsets = rd_kafka_topic_partition_list_new(
+ TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt);
+ for (i = 0;
+ i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt;
+ i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ rktpar = rd_kafka_topic_partition_list_add(
+ orig_offsets, topics[i / partitions_cnt],
+ i % partitions_cnt);
+ rktpar->offset = (i + 1) * 10;
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, 1);
+ }
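+
+ /* Committed offsets carry a leader epoch: epoch 1 is committed
+ * here, and the partitions altered below are expected to come back
+ * with epoch 2. */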
+
+ /* Commit some offsets, if topics exists */
+ if (create_topics) {
+ TEST_CALL_ERR__(
+ rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/));
+
+ /* Verify committed offsets match */
+ committed = rd_kafka_topic_partition_list_copy(orig_offsets);
+ TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
+ tmout_multip(5 * 1000)));
+
+ if (test_partition_list_and_offsets_cmp(committed,
+ orig_offsets)) {
+ TEST_SAY("commit() list:\n");
+ test_print_partition_list(orig_offsets);
+ TEST_SAY("committed() list:\n");
+ test_print_partition_list(committed);
+ TEST_FAIL("committed offsets don't match");
+ }
+ rd_kafka_topic_partition_list_destroy(committed);
+ }
+
+ /* Now alter second half of the commits */
+ offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
+ to_alter = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2);
+ for (i = 0; i < orig_offsets->cnt; i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ if (i < orig_offsets->cnt / 2) {
+ rktpar = rd_kafka_topic_partition_list_add(
+ offsets, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = orig_offsets->elems[i].offset;
+ rd_kafka_topic_partition_set_leader_epoch(
+ rktpar, rd_kafka_topic_partition_get_leader_epoch(
+ &orig_offsets->elems[i]));
+ } else {
+ rktpar = rd_kafka_topic_partition_list_add(
+ to_alter, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = 5;
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, 2);
+ rktpar = rd_kafka_topic_partition_list_add(
+ offsets, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ rktpar->offset = 5;
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, 2);
+ }
+ }
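+
+ /* to_alter now holds the second half of the commits, rewound to
+ * offset 5 with leader epoch 2, while offsets holds the expected
+ * committed() view after the alteration: first half unchanged,
+ * second half at offset 5. */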
+
+ cgoffsets = rd_kafka_AlterConsumerGroupOffsets_new(group_id, to_alter);
+
+ TIMING_START(&timing, "AlterConsumerGroupOffsets");
+ TEST_SAY("Call AlterConsumerGroupOffsets\n");
+ rd_kafka_AlterConsumerGroupOffsets(rk, &cgoffsets, 1, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ rd_kafka_AlterConsumerGroupOffsets_destroy(cgoffsets);
+
+ TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll");
+ /* Poll result queue for AlterConsumerGroupOffsets result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000));
+ TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT)
+ break;
+
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting success */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!err,
+ "expected AlterConsumerGroupOffsets to succeed, "
+ "got %s (%s)",
+ rd_kafka_err2name(err), err ? errstr2 : "n/a");
+
+ TEST_SAY("AlterConsumerGroupOffsets: returned %s (%s)\n",
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ gres = rd_kafka_AlterConsumerGroupOffsets_result_groups(res, &gres_cnt);
+ TEST_ASSERT(gres && gres_cnt == 1,
+ "expected gres_cnt == 1, not %" PRIusz, gres_cnt);
+
+ alterd = rd_kafka_topic_partition_list_copy(
+ rd_kafka_group_result_partitions(gres[0]));
+
+ if (test_partition_list_and_offsets_cmp(alterd, to_alter)) {
+ TEST_SAY("Result list:\n");
+ test_print_partition_list(alterd);
+ TEST_SAY("Partitions passed to AlterConsumerGroupOffsets:\n");
+ test_print_partition_list(to_alter);
+ TEST_FAIL("altered/requested offsets don't match");
+ }
+
+ /* Verify expected errors */
+ for (i = 0; i < alterd->cnt; i++) {
+ TEST_ASSERT_LATER(alterd->elems[i].err == exp_err,
+ "Result %s [%" PRId32
+ "] has error %s, "
+ "expected %s",
+ alterd->elems[i].topic,
+ alterd->elems[i].partition,
+ rd_kafka_err2name(alterd->elems[i].err),
+ rd_kafka_err2name(exp_err));
+ }
+
+ TEST_LATER_CHECK();
+
+ rd_kafka_topic_partition_list_destroy(alterd);
+ rd_kafka_topic_partition_list_destroy(to_alter);
+
+ rd_kafka_event_destroy(rkev);
+
+
+ /* Verify committed offsets match, if topics exist. */
+ if (create_topics) {
+ committed = rd_kafka_topic_partition_list_copy(orig_offsets);
+ TEST_CALL_ERR__(rd_kafka_committed(consumer, committed,
+ tmout_multip(5 * 1000)));
+
+ rd_kafka_topic_partition_list_t *expected = offsets;
+ if (sub_consumer) {
+ /* Alter fails with an active consumer */
+ expected = orig_offsets;
+ }
+ TEST_SAY("Original committed offsets:\n");
+ test_print_partition_list(orig_offsets);
+
+ TEST_SAY("Committed offsets after alter:\n");
+ test_print_partition_list(committed);
+
+ if (test_partition_list_and_offsets_cmp(committed, expected)) {
+ TEST_SAY("expected list:\n");
+ test_print_partition_list(expected);
+ TEST_SAY("committed() list:\n");
+ test_print_partition_list(committed);
+ TEST_FAIL("committed offsets don't match");
+ }
+ rd_kafka_topic_partition_list_destroy(committed);
+ }
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+ rd_kafka_topic_partition_list_destroy(orig_offsets);
+ rd_kafka_topic_partition_list_destroy(subscription);
+
+ for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++)
+ rd_free(topics[i]);
+
+ if (create_topics) /* consumer is created only if topics are. */
+ rd_kafka_destroy(consumer);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+#undef TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief Test listing of committed offsets (ListConsumerGroupOffsets).
+ */
+static void do_test_ListConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int req_timeout_ms,
+ rd_bool_t sub_consumer,
+ rd_bool_t null_toppars) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_topic_partition_list_t *orig_offsets, *to_list, *committed,
+ *listd, *subscription = NULL;
+ rd_kafka_event_t *rkev = NULL;
+ rd_kafka_resp_err_t err;
+ char errstr[512];
+ const char *errstr2;
+#define TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3
+ int i;
+ const int partitions_cnt = 3;
+ char *topics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT];
+ rd_kafka_metadata_topic_t
+ exp_mdtopics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}};
+ int exp_mdtopic_cnt = 0;
+ test_timing_t timing;
+ rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_ListConsumerGroupOffsets_t *cgoffsets;
+ const rd_kafka_ListConsumerGroupOffsets_result_t *res;
+ const rd_kafka_group_result_t **gres;
+ size_t gres_cnt;
+ rd_kafka_t *consumer;
+ char *group_id;
+
+ SUB_TEST_QUICK(
+ "%s ListConsumerGroupOffsets with %s, "
+ "request timeout %d%s",
+ rd_kafka_name(rk), what, req_timeout_ms,
+ sub_consumer ? ", with subscribing consumer" : "");
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (req_timeout_ms != -1) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS);
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, req_timeout_ms, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ subscription = rd_kafka_topic_partition_list_new(
+ TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT);
+
+ for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) {
+ char pfx[64];
+ char *topic;
+
+ rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i);
+ topic = rd_strdup(test_mk_topic_name(pfx, 1));
+
+ topics[i] = topic;
+ exp_mdtopics[exp_mdtopic_cnt++].topic = topic;
+
+ rd_kafka_topic_partition_list_add(subscription, topic,
+ RD_KAFKA_PARTITION_UA);
+ }
+
+ group_id = topics[0];
+
+ /* Create the topics first. */
+ test_CreateTopics_simple(rk, NULL, topics,
+ TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT,
+ partitions_cnt, NULL);
+
+ /* Verify that topics are reported by metadata */
+ test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0,
+ 15 * 1000);
+
+ rd_sleep(1); /* Additional wait time for cluster propagation */
+
+ consumer = test_create_consumer(group_id, NULL, NULL, NULL);
+
+ if (sub_consumer) {
+ TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription));
+ test_consumer_wait_assignment(consumer, rd_true);
+ }
+
+ /* Commit some offsets */
+ orig_offsets = rd_kafka_topic_partition_list_new(
+ TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2);
+ for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2; i++) {
+ rd_kafka_topic_partition_t *rktpar;
+ rktpar = rd_kafka_topic_partition_list_add(
+ orig_offsets, topics[i / 2],
+ i % TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT);
+ rktpar->offset = (i + 1) * 10;
+ rd_kafka_topic_partition_set_leader_epoch(rktpar, 2);
+ }
+
+ TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/));
+
+ /* Verify committed offsets match */
+ committed = rd_kafka_topic_partition_list_copy(orig_offsets);
+ TEST_CALL_ERR__(
+ rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000)));
+
+ if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) {
+ TEST_SAY("commit() list:\n");
+ test_print_partition_list(orig_offsets);
+ TEST_SAY("committed() list:\n");
+ test_print_partition_list(committed);
+ TEST_FAIL("committed offsets don't match");
+ }
+
+ rd_kafka_topic_partition_list_destroy(committed);
+
+ to_list = rd_kafka_topic_partition_list_new(orig_offsets->cnt);
+ for (i = 0; i < orig_offsets->cnt; i++) {
+ rd_kafka_topic_partition_list_add(
+ to_list, orig_offsets->elems[i].topic,
+ orig_offsets->elems[i].partition);
+ }
+
+ if (null_toppars) {
+ cgoffsets =
+ rd_kafka_ListConsumerGroupOffsets_new(group_id, NULL);
+ } else {
+ cgoffsets =
+ rd_kafka_ListConsumerGroupOffsets_new(group_id, to_list);
+ }
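+
+ /* Passing NULL instead of a partition list requests the committed
+ * offsets for all partitions of the group, which here should yield
+ * the same result as listing to_list explicitly. */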
+
+ TIMING_START(&timing, "ListConsumerGroupOffsets");
+ TEST_SAY("Call ListConsumerGroupOffsets\n");
+ rd_kafka_ListConsumerGroupOffsets(rk, &cgoffsets, 1, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ rd_kafka_ListConsumerGroupOffsets_destroy(cgoffsets);
+
+ TIMING_START(&timing, "ListConsumerGroupOffsets.queue_poll");
+ /* Poll result queue for ListConsumerGroupOffsets result.
+ * Print but otherwise ignore other event types
+ * (typically generic Error events). */
+ while (1) {
+ rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000));
+ TEST_SAY("ListConsumerGroupOffsets: got %s in %.3fms\n",
+ rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+ if (rkev == NULL)
+ continue;
+ if (rd_kafka_event_error(rkev))
+ TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ if (rd_kafka_event_type(rkev) ==
+ RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT)
+ break;
+
+ rd_kafka_event_destroy(rkev);
+ }
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_ListConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected ListConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting success */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!err,
+ "expected ListConsumerGroupOffsets to succeed, "
+ "got %s (%s)",
+ rd_kafka_err2name(err), err ? errstr2 : "n/a");
+
+ TEST_SAY("ListConsumerGroupOffsets: returned %s (%s)\n",
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ gres = rd_kafka_ListConsumerGroupOffsets_result_groups(res, &gres_cnt);
+ TEST_ASSERT(gres && gres_cnt == 1,
+ "expected gres_cnt == 1, not %" PRIusz, gres_cnt);
+
+ listd = rd_kafka_topic_partition_list_copy(
+ rd_kafka_group_result_partitions(gres[0]));
+
+ if (test_partition_list_and_offsets_cmp(listd, orig_offsets)) {
+ TEST_SAY("Result list:\n");
+ test_print_partition_list(listd);
+ TEST_SAY("Partitions passed to ListConsumerGroupOffsets:\n");
+ test_print_partition_list(orig_offsets);
+ TEST_FAIL("listd/requested offsets don't match");
+ }
+
+ /* Verify expected errors */
+ for (i = 0; i < listd->cnt; i++) {
+ TEST_ASSERT_LATER(listd->elems[i].err == exp_err,
+ "Result %s [%" PRId32
+ "] has error %s, "
+ "expected %s",
+ listd->elems[i].topic,
+ listd->elems[i].partition,
+ rd_kafka_err2name(listd->elems[i].err),
+ rd_kafka_err2name(exp_err));
+ }
+
+ TEST_LATER_CHECK();
+
+ rd_kafka_topic_partition_list_destroy(listd);
+ rd_kafka_topic_partition_list_destroy(to_list);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_topic_partition_list_destroy(orig_offsets);
+ rd_kafka_topic_partition_list_destroy(subscription);
+
+ for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++)
+ rd_free(topics[i]);
+
+ rd_kafka_destroy(consumer);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ TEST_LATER_CHECK();
+
+#undef TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT
+
+ SUB_TEST_PASS();
+}
+
+static void do_test_apis(rd_kafka_type_t cltype) {
+ rd_kafka_t *rk;
+ rd_kafka_conf_t *conf;
+ rd_kafka_queue_t *mainq;
+
+ /* Get the available brokers, but use a separate rd_kafka_t instance
+ * so we don't jinx the tests by having up-to-date metadata. */
+ avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt);
+ TEST_SAY("%" PRIusz
+ " brokers in cluster "
+ "which will be used for replica sets\n",
+ avail_broker_cnt);
+
+ do_test_unclean_destroy(cltype, 0 /*tempq*/);
+ do_test_unclean_destroy(cltype, 1 /*mainq*/);
+
+ test_conf_init(&conf, NULL, 180);
+ test_conf_set(conf, "socket.timeout.ms", "10000");
+ rk = test_create_handle(cltype, conf);
+
+ mainq = rd_kafka_queue_get_main(rk);
+
+ /* Create topics */
+ do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0);
+ do_test_CreateTopics("temp queue, op timeout 15000", rk, NULL, 15000,
+ 0);
+ do_test_CreateTopics(
+ "temp queue, op timeout 300, "
+ "validate only",
+ rk, NULL, 300, rd_true);
+ do_test_CreateTopics("temp queue, op timeout 9000, validate_only", rk,
+ NULL, 9000, rd_true);
+ do_test_CreateTopics("main queue, options", rk, mainq, -1, 0);
+
+ /* Delete topics */
+ do_test_DeleteTopics("temp queue, op timeout 0", rk, NULL, 0);
+ do_test_DeleteTopics("main queue, op timeout 15000", rk, mainq, 1500);
+
+ if (test_broker_version >= TEST_BRKVER(1, 0, 0, 0)) {
+ /* Create Partitions */
+ do_test_CreatePartitions("temp queue, op timeout 6500", rk,
+ NULL, 6500);
+ do_test_CreatePartitions("main queue, op timeout 0", rk, mainq,
+ 0);
+ }
+
+ /* CreateAcls */
+ do_test_CreateAcls(rk, mainq, 0);
+ do_test_CreateAcls(rk, mainq, 1);
+
+ /* DescribeAcls */
+ do_test_DescribeAcls(rk, mainq, 0);
+ do_test_DescribeAcls(rk, mainq, 1);
+
+ /* DeleteAcls */
+ do_test_DeleteAcls(rk, mainq, 0);
+ do_test_DeleteAcls(rk, mainq, 1);
+
+ /* AlterConfigs */
+ do_test_AlterConfigs(rk, mainq);
+
+ /* DescribeConfigs */
+ do_test_DescribeConfigs(rk, mainq);
+
+ /* Delete records */
+ do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0);
+ do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500);
+
+ /* List groups */
+ do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false);
+ do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true);
+
+ /* Describe groups */
+ do_test_DescribeConsumerGroups("temp queue", rk, NULL, -1);
+ do_test_DescribeConsumerGroups("main queue", rk, mainq, 1500);
+
+ /* Delete groups */
+ do_test_DeleteGroups("temp queue", rk, NULL, -1);
+ do_test_DeleteGroups("main queue", rk, mainq, 1500);
+
+ if (test_broker_version >= TEST_BRKVER(2, 4, 0, 0)) {
+ /* Delete committed offsets */
+ do_test_DeleteConsumerGroupOffsets("temp queue", rk, NULL, -1,
+ rd_false);
+ do_test_DeleteConsumerGroupOffsets("main queue", rk, mainq,
+ 1500, rd_false);
+ do_test_DeleteConsumerGroupOffsets(
+ "main queue", rk, mainq, 1500,
+ rd_true /*with subscribing consumer*/);
+
+ /* Alter committed offsets */
+ do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1,
+ rd_false, rd_true);
+ do_test_AlterConsumerGroupOffsets("main queue", rk, mainq, 1500,
+ rd_false, rd_true);
+ do_test_AlterConsumerGroupOffsets(
+ "main queue, nonexistent topics", rk, mainq, 1500, rd_false,
+ rd_false /* don't create topics */);
+ do_test_AlterConsumerGroupOffsets(
+ "main queue", rk, mainq, 1500,
+ rd_true, /*with subscribing consumer*/
+ rd_true);
+
+ /* List committed offsets */
+ do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1,
+ rd_false, rd_false);
+ do_test_ListConsumerGroupOffsets(
+ "main queue, op timeout "
+ "1500",
+ rk, mainq, 1500, rd_false, rd_false);
+ do_test_ListConsumerGroupOffsets(
+ "main queue", rk, mainq, 1500,
+ rd_true /*with subscribing consumer*/, rd_false);
+ do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1,
+ rd_false, rd_true);
+ do_test_ListConsumerGroupOffsets("main queue", rk, mainq, 1500,
+ rd_false, rd_true);
+ do_test_ListConsumerGroupOffsets(
+ "main queue", rk, mainq, 1500,
+ rd_true /*with subscribing consumer*/, rd_true);
+ }
+
+ rd_kafka_queue_destroy(mainq);
+
+ rd_kafka_destroy(rk);
+
+ free(avail_brokers);
+}
+
+
+int main_0081_admin(int argc, char **argv) {
+
+ do_test_apis(RD_KAFKA_PRODUCER);
+
+ if (test_quick) {
+ TEST_SAY("Skipping further 0081 tests due to quick mode\n");
+ return 0;
+ }
+
+ do_test_apis(RD_KAFKA_CONSUMER);
+
+ return 0;
+}