summaryrefslogtreecommitdiffstats
path: root/src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp')
-rw-r--r--src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp
new file mode 100644
index 000000000..507b67302
--- /dev/null
+++ b/src/fluent-bit/lib/librdkafka-2.1.0/tests/0119-consumer_auth.cpp
@@ -0,0 +1,148 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <iostream>
+#include <map>
+#include <cstring>
+#include <cstdlib>
+#include "testcpp.h"
+
+
+/**
+ * @brief Let FetchRequests fail with authorization failure.
+ *
+ */
+
+
+static void do_test_fetch_unauth() {
+ Test::Say(tostr() << _C_MAG << "[ Test unauthorized Fetch ]\n");
+
+ std::string topic = Test::mk_topic_name("0119-fetch_unauth", 1);
+
+ RdKafka::Conf *conf;
+ Test::conf_init(&conf, NULL, 20);
+
+ Test::conf_set(conf, "group.id", topic);
+
+ std::string bootstraps;
+ if (conf->get("bootstrap.servers", bootstraps) != RdKafka::Conf::CONF_OK)
+ Test::Fail("Failed to retrieve bootstrap.servers");
+
+ std::string errstr;
+ RdKafka::KafkaConsumer *c = RdKafka::KafkaConsumer::create(conf, errstr);
+ if (!c)
+ Test::Fail("Failed to create KafkaConsumer: " + errstr);
+ delete conf;
+
+ /* Create topic */
+ const int partition_cnt = 3;
+ Test::create_topic(NULL, topic.c_str(), partition_cnt, 1);
+
+ /* Produce messages */
+ test_produce_msgs_easy(topic.c_str(), 0, RdKafka::Topic::PARTITION_UA, 1000);
+
+ /* Add ACLs:
+ * Allow Describe (Metadata)
+ * Deny Read (Fetch)
+ */
+
+ test_kafka_cmd(
+ "kafka-acls.sh --bootstrap-server %s "
+ "--add --allow-principal 'User:*' "
+ "--operation Describe --allow-host '*' "
+ "--topic '%s'",
+ bootstraps.c_str(), topic.c_str());
+
+ test_kafka_cmd(
+ "kafka-acls.sh --bootstrap-server %s "
+ "--add --deny-principal 'User:*' "
+ "--operation Read --deny-host '*' "
+ "--topic '%s'",
+ bootstraps.c_str(), topic.c_str());
+
+ Test::subscribe(c, topic);
+
+ int auth_err_cnt = 0;
+
+ /* Consume for 15s (30*0.5), counting the number of auth errors,
+ * should only see one error per consumed partition, and no messages. */
+ for (int i = 0; i < 30; i++) {
+ RdKafka::Message *msg;
+
+ msg = c->consume(500);
+ TEST_ASSERT(msg, "Expected msg");
+
+ switch (msg->err()) {
+ case RdKafka::ERR__TIMED_OUT:
+ break;
+
+ case RdKafka::ERR_NO_ERROR:
+ Test::Fail("Did not expect a valid message");
+ break;
+
+ case RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED:
+ Test::Say(tostr() << "Consumer error on " << msg->topic_name() << " ["
+ << msg->partition() << "]: " << msg->errstr() << "\n");
+
+ if (auth_err_cnt++ > partition_cnt)
+ Test::Fail(
+ "Too many auth errors received, "
+ "expected same as number of partitions");
+ break;
+
+ default:
+ Test::Fail(tostr() << "Unexpected consumer error on " << msg->topic_name()
+ << " [" << msg->partition() << "]: " << msg->errstr());
+ break;
+ }
+
+ delete msg;
+ }
+
+ TEST_ASSERT(auth_err_cnt == partition_cnt,
+ "Expected exactly %d auth errors, saw %d", partition_cnt,
+ auth_err_cnt);
+
+ delete c;
+
+ Test::Say(tostr() << _C_GRN << "[ Test unauthorized Fetch PASS ]\n");
+}
+
+extern "C" {
+int main_0119_consumer_auth(int argc, char **argv) {
+ /* We can't bother passing Java security config to kafka-acls.sh */
+ if (test_needs_auth()) {
+ Test::Skip("Cluster authentication required\n");
+ return 0;
+ }
+
+ do_test_fetch_unauth();
+
+ return 0;
+}
+}