From b485aab7e71c1625cfc27e0f92c9509f42378458 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 5 May 2024 13:19:16 +0200 Subject: Adding upstream version 1.45.3+dfsg. Signed-off-by: Daniel Baumann --- .../lib/librdkafka-2.1.0/tests/0081-admin.c | 3797 ++++++++++++++++++++ 1 file changed, 3797 insertions(+) create mode 100644 src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c') diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c new file mode 100644 index 000000000..7da2dff15 --- /dev/null +++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c @@ -0,0 +1,3797 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2015, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "test.h" +#include "rdkafka.h" +#include "../src/rdstring.h" + +/** + * @brief Admin API integration tests. + */ + + +static int32_t *avail_brokers; +static size_t avail_broker_cnt; + + + +static void do_test_CreateTopics(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int op_timeout, + rd_bool_t validate_only) { + rd_kafka_queue_t *q; +#define MY_NEW_TOPICS_CNT 7 + char *topics[MY_NEW_TOPICS_CNT]; + rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT]; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_resp_err_t exp_topicerr[MY_NEW_TOPICS_CNT] = {0}; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + /* Expected topics in metadata */ + rd_kafka_metadata_topic_t exp_mdtopics[MY_NEW_TOPICS_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + /* Not expected topics in metadata */ + rd_kafka_metadata_topic_t exp_not_mdtopics[MY_NEW_TOPICS_CNT] = {{0}}; + int exp_not_mdtopic_cnt = 0; + int i; + char errstr[512]; + const char *errstr2; + rd_kafka_resp_err_t err; + test_timing_t timing; + rd_kafka_event_t *rkev; + const rd_kafka_CreateTopics_result_t *res; + const rd_kafka_topic_result_t **restopics; + size_t restopic_cnt; + int metadata_tmout; + int num_replicas = (int)avail_broker_cnt; + int32_t *replicas; + + SUB_TEST_QUICK( + "%s CreateTopics with %s, " + "op_timeout %d, validate_only %d", + rd_kafka_name(rk), what, op_timeout, validate_only); + + q = useq ? useq : rd_kafka_queue_new(rk); + + /* Set up replicas */ + replicas = rd_alloca(sizeof(*replicas) * num_replicas); + for (i = 0; i < num_replicas; i++) + replicas[i] = avail_brokers[i]; + + /** + * Construct NewTopic array with different properties for + * different partitions. + */ + for (i = 0; i < MY_NEW_TOPICS_CNT; i++) { + char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + int use_defaults = + i == 6 && test_broker_version >= TEST_BRKVER(2, 4, 0, 0); + int num_parts = !use_defaults ? (i * 7 + 1) : -1; + int set_config = (i & 1); + int add_invalid_config = (i == 1); + int set_replicas = !use_defaults && !(i % 3); + rd_kafka_resp_err_t this_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + + topics[i] = topic; + new_topics[i] = rd_kafka_NewTopic_new( + topic, num_parts, set_replicas ? -1 : num_replicas, NULL, + 0); + + if (set_config) { + /* + * Add various configuration properties + */ + err = rd_kafka_NewTopic_set_config( + new_topics[i], "compression.type", "lz4"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + err = rd_kafka_NewTopic_set_config( + new_topics[i], "delete.retention.ms", "900"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + if (add_invalid_config) { + /* Add invalid config property */ + err = rd_kafka_NewTopic_set_config( + new_topics[i], "dummy.doesntexist", + "broker is verifying this"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + this_exp_err = RD_KAFKA_RESP_ERR_INVALID_CONFIG; + } + + TEST_SAY( + "Expecting result for topic #%d: %s " + "(set_config=%d, add_invalid_config=%d, " + "set_replicas=%d, use_defaults=%d)\n", + i, rd_kafka_err2name(this_exp_err), set_config, + add_invalid_config, set_replicas, use_defaults); + + if (set_replicas) { + int32_t p; + + /* + * Set valid replica assignments + */ + for (p = 0; p < num_parts; p++) { + err = rd_kafka_NewTopic_set_replica_assignment( + new_topics[i], p, replicas, num_replicas, + errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", errstr); + } + } + + if (this_exp_err || validate_only) { + exp_topicerr[i] = this_exp_err; + exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic; + + } else { + exp_mdtopics[exp_mdtopic_cnt].topic = topic; + exp_mdtopics[exp_mdtopic_cnt].partition_cnt = num_parts; + exp_mdtopic_cnt++; + } + } + + if (op_timeout != -1 || validate_only) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_CREATETOPICS); + + if (op_timeout != -1) { + err = rd_kafka_AdminOptions_set_operation_timeout( + options, op_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + if (validate_only) { + err = rd_kafka_AdminOptions_set_validate_only( + options, validate_only, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + } + + TIMING_START(&timing, "CreateTopics"); + TEST_SAY("Call CreateTopics\n"); + rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* Poll result queue for CreateTopics result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + TIMING_START(&timing, "CreateTopics.queue_poll"); + do { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("CreateTopics: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + } while (rd_kafka_event_type(rkev) != + RD_KAFKA_EVENT_CREATETOPICS_RESULT); + + /* Convert event to proper result */ + res = rd_kafka_event_CreateTopics_result(rkev); + TEST_ASSERT(res, "expected CreateTopics_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected CreateTopics to return %s, not %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("CreateTopics: returned %s (%s)\n", rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + /* Extract topics */ + restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt); + + + /* Scan topics for proper fields and expected failures. */ + for (i = 0; i < (int)restopic_cnt; i++) { + const rd_kafka_topic_result_t *terr = restopics[i]; + + /* Verify that topic order matches our request. */ + if (strcmp(rd_kafka_topic_result_name(terr), topics[i])) + TEST_FAIL_LATER( + "Topic result order mismatch at #%d: " + "expected %s, got %s", + i, topics[i], rd_kafka_topic_result_name(terr)); + + TEST_SAY("CreateTopics result: #%d: %s: %s: %s\n", i, + rd_kafka_topic_result_name(terr), + rd_kafka_err2name(rd_kafka_topic_result_error(terr)), + rd_kafka_topic_result_error_string(terr)); + if (rd_kafka_topic_result_error(terr) != exp_topicerr[i]) + TEST_FAIL_LATER("Expected %s, not %d: %s", + rd_kafka_err2name(exp_topicerr[i]), + rd_kafka_topic_result_error(terr), + rd_kafka_err2name( + rd_kafka_topic_result_error(terr))); + } + + /** + * Verify that the expecteded topics are created and the non-expected + * are not. Allow it some time to propagate. + */ + if (validate_only) { + /* No topics should have been created, give it some time + * before checking. */ + rd_sleep(2); + metadata_tmout = 5 * 1000; + } else { + if (op_timeout > 0) + metadata_tmout = op_timeout + 1000; + else + metadata_tmout = 10 * 1000; + } + + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, + exp_not_mdtopics, exp_not_mdtopic_cnt, + metadata_tmout); + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < MY_NEW_TOPICS_CNT; i++) { + rd_kafka_NewTopic_destroy(new_topics[i]); + rd_free(topics[i]); + } + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_NEW_TOPICS_CNT + + SUB_TEST_PASS(); +} + + + +/** + * @brief Test deletion of topics + * + * + */ +static void do_test_DeleteTopics(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int op_timeout) { + rd_kafka_queue_t *q; + const int skip_topic_cnt = 2; +#define MY_DEL_TOPICS_CNT 9 + char *topics[MY_DEL_TOPICS_CNT]; + rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT]; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_resp_err_t exp_topicerr[MY_DEL_TOPICS_CNT] = {0}; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + /* Expected topics in metadata */ + rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_TOPICS_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + /* Not expected topics in metadata */ + rd_kafka_metadata_topic_t exp_not_mdtopics[MY_DEL_TOPICS_CNT] = {{0}}; + int exp_not_mdtopic_cnt = 0; + int i; + char errstr[512]; + const char *errstr2; + rd_kafka_resp_err_t err; + test_timing_t timing; + rd_kafka_event_t *rkev; + const rd_kafka_DeleteTopics_result_t *res; + const rd_kafka_topic_result_t **restopics; + size_t restopic_cnt; + int metadata_tmout; + + SUB_TEST_QUICK("%s DeleteTopics with %s, op_timeout %d", + rd_kafka_name(rk), what, op_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + /** + * Construct DeleteTopic array + */ + for (i = 0; i < MY_DEL_TOPICS_CNT; i++) { + char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + int notexist_topic = i >= MY_DEL_TOPICS_CNT - skip_topic_cnt; + + topics[i] = topic; + + del_topics[i] = rd_kafka_DeleteTopic_new(topic); + + if (notexist_topic) + exp_topicerr[i] = + RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + else { + exp_topicerr[i] = RD_KAFKA_RESP_ERR_NO_ERROR; + + exp_mdtopics[exp_mdtopic_cnt++].topic = topic; + } + + exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic; + } + + if (op_timeout != -1) { + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); + + err = rd_kafka_AdminOptions_set_operation_timeout( + options, op_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + /* Create the topics first, minus the skip count. */ + test_CreateTopics_simple(rk, NULL, topics, + MY_DEL_TOPICS_CNT - skip_topic_cnt, + 2 /*num_partitions*/, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, + 15 * 1000); + + TIMING_START(&timing, "DeleteTopics"); + TEST_SAY("Call DeleteTopics\n"); + rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* Poll result queue for DeleteTopics result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + TIMING_START(&timing, "DeleteTopics.queue_poll"); + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DeleteTopics: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DELETETOPICS_RESULT) + break; + + rd_kafka_event_destroy(rkev); + } + + /* Convert event to proper result */ + res = rd_kafka_event_DeleteTopics_result(rkev); + TEST_ASSERT(res, "expected DeleteTopics_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DeleteTopics to return %s, not %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DeleteTopics: returned %s (%s)\n", rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + /* Extract topics */ + restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt); + + + /* Scan topics for proper fields and expected failures. */ + for (i = 0; i < (int)restopic_cnt; i++) { + const rd_kafka_topic_result_t *terr = restopics[i]; + + /* Verify that topic order matches our request. */ + if (strcmp(rd_kafka_topic_result_name(terr), topics[i])) + TEST_FAIL_LATER( + "Topic result order mismatch at #%d: " + "expected %s, got %s", + i, topics[i], rd_kafka_topic_result_name(terr)); + + TEST_SAY("DeleteTopics result: #%d: %s: %s: %s\n", i, + rd_kafka_topic_result_name(terr), + rd_kafka_err2name(rd_kafka_topic_result_error(terr)), + rd_kafka_topic_result_error_string(terr)); + if (rd_kafka_topic_result_error(terr) != exp_topicerr[i]) + TEST_FAIL_LATER("Expected %s, not %d: %s", + rd_kafka_err2name(exp_topicerr[i]), + rd_kafka_topic_result_error(terr), + rd_kafka_err2name( + rd_kafka_topic_result_error(terr))); + } + + /** + * Verify that the expected topics are deleted and the non-expected + * are not. Allow it some time to propagate. + */ + if (op_timeout > 0) + metadata_tmout = op_timeout + 1000; + else + metadata_tmout = 10 * 1000; + + test_wait_metadata_update(rk, NULL, 0, exp_not_mdtopics, + exp_not_mdtopic_cnt, metadata_tmout); + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < MY_DEL_TOPICS_CNT; i++) { + rd_kafka_DeleteTopic_destroy(del_topics[i]); + rd_free(topics[i]); + } + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_DEL_TOPICS_CNT + + SUB_TEST_PASS(); +} + + + +/** + * @brief Test creation of partitions + * + * + */ +static void do_test_CreatePartitions(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int op_timeout) { + rd_kafka_queue_t *q; +#define MY_CRP_TOPICS_CNT 9 + char *topics[MY_CRP_TOPICS_CNT]; + rd_kafka_NewTopic_t *new_topics[MY_CRP_TOPICS_CNT]; + rd_kafka_NewPartitions_t *crp_topics[MY_CRP_TOPICS_CNT]; + rd_kafka_AdminOptions_t *options = NULL; + /* Expected topics in metadata */ + rd_kafka_metadata_topic_t exp_mdtopics[MY_CRP_TOPICS_CNT] = {{0}}; + rd_kafka_metadata_partition_t exp_mdparts[2] = {{0}}; + int exp_mdtopic_cnt = 0; + int i; + char errstr[512]; + rd_kafka_resp_err_t err; + test_timing_t timing; + int metadata_tmout; + int num_replicas = (int)avail_broker_cnt; + + SUB_TEST_QUICK("%s CreatePartitions with %s, op_timeout %d", + rd_kafka_name(rk), what, op_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + /* Set up two expected partitions with different replication sets + * so they can be matched by the metadata checker later. + * Even partitions use exp_mdparts[0] while odd partitions + * use exp_mdparts[1]. */ + + /* Set valid replica assignments (even, and odd (reverse) ) */ + exp_mdparts[0].replicas = + rd_alloca(sizeof(*exp_mdparts[0].replicas) * num_replicas); + exp_mdparts[1].replicas = + rd_alloca(sizeof(*exp_mdparts[1].replicas) * num_replicas); + exp_mdparts[0].replica_cnt = num_replicas; + exp_mdparts[1].replica_cnt = num_replicas; + for (i = 0; i < num_replicas; i++) { + exp_mdparts[0].replicas[i] = avail_brokers[i]; + exp_mdparts[1].replicas[i] = + avail_brokers[num_replicas - i - 1]; + } + + /** + * Construct CreatePartitions array + */ + for (i = 0; i < MY_CRP_TOPICS_CNT; i++) { + char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + int initial_part_cnt = 1 + (i * 2); + int new_part_cnt = 1 + (i / 2); + int final_part_cnt = initial_part_cnt + new_part_cnt; + int set_replicas = !(i % 2); + int pi; + + topics[i] = topic; + + /* Topic to create with initial partition count */ + new_topics[i] = rd_kafka_NewTopic_new( + topic, initial_part_cnt, set_replicas ? -1 : num_replicas, + NULL, 0); + + /* .. and later add more partitions to */ + crp_topics[i] = rd_kafka_NewPartitions_new( + topic, final_part_cnt, errstr, sizeof(errstr)); + + if (set_replicas) { + exp_mdtopics[exp_mdtopic_cnt].partitions = rd_alloca( + final_part_cnt * + sizeof(*exp_mdtopics[exp_mdtopic_cnt].partitions)); + + for (pi = 0; pi < final_part_cnt; pi++) { + const rd_kafka_metadata_partition_t *exp_mdp = + &exp_mdparts[pi & 1]; + + exp_mdtopics[exp_mdtopic_cnt].partitions[pi] = + *exp_mdp; /* copy */ + + exp_mdtopics[exp_mdtopic_cnt] + .partitions[pi] + .id = pi; + + if (pi < initial_part_cnt) { + /* Set replica assignment + * for initial partitions */ + err = + rd_kafka_NewTopic_set_replica_assignment( + new_topics[i], pi, + exp_mdp->replicas, + (size_t)exp_mdp->replica_cnt, + errstr, sizeof(errstr)); + TEST_ASSERT(!err, + "NewTopic_set_replica_" + "assignment: %s", + errstr); + } else { + /* Set replica assignment for new + * partitions */ + err = + rd_kafka_NewPartitions_set_replica_assignment( + crp_topics[i], + pi - initial_part_cnt, + exp_mdp->replicas, + (size_t)exp_mdp->replica_cnt, + errstr, sizeof(errstr)); + TEST_ASSERT(!err, + "NewPartitions_set_replica_" + "assignment: %s", + errstr); + } + } + } + + TEST_SAY(_C_YEL + "Topic %s with %d initial partitions will grow " + "by %d to %d total partitions with%s replicas set\n", + topics[i], initial_part_cnt, new_part_cnt, + final_part_cnt, set_replicas ? "" : "out"); + + exp_mdtopics[exp_mdtopic_cnt].topic = topic; + exp_mdtopics[exp_mdtopic_cnt].partition_cnt = final_part_cnt; + + exp_mdtopic_cnt++; + } + + if (op_timeout != -1) { + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); + + err = rd_kafka_AdminOptions_set_operation_timeout( + options, op_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + /* + * Create topics with initial partition count + */ + TIMING_START(&timing, "CreateTopics"); + TEST_SAY("Creating topics with initial partition counts\n"); + rd_kafka_CreateTopics(rk, new_topics, MY_CRP_TOPICS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + err = test_wait_topic_admin_result( + q, RD_KAFKA_EVENT_CREATETOPICS_RESULT, NULL, 15000); + TEST_ASSERT(!err, "CreateTopics failed: %s", rd_kafka_err2str(err)); + + rd_kafka_NewTopic_destroy_array(new_topics, MY_CRP_TOPICS_CNT); + + + /* + * Create new partitions + */ + TIMING_START(&timing, "CreatePartitions"); + TEST_SAY("Creating partitions\n"); + rd_kafka_CreatePartitions(rk, crp_topics, MY_CRP_TOPICS_CNT, options, + q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + err = test_wait_topic_admin_result( + q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, 15000); + TEST_ASSERT(!err, "CreatePartitions failed: %s", rd_kafka_err2str(err)); + + rd_kafka_NewPartitions_destroy_array(crp_topics, MY_CRP_TOPICS_CNT); + + + /** + * Verify that the expected topics are deleted and the non-expected + * are not. Allow it some time to propagate. + */ + if (op_timeout > 0) + metadata_tmout = op_timeout + 1000; + else + metadata_tmout = 10 * 1000; + + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, + metadata_tmout); + + for (i = 0; i < MY_CRP_TOPICS_CNT; i++) + rd_free(topics[i]); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_CRP_TOPICS_CNT + + SUB_TEST_PASS(); +} + + + +/** + * @brief Print the ConfigEntrys in the provided array. + */ +static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries, + size_t entry_cnt, + unsigned int depth) { + const char *indent = &" "[4 - (depth > 4 ? 4 : depth)]; + size_t ei; + + for (ei = 0; ei < entry_cnt; ei++) { + const rd_kafka_ConfigEntry_t *e = entries[ei]; + const rd_kafka_ConfigEntry_t **syns; + size_t syn_cnt; + + syns = rd_kafka_ConfigEntry_synonyms(e, &syn_cnt); + +#define YN(v) ((v) ? "y" : "n") + TEST_SAYL( + 3, + "%s#%" PRIusz "/%" PRIusz + ": Source %s (%d): \"%s\"=\"%s\" " + "[is read-only=%s, default=%s, sensitive=%s, " + "synonym=%s] with %" PRIusz " synonym(s)\n", + indent, ei, entry_cnt, + rd_kafka_ConfigSource_name(rd_kafka_ConfigEntry_source(e)), + rd_kafka_ConfigEntry_source(e), + rd_kafka_ConfigEntry_name(e), + rd_kafka_ConfigEntry_value(e) + ? rd_kafka_ConfigEntry_value(e) + : "(NULL)", + YN(rd_kafka_ConfigEntry_is_read_only(e)), + YN(rd_kafka_ConfigEntry_is_default(e)), + YN(rd_kafka_ConfigEntry_is_sensitive(e)), + YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt); +#undef YN + + if (syn_cnt > 0) + test_print_ConfigEntry_array(syns, syn_cnt, depth + 1); + } +} + + +/** + * @brief Test AlterConfigs + */ +static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { +#define MY_CONFRES_CNT 3 + char *topics[MY_CONFRES_CNT]; + rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; + rd_kafka_AdminOptions_t *options; + rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_AlterConfigs_result_t *res; + const rd_kafka_ConfigResource_t **rconfigs; + size_t rconfig_cnt; + char errstr[128]; + const char *errstr2; + int ci = 0; + int i; + int fails = 0; + + SUB_TEST_QUICK(); + + /* + * Only create one topic, the others will be non-existent. + */ + for (i = 0; i < MY_CONFRES_CNT; i++) + rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1)); + + test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); + + test_wait_topic_exists(rk, topics[0], 10000); + + /* + * ConfigResource #0: valid topic config + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + + err = rd_kafka_ConfigResource_set_config(configs[ci], + "compression.type", "gzip"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + err = rd_kafka_ConfigResource_set_config(configs[ci], "flush.ms", + "12345678"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + + + if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) { + /* + * ConfigResource #1: valid broker config + */ + configs[ci] = rd_kafka_ConfigResource_new( + RD_KAFKA_RESOURCE_BROKER, + tsprintf("%" PRId32, avail_brokers[0])); + + err = rd_kafka_ConfigResource_set_config( + configs[ci], "sasl.kerberos.min.time.before.relogin", + "58000"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + } else { + TEST_WARN( + "Skipping RESOURCE_BROKER test on unsupported " + "broker version\n"); + } + + /* + * ConfigResource #2: valid topic config, non-existent topic + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + + err = rd_kafka_ConfigResource_set_config(configs[ci], + "compression.type", "lz4"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + err = rd_kafka_ConfigResource_set_config( + configs[ci], "offset.metadata.max.bytes", "12345"); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0)) + exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + else + exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; + ci++; + + + /* + * Timeout options + */ + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ALTERCONFIGS); + err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, + sizeof(errstr)); + TEST_ASSERT(!err, "%s", errstr); + + + /* + * Fire off request + */ + rd_kafka_AlterConfigs(rk, configs, ci, options, rkqu); + + rd_kafka_AdminOptions_destroy(options); + + /* + * Wait for result + */ + rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, + 10000 + 1000); + + /* + * Extract result + */ + res = rd_kafka_event_AlterConfigs_result(rkev); + TEST_ASSERT(res, "Expected AlterConfigs result, not %s", + rd_kafka_event_name(rkev)); + + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, "Expected success, not %s: %s", + rd_kafka_err2name(err), errstr2); + + rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt); + TEST_ASSERT((int)rconfig_cnt == ci, + "Expected %d result resources, got %" PRIusz "\n", ci, + rconfig_cnt); + + /* + * Verify status per resource + */ + for (i = 0; i < (int)rconfig_cnt; i++) { + const rd_kafka_ConfigEntry_t **entries; + size_t entry_cnt; + + err = rd_kafka_ConfigResource_error(rconfigs[i]); + errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); + + entries = + rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); + + TEST_SAY( + "ConfigResource #%d: type %s (%d), \"%s\": " + "%" PRIusz " ConfigEntries, error %s (%s)\n", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_type(rconfigs[i]), + rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + + test_print_ConfigEntry_array(entries, entry_cnt, 1); + + if (rd_kafka_ConfigResource_type(rconfigs[i]) != + rd_kafka_ConfigResource_type(configs[i]) || + strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), + rd_kafka_ConfigResource_name(configs[i]))) { + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected type %s name %s, " + "got type %s name %s", + i, + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(configs[i])), + rd_kafka_ConfigResource_name(configs[i]), + rd_kafka_ResourceType_name( + rd_kafka_ConfigResource_type(rconfigs[i])), + rd_kafka_ConfigResource_name(rconfigs[i])); + fails++; + continue; + } + + + if (err != exp_err[i]) { + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected %s (%d), got %s (%s)", + i, rd_kafka_err2name(exp_err[i]), exp_err[i], + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + fails++; + } + } + + TEST_ASSERT(!fails, "See %d previous failure(s)", fails); + + rd_kafka_event_destroy(rkev); + + rd_kafka_ConfigResource_destroy_array(configs, ci); + + TEST_LATER_CHECK(); +#undef MY_CONFRES_CNT + + SUB_TEST_PASS(); +} + + + +/** + * @brief Test DescribeConfigs + */ +static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { +#define MY_CONFRES_CNT 3 + char *topics[MY_CONFRES_CNT]; + rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; + rd_kafka_AdminOptions_t *options; + rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_DescribeConfigs_result_t *res; + const rd_kafka_ConfigResource_t **rconfigs; + size_t rconfig_cnt; + char errstr[128]; + const char *errstr2; + int ci = 0; + int i; + int fails = 0; + int max_retry_describe = 3; + + SUB_TEST_QUICK(); + + /* + * Only create one topic, the others will be non-existent. + */ + rd_strdupa(&topics[0], test_mk_topic_name("DescribeConfigs_exist", 1)); + for (i = 1; i < MY_CONFRES_CNT; i++) + rd_strdupa(&topics[i], + test_mk_topic_name("DescribeConfigs_notexist", 1)); + + test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); + + /* + * ConfigResource #0: topic config, no config entries. + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + + /* + * ConfigResource #1:broker config, no config entries + */ + configs[ci] = rd_kafka_ConfigResource_new( + RD_KAFKA_RESOURCE_BROKER, tsprintf("%" PRId32, avail_brokers[0])); + + exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; + ci++; + + /* + * ConfigResource #2: topic config, non-existent topic, no config entr. + */ + configs[ci] = + rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); + /* FIXME: This is a bug in the broker ( 0) { + TEST_WARN( + "ConfigResource #%d: " + "expected %s (%d), got %s (%s): " + "this is typically a temporary " + "error while the new resource " + "is propagating: retrying", + i, rd_kafka_err2name(exp_err[i]), + exp_err[i], rd_kafka_err2name(err), + errstr2 ? errstr2 : ""); + rd_kafka_event_destroy(rkev); + rd_sleep(1); + goto retry_describe; + } + + TEST_FAIL_LATER( + "ConfigResource #%d: " + "expected %s (%d), got %s (%s)", + i, rd_kafka_err2name(exp_err[i]), exp_err[i], + rd_kafka_err2name(err), errstr2 ? errstr2 : ""); + fails++; + } + } + + TEST_ASSERT(!fails, "See %d previous failure(s)", fails); + + rd_kafka_event_destroy(rkev); + + rd_kafka_ConfigResource_destroy_array(configs, ci); + + TEST_LATER_CHECK(); +#undef MY_CONFRES_CNT + + SUB_TEST_PASS(); +} + +/** + * @brief Test CreateAcls + */ +static void +do_test_CreateAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { + rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); + size_t resacl_cnt; + test_timing_t timing; + rd_kafka_resp_err_t err; + char errstr[128]; + const char *errstr2; + const char *user_test1 = "User:test1"; + const char *user_test2 = "User:test2"; + const char *base_topic_name; + char topic1_name[512]; + char topic2_name[512]; + rd_kafka_AclBinding_t *acl_bindings[2]; + rd_kafka_ResourcePatternType_t pattern_type_first_topic = + RD_KAFKA_RESOURCE_PATTERN_PREFIXED; + rd_kafka_AdminOptions_t *admin_options; + rd_kafka_event_t *rkev_acl_create; + const rd_kafka_CreateAcls_result_t *acl_res; + const rd_kafka_acl_result_t **acl_res_acls; + unsigned int i; + + SUB_TEST_QUICK(); + + if (version == 0) + pattern_type_first_topic = RD_KAFKA_RESOURCE_PATTERN_LITERAL; + + base_topic_name = test_mk_topic_name(__FUNCTION__, 1); + + rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name); + rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name); + + + acl_bindings[0] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_first_topic, + user_test1, "*", RD_KAFKA_ACL_OPERATION_READ, + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, NULL, 0); + acl_bindings[1] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic2_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, "*", + RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + + + admin_options = + rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS); + err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000, + errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", errstr); + + TIMING_START(&timing, "CreateAcls"); + TEST_SAY("Call CreateAcls\n"); + rd_kafka_CreateAcls(rk, acl_bindings, 2, admin_options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* + * Wait for result + */ + rkev_acl_create = test_wait_admin_result( + q, RD_KAFKA_EVENT_CREATEACLS_RESULT, 10000 + 1000); + + err = rd_kafka_event_error(rkev_acl_create); + errstr2 = rd_kafka_event_error_string(rkev_acl_create); + + if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, + "Expected unsupported feature, not: %s", + rd_kafka_err2name(err)); + TEST_ASSERT(!strcmp(errstr2, + "ACLs Admin API (KIP-140) not supported " + "by broker, requires broker " + "version >= 0.11.0.0"), + "Expected a different message, not: %s", errstr2); + TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err)); + } + + if (version > 0 && test_broker_version < TEST_BRKVER(2, 0, 0, 0)) { + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, + "Expected unsupported feature, not: %s", + rd_kafka_err2name(err)); + TEST_ASSERT(!strcmp(errstr2, + "Broker only supports LITERAL " + "resource pattern types"), + "Expected a different message, not: %s", errstr2); + TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err)); + } + + TEST_ASSERT(!err, "Expected success, not %s: %s", + rd_kafka_err2name(err), errstr2); + + /* + * Extract result + */ + acl_res = rd_kafka_event_CreateAcls_result(rkev_acl_create); + TEST_ASSERT(acl_res, "Expected CreateAcls result, not %s", + rd_kafka_event_name(rkev_acl_create)); + + acl_res_acls = rd_kafka_CreateAcls_result_acls(acl_res, &resacl_cnt); + TEST_ASSERT(resacl_cnt == 2, "Expected 2, not %zu", resacl_cnt); + + for (i = 0; i < resacl_cnt; i++) { + const rd_kafka_acl_result_t *acl_res_acl = *(acl_res_acls + i); + const rd_kafka_error_t *error = + rd_kafka_acl_result_error(acl_res_acl); + + TEST_ASSERT(!error, + "Expected RD_KAFKA_RESP_ERR_NO_ERROR, not %s", + rd_kafka_error_string(error)); + } + + rd_kafka_AdminOptions_destroy(admin_options); + rd_kafka_event_destroy(rkev_acl_create); + rd_kafka_AclBinding_destroy_array(acl_bindings, 2); + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + +/** + * @brief Test DescribeAcls + */ +static void +do_test_DescribeAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { + rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); + size_t acl_binding_results_cntp; + test_timing_t timing; + rd_kafka_resp_err_t err; + uint32_t i; + char errstr[128]; + const char *errstr2; + const char *user_test1 = "User:test1"; + const char *user_test2 = "User:test2"; + const char *any_host = "*"; + const char *topic_name; + rd_kafka_AclBinding_t *acl_bindings_create[2]; + rd_kafka_AclBinding_t *acl_bindings_describe; + rd_kafka_AclBinding_t *acl; + const rd_kafka_DescribeAcls_result_t *acl_describe_result; + const rd_kafka_AclBinding_t **acl_binding_results; + rd_kafka_ResourcePatternType_t pattern_type_first_topic_create; + rd_bool_t broker_version1 = + test_broker_version >= TEST_BRKVER(2, 0, 0, 0); + rd_kafka_resp_err_t create_err; + rd_kafka_AdminOptions_t *admin_options; + rd_kafka_event_t *rkev_acl_describe; + const rd_kafka_error_t *error; + + SUB_TEST_QUICK(); + + if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { + SUB_TEST_SKIP( + "Skipping DESCRIBE_ACLS test on unsupported " + "broker version\n"); + return; + } + + pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED; + if (!broker_version1) + pattern_type_first_topic_create = + RD_KAFKA_RESOURCE_PATTERN_LITERAL; + + topic_name = test_mk_topic_name(__FUNCTION__, 1); + + acl_bindings_create[0] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic_name, + pattern_type_first_topic_create, user_test1, any_host, + RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + acl_bindings_create[1] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, + RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + + create_err = + test_CreateAcls_simple(rk, NULL, acl_bindings_create, 2, NULL); + + TEST_ASSERT(!create_err, "create error: %s", + rd_kafka_err2str(create_err)); + + acl_bindings_describe = rd_kafka_AclBindingFilter_new( + RD_KAFKA_RESOURCE_TOPIC, topic_name, + RD_KAFKA_RESOURCE_PATTERN_MATCH, NULL, NULL, + RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL, + 0); + + admin_options = + rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS); + err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000, + errstr, sizeof(errstr)); + + TIMING_START(&timing, "DescribeAcls"); + TEST_SAY("Call DescribeAcls\n"); + rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* + * Wait for result + */ + rkev_acl_describe = test_wait_admin_result( + q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); + + err = rd_kafka_event_error(rkev_acl_describe); + errstr2 = rd_kafka_event_error_string(rkev_acl_describe); + + if (!broker_version1) { + TEST_ASSERT( + err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, + "expected RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, not %s", + rd_kafka_err2str(err)); + TEST_ASSERT(strcmp(errstr2, + "Broker only supports LITERAL and ANY " + "resource pattern types") == 0, + "expected another message, not %s", errstr2); + } else { + TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", + errstr2); + } + + if (!err) { + + acl_describe_result = + rd_kafka_event_DescribeAcls_result(rkev_acl_describe); + + TEST_ASSERT(acl_describe_result, + "acl_describe_result should not be NULL"); + + acl_binding_results_cntp = 0; + acl_binding_results = rd_kafka_DescribeAcls_result_acls( + acl_describe_result, &acl_binding_results_cntp); + + TEST_ASSERT(acl_binding_results_cntp == 2, + "acl_binding_results_cntp should be 2, not %zu", + acl_binding_results_cntp); + + for (i = 0; i < acl_binding_results_cntp; i++) { + acl = (rd_kafka_AclBinding_t *)acl_binding_results[i]; + + if (strcmp(rd_kafka_AclBinding_principal(acl), + user_test1) == 0) { + TEST_ASSERT( + rd_kafka_AclBinding_restype(acl) == + RD_KAFKA_RESOURCE_TOPIC, + "acl->restype should be " + "RD_KAFKA_RESOURCE_TOPIC, not %s", + rd_kafka_ResourceType_name( + rd_kafka_AclBinding_restype(acl))); + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_name(acl), + topic_name) == 0, + "acl->name should be %s, not %s", + topic_name, rd_kafka_AclBinding_name(acl)); + TEST_ASSERT( + rd_kafka_AclBinding_resource_pattern_type( + acl) == pattern_type_first_topic_create, + "acl->resource_pattern_type should be %s, " + "not %s", + rd_kafka_ResourcePatternType_name( + pattern_type_first_topic_create), + rd_kafka_ResourcePatternType_name( + rd_kafka_AclBinding_resource_pattern_type( + acl))); + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_principal(acl), + user_test1) == 0, + "acl->principal should be %s, not %s", + user_test1, + rd_kafka_AclBinding_principal(acl)); + + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_host(acl), + any_host) == 0, + "acl->host should be %s, not %s", any_host, + rd_kafka_AclBinding_host(acl)); + + TEST_ASSERT( + rd_kafka_AclBinding_operation(acl) == + RD_KAFKA_ACL_OPERATION_READ, + "acl->operation should be %s, not %s", + rd_kafka_AclOperation_name( + RD_KAFKA_ACL_OPERATION_READ), + rd_kafka_AclOperation_name( + rd_kafka_AclBinding_operation(acl))); + + TEST_ASSERT( + rd_kafka_AclBinding_permission_type(acl) == + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + "acl->permission_type should be %s, not %s", + rd_kafka_AclPermissionType_name( + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), + rd_kafka_AclPermissionType_name( + rd_kafka_AclBinding_permission_type( + acl))); + + error = rd_kafka_AclBinding_error(acl); + TEST_ASSERT(!error, + "acl->error should be NULL, not %s", + rd_kafka_error_string(error)); + + } else { + TEST_ASSERT( + rd_kafka_AclBinding_restype(acl) == + RD_KAFKA_RESOURCE_TOPIC, + "acl->restype should be " + "RD_KAFKA_RESOURCE_TOPIC, not %s", + rd_kafka_ResourceType_name( + rd_kafka_AclBinding_restype(acl))); + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_name(acl), + topic_name) == 0, + "acl->name should be %s, not %s", + topic_name, rd_kafka_AclBinding_name(acl)); + TEST_ASSERT( + rd_kafka_AclBinding_resource_pattern_type( + acl) == + RD_KAFKA_RESOURCE_PATTERN_LITERAL, + "acl->resource_pattern_type should be %s, " + "not %s", + rd_kafka_ResourcePatternType_name( + RD_KAFKA_RESOURCE_PATTERN_LITERAL), + rd_kafka_ResourcePatternType_name( + rd_kafka_AclBinding_resource_pattern_type( + acl))); + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_principal(acl), + user_test2) == 0, + "acl->principal should be %s, not %s", + user_test2, + rd_kafka_AclBinding_principal(acl)); + + TEST_ASSERT( + strcmp(rd_kafka_AclBinding_host(acl), + any_host) == 0, + "acl->host should be %s, not %s", any_host, + rd_kafka_AclBinding_host(acl)); + + TEST_ASSERT( + rd_kafka_AclBinding_operation(acl) == + RD_KAFKA_ACL_OPERATION_WRITE, + "acl->operation should be %s, not %s", + rd_kafka_AclOperation_name( + RD_KAFKA_ACL_OPERATION_WRITE), + rd_kafka_AclOperation_name( + rd_kafka_AclBinding_operation(acl))); + + TEST_ASSERT( + rd_kafka_AclBinding_permission_type(acl) == + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + "acl->permission_type should be %s, not %s", + rd_kafka_AclPermissionType_name( + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), + rd_kafka_AclPermissionType_name( + rd_kafka_AclBinding_permission_type( + acl))); + + + error = rd_kafka_AclBinding_error(acl); + TEST_ASSERT(!error, + "acl->error should be NULL, not %s", + rd_kafka_error_string(error)); + } + } + } + + rd_kafka_AclBinding_destroy(acl_bindings_describe); + rd_kafka_event_destroy(rkev_acl_describe); + + acl_bindings_describe = rd_kafka_AclBindingFilter_new( + RD_KAFKA_RESOURCE_TOPIC, topic_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL, + RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, + NULL, 0); + + TIMING_START(&timing, "DescribeAcls"); + rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* + * Wait for result + */ + rkev_acl_describe = test_wait_admin_result( + q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); + + err = rd_kafka_event_error(rkev_acl_describe); + errstr2 = rd_kafka_event_error_string(rkev_acl_describe); + + TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", + errstr2); + + acl_describe_result = + rd_kafka_event_DescribeAcls_result(rkev_acl_describe); + + TEST_ASSERT(acl_describe_result, + "acl_describe_result should not be NULL"); + + acl_binding_results_cntp = 0; + acl_binding_results = rd_kafka_DescribeAcls_result_acls( + acl_describe_result, &acl_binding_results_cntp); + + TEST_ASSERT(acl_binding_results_cntp == 1, + "acl_binding_results_cntp should be 1, not %zu", + acl_binding_results_cntp); + + acl = (rd_kafka_AclBinding_t *)acl_binding_results[0]; + + TEST_ASSERT( + rd_kafka_AclBinding_restype(acl) == RD_KAFKA_RESOURCE_TOPIC, + "acl->restype should be RD_KAFKA_RESOURCE_TOPIC, not %s", + rd_kafka_ResourceType_name(rd_kafka_AclBinding_restype(acl))); + TEST_ASSERT(strcmp(rd_kafka_AclBinding_name(acl), topic_name) == 0, + "acl->name should be %s, not %s", topic_name, + rd_kafka_AclBinding_name(acl)); + TEST_ASSERT(rd_kafka_AclBinding_resource_pattern_type(acl) == + RD_KAFKA_RESOURCE_PATTERN_LITERAL, + "acl->resource_pattern_type should be %s, not %s", + rd_kafka_ResourcePatternType_name( + RD_KAFKA_RESOURCE_PATTERN_LITERAL), + rd_kafka_ResourcePatternType_name( + rd_kafka_AclBinding_resource_pattern_type(acl))); + TEST_ASSERT(strcmp(rd_kafka_AclBinding_principal(acl), user_test2) == 0, + "acl->principal should be %s, not %s", user_test2, + rd_kafka_AclBinding_principal(acl)); + + TEST_ASSERT(strcmp(rd_kafka_AclBinding_host(acl), any_host) == 0, + "acl->host should be %s, not %s", any_host, + rd_kafka_AclBinding_host(acl)); + + TEST_ASSERT( + rd_kafka_AclBinding_permission_type(acl) == + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + "acl->permission_type should be %s, not %s", + rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), + rd_kafka_AclPermissionType_name( + rd_kafka_AclBinding_permission_type(acl))); + + error = rd_kafka_AclBinding_error(acl); + TEST_ASSERT(!error, "acl->error should be NULL, not %s", + rd_kafka_error_string(error)); + + rd_kafka_AclBinding_destroy(acl_bindings_describe); + rd_kafka_event_destroy(rkev_acl_describe); + rd_kafka_AdminOptions_destroy(admin_options); + rd_kafka_AclBinding_destroy_array(acl_bindings_create, 2); + + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + +/** + * @brief Count acls by acl filter + */ +static size_t +do_test_acls_count(rd_kafka_t *rk, + rd_kafka_AclBindingFilter_t *acl_bindings_describe, + rd_kafka_queue_t *q) { + char errstr[128]; + rd_kafka_resp_err_t err; + rd_kafka_AdminOptions_t *admin_options_describe; + rd_kafka_event_t *rkev_acl_describe; + const rd_kafka_DescribeAcls_result_t *acl_describe_result; + const char *errstr2; + size_t acl_binding_results_cntp; + + admin_options_describe = + rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS); + rd_kafka_AdminOptions_set_request_timeout(admin_options_describe, 10000, + errstr, sizeof(errstr)); + + rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options_describe, + q); + /* + * Wait for result + */ + rkev_acl_describe = test_wait_admin_result( + q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); + + err = rd_kafka_event_error(rkev_acl_describe); + errstr2 = rd_kafka_event_error_string(rkev_acl_describe); + + TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", + errstr2); + + acl_describe_result = + rd_kafka_event_DescribeAcls_result(rkev_acl_describe); + + TEST_ASSERT(acl_describe_result, + "acl_describe_result should not be NULL"); + + acl_binding_results_cntp = 0; + rd_kafka_DescribeAcls_result_acls(acl_describe_result, + &acl_binding_results_cntp); + rd_kafka_event_destroy(rkev_acl_describe); + rd_kafka_AdminOptions_destroy(admin_options_describe); + + return acl_binding_results_cntp; +} + +/** + * @brief Test DeleteAcls + */ +static void +do_test_DeleteAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { + rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); + test_timing_t timing; + uint32_t i; + char errstr[128]; + const char *user_test1 = "User:test1"; + const char *user_test2 = "User:test2"; + const char *any_host = "*"; + const char *base_topic_name; + char topic1_name[512]; + char topic2_name[512]; + size_t acl_binding_results_cntp; + size_t DeleteAcls_result_responses_cntp; + size_t matching_acls_cntp; + rd_kafka_AclBinding_t *acl_bindings_create[3]; + rd_kafka_AclBindingFilter_t *acl_bindings_describe; + rd_kafka_AclBindingFilter_t *acl_bindings_delete; + rd_kafka_event_t *rkev_acl_delete; + rd_kafka_AdminOptions_t *admin_options_delete; + const rd_kafka_DeleteAcls_result_t *acl_delete_result; + const rd_kafka_DeleteAcls_result_response_t * + *DeleteAcls_result_responses; + const rd_kafka_DeleteAcls_result_response_t *DeleteAcls_result_response; + const rd_kafka_AclBinding_t **matching_acls; + const rd_kafka_AclBinding_t *matching_acl; + rd_kafka_ResourcePatternType_t pattern_type_first_topic_create; + rd_kafka_ResourcePatternType_t pattern_type_delete; + rd_bool_t broker_version1 = + test_broker_version >= TEST_BRKVER(2, 0, 0, 0); + rd_kafka_resp_err_t create_err; + rd_kafka_ResourceType_t restype; + rd_kafka_ResourcePatternType_t resource_pattern_type; + rd_kafka_AclOperation_t operation; + rd_kafka_AclPermissionType_t permission_type; + const char *name; + const char *principal; + const rd_kafka_error_t *error; + + SUB_TEST_QUICK(); + + if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { + SUB_TEST_SKIP( + "Skipping DELETE_ACLS test on unsupported " + "broker version\n"); + return; + } + + pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED; + pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_MATCH; + if (!broker_version1) { + pattern_type_first_topic_create = + RD_KAFKA_RESOURCE_PATTERN_LITERAL; + pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_LITERAL; + } + + base_topic_name = test_mk_topic_name(__FUNCTION__, 1); + + rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name); + rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name); + + acl_bindings_create[0] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic1_name, + pattern_type_first_topic_create, user_test1, any_host, + RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + acl_bindings_create[1] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic1_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, + RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + acl_bindings_create[2] = rd_kafka_AclBinding_new( + RD_KAFKA_RESOURCE_TOPIC, topic2_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, + RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + NULL, 0); + + acl_bindings_delete = rd_kafka_AclBindingFilter_new( + RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_delete, NULL, + NULL, RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, + NULL, 0); + + acl_bindings_describe = acl_bindings_delete; + + create_err = + test_CreateAcls_simple(rk, NULL, acl_bindings_create, 3, NULL); + + TEST_ASSERT(!create_err, "create error: %s", + rd_kafka_err2str(create_err)); + + admin_options_delete = + rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEACLS); + rd_kafka_AdminOptions_set_request_timeout(admin_options_delete, 10000, + errstr, sizeof(errstr)); + + acl_binding_results_cntp = + do_test_acls_count(rk, acl_bindings_describe, q); + TEST_ASSERT(acl_binding_results_cntp == 2, + "acl_binding_results_cntp should not be 2, not %zu\n", + acl_binding_results_cntp); + + TIMING_START(&timing, "DeleteAcls"); + rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete, + q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* + * Wait for result + */ + rkev_acl_delete = test_wait_admin_result( + q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000); + + acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete); + + TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL"); + + DeleteAcls_result_responses_cntp = 0; + DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses( + acl_delete_result, &DeleteAcls_result_responses_cntp); + + TEST_ASSERT(DeleteAcls_result_responses_cntp == 1, + "DeleteAcls_result_responses_cntp should be 1, not %zu\n", + DeleteAcls_result_responses_cntp); + + DeleteAcls_result_response = DeleteAcls_result_responses[0]; + + TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error( + DeleteAcls_result_response)); + + matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls( + DeleteAcls_result_response, &matching_acls_cntp); + + TEST_ASSERT(matching_acls_cntp == 2, + "matching_acls_cntp should be 2, not %zu\n", + matching_acls_cntp); + + for (i = 0; i < matching_acls_cntp; i++) { + rd_kafka_ResourceType_t restype; + rd_kafka_ResourcePatternType_t resource_pattern_type; + rd_kafka_AclOperation_t operation; + rd_kafka_AclPermissionType_t permission_type; + const char *name; + const char *principal; + + matching_acl = matching_acls[i]; + error = rd_kafka_AclBinding_error(matching_acl); + restype = rd_kafka_AclBinding_restype(matching_acl); + name = rd_kafka_AclBinding_name(matching_acl); + resource_pattern_type = + rd_kafka_AclBinding_resource_pattern_type(matching_acl); + principal = rd_kafka_AclBinding_principal(matching_acl); + operation = rd_kafka_AclBinding_operation(matching_acl); + permission_type = + rd_kafka_AclBinding_permission_type(matching_acl); + + TEST_ASSERT(!error, "expected success, not %s", + rd_kafka_error_string(error)); + TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC, + "expected RD_KAFKA_RESOURCE_TOPIC not %s", + rd_kafka_ResourceType_name(restype)); + TEST_ASSERT(strcmp(name, topic1_name) == 0, + "expected %s not %s", topic1_name, name); + TEST_ASSERT(permission_type == + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + "expected %s not %s", + rd_kafka_AclPermissionType_name( + RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), + rd_kafka_AclPermissionType_name(permission_type)); + + if (strcmp(user_test1, principal) == 0) { + TEST_ASSERT(resource_pattern_type == + pattern_type_first_topic_create, + "expected %s not %s", + rd_kafka_ResourcePatternType_name( + pattern_type_first_topic_create), + rd_kafka_ResourcePatternType_name( + resource_pattern_type)); + + TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_READ, + "expected %s not %s", + rd_kafka_AclOperation_name( + RD_KAFKA_ACL_OPERATION_READ), + rd_kafka_AclOperation_name(operation)); + + } else { + TEST_ASSERT(resource_pattern_type == + RD_KAFKA_RESOURCE_PATTERN_LITERAL, + "expected %s not %s", + rd_kafka_ResourcePatternType_name( + RD_KAFKA_RESOURCE_PATTERN_LITERAL), + rd_kafka_ResourcePatternType_name( + resource_pattern_type)); + + TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE, + "expected %s not %s", + rd_kafka_AclOperation_name( + RD_KAFKA_ACL_OPERATION_WRITE), + rd_kafka_AclOperation_name(operation)); + } + } + + acl_binding_results_cntp = + do_test_acls_count(rk, acl_bindings_describe, q); + TEST_ASSERT(acl_binding_results_cntp == 0, + "acl_binding_results_cntp should be 0, not %zu\n", + acl_binding_results_cntp); + + rd_kafka_event_destroy(rkev_acl_delete); + rd_kafka_AclBinding_destroy(acl_bindings_delete); + + acl_bindings_delete = rd_kafka_AclBindingFilter_new( + RD_KAFKA_RESOURCE_TOPIC, topic2_name, + RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL, + RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL, + 0); + acl_bindings_describe = acl_bindings_delete; + + TIMING_START(&timing, "DeleteAcls"); + rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete, + q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + /* + * Wait for result + */ + rkev_acl_delete = test_wait_admin_result( + q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000); + + acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete); + + TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL"); + + DeleteAcls_result_responses_cntp = 0; + DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses( + acl_delete_result, &DeleteAcls_result_responses_cntp); + + TEST_ASSERT(DeleteAcls_result_responses_cntp == 1, + "DeleteAcls_result_responses_cntp should be 1, not %zu\n", + DeleteAcls_result_responses_cntp); + + DeleteAcls_result_response = DeleteAcls_result_responses[0]; + + TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error( + DeleteAcls_result_response)); + + matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls( + DeleteAcls_result_response, &matching_acls_cntp); + + TEST_ASSERT(matching_acls_cntp == 1, + "matching_acls_cntp should be 1, not %zu\n", + matching_acls_cntp); + + matching_acl = matching_acls[0]; + error = rd_kafka_AclBinding_error(matching_acl); + restype = rd_kafka_AclBinding_restype(matching_acl); + name = rd_kafka_AclBinding_name(matching_acl); + resource_pattern_type = + rd_kafka_AclBinding_resource_pattern_type(matching_acl); + principal = rd_kafka_AclBinding_principal(matching_acl); + operation = rd_kafka_AclBinding_operation(matching_acl); + permission_type = rd_kafka_AclBinding_permission_type(matching_acl); + + TEST_ASSERT(!error, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", + rd_kafka_error_string(error)); + TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC, + "expected RD_KAFKA_RESOURCE_TOPIC not %s", + rd_kafka_ResourceType_name(restype)); + TEST_ASSERT(strcmp(name, topic2_name) == 0, "expected %s not %s", + topic2_name, name); + TEST_ASSERT( + permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, + "expected %s not %s", + rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), + rd_kafka_AclPermissionType_name(permission_type)); + TEST_ASSERT(strcmp(user_test2, principal) == 0, "expected %s not %s", + user_test2, principal); + TEST_ASSERT(resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_LITERAL, + "expected %s not %s", + rd_kafka_ResourcePatternType_name( + RD_KAFKA_RESOURCE_PATTERN_LITERAL), + rd_kafka_ResourcePatternType_name(resource_pattern_type)); + + TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE, + "expected %s not %s", + rd_kafka_AclOperation_name(RD_KAFKA_ACL_OPERATION_WRITE), + rd_kafka_AclOperation_name(operation)); + + acl_binding_results_cntp = + do_test_acls_count(rk, acl_bindings_describe, q); + TEST_ASSERT(acl_binding_results_cntp == 0, + "acl_binding_results_cntp should be 0, not %zu\n", + acl_binding_results_cntp); + + rd_kafka_AclBinding_destroy(acl_bindings_delete); + rd_kafka_event_destroy(rkev_acl_delete); + rd_kafka_AdminOptions_destroy(admin_options_delete); + + rd_kafka_AclBinding_destroy_array(acl_bindings_create, 3); + + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + +/** + * @brief Verify that an unclean rd_kafka_destroy() does not hang. + */ +static void do_test_unclean_destroy(rd_kafka_type_t cltype, int with_mainq) { + rd_kafka_t *rk; + char errstr[512]; + rd_kafka_conf_t *conf; + rd_kafka_queue_t *q; + rd_kafka_NewTopic_t *topic; + test_timing_t t_destroy; + + SUB_TEST_QUICK("Test unclean destroy using %s", + with_mainq ? "mainq" : "tempq"); + + test_conf_init(&conf, NULL, 0); + + rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr)); + TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr); + + if (with_mainq) + q = rd_kafka_queue_get_main(rk); + else + q = rd_kafka_queue_new(rk); + + topic = rd_kafka_NewTopic_new(test_mk_topic_name(__FUNCTION__, 1), 3, 1, + NULL, 0); + rd_kafka_CreateTopics(rk, &topic, 1, NULL, q); + rd_kafka_NewTopic_destroy(topic); + + rd_kafka_queue_destroy(q); + + TEST_SAY( + "Giving rd_kafka_destroy() 5s to finish, " + "despite Admin API request being processed\n"); + test_timeout_set(5); + TIMING_START(&t_destroy, "rd_kafka_destroy()"); + rd_kafka_destroy(rk); + TIMING_STOP(&t_destroy); + + SUB_TEST_PASS(); + + /* Restore timeout */ + test_timeout_set(60); +} + + + +/** + * @brief Test deletion of records + * + * + */ +static void do_test_DeleteRecords(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int op_timeout) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_topic_partition_list_t *offsets = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define MY_DEL_RECORDS_CNT 3 + rd_kafka_topic_partition_list_t *results = NULL; + int i; + const int partitions_cnt = 3; + const int msgs_cnt = 100; + char *topics[MY_DEL_RECORDS_CNT]; + rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_RECORDS_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_DeleteRecords_t *del_records; + const rd_kafka_DeleteRecords_result_t *res; + + SUB_TEST_QUICK("%s DeleteRecords with %s, op_timeout %d", + rd_kafka_name(rk), what, op_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (op_timeout != -1) { + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); + + err = rd_kafka_AdminOptions_set_operation_timeout( + options, op_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { + char pfx[32]; + char *topic; + + rd_snprintf(pfx, sizeof(pfx), "DeleteRecords-topic%d", i); + topic = rd_strdup(test_mk_topic_name(pfx, 1)); + + topics[i] = topic; + exp_mdtopics[exp_mdtopic_cnt++].topic = topic; + } + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, topics, MY_DEL_RECORDS_CNT, + partitions_cnt /*num_partitions*/, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, + 15 * 1000); + + /* Produce 100 msgs / partition */ + for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { + int32_t partition; + for (partition = 0; partition < partitions_cnt; partition++) { + test_produce_msgs_easy(topics[i], 0, partition, + msgs_cnt); + } + } + + offsets = rd_kafka_topic_partition_list_new(10); + + /* Wipe all data from topic 0 */ + for (i = 0; i < partitions_cnt; i++) + rd_kafka_topic_partition_list_add(offsets, topics[0], i) + ->offset = RD_KAFKA_OFFSET_END; + + /* Wipe all data from partition 0 in topic 1 */ + rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset = + RD_KAFKA_OFFSET_END; + + /* Wipe some data from partition 2 in topic 1 */ + rd_kafka_topic_partition_list_add(offsets, topics[1], 2)->offset = + msgs_cnt / 2; + + /* Not changing the offset (out of range) for topic 2 partition 0 */ + rd_kafka_topic_partition_list_add(offsets, topics[2], 0); + + /* Offset out of range for topic 2 partition 1 */ + rd_kafka_topic_partition_list_add(offsets, topics[2], 1)->offset = + msgs_cnt + 1; + + del_records = rd_kafka_DeleteRecords_new(offsets); + + TIMING_START(&timing, "DeleteRecords"); + TEST_SAY("Call DeleteRecords\n"); + rd_kafka_DeleteRecords(rk, &del_records, 1, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + rd_kafka_DeleteRecords_destroy(del_records); + + TIMING_START(&timing, "DeleteRecords.queue_poll"); + + /* Poll result queue for DeleteRecords result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DeleteRecords: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DELETERECORDS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); + } + /* Convert event to proper result */ + res = rd_kafka_event_DeleteRecords_result(rkev); + TEST_ASSERT(res, "expected DeleteRecords_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DeleteRecords to return %s, not %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DeleteRecords: returned %s (%s)\n", rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + results = rd_kafka_topic_partition_list_copy( + rd_kafka_DeleteRecords_result_offsets(res)); + + /* Sort both input and output list */ + rd_kafka_topic_partition_list_sort(offsets, NULL, NULL); + rd_kafka_topic_partition_list_sort(results, NULL, NULL); + + TEST_SAY("Input partitions:\n"); + test_print_partition_list(offsets); + TEST_SAY("Result partitions:\n"); + test_print_partition_list(results); + + TEST_ASSERT(offsets->cnt == results->cnt, + "expected DeleteRecords_result_offsets to return %d items, " + "not %d", + offsets->cnt, results->cnt); + + for (i = 0; i < results->cnt; i++) { + const rd_kafka_topic_partition_t *input = &offsets->elems[i]; + const rd_kafka_topic_partition_t *output = &results->elems[i]; + int64_t expected_offset = input->offset; + rd_kafka_resp_err_t expected_err = 0; + + if (expected_offset == RD_KAFKA_OFFSET_END) + expected_offset = msgs_cnt; + + /* Expect Offset out of range error */ + if (input->offset < RD_KAFKA_OFFSET_END || + input->offset > msgs_cnt) + expected_err = 1; + + TEST_SAY("DeleteRecords Returned %s for %s [%" PRId32 + "] " + "low-watermark = %d\n", + rd_kafka_err2name(output->err), output->topic, + output->partition, (int)output->offset); + + if (strcmp(output->topic, input->topic)) + TEST_FAIL_LATER( + "Result order mismatch at #%d: " + "expected topic %s, got %s", + i, input->topic, output->topic); + + if (output->partition != input->partition) + TEST_FAIL_LATER( + "Result order mismatch at #%d: " + "expected partition %d, got %d", + i, input->partition, output->partition); + + if (output->err != expected_err) + TEST_FAIL_LATER( + "%s [%" PRId32 + "]: " + "expected error code %d (%s), " + "got %d (%s)", + output->topic, output->partition, expected_err, + rd_kafka_err2str(expected_err), output->err, + rd_kafka_err2str(output->err)); + + if (output->err == 0 && output->offset != expected_offset) + TEST_FAIL_LATER("%s [%" PRId32 + "]: " + "expected offset %" PRId64 + ", " + "got %" PRId64, + output->topic, output->partition, + expected_offset, output->offset); + } + + /* Check watermarks for partitions */ + for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { + int32_t partition; + for (partition = 0; partition < partitions_cnt; partition++) { + const rd_kafka_topic_partition_t *del = + rd_kafka_topic_partition_list_find( + results, topics[i], partition); + int64_t expected_low = 0; + int64_t expected_high = msgs_cnt; + int64_t low, high; + + if (del && del->err == 0) { + expected_low = del->offset; + } + + err = rd_kafka_query_watermark_offsets( + rk, topics[i], partition, &low, &high, + tmout_multip(10000)); + if (err) + TEST_FAIL( + "query_watermark_offsets failed: " + "%s\n", + rd_kafka_err2str(err)); + + if (low != expected_low) + TEST_FAIL_LATER("For %s [%" PRId32 + "] expected " + "a low watermark of %" PRId64 + ", got %" PRId64, + topics[i], partition, + expected_low, low); + + if (high != expected_high) + TEST_FAIL_LATER("For %s [%" PRId32 + "] expected " + "a high watermark of %" PRId64 + ", got %" PRId64, + topics[i], partition, + expected_high, high); + } + } + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < MY_DEL_RECORDS_CNT; i++) + rd_free(topics[i]); + + if (results) + rd_kafka_topic_partition_list_destroy(results); + + if (offsets) + rd_kafka_topic_partition_list_destroy(offsets); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_DEL_RECORDS_CNT + + SUB_TEST_PASS(); +} + +/** + * @brief Test deletion of groups + * + * + */ + +typedef struct expected_group_result { + char *group; + rd_kafka_resp_err_t err; +} expected_group_result_t; + +static void do_test_DeleteGroups(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define MY_DEL_GROUPS_CNT 4 + int known_groups = MY_DEL_GROUPS_CNT - 1; + int i; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + char *topic; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_group_result_t **results = NULL; + expected_group_result_t expected[MY_DEL_GROUPS_CNT] = {{0}}; + rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT]; + const rd_kafka_DeleteGroups_result_t *res; + + SUB_TEST_QUICK("%s DeleteGroups with %s, request_timeout %d", + rd_kafka_name(rk), what, request_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (request_timeout != -1) { + options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + + for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { + char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + if (i < known_groups) { + test_consume_msgs_easy(group, topic, testid, -1, + msgs_cnt, NULL); + expected[i].group = group; + expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; + } else { + expected[i].group = group; + expected[i].err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND; + } + del_groups[i] = rd_kafka_DeleteGroup_new(group); + } + + TIMING_START(&timing, "DeleteGroups"); + TEST_SAY("Call DeleteGroups\n"); + rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + TIMING_START(&timing, "DeleteGroups.queue_poll"); + + /* Poll result queue for DeleteGroups result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DeleteGroups: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DELETEGROUPS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); + } + /* Convert event to proper result */ + res = rd_kafka_event_DeleteGroups_result(rkev); + TEST_ASSERT(res, "expected DeleteGroups_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DeleteGroups to return %s, not %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DeleteGroups: returned %s (%s)\n", rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + size_t cnt = 0; + results = rd_kafka_DeleteGroups_result_groups(res, &cnt); + + TEST_ASSERT(MY_DEL_GROUPS_CNT == cnt, + "expected DeleteGroups_result_groups to return %d items, " + "not %" PRIusz, + MY_DEL_GROUPS_CNT, cnt); + + for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { + const expected_group_result_t *exp = &expected[i]; + rd_kafka_resp_err_t exp_err = exp->err; + const rd_kafka_group_result_t *act = results[i]; + rd_kafka_resp_err_t act_err = + rd_kafka_error_code(rd_kafka_group_result_error(act)); + TEST_ASSERT( + strcmp(exp->group, rd_kafka_group_result_name(act)) == 0, + "Result order mismatch at #%d: expected group name to be " + "%s, not %s", + i, exp->group, rd_kafka_group_result_name(act)); + TEST_ASSERT(exp_err == act_err, + "expected err=%d for group %s, not %d (%s)", + exp_err, exp->group, act_err, + rd_kafka_err2str(act_err)); + } + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { + rd_kafka_DeleteGroup_destroy(del_groups[i]); + rd_free(expected[i].group); + } + + rd_free(topic); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_DEL_GROUPS_CNT + + SUB_TEST_PASS(); +} + +/** + * @brief Test list groups, creating consumers for a set of groups, + * listing and deleting them at the end. + */ +static void do_test_ListConsumerGroups(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout, + rd_bool_t match_states) { +#define TEST_LIST_CONSUMER_GROUPS_CNT 4 + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + size_t valid_cnt, error_cnt; + rd_bool_t is_simple_consumer_group; + rd_kafka_consumer_group_state_t state; + char errstr[512]; + const char *errstr2, *group_id; + char *list_consumer_groups[TEST_LIST_CONSUMER_GROUPS_CNT]; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + size_t i, found; + char *topic; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_ListConsumerGroups_result_t *res; + const rd_kafka_ConsumerGroupListing_t **groups; + rd_bool_t has_match_states = + test_broker_version >= TEST_BRKVER(2, 7, 0, 0); + + SUB_TEST_QUICK( + "%s ListConsumerGroups with %s, request_timeout %d" + ", match_states %s", + rd_kafka_name(rk), what, request_timeout, RD_STR_ToF(match_states)); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (request_timeout != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); + + if (match_states) { + rd_kafka_consumer_group_state_t empty = + RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY; + + TEST_CALL_ERROR__( + rd_kafka_AdminOptions_set_match_consumer_group_states( + options, &empty, 1)); + } + + TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr))); + } + + + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + + for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { + char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt, + NULL); + list_consumer_groups[i] = group; + } + + TIMING_START(&timing, "ListConsumerGroups"); + TEST_SAY("Call ListConsumerGroups\n"); + rd_kafka_ListConsumerGroups(rk, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + TIMING_START(&timing, "ListConsumerGroups.queue_poll"); + + /* Poll result queue for ListConsumerGroups result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("ListConsumerGroups: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); + } + /* Convert event to proper result */ + res = rd_kafka_event_ListConsumerGroups_result(rkev); + TEST_ASSERT(res, "expected ListConsumerGroups_result, got %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected ListConsumerGroups to return %s, got %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("ListConsumerGroups: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + groups = rd_kafka_ListConsumerGroups_result_valid(res, &valid_cnt); + rd_kafka_ListConsumerGroups_result_errors(res, &error_cnt); + + /* Other tests could be running */ + TEST_ASSERT(valid_cnt >= TEST_LIST_CONSUMER_GROUPS_CNT, + "expected ListConsumerGroups to return at least %" PRId32 + " valid groups," + " got %zu", + TEST_LIST_CONSUMER_GROUPS_CNT, valid_cnt); + + TEST_ASSERT(error_cnt == 0, + "expected ListConsumerGroups to return 0 errors," + " got %zu", + error_cnt); + + found = 0; + for (i = 0; i < valid_cnt; i++) { + int j; + const rd_kafka_ConsumerGroupListing_t *group; + group = groups[i]; + group_id = rd_kafka_ConsumerGroupListing_group_id(group); + is_simple_consumer_group = + rd_kafka_ConsumerGroupListing_is_simple_consumer_group( + group); + state = rd_kafka_ConsumerGroupListing_state(group); + for (j = 0; j < TEST_LIST_CONSUMER_GROUPS_CNT; j++) { + if (!strcmp(list_consumer_groups[j], group_id)) { + found++; + TEST_ASSERT(!is_simple_consumer_group, + "expected a normal group," + " got a simple group"); + + if (!has_match_states) + break; + + TEST_ASSERT( + state == + RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, + "expected an Empty state," + " got state %s", + rd_kafka_consumer_group_state_name(state)); + break; + } + } + } + TEST_ASSERT(found == TEST_LIST_CONSUMER_GROUPS_CNT, + "expected to find %d" + " started groups," + " got %" PRIusz, + TEST_LIST_CONSUMER_GROUPS_CNT, found); + + rd_kafka_event_destroy(rkev); + + test_DeleteGroups_simple(rk, NULL, (char **)list_consumer_groups, + TEST_LIST_CONSUMER_GROUPS_CNT, NULL); + + for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { + rd_free(list_consumer_groups[i]); + } + + rd_free(topic); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef TEST_LIST_CONSUMER_GROUPS_CNT + + SUB_TEST_PASS(); +} + +typedef struct expected_DescribeConsumerGroups_result { + char *group_id; + rd_kafka_resp_err_t err; +} expected_DescribeConsumerGroups_result_t; + + +/** + * @brief Test describe groups, creating consumers for a set of groups, + * describing and deleting them at the end. + */ +static void do_test_DescribeConsumerGroups(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int request_timeout) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4 + int known_groups = TEST_DESCRIBE_CONSUMER_GROUPS_CNT - 1; + int i; + const int partitions_cnt = 1; + const int msgs_cnt = 100; + char *topic; + rd_kafka_metadata_topic_t exp_mdtopic = {0}; + int64_t testid = test_id_generate(); + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + const rd_kafka_ConsumerGroupDescription_t **results = NULL; + expected_DescribeConsumerGroups_result_t + expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT; + const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; + rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; + const rd_kafka_DescribeConsumerGroups_result_t *res; + rd_bool_t has_group_instance_id = + test_broker_version >= TEST_BRKVER(2, 4, 0, 0); + + SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, request_timeout %d", + rd_kafka_name(rk), what, request_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (request_timeout != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, request_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + exp_mdtopic.topic = topic; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); + + /* Produce 100 msgs */ + test_produce_msgs_easy(topic, testid, 0, msgs_cnt); + + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + rd_kafka_conf_t *conf; + char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); + if (i < known_groups) { + snprintf(group_instance_ids[i], + sizeof(group_instance_ids[i]), + "group_instance_id_%" PRId32, i); + snprintf(client_ids[i], sizeof(client_ids[i]), + "client_id_%" PRId32, i); + + test_conf_init(&conf, NULL, 0); + test_conf_set(conf, "client.id", client_ids[i]); + test_conf_set(conf, "group.instance.id", + group_instance_ids[i]); + test_conf_set(conf, "session.timeout.ms", "5000"); + test_conf_set(conf, "auto.offset.reset", "earliest"); + rks[i] = + test_create_consumer(group_id, NULL, conf, NULL); + test_consumer_subscribe(rks[i], topic); + /* Consume messages */ + test_consumer_poll("consumer", rks[i], testid, -1, -1, + msgs_cnt, NULL); + } + expected[i].group_id = group_id; + expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; + describe_groups[i] = group_id; + } + + TIMING_START(&timing, "DescribeConsumerGroups"); + TEST_SAY("Call DescribeConsumerGroups\n"); + rd_kafka_DescribeConsumerGroups( + rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + TIMING_START(&timing, "DescribeConsumerGroups.queue_poll"); + + /* Poll result queue for DescribeConsumerGroups result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); + TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { + break; + } + + rd_kafka_event_destroy(rkev); + } + /* Convert event to proper result */ + res = rd_kafka_event_DescribeConsumerGroups_result(rkev); + TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err == exp_err, + "expected DescribeConsumerGroups to return %s, got %s (%s)", + rd_kafka_err2str(exp_err), rd_kafka_err2str(err), + err ? errstr2 : "n/a"); + + TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + size_t cnt = 0; + results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt); + + TEST_ASSERT( + TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt, + "expected DescribeConsumerGroups_result_groups to return %d items, " + "got %" PRIusz, + TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt); + + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + expected_DescribeConsumerGroups_result_t *exp = &expected[i]; + rd_kafka_resp_err_t exp_err = exp->err; + const rd_kafka_ConsumerGroupDescription_t *act = results[i]; + rd_kafka_resp_err_t act_err = rd_kafka_error_code( + rd_kafka_ConsumerGroupDescription_error(act)); + rd_kafka_consumer_group_state_t state = + rd_kafka_ConsumerGroupDescription_state(act); + TEST_ASSERT( + strcmp(exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id(act)) == + 0, + "Result order mismatch at #%d: expected group id to be " + "%s, got %s", + i, exp->group_id, + rd_kafka_ConsumerGroupDescription_group_id(act)); + if (i < known_groups) { + int member_count; + const rd_kafka_MemberDescription_t *member; + const rd_kafka_MemberAssignment_t *assignment; + const char *client_id; + const char *group_instance_id; + const rd_kafka_topic_partition_list_t *partitions; + + TEST_ASSERT(state == + RD_KAFKA_CONSUMER_GROUP_STATE_STABLE, + "Expected Stable state, got %s.", + rd_kafka_consumer_group_state_name(state)); + + TEST_ASSERT( + !rd_kafka_ConsumerGroupDescription_is_simple_consumer_group( + act), + "Expected a normal consumer group, got a simple " + "one."); + + member_count = + rd_kafka_ConsumerGroupDescription_member_count(act); + TEST_ASSERT(member_count == 1, + "Expected one member, got %d.", + member_count); + + member = + rd_kafka_ConsumerGroupDescription_member(act, 0); + + client_id = + rd_kafka_MemberDescription_client_id(member); + TEST_ASSERT(!strcmp(client_id, client_ids[i]), + "Expected client id \"%s\"," + " got \"%s\".", + client_ids[i], client_id); + + if (has_group_instance_id) { + group_instance_id = + rd_kafka_MemberDescription_group_instance_id( + member); + TEST_ASSERT(!strcmp(group_instance_id, + group_instance_ids[i]), + "Expected group instance id \"%s\"," + " got \"%s\".", + group_instance_ids[i], + group_instance_id); + } + + assignment = + rd_kafka_MemberDescription_assignment(member); + TEST_ASSERT(assignment != NULL, + "Expected non-NULL member assignment"); + + partitions = + rd_kafka_MemberAssignment_partitions(assignment); + TEST_ASSERT(partitions != NULL, + "Expected non-NULL member partitions"); + + TEST_SAY( + "Member client.id=\"%s\", " + "group.instance.id=\"%s\", " + "consumer_id=\"%s\", " + "host=\"%s\", assignment:\n", + rd_kafka_MemberDescription_client_id(member), + rd_kafka_MemberDescription_group_instance_id( + member), + rd_kafka_MemberDescription_consumer_id(member), + rd_kafka_MemberDescription_host(member)); + /* This is just to make sure the returned memory + * is valid. */ + test_print_partition_list(partitions); + } else { + TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_DEAD, + "Expected Dead state, got %s.", + rd_kafka_consumer_group_state_name(state)); + } + TEST_ASSERT(exp_err == act_err, + "expected err=%d for group %s, got %d (%s)", + exp_err, exp->group_id, act_err, + rd_kafka_err2str(act_err)); + } + + rd_kafka_event_destroy(rkev); + + for (i = 0; i < known_groups; i++) { + test_consumer_close(rks[i]); + rd_kafka_destroy(rks[i]); + } + + /* Wait session timeout + 1s. Because using static group membership */ + rd_sleep(6); + + test_DeleteGroups_simple(rk, NULL, (char **)describe_groups, + known_groups, NULL); + + for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { + rd_free(expected[i].group_id); + } + + rd_free(topic); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT + + SUB_TEST_PASS(); +} + +/** + * @brief Test deletion of committed offsets. + * + * + */ +static void do_test_DeleteConsumerGroupOffsets(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int req_timeout_ms, + rd_bool_t sub_consumer) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_delete, + *committed, *deleted, *subscription = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define MY_TOPIC_CNT 3 + int i; + const int partitions_cnt = 3; + char *topics[MY_TOPIC_CNT]; + rd_kafka_metadata_topic_t exp_mdtopics[MY_TOPIC_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets; + const rd_kafka_DeleteConsumerGroupOffsets_result_t *res; + const rd_kafka_group_result_t **gres; + size_t gres_cnt; + rd_kafka_t *consumer; + char *groupid; + + SUB_TEST_QUICK( + "%s DeleteConsumerGroupOffsets with %s, req_timeout_ms %d%s", + rd_kafka_name(rk), what, req_timeout_ms, + sub_consumer ? ", with subscribing consumer" : ""); + + if (sub_consumer) + exp_err = RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC; + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (req_timeout_ms != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, req_timeout_ms, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + subscription = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT); + + for (i = 0; i < MY_TOPIC_CNT; i++) { + char pfx[64]; + char *topic; + + rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); + topic = rd_strdup(test_mk_topic_name(pfx, 1)); + + topics[i] = topic; + exp_mdtopics[exp_mdtopic_cnt++].topic = topic; + + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + } + + groupid = topics[0]; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, topics, MY_TOPIC_CNT, partitions_cnt, + NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, + 15 * 1000); + + rd_sleep(1); /* Additional wait time for cluster propagation */ + + consumer = test_create_consumer(groupid, NULL, NULL, NULL); + + if (sub_consumer) { + TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); + test_consumer_wait_assignment(consumer, rd_true); + } + + /* Commit some offsets */ + orig_offsets = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT * 2); + for (i = 0; i < MY_TOPIC_CNT * 2; i++) + rd_kafka_topic_partition_list_add(orig_offsets, topics[i / 2], + i % MY_TOPIC_CNT) + ->offset = (i + 1) * 10; + + TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); + + /* Verify committed offsets match */ + committed = rd_kafka_topic_partition_list_copy(orig_offsets); + TEST_CALL_ERR__( + rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); + + if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) { + TEST_SAY("commit() list:\n"); + test_print_partition_list(orig_offsets); + TEST_SAY("committed() list:\n"); + test_print_partition_list(committed); + TEST_FAIL("committed offsets don't match"); + } + + rd_kafka_topic_partition_list_destroy(committed); + + /* Now delete second half of the commits */ + offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); + to_delete = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); + for (i = 0; i < orig_offsets->cnt; i++) { + rd_kafka_topic_partition_t *rktpar; + if (i < orig_offsets->cnt / 2) { + rktpar = rd_kafka_topic_partition_list_add( + offsets, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = orig_offsets->elems[i].offset; + } else { + rktpar = rd_kafka_topic_partition_list_add( + to_delete, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = RD_KAFKA_OFFSET_INVALID; + rktpar = rd_kafka_topic_partition_list_add( + offsets, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = RD_KAFKA_OFFSET_INVALID; + } + } + + cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(groupid, to_delete); + + TIMING_START(&timing, "DeleteConsumerGroupOffsets"); + TEST_SAY("Call DeleteConsumerGroupOffsets\n"); + rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets); + + TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll"); + /* Poll result queue for DeleteConsumerGroupOffsets result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); + TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) + break; + + rd_kafka_event_destroy(rkev); + } + + /* Convert event to proper result */ + res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev); + TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, + "expected DeleteConsumerGroupOffsets to succeed, " + "got %s (%s)", + rd_kafka_err2name(err), err ? errstr2 : "n/a"); + + TEST_SAY("DeleteConsumerGroupOffsets: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + gres = + rd_kafka_DeleteConsumerGroupOffsets_result_groups(res, &gres_cnt); + TEST_ASSERT(gres && gres_cnt == 1, + "expected gres_cnt == 1, not %" PRIusz, gres_cnt); + + deleted = rd_kafka_topic_partition_list_copy( + rd_kafka_group_result_partitions(gres[0])); + + if (test_partition_list_and_offsets_cmp(deleted, to_delete)) { + TEST_SAY("Result list:\n"); + test_print_partition_list(deleted); + TEST_SAY("Partitions passed to DeleteConsumerGroupOffsets:\n"); + test_print_partition_list(to_delete); + TEST_FAIL("deleted/requested offsets don't match"); + } + + /* Verify expected errors */ + for (i = 0; i < deleted->cnt; i++) { + TEST_ASSERT_LATER(deleted->elems[i].err == exp_err, + "Result %s [%" PRId32 + "] has error %s, " + "expected %s", + deleted->elems[i].topic, + deleted->elems[i].partition, + rd_kafka_err2name(deleted->elems[i].err), + rd_kafka_err2name(exp_err)); + } + + TEST_LATER_CHECK(); + + rd_kafka_topic_partition_list_destroy(deleted); + rd_kafka_topic_partition_list_destroy(to_delete); + + rd_kafka_event_destroy(rkev); + + + /* Verify committed offsets match */ + committed = rd_kafka_topic_partition_list_copy(orig_offsets); + TEST_CALL_ERR__( + rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); + + TEST_SAY("Original committed offsets:\n"); + test_print_partition_list(orig_offsets); + + TEST_SAY("Committed offsets after delete:\n"); + test_print_partition_list(committed); + + rd_kafka_topic_partition_list_t *expected = offsets; + if (sub_consumer) + expected = orig_offsets; + + if (test_partition_list_and_offsets_cmp(committed, expected)) { + TEST_SAY("expected list:\n"); + test_print_partition_list(expected); + TEST_SAY("committed() list:\n"); + test_print_partition_list(committed); + TEST_FAIL("committed offsets don't match"); + } + + rd_kafka_topic_partition_list_destroy(committed); + rd_kafka_topic_partition_list_destroy(offsets); + rd_kafka_topic_partition_list_destroy(orig_offsets); + rd_kafka_topic_partition_list_destroy(subscription); + + for (i = 0; i < MY_TOPIC_CNT; i++) + rd_free(topics[i]); + + rd_kafka_destroy(consumer); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef MY_TOPIC_CNT + + SUB_TEST_PASS(); +} + + +/** + * @brief Test altering of committed offsets. + * + * + */ +static void do_test_AlterConsumerGroupOffsets(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int req_timeout_ms, + rd_bool_t sub_consumer, + rd_bool_t create_topics) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_alter, + *committed, *alterd, *subscription = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3 + int i; + const int partitions_cnt = 3; + char *topics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT]; + rd_kafka_metadata_topic_t + exp_mdtopics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_AlterConsumerGroupOffsets_t *cgoffsets; + const rd_kafka_AlterConsumerGroupOffsets_result_t *res; + const rd_kafka_group_result_t **gres; + size_t gres_cnt; + rd_kafka_t *consumer = NULL; + char *group_id; + + SUB_TEST_QUICK( + "%s AlterConsumerGroupOffsets with %s, " + "request_timeout %d%s", + rd_kafka_name(rk), what, req_timeout_ms, + sub_consumer ? ", with subscribing consumer" : ""); + + if (!create_topics) + exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; + else if (sub_consumer) + exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; + + if (sub_consumer && !create_topics) + TEST_FAIL( + "Can't use set sub_consumer and unset create_topics at the " + "same time"); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (req_timeout_ms != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, req_timeout_ms, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + subscription = rd_kafka_topic_partition_list_new( + TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); + + for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) { + char pfx[64]; + char *topic; + + rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); + topic = rd_strdup(test_mk_topic_name(pfx, 1)); + + topics[i] = topic; + exp_mdtopics[exp_mdtopic_cnt++].topic = topic; + + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + } + + group_id = topics[0]; + + /* Create the topics first if needed. */ + if (create_topics) { + test_CreateTopics_simple( + rk, NULL, topics, + TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT, partitions_cnt, + NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, + NULL, 0, 15 * 1000); + + rd_sleep(1); /* Additional wait time for cluster propagation */ + + consumer = test_create_consumer(group_id, NULL, NULL, NULL); + + if (sub_consumer) { + TEST_CALL_ERR__( + rd_kafka_subscribe(consumer, subscription)); + test_consumer_wait_assignment(consumer, rd_true); + } + } + + orig_offsets = rd_kafka_topic_partition_list_new( + TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt); + for (i = 0; + i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt; + i++) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add( + orig_offsets, topics[i / partitions_cnt], + i % partitions_cnt); + rktpar->offset = (i + 1) * 10; + rd_kafka_topic_partition_set_leader_epoch(rktpar, 1); + } + + /* Commit some offsets, if topics exists */ + if (create_topics) { + TEST_CALL_ERR__( + rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); + + /* Verify committed offsets match */ + committed = rd_kafka_topic_partition_list_copy(orig_offsets); + TEST_CALL_ERR__(rd_kafka_committed(consumer, committed, + tmout_multip(5 * 1000))); + + if (test_partition_list_and_offsets_cmp(committed, + orig_offsets)) { + TEST_SAY("commit() list:\n"); + test_print_partition_list(orig_offsets); + TEST_SAY("committed() list:\n"); + test_print_partition_list(committed); + TEST_FAIL("committed offsets don't match"); + } + rd_kafka_topic_partition_list_destroy(committed); + } + + /* Now alter second half of the commits */ + offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); + to_alter = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); + for (i = 0; i < orig_offsets->cnt; i++) { + rd_kafka_topic_partition_t *rktpar; + if (i < orig_offsets->cnt / 2) { + rktpar = rd_kafka_topic_partition_list_add( + offsets, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = orig_offsets->elems[i].offset; + rd_kafka_topic_partition_set_leader_epoch( + rktpar, rd_kafka_topic_partition_get_leader_epoch( + &orig_offsets->elems[i])); + } else { + rktpar = rd_kafka_topic_partition_list_add( + to_alter, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = 5; + rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); + rktpar = rd_kafka_topic_partition_list_add( + offsets, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + rktpar->offset = 5; + rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); + } + } + + cgoffsets = rd_kafka_AlterConsumerGroupOffsets_new(group_id, to_alter); + + TIMING_START(&timing, "AlterConsumerGroupOffsets"); + TEST_SAY("Call AlterConsumerGroupOffsets\n"); + rd_kafka_AlterConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + rd_kafka_AlterConsumerGroupOffsets_destroy(cgoffsets); + + TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll"); + /* Poll result queue for AlterConsumerGroupOffsets result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); + TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT) + break; + + rd_kafka_event_destroy(rkev); + } + + /* Convert event to proper result */ + res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev); + TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, + "expected AlterConsumerGroupOffsets to succeed, " + "got %s (%s)", + rd_kafka_err2name(err), err ? errstr2 : "n/a"); + + TEST_SAY("AlterConsumerGroupOffsets: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + gres = rd_kafka_AlterConsumerGroupOffsets_result_groups(res, &gres_cnt); + TEST_ASSERT(gres && gres_cnt == 1, + "expected gres_cnt == 1, not %" PRIusz, gres_cnt); + + alterd = rd_kafka_topic_partition_list_copy( + rd_kafka_group_result_partitions(gres[0])); + + if (test_partition_list_and_offsets_cmp(alterd, to_alter)) { + TEST_SAY("Result list:\n"); + test_print_partition_list(alterd); + TEST_SAY("Partitions passed to AlterConsumerGroupOffsets:\n"); + test_print_partition_list(to_alter); + TEST_FAIL("altered/requested offsets don't match"); + } + + /* Verify expected errors */ + for (i = 0; i < alterd->cnt; i++) { + TEST_ASSERT_LATER(alterd->elems[i].err == exp_err, + "Result %s [%" PRId32 + "] has error %s, " + "expected %s", + alterd->elems[i].topic, + alterd->elems[i].partition, + rd_kafka_err2name(alterd->elems[i].err), + rd_kafka_err2name(exp_err)); + } + + TEST_LATER_CHECK(); + + rd_kafka_topic_partition_list_destroy(alterd); + rd_kafka_topic_partition_list_destroy(to_alter); + + rd_kafka_event_destroy(rkev); + + + /* Verify committed offsets match, if topics exist. */ + if (create_topics) { + committed = rd_kafka_topic_partition_list_copy(orig_offsets); + TEST_CALL_ERR__(rd_kafka_committed(consumer, committed, + tmout_multip(5 * 1000))); + + rd_kafka_topic_partition_list_t *expected = offsets; + if (sub_consumer) { + /* Alter fails with an active consumer */ + expected = orig_offsets; + } + TEST_SAY("Original committed offsets:\n"); + test_print_partition_list(orig_offsets); + + TEST_SAY("Committed offsets after alter:\n"); + test_print_partition_list(committed); + + if (test_partition_list_and_offsets_cmp(committed, expected)) { + TEST_SAY("expected list:\n"); + test_print_partition_list(expected); + TEST_SAY("committed() list:\n"); + test_print_partition_list(committed); + TEST_FAIL("committed offsets don't match"); + } + rd_kafka_topic_partition_list_destroy(committed); + } + + rd_kafka_topic_partition_list_destroy(offsets); + rd_kafka_topic_partition_list_destroy(orig_offsets); + rd_kafka_topic_partition_list_destroy(subscription); + + for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) + rd_free(topics[i]); + + if (create_topics) /* consumer is created only if topics are. */ + rd_kafka_destroy(consumer); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); +#undef TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT + + SUB_TEST_PASS(); +} + +/** + * @brief Test listing of committed offsets. + * + * + */ +static void do_test_ListConsumerGroupOffsets(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int req_timeout_ms, + rd_bool_t sub_consumer, + rd_bool_t null_toppars) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_topic_partition_list_t *orig_offsets, *to_list, *committed, + *listd, *subscription = NULL; + rd_kafka_event_t *rkev = NULL; + rd_kafka_resp_err_t err; + char errstr[512]; + const char *errstr2; +#define TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3 + int i; + const int partitions_cnt = 3; + char *topics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT]; + rd_kafka_metadata_topic_t + exp_mdtopics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}}; + int exp_mdtopic_cnt = 0; + test_timing_t timing; + rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; + rd_kafka_ListConsumerGroupOffsets_t *cgoffsets; + const rd_kafka_ListConsumerGroupOffsets_result_t *res; + const rd_kafka_group_result_t **gres; + size_t gres_cnt; + rd_kafka_t *consumer; + char *group_id; + + SUB_TEST_QUICK( + "%s ListConsumerGroupOffsets with %s, " + "request timeout %d%s", + rd_kafka_name(rk), what, req_timeout_ms, + sub_consumer ? ", with subscribing consumer" : ""); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (req_timeout_ms != -1) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS); + + err = rd_kafka_AdminOptions_set_request_timeout( + options, req_timeout_ms, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + } + + + subscription = rd_kafka_topic_partition_list_new( + TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); + + for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) { + char pfx[64]; + char *topic; + + rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); + topic = rd_strdup(test_mk_topic_name(pfx, 1)); + + topics[i] = topic; + exp_mdtopics[exp_mdtopic_cnt++].topic = topic; + + rd_kafka_topic_partition_list_add(subscription, topic, + RD_KAFKA_PARTITION_UA); + } + + group_id = topics[0]; + + /* Create the topics first. */ + test_CreateTopics_simple(rk, NULL, topics, + TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT, + partitions_cnt, NULL); + + /* Verify that topics are reported by metadata */ + test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, + 15 * 1000); + + rd_sleep(1); /* Additional wait time for cluster propagation */ + + consumer = test_create_consumer(group_id, NULL, NULL, NULL); + + if (sub_consumer) { + TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); + test_consumer_wait_assignment(consumer, rd_true); + } + + /* Commit some offsets */ + orig_offsets = rd_kafka_topic_partition_list_new( + TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2); + for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2; i++) { + rd_kafka_topic_partition_t *rktpar; + rktpar = rd_kafka_topic_partition_list_add( + orig_offsets, topics[i / 2], + i % TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); + rktpar->offset = (i + 1) * 10; + rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); + } + + TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); + + /* Verify committed offsets match */ + committed = rd_kafka_topic_partition_list_copy(orig_offsets); + TEST_CALL_ERR__( + rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); + + if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) { + TEST_SAY("commit() list:\n"); + test_print_partition_list(orig_offsets); + TEST_SAY("committed() list:\n"); + test_print_partition_list(committed); + TEST_FAIL("committed offsets don't match"); + } + + rd_kafka_topic_partition_list_destroy(committed); + + to_list = rd_kafka_topic_partition_list_new(orig_offsets->cnt); + for (i = 0; i < orig_offsets->cnt; i++) { + rd_kafka_topic_partition_list_add( + to_list, orig_offsets->elems[i].topic, + orig_offsets->elems[i].partition); + } + + if (null_toppars) { + cgoffsets = + rd_kafka_ListConsumerGroupOffsets_new(group_id, NULL); + } else { + cgoffsets = + rd_kafka_ListConsumerGroupOffsets_new(group_id, to_list); + } + + TIMING_START(&timing, "ListConsumerGroupOffsets"); + TEST_SAY("Call ListConsumerGroupOffsets\n"); + rd_kafka_ListConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); + TIMING_ASSERT_LATER(&timing, 0, 50); + + rd_kafka_ListConsumerGroupOffsets_destroy(cgoffsets); + + TIMING_START(&timing, "ListConsumerGroupOffsets.queue_poll"); + /* Poll result queue for ListConsumerGroupOffsets result. + * Print but otherwise ignore other event types + * (typically generic Error events). */ + while (1) { + rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); + TEST_SAY("ListConsumerGroupOffsets: got %s in %.3fms\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + if (rkev == NULL) + continue; + if (rd_kafka_event_error(rkev)) + TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), + rd_kafka_event_error_string(rkev)); + + if (rd_kafka_event_type(rkev) == + RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT) + break; + + rd_kafka_event_destroy(rkev); + } + + /* Convert event to proper result */ + res = rd_kafka_event_ListConsumerGroupOffsets_result(rkev); + TEST_ASSERT(res, "expected ListConsumerGroupOffsets_result, not %s", + rd_kafka_event_name(rkev)); + + /* Expecting error */ + err = rd_kafka_event_error(rkev); + errstr2 = rd_kafka_event_error_string(rkev); + TEST_ASSERT(!err, + "expected ListConsumerGroupOffsets to succeed, " + "got %s (%s)", + rd_kafka_err2name(err), err ? errstr2 : "n/a"); + + TEST_SAY("ListConsumerGroupOffsets: returned %s (%s)\n", + rd_kafka_err2str(err), err ? errstr2 : "n/a"); + + gres = rd_kafka_ListConsumerGroupOffsets_result_groups(res, &gres_cnt); + TEST_ASSERT(gres && gres_cnt == 1, + "expected gres_cnt == 1, not %" PRIusz, gres_cnt); + + listd = rd_kafka_topic_partition_list_copy( + rd_kafka_group_result_partitions(gres[0])); + + if (test_partition_list_and_offsets_cmp(listd, orig_offsets)) { + TEST_SAY("Result list:\n"); + test_print_partition_list(listd); + TEST_SAY("Partitions passed to ListConsumerGroupOffsets:\n"); + test_print_partition_list(orig_offsets); + TEST_FAIL("listd/requested offsets don't match"); + } + + /* Verify expected errors */ + for (i = 0; i < listd->cnt; i++) { + TEST_ASSERT_LATER(listd->elems[i].err == exp_err, + "Result %s [%" PRId32 + "] has error %s, " + "expected %s", + listd->elems[i].topic, + listd->elems[i].partition, + rd_kafka_err2name(listd->elems[i].err), + rd_kafka_err2name(exp_err)); + } + + TEST_LATER_CHECK(); + + rd_kafka_topic_partition_list_destroy(listd); + rd_kafka_topic_partition_list_destroy(to_list); + + rd_kafka_event_destroy(rkev); + + rd_kafka_topic_partition_list_destroy(orig_offsets); + rd_kafka_topic_partition_list_destroy(subscription); + + for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) + rd_free(topics[i]); + + rd_kafka_destroy(consumer); + + if (options) + rd_kafka_AdminOptions_destroy(options); + + if (!useq) + rd_kafka_queue_destroy(q); + + TEST_LATER_CHECK(); + +#undef TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT + + SUB_TEST_PASS(); +} + +static void do_test_apis(rd_kafka_type_t cltype) { + rd_kafka_t *rk; + rd_kafka_conf_t *conf; + rd_kafka_queue_t *mainq; + + /* Get the available brokers, but use a separate rd_kafka_t instance + * so we don't jinx the tests by having up-to-date metadata. */ + avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); + TEST_SAY("%" PRIusz + " brokers in cluster " + "which will be used for replica sets\n", + avail_broker_cnt); + + do_test_unclean_destroy(cltype, 0 /*tempq*/); + do_test_unclean_destroy(cltype, 1 /*mainq*/); + + test_conf_init(&conf, NULL, 180); + test_conf_set(conf, "socket.timeout.ms", "10000"); + rk = test_create_handle(cltype, conf); + + mainq = rd_kafka_queue_get_main(rk); + + /* Create topics */ + do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); + do_test_CreateTopics("temp queue, op timeout 15000", rk, NULL, 15000, + 0); + do_test_CreateTopics( + "temp queue, op timeout 300, " + "validate only", + rk, NULL, 300, rd_true); + do_test_CreateTopics("temp queue, op timeout 9000, validate_only", rk, + NULL, 9000, rd_true); + do_test_CreateTopics("main queue, options", rk, mainq, -1, 0); + + /* Delete topics */ + do_test_DeleteTopics("temp queue, op timeout 0", rk, NULL, 0); + do_test_DeleteTopics("main queue, op timeout 15000", rk, mainq, 1500); + + if (test_broker_version >= TEST_BRKVER(1, 0, 0, 0)) { + /* Create Partitions */ + do_test_CreatePartitions("temp queue, op timeout 6500", rk, + NULL, 6500); + do_test_CreatePartitions("main queue, op timeout 0", rk, mainq, + 0); + } + + /* CreateAcls */ + do_test_CreateAcls(rk, mainq, 0); + do_test_CreateAcls(rk, mainq, 1); + + /* DescribeAcls */ + do_test_DescribeAcls(rk, mainq, 0); + do_test_DescribeAcls(rk, mainq, 1); + + /* DeleteAcls */ + do_test_DeleteAcls(rk, mainq, 0); + do_test_DeleteAcls(rk, mainq, 1); + + /* AlterConfigs */ + do_test_AlterConfigs(rk, mainq); + + /* DescribeConfigs */ + do_test_DescribeConfigs(rk, mainq); + + /* Delete records */ + do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0); + do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500); + + /* List groups */ + do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false); + do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true); + + /* Describe groups */ + do_test_DescribeConsumerGroups("temp queue", rk, NULL, -1); + do_test_DescribeConsumerGroups("main queue", rk, mainq, 1500); + + /* Delete groups */ + do_test_DeleteGroups("temp queue", rk, NULL, -1); + do_test_DeleteGroups("main queue", rk, mainq, 1500); + + if (test_broker_version >= TEST_BRKVER(2, 4, 0, 0)) { + /* Delete committed offsets */ + do_test_DeleteConsumerGroupOffsets("temp queue", rk, NULL, -1, + rd_false); + do_test_DeleteConsumerGroupOffsets("main queue", rk, mainq, + 1500, rd_false); + do_test_DeleteConsumerGroupOffsets( + "main queue", rk, mainq, 1500, + rd_true /*with subscribing consumer*/); + + /* Alter committed offsets */ + do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1, + rd_false, rd_true); + do_test_AlterConsumerGroupOffsets("main queue", rk, mainq, 1500, + rd_false, rd_true); + do_test_AlterConsumerGroupOffsets( + "main queue, nonexistent topics", rk, mainq, 1500, rd_false, + rd_false /* don't create topics */); + do_test_AlterConsumerGroupOffsets( + "main queue", rk, mainq, 1500, + rd_true, /*with subscribing consumer*/ + rd_true); + + /* List committed offsets */ + do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1, + rd_false, rd_false); + do_test_ListConsumerGroupOffsets( + "main queue, op timeout " + "1500", + rk, mainq, 1500, rd_false, rd_false); + do_test_ListConsumerGroupOffsets( + "main queue", rk, mainq, 1500, + rd_true /*with subscribing consumer*/, rd_false); + do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1, + rd_false, rd_true); + do_test_ListConsumerGroupOffsets("main queue", rk, mainq, 1500, + rd_false, rd_true); + do_test_ListConsumerGroupOffsets( + "main queue", rk, mainq, 1500, + rd_true /*with subscribing consumer*/, rd_true); + } + + rd_kafka_queue_destroy(mainq); + + rd_kafka_destroy(rk); + + free(avail_brokers); +} + + +int main_0081_admin(int argc, char **argv) { + + do_test_apis(RD_KAFKA_PRODUCER); + + if (test_quick) { + TEST_SAY("Skipping further 0081 tests due to quick mode\n"); + return 0; + } + + do_test_apis(RD_KAFKA_CONSUMER); + + return 0; +} -- cgit v1.2.3