diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/examples/openssl_engine_example.cpp')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/examples/openssl_engine_example.cpp | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/examples/openssl_engine_example.cpp b/fluent-bit/lib/librdkafka-2.1.0/examples/openssl_engine_example.cpp new file mode 100644 index 00000000..401857e6 --- /dev/null +++ b/fluent-bit/lib/librdkafka-2.1.0/examples/openssl_engine_example.cpp @@ -0,0 +1,249 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2021, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/** + * OpenSSL engine integration example. This example fetches metadata + * over SSL connection with broker, established using OpenSSL engine. + */ + +#include <iostream> +#include <string> +#include <cstdlib> +#include <cstdio> +#include <csignal> +#include <cstring> + +#ifdef _WIN32 +#include "../win32/wingetopt.h" +#elif _AIX +#include <unistd.h> +#else +#include <getopt.h> +#endif + +/* + * Typically include path in a real application would be + * #include <librdkafka/rdkafkacpp.h> + */ +#include "rdkafkacpp.h" + +static void metadata_print(const RdKafka::Metadata *metadata) { + std::cout << "Number of topics: " << metadata->topics()->size() << std::endl; + + /* Iterate topics */ + RdKafka::Metadata::TopicMetadataIterator it; + for (it = metadata->topics()->begin(); it != metadata->topics()->end(); ++it) + std::cout << " " << (*it)->topic() << " has " + << (*it)->partitions()->size() << " partitions." << std::endl; +} + + +class PrintingSSLVerifyCb : public RdKafka::SslCertificateVerifyCb { + /* This SSL cert verification callback simply prints the incoming + * parameters. It provides no validation, everything is ok. */ + public: + bool ssl_cert_verify_cb(const std::string &broker_name, + int32_t broker_id, + int *x509_error, + int depth, + const char *buf, + size_t size, + std::string &errstr) { + std::cout << "ssl_cert_verify_cb :" + << ": broker_name=" << broker_name << ", broker_id=" << broker_id + << ", x509_error=" << *x509_error << ", depth=" << depth + << ", buf size=" << size << std::endl; + + return true; + } +}; + + +int main(int argc, char **argv) { + std::string brokers; + std::string errstr; + std::string engine_path; + std::string ca_location; + + /* + * Create configuration objects + */ + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + std::string engine_id; + std::string engine_callback_data; + int opt; + + if (conf->set("security.protocol", "ssl", errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + while ((opt = getopt(argc, argv, "b:p:c:t:d:i:e:X:")) != -1) { + switch (opt) { + case 'b': + brokers = optarg; + break; + case 'p': + engine_path = optarg; + break; + case 'c': + ca_location = optarg; + break; + case 'i': + engine_id = optarg; + break; + case 'e': + engine_callback_data = optarg; + break; + case 'd': + if (conf->set("debug", optarg, errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + break; + case 'X': { + char *name, *val; + + name = optarg; + if (!(val = strchr(name, '='))) { + std::cerr << "%% Expected -X property=value, not " << name << std::endl; + exit(1); + } + + *val = '\0'; + val++; + + if (conf->set(name, val, errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + } break; + + default: + goto usage; + } + } + + if (brokers.empty() || engine_path.empty() || optind != argc) { + usage: + std::string features; + conf->get("builtin.features", features); + fprintf(stderr, + "Usage: %s [options] -b <brokers> -p <engine-path> \n" + "\n" + "OpenSSL engine integration example. This example fetches\n" + "metadata over SSL connection with broker, established using\n" + "OpenSSL engine.\n" + "\n" + "librdkafka version %s (0x%08x, builtin.features \"%s\")\n" + "\n" + " Options:\n" + " -b <brokers> Broker address\n" + " -p <engine-path> Path to OpenSSL engine\n" + " -i <engine-id> OpenSSL engine id\n" + " -e <engine-callback-data> OpenSSL engine callback_data\n" + " -c <ca-cert-location> File path to ca cert\n" + " -d [facs..] Enable debugging contexts: %s\n" + " -X <prop=name> Set arbitrary librdkafka configuration" + " property\n" + "\n", + argv[0], RdKafka::version_str().c_str(), RdKafka::version(), + features.c_str(), RdKafka::get_debug_contexts().c_str()); + exit(1); + } + + if (conf->set("bootstrap.servers", brokers, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + if (conf->set("ssl.engine.location", engine_path, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + if (ca_location.length() > 0 && conf->set("ssl.ca.location", ca_location, + errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + if (engine_id.length() > 0 && + conf->set("ssl.engine.id", engine_id, errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + /* engine_callback_data needs to be persistent + * and outlive the lifetime of the Kafka client handle. */ + if (engine_callback_data.length() > 0 && + conf->set_engine_callback_data((void *)engine_callback_data.c_str(), + errstr) != RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + /* We use the Certificiate verification callback to print the + * certificate name being used. */ + PrintingSSLVerifyCb ssl_verify_cb; + + if (conf->set("ssl_cert_verify_cb", &ssl_verify_cb, errstr) != + RdKafka::Conf::CONF_OK) { + std::cerr << errstr << std::endl; + exit(1); + } + + /* + * Create producer using accumulated global configuration. + */ + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + if (!producer) { + std::cerr << "Failed to create producer: " << errstr << std::endl; + exit(1); + } + + std::cout << "% Created producer " << producer->name() << std::endl; + + class RdKafka::Metadata *metadata; + + /* Fetch metadata */ + RdKafka::ErrorCode err = producer->metadata(true, NULL, &metadata, 5000); + if (err != RdKafka::ERR_NO_ERROR) + std::cerr << "%% Failed to acquire metadata: " << RdKafka::err2str(err) + << std::endl; + + metadata_print(metadata); + + delete metadata; + delete producer; + delete conf; + + return 0; +} |