/* * librdkafka - The Apache Kafka C/C++ library * * Copyright (c) 2019 Magnus Edenhill * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ /** * @name OpenSSL integration * */ #include "rdkafka_int.h" #include "rdkafka_transport_int.h" #include "rdkafka_cert.h" #ifdef _WIN32 #include #pragma comment(lib, "crypt32.lib") #pragma comment(lib, "libcrypto.lib") #pragma comment(lib, "libssl.lib") #endif #include #include #if OPENSSL_VERSION_NUMBER >= 0x30000000 #include #endif #include #if !_WIN32 #include #include #include #endif #if WITH_VALGRIND /* OpenSSL relies on uninitialized memory, which Valgrind will whine about. * We use in-code Valgrind macros to suppress those warnings. 
*/ #include #else #define VALGRIND_MAKE_MEM_DEFINED(A, B) #endif #if OPENSSL_VERSION_NUMBER < 0x10100000L static mtx_t *rd_kafka_ssl_locks; static int rd_kafka_ssl_locks_cnt; #endif /** * @brief Close and destroy SSL session */ void rd_kafka_transport_ssl_close(rd_kafka_transport_t *rktrans) { SSL_shutdown(rktrans->rktrans_ssl); SSL_free(rktrans->rktrans_ssl); rktrans->rktrans_ssl = NULL; } /** * @brief Clear OpenSSL error queue to get a proper error reporting in case * the next SSL_*() operation fails. */ static RD_INLINE void rd_kafka_transport_ssl_clear_error(rd_kafka_transport_t *rktrans) { ERR_clear_error(); #ifdef _WIN32 WSASetLastError(0); #else rd_set_errno(0); #endif } /** * @returns a thread-local single-invocation-use error string for * the last thread-local error in OpenSSL, or an empty string * if no error. */ const char *rd_kafka_ssl_last_error_str(void) { static RD_TLS char errstr[256]; unsigned long l; const char *file, *data, *func; int line, flags; #if OPENSSL_VERSION_NUMBER >= 0x30000000 l = ERR_peek_last_error_all(&file, &line, &func, &data, &flags); #else l = ERR_peek_last_error_line_data(&file, &line, &data, &flags); func = ERR_func_error_string(l); #endif if (!l) return ""; rd_snprintf(errstr, sizeof(errstr), "%lu:%s:%s:%s:%d: %s", l, ERR_lib_error_string(l), func, file, line, ((flags & ERR_TXT_STRING) && data && *data) ? data : ERR_reason_error_string(l)); return errstr; } /** * Serves the entire OpenSSL error queue and logs each error. * The last error is not logged but returned in 'errstr'. * * If 'rkb' is non-NULL broker-specific logging will be used, * else it will fall back on global 'rk' debugging. 
*/ static char *rd_kafka_ssl_error(rd_kafka_t *rk, rd_kafka_broker_t *rkb, char *errstr, size_t errstr_size) { unsigned long l; const char *file, *data, *func; int line, flags; int cnt = 0; if (!rk) { rd_assert(rkb); rk = rkb->rkb_rk; } while ( #if OPENSSL_VERSION_NUMBER >= 0x30000000 (l = ERR_get_error_all(&file, &line, &func, &data, &flags)) #else (l = ERR_get_error_line_data(&file, &line, &data, &flags)) #endif ) { char buf[256]; #if OPENSSL_VERSION_NUMBER < 0x30000000 func = ERR_func_error_string(l); #endif if (cnt++ > 0) { /* Log last message */ if (rkb) rd_rkb_log(rkb, LOG_ERR, "SSL", "%s", errstr); else rd_kafka_log(rk, LOG_ERR, "SSL", "%s", errstr); } ERR_error_string_n(l, buf, sizeof(buf)); if (!(flags & ERR_TXT_STRING) || !data || !*data) data = NULL; /* Include openssl file:line:func if debugging is enabled */ if (rk->rk_conf.log_level >= LOG_DEBUG) rd_snprintf(errstr, errstr_size, "%s:%d:%s %s%s%s", file, line, func, buf, data ? ": " : "", data ? data : ""); else rd_snprintf(errstr, errstr_size, "%s%s%s", buf, data ? ": " : "", data ? data : ""); } if (cnt == 0) rd_snprintf(errstr, errstr_size, "No further error information available"); return errstr; } /** * Set transport IO event polling based on SSL error. * * Returns -1 on permanent errors. 
* * Locality: broker thread */ static RD_INLINE int rd_kafka_transport_ssl_io_update(rd_kafka_transport_t *rktrans, int ret, char *errstr, size_t errstr_size) { int serr = SSL_get_error(rktrans->rktrans_ssl, ret); int serr2; switch (serr) { case SSL_ERROR_WANT_READ: rd_kafka_transport_poll_set(rktrans, POLLIN); break; case SSL_ERROR_WANT_WRITE: rd_kafka_transport_set_blocked(rktrans, rd_true); rd_kafka_transport_poll_set(rktrans, POLLOUT); break; case SSL_ERROR_SYSCALL: serr2 = ERR_peek_error(); if (serr2) rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, errstr_size); else if (!rd_socket_errno || rd_socket_errno == ECONNRESET) rd_snprintf(errstr, errstr_size, "Disconnected"); else rd_snprintf(errstr, errstr_size, "SSL transport error: %s", rd_strerror(rd_socket_errno)); return -1; case SSL_ERROR_ZERO_RETURN: rd_snprintf(errstr, errstr_size, "Disconnected"); return -1; default: rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, errstr_size); return -1; } return 0; } ssize_t rd_kafka_transport_ssl_send(rd_kafka_transport_t *rktrans, rd_slice_t *slice, char *errstr, size_t errstr_size) { ssize_t sum = 0; const void *p; size_t rlen; rd_kafka_transport_ssl_clear_error(rktrans); while ((rlen = rd_slice_peeker(slice, &p))) { int r; size_t r2; r = SSL_write(rktrans->rktrans_ssl, p, (int)rlen); if (unlikely(r <= 0)) { if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, errstr_size) == -1) return -1; else return sum; } /* Update buffer read position */ r2 = rd_slice_read(slice, NULL, (size_t)r); rd_assert((size_t)r == r2 && *"BUG: wrote more bytes than available in slice"); sum += r; /* FIXME: remove this and try again immediately and let * the next SSL_write() call fail instead? 
*/ if ((size_t)r < rlen) break; } return sum; } ssize_t rd_kafka_transport_ssl_recv(rd_kafka_transport_t *rktrans, rd_buf_t *rbuf, char *errstr, size_t errstr_size) { ssize_t sum = 0; void *p; size_t len; while ((len = rd_buf_get_writable(rbuf, &p))) { int r; rd_kafka_transport_ssl_clear_error(rktrans); r = SSL_read(rktrans->rktrans_ssl, p, (int)len); if (unlikely(r <= 0)) { if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, errstr_size) == -1) return -1; else return sum; } VALGRIND_MAKE_MEM_DEFINED(p, r); /* Update buffer write position */ rd_buf_write(rbuf, NULL, (size_t)r); sum += r; /* FIXME: remove this and try again immediately and let * the next SSL_read() call fail instead? */ if ((size_t)r < len) break; } return sum; } /** * OpenSSL password query callback * * Locality: application thread */ static int rd_kafka_transport_ssl_passwd_cb(char *buf, int size, int rwflag, void *userdata) { rd_kafka_t *rk = userdata; int pwlen; rd_kafka_dbg(rk, SECURITY, "SSLPASSWD", "Private key requires password"); if (!rk->rk_conf.ssl.key_password) { rd_kafka_log(rk, LOG_WARNING, "SSLPASSWD", "Private key requires password but " "no password configured (ssl.key.password)"); return -1; } pwlen = (int)strlen(rk->rk_conf.ssl.key_password); memcpy(buf, rk->rk_conf.ssl.key_password, RD_MIN(pwlen, size)); return pwlen; } /** * @brief OpenSSL callback to perform additional broker certificate * verification and validation. * * @return 1 on success when the broker certificate * is valid and 0 when the certificate is not valid. 
*
 * @sa SSL_CTX_set_verify()
 */
static int rd_kafka_transport_ssl_cert_verify_cb(int preverify_ok,
                                                 X509_STORE_CTX *x509_ctx) {
        rd_kafka_transport_t *rktrans = rd_kafka_curr_transport;
        rd_kafka_broker_t *rkb;
        rd_kafka_t *rk;
        X509 *cert;
        char *der = NULL;
        int der_len;
        int depth;
        int orig_error, app_error;
        char errstr[512];
        char subject[128];
        char issuer[128];
        int accepted;

        rd_assert(rktrans != NULL);
        rkb = rktrans->rktrans_rkb;
        rk  = rkb->rkb_rk;

        cert = X509_STORE_CTX_get_current_cert(x509_ctx);
        if (!cert) {
                rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
                           "Failed to get current certificate to verify");
                return 0;
        }

        depth      = X509_STORE_CTX_get_error_depth(x509_ctx);
        orig_error = X509_STORE_CTX_get_error(x509_ctx);
        app_error  = orig_error;

        /* DER-encode the certificate into an OpenSSL allocated buffer
         * which is handed to the application callback. */
        der_len = i2d_X509(cert, (unsigned char **)&der);
        if (der_len < 0 || !der) {
                rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
                           "Unable to convert certificate to X509 format");
                return 0;
        }

        errstr[0] = '\0';

        /* Call application's verification callback. */
        accepted = rk->rk_conf.ssl.cert_verify_cb(
            rk, rkb->rkb_nodename, rkb->rkb_nodeid, &app_error, depth, der,
            (size_t)der_len, errstr, sizeof(errstr), rk->rk_conf.opaque);

        OPENSSL_free(der);

        if (accepted) {
                /* The application cleared a pre-existing error:
                 * propagate that to OpenSSL. */
                if (orig_error != 0 && app_error == 0)
                        X509_STORE_CTX_set_error(x509_ctx, 0);

                return 1; /* verification successful */
        }

        /* Application rejected the certificate: log who it belonged to
         * and hand the (possibly updated) error code back to OpenSSL. */
        X509_NAME_oneline(X509_get_subject_name(cert), subject,
                          sizeof(subject));
        X509_NAME_oneline(X509_get_issuer_name(cert), issuer, sizeof(issuer));
        rd_rkb_log(rkb, LOG_ERR, "SSLCERTVRFY",
                   "Certificate (subject=%s, issuer=%s) verification "
                   "callback failed: %s",
                   subject, issuer, errstr);

        X509_STORE_CTX_set_error(x509_ctx, app_error);

        return 0; /* verification failed */
}

/**
 * @brief Set TLSEXT hostname for SNI and optionally enable
 *        SSL endpoint identification verification.
 *
 * @returns 0 on success or -1 on error.
*/
static int rd_kafka_transport_ssl_set_endpoint_id(rd_kafka_transport_t *rktrans,
                                                  char *errstr,
                                                  size_t errstr_size) {
        char name[RD_KAFKA_NODENAME_SIZE];
        char *t;

        /* Take a private copy of the broker's nodename while holding
         * the broker lock. */
        rd_kafka_broker_lock(rktrans->rktrans_rkb);
        rd_snprintf(name, sizeof(name), "%s",
                    rktrans->rktrans_rkb->rkb_nodename);
        rd_kafka_broker_unlock(rktrans->rktrans_rkb);

        /* Remove ":9092" port suffix from nodename */
        if ((t = strrchr(name, ':')))
                *t = '\0';

#if (OPENSSL_VERSION_NUMBER >= 0x0090806fL) && !defined(OPENSSL_NO_TLSEXT)
        /* If non-numerical hostname, send it for SNI.
         * A name is treated as numerical if it consists solely of
         * IPv4 characters (digits and dots), or contains a colon and
         * consists solely of IPv6 address characters. */
        if (!(/*ipv6*/ (strchr(name, ':') &&
                        strspn(name, "0123456789abcdefABCDEF:.[]%") ==
                            strlen(name)) ||
              /*ipv4*/ strspn(name, "0123456789.") == strlen(name)) &&
            !SSL_set_tlsext_host_name(rktrans->rktrans_ssl, name))
                goto fail;
#endif

        /* Endpoint identification disabled by configuration:
         * SNI is all that was needed. */
        if (rktrans->rktrans_rkb->rkb_rk->rk_conf.ssl.endpoint_identification ==
            RD_KAFKA_SSL_ENDPOINT_ID_NONE)
                return 0;

        /* Register the expected hostname using whichever API the
         * OpenSSL version being built against provides. */
#if OPENSSL_VERSION_NUMBER >= 0x10100000 && !defined(OPENSSL_IS_BORINGSSL)
        if (!SSL_set1_host(rktrans->rktrans_ssl, name))
                goto fail;
#elif OPENSSL_VERSION_NUMBER >= 0x1000200fL /* 1.0.2 */
        {
                X509_VERIFY_PARAM *param;

                param = SSL_get0_param(rktrans->rktrans_ssl);

                if (!X509_VERIFY_PARAM_set1_host(param, name, 0))
                        goto fail;
        }
#else
        rd_snprintf(errstr, errstr_size,
                    "Endpoint identification not supported on this "
                    "OpenSSL version (0x%lx)",
                    OPENSSL_VERSION_NUMBER);
        return -1;
#endif

        rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "ENDPOINT",
                   "Enabled endpoint identification using hostname %s", name);

        return 0;

fail:
        /* Retrieve the OpenSSL error(s) into errstr. */
        rd_kafka_ssl_error(NULL, rktrans->rktrans_rkb, errstr, errstr_size);
        return -1;
}


/**
 * @brief Set up SSL for a newly connected connection
 *
 * @returns -1 on failure, else 0.
*/
int rd_kafka_transport_ssl_connect(rd_kafka_broker_t *rkb,
                                   rd_kafka_transport_t *rktrans,
                                   char *errstr,
                                   size_t errstr_size) {
        int status;

        /* Create the SSL object and bind it to the transport's socket. */
        rktrans->rktrans_ssl = SSL_new(rkb->rkb_rk->rk_conf.ssl.ctx);

        if (!rktrans->rktrans_ssl ||
            !SSL_set_fd(rktrans->rktrans_ssl, (int)rktrans->rktrans_s)) {
                rd_kafka_ssl_error(NULL, rkb, errstr, errstr_size);
                return -1;
        }

        if (rd_kafka_transport_ssl_set_endpoint_id(rktrans, errstr,
                                                   errstr_size) == -1)
                return -1;

        rd_kafka_transport_ssl_clear_error(rktrans);

        status = SSL_connect(rktrans->rktrans_ssl);

        if (status != 1) {
                /* Handshake not finished yet (or failed): let the SSL
                 * error decide which IO events to wait for. */
                return rd_kafka_transport_ssl_io_update(rktrans, status, errstr,
                                                        errstr_size) == -1
                           ? -1
                           : 0;
        }

        /* Connected, highly unlikely since this is a
         * non-blocking operation. */
        rd_kafka_transport_connect_done(rktrans, NULL);
        return 0;
}


/**
 * @brief Handle IO events for an SSL transport: on POLLOUT a zero-length
 *        SSL_write() is issued and the IO polling updated from its result.
 *
 * @returns 0 on success or -1 on permanent error, in which case
 *          the broker is failed.
 */
static RD_UNUSED int
rd_kafka_transport_ssl_io_event(rd_kafka_transport_t *rktrans, int events) {
        char errstr[512];
        int status;

        if (!(events & POLLOUT))
                return 0;

        rd_kafka_transport_ssl_clear_error(rktrans);

        status = SSL_write(rktrans->rktrans_ssl, NULL, 0);

        if (rd_kafka_transport_ssl_io_update(rktrans, status, errstr,
                                             sizeof(errstr)) != -1)
                return 0;

        /* Permanent error */
        rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR,
                             RD_KAFKA_RESP_ERR__TRANSPORT, "%s", errstr);
        return -1;
}


/**
 * @brief Verify SSL handshake was valid.
*/ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) { long int rl; X509 *cert; if (!rktrans->rktrans_rkb->rkb_rk->rk_conf.ssl.enable_verify) return 0; #if OPENSSL_VERSION_NUMBER >= 0x30000000 cert = SSL_get1_peer_certificate(rktrans->rktrans_ssl); #else cert = SSL_get_peer_certificate(rktrans->rktrans_ssl); #endif X509_free(cert); if (!cert) { rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL, "Broker did not provide a certificate"); return -1; } if ((rl = SSL_get_verify_result(rktrans->rktrans_ssl)) != X509_V_OK) { rd_kafka_broker_fail(rktrans->rktrans_rkb, LOG_ERR, RD_KAFKA_RESP_ERR__SSL, "Failed to verify broker certificate: %s", X509_verify_cert_error_string(rl)); return -1; } rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", "Broker SSL certificate verified"); return 0; } /** * @brief SSL handshake handling. * Call repeatedly (based on IO events) until handshake is done. * * @returns -1 on error, 0 if handshake is still in progress, * or 1 on completion. */ int rd_kafka_transport_ssl_handshake(rd_kafka_transport_t *rktrans) { rd_kafka_broker_t *rkb = rktrans->rktrans_rkb; char errstr[512]; int r; r = SSL_do_handshake(rktrans->rktrans_ssl); if (r == 1) { /* SSL handshake done. Verify. 
*/ if (rd_kafka_transport_ssl_verify(rktrans) == -1) return -1; rd_kafka_transport_connect_done(rktrans, NULL); return 1; } else if (rd_kafka_transport_ssl_io_update(rktrans, r, errstr, sizeof(errstr)) == -1) { const char *extra = ""; rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR__SSL; if (strstr(errstr, "unexpected message")) extra = ": client SSL authentication might be " "required (see ssl.key.location and " "ssl.certificate.location and consult the " "broker logs for more information)"; else if (strstr(errstr, "tls_process_server_certificate:" "certificate verify failed") || strstr(errstr, "error:0A000086") /*openssl3*/ || strstr(errstr, "get_server_certificate:" "certificate verify failed")) extra = ": broker certificate could not be verified, " "verify that ssl.ca.location is correctly " "configured or root CA certificates are " "installed" #ifdef __APPLE__ " (brew install openssl)" #elif defined(_WIN32) " (add broker's CA certificate to the Windows " "Root certificate store)" #else " (install ca-certificates package)" #endif ; else if (!strcmp(errstr, "Disconnected")) { extra = ": connecting to a PLAINTEXT broker listener?"; /* Disconnects during handshake are most likely * not due to SSL, but rather at the transport level */ err = RD_KAFKA_RESP_ERR__TRANSPORT; } rd_kafka_broker_fail(rkb, LOG_ERR, err, "SSL handshake failed: %s%s", errstr, extra); return -1; } return 0; } /** * @brief Parse a PEM-formatted string into an EVP_PKEY (PrivateKey) object. * * @param str Input PEM string, nul-terminated * * @remark This method does not provide automatic addition of PEM * headers and footers. * * @returns a new EVP_PKEY on success or NULL on error. */ static EVP_PKEY *rd_kafka_ssl_PKEY_from_string(rd_kafka_t *rk, const char *str) { BIO *bio = BIO_new_mem_buf((void *)str, -1); EVP_PKEY *pkey; pkey = PEM_read_bio_PrivateKey(bio, NULL, rd_kafka_transport_ssl_passwd_cb, rk); BIO_free(bio); return pkey; } /** * @brief Parse a PEM-formatted string into an X509 object. 
* * @param str Input PEM string, nul-terminated * * @returns a new X509 on success or NULL on error. */ static X509 *rd_kafka_ssl_X509_from_string(rd_kafka_t *rk, const char *str) { BIO *bio = BIO_new_mem_buf((void *)str, -1); X509 *x509; x509 = PEM_read_bio_X509(bio, NULL, rd_kafka_transport_ssl_passwd_cb, rk); BIO_free(bio); return x509; } #ifdef _WIN32 /** * @brief Attempt load CA certificates from a Windows Certificate store. */ static int rd_kafka_ssl_win_load_cert_store(rd_kafka_t *rk, SSL_CTX *ctx, const char *store_name) { HCERTSTORE w_store; PCCERT_CONTEXT w_cctx = NULL; X509_STORE *store; int fail_cnt = 0, cnt = 0; char errstr[256]; wchar_t *wstore_name; size_t wsize = 0; errno_t werr; /* Convert store_name to wide-char */ werr = mbstowcs_s(&wsize, NULL, 0, store_name, strlen(store_name)); if (werr || wsize < 2 || wsize > 1000) { rd_kafka_log(rk, LOG_ERR, "CERTSTORE", "Invalid Windows certificate store name: %.*s%s", 30, store_name, wsize < 2 ? " (empty)" : " (truncated)"); return -1; } wstore_name = rd_alloca(sizeof(*wstore_name) * wsize); werr = mbstowcs_s(NULL, wstore_name, wsize, store_name, strlen(store_name)); rd_assert(!werr); w_store = CertOpenStore(CERT_STORE_PROV_SYSTEM, 0, 0, CERT_SYSTEM_STORE_CURRENT_USER | CERT_STORE_READONLY_FLAG | CERT_STORE_OPEN_EXISTING_FLAG, wstore_name); if (!w_store) { rd_kafka_log( rk, LOG_ERR, "CERTSTORE", "Failed to open Windows certificate " "%s store: %s", store_name, rd_strerror_w32(GetLastError(), errstr, sizeof(errstr))); return -1; } /* Get the OpenSSL trust store */ store = SSL_CTX_get_cert_store(ctx); /* Enumerate the Windows certs */ while ((w_cctx = CertEnumCertificatesInStore(w_store, w_cctx))) { X509 *x509; /* Parse Windows cert: DER -> X.509 */ x509 = d2i_X509(NULL, (const unsigned char **)&w_cctx->pbCertEncoded, (long)w_cctx->cbCertEncoded); if (!x509) { fail_cnt++; continue; } /* Add cert to OpenSSL's trust store */ if (!X509_STORE_add_cert(store, x509)) fail_cnt++; else cnt++; X509_free(x509); } if 
(w_cctx) CertFreeCertificateContext(w_cctx); CertCloseStore(w_store, 0); rd_kafka_dbg(rk, SECURITY, "CERTSTORE", "%d certificate(s) successfully added from " "Windows Certificate %s store, %d failed", cnt, store_name, fail_cnt); if (cnt == 0 && fail_cnt > 0) return -1; return cnt; } /** * @brief Load certs from the configured CSV list of Windows Cert stores. * * @returns the number of successfully loaded certificates, or -1 on error. */ static int rd_kafka_ssl_win_load_cert_stores(rd_kafka_t *rk, SSL_CTX *ctx, const char *store_names) { char *s; int cert_cnt = 0, fail_cnt = 0; if (!store_names || !*store_names) return 0; rd_strdupa(&s, store_names); /* Parse CSV list ("Root,CA, , ,Something") and load * each store in order. */ while (*s) { char *t; const char *store_name; int r; while (isspace((int)*s) || *s == ',') s++; if (!*s) break; store_name = s; t = strchr(s, (int)','); if (t) { *t = '\0'; s = t + 1; for (; t >= store_name && isspace((int)*t); t--) *t = '\0'; } else { s = ""; } r = rd_kafka_ssl_win_load_cert_store(rk, ctx, store_name); if (r != -1) cert_cnt += r; else fail_cnt++; } if (cert_cnt == 0 && fail_cnt > 0) return -1; return cert_cnt; } #endif /* MSC_VER */ /** * @brief Probe for the system's CA certificate location and if found set it * on the \p CTX. * * @returns 0 if CA location was set, else -1. */ static int rd_kafka_ssl_probe_and_set_default_ca_location(rd_kafka_t *rk, SSL_CTX *ctx) { #if _WIN32 /* No standard location on Windows, CA certs are in the ROOT store. 
*/ return -1; #else /* The probe paths are based on: * https://www.happyassassin.net/posts/2015/01/12/a-note-about-ssltls-trusted-certificate-stores-and-platforms/ * Golang's crypto probing paths: * https://golang.org/search?q=certFiles and certDirectories */ static const char *paths[] = { "/etc/pki/tls/certs/ca-bundle.crt", "/etc/ssl/certs/ca-bundle.crt", "/etc/pki/tls/certs/ca-bundle.trust.crt", "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem", "/etc/ssl/ca-bundle.pem", "/etc/pki/tls/cacert.pem", "/etc/ssl/cert.pem", "/etc/ssl/cacert.pem", "/etc/certs/ca-certificates.crt", "/etc/ssl/certs/ca-certificates.crt", "/etc/ssl/certs", "/usr/local/etc/ssl/cert.pem", "/usr/local/etc/ssl/cacert.pem", "/usr/local/etc/ssl/certs/cert.pem", "/usr/local/etc/ssl/certs/cacert.pem", /* BSD */ "/usr/local/share/certs/ca-root-nss.crt", "/etc/openssl/certs/ca-certificates.crt", #ifdef __APPLE__ "/private/etc/ssl/cert.pem", "/private/etc/ssl/certs", "/usr/local/etc/openssl@1.1/cert.pem", "/usr/local/etc/openssl@1.0/cert.pem", "/usr/local/etc/openssl/certs", "/System/Library/OpenSSL", #endif #ifdef _AIX "/var/ssl/certs/ca-bundle.crt", #endif NULL, }; const char *path = NULL; int i; for (i = 0; (path = paths[i]); i++) { struct stat st; rd_bool_t is_dir; int r; if (stat(path, &st) != 0) continue; is_dir = S_ISDIR(st.st_mode); if (is_dir && rd_kafka_dir_is_empty(path)) continue; rd_kafka_dbg(rk, SECURITY, "CACERTS", "Setting default CA certificate location " "to %s, override with ssl.ca.location", path); r = SSL_CTX_load_verify_locations(ctx, is_dir ? NULL : path, is_dir ? path : NULL); if (r != 1) { char errstr[512]; /* Read error and clear the error stack */ rd_kafka_ssl_error(rk, NULL, errstr, sizeof(errstr)); rd_kafka_dbg(rk, SECURITY, "CACERTS", "Failed to set default CA certificate " "location to %s %s: %s: skipping", is_dir ? 
"directory" : "file", path, errstr); continue; } return 0; } rd_kafka_dbg(rk, SECURITY, "CACERTS", "Unable to find any standard CA certificate" "paths: is the ca-certificates package installed?"); return -1; #endif } /** * @brief Registers certificates, keys, etc, on the SSL_CTX * * @returns -1 on error, or 0 on success. */ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk, SSL_CTX *ctx, char *errstr, size_t errstr_size) { rd_bool_t ca_probe = rd_true; rd_bool_t check_pkey = rd_false; int r; /* * ssl_ca, ssl.ca.location, or Windows cert root store, * or default paths. */ if (rk->rk_conf.ssl.ca) { /* CA certificate chain set with conf_set_ssl_cert() */ rd_kafka_dbg(rk, SECURITY, "SSL", "Loading CA certificate(s) from memory"); SSL_CTX_set_cert_store(ctx, rk->rk_conf.ssl.ca->store); /* OpenSSL takes ownership of the store */ rk->rk_conf.ssl.ca->store = NULL; ca_probe = rd_false; } else { if (rk->rk_conf.ssl.ca_location && strcmp(rk->rk_conf.ssl.ca_location, "probe")) { /* CA certificate location, either file or directory. */ int is_dir = rd_kafka_path_is_dir(rk->rk_conf.ssl.ca_location); rd_kafka_dbg(rk, SECURITY, "SSL", "Loading CA certificate(s) from %s %s", is_dir ? "directory" : "file", rk->rk_conf.ssl.ca_location); r = SSL_CTX_load_verify_locations( ctx, !is_dir ? rk->rk_conf.ssl.ca_location : NULL, is_dir ? 
rk->rk_conf.ssl.ca_location : NULL); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.ca.location failed: "); return -1; } ca_probe = rd_false; } if (rk->rk_conf.ssl.ca_pem) { /* CA as PEM string */ X509 *x509; X509_STORE *store; BIO *bio; int cnt = 0; /* Get the OpenSSL trust store */ store = SSL_CTX_get_cert_store(ctx); rd_assert(store != NULL); rd_kafka_dbg(rk, SECURITY, "SSL", "Loading CA certificate(s) from string"); bio = BIO_new_mem_buf((void *)rk->rk_conf.ssl.ca_pem, -1); rd_assert(bio != NULL); /* Add all certificates to cert store */ while ((x509 = PEM_read_bio_X509( bio, NULL, rd_kafka_transport_ssl_passwd_cb, rk))) { if (!X509_STORE_add_cert(store, x509)) { rd_snprintf(errstr, errstr_size, "failed to add ssl.ca.pem " "certificate " "#%d to CA cert store: ", cnt); X509_free(x509); BIO_free(bio); return -1; } X509_free(x509); cnt++; } if (!BIO_eof(bio) || !cnt) { rd_snprintf(errstr, errstr_size, "failed to read certificate #%d " "from ssl.ca.pem: " "not in PEM format?: ", cnt); BIO_free(bio); return -1; } BIO_free(bio); rd_kafka_dbg(rk, SECURITY, "SSL", "Loaded %d CA certificate(s) from string", cnt); ca_probe = rd_false; } } if (ca_probe) { #ifdef _WIN32 /* Attempt to load CA root certificates from the * configured Windows certificate stores. 
*/ r = rd_kafka_ssl_win_load_cert_stores( rk, ctx, rk->rk_conf.ssl.ca_cert_stores); if (r == 0) { rd_kafka_log( rk, LOG_NOTICE, "CERTSTORE", "No CA certificates loaded from " "Windows certificate stores: " "falling back to default OpenSSL CA paths"); r = -1; } else if (r == -1) rd_kafka_log( rk, LOG_NOTICE, "CERTSTORE", "Failed to load CA certificates from " "Windows certificate stores: " "falling back to default OpenSSL CA paths"); #else r = -1; #endif if ((rk->rk_conf.ssl.ca_location && !strcmp(rk->rk_conf.ssl.ca_location, "probe")) #if WITH_STATIC_LIB_libcrypto || r == -1 #endif ) { /* If OpenSSL was linked statically there is a risk * that the system installed CA certificate path * doesn't match the cert path of OpenSSL. * To circumvent this we check for the existence * of standard CA certificate paths and use the * first one that is found. * Ignore failures. */ r = rd_kafka_ssl_probe_and_set_default_ca_location(rk, ctx); } if (r == -1) { /* Use default CA certificate paths from linked OpenSSL: * ignore failures */ r = SSL_CTX_set_default_verify_paths(ctx); if (r != 1) { char errstr2[512]; /* Read error and clear the error stack. 
*/ rd_kafka_ssl_error(rk, NULL, errstr2, sizeof(errstr2)); rd_kafka_dbg( rk, SECURITY, "SSL", "SSL_CTX_set_default_verify_paths() " "failed: %s: ignoring", errstr2); } r = 0; } } if (rk->rk_conf.ssl.crl_location) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading CRL from file %s", rk->rk_conf.ssl.crl_location); r = SSL_CTX_load_verify_locations( ctx, rk->rk_conf.ssl.crl_location, NULL); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.crl.location failed: "); return -1; } rd_kafka_dbg(rk, SECURITY, "SSL", "Enabling CRL checks"); X509_STORE_set_flags(SSL_CTX_get_cert_store(ctx), X509_V_FLAG_CRL_CHECK); } /* * ssl_cert, ssl.certificate.location and ssl.certificate.pem */ if (rk->rk_conf.ssl.cert) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading public key from memory"); rd_assert(rk->rk_conf.ssl.cert->x509); r = SSL_CTX_use_certificate(ctx, rk->rk_conf.ssl.cert->x509); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl_cert failed: "); return -1; } } if (rk->rk_conf.ssl.cert_location) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading public key from file %s", rk->rk_conf.ssl.cert_location); r = SSL_CTX_use_certificate_chain_file( ctx, rk->rk_conf.ssl.cert_location); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.certificate.location failed: "); return -1; } } if (rk->rk_conf.ssl.cert_pem) { X509 *x509; rd_kafka_dbg(rk, SECURITY, "SSL", "Loading public key from string"); x509 = rd_kafka_ssl_X509_from_string(rk, rk->rk_conf.ssl.cert_pem); if (!x509) { rd_snprintf(errstr, errstr_size, "ssl.certificate.pem failed: " "not in PEM format?: "); return -1; } r = SSL_CTX_use_certificate(ctx, x509); X509_free(x509); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.certificate.pem failed: "); return -1; } } /* * ssl_key, ssl.key.location and ssl.key.pem */ if (rk->rk_conf.ssl.key) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading private key file from memory"); rd_assert(rk->rk_conf.ssl.key->pkey); r = SSL_CTX_use_PrivateKey(ctx, rk->rk_conf.ssl.key->pkey); if (r != 1) { 
rd_snprintf(errstr, errstr_size, "ssl_key (in-memory) failed: "); return -1; } check_pkey = rd_true; } if (rk->rk_conf.ssl.key_location) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading private key file from %s", rk->rk_conf.ssl.key_location); r = SSL_CTX_use_PrivateKey_file( ctx, rk->rk_conf.ssl.key_location, SSL_FILETYPE_PEM); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.key.location failed: "); return -1; } check_pkey = rd_true; } if (rk->rk_conf.ssl.key_pem) { EVP_PKEY *pkey; rd_kafka_dbg(rk, SECURITY, "SSL", "Loading private key from string"); pkey = rd_kafka_ssl_PKEY_from_string(rk, rk->rk_conf.ssl.key_pem); if (!pkey) { rd_snprintf(errstr, errstr_size, "ssl.key.pem failed: " "not in PEM format?: "); return -1; } r = SSL_CTX_use_PrivateKey(ctx, pkey); EVP_PKEY_free(pkey); if (r != 1) { rd_snprintf(errstr, errstr_size, "ssl.key.pem failed: "); return -1; } /* We no longer need the PEM key (it is cached in the CTX), * clear its memory. */ rd_kafka_desensitize_str(rk->rk_conf.ssl.key_pem); check_pkey = rd_true; } /* * ssl.keystore.location */ if (rk->rk_conf.ssl.keystore_location) { EVP_PKEY *pkey; X509 *cert; STACK_OF(X509) *ca = NULL; BIO *bio; PKCS12 *p12; rd_kafka_dbg(rk, SECURITY, "SSL", "Loading client's keystore file from %s", rk->rk_conf.ssl.keystore_location); bio = BIO_new_file(rk->rk_conf.ssl.keystore_location, "rb"); if (!bio) { rd_snprintf(errstr, errstr_size, "Failed to open ssl.keystore.location: " "%s: ", rk->rk_conf.ssl.keystore_location); return -1; } p12 = d2i_PKCS12_bio(bio, NULL); if (!p12) { BIO_free(bio); rd_snprintf(errstr, errstr_size, "Error reading ssl.keystore.location " "PKCS#12 file: %s: ", rk->rk_conf.ssl.keystore_location); return -1; } pkey = EVP_PKEY_new(); cert = X509_new(); if (!PKCS12_parse(p12, rk->rk_conf.ssl.keystore_password, &pkey, &cert, &ca)) { EVP_PKEY_free(pkey); X509_free(cert); PKCS12_free(p12); BIO_free(bio); if (ca != NULL) sk_X509_pop_free(ca, X509_free); rd_snprintf(errstr, errstr_size, "Failed to parse 
ssl.keystore.location " "PKCS#12 file: %s: ", rk->rk_conf.ssl.keystore_location); return -1; } if (ca != NULL) sk_X509_pop_free(ca, X509_free); PKCS12_free(p12); BIO_free(bio); r = SSL_CTX_use_certificate(ctx, cert); X509_free(cert); if (r != 1) { EVP_PKEY_free(pkey); rd_snprintf(errstr, errstr_size, "Failed to use ssl.keystore.location " "certificate: "); return -1; } r = SSL_CTX_use_PrivateKey(ctx, pkey); EVP_PKEY_free(pkey); if (r != 1) { rd_snprintf(errstr, errstr_size, "Failed to use ssl.keystore.location " "private key: "); return -1; } check_pkey = rd_true; } #if WITH_SSL_ENGINE /* * If applicable, use OpenSSL engine to fetch SSL certificate. */ if (rk->rk_conf.ssl.engine) { STACK_OF(X509_NAME) *cert_names = sk_X509_NAME_new_null(); STACK_OF(X509_OBJECT) *roots = X509_STORE_get0_objects(SSL_CTX_get_cert_store(ctx)); X509 *x509 = NULL; EVP_PKEY *pkey = NULL; int i = 0; for (i = 0; i < sk_X509_OBJECT_num(roots); i++) { x509 = X509_OBJECT_get0_X509( sk_X509_OBJECT_value(roots, i)); if (x509) sk_X509_NAME_push(cert_names, X509_get_subject_name(x509)); } if (cert_names) sk_X509_NAME_free(cert_names); x509 = NULL; r = ENGINE_load_ssl_client_cert( rk->rk_conf.ssl.engine, NULL, cert_names, &x509, &pkey, NULL, NULL, rk->rk_conf.ssl.engine_callback_data); sk_X509_NAME_free(cert_names); if (r == -1 || !x509 || !pkey) { X509_free(x509); EVP_PKEY_free(pkey); if (r == -1) rd_snprintf(errstr, errstr_size, "OpenSSL " "ENGINE_load_ssl_client_cert " "failed: "); else if (!x509) rd_snprintf(errstr, errstr_size, "OpenSSL engine failed to " "load certificate: "); else rd_snprintf(errstr, errstr_size, "OpenSSL engine failed to " "load private key: "); return -1; } r = SSL_CTX_use_certificate(ctx, x509); X509_free(x509); if (r != 1) { rd_snprintf(errstr, errstr_size, "Failed to use SSL_CTX_use_certificate " "with engine: "); EVP_PKEY_free(pkey); return -1; } r = SSL_CTX_use_PrivateKey(ctx, pkey); EVP_PKEY_free(pkey); if (r != 1) { rd_snprintf(errstr, errstr_size, "Failed to use 
SSL_CTX_use_PrivateKey " "with engine: "); return -1; } check_pkey = rd_true; } #endif /*WITH_SSL_ENGINE*/ /* Check that a valid private/public key combo was set. */ if (check_pkey && SSL_CTX_check_private_key(ctx) != 1) { rd_snprintf(errstr, errstr_size, "Private key check failed: "); return -1; } return 0; } /** * @brief Once per rd_kafka_t handle cleanup of OpenSSL * * @locality any thread * * @locks rd_kafka_wrlock() MUST be held */ void rd_kafka_ssl_ctx_term(rd_kafka_t *rk) { SSL_CTX_free(rk->rk_conf.ssl.ctx); rk->rk_conf.ssl.ctx = NULL; #if WITH_SSL_ENGINE RD_IF_FREE(rk->rk_conf.ssl.engine, ENGINE_free); #endif } #if WITH_SSL_ENGINE /** * @brief Initialize and load OpenSSL engine, if configured. * * @returns true on success, false on error. */ static rd_bool_t rd_kafka_ssl_ctx_init_engine(rd_kafka_t *rk, char *errstr, size_t errstr_size) { ENGINE *engine; /* OpenSSL loads an engine as dynamic id and stores it in * internal list, as per LIST_ADD command below. If engine * already exists in internal list, it is supposed to be * fetched using engine id. 
*/ engine = ENGINE_by_id(rk->rk_conf.ssl.engine_id); if (!engine) { engine = ENGINE_by_id("dynamic"); if (!engine) { rd_snprintf(errstr, errstr_size, "OpenSSL engine initialization failed in" " ENGINE_by_id: "); return rd_false; } } if (!ENGINE_ctrl_cmd_string(engine, "SO_PATH", rk->rk_conf.ssl.engine_location, 0)) { ENGINE_free(engine); rd_snprintf(errstr, errstr_size, "OpenSSL engine initialization failed in" " ENGINE_ctrl_cmd_string SO_PATH: "); return rd_false; } if (!ENGINE_ctrl_cmd_string(engine, "LIST_ADD", "1", 0)) { ENGINE_free(engine); rd_snprintf(errstr, errstr_size, "OpenSSL engine initialization failed in" " ENGINE_ctrl_cmd_string LIST_ADD: "); return rd_false; } if (!ENGINE_ctrl_cmd_string(engine, "LOAD", NULL, 0)) { ENGINE_free(engine); rd_snprintf(errstr, errstr_size, "OpenSSL engine initialization failed in" " ENGINE_ctrl_cmd_string LOAD: "); return rd_false; } if (!ENGINE_init(engine)) { ENGINE_free(engine); rd_snprintf(errstr, errstr_size, "OpenSSL engine initialization failed in" " ENGINE_init: "); return rd_false; } rk->rk_conf.ssl.engine = engine; return rd_true; } #endif #if OPENSSL_VERSION_NUMBER >= 0x30000000 /** * @brief Wrapper around OSSL_PROVIDER_unload() to expose a free(void*) API * suitable for rd_list_t's free_cb. */ static void rd_kafka_ssl_OSSL_PROVIDER_free(void *ptr) { OSSL_PROVIDER *prov = ptr; (void)OSSL_PROVIDER_unload(prov); } /** * @brief Load OpenSSL 3.0.x providers specified in comma-separated string. * * @remark Only the error preamble/prefix is written here, the actual * OpenSSL error is retrieved from the OpenSSL error stack by * the caller. * * @returns rd_false on failure (errstr will be written to), or rd_true * on successs. 
*/ static rd_bool_t rd_kafka_ssl_ctx_load_providers(rd_kafka_t *rk, const char *providers_csv, char *errstr, size_t errstr_size) { size_t provider_cnt, i; char **providers = rd_string_split( providers_csv, ',', rd_true /*skip empty*/, &provider_cnt); if (!providers || !provider_cnt) { rd_snprintf(errstr, errstr_size, "ssl.providers expects a comma-separated " "list of OpenSSL 3.0.x providers"); if (providers) rd_free(providers); return rd_false; } rd_list_init(&rk->rk_conf.ssl.loaded_providers, (int)provider_cnt, rd_kafka_ssl_OSSL_PROVIDER_free); for (i = 0; i < provider_cnt; i++) { const char *provider = providers[i]; OSSL_PROVIDER *prov; const char *buildinfo = NULL; OSSL_PARAM request[] = {{"buildinfo", OSSL_PARAM_UTF8_PTR, (void *)&buildinfo, 0, 0}, {NULL, 0, NULL, 0, 0}}; prov = OSSL_PROVIDER_load(NULL, provider); if (!prov) { rd_snprintf(errstr, errstr_size, "Failed to load OpenSSL provider \"%s\": ", provider); rd_free(providers); return rd_false; } if (!OSSL_PROVIDER_get_params(prov, request)) buildinfo = "no buildinfo"; rd_kafka_dbg(rk, SECURITY, "SSL", "OpenSSL provider \"%s\" loaded (%s)", provider, buildinfo); rd_list_add(&rk->rk_conf.ssl.loaded_providers, prov); } rd_free(providers); return rd_true; } #endif /** * @brief Once per rd_kafka_t handle initialization of OpenSSL * * @locality application thread * * @locks rd_kafka_wrlock() MUST be held */ int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { int r; SSL_CTX *ctx = NULL; const char *linking = #if WITH_STATIC_LIB_libcrypto "statically linked " #else "" #endif ; #if OPENSSL_VERSION_NUMBER >= 0x10100000 rd_kafka_dbg(rk, SECURITY, "OPENSSL", "Using %sOpenSSL version %s " "(0x%lx, librdkafka built with 0x%lx)", linking, OpenSSL_version(OPENSSL_VERSION), OpenSSL_version_num(), OPENSSL_VERSION_NUMBER); #else rd_kafka_dbg(rk, SECURITY, "OPENSSL", "librdkafka built with %sOpenSSL version 0x%lx", linking, OPENSSL_VERSION_NUMBER); #endif if (errstr_size > 0) errstr[0] = '\0'; #if 
OPENSSL_VERSION_NUMBER >= 0x30000000 if (rk->rk_conf.ssl.providers && !rd_kafka_ssl_ctx_load_providers(rk, rk->rk_conf.ssl.providers, errstr, errstr_size)) goto fail; #endif #if WITH_SSL_ENGINE if (rk->rk_conf.ssl.engine_location && !rk->rk_conf.ssl.engine) { rd_kafka_dbg(rk, SECURITY, "SSL", "Loading OpenSSL engine from \"%s\"", rk->rk_conf.ssl.engine_location); if (!rd_kafka_ssl_ctx_init_engine(rk, errstr, errstr_size)) goto fail; } #endif #if OPENSSL_VERSION_NUMBER >= 0x10100000 ctx = SSL_CTX_new(TLS_client_method()); #else ctx = SSL_CTX_new(SSLv23_client_method()); #endif if (!ctx) { rd_snprintf(errstr, errstr_size, "SSL_CTX_new() failed: "); goto fail; } #ifdef SSL_OP_NO_SSLv3 /* Disable SSLv3 (unsafe) */ SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3); #endif /* Key file password callback */ SSL_CTX_set_default_passwd_cb(ctx, rd_kafka_transport_ssl_passwd_cb); SSL_CTX_set_default_passwd_cb_userdata(ctx, rk); /* Ciphers */ if (rk->rk_conf.ssl.cipher_suites) { rd_kafka_dbg(rk, SECURITY, "SSL", "Setting cipher list: %s", rk->rk_conf.ssl.cipher_suites); if (!SSL_CTX_set_cipher_list(ctx, rk->rk_conf.ssl.cipher_suites)) { /* Set a string that will prefix the * the OpenSSL error message (which is lousy) * to make it more meaningful. */ rd_snprintf(errstr, errstr_size, "ssl.cipher.suites failed: "); goto fail; } } /* Set up broker certificate verification. */ SSL_CTX_set_verify(ctx, rk->rk_conf.ssl.enable_verify ? SSL_VERIFY_PEER : SSL_VERIFY_NONE, rk->rk_conf.ssl.cert_verify_cb ? 
rd_kafka_transport_ssl_cert_verify_cb : NULL); #if OPENSSL_VERSION_NUMBER >= 0x1000200fL && !defined(LIBRESSL_VERSION_NUMBER) /* Curves */ if (rk->rk_conf.ssl.curves_list) { rd_kafka_dbg(rk, SECURITY, "SSL", "Setting curves list: %s", rk->rk_conf.ssl.curves_list); if (!SSL_CTX_set1_curves_list(ctx, rk->rk_conf.ssl.curves_list)) { rd_snprintf(errstr, errstr_size, "ssl.curves.list failed: "); goto fail; } } /* Certificate signature algorithms */ if (rk->rk_conf.ssl.sigalgs_list) { rd_kafka_dbg(rk, SECURITY, "SSL", "Setting signature algorithms list: %s", rk->rk_conf.ssl.sigalgs_list); if (!SSL_CTX_set1_sigalgs_list(ctx, rk->rk_conf.ssl.sigalgs_list)) { rd_snprintf(errstr, errstr_size, "ssl.sigalgs.list failed: "); goto fail; } } #endif /* Register certificates, keys, etc. */ if (rd_kafka_ssl_set_certs(rk, ctx, errstr, errstr_size) == -1) goto fail; SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE); rk->rk_conf.ssl.ctx = ctx; return 0; fail: r = (int)strlen(errstr); /* If only the error preamble is provided in errstr and ending with * "....: ", then retrieve the last error from the OpenSSL error stack, * else treat the errstr as complete. */ if (r > 2 && !strcmp(&errstr[r - 2], ": ")) rd_kafka_ssl_error(rk, NULL, errstr + r, (int)errstr_size > r ? (int)errstr_size - r : 0); RD_IF_FREE(ctx, SSL_CTX_free); #if WITH_SSL_ENGINE RD_IF_FREE(rk->rk_conf.ssl.engine, ENGINE_free); #endif rd_list_destroy(&rk->rk_conf.ssl.loaded_providers); return -1; } #if OPENSSL_VERSION_NUMBER < 0x10100000L static RD_UNUSED void rd_kafka_transport_ssl_lock_cb(int mode, int i, const char *file, int line) { if (mode & CRYPTO_LOCK) mtx_lock(&rd_kafka_ssl_locks[i]); else mtx_unlock(&rd_kafka_ssl_locks[i]); } #endif static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb(void) { #ifdef _WIN32 /* Windows makes a distinction between thread handle * and thread id, which means we can't use the * thrd_current() API that returns the handle. 
*/ return (unsigned long)GetCurrentThreadId(); #else return (unsigned long)(intptr_t)thrd_current(); #endif } #ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK static void rd_kafka_transport_libcrypto_THREADID_callback(CRYPTO_THREADID *id) { unsigned long thread_id = rd_kafka_transport_ssl_threadid_cb(); CRYPTO_THREADID_set_numeric(id, thread_id); } #endif /** * @brief Global OpenSSL cleanup. */ void rd_kafka_ssl_term(void) { #if OPENSSL_VERSION_NUMBER < 0x10100000L int i; if (CRYPTO_get_locking_callback() == &rd_kafka_transport_ssl_lock_cb) { CRYPTO_set_locking_callback(NULL); #ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK CRYPTO_THREADID_set_callback(NULL); #else CRYPTO_set_id_callback(NULL); #endif for (i = 0; i < rd_kafka_ssl_locks_cnt; i++) mtx_destroy(&rd_kafka_ssl_locks[i]); rd_free(rd_kafka_ssl_locks); } #endif } /** * @brief Global (once per process) OpenSSL init. */ void rd_kafka_ssl_init(void) { #if OPENSSL_VERSION_NUMBER < 0x10100000L int i; if (!CRYPTO_get_locking_callback()) { rd_kafka_ssl_locks_cnt = CRYPTO_num_locks(); rd_kafka_ssl_locks = rd_malloc(rd_kafka_ssl_locks_cnt * sizeof(*rd_kafka_ssl_locks)); for (i = 0; i < rd_kafka_ssl_locks_cnt; i++) mtx_init(&rd_kafka_ssl_locks[i], mtx_plain); CRYPTO_set_locking_callback(rd_kafka_transport_ssl_lock_cb); #ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK CRYPTO_THREADID_set_callback( rd_kafka_transport_libcrypto_THREADID_callback); #else CRYPTO_set_id_callback(rd_kafka_transport_ssl_threadid_cb); #endif } /* OPENSSL_init_ssl(3) and OPENSSL_init_crypto(3) say: * "As of version 1.1.0 OpenSSL will automatically allocate * all resources that it needs so no explicit initialisation * is required. Similarly it will also automatically * deinitialise as required." */ SSL_load_error_strings(); SSL_library_init(); ERR_load_BIO_strings(); ERR_load_crypto_strings(); OpenSSL_add_all_algorithms(); #endif }