summaryrefslogtreecommitdiffstats
path: root/fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp')
-rw-r--r--fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp395
1 files changed, 395 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp b/fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp
new file mode 100644
index 000000000..a80dfea30
--- /dev/null
+++ b/fluent-bit/lib/librdkafka-2.1.0/examples/win_ssl_cert_store.cpp
@@ -0,0 +1,395 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2019, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/**
+ * Example of utilizing the Windows Certificate store with SSL.
+ */
+
+#include <iostream>
+#include <string>
+#include <cstdlib>
+#include <cstdio>
+#include <csignal>
+#include <cstring>
+#include <sstream>
+
+#include "../win32/wingetopt.h"
+#include <windows.h>
+#include <wincrypt.h>
+
+/*
+ * Typically include path in a real application would be
+ * #include <librdkafka/rdkafkacpp.h>
+ */
+#include "rdkafkacpp.h"
+
+
+
+class ExampleStoreRetriever {
+ public:
+ ExampleStoreRetriever(std::string const &subject, std::string const &pass) :
+ m_cert_subject(subject),
+ m_password(pass),
+ m_cert_store(NULL),
+ m_cert_ctx(NULL) {
+ load_certificate();
+ }
+
+ ~ExampleStoreRetriever() {
+ if (m_cert_ctx)
+ CertFreeCertificateContext(m_cert_ctx);
+
+ if (m_cert_store)
+ CertCloseStore(m_cert_store, 0);
+ }
+
+ /* @returns the public key in DER format */
+ const std::vector<unsigned char> get_public_key() {
+ std::vector<unsigned char> buf((size_t)m_cert_ctx->cbCertEncoded);
+ buf.assign((const char *)m_cert_ctx->pbCertEncoded,
+ (const char *)m_cert_ctx->pbCertEncoded +
+ (size_t)m_cert_ctx->cbCertEncoded);
+ return buf;
+ }
+
+ /* @returns the private key in PCKS#12 format */
+ const std::vector<unsigned char> get_private_key() {
+ ssize_t ret = 0;
+ /*
+ * In order to export the private key the certificate
+ * must first be marked as exportable.
+ *
+ * Steps to export the certificate
+ * 1) Create an in-memory cert store
+ * 2) Add the certificate to the store
+ * 3) Export the private key from the in-memory store
+ */
+
+ /* Create an in-memory cert store */
+ HCERTSTORE hMemStore =
+ CertOpenStore(CERT_STORE_PROV_MEMORY, 0, NULL, 0, NULL);
+ if (!hMemStore)
+ throw "Failed to create in-memory cert store: " +
+ GetErrorMsg(GetLastError());
+
+ /* Add certificate to store */
+ if (!CertAddCertificateContextToStore(hMemStore, m_cert_ctx,
+ CERT_STORE_ADD_USE_EXISTING, NULL))
+ throw "Failed to add certificate to store: " +
+ GetErrorMsg(GetLastError());
+
+ /*
+ * Export private key from cert
+ */
+ CRYPT_DATA_BLOB db = {NULL};
+
+ std::wstring w_password(m_password.begin(), m_password.end());
+
+ /* Acquire output size */
+ if (!PFXExportCertStoreEx(hMemStore, &db, w_password.c_str(), NULL,
+ EXPORT_PRIVATE_KEYS | REPORT_NO_PRIVATE_KEY |
+ REPORT_NOT_ABLE_TO_EXPORT_PRIVATE_KEY))
+ throw "Failed to export private key: " + GetErrorMsg(GetLastError());
+
+ std::vector<unsigned char> buf;
+
+ buf.resize(db.cbData);
+ db.pbData = &buf[0];
+
+ /* Extract key */
+ if (!PFXExportCertStoreEx(hMemStore, &db, w_password.c_str(), NULL,
+ EXPORT_PRIVATE_KEYS | REPORT_NO_PRIVATE_KEY |
+ REPORT_NOT_ABLE_TO_EXPORT_PRIVATE_KEY))
+ throw "Failed to export private key (PFX): " +
+ GetErrorMsg(GetLastError());
+
+ CertCloseStore(hMemStore, 0);
+
+ buf.resize(db.cbData);
+
+ return buf;
+ }
+
+ private:
+ void load_certificate() {
+ if (m_cert_ctx)
+ return;
+
+ m_cert_store = CertOpenStore(CERT_STORE_PROV_SYSTEM, 0, NULL,
+ CERT_SYSTEM_STORE_CURRENT_USER, L"My");
+ if (!m_cert_store)
+ throw "Failed to open cert store: " + GetErrorMsg(GetLastError());
+
+ m_cert_ctx = CertFindCertificateInStore(
+ m_cert_store, X509_ASN_ENCODING, 0, CERT_FIND_SUBJECT_STR,
+ /* should probally do a better std::string to std::wstring conversion */
+ std::wstring(m_cert_subject.begin(), m_cert_subject.end()).c_str(),
+ NULL);
+ if (!m_cert_ctx) {
+ CertCloseStore(m_cert_store, 0);
+ m_cert_store = NULL;
+ throw "Certificate " + m_cert_subject +
+ " not found in cert store: " + GetErrorMsg(GetLastError());
+ }
+ }
+
+ std::string GetErrorMsg(unsigned long error) {
+ char *message = NULL;
+ size_t ret = FormatMessageA(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM, nullptr,
+ error, 0, (char *)&message, 0, nullptr);
+ if (ret == 0) {
+ std::stringstream ss;
+
+ ss << std::string("could not format message for ") << error;
+ return ss.str();
+ } else {
+ std::string result(message, ret);
+ LocalFree(message);
+ return result;
+ }
+ }
+
+ private:
+ std::string m_cert_subject;
+ std::string m_password;
+ PCCERT_CONTEXT m_cert_ctx;
+ HCERTSTORE m_cert_store;
+};
+
+
+class PrintingSSLVerifyCb : public RdKafka::SslCertificateVerifyCb {
+ /* This SSL cert verification callback simply prints the certificates
+ * in the certificate chain.
+ * It provides no validation, everything is ok. */
+ public:
+ bool ssl_cert_verify_cb(const std::string &broker_name,
+ int32_t broker_id,
+ int *x509_error,
+ int depth,
+ const char *buf,
+ size_t size,
+ std::string &errstr) {
+ PCCERT_CONTEXT ctx = CertCreateCertificateContext(
+ X509_ASN_ENCODING | PKCS_7_ASN_ENCODING, (const uint8_t *)buf,
+ static_cast<unsigned long>(size));
+
+ if (!ctx)
+ std::cerr << "Failed to parse certificate" << std::endl;
+
+ char subject[256] = "n/a";
+ char issuer[256] = "n/a";
+
+ CertGetNameStringA(ctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE, 0, NULL, subject,
+ sizeof(subject));
+
+ CertGetNameStringA(ctx, CERT_NAME_FRIENDLY_DISPLAY_TYPE,
+ CERT_NAME_ISSUER_FLAG, NULL, issuer, sizeof(issuer));
+
+ std::cerr << "Broker " << broker_name << " (" << broker_id << "): "
+ << "certificate depth " << depth << ", X509 error " << *x509_error
+ << ", subject " << subject << ", issuer " << issuer << std::endl;
+
+ if (ctx)
+ CertFreeCertificateContext(ctx);
+
+ return true;
+ }
+};
+
+
+/**
+ * @brief Print the brokers in the cluster.
+ */
+static void print_brokers(RdKafka::Handle *handle,
+ const RdKafka::Metadata *md) {
+ std::cout << md->brokers()->size() << " broker(s) in cluster "
+ << handle->clusterid(0) << std::endl;
+
+ /* Iterate brokers */
+ RdKafka::Metadata::BrokerMetadataIterator ib;
+ for (ib = md->brokers()->begin(); ib != md->brokers()->end(); ++ib)
+ std::cout << " broker " << (*ib)->id() << " at " << (*ib)->host() << ":"
+ << (*ib)->port() << std::endl;
+}
+
+
+int main(int argc, char **argv) {
+ std::string brokers;
+ std::string errstr;
+ std::string cert_subject;
+ std::string priv_key_pass;
+
+ /*
+ * Create configuration objects
+ */
+ RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+ RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+
+ int opt;
+ while ((opt = getopt(argc, argv, "b:d:X:s:p:")) != -1) {
+ switch (opt) {
+ case 'b':
+ brokers = optarg;
+ break;
+ case 'd':
+ if (conf->set("debug", optarg, errstr) != RdKafka::Conf::CONF_OK) {
+ std::cerr << errstr << std::endl;
+ exit(1);
+ }
+ break;
+ case 'X': {
+ char *name, *val;
+
+ name = optarg;
+ if (!(val = strchr(name, '='))) {
+ std::cerr << "%% Expected -X property=value, not " << name << std::endl;
+ exit(1);
+ }
+
+ *val = '\0';
+ val++;
+
+ if (conf->set(name, val, errstr) != RdKafka::Conf::CONF_OK) {
+ std::cerr << errstr << std::endl;
+ exit(1);
+ }
+ } break;
+
+ case 's':
+ cert_subject = optarg;
+ break;
+
+ case 'p':
+ priv_key_pass = optarg;
+ if (conf->set("ssl.key.password", optarg, errstr) !=
+ RdKafka::Conf::CONF_OK) {
+ std::cerr << errstr << std::endl;
+ exit(1);
+ }
+
+ break;
+
+ default:
+ goto usage;
+ }
+ }
+
+ if (brokers.empty() || optind != argc) {
+ usage:
+ std::string features;
+ conf->get("builtin.features", features);
+ fprintf(stderr,
+ "Usage: %s [options] -b <brokers> -s <cert-subject> -p "
+ "<priv-key-password>\n"
+ "\n"
+ "Windows Certificate Store integration example.\n"
+ "Use certlm.msc or mmc to view your certificates.\n"
+ "\n"
+ "librdkafka version %s (0x%08x, builtin.features \"%s\")\n"
+ "\n"
+ " Options:\n"
+ " -b <brokers> Broker address\n"
+ " -s <cert> The subject name of the client's SSL "
+ "certificate to use\n"
+ " -p <pass> The private key password\n"
+ " -d [facs..] Enable debugging contexts: %s\n"
+ " -X <prop=name> Set arbitrary librdkafka "
+ "configuration property\n"
+ "\n",
+ argv[0], RdKafka::version_str().c_str(), RdKafka::version(),
+ features.c_str(), RdKafka::get_debug_contexts().c_str());
+ exit(1);
+ }
+
+ if (!cert_subject.empty()) {
+ try {
+ /* Load certificates from the Windows store */
+ ExampleStoreRetriever certStore(cert_subject, priv_key_pass);
+
+ std::vector<unsigned char> pubkey, privkey;
+
+ pubkey = certStore.get_public_key();
+ privkey = certStore.get_private_key();
+
+ if (conf->set_ssl_cert(RdKafka::CERT_PUBLIC_KEY, RdKafka::CERT_ENC_DER,
+ &pubkey[0], pubkey.size(),
+ errstr) != RdKafka::Conf::CONF_OK)
+ throw "Failed to set public key: " + errstr;
+
+ if (conf->set_ssl_cert(RdKafka::CERT_PRIVATE_KEY,
+ RdKafka::CERT_ENC_PKCS12, &privkey[0],
+ privkey.size(), errstr) != RdKafka::Conf::CONF_OK)
+ throw "Failed to set private key: " + errstr;
+
+ } catch (const std::string &ex) {
+ std::cerr << ex << std::endl;
+ exit(1);
+ }
+ }
+
+
+ /*
+ * Set configuration properties
+ */
+ conf->set("bootstrap.servers", brokers, errstr);
+
+ /* We use the Certificiate verification callback to print the
+ * certificate chains being used. */
+ PrintingSSLVerifyCb ssl_verify_cb;
+
+ if (conf->set("ssl_cert_verify_cb", &ssl_verify_cb, errstr) !=
+ RdKafka::Conf::CONF_OK) {
+ std::cerr << errstr << std::endl;
+ exit(1);
+ }
+
+ /* Create any type of client, producering being the cheapest. */
+ RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
+ if (!producer) {
+ std::cerr << "Failed to create producer: " << errstr << std::endl;
+ exit(1);
+ }
+
+ RdKafka::Metadata *metadata;
+
+ /* Fetch metadata */
+ RdKafka::ErrorCode err = producer->metadata(false, NULL, &metadata, 5000);
+ if (err != RdKafka::ERR_NO_ERROR) {
+ std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err)
+ << std::endl;
+ exit(1);
+ }
+
+ print_brokers(producer, metadata);
+
+ delete metadata;
+ delete producer;
+
+ return 0;
+}