summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 11:19:16 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:07:37 +0000
commitb485aab7e71c1625cfc27e0f92c9509f42378458 (patch)
treeae9abe108601079d1679194de237c9a435ae5b55 /src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp
parentAdding upstream version 1.44.3. (diff)
downloadnetdata-b485aab7e71c1625cfc27e0f92c9509f42378458.tar.xz
netdata-b485aab7e71c1625cfc27e0f92c9509f42378458.zip
Adding upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp218
1 files changed, 218 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp
new file mode 100644
index 000000000..cabee6704
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0109-auto_create_topics.cpp
@@ -0,0 +1,218 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <map>
+#include <cstring>
+#include <cstdlib>
+#include "testcpp.h"
+
+/**
+ * Test consumer allow.auto.create.topics by subscribing to a mix
+ * of available, unauthorized and non-existent topics.
+ *
+ * The same test is run with and without allow.auto.create.topics
+ * and with and without wildcard subscribes.
+ *
+ */
+
+
+static void do_test_consumer(bool allow_auto_create_topics,
+ bool with_wildcards) {
+ Test::Say(tostr() << _C_MAG << "[ Test allow.auto.create.topics="
+ << (allow_auto_create_topics ? "true" : "false")
+ << " with_wildcards=" << (with_wildcards ? "true" : "false")
+ << " ]\n");
+
+ bool has_acl_cli = test_broker_version >= TEST_BRKVER(2, 1, 0, 0) &&
+ !test_needs_auth(); /* We can't bother passing Java
+ * security config to kafka-acls.sh */
+
+ bool supports_allow = test_broker_version >= TEST_BRKVER(0, 11, 0, 0);
+
+ std::string topic_exists = Test::mk_topic_name("0109-exists", 1);
+ std::string topic_notexists = Test::mk_topic_name("0109-notexists", 1);
+ std::string topic_unauth = Test::mk_topic_name("0109-unauthorized", 1);
+
+ /* Create consumer */
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 20);
+ Test::conf_set(conf, "group.id", topic_exists);
+ Test::conf_set(conf, "enable.partition.eof", "true");
+ /* Quickly refresh metadata on topic auto-creation since the first
+ * metadata after auto-create hides the topic due to 0 partition count. */
+ Test::conf_set(conf, "topic.metadata.refresh.interval.ms", "1000");
+ if (allow_auto_create_topics)
+ Test::conf_set(conf, "allow.auto.create.topics", "true");
+
+ std::string bootstraps;
+ if (conf->get("bootstrap.servers", bootstraps) != RdKafka::Conf::CONF_OK)
+ Test::Fail("Failed to retrieve bootstrap.servers");
+
+ std::string errstr;
+ RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ /* Create topics */
+ Test::create_topic(c, topic_exists.c_str(), 1, 1);
+
+ if (has_acl_cli) {
+ Test::create_topic(c, topic_unauth.c_str(), 1, 1);
+
+ /* Add denying ACL for unauth topic */
+ test_kafka_cmd(
+ "kafka-acls.sh --bootstrap-server %s "
+ "--add --deny-principal 'User:*' "
+ "--operation All --deny-host '*' "
+ "--topic '%s'",
+ bootstraps.c_str(), topic_unauth.c_str());
+ }
+
+
+ /* Wait for topic to be fully created */
+ test_wait_topic_exists(NULL, topic_exists.c_str(), 10 * 1000);
+
+
+ /*
+ * Subscribe
+ */
+ std::vector<std::string> topics;
+ std::map<std::string, RdKafka::ErrorCode> exp_errors;
+
+ topics.push_back(topic_notexists);
+ if (has_acl_cli)
+ topics.push_back(topic_unauth);
+
+ if (with_wildcards) {
+ topics.push_back("^" + topic_exists);
+ topics.push_back("^" + topic_notexists);
+ /* If the subscription contains at least one wildcard/regex
+ * then no auto topic creation will take place (since the consumer
+ * requests all topics in metadata, and not specific ones, thus
+ * not triggering topic auto creation).
+ * We need to handle the expected error cases accordingly. */
+ exp_errors["^" + topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART;
+ exp_errors[topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART;
+
+ if (has_acl_cli) {
+ /* Unauthorized topics are not included in list-all-topics Metadata,
+ * which we use for wildcards, so in this case the error code for
+ * unauthorixed topics show up as unknown topic. */
+ exp_errors[topic_unauth] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART;
+ }
+ } else {
+ topics.push_back(topic_exists);
+
+ if (has_acl_cli)
+ exp_errors[topic_unauth] = RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED;
+ }
+
+ if (supports_allow && !allow_auto_create_topics)
+ exp_errors[topic_notexists] = RdKafka::ERR_UNKNOWN_TOPIC_OR_PART;
+
+ RdKafka::ErrorCode err;
+ if ((err = c->subscribe(topics)))
+ Test::Fail("subscribe failed: " + RdKafka::err2str(err));
+
+ /* Start consuming until EOF is reached, which indicates that we have an
+ * assignment and any errors should have been reported. */
+ bool run = true;
+ while (run) {
+ RdKafka::Message *msg = c->consume(tmout_multip(1000));
+ switch (msg->err()) {
+ case RdKafka::ERR__TIMED_OUT:
+ case RdKafka::ERR_NO_ERROR:
+ break;
+
+ case RdKafka::ERR__PARTITION_EOF:
+ run = false;
+ break;
+
+ default:
+ Test::Say("Consume error on " + msg->topic_name() + ": " + msg->errstr() +
+ "\n");
+
+ std::map<std::string, RdKafka::ErrorCode>::iterator it =
+ exp_errors.find(msg->topic_name());
+
+ /* Temporary unknown-topic errors are okay for auto-created topics. */
+ bool unknown_is_ok = allow_auto_create_topics && !with_wildcards &&
+ msg->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART &&
+ msg->topic_name() == topic_notexists;
+
+ if (it == exp_errors.end()) {
+ if (unknown_is_ok)
+ Test::Say("Ignoring temporary auto-create error for topic " +
+ msg->topic_name() + ": " + RdKafka::err2str(msg->err()) +
+ "\n");
+ else
+ Test::Fail("Did not expect error for " + msg->topic_name() +
+ ": got: " + RdKafka::err2str(msg->err()));
+ } else if (msg->err() != it->second) {
+ if (unknown_is_ok)
+ Test::Say("Ignoring temporary auto-create error for topic " +
+ msg->topic_name() + ": " + RdKafka::err2str(msg->err()) +
+ "\n");
+ else
+ Test::Fail("Expected '" + RdKafka::err2str(it->second) + "' for " +
+ msg->topic_name() + ", got " +
+ RdKafka::err2str(msg->err()));
+ } else {
+ exp_errors.erase(msg->topic_name());
+ }
+
+ break;
+ }
+
+ delete msg;
+ }
+
+
+ /* Fail if not all expected errors were seen. */
+ if (!exp_errors.empty())
+ Test::Fail(tostr() << "Expecting " << exp_errors.size() << " more errors");
+
+ c->close();
+
+ delete c;
+}
+
+extern "C" {
+int main_0109_auto_create_topics(int argc, char **argv) {
+ /* Parameters:
+ * allow auto create, with wildcards */
+ do_test_consumer(true, true);
+ do_test_consumer(true, false);
+ do_test_consumer(false, true);
+ do_test_consumer(false, false);
+
+ return 0;
+}
+}