diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_ssl.c')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_ssl.c | 1841 |
1 files changed, 0 insertions, 1841 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_ssl.c b/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_ssl.c deleted file mode 100644 index 9961a240..00000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src/rdkafka_ssl.c +++ /dev/null @@ -1,1841 +0,0 @@ -/* - * librdkafka - The Apache Kafka C/C++ library - * - * Copyright (c) 2019 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - - -/** - * @name OpenSSL integration - * - */ - -#include "rdkafka_int.h" -#include "rdkafka_transport_int.h" -#include "rdkafka_cert.h" - -#ifdef _WIN32 -#include <wincrypt.h> -#pragma comment(lib, "crypt32.lib") -#pragma comment(lib, "libcrypto.lib") -#pragma comment(lib, "libssl.lib") -#endif - -#include <openssl/x509.h> -#include <openssl/x509_vfy.h> - -#if OPENSSL_VERSION_NUMBER >= 0x30000000 -#include <openssl/provider.h> -#endif - -#include <ctype.h> - -#if !_WIN32 -#include <sys/types.h> -#include <sys/stat.h> -#include <unistd.h> -#endif - - -#if WITH_VALGRIND -/* OpenSSL relies on uninitialized memory, which Valgrind will whine about. - * We use in-code Valgrind macros to suppress those warnings. */ -#include <valgrind/memcheck.h> -#else -#define VALGRIND_MAKE_MEM_DEFINED(A, B) -#endif - - -#if OPENSSL_VERSION_NUMBER < 0x10100000L -static mtx_t *rd_kafka_ssl_locks; -static int rd_kafka_ssl_locks_cnt; -#endif - - -/** - * @brief Close and destroy SSL session - */ -void rd_kafka_transport_ssl_close(rd_kafka_transport_t *rktrans) { - SSL_shutdown(rktrans->rktrans_ssl); - SSL_free(rktrans->rktrans_ssl); - rktrans->rktrans_ssl = NULL; -} - - -/** - * @brief Clear OpenSSL error queue to get a proper error reporting in case - * the next SSL_*() operation fails. - */ -static RD_INLINE void -rd_kafka_transport_ssl_clear_error(rd_kafka_transport_t *rktrans) { - ERR_clear_error(); -#ifdef _WIN32 - WSASetLastError(0); -#else - rd_set_errno(0); -#endif -} - -/** - * @returns a thread-local single-invocation-use error string for - * the last thread-local error in OpenSSL, or an empty string - * if no error. - */ -const char *rd_kafka_ssl_last_error_str(void) { - static RD_TLS char errstr[256]; - unsigned long l; - const char *file, *data, *func; - int line, flags; - -#if OPENSSL_VERSION_NUMBER >= 0x30000000 - l = ERR_peek_last_error_all(&file, &line, &func, &data, &flags); -#else - l = ERR_peek_last_error_line_data(&file, &line, &data, &flags); - func = ERR_func_error_string(l); -#endif - - if (!l) - return ""; - - rd_snprintf(errstr, sizeof(errstr), "%lu:%s:%s:%s:%d: %s", l, - ERR_lib_error_string(l), func, file, line, - ((flags & ERR_TXT_STRING) && data && *data) - ? data - : ERR_reason_error_string(l)); - - return errstr; -} - -/** - * Serves the entire OpenSSL error queue and logs each error. - * The last error is not logged but returned in 'errstr'. - * - * If 'rkb' is non-NULL broker-specific logging will be used, - * else it will fall back on global 'rk' debugging. - */ -static char *rd_kafka_ssl_error(rd_kafka_t *rk, - rd_kafka_broker_t *rkb, - char *errstr, - size_t errstr_size) { - unsigned long l; - const char *file, *data, *func; - int line, flags; - int cnt = 0; - - if (!rk) { - rd_assert(rkb); - rk = rkb->rkb_rk; - } - - while ( -#if OPENSSL_VERSION_NUMBER >= 0x30000000 - (l = ERR_get_error_all(&file, &line, &func, &data, &flags)) -#else - (l = ERR_get_error_line_data(&file, &line, &data, &flags)) -#endif - ) { - char buf[256]; - -#if OPENSSL_VERSION_NUMBER < 0x30000000 - func = ERR_func_error_string(l); -#endif - - if (cnt++ > 0) { - /* Log last message */ - if (rkb) - rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr); - else - rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr); - } - - ERR_error_string_n(l, buf, sizeof(buf)); - - if (!(flags & ERR_TXT_STRING) || !data || !*data) - data = NULL; - - /* Include openssl file:line:func if debugging is enabled */ - if (rk->rk_conf.log_level >= LOG_DEBUG) - rd_snprintf(errstr, errstr_size, "%s:%d:%s %s%s%s", - file, line, func, buf, data ? ": " : "", - data ? data : ""); - else - rd_snprintf(errstr, errstr_size, "%s%s%s", buf, - data ? ": " : "", data ? data : ""); - } - - if (cnt == 0) - rd_snprintf(errstr, errstr_size, - "No further error information available"); - - return errstr; -} - - - -/** - * Set transport IO event polling based on SSL error. - * - * Returns -1 on permanent errors. - * - * Locality: broker thread - */ -static RD_INLINE int -rd_kafka_transport_ssl_io_update(rd_kafka_transport_t *rktrans, - int ret, - char *errstr, - size_t errstr_size) { - int serr = SSL_get_error(rktrans->rktrans_ssl, ret); - int serr2; - - switch (serr) { - case SSL_ERROR_WANT_READ: - rd_kafka_transport_poll_set(rktrans, POLLIN); - break; - - case SSL_ERROR_WANT_WRITE: - rd_kafka_transport_set_blocked(rktrans, rd_true); - rd_kafka_transport_poll_set(rktrans, POLLOUT); - break; - - case SSL_ERROR_SYSCALL: - serr2 = ERR_peek_error(); - if (serr2) - rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, - errstr_size); - else if (!rd_socket_errno || rd_socket_errno == ECONNRESET) - rd_snprintf(errstr, errstr_size, "Disconnected"); - else - rd_snprintf(errstr, errstr_size, - "SSL transport error: %s", - rd_strerror(rd_socket_errno)); - return -1; - - case SSL_ERROR_ZERO_RETURN: - rd_snprintf(errstr, errstr_size, "Disconnected"); - return -1; - - default: - rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, - errstr_size); - return -1; - } - - return 0; -} - -ssize_t rd_kafka_transport_ssl_send(rd_kafka_transport_t *rktrans, - rd_slice_t *slice, - char *errstr, - size_t errstr_size) { - ssize_t sum = 0; - const void *p; - size_t rlen; - - rd_kafka_transport_ssl_clear_error(rktrans); - - while ((rlen = rd_slice_peeker(slice, &p))) { - int r; - size_t r2; - - r = SSL_write(rktrans->rktrans_ssl, p, (int)rlen); - - if (unlikely(r <= 0)) { - if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, - errstr_size) == -1) - return -1; - else - return sum; - } - - /* Update buffer read position */ - r2 = rd_slice_read(slice, NULL, (size_t)r); - rd_assert((size_t)r == r2 && - *"BUG: wrote more bytes than available in slice"); - - - sum += r; - /* FIXME: remove this and try again immediately and let - * the next SSL_write() call fail instead? */ - if ((size_t)r < rlen) - break; - } - return sum; -} - -ssize_t rd_kafka_transport_ssl_recv(rd_kafka_transport_t *rktrans, - rd_buf_t *rbuf, - char *errstr, - size_t errstr_size) { - ssize_t sum = 0; - void *p; - size_t len; - - while ((len = rd_buf_get_writable(rbuf, &p))) { - int r; - - rd_kafka_transport_ssl_clear_error(rktrans); - - r = SSL_read(rktrans->rktrans_ssl, p, (int)len); - - if (unlikely(r <= 0)) { - if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, - errstr_size) == -1) - return -1; - else - return sum; - } - - VALGRIND_MAKE_MEM_DEFINED(p, r); - - /* Update buffer write position */ - rd_buf_write(rbuf, NULL, (size_t)r); - - sum += r; - - /* FIXME: remove this and try again immediately and let - * the next SSL_read() call fail instead? */ - if ((size_t)r < len) - break; - } - return sum; -} - - -/** - * OpenSSL password query callback - * - * Locality: application thread - */ -static int rd_kafka_transport_ssl_passwd_cb(char *buf, - int size, - int rwflag, - void *userdata) { - rd_kafka_t *rk = userdata; - int pwlen; - - rd_kafka_dbg(rk, SECURITY, "SSLPASSWD", - "Private key requires password"); - - if (!rk->rk_conf.ssl.key_password) { - rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD", - "Private key requires password but " - "no password configured (ssl.key.password)"); - return -1; - } - - - pwlen = (int)strlen(rk->rk_conf.ssl.key_password); - memcpy(buf, rk->rk_conf.ssl.key_password, RD_MIN(pwlen, size)); - - return pwlen; -} - - -/** - * @brief OpenSSL callback to perform additional broker certificate - * verification and validation. - * - * @return 1 on success when the broker certificate - * is valid and 0 when the certificate is not valid. - * - * @sa SSL_CTX_set_verify() - */ -static int rd_kafka_transport_ssl_cert_verify_cb(int preverify_ok, - X509_STORE_CTX *x509_ctx) { - rd_kafka_transport_t *rktrans = rd_kafka_curr_transport; - rd_kafka_broker_t *rkb; - rd_kafka_t *rk; - X509 *cert; - char *buf = NULL; - int buf_size; - int depth; - int x509_orig_error, x509_error; - char errstr[512]; - int ok; - - rd_assert(rktrans != NULL); - rkb = rktrans->rktrans_rkb; - rk = rkb->rkb_rk; - - cert = X509_STORE_CTX_get_current_cert(x509_ctx); - if (!cert) { - rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY", - "Failed to get current certificate to verify"); - return 0; - } - - depth = X509_STORE_CTX_get_error_depth(x509_ctx); - - x509_orig_error = x509_error = X509_STORE_CTX_get_error(x509_ctx); - - buf_size = i2d_X509(cert, (unsigned char **)&buf); - if (buf_size < 0 || !buf) { - rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY", - "Unable to convert certificate to X509 format"); - return 0; - } - - *errstr = '\0'; - - /* Call application's verification callback. */ - ok = rk->rk_conf.ssl.cert_verify_cb( - rk, rkb->rkb_nodename, rkb->rkb_nodeid, &x509_error, depth, buf, - (size_t)buf_size, errstr, sizeof(errstr), rk->rk_conf.opaque); - - OPENSSL_free(buf); - - if (!ok) { - char subject[128]; - char issuer[128]; - - X509_NAME_oneline(X509_get_subject_name(cert), subject, - sizeof(subject)); - X509_NAME_oneline(X509_get_issuer_name(cert), issuer, - sizeof(issuer)); - rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY", - "Certificate (subject=%s, issuer=%s) verification " - "callback failed: %s", - subject, issuer, errstr); - - X509_STORE_CTX_set_error(x509_ctx, x509_error); - - return 0; /* verification failed */ - } - - /* Clear error */ - if (x509_orig_error != 0 && x509_error == 0) - X509_STORE_CTX_set_error(x509_ctx, 0); - - return 1; /* verification successful */ -} - -/** - * @brief Set TLSEXT hostname for SNI and optionally enable - * SSL endpoint identification verification. - * - * @returns 0 on success or -1 on error. - */ -static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans, - char *errstr, - size_t errstr_size) { - char name[RD_KAFKA_NODENAME_SIZE]; - char *t; - - rd_kafka_broker_lock(rktrans->rktrans_rkb); - rd_snprintf(name, sizeof(name), "%s", - rktrans->rktrans_rkb->rkb_nodename); - rd_kafka_broker_unlock(rktrans->rktrans_rkb); - - /* Remove ":9092" port suffix from nodename */ - if ((t = strrchr(name, ':'))) - *t = '\0'; - -#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT) - /* If non-numerical hostname, send it for SNI */ - if (!(/*ipv6*/ (strchr(name, ':') && - strspn(name, "0123456789abcdefABCDEF:.[]%") == - strlen(name)) || - /*ipv4*/ strspn(name, "0123456789.") == strlen(name)) && - !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name)) - goto fail; -#endif - - if (rktrans->rktrans_rkb->rkb_rk->rk_conf.ssl.endpoint_identification == - RD_KAFKA_SSL_ENDPOINT_ID_NONE) - return 0; - -#if OPENSSL_VERSION_NUMBER >= 0x10100000 && !defined(OPENSSL_IS_BORINGSSL) - if (!SSL_set1_host(rktrans->rktrans_ssl, name)) - goto fail; -#elif OPENSSL_VERSION_NUMBER >= 0x1000200fL /* 1.0.2 */ - { - X509_VERIFY_PARAM *param; - - param = SSL_get0_param(rktrans->rktrans_ssl); - - if (!X509_VERIFY_PARAM_set1_host(param, name, 0)) - goto fail; - } -#else - rd_snprintf(errstr, errstr_size, - "Endpoint identification not supported on this " - "OpenSSL version (0x%lx)", - OPENSSL_VERSION_NUMBER); - return -1; -#endif - - rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT", - "Enabled endpoint identification using hostname %s", name); - - return 0; - -fail: - rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, errstr_size); - return -1; -} - - -/** - * @brief Set up SSL for a newly connected connection - * - * @returns -1 on failure, else 0. - */ -int rd_kafka_transport_ssl_connect(rd_kafka_broker_t *rkb, - rd_kafka_transport_t *rktrans, - char *errstr, - size_t errstr_size) { - int r; - - rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx); - if (!rktrans->rktrans_ssl) - goto fail; - - if (!SSL_set_fd(rktrans->rktrans_ssl, (int)rktrans->rktrans_s)) - goto fail; - - if (rd_kafka_transport_ssl_set_endpoint_id(rktrans, errstr, - errstr_size) == -1) - return -1; - - rd_kafka_transport_ssl_clear_error(rktrans); - - r = SSL_connect(rktrans->rktrans_ssl); - if (r == 1) { - /* Connected, highly unlikely since this is a - * non-blocking operation. */ - rd_kafka_transport_connect_done(rktrans, NULL); - return 0; - } - - if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, errstr_size) == - -1) - return -1; - - return 0; - -fail: - rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size); - return -1; -} - - -static RD_UNUSED int -rd_kafka_transport_ssl_io_event(rd_kafka_transport_t *rktrans, int events) { - int r; - char errstr[512]; - - if (events & POLLOUT) { - rd_kafka_transport_ssl_clear_error(rktrans); - - r = SSL_write(rktrans->rktrans_ssl, NULL, 0); - if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, - sizeof(errstr)) == -1) - goto fail; - } - - return 0; - -fail: - /* Permanent error */ - rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, - RD_KAFKA_RESP_ERR__TRANSPORT, "%s", errstr); - return -1; -} - - -/** - * @brief Verify SSL handshake was valid. - */ -static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) { - long int rl; - X509 *cert; - - if (!rktrans->rktrans_rkb->rkb_rk->rk_conf.ssl.enable_verify) - return 0; - -#if OPENSSL_VERSION_NUMBER >= 0x30000000 - cert = SSL_get1_peer_certificate(rktrans->rktrans_ssl); -#else - cert = SSL_get_peer_certificate(rktrans->rktrans_ssl); -#endif - X509_free(cert); - if (!cert) { - rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, - RD_KAFKA_RESP_ERR__SSL, - "Broker did not provide a certificate"); - return -1; - } - - if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) { - rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, - RD_KAFKA_RESP_ERR__SSL, - "Failed to verify broker certificate: %s", - X509_verify_cert_error_string(rl)); - return -1; - } - - rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", - "Broker SSL certificate verified"); - return 0; -} - -/** - * @brief SSL handshake handling. - * Call repeatedly (based on IO events) until handshake is done. - * - * @returns -1 on error, 0 if handshake is still in progress, - * or 1 on completion. - */ -int rd_kafka_transport_ssl_handshake(rd_kafka_transport_t *rktrans) { - rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; - char errstr[512]; - int r; - - r = SSL_do_handshake(rktrans->rktrans_ssl); - if (r == 1) { - /* SSL handshake done. Verify. */ - if (rd_kafka_transport_ssl_verify(rktrans) == -1) - return -1; - - rd_kafka_transport_connect_done(rktrans, NULL); - return 1; - - } else if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, - sizeof(errstr)) == -1) { - const char *extra = ""; - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__SSL; - - if (strstr(errstr, "unexpected message")) - extra = - ": client SSL authentication might be " - "required (see ssl.key.location and " - "ssl.certificate.location and consult the " - "broker logs for more information)"; - else if (strstr(errstr, - "tls_process_server_certificate:" - "certificate verify failed") || - strstr(errstr, "error:0A000086") /*openssl3*/ || - strstr(errstr, - "get_server_certificate:" - "certificate verify failed")) - extra = - ": broker certificate could not be verified, " - "verify that ssl.ca.location is correctly " - "configured or root CA certificates are " - "installed" -#ifdef __APPLE__ - " (brew install openssl)" -#elif defined(_WIN32) - " (add broker's CA certificate to the Windows " - "Root certificate store)" -#else - " (install ca-certificates package)" -#endif - ; - else if (!strcmp(errstr, "Disconnected")) { - extra = ": connecting to a PLAINTEXT broker listener?"; - /* Disconnects during handshake are most likely - * not due to SSL, but rather at the transport level */ - err = RD_KAFKA_RESP_ERR__TRANSPORT; - } - - rd_kafka_broker_fail(rkb, LOG_ERR, err, - "SSL handshake failed: %s%s", errstr, - extra); - return -1; - } - - return 0; -} - - - -/** - * @brief Parse a PEM-formatted string into an EVP_PKEY (PrivateKey) object. - * - * @param str Input PEM string, nul-terminated - * - * @remark This method does not provide automatic addition of PEM - * headers and footers. - * - * @returns a new EVP_PKEY on success or NULL on error. - */ -static EVP_PKEY *rd_kafka_ssl_PKEY_from_string(rd_kafka_t *rk, - const char *str) { - BIO *bio = BIO_new_mem_buf((void *)str, -1); - EVP_PKEY *pkey; - - pkey = PEM_read_bio_PrivateKey(bio, NULL, - rd_kafka_transport_ssl_passwd_cb, rk); - - BIO_free(bio); - - return pkey; -} - -/** - * @brief Parse a PEM-formatted string into an X509 object. - * - * @param str Input PEM string, nul-terminated - * - * @returns a new X509 on success or NULL on error. - */ -static X509 *rd_kafka_ssl_X509_from_string(rd_kafka_t *rk, const char *str) { - BIO *bio = BIO_new_mem_buf((void *)str, -1); - X509 *x509; - - x509 = - PEM_read_bio_X509(bio, NULL, rd_kafka_transport_ssl_passwd_cb, rk); - - BIO_free(bio); - - return x509; -} - - -#ifdef _WIN32 - -/** - * @brief Attempt load CA certificates from a Windows Certificate store. - */ -static int rd_kafka_ssl_win_load_cert_store(rd_kafka_t *rk, - SSL_CTX *ctx, - const char *store_name) { - HCERTSTORE w_store; - PCCERT_CONTEXT w_cctx = NULL; - X509_STORE *store; - int fail_cnt = 0, cnt = 0; - char errstr[256]; - wchar_t *wstore_name; - size_t wsize = 0; - errno_t werr; - - /* Convert store_name to wide-char */ - werr = mbstowcs_s(&wsize, NULL, 0, store_name, strlen(store_name)); - if (werr || wsize < 2 || wsize > 1000) { - rd_kafka_log(rk, LOG_ERR, "CERTSTORE", - "Invalid Windows certificate store name: %.*s%s", - 30, store_name, - wsize < 2 ? " (empty)" : " (truncated)"); - return -1; - } - wstore_name = rd_alloca(sizeof(*wstore_name) * wsize); - werr = mbstowcs_s(NULL, wstore_name, wsize, store_name, - strlen(store_name)); - rd_assert(!werr); - - w_store = CertOpenStore(CERT_STORE_PROV_SYSTEM, 0, 0, - CERT_SYSTEM_STORE_CURRENT_USER | - CERT_STORE_READONLY_FLAG | - CERT_STORE_OPEN_EXISTING_FLAG, - wstore_name); - if (!w_store) { - rd_kafka_log( - rk, LOG_ERR, "CERTSTORE", - "Failed to open Windows certificate " - "%s store: %s", - store_name, - rd_strerror_w32(GetLastError(), errstr, sizeof(errstr))); - return -1; - } - - /* Get the OpenSSL trust store */ - store = SSL_CTX_get_cert_store(ctx); - - /* Enumerate the Windows certs */ - while ((w_cctx = CertEnumCertificatesInStore(w_store, w_cctx))) { - X509 *x509; - - /* Parse Windows cert: DER -> X.509 */ - x509 = d2i_X509(NULL, - (const unsigned char **)&w_cctx->pbCertEncoded, - (long)w_cctx->cbCertEncoded); - if (!x509) { - fail_cnt++; - continue; - } - - /* Add cert to OpenSSL's trust store */ - if (!X509_STORE_add_cert(store, x509)) - fail_cnt++; - else - cnt++; - - X509_free(x509); - } - - if (w_cctx) - CertFreeCertificateContext(w_cctx); - - CertCloseStore(w_store, 0); - - rd_kafka_dbg(rk, SECURITY, "CERTSTORE", - "%d certificate(s) successfully added from " - "Windows Certificate %s store, %d failed", - cnt, store_name, fail_cnt); - - if (cnt == 0 && fail_cnt > 0) - return -1; - - return cnt; -} - -/** - * @brief Load certs from the configured CSV list of Windows Cert stores. - * - * @returns the number of successfully loaded certificates, or -1 on error. - */ -static int rd_kafka_ssl_win_load_cert_stores(rd_kafka_t *rk, - SSL_CTX *ctx, - const char *store_names) { - char *s; - int cert_cnt = 0, fail_cnt = 0; - - if (!store_names || !*store_names) - return 0; - - rd_strdupa(&s, store_names); - - /* Parse CSV list ("Root,CA, , ,Something") and load - * each store in order. */ - while (*s) { - char *t; - const char *store_name; - int r; - - while (isspace((int)*s) || *s == ',') - s++; - - if (!*s) - break; - - store_name = s; - - t = strchr(s, (int)','); - if (t) { - *t = '\0'; - s = t + 1; - for (; t >= store_name && isspace((int)*t); t--) - *t = '\0'; - } else { - s = ""; - } - - r = rd_kafka_ssl_win_load_cert_store(rk, ctx, store_name); - if (r != -1) - cert_cnt += r; - else - fail_cnt++; - } - - if (cert_cnt == 0 && fail_cnt > 0) - return -1; - - return cert_cnt; -} -#endif /* MSC_VER */ - - - -/** - * @brief Probe for the system's CA certificate location and if found set it - * on the \p CTX. - * - * @returns 0 if CA location was set, else -1. - */ -static int rd_kafka_ssl_probe_and_set_default_ca_location(rd_kafka_t *rk, - SSL_CTX *ctx) { -#if _WIN32 - /* No standard location on Windows, CA certs are in the ROOT store. */ - return -1; -#else - /* The probe paths are based on: - * https://www.happyassassin.net/posts/2015/01/12/a-note-about-ssltls-trusted-certificate-stores-and-platforms/ - * Golang's crypto probing paths: - * https://golang.org/search?q=certFiles and certDirectories - */ - static const char *paths[] = { - "/etc/pki/tls/certs/ca-bundle.crt", - "/etc/ssl/certs/ca-bundle.crt", - "/etc/pki/tls/certs/ca-bundle.trust.crt", - "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", - - "/etc/ssl/ca-bundle.pem", - "/etc/pki/tls/cacert.pem", - "/etc/ssl/cert.pem", - "/etc/ssl/cacert.pem", - - "/etc/certs/ca-certificates.crt", - "/etc/ssl/certs/ca-certificates.crt", - - "/etc/ssl/certs", - - "/usr/local/etc/ssl/cert.pem", - "/usr/local/etc/ssl/cacert.pem", - - "/usr/local/etc/ssl/certs/cert.pem", - "/usr/local/etc/ssl/certs/cacert.pem", - - /* BSD */ - "/usr/local/share/certs/ca-root-nss.crt", - "/etc/openssl/certs/ca-certificates.crt", -#ifdef __APPLE__ - "/private/etc/ssl/cert.pem", - "/private/etc/ssl/certs", - "/usr/local/etc/openssl@1.1/cert.pem", - "/usr/local/etc/openssl@1.0/cert.pem", - "/usr/local/etc/openssl/certs", - "/System/Library/OpenSSL", -#endif -#ifdef _AIX - "/var/ssl/certs/ca-bundle.crt", -#endif - NULL, - }; - const char *path = NULL; - int i; - - for (i = 0; (path = paths[i]); i++) { - struct stat st; - rd_bool_t is_dir; - int r; - - if (stat(path, &st) != 0) - continue; - - is_dir = S_ISDIR(st.st_mode); - - if (is_dir && rd_kafka_dir_is_empty(path)) - continue; - - rd_kafka_dbg(rk, SECURITY, "CACERTS", - "Setting default CA certificate location " - "to %s, override with ssl.ca.location", - path); - - r = SSL_CTX_load_verify_locations(ctx, is_dir ? NULL : path, - is_dir ? path : NULL); - if (r != 1) { - char errstr[512]; - /* Read error and clear the error stack */ - rd_kafka_ssl_error(rk, NULL, errstr, sizeof(errstr)); - rd_kafka_dbg(rk, SECURITY, "CACERTS", - "Failed to set default CA certificate " - "location to %s %s: %s: skipping", - is_dir ? "directory" : "file", path, - errstr); - continue; - } - - return 0; - } - - rd_kafka_dbg(rk, SECURITY, "CACERTS", - "Unable to find any standard CA certificate" - "paths: is the ca-certificates package installed?"); - return -1; -#endif -} - - -/** - * @brief Registers certificates, keys, etc, on the SSL_CTX - * - * @returns -1 on error, or 0 on success. - */ -static int rd_kafka_ssl_set_certs(rd_kafka_t *rk, - SSL_CTX *ctx, - char *errstr, - size_t errstr_size) { - rd_bool_t ca_probe = rd_true; - rd_bool_t check_pkey = rd_false; - int r; - - /* - * ssl_ca, ssl.ca.location, or Windows cert root store, - * or default paths. - */ - if (rk->rk_conf.ssl.ca) { - /* CA certificate chain set with conf_set_ssl_cert() */ - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading CA certificate(s) from memory"); - - SSL_CTX_set_cert_store(ctx, rk->rk_conf.ssl.ca->store); - - /* OpenSSL takes ownership of the store */ - rk->rk_conf.ssl.ca->store = NULL; - - ca_probe = rd_false; - - } else { - - if (rk->rk_conf.ssl.ca_location && - strcmp(rk->rk_conf.ssl.ca_location, "probe")) { - /* CA certificate location, either file or directory. */ - int is_dir = - rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location); - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading CA certificate(s) from %s %s", - is_dir ? "directory" : "file", - rk->rk_conf.ssl.ca_location); - - r = SSL_CTX_load_verify_locations( - ctx, !is_dir ? rk->rk_conf.ssl.ca_location : NULL, - is_dir ? rk->rk_conf.ssl.ca_location : NULL); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.ca.location failed: "); - return -1; - } - - ca_probe = rd_false; - } - - if (rk->rk_conf.ssl.ca_pem) { - /* CA as PEM string */ - X509 *x509; - X509_STORE *store; - BIO *bio; - int cnt = 0; - - /* Get the OpenSSL trust store */ - store = SSL_CTX_get_cert_store(ctx); - rd_assert(store != NULL); - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading CA certificate(s) from string"); - - bio = - BIO_new_mem_buf((void *)rk->rk_conf.ssl.ca_pem, -1); - rd_assert(bio != NULL); - - /* Add all certificates to cert store */ - while ((x509 = PEM_read_bio_X509( - bio, NULL, rd_kafka_transport_ssl_passwd_cb, - rk))) { - if (!X509_STORE_add_cert(store, x509)) { - rd_snprintf(errstr, errstr_size, - "failed to add ssl.ca.pem " - "certificate " - "#%d to CA cert store: ", - cnt); - X509_free(x509); - BIO_free(bio); - return -1; - } - - X509_free(x509); - cnt++; - } - - if (!BIO_eof(bio) || !cnt) { - rd_snprintf(errstr, errstr_size, - "failed to read certificate #%d " - "from ssl.ca.pem: " - "not in PEM format?: ", - cnt); - BIO_free(bio); - return -1; - } - - BIO_free(bio); - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loaded %d CA certificate(s) from string", - cnt); - - - ca_probe = rd_false; - } - } - - if (ca_probe) { -#ifdef _WIN32 - /* Attempt to load CA root certificates from the - * configured Windows certificate stores. */ - r = rd_kafka_ssl_win_load_cert_stores( - rk, ctx, rk->rk_conf.ssl.ca_cert_stores); - if (r == 0) { - rd_kafka_log( - rk, LOG_NOTICE, "CERTSTORE", - "No CA certificates loaded from " - "Windows certificate stores: " - "falling back to default OpenSSL CA paths"); - r = -1; - } else if (r == -1) - rd_kafka_log( - rk, LOG_NOTICE, "CERTSTORE", - "Failed to load CA certificates from " - "Windows certificate stores: " - "falling back to default OpenSSL CA paths"); -#else - r = -1; -#endif - - if ((rk->rk_conf.ssl.ca_location && - !strcmp(rk->rk_conf.ssl.ca_location, "probe")) -#if WITH_STATIC_LIB_libcrypto - || r == -1 -#endif - ) { - /* If OpenSSL was linked statically there is a risk - * that the system installed CA certificate path - * doesn't match the cert path of OpenSSL. - * To circumvent this we check for the existence - * of standard CA certificate paths and use the - * first one that is found. - * Ignore failures. */ - r = rd_kafka_ssl_probe_and_set_default_ca_location(rk, - ctx); - } - - if (r == -1) { - /* Use default CA certificate paths from linked OpenSSL: - * ignore failures */ - - r = SSL_CTX_set_default_verify_paths(ctx); - if (r != 1) { - char errstr2[512]; - /* Read error and clear the error stack. */ - rd_kafka_ssl_error(rk, NULL, errstr2, - sizeof(errstr2)); - rd_kafka_dbg( - rk, SECURITY, "SSL", - "SSL_CTX_set_default_verify_paths() " - "failed: %s: ignoring", - errstr2); - } - r = 0; - } - } - - if (rk->rk_conf.ssl.crl_location) { - rd_kafka_dbg(rk, SECURITY, "SSL", "Loading CRL from file %s", - rk->rk_conf.ssl.crl_location); - - r = SSL_CTX_load_verify_locations( - ctx, rk->rk_conf.ssl.crl_location, NULL); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.crl.location failed: "); - return -1; - } - - - rd_kafka_dbg(rk, SECURITY, "SSL", "Enabling CRL checks"); - - X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx), - X509_V_FLAG_CRL_CHECK); - } - - - /* - * ssl_cert, ssl.certificate.location and ssl.certificate.pem - */ - if (rk->rk_conf.ssl.cert) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading public key from memory"); - - rd_assert(rk->rk_conf.ssl.cert->x509); - r = SSL_CTX_use_certificate(ctx, rk->rk_conf.ssl.cert->x509); - if (r != 1) { - rd_snprintf(errstr, errstr_size, "ssl_cert failed: "); - return -1; - } - } - - if (rk->rk_conf.ssl.cert_location) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading public key from file %s", - rk->rk_conf.ssl.cert_location); - - r = SSL_CTX_use_certificate_chain_file( - ctx, rk->rk_conf.ssl.cert_location); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.certificate.location failed: "); - return -1; - } - } - - if (rk->rk_conf.ssl.cert_pem) { - X509 *x509; - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading public key from string"); - - x509 = - rd_kafka_ssl_X509_from_string(rk, rk->rk_conf.ssl.cert_pem); - if (!x509) { - rd_snprintf(errstr, errstr_size, - "ssl.certificate.pem failed: " - "not in PEM format?: "); - return -1; - } - - r = SSL_CTX_use_certificate(ctx, x509); - - X509_free(x509); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.certificate.pem failed: "); - return -1; - } - } - - - /* - * ssl_key, ssl.key.location and ssl.key.pem - */ - if (rk->rk_conf.ssl.key) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading private key file from memory"); - - rd_assert(rk->rk_conf.ssl.key->pkey); - r = SSL_CTX_use_PrivateKey(ctx, rk->rk_conf.ssl.key->pkey); - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl_key (in-memory) failed: "); - return -1; - } - - check_pkey = rd_true; - } - - if (rk->rk_conf.ssl.key_location) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading private key file from %s", - rk->rk_conf.ssl.key_location); - - r = SSL_CTX_use_PrivateKey_file( - ctx, rk->rk_conf.ssl.key_location, SSL_FILETYPE_PEM); - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.key.location failed: "); - return -1; - } - - check_pkey = rd_true; - } - - if (rk->rk_conf.ssl.key_pem) { - EVP_PKEY *pkey; - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading private key from string"); - - pkey = - rd_kafka_ssl_PKEY_from_string(rk, rk->rk_conf.ssl.key_pem); - if (!pkey) { - rd_snprintf(errstr, errstr_size, - "ssl.key.pem failed: " - "not in PEM format?: "); - return -1; - } - - r = SSL_CTX_use_PrivateKey(ctx, pkey); - - EVP_PKEY_free(pkey); - - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "ssl.key.pem failed: "); - return -1; - } - - /* We no longer need the PEM key (it is cached in the CTX), - * clear its memory. */ - rd_kafka_desensitize_str(rk->rk_conf.ssl.key_pem); - - check_pkey = rd_true; - } - - - /* - * ssl.keystore.location - */ - if (rk->rk_conf.ssl.keystore_location) { - EVP_PKEY *pkey; - X509 *cert; - STACK_OF(X509) *ca = NULL; - BIO *bio; - PKCS12 *p12; - - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading client's keystore file from %s", - rk->rk_conf.ssl.keystore_location); - - bio = BIO_new_file(rk->rk_conf.ssl.keystore_location, "rb"); - if (!bio) { - rd_snprintf(errstr, errstr_size, - "Failed to open ssl.keystore.location: " - "%s: ", - rk->rk_conf.ssl.keystore_location); - return -1; - } - - p12 = d2i_PKCS12_bio(bio, NULL); - if (!p12) { - BIO_free(bio); - rd_snprintf(errstr, errstr_size, - "Error reading ssl.keystore.location " - "PKCS#12 file: %s: ", - rk->rk_conf.ssl.keystore_location); - return -1; - } - - pkey = EVP_PKEY_new(); - cert = X509_new(); - if (!PKCS12_parse(p12, rk->rk_conf.ssl.keystore_password, &pkey, - &cert, &ca)) { - EVP_PKEY_free(pkey); - X509_free(cert); - PKCS12_free(p12); - BIO_free(bio); - if (ca != NULL) - sk_X509_pop_free(ca, X509_free); - rd_snprintf(errstr, errstr_size, - "Failed to parse ssl.keystore.location " - "PKCS#12 file: %s: ", - rk->rk_conf.ssl.keystore_location); - return -1; - } - - if (ca != NULL) - sk_X509_pop_free(ca, X509_free); - - PKCS12_free(p12); - BIO_free(bio); - - r = SSL_CTX_use_certificate(ctx, cert); - X509_free(cert); - if (r != 1) { - EVP_PKEY_free(pkey); - rd_snprintf(errstr, errstr_size, - "Failed to use ssl.keystore.location " - "certificate: "); - return -1; - } - - r = SSL_CTX_use_PrivateKey(ctx, pkey); - EVP_PKEY_free(pkey); - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "Failed to use ssl.keystore.location " - "private key: "); - return -1; - } - - check_pkey = rd_true; - } - -#if WITH_SSL_ENGINE - /* - * If applicable, use OpenSSL engine to fetch SSL certificate. - */ - if (rk->rk_conf.ssl.engine) { - STACK_OF(X509_NAME) *cert_names = sk_X509_NAME_new_null(); - STACK_OF(X509_OBJECT) *roots = - X509_STORE_get0_objects(SSL_CTX_get_cert_store(ctx)); - X509 *x509 = NULL; - EVP_PKEY *pkey = NULL; - int i = 0; - for (i = 0; i < sk_X509_OBJECT_num(roots); i++) { - x509 = X509_OBJECT_get0_X509( - sk_X509_OBJECT_value(roots, i)); - - if (x509) - sk_X509_NAME_push(cert_names, - X509_get_subject_name(x509)); - } - - if (cert_names) - sk_X509_NAME_free(cert_names); - - x509 = NULL; - r = ENGINE_load_ssl_client_cert( - rk->rk_conf.ssl.engine, NULL, cert_names, &x509, &pkey, - NULL, NULL, rk->rk_conf.ssl.engine_callback_data); - - sk_X509_NAME_free(cert_names); - if (r == -1 || !x509 || !pkey) { - X509_free(x509); - EVP_PKEY_free(pkey); - if (r == -1) - rd_snprintf(errstr, errstr_size, - "OpenSSL " - "ENGINE_load_ssl_client_cert " - "failed: "); - else if (!x509) - rd_snprintf(errstr, errstr_size, - "OpenSSL engine failed to " - "load certificate: "); - else - rd_snprintf(errstr, errstr_size, - "OpenSSL engine failed to " - "load private key: "); - - return -1; - } - - r = SSL_CTX_use_certificate(ctx, x509); - X509_free(x509); - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "Failed to use SSL_CTX_use_certificate " - "with engine: "); - EVP_PKEY_free(pkey); - return -1; - } - - r = SSL_CTX_use_PrivateKey(ctx, pkey); - EVP_PKEY_free(pkey); - if (r != 1) { - rd_snprintf(errstr, errstr_size, - "Failed to use SSL_CTX_use_PrivateKey " - "with engine: "); - return -1; - } - - check_pkey = rd_true; - } -#endif /*WITH_SSL_ENGINE*/ - - /* Check that a valid private/public key combo was set. */ - if (check_pkey && SSL_CTX_check_private_key(ctx) != 1) { - rd_snprintf(errstr, errstr_size, "Private key check failed: "); - return -1; - } - - return 0; -} - - -/** - * @brief Once per rd_kafka_t handle cleanup of OpenSSL - * - * @locality any thread - * - * @locks rd_kafka_wrlock() MUST be held - */ -void rd_kafka_ssl_ctx_term(rd_kafka_t *rk) { - SSL_CTX_free(rk->rk_conf.ssl.ctx); - rk->rk_conf.ssl.ctx = NULL; - -#if WITH_SSL_ENGINE - RD_IF_FREE(rk->rk_conf.ssl.engine, ENGINE_free); -#endif -} - - -#if WITH_SSL_ENGINE -/** - * @brief Initialize and load OpenSSL engine, if configured. - * - * @returns true on success, false on error. - */ -static rd_bool_t -rd_kafka_ssl_ctx_init_engine(rd_kafka_t *rk, char *errstr, size_t errstr_size) { - ENGINE *engine; - - /* OpenSSL loads an engine as dynamic id and stores it in - * internal list, as per LIST_ADD command below. If engine - * already exists in internal list, it is supposed to be - * fetched using engine id. - */ - engine = ENGINE_by_id(rk->rk_conf.ssl.engine_id); - if (!engine) { - engine = ENGINE_by_id("dynamic"); - if (!engine) { - rd_snprintf(errstr, errstr_size, - "OpenSSL engine initialization failed in" - " ENGINE_by_id: "); - return rd_false; - } - } - - if (!ENGINE_ctrl_cmd_string(engine, "SO_PATH", - rk->rk_conf.ssl.engine_location, 0)) { - ENGINE_free(engine); - rd_snprintf(errstr, errstr_size, - "OpenSSL engine initialization failed in" - " ENGINE_ctrl_cmd_string SO_PATH: "); - return rd_false; - } - - if (!ENGINE_ctrl_cmd_string(engine, "LIST_ADD", "1", 0)) { - ENGINE_free(engine); - rd_snprintf(errstr, errstr_size, - "OpenSSL engine initialization failed in" - " ENGINE_ctrl_cmd_string LIST_ADD: "); - return rd_false; - } - - if (!ENGINE_ctrl_cmd_string(engine, "LOAD", NULL, 0)) { - ENGINE_free(engine); - rd_snprintf(errstr, errstr_size, - "OpenSSL engine initialization failed in" - " ENGINE_ctrl_cmd_string LOAD: "); - return rd_false; - } - - if (!ENGINE_init(engine)) { - ENGINE_free(engine); - rd_snprintf(errstr, errstr_size, - "OpenSSL engine initialization failed in" - " ENGINE_init: "); - return rd_false; - } - - rk->rk_conf.ssl.engine = engine; - - return rd_true; -} -#endif - - -#if OPENSSL_VERSION_NUMBER >= 0x30000000 -/** - * @brief Wrapper around OSSL_PROVIDER_unload() to expose a free(void*) API - * suitable for rd_list_t's free_cb. - */ -static void rd_kafka_ssl_OSSL_PROVIDER_free(void *ptr) { - OSSL_PROVIDER *prov = ptr; - (void)OSSL_PROVIDER_unload(prov); -} - - -/** - * @brief Load OpenSSL 3.0.x providers specified in comma-separated string. - * - * @remark Only the error preamble/prefix is written here, the actual - * OpenSSL error is retrieved from the OpenSSL error stack by - * the caller. - * - * @returns rd_false on failure (errstr will be written to), or rd_true - * on successs. - */ -static rd_bool_t rd_kafka_ssl_ctx_load_providers(rd_kafka_t *rk, - const char *providers_csv, - char *errstr, - size_t errstr_size) { - size_t provider_cnt, i; - char **providers = rd_string_split( - providers_csv, ',', rd_true /*skip empty*/, &provider_cnt); - - - if (!providers || !provider_cnt) { - rd_snprintf(errstr, errstr_size, - "ssl.providers expects a comma-separated " - "list of OpenSSL 3.0.x providers"); - if (providers) - rd_free(providers); - return rd_false; - } - - rd_list_init(&rk->rk_conf.ssl.loaded_providers, (int)provider_cnt, - rd_kafka_ssl_OSSL_PROVIDER_free); - - for (i = 0; i < provider_cnt; i++) { - const char *provider = providers[i]; - OSSL_PROVIDER *prov; - const char *buildinfo = NULL; - OSSL_PARAM request[] = {{"buildinfo", OSSL_PARAM_UTF8_PTR, - (void *)&buildinfo, 0, 0}, - {NULL, 0, NULL, 0, 0}}; - - prov = OSSL_PROVIDER_load(NULL, provider); - if (!prov) { - rd_snprintf(errstr, errstr_size, - "Failed to load OpenSSL provider \"%s\": ", - provider); - rd_free(providers); - return rd_false; - } - - if (!OSSL_PROVIDER_get_params(prov, request)) - buildinfo = "no buildinfo"; - - rd_kafka_dbg(rk, SECURITY, "SSL", - "OpenSSL provider \"%s\" loaded (%s)", provider, - buildinfo); - - rd_list_add(&rk->rk_conf.ssl.loaded_providers, prov); - } - - rd_free(providers); - - return rd_true; -} -#endif - - - -/** - * @brief Once per rd_kafka_t handle initialization of OpenSSL - * - * @locality application thread - * - * @locks rd_kafka_wrlock() MUST be held - */ -int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { - int r; - SSL_CTX *ctx = NULL; - const char *linking = -#if WITH_STATIC_LIB_libcrypto - "statically linked " -#else - "" -#endif - ; - -#if OPENSSL_VERSION_NUMBER >= 0x10100000 - rd_kafka_dbg(rk, SECURITY, "OPENSSL", - "Using %sOpenSSL version %s " - "(0x%lx, librdkafka built with 0x%lx)", - linking, OpenSSL_version(OPENSSL_VERSION), - OpenSSL_version_num(), OPENSSL_VERSION_NUMBER); -#else - rd_kafka_dbg(rk, SECURITY, "OPENSSL", - "librdkafka built with %sOpenSSL version 0x%lx", linking, - OPENSSL_VERSION_NUMBER); -#endif - - if (errstr_size > 0) - errstr[0] = '\0'; - -#if OPENSSL_VERSION_NUMBER >= 0x30000000 - if (rk->rk_conf.ssl.providers && - !rd_kafka_ssl_ctx_load_providers(rk, rk->rk_conf.ssl.providers, - errstr, errstr_size)) - goto fail; -#endif - -#if WITH_SSL_ENGINE - if (rk->rk_conf.ssl.engine_location && !rk->rk_conf.ssl.engine) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Loading OpenSSL engine from \"%s\"", - rk->rk_conf.ssl.engine_location); - if (!rd_kafka_ssl_ctx_init_engine(rk, errstr, errstr_size)) - goto fail; - } -#endif - -#if OPENSSL_VERSION_NUMBER >= 0x10100000 - ctx = SSL_CTX_new(TLS_client_method()); -#else - ctx = SSL_CTX_new(SSLv23_client_method()); -#endif - if (!ctx) { - rd_snprintf(errstr, errstr_size, "SSL_CTX_new() failed: "); - goto fail; - } - -#ifdef SSL_OP_NO_SSLv3 - /* Disable SSLv3 (unsafe) */ - SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3); -#endif - - /* Key file password callback */ - SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb); - SSL_CTX_set_default_passwd_cb_userdata(ctx, rk); - - /* Ciphers */ - if (rk->rk_conf.ssl.cipher_suites) { - rd_kafka_dbg(rk, SECURITY, "SSL", "Setting cipher list: %s", - rk->rk_conf.ssl.cipher_suites); - if (!SSL_CTX_set_cipher_list(ctx, - rk->rk_conf.ssl.cipher_suites)) { - /* Set a string that will prefix the - * the OpenSSL error message (which is lousy) - * to make it more meaningful. */ - rd_snprintf(errstr, errstr_size, - "ssl.cipher.suites failed: "); - goto fail; - } - } - - /* Set up broker certificate verification. */ - SSL_CTX_set_verify(ctx, - rk->rk_conf.ssl.enable_verify ? SSL_VERIFY_PEER - : SSL_VERIFY_NONE, - rk->rk_conf.ssl.cert_verify_cb - ? rd_kafka_transport_ssl_cert_verify_cb - : NULL); - -#if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER) - /* Curves */ - if (rk->rk_conf.ssl.curves_list) { - rd_kafka_dbg(rk, SECURITY, "SSL", "Setting curves list: %s", - rk->rk_conf.ssl.curves_list); - if (!SSL_CTX_set1_curves_list(ctx, - rk->rk_conf.ssl.curves_list)) { - rd_snprintf(errstr, errstr_size, - "ssl.curves.list failed: "); - goto fail; - } - } - - /* Certificate signature algorithms */ - if (rk->rk_conf.ssl.sigalgs_list) { - rd_kafka_dbg(rk, SECURITY, "SSL", - "Setting signature algorithms list: %s", - rk->rk_conf.ssl.sigalgs_list); - if (!SSL_CTX_set1_sigalgs_list(ctx, - rk->rk_conf.ssl.sigalgs_list)) { - rd_snprintf(errstr, errstr_size, - "ssl.sigalgs.list failed: "); - goto fail; - } - } -#endif - - /* Register certificates, keys, etc. */ - if (rd_kafka_ssl_set_certs(rk, ctx, errstr, errstr_size) == -1) - goto fail; - - - SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); - - rk->rk_conf.ssl.ctx = ctx; - - return 0; - -fail: - r = (int)strlen(errstr); - /* If only the error preamble is provided in errstr and ending with - * "....: ", then retrieve the last error from the OpenSSL error stack, - * else treat the errstr as complete. */ - if (r > 2 && !strcmp(&errstr[r - 2], ": ")) - rd_kafka_ssl_error(rk, NULL, errstr + r, - (int)errstr_size > r ? (int)errstr_size - r - : 0); - RD_IF_FREE(ctx, SSL_CTX_free); -#if WITH_SSL_ENGINE - RD_IF_FREE(rk->rk_conf.ssl.engine, ENGINE_free); -#endif - rd_list_destroy(&rk->rk_conf.ssl.loaded_providers); - - return -1; -} - - -#if OPENSSL_VERSION_NUMBER < 0x10100000L -static RD_UNUSED void -rd_kafka_transport_ssl_lock_cb(int mode, int i, const char *file, int line) { - if (mode & CRYPTO_LOCK) - mtx_lock(&rd_kafka_ssl_locks[i]); - else - mtx_unlock(&rd_kafka_ssl_locks[i]); -} -#endif - -static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb(void) { -#ifdef _WIN32 - /* Windows makes a distinction between thread handle - * and thread id, which means we can't use the - * thrd_current() API that returns the handle. */ - return (unsigned long)GetCurrentThreadId(); -#else - return (unsigned long)(intptr_t)thrd_current(); -#endif -} - -#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK -static void -rd_kafka_transport_libcrypto_THREADID_callback(CRYPTO_THREADID *id) { - unsigned long thread_id = rd_kafka_transport_ssl_threadid_cb(); - - CRYPTO_THREADID_set_numeric(id, thread_id); -} -#endif - -/** - * @brief Global OpenSSL cleanup. - */ -void rd_kafka_ssl_term(void) { -#if OPENSSL_VERSION_NUMBER < 0x10100000L - int i; - - if (CRYPTO_get_locking_callback() == &rd_kafka_transport_ssl_lock_cb) { - CRYPTO_set_locking_callback(NULL); -#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK - CRYPTO_THREADID_set_callback(NULL); -#else - CRYPTO_set_id_callback(NULL); -#endif - - for (i = 0; i < rd_kafka_ssl_locks_cnt; i++) - mtx_destroy(&rd_kafka_ssl_locks[i]); - - rd_free(rd_kafka_ssl_locks); - } -#endif -} - - -/** - * @brief Global (once per process) OpenSSL init. - */ -void rd_kafka_ssl_init(void) { -#if OPENSSL_VERSION_NUMBER < 0x10100000L - int i; - - if (!CRYPTO_get_locking_callback()) { - rd_kafka_ssl_locks_cnt = CRYPTO_num_locks(); - rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt * - sizeof(*rd_kafka_ssl_locks)); - for (i = 0; i < rd_kafka_ssl_locks_cnt; i++) - mtx_init(&rd_kafka_ssl_locks[i], mtx_plain); - - CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb); - -#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK - CRYPTO_THREADID_set_callback( - rd_kafka_transport_libcrypto_THREADID_callback); -#else - CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb); -#endif - } - - /* OPENSSL_init_ssl(3) and OPENSSL_init_crypto(3) say: - * "As of version 1.1.0 OpenSSL will automatically allocate - * all resources that it needs so no explicit initialisation - * is required. Similarly it will also automatically - * deinitialise as required." - */ - SSL_load_error_strings(); - SSL_library_init(); - - ERR_load_BIO_strings(); - ERR_load_crypto_strings(); - OpenSSL_add_all_algorithms(); -#endif -} |