Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl.c')
-rw-r--r--  fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl.c | 522
1 file changed, 522 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl.c
new file mode 100644
index 000000000..cab67f241
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl.c
@@ -0,0 +1,522 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2015 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rdkafka_int.h"
+#include "rdkafka_transport.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_sasl.h"
+#include "rdkafka_sasl_int.h"
+#include "rdkafka_request.h"
+#include "rdkafka_queue.h"
+
+/**
+ * @brief Send SASL auth data using the legacy framing, directly on the socket.
+ *
+ * @warning This is a blocking call.
+ */
+static int rd_kafka_sasl_send_legacy(rd_kafka_transport_t *rktrans,
+ const void *payload,
+ int len,
+ char *errstr,
+ size_t errstr_size) {
+ rd_buf_t buf;
+ rd_slice_t slice;
+ int32_t hdr;
+
+ rd_buf_init(&buf, 1 + 1, sizeof(hdr));
+
+ hdr = htobe32(len);
+ rd_buf_write(&buf, &hdr, sizeof(hdr));
+ if (payload)
+ rd_buf_push(&buf, payload, len, NULL);
+
+ rd_slice_init_full(&slice, &buf);
+
+ /* Simulate blocking behaviour on a non-blocking socket.
+ * FIXME: This isn't optimal but is highly unlikely to stall since
+ * the socket buffer will most likely not be exceeded. */
+ do {
+ int r;
+
+ r = (int)rd_kafka_transport_send(rktrans, &slice, errstr,
+ errstr_size);
+ if (r == -1) {
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+ "SASL send failed: %s", errstr);
+ rd_buf_destroy(&buf);
+ return -1;
+ }
+
+ if (rd_slice_remains(&slice) == 0)
+ break;
+
+ /* Avoid busy-looping */
+ rd_usleep(10 * 1000, NULL);
+
+ } while (1);
+
+ rd_buf_destroy(&buf);
+
+ return 0;
+}
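
For reference, the legacy frame built above is nothing more than a 4-byte big-endian length prefix followed by the opaque auth bytes. A minimal standalone sketch of the same layout, using plain libc/POSIX calls instead of the rd_buf API (the helper name is illustrative, not part of librdkafka):

#include <arpa/inet.h> /* htonl */
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Build [int32 big-endian length][len bytes of auth data] into a
 * heap-allocated buffer; the caller frees *out. Returns the total
 * frame size, or 0 on allocation failure. */
static size_t build_legacy_sasl_frame(const void *payload, int32_t len,
                                      unsigned char **out) {
        uint32_t be_len      = htonl((uint32_t)len);
        size_t total         = sizeof(be_len) + (size_t)len;
        unsigned char *frame = malloc(total);

        if (!frame)
                return 0;

        memcpy(frame, &be_len, sizeof(be_len));
        if (payload && len > 0)
                memcpy(frame + sizeof(be_len), payload, (size_t)len);

        *out = frame;
        return total;
}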
+
+/**
+ * @brief Send auth message with framing (either legacy or Kafka framing).
+ *
+ * @warning This is a blocking call when used with the legacy framing.
+ */
+int rd_kafka_sasl_send(rd_kafka_transport_t *rktrans,
+ const void *payload,
+ int len,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+
+ rd_rkb_dbg(
+ rkb, SECURITY, "SASL", "Send SASL %s frame to broker (%d bytes)",
+ (rkb->rkb_features & RD_KAFKA_FEATURE_SASL_AUTH_REQ) ? "Kafka"
+ : "legacy",
+ len);
+
+ /* Blocking legacy framed send directly on the socket */
+ if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_AUTH_REQ))
+ return rd_kafka_sasl_send_legacy(rktrans, payload, len, errstr,
+ errstr_size);
+
+ /* Kafka-framed asynchronous send */
+ rd_kafka_SaslAuthenticateRequest(
+ rkb, payload, (size_t)len, RD_KAFKA_NO_REPLYQ,
+ rd_kafka_handle_SaslAuthenticate, NULL);
+
+ return 0;
+}
+
+
+/**
+ * @brief Authentication successful
+ *
+ * Transition to next connect state.
+ */
+void rd_kafka_sasl_auth_done(rd_kafka_transport_t *rktrans) {
+ /* Authenticated */
+ rd_kafka_broker_connect_up(rktrans->rktrans_rkb);
+}
+
+
+/**
+ * @brief Handle SASL auth data from broker.
+ *
+ * @locality broker thread
+ *
+ * @returns -1 on error, else 0.
+ */
+int rd_kafka_sasl_recv(rd_kafka_transport_t *rktrans,
+ const void *buf,
+ size_t len,
+ char *errstr,
+ size_t errstr_size) {
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASL",
+ "Received SASL frame from broker (%" PRIusz " bytes)", len);
+
+ return rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider->recv(
+ rktrans, buf, len, errstr, errstr_size);
+}
+
+/**
+ * @brief Non-Kafka-protocol framed SASL auth data receive event.
+ *
+ * @locality broker thread
+ *
+ * @returns -1 on error, else 0.
+ */
+int rd_kafka_sasl_io_event(rd_kafka_transport_t *rktrans,
+ int events,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_buf_t *rkbuf;
+ int r;
+ const void *buf;
+ size_t len;
+
+ if (!(events & POLLIN))
+ return 0;
+
+ r = rd_kafka_transport_framed_recv(rktrans, &rkbuf, errstr,
+ errstr_size);
+ if (r == -1) {
+ if (!strcmp(errstr, "Disconnected"))
+ rd_snprintf(errstr, errstr_size,
+ "Disconnected: check client %s credentials "
+ "and broker logs",
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl
+ .mechanisms);
+ return -1;
+ } else if (r == 0) /* not fully received yet */
+ return 0;
+
+ if (rkbuf) {
+ rd_slice_init_full(&rkbuf->rkbuf_reader, &rkbuf->rkbuf_buf);
+ /* Seek past framing header */
+ rd_slice_seek(&rkbuf->rkbuf_reader, 4);
+ len = rd_slice_remains(&rkbuf->rkbuf_reader);
+ buf = rd_slice_ensure_contig(&rkbuf->rkbuf_reader, len);
+ } else {
+ buf = NULL;
+ len = 0;
+ }
+
+ r = rd_kafka_sasl_recv(rktrans, buf, len, errstr, errstr_size);
+
+ if (rkbuf)
+ rd_kafka_buf_destroy(rkbuf);
+
+ return r;
+}
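
The receive path mirrors that layout: rd_kafka_transport_framed_recv() delivers one complete frame and the code above merely seeks past the 4-byte length header before handing the payload to the mechanism. A standalone sketch of that parse step, again with an illustrative helper name:

#include <arpa/inet.h> /* ntohl */
#include <stdint.h>
#include <string.h>

/* Split a fully received legacy frame into its payload; returns -1 if
 * the length prefix does not match the frame size. */
static int parse_legacy_sasl_frame(const unsigned char *frame, size_t total,
                                   const unsigned char **payload,
                                   size_t *payload_len) {
        uint32_t be_len;

        if (total < sizeof(be_len))
                return -1; /* incomplete header */

        memcpy(&be_len, frame, sizeof(be_len));
        if ((size_t)ntohl(be_len) != total - sizeof(be_len))
                return -1; /* framing mismatch */

        *payload     = frame + sizeof(be_len);
        *payload_len = total - sizeof(be_len);
        return 0;
}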
+
+
+/**
+ * @brief Close SASL session (from transport code)
+ * @remark May be called on non-SASL transports (no-op)
+ */
+void rd_kafka_sasl_close(rd_kafka_transport_t *rktrans) {
+ const struct rd_kafka_sasl_provider *provider =
+ rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.provider;
+
+ if (provider && provider->close)
+ provider->close(rktrans);
+}
+
+
+
+/**
+ * Initialize and start SASL authentication.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * Locality: broker thread
+ */
+int rd_kafka_sasl_client_new(rd_kafka_transport_t *rktrans,
+ char *errstr,
+ size_t errstr_size) {
+ int r;
+ rd_kafka_broker_t *rkb = rktrans->rktrans_rkb;
+ rd_kafka_t *rk = rkb->rkb_rk;
+ char *hostname, *t;
+ const struct rd_kafka_sasl_provider *provider =
+ rk->rk_conf.sasl.provider;
+
+ /* Verify broker support:
+ * - RD_KAFKA_FEATURE_SASL_GSSAPI - GSSAPI supported
+ * - RD_KAFKA_FEATURE_SASL_HANDSHAKE - GSSAPI, PLAIN and possibly
+ * other mechanisms supported. */
+ if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
+ if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_GSSAPI)) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL GSSAPI authentication not supported "
+ "by broker");
+ return -1;
+ }
+ } else if (!(rkb->rkb_features & RD_KAFKA_FEATURE_SASL_HANDSHAKE)) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL Handshake not supported by broker "
+ "(required by mechanism %s)%s",
+ rk->rk_conf.sasl.mechanisms,
+ rk->rk_conf.api_version_request
+ ? ""
+ : ": try api.version.request=true");
+ return -1;
+ }
+
+ rd_kafka_broker_lock(rktrans->rktrans_rkb);
+ rd_strdupa(&hostname, rktrans->rktrans_rkb->rkb_nodename);
+ rd_kafka_broker_unlock(rktrans->rktrans_rkb);
+
+ if ((t = strchr(hostname, ':')))
+ *t = '\0'; /* remove ":port" */
+
+ rd_rkb_dbg(rkb, SECURITY, "SASL",
+ "Initializing SASL client: service name %s, "
+ "hostname %s, mechanisms %s, provider %s",
+ rk->rk_conf.sasl.service_name, hostname,
+ rk->rk_conf.sasl.mechanisms, provider->name);
+
+ r = provider->client_new(rktrans, hostname, errstr, errstr_size);
+ if (r != -1)
+ rd_kafka_transport_poll_set(rktrans, POLLIN);
+
+ return r;
+}
+
+
+
+rd_kafka_queue_t *rd_kafka_queue_get_sasl(rd_kafka_t *rk) {
+ if (!rk->rk_sasl.callback_q)
+ return NULL;
+
+ return rd_kafka_queue_new0(rk, rk->rk_sasl.callback_q);
+}
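
This backs the public rd_kafka_queue_get_sasl() API. A hedged usage sketch, assuming rk is an existing client handle configured with a callback-based mechanism (currently OAUTHBEARER): the application can serve this queue from its own loop instead of enabling background callbacks:

rd_kafka_queue_t *saslq = rd_kafka_queue_get_sasl(rk);
if (saslq) {
        rd_kafka_event_t *ev = rd_kafka_queue_poll(saslq, 100 /* ms */);
        if (ev && rd_kafka_event_type(ev) ==
                      RD_KAFKA_EVENT_OAUTHBEARER_TOKEN_REFRESH) {
                /* ...obtain a token and pass it to
                 * rd_kafka_oauthbearer_set_token() here... */
        }
        if (ev)
                rd_kafka_event_destroy(ev);
        rd_kafka_queue_destroy(saslq);
}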
+
+
+/**
+ * Per-broker SASL termination.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_term(rd_kafka_broker_t *rkb) {
+ const struct rd_kafka_sasl_provider *provider =
+ rkb->rkb_rk->rk_conf.sasl.provider;
+ if (provider->broker_term)
+ provider->broker_term(rkb);
+}
+
+/**
+ * Broker SASL init.
+ *
+ * Locality: broker thread
+ */
+void rd_kafka_sasl_broker_init(rd_kafka_broker_t *rkb) {
+ const struct rd_kafka_sasl_provider *provider =
+ rkb->rkb_rk->rk_conf.sasl.provider;
+ if (provider->broker_init)
+ provider->broker_init(rkb);
+}
+
+
+/**
+ * @brief Per-instance initializer using the selected provider
+ *
+ * @returns 0 on success or -1 on error.
+ *
+ * @locality app thread (from rd_kafka_new())
+ */
+int rd_kafka_sasl_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) {
+ const struct rd_kafka_sasl_provider *provider =
+ rk->rk_conf.sasl.provider;
+
+ if (provider && provider->init)
+ return provider->init(rk, errstr, errstr_size);
+
+ return 0;
+}
+
+
+/**
+ * @brief Per-instance destructor for the selected provider
+ *
+ * @locality app thread (from rd_kafka_new()) or rdkafka main thread
+ */
+void rd_kafka_sasl_term(rd_kafka_t *rk) {
+ const struct rd_kafka_sasl_provider *provider =
+ rk->rk_conf.sasl.provider;
+
+ if (provider && provider->term)
+ provider->term(rk);
+
+ RD_IF_FREE(rk->rk_sasl.callback_q, rd_kafka_q_destroy_owner);
+}
+
+
+/**
+ * @returns rd_true if provider is ready to be used or SASL not configured,
+ * else rd_false.
+ *
+ * @locks none
+ * @locality any thread
+ */
+rd_bool_t rd_kafka_sasl_ready(rd_kafka_t *rk) {
+ const struct rd_kafka_sasl_provider *provider =
+ rk->rk_conf.sasl.provider;
+
+ if (provider && provider->ready)
+ return provider->ready(rk);
+
+ return rd_true;
+}
+
+
+/**
+ * @brief Select the SASL provider for the configured mechanism (only one mechanism may be configured).
+ * @returns 0 on success or -1 on failure.
+ */
+int rd_kafka_sasl_select_provider(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ const struct rd_kafka_sasl_provider *provider = NULL;
+
+ if (!strcmp(rk->rk_conf.sasl.mechanisms, "GSSAPI")) {
+ /* GSSAPI / Kerberos */
+#ifdef _WIN32
+ provider = &rd_kafka_sasl_win32_provider;
+#elif WITH_SASL_CYRUS
+ provider = &rd_kafka_sasl_cyrus_provider;
+#endif
+
+ } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "PLAIN")) {
+ /* SASL PLAIN */
+ provider = &rd_kafka_sasl_plain_provider;
+
+ } else if (!strncmp(rk->rk_conf.sasl.mechanisms, "SCRAM-SHA-",
+ strlen("SCRAM-SHA-"))) {
+ /* SASL SCRAM */
+#if WITH_SASL_SCRAM
+ provider = &rd_kafka_sasl_scram_provider;
+#endif
+
+ } else if (!strcmp(rk->rk_conf.sasl.mechanisms, "OAUTHBEARER")) {
+ /* SASL OAUTHBEARER */
+#if WITH_SASL_OAUTHBEARER
+ provider = &rd_kafka_sasl_oauthbearer_provider;
+#endif
+ } else {
+ /* Unsupported mechanism */
+ rd_snprintf(errstr, errstr_size,
+ "Unsupported SASL mechanism: %s",
+ rk->rk_conf.sasl.mechanisms);
+ return -1;
+ }
+
+ if (!provider) {
+ rd_snprintf(errstr, errstr_size,
+ "No provider for SASL mechanism %s"
+ ": recompile librdkafka with "
+#ifndef _WIN32
+ "libsasl2 or "
+#endif
+ "openssl support. "
+ "Current build options:"
+ " PLAIN"
+#ifdef _WIN32
+ " WindowsSSPI(GSSAPI)"
+#endif
+#if WITH_SASL_CYRUS
+ " SASL_CYRUS"
+#endif
+#if WITH_SASL_SCRAM
+ " SASL_SCRAM"
+#endif
+#if WITH_SASL_OAUTHBEARER
+ " OAUTHBEARER"
+#endif
+ ,
+ rk->rk_conf.sasl.mechanisms);
+ return -1;
+ }
+
+ rd_kafka_dbg(rk, SECURITY, "SASL",
+ "Selected provider %s for SASL mechanism %s",
+ provider->name, rk->rk_conf.sasl.mechanisms);
+
+ /* Validate SASL config */
+ if (provider->conf_validate &&
+ provider->conf_validate(rk, errstr, errstr_size) == -1)
+ return -1;
+
+ rk->rk_conf.sasl.provider = provider;
+
+ return 0;
+}
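
Selection is driven entirely by the sasl.mechanisms configuration property and happens inside rd_kafka_new(). A minimal configuration sketch; the SCRAM choice and the credentials are placeholders for illustration:

#include <librdkafka/rdkafka.h>
#include <stdio.h>

static rd_kafka_t *new_sasl_client(void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        if (rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "sasl.mechanisms", "SCRAM-SHA-256",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "sasl.username", "placeholder-user",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
            rd_kafka_conf_set(conf, "sasl.password", "placeholder-pass",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "Configuration failed: %s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return NULL;
        }

        /* Provider selection and conf_validate() run inside rd_kafka_new();
         * an unsupported or not-compiled-in mechanism fails here. */
        return rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
}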
+
+
+rd_kafka_error_t *rd_kafka_sasl_background_callbacks_enable(rd_kafka_t *rk) {
+ rd_kafka_queue_t *saslq, *bgq;
+
+ if (!(saslq = rd_kafka_queue_get_sasl(rk)))
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__NOT_CONFIGURED,
+ "No SASL mechanism using callbacks is configured");
+
+ if (!(bgq = rd_kafka_queue_get_background(rk))) {
+ rd_kafka_queue_destroy(saslq);
+ return rd_kafka_error_new(
+ RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE,
+ "The background thread is not available");
+ }
+
+ rd_kafka_queue_forward(saslq, bgq);
+
+ rd_kafka_queue_destroy(saslq);
+ rd_kafka_queue_destroy(bgq);
+
+ return NULL;
+}
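
This is the public API declared in rdkafka.h. A hedged usage fragment (assumes an existing handle rk with an OAUTHBEARER token refresh callback configured, and <stdio.h> for the error print):

rd_kafka_error_t *error = rd_kafka_sasl_background_callbacks_enable(rk);
if (error) {
        fprintf(stderr, "Background SASL callbacks not enabled: %s\n",
                rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);
}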
+
+
+/**
+ * Global SASL termination.
+ */
+void rd_kafka_sasl_global_term(void) {
+#if WITH_SASL_CYRUS
+ rd_kafka_sasl_cyrus_global_term();
+#endif
+}
+
+
+/**
+ * Global SASL init, called once per runtime.
+ */
+int rd_kafka_sasl_global_init(void) {
+#if WITH_SASL_CYRUS
+ return rd_kafka_sasl_cyrus_global_init();
+#else
+ return 0;
+#endif
+}
+
+/**
+ * Sets or resets the SASL (PLAIN or SCRAM) credentials used by this
+ * client when making new connections to brokers.
+ *
+ * @returns NULL on success or an error object on error.
+ */
+rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
+ const char *username,
+ const char *password) {
+
+ if (!username || !password)
+ return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Username and password are required");
+
+ mtx_lock(&rk->rk_conf.sasl.lock);
+
+ if (rk->rk_conf.sasl.username)
+ rd_free(rk->rk_conf.sasl.username);
+ rk->rk_conf.sasl.username = rd_strdup(username);
+
+ if (rk->rk_conf.sasl.password)
+ rd_free(rk->rk_conf.sasl.password);
+ rk->rk_conf.sasl.password = rd_strdup(password);
+
+ mtx_unlock(&rk->rk_conf.sasl.lock);
+
+ rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT,
+ "SASL credentials updated");
+
+ return NULL;
+}
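
Usage fragment for this public API (assumes an existing handle rk; the credential values are placeholders). As noted above, the new credentials apply to connections made after the call:

rd_kafka_error_t *error =
        rd_kafka_sasl_set_credentials(rk, "new-username", "new-password");
if (error) {
        fprintf(stderr, "Credential rotation failed: %s\n",
                rd_kafka_error_string(error));
        rd_kafka_error_destroy(error);
}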