summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c142
1 files changed, 142 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c
new file mode 100644
index 000000000..1e715cfba
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_plain.c
@@ -0,0 +1,142 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2017 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL PLAIN support when Cyrus SASL is not available
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+
+
+/**
+ * @brief Handle received frame from broker.
+ */
+static int rd_kafka_sasl_plain_recv(struct rd_kafka_transport_s *rktrans,
+ const void *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size) {
+ if (size)
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLPLAIN",
+ "Received non-empty SASL PLAIN (builtin) "
+ "response from broker (%" PRIusz " bytes)",
+ size);
+
+ rd_kafka_sasl_auth_done(rktrans);
+
+ return 0;
+}
+
+
+/**
+ * @brief Initialize and start SASL PLAIN (builtin) authentication.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+int rd_kafka_sasl_plain_client_new(rd_kafka_transport_t *rktrans,
+ const char *hostname,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ rd_kafka_t *rk = rkb->rkb_rk;
+ /* [authzid] UTF8NUL authcid UTF8NUL passwd */
+ char *buf;
+ int of = 0;
+ int zidlen = 0;
+ int cidlen, pwlen;
+
+ mtx_lock(&rk->rk_conf.sasl.lock);
+
+ cidlen = rk->rk_conf.sasl.username
+ ? (int)strlen(rk->rk_conf.sasl.username)
+ : 0;
+ pwlen = rk->rk_conf.sasl.password
+ ? (int)strlen(rk->rk_conf.sasl.password)
+ : 0;
+
+ buf = rd_alloca(zidlen + 1 + cidlen + 1 + pwlen + 1);
+
+ /* authzid: none (empty) */
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* authcid */
+ memcpy(&buf[of], rk->rk_conf.sasl.username, cidlen);
+ of += cidlen;
+ /* UTF8NUL */
+ buf[of++] = 0;
+ /* passwd */
+ memcpy(&buf[of], rk->rk_conf.sasl.password, pwlen);
+ of += pwlen;
+ mtx_unlock(&rk->rk_conf.sasl.lock);
+
+ rd_rkb_dbg(rkb, SECURITY, "SASLPLAIN",
+ "Sending SASL PLAIN (builtin) authentication token");
+
+ if (rd_kafka_sasl_send(rktrans, buf, of, errstr, errstr_size))
+ return -1;
+
+ /* PLAIN is appearantly done here, but we still need to make sure
+ * the PLAIN frame is sent and we get a response back (empty) */
+ rktrans->rktrans_sasl.complete = 1;
+ return 0;
+}
+
+
+/**
+ * @brief Validate PLAIN config
+ */
+static int rd_kafka_sasl_plain_conf_validate(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ rd_bool_t both_set;
+
+ mtx_lock(&rk->rk_conf.sasl.lock);
+ both_set = rk->rk_conf.sasl.username && rk->rk_conf.sasl.password;
+ mtx_unlock(&rk->rk_conf.sasl.lock);
+
+ if (!both_set) {
+ rd_snprintf(errstr, errstr_size,
+ "sasl.username and sasl.password must be set");
+ return -1;
+ }
+
+ return 0;
+}
+
+
+const struct rd_kafka_sasl_provider rd_kafka_sasl_plain_provider = {
+ .name = "PLAIN (builtin)",
+ .client_new = rd_kafka_sasl_plain_client_new,
+ .recv = rd_kafka_sasl_plain_recv,
+ .conf_validate = rd_kafka_sasl_plain_conf_validate};