diff options
Diffstat (limited to 'fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h')
-rw-r--r-- | fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h | 1628 |
1 files changed, 0 insertions, 1628 deletions
diff --git a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h b/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h deleted file mode 100644 index bc024ebe9..000000000 --- a/fluent-bit/lib/librdkafka-2.1.0/src-cpp/rdkafkacpp_int.h +++ /dev/null @@ -1,1628 +0,0 @@ -/* - * librdkafka - Apache Kafka C/C++ library - * - * Copyright (c) 2014 Magnus Edenhill - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef _RDKAFKACPP_INT_H_ -#define _RDKAFKACPP_INT_H_ - -#include <string> -#include <iostream> -#include <cstring> -#include <stdlib.h> - -#include "rdkafkacpp.h" - -extern "C" { -#include "../src/rdkafka.h" -} - -#ifdef _WIN32 -/* Visual Studio */ -#include "../src/win32_config.h" -#else -/* POSIX / UNIX based systems */ -#include "../config.h" /* mklove output */ -#endif - -#ifdef _MSC_VER -typedef int mode_t; -#pragma warning(disable : 4250) -#endif - - -namespace RdKafka { - -void consume_cb_trampoline(rd_kafka_message_t *msg, void *opaque); -void log_cb_trampoline(const rd_kafka_t *rk, - int level, - const char *fac, - const char *buf); -void error_cb_trampoline(rd_kafka_t *rk, - int err, - const char *reason, - void *opaque); -void throttle_cb_trampoline(rd_kafka_t *rk, - const char *broker_name, - int32_t broker_id, - int throttle_time_ms, - void *opaque); -int stats_cb_trampoline(rd_kafka_t *rk, - char *json, - size_t json_len, - void *opaque); -int socket_cb_trampoline(int domain, int type, int protocol, void *opaque); -int open_cb_trampoline(const char *pathname, - int flags, - mode_t mode, - void *opaque); -void rebalance_cb_trampoline(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *c_partitions, - void *opaque); -void offset_commit_cb_trampoline0(rd_kafka_t *rk, - rd_kafka_resp_err_t err, - rd_kafka_topic_partition_list_t *c_offsets, - void *opaque); -void oauthbearer_token_refresh_cb_trampoline(rd_kafka_t *rk, - const char *oauthbearer_config, - void *opaque); - -int ssl_cert_verify_cb_trampoline(rd_kafka_t *rk, - const char *broker_name, - int32_t broker_id, - int *x509_error, - int depth, - const char *buf, - size_t size, - char *errstr, - size_t errstr_size, - void *opaque); - -rd_kafka_topic_partition_list_t *partitions_to_c_parts( - const std::vector<TopicPartition *> &partitions); - -/** - * @brief Update the application provided 'partitions' with info from 'c_parts' - */ -void update_partitions_from_c_parts( - std::vector<TopicPartition *> &partitions, - const rd_kafka_topic_partition_list_t *c_parts); - - -class ErrorImpl : public Error { - public: - ~ErrorImpl() { - rd_kafka_error_destroy(c_error_); - } - - ErrorImpl(ErrorCode code, const std::string *errstr) { - c_error_ = rd_kafka_error_new(static_cast<rd_kafka_resp_err_t>(code), - errstr ? "%s" : NULL, - errstr ? errstr->c_str() : NULL); - } - - ErrorImpl(rd_kafka_error_t *c_error) : c_error_(c_error) { - } - - static Error *create(ErrorCode code, const std::string *errstr) { - return new ErrorImpl(code, errstr); - } - - ErrorCode code() const { - return static_cast<ErrorCode>(rd_kafka_error_code(c_error_)); - } - - std::string name() const { - return std::string(rd_kafka_error_name(c_error_)); - } - - std::string str() const { - return std::string(rd_kafka_error_string(c_error_)); - } - - bool is_fatal() const { - return !!rd_kafka_error_is_fatal(c_error_); - } - - bool is_retriable() const { - return !!rd_kafka_error_is_retriable(c_error_); - } - - bool txn_requires_abort() const { - return !!rd_kafka_error_txn_requires_abort(c_error_); - } - - rd_kafka_error_t *c_error_; -}; - - -class EventImpl : public Event { - public: - ~EventImpl() { - } - - EventImpl(Type type, - ErrorCode err, - Severity severity, - const char *fac, - const char *str) : - type_(type), - err_(err), - severity_(severity), - fac_(fac ? fac : ""), - str_(str), - id_(0), - throttle_time_(0), - fatal_(false) { - } - - EventImpl(Type type) : - type_(type), - err_(ERR_NO_ERROR), - severity_(EVENT_SEVERITY_EMERG), - fac_(""), - str_(""), - id_(0), - throttle_time_(0), - fatal_(false) { - } - - Type type() const { - return type_; - } - ErrorCode err() const { - return err_; - } - Severity severity() const { - return severity_; - } - std::string fac() const { - return fac_; - } - std::string str() const { - return str_; - } - std::string broker_name() const { - if (type_ == EVENT_THROTTLE) - return str_; - else - return std::string(""); - } - int broker_id() const { - return id_; - } - int throttle_time() const { - return throttle_time_; - } - - bool fatal() const { - return fatal_; - } - - Type type_; - ErrorCode err_; - Severity severity_; - std::string fac_; - std::string str_; /* reused for THROTTLE broker_name */ - int id_; - int throttle_time_; - bool fatal_; -}; - -class QueueImpl : virtual public Queue { - public: - QueueImpl(rd_kafka_queue_t *c_rkqu) : queue_(c_rkqu) { - } - ~QueueImpl() { - rd_kafka_queue_destroy(queue_); - } - static Queue *create(Handle *base); - ErrorCode forward(Queue *queue); - Message *consume(int timeout_ms); - int poll(int timeout_ms); - void io_event_enable(int fd, const void *payload, size_t size); - - rd_kafka_queue_t *queue_; -}; - - - -class HeadersImpl : public Headers { - public: - HeadersImpl() : headers_(rd_kafka_headers_new(8)) { - } - - HeadersImpl(rd_kafka_headers_t *headers) : headers_(headers) { - } - - HeadersImpl(const std::vector<Header> &headers) { - if (headers.size() > 0) { - headers_ = rd_kafka_headers_new(headers.size()); - from_vector(headers); - } else { - headers_ = rd_kafka_headers_new(8); - } - } - - ~HeadersImpl() { - if (headers_) { - rd_kafka_headers_destroy(headers_); - } - } - - ErrorCode add(const std::string &key, const char *value) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, -1); - return static_cast<RdKafka::ErrorCode>(err); - } - - ErrorCode add(const std::string &key, const void *value, size_t value_size) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value, - value_size); - return static_cast<RdKafka::ErrorCode>(err); - } - - ErrorCode add(const std::string &key, const std::string &value) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_add(headers_, key.c_str(), key.size(), value.c_str(), - value.size()); - return static_cast<RdKafka::ErrorCode>(err); - } - - ErrorCode add(const Header &header) { - rd_kafka_resp_err_t err; - err = - rd_kafka_header_add(headers_, header.key().c_str(), header.key().size(), - header.value(), header.value_size()); - return static_cast<RdKafka::ErrorCode>(err); - } - - ErrorCode remove(const std::string &key) { - rd_kafka_resp_err_t err; - err = rd_kafka_header_remove(headers_, key.c_str()); - return static_cast<RdKafka::ErrorCode>(err); - } - - std::vector<Headers::Header> get(const std::string &key) const { - std::vector<Headers::Header> headers; - const void *value; - size_t size; - rd_kafka_resp_err_t err; - for (size_t idx = 0; !(err = rd_kafka_header_get(headers_, idx, key.c_str(), - &value, &size)); - idx++) { - headers.push_back(Headers::Header(key, value, size)); - } - return headers; - } - - Headers::Header get_last(const std::string &key) const { - const void *value; - size_t size; - rd_kafka_resp_err_t err; - err = rd_kafka_header_get_last(headers_, key.c_str(), &value, &size); - return Headers::Header(key, value, size, - static_cast<RdKafka::ErrorCode>(err)); - } - - std::vector<Headers::Header> get_all() const { - std::vector<Headers::Header> headers; - size_t idx = 0; - const char *name; - const void *valuep; - size_t size; - while (!rd_kafka_header_get_all(headers_, idx++, &name, &valuep, &size)) { - headers.push_back(Headers::Header(name, valuep, size)); - } - return headers; - } - - size_t size() const { - return rd_kafka_header_cnt(headers_); - } - - /** @brief Reset the C headers pointer to NULL. */ - void c_headers_destroyed() { - headers_ = NULL; - } - - /** @returns the underlying C headers, or NULL. */ - rd_kafka_headers_t *c_ptr() { - return headers_; - } - - - private: - void from_vector(const std::vector<Header> &headers) { - if (headers.size() == 0) - return; - for (std::vector<Header>::const_iterator it = headers.begin(); - it != headers.end(); it++) - this->add(*it); - } - - HeadersImpl(HeadersImpl const &) /*= delete*/; - HeadersImpl &operator=(HeadersImpl const &) /*= delete*/; - - rd_kafka_headers_t *headers_; -}; - - - -class MessageImpl : public Message { - public: - ~MessageImpl() { - if (free_rkmessage_) - rd_kafka_message_destroy(const_cast<rd_kafka_message_t *>(rkmessage_)); - if (key_) - delete key_; - if (headers_) - delete headers_; - } - - MessageImpl(rd_kafka_type_t rk_type, - RdKafka::Topic *topic, - rd_kafka_message_t *rkmessage) : - topic_(topic), - rkmessage_(rkmessage), - free_rkmessage_(true), - key_(NULL), - headers_(NULL), - rk_type_(rk_type) { - } - - MessageImpl(rd_kafka_type_t rk_type, - RdKafka::Topic *topic, - rd_kafka_message_t *rkmessage, - bool dofree) : - topic_(topic), - rkmessage_(rkmessage), - free_rkmessage_(dofree), - key_(NULL), - headers_(NULL), - rk_type_(rk_type) { - } - - MessageImpl(rd_kafka_type_t rk_type, rd_kafka_message_t *rkmessage) : - topic_(NULL), - rkmessage_(rkmessage), - free_rkmessage_(true), - key_(NULL), - headers_(NULL), - rk_type_(rk_type) { - if (rkmessage->rkt) { - /* Possibly NULL */ - topic_ = static_cast<Topic *>(rd_kafka_topic_opaque(rkmessage->rkt)); - } - } - - /* Create errored message */ - MessageImpl(rd_kafka_type_t rk_type, - RdKafka::Topic *topic, - RdKafka::ErrorCode err) : - topic_(topic), - free_rkmessage_(false), - key_(NULL), - headers_(NULL), - rk_type_(rk_type) { - rkmessage_ = &rkmessage_err_; - memset(&rkmessage_err_, 0, sizeof(rkmessage_err_)); - rkmessage_err_.err = static_cast<rd_kafka_resp_err_t>(err); - } - - std::string errstr() const { - const char *es; - /* message_errstr() is only available for the consumer. */ - if (rk_type_ == RD_KAFKA_CONSUMER) - es = rd_kafka_message_errstr(rkmessage_); - else - es = rd_kafka_err2str(rkmessage_->err); - - return std::string(es ? es : ""); - } - - ErrorCode err() const { - return static_cast<RdKafka::ErrorCode>(rkmessage_->err); - } - - Topic *topic() const { - return topic_; - } - std::string topic_name() const { - if (rkmessage_->rkt) - return rd_kafka_topic_name(rkmessage_->rkt); - else - return ""; - } - int32_t partition() const { - return rkmessage_->partition; - } - void *payload() const { - return rkmessage_->payload; - } - size_t len() const { - return rkmessage_->len; - } - const std::string *key() const { - if (key_) { - return key_; - } else if (rkmessage_->key) { - key_ = new std::string(static_cast<char const *>(rkmessage_->key), - rkmessage_->key_len); - return key_; - } - return NULL; - } - const void *key_pointer() const { - return rkmessage_->key; - } - size_t key_len() const { - return rkmessage_->key_len; - } - - int64_t offset() const { - return rkmessage_->offset; - } - - MessageTimestamp timestamp() const { - MessageTimestamp ts; - rd_kafka_timestamp_type_t tstype; - ts.timestamp = rd_kafka_message_timestamp(rkmessage_, &tstype); - ts.type = static_cast<MessageTimestamp::MessageTimestampType>(tstype); - return ts; - } - - void *msg_opaque() const { - return rkmessage_->_private; - } - - int64_t latency() const { - return rd_kafka_message_latency(rkmessage_); - } - - struct rd_kafka_message_s *c_ptr() { - return rkmessage_; - } - - Status status() const { - return static_cast<Status>(rd_kafka_message_status(rkmessage_)); - } - - Headers *headers() { - ErrorCode err; - return headers(&err); - } - - Headers *headers(ErrorCode *err) { - *err = ERR_NO_ERROR; - - if (!headers_) { - rd_kafka_headers_t *c_hdrs; - rd_kafka_resp_err_t c_err; - - if ((c_err = rd_kafka_message_detach_headers(rkmessage_, &c_hdrs))) { - *err = static_cast<RdKafka::ErrorCode>(c_err); - return NULL; - } - - headers_ = new HeadersImpl(c_hdrs); - } - - return headers_; - } - - int32_t broker_id() const { - return rd_kafka_message_broker_id(rkmessage_); - } - - int32_t leader_epoch() const { - return rd_kafka_message_leader_epoch(rkmessage_); - } - - - Error *offset_store() { - rd_kafka_error_t *c_error; - - c_error = rd_kafka_offset_store_message(rkmessage_); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - RdKafka::Topic *topic_; - rd_kafka_message_t *rkmessage_; - bool free_rkmessage_; - /* For error signalling by the C++ layer the .._err_ message is - * used as a place holder and rkmessage_ is set to point to it. */ - rd_kafka_message_t rkmessage_err_; - mutable std::string *key_; /* mutable because it's a cached value */ - - private: - /* "delete" copy ctor + copy assignment, for safety of key_ */ - MessageImpl(MessageImpl const &) /*= delete*/; - MessageImpl &operator=(MessageImpl const &) /*= delete*/; - - RdKafka::Headers *headers_; - const rd_kafka_type_t rk_type_; /**< Client type */ -}; - - -class ConfImpl : public Conf { - public: - ConfImpl(ConfType conf_type) : - consume_cb_(NULL), - dr_cb_(NULL), - event_cb_(NULL), - socket_cb_(NULL), - open_cb_(NULL), - partitioner_cb_(NULL), - partitioner_kp_cb_(NULL), - rebalance_cb_(NULL), - offset_commit_cb_(NULL), - oauthbearer_token_refresh_cb_(NULL), - ssl_cert_verify_cb_(NULL), - conf_type_(conf_type), - rk_conf_(NULL), - rkt_conf_(NULL) { - } - ~ConfImpl() { - if (rk_conf_) - rd_kafka_conf_destroy(rk_conf_); - else if (rkt_conf_) - rd_kafka_topic_conf_destroy(rkt_conf_); - } - - Conf::ConfResult set(const std::string &name, - const std::string &value, - std::string &errstr); - - Conf::ConfResult set(const std::string &name, - DeliveryReportCb *dr_cb, - std::string &errstr) { - if (name != "dr_cb") { - errstr = "Invalid value type, expected RdKafka::DeliveryReportCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - dr_cb_ = dr_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb, - std::string &errstr) { - if (name != "oauthbearer_token_refresh_cb") { - errstr = - "Invalid value type, expected RdKafka::OAuthBearerTokenRefreshCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - oauthbearer_token_refresh_cb_ = oauthbearer_token_refresh_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - EventCb *event_cb, - std::string &errstr) { - if (name != "event_cb") { - errstr = "Invalid value type, expected RdKafka::EventCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - event_cb_ = event_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - const Conf *topic_conf, - std::string &errstr) { - const ConfImpl *tconf_impl = - dynamic_cast<const RdKafka::ConfImpl *>(topic_conf); - if (name != "default_topic_conf" || !tconf_impl->rkt_conf_) { - errstr = "Invalid value type, expected RdKafka::Conf"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - rd_kafka_conf_set_default_topic_conf( - rk_conf_, rd_kafka_topic_conf_dup(tconf_impl->rkt_conf_)); - - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - PartitionerCb *partitioner_cb, - std::string &errstr) { - if (name != "partitioner_cb") { - errstr = "Invalid value type, expected RdKafka::PartitionerCb"; - return Conf::CONF_INVALID; - } - - if (!rkt_conf_) { - errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; - return Conf::CONF_INVALID; - } - - partitioner_cb_ = partitioner_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - PartitionerKeyPointerCb *partitioner_kp_cb, - std::string &errstr) { - if (name != "partitioner_key_pointer_cb") { - errstr = "Invalid value type, expected RdKafka::PartitionerKeyPointerCb"; - return Conf::CONF_INVALID; - } - - if (!rkt_conf_) { - errstr = "Requires RdKafka::Conf::CONF_TOPIC object"; - return Conf::CONF_INVALID; - } - - partitioner_kp_cb_ = partitioner_kp_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set(const std::string &name, - SocketCb *socket_cb, - std::string &errstr) { - if (name != "socket_cb") { - errstr = "Invalid value type, expected RdKafka::SocketCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - socket_cb_ = socket_cb; - return Conf::CONF_OK; - } - - - Conf::ConfResult set(const std::string &name, - OpenCb *open_cb, - std::string &errstr) { - if (name != "open_cb") { - errstr = "Invalid value type, expected RdKafka::OpenCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - open_cb_ = open_cb; - return Conf::CONF_OK; - } - - - - Conf::ConfResult set(const std::string &name, - RebalanceCb *rebalance_cb, - std::string &errstr) { - if (name != "rebalance_cb") { - errstr = "Invalid value type, expected RdKafka::RebalanceCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - rebalance_cb_ = rebalance_cb; - return Conf::CONF_OK; - } - - - Conf::ConfResult set(const std::string &name, - OffsetCommitCb *offset_commit_cb, - std::string &errstr) { - if (name != "offset_commit_cb") { - errstr = "Invalid value type, expected RdKafka::OffsetCommitCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - offset_commit_cb_ = offset_commit_cb; - return Conf::CONF_OK; - } - - - Conf::ConfResult set(const std::string &name, - SslCertificateVerifyCb *ssl_cert_verify_cb, - std::string &errstr) { - if (name != "ssl_cert_verify_cb") { - errstr = "Invalid value type, expected RdKafka::SslCertificateVerifyCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - ssl_cert_verify_cb_ = ssl_cert_verify_cb; - return Conf::CONF_OK; - } - - Conf::ConfResult set_engine_callback_data(void *value, std::string &errstr) { - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - rd_kafka_conf_set_engine_callback_data(rk_conf_, value); - return Conf::CONF_OK; - } - - - Conf::ConfResult set_ssl_cert(RdKafka::CertificateType cert_type, - RdKafka::CertificateEncoding cert_enc, - const void *buffer, - size_t size, - std::string &errstr) { - rd_kafka_conf_res_t res; - char errbuf[512]; - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - res = rd_kafka_conf_set_ssl_cert( - rk_conf_, static_cast<rd_kafka_cert_type_t>(cert_type), - static_cast<rd_kafka_cert_enc_t>(cert_enc), buffer, size, errbuf, - sizeof(errbuf)); - - if (res != RD_KAFKA_CONF_OK) - errstr = errbuf; - - return static_cast<Conf::ConfResult>(res); - } - - Conf::ConfResult enable_sasl_queue(bool enable, std::string &errstr) { - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - rd_kafka_conf_enable_sasl_queue(rk_conf_, enable ? 1 : 0); - - return Conf::CONF_OK; - } - - - Conf::ConfResult get(const std::string &name, std::string &value) const { - if (name.compare("dr_cb") == 0 || name.compare("event_cb") == 0 || - name.compare("partitioner_cb") == 0 || - name.compare("partitioner_key_pointer_cb") == 0 || - name.compare("socket_cb") == 0 || name.compare("open_cb") == 0 || - name.compare("rebalance_cb") == 0 || - name.compare("offset_commit_cb") == 0 || - name.compare("oauthbearer_token_refresh_cb") == 0 || - name.compare("ssl_cert_verify_cb") == 0 || - name.compare("set_engine_callback_data") == 0 || - name.compare("enable_sasl_queue") == 0) { - return Conf::CONF_INVALID; - } - rd_kafka_conf_res_t res = RD_KAFKA_CONF_INVALID; - - /* Get size of property */ - size_t size; - if (rk_conf_) - res = rd_kafka_conf_get(rk_conf_, name.c_str(), NULL, &size); - else if (rkt_conf_) - res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), NULL, &size); - if (res != RD_KAFKA_CONF_OK) - return static_cast<Conf::ConfResult>(res); - - char *tmpValue = new char[size]; - - if (rk_conf_) - res = rd_kafka_conf_get(rk_conf_, name.c_str(), tmpValue, &size); - else if (rkt_conf_) - res = rd_kafka_topic_conf_get(rkt_conf_, name.c_str(), tmpValue, &size); - - if (res == RD_KAFKA_CONF_OK) - value.assign(tmpValue); - delete[] tmpValue; - - return static_cast<Conf::ConfResult>(res); - } - - Conf::ConfResult get(DeliveryReportCb *&dr_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - dr_cb = this->dr_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get( - OAuthBearerTokenRefreshCb *&oauthbearer_token_refresh_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - oauthbearer_token_refresh_cb = this->oauthbearer_token_refresh_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(EventCb *&event_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - event_cb = this->event_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(PartitionerCb *&partitioner_cb) const { - if (!rkt_conf_) - return Conf::CONF_INVALID; - partitioner_cb = this->partitioner_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(PartitionerKeyPointerCb *&partitioner_kp_cb) const { - if (!rkt_conf_) - return Conf::CONF_INVALID; - partitioner_kp_cb = this->partitioner_kp_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(SocketCb *&socket_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - socket_cb = this->socket_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(OpenCb *&open_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - open_cb = this->open_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(RebalanceCb *&rebalance_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - rebalance_cb = this->rebalance_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(OffsetCommitCb *&offset_commit_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - offset_commit_cb = this->offset_commit_cb_; - return Conf::CONF_OK; - } - - Conf::ConfResult get(SslCertificateVerifyCb *&ssl_cert_verify_cb) const { - if (!rk_conf_) - return Conf::CONF_INVALID; - ssl_cert_verify_cb = this->ssl_cert_verify_cb_; - return Conf::CONF_OK; - } - - std::list<std::string> *dump(); - - - Conf::ConfResult set(const std::string &name, - ConsumeCb *consume_cb, - std::string &errstr) { - if (name != "consume_cb") { - errstr = "Invalid value type, expected RdKafka::ConsumeCb"; - return Conf::CONF_INVALID; - } - - if (!rk_conf_) { - errstr = "Requires RdKafka::Conf::CONF_GLOBAL object"; - return Conf::CONF_INVALID; - } - - consume_cb_ = consume_cb; - return Conf::CONF_OK; - } - - struct rd_kafka_conf_s *c_ptr_global() { - if (conf_type_ == CONF_GLOBAL) - return rk_conf_; - else - return NULL; - } - - struct rd_kafka_topic_conf_s *c_ptr_topic() { - if (conf_type_ == CONF_TOPIC) - return rkt_conf_; - else - return NULL; - } - - ConsumeCb *consume_cb_; - DeliveryReportCb *dr_cb_; - EventCb *event_cb_; - SocketCb *socket_cb_; - OpenCb *open_cb_; - PartitionerCb *partitioner_cb_; - PartitionerKeyPointerCb *partitioner_kp_cb_; - RebalanceCb *rebalance_cb_; - OffsetCommitCb *offset_commit_cb_; - OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; - SslCertificateVerifyCb *ssl_cert_verify_cb_; - ConfType conf_type_; - rd_kafka_conf_t *rk_conf_; - rd_kafka_topic_conf_t *rkt_conf_; -}; - - -class HandleImpl : virtual public Handle { - public: - ~HandleImpl() { - } - HandleImpl() { - } - std::string name() const { - return std::string(rd_kafka_name(rk_)); - } - std::string memberid() const { - char *str = rd_kafka_memberid(rk_); - std::string memberid = str ? str : ""; - if (str) - rd_kafka_mem_free(rk_, str); - return memberid; - } - int poll(int timeout_ms) { - return rd_kafka_poll(rk_, timeout_ms); - } - int outq_len() { - return rd_kafka_outq_len(rk_); - } - - void set_common_config(const RdKafka::ConfImpl *confimpl); - - RdKafka::ErrorCode metadata(bool all_topics, - const Topic *only_rkt, - Metadata **metadatap, - int timeout_ms); - - ErrorCode pause(std::vector<TopicPartition *> &partitions); - ErrorCode resume(std::vector<TopicPartition *> &partitions); - - ErrorCode query_watermark_offsets(const std::string &topic, - int32_t partition, - int64_t *low, - int64_t *high, - int timeout_ms) { - return static_cast<RdKafka::ErrorCode>(rd_kafka_query_watermark_offsets( - rk_, topic.c_str(), partition, low, high, timeout_ms)); - } - - ErrorCode get_watermark_offsets(const std::string &topic, - int32_t partition, - int64_t *low, - int64_t *high) { - return static_cast<RdKafka::ErrorCode>(rd_kafka_get_watermark_offsets( - rk_, topic.c_str(), partition, low, high)); - } - - Queue *get_partition_queue(const TopicPartition *partition); - - Queue *get_sasl_queue() { - rd_kafka_queue_t *rkqu; - rkqu = rd_kafka_queue_get_sasl(rk_); - - if (rkqu == NULL) - return NULL; - - return new QueueImpl(rkqu); - } - - Queue *get_background_queue() { - rd_kafka_queue_t *rkqu; - rkqu = rd_kafka_queue_get_background(rk_); - - if (rkqu == NULL) - return NULL; - - return new QueueImpl(rkqu); - } - - - ErrorCode offsetsForTimes(std::vector<TopicPartition *> &offsets, - int timeout_ms) { - rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); - ErrorCode err = static_cast<ErrorCode>( - rd_kafka_offsets_for_times(rk_, c_offsets, timeout_ms)); - update_partitions_from_c_parts(offsets, c_offsets); - rd_kafka_topic_partition_list_destroy(c_offsets); - return err; - } - - ErrorCode set_log_queue(Queue *queue); - - void yield() { - rd_kafka_yield(rk_); - } - - std::string clusterid(int timeout_ms) { - char *str = rd_kafka_clusterid(rk_, timeout_ms); - std::string clusterid = str ? str : ""; - if (str) - rd_kafka_mem_free(rk_, str); - return clusterid; - } - - struct rd_kafka_s *c_ptr() { - return rk_; - } - - int32_t controllerid(int timeout_ms) { - return rd_kafka_controllerid(rk_, timeout_ms); - } - - ErrorCode fatal_error(std::string &errstr) const { - char errbuf[512]; - RdKafka::ErrorCode err = static_cast<RdKafka::ErrorCode>( - rd_kafka_fatal_error(rk_, errbuf, sizeof(errbuf))); - if (err) - errstr = errbuf; - return err; - } - - ErrorCode oauthbearer_set_token(const std::string &token_value, - int64_t md_lifetime_ms, - const std::string &md_principal_name, - const std::list<std::string> &extensions, - std::string &errstr) { - char errbuf[512]; - ErrorCode err; - const char **extensions_copy = new const char *[extensions.size()]; - int elem = 0; - - for (std::list<std::string>::const_iterator it = extensions.begin(); - it != extensions.end(); it++) - extensions_copy[elem++] = it->c_str(); - err = static_cast<ErrorCode>(rd_kafka_oauthbearer_set_token( - rk_, token_value.c_str(), md_lifetime_ms, md_principal_name.c_str(), - extensions_copy, extensions.size(), errbuf, sizeof(errbuf))); - delete[] extensions_copy; - - if (err != ERR_NO_ERROR) - errstr = errbuf; - - return err; - } - - ErrorCode oauthbearer_set_token_failure(const std::string &errstr) { - return static_cast<ErrorCode>( - rd_kafka_oauthbearer_set_token_failure(rk_, errstr.c_str())); - } - - Error *sasl_background_callbacks_enable() { - rd_kafka_error_t *c_error = rd_kafka_sasl_background_callbacks_enable(rk_); - - if (c_error) - return new ErrorImpl(c_error); - - return NULL; - } - - Error *sasl_set_credentials(const std::string &username, - const std::string &password) { - rd_kafka_error_t *c_error = - rd_kafka_sasl_set_credentials(rk_, username.c_str(), password.c_str()); - - if (c_error) - return new ErrorImpl(c_error); - - return NULL; - }; - - void *mem_malloc(size_t size) { - return rd_kafka_mem_malloc(rk_, size); - } - - void mem_free(void *ptr) { - rd_kafka_mem_free(rk_, ptr); - } - - rd_kafka_t *rk_; - /* All Producer and Consumer callbacks must reside in HandleImpl and - * the opaque provided to rdkafka must be a pointer to HandleImpl, since - * ProducerImpl and ConsumerImpl classes cannot be safely directly cast to - * HandleImpl due to the skewed diamond inheritance. */ - ConsumeCb *consume_cb_; - EventCb *event_cb_; - SocketCb *socket_cb_; - OpenCb *open_cb_; - DeliveryReportCb *dr_cb_; - PartitionerCb *partitioner_cb_; - PartitionerKeyPointerCb *partitioner_kp_cb_; - RebalanceCb *rebalance_cb_; - OffsetCommitCb *offset_commit_cb_; - OAuthBearerTokenRefreshCb *oauthbearer_token_refresh_cb_; - SslCertificateVerifyCb *ssl_cert_verify_cb_; -}; - - -class TopicImpl : public Topic { - public: - ~TopicImpl() { - rd_kafka_topic_destroy(rkt_); - } - - std::string name() const { - return rd_kafka_topic_name(rkt_); - } - - bool partition_available(int32_t partition) const { - return !!rd_kafka_topic_partition_available(rkt_, partition); - } - - ErrorCode offset_store(int32_t partition, int64_t offset) { - return static_cast<RdKafka::ErrorCode>( - rd_kafka_offset_store(rkt_, partition, offset)); - } - - static Topic *create(Handle &base, const std::string &topic, Conf *conf); - - struct rd_kafka_topic_s *c_ptr() { - return rkt_; - } - - rd_kafka_topic_t *rkt_; - PartitionerCb *partitioner_cb_; - PartitionerKeyPointerCb *partitioner_kp_cb_; -}; - - -/** - * Topic and Partition - */ -class TopicPartitionImpl : public TopicPartition { - public: - ~TopicPartitionImpl() { - } - - static TopicPartition *create(const std::string &topic, int partition); - - TopicPartitionImpl(const std::string &topic, int partition) : - topic_(topic), - partition_(partition), - offset_(RdKafka::Topic::OFFSET_INVALID), - err_(ERR_NO_ERROR), - leader_epoch_(-1) { - } - - TopicPartitionImpl(const std::string &topic, int partition, int64_t offset) : - topic_(topic), - partition_(partition), - offset_(offset), - err_(ERR_NO_ERROR), - leader_epoch_(-1) { - } - - TopicPartitionImpl(const rd_kafka_topic_partition_t *c_part) { - topic_ = std::string(c_part->topic); - partition_ = c_part->partition; - offset_ = c_part->offset; - err_ = static_cast<ErrorCode>(c_part->err); - leader_epoch_ = rd_kafka_topic_partition_get_leader_epoch(c_part); - // FIXME: metadata - } - - static void destroy(std::vector<TopicPartition *> &partitions); - - int partition() const { - return partition_; - } - const std::string &topic() const { - return topic_; - } - - int64_t offset() const { - return offset_; - } - - ErrorCode err() const { - return err_; - } - - void set_offset(int64_t offset) { - offset_ = offset; - } - - int32_t get_leader_epoch() { - return leader_epoch_; - } - - void set_leader_epoch(int32_t leader_epoch) { - leader_epoch_ = leader_epoch_; - } - - std::ostream &operator<<(std::ostream &ostrm) const { - return ostrm << topic_ << " [" << partition_ << "]"; - } - - std::string topic_; - int partition_; - int64_t offset_; - ErrorCode err_; - int32_t leader_epoch_; -}; - - -/** - * @class ConsumerGroupMetadata wraps the - * C rd_kafka_consumer_group_metadata_t object. - */ -class ConsumerGroupMetadataImpl : public ConsumerGroupMetadata { - public: - ~ConsumerGroupMetadataImpl() { - rd_kafka_consumer_group_metadata_destroy(cgmetadata_); - } - - ConsumerGroupMetadataImpl(rd_kafka_consumer_group_metadata_t *cgmetadata) : - cgmetadata_(cgmetadata) { - } - - rd_kafka_consumer_group_metadata_t *cgmetadata_; -}; - - -class KafkaConsumerImpl : virtual public KafkaConsumer, - virtual public HandleImpl { - public: - ~KafkaConsumerImpl() { - if (rk_) - rd_kafka_destroy_flags(rk_, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); - } - - static KafkaConsumer *create(Conf *conf, std::string &errstr); - - ErrorCode assignment(std::vector<TopicPartition *> &partitions); - bool assignment_lost(); - std::string rebalance_protocol() { - const char *str = rd_kafka_rebalance_protocol(rk_); - return std::string(str ? str : ""); - } - ErrorCode subscription(std::vector<std::string> &topics); - ErrorCode subscribe(const std::vector<std::string> &topics); - ErrorCode unsubscribe(); - ErrorCode assign(const std::vector<TopicPartition *> &partitions); - ErrorCode unassign(); - Error *incremental_assign(const std::vector<TopicPartition *> &partitions); - Error *incremental_unassign(const std::vector<TopicPartition *> &partitions); - - Message *consume(int timeout_ms); - ErrorCode commitSync() { - return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 0 /*sync*/)); - } - ErrorCode commitAsync() { - return static_cast<ErrorCode>(rd_kafka_commit(rk_, NULL, 1 /*async*/)); - } - ErrorCode commitSync(Message *message) { - MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message); - return static_cast<ErrorCode>( - rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 0 /*sync*/)); - } - ErrorCode commitAsync(Message *message) { - MessageImpl *msgimpl = dynamic_cast<MessageImpl *>(message); - return static_cast<ErrorCode>( - rd_kafka_commit_message(rk_, msgimpl->rkmessage_, 1 /*async*/)); - } - - ErrorCode commitSync(std::vector<TopicPartition *> &offsets) { - rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); - rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 0); - if (!err) - update_partitions_from_c_parts(offsets, c_parts); - rd_kafka_topic_partition_list_destroy(c_parts); - return static_cast<ErrorCode>(err); - } - - ErrorCode commitAsync(const std::vector<TopicPartition *> &offsets) { - rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); - rd_kafka_resp_err_t err = rd_kafka_commit(rk_, c_parts, 1); - rd_kafka_topic_partition_list_destroy(c_parts); - return static_cast<ErrorCode>(err); - } - - ErrorCode commitSync(OffsetCommitCb *offset_commit_cb) { - return static_cast<ErrorCode>(rd_kafka_commit_queue( - rk_, NULL, NULL, RdKafka::offset_commit_cb_trampoline0, - offset_commit_cb)); - } - - ErrorCode commitSync(std::vector<TopicPartition *> &offsets, - OffsetCommitCb *offset_commit_cb) { - rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); - rd_kafka_resp_err_t err = rd_kafka_commit_queue( - rk_, c_parts, NULL, RdKafka::offset_commit_cb_trampoline0, - offset_commit_cb); - rd_kafka_topic_partition_list_destroy(c_parts); - return static_cast<ErrorCode>(err); - } - - ErrorCode committed(std::vector<TopicPartition *> &partitions, - int timeout_ms); - ErrorCode position(std::vector<TopicPartition *> &partitions); - - ConsumerGroupMetadata *groupMetadata() { - rd_kafka_consumer_group_metadata_t *cgmetadata; - - cgmetadata = rd_kafka_consumer_group_metadata(rk_); - if (!cgmetadata) - return NULL; - - return new ConsumerGroupMetadataImpl(cgmetadata); - } - - ErrorCode close(); - - Error *close(Queue *queue); - - bool closed() { - return rd_kafka_consumer_closed(rk_) ? true : false; - } - - ErrorCode seek(const TopicPartition &partition, int timeout_ms); - - ErrorCode offsets_store(std::vector<TopicPartition *> &offsets) { - rd_kafka_topic_partition_list_t *c_parts = partitions_to_c_parts(offsets); - rd_kafka_resp_err_t err = rd_kafka_offsets_store(rk_, c_parts); - update_partitions_from_c_parts(offsets, c_parts); - rd_kafka_topic_partition_list_destroy(c_parts); - return static_cast<ErrorCode>(err); - } -}; - - -class MetadataImpl : public Metadata { - public: - MetadataImpl(const rd_kafka_metadata_t *metadata); - ~MetadataImpl(); - - const std::vector<const BrokerMetadata *> *brokers() const { - return &brokers_; - } - - const std::vector<const TopicMetadata *> *topics() const { - return &topics_; - } - - std::string orig_broker_name() const { - return std::string(metadata_->orig_broker_name); - } - - int32_t orig_broker_id() const { - return metadata_->orig_broker_id; - } - - private: - const rd_kafka_metadata_t *metadata_; - std::vector<const BrokerMetadata *> brokers_; - std::vector<const TopicMetadata *> topics_; - std::string orig_broker_name_; -}; - - - -class ConsumerImpl : virtual public Consumer, virtual public HandleImpl { - public: - ~ConsumerImpl() { - if (rk_) - rd_kafka_destroy(rk_); - } - static Consumer *create(Conf *conf, std::string &errstr); - - ErrorCode start(Topic *topic, int32_t partition, int64_t offset); - ErrorCode start(Topic *topic, - int32_t partition, - int64_t offset, - Queue *queue); - ErrorCode stop(Topic *topic, int32_t partition); - ErrorCode seek(Topic *topic, - int32_t partition, - int64_t offset, - int timeout_ms); - Message *consume(Topic *topic, int32_t partition, int timeout_ms); - Message *consume(Queue *queue, int timeout_ms); - int consume_callback(Topic *topic, - int32_t partition, - int timeout_ms, - ConsumeCb *cb, - void *opaque); - int consume_callback(Queue *queue, - int timeout_ms, - RdKafka::ConsumeCb *consume_cb, - void *opaque); -}; - - - -class ProducerImpl : virtual public Producer, virtual public HandleImpl { - public: - ~ProducerImpl() { - if (rk_) - rd_kafka_destroy(rk_); - } - - ErrorCode produce(Topic *topic, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const std::string *key, - void *msg_opaque); - - ErrorCode produce(Topic *topic, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - void *msg_opaque); - - ErrorCode produce(Topic *topic, - int32_t partition, - const std::vector<char> *payload, - const std::vector<char> *key, - void *msg_opaque); - - ErrorCode produce(const std::string topic_name, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - int64_t timestamp, - void *msg_opaque); - - ErrorCode produce(const std::string topic_name, - int32_t partition, - int msgflags, - void *payload, - size_t len, - const void *key, - size_t key_len, - int64_t timestamp, - RdKafka::Headers *headers, - void *msg_opaque); - - ErrorCode flush(int timeout_ms) { - return static_cast<RdKafka::ErrorCode>(rd_kafka_flush(rk_, timeout_ms)); - } - - ErrorCode purge(int purge_flags) { - return static_cast<RdKafka::ErrorCode>( - rd_kafka_purge(rk_, (int)purge_flags)); - } - - Error *init_transactions(int timeout_ms) { - rd_kafka_error_t *c_error; - - c_error = rd_kafka_init_transactions(rk_, timeout_ms); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - Error *begin_transaction() { - rd_kafka_error_t *c_error; - - c_error = rd_kafka_begin_transaction(rk_); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - Error *send_offsets_to_transaction( - const std::vector<TopicPartition *> &offsets, - const ConsumerGroupMetadata *group_metadata, - int timeout_ms) { - rd_kafka_error_t *c_error; - const RdKafka::ConsumerGroupMetadataImpl *cgmdimpl = - dynamic_cast<const RdKafka::ConsumerGroupMetadataImpl *>( - group_metadata); - rd_kafka_topic_partition_list_t *c_offsets = partitions_to_c_parts(offsets); - - c_error = rd_kafka_send_offsets_to_transaction( - rk_, c_offsets, cgmdimpl->cgmetadata_, timeout_ms); - - rd_kafka_topic_partition_list_destroy(c_offsets); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - Error *commit_transaction(int timeout_ms) { - rd_kafka_error_t *c_error; - - c_error = rd_kafka_commit_transaction(rk_, timeout_ms); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - Error *abort_transaction(int timeout_ms) { - rd_kafka_error_t *c_error; - - c_error = rd_kafka_abort_transaction(rk_, timeout_ms); - - if (c_error) - return new ErrorImpl(c_error); - else - return NULL; - } - - static Producer *create(Conf *conf, std::string &errstr); -}; - - - -} // namespace RdKafka - -#endif /* _RDKAFKACPP_INT_H_ */ |