summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c2535
1 files changed, 2535 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c
new file mode 100644
index 000000000..9d049e5b1
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0080-admin_ut.c
@@ -0,0 +1,2535 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2015, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "test.h"
+#include "rdkafka.h"
+
+/**
+ * @brief Admin API local dry-run unit-tests.
+ */
+
+#define MY_SOCKET_TIMEOUT_MS 100
+#define MY_SOCKET_TIMEOUT_MS_STR "100"
+
+
+
+static mtx_t last_event_lock;
+static cnd_t last_event_cnd;
+static rd_kafka_event_t *last_event = NULL;
+
+/**
+ * @brief The background event callback is called automatically
+ * by librdkafka from a background thread.
+ */
+static void
+background_event_cb(rd_kafka_t *rk, rd_kafka_event_t *rkev, void *opaque) {
+ mtx_lock(&last_event_lock);
+ TEST_ASSERT(!last_event,
+ "Multiple events seen in background_event_cb "
+ "(existing %s, new %s)",
+ rd_kafka_event_name(last_event), rd_kafka_event_name(rkev));
+ last_event = rkev;
+ mtx_unlock(&last_event_lock);
+ cnd_broadcast(&last_event_cnd);
+ rd_sleep(1);
+}
+
+static rd_kafka_event_t *wait_background_event_cb(void) {
+ rd_kafka_event_t *rkev;
+ mtx_lock(&last_event_lock);
+ while (!(rkev = last_event))
+ cnd_wait(&last_event_cnd, &last_event_lock);
+ last_event = NULL;
+ mtx_unlock(&last_event_lock);
+
+ return rkev;
+}
+
+
+/**
+ * @brief CreateTopics tests
+ *
+ *
+ *
+ */
+static void do_test_CreateTopics(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_background_event_cb,
+ int with_options) {
+ rd_kafka_queue_t *q;
+#define MY_NEW_TOPICS_CNT 6
+ rd_kafka_NewTopic_t *new_topics[MY_NEW_TOPICS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_CreateTopics_result_t *res;
+ const rd_kafka_topic_result_t **restopics;
+ size_t restopic_cnt;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s CreateTopics with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /**
+ * Construct NewTopic array with different properties for
+ * different partitions.
+ */
+ for (i = 0; i < MY_NEW_TOPICS_CNT; i++) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ int num_parts = i * 51 + 1;
+ int num_replicas = jitter(1, MY_NEW_TOPICS_CNT - 1);
+ int set_config = (i & 2);
+ int set_replicas = !(i % 1);
+
+ new_topics[i] = rd_kafka_NewTopic_new(
+ topic, num_parts, set_replicas ? -1 : num_replicas, NULL,
+ 0);
+
+ if (set_config) {
+ /*
+ * Add various (unverified) configuration properties
+ */
+ err = rd_kafka_NewTopic_set_config(new_topics[i],
+ "dummy.doesntexist",
+ "butThere'sNothing "
+ "to verify that");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ err = rd_kafka_NewTopic_set_config(
+ new_topics[i], "try.a.null.value", NULL);
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ err = rd_kafka_NewTopic_set_config(new_topics[i],
+ "or.empty", "");
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+ }
+
+
+ if (set_replicas) {
+ int32_t p;
+ int32_t replicas[MY_NEW_TOPICS_CNT];
+ int j;
+
+ for (j = 0; j < num_replicas; j++)
+ replicas[j] = j;
+
+ /*
+ * Set valid replica assignments
+ */
+ for (p = 0; p < num_parts; p++) {
+ /* Try adding an existing out of order,
+ * should fail */
+ if (p == 1) {
+ err =
+ rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], p + 1, replicas,
+ num_replicas, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(
+ err ==
+ RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "%s", rd_kafka_err2str(err));
+ }
+
+ err = rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], p, replicas, num_replicas,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+ }
+
+ /* Try to add an existing partition, should fail */
+ err = rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], 0, replicas, num_replicas, NULL, 0);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG, "%s",
+ rd_kafka_err2str(err));
+
+ } else {
+ int32_t dummy_replicas[1] = {1};
+
+ /* Test invalid partition */
+ err = rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], num_parts + 1, dummy_replicas, 1,
+ errstr, sizeof(errstr));
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "%s: %s", rd_kafka_err2str(err),
+ err == RD_KAFKA_RESP_ERR_NO_ERROR ? ""
+ : errstr);
+
+ /* Setting replicas with with default replicas != -1
+ * is an error. */
+ err = rd_kafka_NewTopic_set_replica_assignment(
+ new_topics[i], 0, dummy_replicas, 1, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "%s: %s", rd_kafka_err2str(err),
+ err == RD_KAFKA_RESP_ERR_NO_ERROR ? ""
+ : errstr);
+ }
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ my_opaque = (void *)123;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+
+ TIMING_START(&timing, "CreateTopics");
+ TEST_SAY("Call CreateTopics, timeout is %dms\n", exp_timeout);
+ rd_kafka_CreateTopics(rk, new_topics, MY_NEW_TOPICS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (with_background_event_cb) {
+ /* Result event will be triggered by callback from
+ * librdkafka background queue thread. */
+ TIMING_START(&timing, "CreateTopics.wait_background_event_cb");
+ rkev = wait_background_event_cb();
+ } else {
+ /* Poll result queue */
+ TIMING_START(&timing, "CreateTopics.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ }
+
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("CreateTopics: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_CreateTopics_result(rkev);
+ TEST_ASSERT(res, "expected CreateTopics_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected CreateTopics to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Attempt to extract topics anyway, should return NULL. */
+ restopics = rd_kafka_CreateTopics_result_topics(res, &restopic_cnt);
+ TEST_ASSERT(!restopics && restopic_cnt == 0,
+ "expected no result_topics, got %p cnt %" PRIusz, restopics,
+ restopic_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_NewTopic_destroy_array(new_topics, MY_NEW_TOPICS_CNT);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ SUB_TEST_PASS();
+}
+
+
+
+/**
+ * @brief DeleteTopics tests
+ *
+ *
+ *
+ */
+static void do_test_DeleteTopics(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options) {
+ rd_kafka_queue_t *q;
+#define MY_DEL_TOPICS_CNT 4
+ rd_kafka_DeleteTopic_t *del_topics[MY_DEL_TOPICS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DeleteTopics_result_t *res;
+ const rd_kafka_topic_result_t **restopics;
+ size_t restopic_cnt;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s DeleteTopics with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_DEL_TOPICS_CNT; i++)
+ del_topics[i] = rd_kafka_DeleteTopic_new(
+ test_mk_topic_name(__FUNCTION__, 1));
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DELETETOPICS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)456;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TIMING_START(&timing, "DeleteTopics");
+ TEST_SAY("Call DeleteTopics, timeout is %dms\n", exp_timeout);
+ rd_kafka_DeleteTopics(rk, del_topics, MY_DEL_TOPICS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ /* Poll result queue */
+ TIMING_START(&timing, "DeleteTopics.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DeleteTopics: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteTopics_result(rkev);
+ TEST_ASSERT(res, "expected DeleteTopics_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected DeleteTopics to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Attempt to extract topics anyway, should return NULL. */
+ restopics = rd_kafka_DeleteTopics_result_topics(res, &restopic_cnt);
+ TEST_ASSERT(!restopics && restopic_cnt == 0,
+ "expected no result_topics, got %p cnt %" PRIusz, restopics,
+ restopic_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_DeleteTopic_destroy_array(del_topics, MY_DEL_TOPICS_CNT);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+#undef MY_DEL_TOPICS_CNT
+
+ SUB_TEST_QUICK();
+}
+
+/**
+ * @brief DeleteGroups tests
+ *
+ *
+ *
+ */
+static void do_test_DeleteGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options,
+ rd_bool_t destroy) {
+ rd_kafka_queue_t *q;
+#define MY_DEL_GROUPS_CNT 4
+ char *group_names[MY_DEL_GROUPS_CNT];
+ rd_kafka_DeleteGroup_t *del_groups[MY_DEL_GROUPS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DeleteGroups_result_t *res;
+ const rd_kafka_group_result_t **resgroups;
+ size_t resgroup_cnt;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s DeleteGroups with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ group_names[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ del_groups[i] = rd_kafka_DeleteGroup_new(group_names[i]);
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DELETEGROUPS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)456;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TIMING_START(&timing, "DeleteGroups");
+ TEST_SAY("Call DeleteGroups, timeout is %dms\n", exp_timeout);
+ rd_kafka_DeleteGroups(rk, del_groups, MY_DEL_GROUPS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (destroy)
+ goto destroy;
+
+ /* Poll result queue */
+ TIMING_START(&timing, "DeleteGroups.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DeleteGroups: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteGroups_result(rkev);
+ TEST_ASSERT(res, "expected DeleteGroups_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting no error (errors will be per-group) */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR_NO_ERROR,
+ "expected DeleteGroups to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Extract groups, should return MY_DEL_GROUPS_CNT groups. */
+ resgroups = rd_kafka_DeleteGroups_result_groups(res, &resgroup_cnt);
+ TEST_ASSERT(resgroups && resgroup_cnt == MY_DEL_GROUPS_CNT,
+ "expected %d result_groups, got %p cnt %" PRIusz,
+ MY_DEL_GROUPS_CNT, resgroups, resgroup_cnt);
+
+ /* The returned groups should be in the original order, and
+ * should all have timed out. */
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ TEST_ASSERT(!strcmp(group_names[i],
+ rd_kafka_group_result_name(resgroups[i])),
+ "expected group '%s' at position %d, not '%s'",
+ group_names[i], i,
+ rd_kafka_group_result_name(resgroups[i]));
+ TEST_ASSERT(rd_kafka_error_code(rd_kafka_group_result_error(
+ resgroups[i])) == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected group '%s' to have timed out, got %s",
+ group_names[i],
+ rd_kafka_error_string(
+ rd_kafka_group_result_error(resgroups[i])));
+ }
+
+ rd_kafka_event_destroy(rkev);
+
+destroy:
+ for (i = 0; i < MY_DEL_GROUPS_CNT; i++) {
+ rd_kafka_DeleteGroup_destroy(del_groups[i]);
+ rd_free(group_names[i]);
+ }
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+#undef MY_DEL_GROUPS_CNT
+
+ SUB_TEST_QUICK();
+}
+
+/**
+ * @brief ListConsumerGroups tests
+ *
+ *
+ *
+ */
+static void do_test_ListConsumerGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options,
+ rd_bool_t destroy) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_ListConsumerGroups_result_t *res;
+ const rd_kafka_error_t **errors;
+ size_t errors_cnt, valid_cnt;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s ListConsumerGroups with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ if (with_options) {
+ rd_kafka_consumer_group_state_t duplicate[2] = {
+ RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY,
+ RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY};
+
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
+
+ /* Test duplicate error on match states */
+ rd_kafka_error_t *error =
+ rd_kafka_AdminOptions_set_match_consumer_group_states(
+ options, duplicate, 2);
+ TEST_ASSERT(error && rd_kafka_error_code(error), "%s",
+ "Expected error on duplicate states,"
+ " got no error");
+ rd_kafka_error_destroy(error);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ TEST_CALL_ERR__(rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr)));
+
+ if (useq) {
+ my_opaque = (void *)456;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TIMING_START(&timing, "ListConsumerGroups");
+ TEST_SAY("Call ListConsumerGroups, timeout is %dms\n", exp_timeout);
+ rd_kafka_ListConsumerGroups(rk, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (destroy)
+ goto destroy;
+
+ /* Poll result queue */
+ TIMING_START(&timing, "ListConsumerGroups.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("ListConsumerGroups: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_ListConsumerGroups_result(rkev);
+ TEST_ASSERT(res, "expected ListConsumerGroups_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting no error here, the real error will be in the error array */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(
+ err == RD_KAFKA_RESP_ERR_NO_ERROR,
+ "expected ListConsumerGroups to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ errors = rd_kafka_ListConsumerGroups_result_errors(rkev, &errors_cnt);
+ TEST_ASSERT(errors_cnt == 1, "expected one error, got %" PRIusz,
+ errors_cnt);
+ rd_kafka_ListConsumerGroups_result_valid(rkev, &valid_cnt);
+ TEST_ASSERT(valid_cnt == 0, "expected zero valid groups, got %" PRIusz,
+ valid_cnt);
+
+ err = rd_kafka_error_code(errors[0]);
+ errstr2 = rd_kafka_error_string(errors[0]);
+ TEST_ASSERT(
+ err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected ListConsumerGroups to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ rd_kafka_event_destroy(rkev);
+
+destroy:
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief DescribeConsumerGroups tests
+ *
+ *
+ *
+ */
+static void do_test_DescribeConsumerGroups(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options,
+ rd_bool_t destroy) {
+ rd_kafka_queue_t *q;
+#define TEST_DESCRIBE_CONSUMER_GROUPS_CNT 4
+ const char *group_names[TEST_DESCRIBE_CONSUMER_GROUPS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DeleteGroups_result_t *res;
+ const rd_kafka_ConsumerGroupDescription_t **resgroups;
+ size_t resgroup_cnt;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s DescribeConsumerGroups with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ group_names[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)456;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TIMING_START(&timing, "DescribeConsumerGroups");
+ TEST_SAY("Call DescribeConsumerGroups, timeout is %dms\n", exp_timeout);
+ rd_kafka_DescribeConsumerGroups(
+ rk, group_names, TEST_DESCRIBE_CONSUMER_GROUPS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (destroy)
+ goto destroy;
+
+ /* Poll result queue */
+ TIMING_START(&timing, "DescribeConsumerGroups.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DescribeConsumerGroups: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DescribeConsumerGroups_result(rkev);
+ TEST_ASSERT(res, "expected DescribeConsumerGroups_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting no error (errors will be per-group) */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(
+ err == RD_KAFKA_RESP_ERR_NO_ERROR,
+ "expected DescribeConsumerGroups to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR_NO_ERROR), rd_kafka_err2str(err),
+ err ? errstr2 : "n/a");
+
+ /* Extract groups, should return TEST_DESCRIBE_GROUPS_CNT groups. */
+ resgroups =
+ rd_kafka_DescribeConsumerGroups_result_groups(res, &resgroup_cnt);
+ TEST_ASSERT(resgroups &&
+ resgroup_cnt == TEST_DESCRIBE_CONSUMER_GROUPS_CNT,
+ "expected %d result_groups, got %p cnt %" PRIusz,
+ TEST_DESCRIBE_CONSUMER_GROUPS_CNT, resgroups, resgroup_cnt);
+
+ /* The returned groups should be in the original order, and
+ * should all have timed out. */
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ TEST_ASSERT(
+ !strcmp(group_names[i],
+ rd_kafka_ConsumerGroupDescription_group_id(
+ resgroups[i])),
+ "expected group '%s' at position %d, not '%s'",
+ group_names[i], i,
+ rd_kafka_ConsumerGroupDescription_group_id(resgroups[i]));
+ TEST_ASSERT(
+ rd_kafka_error_code(rd_kafka_ConsumerGroupDescription_error(
+ resgroups[i])) == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected group '%s' to have timed out, got %s",
+ group_names[i],
+ rd_kafka_error_string(
+ rd_kafka_ConsumerGroupDescription_error(resgroups[i])));
+ }
+
+ rd_kafka_event_destroy(rkev);
+
+destroy:
+ for (i = 0; i < TEST_DESCRIBE_CONSUMER_GROUPS_CNT; i++) {
+ rd_free((char *)group_names[i]);
+ }
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+#undef TEST_DESCRIBE_CONSUMER_GROUPS_CNT
+
+ SUB_TEST_PASS();
+}
+
+static void do_test_DeleteRecords(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options,
+ rd_bool_t destroy) {
+ rd_kafka_queue_t *q;
+#define MY_DEL_RECORDS_CNT 4
+ rd_kafka_AdminOptions_t *options = NULL;
+ rd_kafka_topic_partition_list_t *offsets = NULL;
+ rd_kafka_DeleteRecords_t *del_records;
+ const rd_kafka_DeleteRecords_result_t *res;
+ char *topics[MY_DEL_RECORDS_CNT];
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s DeleteRecords with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++) {
+ topics[i] = rd_strdup(test_mk_topic_name(__FUNCTION__, 1));
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DELETERECORDS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)4567;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ offsets = rd_kafka_topic_partition_list_new(MY_DEL_RECORDS_CNT);
+
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++)
+ rd_kafka_topic_partition_list_add(offsets, topics[i], i)
+ ->offset = RD_KAFKA_OFFSET_END;
+
+ del_records = rd_kafka_DeleteRecords_new(offsets);
+ rd_kafka_topic_partition_list_destroy(offsets);
+
+ TIMING_START(&timing, "DeleteRecords");
+ TEST_SAY("Call DeleteRecords, timeout is %dms\n", exp_timeout);
+ rd_kafka_DeleteRecords(rk, &del_records, 1, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+
+ rd_kafka_DeleteRecords_destroy(del_records);
+
+ if (destroy)
+ goto destroy;
+
+ /* Poll result queue */
+ TIMING_START(&timing, "DeleteRecords.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DeleteRecords: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteRecords_result(rkev);
+ TEST_ASSERT(res, "expected DeleteRecords_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error (pre-fanout leader_req will fail) */
+ err = rd_kafka_event_error(rkev);
+ TEST_ASSERT(err, "expected DeleteRecords to fail");
+
+ rd_kafka_event_destroy(rkev);
+
+destroy:
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ for (i = 0; i < MY_DEL_RECORDS_CNT; i++)
+ rd_free(topics[i]);
+
+#undef MY_DEL_RECORDS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+static void do_test_DeleteConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options) {
+ rd_kafka_queue_t *q;
+#define MY_DEL_CGRPOFFS_CNT 1
+ rd_kafka_AdminOptions_t *options = NULL;
+ const rd_kafka_DeleteConsumerGroupOffsets_result_t *res;
+ rd_kafka_DeleteConsumerGroupOffsets_t *cgoffsets[MY_DEL_CGRPOFFS_CNT];
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s DeleteConsumerGroupOffsets with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_DEL_CGRPOFFS_CNT; i++) {
+ rd_kafka_topic_partition_list_t *partitions =
+ rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
+ rd_kafka_topic_partition_list_add(partitions, "topic3", 15);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 1);
+ cgoffsets[i] = rd_kafka_DeleteConsumerGroupOffsets_new(
+ "mygroup", partitions);
+ rd_kafka_topic_partition_list_destroy(partitions);
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)99981;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TIMING_START(&timing, "DeleteConsumerGroupOffsets");
+ TEST_SAY("Call DeleteConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_DeleteConsumerGroupOffsets(rk, cgoffsets, MY_DEL_CGRPOFFS_CNT,
+ options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+
+ /* Poll result queue */
+ TIMING_START(&timing, "DeleteConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DeleteConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected DeleteConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ TEST_ASSERT(err, "expected DeleteConsumerGroupOffsets to fail");
+
+ rd_kafka_event_destroy(rkev);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ rd_kafka_DeleteConsumerGroupOffsets_destroy_array(cgoffsets,
+ MY_DEL_CGRPOFFS_CNT);
+
+#undef MY_DEL_CGRPOFFS_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief AclBinding tests
+ *
+ *
+ *
+ */
+static void do_test_AclBinding() {
+ int i;
+ char errstr[512];
+ rd_kafka_AclBinding_t *new_acl;
+
+ rd_bool_t valid_resource_types[] = {rd_false, rd_false, rd_true,
+ rd_true, rd_true, rd_false};
+ rd_bool_t valid_resource_pattern_types[] = {
+ rd_false, rd_false, rd_false, rd_true, rd_true, rd_false};
+ rd_bool_t valid_acl_operation[] = {
+ rd_false, rd_false, rd_true, rd_true, rd_true, rd_true, rd_true,
+ rd_true, rd_true, rd_true, rd_true, rd_true, rd_true, rd_false};
+ rd_bool_t valid_acl_permission_type[] = {rd_false, rd_false, rd_true,
+ rd_true, rd_false};
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const char *principal = "User:test";
+ const char *host = "*";
+
+ SUB_TEST_QUICK();
+
+ // Valid acl binding
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(new_acl, "expected AclBinding");
+ rd_kafka_AclBinding_destroy(new_acl);
+
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, NULL, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(!new_acl && !strcmp(errstr, "Invalid resource name"),
+ "expected error string \"Invalid resource name\", not %s",
+ errstr);
+
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ NULL, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(!new_acl && !strcmp(errstr, "Invalid principal"),
+ "expected error string \"Invalid principal\", not %s",
+ errstr);
+
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, NULL, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(!new_acl && !strcmp(errstr, "Invalid host"),
+ "expected error string \"Invalid host\", not %s", errstr);
+
+ for (i = -1; i <= RD_KAFKA_RESOURCE__CNT; i++) {
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ i, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal,
+ host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_resource_types[i]) {
+ TEST_ASSERT(new_acl, "expected AclBinding");
+ rd_kafka_AclBinding_destroy(new_acl);
+ } else
+ TEST_ASSERT(
+ !new_acl &&
+ !strcmp(errstr, "Invalid resource type"),
+ "expected error string \"Invalid resource type\", "
+ "not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT; i++) {
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, i, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_resource_pattern_types[i]) {
+ TEST_ASSERT(new_acl, "expected AclBinding");
+ rd_kafka_AclBinding_destroy(new_acl);
+ } else
+ TEST_ASSERT(
+ !new_acl &&
+ !strcmp(errstr,
+ "Invalid resource pattern type"),
+ "expected error string \"Invalid resource pattern "
+ "type\", not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_ACL_OPERATION__CNT; i++) {
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, host, i,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_acl_operation[i]) {
+ TEST_ASSERT(new_acl, "expected AclBinding");
+ rd_kafka_AclBinding_destroy(new_acl);
+ } else
+ TEST_ASSERT(!new_acl &&
+ !strcmp(errstr, "Invalid operation"),
+ "expected error string \"Invalid "
+ "operation\", not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_ACL_PERMISSION_TYPE__CNT; i++) {
+ *errstr = '\0';
+ new_acl = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL, i, errstr, sizeof(errstr));
+ if (i >= 0 && valid_acl_permission_type[i]) {
+ TEST_ASSERT(new_acl, "expected AclBinding");
+ rd_kafka_AclBinding_destroy(new_acl);
+ } else
+ TEST_ASSERT(
+ !new_acl &&
+ !strcmp(errstr, "Invalid permission type"),
+ "expected error string \"permission type\", not %s",
+ errstr);
+ }
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief AclBindingFilter tests
+ *
+ *
+ *
+ */
+static void do_test_AclBindingFilter() {
+ int i;
+ char errstr[512];
+ rd_kafka_AclBindingFilter_t *new_acl_filter;
+
+ rd_bool_t valid_resource_types[] = {rd_false, rd_true, rd_true,
+ rd_true, rd_true, rd_false};
+ rd_bool_t valid_resource_pattern_types[] = {
+ rd_false, rd_true, rd_true, rd_true, rd_true, rd_false};
+ rd_bool_t valid_acl_operation[] = {
+ rd_false, rd_true, rd_true, rd_true, rd_true, rd_true, rd_true,
+ rd_true, rd_true, rd_true, rd_true, rd_true, rd_true, rd_false};
+ rd_bool_t valid_acl_permission_type[] = {rd_false, rd_true, rd_true,
+ rd_true, rd_false};
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ const char *principal = "User:test";
+ const char *host = "*";
+
+ SUB_TEST_QUICK();
+
+ // Valid acl binding
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(new_acl_filter, "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, NULL, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(new_acl_filter, "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ NULL, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(new_acl_filter, "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL,
+ principal, NULL, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ TEST_ASSERT(new_acl_filter, "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+
+ for (i = -1; i <= RD_KAFKA_RESOURCE__CNT; i++) {
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ i, topic, RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal,
+ host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_resource_types[i]) {
+ TEST_ASSERT(new_acl_filter,
+ "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+ } else
+ TEST_ASSERT(
+ !new_acl_filter &&
+ !strcmp(errstr, "Invalid resource type"),
+ "expected error string \"Invalid resource type\", "
+ "not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_RESOURCE_PATTERN_TYPE__CNT; i++) {
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, i, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_resource_pattern_types[i]) {
+ TEST_ASSERT(new_acl_filter,
+ "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+ } else
+ TEST_ASSERT(
+ !new_acl_filter &&
+ !strcmp(errstr,
+ "Invalid resource pattern type"),
+ "expected error string \"Invalid resource pattern "
+ "type\", not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_ACL_OPERATION__CNT; i++) {
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, host, i,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ if (i >= 0 && valid_acl_operation[i]) {
+ TEST_ASSERT(new_acl_filter,
+ "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+ } else
+ TEST_ASSERT(!new_acl_filter &&
+ !strcmp(errstr, "Invalid operation"),
+ "expected error string \"Invalid "
+ "operation\", not %s",
+ errstr);
+ }
+ for (i = -1; i <= RD_KAFKA_ACL_PERMISSION_TYPE__CNT; i++) {
+ *errstr = '\0';
+ new_acl_filter = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL, i, errstr, sizeof(errstr));
+ if (i >= 0 && valid_acl_permission_type[i]) {
+ TEST_ASSERT(new_acl_filter,
+ "expected AclBindingFilter");
+ rd_kafka_AclBinding_destroy(new_acl_filter);
+ } else
+ TEST_ASSERT(
+ !new_acl_filter &&
+ !strcmp(errstr, "Invalid permission type"),
+ "expected error string \"permission type\", not %s",
+ errstr);
+ }
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief CreateAcls tests
+ *
+ *
+ *
+ */
+static void do_test_CreateAcls(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ rd_bool_t with_background_event_cb,
+ rd_bool_t with_options) {
+ rd_kafka_queue_t *q;
+#define MY_NEW_ACLS_CNT 2
+ rd_kafka_AclBinding_t *new_acls[MY_NEW_ACLS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_CreateAcls_result_t *res;
+ const rd_kafka_acl_result_t **resacls;
+ size_t resacls_cnt;
+ void *my_opaque = NULL, *opaque;
+ const char *principal = "User:test";
+ const char *host = "*";
+
+ SUB_TEST_QUICK("%s CreaetAcls with %s, timeout %dms", rd_kafka_name(rk),
+ what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /**
+ * Construct AclBinding array
+ */
+ for (i = 0; i < MY_NEW_ACLS_CNT; i++) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ new_acls[i] = rd_kafka_AclBinding_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_LITERAL, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ my_opaque = (void *)123;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+
+ TIMING_START(&timing, "CreateAcls");
+ TEST_SAY("Call CreateAcls, timeout is %dms\n", exp_timeout);
+ rd_kafka_CreateAcls(rk, new_acls, MY_NEW_ACLS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (with_background_event_cb) {
+ /* Result event will be triggered by callback from
+ * librdkafka background queue thread. */
+ TIMING_START(&timing, "CreateAcls.wait_background_event_cb");
+ rkev = wait_background_event_cb();
+ } else {
+ /* Poll result queue */
+ TIMING_START(&timing, "CreateAcls.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ }
+
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("CreateAcls: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_CreateAcls_result(rkev);
+ TEST_ASSERT(res, "expected CreateAcls_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected CreateAcls to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Attempt to extract acls results anyway, should return NULL. */
+ resacls = rd_kafka_CreateAcls_result_acls(res, &resacls_cnt);
+ TEST_ASSERT(!resacls && resacls_cnt == 0,
+ "expected no acl result, got %p cnt %" PRIusz, resacls,
+ resacls_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_AclBinding_destroy_array(new_acls, MY_NEW_ACLS_CNT);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+#undef MY_NEW_ACLS_CNT
+
+ SUB_TEST_PASS();
+}
+
+/**
+ * @brief DescribeAcls tests
+ *
+ *
+ *
+ */
+static void do_test_DescribeAcls(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ rd_bool_t with_background_event_cb,
+ rd_bool_t with_options) {
+ rd_kafka_queue_t *q;
+ rd_kafka_AclBindingFilter_t *describe_acls;
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DescribeAcls_result_t *res;
+ const rd_kafka_AclBinding_t **res_acls;
+ size_t res_acls_cnt;
+ void *my_opaque = NULL, *opaque;
+ const char *principal = "User:test";
+ const char *host = "*";
+
+ SUB_TEST_QUICK("%s DescribeAcls with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /**
+ * Construct AclBindingFilter
+ */
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ describe_acls = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic, RD_KAFKA_RESOURCE_PATTERN_PREFIXED,
+ principal, host, RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ my_opaque = (void *)123;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+
+ TIMING_START(&timing, "DescribeAcls");
+ TEST_SAY("Call DescribeAcls, timeout is %dms\n", exp_timeout);
+ rd_kafka_DescribeAcls(rk, describe_acls, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (with_background_event_cb) {
+ /* Result event will be triggered by callback from
+ * librdkafka background queue thread. */
+ TIMING_START(&timing, "DescribeAcls.wait_background_event_cb");
+ rkev = wait_background_event_cb();
+ } else {
+ /* Poll result queue */
+ TIMING_START(&timing, "DescribeAcls.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ }
+
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DescribeAcls: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DescribeAcls_result(rkev);
+ TEST_ASSERT(res, "expected DescribeAcls_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected DescribeAcls to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Attempt to extract result acls anyway, should return NULL. */
+ res_acls = rd_kafka_DescribeAcls_result_acls(res, &res_acls_cnt);
+ TEST_ASSERT(!res_acls && res_acls_cnt == 0,
+ "expected no result acls, got %p cnt %" PRIusz, res_acls,
+ res_acls_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_AclBinding_destroy(describe_acls);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief DeleteAcls tests
+ *
+ *
+ *
+ */
+static void do_test_DeleteAcls(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ rd_bool_t with_background_event_cb,
+ rd_bool_t with_options) {
+#define DELETE_ACLS_FILTERS_CNT 2
+ rd_kafka_queue_t *q;
+ rd_kafka_AclBindingFilter_t *delete_acls[DELETE_ACLS_FILTERS_CNT];
+ rd_kafka_AdminOptions_t *options = NULL;
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ const char *errstr2;
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ const rd_kafka_DeleteAcls_result_t *res;
+ const rd_kafka_DeleteAcls_result_response_t **res_response;
+ size_t res_response_cnt;
+ void *my_opaque = NULL, *opaque;
+ const char *principal = "User:test";
+ const char *host = "*";
+
+ SUB_TEST_QUICK("%s DeleteAcls with %s, timeout %dms", rd_kafka_name(rk),
+ what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ /**
+ * Construct AclBindingFilter array
+ */
+ for (i = 0; i < DELETE_ACLS_FILTERS_CNT; i++) {
+ const char *topic = test_mk_topic_name(__FUNCTION__, 1);
+ delete_acls[i] = rd_kafka_AclBindingFilter_new(
+ RD_KAFKA_RESOURCE_TOPIC, topic,
+ RD_KAFKA_RESOURCE_PATTERN_PREFIXED, principal, host,
+ RD_KAFKA_ACL_OPERATION_ALL,
+ RD_KAFKA_ACL_PERMISSION_TYPE_ALLOW, errstr, sizeof(errstr));
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ my_opaque = (void *)123;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+
+ TIMING_START(&timing, "DeleteAcls");
+ TEST_SAY("Call DeleteAcls, timeout is %dms\n", exp_timeout);
+ rd_kafka_DeleteAcls(rk, delete_acls, DELETE_ACLS_FILTERS_CNT, options,
+ q);
+ TIMING_ASSERT_LATER(&timing, 0, 50);
+
+ if (with_background_event_cb) {
+ /* Result event will be triggered by callback from
+ * librdkafka background queue thread. */
+ TIMING_START(&timing, "DeleteAcls.wait_background_event_cb");
+ rkev = wait_background_event_cb();
+ } else {
+ /* Poll result queue */
+ TIMING_START(&timing, "DeleteAcls.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ }
+
+ TIMING_ASSERT_LATER(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("DeleteAcls: got %s in %.3fs\n", rd_kafka_event_name(rkev),
+ TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_DeleteAcls_result(rkev);
+ TEST_ASSERT(res, "expected DeleteAcls_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ errstr2 = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected DeleteAcls to return error %s, not %s (%s)",
+ rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT),
+ rd_kafka_err2str(err), err ? errstr2 : "n/a");
+
+ /* Attempt to extract result responses anyway, should return NULL. */
+ res_response =
+ rd_kafka_DeleteAcls_result_responses(res, &res_response_cnt);
+ TEST_ASSERT(!res_response && res_response_cnt == 0,
+ "expected no result response, got %p cnt %" PRIusz,
+ res_response, res_response_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ rd_kafka_AclBinding_destroy_array(delete_acls, DELETE_ACLS_FILTERS_CNT);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+#undef DELETE_ACLS_FILTERS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+static void do_test_AlterConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options) {
+ rd_kafka_queue_t *q;
+#define MY_ALTER_CGRPOFFS_CNT 1
+ rd_kafka_AdminOptions_t *options = NULL;
+ const rd_kafka_AlterConsumerGroupOffsets_result_t *res;
+ rd_kafka_AlterConsumerGroupOffsets_t *cgoffsets[MY_ALTER_CGRPOFFS_CNT];
+ rd_kafka_AlterConsumerGroupOffsets_t
+ *cgoffsets_empty[MY_ALTER_CGRPOFFS_CNT];
+ rd_kafka_AlterConsumerGroupOffsets_t
+ *cgoffsets_negative[MY_ALTER_CGRPOFFS_CNT];
+ rd_kafka_AlterConsumerGroupOffsets_t
+ *cgoffsets_duplicate[MY_ALTER_CGRPOFFS_CNT];
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ void *my_opaque = NULL, *opaque;
+
+ SUB_TEST_QUICK("%s AlterConsumerGroupOffsets with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_ALTER_CGRPOFFS_CNT; i++) {
+ /* Call with three correct topic partitions. */
+ rd_kafka_topic_partition_list_t *partitions =
+ rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 9)
+ ->offset = 9;
+ rd_kafka_topic_partition_list_add(partitions, "topic3", 15)
+ ->offset = 15;
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 1)
+ ->offset = 1;
+ cgoffsets[i] = rd_kafka_AlterConsumerGroupOffsets_new(
+ "mygroup", partitions);
+ rd_kafka_topic_partition_list_destroy(partitions);
+
+ /* Call with empty topic-partition list. */
+ rd_kafka_topic_partition_list_t *partitions_empty =
+ rd_kafka_topic_partition_list_new(0);
+ cgoffsets_empty[i] = rd_kafka_AlterConsumerGroupOffsets_new(
+ "mygroup", partitions_empty);
+ rd_kafka_topic_partition_list_destroy(partitions_empty);
+
+ /* Call with a topic-partition having negative offset. */
+ rd_kafka_topic_partition_list_t *partitions_negative =
+ rd_kafka_topic_partition_list_new(4);
+ rd_kafka_topic_partition_list_add(partitions_negative, "topic1",
+ 9)
+ ->offset = 9;
+ rd_kafka_topic_partition_list_add(partitions_negative, "topic3",
+ 15)
+ ->offset = 15;
+ rd_kafka_topic_partition_list_add(partitions_negative, "topic1",
+ 1)
+ ->offset = 1;
+ rd_kafka_topic_partition_list_add(partitions_negative, "topic1",
+ 2)
+ ->offset = -3;
+ cgoffsets_negative[i] = rd_kafka_AlterConsumerGroupOffsets_new(
+ "mygroup", partitions_negative);
+ rd_kafka_topic_partition_list_destroy(partitions_negative);
+
+ /* Call with duplicate partitions. */
+ rd_kafka_topic_partition_list_t *partitions_duplicate =
+ rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(partitions_duplicate,
+ "topic1", 9)
+ ->offset = 9;
+ rd_kafka_topic_partition_list_add(partitions_duplicate,
+ "topic3", 15)
+ ->offset = 15;
+ rd_kafka_topic_partition_list_add(partitions_duplicate,
+ "topic1", 9)
+ ->offset = 1;
+
+ cgoffsets_duplicate[i] = rd_kafka_AlterConsumerGroupOffsets_new(
+ "mygroup", partitions_duplicate);
+ rd_kafka_topic_partition_list_destroy(partitions_duplicate);
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)99981;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ /* Empty topic-partition list */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets");
+ TEST_SAY("Call AlterConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_AlterConsumerGroupOffsets(rk, cgoffsets_empty,
+ MY_ALTER_CGRPOFFS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+ rd_kafka_AlterConsumerGroupOffsets_destroy_array(cgoffsets_empty,
+ MY_ALTER_CGRPOFFS_CNT);
+
+ /* Poll result queue */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, 0, 10);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+ /* Convert event to proper result */
+ res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ const char *event_errstr_empty = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err, "expected AlterConsumerGroupOffsets to fail");
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "expected RD_KAFKA_RESP_ERR__INVALID_ARG, not %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(strcmp(event_errstr_empty,
+ "Non-empty topic partition list must be present") ==
+ 0,
+ "expected \"Non-empty topic partition list must be "
+ "present\", not \"%s\"",
+ event_errstr_empty);
+ rd_kafka_event_destroy(rkev);
+
+ /* Negative topic-partition offset */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets");
+ TEST_SAY("Call AlterConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_AlterConsumerGroupOffsets(rk, cgoffsets_negative,
+ MY_ALTER_CGRPOFFS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+ rd_kafka_AlterConsumerGroupOffsets_destroy_array(cgoffsets_negative,
+ MY_ALTER_CGRPOFFS_CNT);
+ /* Poll result queue */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, 0, 10);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+ /* Convert event to proper result */
+ res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ const char *event_errstr_negative = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err, "expected AlterConsumerGroupOffsets to fail");
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "expected RD_KAFKA_RESP_ERR__INVALID_ARG, not %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(
+ strcmp(event_errstr_negative,
+ "All topic-partition offsets must be >= 0") == 0,
+ "expected \"All topic-partition offsets must be >= 0\", not \"%s\"",
+ event_errstr_negative);
+ rd_kafka_event_destroy(rkev);
+
+ /* Duplicate topic-partition offset */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets");
+ TEST_SAY("Call AlterConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_AlterConsumerGroupOffsets(rk, cgoffsets_duplicate,
+ MY_ALTER_CGRPOFFS_CNT, options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+ rd_kafka_AlterConsumerGroupOffsets_destroy_array(cgoffsets_duplicate,
+ MY_ALTER_CGRPOFFS_CNT);
+ /* Poll result queue */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, 0, 10);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+ /* Convert event to proper result */
+ res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ const char *event_errstr_duplicate = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err, "expected AlterConsumerGroupOffsets to fail");
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "expected RD_KAFKA_RESP_ERR__INVALID_ARG, not %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(strcmp(event_errstr_duplicate,
+ "Duplicate partitions not allowed") == 0,
+ "expected \"Duplicate partitions not allowed\", not \"%s\"",
+ event_errstr_duplicate);
+ rd_kafka_event_destroy(rkev);
+
+ /* Correct topic-partition list, local timeout */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets");
+ TEST_SAY("Call AlterConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_AlterConsumerGroupOffsets(rk, cgoffsets, MY_ALTER_CGRPOFFS_CNT,
+ options, q);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+ /* Poll result queue */
+ TIMING_START(&timing, "AlterConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("AlterConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+ /* Convert event to proper result */
+ res = rd_kafka_event_AlterConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected AlterConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ const char *event_errstr = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(err, "expected AlterConsumerGroupOffsets to fail");
+ TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "expected RD_KAFKA_RESP_ERR__TIMED_OUT, not %s",
+ rd_kafka_err2name(err));
+ TEST_ASSERT(strcmp(event_errstr,
+ "Failed while waiting for response from broker: "
+ "Local: Timed out") == 0,
+ "expected \"Failed while waiting for response from broker: "
+ "Local: Timed out\", not \"%s\"",
+ event_errstr);
+ rd_kafka_event_destroy(rkev);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+ rd_kafka_AlterConsumerGroupOffsets_destroy_array(cgoffsets,
+ MY_ALTER_CGRPOFFS_CNT);
+
+#undef MY_ALTER_CGRPOFFS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+static void do_test_ListConsumerGroupOffsets(const char *what,
+ rd_kafka_t *rk,
+ rd_kafka_queue_t *useq,
+ int with_options,
+ rd_bool_t null_toppars) {
+ rd_kafka_queue_t *q;
+#define MY_LIST_CGRPOFFS_CNT 1
+ rd_kafka_AdminOptions_t *options = NULL;
+ const rd_kafka_ListConsumerGroupOffsets_result_t *res;
+ rd_kafka_ListConsumerGroupOffsets_t *cgoffsets[MY_LIST_CGRPOFFS_CNT];
+ rd_kafka_ListConsumerGroupOffsets_t
+ *cgoffsets_empty[MY_LIST_CGRPOFFS_CNT];
+ rd_kafka_ListConsumerGroupOffsets_t
+ *cgoffsets_duplicate[MY_LIST_CGRPOFFS_CNT];
+ int exp_timeout = MY_SOCKET_TIMEOUT_MS;
+ int i;
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ test_timing_t timing;
+ rd_kafka_event_t *rkev;
+ void *my_opaque = NULL, *opaque;
+ const char *errstr_ptr;
+
+ SUB_TEST_QUICK("%s ListConsumerGroupOffsets with %s, timeout %dms",
+ rd_kafka_name(rk), what, exp_timeout);
+
+ q = useq ? useq : rd_kafka_queue_new(rk);
+
+ for (i = 0; i < MY_LIST_CGRPOFFS_CNT; i++) {
+ rd_kafka_topic_partition_list_t *partitions =
+ rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
+ rd_kafka_topic_partition_list_add(partitions, "topic3", 15);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 1);
+ if (null_toppars) {
+ cgoffsets[i] = rd_kafka_ListConsumerGroupOffsets_new(
+ "mygroup", NULL);
+ } else {
+ cgoffsets[i] = rd_kafka_ListConsumerGroupOffsets_new(
+ "mygroup", partitions);
+ }
+ rd_kafka_topic_partition_list_destroy(partitions);
+
+ rd_kafka_topic_partition_list_t *partitions_empty =
+ rd_kafka_topic_partition_list_new(0);
+ cgoffsets_empty[i] = rd_kafka_ListConsumerGroupOffsets_new(
+ "mygroup", partitions_empty);
+ rd_kafka_topic_partition_list_destroy(partitions_empty);
+
+ partitions = rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
+ rd_kafka_topic_partition_list_add(partitions, "topic3", 15);
+ rd_kafka_topic_partition_list_add(partitions, "topic1", 9);
+ cgoffsets_duplicate[i] = rd_kafka_ListConsumerGroupOffsets_new(
+ "mygroup", partitions);
+ rd_kafka_topic_partition_list_destroy(partitions);
+ }
+
+ if (with_options) {
+ options = rd_kafka_AdminOptions_new(
+ rk, RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS);
+
+ exp_timeout = MY_SOCKET_TIMEOUT_MS * 2;
+
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, exp_timeout, errstr, sizeof(errstr));
+ TEST_ASSERT(!err, "%s", rd_kafka_err2str(err));
+
+ if (useq) {
+ my_opaque = (void *)99981;
+ rd_kafka_AdminOptions_set_opaque(options, my_opaque);
+ }
+ }
+
+ TEST_SAY(
+ "Call ListConsumerGroupOffsets with empty topic-partition list.\n");
+ rd_kafka_ListConsumerGroupOffsets(rk, cgoffsets_empty,
+ MY_LIST_CGRPOFFS_CNT, options, q);
+ rd_kafka_ListConsumerGroupOffsets_destroy_array(cgoffsets_empty,
+ MY_LIST_CGRPOFFS_CNT);
+ /* Poll result queue */
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TEST_SAY("ListConsumerGroupOffsets: got %s\n",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ TEST_ASSERT(err, "expected ListConsumerGroupOffsets to fail");
+
+ errstr_ptr = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(
+ !strcmp(errstr_ptr,
+ "NULL or non-empty topic partition list must be passed"),
+ "expected error string \"NULL or non-empty topic partition list "
+ "must be passed\", not %s",
+ errstr_ptr);
+
+ rd_kafka_event_destroy(rkev);
+
+
+ TEST_SAY(
+ "Call ListConsumerGroupOffsets with topic-partition list"
+ "containing duplicates.\n");
+ rd_kafka_ListConsumerGroupOffsets(rk, cgoffsets_duplicate, 1, options,
+ q);
+ rd_kafka_ListConsumerGroupOffsets_destroy_array(cgoffsets_duplicate,
+ MY_LIST_CGRPOFFS_CNT);
+ /* Poll result queue */
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TEST_SAY("ListConsumerGroupOffsets: got %s\n",
+ rd_kafka_event_name(rkev));
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ TEST_ASSERT(err, "expected ListConsumerGroupOffsets to fail");
+
+ errstr_ptr = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!strcmp(errstr_ptr, "Duplicate partitions not allowed"),
+ "expected error string \"Duplicate partitions not allowed\""
+ ", not %s",
+ errstr_ptr);
+
+ rd_kafka_event_destroy(rkev);
+
+
+ TIMING_START(&timing, "ListConsumerGroupOffsets");
+ TEST_SAY("Call ListConsumerGroupOffsets, timeout is %dms\n",
+ exp_timeout);
+ rd_kafka_ListConsumerGroupOffsets(rk, cgoffsets, MY_LIST_CGRPOFFS_CNT,
+ options, q);
+ rd_kafka_ListConsumerGroupOffsets_destroy_array(cgoffsets,
+ MY_LIST_CGRPOFFS_CNT);
+ TIMING_ASSERT_LATER(&timing, 0, 10);
+
+ /* Poll result queue */
+ TIMING_START(&timing, "ListConsumerGroupOffsets.queue_poll");
+ rkev = rd_kafka_queue_poll(q, exp_timeout + 1000);
+ TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100);
+ TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout);
+ TEST_SAY("ListConsumerGroupOffsets: got %s in %.3fs\n",
+ rd_kafka_event_name(rkev), TIMING_DURATION(&timing) / 1000.0f);
+
+ /* Convert event to proper result */
+ res = rd_kafka_event_ListConsumerGroupOffsets_result(rkev);
+ TEST_ASSERT(res, "expected ListConsumerGroupOffsets_result, not %s",
+ rd_kafka_event_name(rkev));
+
+ opaque = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p",
+ my_opaque, opaque);
+
+ /* Expecting error */
+ err = rd_kafka_event_error(rkev);
+ TEST_ASSERT(err, "expected ListConsumerGroupOffsets to fail");
+
+ errstr_ptr = rd_kafka_event_error_string(rkev);
+ TEST_ASSERT(!strcmp(errstr_ptr,
+ "Failed while waiting for response from broker: "
+ "Local: Timed out"),
+ "expected error string \"Failed while waiting for response "
+ "from broker: Local: Timed out\", not %s",
+ errstr_ptr);
+
+ rd_kafka_event_destroy(rkev);
+
+ if (options)
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (!useq)
+ rd_kafka_queue_destroy(q);
+
+#undef MY_LIST_CGRPOFFS_CNT
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Test a mix of APIs using the same replyq.
+ *
+ * - Create topics A,B
+ * - Delete topic B
+ * - Create topic C
+ * - Delete groups A,B,C
+ * - Delete records from A,B,C
+ * - Create extra partitions for topic D
+ */
+static void do_test_mix(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
+ char *topics[] = {"topicA", "topicB", "topicC"};
+ int cnt = 0;
+ struct waiting {
+ rd_kafka_event_type_t evtype;
+ int seen;
+ };
+ struct waiting id1 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
+ struct waiting id2 = {RD_KAFKA_EVENT_DELETETOPICS_RESULT};
+ struct waiting id3 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
+ struct waiting id4 = {RD_KAFKA_EVENT_DELETEGROUPS_RESULT};
+ struct waiting id5 = {RD_KAFKA_EVENT_DELETERECORDS_RESULT};
+ struct waiting id6 = {RD_KAFKA_EVENT_CREATEPARTITIONS_RESULT};
+ struct waiting id7 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
+ struct waiting id8 = {RD_KAFKA_EVENT_DELETECONSUMERGROUPOFFSETS_RESULT};
+ struct waiting id9 = {RD_KAFKA_EVENT_CREATETOPICS_RESULT};
+ rd_kafka_topic_partition_list_t *offsets;
+
+
+ SUB_TEST_QUICK();
+
+ offsets = rd_kafka_topic_partition_list_new(3);
+ rd_kafka_topic_partition_list_add(offsets, topics[0], 0)->offset =
+ RD_KAFKA_OFFSET_END;
+ rd_kafka_topic_partition_list_add(offsets, topics[1], 0)->offset =
+ RD_KAFKA_OFFSET_END;
+ rd_kafka_topic_partition_list_add(offsets, topics[2], 0)->offset =
+ RD_KAFKA_OFFSET_END;
+
+ test_CreateTopics_simple(rk, rkqu, topics, 2, 1, &id1);
+ test_DeleteTopics_simple(rk, rkqu, &topics[1], 1, &id2);
+ test_CreateTopics_simple(rk, rkqu, &topics[2], 1, 1, &id3);
+ test_DeleteGroups_simple(rk, rkqu, topics, 3, &id4);
+ test_DeleteRecords_simple(rk, rkqu, offsets, &id5);
+ test_CreatePartitions_simple(rk, rkqu, "topicD", 15, &id6);
+ test_DeleteConsumerGroupOffsets_simple(rk, rkqu, "mygroup", offsets,
+ &id7);
+ test_DeleteConsumerGroupOffsets_simple(rk, rkqu, NULL, NULL, &id8);
+ /* Use broker-side defaults for partition count */
+ test_CreateTopics_simple(rk, rkqu, topics, 2, -1, &id9);
+
+ rd_kafka_topic_partition_list_destroy(offsets);
+
+ while (cnt < 9) {
+ rd_kafka_event_t *rkev;
+ struct waiting *w;
+
+ rkev = rd_kafka_queue_poll(rkqu, -1);
+ TEST_ASSERT(rkev);
+
+ TEST_SAY("Got event %s: %s\n", rd_kafka_event_name(rkev),
+ rd_kafka_event_error_string(rkev));
+
+ w = rd_kafka_event_opaque(rkev);
+ TEST_ASSERT(w);
+
+ TEST_ASSERT(w->evtype == rd_kafka_event_type(rkev),
+ "Expected evtype %d, not %d (%s)", w->evtype,
+ rd_kafka_event_type(rkev),
+ rd_kafka_event_name(rkev));
+
+ TEST_ASSERT(w->seen == 0, "Duplicate results");
+
+ w->seen++;
+ cnt++;
+
+ rd_kafka_event_destroy(rkev);
+ }
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Test AlterConfigs and DescribeConfigs
+ */
+static void do_test_configs(rd_kafka_t *rk, rd_kafka_queue_t *rkqu) {
+#define MY_CONFRES_CNT RD_KAFKA_RESOURCE__CNT + 2
+ rd_kafka_ConfigResource_t *configs[MY_CONFRES_CNT];
+ rd_kafka_AdminOptions_t *options;
+ rd_kafka_event_t *rkev;
+ rd_kafka_resp_err_t err;
+ const rd_kafka_AlterConfigs_result_t *res;
+ const rd_kafka_ConfigResource_t **rconfigs;
+ size_t rconfig_cnt;
+ char errstr[128];
+ int i;
+
+ SUB_TEST_QUICK();
+
+ /* Check invalids */
+ configs[0] = rd_kafka_ConfigResource_new((rd_kafka_ResourceType_t)-1,
+ "something");
+ TEST_ASSERT(!configs[0]);
+
+ configs[0] =
+ rd_kafka_ConfigResource_new((rd_kafka_ResourceType_t)0, NULL);
+ TEST_ASSERT(!configs[0]);
+
+
+ for (i = 0; i < MY_CONFRES_CNT; i++) {
+ int set_config = !(i % 2);
+
+ /* librdkafka shall not limit the use of illogical
+ * or unknown settings, they are enforced by the broker. */
+ configs[i] = rd_kafka_ConfigResource_new(
+ (rd_kafka_ResourceType_t)i, "3");
+ TEST_ASSERT(configs[i] != NULL);
+
+ if (set_config) {
+ rd_kafka_ConfigResource_set_config(configs[i],
+ "some.conf",
+ "which remains "
+ "unchecked");
+ rd_kafka_ConfigResource_set_config(
+ configs[i], "some.conf.null", NULL);
+ }
+ }
+
+
+ options = rd_kafka_AdminOptions_new(rk, RD_KAFKA_ADMIN_OP_ANY);
+ err = rd_kafka_AdminOptions_set_request_timeout(options, 1000, errstr,
+ sizeof(errstr));
+ TEST_ASSERT(!err, "%s", errstr);
+
+ /* AlterConfigs */
+ rd_kafka_AlterConfigs(rk, configs, MY_CONFRES_CNT, options, rkqu);
+
+ rkev = test_wait_admin_result(rkqu, RD_KAFKA_EVENT_ALTERCONFIGS_RESULT,
+ 2000);
+
+ TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Expected timeout, not %s",
+ rd_kafka_event_error_string(rkev));
+
+ res = rd_kafka_event_AlterConfigs_result(rkev);
+ TEST_ASSERT(res);
+
+ rconfigs = rd_kafka_AlterConfigs_result_resources(res, &rconfig_cnt);
+ TEST_ASSERT(!rconfigs && !rconfig_cnt,
+ "Expected no result resources, got %" PRIusz, rconfig_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ /* DescribeConfigs: reuse same configs and options */
+ rd_kafka_DescribeConfigs(rk, configs, MY_CONFRES_CNT, options, rkqu);
+
+ rd_kafka_AdminOptions_destroy(options);
+ rd_kafka_ConfigResource_destroy_array(configs, MY_CONFRES_CNT);
+
+ rkev = test_wait_admin_result(
+ rkqu, RD_KAFKA_EVENT_DESCRIBECONFIGS_RESULT, 2000);
+
+ TEST_ASSERT(rd_kafka_event_error(rkev) == RD_KAFKA_RESP_ERR__TIMED_OUT,
+ "Expected timeout, not %s",
+ rd_kafka_event_error_string(rkev));
+
+ res = rd_kafka_event_DescribeConfigs_result(rkev);
+ TEST_ASSERT(res);
+
+ rconfigs = rd_kafka_DescribeConfigs_result_resources(res, &rconfig_cnt);
+ TEST_ASSERT(!rconfigs && !rconfig_cnt,
+ "Expected no result resources, got %" PRIusz, rconfig_cnt);
+
+ rd_kafka_event_destroy(rkev);
+
+ SUB_TEST_PASS();
+}
+
+
+/**
+ * @brief Verify that an unclean rd_kafka_destroy() does not hang or crash.
+ */
+static void do_test_unclean_destroy(rd_kafka_type_t cltype, int with_mainq) {
+ rd_kafka_t *rk;
+ char errstr[512];
+ rd_kafka_conf_t *conf;
+ rd_kafka_queue_t *q;
+ rd_kafka_event_t *rkev;
+ rd_kafka_DeleteTopic_t *topic;
+ test_timing_t t_destroy;
+
+ SUB_TEST_QUICK("Test unclean destroy using %s",
+ with_mainq ? "mainq" : "tempq");
+
+ test_conf_init(&conf, NULL, 0);
+ /* Remove brokers, if any, since this is a local test and we
+ * rely on the controller not being found. */
+ test_conf_set(conf, "bootstrap.servers", "");
+ test_conf_set(conf, "socket.timeout.ms", "60000");
+
+ rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
+
+ if (with_mainq)
+ q = rd_kafka_queue_get_main(rk);
+ else
+ q = rd_kafka_queue_new(rk);
+
+ topic = rd_kafka_DeleteTopic_new("test");
+ rd_kafka_DeleteTopics(rk, &topic, 1, NULL, q);
+ rd_kafka_DeleteTopic_destroy(topic);
+
+ /* We're not expecting a result yet since DeleteTopics will attempt
+ * to look up the controller for socket.timeout.ms (1 minute). */
+ rkev = rd_kafka_queue_poll(q, 100);
+ TEST_ASSERT(!rkev, "Did not expect result: %s",
+ rd_kafka_event_name(rkev));
+
+ rd_kafka_queue_destroy(q);
+
+ TEST_SAY(
+ "Giving rd_kafka_destroy() 5s to finish, "
+ "despite Admin API request being processed\n");
+ test_timeout_set(5);
+ TIMING_START(&t_destroy, "rd_kafka_destroy()");
+ rd_kafka_destroy(rk);
+ TIMING_STOP(&t_destroy);
+
+ SUB_TEST_PASS();
+
+ /* Restore timeout */
+ test_timeout_set(60);
+}
+
+
+/**
+ * @brief Test AdminOptions
+ */
+static void do_test_options(rd_kafka_t *rk) {
+#define _all_apis \
+ { \
+ RD_KAFKA_ADMIN_OP_CREATETOPICS, \
+ RD_KAFKA_ADMIN_OP_DELETETOPICS, \
+ RD_KAFKA_ADMIN_OP_CREATEPARTITIONS, \
+ RD_KAFKA_ADMIN_OP_ALTERCONFIGS, \
+ RD_KAFKA_ADMIN_OP_DESCRIBECONFIGS, \
+ RD_KAFKA_ADMIN_OP_DELETERECORDS, \
+ RD_KAFKA_ADMIN_OP_CREATEACLS, \
+ RD_KAFKA_ADMIN_OP_DESCRIBEACLS, \
+ RD_KAFKA_ADMIN_OP_DELETEACLS, \
+ RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, \
+ RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS, \
+ RD_KAFKA_ADMIN_OP_DELETEGROUPS, \
+ RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS, \
+ RD_KAFKA_ADMIN_OP_ALTERCONSUMERGROUPOFFSETS, \
+ RD_KAFKA_ADMIN_OP_DELETECONSUMERGROUPOFFSETS, \
+ RD_KAFKA_ADMIN_OP_ANY /* Must be last */ \
+ }
+ struct {
+ const char *setter;
+ const rd_kafka_admin_op_t valid_apis[16];
+ } matrix[] = {
+ {"request_timeout", _all_apis},
+ {"operation_timeout",
+ {RD_KAFKA_ADMIN_OP_CREATETOPICS, RD_KAFKA_ADMIN_OP_DELETETOPICS,
+ RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
+ RD_KAFKA_ADMIN_OP_DELETERECORDS}},
+ {"validate_only",
+ {RD_KAFKA_ADMIN_OP_CREATETOPICS,
+ RD_KAFKA_ADMIN_OP_CREATEPARTITIONS,
+ RD_KAFKA_ADMIN_OP_ALTERCONFIGS}},
+ {"broker", _all_apis},
+ {"require_stable_offsets",
+ {RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPOFFSETS}},
+ {"match_consumer_group_states",
+ {RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS}},
+ {"opaque", _all_apis},
+ {NULL},
+ };
+ int i;
+ rd_kafka_AdminOptions_t *options;
+ rd_kafka_consumer_group_state_t state[1] = {
+ RD_KAFKA_CONSUMER_GROUP_STATE_STABLE};
+
+ SUB_TEST_QUICK();
+
+ for (i = 0; matrix[i].setter; i++) {
+ static const rd_kafka_admin_op_t all_apis[] = _all_apis;
+ const rd_kafka_admin_op_t *for_api;
+
+ for (for_api = all_apis;; for_api++) {
+ rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_resp_err_t exp_err =
+ RD_KAFKA_RESP_ERR_NO_ERROR;
+ rd_kafka_error_t *error = NULL;
+ char errstr[512];
+ int fi;
+
+ options = rd_kafka_AdminOptions_new(rk, *for_api);
+ TEST_ASSERT(options, "AdminOptions_new(%d) failed",
+ *for_api);
+
+ if (!strcmp(matrix[i].setter, "request_timeout"))
+ err = rd_kafka_AdminOptions_set_request_timeout(
+ options, 1234, errstr, sizeof(errstr));
+ else if (!strcmp(matrix[i].setter, "operation_timeout"))
+ err =
+ rd_kafka_AdminOptions_set_operation_timeout(
+ options, 12345, errstr, sizeof(errstr));
+ else if (!strcmp(matrix[i].setter, "validate_only"))
+ err = rd_kafka_AdminOptions_set_validate_only(
+ options, 1, errstr, sizeof(errstr));
+ else if (!strcmp(matrix[i].setter, "broker"))
+ err = rd_kafka_AdminOptions_set_broker(
+ options, 5, errstr, sizeof(errstr));
+ else if (!strcmp(matrix[i].setter,
+ "require_stable_offsets"))
+ error =
+ rd_kafka_AdminOptions_set_require_stable_offsets(
+ options, 0);
+ else if (!strcmp(matrix[i].setter,
+ "match_consumer_group_states"))
+ error =
+ rd_kafka_AdminOptions_set_match_consumer_group_states(
+ options, state, 1);
+ else if (!strcmp(matrix[i].setter, "opaque")) {
+ rd_kafka_AdminOptions_set_opaque(
+ options, (void *)options);
+ err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ } else
+ TEST_FAIL("Invalid setter: %s",
+ matrix[i].setter);
+
+ if (error) {
+ err = rd_kafka_error_code(error);
+ snprintf(errstr, sizeof(errstr), "%s",
+ rd_kafka_error_string(error));
+ rd_kafka_error_destroy(error);
+ }
+
+
+ TEST_SAYL(3,
+ "AdminOptions_set_%s on "
+ "RD_KAFKA_ADMIN_OP_%d options "
+ "returned %s: %s\n",
+ matrix[i].setter, *for_api,
+ rd_kafka_err2name(err),
+ err ? errstr : "success");
+
+ /* Scan matrix valid_apis to see if this
+ * setter should be accepted or not. */
+ if (exp_err) {
+ /* An expected error is already set */
+ } else if (*for_api != RD_KAFKA_ADMIN_OP_ANY) {
+ exp_err = RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ for (fi = 0; matrix[i].valid_apis[fi]; fi++) {
+ if (matrix[i].valid_apis[fi] ==
+ *for_api)
+ exp_err =
+ RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+ } else {
+ exp_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+ }
+
+ if (err != exp_err)
+ TEST_FAIL_LATER(
+ "Expected AdminOptions_set_%s "
+ "for RD_KAFKA_ADMIN_OP_%d "
+ "options to return %s, "
+ "not %s",
+ matrix[i].setter, *for_api,
+ rd_kafka_err2name(exp_err),
+ rd_kafka_err2name(err));
+
+ rd_kafka_AdminOptions_destroy(options);
+
+ if (*for_api == RD_KAFKA_ADMIN_OP_ANY)
+ break; /* This was the last one */
+ }
+ }
+
+ /* Try an invalid for_api */
+ options = rd_kafka_AdminOptions_new(rk, (rd_kafka_admin_op_t)1234);
+ TEST_ASSERT(!options,
+ "Expected AdminOptions_new() to fail "
+ "with an invalid for_api, didn't.");
+
+ TEST_LATER_CHECK();
+
+ SUB_TEST_PASS();
+}
+
+
+static rd_kafka_t *create_admin_client(rd_kafka_type_t cltype) {
+ rd_kafka_t *rk;
+ char errstr[512];
+ rd_kafka_conf_t *conf;
+
+ test_conf_init(&conf, NULL, 0);
+ /* Remove brokers, if any, since this is a local test and we
+ * rely on the controller not being found. */
+ test_conf_set(conf, "bootstrap.servers", "");
+ test_conf_set(conf, "socket.timeout.ms", MY_SOCKET_TIMEOUT_MS_STR);
+ /* For use with the background queue */
+ rd_kafka_conf_set_background_event_cb(conf, background_event_cb);
+
+ rk = rd_kafka_new(cltype, conf, errstr, sizeof(errstr));
+ TEST_ASSERT(rk, "kafka_new(%d): %s", cltype, errstr);
+
+ return rk;
+}
+
+
+static void do_test_apis(rd_kafka_type_t cltype) {
+ rd_kafka_t *rk;
+ rd_kafka_queue_t *mainq, *backgroundq;
+
+ mtx_init(&last_event_lock, mtx_plain);
+ cnd_init(&last_event_cnd);
+
+ do_test_unclean_destroy(cltype, 0 /*tempq*/);
+ do_test_unclean_destroy(cltype, 1 /*mainq*/);
+
+ rk = create_admin_client(cltype);
+
+ mainq = rd_kafka_queue_get_main(rk);
+ backgroundq = rd_kafka_queue_get_background(rk);
+
+ do_test_options(rk);
+
+ do_test_CreateTopics("temp queue, no options", rk, NULL, 0, 0);
+ do_test_CreateTopics("temp queue, no options, background_event_cb", rk,
+ backgroundq, 1, 0);
+ do_test_CreateTopics("temp queue, options", rk, NULL, 0, 1);
+ do_test_CreateTopics("main queue, options", rk, mainq, 0, 1);
+
+ do_test_DeleteTopics("temp queue, no options", rk, NULL, 0);
+ do_test_DeleteTopics("temp queue, options", rk, NULL, 1);
+ do_test_DeleteTopics("main queue, options", rk, mainq, 1);
+
+ do_test_ListConsumerGroups("temp queue, no options", rk, NULL, 0,
+ rd_false);
+ do_test_ListConsumerGroups("temp queue, options", rk, NULL, 1,
+ rd_false);
+ do_test_ListConsumerGroups("main queue", rk, mainq, 0, rd_false);
+
+ do_test_DescribeConsumerGroups("temp queue, no options", rk, NULL, 0,
+ rd_false);
+ do_test_DescribeConsumerGroups("temp queue, options", rk, NULL, 1,
+ rd_false);
+ do_test_DescribeConsumerGroups("main queue, options", rk, mainq, 1,
+ rd_false);
+
+ do_test_DeleteGroups("temp queue, no options", rk, NULL, 0, rd_false);
+ do_test_DeleteGroups("temp queue, options", rk, NULL, 1, rd_false);
+ do_test_DeleteGroups("main queue, options", rk, mainq, 1, rd_false);
+
+ do_test_DeleteRecords("temp queue, no options", rk, NULL, 0, rd_false);
+ do_test_DeleteRecords("temp queue, options", rk, NULL, 1, rd_false);
+ do_test_DeleteRecords("main queue, options", rk, mainq, 1, rd_false);
+
+ do_test_DeleteConsumerGroupOffsets("temp queue, no options", rk, NULL,
+ 0);
+ do_test_DeleteConsumerGroupOffsets("temp queue, options", rk, NULL, 1);
+ do_test_DeleteConsumerGroupOffsets("main queue, options", rk, mainq, 1);
+
+ do_test_AclBinding();
+ do_test_AclBindingFilter();
+
+ do_test_CreateAcls("temp queue, no options", rk, NULL, rd_false,
+ rd_false);
+ do_test_CreateAcls("temp queue, options", rk, NULL, rd_false, rd_true);
+ do_test_CreateAcls("main queue, options", rk, mainq, rd_false, rd_true);
+
+ do_test_DescribeAcls("temp queue, no options", rk, NULL, rd_false,
+ rd_false);
+ do_test_DescribeAcls("temp queue, options", rk, NULL, rd_false,
+ rd_true);
+ do_test_DescribeAcls("main queue, options", rk, mainq, rd_false,
+ rd_true);
+
+ do_test_DeleteAcls("temp queue, no options", rk, NULL, rd_false,
+ rd_false);
+ do_test_DeleteAcls("temp queue, options", rk, NULL, rd_false, rd_true);
+ do_test_DeleteAcls("main queue, options", rk, mainq, rd_false, rd_true);
+
+ do_test_AlterConsumerGroupOffsets("temp queue, no options", rk, NULL,
+ 0);
+ do_test_AlterConsumerGroupOffsets("temp queue, options", rk, NULL, 1);
+ do_test_AlterConsumerGroupOffsets("main queue, options", rk, mainq, 1);
+
+ do_test_ListConsumerGroupOffsets("temp queue, no options", rk, NULL, 0,
+ rd_false);
+ do_test_ListConsumerGroupOffsets("temp queue, options", rk, NULL, 1,
+ rd_false);
+ do_test_ListConsumerGroupOffsets("main queue, options", rk, mainq, 1,
+ rd_false);
+ do_test_ListConsumerGroupOffsets("temp queue, no options", rk, NULL, 0,
+ rd_true);
+ do_test_ListConsumerGroupOffsets("temp queue, options", rk, NULL, 1,
+ rd_true);
+ do_test_ListConsumerGroupOffsets("main queue, options", rk, mainq, 1,
+ rd_true);
+
+ do_test_mix(rk, mainq);
+
+ do_test_configs(rk, mainq);
+
+ rd_kafka_queue_destroy(backgroundq);
+ rd_kafka_queue_destroy(mainq);
+
+ rd_kafka_destroy(rk);
+
+ /*
+ * Tests which require a unique unused client instance.
+ */
+ rk = create_admin_client(cltype);
+ mainq = rd_kafka_queue_get_main(rk);
+ do_test_DeleteRecords("main queue, options, destroy", rk, mainq, 1,
+ rd_true /*destroy instance before finishing*/);
+ rd_kafka_queue_destroy(mainq);
+ rd_kafka_destroy(rk);
+
+ rk = create_admin_client(cltype);
+ mainq = rd_kafka_queue_get_main(rk);
+ do_test_DeleteGroups("main queue, options, destroy", rk, mainq, 1,
+ rd_true /*destroy instance before finishing*/);
+ rd_kafka_queue_destroy(mainq);
+ rd_kafka_destroy(rk);
+
+
+ /* Done */
+ mtx_destroy(&last_event_lock);
+ cnd_destroy(&last_event_cnd);
+}
+
+
+int main_0080_admin_ut(int argc, char **argv) {
+ do_test_apis(RD_KAFKA_PRODUCER);
+ do_test_apis(RD_KAFKA_CONSUMER);
+ return 0;
+}