diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:23 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:54:44 +0000 |
commit | 836b47cb7e99a977c5a23b059ca1d0b5065d310e (patch) | |
tree | 1604da8f482d02effa033c94a84be42bc0c848c3 /fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.tar.xz netdata-836b47cb7e99a977c5a23b059ca1d0b5065d310e.zip |
Merging upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c | 3797 |
1 files changed, 0 insertions, 3797 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c b/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c deleted file mode 100644 index 7da2dff15..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/tests/0081-admin.c +++ /dev/null @@ -1,3797 +0,0 @@ -/* - * librdkafka - Apache Kafka C library - * - * Copyright (c) 2012-2015, Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "test.h" -#include "rdkafka.h" -#include "../src/rdstring.h" - -/** - * @brief Admin API integration tests. - */ - - -static int32_t *avail_brokers; -static size_t avail_broker_cnt; - - - -static void do_test_CreateTopics(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int op_timeout, - rd_bool_t validate_only) { - rd_kafka_queue_t *q; -#define MY_NEW_TOPICS_CNT 7 - char *topics[MY_NEW_TOPICS_CNT]; - rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT]; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_resp_err_t exp_topicerr[MY_NEW_TOPICS_CNT] = {0}; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - /* Expected topics in metadata */ - rd_kafka_metadata_topic_t exp_mdtopics[MY_NEW_TOPICS_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - /* Not expected topics in metadata */ - rd_kafka_metadata_topic_t exp_not_mdtopics[MY_NEW_TOPICS_CNT] = {{0}}; - int exp_not_mdtopic_cnt = 0; - int i; - char errstr[512]; - const char *errstr2; - rd_kafka_resp_err_t err; - test_timing_t timing; - rd_kafka_event_t *rkev; - const rd_kafka_CreateTopics_result_t *res; - const rd_kafka_topic_result_t **restopics; - size_t restopic_cnt; - int metadata_tmout; - int num_replicas = (int)avail_broker_cnt; - int32_t *replicas; - - SUB_TEST_QUICK( - "%s CreateTopics with %s, " - "op_timeout %d, validate_only %d", - rd_kafka_name(rk), what, op_timeout, validate_only); - - q = useq ? useq : rd_kafka_queue_new(rk); - - /* Set up replicas */ - replicas = rd_alloca(sizeof(*replicas) * num_replicas); - for (i = 0; i < num_replicas; i++) - replicas[i] = avail_brokers[i]; - - /** - * Construct NewTopic array with different properties for - * different partitions. - */ - for (i = 0; i < MY_NEW_TOPICS_CNT; i++) { - char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - int use_defaults = - i == 6 && test_broker_version >= TEST_BRKVER(2, 4, 0, 0); - int num_parts = !use_defaults ? (i * 7 + 1) : -1; - int set_config = (i & 1); - int add_invalid_config = (i == 1); - int set_replicas = !use_defaults && !(i % 3); - rd_kafka_resp_err_t this_exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - - topics[i] = topic; - new_topics[i] = rd_kafka_NewTopic_new( - topic, num_parts, set_replicas ? -1 : num_replicas, NULL, - 0); - - if (set_config) { - /* - * Add various configuration properties - */ - err = rd_kafka_NewTopic_set_config( - new_topics[i], "compression.type", "lz4"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - err = rd_kafka_NewTopic_set_config( - new_topics[i], "delete.retention.ms", "900"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - if (add_invalid_config) { - /* Add invalid config property */ - err = rd_kafka_NewTopic_set_config( - new_topics[i], "dummy.doesntexist", - "broker is verifying this"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - this_exp_err = RD_KAFKA_RESP_ERR_INVALID_CONFIG; - } - - TEST_SAY( - "Expecting result for topic #%d: %s " - "(set_config=%d, add_invalid_config=%d, " - "set_replicas=%d, use_defaults=%d)\n", - i, rd_kafka_err2name(this_exp_err), set_config, - add_invalid_config, set_replicas, use_defaults); - - if (set_replicas) { - int32_t p; - - /* - * Set valid replica assignments - */ - for (p = 0; p < num_parts; p++) { - err = rd_kafka_NewTopic_set_replica_assignment( - new_topics[i], p, replicas, num_replicas, - errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - } - } - - if (this_exp_err || validate_only) { - exp_topicerr[i] = this_exp_err; - exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic; - - } else { - exp_mdtopics[exp_mdtopic_cnt].topic = topic; - exp_mdtopics[exp_mdtopic_cnt].partition_cnt = num_parts; - exp_mdtopic_cnt++; - } - } - - if (op_timeout != -1 || validate_only) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_CREATETOPICS); - - if (op_timeout != -1) { - err = rd_kafka_AdminOptions_set_operation_timeout( - options, op_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - if (validate_only) { - err = rd_kafka_AdminOptions_set_validate_only( - options, validate_only, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - } - - TIMING_START(&timing, "CreateTopics"); - TEST_SAY("Call CreateTopics\n"); - rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* Poll result queue for CreateTopics result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - TIMING_START(&timing, "CreateTopics.queue_poll"); - do { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("CreateTopics: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - } while (rd_kafka_event_type(rkev) != - RD_KAFKA_EVENT_CREATETOPICS_RESULT); - - /* Convert event to proper result */ - res = rd_kafka_event_CreateTopics_result(rkev); - TEST_ASSERT(res, "expected CreateTopics_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected CreateTopics to return %s, not %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("CreateTopics: returned %s (%s)\n", rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - /* Extract topics */ - restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt); - - - /* Scan topics for proper fields and expected failures. */ - for (i = 0; i < (int)restopic_cnt; i++) { - const rd_kafka_topic_result_t *terr = restopics[i]; - - /* Verify that topic order matches our request. */ - if (strcmp(rd_kafka_topic_result_name(terr), topics[i])) - TEST_FAIL_LATER( - "Topic result order mismatch at #%d: " - "expected %s, got %s", - i, topics[i], rd_kafka_topic_result_name(terr)); - - TEST_SAY("CreateTopics result: #%d: %s: %s: %s\n", i, - rd_kafka_topic_result_name(terr), - rd_kafka_err2name(rd_kafka_topic_result_error(terr)), - rd_kafka_topic_result_error_string(terr)); - if (rd_kafka_topic_result_error(terr) != exp_topicerr[i]) - TEST_FAIL_LATER("Expected %s, not %d: %s", - rd_kafka_err2name(exp_topicerr[i]), - rd_kafka_topic_result_error(terr), - rd_kafka_err2name( - rd_kafka_topic_result_error(terr))); - } - - /** - * Verify that the expecteded topics are created and the non-expected - * are not. Allow it some time to propagate. - */ - if (validate_only) { - /* No topics should have been created, give it some time - * before checking. */ - rd_sleep(2); - metadata_tmout = 5 * 1000; - } else { - if (op_timeout > 0) - metadata_tmout = op_timeout + 1000; - else - metadata_tmout = 10 * 1000; - } - - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, - exp_not_mdtopics, exp_not_mdtopic_cnt, - metadata_tmout); - - rd_kafka_event_destroy(rkev); - - for (i = 0; i < MY_NEW_TOPICS_CNT; i++) { - rd_kafka_NewTopic_destroy(new_topics[i]); - rd_free(topics[i]); - } - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_NEW_TOPICS_CNT - - SUB_TEST_PASS(); -} - - - -/** - * @brief Test deletion of topics - * - * - */ -static void do_test_DeleteTopics(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int op_timeout) { - rd_kafka_queue_t *q; - const int skip_topic_cnt = 2; -#define MY_DEL_TOPICS_CNT 9 - char *topics[MY_DEL_TOPICS_CNT]; - rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT]; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_resp_err_t exp_topicerr[MY_DEL_TOPICS_CNT] = {0}; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - /* Expected topics in metadata */ - rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_TOPICS_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - /* Not expected topics in metadata */ - rd_kafka_metadata_topic_t exp_not_mdtopics[MY_DEL_TOPICS_CNT] = {{0}}; - int exp_not_mdtopic_cnt = 0; - int i; - char errstr[512]; - const char *errstr2; - rd_kafka_resp_err_t err; - test_timing_t timing; - rd_kafka_event_t *rkev; - const rd_kafka_DeleteTopics_result_t *res; - const rd_kafka_topic_result_t **restopics; - size_t restopic_cnt; - int metadata_tmout; - - SUB_TEST_QUICK("%s DeleteTopics with %s, op_timeout %d", - rd_kafka_name(rk), what, op_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - /** - * Construct DeleteTopic array - */ - for (i = 0; i < MY_DEL_TOPICS_CNT; i++) { - char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - int notexist_topic = i >= MY_DEL_TOPICS_CNT - skip_topic_cnt; - - topics[i] = topic; - - del_topics[i] = rd_kafka_DeleteTopic_new(topic); - - if (notexist_topic) - exp_topicerr[i] = - RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else { - exp_topicerr[i] = RD_KAFKA_RESP_ERR_NO_ERROR; - - exp_mdtopics[exp_mdtopic_cnt++].topic = topic; - } - - exp_not_mdtopics[exp_not_mdtopic_cnt++].topic = topic; - } - - if (op_timeout != -1) { - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); - - err = rd_kafka_AdminOptions_set_operation_timeout( - options, op_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - /* Create the topics first, minus the skip count. */ - test_CreateTopics_simple(rk, NULL, topics, - MY_DEL_TOPICS_CNT - skip_topic_cnt, - 2 /*num_partitions*/, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, - 15 * 1000); - - TIMING_START(&timing, "DeleteTopics"); - TEST_SAY("Call DeleteTopics\n"); - rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* Poll result queue for DeleteTopics result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - TIMING_START(&timing, "DeleteTopics.queue_poll"); - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("DeleteTopics: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DELETETOPICS_RESULT) - break; - - rd_kafka_event_destroy(rkev); - } - - /* Convert event to proper result */ - res = rd_kafka_event_DeleteTopics_result(rkev); - TEST_ASSERT(res, "expected DeleteTopics_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected DeleteTopics to return %s, not %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("DeleteTopics: returned %s (%s)\n", rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - /* Extract topics */ - restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt); - - - /* Scan topics for proper fields and expected failures. */ - for (i = 0; i < (int)restopic_cnt; i++) { - const rd_kafka_topic_result_t *terr = restopics[i]; - - /* Verify that topic order matches our request. */ - if (strcmp(rd_kafka_topic_result_name(terr), topics[i])) - TEST_FAIL_LATER( - "Topic result order mismatch at #%d: " - "expected %s, got %s", - i, topics[i], rd_kafka_topic_result_name(terr)); - - TEST_SAY("DeleteTopics result: #%d: %s: %s: %s\n", i, - rd_kafka_topic_result_name(terr), - rd_kafka_err2name(rd_kafka_topic_result_error(terr)), - rd_kafka_topic_result_error_string(terr)); - if (rd_kafka_topic_result_error(terr) != exp_topicerr[i]) - TEST_FAIL_LATER("Expected %s, not %d: %s", - rd_kafka_err2name(exp_topicerr[i]), - rd_kafka_topic_result_error(terr), - rd_kafka_err2name( - rd_kafka_topic_result_error(terr))); - } - - /** - * Verify that the expected topics are deleted and the non-expected - * are not. Allow it some time to propagate. - */ - if (op_timeout > 0) - metadata_tmout = op_timeout + 1000; - else - metadata_tmout = 10 * 1000; - - test_wait_metadata_update(rk, NULL, 0, exp_not_mdtopics, - exp_not_mdtopic_cnt, metadata_tmout); - - rd_kafka_event_destroy(rkev); - - for (i = 0; i < MY_DEL_TOPICS_CNT; i++) { - rd_kafka_DeleteTopic_destroy(del_topics[i]); - rd_free(topics[i]); - } - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_DEL_TOPICS_CNT - - SUB_TEST_PASS(); -} - - - -/** - * @brief Test creation of partitions - * - * - */ -static void do_test_CreatePartitions(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int op_timeout) { - rd_kafka_queue_t *q; -#define MY_CRP_TOPICS_CNT 9 - char *topics[MY_CRP_TOPICS_CNT]; - rd_kafka_NewTopic_t *new_topics[MY_CRP_TOPICS_CNT]; - rd_kafka_NewPartitions_t *crp_topics[MY_CRP_TOPICS_CNT]; - rd_kafka_AdminOptions_t *options = NULL; - /* Expected topics in metadata */ - rd_kafka_metadata_topic_t exp_mdtopics[MY_CRP_TOPICS_CNT] = {{0}}; - rd_kafka_metadata_partition_t exp_mdparts[2] = {{0}}; - int exp_mdtopic_cnt = 0; - int i; - char errstr[512]; - rd_kafka_resp_err_t err; - test_timing_t timing; - int metadata_tmout; - int num_replicas = (int)avail_broker_cnt; - - SUB_TEST_QUICK("%s CreatePartitions with %s, op_timeout %d", - rd_kafka_name(rk), what, op_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - /* Set up two expected partitions with different replication sets - * so they can be matched by the metadata checker later. - * Even partitions use exp_mdparts[0] while odd partitions - * use exp_mdparts[1]. */ - - /* Set valid replica assignments (even, and odd (reverse) ) */ - exp_mdparts[0].replicas = - rd_alloca(sizeof(*exp_mdparts[0].replicas) * num_replicas); - exp_mdparts[1].replicas = - rd_alloca(sizeof(*exp_mdparts[1].replicas) * num_replicas); - exp_mdparts[0].replica_cnt = num_replicas; - exp_mdparts[1].replica_cnt = num_replicas; - for (i = 0; i < num_replicas; i++) { - exp_mdparts[0].replicas[i] = avail_brokers[i]; - exp_mdparts[1].replicas[i] = - avail_brokers[num_replicas - i - 1]; - } - - /** - * Construct CreatePartitions array - */ - for (i = 0; i < MY_CRP_TOPICS_CNT; i++) { - char *topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - int initial_part_cnt = 1 + (i * 2); - int new_part_cnt = 1 + (i / 2); - int final_part_cnt = initial_part_cnt + new_part_cnt; - int set_replicas = !(i % 2); - int pi; - - topics[i] = topic; - - /* Topic to create with initial partition count */ - new_topics[i] = rd_kafka_NewTopic_new( - topic, initial_part_cnt, set_replicas ? -1 : num_replicas, - NULL, 0); - - /* .. and later add more partitions to */ - crp_topics[i] = rd_kafka_NewPartitions_new( - topic, final_part_cnt, errstr, sizeof(errstr)); - - if (set_replicas) { - exp_mdtopics[exp_mdtopic_cnt].partitions = rd_alloca( - final_part_cnt * - sizeof(*exp_mdtopics[exp_mdtopic_cnt].partitions)); - - for (pi = 0; pi < final_part_cnt; pi++) { - const rd_kafka_metadata_partition_t *exp_mdp = - &exp_mdparts[pi & 1]; - - exp_mdtopics[exp_mdtopic_cnt].partitions[pi] = - *exp_mdp; /* copy */ - - exp_mdtopics[exp_mdtopic_cnt] - .partitions[pi] - .id = pi; - - if (pi < initial_part_cnt) { - /* Set replica assignment - * for initial partitions */ - err = - rd_kafka_NewTopic_set_replica_assignment( - new_topics[i], pi, - exp_mdp->replicas, - (size_t)exp_mdp->replica_cnt, - errstr, sizeof(errstr)); - TEST_ASSERT(!err, - "NewTopic_set_replica_" - "assignment: %s", - errstr); - } else { - /* Set replica assignment for new - * partitions */ - err = - rd_kafka_NewPartitions_set_replica_assignment( - crp_topics[i], - pi - initial_part_cnt, - exp_mdp->replicas, - (size_t)exp_mdp->replica_cnt, - errstr, sizeof(errstr)); - TEST_ASSERT(!err, - "NewPartitions_set_replica_" - "assignment: %s", - errstr); - } - } - } - - TEST_SAY(_C_YEL - "Topic %s with %d initial partitions will grow " - "by %d to %d total partitions with%s replicas set\n", - topics[i], initial_part_cnt, new_part_cnt, - final_part_cnt, set_replicas ? "" : "out"); - - exp_mdtopics[exp_mdtopic_cnt].topic = topic; - exp_mdtopics[exp_mdtopic_cnt].partition_cnt = final_part_cnt; - - exp_mdtopic_cnt++; - } - - if (op_timeout != -1) { - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); - - err = rd_kafka_AdminOptions_set_operation_timeout( - options, op_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - /* - * Create topics with initial partition count - */ - TIMING_START(&timing, "CreateTopics"); - TEST_SAY("Creating topics with initial partition counts\n"); - rd_kafka_CreateTopics(rk, new_topics, MY_CRP_TOPICS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_CREATETOPICS_RESULT, NULL, 15000); - TEST_ASSERT(!err, "CreateTopics failed: %s", rd_kafka_err2str(err)); - - rd_kafka_NewTopic_destroy_array(new_topics, MY_CRP_TOPICS_CNT); - - - /* - * Create new partitions - */ - TIMING_START(&timing, "CreatePartitions"); - TEST_SAY("Creating partitions\n"); - rd_kafka_CreatePartitions(rk, crp_topics, MY_CRP_TOPICS_CNT, options, - q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - err = test_wait_topic_admin_result( - q, RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT, NULL, 15000); - TEST_ASSERT(!err, "CreatePartitions failed: %s", rd_kafka_err2str(err)); - - rd_kafka_NewPartitions_destroy_array(crp_topics, MY_CRP_TOPICS_CNT); - - - /** - * Verify that the expected topics are deleted and the non-expected - * are not. Allow it some time to propagate. - */ - if (op_timeout > 0) - metadata_tmout = op_timeout + 1000; - else - metadata_tmout = 10 * 1000; - - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, - metadata_tmout); - - for (i = 0; i < MY_CRP_TOPICS_CNT; i++) - rd_free(topics[i]); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_CRP_TOPICS_CNT - - SUB_TEST_PASS(); -} - - - -/** - * @brief Print the ConfigEntrys in the provided array. - */ -static void test_print_ConfigEntry_array(const rd_kafka_ConfigEntry_t **entries, - size_t entry_cnt, - unsigned int depth) { - const char *indent = &" "[4 - (depth > 4 ? 4 : depth)]; - size_t ei; - - for (ei = 0; ei < entry_cnt; ei++) { - const rd_kafka_ConfigEntry_t *e = entries[ei]; - const rd_kafka_ConfigEntry_t **syns; - size_t syn_cnt; - - syns = rd_kafka_ConfigEntry_synonyms(e, &syn_cnt); - -#define YN(v) ((v) ? "y" : "n") - TEST_SAYL( - 3, - "%s#%" PRIusz "/%" PRIusz - ": Source %s (%d): \"%s\"=\"%s\" " - "[is read-only=%s, default=%s, sensitive=%s, " - "synonym=%s] with %" PRIusz " synonym(s)\n", - indent, ei, entry_cnt, - rd_kafka_ConfigSource_name(rd_kafka_ConfigEntry_source(e)), - rd_kafka_ConfigEntry_source(e), - rd_kafka_ConfigEntry_name(e), - rd_kafka_ConfigEntry_value(e) - ? rd_kafka_ConfigEntry_value(e) - : "(NULL)", - YN(rd_kafka_ConfigEntry_is_read_only(e)), - YN(rd_kafka_ConfigEntry_is_default(e)), - YN(rd_kafka_ConfigEntry_is_sensitive(e)), - YN(rd_kafka_ConfigEntry_is_synonym(e)), syn_cnt); -#undef YN - - if (syn_cnt > 0) - test_print_ConfigEntry_array(syns, syn_cnt, depth + 1); - } -} - - -/** - * @brief Test AlterConfigs - */ -static void do_test_AlterConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { -#define MY_CONFRES_CNT 3 - char *topics[MY_CONFRES_CNT]; - rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; - rd_kafka_AdminOptions_t *options; - rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; - rd_kafka_event_t *rkev; - rd_kafka_resp_err_t err; - const rd_kafka_AlterConfigs_result_t *res; - const rd_kafka_ConfigResource_t **rconfigs; - size_t rconfig_cnt; - char errstr[128]; - const char *errstr2; - int ci = 0; - int i; - int fails = 0; - - SUB_TEST_QUICK(); - - /* - * Only create one topic, the others will be non-existent. - */ - for (i = 0; i < MY_CONFRES_CNT; i++) - rd_strdupa(&topics[i], test_mk_topic_name(__FUNCTION__, 1)); - - test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); - - test_wait_topic_exists(rk, topics[0], 10000); - - /* - * ConfigResource #0: valid topic config - */ - configs[ci] = - rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); - - err = rd_kafka_ConfigResource_set_config(configs[ci], - "compression.type", "gzip"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - err = rd_kafka_ConfigResource_set_config(configs[ci], "flush.ms", - "12345678"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; - ci++; - - - if (test_broker_version >= TEST_BRKVER(1, 1, 0, 0)) { - /* - * ConfigResource #1: valid broker config - */ - configs[ci] = rd_kafka_ConfigResource_new( - RD_KAFKA_RESOURCE_BROKER, - tsprintf("%" PRId32, avail_brokers[0])); - - err = rd_kafka_ConfigResource_set_config( - configs[ci], "sasl.kerberos.min.time.before.relogin", - "58000"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; - ci++; - } else { - TEST_WARN( - "Skipping RESOURCE_BROKER test on unsupported " - "broker version\n"); - } - - /* - * ConfigResource #2: valid topic config, non-existent topic - */ - configs[ci] = - rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); - - err = rd_kafka_ConfigResource_set_config(configs[ci], - "compression.type", "lz4"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - err = rd_kafka_ConfigResource_set_config( - configs[ci], "offset.metadata.max.bytes", "12345"); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - - if (test_broker_version >= TEST_BRKVER(2, 7, 0, 0)) - exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else - exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN; - ci++; - - - /* - * Timeout options - */ - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ALTERCONFIGS); - err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, - sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - - /* - * Fire off request - */ - rd_kafka_AlterConfigs(rk, configs, ci, options, rkqu); - - rd_kafka_AdminOptions_destroy(options); - - /* - * Wait for result - */ - rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT, - 10000 + 1000); - - /* - * Extract result - */ - res = rd_kafka_event_AlterConfigs_result(rkev); - TEST_ASSERT(res, "Expected AlterConfigs result, not %s", - rd_kafka_event_name(rkev)); - - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(!err, "Expected success, not %s: %s", - rd_kafka_err2name(err), errstr2); - - rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt); - TEST_ASSERT((int)rconfig_cnt == ci, - "Expected %d result resources, got %" PRIusz "\n", ci, - rconfig_cnt); - - /* - * Verify status per resource - */ - for (i = 0; i < (int)rconfig_cnt; i++) { - const rd_kafka_ConfigEntry_t **entries; - size_t entry_cnt; - - err = rd_kafka_ConfigResource_error(rconfigs[i]); - errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); - - entries = - rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); - - TEST_SAY( - "ConfigResource #%d: type %s (%d), \"%s\": " - "%" PRIusz " ConfigEntries, error %s (%s)\n", - i, - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(rconfigs[i])), - rd_kafka_ConfigResource_type(rconfigs[i]), - rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, - rd_kafka_err2name(err), errstr2 ? errstr2 : ""); - - test_print_ConfigEntry_array(entries, entry_cnt, 1); - - if (rd_kafka_ConfigResource_type(rconfigs[i]) != - rd_kafka_ConfigResource_type(configs[i]) || - strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), - rd_kafka_ConfigResource_name(configs[i]))) { - TEST_FAIL_LATER( - "ConfigResource #%d: " - "expected type %s name %s, " - "got type %s name %s", - i, - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(configs[i])), - rd_kafka_ConfigResource_name(configs[i]), - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(rconfigs[i])), - rd_kafka_ConfigResource_name(rconfigs[i])); - fails++; - continue; - } - - - if (err != exp_err[i]) { - TEST_FAIL_LATER( - "ConfigResource #%d: " - "expected %s (%d), got %s (%s)", - i, rd_kafka_err2name(exp_err[i]), exp_err[i], - rd_kafka_err2name(err), errstr2 ? errstr2 : ""); - fails++; - } - } - - TEST_ASSERT(!fails, "See %d previous failure(s)", fails); - - rd_kafka_event_destroy(rkev); - - rd_kafka_ConfigResource_destroy_array(configs, ci); - - TEST_LATER_CHECK(); -#undef MY_CONFRES_CNT - - SUB_TEST_PASS(); -} - - - -/** - * @brief Test DescribeConfigs - */ -static void do_test_DescribeConfigs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) { -#define MY_CONFRES_CNT 3 - char *topics[MY_CONFRES_CNT]; - rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT]; - rd_kafka_AdminOptions_t *options; - rd_kafka_resp_err_t exp_err[MY_CONFRES_CNT]; - rd_kafka_event_t *rkev; - rd_kafka_resp_err_t err; - const rd_kafka_DescribeConfigs_result_t *res; - const rd_kafka_ConfigResource_t **rconfigs; - size_t rconfig_cnt; - char errstr[128]; - const char *errstr2; - int ci = 0; - int i; - int fails = 0; - int max_retry_describe = 3; - - SUB_TEST_QUICK(); - - /* - * Only create one topic, the others will be non-existent. - */ - rd_strdupa(&topics[0], test_mk_topic_name("DescribeConfigs_exist", 1)); - for (i = 1; i < MY_CONFRES_CNT; i++) - rd_strdupa(&topics[i], - test_mk_topic_name("DescribeConfigs_notexist", 1)); - - test_CreateTopics_simple(rk, NULL, topics, 1, 1, NULL); - - /* - * ConfigResource #0: topic config, no config entries. - */ - configs[ci] = - rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); - exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; - ci++; - - /* - * ConfigResource #1:broker config, no config entries - */ - configs[ci] = rd_kafka_ConfigResource_new( - RD_KAFKA_RESOURCE_BROKER, tsprintf("%" PRId32, avail_brokers[0])); - - exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; - ci++; - - /* - * ConfigResource #2: topic config, non-existent topic, no config entr. - */ - configs[ci] = - rd_kafka_ConfigResource_new(RD_KAFKA_RESOURCE_TOPIC, topics[ci]); - /* FIXME: This is a bug in the broker (<v2.0.0), it returns a full - * response for unknown topics. - * https://issues.apache.org/jira/browse/KAFKA-6778 - */ - if (test_broker_version < TEST_BRKVER(2, 0, 0, 0)) - exp_err[ci] = RD_KAFKA_RESP_ERR_NO_ERROR; - else - exp_err[ci] = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - ci++; - - -retry_describe: - /* - * Timeout options - */ - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); - err = rd_kafka_AdminOptions_set_request_timeout(options, 10000, errstr, - sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - - /* - * Fire off request - */ - rd_kafka_DescribeConfigs(rk, configs, ci, options, rkqu); - - rd_kafka_AdminOptions_destroy(options); - - /* - * Wait for result - */ - rkev = test_wait_admin_result( - rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 10000 + 1000); - - /* - * Extract result - */ - res = rd_kafka_event_DescribeConfigs_result(rkev); - TEST_ASSERT(res, "Expected DescribeConfigs result, not %s", - rd_kafka_event_name(rkev)); - - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(!err, "Expected success, not %s: %s", - rd_kafka_err2name(err), errstr2); - - rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt); - TEST_ASSERT((int)rconfig_cnt == ci, - "Expected %d result resources, got %" PRIusz "\n", ci, - rconfig_cnt); - - /* - * Verify status per resource - */ - for (i = 0; i < (int)rconfig_cnt; i++) { - const rd_kafka_ConfigEntry_t **entries; - size_t entry_cnt; - - err = rd_kafka_ConfigResource_error(rconfigs[i]); - errstr2 = rd_kafka_ConfigResource_error_string(rconfigs[i]); - - entries = - rd_kafka_ConfigResource_configs(rconfigs[i], &entry_cnt); - - TEST_SAY( - "ConfigResource #%d: type %s (%d), \"%s\": " - "%" PRIusz " ConfigEntries, error %s (%s)\n", - i, - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(rconfigs[i])), - rd_kafka_ConfigResource_type(rconfigs[i]), - rd_kafka_ConfigResource_name(rconfigs[i]), entry_cnt, - rd_kafka_err2name(err), errstr2 ? errstr2 : ""); - - test_print_ConfigEntry_array(entries, entry_cnt, 1); - - if (rd_kafka_ConfigResource_type(rconfigs[i]) != - rd_kafka_ConfigResource_type(configs[i]) || - strcmp(rd_kafka_ConfigResource_name(rconfigs[i]), - rd_kafka_ConfigResource_name(configs[i]))) { - TEST_FAIL_LATER( - "ConfigResource #%d: " - "expected type %s name %s, " - "got type %s name %s", - i, - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(configs[i])), - rd_kafka_ConfigResource_name(configs[i]), - rd_kafka_ResourceType_name( - rd_kafka_ConfigResource_type(rconfigs[i])), - rd_kafka_ConfigResource_name(rconfigs[i])); - fails++; - continue; - } - - - if (err != exp_err[i]) { - if (err == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART && - max_retry_describe-- > 0) { - TEST_WARN( - "ConfigResource #%d: " - "expected %s (%d), got %s (%s): " - "this is typically a temporary " - "error while the new resource " - "is propagating: retrying", - i, rd_kafka_err2name(exp_err[i]), - exp_err[i], rd_kafka_err2name(err), - errstr2 ? errstr2 : ""); - rd_kafka_event_destroy(rkev); - rd_sleep(1); - goto retry_describe; - } - - TEST_FAIL_LATER( - "ConfigResource #%d: " - "expected %s (%d), got %s (%s)", - i, rd_kafka_err2name(exp_err[i]), exp_err[i], - rd_kafka_err2name(err), errstr2 ? errstr2 : ""); - fails++; - } - } - - TEST_ASSERT(!fails, "See %d previous failure(s)", fails); - - rd_kafka_event_destroy(rkev); - - rd_kafka_ConfigResource_destroy_array(configs, ci); - - TEST_LATER_CHECK(); -#undef MY_CONFRES_CNT - - SUB_TEST_PASS(); -} - -/** - * @brief Test CreateAcls - */ -static void -do_test_CreateAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { - rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); - size_t resacl_cnt; - test_timing_t timing; - rd_kafka_resp_err_t err; - char errstr[128]; - const char *errstr2; - const char *user_test1 = "User:test1"; - const char *user_test2 = "User:test2"; - const char *base_topic_name; - char topic1_name[512]; - char topic2_name[512]; - rd_kafka_AclBinding_t *acl_bindings[2]; - rd_kafka_ResourcePatternType_t pattern_type_first_topic = - RD_KAFKA_RESOURCE_PATTERN_PREFIXED; - rd_kafka_AdminOptions_t *admin_options; - rd_kafka_event_t *rkev_acl_create; - const rd_kafka_CreateAcls_result_t *acl_res; - const rd_kafka_acl_result_t **acl_res_acls; - unsigned int i; - - SUB_TEST_QUICK(); - - if (version == 0) - pattern_type_first_topic = RD_KAFKA_RESOURCE_PATTERN_LITERAL; - - base_topic_name = test_mk_topic_name(__FUNCTION__, 1); - - rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name); - rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name); - - - acl_bindings[0] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_first_topic, - user_test1, "*", RD_KAFKA_ACL_OPERATION_READ, - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, NULL, 0); - acl_bindings[1] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic2_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, "*", - RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - - - admin_options = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_CREATEACLS); - err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000, - errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", errstr); - - TIMING_START(&timing, "CreateAcls"); - TEST_SAY("Call CreateAcls\n"); - rd_kafka_CreateAcls(rk, acl_bindings, 2, admin_options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* - * Wait for result - */ - rkev_acl_create = test_wait_admin_result( - q, RD_KAFKA_EVENT_CREATEACLS_RESULT, 10000 + 1000); - - err = rd_kafka_event_error(rkev_acl_create); - errstr2 = rd_kafka_event_error_string(rkev_acl_create); - - if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, - "Expected unsupported feature, not: %s", - rd_kafka_err2name(err)); - TEST_ASSERT(!strcmp(errstr2, - "ACLs Admin API (KIP-140) not supported " - "by broker, requires broker " - "version >= 0.11.0.0"), - "Expected a different message, not: %s", errstr2); - TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err)); - } - - if (version > 0 && test_broker_version < TEST_BRKVER(2, 0, 0, 0)) { - TEST_ASSERT(err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, - "Expected unsupported feature, not: %s", - rd_kafka_err2name(err)); - TEST_ASSERT(!strcmp(errstr2, - "Broker only supports LITERAL " - "resource pattern types"), - "Expected a different message, not: %s", errstr2); - TEST_FAIL("Unexpected error: %s", rd_kafka_err2name(err)); - } - - TEST_ASSERT(!err, "Expected success, not %s: %s", - rd_kafka_err2name(err), errstr2); - - /* - * Extract result - */ - acl_res = rd_kafka_event_CreateAcls_result(rkev_acl_create); - TEST_ASSERT(acl_res, "Expected CreateAcls result, not %s", - rd_kafka_event_name(rkev_acl_create)); - - acl_res_acls = rd_kafka_CreateAcls_result_acls(acl_res, &resacl_cnt); - TEST_ASSERT(resacl_cnt == 2, "Expected 2, not %zu", resacl_cnt); - - for (i = 0; i < resacl_cnt; i++) { - const rd_kafka_acl_result_t *acl_res_acl = *(acl_res_acls + i); - const rd_kafka_error_t *error = - rd_kafka_acl_result_error(acl_res_acl); - - TEST_ASSERT(!error, - "Expected RD_KAFKA_RESP_ERR_NO_ERROR, not %s", - rd_kafka_error_string(error)); - } - - rd_kafka_AdminOptions_destroy(admin_options); - rd_kafka_event_destroy(rkev_acl_create); - rd_kafka_AclBinding_destroy_array(acl_bindings, 2); - if (!useq) - rd_kafka_queue_destroy(q); - - SUB_TEST_PASS(); -} - -/** - * @brief Test DescribeAcls - */ -static void -do_test_DescribeAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { - rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); - size_t acl_binding_results_cntp; - test_timing_t timing; - rd_kafka_resp_err_t err; - uint32_t i; - char errstr[128]; - const char *errstr2; - const char *user_test1 = "User:test1"; - const char *user_test2 = "User:test2"; - const char *any_host = "*"; - const char *topic_name; - rd_kafka_AclBinding_t *acl_bindings_create[2]; - rd_kafka_AclBinding_t *acl_bindings_describe; - rd_kafka_AclBinding_t *acl; - const rd_kafka_DescribeAcls_result_t *acl_describe_result; - const rd_kafka_AclBinding_t **acl_binding_results; - rd_kafka_ResourcePatternType_t pattern_type_first_topic_create; - rd_bool_t broker_version1 = - test_broker_version >= TEST_BRKVER(2, 0, 0, 0); - rd_kafka_resp_err_t create_err; - rd_kafka_AdminOptions_t *admin_options; - rd_kafka_event_t *rkev_acl_describe; - const rd_kafka_error_t *error; - - SUB_TEST_QUICK(); - - if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { - SUB_TEST_SKIP( - "Skipping DESCRIBE_ACLS test on unsupported " - "broker version\n"); - return; - } - - pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED; - if (!broker_version1) - pattern_type_first_topic_create = - RD_KAFKA_RESOURCE_PATTERN_LITERAL; - - topic_name = test_mk_topic_name(__FUNCTION__, 1); - - acl_bindings_create[0] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic_name, - pattern_type_first_topic_create, user_test1, any_host, - RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - acl_bindings_create[1] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, - RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - - create_err = - test_CreateAcls_simple(rk, NULL, acl_bindings_create, 2, NULL); - - TEST_ASSERT(!create_err, "create error: %s", - rd_kafka_err2str(create_err)); - - acl_bindings_describe = rd_kafka_AclBindingFilter_new( - RD_KAFKA_RESOURCE_TOPIC, topic_name, - RD_KAFKA_RESOURCE_PATTERN_MATCH, NULL, NULL, - RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL, - 0); - - admin_options = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS); - err = rd_kafka_AdminOptions_set_request_timeout(admin_options, 10000, - errstr, sizeof(errstr)); - - TIMING_START(&timing, "DescribeAcls"); - TEST_SAY("Call DescribeAcls\n"); - rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* - * Wait for result - */ - rkev_acl_describe = test_wait_admin_result( - q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); - - err = rd_kafka_event_error(rkev_acl_describe); - errstr2 = rd_kafka_event_error_string(rkev_acl_describe); - - if (!broker_version1) { - TEST_ASSERT( - err == RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, - "expected RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE, not %s", - rd_kafka_err2str(err)); - TEST_ASSERT(strcmp(errstr2, - "Broker only supports LITERAL and ANY " - "resource pattern types") == 0, - "expected another message, not %s", errstr2); - } else { - TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", - errstr2); - } - - if (!err) { - - acl_describe_result = - rd_kafka_event_DescribeAcls_result(rkev_acl_describe); - - TEST_ASSERT(acl_describe_result, - "acl_describe_result should not be NULL"); - - acl_binding_results_cntp = 0; - acl_binding_results = rd_kafka_DescribeAcls_result_acls( - acl_describe_result, &acl_binding_results_cntp); - - TEST_ASSERT(acl_binding_results_cntp == 2, - "acl_binding_results_cntp should be 2, not %zu", - acl_binding_results_cntp); - - for (i = 0; i < acl_binding_results_cntp; i++) { - acl = (rd_kafka_AclBinding_t *)acl_binding_results[i]; - - if (strcmp(rd_kafka_AclBinding_principal(acl), - user_test1) == 0) { - TEST_ASSERT( - rd_kafka_AclBinding_restype(acl) == - RD_KAFKA_RESOURCE_TOPIC, - "acl->restype should be " - "RD_KAFKA_RESOURCE_TOPIC, not %s", - rd_kafka_ResourceType_name( - rd_kafka_AclBinding_restype(acl))); - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_name(acl), - topic_name) == 0, - "acl->name should be %s, not %s", - topic_name, rd_kafka_AclBinding_name(acl)); - TEST_ASSERT( - rd_kafka_AclBinding_resource_pattern_type( - acl) == pattern_type_first_topic_create, - "acl->resource_pattern_type should be %s, " - "not %s", - rd_kafka_ResourcePatternType_name( - pattern_type_first_topic_create), - rd_kafka_ResourcePatternType_name( - rd_kafka_AclBinding_resource_pattern_type( - acl))); - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_principal(acl), - user_test1) == 0, - "acl->principal should be %s, not %s", - user_test1, - rd_kafka_AclBinding_principal(acl)); - - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_host(acl), - any_host) == 0, - "acl->host should be %s, not %s", any_host, - rd_kafka_AclBinding_host(acl)); - - TEST_ASSERT( - rd_kafka_AclBinding_operation(acl) == - RD_KAFKA_ACL_OPERATION_READ, - "acl->operation should be %s, not %s", - rd_kafka_AclOperation_name( - RD_KAFKA_ACL_OPERATION_READ), - rd_kafka_AclOperation_name( - rd_kafka_AclBinding_operation(acl))); - - TEST_ASSERT( - rd_kafka_AclBinding_permission_type(acl) == - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - "acl->permission_type should be %s, not %s", - rd_kafka_AclPermissionType_name( - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), - rd_kafka_AclPermissionType_name( - rd_kafka_AclBinding_permission_type( - acl))); - - error = rd_kafka_AclBinding_error(acl); - TEST_ASSERT(!error, - "acl->error should be NULL, not %s", - rd_kafka_error_string(error)); - - } else { - TEST_ASSERT( - rd_kafka_AclBinding_restype(acl) == - RD_KAFKA_RESOURCE_TOPIC, - "acl->restype should be " - "RD_KAFKA_RESOURCE_TOPIC, not %s", - rd_kafka_ResourceType_name( - rd_kafka_AclBinding_restype(acl))); - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_name(acl), - topic_name) == 0, - "acl->name should be %s, not %s", - topic_name, rd_kafka_AclBinding_name(acl)); - TEST_ASSERT( - rd_kafka_AclBinding_resource_pattern_type( - acl) == - RD_KAFKA_RESOURCE_PATTERN_LITERAL, - "acl->resource_pattern_type should be %s, " - "not %s", - rd_kafka_ResourcePatternType_name( - RD_KAFKA_RESOURCE_PATTERN_LITERAL), - rd_kafka_ResourcePatternType_name( - rd_kafka_AclBinding_resource_pattern_type( - acl))); - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_principal(acl), - user_test2) == 0, - "acl->principal should be %s, not %s", - user_test2, - rd_kafka_AclBinding_principal(acl)); - - TEST_ASSERT( - strcmp(rd_kafka_AclBinding_host(acl), - any_host) == 0, - "acl->host should be %s, not %s", any_host, - rd_kafka_AclBinding_host(acl)); - - TEST_ASSERT( - rd_kafka_AclBinding_operation(acl) == - RD_KAFKA_ACL_OPERATION_WRITE, - "acl->operation should be %s, not %s", - rd_kafka_AclOperation_name( - RD_KAFKA_ACL_OPERATION_WRITE), - rd_kafka_AclOperation_name( - rd_kafka_AclBinding_operation(acl))); - - TEST_ASSERT( - rd_kafka_AclBinding_permission_type(acl) == - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - "acl->permission_type should be %s, not %s", - rd_kafka_AclPermissionType_name( - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), - rd_kafka_AclPermissionType_name( - rd_kafka_AclBinding_permission_type( - acl))); - - - error = rd_kafka_AclBinding_error(acl); - TEST_ASSERT(!error, - "acl->error should be NULL, not %s", - rd_kafka_error_string(error)); - } - } - } - - rd_kafka_AclBinding_destroy(acl_bindings_describe); - rd_kafka_event_destroy(rkev_acl_describe); - - acl_bindings_describe = rd_kafka_AclBindingFilter_new( - RD_KAFKA_RESOURCE_TOPIC, topic_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL, - RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, - NULL, 0); - - TIMING_START(&timing, "DescribeAcls"); - rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* - * Wait for result - */ - rkev_acl_describe = test_wait_admin_result( - q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); - - err = rd_kafka_event_error(rkev_acl_describe); - errstr2 = rd_kafka_event_error_string(rkev_acl_describe); - - TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", - errstr2); - - acl_describe_result = - rd_kafka_event_DescribeAcls_result(rkev_acl_describe); - - TEST_ASSERT(acl_describe_result, - "acl_describe_result should not be NULL"); - - acl_binding_results_cntp = 0; - acl_binding_results = rd_kafka_DescribeAcls_result_acls( - acl_describe_result, &acl_binding_results_cntp); - - TEST_ASSERT(acl_binding_results_cntp == 1, - "acl_binding_results_cntp should be 1, not %zu", - acl_binding_results_cntp); - - acl = (rd_kafka_AclBinding_t *)acl_binding_results[0]; - - TEST_ASSERT( - rd_kafka_AclBinding_restype(acl) == RD_KAFKA_RESOURCE_TOPIC, - "acl->restype should be RD_KAFKA_RESOURCE_TOPIC, not %s", - rd_kafka_ResourceType_name(rd_kafka_AclBinding_restype(acl))); - TEST_ASSERT(strcmp(rd_kafka_AclBinding_name(acl), topic_name) == 0, - "acl->name should be %s, not %s", topic_name, - rd_kafka_AclBinding_name(acl)); - TEST_ASSERT(rd_kafka_AclBinding_resource_pattern_type(acl) == - RD_KAFKA_RESOURCE_PATTERN_LITERAL, - "acl->resource_pattern_type should be %s, not %s", - rd_kafka_ResourcePatternType_name( - RD_KAFKA_RESOURCE_PATTERN_LITERAL), - rd_kafka_ResourcePatternType_name( - rd_kafka_AclBinding_resource_pattern_type(acl))); - TEST_ASSERT(strcmp(rd_kafka_AclBinding_principal(acl), user_test2) == 0, - "acl->principal should be %s, not %s", user_test2, - rd_kafka_AclBinding_principal(acl)); - - TEST_ASSERT(strcmp(rd_kafka_AclBinding_host(acl), any_host) == 0, - "acl->host should be %s, not %s", any_host, - rd_kafka_AclBinding_host(acl)); - - TEST_ASSERT( - rd_kafka_AclBinding_permission_type(acl) == - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - "acl->permission_type should be %s, not %s", - rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), - rd_kafka_AclPermissionType_name( - rd_kafka_AclBinding_permission_type(acl))); - - error = rd_kafka_AclBinding_error(acl); - TEST_ASSERT(!error, "acl->error should be NULL, not %s", - rd_kafka_error_string(error)); - - rd_kafka_AclBinding_destroy(acl_bindings_describe); - rd_kafka_event_destroy(rkev_acl_describe); - rd_kafka_AdminOptions_destroy(admin_options); - rd_kafka_AclBinding_destroy_array(acl_bindings_create, 2); - - if (!useq) - rd_kafka_queue_destroy(q); - - SUB_TEST_PASS(); -} - -/** - * @brief Count acls by acl filter - */ -static size_t -do_test_acls_count(rd_kafka_t *rk, - rd_kafka_AclBindingFilter_t *acl_bindings_describe, - rd_kafka_queue_t *q) { - char errstr[128]; - rd_kafka_resp_err_t err; - rd_kafka_AdminOptions_t *admin_options_describe; - rd_kafka_event_t *rkev_acl_describe; - const rd_kafka_DescribeAcls_result_t *acl_describe_result; - const char *errstr2; - size_t acl_binding_results_cntp; - - admin_options_describe = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DESCRIBEACLS); - rd_kafka_AdminOptions_set_request_timeout(admin_options_describe, 10000, - errstr, sizeof(errstr)); - - rd_kafka_DescribeAcls(rk, acl_bindings_describe, admin_options_describe, - q); - /* - * Wait for result - */ - rkev_acl_describe = test_wait_admin_result( - q, RD_KAFKA_EVENT_DESCRIBEACLS_RESULT, 10000 + 1000); - - err = rd_kafka_event_error(rkev_acl_describe); - errstr2 = rd_kafka_event_error_string(rkev_acl_describe); - - TEST_ASSERT(!err, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", - errstr2); - - acl_describe_result = - rd_kafka_event_DescribeAcls_result(rkev_acl_describe); - - TEST_ASSERT(acl_describe_result, - "acl_describe_result should not be NULL"); - - acl_binding_results_cntp = 0; - rd_kafka_DescribeAcls_result_acls(acl_describe_result, - &acl_binding_results_cntp); - rd_kafka_event_destroy(rkev_acl_describe); - rd_kafka_AdminOptions_destroy(admin_options_describe); - - return acl_binding_results_cntp; -} - -/** - * @brief Test DeleteAcls - */ -static void -do_test_DeleteAcls(rd_kafka_t *rk, rd_kafka_queue_t *useq, int version) { - rd_kafka_queue_t *q = useq ? useq : rd_kafka_queue_new(rk); - test_timing_t timing; - uint32_t i; - char errstr[128]; - const char *user_test1 = "User:test1"; - const char *user_test2 = "User:test2"; - const char *any_host = "*"; - const char *base_topic_name; - char topic1_name[512]; - char topic2_name[512]; - size_t acl_binding_results_cntp; - size_t DeleteAcls_result_responses_cntp; - size_t matching_acls_cntp; - rd_kafka_AclBinding_t *acl_bindings_create[3]; - rd_kafka_AclBindingFilter_t *acl_bindings_describe; - rd_kafka_AclBindingFilter_t *acl_bindings_delete; - rd_kafka_event_t *rkev_acl_delete; - rd_kafka_AdminOptions_t *admin_options_delete; - const rd_kafka_DeleteAcls_result_t *acl_delete_result; - const rd_kafka_DeleteAcls_result_response_t * - *DeleteAcls_result_responses; - const rd_kafka_DeleteAcls_result_response_t *DeleteAcls_result_response; - const rd_kafka_AclBinding_t **matching_acls; - const rd_kafka_AclBinding_t *matching_acl; - rd_kafka_ResourcePatternType_t pattern_type_first_topic_create; - rd_kafka_ResourcePatternType_t pattern_type_delete; - rd_bool_t broker_version1 = - test_broker_version >= TEST_BRKVER(2, 0, 0, 0); - rd_kafka_resp_err_t create_err; - rd_kafka_ResourceType_t restype; - rd_kafka_ResourcePatternType_t resource_pattern_type; - rd_kafka_AclOperation_t operation; - rd_kafka_AclPermissionType_t permission_type; - const char *name; - const char *principal; - const rd_kafka_error_t *error; - - SUB_TEST_QUICK(); - - if (test_broker_version < TEST_BRKVER(0, 11, 0, 0)) { - SUB_TEST_SKIP( - "Skipping DELETE_ACLS test on unsupported " - "broker version\n"); - return; - } - - pattern_type_first_topic_create = RD_KAFKA_RESOURCE_PATTERN_PREFIXED; - pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_MATCH; - if (!broker_version1) { - pattern_type_first_topic_create = - RD_KAFKA_RESOURCE_PATTERN_LITERAL; - pattern_type_delete = RD_KAFKA_RESOURCE_PATTERN_LITERAL; - } - - base_topic_name = test_mk_topic_name(__FUNCTION__, 1); - - rd_snprintf(topic1_name, sizeof(topic1_name), "%s_1", base_topic_name); - rd_snprintf(topic2_name, sizeof(topic2_name), "%s_2", base_topic_name); - - acl_bindings_create[0] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic1_name, - pattern_type_first_topic_create, user_test1, any_host, - RD_KAFKA_ACL_OPERATION_READ, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - acl_bindings_create[1] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic1_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, - RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - acl_bindings_create[2] = rd_kafka_AclBinding_new( - RD_KAFKA_RESOURCE_TOPIC, topic2_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, user_test2, any_host, - RD_KAFKA_ACL_OPERATION_WRITE, RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - NULL, 0); - - acl_bindings_delete = rd_kafka_AclBindingFilter_new( - RD_KAFKA_RESOURCE_TOPIC, topic1_name, pattern_type_delete, NULL, - NULL, RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, - NULL, 0); - - acl_bindings_describe = acl_bindings_delete; - - create_err = - test_CreateAcls_simple(rk, NULL, acl_bindings_create, 3, NULL); - - TEST_ASSERT(!create_err, "create error: %s", - rd_kafka_err2str(create_err)); - - admin_options_delete = - rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_DELETEACLS); - rd_kafka_AdminOptions_set_request_timeout(admin_options_delete, 10000, - errstr, sizeof(errstr)); - - acl_binding_results_cntp = - do_test_acls_count(rk, acl_bindings_describe, q); - TEST_ASSERT(acl_binding_results_cntp == 2, - "acl_binding_results_cntp should not be 2, not %zu\n", - acl_binding_results_cntp); - - TIMING_START(&timing, "DeleteAcls"); - rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete, - q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* - * Wait for result - */ - rkev_acl_delete = test_wait_admin_result( - q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000); - - acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete); - - TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL"); - - DeleteAcls_result_responses_cntp = 0; - DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses( - acl_delete_result, &DeleteAcls_result_responses_cntp); - - TEST_ASSERT(DeleteAcls_result_responses_cntp == 1, - "DeleteAcls_result_responses_cntp should be 1, not %zu\n", - DeleteAcls_result_responses_cntp); - - DeleteAcls_result_response = DeleteAcls_result_responses[0]; - - TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error( - DeleteAcls_result_response)); - - matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls( - DeleteAcls_result_response, &matching_acls_cntp); - - TEST_ASSERT(matching_acls_cntp == 2, - "matching_acls_cntp should be 2, not %zu\n", - matching_acls_cntp); - - for (i = 0; i < matching_acls_cntp; i++) { - rd_kafka_ResourceType_t restype; - rd_kafka_ResourcePatternType_t resource_pattern_type; - rd_kafka_AclOperation_t operation; - rd_kafka_AclPermissionType_t permission_type; - const char *name; - const char *principal; - - matching_acl = matching_acls[i]; - error = rd_kafka_AclBinding_error(matching_acl); - restype = rd_kafka_AclBinding_restype(matching_acl); - name = rd_kafka_AclBinding_name(matching_acl); - resource_pattern_type = - rd_kafka_AclBinding_resource_pattern_type(matching_acl); - principal = rd_kafka_AclBinding_principal(matching_acl); - operation = rd_kafka_AclBinding_operation(matching_acl); - permission_type = - rd_kafka_AclBinding_permission_type(matching_acl); - - TEST_ASSERT(!error, "expected success, not %s", - rd_kafka_error_string(error)); - TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC, - "expected RD_KAFKA_RESOURCE_TOPIC not %s", - rd_kafka_ResourceType_name(restype)); - TEST_ASSERT(strcmp(name, topic1_name) == 0, - "expected %s not %s", topic1_name, name); - TEST_ASSERT(permission_type == - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - "expected %s not %s", - rd_kafka_AclPermissionType_name( - RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), - rd_kafka_AclPermissionType_name(permission_type)); - - if (strcmp(user_test1, principal) == 0) { - TEST_ASSERT(resource_pattern_type == - pattern_type_first_topic_create, - "expected %s not %s", - rd_kafka_ResourcePatternType_name( - pattern_type_first_topic_create), - rd_kafka_ResourcePatternType_name( - resource_pattern_type)); - - TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_READ, - "expected %s not %s", - rd_kafka_AclOperation_name( - RD_KAFKA_ACL_OPERATION_READ), - rd_kafka_AclOperation_name(operation)); - - } else { - TEST_ASSERT(resource_pattern_type == - RD_KAFKA_RESOURCE_PATTERN_LITERAL, - "expected %s not %s", - rd_kafka_ResourcePatternType_name( - RD_KAFKA_RESOURCE_PATTERN_LITERAL), - rd_kafka_ResourcePatternType_name( - resource_pattern_type)); - - TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE, - "expected %s not %s", - rd_kafka_AclOperation_name( - RD_KAFKA_ACL_OPERATION_WRITE), - rd_kafka_AclOperation_name(operation)); - } - } - - acl_binding_results_cntp = - do_test_acls_count(rk, acl_bindings_describe, q); - TEST_ASSERT(acl_binding_results_cntp == 0, - "acl_binding_results_cntp should be 0, not %zu\n", - acl_binding_results_cntp); - - rd_kafka_event_destroy(rkev_acl_delete); - rd_kafka_AclBinding_destroy(acl_bindings_delete); - - acl_bindings_delete = rd_kafka_AclBindingFilter_new( - RD_KAFKA_RESOURCE_TOPIC, topic2_name, - RD_KAFKA_RESOURCE_PATTERN_LITERAL, NULL, NULL, - RD_KAFKA_ACL_OPERATION_ANY, RD_KAFKA_ACL_PERMISSION_TYPE_ANY, NULL, - 0); - acl_bindings_describe = acl_bindings_delete; - - TIMING_START(&timing, "DeleteAcls"); - rd_kafka_DeleteAcls(rk, &acl_bindings_delete, 1, admin_options_delete, - q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - /* - * Wait for result - */ - rkev_acl_delete = test_wait_admin_result( - q, RD_KAFKA_EVENT_DELETEACLS_RESULT, 10000 + 1000); - - acl_delete_result = rd_kafka_event_DeleteAcls_result(rkev_acl_delete); - - TEST_ASSERT(acl_delete_result, "acl_delete_result should not be NULL"); - - DeleteAcls_result_responses_cntp = 0; - DeleteAcls_result_responses = rd_kafka_DeleteAcls_result_responses( - acl_delete_result, &DeleteAcls_result_responses_cntp); - - TEST_ASSERT(DeleteAcls_result_responses_cntp == 1, - "DeleteAcls_result_responses_cntp should be 1, not %zu\n", - DeleteAcls_result_responses_cntp); - - DeleteAcls_result_response = DeleteAcls_result_responses[0]; - - TEST_CALL_ERROR__(rd_kafka_DeleteAcls_result_response_error( - DeleteAcls_result_response)); - - matching_acls = rd_kafka_DeleteAcls_result_response_matching_acls( - DeleteAcls_result_response, &matching_acls_cntp); - - TEST_ASSERT(matching_acls_cntp == 1, - "matching_acls_cntp should be 1, not %zu\n", - matching_acls_cntp); - - matching_acl = matching_acls[0]; - error = rd_kafka_AclBinding_error(matching_acl); - restype = rd_kafka_AclBinding_restype(matching_acl); - name = rd_kafka_AclBinding_name(matching_acl); - resource_pattern_type = - rd_kafka_AclBinding_resource_pattern_type(matching_acl); - principal = rd_kafka_AclBinding_principal(matching_acl); - operation = rd_kafka_AclBinding_operation(matching_acl); - permission_type = rd_kafka_AclBinding_permission_type(matching_acl); - - TEST_ASSERT(!error, "expected RD_KAFKA_RESP_ERR_NO_ERROR not %s", - rd_kafka_error_string(error)); - TEST_ASSERT(restype == RD_KAFKA_RESOURCE_TOPIC, - "expected RD_KAFKA_RESOURCE_TOPIC not %s", - rd_kafka_ResourceType_name(restype)); - TEST_ASSERT(strcmp(name, topic2_name) == 0, "expected %s not %s", - topic2_name, name); - TEST_ASSERT( - permission_type == RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, - "expected %s not %s", - rd_kafka_AclPermissionType_name(RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW), - rd_kafka_AclPermissionType_name(permission_type)); - TEST_ASSERT(strcmp(user_test2, principal) == 0, "expected %s not %s", - user_test2, principal); - TEST_ASSERT(resource_pattern_type == RD_KAFKA_RESOURCE_PATTERN_LITERAL, - "expected %s not %s", - rd_kafka_ResourcePatternType_name( - RD_KAFKA_RESOURCE_PATTERN_LITERAL), - rd_kafka_ResourcePatternType_name(resource_pattern_type)); - - TEST_ASSERT(operation == RD_KAFKA_ACL_OPERATION_WRITE, - "expected %s not %s", - rd_kafka_AclOperation_name(RD_KAFKA_ACL_OPERATION_WRITE), - rd_kafka_AclOperation_name(operation)); - - acl_binding_results_cntp = - do_test_acls_count(rk, acl_bindings_describe, q); - TEST_ASSERT(acl_binding_results_cntp == 0, - "acl_binding_results_cntp should be 0, not %zu\n", - acl_binding_results_cntp); - - rd_kafka_AclBinding_destroy(acl_bindings_delete); - rd_kafka_event_destroy(rkev_acl_delete); - rd_kafka_AdminOptions_destroy(admin_options_delete); - - rd_kafka_AclBinding_destroy_array(acl_bindings_create, 3); - - if (!useq) - rd_kafka_queue_destroy(q); - - SUB_TEST_PASS(); -} - -/** - * @brief Verify that an unclean rd_kafka_destroy() does not hang. - */ -static void do_test_unclean_destroy(rd_kafka_type_t cltype, int with_mainq) { - rd_kafka_t *rk; - char errstr[512]; - rd_kafka_conf_t *conf; - rd_kafka_queue_t *q; - rd_kafka_NewTopic_t *topic; - test_timing_t t_destroy; - - SUB_TEST_QUICK("Test unclean destroy using %s", - with_mainq ? "mainq" : "tempq"); - - test_conf_init(&conf, NULL, 0); - - rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr)); - TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr); - - if (with_mainq) - q = rd_kafka_queue_get_main(rk); - else - q = rd_kafka_queue_new(rk); - - topic = rd_kafka_NewTopic_new(test_mk_topic_name(__FUNCTION__, 1), 3, 1, - NULL, 0); - rd_kafka_CreateTopics(rk, &topic, 1, NULL, q); - rd_kafka_NewTopic_destroy(topic); - - rd_kafka_queue_destroy(q); - - TEST_SAY( - "Giving rd_kafka_destroy() 5s to finish, " - "despite Admin API request being processed\n"); - test_timeout_set(5); - TIMING_START(&t_destroy, "rd_kafka_destroy()"); - rd_kafka_destroy(rk); - TIMING_STOP(&t_destroy); - - SUB_TEST_PASS(); - - /* Restore timeout */ - test_timeout_set(60); -} - - - -/** - * @brief Test deletion of records - * - * - */ -static void do_test_DeleteRecords(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int op_timeout) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_topic_partition_list_t *offsets = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define MY_DEL_RECORDS_CNT 3 - rd_kafka_topic_partition_list_t *results = NULL; - int i; - const int partitions_cnt = 3; - const int msgs_cnt = 100; - char *topics[MY_DEL_RECORDS_CNT]; - rd_kafka_metadata_topic_t exp_mdtopics[MY_DEL_RECORDS_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - rd_kafka_DeleteRecords_t *del_records; - const rd_kafka_DeleteRecords_result_t *res; - - SUB_TEST_QUICK("%s DeleteRecords with %s, op_timeout %d", - rd_kafka_name(rk), what, op_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (op_timeout != -1) { - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); - - err = rd_kafka_AdminOptions_set_operation_timeout( - options, op_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { - char pfx[32]; - char *topic; - - rd_snprintf(pfx, sizeof(pfx), "DeleteRecords-topic%d", i); - topic = rd_strdup(test_mk_topic_name(pfx, 1)); - - topics[i] = topic; - exp_mdtopics[exp_mdtopic_cnt++].topic = topic; - } - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, topics, MY_DEL_RECORDS_CNT, - partitions_cnt /*num_partitions*/, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, - 15 * 1000); - - /* Produce 100 msgs / partition */ - for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { - int32_t partition; - for (partition = 0; partition < partitions_cnt; partition++) { - test_produce_msgs_easy(topics[i], 0, partition, - msgs_cnt); - } - } - - offsets = rd_kafka_topic_partition_list_new(10); - - /* Wipe all data from topic 0 */ - for (i = 0; i < partitions_cnt; i++) - rd_kafka_topic_partition_list_add(offsets, topics[0], i) - ->offset = RD_KAFKA_OFFSET_END; - - /* Wipe all data from partition 0 in topic 1 */ - rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset = - RD_KAFKA_OFFSET_END; - - /* Wipe some data from partition 2 in topic 1 */ - rd_kafka_topic_partition_list_add(offsets, topics[1], 2)->offset = - msgs_cnt / 2; - - /* Not changing the offset (out of range) for topic 2 partition 0 */ - rd_kafka_topic_partition_list_add(offsets, topics[2], 0); - - /* Offset out of range for topic 2 partition 1 */ - rd_kafka_topic_partition_list_add(offsets, topics[2], 1)->offset = - msgs_cnt + 1; - - del_records = rd_kafka_DeleteRecords_new(offsets); - - TIMING_START(&timing, "DeleteRecords"); - TEST_SAY("Call DeleteRecords\n"); - rd_kafka_DeleteRecords(rk, &del_records, 1, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - rd_kafka_DeleteRecords_destroy(del_records); - - TIMING_START(&timing, "DeleteRecords.queue_poll"); - - /* Poll result queue for DeleteRecords result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("DeleteRecords: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DELETERECORDS_RESULT) { - break; - } - - rd_kafka_event_destroy(rkev); - } - /* Convert event to proper result */ - res = rd_kafka_event_DeleteRecords_result(rkev); - TEST_ASSERT(res, "expected DeleteRecords_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected DeleteRecords to return %s, not %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("DeleteRecords: returned %s (%s)\n", rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - results = rd_kafka_topic_partition_list_copy( - rd_kafka_DeleteRecords_result_offsets(res)); - - /* Sort both input and output list */ - rd_kafka_topic_partition_list_sort(offsets, NULL, NULL); - rd_kafka_topic_partition_list_sort(results, NULL, NULL); - - TEST_SAY("Input partitions:\n"); - test_print_partition_list(offsets); - TEST_SAY("Result partitions:\n"); - test_print_partition_list(results); - - TEST_ASSERT(offsets->cnt == results->cnt, - "expected DeleteRecords_result_offsets to return %d items, " - "not %d", - offsets->cnt, results->cnt); - - for (i = 0; i < results->cnt; i++) { - const rd_kafka_topic_partition_t *input = &offsets->elems[i]; - const rd_kafka_topic_partition_t *output = &results->elems[i]; - int64_t expected_offset = input->offset; - rd_kafka_resp_err_t expected_err = 0; - - if (expected_offset == RD_KAFKA_OFFSET_END) - expected_offset = msgs_cnt; - - /* Expect Offset out of range error */ - if (input->offset < RD_KAFKA_OFFSET_END || - input->offset > msgs_cnt) - expected_err = 1; - - TEST_SAY("DeleteRecords Returned %s for %s [%" PRId32 - "] " - "low-watermark = %d\n", - rd_kafka_err2name(output->err), output->topic, - output->partition, (int)output->offset); - - if (strcmp(output->topic, input->topic)) - TEST_FAIL_LATER( - "Result order mismatch at #%d: " - "expected topic %s, got %s", - i, input->topic, output->topic); - - if (output->partition != input->partition) - TEST_FAIL_LATER( - "Result order mismatch at #%d: " - "expected partition %d, got %d", - i, input->partition, output->partition); - - if (output->err != expected_err) - TEST_FAIL_LATER( - "%s [%" PRId32 - "]: " - "expected error code %d (%s), " - "got %d (%s)", - output->topic, output->partition, expected_err, - rd_kafka_err2str(expected_err), output->err, - rd_kafka_err2str(output->err)); - - if (output->err == 0 && output->offset != expected_offset) - TEST_FAIL_LATER("%s [%" PRId32 - "]: " - "expected offset %" PRId64 - ", " - "got %" PRId64, - output->topic, output->partition, - expected_offset, output->offset); - } - - /* Check watermarks for partitions */ - for (i = 0; i < MY_DEL_RECORDS_CNT; i++) { - int32_t partition; - for (partition = 0; partition < partitions_cnt; partition++) { - const rd_kafka_topic_partition_t *del = - rd_kafka_topic_partition_list_find( - results, topics[i], partition); - int64_t expected_low = 0; - int64_t expected_high = msgs_cnt; - int64_t low, high; - - if (del && del->err == 0) { - expected_low = del->offset; - } - - err = rd_kafka_query_watermark_offsets( - rk, topics[i], partition, &low, &high, - tmout_multip(10000)); - if (err) - TEST_FAIL( - "query_watermark_offsets failed: " - "%s\n", - rd_kafka_err2str(err)); - - if (low != expected_low) - TEST_FAIL_LATER("For %s [%" PRId32 - "] expected " - "a low watermark of %" PRId64 - ", got %" PRId64, - topics[i], partition, - expected_low, low); - - if (high != expected_high) - TEST_FAIL_LATER("For %s [%" PRId32 - "] expected " - "a high watermark of %" PRId64 - ", got %" PRId64, - topics[i], partition, - expected_high, high); - } - } - - rd_kafka_event_destroy(rkev); - - for (i = 0; i < MY_DEL_RECORDS_CNT; i++) - rd_free(topics[i]); - - if (results) - rd_kafka_topic_partition_list_destroy(results); - - if (offsets) - rd_kafka_topic_partition_list_destroy(offsets); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_DEL_RECORDS_CNT - - SUB_TEST_PASS(); -} - -/** - * @brief Test deletion of groups - * - * - */ - -typedef struct expected_group_result { - char *group; - rd_kafka_resp_err_t err; -} expected_group_result_t; - -static void do_test_DeleteGroups(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int request_timeout) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define MY_DEL_GROUPS_CNT 4 - int known_groups = MY_DEL_GROUPS_CNT - 1; - int i; - const int partitions_cnt = 1; - const int msgs_cnt = 100; - char *topic; - rd_kafka_metadata_topic_t exp_mdtopic = {0}; - int64_t testid = test_id_generate(); - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - const rd_kafka_group_result_t **results = NULL; - expected_group_result_t expected[MY_DEL_GROUPS_CNT] = {{0}}; - rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT]; - const rd_kafka_DeleteGroups_result_t *res; - - SUB_TEST_QUICK("%s DeleteGroups with %s, request_timeout %d", - rd_kafka_name(rk), what, request_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (request_timeout != -1) { - options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY); - - err = rd_kafka_AdminOptions_set_request_timeout( - options, request_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - exp_mdtopic.topic = topic; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); - - /* Produce 100 msgs */ - test_produce_msgs_easy(topic, testid, 0, msgs_cnt); - - for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { - char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - if (i < known_groups) { - test_consume_msgs_easy(group, topic, testid, -1, - msgs_cnt, NULL); - expected[i].group = group; - expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; - } else { - expected[i].group = group; - expected[i].err = RD_KAFKA_RESP_ERR_GROUP_ID_NOT_FOUND; - } - del_groups[i] = rd_kafka_DeleteGroup_new(group); - } - - TIMING_START(&timing, "DeleteGroups"); - TEST_SAY("Call DeleteGroups\n"); - rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - TIMING_START(&timing, "DeleteGroups.queue_poll"); - - /* Poll result queue for DeleteGroups result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("DeleteGroups: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DELETEGROUPS_RESULT) { - break; - } - - rd_kafka_event_destroy(rkev); - } - /* Convert event to proper result */ - res = rd_kafka_event_DeleteGroups_result(rkev); - TEST_ASSERT(res, "expected DeleteGroups_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected DeleteGroups to return %s, not %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("DeleteGroups: returned %s (%s)\n", rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - size_t cnt = 0; - results = rd_kafka_DeleteGroups_result_groups(res, &cnt); - - TEST_ASSERT(MY_DEL_GROUPS_CNT == cnt, - "expected DeleteGroups_result_groups to return %d items, " - "not %" PRIusz, - MY_DEL_GROUPS_CNT, cnt); - - for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { - const expected_group_result_t *exp = &expected[i]; - rd_kafka_resp_err_t exp_err = exp->err; - const rd_kafka_group_result_t *act = results[i]; - rd_kafka_resp_err_t act_err = - rd_kafka_error_code(rd_kafka_group_result_error(act)); - TEST_ASSERT( - strcmp(exp->group, rd_kafka_group_result_name(act)) == 0, - "Result order mismatch at #%d: expected group name to be " - "%s, not %s", - i, exp->group, rd_kafka_group_result_name(act)); - TEST_ASSERT(exp_err == act_err, - "expected err=%d for group %s, not %d (%s)", - exp_err, exp->group, act_err, - rd_kafka_err2str(act_err)); - } - - rd_kafka_event_destroy(rkev); - - for (i = 0; i < MY_DEL_GROUPS_CNT; i++) { - rd_kafka_DeleteGroup_destroy(del_groups[i]); - rd_free(expected[i].group); - } - - rd_free(topic); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_DEL_GROUPS_CNT - - SUB_TEST_PASS(); -} - -/** - * @brief Test list groups, creating consumers for a set of groups, - * listing and deleting them at the end. - */ -static void do_test_ListConsumerGroups(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int request_timeout, - rd_bool_t match_states) { -#define TEST_LIST_CONSUMER_GROUPS_CNT 4 - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - size_t valid_cnt, error_cnt; - rd_bool_t is_simple_consumer_group; - rd_kafka_consumer_group_state_t state; - char errstr[512]; - const char *errstr2, *group_id; - char *list_consumer_groups[TEST_LIST_CONSUMER_GROUPS_CNT]; - const int partitions_cnt = 1; - const int msgs_cnt = 100; - size_t i, found; - char *topic; - rd_kafka_metadata_topic_t exp_mdtopic = {0}; - int64_t testid = test_id_generate(); - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - const rd_kafka_ListConsumerGroups_result_t *res; - const rd_kafka_ConsumerGroupListing_t **groups; - rd_bool_t has_match_states = - test_broker_version >= TEST_BRKVER(2, 7, 0, 0); - - SUB_TEST_QUICK( - "%s ListConsumerGroups with %s, request_timeout %d" - ", match_states %s", - rd_kafka_name(rk), what, request_timeout, RD_STR_ToF(match_states)); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (request_timeout != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS); - - if (match_states) { - rd_kafka_consumer_group_state_t empty = - RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY; - - TEST_CALL_ERROR__( - rd_kafka_AdminOptions_set_match_consumer_group_states( - options, &empty, 1)); - } - - TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout( - options, request_timeout, errstr, sizeof(errstr))); - } - - - topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - exp_mdtopic.topic = topic; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); - - /* Produce 100 msgs */ - test_produce_msgs_easy(topic, testid, 0, msgs_cnt); - - for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { - char *group = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - test_consume_msgs_easy(group, topic, testid, -1, msgs_cnt, - NULL); - list_consumer_groups[i] = group; - } - - TIMING_START(&timing, "ListConsumerGroups"); - TEST_SAY("Call ListConsumerGroups\n"); - rd_kafka_ListConsumerGroups(rk, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - TIMING_START(&timing, "ListConsumerGroups.queue_poll"); - - /* Poll result queue for ListConsumerGroups result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("ListConsumerGroups: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT) { - break; - } - - rd_kafka_event_destroy(rkev); - } - /* Convert event to proper result */ - res = rd_kafka_event_ListConsumerGroups_result(rkev); - TEST_ASSERT(res, "expected ListConsumerGroups_result, got %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected ListConsumerGroups to return %s, got %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("ListConsumerGroups: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - groups = rd_kafka_ListConsumerGroups_result_valid(res, &valid_cnt); - rd_kafka_ListConsumerGroups_result_errors(res, &error_cnt); - - /* Other tests could be running */ - TEST_ASSERT(valid_cnt >= TEST_LIST_CONSUMER_GROUPS_CNT, - "expected ListConsumerGroups to return at least %" PRId32 - " valid groups," - " got %zu", - TEST_LIST_CONSUMER_GROUPS_CNT, valid_cnt); - - TEST_ASSERT(error_cnt == 0, - "expected ListConsumerGroups to return 0 errors," - " got %zu", - error_cnt); - - found = 0; - for (i = 0; i < valid_cnt; i++) { - int j; - const rd_kafka_ConsumerGroupListing_t *group; - group = groups[i]; - group_id = rd_kafka_ConsumerGroupListing_group_id(group); - is_simple_consumer_group = - rd_kafka_ConsumerGroupListing_is_simple_consumer_group( - group); - state = rd_kafka_ConsumerGroupListing_state(group); - for (j = 0; j < TEST_LIST_CONSUMER_GROUPS_CNT; j++) { - if (!strcmp(list_consumer_groups[j], group_id)) { - found++; - TEST_ASSERT(!is_simple_consumer_group, - "expected a normal group," - " got a simple group"); - - if (!has_match_states) - break; - - TEST_ASSERT( - state == - RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY, - "expected an Empty state," - " got state %s", - rd_kafka_consumer_group_state_name(state)); - break; - } - } - } - TEST_ASSERT(found == TEST_LIST_CONSUMER_GROUPS_CNT, - "expected to find %d" - " started groups," - " got %" PRIusz, - TEST_LIST_CONSUMER_GROUPS_CNT, found); - - rd_kafka_event_destroy(rkev); - - test_DeleteGroups_simple(rk, NULL, (char **)list_consumer_groups, - TEST_LIST_CONSUMER_GROUPS_CNT, NULL); - - for (i = 0; i < TEST_LIST_CONSUMER_GROUPS_CNT; i++) { - rd_free(list_consumer_groups[i]); - } - - rd_free(topic); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef TEST_LIST_CONSUMER_GROUPS_CNT - - SUB_TEST_PASS(); -} - -typedef struct expected_DescribeConsumerGroups_result { - char *group_id; - rd_kafka_resp_err_t err; -} expected_DescribeConsumerGroups_result_t; - - -/** - * @brief Test describe groups, creating consumers for a set of groups, - * describing and deleting them at the end. - */ -static void do_test_DescribeConsumerGroups(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int request_timeout) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4 - int known_groups = TEST_DESCRIBE_CONSUMER_GROUPS_CNT - 1; - int i; - const int partitions_cnt = 1; - const int msgs_cnt = 100; - char *topic; - rd_kafka_metadata_topic_t exp_mdtopic = {0}; - int64_t testid = test_id_generate(); - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - const rd_kafka_ConsumerGroupDescription_t **results = NULL; - expected_DescribeConsumerGroups_result_t - expected[TEST_DESCRIBE_CONSUMER_GROUPS_CNT] = RD_ZERO_INIT; - const char *describe_groups[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; - char group_instance_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; - char client_ids[TEST_DESCRIBE_CONSUMER_GROUPS_CNT][512]; - rd_kafka_t *rks[TEST_DESCRIBE_CONSUMER_GROUPS_CNT]; - const rd_kafka_DescribeConsumerGroups_result_t *res; - rd_bool_t has_group_instance_id = - test_broker_version >= TEST_BRKVER(2, 4, 0, 0); - - SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, request_timeout %d", - rd_kafka_name(rk), what, request_timeout); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (request_timeout != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS); - - err = rd_kafka_AdminOptions_set_request_timeout( - options, request_timeout, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - topic = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - exp_mdtopic.topic = topic; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, &topic, 1, partitions_cnt, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, &exp_mdtopic, 1, NULL, 0, 15 * 1000); - - /* Produce 100 msgs */ - test_produce_msgs_easy(topic, testid, 0, msgs_cnt); - - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - rd_kafka_conf_t *conf; - char *group_id = rd_strdup(test_mk_topic_name(__FUNCTION__, 1)); - if (i < known_groups) { - snprintf(group_instance_ids[i], - sizeof(group_instance_ids[i]), - "group_instance_id_%" PRId32, i); - snprintf(client_ids[i], sizeof(client_ids[i]), - "client_id_%" PRId32, i); - - test_conf_init(&conf, NULL, 0); - test_conf_set(conf, "client.id", client_ids[i]); - test_conf_set(conf, "group.instance.id", - group_instance_ids[i]); - test_conf_set(conf, "session.timeout.ms", "5000"); - test_conf_set(conf, "auto.offset.reset", "earliest"); - rks[i] = - test_create_consumer(group_id, NULL, conf, NULL); - test_consumer_subscribe(rks[i], topic); - /* Consume messages */ - test_consumer_poll("consumer", rks[i], testid, -1, -1, - msgs_cnt, NULL); - } - expected[i].group_id = group_id; - expected[i].err = RD_KAFKA_RESP_ERR_NO_ERROR; - describe_groups[i] = group_id; - } - - TIMING_START(&timing, "DescribeConsumerGroups"); - TEST_SAY("Call DescribeConsumerGroups\n"); - rd_kafka_DescribeConsumerGroups( - rk, describe_groups, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - TIMING_START(&timing, "DescribeConsumerGroups.queue_poll"); - - /* Poll result queue for DescribeConsumerGroups result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(20 * 1000)); - TEST_SAY("DescribeConsumerGroups: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT) { - break; - } - - rd_kafka_event_destroy(rkev); - } - /* Convert event to proper result */ - res = rd_kafka_event_DescribeConsumerGroups_result(rkev); - TEST_ASSERT(res, "expected DescribeConsumerGroups_result, got %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(err == exp_err, - "expected DescribeConsumerGroups to return %s, got %s (%s)", - rd_kafka_err2str(exp_err), rd_kafka_err2str(err), - err ? errstr2 : "n/a"); - - TEST_SAY("DescribeConsumerGroups: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - size_t cnt = 0; - results = rd_kafka_DescribeConsumerGroups_result_groups(res, &cnt); - - TEST_ASSERT( - TEST_DESCRIBE_CONSUMER_GROUPS_CNT == cnt, - "expected DescribeConsumerGroups_result_groups to return %d items, " - "got %" PRIusz, - TEST_DESCRIBE_CONSUMER_GROUPS_CNT, cnt); - - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - expected_DescribeConsumerGroups_result_t *exp = &expected[i]; - rd_kafka_resp_err_t exp_err = exp->err; - const rd_kafka_ConsumerGroupDescription_t *act = results[i]; - rd_kafka_resp_err_t act_err = rd_kafka_error_code( - rd_kafka_ConsumerGroupDescription_error(act)); - rd_kafka_consumer_group_state_t state = - rd_kafka_ConsumerGroupDescription_state(act); - TEST_ASSERT( - strcmp(exp->group_id, - rd_kafka_ConsumerGroupDescription_group_id(act)) == - 0, - "Result order mismatch at #%d: expected group id to be " - "%s, got %s", - i, exp->group_id, - rd_kafka_ConsumerGroupDescription_group_id(act)); - if (i < known_groups) { - int member_count; - const rd_kafka_MemberDescription_t *member; - const rd_kafka_MemberAssignment_t *assignment; - const char *client_id; - const char *group_instance_id; - const rd_kafka_topic_partition_list_t *partitions; - - TEST_ASSERT(state == - RD_KAFKA_CONSUMER_GROUP_STATE_STABLE, - "Expected Stable state, got %s.", - rd_kafka_consumer_group_state_name(state)); - - TEST_ASSERT( - !rd_kafka_ConsumerGroupDescription_is_simple_consumer_group( - act), - "Expected a normal consumer group, got a simple " - "one."); - - member_count = - rd_kafka_ConsumerGroupDescription_member_count(act); - TEST_ASSERT(member_count == 1, - "Expected one member, got %d.", - member_count); - - member = - rd_kafka_ConsumerGroupDescription_member(act, 0); - - client_id = - rd_kafka_MemberDescription_client_id(member); - TEST_ASSERT(!strcmp(client_id, client_ids[i]), - "Expected client id \"%s\"," - " got \"%s\".", - client_ids[i], client_id); - - if (has_group_instance_id) { - group_instance_id = - rd_kafka_MemberDescription_group_instance_id( - member); - TEST_ASSERT(!strcmp(group_instance_id, - group_instance_ids[i]), - "Expected group instance id \"%s\"," - " got \"%s\".", - group_instance_ids[i], - group_instance_id); - } - - assignment = - rd_kafka_MemberDescription_assignment(member); - TEST_ASSERT(assignment != NULL, - "Expected non-NULL member assignment"); - - partitions = - rd_kafka_MemberAssignment_partitions(assignment); - TEST_ASSERT(partitions != NULL, - "Expected non-NULL member partitions"); - - TEST_SAY( - "Member client.id=\"%s\", " - "group.instance.id=\"%s\", " - "consumer_id=\"%s\", " - "host=\"%s\", assignment:\n", - rd_kafka_MemberDescription_client_id(member), - rd_kafka_MemberDescription_group_instance_id( - member), - rd_kafka_MemberDescription_consumer_id(member), - rd_kafka_MemberDescription_host(member)); - /* This is just to make sure the returned memory - * is valid. */ - test_print_partition_list(partitions); - } else { - TEST_ASSERT(state == RD_KAFKA_CONSUMER_GROUP_STATE_DEAD, - "Expected Dead state, got %s.", - rd_kafka_consumer_group_state_name(state)); - } - TEST_ASSERT(exp_err == act_err, - "expected err=%d for group %s, got %d (%s)", - exp_err, exp->group_id, act_err, - rd_kafka_err2str(act_err)); - } - - rd_kafka_event_destroy(rkev); - - for (i = 0; i < known_groups; i++) { - test_consumer_close(rks[i]); - rd_kafka_destroy(rks[i]); - } - - /* Wait session timeout + 1s. Because using static group membership */ - rd_sleep(6); - - test_DeleteGroups_simple(rk, NULL, (char **)describe_groups, - known_groups, NULL); - - for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) { - rd_free(expected[i].group_id); - } - - rd_free(topic); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT - - SUB_TEST_PASS(); -} - -/** - * @brief Test deletion of committed offsets. - * - * - */ -static void do_test_DeleteConsumerGroupOffsets(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int req_timeout_ms, - rd_bool_t sub_consumer) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_delete, - *committed, *deleted, *subscription = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define MY_TOPIC_CNT 3 - int i; - const int partitions_cnt = 3; - char *topics[MY_TOPIC_CNT]; - rd_kafka_metadata_topic_t exp_mdtopics[MY_TOPIC_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets; - const rd_kafka_DeleteConsumerGroupOffsets_result_t *res; - const rd_kafka_group_result_t **gres; - size_t gres_cnt; - rd_kafka_t *consumer; - char *groupid; - - SUB_TEST_QUICK( - "%s DeleteConsumerGroupOffsets with %s, req_timeout_ms %d%s", - rd_kafka_name(rk), what, req_timeout_ms, - sub_consumer ? ", with subscribing consumer" : ""); - - if (sub_consumer) - exp_err = RD_KAFKA_RESP_ERR_GROUP_SUBSCRIBED_TO_TOPIC; - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (req_timeout_ms != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS); - - err = rd_kafka_AdminOptions_set_request_timeout( - options, req_timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - subscription = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT); - - for (i = 0; i < MY_TOPIC_CNT; i++) { - char pfx[64]; - char *topic; - - rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); - topic = rd_strdup(test_mk_topic_name(pfx, 1)); - - topics[i] = topic; - exp_mdtopics[exp_mdtopic_cnt++].topic = topic; - - rd_kafka_topic_partition_list_add(subscription, topic, - RD_KAFKA_PARTITION_UA); - } - - groupid = topics[0]; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, topics, MY_TOPIC_CNT, partitions_cnt, - NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, - 15 * 1000); - - rd_sleep(1); /* Additional wait time for cluster propagation */ - - consumer = test_create_consumer(groupid, NULL, NULL, NULL); - - if (sub_consumer) { - TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); - test_consumer_wait_assignment(consumer, rd_true); - } - - /* Commit some offsets */ - orig_offsets = rd_kafka_topic_partition_list_new(MY_TOPIC_CNT * 2); - for (i = 0; i < MY_TOPIC_CNT * 2; i++) - rd_kafka_topic_partition_list_add(orig_offsets, topics[i / 2], - i % MY_TOPIC_CNT) - ->offset = (i + 1) * 10; - - TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); - - /* Verify committed offsets match */ - committed = rd_kafka_topic_partition_list_copy(orig_offsets); - TEST_CALL_ERR__( - rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); - - if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) { - TEST_SAY("commit() list:\n"); - test_print_partition_list(orig_offsets); - TEST_SAY("committed() list:\n"); - test_print_partition_list(committed); - TEST_FAIL("committed offsets don't match"); - } - - rd_kafka_topic_partition_list_destroy(committed); - - /* Now delete second half of the commits */ - offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); - to_delete = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); - for (i = 0; i < orig_offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar; - if (i < orig_offsets->cnt / 2) { - rktpar = rd_kafka_topic_partition_list_add( - offsets, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = orig_offsets->elems[i].offset; - } else { - rktpar = rd_kafka_topic_partition_list_add( - to_delete, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = RD_KAFKA_OFFSET_INVALID; - rktpar = rd_kafka_topic_partition_list_add( - offsets, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = RD_KAFKA_OFFSET_INVALID; - } - } - - cgoffsets = rd_kafka_DeleteConsumerGroupOffsets_new(groupid, to_delete); - - TIMING_START(&timing, "DeleteConsumerGroupOffsets"); - TEST_SAY("Call DeleteConsumerGroupOffsets\n"); - rd_kafka_DeleteConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - rd_kafka_DeleteConsumerGroupOffsets_destroy(cgoffsets); - - TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll"); - /* Poll result queue for DeleteConsumerGroupOffsets result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); - TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT) - break; - - rd_kafka_event_destroy(rkev); - } - - /* Convert event to proper result */ - res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev); - TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(!err, - "expected DeleteConsumerGroupOffsets to succeed, " - "got %s (%s)", - rd_kafka_err2name(err), err ? errstr2 : "n/a"); - - TEST_SAY("DeleteConsumerGroupOffsets: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - gres = - rd_kafka_DeleteConsumerGroupOffsets_result_groups(res, &gres_cnt); - TEST_ASSERT(gres && gres_cnt == 1, - "expected gres_cnt == 1, not %" PRIusz, gres_cnt); - - deleted = rd_kafka_topic_partition_list_copy( - rd_kafka_group_result_partitions(gres[0])); - - if (test_partition_list_and_offsets_cmp(deleted, to_delete)) { - TEST_SAY("Result list:\n"); - test_print_partition_list(deleted); - TEST_SAY("Partitions passed to DeleteConsumerGroupOffsets:\n"); - test_print_partition_list(to_delete); - TEST_FAIL("deleted/requested offsets don't match"); - } - - /* Verify expected errors */ - for (i = 0; i < deleted->cnt; i++) { - TEST_ASSERT_LATER(deleted->elems[i].err == exp_err, - "Result %s [%" PRId32 - "] has error %s, " - "expected %s", - deleted->elems[i].topic, - deleted->elems[i].partition, - rd_kafka_err2name(deleted->elems[i].err), - rd_kafka_err2name(exp_err)); - } - - TEST_LATER_CHECK(); - - rd_kafka_topic_partition_list_destroy(deleted); - rd_kafka_topic_partition_list_destroy(to_delete); - - rd_kafka_event_destroy(rkev); - - - /* Verify committed offsets match */ - committed = rd_kafka_topic_partition_list_copy(orig_offsets); - TEST_CALL_ERR__( - rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); - - TEST_SAY("Original committed offsets:\n"); - test_print_partition_list(orig_offsets); - - TEST_SAY("Committed offsets after delete:\n"); - test_print_partition_list(committed); - - rd_kafka_topic_partition_list_t *expected = offsets; - if (sub_consumer) - expected = orig_offsets; - - if (test_partition_list_and_offsets_cmp(committed, expected)) { - TEST_SAY("expected list:\n"); - test_print_partition_list(expected); - TEST_SAY("committed() list:\n"); - test_print_partition_list(committed); - TEST_FAIL("committed offsets don't match"); - } - - rd_kafka_topic_partition_list_destroy(committed); - rd_kafka_topic_partition_list_destroy(offsets); - rd_kafka_topic_partition_list_destroy(orig_offsets); - rd_kafka_topic_partition_list_destroy(subscription); - - for (i = 0; i < MY_TOPIC_CNT; i++) - rd_free(topics[i]); - - rd_kafka_destroy(consumer); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef MY_TOPIC_CNT - - SUB_TEST_PASS(); -} - - -/** - * @brief Test altering of committed offsets. - * - * - */ -static void do_test_AlterConsumerGroupOffsets(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int req_timeout_ms, - rd_bool_t sub_consumer, - rd_bool_t create_topics) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_topic_partition_list_t *orig_offsets, *offsets, *to_alter, - *committed, *alterd, *subscription = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3 - int i; - const int partitions_cnt = 3; - char *topics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT]; - rd_kafka_metadata_topic_t - exp_mdtopics[TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - rd_kafka_AlterConsumerGroupOffsets_t *cgoffsets; - const rd_kafka_AlterConsumerGroupOffsets_result_t *res; - const rd_kafka_group_result_t **gres; - size_t gres_cnt; - rd_kafka_t *consumer = NULL; - char *group_id; - - SUB_TEST_QUICK( - "%s AlterConsumerGroupOffsets with %s, " - "request_timeout %d%s", - rd_kafka_name(rk), what, req_timeout_ms, - sub_consumer ? ", with subscribing consumer" : ""); - - if (!create_topics) - exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART; - else if (sub_consumer) - exp_err = RD_KAFKA_RESP_ERR_UNKNOWN_MEMBER_ID; - - if (sub_consumer && !create_topics) - TEST_FAIL( - "Can't use set sub_consumer and unset create_topics at the " - "same time"); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (req_timeout_ms != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS); - - err = rd_kafka_AdminOptions_set_request_timeout( - options, req_timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - subscription = rd_kafka_topic_partition_list_new( - TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); - - for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) { - char pfx[64]; - char *topic; - - rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); - topic = rd_strdup(test_mk_topic_name(pfx, 1)); - - topics[i] = topic; - exp_mdtopics[exp_mdtopic_cnt++].topic = topic; - - rd_kafka_topic_partition_list_add(subscription, topic, - RD_KAFKA_PARTITION_UA); - } - - group_id = topics[0]; - - /* Create the topics first if needed. */ - if (create_topics) { - test_CreateTopics_simple( - rk, NULL, topics, - TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT, partitions_cnt, - NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, - NULL, 0, 15 * 1000); - - rd_sleep(1); /* Additional wait time for cluster propagation */ - - consumer = test_create_consumer(group_id, NULL, NULL, NULL); - - if (sub_consumer) { - TEST_CALL_ERR__( - rd_kafka_subscribe(consumer, subscription)); - test_consumer_wait_assignment(consumer, rd_true); - } - } - - orig_offsets = rd_kafka_topic_partition_list_new( - TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt); - for (i = 0; - i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * partitions_cnt; - i++) { - rd_kafka_topic_partition_t *rktpar; - rktpar = rd_kafka_topic_partition_list_add( - orig_offsets, topics[i / partitions_cnt], - i % partitions_cnt); - rktpar->offset = (i + 1) * 10; - rd_kafka_topic_partition_set_leader_epoch(rktpar, 1); - } - - /* Commit some offsets, if topics exists */ - if (create_topics) { - TEST_CALL_ERR__( - rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); - - /* Verify committed offsets match */ - committed = rd_kafka_topic_partition_list_copy(orig_offsets); - TEST_CALL_ERR__(rd_kafka_committed(consumer, committed, - tmout_multip(5 * 1000))); - - if (test_partition_list_and_offsets_cmp(committed, - orig_offsets)) { - TEST_SAY("commit() list:\n"); - test_print_partition_list(orig_offsets); - TEST_SAY("committed() list:\n"); - test_print_partition_list(committed); - TEST_FAIL("committed offsets don't match"); - } - rd_kafka_topic_partition_list_destroy(committed); - } - - /* Now alter second half of the commits */ - offsets = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); - to_alter = rd_kafka_topic_partition_list_new(orig_offsets->cnt / 2); - for (i = 0; i < orig_offsets->cnt; i++) { - rd_kafka_topic_partition_t *rktpar; - if (i < orig_offsets->cnt / 2) { - rktpar = rd_kafka_topic_partition_list_add( - offsets, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = orig_offsets->elems[i].offset; - rd_kafka_topic_partition_set_leader_epoch( - rktpar, rd_kafka_topic_partition_get_leader_epoch( - &orig_offsets->elems[i])); - } else { - rktpar = rd_kafka_topic_partition_list_add( - to_alter, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = 5; - rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); - rktpar = rd_kafka_topic_partition_list_add( - offsets, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - rktpar->offset = 5; - rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); - } - } - - cgoffsets = rd_kafka_AlterConsumerGroupOffsets_new(group_id, to_alter); - - TIMING_START(&timing, "AlterConsumerGroupOffsets"); - TEST_SAY("Call AlterConsumerGroupOffsets\n"); - rd_kafka_AlterConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - rd_kafka_AlterConsumerGroupOffsets_destroy(cgoffsets); - - TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll"); - /* Poll result queue for AlterConsumerGroupOffsets result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); - TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_ALTERCONSUMERGROUPOFFSETS_RESULT) - break; - - rd_kafka_event_destroy(rkev); - } - - /* Convert event to proper result */ - res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev); - TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(!err, - "expected AlterConsumerGroupOffsets to succeed, " - "got %s (%s)", - rd_kafka_err2name(err), err ? errstr2 : "n/a"); - - TEST_SAY("AlterConsumerGroupOffsets: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - gres = rd_kafka_AlterConsumerGroupOffsets_result_groups(res, &gres_cnt); - TEST_ASSERT(gres && gres_cnt == 1, - "expected gres_cnt == 1, not %" PRIusz, gres_cnt); - - alterd = rd_kafka_topic_partition_list_copy( - rd_kafka_group_result_partitions(gres[0])); - - if (test_partition_list_and_offsets_cmp(alterd, to_alter)) { - TEST_SAY("Result list:\n"); - test_print_partition_list(alterd); - TEST_SAY("Partitions passed to AlterConsumerGroupOffsets:\n"); - test_print_partition_list(to_alter); - TEST_FAIL("altered/requested offsets don't match"); - } - - /* Verify expected errors */ - for (i = 0; i < alterd->cnt; i++) { - TEST_ASSERT_LATER(alterd->elems[i].err == exp_err, - "Result %s [%" PRId32 - "] has error %s, " - "expected %s", - alterd->elems[i].topic, - alterd->elems[i].partition, - rd_kafka_err2name(alterd->elems[i].err), - rd_kafka_err2name(exp_err)); - } - - TEST_LATER_CHECK(); - - rd_kafka_topic_partition_list_destroy(alterd); - rd_kafka_topic_partition_list_destroy(to_alter); - - rd_kafka_event_destroy(rkev); - - - /* Verify committed offsets match, if topics exist. */ - if (create_topics) { - committed = rd_kafka_topic_partition_list_copy(orig_offsets); - TEST_CALL_ERR__(rd_kafka_committed(consumer, committed, - tmout_multip(5 * 1000))); - - rd_kafka_topic_partition_list_t *expected = offsets; - if (sub_consumer) { - /* Alter fails with an active consumer */ - expected = orig_offsets; - } - TEST_SAY("Original committed offsets:\n"); - test_print_partition_list(orig_offsets); - - TEST_SAY("Committed offsets after alter:\n"); - test_print_partition_list(committed); - - if (test_partition_list_and_offsets_cmp(committed, expected)) { - TEST_SAY("expected list:\n"); - test_print_partition_list(expected); - TEST_SAY("committed() list:\n"); - test_print_partition_list(committed); - TEST_FAIL("committed offsets don't match"); - } - rd_kafka_topic_partition_list_destroy(committed); - } - - rd_kafka_topic_partition_list_destroy(offsets); - rd_kafka_topic_partition_list_destroy(orig_offsets); - rd_kafka_topic_partition_list_destroy(subscription); - - for (i = 0; i < TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) - rd_free(topics[i]); - - if (create_topics) /* consumer is created only if topics are. */ - rd_kafka_destroy(consumer); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); -#undef TEST_ALTER_CONSUMER_GROUP_OFFSETS_TOPIC_CNT - - SUB_TEST_PASS(); -} - -/** - * @brief Test listing of committed offsets. - * - * - */ -static void do_test_ListConsumerGroupOffsets(const char *what, - rd_kafka_t *rk, - rd_kafka_queue_t *useq, - int req_timeout_ms, - rd_bool_t sub_consumer, - rd_bool_t null_toppars) { - rd_kafka_queue_t *q; - rd_kafka_AdminOptions_t *options = NULL; - rd_kafka_topic_partition_list_t *orig_offsets, *to_list, *committed, - *listd, *subscription = NULL; - rd_kafka_event_t *rkev = NULL; - rd_kafka_resp_err_t err; - char errstr[512]; - const char *errstr2; -#define TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT 3 - int i; - const int partitions_cnt = 3; - char *topics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT]; - rd_kafka_metadata_topic_t - exp_mdtopics[TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT] = {{0}}; - int exp_mdtopic_cnt = 0; - test_timing_t timing; - rd_kafka_resp_err_t exp_err = RD_KAFKA_RESP_ERR_NO_ERROR; - rd_kafka_ListConsumerGroupOffsets_t *cgoffsets; - const rd_kafka_ListConsumerGroupOffsets_result_t *res; - const rd_kafka_group_result_t **gres; - size_t gres_cnt; - rd_kafka_t *consumer; - char *group_id; - - SUB_TEST_QUICK( - "%s ListConsumerGroupOffsets with %s, " - "request timeout %d%s", - rd_kafka_name(rk), what, req_timeout_ms, - sub_consumer ? ", with subscribing consumer" : ""); - - q = useq ? useq : rd_kafka_queue_new(rk); - - if (req_timeout_ms != -1) { - options = rd_kafka_AdminOptions_new( - rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS); - - err = rd_kafka_AdminOptions_set_request_timeout( - options, req_timeout_ms, errstr, sizeof(errstr)); - TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); - } - - - subscription = rd_kafka_topic_partition_list_new( - TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); - - for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) { - char pfx[64]; - char *topic; - - rd_snprintf(pfx, sizeof(pfx), "DCGO-topic%d", i); - topic = rd_strdup(test_mk_topic_name(pfx, 1)); - - topics[i] = topic; - exp_mdtopics[exp_mdtopic_cnt++].topic = topic; - - rd_kafka_topic_partition_list_add(subscription, topic, - RD_KAFKA_PARTITION_UA); - } - - group_id = topics[0]; - - /* Create the topics first. */ - test_CreateTopics_simple(rk, NULL, topics, - TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT, - partitions_cnt, NULL); - - /* Verify that topics are reported by metadata */ - test_wait_metadata_update(rk, exp_mdtopics, exp_mdtopic_cnt, NULL, 0, - 15 * 1000); - - rd_sleep(1); /* Additional wait time for cluster propagation */ - - consumer = test_create_consumer(group_id, NULL, NULL, NULL); - - if (sub_consumer) { - TEST_CALL_ERR__(rd_kafka_subscribe(consumer, subscription)); - test_consumer_wait_assignment(consumer, rd_true); - } - - /* Commit some offsets */ - orig_offsets = rd_kafka_topic_partition_list_new( - TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2); - for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT * 2; i++) { - rd_kafka_topic_partition_t *rktpar; - rktpar = rd_kafka_topic_partition_list_add( - orig_offsets, topics[i / 2], - i % TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT); - rktpar->offset = (i + 1) * 10; - rd_kafka_topic_partition_set_leader_epoch(rktpar, 2); - } - - TEST_CALL_ERR__(rd_kafka_commit(consumer, orig_offsets, 0 /*sync*/)); - - /* Verify committed offsets match */ - committed = rd_kafka_topic_partition_list_copy(orig_offsets); - TEST_CALL_ERR__( - rd_kafka_committed(consumer, committed, tmout_multip(5 * 1000))); - - if (test_partition_list_and_offsets_cmp(committed, orig_offsets)) { - TEST_SAY("commit() list:\n"); - test_print_partition_list(orig_offsets); - TEST_SAY("committed() list:\n"); - test_print_partition_list(committed); - TEST_FAIL("committed offsets don't match"); - } - - rd_kafka_topic_partition_list_destroy(committed); - - to_list = rd_kafka_topic_partition_list_new(orig_offsets->cnt); - for (i = 0; i < orig_offsets->cnt; i++) { - rd_kafka_topic_partition_list_add( - to_list, orig_offsets->elems[i].topic, - orig_offsets->elems[i].partition); - } - - if (null_toppars) { - cgoffsets = - rd_kafka_ListConsumerGroupOffsets_new(group_id, NULL); - } else { - cgoffsets = - rd_kafka_ListConsumerGroupOffsets_new(group_id, to_list); - } - - TIMING_START(&timing, "ListConsumerGroupOffsets"); - TEST_SAY("Call ListConsumerGroupOffsets\n"); - rd_kafka_ListConsumerGroupOffsets(rk, &cgoffsets, 1, options, q); - TIMING_ASSERT_LATER(&timing, 0, 50); - - rd_kafka_ListConsumerGroupOffsets_destroy(cgoffsets); - - TIMING_START(&timing, "ListConsumerGroupOffsets.queue_poll"); - /* Poll result queue for ListConsumerGroupOffsets result. - * Print but otherwise ignore other event types - * (typically generic Error events). */ - while (1) { - rkev = rd_kafka_queue_poll(q, tmout_multip(10 * 1000)); - TEST_SAY("ListConsumerGroupOffsets: got %s in %.3fms\n", - rd_kafka_event_name(rkev), - TIMING_DURATION(&timing) / 1000.0f); - if (rkev == NULL) - continue; - if (rd_kafka_event_error(rkev)) - TEST_SAY("%s: %s\n", rd_kafka_event_name(rkev), - rd_kafka_event_error_string(rkev)); - - if (rd_kafka_event_type(rkev) == - RD_KAFKA_EVENT_LISTCONSUMERGROUPOFFSETS_RESULT) - break; - - rd_kafka_event_destroy(rkev); - } - - /* Convert event to proper result */ - res = rd_kafka_event_ListConsumerGroupOffsets_result(rkev); - TEST_ASSERT(res, "expected ListConsumerGroupOffsets_result, not %s", - rd_kafka_event_name(rkev)); - - /* Expecting error */ - err = rd_kafka_event_error(rkev); - errstr2 = rd_kafka_event_error_string(rkev); - TEST_ASSERT(!err, - "expected ListConsumerGroupOffsets to succeed, " - "got %s (%s)", - rd_kafka_err2name(err), err ? errstr2 : "n/a"); - - TEST_SAY("ListConsumerGroupOffsets: returned %s (%s)\n", - rd_kafka_err2str(err), err ? errstr2 : "n/a"); - - gres = rd_kafka_ListConsumerGroupOffsets_result_groups(res, &gres_cnt); - TEST_ASSERT(gres && gres_cnt == 1, - "expected gres_cnt == 1, not %" PRIusz, gres_cnt); - - listd = rd_kafka_topic_partition_list_copy( - rd_kafka_group_result_partitions(gres[0])); - - if (test_partition_list_and_offsets_cmp(listd, orig_offsets)) { - TEST_SAY("Result list:\n"); - test_print_partition_list(listd); - TEST_SAY("Partitions passed to ListConsumerGroupOffsets:\n"); - test_print_partition_list(orig_offsets); - TEST_FAIL("listd/requested offsets don't match"); - } - - /* Verify expected errors */ - for (i = 0; i < listd->cnt; i++) { - TEST_ASSERT_LATER(listd->elems[i].err == exp_err, - "Result %s [%" PRId32 - "] has error %s, " - "expected %s", - listd->elems[i].topic, - listd->elems[i].partition, - rd_kafka_err2name(listd->elems[i].err), - rd_kafka_err2name(exp_err)); - } - - TEST_LATER_CHECK(); - - rd_kafka_topic_partition_list_destroy(listd); - rd_kafka_topic_partition_list_destroy(to_list); - - rd_kafka_event_destroy(rkev); - - rd_kafka_topic_partition_list_destroy(orig_offsets); - rd_kafka_topic_partition_list_destroy(subscription); - - for (i = 0; i < TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT; i++) - rd_free(topics[i]); - - rd_kafka_destroy(consumer); - - if (options) - rd_kafka_AdminOptions_destroy(options); - - if (!useq) - rd_kafka_queue_destroy(q); - - TEST_LATER_CHECK(); - -#undef TEST_LIST_CONSUMER_GROUP_OFFSETS_TOPIC_CNT - - SUB_TEST_PASS(); -} - -static void do_test_apis(rd_kafka_type_t cltype) { - rd_kafka_t *rk; - rd_kafka_conf_t *conf; - rd_kafka_queue_t *mainq; - - /* Get the available brokers, but use a separate rd_kafka_t instance - * so we don't jinx the tests by having up-to-date metadata. */ - avail_brokers = test_get_broker_ids(NULL, &avail_broker_cnt); - TEST_SAY("%" PRIusz - " brokers in cluster " - "which will be used for replica sets\n", - avail_broker_cnt); - - do_test_unclean_destroy(cltype, 0 /*tempq*/); - do_test_unclean_destroy(cltype, 1 /*mainq*/); - - test_conf_init(&conf, NULL, 180); - test_conf_set(conf, "socket.timeout.ms", "10000"); - rk = test_create_handle(cltype, conf); - - mainq = rd_kafka_queue_get_main(rk); - - /* Create topics */ - do_test_CreateTopics("temp queue, op timeout 0", rk, NULL, 0, 0); - do_test_CreateTopics("temp queue, op timeout 15000", rk, NULL, 15000, - 0); - do_test_CreateTopics( - "temp queue, op timeout 300, " - "validate only", - rk, NULL, 300, rd_true); - do_test_CreateTopics("temp queue, op timeout 9000, validate_only", rk, - NULL, 9000, rd_true); - do_test_CreateTopics("main queue, options", rk, mainq, -1, 0); - - /* Delete topics */ - do_test_DeleteTopics("temp queue, op timeout 0", rk, NULL, 0); - do_test_DeleteTopics("main queue, op timeout 15000", rk, mainq, 1500); - - if (test_broker_version >= TEST_BRKVER(1, 0, 0, 0)) { - /* Create Partitions */ - do_test_CreatePartitions("temp queue, op timeout 6500", rk, - NULL, 6500); - do_test_CreatePartitions("main queue, op timeout 0", rk, mainq, - 0); - } - - /* CreateAcls */ - do_test_CreateAcls(rk, mainq, 0); - do_test_CreateAcls(rk, mainq, 1); - - /* DescribeAcls */ - do_test_DescribeAcls(rk, mainq, 0); - do_test_DescribeAcls(rk, mainq, 1); - - /* DeleteAcls */ - do_test_DeleteAcls(rk, mainq, 0); - do_test_DeleteAcls(rk, mainq, 1); - - /* AlterConfigs */ - do_test_AlterConfigs(rk, mainq); - - /* DescribeConfigs */ - do_test_DescribeConfigs(rk, mainq); - - /* Delete records */ - do_test_DeleteRecords("temp queue, op timeout 0", rk, NULL, 0); - do_test_DeleteRecords("main queue, op timeout 1500", rk, mainq, 1500); - - /* List groups */ - do_test_ListConsumerGroups("temp queue", rk, NULL, -1, rd_false); - do_test_ListConsumerGroups("main queue", rk, mainq, 1500, rd_true); - - /* Describe groups */ - do_test_DescribeConsumerGroups("temp queue", rk, NULL, -1); - do_test_DescribeConsumerGroups("main queue", rk, mainq, 1500); - - /* Delete groups */ - do_test_DeleteGroups("temp queue", rk, NULL, -1); - do_test_DeleteGroups("main queue", rk, mainq, 1500); - - if (test_broker_version >= TEST_BRKVER(2, 4, 0, 0)) { - /* Delete committed offsets */ - do_test_DeleteConsumerGroupOffsets("temp queue", rk, NULL, -1, - rd_false); - do_test_DeleteConsumerGroupOffsets("main queue", rk, mainq, - 1500, rd_false); - do_test_DeleteConsumerGroupOffsets( - "main queue", rk, mainq, 1500, - rd_true /*with subscribing consumer*/); - - /* Alter committed offsets */ - do_test_AlterConsumerGroupOffsets("temp queue", rk, NULL, -1, - rd_false, rd_true); - do_test_AlterConsumerGroupOffsets("main queue", rk, mainq, 1500, - rd_false, rd_true); - do_test_AlterConsumerGroupOffsets( - "main queue, nonexistent topics", rk, mainq, 1500, rd_false, - rd_false /* don't create topics */); - do_test_AlterConsumerGroupOffsets( - "main queue", rk, mainq, 1500, - rd_true, /*with subscribing consumer*/ - rd_true); - - /* List committed offsets */ - do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1, - rd_false, rd_false); - do_test_ListConsumerGroupOffsets( - "main queue, op timeout " - "1500", - rk, mainq, 1500, rd_false, rd_false); - do_test_ListConsumerGroupOffsets( - "main queue", rk, mainq, 1500, - rd_true /*with subscribing consumer*/, rd_false); - do_test_ListConsumerGroupOffsets("temp queue", rk, NULL, -1, - rd_false, rd_true); - do_test_ListConsumerGroupOffsets("main queue", rk, mainq, 1500, - rd_false, rd_true); - do_test_ListConsumerGroupOffsets( - "main queue", rk, mainq, 1500, - rd_true /*with subscribing consumer*/, rd_true); - } - - rd_kafka_queue_destroy(mainq); - - rd_kafka_destroy(rk); - - free(avail_brokers); -} - - -int main_0081_admin(int argc, char **argv) { - - do_test_apis(RD_KAFKA_PRODUCER); - - if (test_quick) { - TEST_SAY("Skipping further 0081 tests due to quick mode\n"); - return 0; - } - - do_test_apis(RD_KAFKA_CONSUMER); - - return 0; -} |