summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c1825
1 files changed, 1825 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c
new file mode 100644
index 00000000..39b165a7
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_oauthbearer.c
@@ -0,0 +1,1825 @@
+/*
+ * librdkafka - The Apache Kafka C/C++ library
+ *
+ * Copyright (c) 2019 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+/**
+ * Builtin SASL OAUTHBEARER support
+ */
+#include "rdkafka_int.h"
+#include "rdkafka_transport_int.h"
+#include "rdkafka_sasl_int.h"
+#include <openssl/evp.h>
+#include "rdunittest.h"
+
+#if WITH_OAUTHBEARER_OIDC
+#include "rdkafka_sasl_oauthbearer_oidc.h"
+#endif
+
+
+/**
+ * @struct Per-client-instance SASL/OAUTHBEARER handle.
+ */
+typedef struct rd_kafka_sasl_oauthbearer_handle_s {
+ /**< Read-write lock for fields in the handle. */
+ rwlock_t lock;
+
+ /**< The b64token value as defined in RFC 6750 Section 2.1
+ * https://tools.ietf.org/html/rfc6750#section-2.1
+ */
+ char *token_value;
+
+ /**< When the token expires, in terms of the number of
+ * milliseconds since the epoch. Wall clock time.
+ */
+ rd_ts_t wts_md_lifetime;
+
+ /**< The point after which this token should be replaced with a
+ * new one, in terms of the number of milliseconds since the
+ * epoch. Wall clock time.
+ */
+ rd_ts_t wts_refresh_after;
+
+ /**< When the last token refresh was equeued (0 = never)
+ * in terms of the number of milliseconds since the epoch.
+ * Wall clock time.
+ */
+ rd_ts_t wts_enqueued_refresh;
+
+ /**< The name of the principal to which this token applies. */
+ char *md_principal_name;
+
+ /**< The SASL extensions, as per RFC 7628 Section 3.1
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ */
+ rd_list_t extensions; /* rd_strtup_t list */
+
+ /**< Error message for validation and/or token retrieval problems. */
+ char *errstr;
+
+ /**< Back-pointer to client instance. */
+ rd_kafka_t *rk;
+
+ /**< Token refresh timer */
+ rd_kafka_timer_t token_refresh_tmr;
+
+ /** Queue to enqueue token_refresh_cb ops on. */
+ rd_kafka_q_t *callback_q;
+
+ /** Using internal refresh callback (sasl.oauthbearer.method=oidc) */
+ rd_bool_t internal_refresh;
+
+} rd_kafka_sasl_oauthbearer_handle_t;
+
+
+/**
+ * @struct Unsecured JWS info populated when sasl.oauthbearer.config is parsed
+ */
+struct rd_kafka_sasl_oauthbearer_parsed_ujws {
+ char *principal_claim_name;
+ char *principal;
+ char *scope_claim_name;
+ char *scope_csv_text;
+ int life_seconds;
+ rd_list_t extensions; /* rd_strtup_t list */
+};
+
+/**
+ * @struct Unsecured JWS token to be set on the client handle
+ */
+struct rd_kafka_sasl_oauthbearer_token {
+ char *token_value;
+ int64_t md_lifetime_ms;
+ char *md_principal_name;
+ char **extensions;
+ size_t extension_size;
+};
+
+/**
+ * @brief Per-connection state
+ */
+struct rd_kafka_sasl_oauthbearer_state {
+ enum { RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE,
+ RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG,
+ RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL,
+ } state;
+ char *server_error_msg;
+
+ /*
+ * A place to store a consistent view of the token and extensions
+ * throughout the authentication process -- even if it is refreshed
+ * midway through this particular authentication.
+ */
+ char *token_value;
+ char *md_principal_name;
+ rd_list_t extensions; /* rd_strtup_t list */
+};
+
+
+
+/**
+ * @brief free memory inside the given token
+ */
+static void rd_kafka_sasl_oauthbearer_token_free(
+ struct rd_kafka_sasl_oauthbearer_token *token) {
+ size_t i;
+
+ RD_IF_FREE(token->token_value, rd_free);
+ RD_IF_FREE(token->md_principal_name, rd_free);
+
+ for (i = 0; i < token->extension_size; i++)
+ rd_free(token->extensions[i]);
+
+ RD_IF_FREE(token->extensions, rd_free);
+
+ memset(token, 0, sizeof(*token));
+}
+
+
+/**
+ * @brief Op callback for RD_KAFKA_OP_OAUTHBEARER_REFRESH
+ *
+ * @locality Application thread
+ */
+static rd_kafka_op_res_t rd_kafka_oauthbearer_refresh_op(rd_kafka_t *rk,
+ rd_kafka_q_t *rkq,
+ rd_kafka_op_t *rko) {
+ /* The op callback is invoked when the op is destroyed via
+ * rd_kafka_op_destroy() or rd_kafka_event_destroy(), so
+ * make sure we don't refresh upon destruction since
+ * the op has already been handled by this point.
+ */
+ if (rko->rko_err != RD_KAFKA_RESP_ERR__DESTROY &&
+ rk->rk_conf.sasl.oauthbearer.token_refresh_cb)
+ rk->rk_conf.sasl.oauthbearer.token_refresh_cb(
+ rk, rk->rk_conf.sasl.oauthbearer_config,
+ rk->rk_conf.opaque);
+ return RD_KAFKA_OP_RES_HANDLED;
+}
+
+/**
+ * @brief Enqueue a token refresh.
+ * @locks rwlock_wrlock(&handle->lock) MUST be held
+ */
+static void rd_kafka_oauthbearer_enqueue_token_refresh(
+ rd_kafka_sasl_oauthbearer_handle_t *handle) {
+ rd_kafka_op_t *rko;
+
+ rko = rd_kafka_op_new_cb(handle->rk, RD_KAFKA_OP_OAUTHBEARER_REFRESH,
+ rd_kafka_oauthbearer_refresh_op);
+ rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
+
+ /* For internal OIDC refresh callback:
+ * Force op to be handled by internal callback on the
+ * receiving queue, rather than being passed as an event to
+ * the application. */
+ if (handle->internal_refresh)
+ rko->rko_flags |= RD_KAFKA_OP_F_FORCE_CB;
+
+ handle->wts_enqueued_refresh = rd_uclock();
+ rd_kafka_q_enq(handle->callback_q, rko);
+}
+
+/**
+ * @brief Enqueue a token refresh if necessary.
+ *
+ * The method rd_kafka_oauthbearer_enqueue_token_refresh() is invoked
+ * if necessary; the required lock is acquired and released. This method
+ * returns immediately when SASL/OAUTHBEARER is not in use by the client.
+ */
+static void rd_kafka_oauthbearer_enqueue_token_refresh_if_necessary(
+ rd_kafka_sasl_oauthbearer_handle_t *handle) {
+ rd_ts_t now_wallclock;
+
+ now_wallclock = rd_uclock();
+
+ rwlock_wrlock(&handle->lock);
+ if (handle->wts_refresh_after < now_wallclock &&
+ handle->wts_enqueued_refresh <= handle->wts_refresh_after)
+ /* Refresh required and not yet scheduled; refresh it */
+ rd_kafka_oauthbearer_enqueue_token_refresh(handle);
+ rwlock_wrunlock(&handle->lock);
+}
+
+/**
+ * @returns \c rd_true if SASL/OAUTHBEARER is the configured authentication
+ * mechanism and a token is available, otherwise \c rd_false.
+ *
+ * @locks none
+ * @locality any
+ */
+static rd_bool_t
+rd_kafka_oauthbearer_has_token(rd_kafka_sasl_oauthbearer_handle_t *handle) {
+ rd_bool_t retval_has_token;
+
+ rwlock_rdlock(&handle->lock);
+ retval_has_token = handle->token_value != NULL;
+ rwlock_rdunlock(&handle->lock);
+
+ return retval_has_token;
+}
+
+/**
+ * @brief Verify that the provided \p key is valid.
+ * @returns 0 on success or -1 if \p key is invalid.
+ */
+static int check_oauthbearer_extension_key(const char *key,
+ char *errstr,
+ size_t errstr_size) {
+ const char *c;
+
+ if (!strcmp(key, "auth")) {
+ rd_snprintf(errstr, errstr_size,
+ "Cannot explicitly set the reserved `auth` "
+ "SASL/OAUTHBEARER extension key");
+ return -1;
+ }
+
+ /*
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ * key = 1*(ALPHA)
+ *
+ * https://tools.ietf.org/html/rfc5234#appendix-B.1
+ * ALPHA = %x41-5A / %x61-7A ; A-Z / a-z
+ */
+ if (!*key) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL/OAUTHBEARER extension keys "
+ "must not be empty");
+ return -1;
+ }
+
+ for (c = key; *c; c++) {
+ if (!(*c >= 'A' && *c <= 'Z') && !(*c >= 'a' && *c <= 'z')) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL/OAUTHBEARER extension keys must "
+ "only consist of A-Z or "
+ "a-z characters: %s (%c)",
+ key, *c);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * @brief Verify that the provided \p value is valid.
+ * @returns 0 on success or -1 if \p value is invalid.
+ */
+static int check_oauthbearer_extension_value(const char *value,
+ char *errstr,
+ size_t errstr_size) {
+ const char *c;
+
+ /*
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ * value = *(VCHAR / SP / HTAB / CR / LF )
+ *
+ * https://tools.ietf.org/html/rfc5234#appendix-B.1
+ * VCHAR = %x21-7E ; visible (printing) characters
+ * SP = %x20 ; space
+ * HTAB = %x09 ; horizontal tab
+ * CR = %x0D ; carriage return
+ * LF = %x0A ; linefeed
+ */
+ for (c = value; *c; c++) {
+ if (!(*c >= '\x21' && *c <= '\x7E') && *c != '\x20' &&
+ *c != '\x09' && *c != '\x0D' && *c != '\x0A') {
+ rd_snprintf(errstr, errstr_size,
+ "SASL/OAUTHBEARER extension values must "
+ "only consist of space, horizontal tab, "
+ "CR, LF, and "
+ "visible characters (%%x21-7E): %s (%c)",
+ value, *c);
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * @brief Set SASL/OAUTHBEARER token and metadata
+ *
+ * @param rk Client instance.
+ * @param token_value the mandatory token value to set, often (but not
+ * necessarily) a JWS compact serialization as per
+ * https://tools.ietf.org/html/rfc7515#section-3.1.
+ * Use rd_kafka_sasl_oauthbearer_token_free() to free members if
+ * return value is not -1.
+ * @param md_lifetime_ms when the token expires, in terms of the number of
+ * milliseconds since the epoch. See https://currentmillis.com/.
+ * @param md_principal_name the mandatory Kafka principal name associated
+ * with the token.
+ * @param extensions optional SASL extensions key-value array with
+ * \p extensions_size elements (number of keys * 2), where [i] is the key and
+ * [i+1] is the key's value, to be communicated to the broker
+ * as additional key-value pairs during the initial client response as per
+ * https://tools.ietf.org/html/rfc7628#section-3.1.
+ * @param extension_size the number of SASL extension keys plus values,
+ * which should be a non-negative multiple of 2.
+ *
+ * The SASL/OAUTHBEARER token refresh callback or event handler should cause
+ * this method to be invoked upon success, via
+ * rd_kafka_oauthbearer_set_token(). The extension keys must not include the
+ * reserved key "`auth`", and all extension keys and values must conform to the
+ * required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
+ *
+ * key = 1*(ALPHA)
+ * value = *(VCHAR / SP / HTAB / CR / LF )
+ *
+ * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise errstr set and:
+ * \c RD_KAFKA_RESP_ERR__INVALID_ARG if any of the arguments are
+ * invalid;
+ * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is not configured as
+ * the client's authentication mechanism.
+ *
+ * @sa rd_kafka_oauthbearer_set_token_failure0
+ */
+rd_kafka_resp_err_t
+rd_kafka_oauthbearer_set_token0(rd_kafka_t *rk,
+ const char *token_value,
+ int64_t md_lifetime_ms,
+ const char *md_principal_name,
+ const char **extensions,
+ size_t extension_size,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
+ size_t i;
+ rd_ts_t now_wallclock;
+ rd_ts_t wts_md_lifetime = md_lifetime_ms * 1000;
+
+ /* Check if SASL/OAUTHBEARER is the configured auth mechanism */
+ if (rk->rk_conf.sasl.provider != &rd_kafka_sasl_oauthbearer_provider ||
+ !handle) {
+ rd_snprintf(errstr, errstr_size,
+ "SASL/OAUTHBEARER is not the "
+ "configured authentication mechanism");
+ return RD_KAFKA_RESP_ERR__STATE;
+ }
+
+ /* Check if there is an odd number of extension keys + values */
+ if (extension_size & 1) {
+ rd_snprintf(errstr, errstr_size,
+ "Incorrect extension size "
+ "(must be a non-negative multiple of 2): %" PRIusz,
+ extension_size);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ /* Check args for correct format/value */
+ now_wallclock = rd_uclock();
+ if (wts_md_lifetime <= now_wallclock) {
+ rd_snprintf(errstr, errstr_size,
+ "Must supply an unexpired token: "
+ "now=%" PRId64 "ms, exp=%" PRId64 "ms",
+ now_wallclock / 1000, wts_md_lifetime / 1000);
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ if (check_oauthbearer_extension_value(token_value, errstr,
+ errstr_size) == -1)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ for (i = 0; i + 1 < extension_size; i += 2) {
+ if (check_oauthbearer_extension_key(extensions[i], errstr,
+ errstr_size) == -1 ||
+ check_oauthbearer_extension_value(extensions[i + 1], errstr,
+ errstr_size) == -1)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+ }
+
+ rwlock_wrlock(&handle->lock);
+
+ RD_IF_FREE(handle->md_principal_name, rd_free);
+ handle->md_principal_name = rd_strdup(md_principal_name);
+
+ RD_IF_FREE(handle->token_value, rd_free);
+ handle->token_value = rd_strdup(token_value);
+
+ handle->wts_md_lifetime = wts_md_lifetime;
+
+ /* Schedule a refresh 80% through its remaining lifetime */
+ handle->wts_refresh_after =
+ (rd_ts_t)(now_wallclock + 0.8 * (wts_md_lifetime - now_wallclock));
+
+ rd_list_clear(&handle->extensions);
+ for (i = 0; i + 1 < extension_size; i += 2)
+ rd_list_add(&handle->extensions,
+ rd_strtup_new(extensions[i], extensions[i + 1]));
+
+ RD_IF_FREE(handle->errstr, rd_free);
+ handle->errstr = NULL;
+
+ rwlock_wrunlock(&handle->lock);
+
+ rd_kafka_dbg(rk, SECURITY, "BRKMAIN",
+ "Waking up waiting broker threads after "
+ "setting OAUTHBEARER token");
+ rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_TRY_CONNECT,
+ "OAUTHBEARER token update");
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief SASL/OAUTHBEARER token refresh failure indicator.
+ *
+ * @param rk Client instance.
+ * @param errstr mandatory human readable error reason for failing to acquire
+ * a token.
+ *
+ * The SASL/OAUTHBEARER token refresh callback or event handler should cause
+ * this method to be invoked upon failure, via
+ * rd_kafka_oauthbearer_set_token_failure().
+ *
+ * @returns \c RD_KAFKA_RESP_ERR_NO_ERROR on success, otherwise
+ * \c RD_KAFKA_RESP_ERR__STATE if SASL/OAUTHBEARER is enabled but is
+ * not configured to be the client's authentication mechanism,
+ * \c RD_KAFKA_RESP_ERR__INVALID_ARG if no error string is supplied.
+
+ * @sa rd_kafka_oauthbearer_set_token0
+ */
+rd_kafka_resp_err_t
+rd_kafka_oauthbearer_set_token_failure0(rd_kafka_t *rk, const char *errstr) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
+ rd_bool_t error_changed;
+
+ /* Check if SASL/OAUTHBEARER is the configured auth mechanism */
+ if (rk->rk_conf.sasl.provider != &rd_kafka_sasl_oauthbearer_provider ||
+ !handle)
+ return RD_KAFKA_RESP_ERR__STATE;
+
+ if (!errstr || !*errstr)
+ return RD_KAFKA_RESP_ERR__INVALID_ARG;
+
+ rwlock_wrlock(&handle->lock);
+ error_changed = !handle->errstr || strcmp(handle->errstr, errstr);
+ RD_IF_FREE(handle->errstr, rd_free);
+ handle->errstr = rd_strdup(errstr);
+ /* Leave any existing token because it may have some life left,
+ * schedule a refresh for 10 seconds later. */
+ handle->wts_refresh_after = rd_uclock() + (10 * 1000 * 1000);
+ rwlock_wrunlock(&handle->lock);
+
+ /* Trigger an ERR__AUTHENTICATION error if the error changed. */
+ if (error_changed)
+ rd_kafka_op_err(rk, RD_KAFKA_RESP_ERR__AUTHENTICATION,
+ "Failed to acquire SASL OAUTHBEARER token: %s",
+ errstr);
+
+ return RD_KAFKA_RESP_ERR_NO_ERROR;
+}
+
+/**
+ * @brief Parse a config value from the string pointed to by \p loc and starting
+ * with the given \p prefix and ending with the given \p value_end_char, storing
+ * the newly-allocated memory result in the string pointed to by \p value.
+ * @returns -1 if string pointed to by \p value is non-empty (\p errstr set, no
+ * memory allocated), else 0 (caller must free allocated memory).
+ */
+static int parse_ujws_config_value_for_prefix(char **loc,
+ const char *prefix,
+ const char value_end_char,
+ char **value,
+ char *errstr,
+ size_t errstr_size) {
+ if (*value) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "multiple '%s' entries",
+ prefix);
+ return -1;
+ }
+
+ *loc += strlen(prefix);
+ *value = *loc;
+ while (**loc != '\0' && **loc != value_end_char)
+ ++*loc;
+
+ if (**loc == value_end_char) {
+ /* End the string and skip the character */
+ **loc = '\0';
+ ++*loc;
+ }
+
+ /* return new allocated memory */
+ *value = rd_strdup(*value);
+
+ return 0;
+}
+
+/*
+ * @brief Parse Unsecured JWS config, allocates strings that must be freed
+ * @param cfg the config to parse (typically from `sasl.oauthbearer.config`)
+ * @param parsed holds the parsed output; it must be all zeros to start.
+ * @returns -1 on failure (\p errstr set), else 0.
+ */
+static int
+parse_ujws_config(const char *cfg,
+ struct rd_kafka_sasl_oauthbearer_parsed_ujws *parsed,
+ char *errstr,
+ size_t errstr_size) {
+ /*
+ * Extensions:
+ *
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ * key = 1*(ALPHA)
+ * value = *(VCHAR / SP / HTAB / CR / LF )
+ *
+ * https://tools.ietf.org/html/rfc5234#appendix-B.1
+ * ALPHA = %x41-5A / %x61-7A ; A-Z / a-z
+ * VCHAR = %x21-7E ; visible (printing) characters
+ * SP = %x20 ; space
+ * HTAB = %x09 ; horizontal tab
+ * CR = %x0D ; carriage return
+ * LF = %x0A ; linefeed
+ */
+
+ static const char *prefix_principal_claim_name = "principalClaimName=";
+ static const char *prefix_principal = "principal=";
+ static const char *prefix_scope_claim_name = "scopeClaimName=";
+ static const char *prefix_scope = "scope=";
+ static const char *prefix_life_seconds = "lifeSeconds=";
+ static const char *prefix_extension = "extension_";
+
+ char *cfg_copy = rd_strdup(cfg);
+ char *loc = cfg_copy;
+ int r = 0;
+
+ while (*loc != '\0' && !r) {
+ if (*loc == ' ')
+ ++loc;
+ else if (!strncmp(prefix_principal_claim_name, loc,
+ strlen(prefix_principal_claim_name))) {
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_principal_claim_name, ' ',
+ &parsed->principal_claim_name, errstr, errstr_size);
+
+ if (!r && !*parsed->principal_claim_name) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "empty '%s'",
+ prefix_principal_claim_name);
+ r = -1;
+ }
+
+ } else if (!strncmp(prefix_principal, loc,
+ strlen(prefix_principal))) {
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_principal, ' ', &parsed->principal,
+ errstr, errstr_size);
+
+ if (!r && !*parsed->principal) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "empty '%s'",
+ prefix_principal);
+ r = -1;
+ }
+
+ } else if (!strncmp(prefix_scope_claim_name, loc,
+ strlen(prefix_scope_claim_name))) {
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_scope_claim_name, ' ',
+ &parsed->scope_claim_name, errstr, errstr_size);
+
+ if (!r && !*parsed->scope_claim_name) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "empty '%s'",
+ prefix_scope_claim_name);
+ r = -1;
+ }
+
+ } else if (!strncmp(prefix_scope, loc, strlen(prefix_scope))) {
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_scope, ' ', &parsed->scope_csv_text,
+ errstr, errstr_size);
+
+ if (!r && !*parsed->scope_csv_text) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "empty '%s'",
+ prefix_scope);
+ r = -1;
+ }
+
+ } else if (!strncmp(prefix_life_seconds, loc,
+ strlen(prefix_life_seconds))) {
+ char *life_seconds_text = NULL;
+
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_life_seconds, ' ', &life_seconds_text,
+ errstr, errstr_size);
+
+ if (!r && !*life_seconds_text) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid "
+ "sasl.oauthbearer.config: "
+ "empty '%s'",
+ prefix_life_seconds);
+ r = -1;
+ } else if (!r) {
+ long long life_seconds_long;
+ char *end_ptr;
+ life_seconds_long =
+ strtoll(life_seconds_text, &end_ptr, 10);
+ if (*end_ptr != '\0') {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid "
+ "sasl.oauthbearer.config: "
+ "non-integral '%s': %s",
+ prefix_life_seconds,
+ life_seconds_text);
+ r = -1;
+ } else if (life_seconds_long <= 0 ||
+ life_seconds_long > INT_MAX) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid "
+ "sasl.oauthbearer.config: "
+ "value out of range of "
+ "positive int '%s': %s",
+ prefix_life_seconds,
+ life_seconds_text);
+ r = -1;
+ } else {
+ parsed->life_seconds =
+ (int)life_seconds_long;
+ }
+ }
+
+ RD_IF_FREE(life_seconds_text, rd_free);
+
+ } else if (!strncmp(prefix_extension, loc,
+ strlen(prefix_extension))) {
+ char *extension_key = NULL;
+
+ r = parse_ujws_config_value_for_prefix(
+ &loc, prefix_extension, '=', &extension_key, errstr,
+ errstr_size);
+
+ if (!r && !*extension_key) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid "
+ "sasl.oauthbearer.config: "
+ "empty '%s' key",
+ prefix_extension);
+ r = -1;
+ } else if (!r) {
+ char *extension_value = NULL;
+ r = parse_ujws_config_value_for_prefix(
+ &loc, "", ' ', &extension_value, errstr,
+ errstr_size);
+ if (!r) {
+ rd_list_add(
+ &parsed->extensions,
+ rd_strtup_new(extension_key,
+ extension_value));
+ rd_free(extension_value);
+ }
+ }
+
+ RD_IF_FREE(extension_key, rd_free);
+
+ } else {
+ rd_snprintf(errstr, errstr_size,
+ "Unrecognized sasl.oauthbearer.config "
+ "beginning at: %s",
+ loc);
+ r = -1;
+ }
+ }
+
+ rd_free(cfg_copy);
+
+ return r;
+}
+
+/**
+ * @brief Create unsecured JWS compact serialization
+ * from the given information.
+ * @returns allocated memory that the caller must free.
+ */
+static char *create_jws_compact_serialization(
+ const struct rd_kafka_sasl_oauthbearer_parsed_ujws *parsed,
+ rd_ts_t now_wallclock) {
+ static const char *jose_header_encoded =
+ "eyJhbGciOiJub25lIn0"; // {"alg":"none"}
+ int scope_json_length = 0;
+ int max_json_length;
+ double now_wallclock_seconds;
+ char *scope_json;
+ char *scope_curr;
+ int i;
+ char *claims_json;
+ char *jws_claims;
+ size_t encode_len;
+ char *jws_last_char;
+ char *jws_maybe_non_url_char;
+ char *retval_jws;
+ size_t retval_size;
+ rd_list_t scope;
+
+ rd_list_init(&scope, 0, rd_free);
+ if (parsed->scope_csv_text) {
+ /* Convert from csv to rd_list_t and
+ * calculate json length. */
+ char *start = parsed->scope_csv_text;
+ char *curr = start;
+
+ while (*curr != '\0') {
+ /* Ignore empty elements (e.g. ",,") */
+ while (*curr == ',') {
+ ++curr;
+ ++start;
+ }
+
+ while (*curr != '\0' && *curr != ',')
+ ++curr;
+
+ if (curr == start)
+ continue;
+
+ if (*curr == ',') {
+ *curr = '\0';
+ ++curr;
+ }
+
+ if (!rd_list_find(&scope, start, (void *)strcmp))
+ rd_list_add(&scope, rd_strdup(start));
+
+ if (scope_json_length == 0) {
+ scope_json_length =
+ 2 + // ,"
+ (int)strlen(parsed->scope_claim_name) +
+ 4 + // ":["
+ (int)strlen(start) + 1 + // "
+ 1; // ]
+ } else {
+ scope_json_length += 2; // ,"
+ scope_json_length += (int)strlen(start);
+ scope_json_length += 1; // "
+ }
+
+ start = curr;
+ }
+ }
+
+ now_wallclock_seconds = now_wallclock / 1000000.0;
+
+ /* Generate json */
+ max_json_length = 2 + // {"
+ (int)strlen(parsed->principal_claim_name) +
+ 3 + // ":"
+ (int)strlen(parsed->principal) + 8 + // ","iat":
+ 14 + // iat NumericDate (e.g. 1549251467.546)
+ 7 + // ,"exp":
+ 14 + // exp NumericDate (e.g. 1549252067.546)
+ scope_json_length + 1; // }
+
+ /* Generate scope portion of json */
+ scope_json = rd_malloc(scope_json_length + 1);
+ *scope_json = '\0';
+ scope_curr = scope_json;
+
+ for (i = 0; i < rd_list_cnt(&scope); i++) {
+ if (i == 0)
+ scope_curr += rd_snprintf(
+ scope_curr,
+ (size_t)(scope_json + scope_json_length + 1 -
+ scope_curr),
+ ",\"%s\":[\"", parsed->scope_claim_name);
+ else
+ scope_curr += sprintf(scope_curr, "%s", ",\"");
+ scope_curr += sprintf(scope_curr, "%s\"",
+ (const char *)rd_list_elem(&scope, i));
+ if (i == rd_list_cnt(&scope) - 1)
+ scope_curr += sprintf(scope_curr, "%s", "]");
+ }
+
+ claims_json = rd_malloc(max_json_length + 1);
+ rd_snprintf(claims_json, max_json_length + 1,
+ "{\"%s\":\"%s\",\"iat\":%.3f,\"exp\":%.3f%s}",
+ parsed->principal_claim_name, parsed->principal,
+ now_wallclock_seconds,
+ now_wallclock_seconds + parsed->life_seconds, scope_json);
+ rd_free(scope_json);
+
+ /* Convert to base64URL format, first to base64, then to base64URL */
+ retval_size = strlen(jose_header_encoded) + 1 +
+ (((max_json_length + 2) / 3) * 4) + 1 + 1;
+ retval_jws = rd_malloc(retval_size);
+ rd_snprintf(retval_jws, retval_size, "%s.", jose_header_encoded);
+ jws_claims = retval_jws + strlen(retval_jws);
+ encode_len =
+ EVP_EncodeBlock((uint8_t *)jws_claims, (uint8_t *)claims_json,
+ (int)strlen(claims_json));
+ rd_free(claims_json);
+ jws_last_char = jws_claims + encode_len - 1;
+
+ /* Convert from padded base64 to unpadded base64URL
+ * and eliminate any padding. */
+ while (jws_last_char >= jws_claims && *jws_last_char == '=')
+ --jws_last_char;
+ *(++jws_last_char) = '.';
+ *(jws_last_char + 1) = '\0';
+
+ /* Convert the 2 differing encode characters */
+ for (jws_maybe_non_url_char = retval_jws; *jws_maybe_non_url_char;
+ jws_maybe_non_url_char++)
+ if (*jws_maybe_non_url_char == '+')
+ *jws_maybe_non_url_char = '-';
+ else if (*jws_maybe_non_url_char == '/')
+ *jws_maybe_non_url_char = '_';
+
+ rd_list_destroy(&scope);
+
+ return retval_jws;
+}
+
+/**
+ * @brief Same as rd_kafka_oauthbearer_unsecured_token() except it takes
+ * additional explicit arguments and return a status code along with
+ * the token to set in order to facilitate unit testing.
+ * @param token output defining the token to set
+ * @param cfg the config to parse (typically from `sasl.oauthbearer.config`)
+ * @param now_wallclock_ms the valued to be used for the `iat` claim
+ * (and by implication, the `exp` claim)
+ * @returns -1 on failure (\p errstr set), else 0.
+ */
+static int rd_kafka_oauthbearer_unsecured_token0(
+ struct rd_kafka_sasl_oauthbearer_token *token,
+ const char *cfg,
+ int64_t now_wallclock_ms,
+ char *errstr,
+ size_t errstr_size) {
+ struct rd_kafka_sasl_oauthbearer_parsed_ujws parsed = RD_ZERO_INIT;
+ int r;
+ int i;
+
+ if (!cfg || !*cfg) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "must not be empty");
+ return -1;
+ }
+
+ memset(token, 0, sizeof(*token));
+
+ rd_list_init(&parsed.extensions, 0,
+ (void (*)(void *))rd_strtup_destroy);
+
+ if (!(r = parse_ujws_config(cfg, &parsed, errstr, errstr_size))) {
+ /* Make sure we have required and valid info */
+ if (!parsed.principal_claim_name)
+ parsed.principal_claim_name = rd_strdup("sub");
+ if (!parsed.scope_claim_name)
+ parsed.scope_claim_name = rd_strdup("scope");
+ if (!parsed.life_seconds)
+ parsed.life_seconds = 3600;
+ if (!parsed.principal) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "no principal=<value>");
+ r = -1;
+ } else if (strchr(parsed.principal, '"')) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "'\"' cannot appear in principal: %s",
+ parsed.principal);
+ r = -1;
+ } else if (strchr(parsed.principal_claim_name, '"')) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "'\"' cannot appear in "
+ "principalClaimName: %s",
+ parsed.principal_claim_name);
+ r = -1;
+ } else if (strchr(parsed.scope_claim_name, '"')) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "'\"' cannot appear in scopeClaimName: %s",
+ parsed.scope_claim_name);
+ r = -1;
+ } else if (parsed.scope_csv_text &&
+ strchr(parsed.scope_csv_text, '"')) {
+ rd_snprintf(errstr, errstr_size,
+ "Invalid sasl.oauthbearer.config: "
+ "'\"' cannot appear in scope: %s",
+ parsed.scope_csv_text);
+ r = -1;
+ } else {
+ char **extensionv;
+ int extension_pair_count;
+ char *jws = create_jws_compact_serialization(
+ &parsed, now_wallclock_ms * 1000);
+
+ extension_pair_count = rd_list_cnt(&parsed.extensions);
+ extensionv = rd_malloc(sizeof(*extensionv) * 2 *
+ extension_pair_count);
+ for (i = 0; i < extension_pair_count; ++i) {
+ rd_strtup_t *strtup =
+ (rd_strtup_t *)rd_list_elem(
+ &parsed.extensions, i);
+ extensionv[2 * i] = rd_strdup(strtup->name);
+ extensionv[2 * i + 1] =
+ rd_strdup(strtup->value);
+ }
+ token->token_value = jws;
+ token->md_lifetime_ms =
+ now_wallclock_ms + parsed.life_seconds * 1000;
+ token->md_principal_name = rd_strdup(parsed.principal);
+ token->extensions = extensionv;
+ token->extension_size = 2 * extension_pair_count;
+ }
+ }
+ RD_IF_FREE(parsed.principal_claim_name, rd_free);
+ RD_IF_FREE(parsed.principal, rd_free);
+ RD_IF_FREE(parsed.scope_claim_name, rd_free);
+ RD_IF_FREE(parsed.scope_csv_text, rd_free);
+ rd_list_destroy(&parsed.extensions);
+
+ if (r == -1)
+ rd_kafka_sasl_oauthbearer_token_free(token);
+
+ return r;
+}
+
+/**
+ * @brief Default SASL/OAUTHBEARER token refresh callback that generates an
+ * unsecured JWS as per https://tools.ietf.org/html/rfc7515#appendix-A.5.
+ *
+ * This method interprets `sasl.oauthbearer.config` as space-separated
+ * name=value pairs with valid names including principalClaimName,
+ * principal, scopeClaimName, scope, and lifeSeconds. The default
+ * value for principalClaimName is "sub". The principal must be specified.
+ * The default value for scopeClaimName is "scope", and the default value
+ * for lifeSeconds is 3600. The scope value is CSV format with the
+ * default value being no/empty scope. For example:
+ * "principalClaimName=azp principal=admin scopeClaimName=roles
+ * scope=role1,role2 lifeSeconds=600".
+ *
+ * SASL extensions can be communicated to the broker via
+ * extension_NAME=value. For example:
+ * "principal=admin extension_traceId=123". Extension names and values
+ * must conform to the required syntax as per
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ *
+ * All values -- whether extensions, claim names, or scope elements -- must not
+ * include a quote (") character. The parsing rules also imply that names
+ * and values cannot include a space character, and scope elements cannot
+ * include a comma (,) character.
+ *
+ * The existence of any kind of parsing problem -- an unrecognized name,
+ * a quote character in a value, an empty value, etc. -- raises the
+ * \c RD_KAFKA_RESP_ERR__AUTHENTICATION event.
+ *
+ * Unsecured tokens are not to be used in production -- they are only good for
+ * testing and development purposess -- so while the inflexibility of the
+ * parsing rules is acknowledged, it is assumed that this is not problematic.
+ */
+void rd_kafka_oauthbearer_unsecured_token(rd_kafka_t *rk,
+ const char *oauthbearer_config,
+ void *opaque) {
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
+
+ rd_kafka_dbg(rk, SECURITY, "OAUTHBEARER", "Creating unsecured token");
+
+ if (rd_kafka_oauthbearer_unsecured_token0(&token, oauthbearer_config,
+ rd_uclock() / 1000, errstr,
+ sizeof(errstr)) == -1 ||
+ rd_kafka_oauthbearer_set_token(
+ rk, token.token_value, token.md_lifetime_ms,
+ token.md_principal_name, (const char **)token.extensions,
+ token.extension_size, errstr, sizeof(errstr)) == -1) {
+ rd_kafka_oauthbearer_set_token_failure(rk, errstr);
+ }
+
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+}
+
+/**
+ * @brief Close and free authentication state
+ */
+static void rd_kafka_sasl_oauthbearer_close(rd_kafka_transport_t *rktrans) {
+ struct rd_kafka_sasl_oauthbearer_state *state =
+ rktrans->rktrans_sasl.state;
+
+ if (!state)
+ return;
+
+ RD_IF_FREE(state->server_error_msg, rd_free);
+ rd_free(state->token_value);
+ rd_free(state->md_principal_name);
+ rd_list_destroy(&state->extensions);
+ rd_free(state);
+}
+
+
+
+/**
+ * @brief Build client-first-message
+ */
+static void rd_kafka_sasl_oauthbearer_build_client_first_message(
+ rd_kafka_transport_t *rktrans,
+ rd_chariov_t *out) {
+ struct rd_kafka_sasl_oauthbearer_state *state =
+ rktrans->rktrans_sasl.state;
+
+ /*
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ * kvsep = %x01
+ * key = 1*(ALPHA)
+ * value = *(VCHAR / SP / HTAB / CR / LF )
+ * kvpair = key "=" value kvsep
+ * ;;gs2-header = See RFC 5801
+ * client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
+ */
+
+ static const char *gs2_header = "n,,";
+ static const char *kvsep = "\x01";
+ const int kvsep_size = (int)strlen(kvsep);
+ int extension_size = 0;
+ int i;
+ char *buf;
+ int size_written;
+ unsigned long r;
+
+ for (i = 0; i < rd_list_cnt(&state->extensions); i++) {
+ rd_strtup_t *extension = rd_list_elem(&state->extensions, i);
+ // kvpair = key "=" value kvsep
+ extension_size += (int)strlen(extension->name) + 1 // "="
+ + (int)strlen(extension->value) + kvsep_size;
+ }
+
+ // client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
+ out->size = strlen(gs2_header) + kvsep_size + strlen("auth=Bearer ") +
+ strlen(state->token_value) + kvsep_size + extension_size +
+ kvsep_size;
+ out->ptr = rd_malloc(out->size + 1);
+
+ buf = out->ptr;
+ size_written = 0;
+ r = rd_snprintf(buf, out->size + 1 - size_written,
+ "%s%sauth=Bearer %s%s", gs2_header, kvsep,
+ state->token_value, kvsep);
+ rd_assert(r < out->size + 1 - size_written);
+ size_written += r;
+ buf = out->ptr + size_written;
+
+ for (i = 0; i < rd_list_cnt(&state->extensions); i++) {
+ rd_strtup_t *extension = rd_list_elem(&state->extensions, i);
+ r = rd_snprintf(buf, out->size + 1 - size_written, "%s=%s%s",
+ extension->name, extension->value, kvsep);
+ rd_assert(r < out->size + 1 - size_written);
+ size_written += r;
+ buf = out->ptr + size_written;
+ }
+
+ r = rd_snprintf(buf, out->size + 1 - size_written, "%s", kvsep);
+ rd_assert(r < out->size + 1 - size_written);
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "OAUTHBEARER",
+ "Built client first message");
+}
+
+
+
+/**
+ * @brief SASL OAUTHBEARER client state machine
+ * @returns -1 on failure (\p errstr set), else 0.
+ */
+static int rd_kafka_sasl_oauthbearer_fsm(rd_kafka_transport_t *rktrans,
+ const rd_chariov_t *in,
+ char *errstr,
+ size_t errstr_size) {
+ static const char *state_names[] = {
+ "client-first-message",
+ "server-first-message",
+ "server-failure-message",
+ };
+ struct rd_kafka_sasl_oauthbearer_state *state =
+ rktrans->rktrans_sasl.state;
+ rd_chariov_t out = RD_ZERO_INIT;
+ int r = -1;
+
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "OAUTHBEARER",
+ "SASL OAUTHBEARER client in state %s",
+ state_names[state->state]);
+
+ switch (state->state) {
+ case RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE:
+ rd_dassert(!in); /* Not expecting any server-input */
+
+ rd_kafka_sasl_oauthbearer_build_client_first_message(rktrans,
+ &out);
+ state->state = RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG;
+ break;
+
+
+ case RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_FIRST_MSG:
+ if (!in->size || !*in->ptr) {
+ /* Success */
+ rd_rkb_dbg(rktrans->rktrans_rkb,
+ SECURITY | RD_KAFKA_DBG_BROKER,
+ "OAUTHBEARER",
+ "SASL OAUTHBEARER authentication "
+ "successful (principal=%s)",
+ state->md_principal_name);
+ rd_kafka_sasl_auth_done(rktrans);
+ r = 0;
+ break;
+ }
+
+ /* Failure; save error message for later */
+ state->server_error_msg = rd_strndup(in->ptr, in->size);
+
+ /*
+ * https://tools.ietf.org/html/rfc7628#section-3.1
+ * kvsep = %x01
+ * client-resp = (gs2-header kvsep *kvpair kvsep) / kvsep
+ *
+ * Send final kvsep (CTRL-A) character
+ */
+ out.size = 1;
+ out.ptr = rd_malloc(out.size + 1);
+ rd_snprintf(out.ptr, out.size + 1, "\x01");
+ state->state =
+ RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL;
+ r = 0; // Will fail later in next state after sending response
+ break;
+
+ case RD_KAFKA_SASL_OAUTHB_STATE_RECV_SERVER_MSG_AFTER_FAIL:
+ /* Failure as previosuly communicated by server first message */
+ rd_snprintf(errstr, errstr_size,
+ "SASL OAUTHBEARER authentication failed "
+ "(principal=%s): %s",
+ state->md_principal_name, state->server_error_msg);
+ rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
+ "OAUTHBEARER", "%s", errstr);
+ r = -1;
+ break;
+ }
+
+ if (out.ptr) {
+ r = rd_kafka_sasl_send(rktrans, out.ptr, (int)out.size, errstr,
+ errstr_size);
+ rd_free(out.ptr);
+ }
+
+ return r;
+}
+
+
+/**
+ * @brief Handle received frame from broker.
+ */
+static int rd_kafka_sasl_oauthbearer_recv(rd_kafka_transport_t *rktrans,
+ const void *buf,
+ size_t size,
+ char *errstr,
+ size_t errstr_size) {
+ const rd_chariov_t in = {.ptr = (char *)buf, .size = size};
+ return rd_kafka_sasl_oauthbearer_fsm(rktrans, &in, errstr, errstr_size);
+}
+
+
+/**
+ * @brief Initialize and start SASL OAUTHBEARER (builtin) authentication.
+ *
+ * Returns 0 on successful init and -1 on error.
+ *
+ * @locality broker thread
+ */
+static int rd_kafka_sasl_oauthbearer_client_new(rd_kafka_transport_t *rktrans,
+ const char *hostname,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle =
+ rktrans->rktrans_rkb->rkb_rk->rk_sasl.handle;
+ struct rd_kafka_sasl_oauthbearer_state *state;
+
+ state = rd_calloc(1, sizeof(*state));
+ state->state = RD_KAFKA_SASL_OAUTHB_STATE_SEND_CLIENT_FIRST_MESSAGE;
+
+ /*
+ * Save off the state structure now, before any possibility of
+ * returning, so that we will always free up the allocated memory in
+ * rd_kafka_sasl_oauthbearer_close().
+ */
+ rktrans->rktrans_sasl.state = state;
+
+ /*
+ * Make sure we have a consistent view of the token and extensions
+ * throughout the authentication process -- even if it is refreshed
+ * midway through this particular authentication.
+ */
+ rwlock_rdlock(&handle->lock);
+ if (!handle->token_value) {
+ rd_snprintf(errstr, errstr_size,
+ "OAUTHBEARER cannot log in because there "
+ "is no token available; last error: %s",
+ handle->errstr ? handle->errstr
+ : "(not available)");
+ rwlock_rdunlock(&handle->lock);
+ return -1;
+ }
+
+ state->token_value = rd_strdup(handle->token_value);
+ state->md_principal_name = rd_strdup(handle->md_principal_name);
+ rd_list_copy_to(&state->extensions, &handle->extensions,
+ rd_strtup_list_copy, NULL);
+
+ rwlock_rdunlock(&handle->lock);
+
+ /* Kick off the FSM */
+ return rd_kafka_sasl_oauthbearer_fsm(rktrans, NULL, errstr,
+ errstr_size);
+}
+
+
+/**
+ * @brief Token refresh timer callback.
+ *
+ * @locality rdkafka main thread
+ */
+static void
+rd_kafka_sasl_oauthbearer_token_refresh_tmr_cb(rd_kafka_timers_t *rkts,
+ void *arg) {
+ rd_kafka_t *rk = arg;
+ rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
+
+ /* Enqueue a token refresh if necessary */
+ rd_kafka_oauthbearer_enqueue_token_refresh_if_necessary(handle);
+}
+
+
+/**
+ * @brief Per-client-instance initializer
+ */
+static int rd_kafka_sasl_oauthbearer_init(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle;
+
+ handle = rd_calloc(1, sizeof(*handle));
+ rk->rk_sasl.handle = handle;
+
+ rwlock_init(&handle->lock);
+
+ handle->rk = rk;
+
+ rd_list_init(&handle->extensions, 0,
+ (void (*)(void *))rd_strtup_destroy);
+
+ rd_kafka_timer_start(
+ &rk->rk_timers, &handle->token_refresh_tmr, 1 * 1000 * 1000,
+ rd_kafka_sasl_oauthbearer_token_refresh_tmr_cb, rk);
+
+ /* Automatically refresh the token if using the builtin
+ * unsecure JWS token refresher, to avoid an initial connection
+ * stall as we wait for the application to call poll(). */
+ if (rk->rk_conf.sasl.oauthbearer.token_refresh_cb ==
+ rd_kafka_oauthbearer_unsecured_token) {
+ rk->rk_conf.sasl.oauthbearer.token_refresh_cb(
+ rk, rk->rk_conf.sasl.oauthbearer_config,
+ rk->rk_conf.opaque);
+
+ return 0;
+ }
+
+ if (rk->rk_conf.sasl.enable_callback_queue) {
+ /* SASL specific callback queue enabled */
+ rk->rk_sasl.callback_q = rd_kafka_q_new(rk);
+ handle->callback_q = rd_kafka_q_keep(rk->rk_sasl.callback_q);
+ } else {
+ /* Use main queue */
+ handle->callback_q = rd_kafka_q_keep(rk->rk_rep);
+ }
+
+#if WITH_OAUTHBEARER_OIDC
+ if (rk->rk_conf.sasl.oauthbearer.method ==
+ RD_KAFKA_SASL_OAUTHBEARER_METHOD_OIDC &&
+ rk->rk_conf.sasl.oauthbearer.token_refresh_cb ==
+ rd_kafka_oidc_token_refresh_cb) {
+ handle->internal_refresh = rd_true;
+ rd_kafka_sasl_background_callbacks_enable(rk);
+ }
+#endif
+
+ /* Otherwise enqueue a refresh callback for the application. */
+ rd_kafka_oauthbearer_enqueue_token_refresh(handle);
+
+ return 0;
+}
+
+
+/**
+ * @brief Per-client-instance destructor
+ */
+static void rd_kafka_sasl_oauthbearer_term(rd_kafka_t *rk) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
+
+ if (!handle)
+ return;
+
+ rk->rk_sasl.handle = NULL;
+
+ rd_kafka_timer_stop(&rk->rk_timers, &handle->token_refresh_tmr, 1);
+
+ RD_IF_FREE(handle->md_principal_name, rd_free);
+ RD_IF_FREE(handle->token_value, rd_free);
+ rd_list_destroy(&handle->extensions);
+ RD_IF_FREE(handle->errstr, rd_free);
+ RD_IF_FREE(handle->callback_q, rd_kafka_q_destroy);
+
+ rwlock_destroy(&handle->lock);
+
+ rd_free(handle);
+}
+
+
+/**
+ * @brief SASL/OAUTHBEARER is unable to connect unless a valid
+ * token is available, and a valid token CANNOT be
+ * available unless/until an initial token retrieval
+ * succeeds, so wait for this precondition if necessary.
+ */
+static rd_bool_t rd_kafka_sasl_oauthbearer_ready(rd_kafka_t *rk) {
+ rd_kafka_sasl_oauthbearer_handle_t *handle = rk->rk_sasl.handle;
+
+ if (!handle)
+ return rd_false;
+
+ return rd_kafka_oauthbearer_has_token(handle);
+}
+
+
+/**
+ * @brief Validate OAUTHBEARER config, which is a no-op
+ * (we rely on initial token retrieval)
+ */
+static int rd_kafka_sasl_oauthbearer_conf_validate(rd_kafka_t *rk,
+ char *errstr,
+ size_t errstr_size) {
+ /*
+ * We must rely on the initial token retrieval as a proxy
+ * for configuration validation because the configuration is
+ * implementation-dependent, and it is not necessarily the case
+ * that the config reflects the default unsecured JWS config
+ * that we know how to parse.
+ */
+ return 0;
+}
+
+
+
+const struct rd_kafka_sasl_provider rd_kafka_sasl_oauthbearer_provider = {
+ .name = "OAUTHBEARER (builtin)",
+ .init = rd_kafka_sasl_oauthbearer_init,
+ .term = rd_kafka_sasl_oauthbearer_term,
+ .ready = rd_kafka_sasl_oauthbearer_ready,
+ .client_new = rd_kafka_sasl_oauthbearer_client_new,
+ .recv = rd_kafka_sasl_oauthbearer_recv,
+ .close = rd_kafka_sasl_oauthbearer_close,
+ .conf_validate = rd_kafka_sasl_oauthbearer_conf_validate,
+};
+
+
+
+/**
+ * @name Unit tests
+ *
+ *
+ */
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should generate correct default values.
+ */
+static int do_unittest_config_defaults(void) {
+ static const char *sasl_oauthbearer_config =
+ "principal=fubar "
+ "scopeClaimName=whatever";
+ // default scope is empty, default lifetime is 3600 seconds
+ // {"alg":"none"}
+ // .
+ // {"sub":"fubar","iat":1.000,"exp":3601.000}
+ //
+ static const char *expected_token_value =
+ "eyJhbGciOiJub25lIn0"
+ "."
+ "eyJzdWIiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6MzYwMS4wMDB9"
+ ".";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r == -1)
+ RD_UT_FAIL("Failed to create a token: %s: %s",
+ sasl_oauthbearer_config, errstr);
+
+ RD_UT_ASSERT(token.md_lifetime_ms == now_wallclock_ms + 3600 * 1000,
+ "Invalid md_lifetime_ms %" PRId64, token.md_lifetime_ms);
+ RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
+ "Invalid md_principal_name %s", token.md_principal_name);
+ RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
+ "Invalid token_value %s, expected %s", token.token_value,
+ expected_token_value);
+
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should generate correct token for explicit scope and lifeSeconds values.
+ */
+static int do_unittest_config_explicit_scope_and_life(void) {
+ static const char *sasl_oauthbearer_config =
+ "principal=fubar "
+ "scope=role1,role2 lifeSeconds=60";
+ // {"alg":"none"}
+ // .
+ // {"sub":"fubar","iat":1.000,"exp":61.000,"scope":["role1","role2"]}
+ //
+ static const char *expected_token_value =
+ "eyJhbGciOiJub25lIn0"
+ "."
+ "eyJzdWIiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6NjEuMDAwLCJzY29wZ"
+ "SI6WyJyb2xlMSIsInJvbGUyIl19"
+ ".";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r == -1)
+ RD_UT_FAIL("Failed to create a token: %s: %s",
+ sasl_oauthbearer_config, errstr);
+
+ RD_UT_ASSERT(token.md_lifetime_ms == now_wallclock_ms + 60 * 1000,
+ "Invalid md_lifetime_ms %" PRId64, token.md_lifetime_ms);
+ RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
+ "Invalid md_principal_name %s", token.md_principal_name);
+ RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
+ "Invalid token_value %s, expected %s", token.token_value,
+ expected_token_value);
+
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should generate correct token when all values are provided explicitly.
+ */
+static int do_unittest_config_all_explicit_values(void) {
+ static const char *sasl_oauthbearer_config =
+ "principal=fubar "
+ "principalClaimName=azp scope=role1,role2 "
+ "scopeClaimName=roles lifeSeconds=60";
+ // {"alg":"none"}
+ // .
+ // {"azp":"fubar","iat":1.000,"exp":61.000,"roles":["role1","role2"]}
+ //
+ static const char *expected_token_value =
+ "eyJhbGciOiJub25lIn0"
+ "."
+ "eyJhenAiOiJmdWJhciIsImlhdCI6MS4wMDAsImV4cCI6NjEuMDAwLCJyb2xlc"
+ "yI6WyJyb2xlMSIsInJvbGUyIl19"
+ ".";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r == -1)
+ RD_UT_FAIL("Failed to create a token: %s: %s",
+ sasl_oauthbearer_config, errstr);
+
+ RD_UT_ASSERT(token.md_lifetime_ms == now_wallclock_ms + 60 * 1000,
+ "Invalid md_lifetime_ms %" PRId64, token.md_lifetime_ms);
+ RD_UT_ASSERT(!strcmp(token.md_principal_name, "fubar"),
+ "Invalid md_principal_name %s", token.md_principal_name);
+ RD_UT_ASSERT(!strcmp(token.token_value, expected_token_value),
+ "Invalid token_value %s, expected %s", token.token_value,
+ expected_token_value);
+
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should fail when no principal specified.
+ */
+static int do_unittest_config_no_principal_should_fail(void) {
+ static const char *expected_msg =
+ "Invalid sasl.oauthbearer.config: "
+ "no principal=<value>";
+ static const char *sasl_oauthbearer_config =
+ "extension_notaprincipal=hi";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r != -1)
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_ASSERT(r == -1, "Did not fail despite missing principal");
+
+ RD_UT_ASSERT(!strcmp(errstr, expected_msg),
+ "Incorrect error message when no principal: "
+ "expected=%s received=%s",
+ expected_msg, errstr);
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should fail when no sasl.oauthbearer.config is specified.
+ */
+static int do_unittest_config_empty_should_fail(void) {
+ static const char *expected_msg =
+ "Invalid sasl.oauthbearer.config: "
+ "must not be empty";
+ static const char *sasl_oauthbearer_config = "";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token = RD_ZERO_INIT;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r != -1)
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_ASSERT(r == -1, "Did not fail despite empty config");
+
+ RD_UT_ASSERT(!strcmp(errstr, expected_msg),
+ "Incorrect error message with empty config: "
+ "expected=%s received=%s",
+ expected_msg, errstr);
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should fail when something unrecognized is specified.
+ */
+static int do_unittest_config_unrecognized_should_fail(void) {
+ static const char *expected_msg =
+ "Unrecognized "
+ "sasl.oauthbearer.config beginning at: unrecognized";
+ static const char *sasl_oauthbearer_config =
+ "principal=fubar unrecognized";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+ if (r != -1)
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_ASSERT(r == -1, "Did not fail with something unrecognized");
+
+ RD_UT_ASSERT(!strcmp(errstr, expected_msg),
+ "Incorrect error message with something unrecognized: "
+ "expected=%s received=%s",
+ expected_msg, errstr);
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should fail when empty values are specified.
+ */
+static int do_unittest_config_empty_value_should_fail(void) {
+ static const char *sasl_oauthbearer_configs[] = {
+ "principal=", "principal=fubar principalClaimName=",
+ "principal=fubar scope=", "principal=fubar scopeClaimName=",
+ "principal=fubar lifeSeconds="};
+ static const char *expected_prefix =
+ "Invalid sasl.oauthbearer.config: empty";
+ size_t i;
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ int r;
+
+ for (i = 0; i < sizeof(sasl_oauthbearer_configs) / sizeof(const char *);
+ i++) {
+ struct rd_kafka_sasl_oauthbearer_token token;
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_configs[i], now_wallclock_ms,
+ errstr, sizeof(errstr));
+ if (r != -1)
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_ASSERT(r == -1, "Did not fail with an empty value: %s",
+ sasl_oauthbearer_configs[i]);
+
+ RD_UT_ASSERT(
+ !strncmp(expected_prefix, errstr, strlen(expected_prefix)),
+ "Incorrect error message prefix when empty "
+ "(%s): expected=%s received=%s",
+ sasl_oauthbearer_configs[i], expected_prefix, errstr);
+ }
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should fail when value with embedded quote is specified.
+ */
+static int do_unittest_config_value_with_quote_should_fail(void) {
+ static const char *sasl_oauthbearer_configs[] = {
+ "principal=\"fu", "principal=fubar principalClaimName=\"bar",
+ "principal=fubar scope=\"a,b,c",
+ "principal=fubar scopeClaimName=\"baz"};
+ static const char *expected_prefix =
+ "Invalid "
+ "sasl.oauthbearer.config: '\"' cannot appear in ";
+ size_t i;
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ int r;
+
+ for (i = 0; i < sizeof(sasl_oauthbearer_configs) / sizeof(const char *);
+ i++) {
+ struct rd_kafka_sasl_oauthbearer_token token;
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_configs[i], now_wallclock_ms,
+ errstr, sizeof(errstr));
+ if (r != -1)
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_ASSERT(r == -1, "Did not fail with embedded quote: %s",
+ sasl_oauthbearer_configs[i]);
+
+ RD_UT_ASSERT(
+ !strncmp(expected_prefix, errstr, strlen(expected_prefix)),
+ "Incorrect error message prefix with "
+ "embedded quote (%s): expected=%s received=%s",
+ sasl_oauthbearer_configs[i], expected_prefix, errstr);
+ }
+ RD_UT_PASS();
+}
+
+/**
+ * @brief `sasl.oauthbearer.config` test:
+ * should generate correct extensions.
+ */
+static int do_unittest_config_extensions(void) {
+ static const char *sasl_oauthbearer_config =
+ "principal=fubar "
+ "extension_a=b extension_yz=yzval";
+ rd_ts_t now_wallclock_ms = 1000;
+ char errstr[512];
+ struct rd_kafka_sasl_oauthbearer_token token;
+ int r;
+
+ r = rd_kafka_oauthbearer_unsecured_token0(
+ &token, sasl_oauthbearer_config, now_wallclock_ms, errstr,
+ sizeof(errstr));
+
+ if (r == -1)
+ RD_UT_FAIL("Failed to create a token: %s: %s",
+ sasl_oauthbearer_config, errstr);
+
+ RD_UT_ASSERT(token.extension_size == 4,
+ "Incorrect extensions: expected 4, received %" PRIusz,
+ token.extension_size);
+
+ RD_UT_ASSERT(!strcmp(token.extensions[0], "a") &&
+ !strcmp(token.extensions[1], "b") &&
+ !strcmp(token.extensions[2], "yz") &&
+ !strcmp(token.extensions[3], "yzval"),
+ "Incorrect extensions: expected a=b and "
+ "yz=yzval but received %s=%s and %s=%s",
+ token.extensions[0], token.extensions[1],
+ token.extensions[2], token.extensions[3]);
+
+ rd_kafka_sasl_oauthbearer_token_free(&token);
+
+ RD_UT_PASS();
+}
+
+/**
+ * @brief make sure illegal extensions keys are rejected
+ */
+static int do_unittest_illegal_extension_keys_should_fail(void) {
+ static const char *illegal_keys[] = {"", "auth", "a1", " a"};
+ size_t i;
+ char errstr[512];
+ int r;
+
+ for (i = 0; i < sizeof(illegal_keys) / sizeof(const char *); i++) {
+ r = check_oauthbearer_extension_key(illegal_keys[i], errstr,
+ sizeof(errstr));
+ RD_UT_ASSERT(r == -1,
+ "Did not recognize illegal extension key: %s",
+ illegal_keys[i]);
+ }
+ RD_UT_PASS();
+}
+
+/**
+ * @brief make sure illegal extensions keys are rejected
+ */
+static int do_unittest_odd_extension_size_should_fail(void) {
+ static const char *expected_errstr =
+ "Incorrect extension size "
+ "(must be a non-negative multiple of 2): 1";
+ char errstr[512];
+ rd_kafka_resp_err_t err;
+ rd_kafka_t rk = RD_ZERO_INIT;
+ rd_kafka_sasl_oauthbearer_handle_t handle = RD_ZERO_INIT;
+
+ rk.rk_conf.sasl.provider = &rd_kafka_sasl_oauthbearer_provider;
+ rk.rk_sasl.handle = &handle;
+
+ rwlock_init(&handle.lock);
+
+ err = rd_kafka_oauthbearer_set_token0(&rk, "abcd", 1000, "fubar", NULL,
+ 1, errstr, sizeof(errstr));
+
+ rwlock_destroy(&handle.lock);
+
+ RD_UT_ASSERT(err, "Did not recognize illegal extension size");
+ RD_UT_ASSERT(!strcmp(errstr, expected_errstr),
+ "Incorrect error message for illegal "
+ "extension size: expected=%s; received=%s",
+ expected_errstr, errstr);
+ RD_UT_ASSERT(err == RD_KAFKA_RESP_ERR__INVALID_ARG,
+ "Expected ErrInvalidArg, not %s", rd_kafka_err2name(err));
+
+ RD_UT_PASS();
+}
+
+int unittest_sasl_oauthbearer(void) {
+ int fails = 0;
+
+ fails += do_unittest_config_no_principal_should_fail();
+ fails += do_unittest_config_empty_should_fail();
+ fails += do_unittest_config_empty_value_should_fail();
+ fails += do_unittest_config_value_with_quote_should_fail();
+ fails += do_unittest_config_unrecognized_should_fail();
+ fails += do_unittest_config_defaults();
+ fails += do_unittest_config_explicit_scope_and_life();
+ fails += do_unittest_config_all_explicit_values();
+ fails += do_unittest_config_extensions();
+ fails += do_unittest_illegal_extension_keys_should_fail();
+ fails += do_unittest_odd_extension_size_should_fail();
+
+ return fails;
+}