diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_scram.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_scram.c | 973 |
1 files changed, 973 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_scram.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_scram.c new file mode 100644 index 00000000..7d5db564 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_sasl_scram.c @@ -0,0 +1,973 @@ +/* + * librdkafka - The Apache Kafka C/C++ library + * + * Copyright (c) 2017 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + + +/** + * Builtin SASL SCRAM support when Cyrus SASL is not available + */ +#include "rdkafka_int.h" +#include "rdkafka_transport.h" +#include "rdkafka_transport_int.h" +#include "rdkafka_sasl.h" +#include "rdkafka_sasl_int.h" +#include "rdrand.h" +#include "rdunittest.h" + + +#if WITH_SSL +#include <openssl/hmac.h> +#include <openssl/evp.h> +#include <openssl/sha.h> +#else +#error "WITH_SSL (OpenSSL) is required for SASL SCRAM" +#endif + + +/** + * @brief Per-connection state + */ +struct rd_kafka_sasl_scram_state { + enum { RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE, + RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE, + RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE, + } state; + rd_chariov_t cnonce; /* client c-nonce */ + rd_chariov_t first_msg_bare; /* client-first-message-bare */ + char *ServerSignatureB64; /* ServerSignature in Base64 */ + const EVP_MD *evp; /* Hash function pointer */ +}; + + +/** + * @brief Close and free authentication state + */ +static void rd_kafka_sasl_scram_close(rd_kafka_transport_t *rktrans) { + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + + if (!state) + return; + + RD_IF_FREE(state->cnonce.ptr, rd_free); + RD_IF_FREE(state->first_msg_bare.ptr, rd_free); + RD_IF_FREE(state->ServerSignatureB64, rd_free); + rd_free(state); +} + + + +/** + * @brief Generates a nonce string (a random printable string) + * @remark dst->ptr will be allocated and must be freed. + */ +static void rd_kafka_sasl_scram_generate_nonce(rd_chariov_t *dst) { + int i; + dst->size = 32; + dst->ptr = rd_malloc(dst->size + 1); + for (i = 0; i < (int)dst->size; i++) + dst->ptr[i] = (char)rd_jitter(0x2d /*-*/, 0x7e /*~*/); + dst->ptr[i] = 0; +} + + +/** + * @brief Parses inbuf for SCRAM attribute \p attr (e.g., 's') + * @returns a newly allocated copy of the value, or NULL + * on failure in which case an error is written to \p errstr + * prefixed by \p description. + */ +static char *rd_kafka_sasl_scram_get_attr(const rd_chariov_t *inbuf, + char attr, + const char *description, + char *errstr, + size_t errstr_size) { + size_t of = 0; + + for (of = 0; of < inbuf->size;) { + const char *td; + size_t len; + + /* Find next delimiter , (if any) */ + td = memchr(&inbuf->ptr[of], ',', inbuf->size - of); + if (td) + len = (size_t)(td - &inbuf->ptr[of]); + else + len = inbuf->size - of; + + /* Check if attr "x=" matches */ + if (inbuf->ptr[of] == attr && inbuf->size > of + 1 && + inbuf->ptr[of + 1] == '=') { + char *ret; + of += 2; /* past = */ + ret = rd_malloc(len - 2 + 1); + memcpy(ret, &inbuf->ptr[of], len - 2); + ret[len - 2] = '\0'; + return ret; + } + + /* Not the attr we are looking for, skip + * past the next delimiter and continue looking. */ + of += len + 1; + } + + rd_snprintf(errstr, errstr_size, "%s: could not find attribute (%c)", + description, attr); + return NULL; +} + + +/** + * @brief Base64 encode binary input \p in + * @returns a newly allocated, base64-encoded string or NULL on error. + */ +static char *rd_base64_encode(const rd_chariov_t *in) { + char *ret; + size_t ret_len, max_len; + + /* OpenSSL takes an |int| argument so the input cannot exceed that. */ + if (in->size > INT_MAX) { + return NULL; + } + + /* This does not overflow given the |INT_MAX| bound, above. */ + max_len = (((in->size + 2) / 3) * 4) + 1; + ret = rd_malloc(max_len); + if (ret == NULL) { + return NULL; + } + + ret_len = + EVP_EncodeBlock((uint8_t *)ret, (uint8_t *)in->ptr, (int)in->size); + assert(ret_len < max_len); + ret[ret_len] = 0; + + return ret; +} + + +/** + * @brief Base64 decode input string \p in. Ignores leading and trailing + * whitespace. + * @returns -1 on invalid Base64, or 0 on successes in which case a + * newly allocated binary string is set in out (and size). + */ +static int rd_base64_decode(const rd_chariov_t *in, rd_chariov_t *out) { + size_t ret_len; + + /* OpenSSL takes an |int| argument, so |in->size| must not exceed + * that. */ + if (in->size % 4 != 0 || in->size > INT_MAX) { + return -1; + } + + ret_len = ((in->size / 4) * 3); + out->ptr = rd_malloc(ret_len + 1); + + if (EVP_DecodeBlock((uint8_t *)out->ptr, (uint8_t *)in->ptr, + (int)in->size) == -1) { + rd_free(out->ptr); + out->ptr = NULL; + return -1; + } + + /* EVP_DecodeBlock will pad the output with trailing NULs and count + * them in the return value. */ + if (in->size > 1 && in->ptr[in->size - 1] == '=') { + if (in->size > 2 && in->ptr[in->size - 2] == '=') { + ret_len -= 2; + } else { + ret_len -= 1; + } + } + + out->ptr[ret_len] = 0; + out->size = ret_len; + + return 0; +} + + +/** + * @brief Perform H(str) hash function and stores the result in \p out + * which must be at least EVP_MAX_MD_SIZE. + * @returns 0 on success, else -1 + */ +static int rd_kafka_sasl_scram_H(rd_kafka_transport_t *rktrans, + const rd_chariov_t *str, + rd_chariov_t *out) { + + rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H( + (const unsigned char *)str->ptr, str->size, + (unsigned char *)out->ptr); + + out->size = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_H_size; + return 0; +} + +/** + * @brief Perform HMAC(key,str) and stores the result in \p out + * which must be at least EVP_MAX_MD_SIZE. + * @returns 0 on success, else -1 + */ +static int rd_kafka_sasl_scram_HMAC(rd_kafka_transport_t *rktrans, + const rd_chariov_t *key, + const rd_chariov_t *str, + rd_chariov_t *out) { + const EVP_MD *evp = + rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp; + unsigned int outsize; + + if (!HMAC(evp, (const unsigned char *)key->ptr, (int)key->size, + (const unsigned char *)str->ptr, (int)str->size, + (unsigned char *)out->ptr, &outsize)) { + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", + "HMAC failed"); + return -1; + } + + out->size = outsize; + + return 0; +} + + + +/** + * @brief Perform \p itcnt iterations of HMAC() on the given buffer \p in + * using \p salt, writing the output into \p out which must be + * at least EVP_MAX_MD_SIZE. Actual size is updated in \p *outsize. + * @returns 0 on success, else -1 + */ +static int rd_kafka_sasl_scram_Hi(rd_kafka_transport_t *rktrans, + const rd_chariov_t *in, + const rd_chariov_t *salt, + int itcnt, + rd_chariov_t *out) { + const EVP_MD *evp = + rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.scram_evp; + unsigned int ressize = 0; + unsigned char tempres[EVP_MAX_MD_SIZE]; + unsigned char *saltplus; + int i; + + /* U1 := HMAC(str, salt + INT(1)) */ + saltplus = rd_alloca(salt->size + 4); + memcpy(saltplus, salt->ptr, salt->size); + saltplus[salt->size] = 0; + saltplus[salt->size + 1] = 0; + saltplus[salt->size + 2] = 0; + saltplus[salt->size + 3] = 1; + + /* U1 := HMAC(str, salt + INT(1)) */ + if (!HMAC(evp, (const unsigned char *)in->ptr, (int)in->size, saltplus, + salt->size + 4, tempres, &ressize)) { + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", + "HMAC priming failed"); + return -1; + } + + memcpy(out->ptr, tempres, ressize); + + /* Ui-1 := HMAC(str, Ui-2) .. */ + for (i = 1; i < itcnt; i++) { + unsigned char tempdest[EVP_MAX_MD_SIZE]; + int j; + + if (unlikely(!HMAC(evp, (const unsigned char *)in->ptr, + (int)in->size, tempres, ressize, tempdest, + NULL))) { + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", + "Hi() HMAC #%d/%d failed", i, itcnt); + return -1; + } + + /* U1 XOR U2 .. */ + for (j = 0; j < (int)ressize; j++) { + out->ptr[j] ^= tempdest[j]; + tempres[j] = tempdest[j]; + } + } + + out->size = ressize; + + return 0; +} + + +/** + * @returns a SASL value-safe-char encoded string, replacing "," and "=" + * with their escaped counterparts in a newly allocated string. + */ +static char *rd_kafka_sasl_safe_string(const char *str) { + char *safe = NULL, *d = NULL /*avoid warning*/; + int pass; + size_t len = 0; + + /* Pass #1: scan for needed length and allocate. + * Pass #2: encode string */ + for (pass = 0; pass < 2; pass++) { + const char *s; + for (s = str; *s; s++) { + if (pass == 0) { + /* If this byte needs to be escaped then + * 3 output bytes are needed instead of 1. */ + len += (*s == ',' || *s == '=') ? 3 : 1; + continue; + } + + if (*s == ',') { + *(d++) = '='; + *(d++) = '2'; + *(d++) = 'C'; + } else if (*s == '=') { + *(d++) = '='; + *(d++) = '3'; + *(d++) = 'D'; + } else + *(d++) = *s; + } + + if (pass == 0) + d = safe = rd_malloc(len + 1); + } + + rd_assert(d == safe + (int)len); + *d = '\0'; + + return safe; +} + + +/** + * @brief Build client-final-message-without-proof + * @remark out->ptr will be allocated and must be freed. + */ +static void rd_kafka_sasl_scram_build_client_final_message_wo_proof( + struct rd_kafka_sasl_scram_state *state, + const char *snonce, + rd_chariov_t *out) { + const char *attr_c = "biws"; /* base64 encode of "n,," */ + + /* + * client-final-message-without-proof = + * channel-binding "," nonce ["," + * extensions] + */ + out->size = strlen("c=,r=") + strlen(attr_c) + state->cnonce.size + + strlen(snonce); + out->ptr = rd_malloc(out->size + 1); + rd_snprintf(out->ptr, out->size + 1, "c=%s,r=%.*s%s", attr_c, + (int)state->cnonce.size, state->cnonce.ptr, snonce); +} + + +/** + * @brief Build client-final-message + * @returns -1 on error. + */ +static int rd_kafka_sasl_scram_build_client_final_message( + rd_kafka_transport_t *rktrans, + const rd_chariov_t *salt, + const char *server_nonce, + const rd_chariov_t *server_first_msg, + int itcnt, + rd_chariov_t *out) { + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf; + rd_chariov_t SaslPassword = RD_ZERO_INIT; + rd_chariov_t SaltedPassword = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t ClientKey = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t ServerKey = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t StoredKey = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t AuthMessage = RD_ZERO_INIT; + rd_chariov_t ClientSignature = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t ServerSignature = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + const rd_chariov_t ClientKeyVerbatim = {.ptr = "Client Key", + .size = 10}; + const rd_chariov_t ServerKeyVerbatim = {.ptr = "Server Key", + .size = 10}; + rd_chariov_t ClientProof = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)}; + rd_chariov_t client_final_msg_wo_proof; + char *ClientProofB64; + int i; + + mtx_lock(&conf->sasl.lock); + rd_strdupa(&SaslPassword.ptr, conf->sasl.password); + mtx_unlock(&conf->sasl.lock); + SaslPassword.size = strlen(SaslPassword.ptr); + + /* Constructing the ClientProof attribute (p): + * + * p = Base64-encoded ClientProof + * SaltedPassword := Hi(Normalize(password), salt, i) + * ClientKey := HMAC(SaltedPassword, "Client Key") + * StoredKey := H(ClientKey) + * AuthMessage := client-first-message-bare + "," + + * server-first-message + "," + + * client-final-message-without-proof + * ClientSignature := HMAC(StoredKey, AuthMessage) + * ClientProof := ClientKey XOR ClientSignature + * ServerKey := HMAC(SaltedPassword, "Server Key") + * ServerSignature := HMAC(ServerKey, AuthMessage) + */ + + /* SaltedPassword := Hi(Normalize(password), salt, i) */ + if (rd_kafka_sasl_scram_Hi(rktrans, &SaslPassword, salt, itcnt, + &SaltedPassword) == -1) + return -1; + + /* ClientKey := HMAC(SaltedPassword, "Client Key") */ + if (rd_kafka_sasl_scram_HMAC(rktrans, &SaltedPassword, + &ClientKeyVerbatim, &ClientKey) == -1) + return -1; + + /* StoredKey := H(ClientKey) */ + if (rd_kafka_sasl_scram_H(rktrans, &ClientKey, &StoredKey) == -1) + return -1; + + /* client-final-message-without-proof */ + rd_kafka_sasl_scram_build_client_final_message_wo_proof( + state, server_nonce, &client_final_msg_wo_proof); + + /* AuthMessage := client-first-message-bare + "," + + * server-first-message + "," + + * client-final-message-without-proof */ + AuthMessage.size = state->first_msg_bare.size + 1 + + server_first_msg->size + 1 + + client_final_msg_wo_proof.size; + AuthMessage.ptr = rd_alloca(AuthMessage.size + 1); + rd_snprintf(AuthMessage.ptr, AuthMessage.size + 1, "%.*s,%.*s,%.*s", + (int)state->first_msg_bare.size, state->first_msg_bare.ptr, + (int)server_first_msg->size, server_first_msg->ptr, + (int)client_final_msg_wo_proof.size, + client_final_msg_wo_proof.ptr); + + /* + * Calculate ServerSignature for later verification when + * server-final-message is received. + */ + + /* ServerKey := HMAC(SaltedPassword, "Server Key") */ + if (rd_kafka_sasl_scram_HMAC(rktrans, &SaltedPassword, + &ServerKeyVerbatim, &ServerKey) == -1) { + rd_free(client_final_msg_wo_proof.ptr); + return -1; + } + + /* ServerSignature := HMAC(ServerKey, AuthMessage) */ + if (rd_kafka_sasl_scram_HMAC(rktrans, &ServerKey, &AuthMessage, + &ServerSignature) == -1) { + rd_free(client_final_msg_wo_proof.ptr); + return -1; + } + + /* Store the Base64 encoded ServerSignature for quick comparison */ + state->ServerSignatureB64 = rd_base64_encode(&ServerSignature); + if (state->ServerSignatureB64 == NULL) { + rd_free(client_final_msg_wo_proof.ptr); + return -1; + } + + /* + * Continue with client-final-message + */ + + /* ClientSignature := HMAC(StoredKey, AuthMessage) */ + if (rd_kafka_sasl_scram_HMAC(rktrans, &StoredKey, &AuthMessage, + &ClientSignature) == -1) { + rd_free(client_final_msg_wo_proof.ptr); + return -1; + } + + /* ClientProof := ClientKey XOR ClientSignature */ + assert(ClientKey.size == ClientSignature.size); + for (i = 0; i < (int)ClientKey.size; i++) + ClientProof.ptr[i] = ClientKey.ptr[i] ^ ClientSignature.ptr[i]; + ClientProof.size = ClientKey.size; + + + /* Base64 encoded ClientProof */ + ClientProofB64 = rd_base64_encode(&ClientProof); + if (ClientProofB64 == NULL) { + rd_free(client_final_msg_wo_proof.ptr); + return -1; + } + + /* Construct client-final-message */ + out->size = client_final_msg_wo_proof.size + strlen(",p=") + + strlen(ClientProofB64); + out->ptr = rd_malloc(out->size + 1); + + rd_snprintf(out->ptr, out->size + 1, "%.*s,p=%s", + (int)client_final_msg_wo_proof.size, + client_final_msg_wo_proof.ptr, ClientProofB64); + rd_free(ClientProofB64); + rd_free(client_final_msg_wo_proof.ptr); + + return 0; +} + + +/** + * @brief Handle first message from server + * + * Parse server response which looks something like: + * "r=fyko+d2lbbFgONR....,s=QSXCR+Q6sek8bf92,i=4096" + * + * @returns -1 on error. + */ +static int +rd_kafka_sasl_scram_handle_server_first_message(rd_kafka_transport_t *rktrans, + const rd_chariov_t *in, + rd_chariov_t *out, + char *errstr, + size_t errstr_size) { + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + char *server_nonce; + rd_chariov_t salt_b64, salt; + char *itcntstr; + const char *endptr; + int itcnt; + char *attr_m; + + /* Mandatory future extension check */ + if ((attr_m = rd_kafka_sasl_scram_get_attr(in, 'm', NULL, NULL, 0))) { + rd_snprintf(errstr, errstr_size, + "Unsupported mandatory SCRAM extension"); + rd_free(attr_m); + return -1; + } + + /* Server nonce */ + if (!(server_nonce = rd_kafka_sasl_scram_get_attr( + in, 'r', "Server nonce in server-first-message", errstr, + errstr_size))) + return -1; + + if (strlen(server_nonce) <= state->cnonce.size || + strncmp(state->cnonce.ptr, server_nonce, state->cnonce.size)) { + rd_snprintf(errstr, errstr_size, + "Server/client nonce mismatch in " + "server-first-message"); + rd_free(server_nonce); + return -1; + } + + /* Salt (Base64) */ + if (!(salt_b64.ptr = rd_kafka_sasl_scram_get_attr( + in, 's', "Salt in server-first-message", errstr, + errstr_size))) { + rd_free(server_nonce); + return -1; + } + salt_b64.size = strlen(salt_b64.ptr); + + /* Convert Salt to binary */ + if (rd_base64_decode(&salt_b64, &salt) == -1) { + rd_snprintf(errstr, errstr_size, + "Invalid Base64 Salt in server-first-message"); + rd_free(server_nonce); + rd_free(salt_b64.ptr); + return -1; + } + rd_free(salt_b64.ptr); + + /* Iteration count (as string) */ + if (!(itcntstr = rd_kafka_sasl_scram_get_attr( + in, 'i', "Iteration count in server-first-message", errstr, + errstr_size))) { + rd_free(server_nonce); + rd_free(salt.ptr); + return -1; + } + + /* Iteration count (as int) */ + errno = 0; + itcnt = (int)strtoul(itcntstr, (char **)&endptr, 10); + if (itcntstr == endptr || *endptr != '\0' || errno != 0 || + itcnt > 1000000) { + rd_snprintf(errstr, errstr_size, + "Invalid value (not integer or too large) " + "for Iteration count in server-first-message"); + rd_free(server_nonce); + rd_free(salt.ptr); + rd_free(itcntstr); + return -1; + } + rd_free(itcntstr); + + /* Build client-final-message */ + if (rd_kafka_sasl_scram_build_client_final_message( + rktrans, &salt, server_nonce, in, itcnt, out) == -1) { + rd_snprintf(errstr, errstr_size, + "Failed to build SCRAM client-final-message"); + rd_free(salt.ptr); + rd_free(server_nonce); + return -1; + } + + rd_free(server_nonce); + rd_free(salt.ptr); + + return 0; +} + +/** + * @brief Handle server-final-message + * + * This is the end of authentication and the SCRAM state + * will be freed at the end of this function regardless of + * authentication outcome. + * + * @returns -1 on failure + */ +static int +rd_kafka_sasl_scram_handle_server_final_message(rd_kafka_transport_t *rktrans, + const rd_chariov_t *in, + char *errstr, + size_t errstr_size) { + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + char *attr_v, *attr_e; + + if ((attr_e = rd_kafka_sasl_scram_get_attr( + in, 'e', "server-error in server-final-message", errstr, + errstr_size))) { + /* Authentication failed */ + + rd_snprintf(errstr, errstr_size, + "SASL SCRAM authentication failed: " + "broker responded with %s", + attr_e); + rd_free(attr_e); + return -1; + + } else if ((attr_v = rd_kafka_sasl_scram_get_attr( + in, 'v', "verifier in server-final-message", errstr, + errstr_size))) { + rd_kafka_conf_t *conf; + + /* Authentication succesful on server, + * but we need to verify the ServerSignature too. */ + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER, + "SCRAMAUTH", + "SASL SCRAM authentication successful on server: " + "verifying ServerSignature"); + + if (strcmp(attr_v, state->ServerSignatureB64)) { + rd_snprintf(errstr, errstr_size, + "SASL SCRAM authentication failed: " + "ServerSignature mismatch " + "(server's %s != ours %s)", + attr_v, state->ServerSignatureB64); + rd_free(attr_v); + return -1; + } + rd_free(attr_v); + + conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf; + + mtx_lock(&conf->sasl.lock); + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER, + "SCRAMAUTH", "Authenticated as %s using %s", + conf->sasl.username, conf->sasl.mechanisms); + mtx_unlock(&conf->sasl.lock); + + rd_kafka_sasl_auth_done(rktrans); + return 0; + + } else { + rd_snprintf(errstr, errstr_size, + "SASL SCRAM authentication failed: " + "no verifier or server-error returned from broker"); + return -1; + } +} + + + +/** + * @brief Build client-first-message + */ +static void +rd_kafka_sasl_scram_build_client_first_message(rd_kafka_transport_t *rktrans, + rd_chariov_t *out) { + char *sasl_username; + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf; + + rd_kafka_sasl_scram_generate_nonce(&state->cnonce); + + mtx_lock(&conf->sasl.lock); + sasl_username = rd_kafka_sasl_safe_string(conf->sasl.username); + mtx_unlock(&conf->sasl.lock); + + out->size = + strlen("n,,n=,r=") + strlen(sasl_username) + state->cnonce.size; + out->ptr = rd_malloc(out->size + 1); + + rd_snprintf(out->ptr, out->size + 1, "n,,n=%s,r=%.*s", sasl_username, + (int)state->cnonce.size, state->cnonce.ptr); + rd_free(sasl_username); + + /* Save client-first-message-bare (skip gs2-header) */ + state->first_msg_bare.size = out->size - 3; + state->first_msg_bare.ptr = + rd_memdup(out->ptr + 3, state->first_msg_bare.size); +} + + + +/** + * @brief SASL SCRAM client state machine + * @returns -1 on failure (errstr set), else 0. + */ +static int rd_kafka_sasl_scram_fsm(rd_kafka_transport_t *rktrans, + const rd_chariov_t *in, + char *errstr, + size_t errstr_size) { + static const char *state_names[] = { + "client-first-message", + "server-first-message", + "client-final-message", + }; + struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state; + rd_chariov_t out = RD_ZERO_INIT; + int r = -1; + rd_ts_t ts_start = rd_clock(); + int prev_state = state->state; + + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SASLSCRAM", + "SASL SCRAM client in state %s", state_names[state->state]); + + switch (state->state) { + case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE: + rd_dassert(!in); /* Not expecting any server-input */ + + rd_kafka_sasl_scram_build_client_first_message(rktrans, &out); + state->state = RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE; + break; + + + case RD_KAFKA_SASL_SCRAM_STATE_SERVER_FIRST_MESSAGE: + rd_dassert(in); /* Requires server-input */ + + if (rd_kafka_sasl_scram_handle_server_first_message( + rktrans, in, &out, errstr, errstr_size) == -1) + return -1; + + state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE; + break; + + case RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FINAL_MESSAGE: + rd_dassert(in); /* Requires server-input */ + + r = rd_kafka_sasl_scram_handle_server_final_message( + rktrans, in, errstr, errstr_size); + break; + } + + if (out.ptr) { + r = rd_kafka_sasl_send(rktrans, out.ptr, (int)out.size, errstr, + errstr_size); + rd_free(out.ptr); + } + + ts_start = (rd_clock() - ts_start) / 1000; + if (ts_start >= 100) + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SCRAM", + "SASL SCRAM state %s handled in %" PRId64 "ms", + state_names[prev_state], ts_start); + + + return r; +} + + +/** + * @brief Handle received frame from broker. + */ +static int rd_kafka_sasl_scram_recv(rd_kafka_transport_t *rktrans, + const void *buf, + size_t size, + char *errstr, + size_t errstr_size) { + const rd_chariov_t in = {.ptr = (char *)buf, .size = size}; + return rd_kafka_sasl_scram_fsm(rktrans, &in, errstr, errstr_size); +} + + +/** + * @brief Initialize and start SASL SCRAM (builtin) authentication. + * + * Returns 0 on successful init and -1 on error. + * + * @locality broker thread + */ +static int rd_kafka_sasl_scram_client_new(rd_kafka_transport_t *rktrans, + const char *hostname, + char *errstr, + size_t errstr_size) { + struct rd_kafka_sasl_scram_state *state; + + state = rd_calloc(1, sizeof(*state)); + state->state = RD_KAFKA_SASL_SCRAM_STATE_CLIENT_FIRST_MESSAGE; + rktrans->rktrans_sasl.state = state; + + /* Kick off the FSM */ + return rd_kafka_sasl_scram_fsm(rktrans, NULL, errstr, errstr_size); +} + + + +/** + * @brief Validate SCRAM config and look up the hash function + */ +static int rd_kafka_sasl_scram_conf_validate(rd_kafka_t *rk, + char *errstr, + size_t errstr_size) { + const char *mech = rk->rk_conf.sasl.mechanisms; + rd_bool_t both_set; + + mtx_lock(&rk->rk_conf.sasl.lock); + both_set = rk->rk_conf.sasl.username && rk->rk_conf.sasl.password; + mtx_unlock(&rk->rk_conf.sasl.lock); + + if (!both_set) { + rd_snprintf(errstr, errstr_size, + "sasl.username and sasl.password must be set"); + return -1; + } + + if (!strcmp(mech, "SCRAM-SHA-1")) { + rk->rk_conf.sasl.scram_evp = EVP_sha1(); + rk->rk_conf.sasl.scram_H = SHA1; + rk->rk_conf.sasl.scram_H_size = SHA_DIGEST_LENGTH; + } else if (!strcmp(mech, "SCRAM-SHA-256")) { + rk->rk_conf.sasl.scram_evp = EVP_sha256(); + rk->rk_conf.sasl.scram_H = SHA256; + rk->rk_conf.sasl.scram_H_size = SHA256_DIGEST_LENGTH; + } else if (!strcmp(mech, "SCRAM-SHA-512")) { + rk->rk_conf.sasl.scram_evp = EVP_sha512(); + rk->rk_conf.sasl.scram_H = SHA512; + rk->rk_conf.sasl.scram_H_size = SHA512_DIGEST_LENGTH; + } else { + rd_snprintf(errstr, errstr_size, + "Unsupported hash function: %s " + "(try SCRAM-SHA-512)", + mech); + return -1; + } + + return 0; +} + + + +const struct rd_kafka_sasl_provider rd_kafka_sasl_scram_provider = { + .name = "SCRAM (builtin)", + .client_new = rd_kafka_sasl_scram_client_new, + .recv = rd_kafka_sasl_scram_recv, + .close = rd_kafka_sasl_scram_close, + .conf_validate = rd_kafka_sasl_scram_conf_validate, +}; + + + +/** + * @name Unit tests + */ + +/** + * @brief Verify that a random nonce is generated. + */ +static int unittest_scram_nonce(void) { + rd_chariov_t out1 = RD_ZERO_INIT; + rd_chariov_t out2 = RD_ZERO_INIT; + + rd_kafka_sasl_scram_generate_nonce(&out1); + RD_UT_ASSERT(out1.size == 32, "Wrong size %d", (int)out1.size); + + rd_kafka_sasl_scram_generate_nonce(&out2); + RD_UT_ASSERT(out1.size == 32, "Wrong size %d", (int)out2.size); + + RD_UT_ASSERT(memcmp(out1.ptr, out2.ptr, out1.size) != 0, + "Expected generate_nonce() to return a random nonce"); + + rd_free(out1.ptr); + rd_free(out2.ptr); + + RD_UT_PASS(); +} + + +/** + * @brief Verify that the safe string function does not overwrite memory. + * Needs to be run with ASAN (which is done in release-tests) for + * proper verification. + */ +static int unittest_scram_safe(void) { + const char *inout[] = { + "just a string", + "just a string", + + "another,one,that,needs=escaping!", + "another=2Cone=2Cthat=2Cneeds=3Descaping!", + + "overflow?============================", + "overflow?=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D" + "=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D", + + "=3D=3D=3D the mind boggles", + "=3D3D=3D3D=3D3D the mind boggles", + + NULL, + NULL}; + int i; + + for (i = 0; inout[i]; i += 2) { + char *out = rd_kafka_sasl_safe_string(inout[i]); + const char *expected = inout[i + 1]; + + RD_UT_ASSERT(!strcmp(out, expected), + "Expected sasl_safe_string(%s) => %s, not %s\n", + inout[i], expected, out); + + rd_free(out); + } + + RD_UT_PASS(); +} + + +int unittest_scram(void) { + int fails = 0; + + fails += unittest_scram_nonce(); + fails += unittest_scram_safe(); + + return fails; +} |